baeminbo opened a new issue, #32249: URL: https://github.com/apache/beam/issues/32249
### What happened?

A pipeline that reads a text file with a non-default delimiter [1] fails with an `IndexOutOfBoundsException` at `TextBasedReader.readCustomLine` [2].

The delimiter is "ABCDE" (5 bytes). The input file is [sample.csv](https://github.com/user-attachments/files/16669008/sample.csv). It is 16400 bytes long and has 'A' at index 8190, 'B' at index 8191, and 'C' at index 8192 (indices are 0-based). So the pipeline should not split the file content; the whole content should be a single element.

My theory about the root cause is as follows. The code at [TextBasedReader.readCustomLine](https://github.com/apache/beam/blob/v2.58.1/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java#L466) writes from `buffer` (8192 bytes) into a `ByteArrayOutputStream`, but the requested range is [0, 8194) when the exception is thrown. This is because `appendLength` is 8194, where `readLength` is 8192 (the length of `buffer`), `delPosn` is 0, and `prevDelPosn` is 2.

For the first buffer read of [0, 8192), `delPosn` is 2. For the second buffer read of [8192, 16384), `delPosn` is reset to 0 while `prevDelPosn` is still 2 (the [`delPosn` from the previous buffer read](https://github.com/apache/beam/blob/v2.58.1/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java#L423)). I guess `prevDelPosn` should be reset to 0 [when the delimiter match fails](https://github.com/apache/beam/blob/v2.58.1/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java#L456).
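To make the arithmetic in the theory concrete, here is a minimal standalone sketch. It assumes `appendLength` is computed as `readLength - (delPosn - prevDelPosn)`, which is inferred from the reported values (8192 - (0 - 2) = 8194) rather than copied from `TextSource.java`. The class `AppendLengthSketch` and its helpers are hypothetical names used only for illustration, and `prevDelPosn = 0` for the first read is likewise an assumption.

```java
import java.io.ByteArrayOutputStream;

public class AppendLengthSketch {
    // Hypothetical helper. The formula is an assumption inferred from the
    // values in the report; it is not taken from the Beam source.
    static int appendLength(int readLength, int delPosn, int prevDelPosn) {
        return readLength - (delPosn - prevDelPosn);
    }

    // Returns true if writing `length` bytes from the start of `buffer`
    // is rejected with an IndexOutOfBoundsException.
    static boolean writeFails(byte[] buffer, int length) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try {
            out.write(buffer, 0, length);
            return false;
        } catch (IndexOutOfBoundsException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        byte[] buffer = new byte[8192];

        // First read: delPosn is 2 as reported; prevDelPosn = 0 is assumed.
        int first = appendLength(8192, 2, 0);
        // Second read: delPosn = 0 and prevDelPosn = 2, as reported.
        int second = appendLength(8192, 0, 2);

        System.out.println("first=" + first + " fails=" + writeFails(buffer, first));
        System.out.println("second=" + second + " fails=" + writeFails(buffer, second));
    }
}
```

Under these assumptions the first read appends 8190 bytes without error, while the second asks for 8194 bytes from an 8192-byte array, which matches the range in the exception message. If `prevDelPosn` were 0 for the second read, the length would be 8192 and the write would stay in bounds.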
[1]
```
import java.nio.charset.StandardCharsets;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class TextReadJob {
  private static final String INPUT_PATH = "sample.csv";
  // Custom 5-byte record delimiter.
  private static final byte[] DELIMITER = "ABCDE".getBytes(StandardCharsets.UTF_8);

  public static void main(String[] args) {
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
    Pipeline pipeline = Pipeline.create(options);
    // Read the file, splitting records on DELIMITER instead of newlines.
    pipeline.apply(TextIO.read().from(INPUT_PATH).withDelimiter(DELIMITER));
    pipeline.run();
  }
}
```

[2]
```
Aug 19, 2024 9:24:32 PM org.apache.beam.sdk.io.FileBasedSource getEstimatedSizeBytes
INFO: Filepattern sample.csv matched 1 files with total size 16400
Aug 19, 2024 9:24:32 PM org.apache.beam.sdk.io.FileBasedSource split
INFO: Splitting filepattern sample.csv into bundles of size 1025 took 1 ms and produced 1 files and 16 bundles
Exception in thread "main" java.lang.IndexOutOfBoundsException: Range [0, 0 + 8194) out of bounds for length 8192
    at java.base/jdk.internal.util.Preconditions.outOfBounds(Preconditions.java:64)
    at java.base/jdk.internal.util.Preconditions.outOfBoundsCheckFromIndexSize(Preconditions.java:82)
    at java.base/jdk.internal.util.Preconditions.checkFromIndexSize(Preconditions.java:343)
    at java.base/java.util.Objects.checkFromIndexSize(Objects.java:426)
    at java.base/java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:155)
    at org.apache.beam.sdk.io.TextSource$TextBasedReader.readCustomLine(TextSource.java:466)
    at org.apache.beam.sdk.io.TextSource$TextBasedReader.readNextRecord(TextSource.java:268)
    at org.apache.beam.sdk.io.FileBasedSource$FileBasedReader.advanceImpl(FileBasedSource.java:507)
    at org.apache.beam.sdk.io.FileBasedSource$FileBasedReader.startImpl(FileBasedSource.java:502)
    at org.apache.beam.sdk.io.OffsetBasedSource$OffsetBasedReader.start(OffsetBasedSource.java:252)
    at org.apache.beam.runners.direct.BoundedReadEvaluatorFactory$BoundedReadEvaluator.processElement(BoundedReadEvaluatorFactory.java:150)
    at org.apache.beam.runners.direct.DirectTransformExecutor.processElements(DirectTransformExecutor.java:165)
    at org.apache.beam.runners.direct.DirectTransformExecutor.run(DirectTransformExecutor.java:129)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
```

### Issue Priority

Priority: 2 (default / most bugs should be filed as P2)

### Issue Components

- [ ] Component: Python SDK
- [X] Component: Java SDK
- [ ] Component: Go SDK
- [ ] Component: Typescript SDK
- [ ] Component: IO connector
- [ ] Component: Beam YAML
- [ ] Component: Beam examples
- [ ] Component: Beam playground
- [ ] Component: Beam katas
- [ ] Component: Website
- [ ] Component: Infrastructure
- [ ] Component: Spark Runner
- [ ] Component: Flink Runner
- [ ] Component: Samza Runner
- [ ] Component: Twister2 Runner
- [ ] Component: Hazelcast Jet Runner
- [ ] Component: Google Cloud Dataflow Runner
