This is an automated email from the ASF dual-hosted git repository.
nicholasjiang pushed a commit to branch branch-0.6
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/branch-0.6 by this push:
new cb0d30089 [CELEBORN-2063] Parallelize the create partition writer in
handleReserveSlots to speed up the reserveSlots RPC process time
cb0d30089 is described below
commit cb0d30089c9c9155e2664ce7d3440ded8749db9c
Author: SteNicholas <[email protected]>
AuthorDate: Tue Feb 17 16:03:13 2026 +0800
[CELEBORN-2063] Parallelize the create partition writer in
handleReserveSlots to speed up the reserveSlots RPC process time
### What changes were proposed in this pull request?
Parallelize partition writer creation in `handleReserveSlots` to speed up the reserveSlots RPC processing time.
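At a high level, the change replaces the sequential per-location loop in `handleReserveSlots` with futures that are submitted to a dedicated thread pool and joined under a timeout. A minimal sketch of that pattern, using the `Utils.future` and `Utils.tryFuturesWithTimeout` helpers added in this patch (the object, method and parameter names below are illustrative, not part of the patch):

import java.util.concurrent.ThreadPoolExecutor

import org.apache.celeborn.common.util.Utils

object CreateWritersSketch {
  // Create one writer per location: sequentially when the pool is null (feature disabled),
  // otherwise submit each creation to the pool and wait for all futures under a timeout.
  def createAll[A, B](
      locations: List[A],
      pool: ThreadPoolExecutor,
      timeoutInSeconds: Long)(create: A => B): List[B] = {
    if (pool == null) {
      locations.map(create) // previous sequential behavior
    } else {
      val futures = locations.map(loc => Utils.future(create(loc))(pool))
      Utils.tryFuturesWithTimeout(futures, timeoutInSeconds, "create partition writer timeout")
    }
  }
}

The sequential branch keeps the old behavior when the feature is disabled, so parallel creation is purely opt-in.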
### Why are the changes needed?
The creation of partition writers in `handleReserveSlots` can be parallelized to reduce the reserveSlots RPC processing time.
### Does this PR introduce _any_ user-facing change?
Yes. Introduces `celeborn.worker.writer.create.parallel.enabled`,
`celeborn.worker.writer.create.parallel.threads` and
`celeborn.worker.writer.create.parallel.timeout` to configure parallelized
creation of file writers.
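For example, the feature could be enabled with something like the following in the worker's celeborn-defaults.conf (the thread count below is illustrative; when `celeborn.worker.writer.create.parallel.threads` is unset it falls back to the number of available processors):

celeborn.worker.writer.create.parallel.enabled true
celeborn.worker.writer.create.parallel.threads 32
celeborn.worker.writer.create.parallel.timeout 120s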
### How was this patch tested?
CI.
Closes #3387 from SteNicholas/CELEBORN-2063.
Authored-by: SteNicholas <[email protected]>
Signed-off-by: SteNicholas <[email protected]>
(cherry picked from commit 8e6f4d5f95f58238913bf6f5bc769e5508d64efe)
Signed-off-by: SteNicholas <[email protected]>
---
.../org/apache/celeborn/common/CelebornConf.scala | 55 ++++++
.../org/apache/celeborn/common/util/Utils.scala | 82 +++++++-
docs/configuration/worker.md | 10 +-
.../service/deploy/worker/Controller.scala | 212 ++++++++++++++-------
.../celeborn/service/deploy/worker/Worker.scala | 14 ++
5 files changed, 289 insertions(+), 84 deletions(-)
diff --git
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index 3447d5dbd..15053091c 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -1341,6 +1341,17 @@ class CelebornConf(loadDefaults: Boolean) extends
Cloneable with Logging with Se
def workerS3FlusherThreads: Int = get(WORKER_FLUSHER_S3_THREADS)
def workerOssFlusherThreads: Int = get(WORKER_FLUSHER_OSS_THREADS)
def workerCreateWriterMaxAttempts: Int =
get(WORKER_WRITER_CREATE_MAX_ATTEMPTS)
+ def workerCreateWriterParallelEnabled: Boolean =
get(WORKER_WRITER_CREATE_PARALLEL_ENABLED)
+ def workerCreateWriterParallelThreads: Int =
+
get(WORKER_WRITER_CREATE_PARALLEL_THREADS).getOrElse(Runtime.getRuntime.availableProcessors)
+ def workerCreateWriterParallelTimeout: Long =
get(WORKER_WRITER_CREATE_PARALLEL_TIMEOUT)
+ def workerWriterHdfsCreateAuxiliaryFileMaxRetries: Int =
+ get(WORKER_WRITER_HDFS_CREATE_AUXILIARY_FILE_MAX_RETRIES)
+ def workerWriterHdfsCreateAuxiliaryFileRetryWait: Long =
+ get(WORKER_WRITER_HDFS_CREATE_AUXILIARY_FILE_RETRY_WAIT)
def workerFlusherLocalGatherAPIEnabled: Boolean =
get(WORKER_FLUSHER_LOCAL_GATHER_API_ENABLED)
// //////////////////////////////////////////////////////
@@ -4038,6 +4049,50 @@ object CelebornConf extends Logging {
.intConf
.createWithDefault(3)
+ val WORKER_WRITER_CREATE_PARALLEL_ENABLED: ConfigEntry[Boolean] =
+ buildConf("celeborn.worker.writer.create.parallel.enabled")
+ .categories("worker")
+ .version("0.6.3")
+ .doc("Whether to parallelize the creation of file writer.")
+ .booleanConf
+ .createWithDefault(false)
+
+ val WORKER_WRITER_CREATE_PARALLEL_THREADS: OptionalConfigEntry[Int] =
+ buildConf("celeborn.worker.writer.create.parallel.threads")
+ .categories("worker")
+ .version("0.6.3")
+ .doc("Thread number of worker to parallelize the creation of file
writer.")
+ .intConf
+ .createOptional
+
+ val WORKER_WRITER_CREATE_PARALLEL_TIMEOUT: ConfigEntry[Long] =
+ buildConf("celeborn.worker.writer.create.parallel.timeout")
+ .categories("worker")
+ .version("0.6.3")
+ .doc("Timeout for a worker to create a file writer in parallel.")
+ .timeConf(TimeUnit.MILLISECONDS)
+ .createWithDefaultString("120s")
+
+ val WORKER_WRITER_HDFS_CREATE_AUXILIARY_FILE_MAX_RETRIES: ConfigEntry[Int] =
+ buildConf("celeborn.worker.writer.hdfs.createAuxiliaryFile.maxRetries")
+ .categories("worker")
+ .version("0.7.0")
+ .doc("Retry count for a auxiliary file including index file and success
file with HDFS storage to create" +
+ " if its creation was failed.")
+ .intConf
+ .createWithDefault(5)
+
+ val WORKER_WRITER_HDFS_CREATE_AUXILIARY_FILE_RETRY_WAIT: ConfigEntry[Long] =
+ buildConf("celeborn.worker.writer.hdfs.createAuxiliaryFile.retryWait")
+ .categories("worker")
+ .version("0.7.0")
+ .doc("Wait interval after failure to create a auxiliary file with HDFS
storage and then retry it.")
+ .timeConf(TimeUnit.MILLISECONDS)
+ .createWithDefaultString("200ms")
+
val WORKER_FLUSHER_LOCAL_GATHER_API_ENABLED: ConfigEntry[Boolean] =
buildConf("celeborn.worker.flusher.local.gatherAPI.enabled")
.internal
diff --git a/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala
b/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala
index 7dec9f086..900356c9d 100644
--- a/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala
@@ -1004,11 +1004,14 @@ object Utils extends Logging {
}
/**
- * if the action is timeout, will return the callback result
- * if other exception will be thrown directly
+ * If the action times out, the callback result will be returned.
+ * Any other exception will be thrown directly.
+ *
* @param block the normal action block
* @param callback callback if timeout
+ * @param threadPool thread pool to submit
* @param timeoutInSeconds timeout limit value in seconds
+ * @param errorMessage error message to log exception
* @tparam T result type
* @return result
*/
@@ -1016,15 +1019,45 @@ object Utils extends Logging {
threadPool: ThreadPoolExecutor,
timeoutInSeconds: Long = 10,
errorMessage: String = "none"): T = {
- val futureTask = new Callable[T] {
+ tryFutureWithTimeoutAndCallback(callback)(
+ future(block)(threadPool),
+ timeoutInSeconds,
+ errorMessage)
+ }
+
+ /**
+ * Create a future by submitting the given block as a task to the thread pool.
+ *
+ * @param block the normal action block
+ * @param threadPool thread pool to submit
+ * @tparam T result type
+ * @return future
+ */
+ def future[T](block: => T)(
+ threadPool: ThreadPoolExecutor): java.util.concurrent.Future[T] = {
+ threadPool.submit(new Callable[T] {
override def call(): T = {
block
}
- }
+ })
+ }
- var future: java.util.concurrent.Future[T] = null
+ /**
+ * If the action times out, the callback result will be returned.
+ * Any other exception will be thrown directly.
+ *
+ * @param callback callback if timeout
+ * @param future future to try with timeout and callback
+ * @param timeoutInSeconds timeout limit value in seconds
+ * @param errorMessage error message to log exception
+ * @tparam T result type
+ * @return result
+ */
+ def tryFutureWithTimeoutAndCallback[T](callback: => T)(
+ future: java.util.concurrent.Future[T],
+ timeoutInSeconds: Long = 10,
+ errorMessage: String = "none"): T = {
try {
- future = threadPool.submit(futureTask)
future.get(timeoutInSeconds, TimeUnit.SECONDS)
} catch {
case _: TimeoutException =>
@@ -1034,9 +1067,40 @@ object Utils extends Logging {
case throwable: Throwable =>
throw throwable
} finally {
- if (null != future && !future.isCancelled) {
- future.cancel(true)
- }
+ cancelFuture(future)
+ }
+ }
+
+ /**
+ * Wait for all futures with a timeout. If any future times out or fails,
+ * the exception is logged with the error message and then rethrown.
+ *
+ * @param futures futures to wait for under the timeout
+ * @param timeoutInSeconds timeout limit value in seconds
+ * @param errorMessage error message to log exception
+ * @tparam T result type
+ * @return results
+ */
+ def tryFuturesWithTimeout[T](
+ futures: List[java.util.concurrent.Future[T]],
+ timeoutInSeconds: Long = 10,
+ errorMessage: String = "none"): List[T] = {
+ try {
+ futures.map(_.get(timeoutInSeconds, TimeUnit.SECONDS))
+ } catch {
+ case throwable: Throwable =>
+ logError(
+ s"${throwable.getClass.getSimpleName} in thread
${Thread.currentThread().getName}," +
+ s" error message: $errorMessage")
+ throw throwable
+ } finally {
+ futures.foreach(cancelFuture)
+ }
+ }
+
+ def cancelFuture[T](future: java.util.concurrent.Future[T]): Unit = {
+ if (null != future && !future.isCancelled) {
+ future.cancel(true)
}
}
diff --git a/docs/configuration/worker.md b/docs/configuration/worker.md
index d9db38d0b..5eb90f155 100644
--- a/docs/configuration/worker.md
+++ b/docs/configuration/worker.md
@@ -77,7 +77,7 @@ license: |
| celeborn.worker.decommission.checkInterval | 30s | false | The wait interval
of checking whether all the shuffle expired during worker decommission | 0.4.0
| |
| celeborn.worker.decommission.forceExitTimeout | 6h | false | The wait time
of waiting for all the shuffle expire during worker decommission. | 0.4.0 | |
| celeborn.worker.directMemoryRatioForMemoryFileStorage | 0.0 | false | Max
ratio of direct memory to store shuffle data. This feature is experimental and
disabled by default. | 0.5.0 | |
-| celeborn.worker.directMemoryRatioForReadBuffer | 0.1 | false | Max ratio of
direct memory for read buffer | 0.2.0 | |
+| celeborn.worker.directMemoryRatioForReadBuffer | 0.35 | false | Max ratio of
direct memory for read buffer | 0.2.0 | |
| celeborn.worker.directMemoryRatioToMergeBuffer | 0.4 | false | If direct
memory usage is above this limit, the worker will merge low utilization push
data's body buffer | 0.6.2 | |
| celeborn.worker.directMemoryRatioToPauseReceive | 0.85 | false | If direct
memory usage reaches this limit, the worker will stop to receive data from
Celeborn shuffle clients. | 0.2.0 | |
| celeborn.worker.directMemoryRatioToPauseReplicate | 0.95 | false | If direct
memory usage reaches this limit, the worker will stop to receive replication
data from other workers. This value should be higher than
celeborn.worker.directMemoryRatioToPauseReceive. | 0.2.0 | |
@@ -100,6 +100,7 @@ license: |
| celeborn.worker.flusher.threads | 16 | false | Flusher's thread count per
disk for unknown-type disks. | 0.2.0 | |
| celeborn.worker.graceful.shutdown.checkSlotsFinished.interval | 1s | false |
The wait interval of checking whether all released slots to be committed or
destroyed during worker graceful shutdown | 0.2.0 | |
| celeborn.worker.graceful.shutdown.checkSlotsFinished.timeout | 480s | false
| The wait time of waiting for the released slots to be committed or destroyed
during worker graceful shutdown. | 0.2.0 | |
+| celeborn.worker.graceful.shutdown.dbDeleteFailurePolicy | IGNORE | false |
Policy for handling DB delete failures during graceful shutdown. THROW: throw
exception, EXIT: trigger graceful shutdown, IGNORE: log error and continue
(default). | 0.7.0 | |
| celeborn.worker.graceful.shutdown.enabled | false | false | When true,
during worker shutdown, the worker will wait for all released slots to be
committed or destroyed. | 0.2.0 | |
| celeborn.worker.graceful.shutdown.partitionSorter.shutdownTimeout | 120s |
false | The wait time of waiting for sorting partition files during worker
graceful shutdown. | 0.2.0 | |
| celeborn.worker.graceful.shutdown.recoverDbBackend | ROCKSDB | false |
Specifies a disk-based store used in local db. ROCKSDB or LEVELDB (deprecated).
| 0.4.0 | |
@@ -107,6 +108,7 @@ license: |
| celeborn.worker.graceful.shutdown.saveCommittedFileInfo.interval | 5s |
false | Interval for a Celeborn worker to flush committed file infos into DB. |
0.3.1 | |
| celeborn.worker.graceful.shutdown.saveCommittedFileInfo.sync | false | false
| Whether to call sync method to save committed file infos into DB to handle OS
crash. | 0.3.1 | |
| celeborn.worker.graceful.shutdown.timeout | 600s | false | The worker's
graceful shutdown timeout time. | 0.2.0 | |
+| celeborn.worker.hdfs.replication.factor | 2 | false | HDFS replication
factor for shuffle files. | 0.7.0 | |
| celeborn.worker.http.auth.administers | | false | A comma-separated list of
users who have admin privileges, Note, when
celeborn.worker.http.auth.supportedSchemes is not set, everyone is treated as
administrator. | 0.6.0 | |
| celeborn.worker.http.auth.basic.provider |
org.apache.celeborn.common.authentication.AnonymousAuthenticationProviderImpl |
false | User-defined password authentication implementation of
org.apache.celeborn.common.authentication.PasswdAuthenticationProvider | 0.6.0
| |
| celeborn.worker.http.auth.bearer.provider |
org.apache.celeborn.common.authentication.AnonymousAuthenticationProviderImpl |
false | User-defined token authentication implementation of
org.apache.celeborn.common.authentication.TokenAuthenticationProvider | 0.6.0 |
|
@@ -175,6 +177,7 @@ license: |
| celeborn.worker.replicate.port | 0 | false | Server port for Worker to
receive replicate data request from other Workers. | 0.2.0 | |
| celeborn.worker.replicate.randomConnection.enabled | true | false | Whether
worker will create random connection to peer when replicate data. When false,
worker tend to reuse the same cached TransportClient to a specific replicate
worker; when true, worker tend to use different cached TransportClient. Netty
will use the same thread to serve the same connection, so with more connections
replicate server can leverage more netty threads | 0.2.1 | |
| celeborn.worker.replicate.threads | 64 | false | Thread number of worker to
replicate shuffle data. | 0.2.0 | |
+| celeborn.worker.reuse.hdfs.outputStream.enabled | false | false | Whether to
enable reuse output stream on hdfs. | 0.7.0 | |
| celeborn.worker.rpc.port | 0 | false | Server port for Worker to receive RPC
request. | 0.2.0 | |
| celeborn.worker.shuffle.partitionSplit.enabled | true | false | enable the
partition split on worker side | 0.3.0 |
celeborn.worker.partition.split.enabled |
| celeborn.worker.shuffle.partitionSplit.max | 2g | false | Specify the
maximum partition size for splitting, and ensure that individual partition
files are always smaller than this limit. | 0.3.0 | |
@@ -197,5 +200,10 @@ license: |
| celeborn.worker.storage.workingDir | celeborn-worker/shuffle_data | false |
Worker's working dir path name. | 0.3.0 | celeborn.worker.workingDir |
| celeborn.worker.writer.close.timeout | 120s | false | Timeout for a file
writer to close | 0.2.0 | |
| celeborn.worker.writer.create.maxAttempts | 3 | false | Retry count for a
file writer to create if its creation was failed. | 0.2.0 | |
+| celeborn.worker.writer.create.parallel.enabled | false | false | Whether to
parallelize the creation of file writer. | 0.6.3 | |
+| celeborn.worker.writer.create.parallel.threads | <undefined> | false |
Thread number of worker to parallelize the creation of file writer. | 0.6.3 |
|
+| celeborn.worker.writer.create.parallel.timeout | 120s | false | Timeout for
a worker to create a file writer in parallel. | 0.6.3 | |
+| celeborn.worker.writer.hdfs.createAuxiliaryFile.maxRetries | 5 | false | Retry count for creating an auxiliary file (including index file and success file) with HDFS storage if the creation fails. | 0.7.0 | |
+| celeborn.worker.writer.hdfs.createAuxiliaryFile.retryWait | 200ms | false | Wait interval after a failed attempt to create an auxiliary file with HDFS storage before retrying. | 0.7.0 | |
| worker.flush.reuseCopyBuffer.enabled | true | false | Whether to enable
reuse copy buffer for flush. Note that this copy buffer must not be referenced
again after flushing. This means that, for example, the Hdfs(Oss or S3) client
will not asynchronously access this buffer after the flush method returns,
otherwise data modification problems will occur. | 0.6.1 | |
<!--end-include-->
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala
index cd2485b0a..d281b2dbb 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala
@@ -32,14 +32,14 @@ import org.roaringbitmap.RoaringBitmap
import org.apache.celeborn.common.CelebornConf
import org.apache.celeborn.common.identity.UserIdentifier
import org.apache.celeborn.common.internal.Logging
-import org.apache.celeborn.common.meta.{ReduceFileMeta, WorkerInfo,
WorkerPartitionLocationInfo}
+import org.apache.celeborn.common.meta.{WorkerInfo,
WorkerPartitionLocationInfo}
import org.apache.celeborn.common.metrics.MetricsSystem
import org.apache.celeborn.common.protocol.{PartitionLocation,
PartitionSplitMode, PartitionType, StorageInfo}
import org.apache.celeborn.common.protocol.message.ControlMessages._
import org.apache.celeborn.common.protocol.message.StatusCode
import org.apache.celeborn.common.rpc._
import org.apache.celeborn.common.util.{JavaUtils, Utils}
-import
org.apache.celeborn.service.deploy.worker.storage.{MapPartitionMetaHandler,
PartitionDataWriter, SegmentMapPartitionMetaHandler, StorageManager}
+import
org.apache.celeborn.service.deploy.worker.storage.{MapPartitionMetaHandler,
PartitionDataWriter, StorageManager}
private[deploy] class Controller(
override val rpcEnv: RpcEnv,
@@ -64,11 +64,13 @@ private[deploy] class Controller(
var commitThreadPool: ThreadPoolExecutor = _
var commitFinishedChecker: ScheduledExecutorService = _
var asyncReplyPool: ScheduledExecutorService = _
+ var createWriterThreadPool: ThreadPoolExecutor = _
val minPartitionSizeToEstimate = conf.minPartitionSizeToEstimate
var shutdown: AtomicBoolean = _
val defaultPushdataTimeout = conf.pushDataTimeoutMs
val mockCommitFilesFailure = conf.testMockCommitFilesFailure
val shuffleCommitTimeout = conf.workerShuffleCommitTimeout
+ val createWriterParallelTimeout = conf.workerCreateWriterParallelTimeout
val workerCommitFilesCheckInterval = conf.workerCommitFilesCheckInterval
def init(worker: Worker): Unit = {
@@ -83,6 +85,7 @@ private[deploy] class Controller(
timer = worker.timer
commitThreadPool = worker.commitThreadPool
asyncReplyPool = worker.asyncReplyPool
+ createWriterThreadPool = worker.createWriterThreadPool
shutdown = worker.shutdown
commitFinishedChecker = worker.commitFinishedChecker
@@ -192,88 +195,45 @@ private[deploy] class Controller(
context.reply(ReserveSlotsResponse(StatusCode.NO_AVAILABLE_WORKING_DIR,
msg))
return
}
- val primaryLocs = new jArrayList[PartitionLocation]()
- try {
- for (ind <- 0 until requestPrimaryLocs.size()) {
- var location = partitionLocationInfo.getPrimaryLocation(
- shuffleKey,
- requestPrimaryLocs.get(ind).getUniqueId)
- if (location == null) {
- location = requestPrimaryLocs.get(ind)
- val writer = storageManager.createPartitionDataWriter(
- applicationId,
- shuffleId,
- location,
- splitThreshold,
- splitMode,
- partitionType,
- rangeReadFilter,
- userIdentifier,
- partitionSplitEnabled,
- isSegmentGranularityVisible)
- primaryLocs.add(new WorkingPartition(location, writer))
- } else {
- primaryLocs.add(location)
- }
- }
- } catch {
- case e: Exception =>
- logError(s"CreateWriter for $shuffleKey failed.", e)
- }
+ val primaryLocs = createWriters(
+ shuffleKey,
+ applicationId,
+ shuffleId,
+ requestPrimaryLocs,
+ splitThreshold,
+ splitMode,
+ partitionType,
+ rangeReadFilter,
+ userIdentifier,
+ partitionSplitEnabled,
+ isSegmentGranularityVisible,
+ isPrimary = true)
if (primaryLocs.size() < requestPrimaryLocs.size()) {
val msg = s"Not all primary partition satisfied for $shuffleKey"
logWarning(s"[handleReserveSlots] $msg, will destroy writers.")
- primaryLocs.asScala.foreach { partitionLocation =>
- val fileWriter =
partitionLocation.asInstanceOf[WorkingPartition].getFileWriter
- fileWriter.destroy(new IOException(s"Destroy FileWriter $fileWriter
caused by " +
- s"reserving slots failed for $shuffleKey."))
- }
+ destroyWriters(primaryLocs, shuffleKey)
context.reply(ReserveSlotsResponse(StatusCode.RESERVE_SLOTS_FAILED, msg))
return
}
- val replicaLocs = new jArrayList[PartitionLocation]()
- try {
- for (ind <- 0 until requestReplicaLocs.size()) {
- var location =
- partitionLocationInfo.getReplicaLocation(
- shuffleKey,
- requestReplicaLocs.get(ind).getUniqueId)
- if (location == null) {
- location = requestReplicaLocs.get(ind)
- val writer = storageManager.createPartitionDataWriter(
- applicationId,
- shuffleId,
- location,
- splitThreshold,
- splitMode,
- partitionType,
- rangeReadFilter,
- userIdentifier,
- partitionSplitEnabled,
- isSegmentGranularityVisible)
- replicaLocs.add(new WorkingPartition(location, writer))
- } else {
- replicaLocs.add(location)
- }
- }
- } catch {
- case e: Exception =>
- logError(s"CreateWriter for $shuffleKey failed.", e)
- }
+ val replicaLocs = createWriters(
+ shuffleKey,
+ applicationId,
+ shuffleId,
+ requestReplicaLocs,
+ splitThreshold,
+ splitMode,
+ partitionType,
+ rangeReadFilter,
+ userIdentifier,
+ partitionSplitEnabled,
+ isSegmentGranularityVisible,
+ isPrimary = false)
if (replicaLocs.size() < requestReplicaLocs.size()) {
val msg = s"Not all replica partition satisfied for $shuffleKey"
logWarning(s"[handleReserveSlots] $msg, destroy writers.")
- primaryLocs.asScala.foreach { partitionLocation =>
- val fileWriter =
partitionLocation.asInstanceOf[WorkingPartition].getFileWriter
- fileWriter.destroy(new IOException(s"Destroy FileWriter $fileWriter
caused by " +
- s"reserving slots failed for $shuffleKey."))
- }
- replicaLocs.asScala.foreach { partitionLocation =>
- val fileWriter =
partitionLocation.asInstanceOf[WorkingPartition].getFileWriter
- fileWriter.destroy(new IOException(s"Destroy FileWriter $fileWriter
caused by " +
- s"reserving slots failed for $shuffleKey."))
- }
+ destroyWriters(primaryLocs, shuffleKey)
+ destroyWriters(replicaLocs, shuffleKey)
context.reply(ReserveSlotsResponse(StatusCode.RESERVE_SLOTS_FAILED, msg))
return
}
@@ -299,6 +259,110 @@ private[deploy] class Controller(
context.reply(ReserveSlotsResponse(StatusCode.SUCCESS))
}
+ private def createWriters(
+ shuffleKey: String,
+ applicationId: String,
+ shuffleId: Int,
+ requestLocs: jList[PartitionLocation],
+ splitThreshold: Long,
+ splitMode: PartitionSplitMode,
+ partitionType: PartitionType,
+ rangeReadFilter: Boolean,
+ userIdentifier: UserIdentifier,
+ partitionSplitEnabled: Boolean,
+ isSegmentGranularityVisible: Boolean,
+ isPrimary: Boolean): jList[PartitionLocation] = {
+ val partitionLocations = new jArrayList[PartitionLocation]()
+ try {
+ def createWriter(partitionLocation: PartitionLocation):
PartitionLocation = {
+ createPartitionDataWriter(
+ shuffleKey,
+ applicationId,
+ shuffleId,
+ partitionLocation,
+ splitThreshold,
+ splitMode,
+ partitionType,
+ rangeReadFilter,
+ userIdentifier,
+ partitionSplitEnabled,
+ isSegmentGranularityVisible,
+ isPrimary)
+ }
+ if (createWriterThreadPool == null) {
+ partitionLocations.addAll(requestLocs.asScala.map(createWriter).asJava)
+ } else {
+ partitionLocations.addAll(Utils.tryFuturesWithTimeout(
+ requestLocs.asScala.map(requestLoc =>
+
Utils.future(createWriter(requestLoc))(createWriterThreadPool)).toList,
+ createWriterParallelTimeout,
+ s"Create FileWriter for $shuffleKey timeout.").asJava)
+ }
+ } catch {
+ case e: Exception =>
+ logError(s"Create FileWriter for $shuffleKey failed.", e)
+ }
+ partitionLocations
+ }
+
+ private def createPartitionDataWriter(
+ shuffleKey: String,
+ applicationId: String,
+ shuffleId: Int,
+ requestLoc: PartitionLocation,
+ splitThreshold: Long,
+ splitMode: PartitionSplitMode,
+ partitionType: PartitionType,
+ rangeReadFilter: Boolean,
+ userIdentifier: UserIdentifier,
+ partitionSplitEnabled: Boolean,
+ isSegmentGranularityVisible: Boolean,
+ isPrimary: Boolean): PartitionLocation = {
+ try {
+ var location =
+ if (isPrimary) {
+ partitionLocationInfo.getPrimaryLocation(
+ shuffleKey,
+ requestLoc.getUniqueId)
+ } else {
+ partitionLocationInfo.getReplicaLocation(
+ shuffleKey,
+ requestLoc.getUniqueId)
+ }
+ if (location == null) {
+ location = requestLoc
+ val writer = storageManager.createPartitionDataWriter(
+ applicationId,
+ shuffleId,
+ location,
+ splitThreshold,
+ splitMode,
+ partitionType,
+ rangeReadFilter,
+ userIdentifier,
+ partitionSplitEnabled,
+ isSegmentGranularityVisible)
+ new WorkingPartition(location, writer)
+ } else {
+ location
+ }
+ } catch {
+ case e: Exception =>
+ logError(s"Create FileWriter for $requestLoc $shuffleKey failed.", e)
+ throw e
+ }
+ }
+
+ private def destroyWriters(
+ partitionLocations: jList[PartitionLocation],
+ shuffleKey: String): Unit = {
+ partitionLocations.asScala.foreach { partitionLocation =>
+ val fileWriter =
partitionLocation.asInstanceOf[WorkingPartition].getFileWriter
+ fileWriter.destroy(new IOException(s"Destroy FileWriter $fileWriter
caused by " +
+ s"reserving slots failed for $shuffleKey."))
+ }
+ }
+
private def commitFiles(
shuffleKey: String,
uniqueIds: jList[String],
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
index be382e36f..8786ad232 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
@@ -346,6 +346,7 @@ private[celeborn] class Worker(
conf.workerCleanThreads)
val asyncReplyPool: ScheduledExecutorService =
ThreadUtils.newDaemonSingleThreadScheduledExecutor("worker-rpc-async-replier")
+ var createWriterThreadPool: ThreadPoolExecutor = _
val timer = new
HashedWheelTimer(ThreadUtils.namedSingleThreadFactory("worker-timer"))
// Configs
@@ -581,6 +582,13 @@ private[celeborn] class Worker(
}
})
+ if (conf.workerCreateWriterParallelEnabled) {
+ createWriterThreadPool =
+ ThreadUtils.newDaemonFixedThreadPool(
+ conf.workerCreateWriterParallelThreads,
+ "worker-writer-creator")
+ }
+
pushDataHandler.init(this)
replicateHandler.init(this)
fetchHandler.init(this)
@@ -628,12 +636,18 @@ private[celeborn] class Worker(
commitThreadPool.shutdown()
commitFinishedChecker.shutdown();
asyncReplyPool.shutdown()
+ if (createWriterThreadPool != null) {
+ createWriterThreadPool.shutdown()
+ }
} else {
forwardMessageScheduler.shutdownNow()
replicateThreadPool.shutdownNow()
commitThreadPool.shutdownNow()
commitFinishedChecker.shutdownNow();
asyncReplyPool.shutdownNow()
+ if (createWriterThreadPool != null) {
+ createWriterThreadPool.shutdownNow()
+ }
}
workerSource.appActiveConnections.clear()
partitionsSorter.close(exitKind)