joeyutong opened a new issue, #824:
URL: https://github.com/apache/flink-agents/issues/824

   ### Search before asking
   
   I searched in the issues and found nothing similar.
   
   ### Description
   
   `currentProcessingKeysOpState` is registered as operator **union list 
state**. After a rescale or restore, every new subtask restores the union of 
all previous subtasks' list state.
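To make the union semantics concrete, here is a minimal Flink-free model in Java. It is a sketch, not Flink code: `RedistributionModel` and `unionRedistribute` are hypothetical names, and in the real operator the state comes from `OperatorStateStore#getUnionListState`. It only mirrors the redistribution rule described above: each new subtask receives every entry from every old subtask.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, Flink-free model of union list state redistribution:
// every new subtask receives ALL entries from ALL old subtasks.
final class RedistributionModel {

    // Concatenate every old subtask's list and hand a copy of the full
    // result to each of the newParallelism subtasks.
    static <T> List<List<T>> unionRedistribute(List<List<T>> oldSubtaskStates, int newParallelism) {
        List<T> union = new ArrayList<>();
        for (List<T> state : oldSubtaskStates) {
            union.addAll(state);
        }
        List<List<T>> restored = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            restored.add(new ArrayList<>(union)); // each subtask gets its own copy
        }
        return restored;
    }

    public static void main(String[] args) {
        // Parallelism-1 checkpoint containing one in-flight key "K".
        List<List<String>> checkpoint = List.of(List.of("K"));
        // Rescale to parallelism 2: both subtasks restore [K].
        System.out.println(unionRedistribute(checkpoint, 2)); // [[K], [K]]
    }
}
```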
   
   `ActionExecutionOperator#tryResumeProcessActionTasks` already filters 
restored keys by key ownership before scheduling recovery work. This means a 
non-owner subtask will not resume processing a key that belongs to another 
subtask. However, the non-owner subtask still keeps the non-owned key in its 
local `currentProcessingKeysOpState`.
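The ownership filter can be sketched as a key-group range check. This is an assumption about the general shape of the check, not the code of `tryResumeProcessActionTasks`. The range arithmetic follows the formula in Flink's `KeyGroupRangeAssignment`. The key-to-key-group hash below is a simplified stand-in, because Flink applies a murmur hash to `key.hashCode()`. `KeyOwnership` is a hypothetical class.

```java
// Hypothetical sketch of a key-ownership check by key-group range.
// Range formulas follow Flink's KeyGroupRangeAssignment; the hash used in
// keyGroupOf is a SIMPLIFIED stand-in (Flink murmur-hashes key.hashCode()).
final class KeyOwnership {

    // Simplified stand-in for assigning a key to one of maxParallelism key groups.
    static int keyGroupOf(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    // First key group (inclusive) owned by the given subtask.
    static int rangeStart(int maxParallelism, int parallelism, int subtaskIndex) {
        return (subtaskIndex * maxParallelism + parallelism - 1) / parallelism;
    }

    // Last key group (inclusive) owned by the given subtask.
    static int rangeEnd(int maxParallelism, int parallelism, int subtaskIndex) {
        return ((subtaskIndex + 1) * maxParallelism - 1) / parallelism;
    }

    // True if the key's key group falls inside the subtask's range.
    static boolean isOwnedBy(Object key, int maxParallelism, int parallelism, int subtaskIndex) {
        int keyGroup = keyGroupOf(key, maxParallelism);
        return keyGroup >= rangeStart(maxParallelism, parallelism, subtaskIndex)
                && keyGroup <= rangeEnd(maxParallelism, parallelism, subtaskIndex);
    }

    public static void main(String[] args) {
        // maxParallelism = 4, parallelism = 2: subtask 0 owns key groups [0, 1]
        // and subtask 1 owns [2, 3], so exactly one subtask owns any key.
        for (int i = 0; i < 2; i++) {
            System.out.println("subtask " + i + " owns K: " + isOwnedBy("K", 4, 2, i));
        }
    }
}
```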
   
   This stale local state can be written back into the next checkpoint:
   
   1. A job runs with parallelism 1 and starts processing key `K`.
   2. `K` is added to `currentProcessingKeysOpState`.
   3. A checkpoint is taken while `K` is still in flight, so the checkpoint 
contains `[K]`.
   4. The job is restored or rescaled to parallelism 2.
   5. Because this is union list state, both new subtasks restore `[K]`.
   6. The owner subtask schedules recovery for `K`; the non-owner subtask skips 
execution because `K` is not in its key-group range.
   7. If the non-owner subtask does not prune `K` from 
`currentProcessingKeysOpState`, it snapshots `[K]` again in the next checkpoint.
   8. The owner subtask also snapshots `[K]` while `K` is still in flight.
   9. On the next restore, union list state restores `[K, K]`.
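Steps 1 to 9 can be replayed with a small, Flink-free Java model. `StaleKeyScenario`, `unionRestore`, and `secondRestoreOfOwner` are hypothetical names. Subtask 0 is assumed to own `K` purely for illustration. Toggling `pruneNonOwned` shows that the duplicate appears only when the non-owner re-publishes `K`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, Flink-free replay of steps 1-9 from the issue.
// Subtask 0 is ASSUMED to own K; subtask 1 is the non-owner.
final class StaleKeyScenario {

    static final int OWNER = 0;
    static final int NON_OWNER = 1;

    // Union restore: every new subtask receives all entries of all old subtasks.
    static List<List<String>> unionRestore(List<List<String>> snapshots, int newParallelism) {
        List<String> union = new ArrayList<>();
        for (List<String> snapshot : snapshots) {
            union.addAll(snapshot);
        }
        List<List<String>> restored = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            restored.add(new ArrayList<>(union));
        }
        return restored;
    }

    // Returns what the owner restores from the second checkpoint.
    static List<String> secondRestoreOfOwner(boolean pruneNonOwned) {
        // Steps 1-3: parallelism 1, K in flight, checkpoint contains [K].
        List<List<String>> checkpoint1 = List.of(List.of("K"));

        // Steps 4-5: rescale to parallelism 2; both subtasks restore [K].
        List<List<String>> restored = unionRestore(checkpoint1, 2);

        // Steps 6-7: the non-owner skips executing K; only pruning removes it.
        if (pruneNonOwned) {
            restored.get(NON_OWNER).removeIf("K"::equals);
        }

        // Steps 7-8: both subtasks snapshot their local lists; owner still has K.
        List<List<String>> checkpoint2 = List.of(restored.get(OWNER), restored.get(NON_OWNER));

        // Step 9: restore again and inspect the owner's list.
        return unionRestore(checkpoint2, 2).get(OWNER);
    }

    public static void main(String[] args) {
        System.out.println("without pruning: " + secondRestoreOfOwner(false)); // [K, K]
        System.out.println("with pruning:    " + secondRestoreOfOwner(true));  // [K]
    }
}
```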
   
   This violates the invariant that each in-flight key appears exactly once in `currentProcessingKeysOpState`. It can lead to duplicate recovery mailbox submissions for the same key. When the key finishes, `removeProcessingKey(K)` removes all matching entries and returns a count greater than 1, which fails the operator's consistency check:
   
   ```text
   Current processing key count for key K should be 1, but got 2
   ```
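The failure mode can be modeled as follows. This sketch assumes `removeProcessingKey` behaves as described above, removing all matching entries and returning how many it removed. `ProcessingKeyCheck` and `onKeyFinished` are hypothetical names, not the actual operator code.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical model of the failing check; not ActionExecutionOperator code.
final class ProcessingKeyCheck {

    // Removes ALL entries equal to key and returns how many were removed.
    static int removeProcessingKey(List<String> processingKeys, String key) {
        int removed = 0;
        Iterator<String> it = processingKeys.iterator();
        while (it.hasNext()) {
            if (it.next().equals(key)) {
                it.remove();
                removed++;
            }
        }
        return removed;
    }

    // Called when a key finishes; enforces "one entry per in-flight key".
    static void onKeyFinished(List<String> processingKeys, String key) {
        int removedCount = removeProcessingKey(processingKeys, key);
        if (removedCount != 1) {
            throw new IllegalStateException(
                    "Current processing key count for key " + key
                            + " should be 1, but got " + removedCount);
        }
    }

    public static void main(String[] args) {
        // State after the second union restore: K appears twice.
        List<String> restored = new ArrayList<>(List.of("K", "K"));
        try {
            onKeyFinished(restored, "K");
        } catch (IllegalStateException e) {
            // prints: Current processing key count for key K should be 1, but got 2
            System.out.println(e.getMessage());
        }
    }
}
```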
   
   Expected behavior: after restore, each subtask should keep only the 
processing keys it owns in its local operator state. Non-owned restored keys 
should be pruned so they are not checkpointed again by non-owner subtasks. 
Duplicate restored keys should also not result in duplicate recovery 
submissions.
   
   Actual behavior: non-owned keys are skipped for execution but remain in the 
non-owner subtask's union list state, so stale keys can be re-published into 
later checkpoints.
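One possible shape for the expected behavior is a restore-time cleanup that keeps only owned keys and collapses duplicates. This is a hedged sketch, not the planned PR. `RestoredKeyPruning` and `pruneAndDeduplicate` are hypothetical, and `isOwned` stands in for whatever key-group check the operator already applies in `tryResumeProcessActionTasks`.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical restore-time cleanup for keys restored from union list state:
// keep only keys this subtask owns and collapse duplicates, preserving order.
final class RestoredKeyPruning {

    static <K> List<K> pruneAndDeduplicate(Iterable<K> restoredKeys, Predicate<K> isOwned) {
        LinkedHashSet<K> owned = new LinkedHashSet<>();
        for (K key : restoredKeys) {
            if (isOwned.test(key)) {
                owned.add(key); // duplicates from union restore collapse here
            }
        }
        return new ArrayList<>(owned);
    }

    public static void main(String[] args) {
        List<String> restored = List.of("K", "K", "J");
        // Assume this subtask owns K but not J.
        List<String> kept = pruneAndDeduplicate(restored, key -> key.equals("K"));
        System.out.println(kept); // [K]
    }
}
```

A `LinkedHashSet` keeps the restore order stable while enforcing the one-entry-per-key invariant. Writing the result back to the local state (for example with `ListState#update`) is what would stop the non-owner from re-publishing stale keys, and iterating the deduplicated list yields at most one recovery submission per key.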
   
   ### How to reproduce
   
   This can be reproduced with a unit test around `ActionExecutionOperator`:
   
   1. Start an `ActionExecutionOperator` test harness with `maxParallelism = 
4`, `parallelism = 1`, and key selector `value -> value`.
   2. Process one element with key `K`.
   3. Take a snapshot before draining the mailbox, so `K` is still present in 
`currentProcessingKeysOpState`.
   4. Repartition the snapshot to `parallelism = 2`.
   5. Restore both the owner and non-owner subtasks.
   6. Verify that only the owner subtask schedules one recovery mailbox item, 
while the non-owner schedules none.
   7. Take a second checkpoint from both restored subtasks while the owner 
still has `K` in flight.
   8. Repackage the two subtask snapshots and restore the owner subtask again.
   
   Before the fix, the second restore can see duplicate `K` entries from union 
list state. This causes duplicate recovery mailbox submissions and can later 
fail the `removedCount == 1` check when `K` finishes.
   
   ### Version and environment
   
   - Flink Agents: current `main` branch (`0.3-SNAPSHOT`)
   - Flink version: `2.2.0`
   - Java: 11
   - Reproduced with a local `ActionExecutionOperatorTest` unit test
   
   ### Are you willing to submit a PR?
   
   - [x] I'm willing to submit a PR!
   