This is an automated email from the ASF dual-hosted git repository.
wenjin272 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-agents.git
The following commit(s) were added to refs/heads/main by this push:
new e54317de [runtime] Prune restored processing keys before snapshot (#825)
e54317de is described below
commit e54317deb5ccc504c4ec0371b4cf4d30ee6182bd
Author: Joey Tong <[email protected]>
AuthorDate: Wed Jun 10 10:16:21 2026 +0800
[runtime] Prune restored processing keys before snapshot (#825)
---
.../runtime/operator/ActionExecutionOperator.java | 8 +++++
.../runtime/operator/OperatorStateManager.java | 5 ++++
.../operator/ActionExecutionOperatorTest.java | 35 ++++++++++++++++++++++
3 files changed, 48 insertions(+)
diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java
index caefb955..665551bb 100644
--- a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java
+++ b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java
@@ -55,8 +55,11 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.LinkedHashSet;
import java.util.List;
import java.util.Optional;
+import java.util.Set;
import static
org.apache.flink.agents.api.configuration.AgentConfigOptions.JOB_IDENTIFIER;
import static org.apache.flink.util.Preconditions.checkState;
@@ -557,15 +560,20 @@ public class ActionExecutionOperator<IN, OUT> extends AbstractStreamOperator<OUT
KeyGroupRange currentSubtaskKeyGroupRange =
stateManager.getCurrentSubtaskKeyGroupRange(
maxParallelism, getRuntimeContext());
+ Set<Object> ownedKeys = new LinkedHashSet<>();
for (Object key : keys) {
if (!stateManager.isKeyOwnedByCurrentSubtask(
key, maxParallelism, currentSubtaskKeyGroupRange)) {
continue;
}
+ if (!ownedKeys.add(key)) {
+ continue;
+ }
eventRouter.getKeySegmentQueue().addKeyToLastSegment(key);
mailboxExecutor.submit(
() -> tryProcessActionTaskForKey(key), "process action
task");
}
+ stateManager.replaceProcessingKeys(new ArrayList<>(ownedKeys));
}
stateManager.forEachPendingInputEventKey(
diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/OperatorStateManager.java b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/OperatorStateManager.java
index eee3c2dd..19f1f67c 100644
--- a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/OperatorStateManager.java
+++ b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/OperatorStateManager.java
@@ -44,6 +44,7 @@ import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import javax.annotation.Nullable;
import java.time.Duration;
+import java.util.List;
import static org.apache.flink.agents.runtime.utils.StateUtil.*;
@@ -265,6 +266,10 @@ class OperatorStateManager {
currentProcessingKeysOpState.add(key);
}
+ void replaceProcessingKeys(List<Object> keys) throws Exception {
+ currentProcessingKeysOpState.update(keys);
+ }
+
int removeProcessingKey(Object key) throws Exception {
return removeFromListState(currentProcessingKeysOpState, key);
}
diff --git a/runtime/src/test/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperatorTest.java b/runtime/src/test/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperatorTest.java
index 41cb5109..9ac18e3e 100644
--- a/runtime/src/test/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperatorTest.java
+++ b/runtime/src/test/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperatorTest.java
@@ -232,6 +232,41 @@ public class ActionExecutionOperatorTest {
assertThat(ownerHarness.getTaskMailbox().size()).isEqualTo(1);
assertThat(nonOwnerHarness.getTaskMailbox().size()).isZero();
+ assertThat(
+ ((ActionExecutionOperator<Long, Object>)
ownerHarness.getOperator())
+ .getOperatorStateManager()
+ .getProcessingKeys())
+ .containsExactly(key);
+ assertThat(
+ ((ActionExecutionOperator<Long, Object>)
nonOwnerHarness.getOperator())
+ .getOperatorStateManager()
+ .getProcessingKeys())
+ .isEmpty();
+
+ OperatorSubtaskState secondCheckpoint =
+ AbstractStreamOperatorTestHarness.repackageState(
+ ownerHarness.snapshot(2L, 2L),
nonOwnerHarness.snapshot(2L, 2L));
+ OperatorSubtaskState secondRestoreOwnerState =
+ AbstractStreamOperatorTestHarness.repartitionOperatorState(
+ secondCheckpoint,
+ maxParallelism,
+ newParallelism,
+ newParallelism,
+ ownerSubtask);
+
+ try (KeyedOneInputStreamOperatorTestHarness<Long, Long, Object>
restoredOwnerHarness =
+ new KeyedOneInputStreamOperatorTestHarness<>(
+ new
ActionExecutionOperatorFactory(TestAgent.getAgentPlan(false), true),
+ (KeySelector<Long, Long>) value -> value,
+ TypeInformation.of(Long.class),
+ maxParallelism,
+ newParallelism,
+ ownerSubtask)) {
+ restoredOwnerHarness.initializeState(secondRestoreOwnerState);
+ restoredOwnerHarness.open();
+
+
assertThat(restoredOwnerHarness.getTaskMailbox().size()).isEqualTo(1);
+ }
}
}