waitinfuture commented on code in PR #2346:
URL:
https://github.com/apache/incubator-celeborn/pull/2346#discussion_r1514482730
##########
master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala:
##########
@@ -253,11 +254,24 @@ private[celeborn] class Master(
internalRpcEndpoint)
}
+ private val sendApplicationMetaThreads = conf.masterSendApplicationMetaThreads
+ // Send ApplicationMeta to workers
+ private var sendApplicationMetaExecutor: ExecutorService = _
+ // Maintains the mapping for the workers assigned to each application
+ private val workersAssignedToApp
Review Comment:
Since this map is not stored in Ratis, in the corner case where the leader
changes, two masters may send `APPLICATION_META` to the same worker. So I guess
we still can't throw an exception in `SecretRegistryImpl#register`?
Also, I don't think storing `workersAssignedToApp` in Ratis is a good idea,
because it can be large.
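A minimal sketch of what a non-throwing, idempotent `register` could look like, assuming the goal is to tolerate a duplicate `APPLICATION_META` after a leader change. `IdempotentSecretRegistry` and its method signatures are hypothetical and are not Celeborn's `SecretRegistryImpl`. Here, a repeated registration with the same secret succeeds, while a conflicting secret is rejected by returning `false` and keeping the existing entry.

```scala
import java.util.concurrent.ConcurrentHashMap

// Hypothetical, simplified registry for illustration only; it is NOT
// Celeborn's SecretRegistryImpl, and its method names are assumptions.
class IdempotentSecretRegistry {
  private val secrets = new ConcurrentHashMap[String, String]()

  /**
   * Registers `secret` for `appId` without throwing.
   * Returns true if the secret is now registered: either the first time, or an
   * identical duplicate (e.g. APPLICATION_META re-sent by a new leader).
   * Returns false if a different secret is already registered; the existing
   * secret is kept unchanged.
   */
  def register(appId: String, secret: String): Boolean = {
    // putIfAbsent is atomic, so concurrent duplicate registrations are safe.
    val previous = secrets.putIfAbsent(appId, secret)
    previous == null || previous == secret
  }

  def getSecretKey(appId: String): Option[String] = Option(secrets.get(appId))
}
```

Returning a status instead of throwing lets the caller decide whether a conflicting secret should only be logged or should also fail the request.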
##########
master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala:
##########
@@ -850,9 +867,46 @@ private[celeborn] class Master(
logInfo(s"Offered extra $offerSlotsExtraSize slots for $shuffleKey")
}
+ if (authEnabled) {
+ pushApplicationMetaToWorkers(requestSlots, slots)
+ }
context.reply(RequestSlotsResponse(StatusCode.SUCCESS,
slots.asInstanceOf[WorkerResource]))
}
+ def pushApplicationMetaToWorkers(
+ requestSlots: RequestSlots,
+ slots: util.Map[WorkerInfo, (util.List[PartitionLocation], util.List[PartitionLocation])])
+ : Unit = {
+ // Pass application registration information to the workers
+ val pbApplicationMeta = PbApplicationMeta.newBuilder()
+ .setAppId(requestSlots.applicationId)
+ .setSecret(secretRegistry.getSecretKey(requestSlots.applicationId))
+ .build()
+ val transportMessage =
+ new TransportMessage(MessageType.APPLICATION_META, pbApplicationMeta.toByteArray)
+ val workerSet = workersAssignedToApp.computeIfAbsent(
+ requestSlots.applicationId,
+ new util.function.Function[String, util.Set[WorkerInfo]] {
+ override def apply(key: String): util.Set[WorkerInfo] =
+ util.Collections.newSetFromMap(new util.concurrent.ConcurrentHashMap[
Review Comment:
nit: `JavaUtils.newConcurrentHashMap`
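A self-contained sketch of the per-application worker tracking built with `computeIfAbsent` and a concurrent set. It assumes that `workersAssignedToApp` is used to send `APPLICATION_META` only to workers that have not received it yet; the hunk above is truncated, so that purpose is an assumption. `WorkerId` and `WorkersAssignedToApp` are hypothetical stand-ins for `WorkerInfo` and the field in `Master`. To avoid depending on Celeborn, the sketch uses the JDK's `ConcurrentHashMap.newKeySet` instead of the `JavaUtils.newConcurrentHashMap` helper suggested in the nit.

```scala
import java.util.concurrent.ConcurrentHashMap

// Hypothetical stand-in for Celeborn's WorkerInfo.
final case class WorkerId(host: String, rpcPort: Int)

// Hypothetical in-memory tracker: appId -> workers that already got APPLICATION_META.
class WorkersAssignedToApp {
  private val workersByApp =
    new ConcurrentHashMap[String, java.util.Set[WorkerId]]()

  /** Records `workers` for `appId` and returns only the ones not seen before. */
  def newlyAssigned(appId: String, workers: Iterable[WorkerId]): List[WorkerId] = {
    val assigned = workersByApp.computeIfAbsent(
      appId,
      new java.util.function.Function[String, java.util.Set[WorkerId]] {
        // Concurrent set backed by a ConcurrentHashMap.
        override def apply(key: String): java.util.Set[WorkerId] =
          ConcurrentHashMap.newKeySet[WorkerId]()
      })
    // Set.add returns true only on the first insertion, so each worker is
    // returned at most once per application, even under concurrent calls.
    workers.toList.filter(w => assigned.add(w))
  }

  /** Drops the tracking entry, e.g. when the application is unregistered. */
  def remove(appId: String): Unit = workersByApp.remove(appId)
}
```

Because this state lives only in the current leader's memory, a newly elected leader starts with an empty map and may send `APPLICATION_META` again. This is why the worker-side registration needs to tolerate duplicates.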
--
This is an automated message from the Apache Git Service.