This is an automated email from the ASF dual-hosted git repository.

zhouky pushed a commit to branch branch-0.3
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/branch-0.3 by this push:
     new eafa56aff [CELEBORN-674] Support revive for empty locations
eafa56aff is described below

commit eafa56affd7469e8941567040df3aa8bbc628b3d
Author: Ethan Feng <[email protected]>
AuthorDate: Tue Jun 13 20:35:04 2023 +0800

    [CELEBORN-674] Support revive for empty locations
    
    ### What changes were proposed in this pull request?
    If some tasks are retried or scheduled by speculation, the executors will 
keep failing because revive does not support the case where the old partition is empty.
    Celeborn triggers stage end once all mapper tasks have called mapper end, 
which is not the same moment at which Spark considers a stage finished. So if Spark 
executors are killed at this point, Spark will rerun the tasks of the current stage. 
The shuffle client will then need to register the shuffle again, but it receives empty 
partition locations, so it must revive with those empty locations to obtain the 
latest locations.
    
    Here is an example of the logs:
    
    ```
    23/06/13 08:32:11 ERROR ShuffleClientImpl: Exception raised while reviving 
for shuffle 12 map 970 attempt 27 partition 152 epoch -1.
    java.lang.NullPointerException
            at 
org.apache.celeborn.common.util.PbSerDeUtils$.toPbPartitionLocation(PbSerDeUtils.scala:258)
            at 
org.apache.celeborn.common.protocol.message.ControlMessages$Revive$.apply(ControlMessages.scala:207)
            at 
org.apache.celeborn.client.ShuffleClientImpl.revive(ShuffleClientImpl.java:573)
            at 
org.apache.celeborn.client.ShuffleClientImpl.pushOrMergeData(ShuffleClientImpl.java:656)
            at 
org.apache.celeborn.client.ShuffleClientImpl.pushData(ShuffleClientImpl.java:984)
            at 
org.apache.celeborn.client.write.DataPusher.pushData(DataPusher.java:197)
            at 
org.apache.celeborn.client.write.DataPusher.access$500(DataPusher.java:38)
            at 
org.apache.celeborn.client.write.DataPusher$1.run(DataPusher.java:123)
    23/06/13 08:32:11 ERROR Executor: Exception in task 21.27 in stage 54.1 
(TID 17255)
    org.apache.celeborn.common.exception.CelebornIOException: Revive for 
shuffle spark-3df9647407f14c39868a17b7950899c5-12 partition 152 failed.
            at 
org.apache.celeborn.client.ShuffleClientImpl.pushOrMergeData(ShuffleClientImpl.java:666)
            at 
org.apache.celeborn.client.ShuffleClientImpl.pushData(ShuffleClientImpl.java:984)
            at 
org.apache.celeborn.client.write.DataPusher.pushData(DataPusher.java:197)
            at 
org.apache.celeborn.client.write.DataPusher.access$500(DataPusher.java:38)
            at 
org.apache.celeborn.client.write.DataPusher$1.run(DataPusher.java:123)
    23/06/13 08:32:11 INFO CoarseGrainedExecutorBackend: Got assigned task 17309
    ```
    
    ### Why are the changes needed?
    To make the shuffle client able to revive with empty locations.
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    K8S cluster.
    
    Closes #1586 from FMX/CELEBORN-674.
    
    Authored-by: Ethan Feng <[email protected]>
    Signed-off-by: zky.zhoukeyong <[email protected]>
    (cherry picked from commit 1fca5da553639598bdaec70a13ac0e0727af2239)
    Signed-off-by: zky.zhoukeyong <[email protected]>
---
 .../java/org/apache/celeborn/client/ShuffleClientImpl.java  |  2 +-
 .../scala/org/apache/celeborn/client/CommitManager.scala    |  2 ++
 .../scala/org/apache/celeborn/client/LifecycleManager.scala |  9 +++++++--
 .../celeborn/common/protocol/message/ControlMessages.scala  | 13 ++++++++-----
 4 files changed, 18 insertions(+), 8 deletions(-)

diff --git 
a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java 
b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
index 587aa7746..611e1dd24 100644
--- a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
+++ b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
@@ -531,7 +531,7 @@ public class ShuffleClientImpl extends ShuffleClient {
       PartitionLocation oldLocation,
       StatusCode cause) {
     // Add ShuffleClient side blacklist
-    if (shuffleClientPushBlacklistEnabled) {
+    if (shuffleClientPushBlacklistEnabled && oldLocation != null) {
       if (cause == StatusCode.PUSH_DATA_CREATE_CONNECTION_FAIL_MASTER) {
         blacklist.add(oldLocation.hostAndPushPort());
       } else if (cause == StatusCode.PUSH_DATA_CONNECTION_EXCEPTION_MASTER) {
diff --git 
a/client/src/main/scala/org/apache/celeborn/client/CommitManager.scala 
b/client/src/main/scala/org/apache/celeborn/client/CommitManager.scala
index 40d1dc70f..0deda78c7 100644
--- a/client/src/main/scala/org/apache/celeborn/client/CommitManager.scala
+++ b/client/src/main/scala/org/apache/celeborn/client/CommitManager.scala
@@ -231,6 +231,8 @@ class CommitManager(appId: String, val conf: CelebornConf, 
lifecycleManager: Lif
       shuffleId: Int,
       partitionLocation: PartitionLocation,
       cause: Option[StatusCode]): Unit = {
+    // If a partition location is null, then the cause will be 
PUSH_DATA_FAIL_NON_CRITICAL_CAUSE.
+    // So here is no need to check partition location is null or not.
     if (batchHandleCommitPartitionEnabled && cause.isDefined && cause.get == 
StatusCode.HARD_SPLIT) {
       val shuffleCommittedInfo = committedPartitionInfo.get(shuffleId)
       shuffleCommittedInfo.synchronized {
diff --git 
a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala 
b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
index 792692607..aebf84fa0 100644
--- a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
+++ b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
@@ -244,10 +244,15 @@ class LifecycleManager(appId: String, val conf: 
CelebornConf) extends RpcEndpoin
       val attemptId = pb.getAttemptId
       val partitionId = pb.getPartitionId
       val epoch = pb.getEpoch
-      val oldPartition = 
PbSerDeUtils.fromPbPartitionLocation(pb.getOldPartition)
+      val oldPartition =
+        if (pb.hasOldPartition) {
+          PbSerDeUtils.fromPbPartitionLocation(pb.getOldPartition)
+        } else {
+          null
+        }
       val cause = Utils.toStatusCode(pb.getStatus)
       logTrace(s"Received Revive request, " +
-        s"$applicationId, $shuffleId, $mapId, $attemptId, ,$partitionId," +
+        s"$applicationId, $shuffleId, $mapId, $attemptId ,$partitionId," +
         s" $epoch, $oldPartition, $cause.")
       handleRevive(
         context,
diff --git 
a/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
 
b/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
index 8009bc9fb..438369a42 100644
--- 
a/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
+++ 
b/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
@@ -196,17 +196,20 @@ object ControlMessages extends Logging {
         partitionId: Int,
         epoch: Int,
         oldPartition: PartitionLocation,
-        cause: StatusCode): PbRevive =
-      PbRevive.newBuilder()
-        .setApplicationId(appId)
+        cause: StatusCode): PbRevive = {
+      val builder = PbRevive.newBuilder()
+      builder.setApplicationId(appId)
         .setShuffleId(shuffleId)
         .setMapId(mapId)
         .setAttemptId(attemptId)
         .setPartitionId(partitionId)
         .setEpoch(epoch)
-        .setOldPartition(PbSerDeUtils.toPbPartitionLocation(oldPartition))
         .setStatus(cause.getValue)
-        .build()
+      if (oldPartition != null) {
+        
builder.setOldPartition(PbSerDeUtils.toPbPartitionLocation(oldPartition))
+      }
+      builder.build()
+    }
   }
 
   object PartitionSplit {

Reply via email to