[BEAM-2745] add BeamRecordSqlType.getFieldTypeByIndex()
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/7c76129a Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/7c76129a Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/7c76129a Branch: refs/heads/DSL_SQL Commit: 7c76129a02c19595b257cff6efdc1fd0e637815d Parents: 6628674 Author: James Xu <xumingmi...@gmail.com> Authored: Tue Aug 8 14:53:18 2017 +0800 Committer: Tyler Akidau <taki...@apache.org> Committed: Wed Aug 9 09:34:10 2017 -0700 ---------------------------------------------------------------------- .../org/apache/beam/sdk/values/BeamRecord.java | 21 ++- .../apache/beam/sdk/values/BeamRecordType.java | 25 +-- .../apache/beam/sdk/extensions/sql/BeamSql.java | 4 +- .../beam/sdk/extensions/sql/BeamSqlEnv.java | 6 +- .../extensions/sql/example/BeamSqlExample.java | 4 +- .../sql/impl/rel/BeamAggregationRel.java | 16 +- .../extensions/sql/impl/rel/BeamJoinRel.java | 10 +- .../extensions/sql/impl/rel/BeamValuesRel.java | 6 +- .../transform/BeamAggregationTransforms.java | 25 +-- .../sql/impl/transform/BeamJoinTransforms.java | 22 +-- .../sql/impl/transform/BeamSqlProjectFn.java | 8 +- .../extensions/sql/impl/utils/CalciteUtils.java | 16 +- .../extensions/sql/schema/BaseBeamTable.java | 6 +- .../sql/schema/BeamPCollectionTable.java | 4 +- .../sql/schema/BeamRecordSqlType.java | 185 +++++++++++++++++++ .../sql/schema/BeamSqlRecordHelper.java | 4 +- .../sql/schema/BeamSqlRecordType.java | 175 ------------------ .../sdk/extensions/sql/schema/BeamSqlTable.java | 2 +- .../extensions/sql/schema/BeamTableUtils.java | 14 +- .../sql/schema/kafka/BeamKafkaCSVTable.java | 14 +- .../sql/schema/kafka/BeamKafkaTable.java | 6 +- .../sql/schema/text/BeamTextCSVTable.java | 6 +- .../schema/text/BeamTextCSVTableIOReader.java | 6 +- .../schema/text/BeamTextCSVTableIOWriter.java | 6 +- .../sql/schema/text/BeamTextTable.java | 4 +- .../sql/BeamSqlDslAggregationTest.java | 14 +- .../beam/sdk/extensions/sql/BeamSqlDslBase.java | 6 +- .../sdk/extensions/sql/BeamSqlDslJoinTest.java | 10 +- .../extensions/sql/BeamSqlDslProjectTest.java | 10 +- .../extensions/sql/BeamSqlDslUdfUdafTest.java | 6 +- .../beam/sdk/extensions/sql/TestUtils.java | 14 +- .../interpreter/BeamSqlFnExecutorTestBase.java | 4 +- ...mSqlBuiltinFunctionsIntegrationTestBase.java | 6 +- ...amSqlComparisonOperatorsIntegrationTest.java | 4 +- .../extensions/sql/mock/MockedBoundedTable.java | 6 +- .../sdk/extensions/sql/mock/MockedTable.java | 4 +- .../sql/mock/MockedUnboundedTable.java | 4 +- .../sql/schema/BeamSqlRowCoderTest.java | 2 +- .../sql/schema/kafka/BeamKafkaCSVTableTest.java | 4 +- .../sql/schema/text/BeamTextCSVTableTest.java | 4 +- .../transform/BeamAggregationTransformTest.java | 10 +- .../schema/transform/BeamTransformBaseTest.java | 8 +- 42 files changed, 368 insertions(+), 343 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/7c76129a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecord.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecord.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecord.java index 6e4bd4c..a3ede3c 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecord.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecord.java @@ -37,7 +37,20 @@ public class BeamRecord implements Serializable { private List<Object> dataValues; private BeamRecordType dataType; - public BeamRecord(BeamRecordType dataType, List<Object> rawdataValues) { + /** + * Creates a BeamRecord. + * @param dataType type of the record + * @param rawDataValues values of the record, record's size must match size of + * the {@code BeamRecordType}, or can be null, if it is null + * then every field is null. + */ + public BeamRecord(BeamRecordType dataType, List<Object> rawDataValues) { + if (dataType.getFieldNames().size() != rawDataValues.size()) { + throw new IllegalArgumentException( + "Field count in BeamRecordType(" + dataType.getFieldNames().size() + + ") and rawDataValues(" + rawDataValues.size() + ") must match!"); + } + this.dataType = dataType; this.dataValues = new ArrayList<>(dataType.size()); @@ -46,7 +59,7 @@ public class BeamRecord implements Serializable { } for (int idx = 0; idx < dataType.size(); ++idx) { - addField(idx, rawdataValues.get(idx)); + addField(idx, rawDataValues.get(idx)); } } @@ -60,7 +73,7 @@ public class BeamRecord implements Serializable { } public Object getFieldValue(String fieldName) { - return getFieldValue(dataType.getFieldsName().indexOf(fieldName)); + return getFieldValue(dataType.getFieldNames().indexOf(fieldName)); } public Byte getByte(String fieldName) { @@ -179,7 +192,7 @@ public class BeamRecord implements Serializable { StringBuilder sb = new StringBuilder(); for (int idx = 0; idx < size(); ++idx) { sb.append( - String.format(",%s=%s", getDataType().getFieldsName().get(idx), getFieldValue(idx))); + String.format(",%s=%s", getDataType().getFieldNames().get(idx), getFieldValue(idx))); } return sb.substring(1); } http://git-wip-us.apache.org/repos/asf/beam/blob/7c76129a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecordType.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecordType.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecordType.java index 3b20b50..6ab783c 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecordType.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecordType.java @@ -17,6 +17,7 @@ */ package org.apache.beam.sdk.values; +import com.google.common.collect.ImmutableList; import java.io.Serializable; import java.util.List; import org.apache.beam.sdk.annotations.Experimental; @@ -28,12 +29,12 @@ import org.apache.beam.sdk.coders.Coder; */ @Experimental public class BeamRecordType implements Serializable{ - private List<String> fieldsName; - private List<Coder> fieldsCoder; + private List<String> fieldNames; + private List<Coder> fieldCoders; - public BeamRecordType(List<String> fieldsName, List<Coder> fieldsCoder) { - this.fieldsName = fieldsName; - this.fieldsCoder = fieldsCoder; + public BeamRecordType(List<String> fieldNames, List<Coder> fieldCoders) { + this.fieldNames = fieldNames; + this.fieldCoders = fieldCoders; } /** @@ -49,22 +50,22 @@ public class BeamRecordType implements Serializable{ * Get the coder for {@link BeamRecordCoder}. */ public BeamRecordCoder getRecordCoder(){ - return BeamRecordCoder.of(this, fieldsCoder); + return BeamRecordCoder.of(this, fieldCoders); } - public List<String> getFieldsName(){ - return fieldsName; + public List<String> getFieldNames(){ + return ImmutableList.copyOf(fieldNames); } - public String getFieldByIndex(int index){ - return fieldsName.get(index); + public String getFieldNameByIndex(int index){ + return fieldNames.get(index); } public int findIndexOfField(String fieldName){ - return fieldsName.indexOf(fieldName); + return fieldNames.indexOf(fieldName); } public int size(){ - return fieldsName.size(); + return fieldNames.size(); } } http://git-wip-us.apache.org/repos/asf/beam/blob/7c76129a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSql.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSql.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSql.java index d0a6360..ac617ad 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSql.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSql.java @@ -22,7 +22,7 @@ import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.coders.BeamRecordCoder; import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode; import org.apache.beam.sdk.extensions.sql.schema.BeamPCollectionTable; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType; +import org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType; import org.apache.beam.sdk.extensions.sql.schema.BeamSqlUdaf; import org.apache.beam.sdk.extensions.sql.schema.BeamSqlUdf; import org.apache.beam.sdk.transforms.PTransform; @@ -179,7 +179,7 @@ public class BeamSql { getSqlEnv().registerTable(sourceTag.getId(), new BeamPCollectionTable(sourceStream, - (BeamSqlRecordType) sourceCoder.getRecordType())); + (BeamRecordSqlType) sourceCoder.getRecordType())); } } } http://git-wip-us.apache.org/repos/asf/beam/blob/7c76129a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlEnv.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlEnv.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlEnv.java index 3c5eb36..4d21425 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlEnv.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlEnv.java @@ -21,7 +21,7 @@ import java.io.Serializable; import org.apache.beam.sdk.extensions.sql.impl.planner.BeamQueryPlanner; import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils; import org.apache.beam.sdk.extensions.sql.schema.BaseBeamTable; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType; +import org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType; import org.apache.beam.sdk.extensions.sql.schema.BeamSqlUdaf; import org.apache.beam.sdk.extensions.sql.schema.BeamSqlUdf; import org.apache.calcite.DataContext; @@ -84,8 +84,8 @@ public class BeamSqlEnv implements Serializable{ } private static class BeamCalciteTable implements ScannableTable, Serializable { - private BeamSqlRecordType beamSqlRowType; - public BeamCalciteTable(BeamSqlRecordType beamSqlRowType) { + private BeamRecordSqlType beamSqlRowType; + public BeamCalciteTable(BeamRecordSqlType beamSqlRowType) { this.beamSqlRowType = beamSqlRowType; } @Override http://git-wip-us.apache.org/repos/asf/beam/blob/7c76129a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/BeamSqlExample.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/BeamSqlExample.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/BeamSqlExample.java index acb5943..3a46acc 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/BeamSqlExample.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/BeamSqlExample.java @@ -22,7 +22,7 @@ import java.util.Arrays; import java.util.List; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.extensions.sql.BeamSql; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType; +import org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.transforms.Create; @@ -53,7 +53,7 @@ class BeamSqlExample { //define the input row format List<String> fieldNames = Arrays.asList("c1", "c2", "c3"); List<Integer> fieldTypes = Arrays.asList(Types.INTEGER, Types.VARCHAR, Types.DOUBLE); - BeamSqlRecordType type = BeamSqlRecordType.create(fieldNames, fieldTypes); + BeamRecordSqlType type = BeamRecordSqlType.create(fieldNames, fieldTypes); BeamRecord row = new BeamRecord(type, 1, "row", 1.0); //create a source PCollection with Create.of(); http://git-wip-us.apache.org/repos/asf/beam/blob/7c76129a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamAggregationRel.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamAggregationRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamAggregationRel.java index d91b484..4b557f9 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamAggregationRel.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamAggregationRel.java @@ -24,7 +24,7 @@ import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.extensions.sql.BeamSqlEnv; import org.apache.beam.sdk.extensions.sql.impl.transform.BeamAggregationTransforms; import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType; +import org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType; import org.apache.beam.sdk.transforms.Combine; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.WithKeys; @@ -119,23 +119,23 @@ public class BeamAggregationRel extends Aggregate implements BeamRelNode { /** * Type of sub-rowrecord used as Group-By keys. */ - private BeamSqlRecordType exKeyFieldsSchema(RelDataType relDataType) { - BeamSqlRecordType inputRowType = CalciteUtils.toBeamRowType(relDataType); + private BeamRecordSqlType exKeyFieldsSchema(RelDataType relDataType) { + BeamRecordSqlType inputRowType = CalciteUtils.toBeamRowType(relDataType); List<String> fieldNames = new ArrayList<>(); List<Integer> fieldTypes = new ArrayList<>(); for (int i : groupSet.asList()) { if (i != windowFieldIdx) { - fieldNames.add(inputRowType.getFieldsName().get(i)); - fieldTypes.add(inputRowType.getFieldsType().get(i)); + fieldNames.add(inputRowType.getFieldNameByIndex(i)); + fieldTypes.add(inputRowType.getFieldTypeByIndex(i)); } } - return BeamSqlRecordType.create(fieldNames, fieldTypes); + return BeamRecordSqlType.create(fieldNames, fieldTypes); } /** * Type of sub-rowrecord, that represents the list of aggregation fields. */ - private BeamSqlRecordType exAggFieldsSchema() { + private BeamRecordSqlType exAggFieldsSchema() { List<String> fieldNames = new ArrayList<>(); List<Integer> fieldTypes = new ArrayList<>(); for (AggregateCall ac : getAggCallList()) { @@ -143,7 +143,7 @@ public class BeamAggregationRel extends Aggregate implements BeamRelNode { fieldTypes.add(CalciteUtils.toJavaType(ac.type.getSqlTypeName())); } - return BeamSqlRecordType.create(fieldNames, fieldTypes); + return BeamRecordSqlType.create(fieldNames, fieldTypes); } @Override http://git-wip-us.apache.org/repos/asf/beam/blob/7c76129a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java index 2bd15b3..9dceb25 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java @@ -28,7 +28,7 @@ import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.extensions.sql.BeamSqlEnv; import org.apache.beam.sdk.extensions.sql.impl.transform.BeamJoinTransforms; import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType; +import org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType; import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.View; @@ -97,7 +97,7 @@ public class BeamJoinRel extends Join implements BeamRelNode { BeamSqlEnv sqlEnv) throws Exception { BeamRelNode leftRelNode = BeamSqlRelUtils.getBeamRelInput(left); - BeamSqlRecordType leftRowType = CalciteUtils.toBeamRowType(left.getRowType()); + BeamRecordSqlType leftRowType = CalciteUtils.toBeamRowType(left.getRowType()); PCollection<BeamRecord> leftRows = leftRelNode.buildBeamPipeline(inputPCollections, sqlEnv); final BeamRelNode rightRelNode = BeamSqlRelUtils.getBeamRelInput(right); @@ -117,9 +117,9 @@ public class BeamJoinRel extends Join implements BeamRelNode { List<Integer> types = new ArrayList<>(pairs.size()); for (int i = 0; i < pairs.size(); i++) { names.add("c" + i); - types.add(leftRowType.getFieldsType().get(pairs.get(i).getKey())); + types.add(leftRowType.getFieldTypeByIndex(pairs.get(i).getKey())); } - BeamSqlRecordType extractKeyRowType = BeamSqlRecordType.create(names, types); + BeamRecordSqlType extractKeyRowType = BeamRecordSqlType.create(names, types); Coder extractKeyRowCoder = extractKeyRowType.getRecordCoder(); @@ -255,7 +255,7 @@ public class BeamJoinRel extends Join implements BeamRelNode { } private BeamRecord buildNullRow(BeamRelNode relNode) { - BeamSqlRecordType leftType = CalciteUtils.toBeamRowType(relNode.getRowType()); + BeamRecordSqlType leftType = CalciteUtils.toBeamRowType(relNode.getRowType()); return new BeamRecord(leftType, Collections.nCopies(leftType.size(), null)); } http://git-wip-us.apache.org/repos/asf/beam/blob/7c76129a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRel.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRel.java index 1d666ca..fde002e 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRel.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRel.java @@ -23,7 +23,7 @@ import java.util.ArrayList; import java.util.List; import org.apache.beam.sdk.extensions.sql.BeamSqlEnv; import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType; +import org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType; import org.apache.beam.sdk.extensions.sql.schema.BeamTableUtils; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.values.BeamRecord; @@ -63,12 +63,12 @@ public class BeamValuesRel extends Values implements BeamRelNode { throw new IllegalStateException("Values with empty tuples!"); } - BeamSqlRecordType beamSQLRowType = CalciteUtils.toBeamRowType(this.getRowType()); + BeamRecordSqlType beamSQLRowType = CalciteUtils.toBeamRowType(this.getRowType()); for (ImmutableList<RexLiteral> tuple : tuples) { List<Object> fieldsValue = new ArrayList<>(beamSQLRowType.size()); for (int i = 0; i < tuple.size(); i++) { fieldsValue.add(BeamTableUtils.autoCastField( - beamSQLRowType.getFieldsType().get(i), tuple.get(i).getValue())); + beamSQLRowType.getFieldTypeByIndex(i), tuple.get(i).getValue())); } rows.add(new BeamRecord(beamSQLRowType, fieldsValue)); } http://git-wip-us.apache.org/repos/asf/beam/blob/7c76129a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamAggregationTransforms.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamAggregationTransforms.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamAggregationTransforms.java index c6a5d26..0f90bee 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamAggregationTransforms.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamAggregationTransforms.java @@ -35,8 +35,8 @@ import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression; import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlInputRefExpression; import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils; +import org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType; import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordHelper; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType; import org.apache.beam.sdk.extensions.sql.schema.BeamSqlUdaf; import org.apache.beam.sdk.transforms.Combine.CombineFn; import org.apache.beam.sdk.transforms.DoFn; @@ -59,11 +59,11 @@ public class BeamAggregationTransforms implements Serializable{ * Merge KV to single record. */ public static class MergeAggregationRecord extends DoFn<KV<BeamRecord, BeamRecord>, BeamRecord> { - private BeamSqlRecordType outRowType; + private BeamRecordSqlType outRowType; private List<String> aggFieldNames; private int windowStartFieldIdx; - public MergeAggregationRecord(BeamSqlRecordType outRowType, List<AggregateCall> aggList + public MergeAggregationRecord(BeamRecordSqlType outRowType, List<AggregateCall> aggList , int windowStartFieldIdx) { this.outRowType = outRowType; this.aggFieldNames = new ArrayList<>(); @@ -75,10 +75,11 @@ public class BeamAggregationTransforms implements Serializable{ @ProcessElement public void processElement(ProcessContext c, BoundedWindow window) { - List<Object> fieldValues = new ArrayList<>(); KV<BeamRecord, BeamRecord> kvRecord = c.element(); + List<Object> fieldValues = new ArrayList<>(); fieldValues.addAll(kvRecord.getKey().getDataValues()); fieldValues.addAll(kvRecord.getValue().getDataValues()); + if (windowStartFieldIdx != -1) { fieldValues.add(windowStartFieldIdx, ((IntervalWindow) window).start().toDate()); } @@ -106,7 +107,7 @@ public class BeamAggregationTransforms implements Serializable{ @Override public BeamRecord apply(BeamRecord input) { - BeamSqlRecordType typeOfKey = exTypeOfKeyRecord(BeamSqlRecordHelper.getSqlRecordType(input)); + BeamRecordSqlType typeOfKey = exTypeOfKeyRecord(BeamSqlRecordHelper.getSqlRecordType(input)); List<Object> fieldValues = new ArrayList<>(groupByKeys.size()); for (int idx = 0; idx < groupByKeys.size(); ++idx) { @@ -117,14 +118,14 @@ public class BeamAggregationTransforms implements Serializable{ return keyOfRecord; } - private BeamSqlRecordType exTypeOfKeyRecord(BeamSqlRecordType dataType) { + private BeamRecordSqlType exTypeOfKeyRecord(BeamRecordSqlType dataType) { List<String> fieldNames = new ArrayList<>(); List<Integer> fieldTypes = new ArrayList<>(); for (int idx : groupByKeys) { - fieldNames.add(dataType.getFieldsName().get(idx)); - fieldTypes.add(dataType.getFieldsType().get(idx)); + fieldNames.add(dataType.getFieldNameByIndex(idx)); + fieldTypes.add(dataType.getFieldTypeByIndex(idx)); } - return BeamSqlRecordType.create(fieldNames, fieldTypes); + return BeamRecordSqlType.create(fieldNames, fieldTypes); } } @@ -152,10 +153,10 @@ public class BeamAggregationTransforms implements Serializable{ extends CombineFn<BeamRecord, AggregationAccumulator, BeamRecord> { private List<BeamSqlUdaf> aggregators; private List<BeamSqlExpression> sourceFieldExps; - private BeamSqlRecordType finalRowType; + private BeamRecordSqlType finalRowType; public AggregationAdaptor(List<AggregateCall> aggregationCalls, - BeamSqlRecordType sourceRowType) { + BeamRecordSqlType sourceRowType) { aggregators = new ArrayList<>(); sourceFieldExps = new ArrayList<>(); List<String> outFieldsName = new ArrayList<>(); @@ -204,7 +205,7 @@ public class BeamAggregationTransforms implements Serializable{ break; } } - finalRowType = BeamSqlRecordType.create(outFieldsName, outFieldsType); + finalRowType = BeamRecordSqlType.create(outFieldsName, outFieldsType); } @Override public AggregationAccumulator createAccumulator() { http://git-wip-us.apache.org/repos/asf/beam/blob/7c76129a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamJoinTransforms.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamJoinTransforms.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamJoinTransforms.java index 8f34704..9a48c53 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamJoinTransforms.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamJoinTransforms.java @@ -22,8 +22,8 @@ import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.Map; +import org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType; import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordHelper; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.SimpleFunction; import org.apache.beam.sdk.values.BeamRecord; @@ -58,12 +58,12 @@ public class BeamJoinTransforms { for (int i = 0; i < joinColumns.size(); i++) { names.add("c" + i); types.add(isLeft - ? BeamSqlRecordHelper.getSqlRecordType(input).getFieldsType() - .get(joinColumns.get(i).getKey()) - : BeamSqlRecordHelper.getSqlRecordType(input).getFieldsType() - .get(joinColumns.get(i).getValue())); + ? BeamSqlRecordHelper.getSqlRecordType(input).getFieldTypeByIndex( + joinColumns.get(i).getKey()) + : BeamSqlRecordHelper.getSqlRecordType(input).getFieldTypeByIndex( + joinColumns.get(i).getValue())); } - BeamSqlRecordType type = BeamSqlRecordType.create(names, types); + BeamRecordSqlType type = BeamRecordSqlType.create(names, types); // build the row List<Object> fieldValues = new ArrayList<>(joinColumns.size()); @@ -146,13 +146,13 @@ public class BeamJoinTransforms { BeamRecord rightRow) { // build the type List<String> names = new ArrayList<>(leftRow.size() + rightRow.size()); - names.addAll(leftRow.getDataType().getFieldsName()); - names.addAll(rightRow.getDataType().getFieldsName()); + names.addAll(leftRow.getDataType().getFieldNames()); + names.addAll(rightRow.getDataType().getFieldNames()); List<Integer> types = new ArrayList<>(leftRow.size() + rightRow.size()); - types.addAll(BeamSqlRecordHelper.getSqlRecordType(leftRow).getFieldsType()); - types.addAll(BeamSqlRecordHelper.getSqlRecordType(rightRow).getFieldsType()); - BeamSqlRecordType type = BeamSqlRecordType.create(names, types); + types.addAll(BeamSqlRecordHelper.getSqlRecordType(leftRow).getFieldTypes()); + types.addAll(BeamSqlRecordHelper.getSqlRecordType(rightRow).getFieldTypes()); + BeamRecordSqlType type = BeamRecordSqlType.create(names, types); List<Object> fieldValues = new ArrayList<>(leftRow.getDataValues()); fieldValues.addAll(rightRow.getDataValues()); http://git-wip-us.apache.org/repos/asf/beam/blob/7c76129a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSqlProjectFn.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSqlProjectFn.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSqlProjectFn.java index 34d6dbb..aac38c7 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSqlProjectFn.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSqlProjectFn.java @@ -21,7 +21,7 @@ import java.util.ArrayList; import java.util.List; import org.apache.beam.sdk.extensions.sql.impl.interpreter.BeamSqlExpressionExecutor; import org.apache.beam.sdk.extensions.sql.impl.rel.BeamProjectRel; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType; +import org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType; import org.apache.beam.sdk.extensions.sql.schema.BeamTableUtils; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; @@ -35,10 +35,10 @@ import org.apache.beam.sdk.values.BeamRecord; public class BeamSqlProjectFn extends DoFn<BeamRecord, BeamRecord> { private String stepName; private BeamSqlExpressionExecutor executor; - private BeamSqlRecordType outputRowType; + private BeamRecordSqlType outputRowType; public BeamSqlProjectFn(String stepName, BeamSqlExpressionExecutor executor, - BeamSqlRecordType outputRowType) { + BeamRecordSqlType outputRowType) { super(); this.stepName = stepName; this.executor = executor; @@ -57,7 +57,7 @@ public class BeamSqlProjectFn extends DoFn<BeamRecord, BeamRecord> { List<Object> fieldsValue = new ArrayList<>(results.size()); for (int idx = 0; idx < results.size(); ++idx) { fieldsValue.add( - BeamTableUtils.autoCastField(outputRowType.getFieldsType().get(idx), results.get(idx))); + BeamTableUtils.autoCastField(outputRowType.getFieldTypeByIndex(idx), results.get(idx))); } BeamRecord outRow = new BeamRecord(outputRowType, fieldsValue); http://git-wip-us.apache.org/repos/asf/beam/blob/7c76129a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java index bf96e85..8b6206b 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java @@ -23,7 +23,7 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType; +import org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType; import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rel.type.RelDataTypeFactory; import org.apache.calcite.rel.type.RelDataTypeField; @@ -78,33 +78,33 @@ public class CalciteUtils { /** * Get the {@code SqlTypeName} for the specified column of a table. */ - public static SqlTypeName getFieldType(BeamSqlRecordType schema, int index) { - return toCalciteType(schema.getFieldsType().get(index)); + public static SqlTypeName getFieldType(BeamRecordSqlType schema, int index) { + return toCalciteType(schema.getFieldTypeByIndex(index)); } /** * Generate {@code BeamSqlRowType} from {@code RelDataType} which is used to create table. */ - public static BeamSqlRecordType toBeamRowType(RelDataType tableInfo) { + public static BeamRecordSqlType toBeamRowType(RelDataType tableInfo) { List<String> fieldNames = new ArrayList<>(); List<Integer> fieldTypes = new ArrayList<>(); for (RelDataTypeField f : tableInfo.getFieldList()) { fieldNames.add(f.getName()); fieldTypes.add(toJavaType(f.getType().getSqlTypeName())); } - return BeamSqlRecordType.create(fieldNames, fieldTypes); + return BeamRecordSqlType.create(fieldNames, fieldTypes); } /** * Create an instance of {@code RelDataType} so it can be used to create a table. */ - public static RelProtoDataType toCalciteRowType(final BeamSqlRecordType that) { + public static RelProtoDataType toCalciteRowType(final BeamRecordSqlType that) { return new RelProtoDataType() { @Override public RelDataType apply(RelDataTypeFactory a) { RelDataTypeFactory.FieldInfoBuilder builder = a.builder(); - for (int idx = 0; idx < that.getFieldsName().size(); ++idx) { - builder.add(that.getFieldsName().get(idx), toCalciteType(that.getFieldsType().get(idx))); + for (int idx = 0; idx < that.getFieldNames().size(); ++idx) { + builder.add(that.getFieldNameByIndex(idx), toCalciteType(that.getFieldTypeByIndex(idx))); } return builder.build(); } http://git-wip-us.apache.org/repos/asf/beam/blob/7c76129a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BaseBeamTable.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BaseBeamTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BaseBeamTable.java index 68b120e..0564820 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BaseBeamTable.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BaseBeamTable.java @@ -23,12 +23,12 @@ import java.io.Serializable; * Each IO in Beam has one table schema, by extending {@link BaseBeamTable}. */ public abstract class BaseBeamTable implements BeamSqlTable, Serializable { - protected BeamSqlRecordType beamSqlRowType; - public BaseBeamTable(BeamSqlRecordType beamSqlRowType) { + protected BeamRecordSqlType beamSqlRowType; + public BaseBeamTable(BeamRecordSqlType beamSqlRowType) { this.beamSqlRowType = beamSqlRowType; } - @Override public BeamSqlRecordType getRowType() { + @Override public BeamRecordSqlType getRowType() { return beamSqlRowType; } } http://git-wip-us.apache.org/repos/asf/beam/blob/7c76129a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamPCollectionTable.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamPCollectionTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamPCollectionTable.java index 68905b5..9d9988e 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamPCollectionTable.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamPCollectionTable.java @@ -32,12 +32,12 @@ public class BeamPCollectionTable extends BaseBeamTable { private BeamIOType ioType; private transient PCollection<BeamRecord> upstream; - protected BeamPCollectionTable(BeamSqlRecordType beamSqlRowType) { + protected BeamPCollectionTable(BeamRecordSqlType beamSqlRowType) { super(beamSqlRowType); } public BeamPCollectionTable(PCollection<BeamRecord> upstream, - BeamSqlRecordType beamSqlRowType){ + BeamRecordSqlType beamSqlRowType){ this(beamSqlRowType); ioType = upstream.isBounded().equals(IsBounded.BOUNDED) ? BeamIOType.BOUNDED : BeamIOType.UNBOUNDED; http://git-wip-us.apache.org/repos/asf/beam/blob/7c76129a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamRecordSqlType.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamRecordSqlType.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamRecordSqlType.java new file mode 100644 index 0000000..1845988 --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamRecordSqlType.java @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.sql.schema; + +import java.math.BigDecimal; +import java.sql.Types; +import java.util.ArrayList; +import java.util.Date; +import java.util.GregorianCalendar; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.beam.sdk.coders.BigDecimalCoder; +import org.apache.beam.sdk.coders.BigEndianIntegerCoder; +import org.apache.beam.sdk.coders.BigEndianLongCoder; +import org.apache.beam.sdk.coders.ByteCoder; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordHelper.BooleanCoder; +import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordHelper.DateCoder; +import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordHelper.DoubleCoder; +import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordHelper.FloatCoder; +import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordHelper.ShortCoder; +import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordHelper.TimeCoder; +import org.apache.beam.sdk.values.BeamRecord; +import org.apache.beam.sdk.values.BeamRecordType; + +/** + * Type provider for {@link BeamRecord} with SQL types. + * + * <p>Limited SQL types are supported now, visit + * <a href="https://beam.apache.org/blog/2017/07/21/sql-dsl.html#data-type">data types</a> + * for more details. + * + */ +public class BeamRecordSqlType extends BeamRecordType { + private static final Map<Integer, Class> SQL_TYPE_TO_JAVA_CLASS = new HashMap<>(); + static { + SQL_TYPE_TO_JAVA_CLASS.put(Types.TINYINT, Byte.class); + SQL_TYPE_TO_JAVA_CLASS.put(Types.SMALLINT, Short.class); + SQL_TYPE_TO_JAVA_CLASS.put(Types.INTEGER, Integer.class); + SQL_TYPE_TO_JAVA_CLASS.put(Types.BIGINT, Long.class); + SQL_TYPE_TO_JAVA_CLASS.put(Types.FLOAT, Float.class); + SQL_TYPE_TO_JAVA_CLASS.put(Types.DOUBLE, Double.class); + SQL_TYPE_TO_JAVA_CLASS.put(Types.DECIMAL, BigDecimal.class); + + SQL_TYPE_TO_JAVA_CLASS.put(Types.BOOLEAN, Boolean.class); + + SQL_TYPE_TO_JAVA_CLASS.put(Types.CHAR, String.class); + SQL_TYPE_TO_JAVA_CLASS.put(Types.VARCHAR, String.class); + + SQL_TYPE_TO_JAVA_CLASS.put(Types.TIME, GregorianCalendar.class); + + SQL_TYPE_TO_JAVA_CLASS.put(Types.DATE, Date.class); + SQL_TYPE_TO_JAVA_CLASS.put(Types.TIMESTAMP, Date.class); + } + + public List<Integer> fieldTypes; + + protected BeamRecordSqlType(List<String> fieldsName, List<Coder> fieldsCoder) { + super(fieldsName, fieldsCoder); + } + + private BeamRecordSqlType(List<String> fieldsName, List<Integer> fieldTypes + , List<Coder> fieldsCoder) { + super(fieldsName, fieldsCoder); + this.fieldTypes = fieldTypes; + } + + public static BeamRecordSqlType create(List<String> fieldNames, + List<Integer> fieldTypes) { + if (fieldNames.size() != fieldTypes.size()) { + throw new IllegalStateException("the sizes of 'dataType' and 'fieldTypes' must match."); + } + List<Coder> fieldCoders = new ArrayList<>(fieldTypes.size()); + for (int idx = 0; idx < fieldTypes.size(); ++idx) { + switch (fieldTypes.get(idx)) { + case Types.INTEGER: + fieldCoders.add(BigEndianIntegerCoder.of()); + break; + case Types.SMALLINT: + fieldCoders.add(ShortCoder.of()); + break; + case Types.TINYINT: + fieldCoders.add(ByteCoder.of()); + break; + case Types.DOUBLE: + fieldCoders.add(DoubleCoder.of()); + break; + case Types.FLOAT: + fieldCoders.add(FloatCoder.of()); + break; + case Types.DECIMAL: + fieldCoders.add(BigDecimalCoder.of()); + break; + case Types.BIGINT: + fieldCoders.add(BigEndianLongCoder.of()); + break; + case Types.VARCHAR: + case Types.CHAR: + fieldCoders.add(StringUtf8Coder.of()); + break; + case Types.TIME: + fieldCoders.add(TimeCoder.of()); + break; + case Types.DATE: + case Types.TIMESTAMP: + fieldCoders.add(DateCoder.of()); + break; + case Types.BOOLEAN: + fieldCoders.add(BooleanCoder.of()); + break; + + default: + throw new UnsupportedOperationException( + "Data type: " + fieldTypes.get(idx) + " not supported yet!"); + } + } + return new BeamRecordSqlType(fieldNames, fieldTypes, fieldCoders); + } + + @Override + public void validateValueType(int index, Object fieldValue) throws IllegalArgumentException { + if (null == fieldValue) {// no need to do type check for NULL value + return; + } + + int fieldType = fieldTypes.get(index); + Class javaClazz = SQL_TYPE_TO_JAVA_CLASS.get(fieldType); + if (javaClazz == null) { + throw new IllegalArgumentException("Data type: " + fieldType + " not supported yet!"); + } + + if (!fieldValue.getClass().equals(javaClazz)) { + throw new IllegalArgumentException( + String.format("[%s](%s) doesn't match type [%s]", + fieldValue, fieldValue.getClass(), fieldType) + ); + } + } + + public List<Integer> getFieldTypes() { + return fieldTypes; + } + + public Integer getFieldTypeByIndex(int index){ + return fieldTypes.get(index); + } + + @Override + public boolean equals(Object obj) { + if (obj != null && obj instanceof BeamRecordSqlType) { + BeamRecordSqlType ins = (BeamRecordSqlType) obj; + return fieldTypes.equals(ins.getFieldTypes()) && getFieldNames().equals(ins.getFieldNames()); + } else { + return false; + } + } + + @Override + public int hashCode() { + return 31 * getFieldNames().hashCode() + getFieldTypes().hashCode(); + } + + @Override + public String toString() { + return "BeamRecordSqlType [fieldNames=" + getFieldNames() + + ", fieldTypes=" + fieldTypes + "]"; + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/7c76129a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRecordHelper.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRecordHelper.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRecordHelper.java index b910c84..89eefd1 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRecordHelper.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRecordHelper.java @@ -39,8 +39,8 @@ import org.apache.beam.sdk.values.BeamRecord; @Experimental public class BeamSqlRecordHelper { - public static BeamSqlRecordType getSqlRecordType(BeamRecord record) { - return (BeamSqlRecordType) record.getDataType(); + public static BeamRecordSqlType getSqlRecordType(BeamRecord record) { + return (BeamRecordSqlType) record.getDataType(); } /** http://git-wip-us.apache.org/repos/asf/beam/blob/7c76129a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRecordType.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRecordType.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRecordType.java deleted file mode 100644 index b7c7438..0000000 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRecordType.java +++ /dev/null @@ -1,175 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.extensions.sql.schema; - -import java.math.BigDecimal; -import java.sql.Types; -import java.util.ArrayList; -import java.util.Date; -import java.util.GregorianCalendar; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import org.apache.beam.sdk.coders.BigDecimalCoder; -import org.apache.beam.sdk.coders.BigEndianIntegerCoder; -import org.apache.beam.sdk.coders.BigEndianLongCoder; -import org.apache.beam.sdk.coders.ByteCoder; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.StringUtf8Coder; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordHelper.BooleanCoder; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordHelper.DateCoder; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordHelper.DoubleCoder; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordHelper.FloatCoder; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordHelper.ShortCoder; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordHelper.TimeCoder; -import org.apache.beam.sdk.values.BeamRecord; -import org.apache.beam.sdk.values.BeamRecordType; - -/** - * Type provider for {@link BeamRecord} with SQL types. - * - * <p>Limited SQL types are supported now, visit - * <a href="https://beam.apache.org/blog/2017/07/21/sql-dsl.html#data-type">data types</a> - * for more details. - * - */ -public class BeamSqlRecordType extends BeamRecordType { - private static final Map<Integer, Class> SQL_TYPE_TO_JAVA_CLASS = new HashMap<>(); - static { - SQL_TYPE_TO_JAVA_CLASS.put(Types.TINYINT, Byte.class); - SQL_TYPE_TO_JAVA_CLASS.put(Types.SMALLINT, Short.class); - SQL_TYPE_TO_JAVA_CLASS.put(Types.INTEGER, Integer.class); - SQL_TYPE_TO_JAVA_CLASS.put(Types.BIGINT, Long.class); - SQL_TYPE_TO_JAVA_CLASS.put(Types.FLOAT, Float.class); - SQL_TYPE_TO_JAVA_CLASS.put(Types.DOUBLE, Double.class); - SQL_TYPE_TO_JAVA_CLASS.put(Types.DECIMAL, BigDecimal.class); - - SQL_TYPE_TO_JAVA_CLASS.put(Types.BOOLEAN, Boolean.class); - - SQL_TYPE_TO_JAVA_CLASS.put(Types.CHAR, String.class); - SQL_TYPE_TO_JAVA_CLASS.put(Types.VARCHAR, String.class); - - SQL_TYPE_TO_JAVA_CLASS.put(Types.TIME, GregorianCalendar.class); - - SQL_TYPE_TO_JAVA_CLASS.put(Types.DATE, Date.class); - SQL_TYPE_TO_JAVA_CLASS.put(Types.TIMESTAMP, Date.class); - } - - public List<Integer> fieldsType; - - protected BeamSqlRecordType(List<String> fieldsName, List<Coder> fieldsCoder) { - super(fieldsName, fieldsCoder); - } - - private BeamSqlRecordType(List<String> fieldsName, List<Integer> fieldsType - , List<Coder> fieldsCoder) { - super(fieldsName, fieldsCoder); - this.fieldsType = fieldsType; - } - - public static BeamSqlRecordType create(List<String> fieldNames, - List<Integer> fieldTypes) { - if (fieldNames.size() != fieldTypes.size()) { - throw new IllegalStateException("the sizes of 'dataType' and 'fieldTypes' must match."); - } - List<Coder> fieldCoders = new ArrayList<>(fieldTypes.size()); - for (int idx = 0; idx < fieldTypes.size(); ++idx) { - switch (fieldTypes.get(idx)) { - case Types.INTEGER: - fieldCoders.add(BigEndianIntegerCoder.of()); - break; - case Types.SMALLINT: - fieldCoders.add(ShortCoder.of()); - break; - case Types.TINYINT: - fieldCoders.add(ByteCoder.of()); - break; - case Types.DOUBLE: - fieldCoders.add(DoubleCoder.of()); - break; - case Types.FLOAT: - fieldCoders.add(FloatCoder.of()); - break; - case Types.DECIMAL: - fieldCoders.add(BigDecimalCoder.of()); - break; - case Types.BIGINT: - fieldCoders.add(BigEndianLongCoder.of()); - break; - case Types.VARCHAR: - case Types.CHAR: - fieldCoders.add(StringUtf8Coder.of()); - break; - case Types.TIME: - fieldCoders.add(TimeCoder.of()); - break; - case Types.DATE: - case Types.TIMESTAMP: - fieldCoders.add(DateCoder.of()); - break; - case Types.BOOLEAN: - fieldCoders.add(BooleanCoder.of()); - break; - - default: - throw new UnsupportedOperationException( - "Data type: " + fieldTypes.get(idx) + " not supported yet!"); - } - } - return new BeamSqlRecordType(fieldNames, fieldTypes, fieldCoders); - } - - @Override - public void validateValueType(int index, Object fieldValue) throws IllegalArgumentException { - if (null == fieldValue) {// no need to do type check for NULL value - return; - } - - int fieldType = fieldsType.get(index); - Class javaClazz = SQL_TYPE_TO_JAVA_CLASS.get(fieldType); - if (javaClazz == null) { - throw new IllegalArgumentException("Data type: " + fieldType + " not supported yet!"); - } - - if (!fieldValue.getClass().equals(javaClazz)) { - throw new IllegalArgumentException( - String.format("[%s](%s) doesn't match type [%s]", - fieldValue, fieldValue.getClass(), fieldType) - ); - } - } - - public List<Integer> getFieldsType() { - return fieldsType; - } - - @Override - public boolean equals(Object obj) { - if (obj != null && obj instanceof BeamSqlRecordType) { - BeamSqlRecordType ins = (BeamSqlRecordType) obj; - return fieldsType.equals(ins.getFieldsType()) && getFieldsName().equals(ins.getFieldsName()); - } else { - return false; - } - } - - @Override - public int hashCode() { - return 31 * getFieldsName().hashCode() + getFieldsType().hashCode(); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/7c76129a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlTable.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlTable.java index b370d9d..828ac43 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlTable.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlTable.java @@ -49,5 +49,5 @@ public interface BeamSqlTable { /** * Get the schema info of the table. */ - BeamSqlRecordType getRowType(); + BeamRecordSqlType getRowType(); } http://git-wip-us.apache.org/repos/asf/beam/blob/7c76129a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamTableUtils.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamTableUtils.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamTableUtils.java index 19d3e39..99f9522 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamTableUtils.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamTableUtils.java @@ -40,27 +40,27 @@ public final class BeamTableUtils { public static BeamRecord csvLine2BeamSqlRow( CSVFormat csvFormat, String line, - BeamSqlRecordType beamSqlRowType) { - List<Object> fieldsValue = new ArrayList<>(beamSqlRowType.size()); + BeamRecordSqlType beamRecordSqlType) { + List<Object> fieldsValue = new ArrayList<>(beamRecordSqlType.size()); try (StringReader reader = new StringReader(line)) { CSVParser parser = csvFormat.parse(reader); CSVRecord rawRecord = parser.getRecords().get(0); - if (rawRecord.size() != beamSqlRowType.size()) { + if (rawRecord.size() != beamRecordSqlType.size()) { throw new IllegalArgumentException(String.format( "Expect %d fields, but actually %d", - beamSqlRowType.size(), rawRecord.size() + beamRecordSqlType.size(), rawRecord.size() )); } else { - for (int idx = 0; idx < beamSqlRowType.size(); idx++) { + for (int idx = 0; idx < beamRecordSqlType.size(); idx++) { String raw = rawRecord.get(idx); - fieldsValue.add(autoCastField(beamSqlRowType.getFieldsType().get(idx), raw)); + fieldsValue.add(autoCastField(beamRecordSqlType.getFieldTypeByIndex(idx), raw)); } } } catch (IOException e) { throw new IllegalArgumentException("decodeRecord failed!", e); } - return new BeamRecord(beamSqlRowType, fieldsValue); + return new BeamRecord(beamRecordSqlType, fieldsValue); } public static String beamSqlRow2CsvLine(BeamRecord row, CSVFormat csvFormat) { http://git-wip-us.apache.org/repos/asf/beam/blob/7c76129a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/kafka/BeamKafkaCSVTable.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/kafka/BeamKafkaCSVTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/kafka/BeamKafkaCSVTable.java index f137379..8c7e6f0 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/kafka/BeamKafkaCSVTable.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/kafka/BeamKafkaCSVTable.java @@ -18,7 +18,7 @@ package org.apache.beam.sdk.extensions.sql.schema.kafka; import java.util.List; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType; +import org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType; import org.apache.beam.sdk.extensions.sql.schema.BeamTableUtils; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.PTransform; @@ -34,12 +34,12 @@ import org.apache.commons.csv.CSVFormat; */ public class BeamKafkaCSVTable extends BeamKafkaTable { private CSVFormat csvFormat; - public BeamKafkaCSVTable(BeamSqlRecordType beamSqlRowType, String bootstrapServers, + public BeamKafkaCSVTable(BeamRecordSqlType beamSqlRowType, String bootstrapServers, List<String> topics) { this(beamSqlRowType, bootstrapServers, topics, CSVFormat.DEFAULT); } - public BeamKafkaCSVTable(BeamSqlRecordType beamSqlRowType, String bootstrapServers, + public BeamKafkaCSVTable(BeamRecordSqlType beamSqlRowType, String bootstrapServers, List<String> topics, CSVFormat format) { super(beamSqlRowType, bootstrapServers, topics); this.csvFormat = format; @@ -63,9 +63,9 @@ public class BeamKafkaCSVTable extends BeamKafkaTable { */ public static class CsvRecorderDecoder extends PTransform<PCollection<KV<byte[], byte[]>>, PCollection<BeamRecord>> { - private BeamSqlRecordType rowType; + private BeamRecordSqlType rowType; private CSVFormat format; - public CsvRecorderDecoder(BeamSqlRecordType rowType, CSVFormat format) { + public CsvRecorderDecoder(BeamRecordSqlType rowType, CSVFormat format) { this.rowType = rowType; this.format = format; } @@ -88,9 +88,9 @@ public class BeamKafkaCSVTable extends BeamKafkaTable { */ public static class CsvRecorderEncoder extends PTransform<PCollection<BeamRecord>, PCollection<KV<byte[], byte[]>>> { - private BeamSqlRecordType rowType; + private BeamRecordSqlType rowType; private CSVFormat format; - public CsvRecorderEncoder(BeamSqlRecordType rowType, CSVFormat format) { + public CsvRecorderEncoder(BeamRecordSqlType rowType, CSVFormat format) { this.rowType = rowType; this.format = format; } http://git-wip-us.apache.org/repos/asf/beam/blob/7c76129a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/kafka/BeamKafkaTable.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/kafka/BeamKafkaTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/kafka/BeamKafkaTable.java index fac57bf..1d57839 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/kafka/BeamKafkaTable.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/kafka/BeamKafkaTable.java @@ -26,7 +26,7 @@ import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.ByteArrayCoder; import org.apache.beam.sdk.extensions.sql.schema.BaseBeamTable; import org.apache.beam.sdk.extensions.sql.schema.BeamIOType; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType; +import org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType; import org.apache.beam.sdk.io.kafka.KafkaIO; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.values.BeamRecord; @@ -48,11 +48,11 @@ public abstract class BeamKafkaTable extends BaseBeamTable implements Serializab private List<String> topics; private Map<String, Object> configUpdates; - protected BeamKafkaTable(BeamSqlRecordType beamSqlRowType) { + protected BeamKafkaTable(BeamRecordSqlType beamSqlRowType) { super(beamSqlRowType); } - public BeamKafkaTable(BeamSqlRecordType beamSqlRowType, String bootstrapServers, + public BeamKafkaTable(BeamRecordSqlType beamSqlRowType, String bootstrapServers, List<String> topics) { super(beamSqlRowType); this.bootstrapServers = bootstrapServers; http://git-wip-us.apache.org/repos/asf/beam/blob/7c76129a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTable.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTable.java index 0ec418c..79e56e6 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTable.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTable.java @@ -19,7 +19,7 @@ package org.apache.beam.sdk.extensions.sql.schema.text; import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType; +import org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType; import org.apache.beam.sdk.io.TextIO; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.values.BeamRecord; @@ -46,11 +46,11 @@ public class BeamTextCSVTable extends BeamTextTable { /** * CSV table with {@link CSVFormat#DEFAULT DEFAULT} format. */ - public BeamTextCSVTable(BeamSqlRecordType beamSqlRowType, String filePattern) { + public BeamTextCSVTable(BeamRecordSqlType beamSqlRowType, String filePattern) { this(beamSqlRowType, filePattern, CSVFormat.DEFAULT); } - public BeamTextCSVTable(BeamSqlRecordType beamSqlRowType, String filePattern, + public BeamTextCSVTable(BeamRecordSqlType beamSqlRowType, String filePattern, CSVFormat csvFormat) { super(beamSqlRowType, filePattern); this.csvFormat = csvFormat; http://git-wip-us.apache.org/repos/asf/beam/blob/7c76129a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTableIOReader.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTableIOReader.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTableIOReader.java index ecb77e0..018dae5 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTableIOReader.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTableIOReader.java @@ -19,7 +19,7 @@ package org.apache.beam.sdk.extensions.sql.schema.text; import java.io.Serializable; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType; +import org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType; import org.apache.beam.sdk.extensions.sql.schema.BeamTableUtils; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.PTransform; @@ -35,10 +35,10 @@ public class BeamTextCSVTableIOReader extends PTransform<PCollection<String>, PCollection<BeamRecord>> implements Serializable { private String filePattern; - protected BeamSqlRecordType beamSqlRowType; + protected BeamRecordSqlType beamSqlRowType; protected CSVFormat csvFormat; - public BeamTextCSVTableIOReader(BeamSqlRecordType beamSqlRowType, String filePattern, + public BeamTextCSVTableIOReader(BeamRecordSqlType beamSqlRowType, String filePattern, CSVFormat csvFormat) { this.filePattern = filePattern; this.beamSqlRowType = beamSqlRowType; http://git-wip-us.apache.org/repos/asf/beam/blob/7c76129a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTableIOWriter.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTableIOWriter.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTableIOWriter.java index c616973..53eb382 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTableIOWriter.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTableIOWriter.java @@ -19,7 +19,7 @@ package org.apache.beam.sdk.extensions.sql.schema.text; import java.io.Serializable; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType; +import org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType; import org.apache.beam.sdk.extensions.sql.schema.BeamTableUtils; import org.apache.beam.sdk.io.TextIO; import org.apache.beam.sdk.transforms.DoFn; @@ -36,10 +36,10 @@ import org.apache.commons.csv.CSVFormat; public class BeamTextCSVTableIOWriter extends PTransform<PCollection<BeamRecord>, PDone> implements Serializable { private String filePattern; - protected BeamSqlRecordType beamSqlRowType; + protected BeamRecordSqlType beamSqlRowType; protected CSVFormat csvFormat; - public BeamTextCSVTableIOWriter(BeamSqlRecordType beamSqlRowType, String filePattern, + public BeamTextCSVTableIOWriter(BeamRecordSqlType beamSqlRowType, String filePattern, CSVFormat csvFormat) { this.filePattern = filePattern; this.beamSqlRowType = beamSqlRowType; http://git-wip-us.apache.org/repos/asf/beam/blob/7c76129a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextTable.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextTable.java index 4284366..80e81aa 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextTable.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextTable.java @@ -21,7 +21,7 @@ package org.apache.beam.sdk.extensions.sql.schema.text; import java.io.Serializable; import org.apache.beam.sdk.extensions.sql.schema.BaseBeamTable; import org.apache.beam.sdk.extensions.sql.schema.BeamIOType; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType; +import org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType; /** * {@code BeamTextTable} represents a text file/directory(backed by {@code TextIO}). @@ -29,7 +29,7 @@ import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType; public abstract class BeamTextTable extends BaseBeamTable implements Serializable { protected String filePattern; - protected BeamTextTable(BeamSqlRecordType beamSqlRowType, String filePattern) { + protected BeamTextTable(BeamRecordSqlType beamSqlRowType, String filePattern) { super(beamSqlRowType); this.filePattern = filePattern; } http://git-wip-us.apache.org/repos/asf/beam/blob/7c76129a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslAggregationTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslAggregationTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslAggregationTest.java index 19ca398..4e74dbb 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslAggregationTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslAggregationTest.java @@ -19,7 +19,7 @@ package org.apache.beam.sdk.extensions.sql; import java.sql.Types; import java.util.Arrays; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType; +import org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.values.BeamRecord; import org.apache.beam.sdk.values.PCollection; @@ -54,7 +54,7 @@ public class BeamSqlDslAggregationTest extends BeamSqlDslBase { PCollection<BeamRecord> result = input.apply("testAggregationWithoutWindow", BeamSql.simpleQuery(sql)); - BeamSqlRecordType resultType = BeamSqlRecordType.create(Arrays.asList("f_int2", "size"), + BeamRecordSqlType resultType = BeamRecordSqlType.create(Arrays.asList("f_int2", "size"), Arrays.asList(Types.INTEGER, Types.BIGINT)); BeamRecord record = new BeamRecord(resultType, 0, 4L); @@ -95,7 +95,7 @@ public class BeamSqlDslAggregationTest extends BeamSqlDslBase { PCollectionTuple.of(new TupleTag<BeamRecord>("TABLE_A"), input) .apply("testAggregationFunctions", BeamSql.query(sql)); - BeamSqlRecordType resultType = BeamSqlRecordType.create( + BeamRecordSqlType resultType = BeamRecordSqlType.create( Arrays.asList("f_int2", "size", "sum1", "avg1", "max1", "min1", "sum2", "avg2", "max2", "min2", "sum3", "avg3", "max3", "min3", "sum4", "avg4", "max4", "min4", "sum5", "avg5", "max5", "min5", "max6", "min6"), @@ -141,7 +141,7 @@ public class BeamSqlDslAggregationTest extends BeamSqlDslBase { PCollection<BeamRecord> result = input.apply("testDistinct", BeamSql.simpleQuery(sql)); - BeamSqlRecordType resultType = BeamSqlRecordType.create(Arrays.asList("f_int", "f_long"), + BeamRecordSqlType resultType = BeamRecordSqlType.create(Arrays.asList("f_int", "f_long"), Arrays.asList(Types.INTEGER, Types.BIGINT)); BeamRecord record1 = new BeamRecord(resultType, 1, 1000L); @@ -179,7 +179,7 @@ public class BeamSqlDslAggregationTest extends BeamSqlDslBase { PCollectionTuple.of(new TupleTag<BeamRecord>("TABLE_A"), input) .apply("testTumbleWindow", BeamSql.query(sql)); - BeamSqlRecordType resultType = BeamSqlRecordType.create( + BeamRecordSqlType resultType = BeamRecordSqlType.create( Arrays.asList("f_int2", "size", "window_start"), Arrays.asList(Types.INTEGER, Types.BIGINT, Types.TIMESTAMP)); @@ -215,7 +215,7 @@ public class BeamSqlDslAggregationTest extends BeamSqlDslBase { PCollection<BeamRecord> result = input.apply("testHopWindow", BeamSql.simpleQuery(sql)); - BeamSqlRecordType resultType = BeamSqlRecordType.create( + BeamRecordSqlType resultType = BeamRecordSqlType.create( Arrays.asList("f_int2", "size", "window_start"), Arrays.asList(Types.INTEGER, Types.BIGINT, Types.TIMESTAMP)); @@ -254,7 +254,7 @@ public class BeamSqlDslAggregationTest extends BeamSqlDslBase { PCollectionTuple.of(new TupleTag<BeamRecord>("TABLE_A"), input) .apply("testSessionWindow", BeamSql.query(sql)); - BeamSqlRecordType resultType = BeamSqlRecordType.create( + BeamRecordSqlType resultType = BeamRecordSqlType.create( Arrays.asList("f_int2", "size", "window_start"), Arrays.asList(Types.INTEGER, Types.BIGINT, Types.TIMESTAMP)); http://git-wip-us.apache.org/repos/asf/beam/blob/7c76129a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslBase.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslBase.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslBase.java index 02427ae..ef75ee2 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslBase.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslBase.java @@ -25,7 +25,7 @@ import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Arrays; import java.util.List; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType; +import org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.testing.TestStream; import org.apache.beam.sdk.transforms.Create; @@ -52,7 +52,7 @@ public class BeamSqlDslBase { @Rule public ExpectedException exceptions = ExpectedException.none(); - public static BeamSqlRecordType rowTypeInTableA; + public static BeamRecordSqlType rowTypeInTableA; public static List<BeamRecord> recordsInTableA; //bounded PCollections @@ -65,7 +65,7 @@ public class BeamSqlDslBase { @BeforeClass public static void prepareClass() throws ParseException { - rowTypeInTableA = BeamSqlRecordType.create( + rowTypeInTableA = BeamRecordSqlType.create( Arrays.asList("f_int", "f_long", "f_short", "f_byte", "f_float", "f_double", "f_string", "f_timestamp", "f_int2", "f_decimal"), Arrays.asList(Types.INTEGER, Types.BIGINT, Types.SMALLINT, Types.TINYINT, Types.FLOAT, http://git-wip-us.apache.org/repos/asf/beam/blob/7c76129a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslJoinTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslJoinTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslJoinTest.java index d5d0a24..0876dd9 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslJoinTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslJoinTest.java @@ -24,7 +24,7 @@ import static org.apache.beam.sdk.extensions.sql.impl.rel.BeamJoinRelBoundedVsBo import java.sql.Types; import java.util.Arrays; import org.apache.beam.sdk.coders.BeamRecordCoder; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType; +import org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.values.BeamRecord; @@ -41,8 +41,8 @@ public class BeamSqlDslJoinTest { @Rule public final TestPipeline pipeline = TestPipeline.create(); - private static final BeamSqlRecordType SOURCE_RECORD_TYPE = - BeamSqlRecordType.create( + private static final BeamRecordSqlType SOURCE_RECORD_TYPE = + BeamRecordSqlType.create( Arrays.asList( "order_id", "site_id", "price" ), @@ -53,8 +53,8 @@ public class BeamSqlDslJoinTest { private static final BeamRecordCoder SOURCE_CODER = SOURCE_RECORD_TYPE.getRecordCoder(); - private static final BeamSqlRecordType RESULT_RECORD_TYPE = - BeamSqlRecordType.create( + private static final BeamRecordSqlType RESULT_RECORD_TYPE = + BeamRecordSqlType.create( Arrays.asList( "order_id", "site_id", "price", "order_id0", "site_id0", "price0" ), http://git-wip-us.apache.org/repos/asf/beam/blob/7c76129a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslProjectTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslProjectTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslProjectTest.java index c8041a8..46aea99 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslProjectTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslProjectTest.java @@ -19,7 +19,7 @@ package org.apache.beam.sdk.extensions.sql; import java.sql.Types; import java.util.Arrays; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType; +import org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.values.BeamRecord; import org.apache.beam.sdk.values.PCollection; @@ -81,7 +81,7 @@ public class BeamSqlDslProjectTest extends BeamSqlDslBase { PCollectionTuple.of(new TupleTag<BeamRecord>("TABLE_A"), input) .apply("testPartialFields", BeamSql.query(sql)); - BeamSqlRecordType resultType = BeamSqlRecordType.create(Arrays.asList("f_int", "f_long"), + BeamRecordSqlType resultType = BeamRecordSqlType.create(Arrays.asList("f_int", "f_long"), Arrays.asList(Types.INTEGER, Types.BIGINT)); BeamRecord record = new BeamRecord(resultType @@ -115,7 +115,7 @@ public class BeamSqlDslProjectTest extends BeamSqlDslBase { PCollectionTuple.of(new TupleTag<BeamRecord>("TABLE_A"), input) .apply("testPartialFieldsInMultipleRow", BeamSql.query(sql)); - BeamSqlRecordType resultType = BeamSqlRecordType.create(Arrays.asList("f_int", "f_long"), + BeamRecordSqlType resultType = BeamRecordSqlType.create(Arrays.asList("f_int", "f_long"), Arrays.asList(Types.INTEGER, Types.BIGINT)); BeamRecord record1 = new BeamRecord(resultType @@ -158,7 +158,7 @@ public class BeamSqlDslProjectTest extends BeamSqlDslBase { PCollectionTuple.of(new TupleTag<BeamRecord>("TABLE_A"), input) .apply("testPartialFieldsInRows", BeamSql.query(sql)); - BeamSqlRecordType resultType = BeamSqlRecordType.create(Arrays.asList("f_int", "f_long"), + BeamRecordSqlType resultType = BeamRecordSqlType.create(Arrays.asList("f_int", "f_long"), Arrays.asList(Types.INTEGER, Types.BIGINT)); BeamRecord record1 = new BeamRecord(resultType @@ -201,7 +201,7 @@ public class BeamSqlDslProjectTest extends BeamSqlDslBase { PCollectionTuple.of(new TupleTag<BeamRecord>("TABLE_A"), input) .apply("testLiteralField", BeamSql.query(sql)); - BeamSqlRecordType resultType = BeamSqlRecordType.create(Arrays.asList("literal_field"), + BeamRecordSqlType resultType = BeamRecordSqlType.create(Arrays.asList("literal_field"), Arrays.asList(Types.INTEGER)); BeamRecord record = new BeamRecord(resultType, 1); http://git-wip-us.apache.org/repos/asf/beam/blob/7c76129a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslUdfUdafTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslUdfUdafTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslUdfUdafTest.java index 25e76e9..7302376 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslUdfUdafTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslUdfUdafTest.java @@ -20,7 +20,7 @@ package org.apache.beam.sdk.extensions.sql; import java.sql.Types; import java.util.Arrays; import java.util.Iterator; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType; +import org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType; import org.apache.beam.sdk.extensions.sql.schema.BeamSqlUdaf; import org.apache.beam.sdk.extensions.sql.schema.BeamSqlUdf; import org.apache.beam.sdk.testing.PAssert; @@ -39,7 +39,7 @@ public class BeamSqlDslUdfUdafTest extends BeamSqlDslBase { */ @Test public void testUdaf() throws Exception { - BeamSqlRecordType resultType = BeamSqlRecordType.create(Arrays.asList("f_int2", "squaresum"), + BeamRecordSqlType resultType = BeamRecordSqlType.create(Arrays.asList("f_int2", "squaresum"), Arrays.asList(Types.INTEGER, Types.INTEGER)); BeamRecord record = new BeamRecord(resultType, 0, 30); @@ -67,7 +67,7 @@ public class BeamSqlDslUdfUdafTest extends BeamSqlDslBase { */ @Test public void testUdf() throws Exception{ - BeamSqlRecordType resultType = BeamSqlRecordType.create(Arrays.asList("f_int", "cubicvalue"), + BeamRecordSqlType resultType = BeamRecordSqlType.create(Arrays.asList("f_int", "cubicvalue"), Arrays.asList(Types.INTEGER, Types.INTEGER)); BeamRecord record = new BeamRecord(resultType, 2, 8); http://git-wip-us.apache.org/repos/asf/beam/blob/7c76129a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/TestUtils.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/TestUtils.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/TestUtils.java index e9dc88f..aa1fc29 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/TestUtils.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/TestUtils.java @@ -21,7 +21,7 @@ package org.apache.beam.sdk.extensions.sql; import java.util.ArrayList; import java.util.Arrays; import java.util.List; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType; +import org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.values.BeamRecord; @@ -69,7 +69,7 @@ public class TestUtils { * {@code} */ public static class RowsBuilder { - private BeamSqlRecordType type; + private BeamRecordSqlType type; private List<BeamRecord> rows = new ArrayList<>(); /** @@ -86,7 +86,7 @@ public class TestUtils { * @args pairs of column type and column names. */ public static RowsBuilder of(final Object... args) { - BeamSqlRecordType beamSQLRowType = buildBeamSqlRowType(args); + BeamRecordSqlType beamSQLRowType = buildBeamSqlRowType(args); RowsBuilder builder = new RowsBuilder(); builder.type = beamSQLRowType; @@ -103,7 +103,7 @@ public class TestUtils { * )}</pre> * @beamSQLRowType the record type. */ - public static RowsBuilder of(final BeamSqlRecordType beamSQLRowType) { + public static RowsBuilder of(final BeamRecordSqlType beamSQLRowType) { RowsBuilder builder = new RowsBuilder(); builder.type = beamSQLRowType; @@ -153,7 +153,7 @@ public class TestUtils { * ) * }</pre> */ - public static BeamSqlRecordType buildBeamSqlRowType(Object... args) { + public static BeamRecordSqlType buildBeamSqlRowType(Object... args) { List<Integer> types = new ArrayList<>(); List<String> names = new ArrayList<>(); @@ -162,7 +162,7 @@ public class TestUtils { names.add((String) args[i + 1]); } - return BeamSqlRecordType.create(names, types); + return BeamRecordSqlType.create(names, types); } /** @@ -179,7 +179,7 @@ public class TestUtils { * ) * }</pre> */ - public static List<BeamRecord> buildRows(BeamSqlRecordType type, List args) { + public static List<BeamRecord> buildRows(BeamRecordSqlType type, List args) { List<BeamRecord> rows = new ArrayList<>(); int fieldCount = type.size(); http://git-wip-us.apache.org/repos/asf/beam/blob/7c76129a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutorTestBase.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutorTestBase.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutorTestBase.java index 86e2ca4..97905c5 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutorTestBase.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutorTestBase.java @@ -24,7 +24,7 @@ import org.apache.beam.sdk.extensions.sql.impl.planner.BeamQueryPlanner; import org.apache.beam.sdk.extensions.sql.impl.planner.BeamRelDataTypeSystem; import org.apache.beam.sdk.extensions.sql.impl.planner.BeamRuleSets; import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType; +import org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType; import org.apache.beam.sdk.values.BeamRecord; import org.apache.calcite.adapter.java.JavaTypeFactory; import org.apache.calcite.config.Lex; @@ -57,7 +57,7 @@ public class BeamSqlFnExecutorTestBase { RelDataTypeSystem.DEFAULT); public static RelDataType relDataType; - public static BeamSqlRecordType beamRowType; + public static BeamRecordSqlType beamRowType; public static BeamRecord record; public static RelBuilder relBuilder; http://git-wip-us.apache.org/repos/asf/beam/blob/7c76129a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlBuiltinFunctionsIntegrationTestBase.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlBuiltinFunctionsIntegrationTestBase.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlBuiltinFunctionsIntegrationTestBase.java index b58a17f..5898e2e 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlBuiltinFunctionsIntegrationTestBase.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlBuiltinFunctionsIntegrationTestBase.java @@ -32,7 +32,7 @@ import java.util.TimeZone; import org.apache.beam.sdk.extensions.sql.BeamSql; import org.apache.beam.sdk.extensions.sql.TestUtils; import org.apache.beam.sdk.extensions.sql.mock.MockedBoundedTable; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType; +import org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.values.BeamRecord; @@ -62,7 +62,7 @@ public class BeamSqlBuiltinFunctionsIntegrationTestBase { public final TestPipeline pipeline = TestPipeline.create(); protected PCollection<BeamRecord> getTestPCollection() { - BeamSqlRecordType type = BeamSqlRecordType.create( + BeamRecordSqlType type = BeamRecordSqlType.create( Arrays.asList("ts", "c_tinyint", "c_smallint", "c_integer", "c_bigint", "c_float", "c_double", "c_decimal", "c_tinyint_max", "c_smallint_max", "c_integer_max", "c_bigint_max"), @@ -155,7 +155,7 @@ public class BeamSqlBuiltinFunctionsIntegrationTestBase { PCollection<BeamRecord> rows = inputCollection.apply(BeamSql.simpleQuery(getSql())); PAssert.that(rows).containsInAnyOrder( TestUtils.RowsBuilder - .of(BeamSqlRecordType.create(names, types)) + .of(BeamRecordSqlType.create(names, types)) .addRows(values) .getRows() ); http://git-wip-us.apache.org/repos/asf/beam/blob/7c76129a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlComparisonOperatorsIntegrationTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlComparisonOperatorsIntegrationTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlComparisonOperatorsIntegrationTest.java index 3569e31..4ce2f45 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlComparisonOperatorsIntegrationTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlComparisonOperatorsIntegrationTest.java @@ -22,7 +22,7 @@ import java.math.BigDecimal; import java.sql.Types; import java.util.Arrays; import org.apache.beam.sdk.extensions.sql.mock.MockedBoundedTable; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType; +import org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType; import org.apache.beam.sdk.values.BeamRecord; import org.apache.beam.sdk.values.PCollection; import org.junit.Test; @@ -282,7 +282,7 @@ public class BeamSqlComparisonOperatorsIntegrationTest } @Override protected PCollection<BeamRecord> getTestPCollection() { - BeamSqlRecordType type = BeamSqlRecordType.create( + BeamRecordSqlType type = BeamRecordSqlType.create( Arrays.asList( "c_tinyint_0", "c_tinyint_1", "c_tinyint_2", "c_smallint_0", "c_smallint_1", "c_smallint_2",