This is an automated email from the ASF dual-hosted git repository.

zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 914251635 [#1472][part-5] Use UnpooledByteBufAllocator to fix 
inaccurate usedMemory issue causing OOM (#1534)
914251635 is described below

commit 914251635e582ec786c4b6579d09907246e178c5
Author: RickyMa <[email protected]>
AuthorDate: Fri Feb 23 18:27:21 2024 +0800

    [#1472][part-5] Use UnpooledByteBufAllocator to fix inaccurate usedMemory 
issue causing OOM (#1534)
    
    ### What changes were proposed in this pull request?
    
    When we use `UnpooledByteBufAllocator` to allocate off-heap `ByteBuf`, 
Netty directly requests off-heap memory from the operating system instead of 
allocating it according to `pageSize` and `chunkSize`. This way, we can obtain 
the exact `ByteBuf` size during the pre-allocation of memory, avoiding 
distortion of metrics such as `usedMemory`.
    
    Moreover, we have restored the changes from PR 
[#1521](https://github.com/apache/incubator-uniffle/pull/1521). By taking the 
`encodedLength` of the request into account in advance, during the 
pre-allocation of memory, we ensure that the Netty server has sufficient 
direct memory while decoding `sendShuffleDataRequest`, thus avoiding OOM at 
decode time.
    
    Since we no longer use `PooledByteBufAllocator`, PR 
[#1524](https://github.com/apache/incubator-uniffle/pull/1524) is no longer 
needed.
    
    ### Why are the changes needed?
    
    A sub PR for: https://github.com/apache/incubator-uniffle/pull/1519
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Existing UTs.
---
 .../uniffle/common/netty/protocol/Decoders.java    |  4 +-
 .../netty/protocol/SendShuffleDataRequest.java     |  4 ++
 .../org/apache/uniffle/common/util/NettyUtils.java | 24 ++++++------
 .../impl/grpc/ShuffleServerGrpcNettyClient.java    | 19 +++++-----
 .../uniffle/server/NettyDirectMemoryTracker.java   | 12 +-----
 .../uniffle/server/ShuffleServerGrpcService.java   |  6 +++
 .../uniffle/server/ShuffleServerMetrics.java       |  4 --
 .../server/buffer/ShuffleBufferManager.java        | 44 ++++++++++++++++++++--
 .../server/netty/ShuffleServerNettyHandler.java    | 33 +++++++++-------
 9 files changed, 97 insertions(+), 53 deletions(-)

diff --git 
a/common/src/main/java/org/apache/uniffle/common/netty/protocol/Decoders.java 
b/common/src/main/java/org/apache/uniffle/common/netty/protocol/Decoders.java
index 54924788c..b8c687ce7 100644
--- 
a/common/src/main/java/org/apache/uniffle/common/netty/protocol/Decoders.java
+++ 
b/common/src/main/java/org/apache/uniffle/common/netty/protocol/Decoders.java
@@ -28,6 +28,7 @@ import org.apache.uniffle.common.BufferSegment;
 import org.apache.uniffle.common.ShuffleBlockInfo;
 import org.apache.uniffle.common.ShuffleServerInfo;
 import org.apache.uniffle.common.util.ByteBufUtils;
+import org.apache.uniffle.common.util.NettyUtils;
 
 public class Decoders {
   public static ShuffleServerInfo decodeShuffleServerInfo(ByteBuf byteBuf) {
@@ -46,7 +47,8 @@ public class Decoders {
     long crc = byteBuf.readLong();
     long taskAttemptId = byteBuf.readLong();
     int dataLength = byteBuf.readInt();
-    ByteBuf data = byteBuf.retain().readSlice(dataLength);
+    ByteBuf data = 
NettyUtils.getNettyBufferAllocator().directBuffer(dataLength);
+    data.writeBytes(byteBuf, dataLength);
     int lengthOfShuffleServers = byteBuf.readInt();
     List<ShuffleServerInfo> serverInfos = Lists.newArrayList();
     for (int k = 0; k < lengthOfShuffleServers; k++) {
diff --git 
a/common/src/main/java/org/apache/uniffle/common/netty/protocol/SendShuffleDataRequest.java
 
b/common/src/main/java/org/apache/uniffle/common/netty/protocol/SendShuffleDataRequest.java
index 317f4e7af..cab47693b 100644
--- 
a/common/src/main/java/org/apache/uniffle/common/netty/protocol/SendShuffleDataRequest.java
+++ 
b/common/src/main/java/org/apache/uniffle/common/netty/protocol/SendShuffleDataRequest.java
@@ -130,6 +130,10 @@ public class SendShuffleDataRequest extends RequestMessage 
{
     return requireId;
   }
 
+  public void setRequireId(long requireId) {
+    this.requireId = requireId;
+  }
+
   public Map<Integer, List<ShuffleBlockInfo>> getPartitionToBlocks() {
     return partitionToBlocks;
   }
diff --git 
a/common/src/main/java/org/apache/uniffle/common/util/NettyUtils.java 
b/common/src/main/java/org/apache/uniffle/common/util/NettyUtils.java
index 5f1c87ced..468d2a7a3 100644
--- a/common/src/main/java/org/apache/uniffle/common/util/NettyUtils.java
+++ b/common/src/main/java/org/apache/uniffle/common/util/NettyUtils.java
@@ -19,8 +19,10 @@ package org.apache.uniffle.common.util;
 
 import java.util.concurrent.ThreadFactory;
 
+import io.netty.buffer.AbstractByteBufAllocator;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.buffer.UnpooledByteBufAllocator;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelHandlerContext;
@@ -39,6 +41,8 @@ import org.apache.uniffle.common.netty.protocol.Message;
 public class NettyUtils {
   private static final Logger logger = 
LoggerFactory.getLogger(NettyUtils.class);
 
+  private static final long MAX_DIRECT_MEMORY_IN_BYTES = 
PlatformDependent.maxDirectMemory();
+
   /** Creates a Netty EventLoopGroup based on the IOMode. */
   public static EventLoopGroup createEventLoop(IOMode mode, int numThreads, 
String threadPrefix) {
     ThreadFactory threadFactory = 
ThreadUtils.getNettyThreadFactory(threadPrefix);
@@ -114,22 +118,18 @@ public class NettyUtils {
   }
 
   private static class AllocatorHolder {
-    private static final PooledByteBufAllocator INSTANCE = createAllocator();
+    private static final AbstractByteBufAllocator INSTANCE = 
createUnpooledByteBufAllocator(true);
   }
 
-  public static PooledByteBufAllocator getNettyBufferAllocator() {
+  public static AbstractByteBufAllocator getNettyBufferAllocator() {
     return AllocatorHolder.INSTANCE;
   }
 
-  private static PooledByteBufAllocator createAllocator() {
-    return new PooledByteBufAllocator(
-        true,
-        PooledByteBufAllocator.defaultNumHeapArena(),
-        PooledByteBufAllocator.defaultNumDirectArena(),
-        PooledByteBufAllocator.defaultPageSize(),
-        PooledByteBufAllocator.defaultMaxOrder(),
-        0,
-        0,
-        PooledByteBufAllocator.defaultUseCacheForAllThreads());
+  public static UnpooledByteBufAllocator 
createUnpooledByteBufAllocator(boolean preferDirect) {
+    return new UnpooledByteBufAllocator(preferDirect);
+  }
+
+  public static long getMaxDirectMemory() {
+    return MAX_DIRECT_MEMORY_IN_BYTES;
   }
 }
diff --git 
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcNettyClient.java
 
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcNettyClient.java
index a8ee15490..7949ca925 100644
--- 
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcNettyClient.java
+++ 
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcNettyClient.java
@@ -103,7 +103,15 @@ public class ShuffleServerGrpcNettyClient extends 
ShuffleServerGrpcClient {
         }
       }
 
-      int allocateSize = size;
+      SendShuffleDataRequest sendShuffleDataRequest =
+          new SendShuffleDataRequest(
+              requestId(),
+              request.getAppId(),
+              shuffleId,
+              0L,
+              stb.getValue(),
+              System.currentTimeMillis());
+      int allocateSize = size + sendShuffleDataRequest.encodedLength();
       int finalBlockNum = blockNum;
       try {
         RetryUtils.retryWithCondition(
@@ -122,14 +130,7 @@ public class ShuffleServerGrpcNettyClient extends 
ShuffleServerGrpcClient {
                         allocateSize, host, port));
               }
 
-              SendShuffleDataRequest sendShuffleDataRequest =
-                  new SendShuffleDataRequest(
-                      requestId(),
-                      request.getAppId(),
-                      shuffleId,
-                      requireId,
-                      stb.getValue(),
-                      System.currentTimeMillis());
+              sendShuffleDataRequest.setRequireId(requireId);
               long start = System.currentTimeMillis();
               RpcResponse rpcResponse =
                   transportClient.sendRpcSync(sendShuffleDataRequest, 
rpcTimeout);
diff --git 
a/server/src/main/java/org/apache/uniffle/server/NettyDirectMemoryTracker.java 
b/server/src/main/java/org/apache/uniffle/server/NettyDirectMemoryTracker.java
index c3a31fd52..96206cc65 100644
--- 
a/server/src/main/java/org/apache/uniffle/server/NettyDirectMemoryTracker.java
+++ 
b/server/src/main/java/org/apache/uniffle/server/NettyDirectMemoryTracker.java
@@ -25,7 +25,6 @@ import io.netty.util.internal.PlatformDependent;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.uniffle.common.util.NettyUtils;
 import org.apache.uniffle.common.util.ThreadUtils;
 
 public class NettyDirectMemoryTracker {
@@ -55,19 +54,10 @@ public class NettyDirectMemoryTracker {
         () -> {
           try {
             long usedDirectMemory = PlatformDependent.usedDirectMemory();
-            long allocatedDirectMemory =
-                
NettyUtils.getNettyBufferAllocator().metric().usedDirectMemory();
-            long pinnedDirectMemory = 
NettyUtils.getNettyBufferAllocator().pinnedDirectMemory();
             if (LOG.isDebugEnabled()) {
-              LOG.debug(
-                  "Current usedDirectMemory:{}, allocatedDirectMemory:{}, 
pinnedDirectMemory:{}",
-                  usedDirectMemory,
-                  allocatedDirectMemory,
-                  pinnedDirectMemory);
+              LOG.debug("Current usedDirectMemory:{}", usedDirectMemory);
             }
             
ShuffleServerMetrics.gaugeUsedDirectMemorySize.set(usedDirectMemory);
-            
ShuffleServerMetrics.gaugeAllocatedDirectMemorySize.set(allocatedDirectMemory);
-            
ShuffleServerMetrics.gaugePinnedDirectMemorySize.set(pinnedDirectMemory);
           } catch (Throwable t) {
             LOG.error("Failed to report direct memory.", t);
           }
diff --git 
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java 
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
index ac9b95c34..ce7d2d62f 100644
--- 
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
+++ 
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
@@ -256,6 +256,7 @@ public class ShuffleServerGrpcService extends 
ShuffleServerImplBase {
       final long start = System.currentTimeMillis();
       List<ShufflePartitionedData> shufflePartitionedData = 
toPartitionedData(req);
       long alreadyReleasedSize = 0;
+      boolean hasFailureOccurred = false;
       for (ShufflePartitionedData spd : shufflePartitionedData) {
         String shuffleDataInfo =
             "appId["
@@ -275,6 +276,7 @@ public class ShuffleServerGrpcService extends 
ShuffleServerImplBase {
                     + ret;
             LOG.error(errorMsg);
             responseMessage = errorMsg;
+            hasFailureOccurred = true;
             break;
           } else {
             long toReleasedSize = spd.getTotalBlockSize();
@@ -293,9 +295,13 @@ public class ShuffleServerGrpcService extends 
ShuffleServerImplBase {
           ret = StatusCode.INTERNAL_ERROR;
           responseMessage = errorMsg;
           LOG.error(errorMsg);
+          hasFailureOccurred = true;
           break;
         }
       }
+      if (hasFailureOccurred) {
+        
shuffleServer.getShuffleBufferManager().releaseMemory(info.getRequireSize(), 
false, false);
+      }
       // since the required buffer id is only used once, the shuffle client 
would try to require
       // another buffer whether
       // current connection succeeded or not. Therefore, the 
preAllocatedBuffer is first get and
diff --git 
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java 
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java
index 649e5044b..274cde008 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java
@@ -186,8 +186,6 @@ public class ShuffleServerMetrics {
   public static Gauge.Child gaugeUsedBufferSize;
   public static Gauge.Child gaugeReadBufferUsedSize;
   public static Gauge.Child gaugeUsedDirectMemorySize;
-  public static Gauge.Child gaugeAllocatedDirectMemorySize;
-  public static Gauge.Child gaugePinnedDirectMemorySize;
   public static Gauge.Child gaugeWriteHandler;
   public static Gauge.Child gaugeEventQueueSize;
   public static Gauge.Child gaugeHadoopFlushThreadPoolQueueSize;
@@ -384,8 +382,6 @@ public class ShuffleServerMetrics {
     gaugeUsedBufferSize = metricsManager.addLabeledGauge(USED_BUFFER_SIZE);
     gaugeReadBufferUsedSize = 
metricsManager.addLabeledGauge(READ_USED_BUFFER_SIZE);
     gaugeUsedDirectMemorySize = 
metricsManager.addLabeledGauge(USED_DIRECT_MEMORY_SIZE);
-    gaugeAllocatedDirectMemorySize = 
metricsManager.addLabeledGauge(ALLOCATED_DIRECT_MEMORY_SIZE);
-    gaugePinnedDirectMemorySize = 
metricsManager.addLabeledGauge(PINNED_DIRECT_MEMORY_SIZE);
     gaugeWriteHandler = metricsManager.addLabeledGauge(TOTAL_WRITE_HANDLER);
     gaugeEventQueueSize = metricsManager.addLabeledGauge(EVENT_QUEUE_SIZE);
     gaugeHadoopFlushThreadPoolQueueSize =
diff --git 
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
 
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
index 9f23f0fda..863670636 100644
--- 
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
+++ 
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
@@ -32,15 +32,18 @@ import com.google.common.collect.Range;
 import com.google.common.collect.RangeMap;
 import com.google.common.collect.Sets;
 import com.google.common.collect.TreeRangeMap;
+import io.netty.util.internal.PlatformDependent;
 import org.roaringbitmap.longlong.Roaring64NavigableMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.uniffle.common.ShuffleDataResult;
 import org.apache.uniffle.common.ShufflePartitionedData;
+import org.apache.uniffle.common.rpc.ServerType;
 import org.apache.uniffle.common.rpc.StatusCode;
 import org.apache.uniffle.common.util.Constants;
 import org.apache.uniffle.common.util.JavaUtils;
+import org.apache.uniffle.common.util.NettyUtils;
 import org.apache.uniffle.common.util.RssUtils;
 import org.apache.uniffle.server.ShuffleDataFlushEvent;
 import org.apache.uniffle.server.ShuffleFlushManager;
@@ -68,6 +71,7 @@ public class ShuffleBufferManager {
   // Huge partition vars
   private long hugePartitionSizeThreshold;
   private long hugePartitionMemoryLimitSize;
+  private boolean nettyServerEnabled;
 
   protected long bufferSize = 0;
   protected AtomicLong preAllocatedSize = new AtomicLong(0L);
@@ -80,11 +84,16 @@ public class ShuffleBufferManager {
   protected Map<String, Map<Integer, AtomicLong>> shuffleSizeMap = 
JavaUtils.newConcurrentMap();
 
   public ShuffleBufferManager(ShuffleServerConf conf, ShuffleFlushManager 
shuffleFlushManager) {
+    this.nettyServerEnabled = conf.get(ShuffleServerConf.RPC_SERVER_TYPE) == 
ServerType.GRPC_NETTY;
     long heapSize = Runtime.getRuntime().maxMemory();
     this.capacity = 
conf.getSizeAsBytes(ShuffleServerConf.SERVER_BUFFER_CAPACITY);
     if (this.capacity < 0) {
       this.capacity =
-          (long) (heapSize * 
conf.getDouble(ShuffleServerConf.SERVER_BUFFER_CAPACITY_RATIO));
+          nettyServerEnabled
+              ? (long)
+                  (NettyUtils.getMaxDirectMemory()
+                      * 
conf.getDouble(ShuffleServerConf.SERVER_BUFFER_CAPACITY_RATIO))
+              : (long) (heapSize * 
conf.getDouble(ShuffleServerConf.SERVER_BUFFER_CAPACITY_RATIO));
     }
     this.readCapacity = 
conf.getSizeAsBytes(ShuffleServerConf.SERVER_READ_BUFFER_CAPACITY);
     if (this.readCapacity < 0) {
@@ -321,6 +330,25 @@ public class ShuffleBufferManager {
       if (isPreAllocated) {
         requirePreAllocatedSize(size);
       }
+      if (LOG.isDebugEnabled()) {
+        long usedDirectMemory = PlatformDependent.usedDirectMemory();
+        long usedHeapMemory =
+            Runtime.getRuntime().totalMemory() - 
Runtime.getRuntime().freeMemory();
+        LOG.debug(
+            "Require memory succeeded with "
+                + size
+                + " bytes, usedMemory["
+                + usedMemory.get()
+                + "] include preAllocation["
+                + preAllocatedSize.get()
+                + "], inFlushSize["
+                + inFlushSize.get()
+                + "], usedDirectMemory["
+                + usedDirectMemory
+                + "], usedHeapMemory["
+                + usedHeapMemory
+                + "]");
+      }
       return true;
     }
     if (LOG.isDebugEnabled()) {
@@ -372,7 +400,7 @@ public class ShuffleBufferManager {
               + inFlushSize.get()
               + "] is less than released["
               + size
-              + "], set allocated memory to 0");
+              + "], set in flush memory to 0");
       inFlushSize.set(0L);
     }
     ShuffleServerMetrics.gaugeInFlushBufferSize.set(inFlushSize.get());
@@ -465,7 +493,17 @@ public class ShuffleBufferManager {
   }
 
   public void releasePreAllocatedSize(long delta) {
-    preAllocatedSize.addAndGet(-delta);
+    if (preAllocatedSize.get() >= delta) {
+      preAllocatedSize.addAndGet(-delta);
+    } else {
+      LOG.warn(
+          "Current pre-allocated memory["
+              + preAllocatedSize.get()
+              + "] is less than released["
+              + delta
+              + "], set pre-allocated memory to 0");
+      preAllocatedSize.set(0L);
+    }
     ShuffleServerMetrics.gaugeAllocatedBufferSize.set(preAllocatedSize.get());
   }
 
diff --git 
a/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java
 
b/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java
index 184cde000..ac8973ecc 100644
--- 
a/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java
+++ 
b/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java
@@ -56,6 +56,7 @@ import org.apache.uniffle.server.ShuffleServerConf;
 import org.apache.uniffle.server.ShuffleServerMetrics;
 import org.apache.uniffle.server.ShuffleTaskManager;
 import org.apache.uniffle.server.buffer.PreAllocatedBufferInfo;
+import org.apache.uniffle.server.buffer.ShuffleBufferManager;
 import org.apache.uniffle.storage.common.Storage;
 import org.apache.uniffle.storage.common.StorageReadMetrics;
 import org.apache.uniffle.storage.util.ShuffleStorageUtils;
@@ -114,11 +115,13 @@ public class ShuffleServerNettyHandler implements 
BaseMessageHandler {
       }
     }
     int requireSize = 
shuffleServer.getShuffleTaskManager().getRequireBufferSize(requireBufferId);
+    int requireBlocksSize =
+        requireSize - req.encodedLength() < 0 ? 0 : requireSize - 
req.encodedLength();
 
     StatusCode ret = StatusCode.SUCCESS;
     String responseMessage = "OK";
     if (req.getPartitionToBlocks().size() > 0) {
-      ShuffleServerMetrics.counterTotalReceivedDataSize.inc(requireSize);
+      ShuffleServerMetrics.counterTotalReceivedDataSize.inc(requireBlocksSize);
       ShuffleTaskManager manager = shuffleServer.getShuffleTaskManager();
       PreAllocatedBufferInfo info = 
manager.getAndRemovePreAllocatedBuffer(requireBufferId);
       boolean isPreAllocated = info != null;
@@ -134,18 +137,21 @@ public class ShuffleServerNettyHandler implements 
BaseMessageHandler {
                 + appId
                 + "], shuffleId["
                 + shuffleId
-                + "]";
+                + "], probably because the pre-allocated buffer has expired. "
+                + "Please increase the expiration time using "
+                + ShuffleServerConf.SERVER_PRE_ALLOCATION_EXPIRED.key()
+                + " in ShuffleServer's configuration";
         LOG.warn(errorMsg);
-        responseMessage = errorMsg;
-        rpcResponse =
-            new RpcResponse(req.getRequestId(), StatusCode.INTERNAL_ERROR, 
responseMessage);
+        rpcResponse = new RpcResponse(req.getRequestId(), 
StatusCode.INTERNAL_ERROR, errorMsg);
         client.getChannel().writeAndFlush(rpcResponse);
         return;
       }
       final long start = System.currentTimeMillis();
+      ShuffleBufferManager shuffleBufferManager = 
shuffleServer.getShuffleBufferManager();
+      shuffleBufferManager.releaseMemory(req.encodedLength(), false, true);
       List<ShufflePartitionedData> shufflePartitionedData = 
toPartitionedData(req);
       long alreadyReleasedSize = 0;
-      boolean isFailureOccurs = false;
+      boolean hasFailureOccurred = false;
       for (ShufflePartitionedData spd : shufflePartitionedData) {
         String shuffleDataInfo =
             "appId["
@@ -156,7 +162,7 @@ public class ShuffleServerNettyHandler implements 
BaseMessageHandler {
                 + spd.getPartitionId()
                 + "]";
         try {
-          if (isFailureOccurs) {
+          if (hasFailureOccurred) {
             continue;
           }
           ret = manager.cacheShuffleData(appId, shuffleId, isPreAllocated, 
spd);
@@ -168,7 +174,7 @@ public class ShuffleServerNettyHandler implements 
BaseMessageHandler {
                     + ret;
             LOG.error(errorMsg);
             responseMessage = errorMsg;
-            isFailureOccurs = true;
+            hasFailureOccurred = true;
           } else {
             long toReleasedSize = spd.getTotalBlockSize();
             // after each cacheShuffleData call, the `preAllocatedSize` is 
updated timely.
@@ -186,11 +192,12 @@ public class ShuffleServerNettyHandler implements 
BaseMessageHandler {
           ret = StatusCode.INTERNAL_ERROR;
           responseMessage = errorMsg;
           LOG.error(errorMsg);
-          isFailureOccurs = true;
+          hasFailureOccurred = true;
         } finally {
           // Once the cache failure occurs, we should explicitly release data 
held by byteBuf
-          if (isFailureOccurs) {
+          if (hasFailureOccurred) {
             Arrays.stream(spd.getBlockList()).forEach(block -> 
block.getData().release());
+            shuffleBufferManager.releaseMemory(spd.getTotalBlockSize(), false, 
false);
           }
         }
       }
@@ -199,8 +206,8 @@ public class ShuffleServerNettyHandler implements 
BaseMessageHandler {
       // current connection succeeded or not. Therefore, the 
preAllocatedBuffer is first get and
       // removed, then after
       // cacheShuffleData finishes, the preAllocatedSize should be updated 
accordingly.
-      if (info.getRequireSize() > alreadyReleasedSize) {
-        manager.releasePreAllocatedSize(info.getRequireSize() - 
alreadyReleasedSize);
+      if (requireBlocksSize > alreadyReleasedSize) {
+        manager.releasePreAllocatedSize(requireBlocksSize - 
alreadyReleasedSize);
       }
       rpcResponse = new RpcResponse(req.getRequestId(), ret, responseMessage);
       long costTime = System.currentTimeMillis() - start;
@@ -218,7 +225,7 @@ public class ShuffleServerNettyHandler implements 
BaseMessageHandler {
                 + " ms with "
                 + shufflePartitionedData.size()
                 + " blocks and "
-                + requireSize
+                + requireBlocksSize
                 + " bytes");
       }
     } else {

Reply via email to