This is an automated email from the ASF dual-hosted git repository.

zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 648931c46 [#1727] improvement(server): Introduce block num threshold for early buffer flush to mitigate GC issues (#1759)
648931c46 is described below

commit 648931c46eef79a8c40ee470348f1946cf184f16
Author: RickyMa <[email protected]>
AuthorDate: Fri Jun 7 11:29:10 2024 +0800

    [#1727] improvement(server): Introduce block num threshold for early buffer flush to mitigate GC issues (#1759)
    
    ### What changes were proposed in this pull request?
    
    Introduce a block-number threshold for flushing a single buffer, mitigating GC/OOM issues caused by a potentially excessive number of small blocks.
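The flush check described above can be sketched roughly as follows. This is a minimal illustration under stated assumptions, not the actual change to `ShuffleBufferManager`: the class `SingleBufferFlushPolicy`, its fields, the strict `>` comparisons, and the OR-combination of a byte threshold with a block-count threshold are all hypothetical, inferred only from the description above.

```java
/**
 * Hypothetical sketch of a single-buffer early-flush check. Class and field
 * names are illustrative assumptions, not Uniffle's ShuffleBufferManager code.
 */
final class SingleBufferFlushPolicy {
  // Assumed byte threshold, similar in spirit to rss.server.single.buffer.flush.threshold.
  private final long sizeThresholdBytes;
  // Assumed block-count threshold: the kind of knob this change introduces.
  private final int blockNumThreshold;

  SingleBufferFlushPolicy(long sizeThresholdBytes, int blockNumThreshold) {
    this.sizeThresholdBytes = sizeThresholdBytes;
    this.blockNumThreshold = blockNumThreshold;
  }

  /** Returns true if either the buffered bytes or the buffered block count exceeds its threshold. */
  boolean shouldFlush(long bufferedBytes, int bufferedBlocks) {
    return bufferedBytes > sizeThresholdBytes || bufferedBlocks > blockNumThreshold;
  }

  public static void main(String[] args) {
    // 128 MB size threshold and an illustrative limit of 1,000 blocks per buffer.
    SingleBufferFlushPolicy policy = new SingleBufferFlushPolicy(128L * 1024 * 1024, 1_000);

    // A few large blocks: neither threshold is crossed, so this prints false.
    System.out.println(policy.shouldFlush(64L * 1024 * 1024, 10));

    // Many tiny blocks: only 2 MB buffered, but 5,000 blocks exceed the limit, so this prints true.
    System.out.println(policy.shouldFlush(2L * 1024 * 1024, 5_000));
  }
}
```

Because the two conditions are OR-ed in this sketch, a generous size threshold no longer keeps a buffer full of tiny blocks in memory indefinitely; setting the block limit very high (for example `Integer.MAX_VALUE`) would effectively disable the extra check.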
    
    ### Why are the changes needed?
    
    For: https://github.com/apache/incubator-uniffle/issues/1727.
    
    In a production environment, the Uniffle server may run jobs with various unreasonable configurations. These jobs might have a very large number of partitions (tens of thousands, hundreds of thousands, or even millions), might have been manually configured with a very small spill size, or might be misconfigured in other ways. Ultimately, this can send a large number of small blocks to the server, which has no choice but to keep them in heap memory for a long time, simply because **_ [...]
    
    In Netty mode, we use off-heap memory to store shuffle data. However, for jobs with extremely unreasonable configurations, the total size of the block reference objects that the server keeps in heap memory may even exceed the size of the data stored off-heap. This can make the server highly unstable.
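A rough worked estimate illustrates why heap bookkeeping can outgrow the off-heap payload. It assumes each buffered block costs a fixed amount of heap for its reference objects (200 bytes here, an illustrative figure rather than a measured Uniffle number) while its data lives off-heap; under that assumption, heap usage exceeds off-heap data whenever the average block is smaller than the per-block heap cost.

```java
/** Back-of-the-envelope comparison of heap bookkeeping vs. off-heap payload (illustrative numbers only). */
final class BlockOverheadEstimate {
  /** Heap bytes spent on per-block reference objects, assuming a fixed cost per block. */
  static long heapOverheadBytes(long blockCount, long perBlockHeapBytes) {
    return blockCount * perBlockHeapBytes;
  }

  /** Off-heap bytes holding the shuffle payload itself. */
  static long offHeapDataBytes(long blockCount, long avgBlockBytes) {
    return blockCount * avgBlockBytes;
  }

  public static void main(String[] args) {
    long blocks = 10_000_000L;    // e.g. a huge partition count combined with a tiny spill size
    long perBlockHeapBytes = 200; // assumed heap cost per block (hypothetical)
    long avgBlockBytes = 100;     // assumed average payload per block

    long heap = heapOverheadBytes(blocks, perBlockHeapBytes);
    long offHeap = offHeapDataBytes(blocks, avgBlockBytes);
    // MB here means 10^6 bytes; prints: heap = 2000 MB, off-heap = 1000 MB
    System.out.printf("heap = %d MB, off-heap = %d MB%n", heap / 1_000_000, offHeap / 1_000_000);
  }
}
```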
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Existing UTs.
---
 docs/server_guide.md                               | 79 +++++++++++-----------
 .../ShuffleServerConcurrentWriteOfHadoopTest.java  |  2 +-
 .../apache/uniffle/server/ShuffleServerConf.java   | 14 +++-
 .../server/buffer/ShuffleBufferManager.java        | 18 ++++-
 .../server/buffer/ShuffleBufferManagerTest.java    |  6 +-
 5 files changed, 73 insertions(+), 46 deletions(-)

diff --git a/docs/server_guide.md b/docs/server_guide.md
index ec2d24fcc..4fecff93b 100644
--- a/docs/server_guide.md
+++ b/docs/server_guide.md
@@ -69,45 +69,46 @@ This document will introduce how to deploy Uniffle shuffle servers.
    ```
 
 ## Configuration
-| Property Name                                            | Default           
                                                     | Description              
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
-|----------------------------------------------------------|------------------------------------------------------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 [...]
-| rss.coordinator.quorum                                   | -                                                                      | Coordinator quorum [...]
-| rss.rpc.server.type                                      | GRPC                                                                   | Shuffle server type, supports GRPC_NETTY, GRPC. The default value is GRPC. But we recommend using GRPC_NETTY to enable Netty on the server side for better stability and performance. [...]
-| rss.rpc.server.port                                      | 19999                                                                  | RPC port for Shuffle server, if set zero, grpc server start on random port. [...]
-| rss.jetty.http.port                                      | 19998                                                                  | Http port for Shuffle server [...]
-| rss.server.netty.port                                    | -1                                                                     | Netty port for Shuffle server, if set zero, Netty server start on random port. [...]
-| rss.server.netty.epoll.enable                            | false                                                                  | Whether to enable epoll model with Netty server. [...]
-| rss.server.netty.accept.thread                           | 10                                                                     | Accept thread count in netty. [...]
-| rss.server.netty.worker.thread                           | 0                                                                      | Worker thread count in netty. When set to 0, the default value is dynamically set to twice the number of processor cores, but it will not be less than 100 to ensure the minimum throughput of the service. [...]
-| rss.server.netty.connect.backlog                         | 0                                                                      | For Netty server, requested maximum length of the queue of incoming connections. [...]
-| rss.server.netty.connect.timeout                         | 5000                                                                   | Timeout for connection in netty. [...]
-| rss.server.netty.receive.buf                             | 0                                                                      | Receive buffer size (SO_RCVBUF). Note: the optimal size for receive buffer and send buffer should be latency * network_bandwidth. Assuming latency = 1ms, network_bandwidth = 10Gbps, buffer size should be ~ 1.25MB. Default is 0, the operating system automatically estimates the receive buffer size based on default settings. [...]
-| rss.server.netty.send.buf                                | 0                                                                      | Send buffer size (SO_SNDBUF). [...]
-| rss.server.buffer.capacity                               | -1                                                                     | Max memory of buffer manager for shuffle server. If negative, JVM heap size * buffer.ratio is used [...]
-| rss.server.buffer.capacity.ratio                         | 0.6                                                                    | when `rss.server.buffer.capacity`=-1, then the buffer capacity is JVM heap size or off-heap size(when enabling Netty) * ratio [...]
-| rss.server.memory.shuffle.highWaterMark.percentage       | 75.0                                                                   | Threshold of spill data to storage, percentage of rss.server.buffer.capacity [...]
-| rss.server.memory.shuffle.lowWaterMark.percentage        | 25.0                                                                   | Threshold of keep data in memory, percentage of rss.server.buffer.capacity [...]
-| rss.server.read.buffer.capacity                          | -1                                                                     | Max size of buffer for reading data. If negative, JVM heap size * read.buffer.ratio is used [...]
-| rss.server.read.buffer.capacity.ratio                    | 0.2                                                                    | when `rss.server.read.buffer.capacity`=-1, then read buffer capacity is JVM heap size or off-heap size(when enabling Netty) * ratio [...]
-| rss.server.heartbeat.interval                            | 10000                                                                  | Heartbeat interval to Coordinator (ms) [...]
-| rss.server.netty.metrics.pendingTaskNumPollingIntervalMs | 10000                                                                  | How often to collect Netty pending tasks number metrics (in milliseconds) [...]
-| rss.server.flush.localfile.threadPool.size               | 10                                                                     | Thread pool for flush data to local file [...]
-| rss.server.flush.hadoop.threadPool.size                  | 60                                                                     | Thread pool for flush data to hadoop storage [...]
-| rss.server.commit.timeout                                | 600000                                                                 | Timeout when commit shuffle data (ms) [...]
-| rss.storage.type                                         | -                                                                      | Supports MEMORY_LOCALFILE, MEMORY_HDFS, MEMORY_LOCALFILE_HDFS [...]
-| rss.server.flush.cold.storage.threshold.size             | 64M                                                                    | The threshold of data size for LOACALFILE and HADOOP if MEMORY_LOCALFILE_HDFS is used [...]
-| rss.server.tags                                          | -                                                                      | The comma-separated list of tags to indicate the shuffle server's attributes. It will be used as the assignment basis for the coordinator [...]
-| rss.server.single.buffer.flush.enabled                   | true                                                                   | Whether single buffer flush when size exceeded rss.server.single.buffer.flush.threshold [...]
-| rss.server.single.buffer.flush.threshold                 | 128M                                                                   | The threshold of single shuffle buffer flush [...]
-| rss.server.disk.capacity                                 | -1                                                                     | Disk capacity that shuffle server can use. If negative, it will use disk whole space * ratio [...]
-| rss.server.disk.capacity.ratio                           | 0.9                                                                    | When `rss.server.disk.capacity` is negative, disk whole space * ratio is used [...]
-| rss.server.hybrid.storage.fallback.strategy.class        | -                                                                      | The fallback strategy for `MEMORY_LOCALFILE_HDFS`. Support `org.apache.uniffle.server.storage.RotateStorageManagerFallbackStrategy`,`org.apache.uniffle.server.storage.LocalStorageManagerFallbackStrategy` and `org.apache.uniffle.server.storage.HadoopStorageManagerFallbackStrategy`. If not set, `org.apache.uniffle.server.storage.HadoopStorageManagerFallbackS [...]
-| rss.server.leak.shuffledata.check.interval               | 3600000                                                                | The interval of leak shuffle data check (ms) [...]
-| rss.server.max.concurrency.of.per-partition.write        | 30                                                                     | The max concurrency of single partition writer, the data partition file number is equal to this value. Default value is 1. This config could improve the writing speed, especially for huge partition. [...]
-| rss.server.max.concurrency.limit.of.per-partition.write  | -                                                                      | The limit for max concurrency per-partition write specified by client, this won't be enabled by default. [...]
-| rss.metrics.reporter.class                               | -                                                                      | The class of metrics reporter. [...]
-| rss.server.hybrid.storage.manager.selector.class         | org.apache.uniffle.server.storage.hybrid.DefaultStorageManagerSelector | The manager selector strategy for `MEMORY_LOCALFILE_HDFS`. Default value is `DefaultStorageManagerSelector`, and another `HugePartitionSensitiveStorageManagerSelector` will flush only huge partition's data to cold storage. [...]
-| rss.server.disk-capacity.watermark.check.enabled         | false                                                                  | If it is co-located with other services, the high-low watermark check based on the uniffle used is not correct. Due to this, the whole disk capacity watermark check is necessary, which will reuse the current watermark value. It will be disabled by default. [...]
+| Property Name                                            | Default           
                                                     | Description              
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
+|----------------------------------------------------------|------------------------------------------------------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 [...]
+| rss.coordinator.quorum                                   | -                                                                      | Coordinator quorum [...]
+| rss.rpc.server.type                                      | GRPC                                                                   | Shuffle server type, supports GRPC_NETTY, GRPC. The default value is GRPC. But we recommend using GRPC_NETTY to enable Netty on the server side for better stability and performance. [...]
+| rss.rpc.server.port                                      | 19999                                                                  | RPC port for Shuffle server, if set zero, grpc server start on random port. [...]
+| rss.jetty.http.port                                      | 19998                                                                  | Http port for Shuffle server [...]
+| rss.server.netty.port                                    | -1                                                                     | Netty port for Shuffle server, if set zero, Netty server start on random port. [...]
+| rss.server.netty.epoll.enable                            | false                                                                  | Whether to enable epoll model with Netty server. [...]
+| rss.server.netty.accept.thread                           | 10                                                                     | Accept thread count in netty. [...]
+| rss.server.netty.worker.thread                           | 0                                                                      | Worker thread count in netty. When set to 0, the default value is dynamically set to twice the number of processor cores, but it will not be less than 100 to ensure the minimum throughput of the service. [...]
+| rss.server.netty.connect.backlog                         | 0                                                                      | For Netty server, requested maximum length of the queue of incoming connections. [...]
+| rss.server.netty.connect.timeout                         | 5000                                                                   | Timeout for connection in netty. [...]
+| rss.server.netty.receive.buf                             | 0                                                                      | Receive buffer size (SO_RCVBUF). Note: the optimal size for receive buffer and send buffer should be latency * network_bandwidth. Assuming latency = 1ms, network_bandwidth = 10Gbps, buffer size should be ~ 1.25MB. Default is 0, the operating system automatically estimates the receive buffer size based on default settings. [...]
+| rss.server.netty.send.buf                                | 0                                                                      | Send buffer size (SO_SNDBUF). [...]
+| rss.server.buffer.capacity                               | -1                                                                     | Max memory of buffer manager for shuffle server. If negative, JVM heap size * buffer.ratio is used [...]
+| rss.server.buffer.capacity.ratio                         | 0.6                                                                    | when `rss.server.buffer.capacity`=-1, then the buffer capacity is JVM heap size or off-heap size(when enabling Netty) * ratio [...]
+| rss.server.memory.shuffle.highWaterMark.percentage       | 75.0                                                                   | Threshold of spill data to storage, percentage of rss.server.buffer.capacity [...]
+| rss.server.memory.shuffle.lowWaterMark.percentage        | 25.0                                                                   | Threshold of keep data in memory, percentage of rss.server.buffer.capacity [...]
+| rss.server.read.buffer.capacity                          | -1                
                                                     | Max size of buffer for 
reading data. If negative, JVM heap size * read.buffer.ratio is used            
                                                                                
                                                                                
                                                                                
                [...]
+| rss.server.read.buffer.capacity.ratio | 0.2 | when `rss.server.read.buffer.capacity`=-1, then read buffer capacity is JVM heap size or off-heap size(when enabling Netty) * ratio [...]
+| rss.server.heartbeat.interval | 10000 | Heartbeat interval to Coordinator (ms) [...]
+| rss.server.netty.metrics.pendingTaskNumPollingIntervalMs | 10000 | How often to collect Netty pending tasks number metrics (in milliseconds) [...]
+| rss.server.flush.localfile.threadPool.size | 10 | Thread pool for flush data to local file [...]
+| rss.server.flush.hadoop.threadPool.size | 60 | Thread pool for flush data to hadoop storage [...]
+| rss.server.commit.timeout | 600000 | Timeout when commit shuffle data (ms) [...]
+| rss.storage.type | - | Supports MEMORY_LOCALFILE, MEMORY_HDFS, MEMORY_LOCALFILE_HDFS [...]
+| rss.server.flush.cold.storage.threshold.size | 64M | The threshold of data size for LOACALFILE and HADOOP if MEMORY_LOCALFILE_HDFS is used [...]
+| rss.server.tags | - | The comma-separated list of tags to indicate the shuffle server's attributes. It will be used as the assignment basis for the coordinator [...]
+| rss.server.single.buffer.flush.enabled | true | Whether single buffer flush when size exceeded rss.server.single.buffer.flush.threshold [...]
+| rss.server.single.buffer.flush.threshold | 128M | The threshold of single shuffle buffer flush [...]
+| rss.server.single.buffer.flush.blocksNumberThreshold | - | The blocks number threshold for triggering a flush for a single shuffle buffer. This threshold is mainly used to control jobs with an excessive number of small blocks, allowing these small blocks to be flushed as much as possible, rather than being maintained in the heap and unable to be garbage collected. This can cause severe garbage collection issues on [...]
+| rss.server.disk.capacity | -1 | Disk capacity that shuffle server can use. If negative, it will use disk whole space * ratio [...]
+| rss.server.disk.capacity.ratio | 0.9 | When `rss.server.disk.capacity` is negative, disk whole space * ratio is used [...]
+| rss.server.hybrid.storage.fallback.strategy.class | - | The fallback strategy for `MEMORY_LOCALFILE_HDFS`. Support `org.apache.uniffle.server.storage.RotateStorageManagerFallbackStrategy`,`org.apache.uniffle.server.storage.LocalStorageManagerFallbackStrategy` and `org.apache.uniffle.server.storage.HadoopStorageManagerFallbackStrategy`. If not set, `org.apache.uniffle.server.storage.HadoopStorageManagerFallbackS [...]
+| rss.server.leak.shuffledata.check.interval | 3600000 | The interval of leak shuffle data check (ms) [...]
+| rss.server.max.concurrency.of.per-partition.write | 30 | The max concurrency of single partition writer, the data partition file number is equal to this value. Default value is 1. This config could improve the writing speed, especially for huge partition. [...]
+| rss.server.max.concurrency.limit.of.per-partition.write | - | The limit for max concurrency per-partition write specified by client, this won't be enabled by default. [...]
+| rss.metrics.reporter.class | - | The class of metrics reporter. [...]
+| rss.server.hybrid.storage.manager.selector.class | org.apache.uniffle.server.storage.hybrid.DefaultStorageManagerSelector | The manager selector strategy for `MEMORY_LOCALFILE_HDFS`. Default value is `DefaultStorageManagerSelector`, and another `HugePartitionSensitiveStorageManagerSelector` will flush only huge partition's data to cold storage. [...]
+| rss.server.disk-capacity.watermark.check.enabled | false | If it is co-located with other services, the high-low watermark check based on the uniffle used is not correct. Due to this, the whole disk capacity watermark check is necessary, which will reuse the current watermark value. It will be disabled by default. [...]
 
 ### Advanced Configurations
 | Property Name                                    | Default | Description |
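The capacity rows in the table above share one resolution rule: a negative capacity falls back to a base size (JVM heap, off-heap memory when Netty is enabled, or the whole disk) multiplied by a ratio, and the high and low watermarks are percentages of `rss.server.buffer.capacity`. The sketch below illustrates only that documented arithmetic. `CapacityMath`, its methods, the truncating cast, and the heap size, disk size, and 0.6 buffer ratio used in `main` are hypothetical and not taken from Uniffle.

```java
/** Hypothetical helper restating the documented capacity rules; not a Uniffle class. */
final class CapacityMath {

  /** Returns the configured capacity, or baseSize * ratio (truncated) when it is negative. */
  static long resolveCapacity(long configured, long baseSize, double ratio) {
    return configured < 0 ? (long) (baseSize * ratio) : configured;
  }

  /** Returns a watermark given as a percentage (for example 75.0) of the buffer capacity. */
  static long watermark(long capacity, double percentage) {
    return (long) (capacity * percentage / 100.0);
  }

  public static void main(String[] args) {
    long gib = 1024L * 1024L * 1024L;
    long memory = 10 * gib; // assumed JVM heap, or off-heap size when Netty is enabled
    // rss.server.buffer.capacity = -1; the 0.6 ratio here is an illustrative value.
    long bufferCapacity = resolveCapacity(-1, memory, 0.6);
    long highWaterMark = watermark(bufferCapacity, 75.0); // highWaterMark.percentage default
    long lowWaterMark = watermark(bufferCapacity, 25.0); // lowWaterMark.percentage default
    // rss.server.read.buffer.capacity = -1 with rss.server.read.buffer.capacity.ratio = 0.2
    long readCapacity = resolveCapacity(-1, memory, 0.2);
    // rss.server.disk.capacity = -1 with rss.server.disk.capacity.ratio = 0.9, assumed 1000 GiB disk
    long diskCapacity = resolveCapacity(-1, 1000 * gib, 0.9);
    System.out.printf(
        "buffer=%d high=%d low=%d read=%d disk=%d%n",
        bufferCapacity, highWaterMark, lowWaterMark, readCapacity, diskCapacity);
  }
}
```

Expressing the watermarks as percentages means they follow the capacity automatically when the capacity itself is derived from the heap or off-heap size.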
diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerConcurrentWriteOfHadoopTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerConcurrentWriteOfHadoopTest.java
index 7d5d11481..d4c41abff 100644
--- a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerConcurrentWriteOfHadoopTest.java
+++ b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerConcurrentWriteOfHadoopTest.java
@@ -88,7 +88,7 @@ public class ShuffleServerConcurrentWriteOfHadoopTest extends ShuffleServerWithH
     shuffleServerConf.setInteger(
         ShuffleServerConf.SERVER_MAX_CONCURRENCY_OF_ONE_PARTITION, MAX_CONCURRENCY);
     shuffleServerConf.setBoolean(shuffleServerConf.SINGLE_BUFFER_FLUSH_ENABLED, true);
-    shuffleServerConf.setLong(shuffleServerConf.SINGLE_BUFFER_FLUSH_THRESHOLD, 1024 * 1024L);
+    shuffleServerConf.setLong(shuffleServerConf.SINGLE_BUFFER_FLUSH_SIZE_THRESHOLD, 1024 * 1024L);
     return shuffleServerConf;
   }
 
diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java b/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
index fe3979012..064a6b28b 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
@@ -387,12 +387,24 @@ public class ShuffleServerConf extends RssBaseConf {
           .withDescription(
               "Whether single buffer flush when size exceeded rss.server.single.buffer.flush.threshold");
 
-  public static final ConfigOption<Long> SINGLE_BUFFER_FLUSH_THRESHOLD =
+  public static final ConfigOption<Long> SINGLE_BUFFER_FLUSH_SIZE_THRESHOLD =
       ConfigOptions.key("rss.server.single.buffer.flush.threshold")
           .longType()
           .defaultValue(128 * 1024 * 1024L)
           .withDescription("The threshold of single shuffle buffer flush");
 
+  public static final ConfigOption<Integer> SINGLE_BUFFER_FLUSH_BLOCKS_NUM_THRESHOLD =
+      ConfigOptions.key("rss.server.single.buffer.flush.blocksNumberThreshold")
+          .intType()
+          .defaultValue(Integer.MAX_VALUE)
+          .withDescription(
+              "The blocks number threshold for triggering a flush for a single shuffle buffer. "
+                  + "This threshold is mainly used to control jobs with an excessive number of small blocks, "
+                  + "allowing these small blocks to be flushed as much as possible, "
+                  + "rather than being maintained in the heap and unable to be garbage collected. "
+                  + "This can cause severe garbage collection issues on the server side, and may even lead to out-of-heap-memory errors. "
+                  + "If the threshold is set too high, it becomes meaningless. It won't be enabled by default.");
+
   public static final ConfigOption<Long> SERVER_LEAK_SHUFFLE_DATA_CHECK_INTERVAL =
       ConfigOptions.key("rss.server.leak.shuffledata.check.interval")
           .longType()
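Because the new option defaults to `Integer.MAX_VALUE` and the flush check in `ShuffleBufferManager` compares `buffer.getBlocks().size()` (an `int`, assuming a standard Java collection) with a strict `>`, the count condition can never be true unless an operator lowers the threshold; that is what "It won't be enabled by default" amounts to in practice. A minimal sketch of that reasoning follows. `BlocksThresholdDemo` and the threshold of 1000 are hypothetical, and the comparison is modeled on this commit's change rather than copied from it.

```java
/** Hypothetical demo of why the default blocks-number threshold disables the check. */
final class BlocksThresholdDemo {
  // Mirrors the default of rss.server.single.buffer.flush.blocksNumberThreshold.
  static final long DEFAULT_BLOCKS_NUM_THRESHOLD = Integer.MAX_VALUE;

  /** True when a buffer holding blockCount blocks exceeds the threshold (strict comparison). */
  static boolean exceedsBlocksThreshold(int blockCount, long threshold) {
    return blockCount > threshold;
  }

  public static void main(String[] args) {
    // Even the largest possible int count does not exceed the default, so the check is inert.
    System.out.println(exceedsBlocksThreshold(Integer.MAX_VALUE, DEFAULT_BLOCKS_NUM_THRESHOLD)); // false
    // With an explicit threshold (1000 is illustrative), the 1001st block trips it.
    System.out.println(exceedsBlocksThreshold(1001, 1000L)); // true
  }
}
```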
diff --git a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
index 6ec8c13e6..e85d2eae4 100644
--- a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
@@ -65,6 +65,7 @@ public class ShuffleBufferManager {
   private long lowWaterMark;
   private boolean bufferFlushEnabled;
   private long bufferFlushThreshold;
+  private long bufferFlushBlocksNumThreshold;
   // when shuffle buffer manager flushes data, shuffles with data size < shuffleFlushThreshold is
   // kept in memory to
   // reduce small I/Os to persistent storage, especially for local HDDs.
@@ -124,7 +125,9 @@ public class ShuffleBufferManager {
                 * conf.get(ShuffleServerConf.SERVER_MEMORY_SHUFFLE_LOWWATERMARK_PERCENTAGE));
     this.bufferFlushEnabled = conf.getBoolean(ShuffleServerConf.SINGLE_BUFFER_FLUSH_ENABLED);
     this.bufferFlushThreshold =
-        conf.getSizeAsBytes(ShuffleServerConf.SINGLE_BUFFER_FLUSH_THRESHOLD);
+        conf.getSizeAsBytes(ShuffleServerConf.SINGLE_BUFFER_FLUSH_SIZE_THRESHOLD);
+    this.bufferFlushBlocksNumThreshold =
+        conf.getInteger(ShuffleServerConf.SINGLE_BUFFER_FLUSH_BLOCKS_NUM_THRESHOLD);
     this.shuffleFlushThreshold =
         conf.getSizeAsBytes(ShuffleServerConf.SERVER_SHUFFLE_FLUSH_THRESHOLD);
     this.hugePartitionSizeThresholdRef =
@@ -266,7 +269,18 @@ public class ShuffleBufferManager {
     // When we use multi storage and trigger single buffer flush, the buffer size should be bigger
     // than rss.server.flush.cold.storage.threshold.size, otherwise cold storage will be useless.
     if ((isHugePartition || this.bufferFlushEnabled)
-        && buffer.getSize() > this.bufferFlushThreshold) {
+        && (buffer.getSize() > this.bufferFlushThreshold
+            || buffer.getBlocks().size() > bufferFlushBlocksNumThreshold)) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(
+            "Start to flush single buffer. Details - shuffleId:{}, startPartition:{}, endPartition:{}, isHugePartition:{}, bufferSize:{}, blocksNum:{}",
+            shuffleId,
+            startPartition,
+            endPartition,
+            isHugePartition,
+            buffer.getSize(),
+            buffer.getBlocks().size());
+      }
       flushBuffer(buffer, appId, shuffleId, startPartition, endPartition, isHugePartition);
       return;
     }
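The flush condition in `ShuffleBufferManager` now has two triggers joined by `||`: buffer size above `rss.server.single.buffer.flush.threshold`, or block count above `rss.server.single.buffer.flush.blocksNumberThreshold`, both gated on the partition being huge or single-buffer flush being enabled. The pure function below restates that predicate so it can be checked in isolation. `SingleBufferFlushPolicy` and its parameter names are hypothetical, and the workload of 50,000 blocks of 1 KiB and the 10,000-block threshold are illustrative numbers, not measurements.

```java
/** Hypothetical restatement of the single-buffer flush predicate; not a Uniffle class. */
final class SingleBufferFlushPolicy {

  /**
   * A buffer is flushed on its own when single-buffer flush applies (huge partition or feature
   * enabled) and either its size or its block count exceeds the corresponding threshold.
   */
  static boolean shouldFlush(
      boolean isHugePartition,
      boolean bufferFlushEnabled,
      long bufferSize,
      int blockCount,
      long sizeThreshold,
      long blocksNumThreshold) {
    return (isHugePartition || bufferFlushEnabled)
        && (bufferSize > sizeThreshold || blockCount > blocksNumThreshold);
  }

  public static void main(String[] args) {
    long sizeThreshold = 128L * 1024 * 1024; // default rss.server.single.buffer.flush.threshold
    // 50,000 blocks of 1 KiB each: about 49 MiB, well under the 128M size threshold.
    long size = 50_000L * 1024;
    // Size-only behavior (default count threshold): no flush.
    System.out.println(shouldFlush(false, true, size, 50_000, sizeThreshold, Integer.MAX_VALUE)); // false
    // With a count threshold of 10,000 the same buffer is flushed.
    System.out.println(shouldFlush(false, true, size, 50_000, sizeThreshold, 10_000)); // true
    // Feature disabled and not a huge partition: never flushed individually.
    System.out.println(shouldFlush(false, false, size, 50_000, sizeThreshold, 10_000)); // false
  }
}
```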
diff --git a/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferManagerTest.java b/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferManagerTest.java
index 8e958b22b..007c36290 100644
--- a/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferManagerTest.java
+++ b/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferManagerTest.java
@@ -477,7 +477,7 @@ public class ShuffleBufferManagerTest extends BufferTestBase {
     shuffleConf.set(ShuffleServerConf.HUGE_PARTITION_MEMORY_USAGE_LIMITATION_RATIO, 0.1);
     shuffleConf.set(ShuffleServerConf.HUGE_PARTITION_SIZE_THRESHOLD, 100L);
     shuffleConf.set(ShuffleServerConf.SINGLE_BUFFER_FLUSH_ENABLED, false);
-    shuffleConf.setSizeAsBytes(ShuffleServerConf.SINGLE_BUFFER_FLUSH_THRESHOLD, 64L);
+    shuffleConf.setSizeAsBytes(ShuffleServerConf.SINGLE_BUFFER_FLUSH_SIZE_THRESHOLD, 64L);
 
     ShuffleServer mockShuffleServer = mock(ShuffleServer.class);
     StorageManager storageManager =
@@ -537,7 +537,7 @@ public class ShuffleBufferManagerTest extends BufferTestBase {
     shuffleConf.set(ShuffleServerConf.SERVER_MEMORY_SHUFFLE_HIGHWATERMARK_PERCENTAGE, 80.0);
     shuffleConf.setLong(ShuffleServerConf.DISK_CAPACITY, 1024L * 1024L * 1024L);
     shuffleConf.setBoolean(ShuffleServerConf.SINGLE_BUFFER_FLUSH_ENABLED, true);
-    shuffleConf.setSizeAsBytes(ShuffleServerConf.SINGLE_BUFFER_FLUSH_THRESHOLD, 128L);
+    shuffleConf.setSizeAsBytes(ShuffleServerConf.SINGLE_BUFFER_FLUSH_SIZE_THRESHOLD, 128L);
 
     ShuffleServer mockShuffleServer = mock(ShuffleServer.class);
     StorageManager storageManager =
@@ -705,7 +705,7 @@ public class ShuffleBufferManagerTest extends BufferTestBase {
     shuffleConf.set(ShuffleServerConf.SERVER_MEMORY_SHUFFLE_HIGHWATERMARK_PERCENTAGE, 80.0);
     shuffleConf.setLong(ShuffleServerConf.DISK_CAPACITY, 1024L * 1024L * 1024L);
     shuffleConf.setBoolean(ShuffleServerConf.SINGLE_BUFFER_FLUSH_ENABLED, true);
-    shuffleConf.setSizeAsBytes(ShuffleServerConf.SINGLE_BUFFER_FLUSH_THRESHOLD, 16L);
+    shuffleConf.setSizeAsBytes(ShuffleServerConf.SINGLE_BUFFER_FLUSH_SIZE_THRESHOLD, 16L);
     shuffleConf.setSizeAsBytes(ShuffleServerConf.FLUSH_COLD_STORAGE_THRESHOLD_SIZE, 16L);
     shuffleConf.setString(
         ShuffleServerConf.RSS_STORAGE_TYPE.key(), StorageType.LOCALFILE_HDFS.name());
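The renamed tests only exercise the size threshold. To illustrate why a count threshold bounds how many block objects a single buffer can accumulate, the following simulation appends many small blocks to one buffer and records the peak block count. It assumes the check runs after each block is appended and that a flush empties the buffer; `SmallBlockSimulation`, the 100-byte blocks, and the threshold of 5,000 are illustrative, and the `ArrayList` is only a stand-in for Uniffle's real shuffle buffer.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical simulation of one shuffle buffer receiving many small blocks. */
final class SmallBlockSimulation {

  /**
   * Appends blockCount blocks of blockSize bytes, "flushing" (clearing) the buffer whenever the
   * size or count threshold is exceeded after an append. Returns the peak number of blocks held.
   */
  static int peakBlocksHeld(
      int blockCount, long blockSize, long sizeThreshold, long blocksNumThreshold) {
    List<Long> buffer = new ArrayList<>(); // one boxed entry per block, like per-block heap objects
    long bufferSize = 0;
    int peak = 0;
    for (int i = 0; i < blockCount; i++) {
      buffer.add(blockSize);
      bufferSize += blockSize;
      peak = Math.max(peak, buffer.size());
      if (bufferSize > sizeThreshold || buffer.size() > blocksNumThreshold) {
        // Stand-in for flushBuffer(...): the blocks leave this buffer.
        buffer.clear();
        bufferSize = 0;
      }
    }
    return peak;
  }

  public static void main(String[] args) {
    long sizeThreshold = 128L * 1024 * 1024;
    // 200,000 blocks of 100 bytes (about 19 MiB in total) never reach the size threshold.
    System.out.println(peakBlocksHeld(200_000, 100, sizeThreshold, Integer.MAX_VALUE)); // 200000
    // With a blocks threshold of 5,000 the buffer never holds more than 5,001 blocks.
    System.out.println(peakBlocksHeld(200_000, 100, sizeThreshold, 5_000)); // 5001
  }
}
```

Under size-only flushing the block count grows with the job's fan-out, whereas the count threshold caps it regardless of how small the blocks are.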
