This is an automated email from the ASF dual-hosted git repository.
rexxiong pushed a commit to branch branch-0.5
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/branch-0.5 by this push:
new b2414df3f [CELEBORN-1491] introduce flusher working queue size metric
b2414df3f is described below
commit b2414df3fe859b11a5a6e549396b683519881639
Author: mingji <[email protected]>
AuthorDate: Fri Jul 5 09:55:02 2024 +0800
[CELEBORN-1491] introduce flusher working queue size metric
Add a gauge metric that reports the size of each flusher working queue.
This shows whether flush tasks are accumulating in the working queues.
User-facing change: NO.
Tested by: GA.
Closes #2598 from FMX/b1491.
Authored-by: mingji <[email protected]>
Signed-off-by: Shuang <[email protected]>
(cherry picked from commit cb6e2202ae80232cdfbaddb7f960654dd3004c31)
Signed-off-by: Shuang <[email protected]>
---
.../celeborn/service/deploy/worker/WorkerSource.scala | 1 +
.../celeborn/service/deploy/worker/storage/Flusher.scala | 14 +++++++++++---
2 files changed, 12 insertions(+), 3 deletions(-)
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala
index f4b152b08..5358fab02 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala
@@ -168,6 +168,7 @@ object WorkerSource {
val TAKE_BUFFER_TIME = "TakeBufferTime"
val FLUSH_DATA_TIME = "FlushDataTime"
val COMMIT_FILES_TIME = "CommitFilesTime"
+ val FLUSH_WORKING_QUEUE_SIZE = "FlushWorkingQueueSize"
// slots
val SLOTS_ALLOCATED = "SlotsAllocated"
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/Flusher.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/Flusher.scala
index e5d93fa32..cfc94e962 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/Flusher.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/Flusher.scala
@@ -33,6 +33,7 @@ import
org.apache.celeborn.common.metrics.source.{AbstractSource, ThreadPoolSour
import org.apache.celeborn.common.protocol.StorageInfo
import org.apache.celeborn.common.util.{ThreadUtils, Utils}
import org.apache.celeborn.service.deploy.worker.WorkerSource
+import
org.apache.celeborn.service.deploy.worker.WorkerSource.FLUSH_WORKING_QUEUE_SIZE
import
org.apache.celeborn.service.deploy.worker.congestcontrol.CongestionController
import org.apache.celeborn.service.deploy.worker.memory.MemoryManager
@@ -41,7 +42,8 @@ abstract private[worker] class Flusher(
val threadCount: Int,
val allocator: PooledByteBufAllocator,
val maxComponents: Int,
- flushTimeMetric: TimeWindow) extends Logging {
+ flushTimeMetric: TimeWindow,
+ mountPoint: String) extends Logging {
protected lazy val flusherId: Int = System.identityHashCode(this)
protected val workingQueues = new
Array[LinkedBlockingQueue[FlushTask]](threadCount)
protected val bufferQueue = new LinkedBlockingQueue[CompositeByteBuf]()
@@ -95,6 +97,10 @@ abstract private[worker] class Flusher(
}
}
})
+ workerSource.addGauge(FLUSH_WORKING_QUEUE_SIZE, Map("mountpoint" ->
s"$mountPoint-$index")) {
+ () =>
+ workingQueues(index).size()
+ }
}
ThreadPoolSource.registerSource(s"$this", workers)
}
@@ -147,7 +153,8 @@ private[worker] class LocalFlusher(
threadCount,
allocator,
maxComponents,
- timeWindow)
+ timeWindow,
+ mountPoint)
with DeviceObserver with Logging {
deviceMonitor.registerFlusher(this)
@@ -182,7 +189,8 @@ final private[worker] class HdfsFlusher(
hdfsFlusherThreads,
allocator,
maxComponents,
- null) with Logging {
+ null,
+ "HDFS") with Logging {
override def processIOException(e: IOException, deviceErrorType:
DiskStatus): Unit = {
logError(s"$this write failed, reason $deviceErrorType ,exception: $e")