Niel Markwick created BEAM-6407:
-----------------------------------

             Summary: regression: FileIO.writeDynamic() with side inputs fails in DirectRunner
                 Key: BEAM-6407
                 URL: https://issues.apache.org/jira/browse/BEAM-6407
             Project: Beam
          Issue Type: Bug
          Components: beam-model
    Affects Versions: 2.9.0
            Reporter: Niel Markwick
            Assignee: Kenneth Knowles


When FileIO.writeDynamic() is used with automatic sharding and a Contextful.Fn
that uses side inputs for file naming, the DirectRunner (and TestPipeline) fails
with:

{{java.lang.IllegalStateException: All PCollectionViews that are consumed must 
be written by some WriteView PTransform: Missing [<unnamed> 
[RunnerPCollectionView]]}}
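
The message describes an invariant checked when the pipeline runs on the DirectRunner: every PCollectionView that is consumed as a side input must have a corresponding WriteView step that produces it. The sketch below is a simplified, hypothetical model of that check in plain Java. The class and method names are invented, views are represented as strings, and it is not the DirectRunner's actual code. The second call mimics the reported case, where an unnamed view is consumed but no WriteView produces it.

```java
import java.util.LinkedHashSet;
import java.util.Set;

/**
 * Hypothetical model of the "consumed views must be written" invariant.
 * Views are plain strings; this is NOT Beam or DirectRunner code.
 */
class ViewCheckSketch {

  /** Throws IllegalStateException listing every consumed view that has no writer. */
  static void checkAllViewsWritten(Set<String> consumedViews, Set<String> writtenViews) {
    // Set difference: consumed views minus written views.
    Set<String> missing = new LinkedHashSet<>(consumedViews);
    missing.removeAll(writtenViews);
    if (!missing.isEmpty()) {
      throw new IllegalStateException(
          "All PCollectionViews that are consumed must be written by some WriteView"
              + " PTransform: Missing " + missing);
    }
  }

  public static void main(String[] args) {
    // The side input is both written and consumed, so the check passes.
    checkAllViewsWritten(Set.of("outputDir"), Set.of("outputDir"));

    // A consumed view with no matching writer (hypothetical name) makes the check fail.
    try {
      checkAllViewsWritten(Set.of("outputDir", "unnamedRunnerView"), Set.of("outputDir"));
    } catch (IllegalStateException e) {
      System.out.println(e.getMessage());
    }
  }
}
```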

Example code: 

{{PCollectionView<String> outputDirectoryname = pipeline.apply("outputDir", Create.of("/tmp/testout")).apply(View.asSingleton());}}

{{Contextful.Fn<String, FileIO.Write.FileNaming> manifestNaming = (element, c) -> (window, pane, numShards, shardIndex, compression) -> c.sideInput(outputDirectoryname);}}
{{pipeline.apply("input", Create.of("a", "b", "c")) // placeholder input: FileIO.Write must be applied to a PCollection<String>}}
{{   .apply(FileIO.<String, String>writeDynamic()}}
{{   .by(SerializableFunctions.constant(""))}}
{{   .withDestinationCoder(StringUtf8Coder.of())}}
{{   .via(TextIO.sink())}}
{{   .withTempDirectory("/tmp")}}
{{   .withNaming(Contextful.of(}}
{{      manifestNaming,}}
{{      Requirements.requiresSideInputs(outputDirectoryname))));}}

This does not occur with the Dataflow runner.

It does not occur if the Contextful.Fn is not given side inputs.

It does not occur if withNumShards(1) is set.

It did not occur in 2.8.0; it does in 2.9.0.

The cause appears to be the DirectRunner's transform overrides, which rewrite
FileIO writes to use runner-determined sharding (see [DirectRunner.java line 226|https://github.com/apache/beam/blob/master/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java#L226]),
but I do not know why this started occurring in 2.9.0.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
