rickyma commented on code in PR #1617:
URL: https://github.com/apache/incubator-uniffle/pull/1617#discussion_r1552023842


##########
server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java:
##########
@@ -98,6 +99,24 @@ public void exceptionCaught(Throwable cause, TransportClient client) {
   public void handleSendShuffleDataRequest(TransportClient client, SendShuffleDataRequest req) {
     RpcResponse rpcResponse;
     String appId = req.getAppId();
+
+    if (shuffleServer.getServerStatus() != ServerStatus.ACTIVE

Review Comment:
   I think the code could be:
   ```java
   public void handleSendShuffleDataRequest(TransportClient client, SendShuffleDataRequest req) {
       RpcResponse rpcResponse;
       String appId = req.getAppId();
       int shuffleId = req.getShuffleId();
       long requireBufferId = req.getRequireId();

       // This code has been moved here
       ShuffleTaskManager manager = shuffleServer.getShuffleTaskManager();
       PreAllocatedBufferInfo info = manager.getAndRemovePreAllocatedBuffer(requireBufferId);
       boolean isPreAllocated = info != null;
       if (isPreAllocated) {
         ShuffleBufferManager shuffleBufferManager = shuffleServer.getShuffleBufferManager();
         shuffleBufferManager.releaseMemory(req.encodedLength(), false, true);
       }

       // Your new code should be put here
       ...

       long timestamp = req.getTimestamp();

       if (timestamp > 0) {
         /*
          * Here we record the transport time, but we don't consider the impact of data size on transport time.
          * The amount of data will not cause great fluctuations in latency. For example, 100K costs 1ms,
          * and 1M costs 10ms. This seems like a normal fluctuation, but it may rise to 10s when the server load is high.
          * In addition, note that the clock of the client machine and the clock of the Shuffle Server machine
          * should be kept in sync; transportTime is accurate only if this condition is met.
          */
         long transportTime = System.currentTimeMillis() - timestamp;
         if (transportTime > 0) {
           shuffleServer
               .getNettyMetrics()
               .recordTransportTime(SendShuffleDataRequest.class.getName(), transportTime);
         }
       }
       int requireSize = shuffleServer.getShuffleTaskManager().getRequireBufferSize(requireBufferId);
       int requireBlocksSize =
           requireSize - req.encodedLength() < 0 ? 0 : requireSize - req.encodedLength();

       StatusCode ret = StatusCode.SUCCESS;
       String responseMessage = "OK";
       if (req.getPartitionToBlocks().size() > 0) {
         ShuffleServerMetrics.counterTotalReceivedDataSize.inc(requireBlocksSize);

         // The previous code has been moved to the top

         if (!isPreAllocated) {
           req.getPartitionToBlocks().values().stream()
               .flatMap(Collection::stream)
               .forEach(block -> block.getData().release());
         }

         ...
   }
   ```
   
   Please check the logic again. The main purpose is to release, immediately upon entering `handleSendShuffleDataRequest`, the extra memory reserved during the pre-allocation phase, because this part of memory has already been released when the `SendShuffleDataRequest` was decoded.
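
   For illustration only, here is a rough sketch of how the new status check from this PR could then sit after the pre-allocation release, so that neither the reserved memory nor the decoded block buffers are leaked on the early-return path. The use of `StatusCode.INTERNAL_ERROR` and the response message are my assumptions, not the final code:
   ```java
   // Hypothetical placement sketch (untested): run this AFTER the pre-allocated
   // memory for this request has been released, as suggested above.
   if (shuffleServer.getServerStatus() != ServerStatus.ACTIVE) {
     // The decoded blocks still hold Netty ByteBufs, so release them before
     // returning, mirroring the !isPreAllocated branch above.
     req.getPartitionToBlocks().values().stream()
         .flatMap(Collection::stream)
         .forEach(block -> block.getData().release());
     // Assumed status code and message; the actual PR may use different ones.
     rpcResponse =
         new RpcResponse(
             req.getRequestId(),
             StatusCode.INTERNAL_ERROR,
             "Server is not active, current status: " + shuffleServer.getServerStatus());
     client.getChannel().writeAndFlush(rpcResponse);
     return;
   }
   ```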



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
