This is an automated email from the ASF dual-hosted git repository.

zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 995ed46e6 [#825][followup] fix(spark): Fix without returning an 
exception. (#1402)
995ed46e6 is described below

commit 995ed46e6e85c143dcf2b86c9fa0e05e903f6415
Author: yl09099 <[email protected]>
AuthorDate: Thu Dec 28 19:25:57 2023 +0800

    [#825][followup] fix(spark): Fix without returning an exception. (#1402)
    
    ### What changes were proposed in this pull request?
    
    Fixes exceptions that do not need to be returned.
    
    ### Why are the changes needed?
    
    Fix: #825
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Existing UT.
---
 .../java/org/apache/spark/shuffle/writer/RssShuffleWriter.java |  8 ++++----
 .../java/org/apache/spark/shuffle/writer/RssShuffleWriter.java | 10 +++++-----
 2 files changed, 9 insertions(+), 9 deletions(-)

diff --git 
a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
 
b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
index 6c95de78a..4c9a62d1c 100644
--- 
a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
+++ 
b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
@@ -235,7 +235,7 @@ public class RssShuffleWriter<K, V, C> extends 
ShuffleWriter<K, V> {
     } catch (Exception e) {
       taskFailureCallback.apply(taskId);
       if (shuffleManager.isRssResubmitStage()) {
-        throw throwFetchFailedIfNecessary(e);
+        throwFetchFailedIfNecessary(e);
       } else {
         throw e;
       }
@@ -466,7 +466,7 @@ public class RssShuffleWriter<K, V, C> extends 
ShuffleWriter<K, V> {
     return 
ShuffleManagerClientFactory.getInstance().createShuffleManagerClient(grpc, 
host, port);
   }
 
-  private RssException throwFetchFailedIfNecessary(Exception e) {
+  private void throwFetchFailedIfNecessary(Exception e) {
     // The shuffleServer is registered only when a Block fails to be sent
     if (e instanceof RssSendFailedException) {
       Map<Long, BlockingQueue<ShuffleServerInfo>> failedBlockIds =
@@ -506,12 +506,12 @@ public class RssShuffleWriter<K, V, C> extends 
ShuffleWriter<K, V> {
           FetchFailedException ffe =
               RssSparkShuffleUtils.createFetchFailedException(
                   shuffleId, -1, taskContext.stageAttemptNumber(), e);
-          return new RssException(ffe);
+          throw new RssException(ffe);
         }
       } catch (IOException ioe) {
         LOG.info("Error closing shuffle manager client with error:", ioe);
       }
     }
-    return new RssException(e);
+    throw new RssException(e);
   }
 }
diff --git 
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
 
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
index 4929e650c..2c0977071 100644
--- 
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
+++ 
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
@@ -231,13 +231,13 @@ public class RssShuffleWriter<K, V, C> extends 
ShuffleWriter<K, V> {
   }
 
   @Override
-  public void write(Iterator<Product2<K, V>> records) throws IOException {
+  public void write(Iterator<Product2<K, V>> records) {
     try {
       writeImpl(records);
     } catch (Exception e) {
       taskFailureCallback.apply(taskId);
       if (shuffleManager.isRssResubmitStage()) {
-        throw throwFetchFailedIfNecessary(e);
+        throwFetchFailedIfNecessary(e);
       } else {
         throw e;
       }
@@ -501,7 +501,7 @@ public class RssShuffleWriter<K, V, C> extends 
ShuffleWriter<K, V> {
     return 
ShuffleManagerClientFactory.getInstance().createShuffleManagerClient(grpc, 
host, port);
   }
 
-  private RssException throwFetchFailedIfNecessary(Exception e) {
+  private void throwFetchFailedIfNecessary(Exception e) {
     // The shuffleServer is registered only when a Block fails to be sent
     if (e instanceof RssSendFailedException) {
       Map<Long, BlockingQueue<ShuffleServerInfo>> failedBlockIds =
@@ -541,12 +541,12 @@ public class RssShuffleWriter<K, V, C> extends 
ShuffleWriter<K, V> {
           FetchFailedException ffe =
               RssSparkShuffleUtils.createFetchFailedException(
                   shuffleId, -1, taskContext.stageAttemptNumber(), e);
-          return new RssException(ffe);
+          throw new RssException(ffe);
         }
       } catch (IOException ioe) {
         LOG.info("Error closing shuffle manager client with error:", ioe);
       }
     }
-    return new RssException(e);
+    throw new RssException(e);
   }
 }

Reply via email to