This is an automated email from the ASF dual-hosted git repository.

anton pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new e99dd29  [BEAM-7100] BeamValuesRel should accept empty tuples
     new 3128cf5  Merge pull request #8339 from 
amaliujia/rw_empty_join_on_one_side
e99dd29 is described below

commit e99dd2970e1dc075c0bb3e553c5afb8a1f317b40
Author: amaliujia <amaliu...@gmail.com>
AuthorDate: Wed Apr 17 14:21:46 2019 -0700

    [BEAM-7100] BeamValuesRel should accept empty tuples
---
 .../sdk/extensions/sql/impl/rel/BeamValuesRel.java |  6 ---
 .../impl/rel/BeamJoinRelBoundedVsBoundedTest.java  | 54 ++++++++++++++++++++++
 2 files changed, 54 insertions(+), 6 deletions(-)

diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRel.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRel.java
index dbb5bd8..b681738 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRel.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRel.java
@@ -80,14 +80,8 @@ public class BeamValuesRel extends Values implements 
BeamRelNode {
           BeamValuesRel.class.getSimpleName(),
           pinput);
 
-      if (tuples.isEmpty()) {
-        throw new IllegalStateException("Values with empty tuples!");
-      }
-
       Schema schema = CalciteUtils.toSchema(getRowType());
-
       List<Row> rows = tuples.stream().map(tuple -> tupleToRow(schema, 
tuple)).collect(toList());
-
       return 
pinput.getPipeline().begin().apply(Create.of(rows)).setRowSchema(schema);
     }
   }
diff --git 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelBoundedVsBoundedTest.java
 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelBoundedVsBoundedTest.java
index 3286042..162b0ef 100644
--- 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelBoundedVsBoundedTest.java
+++ 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelBoundedVsBoundedTest.java
@@ -106,6 +106,60 @@ public class BeamJoinRelBoundedVsBoundedTest extends 
BaseRelTest {
   }
 
   @Test
+  public void testLeftOuterJoinWithEmptyTuplesOnRightSide() throws Exception {
+    String sql =
+        "SELECT *  "
+            + "FROM ORDER_DETAILS1 o1"
+            + " LEFT OUTER JOIN (SELECT * FROM ORDER_DETAILS2 WHERE FALSE) o2"
+            + " on "
+            + " o1.order_id=o2.site_id AND o2.price=o1.site_id";
+
+    PCollection<Row> rows = compilePipeline(sql, pipeline);
+    pipeline.enableAbandonedNodeEnforcement(false);
+    PAssert.that(rows)
+        .containsInAnyOrder(
+            TestUtils.RowsBuilder.of(
+                    Schema.builder()
+                        .addField("order_id", Schema.FieldType.INT32)
+                        .addField("site_id", Schema.FieldType.INT32)
+                        .addField("price", Schema.FieldType.INT32)
+                        .addNullableField("order_id0", Schema.FieldType.INT32)
+                        .addNullableField("site_id0", Schema.FieldType.INT32)
+                        .addNullableField("price0", Schema.FieldType.INT32)
+                        .build())
+                .addRows(
+                    1, 2, 3, null, null, null, 2, 3, 3, null, null, null, 3, 
4, 5, null, null, null)
+                .getRows());
+    pipeline.run();
+  }
+
+  @Test
+  public void testInnerJoinWithEmptyTuplesOnRightSide() throws Exception {
+    String sql =
+        "SELECT *  "
+            + "FROM ORDER_DETAILS1 o1"
+            + " INNER JOIN (SELECT * FROM ORDER_DETAILS2 WHERE FALSE) o2"
+            + " on "
+            + " o1.order_id=o2.site_id AND o2.price=o1.site_id";
+
+    PCollection<Row> rows = compilePipeline(sql, pipeline);
+    pipeline.enableAbandonedNodeEnforcement(false);
+    PAssert.that(rows)
+        .containsInAnyOrder(
+            TestUtils.RowsBuilder.of(
+                    Schema.builder()
+                        .addField("order_id", Schema.FieldType.INT32)
+                        .addField("site_id", Schema.FieldType.INT32)
+                        .addField("price", Schema.FieldType.INT32)
+                        .addNullableField("order_id0", Schema.FieldType.INT32)
+                        .addNullableField("site_id0", Schema.FieldType.INT32)
+                        .addNullableField("price0", Schema.FieldType.INT32)
+                        .build())
+                .getRows());
+    pipeline.run();
+  }
+
+  @Test
   public void testRightOuterJoin() throws Exception {
     String sql =
         "SELECT *  "

Reply via email to