This is an automated email from the ASF dual-hosted git repository.
wenjin272 pushed a commit to branch release-0.2
in repository https://gitbox.apache.org/repos/asf/flink-agents.git
The following commit(s) were added to refs/heads/release-0.2 by this push:
new 47fd0984 [runtime] Prune restored processing keys before snapshot (#826)
47fd0984 is described below
commit 47fd0984fc437bdfb95fc758f59117a190ea6e49
Author: Joey Tong <[email protected]>
AuthorDate: Wed Jun 10 11:01:55 2026 +0800
[runtime] Prune restored processing keys before snapshot (#826)
---
.../runtime/operator/ActionExecutionOperator.java | 7 ++++++
.../operator/ActionExecutionOperatorTest.java | 25 ++++++++++++++++++++++
2 files changed, 32 insertions(+)
diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java
index 91e419bb..45aacc54 100644
--- a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java
+++ b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java
@@ -96,9 +96,11 @@ import pemja.core.PythonInterpreter;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.Set;
import static
org.apache.flink.agents.api.configuration.AgentConfigOptions.ACTION_STATE_STORE_BACKEND;
import static
org.apache.flink.agents.api.configuration.AgentConfigOptions.BASE_LOG_DIR;
@@ -935,14 +937,19 @@ public class ActionExecutionOperator<IN, OUT> extends AbstractStreamOperator<OUT
int maxParallelism =
getRuntimeContext().getTaskInfo().getMaxNumberOfParallelSubtasks();
KeyGroupRange currentSubtaskKeyGroupRange =
getCurrentSubtaskKeyGroupRange(maxParallelism);
+ Set<Object> ownedKeys = new LinkedHashSet<>();
for (Object key : keys) {
if (!isKeyOwnedByCurrentSubtask(key, maxParallelism,
currentSubtaskKeyGroupRange)) {
continue;
}
+ if (!ownedKeys.add(key)) {
+ continue;
+ }
keySegmentQueue.addKeyToLastSegment(key);
mailboxExecutor.submit(
() -> tryProcessActionTaskForKey(key), "process action
task");
}
+ currentProcessingKeysOpState.update(new ArrayList<>(ownedKeys));
}
getKeyedStateBackend()
diff --git
a/runtime/src/test/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperatorTest.java b/runtime/src/test/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperatorTest.java
index 729f106e..6cf87217 100644
---
a/runtime/src/test/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperatorTest.java
+++ b/runtime/src/test/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperatorTest.java
@@ -217,6 +217,31 @@ public class ActionExecutionOperatorTest {
assertThat(ownerHarness.getTaskMailbox().size()).isEqualTo(1);
assertThat(nonOwnerHarness.getTaskMailbox().size()).isZero();
+
+ OperatorSubtaskState secondCheckpoint =
+ AbstractStreamOperatorTestHarness.repackageState(
+ ownerHarness.snapshot(2L, 2L),
nonOwnerHarness.snapshot(2L, 2L));
+ OperatorSubtaskState secondRestoreOwnerState =
+ AbstractStreamOperatorTestHarness.repartitionOperatorState(
+ secondCheckpoint,
+ maxParallelism,
+ newParallelism,
+ newParallelism,
+ ownerSubtask);
+
+ try (KeyedOneInputStreamOperatorTestHarness<Long, Long, Object>
restoredOwnerHarness =
+ new KeyedOneInputStreamOperatorTestHarness<>(
+ new
ActionExecutionOperatorFactory(TestAgent.getAgentPlan(false), true),
+ (KeySelector<Long, Long>) value -> value,
+ TypeInformation.of(Long.class),
+ maxParallelism,
+ newParallelism,
+ ownerSubtask)) {
+ restoredOwnerHarness.initializeState(secondRestoreOwnerState);
+ restoredOwnerHarness.open();
+
+
assertThat(restoredOwnerHarness.getTaskMailbox().size()).isEqualTo(1);
+ }
}
}