This is an automated email from the ASF dual-hosted git repository.

zakelly pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new de280facfa5 [hotfix][runtime] Polish check hints and lines calling for 
StreamOperatorStateHandler#snapshotState. (#26477)
de280facfa5 is described below

commit de280facfa56405efb48a8ded3181f7d0bf3df3a
Author: Pan Yuepeng <[email protected]>
AuthorDate: Fri Apr 18 11:07:15 2025 +0800

    [hotfix][runtime] Polish check hints and lines calling for 
StreamOperatorStateHandler#snapshotState. (#26477)
---
 .../flink/streaming/api/operators/StreamOperatorStateHandler.java  | 7 ++-----
 1 file changed, 2 insertions(+), 5 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateHandler.java
index 9ea0623e319..09dcd710ee7 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateHandler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateHandler.java
@@ -232,12 +232,10 @@ public class StreamOperatorStateHandler {
         try {
             if (timeServiceManager.isPresent()) {
                 boolean requiresLegacyRawKeyedStateSnapshots;
-                final InternalTimeServiceManager<?> manager;
                 if (useAsyncState) {
                     checkState(
                             asyncKeyedStateBackend != null,
-                            "keyedStateBackend should be available with 
timeServiceManager");
-                    manager = timeServiceManager.get();
+                            "asyncKeyedStateBackend should be available with 
timeServiceManager");
                     requiresLegacyRawKeyedStateSnapshots =
                             
asyncKeyedStateBackend.requiresLegacySynchronousTimerSnapshots(
                                     checkpointOptions.getCheckpointType());
@@ -245,8 +243,6 @@ public class StreamOperatorStateHandler {
                     checkState(
                             keyedStateBackend != null,
                             "keyedStateBackend should be available with 
timeServiceManager");
-                    manager = timeServiceManager.get();
-
                     requiresLegacyRawKeyedStateSnapshots =
                             keyedStateBackend instanceof 
AbstractKeyedStateBackend
                                     && ((AbstractKeyedStateBackend<?>) 
keyedStateBackend)
@@ -257,6 +253,7 @@ public class StreamOperatorStateHandler {
                     checkState(
                             !isUsingCustomRawKeyedState,
                             "Attempting to snapshot timers to raw keyed state, 
but this operator has custom raw keyed state to write.");
+                    final InternalTimeServiceManager<?> manager = 
timeServiceManager.get();
                     manager.snapshotToRawKeyedState(
                             snapshotContext.getRawKeyedOperatorStateOutput(), 
operatorName);
                 }

Reply via email to