Repository: beam Updated Branches: refs/heads/DSL_SQL c2acb54f6 -> 448a32306
rename simpleQuery to query and query to queryMulti Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c580b521 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c580b521 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c580b521 Branch: refs/heads/DSL_SQL Commit: c580b52198d475dd56272bdf4d751991c0c51a15 Parents: c2acb54 Author: mingmxu <[email protected]> Authored: Wed Aug 23 17:39:18 2017 -0700 Committer: mingmxu <[email protected]> Committed: Wed Aug 23 17:39:18 2017 -0700 ---------------------------------------------------------------------- .../org/apache/beam/sdk/extensions/sql/BeamSql.java | 10 +++++----- .../sdk/extensions/sql/example/BeamSqlExample.java | 4 ++-- .../extensions/sql/BeamSqlDslAggregationTest.java | 16 ++++++++-------- .../sdk/extensions/sql/BeamSqlDslFilterTest.java | 12 ++++++------ .../beam/sdk/extensions/sql/BeamSqlDslJoinTest.java | 2 +- .../sdk/extensions/sql/BeamSqlDslProjectTest.java | 12 ++++++------ .../sdk/extensions/sql/BeamSqlDslUdfUdafTest.java | 8 ++++---- .../BeamSqlBuiltinFunctionsIntegrationTestBase.java | 2 +- .../BeamSqlDateFunctionsIntegrationTest.java | 2 +- 9 files changed, 34 insertions(+), 34 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/c580b521/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSql.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSql.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSql.java index 34355fb..fc80df5 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSql.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSql.java @@ -55,14 +55,14 @@ PCollection<BeamSqlRow> inputTableB = p.apply(TextIO.read().from("/my/input/path //run a simple query, and register the output as a table in BeamSql; String sql1 = "select MY_FUNC(c1), c2 from PCOLLECTION"; PCollection<BeamSqlRow> outputTableA = inputTableA.apply( - BeamSql.simpleQuery(sql1) + BeamSql.query(sql1) .withUdf("MY_FUNC", MY_FUNC.class, "FUNC")); //run a JOIN with one table from TextIO, and one table from another query PCollection<BeamSqlRow> outputTableB = PCollectionTuple.of( new TupleTag<BeamSqlRow>("TABLE_O_A"), outputTableA) .and(new TupleTag<BeamSqlRow>("TABLE_B"), inputTableB) - .apply(BeamSql.query("select * from TABLE_O_A JOIN TABLE_B where ...")); + .apply(BeamSql.queryMulti("select * from TABLE_O_A JOIN TABLE_B where ...")); //output the final result with TextIO outputTableB.apply(...).apply(TextIO.write().to("/my/output/path")); @@ -91,19 +91,19 @@ public class BeamSql { * of the current query call.</li> * </ul> */ - public static QueryTransform query(String sqlQuery) { + public static QueryTransform queryMulti(String sqlQuery) { return new QueryTransform(sqlQuery); } /** * Transforms a SQL query into a {@link PTransform} representing an equivalent execution plan. * - * <p>This is a simplified form of {@link #query(String)} where the query must reference + * <p>This is a simplified form of {@link #queryMulti(String)} where the query must reference * a single input table. * * <p>Make sure to query it from a static table name <em>PCOLLECTION</em>. */ - public static SimpleQueryTransform simpleQuery(String sqlQuery) { + public static SimpleQueryTransform query(String sqlQuery) { return new SimpleQueryTransform(sqlQuery); } http://git-wip-us.apache.org/repos/asf/beam/blob/c580b521/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/BeamSqlExample.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/BeamSqlExample.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/BeamSqlExample.java index 0c5dae1..350bb7b 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/BeamSqlExample.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/BeamSqlExample.java @@ -63,7 +63,7 @@ class BeamSqlExample { //Case 1. run a simple SQL query over input PCollection with BeamSql.simpleQuery; PCollection<BeamRecord> outputStream = inputTable.apply( - BeamSql.simpleQuery("select c1, c2, c3 from PCOLLECTION where c1 > 1")); + BeamSql.query("select c1, c2, c3 from PCOLLECTION where c1 > 1")); //print the output record of case 1; outputStream.apply("log_result", @@ -80,7 +80,7 @@ class BeamSqlExample { //Case 2. run the query with BeamSql.query over result PCollection of case 1. PCollection<BeamRecord> outputStream2 = PCollectionTuple.of(new TupleTag<BeamRecord>("CASE1_RESULT"), outputStream) - .apply(BeamSql.query("select c2, sum(c3) from CASE1_RESULT group by c2")); + .apply(BeamSql.queryMulti("select c2, sum(c3) from CASE1_RESULT group by c2")); //print the output record of case 2; outputStream2.apply("log_result", http://git-wip-us.apache.org/repos/asf/beam/blob/c580b521/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslAggregationTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslAggregationTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslAggregationTest.java index d99ec20..c0b857d 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslAggregationTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslAggregationTest.java @@ -51,7 +51,7 @@ public class BeamSqlDslAggregationTest extends BeamSqlDslBase { String sql = "SELECT f_int2, COUNT(*) AS `getFieldCount` FROM PCOLLECTION GROUP BY f_int2"; PCollection<BeamRecord> result = - input.apply("testAggregationWithoutWindow", BeamSql.simpleQuery(sql)); + input.apply("testAggregationWithoutWindow", BeamSql.query(sql)); BeamRecordSqlType resultType = BeamRecordSqlType.create(Arrays.asList("f_int2", "size"), Arrays.asList(Types.INTEGER, Types.BIGINT)); @@ -93,7 +93,7 @@ public class BeamSqlDslAggregationTest extends BeamSqlDslBase { PCollection<BeamRecord> result = PCollectionTuple.of(new TupleTag<BeamRecord>("TABLE_A"), input) - .apply("testAggregationFunctions", BeamSql.query(sql)); + .apply("testAggregationFunctions", BeamSql.queryMulti(sql)); BeamRecordSqlType resultType = BeamRecordSqlType.create( Arrays.asList("f_int2", "size", "sum1", "avg1", "max1", "min1", "sum2", "avg2", "max2", @@ -139,7 +139,7 @@ public class BeamSqlDslAggregationTest extends BeamSqlDslBase { String sql = "SELECT distinct f_int, f_long FROM PCOLLECTION "; PCollection<BeamRecord> result = - input.apply("testDistinct", BeamSql.simpleQuery(sql)); + input.apply("testDistinct", BeamSql.query(sql)); BeamRecordSqlType resultType = BeamRecordSqlType.create(Arrays.asList("f_int", "f_long"), Arrays.asList(Types.INTEGER, Types.BIGINT)); @@ -177,7 +177,7 @@ public class BeamSqlDslAggregationTest extends BeamSqlDslBase { + " GROUP BY f_int2, TUMBLE(f_timestamp, INTERVAL '1' HOUR)"; PCollection<BeamRecord> result = PCollectionTuple.of(new TupleTag<BeamRecord>("TABLE_A"), input) - .apply("testTumbleWindow", BeamSql.query(sql)); + .apply("testTumbleWindow", BeamSql.queryMulti(sql)); BeamRecordSqlType resultType = BeamRecordSqlType.create( Arrays.asList("f_int2", "size", "window_start"), @@ -213,7 +213,7 @@ public class BeamSqlDslAggregationTest extends BeamSqlDslBase { + " FROM PCOLLECTION" + " GROUP BY f_int2, HOP(f_timestamp, INTERVAL '1' HOUR, INTERVAL '30' MINUTE)"; PCollection<BeamRecord> result = - input.apply("testHopWindow", BeamSql.simpleQuery(sql)); + input.apply("testHopWindow", BeamSql.query(sql)); BeamRecordSqlType resultType = BeamRecordSqlType.create( Arrays.asList("f_int2", "size", "window_start"), @@ -252,7 +252,7 @@ public class BeamSqlDslAggregationTest extends BeamSqlDslBase { + " GROUP BY f_int2, SESSION(f_timestamp, INTERVAL '5' MINUTE)"; PCollection<BeamRecord> result = PCollectionTuple.of(new TupleTag<BeamRecord>("TABLE_A"), input) - .apply("testSessionWindow", BeamSql.query(sql)); + .apply("testSessionWindow", BeamSql.queryMulti(sql)); BeamRecordSqlType resultType = BeamRecordSqlType.create( Arrays.asList("f_int2", "size", "window_start"), @@ -277,7 +277,7 @@ public class BeamSqlDslAggregationTest extends BeamSqlDslBase { + "GROUP BY f_int2, TUMBLE(f_long, INTERVAL '1' HOUR)"; PCollection<BeamRecord> result = PCollectionTuple.of(new TupleTag<BeamRecord>("TABLE_A"), boundedInput1) - .apply("testWindowOnNonTimestampField", BeamSql.query(sql)); + .apply("testWindowOnNonTimestampField", BeamSql.queryMulti(sql)); pipeline.run().waitUntilFinish(); } @@ -292,7 +292,7 @@ public class BeamSqlDslAggregationTest extends BeamSqlDslBase { + "FROM PCOLLECTION GROUP BY f_int2"; PCollection<BeamRecord> result = - boundedInput1.apply("testUnsupportedDistinct", BeamSql.simpleQuery(sql)); + boundedInput1.apply("testUnsupportedDistinct", BeamSql.query(sql)); pipeline.run().waitUntilFinish(); } http://git-wip-us.apache.org/repos/asf/beam/blob/c580b521/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslFilterTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslFilterTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslFilterTest.java index e1d463b..bd430e5 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslFilterTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslFilterTest.java @@ -48,7 +48,7 @@ public class BeamSqlDslFilterTest extends BeamSqlDslBase { String sql = "SELECT * FROM PCOLLECTION WHERE f_int = 1"; PCollection<BeamRecord> result = - input.apply("testSingleFilter", BeamSql.simpleQuery(sql)); + input.apply("testSingleFilter", BeamSql.query(sql)); PAssert.that(result).containsInAnyOrder(recordsInTableA.get(0)); @@ -77,7 +77,7 @@ public class BeamSqlDslFilterTest extends BeamSqlDslBase { PCollection<BeamRecord> result = PCollectionTuple.of(new TupleTag<BeamRecord>("TABLE_A"), input) - .apply("testCompositeFilter", BeamSql.query(sql)); + .apply("testCompositeFilter", BeamSql.queryMulti(sql)); PAssert.that(result).containsInAnyOrder(recordsInTableA.get(1), recordsInTableA.get(2)); @@ -105,7 +105,7 @@ public class BeamSqlDslFilterTest extends BeamSqlDslBase { PCollection<BeamRecord> result = PCollectionTuple.of(new TupleTag<BeamRecord>("TABLE_A"), input) - .apply("testNoReturnFilter", BeamSql.query(sql)); + .apply("testNoReturnFilter", BeamSql.queryMulti(sql)); PAssert.that(result).empty(); @@ -122,7 +122,7 @@ public class BeamSqlDslFilterTest extends BeamSqlDslBase { PCollection<BeamRecord> result = PCollectionTuple.of(new TupleTag<BeamRecord>("TABLE_A"), boundedInput1) - .apply("testFromInvalidTableName1", BeamSql.query(sql)); + .apply("testFromInvalidTableName1", BeamSql.queryMulti(sql)); pipeline.run().waitUntilFinish(); } @@ -135,7 +135,7 @@ public class BeamSqlDslFilterTest extends BeamSqlDslBase { String sql = "SELECT * FROM PCOLLECTION_NA"; - PCollection<BeamRecord> result = boundedInput1.apply(BeamSql.simpleQuery(sql)); + PCollection<BeamRecord> result = boundedInput1.apply(BeamSql.query(sql)); pipeline.run().waitUntilFinish(); } @@ -148,7 +148,7 @@ public class BeamSqlDslFilterTest extends BeamSqlDslBase { String sql = "SELECT * FROM PCOLLECTION WHERE f_int_na = 0"; - PCollection<BeamRecord> result = boundedInput1.apply(BeamSql.simpleQuery(sql)); + PCollection<BeamRecord> result = boundedInput1.apply(BeamSql.query(sql)); pipeline.run().waitUntilFinish(); } http://git-wip-us.apache.org/repos/asf/beam/blob/c580b521/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslJoinTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslJoinTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslJoinTest.java index 47109e0..bbfa3d3 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslJoinTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslJoinTest.java @@ -183,6 +183,6 @@ public class BeamSqlDslJoinTest { ) .and(new TupleTag<BeamRecord>("ORDER_DETAILS2"), ORDER_DETAILS2.buildIOReader(pipeline).setCoder(SOURCE_CODER) - ).apply("join", BeamSql.query(sql)).setCoder(RESULT_CODER); + ).apply("join", BeamSql.queryMulti(sql)).setCoder(RESULT_CODER); } } http://git-wip-us.apache.org/repos/asf/beam/blob/c580b521/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslProjectTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslProjectTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslProjectTest.java index e36eb2b..b288270 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslProjectTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslProjectTest.java @@ -50,7 +50,7 @@ public class BeamSqlDslProjectTest extends BeamSqlDslBase { String sql = "SELECT * FROM PCOLLECTION"; PCollection<BeamRecord> result = - input.apply("testSelectAll", BeamSql.simpleQuery(sql)); + input.apply("testSelectAll", BeamSql.query(sql)); PAssert.that(result).containsInAnyOrder(recordsInTableA.get(0)); @@ -78,7 +78,7 @@ public class BeamSqlDslProjectTest extends BeamSqlDslBase { PCollection<BeamRecord> result = PCollectionTuple.of(new TupleTag<BeamRecord>("TABLE_A"), input) - .apply("testPartialFields", BeamSql.query(sql)); + .apply("testPartialFields", BeamSql.queryMulti(sql)); BeamRecordSqlType resultType = BeamRecordSqlType.create(Arrays.asList("f_int", "f_long"), Arrays.asList(Types.INTEGER, Types.BIGINT)); @@ -112,7 +112,7 @@ public class BeamSqlDslProjectTest extends BeamSqlDslBase { PCollection<BeamRecord> result = PCollectionTuple.of(new TupleTag<BeamRecord>("TABLE_A"), input) - .apply("testPartialFieldsInMultipleRow", BeamSql.query(sql)); + .apply("testPartialFieldsInMultipleRow", BeamSql.queryMulti(sql)); BeamRecordSqlType resultType = BeamRecordSqlType.create(Arrays.asList("f_int", "f_long"), Arrays.asList(Types.INTEGER, Types.BIGINT)); @@ -155,7 +155,7 @@ public class BeamSqlDslProjectTest extends BeamSqlDslBase { PCollection<BeamRecord> result = PCollectionTuple.of(new TupleTag<BeamRecord>("TABLE_A"), input) - .apply("testPartialFieldsInRows", BeamSql.query(sql)); + .apply("testPartialFieldsInRows", BeamSql.queryMulti(sql)); BeamRecordSqlType resultType = BeamRecordSqlType.create(Arrays.asList("f_int", "f_long"), Arrays.asList(Types.INTEGER, Types.BIGINT)); @@ -198,7 +198,7 @@ public class BeamSqlDslProjectTest extends BeamSqlDslBase { PCollection<BeamRecord> result = PCollectionTuple.of(new TupleTag<BeamRecord>("TABLE_A"), input) - .apply("testLiteralField", BeamSql.query(sql)); + .apply("testLiteralField", BeamSql.queryMulti(sql)); BeamRecordSqlType resultType = BeamRecordSqlType.create(Arrays.asList("literal_field"), Arrays.asList(Types.INTEGER)); @@ -220,7 +220,7 @@ public class BeamSqlDslProjectTest extends BeamSqlDslBase { PCollection<BeamRecord> result = PCollectionTuple.of(new TupleTag<BeamRecord>("TABLE_A"), boundedInput1) - .apply("testProjectUnknownField", BeamSql.query(sql)); + .apply("testProjectUnknownField", BeamSql.queryMulti(sql)); pipeline.run().waitUntilFinish(); } http://git-wip-us.apache.org/repos/asf/beam/blob/c580b521/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslUdfUdafTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslUdfUdafTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslUdfUdafTest.java index 8db9d7a..0d8bc12 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslUdfUdafTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslUdfUdafTest.java @@ -47,7 +47,7 @@ public class BeamSqlDslUdfUdafTest extends BeamSqlDslBase { + " FROM PCOLLECTION GROUP BY f_int2"; PCollection<BeamRecord> result1 = boundedInput1.apply("testUdaf1", - BeamSql.simpleQuery(sql1).withUdaf("squaresum1", new SquareSum())); + BeamSql.query(sql1).withUdaf("squaresum1", new SquareSum())); PAssert.that(result1).containsInAnyOrder(record); String sql2 = "SELECT f_int2, squaresum2(f_int) AS `squaresum`" @@ -55,7 +55,7 @@ public class BeamSqlDslUdfUdafTest extends BeamSqlDslBase { PCollection<BeamRecord> result2 = PCollectionTuple.of(new TupleTag<BeamRecord>("PCOLLECTION"), boundedInput1) .apply("testUdaf2", - BeamSql.query(sql2).withUdaf("squaresum2", new SquareSum())); + BeamSql.queryMulti(sql2).withUdaf("squaresum2", new SquareSum())); PAssert.that(result2).containsInAnyOrder(record); pipeline.run().waitUntilFinish(); @@ -74,14 +74,14 @@ public class BeamSqlDslUdfUdafTest extends BeamSqlDslBase { String sql1 = "SELECT f_int, cubic1(f_int) as cubicvalue FROM PCOLLECTION WHERE f_int = 2"; PCollection<BeamRecord> result1 = boundedInput1.apply("testUdf1", - BeamSql.simpleQuery(sql1).withUdf("cubic1", CubicInteger.class)); + BeamSql.query(sql1).withUdf("cubic1", CubicInteger.class)); PAssert.that(result1).containsInAnyOrder(record); String sql2 = "SELECT f_int, cubic2(f_int) as cubicvalue FROM PCOLLECTION WHERE f_int = 2"; PCollection<BeamRecord> result2 = PCollectionTuple.of(new TupleTag<BeamRecord>("PCOLLECTION"), boundedInput1) .apply("testUdf2", - BeamSql.query(sql2).withUdf("cubic2", new CubicIntegerFn())); + BeamSql.queryMulti(sql2).withUdf("cubic2", new CubicIntegerFn())); PAssert.that(result2).containsInAnyOrder(record); pipeline.run().waitUntilFinish(); http://git-wip-us.apache.org/repos/asf/beam/blob/c580b521/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlBuiltinFunctionsIntegrationTestBase.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlBuiltinFunctionsIntegrationTestBase.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlBuiltinFunctionsIntegrationTestBase.java index a64afa6..3395269 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlBuiltinFunctionsIntegrationTestBase.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlBuiltinFunctionsIntegrationTestBase.java @@ -152,7 +152,7 @@ public class BeamSqlBuiltinFunctionsIntegrationTestBase { values.add(pair.getValue()); } - PCollection<BeamRecord> rows = inputCollection.apply(BeamSql.simpleQuery(getSql())); + PCollection<BeamRecord> rows = inputCollection.apply(BeamSql.query(getSql())); PAssert.that(rows).containsInAnyOrder( TestUtils.RowsBuilder .of(BeamRecordSqlType.create(names, types)) http://git-wip-us.apache.org/repos/asf/beam/blob/c580b521/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlDateFunctionsIntegrationTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlDateFunctionsIntegrationTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlDateFunctionsIntegrationTest.java index cda6a3c..1fdb35f 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlDateFunctionsIntegrationTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlDateFunctionsIntegrationTest.java @@ -64,7 +64,7 @@ public class BeamSqlDateFunctionsIntegrationTest + " FROM PCOLLECTION" ; PCollection<BeamRecord> rows = getTestPCollection().apply( - BeamSql.simpleQuery(sql)); + BeamSql.query(sql)); PAssert.that(rows).satisfies(new Checker()); pipeline.run(); }
