[ 
https://issues.apache.org/jira/browse/FLINK-4616?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15839419#comment-15839419
 ] 

ASF GitHub Bot commented on FLINK-4616:
---------------------------------------

Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/3031
  
    A clarification about backwards compatibility when the state type changes:
    Currently, one limitation on keeping applications compatible across a 
savepoint restore is that you can't change the type of the state; otherwise the 
state restore fails and the application is no longer compatible. The only 
workaround is to add another field as the new state with the new type, and 
somehow "encode" / "decode" the watermark state into / from the original 
`Tuple2<KafkaTopicPartition, Long>`. I don't think this is easily possible ...
    
    At the same time, there was a recent discussion about letting the window 
operators checkpoint watermarks as well. So perhaps we won't need the Kafka 
sources to checkpoint watermarks in the end, if the window operators already 
take care of restoring the previous event time.
    What I'm curious about right now is whether, in the future, redistribution 
of Kafka partition state across source subtasks will work well with the 
checkpointed watermarks in the downstream window operators. I don't think 
there should be a problem.
    
    @aljoscha can you perhaps help clarify this?


> Kafka consumer doesn't store last emitted watermarks per partition in state
> ---------------------------------------------------------------------------
>
>                 Key: FLINK-4616
>                 URL: https://issues.apache.org/jira/browse/FLINK-4616
>             Project: Flink
>          Issue Type: Bug
>          Components: Kafka Connector
>    Affects Versions: 1.1.1
>            Reporter: Yuri Makhno
>            Assignee: Roman Maier
>
> The Kafka consumer stores only Kafka offsets in state and doesn't store the 
> last emitted watermarks; this may lead to wrong state when a checkpoint is 
> restored. Let's say our watermark is (timestamp - 10). With the following 
> message queue, results after a checkpoint restore will differ from results 
> during normal processing:
> A(ts = 30)
> B(ts = 35)
> ------ checkpoint goes here
> C(ts=15) -- this one should be filtered out by the next time window
> D(ts=60)
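
The scenario in the description above can be reproduced with a small 
standalone sketch. It does not use the Flink or Kafka connector APIs; the class 
`WatermarkRestoreSketch` and its methods are hypothetical names made up for 
illustration. It assumes the watermark is the highest timestamp seen so far 
minus 10 and, as a simplification, treats an event as late when its timestamp 
is at or below the current watermark.

```java
import java.util.ArrayList;
import java.util.List;

public class WatermarkRestoreSketch {
    // Assumption taken from the issue text: watermark = timestamp - 10.
    static final long DELAY = 10L;

    /** Advances a watermark over the given timestamps; it never moves backwards. */
    static long advance(long watermark, long... timestamps) {
        for (long ts : timestamps) {
            watermark = Math.max(watermark, ts - DELAY);
        }
        return watermark;
    }

    /**
     * Replays C(ts=15) and D(ts=60) starting from the given watermark and
     * returns the names of the events that are NOT considered late.
     * Simplification: an event is late when its timestamp <= current watermark.
     */
    static List<String> acceptedAfterCheckpoint(long watermark) {
        String[] names = {"C", "D"};
        long[] timestamps = {15L, 60L};
        List<String> accepted = new ArrayList<>();
        for (int i = 0; i < names.length; i++) {
            if (timestamps[i] > watermark) {
                accepted.add(names[i]);
            }
            watermark = advance(watermark, timestamps[i]);
        }
        return accepted;
    }

    public static void main(String[] args) {
        // Normal processing: A(30) and B(35) have already moved the watermark to 25.
        long live = advance(Long.MIN_VALUE, 30L, 35L);
        System.out.println(acceptedAfterCheckpoint(live)); // [D]

        // Restore with offsets only: the watermark starts over, so C slips through.
        System.out.println(acceptedAfterCheckpoint(Long.MIN_VALUE)); // [C, D]
    }
}
```

In the first run C is dropped as late; in the second run, which models a 
restore that brings back offsets but not watermarks, C is accepted. That is 
the difference in results the reporter describes.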



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
