damccorm opened a new issue, #20868:
URL: https://github.com/apache/beam/issues/20868

   Filing a bug as suggested on 
[StackOverflow](https://stackoverflow.com/questions/67163902/writedynamic-in-apache-beam-on-emr-spark-is-not-finishing-writes-to-s3-leaves-o?noredirect=1#comment118755306_67163902).
   
   I have a bounded PCollection and would like to persist the output to S3 
bucket with dynamic file naming scheme. Unfortunately, when running on EMR & 
Spark Runner (tried emr-6.2.0/Spark 3.0.1/Beam 2.28.0 and emr-5.30.1/Spark 
2.4.5/Beam 2.14.0), after all steps in the pipeline finish and cluster 
terminates, output on S3 contains only some of expected contents in main output 
directory, most of it though is placed in .temp-beam folder and never moved to 
the main output dir. Most of it = 90% of expected lines are not persisted in 
correctly named files and spot checks indicate the expected lines are in files 
inside .temp-beam folder. 
   Here's a relevant pipeline declaration part:
   ```
   
   PCollection<SomeObject> input; // is a bounded PCollection 
    
   FileIO.Write<String, SomeObject> write
   = FileIO.<String, SomeObject>writeDynamic()
    .by(SomeObject::key)
    .withDestinationCoder(StringUtf8Coder.of())
   
   .withCompression(Compression.GZIP)
    .withNaming((SerializableFunction<String, FileIO.Write.FileNaming>)
   key
    -> (FileIO.Write.FileNaming) (window, pane, numShards, shardIndex, 
compression)
    -> String.format("some_object_%s_%d.csv.gz",
   key, shardIndex))
    .via(Contextful.fn(SomeObject::toCsvLine), Contextful.fn(x -> 
TextIO.sink().withHeader(SomeObject.HEADER)))
   
   .to("s3://some-bucket/some-output-path");
   input.apply("write-a-pcollection", write);
   
   ```
   
   With this code I get an S3 bucket that looks like:
   * some_object_key1_0.csv.gz
   * some_object_key1_1.csv.gz
   * some_object_key2_0.csv.gz
   * some_object_key3_0.csv.gz
   * .temp-beam-\<uuid\> with 90% of the expected content persisted inside 
objects named with random uuids
   
   However, when I add `.withIgnoreWindowing()` to the writeDynamic 
configuration, output seems to be fully correct and no .temp-beam directory is 
left over. This method is deprecated though and no replacement has been ever 
provided in the javadocs (at least I couldn't find any). 
   
   Input PCollection does not contain any windowing and it does not solve the 
issue, when I force global windowing right before the writeDynamic transform.
   
   This might be a bug in Beam or a consideration to leave out 
`.withIgnoreWindowing()` in the future versions. Kindly please investigate the 
issue.
   
    
   
   Imported from Jira 
[BEAM-12202](https://issues.apache.org/jira/browse/BEAM-12202). Original Jira 
may contain additional context.
   Reported by: pawelw.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to