This is an automated email from the ASF dual-hosted git repository.

feiwang pushed a commit to branch branch-0.6
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/branch-0.6 by this push:
     new 1aa13b2a5 [CELEBORN-2040] Avoid throw FetchFailedException when 
GetReducerFileGroupResponse failed via broadcast
1aa13b2a5 is described below

commit 1aa13b2a507a8b18227b25fe9fa0837f190f07ad
Author: caohaotian <[email protected]>
AuthorDate: Sun Jun 22 23:59:29 2025 -0700

    [CELEBORN-2040] Avoid throw FetchFailedException when 
GetReducerFileGroupResponse failed via broadcast
    
    ### What changes were proposed in this pull request?
    
    In our production environment, when obtaining GetReducerFileGroupResponse
via broadcast [CELEBORN-1921], failures may occur due to reasons such as:
executor preemption, or local disk errors while the task is writing broadcast
data. These scenarios throw a CelebornIOException, which is eventually
converted to a FetchFailedException.
    
    However, I think these errors are not caused by shuffle-related metadata 
loss, so a FetchFailedException should not be thrown to trigger a stage retry. 
Instead, the task should simply fail and be retried at the task level.
    
    ### Why are the changes needed?
    
    To reduce false positive fetch failures.
    
    case 1: During deserialization of the GetReducerFileGroupResponse
broadcast, an ExecutorLostFailure happened because the container was preempted,
which leads to reporting a fetch failure.
    ```
    25/06/16 08:39:21 INFO Executor task launch worker for task 30724 
SparkUtils: Deserializing GetReducerFileGroupResponse broadcast for shuffle: 0
    25/06/16 08:39:21 INFO Executor task launch worker for task 30724 
TorrentBroadcast: Started reading broadcast variable 7 with 3 pieces (estimated 
total size 12.0 MiB)
    ......
    
    25/06/16 08:39:21 ERROR Executor task launch worker for task 30724 
SparkUtils: Failed to deserialize GetReducerFileGroupResponse for shuffle: 0
    java.io.IOException: org.apache.spark.SparkException: Exception thrown in 
awaitResult:
            at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1387)
            at 
org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:226)
            at 
org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:103)
            at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
            at 
org.apache.spark.shuffle.celeborn.SparkUtils.lambda$deserializeGetReducerFileGroupResponse$4(SparkUtils.java:600)
            at 
org.apache.celeborn.common.util.KeyLock.withLock(KeyLock.scala:65)
            at 
org.apache.spark.shuffle.celeborn.SparkUtils.deserializeGetReducerFileGroupResponse(SparkUtils.java:585)
            at 
org.apache.spark.shuffle.celeborn.CelebornShuffleReader$$anon$5.apply(CelebornShuffleReader.scala:485)
            at 
org.apache.spark.shuffle.celeborn.CelebornShuffleReader$$anon$5.apply(CelebornShuffleReader.scala:480)
            at 
org.apache.celeborn.client.ShuffleClient.deserializeReducerFileGroupResponse(ShuffleClient.java:321)
            at 
org.apache.celeborn.client.ShuffleClientImpl.loadFileGroupInternal(ShuffleClientImpl.java:1876)
            at 
org.apache.celeborn.client.ShuffleClientImpl.lambda$updateFileGroup$9(ShuffleClientImpl.java:1935)
            at 
java.util.concurrent.ConcurrentHashMap.compute(ConcurrentHashMap.java:1853)
            at 
org.apache.celeborn.client.ShuffleClientImpl.updateFileGroup(ShuffleClientImpl.java:1931)
            at 
org.apache.spark.shuffle.celeborn.CelebornShuffleReader.read(CelebornShuffleReader.scala:119)
            at 
org.apache.spark.sql.execution.ShuffledRowRDD.compute(ShuffledRowRDD.scala:225)
            at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
            at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
            at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
            at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
            at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
            at 
org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:89)
            at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
            at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
            at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
            at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
            at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
            at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
            at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
            at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
            at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
            at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
            at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
            at 
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92)
            at org.apache.spark.scheduler.Task.run(Task.scala:130)
            at 
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:477)
            at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1428)
            at 
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:480)
            at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
            at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
            at java.lang.Thread.run(Thread.java:745)
    Caused by: org.apache.spark.SparkException: Exception thrown in awaitResult:
            at 
org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:302)
            at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
            at 
org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:105)
            at 
org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:89)
            at 
org.apache.spark.storage.BlockManagerMaster.getLocationsAndStatus(BlockManagerMaster.scala:93)
            at 
org.apache.spark.storage.BlockManager.getRemoteBlock(BlockManager.scala:1179)
            at 
org.apache.spark.storage.BlockManager.getRemoteBytes(BlockManager.scala:1341)
            at 
org.apache.spark.broadcast.TorrentBroadcast.$anonfun$readBlocks$1(TorrentBroadcast.scala:180)
            at 
scala.runtime.java8.JFunction1$mcVI$sp.apply(JFunction1$mcVI$sp.java:23)
            at scala.collection.immutable.List.foreach(List.scala:392)
            at 
org.apache.spark.broadcast.TorrentBroadcast.readBlocks(TorrentBroadcast.scala:169)
            at 
org.apache.spark.broadcast.TorrentBroadcast.$anonfun$readBroadcastBlock$4(TorrentBroadcast.scala:253)
            at scala.Option.getOrElse(Option.scala:189)
            at 
org.apache.spark.broadcast.TorrentBroadcast.$anonfun$readBroadcastBlock$2(TorrentBroadcast.scala:231)
            at org.apache.spark.util.KeyLock.withLock(KeyLock.scala:64)
            at 
org.apache.spark.broadcast.TorrentBroadcast.$anonfun$readBroadcastBlock$1(TorrentBroadcast.scala:226)
            at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1380)
            ... 40 more
    Caused by: java.util.concurrent.RejectedExecutionException: Task 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask739bc42 
rejected from 
java.util.concurrent.ScheduledThreadPoolExecutor66c2c5b0[Terminated, pool size 
= 0, active threads = 0, queued tasks = 0, completed tasks = 0]
            at 
java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
            at 
java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
            at 
java.util.concurrent.ScheduledThreadPoolExecutor.delayedExecute(ScheduledThreadPoolExecutor.java:326)
            at 
java.util.concurrent.ScheduledThreadPoolExecutor.schedule(ScheduledThreadPoolExecutor.java:533)
            at 
org.apache.spark.rpc.netty.NettyRpcEnv.askAbortable(NettyRpcEnv.scala:264)
            at 
org.apache.spark.rpc.netty.NettyRpcEndpointRef.askAbortable(NettyRpcEnv.scala:552)
            at 
org.apache.spark.rpc.netty.NettyRpcEndpointRef.ask(NettyRpcEnv.scala:556)
            at 
org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:104)
            ... 54 more
    
    25/06/16 08:39:21 ERROR Executor task launch worker for task 30723 
ShuffleClientImpl: Exception raised while call GetReducerFileGroup for 0.
    org.apache.celeborn.common.exception.CelebornIOException: Failed to get 
GetReducerFileGroupResponse broadcast for shuffle: 0
            at 
org.apache.celeborn.client.ShuffleClientImpl.loadFileGroupInternal(ShuffleClientImpl.java:1878)
    ......
    
    25/06/16 08:39:21 WARN Executor task launch worker for task 30724 
CelebornShuffleReader: Handle fetch exceptions for 0-0
    org.apache.celeborn.common.exception.CelebornIOException: Failed to load 
file group of shuffle 0 partition 4643! Failed to get 
GetReducerFileGroupResponse broadcast for shuffle: 0
            at 
org.apache.celeborn.client.ShuffleClientImpl.updateFileGroup(ShuffleClientImpl.java:1943)
            at 
org.apache.spark.shuffle.celeborn.CelebornShuffleReader.read(CelebornShuffleReader.scala:119)
    ......
    Caused by: org.apache.celeborn.common.exception.CelebornIOException: Failed 
to get GetReducerFileGroupResponse broadcast for shuffle: 0
            at 
org.apache.celeborn.client.ShuffleClientImpl.loadFileGroupInternal(ShuffleClientImpl.java:1878)
            at 
org.apache.celeborn.client.ShuffleClientImpl.lambda$updateFileGroup$9(ShuffleClientImpl.java:1935)
            at 
java.util.concurrent.ConcurrentHashMap.compute(ConcurrentHashMap.java:1853)
            at 
org.apache.celeborn.client.ShuffleClientImpl.updateFileGroup(ShuffleClientImpl.java:1931)
            ... 27 more
    ```
    case 2: During deserialization of the GetReducerFileGroupResponse 
broadcast, a failure to create the local directory leads to reporting a fetch 
failure.
    ```
    25/05/27 07:27:03 INFO Executor task launch worker for task 20399 
SparkUtils: Deserializing GetReducerFileGroupResponse broadcast for shuffle: 1
    25/05/27 07:27:03 INFO Executor task launch worker for task 20399 
TorrentBroadcast: Started reading broadcast variable 5 with 1 pieces (estimated 
total size 4.0 MiB)
    25/05/27 07:27:03 INFO Executor task launch worker for task 20399 
TorrentBroadcast: Reading broadcast variable 5 took 0 ms
    25/05/27 07:27:03 INFO Executor task launch worker for task 20399 
MemoryStore: Block broadcast_5 stored as values in memory (estimated size 980.4 
KiB, free 6.3 GiB)
    25/05/27 07:27:03 WARN Executor task launch worker for task 20399 
BlockManager: Putting block broadcast_5 failed due to exception 
java.io.IOException: Failed to create local dir in 
/data12/hadoop/yarn/nm-local-dir/usercache/......
    25/05/27 07:27:03 WARN Executor task launch worker for task 20399 
BlockManager: Block broadcast_5 was not removed normally.
    25/05/27 07:27:03 ERROR Executor task launch worker for task 20399 Utils: 
Exception encountered
    java.io.IOException: Failed to create local dir in 
/data12/hadoop/yarn/nm-local-dir/usercache/......
            at 
org.apache.spark.storage.DiskBlockManager.getFile(DiskBlockManager.scala:93)
            at org.apache.spark.storage.DiskStore.remove(DiskStore.scala:114)
            at 
org.apache.spark.storage.BlockManager.removeBlockInternal(BlockManager.scala:2050)
            at 
org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1574)
            at 
org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1611)
            at 
org.apache.spark.storage.BlockManager.putIterator(BlockManager.scala:1467)
            at 
org.apache.spark.storage.BlockManager.putSingle(BlockManager.scala:1936)
            at 
org.apache.spark.broadcast.TorrentBroadcast.$anonfun$readBroadcastBlock$4(TorrentBroadcast.scala:262)
            at scala.Option.getOrElse(Option.scala:189)
            at 
org.apache.spark.broadcast.TorrentBroadcast.$anonfun$readBroadcastBlock$2(TorrentBroadcast.scala:231)
            at org.apache.spark.util.KeyLock.withLock(KeyLock.scala:64)
            at 
org.apache.spark.broadcast.TorrentBroadcast.$anonfun$readBroadcastBlock$1(TorrentBroadcast.scala:226)
            at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1380)
            at 
org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:226)
            at 
org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:103)
            at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
            at 
org.apache.spark.shuffle.celeborn.SparkUtils.lambda$deserializeGetReducerFileGroupResponse$4(SparkUtils.java:600)
            at 
org.apache.celeborn.common.util.KeyLock.withLock(KeyLock.scala:65)
            at 
org.apache.spark.shuffle.celeborn.SparkUtils.deserializeGetReducerFileGroupResponse(SparkUtils.java:585)
            at 
org.apache.spark.shuffle.celeborn.CelebornShuffleReader$$anon$5.apply(CelebornShuffleReader.scala:485)
            at 
org.apache.spark.shuffle.celeborn.CelebornShuffleReader$$anon$5.apply(CelebornShuffleReader.scala:480)
            at 
org.apache.celeborn.client.ShuffleClient.deserializeReducerFileGroupResponse(ShuffleClient.java:321)
            at 
org.apache.celeborn.client.ShuffleClientImpl.loadFileGroupInternal(ShuffleClientImpl.java:1876)
            at 
org.apache.celeborn.client.ShuffleClientImpl.lambda$updateFileGroup$9(ShuffleClientImpl.java:1935)
            at 
java.util.concurrent.ConcurrentHashMap.compute(ConcurrentHashMap.java:1877)
            at 
org.apache.celeborn.client.ShuffleClientImpl.updateFileGroup(ShuffleClientImpl.java:1931)
            at 
org.apache.spark.shuffle.celeborn.CelebornShuffleReader.read(CelebornShuffleReader.scala:119)
            at 
org.apache.spark.sql.execution.ShuffledRowRDD.compute(ShuffledRowRDD.scala:225)
            at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
            at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
            at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
            at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
            at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
            at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
            at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
            at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
            at 
org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:60)
            at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:102)
            at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
            at org.apache.spark.scheduler.Task.run(Task.scala:130)
            at 
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:477)
            at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1428)
            at 
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:480)
            at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
            at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
            at java.lang.Thread.run(Thread.java:745)
    
    25/05/27 07:27:03 ERROR Executor task launch worker for task 20399 
ShuffleClientImpl: Exception raised while call GetReducerFileGroup for 1.
    org.apache.celeborn.common.exception.CelebornIOException: Failed to get 
GetReducerFileGroupResponse broadcast for shuffle: 1
    ......
    
    25/05/27 07:27:03 WARN Executor task launch worker for task 20399 
CelebornShuffleReader: Handle fetch exceptions for 1-0
    org.apache.celeborn.common.exception.CelebornIOException: Failed to load 
file group of shuffle 1 partition 4001! Failed to get 
GetReducerFileGroupResponse broadcast for shuffle: 1
            at 
org.apache.celeborn.client.ShuffleClientImpl.updateFileGroup(ShuffleClientImpl.java:1943)
            at 
org.apache.spark.shuffle.celeborn.CelebornShuffleReader.read(CelebornShuffleReader.scala:119)
    ......
    Caused by: org.apache.celeborn.common.exception.CelebornIOException: Failed 
to get GetReducerFileGroupResponse broadcast for shuffle: 1
            at 
org.apache.celeborn.client.ShuffleClientImpl.loadFileGroupInternal(ShuffleClientImpl.java:1878)
    ......
    
    ```
    
    ### Does this PR introduce _any_ user-facing change?
    When `ShuffleClient.deserializeReducerFileGroupResponse(shuffleId,
response.broadcast())` returns null, a fetch failure will not be reported;
instead, the task will simply fail.
    
    ### How was this patch tested?
    Long-running Production Validation
    
    Closes #3341 from vastian180/CELEBORN-2040.
    
    Lead-authored-by: caohaotian <[email protected]>
    Co-authored-by: Fei Wang <[email protected]>
    Signed-off-by: Wang, Fei <[email protected]>
    (cherry picked from commit 4d4012e4c330aa828de9d9ee43beff2c81cfc2bd)
    Signed-off-by: Wang, Fei <[email protected]>
---
 .../shuffle/celeborn/CelebornShuffleReader.scala   |  6 +++++-
 .../shuffle/celeborn/CelebornShuffleReader.scala   |  9 ++++++---
 .../apache/celeborn/client/ShuffleClientImpl.java  |  3 ++-
 .../exception/CelebornBroadcastException.scala     | 23 ++++++++++++++++++++++
 4 files changed, 36 insertions(+), 5 deletions(-)

diff --git 
a/client-spark/spark-2/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala
 
b/client-spark/spark-2/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala
index 2372305c5..192cc647d 100644
--- 
a/client-spark/spark-2/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala
+++ 
b/client-spark/spark-2/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala
@@ -33,7 +33,7 @@ import org.apache.celeborn.client.ShuffleClient
 import org.apache.celeborn.client.read.CelebornInputStream
 import org.apache.celeborn.client.read.MetricsCallback
 import org.apache.celeborn.common.CelebornConf
-import org.apache.celeborn.common.exception.{CelebornIOException, 
CelebornRuntimeException, PartitionUnRetryAbleException}
+import org.apache.celeborn.common.exception.{CelebornBroadcastException, 
CelebornIOException, CelebornRuntimeException, PartitionUnRetryAbleException}
 import 
org.apache.celeborn.common.protocol.message.ControlMessages.GetReducerFileGroupResponse
 import org.apache.celeborn.common.util.{JavaUtils, ThreadUtils}
 
@@ -122,6 +122,10 @@ class CelebornShuffleReader[K, C](
                 metricsCallback)
               streams.put(partitionId, inputStream)
             } catch {
+              case e: CelebornIOException
+                  if (e.getCause != null && 
e.getCause.isInstanceOf[CelebornBroadcastException]) =>
+                logError(s"Exception caught when readPartition $partitionId 
via broadcast!", e)
+                exceptionRef.compareAndSet(null, new IOException(e.getCause))
               case e: IOException =>
                 logError(s"Exception caught when readPartition $partitionId!", 
e)
                 exceptionRef.compareAndSet(null, e)
diff --git 
a/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala
 
b/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala
index 772576617..ff77eecf9 100644
--- 
a/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala
+++ 
b/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala
@@ -40,7 +40,7 @@ import org.apache.celeborn.client.{ClientUtils, ShuffleClient}
 import org.apache.celeborn.client.ShuffleClientImpl.ReduceFileGroups
 import org.apache.celeborn.client.read.{CelebornInputStream, MetricsCallback}
 import org.apache.celeborn.common.CelebornConf
-import org.apache.celeborn.common.exception.{CelebornIOException, 
CelebornRuntimeException, PartitionUnRetryAbleException}
+import org.apache.celeborn.common.exception.{CelebornBroadcastException, 
CelebornIOException, CelebornRuntimeException, PartitionUnRetryAbleException}
 import org.apache.celeborn.common.network.client.TransportClient
 import org.apache.celeborn.common.network.protocol.TransportMessage
 import org.apache.celeborn.common.protocol._
@@ -177,6 +177,7 @@ class CelebornShuffleReader[K, C](
         case ce @ (_: CelebornIOException | _: PartitionUnRetryAbleException) 
=>
           // if a task is interrupted, should not report fetch failure
           // if a task update file group timeout, should not report fetch 
failure
+          // if a task GetReducerFileGroupResponse failed via broadcast, 
should not report fetch failure
           checkAndReportFetchFailureForUpdateFileGroupFailure(shuffleId, ce)
       }
     } while (fileGroups == null)
@@ -499,8 +500,10 @@ class CelebornShuffleReader[K, C](
       ce: Throwable): Unit = {
     if (ce.getCause != null &&
       (ce.getCause.isInstanceOf[InterruptedException] || 
ce.getCause.isInstanceOf[
-        TimeoutException])) {
-      logWarning(s"fetch shuffle ${celebornShuffleId} timeout or interrupt", 
ce)
+        TimeoutException] || 
ce.getCause.isInstanceOf[CelebornBroadcastException])) {
+      logWarning(
+        s"fetch shuffle ${celebornShuffleId} timeout or interrupt or 
GetReducerFileGroupResponse failed via broadcast",
+        ce)
       throw ce
     } else {
       handleFetchExceptions(handle.shuffleId, celebornShuffleId, 0, ce)
diff --git 
a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java 
b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
index 903515336..d11455f2e 100644
--- a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
+++ b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
@@ -43,6 +43,7 @@ import org.apache.celeborn.client.compress.Compressor;
 import org.apache.celeborn.client.read.CelebornInputStream;
 import org.apache.celeborn.client.read.MetricsCallback;
 import org.apache.celeborn.common.CelebornConf;
+import org.apache.celeborn.common.exception.CelebornBroadcastException;
 import org.apache.celeborn.common.exception.CelebornIOException;
 import org.apache.celeborn.common.exception.CelebornRuntimeException;
 import org.apache.celeborn.common.identity.UserIdentifier;
@@ -1813,7 +1814,7 @@ public class ShuffleClientImpl extends ShuffleClient {
             response =
                 ShuffleClient.deserializeReducerFileGroupResponse(shuffleId, 
response.broadcast());
             if (response == null) {
-              throw new CelebornIOException(
+              throw new CelebornBroadcastException(
                   "Failed to get GetReducerFileGroupResponse broadcast for 
shuffle: " + shuffleId);
             }
           }
diff --git 
a/common/src/main/scala/org/apache/celeborn/common/exception/CelebornBroadcastException.scala
 
b/common/src/main/scala/org/apache/celeborn/common/exception/CelebornBroadcastException.scala
new file mode 100644
index 000000000..bb726e8b5
--- /dev/null
+++ 
b/common/src/main/scala/org/apache/celeborn/common/exception/CelebornBroadcastException.scala
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.common.exception
+
+class CelebornBroadcastException(message: String, cause: Throwable)
+  extends CelebornIOException(message, cause) {
+  def this(message: String) = this(message, null)
+}

Reply via email to