This is an automated email from the ASF dual-hosted git repository.

roman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 1e177c9244940d27663496730ed99d447d12722a
Author: Roman Khachatryan <[email protected]>
AuthorDate: Tue Sep 30 00:33:01 2025 +0200

    [hotfix] Introduce OperatorSnapshotFinalizer factory method
---
 .../apache/flink/state/api/output/SnapshotUtils.java   |  2 +-
 .../api/operators/OperatorSnapshotFinalizer.java       | 18 +++++++++++++-----
 .../runtime/tasks/AsyncCheckpointRunnable.java         |  2 +-
 .../api/operators/OperatorSnapshotFinalizerTest.java   |  2 +-
 .../util/AbstractStreamOperatorTestHarness.java        |  2 +-
 5 files changed, 17 insertions(+), 9 deletions(-)

diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/SnapshotUtils.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/SnapshotUtils.java
index a3ecabe715f..14688e4ee74 100644
--- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/SnapshotUtils.java
+++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/SnapshotUtils.java
@@ -72,7 +72,7 @@ public final class SnapshotUtils {
                 operator.snapshotState(checkpointId, timestamp, options, storage);
 
         OperatorSubtaskState state =
-                new OperatorSnapshotFinalizer(snapshotInProgress).getJobManagerOwnedState();
+                OperatorSnapshotFinalizer.create(snapshotInProgress).getJobManagerOwnedState();
 
         operator.notifyCheckpointComplete(checkpointId);
         return new TaggedOperatorSubtaskState(index, state);
diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotFinalizer.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotFinalizer.java
index c308a6cb8ae..455c2489a74 100644
--- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotFinalizer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotFinalizer.java
@@ -27,12 +27,11 @@ import org.apache.flink.runtime.state.OutputStateHandle;
 import org.apache.flink.runtime.state.SnapshotResult;
 import org.apache.flink.util.concurrent.FutureUtils;
 
-import javax.annotation.Nonnull;
-
 import java.util.concurrent.ExecutionException;
 
 import static org.apache.flink.runtime.checkpoint.StateObjectCollection.emptyIfNull;
 import static org.apache.flink.runtime.checkpoint.StateObjectCollection.singletonOrEmpty;
+import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * This class finalizes {@link OperatorSnapshotFutures}. Each object is created with a {@link
@@ -47,8 +46,9 @@ public class OperatorSnapshotFinalizer {
     /** Secondary replica of the operator subtask state for faster, local recovery on TM. */
     private final OperatorSubtaskState taskLocalState;
 
-    public OperatorSnapshotFinalizer(@Nonnull OperatorSnapshotFutures snapshotFutures)
+    public static OperatorSnapshotFinalizer create(OperatorSnapshotFutures snapshotFutures)
             throws ExecutionException, InterruptedException {
+        checkNotNull(snapshotFutures);
 
         SnapshotResult<KeyedStateHandle> keyedManaged =
                 FutureUtils.runIfNotDoneAndGet(snapshotFutures.getKeyedStateManagedFuture());
@@ -68,7 +68,7 @@ public class OperatorSnapshotFinalizer {
         SnapshotResult<StateObjectCollection<OutputStateHandle>> resultSubpartition =
                 snapshotFutures.getResultSubpartitionStateFuture().get();
 
-        jobManagerOwnedState =
+        OperatorSubtaskState jobManagerOwnedState =
                 OperatorSubtaskState.builder()
                         .setManagedOperatorState(
                                 singletonOrEmpty(operatorManaged.getJobManagerOwnedSnapshot()))
@@ -83,7 +83,7 @@ public class OperatorSnapshotFinalizer {
                                 emptyIfNull(resultSubpartition.getJobManagerOwnedSnapshot()))
                         .build();
 
-        taskLocalState =
+        OperatorSubtaskState taskLocalState =
                 OperatorSubtaskState.builder()
                         .setManagedOperatorState(
                                 singletonOrEmpty(operatorManaged.getTaskLocalSnapshot()))
@@ -94,6 +94,14 @@ public class OperatorSnapshotFinalizer {
                         .setResultSubpartitionState(
                                 emptyIfNull(resultSubpartition.getTaskLocalSnapshot()))
                         .build();
+
+        return new OperatorSnapshotFinalizer(jobManagerOwnedState, taskLocalState);
+    }
+
+    public OperatorSnapshotFinalizer(
+            OperatorSubtaskState jobManagerOwnedState, OperatorSubtaskState taskLocalState) {
+        this.jobManagerOwnedState = checkNotNull(jobManagerOwnedState);
+        this.taskLocalState = checkNotNull(taskLocalState);
     }
 
     public OperatorSubtaskState getTaskLocalState() {
diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncCheckpointRunnable.java b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncCheckpointRunnable.java
index 2e859d2ab8d..ee674fc1c2a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncCheckpointRunnable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncCheckpointRunnable.java
@@ -189,7 +189,7 @@ final class AsyncCheckpointRunnable implements Runnable, Closeable {
 
             // finalize the async part of all by executing all snapshot runnables
             OperatorSnapshotFinalizer finalizedSnapshots =
-                    new OperatorSnapshotFinalizer(snapshotInProgress);
+                    OperatorSnapshotFinalizer.create(snapshotInProgress);
 
             jobManagerTaskOperatorSubtaskStates.putSubtaskStateByOperatorID(
                     operatorID, finalizedSnapshots.getJobManagerOwnedState());
diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/OperatorSnapshotFinalizerTest.java b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/OperatorSnapshotFinalizerTest.java
index a64905098af..17d6351e365 100644
--- a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/OperatorSnapshotFinalizerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/OperatorSnapshotFinalizerTest.java
@@ -93,7 +93,7 @@ class OperatorSnapshotFinalizerTest {
             assertThat(f).isNotDone();
         }
 
-        OperatorSnapshotFinalizer finalizer = new OperatorSnapshotFinalizer(snapshotFutures);
+        OperatorSnapshotFinalizer finalizer = OperatorSnapshotFinalizer.create(snapshotFutures);
 
         for (Future<?> f : snapshotFutures.getAllFutures()) {
             assertThat(f).isDone();
diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java b/flink-runtime/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
index 31a6019a50a..b72f7299f4b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
+++ b/flink-runtime/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
@@ -761,7 +761,7 @@ public class AbstractStreamOperatorTestHarness<OUT> implements AutoCloseable {
                         checkpointStorageAccess.resolveCheckpointStorageLocation(
                                 checkpointId, locationReference));
 
-        return new OperatorSnapshotFinalizer(operatorStateResult);
+        return OperatorSnapshotFinalizer.create(operatorStateResult);
     }
 
     /**

Reply via email to