[ https://issues.apache.org/jira/browse/BEAM-10100?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17539773#comment-17539773 ]

Yi Hu commented on BEAM-10100:
------------------------------

Maybe it is a documentation issue. What happens is that each output file may contain one 
or more records, depending on the sharding. I made the test below:

{code:java}
  public static void main(String[] argv) {
    PipelineOptions options =
        PipelineOptionsFactory.fromArgs(argv).withValidation().as(PipelineOptions.class);
    Pipeline p = Pipeline.create(options);

    // 1000 integers keyed by (x % 100), so each of the 100 keys gets 10 records
    PCollection<KV<String, String>> foldersToValues =
        p.apply(Create.of(Ints.asList(IntStream.range(0, 1000).toArray())))
            .apply(MapElements.into(
                    TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.strings()))
                .via((Integer x) -> KV.of(Integer.toString(x % 100), "[" + x + "]")));

    foldersToValues.apply(FileIO.<String, KV<String, String>>writeDynamic()
        .by(kv -> kv.getKey())
        .via(Contextful.fn(kv -> kv.getValue()), TextIO.sink())
        .withNaming(folder -> FileIO.Write.defaultNaming(folder, ".txt"))
        .withDestinationCoder(StringUtf8Coder.of())
        .to("/Users/yathu/dev/temp/"));

    p.run().waitUntilFinish();
  }
{code}

It should write 1000 records across 100 file names. Due to sharding, the file names 
have an infix like '00000-of-00005'. The sharding varies between runs, but if you 
gather the files that share the same name before the shard infix, you can see that 
all records are written.

I ran this code with the direct runner and the Flink runner; these are the file names 
I got: "0-00000-of-00001.txt", "1-00000-of-00001.txt" (containing 10 records per 
file), ...., "20-00000-of-00001.txt" (containing 1 record per file)

> FileIO writeDynamic with AvroIO.sink not writing all data
> ---------------------------------------------------------
>
>                 Key: BEAM-10100
>                 URL: https://issues.apache.org/jira/browse/BEAM-10100
>             Project: Beam
>          Issue Type: Bug
>          Components: io-java-avro, io-java-files, runner-flink, runner-spark
>    Affects Versions: 2.16.0, 2.17.0, 2.18.0, 2.19.0, 2.20.0, 2.21.0, 2.22.0
>         Environment: Mac OSX Catalina, tested with SparkRunner - Spark 2.4.5.
>            Reporter: Dave Martin
>            Priority: P1
>              Labels: p1
>
> FileIO writeDynamic with AvroIO.sink is not writing all data in the following 
> pipeline. The amount of data written varies between runs but it is 
> consistently dropping records. This is with a very small test dataset - 6 
> records, which should produce 3 directories.
> {code:java}
> Pipeline p = Pipeline.create(options);
> PCollection<KV<String, AvroRecord>> records =
>     p.apply(TextIO.read().from("/tmp/input.csv"))
>         .apply(ParDo.of(new StringToDatasetIDAvroRecordFcn()));
> // write out into AVRO in each separate directory
> records.apply("Write avro file per dataset", FileIO.<String, KV<String, AvroRecord>>writeDynamic()
>   .by(KV::getKey)
>   .via(Contextful.fn(KV::getValue), Contextful.fn(x -> AvroIO.sink(AvroRecord.class).withCodec(BASE_CODEC)))
>   .to(options.getTargetPath())
>   .withDestinationCoder(StringUtf8Coder.of())
>   .withNaming(key -> defaultNaming(key + "/export", PipelinesVariables.Pipeline.AVRO_EXTENSION)));
> p.run().waitUntilFinish();
> {code}
> If I replace AvroIO.sink() with TextIO.sink() (and replace the initial mapping 
> function), then the correct number of records is written to the separate 
> directories. This works consistently.
> e.g.
> {code:java}
> // Initialise pipeline
> Pipeline p = Pipeline.create(options);
> PCollection<KV<String, String>> records =
>     p.apply(TextIO.read().from("/tmp/input.csv"))
>         .apply(ParDo.of(new StringToDatasetIDKVFcn()));
> // write out into CSV in each separate directory
> records.apply("Write CSV file per dataset", FileIO.<String, KV<String, String>>writeDynamic()
>     .by(KV::getKey)
>     .via(Contextful.fn(KV::getValue), TextIO.sink())
>     .to(options.getTargetPath())
>     .withDestinationCoder(StringUtf8Coder.of())
>     .withNaming(datasetID -> defaultNaming(datasetID + "/export", ".csv")));
> p.run().waitUntilFinish();
> {code}
> cc [~timrobertson100]



--
This message was sent by Atlassian Jira
(v8.20.7#820007)
