This is an automated email from the ASF dual-hosted git repository.

rexxiong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new ceed216a3 [CELEBORN-1317][FOLLOWUP] Retry to setup mini cluster if the cause is BindException
ceed216a3 is described below

commit ceed216a3976c97e8ffb87be02ef30781dfe00ca
Author: Fei Wang <[email protected]>
AuthorDate: Thu Mar 28 10:28:47 2024 +0800

    [CELEBORN-1317][FOLLOWUP] Retry to setup mini cluster if the cause is BindException
    
    ### What changes were proposed in this pull request?
    Fix unit test (UT) failures caused by the HTTP server port already being in use.
    
    For the Jetty-based HttpServer, when binding the port fails, the thrown exception is an IOException whose cause is a BindException, so we should retry in that case as well.
    
    Before:
    ```
        case e: BindException => // retry to setup mini cluster
    ```
    
    Now:
    ```
        case e: IOException
             if e.isInstanceOf[BindException] || Option(e.getCause).exists(
               _.isInstanceOf[BindException]) =>  // retry to setup mini cluster
    ```
    
    ### Why are the changes needed?
    
    Fix unit test (UT) failures caused by the HTTP server port already being in use.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Will trigger GA three times.
    
    Closes #2424 from turboFei/set_connector_stop_timeout.
    
    Authored-by: Fei Wang <[email protected]>
    Signed-off-by: Shuang <[email protected]>
---
 .../{ => http/api}/ApiMasterResourceSuite.scala    |  3 +-
 .../celeborn/server/common/http/HttpServer.scala   | 33 +++++++++++++---------
 .../service/deploy/MiniClusterFeature.scala        |  5 +++-
 .../api}/ApiWorkerResourceSuite.scala              | 22 +++++++--------
 4 files changed, 36 insertions(+), 27 deletions(-)

diff --git 
a/master/src/test/scala/org/apache/celeborn/service/deploy/master/ApiMasterResourceSuite.scala
 
b/master/src/test/scala/org/apache/celeborn/service/deploy/master/http/api/ApiMasterResourceSuite.scala
similarity index 96%
rename from 
master/src/test/scala/org/apache/celeborn/service/deploy/master/ApiMasterResourceSuite.scala
rename to 
master/src/test/scala/org/apache/celeborn/service/deploy/master/http/api/ApiMasterResourceSuite.scala
index f5e06e6a5..5c272446f 100644
--- 
a/master/src/test/scala/org/apache/celeborn/service/deploy/master/ApiMasterResourceSuite.scala
+++ 
b/master/src/test/scala/org/apache/celeborn/service/deploy/master/http/api/ApiMasterResourceSuite.scala
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.celeborn.service.deploy.master
+package org.apache.celeborn.service.deploy.master.http.api
 
 import javax.ws.rs.core.MediaType
 
@@ -25,6 +25,7 @@ import org.apache.celeborn.common.CelebornConf
 import org.apache.celeborn.common.util.{CelebornExitKind, Utils}
 import org.apache.celeborn.server.common.HttpService
 import org.apache.celeborn.server.common.http.ApiBaseResourceSuite
+import org.apache.celeborn.service.deploy.master.{Master, MasterArguments}
 
 class ApiMasterResourceSuite extends ApiBaseResourceSuite {
   private var master: Master = _
diff --git 
a/service/src/main/scala/org/apache/celeborn/server/common/http/HttpServer.scala
 
b/service/src/main/scala/org/apache/celeborn/server/common/http/HttpServer.scala
index 336132576..cbb61136e 100644
--- 
a/service/src/main/scala/org/apache/celeborn/server/common/http/HttpServer.scala
+++ 
b/service/src/main/scala/org/apache/celeborn/server/common/http/HttpServer.scala
@@ -44,27 +44,34 @@ private[celeborn] case class HttpServer(
       isStarted = true
     } catch {
       case e: Exception =>
-        stop(CelebornExitKind.EXIT_IMMEDIATELY)
+        stopInternal(CelebornExitKind.EXIT_IMMEDIATELY)
         throw e
     }
   }
 
   def stop(exitCode: Int): Unit = synchronized {
     if (isStarted) {
-      if (exitCode == CelebornExitKind.EXIT_IMMEDIATELY) {
-        server.setStopTimeout(0)
-      }
-      logInfo(s"$role: Stopping HttpServer")
-      server.stop()
-      connector.stop()
-      server.getThreadPool match {
-        case lifeCycle: LifeCycle => lifeCycle.stop()
-        case _ =>
-      }
-      logInfo(s"$role: HttpServer stopped.")
-      isStarted = false
+      stopInternal(exitCode)
     }
   }
+
+  private def stopInternal(exitCode: Int): Unit = {
+    if (exitCode == CelebornExitKind.EXIT_IMMEDIATELY) {
+      server.setStopTimeout(0)
+      connector.setStopTimeout(0)
+    }
+    logInfo(s"$role: Stopping HttpServer")
+    server.stop()
+    server.join()
+    connector.stop()
+    server.getThreadPool match {
+      case lifeCycle: LifeCycle => lifeCycle.stop()
+      case _ =>
+    }
+    logInfo(s"$role: HttpServer stopped.")
+    isStarted = false
+  }
+
   def getServerUri: String = connector.getHost + ":" + connector.getLocalPort
 
   def addHandler(handler: Handler): Unit = synchronized {
diff --git 
a/worker/src/test/scala/org/apache/celeborn/service/deploy/MiniClusterFeature.scala
 
b/worker/src/test/scala/org/apache/celeborn/service/deploy/MiniClusterFeature.scala
index beecb76a5..914512cca 100644
--- 
a/worker/src/test/scala/org/apache/celeborn/service/deploy/MiniClusterFeature.scala
+++ 
b/worker/src/test/scala/org/apache/celeborn/service/deploy/MiniClusterFeature.scala
@@ -17,6 +17,7 @@
 
 package org.apache.celeborn.service.deploy
 
+import java.io.IOException
 import java.net.BindException
 import java.nio.file.Files
 import java.util.concurrent.locks.{Lock, ReentrantLock}
@@ -70,7 +71,9 @@ trait MiniClusterFeature extends Logging {
         workers = w
         created = true
       } catch {
-        case e: BindException =>
+        case e: IOException
+            if e.isInstanceOf[BindException] || Option(e.getCause).exists(
+              _.isInstanceOf[BindException]) =>
           logError(s"failed to setup mini cluster, retrying (retry count: 
$retryCount)", e)
           retryCount += 1
           if (retryCount == 3) {
diff --git 
a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/ApiWorkerResourceSuite.scala
 
b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/http/api/ApiWorkerResourceSuite.scala
similarity index 75%
rename from 
worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/ApiWorkerResourceSuite.scala
rename to 
worker/src/test/scala/org/apache/celeborn/service/deploy/worker/http/api/ApiWorkerResourceSuite.scala
index cc09764d0..d950d0674 100644
--- 
a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/ApiWorkerResourceSuite.scala
+++ 
b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/http/api/ApiWorkerResourceSuite.scala
@@ -15,33 +15,31 @@
  * limitations under the License.
  */
 
-package org.apache.celeborn.service.deploy.worker.storage
+package org.apache.celeborn.service.deploy.worker.http.api
 
 import javax.ws.rs.core.MediaType
 
-import org.apache.celeborn.common.CelebornConf
-import org.apache.celeborn.common.util.{CelebornExitKind, Utils}
 import org.apache.celeborn.server.common.HttpService
 import org.apache.celeborn.server.common.http.ApiBaseResourceSuite
-import org.apache.celeborn.service.deploy.worker.{Worker, WorkerArguments}
+import org.apache.celeborn.service.deploy.MiniClusterFeature
+import org.apache.celeborn.service.deploy.worker.Worker
 
-class ApiWorkerResourceSuite extends ApiBaseResourceSuite {
+class ApiWorkerResourceSuite extends ApiBaseResourceSuite with 
MiniClusterFeature {
   private var worker: Worker = _
   override protected def httpService: HttpService = worker
 
   override def beforeAll(): Unit = {
-    val workerArgs = new WorkerArguments(Array(), celebornConf)
-    worker = new Worker(celebornConf, workerArgs)
-    worker.metricsSystem.start()
-    worker.startHttpServer()
+    logInfo("test initialized, setup celeborn mini cluster")
+    val (_, w) =
+      setupMiniClusterWithRandomPorts(workerConf = celebornConf.getAll.toMap, 
workerNum = 1)
+    worker = w.head
     super.beforeAll()
   }
 
   override def afterAll(): Unit = {
     super.afterAll()
-    worker.metricsSystem.stop()
-    worker.rpcEnv.shutdown()
-    worker.stop(CelebornExitKind.EXIT_IMMEDIATELY)
+    logInfo("all test complete, stop celeborn mini cluster")
+    shutdownMiniCluster()
   }
 
   test("listPartitionLocationInfo") {

Reply via email to