This is an automated email from the ASF dual-hosted git repository.

wenjin272 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-agents.git


The following commit(s) were added to refs/heads/main by this push:
     new e54317de [runtime] Prune restored processing keys before snapshot 
(#825)
e54317de is described below

commit e54317deb5ccc504c4ec0371b4cf4d30ee6182bd
Author: Joey Tong <[email protected]>
AuthorDate: Wed Jun 10 10:16:21 2026 +0800

    [runtime] Prune restored processing keys before snapshot (#825)
---
 .../runtime/operator/ActionExecutionOperator.java  |  8 +++++
 .../runtime/operator/OperatorStateManager.java     |  5 ++++
 .../operator/ActionExecutionOperatorTest.java      | 35 ++++++++++++++++++++++
 3 files changed, 48 insertions(+)

diff --git 
a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java
 
b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java
index caefb955..665551bb 100644
--- 
a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java
+++ 
b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java
@@ -55,8 +55,11 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Optional;
+import java.util.Set;
 
 import static 
org.apache.flink.agents.api.configuration.AgentConfigOptions.JOB_IDENTIFIER;
 import static org.apache.flink.util.Preconditions.checkState;
@@ -557,15 +560,20 @@ public class ActionExecutionOperator<IN, OUT> extends 
AbstractStreamOperator<OUT
             KeyGroupRange currentSubtaskKeyGroupRange =
                     stateManager.getCurrentSubtaskKeyGroupRange(
                             maxParallelism, getRuntimeContext());
+            Set<Object> ownedKeys = new LinkedHashSet<>();
             for (Object key : keys) {
                 if (!stateManager.isKeyOwnedByCurrentSubtask(
                         key, maxParallelism, currentSubtaskKeyGroupRange)) {
                     continue;
                 }
+                if (!ownedKeys.add(key)) {
+                    continue;
+                }
                 eventRouter.getKeySegmentQueue().addKeyToLastSegment(key);
                 mailboxExecutor.submit(
                         () -> tryProcessActionTaskForKey(key), "process action 
task");
             }
+            stateManager.replaceProcessingKeys(new ArrayList<>(ownedKeys));
         }
 
         stateManager.forEachPendingInputEventKey(
diff --git 
a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/OperatorStateManager.java
 
b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/OperatorStateManager.java
index eee3c2dd..19f1f67c 100644
--- 
a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/OperatorStateManager.java
+++ 
b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/OperatorStateManager.java
@@ -44,6 +44,7 @@ import org.apache.flink.runtime.state.VoidNamespaceSerializer;
 import javax.annotation.Nullable;
 
 import java.time.Duration;
+import java.util.List;
 
 import static org.apache.flink.agents.runtime.utils.StateUtil.*;
 
@@ -265,6 +266,10 @@ class OperatorStateManager {
         currentProcessingKeysOpState.add(key);
     }
 
+    void replaceProcessingKeys(List<Object> keys) throws Exception {
+        currentProcessingKeysOpState.update(keys);
+    }
+
     int removeProcessingKey(Object key) throws Exception {
         return removeFromListState(currentProcessingKeysOpState, key);
     }
diff --git 
a/runtime/src/test/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperatorTest.java
 
b/runtime/src/test/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperatorTest.java
index 41cb5109..9ac18e3e 100644
--- 
a/runtime/src/test/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperatorTest.java
+++ 
b/runtime/src/test/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperatorTest.java
@@ -232,6 +232,41 @@ public class ActionExecutionOperatorTest {
 
             assertThat(ownerHarness.getTaskMailbox().size()).isEqualTo(1);
             assertThat(nonOwnerHarness.getTaskMailbox().size()).isZero();
+            assertThat(
+                            ((ActionExecutionOperator<Long, Object>) 
ownerHarness.getOperator())
+                                    .getOperatorStateManager()
+                                    .getProcessingKeys())
+                    .containsExactly(key);
+            assertThat(
+                            ((ActionExecutionOperator<Long, Object>) 
nonOwnerHarness.getOperator())
+                                    .getOperatorStateManager()
+                                    .getProcessingKeys())
+                    .isEmpty();
+
+            OperatorSubtaskState secondCheckpoint =
+                    AbstractStreamOperatorTestHarness.repackageState(
+                            ownerHarness.snapshot(2L, 2L), 
nonOwnerHarness.snapshot(2L, 2L));
+            OperatorSubtaskState secondRestoreOwnerState =
+                    AbstractStreamOperatorTestHarness.repartitionOperatorState(
+                            secondCheckpoint,
+                            maxParallelism,
+                            newParallelism,
+                            newParallelism,
+                            ownerSubtask);
+
+            try (KeyedOneInputStreamOperatorTestHarness<Long, Long, Object> 
restoredOwnerHarness =
+                    new KeyedOneInputStreamOperatorTestHarness<>(
+                            new 
ActionExecutionOperatorFactory(TestAgent.getAgentPlan(false), true),
+                            (KeySelector<Long, Long>) value -> value,
+                            TypeInformation.of(Long.class),
+                            maxParallelism,
+                            newParallelism,
+                            ownerSubtask)) {
+                restoredOwnerHarness.initializeState(secondRestoreOwnerState);
+                restoredOwnerHarness.open();
+
+                
assertThat(restoredOwnerHarness.getTaskMailbox().size()).isEqualTo(1);
+            }
         }
     }
 

Reply via email to