This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 109b1e4a79d [SPARK-46346][CORE] Fix Master to update a worker from `UNKNOWN` to `ALIVE` on `RegisterWorker` msg
109b1e4a79d is described below
commit 109b1e4a79d9a5ec33944887a34c92d453016902
Author: Dongjoon Hyun <[email protected]>
AuthorDate: Sun Dec 10 11:19:46 2023 -0800
[SPARK-46346][CORE] Fix Master to update a worker from `UNKNOWN` to `ALIVE` on `RegisterWorker` msg
### What changes were proposed in this pull request?
This PR aims to fix `Spark Master`'s recovery process so that it updates a worker's
status from `UNKNOWN` to `ALIVE` when it receives a `RegisterWorker` message
from that worker.
### Why are the changes needed?
This happens only during recovery.
- `Master` already has the recovered worker information in memory with
`UNKNOWN` status.
- `Worker` sends a `RegisterWorker` message correctly.
- `Master` keeps the worker's status as `UNKNOWN` and replies to the worker with
a `RegisteredWorker` message that has the `duplicated` flag set.
- Since `Worker` receives a reply like the following, it does not try to reconnect.
```
23/12/09 23:49:57 INFO Worker: Retrying connection to master (attempt # 3)
23/12/09 23:49:57 INFO Worker: Connecting to master ...:7077...
23/12/09 23:50:04 INFO TransportClientFactory: Successfully created connection to master...:7077 after 7089 ms (0 ms spent in bootstraps)
23/12/09 23:50:04 WARN Worker: Duplicate registration at master spark://...
23/12/09 23:50:04 INFO Worker: Successfully registered with master spark://...
```
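To make the worker's side of this sequence concrete, here is a minimal Scala sketch. It assumes a simplified model in which a `RegisteredWorker` reply carrying the duplicate flag only triggers a warning, matching the log above; `RegisteredWorkerReply`, `WorkerRegistrationSketch`, and `shouldRetry` are hypothetical names, not Spark classes or methods.

```scala
// Hypothetical, simplified model of a worker handling the master's registration reply.
// These names are illustrative only; they are not Spark's actual classes.
final case class RegisteredWorkerReply(masterUrl: String, duplicate: Boolean)

final class WorkerRegistrationSketch {
  // Once true, the worker stops sending RegisterWorker retries.
  private var registered: Boolean = false
  // Log lines the worker would emit, kept in memory for inspection.
  private var log: Vector[String] = Vector.empty

  def messages: Vector[String] = log

  // Checked on each retry tick: another RegisterWorker is sent only while unregistered.
  def shouldRetry: Boolean = !registered

  def onReply(reply: RegisteredWorkerReply): Unit = {
    if (reply.duplicate) {
      // The duplicate flag only produces a warning...
      log :+= s"WARN Duplicate registration at master ${reply.masterUrl}"
    }
    // ...and the worker still considers itself registered.
    log :+= s"INFO Successfully registered with master ${reply.masterUrl}"
    registered = true
  }
}

object WorkerRegistrationSketchDemo {
  def main(args: Array[String]): Unit = {
    val worker = new WorkerRegistrationSketch
    worker.onReply(RegisteredWorkerReply("spark://master:7077", duplicate = true))
    worker.messages.foreach(println)
    println(s"retry again? ${worker.shouldRetry}") // false: the worker stops reconnecting
  }
}
```

Because the reply counts as a successful registration either way, the worker never sends another `RegisterWorker`, so no later message can correct the master's `UNKNOWN` record for it.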
The `UNKNOWN`-status workers block the recovery process and cause a long
delay.
https://github.com/apache/spark/blob/bac3492980a3e793065a9e9d511ddf0fb66357b3/core/src/main/scala/org/apache/spark/deploy/master/Master.scala#L604-L606
After the delay, the master simply removes them all.
https://github.com/apache/spark/blob/bac3492980a3e793065a9e9d511ddf0fb66357b3/core/src/main/scala/org/apache/spark/deploy/master/Master.scala#L647-L649
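The delay and the final removal can be illustrated with a small Scala model. It assumes, as the linked lines suggest, that recovery can finish early only when no recovered worker is still `UNKNOWN`, and that finishing recovery drops the workers that remain `UNKNOWN`. `RecoveringMasterSketch` and its methods are hypothetical; `confirm` stands in for whatever message normally moves a recovered worker to `ALIVE`.

```scala
import scala.collection.mutable

// Hypothetical, simplified model of the master's recovery bookkeeping.
// The names are illustrative only; this is not Spark's Master implementation.
sealed trait RecoveredState
case object RecoveredUnknown extends RecoveredState
case object RecoveredAlive extends RecoveredState

final class RecoveringMasterSketch(recoveredIds: Seq[String]) {
  // Every worker restored from persisted state starts out as UNKNOWN.
  val workers: mutable.LinkedHashMap[String, RecoveredState] = mutable.LinkedHashMap.empty
  recoveredIds.foreach(id => workers(id) = RecoveredUnknown)

  // Assumption: recovery may finish early only when no worker is still UNKNOWN;
  // otherwise the master has to wait for its recovery timeout.
  def canCompleteRecovery: Boolean = !workers.values.exists(_ == RecoveredUnknown)

  // Stand-in for the reply that normally moves a recovered worker to ALIVE.
  def confirm(id: String): Unit =
    if (workers.contains(id)) workers(id) = RecoveredAlive

  // Finishing recovery removes every worker still UNKNOWN and returns their ids.
  def completeRecovery(): Seq[String] = {
    val stale = workers.collect { case (id, RecoveredUnknown) => id }.toList
    stale.foreach(id => workers.remove(id))
    stale
  }
}

object RecoverySketchDemo {
  def main(args: Array[String]): Unit = {
    val master = new RecoveringMasterSketch(Seq("worker-a", "worker-b"))
    master.confirm("worker-a")          // worker-a answered; worker-b never did
    println(master.canCompleteRecovery) // false: the master must wait out the timeout
    println(master.completeRecovery())  // after the wait, worker-b is removed
  }
}
```

In this model a worker that has re-registered but is still marked `UNKNOWN` is indistinguishable from one that never came back, which is why both end up removed.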
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
This case is difficult to cover with a unit test, so it was tested manually.
- Master
```
23/12/10 04:58:30 WARN OneWayOutboxMessage: Failed to send one-way RPC.
java.io.IOException: Connecting to /***:1024 timed out (10000 ms)
	at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:291)
	at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:214)
	at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:226)
	at org.apache.spark.rpc.netty.NettyRpcEnv.createClient(NettyRpcEnv.scala:204)
	at org.apache.spark.rpc.netty.Outbox$$anon$1.call(Outbox.scala:202)
	at org.apache.spark.rpc.netty.Outbox$$anon$1.call(Outbox.scala:198)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:840)
23/12/10 04:58:54 INFO Master: Registering worker ***:1024 with 2 cores, 23.0 GiB RAM
23/12/10 04:58:54 INFO Master: Worker has been re-registered: worker-20231210045613-***-1024
```
- Worker
```
23/12/10 04:58:45 INFO Worker: Retrying connection to master (attempt # 5)
23/12/10 04:58:45 INFO Worker: Connecting to master master:7077...
23/12/10 04:58:54 INFO TransportClientFactory: Successfully created connection to master/...:7077 after 63957 ms (0 ms spent in bootstraps)
23/12/10 04:58:54 WARN Worker: Duplicate registration at master spark://master:7077
23/12/10 04:58:54 INFO Worker: Successfully registered with master spark://master:7077
23/12/10 04:58:54 INFO Worker: WorkerWebUI is available at https://...-1***-1024
23/12/10 04:58:54 INFO Worker: Worker cleanup enabled; old application directories will be deleted in: /data/spark
```
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #44280 from dongjoon-hyun/SPARK-46346.
Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
core/src/main/scala/org/apache/spark/deploy/master/Master.scala | 4 ++++
1 file changed, 4 insertions(+)
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 7346c80aff4..a550f44fc0a 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -286,6 +286,10 @@ private[deploy] class Master(
if (state == RecoveryState.STANDBY) {
workerRef.send(MasterInStandby)
} else if (idToWorker.contains(id)) {
+ if (idToWorker(id).state == WorkerState.UNKNOWN) {
+ logInfo("Worker has been re-registered: " + id)
+ idToWorker(id).state = WorkerState.ALIVE
+ }
workerRef.send(RegisteredWorker(self, masterWebUiUrl, masterAddress, true))
} else {
val workerResources =
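The patched branch can be exercised in isolation with the following Scala sketch. It is a simplified, hypothetical model of the `RegisterWorker` handling shown in the diff: `MasterSketch`, `WorkerInfoSketch`, `WorkerStateSketch`, and `handleRegisterWorker` are illustrative names, the reply is reduced to the duplicate flag it would carry, and the logging and resource handling of the real `Master` are omitted.

```scala
import scala.collection.mutable

// Hypothetical, simplified model of the RegisterWorker branch touched by this patch.
// These names are illustrative only; they are not Spark APIs.
object WorkerStateSketch extends Enumeration {
  val ALIVE, UNKNOWN = Value
}

final class WorkerInfoSketch(val id: String, var state: WorkerStateSketch.Value)

final class MasterSketch {
  val idToWorker: mutable.HashMap[String, WorkerInfoSketch] = mutable.HashMap.empty

  /** Returns the duplicate flag the master would send back in its reply. */
  def handleRegisterWorker(id: String): Boolean = {
    if (idToWorker.contains(id)) {
      // The fix: a recovered UNKNOWN worker that registers again is evidently alive.
      if (idToWorker(id).state == WorkerStateSketch.UNKNOWN) {
        idToWorker(id).state = WorkerStateSketch.ALIVE
      }
      true // already known: reply with the duplicate flag set
    } else {
      // New worker: record it as ALIVE and reply without the duplicate flag.
      idToWorker(id) = new WorkerInfoSketch(id, WorkerStateSketch.ALIVE)
      false
    }
  }
}

object MasterSketchDemo {
  def main(args: Array[String]): Unit = {
    val master = new MasterSketch
    // Simulate a worker restored from persisted state during recovery.
    master.idToWorker("worker-1") = new WorkerInfoSketch("worker-1", WorkerStateSketch.UNKNOWN)
    val duplicate = master.handleRegisterWorker("worker-1")
    println(s"duplicate=$duplicate state=${master.idToWorker("worker-1").state}")
  }
}
```

Returning the flag instead of sending a message keeps the sketch free of any RPC environment while preserving the one behavior the patch changes: the worker is still told it is a duplicate, but its state no longer stays `UNKNOWN`.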