Repository: beam Updated Branches: refs/heads/master c2c89eda9 -> 3161904d9
Moves TextSource and TextSink to top level Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/7b725c25 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/7b725c25 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/7b725c25 Branch: refs/heads/master Commit: 7b725c25288ae24eb89be3bf61e09e0e38c2b200 Parents: 681b5d6 Author: Eugene Kirpichov <[email protected]> Authored: Fri Apr 28 17:46:44 2017 -0700 Committer: Eugene Kirpichov <[email protected]> Committed: Tue May 2 12:20:14 2017 -0700 ---------------------------------------------------------------------- .../java/org/apache/beam/sdk/io/TextIO.java | 343 +------------------ .../java/org/apache/beam/sdk/io/TextSink.java | 139 ++++++++ .../java/org/apache/beam/sdk/io/TextSource.java | 236 +++++++++++++ .../java/org/apache/beam/sdk/io/TextIOTest.java | 3 +- 4 files changed, 377 insertions(+), 344 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/7b725c25/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java index 90d56e7..1f9b7a0 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java @@ -19,20 +19,8 @@ package org.apache.beam.sdk.io; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; -import static com.google.common.base.Preconditions.checkState; import com.google.auto.value.AutoValue; -import com.google.common.annotations.VisibleForTesting; -import com.google.protobuf.ByteString; -import java.io.IOException; -import java.io.OutputStreamWriter; -import java.nio.ByteBuffer; -import java.nio.channels.Channels; -import java.nio.channels.ReadableByteChannel; -import java.nio.channels.SeekableByteChannel; -import java.nio.channels.WritableByteChannel; -import java.nio.charset.StandardCharsets; -import java.util.NoSuchElementException; import java.util.regex.Pattern; import javax.annotation.Nullable; import org.apache.beam.sdk.coders.Coder; @@ -41,13 +29,10 @@ import org.apache.beam.sdk.coders.VoidCoder; import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy; import org.apache.beam.sdk.io.FileBasedSink.WritableByteChannelFactory; import org.apache.beam.sdk.io.Read.Bounded; -import org.apache.beam.sdk.io.fs.MatchResult.Metadata; -import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.ValueProvider; import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.display.DisplayData; -import org.apache.beam.sdk.util.MimeTypes; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PDone; @@ -412,7 +397,7 @@ public class TextIO { throw new IllegalStateException( "cannot set both a filename policy and a filename prefix"); } - WriteFiles<String> write = null; + WriteFiles<String> write; if (getFilenamePolicy() != null) { write = WriteFiles.to( @@ -535,330 +520,4 @@ public class TextIO { /** Disable construction of utility class. */ private TextIO() {} - - /** - * A {@link FileBasedSource} which can decode records delimited by newline characters. - * - * <p>This source splits the data into records using {@code UTF-8} {@code \n}, {@code \r}, or - * {@code \r\n} as the delimiter. This source is not strict and supports decoding the last record - * even if it is not delimited. Finally, no records are decoded if the stream is empty. - * - * <p>This source supports reading from any arbitrary byte position within the stream. If the - * starting position is not {@code 0}, then bytes are skipped until the first delimiter is found - * representing the beginning of the first record to be decoded. - */ - @VisibleForTesting - static class TextSource extends FileBasedSource<String> { - /** The Coder to use to decode each line. */ - @VisibleForTesting - TextSource(String fileSpec) { - super(StaticValueProvider.of(fileSpec), 1L); - } - - @VisibleForTesting - TextSource(ValueProvider<String> fileSpec) { - super(fileSpec, 1L); - } - - private TextSource(Metadata metadata, long start, long end) { - super(metadata, 1L, start, end); - } - - @Override - protected FileBasedSource<String> createForSubrangeOfFile( - Metadata metadata, - long start, - long end) { - return new TextSource(metadata, start, end); - } - - @Override - protected FileBasedReader<String> createSingleFileReader(PipelineOptions options) { - return new TextBasedReader(this); - } - - @Override - public Coder<String> getDefaultOutputCoder() { - return StringUtf8Coder.of(); - } - - /** - * A {@link org.apache.beam.sdk.io.FileBasedSource.FileBasedReader FileBasedReader} - * which can decode records delimited by newline characters. - * - * <p>See {@link TextSource} for further details. - */ - @VisibleForTesting - static class TextBasedReader extends FileBasedReader<String> { - private static final int READ_BUFFER_SIZE = 8192; - private final ByteBuffer readBuffer = ByteBuffer.allocate(READ_BUFFER_SIZE); - private ByteString buffer; - private int startOfSeparatorInBuffer; - private int endOfSeparatorInBuffer; - private long startOfRecord; - private volatile long startOfNextRecord; - private volatile boolean eof; - private volatile boolean elementIsPresent; - private String currentValue; - private ReadableByteChannel inChannel; - - private TextBasedReader(TextSource source) { - super(source); - buffer = ByteString.EMPTY; - } - - @Override - protected long getCurrentOffset() throws NoSuchElementException { - if (!elementIsPresent) { - throw new NoSuchElementException(); - } - return startOfRecord; - } - - @Override - public long getSplitPointsRemaining() { - if (isStarted() && startOfNextRecord >= getCurrentSource().getEndOffset()) { - return isDone() ? 0 : 1; - } - return super.getSplitPointsRemaining(); - } - - @Override - public String getCurrent() throws NoSuchElementException { - if (!elementIsPresent) { - throw new NoSuchElementException(); - } - return currentValue; - } - - @Override - protected void startReading(ReadableByteChannel channel) throws IOException { - this.inChannel = channel; - // If the first offset is greater than zero, we need to skip bytes until we see our - // first separator. - if (getCurrentSource().getStartOffset() > 0) { - checkState(channel instanceof SeekableByteChannel, - "%s only supports reading from a SeekableByteChannel when given a start offset" - + " greater than 0.", TextSource.class.getSimpleName()); - long requiredPosition = getCurrentSource().getStartOffset() - 1; - ((SeekableByteChannel) channel).position(requiredPosition); - findSeparatorBounds(); - buffer = buffer.substring(endOfSeparatorInBuffer); - startOfNextRecord = requiredPosition + endOfSeparatorInBuffer; - endOfSeparatorInBuffer = 0; - startOfSeparatorInBuffer = 0; - } - } - - /** - * Locates the start position and end position of the next delimiter. Will - * consume the channel till either EOF or the delimiter bounds are found. - * - * <p>This fills the buffer and updates the positions as follows: - * <pre>{@code - * ------------------------------------------------------ - * | element bytes | delimiter bytes | unconsumed bytes | - * ------------------------------------------------------ - * 0 start of end of buffer - * separator separator size - * in buffer in buffer - * }</pre> - */ - private void findSeparatorBounds() throws IOException { - int bytePositionInBuffer = 0; - while (true) { - if (!tryToEnsureNumberOfBytesInBuffer(bytePositionInBuffer + 1)) { - startOfSeparatorInBuffer = endOfSeparatorInBuffer = bytePositionInBuffer; - break; - } - - byte currentByte = buffer.byteAt(bytePositionInBuffer); - - if (currentByte == '\n') { - startOfSeparatorInBuffer = bytePositionInBuffer; - endOfSeparatorInBuffer = startOfSeparatorInBuffer + 1; - break; - } else if (currentByte == '\r') { - startOfSeparatorInBuffer = bytePositionInBuffer; - endOfSeparatorInBuffer = startOfSeparatorInBuffer + 1; - - if (tryToEnsureNumberOfBytesInBuffer(bytePositionInBuffer + 2)) { - currentByte = buffer.byteAt(bytePositionInBuffer + 1); - if (currentByte == '\n') { - endOfSeparatorInBuffer += 1; - } - } - break; - } - - // Move to the next byte in buffer. - bytePositionInBuffer += 1; - } - } - - @Override - protected boolean readNextRecord() throws IOException { - startOfRecord = startOfNextRecord; - findSeparatorBounds(); - - // If we have reached EOF file and consumed all of the buffer then we know - // that there are no more records. - if (eof && buffer.size() == 0) { - elementIsPresent = false; - return false; - } - - decodeCurrentElement(); - startOfNextRecord = startOfRecord + endOfSeparatorInBuffer; - return true; - } - - /** - * Decodes the current element updating the buffer to only contain the unconsumed bytes. - * - * <p>This invalidates the currently stored {@code startOfSeparatorInBuffer} and - * {@code endOfSeparatorInBuffer}. - */ - private void decodeCurrentElement() throws IOException { - ByteString dataToDecode = buffer.substring(0, startOfSeparatorInBuffer); - currentValue = dataToDecode.toStringUtf8(); - elementIsPresent = true; - buffer = buffer.substring(endOfSeparatorInBuffer); - } - - /** - * Returns false if we were unable to ensure the minimum capacity by consuming the channel. - */ - private boolean tryToEnsureNumberOfBytesInBuffer(int minCapacity) throws IOException { - // While we aren't at EOF or haven't fulfilled the minimum buffer capacity, - // attempt to read more bytes. - while (buffer.size() <= minCapacity && !eof) { - eof = inChannel.read(readBuffer) == -1; - readBuffer.flip(); - buffer = buffer.concat(ByteString.copyFrom(readBuffer)); - readBuffer.clear(); - } - // Return true if we were able to honor the minimum buffer capacity request - return buffer.size() >= minCapacity; - } - } - } - - /** - * A {@link FileBasedSink} for text files. Produces text files with the newline separator - * {@code '\n'} represented in {@code UTF-8} format as the record separator. - * Each record (including the last) is terminated. - */ - @VisibleForTesting - static class TextSink extends FileBasedSink<String> { - @Nullable private final String header; - @Nullable private final String footer; - - @VisibleForTesting - TextSink(FilenamePolicy filenamePolicy, @Nullable String header, @Nullable String footer, - WritableByteChannelFactory writableByteChannelFactory) { - super(filenamePolicy, writableByteChannelFactory); - this.header = header; - this.footer = footer; - } - @VisibleForTesting - TextSink( - ValueProvider<String> baseOutputFilename, - String extension, - @Nullable String header, - @Nullable String footer, - String fileNameTemplate, - WritableByteChannelFactory writableByteChannelFactory) { - super(baseOutputFilename, extension, fileNameTemplate, writableByteChannelFactory); - this.header = header; - this.footer = footer; - } - - @Override - public FileBasedSink.FileBasedWriteOperation<String> createWriteOperation() { - return new TextWriteOperation(this, header, footer); - } - - /** - * A {@link org.apache.beam.sdk.io.FileBasedSink.FileBasedWriteOperation - * FileBasedWriteOperation} for text files. - */ - private static class TextWriteOperation extends FileBasedWriteOperation<String> { - @Nullable private final String header; - @Nullable private final String footer; - - private TextWriteOperation(TextSink sink, @Nullable String header, @Nullable String footer) { - super(sink); - this.header = header; - this.footer = footer; - } - - @Override - public FileBasedWriter createWriter(PipelineOptions options) throws Exception { - return new TextWriter(this, header, footer); - } - } - - /** - * A {@link org.apache.beam.sdk.io.FileBasedSink.FileBasedWriter FileBasedWriter} - * for text files. - */ - private static class TextWriter extends FileBasedWriter<String> { - private static final String NEWLINE = "\n"; - @Nullable private final String header; - @Nullable private final String footer; - private OutputStreamWriter out; - - public TextWriter( - FileBasedWriteOperation<String> writeOperation, - @Nullable String header, - @Nullable String footer) { - super(writeOperation, MimeTypes.TEXT); - this.header = header; - this.footer = footer; - } - - /** - * Writes {@code value} followed by a newline character if {@code value} is not null. - */ - private void writeIfNotNull(@Nullable String value) throws IOException { - if (value != null) { - writeLine(value); - } - } - - /** - * Writes {@code value} followed by newline character. - */ - private void writeLine(String value) throws IOException { - out.write(value); - out.write(NEWLINE); - } - - @Override - protected void prepareWrite(WritableByteChannel channel) throws Exception { - out = new OutputStreamWriter(Channels.newOutputStream(channel), StandardCharsets.UTF_8); - } - - @Override - protected void writeHeader() throws Exception { - writeIfNotNull(header); - } - - @Override - public void write(String value) throws Exception { - writeLine(value); - } - - @Override - protected void writeFooter() throws Exception { - writeIfNotNull(footer); - } - - @Override - protected void finishWrite() throws Exception { - out.flush(); - } - } - } } http://git-wip-us.apache.org/repos/asf/beam/blob/7b725c25/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSink.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSink.java new file mode 100644 index 0000000..4efdc32 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSink.java @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io; + +import java.io.IOException; +import java.io.OutputStreamWriter; +import java.nio.channels.Channels; +import java.nio.channels.WritableByteChannel; +import java.nio.charset.StandardCharsets; +import javax.annotation.Nullable; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.ValueProvider; +import org.apache.beam.sdk.util.MimeTypes; + +/** + * Implementation detail of {@link TextIO.Write}. + * + * <p>A {@link FileBasedSink} for text files. Produces text files with the newline separator {@code + * '\n'} represented in {@code UTF-8} format as the record separator. Each record (including the + * last) is terminated. + */ +class TextSink extends FileBasedSink<String> { + @Nullable private final String header; + @Nullable private final String footer; + + TextSink( + FilenamePolicy filenamePolicy, + @Nullable String header, + @Nullable String footer, + WritableByteChannelFactory writableByteChannelFactory) { + super(filenamePolicy, writableByteChannelFactory); + this.header = header; + this.footer = footer; + } + + TextSink( + ValueProvider<String> baseOutputFilename, + String extension, + @Nullable String header, + @Nullable String footer, + String fileNameTemplate, + WritableByteChannelFactory writableByteChannelFactory) { + super(baseOutputFilename, extension, fileNameTemplate, writableByteChannelFactory); + this.header = header; + this.footer = footer; + } + + @Override + public FileBasedWriteOperation<String> createWriteOperation() { + return new TextWriteOperation(this, header, footer); + } + + /** A {@link FileBasedWriteOperation FileBasedWriteOperation} for text files. */ + private static class TextWriteOperation extends FileBasedWriteOperation<String> { + @Nullable private final String header; + @Nullable private final String footer; + + private TextWriteOperation(TextSink sink, @Nullable String header, @Nullable String footer) { + super(sink); + this.header = header; + this.footer = footer; + } + + @Override + public FileBasedWriter<String> createWriter(PipelineOptions options) throws Exception { + return new TextWriter(this, header, footer); + } + } + + /** A {@link FileBasedWriter FileBasedWriter} for text files. */ + private static class TextWriter extends FileBasedWriter<String> { + private static final String NEWLINE = "\n"; + @Nullable private final String header; + @Nullable private final String footer; + private OutputStreamWriter out; + + public TextWriter( + FileBasedWriteOperation<String> writeOperation, + @Nullable String header, + @Nullable String footer) { + super(writeOperation, MimeTypes.TEXT); + this.header = header; + this.footer = footer; + } + + /** Writes {@code value} followed by a newline character if {@code value} is not null. */ + private void writeIfNotNull(@Nullable String value) throws IOException { + if (value != null) { + writeLine(value); + } + } + + /** Writes {@code value} followed by newline character. */ + private void writeLine(String value) throws IOException { + out.write(value); + out.write(NEWLINE); + } + + @Override + protected void prepareWrite(WritableByteChannel channel) throws Exception { + out = new OutputStreamWriter(Channels.newOutputStream(channel), StandardCharsets.UTF_8); + } + + @Override + protected void writeHeader() throws Exception { + writeIfNotNull(header); + } + + @Override + public void write(String value) throws Exception { + writeLine(value); + } + + @Override + protected void writeFooter() throws Exception { + writeIfNotNull(footer); + } + + @Override + protected void finishWrite() throws Exception { + out.flush(); + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/7b725c25/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java new file mode 100644 index 0000000..4d9fa77 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java @@ -0,0 +1,236 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io; + +import static com.google.common.base.Preconditions.checkState; + +import com.google.common.annotations.VisibleForTesting; +import com.google.protobuf.ByteString; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.ReadableByteChannel; +import java.nio.channels.SeekableByteChannel; +import java.util.NoSuchElementException; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.io.fs.MatchResult; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.ValueProvider; + +/** + * Implementation detail of {@link TextIO.Read}. + * + * <p>A {@link FileBasedSource} which can decode records delimited by newline characters. + * + * <p>This source splits the data into records using {@code UTF-8} {@code \n}, {@code \r}, or + * {@code \r\n} as the delimiter. This source is not strict and supports decoding the last record + * even if it is not delimited. Finally, no records are decoded if the stream is empty. + * + * <p>This source supports reading from any arbitrary byte position within the stream. If the + * starting position is not {@code 0}, then bytes are skipped until the first delimiter is found + * representing the beginning of the first record to be decoded. + */ +@VisibleForTesting +class TextSource extends FileBasedSource<String> { + TextSource(ValueProvider<String> fileSpec) { + super(fileSpec, 1L); + } + + private TextSource(MatchResult.Metadata metadata, long start, long end) { + super(metadata, 1L, start, end); + } + + @Override + protected FileBasedSource<String> createForSubrangeOfFile( + MatchResult.Metadata metadata, + long start, + long end) { + return new TextSource(metadata, start, end); + } + + @Override + protected FileBasedReader<String> createSingleFileReader(PipelineOptions options) { + return new TextBasedReader(this); + } + + @Override + public Coder<String> getDefaultOutputCoder() { + return StringUtf8Coder.of(); + } + + /** + * A {@link FileBasedReader FileBasedReader} + * which can decode records delimited by newline characters. + * + * <p>See {@link TextSource} for further details. + */ + @VisibleForTesting + static class TextBasedReader extends FileBasedReader<String> { + private static final int READ_BUFFER_SIZE = 8192; + private final ByteBuffer readBuffer = ByteBuffer.allocate(READ_BUFFER_SIZE); + private ByteString buffer; + private int startOfSeparatorInBuffer; + private int endOfSeparatorInBuffer; + private long startOfRecord; + private volatile long startOfNextRecord; + private volatile boolean eof; + private volatile boolean elementIsPresent; + private String currentValue; + private ReadableByteChannel inChannel; + + private TextBasedReader(TextSource source) { + super(source); + buffer = ByteString.EMPTY; + } + + @Override + protected long getCurrentOffset() throws NoSuchElementException { + if (!elementIsPresent) { + throw new NoSuchElementException(); + } + return startOfRecord; + } + + @Override + public long getSplitPointsRemaining() { + if (isStarted() && startOfNextRecord >= getCurrentSource().getEndOffset()) { + return isDone() ? 0 : 1; + } + return super.getSplitPointsRemaining(); + } + + @Override + public String getCurrent() throws NoSuchElementException { + if (!elementIsPresent) { + throw new NoSuchElementException(); + } + return currentValue; + } + + @Override + protected void startReading(ReadableByteChannel channel) throws IOException { + this.inChannel = channel; + // If the first offset is greater than zero, we need to skip bytes until we see our + // first separator. + if (getCurrentSource().getStartOffset() > 0) { + checkState(channel instanceof SeekableByteChannel, + "%s only supports reading from a SeekableByteChannel when given a start offset" + + " greater than 0.", TextSource.class.getSimpleName()); + long requiredPosition = getCurrentSource().getStartOffset() - 1; + ((SeekableByteChannel) channel).position(requiredPosition); + findSeparatorBounds(); + buffer = buffer.substring(endOfSeparatorInBuffer); + startOfNextRecord = requiredPosition + endOfSeparatorInBuffer; + endOfSeparatorInBuffer = 0; + startOfSeparatorInBuffer = 0; + } + } + + /** + * Locates the start position and end position of the next delimiter. Will + * consume the channel till either EOF or the delimiter bounds are found. + * + * <p>This fills the buffer and updates the positions as follows: + * <pre>{@code + * ------------------------------------------------------ + * | element bytes | delimiter bytes | unconsumed bytes | + * ------------------------------------------------------ + * 0 start of end of buffer + * separator separator size + * in buffer in buffer + * }</pre> + */ + private void findSeparatorBounds() throws IOException { + int bytePositionInBuffer = 0; + while (true) { + if (!tryToEnsureNumberOfBytesInBuffer(bytePositionInBuffer + 1)) { + startOfSeparatorInBuffer = endOfSeparatorInBuffer = bytePositionInBuffer; + break; + } + + byte currentByte = buffer.byteAt(bytePositionInBuffer); + + if (currentByte == '\n') { + startOfSeparatorInBuffer = bytePositionInBuffer; + endOfSeparatorInBuffer = startOfSeparatorInBuffer + 1; + break; + } else if (currentByte == '\r') { + startOfSeparatorInBuffer = bytePositionInBuffer; + endOfSeparatorInBuffer = startOfSeparatorInBuffer + 1; + + if (tryToEnsureNumberOfBytesInBuffer(bytePositionInBuffer + 2)) { + currentByte = buffer.byteAt(bytePositionInBuffer + 1); + if (currentByte == '\n') { + endOfSeparatorInBuffer += 1; + } + } + break; + } + + // Move to the next byte in buffer. + bytePositionInBuffer += 1; + } + } + + @Override + protected boolean readNextRecord() throws IOException { + startOfRecord = startOfNextRecord; + findSeparatorBounds(); + + // If we have reached EOF file and consumed all of the buffer then we know + // that there are no more records. + if (eof && buffer.size() == 0) { + elementIsPresent = false; + return false; + } + + decodeCurrentElement(); + startOfNextRecord = startOfRecord + endOfSeparatorInBuffer; + return true; + } + + /** + * Decodes the current element updating the buffer to only contain the unconsumed bytes. + * + * <p>This invalidates the currently stored {@code startOfSeparatorInBuffer} and + * {@code endOfSeparatorInBuffer}. + */ + private void decodeCurrentElement() throws IOException { + ByteString dataToDecode = buffer.substring(0, startOfSeparatorInBuffer); + currentValue = dataToDecode.toStringUtf8(); + elementIsPresent = true; + buffer = buffer.substring(endOfSeparatorInBuffer); + } + + /** + * Returns false if we were unable to ensure the minimum capacity by consuming the channel. + */ + private boolean tryToEnsureNumberOfBytesInBuffer(int minCapacity) throws IOException { + // While we aren't at EOF or haven't fulfilled the minimum buffer capacity, + // attempt to read more bytes. + while (buffer.size() <= minCapacity && !eof) { + eof = inChannel.read(readBuffer) == -1; + readBuffer.flip(); + buffer = buffer.concat(ByteString.copyFrom(readBuffer)); + readBuffer.clear(); + } + // Return true if we were able to honor the minimum buffer capacity request + return buffer.size() >= minCapacity; + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/7b725c25/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java index 425e0d6..f30b52f 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java @@ -73,7 +73,6 @@ import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.io.BoundedSource.BoundedReader; import org.apache.beam.sdk.io.FileBasedSink.WritableByteChannelFactory; import org.apache.beam.sdk.io.TextIO.CompressionType; -import org.apache.beam.sdk.io.TextIO.TextSource; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.options.ValueProvider; @@ -1064,7 +1063,7 @@ public class TextIOTest { private TextSource prepareSource(byte[] data) throws IOException { Path path = Files.createTempFile(tempFolder, "tempfile", "ext"); Files.write(path, data); - return new TextSource(path.toString()); + return new TextSource(ValueProvider.StaticValueProvider.of(path.toString())); } @Test
