Repository: beam
Updated Branches:
  refs/heads/DSL_SQL 10962a34d -> 8f922f74b


move BeamRecord to sdk/core


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/52933a64
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/52933a64
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/52933a64

Branch: refs/heads/DSL_SQL
Commit: 52933a640393a107eddbd3d88670507b03595e1f
Parents: 10962a3
Author: mingmxu <[email protected]>
Authored: Wed Aug 2 01:20:50 2017 -0700
Committer: mingmxu <[email protected]>
Committed: Wed Aug 2 23:52:29 2017 -0700

----------------------------------------------------------------------
 .../apache/beam/sdk/coders/BeamRecordCoder.java |  84 ++++++
 .../org/apache/beam/sdk/values/BeamRecord.java  | 279 ++++++++++++++++++
 .../beam/sdk/values/BeamRecordTypeProvider.java |  59 ++++
 .../apache/beam/sdk/extensions/sql/BeamSql.java |   2 +-
 .../beam/sdk/extensions/sql/BeamSqlCli.java     |   4 +-
 .../sdk/extensions/sql/schema/BeamSqlRow.java   | 293 +------------------
 .../extensions/sql/schema/BeamSqlRowCoder.java  |  79 ++---
 .../extensions/sql/schema/BeamSqlRowType.java   |  91 +++++-
 8 files changed, 555 insertions(+), 336 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/52933a64/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BeamRecordCoder.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BeamRecordCoder.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BeamRecordCoder.java
new file mode 100644
index 0000000..ad27f4e
--- /dev/null
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BeamRecordCoder.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.coders;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.List;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.values.BeamRecord;
+import org.apache.beam.sdk.values.BeamRecordTypeProvider;
+
+/**
+ * A {@link Coder} for {@link BeamRecord}. Every field is encoded with the {@link Coder} found at
+ * the same position in the coder list handed to the constructor.
+ *
+ * <p>Layout written by {@link #encode}: the list of null field indexes, then each non-null field
+ * value in index order, then the window start and the window end of the record.
+ */
+@Experimental
+public class BeamRecordCoder extends CustomCoder<BeamRecord> {
+  private static final ListCoder<Integer> nullListCoder =
+      ListCoder.of(BigEndianIntegerCoder.of());
+  private static final InstantCoder instantCoder = InstantCoder.of();
+
+  private BeamRecordTypeProvider recordType;
+  private List<Coder> coderArray;
+
+  /**
+   * Creates a coder for records described by {@code recordType}.
+   *
+   * @param recordType type provider used to build the records returned by {@link #decode}
+   * @param coderArray one coder per field; the coder at position {@code i} handles field {@code i}
+   */
+  public BeamRecordCoder(BeamRecordTypeProvider recordType, List<Coder> coderArray) {
+    this.recordType = recordType;
+    this.coderArray = coderArray;
+  }
+
+  /**
+   * Writes the null field indexes, the non-null field values and the window range of
+   * {@code value} to {@code outStream}.
+   */
+  @Override
+  public void encode(BeamRecord value, OutputStream outStream)
+      throws CoderException, IOException {
+    nullListCoder.encode(value.getNullFields(), outStream);
+    for (int idx = 0; idx < value.size(); ++idx) {
+      if (value.getNullFields().contains(idx)) {
+        // null fields are only recorded in the index list written above
+        continue;
+      }
+
+      // Hand the untyped value to the per-field coder. A typed getter such as getInteger()
+      // would throw ClassCastException for every field that is not an Integer.
+      coderArray.get(idx).encode(value.getFieldValue(idx), outStream);
+    }
+
+    instantCoder.encode(value.getWindowStart(), outStream);
+    instantCoder.encode(value.getWindowEnd(), outStream);
+  }
+
+  /**
+   * Reads a record in the layout produced by {@link #encode}. Fields listed as null are skipped;
+   * all others are decoded with the coder at the matching position.
+   */
+  @Override
+  public BeamRecord decode(InputStream inStream) throws CoderException, IOException {
+    List<Integer> nullFields = nullListCoder.decode(inStream);
+
+    BeamRecord record = new BeamRecord(recordType);
+    record.setNullFields(nullFields);
+    for (int idx = 0; idx < recordType.size(); ++idx) {
+      if (nullFields.contains(idx)) {
+        continue;
+      }
+
+      record.addField(idx, coderArray.get(idx).decode(inStream));
+    }
+
+    record.setWindowStart(instantCoder.decode(inStream));
+    record.setWindowEnd(instantCoder.decode(inStream));
+
+    return record;
+  }
+
+  /**
+   * Never throws.
+   *
+   * <p>NOTE(review): the per-field coders are not consulted here, so a non-deterministic field
+   * coder goes unreported - confirm whether this should delegate to each of them.
+   */
+  @Override
+  public void verifyDeterministic()
+      throws org.apache.beam.sdk.coders.Coder.NonDeterministicException {
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/52933a64/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecord.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecord.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecord.java
new file mode 100644
index 0000000..d1c1c17
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecord.java
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.values;
+
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.GregorianCalendar;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.joda.time.Instant;
+
+/**
+ * {@link org.apache.beam.sdk.values.BeamRecord}, self-described with
+ * {@link BeamRecordTypeProvider}, represents one element in a
+ * {@link org.apache.beam.sdk.values.PCollection}.
+ */
+@Experimental
+public class BeamRecord implements Serializable {
+  //null values are indexed here, to handle properly in Coder.
+  private List<Integer> nullFields = new ArrayList<>();
+  private List<Object> dataValues;
+  private BeamRecordTypeProvider dataType;
+
+  private Instant windowStart = new 
Instant(TimeUnit.MICROSECONDS.toMillis(Long.MIN_VALUE));
+  private Instant windowEnd = new 
Instant(TimeUnit.MICROSECONDS.toMillis(Long.MAX_VALUE));
+
+  public BeamRecord(BeamRecordTypeProvider dataType) {
+    this.dataType = dataType;
+    this.dataValues = new ArrayList<>();
+    for (int idx = 0; idx < dataType.size(); ++idx) {
+      dataValues.add(null);
+      nullFields.add(idx);
+    }
+  }
+
+  public BeamRecord(BeamRecordTypeProvider dataType, List<Object> dataValues) {
+    this(dataType);
+    for (int idx = 0; idx < dataValues.size(); ++idx) {
+      addField(idx, dataValues.get(idx));
+    }
+  }
+
+  public void updateWindowRange(BeamRecord upstreamRecord, BoundedWindow 
window){
+    windowStart = upstreamRecord.windowStart;
+    windowEnd = upstreamRecord.windowEnd;
+
+    if (window instanceof IntervalWindow) {
+      IntervalWindow iWindow = (IntervalWindow) window;
+      windowStart = iWindow.start();
+      windowEnd = iWindow.end();
+    }
+  }
+
+  public void addField(String fieldName, Object fieldValue) {
+    addField(dataType.getFieldsName().indexOf(fieldName), fieldValue);
+  }
+
+  public void addField(int index, Object fieldValue) {
+    if (fieldValue == null) {
+      return;
+    } else {
+      if (nullFields.contains(index)) {
+        nullFields.remove(nullFields.indexOf(index));
+      }
+    }
+
+    dataType.validateValueType(index, fieldValue);
+    dataValues.set(index, fieldValue);
+  }
+
+  public Object getFieldValue(String fieldName) {
+    return getFieldValue(dataType.getFieldsName().indexOf(fieldName));
+  }
+
+  public byte getByte(String fieldName) {
+    return (Byte) getFieldValue(fieldName);
+  }
+
+  public short getShort(String fieldName) {
+    return (Short) getFieldValue(fieldName);
+  }
+
+  public int getInteger(String fieldName) {
+    return (Integer) getFieldValue(fieldName);
+  }
+
+  public float getFloat(String fieldName) {
+    return (Float) getFieldValue(fieldName);
+  }
+
+  public double getDouble(String fieldName) {
+    return (Double) getFieldValue(fieldName);
+  }
+
+  public long getLong(String fieldName) {
+    return (Long) getFieldValue(fieldName);
+  }
+
+  public String getString(String fieldName) {
+    return (String) getFieldValue(fieldName);
+  }
+
+  public Date getDate(String fieldName) {
+    return (Date) getFieldValue(fieldName);
+  }
+
+  public GregorianCalendar getGregorianCalendar(String fieldName) {
+    return (GregorianCalendar) getFieldValue(fieldName);
+  }
+
+  public BigDecimal getBigDecimal(String fieldName) {
+    return (BigDecimal) getFieldValue(fieldName);
+  }
+
+  public boolean getBoolean(String fieldName) {
+    return (boolean) getFieldValue(fieldName);
+  }
+
+  public Object getFieldValue(int fieldIdx) {
+    if (nullFields.contains(fieldIdx)) {
+      return null;
+    }
+
+    return dataValues.get(fieldIdx);
+  }
+
+  public byte getByte(int idx) {
+    return (Byte) getFieldValue(idx);
+  }
+
+  public short getShort(int idx) {
+    return (Short) getFieldValue(idx);
+  }
+
+  public int getInteger(int idx) {
+    return (Integer) getFieldValue(idx);
+  }
+
+  public float getFloat(int idx) {
+    return (Float) getFieldValue(idx);
+  }
+
+  public double getDouble(int idx) {
+    return (Double) getFieldValue(idx);
+  }
+
+  public long getLong(int idx) {
+    return (Long) getFieldValue(idx);
+  }
+
+  public String getString(int idx) {
+    return (String) getFieldValue(idx);
+  }
+
+  public Date getDate(int idx) {
+    return (Date) getFieldValue(idx);
+  }
+
+  public GregorianCalendar getGregorianCalendar(int idx) {
+    return (GregorianCalendar) getFieldValue(idx);
+  }
+
+  public BigDecimal getBigDecimal(int idx) {
+    return (BigDecimal) getFieldValue(idx);
+  }
+
+  public boolean getBoolean(int idx) {
+    return (boolean) getFieldValue(idx);
+  }
+
+  public int size() {
+    return dataValues.size();
+  }
+
+  public List<Object> getDataValues() {
+    return dataValues;
+  }
+
+  public void setDataValues(List<Object> dataValues) {
+    this.dataValues = dataValues;
+  }
+
+  public BeamRecordTypeProvider getDataType() {
+    return dataType;
+  }
+
+  public void setDataType(BeamRecordTypeProvider dataType) {
+    this.dataType = dataType;
+  }
+
+  public void setNullFields(List<Integer> nullFields) {
+    this.nullFields = nullFields;
+  }
+
+  public List<Integer> getNullFields() {
+    return nullFields;
+  }
+
+  /**
+   * is the specified field NULL?
+   */
+  public boolean isNull(int idx) {
+    return nullFields.contains(idx);
+  }
+
+  public Instant getWindowStart() {
+    return windowStart;
+  }
+
+  public Instant getWindowEnd() {
+    return windowEnd;
+  }
+
+  public void setWindowStart(Instant windowStart) {
+    this.windowStart = windowStart;
+  }
+
+  public void setWindowEnd(Instant windowEnd) {
+    this.windowEnd = windowEnd;
+  }
+
+  @Override
+  public String toString() {
+    return "BeamSqlRow [nullFields=" + nullFields + ", dataValues=" + 
dataValues + ", dataType="
+        + dataType + ", windowStart=" + windowStart + ", windowEnd=" + 
windowEnd + "]";
+  }
+
+  /**
+   * Return data fields as key=value.
+   */
+  public String valueInString() {
+    StringBuilder sb = new StringBuilder();
+    for (int idx = 0; idx < size(); ++idx) {
+      sb.append(
+          String.format(",%s=%s", getDataType().getFieldsName().get(idx), 
getFieldValue(idx)));
+    }
+    return sb.substring(1);
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj == null) {
+      return false;
+    }
+    if (getClass() != obj.getClass()) {
+      return false;
+    }
+    BeamRecord other = (BeamRecord) obj;
+    return toString().equals(other.toString());
+  }
+
+  @Override public int hashCode() {
+    return 31 * (31 * getDataType().hashCode() + getDataValues().hashCode())
+        + getNullFields().hashCode();
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/52933a64/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecordTypeProvider.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecordTypeProvider.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecordTypeProvider.java
new file mode 100644
index 0000000..63a961c
--- /dev/null
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecordTypeProvider.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.values;
+
+import java.io.Serializable;
+import java.util.List;
+import org.apache.beam.sdk.annotations.Experimental;
+
+/**
+ * Default description of the fields of a {@link BeamRecord}: an ordered list of field names.
+ * Value validation is a no-op in this class.
+ */
+@Experimental
+public class BeamRecordTypeProvider implements Serializable {
+  private List<String> fieldsName;
+
+  /**
+   * @param fieldsName names of the record fields, in field order
+   */
+  public BeamRecordTypeProvider(List<String> fieldsName) {
+    this.fieldsName = fieldsName;
+  }
+
+  /**
+   * Number of fields described by this provider.
+   */
+  public int size() {
+    return this.fieldsName.size();
+  }
+
+  /**
+   * Returns the field name list handed to the constructor, not a copy.
+   */
+  public List<String> getFieldsName() {
+    return this.fieldsName;
+  }
+
+  /**
+   * Name of the field at position {@code index}.
+   */
+  public String getFieldByIndex(int index) {
+    return this.fieldsName.get(index);
+  }
+
+  /**
+   * Position of {@code fieldName} in the name list, or {@code -1} when it is not listed.
+   */
+  public int findIndexOfField(String fieldName) {
+    return this.fieldsName.indexOf(fieldName);
+  }
+
+  /**
+   * Checks whether {@code fieldValue} is acceptable for the field at {@code index}. This
+   * implementation accepts every value.
+   *
+   * @throws IllegalArgumentException declared for implementations that reject a value
+   */
+  public void validateValueType(int index, Object fieldValue)
+      throws IllegalArgumentException {
+    // intentionally empty: no type information is available here
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/52933a64/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSql.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSql.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSql.java
index e0d7a78..0dabf40 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSql.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSql.java
@@ -167,7 +167,7 @@ public class BeamSql {
         BeamSqlRowCoder sourceCoder = (BeamSqlRowCoder) 
sourceStream.getCoder();
 
         getSqlEnv().registerTable(sourceTag.getId(),
-            new BeamPCollectionTable(sourceStream, 
sourceCoder.getTableSchema()));
+            new BeamPCollectionTable(sourceStream, 
sourceCoder.getSqlRecordType()));
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/52933a64/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlCli.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlCli.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlCli.java
index 3bea46a..967dee5 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlCli.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlCli.java
@@ -56,8 +56,8 @@ public class BeamSqlCli {
   /**
    * compile SQL, and return a {@link Pipeline}.
    */
-  public static PCollection<BeamSqlRow> compilePipeline(String sqlStatement, 
Pipeline basePipeline
-      , BeamSqlEnv sqlEnv) throws Exception{
+  public static PCollection<BeamSqlRow> compilePipeline(String sqlStatement, 
Pipeline basePipeline,
+      BeamSqlEnv sqlEnv) throws Exception{
     PCollection<BeamSqlRow> resultStream =
         sqlEnv.planner.compileBeamPipeline(sqlStatement, basePipeline, sqlEnv);
     return resultStream;

http://git-wip-us.apache.org/repos/asf/beam/blob/52933a64/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRow.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRow.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRow.java
index 2e0efe8..cb5c7ea 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRow.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRow.java
@@ -17,298 +17,25 @@
  */
 package org.apache.beam.sdk.extensions.sql.schema;
 
-import java.io.Serializable;
-import java.math.BigDecimal;
-import java.sql.Types;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.GregorianCalendar;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
-import org.apache.calcite.sql.type.SqlTypeName;
-import org.joda.time.Instant;
+import org.apache.beam.sdk.values.BeamRecord;
+import org.apache.beam.sdk.values.PCollection;
 
 /**
- * Represent a generic ROW record in Beam SQL.
- *
+ * {@link BeamSqlRow} represents one row element in a {@link PCollection},
+ * with type provider {@link BeamSqlRowType}.
  */
-public class BeamSqlRow implements Serializable {
-  private static final Map<Integer, Class> SQL_TYPE_TO_JAVA_CLASS = new 
HashMap<>();
-  static {
-    SQL_TYPE_TO_JAVA_CLASS.put(Types.TINYINT, Byte.class);
-    SQL_TYPE_TO_JAVA_CLASS.put(Types.SMALLINT, Short.class);
-    SQL_TYPE_TO_JAVA_CLASS.put(Types.INTEGER, Integer.class);
-    SQL_TYPE_TO_JAVA_CLASS.put(Types.BIGINT, Long.class);
-    SQL_TYPE_TO_JAVA_CLASS.put(Types.FLOAT, Float.class);
-    SQL_TYPE_TO_JAVA_CLASS.put(Types.DOUBLE, Double.class);
-    SQL_TYPE_TO_JAVA_CLASS.put(Types.DECIMAL, BigDecimal.class);
-
-    SQL_TYPE_TO_JAVA_CLASS.put(Types.BOOLEAN, Boolean.class);
-
-    SQL_TYPE_TO_JAVA_CLASS.put(Types.CHAR, String.class);
-    SQL_TYPE_TO_JAVA_CLASS.put(Types.VARCHAR, String.class);
-
-    SQL_TYPE_TO_JAVA_CLASS.put(Types.TIME, GregorianCalendar.class);
-
-    SQL_TYPE_TO_JAVA_CLASS.put(Types.DATE, Date.class);
-    SQL_TYPE_TO_JAVA_CLASS.put(Types.TIMESTAMP, Date.class);
-  }
-
-  private List<Integer> nullFields = new ArrayList<>();
-  private List<Object> dataValues;
-  private BeamSqlRowType dataType;
-
-  private Instant windowStart = new 
Instant(TimeUnit.MICROSECONDS.toMillis(Long.MIN_VALUE));
-  private Instant windowEnd = new 
Instant(TimeUnit.MICROSECONDS.toMillis(Long.MAX_VALUE));
-
-  public BeamSqlRow(BeamSqlRowType dataType) {
-    this.dataType = dataType;
-    this.dataValues = new ArrayList<>();
-    for (int idx = 0; idx < dataType.size(); ++idx) {
-      dataValues.add(null);
-      nullFields.add(idx);
-    }
-  }
-
+public class BeamSqlRow extends BeamRecord {
   public BeamSqlRow(BeamSqlRowType dataType, List<Object> dataValues) {
-    this(dataType);
-    for (int idx = 0; idx < dataValues.size(); ++idx) {
-      addField(idx, dataValues.get(idx));
-    }
-  }
-
-  public void updateWindowRange(BeamSqlRow upstreamRecord, BoundedWindow 
window){
-    windowStart = upstreamRecord.windowStart;
-    windowEnd = upstreamRecord.windowEnd;
-
-    if (window instanceof IntervalWindow) {
-      IntervalWindow iWindow = (IntervalWindow) window;
-      windowStart = iWindow.start();
-      windowEnd = iWindow.end();
-    }
-  }
-
-  public void addField(String fieldName, Object fieldValue) {
-    addField(dataType.getFieldsName().indexOf(fieldName), fieldValue);
-  }
-
-  public void addField(int index, Object fieldValue) {
-    if (fieldValue == null) {
-      return;
-    } else {
-      if (nullFields.contains(index)) {
-        nullFields.remove(nullFields.indexOf(index));
-      }
-    }
-
-    validateValueType(index, fieldValue);
-    dataValues.set(index, fieldValue);
-  }
-
-  private void validateValueType(int index, Object fieldValue) {
-    SqlTypeName fieldType = CalciteUtils.getFieldType(dataType, index);
-    Class javaClazz = 
SQL_TYPE_TO_JAVA_CLASS.get(CalciteUtils.toJavaType(fieldType));
-    if (javaClazz == null) {
-      throw new UnsupportedOperationException("Data type: " + fieldType + " 
not supported yet!");
-    }
-
-    if (!fieldValue.getClass().equals(javaClazz)) {
-      throw new IllegalArgumentException(
-          String.format("[%s](%s) doesn't match type [%s]",
-              fieldValue, fieldValue.getClass(), fieldType)
-      );
-    }
-  }
-
-  public Object getFieldValue(String fieldName) {
-    return getFieldValue(dataType.getFieldsName().indexOf(fieldName));
-  }
-
-  public byte getByte(String fieldName) {
-    return (Byte) getFieldValue(fieldName);
-  }
-
-  public short getShort(String fieldName) {
-    return (Short) getFieldValue(fieldName);
-  }
-
-  public int getInteger(String fieldName) {
-    return (Integer) getFieldValue(fieldName);
-  }
-
-  public float getFloat(String fieldName) {
-    return (Float) getFieldValue(fieldName);
-  }
-
-  public double getDouble(String fieldName) {
-    return (Double) getFieldValue(fieldName);
-  }
-
-  public long getLong(String fieldName) {
-    return (Long) getFieldValue(fieldName);
-  }
-
-  public String getString(String fieldName) {
-    return (String) getFieldValue(fieldName);
-  }
-
-  public Date getDate(String fieldName) {
-    return (Date) getFieldValue(fieldName);
-  }
-
-  public GregorianCalendar getGregorianCalendar(String fieldName) {
-    return (GregorianCalendar) getFieldValue(fieldName);
-  }
-
-  public BigDecimal getBigDecimal(String fieldName) {
-    return (BigDecimal) getFieldValue(fieldName);
-  }
-
-  public boolean getBoolean(String fieldName) {
-    return (boolean) getFieldValue(fieldName);
-  }
-
-  public Object getFieldValue(int fieldIdx) {
-    if (nullFields.contains(fieldIdx)) {
-      return null;
-    }
-
-    return dataValues.get(fieldIdx);
-  }
-
-  public byte getByte(int idx) {
-    return (Byte) getFieldValue(idx);
+    super(dataType, dataValues);
   }
 
-  public short getShort(int idx) {
-    return (Short) getFieldValue(idx);
-  }
-
-  public int getInteger(int idx) {
-    return (Integer) getFieldValue(idx);
-  }
-
-  public float getFloat(int idx) {
-    return (Float) getFieldValue(idx);
-  }
-
-  public double getDouble(int idx) {
-    return (Double) getFieldValue(idx);
-  }
-
-  public long getLong(int idx) {
-    return (Long) getFieldValue(idx);
-  }
-
-  public String getString(int idx) {
-    return (String) getFieldValue(idx);
-  }
-
-  public Date getDate(int idx) {
-    return (Date) getFieldValue(idx);
-  }
-
-  public GregorianCalendar getGregorianCalendar(int idx) {
-    return (GregorianCalendar) getFieldValue(idx);
-  }
-
-  public BigDecimal getBigDecimal(int idx) {
-    return (BigDecimal) getFieldValue(idx);
-  }
-
-  public boolean getBoolean(int idx) {
-    return (boolean) getFieldValue(idx);
-  }
-
-  public int size() {
-    return dataValues.size();
-  }
-
-  public List<Object> getDataValues() {
-    return dataValues;
-  }
-
-  public void setDataValues(List<Object> dataValues) {
-    this.dataValues = dataValues;
-  }
-
-  public BeamSqlRowType getDataType() {
-    return dataType;
-  }
-
-  public void setDataType(BeamSqlRowType dataType) {
-    this.dataType = dataType;
-  }
-
-  public void setNullFields(List<Integer> nullFields) {
-    this.nullFields = nullFields;
-  }
-
-  public List<Integer> getNullFields() {
-    return nullFields;
-  }
-
-  /**
-   * is the specified field NULL?
-   */
-  public boolean isNull(int idx) {
-    return nullFields.contains(idx);
-  }
-
-  public Instant getWindowStart() {
-    return windowStart;
-  }
-
-  public Instant getWindowEnd() {
-    return windowEnd;
-  }
-
-  public void setWindowStart(Instant windowStart) {
-    this.windowStart = windowStart;
-  }
-
-  public void setWindowEnd(Instant windowEnd) {
-    this.windowEnd = windowEnd;
-  }
-
-  @Override
-  public String toString() {
-    return "BeamSqlRow [nullFields=" + nullFields + ", dataValues=" + 
dataValues + ", dataType="
-        + dataType + ", windowStart=" + windowStart + ", windowEnd=" + 
windowEnd + "]";
-  }
-
-  /**
-   * Return data fields as key=value.
-   */
-  public String valueInString() {
-    StringBuilder sb = new StringBuilder();
-    for (int idx = 0; idx < size(); ++idx) {
-      sb.append(String.format(",%s=%s", dataType.getFieldsName().get(idx), 
getFieldValue(idx)));
-    }
-    return sb.substring(1);
+  public BeamSqlRow(BeamSqlRowType dataType) {
+    super(dataType);
   }
 
   @Override
-  public boolean equals(Object obj) {
-    if (this == obj) {
-      return true;
-    }
-    if (obj == null) {
-      return false;
-    }
-    if (getClass() != obj.getClass()) {
-      return false;
-    }
-    BeamSqlRow other = (BeamSqlRow) obj;
-    return toString().equals(other.toString());
-  }
-
-  @Override public int hashCode() {
-    return 31 * (31 * dataType.hashCode() + dataValues.hashCode()) + 
nullFields.hashCode();
+  public BeamSqlRowType getDataType() {
+    return (BeamSqlRowType) super.getDataType();
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/52933a64/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRowCoder.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRowCoder.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRowCoder.java
index bf097d4..3d760c4 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRowCoder.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRowCoder.java
@@ -20,6 +20,7 @@ package org.apache.beam.sdk.extensions.sql.schema;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.sql.Types;
 import java.util.Date;
 import java.util.GregorianCalendar;
 import java.util.List;
@@ -34,13 +35,12 @@ import org.apache.beam.sdk.coders.DoubleCoder;
 import org.apache.beam.sdk.coders.InstantCoder;
 import org.apache.beam.sdk.coders.ListCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
 
 /**
  *  A {@link Coder} encodes {@link BeamSqlRow}.
  */
 public class BeamSqlRowCoder extends CustomCoder<BeamSqlRow> {
-  private BeamSqlRowType tableSchema;
+  private BeamSqlRowType sqlRecordType;
 
   private static final ListCoder<Integer> listCoder = 
ListCoder.of(BigEndianIntegerCoder.of());
 
@@ -52,58 +52,59 @@ public class BeamSqlRowCoder extends 
CustomCoder<BeamSqlRow> {
   private static final BigDecimalCoder bigDecimalCoder = BigDecimalCoder.of();
   private static final ByteCoder byteCoder = ByteCoder.of();
 
-  public BeamSqlRowCoder(BeamSqlRowType tableSchema) {
-    this.tableSchema = tableSchema;
+  public BeamSqlRowCoder(BeamSqlRowType sqlRecordType) {
+    this.sqlRecordType = sqlRecordType;
   }
 
   @Override
-  public void encode(BeamSqlRow value, OutputStream outStream) throws 
CoderException, IOException {
+  public void encode(BeamSqlRow value, OutputStream outStream)
+      throws CoderException, IOException {
     listCoder.encode(value.getNullFields(), outStream);
     for (int idx = 0; idx < value.size(); ++idx) {
       if (value.getNullFields().contains(idx)) {
         continue;
       }
 
-      switch (CalciteUtils.getFieldType(value.getDataType(), idx)) {
-        case INTEGER:
+      switch (sqlRecordType.getFieldsType().get(idx)) {
+        case Types.INTEGER:
           intCoder.encode(value.getInteger(idx), outStream);
           break;
-        case SMALLINT:
+        case Types.SMALLINT:
           intCoder.encode((int) value.getShort(idx), outStream);
           break;
-        case TINYINT:
+        case Types.TINYINT:
           byteCoder.encode(value.getByte(idx), outStream);
           break;
-        case DOUBLE:
+        case Types.DOUBLE:
           doubleCoder.encode(value.getDouble(idx), outStream);
           break;
-        case FLOAT:
+        case Types.FLOAT:
           doubleCoder.encode((double) value.getFloat(idx), outStream);
           break;
-        case DECIMAL:
+        case Types.DECIMAL:
           bigDecimalCoder.encode(value.getBigDecimal(idx), outStream);
           break;
-        case BIGINT:
+        case Types.BIGINT:
           longCoder.encode(value.getLong(idx), outStream);
           break;
-        case VARCHAR:
-        case CHAR:
+        case Types.VARCHAR:
+        case Types.CHAR:
           stringCoder.encode(value.getString(idx), outStream);
           break;
-        case TIME:
+        case Types.TIME:
           
longCoder.encode(value.getGregorianCalendar(idx).getTime().getTime(), 
outStream);
           break;
-        case DATE:
-        case TIMESTAMP:
+        case Types.DATE:
+        case Types.TIMESTAMP:
           longCoder.encode(value.getDate(idx).getTime(), outStream);
           break;
-        case BOOLEAN:
+        case Types.BOOLEAN:
           byteCoder.encode((byte) (value.getBoolean(idx) ? 1 : 0), outStream);
           break;
 
         default:
           throw new UnsupportedOperationException(
-              "Data type: " + value.getDataType().getFieldsType().get(idx) + " 
not supported yet!");
+              "Data type: " + sqlRecordType.getFieldsType().get(idx) + " not 
supported yet!");
       }
     }
 
@@ -115,55 +116,55 @@ public class BeamSqlRowCoder extends CustomCoder<BeamSqlRow> {
   public BeamSqlRow decode(InputStream inStream) throws CoderException, IOException {
     List<Integer> nullFields = listCoder.decode(inStream);
 
-    BeamSqlRow record = new BeamSqlRow(tableSchema);
+    BeamSqlRow record = new BeamSqlRow(sqlRecordType);
     record.setNullFields(nullFields);
-    for (int idx = 0; idx < tableSchema.size(); ++idx) {
+    for (int idx = 0; idx < sqlRecordType.size(); ++idx) {
       if (nullFields.contains(idx)) {
         continue;
       }
 
-      switch (CalciteUtils.getFieldType(tableSchema, idx)) {
-        case INTEGER:
+      switch (sqlRecordType.getFieldsType().get(idx)) {
+        case Types.INTEGER:
           record.addField(idx, intCoder.decode(inStream));
           break;
-        case SMALLINT:
+        case Types.SMALLINT:
           record.addField(idx, intCoder.decode(inStream).shortValue());
           break;
-        case TINYINT:
+        case Types.TINYINT:
           record.addField(idx, byteCoder.decode(inStream));
           break;
-        case DOUBLE:
+        case Types.DOUBLE:
           record.addField(idx, doubleCoder.decode(inStream));
           break;
-        case FLOAT:
+        case Types.FLOAT:
           record.addField(idx, doubleCoder.decode(inStream).floatValue());
           break;
-        case BIGINT:
+        case Types.BIGINT:
           record.addField(idx, longCoder.decode(inStream));
           break;
-        case DECIMAL:
+        case Types.DECIMAL:
           record.addField(idx, bigDecimalCoder.decode(inStream));
           break;
-        case VARCHAR:
-        case CHAR:
+        case Types.VARCHAR:
+        case Types.CHAR:
           record.addField(idx, stringCoder.decode(inStream));
           break;
-        case TIME:
+        case Types.TIME:
           GregorianCalendar calendar = new GregorianCalendar();
           calendar.setTime(new Date(longCoder.decode(inStream)));
           record.addField(idx, calendar);
           break;
-        case DATE:
-        case TIMESTAMP:
+        case Types.DATE:
+        case Types.TIMESTAMP:
           record.addField(idx, new Date(longCoder.decode(inStream)));
           break;
-        case BOOLEAN:
+        case Types.BOOLEAN:
           record.addField(idx, byteCoder.decode(inStream) == 1);
           break;
 
         default:
           throw new UnsupportedOperationException("Data type: "
-              + CalciteUtils.toCalciteType(tableSchema.getFieldsType().get(idx))
+              + sqlRecordType.getFieldsType().get(idx)
               + " not supported yet!");
       }
     }
@@ -174,8 +175,8 @@ public class BeamSqlRowCoder extends CustomCoder<BeamSqlRow> {
     return record;
   }
 
-  public BeamSqlRowType getTableSchema() {
-    return tableSchema;
+  public BeamSqlRowType getSqlRecordType() {
+    return sqlRecordType;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/52933a64/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRowType.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRowType.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRowType.java
index 018fe81..7584dad 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRowType.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRowType.java
@@ -17,24 +17,93 @@
  */
 package org.apache.beam.sdk.extensions.sql.schema;
 
-import com.google.auto.value.AutoValue;
-import java.io.Serializable;
+import java.math.BigDecimal;
+import java.sql.Types;
+import java.util.Date;
+import java.util.GregorianCalendar;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.values.BeamRecordTypeProvider;
 
 /**
- * Field type information in {@link BeamSqlRow}.
+ * Type provider for {@link BeamSqlRow} with SQL types.
+ *
+ * <p>Limited SQL types are supported now, visit
+ * <a href="https://beam.apache.org/blog/2017/07/21/sql-dsl.html#data-type">data types</a>
+ * for more details.
  *
  */
-@AutoValue
-public abstract class BeamSqlRowType implements Serializable {
-  public abstract List<String> getFieldsName();
-  public abstract List<Integer> getFieldsType();
+public class BeamSqlRowType extends BeamRecordTypeProvider {
+  private static final Map<Integer, Class> SQL_TYPE_TO_JAVA_CLASS = new HashMap<>();
+  static {
+    SQL_TYPE_TO_JAVA_CLASS.put(Types.TINYINT, Byte.class);
+    SQL_TYPE_TO_JAVA_CLASS.put(Types.SMALLINT, Short.class);
+    SQL_TYPE_TO_JAVA_CLASS.put(Types.INTEGER, Integer.class);
+    SQL_TYPE_TO_JAVA_CLASS.put(Types.BIGINT, Long.class);
+    SQL_TYPE_TO_JAVA_CLASS.put(Types.FLOAT, Float.class);
+    SQL_TYPE_TO_JAVA_CLASS.put(Types.DOUBLE, Double.class);
+    SQL_TYPE_TO_JAVA_CLASS.put(Types.DECIMAL, BigDecimal.class);
+
+    SQL_TYPE_TO_JAVA_CLASS.put(Types.BOOLEAN, Boolean.class);
+
+    SQL_TYPE_TO_JAVA_CLASS.put(Types.CHAR, String.class);
+    SQL_TYPE_TO_JAVA_CLASS.put(Types.VARCHAR, String.class);
+
+    SQL_TYPE_TO_JAVA_CLASS.put(Types.TIME, GregorianCalendar.class);
+
+    SQL_TYPE_TO_JAVA_CLASS.put(Types.DATE, Date.class);
+    SQL_TYPE_TO_JAVA_CLASS.put(Types.TIMESTAMP, Date.class);
+  }
+
+  public List<Integer> fieldsType;
+
+  protected BeamSqlRowType(List<String> fieldsName) {
+    super(fieldsName);
+  }
+
+  public BeamSqlRowType(List<String> fieldsName, List<Integer> fieldsType) {
+    super(fieldsName);
+    this.fieldsType = fieldsType;
+  }
+
+  public static BeamSqlRowType create(List<String> fieldNames,
+      List<Integer> fieldTypes) {
+    return new BeamSqlRowType(fieldNames, fieldTypes);
+  }
+
+  @Override
+  public void validateValueType(int index, Object fieldValue) throws IllegalArgumentException {
+    int fieldType = fieldsType.get(index);
+    Class javaClazz = SQL_TYPE_TO_JAVA_CLASS.get(fieldType);
+    if (javaClazz == null) {
+      throw new IllegalArgumentException("Data type: " + fieldType + " not supported yet!");
+    }
+
+    if (!fieldValue.getClass().equals(javaClazz)) {
+      throw new IllegalArgumentException(
+          String.format("[%s](%s) doesn't match type [%s]",
+              fieldValue, fieldValue.getClass(), fieldType)
+      );
+    }
+  }
+
+  public List<Integer> getFieldsType() {
+    return fieldsType;
+  }
 
-  public static BeamSqlRowType create(List<String> fieldNames, List<Integer> fieldTypes) {
-    return new AutoValue_BeamSqlRowType(fieldNames, fieldTypes);
+  @Override
+  public boolean equals(Object obj) {
+    if (obj != null && obj instanceof BeamSqlRowType) {
+      BeamSqlRowType ins = (BeamSqlRowType) obj;
+      return fieldsType.equals(ins.getFieldsType()) && getFieldsName().equals(ins.getFieldsName());
+    } else {
+      return false;
+    }
   }
 
-  public int size() {
-    return getFieldsName().size();
+  @Override
+  public int hashCode() {
+    return 31 * getFieldsName().hashCode() + getFieldsType().hashCode();
   }
 }

Reply via email to