This is an automated email from the ASF dual-hosted git repository.
rexxiong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 2efdf755c [CELEBORN-1711][TEST] Fix flaky test caused by master/worker
setup issue
2efdf755c is described below
commit 2efdf755ccbc5d44d22b8f7ed3dadfa80afb3fb7
Author: Wang, Fei <[email protected]>
AuthorDate: Tue Dec 17 10:45:40 2024 +0800
[CELEBORN-1711][TEST] Fix flaky test caused by master/worker setup issue
### What changes were proposed in this pull request?
1. retry on BindException when starting the master/worker HTTP server
2. record the used ports and pre-check whether the selected port is already
used or bound before binding
### Why are the changes needed?
To fix flaky tests.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
GA.
Closes #2906 from turboFei/retry_master_suite.
Authored-by: Wang, Fei <[email protected]>
Signed-off-by: Shuang <[email protected]>
---
.../ApiMasterResourceAuthenticationSuite.scala | 36 +----
.../deploy/master/MasterClusterFeature.scala | 157 +++++++++++++++++++++
.../service/deploy/master/MasterSuite.scala | 137 ++++++++++--------
.../master/http/api/ApiMasterResourceSuite.scala | 36 +----
.../http/api/v1/ApiV1MasterResourceSuite.scala | 36 +----
.../server/common/http/HttpTestHelper.scala | 1 -
.../service/deploy/MiniClusterFeature.scala | 62 +++++---
.../http/api/v1/ApiV1WorkerResourceSuite.scala | 2 +-
.../ApiWorkerResourceAuthenticationSuite.scala | 18 ++-
9 files changed, 296 insertions(+), 189 deletions(-)
diff --git
a/master/src/test/scala/org/apache/celeborn/service/deploy/master/ApiMasterResourceAuthenticationSuite.scala
b/master/src/test/scala/org/apache/celeborn/service/deploy/master/ApiMasterResourceAuthenticationSuite.scala
index 50a3eaf6f..d96607197 100644
---
a/master/src/test/scala/org/apache/celeborn/service/deploy/master/ApiMasterResourceAuthenticationSuite.scala
+++
b/master/src/test/scala/org/apache/celeborn/service/deploy/master/ApiMasterResourceAuthenticationSuite.scala
@@ -17,50 +17,22 @@
package org.apache.celeborn.service.deploy.master
-import java.nio.file.Files
-
-import org.apache.celeborn.common.CelebornConf
-import org.apache.celeborn.common.util.{CelebornExitKind, ThreadUtils, Utils}
import org.apache.celeborn.server.common.HttpService
import
org.apache.celeborn.server.common.http.ApiBaseResourceAuthenticationSuite
-class ApiMasterResourceAuthenticationSuite extends
ApiBaseResourceAuthenticationSuite {
+class ApiMasterResourceAuthenticationSuite extends
ApiBaseResourceAuthenticationSuite
+ with MasterClusterFeature {
private var master: Master = _
override protected def httpService: HttpService = master
- def getTmpDir(): String = {
- val tmpDir = Files.createTempDirectory(null).toFile
- tmpDir.deleteOnExit()
- tmpDir.getAbsolutePath
- }
-
override def beforeAll(): Unit = {
- val randomMasterPort = Utils.selectRandomInt(1024, 65535)
- val randomHttpPort = randomMasterPort + 1
- celebornConf.set(CelebornConf.HA_ENABLED.key, "false")
- celebornConf.set(CelebornConf.HA_MASTER_RATIS_STORAGE_DIR.key, getTmpDir())
- celebornConf.set(CelebornConf.WORKER_STORAGE_DIRS.key, getTmpDir())
- celebornConf.set(CelebornConf.MASTER_HTTP_HOST.key, "127.0.0.1")
- celebornConf.set(CelebornConf.MASTER_HTTP_PORT.key,
randomHttpPort.toString)
-
- val args = Array("-h", "localhost", "-p", randomMasterPort.toString)
-
- val masterArgs = new MasterArguments(args, celebornConf)
- master = new Master(celebornConf, masterArgs)
- ThreadUtils.newThread(
- new Runnable {
- override def run(): Unit = {
- master.initialize()
- }
- },
- "api-master-thread").start()
+ master = setupMasterWithRandomPort(celebornConf.getAll.toMap)
super.beforeAll()
}
override def afterAll(): Unit = {
super.afterAll()
- master.stop(CelebornExitKind.EXIT_IMMEDIATELY)
- master.rpcEnv.shutdown()
+ shutdownMaster()
}
}
diff --git
a/master/src/test/scala/org/apache/celeborn/service/deploy/master/MasterClusterFeature.scala
b/master/src/test/scala/org/apache/celeborn/service/deploy/master/MasterClusterFeature.scala
new file mode 100644
index 000000000..65995bfc3
--- /dev/null
+++
b/master/src/test/scala/org/apache/celeborn/service/deploy/master/MasterClusterFeature.scala
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.service.deploy.master
+
+import java.io.IOException
+import java.net.{BindException, InetSocketAddress, Socket}
+import java.util.concurrent.TimeUnit
+
+import org.apache.celeborn.common.CelebornConf
+import org.apache.celeborn.common.internal.Logging
+import org.apache.celeborn.common.util.{CelebornExitKind, Utils}
+
+trait MasterClusterFeature extends Logging {
+ var masterInfo: (Master, Thread) = _
+
+ val maxRetries = 3
+ val masterWaitingTimeoutMs = TimeUnit.SECONDS.toMillis(30)
+
+ class RunnerWrap[T](code: => T) extends Thread {
+ override def run(): Unit = {
+ Utils.tryLogNonFatalError(code)
+ }
+ }
+
+ val usedPorts = new java.util.HashSet[Integer]()
+ def portBounded(port: Int): Boolean = {
+ val socket = new Socket()
+ try {
+ socket.connect(new InetSocketAddress("localhost", port), 100)
+ true
+ } catch {
+ case _: IOException => false
+ } finally {
+ socket.close()
+ }
+ }
+ def selectRandomPort(): Int = synchronized {
+ val port = Utils.selectRandomInt(1024, 65535)
+ val portUsed = usedPorts.contains(port) || portBounded(port)
+ usedPorts.add(port)
+ if (portUsed) {
+ selectRandomPort()
+ } else {
+ port
+ }
+ }
+
+ def withRetryOnPortBindException(f: () => Unit): Unit = {
+ var retryCount = 0
+ var pass = false
+ while (!pass) {
+ try {
+ f()
+ pass = true
+ } catch {
+ case e: IOException
+ if e.isInstanceOf[BindException] || Option(e.getCause).exists(
+ _.isInstanceOf[BindException]) =>
+ logError(s"failed due to BindException, retrying (retry count:
$retryCount)", e)
+ retryCount += 1
+ if (retryCount == maxRetries) {
+ logError("failed due to reach the max retry count", e)
+ throw e
+ }
+ }
+ }
+ }
+
+ def setupMasterWithRandomPort(masterConf: Map[String, String] = Map()):
Master = {
+ var master: Master = null
+ withRetryOnPortBindException { () =>
+ val randomPort = selectRandomPort()
+ val randomInternalPort = selectRandomPort()
+ val finalMasterConf = Map(
+ s"${CelebornConf.MASTER_HOST.key}" -> "localhost",
+ s"${CelebornConf.PORT_MAX_RETRY.key}" -> "0",
+ s"${CelebornConf.MASTER_PORT.key}" -> s"$randomPort",
+ s"${CelebornConf.MASTER_ENDPOINTS.key}" -> s"localhost:$randomPort",
+ s"${CelebornConf.MASTER_INTERNAL_PORT.key}" -> s"$randomInternalPort",
+ s"${CelebornConf.MASTER_INTERNAL_ENDPOINTS.key}" ->
s"localhost:$randomInternalPort") ++
+ masterConf
+ master = setUpMaster(masterConf = finalMasterConf)
+ }
+ master
+ }
+
+ private def createMaster(map: Map[String, String] = null): Master = {
+ val conf = new CelebornConf()
+ conf.set(CelebornConf.METRICS_ENABLED.key, "false")
+ val httpPort = selectRandomPort()
+ conf.set(CelebornConf.MASTER_HTTP_PORT.key, s"$httpPort")
+ logInfo(s"set ${CelebornConf.MASTER_HTTP_PORT.key} to $httpPort")
+ if (map != null) {
+ map.foreach(m => conf.set(m._1, m._2))
+ }
+
+ val masterArguments = new MasterArguments(Array(), conf)
+ val master = new Master(conf, masterArguments)
+ if (conf.metricsSystemEnable) {
+ master.metricsSystem.start()
+ }
+ master.startHttpServer()
+
+ Thread.sleep(3000L)
+ master
+ }
+
+ def setUpMaster(masterConf: Map[String, String] = null): Master = {
+ val master = createMaster(masterConf)
+ val masterStartedSignal = Array(false)
+ val masterThread = new RunnerWrap({
+ try {
+ masterStartedSignal(0) = true
+ master.rpcEnv.awaitTermination()
+ } catch {
+ case ex: Exception =>
+ masterStartedSignal(0) = false
+ throw ex
+ }
+ })
+ masterThread.start()
+ masterInfo = (master, masterThread)
+ var masterStartWaitingTime = 0
+ while (!masterStartedSignal.head) {
+ logInfo("waiting for master node starting")
+ if (masterStartWaitingTime >= masterWaitingTimeoutMs) {
+ throw new BindException("cannot start master rpc endpoint")
+ }
+ Thread.sleep(3000)
+ masterStartWaitingTime += 3000
+ }
+ master
+ }
+
+ def shutdownMaster(): Unit = {
+ masterInfo._1.stop(CelebornExitKind.EXIT_IMMEDIATELY)
+ masterInfo._1.rpcEnv.shutdown()
+ Thread.sleep(3000)
+ masterInfo._2.interrupt()
+ usedPorts.clear()
+ }
+}
diff --git
a/master/src/test/scala/org/apache/celeborn/service/deploy/master/MasterSuite.scala
b/master/src/test/scala/org/apache/celeborn/service/deploy/master/MasterSuite.scala
index ed27aac48..7b7a7a1f6 100644
---
a/master/src/test/scala/org/apache/celeborn/service/deploy/master/MasterSuite.scala
+++
b/master/src/test/scala/org/apache/celeborn/service/deploy/master/MasterSuite.scala
@@ -24,14 +24,13 @@ import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
import org.scalatest.funsuite.AnyFunSuite
import org.apache.celeborn.common.CelebornConf
-import org.apache.celeborn.common.internal.Logging
import org.apache.celeborn.common.protocol.{PbCheckForWorkerTimeout,
PbRegisterWorker}
-import org.apache.celeborn.common.util.{CelebornExitKind, ThreadUtils, Utils}
+import org.apache.celeborn.common.util.{CelebornExitKind, ThreadUtils}
class MasterSuite extends AnyFunSuite
with BeforeAndAfterAll
with BeforeAndAfterEach
- with Logging {
+ with MasterClusterFeature {
def getTmpDir(): String = {
val tmpDir = Files.createTempDirectory(null).toFile
@@ -40,72 +39,86 @@ class MasterSuite extends AnyFunSuite
}
test("test single node startup functionality") {
- val conf = new CelebornConf()
- val randomMasterPort = Utils.selectRandomInt(1024, 65535)
- val randomHttpPort = randomMasterPort + 1
- conf.set(CelebornConf.HA_ENABLED.key, "false")
- conf.set(CelebornConf.HA_MASTER_RATIS_STORAGE_DIR.key, getTmpDir())
- conf.set(CelebornConf.WORKER_STORAGE_DIRS.key, getTmpDir())
- conf.set(CelebornConf.METRICS_ENABLED.key, "true")
- conf.set(CelebornConf.MASTER_HTTP_HOST.key, "127.0.0.1")
- conf.set(CelebornConf.MASTER_HTTP_PORT.key, randomHttpPort.toString)
-
- val args = Array("-h", "localhost", "-p", randomMasterPort.toString)
-
- val masterArgs = new MasterArguments(args, conf)
- val master = new Master(conf, masterArgs)
- ThreadUtils.newThread(
- new Runnable {
- override def run(): Unit = {
- master.initialize()
- }
- },
- "master-init-thread").start()
- Thread.sleep(5000L)
- master.stop(CelebornExitKind.EXIT_IMMEDIATELY)
- master.rpcEnv.shutdown()
+ withRetryOnPortBindException { () =>
+ val conf = new CelebornConf()
+ val randomMasterPort = selectRandomPort()
+ val randomHttpPort = selectRandomPort()
+ conf.set(CelebornConf.HA_ENABLED.key, "false")
+ conf.set(CelebornConf.HA_MASTER_RATIS_STORAGE_DIR.key, getTmpDir())
+ conf.set(CelebornConf.WORKER_STORAGE_DIRS.key, getTmpDir())
+ conf.set(CelebornConf.METRICS_ENABLED.key, "true")
+ conf.set(CelebornConf.MASTER_HTTP_HOST.key, "127.0.0.1")
+ conf.set(CelebornConf.MASTER_HTTP_PORT.key, randomHttpPort.toString)
+
+ val args = Array("-h", "localhost", "-p", randomMasterPort.toString)
+
+ val masterArgs = new MasterArguments(args, conf)
+ val master = new Master(conf, masterArgs)
+ ThreadUtils.newThread(
+ new Runnable {
+ override def run(): Unit = {
+ master.initialize()
+ }
+ },
+ "master-init-thread").start()
+ Thread.sleep(5000L)
+ master.stop(CelebornExitKind.EXIT_IMMEDIATELY)
+ master.rpcEnv.shutdown()
+ }
}
test("test dedicated internal port receives") {
- val conf = new CelebornConf()
- conf.set(CelebornConf.HA_ENABLED.key, "false")
- conf.set(CelebornConf.HA_MASTER_RATIS_STORAGE_DIR.key, getTmpDir())
- conf.set(CelebornConf.WORKER_STORAGE_DIRS.key, getTmpDir())
- conf.set(CelebornConf.METRICS_ENABLED.key, "true")
- conf.set(CelebornConf.INTERNAL_PORT_ENABLED.key, "true")
-
- val args = Array("-h", "localhost", "-p", "9097", "--internal-port",
"8097")
-
- val masterArgs = new MasterArguments(args, conf)
- val master = new Master(conf, masterArgs)
- ThreadUtils.newThread(
- new Runnable {
- override def run(): Unit = {
- master.initialize()
- }
- },
- "master-init-thread").start()
- Thread.sleep(5000L)
- master.receive.applyOrElse(
- PbCheckForWorkerTimeout.newBuilder().build(),
- (_: Any) => fail("Unexpected message"))
- master.internalRpcEndpoint.receive.applyOrElse(
- PbCheckForWorkerTimeout.newBuilder().build(),
- (_: Any) => fail("Unexpected message"))
-
- master.internalRpcEndpoint.receiveAndReply(
-
mock(classOf[org.apache.celeborn.common.rpc.RpcCallContext])).applyOrElse(
- PbRegisterWorker.newBuilder().build(),
- (_: Any) => fail("Unexpected message"))
- master.stop(CelebornExitKind.EXIT_IMMEDIATELY)
- master.rpcEnv.shutdown()
- master.internalRpcEnvInUse.shutdown()
+ withRetryOnPortBindException { () =>
+ val conf = new CelebornConf()
+ val randomMasterPort = selectRandomPort()
+ val randomHttpPort = selectRandomPort()
+ val randomInternalPort = selectRandomPort()
+ conf.set(CelebornConf.HA_ENABLED.key, "false")
+ conf.set(CelebornConf.HA_MASTER_RATIS_STORAGE_DIR.key, getTmpDir())
+ conf.set(CelebornConf.WORKER_STORAGE_DIRS.key, getTmpDir())
+ conf.set(CelebornConf.METRICS_ENABLED.key, "true")
+ conf.set(CelebornConf.INTERNAL_PORT_ENABLED.key, "true")
+ conf.set(CelebornConf.MASTER_HTTP_PORT.key, randomHttpPort.toString)
+
+ val args = Array(
+ "-h",
+ "localhost",
+ "-p",
+ randomMasterPort.toString,
+ "--internal-port",
+ randomInternalPort.toString)
+
+ val masterArgs = new MasterArguments(args, conf)
+ val master = new Master(conf, masterArgs)
+ ThreadUtils.newThread(
+ new Runnable {
+ override def run(): Unit = {
+ master.initialize()
+ }
+ },
+ "master-init-thread").start()
+ Thread.sleep(5000L)
+ master.receive.applyOrElse(
+ PbCheckForWorkerTimeout.newBuilder().build(),
+ (_: Any) => fail("Unexpected message"))
+ master.internalRpcEndpoint.receive.applyOrElse(
+ PbCheckForWorkerTimeout.newBuilder().build(),
+ (_: Any) => fail("Unexpected message"))
+
+ master.internalRpcEndpoint.receiveAndReply(
+
mock(classOf[org.apache.celeborn.common.rpc.RpcCallContext])).applyOrElse(
+ PbRegisterWorker.newBuilder().build(),
+ (_: Any) => fail("Unexpected message"))
+ master.stop(CelebornExitKind.EXIT_IMMEDIATELY)
+ master.rpcEnv.shutdown()
+ master.internalRpcEnvInUse.shutdown()
+ }
}
test("test master worker host allow and deny pattern") {
val conf = new CelebornConf()
- val randomMasterPort = Utils.selectRandomInt(1024, 65535)
- val randomHttpPort = randomMasterPort + 1
+ val randomMasterPort = selectRandomPort()
+ val randomHttpPort = selectRandomPort()
conf.set(CelebornConf.HA_ENABLED.key, "false")
conf.set(CelebornConf.HA_MASTER_RATIS_STORAGE_DIR.key, getTmpDir())
conf.set(CelebornConf.WORKER_STORAGE_DIRS.key, getTmpDir())
diff --git
a/master/src/test/scala/org/apache/celeborn/service/deploy/master/http/api/ApiMasterResourceSuite.scala
b/master/src/test/scala/org/apache/celeborn/service/deploy/master/http/api/ApiMasterResourceSuite.scala
index fba331bb2..d34377f08 100644
---
a/master/src/test/scala/org/apache/celeborn/service/deploy/master/http/api/ApiMasterResourceSuite.scala
+++
b/master/src/test/scala/org/apache/celeborn/service/deploy/master/http/api/ApiMasterResourceSuite.scala
@@ -17,54 +17,26 @@
package org.apache.celeborn.service.deploy.master.http.api
-import java.nio.file.Files
import javax.ws.rs.client.Entity
import javax.ws.rs.core.{Form, MediaType}
-import org.apache.celeborn.common.CelebornConf
-import org.apache.celeborn.common.util.{CelebornExitKind, ThreadUtils, Utils}
import org.apache.celeborn.server.common.HttpService
import org.apache.celeborn.server.common.http.ApiBaseResourceSuite
-import org.apache.celeborn.service.deploy.master.{Master, MasterArguments}
+import org.apache.celeborn.service.deploy.master.{Master, MasterClusterFeature}
-class ApiMasterResourceSuite extends ApiBaseResourceSuite {
+class ApiMasterResourceSuite extends ApiBaseResourceSuite with
MasterClusterFeature {
private var master: Master = _
override protected def httpService: HttpService = master
- def getTmpDir(): String = {
- val tmpDir = Files.createTempDirectory(null).toFile
- tmpDir.deleteOnExit()
- tmpDir.getAbsolutePath
- }
-
override def beforeAll(): Unit = {
- val randomMasterPort = Utils.selectRandomInt(1024, 65535)
- val randomHttpPort = randomMasterPort + 1
- celebornConf.set(CelebornConf.HA_ENABLED.key, "false")
- celebornConf.set(CelebornConf.HA_MASTER_RATIS_STORAGE_DIR.key, getTmpDir())
- celebornConf.set(CelebornConf.WORKER_STORAGE_DIRS.key, getTmpDir())
- celebornConf.set(CelebornConf.MASTER_HTTP_HOST.key, "127.0.0.1")
- celebornConf.set(CelebornConf.MASTER_HTTP_PORT.key,
randomHttpPort.toString)
-
- val args = Array("-h", "localhost", "-p", randomMasterPort.toString)
-
- val masterArgs = new MasterArguments(args, celebornConf)
- master = new Master(celebornConf, masterArgs)
- ThreadUtils.newThread(
- new Runnable {
- override def run(): Unit = {
- master.initialize()
- }
- },
- "master-init-thread").start()
+ master = setupMasterWithRandomPort(celebornConf.getAll.toMap)
super.beforeAll()
}
override def afterAll(): Unit = {
super.afterAll()
- master.stop(CelebornExitKind.EXIT_IMMEDIATELY)
- master.rpcEnv.shutdown()
+ shutdownMaster()
}
test("masterGroupInfo") {
diff --git
a/master/src/test/scala/org/apache/celeborn/service/deploy/master/http/api/v1/ApiV1MasterResourceSuite.scala
b/master/src/test/scala/org/apache/celeborn/service/deploy/master/http/api/v1/ApiV1MasterResourceSuite.scala
index edcec11c0..69dd19e4d 100644
---
a/master/src/test/scala/org/apache/celeborn/service/deploy/master/http/api/v1/ApiV1MasterResourceSuite.scala
+++
b/master/src/test/scala/org/apache/celeborn/service/deploy/master/http/api/v1/ApiV1MasterResourceSuite.scala
@@ -17,57 +17,29 @@
package org.apache.celeborn.service.deploy.master.http.api.v1
-import java.nio.file.Files
import java.util.Collections
import javax.servlet.http.HttpServletResponse
import javax.ws.rs.client.Entity
import javax.ws.rs.core.MediaType
-import org.apache.celeborn.common.CelebornConf
-import org.apache.celeborn.common.util.{CelebornExitKind, ThreadUtils, Utils}
import org.apache.celeborn.rest.v1.model.{ApplicationsResponse,
ExcludeWorkerRequest, HandleResponse, HostnamesResponse,
RemoveWorkersUnavailableInfoRequest, SendWorkerEventRequest, ShufflesResponse,
WorkerEventsResponse, WorkerId, WorkersResponse}
import org.apache.celeborn.server.common.HttpService
import org.apache.celeborn.server.common.http.api.v1.ApiV1BaseResourceSuite
-import org.apache.celeborn.service.deploy.master.{Master, MasterArguments}
+import org.apache.celeborn.service.deploy.master.{Master, MasterClusterFeature}
-class ApiV1MasterResourceSuite extends ApiV1BaseResourceSuite {
+class ApiV1MasterResourceSuite extends ApiV1BaseResourceSuite with
MasterClusterFeature {
private var master: Master = _
override protected def httpService: HttpService = master
- def getTmpDir(): String = {
- val tmpDir = Files.createTempDirectory(null).toFile
- tmpDir.deleteOnExit()
- tmpDir.getAbsolutePath
- }
-
override def beforeAll(): Unit = {
- val randomMasterPort = Utils.selectRandomInt(1024, 65535)
- val randomHttpPort = randomMasterPort + 1
- celebornConf.set(CelebornConf.HA_ENABLED.key, "false")
- celebornConf.set(CelebornConf.HA_MASTER_RATIS_STORAGE_DIR.key, getTmpDir())
- celebornConf.set(CelebornConf.WORKER_STORAGE_DIRS.key, getTmpDir())
- celebornConf.set(CelebornConf.MASTER_HTTP_HOST.key, "127.0.0.1")
- celebornConf.set(CelebornConf.MASTER_HTTP_PORT.key,
randomHttpPort.toString)
-
- val args = Array("-h", "localhost", "-p", randomMasterPort.toString)
-
- val masterArgs = new MasterArguments(args, celebornConf)
- master = new Master(celebornConf, masterArgs)
- ThreadUtils.newThread(
- new Runnable {
- override def run(): Unit = {
- master.initialize()
- }
- },
- "master-init-thread").start()
+ master = setupMasterWithRandomPort(celebornConf.getAll.toMap)
super.beforeAll()
}
override def afterAll(): Unit = {
super.afterAll()
- master.stop(CelebornExitKind.EXIT_IMMEDIATELY)
- master.rpcEnv.shutdown()
+ shutdownMaster()
}
test("shuffle resource") {
diff --git
a/service/src/test/scala/org/apache/celeborn/server/common/http/HttpTestHelper.scala
b/service/src/test/scala/org/apache/celeborn/server/common/http/HttpTestHelper.scala
index 6a6ee9242..8674fc39c 100644
---
a/service/src/test/scala/org/apache/celeborn/server/common/http/HttpTestHelper.scala
+++
b/service/src/test/scala/org/apache/celeborn/server/common/http/HttpTestHelper.scala
@@ -64,7 +64,6 @@ trait HttpTestHelper extends AnyFunSuite
override def beforeAll(): Unit = {
super.beforeAll()
restApiBaseSuite.setUp()
- Thread.sleep(1000) // sleep for http server initialization
}
override def afterAll(): Unit = {
diff --git
a/worker/src/test/scala/org/apache/celeborn/service/deploy/MiniClusterFeature.scala
b/worker/src/test/scala/org/apache/celeborn/service/deploy/MiniClusterFeature.scala
index f393872f9..8c3a47a07 100644
---
a/worker/src/test/scala/org/apache/celeborn/service/deploy/MiniClusterFeature.scala
+++
b/worker/src/test/scala/org/apache/celeborn/service/deploy/MiniClusterFeature.scala
@@ -18,7 +18,7 @@
package org.apache.celeborn.service.deploy
import java.io.IOException
-import java.net.BindException
+import java.net.{BindException, InetSocketAddress, Socket}
import java.nio.file.Files
import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.ReentrantLock
@@ -28,7 +28,6 @@ import scala.collection.mutable
import org.apache.celeborn.common.CelebornConf
import org.apache.celeborn.common.internal.Logging
import org.apache.celeborn.common.util.{CelebornExitKind, Utils}
-import org.apache.celeborn.common.util.Utils.selectRandomInt
import org.apache.celeborn.service.deploy.master.{Master, MasterArguments}
import org.apache.celeborn.service.deploy.worker.{Worker, WorkerArguments}
import org.apache.celeborn.service.deploy.worker.memory.MemoryManager
@@ -39,6 +38,10 @@ trait MiniClusterFeature extends Logging {
val workerInfos = new mutable.HashMap[Worker, Thread]()
var workerConfForAdding: Map[String, String] = _
+ val maxRetries = 4
+ val masterWaitingTimeoutMs = TimeUnit.SECONDS.toMillis(60)
+ val workersWaitingTimeoutMs = TimeUnit.SECONDS.toMillis(60)
+
class RunnerWrap[T](code: => T) extends Thread {
override def run(): Unit = {
@@ -46,6 +49,29 @@ trait MiniClusterFeature extends Logging {
}
}
+ val usedPorts = new java.util.HashSet[Integer]()
+ def portBounded(port: Int): Boolean = {
+ val socket = new Socket()
+ try {
+ socket.connect(new InetSocketAddress("localhost", port), 100)
+ true
+ } catch {
+ case _: IOException => false
+ } finally {
+ socket.close()
+ }
+ }
+ def selectRandomPort(): Int = synchronized {
+ val port = Utils.selectRandomInt(1024, 65535)
+ val portUsed = usedPorts.contains(port) || portBounded(port)
+ usedPorts.add(port)
+ if (portUsed) {
+ selectRandomPort()
+ } else {
+ port
+ }
+ }
+
def setupMiniClusterWithRandomPorts(
masterConf: Map[String, String] = Map(),
workerConf: Map[String, String] = Map(),
@@ -56,8 +82,8 @@ trait MiniClusterFeature extends Logging {
var workers: collection.Set[Worker] = null
while (!created) {
try {
- val randomPort = selectRandomInt(1024, 65535)
- val randomInternalPort = selectRandomInt(1024, 65535)
+ val randomPort = selectRandomPort()
+ val randomInternalPort = selectRandomPort()
val finalMasterConf = Map(
s"${CelebornConf.MASTER_HOST.key}" -> "localhost",
s"${CelebornConf.PORT_MAX_RETRY.key}" -> "0",
@@ -84,7 +110,7 @@ trait MiniClusterFeature extends Logging {
_.isInstanceOf[BindException]) =>
logError(s"failed to setup mini cluster, retrying (retry count:
$retryCount)", e)
retryCount += 1
- if (retryCount == 3) {
+ if (retryCount == maxRetries) {
logError("failed to setup mini cluster, reached the max retry
count", e)
throw e
}
@@ -103,7 +129,7 @@ trait MiniClusterFeature extends Logging {
private def createMaster(map: Map[String, String] = null): Master = {
val conf = new CelebornConf()
conf.set(CelebornConf.METRICS_ENABLED.key, "false")
- val httpPort = selectRandomInt(1024, 65535)
+ val httpPort = selectRandomPort()
conf.set(CelebornConf.MASTER_HTTP_PORT.key, s"$httpPort")
logInfo(s"set ${CelebornConf.MASTER_HTTP_PORT.key} to $httpPort")
if (map != null) {
@@ -128,7 +154,7 @@ trait MiniClusterFeature extends Logging {
conf.set(CelebornConf.WORKER_STORAGE_DIRS.key, storageDir)
conf.set(CelebornConf.WORKER_DISK_MONITOR_ENABLED.key, "false")
conf.set(CelebornConf.CLIENT_PUSH_BUFFER_MAX_SIZE.key, "256K")
- conf.set(CelebornConf.WORKER_HTTP_PORT.key, s"${selectRandomInt(1024,
65535)}")
+ conf.set(CelebornConf.WORKER_HTTP_PORT.key, s"${selectRandomPort()}")
conf.set("celeborn.fetch.io.threads", "4")
conf.set("celeborn.push.io.threads", "4")
if (map != null) {
@@ -151,7 +177,6 @@ trait MiniClusterFeature extends Logging {
}
def setUpMaster(masterConf: Map[String, String] = null): Master = {
- val timeout = 30000
val master = createMaster(masterConf)
val masterStartedSignal = Array(false)
val masterThread = new RunnerWrap({
@@ -167,13 +192,13 @@ trait MiniClusterFeature extends Logging {
masterThread.start()
masterInfo = (master, masterThread)
var masterStartWaitingTime = 0
- Thread.sleep(5000)
while (!masterStartedSignal.head) {
logInfo("waiting for master node starting")
- masterStartWaitingTime += 5000
- if (masterStartWaitingTime >= timeout) {
+ if (masterStartWaitingTime >= masterWaitingTimeoutMs) {
throw new BindException("cannot start master rpc endpoint")
}
+ Thread.sleep(5000)
+ masterStartWaitingTime += 5000
}
master
}
@@ -181,7 +206,6 @@ trait MiniClusterFeature extends Logging {
def setUpWorkers(
workerConf: Map[String, String] = null,
workerNum: Int = 3): collection.Set[Worker] = {
- val timeout = 30000
val workers = new Array[Worker](workerNum)
val flagUpdateLock = new ReentrantLock()
val threads = (1 to workerNum).map { i =>
@@ -204,11 +228,9 @@ trait MiniClusterFeature extends Logging {
workerStarted = false
workerStartRetry += 1
logError(s"cannot start worker $i, retrying: ", ex)
- if (workerStartRetry == 3) {
+ if (workerStartRetry == maxRetries) {
logError(s"cannot start worker $i, reached to max retrying",
ex)
throw ex
- } else {
- TimeUnit.SECONDS.sleep(Math.pow(2, workerStartRetry).toLong)
}
}
}
@@ -238,12 +260,12 @@ trait MiniClusterFeature extends Logging {
} catch {
case ex: Throwable =>
logError("all workers haven't been started retrying", ex)
- Thread.sleep(5000)
- workersWaitingTime += 5000
- if (workersWaitingTime >= timeout) {
- logError(s"cannot start all workers after $timeout ms", ex)
+ if (workersWaitingTime >= workersWaitingTimeoutMs) {
+ logError(s"cannot start all workers after $workersWaitingTimeoutMs
ms", ex)
throw ex
}
+ Thread.sleep(5000)
+ workersWaitingTime += 5000
}
}
workerInfos.keySet
@@ -278,5 +300,7 @@ trait MiniClusterFeature extends Logging {
workerInfos.clear()
masterInfo._2.interrupt()
MemoryManager.reset()
+
+ usedPorts.clear()
}
}
diff --git
a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/http/api/v1/ApiV1WorkerResourceSuite.scala
b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/http/api/v1/ApiV1WorkerResourceSuite.scala
index 87bc65481..81342ba68 100644
---
a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/http/api/v1/ApiV1WorkerResourceSuite.scala
+++
b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/http/api/v1/ApiV1WorkerResourceSuite.scala
@@ -32,7 +32,7 @@ class ApiV1WorkerResourceSuite extends ApiV1BaseResourceSuite
with MiniClusterFe
override def beforeAll(): Unit = {
logInfo("test initialized, setup celeborn mini cluster")
- val (m, w) =
+ val (_, w) =
setupMiniClusterWithRandomPorts(workerConf = celebornConf.getAll.toMap,
workerNum = 1)
worker = w.head
super.beforeAll()
diff --git
a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/ApiWorkerResourceAuthenticationSuite.scala
b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/ApiWorkerResourceAuthenticationSuite.scala
index 8f0a862e6..5e28441a5 100644
---
a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/ApiWorkerResourceAuthenticationSuite.scala
+++
b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/ApiWorkerResourceAuthenticationSuite.scala
@@ -17,27 +17,25 @@
package org.apache.celeborn.service.deploy.worker.storage
-import org.apache.celeborn.common.util.CelebornExitKind
import org.apache.celeborn.server.common.HttpService
import
org.apache.celeborn.server.common.http.ApiBaseResourceAuthenticationSuite
-import org.apache.celeborn.service.deploy.worker.{Worker, WorkerArguments}
+import org.apache.celeborn.service.deploy.MiniClusterFeature
+import org.apache.celeborn.service.deploy.worker.Worker
-class ApiWorkerResourceAuthenticationSuite extends
ApiBaseResourceAuthenticationSuite {
+class ApiWorkerResourceAuthenticationSuite extends
ApiBaseResourceAuthenticationSuite
+ with MiniClusterFeature {
private var worker: Worker = _
override protected def httpService: HttpService = worker
override def beforeAll(): Unit = {
- val workerArgs = new WorkerArguments(Array(), celebornConf)
- worker = new Worker(celebornConf, workerArgs)
- worker.metricsSystem.start()
- worker.startHttpServer()
+ val (_, w) =
+ setupMiniClusterWithRandomPorts(workerConf = celebornConf.getAll.toMap,
workerNum = 1)
+ worker = w.head
super.beforeAll()
}
override def afterAll(): Unit = {
super.afterAll()
- worker.metricsSystem.stop()
- worker.rpcEnv.shutdown()
- worker.stop(CelebornExitKind.EXIT_IMMEDIATELY)
+ shutdownMiniCluster()
}
}