This is an automated email from the ASF dual-hosted git repository.

zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new b4bbe4b15 [CELEBORN-1171] Add UT for LifecycleManager's async setup 
endpoints
b4bbe4b15 is described below

commit b4bbe4b15182882b4172b239bc22afba7babe9b4
Author: zky.zhoukeyong <[email protected]>
AuthorDate: Fri Dec 15 11:00:13 2023 +0800

    [CELEBORN-1171] Add UT for LifecycleManager's async setup endpoints
    
    ### What changes were proposed in this pull request?
    as title
    
    ### Why are the changes needed?
    as title
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Passes GA
    
    Closes #2159 from waitinfuture/1171.
    
    Authored-by: zky.zhoukeyong <[email protected]>
    Signed-off-by: zky.zhoukeyong <[email protected]>
---
 .../apache/celeborn/client/LifecycleManager.scala  | 97 ++++++++++++----------
 .../LifecycleManagerSetupEndpointSuite.scala       | 89 ++++++++++++++++++++
 ...nagerSuit.scala => LifecycleManagerSuite.scala} |  2 +-
 3 files changed, 142 insertions(+), 46 deletions(-)

diff --git 
a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala 
b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
index 6f8565cc6..023e06642 100644
--- a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
+++ b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
@@ -343,6 +343,57 @@ class LifecycleManager(val appUniqueId: String, val conf: 
CelebornConf) extends
       handleReportShuffleFetchFailure(context, appShuffleId, shuffleId)
   }
 
+  def setupEndpoints(
+      slots: WorkerResource,
+      shuffleId: Int,
+      connectFailedWorkers: ShuffleFailedWorkers): Unit = {
+    val futures = new util.LinkedList[(Future[RpcEndpointRef], WorkerInfo)]()
+    slots.asScala foreach { case (workerInfo, _) =>
+      val future = rpcEnv.asyncSetupEndpointRefByAddr(RpcEndpointAddress(
+        RpcAddress.apply(workerInfo.host, workerInfo.rpcPort),
+        WORKER_EP))
+      futures.add((future, workerInfo))
+    }
+
+    var timeout = conf.rpcAskTimeout.duration.toMillis
+    val delta = 50
+    while (timeout > 0 && !futures.isEmpty) {
+      val iter = futures.iterator
+      while (iter.hasNext) {
+        val (future, workerInfo) = iter.next()
+        if (future.isCompleted) {
+          future.value.get match {
+            case scala.util.Success(endpointRef) =>
+              workerInfo.endpoint = endpointRef
+            case scala.util.Failure(e) =>
+              logError(
+                s"Init rpc client failed for $shuffleId on $workerInfo during 
reserve slots.",
+                e)
+              connectFailedWorkers.put(
+                workerInfo,
+                (StatusCode.WORKER_UNKNOWN, System.currentTimeMillis()))
+          }
+          iter.remove()
+        }
+      }
+
+      if (!futures.isEmpty) {
+        Thread.sleep(delta)
+        timeout -= delta
+      }
+    }
+    if (!futures.isEmpty) {
+      val iter = futures.iterator()
+      while (iter.hasNext) {
+        val (_, workerInfo) = iter.next()
+        logError(s"Init rpc client failed for $shuffleId on $workerInfo during 
reserve slots, reason: Timeout.")
+        connectFailedWorkers.put(
+          workerInfo,
+          (StatusCode.WORKER_UNKNOWN, System.currentTimeMillis()))
+      }
+    }
+  }
+
   private def offerAndReserveSlots(
       context: RegisterCallContext,
       shuffleId: Int,
@@ -518,51 +569,7 @@ class LifecycleManager(val appUniqueId: String, val conf: 
CelebornConf) extends
     val connectFailedWorkers = new ShuffleFailedWorkers()
 
     // Second, for each worker, try to initialize the endpoint.
-    val futures = new util.LinkedList[(Future[RpcEndpointRef], WorkerInfo)]()
-    slots.asScala foreach { case (workerInfo, _) =>
-      val future = rpcEnv.asyncSetupEndpointRefByAddr(RpcEndpointAddress(
-        RpcAddress.apply(workerInfo.host, workerInfo.rpcPort),
-        WORKER_EP))
-      futures.add((future, workerInfo))
-    }
-
-    var timeout = conf.rpcAskTimeout.duration.toMillis
-    val delta = 50
-    while (timeout > 0 && !futures.isEmpty) {
-      val iter = futures.iterator
-      while (iter.hasNext) {
-        val (future, workerInfo) = iter.next()
-        if (future.isCompleted) {
-          future.value.get match {
-            case scala.util.Success(endpointRef) =>
-              workerInfo.endpoint = endpointRef
-            case scala.util.Failure(e) =>
-              logError(
-                s"Init rpc client failed for $shuffleId on $workerInfo during 
reserve slots.",
-                e)
-              connectFailedWorkers.put(
-                workerInfo,
-                (StatusCode.WORKER_UNKNOWN, System.currentTimeMillis()))
-          }
-          iter.remove()
-        }
-      }
-
-      if (!futures.isEmpty) {
-        Thread.sleep(delta)
-        timeout -= delta
-      }
-    }
-    if (!futures.isEmpty) {
-      val iter = futures.iterator()
-      while (iter.hasNext) {
-        val (_, workerInfo) = iter.next()
-        logError(s"Init rpc client failed for $shuffleId on $workerInfo during 
reserve slots, reason: Timeout.")
-        connectFailedWorkers.put(
-          workerInfo,
-          (StatusCode.WORKER_UNKNOWN, System.currentTimeMillis()))
-      }
-    }
+    setupEndpoints(slots, shuffleId, connectFailedWorkers)
 
     
candidatesWorkers.removeAll(connectFailedWorkers.asScala.keys.toList.asJava)
     workerStatusTracker.recordWorkerFailure(connectFailedWorkers)
diff --git 
a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/client/LifecycleManagerSetupEndpointSuite.scala
 
b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/client/LifecycleManagerSetupEndpointSuite.scala
new file mode 100644
index 000000000..3c0a4ba22
--- /dev/null
+++ 
b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/client/LifecycleManagerSetupEndpointSuite.scala
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.tests.client
+
+import java.util
+
+import scala.collection.JavaConverters._
+
+import org.apache.celeborn.client.{LifecycleManager, WithShuffleClientSuite}
+import org.apache.celeborn.client.LifecycleManager.ShuffleFailedWorkers
+import org.apache.celeborn.common.CelebornConf
+import org.apache.celeborn.common.protocol.message.StatusCode
+import org.apache.celeborn.common.util.CelebornExitKind
+import org.apache.celeborn.service.deploy.MiniClusterFeature
+
+class LifecycleManagerSetupEndpointSuite extends WithShuffleClientSuite with 
MiniClusterFeature {
+  private val masterPort = 19097
+
+  celebornConf.set(CelebornConf.MASTER_ENDPOINTS.key, s"localhost:$masterPort")
+    .set(CelebornConf.CLIENT_PUSH_REPLICATE_ENABLED.key, "true")
+    .set(CelebornConf.CLIENT_PUSH_BUFFER_MAX_SIZE.key, "256K")
+
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+    val masterConf = Map(
+      "celeborn.master.host" -> "localhost",
+      "celeborn.master.port" -> masterPort.toString)
+    val workerConf = Map(
+      "celeborn.master.endpoints" -> s"localhost:$masterPort")
+    setUpMiniCluster(masterConf, workerConf)
+  }
+
+  test("test setup endpoints with all workers good") {
+    val lifecycleManager: LifecycleManager = new LifecycleManager(APP, 
celebornConf)
+    val ids = new util.ArrayList[Integer](100)
+    0 until 100 foreach { ids.add(_) }
+    val res = lifecycleManager.requestMasterRequestSlotsWithRetry(0, ids)
+    assert(res.status == StatusCode.SUCCESS)
+    assert(res.workerResource.keySet().size() == 3)
+
+    val connectFailedWorkers = new ShuffleFailedWorkers()
+    lifecycleManager.setupEndpoints(res.workerResource, 0, 
connectFailedWorkers)
+    assert(connectFailedWorkers.isEmpty)
+
+    lifecycleManager.stop()
+  }
+
+  test("test setup endpoints with one worker down") {
+    val lifecycleManager: LifecycleManager = new LifecycleManager(APP, 
celebornConf)
+    val ids = new util.ArrayList[Integer](100)
+    0 until 100 foreach {
+      ids.add(_)
+    }
+    val res = lifecycleManager.requestMasterRequestSlotsWithRetry(0, ids)
+    assert(res.status == StatusCode.SUCCESS)
+    assert(res.workerResource.keySet().size() == 3)
+
+    val firstWorker = workerInfos.keySet.head
+    firstWorker.stop(CelebornExitKind.EXIT_IMMEDIATELY)
+    firstWorker.rpcEnv.shutdown()
+
+    val connectFailedWorkers = new ShuffleFailedWorkers()
+    lifecycleManager.setupEndpoints(res.workerResource, 0, 
connectFailedWorkers)
+    assert(connectFailedWorkers.size() == 1)
+    assert(connectFailedWorkers.keySet().asScala.head == 
firstWorker.workerInfo)
+
+    lifecycleManager.stop()
+  }
+
+  override def afterAll(): Unit = {
+    logInfo("all test complete , stop celeborn mini cluster")
+    shutdownMiniCluster()
+  }
+}
diff --git 
a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/client/LifecycleManagerSuit.scala
 
b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/client/LifecycleManagerSuite.scala
similarity index 98%
rename from 
tests/spark-it/src/test/scala/org/apache/celeborn/tests/client/LifecycleManagerSuit.scala
rename to 
tests/spark-it/src/test/scala/org/apache/celeborn/tests/client/LifecycleManagerSuite.scala
index ac99d12fd..d310011c8 100644
--- 
a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/client/LifecycleManagerSuit.scala
+++ 
b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/client/LifecycleManagerSuite.scala
@@ -24,7 +24,7 @@ import org.apache.celeborn.common.CelebornConf
 import org.apache.celeborn.common.protocol.message.StatusCode
 import org.apache.celeborn.service.deploy.MiniClusterFeature
 
-class LifecycleManagerSuit extends WithShuffleClientSuite with 
MiniClusterFeature {
+class LifecycleManagerSuite extends WithShuffleClientSuite with 
MiniClusterFeature {
   private val masterPort = 19097
 
   celebornConf.set(CelebornConf.MASTER_ENDPOINTS.key, s"localhost:$masterPort")

Reply via email to the mailing list.