waitinfuture commented on code in PR #2064:
URL:
https://github.com/apache/incubator-celeborn/pull/2064#discussion_r1388151623
##########
worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala:
##########
@@ -612,10 +619,13 @@ class PushDataHandler(val workerSource: WorkerSource)
extends BaseMessageHandler
}
}
})
+ pushMergedData.body().retain()
Review Comment:
Ditto: L521 already calls `retain()`, so I don't quite understand why it is
called again here.
##########
worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala:
##########
@@ -341,6 +353,8 @@ class PushDataHandler(val workerSource: WorkerSource)
extends BaseMessageHandler
}
}
})
+ pushData.body().retain()
Review Comment:
I don't see the reason to call `retain()` here; could you explain why it is
needed? L255 already calls `retain()`.
##########
worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala:
##########
@@ -1246,6 +1199,51 @@ class PushDataHandler(val workerSource: WorkerSource)
extends BaseMessageHandler
}
}
+ private def writeLocalData(
+ fileWriters: Seq[FileWriter],
+ body: ByteBuf,
+ shuffleKey: String,
+ batchOffsets: Option[Array[Int]],
+ writePromise: Option[Promise[Unit]]): Unit = {
+ def writeData(fileWriter: FileWriter, body: ByteBuf, shuffleKey: String):
Unit = {
+ try {
+ fileWriter.write(body)
+ } catch {
+ case e: AlreadyClosedException =>
+ fileWriter.decrementPendingWrites()
+ val (mapId, attemptId) = getMapAttempt(body)
+ val endedAttempt =
+ if (shuffleMapperAttempts.containsKey(shuffleKey)) {
+ shuffleMapperAttempts.get(shuffleKey).get(mapId)
+ } else -1
+ // TODO just info log for ended attempt
+ logWarning(s"Append data failed for task(shuffle $shuffleKey, map
$mapId, attempt" +
+ s" $attemptId), caused by AlreadyClosedException, endedAttempt
$endedAttempt, error message: ${e.getMessage}")
+ writePromise.map(_.failure(e)).orElse(throw e)
+ case e: Exception =>
+ logError("Exception encountered when write.", e)
+ writePromise.map(_.failure(e)).orElse(throw e)
+ }
+ }
+ batchOffsets match {
+ case Some(batchOffsets) =>
+ batchOffsets.zip(fileWriters).foreach { case (offset, writer) =>
+ val length =
+ if (offset == batchOffsets.last) {
+ body.readableBytes() - offset
+ } else {
+ batchOffsets(batchOffsets.indexOf(offset) + 1) - offset
+ }
+ val batchBody = body.slice(body.readerIndex() + offset, length)
+ writeData(writer, batchBody, shuffleKey)
+ }
+ writePromise.foreach(_.success())
Review Comment:
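It seems we should only call `success` when `writePromise` has not already been
failed in `writeLocalData`. When `writePromise` is defined, `writeData` completes
it with `failure(e)` and does not rethrow, so this `success()` call would then run
on an already-completed promise and throw `IllegalStateException`.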
##########
worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala:
##########
@@ -1246,6 +1199,51 @@ class PushDataHandler(val workerSource: WorkerSource)
extends BaseMessageHandler
}
}
+ private def writeLocalData(
+ fileWriters: Seq[FileWriter],
+ body: ByteBuf,
+ shuffleKey: String,
+ batchOffsets: Option[Array[Int]],
+ writePromise: Option[Promise[Unit]]): Unit = {
+ def writeData(fileWriter: FileWriter, body: ByteBuf, shuffleKey: String):
Unit = {
+ try {
+ fileWriter.write(body)
+ } catch {
+ case e: AlreadyClosedException =>
+ fileWriter.decrementPendingWrites()
+ val (mapId, attemptId) = getMapAttempt(body)
+ val endedAttempt =
+ if (shuffleMapperAttempts.containsKey(shuffleKey)) {
+ shuffleMapperAttempts.get(shuffleKey).get(mapId)
+ } else -1
+ // TODO just info log for ended attempt
+ logWarning(s"Append data failed for task(shuffle $shuffleKey, map
$mapId, attempt" +
+ s" $attemptId), caused by AlreadyClosedException, endedAttempt
$endedAttempt, error message: ${e.getMessage}")
+ writePromise.map(_.failure(e)).orElse(throw e)
+ case e: Exception =>
+ logError("Exception encountered when write.", e)
+ writePromise.map(_.failure(e)).orElse(throw e)
+ }
+ }
+ batchOffsets match {
+ case Some(batchOffsets) =>
+ batchOffsets.zip(fileWriters).foreach { case (offset, writer) =>
+ val length =
+ if (offset == batchOffsets.last) {
+ body.readableBytes() - offset
+ } else {
+ batchOffsets(batchOffsets.indexOf(offset) + 1) - offset
+ }
+ val batchBody = body.slice(body.readerIndex() + offset, length)
+ writeData(writer, batchBody, shuffleKey)
+ }
+ writePromise.foreach(_.success())
+ case _ =>
+ writeData(fileWriters.head, body, shuffleKey)
+ writePromise.foreach(_.success())
Review Comment:
Ditto: only call `success` here if `writePromise` has not already been failed.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]