This is an automated email from the ASF dual-hosted git repository. anton pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new e99dd29 [BEAM-7100] BeamValuesRel should accept empty tuples new 3128cf5 Merge pull request #8339 from amaliujia/rw_empty_join_on_one_side e99dd29 is described below commit e99dd2970e1dc075c0bb3e553c5afb8a1f317b40 Author: amaliujia <amaliu...@gmail.com> AuthorDate: Wed Apr 17 14:21:46 2019 -0700 [BEAM-7100] BeamValuesRel should accept empty tuples --- .../sdk/extensions/sql/impl/rel/BeamValuesRel.java | 6 --- .../impl/rel/BeamJoinRelBoundedVsBoundedTest.java | 54 ++++++++++++++++++++++ 2 files changed, 54 insertions(+), 6 deletions(-) diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRel.java index dbb5bd8..b681738 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRel.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRel.java @@ -80,14 +80,8 @@ public class BeamValuesRel extends Values implements BeamRelNode { BeamValuesRel.class.getSimpleName(), pinput); - if (tuples.isEmpty()) { - throw new IllegalStateException("Values with empty tuples!"); - } - Schema schema = CalciteUtils.toSchema(getRowType()); - List<Row> rows = tuples.stream().map(tuple -> tupleToRow(schema, tuple)).collect(toList()); - return pinput.getPipeline().begin().apply(Create.of(rows)).setRowSchema(schema); } } diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelBoundedVsBoundedTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelBoundedVsBoundedTest.java index 3286042..162b0ef 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelBoundedVsBoundedTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelBoundedVsBoundedTest.java @@ -106,6 +106,60 @@ public class BeamJoinRelBoundedVsBoundedTest extends BaseRelTest { } @Test + public void testLeftOuterJoinWithEmptyTuplesOnRightSide() throws Exception { + String sql = + "SELECT * " + + "FROM ORDER_DETAILS1 o1" + + " LEFT OUTER JOIN (SELECT * FROM ORDER_DETAILS2 WHERE FALSE) o2" + + " on " + + " o1.order_id=o2.site_id AND o2.price=o1.site_id"; + + PCollection<Row> rows = compilePipeline(sql, pipeline); + pipeline.enableAbandonedNodeEnforcement(false); + PAssert.that(rows) + .containsInAnyOrder( + TestUtils.RowsBuilder.of( + Schema.builder() + .addField("order_id", Schema.FieldType.INT32) + .addField("site_id", Schema.FieldType.INT32) + .addField("price", Schema.FieldType.INT32) + .addNullableField("order_id0", Schema.FieldType.INT32) + .addNullableField("site_id0", Schema.FieldType.INT32) + .addNullableField("price0", Schema.FieldType.INT32) + .build()) + .addRows( + 1, 2, 3, null, null, null, 2, 3, 3, null, null, null, 3, 4, 5, null, null, null) + .getRows()); + pipeline.run(); + } + + @Test + public void testInnerJoinWithEmptyTuplesOnRightSide() throws Exception { + String sql = + "SELECT * " + + "FROM ORDER_DETAILS1 o1" + + " INNER JOIN (SELECT * FROM ORDER_DETAILS2 WHERE FALSE) o2" + + " on " + + " o1.order_id=o2.site_id AND o2.price=o1.site_id"; + + PCollection<Row> rows = compilePipeline(sql, pipeline); + pipeline.enableAbandonedNodeEnforcement(false); + PAssert.that(rows) + .containsInAnyOrder( + TestUtils.RowsBuilder.of( + Schema.builder() + .addField("order_id", Schema.FieldType.INT32) + .addField("site_id", Schema.FieldType.INT32) + .addField("price", Schema.FieldType.INT32) + .addNullableField("order_id0", Schema.FieldType.INT32) + .addNullableField("site_id0", Schema.FieldType.INT32) + .addNullableField("price0", Schema.FieldType.INT32) + .build()) + .getRows()); + pipeline.run(); + } + + @Test public void testRightOuterJoin() throws Exception { String sql = "SELECT * "