Repository: beam Updated Branches: refs/heads/DSL_SQL b8fa0addc -> aa265e62a
[BEAM-2550] add UnitTest for JOIN in DSL Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e9dc5ea8 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e9dc5ea8 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e9dc5ea8 Branch: refs/heads/DSL_SQL Commit: e9dc5ea81cbbde39bf11ee183e5403b869d21f50 Parents: b8fa0ad Author: James Xu <[email protected]> Authored: Thu Jul 6 11:29:41 2017 +0800 Committer: Tyler Akidau <[email protected]> Committed: Tue Jul 11 23:46:37 2017 -0700 ---------------------------------------------------------------------- .../beam/dsls/sql/rel/BeamIOSourceRel.java | 5 +- .../apache/beam/dsls/sql/rel/BeamJoinRel.java | 3 - .../beam/dsls/sql/BeamSqlDslJoinTest.java | 191 +++++++++++++++++++ .../org/apache/beam/dsls/sql/TestUtils.java | 25 ++- .../rel/BeamJoinRelBoundedVsBoundedTest.java | 72 +++---- 5 files changed, 255 insertions(+), 41 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/e9dc5ea8/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSourceRel.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSourceRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSourceRel.java index d323d82..b26d2b8 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSourceRel.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSourceRel.java @@ -21,6 +21,8 @@ import com.google.common.base.Joiner; import org.apache.beam.dsls.sql.BeamSqlEnv; import org.apache.beam.dsls.sql.schema.BaseBeamTable; import org.apache.beam.dsls.sql.schema.BeamSqlRow; +import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder; +import org.apache.beam.dsls.sql.utils.CalciteUtils; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionTuple; import 
org.apache.beam.sdk.values.TupleTag; @@ -53,7 +55,8 @@ public class BeamIOSourceRel extends TableScan implements BeamRelNode { } else { //If not, the source PColection is provided with BaseBeamTable.buildIOReader(). BaseBeamTable sourceTable = sqlEnv.findTable(sourceName); - return sourceTable.buildIOReader(inputPCollections.getPipeline()); + return sourceTable.buildIOReader(inputPCollections.getPipeline()) + .setCoder(new BeamSqlRowCoder(CalciteUtils.toBeamRecordType(getRowType()))); } } http://git-wip-us.apache.org/repos/asf/beam/blob/e9dc5ea8/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamJoinRel.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamJoinRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamJoinRel.java index e85368e..3c92e42 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamJoinRel.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamJoinRel.java @@ -99,12 +99,9 @@ public class BeamJoinRel extends Join implements BeamRelNode { BeamRelNode leftRelNode = BeamSqlRelUtils.getBeamRelInput(left); BeamSqlRecordType leftRowType = CalciteUtils.toBeamRecordType(left.getRowType()); PCollection<BeamSqlRow> leftRows = leftRelNode.buildBeamPipeline(inputPCollections, sqlEnv); - leftRows.setCoder(new BeamSqlRowCoder(leftRowType)); final BeamRelNode rightRelNode = BeamSqlRelUtils.getBeamRelInput(right); - BeamSqlRecordType rightRowType = CalciteUtils.toBeamRecordType(right.getRowType()); PCollection<BeamSqlRow> rightRows = rightRelNode.buildBeamPipeline(inputPCollections, sqlEnv); - rightRows.setCoder(new BeamSqlRowCoder(rightRowType)); String stageName = BeamSqlRelUtils.getStageName(this); WindowFn leftWinFn = leftRows.getWindowingStrategy().getWindowFn(); http://git-wip-us.apache.org/repos/asf/beam/blob/e9dc5ea8/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslJoinTest.java 
---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslJoinTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslJoinTest.java new file mode 100644 index 0000000..ae5f4e5 --- /dev/null +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslJoinTest.java @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.dsls.sql; + +import static org.apache.beam.dsls.sql.rel.BeamJoinRelBoundedVsBoundedTest.ORDER_DETAILS1; +import static org.apache.beam.dsls.sql.rel.BeamJoinRelBoundedVsBoundedTest.ORDER_DETAILS2; + +import java.sql.Types; +import java.util.Arrays; +import org.apache.beam.dsls.sql.schema.BeamSqlRecordType; +import org.apache.beam.dsls.sql.schema.BeamSqlRow; +import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.beam.sdk.values.TupleTag; +import org.junit.Rule; +import org.junit.Test; + +/** + * Tests for joins in queries. 
+ */ +public class BeamSqlDslJoinTest { + @Rule + public final TestPipeline pipeline = TestPipeline.create(); + + private static final BeamSqlRecordType SOURCE_RECORD_TYPE = + BeamSqlRecordType.create( + Arrays.asList( + "order_id", "site_id", "price" + ), + Arrays.asList( + Types.INTEGER, Types.INTEGER, Types.INTEGER + ) + ); + + private static final BeamSqlRowCoder SOURCE_CODER = + new BeamSqlRowCoder(SOURCE_RECORD_TYPE); + + private static final BeamSqlRecordType RESULT_RECORD_TYPE = + BeamSqlRecordType.create( + Arrays.asList( + "order_id", "site_id", "price", "order_id0", "site_id0", "price0" + ), + Arrays.asList( + Types.INTEGER, Types.INTEGER, Types.INTEGER, Types.INTEGER + , Types.INTEGER, Types.INTEGER + ) + ); + + private static final BeamSqlRowCoder RESULT_CODER = + new BeamSqlRowCoder(RESULT_RECORD_TYPE); + + @Test + public void testInnerJoin() throws Exception { + String sql = + "SELECT * " + + "FROM ORDER_DETAILS1 o1" + + " JOIN ORDER_DETAILS2 o2" + + " on " + + " o1.order_id=o2.site_id AND o2.price=o1.site_id" + ; + + PAssert.that(queryFromOrderTables(sql)).containsInAnyOrder( + TestUtils.RowsBuilder.of( + RESULT_RECORD_TYPE + ).addRows( + 2, 3, 3, 1, 2, 3 + ).getRows()); + pipeline.run(); + } + + @Test + public void testLeftOuterJoin() throws Exception { + String sql = + "SELECT * " + + "FROM ORDER_DETAILS1 o1" + + " LEFT OUTER JOIN ORDER_DETAILS2 o2" + + " on " + + " o1.order_id=o2.site_id AND o2.price=o1.site_id" + ; + + PAssert.that(queryFromOrderTables(sql)).containsInAnyOrder( + TestUtils.RowsBuilder.of( + RESULT_RECORD_TYPE + ).addRows( + 1, 2, 3, null, null, null, + 2, 3, 3, 1, 2, 3, + 3, 4, 5, null, null, null + ).getRows()); + pipeline.run(); + } + + @Test + public void testRightOuterJoin() throws Exception { + String sql = + "SELECT * " + + "FROM ORDER_DETAILS1 o1" + + " RIGHT OUTER JOIN ORDER_DETAILS2 o2" + + " on " + + " o1.order_id=o2.site_id AND o2.price=o1.site_id" + ; + + PAssert.that(queryFromOrderTables(sql)).containsInAnyOrder( + 
TestUtils.RowsBuilder.of( + RESULT_RECORD_TYPE + ).addRows( + 2, 3, 3, 1, 2, 3, + null, null, null, 2, 3, 3, + null, null, null, 3, 4, 5 + ).getRows()); + pipeline.run(); + } + + @Test + public void testFullOuterJoin() throws Exception { + String sql = + "SELECT * " + + "FROM ORDER_DETAILS1 o1" + + " FULL OUTER JOIN ORDER_DETAILS2 o2" + + " on " + + " o1.order_id=o2.site_id AND o2.price=o1.site_id" + ; + + PAssert.that(queryFromOrderTables(sql)).containsInAnyOrder( + TestUtils.RowsBuilder.of( + RESULT_RECORD_TYPE + ).addRows( + 2, 3, 3, 1, 2, 3, + 1, 2, 3, null, null, null, + 3, 4, 5, null, null, null, + null, null, null, 2, 3, 3, + null, null, null, 3, 4, 5 + ).getRows()); + pipeline.run(); + } + + @Test(expected = IllegalStateException.class) + public void testException_nonEqualJoin() throws Exception { + String sql = + "SELECT * " + + "FROM ORDER_DETAILS1 o1" + + " JOIN ORDER_DETAILS2 o2" + + " on " + + " o1.order_id>o2.site_id" + ; + + pipeline.enableAbandonedNodeEnforcement(false); + queryFromOrderTables(sql); + pipeline.run(); + } + + @Test(expected = IllegalStateException.class) + public void testException_crossJoin() throws Exception { + String sql = + "SELECT * " + + "FROM ORDER_DETAILS1 o1, ORDER_DETAILS2 o2"; + + pipeline.enableAbandonedNodeEnforcement(false); + queryFromOrderTables(sql); + pipeline.run(); + } + + private PCollection<BeamSqlRow> queryFromOrderTables(String sql) { + return PCollectionTuple + .of( + new TupleTag<BeamSqlRow>("ORDER_DETAILS1"), + ORDER_DETAILS1.buildIOReader(pipeline).setCoder(SOURCE_CODER) + ) + .and(new TupleTag<BeamSqlRow>("ORDER_DETAILS2"), + ORDER_DETAILS2.buildIOReader(pipeline).setCoder(SOURCE_CODER) + ).apply("join", BeamSql.query(sql)).setCoder(RESULT_CODER); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/e9dc5ea8/dsls/sql/src/test/java/org/apache/beam/dsls/sql/TestUtils.java ---------------------------------------------------------------------- diff --git 
a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/TestUtils.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/TestUtils.java index cfad333..3294592 100644 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/TestUtils.java +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/TestUtils.java @@ -75,7 +75,13 @@ public class TestUtils { /** * Create a RowsBuilder with the specified row type info. * - * <p>Note: check the class javadoc for for detailed example. + * <p>For example: + * <pre>{@code + * TestUtils.RowsBuilder.of( + * Types.INTEGER, "order_id", + * Types.INTEGER, "sum_site_id", + * Types.VARCHAR, "buyer" + * )}</pre> * * @args pairs of column type and column names. */ @@ -88,6 +94,23 @@ public class TestUtils { } /** + * Create a RowsBuilder from an existing {@link BeamSqlRecordType}. + * + * <p>For example: + * <pre>{@code + * TestUtils.RowsBuilder.of( + * beamSqlRecordType + * )}</pre> + * @param beamSQLRecordType the record type of the rows to be built. + */ + public static RowsBuilder of(final BeamSqlRecordType beamSQLRecordType) { + RowsBuilder builder = new RowsBuilder(); + builder.type = beamSQLRecordType; + + return builder; + } + + /** * Add rows to the builder. * * <p>Note: check the class javadoc for for detailed example.
http://git-wip-us.apache.org/repos/asf/beam/blob/e9dc5ea8/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamJoinRelBoundedVsBoundedTest.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamJoinRelBoundedVsBoundedTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamJoinRelBoundedVsBoundedTest.java index d15cb81..24a3256 100644 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamJoinRelBoundedVsBoundedTest.java +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamJoinRelBoundedVsBoundedTest.java @@ -39,40 +39,40 @@ public class BeamJoinRelBoundedVsBoundedTest { public final TestPipeline pipeline = TestPipeline.create(); private static final BeamSqlEnv beamSqlEnv = new BeamSqlEnv(); + public static final MockedBoundedTable ORDER_DETAILS1 = + MockedBoundedTable.of( + Types.INTEGER, "order_id", + Types.INTEGER, "site_id", + Types.INTEGER, "price" + ).addRows( + 1, 2, 3, + 2, 3, 3, + 3, 4, 5 + ); + + public static final MockedBoundedTable ORDER_DETAILS2 = + MockedBoundedTable.of( + Types.INTEGER, "order_id", + Types.INTEGER, "site_id", + Types.INTEGER, "price" + ).addRows( + 1, 2, 3, + 2, 3, 3, + 3, 4, 5 + ); + @BeforeClass public static void prepare() { - beamSqlEnv.registerTable("ORDER_DETAILS", - MockedBoundedTable.of( - Types.INTEGER, "order_id", - Types.INTEGER, "site_id", - Types.INTEGER, "price" - ).addRows( - 1, 2, 3, - 2, 3, 3, - 3, 4, 5 - ) - ); - - beamSqlEnv.registerTable("ORDER_DETAILS0", - MockedBoundedTable.of( - Types.INTEGER, "order_id0", - Types.INTEGER, "site_id0", - Types.INTEGER, "price0" - ).addRows( - 1, 2, 3, - 2, 3, 3, - 3, 4, 5 - ) - ); - + beamSqlEnv.registerTable("ORDER_DETAILS1", ORDER_DETAILS1); + beamSqlEnv.registerTable("ORDER_DETAILS2", ORDER_DETAILS2); } @Test public void testInnerJoin() throws Exception { String sql = "SELECT * " - + "FROM ORDER_DETAILS o1" - + " JOIN ORDER_DETAILS o2" + + "FROM 
ORDER_DETAILS1 o1" + + " JOIN ORDER_DETAILS2 o2" + " on " + " o1.order_id=o2.site_id AND o2.price=o1.site_id" ; @@ -96,10 +96,10 @@ public class BeamJoinRelBoundedVsBoundedTest { public void testLeftOuterJoin() throws Exception { String sql = "SELECT * " - + "FROM ORDER_DETAILS o1" - + " LEFT OUTER JOIN ORDER_DETAILS0 o2" + + "FROM ORDER_DETAILS1 o1" + + " LEFT OUTER JOIN ORDER_DETAILS2 o2" + " on " - + " o1.order_id=o2.site_id0 AND o2.price0=o1.site_id" + + " o1.order_id=o2.site_id AND o2.price=o1.site_id" ; PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv); @@ -124,8 +124,8 @@ public class BeamJoinRelBoundedVsBoundedTest { public void testRightOuterJoin() throws Exception { String sql = "SELECT * " - + "FROM ORDER_DETAILS o1" - + " RIGHT OUTER JOIN ORDER_DETAILS o2" + + "FROM ORDER_DETAILS1 o1" + + " RIGHT OUTER JOIN ORDER_DETAILS2 o2" + " on " + " o1.order_id=o2.site_id AND o2.price=o1.site_id" ; @@ -151,8 +151,8 @@ public class BeamJoinRelBoundedVsBoundedTest { public void testFullOuterJoin() throws Exception { String sql = "SELECT * " - + "FROM ORDER_DETAILS o1" - + " FULL OUTER JOIN ORDER_DETAILS o2" + + "FROM ORDER_DETAILS1 o1" + + " FULL OUTER JOIN ORDER_DETAILS2 o2" + " on " + " o1.order_id=o2.site_id AND o2.price=o1.site_id" ; @@ -180,8 +180,8 @@ public class BeamJoinRelBoundedVsBoundedTest { public void testException_nonEqualJoin() throws Exception { String sql = "SELECT * " - + "FROM ORDER_DETAILS o1" - + " JOIN ORDER_DETAILS o2" + + "FROM ORDER_DETAILS1 o1" + + " JOIN ORDER_DETAILS2 o2" + " on " + " o1.order_id>o2.site_id" ; @@ -195,7 +195,7 @@ public class BeamJoinRelBoundedVsBoundedTest { public void testException_crossJoin() throws Exception { String sql = "SELECT * " - + "FROM ORDER_DETAILS o1, ORDER_DETAILS o2"; + + "FROM ORDER_DETAILS1 o1, ORDER_DETAILS2 o2"; pipeline.enableAbandonedNodeEnforcement(false); BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
