[
https://issues.apache.org/jira/browse/FLINK-33109?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17772616#comment-17772616
]
Yordan Pavlov edited comment on FLINK-33109 at 10/6/23 3:44 PM:
----------------------------------------------------------------
Hi [~fanrui]
sorry for the delay, running this image gives me the following error, shortly
after start
{code:java}
2023-10-06 14:54:20.725 [AsyncOperations-thread-1] INFO
o.a.flink.streaming.runtime.tasks.AsyncCheckpointRunnable -
TumblingEventTimeWindows (10/32)#0 - asynchronous part of checkpoint 1 could
not be completed.
java.util.concurrent.ExecutionException: java.lang.NoSuchMethodError: 'void
org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle.<init>(java.util.UUID,
org.apache.flink.runtime.state.KeyGroupRange, long, java.util.List,
java.util.List, org.apache.flink.runtime.state.StreamStateHandle, long)'
at java.base/java.util.concurrent.FutureTask.report(Unknown Source)
at java.base/java.util.concurrent.FutureTask.get(Unknown Source)
at
org.apache.flink.util.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:544)
at
org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:54)
at
org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.finalizeNonFinishedSnapshots(AsyncCheckpointRunnable.java:191)
at
org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:124)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown
Source)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown
Source)
at java.base/java.lang.Thread.run(Unknown Source)
Caused by: java.lang.NoSuchMethodError: 'void
org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle.<init>(java.util.UUID,
org.apache.flink.runtime.state.KeyGroupRange, long, java.util.List,
java.util.List, org.apache.flink.runtime.state.StreamStateHandle, long)'
at
org.apache.flink.contrib.streaming.state.snapshot.RocksNativeFullSnapshotStrategy$RocksDBNativeFullSnapshotOperation.get(RocksNativeFullSnapshotStrategy.java:198)
at
org.apache.flink.runtime.state.SnapshotStrategyRunner$1.callInternal(SnapshotStrategyRunner.java:91)
at
org.apache.flink.runtime.state.SnapshotStrategyRunner$1.callInternal(SnapshotStrategyRunner.java:88)
at
org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:78)
at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
at
org.apache.flink.util.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:541)
... 6 common frames omitted{code}
so I couldn't test if has the problem I am facing.
Attached you can find a simple program which would exhibit the problem with the
latest stable image 1.17.1: [^WatermarkTest-1.scala]
what I would observe is that different sources would converge shortly after
start having a small watermark alignment value. However if I kill the task
manager and restart from a checkpoint they would start to diverge as if no
watermark alignment is present.
was (Author: yordanpavlov):
Hi [~fanrui]
sorry for the delay, running this image gives me the following error, shortly
after start
{code:java}
2023-10-06 14:54:20.725 [AsyncOperations-thread-1] INFO
o.a.flink.streaming.runtime.tasks.AsyncCheckpointRunnable -
TumblingEventTimeWindows (10/32)#0 - asynchronous part of checkpoint 1 could
not be completed.
java.util.concurrent.ExecutionException: java.lang.NoSuchMethodError: 'void
org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle.<init>(java.util.UUID,
org.apache.flink.runtime.state.KeyGroupRange, long, java.util.List,
java.util.List, org.apache.flink.runtime.state.StreamStateHandle, long)'
at java.base/java.util.concurrent.FutureTask.report(Unknown Source)
at java.base/java.util.concurrent.FutureTask.get(Unknown Source)
at
org.apache.flink.util.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:544)
at
org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:54)
at
org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.finalizeNonFinishedSnapshots(AsyncCheckpointRunnable.java:191)
at
org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:124)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown
Source)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown
Source)
at java.base/java.lang.Thread.run(Unknown Source)
Caused by: java.lang.NoSuchMethodError: 'void
org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle.<init>(java.util.UUID,
org.apache.flink.runtime.state.KeyGroupRange, long, java.util.List,
java.util.List, org.apache.flink.runtime.state.StreamStateHandle, long)'
at
org.apache.flink.contrib.streaming.state.snapshot.RocksNativeFullSnapshotStrategy$RocksDBNativeFullSnapshotOperation.get(RocksNativeFullSnapshotStrategy.java:198)
at
org.apache.flink.runtime.state.SnapshotStrategyRunner$1.callInternal(SnapshotStrategyRunner.java:91)
at
org.apache.flink.runtime.state.SnapshotStrategyRunner$1.callInternal(SnapshotStrategyRunner.java:88)
at
org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:78)
at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
at
org.apache.flink.util.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:541)
... 6 common frames omitted{code}
so I couldn't test if has the problem I am facing.
Attached you can find a simple program which would exhibit the problem with the
latest stable image 1.17.1: [^WatermarkTest.scala]
what I would observe is that different sources would converge shortly after
start having a small watermark alignment value. However if I kill the task
manager and restart from a checkpoint they would start to diverge as if no
watermark alignment is present.
> Watermark alignment not applied after recovery from checkpoint
> --------------------------------------------------------------
>
> Key: FLINK-33109
> URL: https://issues.apache.org/jira/browse/FLINK-33109
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Coordination
> Affects Versions: 1.17.1
> Reporter: Yordan Pavlov
> Priority: Major
> Attachments: WatermarkTest-1.scala,
> image-2023-09-18-15-40-06-868.png, image-2023-09-18-15-46-16-106.png
>
>
> I am observing a problem where after recovery from a checkpoint the Kafka
> source watermarks would start to diverge not honoring the watermark alignment
> setting I have applied.
> I have a Kafka source which reads a topic with 32 partitions. I am applying
> the following watermark strategy:
> {code:java}
> new EventAwareWatermarkStrategy[KeyedKafkaSourceMessage]](msg =>
> msg.value.getTimestamp)
> .withWatermarkAlignment("alignment-sources-group",
> time.Duration.ofMillis(sourceWatermarkAlignmentBlocks)){code}
>
> This works great up until my job needs to recover from checkpoint. Once the
> recovery takes place, no alignment is taking place any more. This can best be
> illustrated by looking at the watermark metrics for various operators in the
> image:
> !image-2023-09-18-15-40-06-868.png!
>
> You can see how the watermarks disperse after the recovery. Trying to debug
> the problem I noticed that before the failure there would be calls in
>
> {code:java}
> SourceCoordinator::announceCombinedWatermark()
> {code}
> after the recovery, no calls get there, so no value for
> {code:java}
> watermarkAlignmentParams.getMaxAllowedWatermarkDrift(){code}
> is ever read. I can manually fix the problem If I stop the job, clear all
> state from Zookeeper and then manually start Flink providing the last
> checkpoint with
> {code:java}
> '–fromSavepoint'{code}
> flag. This would cause the SourceCoordinator to be constructed properly and
> watermark drift to be checked. Once recovery manually watermarks would again
> converge to the allowed drift as seen in the metrics:
> !image-2023-09-18-15-46-16-106.png!
>
> Let me know If I can be helpful by providing any more information.
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)