[BEAM-2621] BeamSqlRecordType -> BeamSqlRowType

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a9c8a8a1
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a9c8a8a1
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a9c8a8a1

Branch: refs/heads/DSL_SQL
Commit: a9c8a8a1e1f21d31c973e12b3c05c118e21fa43b
Parents: a1f7cf6
Author: James Xu <[email protected]>
Authored: Mon Jul 17 17:55:56 2017 +0800
Committer: JingsongLi <[email protected]>
Committed: Wed Jul 19 10:05:17 2017 +0800

----------------------------------------------------------------------
 .../org/apache/beam/dsls/sql/BeamSqlEnv.java    | 12 +++---
 .../beam/dsls/sql/example/BeamSqlExample.java   |  4 +-
 .../interpreter/BeamSqlExpressionExecutor.java  |  2 +-
 .../dsls/sql/interpreter/BeamSqlFnExecutor.java |  4 +-
 .../operator/BeamSqlCaseExpression.java         |  8 ++--
 .../operator/BeamSqlCastExpression.java         | 22 +++++------
 .../operator/BeamSqlCompareExpression.java      |  6 +--
 .../interpreter/operator/BeamSqlExpression.java |  2 +-
 .../operator/BeamSqlInputRefExpression.java     |  4 +-
 .../operator/BeamSqlIsNotNullExpression.java    |  4 +-
 .../operator/BeamSqlIsNullExpression.java       |  4 +-
 .../interpreter/operator/BeamSqlPrimitive.java  |  2 +-
 .../operator/BeamSqlReinterpretExpression.java  |  6 +--
 .../operator/BeamSqlUdfExpression.java          |  4 +-
 .../operator/BeamSqlWindowEndExpression.java    |  4 +-
 .../operator/BeamSqlWindowExpression.java       |  4 +-
 .../operator/BeamSqlWindowStartExpression.java  |  4 +-
 .../arithmetic/BeamSqlArithmeticExpression.java |  6 +--
 .../date/BeamSqlCurrentDateExpression.java      |  2 +-
 .../date/BeamSqlCurrentTimeExpression.java      |  2 +-
 .../date/BeamSqlCurrentTimestampExpression.java |  2 +-
 .../date/BeamSqlDateCeilExpression.java         |  4 +-
 .../date/BeamSqlDateFloorExpression.java        |  4 +-
 .../operator/date/BeamSqlExtractExpression.java |  4 +-
 .../operator/logical/BeamSqlAndExpression.java  |  4 +-
 .../operator/logical/BeamSqlNotExpression.java  |  4 +-
 .../operator/logical/BeamSqlOrExpression.java   |  4 +-
 .../math/BeamSqlMathBinaryExpression.java       |  4 +-
 .../math/BeamSqlMathUnaryExpression.java        |  4 +-
 .../operator/math/BeamSqlPiExpression.java      |  2 +-
 .../string/BeamSqlCharLengthExpression.java     |  4 +-
 .../string/BeamSqlConcatExpression.java         |  6 +--
 .../string/BeamSqlInitCapExpression.java        |  4 +-
 .../operator/string/BeamSqlLowerExpression.java |  4 +-
 .../string/BeamSqlOverlayExpression.java        | 10 ++---
 .../string/BeamSqlPositionExpression.java       |  8 ++--
 .../string/BeamSqlSubstringExpression.java      |  8 ++--
 .../operator/string/BeamSqlTrimExpression.java  | 10 ++---
 .../operator/string/BeamSqlUpperExpression.java |  4 +-
 .../beam/dsls/sql/rel/BeamAggregationRel.java   | 22 +++++------
 .../apache/beam/dsls/sql/rel/BeamFilterRel.java |  2 +-
 .../beam/dsls/sql/rel/BeamIOSourceRel.java      |  2 +-
 .../apache/beam/dsls/sql/rel/BeamJoinRel.java   | 12 +++---
 .../beam/dsls/sql/rel/BeamProjectRel.java       |  4 +-
 .../apache/beam/dsls/sql/rel/BeamSortRel.java   |  2 +-
 .../apache/beam/dsls/sql/rel/BeamValuesRel.java |  9 ++---
 .../beam/dsls/sql/schema/BaseBeamTable.java     | 10 ++---
 .../dsls/sql/schema/BeamPCollectionTable.java   |  8 ++--
 .../beam/dsls/sql/schema/BeamSqlRecordType.java | 40 --------------------
 .../apache/beam/dsls/sql/schema/BeamSqlRow.java | 10 ++---
 .../beam/dsls/sql/schema/BeamSqlRowCoder.java   |  6 +--
 .../beam/dsls/sql/schema/BeamSqlRowType.java    | 40 ++++++++++++++++++++
 .../beam/dsls/sql/schema/BeamSqlTable.java      |  2 +-
 .../beam/dsls/sql/schema/BeamTableUtils.java    | 10 ++---
 .../sql/schema/kafka/BeamKafkaCSVTable.java     | 29 +++++++-------
 .../dsls/sql/schema/kafka/BeamKafkaTable.java   | 11 +++---
 .../dsls/sql/schema/text/BeamTextCSVTable.java  | 14 +++----
 .../schema/text/BeamTextCSVTableIOReader.java   | 11 +++---
 .../schema/text/BeamTextCSVTableIOWriter.java   |  9 ++---
 .../dsls/sql/schema/text/BeamTextTable.java     |  6 +--
 .../transform/BeamAggregationTransforms.java    | 26 ++++++-------
 .../dsls/sql/transform/BeamJoinTransforms.java  |  6 +--
 .../dsls/sql/transform/BeamSqlProjectFn.java    | 16 ++++----
 .../beam/dsls/sql/utils/CalciteUtils.java       | 12 +++---
 .../dsls/sql/BeamSqlDslAggregationTest.java     | 14 +++----
 .../apache/beam/dsls/sql/BeamSqlDslBase.java    | 26 ++++++-------
 .../beam/dsls/sql/BeamSqlDslJoinTest.java       | 10 ++---
 .../beam/dsls/sql/BeamSqlDslProjectTest.java    | 10 ++---
 .../beam/dsls/sql/BeamSqlDslUdfUdafTest.java    |  6 +--
 .../org/apache/beam/dsls/sql/TestUtils.java     | 28 +++++++-------
 ...mSqlBuiltinFunctionsIntegrationTestBase.java |  6 +--
 .../interpreter/BeamSqlFnExecutorTestBase.java  |  8 ++--
 .../beam/dsls/sql/mock/MockedBoundedTable.java  | 14 +++----
 .../apache/beam/dsls/sql/mock/MockedTable.java  |  6 +--
 .../dsls/sql/mock/MockedUnboundedTable.java     | 14 +++----
 .../dsls/sql/schema/BeamSqlRowCoderTest.java    |  6 +--
 .../sql/schema/kafka/BeamKafkaCSVTableTest.java |  7 ++--
 .../sql/schema/text/BeamTextCSVTableTest.java   | 17 ++++-----
 .../transform/BeamAggregationTransformTest.java | 13 +++----
 .../schema/transform/BeamTransformBaseTest.java | 12 +++---
 80 files changed, 354 insertions(+), 362 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSqlEnv.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSqlEnv.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSqlEnv.java
index e8c8c97..0e1ac98 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSqlEnv.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSqlEnv.java
@@ -21,7 +21,7 @@ import java.io.Serializable;
 
 import org.apache.beam.dsls.sql.planner.BeamQueryPlanner;
 import org.apache.beam.dsls.sql.schema.BaseBeamTable;
-import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
+import org.apache.beam.dsls.sql.schema.BeamSqlRowType;
 import org.apache.beam.dsls.sql.schema.BeamSqlUdaf;
 import org.apache.beam.dsls.sql.schema.BeamSqlUdf;
 import org.apache.beam.dsls.sql.utils.CalciteUtils;
@@ -73,7 +73,7 @@ public class BeamSqlEnv implements Serializable{
    *
    */
   public void registerTable(String tableName, BaseBeamTable table) {
-    schema.add(tableName, new BeamCalciteTable(table.getRecordType()));
+    schema.add(tableName, new BeamCalciteTable(table.getRowType()));
     planner.getSourceTables().put(tableName, table);
   }
 
@@ -85,13 +85,13 @@ public class BeamSqlEnv implements Serializable{
   }
 
   private static class BeamCalciteTable implements ScannableTable, Serializable {
-    private BeamSqlRecordType beamSqlRecordType;
-    public BeamCalciteTable(BeamSqlRecordType beamSqlRecordType) {
-      this.beamSqlRecordType = beamSqlRecordType;
+    private BeamSqlRowType beamSqlRowType;
+    public BeamCalciteTable(BeamSqlRowType beamSqlRowType) {
+      this.beamSqlRowType = beamSqlRowType;
     }
     @Override
     public RelDataType getRowType(RelDataTypeFactory typeFactory) {
-      return CalciteUtils.toCalciteRecordType(this.beamSqlRecordType)
+      return CalciteUtils.toCalciteRowType(this.beamSqlRowType)
           .apply(BeamQueryPlanner.TYPE_FACTORY);
     }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/example/BeamSqlExample.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/example/BeamSqlExample.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/example/BeamSqlExample.java
index 04fe451..91df2be 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/example/BeamSqlExample.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/example/BeamSqlExample.java
@@ -21,9 +21,9 @@ import java.sql.Types;
 import java.util.Arrays;
 import java.util.List;
 import org.apache.beam.dsls.sql.BeamSql;
-import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
 import org.apache.beam.dsls.sql.schema.BeamSqlRow;
 import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder;
+import org.apache.beam.dsls.sql.schema.BeamSqlRowType;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
@@ -51,7 +51,7 @@ class BeamSqlExample {
     //define the input row format
     List<String> fieldNames = Arrays.asList("c1", "c2", "c3");
     List<Integer> fieldTypes = Arrays.asList(Types.INTEGER, Types.VARCHAR, Types.DOUBLE);
-    BeamSqlRecordType type = BeamSqlRecordType.create(fieldNames, fieldTypes);
+    BeamSqlRowType type = BeamSqlRowType.create(fieldNames, fieldTypes);
     BeamSqlRow row = new BeamSqlRow(type);
     row.addField(0, 1);
     row.addField(1, "row");

http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSqlExpressionExecutor.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSqlExpressionExecutor.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSqlExpressionExecutor.java
index a314bf4..3732933 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSqlExpressionExecutor.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSqlExpressionExecutor.java
@@ -37,7 +37,7 @@ public interface BeamSqlExpressionExecutor extends Serializable {
    * apply transformation to input record {@link BeamSqlRow}.
    *
    */
-  List<Object> execute(BeamSqlRow inputRecord);
+  List<Object> execute(BeamSqlRow inputRow);
 
   void close();
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSqlFnExecutor.java
----------------------------------------------------------------------
diff --git 
a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSqlFnExecutor.java
 
b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSqlFnExecutor.java
index 64bc880..0be918d 100644
--- 
a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSqlFnExecutor.java
+++ 
b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSqlFnExecutor.java
@@ -427,10 +427,10 @@ public class BeamSqlFnExecutor implements BeamSqlExpressionExecutor {
   }
 
   @Override
-  public List<Object> execute(BeamSqlRow inputRecord) {
+  public List<Object> execute(BeamSqlRow inputRow) {
     List<Object> results = new ArrayList<>();
     for (BeamSqlExpression exp : exps) {
-      results.add(exp.evaluate(inputRecord).getValue());
+      results.add(exp.evaluate(inputRow).getValue());
     }
     return results;
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCaseExpression.java
----------------------------------------------------------------------
diff --git 
a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCaseExpression.java
 
b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCaseExpression.java
index a15c42e..a30916b 100644
--- 
a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCaseExpression.java
+++ 
b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCaseExpression.java
@@ -49,16 +49,16 @@ public class BeamSqlCaseExpression extends 
BeamSqlExpression {
     return true;
   }
 
-  @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRecord) {
+  @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
     for (int i = 0; i < operands.size() - 1; i += 2) {
-      if (opValueEvaluated(i, inputRecord)) {
+      if (opValueEvaluated(i, inputRow)) {
         return BeamSqlPrimitive.of(
             outputType,
-            opValueEvaluated(i + 1, inputRecord)
+            opValueEvaluated(i + 1, inputRow)
         );
       }
     }
     return BeamSqlPrimitive.of(outputType,
-        opValueEvaluated(operands.size() - 1, inputRecord));
+        opValueEvaluated(operands.size() - 1, inputRow));
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCastExpression.java
----------------------------------------------------------------------
diff --git 
a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCastExpression.java
 
b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCastExpression.java
index 7e8ab03..524d1df 100644
--- 
a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCastExpression.java
+++ 
b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCastExpression.java
@@ -72,40 +72,40 @@ public class BeamSqlCastExpression extends BeamSqlExpression {
   }
 
   @Override
-  public BeamSqlPrimitive evaluate(BeamSqlRow inputRecord) {
+  public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
     SqlTypeName castOutputType = getOutputType();
     switch (castOutputType) {
       case INTEGER:
         return BeamSqlPrimitive
-            .of(SqlTypeName.INTEGER, SqlFunctions.toInt(opValueEvaluated(index, inputRecord)));
+            .of(SqlTypeName.INTEGER, SqlFunctions.toInt(opValueEvaluated(index, inputRow)));
       case DOUBLE:
         return BeamSqlPrimitive
-            .of(SqlTypeName.DOUBLE, SqlFunctions.toDouble(opValueEvaluated(index, inputRecord)));
+            .of(SqlTypeName.DOUBLE, SqlFunctions.toDouble(opValueEvaluated(index, inputRow)));
       case SMALLINT:
         return BeamSqlPrimitive
-            .of(SqlTypeName.SMALLINT, SqlFunctions.toShort(opValueEvaluated(index, inputRecord)));
+            .of(SqlTypeName.SMALLINT, SqlFunctions.toShort(opValueEvaluated(index, inputRow)));
       case TINYINT:
         return BeamSqlPrimitive
-            .of(SqlTypeName.TINYINT, SqlFunctions.toByte(opValueEvaluated(index, inputRecord)));
+            .of(SqlTypeName.TINYINT, SqlFunctions.toByte(opValueEvaluated(index, inputRow)));
       case BIGINT:
         return BeamSqlPrimitive
-            .of(SqlTypeName.BIGINT, SqlFunctions.toLong(opValueEvaluated(index, inputRecord)));
+            .of(SqlTypeName.BIGINT, SqlFunctions.toLong(opValueEvaluated(index, inputRow)));
       case DECIMAL:
         return BeamSqlPrimitive.of(SqlTypeName.DECIMAL,
-            SqlFunctions.toBigDecimal(opValueEvaluated(index, inputRecord)));
+            SqlFunctions.toBigDecimal(opValueEvaluated(index, inputRow)));
       case FLOAT:
         return BeamSqlPrimitive
-            .of(SqlTypeName.FLOAT, SqlFunctions.toFloat(opValueEvaluated(index, inputRecord)));
+            .of(SqlTypeName.FLOAT, SqlFunctions.toFloat(opValueEvaluated(index, inputRow)));
       case CHAR:
       case VARCHAR:
         return BeamSqlPrimitive
-            .of(SqlTypeName.VARCHAR, opValueEvaluated(index, inputRecord).toString());
+            .of(SqlTypeName.VARCHAR, opValueEvaluated(index, inputRow).toString());
       case DATE:
         return BeamSqlPrimitive
-            .of(SqlTypeName.DATE, toDate(opValueEvaluated(index, inputRecord), outputDateFormat));
+            .of(SqlTypeName.DATE, toDate(opValueEvaluated(index, inputRow), outputDateFormat));
       case TIMESTAMP:
         return BeamSqlPrimitive.of(SqlTypeName.TIMESTAMP,
-            toTimeStamp(opValueEvaluated(index, inputRecord), outputTimestampFormat));
+            toTimeStamp(opValueEvaluated(index, inputRow), outputTimestampFormat));
     }
     throw new UnsupportedOperationException(
         String.format("Cast to type %s not supported", castOutputType));

http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCompareExpression.java
----------------------------------------------------------------------
diff --git 
a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCompareExpression.java
 
b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCompareExpression.java
index 3d96616..5076ccc 100644
--- 
a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCompareExpression.java
+++ 
b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCompareExpression.java
@@ -48,9 +48,9 @@ public abstract class BeamSqlCompareExpression extends 
BeamSqlExpression {
   }
 
   @Override
-  public BeamSqlPrimitive<Boolean> evaluate(BeamSqlRow inputRecord) {
-    Object leftValue = operands.get(0).evaluate(inputRecord).getValue();
-    Object rightValue = operands.get(1).evaluate(inputRecord).getValue();
+  public BeamSqlPrimitive<Boolean> evaluate(BeamSqlRow inputRow) {
+    Object leftValue = operands.get(0).evaluate(inputRow).getValue();
+    Object rightValue = operands.get(1).evaluate(inputRow).getValue();
     switch (operands.get(0).outputType) {
     case BIGINT:
     case DECIMAL:

http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlExpression.java
----------------------------------------------------------------------
diff --git 
a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlExpression.java
 
b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlExpression.java
index 33feb3e..9d2815c 100644
--- 
a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlExpression.java
+++ 
b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlExpression.java
@@ -62,7 +62,7 @@ public abstract class BeamSqlExpression implements 
Serializable {
    * Apply input record {@link BeamSqlRow} to this expression,
    * the output value is wrapped with {@link BeamSqlPrimitive}.
    */
-  public abstract BeamSqlPrimitive evaluate(BeamSqlRow inputRecord);
+  public abstract BeamSqlPrimitive evaluate(BeamSqlRow inputRow);
 
   public List<BeamSqlExpression> getOperands() {
     return operands;

http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlInputRefExpression.java
----------------------------------------------------------------------
diff --git 
a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlInputRefExpression.java
 
b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlInputRefExpression.java
index b6d2b0b..710460b 100644
--- 
a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlInputRefExpression.java
+++ 
b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlInputRefExpression.java
@@ -37,7 +37,7 @@ public class BeamSqlInputRefExpression extends 
BeamSqlExpression {
   }
 
   @Override
-  public BeamSqlPrimitive evaluate(BeamSqlRow inputRecord) {
-    return BeamSqlPrimitive.of(outputType, inputRecord.getFieldValue(inputRef));
+  public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
+    return BeamSqlPrimitive.of(outputType, inputRow.getFieldValue(inputRef));
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlIsNotNullExpression.java
----------------------------------------------------------------------
diff --git 
a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlIsNotNullExpression.java
 
b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlIsNotNullExpression.java
index e08e737..23d9c83 100644
--- 
a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlIsNotNullExpression.java
+++ 
b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlIsNotNullExpression.java
@@ -44,8 +44,8 @@ public class BeamSqlIsNotNullExpression extends 
BeamSqlExpression {
   }
 
   @Override
-  public BeamSqlPrimitive<Boolean> evaluate(BeamSqlRow inputRecord) {
-    Object leftValue = operands.get(0).evaluate(inputRecord).getValue();
+  public BeamSqlPrimitive<Boolean> evaluate(BeamSqlRow inputRow) {
+    Object leftValue = operands.get(0).evaluate(inputRow).getValue();
     return BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, leftValue != null);
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlIsNullExpression.java
----------------------------------------------------------------------
diff --git 
a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlIsNullExpression.java
 
b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlIsNullExpression.java
index d4e070d..4d3fd45 100644
--- 
a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlIsNullExpression.java
+++ 
b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlIsNullExpression.java
@@ -44,8 +44,8 @@ public class BeamSqlIsNullExpression extends 
BeamSqlExpression {
   }
 
   @Override
-  public BeamSqlPrimitive<Boolean> evaluate(BeamSqlRow inputRecord) {
-    Object leftValue = operands.get(0).evaluate(inputRecord).getValue();
+  public BeamSqlPrimitive<Boolean> evaluate(BeamSqlRow inputRow) {
+    Object leftValue = operands.get(0).evaluate(inputRow).getValue();
     return BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, leftValue == null);
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlPrimitive.java
----------------------------------------------------------------------
diff --git 
a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlPrimitive.java
 
b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlPrimitive.java
index c5c80b9..51724bb 100644
--- 
a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlPrimitive.java
+++ 
b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlPrimitive.java
@@ -145,7 +145,7 @@ public class BeamSqlPrimitive<T> extends BeamSqlExpression {
   }
 
   @Override
-  public BeamSqlPrimitive<T> evaluate(BeamSqlRow inputRecord) {
+  public BeamSqlPrimitive<T> evaluate(BeamSqlRow inputRow) {
     return this;
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlReinterpretExpression.java
----------------------------------------------------------------------
diff --git 
a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlReinterpretExpression.java
 
b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlReinterpretExpression.java
index 783466c..efdb2df 100644
--- 
a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlReinterpretExpression.java
+++ 
b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlReinterpretExpression.java
@@ -42,13 +42,13 @@ public class BeamSqlReinterpretExpression extends 
BeamSqlExpression {
         && SqlTypeName.DATETIME_TYPES.contains(opType(0));
   }
 
-  @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRecord) {
+  @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
     if (opType(0) == SqlTypeName.TIME) {
-      GregorianCalendar date = opValueEvaluated(0, inputRecord);
+      GregorianCalendar date = opValueEvaluated(0, inputRow);
       return BeamSqlPrimitive.of(outputType, date.getTimeInMillis());
 
     } else {
-      Date date = opValueEvaluated(0, inputRecord);
+      Date date = opValueEvaluated(0, inputRow);
       return BeamSqlPrimitive.of(outputType, date.getTime());
     }
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlUdfExpression.java
----------------------------------------------------------------------
diff --git 
a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlUdfExpression.java
 
b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlUdfExpression.java
index 6f18307..e389ef9 100644
--- 
a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlUdfExpression.java
+++ 
b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlUdfExpression.java
@@ -51,14 +51,14 @@ public class BeamSqlUdfExpression extends BeamSqlExpression 
{
   }
 
   @Override
-  public BeamSqlPrimitive evaluate(BeamSqlRow inputRecord) {
+  public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
     if (method == null) {
       reConstructMethod();
     }
     try {
       List<Object> paras = new ArrayList<>();
       for (BeamSqlExpression e : getOperands()) {
-        paras.add(e.evaluate(inputRecord).getValue());
+        paras.add(e.evaluate(inputRow).getValue());
       }
 
       return BeamSqlPrimitive.of(getOutputType(),

http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlWindowEndExpression.java
----------------------------------------------------------------------
diff --git 
a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlWindowEndExpression.java
 
b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlWindowEndExpression.java
index 8bc090f..ecc6939 100644
--- 
a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlWindowEndExpression.java
+++ 
b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlWindowEndExpression.java
@@ -34,9 +34,9 @@ public class BeamSqlWindowEndExpression extends 
BeamSqlExpression {
   }
 
   @Override
-  public BeamSqlPrimitive<Date> evaluate(BeamSqlRow inputRecord) {
+  public BeamSqlPrimitive<Date> evaluate(BeamSqlRow inputRow) {
     return BeamSqlPrimitive.of(SqlTypeName.TIMESTAMP,
-        new Date(inputRecord.getWindowEnd().getMillis()));
+        new Date(inputRow.getWindowEnd().getMillis()));
   }
 
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlWindowExpression.java
----------------------------------------------------------------------
diff --git 
a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlWindowExpression.java
 
b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlWindowExpression.java
index eb4c03b..71f0672 100644
--- 
a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlWindowExpression.java
+++ 
b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlWindowExpression.java
@@ -42,9 +42,9 @@ public class BeamSqlWindowExpression extends 
BeamSqlExpression {
   }
 
   @Override
-  public BeamSqlPrimitive<Date> evaluate(BeamSqlRow inputRecord) {
+  public BeamSqlPrimitive<Date> evaluate(BeamSqlRow inputRow) {
     return BeamSqlPrimitive.of(SqlTypeName.TIMESTAMP,
-        (Date) operands.get(0).evaluate(inputRecord).getValue());
+        (Date) operands.get(0).evaluate(inputRow).getValue());
   }
 
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlWindowStartExpression.java
----------------------------------------------------------------------
diff --git 
a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlWindowStartExpression.java
 
b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlWindowStartExpression.java
index 1e2c0a2..f3aba2e 100644
--- 
a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlWindowStartExpression.java
+++ 
b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlWindowStartExpression.java
@@ -35,9 +35,9 @@ public class BeamSqlWindowStartExpression extends 
BeamSqlExpression {
   }
 
   @Override
-  public BeamSqlPrimitive<Date> evaluate(BeamSqlRow inputRecord) {
+  public BeamSqlPrimitive<Date> evaluate(BeamSqlRow inputRow) {
     return BeamSqlPrimitive.of(SqlTypeName.TIMESTAMP,
-        new Date(inputRecord.getWindowStart().getMillis()));
+        new Date(inputRow.getWindowStart().getMillis()));
   }
 
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlArithmeticExpression.java
----------------------------------------------------------------------
diff --git 
a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlArithmeticExpression.java
 
b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlArithmeticExpression.java
index eac4c72..d62123c 100644
--- 
a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlArithmeticExpression.java
+++ 
b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlArithmeticExpression.java
@@ -50,11 +50,11 @@ public abstract class BeamSqlArithmeticExpression extends 
BeamSqlExpression {
     super(operands, outputType);
   }
 
-  @Override public BeamSqlPrimitive<? extends Number> evaluate(BeamSqlRow inputRecord) {
+  @Override public BeamSqlPrimitive<? extends Number> evaluate(BeamSqlRow inputRow) {
     BigDecimal left = BigDecimal.valueOf(
-        Double.valueOf(opValueEvaluated(0, inputRecord).toString()));
+        Double.valueOf(opValueEvaluated(0, inputRow).toString()));
     BigDecimal right = BigDecimal.valueOf(
-        Double.valueOf(opValueEvaluated(1, inputRecord).toString()));
+        Double.valueOf(opValueEvaluated(1, inputRow).toString()));
 
     BigDecimal result = calc(left, right);
     return getCorrectlyTypedResult(result);

http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlCurrentDateExpression.java
----------------------------------------------------------------------
diff --git 
a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlCurrentDateExpression.java
 
b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlCurrentDateExpression.java
index 2f83140..c7df5ab 100644
--- 
a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlCurrentDateExpression.java
+++ 
b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlCurrentDateExpression.java
@@ -39,7 +39,7 @@ public class BeamSqlCurrentDateExpression extends 
BeamSqlExpression {
     return getOperands().size() == 0;
   }
 
-  @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRecord) {
+  @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
     return BeamSqlPrimitive.of(outputType, new Date());
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlCurrentTimeExpression.java
----------------------------------------------------------------------
diff --git 
a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlCurrentTimeExpression.java
 
b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlCurrentTimeExpression.java
index c15123a..46e5a43 100644
--- 
a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlCurrentTimeExpression.java
+++ 
b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlCurrentTimeExpression.java
@@ -45,7 +45,7 @@ public class BeamSqlCurrentTimeExpression extends 
BeamSqlExpression {
     return opCount <= 1;
   }
 
-  @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRecord) {
+  @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
     GregorianCalendar ret = new GregorianCalendar(TimeZone.getDefault());
     ret.setTime(new Date());
     return BeamSqlPrimitive.of(outputType, ret);

http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlCurrentTimestampExpression.java
----------------------------------------------------------------------
diff --git 
a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlCurrentTimestampExpression.java
 
b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlCurrentTimestampExpression.java
index 0ea12f1..303846d 100644
--- 
a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlCurrentTimestampExpression.java
+++ 
b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlCurrentTimestampExpression.java
@@ -43,7 +43,7 @@ public class BeamSqlCurrentTimestampExpression extends 
BeamSqlExpression {
     return opCount <= 1;
   }
 
-  @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRecord) {
+  @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
     return BeamSqlPrimitive.of(outputType, new Date());
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlDateCeilExpression.java
----------------------------------------------------------------------
diff --git 
a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlDateCeilExpression.java
 
b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlDateCeilExpression.java
index 68f1aa9..59e3e9c 100644
--- 
a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlDateCeilExpression.java
+++ 
b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlDateCeilExpression.java
@@ -42,8 +42,8 @@ public class BeamSqlDateCeilExpression extends 
BeamSqlExpression {
         && opType(1) == SqlTypeName.SYMBOL;
   }
 
-  @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRecord) {
-    Date date = opValueEvaluated(0, inputRecord);
+  @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
+    Date date = opValueEvaluated(0, inputRow);
     long time = date.getTime();
     TimeUnitRange unit = ((BeamSqlPrimitive<TimeUnitRange>) op(1)).getValue();
 

http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlDateFloorExpression.java
----------------------------------------------------------------------
diff --git 
a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlDateFloorExpression.java
 
b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlDateFloorExpression.java
index 4d446e3..64234f5 100644
--- 
a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlDateFloorExpression.java
+++ 
b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlDateFloorExpression.java
@@ -42,8 +42,8 @@ public class BeamSqlDateFloorExpression extends 
BeamSqlExpression {
         && opType(1) == SqlTypeName.SYMBOL;
   }
 
-  @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRecord) {
-    Date date = opValueEvaluated(0, inputRecord);
+  @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
+    Date date = opValueEvaluated(0, inputRow);
     long time = date.getTime();
     TimeUnitRange unit = ((BeamSqlPrimitive<TimeUnitRange>) op(1)).getValue();
 

http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlExtractExpression.java
----------------------------------------------------------------------
diff --git 
a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlExtractExpression.java
 
b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlExtractExpression.java
index bc8ed0f..d41a249 100644
--- 
a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlExtractExpression.java
+++ 
b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlExtractExpression.java
@@ -61,8 +61,8 @@ public class BeamSqlExtractExpression extends 
BeamSqlExpression {
         && opType(1) == SqlTypeName.BIGINT;
   }
 
-  @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRecord) {
-    Long time = opValueEvaluated(1, inputRecord);
+  @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
+    Long time = opValueEvaluated(1, inputRow);
 
     TimeUnitRange unit = ((BeamSqlPrimitive<TimeUnitRange>) op(0)).getValue();
 

http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/logical/BeamSqlAndExpression.java
----------------------------------------------------------------------
diff --git 
a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/logical/BeamSqlAndExpression.java
 
b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/logical/BeamSqlAndExpression.java
index 5da43f4..5f6abe0 100644
--- 
a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/logical/BeamSqlAndExpression.java
+++ 
b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/logical/BeamSqlAndExpression.java
@@ -33,10 +33,10 @@ public class BeamSqlAndExpression extends 
BeamSqlLogicalExpression {
   }
 
   @Override
-  public BeamSqlPrimitive<Boolean> evaluate(BeamSqlRow inputRecord) {
+  public BeamSqlPrimitive<Boolean> evaluate(BeamSqlRow inputRow) {
     boolean result = true;
     for (BeamSqlExpression exp : operands) {
-      BeamSqlPrimitive<Boolean> expOut = exp.evaluate(inputRecord);
+      BeamSqlPrimitive<Boolean> expOut = exp.evaluate(inputRow);
       result = result && expOut.getValue();
       if (!result) {
         break;

http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/logical/BeamSqlNotExpression.java
----------------------------------------------------------------------
diff --git 
a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/logical/BeamSqlNotExpression.java
 
b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/logical/BeamSqlNotExpression.java
index ffa0184..6df52aa 100644
--- 
a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/logical/BeamSqlNotExpression.java
+++ 
b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/logical/BeamSqlNotExpression.java
@@ -43,8 +43,8 @@ public class BeamSqlNotExpression extends 
BeamSqlLogicalExpression {
     return super.accept();
   }
 
-  @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRecord) {
-    Boolean value = opValueEvaluated(0, inputRecord);
+  @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
+    Boolean value = opValueEvaluated(0, inputRow);
     if (value == null) {
       return BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, null);
     } else {

http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/logical/BeamSqlOrExpression.java
----------------------------------------------------------------------
diff --git 
a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/logical/BeamSqlOrExpression.java
 
b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/logical/BeamSqlOrExpression.java
index 9ca57f0..450638c 100644
--- 
a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/logical/BeamSqlOrExpression.java
+++ 
b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/logical/BeamSqlOrExpression.java
@@ -33,10 +33,10 @@ public class BeamSqlOrExpression extends 
BeamSqlLogicalExpression {
   }
 
   @Override
-  public BeamSqlPrimitive<Boolean> evaluate(BeamSqlRow inputRecord) {
+  public BeamSqlPrimitive<Boolean> evaluate(BeamSqlRow inputRow) {
     boolean result = false;
     for (BeamSqlExpression exp : operands) {
-      BeamSqlPrimitive<Boolean> expOut = exp.evaluate(inputRecord);
+      BeamSqlPrimitive<Boolean> expOut = exp.evaluate(inputRow);
         result = result || expOut.getValue();
         if (result) {
           break;

http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlMathBinaryExpression.java
----------------------------------------------------------------------
diff --git 
a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlMathBinaryExpression.java
 
b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlMathBinaryExpression.java
index f79bcf6..2d444f8 100644
--- 
a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlMathBinaryExpression.java
+++ 
b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlMathBinaryExpression.java
@@ -39,10 +39,10 @@ public abstract class BeamSqlMathBinaryExpression extends 
BeamSqlExpression {
     return numberOfOperands() == 2 && isOperandNumeric(opType(0)) && 
isOperandNumeric(opType(1));
   }
 
-  @Override public BeamSqlPrimitive<? extends Number> evaluate(BeamSqlRow 
inputRecord) {
+  @Override public BeamSqlPrimitive<? extends Number> evaluate(BeamSqlRow 
inputRow) {
     BeamSqlExpression leftOp = op(0);
     BeamSqlExpression rightOp = op(1);
-    return calculate(leftOp.evaluate(inputRecord), 
rightOp.evaluate(inputRecord));
+    return calculate(leftOp.evaluate(inputRow), rightOp.evaluate(inputRow));
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlMathUnaryExpression.java
----------------------------------------------------------------------
diff --git 
a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlMathUnaryExpression.java
 
b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlMathUnaryExpression.java
index a65333c..4733d09 100644
--- 
a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlMathUnaryExpression.java
+++ 
b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlMathUnaryExpression.java
@@ -45,9 +45,9 @@ public abstract class BeamSqlMathUnaryExpression extends 
BeamSqlExpression {
     return acceptance;
   }
 
-  @Override public BeamSqlPrimitive<? extends Number> evaluate(BeamSqlRow 
inputRecord) {
+  @Override public BeamSqlPrimitive<? extends Number> evaluate(BeamSqlRow 
inputRow) {
     BeamSqlExpression operand = op(0);
-    return calculate(operand.evaluate(inputRecord));
+    return calculate(operand.evaluate(inputRow));
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlPiExpression.java
----------------------------------------------------------------------
diff --git 
a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlPiExpression.java
 
b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlPiExpression.java
index 4645951..9db810e 100644
--- 
a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlPiExpression.java
+++ 
b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlPiExpression.java
@@ -36,7 +36,7 @@ public class BeamSqlPiExpression extends BeamSqlExpression {
     return numberOfOperands() == 0;
   }
 
-  @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRecord) {
+  @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
     return BeamSqlPrimitive.of(SqlTypeName.DOUBLE, Math.PI);
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlCharLengthExpression.java
----------------------------------------------------------------------
diff --git 
a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlCharLengthExpression.java
 
b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlCharLengthExpression.java
index 3ed9b80..7c61061 100644
--- 
a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlCharLengthExpression.java
+++ 
b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlCharLengthExpression.java
@@ -33,8 +33,8 @@ public class BeamSqlCharLengthExpression extends 
BeamSqlStringUnaryExpression {
     super(operands, SqlTypeName.INTEGER);
   }
 
-  @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRecord) {
-    String str = opValueEvaluated(0, inputRecord);
+  @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
+    String str = opValueEvaluated(0, inputRow);
     return BeamSqlPrimitive.of(SqlTypeName.INTEGER, str.length());
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlConcatExpression.java
----------------------------------------------------------------------
diff --git 
a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlConcatExpression.java
 
b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlConcatExpression.java
index e8e4e50..93e1f71 100644
--- 
a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlConcatExpression.java
+++ 
b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlConcatExpression.java
@@ -52,9 +52,9 @@ public class BeamSqlConcatExpression extends 
BeamSqlExpression {
     return true;
   }
 
-  @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRecord) {
-    String left = opValueEvaluated(0, inputRecord);
-    String right = opValueEvaluated(1, inputRecord);
+  @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
+    String left = opValueEvaluated(0, inputRow);
+    String right = opValueEvaluated(1, inputRow);
 
     return BeamSqlPrimitive.of(SqlTypeName.VARCHAR,
         new StringBuilder(left.length() + right.length())

http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlInitCapExpression.java
----------------------------------------------------------------------
diff --git 
a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlInitCapExpression.java
 
b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlInitCapExpression.java
index 51dfe28..7726e27 100644
--- 
a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlInitCapExpression.java
+++ 
b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlInitCapExpression.java
@@ -33,8 +33,8 @@ public class BeamSqlInitCapExpression extends 
BeamSqlStringUnaryExpression {
     super(operands, SqlTypeName.VARCHAR);
   }
 
-  @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRecord) {
-    String str = opValueEvaluated(0, inputRecord);
+  @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
+    String str = opValueEvaluated(0, inputRow);
 
     StringBuilder ret = new StringBuilder(str);
     boolean isInit = true;

http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlLowerExpression.java
----------------------------------------------------------------------
diff --git 
a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlLowerExpression.java
 
b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlLowerExpression.java
index f70fb1a..cb198ec 100644
--- 
a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlLowerExpression.java
+++ 
b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlLowerExpression.java
@@ -33,8 +33,8 @@ public class BeamSqlLowerExpression extends 
BeamSqlStringUnaryExpression {
     super(operands, SqlTypeName.VARCHAR);
   }
 
-  @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRecord) {
-    String str = opValueEvaluated(0, inputRecord);
+  @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
+    String str = opValueEvaluated(0, inputRow);
     return BeamSqlPrimitive.of(SqlTypeName.VARCHAR, str.toLowerCase());
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlOverlayExpression.java
----------------------------------------------------------------------
diff --git 
a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlOverlayExpression.java
 
b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlOverlayExpression.java
index 20d9962..cb6a523 100644
--- 
a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlOverlayExpression.java
+++ 
b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlOverlayExpression.java
@@ -55,15 +55,15 @@ public class BeamSqlOverlayExpression extends 
BeamSqlExpression {
     return true;
   }
 
-  @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRecord) {
-    String str = opValueEvaluated(0, inputRecord);
-    String replaceStr = opValueEvaluated(1, inputRecord);
-    int idx = opValueEvaluated(2, inputRecord);
+  @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
+    String str = opValueEvaluated(0, inputRow);
+    String replaceStr = opValueEvaluated(1, inputRow);
+    int idx = opValueEvaluated(2, inputRow);
     // the index is 1 based.
     idx -= 1;
     int length = replaceStr.length();
     if (operands.size() == 4) {
-      length = opValueEvaluated(3, inputRecord);
+      length = opValueEvaluated(3, inputRow);
     }
 
     StringBuilder result = new StringBuilder(

http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlPositionExpression.java
----------------------------------------------------------------------
diff --git 
a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlPositionExpression.java
 
b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlPositionExpression.java
index 1d09b51..144acbf 100644
--- 
a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlPositionExpression.java
+++ 
b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlPositionExpression.java
@@ -57,12 +57,12 @@ public class BeamSqlPositionExpression extends 
BeamSqlExpression {
     return true;
   }
 
-  @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRecord) {
-    String targetStr = opValueEvaluated(0, inputRecord);
-    String containingStr = opValueEvaluated(1, inputRecord);
+  @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
+    String targetStr = opValueEvaluated(0, inputRow);
+    String containingStr = opValueEvaluated(1, inputRow);
     int from = -1;
     if (operands.size() == 3) {
-      Number tmp = opValueEvaluated(2, inputRecord);
+      Number tmp = opValueEvaluated(2, inputRow);
       from = tmp.intValue();
     }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlSubstringExpression.java
----------------------------------------------------------------------
diff --git 
a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlSubstringExpression.java
 
b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlSubstringExpression.java
index d9bbc98..8b33125 100644
--- 
a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlSubstringExpression.java
+++ 
b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlSubstringExpression.java
@@ -55,9 +55,9 @@ public class BeamSqlSubstringExpression extends 
BeamSqlExpression {
     return true;
   }
 
-  @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRecord) {
-    String str = opValueEvaluated(0, inputRecord);
-    int idx = opValueEvaluated(1, inputRecord);
+  @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
+    String str = opValueEvaluated(0, inputRow);
+    int idx = opValueEvaluated(1, inputRow);
     int startIdx = idx;
     if (startIdx > 0) {
       // NOTE: SQL substring is 1 based(rather than 0 based)
@@ -70,7 +70,7 @@ public class BeamSqlSubstringExpression extends 
BeamSqlExpression {
     }
 
     if (operands.size() == 3) {
-      int length = opValueEvaluated(2, inputRecord);
+      int length = opValueEvaluated(2, inputRow);
       if (length < 0) {
         length = 0;
       }

http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlTrimExpression.java
----------------------------------------------------------------------
diff --git 
a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlTrimExpression.java
 
b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlTrimExpression.java
index ac4d060..5e6c2bb 100644
--- 
a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlTrimExpression.java
+++ 
b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlTrimExpression.java
@@ -58,14 +58,14 @@ public class BeamSqlTrimExpression extends 
BeamSqlExpression {
     return true;
   }
 
-  @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRecord) {
+  @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
     if (operands.size() == 1) {
       return BeamSqlPrimitive.of(SqlTypeName.VARCHAR,
-          opValueEvaluated(0, inputRecord).toString().trim());
+          opValueEvaluated(0, inputRow).toString().trim());
     } else {
-      SqlTrimFunction.Flag type = opValueEvaluated(0, inputRecord);
-      String targetStr = opValueEvaluated(1, inputRecord);
-      String containingStr = opValueEvaluated(2, inputRecord);
+      SqlTrimFunction.Flag type = opValueEvaluated(0, inputRow);
+      String targetStr = opValueEvaluated(1, inputRow);
+      String containingStr = opValueEvaluated(2, inputRow);
 
       switch (type) {
         case LEADING:

http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlUpperExpression.java
----------------------------------------------------------------------
diff --git 
a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlUpperExpression.java
 
b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlUpperExpression.java
index 8fcaca4..efa9c95 100644
--- 
a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlUpperExpression.java
+++ 
b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlUpperExpression.java
@@ -33,8 +33,8 @@ public class BeamSqlUpperExpression extends 
BeamSqlStringUnaryExpression {
     super(operands, SqlTypeName.VARCHAR);
   }
 
-  @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRecord) {
-    String str = opValueEvaluated(0, inputRecord);
+  @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
+    String str = opValueEvaluated(0, inputRow);
     return BeamSqlPrimitive.of(SqlTypeName.VARCHAR, str.toUpperCase());
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java
----------------------------------------------------------------------
diff --git 
a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java 
b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java
index 5389ec7..9dcb079 100644
--- 
a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java
+++ 
b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java
@@ -20,9 +20,9 @@ package org.apache.beam.dsls.sql.rel;
 import java.util.ArrayList;
 import java.util.List;
 import org.apache.beam.dsls.sql.BeamSqlEnv;
-import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
 import org.apache.beam.dsls.sql.schema.BeamSqlRow;
 import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder;
+import org.apache.beam.dsls.sql.schema.BeamSqlRowType;
 import org.apache.beam.dsls.sql.transform.BeamAggregationTransforms;
 import org.apache.beam.dsls.sql.utils.CalciteUtils;
 import org.apache.beam.sdk.coders.KvCoder;
@@ -105,13 +105,13 @@ public class BeamAggregationRel extends Aggregate 
implements BeamRelNode {
         stageName + "combineBy",
         Combine.<BeamSqlRow, BeamSqlRow, BeamSqlRow>perKey(
             new BeamAggregationTransforms.AggregationAdaptor(getAggCallList(),
-                CalciteUtils.toBeamRecordType(input.getRowType()))))
+                CalciteUtils.toBeamRowType(input.getRowType()))))
         .setCoder(KvCoder.of(keyCoder, aggCoder));
 
     PCollection<BeamSqlRow> mergedStream = aggregatedStream.apply(stageName + 
"mergeRecord",
         ParDo.of(new BeamAggregationTransforms.MergeAggregationRecord(
-            CalciteUtils.toBeamRecordType(getRowType()), getAggCallList(), 
windowFieldIdx)));
-    mergedStream.setCoder(new 
BeamSqlRowCoder(CalciteUtils.toBeamRecordType(getRowType())));
+            CalciteUtils.toBeamRowType(getRowType()), getAggCallList(), 
windowFieldIdx)));
+    mergedStream.setCoder(new 
BeamSqlRowCoder(CalciteUtils.toBeamRowType(getRowType())));
 
     return mergedStream;
   }
@@ -119,23 +119,23 @@ public class BeamAggregationRel extends Aggregate 
implements BeamRelNode {
   /**
    * Type of sub-rowrecord used as Group-By keys.
    */
-  private BeamSqlRecordType exKeyFieldsSchema(RelDataType relDataType) {
-    BeamSqlRecordType inputRecordType = 
CalciteUtils.toBeamRecordType(relDataType);
+  private BeamSqlRowType exKeyFieldsSchema(RelDataType relDataType) {
+    BeamSqlRowType inputRowType = CalciteUtils.toBeamRowType(relDataType);
     List<String> fieldNames = new ArrayList<>();
     List<Integer> fieldTypes = new ArrayList<>();
     for (int i : groupSet.asList()) {
       if (i != windowFieldIdx) {
-        fieldNames.add(inputRecordType.getFieldsName().get(i));
-        fieldTypes.add(inputRecordType.getFieldsType().get(i));
+        fieldNames.add(inputRowType.getFieldsName().get(i));
+        fieldTypes.add(inputRowType.getFieldsType().get(i));
       }
     }
-    return BeamSqlRecordType.create(fieldNames, fieldTypes);
+    return BeamSqlRowType.create(fieldNames, fieldTypes);
   }
 
   /**
    * Type of sub-rowrecord, that represents the list of aggregation fields.
    */
-  private BeamSqlRecordType exAggFieldsSchema() {
+  private BeamSqlRowType exAggFieldsSchema() {
     List<String> fieldNames = new ArrayList<>();
     List<Integer> fieldTypes = new ArrayList<>();
     for (AggregateCall ac : getAggCallList()) {
@@ -143,7 +143,7 @@ public class BeamAggregationRel extends Aggregate 
implements BeamRelNode {
       fieldTypes.add(CalciteUtils.toJavaType(ac.type.getSqlTypeName()));
     }
 
-    return BeamSqlRecordType.create(fieldNames, fieldTypes);
+    return BeamSqlRowType.create(fieldNames, fieldTypes);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamFilterRel.java
----------------------------------------------------------------------
diff --git 
a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamFilterRel.java 
b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamFilterRel.java
index 07b5c7c..f802104 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamFilterRel.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamFilterRel.java
@@ -62,7 +62,7 @@ public class BeamFilterRel extends Filter implements 
BeamRelNode {
 
     PCollection<BeamSqlRow> filterStream = upstream.apply(stageName,
         ParDo.of(new BeamSqlFilterFn(getRelTypeName(), executor)));
-    filterStream.setCoder(new 
BeamSqlRowCoder(CalciteUtils.toBeamRecordType(getRowType())));
+    filterStream.setCoder(new 
BeamSqlRowCoder(CalciteUtils.toBeamRowType(getRowType())));
 
     return filterStream;
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSourceRel.java
----------------------------------------------------------------------
diff --git 
a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSourceRel.java 
b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSourceRel.java
index b26d2b8..6754991 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSourceRel.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSourceRel.java
@@ -56,7 +56,7 @@ public class BeamIOSourceRel extends TableScan implements 
BeamRelNode {
       //If not, the source PColection is provided with 
BaseBeamTable.buildIOReader().
       BaseBeamTable sourceTable = sqlEnv.findTable(sourceName);
       return sourceTable.buildIOReader(inputPCollections.getPipeline())
-          .setCoder(new 
BeamSqlRowCoder(CalciteUtils.toBeamRecordType(getRowType())));
+          .setCoder(new 
BeamSqlRowCoder(CalciteUtils.toBeamRowType(getRowType())));
     }
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamJoinRel.java
----------------------------------------------------------------------
diff --git 
a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamJoinRel.java 
b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamJoinRel.java
index 3c92e42..3ebf152 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamJoinRel.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamJoinRel.java
@@ -23,9 +23,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import org.apache.beam.dsls.sql.BeamSqlEnv;
-import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
 import org.apache.beam.dsls.sql.schema.BeamSqlRow;
 import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder;
+import org.apache.beam.dsls.sql.schema.BeamSqlRowType;
 import org.apache.beam.dsls.sql.transform.BeamJoinTransforms;
 import org.apache.beam.dsls.sql.utils.CalciteUtils;
 import org.apache.beam.sdk.coders.Coder;
@@ -97,7 +97,7 @@ public class BeamJoinRel extends Join implements BeamRelNode {
       BeamSqlEnv sqlEnv)
       throws Exception {
     BeamRelNode leftRelNode = BeamSqlRelUtils.getBeamRelInput(left);
-    BeamSqlRecordType leftRowType = 
CalciteUtils.toBeamRecordType(left.getRowType());
+    BeamSqlRowType leftRowType = CalciteUtils.toBeamRowType(left.getRowType());
     PCollection<BeamSqlRow> leftRows = 
leftRelNode.buildBeamPipeline(inputPCollections, sqlEnv);
 
     final BeamRelNode rightRelNode = BeamSqlRelUtils.getBeamRelInput(right);
@@ -119,7 +119,7 @@ public class BeamJoinRel extends Join implements BeamRelNode {
       names.add("c" + i);
       types.add(leftRowType.getFieldsType().get(pairs.get(i).getKey()));
     }
-    BeamSqlRecordType extractKeyRowType = BeamSqlRecordType.create(names, types);
+    BeamSqlRowType extractKeyRowType = BeamSqlRowType.create(names, types);
 
     Coder extractKeyRowCoder = new BeamSqlRowCoder(extractKeyRowType);
 
@@ -213,7 +213,7 @@ public class BeamJoinRel extends Join implements BeamRelNode {
     PCollection<BeamSqlRow> ret = joinedRows
         .apply(stageName + "_JoinParts2WholeRow",
             MapElements.via(new BeamJoinTransforms.JoinParts2WholeRow()))
-        .setCoder(new BeamSqlRowCoder(CalciteUtils.toBeamRecordType(getRowType())));
+        .setCoder(new BeamSqlRowCoder(CalciteUtils.toBeamRowType(getRowType())));
     return ret;
   }
 
@@ -249,13 +249,13 @@ public class BeamJoinRel extends Join implements BeamRelNode {
     PCollection<BeamSqlRow> ret = leftRows
         .apply(ParDo.of(new BeamJoinTransforms.SideInputJoinDoFn(
             joinType, rightNullRow, rowsView, swapped)).withSideInputs(rowsView))
-        .setCoder(new BeamSqlRowCoder(CalciteUtils.toBeamRecordType(getRowType())));
+        .setCoder(new BeamSqlRowCoder(CalciteUtils.toBeamRowType(getRowType())));
 
     return ret;
   }
 
   private BeamSqlRow buildNullRow(BeamRelNode relNode) {
-    BeamSqlRecordType leftType = CalciteUtils.toBeamRecordType(relNode.getRowType());
+    BeamSqlRowType leftType = CalciteUtils.toBeamRowType(relNode.getRowType());
     BeamSqlRow nullRow = new BeamSqlRow(leftType);
     for (int i = 0; i < leftType.size(); i++) {
       nullRow.addField(i, null);

http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamProjectRel.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamProjectRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamProjectRel.java
index 2cdfc72..8f8e5ce 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamProjectRel.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamProjectRel.java
@@ -72,8 +72,8 @@ public class BeamProjectRel extends Project implements BeamRelNode {
 
     PCollection<BeamSqlRow> projectStream = upstream.apply(stageName, ParDo
         .of(new BeamSqlProjectFn(getRelTypeName(), executor,
-            CalciteUtils.toBeamRecordType(rowType))));
-    projectStream.setCoder(new BeamSqlRowCoder(CalciteUtils.toBeamRecordType(getRowType())));
+            CalciteUtils.toBeamRowType(rowType))));
+    projectStream.setCoder(new BeamSqlRowCoder(CalciteUtils.toBeamRowType(getRowType())));
 
     return projectStream;
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSortRel.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSortRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSortRel.java
index 75f9717..ba344df 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSortRel.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSortRel.java
@@ -149,7 +149,7 @@ public class BeamSortRel extends Sort implements BeamRelNode {
 
     PCollection<BeamSqlRow> orderedStream = rawStream.apply(
         "flatten", Flatten.<BeamSqlRow>iterables());
-    orderedStream.setCoder(new BeamSqlRowCoder(CalciteUtils.toBeamRecordType(getRowType())));
+    orderedStream.setCoder(new BeamSqlRowCoder(CalciteUtils.toBeamRowType(getRowType())));
 
     return orderedStream;
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamValuesRel.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamValuesRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamValuesRel.java
index 030d2c8..43b74c3 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamValuesRel.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamValuesRel.java
@@ -19,13 +19,12 @@
 package org.apache.beam.dsls.sql.rel;
 
 import com.google.common.collect.ImmutableList;
-
 import java.util.ArrayList;
 import java.util.List;
 import org.apache.beam.dsls.sql.BeamSqlEnv;
-import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
 import org.apache.beam.dsls.sql.schema.BeamSqlRow;
 import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder;
+import org.apache.beam.dsls.sql.schema.BeamSqlRowType;
 import org.apache.beam.dsls.sql.schema.BeamTableUtils;
 import org.apache.beam.dsls.sql.utils.CalciteUtils;
 import org.apache.beam.sdk.transforms.Create;
@@ -65,9 +64,9 @@ public class BeamValuesRel extends Values implements BeamRelNode {
       throw new IllegalStateException("Values with empty tuples!");
     }
 
-    BeamSqlRecordType beamSQLRecordType = CalciteUtils.toBeamRecordType(this.getRowType());
+    BeamSqlRowType beamSQLRowType = CalciteUtils.toBeamRowType(this.getRowType());
     for (ImmutableList<RexLiteral> tuple : tuples) {
-      BeamSqlRow row = new BeamSqlRow(beamSQLRecordType);
+      BeamSqlRow row = new BeamSqlRow(beamSQLRowType);
       for (int i = 0; i < tuple.size(); i++) {
         BeamTableUtils.addFieldWithAutoTypeCasting(row, i, tuple.get(i).getValue());
       }
@@ -75,6 +74,6 @@ public class BeamValuesRel extends Values implements BeamRelNode {
     }
 
     return inputPCollections.getPipeline().apply(stageName, Create.of(rows))
-        .setCoder(new BeamSqlRowCoder(beamSQLRecordType));
+        .setCoder(new BeamSqlRowCoder(beamSQLRowType));
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BaseBeamTable.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BaseBeamTable.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BaseBeamTable.java
index 6d49bcc..dfa2785 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BaseBeamTable.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BaseBeamTable.java
@@ -23,12 +23,12 @@ import java.io.Serializable;
  * Each IO in Beam has one table schema, by extending {@link BaseBeamTable}.
  */
 public abstract class BaseBeamTable implements BeamSqlTable, Serializable {
-  protected BeamSqlRecordType beamSqlRecordType;
-  public BaseBeamTable(BeamSqlRecordType beamSqlRecordType) {
-    this.beamSqlRecordType = beamSqlRecordType;
+  protected BeamSqlRowType beamSqlRowType;
+  public BaseBeamTable(BeamSqlRowType beamSqlRowType) {
+    this.beamSqlRowType = beamSqlRowType;
   }
 
-  @Override public BeamSqlRecordType getRecordType() {
-    return beamSqlRecordType;
+  @Override public BeamSqlRowType getRowType() {
+    return beamSqlRowType;
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamPCollectionTable.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamPCollectionTable.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamPCollectionTable.java
index 8309097..5b63780 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamPCollectionTable.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamPCollectionTable.java
@@ -31,13 +31,13 @@ public class BeamPCollectionTable extends BaseBeamTable {
   private BeamIOType ioType;
   private transient PCollection<BeamSqlRow> upstream;
 
-  protected BeamPCollectionTable(BeamSqlRecordType beamSqlRecordType) {
-    super(beamSqlRecordType);
+  protected BeamPCollectionTable(BeamSqlRowType beamSqlRowType) {
+    super(beamSqlRowType);
   }
 
   public BeamPCollectionTable(PCollection<BeamSqlRow> upstream,
-      BeamSqlRecordType beamSqlRecordType){
-    this(beamSqlRecordType);
+      BeamSqlRowType beamSqlRowType){
+    this(beamSqlRowType);
     ioType = upstream.isBounded().equals(IsBounded.BOUNDED)
         ? BeamIOType.BOUNDED : BeamIOType.UNBOUNDED;
     this.upstream = upstream;

http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRecordType.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRecordType.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRecordType.java
deleted file mode 100644
index 52bd652..0000000
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRecordType.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.dsls.sql.schema;
-
-import com.google.auto.value.AutoValue;
-import java.io.Serializable;
-import java.util.List;
-
-/**
- * Field type information in {@link BeamSqlRow}.
- *
- */
-@AutoValue
-public abstract class BeamSqlRecordType implements Serializable {
-  public abstract List<String> getFieldsName();
-  public abstract List<Integer> getFieldsType();
-
-  public static BeamSqlRecordType create(List<String> fieldNames, List<Integer> fieldTypes) {
-    return new org.apache.beam.dsls.sql.schema.AutoValue_BeamSqlRecordType(fieldNames, fieldTypes);
-  }
-
-  public int size() {
-    return getFieldsName().size();
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRow.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRow.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRow.java
index 5c0dbc0..d789446 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRow.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRow.java
@@ -61,12 +61,12 @@ public class BeamSqlRow implements Serializable {
 
   private List<Integer> nullFields = new ArrayList<>();
   private List<Object> dataValues;
-  private BeamSqlRecordType dataType;
+  private BeamSqlRowType dataType;
 
   private Instant windowStart = new Instant(TimeUnit.MICROSECONDS.toMillis(Long.MIN_VALUE));
   private Instant windowEnd = new Instant(TimeUnit.MICROSECONDS.toMillis(Long.MAX_VALUE));
 
-  public BeamSqlRow(BeamSqlRecordType dataType) {
+  public BeamSqlRow(BeamSqlRowType dataType) {
     this.dataType = dataType;
     this.dataValues = new ArrayList<>();
     for (int idx = 0; idx < dataType.size(); ++idx) {
@@ -75,7 +75,7 @@ public class BeamSqlRow implements Serializable {
     }
   }
 
-  public BeamSqlRow(BeamSqlRecordType dataType, List<Object> dataValues) {
+  public BeamSqlRow(BeamSqlRowType dataType, List<Object> dataValues) {
     this(dataType);
     for (int idx = 0; idx < dataValues.size(); ++idx) {
       addField(idx, dataValues.get(idx));
@@ -237,11 +237,11 @@ public class BeamSqlRow implements Serializable {
     this.dataValues = dataValues;
   }
 
-  public BeamSqlRecordType getDataType() {
+  public BeamSqlRowType getDataType() {
     return dataType;
   }
 
-  public void setDataType(BeamSqlRecordType dataType) {
+  public void setDataType(BeamSqlRowType dataType) {
     this.dataType = dataType;
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java
index c798b35..f14864a 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java
@@ -40,7 +40,7 @@ import org.apache.beam.sdk.coders.StringUtf8Coder;
  *  A {@link Coder} encodes {@link BeamSqlRow}.
  */
 public class BeamSqlRowCoder extends CustomCoder<BeamSqlRow> {
-  private BeamSqlRecordType tableSchema;
+  private BeamSqlRowType tableSchema;
 
   private static final ListCoder<Integer> listCoder = ListCoder.of(BigEndianIntegerCoder.of());
 
@@ -52,7 +52,7 @@ public class BeamSqlRowCoder extends CustomCoder<BeamSqlRow> {
   private static final BigDecimalCoder bigDecimalCoder = BigDecimalCoder.of();
   private static final ByteCoder byteCoder = ByteCoder.of();
 
-  public BeamSqlRowCoder(BeamSqlRecordType tableSchema) {
+  public BeamSqlRowCoder(BeamSqlRowType tableSchema) {
     this.tableSchema = tableSchema;
   }
 
@@ -174,7 +174,7 @@ public class BeamSqlRowCoder extends CustomCoder<BeamSqlRow> {
     return record;
   }
 
-  public BeamSqlRecordType getTableSchema() {
+  public BeamSqlRowType getTableSchema() {
     return tableSchema;
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowType.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowType.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowType.java
new file mode 100644
index 0000000..1129bdd
--- /dev/null
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowType.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.schema;
+
+import com.google.auto.value.AutoValue;
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * Field type information in {@link BeamSqlRow}.
+ *
+ */
+@AutoValue
+public abstract class BeamSqlRowType implements Serializable {
+  public abstract List<String> getFieldsName();
+  public abstract List<Integer> getFieldsType();
+
+  public static BeamSqlRowType create(List<String> fieldNames, List<Integer> fieldTypes) {
+    return new AutoValue_BeamSqlRowType(fieldNames, fieldTypes);
+  }
+
+  public int size() {
+    return getFieldsName().size();
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlTable.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlTable.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlTable.java
index 986decb..d419473 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlTable.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlTable.java
@@ -48,5 +48,5 @@ public interface BeamSqlTable {
   /**
    * Get the schema info of the table.
    */
-   BeamSqlRecordType getRecordType();
+   BeamSqlRowType getRowType();
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamTableUtils.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamTableUtils.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamTableUtils.java
index 7157793..4b7e76b 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamTableUtils.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamTableUtils.java
@@ -37,19 +37,19 @@ public final class BeamTableUtils {
   public static BeamSqlRow csvLine2BeamSqlRow(
       CSVFormat csvFormat,
       String line,
-      BeamSqlRecordType beamSqlRecordType) {
-    BeamSqlRow row = new BeamSqlRow(beamSqlRecordType);
+      BeamSqlRowType beamSqlRowType) {
+    BeamSqlRow row = new BeamSqlRow(beamSqlRowType);
     try (StringReader reader = new StringReader(line)) {
       CSVParser parser = csvFormat.parse(reader);
       CSVRecord rawRecord = parser.getRecords().get(0);
 
-      if (rawRecord.size() != beamSqlRecordType.size()) {
+      if (rawRecord.size() != beamSqlRowType.size()) {
         throw new IllegalArgumentException(String.format(
             "Expect %d fields, but actually %d",
-            beamSqlRecordType.size(), rawRecord.size()
+            beamSqlRowType.size(), rawRecord.size()
         ));
       } else {
-        for (int idx = 0; idx < beamSqlRecordType.size(); idx++) {
+        for (int idx = 0; idx < beamSqlRowType.size(); idx++) {
           String raw = rawRecord.get(idx);
           addFieldWithAutoTypeCasting(row, idx, raw);
         }

http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaCSVTable.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaCSVTable.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaCSVTable.java
index 39cf8d8..a18f3de 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaCSVTable.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaCSVTable.java
@@ -21,9 +21,8 @@ import static org.apache.beam.dsls.sql.schema.BeamTableUtils.beamSqlRow2CsvLine;
 import static org.apache.beam.dsls.sql.schema.BeamTableUtils.csvLine2BeamSqlRow;
 
 import java.util.List;
-
-import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
 import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRowType;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
@@ -37,27 +36,27 @@ import org.apache.commons.csv.CSVFormat;
  */
 public class BeamKafkaCSVTable extends BeamKafkaTable {
   private CSVFormat csvFormat;
-  public BeamKafkaCSVTable(BeamSqlRecordType beamSqlRecordType, String bootstrapServers,
+  public BeamKafkaCSVTable(BeamSqlRowType beamSqlRowType, String bootstrapServers,
       List<String> topics) {
-    this(beamSqlRecordType, bootstrapServers, topics, CSVFormat.DEFAULT);
+    this(beamSqlRowType, bootstrapServers, topics, CSVFormat.DEFAULT);
   }
 
-  public BeamKafkaCSVTable(BeamSqlRecordType beamSqlRecordType, String bootstrapServers,
+  public BeamKafkaCSVTable(BeamSqlRowType beamSqlRowType, String bootstrapServers,
       List<String> topics, CSVFormat format) {
-    super(beamSqlRecordType, bootstrapServers, topics);
+    super(beamSqlRowType, bootstrapServers, topics);
     this.csvFormat = format;
   }
 
   @Override
   public PTransform<PCollection<KV<byte[], byte[]>>, PCollection<BeamSqlRow>>
       getPTransformForInput() {
-    return new CsvRecorderDecoder(beamSqlRecordType, csvFormat);
+    return new CsvRecorderDecoder(beamSqlRowType, csvFormat);
   }
 
   @Override
   public PTransform<PCollection<BeamSqlRow>, PCollection<KV<byte[], byte[]>>>
       getPTransformForOutput() {
-    return new CsvRecorderEncoder(beamSqlRecordType, csvFormat);
+    return new CsvRecorderEncoder(beamSqlRowType, csvFormat);
   }
 
   /**
@@ -66,10 +65,10 @@ public class BeamKafkaCSVTable extends BeamKafkaTable {
    */
   public static class CsvRecorderDecoder
       extends PTransform<PCollection<KV<byte[], byte[]>>, PCollection<BeamSqlRow>> {
-    private BeamSqlRecordType recordType;
+    private BeamSqlRowType rowType;
     private CSVFormat format;
-    public CsvRecorderDecoder(BeamSqlRecordType recordType, CSVFormat format) {
-      this.recordType = recordType;
+    public CsvRecorderDecoder(BeamSqlRowType rowType, CSVFormat format) {
+      this.rowType = rowType;
       this.format = format;
     }
 
@@ -79,7 +78,7 @@ public class BeamKafkaCSVTable extends BeamKafkaTable {
         @ProcessElement
         public void processElement(ProcessContext c) {
           String rowInString = new String(c.element().getValue());
-          c.output(csvLine2BeamSqlRow(format, rowInString, recordType));
+          c.output(csvLine2BeamSqlRow(format, rowInString, rowType));
         }
       }));
     }
@@ -91,10 +90,10 @@ public class BeamKafkaCSVTable extends BeamKafkaTable {
    */
   public static class CsvRecorderEncoder
       extends PTransform<PCollection<BeamSqlRow>, PCollection<KV<byte[], byte[]>>> {
-    private BeamSqlRecordType recordType;
+    private BeamSqlRowType rowType;
     private CSVFormat format;
-    public CsvRecorderEncoder(BeamSqlRecordType recordType, CSVFormat format) {
-      this.recordType = recordType;
+    public CsvRecorderEncoder(BeamSqlRowType rowType, CSVFormat format) {
+      this.rowType = rowType;
       this.format = format;
     }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaTable.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaTable.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaTable.java
index f27014e..faa2706 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaTable.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaTable.java
@@ -22,11 +22,10 @@ import static com.google.common.base.Preconditions.checkArgument;
 import java.io.Serializable;
 import java.util.List;
 import java.util.Map;
-
 import org.apache.beam.dsls.sql.schema.BaseBeamTable;
 import org.apache.beam.dsls.sql.schema.BeamIOType;
-import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
 import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRowType;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.ByteArrayCoder;
 import org.apache.beam.sdk.io.kafka.KafkaIO;
@@ -49,13 +48,13 @@ public abstract class BeamKafkaTable extends BaseBeamTable implements Serializab
   private List<String> topics;
   private Map<String, Object> configUpdates;
 
-  protected BeamKafkaTable(BeamSqlRecordType beamSqlRecordType) {
-    super(beamSqlRecordType);
+  protected BeamKafkaTable(BeamSqlRowType beamSqlRowType) {
+    super(beamSqlRowType);
   }
 
-  public BeamKafkaTable(BeamSqlRecordType beamSqlRecordType, String bootstrapServers,
+  public BeamKafkaTable(BeamSqlRowType beamSqlRowType, String bootstrapServers,
       List<String> topics) {
-    super(beamSqlRecordType);
+    super(beamSqlRowType);
     this.bootstrapServers = bootstrapServers;
     this.topics = topics;
   }

Reply via email to