Repository: beam Updated Branches: refs/heads/DSL_SQL cf95571d9 -> f7ee8d33e
Fix inconsistent mapping for SQL FLOAT Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/dd70852f Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/dd70852f Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/dd70852f Branch: refs/heads/DSL_SQL Commit: dd70852f6fa811a410938e5baa51cb8e602c931b Parents: cf95571 Author: James Xu <[email protected]> Authored: Thu May 4 11:44:14 2017 +0800 Committer: Davor Bonaci <[email protected]> Committed: Sat May 6 19:23:39 2017 -0700 ---------------------------------------------------------------------- dsls/sql/pom.xml | 9 +- .../beam/dsls/sql/schema/BeamSqlRowCoder.java | 93 ++++++++++---------- .../dsls/sql/schema/BeamSqlRowCoderTest.java | 63 +++++++++++++ 3 files changed, 117 insertions(+), 48 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/dd70852f/dsls/sql/pom.xml ---------------------------------------------------------------------- diff --git a/dsls/sql/pom.xml b/dsls/sql/pom.xml index e2f09be..6139ada 100644 --- a/dsls/sql/pom.xml +++ b/dsls/sql/pom.xml @@ -24,7 +24,7 @@ <artifactId>beam-dsls-parent</artifactId> <version>0.7.0-SNAPSHOT</version> </parent> - + <artifactId>beam-dsls-sql</artifactId> <name>Apache Beam :: DSLs :: SQL</name> <description>Beam SQL provides a new interface to generate a Beam pipeline from SQL statement</description> @@ -36,7 +36,7 @@ <maven.build.timestamp.format>yyyy-MM-dd HH:mm</maven.build.timestamp.format> <calcite-version>1.11.0</calcite-version> </properties> - + <build> <resources> <resource> @@ -199,5 +199,10 @@ <artifactId>calcite-linq4j</artifactId> <version>${calcite-version}</version> </dependency> + <dependency> + <groupId>org.hamcrest</groupId> + <artifactId>hamcrest-all</artifactId> + <scope>test</scope> + </dependency> </dependencies> </project> http://git-wip-us.apache.org/repos/asf/beam/blob/dd70852f/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java index 22ffaad..9b2474a 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java @@ -63,28 +63,29 @@ public class BeamSqlRowCoder extends StandardCoder<BeamSQLRow>{ } switch (value.getDataType().getFieldsType().get(idx)) { - case INTEGER: - intCoder.encode(value.getInteger(idx), outStream, context.nested()); - break; - case SMALLINT: - case TINYINT: - intCoder.encode((int) value.getShort(idx), outStream, context.nested()); - break; - case DOUBLE: - doubleCoder.encode(value.getDouble(idx), outStream, context.nested()); - break; - case FLOAT: - doubleCoder.encode((double) value.getFloat(idx), outStream, context.nested()); - break; - case BIGINT: - longCoder.encode(value.getLong(idx), outStream, context.nested()); - break; - case VARCHAR: - stringCoder.encode(value.getString(idx), outStream, context.nested()); - break; - - default: - throw new UnsupportedDataTypeException(value.getDataType().getFieldsType().get(idx)); + case INTEGER: + intCoder.encode(value.getInteger(idx), outStream, context.nested()); + break; + case SMALLINT: + case TINYINT: + intCoder.encode((int) value.getShort(idx), outStream, context.nested()); + break; + case DOUBLE: + doubleCoder.encode(value.getDouble(idx), outStream, context.nested()); + break; + case FLOAT: + doubleCoder.encode(Double.parseDouble( + String.valueOf(value.getFloat(idx))), outStream, context.nested()); + break; + case BIGINT: + longCoder.encode(value.getLong(idx), outStream, context.nested()); + break; + case VARCHAR: + stringCoder.encode(value.getString(idx), outStream, context.nested()); + break; + + default: + throw new UnsupportedDataTypeException(value.getDataType().getFieldsType().get(idx)); } } //add a dummy field to indicate the end of record @@ -106,30 +107,30 @@ public class BeamSqlRowCoder extends StandardCoder<BeamSQLRow>{ } switch (type.getFieldsType().get(idx)) { - case INTEGER: - record.addField(idx, intCoder.decode(inStream, context.nested())); - break; - case SMALLINT: - record.addField(idx, intCoder.decode(inStream, context.nested()).shortValue()); - break; - case TINYINT: - record.addField(idx, intCoder.decode(inStream, context.nested()).byteValue()); - break; - case DOUBLE: - record.addField(idx, doubleCoder.decode(inStream, context.nested())); - break; - case FLOAT: - record.addField(idx, doubleCoder.decode(inStream, context.nested()).floatValue()); - break; - case BIGINT: - record.addField(idx, longCoder.decode(inStream, context.nested())); - break; - case VARCHAR: - record.addField(idx, stringCoder.decode(inStream, context.nested())); - break; - - default: - throw new UnsupportedDataTypeException(type.getFieldsType().get(idx)); + case INTEGER: + record.addField(idx, intCoder.decode(inStream, context.nested())); + break; + case SMALLINT: + record.addField(idx, intCoder.decode(inStream, context.nested()).shortValue()); + break; + case TINYINT: + record.addField(idx, intCoder.decode(inStream, context.nested()).byteValue()); + break; + case DOUBLE: + record.addField(idx, doubleCoder.decode(inStream, context.nested())); + break; + case FLOAT: + record.addField(idx, doubleCoder.decode(inStream, context.nested()).floatValue()); + break; + case BIGINT: + record.addField(idx, longCoder.decode(inStream, context.nested())); + break; + case VARCHAR: + record.addField(idx, stringCoder.decode(inStream, context.nested())); + break; + + default: + throw new UnsupportedDataTypeException(type.getFieldsType().get(idx)); } } intCoder.decode(inStream, context); http://git-wip-us.apache.org/repos/asf/beam/blob/dd70852f/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoderTest.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoderTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoderTest.java new file mode 100644 index 0000000..f207794 --- /dev/null +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoderTest.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.dsls.sql.schema; + +import org.apache.beam.sdk.testing.CoderProperties; +import org.apache.calcite.jdbc.JavaTypeFactoryImpl; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.rel.type.RelDataTypeSystem; +import org.apache.calcite.rel.type.RelProtoDataType; +import org.apache.calcite.sql.type.SqlTypeName; +import org.junit.Test; + +/** + * Tests for BeamSqlRowCoder. + */ +public class BeamSqlRowCoderTest { + + @Test + public void encodeAndDecode() throws Exception { + final RelProtoDataType protoRowType = new RelProtoDataType() { + @Override + public RelDataType apply(RelDataTypeFactory a0) { + return a0.builder() + .add("id", SqlTypeName.INTEGER) + .add("order_id", SqlTypeName.BIGINT) + .add("price", SqlTypeName.FLOAT) + .add("amount", SqlTypeName.DOUBLE) + .add("user_name", SqlTypeName.VARCHAR) + .build(); + } + }; + + BeamSQLRecordType beamSQLRecordType = BeamSQLRecordType.from( + protoRowType.apply(new JavaTypeFactoryImpl( + RelDataTypeSystem.DEFAULT))); + BeamSQLRow row = new BeamSQLRow(beamSQLRecordType); + row.addField(0, 1); + row.addField(1, 1L); + row.addField(2, 1.1F); + row.addField(3, 1.1); + row.addField(4, "hello"); + + BeamSqlRowCoder coder = BeamSqlRowCoder.of(); + CoderProperties.coderDecodeEncodeEqual(coder, row); + } +}
