This is an automated email from the ASF dual-hosted git repository.
zakelly pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new de280facfa5 [hotfix][runtime] Polish check hints and lines calling for StreamOperatorStateHandler#snapshotState. (#26477)
de280facfa5 is described below
commit de280facfa56405efb48a8ded3181f7d0bf3df3a
Author: Pan Yuepeng <[email protected]>
AuthorDate: Fri Apr 18 11:07:15 2025 +0800
[hotfix][runtime] Polish check hints and lines calling for StreamOperatorStateHandler#snapshotState. (#26477)
---
.../flink/streaming/api/operators/StreamOperatorStateHandler.java | 7 ++-----
1 file changed, 2 insertions(+), 5 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateHandler.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateHandler.java
index 9ea0623e319..09dcd710ee7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateHandler.java
@@ -232,12 +232,10 @@ public class StreamOperatorStateHandler {
try {
if (timeServiceManager.isPresent()) {
boolean requiresLegacyRawKeyedStateSnapshots;
- final InternalTimeServiceManager<?> manager;
if (useAsyncState) {
checkState(
asyncKeyedStateBackend != null,
- "keyedStateBackend should be available with
timeServiceManager");
- manager = timeServiceManager.get();
+ "asyncKeyedStateBackend should be available with
timeServiceManager");
requiresLegacyRawKeyedStateSnapshots =
asyncKeyedStateBackend.requiresLegacySynchronousTimerSnapshots(
checkpointOptions.getCheckpointType());
@@ -245,8 +243,6 @@ public class StreamOperatorStateHandler {
checkState(
keyedStateBackend != null,
"keyedStateBackend should be available with
timeServiceManager");
- manager = timeServiceManager.get();
-
requiresLegacyRawKeyedStateSnapshots =
keyedStateBackend instanceof
AbstractKeyedStateBackend
&& ((AbstractKeyedStateBackend<?>)
keyedStateBackend)
@@ -257,6 +253,7 @@ public class StreamOperatorStateHandler {
checkState(
!isUsingCustomRawKeyedState,
"Attempting to snapshot timers to raw keyed state,
but this operator has custom raw keyed state to write.");
+ final InternalTimeServiceManager<?> manager =
timeServiceManager.get();
manager.snapshotToRawKeyedState(
snapshotContext.getRawKeyedOperatorStateOutput(),
operatorName);
}