Repository: beam Updated Branches: refs/heads/DSL_SQL d52df7471 -> 1e080e2ba
register table for both BeamSql.simpleQuery and BeamSql.query Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/ce59eec7 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/ce59eec7 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/ce59eec7 Branch: refs/heads/DSL_SQL Commit: ce59eec7f88dfcbbdb16a0db420e0ce541b47468 Parents: d52df74 Author: mingmxu <[email protected]> Authored: Sat Jun 10 13:46:17 2017 -0700 Committer: Luke Cwik <[email protected]> Committed: Tue Jun 13 17:06:27 2017 -0700 ---------------------------------------------------------------------- .../java/org/apache/beam/dsls/sql/BeamSql.java | 12 +++++++--- .../beam/dsls/sql/example/BeamSqlExample.java | 24 ++++++++++++++++---- 2 files changed, 29 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/ce59eec7/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSql.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSql.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSql.java index 809fed3..ae281ac 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSql.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSql.java @@ -110,6 +110,15 @@ public class BeamSql { @Override public PCollection<BeamSqlRow> expand(PCollectionTuple input) { + //register tables + for (TupleTag<?> sourceTag : input.getAll().keySet()) { + PCollection<BeamSqlRow> sourceStream = (PCollection<BeamSqlRow>) input.get(sourceTag); + BeamSqlRowCoder sourceCoder = (BeamSqlRowCoder) sourceStream.getCoder(); + + BeamSqlEnv.registerTable(sourceTag.getId(), + new BeamPCollectionTable(sourceStream, sourceCoder.getTableSchema().toRelDataType())); + } + BeamRelNode beamRelNode = null; try { beamRelNode = BeamSqlEnv.planner.convertToBeamRel(sqlQuery); @@ -149,13 +158,10 @@ public class BeamSql { } catch (SqlParseException e) { throw new IllegalStateException(e); } - BeamSqlRowCoder inputCoder = (BeamSqlRowCoder) input.getCoder(); if (sqlNode instanceof SqlSelect) { SqlSelect select = (SqlSelect) sqlNode; String tableName = select.getFrom().toString(); - BeamSqlEnv.registerTable(tableName, - new BeamPCollectionTable(input, inputCoder.getTableSchema().toRelDataType())); return PCollectionTuple.of(new TupleTag<BeamSqlRow>(tableName), input) .apply(BeamSql.query(sqlQuery)); } else { http://git-wip-us.apache.org/repos/asf/beam/blob/ce59eec7/dsls/sql/src/main/java/org/apache/beam/dsls/sql/example/BeamSqlExample.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/example/BeamSqlExample.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/example/BeamSqlExample.java index 4d7328e..36e1aa9 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/example/BeamSqlExample.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/example/BeamSqlExample.java @@ -29,6 +29,8 @@ import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.transforms.SimpleFunction; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.beam.sdk.values.TupleTag; import org.apache.calcite.sql.type.SqlTypeName; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -58,16 +60,30 @@ public class BeamSqlExample { PCollection<BeamSqlRow> inputTable = PBegin.in(p).apply(Create.of(row) .withCoder(new BeamSqlRowCoder(type))); - //run a simple SQL query over input PCollection; - String sql = "select c2, c3 from TABLE_A where c1=1"; - PCollection<BeamSqlRow> outputStream = inputTable.apply(BeamSql.simpleQuery(sql)); + //Case 1. run a simple SQL query over input PCollection with BeamSql.simpleQuery; + PCollection<BeamSqlRow> outputStream = inputTable.apply( + BeamSql.simpleQuery("select c2, c3 from TABLE_A where c1=1")); //log out the output record; outputStream.apply("log_result", MapElements.<BeamSqlRow, Void>via(new SimpleFunction<BeamSqlRow, Void>() { + public Void apply(BeamSqlRow input) { + System.out.println("TABLE_A: " + input); + return null; + } + })); + + //Case 2. run the query with BeamSql.query + PCollection<BeamSqlRow> outputStream2 = + PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_B"), inputTable) + .apply(BeamSql.query("select c2, c3 from TABLE_B where c1=1")); + + //log out the output record; + outputStream2.apply("log_result", + MapElements.<BeamSqlRow, Void>via(new SimpleFunction<BeamSqlRow, Void>() { @Override public Void apply(BeamSqlRow input) { - LOG.info(input.valueInString()); + System.out.println("TABLE_B: " + input); return null; } }));
