This is an automated email from the ASF dual-hosted git repository.
wuyi pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.2 by this push:
new 96944ac [SPARK-36255][SHUFFLE][CORE] Stop pushing and retrying on
FileNotFound exceptions
96944ac is described below
commit 96944ac17d209b467bb51f71ff6f099d28ef0f3b
Author: Chandni Singh <[email protected]>
AuthorDate: Sat Jul 24 21:09:11 2021 +0800
[SPARK-36255][SHUFFLE][CORE] Stop pushing and retrying on FileNotFound
exceptions
### What changes were proposed in this pull request?
Once the shuffle is cleaned up by the `ContextCleaner`, the shuffle files
are deleted by the executors. In this case, the push of the shuffle data by the
executors can throw `FileNotFoundException`s because the shuffle files are
deleted. When this exception is thrown from the `shuffle-block-push-thread`, it
causes the executor to exit. Both the `shuffle-block-push` threads and the
netty event-loops will encounter `FileNotFoundException`s in this case. The
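fix here stops these threads from pushing any more blocks once a `FileNotFoundException` is encountered. When the exception is thrown from the `shuffle-block-push-thread`, it is caught and logged as a warning instead of crashing the executor.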
### Why are the changes needed?
This fixes a bug that causes executors to exit when they are instructed
to clean up shuffle data.
Below is the stacktrace of this exception:
```
21/06/17 16:03:57 ERROR util.SparkUncaughtExceptionHandler: Uncaught
exception in thread Thread[block-push-thread-1,5,main]
java.lang.Error: java.io.IOException: Error in opening
FileSegmentManagedBuffer
{file=********/application_1619720975011_11057757/blockmgr-560cb4cf-9918-4ea7-a007-a16c5e3a35fe/0a/shuffle_1_690_0.data,
offset=10640, length=190}
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1155)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Error in opening
FileSegmentManagedBuffer{file=*******/application_1619720975011_11057757/blockmgr-560cb4cf-9918-4ea7-a007-a16c5e3a35fe/0a/shuffle_1_690_0.data,
offset=10640, length=190}
at
org.apache.spark.network.buffer.FileSegmentManagedBuffer.nioByteBuffer(FileSegmentManagedBuffer.java:89)
at
org.apache.spark.shuffle.ShuffleWriter.sliceReqBufferIntoBlockBuffers(ShuffleWriter.scala:294)
at
org.apache.spark.shuffle.ShuffleWriter.org$apache$spark$shuffle$ShuffleWriter$$sendRequest(ShuffleWriter.scala:270)
at
org.apache.spark.shuffle.ShuffleWriter.org$apache$spark$shuffle$ShuffleWriter$$pushUpToMax(ShuffleWriter.scala:191)
at
org.apache.spark.shuffle.ShuffleWriter$$anon$2$$anon$4.run(ShuffleWriter.scala:244)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
... 2 more
Caused by: java.io.FileNotFoundException:
******/application_1619720975011_11057757/blockmgr-560cb4cf-9918-4ea7-a007-a16c5e3a35fe/0a/shuffle_1_690_0.data
(No such file or directory)
at java.io.RandomAccessFile.open0(Native Method)
at java.io.RandomAccessFile.open(RandomAccessFile.java:316)
at java.io.RandomAccessFile.<init>(RandomAccessFile.java:243)
at
org.apache.spark.network.buffer.FileSegmentManagedBuffer.nioByteBuffer(FileSegmentManagedBuffer.java:62)
```
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Added a unit test to verify that no more data is pushed when a `FileNotFoundException`
is encountered. Also verified in our environment.
Closes #33477 from otterc/SPARK-36255.
Authored-by: Chandni Singh <[email protected]>
Signed-off-by: yi.wu <[email protected]>
(cherry picked from commit 09e1c612729292a9a000a2d64267ab874d71a1e5)
Signed-off-by: yi.wu <[email protected]>
---
.../apache/spark/network/shuffle/ErrorHandler.java | 9 +++++--
.../apache/spark/shuffle/ShuffleBlockPusher.scala | 27 +++++++++++++++++----
.../spark/shuffle/ShuffleBlockPusherSuite.scala | 28 ++++++++++++++++++++--
3 files changed, 56 insertions(+), 8 deletions(-)
diff --git
a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ErrorHandler.java
b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ErrorHandler.java
index 968777f..2e15671 100644
---
a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ErrorHandler.java
+++
b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ErrorHandler.java
@@ -17,6 +17,7 @@
package org.apache.spark.network.shuffle;
+import java.io.FileNotFoundException;
import java.net.ConnectException;
import com.google.common.base.Throwables;
@@ -82,8 +83,12 @@ public interface ErrorHandler {
@Override
public boolean shouldRetryError(Throwable t) {
- // If it is a connection time out or a connection closed exception, no
need to retry.
- if (t.getCause() != null && t.getCause() instanceof ConnectException) {
+ // If it is a connection time-out or a connection closed exception, no
need to retry.
+ // If it is a FileNotFoundException originating from the client while
pushing the shuffle
+ // blocks to the server, even then there is no need to retry. We will
still log this exception
+ // once which helps with debugging.
+ if (t.getCause() != null && (t.getCause() instanceof ConnectException ||
+ t.getCause() instanceof FileNotFoundException)) {
return false;
}
// If the block is too late, there is no need to retry it
diff --git
a/core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockPusher.scala
b/core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockPusher.scala
index 88d084c..53687bb 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockPusher.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockPusher.scala
@@ -17,7 +17,7 @@
package org.apache.spark.shuffle
-import java.io.File
+import java.io.{File, FileNotFoundException}
import java.net.ConnectException
import java.nio.ByteBuffer
import java.util.concurrent.ExecutorService
@@ -71,6 +71,12 @@ private[spark] class ShuffleBlockPusher(conf: SparkConf)
extends Logging {
// blocks to just that host and continue push blocks to other hosts. So,
here push of
// all blocks will only stop when it is "Too Late". Also see
updateStateAndCheckIfPushMore.
override def shouldRetryError(t: Throwable): Boolean = {
+ // If it is a FileNotFoundException originating from the client while
pushing the shuffle
+ // blocks to the server, then we stop pushing all the blocks because
this indicates the
+ // shuffle files are deleted and subsequent block push will also fail.
+ if (t.getCause != null &&
t.getCause.isInstanceOf[FileNotFoundException]) {
+ return false
+ }
// If the block is too late, there is no need to retry it
!Throwables.getStackTraceAsString(t).contains(BlockPushErrorHandler.TOO_LATE_MESSAGE_SUFFIX)
}
@@ -100,10 +106,22 @@ private[spark] class ShuffleBlockPusher(conf: SparkConf)
extends Logging {
pushRequests ++= Utils.randomize(requests)
submitTask(() => {
- pushUpToMax()
+ tryPushUpToMax()
})
}
+ private[shuffle] def tryPushUpToMax(): Unit = {
+ try {
+ pushUpToMax()
+ } catch {
+ case e: FileNotFoundException =>
+ logWarning("The shuffle files got deleted when this
shuffle-block-push-thread " +
+ "was reading from them which could happen when the job finishes and
the driver " +
+ "instructs the executor to cleanup the shuffle. In this case, push
of the blocks " +
+ "belonging to this shuffle will stop.", e)
+ }
+ }
+
/**
* Triggers the push. It's a separate method for testing.
* VisibleForTesting
@@ -201,7 +219,7 @@ private[spark] class ShuffleBlockPusher(conf: SparkConf)
extends Logging {
submitTask(() => {
if (updateStateAndCheckIfPushMore(
sizeMap(result.blockId), address, remainingBlocks, result)) {
- pushUpToMax()
+ tryPushUpToMax()
}
})
}
@@ -297,7 +315,8 @@ private[spark] class ShuffleBlockPusher(conf: SparkConf)
extends Logging {
}
}
if (pushResult.failure != null &&
!errorHandler.shouldRetryError(pushResult.failure)) {
- logDebug(s"Received after merge is finalized from $address. Not pushing
any more blocks.")
+ logDebug(s"Encountered an exception from $address which indicates that
push needs to " +
+ s"stop.")
return false
} else {
remainingBlocks.isEmpty && (pushRequests.nonEmpty ||
deferredPushRequests.nonEmpty)
diff --git
a/core/src/test/scala/org/apache/spark/shuffle/ShuffleBlockPusherSuite.scala
b/core/src/test/scala/org/apache/spark/shuffle/ShuffleBlockPusherSuite.scala
index de6a2c9..6a07fef 100644
--- a/core/src/test/scala/org/apache/spark/shuffle/ShuffleBlockPusherSuite.scala
+++ b/core/src/test/scala/org/apache/spark/shuffle/ShuffleBlockPusherSuite.scala
@@ -17,7 +17,7 @@
package org.apache.spark.shuffle
-import java.io.File
+import java.io.{File, FileNotFoundException, IOException}
import java.net.ConnectException
import java.nio.ByteBuffer
import java.util.concurrent.LinkedBlockingQueue
@@ -324,8 +324,32 @@ class ShuffleBlockPusherSuite extends SparkFunSuite with
BeforeAndAfterEach {
assert(pusher.unreachableBlockMgrs.size == 2)
}
+ test("SPARK-36255: FileNotFoundException stops the push") {
+ when(dependency.getMergerLocs).thenReturn(
+ Seq(BlockManagerId("client1", "client1", 1), BlockManagerId("client2",
"client2", 2)))
+ conf.set("spark.reducer.maxReqsInFlight", "1")
+ val pusher = new TestShuffleBlockPusher(conf)
+ when(shuffleClient.pushBlocks(any(), any(), any(), any(), any()))
+ .thenAnswer((invocation: InvocationOnMock) => {
+ val pushedBlocks =
invocation.getArguments()(2).asInstanceOf[Array[String]]
+ val blockFetchListener =
invocation.getArguments()(4).asInstanceOf[BlockFetchingListener]
+ pushedBlocks.foreach(blockId => {
+ blockFetchListener.onBlockFetchFailure(
+ blockId, new IOException("Failed to send RPC",
+ new FileNotFoundException("file not found")))
+ })
+ })
+ pusher.initiateBlockPush(
+ mock(classOf[File]), Array.fill(dependency.partitioner.numPartitions) {
2 }, dependency, 0)
+ pusher.runPendingTasks()
+ verify(shuffleClient, times(1))
+ .pushBlocks(any(), any(), any(), any(), any())
+ assert(pusher.tasks.isEmpty)
+ ShuffleBlockPusher.stop()
+ }
+
private class TestShuffleBlockPusher(conf: SparkConf) extends
ShuffleBlockPusher(conf) {
- private[this] val tasks = new LinkedBlockingQueue[Runnable]
+ val tasks = new LinkedBlockingQueue[Runnable]
override protected def submitTask(task: Runnable): Unit = {
tasks.add(task)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]