Repository: beam
Updated Branches:
  refs/heads/DSL_SQL f1c2b6540 -> ca8760373


http://git-wip-us.apache.org/repos/asf/beam/blob/529bc9d9/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerExplainTest.java
----------------------------------------------------------------------
diff --git 
a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerExplainTest.java
 
b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerExplainTest.java
new file mode 100644
index 0000000..9dde0f1
--- /dev/null
+++ 
b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerExplainTest.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.planner;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Tests to explain queries.
+ *
+ */
+public class BeamPlannerExplainTest extends BasePlanner {
+
+  @Test
+  public void selectAll() throws Exception {
+    String sql = "SELECT * FROM ORDER_DETAILS";
+    String plan = runner.explainQuery(sql);
+
+    String expectedPlan =
+        "BeamProjectRel(order_id=[$0], site_id=[$1], price=[$2], 
order_time=[$3])\n"
+        + "  BeamIOSourceRel(table=[[ORDER_DETAILS]])\n";
+    Assert.assertEquals("explain doesn't match", expectedPlan, plan);
+  }
+
+  @Test
+  public void selectWithFilter() throws Exception {
+    String sql = "SELECT " + " order_id, site_id, price " + "FROM 
ORDER_DETAILS "
+        + "WHERE SITE_ID = 0 and price > 20";
+    String plan = runner.explainQuery(sql);
+
+    String expectedPlan = "BeamProjectRel(order_id=[$0], site_id=[$1], 
price=[$2])\n"
+        + "  BeamFilterRel(condition=[AND(=($1, 0), >($2, 20))])\n"
+        + "    BeamIOSourceRel(table=[[ORDER_DETAILS]])\n";
+    Assert.assertEquals("explain doesn't match", expectedPlan, plan);
+  }
+
+  @Test
+  public void insertSelectFilter() throws Exception {
+    String sql = "INSERT INTO SUB_ORDER(order_id, site_id, price) " + "SELECT "
+        + " order_id, site_id, price " + "FROM ORDER_DETAILS "
+        + "WHERE SITE_ID = 0 and price > 20";
+    String plan = runner.explainQuery(sql);
+
+    String expectedPlan =
+        "BeamIOSinkRel(table=[[SUB_ORDER]], operation=[INSERT], 
flattened=[true])\n"
+        + "  BeamProjectRel(order_id=[$0], site_id=[$1], price=[$2], 
order_time=[null])\n"
+        + "    BeamProjectRel(order_id=[$0], site_id=[$1], price=[$2])\n"
+        + "      BeamFilterRel(condition=[AND(=($1, 0), >($2, 20))])\n"
+        + "        BeamIOSourceRel(table=[[ORDER_DETAILS]])\n";
+    Assert.assertEquals("explain doesn't match", expectedPlan, plan);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/529bc9d9/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerSubmitTest.java
----------------------------------------------------------------------
diff --git 
a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerSubmitTest.java
 
b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerSubmitTest.java
new file mode 100644
index 0000000..d32b19b
--- /dev/null
+++ 
b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerSubmitTest.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.planner;
+
+import org.apache.beam.sdk.Pipeline;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Tests to execute a query.
+ *
+ */
+public class BeamPlannerSubmitTest extends BasePlanner {
+  @Test
+  public void insertSelectFilter() throws Exception {
+    String sql = "INSERT INTO SUB_ORDER_RAM(order_id, site_id, price) SELECT "
+        + " order_id, site_id, price " + "FROM ORDER_DETAILS " + "WHERE 
SITE_ID = 0 and price > 20";
+    Pipeline pipeline = runner.getPlanner().compileBeamPipeline(sql);
+    runner.getPlanner().planner.close();
+
+    pipeline.run().waitUntilFinish();
+
+    Assert.assertTrue(MockedBeamSQLTable.CONTENT.size() == 1);
+    Assert.assertEquals("order_id=12345,site_id=0,price=20.5,order_time=null",
+        MockedBeamSQLTable.CONTENT.get(0));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/529bc9d9/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedBeamSQLTable.java
----------------------------------------------------------------------
diff --git 
a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedBeamSQLTable.java
 
b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedBeamSQLTable.java
new file mode 100644
index 0000000..8631a6e
--- /dev/null
+++ 
b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedBeamSQLTable.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.planner;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+
+import org.apache.beam.dsls.sql.schema.BaseBeamTable;
+import org.apache.beam.dsls.sql.schema.BeamIOType;
+import org.apache.beam.dsls.sql.schema.BeamSQLRow;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.calcite.rel.type.RelProtoDataType;
+
+/**
+ * A mock table use to check input/output.
+ *
+ */
+public class MockedBeamSQLTable extends BaseBeamTable {
+
+  /**
+   *
+   */
+  private static final long serialVersionUID = 1373168368414036932L;
+
+  public static final List<String> CONTENT = new ArrayList<>();
+
+  public MockedBeamSQLTable(RelProtoDataType protoRowType) {
+    super(protoRowType);
+  }
+
+  @Override
+  public BeamIOType getSourceType() {
+    return BeamIOType.UNBOUNDED;
+  }
+
+  @Override
+  public PTransform<? super PBegin, PCollection<BeamSQLRow>> buildIOReader() {
+    BeamSQLRow row1 = new BeamSQLRow(beamSqlRecordType);
+    row1.addField(0, 12345L);
+    row1.addField(1, 0);
+    row1.addField(2, 10.5);
+    row1.addField(3, new Date());
+
+    BeamSQLRow row2 = new BeamSQLRow(beamSqlRecordType);
+    row2.addField(0, 12345L);
+    row2.addField(1, 1);
+    row2.addField(2, 20.5);
+    row2.addField(3, new Date());
+
+    BeamSQLRow row3 = new BeamSQLRow(beamSqlRecordType);
+    row3.addField(0, 12345L);
+    row3.addField(1, 0);
+    row3.addField(2, 20.5);
+    row3.addField(3, new Date());
+
+    BeamSQLRow row4 = new BeamSQLRow(beamSqlRecordType);
+    row4.addField(0, null);
+    row4.addField(1, null);
+    row4.addField(2, 20.5);
+    row4.addField(3, new Date());
+
+    return Create.of(row1, row2, row3);
+  }
+
+  @Override
+  public PTransform<? super PCollection<BeamSQLRow>, PDone> buildIOWriter() {
+    return new OutputStore();
+  }
+
+  /**
+   * Keep output in {@code CONTENT} for validation.
+   *
+   */
+  public static class OutputStore extends PTransform<PCollection<BeamSQLRow>, 
PDone> {
+
+    @Override
+    public PDone expand(PCollection<BeamSQLRow> input) {
+      input.apply(ParDo.of(new DoFn<BeamSQLRow, Void>() {
+
+        @Setup
+        public void setup() {
+          CONTENT.clear();
+        }
+
+        @ProcessElement
+        public void processElement(ProcessContext c) {
+          CONTENT.add(c.element().valueInString());
+        }
+
+        @Teardown
+        public void close() {
+
+        }
+
+      }));
+      return PDone.in(input.getPipeline());
+    }
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/529bc9d9/dsls/sql/src/test/java/org/beam/dsls/sql/planner/BasePlanner.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/beam/dsls/sql/planner/BasePlanner.java 
b/dsls/sql/src/test/java/org/beam/dsls/sql/planner/BasePlanner.java
deleted file mode 100644
index 56e45c4..0000000
--- a/dsls/sql/src/test/java/org/beam/dsls/sql/planner/BasePlanner.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.beam.dsls.sql.planner;
-
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rel.type.RelDataTypeFactory;
-import org.apache.calcite.rel.type.RelProtoDataType;
-import org.apache.calcite.sql.type.SqlTypeName;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.beam.dsls.sql.schema.BaseBeamTable;
-import org.beam.dsls.sql.schema.kafka.BeamKafkaCSVTable;
-import org.junit.BeforeClass;
-
-/**
- * prepare {@code BeamSqlRunner} for test.
- *
- */
-public class BasePlanner {
-  public static BeamSqlRunner runner = new BeamSqlRunner();
-
-  @BeforeClass
-  public static void prepare() {
-    runner.addTable("ORDER_DETAILS", getTable());
-    runner.addTable("SUB_ORDER", getTable("127.0.0.1:9092", "sub_orders"));
-    runner.addTable("SUB_ORDER_RAM", getTable());
-  }
-
-  private static BaseBeamTable getTable() {
-    final RelProtoDataType protoRowType = new RelProtoDataType() {
-      @Override
-      public RelDataType apply(RelDataTypeFactory a0) {
-        return a0.builder().add("order_id", SqlTypeName.BIGINT).add("site_id", 
SqlTypeName.INTEGER)
-            .add("price", SqlTypeName.DOUBLE).add("order_time", 
SqlTypeName.TIMESTAMP).build();
-      }
-    };
-
-    return new MockedBeamSQLTable(protoRowType);
-  }
-
-  public static BaseBeamTable getTable(String bootstrapServer, String topic) {
-    final RelProtoDataType protoRowType = new RelProtoDataType() {
-      @Override
-      public RelDataType apply(RelDataTypeFactory a0) {
-        return a0.builder().add("order_id", SqlTypeName.BIGINT).add("site_id", 
SqlTypeName.INTEGER)
-            .add("price", SqlTypeName.DOUBLE).add("order_time", 
SqlTypeName.TIMESTAMP).build();
-      }
-    };
-
-    Map<String, Object> consumerPara = new HashMap<String, Object>();
-    consumerPara.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
-
-    return new BeamKafkaCSVTable(protoRowType, bootstrapServer, 
Arrays.asList(topic))
-        .updateConsumerProperties(consumerPara);
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/529bc9d9/dsls/sql/src/test/java/org/beam/dsls/sql/planner/BeamPlannerExplainTest.java
----------------------------------------------------------------------
diff --git 
a/dsls/sql/src/test/java/org/beam/dsls/sql/planner/BeamPlannerExplainTest.java 
b/dsls/sql/src/test/java/org/beam/dsls/sql/planner/BeamPlannerExplainTest.java
deleted file mode 100644
index a77878f..0000000
--- 
a/dsls/sql/src/test/java/org/beam/dsls/sql/planner/BeamPlannerExplainTest.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.beam.dsls.sql.planner;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-/**
- * Tests to explain queries.
- *
- */
-public class BeamPlannerExplainTest extends BasePlanner {
-
-  @Test
-  public void selectAll() throws Exception {
-    String sql = "SELECT * FROM ORDER_DETAILS";
-    String plan = runner.explainQuery(sql);
-
-    String expectedPlan =
-        "BeamProjectRel(order_id=[$0], site_id=[$1], price=[$2], 
order_time=[$3])\n"
-        + "  BeamIOSourceRel(table=[[ORDER_DETAILS]])\n";
-    Assert.assertEquals("explain doesn't match", expectedPlan, plan);
-  }
-
-  @Test
-  public void selectWithFilter() throws Exception {
-    String sql = "SELECT " + " order_id, site_id, price " + "FROM 
ORDER_DETAILS "
-        + "WHERE SITE_ID = 0 and price > 20";
-    String plan = runner.explainQuery(sql);
-
-    String expectedPlan = "BeamProjectRel(order_id=[$0], site_id=[$1], 
price=[$2])\n"
-        + "  BeamFilterRel(condition=[AND(=($1, 0), >($2, 20))])\n"
-        + "    BeamIOSourceRel(table=[[ORDER_DETAILS]])\n";
-    Assert.assertEquals("explain doesn't match", expectedPlan, plan);
-  }
-
-  @Test
-  public void insertSelectFilter() throws Exception {
-    String sql = "INSERT INTO SUB_ORDER(order_id, site_id, price) " + "SELECT "
-        + " order_id, site_id, price " + "FROM ORDER_DETAILS "
-        + "WHERE SITE_ID = 0 and price > 20";
-    String plan = runner.explainQuery(sql);
-
-    String expectedPlan =
-        "BeamIOSinkRel(table=[[SUB_ORDER]], operation=[INSERT], 
flattened=[true])\n"
-        + "  BeamProjectRel(order_id=[$0], site_id=[$1], price=[$2], 
order_time=[null])\n"
-        + "    BeamProjectRel(order_id=[$0], site_id=[$1], price=[$2])\n"
-        + "      BeamFilterRel(condition=[AND(=($1, 0), >($2, 20))])\n"
-        + "        BeamIOSourceRel(table=[[ORDER_DETAILS]])\n";
-    Assert.assertEquals("explain doesn't match", expectedPlan, plan);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/529bc9d9/dsls/sql/src/test/java/org/beam/dsls/sql/planner/BeamPlannerSubmitTest.java
----------------------------------------------------------------------
diff --git 
a/dsls/sql/src/test/java/org/beam/dsls/sql/planner/BeamPlannerSubmitTest.java 
b/dsls/sql/src/test/java/org/beam/dsls/sql/planner/BeamPlannerSubmitTest.java
deleted file mode 100644
index eb097a9..0000000
--- 
a/dsls/sql/src/test/java/org/beam/dsls/sql/planner/BeamPlannerSubmitTest.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.beam.dsls.sql.planner;
-
-import org.apache.beam.sdk.Pipeline;
-import org.junit.Assert;
-import org.junit.Test;
-
-/**
- * Tests to execute a query.
- *
- */
-public class BeamPlannerSubmitTest extends BasePlanner {
-  @Test
-  public void insertSelectFilter() throws Exception {
-    String sql = "INSERT INTO SUB_ORDER_RAM(order_id, site_id, price) SELECT "
-        + " order_id, site_id, price " + "FROM ORDER_DETAILS " + "WHERE 
SITE_ID = 0 and price > 20";
-    Pipeline pipeline = runner.getPlanner().compileBeamPipeline(sql);
-    runner.getPlanner().planner.close();
-
-    pipeline.run().waitUntilFinish();
-
-    Assert.assertTrue(MockedBeamSQLTable.CONTENT.size() == 1);
-    Assert.assertEquals("order_id=12345,site_id=0,price=20.5,order_time=null", 
MockedBeamSQLTable.CONTENT.get(0));
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/529bc9d9/dsls/sql/src/test/java/org/beam/dsls/sql/planner/MockedBeamSQLTable.java
----------------------------------------------------------------------
diff --git 
a/dsls/sql/src/test/java/org/beam/dsls/sql/planner/MockedBeamSQLTable.java 
b/dsls/sql/src/test/java/org/beam/dsls/sql/planner/MockedBeamSQLTable.java
deleted file mode 100644
index 31f5578..0000000
--- a/dsls/sql/src/test/java/org/beam/dsls/sql/planner/MockedBeamSQLTable.java
+++ /dev/null
@@ -1,123 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.beam.dsls.sql.planner;
-
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.List;
-
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.values.PBegin;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PDone;
-import org.apache.calcite.rel.type.RelProtoDataType;
-import org.beam.dsls.sql.schema.BaseBeamTable;
-import org.beam.dsls.sql.schema.BeamIOType;
-import org.beam.dsls.sql.schema.BeamSQLRow;
-
-/**
- * A mock table use to check input/output.
- *
- */
-public class MockedBeamSQLTable extends BaseBeamTable {
-
-  /**
-   *
-   */
-  private static final long serialVersionUID = 1373168368414036932L;
-
-  public static final List<String> CONTENT = new ArrayList<>();
-
-  public MockedBeamSQLTable(RelProtoDataType protoRowType) {
-    super(protoRowType);
-  }
-
-  @Override
-  public BeamIOType getSourceType() {
-    return BeamIOType.UNBOUNDED;
-  }
-
-  @Override
-  public PTransform<? super PBegin, PCollection<BeamSQLRow>> buildIOReader() {
-    BeamSQLRow row1 = new BeamSQLRow(beamSqlRecordType);
-    row1.addField(0, 12345L);
-    row1.addField(1, 0);
-    row1.addField(2, 10.5);
-    row1.addField(3, new Date());
-
-    BeamSQLRow row2 = new BeamSQLRow(beamSqlRecordType);
-    row2.addField(0, 12345L);
-    row2.addField(1, 1);
-    row2.addField(2, 20.5);
-    row2.addField(3, new Date());
-
-    BeamSQLRow row3 = new BeamSQLRow(beamSqlRecordType);
-    row3.addField(0, 12345L);
-    row3.addField(1, 0);
-    row3.addField(2, 20.5);
-    row3.addField(3, new Date());
-
-    BeamSQLRow row4 = new BeamSQLRow(beamSqlRecordType);
-    row4.addField(0, null);
-    row4.addField(1, null);
-    row4.addField(2, 20.5);
-    row4.addField(3, new Date());
-
-    return Create.of(row1, row2, row3);
-  }
-
-  @Override
-  public PTransform<? super PCollection<BeamSQLRow>, PDone> buildIOWriter() {
-    return new OutputStore();
-  }
-
-  /**
-   * Keep output in {@code CONTENT} for validation.
-   *
-   */
-  public static class OutputStore extends PTransform<PCollection<BeamSQLRow>, 
PDone> {
-
-    @Override
-    public PDone expand(PCollection<BeamSQLRow> input) {
-      input.apply(ParDo.of(new DoFn<BeamSQLRow, Void>() {
-
-        @Setup
-        public void setup() {
-          CONTENT.clear();
-        }
-
-        @ProcessElement
-        public void processElement(ProcessContext c) {
-          CONTENT.add(c.element().valueInString());
-        }
-
-        @Teardown
-        public void close() {
-
-        }
-
-      }));
-      return PDone.in(input.getPipeline());
-    }
-
-  }
-
-}

Reply via email to