This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 826ee60  [SPARK-23191][CORE] Warn rather than terminate when duplicate worker register happens
826ee60 is described below

commit 826ee6074e15e106f8e157a9697052da75428ac7
Author: wuyi <[email protected]>
AuthorDate: Tue May 28 11:59:29 2019 +0800

    [SPARK-23191][CORE] Warn rather than terminate when duplicate worker register happens
    
    ## What changes were proposed in this pull request?
    
    ### Standalone HA Background
    
    In Spark Standalone HA mode, multiple masters run at the same time, but only one of them is the leader, which actively serves scheduling requests. Once this leader crashes, the remaining masters compete for leadership, and exactly one is guaranteed to be elected as the new leader, which reconstructs the state from the original leader and continues to serve scheduling requests.
    
    ### Related Issues
    
    #2828 first introduced the *duplicate Worker registration* bug, and #3447 fixed it. However, there are still corner cases (see SPARK-23191 for details) that #3447 does not cover:
    
    * CASE 1
    (1) Initially, the Worker registers with Master A.
    (2) After a while, the connection channel between Master A and the Worker becomes inactive (e.g. due to a network drop), and the Worker is notified via an `onDisconnected` callback from NettyRpcEnv.
    (3) In `onDisconnected`, the Worker attempts to reconnect to all masters (including Master A).
    (4) Meanwhile, the network between the Worker and Master A recovers, and the Worker successfully registers with Master A again.
    (5) Master A responds with `RegisterWorkerFailed("Duplicate worker ID")`.
    (6) The Worker receives that message and exits.
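    
    The six steps above can be condensed into a miniature sketch of the pre-fix behavior (hypothetical, simplified types; not Spark's real classes):
    
    ```scala
    // Hypothetical, simplified model of the CASE 1 race (not Spark's real types).
    sealed trait RegisterResponse
    case object Registered extends RegisterResponse
    case class RegisterFailed(reason: String) extends RegisterResponse

    object Case1Sketch {
      // Master A's view: worker IDs it believes are registered.
      private val idToWorker = scala.collection.mutable.Set.empty[String]

      // Pre-fix master behavior: a second register with the same ID is rejected.
      def register(workerId: String): RegisterResponse =
        if (!idToWorker.add(workerId)) RegisterFailed("Duplicate worker ID")
        else Registered

      def main(args: Array[String]): Unit = {
        // (1) The Worker registers with Master A.
        assert(register("worker-1") == Registered)
        // (2)-(4) The channel drops but the network recovers; the Worker's
        // blanket reconnect reaches Master A, which never removed the ID.
        // (5) Master A answers with a duplicate-ID failure...
        assert(register("worker-1") == RegisterFailed("Duplicate worker ID"))
        // (6) ...and the pre-fix Worker exits on that message.
      }
    }
    ```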
    
    * CASE 2
    (1) Master A loses leadership (sends `RevokedLeadership` to itself). Master B takes over, recovers everything from Master A (which registers the workers with Master B for the first time), and sends `MasterChanged` to the Worker.
    (2) Before Master A processes `RevokedLeadership`, it receives a late `HeartBeat` from the Worker (which had previously been removed in Master A due to heartbeat timeout), so it sends `ReconnectWorker` to the Worker.
    (3) The Worker receives `MasterChanged` before `ReconnectWorker`, changing its `masterRef` to Master B.
    (4) Subsequently, the Worker receives `ReconnectWorker` from Master A and reconnects to all masters.
    (5) Master B receives the register request from the Worker again and responds with `RegisterWorkerFailed("Duplicate worker ID")`.
    (6) The Worker receives that message and exits.
    
    In CASE 1, it is difficult for the Worker to know Master A's state. Normally, the Worker assumes Master A has already died and does not expect Master A to respond to its reconnect request.
    
    In CASE 2, there is a race condition between `RevokedLeadership` and `HeartBeat`: Master A's leadership has already been revoked while it is processing the `HeartBeat` message. This means the state between the Master and ZooKeeper can be out of sync for a while.
    
    ### Solutions
    
    In this PR, instead of exiting the Worker process when *duplicate Worker registration* happens, we log a warning about it. This is safe because the Master performs a no-op when it receives a duplicate registration from a Worker, so the Worker can continue working with that Master normally without any side effects.
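    
    A condensed sketch of the proposed behavior, using simplified stand-in types (the real change is in the diff below):
    
    ```scala
    // Simplified stand-ins sketching the post-fix handshake (not Spark's real API).
    sealed trait RegisterWorkerResponse
    case class RegisteredWorker(duplicate: Boolean) extends RegisterWorkerResponse

    object PostFixSketch {
      private val idToWorker = scala.collection.mutable.Set.empty[String]

      // Post-fix master: a duplicate ID is acknowledged and merely flagged.
      def register(workerId: String): RegisteredWorker =
        RegisteredWorker(duplicate = !idToWorker.add(workerId))

      // Post-fix worker: warn on a duplicate, stay registered either way.
      def handleRegisterResponse(msg: RegisterWorkerResponse): Boolean = msg match {
        case RegisteredWorker(duplicate) =>
          if (duplicate) println("WARN Duplicate registration at master")
          true // registered = true; the Worker no longer exits on duplicates
      }
    }
    ```
    
    Both the first and any duplicate registration now leave the Worker alive, matching the log snippet in the testing section.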
    
    ## How was this patch tested?
    
    Tested Manually.
    
    I followed the steps Neeraj Gupta suggested in JIRA SPARK-23191 to reproduce CASE 1.
    
    Before this PR, the Worker would show as DEAD in the UI.
    After this PR, the Worker just warns about the duplicate registration (see the second-to-last line in the log snippet below) and remains ALIVE in the UI.
    
    ```
    19/05/09 20:58:32 ERROR Worker: Connection to master failed! Waiting for master to reconnect...
    19/05/09 20:58:32 INFO Worker: wuyi.local:7077 Disassociated !
    19/05/09 20:58:32 INFO Worker: Connecting to master wuyi.local:7077...
    19/05/09 20:58:32 ERROR Worker: Connection to master failed! Waiting for master to reconnect...
    19/05/09 20:58:32 INFO Worker: Not spawning another attempt to register with the master, since there is an attempt scheduled already.
    19/05/09 20:58:37 WARN TransportClientFactory: DNS resolution for wuyi.local/127.0.0.1:7077 took 5005 ms
    19/05/09 20:58:37 INFO TransportClientFactory: Found inactive connection to wuyi.local/127.0.0.1:7077, creating a new one.
    19/05/09 20:58:37 INFO TransportClientFactory: Successfully created connection to wuyi.local/127.0.0.1:7077 after 3 ms (0 ms spent in bootstraps)
    19/05/09 20:58:37 WARN Worker: Duplicate registration at master spark://wuyi.local:7077
    19/05/09 20:58:37 INFO Worker: Successfully registered with master spark://wuyi.local:7077
    ```
    
    Closes #24569 from Ngone51/fix-worker-dup-register-error.
    
    Authored-by: wuyi <[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
---
 .../scala/org/apache/spark/deploy/DeployMessage.scala   |  4 +++-
 .../scala/org/apache/spark/deploy/master/Master.scala   |  4 ++--
 .../scala/org/apache/spark/deploy/worker/Worker.scala   | 17 +++++++++++++----
 .../org/apache/spark/deploy/master/MasterSuite.scala    |  4 ++--
 4 files changed, 20 insertions(+), 9 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
index 49a319a..5723b0f 100644
--- a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
@@ -96,11 +96,13 @@ private[deploy] object DeployMessages {
    * @param masterWebUiUrl the master Web UI address
   * @param masterAddress the master address used by the worker to connect. It should be
    *                      [[RegisterWorker.masterAddress]].
+   * @param duplicate whether it is a duplicate register request from the worker
    */
   case class RegisteredWorker(
       master: RpcEndpointRef,
       masterWebUiUrl: String,
-      masterAddress: RpcAddress) extends DeployMessage with RegisterWorkerResponse
+      masterAddress: RpcAddress,
+      duplicate: Boolean) extends DeployMessage with RegisterWorkerResponse
 
   case class RegisterWorkerFailed(message: String) extends DeployMessage with RegisterWorkerResponse
 
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 5db9b52..3c0a49e 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -250,13 +250,13 @@ private[deploy] class Master(
       if (state == RecoveryState.STANDBY) {
         workerRef.send(MasterInStandby)
       } else if (idToWorker.contains(id)) {
-        workerRef.send(RegisterWorkerFailed("Duplicate worker ID"))
+        workerRef.send(RegisteredWorker(self, masterWebUiUrl, masterAddress, true))
       } else {
         val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory,
           workerRef, workerWebUiUrl)
         if (registerWorker(worker)) {
           persistenceEngine.addWorker(worker)
-          workerRef.send(RegisteredWorker(self, masterWebUiUrl, masterAddress))
+          workerRef.send(RegisteredWorker(self, masterWebUiUrl, masterAddress, false))
           schedule()
         } else {
           val workerAddress = worker.endpoint.address
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 974e5468..b432feb 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -393,12 +393,21 @@ private[deploy] class Worker(
 
   private def handleRegisterResponse(msg: RegisterWorkerResponse): Unit = synchronized {
     msg match {
-      case RegisteredWorker(masterRef, masterWebUiUrl, masterAddress) =>
-        if (preferConfiguredMasterAddress) {
-          logInfo("Successfully registered with master " + masterAddress.toSparkURL)
+      case RegisteredWorker(masterRef, masterWebUiUrl, masterAddress, duplicate) =>
+        val preferredMasterAddress = if (preferConfiguredMasterAddress) {
+          masterAddress.toSparkURL
         } else {
-          logInfo("Successfully registered with master " + masterRef.address.toSparkURL)
+          masterRef.address.toSparkURL
         }
+
+        // there're corner cases which we could hardly avoid duplicate worker registration,
+        // e.g. Master disconnect(maybe due to network drop) and recover immediately, see
+        // SPARK-23191 for more details.
+        if (duplicate) {
+          logWarning(s"Duplicate registration at master $preferredMasterAddress")
+        }
+
+        logInfo(s"Successfully registered with master $preferredMasterAddress")
         registered = true
         changeMaster(masterRef, masterWebUiUrl, masterAddress)
         forwardMessageScheduler.scheduleAtFixedRate(
diff --git a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
index 0c4b105..f19e998 100644
--- a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
@@ -71,7 +71,7 @@ class MockWorker(master: RpcEndpointRef, conf: SparkConf = new SparkConf) extend
   val appDesc = DeployTestUtils.createAppDesc()
   val drivers = mutable.HashSet[String]()
   override def receive: PartialFunction[Any, Unit] = {
-    case RegisteredWorker(masterRef, _, _) =>
+    case RegisteredWorker(masterRef, _, _, _) =>
       masterRef.send(WorkerLatestState(id, Nil, drivers.toSeq))
     case LaunchDriver(driverId, desc) =>
       drivers += driverId
@@ -626,7 +626,7 @@ class MasterSuite extends SparkFunSuite
       override val rpcEnv: RpcEnv = master.rpcEnv
 
       override def receive: PartialFunction[Any, Unit] = {
-        case RegisteredWorker(_, _, masterAddress) =>
+        case RegisteredWorker(_, _, masterAddress, _) =>
           receivedMasterAddress = masterAddress
       }
     })


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
