MartijnVisser opened a new pull request, #28468:
URL: https://github.com/apache/flink/pull/28468

   ## What is the purpose of the change
   
   Backport of #28463 to `release-2.2`.
   
   `FutureCompletingBlockingQueue` (the split-fetcher to source-reader 
handover) has a lost-wakeup bug. A putter that blocks in `put()` registers its 
`Condition` in the internal `notFull` queue, but that condition is not always 
removed when the putter stops waiting:
   
   - when the putter is gracefully woken via `wakeUpPuttingThread(int)` (and 
returns `false` without enqueuing), and
   - when the putter's `await()` returns spuriously (permitted by the 
`Condition` contract) and it re-blocks, re-adding its condition.
   
   Because of the leftover (stale or duplicated) entry, a later 
`signalNextPutter()` call, made when a consumer frees a slot, can signal a 
thread that is no longer waiting. That signal is lost, so a genuinely waiting 
putter is never woken and a split fetcher can stall indefinitely.
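
To illustrate the stale-entry case, here is a minimal single-threaded model of the bookkeeping only. It is a sketch, not the Flink implementation: the class `StalePutterModel`, its methods, and the string putter names are hypothetical, and plain collections stand in for the real locks and `Condition` objects.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.Set;

/** Single-threaded model of the notFull bookkeeping (hypothetical, not Flink code). */
public class StalePutterModel {
    // Stands in for the internal notFull queue of putter conditions.
    private final Deque<String> notFull = new ArrayDeque<>();
    // Putters that are really blocked right now.
    private final Set<String> waiting = new HashSet<>();
    // true models the fixed behaviour, false the unfixed behaviour.
    private final boolean removeOnReturn;

    public StalePutterModel(boolean removeOnReturn) {
        this.removeOnReturn = removeOnReturn;
    }

    /** A putter finds the queue full, registers itself and blocks. */
    public void block(String putter) {
        notFull.add(putter);
        waiting.add(putter);
    }

    /** Graceful wake-up: the putter stops waiting without enqueuing. */
    public void wakeUp(String putter) {
        waiting.remove(putter);
        if (removeOnReturn) {
            notFull.remove(putter); // the fix: drop the entry once waiting ends
        }
    }

    /** A consumer frees a slot. Returns the putter that resumes, or null if the signal is lost. */
    public String signalNextPutter() {
        String signalled = notFull.poll();
        if (signalled != null && waiting.remove(signalled)) {
            return signalled;
        }
        return null; // nobody queued, or the entry belonged to a non-waiting putter
    }

    /** Blocks A and B, wakes A gracefully, then frees one slot. */
    public static String run(boolean removeOnReturn) {
        StalePutterModel model = new StalePutterModel(removeOnReturn);
        model.block("A");
        model.block("B");
        model.wakeUp("A");
        return model.signalNextPutter();
    }

    public static void main(String[] args) {
        System.out.println("unfixed: resumed=" + run(false)); // prints "unfixed: resumed=null"
        System.out.println("fixed:   resumed=" + run(true));  // prints "fixed:   resumed=B"
    }
}
```

In the unfixed run the freed slot is announced to `A`, which has already returned, so `B` stays blocked. In the fixed run `A`'s entry is gone and `B` is the one signalled.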
   
   ## Brief change log
   
   - `FutureCompletingBlockingQueue#waitOnPut`: remove the putter's condition 
from `notFull` once `await()` returns (in a `finally`), restoring the invariant 
that `notFull` only holds conditions of currently waiting putters.
   - Added a `@VisibleForTesting` accessor (`getNumberOfQueuedPutters()`) so 
the regression test can deterministically sequence the two contending putters. 
Like the existing `@VisibleForTesting take()` in the same class, this trips the 
blanket connector "depend only on public API" ArchUnit rule, so it is recorded 
in the `flink-architecture-tests-production` violation store alongside the 
existing entry.
   - Added 
`FutureCompletingBlockingQueueTest#testWakeUpDoesNotStrandAnotherPutter`, a 
deterministic regression test that reproduces the stranded-putter stall and 
fails on the unfixed code.
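
The shape of the `waitOnPut` change and of the regression scenario can be sketched roughly as follows. This is a simplified stand-in written for illustration, not the actual Flink class: `MiniHandoverQueue`, its fields, and `secondPutterResumes()` are hypothetical names, and only `waitOnPut`, `wakeUpPuttingThread`, `signalNextPutter` and `getNumberOfQueuedPutters` borrow their names from the description above.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/** Simplified stand-in for the handover queue (hypothetical, not the Flink class). */
public class MiniHandoverQueue {
    private final ReentrantLock lock = new ReentrantLock();
    private final Queue<Condition> notFull = new ArrayDeque<>();
    private final Queue<String> elements = new ArrayDeque<>();
    private final Condition[] conditions;
    private final boolean[] wakeUpFlags;
    private final int capacity;

    public MiniHandoverQueue(int capacity, int numPutters) {
        this.capacity = capacity;
        this.conditions = new Condition[numPutters];
        this.wakeUpFlags = new boolean[numPutters];
        for (int i = 0; i < numPutters; i++) {
            conditions[i] = lock.newCondition();
        }
    }

    /** Returns false if the putter was woken gracefully before it could enqueue. */
    public boolean put(int putter, String element) throws InterruptedException {
        lock.lockInterruptibly();
        try {
            while (elements.size() >= capacity) {
                if (wakeUpFlags[putter]) {
                    wakeUpFlags[putter] = false;
                    return false;
                }
                waitOnPut(putter);
            }
            elements.add(element);
            return true;
        } finally {
            lock.unlock();
        }
    }

    private void waitOnPut(int putter) throws InterruptedException {
        Condition cond = conditions[putter];
        notFull.add(cond);
        try {
            cond.await();
        } finally {
            // Invariant: notFull only holds conditions of currently waiting putters.
            notFull.remove(cond);
        }
    }

    /** Consumer side: frees a slot and signals the next queued putter, if any. */
    public String poll() {
        lock.lock();
        try {
            String next = elements.poll();
            Condition cond = (next == null) ? null : notFull.poll();
            if (cond != null) {
                cond.signal();
            }
            return next;
        } finally {
            lock.unlock();
        }
    }

    public void wakeUpPuttingThread(int putter) {
        lock.lock();
        try {
            wakeUpFlags[putter] = true;
            conditions[putter].signal();
        } finally {
            lock.unlock();
        }
    }

    public int getNumberOfQueuedPutters() {
        lock.lock();
        try {
            return notFull.size();
        } finally {
            lock.unlock();
        }
    }

    /** Blocks two putters, wakes the first, frees one slot, reports whether the second resumed. */
    public static boolean secondPutterResumes() {
        MiniHandoverQueue queue = new MiniHandoverQueue(1, 2);
        try {
            queue.put(0, "seed"); // fill the single slot
            Thread first = putter(queue, 0, "a");
            while (queue.getNumberOfQueuedPutters() < 1) Thread.sleep(1);
            Thread second = putter(queue, 1, "b");
            while (queue.getNumberOfQueuedPutters() < 2) Thread.sleep(1);
            queue.wakeUpPuttingThread(0);
            first.join();
            queue.poll(); // frees one slot, should signal the second putter
            second.join(5000);
            return !second.isAlive();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    private static Thread putter(MiniHandoverQueue queue, int index, String element) {
        Thread t = new Thread(() -> {
            try {
                queue.put(index, element);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        t.setDaemon(true); // do not keep the JVM alive if a putter is stranded
        t.start();
        return t;
    }
}
```

Calling `MiniHandoverQueue.secondPutterResumes()` walks through the same sequence as the regression test. Without the `finally` removal in `waitOnPut`, the freed slot would be signalled to the first putter's leftover condition and the second putter would stay blocked until the join timeout.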
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   
   - `FutureCompletingBlockingQueueTest#testWakeUpDoesNotStrandAnotherPutter` 
blocks two putters, wakes the first gracefully, then frees one slot and asserts 
that the second (still-waiting) putter is signalled. It fails on the code 
before the fix (the still-waiting putter is stranded, so the latch times out) 
and passes after the fix.
   - Existing tests in the module remain green.
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): no
     - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no (`FutureCompletingBlockingQueue` is `@Internal`)
     - The serializers: no
     - The runtime per-record code paths (performance sensitive): no
     - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
     - The S3 file system connector: no
   
   ## Documentation
   
     - Does this pull request introduce a new feature? no
     - If yes, how is the feature documented? not applicable
   
   ---
   
   ##### Was generative AI tooling used to co-author this PR?
   
   - [X] Yes (Claude Code, Claude Opus 4.8)
   
   Generated-by: Claude Code (Claude Opus 4.8)
   

