[ 
https://issues.apache.org/jira/browse/FLINK-32049?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17742285#comment-17742285
 ] 

Rui Fan edited comment on FLINK-32049 at 7/12/23 6:42 AM:
----------------------------------------------------------

Thanks for reporting this issue, and thanks to [~Weijie Guo] for the analysis. 
There is indeed a thread-safety bug here.

There are three cases that update isRegistering from true to false:
 # maxSubtasksPerChannelStateFile subtasks have been registered to the executor
 # A checkpoint is started
 # A subtask is released

After an offline discussion with [~Weijie Guo], we concluded that case 1 is 
thread-safe, while case 2 and case 3 are not.

h1. The cause of this bug

When a subtask is closing, it calls 
ChannelStateWriteRequestExecutorImpl#releaseSubtask. The [releaseSubtask 
method|https://github.com/apache/flink/blob/2dfff436c09821fb658bf8d289206b9ef85bb25b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestExecutorImpl.java#L390]
 acquires two locks at different times.

* First, it holds ChannelStateWriteRequestExecutorImpl#lock and updates 
isRegistering from true to false.
* Second, onRegistered.accept(this) holds 
ChannelStateWriteRequestExecutorFactory#lock and updates 
ChannelStateWriteRequestExecutorFactory#executor.

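However, the two locks are never held together, so there is a window between 
the two steps. The following interleaving triggers the bug:

* Subtask 1 has finished the first step of releaseSubtask but has not yet 
acquired the second lock.
* Subtask 2 is starting and calls executor#registerSubtask.

Because ChannelStateWriteRequestExecutorFactory#executor has not been updated 
yet, Subtask 2 registers to an executor whose isRegistering is already false, 
and it throws *_This executor has been registered_*.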
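
The interleaving above can be replayed on a single thread, which makes it deterministic. The following is only a minimal sketch under stated assumptions: the classes are simplified stand-ins, not the Flink classes, and the names closeRegistration, notifyFactory, forget and replay are hypothetical. Only the two separate locks, the isRegistering flag, the onRegistered callback and the exception message are taken from the description above.

```java
import java.util.function.Consumer;

// Simplified stand-ins for the executor and its factory (NOT the Flink classes).
class TwoLockRaceSketch {

    static class Executor {
        private final Object lock = new Object();          // executor-side lock
        private final Consumer<Executor> onRegistered;     // callback into the factory
        private boolean isRegistering = true;

        Executor(Consumer<Executor> onRegistered) {
            this.onRegistered = onRegistered;
        }

        void registerSubtask() {
            synchronized (lock) {
                if (!isRegistering) {
                    throw new IllegalStateException("This executor has been registered.");
                }
            }
        }

        // Hypothetical name: step 1 of the release, under the executor lock only.
        void closeRegistration() {
            synchronized (lock) {
                isRegistering = false;
            }
        }

        // Hypothetical name: step 2 of the release, which takes the factory lock.
        void notifyFactory() {
            onRegistered.accept(this);
        }
    }

    static class Factory {
        private final Object lock = new Object();          // factory-side lock
        private Executor executor;

        Executor getOrCreateExecutor() {
            synchronized (lock) {
                if (executor == null) {
                    executor = new Executor(this::forget);
                }
                executor.registerSubtask();
                return executor;
            }
        }

        // Hypothetical name: drops the shared reference once registration is closed.
        private void forget(Executor released) {
            synchronized (lock) {
                if (executor == released) {
                    executor = null;
                }
            }
        }
    }

    // Replays the two steps of a release; subtask 2 arrives either in the gap
    // between them (finishReleaseFirst == false) or after both have completed.
    static String replay(boolean finishReleaseFirst) {
        Factory factory = new Factory();
        Executor shared = factory.getOrCreateExecutor();   // subtask 1 registers
        shared.closeRegistration();                        // subtask 1: step 1
        if (finishReleaseFirst) {
            shared.notifyFactory();                        // subtask 1: step 2
        }
        try {
            factory.getOrCreateExecutor();                 // subtask 2 registers
            return "registered";
        } catch (IllegalStateException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(replay(false)); // subtask 2 arrives inside the gap
        System.out.println(replay(true));  // subtask 2 arrives after both steps
    }
}
```

With replay(false) the factory still hands out the executor whose flag was already flipped, so registration fails; with replay(true) the factory has dropped the reference and creates a fresh executor.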

This bug can be reproduced by a unit test. I have added a multi-threaded unit 
test that triggers it. With the 
[PR|https://github.com/apache/flink/pull/22983] applied, the test passes.
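
For illustration only, a multi-threaded test of this kind could look roughly like the sketch below. It is not the test from the PR, and it does not claim to show how the PR fixes the problem. Registry, Handle and countFailures are hypothetical names, and the single lock guarding both the flag and the shared reference is an assumption, chosen to show what the test is expected to observe once the flag and the reference can no longer get out of sync.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class RegisterReleaseStressSketch {

    // Hypothetical handle; its flag is only read or written under Registry.lock.
    static class Handle {
        boolean registering = true;
    }

    // Hypothetical registry: ONE lock guards both the flag and the shared reference.
    static class Registry {
        private final Object lock = new Object();
        private Handle current;

        Handle register() {
            synchronized (lock) {
                if (current == null) {
                    current = new Handle();
                }
                if (!current.registering) {
                    throw new IllegalStateException("This executor has been registered.");
                }
                return current;
            }
        }

        void release(Handle handle) {
            synchronized (lock) {
                // Both updates happen atomically, so no thread can observe a
                // shared handle whose registration is already closed.
                handle.registering = false;
                if (current == handle) {
                    current = null;
                }
            }
        }
    }

    // Runs register/release loops on several threads and counts failed registrations.
    static int countFailures(int threads, int rounds) {
        Registry registry = new Registry();
        AtomicInteger failures = new AtomicInteger();
        CountDownLatch start = new CountDownLatch(1);
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            pool.execute(() -> {
                try {
                    start.await();                         // start all threads together
                    for (int i = 0; i < rounds; i++) {
                        Handle handle = registry.register();
                        registry.release(handle);
                    }
                } catch (IllegalStateException e) {
                    failures.incrementAndGet();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        start.countDown();
        pool.shutdown();
        try {
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("stress run did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return failures.get();
    }

    public static void main(String[] args) {
        System.out.println("failures: " + countFailures(4, 1000));
    }
}
```

Splitting release into two separately locked steps, as in the current code, would allow such a test to observe the exception intermittently.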

h1. Question

Case 2 and case 3 are both thread-unsafe and are bugs; however, I did not see 
either of them happen in the CI log. I have uploaded the log containing this 
exception: `java.lang.IllegalStateException: This executor has been registered.` 
The detailed log is in mvn-3.log.
 



> CoordinatedSourceRescaleITCase.testDownscaling fails on AZP
> -----------------------------------------------------------
>
>                 Key: FLINK-32049
>                 URL: https://issues.apache.org/jira/browse/FLINK-32049
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Common
>    Affects Versions: 1.18.0, 1.17.1
>            Reporter: Sergey Nuyanzin
>            Assignee: Qingsheng Ren
>            Priority: Critical
>              Labels: pull-request-available, test-stability
>         Attachments: logs-cron_azure-test_cron_azure_connect_2-1686196685.zip
>
>
> CoordinatedSourceRescaleITCase.testDownscaling fails with
> {noformat}
> May 08 03:19:14 [ERROR] Failures: 
> May 08 03:19:14 [ERROR]   
> CoordinatedSourceRescaleITCase.testDownscaling:75->resumeCheckpoint:107 
> May 08 03:19:14 Multiple Failures (1 failure)
> May 08 03:19:14 -- failure 1 --
> May 08 03:19:14 [Any cause contains message 'successfully restored 
> checkpoint'] 
> May 08 03:19:14 Expecting any element of:
> May 08 03:19:14   [org.apache.flink.runtime.client.JobExecutionException: Job 
> execution failed.
> May 08 03:19:14       at 
> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
> May 08 03:19:14       at 
> org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:141)
> May 08 03:19:14       at 
> java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
> May 08 03:19:14       ...(45 remaining lines not displayed - this can be 
> changed with Assertions.setMaxStackTraceElementsDisplayed),
> May 08 03:19:14     org.apache.flink.runtime.JobException: Recovery is 
> suppressed by NoRestartBackoffTimeStrategy
> May 08 03:19:14       at 
> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:139)
> May 08 03:19:14       at 
> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:83)
> May 08 03:19:14       at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.recordTaskFailure(DefaultScheduler.java:258)
> May 08 03:19:14       ...(35 remaining lines not displayed - this can be 
> changed with Assertions.setMaxStackTraceElementsDisplayed),
> May 08 03:19:14     java.lang.IllegalStateException: This executor has been 
> registered.
> May 08 03:19:14       at 
> org.apache.flink.util.Preconditions.checkState(Preconditions.java:193)
> May 08 03:19:14       at 
> org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl.registerSubtask(ChannelStateWriteRequestExecutorImpl.java:341)
> May 08 03:19:14       at 
> org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorFactory.getOrCreateExecutor(ChannelStateWriteRequestExecutorFactory.java:63)
> May 08 03:19:14       ...(17 remaining lines not displayed - this can be 
> changed with Assertions.setMaxStackTraceElementsDisplayed)]
> May 08 03:19:14 to satisfy the given assertions requirements but none did:
> May 08 03:19:14 
> May 08 03:19:14 org.apache.flink.runtime.client.JobExecutionException: Job 
> execution failed.
> May 08 03:19:14       at 
> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
> May 08 03:19:14       at 
> org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:141)
> May 08 03:19:14       at 
> java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
> May 08 03:19:14       ...(45 remaining lines not displayed - this can be 
> changed with Assertions.setMaxStackTraceElementsDisplayed)
> May 08 03:19:14 error: 
> May 08 03:19:14 Expecting throwable message:
> May 08 03:19:14   "Job execution failed."
> May 08 03:19:14 to contain:
> May 08 03:19:14   "successfully restored checkpoint"
> May 08 03:19:14 but did not.
> May 08 03:19:14 
> {noformat}
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=48772&view=logs&j=fc7981dc-d266-55b0-5fff-f0d0a2294e36&t=1a9b228a-3e0e-598f-fc81-c321539dfdbf&l=7191



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
