This is an automated email from the ASF dual-hosted git repository.
damondouglas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new b5dc728b677 Fix an edge case of getting duplicated records when using
TextIO. (#30026)
b5dc728b677 is described below
commit b5dc728b677101cf3968e9f94db0898342343f6e
Author: Shunping Huang <[email protected]>
AuthorDate: Wed Jan 17 13:39:35 2024 -0500
Fix an edge case of getting duplicated records when using TextIO. (#30026)
When processing a CRLF-delimited file and the read buffer has
CR as the last character, startOfNextRecord will be set to the
position after the CR, i.e. the following LF. Let's say the
position of this LF is p.
In the next buffer, even though the actual start of record should be
p+1, startOfRecord is set to startOfNextRecord, which is p.
Then the code processes the next record by skipping the LF and yields
a record starting from p+1. It decides whether the record is valid by
checking if startOfRecord is in the range defined in RangeTracker.
If there is a split right after p, i.e. we have ranges [a, p+1) and [p+1, b),
then the above record would be considered as valid in the split [a, p+1),
because its startOfRecord is p < p+1. However, the record is also
considered valid when split [p+1, b) is processed, resulting in
duplicated records in the output.
---
.../java/org/apache/beam/sdk/io/TextSource.java | 5 +-
.../org/apache/beam/sdk/io/TextIOReadTest.java | 113 ++++++++++++++-------
2 files changed, 82 insertions(+), 36 deletions(-)
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java
index 3d62c677950..8367b38751c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java
@@ -323,10 +323,13 @@ public class TextSource extends FileBasedSource<String> {
// Consume any LF after CR if it is the first character of the next
buffer
if (skipLineFeedAtStart && buffer[bufferPosn] == LF) {
- ++bytesConsumed;
++startPosn;
++bufferPosn;
skipLineFeedAtStart = false;
+
+ // Right now, startOfRecord is pointing at the position of LF, but
the actual start
+ // position of the new record should be the position after LF.
+ ++startOfRecord;
}
// Search for the newline
diff --git
a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java
b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java
index 84c05ee6c90..253308d1b93 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java
@@ -386,53 +386,96 @@ public class TextIOReadTest {
runTestReadWithData(line.getBytes(UTF_8), expected);
}
+ // Placeholder channel that only yields 0- and 1-length buffers.
+ private static class SlowReadChannel implements ReadableByteChannel {
+ int readCount = 0;
+ InputStream stream;
+ ReadableByteChannel channel;
+
+ public SlowReadChannel(FileBasedSource source) throws IOException {
+ channel =
+ FileSystems.open(
+
FileSystems.matchSingleFileSpec(source.getFileOrPatternSpec()).resourceId());
+ stream = Channels.newInputStream(channel);
+ }
+
+ // Data is read at most one byte at a time from line parameter.
+ @Override
+ public int read(ByteBuffer dst) throws IOException {
+ if (++readCount % 3 == 0) {
+ if (dst.hasRemaining()) {
+ int value = stream.read();
+ if (value == -1) {
+ return -1;
+ }
+ dst.put((byte) value);
+ return 1;
+ }
+ }
+ return 0;
+ }
+
+ @Override
+ public boolean isOpen() {
+ return channel.isOpen();
+ }
+
+ @Override
+ public void close() throws IOException {
+ stream.close();
+ }
+ }
+
@Test
- public void
testReadLinesWithDefaultDelimiterAndZeroAndOneLengthReturningChannel()
- throws Exception {
+ public void testReadLinesWithDefaultDelimiterAndSlowReadChannel() throws
Exception {
Path path = tempFolder.newFile().toPath();
Files.write(path, line.getBytes(UTF_8));
Metadata metadata = FileSystems.matchSingleFileSpec(path.toString());
FileBasedSource source =
getTextSource(path.toString(), null, 0)
.createForSubrangeOfFile(metadata, 0, metadata.sizeBytes());
+
FileBasedReader<String> reader =
source.createSingleFileReader(PipelineOptionsFactory.create());
- ReadableByteChannel channel =
- FileSystems.open(
-
FileSystems.matchSingleFileSpec(source.getFileOrPatternSpec()).resourceId());
- InputStream stream = Channels.newInputStream(channel);
- reader.startReading(
- // Placeholder channel that only yields 0- and 1-length buffers.
- // Data is read at most one byte at a time from line parameter.
- new ReadableByteChannel() {
- int readCount = 0;
- @Override
- public int read(ByteBuffer dst) throws IOException {
- if (++readCount % 3 == 0) {
- if (dst.hasRemaining()) {
- int value = stream.read();
- if (value == -1) {
- return -1;
- }
- dst.put((byte) value);
- return 1;
- }
- }
- return 0;
- }
+ reader.startReading(new SlowReadChannel(source));
+ assertEquals(expected, SourceTestUtils.readFromStartedReader(reader));
+ }
- @Override
- public boolean isOpen() {
- return channel.isOpen();
- }
+ @Test
+ public void
testReadLinesWithDefaultDelimiterOnSplittingSourceAndSlowReadChannel()
+ throws Exception {
+ Path path = tempFolder.newFile().toPath();
+ Files.write(path, line.getBytes(UTF_8));
+ Metadata metadata = FileSystems.matchSingleFileSpec(path.toString());
+ FileBasedSource<String> source =
+ getTextSource(path.toString(), null, 0)
+ .createForSubrangeOfFile(metadata, 0, metadata.sizeBytes());
- @Override
- public void close() throws IOException {
- stream.close();
- }
- });
- assertEquals(expected, SourceTestUtils.readFromStartedReader(reader));
+ PipelineOptions options = PipelineOptionsFactory.create();
+
+ // Check every possible split positions.
+ for (int i = 0; i < line.length(); ++i) {
+ double fraction = i * 1.0 / line.length();
+ FileBasedReader<String> reader =
source.createSingleFileReader(options);
+
+ // Use a slow read channel to read the content byte by byte. This can
simulate the scenario
+ // of a certain character (in our case CR) occurring at the end of the
read buffer.
+ reader.startReading(new SlowReadChannel(source));
+
+ // In order to get a successful split, we need to read at least one
record before calling
+ // splitAtFraction().
+ List<String> totalItems =
SourceTestUtils.readNItemsFromStartedReader(reader, 1);
+ BoundedSource<String> residual = reader.splitAtFraction(fraction);
+ List<String> primaryItems =
SourceTestUtils.readFromStartedReader(reader);
+ totalItems.addAll(primaryItems);
+
+ if (residual != null) {
+ List<String> residualItems =
SourceTestUtils.readFromSource(residual, options);
+ totalItems.addAll(residualItems);
+ }
+ assertEquals(expected, totalItems);
+ }
}
@Test