[
https://issues.apache.org/jira/browse/BEAM-10100?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Dave Martin updated BEAM-10100:
-------------------------------
Description:
FileIO writeDynamic with AvroIO.sink is not writing all data in the following
pipeline. The amount of data written varies between runs, but some records are
always dropped. This occurs with a very small test dataset (5 records), which
should produce 3 output directories.
{code:java}
Pipeline p = Pipeline.create(options);

PCollection<KV<String, AvroRecord>> records =
    p.apply(TextIO.read().from("/tmp/input.csv"))
        .apply(ParDo.of(new StringToDatasetIDAvroRecordFcn()));

// write AVRO into a separate directory per dataset
records.apply("Write avro file per dataset",
    FileIO.<String, KV<String, AvroRecord>>writeDynamic()
        .by(KV::getKey)
        .via(
            Contextful.fn(KV::getValue),
            Contextful.fn(x -> AvroIO.sink(AvroRecord.class).withCodec(BASE_CODEC)))
        .to(options.getTargetPath())
        .withDestinationCoder(StringUtf8Coder.of())
        .withNaming(key -> defaultNaming(key + "/export",
            PipelinesVariables.Pipeline.AVRO_EXTENSION)));

p.run().waitUntilFinish();
{code}
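For context, the mapping DoFn referenced above is not included in this report. A minimal sketch of what it could look like follows; the CSV layout ("datasetID,value"), the AvroRecord builder API, and the field name are assumptions for illustration, not the actual implementation:
{code:java}
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

// Hypothetical sketch only; the real StringToDatasetIDAvroRecordFcn is not shown
// in this report. Assumes each input line is "datasetID,value" and AvroRecord is
// an Avro-generated class with a single string "value" field.
public class StringToDatasetIDAvroRecordFcn extends DoFn<String, KV<String, AvroRecord>> {
  @ProcessElement
  public void processElement(@Element String line, OutputReceiver<KV<String, AvroRecord>> out) {
    String[] parts = line.split(",", 2);  // [datasetID, value]
    AvroRecord record = AvroRecord.newBuilder().setValue(parts[1]).build();
    out.output(KV.of(parts[0], record));  // key each record by its dataset ID
  }
}
{code}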
If I replace AvroIO.sink() with TextIO.sink() (and swap the initial mapping
function accordingly), then the correct number of records is written to the
separate directories. This works consistently.
e.g.
{code:java}
// Initialise pipeline
Pipeline p = Pipeline.create(options);

PCollection<KV<String, String>> records =
    p.apply(TextIO.read().from("/tmp/input.csv"))
        .apply(ParDo.of(new StringToDatasetIDKVFcn()));

// write CSV into a separate directory per dataset
records.apply("Write CSV file per dataset",
    FileIO.<String, KV<String, String>>writeDynamic()
        .by(KV::getKey)
        .via(Contextful.fn(KV::getValue), TextIO.sink())
        .to(options.getTargetPath())
        .withDestinationCoder(StringUtf8Coder.of())
        .withNaming(datasetID -> defaultNaming(datasetID + "/export", ".csv")));

p.run().waitUntilFinish();
{code}
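For reference, with 5 input records spanning 3 dataset IDs (ds1/ds2/ds3 are illustrative names here), a successful run of either pipeline should leave roughly the layout below under the target path, with all 5 records spread across the 3 files; the exact shard suffix comes from FileIO's defaultNaming. In the failing AvroIO runs, records or whole files are intermittently missing from this layout.
{code}
<targetPath>/ds1/export-00000-of-00001.avro   (or .csv in the TextIO variant)
<targetPath>/ds2/export-00000-of-00001.avro
<targetPath>/ds3/export-00000-of-00001.avro
{code}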
cc [~timrobertson100]
was:
FileIO writeDynamic with AvroIO.sink is not writing all data in the following
pipeline. The amount of data written varies between runs, but some records are
always dropped. This occurs with a very small test dataset (5 records), which
should produce 3 output directories.
{code:java}
Pipeline p = Pipeline.create(options);

PCollection<KV<String, AvroRecord>> records =
    p.apply(TextIO.read().from("/tmp/input.csv"))
        .apply(ParDo.of(new StringToDatasetIDAvroRecordFcn()));

// write AVRO into a separate directory per dataset
records.apply("Write avro file per dataset",
    FileIO.<String, KV<String, AvroRecord>>writeDynamic()
        .by(KV::getKey)
        .via(
            Contextful.fn(KV::getValue),
            Contextful.fn(x -> AvroIO.sink(AvroRecord.class).withCodec(BASE_CODEC)))
        .to(options.getTargetPath())
        .withDestinationCoder(StringUtf8Coder.of())
        .withNaming(key -> defaultNaming(key + "/export",
            PipelinesVariables.Pipeline.AVRO_EXTENSION)));

p.run().waitUntilFinish();
{code}
If I replace AvroIO.sink() with TextIO.sink() (and swap the initial mapping
function accordingly), then the correct number of records is written to the
separate directories.
e.g.
{code:java}
// Initialise pipeline
Pipeline p = Pipeline.create(options);

PCollection<KV<String, String>> records =
    p.apply(TextIO.read().from("/tmp/input.csv"))
        .apply(ParDo.of(new StringToDatasetIDKVFcn()));

// write CSV into a separate directory per dataset
records.apply("Write CSV file per dataset",
    FileIO.<String, KV<String, String>>writeDynamic()
        .by(KV::getKey)
        .via(Contextful.fn(KV::getValue), TextIO.sink())
        .to(options.getTargetPath())
        .withDestinationCoder(StringUtf8Coder.of())
        .withNaming(datasetID -> defaultNaming(datasetID + "/export", ".csv")));

p.run().waitUntilFinish();
{code}
cc [~timrobertson100]
> FileIO writeDynamic with AvroIO.sink not writing all data
> ---------------------------------------------------------
>
> Key: BEAM-10100
> URL: https://issues.apache.org/jira/browse/BEAM-10100
> Project: Beam
> Issue Type: Bug
> Components: beam-community
> Affects Versions: 2.17.0, 2.20.0
> Environment: Mac OSX Catalina, tested with SparkRunner - Spark 2.4.5.
> Reporter: Dave Martin
> Assignee: Aizhamal Nurmamat kyzy
> Priority: P2
> Labels: AVRO, IO, Spark
>
> FileIO writeDynamic with AvroIO.sink is not writing all data in the following
> pipeline. The amount of data written varies between runs, but some records are
> always dropped. This occurs with a very small test dataset (5 records), which
> should produce 3 output directories.
> {code:java}
> Pipeline p = Pipeline.create(options);
>
> PCollection<KV<String, AvroRecord>> records =
>     p.apply(TextIO.read().from("/tmp/input.csv"))
>         .apply(ParDo.of(new StringToDatasetIDAvroRecordFcn()));
>
> // write AVRO into a separate directory per dataset
> records.apply("Write avro file per dataset",
>     FileIO.<String, KV<String, AvroRecord>>writeDynamic()
>         .by(KV::getKey)
>         .via(
>             Contextful.fn(KV::getValue),
>             Contextful.fn(x -> AvroIO.sink(AvroRecord.class).withCodec(BASE_CODEC)))
>         .to(options.getTargetPath())
>         .withDestinationCoder(StringUtf8Coder.of())
>         .withNaming(key -> defaultNaming(key + "/export",
>             PipelinesVariables.Pipeline.AVRO_EXTENSION)));
>
> p.run().waitUntilFinish();
> {code}
> If I replace AvroIO.sink() with TextIO.sink() (and swap the initial mapping
> function accordingly), then the correct number of records is written to the
> separate directories. This works consistently.
> e.g.
> {code:java}
> // Initialise pipeline
> Pipeline p = Pipeline.create(options);
>
> PCollection<KV<String, String>> records =
>     p.apply(TextIO.read().from("/tmp/input.csv"))
>         .apply(ParDo.of(new StringToDatasetIDKVFcn()));
>
> // write CSV into a separate directory per dataset
> records.apply("Write CSV file per dataset",
>     FileIO.<String, KV<String, String>>writeDynamic()
>         .by(KV::getKey)
>         .via(Contextful.fn(KV::getValue), TextIO.sink())
>         .to(options.getTargetPath())
>         .withDestinationCoder(StringUtf8Coder.of())
>         .withNaming(datasetID -> defaultNaming(datasetID + "/export", ".csv")));
>
> p.run().waitUntilFinish();
> {code}
> cc [~timrobertson100]
--
This message was sent by Atlassian Jira
(v8.3.4#803005)