This is an automated email from the ASF dual-hosted git repository.
srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 2eae3db [SPARK-38613][CORE] Change the exception type thrown by
`PushBlockStreamCallback#abortIfNecessary` from `RuntimeException` to
`IllegalStateException`
2eae3db is described below
commit 2eae3dbdab0dbdea09928ab0252f0dfa13a94259
Author: yangjie01 <[email protected]>
AuthorDate: Wed Mar 23 09:59:02 2022 -0500
[SPARK-38613][CORE] Change the exception type thrown by
`PushBlockStreamCallback#abortIfNecessary` from `RuntimeException` to
`IllegalStateException`
### What changes were proposed in this pull request?
This pr change the exception type thrown by
`PushBlockStreamCallback#abortIfNecessary` from `RuntimeException` to
`IllegalStateException` and fixed the corresponding test case.
In addition, this PR fixes the bug of
`testWritingPendingBufsIsAbortedImmediatelyDuringComplete` in
`RemoteBlockPushResolverSuite`. `RuntimeException` throw by
https://github.com/apache/spark/blob/acb50d95a4952dea1cbbc27d4ddcc0b3432a13cf/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java#L818-L820
before this pr, but the test case expects it to be thrown by
https://github.com/apache/spark/blob/2ca5d1857a551ca4b11bdf8166beb0861cf4e3b6/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java#L834-L840
### Why are the changes needed?
The `RuntimeException` is too broad, it should be specific.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Pass GA and fix related UTs
Closes #35923 from LuciferYang/SPARK-38613.
Authored-by: yangjie01 <[email protected]>
Signed-off-by: Sean Owen <[email protected]>
---
.../spark/network/shuffle/RemoteBlockPushResolver.java | 4 ++--
.../network/shuffle/RemoteBlockPushResolverSuite.java | 14 +++++++-------
2 files changed, 9 insertions(+), 9 deletions(-)
diff --git
a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
index 62ab340..626a725 100644
---
a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
+++
b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
@@ -756,12 +756,12 @@ public class RemoteBlockPushResolver implements
MergedShuffleFileManager {
}
/**
- * This throws RuntimeException if the number of IOExceptions have
exceeded threshold.
+ * @throws IllegalStateException if the number of IOExceptions have
exceeded threshold.
*/
private void abortIfNecessary() {
if
(partitionInfo.shouldAbort(mergeManager.ioExceptionsThresholdDuringMerge)) {
deferredBufs = null;
- throw new RuntimeException(String.format("%s when merging %s",
+ throw new IllegalStateException(String.format("%s when merging %s",
ErrorHandler.BlockPushErrorHandler.IOEXCEPTIONS_EXCEEDED_THRESHOLD_PREFIX,
streamId));
}
diff --git
a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java
b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java
index 603b20c..f76afae 100644
---
a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java
+++
b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java
@@ -673,7 +673,7 @@ public class RemoteBlockPushResolverSuite {
validateChunks(TEST_APP, 0, 0, 0, blockMeta, new int[] {4, 5}, new int[][]
{{0}, {1}});
}
- @Test(expected = RuntimeException.class)
+ @Test(expected = IllegalStateException.class)
public void testIOExceptionsExceededThreshold() throws IOException {
RemoteBlockPushResolver.PushBlockStreamCallback callback =
(RemoteBlockPushResolver.PushBlockStreamCallback)
pushResolver.receiveBlockDataAsStream(
@@ -708,7 +708,7 @@ public class RemoteBlockPushResolverSuite {
}
}
- @Test(expected = RuntimeException.class)
+ @Test(expected = IllegalStateException.class)
public void testIOExceptionsDuringMetaUpdateIncreasesExceptionCount() throws
IOException {
useTestFiles(true, false);
RemoteBlockPushResolver.PushBlockStreamCallback callback =
@@ -743,7 +743,7 @@ public class RemoteBlockPushResolverSuite {
}
}
- @Test(expected = RuntimeException.class)
+ @Test(expected = IllegalStateException.class)
public void testRequestForAbortedShufflePartitionThrowsException() {
try {
testIOExceptionsDuringMetaUpdateIncreasesExceptionCount();
@@ -760,7 +760,7 @@ public class RemoteBlockPushResolverSuite {
}
}
- @Test(expected = RuntimeException.class)
+ @Test(expected = IllegalStateException.class)
public void testPendingBlockIsAbortedImmediately() throws IOException {
useTestFiles(true, false);
RemoteBlockPushResolver.PushBlockStreamCallback callback =
@@ -793,7 +793,7 @@ public class RemoteBlockPushResolverSuite {
}
}
- @Test(expected = RuntimeException.class)
+ @Test(expected = IllegalStateException.class)
public void testWritingPendingBufsIsAbortedImmediatelyDuringComplete()
throws IOException {
useTestFiles(true, false);
RemoteBlockPushResolver.PushBlockStreamCallback callback =
@@ -817,7 +817,7 @@ public class RemoteBlockPushResolverSuite {
assertEquals(4, partitionInfo.getNumIOExceptions());
RemoteBlockPushResolver.PushBlockStreamCallback callback2 =
(RemoteBlockPushResolver.PushBlockStreamCallback)
pushResolver.receiveBlockDataAsStream(
- new PushBlockStream(TEST_APP, 1, 0, 0, 5, 0, 0));
+ new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 0, 5, 0, 0));
callback2.onData(callback2.getID(), ByteBuffer.wrap(new byte[5]));
// This is deferred
callback.onData(callback.getID(), ByteBuffer.wrap(new byte[4]));
@@ -834,7 +834,7 @@ public class RemoteBlockPushResolverSuite {
try {
callback.onComplete(callback.getID());
} catch (Throwable t) {
- assertEquals("IOExceptions exceeded the threshold when merging
shufflePush_0_0_0",
+ assertEquals("IOExceptions exceeded the threshold when merging
shufflePush_0_0_0_0",
t.getMessage());
throw t;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]