[ https://issues.apache.org/jira/browse/BEAM-1983?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Eugene Kirpichov updated BEAM-1983:
-----------------------------------
    Description: 
Currently there is no test coverage for Splittable DoFn (SDF) combined with
windowed side inputs, especially when not all of the side input windows are ready.

Moreover, the current implementation of SDF in the direct runner is definitely
wrong: it uses a ParDoEvaluator to run the ProcessFn, and that ParDoEvaluator
looks at the wrong windows when deciding which windows are ready and which are not:
https://github.com/apache/beam/blob/master/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java#L134
The WindowedValue in question wraps a KeyedWorkItem (KWI), and KWIs are always in
the global window, but the windows that matter are those of the elements inside
the KWI's elementsIterable().
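To make the problem concrete, here is a toy Java model of the readiness check. Everything in it is hypothetical: `Windowed`, `WorkItem`, `windowsCheckedBuggy`, `windowsCheckedCorrect` and `allReady` are simplified stand-ins invented for illustration, not Beam's WindowedValue, KeyedWorkItem or ParDoEvaluator code, and windows are modeled as plain strings. It only shows which windows get consulted: the wrapper's window (always the global window) versus the windows of the elements inside the work item.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.function.Predicate;

// Hypothetical, simplified model of the readiness check; these are NOT Beam classes.
class WrongWindowSketch {
  static final String GLOBAL_WINDOW = "GlobalWindow";

  /** Stand-in for WindowedValue: a value and the windows it is assigned to. */
  static final class Windowed<T> {
    final T value;
    final List<String> windows;

    Windowed(T value, List<String> windows) {
      this.value = value;
      this.windows = Collections.unmodifiableList(new ArrayList<>(windows));
    }
  }

  /** Stand-in for KeyedWorkItem: a key and the elements grouped under it. */
  static final class WorkItem<V> {
    final String key;
    final List<Windowed<V>> elements;

    WorkItem(String key, List<Windowed<V>> elements) {
      this.key = key;
      this.elements = elements;
    }
  }

  /** Buggy: consults the wrapper's windows, which for a work item is always the global window. */
  static Set<String> windowsCheckedBuggy(Windowed<? extends WorkItem<?>> wrapped) {
    return new TreeSet<>(wrapped.windows);
  }

  /** Intended: consults the windows of the elements inside the work item. */
  static Set<String> windowsCheckedCorrect(Windowed<? extends WorkItem<?>> wrapped) {
    Set<String> windows = new TreeSet<>();
    for (Windowed<?> element : wrapped.value.elements) {
      windows.addAll(element.windows);
    }
    return windows;
  }

  /** Side inputs count as ready only if they are ready in every consulted window. */
  static boolean allReady(Set<String> windows, Predicate<String> sideInputReadyIn) {
    return windows.stream().allMatch(sideInputReadyIn);
  }

  public static void main(String[] args) {
    Windowed<String> element = new Windowed<>("a", List.of("[0, 10)"));
    WorkItem<String> item = new WorkItem<>("k", List.of(element));
    Windowed<WorkItem<String>> wrapped = new Windowed<>(item, List.of(GLOBAL_WINDOW));
    // Toy readiness predicate: pretend side inputs are ready only for the global window.
    Predicate<String> ready = GLOBAL_WINDOW::equals;
    System.out.println("buggy check says ready: " + allReady(windowsCheckedBuggy(wrapped), ready));
    System.out.println("correct check says ready: " + allReady(windowsCheckedCorrect(wrapped), ready));
  }
}
```

With the toy predicate in `main`, the buggy check reports the side input as ready because it only ever sees the global window, while the element-level check reports it as not ready for `[0, 10)`.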

The Flink implementation is also wrong in the same way.

This JIRA is to:
1) add test coverage for this case
2) implement proper support in all runners

I believe the easiest way to do 2) is to:
- make SplittableParDo, if the DoFn has side inputs, pre-explode windows
before feeding elements into GroupByKeyIntoKeyedWorkItems, so that each
resulting KWI has elements in only a single window
- tweak runners to look at the proper window while evaluating ProcessFn, and
assert that there is only one window, if the DoFn uses side inputs
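A minimal sketch of the proposed approach, again using hypothetical stand-in types rather than Beam's API: `explodeWindows` splits an element assigned to several windows into one copy per window, `explodeThenGroup` models the grouping into work items, and `singleWindowOf` is the runner-side assertion that a work item spans exactly one window. The sketch assumes every exploded element gets its own unique key before grouping (the keys here are generated strings, an assumption of this model); that is what guarantees single-window work items in it.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical model of the proposed fix; none of these classes are Beam APIs.
class PreExplodeWindowsSketch {

  /** Stand-in for WindowedValue: a value plus the windows it is assigned to. */
  static final class Windowed<T> {
    final T value;
    final List<String> windows;

    Windowed(T value, List<String> windows) {
      this.value = value;
      this.windows = Collections.unmodifiableList(new ArrayList<>(windows));
    }
  }

  /** Stand-in for KeyedWorkItem: a key plus the elements grouped under it. */
  static final class WorkItem<V> {
    final String key;
    final List<Windowed<V>> elements;

    WorkItem(String key, List<Windowed<V>> elements) {
      this.key = key;
      this.elements = elements;
    }
  }

  /** Step 1: turn an element in N windows into N elements, each in exactly one window. */
  static <V> List<Windowed<V>> explodeWindows(Windowed<V> element) {
    List<Windowed<V>> out = new ArrayList<>();
    for (String w : element.windows) {
      out.add(new Windowed<>(element.value, Collections.singletonList(w)));
    }
    return out;
  }

  /** Step 2: explode first, then group by a (hypothetical) unique key per exploded element. */
  static <V> List<WorkItem<V>> explodeThenGroup(List<Windowed<V>> input) {
    Map<String, List<Windowed<V>>> byKey = new LinkedHashMap<>();
    int nextKey = 0;
    for (Windowed<V> element : input) {
      for (Windowed<V> single : explodeWindows(element)) {
        String key = "key-" + nextKey++;
        byKey.computeIfAbsent(key, k -> new ArrayList<>()).add(single);
      }
    }
    List<WorkItem<V>> items = new ArrayList<>();
    for (Map.Entry<String, List<Windowed<V>>> e : byKey.entrySet()) {
      items.add(new WorkItem<>(e.getKey(), e.getValue()));
    }
    return items;
  }

  /** Step 3 (runner side): the one window to use for side input readiness, or fail loudly. */
  static String singleWindowOf(WorkItem<?> item) {
    Set<String> windows = new LinkedHashSet<>();
    for (Windowed<?> element : item.elements) {
      windows.addAll(element.windows);
    }
    if (windows.size() != 1) {
      throw new IllegalStateException(
          "Expected exactly one window in work item " + item.key + " but found " + windows);
    }
    return windows.iterator().next();
  }

  public static void main(String[] args) {
    List<Windowed<String>> input = new ArrayList<>();
    input.add(new Windowed<>("a", List.of("[0, 10)", "[5, 15)"))); // e.g. overlapping windows
    input.add(new Windowed<>("b", List.of("[10, 20)")));
    for (WorkItem<String> item : explodeThenGroup(input)) {
      System.out.println(item.key + " -> " + singleWindowOf(item));
    }
  }
}
```

Running `main` prints one line per work item, each mapped to exactly one window. A work item that still mixes windows makes `singleWindowOf` throw an IllegalStateException instead of silently picking one, which mirrors the assertion proposed above.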


  was:
Currently there is no test coverage for Splittable DoFn (SDF) combined with
windowed side inputs, especially when not all of the side input windows are ready.

Moreover, the current implementation of SDF in the direct runner is definitely
wrong: it uses a ParDoEvaluator to run the ProcessFn, and that ParDoEvaluator
looks at the wrong windows when deciding which windows are ready and which are not:
https://github.com/apache/beam/blob/master/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java#L134
The WindowedValue in question wraps a KeyedWorkItem (KWI), and KWIs are always in
the global window, but the windows that matter are those of the elements inside
the KWI's elementsIterable().

This JIRA is to:
1) add test coverage for this case
2) implement proper support in all runners

I believe the easiest way to do 2) is to:
- make SplittableParDo, if the DoFn has side inputs, pre-explode windows
before feeding elements into GroupByKeyIntoKeyedWorkItems, so that each
resulting KWI has elements in only a single window
- tweak runners to look at the proper window while evaluating ProcessFn, and
assert that there is only one window, if the DoFn uses side inputs



> SDF should properly support windowed side inputs
> ------------------------------------------------
>
>                 Key: BEAM-1983
>                 URL: https://issues.apache.org/jira/browse/BEAM-1983
>             Project: Beam
>          Issue Type: New Feature
>          Components: runner-apex, runner-dataflow, runner-direct, runner-flink, sdk-java-core
>            Reporter: Eugene Kirpichov
>            Assignee: Eugene Kirpichov



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
