This is an automated email from the ASF dual-hosted git repository.

rexxiong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new e68e1729d [CELEBORN-1483] Add storage policy
e68e1729d is described below

commit e68e1729d95616fb99227610ff2c1f31b31d123a
Author: mingji <[email protected]>
AuthorDate: Tue Jul 9 17:16:09 2024 +0800

    [CELEBORN-1483] Add storage policy
    
    ### What changes were proposed in this pull request?
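    Refactor the partition data writer by adding a storage policy that decides the order in
    which storages (MEMORY, SSD, HDD, HDFS, OSS) are tried when creating a partition file, and
    which storage a file is evicted to.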
    Part of CIP-8.
    
    ### Why are the changes needed?
    
    ### Does this PR introduce _any_ user-facing change?
    
    ### How was this patch tested?
    GA
    
    Closes #2595 from FMX/b1483.
    
    Authored-by: mingji <[email protected]>
    Signed-off-by: Shuang <[email protected]>
---
 .../celeborn/common/protocol/StorageInfo.java      |  6 ++
 .../org/apache/celeborn/common/CelebornConf.scala  | 45 +++++++++-
 .../apache/celeborn/common/CelebornConfSuite.scala | 47 ++++++++++
 docs/configuration/worker.md                       |  2 +
 .../worker/storage/PartitionDataWriterContext.java | 10 +++
 .../celeborn/service/deploy/worker/Worker.scala    |  1 +
 .../deploy/worker/storage/CelebornFile.scala       | 93 ++++++++++++++++++++
 .../deploy/worker/storage/CelebornFileProxy.scala  | 59 +++++++++++++
 .../deploy/worker/storage/StorageManager.scala     |  5 +-
 .../deploy/worker/storage/StoragePolicy.scala      | 99 ++++++++++++++++++++++
 .../deploy/worker/storage/StoragePolicySuite.scala | 83 ++++++++++++++++++
 11 files changed, 447 insertions(+), 3 deletions(-)

diff --git 
a/common/src/main/java/org/apache/celeborn/common/protocol/StorageInfo.java 
b/common/src/main/java/org/apache/celeborn/common/protocol/StorageInfo.java
index d92909128..621edb774 100644
--- a/common/src/main/java/org/apache/celeborn/common/protocol/StorageInfo.java
+++ b/common/src/main/java/org/apache/celeborn/common/protocol/StorageInfo.java
@@ -40,11 +40,13 @@ public class StorageInfo implements Serializable {
 
   public static final Map<Integer, Type> typesMap = new HashMap<>();
   public static final Set<String> typeNames = new HashSet<>();
+  // Lookup from Type.name() (e.g. "MEMORY", "HDFS") to Type; backs fromStrToType.
+  public static final Map<String, Type> types = new HashMap<>();
 
   static {
     for (Type type : Type.values()) {
       typesMap.put(type.value, type);
       typeNames.add(type.name());
+      types.put(type.name(), type);
     }
   }
 
@@ -234,4 +236,8 @@ public class StorageInfo implements Serializable {
     }
     return ava;
   }
+
+  /**
+   * Resolves a storage type from its enum name, e.g. "MEMORY" or "HDFS".
+   *
+   * <p>Surrounding whitespace is ignored and the match is case-insensitive, in line with the
+   * worker storage-policy configs, whose validation upper-cases names before checking them
+   * against {@link #typeNames}.
+   *
+   * @param typeStr the storage type name; may be null
+   * @return the matching {@link Type}, or null if {@code typeStr} is null or not a known name
+   */
+  public static Type fromStrToType(String typeStr) {
+    if (typeStr == null) {
+      return null;
+    }
+    return types.get(typeStr.trim().toUpperCase(java.util.Locale.ROOT));
+  }
 }
diff --git 
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala 
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index 742c16444..a2b2e284f 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -32,7 +32,7 @@ import org.apache.celeborn.common.internal.Logging
 import org.apache.celeborn.common.internal.config._
 import org.apache.celeborn.common.network.util.ByteUnit
 import org.apache.celeborn.common.protocol._
-import org.apache.celeborn.common.protocol.StorageInfo.Type
+import org.apache.celeborn.common.protocol.StorageInfo.{typesMap, validate, 
Type}
 import org.apache.celeborn.common.protocol.StorageInfo.Type.{HDD, SSD}
 import org.apache.celeborn.common.rpc.RpcTimeout
 import org.apache.celeborn.common.util.{JavaUtils, Utils}
@@ -1130,6 +1130,19 @@ class CelebornConf(loadDefaults: Boolean) extends 
Cloneable with Logging with Se
   def readBufferTargetUpdateInterval: Long = 
get(WORKER_READBUFFER_TARGET_UPDATE_INTERVAL)
   def readBufferTargetNotifyThreshold: Long = 
get(WORKER_READBUFFER_TARGET_NOTIFY_THRESHOLD)
   def readBuffersToTriggerReadMin: Int = 
get(WORKER_READBUFFERS_TOTRIGGERREAD_MIN)
+  /**
+   * Ordered storage type names to try when creating a partition file, read from
+   * `celeborn.worker.storage.storagePolicy.createFilePolicy`.
+   *
+   * Entries are trimmed and upper-cased so they match [[StorageInfo.Type]] names (the config
+   * validation accepts them case-insensitively); empty entries are dropped. Falls back to
+   * MEMORY,HDD,SSD,HDFS,OSS when the config is unset.
+   */
+  def workerStoragePolicyCreateFilePolicy: Option[List[String]] =
+    get(WORKER_STORAGE_CREATE_FILE_POLICY).map { policy =>
+      policy.split(",").map(_.trim.toUpperCase(java.util.Locale.ROOT)).filter(_.nonEmpty).toList
+    }.orElse(Some(List("MEMORY", "HDD", "SSD", "HDFS", "OSS")))
+
+  /**
+   * Eviction targets per source storage type, read from
+   * `celeborn.worker.storage.storagePolicy.evictPolicy`.
+   *
+   * The value is `|`-separated groups. The first name of each group is the source storage type
+   * and the remaining names are its eviction targets in order, so `MEMORY,SSD|SSD,HDFS` yields
+   * `Map(MEMORY -> List(SSD), SSD -> List(HDFS))`. Names are trimmed and upper-cased, empty
+   * groups are ignored, and a later group for the same source replaces an earlier one. Falls
+   * back to `MEMORY -> SSD,HDD,HDFS,OSS` when the config is unset.
+   */
+  def workerStoragePolicyEvictFilePolicy: Option[Map[String, List[String]]] =
+    get(WORKER_STORAGE_EVICT_POLICY).map { policy =>
+      policy.split("\\|").toList.flatMap { group =>
+        val names =
+          group.split(",").map(_.trim.toUpperCase(java.util.Locale.ROOT)).filter(_.nonEmpty)
+        names.headOption.map(from => from -> names.tail.toList)
+      }.toMap
+    }.orElse(Some(Map("MEMORY" -> List("SSD", "HDD", "HDFS", "OSS"))))
 
   // //////////////////////////////////////////////////////
   //                   Decommission                      //
@@ -2814,6 +2827,36 @@ object CelebornConf extends Logging {
       .timeConf(TimeUnit.MILLISECONDS)
       .createWithDefaultString("1000ms")
 
+  // Validation runs when the value is read and throws IllegalArgumentException with the
+  // message below; there is no silent fallback to the default order.
+  val WORKER_STORAGE_CREATE_FILE_POLICY: OptionalConfigEntry[String] =
+    buildConf("celeborn.worker.storage.storagePolicy.createFilePolicy")
+      .categories("worker")
+      .doc("This defined the order for creating files across available storages." +
+        " Available storages options are: MEMORY,SSD,HDD,HDFS,OSS")
+      .version("0.5.1")
+      .stringConf
+      .checkValue(
+        _.split(",").forall(str =>
+          StorageInfo.typeNames.contains(str.trim.toUpperCase(java.util.Locale.ROOT))),
+        "Invalid storage type in createFilePolicy. " +
+          "Available storages options are: MEMORY,SSD,HDD,HDFS,OSS")
+      .createOptional
+
+  // Every name in every `|`-separated group must be a known storage type; validation throws
+  // IllegalArgumentException when the value is read.
+  val WORKER_STORAGE_EVICT_POLICY: OptionalConfigEntry[String] =
+    buildConf("celeborn.worker.storage.storagePolicy.evictPolicy")
+      .categories("worker")
+      .doc("This define the order of evict files if the storages are available." +
+        " Available storages: MEMORY,SSD,HDD,HDFS. " +
+        "Definition: StorageTypes|StorageTypes|StorageTypes. " +
+        "Example: MEMORY,SSD|SSD,HDFS." +
+        " The example means that a MEMORY shuffle file can be evicted to SSD " +
+        "and a SSD shuffle file can be evicted to HDFS.")
+      .version("0.5.1")
+      .stringConf
+      .checkValue(
+        _.replace("|", ",").split(",").forall(str =>
+          StorageInfo.typeNames.contains(str.trim.toUpperCase(java.util.Locale.ROOT))),
+        "Invalid storage type in evictPolicy. Available storages: MEMORY,SSD,HDD,HDFS,OSS")
+      .createOptional
+
   val WORKER_HTTP_HOST: ConfigEntry[String] =
     buildConf("celeborn.worker.http.host")
       .withAlternative("celeborn.metrics.worker.prometheus.host")
diff --git 
a/common/src/test/scala/org/apache/celeborn/common/CelebornConfSuite.scala 
b/common/src/test/scala/org/apache/celeborn/common/CelebornConfSuite.scala
index 08c673599..6da252d9d 100644
--- a/common/src/test/scala/org/apache/celeborn/common/CelebornConfSuite.scala
+++ b/common/src/test/scala/org/apache/celeborn/common/CelebornConfSuite.scala
@@ -305,6 +305,7 @@ class CelebornConfSuite extends CelebornFunSuite {
     def moduleKey(config: ConfigEntry[_]): String = {
       config.key.replace("<module>", module)
     }
+
     conf.set(moduleKey(NETWORK_IO_MODE), transportTestNetworkIoMode)
     conf.set(
       moduleKey(NETWORK_IO_PREFER_DIRECT_BUFS),
@@ -406,4 +407,50 @@ class CelebornConfSuite extends CelebornFunSuite {
     assert(conf.networkIoConnectTimeoutMs("test_child_module") == 
fallbackValue)
   }
 
+  // createFilePolicy: explicit orders, the default order, and rejection of unknown names.
+  test("Test storage policy case 1") {
+    val conf = new CelebornConf()
+    conf.set("celeborn.worker.storage.storagePolicy.createFilePolicy", "MEMORY,SSD")
+    val createFilePolicy1 = conf.workerStoragePolicyCreateFilePolicy
+    assert(List("MEMORY", "SSD") == createFilePolicy1.get)
+
+    conf.set("celeborn.worker.storage.storagePolicy.createFilePolicy", "MEMORY,HDFS")
+    val createFilePolicy2 = conf.workerStoragePolicyCreateFilePolicy
+    assert(List("MEMORY", "HDFS") == createFilePolicy2.get)
+
+    conf.unset("celeborn.worker.storage.storagePolicy.createFilePolicy")
+    val createFilePolicy3 = conf.workerStoragePolicyCreateFilePolicy
+    assert(List("MEMORY", "HDD", "SSD", "HDFS", "OSS") == createFilePolicy3.get)
+
+    // The value is validated when it is read. intercept (unlike try/catch) fails the test if
+    // no exception is thrown at all.
+    conf.set("celeborn.worker.storage.storagePolicy.createFilePolicy", "ABC")
+    intercept[IllegalArgumentException] {
+      conf.workerStoragePolicyCreateFilePolicy
+    }
+  }
+
+  // evictPolicy: single and multiple groups, the default mapping, and rejection of unknown names.
+  test("Test storage policy case 2") {
+    val conf = new CelebornConf()
+    conf.set("celeborn.worker.storage.storagePolicy.evictPolicy", "MEMORY,SSD")
+    val evictPolicy1 = conf.workerStoragePolicyEvictFilePolicy
+    assert(Map("MEMORY" -> List("SSD")) == evictPolicy1.get)
+
+    conf.set("celeborn.worker.storage.storagePolicy.evictPolicy", "MEMORY,SSD,HDFS|HDD,HDFS")
+    val evictPolicy2 = conf.workerStoragePolicyEvictFilePolicy
+    assert(Map("MEMORY" -> List("SSD", "HDFS"), "HDD" -> List("HDFS")) == evictPolicy2.get)
+
+    conf.unset("celeborn.worker.storage.storagePolicy.evictPolicy")
+    val evictPolicy3 = conf.workerStoragePolicyEvictFilePolicy
+    assert(Map("MEMORY" -> List("SSD", "HDD", "HDFS", "OSS")) == evictPolicy3.get)
+
+    conf.set("celeborn.worker.storage.storagePolicy.evictPolicy", "ABC")
+    intercept[IllegalArgumentException] {
+      conf.workerStoragePolicyEvictFilePolicy
+    }
+  }
+
 }
diff --git a/docs/configuration/worker.md b/docs/configuration/worker.md
index ef15cd7d0..53938ccc0 100644
--- a/docs/configuration/worker.md
+++ b/docs/configuration/worker.md
@@ -154,6 +154,8 @@ license: |
 | celeborn.worker.storage.disk.reserve.ratio | &lt;undefined&gt; | false | 
Celeborn worker reserved ratio for each disk. The minimum usable size for each 
disk is the max space between the reserved space and the space calculate via 
reserved ratio. | 0.3.2 |  | 
 | celeborn.worker.storage.disk.reserve.size | 5G | false | Celeborn worker 
reserved space for each disk. | 0.3.0 | celeborn.worker.disk.reserve.size | 
 | celeborn.worker.storage.expireDirs.timeout | 1h | false | The timeout for a 
expire dirs to be deleted on disk. | 0.3.2 |  | 
+| celeborn.worker.storage.storagePolicy.createFilePolicy | &lt;undefined&gt; | 
false | This defined the order for creating files across available storages. 
Available storages options are: MEMORY,SSD,HDD,HDFS,OSS | 0.5.1 |  | 
+| celeborn.worker.storage.storagePolicy.evictPolicy | &lt;undefined&gt; | 
false | This define the order of evict files if the storages are available. 
Available storages: MEMORY,SSD,HDD,HDFS. Definition: 
StorageTypes|StorageTypes|StorageTypes. Example: MEMORY,SSD|SSD,HDFS. The 
example means that a MEMORY shuffle file can be evicted to SSD and a SSD 
shuffle file can be evicted to HDFS. | 0.5.1 |  | 
 | celeborn.worker.storage.workingDir | celeborn-worker/shuffle_data | false | 
Worker's working dir path name. | 0.3.0 | celeborn.worker.workingDir | 
 | celeborn.worker.writer.close.timeout | 120s | false | Timeout for a file 
writer to close | 0.2.0 |  | 
 | celeborn.worker.writer.create.maxAttempts | 3 | false | Retry count for a 
file writer to create if its creation was failed. | 0.2.0 |  | 
diff --git 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriterContext.java
 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriterContext.java
index b73b901ad..86d53f9a9 100644
--- 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriterContext.java
+++ 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriterContext.java
@@ -21,6 +21,7 @@ import org.apache.celeborn.common.identity.UserIdentifier;
 import org.apache.celeborn.common.protocol.PartitionLocation;
 import org.apache.celeborn.common.protocol.PartitionSplitMode;
 import org.apache.celeborn.common.protocol.PartitionType;
+import org.apache.celeborn.common.protocol.StorageInfo;
 import org.apache.celeborn.common.util.Utils;
 
 public class PartitionDataWriterContext {
@@ -34,6 +35,7 @@ public class PartitionDataWriterContext {
   private final boolean partitionSplitEnabled;
   private final String shuffleKey;
   private final PartitionType partitionType;
+  // Storage type chosen for this partition's file; null until setStorageType is called.
+  private StorageInfo.Type storageType = null;
 
   public PartitionDataWriterContext(
       long splitThreshold,
@@ -96,4 +98,12 @@ public class PartitionDataWriterContext {
   public PartitionType getPartitionType() {
     return partitionType;
   }
+
+  /** Returns the storage type recorded for this writer, or null if none has been set yet. */
+  public StorageInfo.Type getStorageType() {
+    return storageType;
+  }
+
+  /** Records the storage type chosen for this writer's partition file. */
+  public void setStorageType(StorageInfo.Type storageType) {
+    this.storageType = storageType;
+  }
 }
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
index 8ee278b45..2f8767383 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
@@ -60,6 +60,7 @@ import 
org.apache.celeborn.service.deploy.worker.memory.MemoryManager.ServingSta
 import org.apache.celeborn.service.deploy.worker.monitor.JVMQuake
 import org.apache.celeborn.service.deploy.worker.profiler.JVMProfiler
 import 
org.apache.celeborn.service.deploy.worker.storage.{PartitionFilesSorter, 
StorageManager}
+import org.apache.celeborn.service.deploy.worker.storage.StoragePolicy
 
 private[celeborn] class Worker(
     override val conf: CelebornConf,
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/CelebornFile.scala
 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/CelebornFile.scala
new file mode 100644
index 000000000..5d67f75d8
--- /dev/null
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/CelebornFile.scala
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.service.deploy.worker.storage
+
+import java.io.File
+import java.nio.channels.FileChannel
+
+import io.netty.buffer.{ByteBuf, CompositeByteBuf}
+
+import org.apache.celeborn.common.CelebornConf
+import org.apache.celeborn.common.meta.{DiskFileInfo, FileInfo, MemoryFileInfo}
+import org.apache.celeborn.common.metrics.source.AbstractSource
+import org.apache.celeborn.common.protocol.StorageInfo
+
+/**
+ * Base class for a partition file held on a single kind of storage. This file defines memory
+ * (CelebornMemoryFile), local disk (CelebornDiskFile) and DFS (CelebornDFSFile) subclasses.
+ *
+ * The shared state below lives in mutable fields that start at their default values
+ * (null / 0 / false) and are not assigned by this class itself.
+ */
+abstract class CelebornFile {
+  var fileInfo: FileInfo = _
+  var flushBuffer: CompositeByteBuf = _
+  // Lock object. NOTE(review): what it guards is not shown here — presumably flushBuffer.
+  val flushLock = new AnyRef
+  var flusher: Flusher = _
+  var flushWorkerIndex: Int = _
+  var writerCloseTimeoutMs: Long = _
+  var flusherBufferSize = 0L
+  var source: AbstractSource = _ // metrics
+  var chunkSize: Long = _
+  var metricsCollectCriticalEnabled = false
+  // Storage this file lives on. CelebornFileProxy.isMemoryShuffleFile compares it with MEMORY,
+  // and StoragePolicy.getEvictedFile uses its name as the key into the eviction order.
+  var storageType: StorageInfo.Type = _
+
+  /** Writes `buf` to this file. */
+  def write(buf: ByteBuf): Unit
+
+  /** Whether this file should be evicted; checked by CelebornFileProxy.evict. */
+  def needEvict: Boolean
+
+  /** Evicts this file to `file`, the target returned by StoragePolicy.getEvictedFile. */
+  def evict(file: CelebornFile): Unit
+
+  /** Closes this file. */
+  def close(): Unit
+}
+
+/**
+ * A [[CelebornFile]] kept in memory.
+ *
+ * @param conf        worker configuration (currently unused)
+ * @param source      metrics source; stored in the inherited `source` field
+ * @param fileInfo    memory file metadata; stored in the inherited `fileInfo` field
+ * @param storageType storage type of this file; stored in the inherited `storageType` field
+ */
+class CelebornMemoryFile(
+    conf: CelebornConf,
+    source: AbstractSource,
+    fileInfo: MemoryFileInfo,
+    storageType: StorageInfo.Type) extends CelebornFile {
+
+  // The constructor parameters shadow the inherited fields of the same name, so the fields are
+  // assigned through a CelebornFile-typed reference. Without this, `storageType` would stay null
+  // for callers such as StoragePolicy.getEvictedFile and CelebornFileProxy.isMemoryShuffleFile.
+  locally {
+    val base: CelebornFile = this
+    base.source = source
+    base.fileInfo = fileInfo
+    base.storageType = storageType
+  }
+
+  // TODO: not implemented yet; currently accepts and discards `buf`.
+  override def write(buf: ByteBuf): Unit = {}
+
+  override def needEvict: Boolean = ???
+
+  override def evict(file: CelebornFile): Unit = ???
+
+  override def close(): Unit = ???
+}
+
+/**
+ * A [[CelebornFile]] on a local disk.
+ *
+ * @param flusher      flusher for this file; stored in the inherited `flusher` field
+ * @param diskFileInfo disk file metadata; stored in the inherited `fileInfo` field
+ * @param workingDir   working directory of the file (currently unused)
+ * @param storageType  storage type of this file; stored in the inherited `storageType` field
+ */
+class CelebornDiskFile(
+    flusher: Flusher,
+    diskFileInfo: DiskFileInfo,
+    workingDir: File,
+    storageType: StorageInfo.Type) extends CelebornFile {
+  private var channel: FileChannel = null
+
+  // `flusher` and `storageType` shadow the inherited fields of the same name, so the fields are
+  // assigned through a CelebornFile-typed reference; otherwise they would stay null.
+  locally {
+    val base: CelebornFile = this
+    base.flusher = flusher
+    base.fileInfo = diskFileInfo
+    base.storageType = storageType
+  }
+
+  override def write(buf: ByteBuf): Unit = ???
+
+  override def needEvict: Boolean = ???
+
+  override def evict(file: CelebornFile): Unit = ???
+
+  override def close(): Unit = ???
+}
+
+/**
+ * A [[CelebornFile]] on a distributed file system.
+ *
+ * @param flusher      flusher for this file; stored in the inherited `flusher` field
+ * @param hdfsFileInfo file metadata; stored in the inherited `fileInfo` field
+ * @param storageType  storage type of this file; stored in the inherited `storageType` field
+ */
+class CelebornDFSFile(flusher: Flusher, hdfsFileInfo: DiskFileInfo, storageType: StorageInfo.Type)
+  extends CelebornFile {
+
+  // `flusher` and `storageType` shadow the inherited fields of the same name, so the fields are
+  // assigned through a CelebornFile-typed reference; otherwise they would stay null.
+  locally {
+    val base: CelebornFile = this
+    base.flusher = flusher
+    base.fileInfo = hdfsFileInfo
+    base.storageType = storageType
+  }
+
+  override def write(buf: ByteBuf): Unit = ???
+
+  override def needEvict: Boolean = ???
+
+  override def evict(file: CelebornFile): Unit = ???
+
+  override def close(): Unit = ???
+}
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/CelebornFileProxy.scala
 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/CelebornFileProxy.scala
new file mode 100644
index 000000000..4c4a4b5b4
--- /dev/null
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/CelebornFileProxy.scala
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.service.deploy.worker.storage
+
+import io.netty.buffer.ByteBuf
+
+import org.apache.celeborn.common.CelebornConf
+import org.apache.celeborn.common.metrics.source.AbstractSource
+import org.apache.celeborn.common.protocol.StorageInfo
+
+/**
+ * Write-side handle for one partition's file that can switch the underlying [[CelebornFile]]
+ * when the file is evicted to another storage.
+ *
+ * The initial file is created by `storageManager.storagePolicy` when the proxy is constructed.
+ * `write` and `evict` synchronize on this proxy, so a write never interleaves with a file switch.
+ */
+class CelebornFileProxy(
+    partitionDataWriterContext: PartitionDataWriterContext,
+    storageManager: StorageManager,
+    conf: CelebornConf,
+    source: AbstractSource) {
+  // Swapped under the lock in `evict`; volatile so unsynchronized readers such as
+  // `isMemoryShuffleFile` see the latest file.
+  @volatile var currentFile: CelebornFile =
+    storageManager.storagePolicy.createFile(partitionDataWriterContext)
+  var flusher: Flusher = null
+  var flushWorkerIndex = 0
+
+  /** Writes `buf` to the current file while holding the proxy lock. */
+  def write(buf: ByteBuf): Unit = {
+    this.synchronized {
+      currentFile.write(buf)
+    }
+  }
+
+  /**
+   * Moves the data to the next storage chosen by the storage policy when the current file asks
+   * for it, or unconditionally when `force` is true. The current file is kept when the policy
+   * returns no eviction target (null).
+   */
+  def evict(force: Boolean): Unit = {
+    this.synchronized {
+      // Decide under the lock so a concurrent evict cannot act on a file that was already swapped.
+      if (force || currentFile.needEvict) {
+        val nFile =
+          storageManager.storagePolicy.getEvictedFile(currentFile, partitionDataWriterContext)
+        if (nFile != null) {
+          currentFile.evict(nFile)
+          currentFile = nFile
+        }
+      }
+    }
+  }
+
+  def close(): Unit = {}
+
+  /** Whether the current file is stored in memory. */
+  def isMemoryShuffleFile: Boolean = {
+    currentFile.storageType == StorageInfo.Type.MEMORY
+  }
+}
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
index e767060c7..34579327a 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
@@ -68,6 +68,7 @@ final private[worker] class StorageManager(conf: 
CelebornConf, workerSource: Abs
   val hasHDFSStorage = conf.hasHDFSStorage
 
   val storageExpireDirTimeout = conf.workerStorageExpireDirTimeout
+  val storagePolicy = new StoragePolicy(conf, this, workerSource)
 
   // (deviceName -> deviceInfo) and (mount point -> diskInfo)
   val (deviceInfos, diskInfos) = {
@@ -873,7 +874,7 @@ final private[worker] class StorageManager(conf: 
CelebornConf, workerSource: Abs
       && MemoryManager.instance().memoryFileStorageAvailable()) {
       logDebug(s"Create memory file for 
${partitionDataWriterContext.getShuffleKey} 
${partitionDataWriterContext.getPartitionLocation.getFileName}")
       (
-        createMemoryFile(
+        createMemoryFileInfo(
           partitionDataWriterContext.getAppId,
           partitionDataWriterContext.getShuffleId,
           location.getFileName,
@@ -899,7 +900,7 @@ final private[worker] class StorageManager(conf: 
CelebornConf, workerSource: Abs
     }
   }
 
-  def createMemoryFile(
+  def createMemoryFileInfo(
       appId: String,
       shuffleId: Int,
       fileName: String,
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StoragePolicy.scala
 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StoragePolicy.scala
new file mode 100644
index 000000000..228063d81
--- /dev/null
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StoragePolicy.scala
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.service.deploy.worker.storage
+
+import org.apache.celeborn.common.CelebornConf
+import org.apache.celeborn.common.exception.CelebornIOException
+import org.apache.celeborn.common.internal.Logging
+import org.apache.celeborn.common.metrics.source.AbstractSource
+import org.apache.celeborn.common.protocol.StorageInfo
+
+/**
+ * Decides which storage a partition file is created on and where it goes when it is evicted.
+ *
+ * Both orders come from the worker configuration:
+ *  - `createFileOrder`: storage type names tried in order by [[createFile]];
+ *  - `evictFileOrder`: for a source storage type name, the target names tried in order by
+ *    [[getEvictedFile]].
+ */
+class StoragePolicy(conf: CelebornConf, storageManager: StorageManager, source: AbstractSource)
+  extends Logging {
+  var createFileOrder: Option[List[String]] = conf.workerStoragePolicyCreateFilePolicy
+  var evictFileOrder: Option[Map[String, List[String]]] = conf.workerStoragePolicyEvictFilePolicy
+
+  /**
+   * Creates the file that `celebornFile` should be evicted to.
+   *
+   * @return the file created on the first eviction target configured for
+   *         `celebornFile.storageType` that succeeds, or null when no eviction targets are
+   *         configured for that storage type (or the file has no storage type)
+   * @throws CelebornIOException if targets are configured but none of them produced a file
+   */
+  def getEvictedFile(
+      celebornFile: CelebornFile,
+      partitionDataWriterContext: PartitionDataWriterContext): CelebornFile = {
+    // Map.get returns an Option, so a missing mapping is None here rather than null.
+    val targets = for {
+      order <- evictFileOrder
+      storageType <- Option(celebornFile.storageType)
+      storageNames <- order.get(storageType.name())
+    } yield storageNames
+    targets.map(names => createFile(partitionDataWriterContext, Some(names))).orNull
+  }
+
+  /**
+   * Creates a file for the partition in `partitionDataWriterContext`, trying each storage type
+   * in `order` until one succeeds.
+   *
+   * Names are matched to [[StorageInfo.Type]] ignoring case and surrounding whitespace. Unknown
+   * names are skipped with a warning, and a failure on one storage is logged before the next one
+   * is tried. On success the chosen type is recorded on the context via `setStorageType`.
+   *
+   * @param order storage type names to try; defaults to the configured create-file order
+   * @throws CelebornIOException if no storage in `order` produced a file
+   */
+  def createFile(
+      partitionDataWriterContext: PartitionDataWriterContext,
+      order: Option[List[String]] = createFileOrder): CelebornFile = {
+    logDebug(
+      s"create file for ${partitionDataWriterContext.getShuffleKey} ${partitionDataWriterContext.getPartitionLocation.getFileName}")
+    val location = partitionDataWriterContext.getPartitionLocation
+
+    // Returns None (after logging) instead of throwing, so the caller can fall through to the
+    // next storage type in the order.
+    def tryCreateFileByType(storageInfoType: StorageInfo.Type): Option[CelebornFile] = {
+      try {
+        val file: CelebornFile = storageInfoType match {
+          case StorageInfo.Type.MEMORY =>
+            logDebug(s"Create memory file for ${partitionDataWriterContext.getShuffleKey} ${partitionDataWriterContext.getPartitionLocation.getFileName}")
+            val memoryFileInfo = storageManager.createMemoryFileInfo(
+              partitionDataWriterContext.getAppId,
+              partitionDataWriterContext.getShuffleId,
+              location.getFileName,
+              partitionDataWriterContext.getUserIdentifier,
+              partitionDataWriterContext.getPartitionType,
+              partitionDataWriterContext.isPartitionSplitEnabled)
+            new CelebornMemoryFile(conf, source, memoryFileInfo, storageInfoType)
+          case StorageInfo.Type.HDD | StorageInfo.Type.SSD | StorageInfo.Type.HDFS | StorageInfo.Type.OSS =>
+            logDebug(s"create non-memory file for ${partitionDataWriterContext.getShuffleKey} ${partitionDataWriterContext.getPartitionLocation.getFileName}")
+            val (flusher, diskFileInfo, workingDir) = storageManager.createDiskFile(
+              location,
+              partitionDataWriterContext.getAppId,
+              partitionDataWriterContext.getShuffleId,
+              location.getFileName,
+              partitionDataWriterContext.getUserIdentifier,
+              partitionDataWriterContext.getPartitionType,
+              partitionDataWriterContext.isPartitionSplitEnabled)
+            if (storageInfoType == StorageInfo.Type.HDD || storageInfoType == StorageInfo.Type.SSD) {
+              new CelebornDiskFile(flusher, diskFileInfo, workingDir, storageInfoType)
+            } else {
+              new CelebornDFSFile(flusher, diskFileInfo, storageInfoType)
+            }
+        }
+        // Record the chosen storage on the context for every kind of file, not only memory.
+        partitionDataWriterContext.setStorageType(storageInfoType)
+        Some(file)
+      } catch {
+        case e: Exception =>
+          logError(s"create celeborn file for storage ${storageInfoType} failed", e)
+          None
+      }
+    }
+
+    val storageTypes = order.getOrElse(Nil).flatMap { storageStr =>
+      val storageInfoType =
+        StorageInfo.fromStrToType(storageStr.trim.toUpperCase(java.util.Locale.ROOT))
+      if (storageInfoType == null) {
+        logWarning(s"Unknown storage type $storageStr in storage policy, skip it")
+      }
+      Option(storageInfoType)
+    }
+
+    // The iterator is lazy: storages are tried in order and the first successful file wins.
+    storageTypes.iterator
+      .map(tryCreateFileByType)
+      .collectFirst { case Some(file) => file }
+      .getOrElse(
+        throw new CelebornIOException(s"Create file failed for ${partitionDataWriterContext}"))
+  }
+}
+}
diff --git 
a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/StoragePolicySuite.scala
 
b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/StoragePolicySuite.scala
new file mode 100644
index 000000000..c2cfd596b
--- /dev/null
+++ 
b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/StoragePolicySuite.scala
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.service.deploy.worker.storage
+
+import java.io.File
+
+import org.mockito.ArgumentMatchers.any
+import org.mockito.MockitoSugar.mock
+import org.mockito.MockitoSugar.when
+
+import org.apache.celeborn.CelebornFunSuite
+import org.apache.celeborn.common.CelebornConf
+import org.apache.celeborn.common.meta.{DiskFileInfo, MemoryFileInfo}
+import org.apache.celeborn.common.metrics.source.AbstractSource
+import org.apache.celeborn.common.protocol.{PartitionLocation, StorageInfo}
+
+/**
+ * Tests for StoragePolicy against a mocked StorageManager whose createMemoryFileInfo and
+ * createDiskFile always return mocks, so creating a file on any storage type succeeds.
+ */
+class StoragePolicySuite extends CelebornFunSuite {
+  val mockedStorageManager: StorageManager = mock[StorageManager]
+  val mockedSource: AbstractSource = mock[AbstractSource]
+  val mockedPartitionWriterContext: PartitionDataWriterContext = mock[PartitionDataWriterContext]
+
+  // Memory file creation always succeeds.
+  val mockedCelebornMemoryFile = mock[MemoryFileInfo]
+  when(
+    mockedStorageManager.createMemoryFileInfo(any(), any(), any(), any(), any(), any())).thenAnswer(
+    mockedCelebornMemoryFile)
+
+  // Disk/DFS file creation always succeeds, returning (flusher, file info, working dir).
+  val mockedDiskFile = mock[DiskFileInfo]
+  val mockedFlusher = mock[Flusher]
+  val mockedFile = mock[File]
+  when(
+    mockedStorageManager.createDiskFile(
+      any(),
+      any(),
+      any(),
+      any(),
+      any(),
+      any(),
+      any())).thenAnswer((mockedFlusher, mockedDiskFile, mockedFile))
+
+  val mockedPartitionLocation =
+    new PartitionLocation(1, 1, "h1", 1, 2, 3, 4, PartitionLocation.Mode.PRIMARY)
+  when(mockedPartitionWriterContext.getPartitionLocation).thenAnswer(mockedPartitionLocation)
+
+  // MEMORY is first in the order, so a memory file is created.
+  test("test create file order case1") {
+    val conf = new CelebornConf()
+    conf.set("celeborn.worker.storage.storagePolicy.createFilePolicy", "MEMORY,SSD,HDD,HDFS,OSS")
+    val storagePolicy = new StoragePolicy(conf, mockedStorageManager, mockedSource)
+    val file = storagePolicy.createFile(mockedPartitionWriterContext)
+    assert(file.isInstanceOf[CelebornMemoryFile])
+  }
+
+  // SSD is first in the order, so a local disk file is created.
+  test("test create file order case2") {
+    val conf = new CelebornConf()
+    conf.set("celeborn.worker.storage.storagePolicy.createFilePolicy", "SSD,HDD,HDFS,OSS")
+    val storagePolicy = new StoragePolicy(conf, mockedStorageManager, mockedSource)
+    val file = storagePolicy.createFile(mockedPartitionWriterContext)
+    assert(file.isInstanceOf[CelebornDiskFile])
+  }
+
+  // With no evictPolicy set, the default MEMORY -> SSD,HDD,HDFS,OSS order applies, so a memory
+  // file is evicted to a disk (SSD) file.
+  test("test getEvicted file case1") {
+    val mockedMemoryFile = mock[CelebornMemoryFile]
+    val conf = new CelebornConf()
+    val storagePolicy = new StoragePolicy(conf, mockedStorageManager, mockedSource)
+    when(mockedMemoryFile.storageType).thenAnswer(StorageInfo.Type.MEMORY)
+    val nFile = storagePolicy.getEvictedFile(mockedMemoryFile, mockedPartitionWriterContext)
+    assert(nFile.isInstanceOf[CelebornDiskFile])
+  }
+}

Reply via email to