This is an automated email from the ASF dual-hosted git repository.

wuyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 09e1c61  [SPARK-36255][SHUFFLE][CORE] Stop pushing and retrying on FileNotFound exceptions
09e1c61 is described below

commit 09e1c612729292a9a000a2d64267ab874d71a1e5
Author: Chandni Singh <singh.chan...@gmail.com>
AuthorDate: Sat Jul 24 21:09:11 2021 +0800

    [SPARK-36255][SHUFFLE][CORE] Stop pushing and retrying on FileNotFound exceptions
    
    ### What changes were proposed in this pull request?
    Once the shuffle is cleaned up by the `ContextCleaner`, the shuffle files are deleted by the executors. In this case, pushes of the shuffle data by the executors can throw `FileNotFoundException`s because the shuffle files have been deleted. When this exception is thrown from the `shuffle-block-push-thread`, it causes the executor to exit. Both the `shuffle-block-push` threads and the Netty event loops can encounter `FileNotFoundException`s in this case. The fix here stops these threads from pushing any more blocks once this exception is encountered.
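
    For illustration, a minimal Scala sketch of the retry decision this change introduces, condensed from the diff below (the too-late check kept by the real handler is omitted, and the object name is hypothetical):
    ```
    import java.io.{FileNotFoundException, IOException}
    import java.net.ConnectException

    object RetryDecisionSketch {
      // A push failure whose cause is a ConnectException or FileNotFoundException
      // is not retried; a FileNotFoundException means the shuffle files are gone.
      def shouldRetryError(t: Throwable): Boolean =
        !(t.getCause != null && (t.getCause.isInstanceOf[ConnectException] ||
          t.getCause.isInstanceOf[FileNotFoundException]))

      def main(args: Array[String]): Unit = {
        val fnf = new IOException("Failed to send RPC", new FileNotFoundException("shuffle file deleted"))
        println(shouldRetryError(fnf))                      // false: stop pushing, do not retry
        println(shouldRetryError(new IOException("other"))) // true: other errors may be retried
      }
    }
    ```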
    
    ### Why are the changes needed?
    This fixes a bug that causes executors to exit when they are instructed to clean up shuffle data.
    Below is the stack trace of this exception:
    ```
    21/06/17 16:03:57 ERROR util.SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[block-push-thread-1,5,main]
    java.lang.Error: java.io.IOException: Error in opening FileSegmentManagedBuffer{file=********/application_1619720975011_11057757/blockmgr-560cb4cf-9918-4ea7-a007-a16c5e3a35fe/0a/shuffle_1_690_0.data, offset=10640, length=190}
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1155)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
    Caused by: java.io.IOException: Error in opening FileSegmentManagedBuffer{file=*******/application_1619720975011_11057757/blockmgr-560cb4cf-9918-4ea7-a007-a16c5e3a35fe/0a/shuffle_1_690_0.data, offset=10640, length=190}
    at org.apache.spark.network.buffer.FileSegmentManagedBuffer.nioByteBuffer(FileSegmentManagedBuffer.java:89)
    at org.apache.spark.shuffle.ShuffleWriter.sliceReqBufferIntoBlockBuffers(ShuffleWriter.scala:294)
    at org.apache.spark.shuffle.ShuffleWriter.org$apache$spark$shuffle$ShuffleWriter$$sendRequest(ShuffleWriter.scala:270)
    at org.apache.spark.shuffle.ShuffleWriter.org$apache$spark$shuffle$ShuffleWriter$$pushUpToMax(ShuffleWriter.scala:191)
    at org.apache.spark.shuffle.ShuffleWriter$$anon$2$$anon$4.run(ShuffleWriter.scala:244)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    ... 2 more
    Caused by: java.io.FileNotFoundException: ******/application_1619720975011_11057757/blockmgr-560cb4cf-9918-4ea7-a007-a16c5e3a35fe/0a/shuffle_1_690_0.data (No such file or directory)
    at java.io.RandomAccessFile.open0(Native Method)
    at java.io.RandomAccessFile.open(RandomAccessFile.java:316)
    at java.io.RandomAccessFile.<init>(RandomAccessFile.java:243)
    at org.apache.spark.network.buffer.FileSegmentManagedBuffer.nioByteBuffer(FileSegmentManagedBuffer.java:62)
    ```
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Added a unit test to verify that no more data is pushed when a `FileNotFoundException` is encountered. Also verified in our environment.
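
    For a dependency-free picture of what the test asserts, a minimal sketch (the Mockito-based test in the diff below is the authoritative one; every name here is illustrative):
    ```
    import java.io.{FileNotFoundException, IOException}

    object PushStopSketch {
      private var pushes = 0
      private var stopped = false

      // Simulate pushing a sequence of blocks where every push fails the way the
      // executor observed it in production: an IOException wrapping a
      // FileNotFoundException. Once that cause is seen, no further push is attempted.
      def pushBlocks(blockIds: Seq[String]): Unit = blockIds.foreach { _ =>
        if (!stopped) {
          pushes += 1
          val failure = new IOException("Failed to send RPC", new FileNotFoundException("file not found"))
          if (failure.getCause.isInstanceOf[FileNotFoundException]) stopped = true
        }
      }

      def main(args: Array[String]): Unit = {
        pushBlocks(Seq("shuffle_0_0_0", "shuffle_0_1_0", "shuffle_0_2_0"))
        assert(pushes == 1) // mirrors verify(shuffleClient, times(1)).pushBlocks(...)
        println(s"push attempts: $pushes")
      }
    }
    ```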
    
    Closes #33477 from otterc/SPARK-36255.
    
    Authored-by: Chandni Singh <singh.chan...@gmail.com>
    Signed-off-by: yi.wu <yi...@databricks.com>
---
 .../apache/spark/network/shuffle/ErrorHandler.java |  9 +++++--
 .../apache/spark/shuffle/ShuffleBlockPusher.scala  | 27 +++++++++++++++++----
 .../spark/shuffle/ShuffleBlockPusherSuite.scala    | 28 ++++++++++++++++++++--
 3 files changed, 56 insertions(+), 8 deletions(-)

diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ErrorHandler.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ErrorHandler.java
index 968777f..2e15671 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ErrorHandler.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ErrorHandler.java
@@ -17,6 +17,7 @@
 
 package org.apache.spark.network.shuffle;
 
+import java.io.FileNotFoundException;
 import java.net.ConnectException;
 
 import com.google.common.base.Throwables;
@@ -82,8 +83,12 @@ public interface ErrorHandler {
 
     @Override
     public boolean shouldRetryError(Throwable t) {
-      // If it is a connection time out or a connection closed exception, no need to retry.
-      if (t.getCause() != null && t.getCause() instanceof ConnectException) {
+      // If it is a connection time-out or a connection closed exception, no need to retry.
+      // If it is a FileNotFoundException originating from the client while pushing the shuffle
+      // blocks to the server, even then there is no need to retry. We will still log this exception
+      // once, which helps with debugging.
+      if (t.getCause() != null && (t.getCause() instanceof ConnectException ||
+          t.getCause() instanceof FileNotFoundException)) {
         return false;
       }
       // If the block is too late, there is no need to retry it
diff --git a/core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockPusher.scala b/core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockPusher.scala
index 88d084c..53687bb 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockPusher.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockPusher.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.shuffle
 
-import java.io.File
+import java.io.{File, FileNotFoundException}
 import java.net.ConnectException
 import java.nio.ByteBuffer
 import java.util.concurrent.ExecutorService
@@ -71,6 +71,12 @@ private[spark] class ShuffleBlockPusher(conf: SparkConf) extends Logging {
      // blocks to just that host and continue push blocks to other hosts. So, here push of
      // all blocks will only stop when it is "Too Late". Also see updateStateAndCheckIfPushMore.
      override def shouldRetryError(t: Throwable): Boolean = {
+        // If it is a FileNotFoundException originating from the client while pushing the shuffle
+        // blocks to the server, then we stop pushing all the blocks because this indicates the
+        // shuffle files are deleted and subsequent block push will also fail.
+        if (t.getCause != null && t.getCause.isInstanceOf[FileNotFoundException]) {
+          return false
+        }
        // If the block is too late, there is no need to retry it
        !Throwables.getStackTraceAsString(t).contains(BlockPushErrorHandler.TOO_LATE_MESSAGE_SUFFIX)
       }
@@ -100,10 +106,22 @@ private[spark] class ShuffleBlockPusher(conf: SparkConf) extends Logging {
     pushRequests ++= Utils.randomize(requests)
 
     submitTask(() => {
-      pushUpToMax()
+      tryPushUpToMax()
     })
   }
 
+  private[shuffle] def tryPushUpToMax(): Unit = {
+    try {
+      pushUpToMax()
+    } catch {
+      case e: FileNotFoundException =>
+        logWarning("The shuffle files got deleted when this 
shuffle-block-push-thread " +
+          "was reading from them which could happen when the job finishes and 
the driver " +
+          "instructs the executor to cleanup the shuffle. In this case, push 
of the blocks " +
+          "belonging to this shuffle will stop.", e)
+    }
+  }
+
   /**
    * Triggers the push. It's a separate method for testing.
    * VisibleForTesting
@@ -201,7 +219,7 @@ private[spark] class ShuffleBlockPusher(conf: SparkConf) extends Logging {
         submitTask(() => {
           if (updateStateAndCheckIfPushMore(
             sizeMap(result.blockId), address, remainingBlocks, result)) {
-            pushUpToMax()
+            tryPushUpToMax()
           }
         })
       }
@@ -297,7 +315,8 @@ private[spark] class ShuffleBlockPusher(conf: SparkConf) extends Logging {
       }
     }
    if (pushResult.failure != null && !errorHandler.shouldRetryError(pushResult.failure)) {
-      logDebug(s"Received after merge is finalized from $address. Not pushing any more blocks.")
+      logDebug(s"Encountered an exception from $address which indicates that push needs to " +
+        s"stop.")
       return false
     } else {
      remainingBlocks.isEmpty && (pushRequests.nonEmpty || deferredPushRequests.nonEmpty)
diff --git a/core/src/test/scala/org/apache/spark/shuffle/ShuffleBlockPusherSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/ShuffleBlockPusherSuite.scala
index de6a2c9..6a07fef 100644
--- a/core/src/test/scala/org/apache/spark/shuffle/ShuffleBlockPusherSuite.scala
+++ b/core/src/test/scala/org/apache/spark/shuffle/ShuffleBlockPusherSuite.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.shuffle
 
-import java.io.File
+import java.io.{File, FileNotFoundException, IOException}
 import java.net.ConnectException
 import java.nio.ByteBuffer
 import java.util.concurrent.LinkedBlockingQueue
@@ -324,8 +324,32 @@ class ShuffleBlockPusherSuite extends SparkFunSuite with BeforeAndAfterEach {
     assert(pusher.unreachableBlockMgrs.size == 2)
   }
 
+  test("SPARK-36255: FileNotFoundException stops the push") {
+    when(dependency.getMergerLocs).thenReturn(
+      Seq(BlockManagerId("client1", "client1", 1), BlockManagerId("client2", "client2", 2)))
+    conf.set("spark.reducer.maxReqsInFlight", "1")
+    val pusher = new TestShuffleBlockPusher(conf)
+    when(shuffleClient.pushBlocks(any(), any(), any(), any(), any()))
+      .thenAnswer((invocation: InvocationOnMock) => {
+        val pushedBlocks = invocation.getArguments()(2).asInstanceOf[Array[String]]
+        val blockFetchListener = invocation.getArguments()(4).asInstanceOf[BlockFetchingListener]
+        pushedBlocks.foreach(blockId => {
+          blockFetchListener.onBlockFetchFailure(
+            blockId, new IOException("Failed to send RPC",
+              new FileNotFoundException("file not found")))
+        })
+      })
+    pusher.initiateBlockPush(
+      mock(classOf[File]), Array.fill(dependency.partitioner.numPartitions) { 2 }, dependency, 0)
+    pusher.runPendingTasks()
+    verify(shuffleClient, times(1))
+      .pushBlocks(any(), any(), any(), any(), any())
+    assert(pusher.tasks.isEmpty)
+    ShuffleBlockPusher.stop()
+  }
+
  private class TestShuffleBlockPusher(conf: SparkConf) extends ShuffleBlockPusher(conf) {
-   private[this] val tasks = new LinkedBlockingQueue[Runnable]
+    val tasks = new LinkedBlockingQueue[Runnable]
 
     override protected def submitTask(task: Runnable): Unit = {
       tasks.add(task)
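
As a usage note, the heart of the client-side change is the catch-and-log wrapper around the push loop. A self-contained sketch of that pattern, with `println` standing in for Spark's `logWarning` and the object name hypothetical:
```
import java.io.FileNotFoundException

object TryPushSketch {
  // Stand-in for ShuffleBlockPusher.pushUpToMax(): reading a deleted shuffle
  // file surfaces as a FileNotFoundException.
  private def pushUpToMax(): Unit =
    throw new FileNotFoundException("shuffle_1_690_0.data (No such file or directory)")

  // Mirrors the new tryPushUpToMax(): catch the exception and stop pushing,
  // instead of letting it escape to SparkUncaughtExceptionHandler and take the
  // executor down with it.
  def tryPushUpToMax(): Unit =
    try pushUpToMax() catch {
      case e: FileNotFoundException =>
        println(s"Shuffle files were deleted; stopping block push for this shuffle: $e")
    }

  def main(args: Array[String]): Unit = tryPushUpToMax()
}
```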

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
