This is an automated email from the ASF dual-hosted git repository.
feiwang pushed a commit to branch branch-0.6
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/branch-0.6 by this push:
new 1aa13b2a5 [CELEBORN-2040] Avoid throw FetchFailedException when
GetReducerFileGroupResponse failed via broadcast
1aa13b2a5 is described below
commit 1aa13b2a507a8b18227b25fe9fa0837f190f07ad
Author: caohaotian <[email protected]>
AuthorDate: Sun Jun 22 23:59:29 2025 -0700
[CELEBORN-2040] Avoid throw FetchFailedException when
GetReducerFileGroupResponse failed via broadcast
### What changes were proposed in this pull request?
In our production environment, when obtaining GetReducerFileGroupResponse
via broadcast [CELEBORN-1921], failures may occur for reasons such as:
executor preemption, or local disk errors when a task writes broadcast data.
These scenarios throw a CelebornIOException, which is eventually converted to a
FetchFailedException.
However, I think these errors are not caused by shuffle-related metadata
loss, so a FetchFailedException should not be thrown to trigger a stage retry.
Instead, the task should simply fail and be retried at the task level.
### Why are the changes needed?
To reduce false positive fetch failures.
case 1: During deserialization of the GetReducerFileGroupResponse
broadcast, an ExecutorLostFailure happened because the container was preempted,
leading to reporting a fetch failure.
```
25/06/16 08:39:21 INFO Executor task launch worker for task 30724
SparkUtils: Deserializing GetReducerFileGroupResponse broadcast for shuffle: 0
25/06/16 08:39:21 INFO Executor task launch worker for task 30724
TorrentBroadcast: Started reading broadcast variable 7 with 3 pieces (estimated
total size 12.0 MiB)
......
25/06/16 08:39:21 ERROR Executor task launch worker for task 30724
SparkUtils: Failed to deserialize GetReducerFileGroupResponse for shuffle: 0
java.io.IOException: org.apache.spark.SparkException: Exception thrown in
awaitResult:
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1387)
at
org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:226)
at
org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:103)
at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
at
org.apache.spark.shuffle.celeborn.SparkUtils.lambda$deserializeGetReducerFileGroupResponse$4(SparkUtils.java:600)
at
org.apache.celeborn.common.util.KeyLock.withLock(KeyLock.scala:65)
at
org.apache.spark.shuffle.celeborn.SparkUtils.deserializeGetReducerFileGroupResponse(SparkUtils.java:585)
at
org.apache.spark.shuffle.celeborn.CelebornShuffleReader$$anon$5.apply(CelebornShuffleReader.scala:485)
at
org.apache.spark.shuffle.celeborn.CelebornShuffleReader$$anon$5.apply(CelebornShuffleReader.scala:480)
at
org.apache.celeborn.client.ShuffleClient.deserializeReducerFileGroupResponse(ShuffleClient.java:321)
at
org.apache.celeborn.client.ShuffleClientImpl.loadFileGroupInternal(ShuffleClientImpl.java:1876)
at
org.apache.celeborn.client.ShuffleClientImpl.lambda$updateFileGroup$9(ShuffleClientImpl.java:1935)
at
java.util.concurrent.ConcurrentHashMap.compute(ConcurrentHashMap.java:1853)
at
org.apache.celeborn.client.ShuffleClientImpl.updateFileGroup(ShuffleClientImpl.java:1931)
at
org.apache.spark.shuffle.celeborn.CelebornShuffleReader.read(CelebornShuffleReader.scala:119)
at
org.apache.spark.sql.execution.ShuffledRowRDD.compute(ShuffledRowRDD.scala:225)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
at
org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:89)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
at
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92)
at org.apache.spark.scheduler.Task.run(Task.scala:130)
at
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:477)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1428)
at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:480)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.SparkException: Exception thrown in awaitResult:
at
org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:302)
at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
at
org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:105)
at
org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:89)
at
org.apache.spark.storage.BlockManagerMaster.getLocationsAndStatus(BlockManagerMaster.scala:93)
at
org.apache.spark.storage.BlockManager.getRemoteBlock(BlockManager.scala:1179)
at
org.apache.spark.storage.BlockManager.getRemoteBytes(BlockManager.scala:1341)
at
org.apache.spark.broadcast.TorrentBroadcast.$anonfun$readBlocks$1(TorrentBroadcast.scala:180)
at
scala.runtime.java8.JFunction1$mcVI$sp.apply(JFunction1$mcVI$sp.java:23)
at scala.collection.immutable.List.foreach(List.scala:392)
at
org.apache.spark.broadcast.TorrentBroadcast.readBlocks(TorrentBroadcast.scala:169)
at
org.apache.spark.broadcast.TorrentBroadcast.$anonfun$readBroadcastBlock$4(TorrentBroadcast.scala:253)
at scala.Option.getOrElse(Option.scala:189)
at
org.apache.spark.broadcast.TorrentBroadcast.$anonfun$readBroadcastBlock$2(TorrentBroadcast.scala:231)
at org.apache.spark.util.KeyLock.withLock(KeyLock.scala:64)
at
org.apache.spark.broadcast.TorrentBroadcast.$anonfun$readBroadcastBlock$1(TorrentBroadcast.scala:226)
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1380)
... 40 more
Caused by: java.util.concurrent.RejectedExecutionException: Task
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask739bc42
rejected from
java.util.concurrent.ScheduledThreadPoolExecutor66c2c5b0[Terminated, pool size
= 0, active threads = 0, queued tasks = 0, completed tasks = 0]
at
java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
at
java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
at
java.util.concurrent.ScheduledThreadPoolExecutor.delayedExecute(ScheduledThreadPoolExecutor.java:326)
at
java.util.concurrent.ScheduledThreadPoolExecutor.schedule(ScheduledThreadPoolExecutor.java:533)
at
org.apache.spark.rpc.netty.NettyRpcEnv.askAbortable(NettyRpcEnv.scala:264)
at
org.apache.spark.rpc.netty.NettyRpcEndpointRef.askAbortable(NettyRpcEnv.scala:552)
at
org.apache.spark.rpc.netty.NettyRpcEndpointRef.ask(NettyRpcEnv.scala:556)
at
org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:104)
... 54 more
25/06/16 08:39:21 ERROR Executor task launch worker for task 30723
ShuffleClientImpl: Exception raised while call GetReducerFileGroup for 0.
org.apache.celeborn.common.exception.CelebornIOException: Failed to get
GetReducerFileGroupResponse broadcast for shuffle: 0
at
org.apache.celeborn.client.ShuffleClientImpl.loadFileGroupInternal(ShuffleClientImpl.java:1878)
......
25/06/16 08:39:21 WARN Executor task launch worker for task 30724
CelebornShuffleReader: Handle fetch exceptions for 0-0
org.apache.celeborn.common.exception.CelebornIOException: Failed to load
file group of shuffle 0 partition 4643! Failed to get
GetReducerFileGroupResponse broadcast for shuffle: 0
at
org.apache.celeborn.client.ShuffleClientImpl.updateFileGroup(ShuffleClientImpl.java:1943)
at
org.apache.spark.shuffle.celeborn.CelebornShuffleReader.read(CelebornShuffleReader.scala:119)
......
Caused by: org.apache.celeborn.common.exception.CelebornIOException: Failed
to get GetReducerFileGroupResponse broadcast for shuffle: 0
at
org.apache.celeborn.client.ShuffleClientImpl.loadFileGroupInternal(ShuffleClientImpl.java:1878)
at
org.apache.celeborn.client.ShuffleClientImpl.lambda$updateFileGroup$9(ShuffleClientImpl.java:1935)
at
java.util.concurrent.ConcurrentHashMap.compute(ConcurrentHashMap.java:1853)
at
org.apache.celeborn.client.ShuffleClientImpl.updateFileGroup(ShuffleClientImpl.java:1931)
... 27 more
```
case 2: During deserialization of the GetReducerFileGroupResponse
broadcast, a failure to create the local directory leads to reporting a fetch
failure.
```
25/05/27 07:27:03 INFO Executor task launch worker for task 20399
SparkUtils: Deserializing GetReducerFileGroupResponse broadcast for shuffle: 1
25/05/27 07:27:03 INFO Executor task launch worker for task 20399
TorrentBroadcast: Started reading broadcast variable 5 with 1 pieces (estimated
total size 4.0 MiB)
25/05/27 07:27:03 INFO Executor task launch worker for task 20399
TorrentBroadcast: Reading broadcast variable 5 took 0 ms
25/05/27 07:27:03 INFO Executor task launch worker for task 20399
MemoryStore: Block broadcast_5 stored as values in memory (estimated size 980.4
KiB, free 6.3 GiB)
25/05/27 07:27:03 WARN Executor task launch worker for task 20399
BlockManager: Putting block broadcast_5 failed due to exception
java.io.IOException: Failed to create local dir in
/data12/hadoop/yarn/nm-local-dir/usercache/......
25/05/27 07:27:03 WARN Executor task launch worker for task 20399
BlockManager: Block broadcast_5 was not removed normally.
25/05/27 07:27:03 ERROR Executor task launch worker for task 20399 Utils:
Exception encountered
java.io.IOException: Failed to create local dir in
/data12/hadoop/yarn/nm-local-dir/usercache/......
at
org.apache.spark.storage.DiskBlockManager.getFile(DiskBlockManager.scala:93)
at org.apache.spark.storage.DiskStore.remove(DiskStore.scala:114)
at
org.apache.spark.storage.BlockManager.removeBlockInternal(BlockManager.scala:2050)
at
org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1574)
at
org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1611)
at
org.apache.spark.storage.BlockManager.putIterator(BlockManager.scala:1467)
at
org.apache.spark.storage.BlockManager.putSingle(BlockManager.scala:1936)
at
org.apache.spark.broadcast.TorrentBroadcast.$anonfun$readBroadcastBlock$4(TorrentBroadcast.scala:262)
at scala.Option.getOrElse(Option.scala:189)
at
org.apache.spark.broadcast.TorrentBroadcast.$anonfun$readBroadcastBlock$2(TorrentBroadcast.scala:231)
at org.apache.spark.util.KeyLock.withLock(KeyLock.scala:64)
at
org.apache.spark.broadcast.TorrentBroadcast.$anonfun$readBroadcastBlock$1(TorrentBroadcast.scala:226)
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1380)
at
org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:226)
at
org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:103)
at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
at
org.apache.spark.shuffle.celeborn.SparkUtils.lambda$deserializeGetReducerFileGroupResponse$4(SparkUtils.java:600)
at
org.apache.celeborn.common.util.KeyLock.withLock(KeyLock.scala:65)
at
org.apache.spark.shuffle.celeborn.SparkUtils.deserializeGetReducerFileGroupResponse(SparkUtils.java:585)
at
org.apache.spark.shuffle.celeborn.CelebornShuffleReader$$anon$5.apply(CelebornShuffleReader.scala:485)
at
org.apache.spark.shuffle.celeborn.CelebornShuffleReader$$anon$5.apply(CelebornShuffleReader.scala:480)
at
org.apache.celeborn.client.ShuffleClient.deserializeReducerFileGroupResponse(ShuffleClient.java:321)
at
org.apache.celeborn.client.ShuffleClientImpl.loadFileGroupInternal(ShuffleClientImpl.java:1876)
at
org.apache.celeborn.client.ShuffleClientImpl.lambda$updateFileGroup$9(ShuffleClientImpl.java:1935)
at
java.util.concurrent.ConcurrentHashMap.compute(ConcurrentHashMap.java:1877)
at
org.apache.celeborn.client.ShuffleClientImpl.updateFileGroup(ShuffleClientImpl.java:1931)
at
org.apache.spark.shuffle.celeborn.CelebornShuffleReader.read(CelebornShuffleReader.scala:119)
at
org.apache.spark.sql.execution.ShuffledRowRDD.compute(ShuffledRowRDD.scala:225)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
at
org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:60)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:102)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
at org.apache.spark.scheduler.Task.run(Task.scala:130)
at
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:477)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1428)
at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:480)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
25/05/27 07:27:03 ERROR Executor task launch worker for task 20399
ShuffleClientImpl: Exception raised while call GetReducerFileGroup for 1.
org.apache.celeborn.common.exception.CelebornIOException: Failed to get
GetReducerFileGroupResponse broadcast for shuffle: 1
......
25/05/27 07:27:03 WARN Executor task launch worker for task 20399
CelebornShuffleReader: Handle fetch exceptions for 1-0
org.apache.celeborn.common.exception.CelebornIOException: Failed to load
file group of shuffle 1 partition 4001! Failed to get
GetReducerFileGroupResponse broadcast for shuffle: 1
at
org.apache.celeborn.client.ShuffleClientImpl.updateFileGroup(ShuffleClientImpl.java:1943)
at
org.apache.spark.shuffle.celeborn.CelebornShuffleReader.read(CelebornShuffleReader.scala:119)
......
Caused by: org.apache.celeborn.common.exception.CelebornIOException: Failed
to get GetReducerFileGroupResponse broadcast for shuffle: 1
at
org.apache.celeborn.client.ShuffleClientImpl.loadFileGroupInternal(ShuffleClientImpl.java:1878)
......
```
### Does this PR introduce _any_ user-facing change?
When `ShuffleClient.deserializeReducerFileGroupResponse(shuffleId,
response.broadcast())` returns null, a fetch failure will not be reported;
instead, the task will simply fail and be retried at the task level.
### How was this patch tested?
Long-running Production Validation
Closes #3341 from vastian180/CELEBORN-2040.
Lead-authored-by: caohaotian <[email protected]>
Co-authored-by: Fei Wang <[email protected]>
Signed-off-by: Wang, Fei <[email protected]>
(cherry picked from commit 4d4012e4c330aa828de9d9ee43beff2c81cfc2bd)
Signed-off-by: Wang, Fei <[email protected]>
---
.../shuffle/celeborn/CelebornShuffleReader.scala | 6 +++++-
.../shuffle/celeborn/CelebornShuffleReader.scala | 9 ++++++---
.../apache/celeborn/client/ShuffleClientImpl.java | 3 ++-
.../exception/CelebornBroadcastException.scala | 23 ++++++++++++++++++++++
4 files changed, 36 insertions(+), 5 deletions(-)
diff --git
a/client-spark/spark-2/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala
b/client-spark/spark-2/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala
index 2372305c5..192cc647d 100644
---
a/client-spark/spark-2/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala
+++
b/client-spark/spark-2/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala
@@ -33,7 +33,7 @@ import org.apache.celeborn.client.ShuffleClient
import org.apache.celeborn.client.read.CelebornInputStream
import org.apache.celeborn.client.read.MetricsCallback
import org.apache.celeborn.common.CelebornConf
-import org.apache.celeborn.common.exception.{CelebornIOException,
CelebornRuntimeException, PartitionUnRetryAbleException}
+import org.apache.celeborn.common.exception.{CelebornBroadcastException,
CelebornIOException, CelebornRuntimeException, PartitionUnRetryAbleException}
import
org.apache.celeborn.common.protocol.message.ControlMessages.GetReducerFileGroupResponse
import org.apache.celeborn.common.util.{JavaUtils, ThreadUtils}
@@ -122,6 +122,10 @@ class CelebornShuffleReader[K, C](
metricsCallback)
streams.put(partitionId, inputStream)
} catch {
+ case e: CelebornIOException
+ if (e.getCause != null &&
e.getCause.isInstanceOf[CelebornBroadcastException]) =>
+ logError(s"Exception caught when readPartition $partitionId
via broadcast!", e)
+ exceptionRef.compareAndSet(null, new IOException(e.getCause))
case e: IOException =>
logError(s"Exception caught when readPartition $partitionId!",
e)
exceptionRef.compareAndSet(null, e)
diff --git
a/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala
b/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala
index 772576617..ff77eecf9 100644
---
a/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala
+++
b/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala
@@ -40,7 +40,7 @@ import org.apache.celeborn.client.{ClientUtils, ShuffleClient}
import org.apache.celeborn.client.ShuffleClientImpl.ReduceFileGroups
import org.apache.celeborn.client.read.{CelebornInputStream, MetricsCallback}
import org.apache.celeborn.common.CelebornConf
-import org.apache.celeborn.common.exception.{CelebornIOException,
CelebornRuntimeException, PartitionUnRetryAbleException}
+import org.apache.celeborn.common.exception.{CelebornBroadcastException,
CelebornIOException, CelebornRuntimeException, PartitionUnRetryAbleException}
import org.apache.celeborn.common.network.client.TransportClient
import org.apache.celeborn.common.network.protocol.TransportMessage
import org.apache.celeborn.common.protocol._
@@ -177,6 +177,7 @@ class CelebornShuffleReader[K, C](
case ce @ (_: CelebornIOException | _: PartitionUnRetryAbleException)
=>
// if a task is interrupted, should not report fetch failure
// if a task update file group timeout, should not report fetch
failure
+ // if a task GetReducerFileGroupResponse failed via broadcast,
should not report fetch failure
checkAndReportFetchFailureForUpdateFileGroupFailure(shuffleId, ce)
}
} while (fileGroups == null)
@@ -499,8 +500,10 @@ class CelebornShuffleReader[K, C](
ce: Throwable): Unit = {
if (ce.getCause != null &&
(ce.getCause.isInstanceOf[InterruptedException] ||
ce.getCause.isInstanceOf[
- TimeoutException])) {
- logWarning(s"fetch shuffle ${celebornShuffleId} timeout or interrupt",
ce)
+ TimeoutException] ||
ce.getCause.isInstanceOf[CelebornBroadcastException])) {
+ logWarning(
+ s"fetch shuffle ${celebornShuffleId} timeout or interrupt or
GetReducerFileGroupResponse failed via broadcast",
+ ce)
throw ce
} else {
handleFetchExceptions(handle.shuffleId, celebornShuffleId, 0, ce)
diff --git
a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
index 903515336..d11455f2e 100644
--- a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
+++ b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
@@ -43,6 +43,7 @@ import org.apache.celeborn.client.compress.Compressor;
import org.apache.celeborn.client.read.CelebornInputStream;
import org.apache.celeborn.client.read.MetricsCallback;
import org.apache.celeborn.common.CelebornConf;
+import org.apache.celeborn.common.exception.CelebornBroadcastException;
import org.apache.celeborn.common.exception.CelebornIOException;
import org.apache.celeborn.common.exception.CelebornRuntimeException;
import org.apache.celeborn.common.identity.UserIdentifier;
@@ -1813,7 +1814,7 @@ public class ShuffleClientImpl extends ShuffleClient {
response =
ShuffleClient.deserializeReducerFileGroupResponse(shuffleId,
response.broadcast());
if (response == null) {
- throw new CelebornIOException(
+ throw new CelebornBroadcastException(
"Failed to get GetReducerFileGroupResponse broadcast for
shuffle: " + shuffleId);
}
}
diff --git
a/common/src/main/scala/org/apache/celeborn/common/exception/CelebornBroadcastException.scala
b/common/src/main/scala/org/apache/celeborn/common/exception/CelebornBroadcastException.scala
new file mode 100644
index 000000000..bb726e8b5
--- /dev/null
+++
b/common/src/main/scala/org/apache/celeborn/common/exception/CelebornBroadcastException.scala
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.common.exception
+
+class CelebornBroadcastException(message: String, cause: Throwable)
+ extends CelebornIOException(message, cause) {
+ def this(message: String) = this(message, null)
+}