MartijnVisser opened a new pull request, #28463:
URL: https://github.com/apache/flink/pull/28463
## What is the purpose of the change
`FutureCompletingBlockingQueue` (the split-fetcher to source-reader
handover) has a lost-wakeup bug.
A putter that blocks in `put()` registers its `Condition` in the internal
`notFull` queue, but that
condition is not always removed when the putter stops waiting:
- when the putter is gracefully woken via `wakeUpPuttingThread(int)` (and
returns `false` without
enqueuing), and
- when the putter's `await()` returns spuriously (permitted by the
`Condition` contract) and it
re-blocks, re-adding its condition.
The leftover (stale or duplicated) entry makes a later `signalNextPutter()`,
fired when a consumer
frees a slot, signal a thread that is no longer waiting. A genuinely waiting
putter is then never
woken, so a split fetcher can stall indefinitely.
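
The stale-entry case can be traced with a small single-threaded model. This is only a sketch of the bookkeeping described above, not the Flink code: the class `StaleEntryTrace`, the method `signalledPutter`, and the use of putter names in place of `Condition` objects are all assumptions made for illustration.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical model of the notFull bookkeeping; putters are represented by name. */
public class StaleEntryTrace {

    /**
     * Returns the putter that receives the signal when a consumer frees a slot,
     * after putter "A" has already been woken gracefully and stopped waiting.
     */
    public static String signalledPutter(boolean removeOnWakeup) {
        Deque<String> notFull = new ArrayDeque<>();
        notFull.add("A"); // putter A blocks in put()
        notFull.add("B"); // putter B blocks in put()

        // A is woken gracefully and returns false without enqueuing.
        if (removeOnWakeup) {
            notFull.remove("A"); // the cleanup step that restores the invariant
        }

        // A consumer frees a slot: the head of notFull is signalled.
        return notFull.poll();
    }

    public static void main(String[] args) {
        System.out.println("without cleanup: " + signalledPutter(false));
        System.out.println("with cleanup:    " + signalledPutter(true));
    }
}
```

Without the cleanup the signal goes to `A`, which is no longer waiting, so `B` stays blocked; with the cleanup the signal reaches `B`.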
This supersedes the stale PR #26446, which removed the condition only inside
`wakeUpPuttingThread`.
That covers the graceful-wakeup case but not the duplicate-condition case
(`remove()` deletes only
one occurrence), so the lost wakeup can still occur.
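
The single-occurrence behaviour of `remove()` is easy to check in isolation. The snippet below is a minimal sketch that assumes `notFull` behaves like a `java.util.ArrayDeque<Condition>`; the class name `DuplicateConditionDemo` and its method are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical demo: one remove() call does not clear a condition that was queued twice. */
public class DuplicateConditionDemo {

    /** Returns how many entries remain after a single remove() of a twice-queued condition. */
    public static int leftoverAfterSingleRemove() {
        ReentrantLock lock = new ReentrantLock();
        Condition putter = lock.newCondition();

        Queue<Condition> notFull = new ArrayDeque<>();
        notFull.add(putter); // first block
        notFull.add(putter); // spurious wakeup, then re-block: same condition added again

        notFull.remove(putter); // removes only the first matching entry
        return notFull.size();
    }

    public static void main(String[] args) {
        System.out.println(leftoverAfterSingleRemove()); // prints 1
    }
}
```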
## Brief change log
- `FutureCompletingBlockingQueue#waitOnPut`: remove the putter's condition
from `notFull` once
`await()` returns (in a `finally`), restoring the invariant that `notFull`
only holds conditions of
currently waiting putters. The removal is `O(number of concurrently
blocked putters)` and is off the
per-record path (it runs only when a putter unblocks).
- Added a `@VisibleForTesting` accessor (`getNumberOfQueuedPutters()`) so
the regression test can
deterministically sequence the two contending putters without depending on
`Thread.State`.
- Added
`FutureCompletingBlockingQueueTest#testWakeUpDoesNotStrandAnotherPutter`, a
deterministic
regression test (first commit) that reproduces the stranded-putter stall
and fails on the current
code. The 10s latch wait is the functional assertion; the method-level
`@Timeout(60s)` is a hang
backstop for this deadlock-regression test.
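
The `waitOnPut` item in the list above follows a general pattern: deregister the waiter in a `finally` around `await()`. The sketch below shows that pattern on a deliberately simplified queue. `MiniHandoverQueue`, `queuedPutters()`, and `demo()` are hypothetical names, and the real class differs (per-fetcher conditions, wakeup flags, availability future); treat this as an illustration under those assumptions rather than the patch itself.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical, simplified stand-in for illustration; not the Flink class. */
public class MiniHandoverQueue<T> {
    private final ReentrantLock lock = new ReentrantLock();
    private final Queue<T> elements = new ArrayDeque<>();
    // Invariant: holds only conditions of putters that are currently waiting.
    private final Queue<Condition> notFull = new ArrayDeque<>();
    private final int capacity;

    public MiniHandoverQueue(int capacity) {
        this.capacity = capacity;
    }

    public void put(T element) throws InterruptedException {
        lock.lock();
        try {
            Condition mine = lock.newCondition();
            while (elements.size() >= capacity) {
                notFull.add(mine);
                try {
                    mine.await();
                } finally {
                    // Runs after a signal, a spurious wakeup, or an interrupt.
                    // It is a no-op if the signaller already polled this condition.
                    notFull.remove(mine);
                }
            }
            elements.add(element);
        } finally {
            lock.unlock();
        }
    }

    public T poll() {
        lock.lock();
        try {
            T head = elements.poll();
            if (head != null) {
                Condition next = notFull.poll(); // a slot was freed: wake one putter
                if (next != null) {
                    next.signal();
                }
            }
            return head;
        } finally {
            lock.unlock();
        }
    }

    public int queuedPutters() {
        lock.lock();
        try {
            return notFull.size();
        } finally {
            lock.unlock();
        }
    }

    /** Blocks one putter, frees a slot, and reports whether it finished with no stale entry left. */
    public static boolean demo() {
        MiniHandoverQueue<String> q = new MiniHandoverQueue<>(1);
        try {
            q.put("a"); // fills the queue
            Thread putter = new Thread(() -> {
                try {
                    q.put("b"); // blocks until a slot is freed
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            putter.setDaemon(true);
            putter.start();
            long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(10);
            while (q.queuedPutters() == 0 && System.nanoTime() < deadline) {
                Thread.sleep(1); // wait until the putter has registered its condition
            }
            q.poll(); // frees the slot and signals the putter
            putter.join(10_000);
            return !putter.isAlive() && q.queuedPutters() == 0 && "b".equals(q.poll());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

Because the waiter always removes its own condition, a re-block after a spurious wakeup cannot leave a duplicate behind, and the signaller's `poll()` only ever sees conditions of threads that are still waiting.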
The change is split into two commits: the first adds the failing
reproduction test together with the
small `@VisibleForTesting` accessor it needs to sequence the putters
deterministically; the second
applies the fix. (The accessor is grouped with the test it enables rather
than with the behavioural
change.)
## Verifying this change
This change added tests and can be verified as follows:
- `FutureCompletingBlockingQueueTest#testWakeUpDoesNotStrandAnotherPutter`
blocks two putters, wakes
the first gracefully, then frees one slot and asserts the second
(still-waiting) putter is
signalled. With the test and the fix in separate commits, this gives a
red-green check: the test fails on the code before the fix (the
still-waiting putter is stranded and the latch times out) and
passes after the fix.
- Existing tests in the module remain green
(`FutureCompletingBlockingQueueTest`, `SplitFetcherTest`,
`SplitFetcherManagerTest`, `SourceReaderBaseTest`).
- Additionally validated out-of-tree with the Fray controlled-concurrency
tester (not added as a
dependency): on the unfixed code, Fray deterministically reproduces the
deadlock; with PR #26446's
one-line change, it still deadlocks (duplicate-condition case); with this
fix, Fray explored 5000
interleavings without a deadlock.
## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): no
- The public API, i.e., is any changed class annotated with
`@Public(Evolving)`: no (`FutureCompletingBlockingQueue` is `@Internal`)
- The serializers: no
- The runtime per-record code paths (performance sensitive): no
- Anything that affects deployment or recovery: JobManager (and its
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
- The S3 file system connector: no
## Documentation
- Does this pull request introduce a new feature? no
- If yes, how is the feature documented? not applicable
---
##### Was generative AI tooling used to co-author this PR?
- [X] Yes (Claude Code, Claude Opus 4.8)
Generated-by: Claude Code (Claude Opus 4.8)