Repository: beam Updated Branches: refs/heads/DSL_SQL dcd769c8a -> 738eb4dd0
Update filter/project/aggregation tests to use BeamSql Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/86dea078 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/86dea078 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/86dea078 Branch: refs/heads/DSL_SQL Commit: 86dea078eeb29ba92085dc6cd299aca00a23e7e9 Parents: dcd769c Author: mingmxu <[email protected]> Authored: Thu Jun 15 18:10:06 2017 -0700 Committer: Tyler Akidau <[email protected]> Committed: Fri Jun 16 18:25:23 2017 -0700 ---------------------------------------------------------------------- .../dsls/sql/BeamSqlDslAggregationTest.java | 260 +++++++++++++++++++ .../apache/beam/dsls/sql/BeamSqlDslBase.java | 125 +++++++++ .../beam/dsls/sql/BeamSqlDslFilterTest.java | 78 ++++++ .../beam/dsls/sql/BeamSqlDslProjectTest.java | 163 ++++++++++++ .../beam/dsls/sql/planner/BasePlanner.java | 108 -------- .../sql/planner/BeamGroupByExplainTest.java | 106 -------- .../sql/planner/BeamGroupByPipelineTest.java | 111 -------- .../sql/planner/BeamInvalidGroupByTest.java | 51 ---- .../BeamPlannerAggregationSubmitTest.java | 152 ----------- .../sql/planner/BeamPlannerExplainTest.java | 67 ----- .../dsls/sql/planner/BeamPlannerSubmitTest.java | 56 ---- .../sql/schema/BeamPCollectionTableTest.java | 73 ------ 12 files changed, 626 insertions(+), 724 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/86dea078/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslAggregationTest.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslAggregationTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslAggregationTest.java new file mode 100644 index 0000000..f7349c6 --- /dev/null +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslAggregationTest.java @@ -0,0 +1,260 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.dsls.sql; + +import java.sql.Types; +import java.util.Arrays; +import org.apache.beam.dsls.sql.schema.BeamSqlRecordType; +import org.apache.beam.dsls.sql.schema.BeamSqlRow; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.beam.sdk.values.TupleTag; +import org.joda.time.Instant; +import org.junit.Test; + +/** + * Tests for GROUP-BY/aggregation, with global_window/fix_time_window/sliding_window/session_window. + */ +public class BeamSqlDslAggregationTest extends BeamSqlDslBase { + /** + * GROUP-BY with single aggregation function. + */ + @Test + public void testAggregationWithoutWindow() throws Exception { + String sql = "SELECT f_int2, COUNT(*) AS `size` FROM TABLE_A GROUP BY f_int2"; + + PCollection<BeamSqlRow> result = + inputA1.apply("testAggregationWithoutWindow", BeamSql.simpleQuery(sql)); + + BeamSqlRecordType resultType = BeamSqlRecordType.create(Arrays.asList("f_int2", "size"), + Arrays.asList(Types.INTEGER, Types.BIGINT)); + + BeamSqlRow record = new BeamSqlRow(resultType); + record.addField("f_int2", 0); + record.addField("size", 4L); + + PAssert.that(result).containsInAnyOrder(record); + + pipeline.run().waitUntilFinish(); + } + + /** + * GROUP-BY with multiple aggregation functions. + */ + @Test + public void testAggregationFunctions() throws Exception{ + String sql = "select f_int2, count(*) as size, " + + "sum(f_long) as sum1, avg(f_long) as avg1, max(f_long) as max1, min(f_long) as min1," + + "sum(f_short) as sum2, avg(f_short) as avg2, max(f_short) as max2, min(f_short) as min2," + + "sum(f_byte) as sum3, avg(f_byte) as avg3, max(f_byte) as max3, min(f_byte) as min3," + + "sum(f_float) as sum4, avg(f_float) as avg4, max(f_float) as max4, min(f_float) as min4," + + "sum(f_double) as sum5, avg(f_double) as avg5, " + + "max(f_double) as max5, min(f_double) as min5," + + "max(f_timestamp) as max6, min(f_timestamp) as min6 " + + "FROM TABLE_A group by f_int2"; + + PCollection<BeamSqlRow> result = + PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), inputA1) + .apply("testAggregationFunctions", BeamSql.query(sql)); + + BeamSqlRecordType resultType = BeamSqlRecordType.create( + Arrays.asList("f_int2", "size", "sum1", "avg1", "max1", "min1", "sum2", "avg2", "max2", + "min2", "sum3", "avg3", "max3", "min3", "sum4", "avg4", "max4", "min4", "sum5", "avg5", + "max5", "min5", "max6", "min6"), + Arrays.asList(Types.INTEGER, Types.BIGINT, Types.BIGINT, Types.BIGINT, Types.BIGINT, + Types.BIGINT, Types.SMALLINT, Types.SMALLINT, Types.SMALLINT, Types.SMALLINT, + Types.TINYINT, Types.TINYINT, Types.TINYINT, Types.TINYINT, Types.FLOAT, Types.FLOAT, + Types.FLOAT, Types.FLOAT, Types.DOUBLE, Types.DOUBLE, Types.DOUBLE, Types.DOUBLE, + Types.TIMESTAMP, Types.TIMESTAMP)); + + BeamSqlRow record = new BeamSqlRow(resultType); + record.addField("f_int2", 0); + record.addField("size", 4L); + + record.addField("sum1", 10000L); + record.addField("avg1", 2500L); + record.addField("max1", 4000L); + record.addField("min1", 1000L); + + record.addField("sum2", (short) 10); + record.addField("avg2", (short) 2); + record.addField("max2", (short) 4); + record.addField("min2", (short) 1); + + record.addField("sum3", (byte) 10); + record.addField("avg3", (byte) 2); + record.addField("max3", (byte) 4); + record.addField("min3", (byte) 1); + + record.addField("sum4", 10.0F); + record.addField("avg4", 2.5F); + record.addField("max4", 4.0F); + record.addField("min4", 1.0F); + + record.addField("sum5", 10.0); + record.addField("avg5", 2.5); + record.addField("max5", 4.0); + record.addField("min5", 1.0); + + record.addField("max6", FORMAT.parse("2017-01-01 02:04:03")); + record.addField("min6", FORMAT.parse("2017-01-01 01:01:03")); + + PAssert.that(result).containsInAnyOrder(record); + + pipeline.run().waitUntilFinish(); + } + + /** + * Implicit GROUP-BY with DISTINCT. + */ + @Test + public void testDistinct() throws Exception { + String sql = "SELECT distinct f_int, f_long FROM TABLE_A "; + + PCollection<BeamSqlRow> result = + inputA1.apply("testDistinct", BeamSql.simpleQuery(sql)); + + BeamSqlRecordType resultType = BeamSqlRecordType.create(Arrays.asList("f_int", "f_long"), + Arrays.asList(Types.INTEGER, Types.BIGINT)); + + BeamSqlRow record1 = new BeamSqlRow(resultType); + record1.addField("f_int", 1); + record1.addField("f_long", 1000L); + + BeamSqlRow record2 = new BeamSqlRow(resultType); + record2.addField("f_int", 2); + record2.addField("f_long", 2000L); + + BeamSqlRow record3 = new BeamSqlRow(resultType); + record3.addField("f_int", 3); + record3.addField("f_long", 3000L); + + BeamSqlRow record4 = new BeamSqlRow(resultType); + record4.addField("f_int", 4); + record4.addField("f_long", 4000L); + + PAssert.that(result).containsInAnyOrder(record1, record2, record3, record4); + + pipeline.run().waitUntilFinish(); + } + + /** + * GROUP-BY with TUMBLE window(akka fix_time_window). + */ + @Test + public void testTumbleWindow() throws Exception { + String sql = "SELECT f_int2, COUNT(*) AS `size` FROM TABLE_A " + + "GROUP BY f_int2, TUMBLE(f_timestamp, INTERVAL '1' HOUR)"; + PCollection<BeamSqlRow> result = + PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), inputA1) + .apply("testTumbleWindow", BeamSql.query(sql)); + + BeamSqlRecordType resultType = BeamSqlRecordType.create(Arrays.asList("f_int2", "size"), + Arrays.asList(Types.INTEGER, Types.BIGINT)); + + BeamSqlRow record1 = new BeamSqlRow(resultType); + record1.addField("f_int2", 0); + record1.addField("size", 3L); + record1.setWindowStart(new Instant(FORMAT.parse("2017-01-01 01:00:00").getTime())); + record1.setWindowEnd(new Instant(FORMAT.parse("2017-01-01 02:00:00").getTime())); + + BeamSqlRow record2 = new BeamSqlRow(resultType); + record2.addField("f_int2", 0); + record2.addField("size", 1L); + record2.setWindowStart(new Instant(FORMAT.parse("2017-01-01 02:00:00").getTime())); + record2.setWindowEnd(new Instant(FORMAT.parse("2017-01-01 03:00:00").getTime())); + + PAssert.that(result).containsInAnyOrder(record1, record2); + + pipeline.run().waitUntilFinish(); + } + + /** + * GROUP-BY with HOP window(akka sliding_window). + */ + @Test + public void testHopWindow() throws Exception { + String sql = "SELECT f_int2, COUNT(*) AS `size` FROM TABLE_A " + + "GROUP BY f_int2, HOP(f_timestamp, INTERVAL '1' HOUR, INTERVAL '30' MINUTE)"; + PCollection<BeamSqlRow> result = + inputA1.apply("testHopWindow", BeamSql.simpleQuery(sql)); + + BeamSqlRecordType resultType = BeamSqlRecordType.create(Arrays.asList("f_int2", "size"), + Arrays.asList(Types.INTEGER, Types.BIGINT)); + + BeamSqlRow record1 = new BeamSqlRow(resultType); + record1.addField("f_int2", 0); + record1.addField("size", 3L); + record1.setWindowStart(new Instant(FORMAT.parse("2017-01-01 00:30:00").getTime())); + record1.setWindowEnd(new Instant(FORMAT.parse("2017-01-01 01:30:00").getTime())); + + BeamSqlRow record2 = new BeamSqlRow(resultType); + record2.addField("f_int2", 0); + record2.addField("size", 3L); + record2.setWindowStart(new Instant(FORMAT.parse("2017-01-01 01:00:00").getTime())); + record2.setWindowEnd(new Instant(FORMAT.parse("2017-01-01 02:00:00").getTime())); + + BeamSqlRow record3 = new BeamSqlRow(resultType); + record3.addField("f_int2", 0); + record3.addField("size", 1L); + record3.setWindowStart(new Instant(FORMAT.parse("2017-01-01 01:30:00").getTime())); + record3.setWindowEnd(new Instant(FORMAT.parse("2017-01-01 02:30:00").getTime())); + + BeamSqlRow record4 = new BeamSqlRow(resultType); + record4.addField("f_int2", 0); + record4.addField("size", 1L); + record4.setWindowStart(new Instant(FORMAT.parse("2017-01-01 02:00:00").getTime())); + record4.setWindowEnd(new Instant(FORMAT.parse("2017-01-01 03:00:00").getTime())); + + PAssert.that(result).containsInAnyOrder(record1, record2, record3, record4); + + pipeline.run().waitUntilFinish(); + } + + /** + * GROUP-BY with SESSION window. + */ + @Test + public void testSessionWindow() throws Exception { + String sql = "SELECT f_int2, COUNT(*) AS `size` FROM TABLE_A " + + "GROUP BY f_int2, SESSION(f_timestamp, INTERVAL '5' MINUTE)"; + PCollection<BeamSqlRow> result = + PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), inputA1) + .apply("testSessionWindow", BeamSql.query(sql)); + + BeamSqlRecordType resultType = BeamSqlRecordType.create(Arrays.asList("f_int2", "size"), + Arrays.asList(Types.INTEGER, Types.BIGINT)); + + BeamSqlRow record1 = new BeamSqlRow(resultType); + record1.addField("f_int2", 0); + record1.addField("size", 3L); + record1.setWindowStart(new Instant(FORMAT.parse("2017-01-01 01:01:03").getTime())); + record1.setWindowEnd(new Instant(FORMAT.parse("2017-01-01 01:11:03").getTime())); + + BeamSqlRow record2 = new BeamSqlRow(resultType); + record2.addField("f_int2", 0); + record2.addField("size", 1L); + record2.setWindowStart(new Instant(FORMAT.parse("2017-01-01 02:04:03").getTime())); + record2.setWindowEnd(new Instant(FORMAT.parse("2017-01-01 02:09:03").getTime())); + + PAssert.that(result).containsInAnyOrder(record1, record2); + + pipeline.run().waitUntilFinish(); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/86dea078/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslBase.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslBase.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslBase.java new file mode 100644 index 0000000..d62bdc4 --- /dev/null +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslBase.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.dsls.sql; + +import java.sql.Types; +import java.text.DateFormat; +import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import org.apache.beam.dsls.sql.schema.BeamSqlRecordType; +import org.apache.beam.dsls.sql.schema.BeamSqlRow; +import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.values.PBegin; +import org.apache.beam.sdk.values.PCollection; +import org.junit.BeforeClass; +import org.junit.ClassRule; + +/** + * prepare input records to test {@link BeamSql}. + * + * <p>Note that, any change in these records would impact tests in this package. + * + */ +public class BeamSqlDslBase { + public static final DateFormat FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + + @ClassRule + public static TestPipeline pipeline = TestPipeline.create(); + + public static BeamSqlRecordType recordTypeInTableA; + public static List<BeamSqlRow> recordsInTableA; + + public static PCollection<BeamSqlRow> inputA1; + public static PCollection<BeamSqlRow> inputA2; + + @BeforeClass + public static void prepareClass() throws ParseException { + recordTypeInTableA = BeamSqlRecordType.create( + Arrays.asList("f_int", "f_long", "f_short", "f_byte", "f_float", "f_double", "f_string", + "f_timestamp", "f_int2"), + Arrays.asList(Types.INTEGER, Types.BIGINT, Types.SMALLINT, Types.TINYINT, Types.FLOAT, + Types.DOUBLE, Types.VARCHAR, Types.TIMESTAMP, Types.INTEGER)); + + recordsInTableA = prepareInputRecordsInTableA(); + + inputA1 = PBegin.in(pipeline).apply("inputA1", Create.of(recordsInTableA) + .withCoder(new BeamSqlRowCoder(recordTypeInTableA))); + + inputA2 = PBegin.in(pipeline).apply("inputA2", Create.of(recordsInTableA.get(0)) + .withCoder(new BeamSqlRowCoder(recordTypeInTableA))); + } + + private static List<BeamSqlRow> prepareInputRecordsInTableA() throws ParseException{ + List<BeamSqlRow> rows = new ArrayList<>(); + + BeamSqlRow row1 = new BeamSqlRow(recordTypeInTableA); + row1.addField(0, 1); + row1.addField(1, 1000L); + row1.addField(2, Short.valueOf("1")); + row1.addField(3, Byte.valueOf("1")); + row1.addField(4, 1.0f); + row1.addField(5, 1.0); + row1.addField(6, "string_row1"); + row1.addField(7, FORMAT.parse("2017-01-01 01:01:03")); + row1.addField(8, 0); + rows.add(row1); + + BeamSqlRow row2 = new BeamSqlRow(recordTypeInTableA); + row2.addField(0, 2); + row2.addField(1, 2000L); + row2.addField(2, Short.valueOf("2")); + row2.addField(3, Byte.valueOf("2")); + row2.addField(4, 2.0f); + row2.addField(5, 2.0); + row2.addField(6, "string_row2"); + row2.addField(7, FORMAT.parse("2017-01-01 01:02:03")); + row2.addField(8, 0); + rows.add(row2); + + BeamSqlRow row3 = new BeamSqlRow(recordTypeInTableA); + row3.addField(0, 3); + row3.addField(1, 3000L); + row3.addField(2, Short.valueOf("3")); + row3.addField(3, Byte.valueOf("3")); + row3.addField(4, 3.0f); + row3.addField(5, 3.0); + row3.addField(6, "string_row3"); + row3.addField(7, FORMAT.parse("2017-01-01 01:06:03")); + row3.addField(8, 0); + rows.add(row3); + + BeamSqlRow row4 = new BeamSqlRow(recordTypeInTableA); + row4.addField(0, 4); + row4.addField(1, 4000L); + row4.addField(2, Short.valueOf("4")); + row4.addField(3, Byte.valueOf("4")); + row4.addField(4, 4.0f); + row4.addField(5, 4.0); + row4.addField(6, "string_row4"); + row4.addField(7, FORMAT.parse("2017-01-01 02:04:03")); + row4.addField(8, 0); + rows.add(row4); + + return rows; + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/86dea078/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslFilterTest.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslFilterTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslFilterTest.java new file mode 100644 index 0000000..b68e526 --- /dev/null +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslFilterTest.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.dsls.sql; + +import org.apache.beam.dsls.sql.schema.BeamSqlRow; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.beam.sdk.values.TupleTag; +import org.junit.Test; + +/** + * Tests for WHERE queries. + */ +public class BeamSqlDslFilterTest extends BeamSqlDslBase { + /** + * single filter. + */ + @Test + public void testSingleFilter() throws Exception { + String sql = "SELECT * FROM TABLE_A WHERE f_int = 1"; + + PCollection<BeamSqlRow> result = + inputA1.apply("testSingleFilter", BeamSql.simpleQuery(sql)); + + PAssert.that(result).containsInAnyOrder(recordsInTableA.get(0)); + + pipeline.run().waitUntilFinish(); + } + + /** + * composite filters. + */ + @Test + public void testCompositeFilter() throws Exception { + String sql = "SELECT * FROM TABLE_A" + + " WHERE f_int > 1 AND (f_long < 3000 OR f_string = 'string_row3')"; + + PCollection<BeamSqlRow> result = + PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), inputA1) + .apply("testCompositeFilter", BeamSql.query(sql)); + + PAssert.that(result).containsInAnyOrder(recordsInTableA.get(1), recordsInTableA.get(2)); + + pipeline.run().waitUntilFinish(); + } + + /** + * nothing return with filters. + */ + @Test + public void testNoReturnFilter() throws Exception { + String sql = "SELECT * FROM TABLE_A WHERE f_int < 1"; + + PCollection<BeamSqlRow> result = + PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), inputA1) + .apply("testNoReturnFilter", BeamSql.query(sql)); + + PAssert.that(result).empty(); + + pipeline.run().waitUntilFinish(); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/86dea078/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslProjectTest.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslProjectTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslProjectTest.java new file mode 100644 index 0000000..2998682 --- /dev/null +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslProjectTest.java @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.dsls.sql; + +import java.sql.Types; +import java.util.Arrays; +import org.apache.beam.dsls.sql.schema.BeamSqlRecordType; +import org.apache.beam.dsls.sql.schema.BeamSqlRow; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.beam.sdk.values.TupleTag; +import org.junit.Test; + +/** + * Tests for field-project in queries. + */ +public class BeamSqlDslProjectTest extends BeamSqlDslBase { + /** + * select all fields. + */ + @Test + public void testSelectAll() throws Exception { + String sql = "SELECT * FROM TABLE_A"; + + PCollection<BeamSqlRow> result = + inputA2.apply("testSelectAll", BeamSql.simpleQuery(sql)); + + PAssert.that(result).containsInAnyOrder(recordsInTableA.get(0)); + + pipeline.run().waitUntilFinish(); + } + + /** + * select partial fields. + */ + @Test + public void testPartialFields() throws Exception { + String sql = "SELECT f_int, f_long FROM TABLE_A"; + + PCollection<BeamSqlRow> result = + PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), inputA2) + .apply("testPartialFields", BeamSql.query(sql)); + + BeamSqlRecordType resultType = BeamSqlRecordType.create(Arrays.asList("f_int", "f_long"), + Arrays.asList(Types.INTEGER, Types.BIGINT)); + + BeamSqlRow record = new BeamSqlRow(resultType); + record.addField("f_int", recordsInTableA.get(0).getFieldValue(0)); + record.addField("f_long", recordsInTableA.get(0).getFieldValue(1)); + + PAssert.that(result).containsInAnyOrder(record); + + pipeline.run().waitUntilFinish(); + } + + /** + * select partial fields for multiple rows. + */ + @Test + public void testPartialFieldsInMultipleRow() throws Exception { + String sql = "SELECT f_int, f_long FROM TABLE_A"; + + PCollection<BeamSqlRow> result = + PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), inputA1) + .apply("testPartialFieldsInMultipleRow", BeamSql.query(sql)); + + BeamSqlRecordType resultType = BeamSqlRecordType.create(Arrays.asList("f_int", "f_long"), + Arrays.asList(Types.INTEGER, Types.BIGINT)); + + BeamSqlRow record1 = new BeamSqlRow(resultType); + record1.addField("f_int", recordsInTableA.get(0).getFieldValue(0)); + record1.addField("f_long", recordsInTableA.get(0).getFieldValue(1)); + + BeamSqlRow record2 = new BeamSqlRow(resultType); + record2.addField("f_int", recordsInTableA.get(1).getFieldValue(0)); + record2.addField("f_long", recordsInTableA.get(1).getFieldValue(1)); + + BeamSqlRow record3 = new BeamSqlRow(resultType); + record3.addField("f_int", recordsInTableA.get(2).getFieldValue(0)); + record3.addField("f_long", recordsInTableA.get(2).getFieldValue(1)); + + BeamSqlRow record4 = new BeamSqlRow(resultType); + record4.addField("f_int", recordsInTableA.get(3).getFieldValue(0)); + record4.addField("f_long", recordsInTableA.get(3).getFieldValue(1)); + + PAssert.that(result).containsInAnyOrder(record1, record2, record3, record4); + + pipeline.run().waitUntilFinish(); + } + + /** + * select partial fields. + */ + @Test + public void testPartialFieldsInRows() throws Exception { + String sql = "SELECT f_int, f_long FROM TABLE_A"; + + PCollection<BeamSqlRow> result = + PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), inputA1) + .apply("testPartialFieldsInRows", BeamSql.query(sql)); + + BeamSqlRecordType resultType = BeamSqlRecordType.create(Arrays.asList("f_int", "f_long"), + Arrays.asList(Types.INTEGER, Types.BIGINT)); + + BeamSqlRow record1 = new BeamSqlRow(resultType); + record1.addField("f_int", recordsInTableA.get(0).getFieldValue(0)); + record1.addField("f_long", recordsInTableA.get(0).getFieldValue(1)); + + BeamSqlRow record2 = new BeamSqlRow(resultType); + record2.addField("f_int", recordsInTableA.get(1).getFieldValue(0)); + record2.addField("f_long", recordsInTableA.get(1).getFieldValue(1)); + + BeamSqlRow record3 = new BeamSqlRow(resultType); + record3.addField("f_int", recordsInTableA.get(2).getFieldValue(0)); + record3.addField("f_long", recordsInTableA.get(2).getFieldValue(1)); + + BeamSqlRow record4 = new BeamSqlRow(resultType); + record4.addField("f_int", recordsInTableA.get(3).getFieldValue(0)); + record4.addField("f_long", recordsInTableA.get(3).getFieldValue(1)); + + PAssert.that(result).containsInAnyOrder(record1, record2, record3, record4); + + pipeline.run().waitUntilFinish(); + } + + /** + * select literal field. + */ + @Test + public void testLiteralField() throws Exception { + String sql = "SELECT 1 as literal_field FROM TABLE_A"; + + PCollection<BeamSqlRow> result = + PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), inputA2) + .apply("testLiteralField", BeamSql.query(sql)); + + BeamSqlRecordType resultType = BeamSqlRecordType.create(Arrays.asList("literal_field"), + Arrays.asList(Types.INTEGER)); + + BeamSqlRow record = new BeamSqlRow(resultType); + record.addField("literal_field", 1); + + PAssert.that(result).containsInAnyOrder(record); + + pipeline.run().waitUntilFinish(); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/86dea078/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BasePlanner.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BasePlanner.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BasePlanner.java deleted file mode 100644 index 2c5b555..0000000 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BasePlanner.java +++ /dev/null @@ -1,108 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.dsls.sql.planner; - -import static org.apache.beam.dsls.sql.BeamSqlEnv.registerTable; - -import java.util.Arrays; -import java.util.Date; -import java.util.HashMap; -import java.util.Map; - -import org.apache.beam.dsls.sql.schema.BaseBeamTable; -import org.apache.beam.dsls.sql.schema.BeamSqlRecordType; -import org.apache.beam.dsls.sql.schema.BeamSqlRow; -import org.apache.beam.dsls.sql.schema.kafka.BeamKafkaCSVTable; -import org.apache.beam.dsls.sql.utils.CalciteUtils; -import org.apache.calcite.rel.type.RelDataType; -import org.apache.calcite.rel.type.RelDataTypeFactory; -import org.apache.calcite.rel.type.RelProtoDataType; -import org.apache.calcite.sql.type.SqlTypeName; -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.junit.BeforeClass; - -/** - * prepare {@code BeamSqlRunner} for test. - * - */ -public class BasePlanner { - @BeforeClass - public static void prepareClass() { - registerTable("ORDER_DETAILS", getTable()); - registerTable("SUB_ORDER", getTable("127.0.0.1:9092", "sub_orders")); - registerTable("SUB_ORDER_RAM", getTable()); - } - - private static BaseBeamTable getTable() { - final RelProtoDataType protoRowType = new RelProtoDataType() { - @Override - public RelDataType apply(RelDataTypeFactory a0) { - return a0.builder().add("order_id", SqlTypeName.BIGINT).add("site_id", SqlTypeName.INTEGER) - .add("price", SqlTypeName.DOUBLE).add("order_time", SqlTypeName.TIMESTAMP).build(); - } - }; - - BeamSqlRecordType dataType = CalciteUtils - .toBeamRecordType(protoRowType.apply(BeamQueryPlanner.TYPE_FACTORY)); - BeamSqlRow row1 = new BeamSqlRow(dataType); - row1.addField(0, 12345L); - row1.addField(1, 0); - row1.addField(2, 10.5); - row1.addField(3, new Date()); - - BeamSqlRow row2 = new BeamSqlRow(dataType); - row2.addField(0, 12345L); - row2.addField(1, 1); - row2.addField(2, 20.5); - row2.addField(3, new Date()); - - BeamSqlRow row3 = new BeamSqlRow(dataType); - row3.addField(0, 12345L); - row3.addField(1, 0); - row3.addField(2, 20.5); - row3.addField(3, new Date()); - - BeamSqlRow row4 = new BeamSqlRow(dataType); - row4.addField(0, null); - row4.addField(1, null); - row4.addField(2, 20.5); - row4.addField(3, new Date()); - - return new MockedBeamSqlTable(dataType).withInputRecords( - Arrays.asList(row1, row2, row3, row4)); - } - - public static BaseBeamTable getTable(String bootstrapServer, String topic) { - final RelProtoDataType protoRowType = new RelProtoDataType() { - @Override - public RelDataType apply(RelDataTypeFactory a0) { - return a0.builder().add("order_id", SqlTypeName.BIGINT).add("site_id", SqlTypeName.INTEGER) - .add("price", SqlTypeName.DOUBLE).add("order_time", SqlTypeName.TIMESTAMP).build(); - } - }; - - BeamSqlRecordType dataType = CalciteUtils - .toBeamRecordType(protoRowType.apply(BeamQueryPlanner.TYPE_FACTORY)); - - Map<String, Object> consumerPara = new HashMap<String, Object>(); - consumerPara.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); - - return new BeamKafkaCSVTable(dataType, bootstrapServer, Arrays.asList(topic)) - .updateConsumerProperties(consumerPara); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/86dea078/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamGroupByExplainTest.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamGroupByExplainTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamGroupByExplainTest.java deleted file mode 100644 index 4ea0662..0000000 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamGroupByExplainTest.java +++ /dev/null @@ -1,106 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.dsls.sql.planner; - -import org.apache.beam.dsls.sql.BeamSqlCli; -import org.apache.beam.dsls.sql.BeamSqlEnv; -import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlUdfExpressionTest; -import org.junit.Test; - -/** - * Test group-by methods. - * - */ -public class BeamGroupByExplainTest extends BasePlanner { - - /** - * GROUP-BY without window operation, and grouped fields. - */ - @Test - public void testSimpleGroupExplain() throws Exception { - String sql = "SELECT COUNT(*) AS `SIZE`" + "FROM ORDER_DETAILS " - + "WHERE SITE_ID = 0 "; - String plan = BeamSqlCli.explainQuery(sql); - } - - /** - * GROUP-BY without window operation, and grouped fields. - */ - @Test - public void testSimpleGroup2Explain() throws Exception { - String sql = "SELECT site_id" + ", COUNT(*) " + "FROM ORDER_DETAILS " - + "WHERE SITE_ID = 0 " + "GROUP BY site_id"; - String plan = BeamSqlCli.explainQuery(sql); - } - - /** - * GROUP-BY with TUMBLE window. - */ - @Test - public void testTumbleExplain() throws Exception { - String sql = "SELECT order_id, site_id" + ", COUNT(*) AS `SIZE`" + "FROM ORDER_DETAILS " - + "WHERE SITE_ID = 0 " + "GROUP BY order_id, site_id" - + ", TUMBLE(order_time, INTERVAL '1' HOUR)"; - String plan = BeamSqlCli.explainQuery(sql); - } - - /** - * GROUP-BY with TUMBLE window. - */ - @Test - public void testTumbleWithDelayExplain() throws Exception { - String sql = "SELECT order_id, site_id, " - + "TUMBLE_START(order_time, INTERVAL '1' HOUR, TIME '00:00:01')" - + ", COUNT(*) AS `SIZE`" + "FROM ORDER_DETAILS " + "WHERE SITE_ID = 0 " - + "GROUP BY order_id, site_id" + ", TUMBLE(order_time, INTERVAL '1' HOUR, TIME '00:00:01')"; - String plan = BeamSqlCli.explainQuery(sql); - } - - /** - * GROUP-BY with HOP window. - */ - @Test - public void testHopExplain() throws Exception { - String sql = "SELECT order_id, site_id" + ", COUNT(*) AS `SIZE`" + "FROM ORDER_DETAILS " - + "WHERE SITE_ID = 0 " + "GROUP BY order_id, site_id" - + ", HOP(order_time, INTERVAL '5' MINUTE, INTERVAL '1' HOUR)"; - String plan = BeamSqlCli.explainQuery(sql); - } - - /** - * GROUP-BY with SESSION window. - */ - @Test - public void testSessionExplain() throws Exception { - String sql = "SELECT order_id, site_id" + ", COUNT(*) AS `SIZE`" + "FROM ORDER_DETAILS " - + "WHERE SITE_ID = 0 " + "GROUP BY order_id, site_id" - + ", SESSION(order_time, INTERVAL '5' MINUTE)"; - String plan = BeamSqlCli.explainQuery(sql); - } - - /** - * Query with UDF. - */ - @Test - public void testUdf() throws Exception { - BeamSqlEnv.registerUdf("negative", BeamSqlUdfExpressionTest.UdfFn.class, "negative"); - String sql = "select site_id, negative(site_id) as nsite_id from ORDER_DETAILS"; - - String plan = BeamSqlCli.explainQuery(sql); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/86dea078/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamGroupByPipelineTest.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamGroupByPipelineTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamGroupByPipelineTest.java deleted file mode 100644 index 8db65d1..0000000 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamGroupByPipelineTest.java +++ /dev/null @@ -1,111 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.dsls.sql.planner; - -import org.apache.beam.dsls.sql.BeamSqlCli; -import org.apache.beam.dsls.sql.BeamSqlEnv; -import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlUdfExpressionTest; -import org.apache.beam.dsls.sql.schema.BeamSqlRow; -import org.apache.beam.sdk.testing.TestPipeline; -import org.apache.beam.sdk.values.PCollection; -import org.junit.Test; - -/** - * Test group-by methods. - * - */ -public class BeamGroupByPipelineTest extends BasePlanner { - public final TestPipeline pipeline = TestPipeline.create(); - - /** - * GROUP-BY without window operation, and grouped fields. - */ - @Test - public void testSimpleGroupExplain() throws Exception { - String sql = "SELECT COUNT(*) AS `SIZE`" + "FROM ORDER_DETAILS " - + "WHERE SITE_ID = 0 "; - PCollection<BeamSqlRow> outputStream = BeamSqlCli.compilePipeline(sql, pipeline); - } - - /** - * GROUP-BY without window operation, and grouped fields. - */ - @Test - public void testSimpleGroup2Explain() throws Exception { - String sql = "SELECT site_id" + ", COUNT(*) " + "FROM ORDER_DETAILS " - + "WHERE SITE_ID = 0 " + "GROUP BY site_id"; - PCollection<BeamSqlRow> outputStream = BeamSqlCli.compilePipeline(sql, pipeline); - } - - /** - * GROUP-BY with TUMBLE window. - */ - @Test - public void testTumbleExplain() throws Exception { - String sql = "SELECT order_id, site_id" + ", COUNT(*) AS `SIZE`" + "FROM ORDER_DETAILS " - + "WHERE SITE_ID = 0 " + "GROUP BY order_id, site_id" - + ", TUMBLE(order_time, INTERVAL '1' HOUR)"; - PCollection<BeamSqlRow> outputStream = BeamSqlCli.compilePipeline(sql, pipeline); - } - - /** - * GROUP-BY with TUMBLE window. - */ - @Test - public void testTumbleWithDelayExplain() throws Exception { - String sql = "SELECT order_id, site_id, " - + "TUMBLE_START(order_time, INTERVAL '1' HOUR, TIME '00:00:01')" - + ", COUNT(*) AS `SIZE`" + "FROM ORDER_DETAILS " + "WHERE SITE_ID = 0 " - + "GROUP BY order_id, site_id" + ", TUMBLE(order_time, INTERVAL '1' HOUR, TIME '00:00:01')"; - PCollection<BeamSqlRow> outputStream = BeamSqlCli.compilePipeline(sql, pipeline); - } - - /** - * GROUP-BY with HOP window. - */ - @Test - public void testHopExplain() throws Exception { - String sql = "SELECT order_id, site_id" + ", COUNT(*) AS `SIZE`" + "FROM ORDER_DETAILS " - + "WHERE SITE_ID = 0 " + "GROUP BY order_id, site_id" - + ", HOP(order_time, INTERVAL '5' MINUTE, INTERVAL '1' HOUR)"; - PCollection<BeamSqlRow> outputStream = BeamSqlCli.compilePipeline(sql, pipeline); - } - - /** - * GROUP-BY with SESSION window. - */ - @Test - public void testSessionExplain() throws Exception { - String sql = "SELECT order_id, site_id" + ", COUNT(*) AS `SIZE`" + "FROM ORDER_DETAILS " - + "WHERE SITE_ID = 0 " + "GROUP BY order_id, site_id" - + ", SESSION(order_time, INTERVAL '5' MINUTE)"; - PCollection<BeamSqlRow> outputStream = BeamSqlCli.compilePipeline(sql, pipeline); - } - - /** - * Query with UDF. - */ - @Test - public void testUdf() throws Exception { - BeamSqlEnv.registerUdf("negative", BeamSqlUdfExpressionTest.UdfFn.class, "negative"); - String sql = "select site_id, negative(site_id) as nsite_id from ORDER_DETAILS"; - - PCollection<BeamSqlRow> outputStream = BeamSqlCli.compilePipeline(sql, pipeline); - } - -} http://git-wip-us.apache.org/repos/asf/beam/blob/86dea078/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamInvalidGroupByTest.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamInvalidGroupByTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamInvalidGroupByTest.java deleted file mode 100644 index adb454c..0000000 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamInvalidGroupByTest.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.dsls.sql.planner; - -import org.apache.beam.dsls.sql.BeamSqlCli; -import org.apache.beam.dsls.sql.schema.BeamSqlRow; -import org.apache.beam.sdk.testing.TestPipeline; -import org.apache.beam.sdk.values.PCollection; -import org.apache.calcite.tools.ValidationException; -import org.junit.Rule; -import org.junit.Test; - -/** - * Test group-by methods. - * - */ -public class BeamInvalidGroupByTest extends BasePlanner { - @Rule - public final TestPipeline pipeline = TestPipeline.create(); - - @Test(expected = ValidationException.class) - public void testTumble2Explain() throws Exception { - String sql = "SELECT order_id, site_id" + ", COUNT(*) AS `SIZE`" + "FROM ORDER_DETAILS " - + "WHERE SITE_ID = 0 " + "GROUP BY order_id" + ", TUMBLE(order_time, INTERVAL '1' HOUR)"; - PCollection<BeamSqlRow> outputStream = BeamSqlCli.compilePipeline(sql, pipeline); - } - - @Test(expected = ValidationException.class) - public void testTumble3Explain() throws Exception { - String sql = "SELECT order_id, site_id, TUMBLE(order_time, INTERVAL '1' HOUR)" - + ", COUNT(*) AS `SIZE`" + "FROM ORDER_DETAILS " + "WHERE SITE_ID = 0 " - + "GROUP BY order_id, site_id" + ", TUMBLE(order_time, INTERVAL '1' HOUR)"; - PCollection<BeamSqlRow> outputStream = BeamSqlCli.compilePipeline(sql, pipeline); - } - -} http://git-wip-us.apache.org/repos/asf/beam/blob/86dea078/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerAggregationSubmitTest.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerAggregationSubmitTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerAggregationSubmitTest.java deleted file mode 100644 index f98517b..0000000 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerAggregationSubmitTest.java +++ /dev/null @@ -1,152 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.dsls.sql.planner; - -import java.text.DateFormat; -import java.text.ParseException; -import java.text.SimpleDateFormat; -import java.util.Arrays; - -import org.apache.beam.dsls.sql.BeamSqlCli; -import org.apache.beam.dsls.sql.BeamSqlEnv; -import org.apache.beam.dsls.sql.schema.BaseBeamTable; -import org.apache.beam.dsls.sql.schema.BeamSqlRecordType; -import org.apache.beam.dsls.sql.schema.BeamSqlRow; -import org.apache.beam.dsls.sql.utils.CalciteUtils; -import org.apache.beam.sdk.testing.TestPipeline; -import org.apache.calcite.rel.type.RelDataType; -import org.apache.calcite.rel.type.RelDataTypeFactory; -import org.apache.calcite.rel.type.RelProtoDataType; -import org.apache.calcite.sql.type.SqlTypeName; -import org.junit.Assert; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Rule; -import org.junit.Test; - -/** - * Tests to execute a query. - * - */ -public class BeamPlannerAggregationSubmitTest { - public static DateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); - - @Rule - public final TestPipeline pipeline = TestPipeline.create(); - - @BeforeClass - public static void prepareClass() throws ParseException { - BeamSqlEnv.registerTable("ORDER_DETAILS", getOrderTable()); - BeamSqlEnv.registerTable("ORDER_SUMMARY", getSummaryTable()); - } - - @Before - public void prepare() throws ParseException { - MockedBeamSqlTable.CONTENT.clear(); - } - - private static BaseBeamTable getOrderTable() throws ParseException { - final RelProtoDataType protoRowType = new RelProtoDataType() { - @Override - public RelDataType apply(RelDataTypeFactory a0) { - return a0.builder().add("order_id", SqlTypeName.BIGINT) - .add("site_id", SqlTypeName.INTEGER) - .add("order_time", SqlTypeName.TIMESTAMP).build(); - } - }; - - BeamSqlRecordType dataType = CalciteUtils - .toBeamRecordType(protoRowType.apply(BeamQueryPlanner.TYPE_FACTORY)); - BeamSqlRow row1 = new BeamSqlRow(dataType); - row1.addField(0, 12345L); - row1.addField(1, 1); - row1.addField(2, format.parse("2017-01-01 01:02:03")); - - BeamSqlRow row2 = new BeamSqlRow(dataType); - row2.addField(0, 12345L); - row2.addField(1, 0); - row2.addField(2, format.parse("2017-01-01 01:03:04")); - - BeamSqlRow row3 = new BeamSqlRow(dataType); - row3.addField(0, 12345L); - row3.addField(1, 0); - row3.addField(2, format.parse("2017-01-01 02:03:04")); - - BeamSqlRow row4 = new BeamSqlRow(dataType); - row4.addField(0, 2132L); - row4.addField(1, 0); - row4.addField(2, format.parse("2017-01-01 03:04:05")); - - return new MockedBeamSqlTable(dataType).withInputRecords( - Arrays.asList(row1 - , row2, row3, row4 - )); - - } - - private static BaseBeamTable getSummaryTable() { - final RelProtoDataType protoRowType = new RelProtoDataType() { - @Override - public RelDataType apply(RelDataTypeFactory a0) { - return a0.builder() - .add("site_id", SqlTypeName.INTEGER) - .add("agg_hour", SqlTypeName.TIMESTAMP) - .add("size", SqlTypeName.BIGINT).build(); - } - }; - BeamSqlRecordType dataType = CalciteUtils - .toBeamRecordType(protoRowType.apply(BeamQueryPlanner.TYPE_FACTORY)); - - return new MockedBeamSqlTable(dataType); - } - - - @Test - public void selectWithWindowAggregation() throws Exception{ - String sql = "INSERT INTO ORDER_SUMMARY(SITE_ID, agg_hour, SIZE) " - + "SELECT site_id, TUMBLE_START(order_time, INTERVAL '1' HOUR, TIME '00:00:01')" - + ", COUNT(*) AS `SIZE`" + "FROM ORDER_DETAILS " - + "WHERE SITE_ID = 1 " + "GROUP BY site_id" - + ", TUMBLE(order_time, INTERVAL '1' HOUR, TIME '00:00:01')"; - - BeamSqlCli.compilePipeline(sql, pipeline); - - pipeline.run().waitUntilFinish(); - - Assert.assertTrue(MockedBeamSqlTable.CONTENT.size() == 1); - BeamSqlRow result = MockedBeamSqlTable.CONTENT.peek(); - Assert.assertEquals(1, result.getInteger(0)); - Assert.assertEquals(format.parse("2017-01-01 01:00:00"), result.getDate(1)); - Assert.assertEquals(1L, result.getLong(2)); - } - - @Test - public void selectWithoutWindowAggregation() throws Exception{ - String sql = "INSERT INTO ORDER_SUMMARY(SITE_ID, SIZE) " - + "SELECT site_id, COUNT(*) AS `SIZE`" + "FROM ORDER_DETAILS " - + "WHERE SITE_ID = 0 " + "GROUP BY site_id"; - - BeamSqlCli.compilePipeline(sql, pipeline); - - pipeline.run().waitUntilFinish(); - - Assert.assertTrue(MockedBeamSqlTable.CONTENT.size() == 1); - Assert.assertEquals("site_id=0,agg_hour=null,size=3", - MockedBeamSqlTable.CONTENT.peek().valueInString()); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/86dea078/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerExplainTest.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerExplainTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerExplainTest.java deleted file mode 100644 index e617ff2..0000000 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerExplainTest.java +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.dsls.sql.planner; - -import org.apache.beam.dsls.sql.BeamSqlCli; -import org.junit.Assert; -import org.junit.Test; - -/** - * Tests to explain queries. - * - */ -public class BeamPlannerExplainTest extends BasePlanner { - @Test - public void selectAll() throws Exception { - String sql = "SELECT * FROM ORDER_DETAILS"; - String plan = BeamSqlCli.explainQuery(sql); - - String expectedPlan = - "BeamProjectRel(order_id=[$0], site_id=[$1], price=[$2], order_time=[$3])\n" - + " BeamIOSourceRel(table=[[ORDER_DETAILS]])\n"; - Assert.assertEquals("explain doesn't match", expectedPlan, plan); - } - - @Test - public void selectWithFilter() throws Exception { - String sql = "SELECT " + " order_id, site_id, price " + "FROM ORDER_DETAILS " - + "WHERE SITE_ID = 0 and price > 20"; - String plan = BeamSqlCli.explainQuery(sql); - - String expectedPlan = "BeamProjectRel(order_id=[$0], site_id=[$1], price=[$2])\n" - + " BeamFilterRel(condition=[AND(=($1, 0), >($2, 20))])\n" - + " BeamIOSourceRel(table=[[ORDER_DETAILS]])\n"; - Assert.assertEquals("explain doesn't match", expectedPlan, plan); - } - - @Test - public void insertSelectFilter() throws Exception { - String sql = "INSERT INTO SUB_ORDER(order_id, site_id, price) " + "SELECT " - + " order_id, site_id, price " + "FROM ORDER_DETAILS " - + "WHERE SITE_ID = 0 and price > 20"; - String plan = BeamSqlCli.explainQuery(sql); - - String expectedPlan = - "BeamIOSinkRel(table=[[SUB_ORDER]], operation=[INSERT], flattened=[true])\n" - + " BeamProjectRel(order_id=[$0], site_id=[$1], price=[$2], order_time=[null])\n" - + " BeamProjectRel(order_id=[$0], site_id=[$1], price=[$2])\n" - + " BeamFilterRel(condition=[AND(=($1, 0), >($2, 20))])\n" - + " BeamIOSourceRel(table=[[ORDER_DETAILS]])\n"; - Assert.assertEquals("explain doesn't match", expectedPlan, plan); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/86dea078/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerSubmitTest.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerSubmitTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerSubmitTest.java deleted file mode 100644 index 4df7f8a..0000000 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerSubmitTest.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.dsls.sql.planner; - -import org.apache.beam.dsls.sql.BeamSqlCli; -import org.apache.beam.dsls.sql.schema.BeamSqlRow; -import org.apache.beam.sdk.testing.TestPipeline; -import org.apache.beam.sdk.values.PCollection; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; - -/** - * Tests to execute a query. - * - */ -public class BeamPlannerSubmitTest extends BasePlanner { - @Rule - public final TestPipeline pipeline = TestPipeline.create(); - - @Before - public void prepare() { - MockedBeamSqlTable.CONTENT.clear(); - } - - @Test - public void insertSelectFilter() throws Exception { - String sql = "INSERT INTO SUB_ORDER_RAM(order_id, site_id, price) SELECT " - + " order_id, site_id, price " - + "FROM ORDER_DETAILS " + "WHERE SITE_ID = 0 and price > 20"; - - PCollection<BeamSqlRow> outputStream = BeamSqlCli.compilePipeline(sql, pipeline); - - pipeline.run().waitUntilFinish(); - - Assert.assertTrue(MockedBeamSqlTable.CONTENT.size() == 1); - Assert.assertTrue(MockedBeamSqlTable.CONTENT.peek().valueInString() - .contains("order_id=12345,site_id=0,price=20.5,order_time=")); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/86dea078/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/BeamPCollectionTableTest.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/BeamPCollectionTableTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/BeamPCollectionTableTest.java deleted file mode 100644 index 8dc8439..0000000 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/BeamPCollectionTableTest.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.dsls.sql.schema; - -import org.apache.beam.dsls.sql.BeamSqlCli; -import org.apache.beam.dsls.sql.BeamSqlEnv; -import org.apache.beam.dsls.sql.planner.BasePlanner; -import org.apache.beam.dsls.sql.planner.BeamQueryPlanner; -import org.apache.beam.dsls.sql.utils.CalciteUtils; -import org.apache.beam.sdk.testing.TestPipeline; -import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.values.PBegin; -import org.apache.beam.sdk.values.PCollection; -import org.apache.calcite.rel.type.RelDataType; -import org.apache.calcite.rel.type.RelDataTypeFactory; -import org.apache.calcite.rel.type.RelProtoDataType; -import org.apache.calcite.sql.type.SqlTypeName; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; - -/** - * Test case for BeamPCollectionTable. - */ -public class BeamPCollectionTableTest extends BasePlanner{ - @Rule - public final TestPipeline pipeline = TestPipeline.create(); - - @Before - public void prepareTable(){ - RelProtoDataType protoRowType = new RelProtoDataType() { - @Override - public RelDataType apply(RelDataTypeFactory a0) { - return a0.builder().add("c1", SqlTypeName.INTEGER) - .add("c2", SqlTypeName.VARCHAR).build(); - } - }; - BeamSqlRecordType dataType = CalciteUtils - .toBeamRecordType(protoRowType.apply(BeamQueryPlanner.TYPE_FACTORY)); - - BeamSqlRow row = new BeamSqlRow(CalciteUtils.toBeamRecordType( - protoRowType.apply(BeamQueryPlanner.TYPE_FACTORY))); - row.addField(0, 1); - row.addField(1, "hello world."); - PCollection<BeamSqlRow> inputStream = PBegin.in(pipeline).apply(Create.of(row)); - BeamSqlEnv.registerTable("COLLECTION_TABLE", - new BeamPCollectionTable(inputStream, dataType)); - } - - @Test - public void testSelectFromPCollectionTable() throws Exception{ - String sql = "select c1, c2 from COLLECTION_TABLE"; - PCollection<BeamSqlRow> outputStream = BeamSqlCli.compilePipeline(sql, pipeline); - - pipeline.run().waitUntilFinish(); - } - -}
