Repository: beam Updated Branches: refs/heads/DSL_SQL a96c3a01f -> 8defe6f21
Test queries on unbounded PCollections with BeamSql DSL API. Also add getTYPE(fieldName) overrides to BeamSqlRow. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/0580e8b6 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/0580e8b6 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/0580e8b6 Branch: refs/heads/DSL_SQL Commit: 0580e8b639ef77c7a6534b7b91ecad493950a3aa Parents: a96c3a0 Author: mingmxu <[email protected]> Authored: Wed Jul 12 00:08:35 2017 -0700 Committer: Tyler Akidau <[email protected]> Committed: Wed Jul 12 10:01:34 2017 -0700 ---------------------------------------------------------------------- .../apache/beam/dsls/sql/schema/BeamSqlRow.java | 84 ++++++++---- .../dsls/sql/BeamSqlDslAggregationTest.java | 127 +++++++++++++++---- .../apache/beam/dsls/sql/BeamSqlDslBase.java | 45 ++++++- .../beam/dsls/sql/BeamSqlDslFilterTest.java | 62 +++++++-- .../beam/dsls/sql/BeamSqlDslProjectTest.java | 94 +++++++++++--- 5 files changed, 327 insertions(+), 85 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/0580e8b6/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRow.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRow.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRow.java index 2d7e350..db0ce04 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRow.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRow.java @@ -152,48 +152,48 @@ public class BeamSqlRow implements Serializable { dataValues.set(index, fieldValue); } - public byte getByte(int idx) { - return (Byte) getFieldValue(idx); + public Object getFieldValue(String fieldName) { + return getFieldValue(dataType.getFieldsName().indexOf(fieldName)); } - public short getShort(int idx) { - return (Short) getFieldValue(idx); + public byte getByte(String fieldName) { + return (Byte) getFieldValue(fieldName); } - public int getInteger(int idx) { - return (Integer) getFieldValue(idx); + public short getShort(String fieldName) { + return (Short) getFieldValue(fieldName); } - public float getFloat(int idx) { - return (Float) getFieldValue(idx); + public int getInteger(String fieldName) { + return (Integer) getFieldValue(fieldName); } - public double getDouble(int idx) { - return (Double) getFieldValue(idx); + public float getFloat(String fieldName) { + return (Float) getFieldValue(fieldName); } - public long getLong(int idx) { - return (Long) getFieldValue(idx); + public double getDouble(String fieldName) { + return (Double) getFieldValue(fieldName); } - public String getString(int idx) { - return (String) getFieldValue(idx); + public long getLong(String fieldName) { + return (Long) getFieldValue(fieldName); } - public Date getDate(int idx) { - return (Date) getFieldValue(idx); + public String getString(String fieldName) { + return (String) getFieldValue(fieldName); } - public GregorianCalendar getGregorianCalendar(int idx) { - return (GregorianCalendar) getFieldValue(idx); + public Date getDate(String fieldName) { + return (Date) getFieldValue(fieldName); } - public BigDecimal getBigDecimal(int idx) { - return (BigDecimal) getFieldValue(idx); + public GregorianCalendar getGregorianCalendar(String fieldName) { + return (GregorianCalendar) getFieldValue(fieldName); } - public Object getFieldValue(String fieldName) { - return getFieldValue(dataType.getFieldsName().indexOf(fieldName)); + public BigDecimal getBigDecimal(String fieldName) { + return (BigDecimal) getFieldValue(fieldName); } public Object getFieldValue(int fieldIdx) { @@ -281,6 +281,46 @@ public class BeamSqlRow implements Serializable { } } + public byte getByte(int idx) { + return (Byte) getFieldValue(idx); + } + + public short getShort(int idx) { + return (Short) getFieldValue(idx); + } + + public int getInteger(int idx) { + return (Integer) getFieldValue(idx); + } + + public float getFloat(int idx) { + return (Float) getFieldValue(idx); + } + + public double getDouble(int idx) { + return (Double) getFieldValue(idx); + } + + public long getLong(int idx) { + return (Long) getFieldValue(idx); + } + + public String getString(int idx) { + return (String) getFieldValue(idx); + } + + public Date getDate(int idx) { + return (Date) getFieldValue(idx); + } + + public GregorianCalendar getGregorianCalendar(int idx) { + return (GregorianCalendar) getFieldValue(idx); + } + + public BigDecimal getBigDecimal(int idx) { + return (BigDecimal) getFieldValue(idx); + } + public int size() { return dataValues.size(); } http://git-wip-us.apache.org/repos/asf/beam/blob/0580e8b6/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslAggregationTest.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslAggregationTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslAggregationTest.java index ac0b1cb..471a856 100644 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslAggregationTest.java +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslAggregationTest.java @@ -29,18 +29,31 @@ import org.joda.time.Instant; import org.junit.Test; /** - * Tests for GROUP-BY/aggregation, with global_window/fix_time_window/sliding_window/session_window. + * Tests for GROUP-BY/aggregation, with global_window/fix_time_window/sliding_window/session_window + * with BOUNDED PCollection. */ public class BeamSqlDslAggregationTest extends BeamSqlDslBase { /** - * GROUP-BY with single aggregation function. + * GROUP-BY with single aggregation function with bounded PCollection. */ @Test - public void testAggregationWithoutWindow() throws Exception { + public void testAggregationWithoutWindowWithBounded() throws Exception { + runAggregationWithoutWindow(boundedInput1); + } + + /** + * GROUP-BY with single aggregation function with unbounded PCollection. + */ + @Test + public void testAggregationWithoutWindowWithUnbounded() throws Exception { + runAggregationWithoutWindow(unboundedInput1); + } + + private void runAggregationWithoutWindow(PCollection<BeamSqlRow> input) throws Exception { String sql = "SELECT f_int2, COUNT(*) AS `size` FROM PCOLLECTION GROUP BY f_int2"; PCollection<BeamSqlRow> result = - inputA1.apply("testAggregationWithoutWindow", BeamSql.simpleQuery(sql)); + input.apply("testAggregationWithoutWindow", BeamSql.simpleQuery(sql)); BeamSqlRecordType resultType = BeamSqlRecordType.create(Arrays.asList("f_int2", "size"), Arrays.asList(Types.INTEGER, Types.BIGINT)); @@ -55,10 +68,22 @@ public class BeamSqlDslAggregationTest extends BeamSqlDslBase { } /** - * GROUP-BY with multiple aggregation functions. + * GROUP-BY with multiple aggregation functions with bounded PCollection. */ @Test - public void testAggregationFunctions() throws Exception{ + public void testAggregationFunctionsWithBounded() throws Exception{ + runAggregationFunctions(boundedInput1); + } + + /** + * GROUP-BY with multiple aggregation functions with unbounded PCollection. + */ + @Test + public void testAggregationFunctionsWithUnbounded() throws Exception{ + runAggregationFunctions(unboundedInput1); + } + + private void runAggregationFunctions(PCollection<BeamSqlRow> input) throws Exception{ String sql = "select f_int2, count(*) as size, " + "sum(f_long) as sum1, avg(f_long) as avg1, max(f_long) as max1, min(f_long) as min1," + "sum(f_short) as sum2, avg(f_short) as avg2, max(f_short) as max2, min(f_short) as min2," @@ -70,7 +95,7 @@ public class BeamSqlDslAggregationTest extends BeamSqlDslBase { + "FROM TABLE_A group by f_int2"; PCollection<BeamSqlRow> result = - PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), inputA1) + PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), input) .apply("testAggregationFunctions", BeamSql.query(sql)); BeamSqlRecordType resultType = BeamSqlRecordType.create( @@ -121,14 +146,26 @@ public class BeamSqlDslAggregationTest extends BeamSqlDslBase { } /** - * Implicit GROUP-BY with DISTINCT. + * Implicit GROUP-BY with DISTINCT with bounded PCollection. + */ + @Test + public void testDistinctWithBounded() throws Exception { + runDistinct(boundedInput1); + } + + /** + * Implicit GROUP-BY with DISTINCT with unbounded PCollection. */ @Test - public void testDistinct() throws Exception { + public void testDistinctWithUnbounded() throws Exception { + runDistinct(unboundedInput1); + } + + private void runDistinct(PCollection<BeamSqlRow> input) throws Exception { String sql = "SELECT distinct f_int, f_long FROM PCOLLECTION "; PCollection<BeamSqlRow> result = - inputA1.apply("testDistinct", BeamSql.simpleQuery(sql)); + input.apply("testDistinct", BeamSql.simpleQuery(sql)); BeamSqlRecordType resultType = BeamSqlRecordType.create(Arrays.asList("f_int", "f_long"), Arrays.asList(Types.INTEGER, Types.BIGINT)); @@ -155,16 +192,28 @@ public class BeamSqlDslAggregationTest extends BeamSqlDslBase { } /** - * GROUP-BY with TUMBLE window(akka fix_time_window). + * GROUP-BY with TUMBLE window(aka fix_time_window) with bounded PCollection. */ @Test - public void testTumbleWindow() throws Exception { + public void testTumbleWindowWithBounded() throws Exception { + runTumbleWindow(boundedInput1); + } + + /** + * GROUP-BY with TUMBLE window(aka fix_time_window) with unbounded PCollection. + */ + @Test + public void testTumbleWindowWithUnbounded() throws Exception { + runTumbleWindow(unboundedInput1); + } + + private void runTumbleWindow(PCollection<BeamSqlRow> input) throws Exception { String sql = "SELECT f_int2, COUNT(*) AS `size`," + " TUMBLE_START(f_timestamp, INTERVAL '1' HOUR) AS `window_start`" - + " FROM TABLE_A " - + "GROUP BY f_int2, TUMBLE(f_timestamp, INTERVAL '1' HOUR)"; + + " FROM TABLE_A" + + " GROUP BY f_int2, TUMBLE(f_timestamp, INTERVAL '1' HOUR)"; PCollection<BeamSqlRow> result = - PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), inputA1) + PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), input) .apply("testTumbleWindow", BeamSql.query(sql)); BeamSqlRecordType resultType = BeamSqlRecordType.create( @@ -191,16 +240,28 @@ public class BeamSqlDslAggregationTest extends BeamSqlDslBase { } /** - * GROUP-BY with HOP window(akka sliding_window). + * GROUP-BY with HOP window(aka sliding_window) with bounded PCollection. */ @Test - public void testHopWindow() throws Exception { + public void testHopWindowWithBounded() throws Exception { + runHopWindow(boundedInput1); + } + + /** + * GROUP-BY with HOP window(aka sliding_window) with unbounded PCollection. + */ + @Test + public void testHopWindowWithUnbounded() throws Exception { + runHopWindow(unboundedInput1); + } + + private void runHopWindow(PCollection<BeamSqlRow> input) throws Exception { String sql = "SELECT f_int2, COUNT(*) AS `size`," + " HOP_START(f_timestamp, INTERVAL '1' HOUR, INTERVAL '30' MINUTE) AS `window_start`" - + " FROM PCOLLECTION " - + "GROUP BY f_int2, HOP(f_timestamp, INTERVAL '1' HOUR, INTERVAL '30' MINUTE)"; + + " FROM PCOLLECTION" + + " GROUP BY f_int2, HOP(f_timestamp, INTERVAL '1' HOUR, INTERVAL '30' MINUTE)"; PCollection<BeamSqlRow> result = - inputA1.apply("testHopWindow", BeamSql.simpleQuery(sql)); + input.apply("testHopWindow", BeamSql.simpleQuery(sql)); BeamSqlRecordType resultType = BeamSqlRecordType.create( Arrays.asList("f_int2", "size", "window_start"), @@ -240,16 +301,28 @@ public class BeamSqlDslAggregationTest extends BeamSqlDslBase { } /** - * GROUP-BY with SESSION window. + * GROUP-BY with SESSION window with bounded PCollection. + */ + @Test + public void testSessionWindowWithBounded() throws Exception { + runSessionWindow(boundedInput1); + } + + /** + * GROUP-BY with SESSION window with unbounded PCollection. */ @Test - public void testSessionWindow() throws Exception { + public void testSessionWindowWithUnbounded() throws Exception { + runSessionWindow(unboundedInput1); + } + + private void runSessionWindow(PCollection<BeamSqlRow> input) throws Exception { String sql = "SELECT f_int2, COUNT(*) AS `size`," + " SESSION_START(f_timestamp, INTERVAL '5' MINUTE) AS `window_start`" - + " FROM TABLE_A " - + "GROUP BY f_int2, SESSION(f_timestamp, INTERVAL '5' MINUTE)"; + + " FROM TABLE_A" + + " GROUP BY f_int2, SESSION(f_timestamp, INTERVAL '5' MINUTE)"; PCollection<BeamSqlRow> result = - PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), inputA1) + PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), input) .apply("testSessionWindow", BeamSql.query(sql)); BeamSqlRecordType resultType = BeamSqlRecordType.create( @@ -285,7 +358,7 @@ public class BeamSqlDslAggregationTest extends BeamSqlDslBase { String sql = "SELECT f_int2, COUNT(*) AS `size` FROM TABLE_A " + "GROUP BY f_int2, TUMBLE(f_long, INTERVAL '1' HOUR)"; PCollection<BeamSqlRow> result = - PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), inputA1) + PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), boundedInput1) .apply("testWindowOnNonTimestampField", BeamSql.query(sql)); pipeline.run().waitUntilFinish(); @@ -300,7 +373,7 @@ public class BeamSqlDslAggregationTest extends BeamSqlDslBase { String sql = "SELECT f_int2, COUNT(DISTINCT *) AS `size` FROM PCOLLECTION GROUP BY f_int2"; PCollection<BeamSqlRow> result = - inputA1.apply("testUnsupportedDistinct", BeamSql.simpleQuery(sql)); + boundedInput1.apply("testUnsupportedDistinct", BeamSql.simpleQuery(sql)); pipeline.run().waitUntilFinish(); } http://git-wip-us.apache.org/repos/asf/beam/blob/0580e8b6/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslBase.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslBase.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslBase.java index 308dcb6..57fcbc3 100644 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslBase.java +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslBase.java @@ -28,9 +28,11 @@ import org.apache.beam.dsls.sql.schema.BeamSqlRecordType; import org.apache.beam.dsls.sql.schema.BeamSqlRow; import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder; import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.testing.TestStream; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; +import org.joda.time.Instant; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Rule; @@ -53,8 +55,13 @@ public class BeamSqlDslBase { public static BeamSqlRecordType recordTypeInTableA; public static List<BeamSqlRow> recordsInTableA; - public PCollection<BeamSqlRow> inputA1; - public PCollection<BeamSqlRow> inputA2; + //bounded PCollections + public PCollection<BeamSqlRow> boundedInput1; + public PCollection<BeamSqlRow> boundedInput2; + + //unbounded PCollections + public PCollection<BeamSqlRow> unboundedInput1; + public PCollection<BeamSqlRow> unboundedInput2; @BeforeClass public static void prepareClass() throws ParseException { @@ -69,11 +76,37 @@ public class BeamSqlDslBase { @Before public void preparePCollections(){ - inputA1 = PBegin.in(pipeline).apply("inputA1", Create.of(recordsInTableA) - .withCoder(new BeamSqlRowCoder(recordTypeInTableA))); + boundedInput1 = PBegin.in(pipeline).apply("boundedInput1", + Create.of(recordsInTableA).withCoder(new BeamSqlRowCoder(recordTypeInTableA))); + + boundedInput2 = PBegin.in(pipeline).apply("boundedInput2", + Create.of(recordsInTableA.get(0)).withCoder(new BeamSqlRowCoder(recordTypeInTableA))); + + unboundedInput1 = prepareUnboundedPCollection1(); + unboundedInput2 = prepareUnboundedPCollection2(); + } + + private PCollection<BeamSqlRow> prepareUnboundedPCollection1() { + TestStream.Builder<BeamSqlRow> values = TestStream + .create(new BeamSqlRowCoder(recordTypeInTableA)); + + for (BeamSqlRow row : recordsInTableA) { + values = values.advanceWatermarkTo(new Instant(row.getDate("f_timestamp"))); + values = values.addElements(row); + } + + return PBegin.in(pipeline).apply("unboundedInput1", values.advanceWatermarkToInfinity()); + } + + private PCollection<BeamSqlRow> prepareUnboundedPCollection2() { + TestStream.Builder<BeamSqlRow> values = TestStream + .create(new BeamSqlRowCoder(recordTypeInTableA)); + + BeamSqlRow row = recordsInTableA.get(0); + values = values.advanceWatermarkTo(new Instant(row.getDate("f_timestamp"))); + values = values.addElements(row); - inputA2 = PBegin.in(pipeline).apply("inputA2", Create.of(recordsInTableA.get(0)) - .withCoder(new BeamSqlRowCoder(recordTypeInTableA))); + return PBegin.in(pipeline).apply("unboundedInput2", values.advanceWatermarkToInfinity()); } private static List<BeamSqlRow> prepareInputRecordsInTableA() throws ParseException{ http://git-wip-us.apache.org/repos/asf/beam/blob/0580e8b6/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslFilterTest.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslFilterTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslFilterTest.java index f46f6c5..b4b50c1 100644 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslFilterTest.java +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslFilterTest.java @@ -25,18 +25,30 @@ import org.apache.beam.sdk.values.TupleTag; import org.junit.Test; /** - * Tests for WHERE queries. + * Tests for WHERE queries with BOUNDED PCollection. */ public class BeamSqlDslFilterTest extends BeamSqlDslBase { /** - * single filter. + * single filter with bounded PCollection. */ @Test - public void testSingleFilter() throws Exception { + public void testSingleFilterWithBounded() throws Exception { + runSingleFilter(boundedInput1); + } + + /** + * single filter with unbounded PCollection. + */ + @Test + public void testSingleFilterWithUnbounded() throws Exception { + runSingleFilter(unboundedInput1); + } + + private void runSingleFilter(PCollection<BeamSqlRow> input) throws Exception { String sql = "SELECT * FROM PCOLLECTION WHERE f_int = 1"; PCollection<BeamSqlRow> result = - inputA1.apply("testSingleFilter", BeamSql.simpleQuery(sql)); + input.apply("testSingleFilter", BeamSql.simpleQuery(sql)); PAssert.that(result).containsInAnyOrder(recordsInTableA.get(0)); @@ -44,15 +56,27 @@ public class BeamSqlDslFilterTest extends BeamSqlDslBase { } /** - * composite filters. + * composite filters with bounded PCollection. */ @Test - public void testCompositeFilter() throws Exception { + public void testCompositeFilterWithBounded() throws Exception { + runCompositeFilter(boundedInput1); + } + + /** + * composite filters with unbounded PCollection. + */ + @Test + public void testCompositeFilterWithUnbounded() throws Exception { + runCompositeFilter(unboundedInput1); + } + + private void runCompositeFilter(PCollection<BeamSqlRow> input) throws Exception { String sql = "SELECT * FROM TABLE_A" + " WHERE f_int > 1 AND (f_long < 3000 OR f_string = 'string_row3')"; PCollection<BeamSqlRow> result = - PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), inputA1) + PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), input) .apply("testCompositeFilter", BeamSql.query(sql)); PAssert.that(result).containsInAnyOrder(recordsInTableA.get(1), recordsInTableA.get(2)); @@ -61,14 +85,26 @@ public class BeamSqlDslFilterTest extends BeamSqlDslBase { } /** - * nothing return with filters. + * nothing return with filters in bounded PCollection. + */ + @Test + public void testNoReturnFilterWithBounded() throws Exception { + runNoReturnFilter(boundedInput1); + } + + /** + * nothing return with filters in unbounded PCollection. */ @Test - public void testNoReturnFilter() throws Exception { + public void testNoReturnFilterWithUnbounded() throws Exception { + runNoReturnFilter(unboundedInput1); + } + + private void runNoReturnFilter(PCollection<BeamSqlRow> input) throws Exception { String sql = "SELECT * FROM TABLE_A WHERE f_int < 1"; PCollection<BeamSqlRow> result = - PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), inputA1) + PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), input) .apply("testNoReturnFilter", BeamSql.query(sql)); PAssert.that(result).empty(); @@ -85,7 +121,7 @@ public class BeamSqlDslFilterTest extends BeamSqlDslBase { String sql = "SELECT * FROM TABLE_B WHERE f_int < 1"; PCollection<BeamSqlRow> result = - PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), inputA1) + PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), boundedInput1) .apply("testFromInvalidTableName1", BeamSql.query(sql)); pipeline.run().waitUntilFinish(); @@ -99,7 +135,7 @@ public class BeamSqlDslFilterTest extends BeamSqlDslBase { String sql = "SELECT * FROM PCOLLECTION_NA"; - PCollection<BeamSqlRow> result = inputA1.apply(BeamSql.simpleQuery(sql)); + PCollection<BeamSqlRow> result = boundedInput1.apply(BeamSql.simpleQuery(sql)); pipeline.run().waitUntilFinish(); } @@ -112,7 +148,7 @@ public class BeamSqlDslFilterTest extends BeamSqlDslBase { String sql = "SELECT * FROM PCOLLECTION WHERE f_int_na = 0"; - PCollection<BeamSqlRow> result = inputA1.apply(BeamSql.simpleQuery(sql)); + PCollection<BeamSqlRow> result = boundedInput1.apply(BeamSql.simpleQuery(sql)); pipeline.run().waitUntilFinish(); } http://git-wip-us.apache.org/repos/asf/beam/blob/0580e8b6/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslProjectTest.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslProjectTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslProjectTest.java index 877fa4f..10f61b0 100644 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslProjectTest.java +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslProjectTest.java @@ -28,18 +28,30 @@ import org.apache.beam.sdk.values.TupleTag; import org.junit.Test; /** - * Tests for field-project in queries. + * Tests for field-project in queries with BOUNDED PCollection. */ public class BeamSqlDslProjectTest extends BeamSqlDslBase { /** - * select all fields. + * select all fields with bounded PCollection. */ @Test - public void testSelectAll() throws Exception { + public void testSelectAllWithBounded() throws Exception { + runSelectAll(boundedInput2); + } + + /** + * select all fields with unbounded PCollection. + */ + @Test + public void testSelectAllWithUnbounded() throws Exception { + runSelectAll(unboundedInput2); + } + + private void runSelectAll(PCollection<BeamSqlRow> input) throws Exception { String sql = "SELECT * FROM PCOLLECTION"; PCollection<BeamSqlRow> result = - inputA2.apply("testSelectAll", BeamSql.simpleQuery(sql)); + input.apply("testSelectAll", BeamSql.simpleQuery(sql)); PAssert.that(result).containsInAnyOrder(recordsInTableA.get(0)); @@ -47,14 +59,26 @@ public class BeamSqlDslProjectTest extends BeamSqlDslBase { } /** - * select partial fields. + * select partial fields with bounded PCollection. + */ + @Test + public void testPartialFieldsWithBounded() throws Exception { + runPartialFields(boundedInput2); + } + + /** + * select partial fields with unbounded PCollection. */ @Test - public void testPartialFields() throws Exception { + public void testPartialFieldsWithUnbounded() throws Exception { + runPartialFields(unboundedInput2); + } + + private void runPartialFields(PCollection<BeamSqlRow> input) throws Exception { String sql = "SELECT f_int, f_long FROM TABLE_A"; PCollection<BeamSqlRow> result = - PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), inputA2) + PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), input) .apply("testPartialFields", BeamSql.query(sql)); BeamSqlRecordType resultType = BeamSqlRecordType.create(Arrays.asList("f_int", "f_long"), @@ -70,14 +94,26 @@ public class BeamSqlDslProjectTest extends BeamSqlDslBase { } /** - * select partial fields for multiple rows. + * select partial fields for multiple rows with bounded PCollection. */ @Test - public void testPartialFieldsInMultipleRow() throws Exception { + public void testPartialFieldsInMultipleRowWithBounded() throws Exception { + runPartialFieldsInMultipleRow(boundedInput1); + } + + /** + * select partial fields for multiple rows with unbounded PCollection. + */ + @Test + public void testPartialFieldsInMultipleRowWithUnbounded() throws Exception { + runPartialFieldsInMultipleRow(unboundedInput1); + } + + private void runPartialFieldsInMultipleRow(PCollection<BeamSqlRow> input) throws Exception { String sql = "SELECT f_int, f_long FROM TABLE_A"; PCollection<BeamSqlRow> result = - PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), inputA1) + PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), input) .apply("testPartialFieldsInMultipleRow", BeamSql.query(sql)); BeamSqlRecordType resultType = BeamSqlRecordType.create(Arrays.asList("f_int", "f_long"), @@ -105,14 +141,26 @@ public class BeamSqlDslProjectTest extends BeamSqlDslBase { } /** - * select partial fields. + * select partial fields with bounded PCollection. */ @Test - public void testPartialFieldsInRows() throws Exception { + public void testPartialFieldsInRowsWithBounded() throws Exception { + runPartialFieldsInRows(boundedInput1); + } + + /** + * select partial fields with unbounded PCollection. + */ + @Test + public void testPartialFieldsInRowsWithUnbounded() throws Exception { + runPartialFieldsInRows(unboundedInput1); + } + + private void runPartialFieldsInRows(PCollection<BeamSqlRow> input) throws Exception { String sql = "SELECT f_int, f_long FROM TABLE_A"; PCollection<BeamSqlRow> result = - PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), inputA1) + PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), input) .apply("testPartialFieldsInRows", BeamSql.query(sql)); BeamSqlRecordType resultType = BeamSqlRecordType.create(Arrays.asList("f_int", "f_long"), @@ -140,14 +188,26 @@ public class BeamSqlDslProjectTest extends BeamSqlDslBase { } /** - * select literal field. + * select literal field with bounded PCollection. + */ + @Test + public void testLiteralFieldWithBounded() throws Exception { + runLiteralField(boundedInput2); + } + + /** + * select literal field with unbounded PCollection. */ @Test - public void testLiteralField() throws Exception { + public void testLiteralFieldWithUnbounded() throws Exception { + runLiteralField(unboundedInput2); + } + + public void runLiteralField(PCollection<BeamSqlRow> input) throws Exception { String sql = "SELECT 1 as literal_field FROM TABLE_A"; PCollection<BeamSqlRow> result = - PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), inputA2) + PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), input) .apply("testLiteralField", BeamSql.query(sql)); BeamSqlRecordType resultType = BeamSqlRecordType.create(Arrays.asList("literal_field"), @@ -170,7 +230,7 @@ public class BeamSqlDslProjectTest extends BeamSqlDslBase { String sql = "SELECT f_int_na FROM TABLE_A"; PCollection<BeamSqlRow> result = - PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), inputA2) + PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), boundedInput1) .apply("testProjectUnknownField", BeamSql.query(sql)); pipeline.run().waitUntilFinish();
