tvalentyn commented on a change in pull request #12490:
URL: https://github.com/apache/beam/pull/12490#discussion_r469644457



##########
File path: sdks/java/io/contextual-text-io/src/main/java/org/apache/beam/sdk/io/ContextualTextIO/ContextualTextIOSource.java
##########
@@ -0,0 +1,359 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.ContextualTextIO;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.SeekableByteChannel;
+import java.util.NoSuchElementException;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.FileBasedSource;
+import org.apache.beam.sdk.io.fs.EmptyMatchTreatment;
+import org.apache.beam.sdk.io.fs.MatchResult;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.sdk.schemas.NoSuchSchemaException;
+import org.apache.beam.sdk.schemas.SchemaCoder;
+import org.apache.beam.sdk.schemas.SchemaRegistry;
+import org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.ByteString;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
+
+/**
+ * Implementation detail of {@link ContextualTextIO.Read}.
+ *
+ * <p>A {@link FileBasedSource} which can decode records delimited by newline characters.
+ *
+ * <p>This source splits the data into records using {@code UTF-8} {@code \n}, {@code \r}, or {@code
+ * \r\n} as the delimiter. This source is not strict and supports decoding the last record even if
+ * it is not delimited. Finally, no records are decoded if the stream is empty.
+ *
+ * <p>This source supports reading from any arbitrary byte position within the stream. If the

Review comment:
       Unless the multiline setting is enabled?
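
       If so, it might be worth stating that exception explicitly in this javadoc. A rough sketch of possible wording (my suggestion, not text from this PR):

```java
/**
 * <p>This source supports reading from any arbitrary byte position within the stream,
 * unless {@code hasRFC4180MultiLineColumn} is set, in which case the source is not
 * splittable and is read sequentially from the start of the file.
 */
```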

##########
File path: sdks/java/io/contextual-text-io/src/main/java/org/apache/beam/sdk/io/ContextualTextIO/ContextualTextIOSource.java
##########
@@ -0,0 +1,359 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.ContextualTextIO;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.SeekableByteChannel;
+import java.util.NoSuchElementException;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.FileBasedSource;
+import org.apache.beam.sdk.io.fs.EmptyMatchTreatment;
+import org.apache.beam.sdk.io.fs.MatchResult;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.sdk.schemas.NoSuchSchemaException;
+import org.apache.beam.sdk.schemas.SchemaCoder;
+import org.apache.beam.sdk.schemas.SchemaRegistry;
+import org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.ByteString;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
+
+/**
+ * Implementation detail of {@link ContextualTextIO.Read}.
+ *
+ * <p>A {@link FileBasedSource} which can decode records delimited by newline characters.
+ *
+ * <p>This source splits the data into records using {@code UTF-8} {@code \n}, {@code \r}, or {@code
+ * \r\n} as the delimiter. This source is not strict and supports decoding the last record even if
+ * it is not delimited. Finally, no records are decoded if the stream is empty.
+ *
+ * <p>This source supports reading from any arbitrary byte position within the stream. If the
+ * starting position is not {@code 0}, then bytes are skipped until the first delimiter is found
+ * representing the beginning of the first record to be decoded.
+ */
+@VisibleForTesting
+class ContextualTextIOSource extends FileBasedSource<LineContext> {
+  byte[] delimiter;
+
+  // Used to Override isSplittable
+  private boolean hasRFC4180MultiLineColumn;
+
+  @Override
+  protected boolean isSplittable() throws Exception {
+    if (hasRFC4180MultiLineColumn) return false;
+    return super.isSplittable();
+  }
+
+  ContextualTextIOSource(
+      ValueProvider<String> fileSpec,
+      EmptyMatchTreatment emptyMatchTreatment,
+      byte[] delimiter,
+      boolean hasRFC4180MultiLineColumn) {
+    super(fileSpec, emptyMatchTreatment, 1L);
+    this.delimiter = delimiter;
+    this.hasRFC4180MultiLineColumn = hasRFC4180MultiLineColumn;
+  }
+
+  private ContextualTextIOSource(
+      MatchResult.Metadata metadata,
+      long start,
+      long end,
+      byte[] delimiter,
+      boolean hasRFC4180MultiLineColumn) {
+    super(metadata, 1L, start, end);
+    this.delimiter = delimiter;
+    this.hasRFC4180MultiLineColumn = hasRFC4180MultiLineColumn;
+  }
+
+  @Override
+  protected FileBasedSource<LineContext> createForSubrangeOfFile(
+      MatchResult.Metadata metadata, long start, long end) {
+    return new ContextualTextIOSource(metadata, start, end, delimiter, hasRFC4180MultiLineColumn);
+  }
+
+  @Override
+  protected FileBasedReader<LineContext> createSingleFileReader(PipelineOptions options) {
+    return new MultiLineTextBasedReader(this, delimiter, hasRFC4180MultiLineColumn);
+  }
+
+  @Override
+  public Coder<LineContext> getOutputCoder() {
+    SchemaCoder<LineContext> coder = null;
+    try {
+      coder = SchemaRegistry.createDefault().getSchemaCoder(LineContext.class);
+    } catch (NoSuchSchemaException e) {
+      System.out.println("No Coder!");
+    }
+    return coder;
+  }
+
+  /**
+   * A {@link FileBasedReader FileBasedReader} which can decode records delimited by delimiter
+   * characters.
+   *
+   * <p>See {@link ContextualTextIOSource } for further details.
+   */
+  @VisibleForTesting
+  static class MultiLineTextBasedReader extends FileBasedReader<LineContext> {
+    public static final int READ_BUFFER_SIZE = 8192;
+    private static final ByteString UTF8_BOM =
+        ByteString.copyFrom(new byte[] {(byte) 0xEF, (byte) 0xBB, (byte) 0xBF});
+    private final ByteBuffer readBuffer = ByteBuffer.allocate(READ_BUFFER_SIZE);
+    private ByteString buffer;
+    private int startOfDelimiterInBuffer;
+    private int endOfDelimiterInBuffer;
+    private long startOfRecord;
+    private volatile long startOfNextRecord;
+    private volatile boolean eof;
+    private volatile boolean elementIsPresent;
+    private @Nullable LineContext currentValue;
+    private @Nullable ReadableByteChannel inChannel;
+    private @Nullable byte[] delimiter;
+
+    // Add to override the isSplittable
+    private boolean hasRFC4180MultiLineColumn;
+
+    private long startingOffset;
+    private long readerlineNum;
+
+    private MultiLineTextBasedReader(
+        ContextualTextIOSource source, byte[] delimiter, boolean hasRFC4180MultiLineColumn) {
+      super(source);
+      buffer = ByteString.EMPTY;
+      this.delimiter = delimiter;
+      this.hasRFC4180MultiLineColumn = hasRFC4180MultiLineColumn;
+      startingOffset = getCurrentSource().getStartOffset(); // Start offset;
+    }
+
+    @Override
+    protected long getCurrentOffset() throws NoSuchElementException {
+      if (!elementIsPresent) throw new NoSuchElementException();
+      return startOfRecord;
+    }
+
+    @Override
+    public long getSplitPointsRemaining() {
+      if (isStarted() && startOfNextRecord >= getCurrentSource().getEndOffset()) {
+        return isDone() ? 0 : 1;
+      }
+      return super.getSplitPointsRemaining();
+    }
+
+    @Override
+    public LineContext getCurrent() throws NoSuchElementException {
+      if (!elementIsPresent) {
+        throw new NoSuchElementException();
+      }
+      return currentValue;
+    }
+
+    @Override
+    protected void startReading(ReadableByteChannel channel) throws IOException {
+      this.inChannel = channel;
+      // If the first offset is greater than zero, we need to skip bytes until we see our
+      // first delimiter.
+      long startOffset = getCurrentSource().getStartOffset();
+      if (startOffset > 0) {
+        Preconditions.checkState(
+            channel instanceof SeekableByteChannel,
+            "%s only supports reading from a SeekableByteChannel when given a start offset"
+                + " greater than 0.",
+            ContextualTextIOSource.class.getSimpleName());
+        long requiredPosition = startOffset - 1;
+        if (delimiter != null && startOffset >= delimiter.length) {
+          // we need to move back the offset of at worse delimiter.size to be sure to see
+          // all the bytes of the delimiter in the call to findDelimiterBounds() below
+          requiredPosition = startOffset - delimiter.length;
+        }
+        ((SeekableByteChannel) channel).position(requiredPosition);
+        findDelimiterBoundsWithMultiLineCheck();
+        buffer = buffer.substring(endOfDelimiterInBuffer);
+        startOfNextRecord = requiredPosition + endOfDelimiterInBuffer;
+        endOfDelimiterInBuffer = 0;
+        startOfDelimiterInBuffer = 0;
+      }
+    }
+
+    private void findDelimiterBoundsWithMultiLineCheck() throws IOException {
+      findDelimiterBounds();
+    }
+
+    /**
+     * Locates the start position and end position of the next delimiter. Will consume the channel
+     * till either EOF or the delimiter bounds are found.
+     *
+     * <p>If {@link ContextualTextIOSource#hasRFC4180MultiLineColumn} is set then the behaviour will
+     * change from the standard read seen in {@link org.apache.beam.sdk.io.TextIO}. The assumption
+     * when {@link ContextualTextIOSource#hasRFC4180MultiLineColumn} is set is that the file is
+     * being read with a single thread.
+     *
+     * <p>This fills the buffer and updates the positions as follows:
+     *
+     * <pre>{@code
+     * ------------------------------------------------------
+     * | element bytes | delimiter bytes | unconsumed bytes |
+     * ------------------------------------------------------
+     * 0            start of          end of              buffer
+     *              delimiter         delimiter           size
+     *              in buffer         in buffer
+     * }</pre>
+     */
+    private void findDelimiterBounds() throws IOException {
+      int bytePositionInBuffer = 0;
+      boolean doubleQuoteClosed = true;
+      boolean insideOpenQuote = true;

Review comment:
       Why is `insideOpenQuote` initialized to `true`?
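
       I would have expected the reader to start outside of any quoted field, e.g. (a sketch of what I had in mind; I may be missing context from the rest of the method):

```java
// No '"' has been consumed yet, so we are not inside an open quote.
boolean insideOpenQuote = false;
```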

##########
File path: sdks/java/io/contextual-text-io/src/main/java/org/apache/beam/sdk/io/ContextualTextIO/ContextualTextIOSource.java
##########
@@ -0,0 +1,359 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.ContextualTextIO;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.SeekableByteChannel;
+import java.util.NoSuchElementException;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.FileBasedSource;
+import org.apache.beam.sdk.io.fs.EmptyMatchTreatment;
+import org.apache.beam.sdk.io.fs.MatchResult;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.sdk.schemas.NoSuchSchemaException;
+import org.apache.beam.sdk.schemas.SchemaCoder;
+import org.apache.beam.sdk.schemas.SchemaRegistry;
+import org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.ByteString;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
+
+/**
+ * Implementation detail of {@link ContextualTextIO.Read}.
+ *
+ * <p>A {@link FileBasedSource} which can decode records delimited by newline characters.
+ *
+ * <p>This source splits the data into records using {@code UTF-8} {@code \n}, {@code \r}, or {@code
+ * \r\n} as the delimiter. This source is not strict and supports decoding the last record even if
+ * it is not delimited. Finally, no records are decoded if the stream is empty.
+ *
+ * <p>This source supports reading from any arbitrary byte position within the stream. If the
+ * starting position is not {@code 0}, then bytes are skipped until the first delimiter is found
+ * representing the beginning of the first record to be decoded.
+ */
+@VisibleForTesting
+class ContextualTextIOSource extends FileBasedSource<LineContext> {
+  byte[] delimiter;
+
+  // Used to Override isSplittable
+  private boolean hasRFC4180MultiLineColumn;
+
+  @Override
+  protected boolean isSplittable() throws Exception {
+    if (hasRFC4180MultiLineColumn) return false;
+    return super.isSplittable();
+  }
+
+  ContextualTextIOSource(
+      ValueProvider<String> fileSpec,
+      EmptyMatchTreatment emptyMatchTreatment,
+      byte[] delimiter,
+      boolean hasRFC4180MultiLineColumn) {
+    super(fileSpec, emptyMatchTreatment, 1L);
+    this.delimiter = delimiter;
+    this.hasRFC4180MultiLineColumn = hasRFC4180MultiLineColumn;
+  }
+
+  private ContextualTextIOSource(
+      MatchResult.Metadata metadata,
+      long start,
+      long end,
+      byte[] delimiter,
+      boolean hasRFC4180MultiLineColumn) {
+    super(metadata, 1L, start, end);
+    this.delimiter = delimiter;
+    this.hasRFC4180MultiLineColumn = hasRFC4180MultiLineColumn;
+  }
+
+  @Override
+  protected FileBasedSource<LineContext> createForSubrangeOfFile(
+      MatchResult.Metadata metadata, long start, long end) {
+    return new ContextualTextIOSource(metadata, start, end, delimiter, hasRFC4180MultiLineColumn);
+  }
+
+  @Override
+  protected FileBasedReader<LineContext> createSingleFileReader(PipelineOptions options) {
+    return new MultiLineTextBasedReader(this, delimiter, hasRFC4180MultiLineColumn);
+  }
+
+  @Override
+  public Coder<LineContext> getOutputCoder() {
+    SchemaCoder<LineContext> coder = null;
+    try {
+      coder = SchemaRegistry.createDefault().getSchemaCoder(LineContext.class);
+    } catch (NoSuchSchemaException e) {
+      System.out.println("No Coder!");

Review comment:
       We should use loggers to handle errors like this, e.g. https://github.com/apache/beam/blob/71c7760f4b5c5bf0d91e2c8403fae99216308a3e/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java#L897
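
       Roughly (a sketch following the usual SLF4J pattern in Beam; rethrowing instead of returning a null coder is also just my suggestion):

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// ...
private static final Logger LOG = LoggerFactory.getLogger(ContextualTextIOSource.class);

@Override
public Coder<LineContext> getOutputCoder() {
  try {
    return SchemaRegistry.createDefault().getSchemaCoder(LineContext.class);
  } catch (NoSuchSchemaException e) {
    // Log with the exception attached rather than printing to stdout.
    LOG.error("No schema could be inferred for LineContext", e);
    throw new IllegalStateException("No SchemaCoder available for LineContext", e);
  }
}
```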

##########
File path: sdks/java/io/contextual-text-io/src/main/java/org/apache/beam/sdk/io/ContextualTextIO/ContextualTextIOSource.java
##########
@@ -0,0 +1,359 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.ContextualTextIO;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.SeekableByteChannel;
+import java.util.NoSuchElementException;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.FileBasedSource;
+import org.apache.beam.sdk.io.fs.EmptyMatchTreatment;
+import org.apache.beam.sdk.io.fs.MatchResult;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.sdk.schemas.NoSuchSchemaException;
+import org.apache.beam.sdk.schemas.SchemaCoder;
+import org.apache.beam.sdk.schemas.SchemaRegistry;
+import org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.ByteString;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
+
+/**
+ * Implementation detail of {@link ContextualTextIO.Read}.
+ *
+ * <p>A {@link FileBasedSource} which can decode records delimited by newline characters.
+ *
+ * <p>This source splits the data into records using {@code UTF-8} {@code \n}, {@code \r}, or {@code
+ * \r\n} as the delimiter. This source is not strict and supports decoding the last record even if
+ * it is not delimited. Finally, no records are decoded if the stream is empty.
+ *
+ * <p>This source supports reading from any arbitrary byte position within the stream. If the
+ * starting position is not {@code 0}, then bytes are skipped until the first delimiter is found
+ * representing the beginning of the first record to be decoded.
+ */
+@VisibleForTesting
+class ContextualTextIOSource extends FileBasedSource<LineContext> {
+  byte[] delimiter;
+
+  // Used to Override isSplittable
+  private boolean hasRFC4180MultiLineColumn;
+
+  @Override
+  protected boolean isSplittable() throws Exception {
+    if (hasRFC4180MultiLineColumn) return false;
+    return super.isSplittable();
+  }
+
+  ContextualTextIOSource(
+      ValueProvider<String> fileSpec,
+      EmptyMatchTreatment emptyMatchTreatment,
+      byte[] delimiter,
+      boolean hasRFC4180MultiLineColumn) {
+    super(fileSpec, emptyMatchTreatment, 1L);
+    this.delimiter = delimiter;
+    this.hasRFC4180MultiLineColumn = hasRFC4180MultiLineColumn;
+  }
+
+  private ContextualTextIOSource(
+      MatchResult.Metadata metadata,
+      long start,
+      long end,
+      byte[] delimiter,
+      boolean hasRFC4180MultiLineColumn) {
+    super(metadata, 1L, start, end);
+    this.delimiter = delimiter;
+    this.hasRFC4180MultiLineColumn = hasRFC4180MultiLineColumn;
+  }
+
+  @Override
+  protected FileBasedSource<LineContext> createForSubrangeOfFile(
+      MatchResult.Metadata metadata, long start, long end) {
+    return new ContextualTextIOSource(metadata, start, end, delimiter, hasRFC4180MultiLineColumn);
+  }
+
+  @Override
+  protected FileBasedReader<LineContext> createSingleFileReader(PipelineOptions options) {
+    return new MultiLineTextBasedReader(this, delimiter, hasRFC4180MultiLineColumn);
+  }
+
+  @Override
+  public Coder<LineContext> getOutputCoder() {
+    SchemaCoder<LineContext> coder = null;
+    try {
+      coder = SchemaRegistry.createDefault().getSchemaCoder(LineContext.class);
+    } catch (NoSuchSchemaException e) {
+      System.out.println("No Coder!");
+    }
+    return coder;
+  }
+
+  /**
+   * A {@link FileBasedReader FileBasedReader} which can decode records delimited by delimiter
+   * characters.
+   *
+   * <p>See {@link ContextualTextIOSource } for further details.
+   */
+  @VisibleForTesting
+  static class MultiLineTextBasedReader extends FileBasedReader<LineContext> {
+    public static final int READ_BUFFER_SIZE = 8192;
+    private static final ByteString UTF8_BOM =
+        ByteString.copyFrom(new byte[] {(byte) 0xEF, (byte) 0xBB, (byte) 0xBF});
+    private final ByteBuffer readBuffer = ByteBuffer.allocate(READ_BUFFER_SIZE);
+    private ByteString buffer;
+    private int startOfDelimiterInBuffer;
+    private int endOfDelimiterInBuffer;
+    private long startOfRecord;
+    private volatile long startOfNextRecord;
+    private volatile boolean eof;
+    private volatile boolean elementIsPresent;
+    private @Nullable LineContext currentValue;
+    private @Nullable ReadableByteChannel inChannel;
+    private @Nullable byte[] delimiter;
+
+    // Add to override the isSplittable
+    private boolean hasRFC4180MultiLineColumn;
+
+    private long startingOffset;
+    private long readerlineNum;
+
+    private MultiLineTextBasedReader(
+        ContextualTextIOSource source, byte[] delimiter, boolean hasRFC4180MultiLineColumn) {
+      super(source);
+      buffer = ByteString.EMPTY;
+      this.delimiter = delimiter;
+      this.hasRFC4180MultiLineColumn = hasRFC4180MultiLineColumn;
+      startingOffset = getCurrentSource().getStartOffset(); // Start offset;
+    }
+
+    @Override
+    protected long getCurrentOffset() throws NoSuchElementException {
+      if (!elementIsPresent) throw new NoSuchElementException();
+      return startOfRecord;
+    }
+
+    @Override
+    public long getSplitPointsRemaining() {
+      if (isStarted() && startOfNextRecord >= getCurrentSource().getEndOffset()) {
+        return isDone() ? 0 : 1;
+      }
+      return super.getSplitPointsRemaining();
+    }
+
+    @Override
+    public LineContext getCurrent() throws NoSuchElementException {
+      if (!elementIsPresent) {
+        throw new NoSuchElementException();
+      }
+      return currentValue;
+    }
+
+    @Override
+    protected void startReading(ReadableByteChannel channel) throws IOException {
+      this.inChannel = channel;
+      // If the first offset is greater than zero, we need to skip bytes until we see our
+      // first delimiter.
+      long startOffset = getCurrentSource().getStartOffset();
+      if (startOffset > 0) {
+        Preconditions.checkState(
+            channel instanceof SeekableByteChannel,
+            "%s only supports reading from a SeekableByteChannel when given a start offset"
+                + " greater than 0.",
+            ContextualTextIOSource.class.getSimpleName());
+        long requiredPosition = startOffset - 1;
+        if (delimiter != null && startOffset >= delimiter.length) {
+          // we need to move back the offset of at worse delimiter.size to be sure to see
+          // all the bytes of the delimiter in the call to findDelimiterBounds() below
+          requiredPosition = startOffset - delimiter.length;
+        }
+        ((SeekableByteChannel) channel).position(requiredPosition);
+        findDelimiterBoundsWithMultiLineCheck();
+        buffer = buffer.substring(endOfDelimiterInBuffer);
+        startOfNextRecord = requiredPosition + endOfDelimiterInBuffer;
+        endOfDelimiterInBuffer = 0;
+        startOfDelimiterInBuffer = 0;
+      }
+    }
+
+    private void findDelimiterBoundsWithMultiLineCheck() throws IOException {

Review comment:
       Do we need this helper?
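
       It currently just delegates to `findDelimiterBounds()`, so unless more multiline-specific logic is planned here, the call site could presumably invoke it directly, e.g. (sketch):

```java
((SeekableByteChannel) channel).position(requiredPosition);
// Call findDelimiterBounds() directly instead of going through the wrapper.
findDelimiterBounds();
buffer = buffer.substring(endOfDelimiterInBuffer);
```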

##########
File path: sdks/java/io/contextual-text-io/src/main/java/org/apache/beam/sdk/io/ContextualTextIO/ContextualTextIOSource.java
##########
@@ -0,0 +1,359 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.ContextualTextIO;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.SeekableByteChannel;
+import java.util.NoSuchElementException;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.FileBasedSource;
+import org.apache.beam.sdk.io.fs.EmptyMatchTreatment;
+import org.apache.beam.sdk.io.fs.MatchResult;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.sdk.schemas.NoSuchSchemaException;
+import org.apache.beam.sdk.schemas.SchemaCoder;
+import org.apache.beam.sdk.schemas.SchemaRegistry;
+import org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.ByteString;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
+
+/**
+ * Implementation detail of {@link ContextualTextIO.Read}.
+ *
+ * <p>A {@link FileBasedSource} which can decode records delimited by newline characters.
+ *
+ * <p>This source splits the data into records using {@code UTF-8} {@code \n}, {@code \r}, or {@code
+ * \r\n} as the delimiter. This source is not strict and supports decoding the last record even if
+ * it is not delimited. Finally, no records are decoded if the stream is empty.
+ *
+ * <p>This source supports reading from any arbitrary byte position within the stream. If the
+ * starting position is not {@code 0}, then bytes are skipped until the first delimiter is found
+ * representing the beginning of the first record to be decoded.
+ */
+@VisibleForTesting
+class ContextualTextIOSource extends FileBasedSource<LineContext> {
+  byte[] delimiter;
+
+  // Used to Override isSplittable
+  private boolean hasRFC4180MultiLineColumn;
+
+  @Override
+  protected boolean isSplittable() throws Exception {
+    if (hasRFC4180MultiLineColumn) return false;
+    return super.isSplittable();
+  }
+
+  ContextualTextIOSource(
+      ValueProvider<String> fileSpec,
+      EmptyMatchTreatment emptyMatchTreatment,
+      byte[] delimiter,
+      boolean hasRFC4180MultiLineColumn) {
+    super(fileSpec, emptyMatchTreatment, 1L);
+    this.delimiter = delimiter;
+    this.hasRFC4180MultiLineColumn = hasRFC4180MultiLineColumn;
+  }
+
+  private ContextualTextIOSource(
+      MatchResult.Metadata metadata,
+      long start,
+      long end,
+      byte[] delimiter,
+      boolean hasRFC4180MultiLineColumn) {
+    super(metadata, 1L, start, end);
+    this.delimiter = delimiter;
+    this.hasRFC4180MultiLineColumn = hasRFC4180MultiLineColumn;
+  }
+
+  @Override
+  protected FileBasedSource<LineContext> createForSubrangeOfFile(
+      MatchResult.Metadata metadata, long start, long end) {
+    return new ContextualTextIOSource(metadata, start, end, delimiter, hasRFC4180MultiLineColumn);
+  }
+
+  @Override
+  protected FileBasedReader<LineContext> createSingleFileReader(PipelineOptions options) {
+    return new MultiLineTextBasedReader(this, delimiter, hasRFC4180MultiLineColumn);
+  }
+
+  @Override
+  public Coder<LineContext> getOutputCoder() {
+    SchemaCoder<LineContext> coder = null;
+    try {
+      coder = SchemaRegistry.createDefault().getSchemaCoder(LineContext.class);
+    } catch (NoSuchSchemaException e) {
+      System.out.println("No Coder!");
+    }
+    return coder;
+  }
+
+  /**
+   * A {@link FileBasedReader FileBasedReader} which can decode records delimited by delimiter
+   * characters.
+   *
+   * <p>See {@link ContextualTextIOSource } for further details.
+   */
+  @VisibleForTesting
+  static class MultiLineTextBasedReader extends FileBasedReader<LineContext> {
+    public static final int READ_BUFFER_SIZE = 8192;
+    private static final ByteString UTF8_BOM =
+        ByteString.copyFrom(new byte[] {(byte) 0xEF, (byte) 0xBB, (byte) 0xBF});
+    private final ByteBuffer readBuffer = ByteBuffer.allocate(READ_BUFFER_SIZE);
+    private ByteString buffer;
+    private int startOfDelimiterInBuffer;
+    private int endOfDelimiterInBuffer;
+    private long startOfRecord;
+    private volatile long startOfNextRecord;
+    private volatile boolean eof;
+    private volatile boolean elementIsPresent;
+    private @Nullable LineContext currentValue;
+    private @Nullable ReadableByteChannel inChannel;
+    private @Nullable byte[] delimiter;
+
+    // Add to override the isSplittable
+    private boolean hasRFC4180MultiLineColumn;
+
+    private long startingOffset;
+    private long readerlineNum;
+
+    private MultiLineTextBasedReader(
+        ContextualTextIOSource source, byte[] delimiter, boolean hasRFC4180MultiLineColumn) {
+      super(source);
+      buffer = ByteString.EMPTY;
+      this.delimiter = delimiter;
+      this.hasRFC4180MultiLineColumn = hasRFC4180MultiLineColumn;
+      startingOffset = getCurrentSource().getStartOffset(); // Start offset;
+    }
+
+    @Override
+    protected long getCurrentOffset() throws NoSuchElementException {
+      if (!elementIsPresent) throw new NoSuchElementException();
+      return startOfRecord;
+    }
+
+    @Override
+    public long getSplitPointsRemaining() {
+      if (isStarted() && startOfNextRecord >= getCurrentSource().getEndOffset()) {
+        return isDone() ? 0 : 1;
+      }
+      return super.getSplitPointsRemaining();
+    }
+
+    @Override
+    public LineContext getCurrent() throws NoSuchElementException {
+      if (!elementIsPresent) {
+        throw new NoSuchElementException();
+      }
+      return currentValue;
+    }
+
+    @Override
+    protected void startReading(ReadableByteChannel channel) throws IOException {
+      this.inChannel = channel;
+      // If the first offset is greater than zero, we need to skip bytes until we see our
+      // first delimiter.
+      long startOffset = getCurrentSource().getStartOffset();
+      if (startOffset > 0) {
+        Preconditions.checkState(
+            channel instanceof SeekableByteChannel,
+            "%s only supports reading from a SeekableByteChannel when given a start offset"
+                + " greater than 0.",
+            ContextualTextIOSource.class.getSimpleName());
+        long requiredPosition = startOffset - 1;
+        if (delimiter != null && startOffset >= delimiter.length) {
+          // we need to move back the offset of at worse delimiter.size to be sure to see
+          // all the bytes of the delimiter in the call to findDelimiterBounds() below
+          requiredPosition = startOffset - delimiter.length;
+        }
+        ((SeekableByteChannel) channel).position(requiredPosition);
+        findDelimiterBoundsWithMultiLineCheck();
+        buffer = buffer.substring(endOfDelimiterInBuffer);
+        startOfNextRecord = requiredPosition + endOfDelimiterInBuffer;
+        endOfDelimiterInBuffer = 0;
+        startOfDelimiterInBuffer = 0;
+      }
+    }
+
+    private void findDelimiterBoundsWithMultiLineCheck() throws IOException {
+      findDelimiterBounds();
+    }
+
+    /**
+     * Locates the start position and end position of the next delimiter. Will consume the channel
+     * till either EOF or the delimiter bounds are found.
+     *
+     * <p>If {@link ContextualTextIOSource#hasRFC4180MultiLineColumn} is set then the behaviour will
+     * change from the standard read seen in {@link org.apache.beam.sdk.io.TextIO}. The assumption
+     * when {@link ContextualTextIOSource#hasRFC4180MultiLineColumn} is set is that the file is
+     * being read with a single thread.
+     *
+     * <p>This fills the buffer and updates the positions as follows:
+     *
+     * <pre>{@code
+     * ------------------------------------------------------
+     * | element bytes | delimiter bytes | unconsumed bytes |
+     * ------------------------------------------------------
+     * 0            start of          end of              buffer
+     *              delimiter         delimiter           size
+     *              in buffer         in buffer
+     * }</pre>
+     */
+    private void findDelimiterBounds() throws IOException {
+      int bytePositionInBuffer = 0;
+      boolean doubleQuoteClosed = true;
+      boolean insideOpenQuote = true;
+
+      while (true) {
+        if (!tryToEnsureNumberOfBytesInBuffer(bytePositionInBuffer + 1)) {
+          startOfDelimiterInBuffer = endOfDelimiterInBuffer = bytePositionInBuffer;
+          break;
+        }
+
+        byte currentByte = buffer.byteAt(bytePositionInBuffer);
+        if (hasRFC4180MultiLineColumn) {
+          // Check if we are inside an open Quote
+          if (currentByte == '"') {
+            doubleQuoteClosed = !doubleQuoteClosed;

Review comment:
       Would anything change if we remove `doubleQuoteClosed` and write `insideOpenQuote = !insideOpenQuote;` here?
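
       In other words, a single flag seems sufficient to track the quote state, e.g. (sketch; assumes the two flags are meant to mirror each other, and the rest of the method is not visible in this hunk):

```java
boolean insideOpenQuote = false; // start outside any quoted field

// ... inside the scan loop:
if (currentByte == '"') {
  insideOpenQuote = !insideOpenQuote; // toggle on every '"'
}
// A delimiter should only end a record when !insideOpenQuote.
```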
   

##########
File path: sdks/java/io/contextual-text-io/src/test/java/org/apache/beam/sdk/io/ContextualTextIO/ContextualTextIOTest.java
##########
@@ -0,0 +1,1271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.ContextualTextIO;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static junit.framework.Assert.assertEquals;
+import static junit.framework.Assert.assertFalse;
+import static junit.framework.Assert.assertNotNull;
+import static junit.framework.Assert.assertTrue;
+import static org.apache.beam.sdk.TestUtils.LINES_ARRAY;
+import static org.apache.beam.sdk.TestUtils.NO_LINES_ARRAY;
+import static org.apache.beam.sdk.io.Compression.AUTO;
+import static org.apache.beam.sdk.io.Compression.BZIP2;
+import static org.apache.beam.sdk.io.Compression.DEFLATE;
+import static org.apache.beam.sdk.io.Compression.GZIP;
+import static org.apache.beam.sdk.io.Compression.UNCOMPRESSED;
+import static org.apache.beam.sdk.io.Compression.ZIP;
+import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
+import static org.apache.beam.sdk.values.TypeDescriptors.strings;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.hasItem;
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.startsWith;
+import static org.junit.Assume.assumeFalse;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.PrintStream;
+import java.io.Writer;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.zip.GZIPOutputStream;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipOutputStream;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.io.Compression;
+import org.apache.beam.sdk.io.FileBasedSource;
+import org.apache.beam.sdk.io.FileIO;
+import org.apache.beam.sdk.io.GenerateSequence;
+import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.io.fs.EmptyMatchTreatment;
+import org.apache.beam.sdk.options.ExperimentalOptions;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.sdk.testing.NeedsRunner;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.SourceTestUtils;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.UsesUnboundedSplittableParDo;
+import org.apache.beam.sdk.testing.ValidatesRunner;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.ToString;
+import org.apache.beam.sdk.transforms.Watch;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator;
+import org.apache.beam.sdk.transforms.display.DisplayDataMatchers;
+import org.apache.beam.sdk.transforms.windowing.AfterPane;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.Repeatedly;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.util.CoderUtils;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Charsets;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Joiner;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
+import org.apache.commons.compress.compressors.bzip2.BZip2CompressorOutputStream;
+import org.apache.commons.compress.compressors.deflate.DeflateCompressorOutputStream;
+import org.joda.time.Duration;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.junit.runners.Parameterized;
+
+/** Tests for {@link ContextualTextIO.Read} */
+public class ContextualTextIOTest {
+  private static final int NUM_LINES_FOR_LARGE = 1024;
+
+  private static final List<String> EMPTY = Collections.emptyList();
+
+  private static final List<String> TINY = Arrays.asList("ABC", "DEF", "HIJ");
+
+  private static final List<String> LARGE = makeLines(NUM_LINES_FOR_LARGE);
+
+  private static File writeToFile(
+      List<String> lines, TemporaryFolder folder, String fileName, Compression compression)
+      throws IOException {
+    File file = folder.getRoot().toPath().resolve(fileName).toFile();
+    OutputStream output = new FileOutputStream(file);
+    switch (compression) {
+      case UNCOMPRESSED:
+        break;
+      case GZIP:
+        output = new GZIPOutputStream(output);
+        break;
+      case BZIP2:
+        output = new BZip2CompressorOutputStream(output);
+        break;
+      case ZIP:
+        ZipOutputStream zipOutput = new ZipOutputStream(output);
+        zipOutput.putNextEntry(new ZipEntry("entry"));
+        output = zipOutput;
+        break;
+      case DEFLATE:
+        output = new DeflateCompressorOutputStream(output);
+        break;
+      default:
+        throw new UnsupportedOperationException(compression.toString());
+    }
+    writeToStreamAndClose(lines, output);
+    return file;
+  }
+
+  /**
+   * Helper that writes the given lines (adding a newline in between) to a stream, then closes the
+   * stream.
+   */
+  private static void writeToStreamAndClose(List<String> lines, OutputStream outputStream) {
+    try (PrintStream writer = new PrintStream(outputStream)) {
+      for (String line : lines) {
+        writer.println(line);
+      }
+    }
+  }
+
+  /** Helper to make an array of compressible strings. Returns ["line" i] for i in range(0,n). */
+  private static List<String> makeLines(int n) {
+    List<String> lines = new ArrayList<>();
+    for (int i = 0; i < n; ++i) {
+      lines.add("Line " + i);
+    }
+    return lines;
+  }
+
+  private static class convertLineContextToString extends DoFn<LineContext, String> {
+    @ProcessElement
+    public void processElement(@Element LineContext L, OutputReceiver<String> out) {
+      String file = L.getFile().substring(L.getFile().lastIndexOf('/') + 1);
+      out.output(file + " " + L.getLineNum() + " " + L.getLine());
+    }
+  }
+
+  /**
+   * Helper method that runs a variety of ways to read a single file using ContextualTextIO and
+   * checks that they all match the given expected output.
+   *
+   * <p>The transforms being verified are:
+   *
+   * <ul>
+   *   <li>ContextualTextIO.read().from(filename).withCompression(compressionType).withHintMatchesManyFiles()
+   *   <li>ContextualTextIO.read().from(filename).withCompression(compressionType)
+   *   <li>ContextualTextIO.read().from(filename).withCompression(compressionType).with
+   *   <li>ContextualTextIO.readFiles().withCompression(compressionType)
+   * </ul>
+   */
+  private static void assertReadingCompressedFileMatchesExpected(
+      File file, Compression compression, List<String> expected, Pipeline p) {
+
+    ContextualTextIO.Read read =
+        ContextualTextIO.read().from(file.getPath()).withCompression(compression);
+
+    // Convert the expected output into LineContext output Format
+    List<String> expectedOutput = new ArrayList<>();
+    for (int lineNum = 0; lineNum < expected.size(); ++lineNum) {
+      expectedOutput.add(file.getName() + " " + lineNum + " " + expected.get(lineNum));
+    }
+
+    PAssert.that(
+            p.apply("Read_" + file + "_" + compression.toString(), read)
+                .apply("ConvertLineContextToString", ParDo.of(new convertLineContextToString())))
+        .containsInAnyOrder(expectedOutput);
+    PAssert.that(
+            p.apply(
+                    "Read_" + file + "_" + compression.toString() + "_many",
+                    read.withHintMatchesManyFiles())
+                .apply(
+                    "ConvertLineContextToString" + "_many",
+                    ParDo.of(new convertLineContextToString())))
+        .containsInAnyOrder(expectedOutput);
+
+    PAssert.that(
+            p.apply(
+                    "Read_" + file + "_" + compression.toString() + "_withRFC4180",
+                    read.withHasRFC4180MultiLineColumn(true))
+                .apply(
+                    "ConvertLineContextToString" + "_withRFC4180",
+                    ParDo.of(new convertLineContextToString())))
+        .containsInAnyOrder(expectedOutput);
+
+    PAssert.that(
+            p.apply("Create_Paths_ReadFiles_" + file, Create.of(file.getPath()))
+                .apply("Match_" + file, FileIO.matchAll())
+                .apply("ReadMatches_" + file, FileIO.readMatches().withCompression(compression))
+                .apply("ReadFiles_" + compression.toString(), ContextualTextIO.readFiles())
+                .apply(
+                    "ConvertLineContextToStringWithFileIO",
+                    ParDo.of(new convertLineContextToString())))
+        .containsInAnyOrder(expectedOutput);
+  }
+
+  /**
+   * Create a zip file with the given lines.
+   *
+   * @param expected A list of expected lines, populated in the zip file.
+   * @param folder A temporary folder used to create files.
+   * @param filename Optionally zip file name (can be null).
+   * @param fieldsEntries Fields to write in zip entries.
+   * @return The zip filename.
+   * @throws Exception In case of a failure during zip file creation.
+   */
+  private static File createZipFile(
+      List<String> expected, TemporaryFolder folder, String filename, String[]... fieldsEntries)
+      throws Exception {
+    File tmpFile = folder.getRoot().toPath().resolve(filename).toFile();
+
+    ZipOutputStream out = new ZipOutputStream(new FileOutputStream(tmpFile));
+    PrintStream writer = new PrintStream(out, true /* auto-flush on write */);
+
+    int index = 0;
+    for (String[] entry : fieldsEntries) {
+      out.putNextEntry(new ZipEntry(Integer.toString(index)));
+      for (String field : entry) {
+        writer.println(field);
+        expected.add(field);
+      }
+      out.closeEntry();
+      index++;
+    }
+
+    writer.close();
+    out.close();
+
+    return tmpFile;
+  }
+
+  private static ContextualTextIOSource prepareSource(
+      TemporaryFolder temporaryFolder, byte[] data, byte[] delimiter, boolean hasRFC4180Multiline)
+      throws IOException {
+    Path path = temporaryFolder.newFile().toPath();
+    Files.write(path, data);
+    return new ContextualTextIOSource(
+        ValueProvider.StaticValueProvider.of(path.toString()),
+        EmptyMatchTreatment.DISALLOW,
+        delimiter,
+        hasRFC4180Multiline);
+  }
+
+  private static String getFileSuffix(Compression compression) {
+    switch (compression) {
+      case UNCOMPRESSED:
+        return ".txt";
+      case GZIP:
+        return ".gz";
+      case BZIP2:
+        return ".bz2";
+      case ZIP:
+        return ".zip";
+      case DEFLATE:
+        return ".deflate";
+      default:
+        return "";
+    }
+  }
+  /** Tests for reading from different size of files with various Compression. */
+  @RunWith(Parameterized.class)
+  public static class CompressedReadTest {
+    @Rule public TemporaryFolder tempFolder = new TemporaryFolder();
+    @Rule public TestPipeline p = TestPipeline.create();
+
+    @Parameterized.Parameters(name = "{index}: {1}")
+    public static Iterable<Object[]> data() {
+      return ImmutableList.<Object[]>builder()
+          .add(new Object[] {EMPTY, UNCOMPRESSED})
+          .add(new Object[] {EMPTY, GZIP})
+          .add(new Object[] {EMPTY, BZIP2})
+          .add(new Object[] {EMPTY, ZIP})
+          .add(new Object[] {EMPTY, DEFLATE})
+          .add(new Object[] {TINY, UNCOMPRESSED})
+          .add(new Object[] {TINY, GZIP})
+          .add(new Object[] {TINY, BZIP2})
+          .add(new Object[] {TINY, ZIP})
+          .add(new Object[] {TINY, DEFLATE})
+          .add(new Object[] {LARGE, UNCOMPRESSED})
+          .add(new Object[] {LARGE, GZIP})
+          .add(new Object[] {LARGE, BZIP2})
+          .add(new Object[] {LARGE, ZIP})
+          .add(new Object[] {LARGE, DEFLATE})
+          .build();
+    }
+
+    @Parameterized.Parameter(0)
+    public List<String> lines;
+
+    @Parameterized.Parameter(1)
+    public Compression compression;
+
+    /** Tests reading from a small, compressed file with no extension. */
+    @Test
+    @Category(NeedsRunner.class)
+    public void testCompressedReadWithoutExtension() throws Exception {
+      String fileName = lines.size() + "_" + compression + "_no_extension";
+      File fileWithNoExtension = writeToFile(lines, tempFolder, fileName, compression);
+      assertReadingCompressedFileMatchesExpected(fileWithNoExtension, compression, lines, p);
+      p.run();
+    }
+
+    @Test
+    @Category(NeedsRunner.class)
+    public void testCompressedReadWithExtension() throws Exception {
+      String fileName =
+          lines.size() + "_" + compression + "_no_extension" + getFileSuffix(compression);
+      File fileWithExtension = writeToFile(lines, tempFolder, fileName, compression);
+
+      // Sanity check that we're properly testing compression.
+      if (lines.size() == NUM_LINES_FOR_LARGE && !compression.equals(UNCOMPRESSED)) {
+        File uncompressedFile = writeToFile(lines, tempFolder, "large.txt", UNCOMPRESSED);
+        assertThat(uncompressedFile.length(), greaterThan(fileWithExtension.length()));
+      }
+
+      assertReadingCompressedFileMatchesExpected(fileWithExtension, compression, lines, p);
+      p.run();
+    }
+
+    @Test
+    @Category(NeedsRunner.class)
+    public void testReadWithAuto() throws Exception {
+      // Files with non-compressed extensions should work in AUTO and UNCOMPRESSED modes.
+      String fileName =
+          lines.size() + "_" + compression + "_no_extension" + getFileSuffix(compression);
+      File fileWithExtension = writeToFile(lines, tempFolder, fileName, compression);
+      assertReadingCompressedFileMatchesExpected(fileWithExtension, AUTO, lines, p);
+      p.run();
+    }
+  }
+
+  /** Tests for reading files with various delimiters. */
+  @RunWith(Parameterized.class)
+  public static class ReadWithDelimiterTest {
+    private static final ImmutableList<String> EXPECTED = ImmutableList.of("asdf", "hjkl", "xyz");
+    @Rule public TemporaryFolder tempFolder = new TemporaryFolder();
+
+    @Parameterized.Parameters(name = "{index}: {0}")
+    public static Iterable<Object[]> data() {
+      return ImmutableList.<Object[]>builder()
+          //          .add(new Object[] {"\n\n\n", ImmutableList.of("", "", "")})
+          .add(new Object[] {"asdf\nhjkl\nxyz\n", EXPECTED})
+          .add(new Object[] {"asdf\rhjkl\rxyz\r", EXPECTED})
+          .add(new Object[] {"asdf\r\nhjkl\r\nxyz\r\n", EXPECTED})
+          .add(new Object[] {"asdf\rhjkl\r\nxyz\n", EXPECTED})
+          .add(new Object[] {"asdf\nhjkl\nxyz", EXPECTED})
+          .add(new Object[] {"asdf\rhjkl\rxyz", EXPECTED})
+          .add(new Object[] {"asdf\r\nhjkl\r\nxyz", EXPECTED})
+          .add(new Object[] {"asdf\rhjkl\r\nxyz", EXPECTED})
+          .build();
+    }
+
+    @Parameterized.Parameter(0)
+    public String line;
+
+    @Parameterized.Parameter(1)
+    public ImmutableList<String> expected;
+
+    @Test
+    public void testReadLinesWithDelimiter() throws Exception {
+      runTestReadWithData(line.getBytes(UTF_8), expected);
+    }
+
+    private ContextualTextIOSource prepareSource(byte[] data, boolean hasRFC4180Multiline)
+        throws IOException {
+      return ContextualTextIOTest.prepareSource(tempFolder, data, null, hasRFC4180Multiline);
+    }
+
+    private void runTestReadWithData(byte[] data, List<String> expectedResults) throws Exception {
+      ContextualTextIOSource source = prepareSource(data, false);
+      List<LineContext> actual =
+          SourceTestUtils.readFromSource(source, PipelineOptionsFactory.create());
+      List<String> actualOutput = new ArrayList<>();
+      actual.forEach(
+          (LineContext L) -> {
+            String file = L.getFile().substring(L.getFile().lastIndexOf('/') + 1);
+            actualOutput.add(L.getLine());
+          });
+      assertThat(
+          actualOutput,
+          containsInAnyOrder(new ArrayList<>(expectedResults).toArray(new String[0])));
+    }
+  }
+
+  @RunWith(Parameterized.class)
+  public static class ReadWithDelimiterAndRFC4180 {
+    static final ImmutableList<String> Expected = ImmutableList.of("\"asdf\nhjkl\nmnop\"", "xyz");
+    @Rule public TemporaryFolder tempFolder = new TemporaryFolder();
+
+    @Parameterized.Parameters(name = "{index}: {0}")
+    public static Iterable<Object[]> data() {
+      return ImmutableList.<Object[]>builder()
+          .add(new Object[] {"\n\n\n", ImmutableList.of("", "", "")})
+          .add(new Object[] {"\"asdf\nhjkl\"\nxyz\n", ImmutableList.of("\"asdf\nhjkl\"", "xyz")})
+          .add(new Object[] {"\"asdf\nhjkl\nmnop\"\nxyz\n", Expected})
+          .add(new Object[] {"\"asdf\nhjkl\nmnop\"\nxyz\r", Expected})
+          .add(new Object[] {"\"asdf\nhjkl\nmnop\"\r\nxyz\n", Expected})
+          .add(new Object[] {"\"asdf\nhjkl\nmnop\"\r\nxyz\r\n", Expected})
+          .add(new Object[] {"\"asdf\nhjkl\nmnop\"\rxyz\r\n", Expected})
+          .build();
+    }
+
+    @Parameterized.Parameter(0)
+    public String line;
+
+    @Parameterized.Parameter(1)
+    public ImmutableList<String> expected;
+
+    @Test
+    public void testReadLinesWithDelimiter() throws Exception {
+      runTestReadWithData(line.getBytes(UTF_8), expected);
+    }
+
+    private ContextualTextIOSource prepareSource(byte[] data, boolean hasRFC4180Multiline)
+        throws IOException {
+      return ContextualTextIOTest.prepareSource(tempFolder, data, null, hasRFC4180Multiline);
+    }
+
+    private void runTestReadWithData(byte[] data, List<String> expectedResults) throws Exception {
+      ContextualTextIOSource source = prepareSource(data, true);
+      List<LineContext> actual =
+          SourceTestUtils.readFromSource(source, PipelineOptionsFactory.create());
+      List<String> actualOutput = new ArrayList<>();
+      actual.forEach(
+          (LineContext L) -> {
+            String file = L.getFile().substring(L.getFile().lastIndexOf('/') + 1);
+            actualOutput.add(L.getLine());
+          });
+      assertThat(
+          actualOutput,
+          containsInAnyOrder(new ArrayList<>(expectedResults).toArray(new String[0])));
+    }
+  }
+
+  /** Tests Specific for checking functionality of ContextualTextIO */
+  @RunWith(JUnit4.class)
+  public static class ContextualTextIOSpecificTests {
+    @Rule public TemporaryFolder tempFolder = new TemporaryFolder();
+    @Rule public TestPipeline p = TestPipeline.create();
+
+    public static final char CR = (char) 0x0D;
+    public static final char LF = (char) 0x0A;
+
+    public static final String CRLF = "" + CR + LF;
+
+    public String createFiles(List<String> input) throws Exception {
+
+      File tmpFile = tempFolder.newFile();
+      String filename = tmpFile.getPath();
+
+      try (PrintStream writer = new PrintStream(new FileOutputStream(tmpFile))) {
+        for (String elem : input) {
+          byte[] encodedElem = CoderUtils.encodeToByteArray(StringUtf8Coder.of(), elem);
+          String line = new String(encodedElem, Charsets.UTF_8);
+          writer.println(line);
+        }
+      }
+      return filename;
+    }
+
+    @Test
+    @Category(NeedsRunner.class)
+    public void MultipleFilesTest() throws Exception {
+      List<File> files =
+          Arrays.asList(
+              tempFolder.newFile("File1"),
+              tempFolder.newFile("File2"),
+              tempFolder.newFile("File3"));
+
+      int num = 0;
+      for (File tmpFile : files) {
+        num += 2;
+        String filename = tmpFile.getPath();
+        try (PrintStream writer = new PrintStream(new FileOutputStream(tmpFile))) {
+          for (int lineNum = 0; lineNum < 10 + num; ++lineNum) {
+            String elem = filename + " " + lineNum;
+            byte[] encodedElem = CoderUtils.encodeToByteArray(StringUtf8Coder.of(), elem);
+            String line = new String(encodedElem, Charsets.UTF_8);
+            writer.println(line);
+          }
+        }
+      }
+      String filePath = files.get(0).getPath();
+      filePath = filePath.substring(0, filePath.lastIndexOf('/') + 1);
+      filePath += '*';
+      p.apply(ContextualTextIO.read().from(filePath))
+          .apply(
+              MapElements.into(strings())
+                  .via(
+                      (LineContext line) -> {
+                        String expectedLineNum =
+                            line.getLine().substring(line.getLine().lastIndexOf(' ') + 1);
+                        assertEquals(Long.parseLong(expectedLineNum), (long) line.getLineNum());
+                        return "";
+                      }));
+
+      p.run();
+    }
+
+    @Test
+    @Category(NeedsRunner.class)
+    public void testWithHintMatchesManyFiles() throws IOException {
+      List<File> files =
+          Arrays.asList(
+              tempFolder.newFile("File1"),
+              tempFolder.newFile("File2"),
+              tempFolder.newFile("File3"));
+
+      int num = 0;
+      for (File tmpFile : files) {
+        num += 2;
+        String filename = tmpFile.getPath();
+        try (PrintStream writer = new PrintStream(new FileOutputStream(tmpFile))) {
+          for (int lineNum = 0; lineNum < 10 + num; ++lineNum) {
+            String elem = filename + " " + lineNum;
+            byte[] encodedElem = CoderUtils.encodeToByteArray(StringUtf8Coder.of(), elem);
+            String line = new String(encodedElem, Charsets.UTF_8);
+            writer.println(line);
+          }
+        }
+      }
+      String filePath = files.get(0).getPath();
+      filePath = filePath.substring(0, filePath.lastIndexOf('/') + 1);
+      filePath += '*';
+      p.apply(ContextualTextIO.read().from(filePath).withHintMatchesManyFiles())
+          .apply(
+              MapElements.into(strings())
+                  .via(
+                      (LineContext line) -> {
+                        String expectedLineNum =
+                            line.getLine().substring(line.getLine().lastIndexOf(' ') + 1);
+                        assertEquals(Long.parseLong(expectedLineNum), (long) line.getLineNum());
+                        return "";
+                      }));
+
+      p.run();
+    }
+
+    @Test
+    @Category(NeedsRunner.class)
+    public void runBasicReadTest() throws Exception {
+
+      List<String> input = ImmutableList.of("1", "2");
+      ContextualTextIO.Read read = ContextualTextIO.read().from(createFiles(input));
+      PCollection<LineContext> output = p.apply(read);
+
+      PCollection<String> result =
+          output.apply(MapElements.into(strings()).via(x -> String.valueOf(x.getLine())));
+
+      PAssert.that(result).containsInAnyOrder("1", "2");
+
+      p.run();
+    }
+
+    @Test
+    @Category(NeedsRunner.class)
+    public void runBasicReadTestWithRFC4180Set() throws Exception {
+
+      List<String> input = ImmutableList.of("1", "2");
+
+      ContextualTextIO.Read read =
+          ContextualTextIO.read().from(createFiles(input)).withHasRFC4180MultiLineColumn(true);
+      PCollection<LineContext> output = p.apply(read);
+
+      PCollection<String> result =
+          output.apply(MapElements.into(strings()).via(x -> String.valueOf(x.getLine())));
+
+      PAssert.that(result).containsInAnyOrder("1", "2");
+
+      p.run();
+    }
+
+    @Test
+    @Category(NeedsRunner.class)
+    /** Tests reading files that use multi-line columns as per RFC 4180. */
+    public void runSmallRFC4180MultiLineReadTest() throws Exception {
+
+      // Generate quoted lines of the form "x\r\nx" where the number x changes per line.
+      List<String> input =
+          IntStream.range(0, 2)
+              .<String>mapToObj(x -> "\"" + x + CRLF + x + "\"")
+              .collect(Collectors.toList());
+
+      ContextualTextIO.Read read =
+          ContextualTextIO.read().from(createFiles(input)).withRFC4180MultiLineColumn(true);
+      PCollection<LineContext> output = p.apply(read);
+
+      PCollection<String> result =
+          output.apply(MapElements.into(strings()).via(x -> String.valueOf(x.getLine())));
+
+      PAssert.that(result).containsInAnyOrder(input);
+
+      p.run();
+    }
+
+    @Test
+    @Category(NeedsRunner.class)
+    /** Tests reading files that use multi-line columns as per RFC 4180. */
+    public void runSmallRFC4180EscapedCharactersReadTest() throws Exception {
+
+      // Generate identical lines of the form "aaa","b""bb","ccc".
+      List<String> input =
+          IntStream.range(0, 2)
+              .<String>mapToObj(x -> "\"aaa\",\"b\"\"bb\",\"ccc\"")
+              .collect(Collectors.toList());
+
+      ContextualTextIO.Read read =
+          ContextualTextIO.read().from(createFiles(input)).withRFC4180MultiLineColumn(true);
+      PCollection<LineContext> output = p.apply(read);
+
+      PCollection<String> result =
+          output.apply(MapElements.into(strings()).via(x -> String.valueOf(x.getLine())));
+
+      PAssert.that(result).containsInAnyOrder(input);
+
+      p.run();
+    }
+
+    @Test
+    @Category(NeedsRunner.class)
+    /** Tests reading files that use multi-line columns as per RFC 4180. */
+    public void runLargeRFC4180MultiLineReadTest() throws Exception {
+
+      // Generate quoted lines of the form "x\r\nx" where the number x changes per line.
+      List<String> input =
+          IntStream.range(0, 1000)
+              .<String>mapToObj(x -> "\"" + x + CRLF + x + "\"")
+              .collect(Collectors.toList());
+
+      ContextualTextIO.Read read =
+          ContextualTextIO.read().from(createFiles(input)).withHasRFC4180MultiLineColumn(true);
+      PCollection<LineContext> output = p.apply(read);
+
+      PCollection<String> result =
+          output.apply(MapElements.into(strings()).via(x -> String.valueOf(x.getLine())));
+
+      PAssert.that(result).containsInAnyOrder(input);
+
+      p.run();
+    }
+
+    @Test
+    @Category(NeedsRunner.class)
+    /** Tests reading files that use multi-line columns as per RFC 4180. */
+    public void runLargeRFC4180MultiLineAndEscapedReadTest() throws Exception {
+
+      // Generate identical lines of the form "a\r\naa","b""\r\nbb","ccc","""\nHello".
+      List<String> input =
+          IntStream.range(0, 1000)
+              .<String>mapToObj(
+                  x -> "\"a" + CRLF + "aa\",\"b\"\"" + CRLF + "bb\",\"ccc\",\"\"\"\\nHello\"")
+              .collect(Collectors.toList());
+
+      ContextualTextIO.Read read =
+          ContextualTextIO.read().from(createFiles(input)).withHasRFC4180MultiLineColumn(true);
+      PCollection<LineContext> output = p.apply(read);
+
+      PCollection<String> result =
+          output.apply(MapElements.into(strings()).via(x -> String.valueOf(x.getLine())));
+
+      PAssert.that(result).containsInAnyOrder(input);
+
+      p.run();
+    }
+
+    @Test
+    @Category(NeedsRunner.class)
+    /** Tests reading files that use multi-line columns as per RFC 4180. */

Review comment:
       This comment is probably not very helpful, since it is copy-pasted across 5 different scenarios.
   Also, a good test communicates the scenario and the expected result in its name, so that a separate comment is not required.
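
   For instance, a minimal sketch of that idea (hypothetical test name and input; it reuses the createFiles helper and the RFC 4180 option already exercised above), where the name alone states the scenario and the expected result:

       @Test
       @Category(NeedsRunner.class)
       public void quotedCrlfInsideColumnIsKeptAsOneRecordWhenRFC4180IsSet() throws Exception {
         // One logical record whose quoted column spans two physical lines.
         List<String> input = ImmutableList.of("\"a" + CRLF + "b\"");
         PCollection<LineContext> output =
             p.apply(
                 ContextualTextIO.read()
                     .from(createFiles(input))
                     .withHasRFC4180MultiLineColumn(true));
         PAssert.that(output.apply(MapElements.into(strings()).via(x -> String.valueOf(x.getLine()))))
             .containsInAnyOrder(input);
         p.run();
       }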



