MockedBeamSqlTable -> MockedBoundedTable

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/bc66698e
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/bc66698e
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/bc66698e

Branch: refs/heads/DSL_SQL
Commit: bc66698e6880c7788bcea78006c67bfca66b17ce
Parents: 2149719
Author: James Xu <xumingmi...@gmail.com>
Authored: Fri Jun 30 14:54:26 2017 +0800
Committer: Luke Cwik <lc...@google.com>
Committed: Wed Jul 5 09:33:53 2017 -0700

----------------------------------------------------------------------
 .../org/apache/beam/dsls/sql/TestUtils.java     |  81 +++++++---
 .../beam/dsls/sql/mock/MockedBoundedTable.java  | 126 +++++++++++++++
 .../apache/beam/dsls/sql/mock/MockedTable.java  |  42 +++++
 .../dsls/sql/mock/MockedUnboundedTable.java     | 113 +++++++++++++
 .../dsls/sql/planner/MockedBeamSqlTable.java    | 162 -------------------
 .../beam/dsls/sql/planner/MockedTable.java      |  33 ----
 .../dsls/sql/planner/MockedUnboundedTable.java  | 120 --------------
 .../beam/dsls/sql/rel/BeamIntersectRelTest.java |  78 ++++-----
 .../rel/BeamJoinRelBoundedVsBoundedTest.java    | 141 ++++++++--------
 .../rel/BeamJoinRelUnboundedVsBoundedTest.java  |  21 ++-
 .../BeamJoinRelUnboundedVsUnboundedTest.java    |  10 +-
 .../beam/dsls/sql/rel/BeamMinusRelTest.java     |  77 +++++----
 .../sql/rel/BeamSetOperatorRelBaseTest.java     |  68 +++-----
 .../beam/dsls/sql/rel/BeamSortRelTest.java      | 161 +++++++++---------
 .../beam/dsls/sql/rel/BeamUnionRelTest.java     |  47 +++---
 .../beam/dsls/sql/rel/BeamValuesRelTest.java    |  72 +++++----
 16 files changed, 691 insertions(+), 661 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/bc66698e/dsls/sql/src/test/java/org/apache/beam/dsls/sql/TestUtils.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/TestUtils.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/TestUtils.java
index 375027a..cfad333 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/TestUtils.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/TestUtils.java
@@ -20,7 +20,6 @@ package org.apache.beam.dsls.sql;
 
 import java.util.ArrayList;
 import java.util.List;
-
 import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
 import org.apache.beam.dsls.sql.schema.BeamSqlRow;
 import org.apache.beam.sdk.transforms.DoFn;
@@ -62,7 +61,7 @@ public class TestUtils {
    *   Types.INTEGER, "order_id",
    *   Types.INTEGER, "sum_site_id",
    *   Types.VARCHAR, "buyer"
-   * ).values(
+   * ).addRows(
    *   1, 3, "james",
    *   2, 5, "bond"
    *   ).getStringRows()
@@ -81,15 +80,7 @@ public class TestUtils {
      * @args pairs of column type and column names.
      */
     public static RowsBuilder of(final Object... args) {
-      List<Integer> types = new ArrayList<>();
-      List<String> names = new ArrayList<>();
-      int lastTypeIndex = 0;
-      for (; lastTypeIndex < args.length; lastTypeIndex += 2) {
-        types.add((int) args[lastTypeIndex]);
-        names.add((String) args[lastTypeIndex + 1]);
-      }
-
-      BeamSqlRecordType beamSQLRecordType = BeamSqlRecordType.create(names, types);
+      BeamSqlRecordType beamSQLRecordType = buildBeamSqlRecordType(args);
       RowsBuilder builder = new RowsBuilder();
       builder.type = beamSQLRecordType;
 
@@ -97,20 +88,12 @@ public class TestUtils {
     }
 
     /**
-     * Add values to the builder.
+     * Add rows to the builder.
      *
      * <p>Note: check the class javadoc for for detailed example.
      */
-    public RowsBuilder values(final Object... args) {
-      int fieldCount = type.size();
-      for (int i = 0; i < args.length; i += fieldCount) {
-        BeamSqlRow row = new BeamSqlRow(type);
-        for (int j = 0; j < fieldCount; j++) {
-          row.addField(j, args[i + j]);
-        }
-        this.rows.add(row);
-      }
-
+    public RowsBuilder addRows(final Object... args) {
+      this.rows.addAll(buildRows(type, args));
       return this;
     }
 
@@ -122,4 +105,58 @@ public class TestUtils {
       return beamSqlRows2Strings(rows);
     }
   }
+
+  /**
+   * Convenient way to build a {@code BeamSqlRecordType}.
+   *
+   * <p>e.g.
+   *
+   * <pre>{@code
+   *   buildBeamSqlRecordType(
+   *       Types.BIGINT, "order_id",
+   *       Types.INTEGER, "site_id",
+   *       Types.DOUBLE, "price",
+   *       Types.TIMESTAMP, "order_time"
+   *   )
+   * }</pre>
+   */
+  public static BeamSqlRecordType buildBeamSqlRecordType(Object... args) {
+    List<Integer> types = new ArrayList<>();
+    List<String> names = new ArrayList<>();
+
+    for (int i = 0; i < args.length - 1; i += 2) {
+      types.add((int) args[i]);
+      names.add((String) args[i + 1]);
+    }
+
+    return BeamSqlRecordType.create(names, types);
+  }
+
+  /**
+   * Convenient way to build a list of {@code BeamSqlRow}s.
+   *
+   * <p>e.g.
+   *
+   * <pre>{@code
+   *   buildRows(
+   *       recordType,
+   *       1, 1, 1, // the first row
+   *       2, 2, 2, // the second row
+   *       ...
+   *   )
+   * }</pre>
+   */
+  public static List<BeamSqlRow> buildRows(BeamSqlRecordType type, Object... args) {
+    List<BeamSqlRow> rows = new ArrayList<>();
+    int fieldCount = type.size();
+
+    for (int i = 0; i < args.length; i += fieldCount) {
+      BeamSqlRow row = new BeamSqlRow(type);
+      for (int j = 0; j < fieldCount; j++) {
+        row.addField(j, args[i + j]);
+      }
+      rows.add(row);
+    }
+    return rows;
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/bc66698e/dsls/sql/src/test/java/org/apache/beam/dsls/sql/mock/MockedBoundedTable.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/mock/MockedBoundedTable.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/mock/MockedBoundedTable.java
new file mode 100644
index 0000000..0fb8a80
--- /dev/null
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/mock/MockedBoundedTable.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.mock;
+
+import static org.apache.beam.dsls.sql.TestUtils.buildBeamSqlRecordType;
+import static org.apache.beam.dsls.sql.TestUtils.buildRows;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import org.apache.beam.dsls.sql.schema.BeamIOType;
+import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+
+/**
+ * Mocked table for bounded data sources.
+ */
+public class MockedBoundedTable extends MockedTable {
+  /** rows written to this table. */
+  private static final ConcurrentLinkedQueue<BeamSqlRow> CONTENT = new ConcurrentLinkedQueue<>();
+  /** rows flow out from this table. */
+  private final List<BeamSqlRow> rows = new ArrayList<>();
+
+  public MockedBoundedTable(BeamSqlRecordType beamSqlRecordType) {
+    super(beamSqlRecordType);
+  }
+
+  /**
+   * Convenient way to build a mocked bounded table.
+   *
+   * <p>e.g.
+   *
+   * <pre>{@code
+   * MockedBoundedTable
+   *   .of(Types.BIGINT, "order_id",
+   *       Types.INTEGER, "site_id",
+   *       Types.DOUBLE, "price",
+   *       Types.TIMESTAMP, "order_time")
+   * }</pre>
+   */
+  public static MockedBoundedTable of(final Object... args){
+    return new MockedBoundedTable(buildBeamSqlRecordType(args));
+  }
+
+
+  /**
+   * Add rows to this table.
+   *
+   * <p>Sample usage:
+   *
+   * <pre>{@code
+   * addRows(
+   *   1, 3, "james", -- first row
+   *   2, 5, "bond"   -- second row
+   *   ...
+   * )
+   * }</pre>
+   */
+  public MockedBoundedTable addRows(Object... args) {
+    List<BeamSqlRow> rows = buildRows(getRecordType(), args);
+    this.rows.addAll(rows);
+    return this;
+  }
+
+  @Override
+  public BeamIOType getSourceType() {
+    return BeamIOType.BOUNDED;
+  }
+
+  @Override
+  public PCollection<BeamSqlRow> buildIOReader(Pipeline pipeline) {
+    return PBegin.in(pipeline).apply(
+        "MockedBoundedTable_Reader_" + COUNTER.incrementAndGet(), Create.of(rows));
+  }
+
+  @Override public PTransform<? super PCollection<BeamSqlRow>, PDone> buildIOWriter() {
+    return new OutputStore();
+  }
+
+  /**
+   * Keep output in {@code CONTENT} for validation.
+   *
+   */
+  public static class OutputStore extends PTransform<PCollection<BeamSqlRow>, PDone> {
+
+    @Override
+    public PDone expand(PCollection<BeamSqlRow> input) {
+      input.apply(ParDo.of(new DoFn<BeamSqlRow, Void>() {
+        @ProcessElement
+        public void processElement(ProcessContext c) {
+          CONTENT.add(c.element());
+        }
+
+        @Teardown
+        public void close() {
+          CONTENT.clear();
+        }
+
+      }));
+      return PDone.in(input.getPipeline());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/bc66698e/dsls/sql/src/test/java/org/apache/beam/dsls/sql/mock/MockedTable.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/mock/MockedTable.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/mock/MockedTable.java
new file mode 100644
index 0000000..eed740a
--- /dev/null
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/mock/MockedTable.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.mock;
+
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.beam.dsls.sql.schema.BaseBeamTable;
+import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+
+/**
+ * Base class for mocked table.
+ */
+public abstract class MockedTable extends BaseBeamTable {
+  public static final AtomicInteger COUNTER = new AtomicInteger();
+  public MockedTable(BeamSqlRecordType beamSqlRecordType) {
+    super(beamSqlRecordType);
+  }
+
+  @Override
+  public PTransform<? super PCollection<BeamSqlRow>, PDone> buildIOWriter() {
+    throw new UnsupportedOperationException("buildIOWriter unsupported!");
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/bc66698e/dsls/sql/src/test/java/org/apache/beam/dsls/sql/mock/MockedUnboundedTable.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/mock/MockedUnboundedTable.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/mock/MockedUnboundedTable.java
new file mode 100644
index 0000000..12d8d37
--- /dev/null
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/mock/MockedUnboundedTable.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.mock;
+
+import static org.apache.beam.dsls.sql.TestUtils.buildBeamSqlRecordType;
+import static org.apache.beam.dsls.sql.TestUtils.buildRows;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.beam.dsls.sql.schema.BeamIOType;
+import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.testing.TestStream;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.calcite.util.Pair;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+/**
+ * A mocked unbounded table.
+ */
+public class MockedUnboundedTable extends MockedTable {
+  /** rows flow out from this table with the specified watermark instant. */
+  private final List<Pair<Duration, List<BeamSqlRow>>> timestampedRows = new ArrayList<>();
+  /** specify the index of column in the row which stands for the event time field. */
+  private int timestampField;
+  private MockedUnboundedTable(BeamSqlRecordType beamSqlRecordType) {
+    super(beamSqlRecordType);
+  }
+
+  /**
+   * Convenient way to build a mocked unbounded table.
+   *
+   * <p>e.g.
+   *
+   * <pre>{@code
+   * MockedUnboundedTable
+   *   .of(Types.BIGINT, "order_id",
+   *       Types.INTEGER, "site_id",
+   *       Types.DOUBLE, "price",
+   *       Types.TIMESTAMP, "order_time")
+   * }</pre>
+   */
+  public static MockedUnboundedTable of(final Object... args){
+    return new MockedUnboundedTable(buildBeamSqlRecordType(args));
+  }
+
+  public MockedUnboundedTable timestampColumnIndex(int idx) {
+    this.timestampField = idx;
+    return this;
+  }
+
+  /**
+   * Add rows to this table.
+   *
+   * <p>Sample usage:
+   *
+   * <pre>{@code
+   * addRows(
+   *   duration,      -- duration which stands for the corresponding watermark instant
+   *   1, 3, "james", -- first row
+   *   2, 5, "bond"   -- second row
+   *   ...
+   * )
+   * }</pre>
+   */
+  public MockedUnboundedTable addRows(Duration duration, Object... args) {
+    List<BeamSqlRow> rows = buildRows(getRecordType(), args);
+    // record the watermark + rows
+    this.timestampedRows.add(Pair.of(duration, rows));
+    return this;
+  }
+
+  @Override public BeamIOType getSourceType() {
+    return BeamIOType.UNBOUNDED;
+  }
+
+  @Override public PCollection<BeamSqlRow> buildIOReader(Pipeline pipeline) {
+    TestStream.Builder<BeamSqlRow> values = TestStream.create(
+        new BeamSqlRowCoder(beamSqlRecordType));
+
+    for (Pair<Duration, List<BeamSqlRow>> pair : timestampedRows) {
+      values = values.advanceWatermarkTo(new Instant(0).plus(pair.getKey()));
+      for (int i = 0; i < pair.getValue().size(); i++) {
+        values = values.addElements(TimestampedValue.of(pair.getValue().get(i),
+            new Instant(pair.getValue().get(i).getDate(timestampField))));
+      }
+    }
+
+    return pipeline.begin().apply(
+        "MockedUnboundedTable_" + COUNTER.incrementAndGet(),
+        values.advanceWatermarkToInfinity());
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/bc66698e/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedBeamSqlTable.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedBeamSqlTable.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedBeamSqlTable.java
deleted file mode 100644
index bb10369..0000000
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedBeamSqlTable.java
+++ /dev/null
@@ -1,162 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.dsls.sql.planner;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.beam.dsls.sql.schema.BaseBeamTable;
-import org.apache.beam.dsls.sql.schema.BeamIOType;
-import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
-import org.apache.beam.dsls.sql.schema.BeamSqlRow;
-import org.apache.beam.dsls.sql.utils.CalciteUtils;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.values.PBegin;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PDone;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rel.type.RelDataTypeFactory;
-import org.apache.calcite.rel.type.RelProtoDataType;
-import org.apache.calcite.sql.type.SqlTypeName;
-
-/**
- * Mocked table for bounded data sources.
- */
-public class MockedBeamSqlTable extends BaseBeamTable {
-  private static final AtomicInteger COUNTER = new AtomicInteger();
-  private static final ConcurrentLinkedQueue<BeamSqlRow> CONTENT = new ConcurrentLinkedQueue<>();
-  private List<BeamSqlRow> inputRecords;
-  public MockedBeamSqlTable(BeamSqlRecordType beamSqlRecordType) {
-    super(beamSqlRecordType);
-  }
-
-  /**
-   * Convenient way to build a mocked table with mock data:
-   *
-   * <p>e.g.
-   *
-   * <pre>{@code
-   * MockedBeamSqlTable
-   *   .of(SqlTypeName.BIGINT, "order_id",
-   *       SqlTypeName.INTEGER, "site_id",
-   *       SqlTypeName.DOUBLE, "price",
-   *       SqlTypeName.TIMESTAMP, "order_time",
-   *
-   *       1L, 2, 1.0, new Date(),
-   *       1L, 1, 2.0, new Date(),
-   *       2L, 4, 3.0, new Date(),
-   *       2L, 1, 4.0, new Date(),
-   *       5L, 5, 5.0, new Date(),
-   *       6L, 6, 6.0, new Date(),
-   *       7L, 7, 7.0, new Date(),
-   *       8L, 8888, 8.0, new Date(),
-   *       8L, 999, 9.0, new Date(),
-   *       10L, 100, 10.0, new Date())
-   * }</pre>
-   */
-  // FIXME: refactor this method
-  //        1) use Types rather than SqlTypeName
-  //        2) use RowsBuilder rather than duplicate the logic here
-  public static MockedBeamSqlTable of(final Object... args){
-    final RelProtoDataType protoRowType = new RelProtoDataType() {
-      @Override
-      public RelDataType apply(RelDataTypeFactory a0) {
-        RelDataTypeFactory.FieldInfoBuilder builder = a0.builder();
-
-        int lastTypeIndex = 0;
-        for (; lastTypeIndex < args.length; lastTypeIndex += 2) {
-          if (args[lastTypeIndex] instanceof SqlTypeName) {
-            builder.add(args[lastTypeIndex + 1].toString(),
-                (SqlTypeName) args[lastTypeIndex]);
-          } else {
-            break;
-          }
-        }
-        return builder.build();
-      }
-    };
-
-    List<BeamSqlRow> rows = new ArrayList<>();
-    BeamSqlRecordType beamSQLRecordType = CalciteUtils
-        .toBeamRecordType(protoRowType.apply(BeamQueryPlanner.TYPE_FACTORY));
-    int fieldCount = beamSQLRecordType.size();
-
-    for (int i = fieldCount * 2; i < args.length; i += fieldCount) {
-      BeamSqlRow row = new BeamSqlRow(beamSQLRecordType);
-      for (int j = 0; j < fieldCount; j++) {
-        row.addField(j, args[i + j]);
-      }
-      rows.add(row);
-    }
-    MockedBeamSqlTable table = new MockedBeamSqlTable(beamSQLRecordType);
-    table.inputRecords = rows;
-
-    return table;
-  }
-
-  @Override
-  public BeamIOType getSourceType() {
-    return BeamIOType.BOUNDED;
-  }
-
-  @Override
-
-  public PCollection<BeamSqlRow> buildIOReader(Pipeline pipeline) {
-    return PBegin.in(pipeline).apply(
-        "MockedBeamSQLTable_Reader_" + COUNTER.incrementAndGet(), Create.of(inputRecords));
-  }
-
-  @Override
-  public PTransform<? super PCollection<BeamSqlRow>, PDone> buildIOWriter() {
-    return new OutputStore();
-  }
-
-  public List<BeamSqlRow> getInputRecords() {
-    return inputRecords;
-  }
-
-  /**
-   * Keep output in {@code CONTENT} for validation.
-   *
-   */
-  public static class OutputStore extends PTransform<PCollection<BeamSqlRow>, PDone> {
-
-    @Override
-    public PDone expand(PCollection<BeamSqlRow> input) {
-      input.apply(ParDo.of(new DoFn<BeamSqlRow, Void>() {
-        @ProcessElement
-        public void processElement(ProcessContext c) {
-          CONTENT.add(c.element());
-        }
-
-        @Teardown
-        public void close() {
-
-        }
-
-      }));
-      return PDone.in(input.getPipeline());
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/bc66698e/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedTable.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedTable.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedTable.java
deleted file mode 100644
index d096a61..0000000
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedTable.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.dsls.sql.planner;
-
-import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.beam.dsls.sql.schema.BaseBeamTable;
-import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
-
-/**
- * Base class for mocked table.
- */
-public abstract class MockedTable extends BaseBeamTable {
-  public static final AtomicInteger COUNTER = new AtomicInteger();
-  public MockedTable(BeamSqlRecordType beamSqlRecordType) {
-    super(beamSqlRecordType);
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/bc66698e/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedUnboundedTable.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedUnboundedTable.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedUnboundedTable.java
deleted file mode 100644
index 3f22df3..0000000
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedUnboundedTable.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.dsls.sql.planner;
-
-import java.util.ArrayList;
-import java.util.List;
-import org.apache.beam.dsls.sql.schema.BeamIOType;
-import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
-import org.apache.beam.dsls.sql.schema.BeamSqlRow;
-import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.testing.TestStream;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PDone;
-import org.apache.beam.sdk.values.TimestampedValue;
-import org.apache.calcite.util.Pair;
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-
-/**
- * A mocked unbounded table.
- */
-public class MockedUnboundedTable extends MockedTable {
-  private List<Pair<Duration, List<BeamSqlRow>>> timestampedRows = new ArrayList<>();
-  private int timestampField;
-  private MockedUnboundedTable(BeamSqlRecordType beamSqlRecordType) {
-    super(beamSqlRecordType);
-  }
-
-  /**
-   * Convenient way to build a mocked table.
-   *
-   * <p>e.g.
-   *
-   * <pre>{@code
-   * MockedUnboundedTable
-   *   .of(Types.BIGINT, "order_id",
-   *       Types.INTEGER, "site_id",
-   *       Types.DOUBLE, "price",
-   *       Types.TIMESTAMP, "order_time")
-   * }</pre>
-   */
-  public static MockedUnboundedTable of(final Object... args){
-    List<Integer> types = new ArrayList<>();
-    List<String> names = new ArrayList<>();
-    int lastTypeIndex = 0;
-    for (; lastTypeIndex < args.length; lastTypeIndex += 2) {
-      types.add((int) args[lastTypeIndex]);
-      names.add((String) args[lastTypeIndex + 1]);
-    }
-
-    return new MockedUnboundedTable(
-        BeamSqlRecordType.create(names, types)
-    );
-  }
-
-  public MockedUnboundedTable timestampColumnIndex(int idx) {
-    this.timestampField = idx;
-    return this;
-  }
-
-  public MockedUnboundedTable addRows(Duration duration, Object... args) {
-    List<BeamSqlRow> rows = new ArrayList<>();
-    int fieldCount = getRecordType().size();
-
-    for (int i = 0; i < args.length; i += fieldCount) {
-      BeamSqlRow row = new BeamSqlRow(getRecordType());
-      for (int j = 0; j < fieldCount; j++) {
-        row.addField(j, args[i + j]);
-      }
-      rows.add(row);
-    }
-
-    // record the watermark + rows
-    this.timestampedRows.add(Pair.of(duration, rows));
-    return this;
-  }
-
-  @Override public BeamIOType getSourceType() {
-    return BeamIOType.UNBOUNDED;
-  }
-
-  @Override public PCollection<BeamSqlRow> buildIOReader(Pipeline pipeline) {
-    TestStream.Builder<BeamSqlRow> values = TestStream.create(
-        new BeamSqlRowCoder(beamSqlRecordType));
-
-    for (Pair<Duration, List<BeamSqlRow>> pair : timestampedRows) {
-      values = values.advanceWatermarkTo(new Instant(0).plus(pair.getKey()));
-      for (int i = 0; i < pair.getValue().size(); i++) {
-        values = values.addElements(TimestampedValue.of(pair.getValue().get(i),
-            new Instant(pair.getValue().get(i).getDate(timestampField))));
-      }
-    }
-
-    return pipeline.begin().apply(
-        "MockedUnboundedTable_" + COUNTER.incrementAndGet(),
-        values.advanceWatermarkToInfinity());
-  }
-
-  @Override public PTransform<? super PCollection<BeamSqlRow>, PDone> buildIOWriter() {
-    throw new UnsupportedOperationException("MockedUnboundedTable#buildIOWriter unsupported!");
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/bc66698e/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamIntersectRelTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamIntersectRelTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamIntersectRelTest.java
index 47fdc16..3b37143 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamIntersectRelTest.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamIntersectRelTest.java
@@ -18,14 +18,15 @@
 
 package org.apache.beam.dsls.sql.rel;
 
+import java.sql.Types;
 import org.apache.beam.dsls.sql.BeamSqlCli;
 import org.apache.beam.dsls.sql.BeamSqlEnv;
-import org.apache.beam.dsls.sql.planner.MockedBeamSqlTable;
+import org.apache.beam.dsls.sql.TestUtils;
+import org.apache.beam.dsls.sql.mock.MockedBoundedTable;
 import org.apache.beam.dsls.sql.schema.BeamSqlRow;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.values.PCollection;
-import org.apache.calcite.sql.type.SqlTypeName;
 import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
@@ -38,29 +39,33 @@ public class BeamIntersectRelTest {
 
   @Rule
   public final TestPipeline pipeline = TestPipeline.create();
-  private static MockedBeamSqlTable orderDetailsTable1 = MockedBeamSqlTable
-      .of(SqlTypeName.BIGINT, "order_id",
-          SqlTypeName.INTEGER, "site_id",
-          SqlTypeName.DOUBLE, "price",
-          1L, 1, 1.0,
-          1L, 1, 1.0,
-          2L, 2, 2.0,
-          4L, 4, 4.0
-      );
-
-  private static MockedBeamSqlTable orderDetailsTable2 = MockedBeamSqlTable
-      .of(SqlTypeName.BIGINT, "order_id",
-          SqlTypeName.INTEGER, "site_id",
-          SqlTypeName.DOUBLE, "price",
-          1L, 1, 1.0,
-          2L, 2, 2.0,
-          3L, 3, 3.0
-      );
 
   @BeforeClass
-  public static void setUp() {
-    sqlEnv.registerTable("ORDER_DETAILS1", orderDetailsTable1);
-    sqlEnv.registerTable("ORDER_DETAILS2", orderDetailsTable2);
+  public static void prepare() {
+    sqlEnv.registerTable("ORDER_DETAILS1",
+        MockedBoundedTable.of(
+            Types.BIGINT, "order_id",
+            Types.INTEGER, "site_id",
+            Types.DOUBLE, "price"
+        ).addRows(
+            1L, 1, 1.0,
+            1L, 1, 1.0,
+            2L, 2, 2.0,
+            4L, 4, 4.0
+        )
+    );
+
+    sqlEnv.registerTable("ORDER_DETAILS2",
+        MockedBoundedTable.of(
+            Types.BIGINT, "order_id",
+            Types.INTEGER, "site_id",
+            Types.DOUBLE, "price"
+        ).addRows(
+            1L, 1, 1.0,
+            2L, 2, 2.0,
+            3L, 3, 3.0
+        )
+    );
   }
 
   @Test
@@ -74,14 +79,14 @@ public class BeamIntersectRelTest {
 
     PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
     PAssert.that(rows).containsInAnyOrder(
-        MockedBeamSqlTable.of(
-        SqlTypeName.BIGINT, "order_id",
-        SqlTypeName.INTEGER, "site_id",
-        SqlTypeName.DOUBLE, "price",
-
-        1L, 1, 1.0,
-        2L, 2, 2.0
-    ).getInputRecords());
+        TestUtils.RowsBuilder.of(
+            Types.BIGINT, "order_id",
+            Types.INTEGER, "site_id",
+            Types.DOUBLE, "price"
+        ).addRows(
+            1L, 1, 1.0,
+            2L, 2, 2.0
+        ).getRows());
 
     pipeline.run().waitUntilFinish();
   }
@@ -99,14 +104,15 @@ public class BeamIntersectRelTest {
     PAssert.that(rows).satisfies(new CheckSize(3));
 
     PAssert.that(rows).containsInAnyOrder(
-        MockedBeamSqlTable.of(
-            SqlTypeName.BIGINT, "order_id",
-            SqlTypeName.INTEGER, "site_id",
-            SqlTypeName.DOUBLE, "price",
+        TestUtils.RowsBuilder.of(
+            Types.BIGINT, "order_id",
+            Types.INTEGER, "site_id",
+            Types.DOUBLE, "price"
+        ).addRows(
             1L, 1, 1.0,
             1L, 1, 1.0,
             2L, 2, 2.0
-        ).getInputRecords());
+        ).getRows());
 
     pipeline.run();
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/bc66698e/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamJoinRelBoundedVsBoundedTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamJoinRelBoundedVsBoundedTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamJoinRelBoundedVsBoundedTest.java
index 505b742..d15cb81 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamJoinRelBoundedVsBoundedTest.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamJoinRelBoundedVsBoundedTest.java
@@ -18,14 +18,15 @@
 
 package org.apache.beam.dsls.sql.rel;
 
+import java.sql.Types;
 import org.apache.beam.dsls.sql.BeamSqlCli;
 import org.apache.beam.dsls.sql.BeamSqlEnv;
-import org.apache.beam.dsls.sql.planner.MockedBeamSqlTable;
+import org.apache.beam.dsls.sql.TestUtils;
+import org.apache.beam.dsls.sql.mock.MockedBoundedTable;
 import org.apache.beam.dsls.sql.schema.BeamSqlRow;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.values.PCollection;
-import org.apache.calcite.sql.type.SqlTypeName;
 import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
@@ -41,24 +42,28 @@ public class BeamJoinRelBoundedVsBoundedTest {
   @BeforeClass
   public static void prepare() {
     beamSqlEnv.registerTable("ORDER_DETAILS",
-        MockedBeamSqlTable
-            .of(SqlTypeName.INTEGER, "order_id",
-                SqlTypeName.INTEGER, "site_id",
-                SqlTypeName.INTEGER, "price",
-
-                1, 2, 3,
-                2, 3, 3,
-                3, 4, 5));
+        MockedBoundedTable.of(
+            Types.INTEGER, "order_id",
+            Types.INTEGER, "site_id",
+            Types.INTEGER, "price"
+        ).addRows(
+            1, 2, 3,
+            2, 3, 3,
+            3, 4, 5
+        )
+    );
 
     beamSqlEnv.registerTable("ORDER_DETAILS0",
-        MockedBeamSqlTable
-            .of(SqlTypeName.INTEGER, "order_id0",
-                SqlTypeName.INTEGER, "site_id0",
-                SqlTypeName.INTEGER, "price0",
-
-                1, 2, 3,
-                2, 3, 3,
-                3, 4, 5));
+        MockedBoundedTable.of(
+            Types.INTEGER, "order_id0",
+            Types.INTEGER, "site_id0",
+            Types.INTEGER, "price0"
+        ).addRows(
+            1, 2, 3,
+            2, 3, 3,
+            3, 4, 5
+        )
+    );
 
   }
 
@@ -73,16 +78,17 @@ public class BeamJoinRelBoundedVsBoundedTest {
         ;
 
     PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
-    PAssert.that(rows).containsInAnyOrder(MockedBeamSqlTable.of(
-        SqlTypeName.INTEGER, "order_id",
-        SqlTypeName.INTEGER, "site_id",
-        SqlTypeName.INTEGER, "price",
-        SqlTypeName.INTEGER, "order_id0",
-        SqlTypeName.INTEGER, "site_id0",
-        SqlTypeName.INTEGER, "price0",
-
-        2, 3, 3, 1, 2, 3
-        ).getInputRecords());
+    PAssert.that(rows).containsInAnyOrder(
+        TestUtils.RowsBuilder.of(
+            Types.INTEGER, "order_id",
+            Types.INTEGER, "site_id",
+            Types.INTEGER, "price",
+            Types.INTEGER, "order_id0",
+            Types.INTEGER, "site_id0",
+            Types.INTEGER, "price0"
+        ).addRows(
+            2, 3, 3, 1, 2, 3
+        ).getRows());
     pipeline.run();
   }
 
@@ -98,18 +104,19 @@ public class BeamJoinRelBoundedVsBoundedTest {
 
     PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
     pipeline.enableAbandonedNodeEnforcement(false);
-    PAssert.that(rows).containsInAnyOrder(MockedBeamSqlTable.of(
-        SqlTypeName.INTEGER, "order_id",
-        SqlTypeName.INTEGER, "site_id",
-        SqlTypeName.INTEGER, "price",
-        SqlTypeName.INTEGER, "order_id0",
-        SqlTypeName.INTEGER, "site_id0",
-        SqlTypeName.INTEGER, "price0",
-
-        1, 2, 3, null, null, null,
-        2, 3, 3, 1, 2, 3,
-        3, 4, 5, null, null, null
-    ).getInputRecords());
+    PAssert.that(rows).containsInAnyOrder(
+        TestUtils.RowsBuilder.of(
+            Types.INTEGER, "order_id",
+            Types.INTEGER, "site_id",
+            Types.INTEGER, "price",
+            Types.INTEGER, "order_id0",
+            Types.INTEGER, "site_id0",
+            Types.INTEGER, "price0"
+        ).addRows(
+            1, 2, 3, null, null, null,
+            2, 3, 3, 1, 2, 3,
+            3, 4, 5, null, null, null
+        ).getRows());
     pipeline.run();
   }
 
@@ -124,18 +131,19 @@ public class BeamJoinRelBoundedVsBoundedTest {
         ;
 
     PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
-    PAssert.that(rows).containsInAnyOrder(MockedBeamSqlTable.of(
-        SqlTypeName.INTEGER, "order_id",
-        SqlTypeName.INTEGER, "site_id",
-        SqlTypeName.INTEGER, "price",
-        SqlTypeName.INTEGER, "order_id0",
-        SqlTypeName.INTEGER, "site_id0",
-        SqlTypeName.INTEGER, "price0",
-
-        2, 3, 3, 1, 2, 3,
-        null, null, null, 2, 3, 3,
-        null, null, null, 3, 4, 5
-    ).getInputRecords());
+    PAssert.that(rows).containsInAnyOrder(
+        TestUtils.RowsBuilder.of(
+            Types.INTEGER, "order_id",
+            Types.INTEGER, "site_id",
+            Types.INTEGER, "price",
+            Types.INTEGER, "order_id0",
+            Types.INTEGER, "site_id0",
+            Types.INTEGER, "price0"
+        ).addRows(
+            2, 3, 3, 1, 2, 3,
+            null, null, null, 2, 3, 3,
+            null, null, null, 3, 4, 5
+        ).getRows());
     pipeline.run();
   }
 
@@ -150,20 +158,21 @@ public class BeamJoinRelBoundedVsBoundedTest {
         ;
 
     PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
-    PAssert.that(rows).containsInAnyOrder(MockedBeamSqlTable.of(
-        SqlTypeName.INTEGER, "order_id",
-        SqlTypeName.INTEGER, "site_id",
-        SqlTypeName.INTEGER, "price",
-        SqlTypeName.INTEGER, "order_id0",
-        SqlTypeName.INTEGER, "site_id0",
-        SqlTypeName.INTEGER, "price0",
-
-        2, 3, 3, 1, 2, 3,
-        1, 2, 3, null, null, null,
-        3, 4, 5, null, null, null,
-        null, null, null, 2, 3, 3,
-        null, null, null, 3, 4, 5
-    ).getInputRecords());
+    PAssert.that(rows).containsInAnyOrder(
+        TestUtils.RowsBuilder.of(
+          Types.INTEGER, "order_id",
+          Types.INTEGER, "site_id",
+          Types.INTEGER, "price",
+          Types.INTEGER, "order_id0",
+          Types.INTEGER, "site_id0",
+          Types.INTEGER, "price0"
+        ).addRows(
+          2, 3, 3, 1, 2, 3,
+          1, 2, 3, null, null, null,
+          3, 4, 5, null, null, null,
+          null, null, null, 2, 3, 3,
+          null, null, null, 3, 4, 5
+        ).getRows());
     pipeline.run();
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/bc66698e/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamJoinRelUnboundedVsBoundedTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamJoinRelUnboundedVsBoundedTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamJoinRelUnboundedVsBoundedTest.java
index 2ddb00b..3f0c98e 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamJoinRelUnboundedVsBoundedTest.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamJoinRelUnboundedVsBoundedTest.java
@@ -23,15 +23,14 @@ import java.util.Date;
 import org.apache.beam.dsls.sql.BeamSqlCli;
 import org.apache.beam.dsls.sql.BeamSqlEnv;
 import org.apache.beam.dsls.sql.TestUtils;
-import org.apache.beam.dsls.sql.planner.MockedBeamSqlTable;
-import org.apache.beam.dsls.sql.planner.MockedUnboundedTable;
+import org.apache.beam.dsls.sql.mock.MockedBoundedTable;
+import org.apache.beam.dsls.sql.mock.MockedUnboundedTable;
 import org.apache.beam.dsls.sql.schema.BeamSqlRow;
 import org.apache.beam.dsls.sql.transform.BeamSqlOutputToConsoleFn;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.values.PCollection;
-import org.apache.calcite.sql.type.SqlTypeName;
 import org.joda.time.Duration;
 import org.junit.BeforeClass;
 import org.junit.Rule;
@@ -79,10 +78,10 @@ public class BeamJoinRelUnboundedVsBoundedTest {
         )
     );
 
-    beamSqlEnv.registerTable("ORDER_DETAILS1", MockedBeamSqlTable
-        .of(SqlTypeName.INTEGER, "order_id",
-            SqlTypeName.VARCHAR, "buyer",
-
+    beamSqlEnv.registerTable("ORDER_DETAILS1", MockedBoundedTable
+        .of(Types.INTEGER, "order_id",
+            Types.VARCHAR, "buyer"
+        ).addRows(
             1, "james",
             2, "bond"
         ));
@@ -106,7 +105,7 @@ public class BeamJoinRelUnboundedVsBoundedTest {
                 Types.INTEGER, "order_id",
                 Types.INTEGER, "sum_site_id",
                 Types.VARCHAR, "buyer"
-            ).values(
+            ).addRows(
                 1, 3, "james",
                 2, 5, "bond"
             ).getStringRows()
@@ -132,7 +131,7 @@ public class BeamJoinRelUnboundedVsBoundedTest {
                 Types.INTEGER, "order_id",
                 Types.INTEGER, "sum_site_id",
                 Types.VARCHAR, "buyer"
-            ).values(
+            ).addRows(
                 1, 3, "james",
                 2, 5, "bond"
             ).getStringRows()
@@ -159,7 +158,7 @@ public class BeamJoinRelUnboundedVsBoundedTest {
                 Types.INTEGER, "order_id",
                 Types.INTEGER, "sum_site_id",
                 Types.VARCHAR, "buyer"
-            ).values(
+            ).addRows(
                 1, 3, "james",
                 2, 5, "bond",
                 3, 3, null
@@ -200,7 +199,7 @@ public class BeamJoinRelUnboundedVsBoundedTest {
                 Types.INTEGER, "order_id",
                 Types.INTEGER, "sum_site_id",
                 Types.VARCHAR, "buyer"
-            ).values(
+            ).addRows(
                 1, 3, "james",
                 2, 5, "bond",
                 3, 3, null

http://git-wip-us.apache.org/repos/asf/beam/blob/bc66698e/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamJoinRelUnboundedVsUnboundedTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamJoinRelUnboundedVsUnboundedTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamJoinRelUnboundedVsUnboundedTest.java
index 18a5f60..d76e875 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamJoinRelUnboundedVsUnboundedTest.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamJoinRelUnboundedVsUnboundedTest.java
@@ -23,7 +23,7 @@ import java.util.Date;
 import org.apache.beam.dsls.sql.BeamSqlCli;
 import org.apache.beam.dsls.sql.BeamSqlEnv;
 import org.apache.beam.dsls.sql.TestUtils;
-import org.apache.beam.dsls.sql.planner.MockedUnboundedTable;
+import org.apache.beam.dsls.sql.mock.MockedUnboundedTable;
 import org.apache.beam.dsls.sql.schema.BeamSqlRow;
 import org.apache.beam.dsls.sql.transform.BeamSqlOutputToConsoleFn;
 import org.apache.beam.sdk.testing.PAssert;
@@ -95,7 +95,7 @@ public class BeamJoinRelUnboundedVsUnboundedTest {
                 Types.INTEGER, "order_id",
                 Types.INTEGER, "sum_site_id",
                 Types.INTEGER, "order_id0",
-                Types.INTEGER, "sum_site_id0").values(
+                Types.INTEGER, "sum_site_id0").addRows(
                 1, 3, 1, 3,
                 2, 5, 2, 5
             ).getStringRows()
@@ -129,7 +129,7 @@ public class BeamJoinRelUnboundedVsUnboundedTest {
                 Types.INTEGER, "sum_site_id",
                 Types.INTEGER, "order_id0",
                 Types.INTEGER, "sum_site_id0"
-            ).values(
+            ).addRows(
                 1, 1, 1, 3,
                 2, 2, null, null,
                 2, 2, 2, 5,
@@ -159,7 +159,7 @@ public class BeamJoinRelUnboundedVsUnboundedTest {
                 Types.INTEGER, "sum_site_id",
                 Types.INTEGER, "order_id0",
                 Types.INTEGER, "sum_site_id0"
-            ).values(
+            ).addRows(
                 1, 3, 1, 1,
                 null, null, 2, 2,
                 2, 5, 2, 2,
@@ -190,7 +190,7 @@ public class BeamJoinRelUnboundedVsUnboundedTest {
                 Types.INTEGER, "sum_site_id",
                 Types.INTEGER, "order_id",
                 Types.INTEGER, "sum_site_id0"
-            ).values(
+            ).addRows(
                 1, 1, 1, 3,
                 6, 2, null, null,
                 7, 2, null, null,

http://git-wip-us.apache.org/repos/asf/beam/blob/bc66698e/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamMinusRelTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamMinusRelTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamMinusRelTest.java
index bb5e7ee..80da8fb 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamMinusRelTest.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamMinusRelTest.java
@@ -18,15 +18,16 @@
 
 package org.apache.beam.dsls.sql.rel;
 
+import java.sql.Types;
 import org.apache.beam.dsls.sql.BeamSqlCli;
 import org.apache.beam.dsls.sql.BeamSqlEnv;
-import org.apache.beam.dsls.sql.planner.MockedBeamSqlTable;
+import org.apache.beam.dsls.sql.TestUtils;
+import org.apache.beam.dsls.sql.mock.MockedBoundedTable;
 import org.apache.beam.dsls.sql.schema.BeamSqlRow;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.values.PCollection;
-import org.apache.calcite.sql.type.SqlTypeName;
-import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
 
@@ -38,30 +39,34 @@ public class BeamMinusRelTest {
 
   @Rule
   public final TestPipeline pipeline = TestPipeline.create();
-  private MockedBeamSqlTable orderDetailsTable1 = MockedBeamSqlTable
-      .of(SqlTypeName.BIGINT, "order_id",
-          SqlTypeName.INTEGER, "site_id",
-          SqlTypeName.DOUBLE, "price",
-          1L, 1, 1.0,
-          1L, 1, 1.0,
-          2L, 2, 2.0,
-          4L, 4, 4.0,
-          4L, 4, 4.0
-      );
 
-  private MockedBeamSqlTable orderDetailsTable2 = MockedBeamSqlTable
-      .of(SqlTypeName.BIGINT, "order_id",
-          SqlTypeName.INTEGER, "site_id",
-          SqlTypeName.DOUBLE, "price",
-          1L, 1, 1.0,
-          2L, 2, 2.0,
-          3L, 3, 3.0
-      );
+  @BeforeClass
+  public static void prepare() {
+    sqlEnv.registerTable("ORDER_DETAILS1",
+        MockedBoundedTable.of(
+            Types.BIGINT, "order_id",
+            Types.INTEGER, "site_id",
+            Types.DOUBLE, "price"
+        ).addRows(
+            1L, 1, 1.0,
+            1L, 1, 1.0,
+            2L, 2, 2.0,
+            4L, 4, 4.0,
+            4L, 4, 4.0
+        )
+    );
 
-  @Before
-  public void setUp() {
-    sqlEnv.registerTable("ORDER_DETAILS1", orderDetailsTable1);
-    sqlEnv.registerTable("ORDER_DETAILS2", orderDetailsTable2);
+    sqlEnv.registerTable("ORDER_DETAILS2",
+        MockedBoundedTable.of(
+            Types.BIGINT, "order_id",
+            Types.INTEGER, "site_id",
+            Types.DOUBLE, "price"
+        ).addRows(
+            1L, 1, 1.0,
+            2L, 2, 2.0,
+            3L, 3, 3.0
+        )
+    );
   }
 
   @Test
@@ -75,12 +80,13 @@ public class BeamMinusRelTest {
 
     PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
     PAssert.that(rows).containsInAnyOrder(
-        MockedBeamSqlTable.of(
-        SqlTypeName.BIGINT, "order_id",
-        SqlTypeName.INTEGER, "site_id",
-        SqlTypeName.DOUBLE, "price",
+        TestUtils.RowsBuilder.of(
+            Types.BIGINT, "order_id",
+            Types.INTEGER, "site_id",
+            Types.DOUBLE, "price"
+        ).addRows(
             4L, 4, 4.0
-    ).getInputRecords());
+        ).getRows());
 
     pipeline.run();
   }
@@ -98,13 +104,14 @@ public class BeamMinusRelTest {
     PAssert.that(rows).satisfies(new CheckSize(2));
 
     PAssert.that(rows).containsInAnyOrder(
-        MockedBeamSqlTable.of(
-            SqlTypeName.BIGINT, "order_id",
-            SqlTypeName.INTEGER, "site_id",
-            SqlTypeName.DOUBLE, "price",
+        TestUtils.RowsBuilder.of(
+            Types.BIGINT, "order_id",
+            Types.INTEGER, "site_id",
+            Types.DOUBLE, "price"
+        ).addRows(
             4L, 4, 4.0,
             4L, 4, 4.0
-        ).getInputRecords());
+        ).getRows());
 
     pipeline.run();
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/bc66698e/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamSetOperatorRelBaseTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamSetOperatorRelBaseTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamSetOperatorRelBaseTest.java
index f10a767..d0b01df 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamSetOperatorRelBaseTest.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamSetOperatorRelBaseTest.java
@@ -18,22 +18,19 @@
 
 package org.apache.beam.dsls.sql.rel;
 
-import java.util.ArrayList;
+import java.sql.Types;
 import java.util.Date;
-import java.util.List;
-
 import org.apache.beam.dsls.sql.BeamSqlCli;
 import org.apache.beam.dsls.sql.BeamSqlEnv;
-import org.apache.beam.dsls.sql.planner.MockedBeamSqlTable;
+import org.apache.beam.dsls.sql.TestUtils;
+import org.apache.beam.dsls.sql.mock.MockedBoundedTable;
 import org.apache.beam.dsls.sql.schema.BeamSqlRow;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.values.PCollection;
-import org.apache.calcite.sql.type.SqlTypeName;
 import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
@@ -46,20 +43,21 @@ public class BeamSetOperatorRelBaseTest {
 
   @Rule
   public final TestPipeline pipeline = TestPipeline.create();
-  public static final Date THE_DATE = new Date();
-  private static MockedBeamSqlTable orderDetailsTable = MockedBeamSqlTable
-      .of(SqlTypeName.BIGINT, "order_id",
-          SqlTypeName.INTEGER, "site_id",
-          SqlTypeName.DOUBLE, "price",
-          SqlTypeName.TIMESTAMP, "order_time",
-
-          1L, 1, 1.0, THE_DATE,
-          2L, 2, 2.0, THE_DATE);
+  public static final Date THE_DATE = new Date(100000);
 
   @BeforeClass
   public static void prepare() {
-    THE_DATE.setTime(100000);
-    sqlEnv.registerTable("ORDER_DETAILS", orderDetailsTable);
+    sqlEnv.registerTable("ORDER_DETAILS",
+        MockedBoundedTable.of(
+            Types.BIGINT, "order_id",
+            Types.INTEGER, "site_id",
+            Types.DOUBLE, "price",
+            Types.TIMESTAMP, "order_time"
+        ).addRows(
+            1L, 1, 1.0, THE_DATE,
+            2L, 2, 2.0, THE_DATE
+        )
+    );
   }
 
   @Test
@@ -74,17 +72,17 @@ public class BeamSetOperatorRelBaseTest {
         + ", TUMBLE(order_time, INTERVAL '1' HOUR) ";
 
     PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
-    List<BeamSqlRow> expRows =
-        MockedBeamSqlTable.of(
-        SqlTypeName.BIGINT, "order_id",
-        SqlTypeName.INTEGER, "site_id",
-        SqlTypeName.BIGINT, "cnt",
-
-        1L, 1, 1L,
-        2L, 2, 1L
-    ).getInputRecords();
     // compare valueInString to ignore the windowStart & windowEnd
-    PAssert.that(rows.apply(ParDo.of(new ToString()))).containsInAnyOrder(toString(expRows));
+    PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn())))
+        .containsInAnyOrder(
+            TestUtils.RowsBuilder.of(
+                Types.BIGINT, "order_id",
+                Types.INTEGER, "site_id",
+                Types.BIGINT, "cnt"
+            ).addRows(
+                1L, 1, 1L,
+                2L, 2, 1L
+            ).getStringRows());
     pipeline.run();
   }
 
@@ -105,20 +103,4 @@ public class BeamSetOperatorRelBaseTest {
     BeamSqlCli.compilePipeline(sql, pipeline1, sqlEnv);
     pipeline.run();
   }
-
-  static class ToString extends DoFn<BeamSqlRow, String> {
-    @ProcessElement
-    public void processElement(ProcessContext ctx) {
-      ctx.output(ctx.element().valueInString());
-    }
-  }
-
-  static List<String> toString (List<BeamSqlRow> rows) {
-    List<String> strs = new ArrayList<>();
-    for (BeamSqlRow row : rows) {
-      strs.add(row.valueInString());
-    }
-
-    return strs;
-  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/bc66698e/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamSortRelTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamSortRelTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamSortRelTest.java
index d5c18fc..1067926 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamSortRelTest.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamSortRelTest.java
@@ -18,15 +18,16 @@
 
 package org.apache.beam.dsls.sql.rel;
 
+import java.sql.Types;
 import java.util.Date;
 import org.apache.beam.dsls.sql.BeamSqlCli;
 import org.apache.beam.dsls.sql.BeamSqlEnv;
-import org.apache.beam.dsls.sql.planner.MockedBeamSqlTable;
+import org.apache.beam.dsls.sql.TestUtils;
+import org.apache.beam.dsls.sql.mock.MockedBoundedTable;
 import org.apache.beam.dsls.sql.schema.BeamSqlRow;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.values.PCollection;
-import org.apache.calcite.sql.type.SqlTypeName;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
@@ -40,27 +41,35 @@ public class BeamSortRelTest {
   @Rule
   public final TestPipeline pipeline = TestPipeline.create();
 
-  private static MockedBeamSqlTable subOrderRamTable = MockedBeamSqlTable.of(
-      SqlTypeName.BIGINT, "order_id",
-      SqlTypeName.INTEGER, "site_id",
-      SqlTypeName.DOUBLE, "price");
-
-  private static MockedBeamSqlTable orderDetailTable = MockedBeamSqlTable
-      .of(SqlTypeName.BIGINT, "order_id",
-          SqlTypeName.INTEGER, "site_id",
-          SqlTypeName.DOUBLE, "price",
-          SqlTypeName.TIMESTAMP, "order_time",
-
-          1L, 2, 1.0, new Date(),
-          1L, 1, 2.0, new Date(),
-          2L, 4, 3.0, new Date(),
-          2L, 1, 4.0, new Date(),
-          5L, 5, 5.0, new Date(),
-          6L, 6, 6.0, new Date(),
-          7L, 7, 7.0, new Date(),
-          8L, 8888, 8.0, new Date(),
-          8L, 999, 9.0, new Date(),
-          10L, 100, 10.0, new Date());
+  @Before
+  public void prepare() {
+    sqlEnv.registerTable("ORDER_DETAILS",
+        MockedBoundedTable.of(
+            Types.BIGINT, "order_id",
+            Types.INTEGER, "site_id",
+            Types.DOUBLE, "price",
+            Types.TIMESTAMP, "order_time"
+        ).addRows(
+            1L, 2, 1.0, new Date(),
+            1L, 1, 2.0, new Date(),
+            2L, 4, 3.0, new Date(),
+            2L, 1, 4.0, new Date(),
+            5L, 5, 5.0, new Date(),
+            6L, 6, 6.0, new Date(),
+            7L, 7, 7.0, new Date(),
+            8L, 8888, 8.0, new Date(),
+            8L, 999, 9.0, new Date(),
+            10L, 100, 10.0, new Date()
+        )
+    );
+    sqlEnv.registerTable("SUB_ORDER_RAM",
+        MockedBoundedTable.of(
+            Types.BIGINT, "order_id",
+            Types.INTEGER, "site_id",
+            Types.DOUBLE, "price"
+        )
+    );
+  }
 
   @Test
   public void testOrderBy_basic() throws Exception {
@@ -70,34 +79,38 @@ public class BeamSortRelTest {
         + "ORDER BY order_id asc, site_id desc limit 4";
 
     PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
-    PAssert.that(rows).containsInAnyOrder(MockedBeamSqlTable.of(
-        SqlTypeName.BIGINT, "order_id",
-        SqlTypeName.INTEGER, "site_id",
-        SqlTypeName.DOUBLE, "price",
+    PAssert.that(rows).containsInAnyOrder(TestUtils.RowsBuilder.of(
+        Types.BIGINT, "order_id",
+        Types.INTEGER, "site_id",
+        Types.DOUBLE, "price"
+    ).addRows(
         1L, 2, 1.0,
         1L, 1, 2.0,
         2L, 4, 3.0,
         2L, 1, 4.0
-    ).getInputRecords());
+    ).getRows());
     pipeline.run().waitUntilFinish();
   }
 
   @Test
   public void testOrderBy_nullsFirst() throws Exception {
-    sqlEnv.registerTable("ORDER_DETAILS", MockedBeamSqlTable
-        .of(SqlTypeName.BIGINT, "order_id",
-            SqlTypeName.INTEGER, "site_id",
-            SqlTypeName.DOUBLE, "price",
-
+    sqlEnv.registerTable("ORDER_DETAILS",
+        MockedBoundedTable.of(
+            Types.BIGINT, "order_id",
+            Types.INTEGER, "site_id",
+            Types.DOUBLE, "price"
+        ).addRows(
             1L, 2, 1.0,
             1L, null, 2.0,
             2L, 1, 3.0,
             2L, null, 4.0,
-            5L, 5, 5.0));
-    sqlEnv.registerTable("SUB_ORDER_RAM", MockedBeamSqlTable
-        .of(SqlTypeName.BIGINT, "order_id",
-            SqlTypeName.INTEGER, "site_id",
-            SqlTypeName.DOUBLE, "price"));
+            5L, 5, 5.0
+        )
+    );
+    sqlEnv.registerTable("SUB_ORDER_RAM", MockedBoundedTable
+        .of(Types.BIGINT, "order_id",
+            Types.INTEGER, "site_id",
+            Types.DOUBLE, "price"));
 
     String sql = "INSERT INTO SUB_ORDER_RAM(order_id, site_id, price)  SELECT "
         + " order_id, site_id, price "
@@ -106,36 +119,36 @@ public class BeamSortRelTest {
 
     PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
     PAssert.that(rows).containsInAnyOrder(
-        MockedBeamSqlTable.of(
-            SqlTypeName.BIGINT, "order_id",
-            SqlTypeName.INTEGER, "site_id",
-            SqlTypeName.DOUBLE, "price",
-
+        TestUtils.RowsBuilder.of(
+            Types.BIGINT, "order_id",
+            Types.INTEGER, "site_id",
+            Types.DOUBLE, "price"
+        ).addRows(
             1L, null, 2.0,
             1L, 2, 1.0,
             2L, null, 4.0,
             2L, 1, 3.0
-        ).getInputRecords()
+        ).getRows()
     );
     pipeline.run().waitUntilFinish();
   }
 
   @Test
   public void testOrderBy_nullsLast() throws Exception {
-    sqlEnv.registerTable("ORDER_DETAILS", MockedBeamSqlTable
-        .of(SqlTypeName.BIGINT, "order_id",
-            SqlTypeName.INTEGER, "site_id",
-            SqlTypeName.DOUBLE, "price",
-
+    sqlEnv.registerTable("ORDER_DETAILS", MockedBoundedTable
+        .of(Types.BIGINT, "order_id",
+            Types.INTEGER, "site_id",
+            Types.DOUBLE, "price"
+        ).addRows(
             1L, 2, 1.0,
             1L, null, 2.0,
             2L, 1, 3.0,
             2L, null, 4.0,
             5L, 5, 5.0));
-    sqlEnv.registerTable("SUB_ORDER_RAM", MockedBeamSqlTable
-        .of(SqlTypeName.BIGINT, "order_id",
-            SqlTypeName.INTEGER, "site_id",
-            SqlTypeName.DOUBLE, "price"));
+    sqlEnv.registerTable("SUB_ORDER_RAM", MockedBoundedTable
+        .of(Types.BIGINT, "order_id",
+            Types.INTEGER, "site_id",
+            Types.DOUBLE, "price"));
 
     String sql = "INSERT INTO SUB_ORDER_RAM(order_id, site_id, price)  SELECT "
         + " order_id, site_id, price "
@@ -144,16 +157,16 @@ public class BeamSortRelTest {
 
     PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
     PAssert.that(rows).containsInAnyOrder(
-        MockedBeamSqlTable.of(
-            SqlTypeName.BIGINT, "order_id",
-            SqlTypeName.INTEGER, "site_id",
-            SqlTypeName.DOUBLE, "price",
-
+        TestUtils.RowsBuilder.of(
+            Types.BIGINT, "order_id",
+            Types.INTEGER, "site_id",
+            Types.DOUBLE, "price"
+        ).addRows(
             1L, 2, 1.0,
             1L, null, 2.0,
             2L, 1, 3.0,
             2L, null, 4.0
-        ).getInputRecords()
+        ).getRows()
     );
     pipeline.run().waitUntilFinish();
   }
@@ -167,16 +180,16 @@ public class BeamSortRelTest {
 
     PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
     PAssert.that(rows).containsInAnyOrder(
-        MockedBeamSqlTable.of(
-            SqlTypeName.BIGINT, "order_id",
-            SqlTypeName.INTEGER, "site_id",
-            SqlTypeName.DOUBLE, "price",
-
+        TestUtils.RowsBuilder.of(
+            Types.BIGINT, "order_id",
+            Types.INTEGER, "site_id",
+            Types.DOUBLE, "price"
+        ).addRows(
             5L, 5, 5.0,
             6L, 6, 6.0,
             7L, 7, 7.0,
             8L, 8888, 8.0
-        ).getInputRecords()
+        ).getRows()
     );
     pipeline.run().waitUntilFinish();
   }
@@ -190,11 +203,11 @@ public class BeamSortRelTest {
 
     PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
     PAssert.that(rows).containsInAnyOrder(
-        MockedBeamSqlTable.of(
-            SqlTypeName.BIGINT, "order_id",
-            SqlTypeName.INTEGER, "site_id",
-            SqlTypeName.DOUBLE, "price",
-
+        TestUtils.RowsBuilder.of(
+            Types.BIGINT, "order_id",
+            Types.INTEGER, "site_id",
+            Types.DOUBLE, "price"
+        ).addRows(
             1L, 2, 1.0,
             1L, 1, 2.0,
             2L, 4, 3.0,
@@ -205,7 +218,7 @@ public class BeamSortRelTest {
             8L, 8888, 8.0,
             8L, 999, 9.0,
             10L, 100, 10.0
-        ).getInputRecords()
+        ).getRows()
     );
     pipeline.run().waitUntilFinish();
   }
@@ -221,10 +234,4 @@ public class BeamSortRelTest {
     TestPipeline pipeline = TestPipeline.create();
     BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
   }
-
-  @Before
-  public void prepare() {
-    sqlEnv.registerTable("ORDER_DETAILS", orderDetailTable);
-    sqlEnv.registerTable("SUB_ORDER_RAM", subOrderRamTable);
-  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/bc66698e/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamUnionRelTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamUnionRelTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamUnionRelTest.java
index c5aa132..cad3290 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamUnionRelTest.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamUnionRelTest.java
@@ -18,14 +18,15 @@
 
 package org.apache.beam.dsls.sql.rel;
 
+import java.sql.Types;
 import org.apache.beam.dsls.sql.BeamSqlCli;
 import org.apache.beam.dsls.sql.BeamSqlEnv;
-import org.apache.beam.dsls.sql.planner.MockedBeamSqlTable;
+import org.apache.beam.dsls.sql.TestUtils;
+import org.apache.beam.dsls.sql.mock.MockedBoundedTable;
 import org.apache.beam.dsls.sql.schema.BeamSqlRow;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.values.PCollection;
-import org.apache.calcite.sql.type.SqlTypeName;
 import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
@@ -38,17 +39,19 @@ public class BeamUnionRelTest {
 
   @Rule
   public final TestPipeline pipeline = TestPipeline.create();
-  private static MockedBeamSqlTable orderDetailsTable = MockedBeamSqlTable
-      .of(SqlTypeName.BIGINT, "order_id",
-          SqlTypeName.INTEGER, "site_id",
-          SqlTypeName.DOUBLE, "price",
-
-          1L, 1, 1.0,
-          2L, 2, 2.0);
 
   @BeforeClass
   public static void prepare() {
-    sqlEnv.registerTable("ORDER_DETAILS", orderDetailsTable);
+    sqlEnv.registerTable("ORDER_DETAILS",
+        MockedBoundedTable.of(
+            Types.BIGINT, "order_id",
+            Types.INTEGER, "site_id",
+            Types.DOUBLE, "price"
+        ).addRows(
+            1L, 1, 1.0,
+            2L, 2, 2.0
+        )
+    );
   }
 
   @Test
@@ -62,14 +65,14 @@ public class BeamUnionRelTest {
 
     PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
     PAssert.that(rows).containsInAnyOrder(
-        MockedBeamSqlTable.of(
-            SqlTypeName.BIGINT, "order_id",
-            SqlTypeName.INTEGER, "site_id",
-            SqlTypeName.DOUBLE, "price",
-
+        TestUtils.RowsBuilder.of(
+            Types.BIGINT, "order_id",
+            Types.INTEGER, "site_id",
+            Types.DOUBLE, "price"
+        ).addRows(
             1L, 1, 1.0,
             2L, 2, 2.0
-        ).getInputRecords()
+        ).getRows()
     );
     pipeline.run();
   }
@@ -85,16 +88,16 @@ public class BeamUnionRelTest {
 
     PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
     PAssert.that(rows).containsInAnyOrder(
-        MockedBeamSqlTable.of(
-            SqlTypeName.BIGINT, "order_id",
-            SqlTypeName.INTEGER, "site_id",
-            SqlTypeName.DOUBLE, "price",
-
+        TestUtils.RowsBuilder.of(
+            Types.BIGINT, "order_id",
+            Types.INTEGER, "site_id",
+            Types.DOUBLE, "price"
+        ).addRows(
             1L, 1, 1.0,
             1L, 1, 1.0,
             2L, 2, 2.0,
             2L, 2, 2.0
-        ).getInputRecords()
+        ).getRows()
     );
     pipeline.run();
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/bc66698e/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamValuesRelTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamValuesRelTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamValuesRelTest.java
index 81b1a13..9d13f9b 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamValuesRelTest.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamValuesRelTest.java
@@ -18,14 +18,15 @@
 
 package org.apache.beam.dsls.sql.rel;
 
+import java.sql.Types;
 import org.apache.beam.dsls.sql.BeamSqlCli;
 import org.apache.beam.dsls.sql.BeamSqlEnv;
-import org.apache.beam.dsls.sql.planner.MockedBeamSqlTable;
+import org.apache.beam.dsls.sql.TestUtils;
+import org.apache.beam.dsls.sql.mock.MockedBoundedTable;
 import org.apache.beam.dsls.sql.schema.BeamSqlRow;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.values.PCollection;
-import org.apache.calcite.sql.type.SqlTypeName;
 import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
@@ -38,24 +39,37 @@ public class BeamValuesRelTest {
 
   @Rule
   public final TestPipeline pipeline = TestPipeline.create();
-  private static MockedBeamSqlTable stringTable = MockedBeamSqlTable
-      .of(SqlTypeName.VARCHAR, "name",
-          SqlTypeName.VARCHAR, "description");
 
-  private static MockedBeamSqlTable intTable = MockedBeamSqlTable
-      .of(SqlTypeName.INTEGER, "c0",
-          SqlTypeName.INTEGER, "c1");
+  @BeforeClass
+  public static void prepare() {
+    sqlEnv.registerTable("string_table",
+        MockedBoundedTable.of(
+            Types.VARCHAR, "name",
+            Types.VARCHAR, "description"
+        )
+    );
+    sqlEnv.registerTable("int_table",
+        MockedBoundedTable.of(
+            Types.INTEGER, "c0",
+            Types.INTEGER, "c1"
+        )
+    );
+  }
 
   @Test
   public void testValues() throws Exception {
     String sql = "insert into string_table(name, description) values "
         + "('hello', 'world'), ('james', 'bond')";
     PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
-    PAssert.that(rows).containsInAnyOrder(MockedBeamSqlTable.of(
-        SqlTypeName.VARCHAR, "name",
-        SqlTypeName.VARCHAR, "description",
-        "hello", "world",
-        "james", "bond").getInputRecords());
+    PAssert.that(rows).containsInAnyOrder(
+        TestUtils.RowsBuilder.of(
+            Types.VARCHAR, "name",
+            Types.VARCHAR, "description"
+        ).addRows(
+            "hello", "world",
+            "james", "bond"
+        ).getRows()
+    );
     pipeline.run();
   }
 
@@ -63,11 +77,14 @@ public class BeamValuesRelTest {
   public void testValues_castInt() throws Exception {
     String sql = "insert into int_table (c0, c1) values(cast(1 as int), cast(2 as int))";
     PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
-    PAssert.that(rows).containsInAnyOrder(MockedBeamSqlTable.of(
-        SqlTypeName.INTEGER, "c0",
-        SqlTypeName.INTEGER, "c1",
-        1, 2
-    ).getInputRecords());
+    PAssert.that(rows).containsInAnyOrder(
+        TestUtils.RowsBuilder.of(
+            Types.INTEGER, "c0",
+            Types.INTEGER, "c1"
+        ).addRows(
+            1, 2
+        ).getRows()
+    );
     pipeline.run();
   }
 
@@ -75,17 +92,14 @@ public class BeamValuesRelTest {
   public void testValues_onlySelect() throws Exception {
     String sql = "select 1, '1'";
     PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
-    PAssert.that(rows).containsInAnyOrder(MockedBeamSqlTable.of(
-        SqlTypeName.INTEGER, "EXPR$0",
-        SqlTypeName.CHAR, "EXPR$1",
-        1, "1"
-    ).getInputRecords());
+    PAssert.that(rows).containsInAnyOrder(
+        TestUtils.RowsBuilder.of(
+            Types.INTEGER, "EXPR$0",
+            Types.CHAR, "EXPR$1"
+        ).addRows(
+            1, "1"
+        ).getRows()
+    );
     pipeline.run();
   }
-
-  @BeforeClass
-  public static void prepareClass() {
-    sqlEnv.registerTable("string_table", stringTable);
-    sqlEnv.registerTable("int_table", intTable);
-  }
 }

Reply via email to