joeyutong opened a new issue, #824:

URL: https://github.com/apache/flink-agents/issues/824
### Search before asking

I searched in the issues and found nothing similar.

### Description

`currentProcessingKeysOpState` is registered as operator **union list state**. After a rescale or restore, every new subtask restores the union of all previous subtasks' list state.

`ActionExecutionOperator#tryResumeProcessActionTasks` already filters restored keys by key ownership before scheduling recovery work, so a non-owner subtask will not resume processing a key that belongs to another subtask.

However, the non-owner subtask still keeps the non-owned key in its local `currentProcessingKeysOpState`. This stale local state can be written back into the next checkpoint:

1. A job runs with parallelism 1 and starts processing key `K`.
2. `K` is added to `currentProcessingKeysOpState`.
3. A checkpoint is taken while `K` is still in flight, so the checkpoint contains `[K]`.
4. The job is restored or rescaled to parallelism 2.
5. Because this is union list state, both new subtasks restore `[K]`.
6. The owner subtask schedules recovery for `K`; the non-owner subtask skips execution because `K` is not in its key-group range.
7. If the non-owner subtask does not prune `K` from `currentProcessingKeysOpState`, it snapshots `[K]` again in the next checkpoint.
8. The owner subtask also snapshots `[K]` while `K` is still in flight.
9. On the next restore, union list state restores `[K, K]`.

This violates the invariant that an in-flight key appears only once in `currentProcessingKeysOpState`. It can lead to duplicate recovery mailbox submissions for the same key. When the key finishes, `removeProcessingKey(K)` removes all matching entries and returns a count greater than 1, causing the operator check to fail:

```text
Current processing key count for key K should be 1, but got 2
```

Expected behavior: after restore, each subtask should keep only the processing keys it owns in its local operator state.
Non-owned restored keys should be pruned so they are not checkpointed again by non-owner subtasks. Duplicate restored keys should also not result in duplicate recovery submissions.

Actual behavior: non-owned keys are skipped for execution but remain in the non-owner subtask's union list state, so stale keys can be re-published into later checkpoints.

### How to reproduce

This can be reproduced with a unit test around `ActionExecutionOperator`:

1. Start an `ActionExecutionOperator` test harness with `maxParallelism = 4`, `parallelism = 1`, and key selector `value -> value`.
2. Process one element with key `K`.
3. Take a snapshot before draining the mailbox, so `K` is still present in `currentProcessingKeysOpState`.
4. Repartition the snapshot to `parallelism = 2`.
5. Restore both the owner and non-owner subtasks.
6. Verify that only the owner subtask schedules one recovery mailbox item, while the non-owner schedules none.
7. Take a second checkpoint from both restored subtasks while the owner still has `K` in flight.
8. Repackage the two subtask snapshots and restore the owner subtask again.

Before the fix, the second restore can see duplicate `K` entries from union list state. This causes duplicate recovery mailbox submissions and can later fail the `removedCount == 1` check when `K` finishes.

### Version and environment

- Flink Agents: current `main` branch (`0.3-SNAPSHOT`)
- Flink version: `2.2.0`
- Java: 11
- Reproduced with a local `ActionExecutionOperatorTest` unit test

### Are you willing to submit a PR?

- [x] I'm willing to submit a PR!
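The failure sequence and the restore-time pruning described under Expected behavior can be sketched as a plain-Java simulation. This is a minimal sketch under stated assumptions, not the `ActionExecutionOperator` code: union list state is modeled as lists of strings, and `keyGroupOf`, `owns`, `unionRestore`, `pruneOnRestore`, and `secondRestore` are hypothetical helpers. `removeProcessingKey` is named after the method in the issue, but its body here is an assumption. The key-group range arithmetic is meant to follow Flink's `KeyGroupRangeAssignment.computeKeyGroupRangeForOperatorIndex`, while key-group assignment is simplified to `Math.floorMod(key.hashCode(), maxParallelism)` (Flink also murmur-hashes the hash code), so which subtask owns a given key can differ in a real job.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;

public class UnionStatePruningSketch {

    // ASSUMPTION: simplified key-group assignment. Flink's
    // KeyGroupRangeAssignment.assignToKeyGroup murmur-hashes key.hashCode()
    // before taking the modulo, so real ownership of a key may differ.
    static int keyGroupOf(String key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    // Inclusive key-group range of a subtask, using the same arithmetic as
    // KeyGroupRangeAssignment.computeKeyGroupRangeForOperatorIndex.
    static boolean owns(String key, int subtask, int parallelism, int maxParallelism) {
        int start = (subtask * maxParallelism + parallelism - 1) / parallelism;
        int end = ((subtask + 1) * maxParallelism - 1) / parallelism;
        int keyGroup = keyGroupOf(key, maxParallelism);
        return keyGroup >= start && keyGroup <= end;
    }

    // Union list state: every subtask restores the concatenation of all snapshots.
    static List<String> unionRestore(List<List<String>> snapshots) {
        List<String> all = new ArrayList<>();
        for (List<String> snapshot : snapshots) {
            all.addAll(snapshot);
        }
        return all;
    }

    // Hypothetical fix: keep only owned keys, each at most once, in restore order.
    static List<String> pruneOnRestore(
            List<String> restored, int subtask, int parallelism, int maxParallelism) {
        LinkedHashSet<String> kept = new LinkedHashSet<>();
        for (String key : restored) {
            if (owns(key, subtask, parallelism, maxParallelism)) {
                kept.add(key);
            }
        }
        return new ArrayList<>(kept);
    }

    // Steps 3-9: checkpoint [key] at parallelism 1, restore at parallelism 2,
    // checkpoint both subtasks while the key is still in flight, union-restore again.
    static List<String> secondRestore(String key, int maxParallelism, boolean prune) {
        int parallelism = 2;
        List<String> restored = unionRestore(List.of(List.of(key)));
        List<List<String>> secondCheckpoint = new ArrayList<>();
        for (int subtask = 0; subtask < parallelism; subtask++) {
            List<String> local = prune
                    ? pruneOnRestore(restored, subtask, parallelism, maxParallelism)
                    : new ArrayList<>(restored);
            secondCheckpoint.add(local);
        }
        return unionRestore(secondCheckpoint);
    }

    // Stand-in for removeProcessingKey: removes all matching entries, returns how many.
    static int removeProcessingKey(List<String> state, String key) {
        int before = state.size();
        state.removeIf(key::equals);
        return before - state.size();
    }

    public static void main(String[] args) {
        for (boolean prune : new boolean[] {false, true}) {
            List<String> state = secondRestore("K", 4, prune);
            String restored = state.toString(); // capture before removal mutates the list
            int removed = removeProcessingKey(state, "K");
            System.out.println("prune=" + prune + ": restored " + restored + ", removedCount " + removed);
        }
    }
}
```

Under the simplified hash, `K` falls into key group 3, which subtask 1 owns at parallelism 2 with `maxParallelism = 4`. Running `main` prints `restored [K, K], removedCount 2` for the unpruned run, matching the failing count check, and `restored [K], removedCount 1` once each subtask prunes non-owned keys on restore. Collecting kept keys in a `LinkedHashSet` also collapses duplicates that an older checkpoint may already contain, so a single restore never yields two recovery submissions for the same key.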
