This is an automated email from the ASF dual-hosted git repository.
zhouky pushed a commit to branch branch-0.3
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/branch-0.3 by this push:
new eafa56aff [CELEBORN-674] Support revive for empty locations
eafa56aff is described below
commit eafa56affd7469e8941567040df3aa8bbc628b3d
Author: Ethan Feng <[email protected]>
AuthorDate: Tue Jun 13 20:35:04 2023 +0800
[CELEBORN-674] Support revive for empty locations
### What changes were proposed in this pull request?
If some tasks are retried or scheduled by speculation, the executors will keep
failing because revive does not support the case where the old partition is empty.
Celeborn will trigger stage end once all mapper tasks have called mapper end, but
this is not the point at which Spark considers the stage to have ended. So at this
moment, killing Spark executors will cause the Spark tasks to rerun the current
stage. The shuffle client will need to register the shuffle first but will get empty
partition locations, and it will then need to revive with empty locations to obtain
the latest locations.
Here is an example of the logs:
```
23/06/13 08:32:11 ERROR ShuffleClientImpl: Exception raised while reviving
for shuffle 12 map 970 attempt 27 partition 152 epoch -1.
java.lang.NullPointerException
at
org.apache.celeborn.common.util.PbSerDeUtils$.toPbPartitionLocation(PbSerDeUtils.scala:258)
at
org.apache.celeborn.common.protocol.message.ControlMessages$Revive$.apply(ControlMessages.scala:207)
at
org.apache.celeborn.client.ShuffleClientImpl.revive(ShuffleClientImpl.java:573)
at
org.apache.celeborn.client.ShuffleClientImpl.pushOrMergeData(ShuffleClientImpl.java:656)
at
org.apache.celeborn.client.ShuffleClientImpl.pushData(ShuffleClientImpl.java:984)
at
org.apache.celeborn.client.write.DataPusher.pushData(DataPusher.java:197)
at
org.apache.celeborn.client.write.DataPusher.access$500(DataPusher.java:38)
at
org.apache.celeborn.client.write.DataPusher$1.run(DataPusher.java:123)
23/06/13 08:32:11 ERROR Executor: Exception in task 21.27 in stage 54.1
(TID 17255)
org.apache.celeborn.common.exception.CelebornIOException: Revive for
shuffle spark-3df9647407f14c39868a17b7950899c5-12 partition 152 failed.
at
org.apache.celeborn.client.ShuffleClientImpl.pushOrMergeData(ShuffleClientImpl.java:666)
at
org.apache.celeborn.client.ShuffleClientImpl.pushData(ShuffleClientImpl.java:984)
at
org.apache.celeborn.client.write.DataPusher.pushData(DataPusher.java:197)
at
org.apache.celeborn.client.write.DataPusher.access$500(DataPusher.java:38)
at
org.apache.celeborn.client.write.DataPusher$1.run(DataPusher.java:123)
23/06/13 08:32:11 INFO CoarseGrainedExecutorBackend: Got assigned task 17309
```
### Why are the changes needed?
To make shuffle client able to revive with empty locations.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
K8S cluster.
Closes #1586 from FMX/CELEBORN-674.
Authored-by: Ethan Feng <[email protected]>
Signed-off-by: zky.zhoukeyong <[email protected]>
(cherry picked from commit 1fca5da553639598bdaec70a13ac0e0727af2239)
Signed-off-by: zky.zhoukeyong <[email protected]>
---
.../java/org/apache/celeborn/client/ShuffleClientImpl.java | 2 +-
.../scala/org/apache/celeborn/client/CommitManager.scala | 2 ++
.../scala/org/apache/celeborn/client/LifecycleManager.scala | 9 +++++++--
.../celeborn/common/protocol/message/ControlMessages.scala | 13 ++++++++-----
4 files changed, 18 insertions(+), 8 deletions(-)
diff --git
a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
index 587aa7746..611e1dd24 100644
--- a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
+++ b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
@@ -531,7 +531,7 @@ public class ShuffleClientImpl extends ShuffleClient {
PartitionLocation oldLocation,
StatusCode cause) {
// Add ShuffleClient side blacklist
- if (shuffleClientPushBlacklistEnabled) {
+ if (shuffleClientPushBlacklistEnabled && oldLocation != null) {
if (cause == StatusCode.PUSH_DATA_CREATE_CONNECTION_FAIL_MASTER) {
blacklist.add(oldLocation.hostAndPushPort());
} else if (cause == StatusCode.PUSH_DATA_CONNECTION_EXCEPTION_MASTER) {
diff --git
a/client/src/main/scala/org/apache/celeborn/client/CommitManager.scala
b/client/src/main/scala/org/apache/celeborn/client/CommitManager.scala
index 40d1dc70f..0deda78c7 100644
--- a/client/src/main/scala/org/apache/celeborn/client/CommitManager.scala
+++ b/client/src/main/scala/org/apache/celeborn/client/CommitManager.scala
@@ -231,6 +231,8 @@ class CommitManager(appId: String, val conf: CelebornConf,
lifecycleManager: Lif
shuffleId: Int,
partitionLocation: PartitionLocation,
cause: Option[StatusCode]): Unit = {
+ // If a partition location is null, then the cause will be
PUSH_DATA_FAIL_NON_CRITICAL_CAUSE.
+ // So here is no need to check partition location is null or not.
if (batchHandleCommitPartitionEnabled && cause.isDefined && cause.get ==
StatusCode.HARD_SPLIT) {
val shuffleCommittedInfo = committedPartitionInfo.get(shuffleId)
shuffleCommittedInfo.synchronized {
diff --git
a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
index 792692607..aebf84fa0 100644
--- a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
+++ b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
@@ -244,10 +244,15 @@ class LifecycleManager(appId: String, val conf:
CelebornConf) extends RpcEndpoin
val attemptId = pb.getAttemptId
val partitionId = pb.getPartitionId
val epoch = pb.getEpoch
- val oldPartition =
PbSerDeUtils.fromPbPartitionLocation(pb.getOldPartition)
+ val oldPartition =
+ if (pb.hasOldPartition) {
+ PbSerDeUtils.fromPbPartitionLocation(pb.getOldPartition)
+ } else {
+ null
+ }
val cause = Utils.toStatusCode(pb.getStatus)
logTrace(s"Received Revive request, " +
- s"$applicationId, $shuffleId, $mapId, $attemptId, ,$partitionId," +
+ s"$applicationId, $shuffleId, $mapId, $attemptId ,$partitionId," +
s" $epoch, $oldPartition, $cause.")
handleRevive(
context,
diff --git
a/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
b/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
index 8009bc9fb..438369a42 100644
---
a/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
+++
b/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
@@ -196,17 +196,20 @@ object ControlMessages extends Logging {
partitionId: Int,
epoch: Int,
oldPartition: PartitionLocation,
- cause: StatusCode): PbRevive =
- PbRevive.newBuilder()
- .setApplicationId(appId)
+ cause: StatusCode): PbRevive = {
+ val builder = PbRevive.newBuilder()
+ builder.setApplicationId(appId)
.setShuffleId(shuffleId)
.setMapId(mapId)
.setAttemptId(attemptId)
.setPartitionId(partitionId)
.setEpoch(epoch)
- .setOldPartition(PbSerDeUtils.toPbPartitionLocation(oldPartition))
.setStatus(cause.getValue)
- .build()
+ if (oldPartition != null) {
+
builder.setOldPartition(PbSerDeUtils.toPbPartitionLocation(oldPartition))
+ }
+ builder.build()
+ }
}
object PartitionSplit {