Repository: beam
Updated Branches:
  refs/heads/DSL_SQL cf95571d9 -> f7ee8d33e


Fix inconsistent mapping for SQL FLOAT


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/dd70852f
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/dd70852f
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/dd70852f

Branch: refs/heads/DSL_SQL
Commit: dd70852f6fa811a410938e5baa51cb8e602c931b
Parents: cf95571
Author: James Xu <[email protected]>
Authored: Thu May 4 11:44:14 2017 +0800
Committer: Davor Bonaci <[email protected]>
Committed: Sat May 6 19:23:39 2017 -0700

----------------------------------------------------------------------
 dsls/sql/pom.xml                                |  9 +-
 .../beam/dsls/sql/schema/BeamSqlRowCoder.java   | 93 ++++++++++----------
 .../dsls/sql/schema/BeamSqlRowCoderTest.java    | 63 +++++++++++++
 3 files changed, 117 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/dd70852f/dsls/sql/pom.xml
----------------------------------------------------------------------
diff --git a/dsls/sql/pom.xml b/dsls/sql/pom.xml
index e2f09be..6139ada 100644
--- a/dsls/sql/pom.xml
+++ b/dsls/sql/pom.xml
@@ -24,7 +24,7 @@
     <artifactId>beam-dsls-parent</artifactId>
     <version>0.7.0-SNAPSHOT</version>
   </parent>
-  
+
   <artifactId>beam-dsls-sql</artifactId>
   <name>Apache Beam :: DSLs :: SQL</name>
   <description>Beam SQL provides a new interface to generate a Beam pipeline from SQL statement</description>
@@ -36,7 +36,7 @@
     <maven.build.timestamp.format>yyyy-MM-dd HH:mm</maven.build.timestamp.format>
     <calcite-version>1.11.0</calcite-version>
   </properties>
-  
+
   <build>
     <resources>
       <resource>
@@ -199,5 +199,10 @@
       <artifactId>calcite-linq4j</artifactId>
       <version>${calcite-version}</version>
     </dependency>
+    <dependency>
+      <groupId>org.hamcrest</groupId>
+      <artifactId>hamcrest-all</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/beam/blob/dd70852f/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java
index 22ffaad..9b2474a 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java
@@ -63,28 +63,29 @@ public class BeamSqlRowCoder extends StandardCoder<BeamSQLRow>{
       }
 
       switch (value.getDataType().getFieldsType().get(idx)) {
-      case INTEGER:
-        intCoder.encode(value.getInteger(idx), outStream, context.nested());
-        break;
-      case SMALLINT:
-      case TINYINT:
-        intCoder.encode((int) value.getShort(idx), outStream, context.nested());
-        break;
-      case DOUBLE:
-        doubleCoder.encode(value.getDouble(idx), outStream, context.nested());
-        break;
-      case FLOAT:
-        doubleCoder.encode((double) value.getFloat(idx), outStream, context.nested());
-        break;
-      case BIGINT:
-        longCoder.encode(value.getLong(idx), outStream, context.nested());
-        break;
-      case VARCHAR:
-        stringCoder.encode(value.getString(idx), outStream, context.nested());
-        break;
-
-      default:
-        throw new UnsupportedDataTypeException(value.getDataType().getFieldsType().get(idx));
+        case INTEGER:
+          intCoder.encode(value.getInteger(idx), outStream, context.nested());
+          break;
+        case SMALLINT:
+        case TINYINT:
+          intCoder.encode((int) value.getShort(idx), outStream, context.nested());
+          break;
+        case DOUBLE:
+          doubleCoder.encode(value.getDouble(idx), outStream, context.nested());
+          break;
+        case FLOAT:
+          doubleCoder.encode(Double.parseDouble(
+              String.valueOf(value.getFloat(idx))), outStream, context.nested());
+          break;
+        case BIGINT:
+          longCoder.encode(value.getLong(idx), outStream, context.nested());
+          break;
+        case VARCHAR:
+          stringCoder.encode(value.getString(idx), outStream, context.nested());
+          break;
+
+        default:
+          throw new UnsupportedDataTypeException(value.getDataType().getFieldsType().get(idx));
       }
     }
     //add a dummy field to indicate the end of record
@@ -106,30 +107,30 @@ public class BeamSqlRowCoder extends StandardCoder<BeamSQLRow>{
       }
 
       switch (type.getFieldsType().get(idx)) {
-      case INTEGER:
-        record.addField(idx, intCoder.decode(inStream, context.nested()));
-        break;
-      case SMALLINT:
-        record.addField(idx, intCoder.decode(inStream, context.nested()).shortValue());
-        break;
-      case TINYINT:
-        record.addField(idx, intCoder.decode(inStream, context.nested()).byteValue());
-        break;
-      case DOUBLE:
-        record.addField(idx, doubleCoder.decode(inStream, context.nested()));
-        break;
-      case FLOAT:
-        record.addField(idx, doubleCoder.decode(inStream, context.nested()).floatValue());
-        break;
-      case BIGINT:
-        record.addField(idx, longCoder.decode(inStream, context.nested()));
-        break;
-      case VARCHAR:
-        record.addField(idx, stringCoder.decode(inStream, context.nested()));
-        break;
-
-      default:
-        throw new UnsupportedDataTypeException(type.getFieldsType().get(idx));
+        case INTEGER:
+          record.addField(idx, intCoder.decode(inStream, context.nested()));
+          break;
+        case SMALLINT:
+          record.addField(idx, intCoder.decode(inStream, context.nested()).shortValue());
+          break;
+        case TINYINT:
+          record.addField(idx, intCoder.decode(inStream, context.nested()).byteValue());
+          break;
+        case DOUBLE:
+          record.addField(idx, doubleCoder.decode(inStream, context.nested()));
+          break;
+        case FLOAT:
+          record.addField(idx, doubleCoder.decode(inStream, context.nested()).floatValue());
+          break;
+        case BIGINT:
+          record.addField(idx, longCoder.decode(inStream, context.nested()));
+          break;
+        case VARCHAR:
+          record.addField(idx, stringCoder.decode(inStream, context.nested()));
+          break;
+
+        default:
+          throw new UnsupportedDataTypeException(type.getFieldsType().get(idx));
       }
     }
     intCoder.decode(inStream, context);

http://git-wip-us.apache.org/repos/asf/beam/blob/dd70852f/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoderTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoderTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoderTest.java
new file mode 100644
index 0000000..f207794
--- /dev/null
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoderTest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.schema;
+
+import org.apache.beam.sdk.testing.CoderProperties;
+import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelDataTypeSystem;
+import org.apache.calcite.rel.type.RelProtoDataType;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.junit.Test;
+
+/**
+ * Tests for BeamSqlRowCoder.
+ */
+public class BeamSqlRowCoderTest {
+
+  @Test
+  public void encodeAndDecode() throws Exception {
+    final RelProtoDataType protoRowType = new RelProtoDataType() {
+      @Override
+      public RelDataType apply(RelDataTypeFactory a0) {
+        return a0.builder()
+            .add("id", SqlTypeName.INTEGER)
+            .add("order_id", SqlTypeName.BIGINT)
+            .add("price", SqlTypeName.FLOAT)
+            .add("amount", SqlTypeName.DOUBLE)
+            .add("user_name", SqlTypeName.VARCHAR)
+            .build();
+      }
+    };
+
+    BeamSQLRecordType beamSQLRecordType = BeamSQLRecordType.from(
+        protoRowType.apply(new JavaTypeFactoryImpl(
+        RelDataTypeSystem.DEFAULT)));
+    BeamSQLRow row = new BeamSQLRow(beamSQLRecordType);
+    row.addField(0, 1);
+    row.addField(1, 1L);
+    row.addField(2, 1.1F);
+    row.addField(3, 1.1);
+    row.addField(4, "hello");
+
+    BeamSqlRowCoder coder = BeamSqlRowCoder.of();
+    CoderProperties.coderDecodeEncodeEqual(coder, row);
+  }
+}

Reply via email to