[ 
https://issues.apache.org/jira/browse/SPARK-48758?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated SPARK-48758:
-----------------------------------
    Labels: pull-request-available  (was: )

> Race condition between executor registration and heartbeat
> ----------------------------------------------------------
>
>                 Key: SPARK-48758
>                 URL: https://issues.apache.org/jira/browse/SPARK-48758
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 3.4.3
>            Reporter: MJ Deng
>            Priority: Major
>              Labels: pull-request-available
>
> We found a race condition in our production jobs: an executor finishes
> registration, but when it starts sending heartbeats, the driver reports that
> the executor is still unknown.
> The test case below demonstrates the issue:
> test("test the executor registered states are consistent between scheduler and heartbeater") {
>     val rpcEnv = sc.env.rpcEnv
>     val fakeClusterManager = new FakeClusterManager(rpcEnv)
>     val fakeClusterManagerRef = rpcEnv.setupEndpoint("fake-cm", fakeClusterManager)
>     val listenerBus = new LiveListenerBus(sc.conf)
>     val mockMetricsSystem = mock(classOf[MetricsSystem])
>     listenerBus.start(sc, mockMetricsSystem)
>     when(sc.listenerBus).thenReturn(listenerBus)
>     when(sc.heartbeatReceiver).thenReturn(heartbeatReceiverRef)
>     val fakeSchedulerBackend = new FakeSchedulerBackend(scheduler, rpcEnv, fakeClusterManagerRef)
>     when(sc.schedulerBackend).thenReturn(fakeSchedulerBackend)
>  
>     heartbeatReceiverRef.askSync[Boolean](TaskSchedulerIsSet)
>  
>     // simulate a slow event listener in the listener queue ahead of the
>     // heartbeatReceiver listener
>     sc.listenerBus.addToManagementQueue(new SparkListener() {
>       override def onExecutorAdded(executorAdded: SparkListenerExecutorAdded): Unit = {
>         if (executorAdded.executorId != "driver") {
>           Thread.sleep(2000)
>         }
>       }
>     })
>     // register the heartbeatReceiver listener to make sure it's after the
>     // slow listener above
>     sc.listenerBus.addToManagementQueue(heartbeatReceiver)
>  
>     fakeSchedulerBackend.start()
>     val dummyExecutorEndpoint1 = new FakeExecutorEndpoint(rpcEnv)
>     val dummyExecutorEndpointRef1 = rpcEnv.setupEndpoint("fake-executor-1", dummyExecutorEndpoint1)
>  
>     // register the executor
>     fakeSchedulerBackend.driverEndpoint.askSync[Boolean](
>       RegisterExecutor(executorId1, dummyExecutorEndpointRef1, "1.2.3.4", 0, Map.empty))
>  
>     // simulate the executor starting to heartbeat
>     val response = heartbeatReceiverRef.askSync[HeartbeatResponse](
>         Heartbeat(executorId1,
>           Array.empty[(Long, Seq[AccumulatorV2[_, _]])],
>           BlockManagerId(executorId1, "1.2.3.4", 8008),
>           new ExecutorMetrics(Array.empty[Long])))
>  
>     assert(!response.reregisterBlockManager)
>   }
>  
> Will propose a PR for the fix soon.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to