rickyma commented on code in PR #1617:
URL:
https://github.com/apache/incubator-uniffle/pull/1617#discussion_r1552023842
##########
server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java:
##########
@@ -98,6 +99,24 @@ public void exceptionCaught(Throwable cause, TransportClient client) {
public void handleSendShuffleDataRequest(TransportClient client, SendShuffleDataRequest req) {
RpcResponse rpcResponse;
String appId = req.getAppId();
+
+ if (shuffleServer.getServerStatus() != ServerStatus.ACTIVE
Review Comment:
I think the code could be:
```java
public void handleSendShuffleDataRequest(TransportClient client, SendShuffleDataRequest req) {
  RpcResponse rpcResponse;
  String appId = req.getAppId();
  int shuffleId = req.getShuffleId();
  long requireBufferId = req.getRequireId();
  // This code has been moved here
  ShuffleTaskManager manager = shuffleServer.getShuffleTaskManager();
  PreAllocatedBufferInfo info = manager.getAndRemovePreAllocatedBuffer(requireBufferId);
  boolean isPreAllocated = info != null;
  if (isPreAllocated) {
    ShuffleBufferManager shuffleBufferManager = shuffleServer.getShuffleBufferManager();
    shuffleBufferManager.releaseMemory(req.encodedLength(), false, true);
  }
  // Your new code should be put here
  ...
  long timestamp = req.getTimestamp();
  if (timestamp > 0) {
    /*
     * Here we record the transport time, but we don't consider the impact of data size on
     * transport time. The amount of data does not cause large fluctuations in latency: for
     * example, 100K costs 1ms and 1M costs 10ms. That is a normal fluctuation, but latency
     * may rise to 10s when the server load is high.
     * In addition, the clocks of the client machine and the Shuffle Server must be kept in
     * sync; TransportTime is accurate only if this condition is met.
     */
    long transportTime = System.currentTimeMillis() - timestamp;
    if (transportTime > 0) {
      shuffleServer
          .getNettyMetrics()
          .recordTransportTime(SendShuffleDataRequest.class.getName(), transportTime);
    }
  }
  int requireSize = shuffleServer.getShuffleTaskManager().getRequireBufferSize(requireBufferId);
  int requireBlocksSize =
      requireSize - req.encodedLength() < 0 ? 0 : requireSize - req.encodedLength();
  StatusCode ret = StatusCode.SUCCESS;
  String responseMessage = "OK";
  if (req.getPartitionToBlocks().size() > 0) {
    ShuffleServerMetrics.counterTotalReceivedDataSize.inc(requireBlocksSize);
    // The previous code has been moved to the top
    if (!isPreAllocated) {
      req.getPartitionToBlocks().values().stream()
          .flatMap(Collection::stream)
          .forEach(block -> block.getData().release());
    }
    ...
  }
```
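The transport-time bookkeeping in the suggestion can be isolated into a pure function, which makes the clock-skew caveat easy to see. This is a minimal sketch, assuming a hypothetical `transportTimeMs` helper that is not part of Uniffle; in the handler, the server time would be `System.currentTimeMillis()` and a present result would be passed to `recordTransportTime`.

```java
import java.util.OptionalLong;

/** Hypothetical helper for illustration only; not a Uniffle class. */
public final class TransportTimeSketch {
  private TransportTimeSketch() {}

  /**
   * Mirrors the suggestion: a non-positive client timestamp means "not sent", and a
   * non-positive difference (e.g. the client clock is ahead of the server's) is skipped
   * instead of being recorded as a meaningless value.
   */
  static OptionalLong transportTimeMs(long clientTimestampMs, long serverNowMs) {
    if (clientTimestampMs <= 0) {
      return OptionalLong.empty();
    }
    long transportTime = serverNowMs - clientTimestampMs;
    return transportTime > 0 ? OptionalLong.of(transportTime) : OptionalLong.empty();
  }

  public static void main(String[] args) {
    // 250 ms between the client stamping the request and the server handling it.
    System.out.println(transportTimeMs(1_000L, 1_250L).getAsLong()); // 250
    // Client clock 100 ms ahead of the server: nothing would be recorded.
    System.out.println(transportTimeMs(1_000L, 900L).isPresent()); // false
  }
}
```

The `> 0` filter only drops obviously impossible values. A client clock that lags behind the server still produces a positive but inflated number, which is why the comment insists that the clocks be kept in sync.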
Please double-check the logic. The main purpose is to release the extra memory reserved during the pre-allocation phase as soon as the `handleSendShuffleDataRequest` method is entered, because this part of memory has already been released when the `SendShuffleDataRequest` is decoded.
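One consequence of this ordering is that any early return added after the release (such as the new server-status check) cannot leave the reservation behind. The toy model below sketches that effect; it is not Uniffle code, and `PreAllocationReleaseSketch`, its local `ServerStatus` enum, and its methods are hypothetical stand-ins for `ShuffleTaskManager` and `ShuffleBufferManager` state.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model for illustration only; not Uniffle's real classes. */
public final class PreAllocationReleaseSketch {
  enum ServerStatus { ACTIVE, DECOMMISSIONING }

  // Hypothetical stand-ins: requireBufferId -> reserved size, plus a reserved-bytes counter.
  private final Map<Long, Integer> preAllocatedBuffers = new HashMap<>();
  private long preAllocatedBytes = 0;
  private ServerStatus status = ServerStatus.ACTIVE;

  /** Phase 1: the client reserves memory and receives a requireBufferId. */
  void preAllocate(long requireBufferId, int size) {
    preAllocatedBuffers.put(requireBufferId, size);
    preAllocatedBytes += size;
  }

  void setStatus(ServerStatus status) {
    this.status = status;
  }

  long preAllocatedBytes() {
    return preAllocatedBytes;
  }

  /** Suggested order: drop the reservation first, then run checks that may return early. */
  boolean handleSendShuffleData(long requireBufferId, int encodedLength) {
    boolean isPreAllocated = preAllocatedBuffers.remove(requireBufferId) != null;
    if (isPreAllocated) {
      // Mirrors releaseMemory(req.encodedLength(), ...) in the suggestion.
      preAllocatedBytes -= encodedLength;
    }
    if (status != ServerStatus.ACTIVE) {
      return false; // early return is safe: the reservation is already gone
    }
    return isPreAllocated; // requests without a reservation are rejected
  }

  /** Problematic order: checking status first leaves the reservation in place on rejection. */
  boolean handleWithLateRelease(long requireBufferId, int encodedLength) {
    if (status != ServerStatus.ACTIVE) {
      return false; // reserved bytes stay counted until something else cleans them up
    }
    return handleSendShuffleData(requireBufferId, encodedLength);
  }

  public static void main(String[] args) {
    PreAllocationReleaseSketch early = new PreAllocationReleaseSketch();
    early.preAllocate(1L, 1024);
    early.setStatus(ServerStatus.DECOMMISSIONING);
    System.out.println(early.handleSendShuffleData(1L, 1024) + " " + early.preAllocatedBytes()); // false 0

    PreAllocationReleaseSketch late = new PreAllocationReleaseSketch();
    late.preAllocate(1L, 1024);
    late.setStatus(ServerStatus.DECOMMISSIONING);
    System.out.println(late.handleWithLateRelease(1L, 1024) + " " + late.preAllocatedBytes()); // false 1024
  }
}
```

In both cases the request is rejected, but only the early-release order returns the 1024 reserved bytes immediately.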
--
This is an automated message from the Apache Git Service.