[ https://issues.apache.org/jira/browse/FLINK-32049?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17742285#comment-17742285 ]
Rui Fan edited comment on FLINK-32049 at 7/12/23 6:42 AM:
----------------------------------------------------------

Thanks for reporting this JIRA, and thanks to [~Weijie Guo] for the analysis; there is indeed a thread-safety bug here.

There are three cases that update isRegistering from true to false:
# maxSubtasksPerChannelStateFile subtasks have been registered to the executor
# A checkpoint has started
# A subtask is released

After an offline discussion with [~Weijie Guo], we concluded that case 1 is thread-safe, while cases 2 and 3 are not.

h1. Root cause of this bug

When a subtask is closing, it calls ChannelStateWriteRequestExecutorImpl#releaseSubtask. The [releaseSubtask method|https://github.com/apache/flink/blob/2dfff436c09821fb658bf8d289206b9ef85bb25b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestExecutorImpl.java#L390] acquires two locks at different times:
* First, it holds ChannelStateWriteRequestExecutorImpl#lock and updates isRegistering from true to false.
* Second, onRegistered.accept(this) holds ChannelStateWriteRequestExecutorFactory#lock and updates ChannelStateWriteRequestExecutorFactory#executor.

Because these two locks are not held together, the following interleaving triggers the bug:
* Subtask 1 has finished the first step of releaseSubtask but has not yet acquired the second lock.
* Subtask 2 is starting and calls executor#registerSubtask.

Subtask 2 then throws *_This executor has been registered_*, because ChannelStateWriteRequestExecutorFactory#executor still points to the old executor, whose isRegistering is already false.

This bug can be reproduced by a unit test: I have added a multi-threaded unit test that triggers it, and with the [PR|https://github.com/apache/flink/pull/22983] applied, the test passes.

h1. Question

Cases 2 and 3 are thread-unsafe and therefore bugs; however, I did not see them happen in the CI log. I uploaded the log that contains this exception: `java.lang.IllegalStateException: This executor has been registered.` The detailed log is in mvn-3.log.
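The interleaving described in the root-cause section can be replayed deterministically with a minimal, single-threaded Java sketch. The classes Executor and Factory, the methods releaseStep1, releaseStep2 and tryRegisterSubtask, and the fixed flag are all hypothetical simplifications, not Flink's actual API; the real executor also counts registered subtasks, which is omitted here. The fixed branch shows one possible mitigation (replacing a stale executor under the factory lock instead of throwing) and is not necessarily what the PR does.

```java
/** Hypothetical, simplified model of the executor/factory pair; not Flink's real classes. */
public class RegisterRaceSketch {

    /** Hypothetical stand-in for ChannelStateWriteRequestExecutorImpl. */
    static final class Executor {
        private final Object lock = new Object();
        private final java.util.function.Consumer<Executor> onRegistered;
        private boolean isRegistering = true;

        Executor(java.util.function.Consumer<Executor> onRegistered) {
            this.onRegistered = onRegistered;
        }

        /** Original contract: throws if the flag was already flipped to false. */
        void registerSubtask() {
            synchronized (lock) {
                if (!isRegistering) {
                    throw new IllegalStateException("This executor has been registered.");
                }
            }
        }

        /** Sketched alternative: reports whether registration is still open instead of throwing. */
        boolean tryRegisterSubtask() {
            synchronized (lock) {
                return isRegistering;
            }
        }

        /** Release step 1: flips the flag while holding only the executor's lock. */
        void releaseStep1() {
            synchronized (lock) {
                isRegistering = false;
            }
        }

        /** Release step 2: notifies the factory, which takes its own lock. */
        void releaseStep2() {
            onRegistered.accept(this);
        }
    }

    /** Hypothetical stand-in for ChannelStateWriteRequestExecutorFactory. */
    static final class Factory {
        private final Object lock = new Object();
        private Executor executor;

        Executor getOrCreateExecutor(boolean fixed) {
            synchronized (lock) {
                if (executor == null) {
                    executor = new Executor(this::onRegistered);
                }
                if (!fixed) {
                    executor.registerSubtask(); // throws inside the window between step 1 and step 2
                } else if (!executor.tryRegisterSubtask()) {
                    // The cached executor has stopped registering: replace it under the factory lock.
                    executor = new Executor(this::onRegistered);
                }
                return executor;
            }
        }

        private void onRegistered(Executor e) {
            synchronized (lock) {
                if (executor == e) {
                    executor = null; // the next caller creates a fresh executor
                }
            }
        }
    }

    /** Replays the racy interleaving sequentially; returns the exception message, or "ok". */
    public static String replay(boolean fixed) {
        Factory factory = new Factory();
        Executor first = factory.getOrCreateExecutor(fixed); // subtask 1 registers
        first.releaseStep1();                                 // subtask 1 finishes step 1 only
        try {
            factory.getOrCreateExecutor(fixed);               // subtask 2 starts before step 2
            return "ok";
        } catch (IllegalStateException e) {
            return e.getMessage();
        } finally {
            first.releaseStep2();                             // subtask 1 finally runs step 2
        }
    }

    public static void main(String[] args) {
        System.out.println(replay(false)); // This executor has been registered.
        System.out.println(replay(true));  // ok
    }
}
```

Replaying the steps in a fixed order makes the window reproducible without depending on thread scheduling, whereas the multi-threaded unit test mentioned in the comment exercises the same window with real threads.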
> CoordinatedSourceRescaleITCase.testDownscaling fails on AZP
> -----------------------------------------------------------
>
> Key: FLINK-32049
> URL: https://issues.apache.org/jira/browse/FLINK-32049
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Common
> Affects Versions: 1.18.0, 1.17.1
> Reporter: Sergey Nuyanzin
> Assignee: Qingsheng Ren
> Priority: Critical
> Labels: pull-request-available, test-stability
> Attachments: logs-cron_azure-test_cron_azure_connect_2-1686196685.zip
>
>
> CoordinatedSourceRescaleITCase.testDownscaling fails with
> {noformat}
> May 08 03:19:14 [ERROR] Failures:
> May 08 03:19:14 [ERROR] CoordinatedSourceRescaleITCase.testDownscaling:75->resumeCheckpoint:107
> May 08 03:19:14 Multiple Failures (1 failure)
> May 08 03:19:14 -- failure 1 --
> May 08 03:19:14 [Any cause contains message 'successfully restored checkpoint']
> May 08 03:19:14 Expecting any element of:
> May 08 03:19:14 [org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
> May 08 03:19:14 at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
> May 08 03:19:14 at org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:141)
> May 08 03:19:14 at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
> May 08 03:19:14 ...(45 remaining lines not displayed - this can be changed with Assertions.setMaxStackTraceElementsDisplayed),
> May 08 03:19:14 org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
> May 08 03:19:14 at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:139)
> May 08 03:19:14 at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:83)
> May 08 03:19:14 at org.apache.flink.runtime.scheduler.DefaultScheduler.recordTaskFailure(DefaultScheduler.java:258)
> May 08 03:19:14 ...(35 remaining lines not displayed - this can be changed with Assertions.setMaxStackTraceElementsDisplayed),
> May 08 03:19:14 java.lang.IllegalStateException: This executor has been registered.
> May 08 03:19:14 at org.apache.flink.util.Preconditions.checkState(Preconditions.java:193)
> May 08 03:19:14 at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl.registerSubtask(ChannelStateWriteRequestExecutorImpl.java:341)
> May 08 03:19:14 at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorFactory.getOrCreateExecutor(ChannelStateWriteRequestExecutorFactory.java:63)
> May 08 03:19:14 ...(17 remaining lines not displayed - this can be changed with Assertions.setMaxStackTraceElementsDisplayed)]
> May 08 03:19:14 to satisfy the given assertions requirements but none did:
> May 08 03:19:14
> May 08 03:19:14 org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
> May 08 03:19:14 at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
> May 08 03:19:14 at org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:141)
> May 08 03:19:14 at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
> May 08 03:19:14 ...(45 remaining lines not displayed - this can be changed with Assertions.setMaxStackTraceElementsDisplayed)
> May 08 03:19:14 error:
> May 08 03:19:14 Expecting throwable message:
> May 08 03:19:14 "Job execution failed."
> May 08 03:19:14 to contain:
> May 08 03:19:14 "successfully restored checkpoint"
> May 08 03:19:14 but did not.
> May 08 03:19:14
> {noformat}
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=48772&view=logs&j=fc7981dc-d266-55b0-5fff-f0d0a2294e36&t=1a9b228a-3e0e-598f-fc81-c321539dfdbf&l=7191