This is an automated email from the ASF dual-hosted git repository.

zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 1fbdfe57 feat: introduce storage manager selector to support more 
selector strategy (#621)
1fbdfe57 is described below

commit 1fbdfe576f0a164f173cbff9ad83dccccb909ee2
Author: Junfan Zhang <[email protected]>
AuthorDate: Wed Mar 1 19:16:54 2023 +0800

    feat: introduce storage manager selector to support more selector strategy 
(#621)
    
    ### What changes were proposed in this pull request?
    
    1. Introduce a storage manager selector to support more selection strategies
    for `MultiStorageManager`.
    2. Introduce the config `rss.server.multistorage.manager.selector.class` to
    support different flush strategies. For example, when single buffer flush is
    enabled, a huge partition's data can be flushed directly to HDFS while normal
    partitions' data is still flushed to local disk.
    
    ### Why are the changes needed?
    This solves the problem described in
    https://github.com/apache/incubator-uniffle/issues/378#issuecomment-1373447729.
    
    In the current codebase, when a huge partition is encountered and single
    buffer flush is enabled, normal partitions' data is also flushed to HDFS.
    This is undesirable, because the local disk is free and flushing to it is
    faster than flushing to HDFS. But if single buffer flush is disabled, the
    flush events a huge partition produces before it is marked as huge may be
    very large; flushing them is slow, which in turn causes buffer allocation
    requests to fail.
    
    To address these problems, this PR allows a single large flush event (e.g.
    one carrying 100 MB) to be flushed to either HDFS or local files, as decided
    by the strategy configured via `rss.server.multistorage.manager.selector.class`.
    
    ### Does this PR introduce _any_ user-facing change?
    Yes. It adds the server config `rss.server.multistorage.manager.selector.class`;
    more documentation will follow later.
    
    ### How was this patch tested?
    1. Unit tests
---
 docs/server_guide.md                               |  1 +
 .../uniffle/server/ShuffleDataFlushEvent.java      | 10 ++++
 .../apache/uniffle/server/ShuffleServerConf.java   |  7 +++
 .../apache/uniffle/server/ShuffleTaskManager.java  |  7 ++-
 .../server/buffer/ShuffleBufferManager.java        | 70 +++++++++++-----------
 .../server/storage/MultiStorageManager.java        | 51 +++++++++++-----
 .../multi/DefaultStorageManagerSelector.java       | 47 +++++++++++++++
 .../multi/FallbackBasedStorageManagerSelector.java | 57 ++++++++++++++++++
 ...gePartitionSensitiveStorageManagerSelector.java | 43 +++++++++++++
 .../storage/multi/StorageManagerSelector.java      | 31 ++++++++++
 .../server/storage/MultiStorageManagerTest.java    | 38 ++++++++++++
 11 files changed, 310 insertions(+), 52 deletions(-)

diff --git a/docs/server_guide.md b/docs/server_guide.md
index fe20f342..bfae4405 100644
--- a/docs/server_guide.md
+++ b/docs/server_guide.md
@@ -89,6 +89,7 @@ This document will introduce how to deploy Uniffle shuffle 
servers.
 | rss.server.leak.shuffledata.check.interval            | 3600000 | The 
interval of leak shuffle data check (ms)                                        
                                                                                
                                                                                
                                                                                
                                                     |
 | rss.server.max.concurrency.of.single.partition.writer | 1       | The max 
concurrency of single partition writer, the data partition file number is equal 
to this value. Default value is 1. This config could improve the writing speed, 
especially for huge partition.                                                  
                                                                                
                                                 |
 | rss.metrics.reporter.class                            | -       | The class 
of metrics reporter.                                                            
                                                                                
                                                                                
                                                                                
                                               |
+|rss.server.multistorage.manager.selector.class         | org.apache.uniffle.server.storage.multi.DefaultStorageManagerSelector | The storage manager selector strategy for `MEMORY_LOCALFILE_HDFS`. The default, `DefaultStorageManagerSelector`, flushes events larger than `rss.server.flush.cold.storage.threshold.size` to cold storage; the alternative, `HugePartitionSensitiveStorageManagerSelector`, flushes only huge partitions' data to cold storage. |
 
 ### Advanced Configurations
 |Property Name|Default| Description                                            
                                                                                
                                                     |
diff --git 
a/server/src/main/java/org/apache/uniffle/server/ShuffleDataFlushEvent.java 
b/server/src/main/java/org/apache/uniffle/server/ShuffleDataFlushEvent.java
index 8dab3ac6..56abf7a2 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleDataFlushEvent.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleDataFlushEvent.java
@@ -47,6 +47,8 @@ public class ShuffleDataFlushEvent {
   private Storage underStorage;
   private final List<Runnable> cleanupCallbackChains;
 
+  private boolean ownedByHugePartition = false;
+
   public ShuffleDataFlushEvent(
       long eventId,
       String appId,
@@ -163,4 +165,12 @@ public class ShuffleDataFlushEvent {
         + ", underStorage=" + (underStorage == null ? null : 
underStorage.getClass().getSimpleName())
         + ", isPended=" + isPended;
   }
+
+  public boolean isOwnedByHugePartition() {
+    return ownedByHugePartition;
+  }
+
+  public void markOwnedByHugePartition() {
+    this.ownedByHugePartition = true;
+  }
 }
diff --git 
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java 
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
index 5cf52049..8051669e 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
@@ -284,6 +284,13 @@ public class ShuffleServerConf extends RssBaseConf {
       .defaultValue(64L * 1024L * 1024L)
       .withDescription("For multistorage, the event size exceed this value, 
flush data  to cold storage");
 
+  public static final ConfigOption<String> MULTISTORAGE_MANAGER_SELECTOR_CLASS 
= ConfigOptions
+      .key("rss.server.multistorage.manager.selector.class")
+      .stringType()
+      
.defaultValue("org.apache.uniffle.server.storage.multi.DefaultStorageManagerSelector")
+      .withDescription("For multistorage, the storage manager selector 
strategy to support "
+          + "policies of flushing to different storages");
+
   public static final ConfigOption<String> 
MULTISTORAGE_FALLBACK_STRATEGY_CLASS = ConfigOptions
       .key("rss.server.multistorage.fallback.strategy.class")
       .stringType()
diff --git 
a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java 
b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
index 32f56485..e723d22c 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
@@ -135,6 +135,10 @@ public class ShuffleTaskManager {
               this::triggerFlush, triggerFlushInterval / 2,
           triggerFlushInterval, TimeUnit.MILLISECONDS);
     }
+    if (shuffleBufferManager != null) {
+      shuffleBufferManager.setShuffleTaskManager(this);
+    }
+
     // the thread for clear expired resources
     clearResourceThread = () -> {
       while (true) {
@@ -204,8 +208,7 @@ public class ShuffleTaskManager {
         appId,
         shuffleId,
         isPreAllocated,
-        spd,
-        this::getPartitionDataSize
+        spd
     );
   }
 
diff --git 
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
 
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
index 497b3430..a69a28e1 100644
--- 
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
+++ 
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
@@ -41,16 +41,17 @@ import org.apache.uniffle.common.ShufflePartitionedData;
 import org.apache.uniffle.common.rpc.StatusCode;
 import org.apache.uniffle.common.util.Constants;
 import org.apache.uniffle.common.util.RssUtils;
-import org.apache.uniffle.common.util.TripleFunction;
 import org.apache.uniffle.server.ShuffleDataFlushEvent;
 import org.apache.uniffle.server.ShuffleFlushManager;
 import org.apache.uniffle.server.ShuffleServerConf;
 import org.apache.uniffle.server.ShuffleServerMetrics;
+import org.apache.uniffle.server.ShuffleTaskManager;
 
 public class ShuffleBufferManager {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(ShuffleBufferManager.class);
 
+  private ShuffleTaskManager shuffleTaskManager;
   private final ShuffleFlushManager shuffleFlushManager;
   private long capacity;
   private long readCapacity;
@@ -103,6 +104,10 @@ public class ShuffleBufferManager {
     );
   }
 
+  public void setShuffleTaskManager(ShuffleTaskManager taskManager) {
+    this.shuffleTaskManager = taskManager;
+  }
+
   public StatusCode registerBuffer(String appId, int shuffleId, int 
startPartition, int endPartition) {
     bufferPool.putIfAbsent(appId, Maps.newConcurrentMap());
     Map<Integer, RangeMap<Integer, ShuffleBuffer>> shuffleIdToBuffers = 
bufferPool.get(appId);
@@ -120,27 +125,11 @@ public class ShuffleBufferManager {
     return StatusCode.SUCCESS;
   }
 
-  // Only for tests
   public StatusCode cacheShuffleData(
       String appId,
       int shuffleId,
       boolean isPreAllocated,
       ShufflePartitionedData spd) {
-    return cacheShuffleData(
-        appId,
-        shuffleId,
-        isPreAllocated,
-        spd,
-        null
-    );
-  }
-
-  public StatusCode cacheShuffleData(
-      String appId,
-      int shuffleId,
-      boolean isPreAllocated,
-      ShufflePartitionedData spd,
-      TripleFunction<String, Integer, Integer, Long> getPartitionDataSizeFunc) 
{
     if (!isPreAllocated && isFull()) {
       LOG.warn("Got unexpected data, can't cache it because the space is 
full");
       return StatusCode.NO_BUFFER;
@@ -165,8 +154,7 @@ public class ShuffleBufferManager {
           shuffleId,
           spd.getPartitionId(),
           entry.getKey().lowerEndpoint(),
-          entry.getKey().upperEndpoint(),
-          getPartitionDataSizeFunc
+          entry.getKey().upperEndpoint()
       );
       flushIfNecessary();
     }
@@ -232,19 +220,12 @@ public class ShuffleBufferManager {
       int shuffleId,
       int partitionId,
       int startPartition,
-      int endPartition,
-      TripleFunction<String, Integer, Integer, Long> getPartitionDataSizeFunc) 
{
+      int endPartition) {
+    boolean isHugePartition = isHugePartition(appId, shuffleId, partitionId);
     // When we use multi storage and trigger single buffer flush, the buffer 
size should be bigger
     // than rss.server.flush.cold.storage.threshold.size, otherwise cold 
storage will be useless.
-    if (this.bufferFlushEnabled && buffer.getSize() > 
this.bufferFlushThreshold) {
-      flushBuffer(buffer, appId, shuffleId, startPartition, endPartition);
-      return;
-    }
-
-    if (getPartitionDataSizeFunc != null
-        && getPartitionDataSizeFunc.accept(appId, shuffleId, partitionId) > 
hugePartitionSizeThreshold
-        && buffer.getSize() > this.bufferFlushThreshold) {
-      flushBuffer(buffer, appId, shuffleId, startPartition, endPartition);
+    if ((isHugePartition || this.bufferFlushEnabled) && buffer.getSize() > 
this.bufferFlushThreshold) {
+      flushBuffer(buffer, appId, shuffleId, startPartition, endPartition, 
isHugePartition);
       return;
     }
   }
@@ -265,12 +246,19 @@ public class ShuffleBufferManager {
     for (Map.Entry<Range<Integer>, ShuffleBuffer> entry : 
buffers.asMapOfRanges().entrySet()) {
       ShuffleBuffer buffer = entry.getValue();
       Range<Integer> range = entry.getKey();
-      flushBuffer(buffer, appId, shuffleId, range.lowerEndpoint(), 
range.upperEndpoint());
+      flushBuffer(
+          buffer,
+          appId,
+          shuffleId,
+          range.lowerEndpoint(),
+          range.upperEndpoint(),
+          isHugePartition(appId, shuffleId, range.lowerEndpoint())
+      );
     }
   }
 
   protected void flushBuffer(ShuffleBuffer buffer, String appId,
-      int shuffleId, int startPartition, int endPartition) {
+      int shuffleId, int startPartition, int endPartition, boolean 
isHugePartition) {
     ShuffleDataFlushEvent event =
         buffer.toFlushEvent(
             appId,
@@ -284,6 +272,9 @@ public class ShuffleBufferManager {
       event.addCleanupCallback(() -> releaseMemory(event.getSize(), true, 
false));
       updateShuffleSize(appId, shuffleId, -event.getSize());
       inFlushSize.addAndGet(event.getSize());
+      if (isHugePartition) {
+        event.markOwnedByHugePartition();
+      }
       ShuffleServerMetrics.gaugeInFlushBufferSize.set(inFlushSize.get());
       shuffleFlushManager.addToFlushQueue(event);
     }
@@ -394,8 +385,14 @@ public class ShuffleBufferManager {
             for (Map.Entry<Range<Integer>, ShuffleBuffer> rangeEntry :
                 shuffleIdToBuffers.getValue().asMapOfRanges().entrySet()) {
               Range<Integer> range = rangeEntry.getKey();
-              flushBuffer(rangeEntry.getValue(), appId, shuffleId,
-                  range.lowerEndpoint(), range.upperEndpoint());
+              flushBuffer(
+                  rangeEntry.getValue(),
+                  appId,
+                  shuffleId,
+                  range.lowerEndpoint(),
+                  range.upperEndpoint(),
+                  isHugePartition(appId, shuffleId, range.lowerEndpoint())
+              );
             }
           }
         }
@@ -576,6 +573,11 @@ public class ShuffleBufferManager {
     }
   }
 
+  boolean isHugePartition(String appId, int shuffleId, int partitionId) {
+    return shuffleTaskManager != null
+        && shuffleTaskManager.getPartitionDataSize(appId, shuffleId, 
partitionId) > hugePartitionSizeThreshold;
+  }
+
   public boolean isHugePartition(long usedPartitionDataSize) {
     return usedPartitionDataSize > hugePartitionSizeThreshold;
   }
diff --git 
a/server/src/main/java/org/apache/uniffle/server/storage/MultiStorageManager.java
 
b/server/src/main/java/org/apache/uniffle/server/storage/MultiStorageManager.java
index ed01de8f..a9c4af78 100644
--- 
a/server/src/main/java/org/apache/uniffle/server/storage/MultiStorageManager.java
+++ 
b/server/src/main/java/org/apache/uniffle/server/storage/MultiStorageManager.java
@@ -34,6 +34,7 @@ import org.apache.uniffle.server.ShuffleDataFlushEvent;
 import org.apache.uniffle.server.ShuffleDataReadEvent;
 import org.apache.uniffle.server.ShuffleServerConf;
 import org.apache.uniffle.server.event.PurgeEvent;
+import org.apache.uniffle.server.storage.multi.StorageManagerSelector;
 import org.apache.uniffle.storage.common.Storage;
 import org.apache.uniffle.storage.handler.api.ShuffleWriteHandler;
 
@@ -43,25 +44,53 @@ public class MultiStorageManager implements StorageManager {
 
   private final StorageManager warmStorageManager;
   private final StorageManager coldStorageManager;
-  private final long flushColdStorageThresholdSize;
-  private AbstractStorageManagerFallbackStrategy 
storageManagerFallbackStrategy;
   private final Cache<ShuffleDataFlushEvent, StorageManager> 
eventOfUnderStorageManagers;
+  private final StorageManagerSelector storageManagerSelector;
 
   MultiStorageManager(ShuffleServerConf conf) {
     warmStorageManager = new LocalStorageManager(conf);
     coldStorageManager = new HdfsStorageManager(conf);
-    flushColdStorageThresholdSize = 
conf.getSizeAsBytes(ShuffleServerConf.FLUSH_COLD_STORAGE_THRESHOLD_SIZE);
+
     try {
-      storageManagerFallbackStrategy = loadFallbackStrategy(conf);
+      AbstractStorageManagerFallbackStrategy storageManagerFallbackStrategy = 
loadFallbackStrategy(conf);
+      this.storageManagerSelector = loadManagerSelector(
+          conf,
+          storageManagerFallbackStrategy,
+          warmStorageManager,
+          coldStorageManager
+      );
     } catch (Exception e) {
-      throw new RuntimeException("Load fallback strategy failed.", e);
+      throw new RuntimeException("Errors on loading selector manager.", e);
     }
+
     long cacheTimeout = 
conf.getLong(ShuffleServerConf.STORAGEMANAGER_CACHE_TIMEOUT);
     eventOfUnderStorageManagers = CacheBuilder.newBuilder()
         .expireAfterAccess(cacheTimeout, TimeUnit.MILLISECONDS)
         .build();
   }
 
+  private StorageManagerSelector loadManagerSelector(
+      ShuffleServerConf conf,
+      AbstractStorageManagerFallbackStrategy storageManagerFallbackStrategy,
+      StorageManager warmStorageManager,
+      StorageManager coldStorageManager) throws Exception {
+    String name = 
conf.get(ShuffleServerConf.MULTISTORAGE_MANAGER_SELECTOR_CLASS);
+    Class<?> klass = Class.forName(name);
+    Constructor<?> constructor = klass.getConstructor(
+        StorageManager.class,
+        StorageManager.class,
+        AbstractStorageManagerFallbackStrategy.class,
+        conf.getClass()
+    );
+    StorageManagerSelector instance = (StorageManagerSelector) 
constructor.newInstance(
+        warmStorageManager,
+        coldStorageManager,
+        storageManagerFallbackStrategy,
+        conf
+    );
+    return instance;
+  }
+
   public static AbstractStorageManagerFallbackStrategy loadFallbackStrategy(
       ShuffleServerConf conf) throws Exception {
     String name = 
conf.getString(ShuffleServerConf.MULTISTORAGE_FALLBACK_STRATEGY_CLASS,
@@ -100,17 +129,7 @@ public class MultiStorageManager implements StorageManager 
{
   }
 
   private StorageManager selectStorageManager(ShuffleDataFlushEvent event) {
-    StorageManager storageManager;
-    if (event.getSize() > flushColdStorageThresholdSize) {
-      storageManager = coldStorageManager;
-    } else {
-      storageManager = warmStorageManager;
-    }
-
-    if (!storageManager.canWrite(event) || event.getRetryTimes() > 0) {
-      storageManager = storageManagerFallbackStrategy.tryFallback(
-          storageManager, event, warmStorageManager, coldStorageManager);
-    }
+    StorageManager storageManager = storageManagerSelector.select(event);
     eventOfUnderStorageManagers.put(event, storageManager);
     event.addCleanupCallback(() -> 
eventOfUnderStorageManagers.invalidate(event));
     return storageManager;
diff --git 
a/server/src/main/java/org/apache/uniffle/server/storage/multi/DefaultStorageManagerSelector.java
 
b/server/src/main/java/org/apache/uniffle/server/storage/multi/DefaultStorageManagerSelector.java
new file mode 100644
index 00000000..50f49823
--- /dev/null
+++ 
b/server/src/main/java/org/apache/uniffle/server/storage/multi/DefaultStorageManagerSelector.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.server.storage.multi;
+
+import org.apache.uniffle.server.ShuffleDataFlushEvent;
+import org.apache.uniffle.server.ShuffleServerConf;
+import 
org.apache.uniffle.server.storage.AbstractStorageManagerFallbackStrategy;
+import org.apache.uniffle.server.storage.StorageManager;
+
+import static 
org.apache.uniffle.server.ShuffleServerConf.FLUSH_COLD_STORAGE_THRESHOLD_SIZE;
+
+public class DefaultStorageManagerSelector extends 
FallbackBasedStorageManagerSelector {
+  private final long flushColdStorageThresholdSize;
+
+  public DefaultStorageManagerSelector(
+      StorageManager warmStorageManager,
+      StorageManager coldStorageManager,
+      AbstractStorageManagerFallbackStrategy fallbackStrategy,
+      ShuffleServerConf rssConf) {
+    super(warmStorageManager, coldStorageManager, fallbackStrategy);
+    this.flushColdStorageThresholdSize = 
rssConf.get(FLUSH_COLD_STORAGE_THRESHOLD_SIZE);
+  }
+
+  @Override
+  StorageManager regularSelect(ShuffleDataFlushEvent flushEvent) {
+    StorageManager storageManager = warmStorageManager;
+    if (flushEvent.getSize() > flushColdStorageThresholdSize) {
+      storageManager = coldStorageManager;
+    }
+    return storageManager;
+  }
+}
diff --git 
a/server/src/main/java/org/apache/uniffle/server/storage/multi/FallbackBasedStorageManagerSelector.java
 
b/server/src/main/java/org/apache/uniffle/server/storage/multi/FallbackBasedStorageManagerSelector.java
new file mode 100644
index 00000000..50443091
--- /dev/null
+++ 
b/server/src/main/java/org/apache/uniffle/server/storage/multi/FallbackBasedStorageManagerSelector.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.server.storage.multi;
+
+import org.apache.uniffle.server.ShuffleDataFlushEvent;
+import 
org.apache.uniffle.server.storage.AbstractStorageManagerFallbackStrategy;
+import org.apache.uniffle.server.storage.StorageManager;
+
+public abstract class FallbackBasedStorageManagerSelector implements 
StorageManagerSelector {
+  protected final StorageManager warmStorageManager;
+  protected final StorageManager coldStorageManager;
+  private final AbstractStorageManagerFallbackStrategy fallbackStrategy;
+
+  public FallbackBasedStorageManagerSelector(
+      StorageManager warmStorageManager,
+      StorageManager coldStorageManager,
+      AbstractStorageManagerFallbackStrategy fallbackStrategy) {
+    this.warmStorageManager = warmStorageManager;
+    this.coldStorageManager = coldStorageManager;
+    this.fallbackStrategy = fallbackStrategy;
+  }
+
+  abstract StorageManager regularSelect(ShuffleDataFlushEvent flushEvent);
+
+  private StorageManager fallbackSelect(ShuffleDataFlushEvent flushEvent, 
StorageManager candidateStorageManager) {
+    return fallbackStrategy.tryFallback(
+        candidateStorageManager,
+        flushEvent,
+        warmStorageManager,
+        coldStorageManager
+    );
+  }
+
+  @Override
+  public StorageManager select(ShuffleDataFlushEvent flushEvent) {
+    StorageManager storageManager = regularSelect(flushEvent);
+    if (!storageManager.canWrite(flushEvent) || flushEvent.getRetryTimes() > 
0) {
+      storageManager = fallbackSelect(flushEvent, storageManager);
+    }
+    return storageManager;
+  }
+}
diff --git 
a/server/src/main/java/org/apache/uniffle/server/storage/multi/HugePartitionSensitiveStorageManagerSelector.java
 
b/server/src/main/java/org/apache/uniffle/server/storage/multi/HugePartitionSensitiveStorageManagerSelector.java
new file mode 100644
index 00000000..808d5d47
--- /dev/null
+++ 
b/server/src/main/java/org/apache/uniffle/server/storage/multi/HugePartitionSensitiveStorageManagerSelector.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.server.storage.multi;
+
+import org.apache.uniffle.server.ShuffleDataFlushEvent;
+import org.apache.uniffle.server.ShuffleServerConf;
+import 
org.apache.uniffle.server.storage.AbstractStorageManagerFallbackStrategy;
+import org.apache.uniffle.server.storage.StorageManager;
+
+public class HugePartitionSensitiveStorageManagerSelector extends 
FallbackBasedStorageManagerSelector {
+
+  public HugePartitionSensitiveStorageManagerSelector(
+      StorageManager warmStorageManager,
+      StorageManager coldStorageManager,
+      AbstractStorageManagerFallbackStrategy fallbackStrategy,
+      ShuffleServerConf rssConf) {
+    super(warmStorageManager, coldStorageManager, fallbackStrategy);
+  }
+
+  @Override
+  StorageManager regularSelect(ShuffleDataFlushEvent flushEvent) {
+    StorageManager storageManager = warmStorageManager;
+    if (flushEvent.isOwnedByHugePartition()) {
+      storageManager = coldStorageManager;
+    }
+    return storageManager;
+  }
+}
diff --git 
a/server/src/main/java/org/apache/uniffle/server/storage/multi/StorageManagerSelector.java
 
b/server/src/main/java/org/apache/uniffle/server/storage/multi/StorageManagerSelector.java
new file mode 100644
index 00000000..55cbd509
--- /dev/null
+++ 
b/server/src/main/java/org/apache/uniffle/server/storage/multi/StorageManagerSelector.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.server.storage.multi;
+
+import org.apache.uniffle.server.ShuffleDataFlushEvent;
+import org.apache.uniffle.server.storage.StorageManager;
+
+public interface StorageManagerSelector {
+
+  StorageManager select(ShuffleDataFlushEvent flushEvent);
+
+  enum ColdStoragePreferredFactor {
+    HUGE_EVENT,
+    OWNED_BY_HUGE_PARTITION,
+  }
+}
diff --git 
a/server/src/test/java/org/apache/uniffle/server/storage/MultiStorageManagerTest.java
 
b/server/src/test/java/org/apache/uniffle/server/storage/MultiStorageManagerTest.java
index d385e933..3cea6d32 100644
--- 
a/server/src/test/java/org/apache/uniffle/server/storage/MultiStorageManagerTest.java
+++ 
b/server/src/test/java/org/apache/uniffle/server/storage/MultiStorageManagerTest.java
@@ -56,6 +56,44 @@ public class MultiStorageManagerTest {
     assertTrue((manager.selectStorage(event) instanceof HdfsStorage));
   }
 
+  @Test
+  public void testStorageManagerSelectorOfPreferCold() {
+    ShuffleServerConf conf = new ShuffleServerConf();
+    conf.setLong(ShuffleServerConf.FLUSH_COLD_STORAGE_THRESHOLD_SIZE, 10000L);
+    conf.set(ShuffleServerConf.RSS_STORAGE_BASE_PATH, Arrays.asList("test"));
+    conf.setLong(ShuffleServerConf.DISK_CAPACITY, 10000L);
+    conf.setString(ShuffleServerConf.RSS_STORAGE_TYPE, 
StorageType.MEMORY_LOCALFILE_HDFS.name());
+    conf.setString(ShuffleServerConf.MULTISTORAGE_FALLBACK_STRATEGY_CLASS,
+        RotateStorageManagerFallbackStrategy.class.getCanonicalName());
+    conf.set(
+        ShuffleServerConf.MULTISTORAGE_MANAGER_SELECTOR_CLASS,
+        
"org.apache.uniffle.server.storage.multi.HugePartitionSensitiveStorageManagerSelector"
+    );
+    MultiStorageManager manager = new MultiStorageManager(conf);
+    String remoteStorage = "test";
+    String appId = "selectStorageManagerIfCanNotWriteTest_appId";
+    manager.registerRemoteStorage(appId, new RemoteStorageInfo(remoteStorage));
+
+    /**
+     * case1: with
+     * {@link org.apache.uniffle.server.storage.multi.HugePartitionSensitiveStorageManagerSelector}
+     * configured, only events owned by a huge partition are flushed to cold
+     * storage, regardless of the event size.
+     */
+    List<ShufflePartitionedBlock> blocks = Lists.newArrayList(
+        new ShufflePartitionedBlock(10001, 1000, 1, 1, 1L, null)
+    );
+    ShuffleDataFlushEvent event = new ShuffleDataFlushEvent(
+        1, appId, 1, 1, 1, 100000, blocks, null, null);
+    Storage storage = manager.selectStorage(event);
+    assertTrue(storage instanceof LocalStorage);
+
+    ShuffleDataFlushEvent event1 = new ShuffleDataFlushEvent(1, appId, 1, 1, 
1, 10, blocks, null, null);
+    event1.markOwnedByHugePartition();
+    storage = manager.selectStorage(event1);
+    assertTrue(storage instanceof HdfsStorage);
+  }
+
   @Test
   public void underStorageManagerSelectionTest() {
     ShuffleServerConf conf = new ShuffleServerConf();

Reply via email to