[ https://issues.apache.org/jira/browse/FLINK-5479?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16335116#comment-16335116 ]

Eron Wright  commented on FLINK-5479:
-------------------------------------

To elaborate on my earlier comment about `max.message.time.difference.ms`,
let's consider the ideal watermark for each of the two timestamp types that
Kafka supports (as per KIP-32): CreateTime and LogAppendTime.

With LogAppendTime, the timestamp increases monotonically with each message
and corresponds to the broker's wall-clock time at append. The per-partition
watermark could simply track the message timestamp. The complication is how
to advance the watermark when the partition is idle; an in-band heartbeat from
the broker (informing the client of the progression of its wall clock) would
be ideal.
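A minimal Java sketch of that idea follows. It assumes a hypothetical in-band heartbeat carrying the broker's wall-clock time (Kafka provides no such message; `LogAppendTimeWatermark` and `onBrokerHeartbeat` are invented for illustration) and a broker clock that never goes backwards. Following Flink's convention that a watermark t means no further records with timestamp <= t, the sketch keeps the watermark one millisecond behind the latest observed time.

```java
/**
 * Hypothetical per-partition watermark for a partition using LogAppendTime.
 * Not part of Flink or Kafka; onBrokerHeartbeat models a heartbeat Kafka does not provide.
 */
final class LogAppendTimeWatermark {
    private long watermark = Long.MIN_VALUE;

    /** Track the append timestamp of each consumed record. */
    void onRecord(long logAppendTimeMs) {
        advanceTo(logAppendTimeMs);
    }

    /**
     * Hypothetical in-band heartbeat carrying the broker's wall-clock time. Assuming the
     * broker clock never goes backwards, every later append is stamped at or after it.
     */
    void onBrokerHeartbeat(long brokerWallClockMs) {
        advanceTo(brokerWallClockMs);
    }

    long currentWatermark() {
        return watermark;
    }

    private void advanceTo(long timeMs) {
        // Watermark t means "no more records with timestamp <= t", so stay 1 ms behind.
        // Skip Long.MIN_VALUE to avoid underflow, and never move the watermark backwards.
        if (timeMs != Long.MIN_VALUE) {
            watermark = Math.max(watermark, timeMs - 1);
        }
    }

    public static void main(String[] args) {
        LogAppendTimeWatermark wm = new LogAppendTimeWatermark();
        wm.onRecord(1_000L);
        wm.onRecord(1_500L);
        System.out.println(wm.currentWatermark()); // 1499
        // The partition goes idle, but the heartbeat still advances the watermark.
        wm.onBrokerHeartbeat(5_000L);
        System.out.println(wm.currentWatermark()); // 4999
    }
}
```

Without the heartbeat, an idle partition would leave the watermark at its last record's timestamp indefinitely, which is exactly the problem this issue describes.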

With CreateTime, the timestamp is supplied by the producer, but the broker may
enforce an upper bound (the "max difference") on the difference between a
message's timestamp and the broker's current time, rejecting messages that
exceed it. Any message appended from now on therefore carries a timestamp no
earlier than the broker's current time minus the max difference, so that value
would be the ideal per-partition watermark.
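The CreateTime case can be sketched the same way, under two assumptions: the consumer observes the broker's current time in-band (that is, only after it has read every record appended before that time, otherwise older records still in the log could fall behind the watermark), and the broker clock never goes backwards. `CreateTimeWatermark`, `onBrokerTime`, and `maxDifferenceMs` (standing in for the max-difference setting) are hypothetical names. Following Flink's convention that a watermark t promises no further records with timestamp <= t, one extra millisecond is subtracted.

```java
/**
 * Hypothetical CreateTime watermark: broker time minus the broker's max difference.
 * Assumes brokerNowMs is observed in-band, after every earlier append has been consumed.
 */
final class CreateTimeWatermark {
    private final long maxDifferenceMs; // stands in for the broker's "max difference" setting
    private long watermark = Long.MIN_VALUE;

    CreateTimeWatermark(long maxDifferenceMs) {
        if (maxDifferenceMs < 0) {
            throw new IllegalArgumentException("maxDifferenceMs must be >= 0");
        }
        this.maxDifferenceMs = maxDifferenceMs;
    }

    /** Advance from an observation of the broker's current wall-clock time. */
    void onBrokerTime(long brokerNowMs) {
        // Records appended from now on have timestamp >= brokerNowMs - maxDifferenceMs,
        // so the watermark can sit 1 ms below that bound. Saturate instead of overflowing.
        long candidate = brokerNowMs < Long.MIN_VALUE + maxDifferenceMs + 1
                ? Long.MIN_VALUE
                : brokerNowMs - maxDifferenceMs - 1;
        watermark = Math.max(watermark, candidate); // never move backwards
    }

    long currentWatermark() {
        return watermark;
    }

    public static void main(String[] args) {
        CreateTimeWatermark wm = new CreateTimeWatermark(60_000L); // 1-minute max difference
        wm.onBrokerTime(1_000_000L);
        System.out.println(wm.currentWatermark()); // 939999
    }
}
```

The tighter the broker's bound, the closer this watermark tracks real time; with a very large bound it stays near `Long.MIN_VALUE` and gives little benefit.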

 

> Per-partition watermarks in FlinkKafkaConsumer should consider idle partitions
> ------------------------------------------------------------------------------
>
>                 Key: FLINK-5479
>                 URL: https://issues.apache.org/jira/browse/FLINK-5479
>             Project: Flink
>          Issue Type: Improvement
>          Components: Kafka Connector
>            Reporter: Tzu-Li (Gordon) Tai
>            Priority: Blocker
>             Fix For: 1.5.0
>
>
> Reported in ML: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kafka-topic-partition-skewness-causes-watermark-not-being-emitted-td11008.html
> Similar to idle sources blocking watermark progression in downstream
> operators (see FLINK-5017), the per-partition watermark mechanism in
> {{FlinkKafkaConsumer}} also stops advancing watermarks when a partition is
> idle. The watermark of an idle partition stays at {{Long.MIN_VALUE}}, so the
> overall minimum watermark across all partitions of a consumer subtask never
> advances.
> Kafka partitions that produce no data are not common, but this case should
> probably be handled as well. I think we should have a localized solution,
> similar to FLINK-5017, for the per-partition watermarks in
> {{AbstractFetcher}}.
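The blocking effect described in the issue can be illustrated with a small Java model. The naive minimum is pinned at `Long.MIN_VALUE` by a silent partition, while an idle-aware variant skips partitions that have produced no record within a timeout. This is only a sketch in the spirit of FLINK-5017, not the code in `AbstractFetcher`; `IdleAwarePartitionWatermarks`, the processing-time `idleTimeoutMs` heuristic, and the method names are all hypothetical. When every partition is idle it returns an empty result, on the assumption that the subtask itself would then be marked idle rather than emit a watermark.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalLong;

/** Hypothetical idle-aware combination of per-partition watermarks (not Flink's implementation). */
final class IdleAwarePartitionWatermarks {
    private final long idleTimeoutMs;
    private final Map<Integer, Long> watermarks = new HashMap<>();   // per-partition watermark
    private final Map<Integer, Long> lastRecordAt = new HashMap<>(); // processing time of last record
    private long emitted = Long.MIN_VALUE;                           // combined watermark never regresses

    IdleAwarePartitionWatermarks(long idleTimeoutMs) {
        this.idleTimeoutMs = idleTimeoutMs;
    }

    void addPartition(int partition, long nowMs) {
        watermarks.put(partition, Long.MIN_VALUE);
        lastRecordAt.put(partition, nowMs);
    }

    void onRecord(int partition, long timestampMs, long nowMs) {
        // Ascending-timestamp style: the partition's watermark trails its max timestamp by 1 ms.
        watermarks.merge(partition, timestampMs - 1, Long::max);
        lastRecordAt.put(partition, nowMs);
    }

    /** Minimum over every partition: a single idle partition pins it at Long.MIN_VALUE. */
    long naiveMin() {
        long min = Long.MAX_VALUE;
        for (long wm : watermarks.values()) {
            min = Math.min(min, wm);
        }
        return watermarks.isEmpty() ? Long.MIN_VALUE : min;
    }

    /** Minimum over partitions with a record in the last idleTimeoutMs; empty if all are idle. */
    OptionalLong idleAwareMin(long nowMs) {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (Map.Entry<Integer, Long> e : watermarks.entrySet()) {
            if (nowMs - lastRecordAt.get(e.getKey()) < idleTimeoutMs) {
                anyActive = true;
                min = Math.min(min, e.getValue());
            }
        }
        if (!anyActive) {
            return OptionalLong.empty(); // the subtask itself could be marked idle here
        }
        emitted = Math.max(emitted, min); // a partition waking up must not pull the watermark back
        return OptionalLong.of(emitted);
    }

    public static void main(String[] args) {
        IdleAwarePartitionWatermarks w = new IdleAwarePartitionWatermarks(10_000L);
        w.addPartition(0, 0L);
        w.addPartition(1, 0L);                  // partition 1 never produces data
        w.onRecord(0, 9_000L, 20_000L);
        System.out.println(w.naiveMin());       // -9223372036854775808 (Long.MIN_VALUE)
        System.out.println(w.idleAwareMin(20_000L)); // OptionalLong[8999]
    }
}
```

A processing-time timeout is only one way to declare a partition idle; the broker-time signals discussed in the comment above would let idleness be decided without guessing a timeout.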



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)