ASF GitHub Bot commented on FLINK-5479:

Github user juhoautio commented on the issue:

    @tzulitai did you ever test your code? I tried it, and it allowed watermarks
to proceed, but apparently too aggressively, as it caused a lot of data loss.
    I'm looking for a quick fix for this issue, as it seems that FLINK-5479
won't be fixed any time soon. So I would very much like to hear whether you
have been able to fix this in a more lightweight way.
    My understanding of your PR is that it doesn't work reliably because it
only seems to add an internal timeout, which can expire whenever the consumer
is, for example, busy consuming other partitions, even though the partition
itself is not idle. Please correct me if I have misunderstood.
    I'm thinking that it should instead get the information that a partition
is idle from the Kafka client, and only in that case (an empty result from the
client) create a newer watermark for that partition. It shouldn't put the
partition into some idle state, and it shouldn't create newer watermarks
periodically without any connection to another empty result from the client.
New watermarks should only be generated as a callback of the Kafka client.
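A minimal sketch of the proposal above, assuming the fetcher can tell, per poll, which partitions came back with no records. The class EmptyPollWatermarks and its methods are hypothetical and are not Flink's AbstractFetcher API. The comment does not say what value the "newer watermark" should take, so the catch-up policy here (jump to the highest watermark among the other partitions) is an assumption chosen so that the partition stops holding back the minimum.

```java
import java.util.Arrays;

// Hypothetical sketch, not Flink's AbstractFetcher API: per-partition
// watermarks that only catch up in response to an empty poll result.
final class EmptyPollWatermarks {
    private final long[] partitionWatermarks;

    EmptyPollWatermarks(int numPartitions) {
        partitionWatermarks = new long[numPartitions];
        // As described in the issue, every partition starts at Long.MIN_VALUE.
        Arrays.fill(partitionWatermarks, Long.MIN_VALUE);
    }

    // Called when a poll returned records for `partition`; `maxTimestamp` is the
    // largest event timestamp among them (assumes ascending timestamps).
    void onRecords(int partition, long maxTimestamp) {
        partitionWatermarks[partition] =
                Math.max(partitionWatermarks[partition], maxTimestamp);
    }

    // Called only when the client's poll returned no records for `partition`.
    // Assumed policy: catch up to the highest watermark of the other partitions.
    // No timer advances anything, and no persistent idle flag is kept.
    void onEmptyResult(int partition) {
        long othersMax = Long.MIN_VALUE;
        for (int i = 0; i < partitionWatermarks.length; i++) {
            if (i != partition) {
                othersMax = Math.max(othersMax, partitionWatermarks[i]);
            }
        }
        partitionWatermarks[partition] =
                Math.max(partitionWatermarks[partition], othersMax);
    }

    // The subtask's watermark is the minimum across all of its partitions.
    long currentWatermark() {
        long min = Long.MAX_VALUE;
        for (long w : partitionWatermarks) {
            min = Math.min(min, w);
        }
        return min;
    }
}
```

For example, with two partitions where only partition 0 receives data (up to timestamp 100), the subtask watermark stays at Long.MIN_VALUE until the client reports an empty result for partition 1, after which it becomes 100. Any record that later arrives on partition 1 with a timestamp below its caught-up watermark would be late, so the catch-up value is the design choice that decides how much data loss is possible.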

> Per-partition watermarks in FlinkKafkaConsumer should consider idle partitions
> ------------------------------------------------------------------------------
>                 Key: FLINK-5479
>                 URL: https://issues.apache.org/jira/browse/FLINK-5479
>             Project: Flink
>          Issue Type: Improvement
>          Components: Kafka Connector
>            Reporter: Tzu-Li (Gordon) Tai
>            Priority: Blocker
>             Fix For: 1.6.0
> Reported in ML: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kafka-topic-partition-skewness-causes-watermark-not-being-emitted-td11008.html
> Similar to what's happening to idle sources blocking watermark progression in 
> downstream operators (see FLINK-5017), the per-partition watermark mechanism 
> in {{FlinkKafkaConsumer}} is also blocked from advancing watermarks
> when a partition is idle. The watermark of idle partitions is always 
> {{Long.MIN_VALUE}}, therefore the overall min watermark across all partitions 
> of a consumer subtask will never proceed.
> Kafka partitions that produce no data are normally not a common case, but it
> would probably be good to handle this as well. I think we should
> have a localized solution similar to FLINK-5017 for the per-partition 
> watermarks in {{AbstractFetcher}}.
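A minimal sketch of the problem described in the issue and of a localized, FLINK-5017-style fix, assuming per-partition watermarks are held in an array and combined by taking the minimum. PartitionWatermarkMin and its methods are hypothetical names for illustration only. How a partition gets marked idle (a timeout, an empty fetch result, or something else) is exactly what the discussion above is about, so the sketch leaves that decision to the caller.

```java
import java.util.Arrays;

// Hypothetical sketch: why one idle partition blocks the subtask watermark,
// and how excluding idle partitions from the minimum unblocks it.
final class PartitionWatermarkMin {
    private final long[] watermarks;
    private final boolean[] idle;

    PartitionWatermarkMin(int numPartitions) {
        watermarks = new long[numPartitions];
        idle = new boolean[numPartitions];
        // A partition that never receives data keeps Long.MIN_VALUE forever.
        Arrays.fill(watermarks, Long.MIN_VALUE);
    }

    // New data for a partition advances its watermark and makes it active again.
    void update(int partition, long watermark) {
        watermarks[partition] = Math.max(watermarks[partition], watermark);
        idle[partition] = false;
    }

    // The idleness signal itself is left to the caller in this sketch.
    void markIdle(int partition) {
        idle[partition] = true;
    }

    // Behavior described in the issue: plain minimum over all partitions.
    long minOverAll() {
        long min = Long.MAX_VALUE;
        for (long w : watermarks) {
            min = Math.min(min, w);
        }
        return min;
    }

    // Localized fix: ignore idle partitions. Returns Long.MIN_VALUE (i.e. no
    // new watermark) when every partition is idle.
    long minOverActive() {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < watermarks.length; i++) {
            if (!idle[i]) {
                min = Math.min(min, watermarks[i]);
                anyActive = true;
            }
        }
        return anyActive ? min : Long.MIN_VALUE;
    }
}
```

With three partitions at 100, 150 and "no data yet", minOverAll() stays at Long.MIN_VALUE, while minOverActive() returns 100 once the third partition is marked idle.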

This message was sent by Atlassian JIRA
