[
https://issues.apache.org/jira/browse/SPARK-48758?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
ASF GitHub Bot updated SPARK-48758:
-----------------------------------
Labels: pull-request-available (was: )
> Race condition between executor registration and heartbeat
> ----------------------------------------------------------
>
> Key: SPARK-48758
> URL: https://issues.apache.org/jira/browse/SPARK-48758
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Affects Versions: 3.4.3
> Reporter: MJ Deng
> Priority: Major
> Labels: pull-request-available
>
> We found a race condition in our production jobs: an executor finishes
> registration, but when it starts to heartbeat, the driver reports that the
> executor is still unknown.
> The test case below demonstrates the issue:
> test("test the executor registered states are consistent between scheduler and heartbeater") {
>   val rpcEnv = sc.env.rpcEnv
>   val fakeClusterManager = new FakeClusterManager(rpcEnv)
>   val fakeClusterManagerRef = rpcEnv.setupEndpoint("fake-cm", fakeClusterManager)
>   val listenerBus = new LiveListenerBus(sc.conf)
>   val mockMetricsSystem = mock(classOf[MetricsSystem])
>   listenerBus.start(sc, mockMetricsSystem)
>   when(sc.listenerBus).thenReturn(listenerBus)
>   when(sc.heartbeatReceiver).thenReturn(heartbeatReceiverRef)
>   val fakeSchedulerBackend = new FakeSchedulerBackend(scheduler, rpcEnv, fakeClusterManagerRef)
>   when(sc.schedulerBackend).thenReturn(fakeSchedulerBackend)
>
>   heartbeatReceiverRef.askSync[Boolean](TaskSchedulerIsSet)
>
>   // simulate the slow event listener in the listener queue before the
>   // heartbeatReceiver listener
>   sc.listenerBus.addToManagementQueue(new SparkListener() {
>     override def onExecutorAdded(executorAdded: SparkListenerExecutorAdded): Unit = {
>       if (executorAdded.executorId != "driver") {
>         Thread.sleep(2000)
>       }
>     }
>   })
>   // register the heartbeatReceiver listener to make sure it's after the
>   // slow listener above
>   sc.listenerBus.addToManagementQueue(heartbeatReceiver)
>
>   fakeSchedulerBackend.start()
>   val dummyExecutorEndpoint1 = new FakeExecutorEndpoint(rpcEnv)
>   val dummyExecutorEndpointRef1 = rpcEnv.setupEndpoint("fake-executor-1", dummyExecutorEndpoint1)
>
>   // register the executor
>   fakeSchedulerBackend.driverEndpoint.askSync[Boolean](
>     RegisterExecutor(executorId1, dummyExecutorEndpointRef1, "1.2.3.4", 0, Map.empty))
>
>   // simulate the executor starting to heartbeat
>   val response = heartbeatReceiverRef.askSync[HeartbeatResponse](
>     Heartbeat(executorId1,
>       Array.empty[(Long, Seq[AccumulatorV2[_, _]])],
>       BlockManagerId(executorId1, "1.2.3.4", 8008),
>       new ExecutorMetrics(Array.empty[Long])))
>
>   assert(!response.reregisterBlockManager)
> }
>
> I will propose a PR with the fix soon.
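
For readers without a Spark checkout, the ordering problem that the quoted test exercises can be sketched in isolation. This is a minimal simulation, not Spark code: `RaceSim`, `register`, `heartbeatAsksReregister`, and the latch-based "slow listener" are hypothetical stand-ins. It assumes, as the test suggests, that the scheduler records the executor synchronously during registration while the heartbeat receiver learns about it only after an asynchronous listener event is processed.

```scala
import java.util.concurrent.{ConcurrentHashMap, CountDownLatch, Executors, TimeUnit}

// Hypothetical stand-in for the driver-side state; none of these names are Spark APIs.
class RaceSim {
  // Executors known to the "scheduler": updated synchronously on registration.
  private val schedulerExecutors = ConcurrentHashMap.newKeySet[String]()
  // Executors known to the "heartbeat receiver": updated by an async listener.
  private val heartbeatExecutors = ConcurrentHashMap.newKeySet[String]()
  // Single-threaded queue standing in for the listener bus queue.
  private val listenerQueue = Executors.newSingleThreadExecutor()
  // Gate that models a slow listener sitting ahead of the heartbeat listener.
  private val slowListenerGate = new CountDownLatch(1)

  def register(executorId: String): Unit = {
    schedulerExecutors.add(executorId) // registration "succeeds" immediately
    listenerQueue.execute(new Runnable {
      override def run(): Unit = {
        slowListenerGate.await()           // blocked behind the slow listener
        heartbeatExecutors.add(executorId) // the receiver learns of the executor late
      }
    })
  }

  def schedulerKnows(executorId: String): Boolean =
    schedulerExecutors.contains(executorId)

  // True when the receiver does not know the executor yet, which is the
  // situation the quoted test asserts must not happen after registration.
  def heartbeatAsksReregister(executorId: String): Boolean =
    !heartbeatExecutors.contains(executorId)

  // Let the slow listener finish and wait for the queue to drain.
  def releaseListenerAndDrain(): Unit = {
    slowListenerGate.countDown()
    listenerQueue.shutdown()
    listenerQueue.awaitTermination(5, TimeUnit.SECONDS)
  }
}

object RaceSimDemo {
  def main(args: Array[String]): Unit = {
    val sim = new RaceSim()
    sim.register("exec-1")
    // The scheduler already knows the executor, but the receiver does not.
    println(s"scheduler knows: ${sim.schedulerKnows("exec-1")}")
    println(s"early heartbeat asks to re-register: ${sim.heartbeatAsksReregister("exec-1")}")
    sim.releaseListenerAndDrain()
    println(s"late heartbeat asks to re-register: ${sim.heartbeatAsksReregister("exec-1")}")
  }
}
```

The latch makes the window deterministic here; in the quoted test the same window is opened by the `Thread.sleep(2000)` listener queued ahead of the heartbeat receiver.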