This is an automated email from the ASF dual-hosted git repository.
rexxiong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new ceed216a3 [CELEBORN-1317][FOLLOWUP] Retry to setup mini cluster if the
cause is BindException
ceed216a3 is described below
commit ceed216a3976c97e8ffb87be02ef30781dfe00ca
Author: Fei Wang <[email protected]>
AuthorDate: Thu Mar 28 10:28:47 2024 +0800
[CELEBORN-1317][FOLLOWUP] Retry to setup mini cluster if the cause is
BindException
### What changes were proposed in this pull request?
Fix the unit test (UT) failures caused by the HTTP server port already being in use.
For the Jetty HttpServer, a failure to bind the port surfaces as an IOException
whose cause is a BindException, so we should retry in that case as well.
Before:
```
case e: BindException => // retry to setup mini cluster
```
Now:
```
case e: IOException
if e.isInstanceOf[BindException] || Option(e.getCause).exists(
_.isInstanceOf[BindException]) => // retry to setup mini cluster
```
### Why are the changes needed?
To fix the unit test (UT) failures caused by the HTTP server port already being in use.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Will trigger GA three times.
Closes #2424 from turboFei/set_connector_stop_timeout.
Authored-by: Fei Wang <[email protected]>
Signed-off-by: Shuang <[email protected]>
---
.../{ => http/api}/ApiMasterResourceSuite.scala | 3 +-
.../celeborn/server/common/http/HttpServer.scala | 33 +++++++++++++---------
.../service/deploy/MiniClusterFeature.scala | 5 +++-
.../api}/ApiWorkerResourceSuite.scala | 22 +++++++--------
4 files changed, 36 insertions(+), 27 deletions(-)
diff --git
a/master/src/test/scala/org/apache/celeborn/service/deploy/master/ApiMasterResourceSuite.scala
b/master/src/test/scala/org/apache/celeborn/service/deploy/master/http/api/ApiMasterResourceSuite.scala
similarity index 96%
rename from
master/src/test/scala/org/apache/celeborn/service/deploy/master/ApiMasterResourceSuite.scala
rename to
master/src/test/scala/org/apache/celeborn/service/deploy/master/http/api/ApiMasterResourceSuite.scala
index f5e06e6a5..5c272446f 100644
---
a/master/src/test/scala/org/apache/celeborn/service/deploy/master/ApiMasterResourceSuite.scala
+++
b/master/src/test/scala/org/apache/celeborn/service/deploy/master/http/api/ApiMasterResourceSuite.scala
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.celeborn.service.deploy.master
+package org.apache.celeborn.service.deploy.master.http.api
import javax.ws.rs.core.MediaType
@@ -25,6 +25,7 @@ import org.apache.celeborn.common.CelebornConf
import org.apache.celeborn.common.util.{CelebornExitKind, Utils}
import org.apache.celeborn.server.common.HttpService
import org.apache.celeborn.server.common.http.ApiBaseResourceSuite
+import org.apache.celeborn.service.deploy.master.{Master, MasterArguments}
class ApiMasterResourceSuite extends ApiBaseResourceSuite {
private var master: Master = _
diff --git
a/service/src/main/scala/org/apache/celeborn/server/common/http/HttpServer.scala
b/service/src/main/scala/org/apache/celeborn/server/common/http/HttpServer.scala
index 336132576..cbb61136e 100644
---
a/service/src/main/scala/org/apache/celeborn/server/common/http/HttpServer.scala
+++
b/service/src/main/scala/org/apache/celeborn/server/common/http/HttpServer.scala
@@ -44,27 +44,34 @@ private[celeborn] case class HttpServer(
isStarted = true
} catch {
case e: Exception =>
- stop(CelebornExitKind.EXIT_IMMEDIATELY)
+ stopInternal(CelebornExitKind.EXIT_IMMEDIATELY)
throw e
}
}
def stop(exitCode: Int): Unit = synchronized {
if (isStarted) {
- if (exitCode == CelebornExitKind.EXIT_IMMEDIATELY) {
- server.setStopTimeout(0)
- }
- logInfo(s"$role: Stopping HttpServer")
- server.stop()
- connector.stop()
- server.getThreadPool match {
- case lifeCycle: LifeCycle => lifeCycle.stop()
- case _ =>
- }
- logInfo(s"$role: HttpServer stopped.")
- isStarted = false
+ stopInternal(exitCode)
}
}
+
+ private def stopInternal(exitCode: Int): Unit = {
+ if (exitCode == CelebornExitKind.EXIT_IMMEDIATELY) {
+ server.setStopTimeout(0)
+ connector.setStopTimeout(0)
+ }
+ logInfo(s"$role: Stopping HttpServer")
+ server.stop()
+ server.join()
+ connector.stop()
+ server.getThreadPool match {
+ case lifeCycle: LifeCycle => lifeCycle.stop()
+ case _ =>
+ }
+ logInfo(s"$role: HttpServer stopped.")
+ isStarted = false
+ }
+
def getServerUri: String = connector.getHost + ":" + connector.getLocalPort
def addHandler(handler: Handler): Unit = synchronized {
diff --git
a/worker/src/test/scala/org/apache/celeborn/service/deploy/MiniClusterFeature.scala
b/worker/src/test/scala/org/apache/celeborn/service/deploy/MiniClusterFeature.scala
index beecb76a5..914512cca 100644
---
a/worker/src/test/scala/org/apache/celeborn/service/deploy/MiniClusterFeature.scala
+++
b/worker/src/test/scala/org/apache/celeborn/service/deploy/MiniClusterFeature.scala
@@ -17,6 +17,7 @@
package org.apache.celeborn.service.deploy
+import java.io.IOException
import java.net.BindException
import java.nio.file.Files
import java.util.concurrent.locks.{Lock, ReentrantLock}
@@ -70,7 +71,9 @@ trait MiniClusterFeature extends Logging {
workers = w
created = true
} catch {
- case e: BindException =>
+ case e: IOException
+ if e.isInstanceOf[BindException] || Option(e.getCause).exists(
+ _.isInstanceOf[BindException]) =>
logError(s"failed to setup mini cluster, retrying (retry count:
$retryCount)", e)
retryCount += 1
if (retryCount == 3) {
diff --git
a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/ApiWorkerResourceSuite.scala
b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/http/api/ApiWorkerResourceSuite.scala
similarity index 75%
rename from
worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/ApiWorkerResourceSuite.scala
rename to
worker/src/test/scala/org/apache/celeborn/service/deploy/worker/http/api/ApiWorkerResourceSuite.scala
index cc09764d0..d950d0674 100644
---
a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/ApiWorkerResourceSuite.scala
+++
b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/http/api/ApiWorkerResourceSuite.scala
@@ -15,33 +15,31 @@
* limitations under the License.
*/
-package org.apache.celeborn.service.deploy.worker.storage
+package org.apache.celeborn.service.deploy.worker.http.api
import javax.ws.rs.core.MediaType
-import org.apache.celeborn.common.CelebornConf
-import org.apache.celeborn.common.util.{CelebornExitKind, Utils}
import org.apache.celeborn.server.common.HttpService
import org.apache.celeborn.server.common.http.ApiBaseResourceSuite
-import org.apache.celeborn.service.deploy.worker.{Worker, WorkerArguments}
+import org.apache.celeborn.service.deploy.MiniClusterFeature
+import org.apache.celeborn.service.deploy.worker.Worker
-class ApiWorkerResourceSuite extends ApiBaseResourceSuite {
+class ApiWorkerResourceSuite extends ApiBaseResourceSuite with
MiniClusterFeature {
private var worker: Worker = _
override protected def httpService: HttpService = worker
override def beforeAll(): Unit = {
- val workerArgs = new WorkerArguments(Array(), celebornConf)
- worker = new Worker(celebornConf, workerArgs)
- worker.metricsSystem.start()
- worker.startHttpServer()
+ logInfo("test initialized, setup celeborn mini cluster")
+ val (_, w) =
+ setupMiniClusterWithRandomPorts(workerConf = celebornConf.getAll.toMap,
workerNum = 1)
+ worker = w.head
super.beforeAll()
}
override def afterAll(): Unit = {
super.afterAll()
- worker.metricsSystem.stop()
- worker.rpcEnv.shutdown()
- worker.stop(CelebornExitKind.EXIT_IMMEDIATELY)
+ logInfo("all test complete, stop celeborn mini cluster")
+ shutdownMiniCluster()
}
test("listPartitionLocationInfo") {