[ https://issues.apache.org/jira/browse/BEAM-2140?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16065661#comment-16065661 ]
Eugene Kirpichov commented on BEAM-2140:
----------------------------------------
Working backwards from that, in the "read Pubsub topic names from Kafka" case:
let "topicsPC" be the PCollection of topic names, and "recordsPC" be the
PCollection of records read from these topics.
We want the watermark of "recordsPC" to be a lower bound on the timestamps of
all new records that will ever be read from any of the current or future topics.
Currently known elements of topicsPC already provide this bound for their
records via the watermark hold, but for elements of recordsPC that will be
produced from future elements of topicsPC, the only information we have is the
watermark of topicsPC.
So, the ideal observable watermark behavior is as follows:
- Elements in topicsPC have timestamps, and the timestamp of an element in
topicsPC is a reasonable starting watermark for the elements produced from that
topic into recordsPC.
- The watermark of recordsPC should be min(current watermark holds set by
currently pending element/restriction pairs from topicsPC, watermark of topicsPC
itself). The first term bounds the records that can still arrive from the
topics currently being read; the second bounds records from future topics,
because of the first bullet.
- In the special case where topicsPC is bounded (e.g. Create.of()) and its
watermark has advanced to infinity, this reduces to just the current watermark
holds, which is correct.
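To make the min() rule in the second bullet concrete, here is a minimal Java sketch of it. It is not Beam's or the Flink runner's API: the class `SdfWatermarkSketch`, the method `recordsWatermark`, and the use of `java.time.Instant` (with `Instant.MAX` standing in for the "infinite" watermark) are assumptions for illustration, and the timestamps are arbitrary.

```java
import java.time.Instant;
import java.util.Arrays;
import java.util.Collection;

// Hypothetical sketch, not Beam API: computes the desired watermark of recordsPC.
final class SdfWatermarkSketch {

  private SdfWatermarkSketch() {}

  /**
   * Returns min(pending watermark holds, watermark of topicsPC).
   *
   * @param pendingHolds holds set by element/restriction pairs of topicsPC still being processed
   * @param topicsWatermark current watermark of topicsPC; Instant.MAX models "infinity"
   */
  static Instant recordsWatermark(Collection<Instant> pendingHolds, Instant topicsWatermark) {
    // Second term: bounds records produced from future elements of topicsPC.
    Instant result = topicsWatermark;
    // First term: bounds records still to come from topics currently being read.
    for (Instant hold : pendingHolds) {
      if (hold.isBefore(result)) {
        result = hold;
      }
    }
    return result;
  }

  public static void main(String[] args) {
    Instant hold1 = Instant.parse("2017-06-27T10:00:00Z");
    Instant hold2 = Instant.parse("2017-06-27T09:30:00Z");

    // Unbounded topicsPC whose watermark lags the holds: it is the binding term.
    Instant topicsWatermark = Instant.parse("2017-06-27T09:00:00Z");
    System.out.println(recordsWatermark(Arrays.asList(hold1, hold2), topicsWatermark));
    // prints 2017-06-27T09:00:00Z

    // Bounded topicsPC (e.g. Create.of()) whose watermark reached "infinity":
    // the result reduces to the earliest pending hold.
    System.out.println(recordsWatermark(Arrays.asList(hold1, hold2), Instant.MAX));
    // prints 2017-06-27T09:30:00Z
  }
}
```

The second call is the bounded special case from the last bullet: once the watermark of topicsPC is at infinity, only the pending holds determine the watermark of recordsPC.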
Now, our problem is that if the watermark of topicsPC advances to infinity (e.g.
because it was bounded and we've processed the initial ProcessElement calls for
its element/restriction pairs), the runner interprets this as a promise that
"likely nothing new will appear in this PCollection", which does not hold for
the processing-time timers set by the SDF.
On the other hand, if we hold the watermark of topicsPC at the original ancient
timestamp of the element/restriction pair, the runner will interpret it as "I
can only promise you that new elements/timers in topicsPC will have a timestamp
later than this ancient timestamp", which is unnecessarily restrictive: in
reality, new elements/timers in topicsPC will come either from the transform
that produces topicsPC or from new processing-time timers scheduled by the
currently read topics, and the watermark should be the min of these two.
How do we make a promise about future event-time timestamps of processing-time
timers? You say "Currently processing time timers are treated as inputs with a
timestamp equal to the input watermark at the moment of their arrival", which
would be the current watermark of "topicsPC", I suppose? I think that would be
consistent with the desired behavior above.
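Under the rule quoted above, the promise could follow from the monotonicity of the input watermark: a processing-time timer that fires later is stamped with the input watermark at that later moment, so its timestamp is never earlier than the input watermark today. A hypothetical Java sketch of that reasoning (the class and method names are made up, not Flink runner code):

```java
import java.time.Instant;

// Hypothetical model, not Flink runner code: a fired processing-time timer is
// treated as an input whose timestamp is the input watermark at firing time.
final class ProcessingTimeTimerSketch {

  // Watermark of the SDF's input (topicsPC); starts at the earliest instant.
  private Instant inputWatermark = Instant.MIN;

  // Watermarks only move forward, so an older value is ignored.
  void advanceInputWatermark(Instant newWatermark) {
    if (newWatermark.isAfter(inputWatermark)) {
      inputWatermark = newWatermark;
    }
  }

  // Event-time timestamp assigned to a processing-time timer when it fires.
  Instant onProcessingTimeTimerFired() {
    return inputWatermark;
  }

  public static void main(String[] args) {
    ProcessingTimeTimerSketch op = new ProcessingTimeTimerSketch();
    op.advanceInputWatermark(Instant.parse("2017-06-27T09:00:00Z"));
    // Promise made now: any timer firing later gets a timestamp >= this value.
    Instant promisedLowerBound = op.inputWatermark;

    op.advanceInputWatermark(Instant.parse("2017-06-27T09:15:00Z"));
    Instant firedTimestamp = op.onProcessingTimeTimerFired();

    System.out.println(!firedTimestamp.isBefore(promisedLowerBound)); // prints true
    System.out.println(firedTimestamp); // prints 2017-06-27T09:15:00Z
  }
}
```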
(clearly more thought is needed, but thought I'd dump this anyway)
> Fix SplittableDoFn ValidatesRunner tests in FlinkRunner
> -------------------------------------------------------
>
> Key: BEAM-2140
> URL: https://issues.apache.org/jira/browse/BEAM-2140
> Project: Beam
> Issue Type: Bug
> Components: runner-flink
> Reporter: Aljoscha Krettek
> Assignee: Aljoscha Krettek
>
> As discovered as part of BEAM-1763, there is a failing SDF test. We disabled
> the tests to unblock the open PR for BEAM-1763.
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)