Repository: beam Updated Branches: refs/heads/DSL_SQL db982cfe1 -> dedabff1f
[BEAM-2310] Support encoding/decoding of TIME and new DECIMAL data type Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/23a037e8 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/23a037e8 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/23a037e8 Branch: refs/heads/DSL_SQL Commit: 23a037e8fe98aeab176d3aceea94df02536f185c Parents: db982cf Author: James Xu <[email protected]> Authored: Wed May 17 22:48:00 2017 +0800 Committer: Jean-Baptiste Onofré <[email protected]> Committed: Wed May 24 11:51:45 2017 +0200 ---------------------------------------------------------------------- .../apache/beam/dsls/sql/schema/BeamSQLRow.java | 268 ++++++++++--------- .../beam/dsls/sql/schema/BeamSqlRowCoder.java | 23 +- .../dsls/sql/schema/BeamSqlRowCoderTest.java | 37 ++- 3 files changed, 193 insertions(+), 135 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/23a037e8/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSQLRow.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSQLRow.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSQLRow.java index bc75eb1..ca045c8 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSQLRow.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSQLRow.java @@ -18,6 +18,7 @@ package org.apache.beam.dsls.sql.schema; import java.io.Serializable; +import java.math.BigDecimal; import java.util.ArrayList; import java.util.Date; import java.util.GregorianCalendar; @@ -84,63 +85,69 @@ public class BeamSQLRow implements Serializable { SqlTypeName fieldType = dataType.getFieldsType().get(index); switch (fieldType) { - case INTEGER: - if (!(fieldValue instanceof Integer)) { - throw new InvalidFieldException( - String.format("[%s] doesn't match type [%s]", fieldValue, fieldType)); - } - break; - case SMALLINT: - if (!(fieldValue instanceof Short)) { - throw new InvalidFieldException( - String.format("[%s] doesn't match type [%s]", fieldValue, fieldType)); - } - break; - case TINYINT: - if (!(fieldValue instanceof Byte)) { - throw new InvalidFieldException( - String.format("[%s] doesn't match type [%s]", fieldValue, fieldType)); - } - break; - case DOUBLE: - if (!(fieldValue instanceof Double)) { - throw new InvalidFieldException( - String.format("[%s] doesn't match type [%s]", fieldValue, fieldType)); - } - break; - case BIGINT: - if (!(fieldValue instanceof Long)) { - throw new InvalidFieldException( - String.format("[%s] doesn't match type [%s]", fieldValue, fieldType)); - } - break; - case FLOAT: - if (!(fieldValue instanceof Float)) { - throw new InvalidFieldException( - String.format("[%s] doesn't match type [%s]", fieldValue, fieldType)); - } - break; - case VARCHAR: - case CHAR: - if (!(fieldValue instanceof String)) { - throw new InvalidFieldException( - String.format("[%s] doesn't match type [%s]", fieldValue, fieldType)); - } - break; - case TIME: - if (!(fieldValue instanceof GregorianCalendar)) { - throw new InvalidFieldException( - String.format("[%s] doesn't match type [%s]", fieldValue, fieldType)); - } - break; - case TIMESTAMP: - if (!(fieldValue instanceof Date)) { - throw new InvalidFieldException( - String.format("[%s] doesn't match type [%s]", fieldValue, fieldType)); - } - break; - default: - throw new UnsupportedDataTypeException(fieldType); + case INTEGER: + if (!(fieldValue instanceof Integer)) { + throw new InvalidFieldException( + String.format("[%s] doesn't match type [%s]", fieldValue, fieldType)); + } + break; + case SMALLINT: + if (!(fieldValue instanceof Short)) { + throw new InvalidFieldException( + String.format("[%s] doesn't match type [%s]", fieldValue, fieldType)); + } + break; + case TINYINT: + if (!(fieldValue instanceof Byte)) { + throw new InvalidFieldException( + String.format("[%s] doesn't match type [%s]", fieldValue, fieldType)); + } + break; + case DOUBLE: + if (!(fieldValue instanceof Double)) { + throw new InvalidFieldException( + String.format("[%s] doesn't match type [%s]", fieldValue, fieldType)); + } + break; + case BIGINT: + if (!(fieldValue instanceof Long)) { + throw new InvalidFieldException( + String.format("[%s] doesn't match type [%s]", fieldValue, fieldType)); + } + break; + case FLOAT: + if (!(fieldValue instanceof Float)) { + throw new InvalidFieldException( + String.format("[%s] doesn't match type [%s]", fieldValue, fieldType)); + } + break; + case DECIMAL: + if (!(fieldValue instanceof BigDecimal)) { + throw new InvalidFieldException( + String.format("[%s] doesn't match type [%s]", fieldValue, fieldType)); + } + break; + case VARCHAR: + case CHAR: + if (!(fieldValue instanceof String)) { + throw new InvalidFieldException( + String.format("[%s] doesn't match type [%s]", fieldValue, fieldType)); + } + break; + case TIME: + if (!(fieldValue instanceof GregorianCalendar)) { + throw new InvalidFieldException( + String.format("[%s] doesn't match type [%s]", fieldValue, fieldType)); + } + break; + case TIMESTAMP: + if (!(fieldValue instanceof Date)) { + throw new InvalidFieldException( + String.format("[%s] doesn't match type [%s]", fieldValue, fieldType)); + } + break; + default: + throw new UnsupportedDataTypeException(fieldType); } dataValues.set(index, fieldValue); } @@ -177,6 +184,14 @@ public class BeamSQLRow implements Serializable { return (Date) getFieldValue(idx); } + public GregorianCalendar getGregorianCalendar(int idx) { + return (GregorianCalendar) getFieldValue(idx); + } + + public BigDecimal getBigDecimal(int idx) { + return (BigDecimal) getFieldValue(idx); + } + public Object getFieldValue(String fieldName) { return getFieldValue(dataType.getFieldsName().indexOf(fieldName)); } @@ -190,72 +205,79 @@ public class BeamSQLRow implements Serializable { SqlTypeName fieldType = dataType.getFieldsType().get(fieldIdx); switch (fieldType) { - case INTEGER: - if (!(fieldValue instanceof Integer)) { - throw new InvalidFieldException( - String.format("[%s] doesn't match type [%s]", fieldValue, fieldType)); - } else { - return fieldValue; - } - case SMALLINT: - if (!(fieldValue instanceof Short)) { - throw new InvalidFieldException( - String.format("[%s] doesn't match type [%s]", fieldValue, fieldType)); - } else { - return fieldValue; - } - case TINYINT: - if (!(fieldValue instanceof Byte)) { - throw new InvalidFieldException( - String.format("[%s] doesn't match type [%s]", fieldValue, fieldType)); - } else { - return fieldValue; - } - case DOUBLE: - if (!(fieldValue instanceof Double)) { - throw new InvalidFieldException( - String.format("[%s] doesn't match type [%s]", fieldValue, fieldType)); - } else { - return fieldValue; - } - case BIGINT: - if (!(fieldValue instanceof Long)) { - throw new InvalidFieldException( - String.format("[%s] doesn't match type [%s]", fieldValue, fieldType)); - } else { - return fieldValue; - } - case FLOAT: - if (!(fieldValue instanceof Float)) { - throw new InvalidFieldException( - String.format("[%s] doesn't match type [%s]", fieldValue, fieldType)); - } else { - return fieldValue; - } - case VARCHAR: - case CHAR: - if (!(fieldValue instanceof String)) { - throw new InvalidFieldException( - String.format("[%s] doesn't match type [%s]", fieldValue, fieldType)); - } else { - return fieldValue; - } - case TIME: - if (!(fieldValue instanceof GregorianCalendar)) { - throw new InvalidFieldException( - String.format("[%s] doesn't match type [%s]", fieldValue, fieldType)); - } else { - return fieldValue; - } - case TIMESTAMP: - if (!(fieldValue instanceof Date)) { - throw new InvalidFieldException( - String.format("[%s] doesn't match type [%s]", fieldValue, fieldType)); - } else { - return fieldValue; - } - default: - throw new UnsupportedDataTypeException(fieldType); + case INTEGER: + if (!(fieldValue instanceof Integer)) { + throw new InvalidFieldException( + String.format("[%s] doesn't match type [%s]", fieldValue, fieldType)); + } else { + return fieldValue; + } + case SMALLINT: + if (!(fieldValue instanceof Short)) { + throw new InvalidFieldException( + String.format("[%s] doesn't match type [%s]", fieldValue, fieldType)); + } else { + return fieldValue; + } + case TINYINT: + if (!(fieldValue instanceof Byte)) { + throw new InvalidFieldException( + String.format("[%s] doesn't match type [%s]", fieldValue, fieldType)); + } else { + return fieldValue; + } + case DOUBLE: + if (!(fieldValue instanceof Double)) { + throw new InvalidFieldException( + String.format("[%s] doesn't match type [%s]", fieldValue, fieldType)); + } else { + return fieldValue; + } + case DECIMAL: + if (!(fieldValue instanceof BigDecimal)) { + throw new InvalidFieldException( + String.format("[%s] doesn't match type [%s]", fieldValue, fieldType)); + } else { + return fieldValue; + } + case BIGINT: + if (!(fieldValue instanceof Long)) { + throw new InvalidFieldException( + String.format("[%s] doesn't match type [%s]", fieldValue, fieldType)); + } else { + return fieldValue; + } + case FLOAT: + if (!(fieldValue instanceof Float)) { + throw new InvalidFieldException( + String.format("[%s] doesn't match type [%s]", fieldValue, fieldType)); + } else { + return fieldValue; + } + case VARCHAR: + case CHAR: + if (!(fieldValue instanceof String)) { + throw new InvalidFieldException( + String.format("[%s] doesn't match type [%s]", fieldValue, fieldType)); + } else { + return fieldValue; + } + case TIME: + if (!(fieldValue instanceof GregorianCalendar)) { + throw new InvalidFieldException( + String.format("[%s] doesn't match type [%s]", fieldValue, fieldType)); + } else { + return fieldValue; + } + case TIMESTAMP: + if (!(fieldValue instanceof Date)) { + throw new InvalidFieldException( + String.format("[%s] doesn't match type [%s]", fieldValue, fieldType)); + } else { + return fieldValue; + } + default: + throw new UnsupportedDataTypeException(fieldType); } } http://git-wip-us.apache.org/repos/asf/beam/blob/23a037e8/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java index bfcb487..0bfe467 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java @@ -21,7 +21,10 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.util.Date; +import java.util.GregorianCalendar; import java.util.List; + +import org.apache.beam.sdk.coders.BigDecimalCoder; import org.apache.beam.sdk.coders.BigEndianIntegerCoder; import org.apache.beam.sdk.coders.BigEndianLongCoder; import org.apache.beam.sdk.coders.Coder; @@ -46,6 +49,7 @@ public class BeamSqlRowCoder extends StandardCoder<BeamSQLRow>{ private static final BigEndianLongCoder longCoder = BigEndianLongCoder.of(); private static final DoubleCoder doubleCoder = DoubleCoder.of(); private static final InstantCoder instantCoder = InstantCoder.of(); + private static final BigDecimalCoder bigDecimalCoder = BigDecimalCoder.of(); private static final BeamSqlRowCoder INSTANCE = new BeamSqlRowCoder(); private BeamSqlRowCoder(){} @@ -81,6 +85,9 @@ public class BeamSqlRowCoder extends StandardCoder<BeamSQLRow>{ case FLOAT: doubleCoder.encode((double) value.getFloat(idx), outStream, context.nested()); break; + case DECIMAL: + bigDecimalCoder.encode(value.getBigDecimal(idx), outStream, context.nested()); + break; case BIGINT: longCoder.encode(value.getLong(idx), outStream, context.nested()); break; @@ -88,8 +95,12 @@ public class BeamSqlRowCoder extends StandardCoder<BeamSQLRow>{ case CHAR: stringCoder.encode(value.getString(idx), outStream, context.nested()); break; + case TIME: + longCoder.encode(value.getGregorianCalendar(idx).getTime().getTime(), + outStream, context.nested()); + break; case TIMESTAMP: - longCoder.encode(value.getDate(idx).getTime(), outStream, context); + longCoder.encode(value.getDate(idx).getTime(), outStream, context.nested()); break; default: @@ -134,12 +145,20 @@ public class BeamSqlRowCoder extends StandardCoder<BeamSQLRow>{ case BIGINT: record.addField(idx, longCoder.decode(inStream, context.nested())); break; + case DECIMAL: + record.addField(idx, bigDecimalCoder.decode(inStream, context.nested())); + break; case VARCHAR: case CHAR: record.addField(idx, stringCoder.decode(inStream, context.nested())); break; + case TIME: + GregorianCalendar calendar = new GregorianCalendar(); + calendar.setTime(new Date(longCoder.decode(inStream, context.nested()))); + record.addField(idx, calendar); + break; case TIMESTAMP: - record.addField(idx, new Date(longCoder.decode(inStream, context))); + record.addField(idx, new Date(longCoder.decode(inStream, context.nested()))); break; default: http://git-wip-us.apache.org/repos/asf/beam/blob/23a037e8/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoderTest.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoderTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoderTest.java index f207794..bc6343b 100644 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoderTest.java +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoderTest.java @@ -18,6 +18,10 @@ package org.apache.beam.dsls.sql.schema; +import java.math.BigDecimal; +import java.util.Date; +import java.util.GregorianCalendar; + import org.apache.beam.sdk.testing.CoderProperties; import org.apache.calcite.jdbc.JavaTypeFactoryImpl; import org.apache.calcite.rel.type.RelDataType; @@ -38,11 +42,16 @@ public class BeamSqlRowCoderTest { @Override public RelDataType apply(RelDataTypeFactory a0) { return a0.builder() - .add("id", SqlTypeName.INTEGER) - .add("order_id", SqlTypeName.BIGINT) - .add("price", SqlTypeName.FLOAT) - .add("amount", SqlTypeName.DOUBLE) - .add("user_name", SqlTypeName.VARCHAR) + .add("col_tinyint", SqlTypeName.TINYINT) + .add("col_smallint", SqlTypeName.SMALLINT) + .add("col_integer", SqlTypeName.INTEGER) + .add("col_bigint", SqlTypeName.BIGINT) + .add("col_float", SqlTypeName.FLOAT) + .add("col_double", SqlTypeName.DOUBLE) + .add("col_decimal", SqlTypeName.DECIMAL) + .add("col_string_varchar", SqlTypeName.VARCHAR) + .add("col_time", SqlTypeName.TIME) + .add("col_timestamp", SqlTypeName.TIMESTAMP) .build(); } }; @@ -51,11 +60,19 @@ public class BeamSqlRowCoderTest { protoRowType.apply(new JavaTypeFactoryImpl( RelDataTypeSystem.DEFAULT))); BeamSQLRow row = new BeamSQLRow(beamSQLRecordType); - row.addField(0, 1); - row.addField(1, 1L); - row.addField(2, 1.1F); - row.addField(3, 1.1); - row.addField(4, "hello"); + row.addField("col_tinyint", Byte.valueOf("1")); + row.addField("col_smallint", Short.valueOf("1")); + row.addField("col_integer", 1); + row.addField("col_bigint", 1L); + row.addField("col_float", 1.1F); + row.addField("col_double", 1.1); + row.addField("col_decimal", BigDecimal.ZERO); + row.addField("col_string_varchar", "hello"); + GregorianCalendar calendar = new GregorianCalendar(); + calendar.setTime(new Date()); + row.addField("col_time", calendar); + row.addField("col_timestamp", new Date()); + BeamSqlRowCoder coder = BeamSqlRowCoder.of(); CoderProperties.coderDecodeEncodeEqual(coder, row);
