[ 
https://issues.apache.org/jira/browse/BEAM-4626?focusedWorklogId=115618&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-115618
 ]

ASF GitHub Bot logged work on BEAM-4626:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 25/Jun/18 20:06
            Start Date: 25/Jun/18 20:06
    Worklog Time Spent: 10m 
      Work Description: kennknowles closed pull request #5748: [BEAM-4626] SQL 
text tables of raw lines
URL: https://github.com/apache/beam/pull/5748
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
index 7a92639d4fe..66f68e53c65 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
@@ -249,6 +249,7 @@ public static Write write() {
         .setFilenameSuffix(null)
         .setFilenamePolicy(null)
         .setDynamicDestinations(null)
+        .setDelimiter(new char[] {'\n'})
         
.setWritableByteChannelFactory(FileBasedSink.CompressionType.UNCOMPRESSED)
         .setWindowedWrites(false)
         .setNumShards(0)
@@ -569,6 +570,7 @@ private CreateTextSourceFn(byte[] delimiter) {
   @AutoValue
   public abstract static class TypedWrite<UserT, DestinationT>
       extends PTransform<PCollection<UserT>, WriteFilesResult<DestinationT>> {
+
     /** The prefix of each file written, combined with suffix and 
shardTemplate. */
     @Nullable abstract ValueProvider<ResourceId> getFilenamePrefix();
 
@@ -579,6 +581,9 @@ private CreateTextSourceFn(byte[] delimiter) {
     @Nullable
     abstract ValueProvider<ResourceId> getTempDirectory();
 
+    /** The delimiter between string records. */
+    abstract char[] getDelimiter();
+
     /** An optional header to add to each file. */
     @Nullable abstract String getHeader();
 
@@ -637,6 +642,8 @@ private CreateTextSourceFn(byte[] delimiter) {
 
       abstract Builder<UserT, DestinationT> setFooter(@Nullable String footer);
 
+      abstract Builder<UserT, DestinationT> setDelimiter(char[] delimiter);
+
       abstract Builder<UserT, DestinationT> setFilenamePolicy(
           @Nullable FilenamePolicy filenamePolicy);
 
@@ -822,6 +829,15 @@ private CreateTextSourceFn(byte[] delimiter) {
       return withNumShards(1).withShardNameTemplate("");
     }
 
+    /**
+     * Specifies the delimiter after each string written.
+     *
+     * <p>Defaults to '\n'.
+     */
+    public TypedWrite<UserT, DestinationT> withDelimiter(char[] delimiter) {
+      return toBuilder().setDelimiter(delimiter).build();
+    }
+
     /**
      * Adds a header string to each file. A newline after the header is added 
automatically.
      *
@@ -945,6 +961,7 @@ private CreateTextSourceFn(byte[] delimiter) {
               new TextSink<>(
                   tempDirectory,
                   resolveDynamicDestinations(),
+                  getDelimiter(),
                   getHeader(),
                   getFooter(),
                   getWritableByteChannelFactory()));
@@ -1087,6 +1104,11 @@ public Write withoutSharding() {
       return new Write(inner.withoutSharding());
     }
 
+    /** See {@link TypedWrite#withDelimiter(char[])}. */
+    public Write withDelimiter(char[] delimiter) {
+      return new Write(inner.withDelimiter(delimiter));
+    }
+
     /** See {@link TypedWrite#withHeader(String)}. */
     public Write withHeader(@Nullable String header) {
       return new Write(inner.withHeader(header));
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSink.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSink.java
index 24f63a6b466..e5119c09e0b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSink.java
@@ -37,21 +37,24 @@
 class TextSink<UserT, DestinationT> extends FileBasedSink<UserT, DestinationT, 
String> {
   @Nullable private final String header;
   @Nullable private final String footer;
+  private final char[] delimiter;
 
   TextSink(
       ValueProvider<ResourceId> baseOutputFilename,
       DynamicDestinations<UserT, DestinationT, String> dynamicDestinations,
+      char[] delimiter,
       @Nullable String header,
       @Nullable String footer,
       WritableByteChannelFactory writableByteChannelFactory) {
     super(baseOutputFilename, dynamicDestinations, writableByteChannelFactory);
     this.header = header;
     this.footer = footer;
+    this.delimiter = delimiter;
   }
 
   @Override
   public WriteOperation<DestinationT, String> createWriteOperation() {
-    return new TextWriteOperation<>(this, header, footer);
+    return new TextWriteOperation<>(this, delimiter, header, footer);
   }
 
   /** A {@link WriteOperation WriteOperation} for text files. */
@@ -59,16 +62,19 @@
       extends WriteOperation<DestinationT, String> {
     @Nullable private final String header;
     @Nullable private final String footer;
+    private final char[] delimiter;
 
-    private TextWriteOperation(TextSink sink, @Nullable String header, 
@Nullable String footer) {
+    private TextWriteOperation(
+        TextSink sink, char[] delimiter, @Nullable String header, @Nullable 
String footer) {
       super(sink);
       this.header = header;
       this.footer = footer;
+      this.delimiter = delimiter;
     }
 
     @Override
     public Writer<DestinationT, String> createWriter() throws Exception {
-      return new TextWriter<>(this, header, footer);
+      return new TextWriter<>(this, delimiter, header, footer);
     }
   }
 
@@ -77,15 +83,18 @@ private TextWriteOperation(TextSink sink, @Nullable String 
header, @Nullable Str
     private static final String NEWLINE = "\n";
     @Nullable private final String header;
     @Nullable private final String footer;
+    private final char[] delimiter;
 
     // Initialized in prepareWrite
     @Nullable private OutputStreamWriter out;
 
     public TextWriter(
         WriteOperation<DestinationT, String> writeOperation,
+        char[] delimiter,
         @Nullable String header,
         @Nullable String footer) {
       super(writeOperation, MimeTypes.TEXT);
+      this.delimiter = delimiter;
       this.header = header;
       this.footer = footer;
     }
@@ -97,10 +106,10 @@ private void writeIfNotNull(@Nullable String value) throws 
IOException {
       }
     }
 
-    /** Writes {@code value} followed by newline character. */
+    /** Writes {@code value} followed by the delimiter character sequence. */
     private void writeLine(String value) throws IOException {
       out.write(value);
-      out.write(NEWLINE);
+      out.write(delimiter);
     }
 
     @Override
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TypeDescriptors.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TypeDescriptors.java
index f42db02b19e..65d013b4227 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TypeDescriptors.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TypeDescriptors.java
@@ -129,6 +129,18 @@
     return new TypeDescriptor<BigInteger>() {};
   }
 
+  /**
+   * The {@link TypeDescriptor} for {@link Row}.
+   * This is the equivalent of:
+   * <pre>
+   * new TypeDescriptor&lt;Row&gt;() {};
+   * </pre>
+   * @return A {@link TypeDescriptor} for Row
+   */
+  public static TypeDescriptor<Row> rows() {
+    return new TypeDescriptor<Row>() {};
+  }
+
   /**
    * The {@link TypeDescriptor} for String.
    * This is the equivalent of:
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/BeamTableUtils.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/BeamTableUtils.java
index 10faa81eeb1..02a7e15d7dc 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/BeamTableUtils.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/BeamTableUtils.java
@@ -21,10 +21,12 @@
 import static org.apache.beam.sdk.values.Row.toRow;
 
 import java.io.IOException;
-import java.io.StringReader;
 import java.io.StringWriter;
 import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.stream.IntStream;
+import org.apache.beam.sdk.io.TextIO;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.schemas.Schema.TypeName;
 import org.apache.beam.sdk.values.Row;
@@ -40,24 +42,38 @@
  * <p>TODO: Does not yet support nested types.
  */
 public final class BeamTableUtils {
-  public static Row csvLine2BeamRow(CSVFormat csvFormat, String line, Schema 
schema) {
 
-    try (StringReader reader = new StringReader(line)) {
-      CSVParser parser = csvFormat.parse(reader);
-      CSVRecord rawRecord = parser.getRecords().get(0);
-
-      if (rawRecord.size() != schema.getFieldCount()) {
-        throw new IllegalArgumentException(
-            String.format(
-                "Expect %d fields, but actually %d", schema.getFieldCount(), 
rawRecord.size()));
+  /**
+   * Decode zero or more CSV records from the given string, according to the 
specified {@link
+   * CSVFormat}, and converts them to {@link Row Rows} with the specified 
{@link Schema}.
+   *
+   * <p>A single "line" read from e.g. {@link TextIO} can have zero or more 
records, depending on
+   * whether the line was split on the same characters that delimit CSV 
records, and whether the
+   * {@link CSVFormat} ignores blank lines.
+   */
+  public static Iterable<Row> csvLines2BeamRows(CSVFormat csvFormat, String 
line, Schema schema) {
+    // Empty lines can result in empty strings after Beam splits the file,
+    // which are not empty records to CSVParser unless they have a record 
terminator.
+    if (!line.endsWith(csvFormat.getRecordSeparator())) {
+      line += csvFormat.getRecordSeparator();
+    }
+    try (CSVParser parser = CSVParser.parse(line, csvFormat)) {
+      List<Row> rows = new ArrayList<>();
+      for (CSVRecord rawRecord : parser.getRecords()) {
+        if (rawRecord.size() != schema.getFieldCount()) {
+          throw new IllegalArgumentException(
+              String.format(
+                  "Expect %d fields, but actually %d", schema.getFieldCount(), 
rawRecord.size()));
+        }
+        rows.add(
+            IntStream.range(0, schema.getFieldCount())
+                .mapToObj(idx -> autoCastField(schema.getField(idx), 
rawRecord.get(idx)))
+                .collect(toRow(schema)));
       }
-
-      return IntStream.range(0, schema.getFieldCount())
-          .mapToObj(idx -> autoCastField(schema.getField(idx), 
rawRecord.get(idx)))
-          .collect(toRow(schema));
-
+      return rows;
     } catch (IOException e) {
-      throw new IllegalArgumentException("decodeRecord failed!", e);
+      throw new IllegalArgumentException(
+          String.format("Could not parse CSV records from %s with format %s", 
line, csvFormat), e);
     }
   }
 
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaCSVTable.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaCSVTable.java
index b80ecf34cb3..b4ed7e8802b 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaCSVTable.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaCSVTable.java
@@ -19,7 +19,7 @@
 
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static 
org.apache.beam.sdk.extensions.sql.impl.schema.BeamTableUtils.beamRow2CsvLine;
-import static 
org.apache.beam.sdk.extensions.sql.impl.schema.BeamTableUtils.csvLine2BeamRow;
+import static 
org.apache.beam.sdk.extensions.sql.impl.schema.BeamTableUtils.csvLines2BeamRows;
 
 import java.util.List;
 import org.apache.beam.sdk.schemas.Schema;
@@ -75,7 +75,9 @@ public CsvRecorderDecoder(Schema schema, CSVFormat format) {
                 @ProcessElement
                 public void processElement(ProcessContext c) {
                   String rowInString = new String(c.element().getValue(), 
UTF_8);
-                  c.output(csvLine2BeamRow(format, rowInString, schema));
+                  for (Row row : csvLines2BeamRows(format, rowInString, 
schema)) {
+                    c.output(row);
+                  }
                 }
               }));
     }
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/BeamTextCSVTable.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/BeamTextCSVTable.java
deleted file mode 100644
index 7e7472cd6b1..00000000000
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/BeamTextCSVTable.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.extensions.sql.meta.provider.text;
-
-import org.apache.beam.sdk.io.TextIO;
-import org.apache.beam.sdk.schemas.Schema;
-import org.apache.beam.sdk.values.PBegin;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.POutput;
-import org.apache.beam.sdk.values.Row;
-import org.apache.commons.csv.CSVFormat;
-
-/**
- * {@code BeamTextCSVTable} is a {@code BeamTextTable} which formatted in CSV.
- *
- * <p>{@link CSVFormat} itself has many dialects, check its javadoc for more 
info.
- */
-public class BeamTextCSVTable extends BeamTextTable {
-
-  private String csvFilePattern;
-  private CSVFormat csvFormat;
-
-  /** CSV table with {@link CSVFormat#DEFAULT DEFAULT} format. */
-  public BeamTextCSVTable(Schema beamSchema, String filePattern) {
-    this(beamSchema, filePattern, CSVFormat.DEFAULT);
-  }
-
-  public BeamTextCSVTable(Schema schema, String csvFilePattern, CSVFormat 
csvFormat) {
-    super(schema, csvFilePattern);
-    this.csvFilePattern = csvFilePattern;
-    this.csvFormat = csvFormat;
-  }
-
-  @Override
-  public PCollection<Row> buildIOReader(PBegin begin) {
-    return begin
-        .apply("decodeRecord", TextIO.read().from(filePattern))
-        .apply("parseCSVLine", new BeamTextCSVTableIOReader(schema, 
filePattern, csvFormat));
-  }
-
-  @Override
-  public POutput buildIOWriter(PCollection<Row> input) {
-    return input.apply(new BeamTextCSVTableIOWriter(schema, filePattern, 
csvFormat));
-  }
-
-  public CSVFormat getCsvFormat() {
-    return csvFormat;
-  }
-
-  public String getCsvFilePattern() {
-    return csvFilePattern;
-  }
-}
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/BeamTextCSVTableIOReader.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/BeamTextCSVTableIOReader.java
deleted file mode 100644
index 3a52951888d..00000000000
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/BeamTextCSVTableIOReader.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.extensions.sql.meta.provider.text;
-
-import static 
org.apache.beam.sdk.extensions.sql.impl.schema.BeamTableUtils.csvLine2BeamRow;
-
-import java.io.Serializable;
-import org.apache.beam.sdk.schemas.Schema;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.Row;
-import org.apache.commons.csv.CSVFormat;
-
-/** IOReader for {@code BeamTextCSVTable}. */
-public class BeamTextCSVTableIOReader extends PTransform<PCollection<String>, 
PCollection<Row>>
-    implements Serializable {
-  private String filePattern;
-  protected Schema schema;
-  protected CSVFormat csvFormat;
-
-  public BeamTextCSVTableIOReader(Schema schema, String filePattern, CSVFormat 
csvFormat) {
-    this.filePattern = filePattern;
-    this.schema = schema;
-    this.csvFormat = csvFormat;
-  }
-
-  @Override
-  public PCollection<Row> expand(PCollection<String> input) {
-    return input.apply(
-        ParDo.of(
-            new DoFn<String, Row>() {
-              @ProcessElement
-              public void processElement(ProcessContext ctx) {
-                String str = ctx.element();
-                ctx.output(csvLine2BeamRow(csvFormat, str, schema));
-              }
-            }));
-  }
-}
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/BeamTextCSVTableIOWriter.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/BeamTextCSVTableIOWriter.java
deleted file mode 100644
index c8a1f436ff1..00000000000
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/BeamTextCSVTableIOWriter.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.extensions.sql.meta.provider.text;
-
-import static 
org.apache.beam.sdk.extensions.sql.impl.schema.BeamTableUtils.beamRow2CsvLine;
-
-import java.io.Serializable;
-import org.apache.beam.sdk.io.TextIO;
-import org.apache.beam.sdk.schemas.Schema;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.POutput;
-import org.apache.beam.sdk.values.Row;
-import org.apache.commons.csv.CSVFormat;
-
-/** IOWriter for {@code BeamTextCSVTable}. */
-public class BeamTextCSVTableIOWriter extends PTransform<PCollection<Row>, 
POutput>
-    implements Serializable {
-  private String filePattern;
-  protected Schema schema;
-  protected CSVFormat csvFormat;
-
-  public BeamTextCSVTableIOWriter(Schema schema, String filePattern, CSVFormat 
csvFormat) {
-    this.filePattern = filePattern;
-    this.schema = schema;
-    this.csvFormat = csvFormat;
-  }
-
-  @Override
-  public POutput expand(PCollection<Row> input) {
-    return input
-        .apply(
-            "encodeRecord",
-            ParDo.of(
-                new DoFn<Row, String>() {
-
-                  @ProcessElement
-                  public void processElement(ProcessContext ctx) {
-                    Row row = ctx.element();
-                    ctx.output(beamRow2CsvLine(row, csvFormat));
-                  }
-                }))
-        .apply(TextIO.write().to(filePattern));
-  }
-}
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/BeamTextTable.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/BeamTextTable.java
deleted file mode 100644
index c0951e5b7ec..00000000000
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/BeamTextTable.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.extensions.sql.meta.provider.text;
-
-import org.apache.beam.sdk.extensions.sql.impl.schema.BaseBeamTable;
-import org.apache.beam.sdk.schemas.Schema;
-
-/** {@code BeamTextTable} represents a text file/directory(backed by {@code 
TextIO}). */
-public abstract class BeamTextTable extends BaseBeamTable {
-  protected String filePattern;
-
-  protected BeamTextTable(Schema schema, String filePattern) {
-    super(schema);
-    this.filePattern = filePattern;
-  }
-}
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/TextTable.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/TextTable.java
new file mode 100644
index 00000000000..a2a2f71a381
--- /dev/null
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/TextTable.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.meta.provider.text;
+
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.extensions.sql.impl.schema.BaseBeamTable;
+import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.Row;
+import org.apache.commons.csv.CSVFormat;
+
+/**
+ * {@link TextTable} is a {@link 
org.apache.beam.sdk.extensions.sql.BeamSqlTable} that reads text
+ * files and converts them according to the specified format.
+ *
+ * <p>Supported formats are {@code "csv"} and {@code "lines"}.
+ *
+ * <p>{@link CSVFormat} itself has many dialects, check its javadoc for more 
info.
+ */
+@Internal
+public class TextTable extends BaseBeamTable {
+
+  private final PTransform<PCollection<String>, PCollection<Row>> 
readConverter;
+  private final PTransform<PCollection<Row>, PCollection<String>> 
writeConverter;
+  private final String filePattern;
+
+  /** Text table with the specified read and write transforms. */
+  public TextTable(
+      Schema schema,
+      String filePattern,
+      PTransform<PCollection<String>, PCollection<Row>> readConverter,
+      PTransform<PCollection<Row>, PCollection<String>> writeConverter) {
+    super(schema);
+    this.filePattern = filePattern;
+    this.readConverter = readConverter;
+    this.writeConverter = writeConverter;
+  }
+
+  public String getFilePattern() {
+    return filePattern;
+  }
+
+  @Override
+  public PCollection<Row> buildIOReader(PBegin begin) {
+    return begin
+        .apply("ReadTextFiles", TextIO.read().from(filePattern))
+        .apply("StringToRow", readConverter);
+  }
+
+  @Override
+  public PDone buildIOWriter(PCollection<Row> input) {
+    return input
+        .apply("RowToString", writeConverter)
+        .apply("WriteTextFiles", TextIO.write().withDelimiter(new char[] 
{}).to(filePattern));
+  }
+}
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/TextTableProvider.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/TextTableProvider.java
index fdac9096d69..db1ea285e54 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/TextTableProvider.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/TextTableProvider.java
@@ -18,13 +18,28 @@
 
 package org.apache.beam.sdk.extensions.sql.meta.provider.text;
 
+import static com.google.common.base.Preconditions.checkArgument;
+import static 
org.apache.beam.sdk.extensions.sql.impl.schema.BeamTableUtils.beamRow2CsvLine;
+import static 
org.apache.beam.sdk.extensions.sql.impl.schema.BeamTableUtils.csvLines2BeamRows;
+
 import com.alibaba.fastjson.JSONObject;
 import com.google.auto.service.AutoService;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.MoreObjects;
+import com.google.common.collect.ImmutableSet;
+import java.io.Serializable;
+import javax.annotation.Nullable;
 import org.apache.beam.sdk.extensions.sql.BeamSqlTable;
 import org.apache.beam.sdk.extensions.sql.meta.Table;
 import 
org.apache.beam.sdk.extensions.sql.meta.provider.InMemoryMetaTableProvider;
 import org.apache.beam.sdk.extensions.sql.meta.provider.TableProvider;
 import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.transforms.FlatMapElements;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptors;
 import org.apache.commons.csv.CSVFormat;
 
 /**
@@ -40,7 +55,7 @@
  * TYPE 'text'
  * COMMENT 'this is the table orders'
  * LOCATION '/home/admin/orders'
- * TBLPROPERTIES '{"format": "Excel"}' -- format of each text line(csv format)
+ * TBLPROPERTIES '{"format":"csv", "csvformat": "Excel"}' -- format of each 
text line (CSV format)
  * }</pre>
  */
 @AutoService(TableProvider.class)
@@ -56,14 +71,122 @@ public BeamSqlTable buildBeamSqlTable(Table table) {
     Schema schema = table.getSchema();
 
     String filePattern = table.getLocation();
-    CSVFormat format = CSVFormat.DEFAULT;
     JSONObject properties = table.getProperties();
-    String csvFormatStr = properties.getString("format");
-    if (csvFormatStr != null && !csvFormatStr.isEmpty()) {
-      format = CSVFormat.valueOf(csvFormatStr);
+    String format = MoreObjects.firstNonNull(properties.getString("format"), 
"csv");
+
+    // Backwards compatibility: previously "type": "text" meant CSV and 
"format" was where the
+    // CSV format went. So assume that any other format is the CSV format.
+    @Nullable String legacyCsvFormat = null;
+    if (!ImmutableSet.of("csv", "lines").contains(format)) {
+      legacyCsvFormat = format;
+      format = "csv";
+    }
+
+    switch (format) {
+      case "csv":
+        String specifiedCsvFormat = properties.getString("csvformat");
+        CSVFormat csvFormat =
+            specifiedCsvFormat != null
+                ? CSVFormat.valueOf(specifiedCsvFormat)
+                : (legacyCsvFormat != null
+                    ? CSVFormat.valueOf(legacyCsvFormat)
+                    : CSVFormat.DEFAULT);
+        return new TextTable(
+            schema, filePattern, new CsvToRow(schema, csvFormat), new 
RowToCsv(csvFormat));
+      case "lines":
+        checkArgument(
+            schema.getFieldCount() == 1
+                && 
schema.getField(0).getType().equals(Schema.FieldType.STRING),
+            "Table with type 'text' and format 'lines' "
+                + "must have exactly one STRING/VARCHAR/CHAR column");
+        return new TextTable(
+            schema, filePattern, new LinesReadConverter(), new 
LinesWriteConverter());
+      default:
+        throw new IllegalArgumentException(
+            "Table with type 'text' must have format 'csv' or 'lines'");
+    }
+  }
+
+  /** Write-side converter for {@link TextTable} with format {@code 
'lines'}. */
+  public static class LinesWriteConverter extends PTransform<PCollection<Row>, 
PCollection<String>>
+      implements Serializable {
+    private static final Schema SCHEMA = 
Schema.builder().addStringField("line").build();
+
+    public LinesWriteConverter() {}
+
+    @Override
+    public PCollection<String> expand(PCollection<Row> input) {
+      return input.apply(
+          "rowsToLines",
+          MapElements.into(TypeDescriptors.strings()).via((Row row) -> 
row.getString(0) + "\n"));
+    }
+  }
+
+  /** Read-side converter for {@link TextTable} with format {@code 'lines'}. */
+  public static class LinesReadConverter extends 
PTransform<PCollection<String>, PCollection<Row>>
+      implements Serializable {
+
+    private static final Schema SCHEMA = 
Schema.builder().addStringField("line").build();
+
+    public LinesReadConverter() {}
+
+    @Override
+    public PCollection<Row> expand(PCollection<String> input) {
+      return input.apply(
+          "linesToRows",
+          MapElements.into(TypeDescriptors.rows())
+              .via(s -> Row.withSchema(SCHEMA).addValue(s).build()));
+    }
+  }
+
+  /** Write-side converter for {@link TextTable} with format {@code 'csv'}. */
+  @VisibleForTesting
+  static class RowToCsv extends PTransform<PCollection<Row>, 
PCollection<String>>
+      implements Serializable {
+
+    private CSVFormat csvFormat;
+
+    public RowToCsv(CSVFormat csvFormat) {
+      this.csvFormat = csvFormat;
+    }
+
+    @VisibleForTesting
+    public CSVFormat getCsvFormat() {
+      return csvFormat;
+    }
+
+    @Override
+    public PCollection<String> expand(PCollection<Row> input) {
+      return input.apply(
+          "rowToCsv",
+          MapElements.into(TypeDescriptors.strings()).via(row -> 
beamRow2CsvLine(row, csvFormat)));
+    }
+  }
+
+  /** Read-side converter for {@link TextTable} with format {@code 'csv'}. */
+  @VisibleForTesting
+  public static class CsvToRow extends PTransform<PCollection<String>, 
PCollection<Row>>
+      implements Serializable {
+
+    private Schema schema;
+    private CSVFormat csvFormat;
+
+    @VisibleForTesting
+    public CSVFormat getCsvFormat() {
+      return csvFormat;
+    }
+
+    public CsvToRow(Schema schema, CSVFormat csvFormat) {
+      this.schema = schema;
+      this.csvFormat = csvFormat;
     }
 
-    BeamTextCSVTable txtTable = new BeamTextCSVTable(schema, filePattern, 
format);
-    return txtTable;
+    @Override
+    public PCollection<Row> expand(PCollection<String> input) {
+      return input.apply(
+          "csvToRow",
+          FlatMapElements.into(TypeDescriptors.rows())
+              .via(s -> csvLines2BeamRows(csvFormat, s, schema)));
+    }
   }
 }
diff --git 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/BeamTextCSVTableTest.java
 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/BeamTextCSVTableTest.java
deleted file mode 100644
index fbd6ea1a6a0..00000000000
--- 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/BeamTextCSVTableTest.java
+++ /dev/null
@@ -1,157 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.extensions.sql.meta.provider.text;
-
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.io.PrintStream;
-import java.io.UncheckedIOException;
-import java.nio.file.FileVisitResult;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.SimpleFileVisitor;
-import java.nio.file.attribute.BasicFileAttributes;
-import java.util.Arrays;
-import java.util.List;
-import org.apache.beam.sdk.schemas.Schema;
-import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.Row;
-import org.apache.commons.csv.CSVFormat;
-import org.apache.commons.csv.CSVPrinter;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
-
-/** Tests for {@code BeamTextCSVTable}. */
-public class BeamTextCSVTableTest {
-
-  @Rule public TestPipeline pipeline = TestPipeline.create();
-  @Rule public TestPipeline pipeline2 = TestPipeline.create();
-
-  /**
-   * testData.
-   *
-   * <p>The types of the csv fields are: integer,bigint,float,double,string
-   */
-  private static Schema schema =
-      Schema.builder()
-          .addInt32Field("id")
-          .addInt64Field("order_id")
-          .addFloatField("price")
-          .addDoubleField("amount")
-          .addStringField("user_name")
-          .build();
-
-  private static Object[] data1 = new Object[] {1, 1L, 1.1F, 1.1, "james"};
-  private static Object[] data2 = new Object[] {2, 2L, 2.2F, 2.2, "bond"};
-
-  private static List<Object[]> testData = Arrays.asList(data1, data2);
-  private static List<Row> testDataRows =
-      Arrays.asList(
-          Row.withSchema(schema).addValues(data1).build(),
-          Row.withSchema(schema).addValues(data2).build());
-
-  private static Path tempFolder;
-  private static File readerSourceFile;
-  private static File writerTargetFile;
-
-  @Test
-  public void testBuildIOReader() {
-    PCollection<Row> rows =
-        new BeamTextCSVTable(schema, readerSourceFile.getAbsolutePath())
-            .buildIOReader(pipeline.begin());
-    PAssert.that(rows).containsInAnyOrder(testDataRows);
-    pipeline.run();
-  }
-
-  @Test
-  public void testBuildIOWriter() {
-    PCollection<Row> input =
-        new BeamTextCSVTable(schema, readerSourceFile.getAbsolutePath())
-            .buildIOReader(pipeline.begin());
-    new BeamTextCSVTable(schema, 
writerTargetFile.getAbsolutePath()).buildIOWriter(input);
-    pipeline.run();
-
-    PCollection<Row> rows =
-        new BeamTextCSVTable(schema, writerTargetFile.getAbsolutePath())
-            .buildIOReader(pipeline2.begin());
-
-    // confirm the two reads match
-    PAssert.that(rows).containsInAnyOrder(testDataRows);
-    pipeline2.run();
-  }
-
-  @BeforeClass
-  public static void setUp() throws IOException {
-    tempFolder = Files.createTempDirectory("BeamTextTableTest");
-    readerSourceFile = writeToFile(testData, "readerSourceFile.txt");
-    writerTargetFile = writeToFile(testData, "writerTargetFile.txt");
-  }
-
-  @AfterClass
-  public static void teardownClass() throws IOException {
-    Files.walkFileTree(
-        tempFolder,
-        new SimpleFileVisitor<Path>() {
-
-          @Override
-          public FileVisitResult visitFile(Path file, BasicFileAttributes 
attrs)
-              throws IOException {
-            Files.delete(file);
-            return FileVisitResult.CONTINUE;
-          }
-
-          @Override
-          public FileVisitResult postVisitDirectory(Path dir, IOException exc) 
throws IOException {
-            Files.delete(dir);
-            return FileVisitResult.CONTINUE;
-          }
-        });
-  }
-
-  private static File writeToFile(List<Object[]> rows, String filename) throws 
IOException {
-    File file = tempFolder.resolve(filename).toFile();
-    OutputStream output = new FileOutputStream(file);
-    writeToStreamAndClose(rows, output);
-    return file;
-  }
-
-  /**
-   * Helper that writes the given lines (adding a newline in between) to a 
stream, then closes the
-   * stream.
-   */
-  private static void writeToStreamAndClose(List<Object[]> rows, OutputStream 
outputStream) {
-    try (PrintStream writer = new PrintStream(outputStream)) {
-      CSVPrinter printer = CSVFormat.DEFAULT.print(writer);
-      for (Object[] row : rows) {
-        for (Object field : row) {
-          printer.print(field);
-        }
-        printer.println();
-      }
-    } catch (IOException e) {
-      throw new UncheckedIOException(e);
-    }
-  }
-}
diff --git 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/TextTableProviderTest.java
 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/TextTableProviderTest.java
index c328f6069c2..1c1c27922e5 100644
--- 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/TextTableProviderTest.java
+++ 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/TextTableProviderTest.java
@@ -17,69 +17,229 @@
  */
 package org.apache.beam.sdk.extensions.sql.meta.provider.text;
 
-import static org.apache.beam.sdk.schemas.Schema.toSchema;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-
-import com.alibaba.fastjson.JSONObject;
-import java.util.stream.Stream;
-import org.apache.beam.sdk.extensions.sql.BeamSqlTable;
-import org.apache.beam.sdk.extensions.sql.meta.Table;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.junit.Assert.assertThat;
+
+import com.google.common.base.Charsets;
+import java.io.File;
+import java.nio.file.Files;
+import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
+import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils;
 import org.apache.beam.sdk.schemas.Schema;
-import org.apache.commons.csv.CSVFormat;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.util.BackOff;
+import org.apache.beam.sdk.util.NumberedShardedFile;
+import org.apache.beam.sdk.util.Sleeper;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptors;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 
-/** UnitTest for {@link TextTableProvider}. */
+/** Tests for {@link TextTableProvider}. */
 public class TextTableProviderTest {
-  private TextTableProvider provider = new TextTableProvider();
 
+  @Rule public TestPipeline pipeline = TestPipeline.create();
+
+  @Rule
+  public TemporaryFolder tempFolder =
+      new TemporaryFolder() {
+        @Override
+        protected void after() {}
+      };
+
+  private static final String SQL_CSV_SCHEMA = "(f_string VARCHAR, f_int INT)";
+  private static final Schema CSV_SCHEMA =
+      Schema.builder()
+          .addNullableField("f_string", Schema.FieldType.STRING)
+          .addNullableField("f_int", Schema.FieldType.INT32)
+          .build();
+
+  private static final Schema LINES_SCHEMA = 
Schema.builder().addStringField("f_string").build();
+  private static final String SQL_LINES_SCHEMA = "(f_string VARCHAR)";
+
+  // Even though these have the same schema as LINES_SCHEMA, that is 
accidental; they exist for a
+  // different purpose, to test Excel CSV format that does not ignore empty 
lines
+  private static final Schema SINGLE_STRING_CSV_SCHEMA =
+      Schema.builder().addStringField("f_string").build();
+  private static final String SINGLE_STRING_SQL_SCHEMA = "(f_string VARCHAR)";
+
+  /**
+   * Tests {@code CREATE TABLE TYPE text} with no format reads a default CSV.
+   *
+   * <p>The default format ignores empty lines, so that is an important part 
of this test.
+   */
   @Test
-  public void testGetTableType() throws Exception {
-    assertEquals("text", provider.getTableType());
+  public void testLegacyDefaultCsv() throws Exception {
+    Files.write(
+        tempFolder.newFile("test.csv").toPath(),
+        "hello,13\n\ngoodbye,42\n".getBytes(Charsets.UTF_8));
+
+    BeamSqlEnv env = BeamSqlEnv.inMemory(new TextTableProvider());
+    env.executeDdl(
+        String.format(
+            "CREATE TABLE test %s TYPE text LOCATION '%s/*'",
+            SQL_CSV_SCHEMA, tempFolder.getRoot()));
+
+    PCollection<Row> rows =
+        BeamSqlRelUtils.toPCollection(pipeline, env.parseQuery("SELECT * FROM 
test"));
+
+    PAssert.that(rows)
+        .containsInAnyOrder(
+            Row.withSchema(CSV_SCHEMA).addValues("hello", 13).build(),
+            Row.withSchema(CSV_SCHEMA).addValues("goodbye", 42).build());
+    pipeline.run();
   }
 
+  /**
+   * Tests {@code CREATE TABLE TYPE text} with a format other than "csv" or 
"lines" results in a CSV
+   * read of that format.
+   */
   @Test
-  public void testBuildBeamSqlTable() throws Exception {
-    Table table = mockTable("hello", null);
-    BeamSqlTable sqlTable = provider.buildBeamSqlTable(table);
+  public void testLegacyTdfCsv() throws Exception {
+    Files.write(
+        tempFolder.newFile("test.csv").toPath(),
+        "hello\t13\n\ngoodbye\t42\n".getBytes(Charsets.UTF_8));
 
-    assertNotNull(sqlTable);
-    assertTrue(sqlTable instanceof BeamTextCSVTable);
+    BeamSqlEnv env = BeamSqlEnv.inMemory(new TextTableProvider());
+    env.executeDdl(
+        String.format(
+            "CREATE TABLE test %s TYPE text LOCATION '%s/*' TBLPROPERTIES 
'{\"format\":\"TDF\"}'",
+            SQL_CSV_SCHEMA, tempFolder.getRoot()));
 
-    BeamTextCSVTable csvTable = (BeamTextCSVTable) sqlTable;
-    assertEquals(CSVFormat.DEFAULT, csvTable.getCsvFormat());
-    assertEquals("/home/admin/hello", csvTable.getCsvFilePattern());
+    PCollection<Row> rows =
+        BeamSqlRelUtils.toPCollection(pipeline, env.parseQuery("SELECT * FROM 
test"));
+
+    rows.apply(
+        MapElements.into(TypeDescriptors.voids())
+            .via(
+                r -> {
+                  System.out.println(r.toString());
+                  return null;
+                }));
+
+    PAssert.that(rows)
+        .containsInAnyOrder(
+            Row.withSchema(CSV_SCHEMA).addValues("hello", 13).build(),
+            Row.withSchema(CSV_SCHEMA).addValues("goodbye", 42).build());
+    pipeline.run();
   }
 
+  /** Tests {@code CREATE TABLE TYPE text TBLPROPERTIES '{"format":"csv"}'} 
works as expected. */
   @Test
-  public void testBuildBeamSqlTable_customizedFormat() throws Exception {
-    Table table = mockTable("hello", "Excel");
-    BeamSqlTable sqlTable = provider.buildBeamSqlTable(table);
+  public void testExplicitCsv() throws Exception {
+    Files.write(
+        tempFolder.newFile("test.csv").toPath(),
+        "hello,13\n\ngoodbye,42\n".getBytes(Charsets.UTF_8));
 
-    assertNotNull(sqlTable);
-    assertTrue(sqlTable instanceof BeamTextCSVTable);
+    BeamSqlEnv env = BeamSqlEnv.inMemory(new TextTableProvider());
+    env.executeDdl(
+        String.format(
+            "CREATE TABLE test %s TYPE text LOCATION '%s/*' TBLPROPERTIES 
'{\"format\":\"csv\"}'",
+            SQL_CSV_SCHEMA, tempFolder.getRoot()));
 
-    BeamTextCSVTable csvTable = (BeamTextCSVTable) sqlTable;
-    assertEquals(CSVFormat.EXCEL, csvTable.getCsvFormat());
+    PCollection<Row> rows =
+        BeamSqlRelUtils.toPCollection(pipeline, env.parseQuery("SELECT * FROM 
test"));
+
+    PAssert.that(rows)
+        .containsInAnyOrder(
+            Row.withSchema(CSV_SCHEMA).addValues("hello", 13).build(),
+            Row.withSchema(CSV_SCHEMA).addValues("goodbye", 42).build());
+    pipeline.run();
   }
 
-  private static Table mockTable(String name, String format) {
-    JSONObject properties = new JSONObject();
-    if (format != null) {
-      properties.put("format", format);
-    }
-    return Table.builder()
-        .name(name)
-        .comment(name + " table")
-        .location("/home/admin/" + name)
-        .schema(
-            Stream.of(
-                    Schema.Field.nullable("id", Schema.FieldType.INT32),
-                    Schema.Field.nullable("name", Schema.FieldType.STRING))
-                .collect(toSchema()))
-        .type("text")
-        .properties(properties)
-        .build();
+  /**
+   * Tests {@code CREATE TABLE TYPE text TBLPROPERTIES '{"format":"csv", 
"csvFormat": "Excel"}'}
+   * works as expected.
+   *
+   * <p>Note that the difference with the "Excel" format is that blank lines are not 
ignored but have a
+   * single string field.
+   */
+  @Test
+  public void testExplicitCsvExcel() throws Exception {
+    Files.write(
+        tempFolder.newFile("test.csv").toPath(), 
"hello\n\ngoodbye\n".getBytes(Charsets.UTF_8));
+
+    BeamSqlEnv env = BeamSqlEnv.inMemory(new TextTableProvider());
+    env.executeDdl(
+        String.format(
+            "CREATE TABLE test %s TYPE text LOCATION '%s/*' "
+                + "TBLPROPERTIES '{\"format\":\"csv\", 
\"csvFormat\":\"Excel\"}'",
+            SINGLE_STRING_SQL_SCHEMA, tempFolder.getRoot()));
+
+    PCollection<Row> rows =
+        BeamSqlRelUtils.toPCollection(pipeline, env.parseQuery("SELECT * FROM 
test"));
+
+    PAssert.that(rows)
+        .containsInAnyOrder(
+            
Row.withSchema(SINGLE_STRING_CSV_SCHEMA).addValues("hello").build(),
+            
Row.withSchema(SINGLE_STRING_CSV_SCHEMA).addValues("goodbye").build());
+    pipeline.run();
+  }
+
+  /** Tests {@code CREATE TABLE TYPE text TBLPROPERTIES '{"format":"lines"}'} 
works as expected. */
+  @Test
+  public void testLines() throws Exception {
+    // Data that looks like CSV but isn't parsed as such
+    Files.write(
+        tempFolder.newFile("test.csv").toPath(), 
"hello,13\ngoodbye,42\n".getBytes(Charsets.UTF_8));
+
+    BeamSqlEnv env = BeamSqlEnv.inMemory(new TextTableProvider());
+    env.executeDdl(
+        String.format(
+            "CREATE TABLE test %s TYPE text LOCATION '%s/*' TBLPROPERTIES 
'{\"format\":\"lines\"}'",
+            SQL_LINES_SCHEMA, tempFolder.getRoot()));
+
+    PCollection<Row> rows =
+        BeamSqlRelUtils.toPCollection(pipeline, env.parseQuery("SELECT * FROM 
test"));
+
+    PAssert.that(rows)
+        .containsInAnyOrder(
+            Row.withSchema(LINES_SCHEMA).addValues("hello,13").build(),
+            Row.withSchema(LINES_SCHEMA).addValues("goodbye,42").build());
+    pipeline.run();
+  }
+
+  @Test
+  public void testWriteLines() throws Exception {
+    File destinationFile = new File(tempFolder.getRoot(), "lines-outputs");
+    BeamSqlEnv env = BeamSqlEnv.inMemory(new TextTableProvider());
+    env.executeDdl(
+        String.format(
+            "CREATE TABLE test %s TYPE text LOCATION '%s' TBLPROPERTIES 
'{\"format\":\"lines\"}'",
+            SQL_LINES_SCHEMA, destinationFile.getAbsolutePath()));
+
+    BeamSqlRelUtils.toPCollection(
+        pipeline, env.parseQuery("INSERT INTO test VALUES ('hello'), 
('goodbye')"));
+    pipeline.run();
+
+    assertThat(
+        new NumberedShardedFile(destinationFile.getAbsolutePath() + "*")
+            .readFilesWithRetries(Sleeper.DEFAULT, BackOff.STOP_BACKOFF),
+        containsInAnyOrder("hello", "goodbye"));
+  }
+
+  @Test
+  public void testWriteCsv() throws Exception {
+    File destinationFile = new File(tempFolder.getRoot(), "csv-outputs");
+    BeamSqlEnv env = BeamSqlEnv.inMemory(new TextTableProvider());
+
+    // NumberedShardedFile
+    env.executeDdl(
+        String.format(
+            "CREATE TABLE test %s TYPE text LOCATION '%s' TBLPROPERTIES 
'{\"format\":\"csv\"}'",
+            SQL_CSV_SCHEMA, destinationFile.getAbsolutePath()));
+
+    BeamSqlRelUtils.toPCollection(
+        pipeline, env.parseQuery("INSERT INTO test VALUES ('hello', 42), 
('goodbye', 13)"));
+    pipeline.run();
+
+    assertThat(
+        new NumberedShardedFile(destinationFile.getAbsolutePath() + "*")
+            .readFilesWithRetries(Sleeper.DEFAULT, BackOff.STOP_BACKOFF),
+        containsInAnyOrder("hello,42", "goodbye,13"));
   }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 115618)
    Time Spent: 1h  (was: 50m)

> Support text table format with a single column of the lines of the files
> ------------------------------------------------------------------------
>
>                 Key: BEAM-4626
>                 URL: https://issues.apache.org/jira/browse/BEAM-4626
>             Project: Beam
>          Issue Type: New Feature
>          Components: dsl-sql
>            Reporter: Kenneth Knowles
>            Assignee: Kenneth Knowles
>            Priority: Major
>          Time Spent: 1h
>  Remaining Estimate: 0h
>
> Today, SQL can read CSV and allows a {{format}} flag to control what CSV 
> variant is used. But to do easy things and write pure SQL jobs it would be 
> nice to just read the text file as a one-column table and do transformations 
> in SQL.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to