turboFei commented on code in PR #2891:
URL: https://github.com/apache/celeborn/pull/2891#discussion_r1833810707
##########
client/src/main/scala/org/apache/celeborn/client/ApplicationHeartbeater.scala:
##########
@@ -59,18 +59,21 @@ class ApplicationHeartbeater(
override def run(): Unit = {
try {
require(masterClient != null, "When sending a heartbeat, client
shouldn't be null.")
- val ((tmpTotalWritten, tmpTotalFileCount),
tmpShuffleFallbackCount) = shuffleMetrics()
+ val (
+ (tmpTotalWritten, tmpTotalFileCount),
+ (tmpShuffleCount, tmpShuffleFallbackCounts)) = shuffleMetrics()
logInfo("Send app heartbeat with " +
s"written: ${Utils.bytesToString(tmpTotalWritten)}, file count:
$tmpTotalFileCount, " +
- s"shuffle fallback count: $tmpShuffleFallbackCount")
+ s"shuffle count: $tmpShuffleCount, shuffle fallback count:
$tmpShuffleFallbackCounts")
Review Comment:
nit:
```
shuffle fallback counts:
```
##########
client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala:
##########
@@ -210,7 +211,10 @@ class LifecycleManager(val appUniqueId: String, val conf:
CelebornConf) extends
appUniqueId,
conf,
masterClient,
- () => commitManager.commitMetrics() ->
shuffleFallbackCount.sumThenReset(),
+ () =>
+ commitManager.commitMetrics() ->
+ (shuffleCount.sumThenReset(),
+ shuffleFallbackCounts.asScala.filter(_._2 > 0L).toMap),
Review Comment:
We need to reset `shuffleFallbackCounts` after getting the values, as is done for `shuffleCount` via `sumThenReset()`.
##########
master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala:
##########
@@ -261,8 +261,20 @@ private[celeborn] class Master(
}).sum()
}
+ masterSource.addGauge(MasterSource.SHUFFLE_TOTAL_COUNT) { () =>
+ statusSystem.shuffleTotalCount.sum()
+ }
+
masterSource.addGauge(MasterSource.SHUFFLE_FALLBACK_COUNT) { () =>
- statusSystem.shuffleTotalFallbackCount.sum()
+ statusSystem.shuffleFallbackCounts.values().asScala.map(_.longValue()).sum
+ }
+
+ statusSystem.shuffleFallbackCounts.entrySet().asScala.foreach {
shuffleFallbackCount =>
Review Comment:
If `shuffleFallbackCounts` is initially empty, no gauge metrics will be
registered.
##########
master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala:
##########
@@ -261,8 +261,20 @@ private[celeborn] class Master(
}).sum()
}
+ masterSource.addGauge(MasterSource.SHUFFLE_TOTAL_COUNT) { () =>
+ statusSystem.shuffleTotalCount.sum()
+ }
+
masterSource.addGauge(MasterSource.SHUFFLE_FALLBACK_COUNT) { () =>
- statusSystem.shuffleTotalFallbackCount.sum()
+ statusSystem.shuffleFallbackCounts.values().asScala.map(_.longValue()).sum
+ }
+
+ statusSystem.shuffleFallbackCounts.entrySet().asScala.foreach {
shuffleFallbackCount =>
Review Comment:
How can these metrics be registered at runtime?
##########
master/src/main/scala/org/apache/celeborn/service/deploy/master/MasterSource.scala:
##########
@@ -55,6 +55,7 @@ object MasterSource {
val ACTIVE_SHUFFLE_FILE_COUNT = "ActiveShuffleFileCount"
+ val SHUFFLE_TOTAL_COUNT = "ShuffleTotalCount"
Review Comment:
how about adding comments in the code as well?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]