This is an automated email from the ASF dual-hosted git repository.
rexxiong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new e68e1729d [CELEBORN-1483] Add storage policy
e68e1729d is described below
commit e68e1729d95616fb99227610ff2c1f31b31d123a
Author: mingji <[email protected]>
AuthorDate: Tue Jul 9 17:16:09 2024 +0800
[CELEBORN-1483] Add storage policy
### What changes were proposed in this pull request?
To refactor partition data writer.
Part of CIP-8.
### Why are the changes needed?
### Does this PR introduce _any_ user-facing change?
### How was this patch tested?
GA
Closes #2595 from FMX/b1483.
Authored-by: mingji <[email protected]>
Signed-off-by: Shuang <[email protected]>
---
.../celeborn/common/protocol/StorageInfo.java | 6 ++
.../org/apache/celeborn/common/CelebornConf.scala | 45 +++++++++-
.../apache/celeborn/common/CelebornConfSuite.scala | 47 ++++++++++
docs/configuration/worker.md | 2 +
.../worker/storage/PartitionDataWriterContext.java | 10 +++
.../celeborn/service/deploy/worker/Worker.scala | 1 +
.../deploy/worker/storage/CelebornFile.scala | 93 ++++++++++++++++++++
.../deploy/worker/storage/CelebornFileProxy.scala | 59 +++++++++++++
.../deploy/worker/storage/StorageManager.scala | 5 +-
.../deploy/worker/storage/StoragePolicy.scala | 99 ++++++++++++++++++++++
.../deploy/worker/storage/StoragePolicySuite.scala | 83 ++++++++++++++++++
11 files changed, 447 insertions(+), 3 deletions(-)
diff --git
a/common/src/main/java/org/apache/celeborn/common/protocol/StorageInfo.java
b/common/src/main/java/org/apache/celeborn/common/protocol/StorageInfo.java
index d92909128..621edb774 100644
--- a/common/src/main/java/org/apache/celeborn/common/protocol/StorageInfo.java
+++ b/common/src/main/java/org/apache/celeborn/common/protocol/StorageInfo.java
@@ -40,11 +40,13 @@ public class StorageInfo implements Serializable {
public static final Map<Integer, Type> typesMap = new HashMap<>();
public static final Set<String> typeNames = new HashSet<>();
+ public static final Map<String, Type> types = new HashMap<>();
static {
for (Type type : Type.values()) {
typesMap.put(type.value, type);
typeNames.add(type.name());
+ types.put(type.name(), type);
}
}
@@ -234,4 +236,8 @@ public class StorageInfo implements Serializable {
}
return ava;
}
+
+  /**
+   * Resolves a storage type from its name.
+   *
+   * @param typeStr name to look up in {@code types}, which is keyed by {@code Type.name()}
+   * @return the matching {@code Type}, or {@code null} when {@code types} has no such key
+   */
+  public static Type fromStrToType(String typeStr) {
+    final Type resolved = types.get(typeStr);
+    return resolved;
+  }
}
diff --git
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index 742c16444..a2b2e284f 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -32,7 +32,7 @@ import org.apache.celeborn.common.internal.Logging
import org.apache.celeborn.common.internal.config._
import org.apache.celeborn.common.network.util.ByteUnit
import org.apache.celeborn.common.protocol._
-import org.apache.celeborn.common.protocol.StorageInfo.Type
+import org.apache.celeborn.common.protocol.StorageInfo.{typesMap, validate,
Type}
import org.apache.celeborn.common.protocol.StorageInfo.Type.{HDD, SSD}
import org.apache.celeborn.common.rpc.RpcTimeout
import org.apache.celeborn.common.util.{JavaUtils, Utils}
@@ -1130,6 +1130,19 @@ class CelebornConf(loadDefaults: Boolean) extends
Cloneable with Logging with Se
def readBufferTargetUpdateInterval: Long =
get(WORKER_READBUFFER_TARGET_UPDATE_INTERVAL)
def readBufferTargetNotifyThreshold: Long =
get(WORKER_READBUFFER_TARGET_NOTIFY_THRESHOLD)
def readBuffersToTriggerReadMin: Int =
get(WORKER_READBUFFERS_TOTRIGGERREAD_MIN)
+  /**
+   * Ordered storage type names to try when creating a shuffle file.
+   *
+   * The configured value is a comma separated list. Each name is trimmed and upper-cased
+   * so that the result matches what the config validator accepts (it upper-cases before
+   * checking) and what `StorageInfo.fromStrToType` expects (enum names).
+   *
+   * @return the configured order, or `MEMORY, HDD, SSD, HDFS, OSS` when the option is unset
+   */
+  def workerStoragePolicyCreateFilePolicy: Option[List[String]] =
+    get(WORKER_STORAGE_CREATE_FILE_POLICY)
+      .map(_.split(",").map(_.trim.toUpperCase(java.util.Locale.ROOT)).toList)
+      .orElse(Some(List("MEMORY", "HDD", "SSD", "HDFS", "OSS")))
+
+  /**
+   * Eviction targets per source storage type.
+   *
+   * The configured value has the form `SRC,DST1,DST2|SRC2,DST3`: groups are separated by
+   * `|`, the first name of a group is the source type and the remaining names are the
+   * ordered destinations. Names are trimmed and upper-cased, mirroring the validator.
+   * Groups that contain no name at all (for example a value of `,` or `|`, which the
+   * validator lets through) are ignored instead of failing with an exception. When a
+   * source type appears in several groups the last group wins.
+   *
+   * @return the configured mapping, or `MEMORY -> SSD, HDD, HDFS, OSS` when the option is
+   *         unset
+   */
+  def workerStoragePolicyEvictFilePolicy: Option[Map[String, List[String]]] =
+    get(WORKER_STORAGE_EVICT_POLICY)
+      .map { policy =>
+        policy.split("\\|")
+          .map(_.split(",").map(_.trim.toUpperCase(java.util.Locale.ROOT)))
+          .collect { case names if names.nonEmpty => names.head -> names.tail.toList }
+          .toMap
+      }
+      .orElse(Some(Map("MEMORY" -> List("SSD", "HDD", "HDFS", "OSS"))))
// //////////////////////////////////////////////////////
// Decommission //
@@ -2814,6 +2827,36 @@ object CelebornConf extends Logging {
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("1000ms")
+  // Order in which storage types are tried when a shuffle file is created.
+  // The doc text is kept as is because docs/configuration/worker.md mirrors it.
+  // The validation message names the order that workerStoragePolicyCreateFilePolicy
+  // falls back to when the option is unset.
+  val WORKER_STORAGE_CREATE_FILE_POLICY: OptionalConfigEntry[String] =
+    buildConf("celeborn.worker.storage.storagePolicy.createFilePolicy")
+      .categories("worker")
+      .doc("This defined the order for creating files across available storages." +
+        " Available storages options are: MEMORY,SSD,HDD,HDFS,OSS")
+      .version("0.5.1")
+      .stringConf
+      .checkValue(
+        _.split(",").forall(str => StorageInfo.typeNames.contains(str.trim.toUpperCase)),
+        "Each entry must be a supported storage type. " +
+          "When unset, the create file order is MEMORY,HDD,SSD,HDFS,OSS")
+      .createOptional
+
+
+  // Eviction targets per storage type, written as `SRC,DST,...|SRC,DST,...`.
+  // The doc text is kept as is because docs/configuration/worker.md mirrors it.
+  // The validation message no longer claims that a default will be used for an invalid
+  // value: the accompanying tests expect such a value to be rejected.
+  val WORKER_STORAGE_EVICT_POLICY: OptionalConfigEntry[String] =
+    buildConf("celeborn.worker.storage.storagePolicy.evictPolicy")
+      .categories("worker")
+      .doc("This define the order of evict files if the storages are available." +
+        " Available storages: MEMORY,SSD,HDD,HDFS. " +
+        "Definition: StorageTypes|StorageTypes|StorageTypes. " +
+        "Example: MEMORY,SSD|SSD,HDFS." +
+        " The example means that a MEMORY shuffle file can be evicted to SSD " +
+        "and a SSD shuffle file can be evicted to HDFS.")
+      .version("0.5.1")
+      .stringConf
+      .checkValue(
+        _.replace("|", ",").split(",").forall(str =>
+          StorageInfo.typeNames.contains(str.trim.toUpperCase)),
+        "Each entry must be a supported storage type. " +
+          "When unset, the evict policy is MEMORY,SSD,HDD,HDFS,OSS")
+      .createOptional
+
+
val WORKER_HTTP_HOST: ConfigEntry[String] =
buildConf("celeborn.worker.http.host")
.withAlternative("celeborn.metrics.worker.prometheus.host")
diff --git
a/common/src/test/scala/org/apache/celeborn/common/CelebornConfSuite.scala
b/common/src/test/scala/org/apache/celeborn/common/CelebornConfSuite.scala
index 08c673599..6da252d9d 100644
--- a/common/src/test/scala/org/apache/celeborn/common/CelebornConfSuite.scala
+++ b/common/src/test/scala/org/apache/celeborn/common/CelebornConfSuite.scala
@@ -305,6 +305,7 @@ class CelebornConfSuite extends CelebornFunSuite {
def moduleKey(config: ConfigEntry[_]): String = {
config.key.replace("<module>", module)
}
+
conf.set(moduleKey(NETWORK_IO_MODE), transportTestNetworkIoMode)
conf.set(
moduleKey(NETWORK_IO_PREFER_DIRECT_BUFS),
@@ -406,4 +407,50 @@ class CelebornConfSuite extends CelebornFunSuite {
assert(conf.networkIoConnectTimeoutMs("test_child_module") ==
fallbackValue)
}
+  // Covers parsing of the create file policy: explicit values, the fallback used when
+  // the option is unset, and rejection of an unknown storage type.
+  test("Test storage policy case 1") {
+    val key = "celeborn.worker.storage.storagePolicy.createFilePolicy"
+    val conf = new CelebornConf()
+
+    conf.set(key, "MEMORY,SSD")
+    assert(conf.workerStoragePolicyCreateFilePolicy == Some(List("MEMORY", "SSD")))
+
+    conf.set(key, "MEMORY,HDFS")
+    assert(conf.workerStoragePolicyCreateFilePolicy == Some(List("MEMORY", "HDFS")))
+
+    conf.unset(key)
+    assert(conf.workerStoragePolicyCreateFilePolicy ==
+      Some(List("MEMORY", "HDD", "SSD", "HDFS", "OSS")))
+
+    // intercept fails the test when nothing is thrown, unlike a bare try/catch.
+    intercept[IllegalArgumentException] {
+      conf.set(key, "ABC")
+      conf.workerStoragePolicyCreateFilePolicy
+    }
+  }
+
+  // Covers parsing of the evict policy: a single group, several groups, the fallback
+  // used when the option is unset, and rejection of an unknown storage type.
+  test("Test storage policy case 2") {
+    val key = "celeborn.worker.storage.storagePolicy.evictPolicy"
+    val conf = new CelebornConf()
+
+    conf.set(key, "MEMORY,SSD")
+    assert(conf.workerStoragePolicyEvictFilePolicy == Some(Map("MEMORY" -> List("SSD"))))
+
+    conf.set(key, "MEMORY,SSD,HDFS|HDD,HDFS")
+    assert(conf.workerStoragePolicyEvictFilePolicy ==
+      Some(Map("MEMORY" -> List("SSD", "HDFS"), "HDD" -> List("HDFS"))))
+
+    conf.unset(key)
+    assert(conf.workerStoragePolicyEvictFilePolicy ==
+      Some(Map("MEMORY" -> List("SSD", "HDD", "HDFS", "OSS"))))
+
+    // intercept fails the test when nothing is thrown, unlike a bare try/catch.
+    intercept[IllegalArgumentException] {
+      conf.set(key, "ABC")
+      conf.workerStoragePolicyEvictFilePolicy
+    }
+  }
+
}
diff --git a/docs/configuration/worker.md b/docs/configuration/worker.md
index ef15cd7d0..53938ccc0 100644
--- a/docs/configuration/worker.md
+++ b/docs/configuration/worker.md
@@ -154,6 +154,8 @@ license: |
| celeborn.worker.storage.disk.reserve.ratio | <undefined> | false |
Celeborn worker reserved ratio for each disk. The minimum usable size for each
disk is the max space between the reserved space and the space calculate via
reserved ratio. | 0.3.2 | |
| celeborn.worker.storage.disk.reserve.size | 5G | false | Celeborn worker
reserved space for each disk. | 0.3.0 | celeborn.worker.disk.reserve.size |
| celeborn.worker.storage.expireDirs.timeout | 1h | false | The timeout for a
expire dirs to be deleted on disk. | 0.3.2 | |
+| celeborn.worker.storage.storagePolicy.createFilePolicy | <undefined> |
false | This defined the order for creating files across available storages.
Available storages options are: MEMORY,SSD,HDD,HDFS,OSS | 0.5.1 | |
+| celeborn.worker.storage.storagePolicy.evictPolicy | <undefined> |
false | This define the order of evict files if the storages are available.
Available storages: MEMORY,SSD,HDD,HDFS. Definition:
StorageTypes|StorageTypes|StorageTypes. Example: MEMORY,SSD|SSD,HDFS. The
example means that a MEMORY shuffle file can be evicted to SSD and a SSD
shuffle file can be evicted to HDFS. | 0.5.1 | |
| celeborn.worker.storage.workingDir | celeborn-worker/shuffle_data | false |
Worker's working dir path name. | 0.3.0 | celeborn.worker.workingDir |
| celeborn.worker.writer.close.timeout | 120s | false | Timeout for a file
writer to close | 0.2.0 | |
| celeborn.worker.writer.create.maxAttempts | 3 | false | Retry count for a
file writer to create if its creation was failed. | 0.2.0 | |
diff --git
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriterContext.java
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriterContext.java
index b73b901ad..86d53f9a9 100644
---
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriterContext.java
+++
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriterContext.java
@@ -21,6 +21,7 @@ import org.apache.celeborn.common.identity.UserIdentifier;
import org.apache.celeborn.common.protocol.PartitionLocation;
import org.apache.celeborn.common.protocol.PartitionSplitMode;
import org.apache.celeborn.common.protocol.PartitionType;
+import org.apache.celeborn.common.protocol.StorageInfo;
import org.apache.celeborn.common.util.Utils;
public class PartitionDataWriterContext {
@@ -34,6 +35,7 @@ public class PartitionDataWriterContext {
private final boolean partitionSplitEnabled;
private final String shuffleKey;
private final PartitionType partitionType;
+ private StorageInfo.Type storageType = null;
public PartitionDataWriterContext(
long splitThreshold,
@@ -96,4 +98,12 @@ public class PartitionDataWriterContext {
public PartitionType getPartitionType() {
return partitionType;
}
+
+  /**
+   * @return the storage type recorded on this context, or {@code null} if none has been set
+   */
+  public StorageInfo.Type getStorageType() {
+    return this.storageType;
+  }
+
+  /**
+   * Records the storage type on this context.
+   *
+   * @param storageType the value later returned by {@code getStorageType()}
+   */
+  public void setStorageType(final StorageInfo.Type storageType) {
+    this.storageType = storageType;
+  }
}
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
index 8ee278b45..2f8767383 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
@@ -60,6 +60,7 @@ import
org.apache.celeborn.service.deploy.worker.memory.MemoryManager.ServingSta
import org.apache.celeborn.service.deploy.worker.monitor.JVMQuake
import org.apache.celeborn.service.deploy.worker.profiler.JVMProfiler
import
org.apache.celeborn.service.deploy.worker.storage.{PartitionFilesSorter,
StorageManager}
+import org.apache.celeborn.service.deploy.worker.storage.StoragePolicy
private[celeborn] class Worker(
override val conf: CelebornConf,
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/CelebornFile.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/CelebornFile.scala
new file mode 100644
index 000000000..5d67f75d8
--- /dev/null
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/CelebornFile.scala
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.service.deploy.worker.storage
+
+import java.io.File
+import java.nio.channels.FileChannel
+
+import io.netty.buffer.{ByteBuf, CompositeByteBuf}
+
+import org.apache.celeborn.common.CelebornConf
+import org.apache.celeborn.common.meta.{DiskFileInfo, FileInfo, MemoryFileInfo}
+import org.apache.celeborn.common.metrics.source.AbstractSource
+import org.apache.celeborn.common.protocol.StorageInfo
+
+/**
+ * Base type of the shuffle file implementations in this package.
+ *
+ * It only declares state and operations: none of the fields is assigned here, so each one
+ * keeps its default value until somebody sets it.
+ */
+abstract class CelebornFile {
+  // Storage type of this file; CelebornFileProxy and StoragePolicy read it.
+  var storageType: StorageInfo.Type = _
+  var fileInfo: FileInfo = _
+
+  // Flush related state.
+  val flushLock: AnyRef = new AnyRef
+  var flusher: Flusher = _
+  var flushWorkerIndex: Int = _
+  var flushBuffer: CompositeByteBuf = _
+  var flusherBufferSize: Long = 0L
+
+  var writerCloseTimeoutMs: Long = _
+  var chunkSize: Long = _
+
+  // Metrics.
+  var source: AbstractSource = _
+  var metricsCollectCriticalEnabled: Boolean = false
+
+  /** Writes `buf` to this file. */
+  def write(buf: ByteBuf): Unit
+
+  /** Whether this file asks to be evicted; CelebornFileProxy.evict consults it. */
+  def needEvict: Boolean
+
+  /**
+   * Evicts this file.
+   *
+   * @param file the file obtained from the storage policy that replaces this one in
+   *             CelebornFileProxy
+   */
+  def evict(file: CelebornFile): Unit
+
+  /** Closes this file. */
+  def close(): Unit
+}
+
+/**
+ * Memory backed [[CelebornFile]]. Only `write` has a body so far (it does nothing); the
+ * other operations are still unimplemented.
+ *
+ * @param conf        worker configuration (not used yet)
+ * @param source      metrics source, stored in the inherited `source` field
+ * @param fileInfo    memory file metadata, stored in the inherited `fileInfo` field
+ * @param storageType storage type, stored in the inherited `storageType` field
+ */
+class CelebornMemoryFile(
+    conf: CelebornConf,
+    source: AbstractSource,
+    fileInfo: MemoryFileInfo,
+    storageType: StorageInfo.Type) extends CelebornFile {
+
+  // The constructor parameters shadow the inherited vars of the same name. Without the
+  // assignments below those vars would stay null and callers that read them through a
+  // CelebornFile reference (for example `storageType`) would never see the values.
+  // The CelebornFile-typed view is what makes the left hand side resolve to the base var.
+  private def base: CelebornFile = this
+  base.source = source
+  base.fileInfo = fileInfo
+  base.storageType = storageType
+
+  override def write(buf: ByteBuf): Unit = {}
+
+  override def needEvict: Boolean = ???
+
+  override def evict(file: CelebornFile): Unit = ???
+
+  override def close(): Unit = ???
+}
+
+/**
+ * [[CelebornFile]] created by StoragePolicy for the HDD and SSD storage types. All
+ * operations are still unimplemented.
+ *
+ * @param flusher      flusher, stored in the inherited `flusher` field
+ * @param diskFileInfo file metadata, stored in the inherited `fileInfo` field
+ * @param workingDir   working directory returned together with the flusher (not used yet)
+ * @param storageType  storage type, stored in the inherited `storageType` field
+ */
+class CelebornDiskFile(
+    flusher: Flusher,
+    diskFileInfo: DiskFileInfo,
+    workingDir: File,
+    storageType: StorageInfo.Type) extends CelebornFile {
+
+  // `flusher` and `storageType` shadow the inherited vars of the same name. Without the
+  // assignments below those vars would stay null for callers that read them through a
+  // CelebornFile reference. The CelebornFile-typed view makes the left hand side resolve
+  // to the base var.
+  private def base: CelebornFile = this
+  base.flusher = flusher
+  base.fileInfo = diskFileInfo
+  base.storageType = storageType
+
+  private var channel: FileChannel = null
+
+  override def write(buf: ByteBuf): Unit = ???
+
+  override def needEvict: Boolean = ???
+
+  override def evict(file: CelebornFile): Unit = ???
+
+  override def close(): Unit = ???
+}
+
+/**
+ * [[CelebornFile]] created by StoragePolicy for the HDFS and OSS storage types. All
+ * operations are still unimplemented.
+ *
+ * @param flusher      flusher, stored in the inherited `flusher` field
+ * @param hdfsFileInfo file metadata, stored in the inherited `fileInfo` field
+ * @param storageType  storage type, stored in the inherited `storageType` field
+ */
+class CelebornDFSFile(flusher: Flusher, hdfsFileInfo: DiskFileInfo, storageType: StorageInfo.Type)
+  extends CelebornFile {
+
+  // `flusher` and `storageType` shadow the inherited vars of the same name. Without the
+  // assignments below those vars would stay null for callers that read them through a
+  // CelebornFile reference. The CelebornFile-typed view makes the left hand side resolve
+  // to the base var.
+  private def base: CelebornFile = this
+  base.flusher = flusher
+  base.fileInfo = hdfsFileInfo
+  base.storageType = storageType
+
+  override def write(buf: ByteBuf): Unit = ???
+
+  override def needEvict: Boolean = ???
+
+  override def evict(file: CelebornFile): Unit = ???
+
+  override def close(): Unit = ???
+}
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/CelebornFileProxy.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/CelebornFileProxy.scala
new file mode 100644
index 000000000..4c4a4b5b4
--- /dev/null
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/CelebornFileProxy.scala
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.service.deploy.worker.storage
+
+import io.netty.buffer.ByteBuf
+
+import org.apache.celeborn.common.CelebornConf
+import org.apache.celeborn.common.metrics.source.AbstractSource
+import org.apache.celeborn.common.protocol.StorageInfo
+
+/**
+ * Front for the [[CelebornFile]] that currently receives the data of one partition.
+ *
+ * The initial file comes from the storage policy of `storageManager`; `evict` may swap
+ * it for another file obtained from the same policy.
+ *
+ * @param partitionDataWriterContext context handed to the storage policy
+ * @param storageManager             provides the storage policy
+ * @param conf                       worker configuration (not used yet)
+ * @param source                     metrics source (not used yet)
+ */
+class CelebornFileProxy(
+    partitionDataWriterContext: PartitionDataWriterContext,
+    storageManager: StorageManager,
+    conf: CelebornConf,
+    source: AbstractSource) {
+  var currentFile: CelebornFile =
+    storageManager.storagePolicy.createFile(partitionDataWriterContext)
+  var flusher: Flusher = null
+  var flushWorkerIndex: Int = 0
+
+  /** Writes `buf` to the current file while holding this proxy's lock. */
+  def write(buf: ByteBuf): Unit = this.synchronized {
+    currentFile.write(buf)
+  }
+
+  /**
+   * Moves to a new file when the current one asks for it or when `force` is set.
+   *
+   * The decision and the swap happen under the same lock as `write`, so `needEvict` is
+   * evaluated on the file that is actually replaced. When the policy yields no file
+   * (null) the current file is kept rather than being replaced by null.
+   *
+   * @param force evict even if the current file does not ask for it
+   */
+  def evict(force: Boolean): Unit = this.synchronized {
+    if (currentFile.needEvict || force) {
+      val candidate =
+        storageManager.storagePolicy.getEvictedFile(currentFile, partitionDataWriterContext)
+      Option(candidate).foreach { target =>
+        currentFile.evict(target)
+        currentFile = target
+      }
+    }
+  }
+
+  /** Does nothing yet. */
+  def close(): Unit = {}
+
+  /** Whether the current file reports the MEMORY storage type. */
+  def isMemoryShuffleFile: Boolean = {
+    currentFile.storageType == StorageInfo.Type.MEMORY
+  }
+}
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
index e767060c7..34579327a 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
@@ -68,6 +68,7 @@ final private[worker] class StorageManager(conf:
CelebornConf, workerSource: Abs
val hasHDFSStorage = conf.hasHDFSStorage
val storageExpireDirTimeout = conf.workerStorageExpireDirTimeout
+ val storagePolicy = new StoragePolicy(conf, this, workerSource)
// (deviceName -> deviceInfo) and (mount point -> diskInfo)
val (deviceInfos, diskInfos) = {
@@ -873,7 +874,7 @@ final private[worker] class StorageManager(conf:
CelebornConf, workerSource: Abs
&& MemoryManager.instance().memoryFileStorageAvailable()) {
logDebug(s"Create memory file for
${partitionDataWriterContext.getShuffleKey}
${partitionDataWriterContext.getPartitionLocation.getFileName}")
(
- createMemoryFile(
+ createMemoryFileInfo(
partitionDataWriterContext.getAppId,
partitionDataWriterContext.getShuffleId,
location.getFileName,
@@ -899,7 +900,7 @@ final private[worker] class StorageManager(conf:
CelebornConf, workerSource: Abs
}
}
- def createMemoryFile(
+ def createMemoryFileInfo(
appId: String,
shuffleId: Int,
fileName: String,
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StoragePolicy.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StoragePolicy.scala
new file mode 100644
index 000000000..228063d81
--- /dev/null
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StoragePolicy.scala
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.service.deploy.worker.storage
+
+import org.apache.celeborn.common.CelebornConf
+import org.apache.celeborn.common.exception.CelebornIOException
+import org.apache.celeborn.common.internal.Logging
+import org.apache.celeborn.common.metrics.source.AbstractSource
+import org.apache.celeborn.common.protocol.StorageInfo
+
+/**
+ * Chooses which kind of [[CelebornFile]] to create, following the orders configured in
+ * `conf`.
+ *
+ * @param conf           supplies the create file order and the evict policy
+ * @param storageManager creates the memory file info and the disk files
+ * @param source         metrics source passed on to memory files
+ */
+class StoragePolicy(conf: CelebornConf, storageManager: StorageManager, source: AbstractSource)
+  extends Logging {
+  var createFileOrder: Option[List[String]] = conf.workerStoragePolicyCreateFilePolicy
+  var evictFileOrder: Option[Map[String, List[String]]] = conf.workerStoragePolicyEvictFilePolicy
+
+  /**
+   * Creates the file that `celebornFile` should be evicted to.
+   *
+   * @param celebornFile               file to evict; its storage type selects the targets
+   * @param partitionDataWriterContext context of the partition the file belongs to
+   * @return the new file, or null when no evict policy is present at all
+   * @throws CelebornIOException when the policy has no targets for the storage type of
+   *                             `celebornFile`, or none of the targets could be created
+   */
+  def getEvictedFile(
+      celebornFile: CelebornFile,
+      partitionDataWriterContext: PartitionDataWriterContext): CelebornFile = {
+    evictFileOrder match {
+      case None => null
+      case Some(policy) =>
+        val currentType = celebornFile.storageType.name()
+        // Map.get yields an Option, so a missing entry has to be matched, not
+        // compared with null.
+        policy.get(currentType) match {
+          case targets @ Some(_) => createFile(partitionDataWriterContext, targets)
+          case None =>
+            throw new CelebornIOException(
+              s"No evict target is configured for storage type $currentType")
+        }
+    }
+  }
+
+  /**
+   * Creates a file on the first storage type of `order` that succeeds.
+   *
+   * @param partitionDataWriterContext context of the partition to create the file for
+   * @param order                      storage type names to try, in order; defaults to
+   *                                   the configured create file order
+   * @return the created file, never null
+   * @throws CelebornIOException when no storage type of `order` yields a file
+   */
+  def createFile(
+      partitionDataWriterContext: PartitionDataWriterContext,
+      order: Option[List[String]] = createFileOrder): CelebornFile = {
+    logDebug(
+      s"create file for ${partitionDataWriterContext.getShuffleKey} " +
+        s"${partitionDataWriterContext.getPartitionLocation.getFileName}")
+    val location = partitionDataWriterContext.getPartitionLocation
+
+    // Creates a file of the given type; a failure is logged and reported as null so
+    // that the next storage type can be tried.
+    def tryCreateFileByType(storageInfoType: StorageInfo.Type): CelebornFile = {
+      try {
+        storageInfoType match {
+          case StorageInfo.Type.MEMORY =>
+            logDebug(
+              s"Create memory file for ${partitionDataWriterContext.getShuffleKey} " +
+                s"${partitionDataWriterContext.getPartitionLocation.getFileName}")
+            val memoryFileInfo = storageManager.createMemoryFileInfo(
+              partitionDataWriterContext.getAppId,
+              partitionDataWriterContext.getShuffleId,
+              location.getFileName,
+              partitionDataWriterContext.getUserIdentifier,
+              partitionDataWriterContext.getPartitionType,
+              partitionDataWriterContext.isPartitionSplitEnabled)
+            partitionDataWriterContext.setStorageType(storageInfoType)
+            new CelebornMemoryFile(conf, source, memoryFileInfo, storageInfoType)
+          case StorageInfo.Type.HDD | StorageInfo.Type.SSD | StorageInfo.Type.HDFS |
+              StorageInfo.Type.OSS =>
+            logDebug(
+              s"create non-memory file for ${partitionDataWriterContext.getShuffleKey} " +
+                s"${partitionDataWriterContext.getPartitionLocation.getFileName}")
+            val (flusher, diskFileInfo, workingDir) = storageManager.createDiskFile(
+              location,
+              partitionDataWriterContext.getAppId,
+              partitionDataWriterContext.getShuffleId,
+              location.getFileName,
+              partitionDataWriterContext.getUserIdentifier,
+              partitionDataWriterContext.getPartitionType,
+              partitionDataWriterContext.isPartitionSplitEnabled)
+            // NOTE(review): unlike the MEMORY branch, the storage type is not recorded
+            // on the context here - confirm whether that is intended.
+            storageInfoType match {
+              case StorageInfo.Type.HDD | StorageInfo.Type.SSD =>
+                new CelebornDiskFile(flusher, diskFileInfo, workingDir, storageInfoType)
+              case _ =>
+                new CelebornDFSFile(flusher, diskFileInfo, storageInfoType)
+            }
+        }
+      } catch {
+        case e: Exception =>
+          logError(s"create celeborn file for storage ${storageInfoType} failed", e)
+          null
+      }
+    }
+
+    // StorageInfo.fromStrToType returns null for a name it does not know; skip such a
+    // name explicitly instead of relying on a MatchError being swallowed.
+    def tryCreateFileByName(storageName: String): CelebornFile =
+      StorageInfo.fromStrToType(storageName) match {
+        case null =>
+          logWarning(s"Skip unknown storage type $storageName in storage policy")
+          null
+        case storageInfoType => tryCreateFileByType(storageInfoType)
+      }
+
+    // The iterator is lazy, so storage types after the first success are never tried.
+    order.getOrElse(Nil).iterator
+      .map(tryCreateFileByName)
+      .find(_ != null)
+      .getOrElse(
+        throw new CelebornIOException(s"Create file failed for ${partitionDataWriterContext}"))
+  }
+}
diff --git
a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/StoragePolicySuite.scala
b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/StoragePolicySuite.scala
new file mode 100644
index 000000000..c2cfd596b
--- /dev/null
+++
b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/StoragePolicySuite.scala
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.service.deploy.worker.storage
+
+import java.io.File
+
+import org.mockito.ArgumentMatchers.any
+import org.mockito.MockitoSugar.mock
+import org.mockito.MockitoSugar.when
+
+import org.apache.celeborn.CelebornFunSuite
+import org.apache.celeborn.common.CelebornConf
+import org.apache.celeborn.common.meta.{DiskFileInfo, MemoryFileInfo}
+import org.apache.celeborn.common.metrics.source.AbstractSource
+import org.apache.celeborn.common.protocol.{PartitionLocation, StorageInfo}
+
+/**
+ * Checks which [[CelebornFile]] implementation StoragePolicy produces for a given create
+ * file order and for the default evict policy. StorageManager is mocked so that both
+ * memory file info creation and disk file creation always succeed.
+ */
+class StoragePolicySuite extends CelebornFunSuite {
+  val mockedStorageManager: StorageManager = mock[StorageManager]
+  val mockedSource: AbstractSource = mock[AbstractSource]
+  val mockedPartitionWriterContext: PartitionDataWriterContext =
+    mock[PartitionDataWriterContext]
+
+  // Memory path: the storage manager hands out a mocked memory file info.
+  val mockedCelebornMemoryFile = mock[MemoryFileInfo]
+  when(mockedStorageManager.createMemoryFileInfo(any(), any(), any(), any(), any(), any()))
+    .thenAnswer(mockedCelebornMemoryFile)
+
+  // Disk path: the storage manager hands out a mocked (flusher, file info, dir) triple.
+  val mockedDiskFile = mock[DiskFileInfo]
+  val mockedFlusher = mock[Flusher]
+  val mockedFile = mock[File]
+  when(mockedStorageManager.createDiskFile(any(), any(), any(), any(), any(), any(), any()))
+    .thenAnswer((mockedFlusher, mockedDiskFile, mockedFile))
+
+  val mockedPartitionLocation =
+    new PartitionLocation(1, 1, "h1", 1, 2, 3, 4, PartitionLocation.Mode.PRIMARY)
+  when(mockedPartitionWriterContext.getPartitionLocation).thenAnswer(mockedPartitionLocation)
+
+  // Builds a policy on a fresh conf, optionally with an explicit create file order.
+  private def newStoragePolicy(createFileOrder: Option[String] = None): StoragePolicy = {
+    val conf = new CelebornConf()
+    createFileOrder.foreach { order =>
+      conf.set("celeborn.worker.storage.storagePolicy.createFilePolicy", order)
+    }
+    new StoragePolicy(conf, mockedStorageManager, mockedSource)
+  }
+
+  test("test create file order case1") {
+    val storagePolicy = newStoragePolicy(Some("MEMORY,SSD,HDD,HDFS,OSS"))
+    val file = storagePolicy.createFile(mockedPartitionWriterContext)
+    assert(file.isInstanceOf[CelebornMemoryFile])
+  }
+
+  test("test create file order case2") {
+    val storagePolicy = newStoragePolicy(Some("SSD,HDD,HDFS,OSS"))
+    val file = storagePolicy.createFile(mockedPartitionWriterContext)
+    assert(file.isInstanceOf[CelebornDiskFile])
+  }
+
+  test("test getEvicted file case1") {
+    val mockedMemoryFile = mock[CelebornMemoryFile]
+    val storagePolicy = newStoragePolicy()
+    when(mockedMemoryFile.storageType).thenAnswer(StorageInfo.Type.MEMORY)
+    val nFile = storagePolicy.getEvictedFile(mockedMemoryFile, mockedPartitionWriterContext)
+    assert(nFile.isInstanceOf[CelebornDiskFile])
+  }
+}