This is an automated email from the ASF dual-hosted git repository.

rexxiong pushed a commit to branch branch-0.5
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/branch-0.5 by this push:
     new b2414df3f [CELEBORN-1491] introduce flusher working queue size metric
b2414df3f is described below

commit b2414df3fe859b11a5a6e549396b683519881639
Author: mingji <[email protected]>
AuthorDate: Fri Jul 5 09:55:02 2024 +0800

    [CELEBORN-1491] introduce flusher working queue size metric
    
    Add a gauge metric that reports the size of each flusher working queue,
    labeled by mount point and queue index.
    
    This makes it possible to see whether flush tasks are accumulating.
    
    Does this PR introduce any user-facing change? NO.
    
    How was this patch tested? GA.
    
    Closes #2598 from FMX/b1491.
    
    Authored-by: mingji <[email protected]>
    Signed-off-by: Shuang <[email protected]>
    (cherry picked from commit cb6e2202ae80232cdfbaddb7f960654dd3004c31)
    Signed-off-by: Shuang <[email protected]>
---
 .../celeborn/service/deploy/worker/WorkerSource.scala      |  1 +
 .../celeborn/service/deploy/worker/storage/Flusher.scala   | 14 +++++++++++---
 2 files changed, 12 insertions(+), 3 deletions(-)

diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala
 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala
index f4b152b08..5358fab02 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala
@@ -168,6 +168,7 @@ object WorkerSource {
   val TAKE_BUFFER_TIME = "TakeBufferTime"
   val FLUSH_DATA_TIME = "FlushDataTime"
   val COMMIT_FILES_TIME = "CommitFilesTime"
+  val FLUSH_WORKING_QUEUE_SIZE = "FlushWorkingQueueSize"
 
   // slots
   val SLOTS_ALLOCATED = "SlotsAllocated"
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/Flusher.scala
 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/Flusher.scala
index e5d93fa32..cfc94e962 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/Flusher.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/Flusher.scala
@@ -33,6 +33,7 @@ import 
org.apache.celeborn.common.metrics.source.{AbstractSource, ThreadPoolSour
 import org.apache.celeborn.common.protocol.StorageInfo
 import org.apache.celeborn.common.util.{ThreadUtils, Utils}
 import org.apache.celeborn.service.deploy.worker.WorkerSource
+import 
org.apache.celeborn.service.deploy.worker.WorkerSource.FLUSH_WORKING_QUEUE_SIZE
 import 
org.apache.celeborn.service.deploy.worker.congestcontrol.CongestionController
 import org.apache.celeborn.service.deploy.worker.memory.MemoryManager
 
@@ -41,7 +42,8 @@ abstract private[worker] class Flusher(
     val threadCount: Int,
     val allocator: PooledByteBufAllocator,
     val maxComponents: Int,
-    flushTimeMetric: TimeWindow) extends Logging {
+    flushTimeMetric: TimeWindow,
+    mountPoint: String) extends Logging {
   protected lazy val flusherId: Int = System.identityHashCode(this)
   protected val workingQueues = new 
Array[LinkedBlockingQueue[FlushTask]](threadCount)
   protected val bufferQueue = new LinkedBlockingQueue[CompositeByteBuf]()
@@ -95,6 +97,10 @@ abstract private[worker] class Flusher(
           }
         }
       })
+      workerSource.addGauge(FLUSH_WORKING_QUEUE_SIZE, Map("mountpoint" -> 
s"$mountPoint-$index")) {
+        () =>
+          workingQueues(index).size()
+      }
     }
     ThreadPoolSource.registerSource(s"$this", workers)
   }
@@ -147,7 +153,8 @@ private[worker] class LocalFlusher(
     threadCount,
     allocator,
     maxComponents,
-    timeWindow)
+    timeWindow,
+    mountPoint)
   with DeviceObserver with Logging {
 
   deviceMonitor.registerFlusher(this)
@@ -182,7 +189,8 @@ final private[worker] class HdfsFlusher(
     hdfsFlusherThreads,
     allocator,
     maxComponents,
-    null) with Logging {
+    null,
+    "HDFS") with Logging {
 
   override def processIOException(e: IOException, deviceErrorType: 
DiskStatus): Unit = {
     logError(s"$this write failed, reason $deviceErrorType ,exception: $e")

Reply via email to