This is an automated email from the ASF dual-hosted git repository.
zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 995ed46e6 [#825][followup] fix(spark): Fix without returning an
exception. (#1402)
995ed46e6 is described below
commit 995ed46e6e85c143dcf2b86c9fa0e05e903f6415
Author: yl09099 <[email protected]>
AuthorDate: Thu Dec 28 19:25:57 2023 +0800
[#825][followup] fix(spark): Fix without returning an exception. (#1402)
### What changes were proposed in this pull request?
Changes `throwFetchFailedIfNecessary` to throw the `RssException` itself instead of returning it for the caller to throw.
### Why are the changes needed?
Fix: #825
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Existing unit tests.
---
.../java/org/apache/spark/shuffle/writer/RssShuffleWriter.java | 8 ++++----
.../java/org/apache/spark/shuffle/writer/RssShuffleWriter.java | 10 +++++-----
2 files changed, 9 insertions(+), 9 deletions(-)
diff --git
a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
index 6c95de78a..4c9a62d1c 100644
---
a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
+++
b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
@@ -235,7 +235,7 @@ public class RssShuffleWriter<K, V, C> extends
ShuffleWriter<K, V> {
} catch (Exception e) {
taskFailureCallback.apply(taskId);
if (shuffleManager.isRssResubmitStage()) {
- throw throwFetchFailedIfNecessary(e);
+ throwFetchFailedIfNecessary(e);
} else {
throw e;
}
@@ -466,7 +466,7 @@ public class RssShuffleWriter<K, V, C> extends
ShuffleWriter<K, V> {
return
ShuffleManagerClientFactory.getInstance().createShuffleManagerClient(grpc,
host, port);
}
- private RssException throwFetchFailedIfNecessary(Exception e) {
+ private void throwFetchFailedIfNecessary(Exception e) {
// The shuffleServer is registered only when a Block fails to be sent
if (e instanceof RssSendFailedException) {
Map<Long, BlockingQueue<ShuffleServerInfo>> failedBlockIds =
@@ -506,12 +506,12 @@ public class RssShuffleWriter<K, V, C> extends
ShuffleWriter<K, V> {
FetchFailedException ffe =
RssSparkShuffleUtils.createFetchFailedException(
shuffleId, -1, taskContext.stageAttemptNumber(), e);
- return new RssException(ffe);
+ throw new RssException(ffe);
}
} catch (IOException ioe) {
LOG.info("Error closing shuffle manager client with error:", ioe);
}
}
- return new RssException(e);
+ throw new RssException(e);
}
}
diff --git
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
index 4929e650c..2c0977071 100644
---
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
+++
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
@@ -231,13 +231,13 @@ public class RssShuffleWriter<K, V, C> extends
ShuffleWriter<K, V> {
}
@Override
- public void write(Iterator<Product2<K, V>> records) throws IOException {
+ public void write(Iterator<Product2<K, V>> records) {
try {
writeImpl(records);
} catch (Exception e) {
taskFailureCallback.apply(taskId);
if (shuffleManager.isRssResubmitStage()) {
- throw throwFetchFailedIfNecessary(e);
+ throwFetchFailedIfNecessary(e);
} else {
throw e;
}
@@ -501,7 +501,7 @@ public class RssShuffleWriter<K, V, C> extends
ShuffleWriter<K, V> {
return
ShuffleManagerClientFactory.getInstance().createShuffleManagerClient(grpc,
host, port);
}
- private RssException throwFetchFailedIfNecessary(Exception e) {
+ private void throwFetchFailedIfNecessary(Exception e) {
// The shuffleServer is registered only when a Block fails to be sent
if (e instanceof RssSendFailedException) {
Map<Long, BlockingQueue<ShuffleServerInfo>> failedBlockIds =
@@ -541,12 +541,12 @@ public class RssShuffleWriter<K, V, C> extends
ShuffleWriter<K, V> {
FetchFailedException ffe =
RssSparkShuffleUtils.createFetchFailedException(
shuffleId, -1, taskContext.stageAttemptNumber(), e);
- return new RssException(ffe);
+ throw new RssException(ffe);
}
} catch (IOException ioe) {
LOG.info("Error closing shuffle manager client with error:", ioe);
}
}
- return new RssException(e);
+ throw new RssException(e);
}
}