[
https://issues.apache.org/jira/browse/BEAM-4044?focusedWorklogId=94439&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-94439
]
ASF GitHub Bot logged work on BEAM-4044:
----------------------------------------
Author: ASF GitHub Bot
Created on: 24/Apr/18 03:35
Start Date: 24/Apr/18 03:35
Worklog Time Spent: 10m
Work Description: kennknowles closed pull request #5154: [BEAM-4044]
[SQL] Make BeamCalciteTable self planning
URL: https://github.com/apache/beam/pull/5154
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java
index 7f77668379c..eeaf36e7af3 100644
---
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java
+++
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java
@@ -20,8 +20,8 @@
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.Serializable;
-import java.util.HashMap;
-import java.util.Map;
+import java.util.Collection;
+import java.util.List;
import org.apache.beam.sdk.coders.RowCoder;
import org.apache.beam.sdk.extensions.sql.BeamSql;
import org.apache.beam.sdk.extensions.sql.BeamSqlCli;
@@ -29,6 +29,8 @@
import org.apache.beam.sdk.extensions.sql.BeamSqlUdf;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.UdafImpl;
import org.apache.beam.sdk.extensions.sql.impl.planner.BeamQueryPlanner;
+import org.apache.beam.sdk.extensions.sql.impl.rel.BeamIOSinkRel;
+import org.apache.beam.sdk.extensions.sql.impl.rel.BeamIOSourceRel;
import org.apache.beam.sdk.extensions.sql.impl.schema.BaseBeamTable;
import org.apache.beam.sdk.extensions.sql.impl.schema.BeamPCollectionTable;
import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
@@ -39,19 +41,22 @@
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TupleTag;
-import org.apache.calcite.DataContext;
-import org.apache.calcite.config.CalciteConnectionConfig;
-import org.apache.calcite.linq4j.Enumerable;
+import org.apache.calcite.adapter.java.AbstractQueryableTable;
+import org.apache.calcite.jdbc.CalciteSchema;
+import org.apache.calcite.linq4j.QueryProvider;
+import org.apache.calcite.linq4j.Queryable;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.prepare.Prepare;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.TableModify;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
-import org.apache.calcite.schema.ScannableTable;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.schema.ModifiableTable;
import org.apache.calcite.schema.SchemaPlus;
-import org.apache.calcite.schema.Statistic;
-import org.apache.calcite.schema.Statistics;
+import org.apache.calcite.schema.TranslatableTable;
import org.apache.calcite.schema.impl.ScalarFunctionImpl;
-import org.apache.calcite.sql.SqlCall;
-import org.apache.calcite.sql.SqlNode;
-import org.apache.calcite.tools.Frameworks;
/**
* {@link BeamSqlEnv} prepares the execution context for {@link BeamSql} and
{@link BeamSqlCli}.
@@ -60,21 +65,19 @@
* {@link BeamQueryPlanner} which parse/validate/optimize/translate input SQL
queries.
*/
public class BeamSqlEnv implements Serializable {
- transient SchemaPlus schema;
+ transient CalciteSchema schema;
transient BeamQueryPlanner planner;
- transient Map<String, BeamSqlTable> tables;
public BeamSqlEnv() {
- tables = new HashMap<>(16);
- schema = Frameworks.createRootSchema(true);
- planner = new BeamQueryPlanner(this, schema);
+ schema = CalciteSchema.createRootSchema(true);
+ planner = new BeamQueryPlanner(this, schema.plus());
}
/**
* Register a UDF function which can be used in SQL expression.
*/
public void registerUdf(String functionName, Class<?> clazz, String method) {
- schema.add(functionName, ScalarFunctionImpl.create(clazz, method));
+ schema.plus().add(functionName, ScalarFunctionImpl.create(clazz, method));
}
/**
@@ -97,7 +100,7 @@ public void registerUdf(String functionName,
SerializableFunction sfn) {
* See {@link org.apache.beam.sdk.transforms.Combine.CombineFn} on how to
implement a UDAF.
*/
public void registerUdaf(String functionName, Combine.CombineFn combineFn) {
- schema.add(functionName, new UdafImpl(combineFn));
+ schema.plus().add(functionName, new UdafImpl(combineFn));
}
/**
@@ -134,73 +137,60 @@ public void registerTable(String tableName,
PCollection<Row> pCollection, Schema
* Registers a {@link BaseBeamTable} which can be used for all subsequent
queries.
*/
public void registerTable(String tableName, BeamSqlTable table) {
- tables.put(tableName, table);
- schema.add(tableName, new BeamCalciteTable(table.getSchema()));
- planner.getSourceTables().put(tableName, table);
+ schema.add(tableName, new BeamCalciteTable(table));
}
public void deregisterTable(String targetTableName) {
- // reconstruct the schema
- schema = Frameworks.createRootSchema(true);
- for (Map.Entry<String, BeamSqlTable> entry : tables.entrySet()) {
- String tableName = entry.getKey();
- BeamSqlTable table = entry.getValue();
- if (!tableName.equals(targetTableName)) {
- schema.add(tableName, new BeamCalciteTable(table.getSchema()));
- }
- }
- planner = new BeamQueryPlanner(this, schema);
- }
-
- /**
- * Find {@link BaseBeamTable} by table name.
- */
- public BeamSqlTable findTable(String tableName) {
- return planner.getSourceTables().get(tableName);
+ schema.removeTable(targetTableName);
}
- private static class BeamCalciteTable implements ScannableTable,
Serializable {
- private Schema beamSchema;
+ private static class BeamCalciteTable extends AbstractQueryableTable
+ implements ModifiableTable, TranslatableTable {
+ private BeamSqlTable beamTable;
- public BeamCalciteTable(Schema beamSchema) {
- this.beamSchema = beamSchema;
+ public BeamCalciteTable(BeamSqlTable beamTable) {
+ super(Object[].class);
+ this.beamTable = beamTable;
}
@Override
public RelDataType getRowType(RelDataTypeFactory typeFactory) {
- return CalciteUtils.toCalciteRowType(this.beamSchema,
BeamQueryPlanner.TYPE_FACTORY);
+ return CalciteUtils.toCalciteRowType(this.beamTable.getSchema(),
+ BeamQueryPlanner.TYPE_FACTORY);
}
@Override
- public Enumerable<Object[]> scan(DataContext root) {
- // not used as Beam SQL uses its own execution engine
- return null;
+ public RelNode toRel(
+ RelOptTable.ToRelContext context,
+ RelOptTable relOptTable) {
+ return new BeamIOSourceRel(
+ context.getCluster(), relOptTable, beamTable);
}
- /**
- * Not used {@link Statistic} to optimize the plan.
- */
@Override
- public Statistic getStatistic() {
- return Statistics.UNKNOWN;
+ public <T> Queryable<T> asQueryable(QueryProvider queryProvider,
+ SchemaPlus schema, String tableName) {
+ throw new UnsupportedOperationException();
}
- /**
- * all sources are treated as TABLE in Beam SQL.
- */
@Override
- public org.apache.calcite.schema.Schema.TableType getJdbcTableType() {
- return org.apache.calcite.schema.Schema.TableType.TABLE;
- }
-
- @Override public boolean isRolledUp(String column) {
- return false;
+ public Collection getModifiableCollection() {
+ return null;
}
- @Override public boolean rolledUpColumnValidInsideAgg(String column,
- SqlCall call,
SqlNode parent,
-
CalciteConnectionConfig config) {
- return false;
+ @Override
+ public TableModify toModificationRel(
+ RelOptCluster cluster,
+ RelOptTable table,
+ Prepare.CatalogReader catalogReader,
+ RelNode child,
+ TableModify.Operation operation,
+ List<String> updateColumnList,
+ List<RexNode> sourceExpressionList,
+ boolean flattened) {
+ return new BeamIOSinkRel(
+ cluster, table, catalogReader, child, operation, updateColumnList,
+ sourceExpressionList, flattened, beamTable);
}
}
@@ -211,8 +201,7 @@ public BeamQueryPlanner getPlanner() {
private void readObject(ObjectInputStream in) throws IOException,
ClassNotFoundException {
in.defaultReadObject();
- tables = new HashMap<String, BeamSqlTable>(16);
- schema = Frameworks.createRootSchema(true);
- planner = new BeamQueryPlanner(this, schema);
+ schema = CalciteSchema.createRootSchema(true);
+ planner = new BeamQueryPlanner(this, schema.plus());
}
}
diff --git
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/BeamQueryPlanner.java
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/BeamQueryPlanner.java
index 7858c5c21f6..348223e3851 100644
---
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/BeamQueryPlanner.java
+++
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/BeamQueryPlanner.java
@@ -19,15 +19,11 @@
import java.util.ArrayList;
import java.util.Collections;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.extensions.sql.BeamSqlTable;
import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamLogicalConvention;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode;
-import org.apache.beam.sdk.extensions.sql.impl.schema.BaseBeamTable;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.Row;
@@ -69,7 +65,6 @@
private static final Logger LOG =
LoggerFactory.getLogger(BeamQueryPlanner.class);
protected final Planner planner;
- private Map<String, BeamSqlTable> sourceTables = new HashMap<>();
public static final JavaTypeFactory TYPE_FACTORY = new JavaTypeFactoryImpl(
RelDataTypeSystem.DEFAULT);
@@ -106,10 +101,6 @@ public BeamQueryPlanner(BeamSqlEnv sqlEnv, SchemaPlus
schema) {
.operatorTable(new ChainedSqlOperatorTable(sqlOperatorTables))
.build();
this.planner = Frameworks.getPlanner(config);
-
- for (String t : schema.getTableNames()) {
- sourceTables.put(t, (BaseBeamTable) schema.getTable(t));
- }
}
/**
@@ -173,10 +164,6 @@ private SqlNode validateNode(SqlNode sqlNode) throws
ValidationException {
return planner.validate(sqlNode);
}
- public Map<String, BeamSqlTable> getSourceTables() {
- return sourceTables;
- }
-
public Planner getPlanner() {
return planner;
}
diff --git
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/BeamRuleSets.java
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/BeamRuleSets.java
index 1d10816fd74..af74dae7510 100644
---
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/BeamRuleSets.java
+++
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/BeamRuleSets.java
@@ -21,8 +21,6 @@
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode;
import org.apache.beam.sdk.extensions.sql.impl.rule.BeamAggregationRule;
import org.apache.beam.sdk.extensions.sql.impl.rule.BeamFilterRule;
-import org.apache.beam.sdk.extensions.sql.impl.rule.BeamIOSinkRule;
-import org.apache.beam.sdk.extensions.sql.impl.rule.BeamIOSourceRule;
import org.apache.beam.sdk.extensions.sql.impl.rule.BeamIntersectRule;
import org.apache.beam.sdk.extensions.sql.impl.rule.BeamJoinRule;
import org.apache.beam.sdk.extensions.sql.impl.rule.BeamMinusRule;
@@ -43,17 +41,15 @@
public static RuleSet[] getRuleSets(BeamSqlEnv sqlEnv) {
return new RuleSet[] {
RuleSets.ofList(
- BeamIOSourceRule.forSqlEnv(sqlEnv),
BeamProjectRule.INSTANCE,
BeamFilterRule.INSTANCE,
- BeamIOSinkRule.forSqlEnv(sqlEnv),
BeamAggregationRule.INSTANCE,
BeamSortRule.INSTANCE,
BeamValuesRule.INSTANCE,
BeamIntersectRule.INSTANCE,
BeamMinusRule.INSTANCE,
BeamUnionRule.INSTANCE,
- BeamJoinRule.forSqlEnv(sqlEnv))
+ BeamJoinRule.INSTANCE)
};
}
}
diff --git
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSinkRel.java
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSinkRel.java
index 6afa8b1964e..2841163750c 100644
---
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSinkRel.java
+++
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSinkRel.java
@@ -17,41 +17,43 @@
*/
package org.apache.beam.sdk.extensions.sql.impl.rel;
-import com.google.common.base.Joiner;
import java.util.List;
import org.apache.beam.sdk.extensions.sql.BeamSqlTable;
-import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
+import org.apache.beam.sdk.extensions.sql.impl.rule.BeamIOSinkRule;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.Row;
import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptPlanner;
import org.apache.calcite.plan.RelOptTable;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.prepare.Prepare;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.TableModify;
import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql2rel.RelStructuredTypeFlattener;
/** BeamRelNode to replace a {@code TableModify} node. */
-public class BeamIOSinkRel extends TableModify implements BeamRelNode {
+public class BeamIOSinkRel extends TableModify
+ implements BeamRelNode, RelStructuredTypeFlattener.SelfFlatteningRel {
- private final BeamSqlEnv sqlEnv;
+ private final BeamSqlTable sqlTable;
+ private boolean isFlattening = false;
public BeamIOSinkRel(
- BeamSqlEnv sqlEnv,
RelOptCluster cluster,
- RelTraitSet traits,
RelOptTable table,
Prepare.CatalogReader catalogReader,
RelNode child,
Operation operation,
List<String> updateColumnList,
List<RexNode> sourceExpressionList,
- boolean flattened) {
+ boolean flattened,
+ BeamSqlTable sqlTable) {
super(
cluster,
- traits,
+ cluster.traitSetOf(BeamLogicalConvention.INSTANCE),
table,
catalogReader,
child,
@@ -59,22 +61,39 @@ public BeamIOSinkRel(
updateColumnList,
sourceExpressionList,
flattened);
- this.sqlEnv = sqlEnv;
+ this.sqlTable = sqlTable;
}
@Override
public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
- return new BeamIOSinkRel(
- sqlEnv,
+ boolean flattened = isFlattened() || isFlattening;
+ BeamIOSinkRel newRel = new BeamIOSinkRel(
getCluster(),
- traitSet,
getTable(),
getCatalogReader(),
sole(inputs),
getOperation(),
getUpdateColumnList(),
getSourceExpressionList(),
- isFlattened());
+ flattened,
+ sqlTable);
+ newRel.traitSet = traitSet;
+ return newRel;
+ }
+
+ @Override
+ public void flattenRel(RelStructuredTypeFlattener flattener) {
+ // rewriteGeneric calls this.copy. Setting isFlattining passes
+ // this context into copy for modification of the flattened flag.
+ isFlattening = true;
+ flattener.rewriteGeneric(this);
+ isFlattening = false;
+ }
+
+ @Override
+ public void register(RelOptPlanner planner) {
+ planner.addRule(BeamIOSinkRule.INSTANCE);
+ super.register(planner);
}
@Override
@@ -96,11 +115,7 @@ public RelNode copy(RelTraitSet traitSet, List<RelNode>
inputs) {
PCollection<Row> upstream =
inputPCollections.apply(BeamSqlRelUtils.getBeamRelInput(input).toPTransform());
- String sourceName = Joiner.on('.').join(getTable().getQualifiedName());
-
- BeamSqlTable targetTable = sqlEnv.findTable(sourceName);
-
- upstream.apply(stageName, targetTable.buildIOWriter());
+ upstream.apply(stageName, sqlTable.buildIOWriter());
return upstream;
}
diff --git
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSourceRel.java
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSourceRel.java
index fe09d7a4825..0a8db3997ba 100644
---
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSourceRel.java
+++
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSourceRel.java
@@ -19,7 +19,6 @@
import com.google.common.base.Joiner;
import org.apache.beam.sdk.extensions.sql.BeamSqlTable;
-import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PCollection;
@@ -28,7 +27,6 @@
import org.apache.beam.sdk.values.TupleTag;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelOptTable;
-import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.core.TableScan;
/**
@@ -37,12 +35,12 @@
*/
public class BeamIOSourceRel extends TableScan implements BeamRelNode {
- private BeamSqlEnv sqlEnv;
+ private BeamSqlTable sqlTable;
public BeamIOSourceRel(
- BeamSqlEnv sqlEnv, RelOptCluster cluster, RelTraitSet traitSet,
RelOptTable table) {
- super(cluster, traitSet, table);
- this.sqlEnv = sqlEnv;
+ RelOptCluster cluster, RelOptTable table, BeamSqlTable sqlTable) {
+ super(cluster, cluster.traitSetOf(BeamLogicalConvention.INSTANCE), table);
+ this.sqlTable = sqlTable;
}
@Override
@@ -61,13 +59,15 @@ public BeamIOSourceRel(
// choose PCollection from input PCollectionTuple if exists there.
PCollection<Row> sourceStream = inputPCollections.get(new
TupleTag<Row>(sourceName));
return sourceStream;
- } else {
- // If not, the source PColection is provided with
BaseBeamTable.buildIOReader().
- BeamSqlTable sourceTable = sqlEnv.findTable(sourceName);
- return sourceTable
- .buildIOReader(inputPCollections.getPipeline())
- .setCoder(CalciteUtils.toBeamSchema(getRowType()).getRowCoder());
}
+ // If not, the source PColection is provided with
BaseBeamTable.buildIOReader().
+ return sqlTable
+ .buildIOReader(inputPCollections.getPipeline())
+ .setCoder(CalciteUtils.toBeamSchema(getRowType()).getRowCoder());
}
}
+
+ protected BeamSqlTable getBeamSqlTable() {
+ return sqlTable;
+ }
}
diff --git
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java
index 45f3247f33d..1890dc07356 100644
---
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java
+++
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java
@@ -22,7 +22,6 @@
import static org.apache.beam.sdk.values.PCollection.IsBounded.UNBOUNDED;
import static org.joda.time.Duration.ZERO;
-import com.google.common.base.Joiner;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -31,7 +30,6 @@
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.extensions.sql.BeamSqlSeekableTable;
import org.apache.beam.sdk.extensions.sql.BeamSqlTable;
-import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
import org.apache.beam.sdk.extensions.sql.impl.transform.BeamJoinTransforms;
import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
import org.apache.beam.sdk.schemas.Schema;
@@ -93,10 +91,8 @@
* </ul>
*/
public class BeamJoinRel extends Join implements BeamRelNode {
- private final BeamSqlEnv sqlEnv;
public BeamJoinRel(
- BeamSqlEnv sqlEnv,
RelOptCluster cluster,
RelTraitSet traits,
RelNode left,
@@ -105,7 +101,6 @@ public BeamJoinRel(
Set<CorrelationId> variablesSet,
JoinRelType joinType) {
super(cluster, traits, left, right, condition, variablesSet, joinType);
- this.sqlEnv = sqlEnv;
}
@Override
@@ -116,7 +111,7 @@ public Join copy(
RelNode right,
JoinRelType joinType,
boolean semiJoinDone) {
- return new BeamJoinRel(sqlEnv, getCluster(), traitSet, left, right,
conditionExpr, variablesSet,
+ return new BeamJoinRel(getCluster(), traitSet, left, right, conditionExpr,
variablesSet,
joinType);
}
@@ -133,8 +128,8 @@ public Join copy(
Schema leftSchema = CalciteUtils.toBeamSchema(left.getRowType());
final BeamRelNode rightRelNode = BeamSqlRelUtils.getBeamRelInput(right);
- if (!seekable(leftRelNode, sqlEnv) && seekable(rightRelNode, sqlEnv)) {
- return joinAsLookup(leftRelNode, rightRelNode, inputPCollections,
sqlEnv)
+ if (!seekable(leftRelNode) && seekable(rightRelNode)) {
+ return joinAsLookup(leftRelNode, rightRelNode, inputPCollections)
.setCoder(CalciteUtils.toBeamSchema(getRowType()).getRowCoder());
}
@@ -374,10 +369,10 @@ private Row buildNullRow(BeamRelNode relNode) {
private PCollection<Row> joinAsLookup(
BeamRelNode leftRelNode,
BeamRelNode rightRelNode,
- PCollectionTuple inputPCollections,
- BeamSqlEnv sqlEnv) {
+ PCollectionTuple inputPCollections) {
PCollection<Row> factStream =
inputPCollections.apply(leftRelNode.toPTransform());
- BeamSqlSeekableTable seekableTable =
getSeekableTableFromRelNode(rightRelNode, sqlEnv);
+ BeamIOSourceRel srcRel = (BeamIOSourceRel) rightRelNode;
+ BeamSqlSeekableTable seekableTable = (BeamSqlSeekableTable)
srcRel.getBeamSqlTable();
return factStream.apply(
"join_as_lookup",
@@ -388,19 +383,11 @@ private Row buildNullRow(BeamRelNode relNode) {
CalciteUtils.toBeamSchema(leftRelNode.getRowType()).getFieldCount()));
}
- private BeamSqlSeekableTable getSeekableTableFromRelNode(BeamRelNode
relNode, BeamSqlEnv sqlEnv) {
- BeamIOSourceRel srcRel = (BeamIOSourceRel) relNode;
- String tableName =
Joiner.on('.').join(srcRel.getTable().getQualifiedName());
- BeamSqlTable sourceTable = sqlEnv.findTable(tableName);
- return (BeamSqlSeekableTable) sourceTable;
- }
-
/** check if {@code BeamRelNode} implements {@code BeamSeekableTable}. */
- private boolean seekable(BeamRelNode relNode, BeamSqlEnv sqlEnv) {
+ private boolean seekable(BeamRelNode relNode) {
if (relNode instanceof BeamIOSourceRel) {
BeamIOSourceRel srcRel = (BeamIOSourceRel) relNode;
- String tableName =
Joiner.on('.').join(srcRel.getTable().getQualifiedName());
- BeamSqlTable sourceTable = sqlEnv.findTable(tableName);
+ BeamSqlTable sourceTable = srcRel.getBeamSqlTable();
if (sourceTable instanceof BeamSqlSeekableTable) {
return true;
}
diff --git
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamIOSinkRule.java
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamIOSinkRule.java
index 866298fdda2..b99278b7e05 100644
---
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamIOSinkRule.java
+++
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamIOSinkRule.java
@@ -17,79 +17,31 @@
*/
package org.apache.beam.sdk.extensions.sql.impl.rule;
-import java.util.List;
-import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
+import java.util.Arrays;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamIOSinkRel;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamLogicalConvention;
-import org.apache.calcite.plan.Convention;
-import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelOptTable;
-import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.prepare.Prepare;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.convert.ConverterRule;
import org.apache.calcite.rel.core.TableModify;
-import org.apache.calcite.rel.logical.LogicalTableModify;
-import org.apache.calcite.rex.RexNode;
-import org.apache.calcite.schema.Table;
/** A {@code ConverterRule} to replace {@link TableModify} with {@link
BeamIOSinkRel}. */
public class BeamIOSinkRule extends ConverterRule {
+ public static final BeamIOSinkRule INSTANCE = new BeamIOSinkRule();
- private final BeamSqlEnv sqlEnv;
-
- public static BeamIOSinkRule forSqlEnv(BeamSqlEnv sqlEnv) {
- return new BeamIOSinkRule(sqlEnv);
- }
-
- private BeamIOSinkRule(BeamSqlEnv sqlEnv) {
+ private BeamIOSinkRule() {
super(
- LogicalTableModify.class,
- Convention.NONE,
+ BeamIOSinkRel.class,
+ BeamLogicalConvention.INSTANCE,
BeamLogicalConvention.INSTANCE,
"BeamIOSinkRule");
- this.sqlEnv = sqlEnv;
}
@Override
public RelNode convert(RelNode rel) {
- final TableModify tableModify = (TableModify) rel;
- final RelNode input = tableModify.getInput();
-
- final RelOptCluster cluster = tableModify.getCluster();
- final RelTraitSet traitSet =
tableModify.getTraitSet().replace(BeamLogicalConvention.INSTANCE);
- final RelOptTable relOptTable = tableModify.getTable();
- final Prepare.CatalogReader catalogReader = tableModify.getCatalogReader();
- final RelNode convertedInput =
- convert(input,
input.getTraitSet().replace(BeamLogicalConvention.INSTANCE));
- final TableModify.Operation operation = tableModify.getOperation();
- final List<String> updateColumnList = tableModify.getUpdateColumnList();
- final List<RexNode> sourceExpressionList =
tableModify.getSourceExpressionList();
- final boolean flattened = tableModify.isFlattened();
-
- final Table table = tableModify.getTable().unwrap(Table.class);
-
- switch (table.getJdbcTableType()) {
- case TABLE:
- case STREAM:
- if (operation != TableModify.Operation.INSERT) {
- throw new UnsupportedOperationException(
- String.format("Streams doesn't support %s modify operation",
operation));
- }
- return new BeamIOSinkRel(
- sqlEnv,
- cluster,
- traitSet,
- relOptTable,
- catalogReader,
- convertedInput,
- operation,
- updateColumnList,
- sourceExpressionList,
- flattened);
- default:
- throw new IllegalArgumentException(
- String.format("Unsupported table type: %s",
table.getJdbcTableType()));
- }
+ final RelNode convertedInput = convert(
+ rel.getInput(0),
+ rel.getTraitSet());
+ return rel.copy(rel.getTraitSet(),
+ Arrays.asList(convertedInput));
}
}
diff --git
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamIOSourceRule.java
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamIOSourceRule.java
deleted file mode 100644
index 7dc0b183681..00000000000
---
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamIOSourceRule.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.extensions.sql.impl.rule;
-
-import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
-import org.apache.beam.sdk.extensions.sql.impl.rel.BeamIOSourceRel;
-import org.apache.beam.sdk.extensions.sql.impl.rel.BeamLogicalConvention;
-import org.apache.calcite.plan.Convention;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.convert.ConverterRule;
-import org.apache.calcite.rel.core.TableScan;
-import org.apache.calcite.rel.logical.LogicalTableScan;
-
-/** A {@code ConverterRule} to replace {@link TableScan} with {@link
BeamIOSourceRel}. */
-public class BeamIOSourceRule extends ConverterRule {
-
- private final BeamSqlEnv sqlEnv;
-
- public static BeamIOSourceRule forSqlEnv(BeamSqlEnv sqlEnv) {
- return new BeamIOSourceRule(sqlEnv);
- }
-
- private BeamIOSourceRule(BeamSqlEnv sqlEnv) {
- super(
- LogicalTableScan.class,
- Convention.NONE,
- BeamLogicalConvention.INSTANCE,
- "BeamIOSourceRule");
- this.sqlEnv = sqlEnv;
- }
-
- @Override
- public RelNode convert(RelNode rel) {
- final TableScan scan = (TableScan) rel;
-
- return new BeamIOSourceRel(
- sqlEnv,
- scan.getCluster(),
- scan.getTraitSet().replace(BeamLogicalConvention.INSTANCE),
- scan.getTable());
- }
-}
diff --git
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamJoinRule.java
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamJoinRule.java
index ff8029efdc4..88020991d99 100644
---
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamJoinRule.java
+++
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamJoinRule.java
@@ -18,7 +18,6 @@
package org.apache.beam.sdk.extensions.sql.impl.rule;
-import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamJoinRel;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamLogicalConvention;
import org.apache.calcite.plan.Convention;
@@ -29,22 +28,16 @@
/** {@code ConverterRule} to replace {@code Join} with {@code BeamJoinRel}. */
public class BeamJoinRule extends ConverterRule {
- private final BeamSqlEnv sqlEnv;
+ public static final BeamJoinRule INSTANCE = new BeamJoinRule();
- public static BeamJoinRule forSqlEnv(BeamSqlEnv sqlEnv) {
- return new BeamJoinRule(sqlEnv);
- }
-
- private BeamJoinRule(BeamSqlEnv sqlEnv) {
+ private BeamJoinRule() {
super(LogicalJoin.class, Convention.NONE, BeamLogicalConvention.INSTANCE,
"BeamJoinRule");
- this.sqlEnv = sqlEnv;
}
@Override
public RelNode convert(RelNode rel) {
Join join = (Join) rel;
return new BeamJoinRel(
- sqlEnv,
join.getCluster(),
join.getTraitSet().replace(BeamLogicalConvention.INSTANCE),
convert(
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 94439)
Time Spent: 8h 20m (was: 8h 10m)
> Take advantage of Calcite DDL
> -----------------------------
>
> Key: BEAM-4044
> URL: https://issues.apache.org/jira/browse/BEAM-4044
> Project: Beam
> Issue Type: New Feature
> Components: dsl-sql
> Reporter: Andrew Pilloud
> Assignee: Andrew Pilloud
> Priority: Major
> Time Spent: 8h 20m
> Remaining Estimate: 0h
>
> In Calcite 1.15 support for abstract DDL moved into calcite core. We should
> take advantage of that.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)