http://git-wip-us.apache.org/repos/asf/beam/blob/e4eca2ec/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/planner/BeamQueryPlanner.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/planner/BeamQueryPlanner.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/planner/BeamQueryPlanner.java new file mode 100644 index 0000000..ba6235f --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/planner/BeamQueryPlanner.java @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.extensions.sql.planner; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.extensions.sql.BeamSqlEnv; +import org.apache.beam.sdk.extensions.sql.rel.BeamLogicalConvention; +import org.apache.beam.sdk.extensions.sql.rel.BeamRelNode; +import org.apache.beam.sdk.extensions.sql.schema.BaseBeamTable; +import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.calcite.adapter.java.JavaTypeFactory; +import org.apache.calcite.config.Lex; +import org.apache.calcite.jdbc.CalciteSchema; +import org.apache.calcite.jdbc.JavaTypeFactoryImpl; +import org.apache.calcite.plan.Contexts; +import org.apache.calcite.plan.ConventionTraitDef; +import org.apache.calcite.plan.RelOptUtil; +import org.apache.calcite.plan.RelTraitDef; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.prepare.CalciteCatalogReader; +import org.apache.calcite.rel.RelCollationTraitDef; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.type.RelDataTypeSystem; +import org.apache.calcite.schema.SchemaPlus; +import org.apache.calcite.sql.SqlNode; +import org.apache.calcite.sql.SqlOperatorTable; +import org.apache.calcite.sql.fun.SqlStdOperatorTable; +import org.apache.calcite.sql.parser.SqlParseException; +import org.apache.calcite.sql.parser.SqlParser; +import org.apache.calcite.sql.util.ChainedSqlOperatorTable; +import org.apache.calcite.tools.FrameworkConfig; +import org.apache.calcite.tools.Frameworks; +import org.apache.calcite.tools.Planner; +import org.apache.calcite.tools.RelConversionException; +import org.apache.calcite.tools.ValidationException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * The core component to handle through a SQL 
statement, from explain execution plan, + * to generate a Beam pipeline. + * + */ +public class BeamQueryPlanner { + private static final Logger LOG = LoggerFactory.getLogger(BeamQueryPlanner.class); + + protected final Planner planner; + private Map<String, BaseBeamTable> sourceTables = new HashMap<>(); + + public static final JavaTypeFactory TYPE_FACTORY = new JavaTypeFactoryImpl( + RelDataTypeSystem.DEFAULT); + + public BeamQueryPlanner(SchemaPlus schema) { + final List<RelTraitDef> traitDefs = new ArrayList<>(); + traitDefs.add(ConventionTraitDef.INSTANCE); + traitDefs.add(RelCollationTraitDef.INSTANCE); + + List<SqlOperatorTable> sqlOperatorTables = new ArrayList<>(); + sqlOperatorTables.add(SqlStdOperatorTable.instance()); + sqlOperatorTables.add(new CalciteCatalogReader(CalciteSchema.from(schema), false, + Collections.<String>emptyList(), TYPE_FACTORY)); + + FrameworkConfig config = Frameworks.newConfigBuilder() + .parserConfig(SqlParser.configBuilder().setLex(Lex.MYSQL).build()).defaultSchema(schema) + .traitDefs(traitDefs).context(Contexts.EMPTY_CONTEXT).ruleSets(BeamRuleSets.getRuleSets()) + .costFactory(null).typeSystem(BeamRelDataTypeSystem.BEAM_REL_DATATYPE_SYSTEM) + .operatorTable(new ChainedSqlOperatorTable(sqlOperatorTables)) + .build(); + this.planner = Frameworks.getPlanner(config); + + for (String t : schema.getTableNames()) { + sourceTables.put(t, (BaseBeamTable) schema.getTable(t)); + } + } + + /** + * Parse input SQL query, and return a {@link SqlNode} as grammar tree. + */ + public SqlNode parseQuery(String sqlQuery) throws SqlParseException{ + return planner.parse(sqlQuery); + } + + /** + * {@code compileBeamPipeline} translate a SQL statement to executed as Beam data flow, + * which is linked with the given {@code pipeline}. The final output stream is returned as + * {@code PCollection} so more operations can be applied. 
+ */ + public PCollection<BeamSqlRow> compileBeamPipeline(String sqlStatement, Pipeline basePipeline + , BeamSqlEnv sqlEnv) throws Exception { + BeamRelNode relNode = convertToBeamRel(sqlStatement); + + // the input PCollectionTuple is empty, and be rebuilt in BeamIOSourceRel. + return relNode.buildBeamPipeline(PCollectionTuple.empty(basePipeline), sqlEnv); + } + + /** + * It parses and validate the input query, then convert into a + * {@link BeamRelNode} tree. + * + */ + public BeamRelNode convertToBeamRel(String sqlStatement) + throws ValidationException, RelConversionException, SqlParseException { + BeamRelNode beamRelNode; + try { + beamRelNode = (BeamRelNode) validateAndConvert(planner.parse(sqlStatement)); + } finally { + planner.close(); + } + return beamRelNode; + } + + private RelNode validateAndConvert(SqlNode sqlNode) + throws ValidationException, RelConversionException { + SqlNode validated = validateNode(sqlNode); + LOG.info("SQL:\n" + validated); + RelNode relNode = convertToRelNode(validated); + return convertToBeamRel(relNode); + } + + private RelNode convertToBeamRel(RelNode relNode) throws RelConversionException { + RelTraitSet traitSet = relNode.getTraitSet(); + + LOG.info("SQLPlan>\n" + RelOptUtil.toString(relNode)); + + // PlannerImpl.transform() optimizes RelNode with ruleset + return planner.transform(0, traitSet.plus(BeamLogicalConvention.INSTANCE), relNode); + } + + private RelNode convertToRelNode(SqlNode sqlNode) throws RelConversionException { + return planner.rel(sqlNode).rel; + } + + private SqlNode validateNode(SqlNode sqlNode) throws ValidationException { + return planner.validate(sqlNode); + } + + public Map<String, BaseBeamTable> getSourceTables() { + return sourceTables; + } + + public Planner getPlanner() { + return planner; + } + +}
http://git-wip-us.apache.org/repos/asf/beam/blob/e4eca2ec/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/planner/BeamRelDataTypeSystem.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/planner/BeamRelDataTypeSystem.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/planner/BeamRelDataTypeSystem.java new file mode 100644 index 0000000..fba4638 --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/planner/BeamRelDataTypeSystem.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.sql.planner; + +import org.apache.calcite.rel.type.RelDataTypeSystem; +import org.apache.calcite.rel.type.RelDataTypeSystemImpl; + +/** + * customized data type in Beam. 
+ * + */ +public class BeamRelDataTypeSystem extends RelDataTypeSystemImpl { + public static final RelDataTypeSystem BEAM_REL_DATATYPE_SYSTEM = new BeamRelDataTypeSystem(); + + @Override + public int getMaxNumericScale() { + return 38; + } + + @Override + public int getMaxNumericPrecision() { + return 38; + } + +} http://git-wip-us.apache.org/repos/asf/beam/blob/e4eca2ec/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/planner/BeamRuleSets.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/planner/BeamRuleSets.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/planner/BeamRuleSets.java new file mode 100644 index 0000000..e907321 --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/planner/BeamRuleSets.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.extensions.sql.planner; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import java.util.Iterator; +import org.apache.beam.sdk.extensions.sql.rel.BeamRelNode; +import org.apache.beam.sdk.extensions.sql.rule.BeamAggregationRule; +import org.apache.beam.sdk.extensions.sql.rule.BeamFilterRule; +import org.apache.beam.sdk.extensions.sql.rule.BeamIOSinkRule; +import org.apache.beam.sdk.extensions.sql.rule.BeamIOSourceRule; +import org.apache.beam.sdk.extensions.sql.rule.BeamIntersectRule; +import org.apache.beam.sdk.extensions.sql.rule.BeamJoinRule; +import org.apache.beam.sdk.extensions.sql.rule.BeamMinusRule; +import org.apache.beam.sdk.extensions.sql.rule.BeamProjectRule; +import org.apache.beam.sdk.extensions.sql.rule.BeamSortRule; +import org.apache.beam.sdk.extensions.sql.rule.BeamUnionRule; +import org.apache.beam.sdk.extensions.sql.rule.BeamValuesRule; +import org.apache.calcite.plan.RelOptRule; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.tools.RuleSet; + +/** + * {@link RuleSet} used in {@link BeamQueryPlanner}. 
It translates a standard + * Calcite {@link RelNode} tree, to represent with {@link BeamRelNode} + * + */ +public class BeamRuleSets { + private static final ImmutableSet<RelOptRule> calciteToBeamConversionRules = ImmutableSet + .<RelOptRule>builder().add(BeamIOSourceRule.INSTANCE, BeamProjectRule.INSTANCE, + BeamFilterRule.INSTANCE, BeamIOSinkRule.INSTANCE, + BeamAggregationRule.INSTANCE, BeamSortRule.INSTANCE, BeamValuesRule.INSTANCE, + BeamIntersectRule.INSTANCE, BeamMinusRule.INSTANCE, BeamUnionRule.INSTANCE, + BeamJoinRule.INSTANCE) + .build(); + + public static RuleSet[] getRuleSets() { + return new RuleSet[] { new BeamRuleSet( + ImmutableSet.<RelOptRule>builder().addAll(calciteToBeamConversionRules).build()) }; + } + + private static class BeamRuleSet implements RuleSet { + final ImmutableSet<RelOptRule> rules; + + public BeamRuleSet(ImmutableSet<RelOptRule> rules) { + this.rules = rules; + } + + public BeamRuleSet(ImmutableList<RelOptRule> rules) { + this.rules = ImmutableSet.<RelOptRule>builder().addAll(rules).build(); + } + + @Override + public Iterator<RelOptRule> iterator() { + return rules.iterator(); + } + } + +} http://git-wip-us.apache.org/repos/asf/beam/blob/e4eca2ec/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/planner/package-info.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/planner/package-info.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/planner/package-info.java new file mode 100644 index 0000000..680ccbd --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/planner/package-info.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * {@link org.apache.beam.sdk.extensions.sql.planner.BeamQueryPlanner} is the main interface. + * It defines data sources, validate a SQL statement, and convert it as a Beam + * pipeline. + */ +package org.apache.beam.sdk.extensions.sql.planner; http://git-wip-us.apache.org/repos/asf/beam/blob/e4eca2ec/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamAggregationRel.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamAggregationRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamAggregationRel.java new file mode 100644 index 0000000..66ab892 --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamAggregationRel.java @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.sql.rel; + +import java.util.ArrayList; +import java.util.List; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.extensions.sql.BeamSqlEnv; +import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; +import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowCoder; +import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType; +import org.apache.beam.sdk.extensions.sql.transform.BeamAggregationTransforms; +import org.apache.beam.sdk.extensions.sql.utils.CalciteUtils; +import org.apache.beam.sdk.transforms.Combine; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.WithKeys; +import org.apache.beam.sdk.transforms.WithTimestamps; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.Trigger; +import org.apache.beam.sdk.transforms.windowing.Window; +import org.apache.beam.sdk.transforms.windowing.WindowFn; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.calcite.linq4j.Ord; +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.RelWriter; +import org.apache.calcite.rel.core.Aggregate; +import org.apache.calcite.rel.core.AggregateCall; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.util.ImmutableBitSet; +import org.apache.calcite.util.Util; +import 
org.joda.time.Duration;

/**
 * {@link BeamRelNode} to replace an {@link Aggregate} node.
 *
 * <p>The pipeline built here optionally assigns event timestamps, applies the window, keys the
 * rows by the group-by fields, combines per key, and merges key and aggregates into one row.
 */
public class BeamAggregationRel extends Aggregate implements BeamRelNode {
  // Index of the field used as event timestamp; -1 skips timestamp assignment.
  private final int windowFieldIdx;
  private WindowFn<BeamSqlRow, BoundedWindow> windowFn;
  private Trigger trigger;
  private final Duration allowedLateness;

  public BeamAggregationRel(RelOptCluster cluster, RelTraitSet traits
      , RelNode child, boolean indicator,
      ImmutableBitSet groupSet, List<ImmutableBitSet> groupSets, List<AggregateCall> aggCalls
      , WindowFn windowFn, Trigger trigger, int windowFieldIdx, Duration allowedLateness) {
    super(cluster, traits, child, indicator, groupSet, groupSets, aggCalls);
    this.windowFn = windowFn;
    this.trigger = trigger;
    this.windowFieldIdx = windowFieldIdx;
    this.allowedLateness = allowedLateness;
  }

  /**
   * Builds the upstream pipeline of the input node and appends the aggregation stages to it.
   *
   * @return the merged rows, encoded with a coder derived from this node's row type
   */
  @Override
  public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections
      , BeamSqlEnv sqlEnv) throws Exception {
    RelNode input = getInput();
    String stageName = BeamSqlRelUtils.getStageName(this) + "_";

    PCollection<BeamSqlRow> upstream =
        BeamSqlRelUtils.getBeamRelInput(input).buildBeamPipeline(inputPCollections, sqlEnv);
    if (windowFieldIdx != -1) {
      // Re-stamp each row from the window field, keeping the upstream coder.
      upstream = upstream.apply(stageName + "assignEventTimestamp", WithTimestamps
          .of(new BeamAggregationTransforms.WindowTimestampFn(windowFieldIdx)))
          .setCoder(upstream.getCoder());
    }

    PCollection<BeamSqlRow> windowStream = upstream.apply(stageName + "window",
        Window.into(windowFn)
        .triggering(trigger)
        .withAllowedLateness(allowedLateness)
        .accumulatingFiredPanes());

    BeamSqlRowCoder keyCoder = new BeamSqlRowCoder(exKeyFieldsSchema(input.getRowType()));
    PCollection<KV<BeamSqlRow, BeamSqlRow>> exCombineByStream = windowStream.apply(
        stageName + "exCombineBy",
        WithKeys
            .of(new BeamAggregationTransforms.AggregationGroupByKeyFn(
                windowFieldIdx, groupSet)))
        .setCoder(KvCoder.of(keyCoder, upstream.getCoder()));


    BeamSqlRowCoder aggCoder = new BeamSqlRowCoder(exAggFieldsSchema());

    PCollection<KV<BeamSqlRow, BeamSqlRow>> aggregatedStream = exCombineByStream.apply(
        stageName + "combineBy",
        Combine.<BeamSqlRow, BeamSqlRow, BeamSqlRow>perKey(
            new BeamAggregationTransforms.AggregationAdaptor(getAggCallList(),
                CalciteUtils.toBeamRowType(input.getRowType()))))
        .setCoder(KvCoder.of(keyCoder, aggCoder));

    PCollection<BeamSqlRow> mergedStream = aggregatedStream.apply(stageName + "mergeRecord",
        ParDo.of(new BeamAggregationTransforms.MergeAggregationRecord(
            CalciteUtils.toBeamRowType(getRowType()), getAggCallList(), windowFieldIdx)));
    mergedStream.setCoder(new BeamSqlRowCoder(CalciteUtils.toBeamRowType(getRowType())));

    return mergedStream;
  }

  /**
   * Type of the sub-row used as Group-By key: the grouped fields of the input row type,
   * excluding the window field.
   */
  private BeamSqlRowType exKeyFieldsSchema(RelDataType relDataType) {
    BeamSqlRowType inputRowType = CalciteUtils.toBeamRowType(relDataType);
    List<String> fieldNames = new ArrayList<>();
    List<Integer> fieldTypes = new ArrayList<>();
    for (int i : groupSet.asList()) {
      if (i != windowFieldIdx) {
        fieldNames.add(inputRowType.getFieldsName().get(i));
        fieldTypes.add(inputRowType.getFieldsType().get(i));
      }
    }
    return BeamSqlRowType.create(fieldNames, fieldTypes);
  }

  /**
   * Type of the sub-row that represents the list of aggregation fields, one per aggregate call.
   */
  private BeamSqlRowType exAggFieldsSchema() {
    List<String> fieldNames = new ArrayList<>();
    List<Integer> fieldTypes = new ArrayList<>();
    for (AggregateCall ac : getAggCallList()) {
      fieldNames.add(ac.name);
      fieldTypes.add(CalciteUtils.toJavaType(ac.type.getSqlTypeName()));
    }

    return BeamSqlRowType.create(fieldNames, fieldTypes);
  }

  /** Copies this node, carrying over the window function, trigger, window field and lateness. */
  @Override
  public Aggregate copy(RelTraitSet traitSet, RelNode input, boolean indicator
      , ImmutableBitSet groupSet,
      List<ImmutableBitSet> groupSets, List<AggregateCall> aggCalls) {
    return new BeamAggregationRel(getCluster(), traitSet, input, indicator
        , groupSet, groupSets, aggCalls, windowFn, trigger, windowFieldIdx, allowedLateness);
  }

  public void setWindowFn(WindowFn windowFn) {
    this.windowFn = windowFn;
  }

  public void setTrigger(Trigger trigger) {
    this.trigger = trigger;
  }

  /** Adds the window, trigger and event-time terms to the explain output when they are set. */
  @Override
  public RelWriter explainTerms(RelWriter pw) {
    // We skip the "groups" element if it is a singleton of "group".
    pw.item("group", groupSet)
        .itemIf("window", windowFn, windowFn != null)
        .itemIf("trigger", trigger, trigger != null)
        .itemIf("event_time", windowFieldIdx, windowFieldIdx != -1)
        .itemIf("groups", groupSets, getGroupType() != Group.SIMPLE)
        .itemIf("indicator", indicator, indicator)
        .itemIf("aggs", aggCalls, pw.nest());
    if (!pw.nest()) {
      for (Ord<AggregateCall> ord : Ord.zip(aggCalls)) {
        pw.item(Util.first(ord.e.name, "agg#" + ord.i), ord.e);
      }
    }
    return pw;
  }

}
http://git-wip-us.apache.org/repos/asf/beam/blob/e4eca2ec/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamFilterRel.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamFilterRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamFilterRel.java new file mode 100644 index 0000000..f1da29f --- /dev/null +++
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamFilterRel.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.sql.rel; + +import org.apache.beam.sdk.extensions.sql.BeamSqlEnv; +import org.apache.beam.sdk.extensions.sql.interpreter.BeamSqlExpressionExecutor; +import org.apache.beam.sdk.extensions.sql.interpreter.BeamSqlFnExecutor; +import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; +import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowCoder; +import org.apache.beam.sdk.extensions.sql.transform.BeamSqlFilterFn; +import org.apache.beam.sdk.extensions.sql.utils.CalciteUtils; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.Filter; +import org.apache.calcite.rex.RexNode; + +/** + * BeamRelNode to replace a {@code Filter} node. 
+ * + */ +public class BeamFilterRel extends Filter implements BeamRelNode { + + public BeamFilterRel(RelOptCluster cluster, RelTraitSet traits, RelNode child, + RexNode condition) { + super(cluster, traits, child, condition); + } + + @Override + public Filter copy(RelTraitSet traitSet, RelNode input, RexNode condition) { + return new BeamFilterRel(getCluster(), traitSet, input, condition); + } + + @Override + public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections + , BeamSqlEnv sqlEnv) throws Exception { + RelNode input = getInput(); + String stageName = BeamSqlRelUtils.getStageName(this); + + PCollection<BeamSqlRow> upstream = + BeamSqlRelUtils.getBeamRelInput(input).buildBeamPipeline(inputPCollections, sqlEnv); + + BeamSqlExpressionExecutor executor = new BeamSqlFnExecutor(this); + + PCollection<BeamSqlRow> filterStream = upstream.apply(stageName, + ParDo.of(new BeamSqlFilterFn(getRelTypeName(), executor))); + filterStream.setCoder(new BeamSqlRowCoder(CalciteUtils.toBeamRowType(getRowType()))); + + return filterStream; + } + +} http://git-wip-us.apache.org/repos/asf/beam/blob/e4eca2ec/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamIOSinkRel.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamIOSinkRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamIOSinkRel.java new file mode 100644 index 0000000..ce941a0 --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamIOSinkRel.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.sql.rel; + +import com.google.common.base.Joiner; +import java.util.List; +import org.apache.beam.sdk.extensions.sql.BeamSqlEnv; +import org.apache.beam.sdk.extensions.sql.schema.BaseBeamTable; +import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelOptTable; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.prepare.Prepare; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.TableModify; +import org.apache.calcite.rex.RexNode; + +/** + * BeamRelNode to replace a {@code TableModify} node. 
+ * + */ +public class BeamIOSinkRel extends TableModify implements BeamRelNode { + public BeamIOSinkRel(RelOptCluster cluster, RelTraitSet traits, RelOptTable table, + Prepare.CatalogReader catalogReader, RelNode child, Operation operation, + List<String> updateColumnList, List<RexNode> sourceExpressionList, boolean flattened) { + super(cluster, traits, table, catalogReader, child, operation, updateColumnList, + sourceExpressionList, flattened); + } + + @Override + public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) { + return new BeamIOSinkRel(getCluster(), traitSet, getTable(), getCatalogReader(), sole(inputs), + getOperation(), getUpdateColumnList(), getSourceExpressionList(), isFlattened()); + } + + /** + * Note that {@code BeamIOSinkRel} returns the input PCollection, + * which is the persisted PCollection. + */ + @Override + public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections + , BeamSqlEnv sqlEnv) throws Exception { + RelNode input = getInput(); + String stageName = BeamSqlRelUtils.getStageName(this); + + PCollection<BeamSqlRow> upstream = + BeamSqlRelUtils.getBeamRelInput(input).buildBeamPipeline(inputPCollections, sqlEnv); + + String sourceName = Joiner.on('.').join(getTable().getQualifiedName()); + + BaseBeamTable targetTable = sqlEnv.findTable(sourceName); + + upstream.apply(stageName, targetTable.buildIOWriter()); + + return upstream; + } + +} http://git-wip-us.apache.org/repos/asf/beam/blob/e4eca2ec/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamIOSourceRel.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamIOSourceRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamIOSourceRel.java new file mode 100644 index 0000000..85f0bc8 --- /dev/null +++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamIOSourceRel.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.sql.rel; + +import com.google.common.base.Joiner; +import org.apache.beam.sdk.extensions.sql.BeamSqlEnv; +import org.apache.beam.sdk.extensions.sql.schema.BaseBeamTable; +import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; +import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowCoder; +import org.apache.beam.sdk.extensions.sql.utils.CalciteUtils; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelOptTable; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.core.TableScan; + +/** + * BeamRelNode to replace a {@code TableScan} node. 
+ * + */ +public class BeamIOSourceRel extends TableScan implements BeamRelNode { + + public BeamIOSourceRel(RelOptCluster cluster, RelTraitSet traitSet, RelOptTable table) { + super(cluster, traitSet, table); + } + + @Override + public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections + , BeamSqlEnv sqlEnv) throws Exception { + String sourceName = Joiner.on('.').join(getTable().getQualifiedName()); + + TupleTag<BeamSqlRow> sourceTupleTag = new TupleTag<>(sourceName); + if (inputPCollections.has(sourceTupleTag)) { + //choose PCollection from input PCollectionTuple if exists there. + PCollection<BeamSqlRow> sourceStream = inputPCollections + .get(new TupleTag<BeamSqlRow>(sourceName)); + return sourceStream; + } else { + //If not, the source PColection is provided with BaseBeamTable.buildIOReader(). + BaseBeamTable sourceTable = sqlEnv.findTable(sourceName); + return sourceTable.buildIOReader(inputPCollections.getPipeline()) + .setCoder(new BeamSqlRowCoder(CalciteUtils.toBeamRowType(getRowType()))); + } + } + +} http://git-wip-us.apache.org/repos/asf/beam/blob/e4eca2ec/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamIntersectRel.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamIntersectRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamIntersectRel.java new file mode 100644 index 0000000..ae73a0d --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamIntersectRel.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.extensions.sql.rel; + +import java.util.List; +import org.apache.beam.sdk.extensions.sql.BeamSqlEnv; +import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.Intersect; +import org.apache.calcite.rel.core.SetOp; + +/** + * {@code BeamRelNode} to replace a {@code Intersect} node. + * + * <p>This is used to combine two SELECT statements, but returns rows only from the + * first SELECT statement that are identical to a row in the second SELECT statement. 
+ */ +public class BeamIntersectRel extends Intersect implements BeamRelNode { + private BeamSetOperatorRelBase delegate; + public BeamIntersectRel( + RelOptCluster cluster, + RelTraitSet traits, + List<RelNode> inputs, + boolean all) { + super(cluster, traits, inputs, all); + delegate = new BeamSetOperatorRelBase(this, + BeamSetOperatorRelBase.OpType.INTERSECT, inputs, all); + } + + @Override public SetOp copy(RelTraitSet traitSet, List<RelNode> inputs, boolean all) { + return new BeamIntersectRel(getCluster(), traitSet, inputs, all); + } + + @Override public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections + , BeamSqlEnv sqlEnv) throws Exception { + return delegate.buildBeamPipeline(inputPCollections, sqlEnv); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/e4eca2ec/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamJoinRel.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamJoinRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamJoinRel.java new file mode 100644 index 0000000..3d9c9cd --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamJoinRel.java @@ -0,0 +1,302 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.extensions.sql.rel; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.extensions.sql.BeamSqlEnv; +import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; +import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowCoder; +import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType; +import org.apache.beam.sdk.extensions.sql.transform.BeamJoinTransforms; +import org.apache.beam.sdk.extensions.sql.utils.CalciteUtils; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.View; +import org.apache.beam.sdk.transforms.windowing.IncompatibleWindowException; +import org.apache.beam.sdk.transforms.windowing.WindowFn; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.beam.sdk.values.PCollectionView; +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.CorrelationId; +import org.apache.calcite.rel.core.Join; +import org.apache.calcite.rel.core.JoinRelType; +import org.apache.calcite.rex.RexCall; +import org.apache.calcite.rex.RexInputRef; +import org.apache.calcite.rex.RexLiteral; +import org.apache.calcite.rex.RexNode; +import 
org.apache.calcite.util.Pair; + +/** + * {@code BeamRelNode} to replace a {@code Join} node. + * + * <p>Support for join can be categorized into 3 cases: + * <ul> + * <li>BoundedTable JOIN BoundedTable</li> + * <li>UnboundedTable JOIN UnboundedTable</li> + * <li>BoundedTable JOIN UnboundedTable</li> + * </ul> + * + * <p>For the first two cases, a standard join is utilized as long as the windowFns of both + * sides match. + * + * <p>For the third case, {@code sideInput} is utilized to implement the join, so there are some + * constraints: + * + * <ul> + * <li>{@code FULL OUTER JOIN} is not supported.</li> + * <li>If it's a {@code LEFT OUTER JOIN}, the unbounded table must be on the left side.</li> + * <li>If it's a {@code RIGHT OUTER JOIN}, the unbounded table must be on the right side.</li> + * </ul> + * + * + * <p>There are also some general constraints: + * + * <ul> + * <li>Only equi-join is supported.</li> + * <li>CROSS JOIN is not supported.</li> + * </ul> + */ +public class BeamJoinRel extends Join implements BeamRelNode { + public BeamJoinRel(RelOptCluster cluster, RelTraitSet traits, RelNode left, RelNode right, + RexNode condition, Set<CorrelationId> variablesSet, JoinRelType joinType) { + super(cluster, traits, left, right, condition, variablesSet, joinType); + } + + @Override public Join copy(RelTraitSet traitSet, RexNode conditionExpr, RelNode left, + RelNode right, JoinRelType joinType, boolean semiJoinDone) { + return new BeamJoinRel(getCluster(), traitSet, left, right, conditionExpr, variablesSet, + joinType); + } + + @Override public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections, + BeamSqlEnv sqlEnv) + throws Exception { + BeamRelNode leftRelNode = BeamSqlRelUtils.getBeamRelInput(left); + BeamSqlRowType leftRowType = CalciteUtils.toBeamRowType(left.getRowType()); + PCollection<BeamSqlRow> leftRows = leftRelNode.buildBeamPipeline(inputPCollections, sqlEnv); + + final BeamRelNode rightRelNode = 
BeamSqlRelUtils.getBeamRelInput(right); + PCollection<BeamSqlRow> rightRows = rightRelNode.buildBeamPipeline(inputPCollections, sqlEnv); + + String stageName = BeamSqlRelUtils.getStageName(this); + WindowFn leftWinFn = leftRows.getWindowingStrategy().getWindowFn(); + WindowFn rightWinFn = rightRows.getWindowingStrategy().getWindowFn(); + + // extract the join fields + List<Pair<Integer, Integer>> pairs = extractJoinColumns( + leftRelNode.getRowType().getFieldCount()); + + // build the extract key type + // the name of the join field is not important + List<String> names = new ArrayList<>(pairs.size()); + List<Integer> types = new ArrayList<>(pairs.size()); + for (int i = 0; i < pairs.size(); i++) { + names.add("c" + i); + types.add(leftRowType.getFieldsType().get(pairs.get(i).getKey())); + } + BeamSqlRowType extractKeyRowType = BeamSqlRowType.create(names, types); + + Coder extractKeyRowCoder = new BeamSqlRowCoder(extractKeyRowType); + + // BeamSqlRow -> KV<BeamSqlRow, BeamSqlRow> + PCollection<KV<BeamSqlRow, BeamSqlRow>> extractedLeftRows = leftRows + .apply(stageName + "_left_ExtractJoinFields", + MapElements.via(new BeamJoinTransforms.ExtractJoinFields(true, pairs))) + .setCoder(KvCoder.of(extractKeyRowCoder, leftRows.getCoder())); + + PCollection<KV<BeamSqlRow, BeamSqlRow>> extractedRightRows = rightRows + .apply(stageName + "_right_ExtractJoinFields", + MapElements.via(new BeamJoinTransforms.ExtractJoinFields(false, pairs))) + .setCoder(KvCoder.of(extractKeyRowCoder, rightRows.getCoder())); + + // prepare the NullRows + BeamSqlRow leftNullRow = buildNullRow(leftRelNode); + BeamSqlRow rightNullRow = buildNullRow(rightRelNode); + + // a regular join + if ((leftRows.isBounded() == PCollection.IsBounded.BOUNDED + && rightRows.isBounded() == PCollection.IsBounded.BOUNDED) + || (leftRows.isBounded() == PCollection.IsBounded.UNBOUNDED + && rightRows.isBounded() == PCollection.IsBounded.UNBOUNDED)) { + try { + leftWinFn.verifyCompatibility(rightWinFn); + } catch 
(IncompatibleWindowException e) { + throw new IllegalArgumentException( + "WindowFns must match for a bounded-vs-bounded/unbounded-vs-unbounded join.", e); + } + + return standardJoin(extractedLeftRows, extractedRightRows, + leftNullRow, rightNullRow, stageName); + } else if ( + (leftRows.isBounded() == PCollection.IsBounded.BOUNDED + && rightRows.isBounded() == PCollection.IsBounded.UNBOUNDED) + || (leftRows.isBounded() == PCollection.IsBounded.UNBOUNDED + && rightRows.isBounded() == PCollection.IsBounded.BOUNDED) + ) { + // if one of the sides is Bounded & the other is Unbounded + // then do a sideInput join + // when doing a sideInput join, the windowFn does not need to match + // Only support INNER JOIN & LEFT OUTER JOIN where left side of the join must be + // the unbounded + if (joinType == JoinRelType.FULL) { + throw new UnsupportedOperationException("FULL OUTER JOIN is not supported when join " + + "a bounded table with an unbounded table."); + } + + if ((joinType == JoinRelType.LEFT + && leftRows.isBounded() == PCollection.IsBounded.BOUNDED) + || (joinType == JoinRelType.RIGHT + && rightRows.isBounded() == PCollection.IsBounded.BOUNDED)) { + throw new UnsupportedOperationException( + "LEFT side of an OUTER JOIN must be Unbounded table."); + } + + return sideInputJoin(extractedLeftRows, extractedRightRows, + leftNullRow, rightNullRow); + } else { + throw new UnsupportedOperationException( + "The inputs to the JOIN have un-joinnable windowFns: " + leftWinFn + ", " + rightWinFn); + } + } + + private PCollection<BeamSqlRow> standardJoin( + PCollection<KV<BeamSqlRow, BeamSqlRow>> extractedLeftRows, + PCollection<KV<BeamSqlRow, BeamSqlRow>> extractedRightRows, + BeamSqlRow leftNullRow, BeamSqlRow rightNullRow, String stageName) { + PCollection<KV<BeamSqlRow, KV<BeamSqlRow, BeamSqlRow>>> joinedRows = null; + switch (joinType) { + case LEFT: + joinedRows = org.apache.beam.sdk.extensions.joinlibrary.Join + .leftOuterJoin(extractedLeftRows, extractedRightRows, 
rightNullRow); + break; + case RIGHT: + joinedRows = org.apache.beam.sdk.extensions.joinlibrary.Join + .rightOuterJoin(extractedLeftRows, extractedRightRows, leftNullRow); + break; + case FULL: + joinedRows = org.apache.beam.sdk.extensions.joinlibrary.Join + .fullOuterJoin(extractedLeftRows, extractedRightRows, leftNullRow, + rightNullRow); + break; + case INNER: + default: + joinedRows = org.apache.beam.sdk.extensions.joinlibrary.Join + .innerJoin(extractedLeftRows, extractedRightRows); + break; + } + + PCollection<BeamSqlRow> ret = joinedRows + .apply(stageName + "_JoinParts2WholeRow", + MapElements.via(new BeamJoinTransforms.JoinParts2WholeRow())) + .setCoder(new BeamSqlRowCoder(CalciteUtils.toBeamRowType(getRowType()))); + return ret; + } + + public PCollection<BeamSqlRow> sideInputJoin( + PCollection<KV<BeamSqlRow, BeamSqlRow>> extractedLeftRows, + PCollection<KV<BeamSqlRow, BeamSqlRow>> extractedRightRows, + BeamSqlRow leftNullRow, BeamSqlRow rightNullRow) { + // we always make the Unbounded table on the left to do the sideInput join + // (will convert the result accordingly before return) + boolean swapped = (extractedLeftRows.isBounded() == PCollection.IsBounded.BOUNDED); + JoinRelType realJoinType = + (swapped && joinType != JoinRelType.INNER) ? JoinRelType.LEFT : joinType; + + PCollection<KV<BeamSqlRow, BeamSqlRow>> realLeftRows = + swapped ? extractedRightRows : extractedLeftRows; + PCollection<KV<BeamSqlRow, BeamSqlRow>> realRightRows = + swapped ? extractedLeftRows : extractedRightRows; + BeamSqlRow realRightNullRow = swapped ? leftNullRow : rightNullRow; + + // swapped still need to pass down because, we need to swap the result back. 
+ return sideInputJoinHelper(realJoinType, realLeftRows, realRightRows, + realRightNullRow, swapped); + } + + private PCollection<BeamSqlRow> sideInputJoinHelper( + JoinRelType joinType, + PCollection<KV<BeamSqlRow, BeamSqlRow>> leftRows, + PCollection<KV<BeamSqlRow, BeamSqlRow>> rightRows, + BeamSqlRow rightNullRow, boolean swapped) { + final PCollectionView<Map<BeamSqlRow, Iterable<BeamSqlRow>>> rowsView = rightRows + .apply(View.<BeamSqlRow, BeamSqlRow>asMultimap()); + + PCollection<BeamSqlRow> ret = leftRows + .apply(ParDo.of(new BeamJoinTransforms.SideInputJoinDoFn( + joinType, rightNullRow, rowsView, swapped)).withSideInputs(rowsView)) + .setCoder(new BeamSqlRowCoder(CalciteUtils.toBeamRowType(getRowType()))); + + return ret; + } + + private BeamSqlRow buildNullRow(BeamRelNode relNode) { + BeamSqlRowType leftType = CalciteUtils.toBeamRowType(relNode.getRowType()); + BeamSqlRow nullRow = new BeamSqlRow(leftType); + for (int i = 0; i < leftType.size(); i++) { + nullRow.addField(i, null); + } + return nullRow; + } + + private List<Pair<Integer, Integer>> extractJoinColumns(int leftRowColumnCount) { + // it's a CROSS JOIN because: condition == true + if (condition instanceof RexLiteral && (Boolean) ((RexLiteral) condition).getValue()) { + throw new UnsupportedOperationException("CROSS JOIN is not supported!"); + } + + RexCall call = (RexCall) condition; + List<Pair<Integer, Integer>> pairs = new ArrayList<>(); + if ("AND".equals(call.getOperator().getName())) { + List<RexNode> operands = call.getOperands(); + for (RexNode rexNode : operands) { + Pair<Integer, Integer> pair = extractOneJoinColumn((RexCall) rexNode, leftRowColumnCount); + pairs.add(pair); + } + } else if ("=".equals(call.getOperator().getName())) { + pairs.add(extractOneJoinColumn(call, leftRowColumnCount)); + } else { + throw new UnsupportedOperationException( + "Operator " + call.getOperator().getName() + " is not supported in join condition"); + } + + return pairs; + } + + private Pair<Integer, 
Integer> extractOneJoinColumn(RexCall oneCondition, + int leftRowColumnCount) { + List<RexNode> operands = oneCondition.getOperands(); + final int leftIndex = Math.min(((RexInputRef) operands.get(0)).getIndex(), + ((RexInputRef) operands.get(1)).getIndex()); + + final int rightIndex1 = Math.max(((RexInputRef) operands.get(0)).getIndex(), + ((RexInputRef) operands.get(1)).getIndex()); + final int rightIndex = rightIndex1 - leftRowColumnCount; + + return new Pair<>(leftIndex, rightIndex); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/e4eca2ec/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamLogicalConvention.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamLogicalConvention.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamLogicalConvention.java new file mode 100644 index 0000000..58b90ca --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamLogicalConvention.java @@ -0,0 +1,72 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.extensions.sql.rel; + +import org.apache.calcite.plan.Convention; +import org.apache.calcite.plan.ConventionTraitDef; +import org.apache.calcite.plan.RelOptPlanner; +import org.apache.calcite.plan.RelTrait; +import org.apache.calcite.plan.RelTraitDef; +import org.apache.calcite.plan.RelTraitSet; + +/** + * Convention for Beam SQL. + * + */ +public enum BeamLogicalConvention implements Convention { + INSTANCE; + + @Override + public Class getInterface() { + return BeamRelNode.class; + } + + @Override + public String getName() { + return "BEAM_LOGICAL"; + } + + @Override + public RelTraitDef getTraitDef() { + return ConventionTraitDef.INSTANCE; + } + + @Override + public boolean satisfies(RelTrait trait) { + return this == trait; + } + + @Override + public void register(RelOptPlanner planner) { + } + + @Override + public String toString() { + return getName(); + } + + @Override + public boolean canConvertConvention(Convention toConvention) { + return false; + } + + @Override + public boolean useAbstractConvertersForConversion(RelTraitSet fromTraits, RelTraitSet toTraits) { + return false; + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/e4eca2ec/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamMinusRel.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamMinusRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamMinusRel.java new file mode 100644 index 0000000..8cef971 --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamMinusRel.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.extensions.sql.rel; + +import java.util.List; +import org.apache.beam.sdk.extensions.sql.BeamSqlEnv; +import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.Minus; +import org.apache.calcite.rel.core.SetOp; + +/** + * {@code BeamRelNode} to replace a {@code Minus} node. + * + * <p>Corresponds to the SQL {@code EXCEPT} operator. 
+ */ +public class BeamMinusRel extends Minus implements BeamRelNode { + + private BeamSetOperatorRelBase delegate; + + public BeamMinusRel(RelOptCluster cluster, RelTraitSet traits, List<RelNode> inputs, + boolean all) { + super(cluster, traits, inputs, all); + delegate = new BeamSetOperatorRelBase(this, + BeamSetOperatorRelBase.OpType.MINUS, inputs, all); + } + + @Override public SetOp copy(RelTraitSet traitSet, List<RelNode> inputs, boolean all) { + return new BeamMinusRel(getCluster(), traitSet, inputs, all); + } + + @Override public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections + , BeamSqlEnv sqlEnv) throws Exception { + return delegate.buildBeamPipeline(inputPCollections, sqlEnv); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/e4eca2ec/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamProjectRel.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamProjectRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamProjectRel.java new file mode 100644 index 0000000..8f81038 --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamProjectRel.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.sql.rel; + +import java.util.List; +import org.apache.beam.sdk.extensions.sql.BeamSqlEnv; +import org.apache.beam.sdk.extensions.sql.interpreter.BeamSqlExpressionExecutor; +import org.apache.beam.sdk.extensions.sql.interpreter.BeamSqlFnExecutor; +import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; +import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowCoder; +import org.apache.beam.sdk.extensions.sql.transform.BeamSqlProjectFn; +import org.apache.beam.sdk.extensions.sql.utils.CalciteUtils; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.Project; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rex.RexCall; +import org.apache.calcite.rex.RexInputRef; +import org.apache.calcite.rex.RexLiteral; +import org.apache.calcite.rex.RexNode; + +/** + * BeamRelNode to replace a {@code Project} node. + * + */ +public class BeamProjectRel extends Project implements BeamRelNode { + + /** + * projects: {@link RexLiteral}, {@link RexInputRef}, {@link RexCall}. + * + */ + public BeamProjectRel(RelOptCluster cluster, RelTraitSet traits, RelNode input, + List<? 
extends RexNode> projects, RelDataType rowType) { + super(cluster, traits, input, projects, rowType); + } + + @Override + public Project copy(RelTraitSet traitSet, RelNode input, List<RexNode> projects, + RelDataType rowType) { + return new BeamProjectRel(getCluster(), traitSet, input, projects, rowType); + } + + @Override + public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections + , BeamSqlEnv sqlEnv) throws Exception { + RelNode input = getInput(); + String stageName = BeamSqlRelUtils.getStageName(this); + + PCollection<BeamSqlRow> upstream = + BeamSqlRelUtils.getBeamRelInput(input).buildBeamPipeline(inputPCollections, sqlEnv); + + BeamSqlExpressionExecutor executor = new BeamSqlFnExecutor(this); + + PCollection<BeamSqlRow> projectStream = upstream.apply(stageName, ParDo + .of(new BeamSqlProjectFn(getRelTypeName(), executor, + CalciteUtils.toBeamRowType(rowType)))); + projectStream.setCoder(new BeamSqlRowCoder(CalciteUtils.toBeamRowType(getRowType()))); + + return projectStream; + } + +} http://git-wip-us.apache.org/repos/asf/beam/blob/e4eca2ec/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamRelNode.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamRelNode.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamRelNode.java new file mode 100644 index 0000000..80a4b84 --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamRelNode.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.sql.rel; + +import org.apache.beam.sdk.extensions.sql.BeamSqlEnv; +import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.calcite.rel.RelNode; + +/** + * A new method {@link #buildBeamPipeline(PCollectionTuple, BeamSqlEnv)} is added. + */ +public interface BeamRelNode extends RelNode { + + /** + * A {@link BeamRelNode} is a recursive structure, the + * {@code BeamQueryPlanner} visits it with a DFS(Depth-First-Search) + * algorithm. 
+ */ + PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections, BeamSqlEnv sqlEnv) + throws Exception; +} http://git-wip-us.apache.org/repos/asf/beam/blob/e4eca2ec/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamSetOperatorRelBase.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamSetOperatorRelBase.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamSetOperatorRelBase.java new file mode 100644 index 0000000..7f80eb0 --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamSetOperatorRelBase.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.beam.sdk.extensions.sql.rel; + +import java.io.Serializable; +import java.util.List; +import org.apache.beam.sdk.extensions.sql.BeamSqlEnv; +import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; +import org.apache.beam.sdk.extensions.sql.transform.BeamSetOperatorsTransforms; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.join.CoGbkResult; +import org.apache.beam.sdk.transforms.join.CoGroupByKey; +import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple; +import org.apache.beam.sdk.transforms.windowing.WindowFn; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.calcite.rel.RelNode; + +/** + * Delegate for Set operators: {@code BeamUnionRel}, {@code BeamIntersectRel} + * and {@code BeamMinusRel}. + */ +public class BeamSetOperatorRelBase { + /** + * Set operator type. 
+ */ + public enum OpType implements Serializable { + UNION, + INTERSECT, + MINUS + } + + private BeamRelNode beamRelNode; + private List<RelNode> inputs; + private boolean all; + private OpType opType; + + public BeamSetOperatorRelBase(BeamRelNode beamRelNode, OpType opType, + List<RelNode> inputs, boolean all) { + this.beamRelNode = beamRelNode; + this.opType = opType; + this.inputs = inputs; + this.all = all; + } + + public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections + , BeamSqlEnv sqlEnv) throws Exception { + PCollection<BeamSqlRow> leftRows = BeamSqlRelUtils.getBeamRelInput(inputs.get(0)) + .buildBeamPipeline(inputPCollections, sqlEnv); + PCollection<BeamSqlRow> rightRows = BeamSqlRelUtils.getBeamRelInput(inputs.get(1)) + .buildBeamPipeline(inputPCollections, sqlEnv); + + WindowFn leftWindow = leftRows.getWindowingStrategy().getWindowFn(); + WindowFn rightWindow = rightRows.getWindowingStrategy().getWindowFn(); + if (!leftWindow.isCompatible(rightWindow)) { + throw new IllegalArgumentException( + "inputs of " + opType + " have different window strategy: " + + leftWindow + " VS " + rightWindow); + } + + final TupleTag<BeamSqlRow> leftTag = new TupleTag<>(); + final TupleTag<BeamSqlRow> rightTag = new TupleTag<>(); + + // co-group + String stageName = BeamSqlRelUtils.getStageName(beamRelNode); + PCollection<KV<BeamSqlRow, CoGbkResult>> coGbkResultCollection = KeyedPCollectionTuple + .of(leftTag, leftRows.apply( + stageName + "_CreateLeftIndex", MapElements.via( + new BeamSetOperatorsTransforms.BeamSqlRow2KvFn()))) + .and(rightTag, rightRows.apply( + stageName + "_CreateRightIndex", MapElements.via( + new BeamSetOperatorsTransforms.BeamSqlRow2KvFn()))) + .apply(CoGroupByKey.<BeamSqlRow>create()); + PCollection<BeamSqlRow> ret = coGbkResultCollection + .apply(ParDo.of(new BeamSetOperatorsTransforms.SetOperatorFilteringDoFn(leftTag, rightTag, + opType, all))); + return ret; + } +} 
http://git-wip-us.apache.org/repos/asf/beam/blob/e4eca2ec/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamSortRel.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamSortRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamSortRel.java new file mode 100644 index 0000000..363c0a9 --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamSortRel.java @@ -0,0 +1,247 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.beam.sdk.extensions.sql.rel; + +import java.io.Serializable; +import java.lang.reflect.Type; +import java.math.BigDecimal; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; +import org.apache.beam.sdk.coders.ListCoder; +import org.apache.beam.sdk.extensions.sql.BeamSqlEnv; +import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; +import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowCoder; +import org.apache.beam.sdk.extensions.sql.utils.CalciteUtils; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.Flatten; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.Top; +import org.apache.beam.sdk.transforms.windowing.GlobalWindow; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelCollation; +import org.apache.calcite.rel.RelCollationImpl; +import org.apache.calcite.rel.RelFieldCollation; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.Sort; +import org.apache.calcite.rex.RexInputRef; +import org.apache.calcite.rex.RexLiteral; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.sql.type.SqlTypeName; + +/** + * {@code BeamRelNode} to replace a {@code Sort} node. + * + * <p>Since Beam does not fully supported global sort we are using {@link Top} to implement + * the {@code Sort} algebra. 
The following types of ORDER BY are supported: + + * <pre>{@code + * select * from t order by id desc limit 10; + * select * from t order by id desc limit 10, 5; + * }</pre> + * + * <p>but Order BY without a limit is NOT supported: + * + * <pre>{@code + * select * from t order by id desc + * }</pre> + * + * <h3>Constraints</h3> + * <ul> + * <li>Due to the constraints of {@link Top}, the result of a `ORDER BY LIMIT` + * must fit into the memory of a single machine.</li> + * <li>Since `WINDOW`(HOP, TUMBLE, SESSION etc) is always associated with `GroupBy`, + * it does not make much sense to use `ORDER BY` with `WINDOW`. + * </li> + * </ul> + */ +public class BeamSortRel extends Sort implements BeamRelNode { + private List<Integer> fieldIndices = new ArrayList<>(); + private List<Boolean> orientation = new ArrayList<>(); + private List<Boolean> nullsFirst = new ArrayList<>(); + + private int startIndex = 0; + private int count; + + public BeamSortRel( + RelOptCluster cluster, + RelTraitSet traits, + RelNode child, + RelCollation collation, + RexNode offset, + RexNode fetch) { + super(cluster, traits, child, collation, offset, fetch); + + List<RexNode> fieldExps = getChildExps(); + RelCollationImpl collationImpl = (RelCollationImpl) collation; + List<RelFieldCollation> collations = collationImpl.getFieldCollations(); + for (int i = 0; i < fieldExps.size(); i++) { + RexNode fieldExp = fieldExps.get(i); + RexInputRef inputRef = (RexInputRef) fieldExp; + fieldIndices.add(inputRef.getIndex()); + orientation.add(collations.get(i).getDirection() == RelFieldCollation.Direction.ASCENDING); + + RelFieldCollation.NullDirection rawNullDirection = collations.get(i).nullDirection; + if (rawNullDirection == RelFieldCollation.NullDirection.UNSPECIFIED) { + rawNullDirection = collations.get(i).getDirection().defaultNullDirection(); + } + nullsFirst.add(rawNullDirection == RelFieldCollation.NullDirection.FIRST); + } + + if (fetch == null) { + throw new 
UnsupportedOperationException("ORDER BY without a LIMIT is not supported!"); + } + + RexLiteral fetchLiteral = (RexLiteral) fetch; + count = ((BigDecimal) fetchLiteral.getValue()).intValue(); + + if (offset != null) { + RexLiteral offsetLiteral = (RexLiteral) offset; + startIndex = ((BigDecimal) offsetLiteral.getValue()).intValue(); + } + } + + @Override public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections + , BeamSqlEnv sqlEnv) throws Exception { + RelNode input = getInput(); + PCollection<BeamSqlRow> upstream = BeamSqlRelUtils.getBeamRelInput(input) + .buildBeamPipeline(inputPCollections, sqlEnv); + Type windowType = upstream.getWindowingStrategy().getWindowFn() + .getWindowTypeDescriptor().getType(); + if (!windowType.equals(GlobalWindow.class)) { + throw new UnsupportedOperationException( + "`ORDER BY` is only supported for GlobalWindow, actual window: " + windowType); + } + + BeamSqlRowComparator comparator = new BeamSqlRowComparator(fieldIndices, orientation, + nullsFirst); + // first find the top (offset + count) + PCollection<List<BeamSqlRow>> rawStream = + upstream.apply("extractTopOffsetAndFetch", + Top.of(startIndex + count, comparator).withoutDefaults()) + .setCoder(ListCoder.<BeamSqlRow>of(upstream.getCoder())); + + // strip the `leading offset` + if (startIndex > 0) { + rawStream = rawStream.apply("stripLeadingOffset", ParDo.of( + new SubListFn<BeamSqlRow>(startIndex, startIndex + count))) + .setCoder(ListCoder.<BeamSqlRow>of(upstream.getCoder())); + } + + PCollection<BeamSqlRow> orderedStream = rawStream.apply( + "flatten", Flatten.<BeamSqlRow>iterables()); + orderedStream.setCoder(new BeamSqlRowCoder(CalciteUtils.toBeamRowType(getRowType()))); + + return orderedStream; + } + + private static class SubListFn<T> extends DoFn<List<T>, List<T>> { + private int startIndex; + private int endIndex; + + public SubListFn(int startIndex, int endIndex) { + this.startIndex = startIndex; + this.endIndex = endIndex; + } + + 
@ProcessElement + public void processElement(ProcessContext ctx) { + ctx.output(ctx.element().subList(startIndex, endIndex)); + } + } + + @Override public Sort copy(RelTraitSet traitSet, RelNode newInput, RelCollation newCollation, + RexNode offset, RexNode fetch) { + return new BeamSortRel(getCluster(), traitSet, newInput, newCollation, offset, fetch); + } + + private static class BeamSqlRowComparator implements Comparator<BeamSqlRow>, Serializable { + private List<Integer> fieldsIndices; + private List<Boolean> orientation; + private List<Boolean> nullsFirst; + + public BeamSqlRowComparator(List<Integer> fieldsIndices, + List<Boolean> orientation, + List<Boolean> nullsFirst) { + this.fieldsIndices = fieldsIndices; + this.orientation = orientation; + this.nullsFirst = nullsFirst; + } + + @Override public int compare(BeamSqlRow row1, BeamSqlRow row2) { + for (int i = 0; i < fieldsIndices.size(); i++) { + int fieldIndex = fieldsIndices.get(i); + int fieldRet = 0; + SqlTypeName fieldType = CalciteUtils.getFieldType(row1.getDataType(), fieldIndex); + // whether NULL should be ordered first or last(compared to non-null values) depends on + // what user specified in SQL(NULLS FIRST/NULLS LAST) + if (row1.isNull(fieldIndex) && row2.isNull(fieldIndex)) { + continue; + } else if (row1.isNull(fieldIndex) && !row2.isNull(fieldIndex)) { + fieldRet = -1 * (nullsFirst.get(i) ? -1 : 1); + } else if (!row1.isNull(fieldIndex) && row2.isNull(fieldIndex)) { + fieldRet = 1 * (nullsFirst.get(i) ? 
-1 : 1); + } else { + switch (fieldType) { + case TINYINT: + fieldRet = numberCompare(row1.getByte(fieldIndex), row2.getByte(fieldIndex)); + break; + case SMALLINT: + fieldRet = numberCompare(row1.getShort(fieldIndex), row2.getShort(fieldIndex)); + break; + case INTEGER: + fieldRet = numberCompare(row1.getInteger(fieldIndex), row2.getInteger(fieldIndex)); + break; + case BIGINT: + fieldRet = numberCompare(row1.getLong(fieldIndex), row2.getLong(fieldIndex)); + break; + case FLOAT: + fieldRet = numberCompare(row1.getFloat(fieldIndex), row2.getFloat(fieldIndex)); + break; + case DOUBLE: + fieldRet = numberCompare(row1.getDouble(fieldIndex), row2.getDouble(fieldIndex)); + break; + case VARCHAR: + fieldRet = row1.getString(fieldIndex).compareTo(row2.getString(fieldIndex)); + break; + case DATE: + fieldRet = row1.getDate(fieldIndex).compareTo(row2.getDate(fieldIndex)); + break; + default: + throw new UnsupportedOperationException( + "Data type: " + fieldType + " not supported yet!"); + } + } + + fieldRet *= (orientation.get(i) ? -1 : 1); + if (fieldRet != 0) { + return fieldRet; + } + } + return 0; + } + } + + public static <T extends Number & Comparable> int numberCompare(T a, T b) { + return a.compareTo(b); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/e4eca2ec/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamSqlRelUtils.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamSqlRelUtils.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamSqlRelUtils.java new file mode 100644 index 0000000..cc503d0 --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamSqlRelUtils.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.sql.rel; + +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.calcite.plan.RelOptUtil; +import org.apache.calcite.plan.volcano.RelSubset; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.sql.SqlExplainLevel; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Utilities for {@code BeamRelNode}. 
*/
class BeamSqlRelUtils {
  private static final Logger LOG = LoggerFactory.getLogger(BeamSqlRelUtils.class);

  /** Counter appended to stage names so that successive names differ. */
  private static final AtomicInteger sequence = new AtomicInteger(0);
  /** Counter appended to generated class names so that successive names differ. */
  private static final AtomicInteger classSequence = new AtomicInteger(0);

  /**
   * Returns a stage name of the form {@code <UPPER-CASED SIMPLE CLASS NAME>_<id>_<n>}, where
   * {@code n} is taken from a counter that is incremented on every call.
   *
   * <p>The class name is upper-cased with {@code Locale.ROOT} so the result does not depend on
   * the default locale of the JVM.
   */
  public static String getStageName(BeamRelNode relNode) {
    return relNode.getClass().getSimpleName().toUpperCase(java.util.Locale.ROOT)
        + "_" + relNode.getId() + "_" + sequence.getAndIncrement();
  }

  /**
   * Returns a class name of the form {@code Generated_<UPPER-CASED SIMPLE CLASS NAME>_<id>_<n>},
   * where {@code n} is taken from a counter that is incremented on every call.
   */
  public static String getClassName(BeamRelNode relNode) {
    return "Generated_" + relNode.getClass().getSimpleName().toUpperCase(java.util.Locale.ROOT)
        + "_" + relNode.getId() + "_" + classSequence.getAndIncrement();
  }

  /**
   * Casts {@code input} to a {@link BeamRelNode}; a {@link RelSubset} is first replaced by its
   * best known member.
   */
  public static BeamRelNode getBeamRelInput(RelNode input) {
    if (input instanceof RelSubset) {
      // go with known best input
      input = ((RelSubset) input).getBest();
    }
    return (BeamRelNode) input;
  }

  /** Renders the plan of {@code rel} at the {@code EXPPLAN_ATTRIBUTES} detail level. */
  public static String explain(final RelNode rel) {
    return explain(rel, SqlExplainLevel.EXPPLAN_ATTRIBUTES);
  }

  /**
   * Renders the plan of {@code rel} at the requested detail level.
   *
   * @return the plan text, or an empty string when rendering the plan overflows the stack
   */
  public static String explain(final RelNode rel, SqlExplainLevel detailLevel) {
    String explain = "";
    try {
      // Pass the requested level through; the one-argument overload would ignore it.
      explain = RelOptUtil.toString(rel, detailLevel);
    } catch (StackOverflowError e) {
      // Deliberately best-effort: the failure is logged and an empty plan is returned.
      LOG.error("StackOverflowError occurred while extracting plan. "
          + "Please report it to the dev@ mailing list.");
      LOG.error("RelNode " + rel + " ExplainLevel " + detailLevel, e);
      LOG.error("Forcing plan to empty string and continue... "
          + "SQL Runner may not working properly after.");
    }
    return explain;
  }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/e4eca2ec/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamUnionRel.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamUnionRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamUnionRel.java new file mode 100644 index 0000000..695521d --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamUnionRel.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
*/

package org.apache.beam.sdk.extensions.sql.rel;

import java.util.List;
import org.apache.beam.sdk.extensions.sql.BeamSqlEnv;
import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelInput;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.SetOp;
import org.apache.calcite.rel.core.Union;

/**
 * {@link BeamRelNode} to replace a {@link Union}.
 *
 * <p>{@code BeamUnionRel} requires its inputs to have the same {@link WindowFn}. From the SQL
 * perspective, two cases are supported:
 *
 * <p>1) Do not use {@code grouped window function}:
 *
 * <pre>{@code
 *   select * from person UNION select * from person
 * }</pre>
 *
 * <p>2) Use the same {@code grouped window function}, with the same param:
 * <pre>{@code
 *   select id, count(*) from person
 *   group by id, TUMBLE(order_time, INTERVAL '1' HOUR)
 *   UNION
 *   select * from person
 *   group by id, TUMBLE(order_time, INTERVAL '1' HOUR)
 * }</pre>
 *
 * <p>Inputs with different group functions are NOT supported:
 * <pre>{@code
 *   select id, count(*) from person
 *   group by id, TUMBLE(order_time, INTERVAL '1' HOUR)
 *   UNION
 *   select * from person
 *   group by id, TUMBLE(order_time, INTERVAL '2' HOUR)
 * }</pre>
 */
public class BeamUnionRel extends Union implements BeamRelNode {
  /** Shared set-operator implementation that builds the pipeline for this node. */
  private final BeamSetOperatorRelBase delegate;

  public BeamUnionRel(RelOptCluster cluster,
      RelTraitSet traits,
      List<RelNode> inputs,
      boolean all) {
    super(cluster, traits, inputs, all);
    this.delegate = new BeamSetOperatorRelBase(this,
        BeamSetOperatorRelBase.OpType.UNION,
        inputs, all);
  }

  /**
   * Creates the node from a serialized {@link RelInput}.
   *
   * <p>The delegate is initialised here as well, from the inputs and the {@code all} flag that
   * the superclass constructor read from {@code input}; otherwise
   * {@link #buildBeamPipeline} would fail with a {@link NullPointerException} for nodes
   * created through this constructor.
   */
  public BeamUnionRel(RelInput input) {
    super(input);
    this.delegate = new BeamSetOperatorRelBase(this,
        BeamSetOperatorRelBase.OpType.UNION,
        getInputs(), all);
  }

  @Override
  public SetOp copy(RelTraitSet traitSet, List<RelNode> inputs, boolean all) {
    return new BeamUnionRel(getCluster(), traitSet, inputs, all);
  }

  /** Delegates pipeline construction to the shared set-operator implementation. */
  @Override
  public PCollection<BeamSqlRow> buildBeamPipeline(
      PCollectionTuple inputPCollections, BeamSqlEnv sqlEnv) throws Exception {
    return delegate.buildBeamPipeline(inputPCollections, sqlEnv);
  }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/e4eca2ec/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamValuesRel.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamValuesRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamValuesRel.java new file mode 100644 index 0000000..f3bf3a3 --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamValuesRel.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
*/

package org.apache.beam.sdk.extensions.sql.rel;

import com.google.common.collect.ImmutableList;
import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.extensions.sql.BeamSqlEnv;
import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowCoder;
import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType;
import org.apache.beam.sdk.extensions.sql.schema.BeamTableUtils;
import org.apache.beam.sdk.extensions.sql.utils.CalciteUtils;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.core.Values;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rex.RexLiteral;

/**
 * {@code BeamRelNode} to replace a {@code Values} node.
 *
 * <p>{@code BeamValuesRel} will be used in the following SQLs:
 * <ul>
 *   <li>{@code insert into t (name, desc) values ('hello', 'world')}</li>
 *   <li>{@code select 1, '1', LOCALTIME}</li>
 * </ul>
 */
public class BeamValuesRel extends Values implements BeamRelNode {

  public BeamValuesRel(
      RelOptCluster cluster,
      RelDataType rowType,
      ImmutableList<ImmutableList<RexLiteral>> tuples,
      RelTraitSet traits) {
    super(cluster, rowType, tuples, traits);
  }

  /**
   * Turns every literal tuple of this node into a {@link BeamSqlRow} and creates a
   * {@code PCollection} holding those rows on the pipeline of {@code inputPCollections}.
   *
   * @throws IllegalStateException if this node has no tuples
   */
  @Override
  public PCollection<BeamSqlRow> buildBeamPipeline(
      PCollectionTuple inputPCollections, BeamSqlEnv sqlEnv) throws Exception {
    List<BeamSqlRow> literalRows = new ArrayList<>(tuples.size());
    // The stage name is requested before the emptiness check, as the name sequence advances
    // on every request.
    String transformName = BeamSqlRelUtils.getStageName(this);
    if (tuples.isEmpty()) {
      throw new IllegalStateException("Values with empty tuples!");
    }

    BeamSqlRowType beamRowType = CalciteUtils.toBeamRowType(this.getRowType());
    for (ImmutableList<RexLiteral> literals : tuples) {
      BeamSqlRow current = new BeamSqlRow(beamRowType);
      int column = 0;
      for (RexLiteral literal : literals) {
        BeamTableUtils.addFieldWithAutoTypeCasting(current, column, literal.getValue());
        column++;
      }
      literalRows.add(current);
    }

    PCollection<BeamSqlRow> created =
        inputPCollections.getPipeline().apply(transformName, Create.of(literalRows));
    return created.setCoder(new BeamSqlRowCoder(beamRowType));
  }
}