Repository: beam
Updated Branches:
  refs/heads/DSL_SQL ca2bc723d -> b8fa0addc


Test unsupported/invalid cases in DSL tests.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/794f1901
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/794f1901
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/794f1901

Branch: refs/heads/DSL_SQL
Commit: 794f1901dcf5fb520a849adce7ce436e8b2f8535
Parents: ca2bc72
Author: mingmxu <ming...@ebay.com>
Authored: Sun Jul 9 22:26:29 2017 -0700
Committer: Tyler Akidau <taki...@apache.org>
Committed: Tue Jul 11 21:04:07 2017 -0700

----------------------------------------------------------------------
 .../dsls/sql/BeamSqlDslAggregationTest.java     | 30 ++++++++++++++
 .../apache/beam/dsls/sql/BeamSqlDslBase.java    | 17 +++++---
 .../beam/dsls/sql/BeamSqlDslFilterTest.java     | 41 ++++++++++++++++++++
 .../beam/dsls/sql/BeamSqlDslProjectTest.java    | 15 +++++++
 4 files changed, 98 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/794f1901/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslAggregationTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslAggregationTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslAggregationTest.java
index b0509ae..f92c803 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslAggregationTest.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslAggregationTest.java
@@ -257,4 +257,34 @@ public class BeamSqlDslAggregationTest extends BeamSqlDslBase {
 
     pipeline.run().waitUntilFinish();
   }
+
+  @Test
+  public void testWindowOnNonTimestampField() throws Exception {
+    exceptions.expect(IllegalStateException.class);
+    exceptions.expectMessage(
+        "Cannot apply 'TUMBLE' to arguments of type 'TUMBLE(<BIGINT>, <INTERVAL HOUR>)'");
+    pipeline.enableAbandonedNodeEnforcement(false);
+
+    String sql = "SELECT f_int2, COUNT(*) AS `size` FROM TABLE_A "
+        + "GROUP BY f_int2, TUMBLE(f_long, INTERVAL '1' HOUR)";
+    PCollection<BeamSqlRow> result =
+        PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), inputA1)
+        .apply("testWindowOnNonTimestampField", BeamSql.query(sql));
+
+    pipeline.run().waitUntilFinish();
+  }
+
+  @Test
+  public void testUnsupportedDistinct() throws Exception {
+    exceptions.expect(IllegalStateException.class);
+    exceptions.expectMessage("Encountered \"*\"");
+    pipeline.enableAbandonedNodeEnforcement(false);
+
+    String sql = "SELECT f_int2, COUNT(DISTINCT *) AS `size` FROM PCOLLECTION GROUP BY f_int2";
+
+    PCollection<BeamSqlRow> result =
+        inputA1.apply("testUnsupportedDistinct", BeamSql.simpleQuery(sql));
+
+    pipeline.run().waitUntilFinish();
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/794f1901/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslBase.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslBase.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslBase.java
index d62bdc4..308dcb6 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslBase.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslBase.java
@@ -31,8 +31,10 @@ import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
+import org.junit.Before;
 import org.junit.BeforeClass;
-import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.rules.ExpectedException;
 
 /**
  * prepare input records to test {@link BeamSql}.
@@ -43,14 +45,16 @@ import org.junit.ClassRule;
 public class BeamSqlDslBase {
   public static final DateFormat FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
 
-  @ClassRule
-  public static TestPipeline pipeline = TestPipeline.create();
+  @Rule
+  public final TestPipeline pipeline = TestPipeline.create();
+  @Rule
+  public ExpectedException exceptions = ExpectedException.none();
 
   public static BeamSqlRecordType recordTypeInTableA;
   public static List<BeamSqlRow> recordsInTableA;
 
-  public static PCollection<BeamSqlRow> inputA1;
-  public static PCollection<BeamSqlRow> inputA2;
+  public PCollection<BeamSqlRow> inputA1;
+  public PCollection<BeamSqlRow> inputA2;
 
   @BeforeClass
   public static void prepareClass() throws ParseException {
@@ -61,7 +65,10 @@ public class BeamSqlDslBase {
             Types.DOUBLE, Types.VARCHAR, Types.TIMESTAMP, Types.INTEGER));
 
     recordsInTableA = prepareInputRecordsInTableA();
+  }
 
+  @Before
+  public void preparePCollections(){
     inputA1 = PBegin.in(pipeline).apply("inputA1", Create.of(recordsInTableA)
         .withCoder(new BeamSqlRowCoder(recordTypeInTableA)));
 

http://git-wip-us.apache.org/repos/asf/beam/blob/794f1901/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslFilterTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslFilterTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslFilterTest.java
index 254b96d..f46f6c5 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslFilterTest.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslFilterTest.java
@@ -75,4 +75,45 @@ public class BeamSqlDslFilterTest extends BeamSqlDslBase {
 
     pipeline.run().waitUntilFinish();
   }
+
+  @Test
+  public void testFromInvalidTableName1() throws Exception {
+    exceptions.expect(IllegalStateException.class);
+    exceptions.expectMessage("Object 'TABLE_B' not found");
+    pipeline.enableAbandonedNodeEnforcement(false);
+
+    String sql = "SELECT * FROM TABLE_B WHERE f_int < 1";
+
+    PCollection<BeamSqlRow> result =
+        PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), inputA1)
+        .apply("testFromInvalidTableName1", BeamSql.query(sql));
+
+    pipeline.run().waitUntilFinish();
+  }
+
+  @Test
+  public void testFromInvalidTableName2() throws Exception {
+    exceptions.expect(IllegalStateException.class);
+    exceptions.expectMessage("Use fixed table name PCOLLECTION");
+    pipeline.enableAbandonedNodeEnforcement(false);
+
+    String sql = "SELECT * FROM PCOLLECTION_NA";
+
+    PCollection<BeamSqlRow> result = inputA1.apply(BeamSql.simpleQuery(sql));
+
+    pipeline.run().waitUntilFinish();
+  }
+
+  @Test
+  public void testInvalidFilter() throws Exception {
+    exceptions.expect(IllegalStateException.class);
+    exceptions.expectMessage("Column 'f_int_na' not found in any table");
+    pipeline.enableAbandonedNodeEnforcement(false);
+
+    String sql = "SELECT * FROM PCOLLECTION WHERE f_int_na = 0";
+
+    PCollection<BeamSqlRow> result = inputA1.apply(BeamSql.simpleQuery(sql));
+
+    pipeline.run().waitUntilFinish();
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/794f1901/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslProjectTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslProjectTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslProjectTest.java
index 1faa4d0..877fa4f 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslProjectTest.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslProjectTest.java
@@ -160,4 +160,19 @@ public class BeamSqlDslProjectTest extends BeamSqlDslBase {
 
     pipeline.run().waitUntilFinish();
   }
+
+  @Test
+  public void testProjectUnknownField() throws Exception {
+    exceptions.expect(IllegalStateException.class);
+    exceptions.expectMessage("Column 'f_int_na' not found in any table");
+    pipeline.enableAbandonedNodeEnforcement(false);
+
+    String sql = "SELECT f_int_na FROM TABLE_A";
+
+    PCollection<BeamSqlRow> result =
+        PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), inputA2)
+        .apply("testProjectUnknownField", BeamSql.query(sql));
+
+    pipeline.run().waitUntilFinish();
+  }
 }

Reply via email to