http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamFilterRel.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamFilterRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamFilterRel.java deleted file mode 100644 index f1da29f..0000000 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamFilterRel.java +++ /dev/null @@ -1,70 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.sdk.extensions.sql.rel; - -import org.apache.beam.sdk.extensions.sql.BeamSqlEnv; -import org.apache.beam.sdk.extensions.sql.interpreter.BeamSqlExpressionExecutor; -import org.apache.beam.sdk.extensions.sql.interpreter.BeamSqlFnExecutor; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowCoder; -import org.apache.beam.sdk.extensions.sql.transform.BeamSqlFilterFn; -import org.apache.beam.sdk.extensions.sql.utils.CalciteUtils; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollectionTuple; -import org.apache.calcite.plan.RelOptCluster; -import org.apache.calcite.plan.RelTraitSet; -import org.apache.calcite.rel.RelNode; -import org.apache.calcite.rel.core.Filter; -import org.apache.calcite.rex.RexNode; -
/**
 * {@code BeamRelNode} that stands in for a Calcite {@code Filter} node.
 *
 * <p>The row-level work is delegated to {@code BeamSqlFilterFn}, driven by a
 * {@code BeamSqlFnExecutor} created from this node.
 */
public class BeamFilterRel extends Filter implements BeamRelNode {

  /**
   * Creates the node; every argument is handed unchanged to the {@code Filter} constructor.
   */
  public BeamFilterRel(RelOptCluster cluster, RelTraitSet traits, RelNode child,
      RexNode condition) {
    super(cluster, traits, child, condition);
  }

  /**
   * Returns a new {@code BeamFilterRel} in this node's cluster with the supplied traits, input
   * and condition.
   */
  @Override
  public Filter copy(RelTraitSet traitSet, RelNode input, RexNode condition) {
    return new BeamFilterRel(getCluster(), traitSet, input, condition);
  }

  /**
   * Builds the pipeline of the single input, applies a {@code ParDo} of {@code BeamSqlFilterFn}
   * to it under this node's stage name, and sets a {@code BeamSqlRowCoder} derived from this
   * node's row type on the result.
   *
   * @param inputPCollections forwarded unchanged to the input node
   * @param sqlEnv forwarded unchanged to the input node
   * @return the collection produced by the filter {@code ParDo}
   * @throws Exception whatever the input node's {@code buildBeamPipeline} throws
   */
  @Override
  public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections,
      BeamSqlEnv sqlEnv) throws Exception {
    // The stage name is requested before the input pipeline is built; keep that call order.
    final String stage = BeamSqlRelUtils.getStageName(this);

    final BeamRelNode inputRel = BeamSqlRelUtils.getBeamRelInput(getInput());
    final PCollection<BeamSqlRow> inputRows =
        inputRel.buildBeamPipeline(inputPCollections, sqlEnv);

    final BeamSqlExpressionExecutor exprExecutor = new BeamSqlFnExecutor(this);
    final BeamSqlFilterFn filterFn = new BeamSqlFilterFn(getRelTypeName(), exprExecutor);

    final PCollection<BeamSqlRow> filtered = inputRows.apply(stage, ParDo.of(filterFn));
    filtered.setCoder(new BeamSqlRowCoder(CalciteUtils.toBeamRowType(getRowType())));
    return filtered;
  }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamIOSinkRel.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamIOSinkRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamIOSinkRel.java deleted file mode 100644 index ce941a0..0000000 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamIOSinkRel.java +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.sdk.extensions.sql.rel; - -import com.google.common.base.Joiner; -import java.util.List; -import org.apache.beam.sdk.extensions.sql.BeamSqlEnv; -import org.apache.beam.sdk.extensions.sql.schema.BaseBeamTable; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollectionTuple; -import org.apache.calcite.plan.RelOptCluster; -import org.apache.calcite.plan.RelOptTable; -import org.apache.calcite.plan.RelTraitSet; -import org.apache.calcite.prepare.Prepare; -import org.apache.calcite.rel.RelNode; -import org.apache.calcite.rel.core.TableModify; -import org.apache.calcite.rex.RexNode; -
/**
 * {@code BeamRelNode} that stands in for a Calcite {@code TableModify} node.
 *
 * <p>The rows of the input are handed to the writer of the target table, which is looked up in
 * the {@code BeamSqlEnv} by its dot-joined qualified name.
 */
public class BeamIOSinkRel extends TableModify implements BeamRelNode {

  /**
   * Creates the node; every argument is handed unchanged to the {@code TableModify} constructor.
   */
  public BeamIOSinkRel(RelOptCluster cluster, RelTraitSet traits, RelOptTable table,
      Prepare.CatalogReader catalogReader, RelNode child, Operation operation,
      List<String> updateColumnList, List<RexNode> sourceExpressionList, boolean flattened) {
    super(cluster, traits, table, catalogReader, child, operation, updateColumnList,
        sourceExpressionList, flattened);
  }

  /**
   * Returns a new {@code BeamIOSinkRel} over the sole element of {@code inputs}, carrying over
   * this node's cluster, table, catalog reader, operation, column lists and flattened flag.
   */
  @Override
  public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
    final RelNode onlyInput = sole(inputs);
    return new BeamIOSinkRel(
        getCluster(),
        traitSet,
        getTable(),
        getCatalogReader(),
        onlyInput,
        getOperation(),
        getUpdateColumnList(),
        getSourceExpressionList(),
        isFlattened());
  }

  /**
   * Builds the input pipeline and applies the target table's IO writer to it.
   *
   * <p>Note that the returned value is the input {@code PCollection} itself (the collection
   * being persisted), not the result of applying the writer.
   *
   * @param inputPCollections forwarded unchanged to the input node
   * @param sqlEnv used to look up the target table and forwarded to the input node
   * @return the input node's collection
   * @throws Exception whatever the input node's {@code buildBeamPipeline} throws
   */
  @Override
  public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections,
      BeamSqlEnv sqlEnv) throws Exception {
    // The stage name is requested before the input pipeline is built; keep that call order.
    final String stage = BeamSqlRelUtils.getStageName(this);

    final BeamRelNode inputRel = BeamSqlRelUtils.getBeamRelInput(getInput());
    final PCollection<BeamSqlRow> rowsToPersist =
        inputRel.buildBeamPipeline(inputPCollections, sqlEnv);

    final String tableName = Joiner.on('.').join(getTable().getQualifiedName());
    final BaseBeamTable sink = sqlEnv.findTable(tableName);

    rowsToPersist.apply(stage, sink.buildIOWriter());
    return rowsToPersist;
  }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamIOSourceRel.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamIOSourceRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamIOSourceRel.java deleted file mode 100644 index 85f0bc8..0000000 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamIOSourceRel.java +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.extensions.sql.rel; - -import com.google.common.base.Joiner; -import org.apache.beam.sdk.extensions.sql.BeamSqlEnv; -import org.apache.beam.sdk.extensions.sql.schema.BaseBeamTable; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowCoder; -import org.apache.beam.sdk.extensions.sql.utils.CalciteUtils; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollectionTuple; -import org.apache.beam.sdk.values.TupleTag; -import org.apache.calcite.plan.RelOptCluster; -import org.apache.calcite.plan.RelOptTable; -import org.apache.calcite.plan.RelTraitSet; -import org.apache.calcite.rel.core.TableScan; -
/**
 * {@code BeamRelNode} that stands in for a Calcite {@code TableScan} node.
 */
public class BeamIOSourceRel extends TableScan implements BeamRelNode {

  /**
   * Creates the node; every argument is handed unchanged to the {@code TableScan} constructor.
   */
  public BeamIOSourceRel(RelOptCluster cluster, RelTraitSet traitSet, RelOptTable table) {
    super(cluster, traitSet, table);
  }

  /**
   * Resolves the rows of the scanned table.
   *
   * <p>The table is identified by its qualified name joined with {@code '.'}. If
   * {@code inputPCollections} holds a collection under a {@code TupleTag} with that name, that
   * collection is returned as is. Otherwise the table is looked up in {@code sqlEnv}, its IO
   * reader is built on the tuple's pipeline, and a {@code BeamSqlRowCoder} derived from this
   * node's row type is set on the result.
   *
   * @param inputPCollections collections supplied by the caller, keyed by table name
   * @param sqlEnv environment used to look up the table when no collection was supplied
   * @return the collection of rows for the scanned table
   */
  @Override
  public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections,
      BeamSqlEnv sqlEnv) throws Exception {
    final String tableName = Joiner.on('.').join(getTable().getQualifiedName());
    final TupleTag<BeamSqlRow> tableTag = new TupleTag<>(tableName);

    // Prefer a PCollection supplied directly in the input tuple when there is one.
    if (inputPCollections.has(tableTag)) {
      return inputPCollections.get(tableTag);
    }

    // Otherwise the source PCollection comes from BaseBeamTable.buildIOReader().
    final BaseBeamTable table = sqlEnv.findTable(tableName);
    final BeamSqlRowCoder rowCoder =
        new BeamSqlRowCoder(CalciteUtils.toBeamRowType(getRowType()));
    return table.buildIOReader(inputPCollections.getPipeline()).setCoder(rowCoder);
  }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamIntersectRel.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamIntersectRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamIntersectRel.java deleted file mode 100644 index ae73a0d..0000000 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamIntersectRel.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License.
- */ - -package org.apache.beam.sdk.extensions.sql.rel; - -import java.util.List; -import org.apache.beam.sdk.extensions.sql.BeamSqlEnv; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollectionTuple; -import org.apache.calcite.plan.RelOptCluster; -import org.apache.calcite.plan.RelTraitSet; -import org.apache.calcite.rel.RelNode; -import org.apache.calcite.rel.core.Intersect; -import org.apache.calcite.rel.core.SetOp; -
/**
 * {@code BeamRelNode} that stands in for a Calcite {@code Intersect} node.
 *
 * <p>An intersect combines two SELECT statements and returns only those rows of the first
 * statement that are identical to a row of the second. Pipeline construction is handed to a
 * {@code BeamSetOperatorRelBase} configured with {@code OpType.INTERSECT}.
 */
public class BeamIntersectRel extends Intersect implements BeamRelNode {

  /** Performs the actual pipeline construction; created once in the constructor. */
  private final BeamSetOperatorRelBase delegate;

  /**
   * Creates the node and its set-operator delegate from the same inputs and {@code all} flag.
   */
  public BeamIntersectRel(
      RelOptCluster cluster, RelTraitSet traits, List<RelNode> inputs, boolean all) {
    super(cluster, traits, inputs, all);
    this.delegate =
        new BeamSetOperatorRelBase(this, BeamSetOperatorRelBase.OpType.INTERSECT, inputs, all);
  }

  /**
   * Returns a new {@code BeamIntersectRel} in this node's cluster with the supplied traits,
   * inputs and {@code all} flag.
   */
  @Override
  public SetOp copy(RelTraitSet traitSet, List<RelNode> inputs, boolean all) {
    return new BeamIntersectRel(getCluster(), traitSet, inputs, all);
  }

  /**
   * Forwards both arguments to the delegate and returns its result.
   */
  @Override
  public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections,
      BeamSqlEnv sqlEnv) throws Exception {
    return delegate.buildBeamPipeline(inputPCollections, sqlEnv);
  }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamJoinRel.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamJoinRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamJoinRel.java deleted file mode 100644 index 3d9c9cd..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamJoinRel.java +++ /dev/null @@ -1,302 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.sdk.extensions.sql.rel; - -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.Set; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.KvCoder; -import org.apache.beam.sdk.extensions.sql.BeamSqlEnv; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowCoder; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType; -import org.apache.beam.sdk.extensions.sql.transform.BeamJoinTransforms; -import org.apache.beam.sdk.extensions.sql.utils.CalciteUtils; -import org.apache.beam.sdk.transforms.MapElements; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.View; -import org.apache.beam.sdk.transforms.windowing.IncompatibleWindowException; -import org.apache.beam.sdk.transforms.windowing.WindowFn; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollectionTuple; 
-import org.apache.beam.sdk.values.PCollectionView; -import org.apache.calcite.plan.RelOptCluster; -import org.apache.calcite.plan.RelTraitSet; -import org.apache.calcite.rel.RelNode; -import org.apache.calcite.rel.core.CorrelationId; -import org.apache.calcite.rel.core.Join; -import org.apache.calcite.rel.core.JoinRelType; -import org.apache.calcite.rex.RexCall; -import org.apache.calcite.rex.RexInputRef; -import org.apache.calcite.rex.RexLiteral; -import org.apache.calcite.rex.RexNode; -import org.apache.calcite.util.Pair; -
/**
 * {@code BeamRelNode} to replace a {@code Join} node.
 *
 * <p>Support for join can be categorized into 3 cases:
 * <ul>
 *   <li>BoundedTable JOIN BoundedTable</li>
 *   <li>UnboundedTable JOIN UnboundedTable</li>
 *   <li>BoundedTable JOIN UnboundedTable</li>
 * </ul>
 *
 * <p>For the first two cases, a standard join is used as long as the windowFns of both sides
 * match.
 *
 * <p>For the third case, a {@code sideInput} is used to implement the join, so there are some
 * constraints:
 *
 * <ul>
 *   <li>{@code FULL OUTER JOIN} is not supported.</li>
 *   <li>If it's a {@code LEFT OUTER JOIN}, the unbounded table should be on the left side.</li>
 *   <li>If it's a {@code RIGHT OUTER JOIN}, the unbounded table should be on the right side.</li>
 * </ul>
 *
 * <p>There are also some general constraints:
 *
 * <ul>
 *   <li>Only equi-join is supported: the condition must be a single {@code =} between a column
 *       of the left input and a column of the right input, or an {@code AND} of such
 *       equalities.</li>
 *   <li>CROSS JOIN is not supported.</li>
 * </ul>
 */
public class BeamJoinRel extends Join implements BeamRelNode {

  /**
   * Creates the node; every argument is handed unchanged to the {@code Join} constructor.
   */
  public BeamJoinRel(RelOptCluster cluster, RelTraitSet traits, RelNode left, RelNode right,
      RexNode condition, Set<CorrelationId> variablesSet, JoinRelType joinType) {
    super(cluster, traits, left, right, condition, variablesSet, joinType);
  }

  /**
   * Returns a new {@code BeamJoinRel} in this node's cluster that reuses this node's
   * {@code variablesSet}. {@code semiJoinDone} is not used.
   */
  @Override public Join copy(RelTraitSet traitSet, RexNode conditionExpr, RelNode left,
      RelNode right, JoinRelType joinType, boolean semiJoinDone) {
    return new BeamJoinRel(getCluster(), traitSet, left, right, conditionExpr, variablesSet,
        joinType);
  }

  /**
   * Builds both input pipelines, keys each side by its join columns and joins them.
   *
   * <p>When both sides have the same boundedness a standard join is used and the two windowFns
   * must be compatible. When exactly one side is bounded a side-input join is used.
   *
   * @param inputPCollections forwarded unchanged to both input nodes
   * @param sqlEnv forwarded unchanged to both input nodes
   * @return the joined rows, coded with a {@code BeamSqlRowCoder} for this node's row type
   * @throws IllegalArgumentException if a standard join is required and the windowFns of the two
   *     sides are incompatible
   * @throws UnsupportedOperationException if the join condition is not a supported equi-join, or
   *     the join type is not supported for a bounded-vs-unbounded join
   * @throws Exception whatever an input node's {@code buildBeamPipeline} throws
   */
  @Override public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections,
      BeamSqlEnv sqlEnv)
      throws Exception {
    BeamRelNode leftRelNode = BeamSqlRelUtils.getBeamRelInput(left);
    BeamSqlRowType leftRowType = CalciteUtils.toBeamRowType(left.getRowType());
    PCollection<BeamSqlRow> leftRows = leftRelNode.buildBeamPipeline(inputPCollections, sqlEnv);

    final BeamRelNode rightRelNode = BeamSqlRelUtils.getBeamRelInput(right);
    PCollection<BeamSqlRow> rightRows = rightRelNode.buildBeamPipeline(inputPCollections, sqlEnv);

    String stageName = BeamSqlRelUtils.getStageName(this);

    // extract the join fields
    List<Pair<Integer, Integer>> pairs = extractJoinColumns(
        leftRelNode.getRowType().getFieldCount());

    // build the extract key type
    // the name of the join field is not important; the types are taken from the left side
    List<String> names = new ArrayList<>(pairs.size());
    List<Integer> types = new ArrayList<>(pairs.size());
    for (int i = 0; i < pairs.size(); i++) {
      names.add("c" + i);
      types.add(leftRowType.getFieldsType().get(pairs.get(i).getKey()));
    }
    BeamSqlRowType extractKeyRowType = BeamSqlRowType.create(names, types);

    Coder extractKeyRowCoder = new BeamSqlRowCoder(extractKeyRowType);

    // BeamSqlRow -> KV<BeamSqlRow, BeamSqlRow>
    PCollection<KV<BeamSqlRow, BeamSqlRow>> extractedLeftRows = leftRows
        .apply(stageName + "_left_ExtractJoinFields",
            MapElements.via(new BeamJoinTransforms.ExtractJoinFields(true, pairs)))
        .setCoder(KvCoder.of(extractKeyRowCoder, leftRows.getCoder()));

    PCollection<KV<BeamSqlRow, BeamSqlRow>> extractedRightRows = rightRows
        .apply(stageName + "_right_ExtractJoinFields",
            MapElements.via(new BeamJoinTransforms.ExtractJoinFields(false, pairs)))
        .setCoder(KvCoder.of(extractKeyRowCoder, rightRows.getCoder()));

    // prepare the NullRows
    BeamSqlRow leftNullRow = buildNullRow(leftRelNode);
    BeamSqlRow rightNullRow = buildNullRow(rightRelNode);

    // IsBounded has exactly two values, so the sides either agree (regular join) or exactly one
    // of them is bounded (sideInput join); there is no third case.
    if (leftRows.isBounded() == rightRows.isBounded()) {
      WindowFn leftWinFn = leftRows.getWindowingStrategy().getWindowFn();
      WindowFn rightWinFn = rightRows.getWindowingStrategy().getWindowFn();
      try {
        leftWinFn.verifyCompatibility(rightWinFn);
      } catch (IncompatibleWindowException e) {
        throw new IllegalArgumentException(
            "WindowFns must match for a bounded-vs-bounded/unbounded-vs-unbounded join.", e);
      }

      return standardJoin(extractedLeftRows, extractedRightRows,
          leftNullRow, rightNullRow, stageName);
    }

    // One of the sides is Bounded & the other is Unbounded: do a sideInput join.
    // For a sideInput join the windowFns do not need to match. Supported are INNER JOIN and
    // the OUTER JOINs whose preserved side (LEFT for LEFT OUTER, RIGHT for RIGHT OUTER) is the
    // unbounded one.
    if (joinType == JoinRelType.FULL) {
      throw new UnsupportedOperationException("FULL OUTER JOIN is not supported when join "
          + "a bounded table with an unbounded table.");
    }
    if (joinType == JoinRelType.LEFT
        && leftRows.isBounded() == PCollection.IsBounded.BOUNDED) {
      throw new UnsupportedOperationException(
          "LEFT side of an OUTER JOIN must be Unbounded table.");
    }
    if (joinType == JoinRelType.RIGHT
        && rightRows.isBounded() == PCollection.IsBounded.BOUNDED) {
      throw new UnsupportedOperationException(
          "RIGHT side of a RIGHT OUTER JOIN must be Unbounded table.");
    }

    return sideInputJoin(extractedLeftRows, extractedRightRows,
        leftNullRow, rightNullRow);
  }

  /**
   * Joins the two keyed collections with the join library according to {@code joinType}
   * (any type other than LEFT, RIGHT or FULL is handled as INNER) and merges each joined pair
   * back into one row.
   */
  private PCollection<BeamSqlRow> standardJoin(
      PCollection<KV<BeamSqlRow, BeamSqlRow>> extractedLeftRows,
      PCollection<KV<BeamSqlRow, BeamSqlRow>> extractedRightRows,
      BeamSqlRow leftNullRow, BeamSqlRow rightNullRow, String stageName) {
    final PCollection<KV<BeamSqlRow, KV<BeamSqlRow, BeamSqlRow>>> joinedRows;
    switch (joinType) {
      case LEFT:
        joinedRows = org.apache.beam.sdk.extensions.joinlibrary.Join
            .leftOuterJoin(extractedLeftRows, extractedRightRows, rightNullRow);
        break;
      case RIGHT:
        joinedRows = org.apache.beam.sdk.extensions.joinlibrary.Join
            .rightOuterJoin(extractedLeftRows, extractedRightRows, leftNullRow);
        break;
      case FULL:
        joinedRows = org.apache.beam.sdk.extensions.joinlibrary.Join
            .fullOuterJoin(extractedLeftRows, extractedRightRows, leftNullRow,
                rightNullRow);
        break;
      case INNER:
      default:
        joinedRows = org.apache.beam.sdk.extensions.joinlibrary.Join
            .innerJoin(extractedLeftRows, extractedRightRows);
        break;
    }

    return joinedRows
        .apply(stageName + "_JoinParts2WholeRow",
            MapElements.via(new BeamJoinTransforms.JoinParts2WholeRow()))
        .setCoder(new BeamSqlRowCoder(CalciteUtils.toBeamRowType(getRowType())));
  }

  /**
   * Joins a bounded side with an unbounded side through a side input.
   *
   * <p>The unbounded collection is always used as the main input. If the left collection is the
   * bounded one, the two sides are swapped, a non-INNER join type is run as LEFT, and the
   * {@code swapped} flag is passed down so the result can be put back in the original order.
   */
  public PCollection<BeamSqlRow> sideInputJoin(
      PCollection<KV<BeamSqlRow, BeamSqlRow>> extractedLeftRows,
      PCollection<KV<BeamSqlRow, BeamSqlRow>> extractedRightRows,
      BeamSqlRow leftNullRow, BeamSqlRow rightNullRow) {
    // we always make the Unbounded table on the left to do the sideInput join
    // (will convert the result accordingly before return)
    boolean swapped = (extractedLeftRows.isBounded() == PCollection.IsBounded.BOUNDED);
    JoinRelType realJoinType =
        (swapped && joinType != JoinRelType.INNER) ? JoinRelType.LEFT : joinType;

    PCollection<KV<BeamSqlRow, BeamSqlRow>> realLeftRows =
        swapped ? extractedRightRows : extractedLeftRows;
    PCollection<KV<BeamSqlRow, BeamSqlRow>> realRightRows =
        swapped ? extractedLeftRows : extractedRightRows;
    BeamSqlRow realRightNullRow = swapped ? leftNullRow : rightNullRow;

    // swapped still needs to be passed down because the result has to be swapped back.
    return sideInputJoinHelper(realJoinType, realLeftRows, realRightRows,
        realRightNullRow, swapped);
  }

  /**
   * Turns {@code rightRows} into a multimap view and applies {@code SideInputJoinDoFn} to
   * {@code leftRows} with that view as side input.
   */
  private PCollection<BeamSqlRow> sideInputJoinHelper(
      JoinRelType joinType,
      PCollection<KV<BeamSqlRow, BeamSqlRow>> leftRows,
      PCollection<KV<BeamSqlRow, BeamSqlRow>> rightRows,
      BeamSqlRow rightNullRow, boolean swapped) {
    final PCollectionView<Map<BeamSqlRow, Iterable<BeamSqlRow>>> rowsView = rightRows
        .apply(View.<BeamSqlRow, BeamSqlRow>asMultimap());

    return leftRows
        .apply(ParDo.of(new BeamJoinTransforms.SideInputJoinDoFn(
            joinType, rightNullRow, rowsView, swapped)).withSideInputs(rowsView))
        .setCoder(new BeamSqlRowCoder(CalciteUtils.toBeamRowType(getRowType())));
  }

  /**
   * Builds a row of {@code relNode}'s row type in which every field is {@code null}.
   */
  private BeamSqlRow buildNullRow(BeamRelNode relNode) {
    BeamSqlRowType rowType = CalciteUtils.toBeamRowType(relNode.getRowType());
    BeamSqlRow nullRow = new BeamSqlRow(rowType);
    for (int i = 0; i < rowType.size(); i++) {
      nullRow.addField(i, null);
    }
    return nullRow;
  }

  /**
   * Extracts the join columns from this node's condition.
   *
   * @param leftRowColumnCount number of columns of the left input
   * @return one pair per equality: (column index in the left row, column index in the right row)
   * @throws UnsupportedOperationException if the condition is a literal TRUE (CROSS JOIN) or is
   *     anything other than an equality, or an AND of equalities, between a left column and a
   *     right column
   */
  private List<Pair<Integer, Integer>> extractJoinColumns(int leftRowColumnCount) {
    // it's a CROSS JOIN because: condition == true
    if (condition instanceof RexLiteral
        && Boolean.TRUE.equals(((RexLiteral) condition).getValue())) {
      throw new UnsupportedOperationException("CROSS JOIN is not supported!");
    }

    List<Pair<Integer, Integer>> pairs = new ArrayList<>();
    collectJoinColumns(condition, leftRowColumnCount, pairs);
    return pairs;
  }

  /**
   * Appends the column pair of every equality found in {@code node} to {@code pairs},
   * descending through AND calls and rejecting every other kind of expression.
   */
  private void collectJoinColumns(RexNode node, int leftRowColumnCount,
      List<Pair<Integer, Integer>> pairs) {
    if (!(node instanceof RexCall)) {
      throw new UnsupportedOperationException(
          "Expression " + node + " is not supported in join condition");
    }

    RexCall call = (RexCall) node;
    String operatorName = call.getOperator().getName();
    if ("AND".equals(operatorName)) {
      for (RexNode operand : call.getOperands()) {
        // every conjunct is validated, so a non-equality can no longer slip through as a key
        collectJoinColumns(operand, leftRowColumnCount, pairs);
      }
    } else if ("=".equals(operatorName)) {
      pairs.add(extractOneJoinColumn(call, leftRowColumnCount));
    } else {
      throw new UnsupportedOperationException(
          "Operator " + operatorName + " is not supported in join condition");
    }
  }

  /**
   * Converts one equality into a (left column index, right column index) pair.
   *
   * <p>Input refs are numbered over the concatenated row, so the smaller index is the left
   * column and the larger one, reduced by {@code leftRowColumnCount}, is the right column.
   *
   * @throws UnsupportedOperationException if the operands are not two column references, or do
   *     not refer to one column of each input
   */
  private Pair<Integer, Integer> extractOneJoinColumn(RexCall oneCondition,
      int leftRowColumnCount) {
    List<RexNode> operands = oneCondition.getOperands();
    if (operands.size() != 2
        || !(operands.get(0) instanceof RexInputRef)
        || !(operands.get(1) instanceof RexInputRef)) {
      throw new UnsupportedOperationException(
          "Join condition " + oneCondition + " is not supported: "
              + "both sides of '=' must be column references");
    }

    final int firstIndex = ((RexInputRef) operands.get(0)).getIndex();
    final int secondIndex = ((RexInputRef) operands.get(1)).getIndex();

    final int leftIndex = Math.min(firstIndex, secondIndex);
    final int rightIndex = Math.max(firstIndex, secondIndex) - leftRowColumnCount;
    if (leftIndex >= leftRowColumnCount || rightIndex < 0) {
      throw new UnsupportedOperationException(
          "Join condition " + oneCondition + " is not supported: "
              + "'=' must compare a column of the left input with a column of the right input");
    }

    return new Pair<>(leftIndex, rightIndex);
  }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamLogicalConvention.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamLogicalConvention.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamLogicalConvention.java deleted file mode 100644 index 58b90ca..0000000 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamLogicalConvention.java +++ /dev/null @@ -1,72 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License.
You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.extensions.sql.rel; - -import org.apache.calcite.plan.Convention; -import org.apache.calcite.plan.ConventionTraitDef; -import org.apache.calcite.plan.RelOptPlanner; -import org.apache.calcite.plan.RelTrait; -import org.apache.calcite.plan.RelTraitDef; -import org.apache.calcite.plan.RelTraitSet; -
/**
 * Calcite {@code Convention} for Beam SQL, implemented as a single-instance enum.
 */
public enum BeamLogicalConvention implements Convention {
  /** The only instance of this convention. */
  INSTANCE;

  /** Returns the convention's name, {@code "BEAM_LOGICAL"}. */
  @Override
  public String getName() {
    return "BEAM_LOGICAL";
  }

  /** Same text as {@link #getName()}. */
  @Override
  public String toString() {
    return getName();
  }

  /** Nodes in this convention are expected to implement {@code BeamRelNode}. */
  @Override
  public Class getInterface() {
    return BeamRelNode.class;
  }

  /** This convention belongs to the shared {@code ConventionTraitDef} instance. */
  @Override
  public RelTraitDef getTraitDef() {
    return ConventionTraitDef.INSTANCE;
  }

  /** A trait is satisfied only when it is this very instance. */
  @Override
  public boolean satisfies(RelTrait trait) {
    return trait == this;
  }

  /** Nothing is registered with the planner. */
  @Override
  public void register(RelOptPlanner planner) {
  }

  /** Always {@code false}, whatever the target convention. */
  @Override
  public boolean canConvertConvention(Convention toConvention) {
    return false;
  }

  /** Always {@code false}, whatever the trait sets. */
  @Override
  public boolean useAbstractConvertersForConversion(RelTraitSet fromTraits, RelTraitSet toTraits) {
    return false;
  }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamMinusRel.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamMinusRel.java
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamMinusRel.java deleted file mode 100644 index 8cef971..0000000 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamMinusRel.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.sdk.extensions.sql.rel; - -import java.util.List; -import org.apache.beam.sdk.extensions.sql.BeamSqlEnv; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollectionTuple; -import org.apache.calcite.plan.RelOptCluster; -import org.apache.calcite.plan.RelTraitSet; -import org.apache.calcite.rel.RelNode; -import org.apache.calcite.rel.core.Minus; -import org.apache.calcite.rel.core.SetOp; -
/**
 * {@code BeamRelNode} that stands in for a Calcite {@code Minus} node.
 *
 * <p>Corresponds to the SQL {@code EXCEPT} operator. Pipeline construction is handed to a
 * {@code BeamSetOperatorRelBase} configured with {@code OpType.MINUS}.
 */
public class BeamMinusRel extends Minus implements BeamRelNode {

  /** Performs the actual pipeline construction; created once in the constructor. */
  private final BeamSetOperatorRelBase delegate;

  /**
   * Creates the node and its set-operator delegate from the same inputs and {@code all} flag.
   */
  public BeamMinusRel(
      RelOptCluster cluster, RelTraitSet traits, List<RelNode> inputs, boolean all) {
    super(cluster, traits, inputs, all);
    this.delegate =
        new BeamSetOperatorRelBase(this, BeamSetOperatorRelBase.OpType.MINUS, inputs, all);
  }

  /**
   * Returns a new {@code BeamMinusRel} in this node's cluster with the supplied traits, inputs
   * and {@code all} flag.
   */
  @Override
  public SetOp copy(RelTraitSet traitSet, List<RelNode> inputs, boolean all) {
    return new BeamMinusRel(getCluster(), traitSet, inputs, all);
  }

  /**
   * Forwards both arguments to the delegate and returns its result.
   */
  @Override
  public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections,
      BeamSqlEnv sqlEnv) throws Exception {
    return delegate.buildBeamPipeline(inputPCollections, sqlEnv);
  }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamProjectRel.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamProjectRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamProjectRel.java deleted file mode 100644 index 8f81038..0000000 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamProjectRel.java +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License.
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.extensions.sql.rel; - -import java.util.List; -import org.apache.beam.sdk.extensions.sql.BeamSqlEnv; -import org.apache.beam.sdk.extensions.sql.interpreter.BeamSqlExpressionExecutor; -import org.apache.beam.sdk.extensions.sql.interpreter.BeamSqlFnExecutor; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowCoder; -import org.apache.beam.sdk.extensions.sql.transform.BeamSqlProjectFn; -import org.apache.beam.sdk.extensions.sql.utils.CalciteUtils; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollectionTuple; -import org.apache.calcite.plan.RelOptCluster; -import org.apache.calcite.plan.RelTraitSet; -import org.apache.calcite.rel.RelNode; -import org.apache.calcite.rel.core.Project; -import org.apache.calcite.rel.type.RelDataType; -import org.apache.calcite.rex.RexCall; -import org.apache.calcite.rex.RexInputRef; -import org.apache.calcite.rex.RexLiteral; -import org.apache.calcite.rex.RexNode; - -/** - * BeamRelNode to replace a {@code Project} node. - * - */ -public class BeamProjectRel extends Project implements BeamRelNode { - - /** - * projects: {@link RexLiteral}, {@link RexInputRef}, {@link RexCall}. - * - */ - public BeamProjectRel(RelOptCluster cluster, RelTraitSet traits, RelNode input, - List<? 
extends RexNode> projects, RelDataType rowType) { - super(cluster, traits, input, projects, rowType); - } - - @Override - public Project copy(RelTraitSet traitSet, RelNode input, List<RexNode> projects, - RelDataType rowType) { - return new BeamProjectRel(getCluster(), traitSet, input, projects, rowType); - } - - @Override - public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections - , BeamSqlEnv sqlEnv) throws Exception { - RelNode input = getInput(); - String stageName = BeamSqlRelUtils.getStageName(this); - - PCollection<BeamSqlRow> upstream = - BeamSqlRelUtils.getBeamRelInput(input).buildBeamPipeline(inputPCollections, sqlEnv); - - BeamSqlExpressionExecutor executor = new BeamSqlFnExecutor(this); - - PCollection<BeamSqlRow> projectStream = upstream.apply(stageName, ParDo - .of(new BeamSqlProjectFn(getRelTypeName(), executor, - CalciteUtils.toBeamRowType(rowType)))); - projectStream.setCoder(new BeamSqlRowCoder(CalciteUtils.toBeamRowType(getRowType()))); - - return projectStream; - } - -} http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamRelNode.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamRelNode.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamRelNode.java deleted file mode 100644 index 80a4b84..0000000 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamRelNode.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.extensions.sql.rel; - -import org.apache.beam.sdk.extensions.sql.BeamSqlEnv; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollectionTuple; -import org.apache.calcite.rel.RelNode; - -/** - * A new method {@link #buildBeamPipeline(PCollectionTuple, BeamSqlEnv)} is added. - */ -public interface BeamRelNode extends RelNode { - - /** - * A {@link BeamRelNode} is a recursive structure, the - * {@code BeamQueryPlanner} visits it with a DFS(Depth-First-Search) - * algorithm. 
- */ - PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections, BeamSqlEnv sqlEnv) - throws Exception; -} http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamSetOperatorRelBase.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamSetOperatorRelBase.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamSetOperatorRelBase.java deleted file mode 100644 index 7f80eb0..0000000 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamSetOperatorRelBase.java +++ /dev/null @@ -1,98 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.beam.sdk.extensions.sql.rel; - -import java.io.Serializable; -import java.util.List; -import org.apache.beam.sdk.extensions.sql.BeamSqlEnv; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; -import org.apache.beam.sdk.extensions.sql.transform.BeamSetOperatorsTransforms; -import org.apache.beam.sdk.transforms.MapElements; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.join.CoGbkResult; -import org.apache.beam.sdk.transforms.join.CoGroupByKey; -import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple; -import org.apache.beam.sdk.transforms.windowing.WindowFn; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollectionTuple; -import org.apache.beam.sdk.values.TupleTag; -import org.apache.calcite.rel.RelNode; - -/** - * Delegate for Set operators: {@code BeamUnionRel}, {@code BeamIntersectRel} - * and {@code BeamMinusRel}. - */ -public class BeamSetOperatorRelBase { - /** - * Set operator type. 
- */ - public enum OpType implements Serializable { - UNION, - INTERSECT, - MINUS - } - - private BeamRelNode beamRelNode; - private List<RelNode> inputs; - private boolean all; - private OpType opType; - - public BeamSetOperatorRelBase(BeamRelNode beamRelNode, OpType opType, - List<RelNode> inputs, boolean all) { - this.beamRelNode = beamRelNode; - this.opType = opType; - this.inputs = inputs; - this.all = all; - } - - public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections - , BeamSqlEnv sqlEnv) throws Exception { - PCollection<BeamSqlRow> leftRows = BeamSqlRelUtils.getBeamRelInput(inputs.get(0)) - .buildBeamPipeline(inputPCollections, sqlEnv); - PCollection<BeamSqlRow> rightRows = BeamSqlRelUtils.getBeamRelInput(inputs.get(1)) - .buildBeamPipeline(inputPCollections, sqlEnv); - - WindowFn leftWindow = leftRows.getWindowingStrategy().getWindowFn(); - WindowFn rightWindow = rightRows.getWindowingStrategy().getWindowFn(); - if (!leftWindow.isCompatible(rightWindow)) { - throw new IllegalArgumentException( - "inputs of " + opType + " have different window strategy: " - + leftWindow + " VS " + rightWindow); - } - - final TupleTag<BeamSqlRow> leftTag = new TupleTag<>(); - final TupleTag<BeamSqlRow> rightTag = new TupleTag<>(); - - // co-group - String stageName = BeamSqlRelUtils.getStageName(beamRelNode); - PCollection<KV<BeamSqlRow, CoGbkResult>> coGbkResultCollection = KeyedPCollectionTuple - .of(leftTag, leftRows.apply( - stageName + "_CreateLeftIndex", MapElements.via( - new BeamSetOperatorsTransforms.BeamSqlRow2KvFn()))) - .and(rightTag, rightRows.apply( - stageName + "_CreateRightIndex", MapElements.via( - new BeamSetOperatorsTransforms.BeamSqlRow2KvFn()))) - .apply(CoGroupByKey.<BeamSqlRow>create()); - PCollection<BeamSqlRow> ret = coGbkResultCollection - .apply(ParDo.of(new BeamSetOperatorsTransforms.SetOperatorFilteringDoFn(leftTag, rightTag, - opType, all))); - return ret; - } -} 
http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamSortRel.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamSortRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamSortRel.java deleted file mode 100644 index 363c0a9..0000000 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamSortRel.java +++ /dev/null @@ -1,247 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.beam.sdk.extensions.sql.rel; - -import java.io.Serializable; -import java.lang.reflect.Type; -import java.math.BigDecimal; -import java.util.ArrayList; -import java.util.Comparator; -import java.util.List; -import org.apache.beam.sdk.coders.ListCoder; -import org.apache.beam.sdk.extensions.sql.BeamSqlEnv; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowCoder; -import org.apache.beam.sdk.extensions.sql.utils.CalciteUtils; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.Flatten; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.Top; -import org.apache.beam.sdk.transforms.windowing.GlobalWindow; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollectionTuple; -import org.apache.calcite.plan.RelOptCluster; -import org.apache.calcite.plan.RelTraitSet; -import org.apache.calcite.rel.RelCollation; -import org.apache.calcite.rel.RelCollationImpl; -import org.apache.calcite.rel.RelFieldCollation; -import org.apache.calcite.rel.RelNode; -import org.apache.calcite.rel.core.Sort; -import org.apache.calcite.rex.RexInputRef; -import org.apache.calcite.rex.RexLiteral; -import org.apache.calcite.rex.RexNode; -import org.apache.calcite.sql.type.SqlTypeName; - -/** - * {@code BeamRelNode} to replace a {@code Sort} node. - * - * <p>Since Beam does not fully supported global sort we are using {@link Top} to implement - * the {@code Sort} algebra. 
The following types of ORDER BY are supported: - - * <pre>{@code - * select * from t order by id desc limit 10; - * select * from t order by id desc limit 10, 5; - * }</pre> - * - * <p>but Order BY without a limit is NOT supported: - * - * <pre>{@code - * select * from t order by id desc - * }</pre> - * - * <h3>Constraints</h3> - * <ul> - * <li>Due to the constraints of {@link Top}, the result of a `ORDER BY LIMIT` - * must fit into the memory of a single machine.</li> - * <li>Since `WINDOW`(HOP, TUMBLE, SESSION etc) is always associated with `GroupBy`, - * it does not make much sense to use `ORDER BY` with `WINDOW`. - * </li> - * </ul> - */ -public class BeamSortRel extends Sort implements BeamRelNode { - private List<Integer> fieldIndices = new ArrayList<>(); - private List<Boolean> orientation = new ArrayList<>(); - private List<Boolean> nullsFirst = new ArrayList<>(); - - private int startIndex = 0; - private int count; - - public BeamSortRel( - RelOptCluster cluster, - RelTraitSet traits, - RelNode child, - RelCollation collation, - RexNode offset, - RexNode fetch) { - super(cluster, traits, child, collation, offset, fetch); - - List<RexNode> fieldExps = getChildExps(); - RelCollationImpl collationImpl = (RelCollationImpl) collation; - List<RelFieldCollation> collations = collationImpl.getFieldCollations(); - for (int i = 0; i < fieldExps.size(); i++) { - RexNode fieldExp = fieldExps.get(i); - RexInputRef inputRef = (RexInputRef) fieldExp; - fieldIndices.add(inputRef.getIndex()); - orientation.add(collations.get(i).getDirection() == RelFieldCollation.Direction.ASCENDING); - - RelFieldCollation.NullDirection rawNullDirection = collations.get(i).nullDirection; - if (rawNullDirection == RelFieldCollation.NullDirection.UNSPECIFIED) { - rawNullDirection = collations.get(i).getDirection().defaultNullDirection(); - } - nullsFirst.add(rawNullDirection == RelFieldCollation.NullDirection.FIRST); - } - - if (fetch == null) { - throw new 
UnsupportedOperationException("ORDER BY without a LIMIT is not supported!"); - } - - RexLiteral fetchLiteral = (RexLiteral) fetch; - count = ((BigDecimal) fetchLiteral.getValue()).intValue(); - - if (offset != null) { - RexLiteral offsetLiteral = (RexLiteral) offset; - startIndex = ((BigDecimal) offsetLiteral.getValue()).intValue(); - } - } - - @Override public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections - , BeamSqlEnv sqlEnv) throws Exception { - RelNode input = getInput(); - PCollection<BeamSqlRow> upstream = BeamSqlRelUtils.getBeamRelInput(input) - .buildBeamPipeline(inputPCollections, sqlEnv); - Type windowType = upstream.getWindowingStrategy().getWindowFn() - .getWindowTypeDescriptor().getType(); - if (!windowType.equals(GlobalWindow.class)) { - throw new UnsupportedOperationException( - "`ORDER BY` is only supported for GlobalWindow, actual window: " + windowType); - } - - BeamSqlRowComparator comparator = new BeamSqlRowComparator(fieldIndices, orientation, - nullsFirst); - // first find the top (offset + count) - PCollection<List<BeamSqlRow>> rawStream = - upstream.apply("extractTopOffsetAndFetch", - Top.of(startIndex + count, comparator).withoutDefaults()) - .setCoder(ListCoder.<BeamSqlRow>of(upstream.getCoder())); - - // strip the `leading offset` - if (startIndex > 0) { - rawStream = rawStream.apply("stripLeadingOffset", ParDo.of( - new SubListFn<BeamSqlRow>(startIndex, startIndex + count))) - .setCoder(ListCoder.<BeamSqlRow>of(upstream.getCoder())); - } - - PCollection<BeamSqlRow> orderedStream = rawStream.apply( - "flatten", Flatten.<BeamSqlRow>iterables()); - orderedStream.setCoder(new BeamSqlRowCoder(CalciteUtils.toBeamRowType(getRowType()))); - - return orderedStream; - } - - private static class SubListFn<T> extends DoFn<List<T>, List<T>> { - private int startIndex; - private int endIndex; - - public SubListFn(int startIndex, int endIndex) { - this.startIndex = startIndex; - this.endIndex = endIndex; - } - - 
@ProcessElement - public void processElement(ProcessContext ctx) { - ctx.output(ctx.element().subList(startIndex, endIndex)); - } - } - - @Override public Sort copy(RelTraitSet traitSet, RelNode newInput, RelCollation newCollation, - RexNode offset, RexNode fetch) { - return new BeamSortRel(getCluster(), traitSet, newInput, newCollation, offset, fetch); - } - - private static class BeamSqlRowComparator implements Comparator<BeamSqlRow>, Serializable { - private List<Integer> fieldsIndices; - private List<Boolean> orientation; - private List<Boolean> nullsFirst; - - public BeamSqlRowComparator(List<Integer> fieldsIndices, - List<Boolean> orientation, - List<Boolean> nullsFirst) { - this.fieldsIndices = fieldsIndices; - this.orientation = orientation; - this.nullsFirst = nullsFirst; - } - - @Override public int compare(BeamSqlRow row1, BeamSqlRow row2) { - for (int i = 0; i < fieldsIndices.size(); i++) { - int fieldIndex = fieldsIndices.get(i); - int fieldRet = 0; - SqlTypeName fieldType = CalciteUtils.getFieldType(row1.getDataType(), fieldIndex); - // whether NULL should be ordered first or last(compared to non-null values) depends on - // what user specified in SQL(NULLS FIRST/NULLS LAST) - if (row1.isNull(fieldIndex) && row2.isNull(fieldIndex)) { - continue; - } else if (row1.isNull(fieldIndex) && !row2.isNull(fieldIndex)) { - fieldRet = -1 * (nullsFirst.get(i) ? -1 : 1); - } else if (!row1.isNull(fieldIndex) && row2.isNull(fieldIndex)) { - fieldRet = 1 * (nullsFirst.get(i) ? 
-1 : 1); - } else { - switch (fieldType) { - case TINYINT: - fieldRet = numberCompare(row1.getByte(fieldIndex), row2.getByte(fieldIndex)); - break; - case SMALLINT: - fieldRet = numberCompare(row1.getShort(fieldIndex), row2.getShort(fieldIndex)); - break; - case INTEGER: - fieldRet = numberCompare(row1.getInteger(fieldIndex), row2.getInteger(fieldIndex)); - break; - case BIGINT: - fieldRet = numberCompare(row1.getLong(fieldIndex), row2.getLong(fieldIndex)); - break; - case FLOAT: - fieldRet = numberCompare(row1.getFloat(fieldIndex), row2.getFloat(fieldIndex)); - break; - case DOUBLE: - fieldRet = numberCompare(row1.getDouble(fieldIndex), row2.getDouble(fieldIndex)); - break; - case VARCHAR: - fieldRet = row1.getString(fieldIndex).compareTo(row2.getString(fieldIndex)); - break; - case DATE: - fieldRet = row1.getDate(fieldIndex).compareTo(row2.getDate(fieldIndex)); - break; - default: - throw new UnsupportedOperationException( - "Data type: " + fieldType + " not supported yet!"); - } - } - - fieldRet *= (orientation.get(i) ? -1 : 1); - if (fieldRet != 0) { - return fieldRet; - } - } - return 0; - } - } - - public static <T extends Number & Comparable> int numberCompare(T a, T b) { - return a.compareTo(b); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamSqlRelUtils.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamSqlRelUtils.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamSqlRelUtils.java deleted file mode 100644 index cc503d0..0000000 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamSqlRelUtils.java +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.extensions.sql.rel; - -import java.util.concurrent.atomic.AtomicInteger; -import org.apache.calcite.plan.RelOptUtil; -import org.apache.calcite.plan.volcano.RelSubset; -import org.apache.calcite.rel.RelNode; -import org.apache.calcite.sql.SqlExplainLevel; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Utilities for {@code BeamRelNode}. 
- */ -class BeamSqlRelUtils { - private static final Logger LOG = LoggerFactory.getLogger(BeamSqlRelUtils.class); - - private static final AtomicInteger sequence = new AtomicInteger(0); - private static final AtomicInteger classSequence = new AtomicInteger(0); - - public static String getStageName(BeamRelNode relNode) { - return relNode.getClass().getSimpleName().toUpperCase() + "_" + relNode.getId() + "_" - + sequence.getAndIncrement(); - } - - public static String getClassName(BeamRelNode relNode) { - return "Generated_" + relNode.getClass().getSimpleName().toUpperCase() + "_" + relNode.getId() - + "_" + classSequence.getAndIncrement(); - } - - public static BeamRelNode getBeamRelInput(RelNode input) { - if (input instanceof RelSubset) { - // go with known best input - input = ((RelSubset) input).getBest(); - } - return (BeamRelNode) input; - } - - public static String explain(final RelNode rel) { - return explain(rel, SqlExplainLevel.EXPPLAN_ATTRIBUTES); - } - - public static String explain(final RelNode rel, SqlExplainLevel detailLevel) { - String explain = ""; - try { - explain = RelOptUtil.toString(rel); - } catch (StackOverflowError e) { - LOG.error("StackOverflowError occurred while extracting plan. " - + "Please report it to the dev@ mailing list."); - LOG.error("RelNode " + rel + " ExplainLevel " + detailLevel, e); - LOG.error("Forcing plan to empty string and continue... 
" - + "SQL Runner may not working properly after."); - } - return explain; - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamUnionRel.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamUnionRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamUnionRel.java deleted file mode 100644 index 695521d..0000000 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamUnionRel.java +++ /dev/null @@ -1,88 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.beam.sdk.extensions.sql.rel; - -import java.util.List; -import org.apache.beam.sdk.extensions.sql.BeamSqlEnv; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; -import org.apache.beam.sdk.transforms.windowing.WindowFn; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollectionTuple; -import org.apache.calcite.plan.RelOptCluster; -import org.apache.calcite.plan.RelTraitSet; -import org.apache.calcite.rel.RelInput; -import org.apache.calcite.rel.RelNode; -import org.apache.calcite.rel.core.SetOp; -import org.apache.calcite.rel.core.Union; - -/** - * {@link BeamRelNode} to replace a {@link Union}. - * - * <p>{@code BeamUnionRel} needs the input of it have the same {@link WindowFn}. From the SQL - * perspective, two cases are supported: - * - * <p>1) Do not use {@code grouped window function}: - * - * <pre>{@code - * select * from person UNION select * from person - * }</pre> - * - * <p>2) Use the same {@code grouped window function}, with the same param: - * <pre>{@code - * select id, count(*) from person - * group by id, TUMBLE(order_time, INTERVAL '1' HOUR) - * UNION - * select * from person - * group by id, TUMBLE(order_time, INTERVAL '1' HOUR) - * }</pre> - * - * <p>Inputs with different group functions are NOT supported: - * <pre>{@code - * select id, count(*) from person - * group by id, TUMBLE(order_time, INTERVAL '1' HOUR) - * UNION - * select * from person - * group by id, TUMBLE(order_time, INTERVAL '2' HOUR) - * }</pre> - */ -public class BeamUnionRel extends Union implements BeamRelNode { - private BeamSetOperatorRelBase delegate; - public BeamUnionRel(RelOptCluster cluster, - RelTraitSet traits, - List<RelNode> inputs, - boolean all) { - super(cluster, traits, inputs, all); - this.delegate = new BeamSetOperatorRelBase(this, - BeamSetOperatorRelBase.OpType.UNION, - inputs, all); - } - - public BeamUnionRel(RelInput input) { - super(input); - } - - @Override public SetOp 
copy(RelTraitSet traitSet, List<RelNode> inputs, boolean all) { - return new BeamUnionRel(getCluster(), traitSet, inputs, all); - } - - @Override public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections - , BeamSqlEnv sqlEnv) throws Exception { - return delegate.buildBeamPipeline(inputPCollections, sqlEnv); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamValuesRel.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamValuesRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamValuesRel.java deleted file mode 100644 index f3bf3a3..0000000 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamValuesRel.java +++ /dev/null @@ -1,79 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.beam.sdk.extensions.sql.rel; - -import com.google.common.collect.ImmutableList; -import java.util.ArrayList; -import java.util.List; -import org.apache.beam.sdk.extensions.sql.BeamSqlEnv; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowCoder; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType; -import org.apache.beam.sdk.extensions.sql.schema.BeamTableUtils; -import org.apache.beam.sdk.extensions.sql.utils.CalciteUtils; -import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollectionTuple; -import org.apache.calcite.plan.RelOptCluster; -import org.apache.calcite.plan.RelTraitSet; -import org.apache.calcite.rel.core.Values; -import org.apache.calcite.rel.type.RelDataType; -import org.apache.calcite.rex.RexLiteral; - -/** - * {@code BeamRelNode} to replace a {@code Values} node. - * - * <p>{@code BeamValuesRel} will be used in the following SQLs: - * <ul> - * <li>{@code insert into t (name, desc) values ('hello', 'world')}</li> - * <li>{@code select 1, '1', LOCALTIME}</li> - * </ul> - */ -public class BeamValuesRel extends Values implements BeamRelNode { - - public BeamValuesRel( - RelOptCluster cluster, - RelDataType rowType, - ImmutableList<ImmutableList<RexLiteral>> tuples, - RelTraitSet traits) { - super(cluster, rowType, tuples, traits); - - } - - @Override public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections - , BeamSqlEnv sqlEnv) throws Exception { - List<BeamSqlRow> rows = new ArrayList<>(tuples.size()); - String stageName = BeamSqlRelUtils.getStageName(this); - if (tuples.isEmpty()) { - throw new IllegalStateException("Values with empty tuples!"); - } - - BeamSqlRowType beamSQLRowType = CalciteUtils.toBeamRowType(this.getRowType()); - for (ImmutableList<RexLiteral> tuple : tuples) { - BeamSqlRow row = new BeamSqlRow(beamSQLRowType); - for 
(int i = 0; i < tuple.size(); i++) { - BeamTableUtils.addFieldWithAutoTypeCasting(row, i, tuple.get(i).getValue()); - } - rows.add(row); - } - - return inputPCollections.getPipeline().apply(stageName, Create.of(rows)) - .setCoder(new BeamSqlRowCoder(beamSQLRowType)); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/package-info.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/package-info.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/package-info.java deleted file mode 100644 index fb0a8e2..0000000 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/package-info.java +++ /dev/null @@ -1,23 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * BeamSQL specified nodes, to replace {@link org.apache.calcite.rel.RelNode}. 
- * - */ -package org.apache.beam.sdk.extensions.sql.rel; http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamAggregationRule.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamAggregationRule.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamAggregationRule.java deleted file mode 100644 index 17e3f80..0000000 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamAggregationRule.java +++ /dev/null @@ -1,162 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.sdk.extensions.sql.rule; - -import com.google.common.collect.ImmutableList; -import java.util.GregorianCalendar; -import java.util.List; -import org.apache.beam.sdk.extensions.sql.rel.BeamAggregationRel; -import org.apache.beam.sdk.extensions.sql.rel.BeamLogicalConvention; -import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime; -import org.apache.beam.sdk.transforms.windowing.AfterWatermark; -import org.apache.beam.sdk.transforms.windowing.FixedWindows; -import org.apache.beam.sdk.transforms.windowing.GlobalWindows; -import org.apache.beam.sdk.transforms.windowing.Repeatedly; -import org.apache.beam.sdk.transforms.windowing.Sessions; -import org.apache.beam.sdk.transforms.windowing.SlidingWindows; -import org.apache.beam.sdk.transforms.windowing.Trigger; -import org.apache.beam.sdk.transforms.windowing.WindowFn; -import org.apache.calcite.plan.RelOptRule; -import org.apache.calcite.plan.RelOptRuleCall; -import org.apache.calcite.plan.RelOptRuleOperand; -import org.apache.calcite.rel.core.Aggregate; -import org.apache.calcite.rel.core.Project; -import org.apache.calcite.rel.core.RelFactories; -import org.apache.calcite.rex.RexCall; -import org.apache.calcite.rex.RexLiteral; -import org.apache.calcite.rex.RexNode; -import org.apache.calcite.sql.SqlOperator; -import org.apache.calcite.tools.RelBuilderFactory; -import org.apache.calcite.util.ImmutableBitSet; -import org.joda.time.Duration; - -/** - * Rule to detect the window/trigger settings. - * - */ -public class BeamAggregationRule extends RelOptRule { - public static final BeamAggregationRule INSTANCE = - new BeamAggregationRule(Aggregate.class, Project.class, RelFactories.LOGICAL_BUILDER); - - public BeamAggregationRule( - Class<? extends Aggregate> aggregateClass, - Class<? 
extends Project> projectClass, - RelBuilderFactory relBuilderFactory) { - super( - operand(aggregateClass, - operand(projectClass, any())), - relBuilderFactory, null); - } - - public BeamAggregationRule(RelOptRuleOperand operand, String description) { - super(operand, description); - } - - @Override - public void onMatch(RelOptRuleCall call) { - final Aggregate aggregate = call.rel(0); - final Project project = call.rel(1); - updateWindowTrigger(call, aggregate, project); - } - - private void updateWindowTrigger(RelOptRuleCall call, Aggregate aggregate, - Project project) { - ImmutableBitSet groupByFields = aggregate.getGroupSet(); - List<RexNode> projectMapping = project.getProjects(); - - WindowFn windowFn = new GlobalWindows(); - Trigger triggerFn = Repeatedly.forever(AfterWatermark.pastEndOfWindow()); - int windowFieldIdx = -1; - Duration allowedLatence = Duration.ZERO; - - for (int groupField : groupByFields.asList()) { - RexNode projNode = projectMapping.get(groupField); - if (projNode instanceof RexCall) { - SqlOperator op = ((RexCall) projNode).op; - ImmutableList<RexNode> parameters = ((RexCall) projNode).operands; - String functionName = op.getName(); - switch (functionName) { - case "TUMBLE": - windowFieldIdx = groupField; - windowFn = FixedWindows - .of(Duration.millis(getWindowParameterAsMillis(parameters.get(1)))); - if (parameters.size() == 3) { - GregorianCalendar delayTime = (GregorianCalendar) ((RexLiteral) parameters.get(2)) - .getValue(); - triggerFn = createTriggerWithDelay(delayTime); - allowedLatence = (Duration.millis(delayTime.getTimeInMillis())); - } - break; - case "HOP": - windowFieldIdx = groupField; - windowFn = SlidingWindows - .of(Duration.millis(getWindowParameterAsMillis(parameters.get(1)))) - .every(Duration.millis(getWindowParameterAsMillis(parameters.get(2)))); - if (parameters.size() == 4) { - GregorianCalendar delayTime = (GregorianCalendar) ((RexLiteral) parameters.get(3)) - .getValue(); - triggerFn = 
createTriggerWithDelay(delayTime); - allowedLatence = (Duration.millis(delayTime.getTimeInMillis())); - } - break; - case "SESSION": - windowFieldIdx = groupField; - windowFn = Sessions - .withGapDuration(Duration.millis(getWindowParameterAsMillis(parameters.get(1)))); - if (parameters.size() == 3) { - GregorianCalendar delayTime = (GregorianCalendar) ((RexLiteral) parameters.get(2)) - .getValue(); - triggerFn = createTriggerWithDelay(delayTime); - allowedLatence = (Duration.millis(delayTime.getTimeInMillis())); - } - break; - default: - break; - } - } - } - - BeamAggregationRel newAggregator = new BeamAggregationRel(aggregate.getCluster(), - aggregate.getTraitSet().replace(BeamLogicalConvention.INSTANCE), - convert(aggregate.getInput(), - aggregate.getInput().getTraitSet().replace(BeamLogicalConvention.INSTANCE)), - aggregate.indicator, - aggregate.getGroupSet(), - aggregate.getGroupSets(), - aggregate.getAggCallList(), - windowFn, - triggerFn, - windowFieldIdx, - allowedLatence); - call.transformTo(newAggregator); - } - - private Trigger createTriggerWithDelay(GregorianCalendar delayTime) { - return Repeatedly.forever(AfterWatermark.pastEndOfWindow().withLateFirings(AfterProcessingTime - .pastFirstElementInPane().plusDelayOf(Duration.millis(delayTime.getTimeInMillis())))); - } - - private long getWindowParameterAsMillis(RexNode parameterNode) { - if (parameterNode instanceof RexLiteral) { - return RexLiteral.intValue(parameterNode); - } else { - throw new IllegalArgumentException(String.format("[%s] is not valid.", parameterNode)); - } - } - -} http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamFilterRule.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamFilterRule.java 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamFilterRule.java deleted file mode 100644 index b30a9d9..0000000 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamFilterRule.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.extensions.sql.rule; - -import org.apache.beam.sdk.extensions.sql.rel.BeamFilterRel; -import org.apache.beam.sdk.extensions.sql.rel.BeamLogicalConvention; -import org.apache.calcite.plan.Convention; -import org.apache.calcite.rel.RelNode; -import org.apache.calcite.rel.convert.ConverterRule; -import org.apache.calcite.rel.core.Filter; -import org.apache.calcite.rel.logical.LogicalFilter; - -/** - * A {@code ConverterRule} to replace {@link Filter} with {@link BeamFilterRel}. 
- * - */ -public class BeamFilterRule extends ConverterRule { - public static final BeamFilterRule INSTANCE = new BeamFilterRule(); - - private BeamFilterRule() { - super(LogicalFilter.class, Convention.NONE, BeamLogicalConvention.INSTANCE, "BeamFilterRule"); - } - - @Override - public RelNode convert(RelNode rel) { - final Filter filter = (Filter) rel; - final RelNode input = filter.getInput(); - - return new BeamFilterRel(filter.getCluster(), - filter.getTraitSet().replace(BeamLogicalConvention.INSTANCE), - convert(input, input.getTraitSet().replace(BeamLogicalConvention.INSTANCE)), - filter.getCondition()); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamIOSinkRule.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamIOSinkRule.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamIOSinkRule.java deleted file mode 100644 index 54079b0..0000000 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamIOSinkRule.java +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.extensions.sql.rule; - -import java.util.List; -import org.apache.beam.sdk.extensions.sql.rel.BeamIOSinkRel; -import org.apache.beam.sdk.extensions.sql.rel.BeamLogicalConvention; -import org.apache.calcite.plan.Convention; -import org.apache.calcite.plan.RelOptCluster; -import org.apache.calcite.plan.RelOptTable; -import org.apache.calcite.plan.RelTraitSet; -import org.apache.calcite.prepare.Prepare; -import org.apache.calcite.rel.RelNode; -import org.apache.calcite.rel.convert.ConverterRule; -import org.apache.calcite.rel.core.TableModify; -import org.apache.calcite.rel.logical.LogicalTableModify; -import org.apache.calcite.rex.RexNode; -import org.apache.calcite.schema.Table; - -/** - * A {@code ConverterRule} to replace {@link TableModify} with - * {@link BeamIOSinkRel}. - * - */ -public class BeamIOSinkRule extends ConverterRule { - public static final BeamIOSinkRule INSTANCE = new BeamIOSinkRule(); - - private BeamIOSinkRule() { - super(LogicalTableModify.class, Convention.NONE, BeamLogicalConvention.INSTANCE, - "BeamIOSinkRule"); - } - - @Override - public RelNode convert(RelNode rel) { - final TableModify tableModify = (TableModify) rel; - final RelNode input = tableModify.getInput(); - - final RelOptCluster cluster = tableModify.getCluster(); - final RelTraitSet traitSet = tableModify.getTraitSet().replace(BeamLogicalConvention.INSTANCE); - final RelOptTable relOptTable = tableModify.getTable(); - final Prepare.CatalogReader catalogReader = tableModify.getCatalogReader(); - final RelNode convertedInput = convert(input, - input.getTraitSet().replace(BeamLogicalConvention.INSTANCE)); - final TableModify.Operation operation = tableModify.getOperation(); - final List<String> updateColumnList = tableModify.getUpdateColumnList(); - final List<RexNode> sourceExpressionList = tableModify.getSourceExpressionList(); - final 
boolean flattened = tableModify.isFlattened(); - - final Table table = tableModify.getTable().unwrap(Table.class); - - switch (table.getJdbcTableType()) { - case TABLE: - case STREAM: - if (operation != TableModify.Operation.INSERT) { - throw new UnsupportedOperationException( - String.format("Streams doesn't support %s modify operation", operation)); - } - return new BeamIOSinkRel(cluster, traitSet, - relOptTable, catalogReader, convertedInput, operation, updateColumnList, - sourceExpressionList, flattened); - default: - throw new IllegalArgumentException( - String.format("Unsupported table type: %s", table.getJdbcTableType())); - } - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamIOSourceRule.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamIOSourceRule.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamIOSourceRule.java deleted file mode 100644 index 496b977..0000000 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamIOSourceRule.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.extensions.sql.rule; - -import org.apache.beam.sdk.extensions.sql.rel.BeamIOSourceRel; -import org.apache.beam.sdk.extensions.sql.rel.BeamLogicalConvention; -import org.apache.calcite.plan.Convention; -import org.apache.calcite.rel.RelNode; -import org.apache.calcite.rel.convert.ConverterRule; -import org.apache.calcite.rel.core.TableScan; -import org.apache.calcite.rel.logical.LogicalTableScan; - -/** - * A {@code ConverterRule} to replace {@link TableScan} with - * {@link BeamIOSourceRel}. - * - */ -public class BeamIOSourceRule extends ConverterRule { - public static final BeamIOSourceRule INSTANCE = new BeamIOSourceRule(); - - private BeamIOSourceRule() { - super(LogicalTableScan.class, Convention.NONE, BeamLogicalConvention.INSTANCE, - "BeamIOSourceRule"); - } - - @Override - public RelNode convert(RelNode rel) { - final TableScan scan = (TableScan) rel; - - return new BeamIOSourceRel(scan.getCluster(), - scan.getTraitSet().replace(BeamLogicalConvention.INSTANCE), scan.getTable()); - } - -} http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamIntersectRule.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamIntersectRule.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamIntersectRule.java deleted file mode 100644 index 6fdbd9b..0000000 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamIntersectRule.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.sdk.extensions.sql.rule; - -import java.util.List; -import org.apache.beam.sdk.extensions.sql.rel.BeamIntersectRel; -import org.apache.beam.sdk.extensions.sql.rel.BeamLogicalConvention; -import org.apache.calcite.plan.Convention; -import org.apache.calcite.rel.RelNode; -import org.apache.calcite.rel.convert.ConverterRule; -import org.apache.calcite.rel.core.Intersect; -import org.apache.calcite.rel.logical.LogicalIntersect; - -/** - * {@code ConverterRule} to replace {@code Intersect} with {@code BeamIntersectRel}. 
- */ -public class BeamIntersectRule extends ConverterRule { - public static final BeamIntersectRule INSTANCE = new BeamIntersectRule(); - private BeamIntersectRule() { - super(LogicalIntersect.class, Convention.NONE, - BeamLogicalConvention.INSTANCE, "BeamIntersectRule"); - } - - @Override public RelNode convert(RelNode rel) { - Intersect intersect = (Intersect) rel; - final List<RelNode> inputs = intersect.getInputs(); - return new BeamIntersectRel( - intersect.getCluster(), - intersect.getTraitSet().replace(BeamLogicalConvention.INSTANCE), - convertList(inputs, BeamLogicalConvention.INSTANCE), - intersect.all - ); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamJoinRule.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamJoinRule.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamJoinRule.java deleted file mode 100644 index 147932e..0000000 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamJoinRule.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.sdk.extensions.sql.rule; - -import org.apache.beam.sdk.extensions.sql.rel.BeamJoinRel; -import org.apache.beam.sdk.extensions.sql.rel.BeamLogicalConvention; -import org.apache.calcite.plan.Convention; -import org.apache.calcite.rel.RelNode; -import org.apache.calcite.rel.convert.ConverterRule; -import org.apache.calcite.rel.core.Join; -import org.apache.calcite.rel.logical.LogicalJoin; - -/** - * {@code ConverterRule} to replace {@code Join} with {@code BeamJoinRel}. - */ -public class BeamJoinRule extends ConverterRule { - public static final BeamJoinRule INSTANCE = new BeamJoinRule(); - private BeamJoinRule() { - super(LogicalJoin.class, Convention.NONE, - BeamLogicalConvention.INSTANCE, "BeamJoinRule"); - } - - @Override public RelNode convert(RelNode rel) { - Join join = (Join) rel; - return new BeamJoinRel( - join.getCluster(), - join.getTraitSet().replace(BeamLogicalConvention.INSTANCE), - convert(join.getLeft(), - join.getLeft().getTraitSet().replace(BeamLogicalConvention.INSTANCE)), - convert(join.getRight(), - join.getRight().getTraitSet().replace(BeamLogicalConvention.INSTANCE)), - join.getCondition(), - join.getVariablesSet(), - join.getJoinType() - ); - } -}
