eolivelli commented on code in PR #3387:
URL: https://github.com/apache/celeborn/pull/3387#discussion_r2811403810


##########
worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala:
##########
@@ -291,15 +251,118 @@ private[deploy] class Controller(
       Utils.getSlotsPerDisk(requestPrimaryLocs, requestReplicaLocs))
     workerSource.incCounter(WorkerSource.SLOTS_ALLOCATED, primaryLocs.size() + 
replicaLocs.size())
 
-    logInfo(s"Reserved ${primaryLocs.size()} primary location " +
-      s"${primaryLocs.asScala.map(_.getUniqueId).mkString(",")} and 
${replicaLocs.size()} replica location " +
-      s"${replicaLocs.asScala.map(_.getUniqueId).mkString(",")} for 
$shuffleKey ")
+    logInfo(s"Reserved ${primaryLocs.size()} primary location" +
+      s" and ${replicaLocs.size()} replica location for $shuffleKey ")
     if (log.isDebugEnabled()) {
       logDebug(s"primary: $primaryLocs\nreplica: $replicaLocs.")
     }
     context.reply(ReserveSlotsResponse(StatusCode.SUCCESS))
   }
 
+  private def createWriters(
+      shuffleKey: String,
+      applicationId: String,
+      shuffleId: Int,
+      requestLocs: jList[PartitionLocation],
+      splitThreshold: Long,
+      splitMode: PartitionSplitMode,
+      partitionType: PartitionType,
+      rangeReadFilter: Boolean,
+      userIdentifier: UserIdentifier,
+      partitionSplitEnabled: Boolean,
+      isSegmentGranularityVisible: Boolean,
+      isPrimary: Boolean): jList[PartitionLocation] = {
+    val partitionLocations = new jArrayList[PartitionLocation]()
+    try {
+      def createWriter(partitionLocation: PartitionLocation): 
PartitionLocation = {
+        createPartitionDataWriter(
+          shuffleKey,
+          applicationId,
+          shuffleId,
+          partitionLocation,
+          splitThreshold,
+          splitMode,
+          partitionType,
+          rangeReadFilter,
+          userIdentifier,
+          partitionSplitEnabled,
+          isSegmentGranularityVisible,
+          isPrimary)
+      }
+      if (createWriterThreadPool == null) {
+        partitionLocations.addAll(requestLocs.asScala.map(createWriter).asJava)
+      } else {
+        partitionLocations.addAll(Utils.tryFuturesWithTimeout(
+          requestLocs.asScala.map(requestLoc =>
+            
Utils.future(createWriter(requestLoc))(createWriterThreadPool)).toList,
+          createWriterParallelTimeout,
+          s"Create FileWriter for $shuffleKey timeout.").asJava)
+      }
+    } catch {
+      case e: Exception =>
+        logError(s"Create FileWriter for $shuffleKey failed.", e)
+    }
+    partitionLocations
+  }
+
+  private def createPartitionDataWriter(
+      shuffleKey: String,
+      applicationId: String,
+      shuffleId: Int,
+      requestLoc: PartitionLocation,
+      splitThreshold: Long,
+      splitMode: PartitionSplitMode,
+      partitionType: PartitionType,
+      rangeReadFilter: Boolean,
+      userIdentifier: UserIdentifier,
+      partitionSplitEnabled: Boolean,
+      isSegmentGranularityVisible: Boolean,
+      isPrimary: Boolean): PartitionLocation = {
+    try {
+      var location =
+        if (isPrimary) {
+          partitionLocationInfo.getPrimaryLocation(
+            shuffleKey,
+            requestLoc.getUniqueId)
+        } else {
+          partitionLocationInfo.getReplicaLocation(
+            shuffleKey,
+            requestLoc.getUniqueId)
+        }
+      if (location == null) {
+        location = requestLoc
+        val writer = storageManager.createPartitionDataWriter(
+          applicationId,
+          shuffleId,
+          location,
+          splitThreshold,
+          splitMode,
+          partitionType,
+          rangeReadFilter,
+          userIdentifier,
+          partitionSplitEnabled,
+          isSegmentGranularityVisible)
+        new WorkingPartition(location, writer)
+      } else {
+        location
+      }
+    } catch {
+      case e: Exception =>
+        logError(s"Create FileWriter for $requestLoc $shuffleKey failed.", e)
+        throw e
+    }
+  }
+
+  private def destroyWriters(
+      partitionLocations: jList[PartitionLocation],
+      shuffleKey: String): Unit = {
+    partitionLocations.asScala.foreach { partitionLocation =>
+      val fileWriter = 
partitionLocation.asInstanceOf[WorkingPartition].getFileWriter

Review Comment:
   Is it possible that the value here is not a `WorkingPartition`?
   
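   I am asking because, in the code above, a `PartitionLocation` may already exist for this unique ID (it could have been put there by other code paths). In that case it is returned as-is instead of a newly created `WorkingPartition` with its own writer.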
   
   
https://github.com/apache/celeborn/pull/3387/changes#diff-ddad2bbb9547ade8e07445e08a7f97dbc717b594d43e811436ea42a76f0ce0fcR347
   
   We should destroy only the writers that were created by this loop.
   
   In my patch I kept a list of the writers created by the loop and, in case of failure, destroyed only those.
   https://github.com/apache/celeborn/pull/3598



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to