This is an automated email from the ASF dual-hosted git repository.

mridulm80 pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.4 by this push:
     new 85d41757e85 [SPARK-44215][SHUFFLE] If num chunks are 0, then server 
should throw a RuntimeException
85d41757e85 is described below

commit 85d41757e855d97dee0f24f281f82249161c3d29
Author: Chandni Singh <singh.chan...@gmail.com>
AuthorDate: Tue Jul 4 18:46:18 2023 -0500

    [SPARK-44215][SHUFFLE] If num chunks are 0, then server should throw a 
RuntimeException
    
    ### What changes were proposed in this pull request?
    The executor expects `numChunks` to be > 0. If it is zero, the executor 
fails with:
    ```
    23/06/20 19:07:37 ERROR task 2031.0 in stage 47.0 (TID 25018) Executor: 
Exception in task 2031.0 in stage 47.0 (TID 25018)
    java.lang.ArithmeticException: / by zero
            at 
org.apache.spark.storage.PushBasedFetchHelper.createChunkBlockInfosFromMetaResponse(PushBasedFetchHelper.scala:128)
            at 
org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:1047)
            at 
org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:90)
            at 
org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:29)
            at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:484)
            at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:490)
            at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
            at 
org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31)
            at 
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
            at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
    ```
    Because this is an `ArithmeticException`, the executor doesn't fall back 
to fetching the original blocks. It's not a `FetchFailure` either, so the 
stage is not retried and the application fails.
    
    ### Why are the changes needed?
    The executor should fall back to fetching the original blocks instead of 
failing, because a `numChunks` of zero suggests that there is an issue with 
the push-merged block.
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    Modified the existing UTs to validate that a `RuntimeException` is thrown 
when `numChunks` is 0.
    
    Closes #41762 from otterc/SPARK-44215.
    
    Authored-by: Chandni Singh <singh.chan...@gmail.com>
    Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
    (cherry picked from commit 3e72806bb421b103bf6e73518b80200ccdd58ce5)
    Signed-off-by: Mridul Muralidharan <mridulatgmail.com>
---
 .../network/shuffle/RemoteBlockPushResolver.java   |  4 ++++
 .../shuffle/RemoteBlockPushResolverSuite.java      | 24 ++++++++++++++--------
 2 files changed, 20 insertions(+), 8 deletions(-)

diff --git 
a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
 
b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
index df2d1fa12d1..04935cfd932 100644
--- 
a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
+++ 
b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
@@ -328,6 +328,10 @@ public class RemoteBlockPushResolver implements 
MergedShuffleFileManager {
     int size = (int) indexFile.length();
     // First entry is the zero offset
     int numChunks = (size / Long.BYTES) - 1;
+    if (numChunks <= 0) {
+      throw new RuntimeException(String.format(
+          "Merged shuffle index file %s is empty", indexFile.getPath()));
+    }
     File metaFile = appShuffleInfo.getMergedShuffleMetaFile(shuffleId, 
shuffleMergeId, reduceId);
     if (!metaFile.exists()) {
       throw new RuntimeException(String.format("Merged shuffle meta file %s 
not found",
diff --git 
a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java
 
b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java
index 2526a94f429..0847121b0cc 100644
--- 
a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java
+++ 
b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java
@@ -281,7 +281,7 @@ public class RemoteBlockPushResolverSuite {
     verifyMetrics(4, 0, 0, 0, 0, 0, 4);
   }
 
-  @Test
+  @Test(expected = RuntimeException.class)
   public void testFailureAfterData() throws IOException {
     StreamCallbackWithID stream =
       pushResolver.receiveBlockDataAsStream(
@@ -289,12 +289,16 @@ public class RemoteBlockPushResolverSuite {
     stream.onData(stream.getID(), ByteBuffer.wrap(new byte[4]));
     stream.onFailure(stream.getID(), new RuntimeException("Forced Failure"));
     pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, 
NO_ATTEMPT_ID, 0, 0));
-    MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 
0, 0);
-    assertEquals("num-chunks", 0, blockMeta.getNumChunks());
-    verifyMetrics(4, 0, 0, 0, 0, 0, 4);
+    try {
+      pushResolver.getMergedBlockMeta(TEST_APP, 0, 0, 0);
+    } catch (RuntimeException e) {
+      assertTrue(e.getMessage().contains("is empty"));
+      verifyMetrics(4, 0, 0, 0, 0, 0, 4);
+      throw e;
+    }
   }
 
-  @Test
+  @Test(expected = RuntimeException.class)
   public void testFailureAfterMultipleDataBlocks() throws IOException {
     StreamCallbackWithID stream =
       pushResolver.receiveBlockDataAsStream(
@@ -304,9 +308,13 @@ public class RemoteBlockPushResolverSuite {
     stream.onData(stream.getID(), ByteBuffer.wrap(new byte[4]));
     stream.onFailure(stream.getID(), new RuntimeException("Forced Failure"));
     pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, 
NO_ATTEMPT_ID, 0, 0));
-    MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 
0, 0);
-    assertEquals("num-chunks", 0, blockMeta.getNumChunks());
-    verifyMetrics(9, 0, 0, 0, 0, 0, 9);
+    try {
+      pushResolver.getMergedBlockMeta(TEST_APP, 0, 0, 0);
+    } catch (RuntimeException e) {
+      assertTrue(e.getMessage().contains("is empty"));
+      verifyMetrics(9, 0, 0, 0, 0, 0, 9);
+      throw e;
+    }
   }
 
   @Test


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to