http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSortRel.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSortRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSortRel.java new file mode 100644 index 0000000..ba344df --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSortRel.java @@ -0,0 +1,247 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.beam.dsls.sql.rel; + +import java.io.Serializable; +import java.lang.reflect.Type; +import java.math.BigDecimal; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; +import org.apache.beam.dsls.sql.BeamSqlEnv; +import org.apache.beam.dsls.sql.schema.BeamSqlRow; +import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder; +import org.apache.beam.dsls.sql.utils.CalciteUtils; +import org.apache.beam.sdk.coders.ListCoder; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.Flatten; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.Top; +import org.apache.beam.sdk.transforms.windowing.GlobalWindow; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelCollation; +import org.apache.calcite.rel.RelCollationImpl; +import org.apache.calcite.rel.RelFieldCollation; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.Sort; +import org.apache.calcite.rex.RexInputRef; +import org.apache.calcite.rex.RexLiteral; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.sql.type.SqlTypeName; + +/** + * {@code BeamRelNode} to replace a {@code Sort} node. + * + * <p>Since Beam does not fully supported global sort we are using {@link Top} to implement + * the {@code Sort} algebra. 
The following types of ORDER BY are supported: + + * <pre>{@code + * select * from t order by id desc limit 10; + * select * from t order by id desc limit 10, 5; + * }</pre> + * + * <p>but Order BY without a limit is NOT supported: + * + * <pre>{@code + * select * from t order by id desc + * }</pre> + * + * <h3>Constraints</h3> + * <ul> + * <li>Due to the constraints of {@link Top}, the result of a `ORDER BY LIMIT` + * must fit into the memory of a single machine.</li> + * <li>Since `WINDOW`(HOP, TUMBLE, SESSION etc) is always associated with `GroupBy`, + * it does not make much sense to use `ORDER BY` with `WINDOW`. + * </li> + * </ul> + */ +public class BeamSortRel extends Sort implements BeamRelNode { + private List<Integer> fieldIndices = new ArrayList<>(); + private List<Boolean> orientation = new ArrayList<>(); + private List<Boolean> nullsFirst = new ArrayList<>(); + + private int startIndex = 0; + private int count; + + public BeamSortRel( + RelOptCluster cluster, + RelTraitSet traits, + RelNode child, + RelCollation collation, + RexNode offset, + RexNode fetch) { + super(cluster, traits, child, collation, offset, fetch); + + List<RexNode> fieldExps = getChildExps(); + RelCollationImpl collationImpl = (RelCollationImpl) collation; + List<RelFieldCollation> collations = collationImpl.getFieldCollations(); + for (int i = 0; i < fieldExps.size(); i++) { + RexNode fieldExp = fieldExps.get(i); + RexInputRef inputRef = (RexInputRef) fieldExp; + fieldIndices.add(inputRef.getIndex()); + orientation.add(collations.get(i).getDirection() == RelFieldCollation.Direction.ASCENDING); + + RelFieldCollation.NullDirection rawNullDirection = collations.get(i).nullDirection; + if (rawNullDirection == RelFieldCollation.NullDirection.UNSPECIFIED) { + rawNullDirection = collations.get(i).getDirection().defaultNullDirection(); + } + nullsFirst.add(rawNullDirection == RelFieldCollation.NullDirection.FIRST); + } + + if (fetch == null) { + throw new 
UnsupportedOperationException("ORDER BY without a LIMIT is not supported!"); + } + + RexLiteral fetchLiteral = (RexLiteral) fetch; + count = ((BigDecimal) fetchLiteral.getValue()).intValue(); + + if (offset != null) { + RexLiteral offsetLiteral = (RexLiteral) offset; + startIndex = ((BigDecimal) offsetLiteral.getValue()).intValue(); + } + } + + @Override public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections + , BeamSqlEnv sqlEnv) throws Exception { + RelNode input = getInput(); + PCollection<BeamSqlRow> upstream = BeamSqlRelUtils.getBeamRelInput(input) + .buildBeamPipeline(inputPCollections, sqlEnv); + Type windowType = upstream.getWindowingStrategy().getWindowFn() + .getWindowTypeDescriptor().getType(); + if (!windowType.equals(GlobalWindow.class)) { + throw new UnsupportedOperationException( + "`ORDER BY` is only supported for GlobalWindow, actual window: " + windowType); + } + + BeamSqlRowComparator comparator = new BeamSqlRowComparator(fieldIndices, orientation, + nullsFirst); + // first find the top (offset + count) + PCollection<List<BeamSqlRow>> rawStream = + upstream.apply("extractTopOffsetAndFetch", + Top.of(startIndex + count, comparator).withoutDefaults()) + .setCoder(ListCoder.<BeamSqlRow>of(upstream.getCoder())); + + // strip the `leading offset` + if (startIndex > 0) { + rawStream = rawStream.apply("stripLeadingOffset", ParDo.of( + new SubListFn<BeamSqlRow>(startIndex, startIndex + count))) + .setCoder(ListCoder.<BeamSqlRow>of(upstream.getCoder())); + } + + PCollection<BeamSqlRow> orderedStream = rawStream.apply( + "flatten", Flatten.<BeamSqlRow>iterables()); + orderedStream.setCoder(new BeamSqlRowCoder(CalciteUtils.toBeamRowType(getRowType()))); + + return orderedStream; + } + + private static class SubListFn<T> extends DoFn<List<T>, List<T>> { + private int startIndex; + private int endIndex; + + public SubListFn(int startIndex, int endIndex) { + this.startIndex = startIndex; + this.endIndex = endIndex; + } + + 
@ProcessElement + public void processElement(ProcessContext ctx) { + ctx.output(ctx.element().subList(startIndex, endIndex)); + } + } + + @Override public Sort copy(RelTraitSet traitSet, RelNode newInput, RelCollation newCollation, + RexNode offset, RexNode fetch) { + return new BeamSortRel(getCluster(), traitSet, newInput, newCollation, offset, fetch); + } + + private static class BeamSqlRowComparator implements Comparator<BeamSqlRow>, Serializable { + private List<Integer> fieldsIndices; + private List<Boolean> orientation; + private List<Boolean> nullsFirst; + + public BeamSqlRowComparator(List<Integer> fieldsIndices, + List<Boolean> orientation, + List<Boolean> nullsFirst) { + this.fieldsIndices = fieldsIndices; + this.orientation = orientation; + this.nullsFirst = nullsFirst; + } + + @Override public int compare(BeamSqlRow row1, BeamSqlRow row2) { + for (int i = 0; i < fieldsIndices.size(); i++) { + int fieldIndex = fieldsIndices.get(i); + int fieldRet = 0; + SqlTypeName fieldType = CalciteUtils.getFieldType(row1.getDataType(), fieldIndex); + // whether NULL should be ordered first or last(compared to non-null values) depends on + // what user specified in SQL(NULLS FIRST/NULLS LAST) + if (row1.isNull(fieldIndex) && row2.isNull(fieldIndex)) { + continue; + } else if (row1.isNull(fieldIndex) && !row2.isNull(fieldIndex)) { + fieldRet = -1 * (nullsFirst.get(i) ? -1 : 1); + } else if (!row1.isNull(fieldIndex) && row2.isNull(fieldIndex)) { + fieldRet = 1 * (nullsFirst.get(i) ? 
-1 : 1); + } else { + switch (fieldType) { + case TINYINT: + fieldRet = numberCompare(row1.getByte(fieldIndex), row2.getByte(fieldIndex)); + break; + case SMALLINT: + fieldRet = numberCompare(row1.getShort(fieldIndex), row2.getShort(fieldIndex)); + break; + case INTEGER: + fieldRet = numberCompare(row1.getInteger(fieldIndex), row2.getInteger(fieldIndex)); + break; + case BIGINT: + fieldRet = numberCompare(row1.getLong(fieldIndex), row2.getLong(fieldIndex)); + break; + case FLOAT: + fieldRet = numberCompare(row1.getFloat(fieldIndex), row2.getFloat(fieldIndex)); + break; + case DOUBLE: + fieldRet = numberCompare(row1.getDouble(fieldIndex), row2.getDouble(fieldIndex)); + break; + case VARCHAR: + fieldRet = row1.getString(fieldIndex).compareTo(row2.getString(fieldIndex)); + break; + case DATE: + fieldRet = row1.getDate(fieldIndex).compareTo(row2.getDate(fieldIndex)); + break; + default: + throw new UnsupportedOperationException( + "Data type: " + fieldType + " not supported yet!"); + } + } + + fieldRet *= (orientation.get(i) ? -1 : 1); + if (fieldRet != 0) { + return fieldRet; + } + } + return 0; + } + } + + public static <T extends Number & Comparable> int numberCompare(T a, T b) { + return a.compareTo(b); + } +}
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSqlRelUtils.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSqlRelUtils.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSqlRelUtils.java new file mode 100644 index 0000000..9f1f703 --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSqlRelUtils.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.dsls.sql.rel; + +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.calcite.plan.RelOptUtil; +import org.apache.calcite.plan.volcano.RelSubset; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.sql.SqlExplainLevel; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Utilities for {@code BeamRelNode}. 
+ */ +class BeamSqlRelUtils { + private static final Logger LOG = LoggerFactory.getLogger(BeamSqlRelUtils.class); + + private static final AtomicInteger sequence = new AtomicInteger(0); + private static final AtomicInteger classSequence = new AtomicInteger(0); + + public static String getStageName(BeamRelNode relNode) { + return relNode.getClass().getSimpleName().toUpperCase() + "_" + relNode.getId() + "_" + + sequence.getAndIncrement(); + } + + public static String getClassName(BeamRelNode relNode) { + return "Generated_" + relNode.getClass().getSimpleName().toUpperCase() + "_" + relNode.getId() + + "_" + classSequence.getAndIncrement(); + } + + public static BeamRelNode getBeamRelInput(RelNode input) { + if (input instanceof RelSubset) { + // go with known best input + input = ((RelSubset) input).getBest(); + } + return (BeamRelNode) input; + } + + public static String explain(final RelNode rel) { + return explain(rel, SqlExplainLevel.EXPPLAN_ATTRIBUTES); + } + + public static String explain(final RelNode rel, SqlExplainLevel detailLevel) { + String explain = ""; + try { + explain = RelOptUtil.toString(rel); + } catch (StackOverflowError e) { + LOG.error("StackOverflowError occurred while extracting plan. " + + "Please report it to the dev@ mailing list."); + LOG.error("RelNode " + rel + " ExplainLevel " + detailLevel, e); + LOG.error("Forcing plan to empty string and continue... 
" + + "SQL Runner may not working properly after."); + } + return explain; + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamUnionRel.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamUnionRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamUnionRel.java new file mode 100644 index 0000000..c661585 --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamUnionRel.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.beam.dsls.sql.rel; + +import java.util.List; +import org.apache.beam.dsls.sql.BeamSqlEnv; +import org.apache.beam.dsls.sql.schema.BeamSqlRow; +import org.apache.beam.sdk.transforms.windowing.WindowFn; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelInput; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.SetOp; +import org.apache.calcite.rel.core.Union; + +/** + * {@link BeamRelNode} to replace a {@link Union}. + * + * <p>{@code BeamUnionRel} needs the input of it have the same {@link WindowFn}. From the SQL + * perspective, two cases are supported: + * + * <p>1) Do not use {@code grouped window function}: + * + * <pre>{@code + * select * from person UNION select * from person + * }</pre> + * + * <p>2) Use the same {@code grouped window function}, with the same param: + * <pre>{@code + * select id, count(*) from person + * group by id, TUMBLE(order_time, INTERVAL '1' HOUR) + * UNION + * select * from person + * group by id, TUMBLE(order_time, INTERVAL '1' HOUR) + * }</pre> + * + * <p>Inputs with different group functions are NOT supported: + * <pre>{@code + * select id, count(*) from person + * group by id, TUMBLE(order_time, INTERVAL '1' HOUR) + * UNION + * select * from person + * group by id, TUMBLE(order_time, INTERVAL '2' HOUR) + * }</pre> + */ +public class BeamUnionRel extends Union implements BeamRelNode { + private BeamSetOperatorRelBase delegate; + public BeamUnionRel(RelOptCluster cluster, + RelTraitSet traits, + List<RelNode> inputs, + boolean all) { + super(cluster, traits, inputs, all); + this.delegate = new BeamSetOperatorRelBase(this, + BeamSetOperatorRelBase.OpType.UNION, + inputs, all); + } + + public BeamUnionRel(RelInput input) { + super(input); + } + + @Override public SetOp copy(RelTraitSet traitSet, List<RelNode> 
inputs, boolean all) { + return new BeamUnionRel(getCluster(), traitSet, inputs, all); + } + + @Override public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections + , BeamSqlEnv sqlEnv) throws Exception { + return delegate.buildBeamPipeline(inputPCollections, sqlEnv); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamValuesRel.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamValuesRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamValuesRel.java new file mode 100644 index 0000000..43b74c3 --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamValuesRel.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.beam.dsls.sql.rel; + +import com.google.common.collect.ImmutableList; +import java.util.ArrayList; +import java.util.List; +import org.apache.beam.dsls.sql.BeamSqlEnv; +import org.apache.beam.dsls.sql.schema.BeamSqlRow; +import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder; +import org.apache.beam.dsls.sql.schema.BeamSqlRowType; +import org.apache.beam.dsls.sql.schema.BeamTableUtils; +import org.apache.beam.dsls.sql.utils.CalciteUtils; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.core.Values; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rex.RexLiteral; + +/** + * {@code BeamRelNode} to replace a {@code Values} node. + * + * <p>{@code BeamValuesRel} will be used in the following SQLs: + * <ul> + * <li>{@code insert into t (name, desc) values ('hello', 'world')}</li> + * <li>{@code select 1, '1', LOCALTIME}</li> + * </ul> + */ +public class BeamValuesRel extends Values implements BeamRelNode { + + public BeamValuesRel( + RelOptCluster cluster, + RelDataType rowType, + ImmutableList<ImmutableList<RexLiteral>> tuples, + RelTraitSet traits) { + super(cluster, rowType, tuples, traits); + + } + + @Override public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections + , BeamSqlEnv sqlEnv) throws Exception { + List<BeamSqlRow> rows = new ArrayList<>(tuples.size()); + String stageName = BeamSqlRelUtils.getStageName(this); + if (tuples.isEmpty()) { + throw new IllegalStateException("Values with empty tuples!"); + } + + BeamSqlRowType beamSQLRowType = CalciteUtils.toBeamRowType(this.getRowType()); + for (ImmutableList<RexLiteral> tuple : tuples) { + BeamSqlRow row = new BeamSqlRow(beamSQLRowType); + for (int i = 0; i < tuple.size(); i++) { + 
BeamTableUtils.addFieldWithAutoTypeCasting(row, i, tuple.get(i).getValue()); + } + rows.add(row); + } + + return inputPCollections.getPipeline().apply(stageName, Create.of(rows)) + .setCoder(new BeamSqlRowCoder(beamSQLRowType)); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/package-info.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/package-info.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/package-info.java new file mode 100644 index 0000000..77d6204 --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/package-info.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * BeamSQL specified nodes, to replace {@link org.apache.calcite.rel.RelNode}. 
+ * + */ +package org.apache.beam.dsls.sql.rel; http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamAggregationRule.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamAggregationRule.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamAggregationRule.java new file mode 100644 index 0000000..6e843d4 --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamAggregationRule.java @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.dsls.sql.rule; + +import com.google.common.collect.ImmutableList; +import java.util.GregorianCalendar; +import java.util.List; +import org.apache.beam.dsls.sql.rel.BeamAggregationRel; +import org.apache.beam.dsls.sql.rel.BeamLogicalConvention; +import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime; +import org.apache.beam.sdk.transforms.windowing.AfterWatermark; +import org.apache.beam.sdk.transforms.windowing.FixedWindows; +import org.apache.beam.sdk.transforms.windowing.GlobalWindows; +import org.apache.beam.sdk.transforms.windowing.Repeatedly; +import org.apache.beam.sdk.transforms.windowing.Sessions; +import org.apache.beam.sdk.transforms.windowing.SlidingWindows; +import org.apache.beam.sdk.transforms.windowing.Trigger; +import org.apache.beam.sdk.transforms.windowing.WindowFn; +import org.apache.calcite.plan.RelOptRule; +import org.apache.calcite.plan.RelOptRuleCall; +import org.apache.calcite.plan.RelOptRuleOperand; +import org.apache.calcite.rel.core.Aggregate; +import org.apache.calcite.rel.core.Project; +import org.apache.calcite.rel.core.RelFactories; +import org.apache.calcite.rex.RexCall; +import org.apache.calcite.rex.RexLiteral; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.sql.SqlOperator; +import org.apache.calcite.tools.RelBuilderFactory; +import org.apache.calcite.util.ImmutableBitSet; +import org.joda.time.Duration; + +/** + * Rule to detect the window/trigger settings. + * + */ +public class BeamAggregationRule extends RelOptRule { + public static final BeamAggregationRule INSTANCE = + new BeamAggregationRule(Aggregate.class, Project.class, RelFactories.LOGICAL_BUILDER); + + public BeamAggregationRule( + Class<? extends Aggregate> aggregateClass, + Class<? 
extends Project> projectClass, + RelBuilderFactory relBuilderFactory) { + super( + operand(aggregateClass, + operand(projectClass, any())), + relBuilderFactory, null); + } + + public BeamAggregationRule(RelOptRuleOperand operand, String description) { + super(operand, description); + } + + @Override + public void onMatch(RelOptRuleCall call) { + final Aggregate aggregate = call.rel(0); + final Project project = call.rel(1); + updateWindowTrigger(call, aggregate, project); + } + + private void updateWindowTrigger(RelOptRuleCall call, Aggregate aggregate, + Project project) { + ImmutableBitSet groupByFields = aggregate.getGroupSet(); + List<RexNode> projectMapping = project.getProjects(); + + WindowFn windowFn = new GlobalWindows(); + Trigger triggerFn = Repeatedly.forever(AfterWatermark.pastEndOfWindow()); + int windowFieldIdx = -1; + Duration allowedLatence = Duration.ZERO; + + for (int groupField : groupByFields.asList()) { + RexNode projNode = projectMapping.get(groupField); + if (projNode instanceof RexCall) { + SqlOperator op = ((RexCall) projNode).op; + ImmutableList<RexNode> parameters = ((RexCall) projNode).operands; + String functionName = op.getName(); + switch (functionName) { + case "TUMBLE": + windowFieldIdx = groupField; + windowFn = FixedWindows + .of(Duration.millis(getWindowParameterAsMillis(parameters.get(1)))); + if (parameters.size() == 3) { + GregorianCalendar delayTime = (GregorianCalendar) ((RexLiteral) parameters.get(2)) + .getValue(); + triggerFn = createTriggerWithDelay(delayTime); + allowedLatence = (Duration.millis(delayTime.getTimeInMillis())); + } + break; + case "HOP": + windowFieldIdx = groupField; + windowFn = SlidingWindows + .of(Duration.millis(getWindowParameterAsMillis(parameters.get(1)))) + .every(Duration.millis(getWindowParameterAsMillis(parameters.get(2)))); + if (parameters.size() == 4) { + GregorianCalendar delayTime = (GregorianCalendar) ((RexLiteral) parameters.get(3)) + .getValue(); + triggerFn = 
createTriggerWithDelay(delayTime); + allowedLatence = (Duration.millis(delayTime.getTimeInMillis())); + } + break; + case "SESSION": + windowFieldIdx = groupField; + windowFn = Sessions + .withGapDuration(Duration.millis(getWindowParameterAsMillis(parameters.get(1)))); + if (parameters.size() == 3) { + GregorianCalendar delayTime = (GregorianCalendar) ((RexLiteral) parameters.get(2)) + .getValue(); + triggerFn = createTriggerWithDelay(delayTime); + allowedLatence = (Duration.millis(delayTime.getTimeInMillis())); + } + break; + default: + break; + } + } + } + + BeamAggregationRel newAggregator = new BeamAggregationRel(aggregate.getCluster(), + aggregate.getTraitSet().replace(BeamLogicalConvention.INSTANCE), + convert(aggregate.getInput(), + aggregate.getInput().getTraitSet().replace(BeamLogicalConvention.INSTANCE)), + aggregate.indicator, + aggregate.getGroupSet(), + aggregate.getGroupSets(), + aggregate.getAggCallList(), + windowFn, + triggerFn, + windowFieldIdx, + allowedLatence); + call.transformTo(newAggregator); + } + + private Trigger createTriggerWithDelay(GregorianCalendar delayTime) { + return Repeatedly.forever(AfterWatermark.pastEndOfWindow().withLateFirings(AfterProcessingTime + .pastFirstElementInPane().plusDelayOf(Duration.millis(delayTime.getTimeInMillis())))); + } + + private long getWindowParameterAsMillis(RexNode parameterNode) { + if (parameterNode instanceof RexLiteral) { + return RexLiteral.intValue(parameterNode); + } else { + throw new IllegalArgumentException(String.format("[%s] is not valid.", parameterNode)); + } + } + +} http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamFilterRule.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamFilterRule.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamFilterRule.java new file mode 
100644 index 0000000..414b666 --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamFilterRule.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.dsls.sql.rule; + +import org.apache.beam.dsls.sql.rel.BeamFilterRel; +import org.apache.beam.dsls.sql.rel.BeamLogicalConvention; +import org.apache.calcite.plan.Convention; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.convert.ConverterRule; +import org.apache.calcite.rel.core.Filter; +import org.apache.calcite.rel.logical.LogicalFilter; + +/** + * A {@code ConverterRule} to replace {@link Filter} with {@link BeamFilterRel}. 
+ * + */ +public class BeamFilterRule extends ConverterRule { + public static final BeamFilterRule INSTANCE = new BeamFilterRule(); + + private BeamFilterRule() { + super(LogicalFilter.class, Convention.NONE, BeamLogicalConvention.INSTANCE, "BeamFilterRule"); + } + + @Override + public RelNode convert(RelNode rel) { + final Filter filter = (Filter) rel; + final RelNode input = filter.getInput(); + + return new BeamFilterRel(filter.getCluster(), + filter.getTraitSet().replace(BeamLogicalConvention.INSTANCE), + convert(input, input.getTraitSet().replace(BeamLogicalConvention.INSTANCE)), + filter.getCondition()); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamIOSinkRule.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamIOSinkRule.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamIOSinkRule.java new file mode 100644 index 0000000..4cc4ef5 --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamIOSinkRule.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.rule;
+
+import java.util.List;
+
+import org.apache.beam.dsls.sql.rel.BeamIOSinkRel;
+import org.apache.beam.dsls.sql.rel.BeamLogicalConvention;
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.prepare.Prepare;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rel.core.TableModify;
+import org.apache.calcite.rel.logical.LogicalTableModify;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.schema.Table;
+
+/**
+ * A {@code ConverterRule} to replace {@link TableModify} with
+ * {@link BeamIOSinkRel}.
+ *
+ * <p>Only {@code INSERT} into a {@code TABLE} or {@code STREAM} is supported; any other target
+ * or operation is rejected before the input is converted.
+ */
+public class BeamIOSinkRule extends ConverterRule {
+  public static final BeamIOSinkRule INSTANCE = new BeamIOSinkRule();
+
+  private BeamIOSinkRule() {
+    super(LogicalTableModify.class, Convention.NONE, BeamLogicalConvention.INSTANCE,
+        "BeamIOSinkRule");
+  }
+
+  @Override
+  public RelNode convert(RelNode rel) {
+    final TableModify tableModify = (TableModify) rel;
+    final RelOptTable relOptTable = tableModify.getTable();
+    final TableModify.Operation operation = tableModify.getOperation();
+    // Validate first so no input conversion is requested for a plan we are going to reject.
+    validateSink(relOptTable, operation);
+
+    final RelNode input = tableModify.getInput();
+    final RelOptCluster cluster = tableModify.getCluster();
+    final RelTraitSet traitSet = tableModify.getTraitSet().replace(BeamLogicalConvention.INSTANCE);
+    final Prepare.CatalogReader catalogReader = tableModify.getCatalogReader();
+    final RelNode convertedInput = convert(input,
+        input.getTraitSet().replace(BeamLogicalConvention.INSTANCE));
+    final List<String> updateColumnList = tableModify.getUpdateColumnList();
+    final List<RexNode> sourceExpressionList = tableModify.getSourceExpressionList();
+    final boolean flattened = tableModify.isFlattened();
+
+    return new BeamIOSinkRel(cluster, traitSet,
+        relOptTable, catalogReader, convertedInput, operation, updateColumnList,
+        sourceExpressionList, flattened);
+  }
+
+  /**
+   * Checks that {@code relOptTable} is a {@code TABLE} or {@code STREAM} and that
+   * {@code operation} is {@code INSERT}.
+   *
+   * @throws IllegalArgumentException if the target does not unwrap to a {@link Table}, or its
+   *     table type is neither {@code TABLE} nor {@code STREAM}
+   * @throws UnsupportedOperationException if {@code operation} is not {@code INSERT}
+   */
+  private static void validateSink(RelOptTable relOptTable, TableModify.Operation operation) {
+    // unwrap() returns null when the target is not backed by a Calcite Table; report that
+    // explicitly instead of failing with a NullPointerException on getJdbcTableType().
+    final Table table = relOptTable.unwrap(Table.class);
+    if (table == null) {
+      throw new IllegalArgumentException(
+          String.format("Unsupported sink table: %s", relOptTable.getQualifiedName()));
+    }
+
+    switch (table.getJdbcTableType()) {
+      case TABLE:
+      case STREAM:
+        if (operation != TableModify.Operation.INSERT) {
+          // Name the actual table type: this branch serves TABLE as well as STREAM.
+          throw new UnsupportedOperationException(
+              String.format("%s doesn't support %s modify operation",
+                  table.getJdbcTableType(), operation));
+        }
+        return;
+      default:
+        throw new IllegalArgumentException(
+            String.format("Unsupported table type: %s", table.getJdbcTableType()));
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamIOSourceRule.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamIOSourceRule.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamIOSourceRule.java
new file mode 100644
index 0000000..85a69ff
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamIOSourceRule.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.rule;
+
+import org.apache.beam.dsls.sql.rel.BeamIOSourceRel;
+import org.apache.beam.dsls.sql.rel.BeamLogicalConvention;
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rel.logical.LogicalTableScan;
+
+/**
+ * A {@code ConverterRule} that turns a logical {@link TableScan} into a
+ * {@link BeamIOSourceRel} in {@link BeamLogicalConvention}, reading from the same table.
+ */
+public class BeamIOSourceRule extends ConverterRule {
+  /** Singleton instance of this rule. */
+  public static final BeamIOSourceRule INSTANCE = new BeamIOSourceRule();
+
+  private BeamIOSourceRule() {
+    super(LogicalTableScan.class, Convention.NONE, BeamLogicalConvention.INSTANCE,
+        "BeamIOSourceRule");
+  }
+
+  @Override
+  public RelNode convert(RelNode rel) {
+    TableScan logicalScan = (TableScan) rel;
+    return new BeamIOSourceRel(
+        logicalScan.getCluster(),
+        logicalScan.getTraitSet().replace(BeamLogicalConvention.INSTANCE),
+        logicalScan.getTable());
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamIntersectRule.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamIntersectRule.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamIntersectRule.java
new file mode 100644
index 0000000..70716c5
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamIntersectRule.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.dsls.sql.rule; + +import java.util.List; + +import org.apache.beam.dsls.sql.rel.BeamIntersectRel; +import org.apache.beam.dsls.sql.rel.BeamLogicalConvention; +import org.apache.calcite.plan.Convention; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.convert.ConverterRule; +import org.apache.calcite.rel.core.Intersect; +import org.apache.calcite.rel.logical.LogicalIntersect; + +/** + * {@code ConverterRule} to replace {@code Intersect} with {@code BeamIntersectRel}. 
+ *
+ * <p>All inputs are requested in {@link BeamLogicalConvention}; the {@code all} flag is
+ * carried over unchanged.
+ */
+public class BeamIntersectRule extends ConverterRule {
+  /** Singleton instance of this rule. */
+  public static final BeamIntersectRule INSTANCE = new BeamIntersectRule();
+
+  private BeamIntersectRule() {
+    super(LogicalIntersect.class, Convention.NONE,
+        BeamLogicalConvention.INSTANCE, "BeamIntersectRule");
+  }
+
+  @Override
+  public RelNode convert(RelNode rel) {
+    Intersect logicalIntersect = (Intersect) rel;
+    List<RelNode> beamInputs =
+        convertList(logicalIntersect.getInputs(), BeamLogicalConvention.INSTANCE);
+    boolean keepDuplicates = logicalIntersect.all;
+    return new BeamIntersectRel(
+        logicalIntersect.getCluster(),
+        logicalIntersect.getTraitSet().replace(BeamLogicalConvention.INSTANCE),
+        beamInputs,
+        keepDuplicates);
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamJoinRule.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamJoinRule.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamJoinRule.java
new file mode 100644
index 0000000..78253fe
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamJoinRule.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.rule;
+
+import org.apache.beam.dsls.sql.rel.BeamJoinRel;
+import org.apache.beam.dsls.sql.rel.BeamLogicalConvention;
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rel.core.Join;
+import org.apache.calcite.rel.logical.LogicalJoin;
+
+/**
+ * {@code ConverterRule} to replace {@code Join} with {@code BeamJoinRel}.
+ *
+ * <p>Both sides are requested in {@link BeamLogicalConvention}, left first; condition,
+ * variables set and join type are passed through unchanged.
+ */
+public class BeamJoinRule extends ConverterRule {
+  /** Singleton instance of this rule. */
+  public static final BeamJoinRule INSTANCE = new BeamJoinRule();
+
+  private BeamJoinRule() {
+    super(LogicalJoin.class, Convention.NONE,
+        BeamLogicalConvention.INSTANCE, "BeamJoinRule");
+  }
+
+  @Override
+  public RelNode convert(RelNode rel) {
+    Join logicalJoin = (Join) rel;
+    RelNode leftSide = logicalJoin.getLeft();
+    RelNode rightSide = logicalJoin.getRight();
+    return new BeamJoinRel(
+        logicalJoin.getCluster(),
+        logicalJoin.getTraitSet().replace(BeamLogicalConvention.INSTANCE),
+        toBeamConvention(leftSide),
+        toBeamConvention(rightSide),
+        logicalJoin.getCondition(),
+        logicalJoin.getVariablesSet(),
+        logicalJoin.getJoinType());
+  }
+
+  /** Requests {@code node} in {@link BeamLogicalConvention} via {@code RelOptRule#convert}. */
+  private static RelNode toBeamConvention(RelNode node) {
+    return convert(node, node.getTraitSet().replace(BeamLogicalConvention.INSTANCE));
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamMinusRule.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamMinusRule.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamMinusRule.java
new file mode 100644
index 0000000..ca93c71
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamMinusRule.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.dsls.sql.rule; + +import java.util.List; + +import org.apache.beam.dsls.sql.rel.BeamLogicalConvention; +import org.apache.beam.dsls.sql.rel.BeamMinusRel; +import org.apache.calcite.plan.Convention; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.convert.ConverterRule; +import org.apache.calcite.rel.core.Minus; +import org.apache.calcite.rel.logical.LogicalMinus; + +/** + * {@code ConverterRule} to replace {@code Minus} with {@code BeamMinusRel}. 
+ *
+ * <p>Every input is requested in {@link BeamLogicalConvention}; the {@code all} flag is
+ * carried over unchanged.
+ */
+public class BeamMinusRule extends ConverterRule {
+  /** Singleton instance of this rule. */
+  public static final BeamMinusRule INSTANCE = new BeamMinusRule();
+
+  private BeamMinusRule() {
+    super(LogicalMinus.class, Convention.NONE,
+        BeamLogicalConvention.INSTANCE, "BeamMinusRule");
+  }
+
+  @Override
+  public RelNode convert(RelNode rel) {
+    Minus logicalMinus = (Minus) rel;
+    List<RelNode> logicalInputs = logicalMinus.getInputs();
+    List<RelNode> beamInputs = convertList(logicalInputs, BeamLogicalConvention.INSTANCE);
+    return new BeamMinusRel(
+        logicalMinus.getCluster(),
+        logicalMinus.getTraitSet().replace(BeamLogicalConvention.INSTANCE),
+        beamInputs,
+        logicalMinus.all);
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamProjectRule.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamProjectRule.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamProjectRule.java
new file mode 100644
index 0000000..6dc3b57
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamProjectRule.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.rule;
+
+import org.apache.beam.dsls.sql.rel.BeamLogicalConvention;
+import org.apache.beam.dsls.sql.rel.BeamProjectRel;
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.logical.LogicalProject;
+
+/**
+ * A {@code ConverterRule} that rewrites a {@link LogicalProject} into a
+ * {@link BeamProjectRel} in {@link BeamLogicalConvention}.
+ *
+ * <p>The project's input is requested in the Beam convention; the projection expressions and
+ * row type are passed through unchanged.
+ */
+public class BeamProjectRule extends ConverterRule {
+  /** Singleton instance of this rule. */
+  public static final BeamProjectRule INSTANCE = new BeamProjectRule();
+
+  private BeamProjectRule() {
+    super(LogicalProject.class, Convention.NONE, BeamLogicalConvention.INSTANCE, "BeamProjectRule");
+  }
+
+  @Override
+  public RelNode convert(RelNode rel) {
+    Project logicalProject = (Project) rel;
+    RelNode beamInput = toBeamConvention(logicalProject.getInput());
+    return new BeamProjectRel(
+        logicalProject.getCluster(),
+        logicalProject.getTraitSet().replace(BeamLogicalConvention.INSTANCE),
+        beamInput,
+        logicalProject.getProjects(),
+        logicalProject.getRowType());
+  }
+
+  /** Requests {@code node} in {@link BeamLogicalConvention} via {@code RelOptRule#convert}. */
+  private static RelNode toBeamConvention(RelNode node) {
+    return convert(node, node.getTraitSet().replace(BeamLogicalConvention.INSTANCE));
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamSortRule.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamSortRule.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamSortRule.java
new file mode 100644
index 0000000..d802e9d
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamSortRule.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.dsls.sql.rule; + +import org.apache.beam.dsls.sql.rel.BeamLogicalConvention; + +import org.apache.beam.dsls.sql.rel.BeamSortRel; +import org.apache.calcite.plan.Convention; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.convert.ConverterRule; +import org.apache.calcite.rel.core.Sort; +import org.apache.calcite.rel.logical.LogicalSort; + +/** + * {@code ConverterRule} to replace {@code Sort} with {@code BeamSortRel}. 
+ *
+ * <p>The sort's input is requested in {@link BeamLogicalConvention}; collation, offset and
+ * fetch are passed through unchanged.
+ */
+public class BeamSortRule extends ConverterRule {
+  /** Singleton instance of this rule. */
+  public static final BeamSortRule INSTANCE = new BeamSortRule();
+
+  private BeamSortRule() {
+    super(LogicalSort.class, Convention.NONE,
+        BeamLogicalConvention.INSTANCE, "BeamSortRule");
+  }
+
+  @Override
+  public RelNode convert(RelNode rel) {
+    Sort logicalSort = (Sort) rel;
+    RelNode beamInput = toBeamConvention(logicalSort.getInput());
+    return new BeamSortRel(
+        logicalSort.getCluster(),
+        logicalSort.getTraitSet().replace(BeamLogicalConvention.INSTANCE),
+        beamInput,
+        logicalSort.getCollation(),
+        logicalSort.offset,
+        logicalSort.fetch);
+  }
+
+  /** Requests {@code node} in {@link BeamLogicalConvention} via {@code RelOptRule#convert}. */
+  private static RelNode toBeamConvention(RelNode node) {
+    return convert(node, node.getTraitSet().replace(BeamLogicalConvention.INSTANCE));
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamUnionRule.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamUnionRule.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamUnionRule.java
new file mode 100644
index 0000000..b8430b9
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamUnionRule.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.rule;
+
+import org.apache.beam.dsls.sql.rel.BeamLogicalConvention;
+import org.apache.beam.dsls.sql.rel.BeamUnionRel;
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rel.core.Union;
+import org.apache.calcite.rel.logical.LogicalUnion;
+
+/**
+ * A {@code ConverterRule} to replace {@link Union} with {@link BeamUnionRel}.
+ *
+ * <p>All inputs are requested in {@link BeamLogicalConvention}; the {@code all} flag is
+ * carried over unchanged.
+ */
+public class BeamUnionRule extends ConverterRule {
+  /** Singleton instance of this rule. */
+  public static final BeamUnionRule INSTANCE = new BeamUnionRule();
+
+  private BeamUnionRule() {
+    super(LogicalUnion.class, Convention.NONE, BeamLogicalConvention.INSTANCE,
+        "BeamUnionRule");
+  }
+
+  @Override
+  public RelNode convert(RelNode rel) {
+    final Union union = (Union) rel;
+
+    return new BeamUnionRel(
+        union.getCluster(),
+        union.getTraitSet().replace(BeamLogicalConvention.INSTANCE),
+        convertList(union.getInputs(), BeamLogicalConvention.INSTANCE),
+        union.all);
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamValuesRule.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamValuesRule.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamValuesRule.java
new file mode 100644
index 0000000..4ea9e60
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamValuesRule.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.dsls.sql.rule; + +import org.apache.beam.dsls.sql.rel.BeamLogicalConvention; +import org.apache.beam.dsls.sql.rel.BeamValuesRel; +import org.apache.calcite.plan.Convention; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.convert.ConverterRule; +import org.apache.calcite.rel.core.Values; +import org.apache.calcite.rel.logical.LogicalValues; + +/** + * {@code ConverterRule} to replace {@code Values} with {@code BeamValuesRel}. 
+ *
+ * <p>A values node has no input to convert; its row type and tuples are passed straight to
+ * the {@code BeamValuesRel}, which is placed in {@link BeamLogicalConvention}.
+ */
+public class BeamValuesRule extends ConverterRule {
+  /** Singleton instance of this rule. */
+  public static final BeamValuesRule INSTANCE = new BeamValuesRule();
+
+  private BeamValuesRule() {
+    super(LogicalValues.class, Convention.NONE,
+        BeamLogicalConvention.INSTANCE, "BeamValuesRule");
+  }
+
+  @Override
+  public RelNode convert(RelNode rel) {
+    Values logicalValues = (Values) rel;
+    return new BeamValuesRel(
+        logicalValues.getCluster(),
+        logicalValues.getRowType(),
+        logicalValues.getTuples(),
+        logicalValues.getTraitSet().replace(BeamLogicalConvention.INSTANCE));
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/package-info.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/package-info.java
new file mode 100644
index 0000000..5d32647
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +/** + * {@link org.apache.calcite.plan.RelOptRule} to generate + * {@link org.apache.beam.dsls.sql.rel.BeamRelNode}. + */ +package org.apache.beam.dsls.sql.rule; http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/BaseBeamTable.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/BaseBeamTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/BaseBeamTable.java new file mode 100644 index 0000000..dfa2785 --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/BaseBeamTable.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.dsls.sql.schema; + +import java.io.Serializable; + +/** + * Each IO in Beam has one table schema, by extending {@link BaseBeamTable}. 
+ *
+ * <p>Holds the {@link BeamSqlRowType} given at construction and returns it from
+ * {@link #getRowType()}.
+ */
+public abstract class BaseBeamTable implements BeamSqlTable, Serializable {
+  protected BeamSqlRowType beamSqlRowType;
+
+  public BaseBeamTable(BeamSqlRowType beamSqlRowType) {
+    this.beamSqlRowType = beamSqlRowType;
+  }
+
+  @Override
+  public BeamSqlRowType getRowType() {
+    return beamSqlRowType;
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamIOType.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamIOType.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamIOType.java
new file mode 100644
index 0000000..502e8c1
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamIOType.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.schema;
+
+/**
+ * Boundedness of a table's source IO: {@link #BOUNDED} for a batch source,
+ * {@link #UNBOUNDED} for a streaming source.
+ *
+ * <p>Every enum is already {@link java.io.Serializable}, so no interface is declared here.
+ */
+public enum BeamIOType {
+  BOUNDED, UNBOUNDED
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamPCollectionTable.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamPCollectionTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamPCollectionTable.java
new file mode 100644
index 0000000..5b63780
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamPCollectionTable.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.schema;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollection.IsBounded;
+import org.apache.beam.sdk.values.PDone;
+
+/**
+ * {@code BeamPCollectionTable} exposes a {@code PCollection<BeamSqlRow>} as a virtual table,
+ * so that a downstream query can read from it directly.
+ */
+public class BeamPCollectionTable extends BaseBeamTable {
+  /** Boundedness derived from the upstream collection; left unset by the protected ctor. */
+  private BeamIOType ioType;
+  /** Rows backing this table; {@code transient}, so it is not serialized with the table. */
+  private transient PCollection<BeamSqlRow> upstream;
+
+  protected BeamPCollectionTable(BeamSqlRowType beamSqlRowType) {
+    super(beamSqlRowType);
+  }
+
+  public BeamPCollectionTable(PCollection<BeamSqlRow> upstream,
+      BeamSqlRowType beamSqlRowType) {
+    this(beamSqlRowType);
+    this.ioType = toIoType(upstream.isBounded());
+    this.upstream = upstream;
+  }
+
+  /** Maps Beam's {@link IsBounded} onto the SQL-level {@link BeamIOType}. */
+  private static BeamIOType toIoType(IsBounded boundedness) {
+    if (boundedness.equals(IsBounded.BOUNDED)) {
+      return BeamIOType.BOUNDED;
+    }
+    return BeamIOType.UNBOUNDED;
+  }
+
+  @Override
+  public BeamIOType getSourceType() {
+    return ioType;
+  }
+
+  /** Returns the wrapped collection; {@code pipeline} is not used. */
+  @Override
+  public PCollection<BeamSqlRow> buildIOReader(Pipeline pipeline) {
+    return upstream;
+  }
+
+  /** Always throws: a table backed by a {@code PCollection} cannot be written to. */
+  @Override
+  public PTransform<? super PCollection<BeamSqlRow>, PDone> buildIOWriter() {
+    throw new IllegalArgumentException("cannot use [BeamPCollectionTable] as target");
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRow.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRow.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRow.java
new file mode 100644
index 0000000..d789446
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRow.java
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.dsls.sql.schema; + +import java.io.Serializable; +import java.math.BigDecimal; +import java.sql.Types; +import java.util.ArrayList; +import java.util.Date; +import java.util.GregorianCalendar; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import org.apache.beam.dsls.sql.utils.CalciteUtils; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.IntervalWindow; +import org.apache.calcite.sql.type.SqlTypeName; +import org.joda.time.Instant; + +/** + * Represent a generic ROW record in Beam SQL. 
+ *
+ */
+public class BeamSqlRow implements Serializable {
+  /**
+   * Maps a {@code Types} type code to the exact Java class a field of that type must hold;
+   * {@link #addField(int, Object)} rejects non-null values of any other class.
+   */
+  private static final Map<Integer, Class<?>> SQL_TYPE_TO_JAVA_CLASS = new HashMap<>();
+  static {
+    SQL_TYPE_TO_JAVA_CLASS.put(Types.TINYINT, Byte.class);
+    SQL_TYPE_TO_JAVA_CLASS.put(Types.SMALLINT, Short.class);
+    SQL_TYPE_TO_JAVA_CLASS.put(Types.INTEGER, Integer.class);
+    SQL_TYPE_TO_JAVA_CLASS.put(Types.BIGINT, Long.class);
+    SQL_TYPE_TO_JAVA_CLASS.put(Types.FLOAT, Float.class);
+    SQL_TYPE_TO_JAVA_CLASS.put(Types.DOUBLE, Double.class);
+    SQL_TYPE_TO_JAVA_CLASS.put(Types.DECIMAL, BigDecimal.class);
+
+    SQL_TYPE_TO_JAVA_CLASS.put(Types.BOOLEAN, Boolean.class);
+
+    SQL_TYPE_TO_JAVA_CLASS.put(Types.CHAR, String.class);
+    SQL_TYPE_TO_JAVA_CLASS.put(Types.VARCHAR, String.class);
+
+    SQL_TYPE_TO_JAVA_CLASS.put(Types.TIME, GregorianCalendar.class);
+
+    SQL_TYPE_TO_JAVA_CLASS.put(Types.DATE, Date.class);
+    SQL_TYPE_TO_JAVA_CLASS.put(Types.TIMESTAMP, Date.class);
+  }
+
+  /** Indices of the fields whose value is currently NULL. */
+  private List<Integer> nullFields = new ArrayList<>();
+  /** Field values by position; a NULL field holds {@code null} here. */
+  private List<Object> dataValues;
+  private BeamSqlRowType dataType;
+
+  private Instant windowStart = new Instant(TimeUnit.MICROSECONDS.toMillis(Long.MIN_VALUE));
+  private Instant windowEnd = new Instant(TimeUnit.MICROSECONDS.toMillis(Long.MAX_VALUE));
+
+  /**
+   * Creates a row of the given type with every field set to NULL.
+   */
+  public BeamSqlRow(BeamSqlRowType dataType) {
+    this.dataType = dataType;
+    this.dataValues = new ArrayList<>();
+    for (int idx = 0; idx < dataType.size(); ++idx) {
+      dataValues.add(null);
+      nullFields.add(idx);
+    }
+  }
+
+  /**
+   * Creates a row of the given type and fills it positionally from {@code dataValues};
+   * {@code null} entries leave the corresponding field NULL.
+   */
+  public BeamSqlRow(BeamSqlRowType dataType, List<Object> dataValues) {
+    this(dataType);
+    for (int idx = 0; idx < dataValues.size(); ++idx) {
+      addField(idx, dataValues.get(idx));
+    }
+  }
+
+  /**
+   * Copies the window range from {@code upstreamRecord}, then overrides it with the bounds of
+   * {@code window} when that window is an {@link IntervalWindow}.
+   */
+  public void updateWindowRange(BeamSqlRow upstreamRecord, BoundedWindow window){
+    windowStart = upstreamRecord.windowStart;
+    windowEnd = upstreamRecord.windowEnd;
+
+    if (window instanceof IntervalWindow) {
+      IntervalWindow iWindow = (IntervalWindow) window;
+      windowStart = iWindow.start();
+      windowEnd = iWindow.end();
+    }
+  }
+
+  /**
+   * Sets the field named {@code fieldName}; see {@link #addField(int, Object)}.
+   *
+   * @throws IllegalArgumentException if the row type has no field with that name
+   */
+  public void addField(String fieldName, Object fieldValue) {
+    addField(indexOfField(fieldName), fieldValue);
+  }
+
+  /**
+   * Sets the field at {@code index}. A {@code null} value marks the field NULL (clearing any
+   * previously stored value); a non-null value must be exactly the Java class mapped from the
+   * field's SQL type.
+   *
+   * @throws IllegalArgumentException if a non-null value has the wrong class
+   * @throws UnsupportedOperationException if the field's SQL type has no Java mapping
+   */
+  public void addField(int index, Object fieldValue) {
+    if (fieldValue == null) {
+      markNull(index);
+      return;
+    }
+
+    // Validate before touching any state so a rejected value leaves the row unchanged.
+    validateValueType(index, fieldValue);
+    dataValues.set(index, fieldValue);
+    // Remove by value (Integer), not by position.
+    nullFields.remove(Integer.valueOf(index));
+  }
+
+  /**
+   * Marks the field at {@code index} as NULL, inserting it into {@code nullFields} in ascending
+   * position so that rows built in different orders still compare equal.
+   */
+  private void markNull(int index) {
+    dataValues.set(index, null);
+    if (nullFields.contains(index)) {
+      return;
+    }
+    int pos = 0;
+    while (pos < nullFields.size() && nullFields.get(pos) < index) {
+      pos++;
+    }
+    nullFields.add(pos, index);
+  }
+
+  /**
+   * Resolves a field name to its position in the row type.
+   *
+   * @throws IllegalArgumentException if the row type has no field with that name
+   */
+  private int indexOfField(String fieldName) {
+    int idx = dataType.getFieldsName().indexOf(fieldName);
+    if (idx < 0) {
+      throw new IllegalArgumentException(
+          String.format("Field [%s] not found in %s", fieldName, dataType.getFieldsName()));
+    }
+    return idx;
+  }
+
+  /**
+   * Checks that {@code fieldValue}'s class is exactly the class mapped from the SQL type of the
+   * field at {@code index}.
+   */
+  private void validateValueType(int index, Object fieldValue) {
+    SqlTypeName fieldType = CalciteUtils.getFieldType(dataType, index);
+    Class<?> javaClazz = SQL_TYPE_TO_JAVA_CLASS.get(CalciteUtils.toJavaType(fieldType));
+    if (javaClazz == null) {
+      throw new UnsupportedOperationException("Data type: " + fieldType + " not supported yet!");
+    }
+
+    if (!fieldValue.getClass().equals(javaClazz)) {
+      throw new IllegalArgumentException(
+          String.format("[%s](%s) doesn't match type [%s]",
+              fieldValue, fieldValue.getClass(), fieldType)
+      );
+    }
+  }
+
+  /**
+   * Returns the value of the named field, or {@code null} if it is NULL.
+   *
+   * @throws IllegalArgumentException if the row type has no field with that name
+   */
+  public Object getFieldValue(String fieldName) {
+    return getFieldValue(indexOfField(fieldName));
+  }
+
+  // Typed accessors by name. Primitive-returning ones throw NullPointerException on NULL fields.
+
+  public byte getByte(String fieldName) {
+    return (Byte) getFieldValue(fieldName);
+  }
+
+  public short getShort(String fieldName) {
+    return (Short) getFieldValue(fieldName);
+  }
+
+  public int getInteger(String fieldName) {
+    return (Integer) getFieldValue(fieldName);
+  }
+
+  public float getFloat(String fieldName) {
+    return (Float) getFieldValue(fieldName);
+  }
+
+  public double getDouble(String fieldName) {
+    return (Double) getFieldValue(fieldName);
+  }
+
+  public long getLong(String fieldName) {
+    return (Long) getFieldValue(fieldName);
+  }
+
+  public String getString(String fieldName) {
+    return (String) getFieldValue(fieldName);
+  }
+
+  public Date getDate(String fieldName) {
+    return (Date) getFieldValue(fieldName);
+  }
+
+  public GregorianCalendar getGregorianCalendar(String fieldName) {
+    return (GregorianCalendar) getFieldValue(fieldName);
+  }
+
+  public BigDecimal getBigDecimal(String fieldName) {
+    return (BigDecimal) getFieldValue(fieldName);
+  }
+
+  public boolean getBoolean(String fieldName) {
+    return (Boolean) getFieldValue(fieldName);
+  }
+
+  /**
+   * Returns the value of the field at {@code fieldIdx}, or {@code null} if it is NULL.
+   */
+  public Object getFieldValue(int fieldIdx) {
+    if (nullFields.contains(fieldIdx)) {
+      return null;
+    }
+
+    return dataValues.get(fieldIdx);
+  }
+
+  // Typed accessors by position. Primitive-returning ones throw NullPointerException on NULL.
+
+  public byte getByte(int idx) {
+    return (Byte) getFieldValue(idx);
+  }
+
+  public short getShort(int idx) {
+    return (Short) getFieldValue(idx);
+  }
+
+  public int getInteger(int idx) {
+    return (Integer) getFieldValue(idx);
+  }
+
+  public float getFloat(int idx) {
+    return (Float) getFieldValue(idx);
+  }
+
+  public double getDouble(int idx) {
+    return (Double) getFieldValue(idx);
+  }
+
+  public long getLong(int idx) {
+    return (Long) getFieldValue(idx);
+  }
+
+  public String getString(int idx) {
+    return (String) getFieldValue(idx);
+  }
+
+  public Date getDate(int idx) {
+    return (Date) getFieldValue(idx);
+  }
+
+  public GregorianCalendar getGregorianCalendar(int idx) {
+    return (GregorianCalendar) getFieldValue(idx);
+  }
+
+  public BigDecimal getBigDecimal(int idx) {
+    return (BigDecimal) getFieldValue(idx);
+  }
+
+  public boolean getBoolean(int idx) {
+    return (Boolean) getFieldValue(idx);
+  }
+
+  /** Number of fields in this row. */
+  public int size() {
+    return dataValues.size();
+  }
+
+  public List<Object> getDataValues() {
+    return dataValues;
+  }
+
+  public void setDataValues(List<Object> dataValues) {
+    this.dataValues = dataValues;
+  }
+
+  public BeamSqlRowType getDataType() {
+    return dataType;
+  }
+
+  public void setDataType(BeamSqlRowType dataType) {
+    this.dataType = dataType;
+  }
+
+  public void setNullFields(List<Integer> nullFields) {
+    this.nullFields = nullFields;
+  }
+
+  public List<Integer> getNullFields() {
+    return nullFields;
+  }
+
+  /**
+   * is the specified field NULL?
+   */
+  public boolean isNull(int idx) {
+    return nullFields.contains(idx);
+  }
+
+  public Instant getWindowStart() {
+    return windowStart;
+  }
+
+  public Instant getWindowEnd() {
+    return windowEnd;
+  }
+
+  public void setWindowStart(Instant windowStart) {
+    this.windowStart = windowStart;
+  }
+
+  public void setWindowEnd(Instant windowEnd) {
+    this.windowEnd = windowEnd;
+  }
+
+  @Override
+  public String toString() {
+    return "BeamSqlRow [nullFields=" + nullFields + ", dataValues=" + dataValues + ", dataType="
+        + dataType + ", windowStart=" + windowStart + ", windowEnd=" + windowEnd + "]";
+  }
+
+  /**
+   * Return data fields as key=value.
+   */
+  public String valueInString() {
+    StringBuilder sb = new StringBuilder();
+    for (int idx = 0; idx < size(); ++idx) {
+      sb.append(String.format(",%s=%s", dataType.getFieldsName().get(idx), getFieldValue(idx)));
+    }
+    // Drop the leading comma; a row with no fields has no comma to drop.
+    return sb.length() == 0 ? "" : sb.substring(1);
+  }
+
+  /**
+   * Two rows are equal when their {@link #toString()} forms match, which covers the null-field
+   * list, the values, the row type and the window bounds.
+   */
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj == null) {
+      return false;
+    }
+    if (getClass() != obj.getClass()) {
+      return false;
+    }
+    BeamSqlRow other = (BeamSqlRow) obj;
+    return toString().equals(other.toString());
+  }
+
+  @Override public int hashCode() {
+    return 31 * (31 * dataType.hashCode() + dataValues.hashCode()) + nullFields.hashCode();
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java
new file mode 100644
index 0000000..f14864a
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor
license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.dsls.sql.schema; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.Date; +import java.util.GregorianCalendar; +import java.util.List; +import org.apache.beam.dsls.sql.utils.CalciteUtils; +import org.apache.beam.sdk.coders.BigDecimalCoder; +import org.apache.beam.sdk.coders.BigEndianIntegerCoder; +import org.apache.beam.sdk.coders.BigEndianLongCoder; +import org.apache.beam.sdk.coders.ByteCoder; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderException; +import org.apache.beam.sdk.coders.CustomCoder; +import org.apache.beam.sdk.coders.DoubleCoder; +import org.apache.beam.sdk.coders.InstantCoder; +import org.apache.beam.sdk.coders.ListCoder; +import org.apache.beam.sdk.coders.StringUtf8Coder; + +/** + * A {@link Coder} encodes {@link BeamSqlRow}. 
+ */
+public class BeamSqlRowCoder extends CustomCoder<BeamSqlRow> {
+  /** Row type used to interpret the field sequence when decoding. */
+  private final BeamSqlRowType tableSchema;
+
+  private static final ListCoder<Integer> listCoder = ListCoder.of(BigEndianIntegerCoder.of());
+
+  private static final StringUtf8Coder stringCoder = StringUtf8Coder.of();
+  private static final BigEndianIntegerCoder intCoder = BigEndianIntegerCoder.of();
+  private static final BigEndianLongCoder longCoder = BigEndianLongCoder.of();
+  private static final DoubleCoder doubleCoder = DoubleCoder.of();
+  private static final InstantCoder instantCoder = InstantCoder.of();
+  private static final BigDecimalCoder bigDecimalCoder = BigDecimalCoder.of();
+  private static final ByteCoder byteCoder = ByteCoder.of();
+
+  public BeamSqlRowCoder(BeamSqlRowType tableSchema) {
+    this.tableSchema = tableSchema;
+  }
+
+  /**
+   * Writes the list of NULL field indices, then every non-NULL field in position order using
+   * the coder for its SQL type, then the window start and end instants.
+   *
+   * <p>SMALLINT is widened to an int, FLOAT to a double, TIME/DATE/TIMESTAMP become epoch
+   * millis, and BOOLEAN becomes a single byte (1 for true, 0 for false).
+   */
+  @Override
+  public void encode(BeamSqlRow value, OutputStream outStream) throws CoderException, IOException {
+    List<Integer> nullFields = value.getNullFields();
+    listCoder.encode(nullFields, outStream);
+    for (int idx = 0; idx < value.size(); ++idx) {
+      if (nullFields.contains(idx)) {
+        continue;
+      }
+
+      switch (CalciteUtils.getFieldType(value.getDataType(), idx)) {
+        case INTEGER:
+          intCoder.encode(value.getInteger(idx), outStream);
+          break;
+        case SMALLINT:
+          intCoder.encode((int) value.getShort(idx), outStream);
+          break;
+        case TINYINT:
+          byteCoder.encode(value.getByte(idx), outStream);
+          break;
+        case DOUBLE:
+          doubleCoder.encode(value.getDouble(idx), outStream);
+          break;
+        case FLOAT:
+          doubleCoder.encode((double) value.getFloat(idx), outStream);
+          break;
+        case DECIMAL:
+          bigDecimalCoder.encode(value.getBigDecimal(idx), outStream);
+          break;
+        case BIGINT:
+          longCoder.encode(value.getLong(idx), outStream);
+          break;
+        case VARCHAR:
+        case CHAR:
+          stringCoder.encode(value.getString(idx), outStream);
+          break;
+        case TIME:
+          longCoder.encode(value.getGregorianCalendar(idx).getTime().getTime(), outStream);
+          break;
+        case DATE:
+        case TIMESTAMP:
+          longCoder.encode(value.getDate(idx).getTime(), outStream);
+          break;
+        case BOOLEAN:
+          byteCoder.encode((byte) (value.getBoolean(idx) ? 1 : 0), outStream);
+          break;
+
+        default:
+          // Report the type name the same way decode() does, rather than the raw type code.
+          throw new UnsupportedOperationException("Data type: "
+              + CalciteUtils.toCalciteType(value.getDataType().getFieldsType().get(idx))
+              + " not supported yet!");
+      }
+    }
+
+    instantCoder.encode(value.getWindowStart(), outStream);
+    instantCoder.encode(value.getWindowEnd(), outStream);
+  }
+
+  /**
+   * Reads a row in the layout written by {@link #encode}, interpreting fields according to this
+   * coder's {@code tableSchema}. TIME values are rebuilt as a {@link GregorianCalendar} in the
+   * JVM's default time zone.
+   */
+  @Override
+  public BeamSqlRow decode(InputStream inStream) throws CoderException, IOException {
+    List<Integer> nullFields = listCoder.decode(inStream);
+
+    BeamSqlRow record = new BeamSqlRow(tableSchema);
+    record.setNullFields(nullFields);
+    for (int idx = 0; idx < tableSchema.size(); ++idx) {
+      if (nullFields.contains(idx)) {
+        continue;
+      }
+
+      switch (CalciteUtils.getFieldType(tableSchema, idx)) {
+        case INTEGER:
+          record.addField(idx, intCoder.decode(inStream));
+          break;
+        case SMALLINT:
+          record.addField(idx, intCoder.decode(inStream).shortValue());
+          break;
+        case TINYINT:
+          record.addField(idx, byteCoder.decode(inStream));
+          break;
+        case DOUBLE:
+          record.addField(idx, doubleCoder.decode(inStream));
+          break;
+        case FLOAT:
+          record.addField(idx, doubleCoder.decode(inStream).floatValue());
+          break;
+        case BIGINT:
+          record.addField(idx, longCoder.decode(inStream));
+          break;
+        case DECIMAL:
+          record.addField(idx, bigDecimalCoder.decode(inStream));
+          break;
+        case VARCHAR:
+        case CHAR:
+          record.addField(idx, stringCoder.decode(inStream));
+          break;
+        case TIME:
+          GregorianCalendar calendar = new GregorianCalendar();
+          calendar.setTime(new Date(longCoder.decode(inStream)));
+          record.addField(idx, calendar);
+          break;
+        case DATE:
+        case TIMESTAMP:
+          record.addField(idx, new Date(longCoder.decode(inStream)));
+          break;
+        case BOOLEAN:
+          record.addField(idx, byteCoder.decode(inStream) == 1);
+          break;
+
+        default:
+          throw new UnsupportedOperationException("Data type: "
+              + CalciteUtils.toCalciteType(tableSchema.getFieldsType().get(idx))
+              + " not supported yet!");
+      }
+    }
+
+    record.setWindowStart(instantCoder.decode(inStream));
+    record.setWindowEnd(instantCoder.decode(inStream));
+
+    return record;
+  }
+
+  public BeamSqlRowType getTableSchema() {
+    return tableSchema;
+  }
+
+  /**
+   * Declares this coder deterministic unconditionally.
+   *
+   * <p>NOTE(review): DOUBLE/FLOAT fields go through {@link DoubleCoder}, which Beam itself does
+   * not report as deterministic; confirm that declaring determinism here is intended.
+   */
+  @Override
+  public void verifyDeterministic()
+      throws org.apache.beam.sdk.coders.Coder.NonDeterministicException {
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowType.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowType.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowType.java
new file mode 100644
index 0000000..1129bdd
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowType.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.schema;
+
+import com.google.auto.value.AutoValue;
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * Field type information in {@link BeamSqlRow}.
+ *
+ * <p>Field names and field type codes are parallel lists: the i-th name belongs to the i-th type.
+ */
+@AutoValue
+public abstract class BeamSqlRowType implements Serializable {
+  /** Field names, in field order. */
+  public abstract List<String> getFieldsName();
+
+  /** Field type codes, in field order (parallel to {@link #getFieldsName()}). */
+  public abstract List<Integer> getFieldsType();
+
+  /**
+   * Creates a row type from parallel lists of field names and field type codes.
+   *
+   * @throws IllegalArgumentException if the two lists have different lengths
+   */
+  public static BeamSqlRowType create(List<String> fieldNames, List<Integer> fieldTypes) {
+    // Reject mismatched lists here rather than failing later with an index error
+    // when a row or coder looks up a type by position.
+    if (fieldNames != null && fieldTypes != null && fieldNames.size() != fieldTypes.size()) {
+      throw new IllegalArgumentException(String.format(
+          "Field name count (%d) does not match field type count (%d)",
+          fieldNames.size(), fieldTypes.size()));
+    }
+    return new AutoValue_BeamSqlRowType(fieldNames, fieldTypes);
+  }
+
+  /** Number of fields in this row type. */
+  public int size() {
+    return getFieldsName().size();
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlTable.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlTable.java
new file mode 100644
index 0000000..d419473
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlTable.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.schema;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+
+/**
+ * A table that Beam SQL can read rows from and write rows to.
+ */
+public interface BeamSqlTable {
+  /**
+   * Returns the schema describing this table's rows.
+   */
+  BeamSqlRowType getRowType();
+
+  /**
+   * Returns the {@link BeamIOType} of this table's source.
+   *
+   * <p>Beam SQL makes no distinction between batch and streaming queries, so this type is what
+   * source validation relies on.
+   */
+  BeamIOType getSourceType();
+
+  /**
+   * Builds a {@code PCollection<BeamSqlRow>} holding the rows read from this table's source.
+   *
+   * @param pipeline the pipeline in which the reader is built
+   */
+  PCollection<BeamSqlRow> buildIOReader(Pipeline pipeline);
+
+  /**
+   * Builds the write transform (an {@code IO.write()} instance) that stores rows into this
+   * table's target.
+   */
+  PTransform<? super PCollection<BeamSqlRow>, PDone> buildIOWriter();
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlUdaf.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlUdaf.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlUdaf.java
new file mode 100644
index 0000000..9582ffa
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlUdaf.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.schema;
+
+import java.io.Serializable;
+import java.lang.reflect.ParameterizedType;
+import java.lang.reflect.Type;
+import org.apache.beam.sdk.coders.CannotProvideCoderException;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.transforms.Combine.CombineFn;
+
+/**
+ * Abstract class of aggregation functions in Beam SQL.
+ *
+ * <p>There are several constraints for a UDAF:<br>
+ * 1. A constructor with an empty argument list is required;<br>
+ * 2. The type of {@code InputT} and {@code OutputT} can only be Integer/Long/Short/Byte/Double
+ * /Float/Date/BigDecimal, mapping as SQL type INTEGER/BIGINT/SMALLINT/TINYINT/DOUBLE/FLOAT
+ * /TIMESTAMP/DECIMAL;<br>
+ * 3. Keep intermediate data in {@code AccumT}, and do not rely on elements in class;<br>
+ */
+public abstract class BeamSqlUdaf<InputT, AccumT, OutputT> implements Serializable {
+  public BeamSqlUdaf(){}
+
+  /**
+   * Create an initial aggregation object, equivalent to {@link CombineFn#createAccumulator()}.
+   */
+  public abstract AccumT init();
+
+  /**
+   * Add an input value, equivalent to {@link CombineFn#addInput(Object, Object)}.
+   */
+  public abstract AccumT add(AccumT accumulator, InputT input);
+
+  /**
+   * Merge aggregation objects from parallel tasks, equivalent to
+   * {@link CombineFn#mergeAccumulators(Iterable)}.
+   */
+  public abstract AccumT merge(Iterable<AccumT> accumulators);
+
+  /**
+   * Extract the output value from an aggregation object, equivalent to
+   * {@link CombineFn#extractOutput(Object)}.
+   */
+  public abstract OutputT result(AccumT accumulator);
+
+  /**
+   * Get the coder for {@code AccumT}, which stores the intermediate result.
+   *
+   * <p>By default it is fetched from the {@link CoderRegistry}, using the concrete
+   * {@code AccumT} class declared where this UDAF's class hierarchy extends
+   * {@code BeamSqlUdaf}. Override this method when {@code AccumT} is not a plain class (for
+   * example a parameterized type such as {@code List<Long>}).
+   *
+   * @throws CannotProvideCoderException if no concrete accumulator class can be determined or
+   *     the registry has no coder for it
+   */
+  public Coder<AccumT> getAccumulatorCoder(CoderRegistry registry)
+      throws CannotProvideCoderException {
+    Type accumType = resolveAccumulatorType();
+    if (!(accumType instanceof Class)) {
+      throw new CannotProvideCoderException(
+          "Cannot infer a concrete accumulator class for " + getClass().getName()
+              + " (resolved type: " + accumType + "); override getAccumulatorCoder()");
+    }
+    // Safe: accumType is the AccumT argument of this instance's BeamSqlUdaf parameterization.
+    @SuppressWarnings("unchecked")
+    Class<AccumT> accumClass = (Class<AccumT>) accumType;
+    return registry.getCoder(accumClass);
+  }
+
+  /**
+   * Walks up the class hierarchy to the {@code BeamSqlUdaf<InputT, AccumT, OutputT>}
+   * declaration and returns its {@code AccumT} argument, or {@code null} when the hierarchy
+   * extends {@code BeamSqlUdaf} only as a raw type.
+   */
+  private Type resolveAccumulatorType() {
+    for (Class<?> clazz = getClass(); clazz != null; clazz = clazz.getSuperclass()) {
+      Type superType = clazz.getGenericSuperclass();
+      if (superType instanceof ParameterizedType
+          && ((ParameterizedType) superType).getRawType() == BeamSqlUdaf.class) {
+        return ((ParameterizedType) superType).getActualTypeArguments()[1];
+      }
+    }
+    return null;
+  }
+}
