damccorm opened a new issue, #20332:
URL: https://github.com/apache/beam/issues/20332

   FileIO writeDynamic with AvroIO.sink is not writing all data in the following pipeline. The amount of data written varies between runs, but records are consistently dropped. This happens even with a very small test dataset of 6 records, which should produce 3 directories.
   
   ```
   import static org.apache.beam.sdk.io.FileIO.Write.defaultNaming;

   Pipeline p = Pipeline.create(options);
   PCollection<KV<String, AvroRecord>> records =
       p.apply(TextIO.read().from("/tmp/input.csv"))
           .apply(ParDo.of(new StringToDatasetIDAvroRecordFcn()));

   // Write out into Avro, one directory per dataset
   records.apply("Write avro file per dataset",
       FileIO.<String, KV<String, AvroRecord>>writeDynamic()
           .by(KV::getKey)
           .via(Contextful.fn(KV::getValue),
               Contextful.fn(x -> AvroIO.sink(AvroRecord.class).withCodec(BASE_CODEC)))
           .to(options.getTargetPath())
           .withDestinationCoder(StringUtf8Coder.of())
           .withNaming(key -> defaultNaming(key + "/export", PipelinesVariables.Pipeline.AVRO_EXTENSION)));

   p.run().waitUntilFinish();
   ```
   
   
   
   If I replace AvroIO.sink() with TextIO.sink() (and replace the initial mapping function accordingly), the correct number of records is written to the separate directories. This works consistently.
   
   For example:
   
   ```
   import static org.apache.beam.sdk.io.FileIO.Write.defaultNaming;

   // Initialise pipeline
   Pipeline p = Pipeline.create(options);

   PCollection<KV<String, String>> records =
       p.apply(TextIO.read().from("/tmp/input.csv"))
           .apply(ParDo.of(new StringToDatasetIDKVFcn()));

   // Write out into CSV, one directory per dataset
   records.apply("Write CSV file per dataset",
       FileIO.<String, KV<String, String>>writeDynamic()
           .by(KV::getKey)
           .via(Contextful.fn(KV::getValue), TextIO.sink())
           .to(options.getTargetPath())
           .withDestinationCoder(StringUtf8Coder.of())
           // Lambda parameter must match the name used in the body
           .withNaming(key -> defaultNaming(key + "/export", ".csv")));

   p.run().waitUntilFinish();
   ```
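To pin down which destinations lose records, it can help to compute the expected record count per directory straight from the input and compare it with what each `export` file actually contains. The following plain-Java sketch is not part of the original report. It assumes, hypothetically, that the dataset ID is the first comma-separated field of each line of `/tmp/input.csv`, because the reporter's mapping functions (`StringToDatasetIDAvroRecordFcn`, `StringToDatasetIDKVFcn`) are not shown. `ExpectedDestinationCounts` and `countPerDataset` are illustrative names, and the sample lines are made up.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class ExpectedDestinationCounts {

    // Hypothetical helper: assumes the dataset ID is the first comma-separated
    // field of each CSV line (the real mapping DoFns are not shown in the issue).
    static Map<String, Long> countPerDataset(List<String> csvLines) {
        Map<String, Long> counts = new TreeMap<>(); // sorted keys for stable output
        for (String line : csvLines) {
            if (line.isBlank()) {
                continue; // ignore blank lines in this sketch
            }
            String datasetId = line.split(",", 2)[0].trim();
            counts.merge(datasetId, 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // Six made-up input records spread over three dataset IDs.
        List<String> lines = List.of(
            "ds1,a", "ds1,b", "ds2,c", "ds2,d", "ds3,e", "ds3,f");
        Map<String, Long> counts = countPerDataset(lines);
        // Each key corresponds to one "<key>/export" destination.
        System.out.println(counts);
        long total = counts.values().stream().mapToLong(Long::longValue).sum();
        System.out.println(counts.size() + " directories, " + total + " records");
    }
}
```

With the sample lines above it prints `{ds1=2, ds2=2, ds3=2}` followed by `3 directories, 6 records`. Any directory whose Avro output holds fewer records than its count here is a destination where the sink dropped data.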
   
   
   cc [~timrobertson100]
   
   
   
   Imported from Jira [BEAM-10100](https://issues.apache.org/jira/browse/BEAM-10100). Original Jira may contain additional context.
   Reported by: djtfmartin.

