http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/package-info.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/package-info.java
new file mode 100644
index 0000000..76b335d
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * BeamSQL-specific relational nodes, used in place of Calcite's built-in
+ * {@link org.apache.calcite.rel.RelNode} implementations.
+ */
+package org.apache.beam.sdk.extensions.sql.impl.rel;
http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamAggregationRule.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamAggregationRule.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamAggregationRule.java new file mode 100644 index 0000000..cdf6712 --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamAggregationRule.java @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.extensions.sql.impl.rule; + +import com.google.common.collect.ImmutableList; +import java.util.GregorianCalendar; +import java.util.List; +import org.apache.beam.sdk.extensions.sql.impl.rel.BeamAggregationRel; +import org.apache.beam.sdk.extensions.sql.impl.rel.BeamLogicalConvention; +import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime; +import org.apache.beam.sdk.transforms.windowing.AfterWatermark; +import org.apache.beam.sdk.transforms.windowing.FixedWindows; +import org.apache.beam.sdk.transforms.windowing.GlobalWindows; +import org.apache.beam.sdk.transforms.windowing.Repeatedly; +import org.apache.beam.sdk.transforms.windowing.Sessions; +import org.apache.beam.sdk.transforms.windowing.SlidingWindows; +import org.apache.beam.sdk.transforms.windowing.Trigger; +import org.apache.beam.sdk.transforms.windowing.WindowFn; +import org.apache.calcite.plan.RelOptRule; +import org.apache.calcite.plan.RelOptRuleCall; +import org.apache.calcite.plan.RelOptRuleOperand; +import org.apache.calcite.rel.core.Aggregate; +import org.apache.calcite.rel.core.Project; +import org.apache.calcite.rel.core.RelFactories; +import org.apache.calcite.rex.RexCall; +import org.apache.calcite.rex.RexLiteral; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.sql.SqlOperator; +import org.apache.calcite.tools.RelBuilderFactory; +import org.apache.calcite.util.ImmutableBitSet; +import org.joda.time.Duration; + +/** + * Rule to detect the window/trigger settings. + * + */ +public class BeamAggregationRule extends RelOptRule { + public static final BeamAggregationRule INSTANCE = + new BeamAggregationRule(Aggregate.class, Project.class, RelFactories.LOGICAL_BUILDER); + + public BeamAggregationRule( + Class<? extends Aggregate> aggregateClass, + Class<? 
extends Project> projectClass, + RelBuilderFactory relBuilderFactory) { + super( + operand(aggregateClass, + operand(projectClass, any())), + relBuilderFactory, null); + } + + public BeamAggregationRule(RelOptRuleOperand operand, String description) { + super(operand, description); + } + + @Override + public void onMatch(RelOptRuleCall call) { + final Aggregate aggregate = call.rel(0); + final Project project = call.rel(1); + updateWindowTrigger(call, aggregate, project); + } + + private void updateWindowTrigger(RelOptRuleCall call, Aggregate aggregate, + Project project) { + ImmutableBitSet groupByFields = aggregate.getGroupSet(); + List<RexNode> projectMapping = project.getProjects(); + + WindowFn windowFn = new GlobalWindows(); + Trigger triggerFn = Repeatedly.forever(AfterWatermark.pastEndOfWindow()); + int windowFieldIdx = -1; + Duration allowedLatence = Duration.ZERO; + + for (int groupField : groupByFields.asList()) { + RexNode projNode = projectMapping.get(groupField); + if (projNode instanceof RexCall) { + SqlOperator op = ((RexCall) projNode).op; + ImmutableList<RexNode> parameters = ((RexCall) projNode).operands; + String functionName = op.getName(); + switch (functionName) { + case "TUMBLE": + windowFieldIdx = groupField; + windowFn = FixedWindows + .of(Duration.millis(getWindowParameterAsMillis(parameters.get(1)))); + if (parameters.size() == 3) { + GregorianCalendar delayTime = (GregorianCalendar) ((RexLiteral) parameters.get(2)) + .getValue(); + triggerFn = createTriggerWithDelay(delayTime); + allowedLatence = (Duration.millis(delayTime.getTimeInMillis())); + } + break; + case "HOP": + windowFieldIdx = groupField; + windowFn = SlidingWindows + .of(Duration.millis(getWindowParameterAsMillis(parameters.get(1)))) + .every(Duration.millis(getWindowParameterAsMillis(parameters.get(2)))); + if (parameters.size() == 4) { + GregorianCalendar delayTime = (GregorianCalendar) ((RexLiteral) parameters.get(3)) + .getValue(); + triggerFn = 
createTriggerWithDelay(delayTime); + allowedLatence = (Duration.millis(delayTime.getTimeInMillis())); + } + break; + case "SESSION": + windowFieldIdx = groupField; + windowFn = Sessions + .withGapDuration(Duration.millis(getWindowParameterAsMillis(parameters.get(1)))); + if (parameters.size() == 3) { + GregorianCalendar delayTime = (GregorianCalendar) ((RexLiteral) parameters.get(2)) + .getValue(); + triggerFn = createTriggerWithDelay(delayTime); + allowedLatence = (Duration.millis(delayTime.getTimeInMillis())); + } + break; + default: + break; + } + } + } + + BeamAggregationRel newAggregator = new BeamAggregationRel(aggregate.getCluster(), + aggregate.getTraitSet().replace(BeamLogicalConvention.INSTANCE), + convert(aggregate.getInput(), + aggregate.getInput().getTraitSet().replace(BeamLogicalConvention.INSTANCE)), + aggregate.indicator, + aggregate.getGroupSet(), + aggregate.getGroupSets(), + aggregate.getAggCallList(), + windowFn, + triggerFn, + windowFieldIdx, + allowedLatence); + call.transformTo(newAggregator); + } + + private Trigger createTriggerWithDelay(GregorianCalendar delayTime) { + return Repeatedly.forever(AfterWatermark.pastEndOfWindow().withLateFirings(AfterProcessingTime + .pastFirstElementInPane().plusDelayOf(Duration.millis(delayTime.getTimeInMillis())))); + } + + private long getWindowParameterAsMillis(RexNode parameterNode) { + if (parameterNode instanceof RexLiteral) { + return RexLiteral.intValue(parameterNode); + } else { + throw new IllegalArgumentException(String.format("[%s] is not valid.", parameterNode)); + } + } + +} http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamFilterRule.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamFilterRule.java 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamFilterRule.java new file mode 100644 index 0000000..bc25085 --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamFilterRule.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.sql.impl.rule; + +import org.apache.beam.sdk.extensions.sql.impl.rel.BeamFilterRel; +import org.apache.beam.sdk.extensions.sql.impl.rel.BeamLogicalConvention; +import org.apache.calcite.plan.Convention; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.convert.ConverterRule; +import org.apache.calcite.rel.core.Filter; +import org.apache.calcite.rel.logical.LogicalFilter; + +/** + * A {@code ConverterRule} to replace {@link Filter} with {@link BeamFilterRel}. 
+ * + */ +public class BeamFilterRule extends ConverterRule { + public static final BeamFilterRule INSTANCE = new BeamFilterRule(); + + private BeamFilterRule() { + super(LogicalFilter.class, Convention.NONE, BeamLogicalConvention.INSTANCE, "BeamFilterRule"); + } + + @Override + public RelNode convert(RelNode rel) { + final Filter filter = (Filter) rel; + final RelNode input = filter.getInput(); + + return new BeamFilterRel(filter.getCluster(), + filter.getTraitSet().replace(BeamLogicalConvention.INSTANCE), + convert(input, input.getTraitSet().replace(BeamLogicalConvention.INSTANCE)), + filter.getCondition()); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamIOSinkRule.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamIOSinkRule.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamIOSinkRule.java new file mode 100644 index 0000000..77f4bdd --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamIOSinkRule.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.sql.impl.rule; + +import java.util.List; +import org.apache.beam.sdk.extensions.sql.impl.rel.BeamIOSinkRel; +import org.apache.beam.sdk.extensions.sql.impl.rel.BeamLogicalConvention; +import org.apache.calcite.plan.Convention; +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelOptTable; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.prepare.Prepare; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.convert.ConverterRule; +import org.apache.calcite.rel.core.TableModify; +import org.apache.calcite.rel.logical.LogicalTableModify; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.schema.Table; + +/** + * A {@code ConverterRule} to replace {@link TableModify} with + * {@link BeamIOSinkRel}. + * + */ +public class BeamIOSinkRule extends ConverterRule { + public static final BeamIOSinkRule INSTANCE = new BeamIOSinkRule(); + + private BeamIOSinkRule() { + super(LogicalTableModify.class, Convention.NONE, BeamLogicalConvention.INSTANCE, + "BeamIOSinkRule"); + } + + @Override + public RelNode convert(RelNode rel) { + final TableModify tableModify = (TableModify) rel; + final RelNode input = tableModify.getInput(); + + final RelOptCluster cluster = tableModify.getCluster(); + final RelTraitSet traitSet = tableModify.getTraitSet().replace(BeamLogicalConvention.INSTANCE); + final RelOptTable relOptTable = tableModify.getTable(); + final Prepare.CatalogReader catalogReader = tableModify.getCatalogReader(); + final RelNode convertedInput = convert(input, + input.getTraitSet().replace(BeamLogicalConvention.INSTANCE)); + final TableModify.Operation operation = tableModify.getOperation(); + final List<String> updateColumnList = tableModify.getUpdateColumnList(); + final List<RexNode> sourceExpressionList = 
tableModify.getSourceExpressionList(); + final boolean flattened = tableModify.isFlattened(); + + final Table table = tableModify.getTable().unwrap(Table.class); + + switch (table.getJdbcTableType()) { + case TABLE: + case STREAM: + if (operation != TableModify.Operation.INSERT) { + throw new UnsupportedOperationException( + String.format("Streams doesn't support %s modify operation", operation)); + } + return new BeamIOSinkRel(cluster, traitSet, + relOptTable, catalogReader, convertedInput, operation, updateColumnList, + sourceExpressionList, flattened); + default: + throw new IllegalArgumentException( + String.format("Unsupported table type: %s", table.getJdbcTableType())); + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamIOSourceRule.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamIOSourceRule.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamIOSourceRule.java new file mode 100644 index 0000000..a257d3d --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamIOSourceRule.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.sql.impl.rule; + +import org.apache.beam.sdk.extensions.sql.impl.rel.BeamIOSourceRel; +import org.apache.beam.sdk.extensions.sql.impl.rel.BeamLogicalConvention; +import org.apache.calcite.plan.Convention; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.convert.ConverterRule; +import org.apache.calcite.rel.core.TableScan; +import org.apache.calcite.rel.logical.LogicalTableScan; + +/** + * A {@code ConverterRule} to replace {@link TableScan} with + * {@link BeamIOSourceRel}. + * + */ +public class BeamIOSourceRule extends ConverterRule { + public static final BeamIOSourceRule INSTANCE = new BeamIOSourceRule(); + + private BeamIOSourceRule() { + super(LogicalTableScan.class, Convention.NONE, BeamLogicalConvention.INSTANCE, + "BeamIOSourceRule"); + } + + @Override + public RelNode convert(RelNode rel) { + final TableScan scan = (TableScan) rel; + + return new BeamIOSourceRel(scan.getCluster(), + scan.getTraitSet().replace(BeamLogicalConvention.INSTANCE), scan.getTable()); + } + +} http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamIntersectRule.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamIntersectRule.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamIntersectRule.java new file mode 100644 index 0000000..03d7129 --- 
/dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamIntersectRule.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.extensions.sql.impl.rule; + +import java.util.List; +import org.apache.beam.sdk.extensions.sql.impl.rel.BeamIntersectRel; +import org.apache.beam.sdk.extensions.sql.impl.rel.BeamLogicalConvention; +import org.apache.calcite.plan.Convention; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.convert.ConverterRule; +import org.apache.calcite.rel.core.Intersect; +import org.apache.calcite.rel.logical.LogicalIntersect; + +/** + * {@code ConverterRule} to replace {@code Intersect} with {@code BeamIntersectRel}. 
+ */ +public class BeamIntersectRule extends ConverterRule { + public static final BeamIntersectRule INSTANCE = new BeamIntersectRule(); + private BeamIntersectRule() { + super(LogicalIntersect.class, Convention.NONE, + BeamLogicalConvention.INSTANCE, "BeamIntersectRule"); + } + + @Override public RelNode convert(RelNode rel) { + Intersect intersect = (Intersect) rel; + final List<RelNode> inputs = intersect.getInputs(); + return new BeamIntersectRel( + intersect.getCluster(), + intersect.getTraitSet().replace(BeamLogicalConvention.INSTANCE), + convertList(inputs, BeamLogicalConvention.INSTANCE), + intersect.all + ); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamJoinRule.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamJoinRule.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamJoinRule.java new file mode 100644 index 0000000..4d9dd20 --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamJoinRule.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.extensions.sql.impl.rule; + +import org.apache.beam.sdk.extensions.sql.impl.rel.BeamJoinRel; +import org.apache.beam.sdk.extensions.sql.impl.rel.BeamLogicalConvention; +import org.apache.calcite.plan.Convention; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.convert.ConverterRule; +import org.apache.calcite.rel.core.Join; +import org.apache.calcite.rel.logical.LogicalJoin; + +/** + * {@code ConverterRule} to replace {@code Join} with {@code BeamJoinRel}. + */ +public class BeamJoinRule extends ConverterRule { + public static final BeamJoinRule INSTANCE = new BeamJoinRule(); + private BeamJoinRule() { + super(LogicalJoin.class, Convention.NONE, + BeamLogicalConvention.INSTANCE, "BeamJoinRule"); + } + + @Override public RelNode convert(RelNode rel) { + Join join = (Join) rel; + return new BeamJoinRel( + join.getCluster(), + join.getTraitSet().replace(BeamLogicalConvention.INSTANCE), + convert(join.getLeft(), + join.getLeft().getTraitSet().replace(BeamLogicalConvention.INSTANCE)), + convert(join.getRight(), + join.getRight().getTraitSet().replace(BeamLogicalConvention.INSTANCE)), + join.getCondition(), + join.getVariablesSet(), + join.getJoinType() + ); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamMinusRule.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamMinusRule.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamMinusRule.java new file mode 100644 index 0000000..9efdf70 --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamMinusRule.java @@ -0,0 +1,50 @@ +/* + * Licensed to 
the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.extensions.sql.impl.rule; + +import java.util.List; +import org.apache.beam.sdk.extensions.sql.impl.rel.BeamLogicalConvention; +import org.apache.beam.sdk.extensions.sql.impl.rel.BeamMinusRel; +import org.apache.calcite.plan.Convention; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.convert.ConverterRule; +import org.apache.calcite.rel.core.Minus; +import org.apache.calcite.rel.logical.LogicalMinus; + +/** + * {@code ConverterRule} to replace {@code Minus} with {@code BeamMinusRel}. 
+ */ +public class BeamMinusRule extends ConverterRule { + public static final BeamMinusRule INSTANCE = new BeamMinusRule(); + private BeamMinusRule() { + super(LogicalMinus.class, Convention.NONE, + BeamLogicalConvention.INSTANCE, "BeamMinusRule"); + } + + @Override public RelNode convert(RelNode rel) { + Minus minus = (Minus) rel; + final List<RelNode> inputs = minus.getInputs(); + return new BeamMinusRel( + minus.getCluster(), + minus.getTraitSet().replace(BeamLogicalConvention.INSTANCE), + convertList(inputs, BeamLogicalConvention.INSTANCE), + minus.all + ); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamProjectRule.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamProjectRule.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamProjectRule.java new file mode 100644 index 0000000..d19a01d --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamProjectRule.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.sql.impl.rule; + +import org.apache.beam.sdk.extensions.sql.impl.rel.BeamLogicalConvention; +import org.apache.beam.sdk.extensions.sql.impl.rel.BeamProjectRel; +import org.apache.calcite.plan.Convention; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.convert.ConverterRule; +import org.apache.calcite.rel.core.Project; +import org.apache.calcite.rel.logical.LogicalProject; + +/** + * A {@code ConverterRule} to replace {@link Project} with + * {@link BeamProjectRel}. + * + */ +public class BeamProjectRule extends ConverterRule { + public static final BeamProjectRule INSTANCE = new BeamProjectRule(); + + private BeamProjectRule() { + super(LogicalProject.class, Convention.NONE, BeamLogicalConvention.INSTANCE, "BeamProjectRule"); + } + + @Override + public RelNode convert(RelNode rel) { + final Project project = (Project) rel; + final RelNode input = project.getInput(); + + return new BeamProjectRel(project.getCluster(), + project.getTraitSet().replace(BeamLogicalConvention.INSTANCE), + convert(input, input.getTraitSet().replace(BeamLogicalConvention.INSTANCE)), + project.getProjects(), project.getRowType()); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamSortRule.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamSortRule.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamSortRule.java new file mode 100644 index 0000000..36a7c1b --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamSortRule.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) 
under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.extensions.sql.impl.rule; + +import org.apache.beam.sdk.extensions.sql.impl.rel.BeamLogicalConvention; +import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSortRel; +import org.apache.calcite.plan.Convention; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.convert.ConverterRule; +import org.apache.calcite.rel.core.Sort; +import org.apache.calcite.rel.logical.LogicalSort; + +/** + * {@code ConverterRule} to replace {@code Sort} with {@code BeamSortRel}. 
+ */ +public class BeamSortRule extends ConverterRule { + public static final BeamSortRule INSTANCE = new BeamSortRule(); + private BeamSortRule() { + super(LogicalSort.class, Convention.NONE, + BeamLogicalConvention.INSTANCE, "BeamSortRule"); + } + + @Override public RelNode convert(RelNode rel) { + Sort sort = (Sort) rel; + final RelNode input = sort.getInput(); + return new BeamSortRel( + sort.getCluster(), + sort.getTraitSet().replace(BeamLogicalConvention.INSTANCE), + convert(input, input.getTraitSet().replace(BeamLogicalConvention.INSTANCE)), + sort.getCollation(), + sort.offset, + sort.fetch + ); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamUnionRule.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamUnionRule.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamUnionRule.java new file mode 100644 index 0000000..6065b72 --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamUnionRule.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.extensions.sql.impl.rule; + +import org.apache.beam.sdk.extensions.sql.impl.rel.BeamLogicalConvention; +import org.apache.beam.sdk.extensions.sql.impl.rel.BeamUnionRel; +import org.apache.calcite.plan.Convention; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.convert.ConverterRule; +import org.apache.calcite.rel.core.Union; +import org.apache.calcite.rel.logical.LogicalUnion; + +/** + * A {@code ConverterRule} to replace {@link org.apache.calcite.rel.core.Union} with + * {@link BeamUnionRule}. + */ +public class BeamUnionRule extends ConverterRule { + public static final BeamUnionRule INSTANCE = new BeamUnionRule(); + private BeamUnionRule() { + super(LogicalUnion.class, Convention.NONE, BeamLogicalConvention.INSTANCE, + "BeamUnionRule"); + } + + @Override public RelNode convert(RelNode rel) { + Union union = (Union) rel; + + return new BeamUnionRel( + union.getCluster(), + union.getTraitSet().replace(BeamLogicalConvention.INSTANCE), + convertList(union.getInputs(), BeamLogicalConvention.INSTANCE), + union.all + ); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamValuesRule.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamValuesRule.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamValuesRule.java new file mode 100644 index 0000000..b5dc30c --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamValuesRule.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.extensions.sql.impl.rule; + +import org.apache.beam.sdk.extensions.sql.impl.rel.BeamLogicalConvention; +import org.apache.beam.sdk.extensions.sql.impl.rel.BeamValuesRel; +import org.apache.calcite.plan.Convention; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.convert.ConverterRule; +import org.apache.calcite.rel.core.Values; +import org.apache.calcite.rel.logical.LogicalValues; + +/** + * {@code ConverterRule} to replace {@code Values} with {@code BeamValuesRel}. 
+ */ +public class BeamValuesRule extends ConverterRule { + public static final BeamValuesRule INSTANCE = new BeamValuesRule(); + private BeamValuesRule() { + super(LogicalValues.class, Convention.NONE, + BeamLogicalConvention.INSTANCE, "BeamValuesRule"); + } + + @Override public RelNode convert(RelNode rel) { + Values values = (Values) rel; + return new BeamValuesRel( + values.getCluster(), + values.getRowType(), + values.getTuples(), + values.getTraitSet().replace(BeamLogicalConvention.INSTANCE) + ); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/package-info.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/package-info.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/package-info.java new file mode 100644 index 0000000..fa32b44 --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/package-info.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +/** + * {@link org.apache.calcite.plan.RelOptRule} to generate + * {@link org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode}. + */ +package org.apache.beam.sdk.extensions.sql.impl.rule; http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamAggregationTransforms.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamAggregationTransforms.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamAggregationTransforms.java new file mode 100644 index 0000000..095875f --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamAggregationTransforms.java @@ -0,0 +1,300 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.extensions.sql.impl.transform; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.Serializable; +import java.math.BigDecimal; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import org.apache.beam.sdk.coders.BigDecimalCoder; +import org.apache.beam.sdk.coders.CannotProvideCoderException; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderException; +import org.apache.beam.sdk.coders.CoderRegistry; +import org.apache.beam.sdk.coders.CustomCoder; +import org.apache.beam.sdk.coders.VarIntCoder; +import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression; +import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlInputRefExpression; +import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils; +import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; +import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType; +import org.apache.beam.sdk.extensions.sql.schema.BeamSqlUdaf; +import org.apache.beam.sdk.transforms.Combine.CombineFn; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.values.KV; +import org.apache.calcite.rel.core.AggregateCall; +import org.apache.calcite.schema.impl.AggregateFunctionImpl; +import org.apache.calcite.sql.validate.SqlUserDefinedAggFunction; +import org.apache.calcite.util.ImmutableBitSet; +import org.joda.time.Instant; + +/** + * Collections of {@code PTransform} and {@code DoFn} used to perform GROUP-BY operation. + */ +public class BeamAggregationTransforms implements Serializable{ + /** + * Merge KV to single record. 
+ */ + public static class MergeAggregationRecord extends DoFn<KV<BeamSqlRow, BeamSqlRow>, BeamSqlRow> { + private BeamSqlRowType outRowType; + private List<String> aggFieldNames; + private int windowStartFieldIdx; + + public MergeAggregationRecord(BeamSqlRowType outRowType, List<AggregateCall> aggList + , int windowStartFieldIdx) { + this.outRowType = outRowType; + this.aggFieldNames = new ArrayList<>(); + for (AggregateCall ac : aggList) { + aggFieldNames.add(ac.getName()); + } + this.windowStartFieldIdx = windowStartFieldIdx; + } + + @ProcessElement + public void processElement(ProcessContext c, BoundedWindow window) { + BeamSqlRow outRecord = new BeamSqlRow(outRowType); + outRecord.updateWindowRange(c.element().getKey(), window); + + KV<BeamSqlRow, BeamSqlRow> kvRecord = c.element(); + for (String f : kvRecord.getKey().getDataType().getFieldsName()) { + outRecord.addField(f, kvRecord.getKey().getFieldValue(f)); + } + for (int idx = 0; idx < aggFieldNames.size(); ++idx) { + outRecord.addField(aggFieldNames.get(idx), kvRecord.getValue().getFieldValue(idx)); + } + if (windowStartFieldIdx != -1) { + outRecord.addField(windowStartFieldIdx, outRecord.getWindowStart().toDate()); + } + + c.output(outRecord); + } + } + + /** + * extract group-by fields. 
+ */ + public static class AggregationGroupByKeyFn + implements SerializableFunction<BeamSqlRow, BeamSqlRow> { + private List<Integer> groupByKeys; + + public AggregationGroupByKeyFn(int windowFieldIdx, ImmutableBitSet groupSet) { + this.groupByKeys = new ArrayList<>(); + for (int i : groupSet.asList()) { + if (i != windowFieldIdx) { + groupByKeys.add(i); + } + } + } + + @Override + public BeamSqlRow apply(BeamSqlRow input) { + BeamSqlRowType typeOfKey = exTypeOfKeyRecord(input.getDataType()); + BeamSqlRow keyOfRecord = new BeamSqlRow(typeOfKey); + keyOfRecord.updateWindowRange(input, null); + + for (int idx = 0; idx < groupByKeys.size(); ++idx) { + keyOfRecord.addField(idx, input.getFieldValue(groupByKeys.get(idx))); + } + return keyOfRecord; + } + + private BeamSqlRowType exTypeOfKeyRecord(BeamSqlRowType dataType) { + List<String> fieldNames = new ArrayList<>(); + List<Integer> fieldTypes = new ArrayList<>(); + for (int idx : groupByKeys) { + fieldNames.add(dataType.getFieldsName().get(idx)); + fieldTypes.add(dataType.getFieldsType().get(idx)); + } + return BeamSqlRowType.create(fieldNames, fieldTypes); + } + } + + /** + * Assign event timestamp. + */ + public static class WindowTimestampFn implements SerializableFunction<BeamSqlRow, Instant> { + private int windowFieldIdx = -1; + + public WindowTimestampFn(int windowFieldIdx) { + super(); + this.windowFieldIdx = windowFieldIdx; + } + + @Override + public Instant apply(BeamSqlRow input) { + return new Instant(input.getDate(windowFieldIdx).getTime()); + } + } + + /** + * An adaptor class to invoke Calcite UDAF instances in Beam {@code CombineFn}. 
+ */ + public static class AggregationAdaptor + extends CombineFn<BeamSqlRow, AggregationAccumulator, BeamSqlRow> { + private List<BeamSqlUdaf> aggregators; + private List<BeamSqlExpression> sourceFieldExps; + private BeamSqlRowType finalRowType; + + public AggregationAdaptor(List<AggregateCall> aggregationCalls, + BeamSqlRowType sourceRowType) { + aggregators = new ArrayList<>(); + sourceFieldExps = new ArrayList<>(); + List<String> outFieldsName = new ArrayList<>(); + List<Integer> outFieldsType = new ArrayList<>(); + for (AggregateCall call : aggregationCalls) { + int refIndex = call.getArgList().size() > 0 ? call.getArgList().get(0) : 0; + BeamSqlExpression sourceExp = new BeamSqlInputRefExpression( + CalciteUtils.getFieldType(sourceRowType, refIndex), refIndex); + sourceFieldExps.add(sourceExp); + + outFieldsName.add(call.name); + int outFieldType = CalciteUtils.toJavaType(call.type.getSqlTypeName()); + outFieldsType.add(outFieldType); + + switch (call.getAggregation().getName()) { + case "COUNT": + aggregators.add(new BeamBuiltinAggregations.Count()); + break; + case "MAX": + aggregators.add(BeamBuiltinAggregations.Max.create(call.type.getSqlTypeName())); + break; + case "MIN": + aggregators.add(BeamBuiltinAggregations.Min.create(call.type.getSqlTypeName())); + break; + case "SUM": + aggregators.add(BeamBuiltinAggregations.Sum.create(call.type.getSqlTypeName())); + break; + case "AVG": + aggregators.add(BeamBuiltinAggregations.Avg.create(call.type.getSqlTypeName())); + break; + default: + if (call.getAggregation() instanceof SqlUserDefinedAggFunction) { + // handle UDAF. 
+ SqlUserDefinedAggFunction udaf = (SqlUserDefinedAggFunction) call.getAggregation(); + AggregateFunctionImpl fn = (AggregateFunctionImpl) udaf.function; + try { + aggregators.add((BeamSqlUdaf) fn.declaringClass.newInstance()); + } catch (Exception e) { + throw new IllegalStateException(e); + } + } else { + throw new UnsupportedOperationException( + String.format("Aggregator [%s] is not supported", + call.getAggregation().getName())); + } + break; + } + } + finalRowType = BeamSqlRowType.create(outFieldsName, outFieldsType); + } + @Override + public AggregationAccumulator createAccumulator() { + AggregationAccumulator initialAccu = new AggregationAccumulator(); + for (BeamSqlUdaf agg : aggregators) { + initialAccu.accumulatorElements.add(agg.init()); + } + return initialAccu; + } + @Override + public AggregationAccumulator addInput(AggregationAccumulator accumulator, BeamSqlRow input) { + AggregationAccumulator deltaAcc = new AggregationAccumulator(); + for (int idx = 0; idx < aggregators.size(); ++idx) { + deltaAcc.accumulatorElements.add( + aggregators.get(idx).add(accumulator.accumulatorElements.get(idx), + sourceFieldExps.get(idx).evaluate(input).getValue())); + } + return deltaAcc; + } + @Override + public AggregationAccumulator mergeAccumulators(Iterable<AggregationAccumulator> accumulators) { + AggregationAccumulator deltaAcc = new AggregationAccumulator(); + for (int idx = 0; idx < aggregators.size(); ++idx) { + List accs = new ArrayList<>(); + Iterator<AggregationAccumulator> ite = accumulators.iterator(); + while (ite.hasNext()) { + accs.add(ite.next().accumulatorElements.get(idx)); + } + deltaAcc.accumulatorElements.add(aggregators.get(idx).merge(accs)); + } + return deltaAcc; + } + @Override + public BeamSqlRow extractOutput(AggregationAccumulator accumulator) { + BeamSqlRow result = new BeamSqlRow(finalRowType); + for (int idx = 0; idx < aggregators.size(); ++idx) { + result.addField(idx, 
aggregators.get(idx).result(accumulator.accumulatorElements.get(idx))); + } + return result; + } + @Override + public Coder<AggregationAccumulator> getAccumulatorCoder( + CoderRegistry registry, Coder<BeamSqlRow> inputCoder) + throws CannotProvideCoderException { + registry.registerCoderForClass(BigDecimal.class, BigDecimalCoder.of()); + List<Coder> aggAccuCoderList = new ArrayList<>(); + for (BeamSqlUdaf udaf : aggregators) { + aggAccuCoderList.add(udaf.getAccumulatorCoder(registry)); + } + return new AggregationAccumulatorCoder(aggAccuCoderList); + } + } + + /** + * A class to holder varied accumulator objects. + */ + public static class AggregationAccumulator{ + private List accumulatorElements = new ArrayList<>(); + } + + /** + * Coder for {@link AggregationAccumulator}. + */ + public static class AggregationAccumulatorCoder extends CustomCoder<AggregationAccumulator>{ + private VarIntCoder sizeCoder = VarIntCoder.of(); + private List<Coder> elementCoders; + + public AggregationAccumulatorCoder(List<Coder> elementCoders) { + this.elementCoders = elementCoders; + } + + @Override + public void encode(AggregationAccumulator value, OutputStream outStream) + throws CoderException, IOException { + sizeCoder.encode(value.accumulatorElements.size(), outStream); + for (int idx = 0; idx < value.accumulatorElements.size(); ++idx) { + elementCoders.get(idx).encode(value.accumulatorElements.get(idx), outStream); + } + } + + @Override + public AggregationAccumulator decode(InputStream inStream) throws CoderException, IOException { + AggregationAccumulator accu = new AggregationAccumulator(); + int size = sizeCoder.decode(inStream); + for (int idx = 0; idx < size; ++idx) { + accu.accumulatorElements.add(elementCoders.get(idx).decode(inStream)); + } + return accu; + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamBuiltinAggregations.java 
---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamBuiltinAggregations.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamBuiltinAggregations.java new file mode 100644 index 0000000..1fc8cf6 --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamBuiltinAggregations.java @@ -0,0 +1,412 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.extensions.sql.impl.transform; + +import java.math.BigDecimal; +import java.util.Date; +import java.util.Iterator; +import org.apache.beam.sdk.coders.BigDecimalCoder; +import org.apache.beam.sdk.coders.ByteCoder; +import org.apache.beam.sdk.coders.CannotProvideCoderException; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderRegistry; +import org.apache.beam.sdk.coders.DoubleCoder; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.coders.SerializableCoder; +import org.apache.beam.sdk.coders.VarIntCoder; +import org.apache.beam.sdk.coders.VarLongCoder; +import org.apache.beam.sdk.extensions.sql.schema.BeamSqlUdaf; +import org.apache.beam.sdk.values.KV; +import org.apache.calcite.sql.type.SqlTypeName; + +/** + * Built-in aggregations functions for COUNT/MAX/MIN/SUM/AVG. + */ +class BeamBuiltinAggregations { + /** + * Built-in aggregation for COUNT. + */ + public static final class Count<T> extends BeamSqlUdaf<T, Long, Long> { + public Count() {} + + @Override + public Long init() { + return 0L; + } + + @Override + public Long add(Long accumulator, T input) { + return accumulator + 1; + } + + @Override + public Long merge(Iterable<Long> accumulators) { + long v = 0L; + Iterator<Long> ite = accumulators.iterator(); + while (ite.hasNext()) { + v += ite.next(); + } + return v; + } + + @Override + public Long result(Long accumulator) { + return accumulator; + } + } + + /** + * Built-in aggregation for MAX. 
+ */ + public static final class Max<T extends Comparable<T>> extends BeamSqlUdaf<T, T, T> { + public static Max create(SqlTypeName fieldType) { + switch (fieldType) { + case INTEGER: + return new BeamBuiltinAggregations.Max<Integer>(fieldType); + case SMALLINT: + return new BeamBuiltinAggregations.Max<Short>(fieldType); + case TINYINT: + return new BeamBuiltinAggregations.Max<Byte>(fieldType); + case BIGINT: + return new BeamBuiltinAggregations.Max<Long>(fieldType); + case FLOAT: + return new BeamBuiltinAggregations.Max<Float>(fieldType); + case DOUBLE: + return new BeamBuiltinAggregations.Max<Double>(fieldType); + case TIMESTAMP: + return new BeamBuiltinAggregations.Max<Date>(fieldType); + case DECIMAL: + return new BeamBuiltinAggregations.Max<BigDecimal>(fieldType); + default: + throw new UnsupportedOperationException( + String.format("[%s] is not support in MAX", fieldType)); + } + } + + private final SqlTypeName fieldType; + private Max(SqlTypeName fieldType) { + this.fieldType = fieldType; + } + + @Override + public T init() { + return null; + } + + @Override + public T add(T accumulator, T input) { + return (accumulator == null || accumulator.compareTo(input) < 0) ? input : accumulator; + } + + @Override + public T merge(Iterable<T> accumulators) { + Iterator<T> ite = accumulators.iterator(); + T mergedV = ite.next(); + while (ite.hasNext()) { + T v = ite.next(); + mergedV = mergedV.compareTo(v) > 0 ? mergedV : v; + } + return mergedV; + } + + @Override + public T result(T accumulator) { + return accumulator; + } + + @Override + public Coder<T> getAccumulatorCoder(CoderRegistry registry) throws CannotProvideCoderException { + return BeamBuiltinAggregations.getSqlTypeCoder(fieldType); + } + } + + /** + * Built-in aggregation for MIN. 
+ */ + public static final class Min<T extends Comparable<T>> extends BeamSqlUdaf<T, T, T> { + public static Min create(SqlTypeName fieldType) { + switch (fieldType) { + case INTEGER: + return new BeamBuiltinAggregations.Min<Integer>(fieldType); + case SMALLINT: + return new BeamBuiltinAggregations.Min<Short>(fieldType); + case TINYINT: + return new BeamBuiltinAggregations.Min<Byte>(fieldType); + case BIGINT: + return new BeamBuiltinAggregations.Min<Long>(fieldType); + case FLOAT: + return new BeamBuiltinAggregations.Min<Float>(fieldType); + case DOUBLE: + return new BeamBuiltinAggregations.Min<Double>(fieldType); + case TIMESTAMP: + return new BeamBuiltinAggregations.Min<Date>(fieldType); + case DECIMAL: + return new BeamBuiltinAggregations.Min<BigDecimal>(fieldType); + default: + throw new UnsupportedOperationException( + String.format("[%s] is not support in MIN", fieldType)); + } + } + + private final SqlTypeName fieldType; + private Min(SqlTypeName fieldType) { + this.fieldType = fieldType; + } + + @Override + public T init() { + return null; + } + + @Override + public T add(T accumulator, T input) { + return (accumulator == null || accumulator.compareTo(input) > 0) ? input : accumulator; + } + + @Override + public T merge(Iterable<T> accumulators) { + Iterator<T> ite = accumulators.iterator(); + T mergedV = ite.next(); + while (ite.hasNext()) { + T v = ite.next(); + mergedV = mergedV.compareTo(v) < 0 ? mergedV : v; + } + return mergedV; + } + + @Override + public T result(T accumulator) { + return accumulator; + } + + @Override + public Coder<T> getAccumulatorCoder(CoderRegistry registry) throws CannotProvideCoderException { + return BeamBuiltinAggregations.getSqlTypeCoder(fieldType); + } + } + + /** + * Built-in aggregation for SUM. 
+ */ + public static final class Sum<T> extends BeamSqlUdaf<T, BigDecimal, T> { + public static Sum create(SqlTypeName fieldType) { + switch (fieldType) { + case INTEGER: + return new BeamBuiltinAggregations.Sum<Integer>(fieldType); + case SMALLINT: + return new BeamBuiltinAggregations.Sum<Short>(fieldType); + case TINYINT: + return new BeamBuiltinAggregations.Sum<Byte>(fieldType); + case BIGINT: + return new BeamBuiltinAggregations.Sum<Long>(fieldType); + case FLOAT: + return new BeamBuiltinAggregations.Sum<Float>(fieldType); + case DOUBLE: + return new BeamBuiltinAggregations.Sum<Double>(fieldType); + case TIMESTAMP: + return new BeamBuiltinAggregations.Sum<Date>(fieldType); + case DECIMAL: + return new BeamBuiltinAggregations.Sum<BigDecimal>(fieldType); + default: + throw new UnsupportedOperationException( + String.format("[%s] is not support in SUM", fieldType)); + } + } + + private SqlTypeName fieldType; + private Sum(SqlTypeName fieldType) { + this.fieldType = fieldType; + } + + @Override + public BigDecimal init() { + return new BigDecimal(0); + } + + @Override + public BigDecimal add(BigDecimal accumulator, T input) { + return accumulator.add(new BigDecimal(input.toString())); + } + + @Override + public BigDecimal merge(Iterable<BigDecimal> accumulators) { + BigDecimal v = new BigDecimal(0); + Iterator<BigDecimal> ite = accumulators.iterator(); + while (ite.hasNext()) { + v = v.add(ite.next()); + } + return v; + } + + @Override + public T result(BigDecimal accumulator) { + Object result = null; + switch (fieldType) { + case INTEGER: + result = accumulator.intValue(); + break; + case BIGINT: + result = accumulator.longValue(); + break; + case SMALLINT: + result = accumulator.shortValue(); + break; + case TINYINT: + result = accumulator.byteValue(); + break; + case DOUBLE: + result = accumulator.doubleValue(); + break; + case FLOAT: + result = accumulator.floatValue(); + break; + case DECIMAL: + result = accumulator; + break; + default: + break; + } + return 
(T) result; + } + } + + /** + * Built-in aggregation for AVG. + */ + public static final class Avg<T> extends BeamSqlUdaf<T, KV<BigDecimal, Long>, T> { + public static Avg create(SqlTypeName fieldType) { + switch (fieldType) { + case INTEGER: + return new BeamBuiltinAggregations.Avg<Integer>(fieldType); + case SMALLINT: + return new BeamBuiltinAggregations.Avg<Short>(fieldType); + case TINYINT: + return new BeamBuiltinAggregations.Avg<Byte>(fieldType); + case BIGINT: + return new BeamBuiltinAggregations.Avg<Long>(fieldType); + case FLOAT: + return new BeamBuiltinAggregations.Avg<Float>(fieldType); + case DOUBLE: + return new BeamBuiltinAggregations.Avg<Double>(fieldType); + case TIMESTAMP: + return new BeamBuiltinAggregations.Avg<Date>(fieldType); + case DECIMAL: + return new BeamBuiltinAggregations.Avg<BigDecimal>(fieldType); + default: + throw new UnsupportedOperationException( + String.format("[%s] is not support in AVG", fieldType)); + } + } + + private SqlTypeName fieldType; + private Avg(SqlTypeName fieldType) { + this.fieldType = fieldType; + } + + @Override + public KV<BigDecimal, Long> init() { + return KV.of(new BigDecimal(0), 0L); + } + + @Override + public KV<BigDecimal, Long> add(KV<BigDecimal, Long> accumulator, T input) { + return KV.of( + accumulator.getKey().add(new BigDecimal(input.toString())), + accumulator.getValue() + 1); + } + + @Override + public KV<BigDecimal, Long> merge(Iterable<KV<BigDecimal, Long>> accumulators) { + BigDecimal v = new BigDecimal(0); + long s = 0; + Iterator<KV<BigDecimal, Long>> ite = accumulators.iterator(); + while (ite.hasNext()) { + KV<BigDecimal, Long> r = ite.next(); + v = v.add(r.getKey()); + s += r.getValue(); + } + return KV.of(v, s); + } + + @Override + public T result(KV<BigDecimal, Long> accumulator) { + BigDecimal decimalAvg = accumulator.getKey().divide( + new BigDecimal(accumulator.getValue())); + Object result = null; + switch (fieldType) { + case INTEGER: + result = decimalAvg.intValue(); + break; + 
case BIGINT: + result = decimalAvg.longValue(); + break; + case SMALLINT: + result = decimalAvg.shortValue(); + break; + case TINYINT: + result = decimalAvg.byteValue(); + break; + case DOUBLE: + result = decimalAvg.doubleValue(); + break; + case FLOAT: + result = decimalAvg.floatValue(); + break; + case DECIMAL: + result = decimalAvg; + break; + default: + break; + } + return (T) result; + } + + @Override + public Coder<KV<BigDecimal, Long>> getAccumulatorCoder(CoderRegistry registry) + throws CannotProvideCoderException { + return KvCoder.of(BigDecimalCoder.of(), VarLongCoder.of()); + } + } + + /** + * Find {@link Coder} for Beam SQL field types. + */ + private static Coder getSqlTypeCoder(SqlTypeName sqlType) { + switch (sqlType) { + case INTEGER: + return VarIntCoder.of(); + case SMALLINT: + return SerializableCoder.of(Short.class); + case TINYINT: + return ByteCoder.of(); + case BIGINT: + return VarLongCoder.of(); + case FLOAT: + return SerializableCoder.of(Float.class); + case DOUBLE: + return DoubleCoder.of(); + case TIMESTAMP: + return SerializableCoder.of(Date.class); + case DECIMAL: + return BigDecimalCoder.of(); + default: + throw new UnsupportedOperationException( + String.format("Cannot find a Coder for data type [%s]", sqlType)); + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamJoinTransforms.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamJoinTransforms.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamJoinTransforms.java new file mode 100644 index 0000000..e0898d1 --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamJoinTransforms.java @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software 
Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.beam.sdk.extensions.sql.impl.transform;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.calcite.rel.core.JoinRelType;
import org.apache.calcite.util.Pair;

/**
 * Functions and {@code DoFn}s used to execute a SQL JOIN.
 */
public class BeamJoinTransforms {

  /**
   * Turns a row into {@code KV(joinKey, row)}. The join key is a freshly built row that holds
   * only the join columns of the input. {@code isLeft} selects which element of each column
   * pair is read: {@code getKey()} for the left input, {@code getValue()} for the right one.
   */
  public static class ExtractJoinFields
      extends SimpleFunction<BeamSqlRow, KV<BeamSqlRow, BeamSqlRow>> {
    private final boolean isLeft;
    private final List<Pair<Integer, Integer>> joinColumns;

    public ExtractJoinFields(boolean isLeft, List<Pair<Integer, Integer>> joinColumns) {
      this.isLeft = isLeft;
      this.joinColumns = joinColumns;
    }

    @Override
    public KV<BeamSqlRow, BeamSqlRow> apply(BeamSqlRow input) {
      int keyWidth = joinColumns.size();

      // Key field names are synthetic ("c0", "c1", ...); only position and type matter.
      List<String> keyNames = new ArrayList<>(keyWidth);
      List<Integer> keyTypes = new ArrayList<>(keyWidth);
      for (int pos = 0; pos < keyWidth; pos++) {
        keyNames.add("c" + pos);
        keyTypes.add(input.getDataType().getFieldsType().get(inputIndexAt(pos)));
      }

      BeamSqlRow keyRow = new BeamSqlRow(BeamSqlRowType.create(keyNames, keyTypes));
      for (int pos = 0; pos < keyWidth; pos++) {
        keyRow.addField(pos, input.getFieldValue(inputIndexAt(pos)));
      }
      return KV.of(keyRow, input);
    }

    /** Position, in the input row, of the {@code pos}-th join column for this side. */
    private int inputIndexAt(int pos) {
      Pair<Integer, Integer> columnPair = joinColumns.get(pos);
      return isLeft ? columnPair.getKey() : columnPair.getValue();
    }
  }


  /**
   * Side-input join. Each main-input element {@code KV(joinKey, row)} is paired with every row
   * that the side-input map holds under the same key. When nothing matches, a LEFT join emits
   * the row padded with {@code rightNullRow}; any other join type emits nothing.
   * {@code swap} is forwarded to the row-combining step to control field order.
   */
  public static class SideInputJoinDoFn extends DoFn<KV<BeamSqlRow, BeamSqlRow>, BeamSqlRow> {
    private final PCollectionView<Map<BeamSqlRow, Iterable<BeamSqlRow>>> sideInputView;
    private final JoinRelType joinType;
    private final BeamSqlRow rightNullRow;
    private final boolean swap;

    public SideInputJoinDoFn(JoinRelType joinType, BeamSqlRow rightNullRow,
        PCollectionView<Map<BeamSqlRow, Iterable<BeamSqlRow>>> sideInputView,
        boolean swap) {
      this.joinType = joinType;
      this.rightNullRow = rightNullRow;
      this.sideInputView = sideInputView;
      this.swap = swap;
    }

    @ProcessElement
    public void processElement(ProcessContext context) {
      BeamSqlRow joinKey = context.element().getKey();
      BeamSqlRow mainRow = context.element().getValue();
      Iterable<BeamSqlRow> candidates = context.sideInput(sideInputView).get(joinKey);

      Iterator<BeamSqlRow> matches = (candidates == null) ? null : candidates.iterator();
      if (matches == null || !matches.hasNext()) {
        // No partner row on the side input: only a LEFT join keeps the main row.
        if (joinType == JoinRelType.LEFT) {
          context.output(combineTwoRowsIntoOne(mainRow, rightNullRow, swap));
        }
        return;
      }
      do {
        context.output(combineTwoRowsIntoOne(mainRow, matches.next(), swap));
      } while (matches.hasNext());
    }
  }


  /**
   * Flattens {@code KV(key, KV(left, right))} into a single row made of the left row's fields
   * followed by the right row's fields. The key is ignored.
   */
  public static class JoinParts2WholeRow
      extends SimpleFunction<KV<BeamSqlRow, KV<BeamSqlRow, BeamSqlRow>>, BeamSqlRow> {
    @Override
    public BeamSqlRow apply(KV<BeamSqlRow, KV<BeamSqlRow, BeamSqlRow>> input) {
      KV<BeamSqlRow, BeamSqlRow> halves = input.getValue();
      return combineTwoRowsIntoOne(halves.getKey(), halves.getValue(), false);
    }
  }

  /**
   * Concatenates two rows into one wide row. When {@code swap} is set, {@code rightRow}'s
   * fields come first; otherwise {@code leftRow}'s do.
   */
  private static BeamSqlRow combineTwoRowsIntoOne(BeamSqlRow leftRow,
      BeamSqlRow rightRow, boolean swap) {
    BeamSqlRow first = swap ? rightRow : leftRow;
    BeamSqlRow second = swap ? leftRow : rightRow;
    return combineTwoRowsIntoOneHelper(first, second);
  }

  /**
   * Builds a row whose field names, field types and values are those of {@code leftRow}
   * followed by those of {@code rightRow}.
   */
  private static BeamSqlRow combineTwoRowsIntoOneHelper(BeamSqlRow leftRow,
      BeamSqlRow rightRow) {
    int leftWidth = leftRow.size();
    int totalWidth = leftWidth + rightRow.size();

    List<String> fieldNames = new ArrayList<>(totalWidth);
    fieldNames.addAll(leftRow.getDataType().getFieldsName());
    fieldNames.addAll(rightRow.getDataType().getFieldsName());

    List<Integer> fieldTypes = new ArrayList<>(totalWidth);
    fieldTypes.addAll(leftRow.getDataType().getFieldsType());
    fieldTypes.addAll(rightRow.getDataType().getFieldsType());

    BeamSqlRow combined = new BeamSqlRow(BeamSqlRowType.create(fieldNames, fieldTypes));
    for (int i = 0; i < leftWidth; i++) {
      combined.addField(i, leftRow.getFieldValue(i));
    }
    for (int j = 0; j < rightRow.size(); j++) {
      combined.addField(leftWidth + j, rightRow.getFieldValue(j));
    }
    return combined;
  }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSetOperatorsTransforms.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSetOperatorsTransforms.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSetOperatorsTransforms.java new file mode 100644 index 0000000..326b328 --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSetOperatorsTransforms.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor
license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.beam.sdk.extensions.sql.impl.transform;

import java.util.Iterator;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSetOperatorRelBase;
import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.transforms.join.CoGbkResult;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TupleTag;

/**
 * Collections of {@code PTransform} and {@code DoFn} used to perform Set operations.
 */
public abstract class BeamSetOperatorsTransforms {
  /**
   * Transform a {@code BeamSqlRow} to a {@code KV<BeamSqlRow, BeamSqlRow>}.
   *
   * <p>The row itself is used as the key. A subsequent {@code CoGroupByKey} therefore groups
   * together the rows that compare equal as keys, from both inputs.
   */
  public static class BeamSqlRow2KvFn extends
      SimpleFunction<BeamSqlRow, KV<BeamSqlRow, BeamSqlRow>> {
    @Override public KV<BeamSqlRow, BeamSqlRow> apply(BeamSqlRow input) {
      return KV.of(input, input);
    }
  }

  /**
   * Filter function used for Set operators.
   *
   * <p>The input is the co-grouped result keyed by the whole row. {@code leftTag} and
   * {@code rightTag} yield the copies of that row found on each side. For a row with {@code m}
   * copies on the left and {@code n} copies on the right, this emits:
   * <ul>
   *   <li>UNION ALL: all m + n copies. UNION: a single copy.</li>
   *   <li>INTERSECT ALL: min(m, n) copies. INTERSECT: a single copy if m &gt; 0 and n &gt; 0.</li>
   *   <li>MINUS ALL: max(m - n, 0) copies. MINUS: a single copy if m &gt; 0 and n == 0.</li>
   * </ul>
   */
  public static class SetOperatorFilteringDoFn extends
      DoFn<KV<BeamSqlRow, CoGbkResult>, BeamSqlRow> {
    private final TupleTag<BeamSqlRow> leftTag;
    private final TupleTag<BeamSqlRow> rightTag;
    private final BeamSetOperatorRelBase.OpType opType;
    // true for the ALL variant (duplicates counted), false for the distinct variant.
    private final boolean all;

    public SetOperatorFilteringDoFn(TupleTag<BeamSqlRow> leftTag, TupleTag<BeamSqlRow> rightTag,
        BeamSetOperatorRelBase.OpType opType, boolean all) {
      this.leftTag = leftTag;
      this.rightTag = rightTag;
      this.opType = opType;
      this.all = all;
    }

    @ProcessElement public void processElement(ProcessContext ctx) {
      CoGbkResult coGbkResult = ctx.element().getValue();
      Iterable<BeamSqlRow> leftRows = coGbkResult.getAll(leftTag);
      Iterable<BeamSqlRow> rightRows = coGbkResult.getAll(rightTag);
      switch (opType) {
        case UNION:
          if (all) {
            // output both left & right
            for (BeamSqlRow row : leftRows) {
              ctx.output(row);
            }
            for (BeamSqlRow row : rightRows) {
              ctx.output(row);
            }
          } else {
            // only output the key
            ctx.output(ctx.element().getKey());
          }
          break;
        case INTERSECT:
          if (leftRows.iterator().hasNext() && rightRows.iterator().hasNext()) {
            if (all) {
              // A row present m times on the left and n times on the right survives
              // min(m, n) times, not m times.
              outputFirst(ctx, leftRows, Math.min(countRows(leftRows), countRows(rightRows)));
            } else {
              ctx.output(ctx.element().getKey());
            }
          }
          break;
        case MINUS:
          if (all) {
            // Each right copy cancels one left copy: max(m - n, 0) copies survive.
            // A non-positive surplus emits nothing.
            outputFirst(ctx, leftRows, countRows(leftRows) - countRows(rightRows));
          } else if (leftRows.iterator().hasNext() && !rightRows.iterator().hasNext()) {
            // only output one
            ctx.output(leftRows.iterator().next());
          }
          break;
        default:
          throw new IllegalStateException("Unsupported set operator: " + opType);
      }
    }

    /** Emits the first {@code limit} rows of {@code rows}; nothing when {@code limit <= 0}. */
    private void outputFirst(ProcessContext ctx, Iterable<BeamSqlRow> rows, int limit) {
      Iterator<BeamSqlRow> iter = rows.iterator();
      for (int i = 0; i < limit && iter.hasNext(); i++) {
        ctx.output(iter.next());
      }
    }

    /** Counts the rows in a group by iterating over it once. */
    private static int countRows(Iterable<BeamSqlRow> rows) {
      int count = 0;
      for (Iterator<BeamSqlRow> iter = rows.iterator(); iter.hasNext(); iter.next()) {
        count++;
      }
      return count;
    }
  }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSqlFilterFn.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSqlFilterFn.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSqlFilterFn.java new file mode 100644 index 0000000..855de7a --- /dev/null +++
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSqlFilterFn.java @@ -0,0 +1,62 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.beam.sdk.extensions.sql.impl.transform;

import java.util.List;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.BeamSqlExpressionExecutor;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamFilterRel;
import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
import org.apache.beam.sdk.transforms.DoFn;

/**
 * {@code BeamSqlFilterFn} is the executor for a {@link BeamFilterRel} step.
 *
 * <p>Each input row is evaluated by the executor. The row is emitted unchanged only when the
 * first returned value is {@code true}. A {@code false} or {@code null} (SQL UNKNOWN) result
 * drops the row.
 */
public class BeamSqlFilterFn extends DoFn<BeamSqlRow, BeamSqlRow> {

  private final String stepName;
  private final BeamSqlExpressionExecutor executor;

  public BeamSqlFilterFn(String stepName, BeamSqlExpressionExecutor executor) {
    super();
    this.stepName = stepName;
    this.executor = executor;
  }

  /** {@code @Setup} lifecycle hook: prepares the expression executor. */
  @Setup
  public void setup() {
    executor.prepare();
  }

  @ProcessElement
  public void processElement(ProcessContext c) {
    BeamSqlRow in = c.element();

    List<Object> result = executor.execute(in);

    // A SQL WHERE clause keeps a row only when its condition is TRUE. A NULL condition must
    // drop the row; unboxing it with a bare (Boolean) cast would throw NullPointerException.
    // A non-Boolean value still fails loudly with ClassCastException.
    Object condition = result.get(0);
    if (condition != null && (Boolean) condition) {
      c.output(in);
    }
  }

  /** {@code @Teardown} lifecycle hook: closes the expression executor. */
  @Teardown
  public void close() {
    executor.close();
  }

}
http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSqlOutputToConsoleFn.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSqlOutputToConsoleFn.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSqlOutputToConsoleFn.java new file mode 100644 index 0000000..b40cfa6 --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSqlOutputToConsoleFn.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.beam.sdk.extensions.sql.impl.transform;

import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
import org.apache.beam.sdk.transforms.DoFn;

/**
 * A test {@code DoFn} that prints the data values of every incoming row to standard output,
 * prefixed with {@code "Output: "}. It emits no elements downstream.
 */
public class BeamSqlOutputToConsoleFn extends DoFn<BeamSqlRow, Void> {

  /** Name of the step this function was created for; not currently read by this class. */
  private String stepName;

  public BeamSqlOutputToConsoleFn(String stepName) {
    super();
    this.stepName = stepName;
  }

  @ProcessElement
  public void processElement(ProcessContext c) {
    BeamSqlRow row = c.element();
    String line = "Output: " + row.getDataValues();
    System.out.println(line);
  }

}
http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSqlProjectFn.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSqlProjectFn.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSqlProjectFn.java new file mode 100644 index 0000000..b3f7ce5 --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSqlProjectFn.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.beam.sdk.extensions.sql.impl.transform;

import java.util.List;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.BeamSqlExpressionExecutor;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamProjectRel;
import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType;
import org.apache.beam.sdk.extensions.sql.schema.BeamTableUtils;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;

/**
 * Executes a {@link BeamProjectRel} step. It evaluates the projection expressions against each
 * input row and emits a new row of type {@code outputRowType} holding the results.
 */
public class BeamSqlProjectFn extends DoFn<BeamSqlRow, BeamSqlRow> {
  private String stepName;
  private BeamSqlExpressionExecutor executor;
  private BeamSqlRowType outputRowType;

  public BeamSqlProjectFn(String stepName, BeamSqlExpressionExecutor executor,
      BeamSqlRowType outputRowType) {
    super();
    this.stepName = stepName;
    this.executor = executor;
    this.outputRowType = outputRowType;
  }

  /** {@code @Setup} lifecycle hook: prepares the expression executor. */
  @Setup
  public void setup() {
    executor.prepare();
  }

  /**
   * Projects one row:
   * <ol>
   *   <li>updates the output row's window range from the input row and {@code window};</li>
   *   <li>stores each value returned by the executor at its position, via
   *       {@link BeamTableUtils#addFieldWithAutoTypeCasting}.</li>
   * </ol>
   */
  @ProcessElement
  public void processElement(ProcessContext c, BoundedWindow window) {
    BeamSqlRow source = c.element();
    List<Object> projectedValues = executor.execute(source);

    BeamSqlRow projected = new BeamSqlRow(outputRowType);
    projected.updateWindowRange(source, window);

    int position = 0;
    for (Object value : projectedValues) {
      BeamTableUtils.addFieldWithAutoTypeCasting(projected, position++, value);
    }

    c.output(projected);
  }

  /** {@code @Teardown} lifecycle hook: closes the expression executor. */
  @Teardown
  public void close() {
    executor.close();
  }

}
http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/package-info.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/package-info.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/package-info.java new file mode 100644 index 0000000..bc90e5b --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * {@link org.apache.beam.sdk.transforms.PTransform} used in a BeamSql pipeline. + */ +package org.apache.beam.sdk.extensions.sql.impl.transform;
