This is an automated email from the ASF dual-hosted git repository.

zhouky pushed a commit to branch branch-0.3
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/branch-0.3 by this push:
     new b520176e3 [CELEBORN-1130] LifecycleManager#requestWorkerReserveSlots 
should check null for endpoint
b520176e3 is described below

commit b520176e3d67f73308a3edc5250bb117a857c901
Author: zky.zhoukeyong <[email protected]>
AuthorDate: Wed Nov 15 22:12:38 2023 +0800

    [CELEBORN-1130] LifecycleManager#requestWorkerReserveSlots should check 
null for endpoint
    
    ### What changes were proposed in this pull request?
    When a Worker process is killed with `kill -9`, the Master does not exclude
    that Worker until its heartbeat times out. During this window the Master
    still allocates slots on the dead Worker, which causes the following NPE
    when a shuffle is registered:
    ```
    Caused by: java.lang.NullPointerException
            at 
org.apache.celeborn.client.LifecycleManager.requestWorkerReserveSlots(LifecycleManager.scala:1246)
 ~[celeborn-client-spark-3-shaded_2.12-0.4.0-SNAPSHOT.jar:?]
            at 
org.apache.celeborn.client.LifecycleManager.$anonfun$reserveSlots$2(LifecycleManager.scala:864)
 ~[celeborn-client-spark-3-shaded_2.12-0.4.0-SNAPSHOT.jar:?]
            at 
org.apache.celeborn.common.util.ThreadUtils$.$anonfun$parmap$2(ThreadUtils.scala:301)
 ~[celeborn-client-spark-3-shaded_2.12-0.4.0-SNAPSHOT.jar:?]
            at scala.concurrent.Future$.$anonfun$apply$1(Future.scala:659) 
~[scala-library-2.12.15.jar:?]
            at scala.util.Success.$anonfun$map$1(Try.scala:255) 
~[scala-library-2.12.15.jar:?]
            at scala.util.Success.map(Try.scala:213) 
~[scala-library-2.12.15.jar:?]
            at scala.concurrent.Future.$anonfun$map$1(Future.scala:292) 
~[scala-library-2.12.15.jar:?]
            at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33) 
~[scala-library-2.12.15.jar:?]
            at 
scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33) 
~[scala-library-2.12.15.jar:?]
            at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64) 
~[scala-library-2.12.15.jar:?]
            at 
java.util.concurrent.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1402)
 ~[?:1.8.0_372]
            at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) 
~[?:1.8.0_372]
            at 
java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) 
~[?:1.8.0_372]
            at 
java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) 
~[?:1.8.0_372]
            at 
java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175) 
~[?:1.8.0_372]
    ```
    
    ### Why are the changes needed?
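    To avoid the NPE described above: when a worker's endpoint is null, the
    reserve-slots step now returns a ReserveSlotsResponse with
    StatusCode.REQUEST_FAILED instead of calling requestWorkerReserveSlots
    with a null endpoint.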
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Tested manually; GA checks pass.
    
    Closes #2104 from waitinfuture/1130.
    
    Authored-by: zky.zhoukeyong <[email protected]>
    Signed-off-by: zky.zhoukeyong <[email protected]>
    (cherry picked from commit 12d60522394607691ed066e2fd065db43a45a8e2)
    Signed-off-by: zky.zhoukeyong <[email protected]>
---
 .../apache/celeborn/client/LifecycleManager.scala  | 40 +++++++++++++---------
 1 file changed, 24 insertions(+), 16 deletions(-)

diff --git 
a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala 
b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
index 9f0ca9da4..cf54dfcd8 100644
--- a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
+++ b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
@@ -21,6 +21,8 @@ import java.nio.ByteBuffer
 import java.util
 import java.util.{function, List => JList}
 import java.util.concurrent.{Callable, ConcurrentHashMap, ScheduledFuture, 
TimeUnit}
+import java.util.concurrent.atomic.AtomicInteger
+import java.util.function.Consumer
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable
@@ -33,6 +35,7 @@ import 
org.apache.celeborn.client.LifecycleManager.{ShuffleAllocatedWorkers, Shu
 import org.apache.celeborn.client.listener.WorkerStatusListener
 import org.apache.celeborn.common.CelebornConf
 import org.apache.celeborn.common.client.MasterClient
+import org.apache.celeborn.common.exception.CelebornIOException
 import org.apache.celeborn.common.identity.{IdentityProvider, UserIdentifier}
 import org.apache.celeborn.common.internal.Logging
 import org.apache.celeborn.common.meta.{ShufflePartitionLocationInfo, 
WorkerInfo}
@@ -720,22 +723,27 @@ class LifecycleManager(val appUniqueId: String, val conf: 
CelebornConf) extends
       Math.min(Math.max(1, workerPartitionLocations.size), 
conf.clientRpcMaxParallelism)
     ThreadUtils.parmap(workerPartitionLocations, "ReserveSlot", parallelism) {
       case (workerInfo, (primaryLocations, replicaLocations)) =>
-        val res = requestWorkerReserveSlots(
-          workerInfo.endpoint,
-          ReserveSlots(
-            appUniqueId,
-            shuffleId,
-            primaryLocations,
-            replicaLocations,
-            partitionSplitThreshold,
-            partitionSplitMode,
-            getPartitionType(shuffleId),
-            rangeReadFilter,
-            userIdentifier,
-            conf.pushDataTimeoutMs,
-            if (getPartitionType(shuffleId) == PartitionType.MAP)
-              conf.clientShuffleMapPartitionSplitEnabled
-            else true))
+        val res =
+          if (workerInfo.endpoint == null) {
+            ReserveSlotsResponse(StatusCode.REQUEST_FAILED, s"$workerInfo 
endpoint is NULL!")
+          } else {
+            requestWorkerReserveSlots(
+              workerInfo.endpoint,
+              ReserveSlots(
+                appUniqueId,
+                shuffleId,
+                primaryLocations,
+                replicaLocations,
+                partitionSplitThreshold,
+                partitionSplitMode,
+                getPartitionType(shuffleId),
+                rangeReadFilter,
+                userIdentifier,
+                conf.pushDataTimeoutMs,
+                if (getPartitionType(shuffleId) == PartitionType.MAP)
+                  conf.clientShuffleMapPartitionSplitEnabled
+                else true))
+          }
         if (res.status.equals(StatusCode.SUCCESS)) {
           logDebug(s"Successfully allocated " +
             s"partitions buffer for shuffleId $shuffleId" +

Reply via email to