This is an automated email from the ASF dual-hosted git repository.
zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 12d605223 [CELEBORN-1130] LifecycleManager#requestWorkerReserveSlots
should check null for endpoint
12d605223 is described below
commit 12d60522394607691ed066e2fd065db43a45a8e2
Author: zky.zhoukeyong <[email protected]>
AuthorDate: Wed Nov 15 22:12:38 2023 +0800
[CELEBORN-1130] LifecycleManager#requestWorkerReserveSlots should check
null for endpoint
### What changes were proposed in this pull request?
When a Worker process is killed with `kill -9`, the Master does not exclude
the worker until its heartbeat times out.
During this window, the Master still allocates slots on this Worker, which
causes a NullPointerException when registering a shuffle:
```
Caused by: java.lang.NullPointerException
at
org.apache.celeborn.client.LifecycleManager.requestWorkerReserveSlots(LifecycleManager.scala:1246)
~[celeborn-client-spark-3-shaded_2.12-0.4.0-SNAPSHOT.jar:?]
at
org.apache.celeborn.client.LifecycleManager.$anonfun$reserveSlots$2(LifecycleManager.scala:864)
~[celeborn-client-spark-3-shaded_2.12-0.4.0-SNAPSHOT.jar:?]
at
org.apache.celeborn.common.util.ThreadUtils$.$anonfun$parmap$2(ThreadUtils.scala:301)
~[celeborn-client-spark-3-shaded_2.12-0.4.0-SNAPSHOT.jar:?]
at scala.concurrent.Future$.$anonfun$apply$1(Future.scala:659)
~[scala-library-2.12.15.jar:?]
at scala.util.Success.$anonfun$map$1(Try.scala:255)
~[scala-library-2.12.15.jar:?]
at scala.util.Success.map(Try.scala:213)
~[scala-library-2.12.15.jar:?]
at scala.concurrent.Future.$anonfun$map$1(Future.scala:292)
~[scala-library-2.12.15.jar:?]
at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33)
~[scala-library-2.12.15.jar:?]
at
scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33)
~[scala-library-2.12.15.jar:?]
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
~[scala-library-2.12.15.jar:?]
at
java.util.concurrent.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1402)
~[?:1.8.0_372]
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
~[?:1.8.0_372]
at
java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
~[?:1.8.0_372]
at
java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
~[?:1.8.0_372]
at
java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)
~[?:1.8.0_372]
```
### Why are the changes needed?
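To prevent the NullPointerException shown above. When a Worker's endpoint is
null, reserving slots on that Worker should fail gracefully with a
REQUEST_FAILED response instead of throwing.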
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Tested manually; also passes the GA (GitHub Actions) CI checks.
Closes #2104 from waitinfuture/1130.
Authored-by: zky.zhoukeyong <[email protected]>
Signed-off-by: zky.zhoukeyong <[email protected]>
---
.../apache/celeborn/client/LifecycleManager.scala | 40 +++++++++++++---------
1 file changed, 24 insertions(+), 16 deletions(-)
diff --git
a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
index 9f0ca9da4..cf54dfcd8 100644
--- a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
+++ b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
@@ -21,6 +21,8 @@ import java.nio.ByteBuffer
import java.util
import java.util.{function, List => JList}
import java.util.concurrent.{Callable, ConcurrentHashMap, ScheduledFuture,
TimeUnit}
+import java.util.concurrent.atomic.AtomicInteger
+import java.util.function.Consumer
import scala.collection.JavaConverters._
import scala.collection.mutable
@@ -33,6 +35,7 @@ import
org.apache.celeborn.client.LifecycleManager.{ShuffleAllocatedWorkers, Shu
import org.apache.celeborn.client.listener.WorkerStatusListener
import org.apache.celeborn.common.CelebornConf
import org.apache.celeborn.common.client.MasterClient
+import org.apache.celeborn.common.exception.CelebornIOException
import org.apache.celeborn.common.identity.{IdentityProvider, UserIdentifier}
import org.apache.celeborn.common.internal.Logging
import org.apache.celeborn.common.meta.{ShufflePartitionLocationInfo,
WorkerInfo}
@@ -720,22 +723,27 @@ class LifecycleManager(val appUniqueId: String, val conf:
CelebornConf) extends
Math.min(Math.max(1, workerPartitionLocations.size),
conf.clientRpcMaxParallelism)
ThreadUtils.parmap(workerPartitionLocations, "ReserveSlot", parallelism) {
case (workerInfo, (primaryLocations, replicaLocations)) =>
- val res = requestWorkerReserveSlots(
- workerInfo.endpoint,
- ReserveSlots(
- appUniqueId,
- shuffleId,
- primaryLocations,
- replicaLocations,
- partitionSplitThreshold,
- partitionSplitMode,
- getPartitionType(shuffleId),
- rangeReadFilter,
- userIdentifier,
- conf.pushDataTimeoutMs,
- if (getPartitionType(shuffleId) == PartitionType.MAP)
- conf.clientShuffleMapPartitionSplitEnabled
- else true))
+ val res =
+ if (workerInfo.endpoint == null) {
+ ReserveSlotsResponse(StatusCode.REQUEST_FAILED, s"$workerInfo
endpoint is NULL!")
+ } else {
+ requestWorkerReserveSlots(
+ workerInfo.endpoint,
+ ReserveSlots(
+ appUniqueId,
+ shuffleId,
+ primaryLocations,
+ replicaLocations,
+ partitionSplitThreshold,
+ partitionSplitMode,
+ getPartitionType(shuffleId),
+ rangeReadFilter,
+ userIdentifier,
+ conf.pushDataTimeoutMs,
+ if (getPartitionType(shuffleId) == PartitionType.MAP)
+ conf.clientShuffleMapPartitionSplitEnabled
+ else true))
+ }
if (res.status.equals(StatusCode.SUCCESS)) {
logDebug(s"Successfully allocated " +
s"partitions buffer for shuffleId $shuffleId" +