Repository: beam
Updated Branches:
  refs/heads/DSL_SQL b8fa0addc -> aa265e62a


[BEAM-2550] add UnitTest for JOIN in DSL


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e9dc5ea8
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e9dc5ea8
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e9dc5ea8

Branch: refs/heads/DSL_SQL
Commit: e9dc5ea81cbbde39bf11ee183e5403b869d21f50
Parents: b8fa0ad
Author: James Xu <[email protected]>
Authored: Thu Jul 6 11:29:41 2017 +0800
Committer: Tyler Akidau <[email protected]>
Committed: Tue Jul 11 23:46:37 2017 -0700

----------------------------------------------------------------------
 .../beam/dsls/sql/rel/BeamIOSourceRel.java      |   5 +-
 .../apache/beam/dsls/sql/rel/BeamJoinRel.java   |   3 -
 .../beam/dsls/sql/BeamSqlDslJoinTest.java       | 191 +++++++++++++++++++
 .../org/apache/beam/dsls/sql/TestUtils.java     |  25 ++-
 .../rel/BeamJoinRelBoundedVsBoundedTest.java    |  72 +++----
 5 files changed, 255 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/e9dc5ea8/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSourceRel.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSourceRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSourceRel.java
index d323d82..b26d2b8 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSourceRel.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSourceRel.java
@@ -21,6 +21,8 @@ import com.google.common.base.Joiner;
 import org.apache.beam.dsls.sql.BeamSqlEnv;
 import org.apache.beam.dsls.sql.schema.BaseBeamTable;
 import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder;
+import org.apache.beam.dsls.sql.utils.CalciteUtils;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionTuple;
 import org.apache.beam.sdk.values.TupleTag;
@@ -53,7 +55,8 @@ public class BeamIOSourceRel extends TableScan implements BeamRelNode {
     } else {
       //If not, the source PColection is provided with BaseBeamTable.buildIOReader().
       BaseBeamTable sourceTable = sqlEnv.findTable(sourceName);
-      return sourceTable.buildIOReader(inputPCollections.getPipeline());
+      return sourceTable.buildIOReader(inputPCollections.getPipeline())
+          .setCoder(new BeamSqlRowCoder(CalciteUtils.toBeamRecordType(getRowType())));
     }
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/e9dc5ea8/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamJoinRel.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamJoinRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamJoinRel.java
index e85368e..3c92e42 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamJoinRel.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamJoinRel.java
@@ -99,12 +99,9 @@ public class BeamJoinRel extends Join implements BeamRelNode {
     BeamRelNode leftRelNode = BeamSqlRelUtils.getBeamRelInput(left);
     BeamSqlRecordType leftRowType = CalciteUtils.toBeamRecordType(left.getRowType());
     PCollection<BeamSqlRow> leftRows = leftRelNode.buildBeamPipeline(inputPCollections, sqlEnv);
-    leftRows.setCoder(new BeamSqlRowCoder(leftRowType));
 
     final BeamRelNode rightRelNode = BeamSqlRelUtils.getBeamRelInput(right);
-    BeamSqlRecordType rightRowType = CalciteUtils.toBeamRecordType(right.getRowType());
     PCollection<BeamSqlRow> rightRows = rightRelNode.buildBeamPipeline(inputPCollections, sqlEnv);
-    rightRows.setCoder(new BeamSqlRowCoder(rightRowType));
 
     String stageName = BeamSqlRelUtils.getStageName(this);
     WindowFn leftWinFn = leftRows.getWindowingStrategy().getWindowFn();

http://git-wip-us.apache.org/repos/asf/beam/blob/e9dc5ea8/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslJoinTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslJoinTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslJoinTest.java
new file mode 100644
index 0000000..ae5f4e5
--- /dev/null
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslJoinTest.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql;
+
+import static org.apache.beam.dsls.sql.rel.BeamJoinRelBoundedVsBoundedTest.ORDER_DETAILS1;
+import static org.apache.beam.dsls.sql.rel.BeamJoinRelBoundedVsBoundedTest.ORDER_DETAILS2;
+
+import java.sql.Types;
+import java.util.Arrays;
+import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.TupleTag;
+import org.junit.Rule;
+import org.junit.Test;
+
+/**
+ * Tests for joins in queries.
+ */
+public class BeamSqlDslJoinTest {
+  @Rule
+  public final TestPipeline pipeline = TestPipeline.create();
+
+  private static final BeamSqlRecordType SOURCE_RECORD_TYPE =
+      BeamSqlRecordType.create(
+          Arrays.asList(
+              "order_id", "site_id", "price"
+          ),
+          Arrays.asList(
+              Types.INTEGER, Types.INTEGER, Types.INTEGER
+          )
+      );
+
+  private static final BeamSqlRowCoder SOURCE_CODER =
+      new BeamSqlRowCoder(SOURCE_RECORD_TYPE);
+
+  private static final BeamSqlRecordType RESULT_RECORD_TYPE =
+      BeamSqlRecordType.create(
+          Arrays.asList(
+          "order_id", "site_id", "price", "order_id0", "site_id0", "price0"
+          ),
+          Arrays.asList(
+              Types.INTEGER, Types.INTEGER, Types.INTEGER, Types.INTEGER
+              , Types.INTEGER, Types.INTEGER
+          )
+      );
+
+  private static final BeamSqlRowCoder RESULT_CODER =
+      new BeamSqlRowCoder(RESULT_RECORD_TYPE);
+
+  @Test
+  public void testInnerJoin() throws Exception {
+    String sql =
+        "SELECT *  "
+            + "FROM ORDER_DETAILS1 o1"
+            + " JOIN ORDER_DETAILS2 o2"
+            + " on "
+            + " o1.order_id=o2.site_id AND o2.price=o1.site_id"
+        ;
+
+    PAssert.that(queryFromOrderTables(sql)).containsInAnyOrder(
+        TestUtils.RowsBuilder.of(
+            RESULT_RECORD_TYPE
+        ).addRows(
+            2, 3, 3, 1, 2, 3
+        ).getRows());
+    pipeline.run();
+  }
+
+  @Test
+  public void testLeftOuterJoin() throws Exception {
+    String sql =
+        "SELECT *  "
+            + "FROM ORDER_DETAILS1 o1"
+            + " LEFT OUTER JOIN ORDER_DETAILS2 o2"
+            + " on "
+            + " o1.order_id=o2.site_id AND o2.price=o1.site_id"
+        ;
+
+    PAssert.that(queryFromOrderTables(sql)).containsInAnyOrder(
+        TestUtils.RowsBuilder.of(
+            RESULT_RECORD_TYPE
+        ).addRows(
+            1, 2, 3, null, null, null,
+            2, 3, 3, 1, 2, 3,
+            3, 4, 5, null, null, null
+        ).getRows());
+    pipeline.run();
+  }
+
+  @Test
+  public void testRightOuterJoin() throws Exception {
+    String sql =
+        "SELECT *  "
+            + "FROM ORDER_DETAILS1 o1"
+            + " RIGHT OUTER JOIN ORDER_DETAILS2 o2"
+            + " on "
+            + " o1.order_id=o2.site_id AND o2.price=o1.site_id"
+        ;
+
+    PAssert.that(queryFromOrderTables(sql)).containsInAnyOrder(
+        TestUtils.RowsBuilder.of(
+            RESULT_RECORD_TYPE
+        ).addRows(
+            2, 3, 3, 1, 2, 3,
+            null, null, null, 2, 3, 3,
+            null, null, null, 3, 4, 5
+        ).getRows());
+    pipeline.run();
+  }
+
+  @Test
+  public void testFullOuterJoin() throws Exception {
+    String sql =
+        "SELECT *  "
+            + "FROM ORDER_DETAILS1 o1"
+            + " FULL OUTER JOIN ORDER_DETAILS2 o2"
+            + " on "
+            + " o1.order_id=o2.site_id AND o2.price=o1.site_id"
+        ;
+
+    PAssert.that(queryFromOrderTables(sql)).containsInAnyOrder(
+        TestUtils.RowsBuilder.of(
+            RESULT_RECORD_TYPE
+        ).addRows(
+            2, 3, 3, 1, 2, 3,
+            1, 2, 3, null, null, null,
+            3, 4, 5, null, null, null,
+            null, null, null, 2, 3, 3,
+            null, null, null, 3, 4, 5
+        ).getRows());
+    pipeline.run();
+  }
+
+  @Test(expected = IllegalStateException.class)
+  public void testException_nonEqualJoin() throws Exception {
+    String sql =
+        "SELECT *  "
+            + "FROM ORDER_DETAILS1 o1"
+            + " JOIN ORDER_DETAILS2 o2"
+            + " on "
+            + " o1.order_id>o2.site_id"
+        ;
+
+    pipeline.enableAbandonedNodeEnforcement(false);
+    queryFromOrderTables(sql);
+    pipeline.run();
+  }
+
+  @Test(expected = IllegalStateException.class)
+  public void testException_crossJoin() throws Exception {
+    String sql =
+        "SELECT *  "
+            + "FROM ORDER_DETAILS1 o1, ORDER_DETAILS2 o2";
+
+    pipeline.enableAbandonedNodeEnforcement(false);
+    queryFromOrderTables(sql);
+    pipeline.run();
+  }
+
+  private PCollection<BeamSqlRow> queryFromOrderTables(String sql) {
+    return PCollectionTuple
+        .of(
+            new TupleTag<BeamSqlRow>("ORDER_DETAILS1"),
+            ORDER_DETAILS1.buildIOReader(pipeline).setCoder(SOURCE_CODER)
+        )
+        .and(new TupleTag<BeamSqlRow>("ORDER_DETAILS2"),
+            ORDER_DETAILS2.buildIOReader(pipeline).setCoder(SOURCE_CODER)
+        ).apply("join", BeamSql.query(sql)).setCoder(RESULT_CODER);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/e9dc5ea8/dsls/sql/src/test/java/org/apache/beam/dsls/sql/TestUtils.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/TestUtils.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/TestUtils.java
index cfad333..3294592 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/TestUtils.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/TestUtils.java
@@ -75,7 +75,13 @@ public class TestUtils {
     /**
      * Create a RowsBuilder with the specified row type info.
      *
-     * <p>Note: check the class javadoc for for detailed example.
+     * <p>For example:
+     * <pre>{@code
+     * TestUtils.RowsBuilder.of(
+     *   Types.INTEGER, "order_id",
+     *   Types.INTEGER, "sum_site_id",
+     *   Types.VARCHAR, "buyer"
+     * )}</pre>
      *
      * @args pairs of column type and column names.
      */
@@ -88,6 +94,23 @@ public class TestUtils {
     }
 
     /**
+     * Create a RowsBuilder with the specified row type info.
+     *
+     * <p>For example:
+     * <pre>{@code
+     * TestUtils.RowsBuilder.of(
+     *   beamSqlRecordType
+     * )}</pre>
+     * @beamSQLRecordType the record type.
+     */
+    public static RowsBuilder of(final BeamSqlRecordType beamSQLRecordType) {
+      RowsBuilder builder = new RowsBuilder();
+      builder.type = beamSQLRecordType;
+
+      return builder;
+    }
+
+    /**
      * Add rows to the builder.
      *
      * <p>Note: check the class javadoc for for detailed example.

http://git-wip-us.apache.org/repos/asf/beam/blob/e9dc5ea8/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamJoinRelBoundedVsBoundedTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamJoinRelBoundedVsBoundedTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamJoinRelBoundedVsBoundedTest.java
index d15cb81..24a3256 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamJoinRelBoundedVsBoundedTest.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamJoinRelBoundedVsBoundedTest.java
@@ -39,40 +39,40 @@ public class BeamJoinRelBoundedVsBoundedTest {
   public final TestPipeline pipeline = TestPipeline.create();
   private static final BeamSqlEnv beamSqlEnv = new BeamSqlEnv();
 
+  public static final MockedBoundedTable ORDER_DETAILS1 =
+      MockedBoundedTable.of(
+          Types.INTEGER, "order_id",
+          Types.INTEGER, "site_id",
+          Types.INTEGER, "price"
+      ).addRows(
+          1, 2, 3,
+          2, 3, 3,
+          3, 4, 5
+      );
+
+  public static final MockedBoundedTable ORDER_DETAILS2 =
+      MockedBoundedTable.of(
+          Types.INTEGER, "order_id",
+          Types.INTEGER, "site_id",
+          Types.INTEGER, "price"
+      ).addRows(
+          1, 2, 3,
+          2, 3, 3,
+          3, 4, 5
+      );
+
   @BeforeClass
   public static void prepare() {
-    beamSqlEnv.registerTable("ORDER_DETAILS",
-        MockedBoundedTable.of(
-            Types.INTEGER, "order_id",
-            Types.INTEGER, "site_id",
-            Types.INTEGER, "price"
-        ).addRows(
-            1, 2, 3,
-            2, 3, 3,
-            3, 4, 5
-        )
-    );
-
-    beamSqlEnv.registerTable("ORDER_DETAILS0",
-        MockedBoundedTable.of(
-            Types.INTEGER, "order_id0",
-            Types.INTEGER, "site_id0",
-            Types.INTEGER, "price0"
-        ).addRows(
-            1, 2, 3,
-            2, 3, 3,
-            3, 4, 5
-        )
-    );
-
+    beamSqlEnv.registerTable("ORDER_DETAILS1", ORDER_DETAILS1);
+    beamSqlEnv.registerTable("ORDER_DETAILS2", ORDER_DETAILS2);
   }
 
   @Test
   public void testInnerJoin() throws Exception {
     String sql =
         "SELECT *  "
-        + "FROM ORDER_DETAILS o1"
-        + " JOIN ORDER_DETAILS o2"
+        + "FROM ORDER_DETAILS1 o1"
+        + " JOIN ORDER_DETAILS2 o2"
         + " on "
         + " o1.order_id=o2.site_id AND o2.price=o1.site_id"
         ;
@@ -96,10 +96,10 @@ public class BeamJoinRelBoundedVsBoundedTest {
   public void testLeftOuterJoin() throws Exception {
     String sql =
         "SELECT *  "
-            + "FROM ORDER_DETAILS o1"
-            + " LEFT OUTER JOIN ORDER_DETAILS0 o2"
+            + "FROM ORDER_DETAILS1 o1"
+            + " LEFT OUTER JOIN ORDER_DETAILS2 o2"
             + " on "
-            + " o1.order_id=o2.site_id0 AND o2.price0=o1.site_id"
+            + " o1.order_id=o2.site_id AND o2.price=o1.site_id"
         ;
 
     PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
@@ -124,8 +124,8 @@ public class BeamJoinRelBoundedVsBoundedTest {
   public void testRightOuterJoin() throws Exception {
     String sql =
         "SELECT *  "
-            + "FROM ORDER_DETAILS o1"
-            + " RIGHT OUTER JOIN ORDER_DETAILS o2"
+            + "FROM ORDER_DETAILS1 o1"
+            + " RIGHT OUTER JOIN ORDER_DETAILS2 o2"
             + " on "
             + " o1.order_id=o2.site_id AND o2.price=o1.site_id"
         ;
@@ -151,8 +151,8 @@ public class BeamJoinRelBoundedVsBoundedTest {
   public void testFullOuterJoin() throws Exception {
     String sql =
         "SELECT *  "
-            + "FROM ORDER_DETAILS o1"
-            + " FULL OUTER JOIN ORDER_DETAILS o2"
+            + "FROM ORDER_DETAILS1 o1"
+            + " FULL OUTER JOIN ORDER_DETAILS2 o2"
             + " on "
             + " o1.order_id=o2.site_id AND o2.price=o1.site_id"
         ;
@@ -180,8 +180,8 @@ public class BeamJoinRelBoundedVsBoundedTest {
   public void testException_nonEqualJoin() throws Exception {
     String sql =
         "SELECT *  "
-            + "FROM ORDER_DETAILS o1"
-            + " JOIN ORDER_DETAILS o2"
+            + "FROM ORDER_DETAILS1 o1"
+            + " JOIN ORDER_DETAILS2 o2"
             + " on "
             + " o1.order_id>o2.site_id"
         ;
@@ -195,7 +195,7 @@ public class BeamJoinRelBoundedVsBoundedTest {
   public void testException_crossJoin() throws Exception {
     String sql =
         "SELECT *  "
-            + "FROM ORDER_DETAILS o1, ORDER_DETAILS o2";
+            + "FROM ORDER_DETAILS1 o1, ORDER_DETAILS2 o2";
 
     pipeline.enableAbandonedNodeEnforcement(false);
     BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);

Reply via email to