This is an automated email from the ASF dual-hosted git repository.
rexxiong pushed a commit to branch branch-0.5
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/branch-0.5 by this push:
new 610f40b39 [CELEBORN-1855] LifecycleManager return appshuffleId for non
barrier stage when fetch fail has been reported
610f40b39 is described below
commit 610f40b392a2d64518cde713e3b31f296ec7baf0
Author: lijianfu03 <[email protected]>
AuthorDate: Tue May 13 16:14:03 2025 +0800
[CELEBORN-1855] LifecycleManager return appshuffleId for non barrier stage
when fetch fail has been reported
For a non-barrier shuffle read stage,
LifecycleManager#handleGetShuffleIdForApp always returns the appShuffleId,
whether or not the fetch status is successful.
As described in
[jira](https://issues.apache.org/jira/browse/CELEBORN-1855), if
LifecycleManager only returns an appShuffleId whose fetch status is successful,
the task fails immediately with "there is no finished map stage associated with",
even though the previously reported fetch failure may not be fatal. So just give
the task a chance.
(cherry picked from commit 045411ac34c749d8ca2d7bed8d0b9c3554f71287)
Signed-off-by: Shuang <[email protected]>
---
.../apache/spark/shuffle/celeborn/SparkUtils.java | 17 ++++++++++++-----
.../shuffle/celeborn/CelebornShuffleReader.scala | 22 +++++++++++++++++++---
.../apache/spark/shuffle/celeborn/SparkUtils.java | 17 ++++++++++++-----
.../shuffle/celeborn/CelebornShuffleReader.scala | 22 ++++++++++++++++++++--
.../apache/celeborn/client/DummyShuffleClient.java | 6 ++++--
.../org/apache/celeborn/client/ShuffleClient.java | 4 +++-
.../apache/celeborn/client/ShuffleClientImpl.java | 7 ++++---
.../apache/celeborn/client/LifecycleManager.scala | 18 ++++++++++++------
common/src/main/proto/TransportMessages.proto | 1 +
9 files changed, 87 insertions(+), 27 deletions(-)
diff --git
a/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java
b/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java
index 4f38c9815..6cc4dc5b8 100644
---
a/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java
+++
b/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java
@@ -38,6 +38,7 @@ import org.slf4j.LoggerFactory;
import org.apache.celeborn.client.ShuffleClient;
import org.apache.celeborn.common.CelebornConf;
+import org.apache.celeborn.common.exception.CelebornRuntimeException;
import org.apache.celeborn.common.util.Utils;
public class SparkUtils {
@@ -131,11 +132,17 @@ public class SparkUtils {
Boolean isWriter) {
if (handle.throwsFetchFailure()) {
String appShuffleIdentifier =
getAppShuffleIdentifier(handle.shuffleId(), context);
- return client.getShuffleId(
- handle.shuffleId(),
- appShuffleIdentifier,
- isWriter,
- context instanceof BarrierTaskContext);
+ Tuple2<Integer, Boolean> res =
+ client.getShuffleId(
+ handle.shuffleId(),
+ appShuffleIdentifier,
+ isWriter,
+ context instanceof BarrierTaskContext);
+ if (!res._2) {
+ throw new CelebornRuntimeException(String.format("Get invalid shuffle
id %s", res._1));
+ } else {
+ return res._1;
+ }
} else {
return handle.shuffleId();
}
diff --git
a/client-spark/spark-2/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala
b/client-spark/spark-2/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala
index 4a3092275..890971b45 100644
---
a/client-spark/spark-2/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala
+++
b/client-spark/spark-2/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala
@@ -32,7 +32,7 @@ import org.apache.celeborn.client.ShuffleClient
import org.apache.celeborn.client.read.CelebornInputStream
import org.apache.celeborn.client.read.MetricsCallback
import org.apache.celeborn.common.CelebornConf
-import org.apache.celeborn.common.exception.{CelebornIOException,
PartitionUnRetryAbleException}
+import org.apache.celeborn.common.exception.{CelebornIOException,
CelebornRuntimeException, PartitionUnRetryAbleException}
import org.apache.celeborn.common.util.ThreadUtils
class CelebornShuffleReader[K, C](
@@ -61,8 +61,24 @@ class CelebornShuffleReader[K, C](
override def read(): Iterator[Product2[K, C]] = {
val serializerInstance = dep.serializer.newInstance()
-
- val shuffleId = SparkUtils.celebornShuffleId(shuffleClient, handle,
context, false)
+ val shuffleId =
+ try {
+ SparkUtils.celebornShuffleId(shuffleClient, handle, context, false)
+ } catch {
+ case e: CelebornRuntimeException =>
+ logError(s"Failed to get shuffleId for appShuffleId
${handle.shuffleId}", e)
+ if (handle.throwsFetchFailure) {
+ throw new FetchFailedException(
+ null,
+ handle.shuffleId,
+ -1,
+ startPartition,
+ SparkUtils.FETCH_FAILURE_ERROR_MSG + handle.shuffleId,
+ e)
+ } else {
+ throw e
+ }
+ }
shuffleIdTracker.track(handle.shuffleId, shuffleId)
logDebug(
s"get shuffleId $shuffleId for appShuffleId ${handle.shuffleId}
attemptNum ${context.stageAttemptNumber()}")
diff --git
a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java
b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java
index 47317474e..638d14149 100644
---
a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java
+++
b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java
@@ -41,6 +41,7 @@ import org.slf4j.LoggerFactory;
import org.apache.celeborn.client.ShuffleClient;
import org.apache.celeborn.common.CelebornConf;
+import org.apache.celeborn.common.exception.CelebornRuntimeException;
import org.apache.celeborn.reflect.DynConstructors;
import org.apache.celeborn.reflect.DynFields;
import org.apache.celeborn.reflect.DynMethods;
@@ -108,11 +109,17 @@ public class SparkUtils {
Boolean isWriter) {
if (handle.throwsFetchFailure()) {
String appShuffleIdentifier =
getAppShuffleIdentifier(handle.shuffleId(), context);
- return client.getShuffleId(
- handle.shuffleId(),
- appShuffleIdentifier,
- isWriter,
- context instanceof BarrierTaskContext);
+ Tuple2<Integer, Boolean> res =
+ client.getShuffleId(
+ handle.shuffleId(),
+ appShuffleIdentifier,
+ isWriter,
+ context instanceof BarrierTaskContext);
+ if (!res._2) {
+ throw new CelebornRuntimeException(String.format("Get invalid shuffle
id %s", res._1));
+ } else {
+ return res._1;
+ }
} else {
return handle.shuffleId();
}
diff --git
a/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala
b/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala
index 99a5bb0fc..0433401ca 100644
---
a/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala
+++
b/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala
@@ -39,7 +39,7 @@ import org.apache.celeborn.client.{DummyShuffleClient,
ShuffleClient}
import org.apache.celeborn.client.ShuffleClientImpl.ReduceFileGroups
import org.apache.celeborn.client.read.{CelebornInputStream, MetricsCallback}
import org.apache.celeborn.common.CelebornConf
-import org.apache.celeborn.common.exception.{CelebornIOException,
PartitionUnRetryAbleException}
+import org.apache.celeborn.common.exception.{CelebornIOException,
CelebornRuntimeException, PartitionUnRetryAbleException}
import org.apache.celeborn.common.network.client.TransportClient
import org.apache.celeborn.common.network.protocol.TransportMessage
import org.apache.celeborn.common.protocol._
@@ -77,7 +77,25 @@ class CelebornShuffleReader[K, C](
val startTime = System.currentTimeMillis()
val serializerInstance = newSerializerInstance(dep)
- val shuffleId = SparkUtils.celebornShuffleId(shuffleClient, handle,
context, false)
+ val shuffleId =
+ try {
+ SparkUtils.celebornShuffleId(shuffleClient, handle, context, false)
+ } catch {
+ case e: CelebornRuntimeException =>
+ logError(s"Failed to get shuffleId for appShuffleId
${handle.shuffleId}", e)
+ if (throwsFetchFailure) {
+ throw new FetchFailedException(
+ null,
+ handle.shuffleId,
+ -1,
+ -1,
+ startPartition,
+ SparkUtils.FETCH_FAILURE_ERROR_MSG + handle.shuffleId + "/" +
handle.shuffleId,
+ e)
+ } else {
+ throw e
+ }
+ }
shuffleIdTracker.track(handle.shuffleId, shuffleId)
logDebug(
s"get shuffleId $shuffleId for appShuffleId ${handle.shuffleId}
attemptNum ${context.stageAttemptNumber()}")
diff --git
a/client/src/main/java/org/apache/celeborn/client/DummyShuffleClient.java
b/client/src/main/java/org/apache/celeborn/client/DummyShuffleClient.java
index e2f60aee9..5ecc3a5fe 100644
--- a/client/src/main/java/org/apache/celeborn/client/DummyShuffleClient.java
+++ b/client/src/main/java/org/apache/celeborn/client/DummyShuffleClient.java
@@ -31,6 +31,8 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
+import scala.Tuple2;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -176,9 +178,9 @@ public class DummyShuffleClient extends ShuffleClient {
}
@Override
- public int getShuffleId(
+ public Tuple2<Integer, Boolean> getShuffleId(
int appShuffleId, String appShuffleIdentifier, boolean isWriter, boolean
isBarrierStage) {
- return appShuffleId;
+ return Tuple2.apply(appShuffleId, true);
}
@Override
diff --git a/client/src/main/java/org/apache/celeborn/client/ShuffleClient.java
b/client/src/main/java/org/apache/celeborn/client/ShuffleClient.java
index b4f96a0e0..8154ca57b 100644
--- a/client/src/main/java/org/apache/celeborn/client/ShuffleClient.java
+++ b/client/src/main/java/org/apache/celeborn/client/ShuffleClient.java
@@ -22,6 +22,8 @@ import java.util.ArrayList;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;
+import scala.Tuple2;
+
import org.apache.hadoop.fs.FileSystem;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -266,7 +268,7 @@ public abstract class ShuffleClient {
public abstract PushState getPushState(String mapKey);
- public abstract int getShuffleId(
+ public abstract Tuple2<Integer, Boolean> getShuffleId(
int appShuffleId, String appShuffleIdentifier, boolean isWriter, boolean
isBarrierStage);
/**
diff --git
a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
index b22cce93d..43f658988 100644
--- a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
+++ b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
@@ -100,7 +100,7 @@ public class ShuffleClientImpl extends ShuffleClient {
protected byte[] extension;
// key: appShuffleIdentifier, value: shuffleId
- protected Map<String, Integer> shuffleIdCache =
JavaUtils.newConcurrentHashMap();
+ protected Map<String, Tuple2<Integer, Boolean>> shuffleIdCache =
JavaUtils.newConcurrentHashMap();
// key: shuffleId, value: (partitionId, PartitionLocation)
final Map<Integer, ConcurrentHashMap<Integer, PartitionLocation>>
reducePartitionMap =
@@ -591,7 +591,7 @@ public class ShuffleClientImpl extends ShuffleClient {
}
@Override
- public int getShuffleId(
+ public Tuple2<Integer, Boolean> getShuffleId(
int appShuffleId, String appShuffleIdentifier, boolean isWriter, boolean
isBarrierStage) {
return shuffleIdCache.computeIfAbsent(
appShuffleIdentifier,
@@ -608,7 +608,8 @@ public class ShuffleClientImpl extends ShuffleClient {
pbGetShuffleId,
conf.clientRpcRegisterShuffleAskTimeout(),
ClassTag$.MODULE$.apply(PbGetShuffleIdResponse.class));
- return pbGetShuffleIdResponse.getShuffleId();
+ return Tuple2.apply(
+ pbGetShuffleIdResponse.getShuffleId(),
pbGetShuffleIdResponse.getSuccess());
});
}
diff --git
a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
index 9630bbba0..814fbe922 100644
--- a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
+++ b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
@@ -827,7 +827,8 @@ class LifecycleManager(val appUniqueId: String, val conf:
CelebornConf) extends
if (shuffleIds == null) {
logWarning(s"unknown appShuffleId $appShuffleId, maybe no shuffle data
for this shuffle")
val pbGetShuffleIdResponse =
-
PbGetShuffleIdResponse.newBuilder().setShuffleId(UNKNOWN_APP_SHUFFLE_ID).build()
+
PbGetShuffleIdResponse.newBuilder().setShuffleId(UNKNOWN_APP_SHUFFLE_ID).setSuccess(
+ true).build()
context.reply(pbGetShuffleIdResponse)
return
}
@@ -841,7 +842,7 @@ class LifecycleManager(val appUniqueId: String, val conf:
CelebornConf) extends
shuffleIds.get(appShuffleIdentifier) match {
case Some((shuffleId, _)) =>
val pbGetShuffleIdResponse =
-
PbGetShuffleIdResponse.newBuilder().setShuffleId(shuffleId).build()
+
PbGetShuffleIdResponse.newBuilder().setShuffleId(shuffleId).setSuccess(true).build()
context.reply(pbGetShuffleIdResponse)
case None =>
Option(appShuffleDeterminateMap.get(appShuffleId)).map {
determinate =>
@@ -874,7 +875,7 @@ class LifecycleManager(val appUniqueId: String, val conf:
CelebornConf) extends
newShuffleId
}
val pbGetShuffleIdResponse =
-
PbGetShuffleIdResponse.newBuilder().setShuffleId(shuffleId).build()
+
PbGetShuffleIdResponse.newBuilder().setShuffleId(shuffleId).setSuccess(true).build()
context.reply(pbGetShuffleIdResponse)
}.orElse(
throw new UnsupportedOperationException(
@@ -887,12 +888,17 @@ class LifecycleManager(val appUniqueId: String, val conf:
CelebornConf) extends
val pbGetShuffleIdResponse = {
logDebug(
s"get shuffleId $shuffleId for appShuffleId $appShuffleId
appShuffleIdentifier $appShuffleIdentifier isWriter $isWriter")
-
PbGetShuffleIdResponse.newBuilder().setShuffleId(shuffleId).build()
+
PbGetShuffleIdResponse.newBuilder().setShuffleId(shuffleId).setSuccess(true).build()
}
context.reply(pbGetShuffleIdResponse)
case None =>
- throw new UnsupportedOperationException(
- s"unexpected! there is no finished map stage associated with
appShuffleId $appShuffleId")
+ val pbGetShuffleIdResponse = {
+ logInfo(
+ s"there is no finished map stage associated with appShuffleId
$appShuffleId")
+
PbGetShuffleIdResponse.newBuilder().setShuffleId(UNKNOWN_APP_SHUFFLE_ID).setSuccess(
+ false).build()
+ }
+ context.reply(pbGetShuffleIdResponse)
}
}
}
diff --git a/common/src/main/proto/TransportMessages.proto
b/common/src/main/proto/TransportMessages.proto
index 90040663f..72f16314e 100644
--- a/common/src/main/proto/TransportMessages.proto
+++ b/common/src/main/proto/TransportMessages.proto
@@ -375,6 +375,7 @@ message PbGetShuffleId {
message PbGetShuffleIdResponse {
int32 shuffleId = 1;
+ bool success = 2;
}
message PbReportShuffleFetchFailure {