http://git-wip-us.apache.org/repos/asf/beam/blob/e4eca2ec/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSortRel.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSortRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSortRel.java deleted file mode 100644 index ba344df..0000000 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSortRel.java +++ /dev/null @@ -1,247 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.beam.dsls.sql.rel; - -import java.io.Serializable; -import java.lang.reflect.Type; -import java.math.BigDecimal; -import java.util.ArrayList; -import java.util.Comparator; -import java.util.List; -import org.apache.beam.dsls.sql.BeamSqlEnv; -import org.apache.beam.dsls.sql.schema.BeamSqlRow; -import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder; -import org.apache.beam.dsls.sql.utils.CalciteUtils; -import org.apache.beam.sdk.coders.ListCoder; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.Flatten; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.Top; -import org.apache.beam.sdk.transforms.windowing.GlobalWindow; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollectionTuple; -import org.apache.calcite.plan.RelOptCluster; -import org.apache.calcite.plan.RelTraitSet; -import org.apache.calcite.rel.RelCollation; -import org.apache.calcite.rel.RelCollationImpl; -import org.apache.calcite.rel.RelFieldCollation; -import org.apache.calcite.rel.RelNode; -import org.apache.calcite.rel.core.Sort; -import org.apache.calcite.rex.RexInputRef; -import org.apache.calcite.rex.RexLiteral; -import org.apache.calcite.rex.RexNode; -import org.apache.calcite.sql.type.SqlTypeName; - -/** - * {@code BeamRelNode} to replace a {@code Sort} node. - * - * <p>Since Beam does not fully supported global sort we are using {@link Top} to implement - * the {@code Sort} algebra. 
The following types of ORDER BY are supported: - - * <pre>{@code - * select * from t order by id desc limit 10; - * select * from t order by id desc limit 10, 5; - * }</pre> - * - * <p>but Order BY without a limit is NOT supported: - * - * <pre>{@code - * select * from t order by id desc - * }</pre> - * - * <h3>Constraints</h3> - * <ul> - * <li>Due to the constraints of {@link Top}, the result of a `ORDER BY LIMIT` - * must fit into the memory of a single machine.</li> - * <li>Since `WINDOW`(HOP, TUMBLE, SESSION etc) is always associated with `GroupBy`, - * it does not make much sense to use `ORDER BY` with `WINDOW`. - * </li> - * </ul> - */ -public class BeamSortRel extends Sort implements BeamRelNode { - private List<Integer> fieldIndices = new ArrayList<>(); - private List<Boolean> orientation = new ArrayList<>(); - private List<Boolean> nullsFirst = new ArrayList<>(); - - private int startIndex = 0; - private int count; - - public BeamSortRel( - RelOptCluster cluster, - RelTraitSet traits, - RelNode child, - RelCollation collation, - RexNode offset, - RexNode fetch) { - super(cluster, traits, child, collation, offset, fetch); - - List<RexNode> fieldExps = getChildExps(); - RelCollationImpl collationImpl = (RelCollationImpl) collation; - List<RelFieldCollation> collations = collationImpl.getFieldCollations(); - for (int i = 0; i < fieldExps.size(); i++) { - RexNode fieldExp = fieldExps.get(i); - RexInputRef inputRef = (RexInputRef) fieldExp; - fieldIndices.add(inputRef.getIndex()); - orientation.add(collations.get(i).getDirection() == RelFieldCollation.Direction.ASCENDING); - - RelFieldCollation.NullDirection rawNullDirection = collations.get(i).nullDirection; - if (rawNullDirection == RelFieldCollation.NullDirection.UNSPECIFIED) { - rawNullDirection = collations.get(i).getDirection().defaultNullDirection(); - } - nullsFirst.add(rawNullDirection == RelFieldCollation.NullDirection.FIRST); - } - - if (fetch == null) { - throw new 
UnsupportedOperationException("ORDER BY without a LIMIT is not supported!"); - } - - RexLiteral fetchLiteral = (RexLiteral) fetch; - count = ((BigDecimal) fetchLiteral.getValue()).intValue(); - - if (offset != null) { - RexLiteral offsetLiteral = (RexLiteral) offset; - startIndex = ((BigDecimal) offsetLiteral.getValue()).intValue(); - } - } - - @Override public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections - , BeamSqlEnv sqlEnv) throws Exception { - RelNode input = getInput(); - PCollection<BeamSqlRow> upstream = BeamSqlRelUtils.getBeamRelInput(input) - .buildBeamPipeline(inputPCollections, sqlEnv); - Type windowType = upstream.getWindowingStrategy().getWindowFn() - .getWindowTypeDescriptor().getType(); - if (!windowType.equals(GlobalWindow.class)) { - throw new UnsupportedOperationException( - "`ORDER BY` is only supported for GlobalWindow, actual window: " + windowType); - } - - BeamSqlRowComparator comparator = new BeamSqlRowComparator(fieldIndices, orientation, - nullsFirst); - // first find the top (offset + count) - PCollection<List<BeamSqlRow>> rawStream = - upstream.apply("extractTopOffsetAndFetch", - Top.of(startIndex + count, comparator).withoutDefaults()) - .setCoder(ListCoder.<BeamSqlRow>of(upstream.getCoder())); - - // strip the `leading offset` - if (startIndex > 0) { - rawStream = rawStream.apply("stripLeadingOffset", ParDo.of( - new SubListFn<BeamSqlRow>(startIndex, startIndex + count))) - .setCoder(ListCoder.<BeamSqlRow>of(upstream.getCoder())); - } - - PCollection<BeamSqlRow> orderedStream = rawStream.apply( - "flatten", Flatten.<BeamSqlRow>iterables()); - orderedStream.setCoder(new BeamSqlRowCoder(CalciteUtils.toBeamRowType(getRowType()))); - - return orderedStream; - } - - private static class SubListFn<T> extends DoFn<List<T>, List<T>> { - private int startIndex; - private int endIndex; - - public SubListFn(int startIndex, int endIndex) { - this.startIndex = startIndex; - this.endIndex = endIndex; - } - - 
@ProcessElement - public void processElement(ProcessContext ctx) { - ctx.output(ctx.element().subList(startIndex, endIndex)); - } - } - - @Override public Sort copy(RelTraitSet traitSet, RelNode newInput, RelCollation newCollation, - RexNode offset, RexNode fetch) { - return new BeamSortRel(getCluster(), traitSet, newInput, newCollation, offset, fetch); - } - - private static class BeamSqlRowComparator implements Comparator<BeamSqlRow>, Serializable { - private List<Integer> fieldsIndices; - private List<Boolean> orientation; - private List<Boolean> nullsFirst; - - public BeamSqlRowComparator(List<Integer> fieldsIndices, - List<Boolean> orientation, - List<Boolean> nullsFirst) { - this.fieldsIndices = fieldsIndices; - this.orientation = orientation; - this.nullsFirst = nullsFirst; - } - - @Override public int compare(BeamSqlRow row1, BeamSqlRow row2) { - for (int i = 0; i < fieldsIndices.size(); i++) { - int fieldIndex = fieldsIndices.get(i); - int fieldRet = 0; - SqlTypeName fieldType = CalciteUtils.getFieldType(row1.getDataType(), fieldIndex); - // whether NULL should be ordered first or last(compared to non-null values) depends on - // what user specified in SQL(NULLS FIRST/NULLS LAST) - if (row1.isNull(fieldIndex) && row2.isNull(fieldIndex)) { - continue; - } else if (row1.isNull(fieldIndex) && !row2.isNull(fieldIndex)) { - fieldRet = -1 * (nullsFirst.get(i) ? -1 : 1); - } else if (!row1.isNull(fieldIndex) && row2.isNull(fieldIndex)) { - fieldRet = 1 * (nullsFirst.get(i) ? 
-1 : 1); - } else { - switch (fieldType) { - case TINYINT: - fieldRet = numberCompare(row1.getByte(fieldIndex), row2.getByte(fieldIndex)); - break; - case SMALLINT: - fieldRet = numberCompare(row1.getShort(fieldIndex), row2.getShort(fieldIndex)); - break; - case INTEGER: - fieldRet = numberCompare(row1.getInteger(fieldIndex), row2.getInteger(fieldIndex)); - break; - case BIGINT: - fieldRet = numberCompare(row1.getLong(fieldIndex), row2.getLong(fieldIndex)); - break; - case FLOAT: - fieldRet = numberCompare(row1.getFloat(fieldIndex), row2.getFloat(fieldIndex)); - break; - case DOUBLE: - fieldRet = numberCompare(row1.getDouble(fieldIndex), row2.getDouble(fieldIndex)); - break; - case VARCHAR: - fieldRet = row1.getString(fieldIndex).compareTo(row2.getString(fieldIndex)); - break; - case DATE: - fieldRet = row1.getDate(fieldIndex).compareTo(row2.getDate(fieldIndex)); - break; - default: - throw new UnsupportedOperationException( - "Data type: " + fieldType + " not supported yet!"); - } - } - - fieldRet *= (orientation.get(i) ? -1 : 1); - if (fieldRet != 0) { - return fieldRet; - } - } - return 0; - } - } - - public static <T extends Number & Comparable> int numberCompare(T a, T b) { - return a.compareTo(b); - } -}
http://git-wip-us.apache.org/repos/asf/beam/blob/e4eca2ec/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSqlRelUtils.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSqlRelUtils.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSqlRelUtils.java deleted file mode 100644 index 9f1f703..0000000 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSqlRelUtils.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.dsls.sql.rel; - -import java.util.concurrent.atomic.AtomicInteger; - -import org.apache.calcite.plan.RelOptUtil; -import org.apache.calcite.plan.volcano.RelSubset; -import org.apache.calcite.rel.RelNode; -import org.apache.calcite.sql.SqlExplainLevel; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Utilities for {@code BeamRelNode}. 
- */ -class BeamSqlRelUtils { - private static final Logger LOG = LoggerFactory.getLogger(BeamSqlRelUtils.class); - - private static final AtomicInteger sequence = new AtomicInteger(0); - private static final AtomicInteger classSequence = new AtomicInteger(0); - - public static String getStageName(BeamRelNode relNode) { - return relNode.getClass().getSimpleName().toUpperCase() + "_" + relNode.getId() + "_" - + sequence.getAndIncrement(); - } - - public static String getClassName(BeamRelNode relNode) { - return "Generated_" + relNode.getClass().getSimpleName().toUpperCase() + "_" + relNode.getId() - + "_" + classSequence.getAndIncrement(); - } - - public static BeamRelNode getBeamRelInput(RelNode input) { - if (input instanceof RelSubset) { - // go with known best input - input = ((RelSubset) input).getBest(); - } - return (BeamRelNode) input; - } - - public static String explain(final RelNode rel) { - return explain(rel, SqlExplainLevel.EXPPLAN_ATTRIBUTES); - } - - public static String explain(final RelNode rel, SqlExplainLevel detailLevel) { - String explain = ""; - try { - explain = RelOptUtil.toString(rel); - } catch (StackOverflowError e) { - LOG.error("StackOverflowError occurred while extracting plan. " - + "Please report it to the dev@ mailing list."); - LOG.error("RelNode " + rel + " ExplainLevel " + detailLevel, e); - LOG.error("Forcing plan to empty string and continue... 
" - + "SQL Runner may not working properly after."); - } - return explain; - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/e4eca2ec/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamUnionRel.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamUnionRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamUnionRel.java deleted file mode 100644 index c661585..0000000 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamUnionRel.java +++ /dev/null @@ -1,88 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.beam.dsls.sql.rel; - -import java.util.List; -import org.apache.beam.dsls.sql.BeamSqlEnv; -import org.apache.beam.dsls.sql.schema.BeamSqlRow; -import org.apache.beam.sdk.transforms.windowing.WindowFn; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollectionTuple; -import org.apache.calcite.plan.RelOptCluster; -import org.apache.calcite.plan.RelTraitSet; -import org.apache.calcite.rel.RelInput; -import org.apache.calcite.rel.RelNode; -import org.apache.calcite.rel.core.SetOp; -import org.apache.calcite.rel.core.Union; - -/** - * {@link BeamRelNode} to replace a {@link Union}. - * - * <p>{@code BeamUnionRel} needs the input of it have the same {@link WindowFn}. From the SQL - * perspective, two cases are supported: - * - * <p>1) Do not use {@code grouped window function}: - * - * <pre>{@code - * select * from person UNION select * from person - * }</pre> - * - * <p>2) Use the same {@code grouped window function}, with the same param: - * <pre>{@code - * select id, count(*) from person - * group by id, TUMBLE(order_time, INTERVAL '1' HOUR) - * UNION - * select * from person - * group by id, TUMBLE(order_time, INTERVAL '1' HOUR) - * }</pre> - * - * <p>Inputs with different group functions are NOT supported: - * <pre>{@code - * select id, count(*) from person - * group by id, TUMBLE(order_time, INTERVAL '1' HOUR) - * UNION - * select * from person - * group by id, TUMBLE(order_time, INTERVAL '2' HOUR) - * }</pre> - */ -public class BeamUnionRel extends Union implements BeamRelNode { - private BeamSetOperatorRelBase delegate; - public BeamUnionRel(RelOptCluster cluster, - RelTraitSet traits, - List<RelNode> inputs, - boolean all) { - super(cluster, traits, inputs, all); - this.delegate = new BeamSetOperatorRelBase(this, - BeamSetOperatorRelBase.OpType.UNION, - inputs, all); - } - - public BeamUnionRel(RelInput input) { - super(input); - } - - @Override public SetOp copy(RelTraitSet traitSet, List<RelNode> 
inputs, boolean all) { - return new BeamUnionRel(getCluster(), traitSet, inputs, all); - } - - @Override public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections - , BeamSqlEnv sqlEnv) throws Exception { - return delegate.buildBeamPipeline(inputPCollections, sqlEnv); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/e4eca2ec/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamValuesRel.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamValuesRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamValuesRel.java deleted file mode 100644 index 43b74c3..0000000 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamValuesRel.java +++ /dev/null @@ -1,79 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.beam.dsls.sql.rel; - -import com.google.common.collect.ImmutableList; -import java.util.ArrayList; -import java.util.List; -import org.apache.beam.dsls.sql.BeamSqlEnv; -import org.apache.beam.dsls.sql.schema.BeamSqlRow; -import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder; -import org.apache.beam.dsls.sql.schema.BeamSqlRowType; -import org.apache.beam.dsls.sql.schema.BeamTableUtils; -import org.apache.beam.dsls.sql.utils.CalciteUtils; -import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollectionTuple; -import org.apache.calcite.plan.RelOptCluster; -import org.apache.calcite.plan.RelTraitSet; -import org.apache.calcite.rel.core.Values; -import org.apache.calcite.rel.type.RelDataType; -import org.apache.calcite.rex.RexLiteral; - -/** - * {@code BeamRelNode} to replace a {@code Values} node. - * - * <p>{@code BeamValuesRel} will be used in the following SQLs: - * <ul> - * <li>{@code insert into t (name, desc) values ('hello', 'world')}</li> - * <li>{@code select 1, '1', LOCALTIME}</li> - * </ul> - */ -public class BeamValuesRel extends Values implements BeamRelNode { - - public BeamValuesRel( - RelOptCluster cluster, - RelDataType rowType, - ImmutableList<ImmutableList<RexLiteral>> tuples, - RelTraitSet traits) { - super(cluster, rowType, tuples, traits); - - } - - @Override public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections - , BeamSqlEnv sqlEnv) throws Exception { - List<BeamSqlRow> rows = new ArrayList<>(tuples.size()); - String stageName = BeamSqlRelUtils.getStageName(this); - if (tuples.isEmpty()) { - throw new IllegalStateException("Values with empty tuples!"); - } - - BeamSqlRowType beamSQLRowType = CalciteUtils.toBeamRowType(this.getRowType()); - for (ImmutableList<RexLiteral> tuple : tuples) { - BeamSqlRow row = new BeamSqlRow(beamSQLRowType); - for (int i = 0; i < tuple.size(); i++) { - 
BeamTableUtils.addFieldWithAutoTypeCasting(row, i, tuple.get(i).getValue()); - } - rows.add(row); - } - - return inputPCollections.getPipeline().apply(stageName, Create.of(rows)) - .setCoder(new BeamSqlRowCoder(beamSQLRowType)); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/e4eca2ec/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/package-info.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/package-info.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/package-info.java deleted file mode 100644 index 77d6204..0000000 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/package-info.java +++ /dev/null @@ -1,23 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * BeamSQL specified nodes, to replace {@link org.apache.calcite.rel.RelNode}. 
- * - */ -package org.apache.beam.dsls.sql.rel; http://git-wip-us.apache.org/repos/asf/beam/blob/e4eca2ec/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamAggregationRule.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamAggregationRule.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamAggregationRule.java deleted file mode 100644 index 6e843d4..0000000 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamAggregationRule.java +++ /dev/null @@ -1,162 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.dsls.sql.rule; - -import com.google.common.collect.ImmutableList; -import java.util.GregorianCalendar; -import java.util.List; -import org.apache.beam.dsls.sql.rel.BeamAggregationRel; -import org.apache.beam.dsls.sql.rel.BeamLogicalConvention; -import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime; -import org.apache.beam.sdk.transforms.windowing.AfterWatermark; -import org.apache.beam.sdk.transforms.windowing.FixedWindows; -import org.apache.beam.sdk.transforms.windowing.GlobalWindows; -import org.apache.beam.sdk.transforms.windowing.Repeatedly; -import org.apache.beam.sdk.transforms.windowing.Sessions; -import org.apache.beam.sdk.transforms.windowing.SlidingWindows; -import org.apache.beam.sdk.transforms.windowing.Trigger; -import org.apache.beam.sdk.transforms.windowing.WindowFn; -import org.apache.calcite.plan.RelOptRule; -import org.apache.calcite.plan.RelOptRuleCall; -import org.apache.calcite.plan.RelOptRuleOperand; -import org.apache.calcite.rel.core.Aggregate; -import org.apache.calcite.rel.core.Project; -import org.apache.calcite.rel.core.RelFactories; -import org.apache.calcite.rex.RexCall; -import org.apache.calcite.rex.RexLiteral; -import org.apache.calcite.rex.RexNode; -import org.apache.calcite.sql.SqlOperator; -import org.apache.calcite.tools.RelBuilderFactory; -import org.apache.calcite.util.ImmutableBitSet; -import org.joda.time.Duration; - -/** - * Rule to detect the window/trigger settings. - * - */ -public class BeamAggregationRule extends RelOptRule { - public static final BeamAggregationRule INSTANCE = - new BeamAggregationRule(Aggregate.class, Project.class, RelFactories.LOGICAL_BUILDER); - - public BeamAggregationRule( - Class<? extends Aggregate> aggregateClass, - Class<? 
extends Project> projectClass, - RelBuilderFactory relBuilderFactory) { - super( - operand(aggregateClass, - operand(projectClass, any())), - relBuilderFactory, null); - } - - public BeamAggregationRule(RelOptRuleOperand operand, String description) { - super(operand, description); - } - - @Override - public void onMatch(RelOptRuleCall call) { - final Aggregate aggregate = call.rel(0); - final Project project = call.rel(1); - updateWindowTrigger(call, aggregate, project); - } - - private void updateWindowTrigger(RelOptRuleCall call, Aggregate aggregate, - Project project) { - ImmutableBitSet groupByFields = aggregate.getGroupSet(); - List<RexNode> projectMapping = project.getProjects(); - - WindowFn windowFn = new GlobalWindows(); - Trigger triggerFn = Repeatedly.forever(AfterWatermark.pastEndOfWindow()); - int windowFieldIdx = -1; - Duration allowedLatence = Duration.ZERO; - - for (int groupField : groupByFields.asList()) { - RexNode projNode = projectMapping.get(groupField); - if (projNode instanceof RexCall) { - SqlOperator op = ((RexCall) projNode).op; - ImmutableList<RexNode> parameters = ((RexCall) projNode).operands; - String functionName = op.getName(); - switch (functionName) { - case "TUMBLE": - windowFieldIdx = groupField; - windowFn = FixedWindows - .of(Duration.millis(getWindowParameterAsMillis(parameters.get(1)))); - if (parameters.size() == 3) { - GregorianCalendar delayTime = (GregorianCalendar) ((RexLiteral) parameters.get(2)) - .getValue(); - triggerFn = createTriggerWithDelay(delayTime); - allowedLatence = (Duration.millis(delayTime.getTimeInMillis())); - } - break; - case "HOP": - windowFieldIdx = groupField; - windowFn = SlidingWindows - .of(Duration.millis(getWindowParameterAsMillis(parameters.get(1)))) - .every(Duration.millis(getWindowParameterAsMillis(parameters.get(2)))); - if (parameters.size() == 4) { - GregorianCalendar delayTime = (GregorianCalendar) ((RexLiteral) parameters.get(3)) - .getValue(); - triggerFn = 
createTriggerWithDelay(delayTime); - allowedLatence = (Duration.millis(delayTime.getTimeInMillis())); - } - break; - case "SESSION": - windowFieldIdx = groupField; - windowFn = Sessions - .withGapDuration(Duration.millis(getWindowParameterAsMillis(parameters.get(1)))); - if (parameters.size() == 3) { - GregorianCalendar delayTime = (GregorianCalendar) ((RexLiteral) parameters.get(2)) - .getValue(); - triggerFn = createTriggerWithDelay(delayTime); - allowedLatence = (Duration.millis(delayTime.getTimeInMillis())); - } - break; - default: - break; - } - } - } - - BeamAggregationRel newAggregator = new BeamAggregationRel(aggregate.getCluster(), - aggregate.getTraitSet().replace(BeamLogicalConvention.INSTANCE), - convert(aggregate.getInput(), - aggregate.getInput().getTraitSet().replace(BeamLogicalConvention.INSTANCE)), - aggregate.indicator, - aggregate.getGroupSet(), - aggregate.getGroupSets(), - aggregate.getAggCallList(), - windowFn, - triggerFn, - windowFieldIdx, - allowedLatence); - call.transformTo(newAggregator); - } - - private Trigger createTriggerWithDelay(GregorianCalendar delayTime) { - return Repeatedly.forever(AfterWatermark.pastEndOfWindow().withLateFirings(AfterProcessingTime - .pastFirstElementInPane().plusDelayOf(Duration.millis(delayTime.getTimeInMillis())))); - } - - private long getWindowParameterAsMillis(RexNode parameterNode) { - if (parameterNode instanceof RexLiteral) { - return RexLiteral.intValue(parameterNode); - } else { - throw new IllegalArgumentException(String.format("[%s] is not valid.", parameterNode)); - } - } - -} http://git-wip-us.apache.org/repos/asf/beam/blob/e4eca2ec/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamFilterRule.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamFilterRule.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamFilterRule.java deleted file 
mode 100644 index 414b666..0000000 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamFilterRule.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.dsls.sql.rule; - -import org.apache.beam.dsls.sql.rel.BeamFilterRel; -import org.apache.beam.dsls.sql.rel.BeamLogicalConvention; -import org.apache.calcite.plan.Convention; -import org.apache.calcite.rel.RelNode; -import org.apache.calcite.rel.convert.ConverterRule; -import org.apache.calcite.rel.core.Filter; -import org.apache.calcite.rel.logical.LogicalFilter; - -/** - * A {@code ConverterRule} to replace {@link Filter} with {@link BeamFilterRel}. 
- * - */ -public class BeamFilterRule extends ConverterRule { - public static final BeamFilterRule INSTANCE = new BeamFilterRule(); - - private BeamFilterRule() { - super(LogicalFilter.class, Convention.NONE, BeamLogicalConvention.INSTANCE, "BeamFilterRule"); - } - - @Override - public RelNode convert(RelNode rel) { - final Filter filter = (Filter) rel; - final RelNode input = filter.getInput(); - - return new BeamFilterRel(filter.getCluster(), - filter.getTraitSet().replace(BeamLogicalConvention.INSTANCE), - convert(input, input.getTraitSet().replace(BeamLogicalConvention.INSTANCE)), - filter.getCondition()); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/e4eca2ec/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamIOSinkRule.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamIOSinkRule.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamIOSinkRule.java deleted file mode 100644 index 4cc4ef5..0000000 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamIOSinkRule.java +++ /dev/null @@ -1,82 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.dsls.sql.rule; - -import java.util.List; - -import org.apache.beam.dsls.sql.rel.BeamIOSinkRel; -import org.apache.beam.dsls.sql.rel.BeamLogicalConvention; -import org.apache.calcite.plan.Convention; -import org.apache.calcite.plan.RelOptCluster; -import org.apache.calcite.plan.RelOptTable; -import org.apache.calcite.plan.RelTraitSet; -import org.apache.calcite.prepare.Prepare; -import org.apache.calcite.rel.RelNode; -import org.apache.calcite.rel.convert.ConverterRule; -import org.apache.calcite.rel.core.TableModify; -import org.apache.calcite.rel.logical.LogicalTableModify; -import org.apache.calcite.rex.RexNode; -import org.apache.calcite.schema.Table; - -/** - * A {@code ConverterRule} to replace {@link TableModify} with - * {@link BeamIOSinkRel}. - * - */ -public class BeamIOSinkRule extends ConverterRule { - public static final BeamIOSinkRule INSTANCE = new BeamIOSinkRule(); - - private BeamIOSinkRule() { - super(LogicalTableModify.class, Convention.NONE, BeamLogicalConvention.INSTANCE, - "BeamIOSinkRule"); - } - - @Override - public RelNode convert(RelNode rel) { - final TableModify tableModify = (TableModify) rel; - final RelNode input = tableModify.getInput(); - - final RelOptCluster cluster = tableModify.getCluster(); - final RelTraitSet traitSet = tableModify.getTraitSet().replace(BeamLogicalConvention.INSTANCE); - final RelOptTable relOptTable = tableModify.getTable(); - final Prepare.CatalogReader catalogReader = tableModify.getCatalogReader(); - final RelNode convertedInput = convert(input, - input.getTraitSet().replace(BeamLogicalConvention.INSTANCE)); - final TableModify.Operation operation = tableModify.getOperation(); - final List<String> updateColumnList = tableModify.getUpdateColumnList(); - final List<RexNode> sourceExpressionList = tableModify.getSourceExpressionList(); - final boolean flattened = 
tableModify.isFlattened(); - - final Table table = tableModify.getTable().unwrap(Table.class); - - switch (table.getJdbcTableType()) { - case TABLE: - case STREAM: - if (operation != TableModify.Operation.INSERT) { - throw new UnsupportedOperationException( - String.format("Streams doesn't support %s modify operation", operation)); - } - return new BeamIOSinkRel(cluster, traitSet, - relOptTable, catalogReader, convertedInput, operation, updateColumnList, - sourceExpressionList, flattened); - default: - throw new IllegalArgumentException( - String.format("Unsupported table type: %s", table.getJdbcTableType())); - } - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/e4eca2ec/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamIOSourceRule.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamIOSourceRule.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamIOSourceRule.java deleted file mode 100644 index 85a69ff..0000000 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamIOSourceRule.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.dsls.sql.rule; - -import org.apache.beam.dsls.sql.rel.BeamIOSourceRel; -import org.apache.beam.dsls.sql.rel.BeamLogicalConvention; -import org.apache.calcite.plan.Convention; -import org.apache.calcite.rel.RelNode; -import org.apache.calcite.rel.convert.ConverterRule; -import org.apache.calcite.rel.core.TableScan; -import org.apache.calcite.rel.logical.LogicalTableScan; - -/** - * A {@code ConverterRule} to replace {@link TableScan} with - * {@link BeamIOSourceRel}. - * - */ -public class BeamIOSourceRule extends ConverterRule { - public static final BeamIOSourceRule INSTANCE = new BeamIOSourceRule(); - - private BeamIOSourceRule() { - super(LogicalTableScan.class, Convention.NONE, BeamLogicalConvention.INSTANCE, - "BeamIOSourceRule"); - } - - @Override - public RelNode convert(RelNode rel) { - final TableScan scan = (TableScan) rel; - - return new BeamIOSourceRel(scan.getCluster(), - scan.getTraitSet().replace(BeamLogicalConvention.INSTANCE), scan.getTable()); - } - -} http://git-wip-us.apache.org/repos/asf/beam/blob/e4eca2ec/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamIntersectRule.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamIntersectRule.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamIntersectRule.java deleted file mode 100644 index 70716c5..0000000 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamIntersectRule.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.dsls.sql.rule; - -import java.util.List; - -import org.apache.beam.dsls.sql.rel.BeamIntersectRel; -import org.apache.beam.dsls.sql.rel.BeamLogicalConvention; -import org.apache.calcite.plan.Convention; -import org.apache.calcite.rel.RelNode; -import org.apache.calcite.rel.convert.ConverterRule; -import org.apache.calcite.rel.core.Intersect; -import org.apache.calcite.rel.logical.LogicalIntersect; - -/** - * {@code ConverterRule} to replace {@code Intersect} with {@code BeamIntersectRel}. 
- */ -public class BeamIntersectRule extends ConverterRule { - public static final BeamIntersectRule INSTANCE = new BeamIntersectRule(); - private BeamIntersectRule() { - super(LogicalIntersect.class, Convention.NONE, - BeamLogicalConvention.INSTANCE, "BeamIntersectRule"); - } - - @Override public RelNode convert(RelNode rel) { - Intersect intersect = (Intersect) rel; - final List<RelNode> inputs = intersect.getInputs(); - return new BeamIntersectRel( - intersect.getCluster(), - intersect.getTraitSet().replace(BeamLogicalConvention.INSTANCE), - convertList(inputs, BeamLogicalConvention.INSTANCE), - intersect.all - ); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/e4eca2ec/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamJoinRule.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamJoinRule.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamJoinRule.java deleted file mode 100644 index 78253fe..0000000 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamJoinRule.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.dsls.sql.rule; - -import org.apache.beam.dsls.sql.rel.BeamJoinRel; -import org.apache.beam.dsls.sql.rel.BeamLogicalConvention; -import org.apache.calcite.plan.Convention; -import org.apache.calcite.rel.RelNode; -import org.apache.calcite.rel.convert.ConverterRule; -import org.apache.calcite.rel.core.Join; -import org.apache.calcite.rel.logical.LogicalJoin; - -/** - * {@code ConverterRule} to replace {@code Join} with {@code BeamJoinRel}. - */ -public class BeamJoinRule extends ConverterRule { - public static final BeamJoinRule INSTANCE = new BeamJoinRule(); - private BeamJoinRule() { - super(LogicalJoin.class, Convention.NONE, - BeamLogicalConvention.INSTANCE, "BeamJoinRule"); - } - - @Override public RelNode convert(RelNode rel) { - Join join = (Join) rel; - return new BeamJoinRel( - join.getCluster(), - join.getTraitSet().replace(BeamLogicalConvention.INSTANCE), - convert(join.getLeft(), - join.getLeft().getTraitSet().replace(BeamLogicalConvention.INSTANCE)), - convert(join.getRight(), - join.getRight().getTraitSet().replace(BeamLogicalConvention.INSTANCE)), - join.getCondition(), - join.getVariablesSet(), - join.getJoinType() - ); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/e4eca2ec/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamMinusRule.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamMinusRule.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamMinusRule.java deleted file mode 100644 index ca93c71..0000000 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamMinusRule.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.dsls.sql.rule; - -import java.util.List; - -import org.apache.beam.dsls.sql.rel.BeamLogicalConvention; -import org.apache.beam.dsls.sql.rel.BeamMinusRel; -import org.apache.calcite.plan.Convention; -import org.apache.calcite.rel.RelNode; -import org.apache.calcite.rel.convert.ConverterRule; -import org.apache.calcite.rel.core.Minus; -import org.apache.calcite.rel.logical.LogicalMinus; - -/** - * {@code ConverterRule} to replace {@code Minus} with {@code BeamMinusRel}. 
- */ -public class BeamMinusRule extends ConverterRule { - public static final BeamMinusRule INSTANCE = new BeamMinusRule(); - private BeamMinusRule() { - super(LogicalMinus.class, Convention.NONE, - BeamLogicalConvention.INSTANCE, "BeamMinusRule"); - } - - @Override public RelNode convert(RelNode rel) { - Minus minus = (Minus) rel; - final List<RelNode> inputs = minus.getInputs(); - return new BeamMinusRel( - minus.getCluster(), - minus.getTraitSet().replace(BeamLogicalConvention.INSTANCE), - convertList(inputs, BeamLogicalConvention.INSTANCE), - minus.all - ); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/e4eca2ec/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamProjectRule.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamProjectRule.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamProjectRule.java deleted file mode 100644 index 6dc3b57..0000000 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamProjectRule.java +++ /dev/null @@ -1,50 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.dsls.sql.rule; - -import org.apache.beam.dsls.sql.rel.BeamLogicalConvention; -import org.apache.beam.dsls.sql.rel.BeamProjectRel; -import org.apache.calcite.plan.Convention; -import org.apache.calcite.rel.RelNode; -import org.apache.calcite.rel.convert.ConverterRule; -import org.apache.calcite.rel.core.Project; -import org.apache.calcite.rel.logical.LogicalProject; - -/** - * A {@code ConverterRule} to replace {@link Project} with - * {@link BeamProjectRel}. - * - */ -public class BeamProjectRule extends ConverterRule { - public static final BeamProjectRule INSTANCE = new BeamProjectRule(); - - private BeamProjectRule() { - super(LogicalProject.class, Convention.NONE, BeamLogicalConvention.INSTANCE, "BeamProjectRule"); - } - - @Override - public RelNode convert(RelNode rel) { - final Project project = (Project) rel; - final RelNode input = project.getInput(); - - return new BeamProjectRel(project.getCluster(), - project.getTraitSet().replace(BeamLogicalConvention.INSTANCE), - convert(input, input.getTraitSet().replace(BeamLogicalConvention.INSTANCE)), - project.getProjects(), project.getRowType()); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/e4eca2ec/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamSortRule.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamSortRule.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamSortRule.java deleted file mode 100644 index d802e9d..0000000 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamSortRule.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.dsls.sql.rule; - -import org.apache.beam.dsls.sql.rel.BeamLogicalConvention; - -import org.apache.beam.dsls.sql.rel.BeamSortRel; -import org.apache.calcite.plan.Convention; -import org.apache.calcite.rel.RelNode; -import org.apache.calcite.rel.convert.ConverterRule; -import org.apache.calcite.rel.core.Sort; -import org.apache.calcite.rel.logical.LogicalSort; - -/** - * {@code ConverterRule} to replace {@code Sort} with {@code BeamSortRel}. 
- */ -public class BeamSortRule extends ConverterRule { - public static final BeamSortRule INSTANCE = new BeamSortRule(); - private BeamSortRule() { - super(LogicalSort.class, Convention.NONE, - BeamLogicalConvention.INSTANCE, "BeamSortRule"); - } - - @Override public RelNode convert(RelNode rel) { - Sort sort = (Sort) rel; - final RelNode input = sort.getInput(); - return new BeamSortRel( - sort.getCluster(), - sort.getTraitSet().replace(BeamLogicalConvention.INSTANCE), - convert(input, input.getTraitSet().replace(BeamLogicalConvention.INSTANCE)), - sort.getCollation(), - sort.offset, - sort.fetch - ); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/e4eca2ec/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamUnionRule.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamUnionRule.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamUnionRule.java deleted file mode 100644 index b8430b9..0000000 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamUnionRule.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.dsls.sql.rule; - -import org.apache.beam.dsls.sql.rel.BeamLogicalConvention; -import org.apache.beam.dsls.sql.rel.BeamUnionRel; -import org.apache.calcite.plan.Convention; -import org.apache.calcite.rel.RelNode; -import org.apache.calcite.rel.convert.ConverterRule; -import org.apache.calcite.rel.core.Union; -import org.apache.calcite.rel.logical.LogicalUnion; - -/** - * A {@code ConverterRule} to replace {@link org.apache.calcite.rel.core.Union} with - * {@link BeamUnionRule}. - */ -public class BeamUnionRule extends ConverterRule { - public static final BeamUnionRule INSTANCE = new BeamUnionRule(); - private BeamUnionRule() { - super(LogicalUnion.class, Convention.NONE, BeamLogicalConvention.INSTANCE, - "BeamUnionRule"); - } - - @Override public RelNode convert(RelNode rel) { - Union union = (Union) rel; - - return new BeamUnionRel( - union.getCluster(), - union.getTraitSet().replace(BeamLogicalConvention.INSTANCE), - convertList(union.getInputs(), BeamLogicalConvention.INSTANCE), - union.all - ); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/e4eca2ec/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamValuesRule.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamValuesRule.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamValuesRule.java deleted file mode 100644 index 4ea9e60..0000000 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamValuesRule.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.dsls.sql.rule; - -import org.apache.beam.dsls.sql.rel.BeamLogicalConvention; -import org.apache.beam.dsls.sql.rel.BeamValuesRel; -import org.apache.calcite.plan.Convention; -import org.apache.calcite.rel.RelNode; -import org.apache.calcite.rel.convert.ConverterRule; -import org.apache.calcite.rel.core.Values; -import org.apache.calcite.rel.logical.LogicalValues; - -/** - * {@code ConverterRule} to replace {@code Values} with {@code BeamValuesRel}. 
- */ -public class BeamValuesRule extends ConverterRule { - public static final BeamValuesRule INSTANCE = new BeamValuesRule(); - private BeamValuesRule() { - super(LogicalValues.class, Convention.NONE, - BeamLogicalConvention.INSTANCE, "BeamValuesRule"); - } - - @Override public RelNode convert(RelNode rel) { - Values values = (Values) rel; - return new BeamValuesRel( - values.getCluster(), - values.getRowType(), - values.getTuples(), - values.getTraitSet().replace(BeamLogicalConvention.INSTANCE) - ); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/e4eca2ec/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/package-info.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/package-info.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/package-info.java deleted file mode 100644 index 5d32647..0000000 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/package-info.java +++ /dev/null @@ -1,23 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -/** - * {@link org.apache.calcite.plan.RelOptRule} to generate - * {@link org.apache.beam.dsls.sql.rel.BeamRelNode}. - */ -package org.apache.beam.dsls.sql.rule; http://git-wip-us.apache.org/repos/asf/beam/blob/e4eca2ec/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/BaseBeamTable.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/BaseBeamTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/BaseBeamTable.java deleted file mode 100644 index dfa2785..0000000 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/BaseBeamTable.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.dsls.sql.schema; - -import java.io.Serializable; - -/** - * Each IO in Beam has one table schema, by extending {@link BaseBeamTable}. 
- */ -public abstract class BaseBeamTable implements BeamSqlTable, Serializable { - protected BeamSqlRowType beamSqlRowType; - public BaseBeamTable(BeamSqlRowType beamSqlRowType) { - this.beamSqlRowType = beamSqlRowType; - } - - @Override public BeamSqlRowType getRowType() { - return beamSqlRowType; - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/e4eca2ec/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamIOType.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamIOType.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamIOType.java deleted file mode 100644 index 502e8c1..0000000 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamIOType.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.dsls.sql.schema; - -import java.io.Serializable; - -/** - * Type as a source IO, determined whether it's a STREAMING process, or batch - * process. 
- */ -public enum BeamIOType implements Serializable { - BOUNDED, UNBOUNDED; -} http://git-wip-us.apache.org/repos/asf/beam/blob/e4eca2ec/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamPCollectionTable.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamPCollectionTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamPCollectionTable.java deleted file mode 100644 index 5b63780..0000000 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamPCollectionTable.java +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.dsls.sql.schema; - -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollection.IsBounded; -import org.apache.beam.sdk.values.PDone; - -/** - * {@code BeamPCollectionTable} converts a {@code PCollection<BeamSqlRow>} as a virtual table, - * then a downstream query can query directly. 
- */ -public class BeamPCollectionTable extends BaseBeamTable { - private BeamIOType ioType; - private transient PCollection<BeamSqlRow> upstream; - - protected BeamPCollectionTable(BeamSqlRowType beamSqlRowType) { - super(beamSqlRowType); - } - - public BeamPCollectionTable(PCollection<BeamSqlRow> upstream, - BeamSqlRowType beamSqlRowType){ - this(beamSqlRowType); - ioType = upstream.isBounded().equals(IsBounded.BOUNDED) - ? BeamIOType.BOUNDED : BeamIOType.UNBOUNDED; - this.upstream = upstream; - } - - @Override - public BeamIOType getSourceType() { - return ioType; - } - - @Override - public PCollection<BeamSqlRow> buildIOReader(Pipeline pipeline) { - return upstream; - } - - @Override - public PTransform<? super PCollection<BeamSqlRow>, PDone> buildIOWriter() { - throw new IllegalArgumentException("cannot use [BeamPCollectionTable] as target"); - } - -} http://git-wip-us.apache.org/repos/asf/beam/blob/e4eca2ec/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRow.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRow.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRow.java deleted file mode 100644 index d789446..0000000 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRow.java +++ /dev/null @@ -1,314 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.dsls.sql.schema; - -import java.io.Serializable; -import java.math.BigDecimal; -import java.sql.Types; -import java.util.ArrayList; -import java.util.Date; -import java.util.GregorianCalendar; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.TimeUnit; -import org.apache.beam.dsls.sql.utils.CalciteUtils; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.IntervalWindow; -import org.apache.calcite.sql.type.SqlTypeName; -import org.joda.time.Instant; - -/** - * Represent a generic ROW record in Beam SQL. 
- * - */
public class BeamSqlRow implements Serializable {
  /** Java class a value must have, keyed by the {@link Types} code of the field's SQL type. */
  private static final Map<Integer, Class<?>> SQL_TYPE_TO_JAVA_CLASS = new HashMap<>();
  static {
    SQL_TYPE_TO_JAVA_CLASS.put(Types.TINYINT, Byte.class);
    SQL_TYPE_TO_JAVA_CLASS.put(Types.SMALLINT, Short.class);
    SQL_TYPE_TO_JAVA_CLASS.put(Types.INTEGER, Integer.class);
    SQL_TYPE_TO_JAVA_CLASS.put(Types.BIGINT, Long.class);
    SQL_TYPE_TO_JAVA_CLASS.put(Types.FLOAT, Float.class);
    SQL_TYPE_TO_JAVA_CLASS.put(Types.DOUBLE, Double.class);
    SQL_TYPE_TO_JAVA_CLASS.put(Types.DECIMAL, BigDecimal.class);

    SQL_TYPE_TO_JAVA_CLASS.put(Types.BOOLEAN, Boolean.class);

    SQL_TYPE_TO_JAVA_CLASS.put(Types.CHAR, String.class);
    SQL_TYPE_TO_JAVA_CLASS.put(Types.VARCHAR, String.class);

    SQL_TYPE_TO_JAVA_CLASS.put(Types.TIME, GregorianCalendar.class);

    SQL_TYPE_TO_JAVA_CLASS.put(Types.DATE, Date.class);
    SQL_TYPE_TO_JAVA_CLASS.put(Types.TIMESTAMP, Date.class);
  }

  // Indexes of the fields that currently hold no value (SQL NULL).
  private List<Integer> nullFields = new ArrayList<>();
  private List<Object> dataValues;
  private BeamSqlRowType dataType;

  // Default bounds: Long.MIN_VALUE / Long.MAX_VALUE microseconds expressed in milliseconds.
  // NOTE(review): presumably mirrors the timestamp range of Beam's global window -- confirm.
  private Instant windowStart = new Instant(TimeUnit.MICROSECONDS.toMillis(Long.MIN_VALUE));
  private Instant windowEnd = new Instant(TimeUnit.MICROSECONDS.toMillis(Long.MAX_VALUE));

  /**
   * Creates a row of the given type in which every field is null.
   *
   * @param dataType schema describing the fields of this row
   */
  public BeamSqlRow(BeamSqlRowType dataType) {
    this.dataType = dataType;
    this.dataValues = new ArrayList<>();
    for (int idx = 0; idx < dataType.size(); ++idx) {
      dataValues.add(null);
      nullFields.add(idx);
    }
  }

  /**
   * Creates a row of the given type and stores {@code dataValues} positionally through
   * {@link #addField(int, Object)}, so every non-null value is type-checked.
   *
   * @param dataType schema describing the fields of this row
   * @param dataValues field values, in schema order; {@code null} entries stay null
   */
  public BeamSqlRow(BeamSqlRowType dataType, List<Object> dataValues) {
    this(dataType);
    for (int idx = 0; idx < dataValues.size(); ++idx) {
      addField(idx, dataValues.get(idx));
    }
  }

  /**
   * Copies the window bounds of {@code upstreamRecord}; when {@code window} is an
   * {@link IntervalWindow}, its start and end take precedence.
   *
   * @param upstreamRecord row whose window bounds are inherited
   * @param window window the row currently belongs to
   */
  public void updateWindowRange(BeamSqlRow upstreamRecord, BoundedWindow window){
    windowStart = upstreamRecord.windowStart;
    windowEnd = upstreamRecord.windowEnd;

    if (window instanceof IntervalWindow) {
      IntervalWindow iWindow = (IntervalWindow) window;
      windowStart = iWindow.start();
      windowEnd = iWindow.end();
    }
  }

  /**
   * Stores a value in the field with the given name.
   *
   * <p>The name is resolved with {@code indexOf} on the schema's field names, so an unknown
   * name resolves to index {@code -1}.
   *
   * @param fieldName name of the field as listed in the row type
   * @param fieldValue value to store, or {@code null} to mark the field as null
   */
  public void addField(String fieldName, Object fieldValue) {
    addField(dataType.getFieldsName().indexOf(fieldName), fieldValue);
  }

  /**
   * Stores a value in the field at {@code index}.
   *
   * <p>A {@code null} value clears the field and records it as null. A non-null value is
   * type-checked before any state changes, so a rejected value leaves the row untouched.
   *
   * @param index zero-based field position
   * @param fieldValue value to store, or {@code null} to mark the field as null
   * @throws UnsupportedOperationException if the field's SQL type has no Java mapping
   * @throws IllegalArgumentException if the value's class does not match the field's SQL type
   */
  public void addField(int index, Object fieldValue) {
    if (fieldValue == null) {
      // An explicit null must overwrite whatever was stored before.
      dataValues.set(index, null);
      if (!nullFields.contains(index)) {
        nullFields.add(index);
      }
      return;
    }

    // Validate first: a mismatch must not leave the field half-updated.
    validateValueType(index, fieldValue);
    // Integer.valueOf selects remove(Object), i.e. removal by value rather than by position.
    nullFields.remove(Integer.valueOf(index));
    dataValues.set(index, fieldValue);
  }

  /**
   * Checks that {@code fieldValue} is an instance of exactly the Java class mapped to the SQL
   * type of the field at {@code index}.
   */
  private void validateValueType(int index, Object fieldValue) {
    SqlTypeName fieldType = CalciteUtils.getFieldType(dataType, index);
    Class<?> javaClazz = SQL_TYPE_TO_JAVA_CLASS.get(CalciteUtils.toJavaType(fieldType));
    if (javaClazz == null) {
      throw new UnsupportedOperationException("Data type: " + fieldType + " not supported yet!");
    }

    // Exact class match on purpose: subclasses are rejected as well.
    if (!fieldValue.getClass().equals(javaClazz)) {
      throw new IllegalArgumentException(
          String.format("[%s](%s) doesn't match type [%s]",
              fieldValue, fieldValue.getClass(), fieldType)
      );
    }
  }

  /**
   * Returns the value of the named field, or {@code null} when the field is null.
   *
   * @param fieldName name of the field as listed in the row type
   */
  public Object getFieldValue(String fieldName) {
    return getFieldValue(dataType.getFieldsName().indexOf(fieldName));
  }

  // Typed accessors by name. The primitive-returning ones unbox the stored value and
  // therefore throw NullPointerException when the field is null.

  public byte getByte(String fieldName) {
    return (Byte) getFieldValue(fieldName);
  }

  public short getShort(String fieldName) {
    return (Short) getFieldValue(fieldName);
  }

  public int getInteger(String fieldName) {
    return (Integer) getFieldValue(fieldName);
  }

  public float getFloat(String fieldName) {
    return (Float) getFieldValue(fieldName);
  }

  public double getDouble(String fieldName) {
    return (Double) getFieldValue(fieldName);
  }

  public long getLong(String fieldName) {
    return (Long) getFieldValue(fieldName);
  }

  public String getString(String fieldName) {
    return (String) getFieldValue(fieldName);
  }

  public Date getDate(String fieldName) {
    return (Date) getFieldValue(fieldName);
  }

  public GregorianCalendar getGregorianCalendar(String fieldName) {
    return (GregorianCalendar) getFieldValue(fieldName);
  }

  public BigDecimal getBigDecimal(String fieldName) {
    return (BigDecimal) getFieldValue(fieldName);
  }

  public boolean getBoolean(String fieldName) {
    return (Boolean) getFieldValue(fieldName);
  }

  /**
   * Returns the value at {@code fieldIdx}, or {@code null} when that index is listed as null.
   *
   * @param fieldIdx zero-based field position
   */
  public Object getFieldValue(int fieldIdx) {
    if (nullFields.contains(fieldIdx)) {
      return null;
    }

    return dataValues.get(fieldIdx);
  }

  // Typed accessors by position; same null behaviour as the by-name variants.

  public byte getByte(int idx) {
    return (Byte) getFieldValue(idx);
  }

  public short getShort(int idx) {
    return (Short) getFieldValue(idx);
  }

  public int getInteger(int idx) {
    return (Integer) getFieldValue(idx);
  }

  public float getFloat(int idx) {
    return (Float) getFieldValue(idx);
  }

  public double getDouble(int idx) {
    return (Double) getFieldValue(idx);
  }

  public long getLong(int idx) {
    return (Long) getFieldValue(idx);
  }

  public String getString(int idx) {
    return (String) getFieldValue(idx);
  }

  public Date getDate(int idx) {
    return (Date) getFieldValue(idx);
  }

  public GregorianCalendar getGregorianCalendar(int idx) {
    return (GregorianCalendar) getFieldValue(idx);
  }

  public BigDecimal getBigDecimal(int idx) {
    return (BigDecimal) getFieldValue(idx);
  }

  public boolean getBoolean(int idx) {
    return (Boolean) getFieldValue(idx);
  }

  /** Returns the number of value slots held by this row. */
  public int size() {
    return dataValues.size();
  }

  /** Returns the backing value list itself (not a copy). */
  public List<Object> getDataValues() {
    return dataValues;
  }

  /** Replaces the backing value list; the values are not type-checked here. */
  public void setDataValues(List<Object> dataValues) {
    this.dataValues = dataValues;
  }

  public BeamSqlRowType getDataType() {
    return dataType;
  }

  public void setDataType(BeamSqlRowType dataType) {
    this.dataType = dataType;
  }

  /** Replaces the list of null field indexes; the given list is stored as-is. */
  public void setNullFields(List<Integer> nullFields) {
    this.nullFields = nullFields;
  }

  /** Returns the backing list of null field indexes (not a copy). */
  public List<Integer> getNullFields() {
    return nullFields;
  }

  /**
   * Tells whether the field at {@code idx} is currently recorded as null.
   *
   * @param idx zero-based field position
   */
  public boolean isNull(int idx) {
    return nullFields.contains(idx);
  }

  public Instant getWindowStart() {
    return windowStart;
  }

  public Instant getWindowEnd() {
    return windowEnd;
  }

  public void setWindowStart(Instant windowStart) {
    this.windowStart = windowStart;
  }

  public void setWindowEnd(Instant windowEnd) {
    this.windowEnd = windowEnd;
  }

  @Override
  public String toString() {
    return "BeamSqlRow [nullFields=" + nullFields + ", dataValues=" + dataValues + ", dataType="
        + dataType + ", windowStart=" + windowStart + ", windowEnd=" + windowEnd + "]";
  }

  /**
   * Returns the data fields as comma-separated {@code name=value} pairs, or an empty string
   * for a row without fields.
   */
  public String valueInString() {
    StringBuilder sb = new StringBuilder();
    for (int idx = 0; idx < size(); ++idx) {
      if (idx > 0) {
        sb.append(',');
      }
      sb.append(dataType.getFieldsName().get(idx)).append('=').append(getFieldValue(idx));
    }
    return sb.toString();
  }

  /**
   * Two rows are equal when their type, values, null markers and window bounds are equal.
   *
   * <p>The fields are compared directly rather than through {@link #toString()}: string forms
   * can coincide for different values (for example {@link Date#toString()} omits
   * milliseconds), which would make unequal rows with different hash codes compare equal.
   */
  @Override
  public boolean equals(Object obj) {
    if (this == obj) {
      return true;
    }
    if (obj == null || getClass() != obj.getClass()) {
      return false;
    }
    BeamSqlRow other = (BeamSqlRow) obj;
    // Fully qualified so this block does not depend on an extra import.
    return java.util.Objects.equals(dataType, other.dataType)
        && java.util.Objects.equals(dataValues, other.dataValues)
        && java.util.Objects.equals(nullFields, other.nullFields)
        && java.util.Objects.equals(windowStart, other.windowStart)
        && java.util.Objects.equals(windowEnd, other.windowEnd);
  }

  /** Hash over type, values and null markers; window bounds are deliberately left out. */
  @Override public int hashCode() {
    return 31 * (31 * dataType.hashCode() + dataValues.hashCode()) + nullFields.hashCode();
  }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/e4eca2ec/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java deleted file mode 100644 index f14864a..0000000 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java +++ /dev/null @@ -1,185 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more 
contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.dsls.sql.schema; - -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.util.Date; -import java.util.GregorianCalendar; -import java.util.List; -import org.apache.beam.dsls.sql.utils.CalciteUtils; -import org.apache.beam.sdk.coders.BigDecimalCoder; -import org.apache.beam.sdk.coders.BigEndianIntegerCoder; -import org.apache.beam.sdk.coders.BigEndianLongCoder; -import org.apache.beam.sdk.coders.ByteCoder; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.CoderException; -import org.apache.beam.sdk.coders.CustomCoder; -import org.apache.beam.sdk.coders.DoubleCoder; -import org.apache.beam.sdk.coders.InstantCoder; -import org.apache.beam.sdk.coders.ListCoder; -import org.apache.beam.sdk.coders.StringUtf8Coder; - -/** - * A {@link Coder} encodes {@link BeamSqlRow}. 
- */
public class BeamSqlRowCoder extends CustomCoder<BeamSqlRow> {
  // Schema used by decode() to know which field types to read, and in which order.
  private final BeamSqlRowType tableSchema;

  private static final ListCoder<Integer> listCoder = ListCoder.of(BigEndianIntegerCoder.of());

  private static final StringUtf8Coder stringCoder = StringUtf8Coder.of();
  private static final BigEndianIntegerCoder intCoder = BigEndianIntegerCoder.of();
  private static final BigEndianLongCoder longCoder = BigEndianLongCoder.of();
  private static final DoubleCoder doubleCoder = DoubleCoder.of();
  private static final InstantCoder instantCoder = InstantCoder.of();
  private static final BigDecimalCoder bigDecimalCoder = BigDecimalCoder.of();
  private static final ByteCoder byteCoder = ByteCoder.of();

  /**
   * Creates a coder for rows of the given schema.
   *
   * @param tableSchema row type that decoded rows are built with
   */
  public BeamSqlRowCoder(BeamSqlRowType tableSchema) {
    this.tableSchema = tableSchema;
  }

  /**
   * Writes the row as: the list of null field indexes, then every non-null field in index
   * order, then the window start and the window end.
   *
   * <p>Field types are taken from the row's own data type, whereas {@link #decode} uses this
   * coder's schema, so the two are expected to describe the same fields.
   *
   * @param value row to encode; must not be {@code null}
   * @param outStream stream the encoded bytes are written to
   * @throws CoderException if {@code value} is {@code null}
   * @throws UnsupportedOperationException if a field has a type this coder does not handle
   */
  @Override
  public void encode(BeamSqlRow value, OutputStream outStream) throws CoderException, IOException {
    if (value == null) {
      // Report a null row as a coding failure instead of an anonymous NullPointerException.
      throw new CoderException("cannot encode a null BeamSqlRow");
    }

    // Read once; the same list drives both the header and the per-field skip test.
    List<Integer> nullFields = value.getNullFields();
    listCoder.encode(nullFields, outStream);
    for (int idx = 0; idx < value.size(); ++idx) {
      if (!nullFields.contains(idx)) {
        encodeField(value, idx, outStream);
      }
    }

    instantCoder.encode(value.getWindowStart(), outStream);
    instantCoder.encode(value.getWindowEnd(), outStream);
  }

  /** Writes the non-null field at {@code idx} with the component coder of its SQL type. */
  private void encodeField(BeamSqlRow value, int idx, OutputStream outStream)
      throws CoderException, IOException {
    switch (CalciteUtils.getFieldType(value.getDataType(), idx)) {
      case INTEGER:
        intCoder.encode(value.getInteger(idx), outStream);
        break;
      case SMALLINT:
        // Widened to int on the wire; decodeField narrows it back.
        intCoder.encode((int) value.getShort(idx), outStream);
        break;
      case TINYINT:
        byteCoder.encode(value.getByte(idx), outStream);
        break;
      case DOUBLE:
        doubleCoder.encode(value.getDouble(idx), outStream);
        break;
      case FLOAT:
        // Widened to double on the wire; decodeField narrows it back.
        doubleCoder.encode((double) value.getFloat(idx), outStream);
        break;
      case DECIMAL:
        bigDecimalCoder.encode(value.getBigDecimal(idx), outStream);
        break;
      case BIGINT:
        longCoder.encode(value.getLong(idx), outStream);
        break;
      case VARCHAR:
      case CHAR:
        stringCoder.encode(value.getString(idx), outStream);
        break;
      case TIME:
        // Only the epoch milliseconds of the calendar are written.
        longCoder.encode(value.getGregorianCalendar(idx).getTime().getTime(), outStream);
        break;
      case DATE:
      case TIMESTAMP:
        longCoder.encode(value.getDate(idx).getTime(), outStream);
        break;
      case BOOLEAN:
        byteCoder.encode((byte) (value.getBoolean(idx) ? 1 : 0), outStream);
        break;

      default:
        throw new UnsupportedOperationException(
            "Data type: " + value.getDataType().getFieldsType().get(idx) + " not supported yet!");
    }
  }

  /**
   * Reads a row in the layout produced by {@link #encode}, using this coder's schema to decide
   * the type of every field.
   *
   * @param inStream stream positioned at the start of an encoded row
   * @return the decoded row, including its window bounds
   * @throws UnsupportedOperationException if the schema has a type this coder does not handle
   */
  @Override
  public BeamSqlRow decode(InputStream inStream) throws CoderException, IOException {
    List<Integer> nullFields = listCoder.decode(inStream);

    BeamSqlRow record = new BeamSqlRow(tableSchema);
    record.setNullFields(nullFields);
    for (int idx = 0; idx < tableSchema.size(); ++idx) {
      if (!nullFields.contains(idx)) {
        decodeField(record, idx, inStream);
      }
    }

    record.setWindowStart(instantCoder.decode(inStream));
    record.setWindowEnd(instantCoder.decode(inStream));

    return record;
  }

  /** Reads the field at {@code idx} from the stream and stores it in {@code record}. */
  private void decodeField(BeamSqlRow record, int idx, InputStream inStream)
      throws CoderException, IOException {
    switch (CalciteUtils.getFieldType(tableSchema, idx)) {
      case INTEGER:
        record.addField(idx, intCoder.decode(inStream));
        break;
      case SMALLINT:
        record.addField(idx, intCoder.decode(inStream).shortValue());
        break;
      case TINYINT:
        record.addField(idx, byteCoder.decode(inStream));
        break;
      case DOUBLE:
        record.addField(idx, doubleCoder.decode(inStream));
        break;
      case FLOAT:
        record.addField(idx, doubleCoder.decode(inStream).floatValue());
        break;
      case BIGINT:
        record.addField(idx, longCoder.decode(inStream));
        break;
      case DECIMAL:
        record.addField(idx, bigDecimalCoder.decode(inStream));
        break;
      case VARCHAR:
      case CHAR:
        record.addField(idx, stringCoder.decode(inStream));
        break;
      case TIME:
        // Rebuilt in the default calendar; only the instant was transmitted.
        GregorianCalendar calendar = new GregorianCalendar();
        calendar.setTime(new Date(longCoder.decode(inStream)));
        record.addField(idx, calendar);
        break;
      case DATE:
      case TIMESTAMP:
        record.addField(idx, new Date(longCoder.decode(inStream)));
        break;
      case BOOLEAN:
        record.addField(idx, byteCoder.decode(inStream) == 1);
        break;

      default:
        throw new UnsupportedOperationException("Data type: "
            + CalciteUtils.toCalciteType(tableSchema.getFieldsType().get(idx))
            + " not supported yet!");
    }
  }

  /** Returns the schema this coder decodes rows with. */
  public BeamSqlRowType getTableSchema() {
    return tableSchema;
  }

  /**
   * Never throws, i.e. this coder reports itself as deterministic.
   *
   * <p>NOTE(review): DOUBLE and FLOAT fields are written through {@link DoubleCoder}; confirm
   * that treating those encodings as deterministic is intended.
   */
  @Override
  public void verifyDeterministic()
      throws org.apache.beam.sdk.coders.Coder.NonDeterministicException {
  }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/e4eca2ec/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowType.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowType.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowType.java deleted file mode 100644 index 1129bdd..0000000 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowType.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.dsls.sql.schema; - -import com.google.auto.value.AutoValue; -import java.io.Serializable; -import java.util.List; -
/**
 * Describes the fields of a {@link BeamSqlRow}: one name and one integer type code per field.
 */
@AutoValue
public abstract class BeamSqlRowType implements Serializable {
  /** Field names, in field order. */
  public abstract List<String> getFieldsName();

  /** Integer type codes of the fields, in field order. */
  public abstract List<Integer> getFieldsType();

  /**
   * Builds a row type from the given names and type codes.
   *
   * @param fieldNames names of the fields
   * @param fieldTypes type codes of the fields
   * @return the generated AutoValue implementation holding both lists
   */
  public static BeamSqlRowType create(List<String> fieldNames, List<Integer> fieldTypes) {
    BeamSqlRowType rowType = new AutoValue_BeamSqlRowType(fieldNames, fieldTypes);
    return rowType;
  }

  /** Returns the number of fields, taken from the length of the name list. */
  public int size() {
    List<String> names = getFieldsName();
    return names.size();
  }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/e4eca2ec/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlTable.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlTable.java deleted file mode 100644 index d419473..0000000 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlTable.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.dsls.sql.schema; - -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PDone; -
/**
 * Contract of a table that Beam SQL can read from and write to.
 */
public interface BeamSqlTable {
  /**
   * Returns the kind of source behind this table.
   *
   * <p>Beam SQL makes no distinction between batch and streaming queries; the
   * {@link BeamIOType} is what sources are validated against.
   */
  BeamIOType getSourceType();

  /**
   * Builds the {@code PCollection<BeamSqlRow>} that reads this table's source.
   *
   * @param pipeline pipeline the reader is created for
   */
  PCollection<BeamSqlRow> buildIOReader(Pipeline pipeline);

  /**
   * Builds the transform that writes a {@code PCollection<BeamSqlRow>} to the target and
   * yields {@link PDone}.
   */
  PTransform<? super PCollection<BeamSqlRow>, PDone> buildIOWriter();

  /**
   * Returns the schema of the rows in this table.
   */
  BeamSqlRowType getRowType();
}