This is an automated email from the ASF dual-hosted git repository.
mridulm80 pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.4 by this push:
new 2f11896886e [SPARK-33573][CORE][FOLLOW-UP] Enhance ignoredBlockBytes
in pushMergeMetrics to cover more scenarios
2f11896886e is described below
commit 2f11896886ec9a784182d3510632f4d5efcb5b99
Author: Minchu Yang <[email protected]>
AuthorDate: Fri Jan 27 04:15:15 2023 -0600
[SPARK-33573][CORE][FOLLOW-UP] Enhance ignoredBlockBytes in
pushMergeMetrics to cover more scenarios
### What changes were proposed in this pull request?
Currently, the `ignoredBlockBytes` server-side metric for push-based shuffle does not
fully capture the block bytes that were received by the ESS but did not get merged.
This PR enhances the logic for incrementing `ignoredBlockBytes` so that it captures
the bytes ignored by the ESS. Specifically, bytes are considered ignored when they are:
1. received after the shuffle file is finalized; 2. part of a request for a duplicate
block; 3. received by the ESS but failed to be written.
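For illustration, here is a minimal, self-contained Java sketch of the accounting pattern this change implements (the class name `IgnoredBytesTracker` and its fields are hypothetical stand-ins, not the actual `RemoteBlockPushResolver` code): every chunk seen in `onData` is counted via `ByteBuffer.remaining()` (the bytes still unread, rather than `limit()`, which over-counts when the buffer's position is non-zero), and the accumulated count is credited to the ignored-bytes metric only when the block ends up not being merged.

```java
import java.nio.ByteBuffer;

// Hypothetical stand-in for the per-stream callback bookkeeping in RemoteBlockPushResolver.
public class IgnoredBytesTracker {
  private long receivedBytes = 0;      // bytes seen so far for this block push
  private long ignoredBlockBytes = 0;  // stand-in for pushMergeMetrics.ignoredBlockBytes

  // Called for every chunk of the pushed block.
  public void onData(ByteBuffer buf) {
    // remaining() = limit - position, i.e. the bytes actually left to consume;
    // limit() alone would over-count if the buffer was already partially read.
    receivedBytes += buf.remaining();
  }

  // Called when the block ends up not merged: duplicate block, pushed after
  // finalization, or a write failure surfaced via onFailure.
  public void markIgnored() {
    if (receivedBytes > 0) {
      ignoredBlockBytes += receivedBytes;
      receivedBytes = 0;
    }
  }

  public long ignoredBytes() {
    return ignoredBlockBytes;
  }

  public static void main(String[] args) {
    IgnoredBytesTracker tracker = new IgnoredBytesTracker();
    ByteBuffer buf = ByteBuffer.wrap(new byte[8]);
    buf.position(3);                   // pretend 3 bytes were already consumed upstream
    tracker.onData(buf);               // counts 5 (remaining), not 8 (limit)
    tracker.markIgnored();             // e.g. the push turned out to be a duplicate
    System.out.println(tracker.ignoredBytes());  // prints 5
  }
}
```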
### Why are the changes needed?
This makes the `ignoredBlockBytes` server-side metric for push-based shuffle more accurate.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Modified existing UTs to capture `ignoredBlockBytes`.
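As a rough illustration of the kind of assertion the modified tests make, here is a hedged sketch written against the hypothetical `IgnoredBytesTracker` above; it is not the actual `RemoteBlockPushResolverSuite` code or its `verifyMetrics` helper.

```java
import static org.junit.Assert.assertEquals;

import java.nio.ByteBuffer;
import org.junit.Test;

public class IgnoredBytesTrackerSuite {
  @Test
  public void duplicatePushCountsTowardsIgnoredBytes() {
    IgnoredBytesTracker tracker = new IgnoredBytesTracker();
    tracker.onData(ByteBuffer.wrap(new byte[4]));  // 4 bytes pushed
    tracker.markIgnored();                         // treated as a duplicate block
    // All 4 received bytes should now show up as ignored.
    assertEquals(4L, tracker.ignoredBytes());
  }
}
```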
Closes #39725 from rmcyang/SPARK-33573-FOLLOWUP.
Authored-by: Minchu Yang <[email protected]>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
(cherry picked from commit ddd867056f9f4f81d158f2699533152d6dc4c3d7)
Signed-off-by: Mridul Muralidharan <mridulatgmail.com>
---
.../network/shuffle/RemoteBlockPushResolver.java | 28 +++++++++++++++++++---
.../shuffle/RemoteBlockPushResolverSuite.java | 18 ++++++++------
docs/monitoring.md | 17 ++++++-------
3 files changed, 45 insertions(+), 18 deletions(-)
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
index a2e8219228a..df2d1fa12d1 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
@@ -701,7 +701,7 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
public void onData(String streamId, ByteBuffer buf) {
// Ignore the requests. It reaches here either when a request is received after the
// shuffle file is finalized or when a request is for a duplicate block.
- pushMergeMetrics.ignoredBlockBytes.mark(buf.limit());
+ pushMergeMetrics.ignoredBlockBytes.mark(buf.remaining());
}
@Override
@@ -1211,6 +1211,10 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
// Use on-heap instead of direct ByteBuffer since these buffers will be GC'ed very quickly
private List<ByteBuffer> deferredBufs;
+ // This collects the total pushed block bytes received in the onData method. Once these bytes
+ // are not being used, we add them to the ignoredBlockBytes of the pushMergeMetrics.
+ private long receivedBytes = 0;
+
private PushBlockStreamCallback(
RemoteBlockPushResolver mergeManager,
AppShuffleInfo appShuffleInfo,
@@ -1322,6 +1326,16 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
}
}
+ /**
+ * Update ignoredBlockBytes in pushMergeMetrics.
+ */
+ private void updateIgnoredBlockBytes() {
+ if (receivedBytes > 0) {
+ mergeManager.pushMergeMetrics.ignoredBlockBytes.mark(receivedBytes);
+ receivedBytes = 0;
+ }
+ }
+
/**
* This increments the number of IOExceptions and throws RuntimeException if it exceeds the
* threshold which will abort the merge of a particular shuffle partition.
@@ -1358,6 +1372,7 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
@Override
public void onData(String streamId, ByteBuffer buf) throws IOException {
+ receivedBytes += buf.remaining();
// When handling the block data using StreamInterceptor, it can help to reduce the amount
// of data that needs to be buffered in memory since it does not wait till the completion
// of the frame before handling the message, thus releasing the ByteBuf earlier. However,
@@ -1481,6 +1496,9 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
// the client in cases of duplicate even though no data is written.
if (isDuplicateBlock()) {
freeDeferredBufs();
+ // Since we just return without throwing exception, and the received bytes are ignored,
+ // thus we need to add them to ignoredBlockBytes in pushMergeMetrics.
+ updateIgnoredBlockBytes();
return;
}
if (partitionInfo.getCurrentMapIndex() < 0) {
@@ -1538,6 +1556,9 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
} else {
logger.debug("Encountered issue when merging {}", streamId, throwable);
}
+ // The block was received by ESS but didn't get merged, so it is considered as "ignored".
+ // Capturing them in ignoredBlockBytes would help measure any server side improvement.
+ updateIgnoredBlockBytes();
// Only update partitionInfo if the failure corresponds to a valid request. If the
// request is too late, i.e. received after shuffle merge finalize or stale block push,
// #onFailure will also be triggered, and we can just ignore. Also, if we couldn't find
@@ -2126,8 +2147,9 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
static final String DEFERRED_BLOCKS_METRIC = "deferredBlocks";
// staleBlockPushes tracks the number of stale shuffle block push requests
static final String STALE_BLOCK_PUSHES_METRIC = "staleBlockPushes";
- // ignoredBlockBytes tracks the size of the blocks that are ignored after the shuffle file is
- // finalized or when a request is for a duplicate block
+ // ignoredBlockBytes tracks the size of the blocks that are ignored. The pushed block data are
+ // considered as ignored for these cases: 1. received after the shuffle file is finalized;
+ // 2. when a request is for a duplicate block; 3. the part that ESS failed to write.
static final String IGNORED_BLOCK_BYTES_METRIC = "ignoredBlockBytes";
private final Map<String, Metric> allMetrics;
diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java
index 630a651d243..2526a94f429 100644
--- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java
+++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java
@@ -278,7 +278,7 @@ public class RemoteBlockPushResolverSuite {
pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, NO_ATTEMPT_ID, 0, 0));
MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 0, 0);
validateChunks(TEST_APP, 0, 0, 0, blockMeta, new int[]{4}, new int[][]{{0}});
- verifyMetrics(4, 0, 0, 0, 0, 0, 0);
+ verifyMetrics(4, 0, 0, 0, 0, 0, 4);
}
@Test
@@ -291,7 +291,7 @@ public class RemoteBlockPushResolverSuite {
pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, NO_ATTEMPT_ID, 0, 0));
MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 0, 0);
assertEquals("num-chunks", 0, blockMeta.getNumChunks());
- verifyMetrics(4, 0, 0, 0, 0, 0, 0);
+ verifyMetrics(4, 0, 0, 0, 0, 0, 4);
}
@Test
@@ -306,7 +306,7 @@ public class RemoteBlockPushResolverSuite {
pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, NO_ATTEMPT_ID, 0, 0));
MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 0, 0);
assertEquals("num-chunks", 0, blockMeta.getNumChunks());
- verifyMetrics(9, 0, 0, 0, 0, 0, 0);
+ verifyMetrics(9, 0, 0, 0, 0, 0, 9);
}
@Test
@@ -322,7 +322,7 @@ public class RemoteBlockPushResolverSuite {
pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, NO_ATTEMPT_ID, 0, 0));
MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 0, 0);
validateChunks(TEST_APP, 0, 0, 0, blockMeta, new int[]{9}, new int[][]{{0}});
- verifyMetrics(9, 0, 0, 0, 0, 0, 0);
+ verifyMetrics(9, 0, 0, 0, 0, 0, 9);
}
@Test
@@ -385,7 +385,7 @@ public class RemoteBlockPushResolverSuite {
FileSegmentManagedBuffer mb =
(FileSegmentManagedBuffer) pushResolver.getMergedBlockData(TEST_APP, 0, 0, 0, 0);
assertArrayEquals(expectedBytes, mb.nioByteBuffer().array());
- verifyMetrics(14, 0, 0, 0, 0, 0, 0);
+ verifyMetrics(14, 0, 0, 0, 0, 0, 10);
}
@Test
@@ -1066,6 +1066,8 @@ public class RemoteBlockPushResolverSuite {
stream1.onData(stream1.getID(), ByteBuffer.wrap(new byte[2]));
BlockPushNonFatalFailure e = assertThrows(BlockPushNonFatalFailure.class,
() -> stream1.onComplete(stream1.getID()));
+ // Trigger onFailure so that the stale bytes would be added into ignoredBytes
+ stream1.onFailure(stream1.getID(), new RuntimeException("Forced Failure"));
BlockPushReturnCode errorCode =
(BlockPushReturnCode) BlockTransferMessage.Decoder.fromByteBuffer(e.getResponse());
assertEquals(BlockPushNonFatalFailure.ReturnCode.STALE_BLOCK_PUSH.id(),
@@ -1076,7 +1078,7 @@ public class RemoteBlockPushResolverSuite {
pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, NO_ATTEMPT_ID, 0, 2));
MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 2, 0);
validateChunks(TEST_APP, 0, 2, 0, blockMeta, new int[]{4}, new int[][]{{0}});
- verifyMetrics(6, 0, 0, 0, 0, 2, 0);
+ verifyMetrics(6, 0, 0, 0, 0, 2, 4);
}
@Test
@@ -1094,6 +1096,8 @@ public class RemoteBlockPushResolverSuite {
// stream 1 push should be rejected as it is from an older shuffleMergeId
BlockPushNonFatalFailure e = assertThrows(BlockPushNonFatalFailure.class,
() -> stream1.onComplete(stream1.getID()));
+ // Trigger onFailure so that the stale bytes would be added into ignoredBytes
+ stream1.onFailure(stream1.getID(), new RuntimeException("Forced Failure"));
BlockPushReturnCode errorCode =
(BlockPushReturnCode) BlockTransferMessage.Decoder.fromByteBuffer(e.getResponse());
assertEquals(BlockPushNonFatalFailure.ReturnCode.STALE_BLOCK_PUSH.id(),
@@ -1111,7 +1115,7 @@ public class RemoteBlockPushResolverSuite {
MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 2, 0);
validateChunks(TEST_APP, 0, 2, 0, blockMeta, new int[]{4}, new int[][]{{0}});
- verifyMetrics(6, 0, 0, 0, 0, 2, 0);
+ verifyMetrics(6, 0, 0, 0, 0, 2, 4);
}
@Test
diff --git a/docs/monitoring.md b/docs/monitoring.md
index dbc2c14aea6..1f7acf4dece 100644
--- a/docs/monitoring.md
+++ b/docs/monitoring.md
@@ -1425,16 +1425,17 @@ Note: applies to the shuffle service
- **note:** the metrics below apply when the server side configuration `spark.shuffle.push.server.mergedShuffleFileManagerImpl` is set to `org.apache.spark.network.shuffle.MergedShuffleFileManager` for Push-Based Shuffle
-- blockBytesWritten - the size of the pushed block data written to file in bytes
-- blockAppendCollisions - the number of shuffle push blocks collided in shuffle services
+- blockBytesWritten - size of the pushed block data written to file in bytes
+- blockAppendCollisions - number of shuffle push blocks collided in shuffle services
as another block for the same reduce partition were being written
-- lateBlockPushes - the number of shuffle push blocks that are received in shuffle service
+- lateBlockPushes - number of shuffle push blocks that are received in shuffle service
after the specific shuffle merge has been finalized
-- deferredBlocks - the number of the current deferred block parts buffered in memory
-- deferredBlockBytes - the size of the current deferred block parts buffered in memory
-- staleBlockPushes - the number of stale shuffle block push requests
-- ignoredBlockBytes - the size of the pushed block data that are ignored after the shuffle
- file is finalized or when a request is for a duplicate block
+- deferredBlocks - number of the current deferred block parts buffered in memory
+- deferredBlockBytes - size of the current deferred block parts buffered in memory
+- staleBlockPushes - number of stale shuffle block push requests
+- ignoredBlockBytes - size of the pushed block data that was transferred to ESS, but ignored.
+ The pushed block data are considered as ignored when: 1. it was received after the shuffle
+ was finalized; 2. when a push request is for a duplicate block; 3. ESS was unable to write the block.
# Advanced Instrumentation