Repository: beam
Updated Branches:
  refs/heads/DSL_SQL 3625dbd9e -> f1c2b6540


http://git-wip-us.apache.org/repos/asf/beam/blob/7867ce62/dsls/sql/src/main/java/org/beam/dsls/sql/schema/BaseBeamTable.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/schema/BaseBeamTable.java 
b/dsls/sql/src/main/java/org/beam/dsls/sql/schema/BaseBeamTable.java
new file mode 100644
index 0000000..3816063
--- /dev/null
+++ b/dsls/sql/src/main/java/org/beam/dsls/sql/schema/BaseBeamTable.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.beam.dsls.sql.schema;
+
+import java.io.Serializable;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.calcite.DataContext;
+import org.apache.calcite.linq4j.Enumerable;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelProtoDataType;
+import org.apache.calcite.schema.ScannableTable;
+import org.apache.calcite.schema.Schema.TableType;
+import org.apache.calcite.schema.Statistic;
+import org.apache.calcite.schema.Statistics;
+import org.beam.dsls.sql.planner.BeamQueryPlanner;
+
+/**
+ * Each IO in Beam has one table schema, by extending {@link BaseBeamTable}.
+ */
+public abstract class BaseBeamTable implements ScannableTable, Serializable {
+
+  /**
+   *
+   */
+  private static final long serialVersionUID = -1262988061830914193L;
+  private RelDataType relDataType;
+
+  protected BeamSQLRecordType beamSqlRecordType;
+
+  public BaseBeamTable(RelProtoDataType protoRowType) {
+    this.relDataType = protoRowType.apply(BeamQueryPlanner.TYPE_FACTORY);
+    this.beamSqlRecordType = BeamSQLRecordType.from(relDataType);
+  }
+
+  /**
+   * In Beam SQL, there's no difference between a batch query and a streaming
+   * query. {@link BeamIOType} is used to validate the sources.
+   */
+  public abstract BeamIOType getSourceType();
+
+  /**
+   * create a {@code IO.read()} instance to read from source.
+   *
+   */
+  public abstract PTransform<? super PBegin, PCollection<BeamSQLRow>> 
buildIOReader();
+
+  /**
+   * create a {@code IO.write()} instance to write to target.
+   *
+   */
+  public abstract PTransform<? super PCollection<BeamSQLRow>, PDone> 
buildIOWriter();
+
+  @Override
+  public Enumerable<Object[]> scan(DataContext root) {
+    // not used as Beam SQL uses its own execution engine
+    return null;
+  }
+
+  @Override
+  public RelDataType getRowType(RelDataTypeFactory typeFactory) {
+    return relDataType;
+  }
+
+  /**
+   * Not used {@link Statistic} to optimize the plan.
+   */
+  @Override
+  public Statistic getStatistic() {
+    return Statistics.UNKNOWN;
+  }
+
+  /**
+   * all sources are treated as TABLE in Beam SQL.
+   */
+  @Override
+  public TableType getJdbcTableType() {
+    return TableType.TABLE;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/7867ce62/dsls/sql/src/main/java/org/beam/dsls/sql/schema/BeamIOType.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/schema/BeamIOType.java 
b/dsls/sql/src/main/java/org/beam/dsls/sql/schema/BeamIOType.java
new file mode 100644
index 0000000..5e55b0f
--- /dev/null
+++ b/dsls/sql/src/main/java/org/beam/dsls/sql/schema/BeamIOType.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.beam.dsls.sql.schema;
+
+import java.io.Serializable;
+
+/**
+ * Type of a source IO; it determines whether the query runs as a streaming
+ * process or as a batch process.
+ */
+public enum BeamIOType implements Serializable {
+  BOUNDED,
+  UNBOUNDED;
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/7867ce62/dsls/sql/src/main/java/org/beam/dsls/sql/schema/BeamSQLRecordType.java
----------------------------------------------------------------------
diff --git 
a/dsls/sql/src/main/java/org/beam/dsls/sql/schema/BeamSQLRecordType.java 
b/dsls/sql/src/main/java/org/beam/dsls/sql/schema/BeamSQLRecordType.java
new file mode 100644
index 0000000..dc8e381
--- /dev/null
+++ b/dsls/sql/src/main/java/org/beam/dsls/sql/schema/BeamSQLRecordType.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.beam.dsls.sql.schema;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * Field names and field types of a {@link BeamSQLRow}, kept as two lists that
+ * are indexed by field position.
+ *
+ */
+//@DefaultCoder(BeamSQLRecordTypeCoder.class)
+public class BeamSQLRecordType implements Serializable {
+
+  private static final long serialVersionUID = -5318734648766104712L;
+
+  private List<String> fieldsName = new ArrayList<>();
+  private List<SqlTypeName> fieldsType = new ArrayList<>();
+
+  /**
+   * Builds a record type from a Calcite row type by copying, in order, the
+   * name and the {@link SqlTypeName} of every field in its field list.
+   */
+  public static BeamSQLRecordType from(RelDataType tableInfo) {
+    BeamSQLRecordType recordType = new BeamSQLRecordType();
+    List<RelDataTypeField> fields = tableInfo.getFieldList();
+    for (int idx = 0; idx < fields.size(); ++idx) {
+      RelDataTypeField field = fields.get(idx);
+      recordType.fieldsName.add(field.getName());
+      recordType.fieldsType.add(field.getType().getSqlTypeName());
+    }
+    return recordType;
+  }
+
+  /**
+   * Number of fields, taken from the list of field names.
+   */
+  public int size() {
+    return fieldsName.size();
+  }
+
+  /**
+   * Returns the backing list of field names (not a copy).
+   */
+  public List<String> getFieldsName() {
+    return fieldsName;
+  }
+
+  public void setFieldsName(List<String> fieldsName) {
+    this.fieldsName = fieldsName;
+  }
+
+  /**
+   * Returns the backing list of field types (not a copy).
+   */
+  public List<SqlTypeName> getFieldsType() {
+    return fieldsType;
+  }
+
+  public void setFieldsType(List<SqlTypeName> fieldsType) {
+    this.fieldsType = fieldsType;
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder text = new StringBuilder();
+    text.append("RecordType [fieldsName=").append(fieldsName);
+    text.append(", fieldsType=").append(fieldsType);
+    text.append("]");
+    return text.toString();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/7867ce62/dsls/sql/src/main/java/org/beam/dsls/sql/schema/BeamSQLRecordTypeCoder.java
----------------------------------------------------------------------
diff --git 
a/dsls/sql/src/main/java/org/beam/dsls/sql/schema/BeamSQLRecordTypeCoder.java 
b/dsls/sql/src/main/java/org/beam/dsls/sql/schema/BeamSQLRecordTypeCoder.java
new file mode 100644
index 0000000..2989cb9
--- /dev/null
+++ 
b/dsls/sql/src/main/java/org/beam/dsls/sql/schema/BeamSQLRecordTypeCoder.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.beam.dsls.sql.schema;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.List;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.StandardCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * A {@link Coder} for {@link BeamSQLRecordType}.
+ *
+ * <p>The encoded form is the field count, followed by every field name,
+ * followed by the name of every field type.
+ */
+public class BeamSQLRecordTypeCoder extends StandardCoder<BeamSQLRecordType> {
+  private static final StringUtf8Coder STRING_CODER = StringUtf8Coder.of();
+  private static final VarIntCoder INT_CODER = VarIntCoder.of();
+
+  private static final BeamSQLRecordTypeCoder INSTANCE = new BeamSQLRecordTypeCoder();
+
+  private BeamSQLRecordTypeCoder() {
+  }
+
+  /**
+   * Returns the shared coder instance.
+   */
+  public static BeamSQLRecordTypeCoder of() {
+    return INSTANCE;
+  }
+
+  /**
+   * Writes the field count, the field names and the field type names, all in
+   * the nested context, then flushes {@code outStream}.
+   */
+  @Override
+  public void encode(BeamSQLRecordType value, OutputStream outStream,
+      org.apache.beam.sdk.coders.Coder.Context context) throws CoderException, IOException {
+    Context nestedContext = context.nested();
+    List<String> names = value.getFieldsName();
+    List<SqlTypeName> types = value.getFieldsType();
+
+    INT_CODER.encode(value.size(), outStream, nestedContext);
+    for (String name : names) {
+      STRING_CODER.encode(name, outStream, nestedContext);
+    }
+    for (SqlTypeName type : types) {
+      STRING_CODER.encode(type.name(), outStream, nestedContext);
+    }
+    outStream.flush();
+  }
+
+  /**
+   * Reads back what {@code encode} wrote: the field count, then that many
+   * field names, then that many field type names.
+   */
+  @Override
+  public BeamSQLRecordType decode(InputStream inStream,
+      org.apache.beam.sdk.coders.Coder.Context context) throws CoderException, IOException {
+    BeamSQLRecordType decoded = new BeamSQLRecordType();
+    Context nestedContext = context.nested();
+    int fieldCount = INT_CODER.decode(inStream, nestedContext);
+
+    List<String> names = decoded.getFieldsName();
+    for (int remaining = fieldCount; remaining > 0; --remaining) {
+      names.add(STRING_CODER.decode(inStream, nestedContext));
+    }
+
+    List<SqlTypeName> types = decoded.getFieldsType();
+    for (int remaining = fieldCount; remaining > 0; --remaining) {
+      String typeName = STRING_CODER.decode(inStream, nestedContext);
+      types.add(SqlTypeName.valueOf(typeName));
+    }
+    return decoded;
+  }
+
+  @Override
+  public List<? extends Coder<?>> getCoderArguments() {
+    // TODO Auto-generated method stub
+    return null;
+  }
+
+  @Override
+  public void verifyDeterministic()
+      throws org.apache.beam.sdk.coders.Coder.NonDeterministicException {
+    // TODO Auto-generated method stub
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/7867ce62/dsls/sql/src/main/java/org/beam/dsls/sql/schema/BeamSQLRow.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/schema/BeamSQLRow.java 
b/dsls/sql/src/main/java/org/beam/dsls/sql/schema/BeamSQLRow.java
new file mode 100644
index 0000000..db93168
--- /dev/null
+++ b/dsls/sql/src/main/java/org/beam/dsls/sql/schema/BeamSQLRow.java
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.beam.dsls.sql.schema;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * Represents a generic ROW record in Beam SQL.
+ *
+ * <p>Values are stored by field position and are checked against the
+ * {@link SqlTypeName} that the row's {@link BeamSQLRecordType} declares for
+ * that position. Fields explicitly set to {@code null} are tracked separately
+ * in {@link #getNullFields()}.
+ */
+public class BeamSQLRow implements Serializable {
+
+  private static final long serialVersionUID = 4569220242480160895L;
+
+  private List<Integer> nullFields = new ArrayList<>();
+  private List<Object> dataValues;
+  private BeamSQLRecordType dataType;
+
+  /**
+   * Creates a row of the given type with one empty ({@code null}) slot per
+   * field.
+   */
+  public BeamSQLRow(BeamSQLRecordType dataType) {
+    this.dataType = dataType;
+    this.dataValues = new ArrayList<>();
+    for (int idx = 0; idx < dataType.size(); ++idx) {
+      dataValues.add(null);
+    }
+  }
+
+  /**
+   * Creates a row backed by the given value list; the values are not
+   * validated here.
+   */
+  public BeamSQLRow(BeamSQLRecordType dataType, List<Object> dataValues) {
+    this.dataValues = dataValues;
+    this.dataType = dataType;
+  }
+
+  /**
+   * Sets the field with the given name, see {@link #addField(int, Object)}.
+   */
+  public void addField(String fieldName, Object fieldValue) {
+    addField(dataType.getFieldsName().indexOf(fieldName), fieldValue);
+  }
+
+  /**
+   * Sets the field at {@code index}.
+   *
+   * <p>A {@code null} value marks the field as null. Any other value must be
+   * an instance of the Java class that matches the declared field type.
+   *
+   * @throws InvalidFieldException if the value does not match the field type
+   * @throws UnsupportedDataTypeException if the field type is not supported
+   */
+  public void addField(int index, Object fieldValue) {
+    if (fieldValue == null) {
+      dataValues.set(index, null);
+      if (!nullFields.contains(index)) {
+        nullFields.add(index);
+      }
+      return;
+    }
+
+    checkFieldType(dataType.getFieldsType().get(index), fieldValue);
+    dataValues.set(index, fieldValue);
+    // The field may have been set to null earlier; drop that marker, otherwise
+    // getFieldValue() would keep reporting null for a field that has a value.
+    if (nullFields.contains(index)) {
+      nullFields.remove(Integer.valueOf(index));
+    }
+  }
+
+  public int getInteger(int idx) {
+    return (Integer) getFieldValue(idx);
+  }
+
+  public double getDouble(int idx) {
+    return (Double) getFieldValue(idx);
+  }
+
+  public long getLong(int idx) {
+    return (Long) getFieldValue(idx);
+  }
+
+  public String getString(int idx) {
+    return (String) getFieldValue(idx);
+  }
+
+  public Date getDate(int idx) {
+    return (Date) getFieldValue(idx);
+  }
+
+  /**
+   * Returns the value of the field with the given name, see
+   * {@link #getFieldValue(int)}.
+   */
+  public Object getFieldValue(String fieldName) {
+    return getFieldValue(dataType.getFieldsName().indexOf(fieldName));
+  }
+
+  /**
+   * Returns the value at {@code fieldIdx}, or {@code null} when the field is
+   * marked as null.
+   *
+   * @throws InvalidFieldException if the stored value does not match the
+   *     field type (this includes a slot that was never set)
+   * @throws UnsupportedDataTypeException if the field type is not supported
+   */
+  public Object getFieldValue(int fieldIdx) {
+    if (nullFields.contains(fieldIdx)) {
+      return null;
+    }
+
+    Object fieldValue = dataValues.get(fieldIdx);
+    checkFieldType(dataType.getFieldsType().get(fieldIdx), fieldValue);
+    // The value already has the expected class, so it is returned as is.
+    return fieldValue;
+  }
+
+  public int size() {
+    return dataValues.size();
+  }
+
+  public List<Object> getDataValues() {
+    return dataValues;
+  }
+
+  public void setDataValues(List<Object> dataValues) {
+    this.dataValues = dataValues;
+  }
+
+  public BeamSQLRecordType getDataType() {
+    return dataType;
+  }
+
+  public void setDataType(BeamSQLRecordType dataType) {
+    this.dataType = dataType;
+  }
+
+  public void setNullFields(List<Integer> nullFields) {
+    this.nullFields = nullFields;
+  }
+
+  public List<Integer> getNullFields() {
+    return nullFields;
+  }
+
+  @Override
+  public String toString() {
+    return "BeamSQLRow [dataValues=" + dataValues + ", dataType=" + dataType + "]";
+  }
+
+  /**
+   * Return data fields as comma separated key=value pairs; a row without
+   * fields yields an empty string.
+   */
+  public String valueInString() {
+    StringBuilder sb = new StringBuilder();
+    for (int idx = 0; idx < size(); ++idx) {
+      if (idx > 0) {
+        sb.append(',');
+      }
+      sb.append(dataType.getFieldsName().get(idx)).append('=').append(getFieldValue(idx));
+    }
+    return sb.toString();
+  }
+
+  /**
+   * Two rows are equal when their {@link #toString()} representations are
+   * equal.
+   */
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj == null) {
+      return false;
+    }
+    if (getClass() != obj.getClass()) {
+      return false;
+    }
+    BeamSQLRow other = (BeamSQLRow) obj;
+    return toString().equals(other.toString());
+  }
+
+  /**
+   * Derived from {@link #toString()} so that it stays consistent with
+   * {@link #equals(Object)}.
+   */
+  @Override
+  public int hashCode() {
+    return toString().hashCode();
+  }
+
+  /**
+   * Verifies that {@code fieldValue} is an instance of the Java class used for
+   * {@code fieldType}.
+   */
+  private static void checkFieldType(SqlTypeName fieldType, Object fieldValue) {
+    if (!javaClassOf(fieldType).isInstance(fieldValue)) {
+      throw new InvalidFieldException(
+          String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
+    }
+  }
+
+  /**
+   * Maps a supported SQL type to the Java class its values must have.
+   */
+  private static Class<?> javaClassOf(SqlTypeName fieldType) {
+    switch (fieldType) {
+    case INTEGER:
+    case SMALLINT:
+    case TINYINT:
+      return Integer.class;
+    case DOUBLE:
+      return Double.class;
+    case BIGINT:
+      return Long.class;
+    case FLOAT:
+      return Float.class;
+    case VARCHAR:
+      return String.class;
+    case TIME:
+    case TIMESTAMP:
+      return Date.class;
+    default:
+      throw new UnsupportedDataTypeException(fieldType);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/7867ce62/dsls/sql/src/main/java/org/beam/dsls/sql/schema/BeamSqlRowCoder.java
----------------------------------------------------------------------
diff --git 
a/dsls/sql/src/main/java/org/beam/dsls/sql/schema/BeamSqlRowCoder.java 
b/dsls/sql/src/main/java/org/beam/dsls/sql/schema/BeamSqlRowCoder.java
new file mode 100644
index 0000000..00af18d
--- /dev/null
+++ b/dsls/sql/src/main/java/org/beam/dsls/sql/schema/BeamSqlRowCoder.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.beam.dsls.sql.schema;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Date;
+import java.util.List;
+import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
+import org.apache.beam.sdk.coders.BigEndianLongCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.DoubleCoder;
+import org.apache.beam.sdk.coders.ListCoder;
+import org.apache.beam.sdk.coders.StandardCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+
+/**
+ * A {@link Coder} encodes {@link BeamSQLRow}.
+ *
+ * <p>The encoded form is the row's {@link BeamSQLRecordType}, the list of null
+ * field indexes, and then the value of every field that is not in that list,
+ * in field order.
+ */
+public class BeamSqlRowCoder extends StandardCoder<BeamSQLRow>{
+  private static final BeamSQLRecordTypeCoder recordTypeCoder = BeamSQLRecordTypeCoder.of();
+
+  private static final ListCoder<Integer> listCoder = ListCoder.of(BigEndianIntegerCoder.of());
+
+  private static final StringUtf8Coder stringCoder = StringUtf8Coder.of();
+  private static final BigEndianIntegerCoder intCoder = BigEndianIntegerCoder.of();
+  private static final BigEndianLongCoder longCoder = BigEndianLongCoder.of();
+  private static final DoubleCoder doubleCoder = DoubleCoder.of();
+
+  private static final BeamSqlRowCoder INSTANCE = new BeamSqlRowCoder();
+  private BeamSqlRowCoder(){}
+
+  /**
+   * Returns the shared coder instance.
+   */
+  public static BeamSqlRowCoder of() {
+    return INSTANCE;
+  }
+
+  /**
+   * Writes the record type, the null field indexes and every non-null field
+   * value. Field values are written in the nested context.
+   *
+   * @throws UnsupportedDataTypeException if a field type is not supported
+   */
+  @Override
+  public void encode(BeamSQLRow value, OutputStream outStream,
+      org.apache.beam.sdk.coders.Coder.Context context) throws CoderException, IOException {
+    recordTypeCoder.encode(value.getDataType(), outStream, context);
+    listCoder.encode(value.getNullFields(), outStream, context);
+
+    Context nested = context.nested();
+
+    for (int idx = 0; idx < value.size(); ++idx) {
+      if (value.getNullFields().contains(idx)) {
+        continue;
+      }
+
+      switch (value.getDataType().getFieldsType().get(idx)) {
+      case INTEGER:
+      case SMALLINT:
+      case TINYINT:
+        intCoder.encode(value.getInteger(idx), outStream, nested);
+        break;
+      case DOUBLE:
+        doubleCoder.encode(value.getDouble(idx), outStream, nested);
+        break;
+      case FLOAT:
+        // The row stores FLOAT values as Float, so getDouble() would fail with
+        // a ClassCastException; widen explicitly, the wire format stays a double.
+        doubleCoder.encode(((Float) value.getFieldValue(idx)).doubleValue(), outStream, nested);
+        break;
+      case BIGINT:
+        longCoder.encode(value.getLong(idx), outStream, nested);
+        break;
+      case VARCHAR:
+        stringCoder.encode(value.getString(idx), outStream, nested);
+        break;
+      case TIME:
+      case TIMESTAMP:
+        longCoder.encode(value.getDate(idx).getTime(), outStream, nested);
+        break;
+
+      default:
+        throw new UnsupportedDataTypeException(value.getDataType().getFieldsType().get(idx));
+      }
+    }
+  }
+
+  /**
+   * Reads back what {@code encode} wrote and rebuilds the row.
+   *
+   * @throws UnsupportedDataTypeException if a field type is not supported
+   */
+  @Override
+  public BeamSQLRow decode(InputStream inStream, org.apache.beam.sdk.coders.Coder.Context context)
+      throws CoderException, IOException {
+    BeamSQLRecordType type = recordTypeCoder.decode(inStream, context);
+    List<Integer> nullFields = listCoder.decode(inStream, context);
+
+    BeamSQLRow record = new BeamSQLRow(type);
+    record.setNullFields(nullFields);
+
+    // Field values are encoded in the nested context, so they have to be
+    // decoded in the nested context as well.
+    Context nested = context.nested();
+
+    for (int idx = 0; idx < type.size(); ++idx) {
+      if (nullFields.contains(idx)) {
+        continue;
+      }
+
+      switch (type.getFieldsType().get(idx)) {
+      case INTEGER:
+      case SMALLINT:
+      case TINYINT:
+        record.addField(idx, intCoder.decode(inStream, nested));
+        break;
+      case DOUBLE:
+        record.addField(idx, doubleCoder.decode(inStream, nested));
+        break;
+      case FLOAT:
+        // The row only accepts Float for FLOAT fields; narrow the double that
+        // was written by encode() back to the original float value.
+        record.addField(idx, doubleCoder.decode(inStream, nested).floatValue());
+        break;
+      case BIGINT:
+        record.addField(idx, longCoder.decode(inStream, nested));
+        break;
+      case VARCHAR:
+        record.addField(idx, stringCoder.decode(inStream, nested));
+        break;
+      case TIME:
+      case TIMESTAMP:
+        record.addField(idx, new Date(longCoder.decode(inStream, nested)));
+        break;
+
+      default:
+        throw new UnsupportedDataTypeException(type.getFieldsType().get(idx));
+      }
+    }
+
+    return record;
+  }
+
+  @Override
+  public List<? extends Coder<?>> getCoderArguments() {
+    return null;
+  }
+
+  @Override
+  public void verifyDeterministic()
+      throws org.apache.beam.sdk.coders.Coder.NonDeterministicException {
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/7867ce62/dsls/sql/src/main/java/org/beam/dsls/sql/schema/InvalidFieldException.java
----------------------------------------------------------------------
diff --git 
a/dsls/sql/src/main/java/org/beam/dsls/sql/schema/InvalidFieldException.java 
b/dsls/sql/src/main/java/org/beam/dsls/sql/schema/InvalidFieldException.java
new file mode 100644
index 0000000..6240426
--- /dev/null
+++ b/dsls/sql/src/main/java/org/beam/dsls/sql/schema/InvalidFieldException.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.beam.dsls.sql.schema;
+
+/**
+ * Thrown when a field value does not match the type declared for the field.
+ */
+public class InvalidFieldException extends RuntimeException {
+
+  /**
+   * Creates the exception with a message describing the invalid field.
+   */
+  public InvalidFieldException(String message) {
+    super(message);
+  }
+
+  /**
+   * Creates the exception without a detail message.
+   */
+  public InvalidFieldException() {
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/7867ce62/dsls/sql/src/main/java/org/beam/dsls/sql/schema/UnsupportedDataTypeException.java
----------------------------------------------------------------------
diff --git 
a/dsls/sql/src/main/java/org/beam/dsls/sql/schema/UnsupportedDataTypeException.java
 
b/dsls/sql/src/main/java/org/beam/dsls/sql/schema/UnsupportedDataTypeException.java
new file mode 100644
index 0000000..9a2235e
--- /dev/null
+++ 
b/dsls/sql/src/main/java/org/beam/dsls/sql/schema/UnsupportedDataTypeException.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.beam.dsls.sql.schema;
+
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * Thrown when a {@link SqlTypeName} that is not supported is encountered.
+ */
+public class UnsupportedDataTypeException extends RuntimeException {
+
+  /**
+   * Creates the exception; the unsupported type is named in the message.
+   */
+  public UnsupportedDataTypeException(SqlTypeName unsupportedType) {
+    super(String.format("Not support data type [%s]", unsupportedType));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/7867ce62/dsls/sql/src/main/java/org/beam/dsls/sql/schema/kafka/BeamKafkaCSVTable.java
----------------------------------------------------------------------
diff --git 
a/dsls/sql/src/main/java/org/beam/dsls/sql/schema/kafka/BeamKafkaCSVTable.java 
b/dsls/sql/src/main/java/org/beam/dsls/sql/schema/kafka/BeamKafkaCSVTable.java
new file mode 100644
index 0000000..2570763
--- /dev/null
+++ 
b/dsls/sql/src/main/java/org/beam/dsls/sql/schema/kafka/BeamKafkaCSVTable.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.beam.dsls.sql.schema.kafka;
+
+import java.util.List;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.calcite.rel.type.RelProtoDataType;
+import org.beam.dsls.sql.schema.BeamSQLRecordType;
+import org.beam.dsls.sql.schema.BeamSQLRow;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A Kafka topic that saves records as CSV format.
+ *
+ */
+public class BeamKafkaCSVTable extends BeamKafkaTable {
+
+  /**
+   *
+   */
+  private static final long serialVersionUID = 4754022536543333984L;
+
+  public static final String DELIMITER = ",";
+  private static final Logger LOG = 
LoggerFactory.getLogger(BeamKafkaCSVTable.class);
+
+  public BeamKafkaCSVTable(RelProtoDataType protoRowType, String 
bootstrapServers,
+      List<String> topics) {
+    super(protoRowType, bootstrapServers, topics);
+  }
+
+  @Override
+  public PTransform<PCollection<KV<byte[], byte[]>>, PCollection<BeamSQLRow>>
+      getPTransformForInput() {
+    return new CsvRecorderDecoder(beamSqlRecordType);
+  }
+
+  @Override
+  public PTransform<PCollection<BeamSQLRow>, PCollection<KV<byte[], byte[]>>>
+      getPTransformForOutput() {
+    return new CsvRecorderEncoder(beamSqlRecordType);
+  }
+
+  /**
+   * A PTransform to convert {@code KV<byte[], byte[]>} to {@link BeamSQLRow}.
+   *
+   */
+  public static class CsvRecorderDecoder
+      extends PTransform<PCollection<KV<byte[], byte[]>>, 
PCollection<BeamSQLRow>> {
+    private BeamSQLRecordType recordType;
+
+    public CsvRecorderDecoder(BeamSQLRecordType recordType) {
+      this.recordType = recordType;
+    }
+
+    @Override
+    public PCollection<BeamSQLRow> expand(PCollection<KV<byte[], byte[]>> 
input) {
+      return input.apply("decodeRecord", ParDo.of(new DoFn<KV<byte[], byte[]>, 
BeamSQLRow>() {
+        @ProcessElement
+        public void processElement(ProcessContext c) {
+          String rowInString = new String(c.element().getValue());
+          String[] parts = rowInString.split(BeamKafkaCSVTable.DELIMITER);
+          if (parts.length != recordType.size()) {
+            LOG.error(String.format("invalid record: ", rowInString));
+          } else {
+            BeamSQLRow sourceRecord = new BeamSQLRow(recordType);
+            for (int idx = 0; idx < parts.length; ++idx) {
+              sourceRecord.addField(idx, parts[idx]);
+            }
+            c.output(sourceRecord);
+          }
+        }
+      }));
+    }
+  }
+
+  /**
+   * A PTransform to convert {@link BeamSQLRow} to {@code KV<byte[], byte[]>}.
+   *
+   */
+  public static class CsvRecorderEncoder
+      extends PTransform<PCollection<BeamSQLRow>, PCollection<KV<byte[], 
byte[]>>> {
+    private BeamSQLRecordType recordType;
+
+    public CsvRecorderEncoder(BeamSQLRecordType recordType) {
+      this.recordType = recordType;
+    }
+
+    @Override
+    public PCollection<KV<byte[], byte[]>> expand(PCollection<BeamSQLRow> 
input) {
+      return input.apply("encodeRecord", ParDo.of(new DoFn<BeamSQLRow, 
KV<byte[], byte[]>>() {
+        @ProcessElement
+        public void processElement(ProcessContext c) {
+          BeamSQLRow in = c.element();
+          StringBuffer sb = new StringBuffer();
+          for (int idx = 0; idx < in.size(); ++idx) {
+            sb.append(DELIMITER);
+            sb.append(in.getFieldValue(idx).toString());
+          }
+          c.output(KV.of(new byte[] {}, sb.substring(1).getBytes()));
+        }
+      }));
+
+    }
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/7867ce62/dsls/sql/src/main/java/org/beam/dsls/sql/schema/kafka/BeamKafkaTable.java
----------------------------------------------------------------------
diff --git 
a/dsls/sql/src/main/java/org/beam/dsls/sql/schema/kafka/BeamKafkaTable.java 
b/dsls/sql/src/main/java/org/beam/dsls/sql/schema/kafka/BeamKafkaTable.java
new file mode 100644
index 0000000..482383b
--- /dev/null
+++ b/dsls/sql/src/main/java/org/beam/dsls/sql/schema/kafka/BeamKafkaTable.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.beam.dsls.sql.schema.kafka;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.io.kafka.KafkaIO;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.calcite.rel.type.RelProtoDataType;
+import org.beam.dsls.sql.schema.BaseBeamTable;
+import org.beam.dsls.sql.schema.BeamIOType;
+import org.beam.dsls.sql.schema.BeamSQLRow;
+
+/**
+ * {@code BeamKafkaTable} represent a Kafka topic, as source or target. Need to
+ * extend to convert between {@code BeamSQLRow} and {@code KV<byte[], byte[]>}.
+ *
+ */
+public abstract class BeamKafkaTable extends BaseBeamTable implements 
Serializable {
+
+  /**
+   *
+   */
+  private static final long serialVersionUID = -634715473399906527L;
+
+  private String bootstrapServers;
+  private List<String> topics;
+  private Map<String, Object> configUpdates;
+
+  protected BeamKafkaTable(RelProtoDataType protoRowType) {
+    super(protoRowType);
+  }
+
+  public BeamKafkaTable(RelProtoDataType protoRowType, String bootstrapServers,
+      List<String> topics) {
+    super(protoRowType);
+    this.bootstrapServers = bootstrapServers;
+    this.topics = topics;
+  }
+
+  public BeamKafkaTable updateConsumerProperties(Map<String, Object> 
configUpdates) {
+    this.configUpdates = configUpdates;
+    return this;
+  }
+
+  @Override
+  public BeamIOType getSourceType() {
+    return BeamIOType.UNBOUNDED;
+  }
+
+  public abstract PTransform<PCollection<KV<byte[], byte[]>>, 
PCollection<BeamSQLRow>>
+      getPTransformForInput();
+
+  public abstract PTransform<PCollection<BeamSQLRow>, PCollection<KV<byte[], 
byte[]>>>
+      getPTransformForOutput();
+
+  @Override
+  public PTransform<? super PBegin, PCollection<BeamSQLRow>> buildIOReader() {
+    return new PTransform<PBegin, PCollection<BeamSQLRow>>() {
+
+      @Override
+      public PCollection<BeamSQLRow> expand(PBegin input) {
+        return input.apply("read",
+            KafkaIO.<byte[], 
byte[]>read().withBootstrapServers(bootstrapServers).withTopics(topics)
+                
.updateConsumerProperties(configUpdates).withKeyCoder(ByteArrayCoder.of())
+                .withValueCoder(ByteArrayCoder.of()).withoutMetadata())
+            .apply("in_format", getPTransformForInput());
+
+      }
+    };
+  }
+
+  @Override
+  public PTransform<? super PCollection<BeamSQLRow>, PDone> buildIOWriter() {
+    checkArgument(topics != null && topics.size() == 1,
+        "Only one topic can be acceptable as output.");
+
+    return new PTransform<PCollection<BeamSQLRow>, PDone>() {
+      @Override
+      public PDone expand(PCollection<BeamSQLRow> input) {
+        return input.apply("out_reformat", 
getPTransformForOutput()).apply("persistent",
+            KafkaIO.<byte[], 
byte[]>write().withBootstrapServers(bootstrapServers)
+                .withTopic(topics.get(0)).withKeyCoder(ByteArrayCoder.of())
+                .withValueCoder(ByteArrayCoder.of()));
+      }
+    };
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/7867ce62/dsls/sql/src/main/java/org/beam/dsls/sql/schema/kafka/package-info.java
----------------------------------------------------------------------
diff --git 
a/dsls/sql/src/main/java/org/beam/dsls/sql/schema/kafka/package-info.java 
b/dsls/sql/src/main/java/org/beam/dsls/sql/schema/kafka/package-info.java
new file mode 100644
index 0000000..822fce7
--- /dev/null
+++ b/dsls/sql/src/main/java/org/beam/dsls/sql/schema/kafka/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Table schema definitions for KafkaIO.
+ */
+package org.beam.dsls.sql.schema.kafka;

http://git-wip-us.apache.org/repos/asf/beam/blob/7867ce62/dsls/sql/src/main/java/org/beam/dsls/sql/schema/package-info.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/schema/package-info.java 
b/dsls/sql/src/main/java/org/beam/dsls/sql/schema/package-info.java
new file mode 100644
index 0000000..ef9cc7d
--- /dev/null
+++ b/dsls/sql/src/main/java/org/beam/dsls/sql/schema/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Defines table schemas that map to Beam IO components.
+ *
+ */
+package org.beam.dsls.sql.schema;

http://git-wip-us.apache.org/repos/asf/beam/blob/7867ce62/dsls/sql/src/main/java/org/beam/dsls/sql/transform/BeamSQLFilterFn.java
----------------------------------------------------------------------
diff --git 
a/dsls/sql/src/main/java/org/beam/dsls/sql/transform/BeamSQLFilterFn.java 
b/dsls/sql/src/main/java/org/beam/dsls/sql/transform/BeamSQLFilterFn.java
new file mode 100644
index 0000000..06db280
--- /dev/null
+++ b/dsls/sql/src/main/java/org/beam/dsls/sql/transform/BeamSQLFilterFn.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.beam.dsls.sql.transform;
+
+import java.util.List;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.beam.dsls.sql.interpreter.BeamSQLExpressionExecutor;
+import org.beam.dsls.sql.rel.BeamFilterRel;
+import org.beam.dsls.sql.schema.BeamSQLRow;
+
+/**
+ * {@code BeamSQLFilterFn} is the executor for a {@link BeamFilterRel} step.
+ *
+ * <p>For each input row the expression executor is run, and the row is forwarded
+ * unchanged when the first result is {@code true}.
+ */
+public class BeamSQLFilterFn extends DoFn<BeamSQLRow, BeamSQLRow> {
+  private static final long serialVersionUID = -1256111753670606705L;
+
+  private String stepName;
+  private BeamSQLExpressionExecutor executor;
+
+  public BeamSQLFilterFn(String stepName, BeamSQLExpressionExecutor executor) {
+    super();
+    this.stepName = stepName;
+    this.executor = executor;
+  }
+
+  @Setup
+  public void setup() {
+    executor.prepare();
+  }
+
+  @ProcessElement
+  public void processElement(ProcessContext c) {
+    BeamSQLRow in = c.element();
+
+    List<Object> result = executor.execute(in);
+
+    // A null condition result is treated as "not matched" instead of being unboxed,
+    // which would throw. A non-Boolean result still fails on the cast as before.
+    Object matched = result.get(0);
+    if (matched != null && (Boolean) matched) {
+      c.output(in);
+    }
+  }
+
+  @Teardown
+  public void close() {
+    executor.close();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/7867ce62/dsls/sql/src/main/java/org/beam/dsls/sql/transform/BeamSQLOutputToConsoleFn.java
----------------------------------------------------------------------
diff --git 
a/dsls/sql/src/main/java/org/beam/dsls/sql/transform/BeamSQLOutputToConsoleFn.java
 
b/dsls/sql/src/main/java/org/beam/dsls/sql/transform/BeamSQLOutputToConsoleFn.java
new file mode 100644
index 0000000..1014c0d
--- /dev/null
+++ 
b/dsls/sql/src/main/java/org/beam/dsls/sql/transform/BeamSQLOutputToConsoleFn.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.beam.dsls.sql.transform;
+
+import org.apache.beam.sdk.transforms.DoFn;
+import org.beam.dsls.sql.schema.BeamSQLRow;
+
+/**
+ * A test PTransform to display output in console.
+ *
+ */
+public class BeamSQLOutputToConsoleFn extends DoFn<BeamSQLRow, Void> {
+  /**
+   *
+   */
+  private static final long serialVersionUID = -1256111753670606705L;
+
+  private String stepName;
+
+  public BeamSQLOutputToConsoleFn(String stepName) {
+    super();
+    this.stepName = stepName;
+  }
+
+  @ProcessElement
+  public void processElement(ProcessContext c) {
+    System.out.println("Output: " + c.element().getDataValues());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/7867ce62/dsls/sql/src/main/java/org/beam/dsls/sql/transform/BeamSQLProjectFn.java
----------------------------------------------------------------------
diff --git 
a/dsls/sql/src/main/java/org/beam/dsls/sql/transform/BeamSQLProjectFn.java 
b/dsls/sql/src/main/java/org/beam/dsls/sql/transform/BeamSQLProjectFn.java
new file mode 100644
index 0000000..12061d2
--- /dev/null
+++ b/dsls/sql/src/main/java/org/beam/dsls/sql/transform/BeamSQLProjectFn.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.beam.dsls.sql.transform;
+
+import java.util.List;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.beam.dsls.sql.interpreter.BeamSQLExpressionExecutor;
+import org.beam.dsls.sql.rel.BeamProjectRel;
+import org.beam.dsls.sql.schema.BeamSQLRecordType;
+import org.beam.dsls.sql.schema.BeamSQLRow;
+
+/**
+ *
+ * {@code BeamSQLProjectFn} is the executor for a {@link BeamProjectRel} step.
+ *
+ */
+public class BeamSQLProjectFn extends DoFn<BeamSQLRow, BeamSQLRow> {
+
+  /**
+   *
+   */
+  private static final long serialVersionUID = -1046605249999014608L;
+  private String stepName;
+  private BeamSQLExpressionExecutor executor;
+  private BeamSQLRecordType outputRecordType;
+
+  public BeamSQLProjectFn(String stepName, BeamSQLExpressionExecutor executor,
+      BeamSQLRecordType outputRecordType) {
+    super();
+    this.stepName = stepName;
+    this.executor = executor;
+    this.outputRecordType = outputRecordType;
+  }
+
+  @Setup
+  public void setup() {
+    executor.prepare();
+  }
+
+  @ProcessElement
+  public void processElement(ProcessContext c) {
+    List<Object> results = executor.execute(c.element());
+
+    BeamSQLRow outRow = new BeamSQLRow(outputRecordType);
+    for (int idx = 0; idx < results.size(); ++idx) {
+      outRow.addField(idx, results.get(idx));
+    }
+
+    c.output(outRow);
+  }
+
+  @Teardown
+  public void close() {
+    executor.close();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/7867ce62/dsls/sql/src/main/java/org/beam/dsls/sql/transform/package-info.java
----------------------------------------------------------------------
diff --git 
a/dsls/sql/src/main/java/org/beam/dsls/sql/transform/package-info.java 
b/dsls/sql/src/main/java/org/beam/dsls/sql/transform/package-info.java
new file mode 100644
index 0000000..2607abf
--- /dev/null
+++ b/dsls/sql/src/main/java/org/beam/dsls/sql/transform/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * {@link org.apache.beam.sdk.transforms.PTransform} used in a BeamSQL 
pipeline.
+ */
+package org.beam.dsls.sql.transform;

http://git-wip-us.apache.org/repos/asf/beam/blob/7867ce62/dsls/sql/src/main/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/resources/log4j.properties 
b/dsls/sql/src/main/resources/log4j.properties
new file mode 100644
index 0000000..709484b
--- /dev/null
+++ b/dsls/sql/src/main/resources/log4j.properties
@@ -0,0 +1,23 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+log4j.rootLogger=ERROR,console
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target=System.err
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p 
%c{2}: %m%n
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/beam/blob/7867ce62/dsls/sql/src/test/java/org/beam/dsls/sql/planner/BasePlanner.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/beam/dsls/sql/planner/BasePlanner.java 
b/dsls/sql/src/test/java/org/beam/dsls/sql/planner/BasePlanner.java
new file mode 100644
index 0000000..56e45c4
--- /dev/null
+++ b/dsls/sql/src/test/java/org/beam/dsls/sql/planner/BasePlanner.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.beam.dsls.sql.planner;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelProtoDataType;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.beam.dsls.sql.schema.BaseBeamTable;
+import org.beam.dsls.sql.schema.kafka.BeamKafkaCSVTable;
+import org.junit.BeforeClass;
+
+/**
+ * Prepares a shared {@code BeamSqlRunner} for the planner tests by registering the
+ * tables the test queries refer to.
+ */
+public class BasePlanner {
+  public static BeamSqlRunner runner = new BeamSqlRunner();
+
+  @BeforeClass
+  public static void prepare() {
+    runner.addTable("ORDER_DETAILS", getTable());
+    runner.addTable("SUB_ORDER", getTable("127.0.0.1:9092", "sub_orders"));
+    runner.addTable("SUB_ORDER_RAM", getTable());
+  }
+
+  /** Row type shared by every test table: order_id, site_id, price, order_time. */
+  private static RelProtoDataType orderRowType() {
+    return new RelProtoDataType() {
+      @Override
+      public RelDataType apply(RelDataTypeFactory a0) {
+        return a0.builder()
+            .add("order_id", SqlTypeName.BIGINT)
+            .add("site_id", SqlTypeName.INTEGER)
+            .add("price", SqlTypeName.DOUBLE)
+            .add("order_time", SqlTypeName.TIMESTAMP)
+            .build();
+      }
+    };
+  }
+
+  /** Returns an in-memory mocked table with the shared order row type. */
+  private static BaseBeamTable getTable() {
+    return new MockedBeamSQLTable(orderRowType());
+  }
+
+  /**
+   * Returns a Kafka CSV table with the shared order row type.
+   *
+   * @param bootstrapServer Kafka bootstrap server address
+   * @param topic the single topic backing the table
+   */
+  public static BaseBeamTable getTable(String bootstrapServer, String topic) {
+    Map<String, Object> consumerPara = new HashMap<String, Object>();
+    consumerPara.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
+
+    return new BeamKafkaCSVTable(orderRowType(), bootstrapServer, Arrays.asList(topic))
+        .updateConsumerProperties(consumerPara);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/7867ce62/dsls/sql/src/test/java/org/beam/dsls/sql/planner/BeamPlannerExplainTest.java
----------------------------------------------------------------------
diff --git 
a/dsls/sql/src/test/java/org/beam/dsls/sql/planner/BeamPlannerExplainTest.java 
b/dsls/sql/src/test/java/org/beam/dsls/sql/planner/BeamPlannerExplainTest.java
new file mode 100644
index 0000000..a77878f
--- /dev/null
+++ 
b/dsls/sql/src/test/java/org/beam/dsls/sql/planner/BeamPlannerExplainTest.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.beam.dsls.sql.planner;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Tests that compare the explained plan of a query with the expected plan text.
+ */
+public class BeamPlannerExplainTest extends BasePlanner {
+
+  /** Explains {@code query} with the shared runner and checks the resulting plan. */
+  private static void assertPlan(String query, String expectedPlan) throws Exception {
+    String actualPlan = runner.explainQuery(query);
+    Assert.assertEquals("explain doesn't match", expectedPlan, actualPlan);
+  }
+
+  @Test
+  public void selectAll() throws Exception {
+    assertPlan(
+        "SELECT * FROM ORDER_DETAILS",
+        "BeamProjectRel(order_id=[$0], site_id=[$1], price=[$2], order_time=[$3])\n"
+            + "  BeamIOSourceRel(table=[[ORDER_DETAILS]])\n");
+  }
+
+  @Test
+  public void selectWithFilter() throws Exception {
+    assertPlan(
+        "SELECT "
+            + " order_id, site_id, price "
+            + "FROM ORDER_DETAILS "
+            + "WHERE SITE_ID = 0 and price > 20",
+        "BeamProjectRel(order_id=[$0], site_id=[$1], price=[$2])\n"
+            + "  BeamFilterRel(condition=[AND(=($1, 0), >($2, 20))])\n"
+            + "    BeamIOSourceRel(table=[[ORDER_DETAILS]])\n");
+  }
+
+  @Test
+  public void insertSelectFilter() throws Exception {
+    assertPlan(
+        "INSERT INTO SUB_ORDER(order_id, site_id, price) "
+            + "SELECT "
+            + " order_id, site_id, price "
+            + "FROM ORDER_DETAILS "
+            + "WHERE SITE_ID = 0 and price > 20",
+        "BeamIOSinkRel(table=[[SUB_ORDER]], operation=[INSERT], flattened=[true])\n"
+            + "  BeamProjectRel(order_id=[$0], site_id=[$1], price=[$2], order_time=[null])\n"
+            + "    BeamProjectRel(order_id=[$0], site_id=[$1], price=[$2])\n"
+            + "      BeamFilterRel(condition=[AND(=($1, 0), >($2, 20))])\n"
+            + "        BeamIOSourceRel(table=[[ORDER_DETAILS]])\n");
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/7867ce62/dsls/sql/src/test/java/org/beam/dsls/sql/planner/BeamPlannerSubmitTest.java
----------------------------------------------------------------------
diff --git 
a/dsls/sql/src/test/java/org/beam/dsls/sql/planner/BeamPlannerSubmitTest.java 
b/dsls/sql/src/test/java/org/beam/dsls/sql/planner/BeamPlannerSubmitTest.java
new file mode 100644
index 0000000..eb097a9
--- /dev/null
+++ 
b/dsls/sql/src/test/java/org/beam/dsls/sql/planner/BeamPlannerSubmitTest.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.beam.dsls.sql.planner;
+
+import org.apache.beam.sdk.Pipeline;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Tests that compile a query into a pipeline, run it, and check what reached the
+ * mocked sink table.
+ */
+public class BeamPlannerSubmitTest extends BasePlanner {
+  @Test
+  public void insertSelectFilter() throws Exception {
+    String sql = "INSERT INTO SUB_ORDER_RAM(order_id, site_id, price) SELECT "
+        + " order_id, site_id, price " + "FROM ORDER_DETAILS "
+        + "WHERE SITE_ID = 0 and price > 20";
+    Pipeline pipeline = runner.getPlanner().compileBeamPipeline(sql);
+    runner.getPlanner().planner.close();
+
+    pipeline.run().waitUntilFinish();
+
+    // assertEquals reports the actual size on failure, unlike assertTrue(size == 1).
+    Assert.assertEquals(1, MockedBeamSQLTable.CONTENT.size());
+    Assert.assertEquals("order_id=12345,site_id=0,price=20.5,order_time=null",
+        MockedBeamSQLTable.CONTENT.get(0));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/7867ce62/dsls/sql/src/test/java/org/beam/dsls/sql/planner/MockedBeamSQLTable.java
----------------------------------------------------------------------
diff --git 
a/dsls/sql/src/test/java/org/beam/dsls/sql/planner/MockedBeamSQLTable.java 
b/dsls/sql/src/test/java/org/beam/dsls/sql/planner/MockedBeamSQLTable.java
new file mode 100644
index 0000000..31f5578
--- /dev/null
+++ b/dsls/sql/src/test/java/org/beam/dsls/sql/planner/MockedBeamSQLTable.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.beam.dsls.sql.planner;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.calcite.rel.type.RelProtoDataType;
+import org.beam.dsls.sql.schema.BaseBeamTable;
+import org.beam.dsls.sql.schema.BeamIOType;
+import org.beam.dsls.sql.schema.BeamSQLRow;
+
+/**
+ * A mock table used to check input/output: reading yields three fixed rows, and
+ * writing stores each row's string form in {@link #CONTENT}.
+ */
+public class MockedBeamSQLTable extends BaseBeamTable {
+
+  private static final long serialVersionUID = 1373168368414036932L;
+
+  /** String form of every row written through {@link OutputStore}. */
+  public static final List<String> CONTENT = new ArrayList<>();
+
+  public MockedBeamSQLTable(RelProtoDataType protoRowType) {
+    super(protoRowType);
+  }
+
+  @Override
+  public BeamIOType getSourceType() {
+    return BeamIOType.UNBOUNDED;
+  }
+
+  @Override
+  public PTransform<? super PBegin, PCollection<BeamSQLRow>> buildIOReader() {
+    return Create.of(
+        newRow(12345L, 0, 10.5),
+        newRow(12345L, 1, 20.5),
+        newRow(12345L, 0, 20.5));
+  }
+
+  /** Builds one input row; the fourth field is set to the current time. */
+  private BeamSQLRow newRow(long orderId, int siteId, double price) {
+    BeamSQLRow row = new BeamSQLRow(beamSqlRecordType);
+    row.addField(0, orderId);
+    row.addField(1, siteId);
+    row.addField(2, price);
+    row.addField(3, new Date());
+    return row;
+  }
+
+  @Override
+  public PTransform<? super PCollection<BeamSQLRow>, PDone> buildIOWriter() {
+    return new OutputStore();
+  }
+
+  /**
+   * Keep output in {@code CONTENT} for validation.
+   */
+  public static class OutputStore extends PTransform<PCollection<BeamSQLRow>, PDone> {
+
+    @Override
+    public PDone expand(PCollection<BeamSQLRow> input) {
+      input.apply(ParDo.of(new DoFn<BeamSQLRow, Void>() {
+
+        // Start from an empty store so earlier runs do not leak into the assertions.
+        @Setup
+        public void setup() {
+          CONTENT.clear();
+        }
+
+        @ProcessElement
+        public void processElement(ProcessContext c) {
+          CONTENT.add(c.element().valueInString());
+        }
+
+      }));
+      return PDone.in(input.getPipeline());
+    }
+
+  }
+
+}

Reply via email to