This is an automated email from the ASF dual-hosted git repository. roman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 1e177c9244940d27663496730ed99d447d12722a Author: Roman Khachatryan <[email protected]> AuthorDate: Tue Sep 30 00:33:01 2025 +0200 [hotfix] Introduce OperatorSnapshotFinalizer factory method --- .../apache/flink/state/api/output/SnapshotUtils.java | 2 +- .../api/operators/OperatorSnapshotFinalizer.java | 18 +++++++++++++----- .../runtime/tasks/AsyncCheckpointRunnable.java | 2 +- .../api/operators/OperatorSnapshotFinalizerTest.java | 2 +- .../util/AbstractStreamOperatorTestHarness.java | 2 +- 5 files changed, 17 insertions(+), 9 deletions(-) diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/SnapshotUtils.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/SnapshotUtils.java index a3ecabe715f..14688e4ee74 100644 --- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/SnapshotUtils.java +++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/SnapshotUtils.java @@ -72,7 +72,7 @@ public final class SnapshotUtils { operator.snapshotState(checkpointId, timestamp, options, storage); OperatorSubtaskState state = - new OperatorSnapshotFinalizer(snapshotInProgress).getJobManagerOwnedState(); + OperatorSnapshotFinalizer.create(snapshotInProgress).getJobManagerOwnedState(); operator.notifyCheckpointComplete(checkpointId); return new TaggedOperatorSubtaskState(index, state); diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotFinalizer.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotFinalizer.java index c308a6cb8ae..455c2489a74 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotFinalizer.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotFinalizer.java @@ -27,12 +27,11 @@ import org.apache.flink.runtime.state.OutputStateHandle; import org.apache.flink.runtime.state.SnapshotResult; import org.apache.flink.util.concurrent.FutureUtils; -import javax.annotation.Nonnull; - import java.util.concurrent.ExecutionException; import static org.apache.flink.runtime.checkpoint.StateObjectCollection.emptyIfNull; import static org.apache.flink.runtime.checkpoint.StateObjectCollection.singletonOrEmpty; +import static org.apache.flink.util.Preconditions.checkNotNull; /** * This class finalizes {@link OperatorSnapshotFutures}. Each object is created with a {@link @@ -47,8 +46,9 @@ public class OperatorSnapshotFinalizer { /** Secondary replica of the operator subtask state for faster, local recovery on TM. */ private final OperatorSubtaskState taskLocalState; - public OperatorSnapshotFinalizer(@Nonnull OperatorSnapshotFutures snapshotFutures) + public static OperatorSnapshotFinalizer create(OperatorSnapshotFutures snapshotFutures) throws ExecutionException, InterruptedException { + checkNotNull(snapshotFutures); SnapshotResult<KeyedStateHandle> keyedManaged = FutureUtils.runIfNotDoneAndGet(snapshotFutures.getKeyedStateManagedFuture()); @@ -68,7 +68,7 @@ public class OperatorSnapshotFinalizer { SnapshotResult<StateObjectCollection<OutputStateHandle>> resultSubpartition = snapshotFutures.getResultSubpartitionStateFuture().get(); - jobManagerOwnedState = + OperatorSubtaskState jobManagerOwnedState = OperatorSubtaskState.builder() .setManagedOperatorState( singletonOrEmpty(operatorManaged.getJobManagerOwnedSnapshot())) @@ -83,7 +83,7 @@ public class OperatorSnapshotFinalizer { emptyIfNull(resultSubpartition.getJobManagerOwnedSnapshot())) .build(); - taskLocalState = + OperatorSubtaskState taskLocalState = OperatorSubtaskState.builder() .setManagedOperatorState( singletonOrEmpty(operatorManaged.getTaskLocalSnapshot())) @@ -94,6 +94,14 @@ public class OperatorSnapshotFinalizer { .setResultSubpartitionState( emptyIfNull(resultSubpartition.getTaskLocalSnapshot())) .build(); + + return new OperatorSnapshotFinalizer(jobManagerOwnedState, taskLocalState); + } + + public OperatorSnapshotFinalizer( + OperatorSubtaskState jobManagerOwnedState, OperatorSubtaskState taskLocalState) { + this.jobManagerOwnedState = checkNotNull(jobManagerOwnedState); + this.taskLocalState = checkNotNull(taskLocalState); } public OperatorSubtaskState getTaskLocalState() { diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncCheckpointRunnable.java b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncCheckpointRunnable.java index 2e859d2ab8d..ee674fc1c2a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncCheckpointRunnable.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncCheckpointRunnable.java @@ -189,7 +189,7 @@ final class AsyncCheckpointRunnable implements Runnable, Closeable { // finalize the async part of all by executing all snapshot runnables OperatorSnapshotFinalizer finalizedSnapshots = - new OperatorSnapshotFinalizer(snapshotInProgress); + OperatorSnapshotFinalizer.create(snapshotInProgress); jobManagerTaskOperatorSubtaskStates.putSubtaskStateByOperatorID( operatorID, finalizedSnapshots.getJobManagerOwnedState()); diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/OperatorSnapshotFinalizerTest.java b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/OperatorSnapshotFinalizerTest.java index a64905098af..17d6351e365 100644 --- a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/OperatorSnapshotFinalizerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/OperatorSnapshotFinalizerTest.java @@ -93,7 +93,7 @@ class OperatorSnapshotFinalizerTest { assertThat(f).isNotDone(); } - OperatorSnapshotFinalizer finalizer = new OperatorSnapshotFinalizer(snapshotFutures); + OperatorSnapshotFinalizer finalizer = OperatorSnapshotFinalizer.create(snapshotFutures); for (Future<?> f : snapshotFutures.getAllFutures()) { assertThat(f).isDone(); diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java b/flink-runtime/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java index 31a6019a50a..b72f7299f4b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java +++ b/flink-runtime/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java @@ -761,7 +761,7 @@ public class AbstractStreamOperatorTestHarness<OUT> implements AutoCloseable { checkpointStorageAccess.resolveCheckpointStorageLocation( checkpointId, locationReference)); - return new OperatorSnapshotFinalizer(operatorStateResult); + return OperatorSnapshotFinalizer.create(operatorStateResult); } /**
