Pawel Walczak created BEAM-12202:
------------------------------------

             Summary: writeDynamic on EMR/Spark is not finishing writes to S3, leaves out temp files
                 Key: BEAM-12202
                 URL: https://issues.apache.org/jira/browse/BEAM-12202
             Project: Beam
          Issue Type: Bug
          Components: io-java-files, runner-spark
    Affects Versions: 2.28.0, 2.14.0
            Reporter: Pawel Walczak


Filing a bug as suggested on 
[StackOverflow|https://stackoverflow.com/questions/67163902/writedynamic-in-apache-beam-on-emr-spark-is-not-finishing-writes-to-s3-leaves-o?noredirect=1#comment118755306_67163902].

I have a bounded PCollection and would like to persist the output to an S3 bucket 
with a dynamic file naming scheme. Unfortunately, when running on EMR with the 
Spark runner (tried emr-6.2.0/Spark 3.0.1/Beam 2.28.0 and emr-5.30.1/Spark 
2.4.5/Beam 2.14.0), after all steps in the pipeline finish and the cluster 
terminates, the main output directory on S3 contains only some of the expected 
content. Most of it is instead placed in the .temp-beam folder and never moved 
to the main output directory: about 90% of the expected lines are not persisted 
in correctly named files, and spot checks indicate that those lines are in files 
inside the .temp-beam folder. 
Here is the relevant part of the pipeline declaration:
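{noformat}
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.Compression;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.transforms.Contextful;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.PCollection;

PCollection<SomeObject> input; // a bounded PCollection

// One set of gzipped CSV files per key, named some_object_<key>_<shardIndex>.csv.gz
FileIO.Write<String, SomeObject> write = FileIO.<String, SomeObject>writeDynamic()
    .by(SomeObject::key)
    .withDestinationCoder(StringUtf8Coder.of())
    .withCompression(Compression.GZIP)
    .withNaming((SerializableFunction<String, FileIO.Write.FileNaming>) key ->
        (FileIO.Write.FileNaming) (window, pane, numShards, shardIndex, compression) ->
            String.format("some_object_%s_%d.csv.gz", key, shardIndex))
    .via(Contextful.fn(SomeObject::toCsvLine),
         Contextful.fn(x -> TextIO.sink().withHeader(SomeObject.HEADER)))
    .to("s3://some-bucket/some-output-path");
input.apply("write-a-pcollection", write);
{noformat}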
With this code, the output location on S3 looks like this:
* some_object_key1_0.csv.gz
* some_object_key1_1.csv.gz
* some_object_key2_0.csv.gz
* some_object_key3_0.csv.gz
* .temp-beam-\<uuid>, holding about 90% of the expected content in objects named with random UUIDs
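To quantify the symptom, a listing of the output prefix can be split into finalized files and leftovers under the .temp-beam- prefix. The following is a minimal plain-Java sketch, not part of Beam: the class OutputListingCheck and its classify method are hypothetical, and it assumes the object keys have already been listed (for example with the AWS CLI or SDK) and made relative to the output path. The pattern mirrors the String.format naming used in the pipeline above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

/**
 * Hypothetical diagnostic helper: splits object keys under the output path into
 * finalized files, leftovers under a ".temp-beam-" prefix, and anything else.
 */
public class OutputListingCheck {

    // Mirrors String.format("some_object_%s_%d.csv.gz", key, shardIndex)
    static final Pattern FINAL_NAME =
        Pattern.compile("some_object_(.+)_(\\d+)\\.csv\\.gz");

    static final class Summary {
        final List<String> finalized = new ArrayList<>();
        final List<String> leftoverTemp = new ArrayList<>();
        final List<String> other = new ArrayList<>();
    }

    /** Keys are assumed relative to the output path, e.g. "some_object_key1_0.csv.gz". */
    static Summary classify(List<String> keys) {
        Summary s = new Summary();
        for (String key : keys) {
            if (key.startsWith(".temp-beam-")) {
                s.leftoverTemp.add(key);      // temp output never moved to its final name
            } else if (FINAL_NAME.matcher(key).matches()) {
                s.finalized.add(key);         // matches the naming scheme
            } else {
                s.other.add(key);
            }
        }
        return s;
    }

    public static void main(String[] args) {
        List<String> listing = List.of(
            "some_object_key1_0.csv.gz",
            "some_object_key1_1.csv.gz",
            ".temp-beam-0000/1111",           // placeholder UUIDs
            "unrelated.txt");
        Summary s = classify(listing);
        // prints finalized=2 leftoverTemp=1 other=1
        System.out.println("finalized=" + s.finalized.size()
            + " leftoverTemp=" + s.leftoverTemp.size()
            + " other=" + s.other.size());
    }
}
```

Counting objects is only a first signal; since the report measures the loss in lines, comparing decompressed line counts between the two groups would give the closer figure.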

However, when I add `.withIgnoreWindowing()` to the writeDynamic configuration, 
the output appears to be complete and correct, and no .temp-beam directory is 
left over. This method is deprecated, though, and the javadocs do not name a 
replacement (at least I could not find one). 

The input PCollection has no windowing applied, and forcing global windowing 
right before the writeDynamic transform does not solve the issue.

This might be a bug in Beam, or an argument for reconsidering the removal of 
`.withIgnoreWindowing()` in future versions. Please investigate the issue.

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
