This is an automated email from the ASF dual-hosted git repository.

zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 0040c4a7 [#757] feat(server): separate flush thread pools for 
different storage types. (#775)
0040c4a7 is described below

commit 0040c4a71a69a502e17af993192a7f4128c6435a
Author: Xianming Lei <[email protected]>
AuthorDate: Tue Jul 11 17:51:10 2023 +0800

    [#757] feat(server): separate flush thread pools for different storage 
types. (#775)
    
    ### What changes were proposed in this pull request?
    Separate flush thread pools for different storage type.
    
    ### Why are the changes needed?
    Writing local files requires less concurrency, while writing hdfs requires 
more concurrency,
    it is best to separate thread pools.
    
    ### Does this PR introduce _any_ user-facing change?
    Yes.
    
    ### How was this patch tested?
    existing UTs.
    
    Co-authored-by: leixianming <[email protected]>
---
 README.md                                          |   3 +-
 conf/server.conf                                   |   3 +-
 .../operator/examples/configuration.yaml           |   3 +-
 docs/server_guide.md                               |  63 ++--
 .../uniffle/server/DefaultFlushEventHandler.java   | 142 +++++++++
 .../apache/uniffle/server/FlushEventHandler.java}  |  28 +-
 .../uniffle/server/ShuffleDataFlushEvent.java      |   6 +
 .../apache/uniffle/server/ShuffleFlushManager.java | 316 ++++++---------------
 .../apache/uniffle/server/ShuffleServerConf.java   |  10 +-
 .../uniffle/server/ShuffleServerMetrics.java       |   8 +
 .../uniffle/server/ShuffleFlushManagerTest.java    |  75 +++--
 .../server/buffer/ShuffleBufferManagerTest.java    |  15 +-
 12 files changed, 355 insertions(+), 317 deletions(-)

diff --git a/README.md b/README.md
index 59666b73..58cb1020 100644
--- a/README.md
+++ b/README.md
@@ -193,7 +193,8 @@ rss-xxx.tgz will be generated for deployment
      rss.storage.basePath /data1/rssdata,/data2/rssdata....
      # it's better to config thread num according to local disk num
      rss.server.flush.thread.alive 5
-     rss.server.flush.threadPool.size 10
+     rss.server.flush.localfile.threadPool.size 10
+     rss.server.flush.hadoop.threadPool.size 10
      rss.server.buffer.capacity 40g
      rss.server.read.buffer.capacity 20g
      rss.server.heartbeat.interval 10000
diff --git a/conf/server.conf b/conf/server.conf
index 7bd9dc44..44f2ddef 100644
--- a/conf/server.conf
+++ b/conf/server.conf
@@ -23,5 +23,6 @@ rss.coordinator.quorum xxx:19999,xxx:19999
 rss.server.buffer.capacity 40gb
 rss.server.read.buffer.capacity 20gb
 rss.server.flush.thread.alive 5
-rss.server.flush.threadPool.size 10
+rss.server.flush.localfile.threadPool.size 10
+rss.server.flush.hadoop.threadPool.size 10
 rss.server.disk.capacity 1t
diff --git a/deploy/kubernetes/operator/examples/configuration.yaml 
b/deploy/kubernetes/operator/examples/configuration.yaml
index 5fcb70b8..972e5019 100644
--- a/deploy/kubernetes/operator/examples/configuration.yaml
+++ b/deploy/kubernetes/operator/examples/configuration.yaml
@@ -58,7 +58,8 @@ data:
     rss.server.event.size.threshold.l3 256m
     rss.server.flush.cold.storage.threshold.size 128m
     rss.server.flush.thread.alive 6
-    rss.server.flush.threadPool.size 12
+    rss.server.flush.localfile.threadPool.size 12
+    rss.server.flush.hadoop.threadPool.size 12
     rss.server.hadoop.dfs.client.socket-timeout 15000
     rss.server.hadoop.dfs.replication 2
     rss.server.hdfs.base.path hdfs://${your-hdfs-path}
diff --git a/docs/server_guide.md b/docs/server_guide.md
index ce83b93f..93f04854 100644
--- a/docs/server_guide.md
+++ b/docs/server_guide.md
@@ -44,7 +44,8 @@ This document will introduce how to deploy Uniffle shuffle 
servers.
      rss.storage.basePath /data1/rssdata,/data2/rssdata....
      # it's better to config thread num according to local disk num
      rss.server.flush.thread.alive 5
-     rss.server.flush.threadPool.size 10
+     rss.server.flush.localfile.threadPool.size 10
+     rss.server.flush.hadoop.threadPool.size 10
      rss.server.buffer.capacity 40g
      rss.server.read.buffer.capacity 20g
      rss.server.heartbeat.interval 10000
@@ -63,34 +64,35 @@ This document will introduce how to deploy Uniffle shuffle 
servers.
    ```
 
 ## Configuration
-| Property Name                                         | Default | 
Description                                                                     
                                                                                
                                                                                
                                                                                
                                                             |
-|-------------------------------------------------------|---------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
-| rss.coordinator.quorum                                | -       | 
Coordinator quorum                                                              
                                                                                
                                                                                
                                                                                
                                                             |
-| rss.rpc.server.port                                   | -       | RPC port 
for Shuffle server, if set zero, grpc server start on random port.              
                                                                                
                                                                                
                                                                                
                                                    |
-| rss.jetty.http.port                                   | -       | Http port 
for Shuffle server                                                              
                                                                                
                                                                                
                                                                                
                                                   |
-| rss.server.netty.port                                 | -1      | Netty port 
for Shuffle server, if set zero, netty server start on random port.             
                                                                                
                                                                                
                                                                                
                                                  |
-| rss.server.buffer.capacity                            | -1      | Max memory 
of buffer manager for shuffle server. If negative, JVM heap size * buffer.ratio 
is used                                                                         
                                                                                
                                                                                
                                                  |
-| rss.server.buffer.capacity.ratio                      | 0.8     | when 
`rss.server.buffer.capacity`=-1, then the buffer capacity is JVM heap size * 
ratio                                                                           
                                                                                
                                                                                
                                                           |
-| rss.server.memory.shuffle.highWaterMark.percentage    | 75.0    | Threshold 
of spill data to storage, percentage of rss.server.buffer.capacity              
                                                                                
                                                                                
                                                                                
                                                   |
-| rss.server.memory.shuffle.lowWaterMark.percentage     | 25.0    | Threshold 
of keep data in memory, percentage of rss.server.buffer.capacity                
                                                                                
                                                                                
                                                                                
                                                   |
-| rss.server.read.buffer.capacity                       | -1      | Max size 
of buffer for reading data. If negative, JVM heap size * read.buffer.ratio is 
used                                                                            
                                                                                
                                                                                
                                                      |
-| rss.server.read.buffer.capacity.ratio                 | 0.4     | when 
`rss.server.read.buffer.capacity`=-1, then read buffer capacity is JVM heap 
size * ratio                                                                    
                                                                                
                                                                                
                                                            |
-| rss.server.heartbeat.interval                         | 10000   | Heartbeat 
interval to Coordinator (ms)                                                    
                                                                                
                                                                                
                                                                                
                                                   |
-| rss.server.flush.threadPool.size                      | 10      | Thread 
pool for flush data to file                                                     
                                                                                
                                                                                
                                                                                
                                                      |
-| rss.server.commit.timeout                             | 600000  | Timeout 
when commit shuffle data (ms)                                                   
                                                                                
                                                                                
                                                                                
                                                     |
-| rss.storage.type                                      | -       | Supports 
MEMORY_LOCALFILE, MEMORY_HDFS, MEMORY_LOCALFILE_HDFS                            
                                                                                
                                                                                
                                                                                
                                                    |
-| rss.server.flush.cold.storage.threshold.size          | 64M     | The 
threshold of data size for LOACALFILE and HADOOP if MEMORY_LOCALFILE_HDFS is 
used                                                                            
                                                                                
                                                                                
                                                            |
-| rss.server.tags                                       | -       | The 
comma-separated list of tags to indicate the shuffle server's attributes. It 
will be used as the assignment basis for the coordinator                        
                                                                                
                                                                                
                                                            |
-| rss.server.single.buffer.flush.enabled                | false   | Whether 
single buffer flush when size exceeded rss.server.single.buffer.flush.threshold 
                                                                                
                                                                                
                                                                                
                                                     |
-| rss.server.single.buffer.flush.threshold              | 64M     | The 
threshold of single shuffle buffer flush                                        
                                                                                
                                                                                
                                                                                
                                                         |
-| rss.server.disk.capacity                              | -1      | Disk 
capacity that shuffle server can use. If negative, it will use disk whole space 
* ratio                                                                         
                                                                                
                                                                                
                                                        |
-| rss.server.disk.capacity.ratio                        | 0.9     | When 
`rss.server.disk.capacity` is negative, disk whole space * ratio is used        
                                                                                
                                                                                
                                                                                
                                                        |
-| rss.server.multistorage.fallback.strategy.class       | -       | The 
fallback strategy for `MEMORY_LOCALFILE_HDFS`. Support 
`org.apache.uniffle.server.storage.RotateStorageManagerFallbackStrategy`,`org.apache.uniffle.server.storage.LocalStorageManagerFallbackStrategy`
 and `org.apache.uniffle.server.storage.HadoopStorageManagerFallbackStrategy`. 
If not set, 
`org.apache.uniffle.server.storage.HadoopStorageManagerFallbackStrategy` will 
be used. |
-| rss.server.leak.shuffledata.check.interval            | 3600000 | The 
interval of leak shuffle data check (ms)                                        
                                                                                
                                                                                
                                                                                
                                                         |
-| rss.server.max.concurrency.of.per-partition.write | 1       | The max 
concurrency of single partition writer, the data partition file number is equal 
to this value. Default value is 1. This config could improve the writing speed, 
especially for huge partition.                                                  
                                                                                
                                                     |
+| Property Name                                           | Default | 
Description                                                                     
                                                                                
                                                                                
                                                                                
                                                             |
+|---------------------------------------------------------|---------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| rss.coordinator.quorum                                  | -       | 
Coordinator quorum                                                              
                                                                                
                                                                                
                                                                                
                                                             |
+| rss.rpc.server.port                                     | -       | RPC port 
for Shuffle server, if set zero, grpc server start on random port.              
                                                                                
                                                                                
                                                                                
                                                    |
+| rss.jetty.http.port                                     | -       | Http 
port for Shuffle server                                                         
                                                                                
                                                                                
                                                                                
                                                        |
+| rss.server.netty.port                                   | -1      | Netty 
port for Shuffle server, if set zero, netty server start on random port.        
                                                                                
                                                                                
                                                                                
                                                       |
+| rss.server.buffer.capacity                              | -1      | Max 
memory of buffer manager for shuffle server. If negative, JVM heap size * 
buffer.ratio is used                                                            
                                                                                
                                                                                
                                                               |
+| rss.server.buffer.capacity.ratio                        | 0.8     | when 
`rss.server.buffer.capacity`=-1, then the buffer capacity is JVM heap size * 
ratio                                                                           
                                                                                
                                                                                
                                                           |
+| rss.server.memory.shuffle.highWaterMark.percentage      | 75.0    | 
Threshold of spill data to storage, percentage of rss.server.buffer.capacity    
                                                                                
                                                                                
                                                                                
                                                             |
+| rss.server.memory.shuffle.lowWaterMark.percentage       | 25.0    | 
Threshold of keep data in memory, percentage of rss.server.buffer.capacity      
                                                                                
                                                                                
                                                                                
                                                             |
+| rss.server.read.buffer.capacity                         | -1      | Max size 
of buffer for reading data. If negative, JVM heap size * read.buffer.ratio is 
used                                                                            
                                                                                
                                                                                
                                                      |
+| rss.server.read.buffer.capacity.ratio                   | 0.4     | when 
`rss.server.read.buffer.capacity`=-1, then read buffer capacity is JVM heap 
size * ratio                                                                    
                                                                                
                                                                                
                                                            |
+| rss.server.heartbeat.interval                           | 10000   | 
Heartbeat interval to Coordinator (ms)                                          
                                                                                
                                                                                
                                                                                
                                                             |
+| rss.server.flush.localfile.threadPool.size              | 10      | Thread 
pool for flush data to local file                                               
                                                                                
                                                                                
                                                                                
                                                      |
+| rss.server.flush.hadoop.threadPool.size                 | 10      | Thread 
pool for flush data to hadoop storage                                           
                                                                                
                                                                                
                                                                                
                                                      |
+| rss.server.commit.timeout                               | 600000  | Timeout 
when commit shuffle data (ms)                                                   
                                                                                
                                                                                
                                                                                
                                                     |
+| rss.storage.type                                        | -       | Supports 
MEMORY_LOCALFILE, MEMORY_HDFS, MEMORY_LOCALFILE_HDFS                            
                                                                                
                                                                                
                                                                                
                                                    |
+| rss.server.flush.cold.storage.threshold.size            | 64M     | The 
threshold of data size for LOCALFILE and HADOOP if MEMORY_LOCALFILE_HDFS is 
used                                                                            
                                                                                
                                                                                
                                                            |
+| rss.server.tags                                         | -       | The 
comma-separated list of tags to indicate the shuffle server's attributes. It 
will be used as the assignment basis for the coordinator                        
                                                                                
                                                                                
                                                            |
+| rss.server.single.buffer.flush.enabled                  | false   | Whether 
single buffer flush when size exceeded rss.server.single.buffer.flush.threshold 
                                                                                
                                                                                
                                                                                
                                                     |
+| rss.server.single.buffer.flush.threshold                | 64M     | The 
threshold of single shuffle buffer flush                                        
                                                                                
                                                                                
                                                                                
                                                         |
+| rss.server.disk.capacity                                | -1      | Disk 
capacity that shuffle server can use. If negative, it will use disk whole space 
* ratio                                                                         
                                                                                
                                                                                
                                                        |
+| rss.server.disk.capacity.ratio                          | 0.9     | When 
`rss.server.disk.capacity` is negative, disk whole space * ratio is used        
                                                                                
                                                                                
                                                                                
                                                        |
+| rss.server.multistorage.fallback.strategy.class         | -       | The 
fallback strategy for `MEMORY_LOCALFILE_HDFS`. Support 
`org.apache.uniffle.server.storage.RotateStorageManagerFallbackStrategy`,`org.apache.uniffle.server.storage.LocalStorageManagerFallbackStrategy`
 and `org.apache.uniffle.server.storage.HadoopStorageManagerFallbackStrategy`. 
If not set, 
`org.apache.uniffle.server.storage.HadoopStorageManagerFallbackStrategy` will 
be used. |
+| rss.server.leak.shuffledata.check.interval              | 3600000 | The 
interval of leak shuffle data check (ms)                                        
                                                                                
                                                                                
                                                                                
                                                         |
+| rss.server.max.concurrency.of.per-partition.write       | 1       | The max 
concurrency of single partition writer, the data partition file number is equal 
to this value. Default value is 1. This config could improve the writing speed, 
especially for huge partition.                                                  
                                                                                
                                                     |
 | rss.server.max.concurrency.limit.of.per-partition.write | - | The limit for 
max concurrency per-partition write specified by client, this won't be enabled 
by default.                                                                     
                                                                                
                                                                                
                                                |
-| rss.metrics.reporter.class                            | -       | The class 
of metrics reporter.                                                            
                                                                                
                                                                                
                                                                                
                                                   |
-|rss.server.multistorage.manager.selector.class         | 
org.apache.uniffle.server.storage.multi.DefaultStorageManagerSelector | The 
manager selector strategy for `MEMORY_LOCALFILE_HDFS`. Default value is 
`DefaultStorageManagerSelector`, and another 
`HugePartitionSensitiveStorageManagerSelector` will flush only huge partition's 
data to cold storage.                                                           
                                                                                
  [...]
+| rss.metrics.reporter.class                              | -       | The 
class of metrics reporter.                                                      
                                                                                
                                                                                
                                                                                
                                                         |
+| rss.server.multistorage.manager.selector.class          | 
org.apache.uniffle.server.storage.multi.DefaultStorageManagerSelector | The 
manager selector strategy for `MEMORY_LOCALFILE_HDFS`. Default value is 
`DefaultStorageManagerSelector`, and another 
`HugePartitionSensitiveStorageManagerSelector` will flush only huge partition's 
data to cold storage.                                                           
                                                                                
[...]
 
 ### Advanced Configurations
 |Property Name|Default| Description                                            
                                                                                
                                                     |
@@ -129,7 +131,7 @@ If you don't use HADOOP FS, the huge partition may be 
flushed to local disk, whi
 
 For HADOOP FS, the conf value of `rss.server.single.buffer.flush.threshold` 
should be greater than the value of 
`rss.server.flush.cold.storage.threshold.size`, which will flush data directly 
to Hadoop FS. 
 
-Finally, to improve the speed of writing to HADOOP FS for a single partition, 
the value of `rss.server.max.concurrency.of.per-partition.write` and 
`rss.server.flush.threadPool.size` could be increased to 10 or 20.
+Finally, to improve the speed of writing to Hadoop FS for a single partition, 
the value of `rss.server.max.concurrency.of.per-partition.write` and 
`rss.server.flush.hadoop.threadPool.size` could be increased to 10 or 20.
 
 #### Example of server conf
 ```
@@ -149,7 +151,8 @@ rss.server.commit.timeout 600000
 rss.server.app.expired.withoutHeartbeat 120000
 
 # For huge partitions
-rss.server.flush.threadPool.size 20
+rss.server.flush.localfile.threadPool.size 20
+rss.server.flush.hadoop.threadPool.size 20
 rss.server.flush.cold.storage.threshold.size 128m
 rss.server.single.buffer.flush.threshold 129m
 rss.server.max.concurrency.of.per-partition.write 20
diff --git 
a/server/src/main/java/org/apache/uniffle/server/DefaultFlushEventHandler.java 
b/server/src/main/java/org/apache/uniffle/server/DefaultFlushEventHandler.java
new file mode 100644
index 00000000..ccfb987d
--- /dev/null
+++ 
b/server/src/main/java/org/apache/uniffle/server/DefaultFlushEventHandler.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.server;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+
+import com.google.common.collect.Queues;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.uniffle.common.config.RssBaseConf;
+import org.apache.uniffle.common.exception.RssException;
+import org.apache.uniffle.common.util.ThreadUtils;
+import org.apache.uniffle.server.storage.StorageManager;
+import org.apache.uniffle.storage.common.HadoopStorage;
+import org.apache.uniffle.storage.common.LocalStorage;
+import org.apache.uniffle.storage.common.Storage;
+import org.apache.uniffle.storage.util.StorageType;
+
+public class DefaultFlushEventHandler implements FlushEventHandler {
+  private static final Logger LOG = 
LoggerFactory.getLogger(DefaultFlushEventHandler.class);
+
+  private final ShuffleServerConf shuffleServerConf;
+  private final StorageManager storageManager;
+  private Executor localFileThreadPoolExecutor;
+  private Executor hadoopThreadPoolExecutor;
+  private final StorageType storageType;
+  protected final BlockingQueue<ShuffleDataFlushEvent> flushQueue = 
Queues.newLinkedBlockingQueue();
+  private Consumer<ShuffleDataFlushEvent> eventConsumer;
+
+  private volatile boolean stopped = false;
+
+  public DefaultFlushEventHandler(ShuffleServerConf conf, StorageManager 
storageManager,
+      Consumer<ShuffleDataFlushEvent> eventConsumer) {
+    this.shuffleServerConf = conf;
+    this.storageType = 
StorageType.valueOf(shuffleServerConf.get(RssBaseConf.RSS_STORAGE_TYPE));
+    this.storageManager = storageManager;
+    this.eventConsumer = eventConsumer;
+    initFlushEventExecutor();
+  }
+
+  @Override
+  public void handle(ShuffleDataFlushEvent event) {
+    if (!flushQueue.offer(event)) {
+      LOG.warn("Flush queue is full, discard event: " + event);
+    } else {
+      ShuffleServerMetrics.gaugeEventQueueSize.inc();
+    }
+  }
+
+  private void handleEventAndUpdateMetrics(ShuffleDataFlushEvent event, 
boolean isLocalFile) {
+    try {
+      eventConsumer.accept(event);
+    } finally {
+      if (isLocalFile) {
+        ShuffleServerMetrics.counterLocalFileEventFlush.inc();
+      } else {
+        ShuffleServerMetrics.counterHadoopEventFlush.inc();
+      }
+    }
+  }
+
+  protected void initFlushEventExecutor() {
+    if (StorageType.withLocalfile(storageType)) {
+      int poolSize = 
shuffleServerConf.getInteger(ShuffleServerConf.SERVER_FLUSH_LOCALFILE_THREAD_POOL_SIZE);
+      localFileThreadPoolExecutor = createFlushEventExecutor(poolSize, 
"LocalFileFlushEventThreadPool");
+    }
+    if (StorageType.withHadoop(storageType)) {
+      int poolSize = 
shuffleServerConf.getInteger(ShuffleServerConf.SERVER_FLUSH_HADOOP_THREAD_POOL_SIZE);
+      hadoopThreadPoolExecutor = createFlushEventExecutor(poolSize, 
"HadoopFlushEventThreadPool");
+    }
+    startEventProcessor();
+  }
+
+  private void startEventProcessor() {
+    // the thread for flush data
+    Thread processEventThread = new Thread(this::eventLoop);
+    processEventThread.setName("ProcessEventThread");
+    processEventThread.setDaemon(true);
+    processEventThread.start();
+  }
+
+  protected void eventLoop() {
+    while (!stopped && !Thread.currentThread().isInterrupted()) {
+      processNextEvent();
+    }
+  }
+
+  protected void processNextEvent() {
+    try {
+      ShuffleDataFlushEvent event = flushQueue.take();
+      Storage storage = storageManager.selectStorage(event);
+      if (storage instanceof HadoopStorage) {
+        hadoopThreadPoolExecutor.execute(() -> 
handleEventAndUpdateMetrics(event, false));
+      } else if (storage instanceof LocalStorage) {
+        localFileThreadPoolExecutor.execute(() -> 
handleEventAndUpdateMetrics(event, true));
+      } else {
+        throw new RssException("Unexpected storage type!");
+      }
+    } catch (Exception e) {
+      LOG.error("Exception happened when process event.", e);
+    }
+  }
+
+  protected Executor createFlushEventExecutor(int poolSize, String 
threadFactoryName) {
+    int waitQueueSize = shuffleServerConf.getInteger(
+        ShuffleServerConf.SERVER_FLUSH_THREAD_POOL_QUEUE_SIZE);
+    BlockingQueue<Runnable> waitQueue = 
Queues.newLinkedBlockingQueue(waitQueueSize);
+    long keepAliveTime = 
shuffleServerConf.getLong(ShuffleServerConf.SERVER_FLUSH_THREAD_ALIVE);
+    return new ThreadPoolExecutor(poolSize, poolSize, keepAliveTime, 
TimeUnit.SECONDS, waitQueue,
+        ThreadUtils.getThreadFactory(threadFactoryName));
+  }
+
+  @Override
+  public int getEventNumInFlush() {
+    return flushQueue.size();
+  }
+
+  @Override
+  public void stop() {
+    stopped = true;
+  }
+}
diff --git 
a/server/src/test/java/org/apache/uniffle/server/TestShuffleFlushManager.java 
b/server/src/main/java/org/apache/uniffle/server/FlushEventHandler.java
similarity index 55%
rename from 
server/src/test/java/org/apache/uniffle/server/TestShuffleFlushManager.java
rename to server/src/main/java/org/apache/uniffle/server/FlushEventHandler.java
index 3ce5efd0..06bbc731 100644
--- 
a/server/src/test/java/org/apache/uniffle/server/TestShuffleFlushManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/FlushEventHandler.java
@@ -17,30 +17,10 @@
 
 package org.apache.uniffle.server;
 
-import java.util.concurrent.Executor;
+public interface FlushEventHandler {
+  void handle(ShuffleDataFlushEvent event);
 
-import org.apache.uniffle.server.storage.StorageManager;
-
-public class TestShuffleFlushManager extends ShuffleFlushManager {
-  public TestShuffleFlushManager(ShuffleServerConf shuffleServerConf, String 
shuffleServerId,
-                                 ShuffleServer shuffleServer, StorageManager 
storageManager) {
-    super(shuffleServerConf, shuffleServer, storageManager);
-  }
-
-  @Override
-  protected void eventLoop() {
-    // do nothing
-  }
-
-  @Override
-  protected Executor createFlushEventExecutor() {
-    return Runnable::run;
-  }
-
-  public void flush() {
-    while (!flushQueue.isEmpty()) {
-      processNextEvent();
-    }
-  }
+  int getEventNumInFlush();
 
+  void stop();
 }
diff --git 
a/server/src/main/java/org/apache/uniffle/server/ShuffleDataFlushEvent.java 
b/server/src/main/java/org/apache/uniffle/server/ShuffleDataFlushEvent.java
index 56abf7a2..13305ec4 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleDataFlushEvent.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleDataFlushEvent.java
@@ -48,6 +48,7 @@ public class ShuffleDataFlushEvent {
   private final List<Runnable> cleanupCallbackChains;
 
   private boolean ownedByHugePartition = false;
+  private long startPendingTime;
 
   public ShuffleDataFlushEvent(
       long eventId,
@@ -124,6 +125,7 @@ public class ShuffleDataFlushEvent {
 
   public void markPended() {
     isPended = true;
+    startPendingTime = System.currentTimeMillis();
   }
 
   public Storage getUnderStorage() {
@@ -173,4 +175,8 @@ public class ShuffleDataFlushEvent {
   public void markOwnedByHugePartition() {
     this.ownedByHugePartition = true;
   }
+
+  public long getStartPendingTime() {
+    return startPendingTime;
+  }
 }
diff --git 
a/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java 
b/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
index 2d9dfa5d..d7663770 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
@@ -21,15 +21,8 @@ import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Executor;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Queues;
-import com.google.common.util.concurrent.Uninterruptibles;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.roaringbitmap.longlong.Roaring64NavigableMap;
@@ -41,8 +34,6 @@ import org.apache.uniffle.common.ShufflePartitionedBlock;
 import org.apache.uniffle.common.config.RssBaseConf;
 import org.apache.uniffle.common.util.JavaUtils;
 import org.apache.uniffle.common.util.RssUtils;
-import org.apache.uniffle.common.util.ThreadUtils;
-import org.apache.uniffle.server.storage.MultiStorageManager;
 import org.apache.uniffle.server.storage.StorageManager;
 import org.apache.uniffle.storage.common.Storage;
 import org.apache.uniffle.storage.handler.api.ShuffleWriteHandler;
@@ -55,8 +46,6 @@ public class ShuffleFlushManager {
   private static final Logger LOG = 
LoggerFactory.getLogger(ShuffleFlushManager.class);
   public static final AtomicLong ATOMIC_EVENT_ID = new AtomicLong(0);
   private final ShuffleServer shuffleServer;
-  protected final BlockingQueue<ShuffleDataFlushEvent> flushQueue = 
Queues.newLinkedBlockingQueue();
-  private final Executor threadPoolExecutor;
   private final List<String> storageBasePaths;
   private final String storageType;
   private final int storageDataReplica;
@@ -67,9 +56,8 @@ public class ShuffleFlushManager {
   private final int retryMax;
 
   private final StorageManager storageManager;
-  private final BlockingQueue<PendingShuffleFlushEvent> pendingEvents = 
Queues.newLinkedBlockingQueue();
   private final long pendingEventTimeoutSec;
-  private int processPendingEventIndex = 0;
+  private FlushEventHandler eventHandler;
 
   public ShuffleFlushManager(ShuffleServerConf shuffleServerConf, 
ShuffleServer shuffleServer,
                              StorageManager storageManager) {
@@ -83,75 +71,36 @@ public class ShuffleFlushManager {
 
     storageBasePaths = RssUtils.getConfiguredLocalDirs(shuffleServerConf);
     pendingEventTimeoutSec = 
shuffleServerConf.getLong(ShuffleServerConf.PENDING_EVENT_TIMEOUT_SEC);
-    threadPoolExecutor = createFlushEventExecutor();
-    startEventProcessor();
-    // todo: extract a class named Service, and support stop method
-    Thread thread = new Thread("PendingEventProcessThread") {
-      @Override
-      public void run() {
-        for (; ; ) {
-          try {
-            processPendingEvents();
-            processPendingEventIndex = (processPendingEventIndex + 1) % 1000;
-            if (processPendingEventIndex == 0) {
-              // todo: get sleep interval from configuration
-              Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
-            }
-          } catch (Exception e) {
-            LOG.error(getName() + " happened exception: ", e);
-          }
-        }
-      }
-    };
-    thread.setDaemon(true);
-    thread.start();
-  }
-
-  private void startEventProcessor() {
-    // the thread for flush data
-    Thread processEventThread = new Thread(this::eventLoop);
-    processEventThread.setName("ProcessEventThread");
-    processEventThread.setDaemon(true);
-    processEventThread.start();
-  }
-
-  protected Executor createFlushEventExecutor() {
-    int waitQueueSize = shuffleServerConf.getInteger(
-        ShuffleServerConf.SERVER_FLUSH_THREAD_POOL_QUEUE_SIZE);
-    BlockingQueue<Runnable> waitQueue = 
Queues.newLinkedBlockingQueue(waitQueueSize);
-    int poolSize = 
shuffleServerConf.getInteger(ShuffleServerConf.SERVER_FLUSH_THREAD_POOL_SIZE);
-    long keepAliveTime = 
shuffleServerConf.getLong(ShuffleServerConf.SERVER_FLUSH_THREAD_ALIVE);
-    return new ThreadPoolExecutor(poolSize, poolSize, keepAliveTime, 
TimeUnit.SECONDS, waitQueue,
-        ThreadUtils.getThreadFactory("FlushEventThreadPool"));
+    eventHandler = new DefaultFlushEventHandler(shuffleServerConf, 
storageManager, this::processEvent);
   }
 
   public void addToFlushQueue(ShuffleDataFlushEvent event) {
-    if (!flushQueue.offer(event)) {
-      LOG.warn("Flush queue is full, discard event: " + event);
-    } else {
-      ShuffleServerMetrics.gaugeEventQueueSize.inc();
-    }
+    eventHandler.handle(event);
   }
 
-  protected void eventLoop() {
-    while (true) {
-      processNextEvent();
-    }
-  }
-
-  protected void processNextEvent() {
-    try {
-      ShuffleDataFlushEvent event = flushQueue.take();
-      threadPoolExecutor.execute(() -> processEvent(event));
-    } catch (Exception e) {
-      LOG.error("Exception happened when process event.", e);
-    }
-  }
-
-  private void processEvent(ShuffleDataFlushEvent event) {
+  public void processEvent(ShuffleDataFlushEvent event) {
     try {
       ShuffleServerMetrics.gaugeWriteHandler.inc();
-      flushToFile(event);
+      long start = System.currentTimeMillis();
+      boolean writeSuccess = flushToFile(event);
+      if (writeSuccess || event.getRetryTimes() > retryMax) {
+        if (event.getRetryTimes() > retryMax) {
+          LOG.error("Failed to write data for {} in {} times, shuffle data 
will be lost", event, retryMax);
+          if (event.getUnderStorage() != null) {
+            
ShuffleServerMetrics.incStorageFailedCounter(event.getUnderStorage().getStorageHost());
+          }
+        }
+        event.doCleanup();
+        if (shuffleServer != null) {
+          long duration = System.currentTimeMillis() - start;
+          if (writeSuccess) {
+            LOG.debug("Flush to file success in {} ms and release {} bytes", 
duration, event.getSize());
+          } else {
+            ShuffleServerMetrics.counterTotalFailedWrittenEventNum.inc();
+            LOG.error("Flush to file for {} failed in {} ms and release {} 
bytes", event, duration, event.getSize());
+          }
+        }
+      }
     } catch (Exception e) {
       LOG.error("Exception happened when flush data for " + event, e);
     } finally {
@@ -160,102 +109,88 @@ public class ShuffleFlushManager {
     }
   }
 
-  private void flushToFile(ShuffleDataFlushEvent event) {
-    long start = System.currentTimeMillis();
+  private boolean flushToFile(ShuffleDataFlushEvent event) {
     boolean writeSuccess = false;
 
-    while (event.getRetryTimes() <= retryMax) {
-      try {
-        if (!event.isValid()) {
-          writeSuccess = true;
-          LOG.warn("AppId {} was removed already, event {} should be dropped", 
event.getAppId(), event);
-          break;
-        }
+    try {
+      if (!event.isValid()) {
+        LOG.warn("AppId {} was removed already, event {} should be dropped", 
event.getAppId(), event);
+        return true;
+      }
 
-        List<ShufflePartitionedBlock> blocks = event.getShuffleBlocks();
-        if (blocks == null || blocks.isEmpty()) {
-          LOG.info("There is no block to be flushed: {}", event);
-          break;
-        }
+      List<ShufflePartitionedBlock> blocks = event.getShuffleBlocks();
+      if (blocks == null || blocks.isEmpty()) {
+        LOG.info("There is no block to be flushed: {}", event);
+        return true;
+      }
 
-        Storage storage = storageManager.selectStorage(event);
-        if (storage == null) {
-          LOG.error("Storage selected is null and this should not happen. 
event: {}", event);
-          break;
-        }
+      Storage storage = event.getUnderStorage();
+      if (storage == null) {
+        LOG.error("Storage selected is null and this should not happen. event: 
{}", event);
+        return true;
+      }
 
-        if (!storage.canWrite()) {
-          // todo: Could we add an interface supportPending for storageManager
-          //       to unify following logic of multiple different storage 
managers
-          if (storageManager instanceof MultiStorageManager) {
-            event.increaseRetryTimes();
-            
ShuffleServerMetrics.incStorageRetryCounter(storage.getStorageHost());
-            continue;
-          } else {
-            // To avoid being re-pushed to pending queue and make the server 
too much pressure,
-            // it's better to drop directly.
-            if (event.isPended()) {
-              LOG.error("Drop this event directly due to already having 
entered pending queue. event: {}", event);
-              break;
-            }
-            event.increaseRetryTimes();
-            event.markPended();
-            addPendingEvents(event);
-            return;
-          }
-        }
+      if (event.isPended()
+              && System.currentTimeMillis() - event.getStartPendingTime() > 
pendingEventTimeoutSec * 1000L) {
+        ShuffleServerMetrics.counterTotalDroppedEventNum.inc();
+        LOG.error("Flush event cannot be flushed for {} sec, the event {} is 
dropped",
+            pendingEventTimeoutSec, event);
+        return true;
+      }
 
-        String user = StringUtils.defaultString(
-            
shuffleServer.getShuffleTaskManager().getUserByAppId(event.getAppId()),
-            StringUtils.EMPTY
-        );
-        int maxConcurrencyPerPartitionToWrite = 
getMaxConcurrencyPerPartitionWrite(event);
-        CreateShuffleWriteHandlerRequest request = new 
CreateShuffleWriteHandlerRequest(
-            storageType,
-            event.getAppId(),
-            event.getShuffleId(),
-            event.getStartPartition(),
-            event.getEndPartition(),
-            storageBasePaths.toArray(new String[storageBasePaths.size()]),
-            getShuffleServerId(),
-            hadoopConf,
-            storageDataReplica,
-            user,
-            maxConcurrencyPerPartitionToWrite);
-        ShuffleWriteHandler handler = storage.getOrCreateWriteHandler(request);
-        writeSuccess = storageManager.write(storage, handler, event);
-        if (writeSuccess) {
-          updateCommittedBlockIds(event.getAppId(), event.getShuffleId(), 
blocks);
-          
ShuffleServerMetrics.incStorageSuccessCounter(storage.getStorageHost());
-          break;
-        } else {
+      if (!storage.canWrite()) {
+        // todo: Could we add an interface supportPending for storageManager
+        //       to unify following logic of multiple different storage 
managers
+        if (event.getRetryTimes() <= retryMax) {
+          if (event.isPended()) {
+            LOG.error("Drop this event directly due to already having entered 
pending queue. event: {}", event);
+            return true;
+          }
           event.increaseRetryTimes();
           
ShuffleServerMetrics.incStorageRetryCounter(storage.getStorageHost());
+          event.markPended();
+          eventHandler.handle(event);
         }
-      } catch (Throwable throwable) {
-        // just log the error, don't throw the exception and stop the flush 
thread
-        LOG.error("Exception happened when process flush shuffle data for {}", 
event, throwable);
-        event.increaseRetryTimes();
+        return false;
       }
-    }
 
-    if (event.getRetryTimes() > retryMax) {
-      LOG.error("Failed to write data for {} in {} times, shuffle data will be 
lost", event, retryMax);
-      if (event.getUnderStorage() != null) {
-        
ShuffleServerMetrics.incStorageFailedCounter(event.getUnderStorage().getStorageHost());
-      }
-    }
-
-    event.doCleanup();
-    if (shuffleServer != null) {
-      long duration = System.currentTimeMillis() - start;
+      String user = StringUtils.defaultString(
+          
shuffleServer.getShuffleTaskManager().getUserByAppId(event.getAppId()),
+          StringUtils.EMPTY
+      );
+      int maxConcurrencyPerPartitionToWrite = 
getMaxConcurrencyPerPartitionWrite(event);
+      CreateShuffleWriteHandlerRequest request = new 
CreateShuffleWriteHandlerRequest(
+          storageType,
+          event.getAppId(),
+          event.getShuffleId(),
+          event.getStartPartition(),
+          event.getEndPartition(),
+          storageBasePaths.toArray(new String[storageBasePaths.size()]),
+          getShuffleServerId(),
+          hadoopConf,
+          storageDataReplica,
+          user,
+          maxConcurrencyPerPartitionToWrite);
+      ShuffleWriteHandler handler = storage.getOrCreateWriteHandler(request);
+      writeSuccess = storageManager.write(storage, handler, event);
       if (writeSuccess) {
-        LOG.debug("Flush to file success in {} ms and release {} bytes", 
duration, event.getSize());
-      } else {
-        ShuffleServerMetrics.counterTotalFailedWrittenEventNum.inc();
-        LOG.error("Flush to file for {} failed in {} ms and release {} bytes", 
event, duration, event.getSize());
+        updateCommittedBlockIds(event.getAppId(), event.getShuffleId(), 
blocks);
+        
ShuffleServerMetrics.incStorageSuccessCounter(storage.getStorageHost());
+      } else if (event.getRetryTimes() <= retryMax) {
+        if (event.isPended()) {
+          LOG.error("Drop this event directly due to already having entered 
pending queue. event: {}", event);
+        }
+        event.increaseRetryTimes();
+        ShuffleServerMetrics.incStorageRetryCounter(storage.getStorageHost());
+        event.markPended();
+        eventHandler.handle(event);
       }
+    } catch (Throwable throwable) {
+      // just log the error, don't throw the exception and stop the flush 
thread
+      LOG.error("Exception happened when process flush shuffle data for {}", 
event, throwable);
+      event.increaseRetryTimes();
     }
+    return writeSuccess;
   }
 
   private int getMaxConcurrencyPerPartitionWrite(ShuffleDataFlushEvent event) {
@@ -318,83 +253,18 @@ public class ShuffleFlushManager {
   }
 
   public int getEventNumInFlush() {
-    return flushQueue.size();
+    return eventHandler.getEventNumInFlush();
   }
 
   public Configuration getHadoopConf() {
     return hadoopConf;
   }
 
-  @VisibleForTesting
-  void processPendingEvents() throws Exception {
-    PendingShuffleFlushEvent event = pendingEvents.take();
-    Storage storage = storageManager.selectStorage(event.getEvent());
-    if (storage == null) {
-      dropPendingEvent(event);
-      LOG.error("Flush event cannot be flushed because of application related 
was cleared, {}", event.getEvent());
-      return;
-    }
-    if (System.currentTimeMillis() - event.getCreateTimeStamp() > 
pendingEventTimeoutSec * 1000L) {
-      dropPendingEvent(event);
-      LOG.error("Flush event cannot be flushed for {} sec, the event {} is 
dropped",
-          pendingEventTimeoutSec, event.getEvent());
-      return;
-    }
-    // storage maybe null if the application cache was cleared already
-    // add event to flush queue, and it will be released
-    if (storage.canWrite()) {
-      addToFlushQueue(event.getEvent());
-      return;
-    }
-    addPendingEventsInternal(event);
-  }
-
-  private void dropPendingEvent(PendingShuffleFlushEvent event) {
-    ShuffleServerMetrics.counterTotalDroppedEventNum.inc();
-    event.getEvent().doCleanup();
-  }
-
-  @VisibleForTesting
-  void addPendingEvents(ShuffleDataFlushEvent event) {
-    addPendingEventsInternal(new PendingShuffleFlushEvent(event));
-  }
-
-  @VisibleForTesting
-  int getPendingEventsSize() {
-    return pendingEvents.size();
-  }
-
-  private void addPendingEventsInternal(PendingShuffleFlushEvent event) {
-    boolean pendingEventsResult = pendingEvents.offer(event);
-    ShuffleDataFlushEvent flushEvent = event.getEvent();
-    if (!pendingEventsResult) {
-      LOG.error("Post pendingEvent queue fail!! App: " + flushEvent.getAppId() 
+ " Shuffle "
-          + flushEvent.getShuffleId() + " Partition " + 
flushEvent.getStartPartition());
-    }
-  }
-
   public void removeResourcesOfShuffleId(String appId, Collection<Integer> 
shuffleIds) {
     Optional.ofNullable(committedBlockIds.get(appId))
         .ifPresent(shuffleIdToBlockIds -> 
shuffleIds.forEach(shuffleIdToBlockIds::remove));
   }
 
-  private static class PendingShuffleFlushEvent {
-    private final ShuffleDataFlushEvent event;
-    private final long createTimeStamp = System.currentTimeMillis();
-
-    PendingShuffleFlushEvent(ShuffleDataFlushEvent event) {
-      this.event = event;
-    }
-
-    public ShuffleDataFlushEvent getEvent() {
-      return event;
-    }
-
-    public long getCreateTimeStamp() {
-      return createTimeStamp;
-    }
-  }
-
   public ShuffleDataDistributionType getDataDistributionType(String appId) {
     return 
shuffleServer.getShuffleTaskManager().getDataDistributionType(appId);
   }
diff --git 
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java 
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
index ecd11609..89e75e2f 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
@@ -75,12 +75,18 @@ public class ShuffleServerConf extends RssBaseConf {
       .defaultValue(10 * 1000L)
       .withDescription("Heartbeat interval to Coordinator (ms)");
 
-  public static final ConfigOption<Integer> SERVER_FLUSH_THREAD_POOL_SIZE = 
ConfigOptions
-      .key("rss.server.flush.threadPool.size")
+  public static final ConfigOption<Integer> 
SERVER_FLUSH_LOCALFILE_THREAD_POOL_SIZE = ConfigOptions
+      .key("rss.server.flush.localfile.threadPool.size")
       .intType()
       .defaultValue(10)
       .withDescription("thread pool for flush data to file");
 
+  public static final ConfigOption<Integer> 
SERVER_FLUSH_HADOOP_THREAD_POOL_SIZE = ConfigOptions
+      .key("rss.server.flush.hadoop.threadPool.size")
+      .intType()
+      .defaultValue(10)
+      .withDescription("thread pool for flush data to hadoop storage");
+
   public static final ConfigOption<Integer> 
SERVER_FLUSH_THREAD_POOL_QUEUE_SIZE = ConfigOptions
       .key("rss.server.flush.threadPool.queue.size")
       .intType()
diff --git 
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java 
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java
index 7a83cd40..f982bc32 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java
@@ -94,6 +94,9 @@ public class ShuffleServerMetrics {
   private static final String HUGE_PARTITION_NUM = "huge_partition_num";
   private static final String APP_WITH_HUGE_PARTITION_NUM = 
"app_with_huge_partition_num";
 
+  private static final String LOCAL_FILE_EVENT_FLUSH_NUM = 
"local_file_event_flush_num";
+  private static final String HADOOP_EVENT_FLUSH_NUM = 
"hadoop_event_flush_num";
+
   public static Counter.Child counterTotalAppNum;
   public static Counter.Child counterTotalAppWithHugePartitionNum;
   public static Counter.Child counterTotalPartitionNum;
@@ -154,6 +157,8 @@ public class ShuffleServerMetrics {
   public static Counter counterRemoteStorageFailedWrite;
   public static Counter counterRemoteStorageSuccessWrite;
   private static String tags;
+  public static Counter counterLocalFileEventFlush;
+  public static Counter counterHadoopEventFlush;
 
   private static MetricsManager metricsManager;
   private static boolean isRegister = false;
@@ -285,5 +290,8 @@ public class ShuffleServerMetrics {
 
     gaugeHugePartitionNum = metricsManager.addLabeledGauge(HUGE_PARTITION_NUM);
     gaugeAppWithHugePartitionNum = 
metricsManager.addLabeledGauge(APP_WITH_HUGE_PARTITION_NUM);
+
+    counterLocalFileEventFlush = 
metricsManager.addCounter(LOCAL_FILE_EVENT_FLUSH_NUM);
+    counterHadoopEventFlush = 
metricsManager.addCounter(HADOOP_EVENT_FLUSH_NUM);
   }
 }
diff --git 
a/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java 
b/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java
index 7de4c653..28b7dbd1 100644
--- 
a/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java
+++ 
b/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java
@@ -172,7 +172,7 @@ public class ShuffleFlushManagerTest extends HadoopTestBase 
{
       ShuffleDataFlushEvent event = createShuffleDataFlushEvent(appId, 1, 1, 
1, null);
       manager.addToFlushQueue(event);
     });
-    waitForFlush(manager, appId, 1, 10 * 5);
+    waitForFlush(manager, appId, 1, 20 * 5);
 
     FileStatus[] fileStatuses = fs.listStatus(new Path(HDFS_URI + "/rss/test/" 
+ appId + "/1/1-1"));
     long actual = Arrays.stream(fileStatuses).filter(x -> 
x.getPath().getName().endsWith("data")).count();
@@ -583,39 +583,56 @@ public class ShuffleFlushManagerTest extends 
HadoopTestBase {
   }
 
   @Test
-  public void processPendingEventsTest(@TempDir File tempDir) throws Exception 
{
-    shuffleServerConf.set(RssBaseConf.RSS_STORAGE_TYPE, 
StorageType.LOCALFILE.toString());
+  public void defaultFlushEventHandlerTest(@TempDir File tempDir) throws 
Exception {
+    
shuffleServerConf.setLong(ShuffleServerConf.FLUSH_COLD_STORAGE_THRESHOLD_SIZE, 
10000L);
+    shuffleServerConf.set(RssBaseConf.RSS_STORAGE_TYPE, 
StorageType.LOCALFILE_HDFS.toString());
     shuffleServerConf.set(RssBaseConf.RSS_STORAGE_BASE_PATH, 
Arrays.asList(tempDir.getAbsolutePath()));
     shuffleServerConf.set(ShuffleServerConf.DISK_CAPACITY, 100L);
-    shuffleServerConf.set(ShuffleServerConf.PENDING_EVENT_TIMEOUT_SEC, 5L);
-    StorageManager storageManager =
-        
StorageManagerFactory.getInstance().createStorageManager(shuffleServerConf);
-    ShuffleFlushManager manager =
-        new ShuffleFlushManager(shuffleServerConf, mockShuffleServer, 
storageManager);
-    ShuffleDataFlushEvent event = new ShuffleDataFlushEvent(1, "1", 1, 1, 1, 
100, null, null, null);
-    assertEquals(0, manager.getPendingEventsSize());
-    manager.addPendingEvents(event);
-    Thread.sleep(1000);
-    assertEquals(0, manager.getPendingEventsSize());
-    do {
-      Thread.sleep(1 * 1000);
-    } while (manager.getEventNumInFlush() != 0);
+    
shuffleServerConf.set(ShuffleServerConf.SERVER_FLUSH_HADOOP_THREAD_POOL_SIZE, 
1);
+    
shuffleServerConf.set(ShuffleServerConf.SERVER_FLUSH_LOCALFILE_THREAD_POOL_SIZE,
 1);
+    
shuffleServerConf.setString(ShuffleServerConf.MULTISTORAGE_FALLBACK_STRATEGY_CLASS,
+            LocalStorageManagerFallbackStrategy.class.getCanonicalName());
 
-    List<ShufflePartitionedBlock> blocks =
-        Lists.newArrayList(new ShufflePartitionedBlock(100, 1000, 1, 1, 1L, 
(byte[]) null));
+    StorageManager storageManager = 
StorageManagerFactory.getInstance().createStorageManager(shuffleServerConf);
+    String appId = "fallbackWrittenWhenMultiStorageManagerEnableTest";
+    storageManager.registerRemoteStorage(appId, new 
RemoteStorageInfo(remoteStorage.getPath()));
+
+    ShuffleFlushManager flushManager = new ShuffleFlushManager(
+            shuffleServerConf,
+            mockShuffleServer,
+            storageManager
+    );
+
+    ShuffleServerMetrics.counterLocalFileEventFlush.clear();
+    ShuffleServerMetrics.counterHadoopEventFlush.clear();
+    // case1: normally written to local storage
+    ShuffleDataFlushEvent event = createShuffleDataFlushEvent(appId, 1, 1, 1, 
null, 100);
+    flushManager.addToFlushQueue(event);
+    waitForFlush(flushManager, appId, 1, 5);
+    assertEquals(0, event.getRetryTimes());
+    assertEquals(1, ShuffleServerMetrics.counterLocalFileEventFlush.get());
+
+    // case2: huge event is written to cold storage directly
+    event = createShuffleDataFlushEvent(appId, 1, 1, 1, null, 100000);
+    flushManager.addToFlushQueue(event);
+    waitForFlush(flushManager, appId, 1, 10);
+    assertEquals(0, event.getRetryTimes());
+    assertEquals(1, ShuffleServerMetrics.counterHadoopEventFlush.get());
+
+    // case3: local disk is full or corrupted, fallback to HDFS
+    List<ShufflePartitionedBlock> blocks = Lists.newArrayList(
+            new ShufflePartitionedBlock(100000, 1000, 1, 1, 1L, (byte[]) null)
+    );
     ShuffleDataFlushEvent bigEvent = new ShuffleDataFlushEvent(1, "1", 1, 1, 
1, 100, blocks, null, null);
-    bigEvent.setUnderStorage(storageManager.selectStorage(event));
-    storageManager.updateWriteMetrics(bigEvent, 0);
+    bigEvent.setUnderStorage(((MultiStorageManager) 
storageManager).getWarmStorageManager().selectStorage(event));
+    ((MultiStorageManager) 
storageManager).getWarmStorageManager().updateWriteMetrics(bigEvent, 0);
 
-    manager.addPendingEvents(event);
-    manager.addPendingEvents(event);
-    manager.addPendingEvents(event);
-    Thread.sleep(1000);
-    assertTrue(2 <= manager.getPendingEventsSize());
-    int eventNum = (int) 
ShuffleServerMetrics.counterTotalDroppedEventNum.get();
-    Thread.sleep(6 * 1000);
-    assertEquals(eventNum + 3, (int) 
ShuffleServerMetrics.counterTotalDroppedEventNum.get());
-    assertEquals(0, manager.getPendingEventsSize());
+    event = createShuffleDataFlushEvent(appId, 1, 1, 1, null, 100);
+    flushManager.addToFlushQueue(event);
+    waitForFlush(flushManager, appId, 1, 15);
+    assertEquals(1, event.getRetryTimes());
+    assertEquals(2, ShuffleServerMetrics.counterLocalFileEventFlush.get());
+    assertEquals(2, ShuffleServerMetrics.counterHadoopEventFlush.get());
   }
 
   private void validateLocalMetadata(StorageManager storageManager, Long size) 
{
diff --git 
a/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferManagerTest.java
 
b/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferManagerTest.java
index dabd1e08..2c3636aa 100644
--- 
a/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferManagerTest.java
+++ 
b/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferManagerTest.java
@@ -21,9 +21,11 @@ import java.io.File;
 import java.time.Duration;
 import java.util.Arrays;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
 import com.google.common.collect.RangeMap;
+import com.google.common.util.concurrent.Uninterruptibles;
 import org.awaitility.Awaitility;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -42,7 +44,6 @@ import org.apache.uniffle.server.ShuffleServer;
 import org.apache.uniffle.server.ShuffleServerConf;
 import org.apache.uniffle.server.ShuffleServerMetrics;
 import org.apache.uniffle.server.ShuffleTaskManager;
-import org.apache.uniffle.server.TestShuffleFlushManager;
 import org.apache.uniffle.server.storage.StorageManager;
 import org.apache.uniffle.server.storage.StorageManagerFactory;
 import org.apache.uniffle.storage.util.StorageType;
@@ -577,8 +578,7 @@ public class ShuffleBufferManagerTest extends 
BufferTestBase {
     serverConf.set(ShuffleServerConf.SERVER_SHUFFLE_FLUSH_THRESHOLD, 64L);
 
     StorageManager storageManager = 
StorageManagerFactory.getInstance().createStorageManager(conf);
-    TestShuffleFlushManager shuffleFlushManager = new 
TestShuffleFlushManager(conf,
-        "serverId", mockShuffleServer, storageManager);
+    ShuffleFlushManager shuffleFlushManager = new ShuffleFlushManager(conf, 
mockShuffleServer, storageManager);
     shuffleBufferManager = new ShuffleBufferManager(serverConf, 
shuffleFlushManager);
 
     String appId = "shuffleFlushTest";
@@ -594,7 +594,8 @@ public class ShuffleBufferManagerTest extends 
BufferTestBase {
     assertEquals(96, shuffleBufferManager.getUsedMemory());
     shuffleBufferManager.cacheShuffleData(appId, smallShuffleId, false, 
createData(0, 31));
     assertEquals(96 + 63, shuffleBufferManager.getUsedMemory());
-    shuffleFlushManager.flush();
+    Thread.sleep(100);
+    Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
     // small shuffle id is kept in memory
     assertEquals(63, shuffleBufferManager.getUsedMemory());
     assertEquals(0, shuffleBufferManager.getInFlushSize());
@@ -603,7 +604,8 @@ public class ShuffleBufferManagerTest extends 
BufferTestBase {
     shuffleBufferManager.cacheShuffleData(appId, smallShuffleId, false, 
createData(0, 31));
     shuffleBufferManager.cacheShuffleData(appId, smallShuffleId, false, 
createData(0, 31));
     assertEquals(63 * 3, shuffleBufferManager.getUsedMemory());
-    shuffleFlushManager.flush();
+    Thread.sleep(100);
+    Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
     assertEquals(0, shuffleBufferManager.getUsedMemory());
     assertEquals(0, shuffleBufferManager.getInFlushSize());
 
@@ -612,7 +614,8 @@ public class ShuffleBufferManagerTest extends 
BufferTestBase {
     shuffleBufferManager.cacheShuffleData(appId, smallShuffleId, false, 
createData(0, 21));
     shuffleBufferManager.cacheShuffleData(appId, smallShuffleIdTwo, false, 
createData(0, 20));
     assertEquals(54 + 53 + 52, shuffleBufferManager.getUsedMemory());
-    shuffleFlushManager.flush();
+    Thread.sleep(100);
+    Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
     assertEquals(52, shuffleBufferManager.getUsedMemory());
     assertEquals(0, shuffleBufferManager.getInFlushSize());
   }

Reply via email to