This is an automated email from the ASF dual-hosted git repository.

rexxiong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 2efdf755c [CELEBORN-1711][TEST] Fix flaky test caused by master/worker 
setup issue
2efdf755c is described below

commit 2efdf755ccbc5d44d22b8f7ed3dadfa80afb3fb7
Author: Wang, Fei <[email protected]>
AuthorDate: Tue Dec 17 10:45:40 2024 +0800

    [CELEBORN-1711][TEST] Fix flaky test caused by master/worker setup issue
    
    ### What changes were proposed in this pull request?
    
    1. Retry on BindException when starting the master/worker HTTP server.
    2. Record the used ports and pre-check whether the selected port is already used or 
bound before binding.
    
    ### Why are the changes needed?
    
    To fix flaky tests caused by port-bind conflicts during master/worker setup.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    GA.
    
    Closes #2906 from turboFei/retry_master_suite.
    
    Authored-by: Wang, Fei <[email protected]>
    Signed-off-by: Shuang <[email protected]>
---
 .../ApiMasterResourceAuthenticationSuite.scala     |  36 +----
 .../deploy/master/MasterClusterFeature.scala       | 157 +++++++++++++++++++++
 .../service/deploy/master/MasterSuite.scala        | 137 ++++++++++--------
 .../master/http/api/ApiMasterResourceSuite.scala   |  36 +----
 .../http/api/v1/ApiV1MasterResourceSuite.scala     |  36 +----
 .../server/common/http/HttpTestHelper.scala        |   1 -
 .../service/deploy/MiniClusterFeature.scala        |  62 +++++---
 .../http/api/v1/ApiV1WorkerResourceSuite.scala     |   2 +-
 .../ApiWorkerResourceAuthenticationSuite.scala     |  18 ++-
 9 files changed, 296 insertions(+), 189 deletions(-)

diff --git 
a/master/src/test/scala/org/apache/celeborn/service/deploy/master/ApiMasterResourceAuthenticationSuite.scala
 
b/master/src/test/scala/org/apache/celeborn/service/deploy/master/ApiMasterResourceAuthenticationSuite.scala
index 50a3eaf6f..d96607197 100644
--- 
a/master/src/test/scala/org/apache/celeborn/service/deploy/master/ApiMasterResourceAuthenticationSuite.scala
+++ 
b/master/src/test/scala/org/apache/celeborn/service/deploy/master/ApiMasterResourceAuthenticationSuite.scala
@@ -17,50 +17,22 @@
 
 package org.apache.celeborn.service.deploy.master
 
-import java.nio.file.Files
-
-import org.apache.celeborn.common.CelebornConf
-import org.apache.celeborn.common.util.{CelebornExitKind, ThreadUtils, Utils}
 import org.apache.celeborn.server.common.HttpService
 import 
org.apache.celeborn.server.common.http.ApiBaseResourceAuthenticationSuite
 
-class ApiMasterResourceAuthenticationSuite extends 
ApiBaseResourceAuthenticationSuite {
+class ApiMasterResourceAuthenticationSuite extends 
ApiBaseResourceAuthenticationSuite
+  with MasterClusterFeature {
   private var master: Master = _
 
   override protected def httpService: HttpService = master
 
-  def getTmpDir(): String = {
-    val tmpDir = Files.createTempDirectory(null).toFile
-    tmpDir.deleteOnExit()
-    tmpDir.getAbsolutePath
-  }
-
   override def beforeAll(): Unit = {
-    val randomMasterPort = Utils.selectRandomInt(1024, 65535)
-    val randomHttpPort = randomMasterPort + 1
-    celebornConf.set(CelebornConf.HA_ENABLED.key, "false")
-    celebornConf.set(CelebornConf.HA_MASTER_RATIS_STORAGE_DIR.key, getTmpDir())
-    celebornConf.set(CelebornConf.WORKER_STORAGE_DIRS.key, getTmpDir())
-    celebornConf.set(CelebornConf.MASTER_HTTP_HOST.key, "127.0.0.1")
-    celebornConf.set(CelebornConf.MASTER_HTTP_PORT.key, 
randomHttpPort.toString)
-
-    val args = Array("-h", "localhost", "-p", randomMasterPort.toString)
-
-    val masterArgs = new MasterArguments(args, celebornConf)
-    master = new Master(celebornConf, masterArgs)
-    ThreadUtils.newThread(
-      new Runnable {
-        override def run(): Unit = {
-          master.initialize()
-        }
-      },
-      "api-master-thread").start()
+    master = setupMasterWithRandomPort(celebornConf.getAll.toMap)
     super.beforeAll()
   }
 
   override def afterAll(): Unit = {
     super.afterAll()
-    master.stop(CelebornExitKind.EXIT_IMMEDIATELY)
-    master.rpcEnv.shutdown()
+    shutdownMaster()
   }
 }
diff --git 
a/master/src/test/scala/org/apache/celeborn/service/deploy/master/MasterClusterFeature.scala
 
b/master/src/test/scala/org/apache/celeborn/service/deploy/master/MasterClusterFeature.scala
new file mode 100644
index 000000000..65995bfc3
--- /dev/null
+++ 
b/master/src/test/scala/org/apache/celeborn/service/deploy/master/MasterClusterFeature.scala
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.service.deploy.master
+
+import java.io.IOException
+import java.net.{BindException, InetSocketAddress, Socket}
+import java.util.concurrent.TimeUnit
+
+import org.apache.celeborn.common.CelebornConf
+import org.apache.celeborn.common.internal.Logging
+import org.apache.celeborn.common.util.{CelebornExitKind, Utils}
+
+trait MasterClusterFeature extends Logging {
+  var masterInfo: (Master, Thread) = _
+
+  val maxRetries = 3
+  val masterWaitingTimeoutMs = TimeUnit.SECONDS.toMillis(30)
+
+  class RunnerWrap[T](code: => T) extends Thread {
+    override def run(): Unit = {
+      Utils.tryLogNonFatalError(code)
+    }
+  }
+
+  val usedPorts = new java.util.HashSet[Integer]()
+  def portBounded(port: Int): Boolean = {
+    val socket = new Socket()
+    try {
+      socket.connect(new InetSocketAddress("localhost", port), 100)
+      true
+    } catch {
+      case _: IOException => false
+    } finally {
+      socket.close()
+    }
+  }
+  def selectRandomPort(): Int = synchronized {
+    val port = Utils.selectRandomInt(1024, 65535)
+    val portUsed = usedPorts.contains(port) || portBounded(port)
+    usedPorts.add(port)
+    if (portUsed) {
+      selectRandomPort()
+    } else {
+      port
+    }
+  }
+
+  def withRetryOnPortBindException(f: () => Unit): Unit = {
+    var retryCount = 0
+    var pass = false
+    while (!pass) {
+      try {
+        f()
+        pass = true
+      } catch {
+        case e: IOException
+            if e.isInstanceOf[BindException] || Option(e.getCause).exists(
+              _.isInstanceOf[BindException]) =>
+          logError(s"failed due to BindException, retrying (retry count: 
$retryCount)", e)
+          retryCount += 1
+          if (retryCount == maxRetries) {
+            logError("failed due to reach the max retry count", e)
+            throw e
+          }
+      }
+    }
+  }
+
+  def setupMasterWithRandomPort(masterConf: Map[String, String] = Map()): 
Master = {
+    var master: Master = null
+    withRetryOnPortBindException { () =>
+      val randomPort = selectRandomPort()
+      val randomInternalPort = selectRandomPort()
+      val finalMasterConf = Map(
+        s"${CelebornConf.MASTER_HOST.key}" -> "localhost",
+        s"${CelebornConf.PORT_MAX_RETRY.key}" -> "0",
+        s"${CelebornConf.MASTER_PORT.key}" -> s"$randomPort",
+        s"${CelebornConf.MASTER_ENDPOINTS.key}" -> s"localhost:$randomPort",
+        s"${CelebornConf.MASTER_INTERNAL_PORT.key}" -> s"$randomInternalPort",
+        s"${CelebornConf.MASTER_INTERNAL_ENDPOINTS.key}" -> 
s"localhost:$randomInternalPort") ++
+        masterConf
+      master = setUpMaster(masterConf = finalMasterConf)
+    }
+    master
+  }
+
+  private def createMaster(map: Map[String, String] = null): Master = {
+    val conf = new CelebornConf()
+    conf.set(CelebornConf.METRICS_ENABLED.key, "false")
+    val httpPort = selectRandomPort()
+    conf.set(CelebornConf.MASTER_HTTP_PORT.key, s"$httpPort")
+    logInfo(s"set ${CelebornConf.MASTER_HTTP_PORT.key} to $httpPort")
+    if (map != null) {
+      map.foreach(m => conf.set(m._1, m._2))
+    }
+
+    val masterArguments = new MasterArguments(Array(), conf)
+    val master = new Master(conf, masterArguments)
+    if (conf.metricsSystemEnable) {
+      master.metricsSystem.start()
+    }
+    master.startHttpServer()
+
+    Thread.sleep(3000L)
+    master
+  }
+
+  def setUpMaster(masterConf: Map[String, String] = null): Master = {
+    val master = createMaster(masterConf)
+    val masterStartedSignal = Array(false)
+    val masterThread = new RunnerWrap({
+      try {
+        masterStartedSignal(0) = true
+        master.rpcEnv.awaitTermination()
+      } catch {
+        case ex: Exception =>
+          masterStartedSignal(0) = false
+          throw ex
+      }
+    })
+    masterThread.start()
+    masterInfo = (master, masterThread)
+    var masterStartWaitingTime = 0
+    while (!masterStartedSignal.head) {
+      logInfo("waiting for master node starting")
+      if (masterStartWaitingTime >= masterWaitingTimeoutMs) {
+        throw new BindException("cannot start master rpc endpoint")
+      }
+      Thread.sleep(3000)
+      masterStartWaitingTime += 3000
+    }
+    master
+  }
+
+  def shutdownMaster(): Unit = {
+    masterInfo._1.stop(CelebornExitKind.EXIT_IMMEDIATELY)
+    masterInfo._1.rpcEnv.shutdown()
+    Thread.sleep(3000)
+    masterInfo._2.interrupt()
+    usedPorts.clear()
+  }
+}
diff --git 
a/master/src/test/scala/org/apache/celeborn/service/deploy/master/MasterSuite.scala
 
b/master/src/test/scala/org/apache/celeborn/service/deploy/master/MasterSuite.scala
index ed27aac48..7b7a7a1f6 100644
--- 
a/master/src/test/scala/org/apache/celeborn/service/deploy/master/MasterSuite.scala
+++ 
b/master/src/test/scala/org/apache/celeborn/service/deploy/master/MasterSuite.scala
@@ -24,14 +24,13 @@ import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
 import org.scalatest.funsuite.AnyFunSuite
 
 import org.apache.celeborn.common.CelebornConf
-import org.apache.celeborn.common.internal.Logging
 import org.apache.celeborn.common.protocol.{PbCheckForWorkerTimeout, 
PbRegisterWorker}
-import org.apache.celeborn.common.util.{CelebornExitKind, ThreadUtils, Utils}
+import org.apache.celeborn.common.util.{CelebornExitKind, ThreadUtils}
 
 class MasterSuite extends AnyFunSuite
   with BeforeAndAfterAll
   with BeforeAndAfterEach
-  with Logging {
+  with MasterClusterFeature {
 
   def getTmpDir(): String = {
     val tmpDir = Files.createTempDirectory(null).toFile
@@ -40,72 +39,86 @@ class MasterSuite extends AnyFunSuite
   }
 
   test("test single node startup functionality") {
-    val conf = new CelebornConf()
-    val randomMasterPort = Utils.selectRandomInt(1024, 65535)
-    val randomHttpPort = randomMasterPort + 1
-    conf.set(CelebornConf.HA_ENABLED.key, "false")
-    conf.set(CelebornConf.HA_MASTER_RATIS_STORAGE_DIR.key, getTmpDir())
-    conf.set(CelebornConf.WORKER_STORAGE_DIRS.key, getTmpDir())
-    conf.set(CelebornConf.METRICS_ENABLED.key, "true")
-    conf.set(CelebornConf.MASTER_HTTP_HOST.key, "127.0.0.1")
-    conf.set(CelebornConf.MASTER_HTTP_PORT.key, randomHttpPort.toString)
-
-    val args = Array("-h", "localhost", "-p", randomMasterPort.toString)
-
-    val masterArgs = new MasterArguments(args, conf)
-    val master = new Master(conf, masterArgs)
-    ThreadUtils.newThread(
-      new Runnable {
-        override def run(): Unit = {
-          master.initialize()
-        }
-      },
-      "master-init-thread").start()
-    Thread.sleep(5000L)
-    master.stop(CelebornExitKind.EXIT_IMMEDIATELY)
-    master.rpcEnv.shutdown()
+    withRetryOnPortBindException { () =>
+      val conf = new CelebornConf()
+      val randomMasterPort = selectRandomPort()
+      val randomHttpPort = selectRandomPort()
+      conf.set(CelebornConf.HA_ENABLED.key, "false")
+      conf.set(CelebornConf.HA_MASTER_RATIS_STORAGE_DIR.key, getTmpDir())
+      conf.set(CelebornConf.WORKER_STORAGE_DIRS.key, getTmpDir())
+      conf.set(CelebornConf.METRICS_ENABLED.key, "true")
+      conf.set(CelebornConf.MASTER_HTTP_HOST.key, "127.0.0.1")
+      conf.set(CelebornConf.MASTER_HTTP_PORT.key, randomHttpPort.toString)
+
+      val args = Array("-h", "localhost", "-p", randomMasterPort.toString)
+
+      val masterArgs = new MasterArguments(args, conf)
+      val master = new Master(conf, masterArgs)
+      ThreadUtils.newThread(
+        new Runnable {
+          override def run(): Unit = {
+            master.initialize()
+          }
+        },
+        "master-init-thread").start()
+      Thread.sleep(5000L)
+      master.stop(CelebornExitKind.EXIT_IMMEDIATELY)
+      master.rpcEnv.shutdown()
+    }
   }
 
   test("test dedicated internal port receives") {
-    val conf = new CelebornConf()
-    conf.set(CelebornConf.HA_ENABLED.key, "false")
-    conf.set(CelebornConf.HA_MASTER_RATIS_STORAGE_DIR.key, getTmpDir())
-    conf.set(CelebornConf.WORKER_STORAGE_DIRS.key, getTmpDir())
-    conf.set(CelebornConf.METRICS_ENABLED.key, "true")
-    conf.set(CelebornConf.INTERNAL_PORT_ENABLED.key, "true")
-
-    val args = Array("-h", "localhost", "-p", "9097", "--internal-port", 
"8097")
-
-    val masterArgs = new MasterArguments(args, conf)
-    val master = new Master(conf, masterArgs)
-    ThreadUtils.newThread(
-      new Runnable {
-        override def run(): Unit = {
-          master.initialize()
-        }
-      },
-      "master-init-thread").start()
-    Thread.sleep(5000L)
-    master.receive.applyOrElse(
-      PbCheckForWorkerTimeout.newBuilder().build(),
-      (_: Any) => fail("Unexpected message"))
-    master.internalRpcEndpoint.receive.applyOrElse(
-      PbCheckForWorkerTimeout.newBuilder().build(),
-      (_: Any) => fail("Unexpected message"))
-
-    master.internalRpcEndpoint.receiveAndReply(
-      
mock(classOf[org.apache.celeborn.common.rpc.RpcCallContext])).applyOrElse(
-      PbRegisterWorker.newBuilder().build(),
-      (_: Any) => fail("Unexpected message"))
-    master.stop(CelebornExitKind.EXIT_IMMEDIATELY)
-    master.rpcEnv.shutdown()
-    master.internalRpcEnvInUse.shutdown()
+    withRetryOnPortBindException { () =>
+      val conf = new CelebornConf()
+      val randomMasterPort = selectRandomPort()
+      val randomHttpPort = selectRandomPort()
+      val randomInternalPort = selectRandomPort()
+      conf.set(CelebornConf.HA_ENABLED.key, "false")
+      conf.set(CelebornConf.HA_MASTER_RATIS_STORAGE_DIR.key, getTmpDir())
+      conf.set(CelebornConf.WORKER_STORAGE_DIRS.key, getTmpDir())
+      conf.set(CelebornConf.METRICS_ENABLED.key, "true")
+      conf.set(CelebornConf.INTERNAL_PORT_ENABLED.key, "true")
+      conf.set(CelebornConf.MASTER_HTTP_PORT.key, randomHttpPort.toString)
+
+      val args = Array(
+        "-h",
+        "localhost",
+        "-p",
+        randomMasterPort.toString,
+        "--internal-port",
+        randomInternalPort.toString)
+
+      val masterArgs = new MasterArguments(args, conf)
+      val master = new Master(conf, masterArgs)
+      ThreadUtils.newThread(
+        new Runnable {
+          override def run(): Unit = {
+            master.initialize()
+          }
+        },
+        "master-init-thread").start()
+      Thread.sleep(5000L)
+      master.receive.applyOrElse(
+        PbCheckForWorkerTimeout.newBuilder().build(),
+        (_: Any) => fail("Unexpected message"))
+      master.internalRpcEndpoint.receive.applyOrElse(
+        PbCheckForWorkerTimeout.newBuilder().build(),
+        (_: Any) => fail("Unexpected message"))
+
+      master.internalRpcEndpoint.receiveAndReply(
+        
mock(classOf[org.apache.celeborn.common.rpc.RpcCallContext])).applyOrElse(
+        PbRegisterWorker.newBuilder().build(),
+        (_: Any) => fail("Unexpected message"))
+      master.stop(CelebornExitKind.EXIT_IMMEDIATELY)
+      master.rpcEnv.shutdown()
+      master.internalRpcEnvInUse.shutdown()
+    }
   }
 
   test("test master worker host allow and deny pattern") {
     val conf = new CelebornConf()
-    val randomMasterPort = Utils.selectRandomInt(1024, 65535)
-    val randomHttpPort = randomMasterPort + 1
+    val randomMasterPort = selectRandomPort()
+    val randomHttpPort = selectRandomPort()
     conf.set(CelebornConf.HA_ENABLED.key, "false")
     conf.set(CelebornConf.HA_MASTER_RATIS_STORAGE_DIR.key, getTmpDir())
     conf.set(CelebornConf.WORKER_STORAGE_DIRS.key, getTmpDir())
diff --git 
a/master/src/test/scala/org/apache/celeborn/service/deploy/master/http/api/ApiMasterResourceSuite.scala
 
b/master/src/test/scala/org/apache/celeborn/service/deploy/master/http/api/ApiMasterResourceSuite.scala
index fba331bb2..d34377f08 100644
--- 
a/master/src/test/scala/org/apache/celeborn/service/deploy/master/http/api/ApiMasterResourceSuite.scala
+++ 
b/master/src/test/scala/org/apache/celeborn/service/deploy/master/http/api/ApiMasterResourceSuite.scala
@@ -17,54 +17,26 @@
 
 package org.apache.celeborn.service.deploy.master.http.api
 
-import java.nio.file.Files
 import javax.ws.rs.client.Entity
 import javax.ws.rs.core.{Form, MediaType}
 
-import org.apache.celeborn.common.CelebornConf
-import org.apache.celeborn.common.util.{CelebornExitKind, ThreadUtils, Utils}
 import org.apache.celeborn.server.common.HttpService
 import org.apache.celeborn.server.common.http.ApiBaseResourceSuite
-import org.apache.celeborn.service.deploy.master.{Master, MasterArguments}
+import org.apache.celeborn.service.deploy.master.{Master, MasterClusterFeature}
 
-class ApiMasterResourceSuite extends ApiBaseResourceSuite {
+class ApiMasterResourceSuite extends ApiBaseResourceSuite with 
MasterClusterFeature {
   private var master: Master = _
 
   override protected def httpService: HttpService = master
 
-  def getTmpDir(): String = {
-    val tmpDir = Files.createTempDirectory(null).toFile
-    tmpDir.deleteOnExit()
-    tmpDir.getAbsolutePath
-  }
-
   override def beforeAll(): Unit = {
-    val randomMasterPort = Utils.selectRandomInt(1024, 65535)
-    val randomHttpPort = randomMasterPort + 1
-    celebornConf.set(CelebornConf.HA_ENABLED.key, "false")
-    celebornConf.set(CelebornConf.HA_MASTER_RATIS_STORAGE_DIR.key, getTmpDir())
-    celebornConf.set(CelebornConf.WORKER_STORAGE_DIRS.key, getTmpDir())
-    celebornConf.set(CelebornConf.MASTER_HTTP_HOST.key, "127.0.0.1")
-    celebornConf.set(CelebornConf.MASTER_HTTP_PORT.key, 
randomHttpPort.toString)
-
-    val args = Array("-h", "localhost", "-p", randomMasterPort.toString)
-
-    val masterArgs = new MasterArguments(args, celebornConf)
-    master = new Master(celebornConf, masterArgs)
-    ThreadUtils.newThread(
-      new Runnable {
-        override def run(): Unit = {
-          master.initialize()
-        }
-      },
-      "master-init-thread").start()
+    master = setupMasterWithRandomPort(celebornConf.getAll.toMap)
     super.beforeAll()
   }
 
   override def afterAll(): Unit = {
     super.afterAll()
-    master.stop(CelebornExitKind.EXIT_IMMEDIATELY)
-    master.rpcEnv.shutdown()
+    shutdownMaster()
   }
 
   test("masterGroupInfo") {
diff --git 
a/master/src/test/scala/org/apache/celeborn/service/deploy/master/http/api/v1/ApiV1MasterResourceSuite.scala
 
b/master/src/test/scala/org/apache/celeborn/service/deploy/master/http/api/v1/ApiV1MasterResourceSuite.scala
index edcec11c0..69dd19e4d 100644
--- 
a/master/src/test/scala/org/apache/celeborn/service/deploy/master/http/api/v1/ApiV1MasterResourceSuite.scala
+++ 
b/master/src/test/scala/org/apache/celeborn/service/deploy/master/http/api/v1/ApiV1MasterResourceSuite.scala
@@ -17,57 +17,29 @@
 
 package org.apache.celeborn.service.deploy.master.http.api.v1
 
-import java.nio.file.Files
 import java.util.Collections
 import javax.servlet.http.HttpServletResponse
 import javax.ws.rs.client.Entity
 import javax.ws.rs.core.MediaType
 
-import org.apache.celeborn.common.CelebornConf
-import org.apache.celeborn.common.util.{CelebornExitKind, ThreadUtils, Utils}
 import org.apache.celeborn.rest.v1.model.{ApplicationsResponse, 
ExcludeWorkerRequest, HandleResponse, HostnamesResponse, 
RemoveWorkersUnavailableInfoRequest, SendWorkerEventRequest, ShufflesResponse, 
WorkerEventsResponse, WorkerId, WorkersResponse}
 import org.apache.celeborn.server.common.HttpService
 import org.apache.celeborn.server.common.http.api.v1.ApiV1BaseResourceSuite
-import org.apache.celeborn.service.deploy.master.{Master, MasterArguments}
+import org.apache.celeborn.service.deploy.master.{Master, MasterClusterFeature}
 
-class ApiV1MasterResourceSuite extends ApiV1BaseResourceSuite {
+class ApiV1MasterResourceSuite extends ApiV1BaseResourceSuite with 
MasterClusterFeature {
   private var master: Master = _
 
   override protected def httpService: HttpService = master
 
-  def getTmpDir(): String = {
-    val tmpDir = Files.createTempDirectory(null).toFile
-    tmpDir.deleteOnExit()
-    tmpDir.getAbsolutePath
-  }
-
   override def beforeAll(): Unit = {
-    val randomMasterPort = Utils.selectRandomInt(1024, 65535)
-    val randomHttpPort = randomMasterPort + 1
-    celebornConf.set(CelebornConf.HA_ENABLED.key, "false")
-    celebornConf.set(CelebornConf.HA_MASTER_RATIS_STORAGE_DIR.key, getTmpDir())
-    celebornConf.set(CelebornConf.WORKER_STORAGE_DIRS.key, getTmpDir())
-    celebornConf.set(CelebornConf.MASTER_HTTP_HOST.key, "127.0.0.1")
-    celebornConf.set(CelebornConf.MASTER_HTTP_PORT.key, 
randomHttpPort.toString)
-
-    val args = Array("-h", "localhost", "-p", randomMasterPort.toString)
-
-    val masterArgs = new MasterArguments(args, celebornConf)
-    master = new Master(celebornConf, masterArgs)
-    ThreadUtils.newThread(
-      new Runnable {
-        override def run(): Unit = {
-          master.initialize()
-        }
-      },
-      "master-init-thread").start()
+    master = setupMasterWithRandomPort(celebornConf.getAll.toMap)
     super.beforeAll()
   }
 
   override def afterAll(): Unit = {
     super.afterAll()
-    master.stop(CelebornExitKind.EXIT_IMMEDIATELY)
-    master.rpcEnv.shutdown()
+    shutdownMaster()
   }
 
   test("shuffle resource") {
diff --git 
a/service/src/test/scala/org/apache/celeborn/server/common/http/HttpTestHelper.scala
 
b/service/src/test/scala/org/apache/celeborn/server/common/http/HttpTestHelper.scala
index 6a6ee9242..8674fc39c 100644
--- 
a/service/src/test/scala/org/apache/celeborn/server/common/http/HttpTestHelper.scala
+++ 
b/service/src/test/scala/org/apache/celeborn/server/common/http/HttpTestHelper.scala
@@ -64,7 +64,6 @@ trait HttpTestHelper extends AnyFunSuite
   override def beforeAll(): Unit = {
     super.beforeAll()
     restApiBaseSuite.setUp()
-    Thread.sleep(1000) // sleep for http server initialization
   }
 
   override def afterAll(): Unit = {
diff --git 
a/worker/src/test/scala/org/apache/celeborn/service/deploy/MiniClusterFeature.scala
 
b/worker/src/test/scala/org/apache/celeborn/service/deploy/MiniClusterFeature.scala
index f393872f9..8c3a47a07 100644
--- 
a/worker/src/test/scala/org/apache/celeborn/service/deploy/MiniClusterFeature.scala
+++ 
b/worker/src/test/scala/org/apache/celeborn/service/deploy/MiniClusterFeature.scala
@@ -18,7 +18,7 @@
 package org.apache.celeborn.service.deploy
 
 import java.io.IOException
-import java.net.BindException
+import java.net.{BindException, InetSocketAddress, Socket}
 import java.nio.file.Files
 import java.util.concurrent.TimeUnit
 import java.util.concurrent.locks.ReentrantLock
@@ -28,7 +28,6 @@ import scala.collection.mutable
 import org.apache.celeborn.common.CelebornConf
 import org.apache.celeborn.common.internal.Logging
 import org.apache.celeborn.common.util.{CelebornExitKind, Utils}
-import org.apache.celeborn.common.util.Utils.selectRandomInt
 import org.apache.celeborn.service.deploy.master.{Master, MasterArguments}
 import org.apache.celeborn.service.deploy.worker.{Worker, WorkerArguments}
 import org.apache.celeborn.service.deploy.worker.memory.MemoryManager
@@ -39,6 +38,10 @@ trait MiniClusterFeature extends Logging {
   val workerInfos = new mutable.HashMap[Worker, Thread]()
   var workerConfForAdding: Map[String, String] = _
 
+  val maxRetries = 4
+  val masterWaitingTimeoutMs = TimeUnit.SECONDS.toMillis(60)
+  val workersWaitingTimeoutMs = TimeUnit.SECONDS.toMillis(60)
+
   class RunnerWrap[T](code: => T) extends Thread {
 
     override def run(): Unit = {
@@ -46,6 +49,29 @@ trait MiniClusterFeature extends Logging {
     }
   }
 
+  val usedPorts = new java.util.HashSet[Integer]()
+  def portBounded(port: Int): Boolean = {
+    val socket = new Socket()
+    try {
+      socket.connect(new InetSocketAddress("localhost", port), 100)
+      true
+    } catch {
+      case _: IOException => false
+    } finally {
+      socket.close()
+    }
+  }
+  def selectRandomPort(): Int = synchronized {
+    val port = Utils.selectRandomInt(1024, 65535)
+    val portUsed = usedPorts.contains(port) || portBounded(port)
+    usedPorts.add(port)
+    if (portUsed) {
+      selectRandomPort()
+    } else {
+      port
+    }
+  }
+
   def setupMiniClusterWithRandomPorts(
       masterConf: Map[String, String] = Map(),
       workerConf: Map[String, String] = Map(),
@@ -56,8 +82,8 @@ trait MiniClusterFeature extends Logging {
     var workers: collection.Set[Worker] = null
     while (!created) {
       try {
-        val randomPort = selectRandomInt(1024, 65535)
-        val randomInternalPort = selectRandomInt(1024, 65535)
+        val randomPort = selectRandomPort()
+        val randomInternalPort = selectRandomPort()
         val finalMasterConf = Map(
           s"${CelebornConf.MASTER_HOST.key}" -> "localhost",
           s"${CelebornConf.PORT_MAX_RETRY.key}" -> "0",
@@ -84,7 +110,7 @@ trait MiniClusterFeature extends Logging {
               _.isInstanceOf[BindException]) =>
           logError(s"failed to setup mini cluster, retrying (retry count: 
$retryCount)", e)
           retryCount += 1
-          if (retryCount == 3) {
+          if (retryCount == maxRetries) {
             logError("failed to setup mini cluster, reached the max retry 
count", e)
             throw e
           }
@@ -103,7 +129,7 @@ trait MiniClusterFeature extends Logging {
   private def createMaster(map: Map[String, String] = null): Master = {
     val conf = new CelebornConf()
     conf.set(CelebornConf.METRICS_ENABLED.key, "false")
-    val httpPort = selectRandomInt(1024, 65535)
+    val httpPort = selectRandomPort()
     conf.set(CelebornConf.MASTER_HTTP_PORT.key, s"$httpPort")
     logInfo(s"set ${CelebornConf.MASTER_HTTP_PORT.key} to $httpPort")
     if (map != null) {
@@ -128,7 +154,7 @@ trait MiniClusterFeature extends Logging {
     conf.set(CelebornConf.WORKER_STORAGE_DIRS.key, storageDir)
     conf.set(CelebornConf.WORKER_DISK_MONITOR_ENABLED.key, "false")
     conf.set(CelebornConf.CLIENT_PUSH_BUFFER_MAX_SIZE.key, "256K")
-    conf.set(CelebornConf.WORKER_HTTP_PORT.key, s"${selectRandomInt(1024, 
65535)}")
+    conf.set(CelebornConf.WORKER_HTTP_PORT.key, s"${selectRandomPort()}")
     conf.set("celeborn.fetch.io.threads", "4")
     conf.set("celeborn.push.io.threads", "4")
     if (map != null) {
@@ -151,7 +177,6 @@ trait MiniClusterFeature extends Logging {
   }
 
   def setUpMaster(masterConf: Map[String, String] = null): Master = {
-    val timeout = 30000
     val master = createMaster(masterConf)
     val masterStartedSignal = Array(false)
     val masterThread = new RunnerWrap({
@@ -167,13 +192,13 @@ trait MiniClusterFeature extends Logging {
     masterThread.start()
     masterInfo = (master, masterThread)
     var masterStartWaitingTime = 0
-    Thread.sleep(5000)
     while (!masterStartedSignal.head) {
       logInfo("waiting for master node starting")
-      masterStartWaitingTime += 5000
-      if (masterStartWaitingTime >= timeout) {
+      if (masterStartWaitingTime >= masterWaitingTimeoutMs) {
         throw new BindException("cannot start master rpc endpoint")
       }
+      Thread.sleep(5000)
+      masterStartWaitingTime += 5000
     }
     master
   }
@@ -181,7 +206,6 @@ trait MiniClusterFeature extends Logging {
   def setUpWorkers(
       workerConf: Map[String, String] = null,
       workerNum: Int = 3): collection.Set[Worker] = {
-    val timeout = 30000
     val workers = new Array[Worker](workerNum)
     val flagUpdateLock = new ReentrantLock()
     val threads = (1 to workerNum).map { i =>
@@ -204,11 +228,9 @@ trait MiniClusterFeature extends Logging {
               workerStarted = false
               workerStartRetry += 1
               logError(s"cannot start worker $i, retrying: ", ex)
-              if (workerStartRetry == 3) {
+              if (workerStartRetry == maxRetries) {
                 logError(s"cannot start worker $i, reached to max retrying", 
ex)
                 throw ex
-              } else {
-                TimeUnit.SECONDS.sleep(Math.pow(2, workerStartRetry).toLong)
               }
           }
         }
@@ -238,12 +260,12 @@ trait MiniClusterFeature extends Logging {
       } catch {
         case ex: Throwable =>
           logError("all workers haven't been started retrying", ex)
-          Thread.sleep(5000)
-          workersWaitingTime += 5000
-          if (workersWaitingTime >= timeout) {
-            logError(s"cannot start all workers after $timeout ms", ex)
+          if (workersWaitingTime >= workersWaitingTimeoutMs) {
+            logError(s"cannot start all workers after $workersWaitingTimeoutMs 
ms", ex)
             throw ex
           }
+          Thread.sleep(5000)
+          workersWaitingTime += 5000
       }
     }
     workerInfos.keySet
@@ -278,5 +300,7 @@ trait MiniClusterFeature extends Logging {
     workerInfos.clear()
     masterInfo._2.interrupt()
     MemoryManager.reset()
+
+    usedPorts.clear()
   }
 }
diff --git 
a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/http/api/v1/ApiV1WorkerResourceSuite.scala
 
b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/http/api/v1/ApiV1WorkerResourceSuite.scala
index 87bc65481..81342ba68 100644
--- 
a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/http/api/v1/ApiV1WorkerResourceSuite.scala
+++ 
b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/http/api/v1/ApiV1WorkerResourceSuite.scala
@@ -32,7 +32,7 @@ class ApiV1WorkerResourceSuite extends ApiV1BaseResourceSuite 
with MiniClusterFe
 
   override def beforeAll(): Unit = {
     logInfo("test initialized, setup celeborn mini cluster")
-    val (m, w) =
+    val (_, w) =
       setupMiniClusterWithRandomPorts(workerConf = celebornConf.getAll.toMap, 
workerNum = 1)
     worker = w.head
     super.beforeAll()
diff --git 
a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/ApiWorkerResourceAuthenticationSuite.scala
 
b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/ApiWorkerResourceAuthenticationSuite.scala
index 8f0a862e6..5e28441a5 100644
--- 
a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/ApiWorkerResourceAuthenticationSuite.scala
+++ 
b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/ApiWorkerResourceAuthenticationSuite.scala
@@ -17,27 +17,25 @@
 
 package org.apache.celeborn.service.deploy.worker.storage
 
-import org.apache.celeborn.common.util.CelebornExitKind
 import org.apache.celeborn.server.common.HttpService
 import 
org.apache.celeborn.server.common.http.ApiBaseResourceAuthenticationSuite
-import org.apache.celeborn.service.deploy.worker.{Worker, WorkerArguments}
+import org.apache.celeborn.service.deploy.MiniClusterFeature
+import org.apache.celeborn.service.deploy.worker.Worker
 
-class ApiWorkerResourceAuthenticationSuite extends 
ApiBaseResourceAuthenticationSuite {
+class ApiWorkerResourceAuthenticationSuite extends 
ApiBaseResourceAuthenticationSuite
+  with MiniClusterFeature {
   private var worker: Worker = _
   override protected def httpService: HttpService = worker
 
   override def beforeAll(): Unit = {
-    val workerArgs = new WorkerArguments(Array(), celebornConf)
-    worker = new Worker(celebornConf, workerArgs)
-    worker.metricsSystem.start()
-    worker.startHttpServer()
+    val (_, w) =
+      setupMiniClusterWithRandomPorts(workerConf = celebornConf.getAll.toMap, 
workerNum = 1)
+    worker = w.head
     super.beforeAll()
   }
 
   override def afterAll(): Unit = {
     super.afterAll()
-    worker.metricsSystem.stop()
-    worker.rpcEnv.shutdown()
-    worker.stop(CelebornExitKind.EXIT_IMMEDIATELY)
+    shutdownMiniCluster()
   }
 }

Reply via email to