Yordan Pavlov created FLINK-33109:
-------------------------------------
Summary: Watermark alignment not applied after recovery from checkpoint
Key: FLINK-33109
URL: https://issues.apache.org/jira/browse/FLINK-33109
Project: Flink
Issue Type: Bug
Components: Runtime / Coordination
Affects Versions: 1.17.1
Reporter: Yordan Pavlov
Attachments: image-2023-09-18-15-40-06-868.png,
image-2023-09-18-15-46-16-106.png
I am observing a problem where, after recovery from a checkpoint, the Kafka
source watermarks start to diverge and no longer honor the watermark alignment
setting I have applied.
I have a Kafka source which reads a topic with 32 partitions. I am applying the
following watermark strategy:
{code:scala}
new EventAwareWatermarkStrategy[KeyedKafkaSourceMessage](msg =>
  msg.value.getTimestamp)
  .withWatermarkAlignment("alignment-sources-group",
    time.Duration.ofMillis(sourceWatermarkAlignmentBlocks)){code}
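For context, watermark alignment roughly works like this: every source in the group "alignment-sources-group" is held back while its watermark is more than the configured drift ahead of the slowest watermark in that group. Below is a simplified, stdlib-only Java model of that rule. It is not Flink's implementation, and the class and method names ({{AlignmentRule}}, {{maxAllowedWatermark}}, {{subtasksToPause}}) are hypothetical.

{code:java}
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical, simplified model of the watermark-alignment rule (not Flink code). */
public class AlignmentRule {

    /** Highest watermark a group member may reach: slowest watermark in the group + allowed drift. */
    static long maxAllowedWatermark(Map<Integer, Long> watermarksBySubtask, long maxDriftMillis) {
        if (watermarksBySubtask.isEmpty()) {
            return Long.MAX_VALUE; // nothing reported yet, so nothing to align against
        }
        long slowest = Long.MAX_VALUE;
        for (long wm : watermarksBySubtask.values()) {
            slowest = Math.min(slowest, wm);
        }
        return slowest + maxDriftMillis;
    }

    /** Subtasks whose watermark is already past the allowed maximum and should pause reading. */
    static List<Integer> subtasksToPause(Map<Integer, Long> watermarksBySubtask, long maxDriftMillis) {
        long limit = maxAllowedWatermark(watermarksBySubtask, maxDriftMillis);
        List<Integer> paused = new ArrayList<>();
        for (Map.Entry<Integer, Long> e : watermarksBySubtask.entrySet()) {
            if (e.getValue() > limit) {
                paused.add(e.getKey());
            }
        }
        return paused;
    }

    public static void main(String[] args) {
        Map<Integer, Long> watermarks = new TreeMap<>();
        watermarks.put(0, 1_000L);
        watermarks.put(1, 4_000L);
        watermarks.put(2, 12_000L);
        // With a 5 s drift the limit is 1_000 + 5_000 = 6_000 ms, so only subtask 2 must wait.
        System.out.println(maxAllowedWatermark(watermarks, 5_000L)); // 6000
        System.out.println(subtasksToPause(watermarks, 5_000L));     // [2]
    }
}
{code}

The limit is anchored to the minimum watermark. That choice keeps the slowest partition from being left arbitrarily far behind.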
This works well until the job has to recover from a checkpoint. After the
recovery, alignment is no longer applied. This is best illustrated by the
watermark metrics of the various operators in the image:
!image-2023-09-18-15-40-06-868.png!
You can see how the watermarks disperse after the recovery. While debugging the
problem, I noticed that before the failure there are regular calls to
{code:java}
SourceCoordinator::announceCombinedWatermark()
{code}
After the recovery, this method is never called, so no value for
{code:java}
watermarkAlignmentParams.getMaxAllowedWatermarkDrift(){code}
is ever read. I can work around the problem manually if I stop the job, clear
all state from ZooKeeper, and then start Flink manually, passing the last
checkpoint with the
{code}
--fromSavepoint{code}
flag. This causes the SourceCoordinator to be constructed properly and the
watermark drift to be checked. After this manual recovery, the watermarks again
converge to within the allowed drift, as seen in the metrics:
!image-2023-09-18-15-46-16-106.png!
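The following rough sketch shows why missing {{announceCombinedWatermark()}} calls would be enough to disable alignment. In the model, the coordinator periodically computes the group's maximum allowed watermark (slowest watermark plus drift) and sends it to the readers, and a reader pauses only when its watermark exceeds the last value it received. The class names are hypothetical. The assumption that a reader's limit starts out unbounded ({{Long.MAX_VALUE}}) until the first announcement arrives is a modeling choice, not a verified detail of Flink's source operator.

{code:java}
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of periodic watermark-alignment announcements (not Flink code). */
public class AnnouncementModel {

    /** A reader that advances only while its watermark is within the last limit it received. */
    static final class Reader {
        final long step;                   // watermark progress per tick when not paused
        long watermark = 0L;
        long maxAllowed = Long.MAX_VALUE;  // ASSUMPTION: unbounded until the first announcement

        Reader(long step) {
            this.step = step;
        }

        void tick() {
            if (watermark <= maxAllowed) {
                watermark += step;
            }
        }
    }

    /** Runs the model and returns the spread between the fastest and slowest reader's watermark. */
    static long simulate(boolean announce, long maxDriftMillis, int ticks) {
        List<Reader> readers = new ArrayList<>();
        readers.add(new Reader(1_000L));   // slow partition
        readers.add(new Reader(10_000L));  // fast partition
        for (int t = 0; t < ticks; t++) {
            if (announce) {
                // Coordinator side: limit = slowest watermark + allowed drift, sent to every reader.
                long slowest = Long.MAX_VALUE;
                for (Reader r : readers) {
                    slowest = Math.min(slowest, r.watermark);
                }
                for (Reader r : readers) {
                    r.maxAllowed = slowest + maxDriftMillis;
                }
            }
            for (Reader r : readers) {
                r.tick();
            }
        }
        long low = Long.MAX_VALUE;
        long high = Long.MIN_VALUE;
        for (Reader r : readers) {
            low = Math.min(low, r.watermark);
            high = Math.max(high, r.watermark);
        }
        return high - low;
    }

    public static void main(String[] args) {
        System.out.println("spread with announcements:    " + simulate(true, 5_000L, 100));
        System.out.println("spread without announcements: " + simulate(false, 5_000L, 100));
    }
}
{code}

With announcements running, the spread in this model stays within roughly the drift plus one reader step. Without them, the spread keeps growing, which qualitatively resembles the dispersion in the first chart.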
Let me know if I can help by providing any more information.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)