Repository: beam
Updated Branches:
  refs/heads/DSL_SQL 3e678a75a -> 7e918a76e


[BEAM-2079] Support TextIO as SQL source/sink


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/59abc764
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/59abc764
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/59abc764

Branch: refs/heads/DSL_SQL
Commit: 59abc7640866f58427817c026af15df30947f3a6
Parents: 3e678a7
Author: James Xu <xumingmi...@gmail.com>
Authored: Thu May 4 11:44:14 2017 +0800
Committer: Jean-Baptiste Onofré <jbono...@apache.org>
Committed: Wed May 10 07:24:01 2017 +0200

----------------------------------------------------------------------
 dsls/sql/pom.xml                                |   5 +
 .../dsls/sql/schema/text/BeamTextCSVTable.java  |  66 +++++++
 .../schema/text/BeamTextCSVTableIOReader.java   | 123 +++++++++++++
 .../schema/text/BeamTextCSVTableIOWriter.java   |  76 ++++++++
 .../dsls/sql/schema/text/BeamTextTable.java     |  45 +++++
 .../beam/dsls/sql/schema/text/package-info.java |  22 +++
 .../sql/schema/text/BeamTextCSVTableTest.java   | 179 +++++++++++++++++++
 7 files changed, 516 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/59abc764/dsls/sql/pom.xml
----------------------------------------------------------------------
diff --git a/dsls/sql/pom.xml b/dsls/sql/pom.xml
index 6139ada..15692e9 100644
--- a/dsls/sql/pom.xml
+++ b/dsls/sql/pom.xml
@@ -204,5 +204,10 @@
       <artifactId>hamcrest-all</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-csv</artifactId>
+      <version>1.4</version>
+    </dependency>
   </dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/beam/blob/59abc764/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTable.java
----------------------------------------------------------------------
diff --git 
a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTable.java
 
b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTable.java
new file mode 100644
index 0000000..b9e6b81
--- /dev/null
+++ 
b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTable.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.schema.text;
+
+import org.apache.beam.dsls.sql.schema.BeamSQLRow;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.calcite.rel.type.RelProtoDataType;
+import org.apache.commons.csv.CSVFormat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link BeamTextTable} whose text content is formatted as CSV.
+ *
+ * <p>The concrete CSV dialect is described by a {@link CSVFormat}; see its javadoc for the
+ * dialects it offers.
+ */
+public class BeamTextCSVTable extends BeamTextTable {
+  private static final Logger LOG = LoggerFactory.getLogger(BeamTextCSVTable.class);
+
+  private CSVFormat csvFormat;
+
+  /**
+   * Creates a CSV table that uses the {@link CSVFormat#DEFAULT DEFAULT} format.
+   */
+  public BeamTextCSVTable(RelProtoDataType protoDataType, String filePattern) {
+    this(protoDataType, filePattern, CSVFormat.DEFAULT);
+  }
+
+  /**
+   * Creates a CSV table that uses the supplied {@code csvFormat}.
+   */
+  public BeamTextCSVTable(
+      RelProtoDataType protoDataType, String filePattern, CSVFormat csvFormat) {
+    super(protoDataType, filePattern);
+    this.csvFormat = csvFormat;
+  }
+
+  /**
+   * Builds a {@link BeamTextCSVTableIOReader} from this table's record type, file pattern and
+   * CSV format.
+   */
+  @Override
+  public PTransform<? super PBegin, PCollection<BeamSQLRow>> buildIOReader() {
+    return new BeamTextCSVTableIOReader(beamSqlRecordType, filePattern, csvFormat);
+  }
+
+  /**
+   * Builds a {@link BeamTextCSVTableIOWriter} from this table's record type, file pattern and
+   * CSV format.
+   */
+  @Override
+  public PTransform<? super PCollection<BeamSQLRow>, PDone> buildIOWriter() {
+    return new BeamTextCSVTableIOWriter(beamSqlRecordType, filePattern, csvFormat);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/59abc764/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableIOReader.java
----------------------------------------------------------------------
diff --git 
a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableIOReader.java
 
b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableIOReader.java
new file mode 100644
index 0000000..cf7c095
--- /dev/null
+++ 
b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableIOReader.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.schema.text;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.io.StringReader;
+import java.util.List;
+
+import org.apache.beam.dsls.sql.exception.BeamSqlUnsupportedException;
+import org.apache.beam.dsls.sql.schema.BeamSQLRecordType;
+import org.apache.beam.dsls.sql.schema.BeamSQLRow;
+import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.commons.csv.CSVFormat;
+import org.apache.commons.csv.CSVParser;
+import org.apache.commons.csv.CSVRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * IOReader for {@code BeamTextCSVTable}.
+ *
+ * <p>Reads text lines from {@code filePattern} with {@code TextIO}, parses every line with the
+ * configured {@link CSVFormat} and emits one {@link BeamSQLRow} per parsed record.
+ */
+public class BeamTextCSVTableIOReader
+    extends PTransform<PBegin, PCollection<BeamSQLRow>>
+    implements Serializable {
+  private static final Logger LOG = LoggerFactory.getLogger(BeamTextCSVTableIOReader.class);
+  private String filePattern;
+  protected BeamSQLRecordType beamSqlRecordType;
+  protected CSVFormat csvFormat;
+
+  /**
+   * @param beamSqlRecordType record type every parsed line has to match
+   * @param filePattern file pattern handed to {@code TextIO.Read}
+   * @param csvFormat CSV dialect used to parse each line
+   */
+  public BeamTextCSVTableIOReader(BeamSQLRecordType beamSqlRecordType, String filePattern,
+      CSVFormat csvFormat) {
+    this.filePattern = filePattern;
+    this.beamSqlRecordType = beamSqlRecordType;
+    this.csvFormat = csvFormat;
+  }
+
+  /**
+   * Reads the lines matched by {@code filePattern} and converts each of them into a row.
+   *
+   * <p>Lines for which the CSV format yields no record (for example blank lines when the format
+   * ignores empty lines) are skipped. A line whose field count differs from the record type
+   * fails with an {@link IllegalArgumentException}.
+   */
+  @Override
+  public PCollection<BeamSQLRow> expand(PBegin input) {
+    return input.apply("decodeRecord", TextIO.Read.from(filePattern))
+        .apply(ParDo.of(new DoFn<String, BeamSQLRow>() {
+          @ProcessElement
+          public void processElement(ProcessContext ctx) {
+            String str = ctx.element();
+
+            List<CSVRecord> records;
+            try (CSVParser parser = csvFormat.parse(new StringReader(str))) {
+              records = parser.getRecords();
+            } catch (IOException e) {
+              throw new IllegalArgumentException("Invalid text filePattern: " + filePattern, e);
+            }
+
+            if (records.isEmpty()) {
+              // The format produced no record for this line; there is nothing to emit.
+              return;
+            }
+
+            CSVRecord rawRecord = records.get(0);
+            if (rawRecord.size() != beamSqlRecordType.size()) {
+              // Plain %s/%d placeholders: String.format does not understand "{}".
+              throw new IllegalArgumentException(String.format(
+                  "Invalid filePattern: %s, expect %d fields, but actually %d, line: %s",
+                  filePattern, beamSqlRecordType.size(), rawRecord.size(), str));
+            }
+
+            BeamSQLRow row = new BeamSQLRow(beamSqlRecordType);
+            for (int idx = 0; idx < beamSqlRecordType.size(); idx++) {
+              String raw = rawRecord.get(idx);
+              addFieldWithAutoTypeCasting(row, idx, raw);
+            }
+            ctx.output(row);
+          }
+        }));
+  }
+
+  /**
+   * Converts {@code raw} to the Java type matching the SQL type of column {@code idx} and stores
+   * it in {@code row}.
+   *
+   * @param row row to populate; its data type supplies the column types
+   * @param idx index of the column to set
+   * @param raw textual field value taken from the CSV record
+   * @throws NumberFormatException if {@code raw} cannot be parsed as the numeric column type
+   * @throws BeamSqlUnsupportedException if the column type is not one of the handled types
+   */
+  public void addFieldWithAutoTypeCasting(BeamSQLRow row, int idx, String raw) {
+    SqlTypeName columnType = row.getDataType().getFieldsType().get(idx);
+    switch (columnType) {
+      case TINYINT:
+        row.addField(idx, Byte.valueOf(raw));
+        break;
+      case SMALLINT:
+        row.addField(idx, Short.valueOf(raw));
+        break;
+      case INTEGER:
+        row.addField(idx, Integer.valueOf(raw));
+        break;
+      case BIGINT:
+        row.addField(idx, Long.valueOf(raw));
+        break;
+      case FLOAT:
+        row.addField(idx, Float.valueOf(raw));
+        break;
+      case DOUBLE:
+        row.addField(idx, Double.valueOf(raw));
+        break;
+      case VARCHAR:
+        row.addField(idx, raw);
+        break;
+      default:
+        throw new BeamSqlUnsupportedException(String.format(
+            "Column type %s is not supported yet!", columnType));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/59abc764/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableIOWriter.java
----------------------------------------------------------------------
diff --git 
a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableIOWriter.java
 
b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableIOWriter.java
new file mode 100644
index 0000000..6104cd8
--- /dev/null
+++ 
b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableIOWriter.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.schema.text;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.io.StringWriter;
+
+import org.apache.beam.dsls.sql.schema.BeamSQLRecordType;
+import org.apache.beam.dsls.sql.schema.BeamSQLRow;
+import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.commons.csv.CSVFormat;
+import org.apache.commons.csv.CSVPrinter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * IOWriter for {@code BeamTextCSVTable}.
+ *
+ * <p>Encodes every {@link BeamSQLRow} as one CSV line using the configured {@link CSVFormat}
+ * and writes the lines with {@code TextIO} to {@code filePattern}.
+ */
+public class BeamTextCSVTableIOWriter extends PTransform<PCollection<BeamSQLRow>, PDone>
+    implements Serializable {
+  private static final Logger LOG = LoggerFactory.getLogger(BeamTextCSVTableIOWriter.class);
+
+  private String filePattern;
+  protected BeamSQLRecordType beamSqlRecordType;
+  protected CSVFormat csvFormat;
+
+  /**
+   * @param beamSqlRecordType record type of the rows to write
+   * @param filePattern output location handed to {@code TextIO.Write}
+   * @param csvFormat CSV dialect used to encode each row
+   */
+  public BeamTextCSVTableIOWriter(BeamSQLRecordType beamSqlRecordType, String filePattern,
+      CSVFormat csvFormat) {
+    this.filePattern = filePattern;
+    this.beamSqlRecordType = beamSqlRecordType;
+    this.csvFormat = csvFormat;
+  }
+
+  /**
+   * Encodes each input row as a CSV line and writes the lines to {@code filePattern}.
+   */
+  @Override public PDone expand(PCollection<BeamSQLRow> input) {
+    return input.apply("encodeRecord", ParDo.of(new DoFn<BeamSQLRow, String>() {
+
+      @ProcessElement public void processElement(ProcessContext ctx) {
+        BeamSQLRow row = ctx.element();
+        StringWriter writer = new StringWriter();
+
+        try (CSVPrinter printer = csvFormat.print(writer)) {
+          for (int i = 0; i < row.size(); i++) {
+            Object field = row.getFieldValue(i);
+            // A null field is handed to the printer as null instead of failing on toString().
+            printer.print(field == null ? null : field.toString());
+          }
+          // No printer.println() here: the text sink terminates every element with a newline,
+          // so a record separator would leave a blank line after each record.
+        } catch (IOException e) {
+          throw new IllegalArgumentException("Invalid filePattern: " + filePattern, e);
+        }
+
+        ctx.output(writer.toString());
+      }
+    })).apply(TextIO.Write.to(filePattern));
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/59abc764/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextTable.java
----------------------------------------------------------------------
diff --git 
a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextTable.java
 
b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextTable.java
new file mode 100644
index 0000000..3353761
--- /dev/null
+++ 
b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextTable.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.schema.text;
+
+import java.io.Serializable;
+
+import org.apache.beam.dsls.sql.schema.BaseBeamTable;
+import org.apache.beam.dsls.sql.schema.BeamIOType;
+import org.apache.calcite.rel.type.RelProtoDataType;
+
+/**
+ * Base class for tables stored as a text file or directory and accessed through
+ * {@code TextIO}.
+ */
+public abstract class BeamTextTable extends BaseBeamTable implements Serializable {
+  /** File pattern of the text data; stays unset when the single-argument constructor is used. */
+  protected String filePattern;
+
+  /**
+   * Creates a text table with the given row type and no file pattern.
+   */
+  protected BeamTextTable(RelProtoDataType protoRowType) {
+    super(protoRowType);
+  }
+
+  /**
+   * Creates a text table with the given row type and file pattern.
+   */
+  protected BeamTextTable(RelProtoDataType protoDataType, String filePattern) {
+    super(protoDataType);
+    this.filePattern = filePattern;
+  }
+
+  /**
+   * Text tables always report {@link BeamIOType#BOUNDED}.
+   */
+  @Override
+  public BeamIOType getSourceType() {
+    return BeamIOType.BOUNDED;
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/59abc764/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/package-info.java
----------------------------------------------------------------------
diff --git 
a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/package-info.java 
b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/package-info.java
new file mode 100644
index 0000000..f48f2fe
--- /dev/null
+++ 
b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Table schema for text files.
+ */
+package org.apache.beam.dsls.sql.schema.text;

http://git-wip-us.apache.org/repos/asf/beam/blob/59abc764/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableTest.java
----------------------------------------------------------------------
diff --git 
a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableTest.java
 
b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableTest.java
new file mode 100644
index 0000000..e06f8da
--- /dev/null
+++ 
b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableTest.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.schema.text;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.PrintStream;
+import java.nio.file.FileVisitResult;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.SimpleFileVisitor;
+import java.nio.file.attribute.BasicFileAttributes;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import org.apache.beam.dsls.sql.planner.BeamQueryPlanner;
+import org.apache.beam.dsls.sql.schema.BeamSQLRecordType;
+import org.apache.beam.dsls.sql.schema.BeamSQLRow;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelProtoDataType;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.commons.csv.CSVFormat;
+import org.apache.commons.csv.CSVPrinter;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+
+/**
+ * Tests for {@code BeamTextCSVTable}.
+ */
+public class BeamTextCSVTableTest {
+
+  @Rule public TestPipeline pipeline = TestPipeline.create();
+  @Rule public TestPipeline pipeline2 = TestPipeline.create();
+
+  /**
+   * testData.
+   *
+   * <p>
+   * The types of the csv fields are:
+   *     integer,bigint,float,double,string
+   * </p>
+   */
+  private static Object[] data1 = new Object[] { 1, 1L, 1.1F, 1.1, "james" };
+  private static Object[] data2 = new Object[] { 2, 2L, 2.2F, 2.2, "bond" };
+
+  private static List<Object[]> testData = Arrays.asList(data1, data2);
+  private static List<BeamSQLRow> testDataRows = new ArrayList<BeamSQLRow>() {{
+    for (Object[] data : testData) {
+      add(buildRow(data));
+    }
+  }};
+  private static ConcurrentLinkedQueue<Object[]> actualData = new 
ConcurrentLinkedQueue<>();
+
+  private static Path tempFolder;
+  private static File readerSourceFile;
+  private static File writerTargetFile;
+
+  @Test public void testBuildIOReader() {
+    PCollection<BeamSQLRow> rows = pipeline.apply(
+        new BeamTextCSVTable(buildRowType(), 
readerSourceFile.getAbsolutePath()).buildIOReader());
+    PAssert.that(rows).containsInAnyOrder(testDataRows);
+    pipeline.run();
+  }
+
+  @Test public void testBuildIOWriter() {
+    // reader from a source file, then write into a target file
+    pipeline.apply(
+        new BeamTextCSVTable(buildRowType(), 
readerSourceFile.getAbsolutePath()).buildIOReader())
+        .apply(new BeamTextCSVTable(buildRowType(), 
writerTargetFile.getAbsolutePath())
+            .buildIOWriter());
+    pipeline.run();
+
+    PCollection<BeamSQLRow> rows = pipeline2.apply(
+        new BeamTextCSVTable(buildRowType(), 
writerTargetFile.getAbsolutePath()).buildIOReader());
+
+    // confirm the two reads match
+    PAssert.that(rows).containsInAnyOrder(testDataRows);
+    pipeline2.run();
+  }
+
+  @BeforeClass public static void setUp() throws IOException {
+    tempFolder = Files.createTempDirectory("BeamTextTableTest");
+    readerSourceFile = writeToFile(testData, "readerSourceFile.txt");
+    writerTargetFile = writeToFile(testData, "writerTargetFile.txt");
+  }
+
+  @AfterClass public static void teardownClass() throws IOException {
+    Files.walkFileTree(tempFolder, new SimpleFileVisitor<Path>() {
+
+      @Override public FileVisitResult visitFile(Path file, 
BasicFileAttributes attrs)
+          throws IOException {
+        Files.delete(file);
+        return FileVisitResult.CONTINUE;
+      }
+
+      @Override public FileVisitResult postVisitDirectory(Path dir, 
IOException exc)
+          throws IOException {
+        Files.delete(dir);
+        return FileVisitResult.CONTINUE;
+      }
+    });
+  }
+
+  private static File writeToFile(List<Object[]> rows, String filename) throws 
IOException {
+    File file = tempFolder.resolve(filename).toFile();
+    OutputStream output = new FileOutputStream(file);
+    writeToStreamAndClose(rows, output);
+    return file;
+  }
+
+  /**
+   * Helper that writes the given lines (adding a newline in between) to a 
stream, then closes the
+   * stream.
+   */
+  private static void writeToStreamAndClose(List<Object[]> rows, OutputStream 
outputStream) {
+    try (PrintStream writer = new PrintStream(outputStream)) {
+      CSVPrinter printer = CSVFormat.DEFAULT.print(writer);
+      for (Object[] row : rows) {
+        for (Object field : row) {
+          printer.print(field);
+        }
+        printer.println();
+      }
+    } catch (IOException e) {
+      e.printStackTrace();
+    }
+  }
+
+  private RelProtoDataType buildRowType() {
+    return new RelProtoDataType() {
+
+      @Override public RelDataType apply(RelDataTypeFactory a0) {
+        return a0.builder().add("id", SqlTypeName.INTEGER).add("order_id", 
SqlTypeName.BIGINT)
+            .add("price", SqlTypeName.FLOAT).add("amount", SqlTypeName.DOUBLE)
+            .add("user_name", SqlTypeName.VARCHAR).build();
+      }
+    };
+  }
+
+  private static RelDataType buildRelDataType() {
+    return BeamQueryPlanner.TYPE_FACTORY.builder().add("id", 
SqlTypeName.INTEGER)
+        .add("order_id", SqlTypeName.BIGINT).add("price", SqlTypeName.FLOAT)
+        .add("amount", SqlTypeName.DOUBLE).add("user_name", 
SqlTypeName.VARCHAR).build();
+  }
+
+  private static BeamSQLRecordType buildBeamSQLRecordType() {
+    return BeamSQLRecordType.from(buildRelDataType());
+  }
+
+  private static BeamSQLRow buildRow(Object[] data) {
+    return new BeamSQLRow(buildBeamSQLRecordType(), Arrays.asList(data));
+  }
+}

Reply via email to