Repository: beam Updated Branches: refs/heads/DSL_SQL 887cf3a1a -> a680904a4
restrict the scope of BeamSqlEnv Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c5db2777 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c5db2777 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c5db2777 Branch: refs/heads/DSL_SQL Commit: c5db2777a4cc7d6d230fba74e7c12fd18fcc07c3 Parents: 887cf3a Author: mingmxu <[email protected]> Authored: Fri Jun 16 18:49:18 2017 -0700 Committer: Tyler Akidau <[email protected]> Committed: Thu Jun 22 13:18:12 2017 -0700 ---------------------------------------------------------------------- .../java/org/apache/beam/dsls/sql/BeamSql.java | 57 +++++++++++--------- .../org/apache/beam/dsls/sql/BeamSqlCli.java | 18 +++---- .../org/apache/beam/dsls/sql/BeamSqlEnv.java | 12 ++--- .../beam/dsls/sql/example/BeamSqlExample.java | 4 +- .../math/BeamSqlMathBinaryExpression.java | 1 - .../beam/dsls/sql/planner/BeamQueryPlanner.java | 7 +-- .../beam/dsls/sql/rel/BeamAggregationRel.java | 7 +-- .../apache/beam/dsls/sql/rel/BeamFilterRel.java | 8 +-- .../apache/beam/dsls/sql/rel/BeamIOSinkRel.java | 9 ++-- .../beam/dsls/sql/rel/BeamIOSourceRel.java | 7 ++- .../beam/dsls/sql/rel/BeamIntersectRel.java | 8 +-- .../apache/beam/dsls/sql/rel/BeamMinusRel.java | 8 +-- .../beam/dsls/sql/rel/BeamProjectRel.java | 7 +-- .../apache/beam/dsls/sql/rel/BeamRelNode.java | 7 +-- .../dsls/sql/rel/BeamSetOperatorRelBase.java | 10 ++-- .../apache/beam/dsls/sql/rel/BeamSortRel.java | 7 +-- .../apache/beam/dsls/sql/rel/BeamUnionRel.java | 8 +-- .../apache/beam/dsls/sql/rel/BeamValuesRel.java | 6 +-- .../beam/dsls/sql/utils/CalciteUtils.java | 1 - .../beam/dsls/sql/rel/BeamIntersectRelTest.java | 10 ++-- .../beam/dsls/sql/rel/BeamMinusRelTest.java | 10 ++-- .../sql/rel/BeamSetOperatorRelBaseTest.java | 8 +-- .../beam/dsls/sql/rel/BeamSortRelTest.java | 26 ++++----- .../beam/dsls/sql/rel/BeamUnionRelTest.java | 8 +-- .../beam/dsls/sql/rel/BeamValuesRelTest.java | 12 +++-- 25 files changed, 141 insertions(+), 125 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/c5db2777/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSql.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSql.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSql.java index 04fe055..e68188b 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSql.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSql.java @@ -17,9 +17,6 @@ */ package org.apache.beam.dsls.sql; -import static org.apache.beam.dsls.sql.BeamSqlEnv.planner; -import static org.apache.beam.dsls.sql.BeamSqlEnv.registerTable; - import org.apache.beam.dsls.sql.rel.BeamRelNode; import org.apache.beam.dsls.sql.schema.BeamPCollectionTable; import org.apache.beam.dsls.sql.schema.BeamSqlRow; @@ -47,17 +44,15 @@ PipelineOptions options = PipelineOptionsFactory.create(); Pipeline p = Pipeline.create(options); //create table from TextIO; -TableSchema tableASchema = ...; PCollection<BeamSqlRow> inputTableA = p.apply(TextIO.read().from("/my/input/patha")) - .apply(BeamSql.fromTextRow(tableASchema)); -TableSchema tableBSchema = ...; + .apply(...); PCollection<BeamSqlRow> inputTableB = p.apply(TextIO.read().from("/my/input/pathb")) - .apply(BeamSql.fromTextRow(tableBSchema)); + .apply(...); //run a simple query, and register the output as a table in BeamSql; String sql1 = "select MY_FUNC(c1), c2 from TABLE_A"; -PCollection<BeamSqlRow> outputTableA = inputTableA.apply(BeamSql.simpleQuery(sql1)) - .withUdf("MY_FUNC", myFunc); +PCollection<BeamSqlRow> outputTableA = inputTableA.apply(BeamSql.simpleQuery(sql1) + .withUdf("MY_FUNC", myFunc)); //run a JOIN with one table from TextIO, and one table from another query PCollection<BeamSqlRow> outputTableB = PCollectionTuple.of( @@ -107,35 +102,47 @@ public class BeamSql { */ private static class QueryTransform extends PTransform<PCollectionTuple, PCollection<BeamSqlRow>> { + private BeamSqlEnv sqlEnv; private String sqlQuery; + public QueryTransform(String sqlQuery) { this.sqlQuery = sqlQuery; + sqlEnv = new BeamSqlEnv(); + } + + public QueryTransform(String sqlQuery, BeamSqlEnv sqlEnv) { + this.sqlQuery = sqlQuery; + this.sqlEnv = sqlEnv; } @Override public PCollection<BeamSqlRow> expand(PCollectionTuple input) { - //register tables - for (TupleTag<?> sourceTag : input.getAll().keySet()) { - PCollection<BeamSqlRow> sourceStream = (PCollection<BeamSqlRow>) input.get(sourceTag); - BeamSqlRowCoder sourceCoder = (BeamSqlRowCoder) sourceStream.getCoder(); - - registerTable(sourceTag.getId(), - new BeamPCollectionTable(sourceStream, sourceCoder.getTableSchema())); - } + registerTables(input); BeamRelNode beamRelNode = null; try { - beamRelNode = planner.convertToBeamRel(sqlQuery); + beamRelNode = sqlEnv.planner.convertToBeamRel(sqlQuery); } catch (ValidationException | RelConversionException | SqlParseException e) { throw new IllegalStateException(e); } try { - return beamRelNode.buildBeamPipeline(input); + return beamRelNode.buildBeamPipeline(input, sqlEnv); } catch (Exception e) { throw new IllegalStateException(e); } } + + //register tables, related with input PCollections. + private void registerTables(PCollectionTuple input){ + for (TupleTag<?> sourceTag : input.getAll().keySet()) { + PCollection<BeamSqlRow> sourceStream = (PCollection<BeamSqlRow>) input.get(sourceTag); + BeamSqlRowCoder sourceCoder = (BeamSqlRowCoder) sourceStream.getCoder(); + + sqlEnv.registerTable(sourceTag.getId(), + new BeamPCollectionTable(sourceStream, sourceCoder.getTableSchema())); + } + } } /** @@ -144,21 +151,19 @@ public class BeamSql { */ private static class SimpleQueryTransform extends PTransform<PCollection<BeamSqlRow>, PCollection<BeamSqlRow>> { + BeamSqlEnv sqlEnv = new BeamSqlEnv(); private String sqlQuery; + public SimpleQueryTransform(String sqlQuery) { this.sqlQuery = sqlQuery; } - public SimpleQueryTransform withUdf(String udfName){ - throw new UnsupportedOperationException("Pending for UDF support"); - } - @Override public PCollection<BeamSqlRow> expand(PCollection<BeamSqlRow> input) { SqlNode sqlNode; try { - sqlNode = planner.parseQuery(sqlQuery); - planner.getPlanner().close(); + sqlNode = sqlEnv.planner.parseQuery(sqlQuery); + sqlEnv.planner.getPlanner().close(); } catch (SqlParseException e) { throw new IllegalStateException(e); } @@ -167,7 +172,7 @@ public class BeamSql { SqlSelect select = (SqlSelect) sqlNode; String tableName = select.getFrom().toString(); return PCollectionTuple.of(new TupleTag<BeamSqlRow>(tableName), input) - .apply(BeamSql.query(sqlQuery)); + .apply(new QueryTransform(sqlQuery, sqlEnv)); } else { throw new UnsupportedOperationException( "Sql operation: " + sqlNode.toString() + " is not supported!"); http://git-wip-us.apache.org/repos/asf/beam/blob/c5db2777/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSqlCli.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSqlCli.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSqlCli.java index dbf9a59..50da244 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSqlCli.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSqlCli.java @@ -17,8 +17,6 @@ */ package org.apache.beam.dsls.sql; -import static org.apache.beam.dsls.sql.BeamSqlEnv.planner; - import org.apache.beam.dsls.sql.rel.BeamRelNode; import org.apache.beam.dsls.sql.schema.BeamSqlRow; import org.apache.beam.sdk.Pipeline; @@ -33,12 +31,11 @@ import org.apache.calcite.plan.RelOptUtil; */ @Experimental public class BeamSqlCli { - /** * Returns a human readable representation of the query execution plan. */ - public static String explainQuery(String sqlString) throws Exception { - BeamRelNode exeTree = planner.convertToBeamRel(sqlString); + public static String explainQuery(String sqlString, BeamSqlEnv sqlEnv) throws Exception { + BeamRelNode exeTree = sqlEnv.planner.convertToBeamRel(sqlString); String beamPlan = RelOptUtil.toString(exeTree); return beamPlan; } @@ -46,22 +43,23 @@ public class BeamSqlCli { /** * compile SQL, and return a {@link Pipeline}. */ - public static PCollection<BeamSqlRow> compilePipeline(String sqlStatement) throws Exception{ + public static PCollection<BeamSqlRow> compilePipeline(String sqlStatement, BeamSqlEnv sqlEnv) + throws Exception{ PipelineOptions options = PipelineOptionsFactory.fromArgs(new String[] {}).withValidation() .as(PipelineOptions.class); // FlinkPipelineOptions.class options.setJobName("BeamPlanCreator"); Pipeline pipeline = Pipeline.create(options); - return compilePipeline(sqlStatement, pipeline); + return compilePipeline(sqlStatement, pipeline, sqlEnv); } /** * compile SQL, and return a {@link Pipeline}. */ - public static PCollection<BeamSqlRow> compilePipeline(String sqlStatement, Pipeline basePipeline) - throws Exception{ + public static PCollection<BeamSqlRow> compilePipeline(String sqlStatement, Pipeline basePipeline + , BeamSqlEnv sqlEnv) throws Exception{ PCollection<BeamSqlRow> resultStream = - planner.compileBeamPipeline(sqlStatement, basePipeline); + sqlEnv.planner.compileBeamPipeline(sqlStatement, basePipeline, sqlEnv); return resultStream; } } http://git-wip-us.apache.org/repos/asf/beam/blob/c5db2777/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSqlEnv.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSqlEnv.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSqlEnv.java index d7715c7..baa2617 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSqlEnv.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSqlEnv.java @@ -42,10 +42,10 @@ import org.apache.calcite.tools.Frameworks; * a {@link BeamQueryPlanner} which parse/validate/optimize/translate input SQL queries. */ public class BeamSqlEnv { - static SchemaPlus schema; - static BeamQueryPlanner planner; + SchemaPlus schema; + BeamQueryPlanner planner; - static { + public BeamSqlEnv() { schema = Frameworks.createRootSchema(true); planner = new BeamQueryPlanner(schema); } @@ -53,7 +53,7 @@ public class BeamSqlEnv { /** * Register a UDF function which can be used in SQL expression. */ - public static void registerUdf(String functionName, Class<?> clazz, String methodName) { + public void registerUdf(String functionName, Class<?> clazz, String methodName) { schema.add(functionName, ScalarFunctionImpl.create(clazz, methodName)); } @@ -61,7 +61,7 @@ public class BeamSqlEnv { * Registers a {@link BaseBeamTable} which can be used for all subsequent queries. * */ - public static void registerTable(String tableName, BaseBeamTable table) { + public void registerTable(String tableName, BaseBeamTable table) { schema.add(tableName, new BeamCalciteTable(table.getRecordType())); planner.getSourceTables().put(tableName, table); } @@ -69,7 +69,7 @@ public class BeamSqlEnv { /** * Find {@link BaseBeamTable} by table name. */ - public static BaseBeamTable findTable(String tableName){ + public BaseBeamTable findTable(String tableName){ return planner.getSourceTables().get(tableName); } http://git-wip-us.apache.org/repos/asf/beam/blob/c5db2777/dsls/sql/src/main/java/org/apache/beam/dsls/sql/example/BeamSqlExample.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/example/BeamSqlExample.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/example/BeamSqlExample.java index 31f8302..5f09fdd 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/example/BeamSqlExample.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/example/BeamSqlExample.java @@ -63,13 +63,13 @@ class BeamSqlExample { //Case 1. run a simple SQL query over input PCollection with BeamSql.simpleQuery; PCollection<BeamSqlRow> outputStream = inputTable.apply( - BeamSql.simpleQuery("select c2, c3 from TABLE_A where c1=1")); + BeamSql.simpleQuery("select c2, c3 from PCOLLECTION where c1=1")); //log out the output record; outputStream.apply("log_result", MapElements.<BeamSqlRow, Void>via(new SimpleFunction<BeamSqlRow, Void>() { public Void apply(BeamSqlRow input) { - System.out.println("TABLE_A: " + input); + System.out.println("PCOLLECTION: " + input); return null; } })); http://git-wip-us.apache.org/repos/asf/beam/blob/c5db2777/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlMathBinaryExpression.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlMathBinaryExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlMathBinaryExpression.java index 11b867a..f79bcf6 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlMathBinaryExpression.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlMathBinaryExpression.java @@ -57,7 +57,6 @@ public abstract class BeamSqlMathBinaryExpression extends BeamSqlExpression { /** * The method to check whether operands are numeric or not. - * @param opType */ public boolean isOperandNumeric(SqlTypeName opType) { return SqlTypeName.NUMERIC_TYPES.contains(opType); http://git-wip-us.apache.org/repos/asf/beam/blob/c5db2777/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamQueryPlanner.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamQueryPlanner.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamQueryPlanner.java index 2eaf9e7..6ae8a1e 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamQueryPlanner.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamQueryPlanner.java @@ -22,6 +22,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import org.apache.beam.dsls.sql.BeamSqlEnv; import org.apache.beam.dsls.sql.rel.BeamLogicalConvention; import org.apache.beam.dsls.sql.rel.BeamRelNode; import org.apache.beam.dsls.sql.schema.BaseBeamTable; @@ -106,12 +107,12 @@ public class BeamQueryPlanner { * which is linked with the given {@code pipeline}. The final output stream is returned as * {@code PCollection} so more operations can be applied. */ - public PCollection<BeamSqlRow> compileBeamPipeline(String sqlStatement, Pipeline basePipeline) - throws Exception { + public PCollection<BeamSqlRow> compileBeamPipeline(String sqlStatement, Pipeline basePipeline + , BeamSqlEnv sqlEnv) throws Exception { BeamRelNode relNode = convertToBeamRel(sqlStatement); // the input PCollectionTuple is empty, and be rebuilt in BeamIOSourceRel. - return relNode.buildBeamPipeline(PCollectionTuple.empty(basePipeline)); + return relNode.buildBeamPipeline(PCollectionTuple.empty(basePipeline), sqlEnv); } /** http://git-wip-us.apache.org/repos/asf/beam/blob/c5db2777/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java index bcdc44f..7a1d003a 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java @@ -19,6 +19,7 @@ package org.apache.beam.dsls.sql.rel; import java.util.ArrayList; import java.util.List; +import org.apache.beam.dsls.sql.BeamSqlEnv; import org.apache.beam.dsls.sql.schema.BeamSqlRecordType; import org.apache.beam.dsls.sql.schema.BeamSqlRow; import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder; @@ -72,13 +73,13 @@ public class BeamAggregationRel extends Aggregate implements BeamRelNode { } @Override - public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections) - throws Exception { + public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections + , BeamSqlEnv sqlEnv) throws Exception { RelNode input = getInput(); String stageName = BeamSqlRelUtils.getStageName(this); PCollection<BeamSqlRow> upstream = - BeamSqlRelUtils.getBeamRelInput(input).buildBeamPipeline(inputPCollections); + BeamSqlRelUtils.getBeamRelInput(input).buildBeamPipeline(inputPCollections, sqlEnv); if (windowFieldIdx != -1) { upstream = upstream.apply(stageName + "_assignEventTimestamp", WithTimestamps .<BeamSqlRow>of(new BeamAggregationTransforms.WindowTimestampFn(windowFieldIdx))) http://git-wip-us.apache.org/repos/asf/beam/blob/c5db2777/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamFilterRel.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamFilterRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamFilterRel.java index 40fe05c..07b5c7c 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamFilterRel.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamFilterRel.java @@ -17,6 +17,7 @@ */ package org.apache.beam.dsls.sql.rel; +import org.apache.beam.dsls.sql.BeamSqlEnv; import org.apache.beam.dsls.sql.interpreter.BeamSqlExpressionExecutor; import org.apache.beam.dsls.sql.interpreter.BeamSqlFnExecutor; import org.apache.beam.dsls.sql.schema.BeamSqlRow; @@ -49,14 +50,13 @@ public class BeamFilterRel extends Filter implements BeamRelNode { } @Override - public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections) - throws Exception { - + public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections + , BeamSqlEnv sqlEnv) throws Exception { RelNode input = getInput(); String stageName = BeamSqlRelUtils.getStageName(this); PCollection<BeamSqlRow> upstream = - BeamSqlRelUtils.getBeamRelInput(input).buildBeamPipeline(inputPCollections); + BeamSqlRelUtils.getBeamRelInput(input).buildBeamPipeline(inputPCollections, sqlEnv); BeamSqlExpressionExecutor executor = new BeamSqlFnExecutor(this); http://git-wip-us.apache.org/repos/asf/beam/blob/c5db2777/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSinkRel.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSinkRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSinkRel.java index 88fff63..58539f8 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSinkRel.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSinkRel.java @@ -56,18 +56,17 @@ public class BeamIOSinkRel extends TableModify implements BeamRelNode { * which is the persisted PCollection. */ @Override - public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections) - throws Exception { - + public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections + , BeamSqlEnv sqlEnv) throws Exception { RelNode input = getInput(); String stageName = BeamSqlRelUtils.getStageName(this); PCollection<BeamSqlRow> upstream = - BeamSqlRelUtils.getBeamRelInput(input).buildBeamPipeline(inputPCollections); + BeamSqlRelUtils.getBeamRelInput(input).buildBeamPipeline(inputPCollections, sqlEnv); String sourceName = Joiner.on('.').join(getTable().getQualifiedName()); - BaseBeamTable targetTable = BeamSqlEnv.findTable(sourceName); + BaseBeamTable targetTable = sqlEnv.findTable(sourceName); PDone streamEnd = upstream.apply(stageName, targetTable.buildIOWriter()); http://git-wip-us.apache.org/repos/asf/beam/blob/c5db2777/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSourceRel.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSourceRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSourceRel.java index ed2bf12..a664ce1 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSourceRel.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSourceRel.java @@ -40,9 +40,8 @@ public class BeamIOSourceRel extends TableScan implements BeamRelNode { } @Override - public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections) - throws Exception { - + public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections + , BeamSqlEnv sqlEnv) throws Exception { String sourceName = Joiner.on('.').join(getTable().getQualifiedName()); String stageName = BeamSqlRelUtils.getStageName(this); @@ -55,7 +54,7 @@ public class BeamIOSourceRel extends TableScan implements BeamRelNode { return sourceStream; } else { //If not, the source PColection is provided with BaseBeamTable.buildIOReader(). - BaseBeamTable sourceTable = BeamSqlEnv.findTable(sourceName); + BaseBeamTable sourceTable = sqlEnv.findTable(sourceName); return sourceTable.buildIOReader(inputPCollections.getPipeline()); } } http://git-wip-us.apache.org/repos/asf/beam/blob/c5db2777/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIntersectRel.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIntersectRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIntersectRel.java index 01e1c33..7cab171 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIntersectRel.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIntersectRel.java @@ -19,7 +19,7 @@ package org.apache.beam.dsls.sql.rel; import java.util.List; - +import org.apache.beam.dsls.sql.BeamSqlEnv; import org.apache.beam.dsls.sql.schema.BeamSqlRow; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionTuple; @@ -51,8 +51,8 @@ public class BeamIntersectRel extends Intersect implements BeamRelNode { return new BeamIntersectRel(getCluster(), traitSet, inputs, all); } - @Override public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections) - throws Exception { - return delegate.buildBeamPipeline(inputPCollections); + @Override public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections + , BeamSqlEnv sqlEnv) throws Exception { + return delegate.buildBeamPipeline(inputPCollections, sqlEnv); } } http://git-wip-us.apache.org/repos/asf/beam/blob/c5db2777/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamMinusRel.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamMinusRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamMinusRel.java index bee6c11..b558f4b 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamMinusRel.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamMinusRel.java @@ -19,7 +19,7 @@ package org.apache.beam.dsls.sql.rel; import java.util.List; - +import org.apache.beam.dsls.sql.BeamSqlEnv; import org.apache.beam.dsls.sql.schema.BeamSqlRow; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionTuple; @@ -49,8 +49,8 @@ public class BeamMinusRel extends Minus implements BeamRelNode { return new BeamMinusRel(getCluster(), traitSet, inputs, all); } - @Override public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections) - throws Exception { - return delegate.buildBeamPipeline(inputPCollections); + @Override public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections + , BeamSqlEnv sqlEnv) throws Exception { + return delegate.buildBeamPipeline(inputPCollections, sqlEnv); } } http://git-wip-us.apache.org/repos/asf/beam/blob/c5db2777/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamProjectRel.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamProjectRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamProjectRel.java index e6331c6..2cdfc72 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamProjectRel.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamProjectRel.java @@ -18,6 +18,7 @@ package org.apache.beam.dsls.sql.rel; import java.util.List; +import org.apache.beam.dsls.sql.BeamSqlEnv; import org.apache.beam.dsls.sql.interpreter.BeamSqlExpressionExecutor; import org.apache.beam.dsls.sql.interpreter.BeamSqlFnExecutor; import org.apache.beam.dsls.sql.schema.BeamSqlRow; @@ -59,13 +60,13 @@ public class BeamProjectRel extends Project implements BeamRelNode { } @Override - public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections) - throws Exception { + public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections + , BeamSqlEnv sqlEnv) throws Exception { RelNode input = getInput(); String stageName = BeamSqlRelUtils.getStageName(this); PCollection<BeamSqlRow> upstream = - BeamSqlRelUtils.getBeamRelInput(input).buildBeamPipeline(inputPCollections); + BeamSqlRelUtils.getBeamRelInput(input).buildBeamPipeline(inputPCollections, sqlEnv); BeamSqlExpressionExecutor executor = new BeamSqlFnExecutor(this); http://git-wip-us.apache.org/repos/asf/beam/blob/c5db2777/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamRelNode.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamRelNode.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamRelNode.java index aed4b06..d4c98a3 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamRelNode.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamRelNode.java @@ -17,14 +17,14 @@ */ package org.apache.beam.dsls.sql.rel; +import org.apache.beam.dsls.sql.BeamSqlEnv; import org.apache.beam.dsls.sql.schema.BeamSqlRow; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionTuple; import org.apache.calcite.rel.RelNode; /** - * A new method {@link #buildBeamPipeline(PCollectionTuple)} is added, it's - * called by {@code BeamQueryPlanner}. + * A new method {@link #buildBeamPipeline(PCollectionTuple, BeamSqlEnv)} is added. */ public interface BeamRelNode extends RelNode { @@ -33,5 +33,6 @@ public interface BeamRelNode extends RelNode { * {@code BeamQueryPlanner} visits it with a DFS(Depth-First-Search) * algorithm. */ - PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections) throws Exception; + PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections, BeamSqlEnv sqlEnv) + throws Exception; } http://git-wip-us.apache.org/repos/asf/beam/blob/c5db2777/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSetOperatorRelBase.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSetOperatorRelBase.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSetOperatorRelBase.java index 3d41e3a..939c9c8 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSetOperatorRelBase.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSetOperatorRelBase.java @@ -20,7 +20,7 @@ package org.apache.beam.dsls.sql.rel; import java.io.Serializable; import java.util.List; - +import org.apache.beam.dsls.sql.BeamSqlEnv; import org.apache.beam.dsls.sql.schema.BeamSqlRow; import org.apache.beam.dsls.sql.transform.BeamSetOperatorsTransforms; import org.apache.beam.sdk.transforms.MapElements; @@ -62,12 +62,12 @@ public class BeamSetOperatorRelBase { this.all = all; } - public PCollection<BeamSqlRow> buildBeamPipeline( - PCollectionTuple inputPCollections) throws Exception { + public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections + , BeamSqlEnv sqlEnv) throws Exception { PCollection<BeamSqlRow> leftRows = BeamSqlRelUtils.getBeamRelInput(inputs.get(0)) - .buildBeamPipeline(inputPCollections); + .buildBeamPipeline(inputPCollections, sqlEnv); PCollection<BeamSqlRow> rightRows = BeamSqlRelUtils.getBeamRelInput(inputs.get(1)) - .buildBeamPipeline(inputPCollections); + .buildBeamPipeline(inputPCollections, sqlEnv); WindowFn leftWindow = leftRows.getWindowingStrategy().getWindowFn(); WindowFn rightWindow = rightRows.getWindowingStrategy().getWindowFn(); http://git-wip-us.apache.org/repos/asf/beam/blob/c5db2777/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSortRel.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSortRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSortRel.java index 6c7be0b..75f9717 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSortRel.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSortRel.java @@ -24,6 +24,7 @@ import java.math.BigDecimal; import java.util.ArrayList; import java.util.Comparator; import java.util.List; +import org.apache.beam.dsls.sql.BeamSqlEnv; import org.apache.beam.dsls.sql.schema.BeamSqlRow; import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder; import org.apache.beam.dsls.sql.utils.CalciteUtils; @@ -119,11 +120,11 @@ public class BeamSortRel extends Sort implements BeamRelNode { } } - @Override public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections) - throws Exception { + @Override public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections + , BeamSqlEnv sqlEnv) throws Exception { RelNode input = getInput(); PCollection<BeamSqlRow> upstream = BeamSqlRelUtils.getBeamRelInput(input) - .buildBeamPipeline(inputPCollections); + .buildBeamPipeline(inputPCollections, sqlEnv); Type windowType = upstream.getWindowingStrategy().getWindowFn() .getWindowTypeDescriptor().getType(); if (!windowType.equals(GlobalWindow.class)) { http://git-wip-us.apache.org/repos/asf/beam/blob/c5db2777/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamUnionRel.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamUnionRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamUnionRel.java index 63cf11a..c661585 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamUnionRel.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamUnionRel.java @@ -19,7 +19,7 @@ package org.apache.beam.dsls.sql.rel; import java.util.List; - +import org.apache.beam.dsls.sql.BeamSqlEnv; import org.apache.beam.dsls.sql.schema.BeamSqlRow; import org.apache.beam.sdk.transforms.windowing.WindowFn; import org.apache.beam.sdk.values.PCollection; @@ -81,8 +81,8 @@ public class BeamUnionRel extends Union implements BeamRelNode { return new BeamUnionRel(getCluster(), traitSet, inputs, all); } - @Override public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections) - throws Exception { - return delegate.buildBeamPipeline(inputPCollections); + @Override public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections + , BeamSqlEnv sqlEnv) throws Exception { + return delegate.buildBeamPipeline(inputPCollections, sqlEnv); } } http://git-wip-us.apache.org/repos/asf/beam/blob/c5db2777/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamValuesRel.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamValuesRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamValuesRel.java index ce75768..030d2c8 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamValuesRel.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamValuesRel.java @@ -22,7 +22,7 @@ import com.google.common.collect.ImmutableList; import java.util.ArrayList; import java.util.List; - +import org.apache.beam.dsls.sql.BeamSqlEnv; import org.apache.beam.dsls.sql.schema.BeamSqlRecordType; import org.apache.beam.dsls.sql.schema.BeamSqlRow; import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder; @@ -57,8 +57,8 @@ public class BeamValuesRel extends Values implements BeamRelNode { } - @Override public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections) - throws Exception { + @Override public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections + , BeamSqlEnv sqlEnv) throws Exception { List<BeamSqlRow> rows = new ArrayList<>(tuples.size()); String stageName = BeamSqlRelUtils.getStageName(this); if (tuples.isEmpty()) { http://git-wip-us.apache.org/repos/asf/beam/blob/c5db2777/dsls/sql/src/main/java/org/apache/beam/dsls/sql/utils/CalciteUtils.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/utils/CalciteUtils.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/utils/CalciteUtils.java index 69ca44b..ac395d3 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/utils/CalciteUtils.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/utils/CalciteUtils.java @@ -74,7 +74,6 @@ public class CalciteUtils { /** * Get the {@code SqlTypeName} for the specified column of a table. - * @return */ public static SqlTypeName getFieldType(BeamSqlRecordType schema, int index) { return toCalciteType(schema.getFieldsType().get(index)); http://git-wip-us.apache.org/repos/asf/beam/blob/c5db2777/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamIntersectRelTest.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamIntersectRelTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamIntersectRelTest.java index 02223c2..47fdc16 100644 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamIntersectRelTest.java +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamIntersectRelTest.java @@ -34,6 +34,8 @@ import org.junit.Test; * Test for {@code BeamIntersectRel}. */ public class BeamIntersectRelTest { + static BeamSqlEnv sqlEnv = new BeamSqlEnv(); + @Rule public final TestPipeline pipeline = TestPipeline.create(); private static MockedBeamSqlTable orderDetailsTable1 = MockedBeamSqlTable @@ -57,8 +59,8 @@ public class BeamIntersectRelTest { @BeforeClass public static void setUp() { - BeamSqlEnv.registerTable("ORDER_DETAILS1", orderDetailsTable1); - BeamSqlEnv.registerTable("ORDER_DETAILS2", orderDetailsTable2); + sqlEnv.registerTable("ORDER_DETAILS1", orderDetailsTable1); + sqlEnv.registerTable("ORDER_DETAILS2", orderDetailsTable2); } @Test @@ -70,7 +72,7 @@ public class BeamIntersectRelTest { + "SELECT order_id, site_id, price " + "FROM ORDER_DETAILS2 "; - PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline); + PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv); PAssert.that(rows).containsInAnyOrder( MockedBeamSqlTable.of( SqlTypeName.BIGINT, "order_id", @@ -93,7 +95,7 @@ public class BeamIntersectRelTest { + "SELECT order_id, site_id, price " + "FROM ORDER_DETAILS2 "; - PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline); + PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv); PAssert.that(rows).satisfies(new CheckSize(3)); PAssert.that(rows).containsInAnyOrder( http://git-wip-us.apache.org/repos/asf/beam/blob/c5db2777/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamMinusRelTest.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamMinusRelTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamMinusRelTest.java index cd6ba16..688ff8e 100644 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamMinusRelTest.java +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamMinusRelTest.java @@ -34,6 +34,8 @@ import org.junit.Test; * Test for {@code BeamMinusRel}. */ public class BeamMinusRelTest { + static BeamSqlEnv sqlEnv = new BeamSqlEnv(); + @Rule public final TestPipeline pipeline = TestPipeline.create(); private MockedBeamSqlTable orderDetailsTable1 = MockedBeamSqlTable @@ -58,8 +60,8 @@ public class BeamMinusRelTest { @Before public void setUp() { - BeamSqlEnv.registerTable("ORDER_DETAILS1", orderDetailsTable1); - BeamSqlEnv.registerTable("ORDER_DETAILS2", orderDetailsTable2); + sqlEnv.registerTable("ORDER_DETAILS1", orderDetailsTable1); + sqlEnv.registerTable("ORDER_DETAILS2", orderDetailsTable2); MockedBeamSqlTable.CONTENT.clear(); } @@ -72,7 +74,7 @@ public class BeamMinusRelTest { + "SELECT order_id, site_id, price " + "FROM ORDER_DETAILS2 "; - PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline); + PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv); PAssert.that(rows).containsInAnyOrder( MockedBeamSqlTable.of( SqlTypeName.BIGINT, "order_id", @@ -93,7 +95,7 @@ public class BeamMinusRelTest { + "SELECT order_id, site_id, price " + "FROM ORDER_DETAILS2 "; - PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline); + PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv); PAssert.that(rows).satisfies(new CheckSize(2)); PAssert.that(rows).containsInAnyOrder( http://git-wip-us.apache.org/repos/asf/beam/blob/c5db2777/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamSetOperatorRelBaseTest.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamSetOperatorRelBaseTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamSetOperatorRelBaseTest.java index 4936062..f10a767 100644 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamSetOperatorRelBaseTest.java +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamSetOperatorRelBaseTest.java @@ -42,6 +42,8 @@ import org.junit.Test; * Test for {@code BeamSetOperatorRelBase}. */ public class BeamSetOperatorRelBaseTest { + static BeamSqlEnv sqlEnv = new BeamSqlEnv(); + @Rule public final TestPipeline pipeline = TestPipeline.create(); public static final Date THE_DATE = new Date(); @@ -57,7 +59,7 @@ public class BeamSetOperatorRelBaseTest { @BeforeClass public static void prepare() { THE_DATE.setTime(100000); - BeamSqlEnv.registerTable("ORDER_DETAILS", orderDetailsTable); + sqlEnv.registerTable("ORDER_DETAILS", orderDetailsTable); } @Test @@ -71,7 +73,7 @@ public class BeamSetOperatorRelBaseTest { + "FROM ORDER_DETAILS GROUP BY order_id, site_id" + ", TUMBLE(order_time, INTERVAL '1' HOUR) "; - PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline); + PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv); List<BeamSqlRow> expRows = MockedBeamSqlTable.of( SqlTypeName.BIGINT, "order_id", @@ -100,7 +102,7 @@ public class BeamSetOperatorRelBaseTest { // use a real pipeline rather than the TestPipeline because we are // testing exceptions, the pipeline will not actually run. Pipeline pipeline1 = Pipeline.create(PipelineOptionsFactory.create()); - BeamSqlCli.compilePipeline(sql, pipeline1); + BeamSqlCli.compilePipeline(sql, pipeline1, sqlEnv); pipeline.run(); } http://git-wip-us.apache.org/repos/asf/beam/blob/c5db2777/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamSortRelTest.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamSortRelTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamSortRelTest.java index cfdbd53..2519984 100644 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamSortRelTest.java +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamSortRelTest.java @@ -36,6 +36,8 @@ import org.junit.Test; * Test for {@code BeamSortRel}. */ public class BeamSortRelTest { + static BeamSqlEnv sqlEnv = new BeamSqlEnv(); + @Rule public final TestPipeline pipeline = TestPipeline.create(); @@ -69,7 +71,7 @@ public class BeamSortRelTest { + "ORDER BY order_id asc, site_id desc limit 4"; System.out.println(sql); - BeamSqlCli.compilePipeline(sql, pipeline); + BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv); pipeline.run().waitUntilFinish(); assertEquals( @@ -86,7 +88,7 @@ public class BeamSortRelTest { @Test public void testOrderBy_nullsFirst() throws Exception { - BeamSqlEnv.registerTable("ORDER_DETAILS", MockedBeamSqlTable + sqlEnv.registerTable("ORDER_DETAILS", MockedBeamSqlTable .of(SqlTypeName.BIGINT, "order_id", SqlTypeName.INTEGER, "site_id", SqlTypeName.DOUBLE, "price", @@ -96,7 +98,7 @@ public class BeamSortRelTest { 2L, 1, 3.0, 2L, null, 4.0, 5L, 5, 5.0)); - BeamSqlEnv.registerTable("SUB_ORDER_RAM", MockedBeamSqlTable + sqlEnv.registerTable("SUB_ORDER_RAM", MockedBeamSqlTable .of(SqlTypeName.BIGINT, "order_id", SqlTypeName.INTEGER, "site_id", SqlTypeName.DOUBLE, "price")); @@ -106,7 +108,7 @@ public class BeamSortRelTest { + "FROM ORDER_DETAILS " + "ORDER BY order_id asc, site_id desc NULLS FIRST limit 4"; - BeamSqlCli.compilePipeline(sql, pipeline); + BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv); pipeline.run().waitUntilFinish(); assertEquals( @@ -124,7 +126,7 @@ public class BeamSortRelTest { @Test public void testOrderBy_nullsLast() throws Exception { - BeamSqlEnv.registerTable("ORDER_DETAILS", MockedBeamSqlTable + sqlEnv.registerTable("ORDER_DETAILS", MockedBeamSqlTable .of(SqlTypeName.BIGINT, "order_id", SqlTypeName.INTEGER, "site_id", SqlTypeName.DOUBLE, "price", @@ -134,7 +136,7 @@ public class BeamSortRelTest { 2L, 1, 3.0, 2L, null, 4.0, 5L, 5, 5.0)); - BeamSqlEnv.registerTable("SUB_ORDER_RAM", MockedBeamSqlTable + sqlEnv.registerTable("SUB_ORDER_RAM", MockedBeamSqlTable .of(SqlTypeName.BIGINT, "order_id", SqlTypeName.INTEGER, "site_id", SqlTypeName.DOUBLE, "price")); @@ -144,7 +146,7 @@ public class BeamSortRelTest { + "FROM ORDER_DETAILS " + "ORDER BY order_id asc, site_id desc NULLS LAST limit 4"; - BeamSqlCli.compilePipeline(sql, pipeline); + BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv); pipeline.run().waitUntilFinish(); assertEquals( @@ -167,7 +169,7 @@ public class BeamSortRelTest { + "FROM ORDER_DETAILS " + "ORDER BY order_id asc, site_id desc limit 4 offset 4"; - BeamSqlCli.compilePipeline(sql, pipeline); + BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv); pipeline.run().waitUntilFinish(); assertEquals( @@ -190,7 +192,7 @@ public class BeamSortRelTest { + "FROM ORDER_DETAILS " + "ORDER BY order_id asc, site_id desc limit 11"; - BeamSqlCli.compilePipeline(sql, pipeline); + BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv); pipeline.run().waitUntilFinish(); assertEquals( @@ -221,13 +223,13 @@ public class BeamSortRelTest { + "ORDER BY order_id asc limit 11"; TestPipeline pipeline = TestPipeline.create(); - BeamSqlCli.compilePipeline(sql, pipeline); + BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv); } @Before public void prepare() { - BeamSqlEnv.registerTable("ORDER_DETAILS", orderDetailTable); - BeamSqlEnv.registerTable("SUB_ORDER_RAM", subOrderRamTable); + sqlEnv.registerTable("ORDER_DETAILS", orderDetailTable); + sqlEnv.registerTable("SUB_ORDER_RAM", subOrderRamTable); MockedBeamSqlTable.CONTENT.clear(); } http://git-wip-us.apache.org/repos/asf/beam/blob/c5db2777/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamUnionRelTest.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamUnionRelTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamUnionRelTest.java index c2a0597..c5aa132 100644 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamUnionRelTest.java +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamUnionRelTest.java @@ -34,6 +34,8 @@ import org.junit.Test; * Test for {@code BeamUnionRel}. */ public class BeamUnionRelTest { + static BeamSqlEnv sqlEnv = new BeamSqlEnv(); + @Rule public final TestPipeline pipeline = TestPipeline.create(); private static MockedBeamSqlTable orderDetailsTable = MockedBeamSqlTable @@ -46,7 +48,7 @@ public class BeamUnionRelTest { @BeforeClass public static void prepare() { - BeamSqlEnv.registerTable("ORDER_DETAILS", orderDetailsTable); + sqlEnv.registerTable("ORDER_DETAILS", orderDetailsTable); } @Test @@ -58,7 +60,7 @@ public class BeamUnionRelTest { + " order_id, site_id, price " + "FROM ORDER_DETAILS "; - PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline); + PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv); PAssert.that(rows).containsInAnyOrder( MockedBeamSqlTable.of( SqlTypeName.BIGINT, "order_id", @@ -81,7 +83,7 @@ public class BeamUnionRelTest { + " SELECT order_id, site_id, price " + "FROM ORDER_DETAILS"; - PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline); + PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv); PAssert.that(rows).containsInAnyOrder( MockedBeamSqlTable.of( SqlTypeName.BIGINT, "order_id", http://git-wip-us.apache.org/repos/asf/beam/blob/c5db2777/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamValuesRelTest.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamValuesRelTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamValuesRelTest.java index 4557e8e..9a5070a 100644 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamValuesRelTest.java +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamValuesRelTest.java @@ -35,6 +35,8 @@ import org.junit.Test; * Test for {@code BeamValuesRel}. */ public class BeamValuesRelTest { + static BeamSqlEnv sqlEnv = new BeamSqlEnv(); + @Rule public final TestPipeline pipeline = TestPipeline.create(); private static MockedBeamSqlTable stringTable = MockedBeamSqlTable @@ -49,7 +51,7 @@ public class BeamValuesRelTest { public void testValues() throws Exception { String sql = "insert into string_table(name, description) values " + "('hello', 'world'), ('james', 'bond')"; - PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline); + PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv); PAssert.that(rows).containsInAnyOrder(MockedBeamSqlTable.of( SqlTypeName.VARCHAR, "name", SqlTypeName.VARCHAR, "description", @@ -61,7 +63,7 @@ public class BeamValuesRelTest { @Test public void testValues_castInt() throws Exception { String sql = "insert into int_table (c0, c1) values(cast(1 as int), cast(2 as int))"; - PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline); + PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv); PAssert.that(rows).containsInAnyOrder(MockedBeamSqlTable.of( SqlTypeName.INTEGER, "c0", SqlTypeName.INTEGER, "c1", @@ -73,7 +75,7 @@ public class BeamValuesRelTest { @Test public void testValues_onlySelect() throws Exception { String sql = "select 1, '1'"; - PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline); + PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv); PAssert.that(rows).containsInAnyOrder(MockedBeamSqlTable.of( SqlTypeName.INTEGER, "EXPR$0", SqlTypeName.CHAR, "EXPR$1", @@ -84,8 +86,8 @@ public class BeamValuesRelTest { @BeforeClass public static void prepareClass() { - BeamSqlEnv.registerTable("string_table", stringTable); - BeamSqlEnv.registerTable("int_table", intTable); + sqlEnv.registerTable("string_table", stringTable); + sqlEnv.registerTable("int_table", intTable); } @Before
