This is an automated email from the ASF dual-hosted git repository.

mridulm80 pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.4 by this push:
     new 2f11896886e [SPARK-33573][CORE][FOLLOW-UP] Enhance ignoredBlockBytes in pushMergeMetrics to cover more scenarios
2f11896886e is described below

commit 2f11896886ec9a784182d3510632f4d5efcb5b99
Author: Minchu Yang <miny...@minyang-mn1.linkedin.biz>
AuthorDate: Fri Jan 27 04:15:15 2023 -0600

    [SPARK-33573][CORE][FOLLOW-UP] Enhance ignoredBlockBytes in pushMergeMetrics to cover more scenarios
    
    ### What changes were proposed in this pull request?
    
    Currently, the `ignoredBlockBytes` server-side metric for push-based shuffle does not fully capture the block bytes that were received by the ESS but did not get merged. This PR enhances the logic for incrementing `ignoredBlockBytes` so that it captures all bytes ignored by the ESS. Specifically, bytes are counted as ignored when they are: 1. received after the shuffle file is finalized; 2. part of a request for a duplicate block; 3. received by the ESS but failed to be written.
    
    ### Why are the changes needed?
    
    This makes the `ignoredBlockBytes` server-side metric for push-based shuffle more accurate.
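    The per-stream accounting this patch adds follows an accumulate-then-flush pattern: bytes are tallied in `onData` and only marked as ignored once it is known they will not be merged (duplicate block, or merge failure). A simplified, dependency-free sketch of that pattern (the class and field names below are illustrative stand-ins, not Spark's actual types; in Spark the sink is a Dropwizard `Meter`):

```java
import java.util.concurrent.atomic.AtomicLong;

public class IgnoredBytesSketch {
    // Stand-in for pushMergeMetrics.ignoredBlockBytes.
    static final AtomicLong ignoredBlockBytes = new AtomicLong();

    // Per-stream callback state, mirroring the receivedBytes field
    // added to the push-block stream callback.
    static class StreamCallback {
        private long receivedBytes = 0;

        void onData(int chunkSize) {
            // Tally every byte received; we don't yet know if it merges.
            receivedBytes += chunkSize;
        }

        // Called when the block turns out to be a duplicate, or on failure.
        void updateIgnoredBlockBytes() {
            if (receivedBytes > 0) {
                ignoredBlockBytes.addAndGet(receivedBytes);
                receivedBytes = 0; // reset to guard against double counting
            }
        }
    }

    public static void main(String[] args) {
        StreamCallback cb = new StreamCallback();
        cb.onData(4);
        cb.onData(6);
        cb.updateIgnoredBlockBytes();
        cb.updateIgnoredBlockBytes(); // second flush is a no-op
        System.out.println(ignoredBlockBytes.get()); // prints 10
    }
}
```

    Resetting `receivedBytes` after the flush is what lets the method be called from multiple code paths (duplicate-block return and `onFailure`) without inflating the metric.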
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Modified existing UTs to capture `ignoredBlockBytes`.
    
    Closes #39725 from rmcyang/SPARK-33573-FOLLOWUP.
    
    Authored-by: Minchu Yang <miny...@minyang-mn1.linkedin.biz>
    Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
    (cherry picked from commit ddd867056f9f4f81d158f2699533152d6dc4c3d7)
    Signed-off-by: Mridul Muralidharan <mridulatgmail.com>
---
 .../network/shuffle/RemoteBlockPushResolver.java   | 28 +++++++++++++++++++---
 .../shuffle/RemoteBlockPushResolverSuite.java      | 18 ++++++++------
 docs/monitoring.md                                 | 17 ++++++-------
 3 files changed, 45 insertions(+), 18 deletions(-)

diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
index a2e8219228a..df2d1fa12d1 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
@@ -701,7 +701,7 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
         public void onData(String streamId, ByteBuffer buf) {
          // Ignore the requests. It reaches here either when a request is received after the
          // shuffle file is finalized or when a request is for a duplicate block.
-          pushMergeMetrics.ignoredBlockBytes.mark(buf.limit());
+          pushMergeMetrics.ignoredBlockBytes.mark(buf.remaining());
         }
 
         @Override
@@ -1211,6 +1211,10 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
    // Use on-heap instead of direct ByteBuffer since these buffers will be GC'ed very quickly
     private List<ByteBuffer> deferredBufs;
 
+    // This collects the total pushed block bytes received in the onData method. Once these bytes
+    // are not being used, we add them to the ignoredBlockBytes of the pushMergeMetrics.
+    private long receivedBytes = 0;
+
     private PushBlockStreamCallback(
         RemoteBlockPushResolver mergeManager,
         AppShuffleInfo appShuffleInfo,
@@ -1322,6 +1326,16 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
       }
     }
 
+    /**
+     * Update ignoredBlockBytes in pushMergeMetrics.
+     */
+    private void updateIgnoredBlockBytes() {
+      if (receivedBytes > 0) {
+        mergeManager.pushMergeMetrics.ignoredBlockBytes.mark(receivedBytes);
+        receivedBytes = 0;
+      }
+    }
+
     /**
     * This increments the number of IOExceptions and throws RuntimeException if it exceeds the
      * threshold which will abort the merge of a particular shuffle partition.
@@ -1358,6 +1372,7 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
 
     @Override
     public void onData(String streamId, ByteBuffer buf) throws IOException {
+      receivedBytes += buf.remaining();
      // When handling the block data using StreamInterceptor, it can help to reduce the amount
      // of data that needs to be buffered in memory since it does not wait till the completion
      // of the frame before handling the message, thus releasing the ByteBuf earlier. However,
@@ -1481,6 +1496,9 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
           // the client in cases of duplicate even though no data is written.
           if (isDuplicateBlock()) {
             freeDeferredBufs();
+            // Since we just return without throwing exception, and the received bytes are ignored,
+            // thus we need to add them to ignoredBlockBytes in pushMergeMetrics.
+            updateIgnoredBlockBytes();
             return;
           }
           if (partitionInfo.getCurrentMapIndex() < 0) {
@@ -1538,6 +1556,9 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
       } else {
         logger.debug("Encountered issue when merging {}", streamId, throwable);
       }
+      // The block was received by ESS but didn't get merged, so it is considered as "ignored".
+      // Capturing them in ignoredBlockBytes would help measure any server side improvement.
+      updateIgnoredBlockBytes();
      // Only update partitionInfo if the failure corresponds to a valid request. If the
      // request is too late, i.e. received after shuffle merge finalize or stale block push,
      // #onFailure will also be triggered, and we can just ignore. Also, if we couldn't find
@@ -2126,8 +2147,9 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
     static final String DEFERRED_BLOCKS_METRIC = "deferredBlocks";
     // staleBlockPushes tracks the number of stale shuffle block push requests
     static final String STALE_BLOCK_PUSHES_METRIC = "staleBlockPushes";
-    // ignoredBlockBytes tracks the size of the blocks that are ignored after the shuffle file is
-    // finalized or when a request is for a duplicate block
+    // ignoredBlockBytes tracks the size of the blocks that are ignored. The pushed block data are
+    // considered as ignored for these cases: 1. received after the shuffle file is finalized;
+    // 2. when a request is for a duplicate block; 3. the part that ESS failed to write.
     static final String IGNORED_BLOCK_BYTES_METRIC = "ignoredBlockBytes";
 
     private final Map<String, Metric> allMetrics;
diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java
index 630a651d243..2526a94f429 100644
--- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java
+++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java
@@ -278,7 +278,7 @@ public class RemoteBlockPushResolverSuite {
    pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, NO_ATTEMPT_ID, 0, 0));
    MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 0, 0);
    validateChunks(TEST_APP, 0, 0, 0, blockMeta, new int[]{4}, new int[][]{{0}});
-    verifyMetrics(4, 0, 0, 0, 0, 0, 0);
+    verifyMetrics(4, 0, 0, 0, 0, 0, 4);
   }
 
   @Test
@@ -291,7 +291,7 @@ public class RemoteBlockPushResolverSuite {
    pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, NO_ATTEMPT_ID, 0, 0));
    MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 0, 0);
     assertEquals("num-chunks", 0, blockMeta.getNumChunks());
-    verifyMetrics(4, 0, 0, 0, 0, 0, 0);
+    verifyMetrics(4, 0, 0, 0, 0, 0, 4);
   }
 
   @Test
@@ -306,7 +306,7 @@ public class RemoteBlockPushResolverSuite {
    pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, NO_ATTEMPT_ID, 0, 0));
    MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 0, 0);
     assertEquals("num-chunks", 0, blockMeta.getNumChunks());
-    verifyMetrics(9, 0, 0, 0, 0, 0, 0);
+    verifyMetrics(9, 0, 0, 0, 0, 0, 9);
   }
 
   @Test
@@ -322,7 +322,7 @@ public class RemoteBlockPushResolverSuite {
    pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, NO_ATTEMPT_ID, 0, 0));
    MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 0, 0);
    validateChunks(TEST_APP, 0, 0, 0, blockMeta, new int[]{9}, new int[][]{{0}});
-    verifyMetrics(9, 0, 0, 0, 0, 0, 0);
+    verifyMetrics(9, 0, 0, 0, 0, 0, 9);
   }
 
   @Test
@@ -385,7 +385,7 @@ public class RemoteBlockPushResolverSuite {
     FileSegmentManagedBuffer mb =
      (FileSegmentManagedBuffer) pushResolver.getMergedBlockData(TEST_APP, 0, 0, 0, 0);
     assertArrayEquals(expectedBytes, mb.nioByteBuffer().array());
-    verifyMetrics(14, 0, 0, 0, 0, 0, 0);
+    verifyMetrics(14, 0, 0, 0, 0, 0, 10);
   }
 
   @Test
@@ -1066,6 +1066,8 @@ public class RemoteBlockPushResolverSuite {
     stream1.onData(stream1.getID(), ByteBuffer.wrap(new byte[2]));
     BlockPushNonFatalFailure e = assertThrows(BlockPushNonFatalFailure.class,
       () -> stream1.onComplete(stream1.getID()));
+    // Trigger onFailure so that the stale bytes would be added into ignoredBytes
+    stream1.onFailure(stream1.getID(), new RuntimeException("Forced Failure"));
     BlockPushReturnCode errorCode =
      (BlockPushReturnCode) BlockTransferMessage.Decoder.fromByteBuffer(e.getResponse());
     assertEquals(BlockPushNonFatalFailure.ReturnCode.STALE_BLOCK_PUSH.id(),
@@ -1076,7 +1078,7 @@ public class RemoteBlockPushResolverSuite {
    pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, NO_ATTEMPT_ID, 0, 2));
    MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 2, 0);
    validateChunks(TEST_APP, 0, 2, 0, blockMeta, new int[]{4}, new int[][]{{0}});
-    verifyMetrics(6, 0, 0, 0, 0, 2, 0);
+    verifyMetrics(6, 0, 0, 0, 0, 2, 4);
   }
 
   @Test
@@ -1094,6 +1096,8 @@ public class RemoteBlockPushResolverSuite {
     // stream 1 push should be rejected as it is from an older shuffleMergeId
     BlockPushNonFatalFailure e = assertThrows(BlockPushNonFatalFailure.class,
       () -> stream1.onComplete(stream1.getID()));
+    // Trigger onFailure so that the stale bytes would be added into ignoredBytes
+    stream1.onFailure(stream1.getID(), new RuntimeException("Forced Failure"));
     BlockPushReturnCode errorCode =
      (BlockPushReturnCode) BlockTransferMessage.Decoder.fromByteBuffer(e.getResponse());
     assertEquals(BlockPushNonFatalFailure.ReturnCode.STALE_BLOCK_PUSH.id(),
@@ -1111,7 +1115,7 @@ public class RemoteBlockPushResolverSuite {
 
    MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 2, 0);
    validateChunks(TEST_APP, 0, 2, 0, blockMeta, new int[]{4}, new int[][]{{0}});
-    verifyMetrics(6, 0, 0, 0, 0, 2, 0);
+    verifyMetrics(6, 0, 0, 0, 0, 2, 4);
   }
 
   @Test
diff --git a/docs/monitoring.md b/docs/monitoring.md
index dbc2c14aea6..1f7acf4dece 100644
--- a/docs/monitoring.md
+++ b/docs/monitoring.md
@@ -1425,16 +1425,17 @@ Note: applies to the shuffle service
 - **note:** the metrics below apply when the server side configuration
   `spark.shuffle.push.server.mergedShuffleFileManagerImpl` is set to
  `org.apache.spark.network.shuffle.MergedShuffleFileManager` for Push-Based Shuffle
-- blockBytesWritten - the size of the pushed block data written to file in bytes
-- blockAppendCollisions - the number of shuffle push blocks collided in shuffle services
+- blockBytesWritten - size of the pushed block data written to file in bytes
+- blockAppendCollisions - number of shuffle push blocks collided in shuffle services
   as another block for the same reduce partition were being written
-- lateBlockPushes - the number of shuffle push blocks that are received in shuffle service
+- lateBlockPushes - number of shuffle push blocks that are received in shuffle service
   after the specific shuffle merge has been finalized
-- deferredBlocks - the number of the current deferred block parts buffered in memory
-- deferredBlockBytes - the size of the current deferred block parts buffered in memory
-- staleBlockPushes - the number of stale shuffle block push requests
-- ignoredBlockBytes - the size of the pushed block data that are ignored after the shuffle
-  file is finalized or when a request is for a duplicate block
+- deferredBlocks - number of the current deferred block parts buffered in memory
+- deferredBlockBytes - size of the current deferred block parts buffered in memory
+- staleBlockPushes - number of stale shuffle block push requests
+- ignoredBlockBytes - size of the pushed block data that was transferred to ESS, but ignored.
+  The pushed block data are considered as ignored when: 1. it was received after the shuffle
+  was finalized; 2. when a push request is for a duplicate block; 3. ESS was unable to write the block.
 
 # Advanced Instrumentation
 

