[ https://issues.apache.org/jira/browse/BEAM-696?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15589321#comment-15589321 ]

Daniel Halperin commented on BEAM-696:
--------------------------------------

Let me try to restate what Kenn said above, and see if that explains why I 
disagree with this proposal.

The model has defined {{Combine.perKey}} as the following composite: 
{{GroupByKey}} | {{Map\[preserve K, convert Iterable<V> into 
Combine(Iterable<V>)\]}}.
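
To make the composite definition concrete, here is a toy, in-memory model in plain Python. It is not Beam's API: {{group_by_key}} and {{combine_per_key}} are hypothetical helpers that only mirror the shape {{GroupByKey}} | {{Map}} (keep K, replace the Iterable<V> with its combined value).

```python
from collections import defaultdict
from typing import Any, Callable, Dict, Hashable, Iterable, List, Tuple


def group_by_key(pairs: Iterable[Tuple[Hashable, Any]]) -> Dict[Hashable, List[Any]]:
    """Toy GroupByKey: gather every value for a key into one list."""
    grouped: Dict[Hashable, List[Any]] = defaultdict(list)
    for key, value in pairs:
        grouped[key].append(value)
    return dict(grouped)


def combine_per_key(pairs: Iterable[Tuple[Hashable, Any]],
                    combine: Callable[[Iterable[Any]], Any]) -> Dict[Hashable, Any]:
    """Toy Combine.perKey as the composite GroupByKey | Map: keep each key and
    replace its Iterable<V> with combine(Iterable<V>)."""
    return {key: combine(values) for key, values in group_by_key(pairs).items()}


result = combine_per_key([("a", 1), ("b", 5), ("a", 3)], sum)
print(result)  # {'a': 4, 'b': 5}
```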

Now, we have also let users express a {{CombineFn}} as several separate steps, so 
that runners can "pre-combine" some of the inputs before the {{GroupByKey}}. As 
Amit has noted, this is an important optimization for nearly all runners.
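
A sketch of why the multi-part form enables pre-combining, assuming a toy {{MeanFn}} whose method names (create_accumulator, add_input, merge_accumulators, extract_output) mirror Beam's CombineFn but which is standalone Python, not the SDK class. Each bundle is folded into a small partial accumulator before the shuffle and the partials are merged afterwards; for a fn like this one, the result matches combining everything at once.

```python
from typing import Iterable, List, Sequence, Tuple

Acc = Tuple[float, int]  # (running sum, count)


class MeanFn:
    """Toy multi-part combine fn computing a mean (hypothetical, not Beam's)."""

    def create_accumulator(self) -> Acc:
        return (0.0, 0)

    def add_input(self, acc: Acc, value: float) -> Acc:
        return (acc[0] + value, acc[1] + 1)

    def merge_accumulators(self, accs: Iterable[Acc]) -> Acc:
        total, count = 0.0, 0
        for s, c in accs:
            total += s
            count += c
        return (total, count)

    def extract_output(self, acc: Acc) -> float:
        return acc[0] / acc[1] if acc[1] else float("nan")


def combine_all_at_once(fn: MeanFn, values: Iterable[float]) -> float:
    """The composite definition: gather all values, then combine once."""
    acc = fn.create_accumulator()
    for v in values:
        acc = fn.add_input(acc, v)
    return fn.extract_output(acc)


def combine_with_precombine(fn: MeanFn, bundles: Sequence[Sequence[float]]) -> float:
    """Pre-combine each bundle into a partial accumulator before the shuffle,
    then merge the (much smaller) partials and extract the output."""
    partials: List[Acc] = []
    for bundle in bundles:
        acc = fn.create_accumulator()
        for v in bundle:
            acc = fn.add_input(acc, v)
        partials.append(acc)
    return fn.extract_output(fn.merge_accumulators(partials))


bundles = [[1, 2, 3], [4, 5], [6]]
flat = [v for b in bundles for v in b]
print(combine_all_at_once(MeanFn(), flat), combine_with_precombine(MeanFn(), bundles))  # 3.5 3.5
```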

Amit has accurately identified that *it is not always safe to pre-combine*, 
specifically in the case of merging windows in the main input. I agree!
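
A minimal illustration of the unsafe case, under loudly hypothetical assumptions: session-style windows with a gap of 5, and a combine step that scales each input by a side-input value chosen from the main-input window (the {{RATES}} table and the lookup rule are invented for the example). Pre-combining an input while it still sits in its pre-merge window reads a different side-input value than combining after the windows have merged.

```python
from typing import List, Tuple

Window = Tuple[int, int]  # (start, end), end exclusive
GAP = 5                   # hypothetical session gap
RATES = {0: 1, 1: 100}    # hypothetical side input: one rate per 10-unit bucket


def side_input_rate(window: Window) -> int:
    """Hypothetical side-input lookup driven by the main-input window's end."""
    return RATES[(window[1] - 1) // 10]


def merge(a: Window, b: Window) -> Window:
    return (min(a[0], b[0]), max(a[1], b[1]))


def precombine_then_merge(events: List[Tuple[int, int]]) -> int:
    """Unsafe: each input is folded in using the side input for its own
    pre-merge window; the partial results are only added up afterwards."""
    total = 0
    for ts, value in events:
        total += value * side_input_rate((ts, ts + GAP))
    return total


def merge_then_combine(events: List[Tuple[int, int]]) -> int:
    """Matches GroupByKey | Map: merge windows first (assumes all events fall
    into one session), then combine with the side input for that window."""
    window = (events[0][0], events[0][0] + GAP)
    for ts, _ in events[1:]:
        window = merge(window, (ts, ts + GAP))
    rate = side_input_rate(window)
    return sum(value * rate for _, value in events)


events = [(2, 1), (6, 1)]  # windows (2, 7) and (6, 11) overlap -> session (2, 11)
print(precombine_then_merge(events))  # 101
print(merge_then_combine(events))     # 200
```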

Where we differ (and I think I'm merely echoing Kenn and Aljoscha) is in the 
resolution.

* Dataflow, Direct, and Flink runners recognize this case and choose not to 
"pre-combine", providing the semantics of {{Combine.perKey}}. Yes, this 
sacrifices an optimization opportunity, but only in the cases where the 
optimization is not safe anyway.
* Amit has proposed allowing runners to "pre-combine" in all cases, as the 
Spark runner currently does, and instead advising users to use {{Combine}} 
only when it is safe to do so.

Personally, I prefer the former approach: keep the definition simple and clear 
and let runners optimize only when it is correct to do so. Users should not 
have to change their transforms dramatically based on the window.

The counterpoint is that a pipeline might be dramatically slower just because 
of a choice of window, while making the user think about their choice of 
transform would make this explicit. That is true. But I think it would also 
lead to users running incorrect pipelines without noticing.


So: I think this is a bug in the Spark runner, and the Spark runner should 
mimic the logic from the Flink runner: don't pre-combine if doing so would 
violate the simple composite definition of {{Combine.perKey}}.
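
The rule above can be sketched as a runner-side branch. This is a hypothetical helper, not code from the Flink, Dataflow, or Direct runners: it pre-combines only when the main input's windows do not merge, and otherwise falls back to the plain composite (buffer every input, then combine once).

```python
from typing import Callable, Iterable, List, Sequence


def run_combine(bundles: Sequence[Sequence[int]],
                windows_merge: bool,
                combine: Callable[[Iterable[int]], int]) -> int:
    """Hypothetical runner step for one key: choose a plan, then execute it."""
    if windows_merge:
        # Unsafe to pre-combine: buffer all inputs (the GroupByKey half of the
        # composite) and combine them in a single step at the end.
        buffered: List[int] = [v for bundle in bundles for v in bundle]
        return combine(buffered)
    # Safe: combine each bundle early, then combine the partial results.
    # Reusing `combine` for the second step assumes it is associative and
    # commutative (true for sum and max, not for e.g. a mean).
    partials = [combine(bundle) for bundle in bundles]
    return combine(partials)


print(run_combine([[1, 2], [3]], windows_merge=True, combine=sum))   # 6
print(run_combine([[1, 2], [3]], windows_merge=False, combine=max))  # 3
```

The design choice mirrors the argument in this comment: the semantics stay those of the composite in every case, and the optimization is applied only where it cannot change the result.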

> Side-Inputs non-deterministic with merging main-input windows
> -------------------------------------------------------------
>
>                 Key: BEAM-696
>                 URL: https://issues.apache.org/jira/browse/BEAM-696
>             Project: Beam
>          Issue Type: Bug
>          Components: beam-model
>            Reporter: Ben Chambers
>            Assignee: Pei He
>
> Side-Inputs are non-deterministic for several reasons:
> 1. Because they depend on triggering of the side-input (this is acceptable 
> because triggers are by their nature non-deterministic).
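> 2. They depend on the current state of the main-input window in order to 
> look up the side-input. This means that with merging windows, the result can 
> depend on whether the lookup happens before or after the main-input windows 
> merge.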
> 3. Any runner optimizations that affect when the side-input is looked up may 
> cause problems with either or both of these.
> This issue focuses on #2: the non-determinism of side-inputs that execute 
> within a merging WindowFn.
> A possible solution would be to defer running anything that looks up the 
> side-input until we need to extract an output, and to use the main-input 
> window at that point. Specifically, if the main input uses a MergingWindowFn, 
> don't execute any kind of pre-combine; instead, buffer all the inputs and 
> combine them later.
> This could still run into some non-determinism if there are triggers 
> controlling when we extract output.
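
The proposed "buffer, then combine at extraction time" strategy can be sketched for a single key under session-style merging. {{BufferingSessionCombiner}}, the gap, and the rate-table side input are all hypothetical; the point is only that the side-input lookup runs once per final merged window, inside extract().

```python
from typing import Callable, Dict, List, Tuple

Window = Tuple[int, int]  # (start, end), end exclusive


class BufferingSessionCombiner:
    """Hypothetical sketch of 'buffer all the inputs and combine later' for one
    key under session-style merging. Nothing that reads the side input runs
    until extract(), so the lookup always sees the current merged window."""

    def __init__(self, gap: int, side_input: Callable[[Window], int]):
        self.gap = gap
        self.side_input = side_input
        self.buffers: Dict[Window, List[int]] = {}

    def add(self, ts: int, value: int) -> None:
        window = (ts, ts + self.gap)
        values = [value]
        # Merge the new window with every existing window it overlaps; existing
        # windows are disjoint, so one pass is enough.
        for existing in list(self.buffers):
            if existing[0] < window[1] and window[0] < existing[1]:
                values.extend(self.buffers.pop(existing))
                window = (min(window[0], existing[0]), max(window[1], existing[1]))
        self.buffers[window] = values

    def extract(self) -> Dict[Window, int]:
        # Only now is the side input consulted, once per merged window.
        return {w: sum(vs) * self.side_input(w) for w, vs in self.buffers.items()}


rates = {0: 1, 1: 100, 3: 2}  # hypothetical side input, keyed by 10-unit bucket
combiner = BufferingSessionCombiner(gap=5, side_input=lambda w: rates[(w[1] - 1) // 10])
combiner.add(2, 1)   # session (2, 7)
combiner.add(6, 1)   # merges into (2, 11)
combiner.add(30, 4)  # separate session (30, 35)
print(combiner.extract())  # {(2, 11): 200, (30, 35): 8}
```

If a trigger calls extract() before all inputs have arrived, a later merge can still change a window and therefore its side-input value, which is the residual non-determinism the description mentions.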



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
