MartijnVisser opened a new pull request, #28471:
URL: https://github.com/apache/flink/pull/28471
## What is the purpose of the change
Backport of #28463 to `release-1.20`.
`FutureCompletingBlockingQueue` (the handover queue between the split fetchers
and the source reader) has a lost-wakeup bug. A putter that blocks in `put()`
registers its `Condition` in the internal `notFull` queue, but that condition is
not always removed when the putter stops waiting:
- when the putter is gracefully woken via `wakeUpPuttingThread(int)` (and
returns `false` without enqueuing), and
- when the putter's `await()` returns spuriously (permitted by the
`Condition` contract) and it re-blocks, re-adding its condition.
Because of the leftover (stale or duplicated) entry, a later
`signalNextPutter()`, fired when a consumer frees a slot, signals a thread that
is no longer waiting. A genuinely waiting putter is then never woken, so a split
fetcher can stall indefinitely.
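The bookkeeping failure can be reproduced without any threads. The sketch below is a hypothetical, single-threaded model (not Flink code): putter ids stand in for their `Condition` objects, `block` models a putter registering in `notFull`, `awaitReturns` models `await()` returning without a consumer signal (graceful wake-up or spurious return), and `signalNextPutter` models the consumer-side signal. The `removeOnReturn` flag switches between the unfixed bookkeeping and the fixed one.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.Set;

/** Hypothetical single-threaded model of the notFull bookkeeping; not Flink code. */
final class NotFullModel {
    private final Deque<String> notFull = new ArrayDeque<>(); // registered "conditions"
    private final Set<String> waiting = new HashSet<>();      // putters actually blocked
    private final boolean removeOnReturn;                     // true models the fix

    NotFullModel(boolean removeOnReturn) {
        this.removeOnReturn = removeOnReturn;
    }

    /** A putter blocks in put(): registers its condition and waits. */
    void block(String putter) {
        notFull.add(putter);
        waiting.add(putter);
    }

    /** The putter's await() returns without a signal from signalNextPutter(). */
    void awaitReturns(String putter) {
        waiting.remove(putter);
        if (removeOnReturn) {
            notFull.remove(putter);
        }
    }

    /** A consumer frees a slot; returns the putter actually woken, or null if the signal is lost. */
    String signalNextPutter() {
        String next = notFull.poll();
        return next != null && waiting.remove(next) ? next : null;
    }

    /** P1 is woken via wakeUpPuttingThread and leaves; then one slot is freed. */
    static String gracefulWakeScenario(boolean fixed) {
        NotFullModel m = new NotFullModel(fixed);
        m.block("P1");
        m.block("P2");
        m.awaitReturns("P1");
        return m.signalNextPutter();
    }

    /** P1's await() returns spuriously and it re-blocks; then two slots are freed. */
    static String spuriousWakeScenario(boolean fixed) {
        NotFullModel m = new NotFullModel(fixed);
        m.block("P1");
        m.awaitReturns("P1");
        m.block("P1"); // queue still full, so P1 registers again
        m.block("P2");
        m.signalNextPutter(); // wakes P1, which enqueues and leaves
        return m.signalNextPutter();
    }

    public static void main(String[] args) {
        System.out.println(gracefulWakeScenario(false)); // null: P2 stranded
        System.out.println(gracefulWakeScenario(true));  // P2
        System.out.println(spuriousWakeScenario(false)); // null: P2 stranded
        System.out.println(spuriousWakeScenario(true));  // P2
    }
}
```

In both scenarios the unfixed model hands the consumer's signal to an entry whose putter is no longer waiting, so `P2` is never woken; removing the entry whenever `await()` returns lets the signal reach `P2`.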
## Brief change log
- `FutureCompletingBlockingQueue#waitOnPut`: remove the putter's condition
from `notFull` once `await()` returns (in a `finally`), restoring the invariant
that `notFull` only holds conditions of currently waiting putters.
- Added a `@VisibleForTesting` accessor (`getNumberOfQueuedPutters()`) so
the regression test can deterministically sequence the two contending putters.
Like the existing `@VisibleForTesting take()` in the same class, this trips the
blanket connector "depend only on public API" ArchUnit rule, so it is recorded
in the `flink-architecture-tests-production` violation store alongside the
existing entry.
- Added
`FutureCompletingBlockingQueueTest#testWakeUpDoesNotStrandAnotherPutter`, a
deterministic regression test that reproduces the stranded-putter stall and
fails on the unfixed code.
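The shape of the fix can be sketched on a heavily simplified, hypothetical queue (`SketchQueue` is an illustrative name, not the actual Flink class, whose fields and signatures may differ). It assumes one `Condition` per putter index, a per-putter wake-up flag, and a consumer-side `poll()` that signals the head of `notFull`. The essential part is the `try`/`finally` around `await()` in `waitOnPut`; `getNumberOfQueuedPutters()` here simply reports `notFull.size()` under the lock.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical, simplified stand-in for FutureCompletingBlockingQueue; not Flink code. */
final class SketchQueue<T> {
    private final ReentrantLock lock = new ReentrantLock();
    private final Queue<T> elements = new ArrayDeque<>();
    private final Queue<Condition> notFull = new ArrayDeque<>();
    private final Condition[] putConditions;
    private final boolean[] wakeUpFlags;
    private final int capacity;

    SketchQueue(int capacity, int numPutters) {
        this.capacity = capacity;
        this.putConditions = new Condition[numPutters];
        this.wakeUpFlags = new boolean[numPutters];
        for (int i = 0; i < numPutters; i++) {
            putConditions[i] = lock.newCondition();
        }
    }

    /** Blocks while full; returns false if woken via wakeUpPuttingThread(putterIndex). */
    boolean put(int putterIndex, T element) throws InterruptedException {
        lock.lockInterruptibly();
        try {
            while (elements.size() >= capacity) {
                if (wakeUpFlags[putterIndex]) {
                    wakeUpFlags[putterIndex] = false;
                    return false;
                }
                waitOnPut(putterIndex);
            }
            elements.add(element);
            return true;
        } finally {
            lock.unlock();
        }
    }

    /** Caller holds the lock; await() re-acquires it before returning or throwing. */
    private void waitOnPut(int putterIndex) throws InterruptedException {
        Condition cond = putConditions[putterIndex];
        notFull.add(cond);
        try {
            cond.await();
        } finally {
            // However await() returns (signal, graceful wake-up, spurious return,
            // interrupt), drop this putter's entry so notFull only holds currently
            // waiting putters. A no-op if poll() already removed it.
            notFull.remove(cond);
        }
    }

    /** Graceful wake-up: the woken putter returns false without enqueuing. */
    void wakeUpPuttingThread(int putterIndex) {
        lock.lock();
        try {
            wakeUpFlags[putterIndex] = true;
            putConditions[putterIndex].signal();
        } finally {
            lock.unlock();
        }
    }

    /** Consumer side: frees a slot and signals the next registered putter. */
    T poll() {
        lock.lock();
        try {
            T head = elements.poll();
            Condition next = head == null ? null : notFull.poll();
            if (next != null) {
                next.signal();
            }
            return head;
        } finally {
            lock.unlock();
        }
    }

    /** Test-only view of how many putters are registered in notFull. */
    int getNumberOfQueuedPutters() {
        lock.lock();
        try {
            return notFull.size();
        } finally {
            lock.unlock();
        }
    }
}
```

Filling a capacity-1 queue, blocking putters 0 and 1 (polling `getNumberOfQueuedPutters()` until it reaches 2), calling `wakeUpPuttingThread(0)`, and then calling `poll()` should let putter 1 enqueue. Without the `finally`, putter 0's stale entry would sit at the head of `notFull` and absorb that signal.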
## Verifying this change
This change added tests and can be verified as follows:
- `FutureCompletingBlockingQueueTest#testWakeUpDoesNotStrandAnotherPutter`
blocks two putters, wakes the first gracefully, then frees one slot and asserts
the second (still-waiting) putter is signalled. It fails on the code before the
fix (the still-waiting putter is stranded and the latch times out) and passes
after the fix.
- Existing tests in the module remain green.
- **1.20 adaptation:** the new regression test was ported from JUnit 5 to
JUnit 4 (`@Test(timeout = 60_000)` and a `public` method/class), matching the
rest of `FutureCompletingBlockingQueueTest` on this branch. The test body is
otherwise identical to #28463.
## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): no
- The public API, i.e., is any changed class annotated with
`@Public(Evolving)`: no (`FutureCompletingBlockingQueue` is `@Internal`)
- The serializers: no
- The runtime per-record code paths (performance sensitive): no
- Anything that affects deployment or recovery: JobManager (and its
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
- The S3 file system connector: no
## Documentation
- Does this pull request introduce a new feature? no
- If yes, how is the feature documented? not applicable
---
##### Was generative AI tooling used to co-author this PR?
- [X] Yes (Claude Code, Claude Opus 4.8)
Generated-by: Claude Code (Claude Opus 4.8)