waitinfuture commented on code in PR #2237:
URL:
https://github.com/apache/incubator-celeborn/pull/2237#discussion_r1467248347
##########
master/src/test/scala/org/apache/celeborn/service/deploy/master/MasterSuite.scala:
##########
@@ -17,13 +17,15 @@
package org.apache.celeborn.service.deploy.master
+import scala.util.Random
Review Comment:
Unnecessary import; please remove it.
##########
worker/src/test/scala/org/apache/celeborn/service/deploy/MiniClusterFeature.scala:
##########
@@ -98,22 +139,65 @@ trait MiniClusterFeature extends Logging {
}
}
- def setUpMiniCluster(
+ private def setUpMiniCluster(
masterConf: Map[String, String] = null,
workerConf: Map[String, String] = null,
workerNum: Int = 3): (Master, collection.Set[Worker]) = {
val master = createMaster(masterConf)
- val masterThread = runnerWrap(master.rpcEnv.awaitTermination())
+ val masterStartedSignal = Array(false)
+ val masterThread = new RunnerWrap({
+ try {
+ masterStartedSignal(0) = true
+ master.rpcEnv.awaitTermination()
+ } catch {
+ case ex: Exception =>
+ masterStartedSignal(0) = false
+ throw ex
+ }
+ })
masterThread.start()
masterInfo = (master, masterThread)
- Thread.sleep(5000L)
- (1 to workerNum).foreach { _ =>
- val worker = createWorker(workerConf)
- val workerThread = runnerWrap(worker.initialize())
- workerThread.start()
- workerInfos.put(worker, workerThread)
+ Thread.sleep(20000L)
Review Comment:
A fixed 20-second sleep makes the test slow. Consider polling in a loop every 5s and
breaking out as soon as `masterStartedSignal` is true.
##########
worker/src/test/scala/org/apache/celeborn/service/deploy/MiniClusterFeature.scala:
##########
@@ -98,22 +139,65 @@ trait MiniClusterFeature extends Logging {
}
}
- def setUpMiniCluster(
+ private def setUpMiniCluster(
masterConf: Map[String, String] = null,
workerConf: Map[String, String] = null,
workerNum: Int = 3): (Master, collection.Set[Worker]) = {
val master = createMaster(masterConf)
- val masterThread = runnerWrap(master.rpcEnv.awaitTermination())
+ val masterStartedSignal = Array(false)
+ val masterThread = new RunnerWrap({
+ try {
+ masterStartedSignal(0) = true
+ master.rpcEnv.awaitTermination()
+ } catch {
+ case ex: Exception =>
+ masterStartedSignal(0) = false
+ throw ex
+ }
+ })
masterThread.start()
masterInfo = (master, masterThread)
- Thread.sleep(5000L)
- (1 to workerNum).foreach { _ =>
- val worker = createWorker(workerConf)
- val workerThread = runnerWrap(worker.initialize())
- workerThread.start()
- workerInfos.put(worker, workerThread)
+ Thread.sleep(20000L)
+
+ if (!masterStartedSignal.head) {
+ throw new BindException("cannot start master rpc endpoint")
}
- Thread.sleep(5000L)
+
+ val workers = new Array[Worker](workerNum)
+ val threads = (1 to workerNum).map { i =>
+ val workerThread = new RunnerWrap({
+ var workerStarted = false
+ var workerStartRetry = 0
+ while (!workerStarted) {
+ try {
+ val worker = createWorker(workerConf)
+ this.synchronized {
+ workers(i - 1) = worker
+ }
+ worker.initialize()
+ workerStarted = true
+ } catch {
+ case ex: Exception =>
+ if (workers(i - 1) != null) {
+ workers(i - 1).shutdownGracefully()
+ }
+ workerStartRetry += 1
+ logError(s"cannot start worker $i, retrying: ", ex)
+ if (workerStartRetry == 3) {
+ logError(s"cannot start worker $i, reached to max retrying",
ex)
+ throw ex
+ } else {
+ Thread.sleep(math.pow(5000, workerStartRetry).toInt)
+ }
+ }
+ }
+ })
+ workerThread.setName(s"worker ${i} starter thread")
+ workerThread
+ }
+ threads.foreach(_.start())
+ Thread.sleep(20000)
Review Comment:
Ditto: instead of a fixed 20-second sleep, we can check every 5s whether the workers have started and stop waiting once they have.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]