Repository: beam Updated Branches: refs/heads/DSL_SQL 8e9b930bc -> bd99528af
use static table name PCOLLECTION in BeamSql.simpleQuery. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1f612049 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1f612049 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1f612049 Branch: refs/heads/DSL_SQL Commit: 1f612049b83a67070d13aae790d61e0f71d79ca7 Parents: 8e9b930 Author: mingmxu <[email protected]> Authored: Thu Jun 22 16:50:58 2017 -0700 Committer: mingmxu <[email protected]> Committed: Thu Jun 22 16:50:58 2017 -0700 ---------------------------------------------------------------------- .../java/org/apache/beam/dsls/sql/BeamSql.java | 27 +++++++++++++++----- .../dsls/sql/BeamSqlDslAggregationTest.java | 6 ++--- .../beam/dsls/sql/BeamSqlDslFilterTest.java | 2 +- .../beam/dsls/sql/BeamSqlDslProjectTest.java | 2 +- 4 files changed, 25 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/1f612049/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSql.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSql.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSql.java index e68188b..5f90380 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSql.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSql.java @@ -50,9 +50,8 @@ PCollection<BeamSqlRow> inputTableB = p.apply(TextIO.read().from("/my/input/path .apply(...); //run a simple query, and register the output as a table in BeamSql; -String sql1 = "select MY_FUNC(c1), c2 from TABLE_A"; -PCollection<BeamSqlRow> outputTableA = inputTableA.apply(BeamSql.simpleQuery(sql1) - .withUdf("MY_FUNC", myFunc)); +String sql1 = "select MY_FUNC(c1), c2 from PCOLLECTION"; +PCollection<BeamSqlRow> outputTableA = inputTableA.apply(BeamSql.simpleQuery(sql1)); //run a JOIN with one table from TextIO, and one table from another query PCollection<BeamSqlRow> outputTableB = PCollectionTuple.of( @@ -91,6 +90,8 @@ public class BeamSql { * * <p>This is a simplified form of {@link #query(String)} where the query must reference * a single input table. + * + * <p>Make sure to query it from a static table name <em>PCOLLECTION</em>. */ public static PTransform<PCollection<BeamSqlRow>, PCollection<BeamSqlRow>> simpleQuery(String sqlQuery) throws Exception { @@ -151,15 +152,20 @@ public class BeamSql { */ private static class SimpleQueryTransform extends PTransform<PCollection<BeamSqlRow>, PCollection<BeamSqlRow>> { + private static final String PCOLLECTION_TABLE_NAME = "PCOLLECTION"; BeamSqlEnv sqlEnv = new BeamSqlEnv(); private String sqlQuery; public SimpleQueryTransform(String sqlQuery) { this.sqlQuery = sqlQuery; + validateQuery(); } - @Override - public PCollection<BeamSqlRow> expand(PCollection<BeamSqlRow> input) { + // public SimpleQueryTransform withUdf(String udfName){ + // throw new UnsupportedOperationException("Pending for UDF support"); + // } + + private void validateQuery() { SqlNode sqlNode; try { sqlNode = sqlEnv.planner.parseQuery(sqlQuery); @@ -171,12 +177,19 @@ public class BeamSql { if (sqlNode instanceof SqlSelect) { SqlSelect select = (SqlSelect) sqlNode; String tableName = select.getFrom().toString(); - return PCollectionTuple.of(new TupleTag<BeamSqlRow>(tableName), input) - .apply(new QueryTransform(sqlQuery, sqlEnv)); + if (!tableName.equalsIgnoreCase(PCOLLECTION_TABLE_NAME)) { + throw new IllegalStateException("Use fixed table name " + PCOLLECTION_TABLE_NAME); + } } else { throw new UnsupportedOperationException( "Sql operation: " + sqlNode.toString() + " is not supported!"); } } + + @Override + public PCollection<BeamSqlRow> expand(PCollection<BeamSqlRow> input) { + return PCollectionTuple.of(new TupleTag<BeamSqlRow>(PCOLLECTION_TABLE_NAME), input) + .apply(new QueryTransform(sqlQuery, sqlEnv)); + } } } http://git-wip-us.apache.org/repos/asf/beam/blob/1f612049/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslAggregationTest.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslAggregationTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslAggregationTest.java index f7349c6..b0509ae 100644 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslAggregationTest.java +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslAggregationTest.java @@ -37,7 +37,7 @@ public class BeamSqlDslAggregationTest extends BeamSqlDslBase { */ @Test public void testAggregationWithoutWindow() throws Exception { - String sql = "SELECT f_int2, COUNT(*) AS `size` FROM TABLE_A GROUP BY f_int2"; + String sql = "SELECT f_int2, COUNT(*) AS `size` FROM PCOLLECTION GROUP BY f_int2"; PCollection<BeamSqlRow> result = inputA1.apply("testAggregationWithoutWindow", BeamSql.simpleQuery(sql)); @@ -125,7 +125,7 @@ public class BeamSqlDslAggregationTest extends BeamSqlDslBase { */ @Test public void testDistinct() throws Exception { - String sql = "SELECT distinct f_int, f_long FROM TABLE_A "; + String sql = "SELECT distinct f_int, f_long FROM PCOLLECTION "; PCollection<BeamSqlRow> result = inputA1.apply("testDistinct", BeamSql.simpleQuery(sql)); @@ -190,7 +190,7 @@ public class BeamSqlDslAggregationTest extends BeamSqlDslBase { */ @Test public void testHopWindow() throws Exception { - String sql = "SELECT f_int2, COUNT(*) AS `size` FROM TABLE_A " + String sql = "SELECT f_int2, COUNT(*) AS `size` FROM PCOLLECTION " + "GROUP BY f_int2, HOP(f_timestamp, INTERVAL '1' HOUR, INTERVAL '30' MINUTE)"; PCollection<BeamSqlRow> result = inputA1.apply("testHopWindow", BeamSql.simpleQuery(sql)); http://git-wip-us.apache.org/repos/asf/beam/blob/1f612049/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslFilterTest.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslFilterTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslFilterTest.java index b68e526..254b96d 100644 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslFilterTest.java +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslFilterTest.java @@ -33,7 +33,7 @@ public class BeamSqlDslFilterTest extends BeamSqlDslBase { */ @Test public void testSingleFilter() throws Exception { - String sql = "SELECT * FROM TABLE_A WHERE f_int = 1"; + String sql = "SELECT * FROM PCOLLECTION WHERE f_int = 1"; PCollection<BeamSqlRow> result = inputA1.apply("testSingleFilter", BeamSql.simpleQuery(sql)); http://git-wip-us.apache.org/repos/asf/beam/blob/1f612049/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslProjectTest.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslProjectTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslProjectTest.java index 2998682..1faa4d0 100644 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslProjectTest.java +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslProjectTest.java @@ -36,7 +36,7 @@ public class BeamSqlDslProjectTest extends BeamSqlDslBase { */ @Test public void testSelectAll() throws Exception { - String sql = "SELECT * FROM TABLE_A"; + String sql = "SELECT * FROM PCOLLECTION"; PCollection<BeamSqlRow> result = inputA2.apply("testSelectAll", BeamSql.simpleQuery(sql));
