Repository: beam Updated Branches: refs/heads/master c9653f270 -> b844126c8
[BEAM-2802] Support multi-byte custom separator in TextIO Only separators that cannot self-overlap are supported, because self-overlapping separators make parsing ambiguous. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1b6cde06 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1b6cde06 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1b6cde06 Branch: refs/heads/master Commit: 1b6cde067ce78e1ce780b66e0cf1c883ce901959 Parents: c9653f2 Author: Etienne Chauchot <[email protected]> Authored: Fri Aug 25 17:23:51 2017 +0200 Committer: Eugene Kirpichov <[email protected]> Committed: Fri Sep 1 11:51:17 2017 -0700 ---------------------------------------------------------------------- .../java/org/apache/beam/sdk/io/TextIO.java | 69 +++++++++-- .../java/org/apache/beam/sdk/io/TextSource.java | 117 ++++++++++++------- .../org/apache/beam/sdk/io/TextIOReadTest.java | 88 +++++++++++--- 3 files changed, 207 insertions(+), 67 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/1b6cde06/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java index 76102cb..7832168 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java @@ -27,6 +27,9 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Predicates; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; + +import java.nio.ByteBuffer; +import java.util.Arrays; import java.util.List; import javax.annotation.Nullable; import org.apache.beam.sdk.annotations.Experimental; @@ -63,7 +66,8 @@ import
org.joda.time.Duration; * PCollection}, apply {@link TextIO#readAll()}. * * <p>{@link #read} returns a {@link PCollection} of {@link String Strings}, each corresponding to - * one line of an input UTF-8 text file (split into lines delimited by '\n', '\r', or '\r\n'). + * one line of an input UTF-8 text file (split into lines delimited by '\n', '\r', or '\r\n', + * or by a custom delimiter set via {@link TextIO.Read#withDelimiter}). * * <h3>Filepattern expansion and watching</h3> * @@ -255,7 +259,8 @@ public class TextIO { /** Implementation of {@link #read}. */ @AutoValue public abstract static class Read extends PTransform<PBegin, PCollection<String>> { - @Nullable abstract ValueProvider<String> getFilepattern(); + @Nullable + abstract ValueProvider<String> getFilepattern(); abstract Compression getCompression(); @Nullable @@ -266,6 +271,8 @@ public class TextIO { abstract boolean getHintMatchesManyFiles(); abstract EmptyMatchTreatment getEmptyMatchTreatment(); + @Nullable + abstract byte[] getDelimiter(); abstract Builder toBuilder(); @@ -278,6 +285,7 @@ public class TextIO { TerminationCondition<?, ?> condition); abstract Builder setHintMatchesManyFiles(boolean hintManyFiles); abstract Builder setEmptyMatchTreatment(EmptyMatchTreatment treatment); + abstract Builder setDelimiter(byte[] delimiter); abstract Read build(); } @@ -360,6 +368,25 @@ public class TextIO { return toBuilder().setEmptyMatchTreatment(treatment).build(); } + /** + * Sets a custom, non-empty, non-self-overlapping delimiter used instead of '\r', '\n', '\r\n'.
+ */ + public Read withDelimiter(byte[] delimiter) { + checkArgument(delimiter != null && delimiter.length > 0, "delimiter cannot be null or empty"); + checkArgument(!isSelfOverlapping(delimiter), "delimiter must not self-overlap"); + return toBuilder().setDelimiter(delimiter).build(); + } + + static boolean isSelfOverlapping(byte[] s) { + // s self-overlaps iff some non-empty proper prefix of s is also its suffix, e.g. "aa", "aba" + for (int i = 1; i < s.length; ++i) { + if (ByteBuffer.wrap(s, 0, i).equals(ByteBuffer.wrap(s, s.length - i, i))) { + return true; + } + } + return false; + } + @Override public PCollection<String> expand(PBegin input) { checkNotNull(getFilepattern(), "need to set the filepattern of a TextIO.Read transform"); @@ -370,7 +397,8 @@ public class TextIO { ReadAll readAll = readAll() .withCompression(getCompression()) - .withEmptyMatchTreatment(getEmptyMatchTreatment()); + .withEmptyMatchTreatment(getEmptyMatchTreatment()) + .withDelimiter(getDelimiter()); if (getWatchForNewFilesInterval() != null) { TerminationCondition<String, ?> readAllCondition = ignoreInput(getWatchForNewFilesTerminationCondition()); @@ -383,7 +411,8 @@ public class TextIO { // Helper to create a source specific to the requested compression type.
protected FileBasedSource<String> getSource() { - return CompressedSource.from(new TextSource(getFilepattern(), getEmptyMatchTreatment())) + return CompressedSource + .from(new TextSource(getFilepattern(), getEmptyMatchTreatment(), getDelimiter())) .withCompression(getCompression()); } @@ -401,7 +430,11 @@ public class TextIO { .withLabel("Treatment of filepatterns that match no files")) .addIfNotNull( DisplayData.item("watchForNewFilesInterval", getWatchForNewFilesInterval()) - .withLabel("Interval to watch for new files")); + .withLabel("Interval to watch for new files")) + .addIfNotNull(DisplayData.item("delimiter", + getDelimiter() == null ? null : Arrays.toString(getDelimiter())) + .withLabel("Custom delimiter to split records")); + } } @@ -421,6 +454,8 @@ public class TextIO { abstract EmptyMatchTreatment getEmptyMatchTreatment(); abstract long getDesiredBundleSizeBytes(); + @Nullable + abstract byte[] getDelimiter(); abstract Builder toBuilder(); @@ -432,7 +467,7 @@ public class TextIO { TerminationCondition<String, ?> condition); abstract Builder setEmptyMatchTreatment(EmptyMatchTreatment treatment); abstract Builder setDesiredBundleSizeBytes(long desiredBundleSizeBytes); - + abstract Builder setDelimiter(byte[] delimiter); abstract ReadAll build(); } @@ -471,6 +506,10 @@ public class TextIO { return toBuilder().setDesiredBundleSizeBytes(desiredBundleSizeBytes).build(); } + ReadAll withDelimiter(byte[] delimiter) { + return toBuilder().setDelimiter(delimiter).build(); + } + @Override public PCollection<String> expand(PCollection<String> input) { Match.Filepatterns matchFilepatterns = @@ -487,34 +526,40 @@ public class TextIO { new ReadAllViaFileBasedSource<>( new IsSplittableFn(getCompression()), getDesiredBundleSizeBytes(), - new CreateTextSourceFn(getCompression(), getEmptyMatchTreatment()))) - .setCoder(StringUtf8Coder.of()); + new CreateTextSourceFn(getCompression(), getEmptyMatchTreatment(), + getDelimiter()))).setCoder(StringUtf8Coder.of()); } @Override public void
populateDisplayData(DisplayData.Builder builder) { super.populateDisplayData(builder); - builder.add( + builder + .add( DisplayData.item("compressionType", getCompression().toString()) - .withLabel("Compression Type")); + .withLabel("Compression Type")) + .addIfNotNull(DisplayData.item("delimiter", + getDelimiter() == null ? null : Arrays.toString(getDelimiter())) + .withLabel("Custom delimiter to split records")); } private static class CreateTextSourceFn implements SerializableFunction<String, FileBasedSource<String>> { private final Compression compression; private final EmptyMatchTreatment emptyMatchTreatment; + private final byte[] delimiter; private CreateTextSourceFn( - Compression compression, EmptyMatchTreatment emptyMatchTreatment) { + Compression compression, EmptyMatchTreatment emptyMatchTreatment, byte[] delimiter) { this.compression = compression; this.emptyMatchTreatment = emptyMatchTreatment; + this.delimiter = delimiter; } @Override public FileBasedSource<String> apply(String input) { return CompressedSource.from( - new TextSource(StaticValueProvider.of(input), emptyMatchTreatment)) + new TextSource(StaticValueProvider.of(input), emptyMatchTreatment, delimiter)) .withCompression(compression); } } http://git-wip-us.apache.org/repos/asf/beam/blob/1b6cde06/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java index 29188dc..f3e4f77 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java @@ -48,16 +48,17 @@ import org.apache.beam.sdk.options.ValueProvider; */ @VisibleForTesting class TextSource extends FileBasedSource<String> { - TextSource(ValueProvider<String> fileSpec) { - this(fileSpec, EmptyMatchTreatment.DISALLOW); - } + private final byte[] delimiter; -
TextSource(ValueProvider<String> fileSpec, EmptyMatchTreatment emptyMatchTreatment) { + TextSource(ValueProvider<String> fileSpec, EmptyMatchTreatment emptyMatchTreatment, + byte[] delimiter) { super(fileSpec, emptyMatchTreatment, 1L); + this.delimiter = delimiter; } - private TextSource(MatchResult.Metadata metadata, long start, long end) { + private TextSource(MatchResult.Metadata metadata, long start, long end, byte[] delimiter) { super(metadata, 1L, start, end); + this.delimiter = delimiter; } @Override @@ -65,12 +66,13 @@ class TextSource extends FileBasedSource<String> { MatchResult.Metadata metadata, long start, long end) { - return new TextSource(metadata, start, end); + return new TextSource(metadata, start, end, delimiter); + } @Override protected FileBasedReader<String> createSingleFileReader(PipelineOptions options) { - return new TextBasedReader(this); + return new TextBasedReader(this, delimiter); } @Override @@ -80,7 +82,7 @@ class TextSource extends FileBasedSource<String> { /** * A {@link FileBasedReader FileBasedReader} - * which can decode records delimited by newline characters. + * which can decode records delimited by newlines or by a custom delimiter. * * <p>See {@link TextSource} for further details.
*/ @@ -89,18 +91,20 @@ class TextSource extends FileBasedSource<String> { private static final int READ_BUFFER_SIZE = 8192; private final ByteBuffer readBuffer = ByteBuffer.allocate(READ_BUFFER_SIZE); private ByteString buffer; - private int startOfSeparatorInBuffer; - private int endOfSeparatorInBuffer; + private int startOfDelimiterInBuffer; + private int endOfDelimiterInBuffer; private long startOfRecord; private volatile long startOfNextRecord; private volatile boolean eof; private volatile boolean elementIsPresent; private String currentValue; private ReadableByteChannel inChannel; + private final byte[] delimiter; - private TextBasedReader(TextSource source) { + private TextBasedReader(TextSource source, byte[] delimiter) { super(source); buffer = ByteString.EMPTY; + this.delimiter = delimiter; } @Override @@ -131,18 +135,24 @@ class TextSource extends FileBasedSource<String> { protected void startReading(ReadableByteChannel channel) throws IOException { this.inChannel = channel; // If the first offset is greater than zero, we need to skip bytes until we see our - // first separator. - if (getCurrentSource().getStartOffset() > 0) { + // first delimiter.
+ long startOffset = getCurrentSource().getStartOffset(); + if (startOffset > 0) { checkState(channel instanceof SeekableByteChannel, "%s only supports reading from a SeekableByteChannel when given a start offset" + " greater than 0.", TextSource.class.getSimpleName()); - long requiredPosition = getCurrentSource().getStartOffset() - 1; + long requiredPosition = startOffset - 1; + if (delimiter != null) { + // Move back up to delimiter.length bytes (never before offset 0) so that a delimiter + // ending right before startOffset is fully seen by findDelimiterBounds() below. + requiredPosition = Math.max(0, startOffset - delimiter.length); + } ((SeekableByteChannel) channel).position(requiredPosition); - findSeparatorBounds(); - buffer = buffer.substring(endOfSeparatorInBuffer); - startOfNextRecord = requiredPosition + endOfSeparatorInBuffer; - endOfSeparatorInBuffer = 0; - startOfSeparatorInBuffer = 0; + findDelimiterBounds(); + buffer = buffer.substring(endOfDelimiterInBuffer); + startOfNextRecord = requiredPosition + endOfDelimiterInBuffer; + endOfDelimiterInBuffer = 0; + startOfDelimiterInBuffer = 0; } } @@ -156,37 +166,60 @@ class TextSource extends FileBasedSource<String> { * | element bytes | delimiter bytes | unconsumed bytes | * ------------------------------------------------------ * 0 start of end of buffer - * separator separator size + * delimiter delimiter size * in buffer in buffer * }</pre> */ - private void findSeparatorBounds() throws IOException { + private void findDelimiterBounds() throws IOException { int bytePositionInBuffer = 0; while (true) { if (!tryToEnsureNumberOfBytesInBuffer(bytePositionInBuffer + 1)) { - startOfSeparatorInBuffer = endOfSeparatorInBuffer = bytePositionInBuffer; + startOfDelimiterInBuffer = endOfDelimiterInBuffer = bytePositionInBuffer; break; } byte currentByte = buffer.byteAt(bytePositionInBuffer); - if (currentByte == '\n') { - startOfSeparatorInBuffer = bytePositionInBuffer; - endOfSeparatorInBuffer =
startOfSeparatorInBuffer + 1; - break; - } else if (currentByte == '\r') { - startOfSeparatorInBuffer = bytePositionInBuffer; - endOfSeparatorInBuffer = startOfSeparatorInBuffer + 1; - - if (tryToEnsureNumberOfBytesInBuffer(bytePositionInBuffer + 2)) { - currentByte = buffer.byteAt(bytePositionInBuffer + 1); - if (currentByte == '\n') { - endOfSeparatorInBuffer += 1; + if (delimiter == null) { + // default delimiters: '\n', '\r' or '\r\n' + if (currentByte == '\n') { + startOfDelimiterInBuffer = bytePositionInBuffer; + endOfDelimiterInBuffer = startOfDelimiterInBuffer + 1; + break; + } else if (currentByte == '\r') { + startOfDelimiterInBuffer = bytePositionInBuffer; + endOfDelimiterInBuffer = startOfDelimiterInBuffer + 1; + + if (tryToEnsureNumberOfBytesInBuffer(bytePositionInBuffer + 2)) { + currentByte = buffer.byteAt(bytePositionInBuffer + 1); + if (currentByte == '\n') { + endOfDelimiterInBuffer += 1; + } } + break; + } + } else { + // custom delimiter: check whether it occurs starting at bytePositionInBuffer + int i = 0; + // until all of its bytes have matched, report an empty delimiter at this position + startOfDelimiterInBuffer = endOfDelimiterInBuffer = bytePositionInBuffer; + while ((i <= delimiter.length - 1) && (currentByte == delimiter[i])) { + // read next byte + i++; + if (tryToEnsureNumberOfBytesInBuffer(bytePositionInBuffer + i + 1)) { + currentByte = buffer.byteAt(bytePositionInBuffer + i); + } else { + // EOF: a complete match is still accepted below, a truncated one is not + startOfDelimiterInBuffer = endOfDelimiterInBuffer = bytePositionInBuffer; + break; + } + } + if (i == delimiter.length) { + // all bytes of delimiter found + endOfDelimiterInBuffer = bytePositionInBuffer + i; + break; } - break; } - // Move to the next byte in buffer.
bytePositionInBuffer += 1; } @@ -195,7 +228,7 @@ class TextSource extends FileBasedSource<String> { @Override protected boolean readNextRecord() throws IOException { startOfRecord = startOfNextRecord; - findSeparatorBounds(); + findDelimiterBounds(); // If we have reached EOF file and consumed all of the buffer then we know // that there are no more records. @@ -205,21 +238,21 @@ class TextSource extends FileBasedSource<String> { } decodeCurrentElement(); - startOfNextRecord = startOfRecord + endOfSeparatorInBuffer; + startOfNextRecord = startOfRecord + endOfDelimiterInBuffer; return true; } /** * Decodes the current element updating the buffer to only contain the unconsumed bytes. * - * <p>This invalidates the currently stored {@code startOfSeparatorInBuffer} and - * {@code endOfSeparatorInBuffer}. + * <p>This invalidates the currently stored {@code startOfDelimiterInBuffer} and + * {@code endOfDelimiterInBuffer}. */ private void decodeCurrentElement() throws IOException { - ByteString dataToDecode = buffer.substring(0, startOfSeparatorInBuffer); + ByteString dataToDecode = buffer.substring(0, startOfDelimiterInBuffer); currentValue = dataToDecode.toStringUtf8(); elementIsPresent = true; - buffer = buffer.substring(endOfSeparatorInBuffer); + buffer = buffer.substring(endOfDelimiterInBuffer); } /** http://git-wip-us.apache.org/repos/asf/beam/blob/1b6cde06/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java index 65253f9..e55a820 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java @@ -17,6 +17,7 @@ */ package org.apache.beam.sdk.io; +import static java.nio.charset.StandardCharsets.UTF_8; import static
org.apache.beam.sdk.TestUtils.LINES_ARRAY; import static org.apache.beam.sdk.TestUtils.NO_LINES_ARRAY; import static org.apache.beam.sdk.io.Compression.AUTO; @@ -40,10 +41,13 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; +import com.google.common.base.Joiner; import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import java.io.BufferedWriter; import java.io.File; import java.io.FileOutputStream; import java.io.IOException; import java.io.OutputStream; import java.io.PrintStream; @@ -212,6 +216,60 @@ public class TextIOReadTest { } @Test + public void testDelimiterSelfOverlaps(){ + assertFalse(TextIO.Read.isSelfOverlapping(new byte[]{'a', 'b', 'c'})); + assertFalse(TextIO.Read.isSelfOverlapping(new byte[]{'c', 'a', 'b', 'd', 'a', 'b'})); + assertFalse(TextIO.Read.isSelfOverlapping(new byte[]{'a', 'b', 'c', 'a', 'b', 'd'})); + assertTrue(TextIO.Read.isSelfOverlapping(new byte[]{'a', 'a'})); + assertTrue(TextIO.Read.isSelfOverlapping(new byte[]{'a', 'b', 'c', 'a', 'b'})); + } + + @Test + @Category(NeedsRunner.class) + public void testReadStringsWithCustomDelimiter() throws Exception { + final String[] inputStrings = new String[] { + // incomplete delimiter + "To be, or not to be: that |is the question: ", + // incomplete delimiter + "To be, or not to be: that *is the question: ", + // complete delimiter + "Whether 'tis nobler in the mind to suffer |*", + // truncated delimiter + "The slings and arrows of outrageous fortune,|" }; + + File tmpFile = Files.createTempFile(tempFolder, "file", "txt").toFile(); + String filename = tmpFile.getPath(); + + try (BufferedWriter writer = Files.newBufferedWriter(tmpFile.toPath(), UTF_8)) { + writer.write(Joiner.on("").join(inputStrings)); + } + + PAssert.that(p.apply(TextIO.read().from(filename).withDelimiter(new byte[] {'|', '*'}))) + .containsInAnyOrder( + "To be, or not to be: that |is the
question: To be, or not to be: " + + "that *is the question: Whether 'tis nobler in the mind to suffer ", + "The slings and arrows of outrageous fortune,|"); + p.run(); + } + + @Test + public void testSplittingSourceWithCustomDelimiter() throws Exception { + List<String> testCases = Lists.newArrayList(); + String infix = "first|*#second|*#|*#third"; + String[] affixes = new String[] {"", "|", "*#", "|*", "|*#"}; + for (String prefix : affixes) { + for (String suffix : affixes) { + testCases.add(prefix + infix + suffix); + } + } + for (String testCase : testCases) { + SourceTestUtils.assertSplitAtFractionExhaustive( + prepareSource(testCase.getBytes(UTF_8), new byte[] {'|', '*', '#'}), + PipelineOptionsFactory.create()); + } + } + + @Test @Category(NeedsRunner.class) public void testReadStrings() throws Exception { runTestRead(LINES_ARRAY); @@ -555,7 +613,7 @@ public class TextIOReadTest { @Test public void testProgressEmptyFile() throws IOException { try (BoundedReader<String> reader = - prepareSource(new byte[0]).createReader(PipelineOptionsFactory.create())) { + prepareSource(new byte[0], null).createReader(PipelineOptionsFactory.create())) { // Check preconditions before starting.
assertEquals(0.0, reader.getFractionConsumed(), 1e-6); assertEquals(0, reader.getSplitPointsConsumed()); @@ -575,7 +633,7 @@ public class TextIOReadTest { public void testProgressTextFile() throws IOException { String file = "line1\nline2\nline3"; try (BoundedReader<String> reader = - prepareSource(file.getBytes()).createReader(PipelineOptionsFactory.create())) { + prepareSource(file.getBytes(), null).createReader(PipelineOptionsFactory.create())) { // Check preconditions before starting assertEquals(0.0, reader.getFractionConsumed(), 1e-6); assertEquals(0, reader.getSplitPointsConsumed()); @@ -733,65 +791,69 @@ public class TextIOReadTest { @Test public void testSplittingSourceWithEmptyLines() throws Exception { - TextSource source = prepareSource("\n\n\n".getBytes(StandardCharsets.UTF_8)); + TextSource source = prepareSource("\n\n\n".getBytes(UTF_8)); SourceTestUtils.assertSplitAtFractionExhaustive(source, PipelineOptionsFactory.create()); } @Test public void testSplittingSourceWithLineFeedDelimiter() throws Exception { - TextSource source = prepareSource("asdf\nhjkl\nxyz\n".getBytes(StandardCharsets.UTF_8)); + TextSource source = prepareSource("asdf\nhjkl\nxyz\n".getBytes(UTF_8)); SourceTestUtils.assertSplitAtFractionExhaustive(source, PipelineOptionsFactory.create()); } @Test public void testSplittingSourceWithCarriageReturnDelimiter() throws Exception { - TextSource source = prepareSource("asdf\rhjkl\rxyz\r".getBytes(StandardCharsets.UTF_8)); + TextSource source = prepareSource("asdf\rhjkl\rxyz\r".getBytes(UTF_8)); SourceTestUtils.assertSplitAtFractionExhaustive(source, PipelineOptionsFactory.create()); } @Test public void testSplittingSourceWithCarriageReturnAndLineFeedDelimiter() throws Exception { - TextSource source = prepareSource("asdf\r\nhjkl\r\nxyz\r\n".getBytes(StandardCharsets.UTF_8)); + TextSource source = prepareSource("asdf\r\nhjkl\r\nxyz\r\n".getBytes(UTF_8)); SourceTestUtils.assertSplitAtFractionExhaustive(source,
PipelineOptionsFactory.create()); } @Test public void testSplittingSourceWithMixedDelimiters() throws Exception { - TextSource source = prepareSource("asdf\rhjkl\r\nxyz\n".getBytes(StandardCharsets.UTF_8)); + TextSource source = prepareSource("asdf\rhjkl\r\nxyz\n".getBytes(UTF_8)); SourceTestUtils.assertSplitAtFractionExhaustive(source, PipelineOptionsFactory.create()); } @Test public void testSplittingSourceWithLineFeedDelimiterAndNonEmptyBytesAtEnd() throws Exception { - TextSource source = prepareSource("asdf\nhjkl\nxyz".getBytes(StandardCharsets.UTF_8)); + TextSource source = prepareSource("asdf\nhjkl\nxyz".getBytes(UTF_8)); SourceTestUtils.assertSplitAtFractionExhaustive(source, PipelineOptionsFactory.create()); } @Test public void testSplittingSourceWithCarriageReturnDelimiterAndNonEmptyBytesAtEnd() throws Exception { - TextSource source = prepareSource("asdf\rhjkl\rxyz".getBytes(StandardCharsets.UTF_8)); + TextSource source = prepareSource("asdf\rhjkl\rxyz".getBytes(UTF_8)); SourceTestUtils.assertSplitAtFractionExhaustive(source, PipelineOptionsFactory.create()); } @Test public void testSplittingSourceWithCarriageReturnAndLineFeedDelimiterAndNonEmptyBytesAtEnd() throws Exception { - TextSource source = prepareSource("asdf\r\nhjkl\r\nxyz".getBytes(StandardCharsets.UTF_8)); + TextSource source = prepareSource("asdf\r\nhjkl\r\nxyz".getBytes(UTF_8)); SourceTestUtils.assertSplitAtFractionExhaustive(source, PipelineOptionsFactory.create()); } @Test public void testSplittingSourceWithMixedDelimitersAndNonEmptyBytesAtEnd() throws Exception { - TextSource source = prepareSource("asdf\rhjkl\r\nxyz".getBytes(StandardCharsets.UTF_8)); + TextSource source = prepareSource("asdf\rhjkl\r\nxyz".getBytes(UTF_8)); SourceTestUtils.assertSplitAtFractionExhaustive(source, PipelineOptionsFactory.create()); } private TextSource prepareSource(byte[] data) throws IOException { + return prepareSource(data, null /* default delimiters */); + } + + private TextSource prepareSource(byte[]
data, byte[] delimiter) throws IOException { Path path = Files.createTempFile(tempFolder, "tempfile", "ext"); Files.write(path, data); - return new TextSource( - ValueProvider.StaticValueProvider.of(path.toString()), EmptyMatchTreatment.DISALLOW); + return new TextSource(ValueProvider.StaticValueProvider.of(path.toString()), + EmptyMatchTreatment.DISALLOW, delimiter); } @Test
