baeminbo opened a new issue, #32251:
URL: https://github.com/apache/beam/issues/32251

   ### What happened?
   
   The pipeline [1] splits a text file on the delimiter "ABC". For the input
   text "ABABCD", the expected result is ["AB", "D"], but the actual pipeline
   result is ["ABABCD", "D"]. See [2] for the output with DirectRunner.
   
   I suspect the root cause is the delimiter matching in
   [TextSource](https://github.com/apache/beam/blob/v2.58.1/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java#L448-L458).
   It processes the input text "ABABCD" byte by byte as follows, and so fails
   to match the delimiter "ABC":
   ```
   "A": "A" == delimiter[0] (= "A"), set delPosn to 1
   "B": "B" == delimiter[1] (= "B"), set delPosn to 2
   "A": "A" != delimiter[2] (= "C"), set delPosn to 0 <-- This is wrong. delPosn should be 1 as "A" matches delimiter[0]
   "B": "B" != delimiter[0] (= "A"), set delPosn to 0
   "C": "C" != delimiter[0] (= "A"), set delPosn to 0
   "D": "D" != delimiter[0] (= "A"), set delPosn to 0
   ```
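
The trace above can be modeled with a small standalone scan. This is a minimal sketch, not the actual `TextSource` code: the class `NaiveDelimiterScan` and the method `findEndOfDelimiter` are hypothetical names, and the sketch assumes a non-empty delimiter and an input that is fully in memory.

```java
import java.nio.charset.StandardCharsets;

/** Simplified model of a reset-to-zero delimiter scan (hypothetical, not Beam code). */
final class NaiveDelimiterScan {

  /** Returns the index just past the first delimiter match, or -1 if the scan finds none. */
  static int findEndOfDelimiter(byte[] input, byte[] delimiter) {
    int delPosn = 0; // number of delimiter bytes matched so far
    for (int i = 0; i < input.length; i++) {
      if (input[i] == delimiter[delPosn]) {
        delPosn++;
        if (delPosn == delimiter.length) {
          return i + 1;
        }
      } else {
        // Mismatch: the partial match is discarded, and the current byte is
        // never re-checked against delimiter[0]. This is the step marked
        // "This is wrong" in the trace.
        delPosn = 0;
      }
    }
    return -1;
  }

  public static void main(String[] args) {
    byte[] delimiter = "ABC".getBytes(StandardCharsets.UTF_8);
    // The delimiter is present at offset 2, but the scan misses it.
    System.out.println(
        findEndOfDelimiter("ABABCD".getBytes(StandardCharsets.UTF_8), delimiter)); // prints -1
    // Without a false start before the delimiter, the scan works.
    System.out.println(
        findEndOfDelimiter("XABCD".getBytes(StandardCharsets.UTF_8), delimiter)); // prints 4
  }
}
```

The sketch only shows that the delimiter is missed. It does not model bundle splitting, so it does not reproduce the extra "D" record seen in [2].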
   
   I think this is similar to a regex matching problem. For example, with the
   delimiter "ABCABCABD" and the input text "...ABCABCABC...", the matcher may
   need to keep multiple `delPosn`s to track partial matches.
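
One standard way to keep track of partial matches with a single `delPosn` is the Knuth-Morris-Pratt (KMP) failure table: on a mismatch, `delPosn` falls back to the longest delimiter prefix that is still matched instead of to 0. The following is a minimal sketch under that assumption. It is not a proposed patch and not necessarily how Beam should fix this; `KmpDelimiterSplit`, `buildFailureTable`, and `split` are hypothetical names, and the sketch works on an in-memory string rather than on a buffered file read.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** KMP-based delimiter splitting (hypothetical illustration, not Beam code). */
final class KmpDelimiterSplit {

  /** failure[i] = length of the longest proper prefix of delimiter[0..i] that is also its suffix. */
  static int[] buildFailureTable(byte[] delimiter) {
    int[] failure = new int[delimiter.length];
    int k = 0;
    for (int i = 1; i < delimiter.length; i++) {
      while (k > 0 && delimiter[i] != delimiter[k]) {
        k = failure[k - 1];
      }
      if (delimiter[i] == delimiter[k]) {
        k++;
      }
      failure[i] = k;
    }
    return failure;
  }

  /** Splits text on delimiterText; a trailing empty record is not emitted in this sketch. */
  static List<String> split(String text, String delimiterText) {
    byte[] input = text.getBytes(StandardCharsets.UTF_8);
    byte[] delimiter = delimiterText.getBytes(StandardCharsets.UTF_8);
    if (delimiter.length == 0) {
      throw new IllegalArgumentException("delimiter must not be empty");
    }
    int[] failure = buildFailureTable(delimiter);
    List<String> records = new ArrayList<>();
    int recordStart = 0;
    int delPosn = 0; // number of delimiter bytes currently matched
    for (int i = 0; i < input.length; i++) {
      // On a mismatch, fall back to the longest prefix that still matches.
      while (delPosn > 0 && input[i] != delimiter[delPosn]) {
        delPosn = failure[delPosn - 1];
      }
      if (input[i] == delimiter[delPosn]) {
        delPosn++;
      }
      if (delPosn == delimiter.length) {
        int delimiterStart = i + 1 - delimiter.length;
        records.add(
            new String(input, recordStart, delimiterStart - recordStart, StandardCharsets.UTF_8));
        recordStart = i + 1;
        delPosn = 0; // delimiters do not overlap
      }
    }
    if (recordStart < input.length) {
      records.add(
          new String(input, recordStart, input.length - recordStart, StandardCharsets.UTF_8));
    }
    return records;
  }

  public static void main(String[] args) {
    System.out.println(split("ABABCD", "ABC")); // prints [AB, D]
    System.out.println(split("ABCABCABCABDZ", "ABCABCABD")); // prints [ABC, Z]
  }
}
```

With the failure table, the second "A" in "ABABCD" leaves `delPosn` at 1 rather than 0, so the following "BC" completes the match. The table is computed once per delimiter, and the scan remains a single pass over the input.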
   
   [1]
   ```
   import java.io.IOException;
   import java.nio.charset.StandardCharsets;
   import org.apache.beam.sdk.Pipeline;
   import org.apache.beam.sdk.io.TextIO;
   import org.apache.beam.sdk.options.PipelineOptions;
   import org.apache.beam.sdk.options.PipelineOptionsFactory;
   import org.apache.beam.sdk.transforms.DoFn;
   import org.apache.beam.sdk.transforms.ParDo;
   import org.slf4j.Logger;
   import org.slf4j.LoggerFactory;
   
   public class TextReadJob {
     private static final Logger LOG = LoggerFactory.getLogger(TextReadJob.class);
   
     private static final String INPUT_PATH = "short.csv"; // content: "ABABCD"
   
     private static final byte[] DELIMITER = "ABC".getBytes(StandardCharsets.UTF_8);
   
     public static void main(String[] args) throws IOException {
       PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
   
       Pipeline pipeline = Pipeline.create(options);
       
       
       pipeline
           .apply(TextIO.read().from(INPUT_PATH).withDelimiter(DELIMITER))
           .apply(ParDo.of(new DoFn<String, Void>() {
         @ProcessElement
         public void processElement(@Element String input) {
           LOG.info("input: <{}>", input);
         }
       }));
   
       pipeline.run();
     }
   }
   ```
   
   [2]
   ```
   Aug 19, 2024 11:30:43 PM org.apache.beam.sdk.io.FileBasedSource getEstimatedSizeBytes
   INFO: Filepattern short.csv matched 1 files with total size 6
   Aug 19, 2024 11:30:43 PM org.apache.beam.sdk.io.FileBasedSource split
   INFO: Splitting filepattern short.csv into bundles of size 0 took 1 ms and produced 1 files and 6 bundles
   Aug 19, 2024 11:30:43 PM baeminbo.TextReadJob$1 processElement
   INFO: input: <ABABCD>
   Aug 19, 2024 11:30:43 PM baeminbo.TextReadJob$1 processElement
   INFO: input: <D>
   ```
   
   ### Issue Priority
   
   Priority: 1 (data loss / total loss of function)
   
   ### Issue Components
   
   - [ ] Component: Python SDK
   - [X] Component: Java SDK
   - [ ] Component: Go SDK
   - [ ] Component: Typescript SDK
   - [ ] Component: IO connector
   - [ ] Component: Beam YAML
   - [ ] Component: Beam examples
   - [ ] Component: Beam playground
   - [ ] Component: Beam katas
   - [ ] Component: Website
   - [ ] Component: Infrastructure
   - [ ] Component: Spark Runner
   - [ ] Component: Flink Runner
   - [ ] Component: Samza Runner
   - [ ] Component: Twister2 Runner
   - [ ] Component: Hazelcast Jet Runner
   - [ ] Component: Google Cloud Dataflow Runner

