This is an automated email from the ASF dual-hosted git repository.
mridulm80 pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.2 by this push:
new 2fb62e0 [SPARK-35548][CORE][SHUFFLE] Handling new attempt has started
error message in BlockPushErrorHandler in client
2fb62e0 is described below
commit 2fb62e0e3d40835a3e61fcf210e0772cd0d21a68
Author: zhuqi-lucas <[email protected]>
AuthorDate: Mon Aug 16 13:58:48 2021 -0500
[SPARK-35548][CORE][SHUFFLE] Handling new attempt has started error message
in BlockPushErrorHandler in client
### What changes were proposed in this pull request?
Add a new type of error message in BlockPushErrorHandler which indicates that
the PushBlockStream message was received after a new application attempt had
started. The client should handle this error message correctly, without
retrying the block push.
### Why are the changes needed?
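Previously, the server rejected pushes from an older application attempt with a
generic IllegalArgumentException, which the client could not distinguish from
other failures. With this change, when a block push fails because it comes from
an older application attempt, the client neither retries pushing the block nor
logs the exception.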
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Added corresponding unit tests in ErrorHandlerSuite, RemoteBlockPushResolverSuite
and ShuffleBlockPusherSuite.
Closes #33617 from zhuqi-lucas/master.
Authored-by: zhuqi-lucas <[email protected]>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
(cherry picked from commit 05cd5f97c3dea25dacdbdb319243cdab9667c774)
Signed-off-by: Mridul Muralidharan <mridulatgmail.com>
---
.../network/server/BlockPushNonFatalFailure.java | 22 +++++++++++++++-
.../network/server/TransportRequestHandler.java | 13 ++++++++--
.../apache/spark/network/shuffle/ErrorHandler.java | 7 ++---
.../network/shuffle/RemoteBlockPushResolver.java | 30 ++++++++++++----------
.../spark/network/shuffle/ErrorHandlerSuite.java | 4 +++
.../shuffle/RemoteBlockPushResolverSuite.java | 17 +++++++-----
.../apache/spark/shuffle/ShuffleBlockPusher.scala | 10 +++-----
.../spark/shuffle/ShuffleBlockPusherSuite.scala | 6 +++++
8 files changed, 77 insertions(+), 32 deletions(-)
diff --git
a/common/network-common/src/main/java/org/apache/spark/network/server/BlockPushNonFatalFailure.java
b/common/network-common/src/main/java/org/apache/spark/network/server/BlockPushNonFatalFailure.java
index 5906fa2..4bb22b2 100644
---
a/common/network-common/src/main/java/org/apache/spark/network/server/BlockPushNonFatalFailure.java
+++
b/common/network-common/src/main/java/org/apache/spark/network/server/BlockPushNonFatalFailure.java
@@ -40,6 +40,14 @@ public class BlockPushNonFatalFailure extends
RuntimeException {
" is received after merged shuffle is finalized";
/**
+ * String constant used for generating exception messages indicating the
application attempt is
+ * not the latest attempt on the server side. When we get a block push
failure because of the too
+ * old attempt, we will not retry pushing the block nor log the exception on
the client side.
+ */
+ public static final String TOO_OLD_ATTEMPT_SUFFIX =
+ " is from an older app attempt";
+
+ /**
* String constant used for generating exception messages indicating a block
to be merged
* is a stale block push in the case of indeterminate stage retries on the
server side.
* When we get a block push failure because of the block push being stale,
we will not
@@ -124,7 +132,12 @@ public class BlockPushNonFatalFailure extends
RuntimeException {
* indeterminate stage retries. When the client receives this code, it
will not retry
* pushing the block.
*/
- STALE_BLOCK_PUSH(3, STALE_BLOCK_PUSH_MESSAGE_SUFFIX);
+ STALE_BLOCK_PUSH(3, STALE_BLOCK_PUSH_MESSAGE_SUFFIX),
+ /**
+ * Indicate the application attempt is not the latest attempt on the
server side.
+ * When the client gets this code, it will not retry pushing the block.
+ */
+ TOO_OLD_ATTEMPT_PUSH(4, TOO_OLD_ATTEMPT_SUFFIX);
private final byte id;
// Error message suffix used to generate an error message for a given
ReturnCode and
@@ -146,10 +159,17 @@ public class BlockPushNonFatalFailure extends
RuntimeException {
case 1: return ReturnCode.TOO_LATE_BLOCK_PUSH;
case 2: return ReturnCode.BLOCK_APPEND_COLLISION_DETECTED;
case 3: return ReturnCode.STALE_BLOCK_PUSH;
+ case 4: return ReturnCode.TOO_OLD_ATTEMPT_PUSH;
default: throw new IllegalArgumentException("Unknown block push return
code: " + id);
}
}
+ public static boolean shouldNotRetryErrorCode(ReturnCode returnCode) {
+ return returnCode == ReturnCode.TOO_LATE_BLOCK_PUSH ||
+ returnCode == ReturnCode.STALE_BLOCK_PUSH ||
+ returnCode == ReturnCode.TOO_OLD_ATTEMPT_PUSH;
+ }
+
public static String getErrorMsg(String blockId, ReturnCode errorCode) {
Preconditions.checkArgument(errorCode != ReturnCode.SUCCESS);
return "Block " + blockId + errorCode.errorMsgSuffix;
diff --git
a/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java
b/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java
index 5c07f20..bc99248 100644
---
a/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java
+++
b/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java
@@ -249,8 +249,17 @@ public class TransportRequestHandler extends
MessageHandler<RequestMessage> {
wrappedCallback.onComplete(wrappedCallback.getID());
}
} catch (Exception e) {
- logger.error("Error while invoking RpcHandler#receive() on RPC id " +
req.requestId, e);
- respond(new RpcFailure(req.requestId,
Throwables.getStackTraceAsString(e)));
+ if (e instanceof BlockPushNonFatalFailure) {
+ // Thrown by rpcHandler.receiveStream(reverseClient, meta, callback),
the same as
+ // onComplete method. Respond an RPC message with the error code to
client instead of
+ // using exceptions encoded in the RPCFailure. Using a proper
RPCResponse is more
+ // efficient, and now only include the too old attempt case here.
+ respond(new RpcResponse(req.requestId,
+ new NioManagedBuffer(((BlockPushNonFatalFailure) e).getResponse())));
+ } else {
+ logger.error("Error while invoking RpcHandler#receive() on RPC id " +
req.requestId, e);
+ respond(new RpcFailure(req.requestId,
Throwables.getStackTraceAsString(e)));
+ }
// We choose to totally fail the channel, rather than trying to recover
as we do in other
// cases. We don't know how many bytes of the stream the client has
already sent for the
// stream, it's not worth trying to recover.
diff --git
a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ErrorHandler.java
b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ErrorHandler.java
index 271d762..9136ff6 100644
---
a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ErrorHandler.java
+++
b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ErrorHandler.java
@@ -87,10 +87,11 @@ public interface ErrorHandler {
return false;
}
- // If the block is too late or stale block push, there is no need to
retry it
+ // If the block is too late or the invalid block push or the attempt is
not the latest one,
+ // there is no need to retry it
return !(t instanceof BlockPushNonFatalFailure &&
- (((BlockPushNonFatalFailure) t).getReturnCode() == TOO_LATE_BLOCK_PUSH
||
- ((BlockPushNonFatalFailure) t).getReturnCode() == STALE_BLOCK_PUSH));
+ BlockPushNonFatalFailure
+ .shouldNotRetryErrorCode(((BlockPushNonFatalFailure)
t).getReturnCode()));
}
@Override
diff --git
a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
index 80174d1..d04db67 100644
---
a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
+++
b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
@@ -401,19 +401,17 @@ public class RemoteBlockPushResolver implements
MergedShuffleFileManager {
@Override
public StreamCallbackWithID receiveBlockDataAsStream(PushBlockStream msg) {
AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(msg.appId);
- if (appShuffleInfo.attemptId != msg.appAttemptId) {
- // If this Block belongs to a former application attempt, it is
considered late,
- // as only the blocks from the current application attempt will be merged
- // TODO: [SPARK-35548] Client should be updated to handle this error.
- throw new IllegalArgumentException(
- String.format("The attempt id %s in this PushBlockStream message does
not match "
- + "with the current attempt id %s stored in shuffle service for
application %s",
- msg.appAttemptId, appShuffleInfo.attemptId, msg.appId));
- }
// Use string concatenation here to avoid the overhead with String.format
on every
// pushed block.
final String streamId = OneForOneBlockPusher.SHUFFLE_PUSH_BLOCK_PREFIX +
"_"
+ msg.shuffleId + "_" + msg.shuffleMergeId + "_" + msg.mapIndex + "_" +
msg.reduceId;
+ if (appShuffleInfo.attemptId != msg.appAttemptId) {
+ // If this Block belongs to a former application attempt, it is
considered late,
+ // as only the blocks from the current application attempt will be merged
+ throw new BlockPushNonFatalFailure(new BlockPushReturnCode(ReturnCode
+ .TOO_OLD_ATTEMPT_PUSH.id(), streamId).toByteBuffer(),
+ BlockPushNonFatalFailure.getErrorMsg(streamId,
ReturnCode.TOO_OLD_ATTEMPT_PUSH));
+ }
// Retrieve merged shuffle file metadata
AppShufflePartitionInfo partitionInfoBeforeCheck;
BlockPushNonFatalFailure failure = null;
@@ -519,12 +517,18 @@ public class RemoteBlockPushResolver implements
MergedShuffleFileManager {
msg.shuffleId, msg.shuffleMergeId, msg.appId, msg.appAttemptId);
AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(msg.appId);
if (appShuffleInfo.attemptId != msg.appAttemptId) {
- // If this Block belongs to a former application attempt, it is
considered late,
- // as only the blocks from the current application attempt will be merged
- // TODO: [SPARK-35548] Client should be updated to handle this error.
+ // If finalizeShuffleMerge from a former application attempt, it is
considered late,
+ // as only the finalizeShuffleMerge request from the current application
attempt
+ // will be merged. Too old app attempt only being seen by an already
failed
+ // app attempt, and no need use callback to return to client now, because
+ // the finalizeShuffleMerge in DAGScheduler has no retry policy, and
don't
+ // use the BlockPushNonFatalFailure because it's the finalizeShuffleMerge
+ // related case, not the block push case, just throw it in server side
now.
+ // TODO we may use a new exception class to include the
finalizeShuffleMerge
+ // related case just as the BlockPushNonFatalFailure contains the block
push cases.
throw new IllegalArgumentException(
String.format("The attempt id %s in this FinalizeShuffleMerge message
does not match "
- + "with the current attempt id %s stored in shuffle service for
application %s",
+ + "with the current attempt id %s stored in shuffle service for
application %s",
msg.appAttemptId, appShuffleInfo.attemptId, msg.appId));
}
AtomicReference<Map<Integer, AppShufflePartitionInfo>>
shuffleMergePartitionsRef =
diff --git
a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ErrorHandlerSuite.java
b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ErrorHandlerSuite.java
index 56c9a97..246fda6 100644
---
a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ErrorHandlerSuite.java
+++
b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ErrorHandlerSuite.java
@@ -37,6 +37,8 @@ public class ErrorHandlerSuite {
assertFalse(pushHandler.shouldRetryError(new BlockPushNonFatalFailure(
ReturnCode.TOO_LATE_BLOCK_PUSH, "")));
assertFalse(pushHandler.shouldRetryError(new BlockPushNonFatalFailure(
+ ReturnCode.TOO_OLD_ATTEMPT_PUSH, "")));
+ assertFalse(pushHandler.shouldRetryError(new BlockPushNonFatalFailure(
ReturnCode.STALE_BLOCK_PUSH, "")));
assertFalse(pushHandler.shouldRetryError(new RuntimeException(new
ConnectException())));
assertTrue(pushHandler.shouldRetryError(new BlockPushNonFatalFailure(
@@ -54,6 +56,8 @@ public class ErrorHandlerSuite {
assertFalse(pushHandler.shouldLogError(new BlockPushNonFatalFailure(
ReturnCode.TOO_LATE_BLOCK_PUSH, "")));
assertFalse(pushHandler.shouldLogError(new BlockPushNonFatalFailure(
+ ReturnCode.TOO_OLD_ATTEMPT_PUSH, "")));
+ assertFalse(pushHandler.shouldLogError(new BlockPushNonFatalFailure(
ReturnCode.STALE_BLOCK_PUSH, "")));
assertFalse(pushHandler.shouldLogError(new BlockPushNonFatalFailure(
ReturnCode.BLOCK_APPEND_COLLISION_DETECTED, "")));
diff --git
a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java
b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java
index d7881f0..3324b4e 100644
---
a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java
+++
b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java
@@ -112,6 +112,8 @@ public class RemoteBlockPushResolverSuite {
assertFalse(errorHandler.shouldLogError(new BlockPushNonFatalFailure(
BlockPushNonFatalFailure.ReturnCode.TOO_LATE_BLOCK_PUSH, "")));
assertFalse(errorHandler.shouldLogError(new BlockPushNonFatalFailure(
+ BlockPushNonFatalFailure.ReturnCode.TOO_OLD_ATTEMPT_PUSH, "")));
+ assertFalse(errorHandler.shouldLogError(new BlockPushNonFatalFailure(
BlockPushNonFatalFailure.ReturnCode.STALE_BLOCK_PUSH, "")));
assertFalse(errorHandler.shouldLogError(new BlockPushNonFatalFailure(
BlockPushNonFatalFailure.ReturnCode.BLOCK_APPEND_COLLISION_DETECTED,
"")));
@@ -939,7 +941,7 @@ public class RemoteBlockPushResolverSuite {
}
}
- @Test(expected = IllegalArgumentException.class)
+ @Test(expected = BlockPushNonFatalFailure.class)
public void testPushBlockFromPreviousAttemptIsRejected()
throws IOException, InterruptedException {
Semaphore closed = new Semaphore(0);
@@ -998,11 +1000,12 @@ public class RemoteBlockPushResolverSuite {
try {
pushResolver.receiveBlockDataAsStream(
new PushBlockStream(testApp, 1, 0, 0, 1, 0, 0));
- } catch (IllegalArgumentException re) {
- assertEquals(
- "The attempt id 1 in this PushBlockStream message does not match " +
- "with the current attempt id 2 stored in shuffle service for
application " +
- testApp, re.getMessage());
+ } catch (BlockPushNonFatalFailure re) {
+ BlockPushReturnCode errorCode =
+ (BlockPushReturnCode)
BlockTransferMessage.Decoder.fromByteBuffer(re.getResponse());
+
assertEquals(BlockPushNonFatalFailure.ReturnCode.TOO_OLD_ATTEMPT_PUSH.id(),
+ errorCode.returnCode);
+ assertEquals(errorCode.failureBlockId, stream2.getID());
throw re;
}
}
@@ -1034,7 +1037,7 @@ public class RemoteBlockPushResolverSuite {
} catch (IllegalArgumentException e) {
assertEquals(e.getMessage(),
String.format("The attempt id %s in this FinalizeShuffleMerge message
does not " +
- "match with the current attempt id %s stored in shuffle service for
application %s",
+ "match with the current attempt id %s stored in shuffle service
for application %s",
ATTEMPT_ID_1, ATTEMPT_ID_2, testApp));
throw e;
}
diff --git
a/core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockPusher.scala
b/core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockPusher.scala
index e6af767..bb260f8 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockPusher.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockPusher.scala
@@ -32,7 +32,6 @@ import org.apache.spark.launcher.SparkLauncher
import org.apache.spark.network.buffer.{FileSegmentManagedBuffer,
ManagedBuffer, NioManagedBuffer}
import org.apache.spark.network.netty.SparkTransportConf
import org.apache.spark.network.server.BlockPushNonFatalFailure
-import org.apache.spark.network.server.BlockPushNonFatalFailure.ReturnCode
import org.apache.spark.network.shuffle.BlockPushingListener
import org.apache.spark.network.shuffle.ErrorHandler.BlockPushErrorHandler
import org.apache.spark.network.util.TransportConf
@@ -78,12 +77,11 @@ private[spark] class ShuffleBlockPusher(conf: SparkConf)
extends Logging {
if (t.getCause != null &&
t.getCause.isInstanceOf[FileNotFoundException]) {
return false
}
- // If the block is too late or the invalid block push, there is no
need to retry it
+ // If the block is too late or the invalid block push or the attempt
is not the latest one,
+ // there is no need to retry it
!(t.isInstanceOf[BlockPushNonFatalFailure] &&
- (t.asInstanceOf[BlockPushNonFatalFailure].getReturnCode
- == ReturnCode.TOO_LATE_BLOCK_PUSH ||
- t.asInstanceOf[BlockPushNonFatalFailure].getReturnCode
- == ReturnCode.STALE_BLOCK_PUSH))
+ BlockPushNonFatalFailure.
+
shouldNotRetryErrorCode(t.asInstanceOf[BlockPushNonFatalFailure].getReturnCode));
}
}
}
diff --git
a/core/src/test/scala/org/apache/spark/shuffle/ShuffleBlockPusherSuite.scala
b/core/src/test/scala/org/apache/spark/shuffle/ShuffleBlockPusherSuite.scala
index 6f9b5e4..298ba50 100644
--- a/core/src/test/scala/org/apache/spark/shuffle/ShuffleBlockPusherSuite.scala
+++ b/core/src/test/scala/org/apache/spark/shuffle/ShuffleBlockPusherSuite.scala
@@ -224,6 +224,9 @@ class ShuffleBlockPusherSuite extends SparkFunSuite with
BeforeAndAfterEach {
ReturnCode.TOO_LATE_BLOCK_PUSH, "")))
assert(
!errorHandler.shouldRetryError(new BlockPushNonFatalFailure(
+ ReturnCode.TOO_OLD_ATTEMPT_PUSH, "")))
+ assert(
+ !errorHandler.shouldRetryError(new BlockPushNonFatalFailure(
ReturnCode.STALE_BLOCK_PUSH, "")))
assert(errorHandler.shouldRetryError(new RuntimeException(new
ConnectException())))
assert(
@@ -240,6 +243,9 @@ class ShuffleBlockPusherSuite extends SparkFunSuite with
BeforeAndAfterEach {
ReturnCode.TOO_LATE_BLOCK_PUSH, "")))
assert(
!errorHandler.shouldLogError(new BlockPushNonFatalFailure(
+ ReturnCode.TOO_OLD_ATTEMPT_PUSH, "")))
+ assert(
+ !errorHandler.shouldLogError(new BlockPushNonFatalFailure(
ReturnCode.STALE_BLOCK_PUSH, "")))
assert(!errorHandler.shouldLogError(new BlockPushNonFatalFailure(
ReturnCode.BLOCK_APPEND_COLLISION_DETECTED, "")))
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]