http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlPositionExpression.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlPositionExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlPositionExpression.java index ea5f749..06dce91 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlPositionExpression.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlPositionExpression.java @@ -21,7 +21,7 @@ package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string; import java.util.List; import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression; import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; +import org.apache.beam.sdk.values.BeamRecord; import org.apache.calcite.sql.type.SqlTypeName; /** @@ -56,7 +56,7 @@ public class BeamSqlPositionExpression extends BeamSqlExpression { return true; } - @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) { + @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow) { String targetStr = opValueEvaluated(0, inputRow); String containingStr = opValueEvaluated(1, inputRow); int from = -1;
http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlSubstringExpression.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlSubstringExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlSubstringExpression.java index 25f205a..f8582aa 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlSubstringExpression.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlSubstringExpression.java @@ -21,7 +21,7 @@ package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string; import java.util.List; import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression; import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; +import org.apache.beam.sdk.values.BeamRecord; import org.apache.calcite.sql.type.SqlTypeName; /** @@ -54,7 +54,7 @@ public class BeamSqlSubstringExpression extends BeamSqlExpression { return true; } - @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) { + @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow) { String str = opValueEvaluated(0, inputRow); int idx = opValueEvaluated(1, inputRow); int startIdx = idx; http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlTrimExpression.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlTrimExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlTrimExpression.java index 9493e24..9c2a7ae 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlTrimExpression.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlTrimExpression.java @@ -21,7 +21,7 @@ package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string; import java.util.List; import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression; import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; +import org.apache.beam.sdk.values.BeamRecord; import org.apache.calcite.sql.fun.SqlTrimFunction; import org.apache.calcite.sql.type.SqlTypeName; @@ -58,7 +58,7 @@ public class BeamSqlTrimExpression extends BeamSqlExpression { return true; } - @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) { + @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow) { if (operands.size() == 1) { return BeamSqlPrimitive.of(SqlTypeName.VARCHAR, opValueEvaluated(0, inputRow).toString().trim()); http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlUpperExpression.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlUpperExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlUpperExpression.java index 9769c0e..94ac2e2 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlUpperExpression.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlUpperExpression.java @@ -21,7 +21,7 @@ package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string; import java.util.List; import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression; import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; +import org.apache.beam.sdk.values.BeamRecord; import org.apache.calcite.sql.type.SqlTypeName; /** @@ -32,7 +32,7 @@ public class BeamSqlUpperExpression extends BeamSqlStringUnaryExpression { super(operands, SqlTypeName.VARCHAR); } - @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) { + @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow) { String str = opValueEvaluated(0, inputRow); return BeamSqlPrimitive.of(SqlTypeName.VARCHAR, str.toUpperCase()); } http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/BeamQueryPlanner.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/BeamQueryPlanner.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/BeamQueryPlanner.java index dd01a87..b421bc3 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/BeamQueryPlanner.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/BeamQueryPlanner.java @@ -27,7 +27,7 @@ import org.apache.beam.sdk.extensions.sql.BeamSqlEnv; import org.apache.beam.sdk.extensions.sql.impl.rel.BeamLogicalConvention; import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode; import org.apache.beam.sdk.extensions.sql.schema.BaseBeamTable; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; +import org.apache.beam.sdk.values.BeamRecord; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionTuple; import org.apache.calcite.adapter.java.JavaTypeFactory; @@ -107,7 +107,7 @@ public class BeamQueryPlanner { * which is linked with the given {@code pipeline}. The final output stream is returned as * {@code PCollection} so more operations can be applied. */ - public PCollection<BeamSqlRow> compileBeamPipeline(String sqlStatement, Pipeline basePipeline + public PCollection<BeamRecord> compileBeamPipeline(String sqlStatement, Pipeline basePipeline , BeamSqlEnv sqlEnv) throws Exception { BeamRelNode relNode = convertToBeamRel(sqlStatement); http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamAggregationRel.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamAggregationRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamAggregationRel.java index 8e78684..d91b484 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamAggregationRel.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamAggregationRel.java @@ -19,13 +19,12 @@ package org.apache.beam.sdk.extensions.sql.impl.rel; import java.util.ArrayList; import java.util.List; +import org.apache.beam.sdk.coders.BeamRecordCoder; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.extensions.sql.BeamSqlEnv; import org.apache.beam.sdk.extensions.sql.impl.transform.BeamAggregationTransforms; import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowCoder; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType; +import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType; import org.apache.beam.sdk.transforms.Combine; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.WithKeys; @@ -34,6 +33,7 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.Trigger; import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.transforms.windowing.WindowFn; +import org.apache.beam.sdk.values.BeamRecord; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionTuple; @@ -55,7 +55,7 @@ import org.joda.time.Duration; */ public class BeamAggregationRel extends Aggregate implements BeamRelNode { private int windowFieldIdx = -1; - private WindowFn<BeamSqlRow, BoundedWindow> windowFn; + private WindowFn<BeamRecord, BoundedWindow> windowFn; private Trigger trigger; private Duration allowedLatence = Duration.ZERO; @@ -71,12 +71,12 @@ public class BeamAggregationRel extends Aggregate implements BeamRelNode { } @Override - public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections + public PCollection<BeamRecord> buildBeamPipeline(PCollectionTuple inputPCollections , BeamSqlEnv sqlEnv) throws Exception { RelNode input = getInput(); String stageName = BeamSqlRelUtils.getStageName(this) + "_"; - PCollection<BeamSqlRow> upstream = + PCollection<BeamRecord> upstream = BeamSqlRelUtils.getBeamRelInput(input).buildBeamPipeline(inputPCollections, sqlEnv); if (windowFieldIdx != -1) { upstream = upstream.apply(stageName + "assignEventTimestamp", WithTimestamps @@ -84,14 +84,14 @@ public class BeamAggregationRel extends Aggregate implements BeamRelNode { .setCoder(upstream.getCoder()); } - PCollection<BeamSqlRow> windowStream = upstream.apply(stageName + "window", + PCollection<BeamRecord> windowStream = upstream.apply(stageName + "window", Window.into(windowFn) .triggering(trigger) .withAllowedLateness(allowedLatence) .accumulatingFiredPanes()); - BeamSqlRowCoder keyCoder = new BeamSqlRowCoder(exKeyFieldsSchema(input.getRowType())); - PCollection<KV<BeamSqlRow, BeamSqlRow>> exCombineByStream = windowStream.apply( + BeamRecordCoder keyCoder = exKeyFieldsSchema(input.getRowType()).getRecordCoder(); + PCollection<KV<BeamRecord, BeamRecord>> exCombineByStream = windowStream.apply( stageName + "exCombineBy", WithKeys .of(new BeamAggregationTransforms.AggregationGroupByKeyFn( @@ -99,19 +99,19 @@ public class BeamAggregationRel extends Aggregate implements BeamRelNode { .setCoder(KvCoder.of(keyCoder, upstream.getCoder())); - BeamSqlRowCoder aggCoder = new BeamSqlRowCoder(exAggFieldsSchema()); + BeamRecordCoder aggCoder = exAggFieldsSchema().getRecordCoder(); - PCollection<KV<BeamSqlRow, BeamSqlRow>> aggregatedStream = exCombineByStream.apply( + PCollection<KV<BeamRecord, BeamRecord>> aggregatedStream = exCombineByStream.apply( stageName + "combineBy", - Combine.<BeamSqlRow, BeamSqlRow, BeamSqlRow>perKey( + Combine.<BeamRecord, BeamRecord, BeamRecord>perKey( new BeamAggregationTransforms.AggregationAdaptor(getAggCallList(), CalciteUtils.toBeamRowType(input.getRowType())))) .setCoder(KvCoder.of(keyCoder, aggCoder)); - PCollection<BeamSqlRow> mergedStream = aggregatedStream.apply(stageName + "mergeRecord", + PCollection<BeamRecord> mergedStream = aggregatedStream.apply(stageName + "mergeRecord", ParDo.of(new BeamAggregationTransforms.MergeAggregationRecord( CalciteUtils.toBeamRowType(getRowType()), getAggCallList(), windowFieldIdx))); - mergedStream.setCoder(new BeamSqlRowCoder(CalciteUtils.toBeamRowType(getRowType()))); + mergedStream.setCoder(CalciteUtils.toBeamRowType(getRowType()).getRecordCoder()); return mergedStream; } @@ -119,8 +119,8 @@ public class BeamAggregationRel extends Aggregate implements BeamRelNode { /** * Type of sub-rowrecord used as Group-By keys. */ - private BeamSqlRowType exKeyFieldsSchema(RelDataType relDataType) { - BeamSqlRowType inputRowType = CalciteUtils.toBeamRowType(relDataType); + private BeamSqlRecordType exKeyFieldsSchema(RelDataType relDataType) { + BeamSqlRecordType inputRowType = CalciteUtils.toBeamRowType(relDataType); List<String> fieldNames = new ArrayList<>(); List<Integer> fieldTypes = new ArrayList<>(); for (int i : groupSet.asList()) { @@ -129,13 +129,13 @@ public class BeamAggregationRel extends Aggregate implements BeamRelNode { fieldTypes.add(inputRowType.getFieldsType().get(i)); } } - return BeamSqlRowType.create(fieldNames, fieldTypes); + return BeamSqlRecordType.create(fieldNames, fieldTypes); } /** * Type of sub-rowrecord, that represents the list of aggregation fields. */ - private BeamSqlRowType exAggFieldsSchema() { + private BeamSqlRecordType exAggFieldsSchema() { List<String> fieldNames = new ArrayList<>(); List<Integer> fieldTypes = new ArrayList<>(); for (AggregateCall ac : getAggCallList()) { @@ -143,7 +143,7 @@ public class BeamAggregationRel extends Aggregate implements BeamRelNode { fieldTypes.add(CalciteUtils.toJavaType(ac.type.getSqlTypeName())); } - return BeamSqlRowType.create(fieldNames, fieldTypes); + return BeamSqlRecordType.create(fieldNames, fieldTypes); } @Override http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamFilterRel.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamFilterRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamFilterRel.java index b453db4..8fe5be4 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamFilterRel.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamFilterRel.java @@ -22,9 +22,8 @@ import org.apache.beam.sdk.extensions.sql.impl.interpreter.BeamSqlExpressionExec import org.apache.beam.sdk.extensions.sql.impl.interpreter.BeamSqlFnExecutor; import org.apache.beam.sdk.extensions.sql.impl.transform.BeamSqlFilterFn; import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowCoder; import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.BeamRecord; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionTuple; import org.apache.calcite.plan.RelOptCluster; @@ -50,19 +49,19 @@ public class BeamFilterRel extends Filter implements BeamRelNode { } @Override - public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections + public PCollection<BeamRecord> buildBeamPipeline(PCollectionTuple inputPCollections , BeamSqlEnv sqlEnv) throws Exception { RelNode input = getInput(); String stageName = BeamSqlRelUtils.getStageName(this); - PCollection<BeamSqlRow> upstream = + PCollection<BeamRecord> upstream = BeamSqlRelUtils.getBeamRelInput(input).buildBeamPipeline(inputPCollections, sqlEnv); BeamSqlExpressionExecutor executor = new BeamSqlFnExecutor(this); - PCollection<BeamSqlRow> filterStream = upstream.apply(stageName, + PCollection<BeamRecord> filterStream = upstream.apply(stageName, ParDo.of(new BeamSqlFilterFn(getRelTypeName(), executor))); - filterStream.setCoder(new BeamSqlRowCoder(CalciteUtils.toBeamRowType(getRowType()))); + filterStream.setCoder(CalciteUtils.toBeamRowType(getRowType()).getRecordCoder()); return filterStream; } http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSinkRel.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSinkRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSinkRel.java index d5eb210..1e3eb4c 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSinkRel.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSinkRel.java @@ -21,7 +21,7 @@ import com.google.common.base.Joiner; import java.util.List; import org.apache.beam.sdk.extensions.sql.BeamSqlEnv; import org.apache.beam.sdk.extensions.sql.schema.BaseBeamTable; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; +import org.apache.beam.sdk.values.BeamRecord; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionTuple; import org.apache.calcite.plan.RelOptCluster; @@ -55,12 +55,12 @@ public class BeamIOSinkRel extends TableModify implements BeamRelNode { * which is the persisted PCollection. */ @Override - public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections + public PCollection<BeamRecord> buildBeamPipeline(PCollectionTuple inputPCollections , BeamSqlEnv sqlEnv) throws Exception { RelNode input = getInput(); String stageName = BeamSqlRelUtils.getStageName(this); - PCollection<BeamSqlRow> upstream = + PCollection<BeamRecord> upstream = BeamSqlRelUtils.getBeamRelInput(input).buildBeamPipeline(inputPCollections, sqlEnv); String sourceName = Joiner.on('.').join(getTable().getQualifiedName()); http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSourceRel.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSourceRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSourceRel.java index 5179eba..254f990 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSourceRel.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSourceRel.java @@ -21,8 +21,7 @@ import com.google.common.base.Joiner; import org.apache.beam.sdk.extensions.sql.BeamSqlEnv; import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils; import org.apache.beam.sdk.extensions.sql.schema.BaseBeamTable; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowCoder; +import org.apache.beam.sdk.values.BeamRecord; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionTuple; import org.apache.beam.sdk.values.TupleTag; @@ -42,21 +41,21 @@ public class BeamIOSourceRel extends TableScan implements BeamRelNode { } @Override - public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections + public PCollection<BeamRecord> buildBeamPipeline(PCollectionTuple inputPCollections , BeamSqlEnv sqlEnv) throws Exception { String sourceName = Joiner.on('.').join(getTable().getQualifiedName()); - TupleTag<BeamSqlRow> sourceTupleTag = new TupleTag<>(sourceName); + TupleTag<BeamRecord> sourceTupleTag = new TupleTag<>(sourceName); if (inputPCollections.has(sourceTupleTag)) { //choose PCollection from input PCollectionTuple if exists there. - PCollection<BeamSqlRow> sourceStream = inputPCollections - .get(new TupleTag<BeamSqlRow>(sourceName)); + PCollection<BeamRecord> sourceStream = inputPCollections + .get(new TupleTag<BeamRecord>(sourceName)); return sourceStream; } else { //If not, the source PColection is provided with BaseBeamTable.buildIOReader(). BaseBeamTable sourceTable = sqlEnv.findTable(sourceName); return sourceTable.buildIOReader(inputPCollections.getPipeline()) - .setCoder(new BeamSqlRowCoder(CalciteUtils.toBeamRowType(getRowType()))); + .setCoder(CalciteUtils.toBeamRowType(getRowType()).getRecordCoder()); } } http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIntersectRel.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIntersectRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIntersectRel.java index d6ab52d..5919329 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIntersectRel.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIntersectRel.java @@ -20,7 +20,7 @@ package org.apache.beam.sdk.extensions.sql.impl.rel; import java.util.List; import org.apache.beam.sdk.extensions.sql.BeamSqlEnv; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; +import org.apache.beam.sdk.values.BeamRecord; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionTuple; import org.apache.calcite.plan.RelOptCluster; @@ -51,7 +51,7 @@ public class BeamIntersectRel extends Intersect implements BeamRelNode { return new BeamIntersectRel(getCluster(), traitSet, inputs, all); } - @Override public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections + @Override public PCollection<BeamRecord> buildBeamPipeline(PCollectionTuple inputPCollections , BeamSqlEnv sqlEnv) throws Exception { return delegate.buildBeamPipeline(inputPCollections, sqlEnv); } http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java index 2de2a89..9e5ce2f 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java @@ -27,14 +27,13 @@ import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.extensions.sql.BeamSqlEnv; import org.apache.beam.sdk.extensions.sql.impl.transform.BeamJoinTransforms; import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowCoder; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType; +import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType; import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.View; import org.apache.beam.sdk.transforms.windowing.IncompatibleWindowException; import org.apache.beam.sdk.transforms.windowing.WindowFn; +import org.apache.beam.sdk.values.BeamRecord; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionTuple; @@ -93,15 +92,15 @@ public class BeamJoinRel extends Join implements BeamRelNode { joinType); } - @Override public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections, + @Override public PCollection<BeamRecord> buildBeamPipeline(PCollectionTuple inputPCollections, BeamSqlEnv sqlEnv) throws Exception { BeamRelNode leftRelNode = BeamSqlRelUtils.getBeamRelInput(left); - BeamSqlRowType leftRowType = CalciteUtils.toBeamRowType(left.getRowType()); - PCollection<BeamSqlRow> leftRows = leftRelNode.buildBeamPipeline(inputPCollections, sqlEnv); + BeamSqlRecordType leftRowType = CalciteUtils.toBeamRowType(left.getRowType()); + PCollection<BeamRecord> leftRows = leftRelNode.buildBeamPipeline(inputPCollections, sqlEnv); final BeamRelNode rightRelNode = BeamSqlRelUtils.getBeamRelInput(right); - PCollection<BeamSqlRow> rightRows = rightRelNode.buildBeamPipeline(inputPCollections, sqlEnv); + PCollection<BeamRecord> rightRows = rightRelNode.buildBeamPipeline(inputPCollections, sqlEnv); String stageName = BeamSqlRelUtils.getStageName(this); WindowFn leftWinFn = leftRows.getWindowingStrategy().getWindowFn(); @@ -119,24 +118,24 @@ public class BeamJoinRel extends Join implements BeamRelNode { names.add("c" + i); types.add(leftRowType.getFieldsType().get(pairs.get(i).getKey())); } - BeamSqlRowType extractKeyRowType = BeamSqlRowType.create(names, types); + BeamSqlRecordType extractKeyRowType = BeamSqlRecordType.create(names, types); - Coder extractKeyRowCoder = new BeamSqlRowCoder(extractKeyRowType); + Coder extractKeyRowCoder = extractKeyRowType.getRecordCoder(); // BeamSqlRow -> KV<BeamSqlRow, BeamSqlRow> - PCollection<KV<BeamSqlRow, BeamSqlRow>> extractedLeftRows = leftRows + PCollection<KV<BeamRecord, BeamRecord>> extractedLeftRows = leftRows .apply(stageName + "_left_ExtractJoinFields", MapElements.via(new BeamJoinTransforms.ExtractJoinFields(true, pairs))) .setCoder(KvCoder.of(extractKeyRowCoder, leftRows.getCoder())); - PCollection<KV<BeamSqlRow, BeamSqlRow>> extractedRightRows = rightRows + PCollection<KV<BeamRecord, BeamRecord>> extractedRightRows = rightRows .apply(stageName + "_right_ExtractJoinFields", MapElements.via(new BeamJoinTransforms.ExtractJoinFields(false, pairs))) .setCoder(KvCoder.of(extractKeyRowCoder, rightRows.getCoder())); // prepare the NullRows - BeamSqlRow leftNullRow = buildNullRow(leftRelNode); - BeamSqlRow rightNullRow = buildNullRow(rightRelNode); + BeamRecord leftNullRow = buildNullRow(leftRelNode); + BeamRecord rightNullRow = buildNullRow(rightRelNode); // a regular join if ((leftRows.isBounded() == PCollection.IsBounded.BOUNDED @@ -184,11 +183,11 @@ public class BeamJoinRel extends Join implements BeamRelNode { } } - private PCollection<BeamSqlRow> standardJoin( - PCollection<KV<BeamSqlRow, BeamSqlRow>> extractedLeftRows, - PCollection<KV<BeamSqlRow, BeamSqlRow>> extractedRightRows, - BeamSqlRow leftNullRow, BeamSqlRow rightNullRow, String stageName) { - PCollection<KV<BeamSqlRow, KV<BeamSqlRow, BeamSqlRow>>> joinedRows = null; + private PCollection<BeamRecord> standardJoin( + PCollection<KV<BeamRecord, BeamRecord>> extractedLeftRows, + PCollection<KV<BeamRecord, BeamRecord>> extractedRightRows, + BeamRecord leftNullRow, BeamRecord rightNullRow, String stageName) { + PCollection<KV<BeamRecord, KV<BeamRecord, BeamRecord>>> joinedRows = null; switch (joinType) { case LEFT: joinedRows = org.apache.beam.sdk.extensions.joinlibrary.Join @@ -210,53 +209,53 @@ public class BeamJoinRel extends Join implements BeamRelNode { break; } - PCollection<BeamSqlRow> ret = joinedRows + PCollection<BeamRecord> ret = joinedRows .apply(stageName + "_JoinParts2WholeRow", MapElements.via(new BeamJoinTransforms.JoinParts2WholeRow())) - .setCoder(new BeamSqlRowCoder(CalciteUtils.toBeamRowType(getRowType()))); + .setCoder(CalciteUtils.toBeamRowType(getRowType()).getRecordCoder()); return ret; } - public PCollection<BeamSqlRow> sideInputJoin( - PCollection<KV<BeamSqlRow, BeamSqlRow>> extractedLeftRows, - PCollection<KV<BeamSqlRow, BeamSqlRow>> extractedRightRows, - BeamSqlRow leftNullRow, BeamSqlRow rightNullRow) { + public PCollection<BeamRecord> sideInputJoin( + PCollection<KV<BeamRecord, BeamRecord>> extractedLeftRows, + PCollection<KV<BeamRecord, BeamRecord>> extractedRightRows, + BeamRecord leftNullRow, BeamRecord rightNullRow) { // we always make the Unbounded table on the left to do the sideInput join // (will convert the result accordingly before return) boolean swapped = (extractedLeftRows.isBounded() == PCollection.IsBounded.BOUNDED); JoinRelType realJoinType = (swapped && joinType != JoinRelType.INNER) ? JoinRelType.LEFT : joinType; - PCollection<KV<BeamSqlRow, BeamSqlRow>> realLeftRows = + PCollection<KV<BeamRecord, BeamRecord>> realLeftRows = swapped ? extractedRightRows : extractedLeftRows; - PCollection<KV<BeamSqlRow, BeamSqlRow>> realRightRows = + PCollection<KV<BeamRecord, BeamRecord>> realRightRows = swapped ? extractedLeftRows : extractedRightRows; - BeamSqlRow realRightNullRow = swapped ? leftNullRow : rightNullRow; + BeamRecord realRightNullRow = swapped ? leftNullRow : rightNullRow; // swapped still need to pass down because, we need to swap the result back. return sideInputJoinHelper(realJoinType, realLeftRows, realRightRows, realRightNullRow, swapped); } - private PCollection<BeamSqlRow> sideInputJoinHelper( + private PCollection<BeamRecord> sideInputJoinHelper( JoinRelType joinType, - PCollection<KV<BeamSqlRow, BeamSqlRow>> leftRows, - PCollection<KV<BeamSqlRow, BeamSqlRow>> rightRows, - BeamSqlRow rightNullRow, boolean swapped) { - final PCollectionView<Map<BeamSqlRow, Iterable<BeamSqlRow>>> rowsView = rightRows - .apply(View.<BeamSqlRow, BeamSqlRow>asMultimap()); + PCollection<KV<BeamRecord, BeamRecord>> leftRows, + PCollection<KV<BeamRecord, BeamRecord>> rightRows, + BeamRecord rightNullRow, boolean swapped) { + final PCollectionView<Map<BeamRecord, Iterable<BeamRecord>>> rowsView = rightRows + .apply(View.<BeamRecord, BeamRecord>asMultimap()); - PCollection<BeamSqlRow> ret = leftRows + PCollection<BeamRecord> ret = leftRows .apply(ParDo.of(new BeamJoinTransforms.SideInputJoinDoFn( joinType, rightNullRow, rowsView, swapped)).withSideInputs(rowsView)) - .setCoder(new BeamSqlRowCoder(CalciteUtils.toBeamRowType(getRowType()))); + .setCoder(CalciteUtils.toBeamRowType(getRowType()).getRecordCoder()); return ret; } - private BeamSqlRow buildNullRow(BeamRelNode relNode) { - BeamSqlRowType leftType = CalciteUtils.toBeamRowType(relNode.getRowType()); - BeamSqlRow nullRow = new BeamSqlRow(leftType); + private BeamRecord buildNullRow(BeamRelNode relNode) { + BeamSqlRecordType leftType = CalciteUtils.toBeamRowType(relNode.getRowType()); + BeamRecord nullRow = new BeamRecord(leftType); for (int i = 0; i < leftType.size(); i++) { nullRow.addField(i, null); } http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamMinusRel.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamMinusRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamMinusRel.java index 0075d3a..b55252a 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamMinusRel.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamMinusRel.java @@ -20,7 +20,7 @@ package org.apache.beam.sdk.extensions.sql.impl.rel; import java.util.List; import org.apache.beam.sdk.extensions.sql.BeamSqlEnv; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; +import org.apache.beam.sdk.values.BeamRecord; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionTuple; import org.apache.calcite.plan.RelOptCluster; @@ -49,7 +49,7 @@ public class BeamMinusRel extends Minus implements BeamRelNode { return new BeamMinusRel(getCluster(), traitSet, inputs, all); } - @Override public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections + @Override public PCollection<BeamRecord> buildBeamPipeline(PCollectionTuple inputPCollections , BeamSqlEnv sqlEnv) throws Exception { return delegate.buildBeamPipeline(inputPCollections, sqlEnv); } http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamProjectRel.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamProjectRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamProjectRel.java index 6ccb156..b1ff629 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamProjectRel.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamProjectRel.java @@ -23,9 +23,8 @@ import org.apache.beam.sdk.extensions.sql.impl.interpreter.BeamSqlExpressionExec import org.apache.beam.sdk.extensions.sql.impl.interpreter.BeamSqlFnExecutor; import org.apache.beam.sdk.extensions.sql.impl.transform.BeamSqlProjectFn; import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowCoder; import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.BeamRecord; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionTuple; import org.apache.calcite.plan.RelOptCluster; @@ -60,20 +59,20 @@ public class BeamProjectRel extends Project implements BeamRelNode { } @Override - public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections + public PCollection<BeamRecord> buildBeamPipeline(PCollectionTuple inputPCollections , BeamSqlEnv sqlEnv) throws Exception { RelNode input = getInput(); String stageName = BeamSqlRelUtils.getStageName(this); - PCollection<BeamSqlRow> upstream = + PCollection<BeamRecord> upstream = BeamSqlRelUtils.getBeamRelInput(input).buildBeamPipeline(inputPCollections, sqlEnv); BeamSqlExpressionExecutor executor = new BeamSqlFnExecutor(this); - PCollection<BeamSqlRow> projectStream = upstream.apply(stageName, ParDo + PCollection<BeamRecord> projectStream = upstream.apply(stageName, ParDo .of(new BeamSqlProjectFn(getRelTypeName(), executor, CalciteUtils.toBeamRowType(rowType)))); - projectStream.setCoder(new BeamSqlRowCoder(CalciteUtils.toBeamRowType(getRowType()))); + projectStream.setCoder(CalciteUtils.toBeamRowType(getRowType()).getRecordCoder()); return projectStream; } http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamRelNode.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamRelNode.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamRelNode.java index 8a51cc7..b8b4293 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamRelNode.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamRelNode.java @@ -18,7 +18,7 @@ package org.apache.beam.sdk.extensions.sql.impl.rel; import org.apache.beam.sdk.extensions.sql.BeamSqlEnv; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; +import org.apache.beam.sdk.values.BeamRecord; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionTuple; import org.apache.calcite.rel.RelNode; @@ -33,6 +33,6 @@ public interface BeamRelNode extends RelNode { * {@code BeamQueryPlanner} visits it with a DFS(Depth-First-Search) * algorithm. */ - PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections, BeamSqlEnv sqlEnv) + PCollection<BeamRecord> buildBeamPipeline(PCollectionTuple inputPCollections, BeamSqlEnv sqlEnv) throws Exception; } http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSetOperatorRelBase.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSetOperatorRelBase.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSetOperatorRelBase.java index 44e4338..f9cbf4f 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSetOperatorRelBase.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSetOperatorRelBase.java @@ -22,13 +22,13 @@ import java.io.Serializable; import java.util.List; import org.apache.beam.sdk.extensions.sql.BeamSqlEnv; import org.apache.beam.sdk.extensions.sql.impl.transform.BeamSetOperatorsTransforms; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.join.CoGbkResult; import org.apache.beam.sdk.transforms.join.CoGroupByKey; import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple; import org.apache.beam.sdk.transforms.windowing.WindowFn; +import org.apache.beam.sdk.values.BeamRecord; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionTuple; @@ -62,11 +62,11 @@ public class BeamSetOperatorRelBase { this.all = all; } - public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections + public PCollection<BeamRecord> buildBeamPipeline(PCollectionTuple inputPCollections , BeamSqlEnv sqlEnv) throws Exception { - PCollection<BeamSqlRow> leftRows = BeamSqlRelUtils.getBeamRelInput(inputs.get(0)) + PCollection<BeamRecord> leftRows = BeamSqlRelUtils.getBeamRelInput(inputs.get(0)) .buildBeamPipeline(inputPCollections, sqlEnv); - PCollection<BeamSqlRow> rightRows = BeamSqlRelUtils.getBeamRelInput(inputs.get(1)) + PCollection<BeamRecord> rightRows = BeamSqlRelUtils.getBeamRelInput(inputs.get(1)) .buildBeamPipeline(inputPCollections, sqlEnv); WindowFn leftWindow = leftRows.getWindowingStrategy().getWindowFn(); @@ -77,20 +77,20 @@ public class BeamSetOperatorRelBase { + leftWindow + " VS " + rightWindow); } - final TupleTag<BeamSqlRow> leftTag = new TupleTag<>(); - final TupleTag<BeamSqlRow> rightTag = new TupleTag<>(); + final TupleTag<BeamRecord> leftTag = new TupleTag<>(); + final TupleTag<BeamRecord> rightTag = new TupleTag<>(); // co-group String stageName = BeamSqlRelUtils.getStageName(beamRelNode); - PCollection<KV<BeamSqlRow, CoGbkResult>> coGbkResultCollection = KeyedPCollectionTuple + PCollection<KV<BeamRecord, CoGbkResult>> coGbkResultCollection = KeyedPCollectionTuple .of(leftTag, leftRows.apply( stageName + "_CreateLeftIndex", MapElements.via( new BeamSetOperatorsTransforms.BeamSqlRow2KvFn()))) .and(rightTag, rightRows.apply( stageName + "_CreateRightIndex", MapElements.via( new BeamSetOperatorsTransforms.BeamSqlRow2KvFn()))) - .apply(CoGroupByKey.<BeamSqlRow>create()); - PCollection<BeamSqlRow> ret = coGbkResultCollection + .apply(CoGroupByKey.<BeamRecord>create()); + PCollection<BeamRecord> ret = coGbkResultCollection .apply(ParDo.of(new BeamSetOperatorsTransforms.SetOperatorFilteringDoFn(leftTag, rightTag, opType, all))); return ret; http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRel.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRel.java index 4ea12ca..0cbea5c 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRel.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRel.java @@ -27,13 +27,13 @@ import java.util.List; import org.apache.beam.sdk.coders.ListCoder; import org.apache.beam.sdk.extensions.sql.BeamSqlEnv; import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowCoder; +import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordHelper; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.Flatten; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.Top; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; +import org.apache.beam.sdk.values.BeamRecord; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionTuple; import org.apache.calcite.plan.RelOptCluster; @@ -120,10 +120,10 @@ public class BeamSortRel extends Sort implements BeamRelNode { } } - @Override public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections + @Override public PCollection<BeamRecord> buildBeamPipeline(PCollectionTuple inputPCollections , BeamSqlEnv sqlEnv) throws Exception { RelNode input = getInput(); - PCollection<BeamSqlRow> upstream = BeamSqlRelUtils.getBeamRelInput(input) + PCollection<BeamRecord> upstream = BeamSqlRelUtils.getBeamRelInput(input) .buildBeamPipeline(inputPCollections, sqlEnv); Type windowType = upstream.getWindowingStrategy().getWindowFn() .getWindowTypeDescriptor().getType(); @@ -135,21 +135,21 @@ public class BeamSortRel extends Sort implements BeamRelNode { BeamSqlRowComparator comparator = new BeamSqlRowComparator(fieldIndices, orientation, nullsFirst); // first find the top (offset + count) - PCollection<List<BeamSqlRow>> rawStream = + PCollection<List<BeamRecord>> rawStream = upstream.apply("extractTopOffsetAndFetch", Top.of(startIndex + count, comparator).withoutDefaults()) - .setCoder(ListCoder.<BeamSqlRow>of(upstream.getCoder())); + .setCoder(ListCoder.<BeamRecord>of(upstream.getCoder())); // strip the `leading offset` if (startIndex > 0) { rawStream = rawStream.apply("stripLeadingOffset", ParDo.of( - new SubListFn<BeamSqlRow>(startIndex, startIndex + count))) - .setCoder(ListCoder.<BeamSqlRow>of(upstream.getCoder())); + new SubListFn<BeamRecord>(startIndex, startIndex + count))) + .setCoder(ListCoder.<BeamRecord>of(upstream.getCoder())); } - PCollection<BeamSqlRow> orderedStream = rawStream.apply( - "flatten", Flatten.<BeamSqlRow>iterables()); - orderedStream.setCoder(new BeamSqlRowCoder(CalciteUtils.toBeamRowType(getRowType()))); + PCollection<BeamRecord> orderedStream = rawStream.apply( + "flatten", Flatten.<BeamRecord>iterables()); + orderedStream.setCoder(CalciteUtils.toBeamRowType(getRowType()).getRecordCoder()); return orderedStream; } @@ -174,7 +174,7 @@ public class BeamSortRel extends Sort implements BeamRelNode { return new BeamSortRel(getCluster(), traitSet, newInput, newCollation, offset, fetch); } - private static class BeamSqlRowComparator implements Comparator<BeamSqlRow>, Serializable { + private static class BeamSqlRowComparator implements Comparator<BeamRecord>, Serializable { private List<Integer> fieldsIndices; private List<Boolean> orientation; private List<Boolean> nullsFirst; @@ -187,11 +187,12 @@ public class BeamSortRel extends Sort implements BeamRelNode { this.nullsFirst = nullsFirst; } - @Override public int compare(BeamSqlRow row1, BeamSqlRow row2) { + @Override public int compare(BeamRecord row1, BeamRecord row2) { for (int i = 0; i < fieldsIndices.size(); i++) { int fieldIndex = fieldsIndices.get(i); int fieldRet = 0; - SqlTypeName fieldType = CalciteUtils.getFieldType(row1.getDataType(), fieldIndex); + SqlTypeName fieldType = CalciteUtils.getFieldType( + BeamSqlRecordHelper.getSqlRecordType(row1), fieldIndex); // whether NULL should be ordered first or last(compared to non-null values) depends on // what user specified in SQL(NULLS FIRST/NULLS LAST) if (row1.isNull(fieldIndex) && row2.isNull(fieldIndex)) { @@ -203,28 +204,16 @@ public class BeamSortRel extends Sort implements BeamRelNode { } else { switch (fieldType) { case TINYINT: - fieldRet = numberCompare(row1.getByte(fieldIndex), row2.getByte(fieldIndex)); - break; case SMALLINT: - fieldRet = numberCompare(row1.getShort(fieldIndex), row2.getShort(fieldIndex)); - break; case INTEGER: - fieldRet = numberCompare(row1.getInteger(fieldIndex), row2.getInteger(fieldIndex)); - break; case BIGINT: - fieldRet = numberCompare(row1.getLong(fieldIndex), row2.getLong(fieldIndex)); - break; case FLOAT: - fieldRet = numberCompare(row1.getFloat(fieldIndex), row2.getFloat(fieldIndex)); - break; case DOUBLE: - fieldRet = numberCompare(row1.getDouble(fieldIndex), row2.getDouble(fieldIndex)); - break; case VARCHAR: - fieldRet = row1.getString(fieldIndex).compareTo(row2.getString(fieldIndex)); - break; case DATE: - fieldRet = row1.getDate(fieldIndex).compareTo(row2.getDate(fieldIndex)); + Comparable v1 = (Comparable) row1.getFieldValue(fieldIndex); + Comparable v2 = (Comparable) row2.getFieldValue(fieldIndex); + fieldRet = v1.compareTo(v2); break; default: throw new UnsupportedOperationException( @@ -241,7 +230,7 @@ public class BeamSortRel extends Sort implements BeamRelNode { } } - public static <T extends Number & Comparable> int numberCompare(T a, T b) { + public static <T extends Comparable> int compare(T a, T b) { return a.compareTo(b); } } http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUnionRel.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUnionRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUnionRel.java index d35fa67..63ebdf3 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUnionRel.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUnionRel.java @@ -20,8 +20,8 @@ package org.apache.beam.sdk.extensions.sql.impl.rel; import java.util.List; import org.apache.beam.sdk.extensions.sql.BeamSqlEnv; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; import org.apache.beam.sdk.transforms.windowing.WindowFn; +import org.apache.beam.sdk.values.BeamRecord; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionTuple; import org.apache.calcite.plan.RelOptCluster; @@ -81,7 +81,7 @@ public class BeamUnionRel extends Union implements BeamRelNode { return new BeamUnionRel(getCluster(), traitSet, inputs, all); } - @Override public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections + @Override public PCollection<BeamRecord> buildBeamPipeline(PCollectionTuple inputPCollections , BeamSqlEnv sqlEnv) throws Exception { return delegate.buildBeamPipeline(inputPCollections, sqlEnv); } http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRel.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRel.java index f12cbbc..8ad6e8d 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRel.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRel.java @@ -23,11 +23,10 @@ import java.util.ArrayList; import java.util.List; import org.apache.beam.sdk.extensions.sql.BeamSqlEnv; import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowCoder; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType; +import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType; import org.apache.beam.sdk.extensions.sql.schema.BeamTableUtils; import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.values.BeamRecord; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionTuple; import org.apache.calcite.plan.RelOptCluster; @@ -56,17 +55,17 @@ public class BeamValuesRel extends Values implements BeamRelNode { } - @Override public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections + @Override public PCollection<BeamRecord> buildBeamPipeline(PCollectionTuple inputPCollections , BeamSqlEnv sqlEnv) throws Exception { - List<BeamSqlRow> rows = new ArrayList<>(tuples.size()); + List<BeamRecord> rows = new ArrayList<>(tuples.size()); String stageName = BeamSqlRelUtils.getStageName(this); if (tuples.isEmpty()) { throw new IllegalStateException("Values with empty tuples!"); } - BeamSqlRowType beamSQLRowType = CalciteUtils.toBeamRowType(this.getRowType()); + BeamSqlRecordType beamSQLRowType = CalciteUtils.toBeamRowType(this.getRowType()); for (ImmutableList<RexLiteral> tuple : tuples) { - BeamSqlRow row = new BeamSqlRow(beamSQLRowType); + BeamRecord row = new BeamRecord(beamSQLRowType); for (int i = 0; i < tuple.size(); i++) { BeamTableUtils.addFieldWithAutoTypeCasting(row, i, tuple.get(i).getValue()); } @@ -74,6 +73,6 @@ public class BeamValuesRel extends Values implements BeamRelNode { } return inputPCollections.getPipeline().apply(stageName, Create.of(rows)) - .setCoder(new BeamSqlRowCoder(beamSQLRowType)); + .setCoder(beamSQLRowType.getRecordCoder()); } } http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamAggregationTransforms.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamAggregationTransforms.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamAggregationTransforms.java index 095875f..dab79a2 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamAggregationTransforms.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamAggregationTransforms.java @@ -35,13 +35,14 @@ import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression; import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlInputRefExpression; import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType; +import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordHelper; +import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType; import org.apache.beam.sdk.extensions.sql.schema.BeamSqlUdaf; import org.apache.beam.sdk.transforms.Combine.CombineFn; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.values.BeamRecord; import org.apache.beam.sdk.values.KV; import org.apache.calcite.rel.core.AggregateCall; import org.apache.calcite.schema.impl.AggregateFunctionImpl; @@ -56,12 +57,12 @@ public class BeamAggregationTransforms implements Serializable{ /** * Merge KV to single record. */ - public static class MergeAggregationRecord extends DoFn<KV<BeamSqlRow, BeamSqlRow>, BeamSqlRow> { - private BeamSqlRowType outRowType; + public static class MergeAggregationRecord extends DoFn<KV<BeamRecord, BeamRecord>, BeamRecord> { + private BeamSqlRecordType outRowType; private List<String> aggFieldNames; private int windowStartFieldIdx; - public MergeAggregationRecord(BeamSqlRowType outRowType, List<AggregateCall> aggList + public MergeAggregationRecord(BeamSqlRecordType outRowType, List<AggregateCall> aggList , int windowStartFieldIdx) { this.outRowType = outRowType; this.aggFieldNames = new ArrayList<>(); @@ -73,10 +74,10 @@ public class BeamAggregationTransforms implements Serializable{ @ProcessElement public void processElement(ProcessContext c, BoundedWindow window) { - BeamSqlRow outRecord = new BeamSqlRow(outRowType); + BeamRecord outRecord = new BeamRecord(outRowType); outRecord.updateWindowRange(c.element().getKey(), window); - KV<BeamSqlRow, BeamSqlRow> kvRecord = c.element(); + KV<BeamRecord, BeamRecord> kvRecord = c.element(); for (String f : kvRecord.getKey().getDataType().getFieldsName()) { outRecord.addField(f, kvRecord.getKey().getFieldValue(f)); } @@ -95,7 +96,7 @@ public class BeamAggregationTransforms implements Serializable{ * extract group-by fields. */ public static class AggregationGroupByKeyFn - implements SerializableFunction<BeamSqlRow, BeamSqlRow> { + implements SerializableFunction<BeamRecord, BeamRecord> { private List<Integer> groupByKeys; public AggregationGroupByKeyFn(int windowFieldIdx, ImmutableBitSet groupSet) { @@ -108,9 +109,9 @@ public class BeamAggregationTransforms implements Serializable{ } @Override - public BeamSqlRow apply(BeamSqlRow input) { - BeamSqlRowType typeOfKey = exTypeOfKeyRecord(input.getDataType()); - BeamSqlRow keyOfRecord = new BeamSqlRow(typeOfKey); + public BeamRecord apply(BeamRecord input) { + BeamSqlRecordType typeOfKey = exTypeOfKeyRecord(BeamSqlRecordHelper.getSqlRecordType(input)); + BeamRecord keyOfRecord = new BeamRecord(typeOfKey); keyOfRecord.updateWindowRange(input, null); for (int idx = 0; idx < groupByKeys.size(); ++idx) { @@ -119,21 +120,21 @@ public class BeamAggregationTransforms implements Serializable{ return keyOfRecord; } - private BeamSqlRowType exTypeOfKeyRecord(BeamSqlRowType dataType) { + private BeamSqlRecordType exTypeOfKeyRecord(BeamSqlRecordType dataType) { List<String> fieldNames = new ArrayList<>(); List<Integer> fieldTypes = new ArrayList<>(); for (int idx : groupByKeys) { fieldNames.add(dataType.getFieldsName().get(idx)); fieldTypes.add(dataType.getFieldsType().get(idx)); } - return BeamSqlRowType.create(fieldNames, fieldTypes); + return BeamSqlRecordType.create(fieldNames, fieldTypes); } } /** * Assign event timestamp. */ - public static class WindowTimestampFn implements SerializableFunction<BeamSqlRow, Instant> { + public static class WindowTimestampFn implements SerializableFunction<BeamRecord, Instant> { private int windowFieldIdx = -1; public WindowTimestampFn(int windowFieldIdx) { @@ -142,7 +143,7 @@ public class BeamAggregationTransforms implements Serializable{ } @Override - public Instant apply(BeamSqlRow input) { + public Instant apply(BeamRecord input) { return new Instant(input.getDate(windowFieldIdx).getTime()); } } @@ -151,13 +152,13 @@ public class BeamAggregationTransforms implements Serializable{ * An adaptor class to invoke Calcite UDAF instances in Beam {@code CombineFn}. */ public static class AggregationAdaptor - extends CombineFn<BeamSqlRow, AggregationAccumulator, BeamSqlRow> { + extends CombineFn<BeamRecord, AggregationAccumulator, BeamRecord> { private List<BeamSqlUdaf> aggregators; private List<BeamSqlExpression> sourceFieldExps; - private BeamSqlRowType finalRowType; + private BeamSqlRecordType finalRowType; public AggregationAdaptor(List<AggregateCall> aggregationCalls, - BeamSqlRowType sourceRowType) { + BeamSqlRecordType sourceRowType) { aggregators = new ArrayList<>(); sourceFieldExps = new ArrayList<>(); List<String> outFieldsName = new ArrayList<>(); @@ -206,7 +207,7 @@ public class BeamAggregationTransforms implements Serializable{ break; } } - finalRowType = BeamSqlRowType.create(outFieldsName, outFieldsType); + finalRowType = BeamSqlRecordType.create(outFieldsName, outFieldsType); } @Override public AggregationAccumulator createAccumulator() { @@ -217,7 +218,7 @@ public class BeamAggregationTransforms implements Serializable{ return initialAccu; } @Override - public AggregationAccumulator addInput(AggregationAccumulator accumulator, BeamSqlRow input) { + public AggregationAccumulator addInput(AggregationAccumulator accumulator, BeamRecord input) { AggregationAccumulator deltaAcc = new AggregationAccumulator(); for (int idx = 0; idx < aggregators.size(); ++idx) { deltaAcc.accumulatorElements.add( @@ -240,8 +241,8 @@ public class BeamAggregationTransforms implements Serializable{ return deltaAcc; } @Override - public BeamSqlRow extractOutput(AggregationAccumulator accumulator) { - BeamSqlRow result = new BeamSqlRow(finalRowType); + public BeamRecord extractOutput(AggregationAccumulator accumulator) { + BeamRecord result = new BeamRecord(finalRowType); for (int idx = 0; idx < aggregators.size(); ++idx) { result.addField(idx, aggregators.get(idx).result(accumulator.accumulatorElements.get(idx))); } @@ -249,7 +250,7 @@ public class BeamAggregationTransforms implements Serializable{ } @Override public Coder<AggregationAccumulator> getAccumulatorCoder( - CoderRegistry registry, Coder<BeamSqlRow> inputCoder) + CoderRegistry registry, Coder<BeamRecord> inputCoder) throws CannotProvideCoderException { registry.registerCoderForClass(BigDecimal.class, BigDecimalCoder.of()); List<Coder> aggAccuCoderList = new ArrayList<>(); http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamJoinTransforms.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamJoinTransforms.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamJoinTransforms.java index e0898d1..105bbf3 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamJoinTransforms.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamJoinTransforms.java @@ -22,10 +22,11 @@ import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.Map; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType; +import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordHelper; +import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.SimpleFunction; +import org.apache.beam.sdk.values.BeamRecord; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollectionView; import org.apache.calcite.rel.core.JoinRelType; @@ -40,7 +41,7 @@ public class BeamJoinTransforms { * A {@code SimpleFunction} to extract join fields from the specified row. */ public static class ExtractJoinFields - extends SimpleFunction<BeamSqlRow, KV<BeamSqlRow, BeamSqlRow>> { + extends SimpleFunction<BeamRecord, KV<BeamRecord, BeamRecord>> { private final boolean isLeft; private final List<Pair<Integer, Integer>> joinColumns; @@ -49,7 +50,7 @@ public class BeamJoinTransforms { this.joinColumns = joinColumns; } - @Override public KV<BeamSqlRow, BeamSqlRow> apply(BeamSqlRow input) { + @Override public KV<BeamRecord, BeamRecord> apply(BeamRecord input) { // build the type // the name of the join field is not important List<String> names = new ArrayList<>(joinColumns.size()); @@ -57,13 +58,15 @@ public class BeamJoinTransforms { for (int i = 0; i < joinColumns.size(); i++) { names.add("c" + i); types.add(isLeft - ? input.getDataType().getFieldsType().get(joinColumns.get(i).getKey()) : - input.getDataType().getFieldsType().get(joinColumns.get(i).getValue())); + ? BeamSqlRecordHelper.getSqlRecordType(input).getFieldsType() + .get(joinColumns.get(i).getKey()) + : BeamSqlRecordHelper.getSqlRecordType(input).getFieldsType() + .get(joinColumns.get(i).getValue())); } - BeamSqlRowType type = BeamSqlRowType.create(names, types); + BeamSqlRecordType type = BeamSqlRecordType.create(names, types); // build the row - BeamSqlRow row = new BeamSqlRow(type); + BeamRecord row = new BeamRecord(type); for (int i = 0; i < joinColumns.size(); i++) { row.addField(i, input .getFieldValue(isLeft ? joinColumns.get(i).getKey() : joinColumns.get(i).getValue())); @@ -76,14 +79,14 @@ public class BeamJoinTransforms { /** * A {@code DoFn} which implement the sideInput-JOIN. */ - public static class SideInputJoinDoFn extends DoFn<KV<BeamSqlRow, BeamSqlRow>, BeamSqlRow> { - private final PCollectionView<Map<BeamSqlRow, Iterable<BeamSqlRow>>> sideInputView; + public static class SideInputJoinDoFn extends DoFn<KV<BeamRecord, BeamRecord>, BeamRecord> { + private final PCollectionView<Map<BeamRecord, Iterable<BeamRecord>>> sideInputView; private final JoinRelType joinType; - private final BeamSqlRow rightNullRow; + private final BeamRecord rightNullRow; private final boolean swap; - public SideInputJoinDoFn(JoinRelType joinType, BeamSqlRow rightNullRow, - PCollectionView<Map<BeamSqlRow, Iterable<BeamSqlRow>>> sideInputView, + public SideInputJoinDoFn(JoinRelType joinType, BeamRecord rightNullRow, + PCollectionView<Map<BeamRecord, Iterable<BeamRecord>>> sideInputView, boolean swap) { this.joinType = joinType; this.rightNullRow = rightNullRow; @@ -92,13 +95,13 @@ public class BeamJoinTransforms { } @ProcessElement public void processElement(ProcessContext context) { - BeamSqlRow key = context.element().getKey(); - BeamSqlRow leftRow = context.element().getValue(); - Map<BeamSqlRow, Iterable<BeamSqlRow>> key2Rows = context.sideInput(sideInputView); - Iterable<BeamSqlRow> rightRowsIterable = key2Rows.get(key); + BeamRecord key = context.element().getKey(); + BeamRecord leftRow = context.element().getValue(); + Map<BeamRecord, Iterable<BeamRecord>> key2Rows = context.sideInput(sideInputView); + Iterable<BeamRecord> rightRowsIterable = key2Rows.get(key); if (rightRowsIterable != null && rightRowsIterable.iterator().hasNext()) { - Iterator<BeamSqlRow> it = rightRowsIterable.iterator(); + Iterator<BeamRecord> it = rightRowsIterable.iterator(); while (it.hasNext()) { context.output(combineTwoRowsIntoOne(leftRow, it.next(), swap)); } @@ -115,11 +118,11 @@ public class BeamJoinTransforms { * A {@code SimpleFunction} to combine two rows into one. */ public static class JoinParts2WholeRow - extends SimpleFunction<KV<BeamSqlRow, KV<BeamSqlRow, BeamSqlRow>>, BeamSqlRow> { - @Override public BeamSqlRow apply(KV<BeamSqlRow, KV<BeamSqlRow, BeamSqlRow>> input) { - KV<BeamSqlRow, BeamSqlRow> parts = input.getValue(); - BeamSqlRow leftRow = parts.getKey(); - BeamSqlRow rightRow = parts.getValue(); + extends SimpleFunction<KV<BeamRecord, KV<BeamRecord, BeamRecord>>, BeamRecord> { + @Override public BeamRecord apply(KV<BeamRecord, KV<BeamRecord, BeamRecord>> input) { + KV<BeamRecord, BeamRecord> parts = input.getValue(); + BeamRecord leftRow = parts.getKey(); + BeamRecord rightRow = parts.getValue(); return combineTwoRowsIntoOne(leftRow, rightRow, false); } } @@ -127,8 +130,8 @@ public class BeamJoinTransforms { /** * As the method name suggests: combine two rows into one wide row. */ - private static BeamSqlRow combineTwoRowsIntoOne(BeamSqlRow leftRow, - BeamSqlRow rightRow, boolean swap) { + private static BeamRecord combineTwoRowsIntoOne(BeamRecord leftRow, + BeamRecord rightRow, boolean swap) { if (swap) { return combineTwoRowsIntoOneHelper(rightRow, leftRow); } else { @@ -139,19 +142,19 @@ public class BeamJoinTransforms { /** * As the method name suggests: combine two rows into one wide row. */ - private static BeamSqlRow combineTwoRowsIntoOneHelper(BeamSqlRow leftRow, - BeamSqlRow rightRow) { + private static BeamRecord combineTwoRowsIntoOneHelper(BeamRecord leftRow, + BeamRecord rightRow) { // build the type List<String> names = new ArrayList<>(leftRow.size() + rightRow.size()); names.addAll(leftRow.getDataType().getFieldsName()); names.addAll(rightRow.getDataType().getFieldsName()); List<Integer> types = new ArrayList<>(leftRow.size() + rightRow.size()); - types.addAll(leftRow.getDataType().getFieldsType()); - types.addAll(rightRow.getDataType().getFieldsType()); - BeamSqlRowType type = BeamSqlRowType.create(names, types); + types.addAll(BeamSqlRecordHelper.getSqlRecordType(leftRow).getFieldsType()); + types.addAll(BeamSqlRecordHelper.getSqlRecordType(rightRow).getFieldsType()); + BeamSqlRecordType type = BeamSqlRecordType.create(names, types); - BeamSqlRow row = new BeamSqlRow(type); + BeamRecord row = new BeamRecord(type); // build the row for (int i = 0; i < leftRow.size(); i++) { row.addField(i, leftRow.getFieldValue(i)); http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSetOperatorsTransforms.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSetOperatorsTransforms.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSetOperatorsTransforms.java index 326b328..33ac807 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSetOperatorsTransforms.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSetOperatorsTransforms.java @@ -20,10 +20,10 @@ package org.apache.beam.sdk.extensions.sql.impl.transform; import java.util.Iterator; import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSetOperatorRelBase; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.SimpleFunction; import org.apache.beam.sdk.transforms.join.CoGbkResult; +import org.apache.beam.sdk.values.BeamRecord; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.TupleTag; @@ -35,8 +35,8 @@ public abstract class BeamSetOperatorsTransforms { * Transform a {@code BeamSqlRow} to a {@code KV<BeamSqlRow, BeamSqlRow>}. */ public static class BeamSqlRow2KvFn extends - SimpleFunction<BeamSqlRow, KV<BeamSqlRow, BeamSqlRow>> { - @Override public KV<BeamSqlRow, BeamSqlRow> apply(BeamSqlRow input) { + SimpleFunction<BeamRecord, KV<BeamRecord, BeamRecord>> { + @Override public KV<BeamRecord, BeamRecord> apply(BeamRecord input) { return KV.of(input, input); } } @@ -45,14 +45,14 @@ public abstract class BeamSetOperatorsTransforms { * Filter function used for Set operators. */ public static class SetOperatorFilteringDoFn extends - DoFn<KV<BeamSqlRow, CoGbkResult>, BeamSqlRow> { - private TupleTag<BeamSqlRow> leftTag; - private TupleTag<BeamSqlRow> rightTag; + DoFn<KV<BeamRecord, CoGbkResult>, BeamRecord> { + private TupleTag<BeamRecord> leftTag; + private TupleTag<BeamRecord> rightTag; private BeamSetOperatorRelBase.OpType opType; // ALL? private boolean all; - public SetOperatorFilteringDoFn(TupleTag<BeamSqlRow> leftTag, TupleTag<BeamSqlRow> rightTag, + public SetOperatorFilteringDoFn(TupleTag<BeamRecord> leftTag, TupleTag<BeamRecord> rightTag, BeamSetOperatorRelBase.OpType opType, boolean all) { this.leftTag = leftTag; this.rightTag = rightTag; @@ -62,13 +62,13 @@ public abstract class BeamSetOperatorsTransforms { @ProcessElement public void processElement(ProcessContext ctx) { CoGbkResult coGbkResult = ctx.element().getValue(); - Iterable<BeamSqlRow> leftRows = coGbkResult.getAll(leftTag); - Iterable<BeamSqlRow> rightRows = coGbkResult.getAll(rightTag); + Iterable<BeamRecord> leftRows = coGbkResult.getAll(leftTag); + Iterable<BeamRecord> rightRows = coGbkResult.getAll(rightTag); switch (opType) { case UNION: if (all) { // output both left & right - Iterator<BeamSqlRow> iter = leftRows.iterator(); + Iterator<BeamRecord> iter = leftRows.iterator(); while (iter.hasNext()) { ctx.output(iter.next()); } @@ -84,7 +84,7 @@ public abstract class BeamSetOperatorsTransforms { case INTERSECT: if (leftRows.iterator().hasNext() && rightRows.iterator().hasNext()) { if (all) { - for (BeamSqlRow leftRow : leftRows) { + for (BeamRecord leftRow : leftRows) { ctx.output(leftRow); } } else { @@ -94,7 +94,7 @@ public abstract class BeamSetOperatorsTransforms { break; case MINUS: if (leftRows.iterator().hasNext() && !rightRows.iterator().hasNext()) { - Iterator<BeamSqlRow> iter = leftRows.iterator(); + Iterator<BeamRecord> iter = leftRows.iterator(); if (all) { // output all while (iter.hasNext()) { http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSqlFilterFn.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSqlFilterFn.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSqlFilterFn.java index 855de7a..31efeb7 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSqlFilterFn.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSqlFilterFn.java @@ -20,14 +20,14 @@ package org.apache.beam.sdk.extensions.sql.impl.transform; import java.util.List; import org.apache.beam.sdk.extensions.sql.impl.interpreter.BeamSqlExpressionExecutor; import org.apache.beam.sdk.extensions.sql.impl.rel.BeamFilterRel; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.values.BeamRecord; /** * {@code BeamSqlFilterFn} is the executor for a {@link BeamFilterRel} step. * */ -public class BeamSqlFilterFn extends DoFn<BeamSqlRow, BeamSqlRow> { +public class BeamSqlFilterFn extends DoFn<BeamRecord, BeamRecord> { private String stepName; private BeamSqlExpressionExecutor executor; @@ -45,7 +45,7 @@ public class BeamSqlFilterFn extends DoFn<BeamSqlRow, BeamSqlRow> { @ProcessElement public void processElement(ProcessContext c) { - BeamSqlRow in = c.element(); + BeamRecord in = c.element(); List<Object> result = executor.execute(in); http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSqlOutputToConsoleFn.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSqlOutputToConsoleFn.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSqlOutputToConsoleFn.java index b40cfa6..f97a90a 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSqlOutputToConsoleFn.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSqlOutputToConsoleFn.java @@ -17,14 +17,14 @@ */ package org.apache.beam.sdk.extensions.sql.impl.transform; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.values.BeamRecord; /** * A test PTransform to display output in console. * */ -public class BeamSqlOutputToConsoleFn extends DoFn<BeamSqlRow, Void> { +public class BeamSqlOutputToConsoleFn extends DoFn<BeamRecord, Void> { private String stepName; http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSqlProjectFn.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSqlProjectFn.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSqlProjectFn.java index b3f7ce5..a95c743 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSqlProjectFn.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSqlProjectFn.java @@ -20,24 +20,24 @@ package org.apache.beam.sdk.extensions.sql.impl.transform; import java.util.List; import org.apache.beam.sdk.extensions.sql.impl.interpreter.BeamSqlExpressionExecutor; import org.apache.beam.sdk.extensions.sql.impl.rel.BeamProjectRel; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType; +import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType; import org.apache.beam.sdk.extensions.sql.schema.BeamTableUtils; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.values.BeamRecord; /** * * {@code BeamSqlProjectFn} is the executor for a {@link BeamProjectRel} step. * */ -public class BeamSqlProjectFn extends DoFn<BeamSqlRow, BeamSqlRow> { +public class BeamSqlProjectFn extends DoFn<BeamRecord, BeamRecord> { private String stepName; private BeamSqlExpressionExecutor executor; - private BeamSqlRowType outputRowType; + private BeamSqlRecordType outputRowType; public BeamSqlProjectFn(String stepName, BeamSqlExpressionExecutor executor, - BeamSqlRowType outputRowType) { + BeamSqlRecordType outputRowType) { super(); this.stepName = stepName; this.executor = executor; @@ -51,10 +51,10 @@ public class BeamSqlProjectFn extends DoFn<BeamSqlRow, BeamSqlRow> { @ProcessElement public void processElement(ProcessContext c, BoundedWindow window) { - BeamSqlRow inputRow = c.element(); + BeamRecord inputRow = c.element(); List<Object> results = executor.execute(inputRow); - BeamSqlRow outRow = new BeamSqlRow(outputRowType); + BeamRecord outRow = new BeamRecord(outputRowType); outRow.updateWindowRange(inputRow, window); for (int idx = 0; idx < results.size(); ++idx) { http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java index b80e045..bf96e85 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java @@ -23,7 +23,7 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType; +import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType; import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rel.type.RelDataTypeFactory; import org.apache.calcite.rel.type.RelDataTypeField; @@ -78,27 +78,27 @@ public class CalciteUtils { /** * Get the {@code SqlTypeName} for the specified column of a table. */ - public static SqlTypeName getFieldType(BeamSqlRowType schema, int index) { + public static SqlTypeName getFieldType(BeamSqlRecordType schema, int index) { return toCalciteType(schema.getFieldsType().get(index)); } /** * Generate {@code BeamSqlRowType} from {@code RelDataType} which is used to create table. */ - public static BeamSqlRowType toBeamRowType(RelDataType tableInfo) { + public static BeamSqlRecordType toBeamRowType(RelDataType tableInfo) { List<String> fieldNames = new ArrayList<>(); List<Integer> fieldTypes = new ArrayList<>(); for (RelDataTypeField f : tableInfo.getFieldList()) { fieldNames.add(f.getName()); fieldTypes.add(toJavaType(f.getType().getSqlTypeName())); } - return BeamSqlRowType.create(fieldNames, fieldTypes); + return BeamSqlRecordType.create(fieldNames, fieldTypes); } /** * Create an instance of {@code RelDataType} so it can be used to create a table. */ - public static RelProtoDataType toCalciteRowType(final BeamSqlRowType that) { + public static RelProtoDataType toCalciteRowType(final BeamSqlRecordType that) { return new RelProtoDataType() { @Override public RelDataType apply(RelDataTypeFactory a) { http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BaseBeamTable.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BaseBeamTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BaseBeamTable.java index bf41c95..68b120e 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BaseBeamTable.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BaseBeamTable.java @@ -23,12 +23,12 @@ import java.io.Serializable; * Each IO in Beam has one table schema, by extending {@link BaseBeamTable}. */ public abstract class BaseBeamTable implements BeamSqlTable, Serializable { - protected BeamSqlRowType beamSqlRowType; - public BaseBeamTable(BeamSqlRowType beamSqlRowType) { + protected BeamSqlRecordType beamSqlRowType; + public BaseBeamTable(BeamSqlRecordType beamSqlRowType) { this.beamSqlRowType = beamSqlRowType; } - @Override public BeamSqlRowType getRowType() { + @Override public BeamSqlRecordType getRowType() { return beamSqlRowType; } } http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamPCollectionTable.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamPCollectionTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamPCollectionTable.java index 5bbb8fd..68905b5 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamPCollectionTable.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamPCollectionTable.java @@ -19,6 +19,7 @@ package org.apache.beam.sdk.extensions.sql.schema; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.values.BeamRecord; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollection.IsBounded; import org.apache.beam.sdk.values.PDone; @@ -29,14 +30,14 @@ import org.apache.beam.sdk.values.PDone; */ public class BeamPCollectionTable extends BaseBeamTable { private BeamIOType ioType; - private transient PCollection<BeamSqlRow> upstream; + private transient PCollection<BeamRecord> upstream; - protected BeamPCollectionTable(BeamSqlRowType beamSqlRowType) { + protected BeamPCollectionTable(BeamSqlRecordType beamSqlRowType) { super(beamSqlRowType); } - public BeamPCollectionTable(PCollection<BeamSqlRow> upstream, - BeamSqlRowType beamSqlRowType){ + public BeamPCollectionTable(PCollection<BeamRecord> upstream, + BeamSqlRecordType beamSqlRowType){ this(beamSqlRowType); ioType = upstream.isBounded().equals(IsBounded.BOUNDED) ? BeamIOType.BOUNDED : BeamIOType.UNBOUNDED; @@ -49,12 +50,12 @@ public class BeamPCollectionTable extends BaseBeamTable { } @Override - public PCollection<BeamSqlRow> buildIOReader(Pipeline pipeline) { + public PCollection<BeamRecord> buildIOReader(Pipeline pipeline) { return upstream; } @Override - public PTransform<? super PCollection<BeamSqlRow>, PDone> buildIOWriter() { + public PTransform<? super PCollection<BeamRecord>, PDone> buildIOWriter() { throw new IllegalArgumentException("cannot use [BeamPCollectionTable] as target"); }
