[
https://issues.apache.org/jira/browse/BEAM-2140?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16068782#comment-16068782
]
Eugene Kirpichov commented on BEAM-2140:
----------------------------------------
Conceptually, watermarks are for PCollections - a lower bound on the timestamps
of new elements that may still be added to the collection.
However, at the implementation level, watermarks are assigned to transforms:
they have an "input watermark" and "output watermark" (I suppose, per input and
per output).
The difference between the output watermark of a transform producing PC and the
input watermark of a transform consuming PC is as follows: the input watermark
is held by "pending elements" - elements that we know need to be processed but
haven't been yet.
The input watermark is also held by the event-time of pending timers set by the
transform. In other words, logically the transform's input is (output of the
producer of the input) + (timers set by the transform itself), and the input
watermark is held by both of these.
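As an illustration, the hold described above can be sketched as a minimum over the upstream output watermark, the timestamps of pending elements, and the event-times of pending timers (the names and structure here are purely illustrative, not Beam's actual runner internals):

```java
import java.util.List;

// Hypothetical sketch: a transform's input watermark is held back by both
// pending elements and pending timers, i.e. it is the minimum of the
// upstream output watermark and all of those timestamps.
class WatermarkSketch {
  static long inputWatermark(long upstreamOutputWatermark,
                             List<Long> pendingElementTimestamps,
                             List<Long> pendingTimerEventTimes) {
    long wm = upstreamOutputWatermark;
    for (long t : pendingElementTimestamps) wm = Math.min(wm, t);
    for (long t : pendingTimerEventTimes) wm = Math.min(wm, t);
    return wm;
  }
}
```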
Currently the input watermark of a transform is held only by _event-time_
timers; however, it makes sense to hold it also by _processing-time_ timers.
For that, we need to assign them an event-time timestamp. Currently this
doesn't happen at all (except that output produced when a timer fires is
assigned an "effective timestamp", taken from the current input watermark).
The suggestion in the case of SDF is to use the ProcessContinuation's output
watermark as the event-time of the residual timer.
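A sketch of what that could look like: a processing-time timer that also carries an event-time hold, taken from the continuation's watermark. The class and method names below are hypothetical, not the actual Beam API:

```java
// Hypothetical sketch: a processing-time timer that carries an event-time
// hold so it can hold the input watermark while it is pending.
class ProcessingTimeTimer {
  final long fireAtProcessingTime; // wall-clock firing time, millis
  final long eventTimeHold;        // event-time the timer holds the watermark at

  ProcessingTimeTimer(long fireAtProcessingTime, long eventTimeHold) {
    this.fireAtProcessingTime = fireAtProcessingTime;
    this.eventTimeHold = eventTimeHold;
  }

  // For an SDF checkpoint, the residual timer would take its hold from the
  // ProcessContinuation's output watermark (illustrative only).
  static ProcessingTimeTimer residualTimer(long resumeAtMillis,
                                           long continuationWatermark) {
    return new ProcessingTimeTimer(resumeAtMillis, continuationWatermark);
  }
}
```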
We also discussed handling of processing-time timers in batch, coming from the
point of view that things should work exactly the same way as in streaming:
setting a processing-time timer in batch to fire in 5 minutes should actually
fire it after 5 minutes, possibly delaying the bundle until the
processing-time timers quiesce. A motivating use case: an SDF-based polling
continuous glob expander in a batch pipeline should process the same set of
files it would in a streaming pipeline.
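Under that semantics, a batch runner would keep firing processing-time timers - actually waiting out each delay, as a streaming runner would - until the timer queue quiesces. A rough sketch (illustrative only, not how any Beam runner is implemented):

```java
import java.util.PriorityQueue;

// Hypothetical sketch: keep the bundle open, wait for each processing-time
// timer to become eligible, and fire timers until the queue quiesces.
class BatchTimerLoop {
  static int drain(PriorityQueue<Long> timerFireTimes, long nowMillis) {
    int fired = 0;
    while (!timerFireTimes.isEmpty()) {
      long next = timerFireTimes.poll();
      long wait = next - nowMillis;
      if (wait > 0) {
        try {
          Thread.sleep(wait); // really wait, as in streaming
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          break;
        }
      }
      nowMillis = Math.max(nowMillis, next);
      fired++; // the timer callback would run here, possibly setting new timers
    }
    return fired;
  }
}
```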
A few questions I still do not understand:
- Where exactly do processing-time timers get dropped, and under what
condition? Kenn says that event-time timers never get dropped: we just forbid
setting them if they would already be "late".
- When can an input to the SDF, or a timer set by the SDF, be late at all, and
should the SDF drop them? Technically a runner is free to drop late data at
any point in the pipeline, but in practice this happens after GBKs; and since
an SDF semantically need not involve a GBK, it should be allowed to simply not
drop anything late, no? - like a regular DoFn would (as long as it doesn't
leak state).
Seems like we should also file JIRAs for the following:
- state leakage
- handling processing-time timers in batch properly
- holding watermark by processing-time timers
- allowing the timer API (internal or user-facing) to specify the
event-time of processing-time timers
- more?
> Fix SplittableDoFn ValidatesRunner tests in FlinkRunner
> -------------------------------------------------------
>
> Key: BEAM-2140
> URL: https://issues.apache.org/jira/browse/BEAM-2140
> Project: Beam
> Issue Type: Bug
> Components: runner-flink
> Reporter: Aljoscha Krettek
> Assignee: Aljoscha Krettek
>
> As discovered as part of BEAM-1763, there is a failing SDF test. We disabled
> the tests to unblock the open PR for BEAM-1763.
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)