[ https://issues.apache.org/jira/browse/BEAM-10100?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17537842#comment-17537842 ]

Stephen Patel commented on BEAM-10100:
--------------------------------------

I think I may have just encountered this as well.
I'm using emr-5.30.1, Beam 2.29.0, Flink 1.10, S3, and TextIO:

{code:java}
// This PCollection is bounded, with no explicit window/trigger defined
// (so it should be GlobalWindow / DefaultTrigger).
// There are approximately 7000 values across roughly 90 keys.
PCollection<KV<String, String>> foldersToValues = ...;
foldersToValues.apply(FileIO.<String, KV<String, String>>writeDynamic()
    .by(kv -> kv.getKey())
    .via(Contextful.fn(kv -> kv.getValue()), TextIO.sink())
    .withNaming(folder -> FileIO.Write.defaultNaming(folder, ".json"))
    .withCompression(Compression.GZIP)
    .withDestinationCoder(StringUtf8Coder.of())
    .to(destination));
{code}
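
For context, the explicit equivalent of that implicit strategy would look something like the sketch below (standard Beam Window/GlobalWindows/DefaultTrigger classes; I haven't verified that spelling it out changes the behavior):

{code:java}
import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.joda.time.Duration;

// Sketch only: state the GlobalWindow/DefaultTrigger strategy that a bounded
// PCollection gets by default, to rule windowing in or out as a factor.
PCollection<KV<String, String>> explicitlyWindowed =
    foldersToValues.apply(
        Window.<KV<String, String>>into(new GlobalWindows())
            .triggering(DefaultTrigger.of())
            .withAllowedLateness(Duration.ZERO)
            .discardingFiredPanes());
{code}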

The job claims it is writing 391 files, yet after a "successful" run only 120 
are present in the destination.

Adding {noformat}.withIgnoreWindowing(){noformat} did work around this, 
however: all 391 files were present in the destination.
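
Concretely, the only change was the extra call shown below (withIgnoreWindowing() is deprecated, so I assume it bypasses the windowed-write path rather than fixing the underlying issue):

{code:java}
// Same write as above; the single (deprecated) withIgnoreWindowing() call is
// the only difference, and with it all 391 files showed up.
foldersToValues.apply(FileIO.<String, KV<String, String>>writeDynamic()
    .by(kv -> kv.getKey())
    .via(Contextful.fn(kv -> kv.getValue()), TextIO.sink())
    .withNaming(folder -> FileIO.Write.defaultNaming(folder, ".json"))
    .withCompression(Compression.GZIP)
    .withDestinationCoder(StringUtf8Coder.of())
    .withIgnoreWindowing()
    .to(destination));
{code}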

> FileIO writeDynamic with AvroIO.sink not writing all data
> ---------------------------------------------------------
>
>                 Key: BEAM-10100
>                 URL: https://issues.apache.org/jira/browse/BEAM-10100
>             Project: Beam
>          Issue Type: Bug
>          Components: io-java-avro, io-java-files, runner-flink, runner-spark
>    Affects Versions: 2.16.0, 2.17.0, 2.18.0, 2.19.0, 2.20.0, 2.21.0, 2.22.0
>         Environment: Mac OSX Catalina, tested with SparkRunner - Spark 2.4.5.
>            Reporter: Dave Martin
>            Priority: P1
>              Labels: p1
>
> FileIO writeDynamic with AvroIO.sink is not writing all data in the following 
> pipeline. The amount of data written varies between runs, but records are 
> consistently dropped. This is with a very small test dataset: 6 records, 
> which should produce 3 directories.
> {code:java}
> Pipeline p = Pipeline.create(options);
> PCollection<KV<String, AvroRecord>> records =
>     p.apply(TextIO.read().from("/tmp/input.csv"))
>         .apply(ParDo.of(new StringToDatasetIDAvroRecordFcn()));
> // write out Avro into a separate directory per dataset
> records.apply("Write avro file per dataset",
>     FileIO.<String, KV<String, AvroRecord>>writeDynamic()
>         .by(KV::getKey)
>         .via(Contextful.fn(KV::getValue),
>             Contextful.fn(x -> AvroIO.sink(AvroRecord.class).withCodec(BASE_CODEC)))
>         .to(options.getTargetPath())
>         .withDestinationCoder(StringUtf8Coder.of())
>         .withNaming(key -> defaultNaming(key + "/export",
>             PipelinesVariables.Pipeline.AVRO_EXTENSION)));
> p.run().waitUntilFinish();
> {code}
> If I replace AvroIO.sink() with TextIO.sink() (and swap the initial mapping 
> function accordingly), the correct number of records is written to the 
> separate directories. This works consistently.
> e.g.
> {code:java}
> // Initialise pipeline
> Pipeline p = Pipeline.create(options);
> PCollection<KV<String, String>> records =
>     p.apply(TextIO.read().from("/tmp/input.csv"))
>         .apply(ParDo.of(new StringToDatasetIDKVFcn()));
> // write out CSV into a separate directory per dataset
> records.apply("Write CSV file per dataset",
>     FileIO.<String, KV<String, String>>writeDynamic()
>         .by(KV::getKey)
>         .via(Contextful.fn(KV::getValue), TextIO.sink())
>         .to(options.getTargetPath())
>         .withDestinationCoder(StringUtf8Coder.of())
>         .withNaming(datasetID -> defaultNaming(datasetID + "/export", ".csv")));
> p.run().waitUntilFinish();
> {code}
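> A quick way to quantify the drop (sketch only: AvroIO.read and Count are 
> standard Beam, but the glob below assumes the export files land under 
> targetPath/<datasetID>/):
> {code:java}
> // Hypothetical check: read the written Avro back and log the record count,
> // to compare against the 6 input records.
> Pipeline verify = Pipeline.create(options);
> verify
>     .apply(AvroIO.read(AvroRecord.class)
>         .from(options.getTargetPath() + "/*/export*"))
>     .apply(Count.globally())
>     .apply(ParDo.of(new DoFn<Long, Void>() {
>       @ProcessElement
>       public void processElement(@Element Long count) {
>         System.out.println("records written: " + count);
>       }
>     }));
> verify.run().waitUntilFinish();
> {code}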
> cc [~timrobertson100]


