wilmerdooley opened a new pull request, #28484:
URL: https://github.com/apache/flink/pull/28484

   ## What is the purpose of the change
   
   When both `table.exec.mini-batch.enabled` and 
`execution.checkpointing.unaligned.enabled` are enabled, `ProcessingTimeService` 
timer callbacks are not fired between checkpoints; they only run during 
checkpoint barrier handling. This is a regression from 1.20 introduced by the 
urgent-mail system (FLINK-35796): unaligned checkpoint barriers are submitted as 
urgent mails, and `TaskMailboxImpl` skips non-urgent mails (the timer 
callbacks) while an urgent mail is present, starving them until the next 
checkpoint.
   
   This change lets non-urgent mails make progress when the batch would 
otherwise be empty, without changing urgent-mail prioritization.
   
   ## Brief change log
   
     - `TaskMailboxImpl.moveUrgentMailsToBatchIfNeeded`: compute 
`shouldOnlyMoveUrgentMails = onlyMoveUrgentMails && !isBatchEmpty` and gate the 
non-urgent put-back/break on it, so when the batch is empty a non-urgent mail 
is moved into the batch instead of being returned to the queue and skipped. 
Urgent mails still take FIFO priority via `batch.addFirst`.
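
   The gating described above can be illustrated with a simplified, 
self-contained model. This is only a sketch under stated assumptions, not the 
actual `TaskMailboxImpl` code: the class `MailboxSketch`, the `Mail` type, and 
the methods `put`, `moveToBatch`, `takeFromBatch`, and `queuedCount` are 
hypothetical names, locking is omitted, and `isBatchEmpty` is assumed to be 
evaluated once before the loop.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical, single-threaded model of the queue/batch hand-off.
// It is NOT the Flink implementation; all names here are illustrative.
final class MailboxSketch {

    /** Hypothetical stand-in for a mail: a label plus an urgency flag. */
    static final class Mail {
        final String name;
        final boolean urgent;

        Mail(String name, boolean urgent) {
            this.name = name;
            this.urgent = urgent;
        }
    }

    private final Deque<Mail> queue = new ArrayDeque<>();
    private final Deque<Mail> batch = new ArrayDeque<>();

    void put(Mail mail) {
        queue.addLast(mail);
    }

    /** Moves mails from the queue into the batch, applying the gate from the change log. */
    void moveToBatch(boolean onlyMoveUrgentMails) {
        // Assumption: emptiness is checked once, before any mail is moved.
        boolean isBatchEmpty = batch.isEmpty();
        boolean shouldOnlyMoveUrgentMails = onlyMoveUrgentMails && !isBatchEmpty;

        Mail mail;
        while ((mail = queue.pollFirst()) != null) {
            if (mail.urgent) {
                // Urgent mails go to the head of the batch.
                batch.addFirst(mail);
            } else if (shouldOnlyMoveUrgentMails) {
                // Batch already has work: return the non-urgent mail and stop.
                queue.addFirst(mail);
                break;
            } else {
                // Batch was empty: let the non-urgent mail make progress.
                batch.addLast(mail);
            }
        }
    }

    /** Returns the next mail from the batch, or null if the batch is empty. */
    Mail takeFromBatch() {
        return batch.pollFirst();
    }

    int queuedCount() {
        return queue.size();
    }

    public static void main(String[] args) {
        MailboxSketch mailbox = new MailboxSketch();
        mailbox.put(new Mail("barrier", true)); // urgent mail
        mailbox.put(new Mail("timer", false));  // non-urgent mail queued behind it
        mailbox.moveToBatch(true);
        System.out.println(mailbox.takeFromBatch().name); // barrier
        System.out.println(mailbox.takeFromBatch().name); // timer
    }
}
```

   In this model, replacing `shouldOnlyMoveUrgentMails` with 
`onlyMoveUrgentMails` in the `else if` branch reproduces the starvation: the 
`timer` mail is returned to the queue and the second `takeFromBatch()` call 
returns `null`.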
   
   ## Verifying this change
   
   This change adds a unit test: 
`TaskMailboxImplTest.testNonUrgentMailQueuedBehindUrgentMailIsTakenFromBatch` 
asserts that a non-urgent mail queued behind an urgent mail is still taken from 
the batch (not starved) while the urgent mail is taken first. The test fails on 
the pre-fix code, where the non-urgent mail was returned to the queue and 
skipped.
   
   ## Does this pull request potentially affect one of the following parts
   
     - Dependencies (does it add or upgrade a dependency): no
     - Public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
     - The serializers: no
     - The runtime per-record code paths (performance sensitive): yes (the task 
mailbox take/createBatch path runs once per mailbox-loop iteration; this change 
adds two local boolean computations and changes only the empty-batch branch, 
with no new lock or allocation, so no measurable per-record overhead is 
expected; it does change how often non-urgent mails are drained on the task 
thread)
     - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: yes (it interacts with 
unaligned-checkpoint barrier delivery and processing-time timer firing between 
checkpoints; the change is confined to the mailbox take path and does not touch 
JobManager, Kubernetes, Yarn, ZooKeeper, snapshot format, or recovery state)
     - The S3 file system connector: no
   
   ## Documentation
   
     - Does this pull request introduce a new feature? no
     - If yes, how is the feature documented? not applicable
   
   This PR was written with the assistance of generative AI tooling.
   - [X] Was generative AI tooling used to co-author this PR?
   
   Generated-by: Claude Code
   
   JIRA: https://issues.apache.org/jira/browse/FLINK-39898
   

