refactor BeamRecord, BeamRecordType, BeamSqlRecordType, BeamRecordCoder
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/89109b8c Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/89109b8c Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/89109b8c Branch: refs/heads/DSL_SQL Commit: 89109b8cdc667c4e07529e9748ed4290e88b9282 Parents: 129ae96 Author: mingmxu <[email protected]> Authored: Thu Aug 3 12:11:06 2017 -0700 Committer: Robert Bradshaw <[email protected]> Committed: Fri Aug 4 10:08:37 2017 -0700 ---------------------------------------------------------------------- .../apache/beam/sdk/coders/BeamRecordCoder.java | 20 +- .../org/apache/beam/sdk/values/BeamRecord.java | 22 +- .../apache/beam/sdk/values/BeamRecordType.java | 70 ++++++ .../beam/sdk/values/BeamRecordTypeProvider.java | 59 ----- .../apache/beam/sdk/extensions/sql/BeamSql.java | 22 +- .../beam/sdk/extensions/sql/BeamSqlCli.java | 8 +- .../beam/sdk/extensions/sql/BeamSqlEnv.java | 6 +- .../extensions/sql/example/BeamSqlExample.java | 27 ++- .../interpreter/BeamSqlExpressionExecutor.java | 6 +- .../sql/impl/interpreter/BeamSqlFnExecutor.java | 6 +- .../operator/BeamSqlCaseExpression.java | 4 +- .../operator/BeamSqlCastExpression.java | 4 +- .../interpreter/operator/BeamSqlExpression.java | 8 +- .../operator/BeamSqlInputRefExpression.java | 4 +- .../interpreter/operator/BeamSqlPrimitive.java | 6 +- .../operator/BeamSqlReinterpretExpression.java | 4 +- .../operator/BeamSqlUdfExpression.java | 4 +- .../operator/BeamSqlWindowEndExpression.java | 4 +- .../operator/BeamSqlWindowExpression.java | 4 +- .../operator/BeamSqlWindowStartExpression.java | 4 +- .../arithmetic/BeamSqlArithmeticExpression.java | 4 +- .../comparison/BeamSqlCompareExpression.java | 4 +- .../comparison/BeamSqlIsNotNullExpression.java | 4 +- .../comparison/BeamSqlIsNullExpression.java | 4 +- .../date/BeamSqlCurrentDateExpression.java | 4 +- .../date/BeamSqlCurrentTimeExpression.java | 4 +- .../date/BeamSqlCurrentTimestampExpression.java | 4 +- .../date/BeamSqlDateCeilExpression.java | 4 +- .../date/BeamSqlDateFloorExpression.java | 4 +- .../operator/date/BeamSqlExtractExpression.java | 4 +- .../operator/logical/BeamSqlAndExpression.java | 4 +- .../operator/logical/BeamSqlNotExpression.java | 4 +- .../operator/logical/BeamSqlOrExpression.java | 4 +- .../math/BeamSqlMathBinaryExpression.java | 4 +- .../math/BeamSqlMathUnaryExpression.java | 4 +- .../operator/math/BeamSqlPiExpression.java | 4 +- .../operator/math/BeamSqlRandExpression.java | 4 +- .../math/BeamSqlRandIntegerExpression.java | 4 +- .../string/BeamSqlCharLengthExpression.java | 4 +- .../string/BeamSqlConcatExpression.java | 4 +- .../string/BeamSqlInitCapExpression.java | 4 +- .../operator/string/BeamSqlLowerExpression.java | 4 +- .../string/BeamSqlOverlayExpression.java | 4 +- .../string/BeamSqlPositionExpression.java | 4 +- .../string/BeamSqlSubstringExpression.java | 4 +- .../operator/string/BeamSqlTrimExpression.java | 4 +- .../operator/string/BeamSqlUpperExpression.java | 4 +- .../sql/impl/planner/BeamQueryPlanner.java | 4 +- .../sql/impl/rel/BeamAggregationRel.java | 38 ++-- .../extensions/sql/impl/rel/BeamFilterRel.java | 11 +- .../extensions/sql/impl/rel/BeamIOSinkRel.java | 6 +- .../sql/impl/rel/BeamIOSourceRel.java | 13 +- .../sql/impl/rel/BeamIntersectRel.java | 4 +- .../extensions/sql/impl/rel/BeamJoinRel.java | 75 ++++--- .../extensions/sql/impl/rel/BeamMinusRel.java | 4 +- .../extensions/sql/impl/rel/BeamProjectRel.java | 11 +- .../extensions/sql/impl/rel/BeamRelNode.java | 4 +- .../sql/impl/rel/BeamSetOperatorRelBase.java | 18 +- .../extensions/sql/impl/rel/BeamSortRel.java | 49 ++--- .../extensions/sql/impl/rel/BeamUnionRel.java | 4 +- .../extensions/sql/impl/rel/BeamValuesRel.java | 15 +- .../transform/BeamAggregationTransforms.java | 47 ++-- .../sql/impl/transform/BeamJoinTransforms.java | 65 +++--- .../transform/BeamSetOperatorsTransforms.java | 24 +- .../sql/impl/transform/BeamSqlFilterFn.java | 6 +- .../transform/BeamSqlOutputToConsoleFn.java | 4 +- .../sql/impl/transform/BeamSqlProjectFn.java | 14 +- .../extensions/sql/impl/utils/CalciteUtils.java | 10 +- .../extensions/sql/schema/BaseBeamTable.java | 6 +- .../sql/schema/BeamPCollectionTable.java | 13 +- .../sql/schema/BeamSqlRecordHelper.java | 217 +++++++++++++++++++ .../sql/schema/BeamSqlRecordType.java | 168 ++++++++++++++ .../sdk/extensions/sql/schema/BeamSqlRow.java | 41 ---- .../extensions/sql/schema/BeamSqlRowCoder.java | 186 ---------------- .../extensions/sql/schema/BeamSqlRowType.java | 109 ---------- .../sdk/extensions/sql/schema/BeamSqlTable.java | 7 +- .../extensions/sql/schema/BeamTableUtils.java | 14 +- .../sql/schema/kafka/BeamKafkaCSVTable.java | 38 ++-- .../sql/schema/kafka/BeamKafkaTable.java | 20 +- .../sql/schema/text/BeamTextCSVTable.java | 12 +- .../schema/text/BeamTextCSVTableIOReader.java | 14 +- .../schema/text/BeamTextCSVTableIOWriter.java | 16 +- .../sql/schema/text/BeamTextTable.java | 4 +- .../sql/BeamSqlDslAggregationTest.java | 80 +++---- .../beam/sdk/extensions/sql/BeamSqlDslBase.java | 51 +++-- .../extensions/sql/BeamSqlDslFilterTest.java | 26 +-- .../sdk/extensions/sql/BeamSqlDslJoinTest.java | 26 +-- .../extensions/sql/BeamSqlDslProjectTest.java | 64 +++--- .../extensions/sql/BeamSqlDslUdfUdafTest.java | 24 +- .../beam/sdk/extensions/sql/TestUtils.java | 30 +-- .../interpreter/BeamSqlFnExecutorTestBase.java | 10 +- .../sql/impl/rel/BeamIntersectRelTest.java | 6 +- .../rel/BeamJoinRelBoundedVsBoundedTest.java | 10 +- .../rel/BeamJoinRelUnboundedVsBoundedTest.java | 10 +- .../BeamJoinRelUnboundedVsUnboundedTest.java | 10 +- .../sql/impl/rel/BeamMinusRelTest.java | 6 +- .../impl/rel/BeamSetOperatorRelBaseTest.java | 4 +- .../sql/impl/rel/BeamSortRelTest.java | 12 +- .../sql/impl/rel/BeamUnionRelTest.java | 6 +- .../sql/impl/rel/BeamValuesRelTest.java | 8 +- .../sdk/extensions/sql/impl/rel/CheckSize.java | 8 +- ...mSqlBuiltinFunctionsIntegrationTestBase.java | 17 +- ...amSqlComparisonOperatorsIntegrationTest.java | 11 +- .../BeamSqlDateFunctionsIntegrationTest.java | 12 +- .../extensions/sql/mock/MockedBoundedTable.java | 24 +- .../sdk/extensions/sql/mock/MockedTable.java | 8 +- .../sql/mock/MockedUnboundedTable.java | 18 +- .../sql/schema/BeamSqlRowCoderTest.java | 8 +- .../sql/schema/kafka/BeamKafkaCSVTableTest.java | 14 +- .../sql/schema/text/BeamTextCSVTableTest.java | 16 +- .../transform/BeamAggregationTransformTest.java | 72 +++--- .../schema/transform/BeamTransformBaseTest.java | 18 +- 112 files changed, 1171 insertions(+), 1129 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BeamRecordCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BeamRecordCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BeamRecordCoder.java index 27f92ce..06958a4 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BeamRecordCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BeamRecordCoder.java @@ -24,7 +24,7 @@ import java.util.BitSet; import java.util.List; import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.values.BeamRecord; -import org.apache.beam.sdk.values.BeamRecordTypeProvider; +import org.apache.beam.sdk.values.BeamRecordType; /** * A {@link Coder} for {@link BeamRecord}. It wraps the {@link Coder} for each element directly. @@ -34,31 +34,35 @@ public class BeamRecordCoder extends CustomCoder<BeamRecord> { private static final BitSetCoder nullListCoder = BitSetCoder.of(); private static final InstantCoder instantCoder = InstantCoder.of(); - private BeamRecordTypeProvider recordType; + private BeamRecordType recordType; private List<Coder> coderArray; - private BeamRecordCoder(BeamRecordTypeProvider recordType, List<Coder> coderArray) { + private BeamRecordCoder(BeamRecordType recordType, List<Coder> coderArray) { this.recordType = recordType; this.coderArray = coderArray; } - public static BeamRecordCoder of(BeamRecordTypeProvider recordType, List<Coder> coderArray){ + public static BeamRecordCoder of(BeamRecordType recordType, List<Coder> coderArray){ if (recordType.size() != coderArray.size()) { throw new IllegalArgumentException("Coder size doesn't match with field size"); } return new BeamRecordCoder(recordType, coderArray); } + public BeamRecordType getRecordType() { + return recordType; + } + @Override public void encode(BeamRecord value, OutputStream outStream) throws CoderException, IOException { nullListCoder.encode(value.getNullFields(), outStream); for (int idx = 0; idx < value.size(); ++idx) { - if (value.getNullFields().get(idx)) { + if (value.isNull(idx)) { continue; } - coderArray.get(idx).encode(value.getInteger(idx), outStream); + coderArray.get(idx).encode(value.getFieldValue(idx), outStream); } instantCoder.encode(value.getWindowStart(), outStream); @@ -70,7 +74,6 @@ public class BeamRecordCoder extends CustomCoder<BeamRecord> { BitSet nullFields = nullListCoder.decode(inStream); BeamRecord record = new BeamRecord(recordType); - record.setNullFields(nullFields); for (int idx = 0; idx < recordType.size(); ++idx) { if (nullFields.get(idx)) { continue; @@ -88,5 +91,8 @@ public class BeamRecordCoder extends CustomCoder<BeamRecord> { @Override public void verifyDeterministic() throws org.apache.beam.sdk.coders.Coder.NonDeterministicException { + for (Coder c : coderArray) { + c.verifyDeterministic(); + } } } http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecord.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecord.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecord.java index 476233e..bac649e 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecord.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecord.java @@ -32,7 +32,7 @@ import org.joda.time.Instant; /** * {@link org.apache.beam.sdk.values.BeamRecord}, self-described with - * {@link BeamRecordTypeProvider}, represents one element in a + * {@link BeamRecordType}, represents one element in a * {@link org.apache.beam.sdk.values.PCollection}. */ @Experimental @@ -40,12 +40,12 @@ public class BeamRecord implements Serializable { private List<Object> dataValues; //null values are indexed here, to handle properly in Coder. private BitSet nullFields; - private BeamRecordTypeProvider dataType; + private BeamRecordType dataType; private Instant windowStart = new Instant(TimeUnit.MICROSECONDS.toMillis(Long.MIN_VALUE)); private Instant windowEnd = new Instant(TimeUnit.MICROSECONDS.toMillis(Long.MAX_VALUE)); - public BeamRecord(BeamRecordTypeProvider dataType) { + public BeamRecord(BeamRecordType dataType) { this.dataType = dataType; this.nullFields = new BitSet(dataType.size()); this.dataValues = new ArrayList<>(); @@ -55,7 +55,7 @@ public class BeamRecord implements Serializable { } } - public BeamRecord(BeamRecordTypeProvider dataType, List<Object> dataValues) { + public BeamRecord(BeamRecordType dataType, List<Object> dataValues) { this(dataType); for (int idx = 0; idx < dataValues.size(); ++idx) { addField(idx, dataValues.get(idx)); @@ -137,10 +137,6 @@ public class BeamRecord implements Serializable { } public Object getFieldValue(int fieldIdx) { - if (nullFields.get(fieldIdx)) { - return null; - } - return dataValues.get(fieldIdx); } @@ -200,22 +196,14 @@ public class BeamRecord implements Serializable { this.dataValues = dataValues; } - public BeamRecordTypeProvider getDataType() { + public BeamRecordType getDataType() { return dataType; } - public void setDataType(BeamRecordTypeProvider dataType) { - this.dataType = dataType; - } - public BitSet getNullFields() { return nullFields; } - public void setNullFields(BitSet nullFields) { - this.nullFields = nullFields; - } - /** * is the specified field NULL? */ http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecordType.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecordType.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecordType.java new file mode 100644 index 0000000..3b20b50 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecordType.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.values; + +import java.io.Serializable; +import java.util.List; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.coders.BeamRecordCoder; +import org.apache.beam.sdk.coders.Coder; + +/** + * The default type provider used in {@link BeamRecord}. + */ +@Experimental +public class BeamRecordType implements Serializable{ + private List<String> fieldsName; + private List<Coder> fieldsCoder; + + public BeamRecordType(List<String> fieldsName, List<Coder> fieldsCoder) { + this.fieldsName = fieldsName; + this.fieldsCoder = fieldsCoder; + } + + /** + * Validate input fieldValue for a field. + * @throws IllegalArgumentException throw exception when the validation fails. + */ + public void validateValueType(int index, Object fieldValue) + throws IllegalArgumentException{ + //do nothing by default. + } + + /** + * Get the coder for {@link BeamRecordCoder}. + */ + public BeamRecordCoder getRecordCoder(){ + return BeamRecordCoder.of(this, fieldsCoder); + } + + public List<String> getFieldsName(){ + return fieldsName; + } + + public String getFieldByIndex(int index){ + return fieldsName.get(index); + } + + public int findIndexOfField(String fieldName){ + return fieldsName.indexOf(fieldName); + } + + public int size(){ + return fieldsName.size(); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecordTypeProvider.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecordTypeProvider.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecordTypeProvider.java deleted file mode 100644 index 63a961c..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecordTypeProvider.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.values; - -import java.io.Serializable; -import java.util.List; -import org.apache.beam.sdk.annotations.Experimental; - -/** - * The default type provider used in {@link BeamRecord}. - */ -@Experimental -public class BeamRecordTypeProvider implements Serializable{ - private List<String> fieldsName; - - public BeamRecordTypeProvider(List<String> fieldsName) { - this.fieldsName = fieldsName; - } - - /** - * Validate input fieldValue for a field. - * @throws IllegalArgumentException throw exception when the validation fails. - */ - public void validateValueType(int index, Object fieldValue) - throws IllegalArgumentException{ - //do nothing by default. - } - - public List<String> getFieldsName(){ - return fieldsName; - } - - public String getFieldByIndex(int index){ - return fieldsName.get(index); - } - - public int findIndexOfField(String fieldName){ - return fieldsName.indexOf(fieldName); - } - - public int size(){ - return fieldsName.size(); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSql.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSql.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSql.java index 0dabf40..86e4d8d 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSql.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSql.java @@ -19,13 +19,14 @@ package org.apache.beam.sdk.extensions.sql; import com.google.auto.value.AutoValue; import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.coders.BeamRecordCoder; import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode; import org.apache.beam.sdk.extensions.sql.schema.BeamPCollectionTable; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowCoder; +import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType; import org.apache.beam.sdk.extensions.sql.schema.BeamSqlUdaf; import org.apache.beam.sdk.extensions.sql.schema.BeamSqlUdf; import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.values.BeamRecord; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionTuple; import org.apache.beam.sdk.values.TupleTag; @@ -111,7 +112,7 @@ public class BeamSql { */ @AutoValue public abstract static class QueryTransform extends - PTransform<PCollectionTuple, PCollection<BeamSqlRow>> { + PTransform<PCollectionTuple, PCollection<BeamRecord>> { abstract BeamSqlEnv getSqlEnv(); abstract String getSqlQuery(); @@ -143,7 +144,7 @@ public class BeamSql { } @Override - public PCollection<BeamSqlRow> expand(PCollectionTuple input) { + public PCollection<BeamRecord> expand(PCollectionTuple input) { registerTables(input); BeamRelNode beamRelNode = null; @@ -163,11 +164,12 @@ public class BeamSql { //register tables, related with input PCollections. private void registerTables(PCollectionTuple input){ for (TupleTag<?> sourceTag : input.getAll().keySet()) { - PCollection<BeamSqlRow> sourceStream = (PCollection<BeamSqlRow>) input.get(sourceTag); - BeamSqlRowCoder sourceCoder = (BeamSqlRowCoder) sourceStream.getCoder(); + PCollection<BeamRecord> sourceStream = (PCollection<BeamRecord>) input.get(sourceTag); + BeamRecordCoder sourceCoder = (BeamRecordCoder) sourceStream.getCoder(); getSqlEnv().registerTable(sourceTag.getId(), - new BeamPCollectionTable(sourceStream, sourceCoder.getSqlRecordType())); + new BeamPCollectionTable(sourceStream, + (BeamSqlRecordType) sourceCoder.getRecordType())); } } } @@ -178,7 +180,7 @@ public class BeamSql { */ @AutoValue public abstract static class SimpleQueryTransform - extends PTransform<PCollection<BeamSqlRow>, PCollection<BeamSqlRow>> { + extends PTransform<PCollection<BeamRecord>, PCollection<BeamRecord>> { private static final String PCOLLECTION_TABLE_NAME = "PCOLLECTION"; abstract BeamSqlEnv getSqlEnv(); abstract String getSqlQuery(); @@ -232,9 +234,9 @@ public class BeamSql { } @Override - public PCollection<BeamSqlRow> expand(PCollection<BeamSqlRow> input) { + public PCollection<BeamRecord> expand(PCollection<BeamRecord> input) { validateQuery(); - return PCollectionTuple.of(new TupleTag<BeamSqlRow>(PCOLLECTION_TABLE_NAME), input) + return PCollectionTuple.of(new TupleTag<BeamRecord>(PCOLLECTION_TABLE_NAME), input) .apply(QueryTransform.builder() .setSqlEnv(getSqlEnv()) .setSqlQuery(getSqlQuery()) http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlCli.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlCli.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlCli.java index 967dee5..a43808e 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlCli.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlCli.java @@ -20,9 +20,9 @@ package org.apache.beam.sdk.extensions.sql; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.values.BeamRecord; import org.apache.beam.sdk.values.PCollection; import org.apache.calcite.plan.RelOptUtil; @@ -43,7 +43,7 @@ public class BeamSqlCli { /** * compile SQL, and return a {@link Pipeline}. */ - public static PCollection<BeamSqlRow> compilePipeline(String sqlStatement, BeamSqlEnv sqlEnv) + public static PCollection<BeamRecord> compilePipeline(String sqlStatement, BeamSqlEnv sqlEnv) throws Exception{ PipelineOptions options = PipelineOptionsFactory.fromArgs(new String[] {}).withValidation() .as(PipelineOptions.class); // FlinkPipelineOptions.class @@ -56,9 +56,9 @@ public class BeamSqlCli { /** * compile SQL, and return a {@link Pipeline}. */ - public static PCollection<BeamSqlRow> compilePipeline(String sqlStatement, Pipeline basePipeline, + public static PCollection<BeamRecord> compilePipeline(String sqlStatement, Pipeline basePipeline, BeamSqlEnv sqlEnv) throws Exception{ - PCollection<BeamSqlRow> resultStream = + PCollection<BeamRecord> resultStream = sqlEnv.planner.compileBeamPipeline(sqlStatement, basePipeline, sqlEnv); return resultStream; } http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlEnv.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlEnv.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlEnv.java index be0b0af..3c5eb36 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlEnv.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlEnv.java @@ -21,7 +21,7 @@ import java.io.Serializable; import org.apache.beam.sdk.extensions.sql.impl.planner.BeamQueryPlanner; import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils; import org.apache.beam.sdk.extensions.sql.schema.BaseBeamTable; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType; +import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType; import org.apache.beam.sdk.extensions.sql.schema.BeamSqlUdaf; import org.apache.beam.sdk.extensions.sql.schema.BeamSqlUdf; import org.apache.calcite.DataContext; @@ -84,8 +84,8 @@ public class BeamSqlEnv implements Serializable{ } private static class BeamCalciteTable implements ScannableTable, Serializable { - private BeamSqlRowType beamSqlRowType; - public BeamCalciteTable(BeamSqlRowType beamSqlRowType) { + private BeamSqlRecordType beamSqlRowType; + public BeamCalciteTable(BeamSqlRecordType beamSqlRowType) { this.beamSqlRowType = beamSqlRowType; } @Override http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/BeamSqlExample.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/BeamSqlExample.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/BeamSqlExample.java index 21e02a7..fbc1fd8 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/BeamSqlExample.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/BeamSqlExample.java @@ -22,14 +22,13 @@ import java.util.Arrays; import java.util.List; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.extensions.sql.BeamSql; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowCoder; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType; +import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.transforms.SimpleFunction; +import org.apache.beam.sdk.values.BeamRecord; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionTuple; @@ -54,39 +53,39 @@ class BeamSqlExample { //define the input row format List<String> fieldNames = Arrays.asList("c1", "c2", "c3"); List<Integer> fieldTypes = Arrays.asList(Types.INTEGER, Types.VARCHAR, Types.DOUBLE); - BeamSqlRowType type = BeamSqlRowType.create(fieldNames, fieldTypes); - BeamSqlRow row = new BeamSqlRow(type); + BeamSqlRecordType type = BeamSqlRecordType.create(fieldNames, fieldTypes); + BeamRecord row = new BeamRecord(type); row.addField(0, 1); row.addField(1, "row"); row.addField(2, 1.0); //create a source PCollection with Create.of(); - PCollection<BeamSqlRow> inputTable = PBegin.in(p).apply(Create.of(row) - .withCoder(new BeamSqlRowCoder(type))); + PCollection<BeamRecord> inputTable = PBegin.in(p).apply(Create.of(row) + .withCoder(type.getRecordCoder())); //Case 1. run a simple SQL query over input PCollection with BeamSql.simpleQuery; - PCollection<BeamSqlRow> outputStream = inputTable.apply( + PCollection<BeamRecord> outputStream = inputTable.apply( BeamSql.simpleQuery("select c1, c2, c3 from PCOLLECTION where c1=1")); //print the output record of case 1; outputStream.apply("log_result", - MapElements.<BeamSqlRow, Void>via(new SimpleFunction<BeamSqlRow, Void>() { - public Void apply(BeamSqlRow input) { + MapElements.<BeamRecord, Void>via(new SimpleFunction<BeamRecord, Void>() { + public Void apply(BeamRecord input) { System.out.println("PCOLLECTION: " + input); return null; } })); //Case 2. run the query with BeamSql.query over result PCollection of case 1. - PCollection<BeamSqlRow> outputStream2 = - PCollectionTuple.of(new TupleTag<BeamSqlRow>("CASE1_RESULT"), outputStream) + PCollection<BeamRecord> outputStream2 = + PCollectionTuple.of(new TupleTag<BeamRecord>("CASE1_RESULT"), outputStream) .apply(BeamSql.query("select c2, c3 from CASE1_RESULT where c1=1")); //print the output record of case 2; outputStream2.apply("log_result", - MapElements.<BeamSqlRow, Void>via(new SimpleFunction<BeamSqlRow, Void>() { + MapElements.<BeamRecord, Void>via(new SimpleFunction<BeamRecord, Void>() { @Override - public Void apply(BeamSqlRow input) { + public Void apply(BeamRecord input) { System.out.println("TABLE_B: " + input); return null; } http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlExpressionExecutor.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlExpressionExecutor.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlExpressionExecutor.java index 1ae6bb3..3cd6d65 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlExpressionExecutor.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlExpressionExecutor.java @@ -19,7 +19,7 @@ package org.apache.beam.sdk.extensions.sql.impl.interpreter; import java.io.Serializable; import java.util.List; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; +import org.apache.beam.sdk.values.BeamRecord; /** * {@code BeamSqlExpressionExecutor} fills the gap between relational @@ -34,10 +34,10 @@ public interface BeamSqlExpressionExecutor extends Serializable { void prepare(); /** - * apply transformation to input record {@link BeamSqlRow}. + * apply transformation to input record {@link BeamRecord}. * */ - List<Object> execute(BeamSqlRow inputRow); + List<Object> execute(BeamRecord inputRow); void close(); } http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutor.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutor.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutor.java index 1f9e0e3..0f77ed8 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutor.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutor.java @@ -88,7 +88,7 @@ import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string.BeamS import org.apache.beam.sdk.extensions.sql.impl.rel.BeamFilterRel; import org.apache.beam.sdk.extensions.sql.impl.rel.BeamProjectRel; import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; +import org.apache.beam.sdk.values.BeamRecord; import org.apache.calcite.rex.RexCall; import org.apache.calcite.rex.RexInputRef; import org.apache.calcite.rex.RexLiteral; @@ -102,7 +102,7 @@ import org.apache.calcite.util.NlsString; /** * Executor based on {@link BeamSqlExpression} and {@link BeamSqlPrimitive}. * {@code BeamSqlFnExecutor} converts a {@link BeamRelNode} to a {@link BeamSqlExpression}, - * which can be evaluated against the {@link BeamSqlRow}. + * which can be evaluated against the {@link BeamRecord}. * */ public class BeamSqlFnExecutor implements BeamSqlExpressionExecutor { @@ -427,7 +427,7 @@ public class BeamSqlFnExecutor implements BeamSqlExpressionExecutor { } @Override - public List<Object> execute(BeamSqlRow inputRow) { + public List<Object> execute(BeamRecord inputRow) { List<Object> results = new ArrayList<>(); for (BeamSqlExpression exp : exps) { results.add(exp.evaluate(inputRow).getValue()); http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlCaseExpression.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlCaseExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlCaseExpression.java index 61e8aae..af48cbe 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlCaseExpression.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlCaseExpression.java @@ -19,7 +19,7 @@ package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator; import java.util.List; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; +import org.apache.beam.sdk.values.BeamRecord; import org.apache.calcite.sql.type.SqlTypeName; /** @@ -48,7 +48,7 @@ public class BeamSqlCaseExpression extends BeamSqlExpression { return true; } - @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) { + @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow) { for (int i = 0; i < operands.size() - 1; i += 2) { if (opValueEvaluated(i, inputRow)) { return BeamSqlPrimitive.of( http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlCastExpression.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlCastExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlCastExpression.java index c98c10d..3786281 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlCastExpression.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlCastExpression.java @@ -21,7 +21,7 @@ package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator; import java.sql.Date; import java.sql.Timestamp; import java.util.List; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; +import org.apache.beam.sdk.values.BeamRecord; import org.apache.calcite.runtime.SqlFunctions; import org.apache.calcite.sql.type.SqlTypeName; import org.joda.time.format.DateTimeFormat; @@ -71,7 +71,7 @@ public class BeamSqlCastExpression extends BeamSqlExpression { } @Override - public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) { + public BeamSqlPrimitive evaluate(BeamRecord inputRow) { SqlTypeName castOutputType = getOutputType(); switch (castOutputType) { case INTEGER: http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlExpression.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlExpression.java index dc5db81..f42a365 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlExpression.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlExpression.java @@ -19,7 +19,7 @@ package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator; import java.io.Serializable; import java.util.List; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; +import org.apache.beam.sdk.values.BeamRecord; import org.apache.calcite.rex.RexNode; import org.apache.calcite.sql.type.SqlTypeName; @@ -49,7 +49,7 @@ public abstract class BeamSqlExpression implements Serializable { return op(idx).getOutputType(); } - public <T> T opValueEvaluated(int idx, BeamSqlRow row) { + public <T> T opValueEvaluated(int idx, BeamRecord row) { return (T) op(idx).evaluate(row).getValue(); } @@ -59,10 +59,10 @@ public abstract class BeamSqlExpression implements Serializable { public abstract boolean accept(); /** - * Apply input record {@link BeamSqlRow} to this expression, + * Apply input record {@link BeamRecord} to this expression, * the output value is wrapped with {@link BeamSqlPrimitive}. */ - public abstract BeamSqlPrimitive evaluate(BeamSqlRow inputRow); + public abstract BeamSqlPrimitive evaluate(BeamRecord inputRow); public List<BeamSqlExpression> getOperands() { return operands; http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlInputRefExpression.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlInputRefExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlInputRefExpression.java index 7aba024..8c3d4d4 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlInputRefExpression.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlInputRefExpression.java @@ -17,7 +17,7 @@ */ package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; +import org.apache.beam.sdk.values.BeamRecord; import org.apache.calcite.sql.type.SqlTypeName; /** @@ -37,7 +37,7 @@ public class BeamSqlInputRefExpression extends BeamSqlExpression { } @Override - public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) { + public BeamSqlPrimitive evaluate(BeamRecord inputRow) { return BeamSqlPrimitive.of(outputType, inputRow.getFieldValue(inputRef)); } } http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlPrimitive.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlPrimitive.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlPrimitive.java index 6380af9..f763898 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlPrimitive.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlPrimitive.java @@ -21,13 +21,13 @@ import java.math.BigDecimal; import java.util.Date; import java.util.GregorianCalendar; import java.util.List; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; +import org.apache.beam.sdk.values.BeamRecord; import org.apache.calcite.sql.type.SqlTypeName; import org.apache.calcite.util.NlsString; /** * {@link BeamSqlPrimitive} is a special, self-reference {@link BeamSqlExpression}. - * It holds the value, and return it directly during {@link #evaluate(BeamSqlRow)}. + * It holds the value, and return it directly during {@link #evaluate(BeamRecord)}. * */ public class BeamSqlPrimitive<T> extends BeamSqlExpression { @@ -145,7 +145,7 @@ public class BeamSqlPrimitive<T> extends BeamSqlExpression { } @Override - public BeamSqlPrimitive<T> evaluate(BeamSqlRow inputRow) { + public BeamSqlPrimitive<T> evaluate(BeamRecord inputRow) { return this; } http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlReinterpretExpression.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlReinterpretExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlReinterpretExpression.java index 243baaa..c1fa2c7 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlReinterpretExpression.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlReinterpretExpression.java @@ -21,7 +21,7 @@ package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator; import java.util.Date; import java.util.GregorianCalendar; import java.util.List; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; +import org.apache.beam.sdk.values.BeamRecord; import org.apache.calcite.sql.type.SqlTypeName; /** @@ -41,7 +41,7 @@ public class BeamSqlReinterpretExpression extends BeamSqlExpression { && SqlTypeName.DATETIME_TYPES.contains(opType(0)); } - @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) { + @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow) { if (opType(0) == SqlTypeName.TIME) { GregorianCalendar date = opValueEvaluated(0, inputRow); return BeamSqlPrimitive.of(outputType, date.getTimeInMillis()); http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlUdfExpression.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlUdfExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlUdfExpression.java index eebb97c..da706f3 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlUdfExpression.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlUdfExpression.java @@ -20,7 +20,7 @@ package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator; import java.lang.reflect.Method; import java.util.ArrayList; import java.util.List; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; +import org.apache.beam.sdk.values.BeamRecord; import org.apache.calcite.sql.type.SqlTypeName; /** @@ -51,7 +51,7 @@ public class BeamSqlUdfExpression extends BeamSqlExpression { } @Override - public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) { + public BeamSqlPrimitive evaluate(BeamRecord inputRow) { if (method == null) { reConstructMethod(); } http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlWindowEndExpression.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlWindowEndExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlWindowEndExpression.java index 0bd68df..2f4c165 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlWindowEndExpression.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlWindowEndExpression.java @@ -18,7 +18,7 @@ package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator; import java.util.Date; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; +import org.apache.beam.sdk.values.BeamRecord; import org.apache.calcite.sql.type.SqlTypeName; /** @@ -34,7 +34,7 @@ public class BeamSqlWindowEndExpression extends BeamSqlExpression { } @Override - public BeamSqlPrimitive<Date> evaluate(BeamSqlRow inputRow) { + public BeamSqlPrimitive<Date> evaluate(BeamRecord inputRow) { return BeamSqlPrimitive.of(SqlTypeName.TIMESTAMP, new Date(inputRow.getWindowEnd().getMillis())); } http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlWindowExpression.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlWindowExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlWindowExpression.java index b560ef8..2f3dd5c 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlWindowExpression.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlWindowExpression.java @@ -19,7 +19,7 @@ package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator; import java.util.Date; import java.util.List; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; +import org.apache.beam.sdk.values.BeamRecord; import org.apache.calcite.sql.type.SqlTypeName; /** @@ -42,7 +42,7 @@ public class BeamSqlWindowExpression extends BeamSqlExpression { } @Override - public BeamSqlPrimitive<Date> evaluate(BeamSqlRow inputRow) { + public BeamSqlPrimitive<Date> evaluate(BeamRecord inputRow) { return BeamSqlPrimitive.of(SqlTypeName.TIMESTAMP, (Date) operands.get(0).evaluate(inputRow).getValue()); } http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlWindowStartExpression.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlWindowStartExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlWindowStartExpression.java index e2c1b34..9186ec0 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlWindowStartExpression.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlWindowStartExpression.java @@ -18,7 +18,7 @@ package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator; import java.util.Date; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; +import org.apache.beam.sdk.values.BeamRecord; import org.apache.calcite.sql.type.SqlTypeName; /** @@ -35,7 +35,7 @@ public class BeamSqlWindowStartExpression extends BeamSqlExpression { } @Override - public BeamSqlPrimitive<Date> evaluate(BeamSqlRow inputRow) { + public BeamSqlPrimitive<Date> evaluate(BeamRecord inputRow) { return BeamSqlPrimitive.of(SqlTypeName.TIMESTAMP, new Date(inputRow.getWindowStart().getMillis())); } http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/arithmetic/BeamSqlArithmeticExpression.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/arithmetic/BeamSqlArithmeticExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/arithmetic/BeamSqlArithmeticExpression.java index b07b28f..fd36457 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/arithmetic/BeamSqlArithmeticExpression.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/arithmetic/BeamSqlArithmeticExpression.java @@ -23,7 +23,7 @@ import java.util.ArrayList; import java.util.List; import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression; import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; +import org.apache.beam.sdk.values.BeamRecord; import org.apache.calcite.sql.type.SqlTypeName; /** @@ -50,7 +50,7 @@ public abstract class BeamSqlArithmeticExpression extends BeamSqlExpression { super(operands, outputType); } - @Override public BeamSqlPrimitive<? extends Number> evaluate(BeamSqlRow inputRow) { + @Override public BeamSqlPrimitive<? extends Number> evaluate(BeamRecord inputRow) { BigDecimal left = BigDecimal.valueOf( Double.valueOf(opValueEvaluated(0, inputRow).toString())); BigDecimal right = BigDecimal.valueOf( http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/comparison/BeamSqlCompareExpression.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/comparison/BeamSqlCompareExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/comparison/BeamSqlCompareExpression.java index 811b582..93032ae 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/comparison/BeamSqlCompareExpression.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/comparison/BeamSqlCompareExpression.java @@ -20,7 +20,7 @@ package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.comparison; import java.util.List; import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression; import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; +import org.apache.beam.sdk.values.BeamRecord; import org.apache.calcite.sql.type.SqlTypeName; /** @@ -51,7 +51,7 @@ public abstract class BeamSqlCompareExpression extends BeamSqlExpression { } @Override - public BeamSqlPrimitive<Boolean> evaluate(BeamSqlRow inputRow) { + public BeamSqlPrimitive<Boolean> evaluate(BeamRecord inputRow) { Object leftValue = operands.get(0).evaluate(inputRow).getValue(); Object rightValue = operands.get(1).evaluate(inputRow).getValue(); switch (operands.get(0).getOutputType()) { http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/comparison/BeamSqlIsNotNullExpression.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/comparison/BeamSqlIsNotNullExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/comparison/BeamSqlIsNotNullExpression.java index 88dc73f..7177d96 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/comparison/BeamSqlIsNotNullExpression.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/comparison/BeamSqlIsNotNullExpression.java @@ -21,7 +21,7 @@ import java.util.Arrays; import java.util.List; import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression; import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; +import org.apache.beam.sdk.values.BeamRecord; import org.apache.calcite.sql.type.SqlTypeName; /** @@ -46,7 +46,7 @@ public class BeamSqlIsNotNullExpression extends BeamSqlExpression { } @Override - public BeamSqlPrimitive<Boolean> evaluate(BeamSqlRow inputRow) { + public BeamSqlPrimitive<Boolean> evaluate(BeamRecord inputRow) { Object leftValue = operands.get(0).evaluate(inputRow).getValue(); return BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, leftValue != null); } http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/comparison/BeamSqlIsNullExpression.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/comparison/BeamSqlIsNullExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/comparison/BeamSqlIsNullExpression.java index b626ce7..c74fcd9 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/comparison/BeamSqlIsNullExpression.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/comparison/BeamSqlIsNullExpression.java @@ -21,7 +21,7 @@ import java.util.Arrays; import java.util.List; import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression; import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; +import org.apache.beam.sdk.values.BeamRecord; import org.apache.calcite.sql.type.SqlTypeName; /** @@ -46,7 +46,7 @@ public class BeamSqlIsNullExpression extends BeamSqlExpression { } @Override - public BeamSqlPrimitive<Boolean> evaluate(BeamSqlRow inputRow) { + public BeamSqlPrimitive<Boolean> evaluate(BeamRecord inputRow) { Object leftValue = operands.get(0).evaluate(inputRow).getValue(); return BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, leftValue == null); } http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlCurrentDateExpression.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlCurrentDateExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlCurrentDateExpression.java index d5793d5..86abe43 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlCurrentDateExpression.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlCurrentDateExpression.java @@ -22,7 +22,7 @@ import java.util.Collections; import java.util.Date; import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression; import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; +import org.apache.beam.sdk.values.BeamRecord; import org.apache.calcite.sql.type.SqlTypeName; /** @@ -38,7 +38,7 @@ public class BeamSqlCurrentDateExpression extends BeamSqlExpression { return getOperands().size() == 0; } - @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) { + @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow) { return BeamSqlPrimitive.of(outputType, new Date()); } } http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlCurrentTimeExpression.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlCurrentTimeExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlCurrentTimeExpression.java index 99eea95..d8de464 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlCurrentTimeExpression.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlCurrentTimeExpression.java @@ -24,7 +24,7 @@ import java.util.List; import java.util.TimeZone; import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression; import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; +import org.apache.beam.sdk.values.BeamRecord; import org.apache.calcite.sql.type.SqlTypeName; /** @@ -44,7 +44,7 @@ public class BeamSqlCurrentTimeExpression extends BeamSqlExpression { return opCount <= 1; } - @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) { + @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow) { GregorianCalendar ret = new GregorianCalendar(TimeZone.getDefault()); ret.setTime(new Date()); return BeamSqlPrimitive.of(outputType, ret); http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlCurrentTimestampExpression.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlCurrentTimestampExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlCurrentTimestampExpression.java index 09a3c60..4736571 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlCurrentTimestampExpression.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlCurrentTimestampExpression.java @@ -22,7 +22,7 @@ import java.util.Date; import java.util.List; import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression; import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; +import org.apache.beam.sdk.values.BeamRecord; import org.apache.calcite.sql.type.SqlTypeName; /** @@ -42,7 +42,7 @@ public class BeamSqlCurrentTimestampExpression extends BeamSqlExpression { return opCount <= 1; } - @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) { + @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow) { return BeamSqlPrimitive.of(outputType, new Date()); } } http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDateCeilExpression.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDateCeilExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDateCeilExpression.java index 55b6fcd..55767fa 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDateCeilExpression.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDateCeilExpression.java @@ -22,7 +22,7 @@ import java.util.Date; import java.util.List; import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression; import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; +import org.apache.beam.sdk.values.BeamRecord; import org.apache.calcite.avatica.util.DateTimeUtils; import org.apache.calcite.avatica.util.TimeUnitRange; import org.apache.calcite.sql.type.SqlTypeName; @@ -41,7 +41,7 @@ public class BeamSqlDateCeilExpression extends BeamSqlExpression { && opType(1) == SqlTypeName.SYMBOL; } - @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) { + @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow) { Date date = opValueEvaluated(0, inputRow); long time = date.getTime(); TimeUnitRange unit = ((BeamSqlPrimitive<TimeUnitRange>) op(1)).getValue(); http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDateFloorExpression.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDateFloorExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDateFloorExpression.java index f031c31..3310da5 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDateFloorExpression.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDateFloorExpression.java @@ -22,7 +22,7 @@ import java.util.Date; import java.util.List; import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression; import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; +import org.apache.beam.sdk.values.BeamRecord; import org.apache.calcite.avatica.util.DateTimeUtils; import org.apache.calcite.avatica.util.TimeUnitRange; import org.apache.calcite.sql.type.SqlTypeName; @@ -41,7 +41,7 @@ public class BeamSqlDateFloorExpression extends BeamSqlExpression { && opType(1) == SqlTypeName.SYMBOL; } - @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) { + @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow) { Date date = opValueEvaluated(0, inputRow); long time = date.getTime(); TimeUnitRange unit = ((BeamSqlPrimitive<TimeUnitRange>) op(1)).getValue(); http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlExtractExpression.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlExtractExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlExtractExpression.java index 2740f82..47cd879 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlExtractExpression.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlExtractExpression.java @@ -25,7 +25,7 @@ import java.util.List; import java.util.Map; import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression; import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; +import org.apache.beam.sdk.values.BeamRecord; import org.apache.calcite.avatica.util.DateTimeUtils; import org.apache.calcite.avatica.util.TimeUnitRange; import org.apache.calcite.sql.type.SqlTypeName; @@ -61,7 +61,7 @@ public class BeamSqlExtractExpression extends BeamSqlExpression { && opType(1) == SqlTypeName.BIGINT; } - @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) { + @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow) { Long time = opValueEvaluated(1, inputRow); TimeUnitRange unit = ((BeamSqlPrimitive<TimeUnitRange>) op(0)).getValue(); http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/logical/BeamSqlAndExpression.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/logical/BeamSqlAndExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/logical/BeamSqlAndExpression.java index 0c8854c..b8964d5 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/logical/BeamSqlAndExpression.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/logical/BeamSqlAndExpression.java @@ -20,7 +20,7 @@ package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.logical; import java.util.List; import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression; import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; +import org.apache.beam.sdk.values.BeamRecord; import org.apache.calcite.sql.type.SqlTypeName; /** @@ -32,7 +32,7 @@ public class BeamSqlAndExpression extends BeamSqlLogicalExpression { } @Override - public BeamSqlPrimitive<Boolean> evaluate(BeamSqlRow inputRow) { + public BeamSqlPrimitive<Boolean> evaluate(BeamRecord inputRow) { boolean result = true; for (BeamSqlExpression exp : operands) { BeamSqlPrimitive<Boolean> expOut = exp.evaluate(inputRow); http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/logical/BeamSqlNotExpression.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/logical/BeamSqlNotExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/logical/BeamSqlNotExpression.java index 65634b0..f9578b9 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/logical/BeamSqlNotExpression.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/logical/BeamSqlNotExpression.java @@ -21,7 +21,7 @@ package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.logical; import java.util.List; import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression; import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; +import org.apache.beam.sdk.values.BeamRecord; import org.apache.calcite.sql.type.SqlTypeName; /** @@ -42,7 +42,7 @@ public class BeamSqlNotExpression extends BeamSqlLogicalExpression { return super.accept(); } - @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) { + @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow) { Boolean value = opValueEvaluated(0, inputRow); if (value == null) { return BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, null); http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/logical/BeamSqlOrExpression.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/logical/BeamSqlOrExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/logical/BeamSqlOrExpression.java index da15c34..88a3916 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/logical/BeamSqlOrExpression.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/logical/BeamSqlOrExpression.java @@ -20,7 +20,7 @@ package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.logical; import java.util.List; import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression; import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; +import org.apache.beam.sdk.values.BeamRecord; import org.apache.calcite.sql.type.SqlTypeName; /** @@ -32,7 +32,7 @@ public class BeamSqlOrExpression extends BeamSqlLogicalExpression { } @Override - public BeamSqlPrimitive<Boolean> evaluate(BeamSqlRow inputRow) { + public BeamSqlPrimitive<Boolean> evaluate(BeamRecord inputRow) { boolean result = false; for (BeamSqlExpression exp : operands) { BeamSqlPrimitive<Boolean> expOut = exp.evaluate(inputRow); http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlMathBinaryExpression.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlMathBinaryExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlMathBinaryExpression.java index c12b725..8f6c00c 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlMathBinaryExpression.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlMathBinaryExpression.java @@ -21,7 +21,7 @@ package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.math; import java.util.List; import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression; import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; +import org.apache.beam.sdk.values.BeamRecord; import org.apache.calcite.sql.type.SqlTypeName; /** @@ -38,7 +38,7 @@ public abstract class BeamSqlMathBinaryExpression extends BeamSqlExpression { return numberOfOperands() == 2 && isOperandNumeric(opType(0)) && isOperandNumeric(opType(1)); } - @Override public BeamSqlPrimitive<? extends Number> evaluate(BeamSqlRow inputRow) { + @Override public BeamSqlPrimitive<? extends Number> evaluate(BeamRecord inputRow) { BeamSqlExpression leftOp = op(0); BeamSqlExpression rightOp = op(1); return calculate(leftOp.evaluate(inputRow), rightOp.evaluate(inputRow)); http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlMathUnaryExpression.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlMathUnaryExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlMathUnaryExpression.java index 163c40e..b225b8e 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlMathUnaryExpression.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlMathUnaryExpression.java @@ -21,7 +21,7 @@ package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.math; import java.util.List; import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression; import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; +import org.apache.beam.sdk.values.BeamRecord; import org.apache.calcite.sql.type.SqlTypeName; @@ -45,7 +45,7 @@ public abstract class BeamSqlMathUnaryExpression extends BeamSqlExpression { return acceptance; } - @Override public BeamSqlPrimitive<? extends Number> evaluate(BeamSqlRow inputRow) { + @Override public BeamSqlPrimitive<? extends Number> evaluate(BeamRecord inputRow) { BeamSqlExpression operand = op(0); return calculate(operand.evaluate(inputRow)); } http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlPiExpression.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlPiExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlPiExpression.java index dfaf546..676f859 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlPiExpression.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlPiExpression.java @@ -20,7 +20,7 @@ package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.math; import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression; import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; +import org.apache.beam.sdk.values.BeamRecord; import org.apache.calcite.sql.type.SqlTypeName; /** @@ -36,7 +36,7 @@ public class BeamSqlPiExpression extends BeamSqlExpression { return true; } - @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) { + @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow) { return BeamSqlPrimitive.of(SqlTypeName.DOUBLE, Math.PI); } } http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlRandExpression.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlRandExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlRandExpression.java index f2d7a47..0575978 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlRandExpression.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlRandExpression.java @@ -22,7 +22,7 @@ import java.util.List; import java.util.Random; import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression; import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; +import org.apache.beam.sdk.values.BeamRecord; import org.apache.calcite.sql.type.SqlTypeName; /** @@ -42,7 +42,7 @@ public class BeamSqlRandExpression extends BeamSqlExpression { } @Override - public BeamSqlPrimitive evaluate(BeamSqlRow inputRecord) { + public BeamSqlPrimitive evaluate(BeamRecord inputRecord) { if (operands.size() == 1) { int rowSeed = opValueEvaluated(0, inputRecord); if (seed == null || seed != rowSeed) { http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlRandIntegerExpression.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlRandIntegerExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlRandIntegerExpression.java index b2e65ce..52f0cc1 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlRandIntegerExpression.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlRandIntegerExpression.java @@ -22,7 +22,7 @@ import java.util.List; import java.util.Random; import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression; import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; +import org.apache.beam.sdk.values.BeamRecord; import org.apache.calcite.sql.type.SqlTypeName; /** @@ -43,7 +43,7 @@ public class BeamSqlRandIntegerExpression extends BeamSqlExpression { } @Override - public BeamSqlPrimitive evaluate(BeamSqlRow inputRecord) { + public BeamSqlPrimitive evaluate(BeamRecord inputRecord) { int numericIdx = 0; if (operands.size() == 2) { int rowSeed = opValueEvaluated(0, inputRecord); http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlCharLengthExpression.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlCharLengthExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlCharLengthExpression.java index 580d747..974e2bc 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlCharLengthExpression.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlCharLengthExpression.java @@ -21,7 +21,7 @@ package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string; import java.util.List; import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression; import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; +import org.apache.beam.sdk.values.BeamRecord; import org.apache.calcite.sql.type.SqlTypeName; /** @@ -32,7 +32,7 @@ public class BeamSqlCharLengthExpression extends BeamSqlStringUnaryExpression { super(operands, SqlTypeName.INTEGER); } - @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) { + @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow) { String str = opValueEvaluated(0, inputRow); return BeamSqlPrimitive.of(SqlTypeName.INTEGER, str.length()); } http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlConcatExpression.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlConcatExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlConcatExpression.java index 772ad41..14ef55d 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlConcatExpression.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlConcatExpression.java @@ -21,7 +21,7 @@ package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string; import java.util.List; import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression; import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; +import org.apache.beam.sdk.values.BeamRecord; import org.apache.calcite.sql.type.SqlTypeName; /** @@ -51,7 +51,7 @@ public class BeamSqlConcatExpression extends BeamSqlExpression { return true; } - @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) { + @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow) { String left = opValueEvaluated(0, inputRow); String right = opValueEvaluated(1, inputRow); http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlInitCapExpression.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlInitCapExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlInitCapExpression.java index dc893e7..e50872b 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlInitCapExpression.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlInitCapExpression.java @@ -21,7 +21,7 @@ package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string; import java.util.List; import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression; import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; +import org.apache.beam.sdk.values.BeamRecord; import org.apache.calcite.sql.type.SqlTypeName; /** @@ -32,7 +32,7 @@ public class BeamSqlInitCapExpression extends BeamSqlStringUnaryExpression { super(operands, SqlTypeName.VARCHAR); } - @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) { + @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow) { String str = opValueEvaluated(0, inputRow); StringBuilder ret = new StringBuilder(str); http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlLowerExpression.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlLowerExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlLowerExpression.java index fd9d7aa..0f9a501 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlLowerExpression.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlLowerExpression.java @@ -21,7 +21,7 @@ package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string; import java.util.List; import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression; import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; +import org.apache.beam.sdk.values.BeamRecord; import org.apache.calcite.sql.type.SqlTypeName; /** @@ -32,7 +32,7 @@ public class BeamSqlLowerExpression extends BeamSqlStringUnaryExpression { super(operands, SqlTypeName.VARCHAR); } - @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) { + @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow) { String str = opValueEvaluated(0, inputRow); return BeamSqlPrimitive.of(SqlTypeName.VARCHAR, str.toLowerCase()); } http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlOverlayExpression.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlOverlayExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlOverlayExpression.java index 8d38efb..2336876 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlOverlayExpression.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlOverlayExpression.java @@ -21,7 +21,7 @@ package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string; import java.util.List; import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression; import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; +import org.apache.beam.sdk.values.BeamRecord; import org.apache.calcite.sql.type.SqlTypeName; /** @@ -54,7 +54,7 @@ public class BeamSqlOverlayExpression extends BeamSqlExpression { return true; } - @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) { + @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow) { String str = opValueEvaluated(0, inputRow); String replaceStr = opValueEvaluated(1, inputRow); int idx = opValueEvaluated(2, inputRow);
