jubins opened a new pull request, #28333:
URL: https://github.com/apache/flink/pull/28333

   ## What is the purpose of the change
   
   Fixes FLINK-39875 — The `processRecord()` method in `ResultStore` catches 
`InterruptedException` but does not restore the thread's interrupt status, 
which violates Java concurrency best practices and could prevent proper 
shutdown or cancellation of the result retrieval thread.
   
   When `wait()` throws `InterruptedException`, the thread's interrupt status is 
cleared. Code that catches the exception should therefore either propagate it 
or restore the status by calling `Thread.currentThread().interrupt()`. 
Otherwise the `ResultRetrievalThread` may not respond to the shutdown signal 
sent by the `close()` method.
   
   This PR restores the interrupt status and ensures the thread exits promptly 
when interrupted.
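
   A minimal sketch of the restore-and-return pattern described above, assuming 
a simplified bounded buffer. The class `InterruptRestoreSketch`, its fields, and 
the `boolean` return value are illustrative only and are not the actual 
`ResultStore` code:

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical stand-in for a bounded result buffer; not Flink's ResultStore. */
public class InterruptRestoreSketch {
    private final Object lock = new Object();
    private final Deque<String> buffer = new ArrayDeque<>();
    private final int capacity;

    public InterruptRestoreSketch(int capacity) {
        this.capacity = capacity;
    }

    /** Returns true if the record was stored, false if the wait was interrupted. */
    public boolean processRecord(String record) {
        synchronized (lock) {
            while (buffer.size() >= capacity) {
                try {
                    lock.wait(); // block until a consumer frees space
                } catch (InterruptedException e) {
                    // wait() cleared the flag when it threw; set it again so
                    // the caller's loop condition can still observe it.
                    Thread.currentThread().interrupt();
                    return false; // skip the record instead of processing it
                }
            }
            buffer.addLast(record);
            lock.notifyAll();
            return true;
        }
    }

    /** Removes and returns the oldest record, or null if the buffer is empty. */
    public String poll() {
        synchronized (lock) {
            String record = buffer.pollFirst();
            lock.notifyAll(); // wake producers waiting for space
            return record;
        }
    }
}
```

   Returning early keeps the interrupted thread from touching the buffer, and 
the restored flag lets whatever loop called `processRecord()` decide to exit.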
   
   ## Brief change log
   
   - Added `Thread.currentThread().interrupt()` in `processRecord()` catch 
block to restore interrupt status
   - Added early `return` to skip record processing when interrupted  
   - Enhanced `ResultRetrievalThread` loop condition to check 
`!Thread.interrupted()`
   - Added debug logging for interrupt events
   - Added `ResultStoreTest` with 4 test cases:
     - `testInterruptHandling()` — Verifies proper interrupt handling during 
execution
     - `testProcessRecordWithFullBuffer()` — Tests buffer management with 
multiple records
     - `testInterruptDuringBufferWait()` — Validates interrupt response when 
waiting for buffer space
     - `testGracefulShutdown()` — Ensures quick and clean shutdown
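
   The loop-condition item above can be sketched as follows. This is an 
illustration under assumptions, not the actual `ResultRetrievalThread`: the 
class `RetrievalLoopSketch` and both methods are hypothetical. One detail that 
matters when reading such a loop is that `Thread.interrupted()` clears the flag 
as it reads it, while `isInterrupted()` leaves it set:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Hypothetical worker loop; not Flink's ResultRetrievalThread. */
public class RetrievalLoopSketch {

    /** Returns {isInterrupted(), first interrupted(), second interrupted()}. */
    public static boolean[] flagSemantics() {
        Thread.currentThread().interrupt();
        boolean viaIsInterrupted = Thread.currentThread().isInterrupted(); // reads, keeps flag
        boolean firstCall = Thread.interrupted();  // reads and clears the flag
        boolean secondCall = Thread.interrupted(); // flag was already cleared
        return new boolean[] {viaIsInterrupted, firstCall, secondCall};
    }

    /** Starts a worker that loops until interrupted; returns true if it stopped. */
    public static boolean runAndInterrupt() {
        CountDownLatch started = new CountDownLatch(1);
        Thread worker = new Thread(() -> {
            started.countDown();
            while (!Thread.interrupted()) {
                Thread.yield(); // stands in for fetching and processing a record
            }
        }, "sketch-retrieval-thread");
        worker.setDaemon(true); // never keep the JVM alive if the sketch misbehaves
        worker.start();
        try {
            if (!started.await(5, TimeUnit.SECONDS)) {
                return false;
            }
            worker.interrupt();
            worker.join(5_000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return !worker.isAlive();
    }
}
```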
   
   ## Verifying this change
   
   This change is covered by 4 new unit tests in `ResultStoreTest`:
   
   - **`testInterruptHandling()`** — Verifies that when the ResultStore is 
closed, the retrieval thread properly handles the interrupt and stops running
   - **`testProcessRecordWithFullBuffer()`** — Tests that records are processed 
correctly when the buffer fills up and waits are required
   - **`testInterruptDuringBufferWait()`** — Validates that interrupts are 
handled promptly even when the thread is waiting for buffer space
   - **`testGracefulShutdown()`** — Ensures that closing the ResultStore 
completes quickly (under 1 second), demonstrating proper interrupt handling
   
   All tests use timeout guards (10 seconds) to prevent hanging if interrupt 
handling fails.
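
   For illustration, the shutdown-timing check can be sketched in plain Java 
without a test framework. The class `GracefulShutdownSketch`, its `start()` and 
`close(long)` methods, and the use of `isInterrupted()` in the loop are 
assumptions made for this sketch; the real tests live in `ResultStoreTest` and 
may be structured differently:

```java
/** Hypothetical store with a worker blocked in wait(); not Flink's ResultStore. */
public class GracefulShutdownSketch {
    private final Object lock = new Object();
    private final Thread worker =
            new Thread(this::waitUntilInterrupted, "sketch-retrieval-thread");

    private void waitUntilInterrupted() {
        synchronized (lock) {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    lock.wait(); // stands in for "waiting for buffer space"
                } catch (InterruptedException e) {
                    // Restore the flag so the loop condition ends the thread.
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

    public void start() {
        worker.setDaemon(true);
        worker.start();
    }

    /** Interrupts the worker and waits up to guardMillis; true if it stopped. */
    public boolean close(long guardMillis) {
        worker.interrupt();
        try {
            worker.join(guardMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return !worker.isAlive();
    }

    /** Returns the time close() took in milliseconds, or -1 if the worker hung. */
    public static long closeAndMeasureMillis() {
        GracefulShutdownSketch store = new GracefulShutdownSketch();
        store.start();
        long startNanos = System.nanoTime();
        boolean stopped = store.close(10_000); // 10 second guard against hanging
        long elapsedMillis = (System.nanoTime() - startNanos) / 1_000_000;
        return stopped ? elapsedMillis : -1;
    }
}
```

   If the catch block did not restore the flag, the loop would go back into 
`wait()` and `close()` would only return once the guard expired.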
   
   ## Does this pull request potentially affect one of the following parts
   
   - Dependencies (does it add or upgrade a dependency): **no**
   - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: **no** — `ResultStore` is an internal class in the SQL 
Gateway
   - The serializers: **no**
   - The runtime per-record code paths (performance sensitive): **no** — only 
affects error/shutdown paths
   - Anything that affects deployment or recovery (JobManager, Checkpointing, 
Kubernetes/Yarn, ZooKeeper): **no** — limited to SQL Gateway result handling
   - The S3 file system connector: **no**
   
   ## Documentation
   
   - Does this pull request introduce a new feature? **no**
   - If yes, how is the feature documented? **not applicable**
   
   ## Was generative AI tooling used to co-author this PR?
   
   - [x] Yes — Claude Code was used as a pair-programming assistant. All code 
was written, understood, and verified by the author.
   
   **Generated-by:** Claude Opus 4.8


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

