This is an automated email from the ASF dual-hosted git repository.
roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 270afaa78 [MINOR] improve(server): Add debug log when cacheShuffleData
(#2156)
270afaa78 is described below
commit 270afaa78453ac161078f84f29b1010de6776e99
Author: maobaolong <[email protected]>
AuthorDate: Sat Oct 12 14:17:00 2024 +0800
[MINOR] improve(server): Add debug log when cacheShuffleData (#2156)
### What changes were proposed in this pull request?
Add debug log when cacheShuffleData
### Why are the changes needed?
Supply a way to know the cacheShuffleData detail, include the partitionId,
blockCount, we can know well from this debug level log.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
No need.
---
.../uniffle/common/ShufflePartitionedBlock.java | 38 ++++++++++++++--------
.../uniffle/common/ShufflePartitionedData.java | 29 +++++++++++------
.../common/ShufflePartitionedBlockTest.java | 6 ++--
.../uniffle/server/ShuffleServerGrpcService.java | 29 ++++++++---------
.../apache/uniffle/server/ShuffleTaskManager.java | 4 +--
.../server/buffer/AbstractShuffleBuffer.java | 8 ++---
.../server/buffer/ShuffleBufferManager.java | 9 ++++-
.../server/buffer/ShuffleBufferWithLinkedList.java | 10 +++---
.../server/buffer/ShuffleBufferWithSkipList.java | 10 +++---
.../server/netty/ShuffleServerNettyHandler.java | 31 ++++++++++--------
.../server/storage/SingleStorageManager.java | 2 +-
.../uniffle/server/ShuffleTaskManagerTest.java | 6 ++--
.../buffer/ShuffleBufferWithLinkedListTest.java | 10 +++---
.../handler/impl/HadoopShuffleWriteHandler.java | 2 +-
.../handler/impl/LocalFileWriteHandler.java | 2 +-
.../storage/HadoopShuffleHandlerTestBase.java | 4 +--
.../handler/impl/HadoopShuffleReadHandlerTest.java | 2 +-
.../impl/LocalFileServerReadHandlerTest.java | 4 +--
18 files changed, 117 insertions(+), 89 deletions(-)
diff --git
a/common/src/main/java/org/apache/uniffle/common/ShufflePartitionedBlock.java
b/common/src/main/java/org/apache/uniffle/common/ShufflePartitionedBlock.java
index 1ce68b6b6..e476e37f3 100644
---
a/common/src/main/java/org/apache/uniffle/common/ShufflePartitionedBlock.java
+++
b/common/src/main/java/org/apache/uniffle/common/ShufflePartitionedBlock.java
@@ -24,7 +24,7 @@ import io.netty.buffer.Unpooled;
public class ShufflePartitionedBlock {
- private int length;
+ private int dataLength;
private long crc;
private long blockId;
private int uncompressLength;
@@ -32,8 +32,13 @@ public class ShufflePartitionedBlock {
private long taskAttemptId;
public ShufflePartitionedBlock(
- int length, int uncompressLength, long crc, long blockId, long
taskAttemptId, byte[] data) {
- this.length = length;
+ int dataLength,
+ int uncompressLength,
+ long crc,
+ long blockId,
+ long taskAttemptId,
+ byte[] data) {
+ this.dataLength = dataLength;
this.crc = crc;
this.blockId = blockId;
this.uncompressLength = uncompressLength;
@@ -42,8 +47,13 @@ public class ShufflePartitionedBlock {
}
public ShufflePartitionedBlock(
- int length, int uncompressLength, long crc, long blockId, long
taskAttemptId, ByteBuf data) {
- this.length = length;
+ int dataLength,
+ int uncompressLength,
+ long crc,
+ long blockId,
+ long taskAttemptId,
+ ByteBuf data) {
+ this.dataLength = dataLength;
this.crc = crc;
this.blockId = blockId;
this.uncompressLength = uncompressLength;
@@ -53,8 +63,8 @@ public class ShufflePartitionedBlock {
// calculate the data size for this block in memory including metadata which
are
// blockId, crc, taskAttemptId, length, uncompressLength
- public long getSize() {
- return length + 3 * 8 + 2 * 4;
+ public long getEncodedLength() {
+ return dataLength + 3 * 8 + 2 * 4;
}
@Override
@@ -66,7 +76,7 @@ public class ShufflePartitionedBlock {
return false;
}
ShufflePartitionedBlock that = (ShufflePartitionedBlock) o;
- return length == that.length
+ return dataLength == that.dataLength
&& crc == that.crc
&& blockId == that.blockId
&& data.equals(that.data);
@@ -74,15 +84,15 @@ public class ShufflePartitionedBlock {
@Override
public int hashCode() {
- return Objects.hash(length, crc, blockId, data);
+ return Objects.hash(dataLength, crc, blockId, data);
}
- public int getLength() {
- return length;
+ public int getDataLength() {
+ return dataLength;
}
- public void setLength(int length) {
- this.length = length;
+ public void setDataLength(int dataLength) {
+ this.dataLength = dataLength;
}
public long getCrc() {
@@ -126,7 +136,7 @@ public class ShufflePartitionedBlock {
return "ShufflePartitionedBlock{blockId["
+ blockId
+ "], length["
- + length
+ + dataLength
+ "], uncompressLength["
+ uncompressLength
+ "], crc["
diff --git
a/common/src/main/java/org/apache/uniffle/common/ShufflePartitionedData.java
b/common/src/main/java/org/apache/uniffle/common/ShufflePartitionedData.java
index 6793eccd0..fbc3c2797 100644
--- a/common/src/main/java/org/apache/uniffle/common/ShufflePartitionedData.java
+++ b/common/src/main/java/org/apache/uniffle/common/ShufflePartitionedData.java
@@ -20,7 +20,6 @@ package org.apache.uniffle.common;
import java.util.Arrays;
import com.google.common.annotations.VisibleForTesting;
-import org.apache.commons.lang3.tuple.Pair;
public class ShufflePartitionedData {
@@ -28,23 +27,29 @@ public class ShufflePartitionedData {
new ShufflePartitionedBlock[] {};
private int partitionId;
private final ShufflePartitionedBlock[] blockList;
- private final long totalBlockSize;
+ private final long totalBlockEncodedLength;
+ private final long totalBlockDataLength;
- public ShufflePartitionedData(int partitionId, Pair<Long,
ShufflePartitionedBlock[]> pair) {
+ public ShufflePartitionedData(
+ int partitionId, long encodedLength, long dataLength,
ShufflePartitionedBlock[] blockList) {
this.partitionId = partitionId;
- this.blockList = pair.getRight() == null ? EMPTY_BLOCK_LIST :
pair.getRight();
- totalBlockSize = pair.getLeft();
+ this.blockList = blockList == null ? EMPTY_BLOCK_LIST : blockList;
+ totalBlockEncodedLength = encodedLength;
+ totalBlockDataLength = dataLength;
}
@VisibleForTesting
public ShufflePartitionedData(int partitionId, ShufflePartitionedBlock[]
blockList) {
this.partitionId = partitionId;
this.blockList = blockList == null ? EMPTY_BLOCK_LIST : blockList;
- long size = 0L;
+ long encodedLength = 0L;
+ long dataLength = 0L;
for (ShufflePartitionedBlock block : this.blockList) {
- size += block.getSize();
+ encodedLength += block.getEncodedLength();
+ dataLength += block.getDataLength();
}
- totalBlockSize = size;
+ totalBlockEncodedLength = encodedLength;
+ totalBlockDataLength = dataLength;
}
@Override
@@ -68,7 +73,11 @@ public class ShufflePartitionedData {
return blockList;
}
- public long getTotalBlockSize() {
- return totalBlockSize;
+ public long getTotalBlockEncodedLength() {
+ return totalBlockEncodedLength;
+ }
+
+ public long getTotalBlockDataLength() {
+ return totalBlockDataLength;
}
}
diff --git
a/common/src/test/java/org/apache/uniffle/common/ShufflePartitionedBlockTest.java
b/common/src/test/java/org/apache/uniffle/common/ShufflePartitionedBlockTest.java
index 3f894ab9e..eb79ede63 100644
---
a/common/src/test/java/org/apache/uniffle/common/ShufflePartitionedBlockTest.java
+++
b/common/src/test/java/org/apache/uniffle/common/ShufflePartitionedBlockTest.java
@@ -37,7 +37,7 @@ public class ShufflePartitionedBlockTest {
new Random().nextBytes(buf);
ShufflePartitionedBlock b1 = new ShufflePartitionedBlock(1, 1, 2, 3, 1,
buf);
- assertEquals(1, b1.getLength());
+ assertEquals(1, b1.getDataLength());
assertEquals(2, b1.getCrc());
assertEquals(3, b1.getBlockId());
@@ -73,7 +73,7 @@ public class ShufflePartitionedBlockTest {
"ShufflePartitionedBlock{blockId["
+ b1.getBlockId()
+ "], length["
- + b1.getLength()
+ + b1.getDataLength()
+ "], uncompressLength["
+ b1.getUncompressLength()
+ "], crc["
@@ -87,6 +87,6 @@ public class ShufflePartitionedBlockTest {
@Test
public void testSize() {
ShufflePartitionedBlock b1 = new ShufflePartitionedBlock(1, 2, 3, 4, 5,
new byte[6]);
- assertEquals(b1.getSize(), b1.getLength() + 3 * Long.BYTES + 2 *
Integer.BYTES);
+ assertEquals(b1.getEncodedLength(), b1.getDataLength() + 3 * Long.BYTES +
2 * Integer.BYTES);
}
}
diff --git
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
index c9375123f..d43d1e71f 100644
---
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
+++
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
@@ -34,7 +34,6 @@ import io.grpc.Status;
import io.grpc.stub.StreamObserver;
import io.netty.buffer.ByteBuf;
import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.lang3.tuple.Pair;
import org.roaringbitmap.longlong.Roaring64NavigableMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -467,10 +466,10 @@ public class ShuffleServerGrpcService extends
ShuffleServerImplBase {
return;
}
final long start = System.currentTimeMillis();
- List<ShufflePartitionedData> shufflePartitionedData =
toPartitionedData(req);
+ List<ShufflePartitionedData> shufflePartitionedDataList =
toPartitionedDataList(req);
long alreadyReleasedSize = 0;
boolean hasFailureOccurred = false;
- for (ShufflePartitionedData spd : shufflePartitionedData) {
+ for (ShufflePartitionedData spd : shufflePartitionedDataList) {
String shuffleDataInfo =
"appId["
+ appId
@@ -496,7 +495,7 @@ public class ShuffleServerGrpcService extends
ShuffleServerImplBase {
// TODO: Use ShuffleBufferWithSkipList to avoid caching block
here.
shuffleServer.getShuffleMergeManager().cacheBlock(appId,
shuffleId, spd);
}
- long toReleasedSize = spd.getTotalBlockSize();
+ long toReleasedSize = spd.getTotalBlockEncodedLength();
// after each cacheShuffleData call, the `preAllocatedSize` is
updated timely.
manager.releasePreAllocatedSize(toReleasedSize);
alreadyReleasedSize += toReleasedSize;
@@ -529,7 +528,7 @@ public class ShuffleServerGrpcService extends
ShuffleServerImplBase {
if (hasFailureOccurred) {
shuffleServer
.getShuffleBufferManager()
- .releaseMemory(spd.getTotalBlockSize(), false, false);
+ .releaseMemory(spd.getTotalBlockEncodedLength(), false,
false);
}
}
}
@@ -559,7 +558,7 @@ public class ShuffleServerGrpcService extends
ShuffleServerImplBase {
+ "], cost "
+ costTime
+ " ms with "
- + shufflePartitionedData.size()
+ + shufflePartitionedDataList.size()
+ " blocks and "
+ requireSize
+ " bytes");
@@ -1660,24 +1659,23 @@ public class ShuffleServerGrpcService extends
ShuffleServerImplBase {
}
}
- private List<ShufflePartitionedData>
toPartitionedData(SendShuffleDataRequest req) {
+ private List<ShufflePartitionedData>
toPartitionedDataList(SendShuffleDataRequest req) {
List<ShufflePartitionedData> ret = Lists.newArrayList();
for (ShuffleData data : req.getShuffleDataList()) {
- ret.add(
- new ShufflePartitionedData(
- data.getPartitionId(), toPartitionedBlock(data.getBlockList())));
+ ret.add(toPartitionedData(data.getPartitionId(), data.getBlockList()));
}
return ret;
}
- private Pair<Long, ShufflePartitionedBlock[]>
toPartitionedBlock(List<ShuffleBlock> blocks) {
+ private ShufflePartitionedData toPartitionedData(int partitionId,
List<ShuffleBlock> blocks) {
if (blocks == null || blocks.size() == 0) {
- return Pair.of(0L, new ShufflePartitionedBlock[] {});
+ return new ShufflePartitionedData(partitionId, 0L, 0L, new
ShufflePartitionedBlock[] {});
}
ShufflePartitionedBlock[] ret = new ShufflePartitionedBlock[blocks.size()];
- long size = 0L;
+ long encodedLength = 0L;
+ long dataLength = 0L;
int i = 0;
for (ShuffleBlock block : blocks) {
ByteBuf data = ByteBufUtils.byteStringToByteBuf(block.getData());
@@ -1689,10 +1687,11 @@ public class ShuffleServerGrpcService extends
ShuffleServerImplBase {
block.getBlockId(),
block.getTaskAttemptId(),
data);
- size += ret[i].getSize();
+ encodedLength += ret[i].getEncodedLength();
+ dataLength += ret[i].getDataLength();
i++;
}
- return Pair.of(size, ret);
+ return new ShufflePartitionedData(partitionId, encodedLength, dataLength,
ret);
}
private Map<Integer, long[]> toPartitionBlocksMap(List<PartitionToBlockIds>
partitionToBlockIds) {
diff --git
a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
index 8f7cfd4d4..e51ab5352 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
@@ -332,7 +332,7 @@ public class ShuffleTaskManager {
String appId, int shuffleId, boolean isPreAllocated,
ShufflePartitionedData spd) {
refreshAppId(appId);
long partitionSize = getPartitionDataSize(appId, shuffleId,
spd.getPartitionId());
- long deltaSize = spd.getTotalBlockSize();
+ long deltaSize = spd.getTotalBlockEncodedLength();
partitionSize += deltaSize;
// We do not need to check the huge partition size here, after old client
upgraded to this
// version,
@@ -506,7 +506,7 @@ public class ShuffleTaskManager {
synchronized (bitmap) {
for (ShufflePartitionedBlock spb : spbs) {
bitmap.addLong(spb.getBlockId());
- size += spb.getSize();
+ size += spb.getEncodedLength();
}
}
long partitionSize = shuffleTaskInfo.addPartitionDataSize(shuffleId,
partitionId, size);
diff --git
a/server/src/main/java/org/apache/uniffle/server/buffer/AbstractShuffleBuffer.java
b/server/src/main/java/org/apache/uniffle/server/buffer/AbstractShuffleBuffer.java
index 62e2728bf..f520603da 100644
---
a/server/src/main/java/org/apache/uniffle/server/buffer/AbstractShuffleBuffer.java
+++
b/server/src/main/java/org/apache/uniffle/server/buffer/AbstractShuffleBuffer.java
@@ -126,7 +126,7 @@ public abstract class AbstractShuffleBuffer implements
ShuffleBuffer {
} catch (Exception e) {
LOG.error(
"Unexpected exception for System.arraycopy, length["
- + block.getLength()
+ + block.getDataLength()
+ "], offset["
+ offset
+ "], dataLength["
@@ -135,7 +135,7 @@ public abstract class AbstractShuffleBuffer implements
ShuffleBuffer {
e);
throw e;
}
- offset += block.getLength();
+ offset += block.getDataLength();
}
}
@@ -168,13 +168,13 @@ public abstract class AbstractShuffleBuffer implements
ShuffleBuffer {
new BufferSegment(
block.getBlockId(),
currentOffset,
- block.getLength(),
+ block.getDataLength(),
block.getUncompressLength(),
block.getCrc(),
block.getTaskAttemptId()));
readBlocks.add(block);
// update offset
- currentOffset += block.getLength();
+ currentOffset += block.getDataLength();
// check if length >= request buffer size
if (currentOffset >= readBufferSize) {
break;
diff --git
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
index 18dd94c19..aac9be42e 100644
---
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
+++
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
@@ -226,10 +226,17 @@ public class ShuffleBufferManager {
Arrays.stream(spd.getBlockList())
.forEach(
b -> {
- int blockSize = b.getLength();
+ int blockSize = b.getDataLength();
ShuffleServerMetrics.appHistogramWriteBlockSize.labels(appId).observe(blockSize);
});
}
+ LOG.debug(
+ "cache shuffle data, size: {}, blockCount: {}, appId: {}, shuffleId:
{}, partitionId: {}",
+ spd.getTotalBlockDataLength(),
+ spd.getBlockList().length,
+ appId,
+ shuffleId,
+ spd.getPartitionId());
updateShuffleSize(appId, shuffleId, size);
synchronized (this) {
flushSingleBufferIfNecessary(
diff --git
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithLinkedList.java
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithLinkedList.java
index 6f94d7a50..9597cbfe3 100644
---
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithLinkedList.java
+++
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithLinkedList.java
@@ -58,7 +58,7 @@ public class ShuffleBufferWithLinkedList extends
AbstractShuffleBuffer {
// If sendShuffleData retried, we may receive duplicate block. The
duplicate
// block would gc without release. Here we must release the duplicated
block.
if (blocks.add(block)) {
- size += block.getSize();
+ size += block.getEncodedLength();
} else {
block.getData().release();
}
@@ -126,10 +126,10 @@ public class ShuffleBufferWithLinkedList extends
AbstractShuffleBuffer {
for (ShufflePartitionedBlock spb : blocks) {
try {
spb.getData().release();
- releasedSize += spb.getSize();
+ releasedSize += spb.getEncodedLength();
} catch (Throwable t) {
lastException = t;
- failedToReleaseSize += spb.getSize();
+ failedToReleaseSize += spb.getEncodedLength();
}
}
if (lastException != null) {
@@ -261,13 +261,13 @@ public class ShuffleBufferWithLinkedList extends
AbstractShuffleBuffer {
new BufferSegment(
block.getBlockId(),
currentOffset,
- block.getLength(),
+ block.getDataLength(),
block.getUncompressLength(),
block.getCrc(),
block.getTaskAttemptId()));
readBlocks.add(block);
// update offset
- currentOffset += block.getLength();
+ currentOffset += block.getDataLength();
if (currentOffset >= readBufferSize) {
break;
}
diff --git
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithSkipList.java
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithSkipList.java
index d8ed3b2db..50e6d686f 100644
---
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithSkipList.java
+++
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithSkipList.java
@@ -68,7 +68,7 @@ public class ShuffleBufferWithSkipList extends
AbstractShuffleBuffer {
if (!blocksMap.containsKey(block.getBlockId())) {
blocksMap.put(block.getBlockId(), block);
blockCount++;
- size += block.getSize();
+ size += block.getEncodedLength();
} else {
block.getData().release();
}
@@ -127,10 +127,10 @@ public class ShuffleBufferWithSkipList extends
AbstractShuffleBuffer {
for (ShufflePartitionedBlock spb : blocksMap.values()) {
try {
spb.getData().release();
- releasedSize += spb.getSize();
+ releasedSize += spb.getEncodedLength();
} catch (Throwable t) {
lastException = t;
- failedToReleaseSize += spb.getSize();
+ failedToReleaseSize += spb.getEncodedLength();
}
}
if (lastException != null) {
@@ -249,13 +249,13 @@ public class ShuffleBufferWithSkipList extends
AbstractShuffleBuffer {
new BufferSegment(
block.getBlockId(),
currentOffset,
- block.getLength(),
+ block.getDataLength(),
block.getUncompressLength(),
block.getCrc(),
block.getTaskAttemptId()));
readBlocks.add(block);
// update offset
- currentOffset += block.getLength();
+ currentOffset += block.getDataLength();
if (currentOffset >= readBufferSize) {
break;
}
diff --git
a/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java
b/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java
index e40bf4d87..3c7bb736f 100644
---
a/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java
+++
b/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java
@@ -30,7 +30,6 @@ import io.netty.channel.ChannelFutureListener;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -167,7 +166,9 @@ public class ShuffleServerNettyHandler implements
BaseMessageHandler {
+ ", requireBlocksSize="
+ requireBlocksSize
+ ", stageAttemptNumber="
- + stageAttemptNumber);
+ + stageAttemptNumber
+ + ", partitionCount="
+ + req.getPartitionToBlocks().size());
ShuffleTaskInfo taskInfo =
shuffleServer.getShuffleTaskManager().getShuffleTaskInfo(appId);
if (taskInfo == null) {
@@ -274,10 +275,10 @@ public class ShuffleServerNettyHandler implements
BaseMessageHandler {
}
final long start = System.currentTimeMillis();
shuffleBufferManager.releaseMemory(req.encodedLength(), false, true);
- List<ShufflePartitionedData> shufflePartitionedData =
toPartitionedData(req);
+ List<ShufflePartitionedData> shufflePartitionedDataList =
toPartitionedDataList(req);
long alreadyReleasedSize = 0;
boolean hasFailureOccurred = false;
- for (ShufflePartitionedData spd : shufflePartitionedData) {
+ for (ShufflePartitionedData spd : shufflePartitionedDataList) {
String shuffleDataInfo =
"appId["
+ appId
@@ -301,7 +302,7 @@ public class ShuffleServerNettyHandler implements
BaseMessageHandler {
responseMessage = errorMsg;
hasFailureOccurred = true;
} else {
- long toReleasedSize = spd.getTotalBlockSize();
+ long toReleasedSize = spd.getTotalBlockEncodedLength();
// after each cacheShuffleData call, the `preAllocatedSize` is
updated timely.
shuffleTaskManager.releasePreAllocatedSize(toReleasedSize);
alreadyReleasedSize += toReleasedSize;
@@ -333,7 +334,7 @@ public class ShuffleServerNettyHandler implements
BaseMessageHandler {
// Once the cache failure occurs, we should explicitly release
data held by byteBuf
if (hasFailureOccurred) {
Arrays.stream(spd.getBlockList()).forEach(block ->
block.getData().release());
- shuffleBufferManager.releaseMemory(spd.getTotalBlockSize(),
false, false);
+
shuffleBufferManager.releaseMemory(spd.getTotalBlockEncodedLength(), false,
false);
}
}
}
@@ -359,7 +360,7 @@ public class ShuffleServerNettyHandler implements
BaseMessageHandler {
+ "], cost "
+ costTime
+ " ms with "
- + shufflePartitionedData.size()
+ + shufflePartitionedDataList.size()
+ " blocks and "
+ requireBlocksSize
+ " bytes");
@@ -750,21 +751,22 @@ public class ShuffleServerNettyHandler implements
BaseMessageHandler {
}
}
- private List<ShufflePartitionedData>
toPartitionedData(SendShuffleDataRequest req) {
+ private List<ShufflePartitionedData>
toPartitionedDataList(SendShuffleDataRequest req) {
List<ShufflePartitionedData> ret = Lists.newArrayList();
for (Map.Entry<Integer, List<ShuffleBlockInfo>> entry :
req.getPartitionToBlocks().entrySet()) {
- ret.add(new ShufflePartitionedData(entry.getKey(),
toPartitionedBlock(entry.getValue())));
+ ret.add(toPartitionedData(entry.getKey(), entry.getValue()));
}
return ret;
}
- private Pair<Long, ShufflePartitionedBlock[]>
toPartitionedBlock(List<ShuffleBlockInfo> blocks) {
+ private ShufflePartitionedData toPartitionedData(int partitionId,
List<ShuffleBlockInfo> blocks) {
if (blocks == null || blocks.size() == 0) {
- return Pair.of(0L, new ShufflePartitionedBlock[] {});
+ return new ShufflePartitionedData(partitionId, 0L, 0L, new
ShufflePartitionedBlock[] {});
}
ShufflePartitionedBlock[] ret = new ShufflePartitionedBlock[blocks.size()];
- long size = 0L;
+ long encodedLength = 0L;
+ long dataLength = 0L;
int i = 0;
for (ShuffleBlockInfo block : blocks) {
ret[i] =
@@ -775,10 +777,11 @@ public class ShuffleServerNettyHandler implements
BaseMessageHandler {
block.getBlockId(),
block.getTaskAttemptId(),
block.getData());
- size += ret[i].getSize();
+ encodedLength += ret[i].getEncodedLength();
+ dataLength += ret[i].getDataLength();
i++;
}
- return Pair.of(size, ret);
+ return new ShufflePartitionedData(partitionId, encodedLength, dataLength,
ret);
}
private StatusCode verifyRequest(String appId) {
diff --git
a/server/src/main/java/org/apache/uniffle/server/storage/SingleStorageManager.java
b/server/src/main/java/org/apache/uniffle/server/storage/SingleStorageManager.java
index fb974d5f5..936a7550f 100644
---
a/server/src/main/java/org/apache/uniffle/server/storage/SingleStorageManager.java
+++
b/server/src/main/java/org/apache/uniffle/server/storage/SingleStorageManager.java
@@ -137,7 +137,7 @@ public abstract class SingleStorageManager implements
StorageManager {
long length = 0;
long blockNum = 0;
for (ShufflePartitionedBlock block : event.getShuffleBlocks()) {
- length += block.getLength();
+ length += block.getDataLength();
blockNum++;
}
List<Integer> partitions = Lists.newArrayList();
diff --git
a/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java
b/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java
index a0c06f541..2a67c548e 100644
--- a/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java
+++ b/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java
@@ -295,14 +295,14 @@ public class ShuffleTaskManagerTest extends
HadoopTestBase {
// case1
ShufflePartitionedData partitionedData0 = createPartitionedData(1, 1, 35);
- long size1 = partitionedData0.getTotalBlockSize();
+ long size1 = partitionedData0.getTotalBlockEncodedLength();
shuffleTaskManager.updateCachedBlockIds(appId, shuffleId, 1,
partitionedData0.getBlockList());
assertEquals(size1,
shuffleTaskManager.getShuffleTaskInfo(appId).getTotalDataSize());
// case2
partitionedData0 = createPartitionedData(1, 1, 35);
- long size2 = partitionedData0.getTotalBlockSize();
+ long size2 = partitionedData0.getTotalBlockEncodedLength();
shuffleTaskManager.updateCachedBlockIds(appId, shuffleId, 1,
partitionedData0.getBlockList());
assertEquals(size1 + size2,
shuffleTaskManager.getShuffleTaskInfo(appId).getTotalDataSize());
assertEquals(
@@ -1159,7 +1159,7 @@ public class ShuffleTaskManagerTest extends
HadoopTestBase {
for (ShufflePartitionedBlock block : blocks) {
for (BufferSegment bs : bufferSegments) {
if (bs.getBlockId() == block.getBlockId()) {
- assertEquals(block.getLength(), bs.getLength());
+ assertEquals(block.getDataLength(), bs.getLength());
assertEquals(block.getCrc(), bs.getCrc());
matchNum++;
break;
diff --git
a/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferWithLinkedListTest.java
b/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferWithLinkedListTest.java
index f2037b58f..151232af2 100644
---
a/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferWithLinkedListTest.java
+++
b/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferWithLinkedListTest.java
@@ -599,14 +599,14 @@ public class ShuffleBufferWithLinkedListTest extends
BufferTestBase {
private byte[] getExpectedData(ShufflePartitionedData... spds) {
int size = 0;
for (ShufflePartitionedData spd : spds) {
- size += spd.getBlockList()[0].getLength();
+ size += spd.getBlockList()[0].getDataLength();
}
byte[] expectedData = new byte[size];
int offset = 0;
for (ShufflePartitionedData spd : spds) {
ShufflePartitionedBlock block = spd.getBlockList()[0];
- ByteBufUtils.readBytes(block.getData(), expectedData, offset,
block.getLength());
- offset += block.getLength();
+ ByteBufUtils.readBytes(block.getData(), expectedData, offset,
block.getDataLength());
+ offset += block.getDataLength();
}
return expectedData;
}
@@ -623,10 +623,10 @@ public class ShuffleBufferWithLinkedListTest extends
BufferTestBase {
ShufflePartitionedBlock spb = blocks.get(i);
BufferSegment segment = bufferSegments.get(segmentIndex);
assertEquals(spb.getBlockId(), segment.getBlockId());
- assertEquals(spb.getLength(), segment.getLength());
+ assertEquals(spb.getDataLength(), segment.getLength());
assertEquals(spb.getCrc(), segment.getCrc());
assertEquals(offset, segment.getOffset());
- offset += spb.getLength();
+ offset += spb.getDataLength();
segmentIndex++;
}
}
diff --git
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HadoopShuffleWriteHandler.java
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HadoopShuffleWriteHandler.java
index eb81c4089..d6339975a 100644
---
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HadoopShuffleWriteHandler.java
+++
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HadoopShuffleWriteHandler.java
@@ -132,7 +132,7 @@ public class HadoopShuffleWriteHandler implements
ShuffleWriteHandler {
new FileBasedShuffleSegment(
blockId,
startOffset,
- block.getLength(),
+ block.getDataLength(),
block.getUncompressLength(),
crc,
block.getTaskAttemptId());
diff --git
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileWriteHandler.java
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileWriteHandler.java
index 4b06e5aa9..a00e33d2f 100644
---
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileWriteHandler.java
+++
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileWriteHandler.java
@@ -109,7 +109,7 @@ public class LocalFileWriteHandler implements
ShuffleWriteHandler {
new FileBasedShuffleSegment(
blockId,
startOffset,
- block.getLength(),
+ block.getDataLength(),
block.getUncompressLength(),
crc,
block.getTaskAttemptId());
diff --git
a/storage/src/test/java/org/apache/uniffle/storage/HadoopShuffleHandlerTestBase.java
b/storage/src/test/java/org/apache/uniffle/storage/HadoopShuffleHandlerTestBase.java
index ea29a4768..1e936b404 100644
---
a/storage/src/test/java/org/apache/uniffle/storage/HadoopShuffleHandlerTestBase.java
+++
b/storage/src/test/java/org/apache/uniffle/storage/HadoopShuffleHandlerTestBase.java
@@ -96,11 +96,11 @@ public class HadoopShuffleHandlerTestBase {
new FileBasedShuffleSegment(
spb.getBlockId(),
offset,
- spb.getLength(),
+ spb.getDataLength(),
spb.getUncompressLength(),
spb.getCrc(),
1);
- offset += spb.getLength();
+ offset += spb.getDataLength();
segments.add(segment);
if (doWrite) {
writer.writeData(ByteBufUtils.readBytes(spb.getData()));
diff --git
a/storage/src/test/java/org/apache/uniffle/storage/handler/impl/HadoopShuffleReadHandlerTest.java
b/storage/src/test/java/org/apache/uniffle/storage/handler/impl/HadoopShuffleReadHandlerTest.java
index d1b663f1f..1b2b7e93b 100644
---
a/storage/src/test/java/org/apache/uniffle/storage/handler/impl/HadoopShuffleReadHandlerTest.java
+++
b/storage/src/test/java/org/apache/uniffle/storage/handler/impl/HadoopShuffleReadHandlerTest.java
@@ -212,7 +212,7 @@ public class HadoopShuffleReadHandlerTest extends
HadoopTestBase {
new FileBasedShuffleSegment(
blockId,
startOffset,
- block.getLength(),
+ block.getDataLength(),
block.getUncompressLength(),
crc,
block.getTaskAttemptId());
diff --git
a/storage/src/test/java/org/apache/uniffle/storage/handler/impl/LocalFileServerReadHandlerTest.java
b/storage/src/test/java/org/apache/uniffle/storage/handler/impl/LocalFileServerReadHandlerTest.java
index 2a55ae4de..5b2db2daf 100644
---
a/storage/src/test/java/org/apache/uniffle/storage/handler/impl/LocalFileServerReadHandlerTest.java
+++
b/storage/src/test/java/org/apache/uniffle/storage/handler/impl/LocalFileServerReadHandlerTest.java
@@ -65,11 +65,11 @@ public class LocalFileServerReadHandlerTest {
new FileBasedShuffleSegment(
block.getBlockId(),
offset,
- block.getLength(),
+ block.getDataLength(),
block.getUncompressLength(),
block.getCrc(),
block.getTaskAttemptId());
- offset += block.getLength();
+ offset += block.getDataLength();
LocalFileHandlerTestBase.writeIndex(byteBuffer, segment);
}
},