[ 
https://issues.apache.org/jira/browse/FLINK-39145?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18083344#comment-18083344
 ] 

Rui Fan commented on FLINK-39145:
---------------------------------

h2. Summary

{{UnalignedCheckpointITCase}} can fail when a sink task is cancelled during 
recovery before its operator state has finished initialization. In that 
lifecycle, {{VerifyingSinkBase.close()}} currently assumes {{state}} is 
non-null and dereferences it, which can throw {{NullPointerException}}.
h2. Evidence

In {{log/20260525_203013.log}}, {{failing-map (1/5)#3}} fails in 
{{initializeState}}:

{code:java}
Failing initializeState @ 6 (3 attempt) 
FailingMapper.initializeState -> StreamTask.restoreStateAndGates -> 
StreamTask.restore
{code}

This is an intentional test failure path:

{code:java}
checkFail(failDuringRecovery, "initializeState");
{code}

{{UnalignedCheckpointITCase}} configures that recovery failure for attempt 3:

{{state -> state.runNumber == 3}}

The failed mapper triggers a region restart:

{{25 tasks will be restarted to recover the failed task ...}}

During that restart, some sink subtasks are cancelled while still in 
{{DEPLOYING}} or {{INITIALIZING}}.
h2. Bug

Cancelling a task before operator state initialization completes is a valid 
recovery lifecycle. Test sink cleanup must tolerate this state. 
{{VerifyingSinkBase.close()}} should not assume that {{initializeState()}} has 
already assigned {{state}}.
h2. Fix Direction

Make {{VerifyingSinkBase.close()}} treat {{state == null}} as an attempt that 
produced no output, or skip the state-derived counters for that attempt, 
while preserving normal counter validation for attempts that completed 
initialization.
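
A minimal sketch of the null guard, to make the fix direction concrete. It is not the actual {{VerifyingSinkBase}} code: the class {{NullSafeSinkSketch}}, the nested {{SinkState}} and {{VerifyingSinkSketch}} types, the {{numOutput}} field and the {{TOTAL_OUTPUT}} counter are all hypothetical stand-ins, and the real counters and their validation may look different.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical stand-in for the test sink; not the actual VerifyingSinkBase. */
public class NullSafeSinkSketch {

    /** Hypothetical per-attempt state, assigned only in initializeState(). */
    static final class SinkState {
        long numOutput;
    }

    /** Hypothetical job-wide counter that close() reports into. */
    static final AtomicLong TOTAL_OUTPUT = new AtomicLong();

    static class VerifyingSinkSketch implements AutoCloseable {
        // Stays null if the task is cancelled before initializeState() runs.
        private SinkState state;

        void initializeState() {
            state = new SinkState();
        }

        void invoke() {
            state.numOutput++;
        }

        @Override
        public void close() {
            if (state == null) {
                // Cancelled during DEPLOYING or INITIALIZING: no counters to report.
                return;
            }
            // Initialized attempts still report their counters as before.
            TOTAL_OUTPUT.addAndGet(state.numOutput);
        }
    }
}
```

With this shape, an attempt cancelled before initialization contributes nothing to the counters and does not throw, while an initialized attempt is accounted for exactly as before.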

> UnalignedCheckpointITCase.execute fails
> ---------------------------------------
>
>                 Key: FLINK-39145
>                 URL: https://issues.apache.org/jira/browse/FLINK-39145
>             Project: Flink
>          Issue Type: Bug
>          Components: Tests
>            Reporter: Efrat Levitan
>            Assignee: Efrat Levitan
>            Priority: Major
>              Labels: pull-request-available
>         Attachments: image-2026-02-26-14-59-36-837.png
>
>
> CI flakiness due to 
> {code}
> Feb 22 11:50:40       at 
> org.apache.pekko.actor.AbstractActor.aroundReceive(AbstractActor.scala:229)
> Feb 22 11:50:40       at 
> org.apache.pekko.actor.ActorCell.receiveMessage(ActorCell.scala:590)
> Feb 22 11:50:40       at 
> org.apache.pekko.actor.ActorCell.invoke(ActorCell.scala:557)
> Feb 22 11:50:40       at 
> org.apache.pekko.dispatch.Mailbox.processMailbox(Mailbox.scala:272)
> Feb 22 11:50:40       at 
> org.apache.pekko.dispatch.Mailbox.run(Mailbox.scala:233)
> Feb 22 11:50:40       at 
> org.apache.pekko.dispatch.Mailbox.exec(Mailbox.scala:245)
> Feb 22 11:50:40       at 
> java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:373)
> Feb 22 11:50:40       at 
> java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1182)
> Feb 22 11:50:40       at 
> java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1655)
> Feb 22 11:50:40       at 
> java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1622)
> Feb 22 11:50:40       at 
> java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:165)
> Feb 22 11:50:40 Caused by: java.util.concurrent.CompletionException: 
> java.util.concurrent.CompletionException: 
> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: 
> Could not acquire the minimum required resources.
> Feb 22 11:50:40       at 
> org.apache.flink.runtime.scheduler.DefaultExecutionDeployer.lambda$assignResource$4(DefaultExecutionDeployer.java:226)
> Feb 22 11:50:40       ... 37 more
> Feb 22 11:50:40 Caused by: java.util.concurrent.CompletionException: 
> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: 
> Could not acquire the minimum required resources.
> Feb 22 11:50:40       at 
> java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:332)
> Feb 22 11:50:40       at 
> java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:347)
> Feb 22 11:50:40       at 
> java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:636)
> Feb 22 11:50:40       ... 35 more
> Feb 22 11:50:40 Caused by: 
> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: 
> Could not acquire the minimum required resources.
> Feb 22 11:50:40 
> {code}
> [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=72565&view=logs&j=5c8e7682-d68f-54d1-16a2-a09310218a49&t=9d734c8c-6253-55e6-3bce-47e7cdf68ac4|http://example.com]
> Possibly related to https://issues.apache.org/jira/browse/FLINK-38403 (error 
> is always Could not acquire the minimum required resources but the TM failure 
> is different every time)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
