abhijeet2096 opened a new pull request, #28474: URL: https://github.com/apache/flink/pull/28474
## What is the purpose of the change

Fixes [FLINK-39330](https://issues.apache.org/jira/browse/FLINK-39330): `FileSourceTextLinesITCase.testBoundedTextFileSourceWithJobManagerFailover` is flaky.

The flakiness is a latent **`NullPointerException` in `CollectSinkFunction` teardown** that is triggered by the JobManager-failover timing the test exercises. When the collect-sink operator is torn down **before** `initializeState()`/`open()` ran, `CollectSinkOperator#close()` calls `CollectSinkFunction#accumulateFinalResults()` while `bufferLock` (and `serverThread`) are still `null`, throwing an NPE. The NPE surfaces in the task's exception-handler path and is escalated to a **fatal error that shuts down the TaskManager**. In the single-TaskManager `MiniCluster` used by the test, the recovered job can then never reacquire slots, starves for the full `slot.request.timeout` (5 min), and finally fails with `NoResourceAvailableException`; with `maxNumberRestartAttempts=1` recovery is suppressed and the job dies, surfacing to the test as `RuntimeException: Failed to fetch next result`.

`bufferLock` is only assigned in `initBuffer()` (reached from `initializeState()`/`open()`), so it is `null` whenever the operator is closed before it was ever opened, which is exactly what happens on the failover teardown race.

### Root-cause / failure chain (from a reproduced failing iteration, DEBUG logging)

1. Test revokes JM leadership (`triggerJobManagerFailover`); the TaskExecutor closes the job's tasks.
2. `CollectSinkOperator.close()` → `CollectSinkFunction.accumulateFinalResults()` throws:

   ```
   FATAL - exception in exception handler of task Sink: Data stream collect sink (1/1)#0
   java.lang.NullPointerException: Cannot invoke "java.util.concurrent.locks.ReentrantLock.lock()" because "this.bufferLock" is null
       at CollectSinkFunction.accumulateFinalResults(CollectSinkFunction.java:308)
       at CollectSinkOperator.close(CollectSinkOperator.java:48)
   ```
3. The fatal error shuts the TaskManager down (`Stopping TaskExecutor …`, then `Ignoring the freeing of slot … because the TaskExecutor is shutting down`).
4. The `MiniCluster` has `setNumberTaskManagers(1)`, so its only TM is gone → recovered JM logs `Matching resource requirements … Current resources: (none)`.
5. After exactly the `slot.request.timeout` (300000 ms in the logs), the slot pool fails the requests → `NoResourceAvailableException` → `Recovery is suppressed by FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=1)`.

## Brief change log

- `CollectSinkFunction#accumulateFinalResults()` returns early when `bufferLock == null` (function never opened → nothing buffered to accumulate).
- `CollectSinkFunction#close()` guards `serverThread != null` (no server thread to stop if never opened).
- Added `CollectSinkFunctionTest#testCloseBeforeOpenDoesNotThrow` regression test.

## Verifying this change

This change added tests and can be verified as follows:

- Added unit test `CollectSinkFunctionTest#testCloseBeforeOpenDoesNotThrow`, which constructs a `CollectSinkFunction` that is never opened and asserts that `accumulateFinalResults()` and `close()` do not throw (`Tests run: 1, Failures: 0`).
- Manually reproduced the flake by running `testBoundedTextFileSourceWithJobManagerFailover` with IntelliJ's **Repeat → Until Failure** (failed by ~iteration 7 without the fix). With the fix the TaskManager is no longer killed during the JobManager failover and the loop stays green.

Debugging steps used to localize the root cause:

1. Reproduced locally via Repeat → Until Failure; observed the surface error `Failed to fetch next result` at `CollectResultIterator.hasNext`.
2. Followed the cause-chain to `Recovery is suppressed … NoResourceAvailableException`, which indicated the job died for lack of slots rather than a data error.
3. Enabled DEBUG logging on the scheduler / slot-management / TaskExecutor classes (see below) and re-ran until failure to capture the full ordering.
4. The decisive evidence was the `FATAL … bufferLock is null` NPE that killed the TaskManager **before** the resource starvation, i.e. the starvation was a downstream symptom.
5. Traced `bufferLock` initialization to `initBuffer()` (only reached via `initializeState`/`open`), confirming the unguarded close-before-open path.

log4j entries used during investigation (`flink-connector-files/src/test/resources/log4j2-test.properties`, investigation-only, not part of this PR):

```properties
rootLogger.level = OFF
logger.failover.name = org.apache.flink.runtime.executiongraph.failover.ExecutionFailureHandler
logger.failover.level = DEBUG
logger.scheduler.name = org.apache.flink.runtime.scheduler.DefaultScheduler
logger.scheduler.level = DEBUG
logger.execgraph.name = org.apache.flink.runtime.executiongraph.DefaultExecutionGraph
logger.execgraph.level = DEBUG
logger.slotmanager.name = org.apache.flink.runtime.resourcemanager.slotmanager.FineGrainedSlotManager
logger.slotmanager.level = DEBUG
logger.slotpool.name = org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridge
logger.slotpool.level = DEBUG
logger.jobmaster.name = org.apache.flink.runtime.jobmaster.JobMaster
logger.jobmaster.level = DEBUG
logger.taskexecutor.name = org.apache.flink.runtime.taskexecutor.TaskExecutor
logger.taskexecutor.level = DEBUG
logger.jobleader.name = org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService
logger.jobleader.level = DEBUG
```

Full DEBUG logs of passing vs. failing iterations (the `FATAL … bufferLock is null` stack and the `Current resources: (none)` → `NoResourceAvailableException` window): screenshots attached below.
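
For reviewers, the close-before-open guard from the change log can be illustrated with a minimal, self-contained sketch. `GuardedCollector` and all of its members are hypothetical stand-ins and not the actual `CollectSinkFunction` code; only the two null checks are meant to mirror this change, under the assumption that the lock and the server thread are created lazily in `open()`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical stand-in for a sink function that creates its lock and thread lazily. */
class GuardedCollector {
    // Both fields stay null until open() runs (assumption mirroring the PR description).
    private ReentrantLock bufferLock;
    private Thread serverThread;
    private final List<String> buffer = new ArrayList<>();
    private final List<String> finalResults = new ArrayList<>();

    void open() {
        bufferLock = new ReentrantLock();
        serverThread = new Thread(() -> {
            try {
                Thread.sleep(Long.MAX_VALUE); // placeholder for serving client requests
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        serverThread.setDaemon(true);
        serverThread.start();
    }

    void invoke(String value) {
        bufferLock.lock();
        try {
            buffer.add(value);
        } finally {
            bufferLock.unlock();
        }
    }

    void accumulateFinalResults() {
        if (bufferLock == null) {
            return; // never opened, so nothing was buffered
        }
        bufferLock.lock();
        try {
            finalResults.addAll(buffer);
            buffer.clear();
        } finally {
            bufferLock.unlock();
        }
    }

    void close() {
        if (serverThread != null) { // no server thread to stop if never opened
            serverThread.interrupt();
            try {
                serverThread.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    List<String> finalResults() {
        return finalResults;
    }

    public static void main(String[] args) {
        // Teardown without open(): must not throw.
        GuardedCollector neverOpened = new GuardedCollector();
        neverOpened.accumulateFinalResults();
        neverOpened.close();
        System.out.println(neverOpened.finalResults()); // prints []

        // Normal lifecycle still accumulates buffered values.
        GuardedCollector opened = new GuardedCollector();
        opened.open();
        opened.invoke("a");
        opened.accumulateFinalResults();
        opened.close();
        System.out.println(opened.finalResults()); // prints [a]
    }
}
```

Without the two null checks, the first half of `main` would fail with the same kind of NPE as in the stack trace above, since `bufferLock.lock()` would be invoked on a `null` reference.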
## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): **no**
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: **no**
- The serializers: **no**
- The runtime per-record code paths (performance sensitive): **no**
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: **no** (only hardens collect-sink teardown against an uninitialized-close NPE)
- The S3 file system connector: **no**

## Documentation

- Does this pull request introduce a new feature? **no**
- If yes, how is the feature documented? **not applicable**

---

##### Was generative AI tooling used to co-author this PR?

- [X] Yes (Claude Code, Claude Opus 4.8)

Generated-by: Claude Code (Claude Opus 4.8)
