damccorm opened a new issue, #20332:
URL: https://github.com/apache/beam/issues/20332
FileIO.writeDynamic() with AvroIO.sink() does not write all of the data in
the following pipeline. The number of records written varies between runs,
but records are consistently dropped. This happens with a very small test
dataset: 6 records, which should produce 3 directories.
```
Pipeline p = Pipeline.create(options);
PCollection<KV<String, AvroRecord>> records =
    p.apply(TextIO.read().from("/tmp/input.csv"))
        .apply(ParDo.of(new StringToDatasetIDAvroRecordFcn()));

// write out into AVRO in each separate directory
records.apply("Write avro file per dataset",
    FileIO.<String, KV<String, AvroRecord>>writeDynamic()
        .by(KV::getKey)
        .via(Contextful.fn(KV::getValue),
            Contextful.fn(x -> AvroIO.sink(AvroRecord.class).withCodec(BASE_CODEC)))
        .to(options.getTargetPath())
        .withDestinationCoder(StringUtf8Coder.of())
        .withNaming(key -> defaultNaming(key + "/export",
            PipelinesVariables.Pipeline.AVRO_EXTENSION)));
p.run().waitUntilFinish();
```
If I replace AvroIO.sink() with TextIO.sink() (and replace the initial
mapping function), the correct number of records is written to the
separate directories, and this works consistently across runs.
For example:
```
// Initialise pipeline
Pipeline p = Pipeline.create(options);
PCollection<KV<String, String>> records =
    p.apply(TextIO.read().from("/tmp/input.csv"))
        .apply(ParDo.of(new StringToDatasetIDKVFcn()));

// write out into CSV in each separate directory
records.apply("Write CSV file per dataset",
    FileIO.<String, KV<String, String>>writeDynamic()
        .by(KV::getKey)
        .via(Contextful.fn(KV::getValue), TextIO.sink())
        .to(options.getTargetPath())
        .withDestinationCoder(StringUtf8Coder.of())
        .withNaming(datasetID -> defaultNaming(datasetID + "/export", ".csv")));
p.run().waitUntilFinish();
```
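To make the expected behaviour concrete, here is a minimal sketch in plain Java (no Beam involved) of the invariant both pipelines should satisfy: every input record lands in exactly one destination, so 6 records over 3 dataset IDs give 3 destinations and 6 records in total. The class name, the `partition` helper, and the sample lines are hypothetical and only illustrate the grouping that `.by(KV::getKey)` is expected to perform; the real input file is not shown in this report.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

public class ExpectedPartitioning {

    // Groups "datasetId,value" lines by the text before the first comma.
    // This stands in for the per-key destinations of writeDynamic().
    static Map<String, List<String>> partition(List<String> lines) {
        return lines.stream()
            .collect(Collectors.groupingBy(
                line -> line.substring(0, line.indexOf(',')),
                TreeMap::new,
                Collectors.toList()));
    }

    public static void main(String[] args) {
        // Hypothetical input: 6 records spread over 3 dataset IDs.
        List<String> input = List.of("a,1", "a,2", "b,3", "b,4", "c,5", "c,6");

        Map<String, List<String>> byDataset = partition(input);
        int written = byDataset.values().stream().mapToInt(List::size).sum();

        // The total across destinations should equal the input size.
        System.out.println(byDataset.size() + " destinations, " + written + " records");
    }
}
```

With the AvroIO.sink() pipeline above, the total across the output directories comes out lower than the input count, which is the behaviour being reported.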
cc [~timrobertson100]
Imported from Jira
[BEAM-10100](https://issues.apache.org/jira/browse/BEAM-10100). Original Jira
may contain additional context.
Reported by: djtfmartin.