baeminbo opened a new issue, #32251: URL: https://github.com/apache/beam/issues/32251
### What happened?

The pipeline [1] splits a text file with the delimiter "ABC". For the input text "ABABCD", the expected result is ["AB", "D"], but the actual pipeline result is ["ABABCD", "D"]. See [2] for the result with DirectRunner.

I suspect the root cause is the delimiter matching in [TextSource](https://github.com/apache/beam/blob/v2.58.1/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java#L448-L458). It processes the input text "ABABCD" as follows, and so fails to match the delimiter "ABC" in the input text.

```
"A": "A" == delimiter[0] (= "A"), set delPosn to 1
"B": "B" == delimiter[1] (= "B"), set delPosn to 2
"A": "A" != delimiter[2] (= "C"), set delPosn to 0 <-- This is wrong. delPosn should be 1 as "A" matches delimiter[0]
"B": "B" != delimiter[0] (= "A"), set delPosn to 0
"C": "C" != delimiter[0] (= "A"), set delPosn to 0
"D": "D" != delimiter[0] (= "A"), set delPosn to 0
```

I think this is something like a regex match problem (e.g. delimiter "ABCABCABD" and input text "...ABCABCABC..."). It may need to track multiple `delPosn`s for partial matches.

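
To make the suspected failure mode concrete, here is a minimal standalone sketch that does not use Beam at all. `splitNaive` mimics the reset-to-zero behaviour from the trace above in simplified form, and `splitKmp` shows one possible way to keep partial matches alive, using a Knuth-Morris-Pratt failure table instead of multiple `delPosn`s. The class name, method names, and the single-pass in-memory splitting are all assumptions made for illustration; this is not the TextSource code or a proposed patch, and it ignores buffering and bundle boundaries.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration class; not part of Apache Beam.
public class DelimiterSplitSketch {

  private static String str(byte[] input, int from, int to) {
    return new String(input, from, to - from, StandardCharsets.UTF_8);
  }

  /** Simplified model of the traced behaviour: any mismatch resets delPosn to 0. */
  public static List<String> splitNaive(byte[] input, byte[] delimiter) {
    List<String> records = new ArrayList<>();
    int start = 0;
    int delPosn = 0;
    for (int i = 0; i < input.length; i++) {
      if (input[i] == delimiter[delPosn]) {
        delPosn++;
      } else {
        delPosn = 0; // the partial match is dropped and input[i] is never re-checked
      }
      if (delPosn == delimiter.length) {
        records.add(str(input, start, i + 1 - delimiter.length));
        start = i + 1;
        delPosn = 0;
      }
    }
    if (start < input.length) {
      records.add(str(input, start, input.length));
    }
    return records;
  }

  /** fail[i] = length of the longest proper prefix of d[0..i] that is also its suffix. */
  static int[] failureTable(byte[] d) {
    int[] fail = new int[d.length];
    int k = 0;
    for (int i = 1; i < d.length; i++) {
      while (k > 0 && d[i] != d[k]) {
        k = fail[k - 1];
      }
      if (d[i] == d[k]) {
        k++;
      }
      fail[i] = k;
    }
    return fail;
  }

  /** KMP-based split: on a mismatch, fall back to the longest still-valid prefix. */
  public static List<String> splitKmp(byte[] input, byte[] delimiter) {
    // Assumes a non-empty delimiter.
    int[] fail = failureTable(delimiter);
    List<String> records = new ArrayList<>();
    int start = 0;
    int delPosn = 0;
    for (int i = 0; i < input.length; i++) {
      while (delPosn > 0 && input[i] != delimiter[delPosn]) {
        delPosn = fail[delPosn - 1];
      }
      if (input[i] == delimiter[delPosn]) {
        delPosn++;
      }
      if (delPosn == delimiter.length) {
        records.add(str(input, start, i + 1 - delimiter.length));
        start = i + 1;
        delPosn = 0;
      }
    }
    if (start < input.length) {
      records.add(str(input, start, input.length));
    }
    return records;
  }

  public static void main(String[] args) {
    byte[] input = "ABABCD".getBytes(StandardCharsets.UTF_8);
    byte[] delimiter = "ABC".getBytes(StandardCharsets.UTF_8);
    System.out.println("naive: " + splitNaive(input, delimiter));
    System.out.println("kmp:   " + splitKmp(input, delimiter));
  }
}
```

In this sketch the naive version never sees the delimiter in "ABABCD" and returns the whole input as one record, while the KMP version returns "AB" and "D". The extra "D" record in the real pipeline output [2] presumably comes from bundle splitting, which this sketch does not model.
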
[1]
```
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TextReadJob {
  private static final Logger LOG = LoggerFactory.getLogger(TextReadJob.class);

  private static final String INPUT_PATH = "short.csv"; // content: "ABABCD"

  private static final byte[] DELIMITER = "ABC".getBytes(StandardCharsets.UTF_8);

  public static void main(String[] args) throws IOException {
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();

    Pipeline pipeline = Pipeline.create(options);

    pipeline
        .apply(TextIO.read().from(INPUT_PATH).withDelimiter(DELIMITER))
        .apply(ParDo.of(new DoFn<String, Void>() {
          @ProcessElement
          public void processElement(@Element String input) {
            LOG.info("input: <{}>", input);
          }
        }));

    pipeline.run();
  }
}
```

[2]
```
Aug 19, 2024 11:30:43 PM org.apache.beam.sdk.io.FileBasedSource getEstimatedSizeBytes
INFO: Filepattern short.csv matched 1 files with total size 6
Aug 19, 2024 11:30:43 PM org.apache.beam.sdk.io.FileBasedSource split
INFO: Splitting filepattern short.csv into bundles of size 0 took 1 ms and produced 1 files and 6 bundles
Aug 19, 2024 11:30:43 PM baeminbo.TextReadJob$1 processElement
INFO: input: <ABABCD>
Aug 19, 2024 11:30:43 PM baeminbo.TextReadJob$1 processElement
INFO: input: <D>
```

### Issue Priority

Priority: 1 (data loss / total loss of function)

### Issue Components

- [ ] Component: Python SDK
- [X] Component: Java SDK
- [ ] Component: Go SDK
- [ ] Component: Typescript SDK
- [ ] Component: IO connector
- [ ] Component: Beam YAML
- [ ] Component: Beam examples
- [ ] Component: Beam playground
- [ ] Component: Beam katas
- [ ] Component: Website
- [ ] Component: Infrastructure
- [ ] Component: Spark Runner
- [ ] Component: Flink Runner
- [ ] Component: Samza Runner
- [ ] Component: Twister2 Runner
- [ ] Component: Hazelcast Jet Runner
- [ ] Component: Google Cloud Dataflow Runner
