Mike Pedersen created BEAM-5098:
-----------------------------------

             Summary: Combine.Globally::asSingletonView clears side inputs
                 Key: BEAM-5098
                 URL: https://issues.apache.org/jira/browse/BEAM-5098
             Project: Beam
          Issue Type: Bug
          Components: beam-model
    Affects Versions: 2.5.0
            Reporter: Mike Pedersen
            Assignee: Kenneth Knowles


It seems like calling .asSingletonView() on a Combine.Globally transform clears all side inputs registered with .withSideInputs(). Take this code, for example:

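{code:java}
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.CombineWithContext;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;

public class Main {
    public static void main(String[] args) {
        PipelineOptions options = PipelineOptionsFactory.create();
        Pipeline p = Pipeline.create(options);

        PCollection<Integer> a = p.apply(Create.of(1, 2, 3));
        // Singleton side input holding the value 10.
        PCollectionView<Integer> b = p.apply(Create.of(10)).apply(View.asSingleton());

        a
                .apply(Combine.globally(new CombineWithContext.CombineFnWithContext<Integer, Integer, Integer>() {
                    @Override
                    public Integer createAccumulator(CombineWithContext.Context c) {
                        // Reads the side input registered via withSideInputs(b) below.
                        return c.sideInput(b);
                    }

                    @Override
                    public Integer addInput(Integer accumulator, Integer input, CombineWithContext.Context c) {
                        return accumulator + input;
                    }

                    @Override
                    public Integer mergeAccumulators(Iterable<Integer> accumulators, CombineWithContext.Context c) {
                        int sum = 0;
                        for (int i : accumulators) {
                            sum += i;
                        }
                        return sum;
                    }

                    @Override
                    public Integer extractOutput(Integer accumulator, CombineWithContext.Context c) {
                        return accumulator;
                    }

                    @Override
                    public Integer defaultValue() {
                        return 0;
                    }
                }).withSideInputs(b).asSingletonView()); // side input b is reported as unknown at run time

        p.run().waitUntilFinish();
    }
}{code}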
Running this on the DirectRunner fails with the following exception:
{code:java}
Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.IllegalArgumentException: calling sideInput() with unknown view
    at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:349)
    at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:319)
    at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:210)
    at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:66)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:311)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:297)
    at Main.main(Main.java:287)
Caused by: java.lang.IllegalArgumentException: calling sideInput() with unknown view
    at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.SimpleDoFnRunner.sideInput(SimpleDoFnRunner.java:212)
    at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:69)
    at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.SimpleDoFnRunner$DoFnProcessContext.sideInput(SimpleDoFnRunner.java:489)
    at org.apache.beam.sdk.transforms.Combine$GroupedValues$1$1.sideInput(Combine.java:2137)
    at Main$1.createAccumulator(Main.java:258)
    at Main$1.createAccumulator(Main.java:255)
    at org.apache.beam.sdk.transforms.CombineWithContext$CombineFnWithContext.apply(CombineWithContext.java:120)
    at org.apache.beam.sdk.transforms.Combine$GroupedValues$1.processElement(Combine.java:2129){code}
But if you change
{code:java}
.withSideInputs(b).asSingletonView()){code}
to
{code:java}
.withSideInputs(b)).apply(View.asSingleton()){code}
then it works just fine.
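One plausible explanation, not confirmed by this report, is that asSingletonView() builds a new transform object without copying the side inputs collected by withSideInputs(). The plain-Java sketch models that pattern with hypothetical stand-in classes (SideInputDropDemo, Globally, AsSingletonView, asSingletonViewBuggy, asSingletonViewFixed); they are assumptions for illustration only and are not Beam's actual classes. The exception message is copied from the trace purely to mirror the reported symptom.

{code:java}
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical model of the suspected bug pattern; these are NOT Beam's classes.
public class SideInputDropDemo {

    // Stand-in for Combine.Globally: an immutable builder that records side inputs.
    static final class Globally {
        final List<String> sideInputs;

        Globally(List<String> sideInputs) {
            this.sideInputs = Collections.unmodifiableList(new ArrayList<>(sideInputs));
        }

        // Returns a copy with the extra views appended, like a fluent withSideInputs(...).
        Globally withSideInputs(String... views) {
            List<String> merged = new ArrayList<>(sideInputs);
            Collections.addAll(merged, views);
            return new Globally(merged);
        }

        // Suspected pattern: the derived transform is created without the side inputs.
        AsSingletonView asSingletonViewBuggy() {
            return new AsSingletonView(Collections.<String>emptyList());
        }

        // Corrected pattern: the recorded side inputs are forwarded.
        AsSingletonView asSingletonViewFixed() {
            return new AsSingletonView(sideInputs);
        }
    }

    // Stand-in for the transform that asSingletonView() returns.
    static final class AsSingletonView {
        final List<String> sideInputs;

        AsSingletonView(List<String> sideInputs) {
            this.sideInputs = Collections.unmodifiableList(new ArrayList<>(sideInputs));
        }

        // Mirrors the runner-side check seen in the stack trace.
        String sideInput(String view) {
            if (!sideInputs.contains(view)) {
                throw new IllegalArgumentException("calling sideInput() with unknown view");
            }
            return view;
        }
    }

    public static void main(String[] args) {
        Globally combine = new Globally(Collections.<String>emptyList()).withSideInputs("b");

        // Forwarding the side inputs lets the view be resolved.
        System.out.println("fixed: " + combine.asSingletonViewFixed().sideInput("b"));

        // Dropping them reproduces the reported error message.
        try {
            combine.asSingletonViewBuggy().sideInput("b");
        } catch (IllegalArgumentException e) {
            System.out.println("buggy: " + e.getMessage());
        }
    }
}{code}

Running main prints "fixed: b" followed by "buggy: calling sideInput() with unknown view". Under this assumption the workaround would also make sense: applying View.asSingleton() as a separate step leaves the Combine.Globally instance, side inputs included, untouched.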

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
