This is an automated email from the ASF dual-hosted git repository.
zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new c7258cfc [CELEBORN-103] add handleMapPartitionPushData to support
mappartition (#1048)
c7258cfc is described below
commit c7258cfc031e950e000db01b61b7cdc4d60f8fd8
Author: zhongqiangczq <[email protected]>
AuthorDate: Thu Dec 8 11:22:43 2022 +0800
[CELEBORN-103] add handleMapPartitionPushData to support mappartition
(#1048)
---
.../service/deploy/worker/PushDataHandler.scala | 122 +++++++++++++++++----
1 file changed, 103 insertions(+), 19 deletions(-)
diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
index 40cfcb13..b6b729dd 100644
--- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
+++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
@@ -82,15 +82,23 @@ class PushDataHandler extends BaseMessageHandler with Logging {
client,
pushData,
pushData.requestId,
- () =>
- handlePushData(
- pushData,
- new SimpleRpcResponseCallback(
- Type.PUSH_DATA,
- client,
- pushData.requestId,
- pushData.shuffleKey,
- pushData.partitionUniqueId)))
+ () => {
+ val callback = new SimpleRpcResponseCallback(
+ Type.PUSH_DATA,
+ client,
+ pushData.requestId,
+ pushData.shuffleKey,
+ pushData.partitionUniqueId)
+ // Shuffles with no registered partition type keep the pre-existing reduce-partition path.
+ shufflePartitionType.getOrDefault(pushData.shuffleKey, PartitionType.REDUCE) match {
+ case PartitionType.REDUCE => handlePushData(pushData, callback)
+ case PartitionType.MAP => handleMapPartitionPushData(pushData, callback)
+ // Fail this request explicitly instead of letting a scala.MatchError escape the handler.
+ case other =>
+ callback.onFailure(new UnsupportedOperationException(
+ s"Unsupported partition type $other for shuffle ${pushData.shuffleKey}"))
+ }
+ })
case pushMergedData: PushMergedData => {
handleCore(
client,
@@ -532,6 +539,92 @@ class PushDataHandler extends BaseMessageHandler with Logging {
}
}

+ /**
+ * Handles a PushData for a shuffle whose partition type is MAP.
+ *
+ * No disk-full or partition-split check is made, and replication to the peer is not
+ * implemented yet: the request is acknowledged before the local write, and write
+ * failures are only logged.
+ */
+ def handleMapPartitionPushData(pushData: PushData, callback: RpcResponseCallback): Unit = {
+ val shuffleKey = pushData.shuffleKey
+ val mode = PartitionLocation.getMode(pushData.mode)
+ val body = pushData.body.asInstanceOf[NettyManagedBuffer].getBuf
+ val isMaster = mode == PartitionLocation.Mode.MASTER
+ val pushDataTimer =
+ if (isMaster) WorkerSource.MasterPushDataTime else WorkerSource.SlavePushDataTime
+
+ // Timed per request id; the same metric name is handed to WrappedRpcResponseCallback below.
+ val key = s"${pushData.requestId}"
+ workerSource.startTimer(pushDataTimer, key)
+
+ // find FileWriter responsible for the data
+ val location =
+ if (isMaster) {
+ partitionLocationInfo.getMasterLocation(shuffleKey, pushData.partitionUniqueId)
+ } else {
+ partitionLocationInfo.getSlaveLocation(shuffleKey, pushData.partitionUniqueId)
+ }
+
+ val wrappedCallback =
+ new WrappedRpcResponseCallback(
+ pushData.`type`(),
+ isMaster,
+ pushData.requestId,
+ null,
+ location,
+ pushDataTimer,
+ callback)
+
+ if (checkLocationNull(
+ pushData.`type`(),
+ shuffleKey,
+ pushData.partitionUniqueId,
+ null,
+ location,
+ callback,
+ wrappedCallback)) return
+
+ // During worker shutdown, the worker returns HARD_SPLIT for every existing partition.
+ // This must happen before any exception is returned, so the current push can revive and retry.
+ if (shutdown.get()) {
+ logInfo(s"Push data return HARD_SPLIT for shuffle $shuffleKey since worker shutdown.")
+ callback.onSuccess(ByteBuffer.wrap(Array[Byte](StatusCode.HARD_SPLIT.getValue)))
+ return
+ }
+
+ val fileWriter =
+ getFileWriterAndCheck(pushData.`type`(), location, isMaster, callback) match {
+ case (true, _) => return
+ case (false, f: FileWriter) => f
+ }
+
+ // For map partitions we do not check for disk full or partition split.
+ fileWriter.incrementPendingWrites()
+
+ // TODO: replicate to the slave when this is the master and location.getPeer != null.
+ // Until then master and slave alike acknowledge immediately, before the local write.
+ wrappedCallback.onSuccess(ByteBuffer.wrap(Array[Byte]()))
+
+ try {
+ fileWriter.write(body)
+ } catch {
+ case e: AlreadyClosedException =>
+ fileWriter.decrementPendingWrites()
+ // Map partitions derive (mapId, attemptId) from partitionUniqueId, not from the body.
+ val (mapId, attemptId) = getMapAttempt(pushData.partitionUniqueId)
+ val endedAttempt =
+ if (shuffleMapperAttempts.containsKey(shuffleKey)) {
+ shuffleMapperAttempts.get(shuffleKey).get(mapId)
+ } else -1
+ // TODO just info log for ended attempt
+ logWarning(s"Append data failed for task(shuffle $shuffleKey, map $mapId, attempt" +
+ s" $attemptId), endedAttempt $endedAttempt, caused by ${e.getMessage}")
+ case e: Exception =>
+ logError("Exception encountered when write.", e)
+ }
+ }
+
private def handleRpcRequest(client: TransportClient, rpcRequest: RpcRequest): Unit = {
val msg = Message.decode(rpcRequest.body().nioByteBuffer())
val requestId = rpcRequest.requestId
@@ -701,7 +792,13 @@ class PushDataHandler extends BaseMessageHandler with Logging {
callback: RpcResponseCallback,
wrappedCallback: RpcResponseCallback): Boolean = {
if (location == null) {
- val (mapId, attemptId) = getMapAttempt(body, shuffleKey, partitionUniqueId)
+ // MAP shuffles pass a null body, so their ids come from partitionUniqueId; REDUCE (and
+ // any other type) keeps reading them from body, as the REDUCE path did before this change.
+ val (mapId, attemptId) =
+ shufflePartitionType.getOrDefault(shuffleKey, PartitionType.REDUCE) match {
+ case PartitionType.MAP => getMapAttempt(partitionUniqueId)
+ case _ => getMapAttempt(body)
+ }
if (shuffleMapperAttempts.containsKey(shuffleKey) &&
-1 != shuffleMapperAttempts.get(shuffleKey).get(mapId)) {
// partition data has already been committed
@@ -792,15 +889,12 @@ class PushDataHandler extends BaseMessageHandler with Logging {
}

+ /**
+ * Unpacks the leading integer of a map partition's partitionUniqueId ("<packedId>-...")
+ * into (raw partition id, attempt id); callers treat the raw partition id as the mapId.
+ */
private def getMapAttempt(
- body: ByteBuf,
- shuffleKey: String,
partitionUniqueId: String): (Int, Int) = {
- shufflePartitionType.get(shuffleKey) match {
- case PartitionType.MAP => {
- val id = partitionUniqueId.split("-")(0).toInt
- (PackedPartitionId.getRawPartitionId(id), PackedPartitionId.getAttemptId(id))
- }
- case PartitionType.REDUCE => getMapAttempt(body)
- }
+ val id = partitionUniqueId.split("-")(0).toInt
+ (PackedPartitionId.getRawPartitionId(id), PackedPartitionId.getAttemptId(id))
}
}