This is an automated email from the ASF dual-hosted git repository.

zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new cbf4f6f1e [#1596] fix(netty): Use a ChannelFutureListener callback 
mechanism to release readMemory (#1605)
cbf4f6f1e is described below

commit cbf4f6f1e2e0d7b0b5e03b169f62c602fac30278
Author: RickyMa <[email protected]>
AuthorDate: Thu Mar 28 14:35:56 2024 +0800

    [#1596] fix(netty): Use a ChannelFutureListener callback mechanism to 
release readMemory (#1605)
    
    ### What changes were proposed in this pull request?
    
    1. Add a `ChannelFutureListener` and use its callback to release 
`readMemory` only after the `writeAndFlush` operation has actually completed.
    2. Change the descriptions of configurations 
`rss.server.buffer.capacity.ratio` and `rss.server.read.buffer.capacity.ratio`.
    
    ### Why are the changes needed?
    
    This is actually a bug, which was introduced by PR 
https://github.com/apache/incubator-uniffle/pull/879 and has been present 
since the very beginning, when the Netty feature was first integrated: 
`readMemory` was released in a `finally` block before the asynchronous 
`writeAndFlush` had completed.
    Fix https://github.com/apache/incubator-uniffle/issues/1596.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    I don't think new tests are needed; the change was tested in our 
environment.
    The new log output looks like this:
    ```
    [2024-03-26 23:11:51.039] [epollEventLoopGroup-3-158] [INFO] 
ShuffleServerNettyHandler.operationComplete - Successfully executed 
getLocalShuffleData for appId[application_1703049085550_7359933_1711463990606], 
shuffleId[0], partitionId[1328], offset[0], length[14693742]. Took 1457 ms and 
retrieved 14693742 bytes of data
    [2024-03-26 23:11:51.040] [epollEventLoopGroup-3-130] [INFO] 
ShuffleServerNettyHandler.operationComplete - Successfully executed 
getMemoryShuffleData for 
appId[application_1703049085550_7359933_1711463990606], shuffleId[0], 
partitionId[1262]. Took 1 ms and retrieved 0 bytes of data
    [2024-03-26 23:11:51.068] [epollEventLoopGroup-3-177] [INFO] 
ShuffleServerNettyHandler.operationComplete - Successfully executed 
getLocalShuffleIndex for 
appId[application_1703049085550_7359933_1711463990606], shuffleId[0], 
partitionId[1366]. Took 918 ms and retrieved 1653600 bytes of data
    ```
---
 .../netty/protocol/GetLocalShuffleDataRequest.java |   5 +
 .../protocol/GetLocalShuffleIndexRequest.java      |   5 +
 .../protocol/GetMemoryShuffleDataRequest.java      |   5 +
 .../common/netty/protocol/RequestMessage.java      |   2 +
 .../netty/protocol/SendShuffleDataRequest.java     |   5 +
 docs/server_guide.md                               |   6 +-
 .../apache/uniffle/server/ShuffleServerConf.java   |   4 +-
 .../server/buffer/ShuffleBufferManager.java        |   7 +-
 .../server/netty/ShuffleServerNettyHandler.java    | 167 ++++++++++++++++-----
 9 files changed, 160 insertions(+), 46 deletions(-)

diff --git 
a/common/src/main/java/org/apache/uniffle/common/netty/protocol/GetLocalShuffleDataRequest.java
 
b/common/src/main/java/org/apache/uniffle/common/netty/protocol/GetLocalShuffleDataRequest.java
index d80d0aa6d..b96c028fb 100644
--- 
a/common/src/main/java/org/apache/uniffle/common/netty/protocol/GetLocalShuffleDataRequest.java
+++ 
b/common/src/main/java/org/apache/uniffle/common/netty/protocol/GetLocalShuffleDataRequest.java
@@ -131,4 +131,9 @@ public class GetLocalShuffleDataRequest extends 
RequestMessage {
   public long getTimestamp() {
     return timestamp;
   }
+
+  @Override
+  public String getOperationType() {
+    return "getLocalShuffleData";
+  }
 }
diff --git 
a/common/src/main/java/org/apache/uniffle/common/netty/protocol/GetLocalShuffleIndexRequest.java
 
b/common/src/main/java/org/apache/uniffle/common/netty/protocol/GetLocalShuffleIndexRequest.java
index 1ccdfae10..105fea051 100644
--- 
a/common/src/main/java/org/apache/uniffle/common/netty/protocol/GetLocalShuffleIndexRequest.java
+++ 
b/common/src/main/java/org/apache/uniffle/common/netty/protocol/GetLocalShuffleIndexRequest.java
@@ -93,4 +93,9 @@ public class GetLocalShuffleIndexRequest extends 
RequestMessage {
   public int getPartitionNum() {
     return partitionNum;
   }
+
+  @Override
+  public String getOperationType() {
+    return "getLocalShuffleIndex";
+  }
 }
diff --git 
a/common/src/main/java/org/apache/uniffle/common/netty/protocol/GetMemoryShuffleDataRequest.java
 
b/common/src/main/java/org/apache/uniffle/common/netty/protocol/GetMemoryShuffleDataRequest.java
index d358cf7cd..13a241241 100644
--- 
a/common/src/main/java/org/apache/uniffle/common/netty/protocol/GetMemoryShuffleDataRequest.java
+++ 
b/common/src/main/java/org/apache/uniffle/common/netty/protocol/GetMemoryShuffleDataRequest.java
@@ -148,4 +148,9 @@ public class GetMemoryShuffleDataRequest extends 
RequestMessage {
   public Roaring64NavigableMap getExpectedTaskIdsBitmap() {
     return expectedTaskIdsBitmap;
   }
+
+  @Override
+  public String getOperationType() {
+    return "getMemoryShuffleData";
+  }
 }
diff --git 
a/common/src/main/java/org/apache/uniffle/common/netty/protocol/RequestMessage.java
 
b/common/src/main/java/org/apache/uniffle/common/netty/protocol/RequestMessage.java
index cfa55287c..946f906cc 100644
--- 
a/common/src/main/java/org/apache/uniffle/common/netty/protocol/RequestMessage.java
+++ 
b/common/src/main/java/org/apache/uniffle/common/netty/protocol/RequestMessage.java
@@ -35,4 +35,6 @@ public abstract class RequestMessage extends Message {
   public long getRequestId() {
     return requestId;
   }
+
+  public abstract String getOperationType();
 }
diff --git 
a/common/src/main/java/org/apache/uniffle/common/netty/protocol/SendShuffleDataRequest.java
 
b/common/src/main/java/org/apache/uniffle/common/netty/protocol/SendShuffleDataRequest.java
index 492b5b64b..a77b0d3c7 100644
--- 
a/common/src/main/java/org/apache/uniffle/common/netty/protocol/SendShuffleDataRequest.java
+++ 
b/common/src/main/java/org/apache/uniffle/common/netty/protocol/SendShuffleDataRequest.java
@@ -145,4 +145,9 @@ public class SendShuffleDataRequest extends RequestMessage {
   public void setTimestamp(long timestamp) {
     this.timestamp = timestamp;
   }
+
+  @Override
+  public String getOperationType() {
+    return "sendShuffleData";
+  }
 }
diff --git a/docs/server_guide.md b/docs/server_guide.md
index 25224d6d6..efe0856a9 100644
--- a/docs/server_guide.md
+++ b/docs/server_guide.md
@@ -83,11 +83,11 @@ This document will introduce how to deploy Uniffle shuffle 
servers.
 | rss.server.netty.receive.buf                            | 0                  
                                                    | Receive buffer size 
(SO_RCVBUF). Note: the optimal size for receive buffer and send buffer should 
be latency * network_bandwidth. Assuming latency = 1ms, network_bandwidth = 
10Gbps, buffer size should be ~ 1.25MB. Default is 0, the operating system 
automatically estimates the receive buffer size based on default settings.      
                               [...]
 | rss.server.netty.send.buf                               | 0                  
                                                    | Send buffer size 
(SO_SNDBUF).                                                                    
                                                                                
                                                                                
                                                                                
                       [...]
 | rss.server.buffer.capacity                              | -1                 
                                                    | Max memory of buffer 
manager for shuffle server. If negative, JVM heap size * buffer.ratio is used   
                                                                                
                                                                                
                                                                                
                   [...]
-| rss.server.buffer.capacity.ratio                        | 0.8                
                                                    | when 
`rss.server.buffer.capacity`=-1, then the buffer capacity is JVM heap size * 
ratio                                                                           
                                                                                
                                                                                
                                      [...]
+| rss.server.buffer.capacity.ratio                        | 0.8                
                                                    | when 
`rss.server.buffer.capacity`=-1, then the buffer capacity is JVM heap size or 
off-heap size(when enabling Netty) * ratio                                      
                                                                                
                                                                                
                                     [...]
 | rss.server.memory.shuffle.highWaterMark.percentage      | 75.0               
                                                    | Threshold of spill data 
to storage, percentage of rss.server.buffer.capacity                            
                                                                                
                                                                                
                                                                                
                [...]
 | rss.server.memory.shuffle.lowWaterMark.percentage       | 25.0               
                                                    | Threshold of keep data in 
memory, percentage of rss.server.buffer.capacity                                
                                                                                
                                                                                
                                                                                
              [...]
 | rss.server.read.buffer.capacity                         | -1                 
                                                    | Max size of buffer for 
reading data. If negative, JVM heap size * read.buffer.ratio is used            
                                                                                
                                                                                
                                                                                
                 [...]
-| rss.server.read.buffer.capacity.ratio                   | 0.4                
                                                    | when 
`rss.server.read.buffer.capacity`=-1, then read buffer capacity is JVM heap 
size * ratio                                                                    
                                                                                
                                                                                
                                       [...]
+| rss.server.read.buffer.capacity.ratio                   | 0.4                
                                                    | when 
`rss.server.read.buffer.capacity`=-1, then read buffer capacity is JVM heap 
size or off-heap size(when enabling Netty) * ratio                              
                                                                                
                                                                                
                                       [...]
 | rss.server.heartbeat.interval                           | 10000              
                                                    | Heartbeat interval to 
Coordinator (ms)                                                                
                                                                                
                                                                                
                                                                                
                  [...]
 | rss.server.flush.localfile.threadPool.size              | 10                 
                                                    | Thread pool for flush 
data to local file                                                              
                                                                                
                                                                                
                                                                                
                  [...]
 | rss.server.flush.hadoop.threadPool.size                 | 60                 
                                                    | Thread pool for flush 
data to hadoop storage                                                          
                                                                                
                                                                                
                                                                                
                  [...]
@@ -104,7 +104,7 @@ This document will introduce how to deploy Uniffle shuffle 
servers.
 | rss.server.max.concurrency.of.per-partition.write       | 30                 
                                                    | The max concurrency of 
single partition writer, the data partition file number is equal to this value. 
Default value is 1. This config could improve the writing speed, especially for 
huge partition.                                                                 
                                                                                
                 [...]
 | rss.server.max.concurrency.limit.of.per-partition.write | -                  
                                                    | The limit for max 
concurrency per-partition write specified by client, this won't be enabled by 
default.                                                                        
                                                                                
                                                                                
                        [...]
 | rss.metrics.reporter.class                              | -                  
                                                    | The class of metrics 
reporter.                                                                       
                                                                                
                                                                                
                                                                                
                   [...]
-| rss.server.hybrid.storage.manager.selector.class       | 
org.apache.uniffle.server.storage.hybrid.DefaultStorageManagerSelector | The 
manager selector strategy for `MEMORY_LOCALFILE_HDFS`. Default value is 
`DefaultStorageManagerSelector`, and another 
`HugePartitionSensitiveStorageManagerSelector` will flush only huge partition's 
data to cold storage.                                                           
                                                                                
[...]
+| rss.server.hybrid.storage.manager.selector.class        | 
org.apache.uniffle.server.storage.hybrid.DefaultStorageManagerSelector | The 
manager selector strategy for `MEMORY_LOCALFILE_HDFS`. Default value is 
`DefaultStorageManagerSelector`, and another 
`HugePartitionSensitiveStorageManagerSelector` will flush only huge partition's 
data to cold storage.                                                           
                                                                               
[...]
 | rss.server.disk-capacity.watermark.check.enabled        | false              
                                                    | If it is co-located with 
other services, the high-low watermark check based on the uniffle used is not 
correct. Due to this, the whole disk capacity watermark check is necessary, 
which will reuse the current watermark value. It will be disabled by default.   
                                                                                
                     [...]
 
 ### Advanced Configurations
diff --git 
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java 
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
index e72510eae..9ea2e84f2 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
@@ -42,7 +42,7 @@ public class ShuffleServerConf extends RssBaseConf {
           .doubleType()
           .defaultValue(0.6)
           .withDescription(
-              "JVM heap size * ratio for the maximum memory of buffer manager 
for shuffle server, this "
+              "JVM heap size or off-heap size(when enabling Netty) * ratio for 
the maximum memory of buffer manager for shuffle server, this "
                   + "is only effective when `rss.server.buffer.capacity` is 
not explicitly set");
 
   public static final ConfigOption<Long> SERVER_READ_BUFFER_CAPACITY =
@@ -56,7 +56,7 @@ public class ShuffleServerConf extends RssBaseConf {
           .doubleType()
           .defaultValue(0.2)
           .withDescription(
-              "JVM heap size * ratio for read buffer size, this is only 
effective when "
+              "JVM heap size or off-heap size(when enabling Netty) * ratio for 
read buffer size, this is only effective when "
                   + "`rss.server.reader.buffer.capacity.ratio` is not 
explicitly set");
 
   public static final ConfigOption<Long> SERVER_HEARTBEAT_DELAY =
diff --git 
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
 
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
index ceca59211..4d42b0576 100644
--- 
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
+++ 
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
@@ -98,7 +98,12 @@ public class ShuffleBufferManager {
     this.readCapacity = 
conf.getSizeAsBytes(ShuffleServerConf.SERVER_READ_BUFFER_CAPACITY);
     if (this.readCapacity < 0) {
       this.readCapacity =
-          (long) (heapSize * 
conf.getDouble(ShuffleServerConf.SERVER_READ_BUFFER_CAPACITY_RATIO));
+          nettyServerEnabled
+              ? (long)
+                  (NettyUtils.getMaxDirectMemory()
+                      * 
conf.getDouble(ShuffleServerConf.SERVER_READ_BUFFER_CAPACITY_RATIO))
+              : (long)
+                  (heapSize * 
conf.getDouble(ShuffleServerConf.SERVER_READ_BUFFER_CAPACITY_RATIO));
     }
     LOG.info(
         "Init shuffle buffer manager with capacity: {}, read buffer capacity: 
{}.",
diff --git 
a/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java
 
b/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java
index ac8973ecc..2e0c070e9 100644
--- 
a/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java
+++ 
b/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java
@@ -24,6 +24,8 @@ import java.util.Map;
 
 import com.google.common.collect.Lists;
 import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -253,7 +255,7 @@ public class ShuffleServerNettyHandler implements 
BaseMessageHandler {
             .recordTransportTime(GetMemoryShuffleDataRequest.class.getName(), 
transportTime);
       }
     }
-    long start = System.currentTimeMillis();
+    final long start = System.currentTimeMillis();
     StatusCode status = StatusCode.SUCCESS;
     String msg = "OK";
     GetMemoryShuffleDataResponse response;
@@ -262,8 +264,9 @@ public class ShuffleServerNettyHandler implements 
BaseMessageHandler {
 
     // todo: if can get the exact memory size?
     if 
(shuffleServer.getShuffleBufferManager().requireReadMemoryWithRetry(readBufferSize))
 {
+      ShuffleDataResult shuffleDataResult = null;
       try {
-        ShuffleDataResult shuffleDataResult =
+        shuffleDataResult =
             shuffleServer
                 .getShuffleTaskManager()
                 .getInMemoryShuffleData(
@@ -281,19 +284,18 @@ public class ShuffleServerNettyHandler implements 
BaseMessageHandler {
           ShuffleServerMetrics.counterTotalReadDataSize.inc(data.size());
           ShuffleServerMetrics.counterTotalReadMemoryDataSize.inc(data.size());
         }
-        long costTime = System.currentTimeMillis() - start;
-        shuffleServer
-            .getNettyMetrics()
-            .recordProcessTime(GetMemoryShuffleDataRequest.class.getName(), 
costTime);
-        LOG.info(
-            "Successfully getInMemoryShuffleData cost {} ms with {} bytes 
shuffle" + " data for {}",
-            costTime,
-            data.size(),
-            requestInfo);
-
         response =
             new GetMemoryShuffleDataResponse(req.getRequestId(), status, msg, 
bufferSegments, data);
+        ReleaseMemoryAndRecordReadTimeListener listener =
+            new ReleaseMemoryAndRecordReadTimeListener(
+                start, readBufferSize, data.size(), requestInfo, req, client);
+        client.getChannel().writeAndFlush(response).addListener(listener);
+        return;
       } catch (Exception e) {
+        
shuffleServer.getShuffleBufferManager().releaseReadMemory(readBufferSize);
+        if (shuffleDataResult != null) {
+          shuffleDataResult.release();
+        }
         status = StatusCode.INTERNAL_ERROR;
         msg =
             "Error happened when get in memory shuffle data for "
@@ -304,8 +306,6 @@ public class ShuffleServerNettyHandler implements 
BaseMessageHandler {
         response =
             new GetMemoryShuffleDataResponse(
                 req.getRequestId(), status, msg, Lists.newArrayList(), 
Unpooled.EMPTY_BUFFER);
-      } finally {
-        
shuffleServer.getShuffleBufferManager().releaseReadMemory(readBufferSize);
       }
     } else {
       status = StatusCode.INTERNAL_ERROR;
@@ -348,9 +348,10 @@ public class ShuffleServerNettyHandler implements 
BaseMessageHandler {
             .getShuffleServerConf()
             .getLong(ShuffleServerConf.SERVER_SHUFFLE_INDEX_SIZE_HINT);
     if 
(shuffleServer.getShuffleBufferManager().requireReadMemoryWithRetry(assumedFileSize))
 {
+      ShuffleIndexResult shuffleIndexResult = null;
       try {
         final long start = System.currentTimeMillis();
-        ShuffleIndexResult shuffleIndexResult =
+        shuffleIndexResult =
             shuffleServer
                 .getShuffleTaskManager()
                 .getShuffleIndex(appId, shuffleId, partitionId, 
partitionNumPerRange, partitionNum);
@@ -361,13 +362,16 @@ public class ShuffleServerNettyHandler implements 
BaseMessageHandler {
         response =
             new GetLocalShuffleIndexResponse(
                 req.getRequestId(), status, msg, data, 
shuffleIndexResult.getDataFileLen());
-        long readTime = System.currentTimeMillis() - start;
-        LOG.info(
-            "Successfully getShuffleIndex cost {} ms for {}" + " bytes with 
{}",
-            readTime,
-            data.size(),
-            requestInfo);
+        ReleaseMemoryAndRecordReadTimeListener listener =
+            new ReleaseMemoryAndRecordReadTimeListener(
+                start, assumedFileSize, data.size(), requestInfo, req, client);
+        client.getChannel().writeAndFlush(response).addListener(listener);
+        return;
       } catch (FileNotFoundException indexFileNotFoundException) {
+        
shuffleServer.getShuffleBufferManager().releaseReadMemory(assumedFileSize);
+        if (shuffleIndexResult != null) {
+          shuffleIndexResult.release();
+        }
         LOG.warn(
             "Index file for {} is not found, maybe the data has been flushed 
to cold storage.",
             requestInfo,
@@ -376,14 +380,16 @@ public class ShuffleServerNettyHandler implements 
BaseMessageHandler {
             new GetLocalShuffleIndexResponse(
                 req.getRequestId(), status, msg, Unpooled.EMPTY_BUFFER, 0L);
       } catch (Exception e) {
+        
shuffleServer.getShuffleBufferManager().releaseReadMemory(assumedFileSize);
+        if (shuffleIndexResult != null) {
+          shuffleIndexResult.release();
+        }
         status = StatusCode.INTERNAL_ERROR;
         msg = "Error happened when get shuffle index for " + requestInfo + ", 
" + e.getMessage();
         LOG.error(msg, e);
         response =
             new GetLocalShuffleIndexResponse(
                 req.getRequestId(), status, msg, Unpooled.EMPTY_BUFFER, 0L);
-      } finally {
-        
shuffleServer.getShuffleBufferManager().releaseReadMemory(assumedFileSize);
       }
     } else {
       status = StatusCode.INTERNAL_ERROR;
@@ -418,7 +424,6 @@ public class ShuffleServerNettyHandler implements 
BaseMessageHandler {
     StatusCode status = StatusCode.SUCCESS;
     String msg = "OK";
     GetLocalShuffleDataResponse response;
-    ShuffleDataResult sdr;
     String requestInfo =
         "appId["
             + appId
@@ -426,11 +431,9 @@ public class ShuffleServerNettyHandler implements 
BaseMessageHandler {
             + shuffleId
             + "], partitionId["
             + partitionId
-            + "]"
-            + "offset["
+            + "], offset["
             + offset
-            + "]"
-            + "length["
+            + "], length["
             + length
             + "]";
 
@@ -445,8 +448,9 @@ public class ShuffleServerNettyHandler implements 
BaseMessageHandler {
     }
 
     if 
(shuffleServer.getShuffleBufferManager().requireReadMemoryWithRetry(length)) {
+      ShuffleDataResult sdr = null;
       try {
-        long start = System.currentTimeMillis();
+        final long start = System.currentTimeMillis();
         sdr =
             shuffleServer
                 .getShuffleTaskManager()
@@ -459,29 +463,27 @@ public class ShuffleServerNettyHandler implements 
BaseMessageHandler {
                     storageType,
                     offset,
                     length);
-        long readTime = System.currentTimeMillis() - start;
-        ShuffleServerMetrics.counterTotalReadTime.inc(readTime);
         ShuffleServerMetrics.counterTotalReadDataSize.inc(sdr.getDataLength());
         
ShuffleServerMetrics.counterTotalReadLocalDataFileSize.inc(sdr.getDataLength());
-        shuffleServer
-            .getNettyMetrics()
-            .recordProcessTime(GetLocalShuffleDataRequest.class.getName(), 
readTime);
-        LOG.info(
-            "Successfully getShuffleData cost {} ms for shuffle" + " data with 
{}",
-            readTime,
-            requestInfo);
         response =
             new GetLocalShuffleDataResponse(
                 req.getRequestId(), status, msg, sdr.getManagedBuffer());
+        ReleaseMemoryAndRecordReadTimeListener listener =
+            new ReleaseMemoryAndRecordReadTimeListener(
+                start, length, sdr.getDataLength(), requestInfo, req, client);
+        client.getChannel().writeAndFlush(response).addListener(listener);
+        return;
       } catch (Exception e) {
+        shuffleServer.getShuffleBufferManager().releaseReadMemory(length);
+        if (sdr != null) {
+          sdr.release();
+        }
         status = StatusCode.INTERNAL_ERROR;
         msg = "Error happened when get shuffle data for " + requestInfo + ", " 
+ e.getMessage();
         LOG.error(msg, e);
         response =
             new GetLocalShuffleDataResponse(
                 req.getRequestId(), status, msg, new 
NettyManagedBuffer(Unpooled.EMPTY_BUFFER));
-      } finally {
-        shuffleServer.getShuffleBufferManager().releaseReadMemory(length);
       }
     } else {
       status = StatusCode.INTERNAL_ERROR;
@@ -522,4 +524,89 @@ public class ShuffleServerNettyHandler implements 
BaseMessageHandler {
     }
     return ret;
   }
+
+  class ReleaseMemoryAndRecordReadTimeListener implements 
ChannelFutureListener {
+    private final long readStartedTime;
+    private final long readBufferSize;
+    private final long dataSize;
+    private final String requestInfo;
+    private final RequestMessage request;
+    private final TransportClient client;
+
+    ReleaseMemoryAndRecordReadTimeListener(
+        long readStartedTime,
+        long readBufferSize,
+        long dataSize,
+        String requestInfo,
+        RequestMessage request,
+        TransportClient client) {
+      this.readStartedTime = readStartedTime;
+      this.readBufferSize = readBufferSize;
+      this.dataSize = dataSize;
+      this.requestInfo = requestInfo;
+      this.request = request;
+      this.client = client;
+    }
+
+    @Override
+    public void operationComplete(ChannelFuture future) {
+      
shuffleServer.getShuffleBufferManager().releaseReadMemory(readBufferSize);
+      long readTime = System.currentTimeMillis() - readStartedTime;
+      ShuffleServerMetrics.counterTotalReadTime.inc(readTime);
+      
shuffleServer.getNettyMetrics().recordProcessTime(request.getClass().getName(), 
readTime);
+      if (!future.isSuccess()) {
+        Throwable cause = future.cause();
+        String errorMsg =
+            "Error happened when executing "
+                + request.getOperationType()
+                + " for "
+                + requestInfo
+                + ", "
+                + cause.getMessage();
+        LOG.error(errorMsg, future.cause());
+        RpcResponse errorResponse;
+        if (request instanceof GetLocalShuffleDataRequest) {
+          errorResponse =
+              new GetLocalShuffleDataResponse(
+                  request.getRequestId(),
+                  StatusCode.INTERNAL_ERROR,
+                  errorMsg,
+                  new NettyManagedBuffer(Unpooled.EMPTY_BUFFER));
+        } else if (request instanceof GetLocalShuffleIndexRequest) {
+          errorResponse =
+              new GetLocalShuffleIndexResponse(
+                  request.getRequestId(),
+                  StatusCode.INTERNAL_ERROR,
+                  errorMsg,
+                  Unpooled.EMPTY_BUFFER,
+                  0L);
+        } else if (request instanceof GetMemoryShuffleDataRequest) {
+          errorResponse =
+              new GetMemoryShuffleDataResponse(
+                  request.getRequestId(),
+                  StatusCode.INTERNAL_ERROR,
+                  errorMsg,
+                  Lists.newArrayList(),
+                  Unpooled.EMPTY_BUFFER);
+        } else {
+          LOG.error("Cannot handle request {}", request.type());
+          return;
+        }
+        client.getChannel().writeAndFlush(errorResponse);
+        LOG.error(
+            "Failed to execute {} for {}. Took {} ms and could not retrieve {} 
bytes of data",
+            request.getOperationType(),
+            requestInfo,
+            readTime,
+            dataSize);
+      } else {
+        LOG.info(
+            "Successfully executed {} for {}. Took {} ms and retrieved {} 
bytes of data",
+            request.getOperationType(),
+            requestInfo,
+            readTime,
+            dataSize);
+      }
+    }
+  }
 }

Reply via email to