Repository: beam
Updated Branches:
  refs/heads/master c9653f270 -> b844126c8


[BEAM-2802] Support multi-byte custom separator in TextIO

Supports only separators that cannot self-overlap (that is, separators for
which no non-empty proper prefix is also a suffix), because self-overlapping
separators cause ambiguous parsing.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1b6cde06
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1b6cde06
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1b6cde06

Branch: refs/heads/master
Commit: 1b6cde067ce78e1ce780b66e0cf1c883ce901959
Parents: c9653f2
Author: Etienne Chauchot <[email protected]>
Authored: Fri Aug 25 17:23:51 2017 +0200
Committer: Eugene Kirpichov <[email protected]>
Committed: Fri Sep 1 11:51:17 2017 -0700

----------------------------------------------------------------------
 .../java/org/apache/beam/sdk/io/TextIO.java     |  69 +++++++++--
 .../java/org/apache/beam/sdk/io/TextSource.java | 117 ++++++++++++-------
 .../org/apache/beam/sdk/io/TextIOReadTest.java  |  88 +++++++++++---
 3 files changed, 207 insertions(+), 67 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/1b6cde06/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
index 76102cb..7832168 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
@@ -27,6 +27,9 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Predicates;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
 import java.util.List;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.annotations.Experimental;
@@ -63,7 +66,8 @@ import org.joda.time.Duration;
  * PCollection}, apply {@link TextIO#readAll()}.
  *
  * <p>{@link #read} returns a {@link PCollection} of {@link String Strings}, 
each corresponding to
- * one line of an input UTF-8 text file (split into lines delimited by '\n', 
'\r', or '\r\n').
+ * one line of an input UTF-8 text file (split into lines delimited by '\n', 
'\r', or '\r\n',
+ * or specified delimiter see {@link TextIO.Read#withDelimiter}).
  *
  * <h3>Filepattern expansion and watching</h3>
  *
@@ -255,7 +259,8 @@ public class TextIO {
   /** Implementation of {@link #read}. */
   @AutoValue
   public abstract static class Read extends PTransform<PBegin, 
PCollection<String>> {
-    @Nullable abstract ValueProvider<String> getFilepattern();
+    @Nullable
+    abstract ValueProvider<String> getFilepattern();
     abstract Compression getCompression();
 
     @Nullable
@@ -266,6 +271,8 @@ public class TextIO {
 
     abstract boolean getHintMatchesManyFiles();
     abstract EmptyMatchTreatment getEmptyMatchTreatment();
+    @Nullable
+    abstract byte[] getDelimiter();
 
     abstract Builder toBuilder();
 
@@ -278,6 +285,7 @@ public class TextIO {
               TerminationCondition<?, ?> condition);
       abstract Builder setHintMatchesManyFiles(boolean hintManyFiles);
       abstract Builder setEmptyMatchTreatment(EmptyMatchTreatment treatment);
+      abstract Builder setDelimiter(byte[] delimiter);
 
       abstract Read build();
     }
@@ -360,6 +368,25 @@ public class TextIO {
       return toBuilder().setEmptyMatchTreatment(treatment).build();
     }
 
+    /**
+     * Set the custom delimiter to be used in place of the default ones ('\r', 
'\n' or '\r\n').
+     */
+    public Read withDelimiter(byte[] delimiter) {
+      checkArgument(delimiter != null, "delimiter can not be null");
+      checkArgument(!isSelfOverlapping(delimiter), "delimiter must not 
self-overlap");
+      return toBuilder().setDelimiter(delimiter).build();
+    }
+
+    static boolean isSelfOverlapping(byte[] s) {
+      // s self-overlaps if v exists such as s = vu = wv with u and w non empty
+      for (int i = 1; i < s.length - 1; ++i) {
+        if (ByteBuffer.wrap(s, 0, i).equals(ByteBuffer.wrap(s, s.length - i, 
i))) {
+          return true;
+        }
+      }
+      return false;
+    }
+
     @Override
     public PCollection<String> expand(PBegin input) {
       checkNotNull(getFilepattern(), "need to set the filepattern of a 
TextIO.Read transform");
@@ -370,7 +397,8 @@ public class TextIO {
       ReadAll readAll =
           readAll()
               .withCompression(getCompression())
-              .withEmptyMatchTreatment(getEmptyMatchTreatment());
+              .withEmptyMatchTreatment(getEmptyMatchTreatment())
+              .withDelimiter(getDelimiter());
       if (getWatchForNewFilesInterval() != null) {
         TerminationCondition<String, ?> readAllCondition =
             ignoreInput(getWatchForNewFilesTerminationCondition());
@@ -383,7 +411,8 @@ public class TextIO {
 
     // Helper to create a source specific to the requested compression type.
     protected FileBasedSource<String> getSource() {
-      return CompressedSource.from(new TextSource(getFilepattern(), 
getEmptyMatchTreatment()))
+      return CompressedSource
+          .from(new TextSource(getFilepattern(), getEmptyMatchTreatment(), 
getDelimiter()))
           .withCompression(getCompression());
     }
 
@@ -401,7 +430,11 @@ public class TextIO {
                   .withLabel("Treatment of filepatterns that match no files"))
           .addIfNotNull(
               DisplayData.item("watchForNewFilesInterval", 
getWatchForNewFilesInterval())
-                  .withLabel("Interval to watch for new files"));
+                  .withLabel("Interval to watch for new files"))
+          .addIfNotNull(
+              DisplayData.item("delimiter", Arrays.toString(getDelimiter()))
+              .withLabel("Custom delimiter to split records"));
+
     }
   }
 
@@ -421,6 +454,8 @@ public class TextIO {
 
     abstract EmptyMatchTreatment getEmptyMatchTreatment();
     abstract long getDesiredBundleSizeBytes();
+    @Nullable
+    abstract byte[] getDelimiter();
 
     abstract Builder toBuilder();
 
@@ -432,7 +467,7 @@ public class TextIO {
           TerminationCondition<String, ?> condition);
       abstract Builder setEmptyMatchTreatment(EmptyMatchTreatment treatment);
       abstract Builder setDesiredBundleSizeBytes(long desiredBundleSizeBytes);
-
+      abstract Builder setDelimiter(byte[] delimiter);
       abstract ReadAll build();
     }
 
@@ -471,6 +506,10 @@ public class TextIO {
       return 
toBuilder().setDesiredBundleSizeBytes(desiredBundleSizeBytes).build();
     }
 
+    ReadAll withDelimiter(byte[] delimiter) {
+      return toBuilder().setDelimiter(delimiter).build();
+    }
+
     @Override
     public PCollection<String> expand(PCollection<String> input) {
       Match.Filepatterns matchFilepatterns =
@@ -487,34 +526,40 @@ public class TextIO {
               new ReadAllViaFileBasedSource<>(
                   new IsSplittableFn(getCompression()),
                   getDesiredBundleSizeBytes(),
-                  new CreateTextSourceFn(getCompression(), 
getEmptyMatchTreatment())))
-          .setCoder(StringUtf8Coder.of());
+                  new CreateTextSourceFn(getCompression(), 
getEmptyMatchTreatment(),
+                      getDelimiter()))).setCoder(StringUtf8Coder.of());
     }
 
     @Override
     public void populateDisplayData(DisplayData.Builder builder) {
       super.populateDisplayData(builder);
 
-      builder.add(
+      builder
+          .add(
           DisplayData.item("compressionType", getCompression().toString())
-              .withLabel("Compression Type"));
+              .withLabel("Compression Type"))
+          .addIfNotNull(
+          DisplayData.item("delimiter", Arrays.toString(getDelimiter()))
+              .withLabel("Custom delimiter to split records"));
     }
 
     private static class CreateTextSourceFn
         implements SerializableFunction<String, FileBasedSource<String>> {
       private final Compression compression;
       private final EmptyMatchTreatment emptyMatchTreatment;
+      private byte[] delimiter;
 
       private CreateTextSourceFn(
-          Compression compression, EmptyMatchTreatment emptyMatchTreatment) {
+          Compression compression, EmptyMatchTreatment emptyMatchTreatment, 
byte[] delimiter) {
         this.compression = compression;
         this.emptyMatchTreatment = emptyMatchTreatment;
+        this.delimiter = delimiter;
       }
 
       @Override
       public FileBasedSource<String> apply(String input) {
         return CompressedSource.from(
-                new TextSource(StaticValueProvider.of(input), 
emptyMatchTreatment))
+                new TextSource(StaticValueProvider.of(input), 
emptyMatchTreatment, delimiter))
             .withCompression(compression);
       }
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/1b6cde06/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java
index 29188dc..f3e4f77 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java
@@ -48,16 +48,17 @@ import org.apache.beam.sdk.options.ValueProvider;
  */
 @VisibleForTesting
 class TextSource extends FileBasedSource<String> {
-  TextSource(ValueProvider<String> fileSpec) {
-    this(fileSpec, EmptyMatchTreatment.DISALLOW);
-  }
+  byte[] delimiter;
 
-  TextSource(ValueProvider<String> fileSpec, EmptyMatchTreatment 
emptyMatchTreatment) {
+  TextSource(ValueProvider<String> fileSpec, EmptyMatchTreatment 
emptyMatchTreatment,
+      byte[] delimiter) {
     super(fileSpec, emptyMatchTreatment, 1L);
+    this.delimiter = delimiter;
   }
 
-  private TextSource(MatchResult.Metadata metadata, long start, long end) {
+  private TextSource(MatchResult.Metadata metadata, long start, long end, 
byte[] delimiter) {
     super(metadata, 1L, start, end);
+    this.delimiter = delimiter;
   }
 
   @Override
@@ -65,12 +66,13 @@ class TextSource extends FileBasedSource<String> {
       MatchResult.Metadata metadata,
       long start,
       long end) {
-    return new TextSource(metadata, start, end);
+    return new TextSource(metadata, start, end, delimiter);
+
   }
 
   @Override
   protected FileBasedReader<String> createSingleFileReader(PipelineOptions 
options) {
-    return new TextBasedReader(this);
+    return new TextBasedReader(this, delimiter);
   }
 
   @Override
@@ -80,7 +82,7 @@ class TextSource extends FileBasedSource<String> {
 
   /**
    * A {@link FileBasedReader FileBasedReader}
-   * which can decode records delimited by newline characters.
+   * which can decode records delimited by delimiter characters.
    *
    * <p>See {@link TextSource} for further details.
    */
@@ -89,18 +91,20 @@ class TextSource extends FileBasedSource<String> {
     private static final int READ_BUFFER_SIZE = 8192;
     private final ByteBuffer readBuffer = 
ByteBuffer.allocate(READ_BUFFER_SIZE);
     private ByteString buffer;
-    private int startOfSeparatorInBuffer;
-    private int endOfSeparatorInBuffer;
+    private int startOfDelimiterInBuffer;
+    private int endOfDelimiterInBuffer;
     private long startOfRecord;
     private volatile long startOfNextRecord;
     private volatile boolean eof;
     private volatile boolean elementIsPresent;
     private String currentValue;
     private ReadableByteChannel inChannel;
+    private byte[] delimiter;
 
-    private TextBasedReader(TextSource source) {
+    private TextBasedReader(TextSource source, byte[] delimiter) {
       super(source);
       buffer = ByteString.EMPTY;
+      this.delimiter = delimiter;
     }
 
     @Override
@@ -131,18 +135,24 @@ class TextSource extends FileBasedSource<String> {
     protected void startReading(ReadableByteChannel channel) throws 
IOException {
       this.inChannel = channel;
       // If the first offset is greater than zero, we need to skip bytes until 
we see our
-      // first separator.
-      if (getCurrentSource().getStartOffset() > 0) {
+      // first delimiter.
+      long startOffset = getCurrentSource().getStartOffset();
+      if (startOffset > 0) {
         checkState(channel instanceof SeekableByteChannel,
             "%s only supports reading from a SeekableByteChannel when given a 
start offset"
             + " greater than 0.", TextSource.class.getSimpleName());
-        long requiredPosition = getCurrentSource().getStartOffset() - 1;
+        long requiredPosition = startOffset - 1;
+        if (delimiter != null && startOffset >= delimiter.length) {
+          // we need to move back the offset of at worse delimiter.size to be 
sure to see
+          // all the bytes of the delimiter in the call to 
findDelimiterBounds() below
+          requiredPosition = startOffset - delimiter.length;
+        }
         ((SeekableByteChannel) channel).position(requiredPosition);
-        findSeparatorBounds();
-        buffer = buffer.substring(endOfSeparatorInBuffer);
-        startOfNextRecord = requiredPosition + endOfSeparatorInBuffer;
-        endOfSeparatorInBuffer = 0;
-        startOfSeparatorInBuffer = 0;
+        findDelimiterBounds();
+        buffer = buffer.substring(endOfDelimiterInBuffer);
+        startOfNextRecord = requiredPosition + endOfDelimiterInBuffer;
+        endOfDelimiterInBuffer = 0;
+        startOfDelimiterInBuffer = 0;
       }
     }
 
@@ -156,37 +166,60 @@ class TextSource extends FileBasedSource<String> {
      * | element bytes | delimiter bytes | unconsumed bytes |
      * ------------------------------------------------------
      * 0            start of          end of              buffer
-     *              separator         separator           size
+     *              delimiter         delimiter           size
      *              in buffer         in buffer
      * }</pre>
      */
-    private void findSeparatorBounds() throws IOException {
+    private void findDelimiterBounds() throws IOException {
       int bytePositionInBuffer = 0;
       while (true) {
         if (!tryToEnsureNumberOfBytesInBuffer(bytePositionInBuffer + 1)) {
-          startOfSeparatorInBuffer = endOfSeparatorInBuffer = 
bytePositionInBuffer;
+          startOfDelimiterInBuffer = endOfDelimiterInBuffer = 
bytePositionInBuffer;
           break;
         }
 
         byte currentByte = buffer.byteAt(bytePositionInBuffer);
 
-        if (currentByte == '\n') {
-          startOfSeparatorInBuffer = bytePositionInBuffer;
-          endOfSeparatorInBuffer = startOfSeparatorInBuffer + 1;
-          break;
-        } else if (currentByte == '\r') {
-          startOfSeparatorInBuffer = bytePositionInBuffer;
-          endOfSeparatorInBuffer = startOfSeparatorInBuffer + 1;
-
-          if (tryToEnsureNumberOfBytesInBuffer(bytePositionInBuffer + 2)) {
-            currentByte = buffer.byteAt(bytePositionInBuffer + 1);
-            if (currentByte == '\n') {
-              endOfSeparatorInBuffer += 1;
+        if (delimiter == null) {
+          // default delimiter
+          if (currentByte == '\n') {
+            startOfDelimiterInBuffer = bytePositionInBuffer;
+            endOfDelimiterInBuffer = startOfDelimiterInBuffer + 1;
+            break;
+          } else if (currentByte == '\r') {
+            startOfDelimiterInBuffer = bytePositionInBuffer;
+            endOfDelimiterInBuffer = startOfDelimiterInBuffer + 1;
+
+            if (tryToEnsureNumberOfBytesInBuffer(bytePositionInBuffer + 2)) {
+              currentByte = buffer.byteAt(bytePositionInBuffer + 1);
+              if (currentByte == '\n') {
+                endOfDelimiterInBuffer += 1;
+              }
             }
+            break;
+          }
+        } else {
+          // user defined delimiter
+          int i = 0;
+          // initialize delimiter not found
+          startOfDelimiterInBuffer = endOfDelimiterInBuffer = 
bytePositionInBuffer;
+          while ((i <= delimiter.length - 1) && (currentByte == delimiter[i])) 
{
+            // read next byte
+            i++;
+            if (tryToEnsureNumberOfBytesInBuffer(bytePositionInBuffer + i + 
1)) {
+              currentByte = buffer.byteAt(bytePositionInBuffer + i);
+            } else {
+              // corner case: delimiter truncated at the end of the file
+              startOfDelimiterInBuffer = endOfDelimiterInBuffer = 
bytePositionInBuffer;
+              break;
+            }
+          }
+          if (i == delimiter.length) {
+            // all bytes of delimiter found
+            endOfDelimiterInBuffer = bytePositionInBuffer + i;
+            break;
           }
-          break;
         }
-
         // Move to the next byte in buffer.
         bytePositionInBuffer += 1;
       }
@@ -195,7 +228,7 @@ class TextSource extends FileBasedSource<String> {
     @Override
     protected boolean readNextRecord() throws IOException {
       startOfRecord = startOfNextRecord;
-      findSeparatorBounds();
+      findDelimiterBounds();
 
       // If we have reached EOF file and consumed all of the buffer then we 
know
       // that there are no more records.
@@ -205,21 +238,21 @@ class TextSource extends FileBasedSource<String> {
       }
 
       decodeCurrentElement();
-      startOfNextRecord = startOfRecord + endOfSeparatorInBuffer;
+      startOfNextRecord = startOfRecord + endOfDelimiterInBuffer;
       return true;
     }
 
     /**
      * Decodes the current element updating the buffer to only contain the 
unconsumed bytes.
      *
-     * <p>This invalidates the currently stored {@code 
startOfSeparatorInBuffer} and
-     * {@code endOfSeparatorInBuffer}.
+     * <p>This invalidates the currently stored {@code 
startOfDelimiterInBuffer} and
+     * {@code endOfDelimiterInBuffer}.
      */
     private void decodeCurrentElement() throws IOException {
-      ByteString dataToDecode = buffer.substring(0, startOfSeparatorInBuffer);
+      ByteString dataToDecode = buffer.substring(0, startOfDelimiterInBuffer);
       currentValue = dataToDecode.toStringUtf8();
       elementIsPresent = true;
-      buffer = buffer.substring(endOfSeparatorInBuffer);
+      buffer = buffer.substring(endOfDelimiterInBuffer);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/beam/blob/1b6cde06/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java
index 65253f9..e55a820 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.sdk.io;
 
+import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.apache.beam.sdk.TestUtils.LINES_ARRAY;
 import static org.apache.beam.sdk.TestUtils.NO_LINES_ARRAY;
 import static org.apache.beam.sdk.io.Compression.AUTO;
@@ -40,10 +41,13 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 
+import com.google.common.base.Joiner;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
 import java.io.File;
 import java.io.FileOutputStream;
+import java.io.FileWriter;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.io.PrintStream;
@@ -212,6 +216,60 @@ public class TextIOReadTest {
   }
 
   @Test
+  public void testDelimiterSelfOverlaps(){
+    assertFalse(TextIO.Read.isSelfOverlapping(new byte[]{'a', 'b', 'c'}));
+    assertFalse(TextIO.Read.isSelfOverlapping(new byte[]{'c', 'a', 'b', 'd', 
'a', 'b'}));
+    assertFalse(TextIO.Read.isSelfOverlapping(new byte[]{'a', 'b', 'c', 'a', 
'b', 'd'}));
+    assertTrue(TextIO.Read.isSelfOverlapping(new byte[]{'a', 'b', 'a'}));
+    assertTrue(TextIO.Read.isSelfOverlapping(new byte[]{'a', 'b', 'c', 'a', 
'b'}));
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void testReadStringsWithCustomDelimiter() throws Exception {
+    final String[] inputStrings = new String[] {
+        // incomplete delimiter
+        "To be, or not to be: that |is the question: ",
+        // incomplete delimiter
+        "To be, or not to be: that *is the question: ",
+        // complete delimiter
+        "Whether 'tis nobler in the mind to suffer |*",
+        // truncated delimiter
+        "The slings and arrows of outrageous fortune,|" };
+
+    File tmpFile = Files.createTempFile(tempFolder, "file", "txt").toFile();
+    String filename = tmpFile.getPath();
+
+    try (FileWriter writer = new FileWriter(tmpFile)) {
+      writer.write(Joiner.on("").join(inputStrings));
+    }
+
+    PAssert.that(p.apply(TextIO.read().from(filename).withDelimiter(new byte[] 
{'|', '*'})))
+        .containsInAnyOrder(
+            "To be, or not to be: that |is the question: To be, or not to be: "
+                + "that *is the question: Whether 'tis nobler in the mind to 
suffer ",
+            "The slings and arrows of outrageous fortune,|");
+    p.run();
+  }
+
+  @Test
+  public void testSplittingSourceWithCustomDelimiter() throws Exception {
+    List<String> testCases = Lists.newArrayList();
+    String infix = "first|*second|*|*third";
+    String[] affixes = new String[] {"", "|", "*", "|*"};
+    for (String prefix : affixes) {
+      for (String suffix : affixes) {
+        testCases.add(prefix + infix + suffix);
+      }
+    }
+    for (String testCase : testCases) {
+      SourceTestUtils.assertSplitAtFractionExhaustive(
+          prepareSource(testCase.getBytes(StandardCharsets.UTF_8), new byte[] 
{'|', '*'}),
+          PipelineOptionsFactory.create());
+    }
+  }
+
+  @Test
   @Category(NeedsRunner.class)
   public void testReadStrings() throws Exception {
     runTestRead(LINES_ARRAY);
@@ -555,7 +613,7 @@ public class TextIOReadTest {
   @Test
   public void testProgressEmptyFile() throws IOException {
     try (BoundedReader<String> reader =
-        prepareSource(new 
byte[0]).createReader(PipelineOptionsFactory.create())) {
+        prepareSource(new byte[0], 
null).createReader(PipelineOptionsFactory.create())) {
       // Check preconditions before starting.
       assertEquals(0.0, reader.getFractionConsumed(), 1e-6);
       assertEquals(0, reader.getSplitPointsConsumed());
@@ -575,7 +633,7 @@ public class TextIOReadTest {
   public void testProgressTextFile() throws IOException {
     String file = "line1\nline2\nline3";
     try (BoundedReader<String> reader =
-        
prepareSource(file.getBytes()).createReader(PipelineOptionsFactory.create())) {
+        prepareSource(file.getBytes(), 
null).createReader(PipelineOptionsFactory.create())) {
       // Check preconditions before starting
       assertEquals(0.0, reader.getFractionConsumed(), 1e-6);
       assertEquals(0, reader.getSplitPointsConsumed());
@@ -733,65 +791,69 @@ public class TextIOReadTest {
 
   @Test
   public void testSplittingSourceWithEmptyLines() throws Exception {
-    TextSource source = 
prepareSource("\n\n\n".getBytes(StandardCharsets.UTF_8));
+    TextSource source = prepareSource("\n\n\n".getBytes(UTF_8));
     SourceTestUtils.assertSplitAtFractionExhaustive(source, 
PipelineOptionsFactory.create());
   }
 
   @Test
   public void testSplittingSourceWithLineFeedDelimiter() throws Exception {
-    TextSource source = 
prepareSource("asdf\nhjkl\nxyz\n".getBytes(StandardCharsets.UTF_8));
+    TextSource source = prepareSource("asdf\nhjkl\nxyz\n".getBytes(UTF_8));
     SourceTestUtils.assertSplitAtFractionExhaustive(source, 
PipelineOptionsFactory.create());
   }
 
   @Test
   public void testSplittingSourceWithCarriageReturnDelimiter() throws 
Exception {
-    TextSource source = 
prepareSource("asdf\rhjkl\rxyz\r".getBytes(StandardCharsets.UTF_8));
+    TextSource source = prepareSource("asdf\rhjkl\rxyz\r".getBytes(UTF_8));
     SourceTestUtils.assertSplitAtFractionExhaustive(source, 
PipelineOptionsFactory.create());
   }
 
   @Test
   public void testSplittingSourceWithCarriageReturnAndLineFeedDelimiter() 
throws Exception {
-    TextSource source = 
prepareSource("asdf\r\nhjkl\r\nxyz\r\n".getBytes(StandardCharsets.UTF_8));
+    TextSource source = 
prepareSource("asdf\r\nhjkl\r\nxyz\r\n".getBytes(UTF_8));
     SourceTestUtils.assertSplitAtFractionExhaustive(source, 
PipelineOptionsFactory.create());
   }
 
   @Test
   public void testSplittingSourceWithMixedDelimiters() throws Exception {
-    TextSource source = 
prepareSource("asdf\rhjkl\r\nxyz\n".getBytes(StandardCharsets.UTF_8));
+    TextSource source = prepareSource("asdf\rhjkl\r\nxyz\n".getBytes(UTF_8));
     SourceTestUtils.assertSplitAtFractionExhaustive(source, 
PipelineOptionsFactory.create());
   }
 
   @Test
   public void testSplittingSourceWithLineFeedDelimiterAndNonEmptyBytesAtEnd() 
throws Exception {
-    TextSource source = 
prepareSource("asdf\nhjkl\nxyz".getBytes(StandardCharsets.UTF_8));
+    TextSource source = prepareSource("asdf\nhjkl\nxyz".getBytes(UTF_8));
     SourceTestUtils.assertSplitAtFractionExhaustive(source, 
PipelineOptionsFactory.create());
   }
 
   @Test
   public void 
testSplittingSourceWithCarriageReturnDelimiterAndNonEmptyBytesAtEnd()
       throws Exception {
-    TextSource source = 
prepareSource("asdf\rhjkl\rxyz".getBytes(StandardCharsets.UTF_8));
+    TextSource source = prepareSource("asdf\rhjkl\rxyz".getBytes(UTF_8));
     SourceTestUtils.assertSplitAtFractionExhaustive(source, 
PipelineOptionsFactory.create());
   }
 
   @Test
   public void 
testSplittingSourceWithCarriageReturnAndLineFeedDelimiterAndNonEmptyBytesAtEnd()
       throws Exception {
-    TextSource source = 
prepareSource("asdf\r\nhjkl\r\nxyz".getBytes(StandardCharsets.UTF_8));
+    TextSource source = prepareSource("asdf\r\nhjkl\r\nxyz".getBytes(UTF_8));
     SourceTestUtils.assertSplitAtFractionExhaustive(source, 
PipelineOptionsFactory.create());
   }
 
   @Test
   public void testSplittingSourceWithMixedDelimitersAndNonEmptyBytesAtEnd() 
throws Exception {
-    TextSource source = 
prepareSource("asdf\rhjkl\r\nxyz".getBytes(StandardCharsets.UTF_8));
+    TextSource source = prepareSource("asdf\rhjkl\r\nxyz".getBytes(UTF_8));
     SourceTestUtils.assertSplitAtFractionExhaustive(source, 
PipelineOptionsFactory.create());
   }
 
   private TextSource prepareSource(byte[] data) throws IOException {
+    return prepareSource(data, null /* default delimiters */);
+  }
+
+  private TextSource prepareSource(byte[] data, byte[] delimiter) throws 
IOException {
     Path path = Files.createTempFile(tempFolder, "tempfile", "ext");
     Files.write(path, data);
-    return new TextSource(
-        ValueProvider.StaticValueProvider.of(path.toString()), 
EmptyMatchTreatment.DISALLOW);
+    return new 
TextSource(ValueProvider.StaticValueProvider.of(path.toString()),
+        EmptyMatchTreatment.DISALLOW, delimiter);
   }
 
   @Test

Reply via email to