Repository: beam
Updated Branches:
  refs/heads/master c2c89eda9 -> 3161904d9


Moves TextSource and TextSink to top level


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/7b725c25
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/7b725c25
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/7b725c25

Branch: refs/heads/master
Commit: 7b725c25288ae24eb89be3bf61e09e0e38c2b200
Parents: 681b5d6
Author: Eugene Kirpichov <[email protected]>
Authored: Fri Apr 28 17:46:44 2017 -0700
Committer: Eugene Kirpichov <[email protected]>
Committed: Tue May 2 12:20:14 2017 -0700

----------------------------------------------------------------------
 .../java/org/apache/beam/sdk/io/TextIO.java     | 343 +------------------
 .../java/org/apache/beam/sdk/io/TextSink.java   | 139 ++++++++
 .../java/org/apache/beam/sdk/io/TextSource.java | 236 +++++++++++++
 .../java/org/apache/beam/sdk/io/TextIOTest.java |   3 +-
 4 files changed, 377 insertions(+), 344 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/7b725c25/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
index 90d56e7..1f9b7a0 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
@@ -19,20 +19,8 @@ package org.apache.beam.sdk.io;
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.base.Preconditions.checkState;
 
 import com.google.auto.value.AutoValue;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.protobuf.ByteString;
-import java.io.IOException;
-import java.io.OutputStreamWriter;
-import java.nio.ByteBuffer;
-import java.nio.channels.Channels;
-import java.nio.channels.ReadableByteChannel;
-import java.nio.channels.SeekableByteChannel;
-import java.nio.channels.WritableByteChannel;
-import java.nio.charset.StandardCharsets;
-import java.util.NoSuchElementException;
 import java.util.regex.Pattern;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.coders.Coder;
@@ -41,13 +29,10 @@ import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
 import org.apache.beam.sdk.io.FileBasedSink.WritableByteChannelFactory;
 import org.apache.beam.sdk.io.Read.Bounded;
-import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
-import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.util.MimeTypes;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PDone;
@@ -412,7 +397,7 @@ public class TextIO {
         throw new IllegalStateException(
             "cannot set both a filename policy and a filename prefix");
       }
-      WriteFiles<String> write = null;
+      WriteFiles<String> write;
       if (getFilenamePolicy() != null) {
         write =
             WriteFiles.to(
@@ -535,330 +520,4 @@ public class TextIO {
 
   /** Disable construction of utility class. */
   private TextIO() {}
-
-  /**
-   * A {@link FileBasedSource} which can decode records delimited by newline characters.
-   *
-   * <p>This source splits the data into records using {@code UTF-8} {@code \n}, {@code \r}, or
-   * {@code \r\n} as the delimiter. This source is not strict and supports decoding the last record
-   * even if it is not delimited. Finally, no records are decoded if the stream is empty.
-   *
-   * <p>This source supports reading from any arbitrary byte position within the stream. If the
-   * starting position is not {@code 0}, then bytes are skipped until the first delimiter is found
-   * representing the beginning of the first record to be decoded.
-   */
-  @VisibleForTesting
-  static class TextSource extends FileBasedSource<String> {
-    /** The Coder to use to decode each line. */
-    @VisibleForTesting
-    TextSource(String fileSpec) {
-      super(StaticValueProvider.of(fileSpec), 1L);
-    }
-
-    @VisibleForTesting
-    TextSource(ValueProvider<String> fileSpec) {
-      super(fileSpec, 1L);
-    }
-
-    private TextSource(Metadata metadata, long start, long end) {
-      super(metadata, 1L, start, end);
-    }
-
-    @Override
-    protected FileBasedSource<String> createForSubrangeOfFile(
-        Metadata metadata,
-        long start,
-        long end) {
-      return new TextSource(metadata, start, end);
-    }
-
-    @Override
-    protected FileBasedReader<String> createSingleFileReader(PipelineOptions options) {
-      return new TextBasedReader(this);
-    }
-
-    @Override
-    public Coder<String> getDefaultOutputCoder() {
-      return StringUtf8Coder.of();
-    }
-
-    /**
-     * A {@link org.apache.beam.sdk.io.FileBasedSource.FileBasedReader FileBasedReader}
-     * which can decode records delimited by newline characters.
-     *
-     * <p>See {@link TextSource} for further details.
-     */
-    @VisibleForTesting
-    static class TextBasedReader extends FileBasedReader<String> {
-      private static final int READ_BUFFER_SIZE = 8192;
-      private final ByteBuffer readBuffer = ByteBuffer.allocate(READ_BUFFER_SIZE);
-      private ByteString buffer;
-      private int startOfSeparatorInBuffer;
-      private int endOfSeparatorInBuffer;
-      private long startOfRecord;
-      private volatile long startOfNextRecord;
-      private volatile boolean eof;
-      private volatile boolean elementIsPresent;
-      private String currentValue;
-      private ReadableByteChannel inChannel;
-
-      private TextBasedReader(TextSource source) {
-        super(source);
-        buffer = ByteString.EMPTY;
-      }
-
-      @Override
-      protected long getCurrentOffset() throws NoSuchElementException {
-        if (!elementIsPresent) {
-          throw new NoSuchElementException();
-        }
-        return startOfRecord;
-      }
-
-      @Override
-      public long getSplitPointsRemaining() {
-        if (isStarted() && startOfNextRecord >= getCurrentSource().getEndOffset()) {
-          return isDone() ? 0 : 1;
-        }
-        return super.getSplitPointsRemaining();
-      }
-
-      @Override
-      public String getCurrent() throws NoSuchElementException {
-        if (!elementIsPresent) {
-          throw new NoSuchElementException();
-        }
-        return currentValue;
-      }
-
-      @Override
-      protected void startReading(ReadableByteChannel channel) throws IOException {
-        this.inChannel = channel;
-        // If the first offset is greater than zero, we need to skip bytes until we see our
-        // first separator.
-        if (getCurrentSource().getStartOffset() > 0) {
-          checkState(channel instanceof SeekableByteChannel,
-              "%s only supports reading from a SeekableByteChannel when given a start offset"
-              + " greater than 0.", TextSource.class.getSimpleName());
-          long requiredPosition = getCurrentSource().getStartOffset() - 1;
-          ((SeekableByteChannel) channel).position(requiredPosition);
-          findSeparatorBounds();
-          buffer = buffer.substring(endOfSeparatorInBuffer);
-          startOfNextRecord = requiredPosition + endOfSeparatorInBuffer;
-          endOfSeparatorInBuffer = 0;
-          startOfSeparatorInBuffer = 0;
-        }
-      }
-
-      /**
-       * Locates the start position and end position of the next delimiter. Will
-       * consume the channel till either EOF or the delimiter bounds are found.
-       *
-       * <p>This fills the buffer and updates the positions as follows:
-       * <pre>{@code
-       * ------------------------------------------------------
-       * | element bytes | delimiter bytes | unconsumed bytes |
-       * ------------------------------------------------------
-       * 0            start of          end of              buffer
-       *              separator         separator           size
-       *              in buffer         in buffer
-       * }</pre>
-       */
-      private void findSeparatorBounds() throws IOException {
-        int bytePositionInBuffer = 0;
-        while (true) {
-          if (!tryToEnsureNumberOfBytesInBuffer(bytePositionInBuffer + 1)) {
-            startOfSeparatorInBuffer = endOfSeparatorInBuffer = bytePositionInBuffer;
-            break;
-          }
-
-          byte currentByte = buffer.byteAt(bytePositionInBuffer);
-
-          if (currentByte == '\n') {
-            startOfSeparatorInBuffer = bytePositionInBuffer;
-            endOfSeparatorInBuffer = startOfSeparatorInBuffer + 1;
-            break;
-          } else if (currentByte == '\r') {
-            startOfSeparatorInBuffer = bytePositionInBuffer;
-            endOfSeparatorInBuffer = startOfSeparatorInBuffer + 1;
-
-            if (tryToEnsureNumberOfBytesInBuffer(bytePositionInBuffer + 2)) {
-              currentByte = buffer.byteAt(bytePositionInBuffer + 1);
-              if (currentByte == '\n') {
-                endOfSeparatorInBuffer += 1;
-              }
-            }
-            break;
-          }
-
-          // Move to the next byte in buffer.
-          bytePositionInBuffer += 1;
-        }
-      }
-
-      @Override
-      protected boolean readNextRecord() throws IOException {
-        startOfRecord = startOfNextRecord;
-        findSeparatorBounds();
-
-        // If we have reached EOF file and consumed all of the buffer then we know
-        // that there are no more records.
-        if (eof && buffer.size() == 0) {
-          elementIsPresent = false;
-          return false;
-        }
-
-        decodeCurrentElement();
-        startOfNextRecord = startOfRecord + endOfSeparatorInBuffer;
-        return true;
-      }
-
-      /**
-       * Decodes the current element updating the buffer to only contain the unconsumed bytes.
-       *
-       * <p>This invalidates the currently stored {@code startOfSeparatorInBuffer} and
-       * {@code endOfSeparatorInBuffer}.
-       */
-      private void decodeCurrentElement() throws IOException {
-        ByteString dataToDecode = buffer.substring(0, startOfSeparatorInBuffer);
-        currentValue = dataToDecode.toStringUtf8();
-        elementIsPresent = true;
-        buffer = buffer.substring(endOfSeparatorInBuffer);
-      }
-
-      /**
-       * Returns false if we were unable to ensure the minimum capacity by consuming the channel.
-       */
-      private boolean tryToEnsureNumberOfBytesInBuffer(int minCapacity) throws IOException {
-        // While we aren't at EOF or haven't fulfilled the minimum buffer capacity,
-        // attempt to read more bytes.
-        while (buffer.size() <= minCapacity && !eof) {
-          eof = inChannel.read(readBuffer) == -1;
-          readBuffer.flip();
-          buffer = buffer.concat(ByteString.copyFrom(readBuffer));
-          readBuffer.clear();
-        }
-        // Return true if we were able to honor the minimum buffer capacity request
-        return buffer.size() >= minCapacity;
-      }
-    }
-  }
-
-  /**
-   * A {@link FileBasedSink} for text files. Produces text files with the newline separator
-   * {@code '\n'} represented in {@code UTF-8} format as the record separator.
-   * Each record (including the last) is terminated.
-   */
-  @VisibleForTesting
-  static class TextSink extends FileBasedSink<String> {
-    @Nullable private final String header;
-    @Nullable private final String footer;
-
-    @VisibleForTesting
-    TextSink(FilenamePolicy filenamePolicy, @Nullable String header, @Nullable String footer,
-             WritableByteChannelFactory writableByteChannelFactory) {
-      super(filenamePolicy, writableByteChannelFactory);
-      this.header = header;
-      this.footer = footer;
-    }
-    @VisibleForTesting
-    TextSink(
-        ValueProvider<String> baseOutputFilename,
-        String extension,
-        @Nullable String header,
-        @Nullable String footer,
-        String fileNameTemplate,
-        WritableByteChannelFactory writableByteChannelFactory) {
-      super(baseOutputFilename, extension, fileNameTemplate, writableByteChannelFactory);
-      this.header = header;
-      this.footer = footer;
-    }
-
-    @Override
-    public FileBasedSink.FileBasedWriteOperation<String> createWriteOperation() {
-      return new TextWriteOperation(this, header, footer);
-    }
-
-    /**
-     * A {@link org.apache.beam.sdk.io.FileBasedSink.FileBasedWriteOperation
-     * FileBasedWriteOperation} for text files.
-     */
-    private static class TextWriteOperation extends FileBasedWriteOperation<String> {
-      @Nullable private final String header;
-      @Nullable private final String footer;
-
-      private TextWriteOperation(TextSink sink, @Nullable String header, @Nullable String footer) {
-        super(sink);
-        this.header = header;
-        this.footer = footer;
-      }
-
-      @Override
-      public FileBasedWriter createWriter(PipelineOptions options) throws Exception {
-        return new TextWriter(this, header, footer);
-      }
-    }
-
-    /**
-     * A {@link org.apache.beam.sdk.io.FileBasedSink.FileBasedWriter FileBasedWriter}
-     * for text files.
-     */
-    private static class TextWriter extends FileBasedWriter<String> {
-      private static final String NEWLINE = "\n";
-      @Nullable private final String header;
-      @Nullable private final String footer;
-      private OutputStreamWriter out;
-
-      public TextWriter(
-          FileBasedWriteOperation<String> writeOperation,
-          @Nullable String header,
-          @Nullable String footer) {
-        super(writeOperation, MimeTypes.TEXT);
-        this.header = header;
-        this.footer = footer;
-      }
-
-      /**
-       * Writes {@code value} followed by a newline character if {@code value} is not null.
-       */
-      private void writeIfNotNull(@Nullable String value) throws IOException {
-        if (value != null) {
-          writeLine(value);
-        }
-      }
-
-      /**
-       * Writes {@code value} followed by newline character.
-       */
-      private void writeLine(String value) throws IOException {
-        out.write(value);
-        out.write(NEWLINE);
-      }
-
-      @Override
-      protected void prepareWrite(WritableByteChannel channel) throws Exception {
-        out = new OutputStreamWriter(Channels.newOutputStream(channel), StandardCharsets.UTF_8);
-      }
-
-      @Override
-      protected void writeHeader() throws Exception {
-        writeIfNotNull(header);
-      }
-
-      @Override
-      public void write(String value) throws Exception {
-        writeLine(value);
-      }
-
-      @Override
-      protected void writeFooter() throws Exception {
-        writeIfNotNull(footer);
-      }
-
-      @Override
-      protected void finishWrite() throws Exception {
-        out.flush();
-      }
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/7b725c25/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSink.java
new file mode 100644
index 0000000..4efdc32
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSink.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io;
+
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.nio.channels.Channels;
+import java.nio.channels.WritableByteChannel;
+import java.nio.charset.StandardCharsets;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.sdk.util.MimeTypes;
+
+/**
+ * Implementation detail of {@link TextIO.Write}.
+ *
+ * <p>A {@link FileBasedSink} for text files. Produces text files with the newline separator {@code
+ * '\n'} represented in {@code UTF-8} format as the record separator. Each record (including the
+ * last) is terminated.
+ */
+class TextSink extends FileBasedSink<String> {
+  @Nullable private final String header;
+  @Nullable private final String footer;
+
+  TextSink(
+      FilenamePolicy filenamePolicy,
+      @Nullable String header,
+      @Nullable String footer,
+      WritableByteChannelFactory writableByteChannelFactory) {
+    super(filenamePolicy, writableByteChannelFactory);
+    this.header = header;
+    this.footer = footer;
+  }
+
+  TextSink(
+      ValueProvider<String> baseOutputFilename,
+      String extension,
+      @Nullable String header,
+      @Nullable String footer,
+      String fileNameTemplate,
+      WritableByteChannelFactory writableByteChannelFactory) {
+    super(baseOutputFilename, extension, fileNameTemplate, writableByteChannelFactory);
+    this.header = header;
+    this.footer = footer;
+  }
+
+  @Override
+  public FileBasedWriteOperation<String> createWriteOperation() {
+    return new TextWriteOperation(this, header, footer);
+  }
+
+  /** A {@link FileBasedWriteOperation FileBasedWriteOperation} for text files. */
+  private static class TextWriteOperation extends FileBasedWriteOperation<String> {
+    @Nullable private final String header;
+    @Nullable private final String footer;
+
+    private TextWriteOperation(TextSink sink, @Nullable String header, @Nullable String footer) {
+      super(sink);
+      this.header = header;
+      this.footer = footer;
+    }
+
+    @Override
+    public FileBasedWriter<String> createWriter(PipelineOptions options) throws Exception {
+      return new TextWriter(this, header, footer);
+    }
+  }
+
+  /** A {@link FileBasedWriter FileBasedWriter} for text files. */
+  private static class TextWriter extends FileBasedWriter<String> {
+    private static final String NEWLINE = "\n";
+    @Nullable private final String header;
+    @Nullable private final String footer;
+    private OutputStreamWriter out;
+
+    public TextWriter(
+        FileBasedWriteOperation<String> writeOperation,
+        @Nullable String header,
+        @Nullable String footer) {
+      super(writeOperation, MimeTypes.TEXT);
+      this.header = header;
+      this.footer = footer;
+    }
+
+    /** Writes {@code value} followed by a newline character if {@code value} is not null. */
+    private void writeIfNotNull(@Nullable String value) throws IOException {
+      if (value != null) {
+        writeLine(value);
+      }
+    }
+
+    /** Writes {@code value} followed by newline character. */
+    private void writeLine(String value) throws IOException {
+      out.write(value);
+      out.write(NEWLINE);
+    }
+
+    @Override
+    protected void prepareWrite(WritableByteChannel channel) throws Exception {
+      out = new OutputStreamWriter(Channels.newOutputStream(channel), StandardCharsets.UTF_8);
+    }
+
+    @Override
+    protected void writeHeader() throws Exception {
+      writeIfNotNull(header);
+    }
+
+    @Override
+    public void write(String value) throws Exception {
+      writeLine(value);
+    }
+
+    @Override
+    protected void writeFooter() throws Exception {
+      writeIfNotNull(footer);
+    }
+
+    @Override
+    protected void finishWrite() throws Exception {
+      out.flush();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/7b725c25/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java
new file mode 100644
index 0000000..4d9fa77
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io;
+
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.protobuf.ByteString;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.SeekableByteChannel;
+import java.util.NoSuchElementException;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.io.fs.MatchResult;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.ValueProvider;
+
+/**
+ * Implementation detail of {@link TextIO.Read}.
+ *
+ * <p>A {@link FileBasedSource} which can decode records delimited by newline characters.
+ *
+ * <p>This source splits the data into records using {@code UTF-8} {@code \n}, {@code \r}, or
+ * {@code \r\n} as the delimiter. This source is not strict and supports decoding the last record
+ * even if it is not delimited. Finally, no records are decoded if the stream is empty.
+ *
+ * <p>This source supports reading from any arbitrary byte position within the stream. If the
+ * starting position is not {@code 0}, then bytes are skipped until the first delimiter is found
+ * representing the beginning of the first record to be decoded.
+ */
+@VisibleForTesting
+class TextSource extends FileBasedSource<String> {
+  TextSource(ValueProvider<String> fileSpec) {
+    super(fileSpec, 1L);
+  }
+
+  private TextSource(MatchResult.Metadata metadata, long start, long end) {
+    super(metadata, 1L, start, end);
+  }
+
+  @Override
+  protected FileBasedSource<String> createForSubrangeOfFile(
+      MatchResult.Metadata metadata,
+      long start,
+      long end) {
+    return new TextSource(metadata, start, end);
+  }
+
+  @Override
+  protected FileBasedReader<String> createSingleFileReader(PipelineOptions options) {
+    return new TextBasedReader(this);
+  }
+
+  @Override
+  public Coder<String> getDefaultOutputCoder() {
+    return StringUtf8Coder.of();
+  }
+
+  /**
+   * A {@link FileBasedReader FileBasedReader}
+   * which can decode records delimited by newline characters.
+   *
+   * <p>See {@link TextSource} for further details.
+   */
+  @VisibleForTesting
+  static class TextBasedReader extends FileBasedReader<String> {
+    private static final int READ_BUFFER_SIZE = 8192;
+    private final ByteBuffer readBuffer = ByteBuffer.allocate(READ_BUFFER_SIZE);
+    private ByteString buffer;
+    private int startOfSeparatorInBuffer;
+    private int endOfSeparatorInBuffer;
+    private long startOfRecord;
+    private volatile long startOfNextRecord;
+    private volatile boolean eof;
+    private volatile boolean elementIsPresent;
+    private String currentValue;
+    private ReadableByteChannel inChannel;
+
+    private TextBasedReader(TextSource source) {
+      super(source);
+      buffer = ByteString.EMPTY;
+    }
+
+    @Override
+    protected long getCurrentOffset() throws NoSuchElementException {
+      if (!elementIsPresent) {
+        throw new NoSuchElementException();
+      }
+      return startOfRecord;
+    }
+
+    @Override
+    public long getSplitPointsRemaining() {
+      if (isStarted() && startOfNextRecord >= getCurrentSource().getEndOffset()) {
+        return isDone() ? 0 : 1;
+      }
+      return super.getSplitPointsRemaining();
+    }
+
+    @Override
+    public String getCurrent() throws NoSuchElementException {
+      if (!elementIsPresent) {
+        throw new NoSuchElementException();
+      }
+      return currentValue;
+    }
+
+    @Override
+    protected void startReading(ReadableByteChannel channel) throws IOException {
+      this.inChannel = channel;
+      // If the first offset is greater than zero, we need to skip bytes until we see our
+      // first separator.
+      if (getCurrentSource().getStartOffset() > 0) {
+        checkState(channel instanceof SeekableByteChannel,
+            "%s only supports reading from a SeekableByteChannel when given a start offset"
+            + " greater than 0.", TextSource.class.getSimpleName());
+        long requiredPosition = getCurrentSource().getStartOffset() - 1;
+        ((SeekableByteChannel) channel).position(requiredPosition);
+        findSeparatorBounds();
+        buffer = buffer.substring(endOfSeparatorInBuffer);
+        startOfNextRecord = requiredPosition + endOfSeparatorInBuffer;
+        endOfSeparatorInBuffer = 0;
+        startOfSeparatorInBuffer = 0;
+      }
+    }
+
+    /**
+     * Locates the start position and end position of the next delimiter. Will
+     * consume the channel till either EOF or the delimiter bounds are found.
+     *
+     * <p>This fills the buffer and updates the positions as follows:
+     * <pre>{@code
+     * ------------------------------------------------------
+     * | element bytes | delimiter bytes | unconsumed bytes |
+     * ------------------------------------------------------
+     * 0            start of          end of              buffer
+     *              separator         separator           size
+     *              in buffer         in buffer
+     * }</pre>
+     */
+    private void findSeparatorBounds() throws IOException {
+      int bytePositionInBuffer = 0;
+      while (true) {
+        if (!tryToEnsureNumberOfBytesInBuffer(bytePositionInBuffer + 1)) {
+          startOfSeparatorInBuffer = endOfSeparatorInBuffer = bytePositionInBuffer;
+          break;
+        }
+
+        byte currentByte = buffer.byteAt(bytePositionInBuffer);
+
+        if (currentByte == '\n') {
+          startOfSeparatorInBuffer = bytePositionInBuffer;
+          endOfSeparatorInBuffer = startOfSeparatorInBuffer + 1;
+          break;
+        } else if (currentByte == '\r') {
+          startOfSeparatorInBuffer = bytePositionInBuffer;
+          endOfSeparatorInBuffer = startOfSeparatorInBuffer + 1;
+
+          if (tryToEnsureNumberOfBytesInBuffer(bytePositionInBuffer + 2)) {
+            currentByte = buffer.byteAt(bytePositionInBuffer + 1);
+            if (currentByte == '\n') {
+              endOfSeparatorInBuffer += 1;
+            }
+          }
+          break;
+        }
+
+        // Move to the next byte in buffer.
+        bytePositionInBuffer += 1;
+      }
+    }
+
+    @Override
+    protected boolean readNextRecord() throws IOException {
+      startOfRecord = startOfNextRecord;
+      findSeparatorBounds();
+
+      // If we have reached EOF file and consumed all of the buffer then we know
+      // that there are no more records.
+      if (eof && buffer.size() == 0) {
+        elementIsPresent = false;
+        return false;
+      }
+
+      decodeCurrentElement();
+      startOfNextRecord = startOfRecord + endOfSeparatorInBuffer;
+      return true;
+    }
+
+    /**
+     * Decodes the current element updating the buffer to only contain the unconsumed bytes.
+     *
+     * <p>This invalidates the currently stored {@code startOfSeparatorInBuffer} and
+     * {@code endOfSeparatorInBuffer}.
+     */
+    private void decodeCurrentElement() throws IOException {
+      ByteString dataToDecode = buffer.substring(0, startOfSeparatorInBuffer);
+      currentValue = dataToDecode.toStringUtf8();
+      elementIsPresent = true;
+      buffer = buffer.substring(endOfSeparatorInBuffer);
+    }
+
+    /**
+     * Returns false if we were unable to ensure the minimum capacity by consuming the channel.
+     */
+    private boolean tryToEnsureNumberOfBytesInBuffer(int minCapacity) throws IOException {
+      // While we aren't at EOF or haven't fulfilled the minimum buffer capacity,
+      // attempt to read more bytes.
+      while (buffer.size() <= minCapacity && !eof) {
+        eof = inChannel.read(readBuffer) == -1;
+        readBuffer.flip();
+        buffer = buffer.concat(ByteString.copyFrom(readBuffer));
+        readBuffer.clear();
+      }
+      // Return true if we were able to honor the minimum buffer capacity request
+      return buffer.size() >= minCapacity;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/7b725c25/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
index 425e0d6..f30b52f 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
@@ -73,7 +73,6 @@ import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.io.BoundedSource.BoundedReader;
 import org.apache.beam.sdk.io.FileBasedSink.WritableByteChannelFactory;
 import org.apache.beam.sdk.io.TextIO.CompressionType;
-import org.apache.beam.sdk.io.TextIO.TextSource;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.options.ValueProvider;
@@ -1064,7 +1063,7 @@ public class TextIOTest {
   private TextSource prepareSource(byte[] data) throws IOException {
     Path path = Files.createTempFile(tempFolder, "tempfile", "ext");
     Files.write(path, data);
-    return new TextSource(path.toString());
+    return new TextSource(ValueProvider.StaticValueProvider.of(path.toString()));
   }
 
   @Test

Reply via email to