[ https://issues.apache.org/jira/browse/BEAM-3268?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16442731#comment-16442731 ]

Eugene Kirpichov commented on BEAM-3268:
----------------------------------------

They are, if it's fused with another ParDo: c.output(x) passes x through the
whole fusion tree immediately, rather than buffering it until the current
ProcessElement call is complete.
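To illustrate the point above, here is a hypothetical, plain-Java model of two fused steps. It is not Beam or Dataflow code: FusionDemo, Step and runFused are made-up names, and the runner's real fusion machinery is far more involved. The sketch only shows that, when steps are fused, the downstream step runs inside the upstream output(x) call, so anything the upstream step does after emitting x happens after the downstream step has already seen x.

{code:java}
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

/**
 * Hypothetical, simplified model of two fused ParDo steps (not Beam code).
 * In a fused stage, output(x) hands x straight to the next step instead of
 * buffering it until the current processElement call returns.
 */
class FusionDemo {

  /** Made-up stand-in for a DoFn: process one element, emit via output. */
  interface Step {
    void processElement(String element, Consumer<String> output);
  }

  /** Runs upstream fused with downstream and records the order of events. */
  static List<String> runFused(List<String> input) {
    List<String> log = new ArrayList<>();

    Step downstream = (element, output) -> log.add("downstream saw " + element);

    Step upstream = (element, output) -> {
      log.add("upstream start " + element);
      output.accept(element); // runs the downstream step right now, synchronously
      log.add("upstream finish " + element);
    };

    for (String element : input) {
      // Fusion: the upstream output callback directly invokes the downstream step.
      upstream.processElement(element, x -> downstream.processElement(x, y -> {}));
    }
    return log;
  }

  public static void main(String[] args) {
    // Prints, one per line: upstream start a, downstream saw a, upstream finish a
    runFused(Arrays.asList("a")).forEach(System.out::println);
  }
}
{code}

Note that "downstream saw a" is logged before "upstream finish a": in this model the downstream step never waits for the upstream call to return.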

> getPerDestinationOutputFilenames() is getting processed before write is 
> finished on dataflow runner
> ---------------------------------------------------------------------------------------------------
>
>                 Key: BEAM-3268
>                 URL: https://issues.apache.org/jira/browse/BEAM-3268
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-dataflow
>    Affects Versions: 2.3.0
>            Reporter: Kamil Szewczyk
>            Assignee: Eugene Kirpichov
>            Priority: Major
>         Attachments: comparison.png
>
>          Time Spent: 1h
>  Remaining Estimate: 0h
>
> While running the file-based-io-tests we found the Dataflow runner misbehaving. We run
> the tests as a single pipeline, and without a Reshuffle between writing and reading,
> the Dataflow jobs fail because the runner tries to access files that have not been
> created yet.
> The attached picture compares how the write step is executed: on the left is the
> working example with a Reshuffle added, on the right the same pipeline without it.
> !comparison.png|thumbnail!
> Steps to reproduce: replace your-bucket-name with the name of a valid bucket.
> {code:java}
> mvn -e -Pio-it verify -pl sdks/java/io/file-based-io-tests \
>   -DintegrationTestPipelineOptions='["--runner=dataflow", "--filenamePrefix=gs://your-bucket-name/TEXTIO_IT"]' \
>   -Pdataflow-runner
> {code}
> Then check the Cloud Console: the job should fail.
> Now add a Reshuffle to
> sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/text/TextIOIT.java
> as in the example:
> {code:java}
> .getPerDestinationOutputFilenames().apply(Values.<String>create())
>         .apply(Reshuffle.<String>viaRandomKey());
>     PCollection<String> consolidatedHashcode = testFilenames
> {code}
> and run the same Maven command again to see the job succeed in the console.
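A simplified, hypothetical model of why the Reshuffle workaround helps: on Dataflow a Reshuffle (which groups by key internally) breaks fusion, so, roughly speaking, the step after it runs in a separate stage that consumes materialized output rather than being called from inside the upstream output(x). The sketch models that barrier with an in-memory buffer; BarrierDemo, Step and runWithBarrier are made-up names, not Beam APIs, and the real shuffle is distributed and far more involved.

{code:java}
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

/**
 * Hypothetical, simplified model of a fusion break such as a Reshuffle
 * (not Beam code). The upstream stage runs to completion and its output is
 * buffered; only then does the downstream stage consume it.
 */
class BarrierDemo {

  /** Made-up stand-in for a DoFn: process one element, emit via output. */
  interface Step {
    void processElement(String element, Consumer<String> output);
  }

  /** Runs upstream, then downstream, separated by a buffer; records event order. */
  static List<String> runWithBarrier(List<String> input) {
    List<String> log = new ArrayList<>();

    Step downstream = (element, output) -> log.add("downstream saw " + element);

    Step upstream = (element, output) -> {
      log.add("upstream start " + element);
      output.accept(element); // only appends to the buffer below
      log.add("upstream finish " + element);
    };

    // Stage 1: run every upstream call; the buffer stands in for the shuffle.
    List<String> buffer = new ArrayList<>();
    for (String element : input) {
      upstream.processElement(element, buffer::add);
    }

    // Stage 2: the downstream step starts only after stage 1 has completed.
    for (String element : buffer) {
      downstream.processElement(element, ignored -> {});
    }
    return log;
  }

  public static void main(String[] args) {
    runWithBarrier(Arrays.asList("a", "b")).forEach(System.out::println);
  }
}
{code}

In this model every "upstream finish" entry is logged before any "downstream saw" entry, which is the ordering the reporter needed between writing the files and reading them back.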



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
