This is an automated email from the ASF dual-hosted git repository.

wenjin272 pushed a commit to branch release-0.2
in repository https://gitbox.apache.org/repos/asf/flink-agents.git


The following commit(s) were added to refs/heads/release-0.2 by this push:
     new 47fd0984 [runtime] Prune restored processing keys before snapshot (#826)
47fd0984 is described below

commit 47fd0984fc437bdfb95fc758f59117a190ea6e49
Author: Joey Tong <[email protected]>
AuthorDate: Wed Jun 10 11:01:55 2026 +0800

    [runtime] Prune restored processing keys before snapshot (#826)
---
 .../runtime/operator/ActionExecutionOperator.java  |  7 ++++++
 .../operator/ActionExecutionOperatorTest.java      | 25 ++++++++++++++++++++++
 2 files changed, 32 insertions(+)

diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java
index 91e419bb..45aacc54 100644
--- a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java
+++ b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java
@@ -96,9 +96,11 @@ import pemja.core.PythonInterpreter;
 import java.lang.reflect.Field;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
 
 import static 
org.apache.flink.agents.api.configuration.AgentConfigOptions.ACTION_STATE_STORE_BACKEND;
 import static 
org.apache.flink.agents.api.configuration.AgentConfigOptions.BASE_LOG_DIR;
@@ -935,14 +937,19 @@ public class ActionExecutionOperator<IN, OUT> extends AbstractStreamOperator<OUT
             int maxParallelism = 
getRuntimeContext().getTaskInfo().getMaxNumberOfParallelSubtasks();
             KeyGroupRange currentSubtaskKeyGroupRange =
                     getCurrentSubtaskKeyGroupRange(maxParallelism);
+            Set<Object> ownedKeys = new LinkedHashSet<>();
             for (Object key : keys) {
                 if (!isKeyOwnedByCurrentSubtask(key, maxParallelism, 
currentSubtaskKeyGroupRange)) {
                     continue;
                 }
+                if (!ownedKeys.add(key)) {
+                    continue;
+                }
                 keySegmentQueue.addKeyToLastSegment(key);
                 mailboxExecutor.submit(
                         () -> tryProcessActionTaskForKey(key), "process action 
task");
             }
+            currentProcessingKeysOpState.update(new ArrayList<>(ownedKeys));
         }
 
         getKeyedStateBackend()
diff --git a/runtime/src/test/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperatorTest.java b/runtime/src/test/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperatorTest.java
index 729f106e..6cf87217 100644
--- a/runtime/src/test/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperatorTest.java
+++ b/runtime/src/test/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperatorTest.java
@@ -217,6 +217,31 @@ public class ActionExecutionOperatorTest {
 
             assertThat(ownerHarness.getTaskMailbox().size()).isEqualTo(1);
             assertThat(nonOwnerHarness.getTaskMailbox().size()).isZero();
+
+            OperatorSubtaskState secondCheckpoint =
+                    AbstractStreamOperatorTestHarness.repackageState(
+                            ownerHarness.snapshot(2L, 2L), 
nonOwnerHarness.snapshot(2L, 2L));
+            OperatorSubtaskState secondRestoreOwnerState =
+                    AbstractStreamOperatorTestHarness.repartitionOperatorState(
+                            secondCheckpoint,
+                            maxParallelism,
+                            newParallelism,
+                            newParallelism,
+                            ownerSubtask);
+
+            try (KeyedOneInputStreamOperatorTestHarness<Long, Long, Object> 
restoredOwnerHarness =
+                    new KeyedOneInputStreamOperatorTestHarness<>(
+                            new 
ActionExecutionOperatorFactory(TestAgent.getAgentPlan(false), true),
+                            (KeySelector<Long, Long>) value -> value,
+                            TypeInformation.of(Long.class),
+                            maxParallelism,
+                            newParallelism,
+                            ownerSubtask)) {
+                restoredOwnerHarness.initializeState(secondRestoreOwnerState);
+                restoredOwnerHarness.open();
+
+                
assertThat(restoredOwnerHarness.getTaskMailbox().size()).isEqualTo(1);
+            }
         }
     }
 

Reply via email to