wilmerdooley opened a new pull request, #28484:
URL: https://github.com/apache/flink/pull/28484
## What is the purpose of the change
When both `table.exec.mini-batch.enabled` and
`execution.checkpointing.unaligned.enabled` are on, ProcessingTimeService timer
callbacks are not fired between checkpoints; they only run during checkpoint
barrier handling. This is a regression from 1.20 introduced by the urgent-mail
system (FLINK-35796): unaligned checkpoint barriers are submitted as urgent
mails, and `TaskMailboxImpl` then skips non-urgent mails (the timer
callbacks) while an urgent mail is present, starving them until the next
checkpoint.
This change lets non-urgent mails make progress when the batch would
otherwise be empty, without changing urgent-mail prioritization.
## Brief change log
- `TaskMailboxImpl.moveUrgentMailsToBatchIfNeeded`: compute
`shouldOnlyMoveUrgentMails = onlyMoveUrgentMails && !isBatchEmpty` and gate the
non-urgent put-back/break on it, so when the batch is empty a non-urgent mail
is moved into the batch instead of being returned to the queue and skipped.
Urgent mails still take FIFO priority via `batch.addFirst`.
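
The gating described above can be illustrated with a minimal, self-contained sketch. It is a simplified model under stated assumptions, not the actual `TaskMailboxImpl` code: the `MailboxSketch` class, the `Mail` type, `put`, and `takeFromBatch` are hypothetical names introduced for illustration, locking is omitted, and the loop structure is only inferred from the change log. Only `shouldOnlyMoveUrgentMails`, `onlyMoveUrgentMails`, `isBatchEmpty`, and `batch.addFirst` come from the description.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Simplified, hypothetical model of the batch-filling logic; not Flink's TaskMailboxImpl. */
public class MailboxSketch {

    /** Hypothetical mail: a label plus an urgency flag. */
    public static final class Mail {
        public final String name;
        public final boolean urgent;

        public Mail(String name, boolean urgent) {
            this.name = name;
            this.urgent = urgent;
        }
    }

    private final Deque<Mail> queue = new ArrayDeque<>();
    private final Deque<Mail> batch = new ArrayDeque<>();

    /** Appends a mail to the queue (assumed helper, no locking in this sketch). */
    public void put(Mail mail) {
        queue.addLast(mail);
    }

    public void moveUrgentMailsToBatchIfNeeded(boolean onlyMoveUrgentMails) {
        // Both flags are computed once, before the loop starts.
        boolean isBatchEmpty = batch.isEmpty();
        boolean shouldOnlyMoveUrgentMails = onlyMoveUrgentMails && !isBatchEmpty;

        Mail mail;
        while ((mail = queue.pollFirst()) != null) {
            if (mail.urgent) {
                // Urgent mails go to the front of the batch.
                batch.addFirst(mail);
            } else if (shouldOnlyMoveUrgentMails) {
                // The batch already had work: put the non-urgent mail back and stop.
                queue.addFirst(mail);
                break;
            } else {
                // The batch was empty on entry: let the non-urgent mail make progress.
                batch.addLast(mail);
            }
        }
    }

    /** Returns the next mail from the batch, or null if the batch is empty. */
    public Mail takeFromBatch() {
        return batch.pollFirst();
    }

    public static void main(String[] args) {
        MailboxSketch mailbox = new MailboxSketch();
        mailbox.put(new Mail("barrier", true));
        mailbox.put(new Mail("timer", false));
        mailbox.moveUrgentMailsToBatchIfNeeded(true);
        System.out.println(mailbox.takeFromBatch().name); // prints "barrier"
        System.out.println(mailbox.takeFromBatch().name); // prints "timer"
    }
}
```

In this model, a non-urgent mail queued behind an urgent one is moved only when the batch was empty on entry. If the batch already holds mails, the non-urgent mail is put back and the loop stops, which is the prioritization the change log says is left unchanged.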
## Verifying this change
This change adds a unit test,
`TaskMailboxImplTest.testNonUrgentMailQueuedBehindUrgentMailIsTakenFromBatch`,
which asserts that a non-urgent mail queued behind an urgent mail is still taken
from the batch (not starved) while the urgent mail is taken first. The test fails on
the pre-fix code, where the non-urgent mail was returned to the queue and
skipped.
## Does this pull request potentially affect one of the following parts
- Dependencies (does it add or upgrade a dependency): no
- Public API, i.e., is any changed class annotated with
`@Public(Evolving)`: no
- The serializers: no
- The runtime per-record code paths (performance sensitive): yes (the task
mailbox take/createBatch path runs once per mailbox-loop iteration; this change
adds two local boolean computations and changes only the empty-batch branch,
with no new lock or allocation, so no measurable per-record overhead is
expected; it does change how often non-urgent mails are drained on the task
thread)
- Anything that affects deployment or recovery: JobManager (and its
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: yes (it interacts with
unaligned-checkpoint barrier delivery and processing-time timer firing between
checkpoints; the change is confined to the mailbox take path and does not touch
JobManager, Kubernetes, Yarn, ZooKeeper, snapshot format, or recovery state)
- The S3 file system connector: no
## Documentation
- Does this pull request introduce a new feature? no
- If yes, how is the feature documented? not applicable
This PR was written with the assistance of generative AI tooling.
- [X] Was generative AI tooling used to co-author this PR?
Generated-by: Claude Code
JIRA: https://issues.apache.org/jira/browse/FLINK-39898