Repository: beam Updated Branches: refs/heads/DSL_SQL abe0f1a0a -> dcd769c8a
[BEAM-2443] apply AutoValue to BeamSqlRecordType Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/20453733 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/20453733 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/20453733 Branch: refs/heads/DSL_SQL Commit: 20453733ea8679e9fe421950a69921ace80dd381 Parents: abe0f1a Author: James Xu <[email protected]> Authored: Fri Jun 16 14:31:55 2017 +0800 Committer: Tyler Akidau <[email protected]> Committed: Fri Jun 16 14:19:02 2017 -0700 ---------------------------------------------------------------------- dsls/sql/pom.xml | 5 +++ .../beam/dsls/sql/example/BeamSqlExample.java | 9 ++--- .../beam/dsls/sql/rel/BeamAggregationRel.java | 20 ++++++----- .../beam/dsls/sql/schema/BeamSqlRecordType.java | 38 +++++--------------- .../transform/BeamAggregationTransforms.java | 22 +++++++----- .../beam/dsls/sql/utils/CalciteUtils.java | 11 +++--- 6 files changed, 51 insertions(+), 54 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/20453733/dsls/sql/pom.xml ---------------------------------------------------------------------- diff --git a/dsls/sql/pom.xml b/dsls/sql/pom.xml index e70c88c..d866313 100644 --- a/dsls/sql/pom.xml +++ b/dsls/sql/pom.xml @@ -190,5 +190,10 @@ <version>0.10.1.0</version> <scope>provided</scope> </dependency> + <dependency> + <groupId>com.google.auto.value</groupId> + <artifactId>auto-value</artifactId> + <scope>provided</scope> + </dependency> </dependencies> </project> http://git-wip-us.apache.org/repos/asf/beam/blob/20453733/dsls/sql/src/main/java/org/apache/beam/dsls/sql/example/BeamSqlExample.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/example/BeamSqlExample.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/example/BeamSqlExample.java index 6bb1617..31f8302 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/example/BeamSqlExample.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/example/BeamSqlExample.java @@ -18,6 +18,8 @@ package org.apache.beam.dsls.sql.example; import java.sql.Types; +import java.util.Arrays; +import java.util.List; import org.apache.beam.dsls.sql.BeamSql; import org.apache.beam.dsls.sql.schema.BeamSqlRecordType; import org.apache.beam.dsls.sql.schema.BeamSqlRow; @@ -47,10 +49,9 @@ class BeamSqlExample { Pipeline p = Pipeline.create(options); //define the input row format - BeamSqlRecordType type = new BeamSqlRecordType(); - type.addField("c1", Types.INTEGER); - type.addField("c2", Types.VARCHAR); - type.addField("c3", Types.DOUBLE); + List<String> fieldNames = Arrays.asList("c1", "c2", "c3"); + List<Integer> fieldTypes = Arrays.asList(Types.INTEGER, Types.VARCHAR, Types.DOUBLE); + BeamSqlRecordType type = BeamSqlRecordType.create(fieldNames, fieldTypes); BeamSqlRow row = new BeamSqlRow(type); row.addField(0, 1); row.addField(1, "row"); http://git-wip-us.apache.org/repos/asf/beam/blob/20453733/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java index 595563d..bcdc44f 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java @@ -17,8 +17,8 @@ */ package org.apache.beam.dsls.sql.rel; +import java.util.ArrayList; import java.util.List; - import org.apache.beam.dsls.sql.schema.BeamSqlRecordType; import org.apache.beam.dsls.sql.schema.BeamSqlRow; import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder; @@ -125,25 +125,29 @@ public class BeamAggregationRel extends Aggregate implements BeamRelNode { */ private BeamSqlRecordType exKeyFieldsSchema(RelDataType relDataType) { BeamSqlRecordType inputRecordType = CalciteUtils.toBeamRecordType(relDataType); - BeamSqlRecordType typeOfKey = new BeamSqlRecordType(); + List<String> fieldNames = new ArrayList<>(); + List<Integer> fieldTypes = new ArrayList<>(); for (int i : groupSet.asList()) { if (i != windowFieldIdx) { - typeOfKey.addField(inputRecordType.getFieldsName().get(i), - inputRecordType.getFieldsType().get(i)); + fieldNames.add(inputRecordType.getFieldsName().get(i)); + fieldTypes.add(inputRecordType.getFieldsType().get(i)); } } - return typeOfKey; + return BeamSqlRecordType.create(fieldNames, fieldTypes); } /** * Type of sub-rowrecord, that represents the list of aggregation fields. */ private BeamSqlRecordType exAggFieldsSchema() { - BeamSqlRecordType typeOfAggFields = new BeamSqlRecordType(); + List<String> fieldNames = new ArrayList<>(); + List<Integer> fieldTypes = new ArrayList<>(); for (AggregateCall ac : getAggCallList()) { - typeOfAggFields.addField(ac.name, CalciteUtils.toJavaType(ac.type.getSqlTypeName())); + fieldNames.add(ac.name); + fieldTypes.add(CalciteUtils.toJavaType(ac.type.getSqlTypeName())); } - return typeOfAggFields; + + return BeamSqlRecordType.create(fieldNames, fieldTypes); } @Override http://git-wip-us.apache.org/repos/asf/beam/blob/20453733/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRecordType.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRecordType.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRecordType.java index 08ba39f..9fc3945 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRecordType.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRecordType.java @@ -17,46 +17,24 @@ */ package org.apache.beam.dsls.sql.schema; +import com.google.auto.value.AutoValue; import java.io.Serializable; -import java.util.ArrayList; import java.util.List; /** * Field type information in {@link BeamSqlRow}. * */ -public class BeamSqlRecordType implements Serializable { - private List<String> fieldsName = new ArrayList<>(); - private List<Integer> fieldsType = new ArrayList<>(); +@AutoValue +public abstract class BeamSqlRecordType implements Serializable { + public abstract List<String> getFieldsName(); + public abstract List<Integer> getFieldsType(); - public void addField(String fieldName, Integer fieldType) { - fieldsName.add(fieldName); - fieldsType.add(fieldType); + public static BeamSqlRecordType create(List<String> fieldNames, List<Integer> fieldTypes) { + return new AutoValue_BeamSqlRecordType(fieldNames, fieldTypes); } public int size() { - return fieldsName.size(); + return getFieldsName().size(); } - - public List<String> getFieldsName() { - return fieldsName; - } - - public void setFieldsName(List<String> fieldsName) { - this.fieldsName = fieldsName; - } - - public List<Integer> getFieldsType() { - return fieldsType; - } - - public void setFieldsType(List<Integer> fieldsType) { - this.fieldsType = fieldsType; - } - - @Override - public String toString() { - return "RecordType [fieldsName=" + fieldsName + ", fieldsType=" + fieldsType + "]"; - } - } http://git-wip-us.apache.org/repos/asf/beam/blob/20453733/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamAggregationTransforms.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamAggregationTransforms.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamAggregationTransforms.java index e804b94..83d473a 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamAggregationTransforms.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamAggregationTransforms.java @@ -106,13 +106,14 @@ public class BeamAggregationTransforms implements Serializable{ } private BeamSqlRecordType exTypeOfKeyRecord(BeamSqlRecordType dataType) { - BeamSqlRecordType typeOfKey = new BeamSqlRecordType(); + List<String> fieldNames = new ArrayList<>(); + List<Integer> fieldTypes = new ArrayList<>(); for (int idx : groupByKeys) { - typeOfKey.addField(dataType.getFieldsName().get(idx), dataType.getFieldsType().get(idx)); + fieldNames.add(dataType.getFieldsName().get(idx)); + fieldTypes.add(dataType.getFieldsType().get(idx)); } - return typeOfKey; + return BeamSqlRecordType.create(fieldNames, fieldTypes); } - } /** @@ -152,19 +153,21 @@ public class BeamAggregationTransforms implements Serializable{ public AggregationCombineFn(List<AggregateCall> aggregationCalls, BeamSqlRecordType sourceRowRecordType) { - this.aggDataType = new BeamSqlRecordType(); this.aggFunctions = new ArrayList<>(); this.aggElementExpressions = new ArrayList<>(); boolean hasAvg = false; boolean hasCount = false; int countIndex = -1; + List<String> fieldNames = new ArrayList<>(); + List<Integer> fieldTypes = new ArrayList<>(); for (int idx = 0; idx < aggregationCalls.size(); ++idx) { AggregateCall ac = aggregationCalls.get(idx); //verify it's supported. verifySupportedAggregation(ac); - aggDataType.addField(ac.name, CalciteUtils.toJavaType(ac.type.getSqlTypeName())); + fieldNames.add(ac.name); + fieldTypes.add(CalciteUtils.toJavaType(ac.type.getSqlTypeName())); SqlAggFunction aggFn = ac.getAggregation(); switch (aggFn.getName()) { @@ -190,10 +193,12 @@ public class BeamAggregationTransforms implements Serializable{ } aggFunctions.add(aggFn.getName()); } + + // add a COUNT holder if only have AVG if (hasAvg && !hasCount) { - aggDataType.addField("__COUNT", - CalciteUtils.toJavaType(SqlTypeName.BIGINT)); + fieldNames.add("__COUNT"); + fieldTypes.add(CalciteUtils.toJavaType(SqlTypeName.BIGINT)); aggFunctions.add("COUNT"); aggElementExpressions.add(BeamSqlPrimitive.<Long>of(SqlTypeName.BIGINT, 1L)); @@ -202,6 +207,7 @@ public class BeamAggregationTransforms implements Serializable{ countIndex = aggDataType.size() - 1; } + this.aggDataType = BeamSqlRecordType.create(fieldNames, fieldTypes); this.countIndex = countIndex; } http://git-wip-us.apache.org/repos/asf/beam/blob/20453733/dsls/sql/src/main/java/org/apache/beam/dsls/sql/utils/CalciteUtils.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/utils/CalciteUtils.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/utils/CalciteUtils.java index 46b4911..69ca44b 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/utils/CalciteUtils.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/utils/CalciteUtils.java @@ -19,7 +19,9 @@ package org.apache.beam.dsls.sql.utils; import java.sql.Types; +import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; import org.apache.beam.dsls.sql.schema.BeamSqlRecordType; import org.apache.calcite.rel.type.RelDataType; @@ -82,12 +84,13 @@ public class CalciteUtils { * Generate {@code BeamSqlRecordType} from {@code RelDataType} which is used to create table. */ public static BeamSqlRecordType toBeamRecordType(RelDataType tableInfo) { - BeamSqlRecordType record = new BeamSqlRecordType(); + List<String> fieldNames = new ArrayList<>(); + List<Integer> fieldTypes = new ArrayList<>(); for (RelDataTypeField f : tableInfo.getFieldList()) { - record.getFieldsName().add(f.getName()); - record.getFieldsType().add(toJavaType(f.getType().getSqlTypeName())); + fieldNames.add(f.getName()); + fieldTypes.add(toJavaType(f.getType().getSqlTypeName())); } - return record; + return BeamSqlRecordType.create(fieldNames, fieldTypes); } /**
