This is an automated email from the ASF dual-hosted git repository.
zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new f736c7302 [#2648] fix(spark): Incorrect fetched bytes metric when
overlapping decompression is enabled (#2650)
f736c7302 is described below
commit f736c730262855bb97ee6fc2f3f5a6a461ae164c
Author: Junfan Zhang <[email protected]>
AuthorDate: Tue Nov 11 09:52:41 2025 +0800
[#2648] fix(spark): Incorrect fetched bytes metric when overlapping
decompression is enabled (#2650)
### What changes were proposed in this pull request?
Correct fetched bytes metric when overlapping decompression is enabled
### Why are the changes needed?
In the current codebase, the shuffle read bytes metric is reported as the uncompressed
byte size, which is inconsistent with the writer-side statistics.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Internal job tests.
---
.../hadoop/mapreduce/task/reduce/FetcherTest.java | 4 +-
.../shuffle/reader/RssShuffleDataIterator.java | 5 +-
.../shuffle/reader/AbstractRssReaderTest.java | 3 +-
.../shuffle/reader/RssShuffleDataIteratorTest.java | 61 ++++++++++++++++++++--
.../common/shuffle/impl/RssTezFetcherTest.java | 4 +-
.../RssTezShuffleDataFetcherTest.java | 4 +-
.../uniffle/client/impl/DecompressionWorker.java | 3 +-
.../uniffle/client/impl/ShuffleReadClientImpl.java | 2 +-
.../client/response/CompressedShuffleBlock.java | 21 +++++---
.../client/response/DecompressedShuffleBlock.java | 10 +++-
.../uniffle/client/response/ShuffleBlock.java | 2 +
11 files changed, 99 insertions(+), 20 deletions(-)
diff --git
a/client-mr/core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/FetcherTest.java
b/client-mr/core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/FetcherTest.java
index 85cc09614..674eaba94 100644
---
a/client-mr/core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/FetcherTest.java
+++
b/client-mr/core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/FetcherTest.java
@@ -601,7 +601,9 @@ public class FetcherTest {
data.forEach(
bytes -> {
byte[] compressed = codec.compress(bytes);
- blocks.add(new CompressedShuffleBlock(ByteBuffer.wrap(compressed),
bytes.length));
+ blocks.add(
+ new CompressedShuffleBlock(
+ ByteBuffer.wrap(compressed), bytes.length,
compressed.length));
});
}
diff --git
a/client-spark/common/src/main/java/org/apache/spark/shuffle/reader/RssShuffleDataIterator.java
b/client-spark/common/src/main/java/org/apache/spark/shuffle/reader/RssShuffleDataIterator.java
index be1288488..8dfdbc7bf 100644
---
a/client-spark/common/src/main/java/org/apache/spark/shuffle/reader/RssShuffleDataIterator.java
+++
b/client-spark/common/src/main/java/org/apache/spark/shuffle/reader/RssShuffleDataIterator.java
@@ -156,9 +156,8 @@ public class RssShuffleDataIterator<K, C> extends
AbstractIterator<Product2<K, C
shuffleReadTaskStats.ifPresent(
stats -> stats.incPartitionBlock(partitionId,
shuffleBlock.getTaskAttemptId()));
// collect metrics from raw data
- long rawDataLength = rawData.limit() - rawData.position();
- totalRawBytesLength += rawDataLength;
- shuffleReadMetrics.incRemoteBytesRead(rawDataLength);
+ totalRawBytesLength += shuffleBlock.getCompressedLength();
+
shuffleReadMetrics.incRemoteBytesRead(shuffleBlock.getCompressedLength());
long startUncompression = System.currentTimeMillis();
// get initial data
diff --git
a/client-spark/common/src/test/java/org/apache/spark/shuffle/reader/AbstractRssReaderTest.java
b/client-spark/common/src/test/java/org/apache/spark/shuffle/reader/AbstractRssReaderTest.java
index 7099fd9eb..31df4dd54 100644
---
a/client-spark/common/src/test/java/org/apache/spark/shuffle/reader/AbstractRssReaderTest.java
+++
b/client-spark/common/src/test/java/org/apache/spark/shuffle/reader/AbstractRssReaderTest.java
@@ -134,7 +134,7 @@ public abstract class AbstractRssReaderTest extends
HadoopTestBase {
compress);
}
- protected void writeTestData(
+ protected List<ShufflePartitionedBlock> writeTestData(
ShuffleWriteHandler handler,
int blockNum,
int recordNum,
@@ -163,6 +163,7 @@ public abstract class AbstractRssReaderTest extends
HadoopTestBase {
serializeStream.close();
}
handler.write(blocks);
+ return blocks;
}
protected ShufflePartitionedBlock createShuffleBlock(byte[] data, long
blockId) {
diff --git
a/client-spark/common/src/test/java/org/apache/spark/shuffle/reader/RssShuffleDataIteratorTest.java
b/client-spark/common/src/test/java/org/apache/spark/shuffle/reader/RssShuffleDataIteratorTest.java
index 0aff0706c..b97ba3f2d 100644
---
a/client-spark/common/src/test/java/org/apache/spark/shuffle/reader/RssShuffleDataIteratorTest.java
+++
b/client-spark/common/src/test/java/org/apache/spark/shuffle/reader/RssShuffleDataIteratorTest.java
@@ -46,6 +46,7 @@ import org.apache.uniffle.client.api.ShuffleReadClient;
import org.apache.uniffle.client.factory.ShuffleClientFactory;
import org.apache.uniffle.client.impl.ShuffleReadClientImpl;
import org.apache.uniffle.common.ClientType;
+import org.apache.uniffle.common.ShufflePartitionedBlock;
import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.common.compression.Codec;
import org.apache.uniffle.common.config.RssConf;
@@ -76,6 +77,46 @@ public class RssShuffleDataIteratorTest extends
AbstractRssReaderTest {
Arguments.of(BlockIdLayout.DEFAULT),
Arguments.of(BlockIdLayout.from(20, 21, 22)));
}
+ @Test
+ public void readBytesMetricTestWithOverlappingDecompression() throws
Exception {
+ String basePath = HDFS_URI +
"readBytesMetricTestWithOverlappingDecompression";
+ HadoopShuffleWriteHandler writeHandler =
+ new HadoopShuffleWriteHandler("appId", 0, 0, 1, basePath,
ssi1.getId(), conf);
+ Map<String, String> expectedData = Maps.newHashMap();
+ Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
+ Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0);
+ List<ShufflePartitionedBlock> blocksWritten =
+ writeTestData(
+ writeHandler,
+ 2,
+ 5,
+ BlockIdLayout.DEFAULT,
+ expectedData,
+ blockIdBitmap,
+ "key",
+ KRYO_SERIALIZER,
+ 0,
+ true);
+ long writtenBytes =
+ blocksWritten.stream().map(x -> x.getDataLength()).reduce((a, b) -> a
+ b).get();
+
+ ShuffleReadMetrics readMetrics = new ShuffleReadMetrics();
+ RssShuffleDataIterator rssShuffleDataIterator =
+ getDataIterator(
+ basePath,
+ blockIdBitmap,
+ taskIdBitmap,
+ Lists.newArrayList(ssi1),
+ true,
+ readMetrics,
+ true);
+ validateResult(rssShuffleDataIterator, expectedData, 10);
+
+ // case1: validate the reader side bytes whether to be consistent with the
written bytes
+ assertEquals(writtenBytes, readMetrics.remoteBytesRead());
+ assertEquals(10, readMetrics.recordsRead());
+ }
+
@ParameterizedTest
@MethodSource("testBlockIdLayouts")
public void readTest1(BlockIdLayout layout) throws Exception {
@@ -116,7 +157,8 @@ public class RssShuffleDataIteratorTest extends
AbstractRssReaderTest {
Roaring64NavigableMap blockIdBitmap,
Roaring64NavigableMap taskIdBitmap,
List<ShuffleServerInfo> serverInfos) {
- return getDataIterator(basePath, blockIdBitmap, taskIdBitmap, serverInfos,
true);
+ return getDataIterator(
+ basePath, blockIdBitmap, taskIdBitmap, serverInfos, true, new
ShuffleReadMetrics(), false);
}
private RssShuffleDataIterator getDataIterator(
@@ -124,7 +166,9 @@ public class RssShuffleDataIteratorTest extends
AbstractRssReaderTest {
Roaring64NavigableMap blockIdBitmap,
Roaring64NavigableMap taskIdBitmap,
List<ShuffleServerInfo> serverInfos,
- boolean compress) {
+ boolean compress,
+ ShuffleReadMetrics metrics,
+ boolean isOverlappingDecompression) {
ShuffleReadClientImpl readClient =
ShuffleClientFactory.newReadBuilder()
.clientType(ClientType.GRPC)
@@ -140,6 +184,9 @@ public class RssShuffleDataIteratorTest extends
AbstractRssReaderTest {
.blockIdBitmap(blockIdBitmap)
.taskIdBitmap(taskIdBitmap)
.shuffleServerInfoList(Lists.newArrayList(serverInfos))
+ .overlappingDecompressionEnabled(isOverlappingDecompression)
+ .codec(Codec.newInstance(new RssConf()).get())
+ .overlappingDecompressionThreadNum(1)
.build();
RssConf rc;
if (!compress) {
@@ -149,7 +196,7 @@ public class RssShuffleDataIteratorTest extends
AbstractRssReaderTest {
} else {
rc = new RssConf();
}
- return new RssShuffleDataIterator(KRYO_SERIALIZER, readClient, new
ShuffleReadMetrics(), rc);
+ return new RssShuffleDataIterator(KRYO_SERIALIZER, readClient, metrics,
rc);
}
@Test
@@ -323,7 +370,13 @@ public class RssShuffleDataIteratorTest extends
AbstractRssReaderTest {
RssShuffleDataIterator rssShuffleDataIterator =
getDataIterator(
- basePath, blockIdBitmap, taskIdBitmap, Lists.newArrayList(ssi1,
ssi2), compress);
+ basePath,
+ blockIdBitmap,
+ taskIdBitmap,
+ Lists.newArrayList(ssi1, ssi2),
+ compress,
+ new ShuffleReadMetrics(),
+ false);
Optional<Codec> codec =
(Optional<Codec>) FieldUtils.readField(rssShuffleDataIterator,
"codec", true);
if (compress) {
diff --git
a/client-tez/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/RssTezFetcherTest.java
b/client-tez/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/RssTezFetcherTest.java
index fdf1f84cd..d410f1620 100644
---
a/client-tez/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/RssTezFetcherTest.java
+++
b/client-tez/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/RssTezFetcherTest.java
@@ -339,7 +339,9 @@ public class RssTezFetcherTest {
data.forEach(
bytes -> {
byte[] compressed = codec.compress(bytes);
- blocks.add(new CompressedShuffleBlock(ByteBuffer.wrap(compressed),
bytes.length));
+ blocks.add(
+ new CompressedShuffleBlock(
+ ByteBuffer.wrap(compressed), bytes.length,
compressed.length));
});
}
diff --git
a/client-tez/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/RssTezShuffleDataFetcherTest.java
b/client-tez/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/RssTezShuffleDataFetcherTest.java
index 2ea90a923..27efc656e 100644
---
a/client-tez/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/RssTezShuffleDataFetcherTest.java
+++
b/client-tez/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/RssTezShuffleDataFetcherTest.java
@@ -377,7 +377,9 @@ public class RssTezShuffleDataFetcherTest {
data.forEach(
bytes -> {
byte[] compressed = codec.compress(bytes);
- blocks.add(new CompressedShuffleBlock(ByteBuffer.wrap(compressed),
bytes.length));
+ blocks.add(
+ new CompressedShuffleBlock(
+ ByteBuffer.wrap(compressed), bytes.length,
compressed.length));
});
}
diff --git
a/client/src/main/java/org/apache/uniffle/client/impl/DecompressionWorker.java
b/client/src/main/java/org/apache/uniffle/client/impl/DecompressionWorker.java
index bd50caff2..b256ae30e 100644
---
a/client/src/main/java/org/apache/uniffle/client/impl/DecompressionWorker.java
+++
b/client/src/main/java/org/apache/uniffle/client/impl/DecompressionWorker.java
@@ -108,7 +108,8 @@ public class DecompressionWorker {
f,
waitMillis -> this.waitMillis.addAndGet(waitMillis),
bufferSegment.getTaskAttemptId(),
- fetchSecondsThreshold));
+ fetchSecondsThreshold,
+ bufferSegment.getLength()));
}
}
diff --git
a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java
b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java
index fe126d531..c8dfa86ea 100644
---
a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java
+++
b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java
@@ -316,7 +316,7 @@ public class ShuffleReadClientImpl implements
ShuffleReadClient {
compressedBuffer.position(bs.getOffset());
compressedBuffer.limit(bs.getOffset() + bs.getLength());
return new CompressedShuffleBlock(
- compressedBuffer, bs.getUncompressLength(),
bs.getTaskAttemptId());
+ compressedBuffer, bs.getUncompressLength(),
bs.getTaskAttemptId(), bs.getLength());
} else {
DecompressedShuffleBlock block = decompressionWorker.get(batchIndex
- 1, segmentIndex++);
if (block == null) {
diff --git
a/client/src/main/java/org/apache/uniffle/client/response/CompressedShuffleBlock.java
b/client/src/main/java/org/apache/uniffle/client/response/CompressedShuffleBlock.java
index 355c024f7..3b69a991b 100644
---
a/client/src/main/java/org/apache/uniffle/client/response/CompressedShuffleBlock.java
+++
b/client/src/main/java/org/apache/uniffle/client/response/CompressedShuffleBlock.java
@@ -21,21 +21,30 @@ import java.nio.ByteBuffer;
public class CompressedShuffleBlock extends ShuffleBlock {
private ByteBuffer byteBuffer;
- private int uncompressLength;
+ private int uncompressedLength;
+ private int compressedLength;
- public CompressedShuffleBlock(ByteBuffer byteBuffer, int uncompressLength) {
- this(byteBuffer, uncompressLength, -1);
+ public CompressedShuffleBlock(
+ ByteBuffer byteBuffer, int uncompressedLength, int compressedlength) {
+ this(byteBuffer, uncompressedLength, -1, compressedlength);
}
- public CompressedShuffleBlock(ByteBuffer byteBuffer, int uncompressLength,
long taskAttemptId) {
+ public CompressedShuffleBlock(
+ ByteBuffer byteBuffer, int uncompressedLength, long taskAttemptId, int
compressedlength) {
super(taskAttemptId);
this.byteBuffer = byteBuffer;
- this.uncompressLength = uncompressLength;
+ this.uncompressedLength = uncompressedLength;
+ this.compressedLength = compressedlength;
+ }
+
+ @Override
+ public int getCompressedLength() {
+ return compressedLength;
}
@Override
public int getUncompressLength() {
- return uncompressLength;
+ return uncompressedLength;
}
@Override
diff --git
a/client/src/main/java/org/apache/uniffle/client/response/DecompressedShuffleBlock.java
b/client/src/main/java/org/apache/uniffle/client/response/DecompressedShuffleBlock.java
index 6f644ffd8..16a6215a2 100644
---
a/client/src/main/java/org/apache/uniffle/client/response/DecompressedShuffleBlock.java
+++
b/client/src/main/java/org/apache/uniffle/client/response/DecompressedShuffleBlock.java
@@ -28,16 +28,24 @@ public class DecompressedShuffleBlock extends ShuffleBlock {
private CompletableFuture<ByteBuffer> f;
private Consumer<Long> waitMillisCallback;
private final int fetchSecondsThreshold;
+ private final int compressedLength;
public DecompressedShuffleBlock(
CompletableFuture<ByteBuffer> f,
Consumer<Long> consumer,
long taskAttemptId,
- int fetchSecondsThreshold) {
+ int fetchSecondsThreshold,
+ int compressedLength) {
super(taskAttemptId);
this.f = f;
this.waitMillisCallback = consumer;
this.fetchSecondsThreshold = fetchSecondsThreshold;
+ this.compressedLength = compressedLength;
+ }
+
+ @Override
+ public int getCompressedLength() {
+ return compressedLength;
}
@Override
diff --git
a/client/src/main/java/org/apache/uniffle/client/response/ShuffleBlock.java
b/client/src/main/java/org/apache/uniffle/client/response/ShuffleBlock.java
index e240f4f3d..1c030752e 100644
--- a/client/src/main/java/org/apache/uniffle/client/response/ShuffleBlock.java
+++ b/client/src/main/java/org/apache/uniffle/client/response/ShuffleBlock.java
@@ -26,6 +26,8 @@ public abstract class ShuffleBlock {
this.taskAttemptId = taskAttemptId;
}
+ public abstract int getCompressedLength();
+
public abstract int getUncompressLength();
public abstract ByteBuffer getByteBuffer();