[ https://issues.apache.org/jira/browse/BEAM-5098?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Mike Pedersen resolved BEAM-5098.
---------------------------------
Resolution: Fixed
Fix Version/s: 2.7.0
> Combine.Globally::asSingletonView clears side inputs
> ----------------------------------------------------
>
> Key: BEAM-5098
> URL: https://issues.apache.org/jira/browse/BEAM-5098
> Project: Beam
> Issue Type: Bug
> Components: sdk-java-core
> Affects Versions: 2.5.0
> Reporter: Mike Pedersen
> Assignee: Mike Pedersen
> Priority: Critical
> Labels: beginner, starter, triaged
> Fix For: 2.7.0
>
> Time Spent: 1.5h
> Remaining Estimate: 0h
>
> Calling .asSingletonView() on Combine.Globally appears to drop every side input
> registered with .withSideInputs(...). Take this code, for example:
>
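> {code:java}
> import org.apache.beam.sdk.Pipeline;
> import org.apache.beam.sdk.options.PipelineOptions;
> import org.apache.beam.sdk.options.PipelineOptionsFactory;
> import org.apache.beam.sdk.transforms.Combine;
> import org.apache.beam.sdk.transforms.CombineWithContext;
> import org.apache.beam.sdk.transforms.Create;
> import org.apache.beam.sdk.transforms.View;
> import org.apache.beam.sdk.values.PCollection;
> import org.apache.beam.sdk.values.PCollectionView;
>
> public class Main {
>   public static void main(String[] args) {
>     PipelineOptions options = PipelineOptionsFactory.create();
>     Pipeline p = Pipeline.create(options);
>
>     // Main input: 1, 2, 3.
>     PCollection<Integer> a = p.apply(Create.of(1, 2, 3));
>     // Singleton side input holding 10, used to seed the accumulator.
>     PCollectionView<Integer> b = p.apply(Create.of(10)).apply(View.asSingleton());
>
>     a.apply(
>         Combine.globally(
>                 new CombineWithContext.CombineFnWithContext<Integer, Integer, Integer>() {
>                   @Override
>                   public Integer createAccumulator(CombineWithContext.Context c) {
>                     // Reads the side input registered via withSideInputs(b) below.
>                     return c.sideInput(b);
>                   }
>
>                   @Override
>                   public Integer addInput(
>                       Integer accumulator, Integer input, CombineWithContext.Context c) {
>                     return accumulator + input;
>                   }
>
>                   @Override
>                   public Integer mergeAccumulators(
>                       Iterable<Integer> accumulators, CombineWithContext.Context c) {
>                     int sum = 0;
>                     for (int i : accumulators) {
>                       sum += i;
>                     }
>                     return sum;
>                   }
>
>                   @Override
>                   public Integer extractOutput(Integer accumulator, CombineWithContext.Context c) {
>                     return accumulator;
>                   }
>
>                   @Override
>                   public Integer defaultValue() {
>                     return 0;
>                   }
>                 })
>             .withSideInputs(b)
>             .asSingletonView());
>
>     p.run().waitUntilFinish();
>   }
> }
> {code}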
> This fails with the following exception:
> {code:java}
> Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.IllegalArgumentException: calling sideInput() with unknown view
>     at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:349)
>     at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:319)
>     at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:210)
>     at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:66)
>     at org.apache.beam.sdk.Pipeline.run(Pipeline.java:311)
>     at org.apache.beam.sdk.Pipeline.run(Pipeline.java:297)
>     at Main.main(Main.java:287)
> Caused by: java.lang.IllegalArgumentException: calling sideInput() with unknown view
>     at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.SimpleDoFnRunner.sideInput(SimpleDoFnRunner.java:212)
>     at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:69)
>     at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.SimpleDoFnRunner$DoFnProcessContext.sideInput(SimpleDoFnRunner.java:489)
>     at org.apache.beam.sdk.transforms.Combine$GroupedValues$1$1.sideInput(Combine.java:2137)
>     at Main$1.createAccumulator(Main.java:258)
>     at Main$1.createAccumulator(Main.java:255)
>     at org.apache.beam.sdk.transforms.CombineWithContext$CombineFnWithContext.apply(CombineWithContext.java:120)
>     at org.apache.beam.sdk.transforms.Combine$GroupedValues$1.processElement(Combine.java:2129)
> {code}
> But if you change
> {code:java}
> .withSideInputs(b).asSingletonView())
> {code}
> to
> {code:java}
> .withSideInputs(b)).apply(View.asSingleton())
> {code}
> then it works just fine.
>
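One plausible way the behavior in this report can arise is a convenience wrapper that rebuilds the inner combine transform without copying the side inputs registered on it, so the later sideInput() lookup no longer finds the view. The sketch below is a plain-Java model of that pattern with no Beam dependency. View, Context, GlobalSum, asSingletonViewBuggy and asSingletonViewFixed are hypothetical names, not Beam's API, and the sketch makes no claim about how the 2.7.0 fix was actually implemented; only the error message is copied from the stack trace above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java model (hypothetical names, no Beam dependency) of a wrapper
// transform that forgets to carry over its inner transform's side inputs.
public class SideInputPropagationModel {

    /** Hypothetical stand-in for a singleton side-input view. */
    public static final class View {
        final String name;
        final int value;

        public View(String name, int value) {
            this.name = name;
            this.value = value;
        }
    }

    /** Hypothetical execution context: only registered views can be read. */
    static final class Context {
        private final List<View> registered;

        Context(List<View> registered) {
            this.registered = registered;
        }

        int sideInput(View view) {
            if (!registered.contains(view)) {
                // Same message as the IllegalArgumentException in the report.
                throw new IllegalArgumentException("calling sideInput() with unknown view");
            }
            return view.value;
        }
    }

    /** Hypothetical global sum whose accumulator is seeded from a side input. */
    public static final class GlobalSum {
        private final View seed;
        private final List<View> sideInputs;

        public GlobalSum(View seed, List<View> sideInputs) {
            this.seed = seed;
            this.sideInputs = sideInputs;
        }

        /** Returns a copy that also registers the given views as side inputs. */
        public GlobalSum withSideInputs(View... views) {
            List<View> copy = new ArrayList<>(sideInputs);
            copy.addAll(Arrays.asList(views));
            return new GlobalSum(seed, copy);
        }

        /** Buggy wrapper: rebuilds the combine with an empty side-input list. */
        public GlobalSum asSingletonViewBuggy() {
            return new GlobalSum(seed, new ArrayList<View>());
        }

        /** Fixed wrapper: carries the registered side inputs over. */
        public GlobalSum asSingletonViewFixed() {
            return new GlobalSum(seed, new ArrayList<>(sideInputs));
        }

        /** Seeds the accumulator from the side input, then sums the inputs. */
        public int run(List<Integer> inputs) {
            Context c = new Context(sideInputs);
            int acc = c.sideInput(seed); // mirrors createAccumulator(c) in the report
            for (int x : inputs) {
                acc += x;
            }
            return acc;
        }
    }

    public static void main(String[] args) {
        View b = new View("b", 10);
        List<Integer> a = Arrays.asList(1, 2, 3);
        GlobalSum combine = new GlobalSum(b, new ArrayList<View>()).withSideInputs(b);

        System.out.println(combine.run(a));                        // 16
        System.out.println(combine.asSingletonViewFixed().run(a)); // 16
        try {
            combine.asSingletonViewBuggy().run(a);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage()); // calling sideInput() with unknown view
        }
    }
}
```

Run as-is, main prints 16 for the direct run and for the wrapper that keeps the side inputs, then the "unknown view" message for the wrapper that drops them. In this model, the report's workaround (applying the combine with withSideInputs(b) and then View.asSingleton() as a separate step) corresponds to the direct run: the side inputs stay attached to the transform that actually reads them.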