This is an automated email from the ASF dual-hosted git repository.

rickyma pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 69e4cde78 [#1888] feat(server): Reject requireBuffer/sendShuffleData 
for an application if one of the partitions exceeds the limit (#1889)
69e4cde78 is described below

commit 69e4cde78b0177376bcb79443642c4986de00ebd
Author: maobaolong <[email protected]>
AuthorDate: Mon Aug 5 16:21:46 2024 +0800

    [#1888] feat(server): Reject requireBuffer/sendShuffleData for an 
application if one of the partitions exceeds the limit (#1889)
    
    ### What changes were proposed in this pull request?
    
    Reject the `requireBuffer` and `sendShuffleData` requests for an 
application if one of the partitions exceeds the limit.
    Introduce a config to limit the maximum partition size; the client will 
receive an exception whose message shows the partition size and the configured 
max partition size.
    
    ### Why are the changes needed?
    
    Fix: #1888
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes. This PR introduces a new config key 
`rss.server.huge-partition.size.hard.limit` with default value Long.MAX_VALUE 
to keep behavior consistent with the previous code.
    
    ### How was this patch tested?
    
    Manually tested in our environment:
    - Configure `rss.server.huge-partition.size.hard.limit` to a small size and 
wait for the expected exception.
---
 .../uniffle/common/ShufflePartitionedData.java     |  34 +++--
 ... => ExceedHugePartitionHardLimitException.java} |   8 +-
 .../common/exception/NoRegisterException.java      |   2 +-
 .../common/exception/NotRetryException.java        |   4 +
 .../org/apache/uniffle/common/rpc/StatusCode.java  |   1 +
 .../uniffle/common/ShufflePartitionedDataTest.java |   6 +-
 docs/server_guide.md                               |   7 +-
 .../apache/uniffle/test/ShuffleServerGrpcTest.java |   8 +-
 .../client/impl/grpc/ShuffleServerGrpcClient.java  |  28 +++-
 .../impl/grpc/ShuffleServerGrpcNettyClient.java    |   7 +-
 proto/src/main/proto/Rss.proto                     |   2 +
 .../apache/uniffle/server/HugePartitionUtils.java  | 151 +++++++++++++++++++++
 .../apache/uniffle/server/ShuffleServerConf.java   |  11 ++
 .../uniffle/server/ShuffleServerGrpcService.java   |  38 +++++-
 .../uniffle/server/ShuffleServerMetrics.java       |   5 +
 .../apache/uniffle/server/ShuffleTaskManager.java  |  47 +++++--
 .../server/buffer/ShuffleBufferManager.java        |  41 +++---
 .../server/netty/ShuffleServerNettyHandler.java    |  21 ++-
 .../uniffle/server/ShuffleTaskManagerTest.java     |  13 +-
 .../server/buffer/ShuffleBufferManagerTest.java    |  17 ++-
 20 files changed, 369 insertions(+), 82 deletions(-)

diff --git 
a/common/src/main/java/org/apache/uniffle/common/ShufflePartitionedData.java 
b/common/src/main/java/org/apache/uniffle/common/ShufflePartitionedData.java
index fb3c9c683..6793eccd0 100644
--- a/common/src/main/java/org/apache/uniffle/common/ShufflePartitionedData.java
+++ b/common/src/main/java/org/apache/uniffle/common/ShufflePartitionedData.java
@@ -19,14 +19,32 @@ package org.apache.uniffle.common;
 
 import java.util.Arrays;
 
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.lang3.tuple.Pair;
+
 public class ShufflePartitionedData {
 
+  private static final ShufflePartitionedBlock[] EMPTY_BLOCK_LIST =
+      new ShufflePartitionedBlock[] {};
   private int partitionId;
-  private ShufflePartitionedBlock[] blockList;
+  private final ShufflePartitionedBlock[] blockList;
+  private final long totalBlockSize;
 
+  public ShufflePartitionedData(int partitionId, Pair<Long, 
ShufflePartitionedBlock[]> pair) {
+    this.partitionId = partitionId;
+    this.blockList = pair.getRight() == null ? EMPTY_BLOCK_LIST : 
pair.getRight();
+    totalBlockSize = pair.getLeft();
+  }
+
+  @VisibleForTesting
   public ShufflePartitionedData(int partitionId, ShufflePartitionedBlock[] 
blockList) {
     this.partitionId = partitionId;
-    this.blockList = blockList;
+    this.blockList = blockList == null ? EMPTY_BLOCK_LIST : blockList;
+    long size = 0L;
+    for (ShufflePartitionedBlock block : this.blockList) {
+      size += block.getSize();
+    }
+    totalBlockSize = size;
   }
 
   @Override
@@ -47,20 +65,10 @@ public class ShufflePartitionedData {
   }
 
   public ShufflePartitionedBlock[] getBlockList() {
-    if (blockList == null) {
-      return new ShufflePartitionedBlock[] {};
-    }
     return blockList;
   }
 
   public long getTotalBlockSize() {
-    if (blockList == null) {
-      return 0L;
-    }
-    long size = 0;
-    for (ShufflePartitionedBlock block : blockList) {
-      size += block.getSize();
-    }
-    return size;
+    return totalBlockSize;
   }
 }
diff --git 
a/common/src/main/java/org/apache/uniffle/common/exception/NoRegisterException.java
 
b/common/src/main/java/org/apache/uniffle/common/exception/ExceedHugePartitionHardLimitException.java
similarity index 76%
copy from 
common/src/main/java/org/apache/uniffle/common/exception/NoRegisterException.java
copy to 
common/src/main/java/org/apache/uniffle/common/exception/ExceedHugePartitionHardLimitException.java
index 930369d35..b8dfcf76a 100644
--- 
a/common/src/main/java/org/apache/uniffle/common/exception/NoRegisterException.java
+++ 
b/common/src/main/java/org/apache/uniffle/common/exception/ExceedHugePartitionHardLimitException.java
@@ -17,16 +17,16 @@
 
 package org.apache.uniffle.common.exception;
 
-public class NoRegisterException extends RssException {
-  public NoRegisterException(String message) {
+public class ExceedHugePartitionHardLimitException extends NotRetryException {
+  public ExceedHugePartitionHardLimitException(String message) {
     super(message);
   }
 
-  public NoRegisterException(Throwable e) {
+  public ExceedHugePartitionHardLimitException(Throwable e) {
     super(e);
   }
 
-  public NoRegisterException(String message, Throwable e) {
+  public ExceedHugePartitionHardLimitException(String message, Throwable e) {
     super(message, e);
   }
 }
diff --git 
a/common/src/main/java/org/apache/uniffle/common/exception/NoRegisterException.java
 
b/common/src/main/java/org/apache/uniffle/common/exception/NoRegisterException.java
index 930369d35..e0182ab0c 100644
--- 
a/common/src/main/java/org/apache/uniffle/common/exception/NoRegisterException.java
+++ 
b/common/src/main/java/org/apache/uniffle/common/exception/NoRegisterException.java
@@ -17,7 +17,7 @@
 
 package org.apache.uniffle.common.exception;
 
-public class NoRegisterException extends RssException {
+public class NoRegisterException extends NotRetryException {
   public NoRegisterException(String message) {
     super(message);
   }
diff --git 
a/common/src/main/java/org/apache/uniffle/common/exception/NotRetryException.java
 
b/common/src/main/java/org/apache/uniffle/common/exception/NotRetryException.java
index 5b230f6d7..c5cef6b02 100644
--- 
a/common/src/main/java/org/apache/uniffle/common/exception/NotRetryException.java
+++ 
b/common/src/main/java/org/apache/uniffle/common/exception/NotRetryException.java
@@ -23,6 +23,10 @@ public class NotRetryException extends RssException {
     super(message);
   }
 
+  public NotRetryException(Throwable e) {
+    super(e);
+  }
+
   public NotRetryException(String message, Throwable e) {
     super(message, e);
   }
diff --git a/common/src/main/java/org/apache/uniffle/common/rpc/StatusCode.java 
b/common/src/main/java/org/apache/uniffle/common/rpc/StatusCode.java
index 4b891440d..6b413c531 100644
--- a/common/src/main/java/org/apache/uniffle/common/rpc/StatusCode.java
+++ b/common/src/main/java/org/apache/uniffle/common/rpc/StatusCode.java
@@ -36,6 +36,7 @@ public enum StatusCode {
   INVALID_REQUEST(9),
   NO_BUFFER_FOR_HUGE_PARTITION(10),
   STAGE_RETRY_IGNORE(11),
+  EXCEED_HUGE_PARTITION_HARD_LIMIT(12),
   APP_NOT_FOUND(13),
   INTERNAL_NOT_RETRY_ERROR(14),
   UNKNOWN(-1);
diff --git 
a/common/src/test/java/org/apache/uniffle/common/ShufflePartitionedDataTest.java
 
b/common/src/test/java/org/apache/uniffle/common/ShufflePartitionedDataTest.java
index a82b80022..bc20b373a 100644
--- 
a/common/src/test/java/org/apache/uniffle/common/ShufflePartitionedDataTest.java
+++ 
b/common/src/test/java/org/apache/uniffle/common/ShufflePartitionedDataTest.java
@@ -48,9 +48,9 @@ public class ShufflePartitionedDataTest {
             + Arrays.toString(data1.getBlockList())
             + "}",
         data1.toString());
-    ShufflePartitionedData data2 = new ShufflePartitionedData(0, null);
-    assertEquals("ShufflePartitionedData{partitionId=0, blockList=null}", 
data2.toString());
+    ShufflePartitionedData data2 = new ShufflePartitionedData(0, 
(ShufflePartitionedBlock[]) null);
+    assertEquals("ShufflePartitionedData{partitionId=0, blockList=[]}", 
data2.toString());
     data2.setPartitionId(1);
-    assertEquals("ShufflePartitionedData{partitionId=1, blockList=null}", 
data2.toString());
+    assertEquals("ShufflePartitionedData{partitionId=1, blockList=[]}", 
data2.toString());
   }
 }
diff --git a/docs/server_guide.md b/docs/server_guide.md
index 3cc061c1e..af02084d6 100644
--- a/docs/server_guide.md
+++ b/docs/server_guide.md
@@ -125,7 +125,7 @@ This document will introduce how to deploy Uniffle shuffle 
servers.
 | rss.server.health.checker.script.execute.timeout | 5000    | Timeout for 
`HealthScriptChecker` execute health script.(ms)                                
                                                                                
                |
 
 ### Huge Partition Optimization
-A huge partition is a common problem for Spark/MR and so on, caused by data 
skew. And it can cause the shuffle server to become unstable. To solve this, we 
introduce some mechanisms to limit the writing of huge partitions to avoid 
affecting regular partitions, more details can be found in 
[ISSUE-378](https://github.com/apache/incubator-uniffle/issues/378). The basic 
rules for limiting large partitions are memory usage limits and flushing 
individual buffers directly to persistent storage.
+A huge partition is a common problem for Spark/MR and so on, caused by data 
skew. And it can cause the shuffle server to become unstable. To solve this, we 
introduce some mechanisms to limit the writing of huge partitions to avoid 
affecting regular partitions, and introduce a hard limit config to reject 
extremely huge partition, more details can be found in 
[ISSUE-378](https://github.com/apache/incubator-uniffle/issues/378). The basic 
rules for limiting large partitions are memory usage  [...]
 
 #### Memory usage limit
 To do this, we introduce the extra configs
@@ -144,6 +144,11 @@ For HADOOP FS, the conf value of 
`rss.server.single.buffer.flush.threshold` shou
 
 Finally, to improve the speed of writing to HDFS for a single partition, the 
value of `rss.server.max.concurrency.of.per-partition.write` and 
`rss.server.flush.hdfs.threadPool.size` could be increased to 50 or 100.
 
+#### Hard limit
+Once the huge partition reaches the hard limit size, which is set by the 
configuration `rss.server.huge-partition.size.hard.limit`, the server will 
reject the `sendShuffleData` request and the client will not retry. This allows 
the client to fail fast and enables the user to modify their SQLs or jobs to 
avoid reaching the partition hard limit.
+
+For example, if the hard limit is set to 50g, the server will reject the 
request if the partition size is greater than 50g, causing the job to 
eventually fail.
+
 ### Netty
 In version 0.8.0, we introduced Netty. Enabling Netty on ShuffleServer can 
significantly reduce GC time in high-throughput scenarios. We can enable Netty 
through the parameters `rss.server.netty.port` and `rss.rpc.server.type`. Note: 
After setting the parameter `rss.rpc.server.type` to `GRPC_NETTY`, 
ShuffleServer will be tagged with `GRPC_NETTY`, that is, the node can only be 
assigned to clients with `spark.rss.client.type=GRPC_NETTY`.
 
diff --git 
a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerGrpcTest.java
 
b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerGrpcTest.java
index df3e29971..1ee10a8aa 100644
--- 
a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerGrpcTest.java
+++ 
b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerGrpcTest.java
@@ -531,7 +531,13 @@ public class ShuffleServerGrpcTest extends 
IntegrationTestBase {
     // trigger NoBufferForHugePartitionException and get FAILED_REQUIRE_ID
     long requireId =
         shuffleServerClient.requirePreAllocation(
-            appId, shuffleId, Lists.newArrayList(partitionId), 
hugePartitionDataLength, 3, 100);
+            appId,
+            shuffleId,
+            Lists.newArrayList(partitionId),
+            Lists.newArrayList(hugePartitionDataLength),
+            hugePartitionDataLength,
+            3,
+            100);
     assertEquals(FAILED_REQUIRE_ID, requireId);
 
     // Add NoBufferForHugePartitionException check
diff --git 
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
 
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
index 988b7a7f0..cc645172c 100644
--- 
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
+++ 
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
@@ -132,7 +132,10 @@ public class ShuffleServerGrpcClient extends GrpcClient 
implements ShuffleServer
   protected static final int BACK_OFF_BASE = 2000;
   static final List<StatusCode> NOT_RETRY_STATUS_CODES =
       Lists.newArrayList(
-          StatusCode.NO_REGISTER, StatusCode.APP_NOT_FOUND, 
StatusCode.INTERNAL_NOT_RETRY_ERROR);
+          StatusCode.NO_REGISTER,
+          StatusCode.APP_NOT_FOUND,
+          StatusCode.INTERNAL_NOT_RETRY_ERROR,
+          StatusCode.EXCEED_HUGE_PARTITION_HARD_LIMIT);
 
   @VisibleForTesting
   public ShuffleServerGrpcClient(String host, int port) {
@@ -262,7 +265,13 @@ public class ShuffleServerGrpcClient extends GrpcClient 
implements ShuffleServer
   public long requirePreAllocation(
       String appId, int requireSize, int retryMax, long retryIntervalMax) 
throws Exception {
     return requirePreAllocation(
-        appId, 0, Collections.emptyList(), requireSize, retryMax, 
retryIntervalMax);
+        appId,
+        0,
+        Collections.emptyList(),
+        Collections.emptyList(),
+        requireSize,
+        retryMax,
+        retryIntervalMax);
   }
 
   @VisibleForTesting
@@ -270,6 +279,7 @@ public class ShuffleServerGrpcClient extends GrpcClient 
implements ShuffleServer
       String appId,
       int shuffleId,
       List<Integer> partitionIds,
+      List<Integer> partitionRequireSizes,
       int requireSize,
       int retryMax,
       long retryIntervalMax) {
@@ -277,6 +287,7 @@ public class ShuffleServerGrpcClient extends GrpcClient 
implements ShuffleServer
         appId,
         shuffleId,
         partitionIds,
+        partitionRequireSizes,
         requireSize,
         retryMax,
         retryIntervalMax,
@@ -287,6 +298,7 @@ public class ShuffleServerGrpcClient extends GrpcClient 
implements ShuffleServer
       String appId,
       int shuffleId,
       List<Integer> partitionIds,
+      List<Integer> partitionRequireSizes,
       int requireSize,
       int retryMax,
       long retryIntervalMax,
@@ -295,6 +307,7 @@ public class ShuffleServerGrpcClient extends GrpcClient 
implements ShuffleServer
         RequireBufferRequest.newBuilder()
             .setShuffleId(shuffleId)
             .addAllPartitionIds(partitionIds)
+            .addAllPartitionRequireSizes(partitionRequireSizes)
             .setAppId(appId)
             .setRequireSize(requireSize)
             .build();
@@ -373,7 +386,9 @@ public class ShuffleServerGrpcClient extends GrpcClient 
implements ShuffleServer
             System.currentTimeMillis() - start);
       }
       result = rpcResponse.getRequireBufferId();
-    } else if (rpcResponse.getStatus() == RssProtos.StatusCode.NO_REGISTER) {
+    } else if (NOT_RETRY_STATUS_CODES.contains(
+        StatusCode.fromCode(rpcResponse.getStatus().getNumber()))) {
+      
failedStatusCodeRef.set(StatusCode.fromCode(rpcResponse.getStatus().getNumber()));
       String msg =
           "Can't require "
               + requireSize
@@ -518,9 +533,11 @@ public class ShuffleServerGrpcClient extends GrpcClient 
implements ShuffleServer
       int blockNum = 0;
       int shuffleId = stb.getKey();
       List<Integer> partitionIds = new ArrayList<>();
+      List<Integer> partitionRequireSizes = new ArrayList<>();
 
       for (Map.Entry<Integer, List<ShuffleBlockInfo>> ptb : 
stb.getValue().entrySet()) {
         List<ShuffleBlock> shuffleBlocks = Lists.newArrayList();
+        int partitionRequireSize = 0;
         for (ShuffleBlockInfo sbi : ptb.getValue()) {
           shuffleBlocks.add(
               ShuffleBlock.newBuilder()
@@ -531,15 +548,17 @@ public class ShuffleServerGrpcClient extends GrpcClient 
implements ShuffleServer
                   .setUncompressLength(sbi.getUncompressLength())
                   
.setData(UnsafeByteOperations.unsafeWrap(sbi.getData().nioBuffer()))
                   .build());
-          size += sbi.getSize();
+          partitionRequireSize += sbi.getSize();
           blockNum++;
         }
+        size += partitionRequireSize;
         shuffleData.add(
             ShuffleData.newBuilder()
                 .setPartitionId(ptb.getKey())
                 .addAllBlock(shuffleBlocks)
                 .build());
         partitionIds.add(ptb.getKey());
+        partitionRequireSizes.add(partitionRequireSize);
       }
 
       final int allocateSize = size;
@@ -552,6 +571,7 @@ public class ShuffleServerGrpcClient extends GrpcClient 
implements ShuffleServer
                       appId,
                       shuffleId,
                       partitionIds,
+                      partitionRequireSizes,
                       allocateSize,
                       request.getRetryMax() / maxRetryAttempts,
                       request.getRetryIntervalMax(),
diff --git 
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcNettyClient.java
 
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcNettyClient.java
index 6da8788d8..5d3303aac 100644
--- 
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcNettyClient.java
+++ 
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcNettyClient.java
@@ -149,12 +149,16 @@ public class ShuffleServerGrpcNettyClient extends 
ShuffleServerGrpcClient {
       int size = 0;
       int blockNum = 0;
       List<Integer> partitionIds = new ArrayList<>();
+      List<Integer> partitionRequireSizes = new ArrayList<>();
       for (Map.Entry<Integer, List<ShuffleBlockInfo>> ptb : 
stb.getValue().entrySet()) {
+        int partitionRequireSize = 0;
         for (ShuffleBlockInfo sbi : ptb.getValue()) {
-          size += sbi.getSize();
+          partitionRequireSize += sbi.getSize();
           blockNum++;
         }
+        size += partitionRequireSize;
         partitionIds.add(ptb.getKey());
+        partitionRequireSizes.add(partitionRequireSize);
       }
 
       SendShuffleDataRequest sendShuffleDataRequest =
@@ -177,6 +181,7 @@ public class ShuffleServerGrpcNettyClient extends 
ShuffleServerGrpcClient {
                       request.getAppId(),
                       shuffleId,
                       partitionIds,
+                      partitionRequireSizes,
                       allocateSize,
                       request.getRetryMax(),
                       request.getRetryIntervalMax(),
diff --git a/proto/src/main/proto/Rss.proto b/proto/src/main/proto/Rss.proto
index 694a95ede..0bf258ad9 100644
--- a/proto/src/main/proto/Rss.proto
+++ b/proto/src/main/proto/Rss.proto
@@ -55,6 +55,7 @@ message RequireBufferRequest {
   string appId = 2;
   int32 shuffleId = 3;
   repeated int32 partitionIds = 4;
+  repeated int32 partitionRequireSizes = 5;
 }
 
 message RequireBufferResponse {
@@ -312,6 +313,7 @@ enum StatusCode {
   INVALID_REQUEST = 9;
   NO_BUFFER_FOR_HUGE_PARTITION = 10;
   STAGE_RETRY_IGNORE = 11;
+  EXCEED_HUGE_PARTITION_HARD_LIMIT = 12;
   APP_NOT_FOUND = 13;
   INTERNAL_NOT_RETRY_ERROR = 14;
   // add more status
diff --git 
a/server/src/main/java/org/apache/uniffle/server/HugePartitionUtils.java 
b/server/src/main/java/org/apache/uniffle/server/HugePartitionUtils.java
new file mode 100644
index 000000000..1db5c349c
--- /dev/null
+++ b/server/src/main/java/org/apache/uniffle/server/HugePartitionUtils.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.server;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import 
org.apache.uniffle.common.exception.ExceedHugePartitionHardLimitException;
+import org.apache.uniffle.server.buffer.ShuffleBufferManager;
+
+/** Huge partition utils. */
+public class HugePartitionUtils {
+  private static final Logger LOG = 
LoggerFactory.getLogger(HugePartitionUtils.class);
+
+  /**
+   * Check if the partition size exceeds the huge partition size threshold 
limit.
+   *
+   * @param shuffleBufferManager the shuffle buffer manager
+   * @param usedPartitionDataSize the used partition data size
+   * @return true if the partition size exceeds the huge partition size 
threshold limit, otherwise
+   *     false
+   */
+  public static boolean isHugePartition(
+      ShuffleBufferManager shuffleBufferManager, long usedPartitionDataSize) {
+    return usedPartitionDataSize > 
shuffleBufferManager.getHugePartitionSizeThreshold();
+  }
+
+  /**
+   * Check if the partition is huge partition.
+   *
+   * @param shuffleTaskManager the shuffle task manager
+   * @param appId the app id
+   * @param shuffleId the shuffle id
+   * @param partitionId the partition id
+   * @return true if the partition is huge partition, otherwise false
+   */
+  public static boolean isHugePartition(
+      ShuffleTaskManager shuffleTaskManager, String appId, int shuffleId, int 
partitionId) {
+    return shuffleTaskManager != null
+        && shuffleTaskManager.getShuffleTaskInfo(appId) != null
+        && 
shuffleTaskManager.getShuffleTaskInfo(appId).isHugePartition(shuffleId, 
partitionId);
+  }
+
+  /**
+   * Check if the partition size exceeds the huge partition size hard limit.
+   *
+   * @param shuffleBufferManager the shuffle buffer manager
+   * @param usedPartitionDataSize the used partition data size
+   * @return true if the partition size exceeds the huge partition size hard 
limit, otherwise false
+   */
+  public static boolean hasPartitionExceededHugeHardLimit(
+      ShuffleBufferManager shuffleBufferManager, long usedPartitionDataSize) {
+    return usedPartitionDataSize > 
shuffleBufferManager.getHugePartitionSizeHardLimit();
+  }
+
+  /**
+   * Mark the partition as huge partition if the partition size exceeds the 
huge partition threshold
+   * size.
+   *
+   * @param shuffleBufferManager the shuffle buffer manager
+   * @param shuffleTaskInfo the shuffle task info
+   * @param shuffleId the shuffle id
+   * @param partitionId the partition id
+   * @param partitionSize the partition size
+   */
+  public static void markHugePartition(
+      ShuffleBufferManager shuffleBufferManager,
+      ShuffleTaskInfo shuffleTaskInfo,
+      int shuffleId,
+      int partitionId,
+      long partitionSize) {
+    if (isHugePartition(shuffleBufferManager, partitionSize)) {
+      shuffleTaskInfo.markHugePartition(shuffleId, partitionId);
+    }
+  }
+
+  /**
+   * Check if the partition size exceeds the huge hard limit size.
+   *
+   * @param operation the operation name
+   * @param shuffleBufferManager the shuffle buffer manager
+   * @param partitionSize the partition size
+   * @param increaseSize the increase size
+   */
+  public static void checkExceedPartitionHardLimit(
+      String operation,
+      ShuffleBufferManager shuffleBufferManager,
+      long partitionSize,
+      long increaseSize) {
+    if 
(HugePartitionUtils.hasPartitionExceededHugeHardLimit(shuffleBufferManager, 
partitionSize)) {
+      throw new ExceedHugePartitionHardLimitException(
+          operation
+              + ": Current partition size: "
+              + partitionSize
+              + " exceeded the huge hard limit size: "
+              + shuffleBufferManager.getHugePartitionSizeHardLimit()
+              + " if cache this shuffle data with size: "
+              + increaseSize);
+    }
+  }
+
+  /**
+   * Check if the partition size exceeds the huge memory limit, if so, trigger 
memory limitation.
+   *
+   * @param shuffleBufferManager the shuffle buffer manager
+   * @param appId the app id
+   * @param shuffleId the shuffle id
+   * @param partitionId the partition id
+   * @param usedPartitionDataSize the used partition data size
+   */
+  public static boolean limitHugePartition(
+      ShuffleBufferManager shuffleBufferManager,
+      String appId,
+      int shuffleId,
+      int partitionId,
+      long usedPartitionDataSize) {
+    if (usedPartitionDataSize > 
shuffleBufferManager.getHugePartitionSizeThreshold()) {
+      long memoryUsed =
+          shuffleBufferManager
+              .getShuffleBufferEntry(appId, shuffleId, partitionId)
+              .getValue()
+              .getSize();
+      if (memoryUsed > shuffleBufferManager.getHugePartitionMemoryLimitSize()) 
{
+        LOG.warn(
+            "AppId: {}, shuffleId: {}, partitionId: {}, memory used: {}, "
+                + "huge partition triggered memory limitation.",
+            appId,
+            shuffleId,
+            partitionId,
+            memoryUsed);
+        return true;
+      }
+    }
+    return false;
+  }
+}
diff --git 
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java 
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
index 92e5a46fa..a60113202 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
@@ -499,6 +499,17 @@ public class ShuffleServerConf extends RssBaseConf {
                   + HUGE_PARTITION_SIZE_THRESHOLD.key()
                   + "'");
 
+  public static final ConfigOption<Long> HUGE_PARTITION_SIZE_HARD_LIMIT =
+      ConfigOptions.key("rss.server.huge-partition.size.hard.limit")
+          .longType()
+          .defaultValue(Long.MAX_VALUE)
+          .withDescription(
+              "This option sets the maximum allowable partition size 
threshold. "
+                  + "If the partition size exceeds this threshold, the client 
will "
+                  + "receive an error message and the transmission of shuffle 
data "
+                  + "will be terminated. This helps to significantly improve 
the "
+                  + "stability of the cluster by preventing partitions from 
becoming too large.");
+
   public static final ConfigOption<Long> SERVER_DECOMMISSION_CHECK_INTERVAL =
       ConfigOptions.key("rss.server.decommission.check.interval")
           .longType()
diff --git 
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java 
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
index dada1765c..22e7aa01d 100644
--- 
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
+++ 
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
@@ -33,6 +33,7 @@ import io.grpc.Status;
 import io.grpc.stub.StreamObserver;
 import io.netty.buffer.ByteBuf;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.Pair;
 import org.roaringbitmap.longlong.Roaring64NavigableMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -47,6 +48,7 @@ import org.apache.uniffle.common.ShufflePartitionedBlock;
 import org.apache.uniffle.common.ShufflePartitionedData;
 import org.apache.uniffle.common.audit.AuditContext;
 import org.apache.uniffle.common.config.RssBaseConf;
+import 
org.apache.uniffle.common.exception.ExceedHugePartitionHardLimitException;
 import org.apache.uniffle.common.exception.FileNotFoundException;
 import org.apache.uniffle.common.exception.NoBufferException;
 import org.apache.uniffle.common.exception.NoBufferForHugePartitionException;
@@ -429,6 +431,17 @@ public class ShuffleServerGrpcService extends 
ShuffleServerImplBase {
               manager.updateCachedBlockIds(
                   appId, shuffleId, spd.getPartitionId(), spd.getBlockList());
             }
+          } catch (ExceedHugePartitionHardLimitException e) {
+            String errorMsg =
+                "ExceedHugePartitionHardLimitException Error happened when 
shuffleEngine.write for "
+                    + shuffleDataInfo
+                    + ": "
+                    + e.getMessage();
+            
ShuffleServerMetrics.counterTotalHugePartitionExceedHardLimitNum.inc();
+            ret = StatusCode.EXCEED_HUGE_PARTITION_HARD_LIMIT;
+            responseMessage = errorMsg;
+            LOG.error(errorMsg);
+            hasFailureOccurred = true;
           } catch (Exception e) {
             String errorMsg =
                 "Error happened when shuffleEngine.write for "
@@ -621,6 +634,8 @@ public class ShuffleServerGrpcService extends 
ShuffleServerImplBase {
         return;
       }
       long requireBufferId = -1;
+      String responseMessage = "";
+      String shuffleDataInfo = "appId[" + appId + "], shuffleId[" + 
request.getShuffleId() + "]";
       try {
         if (StringUtils.isEmpty(appId)) {
           // To be compatible with older client version
@@ -634,25 +649,40 @@ public class ShuffleServerGrpcService extends 
ShuffleServerImplBase {
                       appId,
                       request.getShuffleId(),
                       request.getPartitionIdsList(),
+                      request.getPartitionRequireSizesList(),
                       request.getRequireSize());
         }
       } catch (NoBufferException e) {
+        responseMessage = e.getMessage();
         status = StatusCode.NO_BUFFER;
         
ShuffleServerMetrics.counterTotalRequireBufferFailedForRegularPartition.inc();
         ShuffleServerMetrics.counterTotalRequireBufferFailed.inc();
       } catch (NoBufferForHugePartitionException e) {
+        responseMessage = e.getMessage();
         status = StatusCode.NO_BUFFER_FOR_HUGE_PARTITION;
         
ShuffleServerMetrics.counterTotalRequireBufferFailedForHugePartition.inc();
         ShuffleServerMetrics.counterTotalRequireBufferFailed.inc();
       } catch (NoRegisterException e) {
+        responseMessage = e.getMessage();
         status = StatusCode.NO_REGISTER;
         ShuffleServerMetrics.counterTotalRequireBufferFailed.inc();
+      } catch (ExceedHugePartitionHardLimitException e) {
+        status = StatusCode.EXCEED_HUGE_PARTITION_HARD_LIMIT;
+        ShuffleServerMetrics.counterTotalHugePartitionExceedHardLimitNum.inc();
+        ShuffleServerMetrics.counterTotalRequireBufferFailed.inc();
+        responseMessage =
+            "ExceedHugePartitionHardLimitException Error happened when 
requireBuffer for "
+                + shuffleDataInfo
+                + ": "
+                + e.getMessage();
+        LOG.error(responseMessage);
       }
       auditContext.setStatusCode(status);
       RequireBufferResponse response =
           RequireBufferResponse.newBuilder()
               .setStatus(status.toProto())
               .setRequireBufferId(requireBufferId)
+              .setRetMsg(responseMessage)
               .build();
       responseObserver.onNext(response);
       responseObserver.onCompleted();
@@ -1329,11 +1359,12 @@ public class ShuffleServerGrpcService extends 
ShuffleServerImplBase {
     return ret;
   }
 
-  private ShufflePartitionedBlock[] toPartitionedBlock(List<ShuffleBlock> 
blocks) {
+  private Pair<Long, ShufflePartitionedBlock[]> 
toPartitionedBlock(List<ShuffleBlock> blocks) {
     if (blocks == null || blocks.size() == 0) {
-      return new ShufflePartitionedBlock[] {};
+      return Pair.of(0L, new ShufflePartitionedBlock[] {});
     }
     ShufflePartitionedBlock[] ret = new ShufflePartitionedBlock[blocks.size()];
+    long size = 0L;
     int i = 0;
     for (ShuffleBlock block : blocks) {
       ByteBuf data = ByteBufUtils.byteStringToByteBuf(block.getData());
@@ -1345,9 +1376,10 @@ public class ShuffleServerGrpcService extends 
ShuffleServerImplBase {
               block.getBlockId(),
               block.getTaskAttemptId(),
               data);
+      size += ret[i].getSize();
       i++;
     }
-    return ret;
+    return Pair.of(size, ret);
   }
 
   private Map<Integer, long[]> toPartitionBlocksMap(List<PartitionToBlockIds> 
partitionToBlockIds) {
diff --git 
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java 
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java
index 309cee553..bac820520 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java
@@ -130,6 +130,8 @@ public class ShuffleServerMetrics {
       "total_app_with_huge_partition_num";
   private static final String TOTAL_PARTITION_NUM = "total_partition_num";
   private static final String TOTAL_HUGE_PARTITION_NUM = 
"total_huge_partition_num";
+  private static final String TOTAL_HUGE_PARTITION_EXCEED_HARD_LIMIT_NUM =
+      "total_huge_partition_exceed_hard_limit_num";
 
   private static final String HUGE_PARTITION_NUM = "huge_partition_num";
   private static final String APP_WITH_HUGE_PARTITION_NUM = 
"app_with_huge_partition_num";
@@ -157,6 +159,7 @@ public class ShuffleServerMetrics {
   public static Counter.Child counterTotalAppWithHugePartitionNum;
   public static Counter.Child counterTotalPartitionNum;
   public static Counter.Child counterTotalHugePartitionNum;
+  public static Counter.Child counterTotalHugePartitionExceedHardLimitNum;
 
   public static Counter.Child counterTotalReceivedDataSize;
   public static Counter.Child counterTotalWriteDataSize;
@@ -421,6 +424,8 @@ public class ShuffleServerMetrics {
         metricsManager.addLabeledCounter(TOTAL_APP_WITH_HUGE_PARTITION_NUM);
     counterTotalPartitionNum = 
metricsManager.addLabeledCounter(TOTAL_PARTITION_NUM);
     counterTotalHugePartitionNum = 
metricsManager.addLabeledCounter(TOTAL_HUGE_PARTITION_NUM);
+    counterTotalHugePartitionExceedHardLimitNum =
+        
metricsManager.addLabeledCounter(TOTAL_HUGE_PARTITION_EXCEED_HARD_LIMIT_NUM);
 
     gaugeLocalStorageIsWritable =
         metricsManager.addGauge(LOCAL_STORAGE_IS_WRITABLE, 
LOCAL_DISK_PATH_LABEL);
diff --git 
a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java 
b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
index f90ec7c8a..8dc1653ed 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
@@ -316,6 +316,14 @@ public class ShuffleTaskManager {
   public StatusCode cacheShuffleData(
       String appId, int shuffleId, boolean isPreAllocated, 
ShufflePartitionedData spd) {
     refreshAppId(appId);
+    long partitionSize = getPartitionDataSize(appId, shuffleId, 
spd.getPartitionId());
+    long deltaSize = spd.getTotalBlockSize();
+    partitionSize += deltaSize;
+    // We do not need to check the huge partition (soft) size threshold here once old
+    // clients have upgraded to this version,
+    // since the huge partition size is already limited when requireBuffer is called.
+    HugePartitionUtils.checkExceedPartitionHardLimit(
+        "cacheShuffleData", shuffleBufferManager, partitionSize, deltaSize);
     return shuffleBufferManager.cacheShuffleData(appId, shuffleId, 
isPreAllocated, spd);
   }
 
@@ -487,9 +495,8 @@ public class ShuffleTaskManager {
       }
     }
     long partitionSize = shuffleTaskInfo.addPartitionDataSize(shuffleId, 
partitionId, size);
-    if (shuffleBufferManager.isHugePartition(partitionSize)) {
-      shuffleTaskInfo.markHugePartition(shuffleId, partitionId);
-    }
+    HugePartitionUtils.markHugePartition(
+        shuffleBufferManager, shuffleTaskInfo, shuffleId, partitionId, 
partitionSize);
   }
 
   public Roaring64NavigableMap getCachedBlockIds(String appId, int shuffleId) {
@@ -517,22 +524,34 @@ public class ShuffleTaskManager {
   }
 
   public long requireBuffer(
-      String appId, int shuffleId, List<Integer> partitionIds, int 
requireSize) {
+      String appId,
+      int shuffleId,
+      List<Integer> partitionIds,
+      List<Integer> partitionRequireSizes,
+      int requireSize) {
     ShuffleTaskInfo shuffleTaskInfo = shuffleTaskInfos.get(appId);
     if (null == shuffleTaskInfo) {
       LOG.error("No such app is registered. appId: {}, shuffleId: {}", appId, 
shuffleId);
       throw new NoRegisterException("No such app is registered. appId: " + 
appId);
     }
-    for (int partitionId : partitionIds) {
-      long partitionUsedDataSize = getPartitionDataSize(appId, shuffleId, 
partitionId);
-      if (shuffleBufferManager.limitHugePartition(
-          appId, shuffleId, partitionId, partitionUsedDataSize)) {
-        String errorMessage =
-            String.format(
-                "Huge partition is limited to writing. appId: %s, shuffleId: 
%s, partitionIds: %s, partitionUsedDataSize: %s",
-                appId, shuffleId, partitionIds, partitionUsedDataSize);
-        LOG.error(errorMessage);
-        throw new NoBufferForHugePartitionException(errorMessage);
+    // To be compatible with legacy clients that send an empty
+    // partitionRequireSizes list
+    if (partitionIds.size() == partitionRequireSizes.size()) {
+      for (int i = 0; i < partitionIds.size(); i++) {
+        int partitionId = partitionIds.get(i);
+        int partitionRequireSize = partitionRequireSizes.get(i);
+        long partitionUsedDataSize =
+            getPartitionDataSize(appId, shuffleId, partitionId) + 
partitionRequireSize;
+        if (HugePartitionUtils.limitHugePartition(
+            shuffleBufferManager, appId, shuffleId, partitionId, 
partitionUsedDataSize)) {
+          String errorMessage =
+              String.format(
+                  "Huge partition is limited to writing. appId: %s, shuffleId: 
%s, partitionIds: %s, partitionUsedDataSize: %s",
+                  appId, shuffleId, partitionIds, partitionUsedDataSize);
+          LOG.error(errorMessage);
+          throw new NoBufferForHugePartitionException(errorMessage);
+        }
+        HugePartitionUtils.checkExceedPartitionHardLimit(
+            "requireBuffer", shuffleBufferManager, partitionUsedDataSize, 
partitionRequireSize);
       }
     }
     return requireBuffer(appId, requireSize);
diff --git 
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
 
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
index 60c37d053..1b432abee 100644
--- 
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
+++ 
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
@@ -47,6 +47,7 @@ import org.apache.uniffle.common.util.Constants;
 import org.apache.uniffle.common.util.JavaUtils;
 import org.apache.uniffle.common.util.NettyUtils;
 import org.apache.uniffle.common.util.RssUtils;
+import org.apache.uniffle.server.HugePartitionUtils;
 import org.apache.uniffle.server.ShuffleDataFlushEvent;
 import org.apache.uniffle.server.ShuffleFlushManager;
 import org.apache.uniffle.server.ShuffleServerConf;
@@ -73,6 +74,7 @@ public class ShuffleBufferManager {
   private long shuffleFlushThreshold;
   // Huge partition vars
   private ReconfigurableConfManager.Reconfigurable<Long> 
hugePartitionSizeThresholdRef;
+  private ReconfigurableConfManager.Reconfigurable<Long> 
hugePartitionSizeHardLimitRef;
   private long hugePartitionMemoryLimitSize;
   protected AtomicLong preAllocatedSize = new AtomicLong(0L);
   protected AtomicLong inFlushSize = new AtomicLong(0L);
@@ -131,6 +133,8 @@ public class ShuffleBufferManager {
         conf.getSizeAsBytes(ShuffleServerConf.SERVER_SHUFFLE_FLUSH_THRESHOLD);
     this.hugePartitionSizeThresholdRef =
         
conf.getReconfigurableConf(ShuffleServerConf.HUGE_PARTITION_SIZE_THRESHOLD);
+    this.hugePartitionSizeHardLimitRef =
+        
conf.getReconfigurableConf(ShuffleServerConf.HUGE_PARTITION_SIZE_HARD_LIMIT);
     this.hugePartitionMemoryLimitSize =
         Math.round(
             capacity * 
conf.get(ShuffleServerConf.HUGE_PARTITION_MEMORY_USAGE_LIMITATION_RATIO));
@@ -271,7 +275,8 @@ public class ShuffleBufferManager {
       int partitionId,
       int startPartition,
       int endPartition) {
-    boolean isHugePartition = isHugePartition(appId, shuffleId, partitionId);
+    boolean isHugePartition =
+        HugePartitionUtils.isHugePartition(shuffleTaskManager, appId, 
shuffleId, partitionId);
     // When we use multi storage and trigger single buffer flush, the buffer 
size should be bigger
     // than rss.server.flush.cold.storage.threshold.size, otherwise cold 
storage will be useless.
     if ((isHugePartition || this.bufferFlushEnabled)
@@ -316,7 +321,8 @@ public class ShuffleBufferManager {
           shuffleId,
           range.lowerEndpoint(),
           range.upperEndpoint(),
-          isHugePartition(appId, shuffleId, range.lowerEndpoint()));
+          HugePartitionUtils.isHugePartition(
+              shuffleTaskManager, appId, shuffleId, range.lowerEndpoint()));
     }
   }
 
@@ -531,7 +537,8 @@ public class ShuffleBufferManager {
                   shuffleId,
                   range.lowerEndpoint(),
                   range.upperEndpoint(),
-                  isHugePartition(appId, shuffleId, range.lowerEndpoint()));
+                  HugePartitionUtils.isHugePartition(
+                      shuffleTaskManager, appId, shuffleId, 
range.lowerEndpoint()));
               if (pickedFlushSize > expectedFlushSize) {
                 LOG.info("Already picked enough buffers to flush {} bytes", 
pickedFlushSize);
                 return;
@@ -726,32 +733,16 @@ public class ShuffleBufferManager {
     }
   }
 
-  boolean isHugePartition(String appId, int shuffleId, int partitionId) {
-    return shuffleTaskManager != null
-        && shuffleTaskManager.getShuffleTaskInfo(appId) != null
-        && 
shuffleTaskManager.getShuffleTaskInfo(appId).isHugePartition(shuffleId, 
partitionId);
+  public long getHugePartitionSizeHardLimit() {
+    return hugePartitionSizeHardLimitRef.getSizeAsBytes();
   }
 
-  public boolean isHugePartition(long usedPartitionDataSize) {
-    return usedPartitionDataSize > 
hugePartitionSizeThresholdRef.getSizeAsBytes();
+  public long getHugePartitionSizeThreshold() {
+    return hugePartitionSizeThresholdRef.getSizeAsBytes();
   }
 
-  public boolean limitHugePartition(
-      String appId, int shuffleId, int partitionId, long 
usedPartitionDataSize) {
-    if (usedPartitionDataSize > 
hugePartitionSizeThresholdRef.getSizeAsBytes()) {
-      long memoryUsed = getShuffleBufferEntry(appId, shuffleId, 
partitionId).getValue().getSize();
-      if (memoryUsed > hugePartitionMemoryLimitSize) {
-        LOG.warn(
-            "AppId: {}, shuffleId: {}, partitionId: {}, memory used: {}, "
-                + "huge partition triggered memory limitation.",
-            appId,
-            shuffleId,
-            partitionId,
-            memoryUsed);
-        return true;
-      }
-    }
-    return false;
+  public long getHugePartitionMemoryLimitSize() {
+    return hugePartitionMemoryLimitSize;
   }
 
   public void setUsedMemory(long usedMemory) {
diff --git 
a/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java
 
b/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java
index 35eeb429d..583fe62f8 100644
--- 
a/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java
+++ 
b/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java
@@ -29,6 +29,7 @@ import io.netty.channel.ChannelFutureListener;
 import org.apache.commons.collections.MapUtils;
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.Pair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -40,6 +41,7 @@ import org.apache.uniffle.common.ShufflePartitionedBlock;
 import org.apache.uniffle.common.ShufflePartitionedData;
 import org.apache.uniffle.common.audit.AuditContext;
 import org.apache.uniffle.common.config.RssBaseConf;
+import 
org.apache.uniffle.common.exception.ExceedHugePartitionHardLimitException;
 import org.apache.uniffle.common.exception.FileNotFoundException;
 import org.apache.uniffle.common.exception.RssException;
 import org.apache.uniffle.common.netty.buffer.ManagedBuffer;
@@ -280,6 +282,17 @@ public class ShuffleServerNettyHandler implements 
BaseMessageHandler {
               shuffleTaskManager.updateCachedBlockIds(
                   appId, shuffleId, spd.getPartitionId(), spd.getBlockList());
             }
+          } catch (ExceedHugePartitionHardLimitException e) {
+            String errorMsg =
+                "ExceedHugePartitionHardLimitException Error happened when 
shuffleEngine.write for "
+                    + shuffleDataInfo
+                    + ": "
+                    + e.getMessage();
+            
ShuffleServerMetrics.counterTotalHugePartitionExceedHardLimitNum.inc();
+            ret = StatusCode.EXCEED_HUGE_PARTITION_HARD_LIMIT;
+            responseMessage = errorMsg;
+            LOG.error(errorMsg);
+            hasFailureOccurred = true;
           } catch (Exception e) {
             String errorMsg =
                 "Error happened when shuffleEngine.write for "
@@ -709,11 +722,12 @@ public class ShuffleServerNettyHandler implements 
BaseMessageHandler {
     return ret;
   }
 
-  private ShufflePartitionedBlock[] toPartitionedBlock(List<ShuffleBlockInfo> 
blocks) {
+  private Pair<Long, ShufflePartitionedBlock[]> 
toPartitionedBlock(List<ShuffleBlockInfo> blocks) {
     if (blocks == null || blocks.size() == 0) {
-      return new ShufflePartitionedBlock[] {};
+      return Pair.of(0L, new ShufflePartitionedBlock[] {});
     }
     ShufflePartitionedBlock[] ret = new ShufflePartitionedBlock[blocks.size()];
+    long size = 0L;
     int i = 0;
     for (ShuffleBlockInfo block : blocks) {
       ret[i] =
@@ -724,9 +738,10 @@ public class ShuffleServerNettyHandler implements 
BaseMessageHandler {
               block.getBlockId(),
               block.getTaskAttemptId(),
               block.getData());
+      size += ret[i].getSize();
       i++;
     }
-    return ret;
+    return Pair.of(size, ret);
   }
 
   private StatusCode verifyRequest(String appId) {
diff --git 
a/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java 
b/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java
index c9cbdf49a..5188d4706 100644
--- a/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java
+++ b/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java
@@ -193,7 +193,7 @@ public class ShuffleTaskManagerTest extends HadoopTestBase {
 
     // case1, expect NoRegisterException
     try {
-      shuffleTaskManager.requireBuffer(appId, 1, Arrays.asList(1), 500);
+      shuffleTaskManager.requireBuffer(appId, 1, Arrays.asList(1), 
Arrays.asList(500), 500);
       fail("Should thow NoRegisterException");
     } catch (Exception e) {
       assertTrue(e instanceof NoRegisterException);
@@ -208,7 +208,8 @@ public class ShuffleTaskManagerTest extends HadoopTestBase {
 
     // case2
     try {
-      long requiredId = shuffleTaskManager.requireBuffer(appId, 1, 
Arrays.asList(1), 500);
+      long requiredId =
+          shuffleTaskManager.requireBuffer(appId, 1, Arrays.asList(1), 
Arrays.asList(500), 500);
       assertNotEquals(-1, requiredId);
     } catch (Exception e) {
       fail("Should not throw Exception");
@@ -219,7 +220,8 @@ public class ShuffleTaskManagerTest extends HadoopTestBase {
     shuffleTaskManager.cacheShuffleData(appId, shuffleId, true, 
partitionedData0);
     shuffleTaskManager.updateCachedBlockIds(appId, shuffleId, 1, 
partitionedData0.getBlockList());
     try {
-      long requiredId = shuffleTaskManager.requireBuffer(appId, 1, 
Arrays.asList(1), 500);
+      long requiredId =
+          shuffleTaskManager.requireBuffer(appId, 1, Arrays.asList(1), 
Arrays.asList(500), 500);
       assertNotEquals(-1, requiredId);
     } catch (Exception e) {
       fail("Should not throw Exception");
@@ -230,7 +232,7 @@ public class ShuffleTaskManagerTest extends HadoopTestBase {
     shuffleTaskManager.cacheShuffleData(appId, shuffleId, true, 
partitionedData0);
     shuffleTaskManager.updateCachedBlockIds(appId, shuffleId, 1, 
partitionedData0.getBlockList());
     try {
-      shuffleTaskManager.requireBuffer(appId, 1, Arrays.asList(1), 500);
+      shuffleTaskManager.requireBuffer(appId, 1, Arrays.asList(1), 
Arrays.asList(500), 500);
       fail("Should throw NoBufferForHugePartitionException");
     } catch (Exception e) {
       assertTrue(e instanceof NoBufferForHugePartitionException);
@@ -1263,7 +1265,8 @@ public class ShuffleTaskManagerTest extends 
HadoopTestBase {
     Thread.sleep(2000);
 
     // The NO_REGISTER status code should not appear.
-    assertTrue(shuffleTaskManager.requireBuffer(appId, 2, Arrays.asList(1), 
35) != -4);
+    assertTrue(
+        shuffleTaskManager.requireBuffer(appId, 2, Arrays.asList(1), 
Arrays.asList(35), 35) != -4);
     shuffleTaskManager.removeResources(appId, false);
   }
 }
diff --git 
a/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferManagerTest.java
 
b/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferManagerTest.java
index 007c36290..3a5c991d8 100644
--- 
a/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferManagerTest.java
+++ 
b/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferManagerTest.java
@@ -45,6 +45,7 @@ import org.apache.uniffle.common.rpc.StatusCode;
 import org.apache.uniffle.common.util.ByteBufUtils;
 import org.apache.uniffle.common.util.Constants;
 import org.apache.uniffle.server.DefaultFlushEventHandler;
+import org.apache.uniffle.server.HugePartitionUtils;
 import org.apache.uniffle.server.ShuffleFlushManager;
 import org.apache.uniffle.server.ShuffleServer;
 import org.apache.uniffle.server.ShuffleServerConf;
@@ -508,8 +509,12 @@ public class ShuffleBufferManagerTest extends 
BufferTestBase {
     long usedSize = shuffleTaskManager.getPartitionDataSize(appId, shuffleId, 
0);
     assertEquals(1 + 32, usedSize);
     assertFalse(
-        shuffleBufferManager.limitHugePartition(
-            appId, shuffleId, 0, 
shuffleTaskManager.getPartitionDataSize(appId, shuffleId, 0)));
+        HugePartitionUtils.limitHugePartition(
+            shuffleBufferManager,
+            appId,
+            shuffleId,
+            0,
+            shuffleTaskManager.getPartitionDataSize(appId, shuffleId, 0)));
 
     // case2: its partition is huge partition, its buffer will be flushed to 
DISK directly
     partitionedData = createData(0, 36);
@@ -517,8 +522,12 @@ public class ShuffleBufferManagerTest extends 
BufferTestBase {
     shuffleTaskManager.updateCachedBlockIds(appId, shuffleId, 0, 
partitionedData.getBlockList());
     assertEquals(33 + 36 + 32, shuffleBufferManager.getUsedMemory());
     assertTrue(
-        shuffleBufferManager.limitHugePartition(
-            appId, shuffleId, 0, 
shuffleTaskManager.getPartitionDataSize(appId, shuffleId, 0)));
+        HugePartitionUtils.limitHugePartition(
+            shuffleBufferManager,
+            appId,
+            shuffleId,
+            0,
+            shuffleTaskManager.getPartitionDataSize(appId, shuffleId, 0)));
     partitionedData = createData(0, 1);
     shuffleTaskManager.cacheShuffleData(appId, shuffleId, false, 
partitionedData);
     shuffleTaskManager.updateCachedBlockIds(appId, shuffleId, 0, 
partitionedData.getBlockList());

Reply via email to