CodingCat commented on code in PR #2237:
URL: 
https://github.com/apache/incubator-celeborn/pull/2237#discussion_r1468135136


##########
worker/src/test/scala/org/apache/celeborn/service/deploy/MiniClusterFeature.scala:
##########
@@ -98,22 +139,65 @@ trait MiniClusterFeature extends Logging {
     }
   }
 
-  def setUpMiniCluster(
+  private def setUpMiniCluster(
       masterConf: Map[String, String] = null,
       workerConf: Map[String, String] = null,
       workerNum: Int = 3): (Master, collection.Set[Worker]) = {
     val master = createMaster(masterConf)
-    val masterThread = runnerWrap(master.rpcEnv.awaitTermination())
+    val masterStartedSignal = Array(false)
+    val masterThread = new RunnerWrap({
+      try {
+        masterStartedSignal(0) = true
+        master.rpcEnv.awaitTermination()
+      } catch {
+        case ex: Exception =>
+          masterStartedSignal(0) = false
+          throw ex
+      }
+    })
     masterThread.start()
     masterInfo = (master, masterThread)
-    Thread.sleep(5000L)
-    (1 to workerNum).foreach { _ =>
-      val worker = createWorker(workerConf)
-      val workerThread = runnerWrap(worker.initialize())
-      workerThread.start()
-      workerInfos.put(worker, workerThread)
+    Thread.sleep(20000L)
+
+    if (!masterStartedSignal.head) {
+      throw new BindException("cannot start master rpc endpoint")
     }
-    Thread.sleep(5000L)
+
+    val workers = new Array[Worker](workerNum)
+    val threads = (1 to workerNum).map { i =>
+      val workerThread = new RunnerWrap({
+        var workerStarted = false
+        var workerStartRetry = 0
+        while (!workerStarted) {
+          try {
+            val worker = createWorker(workerConf)
+            this.synchronized {
+              workers(i - 1) = worker
+            }
+            worker.initialize()
+            workerStarted = true
+          } catch {
+            case ex: Exception =>
+              if (workers(i - 1) != null) {
+                workers(i - 1).shutdownGracefully()
+              }
+              workerStartRetry += 1
+              logError(s"cannot start worker $i, retrying: ", ex)
+              if (workerStartRetry == 3) {
+                logError(s"cannot start worker $i, reached to max retrying", 
ex)
+                throw ex
+              } else {
+                Thread.sleep(math.pow(5000, workerStartRetry).toInt)
+              }
+          }
+        }
+      })
+      workerThread.setName(s"worker ${i} starter thread")
+      workerThread
+    }
+    threads.foreach(_.start())
+    Thread.sleep(20000)

Review Comment:
   Sure, changed: I added worker startup logic similar to the master's.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to