[ https://issues.apache.org/jira/browse/BEAM-1149?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Kenneth Knowles updated BEAM-1149:
----------------------------------
Component/s: runner-core
> Side input access fails in direct runner (possibly others too) when input element in multiple windows
> -----------------------------------------------------------------------------------------------------
>
> Key: BEAM-1149
> URL: https://issues.apache.org/jira/browse/BEAM-1149
> Project: Beam
> Issue Type: Bug
> Components: runner-core
> Reporter: Eugene Kirpichov
> Assignee: Kenneth Knowles
> Priority: Blocker
> Fix For: 0.4.0-incubating
>
>
> {code:java}
> private static class FnWithSideInputs extends DoFn<String, String> {
>   private final PCollectionView<Integer> view;
>
>   private FnWithSideInputs(PCollectionView<Integer> view) {
>     this.view = view;
>   }
>
>   @ProcessElement
>   public void processElement(ProcessContext c) {
>     c.output(c.element() + ":" + c.sideInput(view));
>   }
> }
>
> @Test
> public void testSideInputsWithMultipleWindows() {
>   Pipeline p = TestPipeline.create();
>   MutableDateTime mutableNow = Instant.now().toMutableDateTime();
>   mutableNow.setMillisOfSecond(0);
>   Instant now = mutableNow.toInstant();
>   SlidingWindows windowFn =
>       SlidingWindows.of(Duration.standardSeconds(5)).every(Duration.standardSeconds(1));
>   PCollectionView<Integer> view =
>       p.apply(Create.of(1)).apply(View.<Integer>asSingleton());
>   PCollection<String> res =
>       p.apply(Create.timestamped(TimestampedValue.of("a", now)))
>           .apply(Window.<String>into(windowFn))
>           .apply(ParDo.of(new FnWithSideInputs(view)).withSideInputs(view));
>   PAssert.that(res).containsInAnyOrder("a:1");
>   p.run();
> }
> {code}
> This fails with the following exception:
> {code}
> org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.IllegalStateException: sideInput called when main input element is in multiple windows
>   at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:343)
>   at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:1)
>   at org.apache.beam.sdk.Pipeline.run(Pipeline.java:176)
>   at org.apache.beam.sdk.testing.TestPipeline.run(TestPipeline.java:112)
>   at ....
> Caused by: java.lang.IllegalStateException: sideInput called when main input element is in multiple windows
>   at org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.sideInput(SimpleDoFnRunner.java:514)
>   at org.apache.beam.sdk.transforms.ParDoTest$FnWithSideInputs.processElement(ParDoTest.java:738)
> {code}
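
For context on why the main input element is in several windows at once: with a 5-second window sliding every 1 second, each timestamp falls into five overlapping windows, so the element reaching the ParDo belongs to multiple windows when sideInput is called. The following is a minimal plain-Java sketch of that assignment, not Beam's implementation. The class SlidingWindowSketch and the method windowStarts are hypothetical names used only for illustration, and the sketch assumes windows start at multiples of the period with zero offset.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper, not part of Beam: plain-Java sketch of sliding-window assignment. */
public class SlidingWindowSketch {

  /**
   * Returns the start (in millis) of every window [start, start + sizeMillis) that
   * contains timestampMillis, assuming windows begin at multiples of periodMillis.
   */
  public static List<Long> windowStarts(long timestampMillis, long sizeMillis, long periodMillis) {
    List<Long> starts = new ArrayList<>();
    // Latest window start at or before the timestamp.
    long lastStart = timestampMillis - Math.floorMod(timestampMillis, periodMillis);
    // Walk back one period at a time while the window still covers the timestamp.
    for (long start = lastStart; start > timestampMillis - sizeMillis; start -= periodMillis) {
      starts.add(start);
    }
    return starts;
  }

  public static void main(String[] args) {
    long now = 1_480_000_000_000L; // arbitrary timestamp on a whole second, as in the test
    List<Long> starts = windowStarts(now, 5_000L, 1_000L);
    System.out.println(starts.size() + " windows: " + starts);
  }
}
```

With the sizes from the test above (5000 ms window, 1000 ms period) the list has five entries, which is the multi-window situation the exception message refers to.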
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)