RexXiong commented on code in PR #3601:
URL: https://github.com/apache/celeborn/pull/3601#discussion_r2807541334
##########
client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala:
##########
@@ -945,6 +953,14 @@ class LifecycleManager(val appUniqueId: String, val conf:
CelebornConf) extends
numPartitions = numPartitions,
crc32PerPartition = crc32PerPartition,
bytesWrittenPerPartition = bytesWrittenPerPartition)
+
+ if (mapperAttemptFinishedSuccess && shuffleWriteLimitEnabled) {
Review Comment:
Should we consider adding a negative test case for this feature?
##########
client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala:
##########
@@ -132,6 +132,11 @@ class LifecycleManager(val appUniqueId: String, val conf:
CelebornConf) extends
private val mockDestroyFailure = conf.testMockDestroySlotsFailure
private val authEnabled = conf.authEnabledOnClient
private var applicationMeta: ApplicationMeta = _
+
+ private val shuffleWriteLimitEnabled = conf.shuffleWriteLimitEnabled
+ private val shuffleWriteLimitThreshold = conf.shuffleWriteLimitThreshold
+ private val shuffleTotalWrittenBytes = JavaUtils.newConcurrentHashMap[Int,
AtomicLong]()
Review Comment:
Should clean shuffleId related data when shuffle expires.
##########
common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala:
##########
@@ -218,7 +218,8 @@ object ControlMessages extends Logging {
numPartitions: Int,
crc32PerPartition: Array[Int],
bytesWrittenPerPartition: Array[Long],
- serdeVersion: SerdeVersion)
+ serdeVersion: SerdeVersion,
+ bytesWritten: Long)
Review Comment:
It's not a big deal since MapperEnd is only utilized on the engine side.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]