Repository: beam Updated Branches: refs/heads/DSL_SQL ca2bc723d -> b8fa0addc
Test unsupported/invalid cases in DSL tests. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/794f1901 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/794f1901 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/794f1901 Branch: refs/heads/DSL_SQL Commit: 794f1901dcf5fb520a849adce7ce436e8b2f8535 Parents: ca2bc72 Author: mingmxu <ming...@ebay.com> Authored: Sun Jul 9 22:26:29 2017 -0700 Committer: Tyler Akidau <taki...@apache.org> Committed: Tue Jul 11 21:04:07 2017 -0700 ---------------------------------------------------------------------- .../dsls/sql/BeamSqlDslAggregationTest.java | 30 ++++++++++++++ .../apache/beam/dsls/sql/BeamSqlDslBase.java | 17 +++++--- .../beam/dsls/sql/BeamSqlDslFilterTest.java | 41 ++++++++++++++++++++ .../beam/dsls/sql/BeamSqlDslProjectTest.java | 15 +++++++ 4 files changed, 98 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/794f1901/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslAggregationTest.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslAggregationTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslAggregationTest.java index b0509ae..f92c803 100644 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslAggregationTest.java +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslAggregationTest.java @@ -257,4 +257,34 @@ public class BeamSqlDslAggregationTest extends BeamSqlDslBase { pipeline.run().waitUntilFinish(); } + + @Test + public void testWindowOnNonTimestampField() throws Exception { + exceptions.expect(IllegalStateException.class); + exceptions.expectMessage( + "Cannot apply 'TUMBLE' to arguments of type 'TUMBLE(<BIGINT>, <INTERVAL HOUR>)'"); + pipeline.enableAbandonedNodeEnforcement(false); + + String sql = "SELECT f_int2, COUNT(*) AS `size` FROM TABLE_A " + + "GROUP BY f_int2, TUMBLE(f_long, INTERVAL '1' HOUR)"; + PCollection<BeamSqlRow> result = + PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), inputA1) + .apply("testWindowOnNonTimestampField", BeamSql.query(sql)); + + pipeline.run().waitUntilFinish(); + } + + @Test + public void testUnsupportedDistinct() throws Exception { + exceptions.expect(IllegalStateException.class); + exceptions.expectMessage("Encountered \"*\""); + pipeline.enableAbandonedNodeEnforcement(false); + + String sql = "SELECT f_int2, COUNT(DISTINCT *) AS `size` FROM PCOLLECTION GROUP BY f_int2"; + + PCollection<BeamSqlRow> result = + inputA1.apply("testUnsupportedDistinct", BeamSql.simpleQuery(sql)); + + pipeline.run().waitUntilFinish(); + } } http://git-wip-us.apache.org/repos/asf/beam/blob/794f1901/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslBase.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslBase.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslBase.java index d62bdc4..308dcb6 100644 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslBase.java +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslBase.java @@ -31,8 +31,10 @@ import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; +import org.junit.Before; import org.junit.BeforeClass; -import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.rules.ExpectedException; /** * prepare input records to test {@link BeamSql}. @@ -43,14 +45,16 @@ import org.junit.ClassRule; public class BeamSqlDslBase { public static final DateFormat FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); - @ClassRule - public static TestPipeline pipeline = TestPipeline.create(); + @Rule + public final TestPipeline pipeline = TestPipeline.create(); + @Rule + public ExpectedException exceptions = ExpectedException.none(); public static BeamSqlRecordType recordTypeInTableA; public static List<BeamSqlRow> recordsInTableA; - public static PCollection<BeamSqlRow> inputA1; - public static PCollection<BeamSqlRow> inputA2; + public PCollection<BeamSqlRow> inputA1; + public PCollection<BeamSqlRow> inputA2; @BeforeClass public static void prepareClass() throws ParseException { @@ -61,7 +65,10 @@ public class BeamSqlDslBase { Types.DOUBLE, Types.VARCHAR, Types.TIMESTAMP, Types.INTEGER)); recordsInTableA = prepareInputRecordsInTableA(); + } + @Before + public void preparePCollections(){ inputA1 = PBegin.in(pipeline).apply("inputA1", Create.of(recordsInTableA) .withCoder(new BeamSqlRowCoder(recordTypeInTableA))); http://git-wip-us.apache.org/repos/asf/beam/blob/794f1901/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslFilterTest.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslFilterTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslFilterTest.java index 254b96d..f46f6c5 100644 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslFilterTest.java +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslFilterTest.java @@ -75,4 +75,45 @@ public class BeamSqlDslFilterTest extends BeamSqlDslBase { pipeline.run().waitUntilFinish(); } + + @Test + public void testFromInvalidTableName1() throws Exception { + exceptions.expect(IllegalStateException.class); + exceptions.expectMessage("Object 'TABLE_B' not found"); + pipeline.enableAbandonedNodeEnforcement(false); + + String sql = "SELECT * FROM TABLE_B WHERE f_int < 1"; + + PCollection<BeamSqlRow> result = + PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), inputA1) + .apply("testFromInvalidTableName1", BeamSql.query(sql)); + + pipeline.run().waitUntilFinish(); + } + + @Test + public void testFromInvalidTableName2() throws Exception { + exceptions.expect(IllegalStateException.class); + exceptions.expectMessage("Use fixed table name PCOLLECTION"); + pipeline.enableAbandonedNodeEnforcement(false); + + String sql = "SELECT * FROM PCOLLECTION_NA"; + + PCollection<BeamSqlRow> result = inputA1.apply(BeamSql.simpleQuery(sql)); + + pipeline.run().waitUntilFinish(); + } + + @Test + public void testInvalidFilter() throws Exception { + exceptions.expect(IllegalStateException.class); + exceptions.expectMessage("Column 'f_int_na' not found in any table"); + pipeline.enableAbandonedNodeEnforcement(false); + + String sql = "SELECT * FROM PCOLLECTION WHERE f_int_na = 0"; + + PCollection<BeamSqlRow> result = inputA1.apply(BeamSql.simpleQuery(sql)); + + pipeline.run().waitUntilFinish(); + } } http://git-wip-us.apache.org/repos/asf/beam/blob/794f1901/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslProjectTest.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslProjectTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslProjectTest.java index 1faa4d0..877fa4f 100644 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslProjectTest.java +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslProjectTest.java @@ -160,4 +160,19 @@ public class BeamSqlDslProjectTest extends BeamSqlDslBase { pipeline.run().waitUntilFinish(); } + + @Test + public void testProjectUnknownField() throws Exception { + exceptions.expect(IllegalStateException.class); + exceptions.expectMessage("Column 'f_int_na' not found in any table"); + pipeline.enableAbandonedNodeEnforcement(false); + + String sql = "SELECT f_int_na FROM TABLE_A"; + + PCollection<BeamSqlRow> result = + PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), inputA2) + .apply("testProjectUnknownField", BeamSql.query(sql)); + + pipeline.run().waitUntilFinish(); + } }