abhijeet2096 opened a new pull request, #28474:
URL: https://github.com/apache/flink/pull/28474

   ## What is the purpose of the change
   
   Fixes [FLINK-39330](https://issues.apache.org/jira/browse/FLINK-39330) — 
`FileSourceTextLinesITCase.testBoundedTextFileSourceWithJobManagerFailover` is 
flaky.
   
   The flakiness comes from a latent **`NullPointerException` in `CollectSinkFunction` teardown**, which the JobManager-failover timing exercised by the test can trigger. When the collect-sink operator is torn down **before** `initializeState()`/`open()` has run, `CollectSinkOperator#close()` calls `CollectSinkFunction#accumulateFinalResults()` while `bufferLock` (and `serverThread`) are still `null`, which throws an NPE. The NPE surfaces in the task's exception-handler path and is escalated to a **fatal error that shuts down the TaskManager**. The test uses a single-TaskManager `MiniCluster`, so the recovered job can never reacquire slots. It starves for the full `slot.request.timeout` (5 min) and finally fails with `NoResourceAvailableException`. With `maxNumberRestartAttempts=1`, recovery is suppressed and the job dies, and the test sees `RuntimeException: Failed to fetch next result`.
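The escalation step can be illustrated with a simplified, hypothetical Java model. It is not Flink's `Task` or `TaskExecutor` code, and `EscalationModel`, `runTask` and `RecordingFatalHandler` are illustrative names. When the task body fails, a cleanup step runs. If that cleanup throws as well, the error goes to a fatal-error handler, which in a real TaskManager would stop the process. In this model the handler only records the error so the behavior can be observed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical, simplified model of the escalation path; not Flink's Task code. */
final class EscalationModel {

    /** Stands in for the TaskManager's fatal-error handler; records instead of exiting. */
    static final class RecordingFatalHandler implements Consumer<Throwable> {
        final List<Throwable> fatalErrors = new ArrayList<>();

        @Override
        public void accept(Throwable t) {
            fatalErrors.add(t);
        }
    }

    /**
     * Runs the task body. On failure, runs cleanup (e.g. closing operators); if cleanup
     * itself throws, the error is escalated to the fatal handler. Only the failure path
     * is modelled here.
     */
    static void runTask(Runnable body, Runnable cleanup, Consumer<Throwable> fatalHandler) {
        try {
            body.run();
        } catch (Throwable t) {
            try {
                cleanup.run();
            } catch (Throwable inHandler) {
                fatalHandler.accept(
                        new Error("FATAL - exception in exception handler of task", inHandler));
            }
        }
    }

    public static void main(String[] args) {
        RecordingFatalHandler handler = new RecordingFatalHandler();
        Object bufferLock = null; // never initialized because open() did not run
        runTask(
                () -> { throw new IllegalStateException("task cancelled by failover"); },
                () -> bufferLock.hashCode(), // NPE during cleanup, like close() before open()
                handler);
        System.out.println("fatal errors: " + handler.fatalErrors.size());
    }
}
```

In this model, a cleanup that completes normally leaves the handler empty. Only a second failure inside the exception handler is escalated.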
   
   `bufferLock` is only assigned in `initBuffer()` (reached from `initializeState()`/`open()`), so it is `null` whenever the operator is closed before it has been opened. That is exactly what happens in the failover teardown race.
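The close-before-open pattern can be reproduced in isolation with a hypothetical stand-in class. `UnguardedCollectSink` is illustrative and is not Flink's `CollectSinkFunction`. The lock is created only in `initBuffer()`, so a teardown method that dereferences it unconditionally throws an NPE if `open()` never ran. On JDK 15 and later, the helpful-NPE message has the same form as the one in the failure log.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Hypothetical stand-in for the pre-fix teardown pattern (not Flink's code):
 * bufferLock is only created in initBuffer(), which only open() reaches.
 */
final class UnguardedCollectSink {
    private ReentrantLock bufferLock; // null until initBuffer() runs
    private List<String> buffer;

    private void initBuffer() {
        bufferLock = new ReentrantLock();
        buffer = new ArrayList<>();
    }

    void open() {
        initBuffer();
    }

    /** Mirrors the unguarded teardown path: dereferences bufferLock unconditionally. */
    int accumulateFinalResults() {
        bufferLock.lock(); // throws NullPointerException if open() never ran
        try {
            return buffer.size();
        } finally {
            bufferLock.unlock();
        }
    }

    public static void main(String[] args) {
        UnguardedCollectSink sink = new UnguardedCollectSink();
        try {
            sink.accumulateFinalResults(); // closed before opened, as in the failover race
        } catch (NullPointerException e) {
            System.out.println("NPE: " + e.getMessage());
        }
    }
}
```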
   
   ### Root-cause / failure chain (from a reproduced failing iteration, DEBUG logging)
   
   1. Test revokes JM leadership (`triggerJobManagerFailover`); the 
TaskExecutor closes the job's tasks.
   2. `CollectSinkOperator.close()` → 
`CollectSinkFunction.accumulateFinalResults()` throws:
      ```
      FATAL - exception in exception handler of task Sink: Data stream collect sink (1/1)#0
      java.lang.NullPointerException: Cannot invoke "java.util.concurrent.locks.ReentrantLock.lock()" because "this.bufferLock" is null
          at CollectSinkFunction.accumulateFinalResults(CollectSinkFunction.java:308)
          at CollectSinkOperator.close(CollectSinkOperator.java:48)
      ```
   3. The fatal error shuts the TaskManager down (`Stopping TaskExecutor …`, then `Ignoring the freeing of slot … because the TaskExecutor is shutting down`).
   4. The `MiniCluster` has `setNumberTaskManagers(1)`, so its only TM is gone → recovered JM logs `Matching resource requirements … Current resources: (none)`.
   5. After exactly the `slot.request.timeout` (300000 ms in the logs), the slot pool fails the requests → `NoResourceAvailableException` → `Recovery is suppressed by FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=1)`.
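Steps 4 and 5 can be modelled with a small hypothetical sketch. When no TaskManager is registered, a slot request can end only when its timeout fires. `SlotStarvationModel` and `requestSlot` are illustrative names and are not Flink's `DeclarativeSlotPoolBridge` API. The model fails with a plain `TimeoutException`, whereas Flink reports `NoResourceAvailableException`. The demo uses a 100 ms timeout instead of the test's 300000 ms (5 min).

```java
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical model of a pending slot request; not Flink's slot pool. */
final class SlotStarvationModel {

    /** Completes with a TaskManager id if one is registered, else fails after the timeout. */
    static CompletableFuture<String> requestSlot(List<String> taskManagers, Duration timeout) {
        if (!taskManagers.isEmpty()) {
            return CompletableFuture.completedFuture(taskManagers.get(0));
        }
        // The only TaskManager was shut down and none will register again,
        // so the request can only end when the timeout fires.
        return new CompletableFuture<String>().orTimeout(timeout.toMillis(), TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) {
        // slot.request.timeout from the logs: 300000 ms
        System.out.println("slot.request.timeout = " + Duration.ofMillis(300_000).toMinutes() + " min");
        try {
            requestSlot(List.of(), Duration.ofMillis(100)).join(); // short timeout for the demo
        } catch (CompletionException e) {
            System.out.println("timed out: " + (e.getCause() instanceof TimeoutException));
        }
    }
}
```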
   
   ## Brief change log
   
     - `CollectSinkFunction#accumulateFinalResults()` returns early when 
`bufferLock == null` (function never opened → nothing buffered to accumulate).
     - `CollectSinkFunction#close()` guards `serverThread != null` (no server 
thread to stop if never opened).
     - Added `CollectSinkFunctionTest#testCloseBeforeOpenDoesNotThrow` 
regression test.
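The following minimal sketch applies the two guards from the change log to a hypothetical stand-in class. It does not reproduce the actual `CollectSinkFunction` diff. `GuardedCollectSink` is an illustrative name. Its `accumulateFinalResults()` returns the buffered record count instead of writing an accumulator, purely to make the behavior easy to check.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical stand-in mirroring the change-log guards; not the real diff. */
final class GuardedCollectSink {
    private ReentrantLock bufferLock; // assigned only in initBuffer()
    private List<String> buffer;
    private Thread serverThread;      // started only in open()

    private void initBuffer() {
        bufferLock = new ReentrantLock();
        buffer = new ArrayList<>();
    }

    void open() {
        initBuffer();
        serverThread = new Thread(() -> { /* would serve fetch requests */ }, "collect-server");
        serverThread.setDaemon(true);
        serverThread.start();
    }

    void invoke(String value) {
        bufferLock.lock();
        try {
            buffer.add(value);
        } finally {
            bufferLock.unlock();
        }
    }

    /** Returns the number of buffered records; 0 if the sink was never opened. */
    int accumulateFinalResults() {
        if (bufferLock == null) {
            return 0; // never opened: nothing was buffered, so nothing to accumulate
        }
        bufferLock.lock();
        try {
            return buffer.size();
        } finally {
            bufferLock.unlock();
        }
    }

    void close() {
        if (serverThread != null) { // no server thread to stop if never opened
            serverThread.interrupt();
            try {
                serverThread.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    public static void main(String[] args) {
        GuardedCollectSink neverOpened = new GuardedCollectSink();
        System.out.println(neverOpened.accumulateFinalResults()); // 0, no NPE
        neverOpened.close();                                      // no-op, no NPE
    }
}
```

In this sketch, the early return checks `bufferLock` itself rather than a separate "opened" flag. This ties the guard directly to the field whose absence causes the NPE.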
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   
     - Added unit test 
`CollectSinkFunctionTest#testCloseBeforeOpenDoesNotThrow`, which constructs a 
`CollectSinkFunction` that is never opened and asserts that 
`accumulateFinalResults()` and `close()` do not throw (`Tests run: 1, Failures: 
0`).
     - Manually reproduced the flake by running `testBoundedTextFileSourceWithJobManagerFailover` with IntelliJ's **Repeat → Until Failure** (without the fix, it failed by about iteration 7). With the fix, the TaskManager is no longer killed during the JobManager failover and the loop stays green.
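Outside IntelliJ, the same Repeat → Until Failure approach can be approximated with a plain Java loop. `RepeatUntilFailure` is a hypothetical helper. The flaky body in `main` is simulated and fails deterministically on its 7th call. It does not run the real ITCase.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical plain-Java stand-in for IntelliJ's "Repeat -> Until Failure" mode. */
final class RepeatUntilFailure {

    /**
     * Runs body up to maxIterations times and returns the 1-based iteration of the
     * first failure, or -1 if every iteration passed.
     */
    static int run(int maxIterations, Runnable body) {
        for (int i = 1; i <= maxIterations; i++) {
            try {
                body.run();
            } catch (Throwable t) { // includes AssertionError from test code
                System.out.println("failed at iteration " + i + ": " + t);
                return i;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();
        // Simulated flaky check that fails on its 7th run.
        int failedAt = run(100, () -> {
            if (calls.incrementAndGet() == 7) {
                throw new RuntimeException("Failed to fetch next result");
            }
        });
        System.out.println(failedAt); // 7
    }
}
```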
   
   Debugging steps used to localize the root cause:
   
     1. Reproduced locally via Repeat → Until Failure; observed the surface 
error `Failed to fetch next result` at `CollectResultIterator.hasNext`.
     2. Followed the cause-chain to `Recovery is suppressed … 
NoResourceAvailableException`, which indicated the job died for lack of slots 
rather than a data error.
     3. Enabled DEBUG logging on the scheduler / slot-management / TaskExecutor 
classes (see below) and re-ran until failure to capture the full ordering.
     4. The decisive evidence was the `FATAL … bufferLock is null` NPE that 
killed the TaskManager **before** the resource starvation, i.e. the starvation 
was a downstream symptom.
     5. Traced `bufferLock` initialization to `initBuffer()` (only reached via 
`initializeState`/`open`), confirming the unguarded close-before-open path.
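Step 2 amounts to walking the `getCause()` chain from the surface exception down to the one that explains the failure. The standalone helper sketched here is hypothetical. `CauseChain` and `NoResourceStandIn` are illustrative names, and the stand-in replaces Flink's `NoResourceAvailableException` so the example needs no Flink dependency.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Optional;
import java.util.Set;

/** Hypothetical helper for walking an exception's cause chain. */
final class CauseChain {

    /** Stand-in for Flink's NoResourceAvailableException (no Flink dependency needed). */
    static final class NoResourceStandIn extends RuntimeException {
        NoResourceStandIn(String message) {
            super(message);
        }
    }

    /** Returns every throwable in the chain, outermost first; stops if a cause repeats. */
    static List<Throwable> chain(Throwable t) {
        List<Throwable> result = new ArrayList<>();
        Set<Throwable> seen = Collections.newSetFromMap(new IdentityHashMap<>());
        for (Throwable cur = t; cur != null && seen.add(cur); cur = cur.getCause()) {
            result.add(cur);
        }
        return result;
    }

    /** Returns the outermost throwable in the chain that is an instance of {@code type}. */
    static <T extends Throwable> Optional<T> find(Throwable t, Class<T> type) {
        return chain(t).stream().filter(type::isInstance).map(type::cast).findFirst();
    }

    public static void main(String[] args) {
        Throwable surface = new RuntimeException("Failed to fetch next result",
                new IllegalStateException("Recovery is suppressed",
                        new NoResourceStandIn("no slots available")));
        for (Throwable t : chain(surface)) {
            System.out.println(t.getClass().getSimpleName() + ": " + t.getMessage());
        }
        System.out.println(find(surface, NoResourceStandIn.class).isPresent()); // true
    }
}
```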
   
   Log4j entries used during the investigation (in `flink-connector-files/src/test/resources/log4j2-test.properties`; investigation only, not part of this PR):
   ```properties
   rootLogger.level = OFF
   logger.failover.name = org.apache.flink.runtime.executiongraph.failover.ExecutionFailureHandler
   logger.failover.level = DEBUG
   logger.scheduler.name = org.apache.flink.runtime.scheduler.DefaultScheduler
   logger.scheduler.level = DEBUG
   logger.execgraph.name = org.apache.flink.runtime.executiongraph.DefaultExecutionGraph
   logger.execgraph.level = DEBUG
   logger.slotmanager.name = org.apache.flink.runtime.resourcemanager.slotmanager.FineGrainedSlotManager
   logger.slotmanager.level = DEBUG
   logger.slotpool.name = org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridge
   logger.slotpool.level = DEBUG
   logger.jobmaster.name = org.apache.flink.runtime.jobmaster.JobMaster
   logger.jobmaster.level = DEBUG
   logger.taskexecutor.name = org.apache.flink.runtime.taskexecutor.TaskExecutor
   logger.taskexecutor.level = DEBUG
   logger.jobleader.name = org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService
   logger.jobleader.level = DEBUG
   ```
   
   Screenshots of the full DEBUG logs from passing and failing iterations are attached to the PR. They show the `FATAL … bufferLock is null` stack and the `Current resources: (none)` → `NoResourceAvailableException` window.
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): **no**
     - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: **no**
     - The serializers: **no**
     - The runtime per-record code paths (performance sensitive): **no**
     - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: **no** (only hardens 
collect-sink teardown against an uninitialized-close NPE)
     - The S3 file system connector: **no**
   
   ## Documentation
   
     - Does this pull request introduce a new feature? **no**
     - If yes, how is the feature documented? **not applicable**
   
   ---
   
   ##### Was generative AI tooling used to co-author this PR?
   
   - [X] Yes (Claude Code, Claude Opus 4.8)
   
   Generated-by: Claude Code (Claude Opus 4.8)
   


-- 
This is an automated message from the Apache Git Service.