This is an automated email from the ASF dual-hosted git repository.

damondouglas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new b5dc728b677 Fix an edge case of getting duplicated records when using 
TextIO. (#30026)
b5dc728b677 is described below

commit b5dc728b677101cf3968e9f94db0898342343f6e
Author: Shunping Huang <[email protected]>
AuthorDate: Wed Jan 17 13:39:35 2024 -0500

    Fix an edge case of getting duplicated records when using TextIO. (#30026)
    
    When processing a CRLF-delimited file and the read buffer has
    CR as the last character, startOfNextRecord will be set to the
    position after the CR, i.e. the following LF. Let's say the
    position of this LF is p.
    
    In the next buffer, even though the actual start of record should be
    p+1, startOfRecord is set to startOfNextRecord, which is p.
    
    Then the code processes the next record by skipping the LF and yields
    a record starting from p+1. It decides whether the record is valid by
    checking if startOfRecord is in the range defined in RangeTracker.
    
    If there is a split right after p, i.e. we have ranges [a, p+1) and [p+1, b),
    then the above record would be considered as valid in the split [a, p+1),
    because its startOfRecord is p and p < p+1. However, the record is also
    considered valid when split [p+1, b) is processed, resulting in
    duplicated records in the output.
---
 .../java/org/apache/beam/sdk/io/TextSource.java    |   5 +-
 .../org/apache/beam/sdk/io/TextIOReadTest.java     | 113 ++++++++++++++-------
 2 files changed, 82 insertions(+), 36 deletions(-)

diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java
index 3d62c677950..8367b38751c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java
@@ -323,10 +323,13 @@ public class TextSource extends FileBasedSource<String> {
 
         // Consume any LF after CR if it is the first character of the next 
buffer
         if (skipLineFeedAtStart && buffer[bufferPosn] == LF) {
-          ++bytesConsumed;
           ++startPosn;
           ++bufferPosn;
           skipLineFeedAtStart = false;
+
+          // Right now, startOfRecord is pointing at the position of LF, but 
the actual start
+          // position of the new record should be the position after LF.
+          ++startOfRecord;
         }
 
         // Search for the newline
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java
index 84c05ee6c90..253308d1b93 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java
@@ -386,53 +386,96 @@ public class TextIOReadTest {
       runTestReadWithData(line.getBytes(UTF_8), expected);
     }
 
+    // Placeholder channel that only yields 0- and 1-length buffers.
+    private static class SlowReadChannel implements ReadableByteChannel {
+      int readCount = 0;
+      InputStream stream;
+      ReadableByteChannel channel;
+
+      public SlowReadChannel(FileBasedSource source) throws IOException {
+        channel =
+            FileSystems.open(
+                
FileSystems.matchSingleFileSpec(source.getFileOrPatternSpec()).resourceId());
+        stream = Channels.newInputStream(channel);
+      }
+
+      // Data is read at most one byte at a time from line parameter.
+      @Override
+      public int read(ByteBuffer dst) throws IOException {
+        if (++readCount % 3 == 0) {
+          if (dst.hasRemaining()) {
+            int value = stream.read();
+            if (value == -1) {
+              return -1;
+            }
+            dst.put((byte) value);
+            return 1;
+          }
+        }
+        return 0;
+      }
+
+      @Override
+      public boolean isOpen() {
+        return channel.isOpen();
+      }
+
+      @Override
+      public void close() throws IOException {
+        stream.close();
+      }
+    }
+
     @Test
-    public void 
testReadLinesWithDefaultDelimiterAndZeroAndOneLengthReturningChannel()
-        throws Exception {
+    public void testReadLinesWithDefaultDelimiterAndSlowReadChannel() throws 
Exception {
       Path path = tempFolder.newFile().toPath();
       Files.write(path, line.getBytes(UTF_8));
       Metadata metadata = FileSystems.matchSingleFileSpec(path.toString());
       FileBasedSource source =
           getTextSource(path.toString(), null, 0)
               .createForSubrangeOfFile(metadata, 0, metadata.sizeBytes());
+
       FileBasedReader<String> reader =
           source.createSingleFileReader(PipelineOptionsFactory.create());
-      ReadableByteChannel channel =
-          FileSystems.open(
-              
FileSystems.matchSingleFileSpec(source.getFileOrPatternSpec()).resourceId());
-      InputStream stream = Channels.newInputStream(channel);
-      reader.startReading(
-          // Placeholder channel that only yields 0- and 1-length buffers.
-          // Data is read at most one byte at a time from line parameter.
-          new ReadableByteChannel() {
-            int readCount = 0;
 
-            @Override
-            public int read(ByteBuffer dst) throws IOException {
-              if (++readCount % 3 == 0) {
-                if (dst.hasRemaining()) {
-                  int value = stream.read();
-                  if (value == -1) {
-                    return -1;
-                  }
-                  dst.put((byte) value);
-                  return 1;
-                }
-              }
-              return 0;
-            }
+      reader.startReading(new SlowReadChannel(source));
+      assertEquals(expected, SourceTestUtils.readFromStartedReader(reader));
+    }
 
-            @Override
-            public boolean isOpen() {
-              return channel.isOpen();
-            }
+    @Test
+    public void 
testReadLinesWithDefaultDelimiterOnSplittingSourceAndSlowReadChannel()
+        throws Exception {
+      Path path = tempFolder.newFile().toPath();
+      Files.write(path, line.getBytes(UTF_8));
+      Metadata metadata = FileSystems.matchSingleFileSpec(path.toString());
+      FileBasedSource<String> source =
+          getTextSource(path.toString(), null, 0)
+              .createForSubrangeOfFile(metadata, 0, metadata.sizeBytes());
 
-            @Override
-            public void close() throws IOException {
-              stream.close();
-            }
-          });
-      assertEquals(expected, SourceTestUtils.readFromStartedReader(reader));
+      PipelineOptions options = PipelineOptionsFactory.create();
+
+      // Check every possible split positions.
+      for (int i = 0; i < line.length(); ++i) {
+        double fraction = i * 1.0 / line.length();
+        FileBasedReader<String> reader = 
source.createSingleFileReader(options);
+
+        // Use a slow read channel to read the content byte by byte. This can 
simulate the scenario
+        // of a certain character (in our case CR) occurring at the end of the 
read buffer.
+        reader.startReading(new SlowReadChannel(source));
+
+        // In order to get a successful split, we need to read at least one 
record before calling
+        // splitAtFraction().
+        List<String> totalItems = 
SourceTestUtils.readNItemsFromStartedReader(reader, 1);
+        BoundedSource<String> residual = reader.splitAtFraction(fraction);
+        List<String> primaryItems = 
SourceTestUtils.readFromStartedReader(reader);
+        totalItems.addAll(primaryItems);
+
+        if (residual != null) {
+          List<String> residualItems = 
SourceTestUtils.readFromSource(residual, options);
+          totalItems.addAll(residualItems);
+        }
+        assertEquals(expected, totalItems);
+      }
     }
 
     @Test

Reply via email to