[ 
https://issues.apache.org/jira/browse/BEAM-1149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15749237#comment-15749237
 ] 

Daniel Halperin edited comment on BEAM-1149 at 12/14/16 7:30 PM:
-----------------------------------------------------------------

Yes. I think the "right" fix is more to point out that "has side inputs" 
implies "observesWindow = true", because the window of the main input is used 
when identifying the side input. Which is effectively what you have done in 
your change.


was (Author: dhalp...@google.com):
Yes. I think the "right" fix is more to point out that "has side inputs" 
implies "observesWindow = true". Which is effectively what you have done in 
your change.

> Side input access fails in direct runner (possibly others too) when input 
> element in multiple windows
> -----------------------------------------------------------------------------------------------------
>
>                 Key: BEAM-1149
>                 URL: https://issues.apache.org/jira/browse/BEAM-1149
>             Project: Beam
>          Issue Type: Bug
>            Reporter: Eugene Kirpichov
>            Assignee: Kenneth Knowles
>            Priority: Blocker
>             Fix For: 0.4.0-incubating
>
>
> {code:java}
>   private static class FnWithSideInputs extends DoFn<String, String> {
>     private final PCollectionView<Integer> view;
>     private FnWithSideInputs(PCollectionView<Integer> view) {
>       this.view = view;
>     }
>     @ProcessElement
>     public void processElement(ProcessContext c) {
>       c.output(c.element() + ":" + c.sideInput(view));
>     }
>   }
>   @Test
>   public void testSideInputsWithMultipleWindows() {
>     Pipeline p = TestPipeline.create();
>     MutableDateTime mutableNow = Instant.now().toMutableDateTime();
>     mutableNow.setMillisOfSecond(0);
>     Instant now = mutableNow.toInstant();
>     SlidingWindows windowFn =
>         
> SlidingWindows.of(Duration.standardSeconds(5)).every(Duration.standardSeconds(1));
>     PCollectionView<Integer> view = 
> p.apply(Create.of(1)).apply(View.<Integer>asSingleton());
>     PCollection<String> res =
>         p.apply(Create.timestamped(TimestampedValue.of("a", now)))
>             .apply(Window.<String>into(windowFn))
>             .apply(ParDo.of(new FnWithSideInputs(view)).withSideInputs(view));
>     PAssert.that(res).containsInAnyOrder("a:1");
>     p.run();
>   }
> {code}
> This fails with the following exception:
> {code}
> org.apache.beam.sdk.Pipeline$PipelineExecutionException: 
> java.lang.IllegalStateException: sideInput called when main input element is 
> in multiple windows
>       at 
> org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:343)
>       at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:1)
>       at org.apache.beam.sdk.Pipeline.run(Pipeline.java:176)
>       at org.apache.beam.sdk.testing.TestPipeline.run(TestPipeline.java:112)
>       at ....
> Caused by: java.lang.IllegalStateException: sideInput called when main input 
> element is in multiple windows
>       at 
> org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.sideInput(SimpleDoFnRunner.java:514)
>       at 
> org.apache.beam.sdk.transforms.ParDoTest$FnWithSideInputs.processElement(ParDoTest.java:738)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to