[ https://issues.apache.org/jira/browse/BEAM-1983?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15969724#comment-15969724 ]
Eugene Kirpichov commented on BEAM-1983:
----------------------------------------

One more thing to look out for in test coverage: also verify that windowed side inputs are accessible after a suspend/resume (i.e. when a timer fires). In that case the side inputs should be read in the proper window, namely the window of the timer.

> SDF should properly support windowed side inputs
> ------------------------------------------------
>
>                 Key: BEAM-1983
>                 URL: https://issues.apache.org/jira/browse/BEAM-1983
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-apex, runner-dataflow, runner-direct,
> runner-flink, sdk-java-core
>            Reporter: Eugene Kirpichov
>            Assignee: Eugene Kirpichov
>
> Currently there is no test coverage for Splittable DoFn (SDF) with windowed
> side inputs, especially when not all of the side input windows are ready.
> Moreover, the current implementation of SDF in the direct runner is
> definitely wrong: it uses a ParDoEvaluator to run the ProcessFn, and this
> ParDoEvaluator looks at the wrong windows to decide which windows are ready
> and which are not:
> https://github.com/apache/beam/blob/master/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java#L134
> The WindowedValue in question is a KeyedWorkItem (KWI), and KWIs are always
> in the global window, but the windows that matter are the windows of the
> elements inside this KWI's elementsIterable().
> The Flink implementation is also wrong in the same way.
> This JIRA is to:
> 1) add test coverage for this case
> 2) implement proper support in all runners
> I believe the easiest way to do 2) is to:
> - when the DoFn has side inputs, make SplittableParDo pre-explode the windows
> of elements before feeding them into GroupByKeyIntoKeyedWorkItems, so that
> every element in the resulting KWIs is in exactly one window
> - when the DoFn uses side inputs, tweak runners to look at the proper window
> (the element's own) while evaluating ProcessFn, and assert that there is
> only one window

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
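The fix proposed in the issue description (pre-explode windows, then judge side-input readiness by each element's single window rather than the KeyedWorkItem's global window), plus the timer-window point from the comment, can be sketched without any Beam dependency. This is a hypothetical model under stated assumptions: windows are plain strings, and `WindowedElement`, `explodeWindows`, `sideInputReady`, `readyNow` and `sideInputOnResume` are illustrative names, not Beam APIs and not the actual SplittableParDo or ProcessFn code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical, dependency-free model of the SDF side-input fix discussed in BEAM-1983.
// Windows are plain strings; none of these names are Beam APIs.
class SdfSideInputWindowSketch {

  // Stand-in for one element inside a KeyedWorkItem's elementsIterable():
  // a value plus the windows it belongs to.
  public static final class WindowedElement {
    public final String value;
    public final List<String> windows;

    public WindowedElement(String value, List<String> windows) {
      this.value = value;
      this.windows = List.copyOf(windows);
    }
  }

  // Proposed step 1: "pre-explode" windows so each copy of the element is in exactly one window.
  public static List<WindowedElement> explodeWindows(WindowedElement element) {
    List<WindowedElement> exploded = new ArrayList<>();
    for (String window : element.windows) {
      exploded.add(new WindowedElement(element.value, List.of(window)));
    }
    return exploded;
  }

  // Proposed step 2: judge side-input readiness by the element's own window
  // (never by the enclosing work item's global window), asserting there is only one.
  public static boolean sideInputReady(WindowedElement element, Set<String> readyWindows) {
    if (element.windows.size() != 1) {
      throw new IllegalStateException("expected exactly one window, got " + element.windows);
    }
    return readyWindows.contains(element.windows.get(0));
  }

  // Keeps only the exploded elements whose side-input window is ready; in a runner the
  // rest would be held back until their window becomes ready.
  public static List<WindowedElement> readyNow(
      List<WindowedElement> exploded, Set<String> readyWindows) {
    List<WindowedElement> ready = new ArrayList<>();
    for (WindowedElement e : exploded) {
      if (sideInputReady(e, readyWindows)) {
        ready.add(e);
      }
    }
    return ready;
  }

  // On resume (a timer firing), read the side input in the timer's window.
  public static String sideInputOnResume(String timerWindow, Map<String, String> sideInputByWindow) {
    String value = sideInputByWindow.get(timerWindow);
    if (value == null) {
      throw new IllegalStateException("side input not available in window " + timerWindow);
    }
    return value;
  }
}
```

Exploding first is what lets an element that lives in windows `w1` and `w2` make progress in `w1` while the side input for `w2` is still pending; without it, a single readiness check would have to cover all of the element's windows at once. The single-window check in `sideInputReady` mirrors the assertion the description proposes for runners evaluating ProcessFn.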