MartijnVisser opened a new pull request, #28463:
URL: https://github.com/apache/flink/pull/28463

   ## What is the purpose of the change
   
   `FutureCompletingBlockingQueue` (the split-fetcher to source-reader handover) has a lost-wakeup bug.
   A putter that blocks in `put()` registers its `Condition` in the internal `notFull` queue, but that
   condition is not always removed when the putter stops waiting:
   
   - when the putter is gracefully woken via `wakeUpPuttingThread(int)` (and returns `false` without
     enqueuing), and
   - when the putter's `await()` returns spuriously (permitted by the `Condition` contract) and it
     re-blocks, re-adding its condition.
   
   Because of the leftover (stale or duplicated) entry, a later `signalNextPutter()`, fired when a
   consumer frees a slot, signals a thread that is no longer waiting. A putter that is genuinely waiting
   is then never woken, so a split fetcher can stall indefinitely.
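The stale-entry case reduces to plain deque bookkeeping. The following is a hypothetical, single-threaded model rather than Flink code: putter names stand in for the per-putter `Condition` objects, and `notFull` is assumed to be a FIFO deque whose head is what `signalNextPutter()` signals.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical single-threaded model of the notFull bookkeeping; putter names stand in
// for the per-putter Condition objects. This is not Flink code.
public class StaleEntryModel {

    // Replays the graceful-wakeup case and returns the putter that the consumer's signal reaches.
    static String signalAfterGracefulWakeup(Deque<String> notFull) {
        notFull.add("putter-0"); // both putters find the queue full and register
        notFull.add("putter-1");
        // putter-0 is woken via wakeUpPuttingThread and returns false, leaving its entry behind.
        // A consumer then frees one slot and signals the head of notFull.
        return notFull.poll();
    }

    public static void main(String[] args) {
        Deque<String> notFull = new ArrayDeque<>();
        System.out.println("signalled: " + signalAfterGracefulWakeup(notFull)); // putter-0, no longer waiting
        System.out.println("still queued: " + notFull); // [putter-1], never signalled
    }
}
```

The single signal is spent on `putter-0`, which has already returned, while `putter-1` stays registered and is never woken.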
   
   This supersedes the stale PR #26446, which removed the condition only inside `wakeUpPuttingThread`.
   That covers the graceful-wakeup case but not the duplicate-condition case (`remove()` deletes only
   one occurrence), so the lost wakeup can still occur.
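The duplicate-condition case follows from standard `java.util.Deque` semantics: `remove(Object)` removes only the first occurrence of an element. A minimal sketch, assuming `notFull` is an `ArrayDeque<Condition>` (an assumption about the internals; the class and method names are hypothetical):

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo, assuming notFull is an ArrayDeque<Condition>: a spurious wakeup followed by
// a re-block adds the same Condition twice, and Deque.remove(Object) deletes only one occurrence.
public class DuplicateConditionDemo {

    // Returns how many entries for the same condition survive a single remove().
    static int leftoverAfterSingleRemove() {
        ReentrantLock lock = new ReentrantLock();
        Condition putterCondition = lock.newCondition();
        Deque<Condition> notFull = new ArrayDeque<>();

        notFull.add(putterCondition); // first await()
        notFull.add(putterCondition); // await() returned spuriously; the loop re-blocks and re-adds

        // A single removal, as when the condition is removed only on graceful wakeup.
        notFull.remove(putterCondition);

        int leftover = 0;
        for (Condition c : notFull) {
            if (c == putterCondition) {
                leftover++;
            }
        }
        return leftover;
    }

    public static void main(String[] args) {
        System.out.println("stale entries left: " + leftoverAfterSingleRemove()); // 1
    }
}
```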
   
   ## Brief change log
   
   - `FutureCompletingBlockingQueue#waitOnPut`: remove the putter's condition from `notFull` once
     `await()` returns (in a `finally`), restoring the invariant that `notFull` only holds conditions of
     currently waiting putters. The removal is `O(number of concurrently blocked putters)` and is off
     the per-record path (it runs only when a putter unblocks).
   - Added a `@VisibleForTesting` accessor (`getNumberOfQueuedPutters()`) so the regression test can
     deterministically sequence the two contending putters without depending on `Thread.State`.
   - Added `FutureCompletingBlockingQueueTest#testWakeUpDoesNotStrandAnotherPutter`, a deterministic
     regression test (first commit) that reproduces the stranded-putter stall and fails on the current
     code. The 10s latch wait is the functional assertion; the method-level `@Timeout(60s)` is a hang
     backstop for this deadlock-regression test.
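The fixed wait loop and the regression scenario can be sketched with a hypothetical, simplified stand-in for the queue (not the Flink class). It assumes one `ReentrantLock`, one `Condition` per putter, a per-putter wake-up flag, and a FIFO `notFull` deque whose head is polled and signalled when a slot frees up; only `notFull`, `wakeUpPuttingThread` and `getNumberOfQueuedPutters` echo names from this PR, and their signatures here are illustrative. The `finally` block drops the putter's entry however `await()` returns (signal, graceful wakeup, spurious return or interrupt). The scenario blocks two putters on a full queue, wakes the first, frees one slot and waits up to 10 s for the second to enqueue.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;
import java.util.Queue;
import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical, simplified handover queue; not the Flink FutureCompletingBlockingQueue.
public class HandoverQueueSketch<T> {
    private final ReentrantLock lock = new ReentrantLock();
    private final Queue<T> queue = new ArrayDeque<>();
    private final Deque<Condition> notFull = new ArrayDeque<>(); // conditions of waiting putters
    private final Condition[] putterConditions; // one condition per putter (fetcher)
    private final boolean[] wakeUpFlags; // set by wakeUpPuttingThread
    private final int capacity;
    HandoverQueueSketch(int capacity, int numPutters) {
        this.capacity = capacity;
        this.wakeUpFlags = new boolean[numPutters];
        this.putterConditions = new Condition[numPutters];
        Arrays.setAll(putterConditions, i -> lock.newCondition());
    }
    boolean put(int putter, T element) throws InterruptedException {
        lock.lock();
        try {
            while (queue.size() >= capacity) {
                if (wakeUpFlags[putter]) {
                    wakeUpFlags[putter] = false;
                    return false; // woken gracefully: give up without enqueuing
                }
                Condition cond = putterConditions[putter];
                notFull.add(cond);
                try {
                    cond.await();
                } finally {
                    // The fix: drop this putter's entry however await() returned (no-op if polled).
                    notFull.remove(cond);
                }
            }
            queue.add(element);
            return true;
        } finally {
            lock.unlock();
        }
    }
    T poll() {
        lock.lock();
        try {
            T element = queue.poll();
            if (element != null && !notFull.isEmpty()) {
                notFull.poll().signal(); // a slot was freed: signal the next registered putter
            }
            return element;
        } finally {
            lock.unlock();
        }
    }
    void wakeUpPuttingThread(int putter) {
        lock.lock();
        try {
            wakeUpFlags[putter] = true;
            putterConditions[putter].signal();
        } finally {
            lock.unlock();
        }
    }
    int getNumberOfQueuedPutters() {
        lock.lock();
        try {
            return notFull.size();
        } finally {
            lock.unlock();
        }
    }

    // Regression scenario: wake putter 0, free one slot, expect putter 1 to enqueue.
    static boolean secondPutterIsSignalled() throws Exception {
        HandoverQueueSketch<String> q = new HandoverQueueSketch<>(1, 2);
        q.put(0, "initial"); // fills the only slot
        FutureTask<Boolean> first = startDaemon(() -> q.put(0, "a"));
        while (q.getNumberOfQueuedPutters() < 1) Thread.sleep(1);
        FutureTask<Boolean> second = startDaemon(() -> q.put(1, "b"));
        while (q.getNumberOfQueuedPutters() < 2) Thread.sleep(1);
        q.wakeUpPuttingThread(0);
        first.get(); // putter 0 returns false without enqueuing
        q.poll(); // frees the slot and signals the head of notFull
        return second.get(10, TimeUnit.SECONDS); // TimeoutException: putter 1 stranded
    }
    private static FutureTask<Boolean> startDaemon(Callable<Boolean> putter) {
        FutureTask<Boolean> task = new FutureTask<>(putter);
        Thread thread = new Thread(task);
        thread.setDaemon(true); // a stranded putter must not keep the JVM alive
        thread.start();
        return task;
    }
}
```

Without the `finally` block, putter 0's condition would stay at the head of `notFull`, `poll()` would spend its signal on it, and `second.get(...)` should time out. Polling (rather than peeking) the head in `poll()` also matters in this sketch: a peeked condition could be signalled twice by two quick consumers while the next putter is never signalled.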
   
   The change is split into two commits: the first adds the failing reproduction test together with the
   small `@VisibleForTesting` accessor it needs to sequence the putters deterministically; the second
   applies the fix. (The accessor is grouped with the test it enables rather than with the behavioural
   change.)
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   
   - `FutureCompletingBlockingQueueTest#testWakeUpDoesNotStrandAnotherPutter` blocks two putters, wakes
     the first gracefully, then frees one slot and asserts that the second (still-waiting) putter is
     signalled. Because the test lands in its own commit before the fix, it can be verified red-green:
     it fails on the code before the fix (the still-waiting putter is stranded and the latch times out)
     and passes after the fix.
   - Existing tests in the module remain green (`FutureCompletingBlockingQueueTest`, `SplitFetcherTest`,
     `SplitFetcherManagerTest`, `SourceReaderBaseTest`).
   - Additionally validated out-of-tree with the Fray controlled-concurrency tester (not added as a
     dependency): on the unfixed code, Fray deterministically reproduces the deadlock; with the one-line
     change from PR #26446 it still deadlocks (duplicate-condition case); with this fix, Fray explored
     5000 interleavings without a deadlock.
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): no
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no (`FutureCompletingBlockingQueue` is `@Internal`)
     - The serializers: no
     - The runtime per-record code paths (performance sensitive): no
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
     - The S3 file system connector: no
   
   ## Documentation
   
     - Does this pull request introduce a new feature? no
     - If yes, how is the feature documented? not applicable
   
   ---
   
   ##### Was generative AI tooling used to co-author this PR?
   
   - [X] Yes (Claude Code, Claude Opus 4.8)
   
   Generated-by: Claude Code (Claude Opus 4.8)
   