pan3793 commented on code in PR #3598:
URL: https://github.com/apache/celeborn/pull/3598#discussion_r2799290278
##########
common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala:
##########
@@ -6801,6 +6808,17 @@ object CelebornConf extends Logging {
.intConf
.createWithDefault(2)
+ val RESERVE_SLOTS_IO_THREAD_POOL_SIZE: ConfigEntry[Int] =
+ buildConf("celeborn.worker.reserve.slots.io.threads")
Review Comment:
dot for namespaces, camelCase for words
```suggestion
buildConf("celeborn.worker.reserveSlots.io.threads")
```
##########
worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala:
##########
@@ -86,6 +88,10 @@ private[deploy] class Controller(
asyncReplyPool = worker.asyncReplyPool
shutdown = worker.shutdown
+ reserveSlotsThreadPool =
+
Executors.newFixedThreadPool(conf.workerReserveSlotsIoThreadPoolSize).asInstanceOf[
Review Comment:
maybe use `ThreadUtils.sameThreadExecutionContext` for threads = 1.
when something goes wrong, we will lost full stacktrace if we run the task
in another thread.
##########
worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala:
##########
@@ -86,6 +88,10 @@ private[deploy] class Controller(
asyncReplyPool = worker.asyncReplyPool
shutdown = worker.shutdown
+ reserveSlotsThreadPool =
+
Executors.newFixedThreadPool(conf.workerReserveSlotsIoThreadPoolSize).asInstanceOf[
Review Comment:
use Celeborn's util to create a thread pool instead of raw juc classes, to
properly set no daemon, name prefix, exception handler, etc.
##########
worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala:
##########
@@ -193,111 +199,145 @@ private[deploy] class Controller(
context.reply(ReserveSlotsResponse(StatusCode.NO_AVAILABLE_WORKING_DIR,
msg))
return
}
- val primaryLocs = new jArrayList[PartitionLocation]()
- try {
- for (ind <- 0 until requestPrimaryLocs.size()) {
- var location = partitionLocationInfo.getPrimaryLocation(
- shuffleKey,
- requestPrimaryLocs.get(ind).getUniqueId)
- if (location == null) {
- location = requestPrimaryLocs.get(ind)
- val writer = storageManager.createPartitionDataWriter(
- applicationId,
- shuffleId,
- location,
- splitThreshold,
- splitMode,
- partitionType,
- rangeReadFilter,
- userIdentifier,
- partitionSplitEnabled,
- isSegmentGranularityVisible)
- primaryLocs.add(new WorkingPartition(location, writer))
- } else {
- primaryLocs.add(location)
- }
- }
- } catch {
- case e: Exception =>
- logError(s"CreateWriter for $shuffleKey failed.", e)
+
+ def collectResults(
+ tasks: ArrayBuffer[CompletableFuture[PartitionLocation]],
+ createdWriters: CopyOnWriteArrayList[PartitionDataWriter],
+ startTime: Long) = {
+ val primaryFuture = CompletableFuture.allOf(tasks.toSeq: _*)
+ .whenComplete(new BiConsumer[Void, Throwable] {
+ override def accept(ignore: Void, error: Throwable): Unit = {
+ if (error != null) {
+ createdWriters.forEach(new Consumer[PartitionDataWriter] {
+ override def accept(fileWriter: PartitionDataWriter) {
+ fileWriter.destroy(new IOException(
+ s"Destroy FileWriter $fileWriter caused by " +
+ s"reserving slots failed for $shuffleKey.",
+ error))
+ }
+ })
+ } else {
+ val timeToReserveLocations = System.currentTimeMillis() -
startTime;
+ logInfo(
+ s"Reserved ${tasks.size} slots for $shuffleKey in
$timeToReserveLocations ms (with ${conf.workerReserveSlotsIoThreadPoolSize}
threads)")
Review Comment:
calculation of `${conf.workerReserveSlotsIoThreadPoolSize}` is not free,
especially on the hot path, so materialize it to save the evaluation cost every
time
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]