[
https://issues.apache.org/jira/browse/BEAM-241?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15266983#comment-15266983
]
Kenneth Knowles commented on BEAM-241:
--------------------------------------
Your first point: I think that is a good idea. We should make all of this part
of {{runners/core}} so that runners that want to use it can do so. This is
closely related to the Fn API, so it will probably face some changes as we sort
that out. It seems like a fruitful area.
Your second point: We definitely want to move away from {{DoFns}} that do
windowing operations. With the exception of window merging, we'd like the
processing of each window to be fully isolated from other windows. This clean
theoretical move has several pragmatic benefits: more possible parallelism, a
smaller scope for stateful {{DoFn}} state once we make it user-facing, support
for a shuffle-by-key-and-window implementation strategy, simpler {{ParDo}}
semantics, and support for micro-batch runners, plus probably more that we
haven't thought of yet.
This is part of why I proposed making {{Window.into}} a primitive.
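To illustrate the shuffle-by-key-and-window strategy mentioned above, here is a minimal, self-contained sketch. It is not Beam code: the class, record, and method names are hypothetical, and it assumes simple fixed windows. The point is that when the shuffle key is the (key, window) pair, each group holds exactly one window's data and can be processed independently of every other window.

```java
import java.util.*;

/**
 * Hypothetical sketch (not Beam's API): shuffle-by-key-and-window with fixed windows.
 * Because the shuffle key includes the window, every group contains data for exactly
 * one window, so groups can be processed independently (and in parallel).
 */
final class KeyAndWindowShuffle {
  /** Half-open window [start, end) in milliseconds. */
  record Window(long start, long end) {}

  /** Composite shuffle key: the user key plus the window. */
  record KeyAndWindow<K>(K key, Window window) {}

  /** A keyed input element with an event timestamp. */
  record Element<K, V>(K key, V value, long timestampMillis) {}

  /** Assigns a timestamp to its fixed window of the given size (handles negative timestamps). */
  static Window assignFixedWindow(long timestampMillis, long sizeMillis) {
    long start = Math.floorDiv(timestampMillis, sizeMillis) * sizeMillis;
    return new Window(start, start + sizeMillis);
  }

  /** Groups values by (key, window) rather than by key alone. */
  static <K, V> Map<KeyAndWindow<K>, List<V>> groupByKeyAndWindow(
      List<Element<K, V>> input, long sizeMillis) {
    Map<KeyAndWindow<K>, List<V>> groups = new LinkedHashMap<>();
    for (Element<K, V> e : input) {
      Window w = assignFixedWindow(e.timestampMillis(), sizeMillis);
      groups.computeIfAbsent(new KeyAndWindow<>(e.key(), w), k -> new ArrayList<>())
          .add(e.value());
    }
    return groups;
  }
}
```

With 10 ms windows, elements for key "a" at 5 ms and 7 ms land in one group and the element at 12 ms lands in another, so no single group ever needs to look across windows.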
So you should definitely phase out the use of {{GroupAlsoByWindowViaWindowSetDoFn}}
as a parameter to {{ParDo}}. The logic in {{GroupAlsoByWindowViaWindowSetDoFn}}
is fine and you can still reuse it; you just need to get the
{{StateInternals}} and {{outputWindowedValue}} from the runner backend rather
than from the {{ProcessContext}}.
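A rough sketch of what "get the state and output from the runner backend" can look like. This is an assumption-laden illustration, not Beam's actual interfaces: {{WindowState}} is a hypothetical stand-in loosely analogous to {{StateInternals}}, {{WindowedOutput}} is a hypothetical stand-in for {{outputWindowedValue}}, and {{GroupAlsoByWindowLogic}} is a toy that only buffers and emits. The design point is that the reusable logic takes these collaborators as constructor arguments, so a runner can wire in its own state backend and output path without going through a {{DoFn}}'s {{ProcessContext}}.

```java
import java.util.*;

/**
 * Hypothetical sketch (not Beam's API): windowing logic that receives its state and
 * output sink from the runner instead of from a DoFn ProcessContext.
 */
final class RunnerProvidedWindowLogic {
  /** Hypothetical stand-in for per-window state (loosely analogous to StateInternals). */
  interface WindowState<W, V> {
    List<V> buffer(W window);
    void clear(W window);
  }

  /** Hypothetical stand-in for the runner's windowed output (cf. outputWindowedValue). */
  interface WindowedOutput<K, W, V> {
    void output(K key, W window, List<V> values);
  }

  /** Toy reusable logic: buffers values per window and emits them when the window fires. */
  static final class GroupAlsoByWindowLogic<K, W, V> {
    private final WindowState<W, V> state;
    private final WindowedOutput<K, W, V> out;

    GroupAlsoByWindowLogic(WindowState<W, V> state, WindowedOutput<K, W, V> out) {
      this.state = state;
      this.out = out;
    }

    /** Adds a value to its window's buffer; other windows are never touched. */
    void processElement(W window, V value) {
      state.buffer(window).add(value);
    }

    /** Emits the buffered values for one window, then clears that window's state. */
    void onWindowFired(K key, W window) {
      out.output(key, window, List.copyOf(state.buffer(window)));
      state.clear(window);
    }
  }

  /** A trivial in-memory "runner backend" supplying the state. */
  static final class InMemoryState<W, V> implements WindowState<W, V> {
    private final Map<W, List<V>> buffers = new HashMap<>();

    public List<V> buffer(W window) {
      return buffers.computeIfAbsent(window, w -> new ArrayList<>());
    }

    public void clear(W window) {
      buffers.remove(window);
    }
  }
}
```

A real runner would back {{WindowState}} with its own durable state and route {{WindowedOutput}} into its downstream operators; only the wiring changes, not the logic.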
I've started this for the new {{InProcessPipelineRunner}} in
[#268|https://github.com/apache/incubator-beam/pull/268/files]: {{GroupByKey}}
expands into the runner-specific primitives {{GroupByKeyOnly}} and
{{GroupAlsoByWindow}}, and the runner then provides a translator/interpreter for
those transforms. Within that translator/interpreter it is fine to use the
logic in {{GroupAlsoByWindowViaWindowSetDoFn}}, as I do
[right here|https://github.com/apache/incubator-beam/pull/268/files#diff-b81c7b54aea3aaed9e80891f97f3d911R97].
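The two-step expansion can be modeled in memory to show how the pieces compose. This is a hypothetical sketch, not the code in #268: windows are plain string labels, and the method names only mirror the transform names above. {{groupByKeyOnly}} shuffles by key and leaves windows untouched; {{groupAlsoByWindow}} then splits one key's values by window; {{groupByKey}} is the composite a runner's translator would evaluate.

```java
import java.util.*;

/**
 * Hypothetical sketch: GroupByKey expanded into groupByKeyOnly (shuffle by key,
 * windows untouched) followed by groupAlsoByWindow (per key, split by window).
 */
final class GbkExpansion {
  /** A value tagged with its window (a plain label in this sketch). */
  record WindowedValue<V>(V value, String window) {}

  record KV<K, V>(K key, V value) {}

  /** Step 1: group by key only; each key gets all of its windowed values. */
  static <K, V> Map<K, List<WindowedValue<V>>> groupByKeyOnly(
      List<KV<K, WindowedValue<V>>> input) {
    Map<K, List<WindowedValue<V>>> byKey = new LinkedHashMap<>();
    for (KV<K, WindowedValue<V>> kv : input) {
      byKey.computeIfAbsent(kv.key(), k -> new ArrayList<>()).add(kv.value());
    }
    return byKey;
  }

  /** Step 2: within a single key's values, group by window. */
  static <V> Map<String, List<V>> groupAlsoByWindow(List<WindowedValue<V>> values) {
    Map<String, List<V>> byWindow = new LinkedHashMap<>();
    for (WindowedValue<V> wv : values) {
      byWindow.computeIfAbsent(wv.window(), w -> new ArrayList<>()).add(wv.value());
    }
    return byWindow;
  }

  /** The composite that a runner's translator/interpreter would evaluate for GroupByKey. */
  static <K, V> Map<K, Map<String, List<V>>> groupByKey(List<KV<K, WindowedValue<V>>> input) {
    Map<K, Map<String, List<V>>> result = new LinkedHashMap<>();
    groupByKeyOnly(input).forEach((k, vs) -> result.put(k, groupAlsoByWindow(vs)));
    return result;
  }
}
```

Splitting it this way means a runner only has to supply a shuffle for the first step; the second step is where reusable windowing logic plugs in.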
> Not easy for runners to get late-data dropping
> ----------------------------------------------
>
> Key: BEAM-241
> URL: https://issues.apache.org/jira/browse/BEAM-241
> Project: Beam
> Issue Type: Bug
> Components: runner-core
> Reporter: Mark Shields
> Assignee: Frances Perry
>
> I realized quite by accident that the Flink runner delegates to
> GroupAlsoByWindowViaWindowSetDoFn for GBK, which in turn delegates to
> ReduceFnRunner. The latter now assumes that no messages will arrive after the
> 'garbage collection' time of their target window(s).
> The Dataflow runner interposes a LateDataDroppingDoFnRunner into the path to
> drop those too-late messages. That's done (I think) using
> DoFnRunners.createDefault.
> I don't think the Flink runner does that.
> We need a nice, runner-friendly way of dealing with the too-late data.
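One runner-agnostic shape for this is a plain filter that sits in front of the {{ReduceFnRunner}}-style logic and needs only the input watermark and the allowed lateness. The sketch below is hypothetical and is not the Dataflow runner's {{LateDataDroppingDoFnRunner}}. It assumes a window's garbage-collection time is its max timestamp plus the allowed lateness, and that a window is expired once that time is strictly behind the input watermark. An element assigned to several windows is kept only in its non-expired windows and is dropped entirely if none remain.

```java
import java.util.*;

/**
 * Hypothetical sketch of a runner-friendly late-data filter. Assumption: a window's
 * garbage-collection time is maxTimestamp + allowedLateness, and data for a window
 * whose GC time is already behind the input watermark is dropped.
 */
final class TooLateDataFilter {
  /** Window [start, end) in milliseconds; its max timestamp is end - 1. */
  record Window(long start, long end) {
    long maxTimestamp() {
      return end - 1;
    }
  }

  /** An element that may belong to several windows (e.g. sliding windows). */
  record WindowedValue<V>(V value, long timestamp, List<Window> windows) {}

  private final long allowedLatenessMillis;

  TooLateDataFilter(long allowedLatenessMillis) {
    this.allowedLatenessMillis = allowedLatenessMillis;
  }

  /** True if the window's GC time is strictly before the input watermark. */
  boolean isExpired(Window w, long inputWatermark) {
    return w.maxTimestamp() + allowedLatenessMillis < inputWatermark;
  }

  /** Keeps each element only in its non-expired windows; drops it if none remain. */
  <V> List<WindowedValue<V>> filter(List<WindowedValue<V>> input, long inputWatermark) {
    List<WindowedValue<V>> kept = new ArrayList<>();
    for (WindowedValue<V> wv : input) {
      List<Window> live = new ArrayList<>();
      for (Window w : wv.windows()) {
        if (!isExpired(w, inputWatermark)) {
          live.add(w);
        }
      }
      if (!live.isEmpty()) {
        kept.add(new WindowedValue<>(wv.value(), wv.timestamp(), live));
      }
    }
    return kept;
  }
}
```

For example, with 5 ms of allowed lateness and an input watermark of 20 ms, a [0, 10) window (GC time 14 ms) is expired, while a [10, 20) window (GC time 24 ms) is still live. Because the filter depends only on the watermark and lateness, any runner could apply it before the GBK logic.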
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)