This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 826ee60 [SPARK-23191][CORE] Warn rather than terminate when duplicate worker register happens
826ee60 is described below
commit 826ee6074e15e106f8e157a9697052da75428ac7
Author: wuyi <[email protected]>
AuthorDate: Tue May 28 11:59:29 2019 +0800
[SPARK-23191][CORE] Warn rather than terminate when duplicate worker register happens
## What changes were proposed in this pull request?
### Standalone HA Background
In Spark Standalone HA mode, multiple masters run at the same time, but only
one of them is the leader, which actively serves scheduling requests. Once this
leader crashes, the remaining masters compete for leadership, and exactly one
of them is guaranteed to be elected the new leader, which reconstructs the
state from the original leader and continues to serve scheduling requests.
### Related Issues
#2828 first introduced the bug of *duplicate Worker registration*, and
#3447 fixed it. But there are still corner cases (see SPARK-23191 for details)
that #3447 does not cover:
* CASE 1
(1) Initially, the Worker registered with Master A.
(2) After a while, the connection channel between Master A and the Worker
becomes inactive (e.g. due to a network drop), and the Worker is notified about
that via `onDisconnected` from NettyRpcEnv.
(3) In `onDisconnected`, the Worker attempts to reconnect to all
masters (including Master A).
(4) Meanwhile, the network between the Worker and Master A recovers, and the
Worker successfully registers with Master A again.
(5) Master A responds with `RegisterWorkerFailed("Duplicate worker ID")`.
(6) The Worker receives that message and exits.
* CASE 2
(1) Master A loses leadership (sends `RevokedLeadership` to itself). Master B
takes over, recovers everything from Master A (which registers the workers for
the first time in Master B), and sends `MasterChanged` to the Worker.
(2) Before Master A receives `RevokedLeadership`, it receives a late
`HeartBeat` from the Worker (which had previously been removed in Master A due
to a heartbeat timeout), so it sends `ReconnectWorker` to the Worker.
(3) The Worker receives `MasterChanged` before `ReconnectWorker`, changing its
masterRef to Master B.
(4) Subsequently, the Worker receives `ReconnectWorker` from Master A and
reconnects to all masters.
(5) Master B receives the register request again from the Worker and responds
with `RegisterWorkerFailed("Duplicate worker ID")`.
(6) The Worker receives that message and exits.
In CASE 1, it is difficult for the Worker to know Master A's state.
Normally, the Worker thinks Master A has already died, so it does not expect
Master A to respond to its reconnect request.
In CASE 2, we can see a race condition between `RevokedLeadership` and
`HeartBeat`. Master A has actually already lost leadership while it is
processing the `HeartBeat` message. That means the state between a Master and
ZooKeeper can be out of sync for a while.
### Solutions
In this PR, instead of exiting the Worker process when *duplicate Worker
registration* happens, we log a warning about it. This is safe because the
Master performs a no-op when it receives a duplicate registration from a
Worker, so the Worker can continue working with that Master normally, without
any side effect.
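The change boils down to making duplicate registration idempotent on the Master side. A minimal, self-contained sketch of that decision, using toy types rather than Spark's actual `RpcEndpointRef`-based messages (`ToyMaster` and its fields are hypothetical names for illustration only):

```scala
// Toy model of the new registration flow; the real code lives in
// Master.scala / Worker.scala in the diff below.
sealed trait RegisterWorkerResponse
case class RegisteredWorker(masterUrl: String, duplicate: Boolean)
  extends RegisterWorkerResponse

class ToyMaster(masterUrl: String) {
  private val idToWorker = scala.collection.mutable.Map.empty[String, String]

  def handleRegister(id: String, host: String): RegisterWorkerResponse = {
    if (idToWorker.contains(id)) {
      // Before SPARK-23191 the Master replied
      // RegisterWorkerFailed("Duplicate worker ID"), which made the Worker
      // exit. Now it performs a no-op and acknowledges the registration
      // flagged as a duplicate, so the Worker merely logs a warning.
      RegisteredWorker(masterUrl, duplicate = true)
    } else {
      idToWorker(id) = host
      RegisteredWorker(masterUrl, duplicate = false)
    }
  }
}
```

Since registering twice leaves the Master's state unchanged, the Worker can safely stay alive after receiving the duplicate acknowledgement.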
## How was this patch tested?
Tested manually.
I followed the steps Neeraj Gupta suggested in JIRA SPARK-23191 to reproduce
CASE 1.
Before this PR, the Worker would show as DEAD in the UI.
After this PR, the Worker just warns about the duplicate registration (see the
second-to-last line in the log snippet below) and remains ALIVE in the UI.
```
19/05/09 20:58:32 ERROR Worker: Connection to master failed! Waiting for master to reconnect...
19/05/09 20:58:32 INFO Worker: wuyi.local:7077 Disassociated !
19/05/09 20:58:32 INFO Worker: Connecting to master wuyi.local:7077...
19/05/09 20:58:32 ERROR Worker: Connection to master failed! Waiting for master to reconnect...
19/05/09 20:58:32 INFO Worker: Not spawning another attempt to register with the master, since there is an attempt scheduled already.
19/05/09 20:58:37 WARN TransportClientFactory: DNS resolution for wuyi.local/127.0.0.1:7077 took 5005 ms
19/05/09 20:58:37 INFO TransportClientFactory: Found inactive connection to wuyi.local/127.0.0.1:7077, creating a new one.
19/05/09 20:58:37 INFO TransportClientFactory: Successfully created connection to wuyi.local/127.0.0.1:7077 after 3 ms (0 ms spent in bootstraps)
19/05/09 20:58:37 WARN Worker: Duplicate registration at master spark://wuyi.local:7077
19/05/09 20:58:37 INFO Worker: Successfully registered with master spark://wuyi.local:7077
```
Closes #24569 from Ngone51/fix-worker-dup-register-error.
Authored-by: wuyi <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
---
.../scala/org/apache/spark/deploy/DeployMessage.scala | 4 +++-
.../scala/org/apache/spark/deploy/master/Master.scala | 4 ++--
.../scala/org/apache/spark/deploy/worker/Worker.scala | 17 +++++++++++++----
.../org/apache/spark/deploy/master/MasterSuite.scala | 4 ++--
4 files changed, 20 insertions(+), 9 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
index 49a319a..5723b0f 100644
--- a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
@@ -96,11 +96,13 @@ private[deploy] object DeployMessages {
    * @param masterWebUiUrl the master Web UI address
    * @param masterAddress the master address used by the worker to connect. It should be
    *                      [[RegisterWorker.masterAddress]].
+   * @param duplicate whether it is a duplicate register request from the worker
    */
   case class RegisteredWorker(
       master: RpcEndpointRef,
       masterWebUiUrl: String,
-      masterAddress: RpcAddress) extends DeployMessage with RegisterWorkerResponse
+      masterAddress: RpcAddress,
+      duplicate: Boolean) extends DeployMessage with RegisterWorkerResponse

   case class RegisterWorkerFailed(message: String) extends DeployMessage with RegisterWorkerResponse
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 5db9b52..3c0a49e 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -250,13 +250,13 @@ private[deploy] class Master(
       if (state == RecoveryState.STANDBY) {
         workerRef.send(MasterInStandby)
       } else if (idToWorker.contains(id)) {
-        workerRef.send(RegisterWorkerFailed("Duplicate worker ID"))
+        workerRef.send(RegisteredWorker(self, masterWebUiUrl, masterAddress, true))
       } else {
         val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory,
           workerRef, workerWebUiUrl)
         if (registerWorker(worker)) {
           persistenceEngine.addWorker(worker)
-          workerRef.send(RegisteredWorker(self, masterWebUiUrl, masterAddress))
+          workerRef.send(RegisteredWorker(self, masterWebUiUrl, masterAddress, false))
           schedule()
         } else {
           val workerAddress = worker.endpoint.address
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 974e5468..b432feb 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -393,12 +393,21 @@ private[deploy] class Worker(
   private def handleRegisterResponse(msg: RegisterWorkerResponse): Unit = synchronized {
     msg match {
-      case RegisteredWorker(masterRef, masterWebUiUrl, masterAddress) =>
-        if (preferConfiguredMasterAddress) {
-          logInfo("Successfully registered with master " + masterAddress.toSparkURL)
+      case RegisteredWorker(masterRef, masterWebUiUrl, masterAddress, duplicate) =>
+        val preferredMasterAddress = if (preferConfiguredMasterAddress) {
+          masterAddress.toSparkURL
         } else {
-          logInfo("Successfully registered with master " + masterRef.address.toSparkURL)
+          masterRef.address.toSparkURL
         }
+
+        // there're corner cases which we could hardly avoid duplicate worker registration,
+        // e.g. Master disconnect(maybe due to network drop) and recover immediately, see
+        // SPARK-23191 for more details.
+        if (duplicate) {
+          logWarning(s"Duplicate registration at master $preferredMasterAddress")
+        }
+
+        logInfo(s"Successfully registered with master $preferredMasterAddress")
         registered = true
         changeMaster(masterRef, masterWebUiUrl, masterAddress)
         forwardMessageScheduler.scheduleAtFixedRate(
diff --git a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
index 0c4b105..f19e998 100644
--- a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
@@ -71,7 +71,7 @@ class MockWorker(master: RpcEndpointRef, conf: SparkConf = new SparkConf) extend
   val appDesc = DeployTestUtils.createAppDesc()
   val drivers = mutable.HashSet[String]()
   override def receive: PartialFunction[Any, Unit] = {
-    case RegisteredWorker(masterRef, _, _) =>
+    case RegisteredWorker(masterRef, _, _, _) =>
       masterRef.send(WorkerLatestState(id, Nil, drivers.toSeq))
     case LaunchDriver(driverId, desc) =>
       drivers += driverId
@@ -626,7 +626,7 @@ class MasterSuite extends SparkFunSuite
       override val rpcEnv: RpcEnv = master.rpcEnv
       override def receive: PartialFunction[Any, Unit] = {
-        case RegisteredWorker(_, _, masterAddress) =>
+        case RegisteredWorker(_, _, masterAddress, _) =>
           receivedMasterAddress = masterAddress
       }
     })
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]