[ https://issues.apache.org/jira/browse/FLINK-4616?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15839419#comment-15839419 ]
ASF GitHub Bot commented on FLINK-4616:
---------------------------------------
Github user tzulitai commented on the issue:
https://github.com/apache/flink/pull/3031
A re-clarification about backwards compatibility for state type changes:
Currently, one limitation for applications that must stay compatible across a
savepoint restore is that you can't change the type of the state; otherwise, the
state restore fails and the new version is not compatible. The only workaround
is to have another field as the new state with the new type, and somehow
"encode" / "decode" the watermark state into / from the original
`Tuple2<KafkaTopicPartition, Long>`. I don't think this is easily possible ...
At the same time, there was a recent discussion about letting the window
operators also checkpoint watermarks. So perhaps, in the end, we won't need the
Kafka sources to checkpoint watermarks at all, if the window operators already
take care of restoring the previous event time.
What I'm curious about right now is whether, in the future, redistributing
Kafka partition states across source subtasks will work well with the
checkpointed watermarks in the downstream window operators. I don't think
there should be a problem.
@aljoscha can you perhaps help clarify this?
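On the redistribution question, one way to reason about it is the small simulation below (plain Java, not Flink code). It assumes that (1) every partition's last watermark is restored together with that partition, (2) each source subtask emits the minimum watermark over its assigned partitions, and (3) a downstream operator's watermark is the minimum over its input channels. Under those assumptions, the downstream watermark equals the minimum over all partitions, no matter how the partitions are spread across subtasks. The class and method names, the round-robin `assign` helper, the partition names, and the watermark values are all hypothetical. A subtask without partitions is modeled as not holding back the watermark (`Long.MAX_VALUE`), which may not match every Flink version.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class RedistributionDemo {

    /** Watermark a source subtask would emit: the minimum over its assigned partitions.
     *  Assumption: a subtask with no partitions does not hold back downstream (Long.MAX_VALUE). */
    static long subtaskWatermark(List<String> partitions, Map<String, Long> restored) {
        long min = Long.MAX_VALUE;
        for (String p : partitions) {
            min = Math.min(min, restored.get(p));
        }
        return min;
    }

    /** Hypothetical round-robin assignment of partitions to source subtasks. */
    static List<List<String>> assign(List<String> partitions, int parallelism) {
        List<List<String>> assignment = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            assignment.add(new ArrayList<>());
        }
        for (int i = 0; i < partitions.size(); i++) {
            assignment.get(i % parallelism).add(partitions.get(i));
        }
        return assignment;
    }

    /** Downstream watermark: the minimum over all source subtasks (its input channels). */
    static long downstreamWatermark(List<String> partitions, Map<String, Long> restored, int parallelism) {
        long min = Long.MAX_VALUE;
        for (List<String> subtaskPartitions : assign(partitions, parallelism)) {
            min = Math.min(min, subtaskWatermark(subtaskPartitions, restored));
        }
        return min;
    }

    public static void main(String[] args) {
        // Hypothetical per-partition watermarks restored from a checkpoint.
        Map<String, Long> restored = Map.of("p0", 25L, "p1", 40L, "p2", 31L, "p3", 18L);
        List<String> partitions = List.of("p0", "p1", "p2", "p3");
        // Rescale the source from 1 to 5 subtasks; the downstream watermark stays the same.
        for (int parallelism = 1; parallelism <= 5; parallelism++) {
            System.out.println(parallelism + " -> " + downstreamWatermark(partitions, restored, parallelism));
        }
    }
}
```

Every line of output should report 18, the smallest restored partition watermark. This is because taking the minimum per subtask and then the minimum across subtasks equals the minimum over the whole set, for any assignment that covers all partitions.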
> Kafka consumer doesn't store last emitted watermarks per partition in state
> ---------------------------------------------------------------------------
>
> Key: FLINK-4616
> URL: https://issues.apache.org/jira/browse/FLINK-4616
> Project: Flink
> Issue Type: Bug
> Components: Kafka Connector
> Affects Versions: 1.1.1
> Reporter: Yuri Makhno
> Assignee: Roman Maier
>
> The Kafka consumer stores only Kafka offsets in its state and doesn't store the
> last emitted watermarks, so it may end up in a wrong state when a checkpoint is
> restored. Let's say our watermark is (timestamp - 10). With the following
> message queue, the results after a checkpoint restore will differ from those
> during normal processing:
> A(ts = 30)
> B(ts = 35)
> ------ checkpoint goes here
> C(ts = 15) -- this one should be filtered out by the next time window, since it
>               is behind the watermark emitted after B (35 - 10 = 25)
> D(ts = 60)
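To make the reported scenario concrete, here is a minimal, self-contained Java simulation of a single partition (not Flink code). The class `WatermarkRestoreDemo`, the method `lateEvents`, and the offsets are hypothetical. It assumes the watermark is (max timestamp seen - 10), as in the description, and, as a simplification, treats an element as late when its timestamp is not greater than the current watermark, ignoring window boundaries.

```java
import java.util.ArrayList;
import java.util.List;

public class WatermarkRestoreDemo {

    // A record from a single Kafka partition: its offset and event timestamp.
    record Event(String name, long offset, long ts) {}

    // Watermark = (max timestamp seen) - 10, as in the issue description.
    static final long LAG = 10;

    // The partition contents from the example; the checkpoint is taken right after B.
    static final List<Event> PARTITION = List.of(
            new Event("A", 0, 30),
            new Event("B", 1, 35),
            new Event("C", 2, 15),
            new Event("D", 3, 60));

    /**
     * Replays the partition from startOffset with the given starting watermark and
     * returns the names of events that would be dropped as late.
     * Simplification: an event is late if its timestamp is <= the current watermark.
     */
    static List<String> lateEvents(long startOffset, long startWatermark) {
        List<String> late = new ArrayList<>();
        long watermark = startWatermark;
        for (Event e : PARTITION) {
            if (e.offset() < startOffset) {
                continue; // already consumed before the checkpoint
            }
            if (e.ts() <= watermark) {
                late.add(e.name());
            }
            // Advance the watermark after each element (punctuated style).
            watermark = Math.max(watermark, e.ts() - LAG);
        }
        return late;
    }

    public static void main(String[] args) {
        long checkpointOffset = 2;     // next offset to read after B
        long checkpointWatermark = 25; // 35 - 10, emitted after B

        // Normal processing: no failure, start at offset 0 with no watermark yet.
        System.out.println("normal:           " + lateEvents(0, Long.MIN_VALUE));
        // Restore with offsets only (the reported behaviour): the watermark starts over.
        System.out.println("offsets only:     " + lateEvents(checkpointOffset, Long.MIN_VALUE));
        // Restore with both the offset and the last emitted watermark of the partition.
        System.out.println("offset+watermark: " + lateEvents(checkpointOffset, checkpointWatermark));
    }
}
```

Running `main` prints `[C]` for normal processing and for the restore that includes the watermark, but `[]` for the offsets-only restore. That discrepancy is what the issue describes: without the checkpointed watermark, C is no longer treated as late after recovery.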