[ https://issues.apache.org/jira/browse/FLINK-7245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16101562#comment-16101562 ]
Fabian Hueske commented on FLINK-7245:
--------------------------------------
Great, thanks for the response [~xccui]!
Ad 2) No, not really for monitoring. I envision the new operator as a skeleton
that automatically takes care of delaying the watermarks. To do that, the
custom user code would need to report the smallest timestamps that it will emit
in the future. I called the method that reports these timestamps a "hook", which
is probably not the most appropriate term.
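A minimal sketch of such a skeleton in plain Java, without Flink dependencies. All names (`WatermarkHoldingSkeleton`, `reportFutureTimestamp`, `resultEmitted`, `onWatermark`) are hypothetical. The sketch assumes that a watermark W declares that no more records with a timestamp <= W will follow, so the skeleton forwards at most the smallest announced timestamp minus one.

```java
import java.util.PriorityQueue;

/**
 * Hypothetical watermark-holding skeleton (not Flink API). User code announces
 * the timestamps of results it will emit later; the skeleton never forwards a
 * watermark that would make those results late downstream.
 */
class WatermarkHoldingSkeleton {
    // Timestamps announced by user code whose results have not been emitted yet.
    private final PriorityQueue<Long> pendingTimestamps = new PriorityQueue<>();
    // Largest watermark received so far.
    private long inputWatermark = Long.MIN_VALUE;
    // Last watermark actually forwarded downstream.
    private long lastEmittedWatermark = Long.MIN_VALUE;

    /** The "hook": user code reports a timestamp it will emit in the future. */
    void reportFutureTimestamp(long timestamp) {
        pendingTimestamps.add(timestamp);
    }

    /** User code calls this once a previously announced result has been emitted. */
    void resultEmitted(long timestamp) {
        pendingTimestamps.remove(timestamp);
    }

    /** Called for each incoming watermark; returns the watermark to forward, or null. */
    Long onWatermark(long watermark) {
        inputWatermark = Math.max(inputWatermark, watermark);
        // Hold the watermark just below the smallest pending result timestamp.
        long held = pendingTimestamps.isEmpty()
                ? inputWatermark
                : Math.min(inputWatermark, pendingTimestamps.peek() - 1);
        if (held > lastEmittedWatermark) {
            lastEmittedWatermark = held;
            return held;
        }
        return null; // nothing new to forward
    }
}
```

For example, if user code has announced a result with timestamp 10 and a watermark of 15 arrives, the sketch forwards 9. Once the result has been emitted, the next incoming watermark (say 16) is forwarded unchanged.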
Ad 3) Hmm, I thought about this a bit more. What I said before about
{{OperatorStateStore.getUnionListState()}} would not work. We need to
checkpoint the smallest future timestamp for each key. Since keys can be
assigned to different operator instances when rescaling or recovering, we need
to ensure that this information is kept together with the keys. However,
operators have no access to keys, only to key groups (key groups are the unit
of key distribution in Flink). Hence, we need to keep a PriorityQueue for each
key group and checkpoint it together with its key group. If a key group is
moved to a different operator instance, its priority queue is moved there as
well. I talked to [~aljoscha], and we can do this only at the lowest level of
operator abstraction, which is the {{AbstractStreamOperator}}. Have a look at
its {{snapshotState()}} method, which iterates over all key groups to snapshot
timer information. I have to admit that I'm not familiar with the source code
at this level.
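A rough sketch of the per-key-group bookkeeping, in plain Java. The class and method names are hypothetical and this is not how `AbstractStreamOperator` is implemented. `keyGroupOf` is a simplified stand-in: as far as I know, Flink murmur-hashes the key's `hashCode()` before taking the modulo, so this sketch does not reproduce Flink's actual key-to-key-group mapping. `operatorIndexFor` follows the range-based formula I believe Flink uses (`keyGroup * parallelism / maxParallelism`); it shows why whole key groups, and therefore their queues, move between instances on rescaling.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.PriorityQueue;

/**
 * Hypothetical store of future timestamps with one PriorityQueue per key group,
 * so each queue can be snapshotted and restored together with its key group.
 */
class KeyGroupedFutureTimestamps {
    private final int maxParallelism; // total number of key groups
    private final Map<Integer, PriorityQueue<Long>> queues = new HashMap<>();

    KeyGroupedFutureTimestamps(int maxParallelism) {
        this.maxParallelism = maxParallelism;
    }

    // Simplified key-group assignment (assumption: plain hashCode, no murmur hash).
    int keyGroupOf(Object key) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    // Range-based mapping of a key group to a parallel operator instance.
    static int operatorIndexFor(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    void add(Object key, long timestamp) {
        queues.computeIfAbsent(keyGroupOf(key), kg -> new PriorityQueue<>()).add(timestamp);
    }

    void remove(Object key, long timestamp) {
        PriorityQueue<Long> q = queues.get(keyGroupOf(key));
        if (q != null) {
            q.remove(timestamp);
        }
    }

    /** Smallest future timestamp over all key groups held by this instance. */
    long minFutureTimestamp() {
        long min = Long.MAX_VALUE;
        for (PriorityQueue<Long> q : queues.values()) {
            if (!q.isEmpty()) {
                min = Math.min(min, q.peek());
            }
        }
        return min;
    }

    /** Snapshot a single key group, as a loop over all key groups would do. */
    long[] snapshotKeyGroup(int keyGroup) {
        PriorityQueue<Long> q = queues.get(keyGroup);
        return q == null ? new long[0] : q.stream().mapToLong(Long::longValue).toArray();
    }

    /** Restore a key group that has been (re)assigned to this instance. */
    void restoreKeyGroup(int keyGroup, long[] timestamps) {
        PriorityQueue<Long> q = queues.computeIfAbsent(keyGroup, kg -> new PriorityQueue<>());
        for (long ts : timestamps) {
            q.add(ts);
        }
    }
}
```

Because the operator instance only needs the minimum over its key groups, keeping a minimum per key group (rather than per key) is enough to compute the held-back watermark.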
Ad 4) Each operator keeps track of the watermarks it receives from all input
channels (i.e., from each parallel instance of each input operator). For each
input channel, it keeps the maximum (i.e., latest received) watermark and
computes its own watermark, which it emits via all outgoing channels, as the
minimum of these per-channel maxima. So each operator has only a single
watermark (i.e., not one watermark per key). However, our custom code will have
to operate on multiple keys, and keys are strictly separated from each other,
so no key is aware of the lowest timestamps of the others.
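The per-channel tracking described in Ad 4 can be sketched as follows. `InputWatermarkTracker` is a hypothetical name; this is a simplified illustration of the min-of-max rule, not Flink's actual input-handling code (which, for example, also deals with idle channels).

```java
import java.util.Arrays;

/** Hypothetical tracker: one watermark per operator, derived from all input channels. */
class InputWatermarkTracker {
    // Maximum (latest) watermark received on each input channel.
    private final long[] channelWatermarks;
    // The operator's single current watermark.
    private long currentWatermark = Long.MIN_VALUE;

    InputWatermarkTracker(int numChannels) {
        channelWatermarks = new long[numChannels];
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
    }

    /** Returns the new operator watermark if it advanced, otherwise null. */
    Long onWatermark(int channel, long watermark) {
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], watermark);
        // Operator watermark = minimum over the per-channel maxima.
        long min = Arrays.stream(channelWatermarks).min().getAsLong();
        if (min > currentWatermark) {
            currentWatermark = min;
            return min; // would be emitted via all outgoing channels
        }
        return null;
    }
}
```

With two input channels, a watermark of 10 on channel 0 does not advance the operator's watermark until channel 1 has also reported one; after channel 1 reports 5 the operator is at 5, and after it reports 20 the operator is at 10.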
Ad 5) That's a good point. We can make that configurable as well. I thought
that emitting a watermark whenever a watermark is received would be a good
default.
Cheers, Fabian
> Enhance the operators to support holding back watermarks
> --------------------------------------------------------
>
> Key: FLINK-7245
> URL: https://issues.apache.org/jira/browse/FLINK-7245
> Project: Flink
> Issue Type: New Feature
> Components: DataStream API
> Reporter: Xingcan Cui
> Assignee: Xingcan Cui
>
> Currently the watermarks are applied and emitted by the
> {{AbstractStreamOperator}} instantly.
> {code:java}
> public void processWatermark(Watermark mark) throws Exception {
>     if (timeServiceManager != null) {
>         timeServiceManager.advanceWatermark(mark);
>     }
>     output.emitWatermark(mark);
> }
> {code}
> Some calculation results (with timestamp fields) triggered by these
> watermarks (e.g., join or aggregate results) may be regarded as late by
> downstream operators, since their timestamps must be less than or equal to
> the watermarks that triggered them.
> This issue aims to add another "working mode" to the current operators that
> supports holding back watermarks. These watermarks should be blocked and
> stored by the operators until all corresponding newly generated results have
> been emitted.