Repository: beam Updated Branches: refs/heads/DSL_SQL 7ba77dd43 -> ca2bc723d
BeamSql: refactor the MockedBeamSqlTable and related tests Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/21497194 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/21497194 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/21497194 Branch: refs/heads/DSL_SQL Commit: 21497194db3ddce37a4747b3de2714b02684c57e Parents: 7ba77dd Author: James Xu <[email protected]> Authored: Tue Jun 27 10:42:40 2017 +0800 Committer: Luke Cwik <[email protected]> Committed: Wed Jul 5 09:33:40 2017 -0700 ---------------------------------------------------------------------- .../dsls/sql/planner/MockedBeamSqlTable.java | 21 +++--- .../beam/dsls/sql/rel/BeamMinusRelTest.java | 1 - .../beam/dsls/sql/rel/BeamSortRelTest.java | 79 ++++++++------------ .../beam/dsls/sql/rel/BeamValuesRelTest.java | 6 -- 4 files changed, 42 insertions(+), 65 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/21497194/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedBeamSqlTable.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedBeamSqlTable.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedBeamSqlTable.java index fa80cc1..bb10369 100644 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedBeamSqlTable.java +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedBeamSqlTable.java @@ -40,23 +40,16 @@ import org.apache.calcite.rel.type.RelProtoDataType; import org.apache.calcite.sql.type.SqlTypeName; /** - * A mock table use to check input/output. - * + * Mocked table for bounded data sources. */ public class MockedBeamSqlTable extends BaseBeamTable { - public static final AtomicInteger COUNTER = new AtomicInteger(); - public static final ConcurrentLinkedQueue<BeamSqlRow> CONTENT = new ConcurrentLinkedQueue<>(); - + private static final AtomicInteger COUNTER = new AtomicInteger(); + private static final ConcurrentLinkedQueue<BeamSqlRow> CONTENT = new ConcurrentLinkedQueue<>(); private List<BeamSqlRow> inputRecords; public MockedBeamSqlTable(BeamSqlRecordType beamSqlRecordType) { super(beamSqlRecordType); } - public MockedBeamSqlTable withInputRecords(List<BeamSqlRow> inputRecords){ - this.inputRecords = inputRecords; - return this; - } - /** * Convenient way to build a mocked table with mock data: * @@ -81,6 +74,9 @@ public class MockedBeamSqlTable extends BaseBeamTable { * 10L, 100, 10.0, new Date()) * }</pre> */ + // FIXME: refactor this method + // 1) use Types rather than SqlTypeName + // 2) use RowsBuilder rather than duplicate the logic here public static MockedBeamSqlTable of(final Object... args){ final RelProtoDataType protoRowType = new RelProtoDataType() { @Override @@ -112,7 +108,10 @@ public class MockedBeamSqlTable extends BaseBeamTable { } rows.add(row); } - return new MockedBeamSqlTable(beamSQLRecordType).withInputRecords(rows); + MockedBeamSqlTable table = new MockedBeamSqlTable(beamSQLRecordType); + table.inputRecords = rows; + + return table; } @Override http://git-wip-us.apache.org/repos/asf/beam/blob/21497194/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamMinusRelTest.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamMinusRelTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamMinusRelTest.java index 688ff8e..bb5e7ee 100644 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamMinusRelTest.java +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamMinusRelTest.java @@ -62,7 +62,6 @@ public class BeamMinusRelTest { public void setUp() { sqlEnv.registerTable("ORDER_DETAILS1", orderDetailsTable1); sqlEnv.registerTable("ORDER_DETAILS2", orderDetailsTable2); - MockedBeamSqlTable.CONTENT.clear(); } @Test http://git-wip-us.apache.org/repos/asf/beam/blob/21497194/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamSortRelTest.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamSortRelTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamSortRelTest.java index 2519984..d5c18fc 100644 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamSortRelTest.java +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamSortRelTest.java @@ -18,16 +18,15 @@ package org.apache.beam.dsls.sql.rel; -import java.util.Collection; import java.util.Date; -import java.util.Iterator; import org.apache.beam.dsls.sql.BeamSqlCli; import org.apache.beam.dsls.sql.BeamSqlEnv; import org.apache.beam.dsls.sql.planner.MockedBeamSqlTable; import org.apache.beam.dsls.sql.schema.BeamSqlRow; +import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.values.PCollection; import org.apache.calcite.sql.type.SqlTypeName; -import org.junit.Assert; import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -70,20 +69,17 @@ public class BeamSortRelTest { + "FROM ORDER_DETAILS " + "ORDER BY order_id asc, site_id desc limit 4"; - System.out.println(sql); - BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv); + PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv); + PAssert.that(rows).containsInAnyOrder(MockedBeamSqlTable.of( + SqlTypeName.BIGINT, "order_id", + SqlTypeName.INTEGER, "site_id", + SqlTypeName.DOUBLE, "price", + 1L, 2, 1.0, + 1L, 1, 2.0, + 2L, 4, 3.0, + 2L, 1, 4.0 + ).getInputRecords()); pipeline.run().waitUntilFinish(); - - assertEquals( - MockedBeamSqlTable.of( - SqlTypeName.BIGINT, "order_id", - SqlTypeName.INTEGER, "site_id", - SqlTypeName.DOUBLE, "price", - 1L, 2, 1.0, - 1L, 1, 2.0, - 2L, 4, 3.0, - 2L, 1, 4.0 - ).getInputRecords(), MockedBeamSqlTable.CONTENT); } @Test @@ -108,10 +104,8 @@ public class BeamSortRelTest { + "FROM ORDER_DETAILS " + "ORDER BY order_id asc, site_id desc NULLS FIRST limit 4"; - BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv); - pipeline.run().waitUntilFinish(); - - assertEquals( + PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv); + PAssert.that(rows).containsInAnyOrder( MockedBeamSqlTable.of( SqlTypeName.BIGINT, "order_id", SqlTypeName.INTEGER, "site_id", @@ -121,7 +115,9 @@ public class BeamSortRelTest { 1L, 2, 1.0, 2L, null, 4.0, 2L, 1, 3.0 - ).getInputRecords(), MockedBeamSqlTable.CONTENT); + ).getInputRecords() + ); + pipeline.run().waitUntilFinish(); } @Test @@ -146,10 +142,8 @@ public class BeamSortRelTest { + "FROM ORDER_DETAILS " + "ORDER BY order_id asc, site_id desc NULLS LAST limit 4"; - BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv); - pipeline.run().waitUntilFinish(); - - assertEquals( + PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv); + PAssert.that(rows).containsInAnyOrder( MockedBeamSqlTable.of( SqlTypeName.BIGINT, "order_id", SqlTypeName.INTEGER, "site_id", @@ -159,7 +153,9 @@ public class BeamSortRelTest { 1L, null, 2.0, 2L, 1, 3.0, 2L, null, 4.0 - ).getInputRecords(), MockedBeamSqlTable.CONTENT); + ).getInputRecords() + ); + pipeline.run().waitUntilFinish(); } @Test @@ -169,10 +165,8 @@ public class BeamSortRelTest { + "FROM ORDER_DETAILS " + "ORDER BY order_id asc, site_id desc limit 4 offset 4"; - BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv); - pipeline.run().waitUntilFinish(); - - assertEquals( + PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv); + PAssert.that(rows).containsInAnyOrder( MockedBeamSqlTable.of( SqlTypeName.BIGINT, "order_id", SqlTypeName.INTEGER, "site_id", @@ -182,7 +176,9 @@ public class BeamSortRelTest { 6L, 6, 6.0, 7L, 7, 7.0, 8L, 8888, 8.0 - ).getInputRecords(), MockedBeamSqlTable.CONTENT); + ).getInputRecords() + ); + pipeline.run().waitUntilFinish(); } @Test @@ -192,10 +188,8 @@ public class BeamSortRelTest { + "FROM ORDER_DETAILS " + "ORDER BY order_id asc, site_id desc limit 11"; - BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv); - pipeline.run().waitUntilFinish(); - - assertEquals( + PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv); + PAssert.that(rows).containsInAnyOrder( MockedBeamSqlTable.of( SqlTypeName.BIGINT, "order_id", SqlTypeName.INTEGER, "site_id", @@ -211,7 +205,9 @@ public class BeamSortRelTest { 8L, 8888, 8.0, 8L, 999, 9.0, 10L, 100, 10.0 - ).getInputRecords(), MockedBeamSqlTable.CONTENT); + ).getInputRecords() + ); + pipeline.run().waitUntilFinish(); } @Test(expected = UnsupportedOperationException.class) @@ -230,16 +226,5 @@ public class BeamSortRelTest { public void prepare() { sqlEnv.registerTable("ORDER_DETAILS", orderDetailTable); sqlEnv.registerTable("SUB_ORDER_RAM", subOrderRamTable); - MockedBeamSqlTable.CONTENT.clear(); - } - - private void assertEquals(Collection<BeamSqlRow> rows1, Collection<BeamSqlRow> rows2) { - Assert.assertEquals(rows1.size(), rows2.size()); - - Iterator<BeamSqlRow> it1 = rows1.iterator(); - Iterator<BeamSqlRow> it2 = rows2.iterator(); - while (it1.hasNext()) { - Assert.assertEquals(it1.next(), it2.next()); - } } } http://git-wip-us.apache.org/repos/asf/beam/blob/21497194/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamValuesRelTest.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamValuesRelTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamValuesRelTest.java index 9a5070a..81b1a13 100644 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamValuesRelTest.java +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamValuesRelTest.java @@ -26,7 +26,6 @@ import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.values.PCollection; import org.apache.calcite.sql.type.SqlTypeName; -import org.junit.Before; import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test; @@ -89,9 +88,4 @@ public class BeamValuesRelTest { sqlEnv.registerTable("string_table", stringTable); sqlEnv.registerTable("int_table", intTable); } - - @Before - public void prepare() { - MockedBeamSqlTable.CONTENT.clear(); - } }
