This is an automated email from the ASF dual-hosted git repository.
mingmxu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 14b17ad [BEAM-3634] Refactor BeamRelNode to return a PTransform (#4705)
14b17ad is described below
commit 14b17ad574342a875c8f99278e18c605aa5b4bc3
Author: Kenn Knowles <[email protected]>
AuthorDate: Fri Feb 23 16:39:21 2018 -0800
[BEAM-3634] Refactor BeamRelNode to return a PTransform (#4705)
* Refactor BeamRelNode to return a PTransform
* Refactor SQL toPTransform to have no parameters
---
.../beam/sdk/extensions/sql/QueryTransform.java | 3 +-
.../beam/sdk/extensions/sql/impl/BeamSqlEnv.java | 13 +-
.../sql/impl/planner/BeamQueryPlanner.java | 21 +-
.../extensions/sql/impl/planner/BeamRuleSets.java | 54 ++--
.../sql/impl/rel/BeamAggregationRel.java | 19 +-
.../sdk/extensions/sql/impl/rel/BeamFilterRel.java | 34 ++-
.../sdk/extensions/sql/impl/rel/BeamIOSinkRel.java | 85 ++++--
.../extensions/sql/impl/rel/BeamIOSourceRel.java | 45 +--
.../extensions/sql/impl/rel/BeamIntersectRel.java | 33 ++-
.../sdk/extensions/sql/impl/rel/BeamJoinRel.java | 328 ++++++++++++---------
.../sdk/extensions/sql/impl/rel/BeamMinusRel.java | 16 +-
.../extensions/sql/impl/rel/BeamProjectRel.java | 37 ++-
.../sdk/extensions/sql/impl/rel/BeamRelNode.java | 15 +-
.../sql/impl/rel/BeamSetOperatorRelBase.java | 14 +-
.../sdk/extensions/sql/impl/rel/BeamSortRel.java | 71 +++--
.../sdk/extensions/sql/impl/rel/BeamUnionRel.java | 15 +-
.../sdk/extensions/sql/impl/rel/BeamValuesRel.java | 40 +--
.../extensions/sql/impl/rule/BeamIOSinkRule.java | 58 ++--
.../extensions/sql/impl/rule/BeamIOSourceRule.java | 30 +-
.../sdk/extensions/sql/impl/rule/BeamJoinRule.java | 33 ++-
.../extensions/sql/impl/rule/BeamProjectRule.java | 37 +--
.../sdk/extensions/sql/BeamSqlDslJoinTest.java | 6 +-
.../interpreter/BeamSqlFnExecutorTestBase.java | 42 +--
.../transform/agg/VarianceAccumulatorTest.java | 6 +-
.../sql/impl/transform/agg/VarianceFnTest.java | 3 +-
25 files changed, 599 insertions(+), 459 deletions(-)
diff --git
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/QueryTransform.java
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/QueryTransform.java
index 8a7335f..67ccbb1 100644
---
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/QueryTransform.java
+++
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/QueryTransform.java
@@ -67,10 +67,11 @@ public abstract class QueryTransform extends
PTransform<PInput, PCollection<Row>
try {
return
+ inputTuple.apply(
sqlEnv
.getPlanner()
.convertToBeamRel(queryString())
- .buildBeamPipeline(inputTuple, sqlEnv);
+ .toPTransform());
} catch (Exception e) {
throw new IllegalStateException(e);
}
diff --git
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java
index 6c4c9a4..e81b927 100644
---
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java
+++
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java
@@ -55,11 +55,10 @@ import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.tools.Frameworks;
/**
- * {@link BeamSqlEnv} prepares the execution context for {@link BeamSql} and
- * {@link BeamSqlCli}.
+ * {@link BeamSqlEnv} prepares the execution context for {@link BeamSql} and
{@link BeamSqlCli}.
*
- * <p>It contains a {@link SchemaPlus} which holds the metadata of tables/UDF
functions,
- * and a {@link BeamQueryPlanner} which parse/validate/optimize/translate
input SQL queries.
+ * <p>It contains a {@link SchemaPlus} which holds the metadata of tables/UDF
functions, and a
+ * {@link BeamQueryPlanner} which parse/validate/optimize/translate input SQL
queries.
*/
public class BeamSqlEnv implements Serializable {
transient SchemaPlus schema;
@@ -69,7 +68,7 @@ public class BeamSqlEnv implements Serializable {
public BeamSqlEnv() {
tables = new HashMap<>(16);
schema = Frameworks.createRootSchema(true);
- planner = new BeamQueryPlanner(schema);
+ planner = new BeamQueryPlanner(this, schema);
}
/**
@@ -151,7 +150,7 @@ public class BeamSqlEnv implements Serializable {
schema.add(tableName, new BeamCalciteTable(table.getRowType()));
}
}
- planner = new BeamQueryPlanner(schema);
+ planner = new BeamQueryPlanner(this, schema);
}
/**
@@ -216,6 +215,6 @@ public class BeamSqlEnv implements Serializable {
tables = new HashMap<String, BeamSqlTable>(16);
schema = Frameworks.createRootSchema(true);
- planner = new BeamQueryPlanner(schema);
+ planner = new BeamQueryPlanner(this, schema);
}
}
diff --git
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/BeamQueryPlanner.java
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/BeamQueryPlanner.java
index b9b0fdb..7858c5c 100644
---
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/BeamQueryPlanner.java
+++
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/BeamQueryPlanner.java
@@ -74,7 +74,7 @@ public class BeamQueryPlanner {
public static final JavaTypeFactory TYPE_FACTORY = new JavaTypeFactoryImpl(
RelDataTypeSystem.DEFAULT);
- public BeamQueryPlanner(SchemaPlus schema) {
+ public BeamQueryPlanner(BeamSqlEnv sqlEnv, SchemaPlus schema) {
String defaultCharsetKey = "saffron.default.charset";
if (System.getProperty(defaultCharsetKey) == null) {
System.setProperty(defaultCharsetKey,
ConversionUtil.NATIVE_UTF16_CHARSET_NAME);
@@ -94,12 +94,17 @@ public class BeamQueryPlanner {
new CalciteCatalogReader(
CalciteSchema.from(schema), Collections.emptyList(), TYPE_FACTORY,
null));
- FrameworkConfig config = Frameworks.newConfigBuilder()
-
.parserConfig(SqlParser.configBuilder().setLex(Lex.MYSQL).build()).defaultSchema(schema)
-
.traitDefs(traitDefs).context(Contexts.EMPTY_CONTEXT).ruleSets(BeamRuleSets.getRuleSets())
-
.costFactory(null).typeSystem(BeamRelDataTypeSystem.BEAM_REL_DATATYPE_SYSTEM)
- .operatorTable(new ChainedSqlOperatorTable(sqlOperatorTables))
- .build();
+ FrameworkConfig config =
+ Frameworks.newConfigBuilder()
+ .parserConfig(SqlParser.configBuilder().setLex(Lex.MYSQL).build())
+ .defaultSchema(schema)
+ .traitDefs(traitDefs)
+ .context(Contexts.EMPTY_CONTEXT)
+ .ruleSets(BeamRuleSets.getRuleSets(sqlEnv))
+ .costFactory(null)
+ .typeSystem(BeamRelDataTypeSystem.BEAM_REL_DATATYPE_SYSTEM)
+ .operatorTable(new ChainedSqlOperatorTable(sqlOperatorTables))
+ .build();
this.planner = Frameworks.getPlanner(config);
for (String t : schema.getTableNames()) {
@@ -124,7 +129,7 @@ public class BeamQueryPlanner {
BeamRelNode relNode = convertToBeamRel(sqlStatement);
// the input PCollectionTuple is empty, and be rebuilt in BeamIOSourceRel.
- return relNode.buildBeamPipeline(PCollectionTuple.empty(basePipeline),
sqlEnv);
+ return PCollectionTuple.empty(basePipeline).apply(relNode.toPTransform());
}
/**
diff --git
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/BeamRuleSets.java
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/BeamRuleSets.java
index d3c9871..1d10816 100644
---
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/BeamRuleSets.java
+++
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/BeamRuleSets.java
@@ -17,9 +17,7 @@
*/
package org.apache.beam.sdk.extensions.sql.impl.planner;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
-import java.util.Iterator;
+import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode;
import org.apache.beam.sdk.extensions.sql.impl.rule.BeamAggregationRule;
import org.apache.beam.sdk.extensions.sql.impl.rule.BeamFilterRule;
@@ -32,44 +30,30 @@ import
org.apache.beam.sdk.extensions.sql.impl.rule.BeamProjectRule;
import org.apache.beam.sdk.extensions.sql.impl.rule.BeamSortRule;
import org.apache.beam.sdk.extensions.sql.impl.rule.BeamUnionRule;
import org.apache.beam.sdk.extensions.sql.impl.rule.BeamValuesRule;
-import org.apache.calcite.plan.RelOptRule;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.tools.RuleSet;
+import org.apache.calcite.tools.RuleSets;
/**
- * {@link RuleSet} used in {@link BeamQueryPlanner}. It translates a standard
- * Calcite {@link RelNode} tree, to represent with {@link BeamRelNode}
- *
+ * {@link RuleSet} used in {@link BeamQueryPlanner}. It translates a standard
Calcite {@link
+ * RelNode} tree, to represent with {@link BeamRelNode}
*/
public class BeamRuleSets {
- private static final ImmutableSet<RelOptRule> calciteToBeamConversionRules =
ImmutableSet
- .<RelOptRule>builder().add(BeamIOSourceRule.INSTANCE,
BeamProjectRule.INSTANCE,
- BeamFilterRule.INSTANCE, BeamIOSinkRule.INSTANCE,
- BeamAggregationRule.INSTANCE, BeamSortRule.INSTANCE,
BeamValuesRule.INSTANCE,
- BeamIntersectRule.INSTANCE, BeamMinusRule.INSTANCE,
BeamUnionRule.INSTANCE,
- BeamJoinRule.INSTANCE)
- .build();
- public static RuleSet[] getRuleSets() {
- return new RuleSet[] { new BeamRuleSet(
-
ImmutableSet.<RelOptRule>builder().addAll(calciteToBeamConversionRules).build())
};
+ public static RuleSet[] getRuleSets(BeamSqlEnv sqlEnv) {
+ return new RuleSet[] {
+ RuleSets.ofList(
+ BeamIOSourceRule.forSqlEnv(sqlEnv),
+ BeamProjectRule.INSTANCE,
+ BeamFilterRule.INSTANCE,
+ BeamIOSinkRule.forSqlEnv(sqlEnv),
+ BeamAggregationRule.INSTANCE,
+ BeamSortRule.INSTANCE,
+ BeamValuesRule.INSTANCE,
+ BeamIntersectRule.INSTANCE,
+ BeamMinusRule.INSTANCE,
+ BeamUnionRule.INSTANCE,
+ BeamJoinRule.forSqlEnv(sqlEnv))
+ };
}
-
- private static class BeamRuleSet implements RuleSet {
- final ImmutableSet<RelOptRule> rules;
-
- public BeamRuleSet(ImmutableSet<RelOptRule> rules) {
- this.rules = rules;
- }
-
- public BeamRuleSet(ImmutableList<RelOptRule> rules) {
- this.rules = ImmutableSet.<RelOptRule>builder().addAll(rules).build();
- }
-
- @Override
- public Iterator<RelOptRule> iterator() {
- return rules.iterator();
- }
- }
-
}
diff --git
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamAggregationRel.java
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamAggregationRel.java
index f1fb12d..a35e64c 100644
---
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamAggregationRel.java
+++
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamAggregationRel.java
@@ -24,12 +24,12 @@ import java.util.List;
import java.util.Optional;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.RowCoder;
-import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
import org.apache.beam.sdk.extensions.sql.impl.rule.AggregateWindowField;
import
org.apache.beam.sdk.extensions.sql.impl.transform.BeamAggregationTransforms;
import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.transforms.WithTimestamps;
@@ -77,15 +77,19 @@ public class BeamAggregationRel extends Aggregate
implements BeamRelNode {
}
@Override
- public PCollection<Row> buildBeamPipeline(
- PCollectionTuple inputPCollections,
- BeamSqlEnv sqlEnv) throws Exception {
+ public PTransform<PCollectionTuple, PCollection<Row>> toPTransform() {
+ return new Transform();
+ }
+
+ private class Transform extends PTransform<PCollectionTuple,
PCollection<Row>> {
+
+ public PCollection<Row> expand(PCollectionTuple inputPCollections) {
RelNode input = getInput();
- String stageName = BeamSqlRelUtils.getStageName(this) + "_";
+ String stageName = BeamSqlRelUtils.getStageName(BeamAggregationRel.this)
+ "_";
- PCollection<Row> upstream =
-
BeamSqlRelUtils.getBeamRelInput(input).buildBeamPipeline(inputPCollections,
sqlEnv);
+ PCollection<Row> upstream =
+
inputPCollections.apply(BeamSqlRelUtils.getBeamRelInput(input).toPTransform());
if (windowField.isPresent()) {
upstream = upstream.apply(stageName + "assignEventTimestamp",
WithTimestamps
.of(new
BeamAggregationTransforms.WindowTimestampFn(windowFieldIndex))
@@ -188,6 +192,7 @@ public class BeamAggregationRel extends Aggregate
implements BeamRelNode {
RowType
.newField(aggCall.name,
CalciteUtils.toCoder(aggCall.type.getSqlTypeName()));
}
+ }
@Override
public Aggregate copy(
diff --git
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamFilterRel.java
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamFilterRel.java
index d684636..ec21a9b 100644
---
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamFilterRel.java
+++
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamFilterRel.java
@@ -17,11 +17,11 @@
*/
package org.apache.beam.sdk.extensions.sql.impl.rel;
-import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
import
org.apache.beam.sdk.extensions.sql.impl.interpreter.BeamSqlExpressionExecutor;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.BeamSqlFnExecutor;
import org.apache.beam.sdk.extensions.sql.impl.transform.BeamSqlFilterFn;
import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
+import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
@@ -49,23 +49,27 @@ public class BeamFilterRel extends Filter implements
BeamRelNode {
}
@Override
- public PCollection<Row> buildBeamPipeline(PCollectionTuple inputPCollections
- , BeamSqlEnv sqlEnv) throws Exception {
- RelNode input = getInput();
- String stageName = BeamSqlRelUtils.getStageName(this);
+ public PTransform<PCollectionTuple, PCollection<Row>> toPTransform() {
+ return new Transform();
+ }
- PCollection<Row> upstream =
-
BeamSqlRelUtils.getBeamRelInput(input).buildBeamPipeline(inputPCollections,
sqlEnv);
+ private class Transform extends PTransform<PCollectionTuple,
PCollection<Row>> {
- BeamSqlExpressionExecutor executor = new BeamSqlFnExecutor(this);
+ @Override
+ public PCollection<Row> expand(PCollectionTuple inputPCollections) {
+ RelNode input = getInput();
+ String stageName = BeamSqlRelUtils.getStageName(BeamFilterRel.this);
- PCollection<Row> filterStream = upstream
- .apply(
- stageName,
- ParDo.of(new BeamSqlFilterFn(getRelTypeName(), executor)));
-
filterStream.setCoder(CalciteUtils.toBeamRowType(getRowType()).getRowCoder());
+ PCollection<Row> upstream =
+
inputPCollections.apply(BeamSqlRelUtils.getBeamRelInput(input).toPTransform());
- return filterStream;
- }
+ BeamSqlExpressionExecutor executor = new
BeamSqlFnExecutor(BeamFilterRel.this);
+ PCollection<Row> filterStream =
+ upstream.apply(stageName, ParDo.of(new
BeamSqlFilterFn(getRelTypeName(), executor)));
+
filterStream.setCoder(CalciteUtils.toBeamRowType(getRowType()).getRowCoder());
+
+ return filterStream;
+ }
+ }
}
diff --git
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSinkRel.java
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSinkRel.java
index d38b2ac..6afa8b1 100644
---
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSinkRel.java
+++
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSinkRel.java
@@ -21,6 +21,7 @@ import com.google.common.base.Joiner;
import java.util.List;
import org.apache.beam.sdk.extensions.sql.BeamSqlTable;
import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
+import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.Row;
@@ -32,44 +33,76 @@ import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.TableModify;
import org.apache.calcite.rex.RexNode;
-/**
- * BeamRelNode to replace a {@code TableModify} node.
- *
- */
+/** BeamRelNode to replace a {@code TableModify} node. */
public class BeamIOSinkRel extends TableModify implements BeamRelNode {
- public BeamIOSinkRel(RelOptCluster cluster, RelTraitSet traits, RelOptTable
table,
- Prepare.CatalogReader catalogReader, RelNode child, Operation operation,
- List<String> updateColumnList, List<RexNode> sourceExpressionList,
boolean flattened) {
- super(cluster, traits, table, catalogReader, child, operation,
updateColumnList,
- sourceExpressionList, flattened);
+
+ private final BeamSqlEnv sqlEnv;
+
+ public BeamIOSinkRel(
+ BeamSqlEnv sqlEnv,
+ RelOptCluster cluster,
+ RelTraitSet traits,
+ RelOptTable table,
+ Prepare.CatalogReader catalogReader,
+ RelNode child,
+ Operation operation,
+ List<String> updateColumnList,
+ List<RexNode> sourceExpressionList,
+ boolean flattened) {
+ super(
+ cluster,
+ traits,
+ table,
+ catalogReader,
+ child,
+ operation,
+ updateColumnList,
+ sourceExpressionList,
+ flattened);
+ this.sqlEnv = sqlEnv;
}
@Override
public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
- return new BeamIOSinkRel(getCluster(), traitSet, getTable(),
getCatalogReader(), sole(inputs),
- getOperation(), getUpdateColumnList(), getSourceExpressionList(),
isFlattened());
+ return new BeamIOSinkRel(
+ sqlEnv,
+ getCluster(),
+ traitSet,
+ getTable(),
+ getCatalogReader(),
+ sole(inputs),
+ getOperation(),
+ getUpdateColumnList(),
+ getSourceExpressionList(),
+ isFlattened());
}
- /**
- * Note that {@code BeamIOSinkRel} returns the input PCollection,
- * which is the persisted PCollection.
- */
@Override
- public PCollection<Row> buildBeamPipeline(PCollectionTuple inputPCollections
- , BeamSqlEnv sqlEnv) throws Exception {
- RelNode input = getInput();
- String stageName = BeamSqlRelUtils.getStageName(this);
+ public PTransform<PCollectionTuple, PCollection<Row>> toPTransform() {
+ return new Transform();
+ }
- PCollection<Row> upstream =
-
BeamSqlRelUtils.getBeamRelInput(input).buildBeamPipeline(inputPCollections,
sqlEnv);
+ private class Transform extends PTransform<PCollectionTuple,
PCollection<Row>> {
- String sourceName = Joiner.on('.').join(getTable().getQualifiedName());
+ /**
+ * Note that {@code BeamIOSinkRel} returns the input PCollection, which is
the persisted
+ * PCollection.
+ */
+ @Override
+ public PCollection<Row> expand(PCollectionTuple inputPCollections) {
+ RelNode input = getInput();
+ String stageName = BeamSqlRelUtils.getStageName(BeamIOSinkRel.this);
- BeamSqlTable targetTable = sqlEnv.findTable(sourceName);
+ PCollection<Row> upstream =
+
inputPCollections.apply(BeamSqlRelUtils.getBeamRelInput(input).toPTransform());
- upstream.apply(stageName, targetTable.buildIOWriter());
+ String sourceName = Joiner.on('.').join(getTable().getQualifiedName());
- return upstream;
- }
+ BeamSqlTable targetTable = sqlEnv.findTable(sourceName);
+ upstream.apply(stageName, targetTable.buildIOWriter());
+
+ return upstream;
+ }
+ }
}
diff --git
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSourceRel.java
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSourceRel.java
index 23f6a4f..64bfd26 100644
---
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSourceRel.java
+++
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSourceRel.java
@@ -21,6 +21,7 @@ import com.google.common.base.Joiner;
import org.apache.beam.sdk.extensions.sql.BeamSqlTable;
import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
+import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.Row;
@@ -36,27 +37,37 @@ import org.apache.calcite.rel.core.TableScan;
*/
public class BeamIOSourceRel extends TableScan implements BeamRelNode {
- public BeamIOSourceRel(RelOptCluster cluster, RelTraitSet traitSet,
RelOptTable table) {
+ private BeamSqlEnv sqlEnv;
+
+ public BeamIOSourceRel(
+ BeamSqlEnv sqlEnv, RelOptCluster cluster, RelTraitSet traitSet,
RelOptTable table) {
super(cluster, traitSet, table);
+ this.sqlEnv = sqlEnv;
}
@Override
- public PCollection<Row> buildBeamPipeline(PCollectionTuple inputPCollections
- , BeamSqlEnv sqlEnv) throws Exception {
- String sourceName = Joiner.on('.').join(getTable().getQualifiedName());
-
- TupleTag<Row> sourceTupleTag = new TupleTag<>(sourceName);
- if (inputPCollections.has(sourceTupleTag)) {
- //choose PCollection from input PCollectionTuple if exists there.
- PCollection<Row> sourceStream = inputPCollections
- .get(new TupleTag<Row>(sourceName));
- return sourceStream;
- } else {
- //If not, the source PColection is provided with
BaseBeamTable.buildIOReader().
- BeamSqlTable sourceTable = sqlEnv.findTable(sourceName);
- return sourceTable.buildIOReader(inputPCollections.getPipeline())
- .setCoder(CalciteUtils.toBeamRowType(getRowType()).getRowCoder());
- }
+ public PTransform<PCollectionTuple, PCollection<Row>> toPTransform() {
+ return new Transform();
}
+ private class Transform extends PTransform<PCollectionTuple,
PCollection<Row>> {
+
+ @Override
+ public PCollection<Row> expand(PCollectionTuple inputPCollections) {
+ String sourceName = Joiner.on('.').join(getTable().getQualifiedName());
+
+ TupleTag<Row> sourceTupleTag = new TupleTag<>(sourceName);
+ if (inputPCollections.has(sourceTupleTag)) {
+ // choose PCollection from input PCollectionTuple if exists there.
+ PCollection<Row> sourceStream = inputPCollections.get(new
TupleTag<Row>(sourceName));
+ return sourceStream;
+ } else {
+ // If not, the source PColection is provided with
BaseBeamTable.buildIOReader().
+ BeamSqlTable sourceTable = sqlEnv.findTable(sourceName);
+ return sourceTable
+ .buildIOReader(inputPCollections.getPipeline())
+ .setCoder(CalciteUtils.toBeamRowType(getRowType()).getRowCoder());
+ }
+ }
+ }
}
diff --git
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIntersectRel.java
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIntersectRel.java
index 7c28ea7..b5002ea 100644
---
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIntersectRel.java
+++
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIntersectRel.java
@@ -19,7 +19,7 @@
package org.apache.beam.sdk.extensions.sql.impl.rel;
import java.util.List;
-import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
+import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.Row;
@@ -32,27 +32,34 @@ import org.apache.calcite.rel.core.SetOp;
/**
* {@code BeamRelNode} to replace a {@code Intersect} node.
*
- * <p>This is used to combine two SELECT statements, but returns rows only
from the
- * first SELECT statement that are identical to a row in the second SELECT
statement.
+ * <p>This is used to combine two SELECT statements, but returns rows only
from the first SELECT
+ * statement that are identical to a row in the second SELECT statement.
*/
public class BeamIntersectRel extends Intersect implements BeamRelNode {
private BeamSetOperatorRelBase delegate;
+
public BeamIntersectRel(
- RelOptCluster cluster,
- RelTraitSet traits,
- List<RelNode> inputs,
- boolean all) {
+ RelOptCluster cluster, RelTraitSet traits, List<RelNode> inputs, boolean
all) {
super(cluster, traits, inputs, all);
- delegate = new BeamSetOperatorRelBase(this,
- BeamSetOperatorRelBase.OpType.INTERSECT, inputs, all);
+ delegate =
+ new BeamSetOperatorRelBase(this,
BeamSetOperatorRelBase.OpType.INTERSECT, inputs, all);
}
- @Override public SetOp copy(RelTraitSet traitSet, List<RelNode> inputs,
boolean all) {
+ @Override
+ public SetOp copy(RelTraitSet traitSet, List<RelNode> inputs, boolean all) {
return new BeamIntersectRel(getCluster(), traitSet, inputs, all);
}
- @Override public PCollection<Row> buildBeamPipeline(PCollectionTuple
inputPCollections
- , BeamSqlEnv sqlEnv) throws Exception {
- return delegate.buildBeamPipeline(inputPCollections, sqlEnv);
+ @Override
+ public PTransform<PCollectionTuple, PCollection<Row>> toPTransform() {
+ return new Transform();
+ }
+
+ private class Transform extends PTransform<PCollectionTuple,
PCollection<Row>> {
+
+ @Override
+ public PCollection<Row> expand(PCollectionTuple inputPCollections) {
+ return delegate.buildBeamPipeline(inputPCollections);
+ }
}
}
diff --git
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java
index 89196ef..615463b 100644
---
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java
+++
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java
@@ -35,6 +35,7 @@ import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
import org.apache.beam.sdk.extensions.sql.impl.transform.BeamJoinTransforms;
import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
@@ -92,128 +93,153 @@ import org.apache.calcite.util.Pair;
* </ul>
*/
public class BeamJoinRel extends Join implements BeamRelNode {
- public BeamJoinRel(RelOptCluster cluster, RelTraitSet traits, RelNode left,
RelNode right,
- RexNode condition, Set<CorrelationId> variablesSet, JoinRelType
joinType) {
+ private final BeamSqlEnv sqlEnv;
+
+ public BeamJoinRel(
+ BeamSqlEnv sqlEnv,
+ RelOptCluster cluster,
+ RelTraitSet traits,
+ RelNode left,
+ RelNode right,
+ RexNode condition,
+ Set<CorrelationId> variablesSet,
+ JoinRelType joinType) {
super(cluster, traits, left, right, condition, variablesSet, joinType);
+ this.sqlEnv = sqlEnv;
}
- @Override public Join copy(RelTraitSet traitSet, RexNode conditionExpr,
RelNode left,
- RelNode right, JoinRelType joinType, boolean semiJoinDone) {
- return new BeamJoinRel(getCluster(), traitSet, left, right, conditionExpr,
variablesSet,
+ @Override
+ public Join copy(
+ RelTraitSet traitSet,
+ RexNode conditionExpr,
+ RelNode left,
+ RelNode right,
+ JoinRelType joinType,
+ boolean semiJoinDone) {
+ return new BeamJoinRel(sqlEnv, getCluster(), traitSet, left, right,
conditionExpr, variablesSet,
joinType);
}
- @Override public PCollection<Row> buildBeamPipeline(PCollectionTuple
inputPCollections,
- BeamSqlEnv sqlEnv)
- throws Exception {
- BeamRelNode leftRelNode = BeamSqlRelUtils.getBeamRelInput(left);
- RowType leftRowType = CalciteUtils.toBeamRowType(left.getRowType());
- final BeamRelNode rightRelNode = BeamSqlRelUtils.getBeamRelInput(right);
+ @Override
+ public PTransform<PCollectionTuple, PCollection<Row>> toPTransform() {
+ return new Transform();
+ }
- if (!seekable(leftRelNode, sqlEnv) && seekable(rightRelNode, sqlEnv)) {
- return joinAsLookup(leftRelNode, rightRelNode, inputPCollections, sqlEnv)
-
.setCoder(CalciteUtils.toBeamRowType(getRowType()).getRowCoder());
- }
+ private class Transform extends PTransform<PCollectionTuple,
PCollection<Row>> {
- PCollection<Row> leftRows =
leftRelNode.buildBeamPipeline(inputPCollections, sqlEnv);
- PCollection<Row> rightRows =
rightRelNode.buildBeamPipeline(inputPCollections, sqlEnv);
-
- verifySupportedTrigger(leftRows);
- verifySupportedTrigger(rightRows);
-
- String stageName = BeamSqlRelUtils.getStageName(this);
- WindowFn leftWinFn = leftRows.getWindowingStrategy().getWindowFn();
- WindowFn rightWinFn = rightRows.getWindowingStrategy().getWindowFn();
-
- // extract the join fields
- List<Pair<Integer, Integer>> pairs = extractJoinColumns(
- leftRelNode.getRowType().getFieldCount());
-
- // build the extract key type
- // the name of the join field is not important
- RowType extractKeyRowType =
- pairs
- .stream()
- .map(pair ->
- RowType.newField(
- leftRowType.getFieldName(pair.getKey()),
- leftRowType.getFieldCoder(pair.getKey())))
- .collect(toRowType());
-
- Coder extractKeyRowCoder = extractKeyRowType.getRowCoder();
-
- // BeamSqlRow -> KV<BeamSqlRow, BeamSqlRow>
- PCollection<KV<Row, Row>> extractedLeftRows = leftRows
- .apply(stageName + "_left_ExtractJoinFields",
- MapElements.via(new BeamJoinTransforms.ExtractJoinFields(true,
pairs)))
- .setCoder(KvCoder.of(extractKeyRowCoder, leftRows.getCoder()));
-
- PCollection<KV<Row, Row>> extractedRightRows = rightRows
- .apply(stageName + "_right_ExtractJoinFields",
- MapElements.via(new BeamJoinTransforms.ExtractJoinFields(false,
pairs)))
- .setCoder(KvCoder.of(extractKeyRowCoder, rightRows.getCoder()));
-
- // prepare the NullRows
- Row leftNullRow = buildNullRow(leftRelNode);
- Row rightNullRow = buildNullRow(rightRelNode);
-
- // a regular join
- if ((leftRows.isBounded() == PCollection.IsBounded.BOUNDED
- && rightRows.isBounded() == PCollection.IsBounded.BOUNDED)
- || (leftRows.isBounded() == UNBOUNDED
- && rightRows.isBounded() == UNBOUNDED)) {
- try {
- leftWinFn.verifyCompatibility(rightWinFn);
- } catch (IncompatibleWindowException e) {
- throw new IllegalArgumentException(
- "WindowFns must match for a
bounded-vs-bounded/unbounded-vs-unbounded join.", e);
- }
+ @Override
+ public PCollection<Row> expand(PCollectionTuple inputPCollections) {
+ BeamRelNode leftRelNode = BeamSqlRelUtils.getBeamRelInput(left);
+ RowType leftRowType = CalciteUtils.toBeamRowType(left.getRowType());
+ final BeamRelNode rightRelNode = BeamSqlRelUtils.getBeamRelInput(right);
- return standardJoin(extractedLeftRows, extractedRightRows,
- leftNullRow, rightNullRow, stageName);
- } else if (
- (leftRows.isBounded() == PCollection.IsBounded.BOUNDED
- && rightRows.isBounded() == UNBOUNDED)
- || (leftRows.isBounded() == UNBOUNDED
- && rightRows.isBounded() == PCollection.IsBounded.BOUNDED)
- ) {
- // if one of the sides is Bounded & the other is Unbounded
- // then do a sideInput join
- // when doing a sideInput join, the windowFn does not need to match
- // Only support INNER JOIN & LEFT OUTER JOIN where left side of the join
must be
- // the unbounded
- if (joinType == JoinRelType.FULL) {
- throw new UnsupportedOperationException("FULL OUTER JOIN is not
supported when join "
- + "a bounded table with an unbounded table.");
+ if (!seekable(leftRelNode, sqlEnv) && seekable(rightRelNode, sqlEnv)) {
+ return joinAsLookup(leftRelNode, rightRelNode, inputPCollections,
sqlEnv)
+ .setCoder(CalciteUtils.toBeamRowType(getRowType()).getRowCoder());
}
- if ((joinType == JoinRelType.LEFT
- && leftRows.isBounded() == PCollection.IsBounded.BOUNDED)
- || (joinType == JoinRelType.RIGHT
- && rightRows.isBounded() == PCollection.IsBounded.BOUNDED)) {
+ PCollection<Row> leftRows = inputPCollections.apply("left",
leftRelNode.toPTransform());
+ PCollection<Row> rightRows =
+ inputPCollections.apply("right", rightRelNode.toPTransform());
+
+ verifySupportedTrigger(leftRows);
+ verifySupportedTrigger(rightRows);
+
+ String stageName = BeamSqlRelUtils.getStageName(BeamJoinRel.this);
+ WindowFn leftWinFn = leftRows.getWindowingStrategy().getWindowFn();
+ WindowFn rightWinFn = rightRows.getWindowingStrategy().getWindowFn();
+
+ // extract the join fields
+ List<Pair<Integer, Integer>> pairs =
+ extractJoinColumns(leftRelNode.getRowType().getFieldCount());
+
+ // build the extract key type
+ // the name of the join field is not important
+ RowType extractKeyRowType =
+ pairs
+ .stream()
+ .map(
+ pair ->
+ RowType.newField(
+ leftRowType.getFieldName(pair.getKey()),
+ leftRowType.getFieldCoder(pair.getKey())))
+ .collect(toRowType());
+
+ Coder extractKeyRowCoder = extractKeyRowType.getRowCoder();
+
+ // BeamSqlRow -> KV<BeamSqlRow, BeamSqlRow>
+ PCollection<KV<Row, Row>> extractedLeftRows =
+ leftRows
+ .apply(
+ stageName + "_left_ExtractJoinFields",
+ MapElements.via(new
BeamJoinTransforms.ExtractJoinFields(true, pairs)))
+ .setCoder(KvCoder.of(extractKeyRowCoder, leftRows.getCoder()));
+
+ PCollection<KV<Row, Row>> extractedRightRows =
+ rightRows
+ .apply(
+ stageName + "_right_ExtractJoinFields",
+ MapElements.via(new
BeamJoinTransforms.ExtractJoinFields(false, pairs)))
+ .setCoder(KvCoder.of(extractKeyRowCoder, rightRows.getCoder()));
+
+ // prepare the NullRows
+ Row leftNullRow = buildNullRow(leftRelNode);
+ Row rightNullRow = buildNullRow(rightRelNode);
+
+ // a regular join
+ if ((leftRows.isBounded() == PCollection.IsBounded.BOUNDED
+ && rightRows.isBounded() == PCollection.IsBounded.BOUNDED)
+ || (leftRows.isBounded() == UNBOUNDED && rightRows.isBounded() ==
UNBOUNDED)) {
+ try {
+ leftWinFn.verifyCompatibility(rightWinFn);
+ } catch (IncompatibleWindowException e) {
+ throw new IllegalArgumentException(
+ "WindowFns must match for a
bounded-vs-bounded/unbounded-vs-unbounded join.", e);
+ }
+
+ return standardJoin(
+ extractedLeftRows, extractedRightRows, leftNullRow, rightNullRow,
stageName);
+ } else if ((leftRows.isBounded() == PCollection.IsBounded.BOUNDED
+ && rightRows.isBounded() == UNBOUNDED)
+ || (leftRows.isBounded() == UNBOUNDED
+ && rightRows.isBounded() == PCollection.IsBounded.BOUNDED)) {
+ // if one of the sides is Bounded & the other is Unbounded
+ // then do a sideInput join
+ // when doing a sideInput join, the windowFn does not need to match
+ // Only support INNER JOIN & LEFT OUTER JOIN where left side of the
join must be
+ // the unbounded
+ if (joinType == JoinRelType.FULL) {
+ throw new UnsupportedOperationException(
+ "FULL OUTER JOIN is not supported when join "
+ + "a bounded table with an unbounded table.");
+ }
+
+ if ((joinType == JoinRelType.LEFT && leftRows.isBounded() ==
PCollection.IsBounded.BOUNDED)
+ || (joinType == JoinRelType.RIGHT
+ && rightRows.isBounded() == PCollection.IsBounded.BOUNDED)) {
+ throw new UnsupportedOperationException(
+ "LEFT side of an OUTER JOIN must be Unbounded table.");
+ }
+
+ return sideInputJoin(extractedLeftRows, extractedRightRows,
leftNullRow, rightNullRow);
+ } else {
throw new UnsupportedOperationException(
- "LEFT side of an OUTER JOIN must be Unbounded table.");
+ "The inputs to the JOIN have un-joinnable windowFns: " + leftWinFn
+ ", " + rightWinFn);
}
-
- return sideInputJoin(extractedLeftRows, extractedRightRows,
- leftNullRow, rightNullRow);
- } else {
- throw new UnsupportedOperationException(
- "The inputs to the JOIN have un-joinnable windowFns: " + leftWinFn +
", " + rightWinFn);
}
}
private void verifySupportedTrigger(PCollection<Row> pCollection) {
WindowingStrategy windowingStrategy = pCollection.getWindowingStrategy();
- if (UNBOUNDED.equals(pCollection.isBounded())
- && !triggersOncePerWindow(windowingStrategy)) {
+ if (UNBOUNDED.equals(pCollection.isBounded()) &&
!triggersOncePerWindow(windowingStrategy)) {
throw new UnsupportedOperationException(
"Joining unbounded PCollections is currently only supported for "
+ "non-global windows with triggers that are known to produce
output once per window,"
+ "such as the default trigger with zero allowed lateness. "
+ "In these cases Beam can guarantee it joins all input elements
once per window. "
- + windowingStrategy + " is not supported");
+ + windowingStrategy
+ + " is not supported");
}
}
@@ -228,69 +254,79 @@ public class BeamJoinRel extends Join implements
BeamRelNode {
private PCollection<Row> standardJoin(
PCollection<KV<Row, Row>> extractedLeftRows,
PCollection<KV<Row, Row>> extractedRightRows,
- Row leftNullRow, Row rightNullRow, String stageName) {
+ Row leftNullRow,
+ Row rightNullRow,
+ String stageName) {
PCollection<KV<Row, KV<Row, Row>>> joinedRows = null;
switch (joinType) {
case LEFT:
- joinedRows = org.apache.beam.sdk.extensions.joinlibrary.Join
- .leftOuterJoin(extractedLeftRows, extractedRightRows,
rightNullRow);
+ joinedRows =
+ org.apache.beam.sdk.extensions.joinlibrary.Join.leftOuterJoin(
+ extractedLeftRows, extractedRightRows, rightNullRow);
break;
case RIGHT:
- joinedRows = org.apache.beam.sdk.extensions.joinlibrary.Join
- .rightOuterJoin(extractedLeftRows, extractedRightRows,
leftNullRow);
+ joinedRows =
+ org.apache.beam.sdk.extensions.joinlibrary.Join.rightOuterJoin(
+ extractedLeftRows, extractedRightRows, leftNullRow);
break;
case FULL:
- joinedRows = org.apache.beam.sdk.extensions.joinlibrary.Join
- .fullOuterJoin(extractedLeftRows, extractedRightRows, leftNullRow,
- rightNullRow);
+ joinedRows =
+ org.apache.beam.sdk.extensions.joinlibrary.Join.fullOuterJoin(
+ extractedLeftRows, extractedRightRows, leftNullRow,
rightNullRow);
break;
case INNER:
default:
- joinedRows = org.apache.beam.sdk.extensions.joinlibrary.Join
- .innerJoin(extractedLeftRows, extractedRightRows);
+ joinedRows =
+ org.apache.beam.sdk.extensions.joinlibrary.Join.innerJoin(
+ extractedLeftRows, extractedRightRows);
break;
}
- PCollection<Row> ret = joinedRows
- .apply(stageName + "_JoinParts2WholeRow",
- MapElements.via(new BeamJoinTransforms.JoinParts2WholeRow()))
- .setCoder(CalciteUtils.toBeamRowType(getRowType()).getRowCoder());
+ PCollection<Row> ret =
+ joinedRows
+ .apply(
+ stageName + "_JoinParts2WholeRow",
+ MapElements.via(new BeamJoinTransforms.JoinParts2WholeRow()))
+ .setCoder(CalciteUtils.toBeamRowType(getRowType()).getRowCoder());
return ret;
}
public PCollection<Row> sideInputJoin(
PCollection<KV<Row, Row>> extractedLeftRows,
PCollection<KV<Row, Row>> extractedRightRows,
- Row leftNullRow, Row rightNullRow) {
+ Row leftNullRow,
+ Row rightNullRow) {
// we always make the Unbounded table on the left to do the sideInput join
// (will convert the result accordingly before return)
boolean swapped = (extractedLeftRows.isBounded() ==
PCollection.IsBounded.BOUNDED);
JoinRelType realJoinType =
(swapped && joinType != JoinRelType.INNER) ? JoinRelType.LEFT :
joinType;
- PCollection<KV<Row, Row>> realLeftRows =
- swapped ? extractedRightRows : extractedLeftRows;
- PCollection<KV<Row, Row>> realRightRows =
- swapped ? extractedLeftRows : extractedRightRows;
+ PCollection<KV<Row, Row>> realLeftRows = swapped ? extractedRightRows :
extractedLeftRows;
+ PCollection<KV<Row, Row>> realRightRows = swapped ? extractedLeftRows :
extractedRightRows;
Row realRightNullRow = swapped ? leftNullRow : rightNullRow;
// swapped still need to pass down because, we need to swap the result
back.
- return sideInputJoinHelper(realJoinType, realLeftRows, realRightRows,
- realRightNullRow, swapped);
+ return sideInputJoinHelper(
+ realJoinType, realLeftRows, realRightRows, realRightNullRow, swapped);
}
private PCollection<Row> sideInputJoinHelper(
JoinRelType joinType,
PCollection<KV<Row, Row>> leftRows,
PCollection<KV<Row, Row>> rightRows,
- Row rightNullRow, boolean swapped) {
- final PCollectionView<Map<Row, Iterable<Row>>> rowsView =
- rightRows.apply(View.asMultimap());
-
- PCollection<Row> ret = leftRows
- .apply(ParDo.of(new BeamJoinTransforms.SideInputJoinDoFn(
- joinType, rightNullRow, rowsView,
swapped)).withSideInputs(rowsView))
- .setCoder(CalciteUtils.toBeamRowType(getRowType()).getRowCoder());
+ Row rightNullRow,
+ boolean swapped) {
+ final PCollectionView<Map<Row, Iterable<Row>>> rowsView =
rightRows.apply(View.asMultimap());
+
+ PCollection<Row> ret =
+ leftRows
+ .apply(
+ ParDo.of(
+ new BeamJoinTransforms.SideInputJoinDoFn(
+ joinType, rightNullRow, rowsView, swapped))
+ .withSideInputs(rowsView))
+ .setCoder(CalciteUtils.toBeamRowType(getRowType()).getRowCoder());
return ret;
}
@@ -324,28 +360,34 @@ public class BeamJoinRel extends Join implements
BeamRelNode {
return pairs;
}
- private Pair<Integer, Integer> extractOneJoinColumn(RexCall oneCondition,
- int leftRowColumnCount) {
+ private Pair<Integer, Integer> extractOneJoinColumn(
+ RexCall oneCondition, int leftRowColumnCount) {
List<RexNode> operands = oneCondition.getOperands();
- final int leftIndex = Math.min(((RexInputRef) operands.get(0)).getIndex(),
- ((RexInputRef) operands.get(1)).getIndex());
+ final int leftIndex =
+ Math.min(
+ ((RexInputRef) operands.get(0)).getIndex(), ((RexInputRef)
operands.get(1)).getIndex());
- final int rightIndex1 = Math.max(((RexInputRef)
operands.get(0)).getIndex(),
- ((RexInputRef) operands.get(1)).getIndex());
+ final int rightIndex1 =
+ Math.max(
+ ((RexInputRef) operands.get(0)).getIndex(), ((RexInputRef)
operands.get(1)).getIndex());
final int rightIndex = rightIndex1 - leftRowColumnCount;
return new Pair<>(leftIndex, rightIndex);
}
- private PCollection<Row> joinAsLookup(BeamRelNode leftRelNode,
- BeamRelNode rightRelNode,
- PCollectionTuple inputPCollections,
- BeamSqlEnv sqlEnv) throws Exception {
- PCollection<Row> factStream =
leftRelNode.buildBeamPipeline(inputPCollections, sqlEnv);
+ private PCollection<Row> joinAsLookup(
+ BeamRelNode leftRelNode,
+ BeamRelNode rightRelNode,
+ PCollectionTuple inputPCollections,
+ BeamSqlEnv sqlEnv) {
+ PCollection<Row> factStream =
inputPCollections.apply(leftRelNode.toPTransform());
BeamSqlSeekableTable seekableTable =
getSeekableTableFromRelNode(rightRelNode, sqlEnv);
- return factStream.apply("join_as_lookup",
- new BeamJoinTransforms.JoinAsLookup(condition, seekableTable,
+ return factStream.apply(
+ "join_as_lookup",
+ new BeamJoinTransforms.JoinAsLookup(
+ condition,
+ seekableTable,
CalciteUtils.toBeamRowType(rightRelNode.getRowType()),
CalciteUtils.toBeamRowType(leftRelNode.getRowType()).getFieldCount()));
}
@@ -357,9 +399,7 @@ public class BeamJoinRel extends Join implements
BeamRelNode {
return (BeamSqlSeekableTable) sourceTable;
}
- /**
- * check if {@code BeamRelNode} implements {@code BeamSeekableTable}.
- */
+ /** check if {@code BeamRelNode} implements {@code BeamSeekableTable}. */
private boolean seekable(BeamRelNode relNode, BeamSqlEnv sqlEnv) {
if (relNode instanceof BeamIOSourceRel) {
BeamIOSourceRel srcRel = (BeamIOSourceRel) relNode;
@@ -370,5 +410,5 @@ public class BeamJoinRel extends Join implements
BeamRelNode {
}
}
return false;
-}
+ }
}
diff --git
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamMinusRel.java
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamMinusRel.java
index 9fdafda..7f4fc2c 100644
---
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamMinusRel.java
+++
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamMinusRel.java
@@ -19,7 +19,7 @@
package org.apache.beam.sdk.extensions.sql.impl.rel;
import java.util.List;
-import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
+import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.Row;
@@ -49,8 +49,16 @@ public class BeamMinusRel extends Minus implements
BeamRelNode {
return new BeamMinusRel(getCluster(), traitSet, inputs, all);
}
- @Override public PCollection<Row> buildBeamPipeline(PCollectionTuple
inputPCollections
- , BeamSqlEnv sqlEnv) throws Exception {
- return delegate.buildBeamPipeline(inputPCollections, sqlEnv);
+ @Override
+ public PTransform<PCollectionTuple, PCollection<Row>> toPTransform() {
+ return new Transform();
+ }
+
+ private class Transform extends PTransform<PCollectionTuple,
PCollection<Row>> {
+
+ @Override
+ public PCollection<Row> expand(PCollectionTuple inputPCollections) {
+ return delegate.buildBeamPipeline(inputPCollections);
+ }
}
}
diff --git
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamProjectRel.java
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamProjectRel.java
index ea89874..b8bf644 100644
---
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamProjectRel.java
+++
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamProjectRel.java
@@ -18,11 +18,11 @@
package org.apache.beam.sdk.extensions.sql.impl.rel;
import java.util.List;
-import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
import
org.apache.beam.sdk.extensions.sql.impl.interpreter.BeamSqlExpressionExecutor;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.BeamSqlFnExecutor;
import org.apache.beam.sdk.extensions.sql.impl.transform.BeamSqlProjectFn;
import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
+import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
@@ -59,22 +59,31 @@ public class BeamProjectRel extends Project implements
BeamRelNode {
}
@Override
- public PCollection<Row> buildBeamPipeline(PCollectionTuple inputPCollections
- , BeamSqlEnv sqlEnv) throws Exception {
- RelNode input = getInput();
- String stageName = BeamSqlRelUtils.getStageName(this);
+ public PTransform<PCollectionTuple, PCollection<Row>> toPTransform() {
+ return new Transform();
+ }
- PCollection<Row> upstream =
-
BeamSqlRelUtils.getBeamRelInput(input).buildBeamPipeline(inputPCollections,
sqlEnv);
+ private class Transform extends PTransform<PCollectionTuple,
PCollection<Row>> {
- BeamSqlExpressionExecutor executor = new BeamSqlFnExecutor(this);
+ @Override
+ public PCollection<Row> expand(PCollectionTuple inputPCollections) {
+ RelNode input = getInput();
+ String stageName = BeamSqlRelUtils.getStageName(BeamProjectRel.this);
- PCollection<Row> projectStream = upstream.apply(stageName, ParDo
- .of(new BeamSqlProjectFn(getRelTypeName(), executor,
- CalciteUtils.toBeamRowType(rowType))));
-
projectStream.setCoder(CalciteUtils.toBeamRowType(getRowType()).getRowCoder());
+ PCollection<Row> upstream =
+
inputPCollections.apply(BeamSqlRelUtils.getBeamRelInput(input).toPTransform());
- return projectStream;
- }
+ BeamSqlExpressionExecutor executor = new
BeamSqlFnExecutor(BeamProjectRel.this);
+ PCollection<Row> projectStream =
+ upstream.apply(
+ stageName,
+ ParDo.of(
+ new BeamSqlProjectFn(
+ getRelTypeName(), executor,
CalciteUtils.toBeamRowType(rowType))));
+
projectStream.setCoder(CalciteUtils.toBeamRowType(getRowType()).getRowCoder());
+
+ return projectStream;
+ }
+ }
}
diff --git
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamRelNode.java
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamRelNode.java
index aa56745..1cdde33 100644
---
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamRelNode.java
+++
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamRelNode.java
@@ -17,23 +17,18 @@
*/
package org.apache.beam.sdk.extensions.sql.impl.rel;
-import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
+import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.Row;
import org.apache.calcite.rel.RelNode;
-/**
- * A new method {@link #buildBeamPipeline(PCollectionTuple, BeamSqlEnv)} is
added.
- */
+/** A {@link RelNode} that can also give a {@link PTransform} that implements
the expression. */
public interface BeamRelNode extends RelNode {
/**
- * A {@link BeamRelNode} is a recursive structure, the
- * {@code BeamQueryPlanner} visits it with a DFS(Depth-First-Search)
- * algorithm.
+ * A {@link BeamRelNode} is a recursive structure, the {@code
BeamQueryPlanner} visits it with a
+ * DFS(Depth-First-Search) algorithm.
*/
- PCollection<Row> buildBeamPipeline(
- PCollectionTuple inputPCollections, BeamSqlEnv sqlEnv)
- throws Exception;
+ PTransform<PCollectionTuple, PCollection<Row>> toPTransform();
}
diff --git
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSetOperatorRelBase.java
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSetOperatorRelBase.java
index 0a9af42..ea536ac 100644
---
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSetOperatorRelBase.java
+++
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSetOperatorRelBase.java
@@ -20,7 +20,6 @@ package org.apache.beam.sdk.extensions.sql.impl.rel;
import java.io.Serializable;
import java.util.List;
-import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
import
org.apache.beam.sdk.extensions.sql.impl.transform.BeamSetOperatorsTransforms;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
@@ -62,12 +61,13 @@ public class BeamSetOperatorRelBase {
this.all = all;
}
- public PCollection<Row> buildBeamPipeline(PCollectionTuple inputPCollections
- , BeamSqlEnv sqlEnv) throws Exception {
- PCollection<Row> leftRows = BeamSqlRelUtils.getBeamRelInput(inputs.get(0))
- .buildBeamPipeline(inputPCollections, sqlEnv);
- PCollection<Row> rightRows = BeamSqlRelUtils.getBeamRelInput(inputs.get(1))
- .buildBeamPipeline(inputPCollections, sqlEnv);
+ public PCollection<Row> buildBeamPipeline(PCollectionTuple
inputPCollections) {
+ PCollection<Row> leftRows =
+ inputPCollections.apply(
+ "left",
BeamSqlRelUtils.getBeamRelInput(inputs.get(0)).toPTransform());
+ PCollection<Row> rightRows =
+ inputPCollections.apply(
+ "right",
BeamSqlRelUtils.getBeamRelInput(inputs.get(1)).toPTransform());
WindowFn leftWindow = leftRows.getWindowingStrategy().getWindowFn();
WindowFn rightWindow = rightRows.getWindowingStrategy().getWindowFn();
diff --git
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRel.java
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRel.java
index 16cdc7e..0633668 100644
---
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRel.java
+++
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRel.java
@@ -25,10 +25,10 @@ import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import org.apache.beam.sdk.coders.ListCoder;
-import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Top;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
@@ -119,41 +119,49 @@ public class BeamSortRel extends Sort implements
BeamRelNode {
}
}
- @Override public PCollection<Row> buildBeamPipeline(PCollectionTuple
inputPCollections
- , BeamSqlEnv sqlEnv) throws Exception {
- RelNode input = getInput();
- PCollection<Row> upstream = BeamSqlRelUtils.getBeamRelInput(input)
- .buildBeamPipeline(inputPCollections, sqlEnv);
- Type windowType = upstream.getWindowingStrategy().getWindowFn()
- .getWindowTypeDescriptor().getType();
- if (!windowType.equals(GlobalWindow.class)) {
- throw new UnsupportedOperationException(
- "`ORDER BY` is only supported for GlobalWindow, actual window: " +
windowType);
- }
+ @Override
+ public PTransform<PCollectionTuple, PCollection<Row>> toPTransform() {
+ return new Transform();
+ }
- BeamSqlRowComparator comparator = new BeamSqlRowComparator(fieldIndices,
orientation,
- nullsFirst);
- // first find the top (offset + count)
- PCollection<List<Row>> rawStream =
- upstream
- .apply(
- "extractTopOffsetAndFetch",
- Top.of(startIndex + count, comparator).withoutDefaults())
- .setCoder(ListCoder.of(upstream.getCoder()));
-
- // strip the `leading offset`
- if (startIndex > 0) {
- rawStream =
- rawStream
+ private class Transform extends PTransform<PCollectionTuple,
PCollection<Row>> {
+
+ @Override
+ public PCollection<Row> expand(PCollectionTuple inputPCollections) {
+ RelNode input = getInput();
+ PCollection<Row> upstream =
+
inputPCollections.apply(BeamSqlRelUtils.getBeamRelInput(input).toPTransform());
+ Type windowType =
+
upstream.getWindowingStrategy().getWindowFn().getWindowTypeDescriptor().getType();
+ if (!windowType.equals(GlobalWindow.class)) {
+ throw new UnsupportedOperationException(
+ "`ORDER BY` is only supported for GlobalWindow, actual window: " +
windowType);
+ }
+
+ BeamSqlRowComparator comparator =
+ new BeamSqlRowComparator(fieldIndices, orientation, nullsFirst);
+ // first find the top (offset + count)
+ PCollection<List<Row>> rawStream =
+ upstream
.apply(
- "stripLeadingOffset", ParDo.of(new SubListFn<>(startIndex,
startIndex + count)))
+ "extractTopOffsetAndFetch",
+ Top.of(startIndex + count, comparator).withoutDefaults())
.setCoder(ListCoder.of(upstream.getCoder()));
- }
- PCollection<Row> orderedStream = rawStream.apply("flatten",
Flatten.iterables());
-
orderedStream.setCoder(CalciteUtils.toBeamRowType(getRowType()).getRowCoder());
+ // strip the `leading offset`
+ if (startIndex > 0) {
+ rawStream =
+ rawStream
+ .apply(
+ "stripLeadingOffset", ParDo.of(new SubListFn<>(startIndex,
startIndex + count)))
+ .setCoder(ListCoder.of(upstream.getCoder()));
+ }
+
+ PCollection<Row> orderedStream = rawStream.apply("flatten",
Flatten.iterables());
+
orderedStream.setCoder(CalciteUtils.toBeamRowType(getRowType()).getRowCoder());
- return orderedStream;
+ return orderedStream;
+ }
}
private static class SubListFn<T> extends DoFn<List<T>, List<T>> {
@@ -233,4 +241,5 @@ public class BeamSortRel extends Sort implements
BeamRelNode {
return 0;
}
}
+
}
diff --git
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUnionRel.java
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUnionRel.java
index f8d34c2..f828597 100644
---
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUnionRel.java
+++
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUnionRel.java
@@ -19,7 +19,7 @@
package org.apache.beam.sdk.extensions.sql.impl.rel;
import java.util.List;
-import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
+import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
@@ -76,8 +76,15 @@ public class BeamUnionRel extends Union implements
BeamRelNode {
return new BeamUnionRel(getCluster(), traitSet, inputs, all);
}
- @Override public PCollection<Row> buildBeamPipeline(PCollectionTuple
inputPCollections
- , BeamSqlEnv sqlEnv) throws Exception {
- return delegate.buildBeamPipeline(inputPCollections, sqlEnv);
+ @Override
+ public PTransform<PCollectionTuple, PCollection<Row>> toPTransform() {
+ return new Transform();
+ }
+
+ private class Transform extends PTransform<PCollectionTuple,
PCollection<Row>> {
+ @Override
+ public PCollection<Row> expand(PCollectionTuple inputPCollections) {
+ return delegate.buildBeamPipeline(inputPCollections);
+ }
}
}
diff --git
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRel.java
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRel.java
index 1e98968..6d87998 100644
---
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRel.java
+++
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRel.java
@@ -25,9 +25,9 @@ import static org.apache.beam.sdk.values.Row.toRow;
import com.google.common.collect.ImmutableList;
import java.util.List;
import java.util.stream.IntStream;
-import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.Row;
@@ -58,28 +58,30 @@ public class BeamValuesRel extends Values implements
BeamRelNode {
}
- @Override public PCollection<Row> buildBeamPipeline(
- PCollectionTuple inputPCollections,
- BeamSqlEnv sqlEnv) throws Exception {
+ @Override
+ public PTransform<PCollectionTuple, PCollection<Row>> toPTransform() {
+ return new Transform();
+ }
- String stageName = BeamSqlRelUtils.getStageName(this);
- if (tuples.isEmpty()) {
- throw new IllegalStateException("Values with empty tuples!");
- }
+ private class Transform extends PTransform<PCollectionTuple,
PCollection<Row>> {
- RowType rowType = CalciteUtils.toBeamRowType(this.getRowType());
+ @Override
+ public PCollection<Row> expand(PCollectionTuple inputPCollections) {
- List<Row> rows =
- tuples
- .stream()
- .map(tuple -> tupleToRow(rowType, tuple))
- .collect(toList());
+ String stageName = BeamSqlRelUtils.getStageName(BeamValuesRel.this);
+ if (tuples.isEmpty()) {
+ throw new IllegalStateException("Values with empty tuples!");
+ }
- return
- inputPCollections
- .getPipeline()
- .apply(stageName, Create.of(rows))
- .setCoder(rowType.getRowCoder());
+ RowType rowType = CalciteUtils.toBeamRowType(getRowType());
+
+ List<Row> rows = tuples.stream().map(tuple -> tupleToRow(rowType,
tuple)).collect(toList());
+
+ return inputPCollections
+ .getPipeline()
+ .apply(stageName, Create.of(rows))
+ .setCoder(rowType.getRowCoder());
+ }
}
private Row tupleToRow(RowType rowType, ImmutableList<RexLiteral> tuple) {
diff --git
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamIOSinkRule.java
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamIOSinkRule.java
index 77f4bdd..866298f 100644
---
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamIOSinkRule.java
+++
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamIOSinkRule.java
@@ -18,6 +18,7 @@
package org.apache.beam.sdk.extensions.sql.impl.rule;
import java.util.List;
+import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamIOSinkRel;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamLogicalConvention;
import org.apache.calcite.plan.Convention;
@@ -32,17 +33,22 @@ import org.apache.calcite.rel.logical.LogicalTableModify;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.schema.Table;
-/**
- * A {@code ConverterRule} to replace {@link TableModify} with
- * {@link BeamIOSinkRel}.
- *
- */
+/** A {@code ConverterRule} to replace {@link TableModify} with {@link
BeamIOSinkRel}. */
public class BeamIOSinkRule extends ConverterRule {
- public static final BeamIOSinkRule INSTANCE = new BeamIOSinkRule();
- private BeamIOSinkRule() {
- super(LogicalTableModify.class, Convention.NONE,
BeamLogicalConvention.INSTANCE,
+ private final BeamSqlEnv sqlEnv;
+
+ public static BeamIOSinkRule forSqlEnv(BeamSqlEnv sqlEnv) {
+ return new BeamIOSinkRule(sqlEnv);
+ }
+
+ private BeamIOSinkRule(BeamSqlEnv sqlEnv) {
+ super(
+ LogicalTableModify.class,
+ Convention.NONE,
+ BeamLogicalConvention.INSTANCE,
"BeamIOSinkRule");
+ this.sqlEnv = sqlEnv;
}
@Override
@@ -54,8 +60,8 @@ public class BeamIOSinkRule extends ConverterRule {
final RelTraitSet traitSet =
tableModify.getTraitSet().replace(BeamLogicalConvention.INSTANCE);
final RelOptTable relOptTable = tableModify.getTable();
final Prepare.CatalogReader catalogReader = tableModify.getCatalogReader();
- final RelNode convertedInput = convert(input,
- input.getTraitSet().replace(BeamLogicalConvention.INSTANCE));
+ final RelNode convertedInput =
+ convert(input,
input.getTraitSet().replace(BeamLogicalConvention.INSTANCE));
final TableModify.Operation operation = tableModify.getOperation();
final List<String> updateColumnList = tableModify.getUpdateColumnList();
final List<RexNode> sourceExpressionList =
tableModify.getSourceExpressionList();
@@ -64,18 +70,26 @@ public class BeamIOSinkRule extends ConverterRule {
final Table table = tableModify.getTable().unwrap(Table.class);
switch (table.getJdbcTableType()) {
- case TABLE:
- case STREAM:
- if (operation != TableModify.Operation.INSERT) {
- throw new UnsupportedOperationException(
- String.format("Streams doesn't support %s modify operation",
operation));
- }
- return new BeamIOSinkRel(cluster, traitSet,
- relOptTable, catalogReader, convertedInput, operation,
updateColumnList,
- sourceExpressionList, flattened);
- default:
- throw new IllegalArgumentException(
- String.format("Unsupported table type: %s",
table.getJdbcTableType()));
+ case TABLE:
+ case STREAM:
+ if (operation != TableModify.Operation.INSERT) {
+ throw new UnsupportedOperationException(
+ String.format("Streams doesn't support %s modify operation",
operation));
+ }
+ return new BeamIOSinkRel(
+ sqlEnv,
+ cluster,
+ traitSet,
+ relOptTable,
+ catalogReader,
+ convertedInput,
+ operation,
+ updateColumnList,
+ sourceExpressionList,
+ flattened);
+ default:
+ throw new IllegalArgumentException(
+ String.format("Unsupported table type: %s",
table.getJdbcTableType()));
}
}
}
diff --git
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamIOSourceRule.java
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamIOSourceRule.java
index a257d3d..7dc0b18 100644
---
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamIOSourceRule.java
+++
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamIOSourceRule.java
@@ -17,6 +17,7 @@
*/
package org.apache.beam.sdk.extensions.sql.impl.rule;
+import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamIOSourceRel;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamLogicalConvention;
import org.apache.calcite.plan.Convention;
@@ -25,25 +26,32 @@ import org.apache.calcite.rel.convert.ConverterRule;
import org.apache.calcite.rel.core.TableScan;
import org.apache.calcite.rel.logical.LogicalTableScan;
-/**
- * A {@code ConverterRule} to replace {@link TableScan} with
- * {@link BeamIOSourceRel}.
- *
- */
+/** A {@code ConverterRule} to replace {@link TableScan} with {@link
BeamIOSourceRel}. */
public class BeamIOSourceRule extends ConverterRule {
- public static final BeamIOSourceRule INSTANCE = new BeamIOSourceRule();
- private BeamIOSourceRule() {
- super(LogicalTableScan.class, Convention.NONE,
BeamLogicalConvention.INSTANCE,
+ private final BeamSqlEnv sqlEnv;
+
+ public static BeamIOSourceRule forSqlEnv(BeamSqlEnv sqlEnv) {
+ return new BeamIOSourceRule(sqlEnv);
+ }
+
+ private BeamIOSourceRule(BeamSqlEnv sqlEnv) {
+ super(
+ LogicalTableScan.class,
+ Convention.NONE,
+ BeamLogicalConvention.INSTANCE,
"BeamIOSourceRule");
+ this.sqlEnv = sqlEnv;
}
@Override
public RelNode convert(RelNode rel) {
final TableScan scan = (TableScan) rel;
- return new BeamIOSourceRel(scan.getCluster(),
- scan.getTraitSet().replace(BeamLogicalConvention.INSTANCE),
scan.getTable());
+ return new BeamIOSourceRel(
+ sqlEnv,
+ scan.getCluster(),
+ scan.getTraitSet().replace(BeamLogicalConvention.INSTANCE),
+ scan.getTable());
}
-
}
diff --git
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamJoinRule.java
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamJoinRule.java
index 4d9dd20..ff8029e 100644
---
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamJoinRule.java
+++
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamJoinRule.java
@@ -18,6 +18,7 @@
package org.apache.beam.sdk.extensions.sql.impl.rule;
+import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamJoinRel;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamLogicalConvention;
import org.apache.calcite.plan.Convention;
@@ -26,28 +27,32 @@ import org.apache.calcite.rel.convert.ConverterRule;
import org.apache.calcite.rel.core.Join;
import org.apache.calcite.rel.logical.LogicalJoin;
-/**
- * {@code ConverterRule} to replace {@code Join} with {@code BeamJoinRel}.
- */
+/** {@code ConverterRule} to replace {@code Join} with {@code BeamJoinRel}. */
public class BeamJoinRule extends ConverterRule {
- public static final BeamJoinRule INSTANCE = new BeamJoinRule();
- private BeamJoinRule() {
- super(LogicalJoin.class, Convention.NONE,
- BeamLogicalConvention.INSTANCE, "BeamJoinRule");
+ private final BeamSqlEnv sqlEnv;
+
+ public static BeamJoinRule forSqlEnv(BeamSqlEnv sqlEnv) {
+ return new BeamJoinRule(sqlEnv);
+ }
+
+ private BeamJoinRule(BeamSqlEnv sqlEnv) {
+ super(LogicalJoin.class, Convention.NONE, BeamLogicalConvention.INSTANCE,
"BeamJoinRule");
+ this.sqlEnv = sqlEnv;
}
- @Override public RelNode convert(RelNode rel) {
+ @Override
+ public RelNode convert(RelNode rel) {
Join join = (Join) rel;
return new BeamJoinRel(
+ sqlEnv,
join.getCluster(),
join.getTraitSet().replace(BeamLogicalConvention.INSTANCE),
- convert(join.getLeft(),
-
join.getLeft().getTraitSet().replace(BeamLogicalConvention.INSTANCE)),
- convert(join.getRight(),
-
join.getRight().getTraitSet().replace(BeamLogicalConvention.INSTANCE)),
+ convert(
+ join.getLeft(),
join.getLeft().getTraitSet().replace(BeamLogicalConvention.INSTANCE)),
+ convert(
+ join.getRight(),
join.getRight().getTraitSet().replace(BeamLogicalConvention.INSTANCE)),
join.getCondition(),
join.getVariablesSet(),
- join.getJoinType()
- );
+ join.getJoinType());
}
}
diff --git
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamProjectRule.java
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamProjectRule.java
index d19a01d..ef52f3f 100644
---
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamProjectRule.java
+++
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamProjectRule.java
@@ -1,18 +1,15 @@
/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
+ * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional
information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the
License. You may obtain a
+ * copy of the License at
+ *
+ * <p>http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * <p>Unless required by applicable law or agreed to in writing, software
distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing
permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.extensions.sql.impl.rule;
@@ -25,11 +22,7 @@ import org.apache.calcite.rel.convert.ConverterRule;
import org.apache.calcite.rel.core.Project;
import org.apache.calcite.rel.logical.LogicalProject;
-/**
- * A {@code ConverterRule} to replace {@link Project} with
- * {@link BeamProjectRel}.
- *
- */
+/** A {@code ConverterRule} to replace {@link Project} with {@link
BeamProjectRel}. */
public class BeamProjectRule extends ConverterRule {
public static final BeamProjectRule INSTANCE = new BeamProjectRule();
@@ -42,9 +35,11 @@ public class BeamProjectRule extends ConverterRule {
final Project project = (Project) rel;
final RelNode input = project.getInput();
- return new BeamProjectRel(project.getCluster(),
+ return new BeamProjectRel(
+ project.getCluster(),
project.getTraitSet().replace(BeamLogicalConvention.INSTANCE),
convert(input,
input.getTraitSet().replace(BeamLogicalConvention.INSTANCE)),
- project.getProjects(), project.getRowType());
+ project.getProjects(),
+ project.getRowType());
}
}
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslJoinTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslJoinTest.java
index adbed07..414eabb 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslJoinTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslJoinTest.java
@@ -19,10 +19,8 @@
package org.apache.beam.sdk.extensions.sql;
import static org.apache.beam.sdk.extensions.sql.TestUtils.tuple;
-import static
org.apache.beam.sdk.extensions.sql.impl.rel.BeamJoinRelBoundedVsBoundedTest
- .ORDER_DETAILS1;
-import static
org.apache.beam.sdk.extensions.sql.impl.rel.BeamJoinRelBoundedVsBoundedTest
- .ORDER_DETAILS2;
+import static
org.apache.beam.sdk.extensions.sql.impl.rel.BeamJoinRelBoundedVsBoundedTest.ORDER_DETAILS1;
+import static
org.apache.beam.sdk.extensions.sql.impl.rel.BeamJoinRelBoundedVsBoundedTest.ORDER_DETAILS2;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.hasProperty;
import static org.hamcrest.Matchers.isA;
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutorTestBase.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutorTestBase.java
index b6ac343..2b53432 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutorTestBase.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutorTestBase.java
@@ -19,6 +19,7 @@ package org.apache.beam.sdk.extensions.sql.impl.interpreter;
import java.util.ArrayList;
import java.util.List;
+import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
import
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
import org.apache.beam.sdk.extensions.sql.impl.planner.BeamQueryPlanner;
import org.apache.beam.sdk.extensions.sql.impl.planner.BeamRelDataTypeSystem;
@@ -45,9 +46,7 @@ import org.apache.calcite.tools.Frameworks;
import org.apache.calcite.tools.RelBuilder;
import org.junit.BeforeClass;
-/**
- * base class to test {@link BeamSqlFnExecutor} and subclasses of {@link
BeamSqlExpression}.
- */
+/** base class to test {@link BeamSqlFnExecutor} and subclasses of {@link
BeamSqlExpression}. */
public class BeamSqlFnExecutorTestBase {
static final JavaTypeFactory TYPE_FACTORY = new
JavaTypeFactoryImpl(RelDataTypeSystem.DEFAULT);
static RexBuilder rexBuilder = new RexBuilder(BeamQueryPlanner.TYPE_FACTORY);
@@ -59,30 +58,35 @@ public class BeamSqlFnExecutorTestBase {
@BeforeClass
public static void prepare() {
- relDataType = TYPE_FACTORY.builder()
- .add("order_id", SqlTypeName.BIGINT)
- .add("site_id", SqlTypeName.INTEGER)
- .add("price", SqlTypeName.DOUBLE)
- .add("order_time", SqlTypeName.BIGINT).build();
+ relDataType =
+ TYPE_FACTORY
+ .builder()
+ .add("order_id", SqlTypeName.BIGINT)
+ .add("site_id", SqlTypeName.INTEGER)
+ .add("price", SqlTypeName.DOUBLE)
+ .add("order_time", SqlTypeName.BIGINT)
+ .build();
row =
- Row
- .withRowType(CalciteUtils.toBeamRowType(relDataType))
- .addValues(
- 1234567L,
- 0,
- 8.9,
- 1234567L)
+ Row.withRowType(CalciteUtils.toBeamRowType(relDataType))
+ .addValues(1234567L, 0, 8.9, 1234567L)
.build();
+ BeamSqlEnv sqlEnv = new BeamSqlEnv();
SchemaPlus schema = Frameworks.createRootSchema(true);
final List<RelTraitDef> traitDefs = new ArrayList<>();
traitDefs.add(ConventionTraitDef.INSTANCE);
traitDefs.add(RelCollationTraitDef.INSTANCE);
- FrameworkConfig config = Frameworks.newConfigBuilder()
-
.parserConfig(SqlParser.configBuilder().setLex(Lex.MYSQL).build()).defaultSchema(schema)
-
.traitDefs(traitDefs).context(Contexts.EMPTY_CONTEXT).ruleSets(BeamRuleSets.getRuleSets())
-
.costFactory(null).typeSystem(BeamRelDataTypeSystem.BEAM_REL_DATATYPE_SYSTEM).build();
+ FrameworkConfig config =
+ Frameworks.newConfigBuilder()
+ .parserConfig(SqlParser.configBuilder().setLex(Lex.MYSQL).build())
+ .defaultSchema(schema)
+ .traitDefs(traitDefs)
+ .context(Contexts.EMPTY_CONTEXT)
+ .ruleSets(BeamRuleSets.getRuleSets(sqlEnv))
+ .costFactory(null)
+ .typeSystem(BeamRelDataTypeSystem.BEAM_REL_DATATYPE_SYSTEM)
+ .build();
relBuilder = RelBuilder.create(config);
}
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/transform/agg/VarianceAccumulatorTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/transform/agg/VarianceAccumulatorTest.java
index 73cff75..d4d0bc2 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/transform/agg/VarianceAccumulatorTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/transform/agg/VarianceAccumulatorTest.java
@@ -20,10 +20,8 @@ package org.apache.beam.sdk.extensions.sql.impl.transform.agg;
import static java.math.BigDecimal.ONE;
import static java.math.BigDecimal.ZERO;
-import static
org.apache.beam.sdk.extensions.sql.impl.transform.agg.VarianceAccumulator
- .newVarianceAccumulator;
-import static
org.apache.beam.sdk.extensions.sql.impl.transform.agg.VarianceAccumulator
- .ofSingleElement;
+import static
org.apache.beam.sdk.extensions.sql.impl.transform.agg.VarianceAccumulator.newVarianceAccumulator;
+import static
org.apache.beam.sdk.extensions.sql.impl.transform.agg.VarianceAccumulator.ofSingleElement;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/transform/agg/VarianceFnTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/transform/agg/VarianceFnTest.java
index 6bb9aff..a3a73c9 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/transform/agg/VarianceFnTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/transform/agg/VarianceFnTest.java
@@ -19,8 +19,7 @@
package org.apache.beam.sdk.extensions.sql.impl.transform.agg;
import static java.math.BigDecimal.ZERO;
-import static
org.apache.beam.sdk.extensions.sql.impl.transform.agg.VarianceAccumulator
- .newVarianceAccumulator;
+import static
org.apache.beam.sdk.extensions.sql.impl.transform.agg.VarianceAccumulator.newVarianceAccumulator;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
--
To stop receiving notification emails like this one, please contact
[email protected].