[BEAM-2621] BeamSqlRecordType -> BeamSqlRowType
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a9c8a8a1 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a9c8a8a1 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a9c8a8a1 Branch: refs/heads/DSL_SQL Commit: a9c8a8a1e1f21d31c973e12b3c05c118e21fa43b Parents: a1f7cf6 Author: James Xu <[email protected]> Authored: Mon Jul 17 17:55:56 2017 +0800 Committer: JingsongLi <[email protected]> Committed: Wed Jul 19 10:05:17 2017 +0800 ---------------------------------------------------------------------- .../org/apache/beam/dsls/sql/BeamSqlEnv.java | 12 +++--- .../beam/dsls/sql/example/BeamSqlExample.java | 4 +- .../interpreter/BeamSqlExpressionExecutor.java | 2 +- .../dsls/sql/interpreter/BeamSqlFnExecutor.java | 4 +- .../operator/BeamSqlCaseExpression.java | 8 ++-- .../operator/BeamSqlCastExpression.java | 22 +++++------ .../operator/BeamSqlCompareExpression.java | 6 +-- .../interpreter/operator/BeamSqlExpression.java | 2 +- .../operator/BeamSqlInputRefExpression.java | 4 +- .../operator/BeamSqlIsNotNullExpression.java | 4 +- .../operator/BeamSqlIsNullExpression.java | 4 +- .../interpreter/operator/BeamSqlPrimitive.java | 2 +- .../operator/BeamSqlReinterpretExpression.java | 6 +-- .../operator/BeamSqlUdfExpression.java | 4 +- .../operator/BeamSqlWindowEndExpression.java | 4 +- .../operator/BeamSqlWindowExpression.java | 4 +- .../operator/BeamSqlWindowStartExpression.java | 4 +- .../arithmetic/BeamSqlArithmeticExpression.java | 6 +-- .../date/BeamSqlCurrentDateExpression.java | 2 +- .../date/BeamSqlCurrentTimeExpression.java | 2 +- .../date/BeamSqlCurrentTimestampExpression.java | 2 +- .../date/BeamSqlDateCeilExpression.java | 4 +- .../date/BeamSqlDateFloorExpression.java | 4 +- .../operator/date/BeamSqlExtractExpression.java | 4 +- .../operator/logical/BeamSqlAndExpression.java | 4 +- .../operator/logical/BeamSqlNotExpression.java | 4 +- .../operator/logical/BeamSqlOrExpression.java | 4 +- .../math/BeamSqlMathBinaryExpression.java | 4 +- .../math/BeamSqlMathUnaryExpression.java | 4 +- .../operator/math/BeamSqlPiExpression.java | 2 +- .../string/BeamSqlCharLengthExpression.java | 4 +- .../string/BeamSqlConcatExpression.java | 6 +-- .../string/BeamSqlInitCapExpression.java | 4 +- .../operator/string/BeamSqlLowerExpression.java | 4 +- .../string/BeamSqlOverlayExpression.java | 10 ++--- .../string/BeamSqlPositionExpression.java | 8 ++-- .../string/BeamSqlSubstringExpression.java | 8 ++-- .../operator/string/BeamSqlTrimExpression.java | 10 ++--- .../operator/string/BeamSqlUpperExpression.java | 4 +- .../beam/dsls/sql/rel/BeamAggregationRel.java | 22 +++++------ .../apache/beam/dsls/sql/rel/BeamFilterRel.java | 2 +- .../beam/dsls/sql/rel/BeamIOSourceRel.java | 2 +- .../apache/beam/dsls/sql/rel/BeamJoinRel.java | 12 +++--- .../beam/dsls/sql/rel/BeamProjectRel.java | 4 +- .../apache/beam/dsls/sql/rel/BeamSortRel.java | 2 +- .../apache/beam/dsls/sql/rel/BeamValuesRel.java | 9 ++--- .../beam/dsls/sql/schema/BaseBeamTable.java | 10 ++--- .../dsls/sql/schema/BeamPCollectionTable.java | 8 ++-- .../beam/dsls/sql/schema/BeamSqlRecordType.java | 40 -------------------- .../apache/beam/dsls/sql/schema/BeamSqlRow.java | 10 ++--- .../beam/dsls/sql/schema/BeamSqlRowCoder.java | 6 +-- .../beam/dsls/sql/schema/BeamSqlRowType.java | 40 ++++++++++++++++++++ .../beam/dsls/sql/schema/BeamSqlTable.java | 2 +- .../beam/dsls/sql/schema/BeamTableUtils.java | 10 ++--- .../sql/schema/kafka/BeamKafkaCSVTable.java | 29 +++++++------- .../dsls/sql/schema/kafka/BeamKafkaTable.java | 11 +++--- .../dsls/sql/schema/text/BeamTextCSVTable.java | 14 +++---- .../schema/text/BeamTextCSVTableIOReader.java | 11 +++--- .../schema/text/BeamTextCSVTableIOWriter.java | 9 ++--- .../dsls/sql/schema/text/BeamTextTable.java | 6 +-- .../transform/BeamAggregationTransforms.java | 26 ++++++------- .../dsls/sql/transform/BeamJoinTransforms.java | 6 +-- .../dsls/sql/transform/BeamSqlProjectFn.java | 16 ++++---- .../beam/dsls/sql/utils/CalciteUtils.java | 12 +++--- .../dsls/sql/BeamSqlDslAggregationTest.java | 14 +++---- .../apache/beam/dsls/sql/BeamSqlDslBase.java | 26 ++++++------- .../beam/dsls/sql/BeamSqlDslJoinTest.java | 10 ++--- .../beam/dsls/sql/BeamSqlDslProjectTest.java | 10 ++--- .../beam/dsls/sql/BeamSqlDslUdfUdafTest.java | 6 +-- .../org/apache/beam/dsls/sql/TestUtils.java | 28 +++++++------- ...mSqlBuiltinFunctionsIntegrationTestBase.java | 6 +-- .../interpreter/BeamSqlFnExecutorTestBase.java | 8 ++-- .../beam/dsls/sql/mock/MockedBoundedTable.java | 14 +++---- .../apache/beam/dsls/sql/mock/MockedTable.java | 6 +-- .../dsls/sql/mock/MockedUnboundedTable.java | 14 +++---- .../dsls/sql/schema/BeamSqlRowCoderTest.java | 6 +-- .../sql/schema/kafka/BeamKafkaCSVTableTest.java | 7 ++-- .../sql/schema/text/BeamTextCSVTableTest.java | 17 ++++----- .../transform/BeamAggregationTransformTest.java | 13 +++---- .../schema/transform/BeamTransformBaseTest.java | 12 +++--- 80 files changed, 354 insertions(+), 362 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSqlEnv.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSqlEnv.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSqlEnv.java index e8c8c97..0e1ac98 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSqlEnv.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSqlEnv.java @@ -21,7 +21,7 @@ import java.io.Serializable; import org.apache.beam.dsls.sql.planner.BeamQueryPlanner; import org.apache.beam.dsls.sql.schema.BaseBeamTable; -import org.apache.beam.dsls.sql.schema.BeamSqlRecordType; +import org.apache.beam.dsls.sql.schema.BeamSqlRowType; import org.apache.beam.dsls.sql.schema.BeamSqlUdaf; import org.apache.beam.dsls.sql.schema.BeamSqlUdf; import org.apache.beam.dsls.sql.utils.CalciteUtils; @@ -73,7 +73,7 @@ public class BeamSqlEnv implements Serializable{ * */ public void registerTable(String tableName, BaseBeamTable table) { - schema.add(tableName, new BeamCalciteTable(table.getRecordType())); + schema.add(tableName, new BeamCalciteTable(table.getRowType())); planner.getSourceTables().put(tableName, table); } @@ -85,13 +85,13 @@ public class BeamSqlEnv implements Serializable{ } private static class BeamCalciteTable implements ScannableTable, Serializable { - private BeamSqlRecordType beamSqlRecordType; - public BeamCalciteTable(BeamSqlRecordType beamSqlRecordType) { - this.beamSqlRecordType = beamSqlRecordType; + private BeamSqlRowType beamSqlRowType; + public BeamCalciteTable(BeamSqlRowType beamSqlRowType) { + this.beamSqlRowType = beamSqlRowType; } @Override public RelDataType getRowType(RelDataTypeFactory typeFactory) { - return CalciteUtils.toCalciteRecordType(this.beamSqlRecordType) + return CalciteUtils.toCalciteRowType(this.beamSqlRowType) .apply(BeamQueryPlanner.TYPE_FACTORY); } http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/example/BeamSqlExample.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/example/BeamSqlExample.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/example/BeamSqlExample.java index 04fe451..91df2be 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/example/BeamSqlExample.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/example/BeamSqlExample.java @@ -21,9 +21,9 @@ import java.sql.Types; import java.util.Arrays; import java.util.List; import org.apache.beam.dsls.sql.BeamSql; -import org.apache.beam.dsls.sql.schema.BeamSqlRecordType; import org.apache.beam.dsls.sql.schema.BeamSqlRow; import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder; +import org.apache.beam.dsls.sql.schema.BeamSqlRowType; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; @@ -51,7 +51,7 @@ class BeamSqlExample { //define the input row format List<String> fieldNames = Arrays.asList("c1", "c2", "c3"); List<Integer> fieldTypes = Arrays.asList(Types.INTEGER, Types.VARCHAR, Types.DOUBLE); - BeamSqlRecordType type = BeamSqlRecordType.create(fieldNames, fieldTypes); + BeamSqlRowType type = BeamSqlRowType.create(fieldNames, fieldTypes); BeamSqlRow row = new BeamSqlRow(type); row.addField(0, 1); row.addField(1, "row"); http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSqlExpressionExecutor.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSqlExpressionExecutor.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSqlExpressionExecutor.java index a314bf4..3732933 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSqlExpressionExecutor.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSqlExpressionExecutor.java @@ -37,7 +37,7 @@ public interface BeamSqlExpressionExecutor extends Serializable { * apply transformation to input record {@link BeamSqlRow}. * */ - List<Object> execute(BeamSqlRow inputRecord); + List<Object> execute(BeamSqlRow inputRow); void close(); } http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSqlFnExecutor.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSqlFnExecutor.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSqlFnExecutor.java index 64bc880..0be918d 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSqlFnExecutor.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSqlFnExecutor.java @@ -427,10 +427,10 @@ public class BeamSqlFnExecutor implements BeamSqlExpressionExecutor { } @Override - public List<Object> execute(BeamSqlRow inputRecord) { + public List<Object> execute(BeamSqlRow inputRow) { List<Object> results = new ArrayList<>(); for (BeamSqlExpression exp : exps) { - results.add(exp.evaluate(inputRecord).getValue()); + results.add(exp.evaluate(inputRow).getValue()); } return results; } http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCaseExpression.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCaseExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCaseExpression.java index a15c42e..a30916b 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCaseExpression.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCaseExpression.java @@ -49,16 +49,16 @@ public class BeamSqlCaseExpression extends BeamSqlExpression { return true; } - @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRecord) { + @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) { for (int i = 0; i < operands.size() - 1; i += 2) { - if (opValueEvaluated(i, inputRecord)) { + if (opValueEvaluated(i, inputRow)) { return BeamSqlPrimitive.of( outputType, - opValueEvaluated(i + 1, inputRecord) + opValueEvaluated(i + 1, inputRow) ); } } return BeamSqlPrimitive.of(outputType, - opValueEvaluated(operands.size() - 1, inputRecord)); + opValueEvaluated(operands.size() - 1, inputRow)); } } http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCastExpression.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCastExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCastExpression.java index 7e8ab03..524d1df 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCastExpression.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCastExpression.java @@ -72,40 +72,40 @@ public class BeamSqlCastExpression extends BeamSqlExpression { } @Override - public BeamSqlPrimitive evaluate(BeamSqlRow inputRecord) { + public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) { SqlTypeName castOutputType = getOutputType(); switch (castOutputType) { case INTEGER: return BeamSqlPrimitive - .of(SqlTypeName.INTEGER, SqlFunctions.toInt(opValueEvaluated(index, inputRecord))); + .of(SqlTypeName.INTEGER, SqlFunctions.toInt(opValueEvaluated(index, inputRow))); case DOUBLE: return BeamSqlPrimitive - .of(SqlTypeName.DOUBLE, SqlFunctions.toDouble(opValueEvaluated(index, inputRecord))); + .of(SqlTypeName.DOUBLE, SqlFunctions.toDouble(opValueEvaluated(index, inputRow))); case SMALLINT: return BeamSqlPrimitive - .of(SqlTypeName.SMALLINT, SqlFunctions.toShort(opValueEvaluated(index, inputRecord))); + .of(SqlTypeName.SMALLINT, SqlFunctions.toShort(opValueEvaluated(index, inputRow))); case TINYINT: return BeamSqlPrimitive - .of(SqlTypeName.TINYINT, SqlFunctions.toByte(opValueEvaluated(index, inputRecord))); + .of(SqlTypeName.TINYINT, SqlFunctions.toByte(opValueEvaluated(index, inputRow))); case BIGINT: return BeamSqlPrimitive - .of(SqlTypeName.BIGINT, SqlFunctions.toLong(opValueEvaluated(index, inputRecord))); + .of(SqlTypeName.BIGINT, SqlFunctions.toLong(opValueEvaluated(index, inputRow))); case DECIMAL: return BeamSqlPrimitive.of(SqlTypeName.DECIMAL, - SqlFunctions.toBigDecimal(opValueEvaluated(index, inputRecord))); + SqlFunctions.toBigDecimal(opValueEvaluated(index, inputRow))); case FLOAT: return BeamSqlPrimitive - .of(SqlTypeName.FLOAT, SqlFunctions.toFloat(opValueEvaluated(index, inputRecord))); + .of(SqlTypeName.FLOAT, SqlFunctions.toFloat(opValueEvaluated(index, inputRow))); case CHAR: case VARCHAR: return BeamSqlPrimitive - .of(SqlTypeName.VARCHAR, opValueEvaluated(index, inputRecord).toString()); + .of(SqlTypeName.VARCHAR, opValueEvaluated(index, inputRow).toString()); case DATE: return BeamSqlPrimitive - .of(SqlTypeName.DATE, toDate(opValueEvaluated(index, inputRecord), outputDateFormat)); + .of(SqlTypeName.DATE, toDate(opValueEvaluated(index, inputRow), outputDateFormat)); case TIMESTAMP: return BeamSqlPrimitive.of(SqlTypeName.TIMESTAMP, - toTimeStamp(opValueEvaluated(index, inputRecord), outputTimestampFormat)); + toTimeStamp(opValueEvaluated(index, inputRow), outputTimestampFormat)); } throw new UnsupportedOperationException( String.format("Cast to type %s not supported", castOutputType)); http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCompareExpression.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCompareExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCompareExpression.java index 3d96616..5076ccc 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCompareExpression.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCompareExpression.java @@ -48,9 +48,9 @@ public abstract class BeamSqlCompareExpression extends BeamSqlExpression { } @Override - public BeamSqlPrimitive<Boolean> evaluate(BeamSqlRow inputRecord) { - Object leftValue = operands.get(0).evaluate(inputRecord).getValue(); - Object rightValue = operands.get(1).evaluate(inputRecord).getValue(); + public BeamSqlPrimitive<Boolean> evaluate(BeamSqlRow inputRow) { + Object leftValue = operands.get(0).evaluate(inputRow).getValue(); + Object rightValue = operands.get(1).evaluate(inputRow).getValue(); switch (operands.get(0).outputType) { case BIGINT: case DECIMAL: http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlExpression.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlExpression.java index 33feb3e..9d2815c 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlExpression.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlExpression.java @@ -62,7 +62,7 @@ public abstract class BeamSqlExpression implements Serializable { * Apply input record {@link BeamSqlRow} to this expression, * the output value is wrapped with {@link BeamSqlPrimitive}. */ - public abstract BeamSqlPrimitive evaluate(BeamSqlRow inputRecord); + public abstract BeamSqlPrimitive evaluate(BeamSqlRow inputRow); public List<BeamSqlExpression> getOperands() { return operands; http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlInputRefExpression.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlInputRefExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlInputRefExpression.java index b6d2b0b..710460b 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlInputRefExpression.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlInputRefExpression.java @@ -37,7 +37,7 @@ public class BeamSqlInputRefExpression extends BeamSqlExpression { } @Override - public BeamSqlPrimitive evaluate(BeamSqlRow inputRecord) { - return BeamSqlPrimitive.of(outputType, inputRecord.getFieldValue(inputRef)); + public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) { + return BeamSqlPrimitive.of(outputType, inputRow.getFieldValue(inputRef)); } } http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlIsNotNullExpression.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlIsNotNullExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlIsNotNullExpression.java index e08e737..23d9c83 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlIsNotNullExpression.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlIsNotNullExpression.java @@ -44,8 +44,8 @@ public class BeamSqlIsNotNullExpression extends BeamSqlExpression { } @Override - public BeamSqlPrimitive<Boolean> evaluate(BeamSqlRow inputRecord) { - Object leftValue = operands.get(0).evaluate(inputRecord).getValue(); + public BeamSqlPrimitive<Boolean> evaluate(BeamSqlRow inputRow) { + Object leftValue = operands.get(0).evaluate(inputRow).getValue(); return BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, leftValue != null); } } http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlIsNullExpression.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlIsNullExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlIsNullExpression.java index d4e070d..4d3fd45 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlIsNullExpression.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlIsNullExpression.java @@ -44,8 +44,8 @@ public class BeamSqlIsNullExpression extends BeamSqlExpression { } @Override - public BeamSqlPrimitive<Boolean> evaluate(BeamSqlRow inputRecord) { - Object leftValue = operands.get(0).evaluate(inputRecord).getValue(); + public BeamSqlPrimitive<Boolean> evaluate(BeamSqlRow inputRow) { + Object leftValue = operands.get(0).evaluate(inputRow).getValue(); return BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, leftValue == null); } } http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlPrimitive.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlPrimitive.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlPrimitive.java index c5c80b9..51724bb 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlPrimitive.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlPrimitive.java @@ -145,7 +145,7 @@ public class BeamSqlPrimitive<T> extends BeamSqlExpression { } @Override - public BeamSqlPrimitive<T> evaluate(BeamSqlRow inputRecord) { + public BeamSqlPrimitive<T> evaluate(BeamSqlRow inputRow) { return this; } http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlReinterpretExpression.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlReinterpretExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlReinterpretExpression.java index 783466c..efdb2df 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlReinterpretExpression.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlReinterpretExpression.java @@ -42,13 +42,13 @@ public class BeamSqlReinterpretExpression extends BeamSqlExpression { && SqlTypeName.DATETIME_TYPES.contains(opType(0)); } - @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRecord) { + @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) { if (opType(0) == SqlTypeName.TIME) { - GregorianCalendar date = opValueEvaluated(0, inputRecord); + GregorianCalendar date = opValueEvaluated(0, inputRow); return BeamSqlPrimitive.of(outputType, date.getTimeInMillis()); } else { - Date date = opValueEvaluated(0, inputRecord); + Date date = opValueEvaluated(0, inputRow); return BeamSqlPrimitive.of(outputType, date.getTime()); } } http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlUdfExpression.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlUdfExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlUdfExpression.java index 6f18307..e389ef9 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlUdfExpression.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlUdfExpression.java @@ -51,14 +51,14 @@ public class BeamSqlUdfExpression extends BeamSqlExpression { } @Override - public BeamSqlPrimitive evaluate(BeamSqlRow inputRecord) { + public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) { if (method == null) { reConstructMethod(); } try { List<Object> paras = new ArrayList<>(); for (BeamSqlExpression e : getOperands()) { - paras.add(e.evaluate(inputRecord).getValue()); + paras.add(e.evaluate(inputRow).getValue()); } return BeamSqlPrimitive.of(getOutputType(), http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlWindowEndExpression.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlWindowEndExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlWindowEndExpression.java index 8bc090f..ecc6939 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlWindowEndExpression.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlWindowEndExpression.java @@ -34,9 +34,9 @@ public class BeamSqlWindowEndExpression extends BeamSqlExpression { } @Override - public BeamSqlPrimitive<Date> evaluate(BeamSqlRow inputRecord) { + public BeamSqlPrimitive<Date> evaluate(BeamSqlRow inputRow) { return BeamSqlPrimitive.of(SqlTypeName.TIMESTAMP, - new Date(inputRecord.getWindowEnd().getMillis())); + new Date(inputRow.getWindowEnd().getMillis())); } } http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlWindowExpression.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlWindowExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlWindowExpression.java index eb4c03b..71f0672 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlWindowExpression.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlWindowExpression.java @@ -42,9 +42,9 @@ public class BeamSqlWindowExpression extends BeamSqlExpression { } @Override - public BeamSqlPrimitive<Date> evaluate(BeamSqlRow inputRecord) { + public BeamSqlPrimitive<Date> evaluate(BeamSqlRow inputRow) { return BeamSqlPrimitive.of(SqlTypeName.TIMESTAMP, - (Date) operands.get(0).evaluate(inputRecord).getValue()); + (Date) operands.get(0).evaluate(inputRow).getValue()); } } http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlWindowStartExpression.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlWindowStartExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlWindowStartExpression.java index 1e2c0a2..f3aba2e 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlWindowStartExpression.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlWindowStartExpression.java @@ -35,9 +35,9 @@ public class BeamSqlWindowStartExpression extends BeamSqlExpression { } @Override - public BeamSqlPrimitive<Date> evaluate(BeamSqlRow inputRecord) { + public BeamSqlPrimitive<Date> evaluate(BeamSqlRow inputRow) { return BeamSqlPrimitive.of(SqlTypeName.TIMESTAMP, - new Date(inputRecord.getWindowStart().getMillis())); + new Date(inputRow.getWindowStart().getMillis())); } } http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlArithmeticExpression.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlArithmeticExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlArithmeticExpression.java index eac4c72..d62123c 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlArithmeticExpression.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlArithmeticExpression.java @@ -50,11 +50,11 @@ public abstract class BeamSqlArithmeticExpression extends BeamSqlExpression { super(operands, outputType); } - @Override public BeamSqlPrimitive<? extends Number> evaluate(BeamSqlRow inputRecord) { + @Override public BeamSqlPrimitive<? extends Number> evaluate(BeamSqlRow inputRow) { BigDecimal left = BigDecimal.valueOf( - Double.valueOf(opValueEvaluated(0, inputRecord).toString())); + Double.valueOf(opValueEvaluated(0, inputRow).toString())); BigDecimal right = BigDecimal.valueOf( - Double.valueOf(opValueEvaluated(1, inputRecord).toString())); + Double.valueOf(opValueEvaluated(1, inputRow).toString())); BigDecimal result = calc(left, right); return getCorrectlyTypedResult(result); http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlCurrentDateExpression.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlCurrentDateExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlCurrentDateExpression.java index 2f83140..c7df5ab 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlCurrentDateExpression.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlCurrentDateExpression.java @@ -39,7 +39,7 @@ public class BeamSqlCurrentDateExpression extends BeamSqlExpression { return getOperands().size() == 0; } - @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRecord) { + @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) { return BeamSqlPrimitive.of(outputType, new Date()); } } http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlCurrentTimeExpression.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlCurrentTimeExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlCurrentTimeExpression.java index c15123a..46e5a43 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlCurrentTimeExpression.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlCurrentTimeExpression.java @@ -45,7 +45,7 @@ public class BeamSqlCurrentTimeExpression extends BeamSqlExpression { return opCount <= 1; } - @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRecord) { + @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) { GregorianCalendar ret = new GregorianCalendar(TimeZone.getDefault()); ret.setTime(new Date()); return BeamSqlPrimitive.of(outputType, ret); http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlCurrentTimestampExpression.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlCurrentTimestampExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlCurrentTimestampExpression.java index 0ea12f1..303846d 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlCurrentTimestampExpression.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlCurrentTimestampExpression.java @@ -43,7 +43,7 @@ public class BeamSqlCurrentTimestampExpression extends BeamSqlExpression { return opCount <= 1; } - @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRecord) { + @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) { return BeamSqlPrimitive.of(outputType, new Date()); } } http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlDateCeilExpression.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlDateCeilExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlDateCeilExpression.java index 68f1aa9..59e3e9c 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlDateCeilExpression.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlDateCeilExpression.java @@ -42,8 +42,8 @@ public class BeamSqlDateCeilExpression extends BeamSqlExpression { && opType(1) == SqlTypeName.SYMBOL; } - @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRecord) { - Date date = opValueEvaluated(0, inputRecord); + @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) { + Date date = opValueEvaluated(0, inputRow); long time = date.getTime(); TimeUnitRange unit = ((BeamSqlPrimitive<TimeUnitRange>) op(1)).getValue(); http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlDateFloorExpression.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlDateFloorExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlDateFloorExpression.java index 4d446e3..64234f5 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlDateFloorExpression.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlDateFloorExpression.java @@ -42,8 +42,8 @@ public class BeamSqlDateFloorExpression extends BeamSqlExpression { && opType(1) == SqlTypeName.SYMBOL; } - @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRecord) { - Date date = opValueEvaluated(0, inputRecord); + @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) { + Date date = opValueEvaluated(0, inputRow); long time = date.getTime(); TimeUnitRange unit = ((BeamSqlPrimitive<TimeUnitRange>) op(1)).getValue(); http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlExtractExpression.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlExtractExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlExtractExpression.java index bc8ed0f..d41a249 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlExtractExpression.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlExtractExpression.java @@ -61,8 +61,8 @@ public class BeamSqlExtractExpression extends BeamSqlExpression { && opType(1) == SqlTypeName.BIGINT; } - @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRecord) { - Long time = opValueEvaluated(1, inputRecord); + @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) { + Long time = opValueEvaluated(1, inputRow); TimeUnitRange unit = ((BeamSqlPrimitive<TimeUnitRange>) op(0)).getValue(); http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/logical/BeamSqlAndExpression.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/logical/BeamSqlAndExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/logical/BeamSqlAndExpression.java index 5da43f4..5f6abe0 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/logical/BeamSqlAndExpression.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/logical/BeamSqlAndExpression.java @@ -33,10 +33,10 @@ public class BeamSqlAndExpression extends BeamSqlLogicalExpression { } @Override - public BeamSqlPrimitive<Boolean> evaluate(BeamSqlRow inputRecord) { + public BeamSqlPrimitive<Boolean> evaluate(BeamSqlRow inputRow) { boolean result = true; for (BeamSqlExpression exp : operands) { - BeamSqlPrimitive<Boolean> expOut = exp.evaluate(inputRecord); + BeamSqlPrimitive<Boolean> expOut = exp.evaluate(inputRow); result = result && expOut.getValue(); if (!result) { break; http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/logical/BeamSqlNotExpression.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/logical/BeamSqlNotExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/logical/BeamSqlNotExpression.java index ffa0184..6df52aa 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/logical/BeamSqlNotExpression.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/logical/BeamSqlNotExpression.java @@ -43,8 +43,8 @@ public class BeamSqlNotExpression extends BeamSqlLogicalExpression { return super.accept(); } - @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRecord) { - Boolean value = opValueEvaluated(0, inputRecord); + @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) { + Boolean value = opValueEvaluated(0, inputRow); if (value == null) { return BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, null); } else { http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/logical/BeamSqlOrExpression.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/logical/BeamSqlOrExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/logical/BeamSqlOrExpression.java index 9ca57f0..450638c 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/logical/BeamSqlOrExpression.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/logical/BeamSqlOrExpression.java @@ -33,10 +33,10 @@ public class BeamSqlOrExpression extends BeamSqlLogicalExpression { } @Override - public BeamSqlPrimitive<Boolean> evaluate(BeamSqlRow inputRecord) { + public BeamSqlPrimitive<Boolean> evaluate(BeamSqlRow inputRow) { boolean result = false; for (BeamSqlExpression exp : operands) { - BeamSqlPrimitive<Boolean> expOut = exp.evaluate(inputRecord); + BeamSqlPrimitive<Boolean> expOut = exp.evaluate(inputRow); result = result || expOut.getValue(); if (result) { break; http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlMathBinaryExpression.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlMathBinaryExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlMathBinaryExpression.java index f79bcf6..2d444f8 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlMathBinaryExpression.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlMathBinaryExpression.java @@ -39,10 +39,10 @@ public abstract class BeamSqlMathBinaryExpression extends BeamSqlExpression { return numberOfOperands() == 2 && isOperandNumeric(opType(0)) && isOperandNumeric(opType(1)); } - @Override public BeamSqlPrimitive<? extends Number> evaluate(BeamSqlRow inputRecord) { + @Override public BeamSqlPrimitive<? extends Number> evaluate(BeamSqlRow inputRow) { BeamSqlExpression leftOp = op(0); BeamSqlExpression rightOp = op(1); - return calculate(leftOp.evaluate(inputRecord), rightOp.evaluate(inputRecord)); + return calculate(leftOp.evaluate(inputRow), rightOp.evaluate(inputRow)); } /** http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlMathUnaryExpression.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlMathUnaryExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlMathUnaryExpression.java index a65333c..4733d09 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlMathUnaryExpression.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlMathUnaryExpression.java @@ -45,9 +45,9 @@ public abstract class BeamSqlMathUnaryExpression extends BeamSqlExpression { return acceptance; } - @Override public BeamSqlPrimitive<? extends Number> evaluate(BeamSqlRow inputRecord) { + @Override public BeamSqlPrimitive<? extends Number> evaluate(BeamSqlRow inputRow) { BeamSqlExpression operand = op(0); - return calculate(operand.evaluate(inputRecord)); + return calculate(operand.evaluate(inputRow)); } /** http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlPiExpression.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlPiExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlPiExpression.java index 4645951..9db810e 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlPiExpression.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlPiExpression.java @@ -36,7 +36,7 @@ public class BeamSqlPiExpression extends BeamSqlExpression { return numberOfOperands() == 0; } - @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRecord) { + @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) { return BeamSqlPrimitive.of(SqlTypeName.DOUBLE, Math.PI); } } http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlCharLengthExpression.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlCharLengthExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlCharLengthExpression.java index 3ed9b80..7c61061 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlCharLengthExpression.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlCharLengthExpression.java @@ -33,8 +33,8 @@ public class BeamSqlCharLengthExpression extends BeamSqlStringUnaryExpression { super(operands, SqlTypeName.INTEGER); } - @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRecord) { - String str = opValueEvaluated(0, inputRecord); + @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) { + String str = opValueEvaluated(0, inputRow); return BeamSqlPrimitive.of(SqlTypeName.INTEGER, str.length()); } } http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlConcatExpression.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlConcatExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlConcatExpression.java index e8e4e50..93e1f71 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlConcatExpression.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlConcatExpression.java @@ -52,9 +52,9 @@ public class BeamSqlConcatExpression extends BeamSqlExpression { return true; } - @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRecord) { - String left = opValueEvaluated(0, inputRecord); - String right = opValueEvaluated(1, inputRecord); + @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) { + String left = opValueEvaluated(0, inputRow); + String right = opValueEvaluated(1, inputRow); return BeamSqlPrimitive.of(SqlTypeName.VARCHAR, new StringBuilder(left.length() + right.length()) http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlInitCapExpression.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlInitCapExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlInitCapExpression.java index 51dfe28..7726e27 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlInitCapExpression.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlInitCapExpression.java @@ -33,8 +33,8 @@ public class BeamSqlInitCapExpression extends BeamSqlStringUnaryExpression { super(operands, SqlTypeName.VARCHAR); } - @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRecord) { - String str = opValueEvaluated(0, inputRecord); + @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) { + String str = opValueEvaluated(0, inputRow); StringBuilder ret = new StringBuilder(str); boolean isInit = true; http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlLowerExpression.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlLowerExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlLowerExpression.java index f70fb1a..cb198ec 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlLowerExpression.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlLowerExpression.java @@ -33,8 +33,8 @@ public class BeamSqlLowerExpression extends BeamSqlStringUnaryExpression { super(operands, SqlTypeName.VARCHAR); } - @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRecord) { - String str = opValueEvaluated(0, inputRecord); + @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) { + String str = opValueEvaluated(0, inputRow); return BeamSqlPrimitive.of(SqlTypeName.VARCHAR, str.toLowerCase()); } } http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlOverlayExpression.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlOverlayExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlOverlayExpression.java index 20d9962..cb6a523 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlOverlayExpression.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlOverlayExpression.java @@ -55,15 +55,15 @@ public class BeamSqlOverlayExpression extends BeamSqlExpression { return true; } - @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRecord) { - String str = opValueEvaluated(0, inputRecord); - String replaceStr = opValueEvaluated(1, inputRecord); - int idx = opValueEvaluated(2, inputRecord); + @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) { + String str = opValueEvaluated(0, inputRow); + String replaceStr = opValueEvaluated(1, inputRow); + int idx = opValueEvaluated(2, inputRow); // the index is 1 based. idx -= 1; int length = replaceStr.length(); if (operands.size() == 4) { - length = opValueEvaluated(3, inputRecord); + length = opValueEvaluated(3, inputRow); } StringBuilder result = new StringBuilder( http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlPositionExpression.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlPositionExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlPositionExpression.java index 1d09b51..144acbf 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlPositionExpression.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlPositionExpression.java @@ -57,12 +57,12 @@ public class BeamSqlPositionExpression extends BeamSqlExpression { return true; } - @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRecord) { - String targetStr = opValueEvaluated(0, inputRecord); - String containingStr = opValueEvaluated(1, inputRecord); + @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) { + String targetStr = opValueEvaluated(0, inputRow); + String containingStr = opValueEvaluated(1, inputRow); int from = -1; if (operands.size() == 3) { - Number tmp = opValueEvaluated(2, inputRecord); + Number tmp = opValueEvaluated(2, inputRow); from = tmp.intValue(); } http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlSubstringExpression.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlSubstringExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlSubstringExpression.java index d9bbc98..8b33125 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlSubstringExpression.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlSubstringExpression.java @@ -55,9 +55,9 @@ public class BeamSqlSubstringExpression extends BeamSqlExpression { return true; } - @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRecord) { - String str = opValueEvaluated(0, inputRecord); - int idx = opValueEvaluated(1, inputRecord); + @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) { + String str = opValueEvaluated(0, inputRow); + int idx = opValueEvaluated(1, inputRow); int startIdx = idx; if (startIdx > 0) { // NOTE: SQL substring is 1 based(rather than 0 based) @@ -70,7 +70,7 @@ public class BeamSqlSubstringExpression extends BeamSqlExpression { } if (operands.size() == 3) { - int length = opValueEvaluated(2, inputRecord); + int length = opValueEvaluated(2, inputRow); if (length < 0) { length = 0; } http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlTrimExpression.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlTrimExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlTrimExpression.java index ac4d060..5e6c2bb 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlTrimExpression.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlTrimExpression.java @@ -58,14 +58,14 @@ public class BeamSqlTrimExpression extends BeamSqlExpression { return true; } - @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRecord) { + @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) { if (operands.size() == 1) { return BeamSqlPrimitive.of(SqlTypeName.VARCHAR, - opValueEvaluated(0, inputRecord).toString().trim()); + opValueEvaluated(0, inputRow).toString().trim()); } else { - SqlTrimFunction.Flag type = opValueEvaluated(0, inputRecord); - String targetStr = opValueEvaluated(1, inputRecord); - String containingStr = opValueEvaluated(2, inputRecord); + SqlTrimFunction.Flag type = opValueEvaluated(0, inputRow); + String targetStr = opValueEvaluated(1, inputRow); + String containingStr = opValueEvaluated(2, inputRow); switch (type) { case LEADING: http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlUpperExpression.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlUpperExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlUpperExpression.java index 8fcaca4..efa9c95 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlUpperExpression.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlUpperExpression.java @@ -33,8 +33,8 @@ public class BeamSqlUpperExpression extends BeamSqlStringUnaryExpression { super(operands, SqlTypeName.VARCHAR); } - @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRecord) { - String str = opValueEvaluated(0, inputRecord); + @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) { + String str = opValueEvaluated(0, inputRow); return BeamSqlPrimitive.of(SqlTypeName.VARCHAR, str.toUpperCase()); } } http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java index 5389ec7..9dcb079 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java @@ -20,9 +20,9 @@ package org.apache.beam.dsls.sql.rel; import java.util.ArrayList; import java.util.List; import org.apache.beam.dsls.sql.BeamSqlEnv; -import org.apache.beam.dsls.sql.schema.BeamSqlRecordType; import org.apache.beam.dsls.sql.schema.BeamSqlRow; import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder; +import org.apache.beam.dsls.sql.schema.BeamSqlRowType; import org.apache.beam.dsls.sql.transform.BeamAggregationTransforms; import org.apache.beam.dsls.sql.utils.CalciteUtils; import org.apache.beam.sdk.coders.KvCoder; @@ -105,13 +105,13 @@ public class BeamAggregationRel extends Aggregate implements BeamRelNode { stageName + "combineBy", Combine.<BeamSqlRow, BeamSqlRow, BeamSqlRow>perKey( new BeamAggregationTransforms.AggregationAdaptor(getAggCallList(), - CalciteUtils.toBeamRecordType(input.getRowType())))) + CalciteUtils.toBeamRowType(input.getRowType())))) .setCoder(KvCoder.of(keyCoder, aggCoder)); PCollection<BeamSqlRow> mergedStream = aggregatedStream.apply(stageName + "mergeRecord", ParDo.of(new BeamAggregationTransforms.MergeAggregationRecord( - CalciteUtils.toBeamRecordType(getRowType()), getAggCallList(), windowFieldIdx))); - mergedStream.setCoder(new BeamSqlRowCoder(CalciteUtils.toBeamRecordType(getRowType()))); + CalciteUtils.toBeamRowType(getRowType()), getAggCallList(), windowFieldIdx))); + mergedStream.setCoder(new BeamSqlRowCoder(CalciteUtils.toBeamRowType(getRowType()))); return mergedStream; } @@ -119,23 +119,23 @@ public class BeamAggregationRel extends Aggregate implements BeamRelNode { /** * Type of sub-rowrecord used as Group-By keys. */ - private BeamSqlRecordType exKeyFieldsSchema(RelDataType relDataType) { - BeamSqlRecordType inputRecordType = CalciteUtils.toBeamRecordType(relDataType); + private BeamSqlRowType exKeyFieldsSchema(RelDataType relDataType) { + BeamSqlRowType inputRowType = CalciteUtils.toBeamRowType(relDataType); List<String> fieldNames = new ArrayList<>(); List<Integer> fieldTypes = new ArrayList<>(); for (int i : groupSet.asList()) { if (i != windowFieldIdx) { - fieldNames.add(inputRecordType.getFieldsName().get(i)); - fieldTypes.add(inputRecordType.getFieldsType().get(i)); + fieldNames.add(inputRowType.getFieldsName().get(i)); + fieldTypes.add(inputRowType.getFieldsType().get(i)); } } - return BeamSqlRecordType.create(fieldNames, fieldTypes); + return BeamSqlRowType.create(fieldNames, fieldTypes); } /** * Type of sub-rowrecord, that represents the list of aggregation fields. */ - private BeamSqlRecordType exAggFieldsSchema() { + private BeamSqlRowType exAggFieldsSchema() { List<String> fieldNames = new ArrayList<>(); List<Integer> fieldTypes = new ArrayList<>(); for (AggregateCall ac : getAggCallList()) { @@ -143,7 +143,7 @@ public class BeamAggregationRel extends Aggregate implements BeamRelNode { fieldTypes.add(CalciteUtils.toJavaType(ac.type.getSqlTypeName())); } - return BeamSqlRecordType.create(fieldNames, fieldTypes); + return BeamSqlRowType.create(fieldNames, fieldTypes); } @Override http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamFilterRel.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamFilterRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamFilterRel.java index 07b5c7c..f802104 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamFilterRel.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamFilterRel.java @@ -62,7 +62,7 @@ public class BeamFilterRel extends Filter implements BeamRelNode { PCollection<BeamSqlRow> filterStream = upstream.apply(stageName, ParDo.of(new BeamSqlFilterFn(getRelTypeName(), executor))); - filterStream.setCoder(new BeamSqlRowCoder(CalciteUtils.toBeamRecordType(getRowType()))); + filterStream.setCoder(new BeamSqlRowCoder(CalciteUtils.toBeamRowType(getRowType()))); return filterStream; } http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSourceRel.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSourceRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSourceRel.java index b26d2b8..6754991 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSourceRel.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSourceRel.java @@ -56,7 +56,7 @@ public class BeamIOSourceRel extends TableScan implements BeamRelNode { //If not, the source PColection is provided with BaseBeamTable.buildIOReader(). BaseBeamTable sourceTable = sqlEnv.findTable(sourceName); return sourceTable.buildIOReader(inputPCollections.getPipeline()) - .setCoder(new BeamSqlRowCoder(CalciteUtils.toBeamRecordType(getRowType()))); + .setCoder(new BeamSqlRowCoder(CalciteUtils.toBeamRowType(getRowType()))); } } http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamJoinRel.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamJoinRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamJoinRel.java index 3c92e42..3ebf152 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamJoinRel.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamJoinRel.java @@ -23,9 +23,9 @@ import java.util.List; import java.util.Map; import java.util.Set; import org.apache.beam.dsls.sql.BeamSqlEnv; -import org.apache.beam.dsls.sql.schema.BeamSqlRecordType; import org.apache.beam.dsls.sql.schema.BeamSqlRow; import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder; +import org.apache.beam.dsls.sql.schema.BeamSqlRowType; import org.apache.beam.dsls.sql.transform.BeamJoinTransforms; import org.apache.beam.dsls.sql.utils.CalciteUtils; import org.apache.beam.sdk.coders.Coder; @@ -97,7 +97,7 @@ public class BeamJoinRel extends Join implements BeamRelNode { BeamSqlEnv sqlEnv) throws Exception { BeamRelNode leftRelNode = BeamSqlRelUtils.getBeamRelInput(left); - BeamSqlRecordType leftRowType = CalciteUtils.toBeamRecordType(left.getRowType()); + BeamSqlRowType leftRowType = CalciteUtils.toBeamRowType(left.getRowType()); PCollection<BeamSqlRow> leftRows = leftRelNode.buildBeamPipeline(inputPCollections, sqlEnv); final BeamRelNode rightRelNode = BeamSqlRelUtils.getBeamRelInput(right); @@ -119,7 +119,7 @@ public class BeamJoinRel extends Join implements BeamRelNode { names.add("c" + i); types.add(leftRowType.getFieldsType().get(pairs.get(i).getKey())); } - BeamSqlRecordType extractKeyRowType = BeamSqlRecordType.create(names, types); + BeamSqlRowType extractKeyRowType = BeamSqlRowType.create(names, types); Coder extractKeyRowCoder = new BeamSqlRowCoder(extractKeyRowType); @@ -213,7 +213,7 @@ public class BeamJoinRel extends Join implements BeamRelNode { PCollection<BeamSqlRow> ret = joinedRows .apply(stageName + "_JoinParts2WholeRow", MapElements.via(new BeamJoinTransforms.JoinParts2WholeRow())) - .setCoder(new BeamSqlRowCoder(CalciteUtils.toBeamRecordType(getRowType()))); + .setCoder(new BeamSqlRowCoder(CalciteUtils.toBeamRowType(getRowType()))); return ret; } @@ -249,13 +249,13 @@ public class BeamJoinRel extends Join implements BeamRelNode { PCollection<BeamSqlRow> ret = leftRows .apply(ParDo.of(new BeamJoinTransforms.SideInputJoinDoFn( joinType, rightNullRow, rowsView, swapped)).withSideInputs(rowsView)) - .setCoder(new BeamSqlRowCoder(CalciteUtils.toBeamRecordType(getRowType()))); + .setCoder(new BeamSqlRowCoder(CalciteUtils.toBeamRowType(getRowType()))); return ret; } private BeamSqlRow buildNullRow(BeamRelNode relNode) { - BeamSqlRecordType leftType = CalciteUtils.toBeamRecordType(relNode.getRowType()); + BeamSqlRowType leftType = CalciteUtils.toBeamRowType(relNode.getRowType()); BeamSqlRow nullRow = new BeamSqlRow(leftType); for (int i = 0; i < leftType.size(); i++) { nullRow.addField(i, null); http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamProjectRel.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamProjectRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamProjectRel.java index 2cdfc72..8f8e5ce 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamProjectRel.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamProjectRel.java @@ -72,8 +72,8 @@ public class BeamProjectRel extends Project implements BeamRelNode { PCollection<BeamSqlRow> projectStream = upstream.apply(stageName, ParDo .of(new BeamSqlProjectFn(getRelTypeName(), executor, - CalciteUtils.toBeamRecordType(rowType)))); - projectStream.setCoder(new BeamSqlRowCoder(CalciteUtils.toBeamRecordType(getRowType()))); + CalciteUtils.toBeamRowType(rowType)))); + projectStream.setCoder(new BeamSqlRowCoder(CalciteUtils.toBeamRowType(getRowType()))); return projectStream; } http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSortRel.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSortRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSortRel.java index 75f9717..ba344df 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSortRel.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSortRel.java @@ -149,7 +149,7 @@ public class BeamSortRel extends Sort implements BeamRelNode { PCollection<BeamSqlRow> orderedStream = rawStream.apply( "flatten", Flatten.<BeamSqlRow>iterables()); - orderedStream.setCoder(new BeamSqlRowCoder(CalciteUtils.toBeamRecordType(getRowType()))); + orderedStream.setCoder(new BeamSqlRowCoder(CalciteUtils.toBeamRowType(getRowType()))); return orderedStream; } http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamValuesRel.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamValuesRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamValuesRel.java index 030d2c8..43b74c3 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamValuesRel.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamValuesRel.java @@ -19,13 +19,12 @@ package org.apache.beam.dsls.sql.rel; import com.google.common.collect.ImmutableList; - import java.util.ArrayList; import java.util.List; import org.apache.beam.dsls.sql.BeamSqlEnv; -import org.apache.beam.dsls.sql.schema.BeamSqlRecordType; import org.apache.beam.dsls.sql.schema.BeamSqlRow; import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder; +import org.apache.beam.dsls.sql.schema.BeamSqlRowType; import org.apache.beam.dsls.sql.schema.BeamTableUtils; import org.apache.beam.dsls.sql.utils.CalciteUtils; import org.apache.beam.sdk.transforms.Create; @@ -65,9 +64,9 @@ public class BeamValuesRel extends Values implements BeamRelNode { throw new IllegalStateException("Values with empty tuples!"); } - BeamSqlRecordType beamSQLRecordType = CalciteUtils.toBeamRecordType(this.getRowType()); + BeamSqlRowType beamSQLRowType = CalciteUtils.toBeamRowType(this.getRowType()); for (ImmutableList<RexLiteral> tuple : tuples) { - BeamSqlRow row = new BeamSqlRow(beamSQLRecordType); + BeamSqlRow row = new BeamSqlRow(beamSQLRowType); for (int i = 0; i < tuple.size(); i++) { BeamTableUtils.addFieldWithAutoTypeCasting(row, i, tuple.get(i).getValue()); } @@ -75,6 +74,6 @@ public class BeamValuesRel extends Values implements BeamRelNode { } return inputPCollections.getPipeline().apply(stageName, Create.of(rows)) - .setCoder(new BeamSqlRowCoder(beamSQLRecordType)); + .setCoder(new BeamSqlRowCoder(beamSQLRowType)); } } http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BaseBeamTable.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BaseBeamTable.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BaseBeamTable.java index 6d49bcc..dfa2785 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BaseBeamTable.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BaseBeamTable.java @@ -23,12 +23,12 @@ import java.io.Serializable; * Each IO in Beam has one table schema, by extending {@link BaseBeamTable}. */ public abstract class BaseBeamTable implements BeamSqlTable, Serializable { - protected BeamSqlRecordType beamSqlRecordType; - public BaseBeamTable(BeamSqlRecordType beamSqlRecordType) { - this.beamSqlRecordType = beamSqlRecordType; + protected BeamSqlRowType beamSqlRowType; + public BaseBeamTable(BeamSqlRowType beamSqlRowType) { + this.beamSqlRowType = beamSqlRowType; } - @Override public BeamSqlRecordType getRecordType() { - return beamSqlRecordType; + @Override public BeamSqlRowType getRowType() { + return beamSqlRowType; } } http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamPCollectionTable.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamPCollectionTable.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamPCollectionTable.java index 8309097..5b63780 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamPCollectionTable.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamPCollectionTable.java @@ -31,13 +31,13 @@ public class BeamPCollectionTable extends BaseBeamTable { private BeamIOType ioType; private transient PCollection<BeamSqlRow> upstream; - protected BeamPCollectionTable(BeamSqlRecordType beamSqlRecordType) { - super(beamSqlRecordType); + protected BeamPCollectionTable(BeamSqlRowType beamSqlRowType) { + super(beamSqlRowType); } public BeamPCollectionTable(PCollection<BeamSqlRow> upstream, - BeamSqlRecordType beamSqlRecordType){ - this(beamSqlRecordType); + BeamSqlRowType beamSqlRowType){ + this(beamSqlRowType); ioType = upstream.isBounded().equals(IsBounded.BOUNDED) ? BeamIOType.BOUNDED : BeamIOType.UNBOUNDED; this.upstream = upstream; http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRecordType.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRecordType.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRecordType.java deleted file mode 100644 index 52bd652..0000000 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRecordType.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.dsls.sql.schema; - -import com.google.auto.value.AutoValue; -import java.io.Serializable; -import java.util.List; - -/** - * Field type information in {@link BeamSqlRow}. - * - */ -@AutoValue -public abstract class BeamSqlRecordType implements Serializable { - public abstract List<String> getFieldsName(); - public abstract List<Integer> getFieldsType(); - - public static BeamSqlRecordType create(List<String> fieldNames, List<Integer> fieldTypes) { - return new org.apache.beam.dsls.sql.schema.AutoValue_BeamSqlRecordType(fieldNames, fieldTypes); - } - - public int size() { - return getFieldsName().size(); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRow.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRow.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRow.java index 5c0dbc0..d789446 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRow.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRow.java @@ -61,12 +61,12 @@ public class BeamSqlRow implements Serializable { private List<Integer> nullFields = new ArrayList<>(); private List<Object> dataValues; - private BeamSqlRecordType dataType; + private BeamSqlRowType dataType; private Instant windowStart = new Instant(TimeUnit.MICROSECONDS.toMillis(Long.MIN_VALUE)); private Instant windowEnd = new Instant(TimeUnit.MICROSECONDS.toMillis(Long.MAX_VALUE)); - public BeamSqlRow(BeamSqlRecordType dataType) { + public BeamSqlRow(BeamSqlRowType dataType) { this.dataType = dataType; this.dataValues = new ArrayList<>(); for (int idx = 0; idx < dataType.size(); ++idx) { @@ -75,7 +75,7 @@ public class BeamSqlRow implements Serializable { } } - public BeamSqlRow(BeamSqlRecordType dataType, List<Object> dataValues) { + public BeamSqlRow(BeamSqlRowType dataType, List<Object> dataValues) { this(dataType); for (int idx = 0; idx < dataValues.size(); ++idx) { addField(idx, dataValues.get(idx)); @@ -237,11 +237,11 @@ public class BeamSqlRow implements Serializable { this.dataValues = dataValues; } - public BeamSqlRecordType getDataType() { + public BeamSqlRowType getDataType() { return dataType; } - public void setDataType(BeamSqlRecordType dataType) { + public void setDataType(BeamSqlRowType dataType) { this.dataType = dataType; } http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java index c798b35..f14864a 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java @@ -40,7 +40,7 @@ import org.apache.beam.sdk.coders.StringUtf8Coder; * A {@link Coder} encodes {@link BeamSqlRow}. */ public class BeamSqlRowCoder extends CustomCoder<BeamSqlRow> { - private BeamSqlRecordType tableSchema; + private BeamSqlRowType tableSchema; private static final ListCoder<Integer> listCoder = ListCoder.of(BigEndianIntegerCoder.of()); @@ -52,7 +52,7 @@ public class BeamSqlRowCoder extends CustomCoder<BeamSqlRow> { private static final BigDecimalCoder bigDecimalCoder = BigDecimalCoder.of(); private static final ByteCoder byteCoder = ByteCoder.of(); - public BeamSqlRowCoder(BeamSqlRecordType tableSchema) { + public BeamSqlRowCoder(BeamSqlRowType tableSchema) { this.tableSchema = tableSchema; } @@ -174,7 +174,7 @@ public class BeamSqlRowCoder extends CustomCoder<BeamSqlRow> { return record; } - public BeamSqlRecordType getTableSchema() { + public BeamSqlRowType getTableSchema() { return tableSchema; } http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowType.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowType.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowType.java new file mode 100644 index 0000000..1129bdd --- /dev/null +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowType.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.dsls.sql.schema; + +import com.google.auto.value.AutoValue; +import java.io.Serializable; +import java.util.List; + +/** + * Field type information in {@link BeamSqlRow}. + * + */ +@AutoValue +public abstract class BeamSqlRowType implements Serializable { + public abstract List<String> getFieldsName(); + public abstract List<Integer> getFieldsType(); + + public static BeamSqlRowType create(List<String> fieldNames, List<Integer> fieldTypes) { + return new AutoValue_BeamSqlRowType(fieldNames, fieldTypes); + } + + public int size() { + return getFieldsName().size(); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlTable.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlTable.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlTable.java index 986decb..d419473 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlTable.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlTable.java @@ -48,5 +48,5 @@ public interface BeamSqlTable { /** * Get the schema info of the table. */ - BeamSqlRecordType getRecordType(); + BeamSqlRowType getRowType(); } http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamTableUtils.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamTableUtils.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamTableUtils.java index 7157793..4b7e76b 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamTableUtils.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamTableUtils.java @@ -37,19 +37,19 @@ public final class BeamTableUtils { public static BeamSqlRow csvLine2BeamSqlRow( CSVFormat csvFormat, String line, - BeamSqlRecordType beamSqlRecordType) { - BeamSqlRow row = new BeamSqlRow(beamSqlRecordType); + BeamSqlRowType beamSqlRowType) { + BeamSqlRow row = new BeamSqlRow(beamSqlRowType); try (StringReader reader = new StringReader(line)) { CSVParser parser = csvFormat.parse(reader); CSVRecord rawRecord = parser.getRecords().get(0); - if (rawRecord.size() != beamSqlRecordType.size()) { + if (rawRecord.size() != beamSqlRowType.size()) { throw new IllegalArgumentException(String.format( "Expect %d fields, but actually %d", - beamSqlRecordType.size(), rawRecord.size() + beamSqlRowType.size(), rawRecord.size() )); } else { - for (int idx = 0; idx < beamSqlRecordType.size(); idx++) { + for (int idx = 0; idx < beamSqlRowType.size(); idx++) { String raw = rawRecord.get(idx); addFieldWithAutoTypeCasting(row, idx, raw); } http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaCSVTable.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaCSVTable.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaCSVTable.java index 39cf8d8..a18f3de 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaCSVTable.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaCSVTable.java @@ -21,9 +21,8 @@ import static org.apache.beam.dsls.sql.schema.BeamTableUtils.beamSqlRow2CsvLine; import static org.apache.beam.dsls.sql.schema.BeamTableUtils.csvLine2BeamSqlRow; import java.util.List; - -import org.apache.beam.dsls.sql.schema.BeamSqlRecordType; import org.apache.beam.dsls.sql.schema.BeamSqlRow; +import org.apache.beam.dsls.sql.schema.BeamSqlRowType; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; @@ -37,27 +36,27 @@ import org.apache.commons.csv.CSVFormat; */ public class BeamKafkaCSVTable extends BeamKafkaTable { private CSVFormat csvFormat; - public BeamKafkaCSVTable(BeamSqlRecordType beamSqlRecordType, String bootstrapServers, + public BeamKafkaCSVTable(BeamSqlRowType beamSqlRowType, String bootstrapServers, List<String> topics) { - this(beamSqlRecordType, bootstrapServers, topics, CSVFormat.DEFAULT); + this(beamSqlRowType, bootstrapServers, topics, CSVFormat.DEFAULT); } - public BeamKafkaCSVTable(BeamSqlRecordType beamSqlRecordType, String bootstrapServers, + public BeamKafkaCSVTable(BeamSqlRowType beamSqlRowType, String bootstrapServers, List<String> topics, CSVFormat format) { - super(beamSqlRecordType, bootstrapServers, topics); + super(beamSqlRowType, bootstrapServers, topics); this.csvFormat = format; } @Override public PTransform<PCollection<KV<byte[], byte[]>>, PCollection<BeamSqlRow>> getPTransformForInput() { - return new CsvRecorderDecoder(beamSqlRecordType, csvFormat); + return new CsvRecorderDecoder(beamSqlRowType, csvFormat); } @Override public PTransform<PCollection<BeamSqlRow>, PCollection<KV<byte[], byte[]>>> getPTransformForOutput() { - return new CsvRecorderEncoder(beamSqlRecordType, csvFormat); + return new CsvRecorderEncoder(beamSqlRowType, csvFormat); } /** @@ -66,10 +65,10 @@ public class BeamKafkaCSVTable extends BeamKafkaTable { */ public static class CsvRecorderDecoder extends PTransform<PCollection<KV<byte[], byte[]>>, PCollection<BeamSqlRow>> { - private BeamSqlRecordType recordType; + private BeamSqlRowType rowType; private CSVFormat format; - public CsvRecorderDecoder(BeamSqlRecordType recordType, CSVFormat format) { - this.recordType = recordType; + public CsvRecorderDecoder(BeamSqlRowType rowType, CSVFormat format) { + this.rowType = rowType; this.format = format; } @@ -79,7 +78,7 @@ public class BeamKafkaCSVTable extends BeamKafkaTable { @ProcessElement public void processElement(ProcessContext c) { String rowInString = new String(c.element().getValue()); - c.output(csvLine2BeamSqlRow(format, rowInString, recordType)); + c.output(csvLine2BeamSqlRow(format, rowInString, rowType)); } })); } @@ -91,10 +90,10 @@ public class BeamKafkaCSVTable extends BeamKafkaTable { */ public static class CsvRecorderEncoder extends PTransform<PCollection<BeamSqlRow>, PCollection<KV<byte[], byte[]>>> { - private BeamSqlRecordType recordType; + private BeamSqlRowType rowType; private CSVFormat format; - public CsvRecorderEncoder(BeamSqlRecordType recordType, CSVFormat format) { - this.recordType = recordType; + public CsvRecorderEncoder(BeamSqlRowType rowType, CSVFormat format) { + this.rowType = rowType; this.format = format; } http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaTable.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaTable.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaTable.java index f27014e..faa2706 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaTable.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaTable.java @@ -22,11 +22,10 @@ import static com.google.common.base.Preconditions.checkArgument; import java.io.Serializable; import java.util.List; import java.util.Map; - import org.apache.beam.dsls.sql.schema.BaseBeamTable; import org.apache.beam.dsls.sql.schema.BeamIOType; -import org.apache.beam.dsls.sql.schema.BeamSqlRecordType; import org.apache.beam.dsls.sql.schema.BeamSqlRow; +import org.apache.beam.dsls.sql.schema.BeamSqlRowType; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.ByteArrayCoder; import org.apache.beam.sdk.io.kafka.KafkaIO; @@ -49,13 +48,13 @@ public abstract class BeamKafkaTable extends BaseBeamTable implements Serializab private List<String> topics; private Map<String, Object> configUpdates; - protected BeamKafkaTable(BeamSqlRecordType beamSqlRecordType) { - super(beamSqlRecordType); + protected BeamKafkaTable(BeamSqlRowType beamSqlRowType) { + super(beamSqlRowType); } - public BeamKafkaTable(BeamSqlRecordType beamSqlRecordType, String bootstrapServers, + public BeamKafkaTable(BeamSqlRowType beamSqlRowType, String bootstrapServers, List<String> topics) { - super(beamSqlRecordType); + super(beamSqlRowType); this.bootstrapServers = bootstrapServers; this.topics = topics; }
