This is an automated email from the ASF dual-hosted git repository.

zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new f736c7302 [#2648] fix(spark): Incorrect fetched bytes metric when 
overlapping decompression is enabled (#2650)
f736c7302 is described below

commit f736c730262855bb97ee6fc2f3f5a6a461ae164c
Author: Junfan Zhang <[email protected]>
AuthorDate: Tue Nov 11 09:52:41 2025 +0800

    [#2648] fix(spark): Incorrect fetched bytes metric when overlapping 
decompression is enabled (#2650)
    
    ### What changes were proposed in this pull request?
    
    Correct fetched bytes metric when overlapping decompression is enabled
    
    ### Why are the changes needed?
    
    In the current codebase, the shuffle read bytes are reported as the uncompressed 
byte size, which is inconsistent with the writer-side statistics.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Internal job tests.
---
 .../hadoop/mapreduce/task/reduce/FetcherTest.java  |  4 +-
 .../shuffle/reader/RssShuffleDataIterator.java     |  5 +-
 .../shuffle/reader/AbstractRssReaderTest.java      |  3 +-
 .../shuffle/reader/RssShuffleDataIteratorTest.java | 61 ++++++++++++++++++++--
 .../common/shuffle/impl/RssTezFetcherTest.java     |  4 +-
 .../RssTezShuffleDataFetcherTest.java              |  4 +-
 .../uniffle/client/impl/DecompressionWorker.java   |  3 +-
 .../uniffle/client/impl/ShuffleReadClientImpl.java |  2 +-
 .../client/response/CompressedShuffleBlock.java    | 21 +++++---
 .../client/response/DecompressedShuffleBlock.java  | 10 +++-
 .../uniffle/client/response/ShuffleBlock.java      |  2 +
 11 files changed, 99 insertions(+), 20 deletions(-)

diff --git 
a/client-mr/core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/FetcherTest.java
 
b/client-mr/core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/FetcherTest.java
index 85cc09614..674eaba94 100644
--- 
a/client-mr/core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/FetcherTest.java
+++ 
b/client-mr/core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/FetcherTest.java
@@ -601,7 +601,9 @@ public class FetcherTest {
       data.forEach(
           bytes -> {
             byte[] compressed = codec.compress(bytes);
-            blocks.add(new CompressedShuffleBlock(ByteBuffer.wrap(compressed), 
bytes.length));
+            blocks.add(
+                new CompressedShuffleBlock(
+                    ByteBuffer.wrap(compressed), bytes.length, 
compressed.length));
           });
     }
 
diff --git 
a/client-spark/common/src/main/java/org/apache/spark/shuffle/reader/RssShuffleDataIterator.java
 
b/client-spark/common/src/main/java/org/apache/spark/shuffle/reader/RssShuffleDataIterator.java
index be1288488..8dfdbc7bf 100644
--- 
a/client-spark/common/src/main/java/org/apache/spark/shuffle/reader/RssShuffleDataIterator.java
+++ 
b/client-spark/common/src/main/java/org/apache/spark/shuffle/reader/RssShuffleDataIterator.java
@@ -156,9 +156,8 @@ public class RssShuffleDataIterator<K, C> extends 
AbstractIterator<Product2<K, C
         shuffleReadTaskStats.ifPresent(
             stats -> stats.incPartitionBlock(partitionId, 
shuffleBlock.getTaskAttemptId()));
         // collect metrics from raw data
-        long rawDataLength = rawData.limit() - rawData.position();
-        totalRawBytesLength += rawDataLength;
-        shuffleReadMetrics.incRemoteBytesRead(rawDataLength);
+        totalRawBytesLength += shuffleBlock.getCompressedLength();
+        
shuffleReadMetrics.incRemoteBytesRead(shuffleBlock.getCompressedLength());
 
         long startUncompression = System.currentTimeMillis();
         // get initial data
diff --git 
a/client-spark/common/src/test/java/org/apache/spark/shuffle/reader/AbstractRssReaderTest.java
 
b/client-spark/common/src/test/java/org/apache/spark/shuffle/reader/AbstractRssReaderTest.java
index 7099fd9eb..31df4dd54 100644
--- 
a/client-spark/common/src/test/java/org/apache/spark/shuffle/reader/AbstractRssReaderTest.java
+++ 
b/client-spark/common/src/test/java/org/apache/spark/shuffle/reader/AbstractRssReaderTest.java
@@ -134,7 +134,7 @@ public abstract class AbstractRssReaderTest extends 
HadoopTestBase {
         compress);
   }
 
-  protected void writeTestData(
+  protected List<ShufflePartitionedBlock> writeTestData(
       ShuffleWriteHandler handler,
       int blockNum,
       int recordNum,
@@ -163,6 +163,7 @@ public abstract class AbstractRssReaderTest extends 
HadoopTestBase {
       serializeStream.close();
     }
     handler.write(blocks);
+    return blocks;
   }
 
   protected ShufflePartitionedBlock createShuffleBlock(byte[] data, long 
blockId) {
diff --git 
a/client-spark/common/src/test/java/org/apache/spark/shuffle/reader/RssShuffleDataIteratorTest.java
 
b/client-spark/common/src/test/java/org/apache/spark/shuffle/reader/RssShuffleDataIteratorTest.java
index 0aff0706c..b97ba3f2d 100644
--- 
a/client-spark/common/src/test/java/org/apache/spark/shuffle/reader/RssShuffleDataIteratorTest.java
+++ 
b/client-spark/common/src/test/java/org/apache/spark/shuffle/reader/RssShuffleDataIteratorTest.java
@@ -46,6 +46,7 @@ import org.apache.uniffle.client.api.ShuffleReadClient;
 import org.apache.uniffle.client.factory.ShuffleClientFactory;
 import org.apache.uniffle.client.impl.ShuffleReadClientImpl;
 import org.apache.uniffle.common.ClientType;
+import org.apache.uniffle.common.ShufflePartitionedBlock;
 import org.apache.uniffle.common.ShuffleServerInfo;
 import org.apache.uniffle.common.compression.Codec;
 import org.apache.uniffle.common.config.RssConf;
@@ -76,6 +77,46 @@ public class RssShuffleDataIteratorTest extends 
AbstractRssReaderTest {
         Arguments.of(BlockIdLayout.DEFAULT), 
Arguments.of(BlockIdLayout.from(20, 21, 22)));
   }
 
+  @Test
+  public void readBytesMetricTestWithOverlappingDecompression() throws 
Exception {
+    String basePath = HDFS_URI + 
"readBytesMetricTestWithOverlappingDecompression";
+    HadoopShuffleWriteHandler writeHandler =
+        new HadoopShuffleWriteHandler("appId", 0, 0, 1, basePath, 
ssi1.getId(), conf);
+    Map<String, String> expectedData = Maps.newHashMap();
+    Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
+    Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0);
+    List<ShufflePartitionedBlock> blocksWritten =
+        writeTestData(
+            writeHandler,
+            2,
+            5,
+            BlockIdLayout.DEFAULT,
+            expectedData,
+            blockIdBitmap,
+            "key",
+            KRYO_SERIALIZER,
+            0,
+            true);
+    long writtenBytes =
+        blocksWritten.stream().map(x -> x.getDataLength()).reduce((a, b) -> a 
+ b).get();
+
+    ShuffleReadMetrics readMetrics = new ShuffleReadMetrics();
+    RssShuffleDataIterator rssShuffleDataIterator =
+        getDataIterator(
+            basePath,
+            blockIdBitmap,
+            taskIdBitmap,
+            Lists.newArrayList(ssi1),
+            true,
+            readMetrics,
+            true);
+    validateResult(rssShuffleDataIterator, expectedData, 10);
+
+    // case1: validate the reader side bytes whether to be consistent with the 
written bytes
+    assertEquals(writtenBytes, readMetrics.remoteBytesRead());
+    assertEquals(10, readMetrics.recordsRead());
+  }
+
   @ParameterizedTest
   @MethodSource("testBlockIdLayouts")
   public void readTest1(BlockIdLayout layout) throws Exception {
@@ -116,7 +157,8 @@ public class RssShuffleDataIteratorTest extends 
AbstractRssReaderTest {
       Roaring64NavigableMap blockIdBitmap,
       Roaring64NavigableMap taskIdBitmap,
       List<ShuffleServerInfo> serverInfos) {
-    return getDataIterator(basePath, blockIdBitmap, taskIdBitmap, serverInfos, 
true);
+    return getDataIterator(
+        basePath, blockIdBitmap, taskIdBitmap, serverInfos, true, new 
ShuffleReadMetrics(), false);
   }
 
   private RssShuffleDataIterator getDataIterator(
@@ -124,7 +166,9 @@ public class RssShuffleDataIteratorTest extends 
AbstractRssReaderTest {
       Roaring64NavigableMap blockIdBitmap,
       Roaring64NavigableMap taskIdBitmap,
       List<ShuffleServerInfo> serverInfos,
-      boolean compress) {
+      boolean compress,
+      ShuffleReadMetrics metrics,
+      boolean isOverlappingDecompression) {
     ShuffleReadClientImpl readClient =
         ShuffleClientFactory.newReadBuilder()
             .clientType(ClientType.GRPC)
@@ -140,6 +184,9 @@ public class RssShuffleDataIteratorTest extends 
AbstractRssReaderTest {
             .blockIdBitmap(blockIdBitmap)
             .taskIdBitmap(taskIdBitmap)
             .shuffleServerInfoList(Lists.newArrayList(serverInfos))
+            .overlappingDecompressionEnabled(isOverlappingDecompression)
+            .codec(Codec.newInstance(new RssConf()).get())
+            .overlappingDecompressionThreadNum(1)
             .build();
     RssConf rc;
     if (!compress) {
@@ -149,7 +196,7 @@ public class RssShuffleDataIteratorTest extends 
AbstractRssReaderTest {
     } else {
       rc = new RssConf();
     }
-    return new RssShuffleDataIterator(KRYO_SERIALIZER, readClient, new 
ShuffleReadMetrics(), rc);
+    return new RssShuffleDataIterator(KRYO_SERIALIZER, readClient, metrics, 
rc);
   }
 
   @Test
@@ -323,7 +370,13 @@ public class RssShuffleDataIteratorTest extends 
AbstractRssReaderTest {
 
     RssShuffleDataIterator rssShuffleDataIterator =
         getDataIterator(
-            basePath, blockIdBitmap, taskIdBitmap, Lists.newArrayList(ssi1, 
ssi2), compress);
+            basePath,
+            blockIdBitmap,
+            taskIdBitmap,
+            Lists.newArrayList(ssi1, ssi2),
+            compress,
+            new ShuffleReadMetrics(),
+            false);
     Optional<Codec> codec =
         (Optional<Codec>) FieldUtils.readField(rssShuffleDataIterator, 
"codec", true);
     if (compress) {
diff --git 
a/client-tez/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/RssTezFetcherTest.java
 
b/client-tez/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/RssTezFetcherTest.java
index fdf1f84cd..d410f1620 100644
--- 
a/client-tez/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/RssTezFetcherTest.java
+++ 
b/client-tez/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/RssTezFetcherTest.java
@@ -339,7 +339,9 @@ public class RssTezFetcherTest {
       data.forEach(
           bytes -> {
             byte[] compressed = codec.compress(bytes);
-            blocks.add(new CompressedShuffleBlock(ByteBuffer.wrap(compressed), 
bytes.length));
+            blocks.add(
+                new CompressedShuffleBlock(
+                    ByteBuffer.wrap(compressed), bytes.length, 
compressed.length));
           });
     }
 
diff --git 
a/client-tez/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/RssTezShuffleDataFetcherTest.java
 
b/client-tez/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/RssTezShuffleDataFetcherTest.java
index 2ea90a923..27efc656e 100644
--- 
a/client-tez/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/RssTezShuffleDataFetcherTest.java
+++ 
b/client-tez/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/RssTezShuffleDataFetcherTest.java
@@ -377,7 +377,9 @@ public class RssTezShuffleDataFetcherTest {
       data.forEach(
           bytes -> {
             byte[] compressed = codec.compress(bytes);
-            blocks.add(new CompressedShuffleBlock(ByteBuffer.wrap(compressed), 
bytes.length));
+            blocks.add(
+                new CompressedShuffleBlock(
+                    ByteBuffer.wrap(compressed), bytes.length, 
compressed.length));
           });
     }
 
diff --git 
a/client/src/main/java/org/apache/uniffle/client/impl/DecompressionWorker.java 
b/client/src/main/java/org/apache/uniffle/client/impl/DecompressionWorker.java
index bd50caff2..b256ae30e 100644
--- 
a/client/src/main/java/org/apache/uniffle/client/impl/DecompressionWorker.java
+++ 
b/client/src/main/java/org/apache/uniffle/client/impl/DecompressionWorker.java
@@ -108,7 +108,8 @@ public class DecompressionWorker {
               f,
               waitMillis -> this.waitMillis.addAndGet(waitMillis),
               bufferSegment.getTaskAttemptId(),
-              fetchSecondsThreshold));
+              fetchSecondsThreshold,
+              bufferSegment.getLength()));
     }
   }
 
diff --git 
a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java
 
b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java
index fe126d531..c8dfa86ea 100644
--- 
a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java
+++ 
b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java
@@ -316,7 +316,7 @@ public class ShuffleReadClientImpl implements 
ShuffleReadClient {
           compressedBuffer.position(bs.getOffset());
           compressedBuffer.limit(bs.getOffset() + bs.getLength());
           return new CompressedShuffleBlock(
-              compressedBuffer, bs.getUncompressLength(), 
bs.getTaskAttemptId());
+              compressedBuffer, bs.getUncompressLength(), 
bs.getTaskAttemptId(), bs.getLength());
         } else {
           DecompressedShuffleBlock block = decompressionWorker.get(batchIndex 
- 1, segmentIndex++);
           if (block == null) {
diff --git 
a/client/src/main/java/org/apache/uniffle/client/response/CompressedShuffleBlock.java
 
b/client/src/main/java/org/apache/uniffle/client/response/CompressedShuffleBlock.java
index 355c024f7..3b69a991b 100644
--- 
a/client/src/main/java/org/apache/uniffle/client/response/CompressedShuffleBlock.java
+++ 
b/client/src/main/java/org/apache/uniffle/client/response/CompressedShuffleBlock.java
@@ -21,21 +21,30 @@ import java.nio.ByteBuffer;
 
 public class CompressedShuffleBlock extends ShuffleBlock {
   private ByteBuffer byteBuffer;
-  private int uncompressLength;
+  private int uncompressedLength;
+  private int compressedLength;
 
-  public CompressedShuffleBlock(ByteBuffer byteBuffer, int uncompressLength) {
-    this(byteBuffer, uncompressLength, -1);
+  public CompressedShuffleBlock(
+      ByteBuffer byteBuffer, int uncompressedLength, int compressedlength) {
+    this(byteBuffer, uncompressedLength, -1, compressedlength);
   }
 
-  public CompressedShuffleBlock(ByteBuffer byteBuffer, int uncompressLength, 
long taskAttemptId) {
+  public CompressedShuffleBlock(
+      ByteBuffer byteBuffer, int uncompressedLength, long taskAttemptId, int 
compressedlength) {
     super(taskAttemptId);
     this.byteBuffer = byteBuffer;
-    this.uncompressLength = uncompressLength;
+    this.uncompressedLength = uncompressedLength;
+    this.compressedLength = compressedlength;
+  }
+
+  @Override
+  public int getCompressedLength() {
+    return compressedLength;
   }
 
   @Override
   public int getUncompressLength() {
-    return uncompressLength;
+    return uncompressedLength;
   }
 
   @Override
diff --git 
a/client/src/main/java/org/apache/uniffle/client/response/DecompressedShuffleBlock.java
 
b/client/src/main/java/org/apache/uniffle/client/response/DecompressedShuffleBlock.java
index 6f644ffd8..16a6215a2 100644
--- 
a/client/src/main/java/org/apache/uniffle/client/response/DecompressedShuffleBlock.java
+++ 
b/client/src/main/java/org/apache/uniffle/client/response/DecompressedShuffleBlock.java
@@ -28,16 +28,24 @@ public class DecompressedShuffleBlock extends ShuffleBlock {
   private CompletableFuture<ByteBuffer> f;
   private Consumer<Long> waitMillisCallback;
   private final int fetchSecondsThreshold;
+  private final int compressedLength;
 
   public DecompressedShuffleBlock(
       CompletableFuture<ByteBuffer> f,
       Consumer<Long> consumer,
       long taskAttemptId,
-      int fetchSecondsThreshold) {
+      int fetchSecondsThreshold,
+      int compressedLength) {
     super(taskAttemptId);
     this.f = f;
     this.waitMillisCallback = consumer;
     this.fetchSecondsThreshold = fetchSecondsThreshold;
+    this.compressedLength = compressedLength;
+  }
+
+  @Override
+  public int getCompressedLength() {
+    return compressedLength;
   }
 
   @Override
diff --git 
a/client/src/main/java/org/apache/uniffle/client/response/ShuffleBlock.java 
b/client/src/main/java/org/apache/uniffle/client/response/ShuffleBlock.java
index e240f4f3d..1c030752e 100644
--- a/client/src/main/java/org/apache/uniffle/client/response/ShuffleBlock.java
+++ b/client/src/main/java/org/apache/uniffle/client/response/ShuffleBlock.java
@@ -26,6 +26,8 @@ public abstract class ShuffleBlock {
     this.taskAttemptId = taskAttemptId;
   }
 
+  public abstract int getCompressedLength();
+
   public abstract int getUncompressLength();
 
   public abstract ByteBuffer getByteBuffer();

Reply via email to