This is an automated email from the ASF dual-hosted git repository.

zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new a1be02b4f [CELEBORN-757] Improve metrics method signature and code 
style
a1be02b4f is described below

commit a1be02b4fa696ca62f8f258161aad0859aad8fd2
Author: Cheng Pan <[email protected]>
AuthorDate: Mon Jul 3 11:56:43 2023 +0800

    [CELEBORN-757] Improve metrics method signature and code style
    
    ### What changes were proposed in this pull request?
    
    - gauge method definition improvement, i.e.
    
      before
      ```
      def addGauge[T](name: String, f: Unit => T, labels: Map[String, String])
      ```
      after
      ```
      def addGauge[T](name: String, labels: Map[String, String])(f: () => T)
      ```
      which improves the caller-side code style
      ```
      addGauge(name, labels) { () =>
        ...
      }
      ```
    
    - remove unnecessary Java/Scala collection conversions. Since Scala 2.11 
does not support SAM, the extra implicit conversion function is still required.
    
    - leverage Logging to defer message evaluation
    
    - UPPER_CASE string constants
    
    ### Why are the changes needed?
    
    Improve code quality and performance (maybe)
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Pass GA.
    
    Closes #1670 from pan3793/CELEBORN-757.
    
    Authored-by: Cheng Pan <[email protected]>
    Signed-off-by: zky.zhoukeyong <[email protected]>
---
 .../celeborn/common/meta/FileManagedBuffers.java   |   3 +-
 .../common/network/client/TransportClient.java     |   2 +-
 .../common/network/util/NettyMemoryMetrics.java    |  16 +--
 .../org/apache/celeborn/common/util/JavaUtils.java |   8 +-
 .../celeborn/common/identity/UserIdentifier.scala  |   6 +
 .../apache/celeborn/common/meta/WorkerInfo.scala   |   2 +-
 .../celeborn/common/metrics/MetricLabels.scala     |   8 +-
 .../common/metrics/sink/PrometheusServlet.scala    |  14 +-
 .../common/metrics/source/AbstractSource.scala     |  61 ++++-----
 .../common/metrics/source/JVMCPUSource.scala       |   4 +-
 .../celeborn/common/metrics/source/Source.scala    |   2 +-
 .../celeborn/common/util/FunctionConverter.scala   |   9 +-
 .../metrics/source/CelebornSourceSuite.scala       |  24 ++--
 .../celeborn/service/deploy/master/Master.scala    |  48 ++++---
 .../service/deploy/master/MasterSource.scala       |  18 +--
 .../congestcontrol/CongestionController.java       |  15 +-
 .../service/deploy/worker/storage/FileWriter.java  |   2 +-
 .../worker/storage/PartitionFilesSorter.java       |   4 +-
 .../service/deploy/worker/Controller.scala         |   6 +-
 .../service/deploy/worker/FetchHandler.scala       |  55 ++++----
 .../service/deploy/worker/PushDataHandler.scala    |  54 ++++----
 .../celeborn/service/deploy/worker/Worker.scala    | 151 ++++++++++++---------
 .../service/deploy/worker/WorkerSource.scala       | 148 ++++++++++----------
 .../deploy/worker/storage/DeviceMonitor.scala      |  64 ++++-----
 .../service/deploy/worker/storage/Flusher.scala    |   2 +-
 .../deploy/worker/storage/ObservedDevice.scala     |  88 ++++++------
 .../congestcontrol/TestCongestionController.java   |   4 +-
 .../deploy/worker/storage/FileWriterSuiteJ.java    |  10 +-
 .../deploy/worker/storage/DeviceMonitorSuite.scala |   8 +-
 29 files changed, 421 insertions(+), 415 deletions(-)

diff --git 
a/common/src/main/java/org/apache/celeborn/common/meta/FileManagedBuffers.java 
b/common/src/main/java/org/apache/celeborn/common/meta/FileManagedBuffers.java
index 72cb3ee59..41fd89049 100644
--- 
a/common/src/main/java/org/apache/celeborn/common/meta/FileManagedBuffers.java
+++ 
b/common/src/main/java/org/apache/celeborn/common/meta/FileManagedBuffers.java
@@ -41,8 +41,7 @@ public class FileManagedBuffers {
         offsets[i] = chunkOffsets.get(i);
       }
     } else {
-      offsets = new long[1];
-      offsets[0] = 0;
+      offsets = new long[] {0};
     }
     this.conf = conf;
   }
diff --git 
a/common/src/main/java/org/apache/celeborn/common/network/client/TransportClient.java
 
b/common/src/main/java/org/apache/celeborn/common/network/client/TransportClient.java
index 699f1ac78..00c25b56b 100644
--- 
a/common/src/main/java/org/apache/celeborn/common/network/client/TransportClient.java
+++ 
b/common/src/main/java/org/apache/celeborn/common/network/client/TransportClient.java
@@ -241,7 +241,7 @@ public class TransportClient implements Closeable {
     try {
       return result.get(timeoutMs, TimeUnit.MILLISECONDS);
     } catch (Exception e) {
-      throw new IOException("Exception in sendRpcSync to:" + 
this.getSocketAddress(), e);
+      throw new IOException("Exception in sendRpcSync to: " + 
this.getSocketAddress(), e);
     }
   }
 
diff --git 
a/common/src/main/java/org/apache/celeborn/common/network/util/NettyMemoryMetrics.java
 
b/common/src/main/java/org/apache/celeborn/common/network/util/NettyMemoryMetrics.java
index 2d457f01f..35f4f14a4 100644
--- 
a/common/src/main/java/org/apache/celeborn/common/network/util/NettyMemoryMetrics.java
+++ 
b/common/src/main/java/org/apache/celeborn/common/network/util/NettyMemoryMetrics.java
@@ -91,12 +91,12 @@ public class NettyMemoryMetrics {
       logger.debug("setup netty metrics");
       source.addGauge(
           MetricRegistry.name(metricPrefix, "usedHeapMemory"),
-          pooledAllocatorMetric::usedHeapMemory,
-          labels);
+          labels,
+          pooledAllocatorMetric::usedHeapMemory);
       source.addGauge(
           MetricRegistry.name(metricPrefix, "usedDirectMemory"),
-          pooledAllocatorMetric::usedDirectMemory,
-          labels);
+          labels,
+          pooledAllocatorMetric::usedDirectMemory);
       if (verboseMetricsEnabled) {
         int directArenaIndex = 0;
         for (PoolArenaMetric metric : pooledAllocatorMetric.directArenas()) {
@@ -133,26 +133,26 @@ public class NettyMemoryMetrics {
       if (returnType.equals(int.class)) {
         source.addGauge(
             metricName,
+            labels,
             () -> {
               try {
                 return (Integer) m.invoke(arenaMetric);
               } catch (Exception e) {
                 return -1; // Swallow the exceptions.
               }
-            },
-            labels);
+            });
 
       } else if (returnType.equals(long.class)) {
         source.addGauge(
             metricName,
+            labels,
             () -> {
               try {
                 return (Long) m.invoke(arenaMetric);
               } catch (Exception e) {
                 return -1L; // Swallow the exceptions.
               }
-            },
-            labels);
+            });
       }
     }
   }
diff --git 
a/common/src/main/java/org/apache/celeborn/common/util/JavaUtils.java 
b/common/src/main/java/org/apache/celeborn/common/util/JavaUtils.java
index 727003e73..493ea4369 100644
--- a/common/src/main/java/org/apache/celeborn/common/util/JavaUtils.java
+++ b/common/src/main/java/org/apache/celeborn/common/util/JavaUtils.java
@@ -427,18 +427,18 @@ public class JavaUtils {
 
   public static <K, V> ConcurrentHashMap<K, V> newConcurrentHashMap() {
     if (SystemUtils.isJavaVersionAtLeast(JavaVersion.JAVA_9)) {
-      return new ConcurrentHashMap();
+      return new ConcurrentHashMap<>();
     } else {
-      return new ConcurrentHashMapForJDK8();
+      return new ConcurrentHashMapForJDK8<>();
     }
   }
 
   public static <K, V> ConcurrentHashMap<K, V> newConcurrentHashMap(
       Map<? extends K, ? extends V> m) {
     if (SystemUtils.isJavaVersionAtLeast(JavaVersion.JAVA_9)) {
-      return new ConcurrentHashMap(m);
+      return new ConcurrentHashMap<>(m);
     } else {
-      return new ConcurrentHashMapForJDK8(m);
+      return new ConcurrentHashMapForJDK8<>(m);
     }
   }
 
diff --git 
a/common/src/main/scala/org/apache/celeborn/common/identity/UserIdentifier.scala
 
b/common/src/main/scala/org/apache/celeborn/common/identity/UserIdentifier.scala
index 0353ec7cd..cac8b258a 100644
--- 
a/common/src/main/scala/org/apache/celeborn/common/identity/UserIdentifier.scala
+++ 
b/common/src/main/scala/org/apache/celeborn/common/identity/UserIdentifier.scala
@@ -17,6 +17,10 @@
 
 package org.apache.celeborn.common.identity
 
+import java.util.{Map => JMap}
+
+import scala.collection.JavaConverters._
+
 import org.apache.celeborn.common.exception.CelebornException
 import org.apache.celeborn.common.internal.Logging
 
@@ -30,6 +34,8 @@ case class UserIdentifier(tenantId: String, name: String) {
     Map("tenantId" -> tenantId, "name" -> name)
   }
 
+  def toJMap: JMap[String, String] = toMap.asJava
+
   override def toString: String = {
     s"`$tenantId`.`$name`"
   }
diff --git 
a/common/src/main/scala/org/apache/celeborn/common/meta/WorkerInfo.scala 
b/common/src/main/scala/org/apache/celeborn/common/meta/WorkerInfo.scala
index e4753884e..92f6c91ed 100644
--- a/common/src/main/scala/org/apache/celeborn/common/meta/WorkerInfo.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/meta/WorkerInfo.scala
@@ -120,7 +120,7 @@ class WorkerInfo(
     unknownDiskSlots.remove(shuffleKey)
   }
 
-  def getShuffleKeySet(): util.HashSet[String] = this.synchronized {
+  def getShuffleKeySet: util.HashSet[String] = this.synchronized {
     val shuffleKeySet = new util.HashSet[String]()
     diskInfos.values().asScala.foreach { diskInfo =>
       shuffleKeySet.addAll(diskInfo.getShuffleKeySet())
diff --git 
a/common/src/main/scala/org/apache/celeborn/common/metrics/MetricLabels.scala 
b/common/src/main/scala/org/apache/celeborn/common/metrics/MetricLabels.scala
index a591d1671..0883c5e9e 100644
--- 
a/common/src/main/scala/org/apache/celeborn/common/metrics/MetricLabels.scala
+++ 
b/common/src/main/scala/org/apache/celeborn/common/metrics/MetricLabels.scala
@@ -25,12 +25,6 @@ private[metrics] trait MetricLabels {
 
 object MetricLabels {
   def labelString(labels: Map[String, String]): String = {
-    "{" +
-      labels
-        .map { case (key: String, value: String) => s"""$key="$value"""" }
-        .toList
-        .sorted
-        .mkString(", ") +
-      "}"
+    labels.map { case (k, v) => s"""$k="$v"""" }.toArray.sorted.mkString("{", 
",", "}")
   }
 }
diff --git 
a/common/src/main/scala/org/apache/celeborn/common/metrics/sink/PrometheusServlet.scala
 
b/common/src/main/scala/org/apache/celeborn/common/metrics/sink/PrometheusServlet.scala
index c6356ba4e..6913457af 100644
--- 
a/common/src/main/scala/org/apache/celeborn/common/metrics/sink/PrometheusServlet.scala
+++ 
b/common/src/main/scala/org/apache/celeborn/common/metrics/sink/PrometheusServlet.scala
@@ -38,10 +38,8 @@ class PrometheusServlet(
     new PrometheusHttpRequestHandler(servletPath, this)
   }
 
-  def getMetricsSnapshot(): String = {
-    val sb = new StringBuilder()
-    sources.foreach(source => sb.append(source.getMetrics()))
-    sb.toString()
+  def getMetricsSnapshot: String = {
+    sources.map(_.getMetrics).mkString
   }
 
   override def start(): Unit = {}
@@ -57,10 +55,10 @@ class PrometheusHttpRequestHandler(
     prometheusServlet: PrometheusServlet) extends Logging {
 
   def handleRequest(uri: String): String = {
-    uri match {
-      case `path` =>
-        prometheusServlet.getMetricsSnapshot()
-      case _ => s"Unknown uri ${uri}!"
+    if (uri == path) {
+      prometheusServlet.getMetricsSnapshot
+    } else {
+      s"Unknown path $uri!"
     }
   }
 }
diff --git 
a/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala
 
b/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala
index d28986bb0..13a7b77cb 100644
--- 
a/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala
+++ 
b/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala
@@ -17,6 +17,7 @@
 
 package org.apache.celeborn.common.metrics.source
 
+import java.util.{ArrayList => JArrayList, List => JList, Map => JMap}
 import java.util.concurrent.{ConcurrentHashMap, ConcurrentLinkedQueue, 
ScheduledExecutorService, TimeUnit}
 
 import scala.collection.JavaConverters._
@@ -29,6 +30,8 @@ import org.apache.celeborn.common.CelebornConf
 import org.apache.celeborn.common.internal.Logging
 import org.apache.celeborn.common.metrics.{MetricLabels, 
ResettableSlidingWindowReservoir, RssHistogram, RssTimer}
 import org.apache.celeborn.common.util.{JavaUtils, ThreadUtils, Utils}
+// Can Remove this if celeborn don't support scala211 in future
+import org.apache.celeborn.common.util.FunctionConverter._
 
 case class NamedCounter(name: String, counter: Counter, labels: Map[String, 
String])
   extends MetricLabels
@@ -51,55 +54,50 @@ abstract class AbstractSource(conf: CelebornConf, role: 
String)
 
   val metricsCollectCriticalEnabled: Boolean = 
conf.metricsCollectCriticalEnabled
 
-  final val metricsCapacity = conf.metricsCapacity
+  val metricsCapacity: Int = conf.metricsCapacity
 
   val innerMetrics: ConcurrentLinkedQueue[String] = new 
ConcurrentLinkedQueue[String]()
 
   val timerSupplier = new TimerSupplier(metricsSlidingWindowSize)
 
   val metricsCleaner: ScheduledExecutorService =
-    
ThreadUtils.newDaemonSingleThreadScheduledExecutor(s"worker-metrics-cleaner")
+    
ThreadUtils.newDaemonSingleThreadScheduledExecutor("worker-metrics-cleaner")
 
-  val roleLabel = "role" -> role
-  val staticLabels = conf.metricsExtraLabels + roleLabel
-  val staticLabelsString = MetricLabels.labelString(staticLabels)
+  val roleLabel: (String, String) = "role" -> role
+  val staticLabels: Map[String, String] = conf.metricsExtraLabels + roleLabel
+  val staticLabelsString: String = MetricLabels.labelString(staticLabels)
 
-  protected val namedGauges: java.util.List[NamedGauge[_]] =
-    new java.util.ArrayList[NamedGauge[_]]()
+  protected val namedGauges: JList[NamedGauge[_]] = new 
JArrayList[NamedGauge[_]]()
 
   def addGauge[T](
       name: String,
-      gauge: Gauge[T],
-      labels: Map[String, String]): Unit = {
+      labels: Map[String, String],
+      gauge: Gauge[T]): Unit = {
     namedGauges.add(NamedGauge(name, gauge, labels ++ staticLabels))
   }
 
   def addGauge[T](
       name: String,
-      gauge: Gauge[T],
-      labels: java.util.Map[String, String]): Unit = {
-    addGauge(name, gauge, labels.asScala.toMap)
+      labels: JMap[String, String],
+      gauge: Gauge[T]): Unit = {
+    addGauge(name, labels.asScala.toMap, gauge)
   }
 
-  def addGauge[T](
-      name: String,
-      f: Unit => T,
-      labels: Map[String, String]): Unit = {
+  def addGauge[T](name: String, labels: Map[String, String] = Map.empty)(f: () 
=> T): Unit = {
     val metricNameWithLabel = metricNameWithCustomizedLabels(name, labels)
     if (!metricRegistry.getGauges.containsKey(metricNameWithLabel)) {
       val supplier: MetricRegistry.MetricSupplier[Gauge[_]] = new 
GaugeSupplier[T](f)
       val gauge = metricRegistry.gauge(metricNameWithLabel, supplier)
-      addGauge(name, gauge, labels)
+      addGauge(name, labels, gauge)
     }
   }
 
-  def addGauge[T](name: String, f: Unit => T): Unit = addGauge(name, f, 
Map.empty[String, String])
-
   def addGauge[T](name: String, gauge: Gauge[T]): Unit = {
-    addGauge(name, gauge, Map.empty[String, String])
+    addGauge(name, Map.empty[String, String], gauge)
   }
 
-  protected val namedTimers =
+  protected val namedTimers
+      : ConcurrentHashMap[String, (NamedTimer, ConcurrentHashMap[String, 
Long])] =
     JavaUtils.newConcurrentHashMap[String, (NamedTimer, 
ConcurrentHashMap[String, Long])]()
 
   def addTimer(name: String): Unit = addTimer(name, Map.empty[String, String])
@@ -107,13 +105,14 @@ abstract class AbstractSource(conf: CelebornConf, role: 
String)
   def addTimer(name: String, labels: Map[String, String]): Unit = {
     val metricNameWithLabel = metricNameWithCustomizedLabels(name, labels)
     if (!metricRegistry.getTimers.containsKey(metricNameWithLabel)) {
-      val timer =
-        metricRegistry.timer(metricNameWithLabel, timerSupplier)
-      namedTimers.putIfAbsent(
+      val timer = metricRegistry.timer(metricNameWithLabel, timerSupplier)
+      namedTimers.computeIfAbsent(
         metricNameWithLabel,
-        (
-          NamedTimer(name, timer, labels ++ staticLabels),
-          JavaUtils.newConcurrentHashMap[String, Long]()))
+        (_: String) => {
+          val namedTimer = NamedTimer(name, timer, labels ++ staticLabels)
+          val values = JavaUtils.newConcurrentHashMap[String, Long]()
+          (namedTimer, values)
+        })
     }
   }
 
@@ -418,10 +417,6 @@ class TimerSupplier(val slidingWindowSize: Int)
   }
 }
 
-class GaugeSupplier[T](f: Unit => T) extends 
MetricRegistry.MetricSupplier[Gauge[_]] {
-  override def newMetric(): Gauge[T] = {
-    new Gauge[T] {
-      override def getValue: T = f(())
-    }
-  }
+class GaugeSupplier[T](f: () => T) extends 
MetricRegistry.MetricSupplier[Gauge[_]] {
+  override def newMetric(): Gauge[T] = new Gauge[T] { override def getValue: T 
= f() }
 }
diff --git 
a/common/src/main/scala/org/apache/celeborn/common/metrics/source/JVMCPUSource.scala
 
b/common/src/main/scala/org/apache/celeborn/common/metrics/source/JVMCPUSource.scala
index 3b38ec2d3..1b251f0a1 100644
--- 
a/common/src/main/scala/org/apache/celeborn/common/metrics/source/JVMCPUSource.scala
+++ 
b/common/src/main/scala/org/apache/celeborn/common/metrics/source/JVMCPUSource.scala
@@ -32,7 +32,7 @@ class JVMCPUSource(conf: CelebornConf, role: String) extends 
AbstractSource(conf
   import JVMCPUSource._
 
   addGauge(
-    JVMCPUTime,
+    JVM_CPU_TIME,
     new Gauge[Long] {
       val mBean: MBeanServer = ManagementFactory.getPlatformMBeanServer
       val name = new ObjectName("java.lang", "type", "OperatingSystem")
@@ -51,5 +51,5 @@ class JVMCPUSource(conf: CelebornConf, role: String) extends 
AbstractSource(conf
 }
 
 object JVMCPUSource {
-  val JVMCPUTime = "JVMCPUTime"
+  val JVM_CPU_TIME = "JVMCPUTime"
 }
diff --git 
a/common/src/main/scala/org/apache/celeborn/common/metrics/source/Source.scala 
b/common/src/main/scala/org/apache/celeborn/common/metrics/source/Source.scala
index 2e02efada..eff7cf744 100644
--- 
a/common/src/main/scala/org/apache/celeborn/common/metrics/source/Source.scala
+++ 
b/common/src/main/scala/org/apache/celeborn/common/metrics/source/Source.scala
@@ -26,6 +26,6 @@ trait Source {
   def startTimer(metricsName: String, key: String): Unit
   def stopTimer(metricsName: String, key: String): Unit
   def incCounter(metricsName: String, incV: Long): Unit
-  def getMetrics(): String
+  def getMetrics: String
   def destroy(): Unit
 }
diff --git 
a/common/src/main/scala/org/apache/celeborn/common/util/FunctionConverter.scala 
b/common/src/main/scala/org/apache/celeborn/common/util/FunctionConverter.scala
index 8eef121f4..6c091cb49 100644
--- 
a/common/src/main/scala/org/apache/celeborn/common/util/FunctionConverter.scala
+++ 
b/common/src/main/scala/org/apache/celeborn/common/util/FunctionConverter.scala
@@ -17,6 +17,8 @@
 
 package org.apache.celeborn.common.util
 
+import java.util.function.Consumer
+
 import scala.language.implicitConversions
 
 /**
@@ -24,11 +26,16 @@ import scala.language.implicitConversions
  */
 object FunctionConverter {
 
-  implicit def scalaFunctionToJava[From, To](function: (From) => To)
+  implicit def scalaFunctionToJava[From, To](function: From => To)
       : java.util.function.Function[From, To] = {
     new java.util.function.Function[From, To] {
       override def apply(input: From): To = function(input)
     }
   }
 
+  implicit def scalaConsumerToJava[T](consumer: T => AnyVal): 
java.util.function.Consumer[T] = {
+    new Consumer[T] {
+      override def accept(t: T): Unit = consumer(t)
+    }
+  }
 }
diff --git 
a/common/src/test/scala/org/apache/celeborn/common/metrics/source/CelebornSourceSuite.scala
 
b/common/src/test/scala/org/apache/celeborn/common/metrics/source/CelebornSourceSuite.scala
index 96735c45d..89330bee9 100644
--- 
a/common/src/test/scala/org/apache/celeborn/common/metrics/source/CelebornSourceSuite.scala
+++ 
b/common/src/test/scala/org/apache/celeborn/common/metrics/source/CelebornSourceSuite.scala
@@ -34,8 +34,8 @@ class CelebornSourceSuite extends CelebornFunSuite {
     val user1 = Map("user" -> "user1")
     val user2 = Map("user" -> "user2")
     val user3 = Map("user" -> "user3")
-    mockSource.addGauge("Gauge1", _ => 1000)
-    mockSource.addGauge("Gauge2", _ => 2000, user1)
+    mockSource.addGauge("Gauge1") { () => 1000 }
+    mockSource.addGauge("Gauge2", user1) { () => 2000 }
     mockSource.addCounter("Counter1")
     mockSource.addCounter("Counter2", user2)
     // test operation with and without label
@@ -53,16 +53,14 @@ class CelebornSourceSuite extends CelebornFunSuite {
     val res = mockSource.getMetrics()
     var extraLabelsStr = extraLabels
     if (extraLabels.nonEmpty) {
-      extraLabelsStr = extraLabels + ", "
+      extraLabelsStr = extraLabels + ","
     }
     val exp1 = s"""metrics_Gauge1_Value{${extraLabelsStr}role="mock"} 1000"""
-    val exp2 =
-      s"""metrics_Gauge2_Value{${extraLabelsStr}role="mock", user="user1"} 
2000"""
+    val exp2 = 
s"""metrics_Gauge2_Value{${extraLabelsStr}role="mock",user="user1"} 2000"""
     val exp3 = s"""metrics_Counter1_Count{${extraLabelsStr}role="mock"} 3000"""
-    val exp4 =
-      s"""metrics_Counter2_Count{${extraLabelsStr}role="mock", user="user2"} 
4000"""
+    val exp4 = 
s"""metrics_Counter2_Count{${extraLabelsStr}role="mock",user="user2"} 4000"""
     val exp5 = s"""metrics_Timer1_Count{${extraLabelsStr}role="mock"} 1"""
-    val exp6 = s"""metrics_Timer2_Count{${extraLabelsStr}role="mock", 
user="user3"} 1"""
+    val exp6 = 
s"""metrics_Timer2_Count{${extraLabelsStr}role="mock",user="user3"} 1"""
 
     assert(res.contains(exp1))
     assert(res.contains(exp2))
@@ -76,19 +74,19 @@ class CelebornSourceSuite extends CelebornFunSuite {
     val conf = new CelebornConf()
     // label's is normal
     conf.set(CelebornConf.METRICS_EXTRA_LABELS.key, "l1=v1,l2=v2,l3=v3")
-    val extraLabels = """l1="v1", l2="v2", l3="v3""""
+    val extraLabels = """l1="v1",l2="v2",l3="v3""""
     createAbstractSourceAndCheck(conf, extraLabels)
 
     // labels' kv not correct
-    assertThrows[IllegalArgumentException]({
+    assertThrows[IllegalArgumentException] {
       conf.set(CelebornConf.METRICS_EXTRA_LABELS.key, "l1=v1,l2=")
-      val extraLabels2 = """l1="v1", l2="v2", l3="v3""""
+      val extraLabels2 = """l1="v1",l2="v2",l3="v3""""
       createAbstractSourceAndCheck(conf, extraLabels2)
-    })
+    }
 
     // there are spaces in labels
     conf.set(CelebornConf.METRICS_EXTRA_LABELS.key, " l1 = v1, l2  =v2  ,l3 
=v3  ")
-    val extraLabels3 = """l1="v1", l2="v2", l3="v3""""
+    val extraLabels3 = """l1="v1",l2="v2",l3="v3""""
     createAbstractSourceAndCheck(conf, extraLabels3)
 
   }
diff --git 
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala 
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
index 1c1c1908d..41c3a9657 100644
--- 
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
+++ 
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
@@ -141,14 +141,16 @@ private[celeborn] class Master(
   // init and register master metrics
   val resourceConsumptionSource = new ResourceConsumptionSource(conf)
   private val masterSource = new MasterSource(conf)
-  masterSource.addGauge(
-    MasterSource.RegisteredShuffleCount,
-    _ => statusSystem.registeredShuffle.size())
-  masterSource.addGauge(MasterSource.ExcludedWorkerCount, _ => 
statusSystem.excludedWorkers.size())
-  masterSource.addGauge(MasterSource.WorkerCount, _ => 
statusSystem.workers.size())
-  masterSource.addGauge(MasterSource.LostWorkerCount, _ => 
statusSystem.lostWorkers.size())
-  masterSource.addGauge(MasterSource.PartitionSize, _ => 
statusSystem.estimatedPartitionSize)
-  masterSource.addGauge(MasterSource.IsActiveMaster, _ => isMasterActive)
+  masterSource.addGauge(MasterSource.REGISTERED_SHUFFLE_COUNT) { () =>
+    statusSystem.registeredShuffle.size
+  }
+  masterSource.addGauge(MasterSource.EXCLUDED_WORKER_COUNT) { () =>
+    statusSystem.excludedWorkers.size
+  }
+  masterSource.addGauge(MasterSource.WORKER_COUNT) { () => 
statusSystem.workers.size }
+  masterSource.addGauge(MasterSource.LOST_WORKER_COUNT) { () => 
statusSystem.lostWorkers.size }
+  masterSource.addGauge(MasterSource.PARTITION_SIZE) { () => 
statusSystem.estimatedPartitionSize }
+  masterSource.addGauge(MasterSource.IS_ACTIVE_MASTER) { () => isMasterActive }
 
   metricsSystem.registerSource(resourceConsumptionSource)
   metricsSystem.registerSource(masterSource)
@@ -540,7 +542,7 @@ private[celeborn] class Master(
     val availableWorkers = workersAvailable()
     // offer slots
     val slots =
-      masterSource.sample(MasterSource.OfferSlotsTime, 
s"offerSlots-${Random.nextInt()}") {
+      masterSource.sample(MasterSource.OFFER_SLOTS_TIME, 
s"offerSlots-${Random.nextInt()}") {
         statusSystem.workers.synchronized {
           if (slotsAssignPolicy == SlotsAssignPolicy.LOADAWARE && 
!conf.hasHDFSStorage) {
             SlotsAllocator.offerSlotsLoadAware(
@@ -710,22 +712,18 @@ private[celeborn] class Master(
       userIdentifier: UserIdentifier,
       context: RpcCallContext): Unit = {
 
-    resourceConsumptionSource.addGauge(
-      "diskFileCount",
-      _ => computeUserResourceConsumption(userIdentifier).diskFileCount,
-      userIdentifier.toMap)
-    resourceConsumptionSource.addGauge(
-      "diskBytesWritten",
-      _ => computeUserResourceConsumption(userIdentifier).diskBytesWritten,
-      userIdentifier.toMap)
-    resourceConsumptionSource.addGauge(
-      "hdfsFileCount",
-      _ => computeUserResourceConsumption(userIdentifier).hdfsFileCount,
-      userIdentifier.toMap)
-    resourceConsumptionSource.addGauge(
-      "hdfsBytesWritten",
-      _ => computeUserResourceConsumption(userIdentifier).hdfsBytesWritten,
-      userIdentifier.toMap)
+    resourceConsumptionSource.addGauge("diskFileCount", userIdentifier.toMap) 
{ () =>
+      computeUserResourceConsumption(userIdentifier).diskFileCount
+    }
+    resourceConsumptionSource.addGauge("diskBytesWritten", 
userIdentifier.toMap) { () =>
+      computeUserResourceConsumption(userIdentifier).diskBytesWritten
+    }
+    resourceConsumptionSource.addGauge("hdfsFileCount", userIdentifier.toMap) 
{ () =>
+      computeUserResourceConsumption(userIdentifier).hdfsFileCount
+    }
+    resourceConsumptionSource.addGauge("hdfsBytesWritten", 
userIdentifier.toMap) { () =>
+      computeUserResourceConsumption(userIdentifier).hdfsBytesWritten
+    }
 
     val userResourceConsumption = 
computeUserResourceConsumption(userIdentifier)
     val quota = quotaManager.getQuota(userIdentifier)
diff --git 
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/MasterSource.scala
 
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/MasterSource.scala
index 5c3862cce..cd3f1f0ea 100644
--- 
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/MasterSource.scala
+++ 
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/MasterSource.scala
@@ -21,29 +21,29 @@ import org.apache.celeborn.common.CelebornConf
 import org.apache.celeborn.common.internal.Logging
 import org.apache.celeborn.common.metrics.MetricsSystem
 import org.apache.celeborn.common.metrics.source.AbstractSource
-import org.apache.celeborn.service.deploy.master.MasterSource.OfferSlotsTime
+import org.apache.celeborn.service.deploy.master.MasterSource.OFFER_SLOTS_TIME
 
 class MasterSource(conf: CelebornConf)
   extends AbstractSource(conf, MetricsSystem.ROLE_MASTER) with Logging {
   override val sourceName = s"master"
 
-  addTimer(OfferSlotsTime)
+  addTimer(OFFER_SLOTS_TIME)
   // start cleaner
   startCleaner()
 }
 
 object MasterSource {
-  val WorkerCount = "WorkerCount"
+  val WORKER_COUNT = "WorkerCount"
 
-  val LostWorkerCount = "LostWorkers"
+  val LOST_WORKER_COUNT = "LostWorkers"
 
-  val ExcludedWorkerCount = "ExcludedWorkerCount"
+  val EXCLUDED_WORKER_COUNT = "ExcludedWorkerCount"
 
-  val RegisteredShuffleCount = "RegisteredShuffleCount"
+  val REGISTERED_SHUFFLE_COUNT = "RegisteredShuffleCount"
 
-  val IsActiveMaster = "IsActiveMaster"
+  val IS_ACTIVE_MASTER = "IsActiveMaster"
 
-  val PartitionSize = "PartitionSize"
+  val PARTITION_SIZE = "PartitionSize"
 
-  val OfferSlotsTime = "OfferSlotsTime"
+  val OFFER_SLOTS_TIME = "OfferSlotsTime"
 }
diff --git 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/congestcontrol/CongestionController.java
 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/congestcontrol/CongestionController.java
index 4249d896d..6b6ecbb3c 100644
--- 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/congestcontrol/CongestionController.java
+++ 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/congestcontrol/CongestionController.java
@@ -75,10 +75,10 @@ public class CongestionController {
         this::removeInactiveUsers, 0, userInactiveTimeMills, 
TimeUnit.MILLISECONDS);
 
     this.workerSource.addGauge(
-        WorkerSource.PotentialConsumeSpeed(), this::getPotentialConsumeSpeed);
+        WorkerSource.POTENTIAL_CONSUME_SPEED(), 
this::getPotentialConsumeSpeed);
 
     this.workerSource.addGauge(
-        WorkerSource.WorkerConsumeSpeed(), 
consumedBufferStatusHub::avgBytesPerSec);
+        WorkerSource.WORKER_CONSUME_SPEED(), 
consumedBufferStatusHub::avgBytesPerSec);
   }
 
   public static synchronized CongestionController initialize(
@@ -162,8 +162,7 @@ public class CongestionController {
       long avgConsumeSpeed = getPotentialConsumeSpeed();
       if (logger.isDebugEnabled()) {
         logger.debug(
-            "The user {}, produceSpeed is {},"
-                + " while consumeSpeed is {}, need to congest it: {}",
+            "The user {}, produceSpeed is {}, while consumeSpeed is {}, need 
to congest it: {}",
             userIdentifier,
             userProduceSpeed,
             avgConsumeSpeed,
@@ -184,9 +183,9 @@ public class CongestionController {
               BufferStatusHub bufferStatusHub = new 
BufferStatusHub(sampleTimeWindowSeconds);
               UserBufferInfo userInfo = new UserBufferInfo(currentTimeMillis, 
bufferStatusHub);
               workerSource.addGauge(
-                  WorkerSource.UserProduceSpeed(),
-                  () -> getUserProduceSpeed(userInfo),
-                  userIdentifier.toMap());
+                  WorkerSource.USER_PRODUCE_SPEED(),
+                  userIdentifier.toJMap(),
+                  () -> getUserProduceSpeed(userInfo));
               return userInfo;
             });
 
@@ -241,7 +240,7 @@ public class CongestionController {
         UserBufferInfo userBufferInfo = next.getValue();
         if (currentTimeMillis - userBufferInfo.getTimestamp() >= 
userInactiveTimeMills) {
           userBufferStatuses.remove(userIdentifier);
-          workerSource.removeGauge(WorkerSource.UserProduceSpeed(), 
userIdentifier.toMap());
+          workerSource.removeGauge(WorkerSource.USER_PRODUCE_SPEED(), 
userIdentifier.toMap());
           logger.info("User {} has been expired, remove from rate limit list", 
userIdentifier);
         }
       }
diff --git 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/FileWriter.java
 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/FileWriter.java
index 79c2d885b..7e831fb33 100644
--- 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/FileWriter.java
+++ 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/FileWriter.java
@@ -357,7 +357,7 @@ public abstract class FileWriter implements DeviceObserver {
     String metricsName = null;
     String fileAbsPath = null;
     if (source.metricsCollectCriticalEnabled()) {
-      metricsName = WorkerSource.TakeBufferTime();
+      metricsName = WorkerSource.TAKE_BUFFER_TIME();
       fileAbsPath = fileInfo.getFilePath();
       source.startTimer(metricsName, fileAbsPath);
     }
diff --git 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java
 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java
index 9ab516408..f7963c5d6 100644
--- 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java
+++ 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java
@@ -525,7 +525,7 @@ public class PartitionFilesSorter extends 
ShuffleRecoverHelper {
     }
 
     public void sort() throws InterruptedException {
-      source.startTimer(WorkerSource.SortTime(), fileId);
+      source.startTimer(WorkerSource.SORT_TIME(), fileId);
 
       try {
         initializeFiles();
@@ -594,7 +594,7 @@ public class PartitionFilesSorter extends 
ShuffleRecoverHelper {
           sorting.remove(fileId);
         }
       }
-      source.stopTimer(WorkerSource.SortTime(), fileId);
+      source.stopTimer(WorkerSource.SORT_TIME(), fileId);
     }
 
     private void initializeFiles() throws IOException {
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala
 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala
index 0ee3753e4..8ad590160 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala
@@ -92,7 +92,7 @@ private[deploy] class Controller(
           userIdentifier,
           pushDataTimeout) =>
       val shuffleKey = Utils.makeShuffleKey(applicationId, shuffleId)
-      workerSource.sample(WorkerSource.ReserveSlotsTime, shuffleKey) {
+      workerSource.sample(WorkerSource.RESERVE_SLOTS_TIME, shuffleKey) {
         logDebug(s"Received ReserveSlots request, $shuffleKey, " +
           s"primary partitions: 
${primaryLocations.asScala.map(_.getUniqueId).mkString(",")}; " +
           s"replica partitions: 
${replicaLocations.asScala.map(_.getUniqueId).mkString(",")}.")
@@ -389,7 +389,7 @@ private[deploy] class Controller(
       } else {
         logInfo(s"Start commitFiles for ${shuffleKey}")
         commitInfo.status = CommitInfo.COMMIT_INPROCESS
-        workerSource.startTimer(WorkerSource.CommitFilesTime, shuffleKey)
+        workerSource.startTimer(WorkerSource.COMMIT_FILES_TIME, shuffleKey)
       }
     }
 
@@ -528,7 +528,7 @@ private[deploy] class Controller(
       }
       context.reply(response)
 
-      workerSource.stopTimer(WorkerSource.CommitFilesTime, shuffleKey)
+      workerSource.stopTimer(WorkerSource.COMMIT_FILES_TIME, shuffleKey)
     }
 
     if (future != null) {
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala
 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala
index a7370d252..950b5208a 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala
@@ -26,6 +26,7 @@ import java.util.function.Consumer
 import com.google.common.base.Throwables
 import io.netty.util.concurrent.{Future, GenericFutureListener}
 
+import org.apache.celeborn.common.CelebornConf
 import org.apache.celeborn.common.internal.Logging
 import org.apache.celeborn.common.meta.{FileInfo, FileManagedBuffers}
 import org.apache.celeborn.common.network.buffer.NioManagedBuffer
@@ -38,14 +39,14 @@ import org.apache.celeborn.common.protocol.PartitionType
 import org.apache.celeborn.common.util.ExceptionUtils
 import org.apache.celeborn.service.deploy.worker.storage.{ChunkStreamManager, 
CreditStreamManager, PartitionFilesSorter, StorageManager}
 
-class FetchHandler(val conf: TransportConf) extends BaseMessageHandler with 
Logging {
-  var celebornConf = conf.getCelebornConf
+class FetchHandler(val conf: CelebornConf, val transportConf: TransportConf)
+  extends BaseMessageHandler with Logging {
   var chunkStreamManager = new ChunkStreamManager()
   val creditStreamManager = new CreditStreamManager(
-    celebornConf.partitionReadBuffersMin,
-    celebornConf.partitionReadBuffersMax,
-    celebornConf.creditStreamThreadsPerMountpoint,
-    celebornConf.readBuffersToTriggerReadMin)
+    conf.partitionReadBuffersMin,
+    conf.partitionReadBuffersMax,
+    conf.creditStreamThreadsPerMountpoint,
+    conf.readBuffersToTriggerReadMin)
   var workerSource: WorkerSource = _
   var storageManager: StorageManager = _
   var partitionsSorter: PartitionFilesSorter = _
@@ -54,13 +55,13 @@ class FetchHandler(val conf: TransportConf) extends 
BaseMessageHandler with Logg
   def init(worker: Worker): Unit = {
     this.workerSource = worker.workerSource
 
-    workerSource.addGauge(
-      WorkerSource.CreditStreamCount,
-      _ => creditStreamManager.getStreamsCount)
+    workerSource.addGauge(WorkerSource.CREDIT_STREAM_COUNT) { () =>
+      creditStreamManager.getStreamsCount
+    }
 
-    workerSource.addGauge(
-      WorkerSource.ActiveMapPartitionCount,
-      _ => creditStreamManager.getActiveMapPartitionCount)
+    workerSource.addGauge(WorkerSource.ACTIVE_MAP_PARTITION_COUNT) { () =>
+      creditStreamManager.getActiveMapPartitionCount
+    }
 
     this.storageManager = worker.storageManager
     this.partitionsSorter = worker.partitionsSorter
@@ -111,10 +112,10 @@ class FetchHandler(val conf: TransportConf) extends 
BaseMessageHandler with Logg
           new String(openStreamWithCredit.fileName, StandardCharsets.UTF_8))
       }
     // metrics start
-    workerSource.startTimer(WorkerSource.OpenStreamTime, shuffleKey)
+    workerSource.startTimer(WorkerSource.OPEN_STREAM_TIME, shuffleKey)
     try {
       var fileInfo = getRawFileInfo(shuffleKey, fileName)
-      try fileInfo.getPartitionType() match {
+      try fileInfo.getPartitionType match {
         case PartitionType.REDUCE =>
           val startMapIndex = msg.asInstanceOf[OpenStream].startMapIndex
           val endMapIndex = msg.asInstanceOf[OpenStream].endMapIndex
@@ -134,7 +135,7 @@ class FetchHandler(val conf: TransportConf) extends 
BaseMessageHandler with Logg
               request.requestId,
               new NioManagedBuffer(streamHandle.toByteBuffer)))
           } else {
-            val buffers = new FileManagedBuffers(fileInfo, conf)
+            val buffers = new FileManagedBuffers(fileInfo, transportConf)
             val fetchTimeMetrics = 
storageManager.getFetchTimeMetric(fileInfo.getFile)
             val streamId = chunkStreamManager.registerStream(
               shuffleKey,
@@ -161,8 +162,7 @@ class FetchHandler(val conf: TransportConf) extends 
BaseMessageHandler with Logg
               val bufferStreamHandle = new StreamHandle(streamId, 0)
               client.getChannel.writeAndFlush(new RpcResponse(
                 request.requestId,
-                new NioManagedBuffer(bufferStreamHandle
-                  .toByteBuffer)))
+                new NioManagedBuffer(bufferStreamHandle.toByteBuffer)))
             }
           }
 
@@ -180,12 +180,12 @@ class FetchHandler(val conf: TransportConf) extends 
BaseMessageHandler with Logg
           handleRpcIOException(client, request.requestId, e)
       } finally {
         // metrics end
-        workerSource.stopTimer(WorkerSource.OpenStreamTime, shuffleKey)
+        workerSource.stopTimer(WorkerSource.OPEN_STREAM_TIME, shuffleKey)
         request.body().release()
       }
     } catch {
       case ioe: IOException =>
-        workerSource.stopTimer(WorkerSource.OpenStreamTime, shuffleKey)
+        workerSource.stopTimer(WorkerSource.OPEN_STREAM_TIME, shuffleKey)
         handleRpcIOException(client, request.requestId, ioe)
     }
   }
@@ -215,15 +215,14 @@ class FetchHandler(val conf: TransportConf) extends 
BaseMessageHandler with Logg
       s" to fetch block ${req.streamChunkSlice}")
 
     val chunksBeingTransferred = chunkStreamManager.chunksBeingTransferred
-    if (chunksBeingTransferred > 
celebornConf.shuffleIoMaxChunksBeingTransferred) {
+    if (chunksBeingTransferred > conf.shuffleIoMaxChunksBeingTransferred) {
       val message = "Worker is too busy. The number of chunks being 
transferred " +
         s"$chunksBeingTransferred exceeds 
celeborn.shuffle.maxChunksBeingTransferred " +
-        s"${celebornConf.shuffleIoMaxChunksBeingTransferred}."
+        s"${conf.shuffleIoMaxChunksBeingTransferred}."
       logError(message)
-      client.getChannel.writeAndFlush(
-        new ChunkFetchFailure(req.streamChunkSlice, message))
+      client.getChannel.writeAndFlush(new 
ChunkFetchFailure(req.streamChunkSlice, message))
     } else {
-      workerSource.startTimer(WorkerSource.FetchChunkTime, req.toString)
+      workerSource.startTimer(WorkerSource.FETCH_CHUNK_TIME, req.toString)
       val fetchTimeMetric = 
chunkStreamManager.getFetchTimeMetric(req.streamChunkSlice.streamId)
       val fetchBeginTime = System.nanoTime()
       try {
@@ -240,19 +239,19 @@ class FetchHandler(val conf: TransportConf) extends 
BaseMessageHandler with Logg
               if (fetchTimeMetric != null) {
                 fetchTimeMetric.update(System.nanoTime() - fetchBeginTime)
               }
-              workerSource.stopTimer(WorkerSource.FetchChunkTime, req.toString)
+              workerSource.stopTimer(WorkerSource.FETCH_CHUNK_TIME, 
req.toString)
             }
           })
       } catch {
         case e: Exception =>
           logError(
-            String.format(s"Error opening block ${req.streamChunkSlice} for 
request from" +
-              s" ${NettyUtils.getRemoteAddress(client.getChannel)}"),
+            s"Error opening block ${req.streamChunkSlice} for request from " +
+              NettyUtils.getRemoteAddress(client.getChannel),
             e)
           client.getChannel.writeAndFlush(new ChunkFetchFailure(
             req.streamChunkSlice,
             Throwables.getStackTraceAsString(e)))
-          workerSource.stopTimer(WorkerSource.FetchChunkTime, req.toString)
+          workerSource.stopTimer(WorkerSource.FETCH_CHUNK_TIME, req.toString)
       }
     }
   }
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
index 12783265f..cddb17032 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
@@ -147,13 +147,13 @@ class PushDataHandler extends BaseMessageHandler with 
Logging {
       if (isPrimary) {
         new RpcResponseCallbackWithTimer(
           workerSource,
-          WorkerSource.PrimaryPushDataTime,
+          WorkerSource.PRIMARY_PUSH_DATA_TIME,
           key,
           callback)
       } else {
         new RpcResponseCallbackWithTimer(
           workerSource,
-          WorkerSource.ReplicaPushDataTime,
+          WorkerSource.REPLICA_PUSH_DATA_TIME,
           key,
           callback)
       }
@@ -227,7 +227,7 @@ class PushDataHandler extends BaseMessageHandler with 
Logging {
       logError(
         s"While handling PushData, throw $cause, fileWriter $fileWriter has 
exception.",
         exception)
-      workerSource.incCounter(WorkerSource.WriteDataFailCount)
+      workerSource.incCounter(WorkerSource.WRITE_DATA_FAIL_COUNT)
       callbackWithTimer.onFailure(new CelebornIOException(cause))
       return
     }
@@ -250,7 +250,7 @@ class PushDataHandler extends BaseMessageHandler with 
Logging {
             peer.getReplicatePort)
           if (unavailablePeers.containsKey(peerWorker)) {
             pushData.body().release()
-            
workerSource.incCounter(WorkerSource.ReplicateDataCreateConnectionFailCount)
+            
workerSource.incCounter(WorkerSource.REPLICATE_DATA_CREATE_CONNECTION_FAIL_COUNT)
             logError(
               s"PushData replication failed caused by unavailable peer for 
partitionLocation: $location")
             callbackWithTimer.onFailure(
@@ -298,13 +298,13 @@ class PushDataHandler extends BaseMessageHandler with 
Logging {
               // 2. Throw PUSH_DATA_TIMEOUT_REPLICA by TransportResponseHandler
               // 3. Throw IOException by channel, convert to 
PUSH_DATA_CONNECTION_EXCEPTION_REPLICA
               if 
(e.getMessage.startsWith(StatusCode.PUSH_DATA_WRITE_FAIL_REPLICA.name())) {
-                
workerSource.incCounter(WorkerSource.ReplicateDataWriteFailCount)
+                
workerSource.incCounter(WorkerSource.REPLICATE_DATA_WRITE_FAIL_COUNT)
                 callbackWithTimer.onFailure(e)
               } else if 
(e.getMessage.startsWith(StatusCode.PUSH_DATA_TIMEOUT_REPLICA.name())) {
-                workerSource.incCounter(WorkerSource.ReplicateDataTimeoutCount)
+                
workerSource.incCounter(WorkerSource.REPLICATE_DATA_TIMEOUT_COUNT)
                 callbackWithTimer.onFailure(e)
               } else {
-                
workerSource.incCounter(WorkerSource.ReplicateDataConnectionExceptionCount)
+                
workerSource.incCounter(WorkerSource.REPLICATE_DATA_CONNECTION_EXCEPTION_COUNT)
                 callbackWithTimer.onFailure(
                   new 
CelebornIOException(StatusCode.PUSH_DATA_CONNECTION_EXCEPTION_REPLICA))
               }
@@ -322,7 +322,7 @@ class PushDataHandler extends BaseMessageHandler with 
Logging {
             case e: Exception =>
               pushData.body().release()
               unavailablePeers.put(peerWorker, System.currentTimeMillis())
-              
workerSource.incCounter(WorkerSource.ReplicateDataCreateConnectionFailCount)
+              
workerSource.incCounter(WorkerSource.REPLICATE_DATA_CREATE_CONNECTION_FAIL_COUNT)
               logError(
                 s"PushData replication failed during connecting peer for 
partitionLocation: $location",
                 e)
@@ -395,13 +395,13 @@ class PushDataHandler extends BaseMessageHandler with 
Logging {
       if (isPrimary) {
         new RpcResponseCallbackWithTimer(
           workerSource,
-          WorkerSource.PrimaryPushDataTime,
+          WorkerSource.PRIMARY_PUSH_DATA_TIME,
           key,
           callback)
       } else {
         new RpcResponseCallbackWithTimer(
           workerSource,
-          WorkerSource.ReplicaPushDataTime,
+          WorkerSource.REPLICA_PUSH_DATA_TIME,
           key,
           callback)
       }
@@ -491,7 +491,7 @@ class PushDataHandler extends BaseMessageHandler with 
Logging {
       logError(
         s"While handling PushMergedData, throw $cause, fileWriter 
$fileWriterWithException has exception.",
         fileWriterWithException.getException)
-      workerSource.incCounter(WorkerSource.WriteDataFailCount)
+      workerSource.incCounter(WorkerSource.WRITE_DATA_FAIL_COUNT)
       callbackWithTimer.onFailure(new CelebornIOException(cause))
       return
     }
@@ -512,7 +512,7 @@ class PushDataHandler extends BaseMessageHandler with 
Logging {
             peer.getReplicatePort)
           if (unavailablePeers.containsKey(peerWorker)) {
             pushMergedData.body().release()
-            
workerSource.incCounter(WorkerSource.ReplicateDataCreateConnectionFailCount)
+            
workerSource.incCounter(WorkerSource.REPLICATE_DATA_CREATE_CONNECTION_FAIL_COUNT)
             logError(
               s"PushMergedData replication failed caused by unavailable peer 
for partitionLocation: $location")
             callbackWithTimer.onFailure(
@@ -554,13 +554,13 @@ class PushDataHandler extends BaseMessageHandler with 
Logging {
               // 2. Throw PUSH_DATA_TIMEOUT_REPLICA by TransportResponseHandler
               // 3. Throw IOException by channel, convert to 
PUSH_DATA_CONNECTION_EXCEPTION_REPLICA
               if 
(e.getMessage.startsWith(StatusCode.PUSH_DATA_WRITE_FAIL_REPLICA.name())) {
-                
workerSource.incCounter(WorkerSource.ReplicateDataWriteFailCount)
+                
workerSource.incCounter(WorkerSource.REPLICATE_DATA_WRITE_FAIL_COUNT)
                 callbackWithTimer.onFailure(e)
               } else if 
(e.getMessage.startsWith(StatusCode.PUSH_DATA_TIMEOUT_REPLICA.name())) {
-                workerSource.incCounter(WorkerSource.ReplicateDataTimeoutCount)
+                
workerSource.incCounter(WorkerSource.REPLICATE_DATA_TIMEOUT_COUNT)
                 callbackWithTimer.onFailure(e)
               } else {
-                
workerSource.incCounter(WorkerSource.ReplicateDataConnectionExceptionCount)
+                
workerSource.incCounter(WorkerSource.REPLICATE_DATA_CONNECTION_EXCEPTION_COUNT)
                 callbackWithTimer.onFailure(
                   new 
CelebornIOException(StatusCode.PUSH_DATA_CONNECTION_EXCEPTION_REPLICA))
               }
@@ -583,7 +583,7 @@ class PushDataHandler extends BaseMessageHandler with 
Logging {
             case e: Exception =>
               pushMergedData.body().release()
               unavailablePeers.put(peerWorker, System.currentTimeMillis())
-              
workerSource.incCounter(WorkerSource.ReplicateDataCreateConnectionFailCount)
+              
workerSource.incCounter(WorkerSource.REPLICATE_DATA_CREATE_CONNECTION_FAIL_COUNT)
               logError(
                 s"PushMergedData replication failed during connecting peer for 
partitionLocation: $location",
                 e)
@@ -750,9 +750,9 @@ class PushDataHandler extends BaseMessageHandler with 
Logging {
 
     val key = s"${pushData.requestId}"
     if (isPrimary) {
-      workerSource.startTimer(WorkerSource.PrimaryPushDataTime, key)
+      workerSource.startTimer(WorkerSource.PRIMARY_PUSH_DATA_TIME, key)
     } else {
-      workerSource.startTimer(WorkerSource.ReplicaPushDataTime, key)
+      workerSource.startTimer(WorkerSource.REPLICA_PUSH_DATA_TIME, key)
     }
 
     // find FileWriter responsible for the data
@@ -770,7 +770,7 @@ class PushDataHandler extends BaseMessageHandler with 
Logging {
         pushData.requestId,
         null,
         location,
-        if (isPrimary) WorkerSource.PrimaryPushDataTime else 
WorkerSource.ReplicaPushDataTime,
+        if (isPrimary) WorkerSource.PRIMARY_PUSH_DATA_TIME else 
WorkerSource.REPLICA_PUSH_DATA_TIME,
         callback)
 
     if (locationIsNull(
@@ -869,11 +869,13 @@ class PushDataHandler extends BaseMessageHandler with 
Logging {
     val (workerSourcePrimary, workerSourceReplica) =
       messageType match {
         case Type.PUSH_DATA_HAND_SHAKE =>
-          (WorkerSource.PrimaryPushDataHandshakeTime, 
WorkerSource.ReplicaPushDataHandshakeTime)
+          (
+            WorkerSource.PRIMARY_PUSH_DATA_HANDSHAKE_TIME,
+            WorkerSource.REPLICA_PUSH_DATA_HANDSHAKE_TIME)
         case Type.REGION_START =>
-          (WorkerSource.PrimaryRegionStartTime, 
WorkerSource.ReplicaRegionStartTime)
+          (WorkerSource.PRIMARY_REGION_START_TIME, 
WorkerSource.REPLICA_REGION_START_TIME)
         case Type.REGION_FINISH =>
-          (WorkerSource.PrimaryRegionFinishTime, 
WorkerSource.ReplicaRegionFinishTime)
+          (WorkerSource.PRIMARY_REGION_FINISH_TIME, 
WorkerSource.REPLICA_REGION_FINISH_TIME)
         case _ => throw new IllegalArgumentException(s"Not support 
$messageType yet")
       }
 
@@ -973,18 +975,18 @@ class PushDataHandler extends BaseMessageHandler with 
Logging {
       }
       messageType match {
         case Type.PUSH_DATA_HAND_SHAKE =>
-          workerSource.incCounter(WorkerSource.PushDataHandshakeFailCount)
+          workerSource.incCounter(WorkerSource.PUSH_DATA_HANDSHAKE_FAIL_COUNT)
           callback.onFailure(new CelebornIOException(
             StatusCode.PUSH_DATA_HANDSHAKE_FAIL_REPLICA,
             e))
         case Type.REGION_START =>
-          workerSource.incCounter(WorkerSource.RegionStartFailCount)
+          workerSource.incCounter(WorkerSource.REGION_START_FAIL_COUNT)
           callback.onFailure(new 
CelebornIOException(StatusCode.REGION_START_FAIL_REPLICA, e))
         case Type.REGION_FINISH =>
-          workerSource.incCounter(WorkerSource.RegionFinishFailCount)
+          workerSource.incCounter(WorkerSource.REGION_FINISH_FAIL_COUNT)
           callback.onFailure(new 
CelebornIOException(StatusCode.REGION_FINISH_FAIL_REPLICA, e))
         case _ =>
-          workerSource.incCounter(WorkerSource.ReplicateDataFailCount)
+          workerSource.incCounter(WorkerSource.REPLICATE_DATA_FAIL_COUNT)
           if (e.isInstanceOf[CelebornIOException]) {
             callback.onFailure(e)
           } else {
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
index 9d1d9f1dc..572d03247 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
@@ -19,7 +19,7 @@ package org.apache.celeborn.service.deploy.worker
 
 import java.io.File
 import java.lang.{Long => JLong}
-import java.util.{HashMap => JHashMap, HashSet => JHashSet}
+import java.util.{HashMap => JHashMap, HashSet => JHashSet, Map => JMap}
 import java.util.concurrent._
 import java.util.concurrent.atomic.{AtomicBoolean, AtomicIntegerArray}
 
@@ -43,6 +43,8 @@ import 
org.apache.celeborn.common.protocol.message.ControlMessages._
 import org.apache.celeborn.common.quota.ResourceConsumption
 import org.apache.celeborn.common.rpc._
 import org.apache.celeborn.common.util.{JavaUtils, ShutdownHookManager, 
ThreadUtils, Utils}
+// Can be removed once Celeborn drops Scala 2.11 support
+import org.apache.celeborn.common.util.FunctionConverter._
 import org.apache.celeborn.server.common.{HttpService, Service}
 import 
org.apache.celeborn.service.deploy.worker.congestcontrol.CongestionController
 import org.apache.celeborn.service.deploy.worker.memory.{ChannelsLimiter, 
MemoryManager}
@@ -60,7 +62,7 @@ private[celeborn] class Worker(
   override val metricsSystem: MetricsSystem =
     MetricsSystem.createMetricsSystem(serviceName, conf, 
MetricsSystem.SERVLET_PATH)
 
-  val rpcEnv = RpcEnv.create(
+  val rpcEnv: RpcEnv = RpcEnv.create(
     RpcNameConstants.WORKER_SYS,
     workerArgs.host,
     workerArgs.host,
@@ -77,8 +79,8 @@ private[celeborn] class Worker(
   private val gracefulShutdown = conf.workerGracefulShutdown
   assert(
     !gracefulShutdown || (gracefulShutdown &&
-      conf.workerRpcPort != 0 && conf.workerFetchPort != 0 &&
-      conf.workerPushPort != 0 && conf.workerReplicatePort != 0),
+      conf.workerRpcPort > 0 && conf.workerFetchPort > 0 &&
+      conf.workerPushPort > 0 && conf.workerReplicatePort > 0),
     "If enable graceful shutdown, the worker should use stable server port.")
   if (gracefulShutdown) {
     try {
@@ -102,7 +104,7 @@ private[celeborn] class Worker(
 
   val storageManager = new StorageManager(conf, workerSource)
 
-  val memoryManager = MemoryManager.initialize(conf)
+  val memoryManager: MemoryManager = MemoryManager.initialize(conf)
   memoryManager.registerMemoryListener(storageManager)
 
   val partitionsSorter = new PartitionFilesSorter(memoryManager, conf, 
workerSource)
@@ -169,7 +171,7 @@ private[celeborn] class Worker(
     val numThreads = 
conf.workerFetchIoThreads.getOrElse(storageManager.totalFlusherThread)
     val transportConf =
       Utils.fromCelebornConf(conf, TransportModuleConstants.FETCH_MODULE, 
numThreads)
-    fetchHandler = new FetchHandler(transportConf)
+    fetchHandler = new FetchHandler(conf, transportConf)
     val transportContext: TransportContext =
       new TransportContext(
         transportConf,
@@ -181,23 +183,26 @@ private[celeborn] class Worker(
   }
 
   private val pushPort = pushServer.getPort
+  assert(pushPort > 0, "worker push bind port should be positive")
+
   private val fetchPort = fetchServer.getPort
-  private val replicatePort = replicateServer.getPort
+  assert(fetchPort > 0, "worker fetch bind port should be positive")
 
-  assert(pushPort > 0)
-  assert(fetchPort > 0)
-  assert(replicatePort > 0)
+  private val replicatePort = replicateServer.getPort
+  assert(replicatePort > 0, "worker replica bind port should be positive")
 
   storageManager.updateDiskInfos()
+
   // WorkerInfo's diskInfos is a reference to storageManager.diskInfos
   val diskInfos = JavaUtils.newConcurrentHashMap[String, DiskInfo]()
-  storageManager.disksSnapshot().foreach { case diskInfo =>
+  storageManager.disksSnapshot().foreach { diskInfo =>
     diskInfos.put(diskInfo.mountPoint, diskInfo)
   }
 
   // need to ensure storageManager has recovered fileinfos data if enable 
graceful shutdown before retrieve consumption
-  val userResourceConsumption = JavaUtils.newConcurrentHashMap[UserIdentifier, 
ResourceConsumption](
-    storageManager.userResourceConsumptionSnapshot().asJava)
+  val userResourceConsumption: ConcurrentHashMap[UserIdentifier, 
ResourceConsumption] =
+    JavaUtils.newConcurrentHashMap[UserIdentifier, ResourceConsumption](
+      storageManager.userResourceConsumptionSnapshot().asJava)
 
   val workerInfo =
     new WorkerInfo(
@@ -211,63 +216,83 @@ private[celeborn] class Worker(
 
   // whether this Worker registered to Master successfully
   val registered = new AtomicBoolean(false)
-  val shuffleMapperAttempts = JavaUtils.newConcurrentHashMap[String, 
AtomicIntegerArray]()
-  val shufflePartitionType = JavaUtils.newConcurrentHashMap[String, 
PartitionType]
-  var shufflePushDataTimeout = JavaUtils.newConcurrentHashMap[String, Long]
+  val shuffleMapperAttempts: ConcurrentHashMap[String, AtomicIntegerArray] =
+    JavaUtils.newConcurrentHashMap[String, AtomicIntegerArray]()
+  val shufflePartitionType: ConcurrentHashMap[String, PartitionType] =
+    JavaUtils.newConcurrentHashMap[String, PartitionType]
+  var shufflePushDataTimeout: ConcurrentHashMap[String, Long] =
+    JavaUtils.newConcurrentHashMap[String, Long]
   val partitionLocationInfo = new WorkerPartitionLocationInfo
 
-  val shuffleCommitInfos =
+  val shuffleCommitInfos: ConcurrentHashMap[String, ConcurrentHashMap[Long, 
CommitInfo]] =
     JavaUtils.newConcurrentHashMap[String, ConcurrentHashMap[Long, 
CommitInfo]]()
 
   private val masterClient = new MasterClient(rpcEnv, conf)
 
   // (workerInfo -> last connect timeout timestamp)
-  val unavailablePeers = JavaUtils.newConcurrentHashMap[WorkerInfo, Long]()
+  val unavailablePeers: ConcurrentHashMap[WorkerInfo, Long] =
+    JavaUtils.newConcurrentHashMap[WorkerInfo, Long]()
 
   // Threads
-  private val forwardMessageScheduler =
+  private val forwardMessageScheduler: ScheduledExecutorService =
     
ThreadUtils.newDaemonSingleThreadScheduledExecutor("worker-forward-message-scheduler")
   private var sendHeartbeatTask: ScheduledFuture[_] = _
-  private var checkFastfailTask: ScheduledFuture[_] = _
-  val replicateThreadPool = ThreadUtils.newDaemonCachedThreadPool(
-    "worker-replicate-data",
-    conf.workerReplicateThreads)
-  val commitThreadPool = ThreadUtils.newDaemonCachedThreadPool(
-    "Worker-CommitFiles",
-    conf.workerCommitThreads)
-  val asyncReplyPool = 
ThreadUtils.newDaemonSingleThreadScheduledExecutor("async-reply")
+  private var checkFastFailTask: ScheduledFuture[_] = _
+
+  val replicateThreadPool: ThreadPoolExecutor =
+    ThreadUtils.newDaemonCachedThreadPool("worker-replicate-data", 
conf.workerReplicateThreads)
+  val commitThreadPool: ThreadPoolExecutor =
+    ThreadUtils.newDaemonCachedThreadPool("Worker-CommitFiles", 
conf.workerCommitThreads)
+  val asyncReplyPool: ScheduledExecutorService =
+    ThreadUtils.newDaemonSingleThreadScheduledExecutor("async-reply")
   val timer = new HashedWheelTimer()
 
   // Configs
-  private val HEARTBEAT_MILLIS = conf.workerHeartbeatTimeout / 4
-  private val REPLICATE_FAST_FAIL_DURATION = 
conf.workerReplicateFastFailDuration
+  private val heartbeatInterval = conf.workerHeartbeatTimeout / 4
+  private val replicaFastFailDuration = conf.workerReplicateFastFailDuration
 
   private val cleanTaskQueue = new LinkedBlockingQueue[JHashSet[String]]
   var cleaner: Thread = _
 
-  workerSource.addGauge(
-    WorkerSource.RegisteredShuffleCount,
-    _ => workerInfo.getShuffleKeySet.size())
-  workerSource.addGauge(WorkerSource.SlotsAllocated, _ => 
workerInfo.allocationsInLastHour())
-  workerSource.addGauge(WorkerSource.SortMemory, _ => 
memoryManager.getSortMemoryCounter.get())
-  workerSource.addGauge(WorkerSource.SortingFiles, _ => 
partitionsSorter.getSortingCount)
-  workerSource.addGauge(WorkerSource.SortedFiles, _ => 
partitionsSorter.getSortedCount)
-  workerSource.addGauge(WorkerSource.SortedFileSize, _ => 
partitionsSorter.getSortedSize)
-  workerSource.addGauge(WorkerSource.DiskBuffer, _ => 
memoryManager.getDiskBufferCounter.get())
-  workerSource.addGauge(WorkerSource.NettyMemory, _ => 
memoryManager.getNettyUsedDirectMemory())
-  workerSource.addGauge(WorkerSource.PausePushDataCount, _ => 
memoryManager.getPausePushDataCounter)
-  workerSource.addGauge(
-    WorkerSource.PausePushDataAndReplicateCount,
-    _ => memoryManager.getPausePushDataAndReplicateCounter)
-  workerSource.addGauge(
-    WorkerSource.BufferStreamReadBuffer,
-    _ => memoryManager.getReadBufferCounter())
-  workerSource.addGauge(
-    WorkerSource.ReadBufferDispatcherRequestsLength,
-    _ => memoryManager.dispatchRequestsLength)
-  workerSource.addGauge(
-    WorkerSource.ReadBufferAllocatedCount,
-    _ => memoryManager.getAllocatedReadBuffers)
+  workerSource.addGauge(WorkerSource.REGISTERED_SHUFFLE_COUNT) { () =>
+    workerInfo.getShuffleKeySet.size
+  }
+  workerSource.addGauge(WorkerSource.SLOTS_ALLOCATED) { () =>
+    workerInfo.allocationsInLastHour()
+  }
+  workerSource.addGauge(WorkerSource.SORT_MEMORY) { () =>
+    memoryManager.getSortMemoryCounter.get()
+  }
+  workerSource.addGauge(WorkerSource.SORTING_FILES) { () =>
+    partitionsSorter.getSortingCount
+  }
+  workerSource.addGauge(WorkerSource.SORTED_FILES) { () =>
+    partitionsSorter.getSortedCount
+  }
+  workerSource.addGauge(WorkerSource.SORTED_FILE_SIZE) { () =>
+    partitionsSorter.getSortedSize
+  }
+  workerSource.addGauge(WorkerSource.DISK_BUFFER) { () =>
+    memoryManager.getDiskBufferCounter.get()
+  }
+  workerSource.addGauge(WorkerSource.NETTY_MEMORY) { () =>
+    memoryManager.getNettyUsedDirectMemory
+  }
+  workerSource.addGauge(WorkerSource.PAUSE_PUSH_DATA_COUNT) { () =>
+    memoryManager.getPausePushDataCounter
+  }
+  workerSource.addGauge(WorkerSource.PAUSE_PUSH_DATA_AND_REPLICATE_COUNT) { () 
=>
+    memoryManager.getPausePushDataAndReplicateCounter
+  }
+  workerSource.addGauge(WorkerSource.BUFFER_STREAM_READ_BUFFER) { () =>
+    memoryManager.getReadBufferCounter
+  }
+  workerSource.addGauge(WorkerSource.READ_BUFFER_DISPATCHER_REQUESTS_LENGTH) { 
() =>
+    memoryManager.dispatchRequestsLength
+  }
+  workerSource.addGauge(WorkerSource.READ_BUFFER_ALLOCATED_COUNT) { () =>
+    memoryManager.getAllocatedReadBuffers
+  }
 
   private def heartbeatToMaster(): Unit = {
     val activeShuffleKeys = new JHashSet[String]()
@@ -321,26 +346,24 @@ private[celeborn] class Worker(
     // start heartbeat
     sendHeartbeatTask = forwardMessageScheduler.scheduleAtFixedRate(
       new Runnable {
-        override def run(): Unit = Utils.tryLogNonFatalError {
-          heartbeatToMaster()
-        }
+        override def run(): Unit = Utils.tryLogNonFatalError { 
heartbeatToMaster() }
       },
-      HEARTBEAT_MILLIS,
-      HEARTBEAT_MILLIS,
+      heartbeatInterval,
+      heartbeatInterval,
       TimeUnit.MILLISECONDS)
 
-    checkFastfailTask = forwardMessageScheduler.scheduleAtFixedRate(
+    checkFastFailTask = forwardMessageScheduler.scheduleAtFixedRate(
       new Runnable {
         override def run(): Unit = Utils.tryLogNonFatalError {
-          unavailablePeers.entrySet().asScala.foreach { entry =>
-            if (System.currentTimeMillis() - entry.getValue > 
REPLICATE_FAST_FAIL_DURATION) {
+          unavailablePeers.entrySet().forEach { entry: JMap.Entry[WorkerInfo, 
Long] =>
+            if (System.currentTimeMillis() - entry.getValue > 
replicaFastFailDuration) {
               unavailablePeers.remove(entry.getKey)
             }
           }
         }
       },
       0,
-      REPLICATE_FAST_FAIL_DURATION,
+      replicaFastFailDuration,
       TimeUnit.MILLISECONDS)
 
     cleaner = new Thread("Cleaner") {
@@ -377,9 +400,9 @@ private[celeborn] class Worker(
         sendHeartbeatTask.cancel(true)
         sendHeartbeatTask = null
       }
-      if (checkFastfailTask != null) {
-        checkFastfailTask.cancel(true)
-        checkFastfailTask = null
+      if (checkFastFailTask != null) {
+        checkFastFailTask.cancel(true)
+        checkFastFailTask = null
       }
       forwardMessageScheduler.shutdownNow()
       replicateThreadPool.shutdownNow()
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala
 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala
index e29fbff03..87c755517 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala
@@ -26,102 +26,98 @@ class WorkerSource(conf: CelebornConf) extends 
AbstractSource(conf, MetricsSyste
 
   import WorkerSource._
   // add counters
-  addCounter(WriteDataFailCount)
-  addCounter(ReplicateDataFailCount)
-  addCounter(ReplicateDataWriteFailCount)
-  addCounter(ReplicateDataCreateConnectionFailCount)
-  addCounter(ReplicateDataConnectionExceptionCount)
-  addCounter(ReplicateDataTimeoutCount)
+  addCounter(WRITE_DATA_FAIL_COUNT)
+  addCounter(REPLICATE_DATA_FAIL_COUNT)
+  addCounter(REPLICATE_DATA_WRITE_FAIL_COUNT)
+  addCounter(REPLICATE_DATA_CREATE_CONNECTION_FAIL_COUNT)
+  addCounter(REPLICATE_DATA_CONNECTION_EXCEPTION_COUNT)
+  addCounter(REPLICATE_DATA_TIMEOUT_COUNT)
 
-  addCounter(PushDataHandshakeFailCount)
-  addCounter(RegionStartFailCount)
-  addCounter(RegionFinishFailCount)
+  addCounter(PUSH_DATA_HANDSHAKE_FAIL_COUNT)
+  addCounter(REGION_START_FAIL_COUNT)
+  addCounter(REGION_FINISH_FAIL_COUNT)
 
   // add Timers
-  addTimer(CommitFilesTime)
-  addTimer(ReserveSlotsTime)
-  addTimer(FlushDataTime)
-  addTimer(PrimaryPushDataTime)
-  addTimer(ReplicaPushDataTime)
-
-  addTimer(PrimaryPushDataHandshakeTime)
-  addTimer(ReplicaPushDataHandshakeTime)
-  addTimer(PrimaryRegionStartTime)
-  addTimer(ReplicaRegionStartTime)
-  addTimer(PrimaryRegionFinishTime)
-  addTimer(ReplicaRegionFinishTime)
-
-  addTimer(FetchChunkTime)
-  addTimer(OpenStreamTime)
-  addTimer(TakeBufferTime)
-  addTimer(SortTime)
+  addTimer(COMMIT_FILES_TIME)
+  addTimer(RESERVE_SLOTS_TIME)
+  addTimer(FLUSH_DATA_TIME)
+  addTimer(PRIMARY_PUSH_DATA_TIME)
+  addTimer(REPLICA_PUSH_DATA_TIME)
+
+  addTimer(PRIMARY_PUSH_DATA_HANDSHAKE_TIME)
+  addTimer(REPLICA_PUSH_DATA_HANDSHAKE_TIME)
+  addTimer(PRIMARY_REGION_START_TIME)
+  addTimer(REPLICA_REGION_START_TIME)
+  addTimer(PRIMARY_REGION_FINISH_TIME)
+  addTimer(REPLICA_REGION_FINISH_TIME)
+
+  addTimer(FETCH_CHUNK_TIME)
+  addTimer(OPEN_STREAM_TIME)
+  addTimer(TAKE_BUFFER_TIME)
+  addTimer(SORT_TIME)
 
   // start cleaner thread
   startCleaner()
 }
 
 object WorkerSource {
-  val CommitFilesTime = "CommitFilesTime"
-
-  val ReserveSlotsTime = "ReserveSlotsTime"
-
-  val FlushDataTime = "FlushDataTime"
-
-  val OpenStreamTime = "OpenStreamTime"
-
-  val FetchChunkTime = "FetchChunkTime"
+  val COMMIT_FILES_TIME = "CommitFilesTime"
+  val RESERVE_SLOTS_TIME = "ReserveSlotsTime"
+  val FLUSH_DATA_TIME = "FlushDataTime"
+  val OPEN_STREAM_TIME = "OpenStreamTime"
+  val FETCH_CHUNK_TIME = "FetchChunkTime"
 
   // push data
-  val PrimaryPushDataTime = "PrimaryPushDataTime"
-  val ReplicaPushDataTime = "ReplicaPushDataTime"
-  val WriteDataFailCount = "WriteDataFailCount"
-  val ReplicateDataFailCount = "ReplicateDataFailCount"
-  val ReplicateDataWriteFailCount = "ReplicateDataWriteFailCount"
-  val ReplicateDataCreateConnectionFailCount = 
"ReplicateDataCreateConnectionFailCount"
-  val ReplicateDataConnectionExceptionCount = 
"ReplicateDataConnectionExceptionCount"
-  val ReplicateDataTimeoutCount = "ReplicateDataTimeoutCount"
-  val PushDataHandshakeFailCount = "PushDataHandshakeFailCount"
-  val RegionStartFailCount = "RegionStartFailCount"
-  val RegionFinishFailCount = "RegionFinishFailCount"
-  val PrimaryPushDataHandshakeTime = "PrimaryPushDataHandshakeTime"
-  val ReplicaPushDataHandshakeTime = "ReplicaPushDataHandshakeTime"
-  val PrimaryRegionStartTime = "PrimaryRegionStartTime"
-  val ReplicaRegionStartTime = "ReplicaRegionStartTime"
-  val PrimaryRegionFinishTime = "PrimaryRegionFinishTime"
-  val ReplicaRegionFinishTime = "ReplicaRegionFinishTime"
+  val PRIMARY_PUSH_DATA_TIME = "PrimaryPushDataTime"
+  val REPLICA_PUSH_DATA_TIME = "ReplicaPushDataTime"
+  val WRITE_DATA_FAIL_COUNT = "WriteDataFailCount"
+  val REPLICATE_DATA_FAIL_COUNT = "ReplicateDataFailCount"
+  val REPLICATE_DATA_WRITE_FAIL_COUNT = "ReplicateDataWriteFailCount"
+  val REPLICATE_DATA_CREATE_CONNECTION_FAIL_COUNT = 
"ReplicateDataCreateConnectionFailCount"
+  val REPLICATE_DATA_CONNECTION_EXCEPTION_COUNT = 
"ReplicateDataConnectionExceptionCount"
+  val REPLICATE_DATA_TIMEOUT_COUNT = "ReplicateDataTimeoutCount"
+  val PUSH_DATA_HANDSHAKE_FAIL_COUNT = "PushDataHandshakeFailCount"
+  val REGION_START_FAIL_COUNT = "RegionStartFailCount"
+  val REGION_FINISH_FAIL_COUNT = "RegionFinishFailCount"
+  val PRIMARY_PUSH_DATA_HANDSHAKE_TIME = "PrimaryPushDataHandshakeTime"
+  val REPLICA_PUSH_DATA_HANDSHAKE_TIME = "ReplicaPushDataHandshakeTime"
+  val PRIMARY_REGION_START_TIME = "PrimaryRegionStartTime"
+  val REPLICA_REGION_START_TIME = "ReplicaRegionStartTime"
+  val PRIMARY_REGION_FINISH_TIME = "PrimaryRegionFinishTime"
+  val REPLICA_REGION_FINISH_TIME = "ReplicaRegionFinishTime"
 
   // flush
-  val TakeBufferTime = "TakeBufferTime"
+  val TAKE_BUFFER_TIME = "TakeBufferTime"
 
-  val RegisteredShuffleCount = "RegisteredShuffleCount"
+  val REGISTERED_SHUFFLE_COUNT = "RegisteredShuffleCount"
 
   // slots
-  val SlotsAllocated = "SlotsAllocated"
+  val SLOTS_ALLOCATED = "SlotsAllocated"
 
   // memory
-  val NettyMemory = "NettyMemory"
-  val SortTime = "SortTime"
-  val SortMemory = "SortMemory"
-  val SortingFiles = "SortingFiles"
-  val SortedFiles = "SortedFiles"
-  val SortedFileSize = "SortedFileSize"
-  val DiskBuffer = "DiskBuffer"
-  val PausePushDataCount = "PausePushData"
-  val PausePushDataAndReplicateCount = "PausePushDataAndReplicate"
-  val BufferStreamReadBuffer = "BufferStreamReadBuffer"
-  val ReadBufferDispatcherRequestsLength = "ReadBufferDispatcherRequestsLength"
-  val ReadBufferAllocatedCount = "ReadBufferAllocatedCount"
-  val CreditStreamCount = "CreditStreamCount"
-  val ActiveMapPartitionCount = "ActiveMapPartitionCount"
+  val NETTY_MEMORY = "NettyMemory"
+  val SORT_TIME = "SortTime"
+  val SORT_MEMORY = "SortMemory"
+  val SORTING_FILES = "SortingFiles"
+  val SORTED_FILES = "SortedFiles"
+  val SORTED_FILE_SIZE = "SortedFileSize"
+  val DISK_BUFFER = "DiskBuffer"
+  val PAUSE_PUSH_DATA_COUNT = "PausePushData"
+  val PAUSE_PUSH_DATA_AND_REPLICATE_COUNT = "PausePushDataAndReplicate"
+  val BUFFER_STREAM_READ_BUFFER = "BufferStreamReadBuffer"
+  val READ_BUFFER_DISPATCHER_REQUESTS_LENGTH = 
"ReadBufferDispatcherRequestsLength"
+  val READ_BUFFER_ALLOCATED_COUNT = "ReadBufferAllocatedCount"
+  val CREDIT_STREAM_COUNT = "CreditStreamCount"
+  val ACTIVE_MAP_PARTITION_COUNT = "ActiveMapPartitionCount"
 
   // local device
-  val DeviceOSFreeCapacity = "DeviceOSFreeBytes"
-  val DeviceOSTotalCapacity = "DeviceOSTotalBytes"
-  val DeviceCelebornFreeCapacity = "DeviceCelebornFreeBytes"
-  val DeviceCelebornTotalCapacity = "DeviceCelebornTotalBytes"
+  val DEVICE_OS_FREE_CAPACITY = "DeviceOSFreeBytes"
+  val DEVICE_OS_TOTAL_CAPACITY = "DeviceOSTotalBytes"
+  val DEVICE_CELEBORN_FREE_CAPACITY = "DeviceCelebornFreeBytes"
+  val DEVICE_CELEBORN_TOTAL_CAPACITY = "DeviceCelebornTotalBytes"
 
   // Congestion control
-  val PotentialConsumeSpeed = "PotentialConsumeSpeed"
-  val UserProduceSpeed = "UserProduceSpeed"
-  val WorkerConsumeSpeed = "WorkerConsumeSpeed"
+  val POTENTIAL_CONSUME_SPEED = "PotentialConsumeSpeed"
+  val USER_PRODUCE_SPEED = "UserProduceSpeed"
+  val WORKER_CONSUME_SPEED = "WorkerConsumeSpeed"
 }
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/DeviceMonitor.scala
 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/DeviceMonitor.scala
index 47b85c8fe..aeea6c232 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/DeviceMonitor.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/DeviceMonitor.scala
@@ -28,6 +28,7 @@ import org.apache.commons.io.FileUtils
 import org.slf4j.LoggerFactory
 
 import org.apache.celeborn.common.CelebornConf
+import org.apache.celeborn.common.internal.Logging
 import org.apache.celeborn.common.meta.{DeviceInfo, DiskInfo, DiskStatus}
 import org.apache.celeborn.common.metrics.source.AbstractSource
 import org.apache.celeborn.common.util.{ThreadUtils, Utils}
@@ -52,8 +53,7 @@ class LocalDeviceMonitor(
     observer: DeviceObserver,
     deviceInfos: util.Map[String, DeviceInfo],
     diskInfos: util.Map[String, DiskInfo],
-    workerSource: AbstractSource) extends DeviceMonitor {
-  val logger = LoggerFactory.getLogger(classOf[LocalDeviceMonitor])
+    workerSource: AbstractSource) extends DeviceMonitor with Logging {
 
   // (deviceName -> ObservedDevice)
   var observedDevices: util.Map[DeviceInfo, ObservedDevice] = _
@@ -71,14 +71,14 @@ class LocalDeviceMonitor(
   def init(): Unit = {
     this.observedDevices = new util.HashMap[DeviceInfo, ObservedDevice]()
     deviceInfos.asScala.filter(!_._2.deviceStatAvailable).foreach { case 
(deviceName, _) =>
-      logger.warn(s"Device monitor may not work properly on $deviceName " +
+      logWarning(s"Device monitor may not work properly on $deviceName " +
         s"because device $deviceName not exists.")
     }
-    deviceInfos.asScala.foreach(entry => {
+    deviceInfos.asScala.foreach { entry =>
       val observedDevice = new ObservedDevice(entry._2, conf, workerSource)
       observedDevice.addObserver(observer)
       observedDevices.put(entry._2, observedDevice)
-    })
+    }
     diskInfos
       .asScala
       .values
@@ -87,22 +87,18 @@ class LocalDeviceMonitor(
       .foreach { case (deviceInfo: DeviceInfo, diskInfos: List[DiskInfo]) =>
         val deviceLabel = Map("device" -> deviceInfo.name)
         def usage = DeviceMonitor.getDiskUsageInfos(diskInfos.head)
-        workerSource.addGauge(
-          s"${WorkerSource.DeviceOSTotalCapacity}",
-          _ => usage(usage.length - 5).toLong,
-          deviceLabel)
-        workerSource.addGauge(
-          s"${WorkerSource.DeviceOSFreeCapacity}",
-          _ => usage(usage.length - 3).toLong,
-          deviceLabel)
-        workerSource.addGauge(
-          s"${WorkerSource.DeviceCelebornTotalCapacity}",
-          _ => diskInfos.map(_.configuredUsableSpace).sum,
-          deviceLabel)
-        workerSource.addGauge(
-          s"${WorkerSource.DeviceCelebornFreeCapacity}",
-          _ => diskInfos.map(_.actualUsableSpace).sum,
-          deviceLabel)
+        workerSource.addGauge(WorkerSource.DEVICE_OS_TOTAL_CAPACITY, 
deviceLabel) { () =>
+          usage(usage.length - 5).toLong
+        }
+        workerSource.addGauge(WorkerSource.DEVICE_OS_FREE_CAPACITY, 
deviceLabel) { () =>
+          usage(usage.length - 3).toLong
+        }
+        workerSource.addGauge(WorkerSource.DEVICE_CELEBORN_TOTAL_CAPACITY, 
deviceLabel) { () =>
+          diskInfos.map(_.configuredUsableSpace).sum
+        }
+        workerSource.addGauge(WorkerSource.DEVICE_CELEBORN_FREE_CAPACITY, 
deviceLabel) { () =>
+          diskInfos.map(_.actualUsableSpace).sum
+        }
       }
   }
 
@@ -110,7 +106,7 @@ class LocalDeviceMonitor(
     diskChecker.scheduleAtFixedRate(
       new Runnable {
         override def run(): Unit = {
-          logger.debug("Device check start")
+          logDebug("Device check start")
           try {
             observedDevices.values().asScala.foreach(device => {
               val mountPoints = device.diskInfos.keySet.asScala.toList
@@ -125,7 +121,7 @@ class LocalDeviceMonitor(
               }
               val nonCriticalErrorSum = 
device.nonCriticalErrors.values().asScala.map(_.size).sum
               if (nonCriticalErrorSum > device.notifyErrorThreshold) {
-                logger.error(s"Device ${device.deviceInfo.name} has 
accumulated $nonCriticalErrorSum non-critical " +
+                logError(s"Device ${device.deviceInfo.name} has accumulated 
$nonCriticalErrorSum non-critical " +
                   s"error within the past 
${Utils.msDurationToString(device.notifyErrorExpireTimeout)} , its sum has " +
                   s"exceed the threshold (${device.notifyErrorThreshold}), 
device monitor will notify error to " +
                   s"observed device.")
@@ -133,18 +129,17 @@ class LocalDeviceMonitor(
                 device.notifyObserversOnError(mountPoints, 
DiskStatus.CRITICAL_ERROR)
               } else {
                 if (checkIoHang && device.ioHang()) {
-                  logger.error(s"Encounter device io hang error!" +
+                  logError(s"Encounter device io hang error!" +
                     s"${device.deviceInfo.name}, notify observers")
                   device.notifyObserversOnNonCriticalError(mountPoints, 
DiskStatus.IO_HANG)
                 } else {
                   device.diskInfos.values().asScala.foreach { diskInfo =>
                     if (checkDiskUsage && DeviceMonitor.highDiskUsage(conf, 
diskInfo)) {
-                      logger.error(
-                        s"${diskInfo.mountPoint} high_disk_usage error, notify 
observers")
+                      logError(s"${diskInfo.mountPoint} high_disk_usage error, 
notify observers")
                       
device.notifyObserversOnHighDiskUsage(diskInfo.mountPoint)
                     } else if (checkReadWrite &&
                       DeviceMonitor.readWriteError(conf, diskInfo.dirs.head)) {
-                      logger.error(s"${diskInfo.mountPoint} read-write error, 
notify observers")
+                      logError(s"${diskInfo.mountPoint} read-write error, 
notify observers")
                       // We think that if one dir in device has read-write 
problem, if possible all
                       // dirs in this device have the problem
                       device.notifyObserversOnNonCriticalError(
@@ -159,7 +154,7 @@ class LocalDeviceMonitor(
             })
           } catch {
             case t: Throwable =>
-              logger.error("Device check failed.", t)
+              logError("Device check failed.", t)
           }
         }
       },
@@ -190,7 +185,7 @@ class LocalDeviceMonitor(
       mountPoint: String,
       e: IOException,
       diskStatus: DiskStatus): Unit = {
-    logger.error(s"Receive non-critical exception, disk: $mountPoint, $e")
+    logError(s"Receive non-critical exception, disk: $mountPoint, $e")
     observedDevices.get(diskInfos.get(mountPoint).deviceInfo)
       .notifyObserversOnNonCriticalError(List(mountPoint), diskStatus)
   }
@@ -202,8 +197,7 @@ class LocalDeviceMonitor(
   }
 }
 
-object DeviceMonitor {
-  val logger = LoggerFactory.getLogger(classOf[DeviceMonitor])
+object DeviceMonitor extends Logging {
   val deviceCheckThreadPool = 
ThreadUtils.newDaemonCachedThreadPool("device-check-thread", 5)
 
   def createDeviceMonitor(
@@ -217,14 +211,14 @@ object DeviceMonitor {
         val monitor =
           new LocalDeviceMonitor(conf, deviceObserver, deviceInfos, diskInfos, 
workerSource)
         monitor.init()
-        logger.info("Device monitor init success")
+        logInfo("Device monitor init success")
         monitor
       } else {
         EmptyDeviceMonitor
       }
     } catch {
       case t: Throwable =>
-        logger.error("Device monitor init failed.", t)
+        logError("Device monitor init failed.", t)
         throw t
     }
   }
@@ -251,7 +245,7 @@ object DeviceMonitor {
       val highDiskUsage =
         freeSpace.toLong < conf.workerDiskReserveSize || 
diskInfo.actualUsableSpace <= 0
       if (highDiskUsage) {
-        logger.warn(s"${diskInfo.mountPoint} usage is above threshold." +
+        logWarning(s"${diskInfo.mountPoint} usage is above threshold." +
           s" Disk usage(Report by 
OS):{total:${Utils.bytesToString(totalSpace.toLong)}," +
           s" free:${Utils.bytesToString(freeSpace.toLong)}, 
used_percent:$used_percent} " +
           s"usage(Report by Celeborn):{" +
@@ -305,7 +299,7 @@ object DeviceMonitor {
         }
       } catch {
         case t: Throwable =>
-          logger.error(s"Disk dir $dataDir cannot read or write", t)
+          logError(s"Disk dir $dataDir cannot read or write", t)
           true
       }
     })(false)(
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/Flusher.scala
 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/Flusher.scala
index 6cb872cde..e364869a3 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/Flusher.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/Flusher.scala
@@ -62,7 +62,7 @@ abstract private[worker] class Flusher(
           while (!stopFlag.get()) {
             val task = workingQueues(index).take()
             val key = s"Flusher-$this-${rand.nextInt()}"
-            workerSource.sample(WorkerSource.FlushDataTime, key) {
+            workerSource.sample(WorkerSource.FLUSH_DATA_TIME, key) {
               if (!task.notifier.hasException) {
                 try {
                   val flushBeginTime = System.nanoTime()
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/ObservedDevice.scala
 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/ObservedDevice.scala
index 510f06667..b1d47b491 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/ObservedDevice.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/ObservedDevice.scala
@@ -19,41 +19,41 @@ package org.apache.celeborn.service.deploy.worker.storage
 
 import java.io.File
 import java.util
-import java.util.{Set => jSet}
+import java.util.{Set => JSet}
 import java.util.concurrent.ConcurrentHashMap
 
 import scala.collection.JavaConverters._
 import scala.io.Source
 
-import org.slf4j.LoggerFactory
-
 import org.apache.celeborn.common.CelebornConf
+import org.apache.celeborn.common.internal.Logging
 import org.apache.celeborn.common.meta.{DeviceInfo, DiskInfo, DiskStatus}
 import org.apache.celeborn.common.metrics.source.AbstractSource
+// Can remove this once Celeborn no longer supports Scala 2.11
+import org.apache.celeborn.common.util.FunctionConverter._
 import org.apache.celeborn.common.util.JavaUtils
 
-class ObservedDevice(val deviceInfo: DeviceInfo, conf: CelebornConf, 
workerSource: AbstractSource) {
-
-  val logger = LoggerFactory.getLogger(classOf[ObservedDevice])
+class ObservedDevice(val deviceInfo: DeviceInfo, conf: CelebornConf, 
workerSource: AbstractSource)
+  extends Logging {
 
   val diskInfos = JavaUtils.newConcurrentHashMap[String, DiskInfo]()
-  deviceInfo.diskInfos.foreach { case diskInfo =>
-    diskInfos.put(diskInfo.mountPoint, diskInfo)
-  }
-  val observers: jSet[DeviceObserver] = 
ConcurrentHashMap.newKeySet[DeviceObserver]()
+  deviceInfo.diskInfos.foreach { diskInfo => 
diskInfos.put(diskInfo.mountPoint, diskInfo) }
 
-  val sysBlockDir = conf.workerDiskMonitorSysBlockDir
+  val observers: JSet[DeviceObserver] = 
ConcurrentHashMap.newKeySet[DeviceObserver]()
+
+  val sysBlockDir: String = conf.workerDiskMonitorSysBlockDir
   val statFile = new File(s"$sysBlockDir/${deviceInfo.name}/stat")
   val inFlightFile = new File(s"$sysBlockDir/${deviceInfo.name}/inflight")
 
-  val nonCriticalErrors = JavaUtils.newConcurrentHashMap[DiskStatus, 
util.Set[Long]]()
-  val notifyErrorThreshold = conf.workerDiskMonitorNotifyErrorThreshold
-  val notifyErrorExpireTimeout = conf.workerDiskMonitorNotifyErrorExpireTimeout
+  val nonCriticalErrors: ConcurrentHashMap[DiskStatus, JSet[Long]] =
+    JavaUtils.newConcurrentHashMap[DiskStatus, JSet[Long]]()
+  val notifyErrorThreshold: Int = conf.workerDiskMonitorNotifyErrorThreshold
+  val notifyErrorExpireTimeout: Long = 
conf.workerDiskMonitorNotifyErrorExpireTimeout
 
   var lastReadComplete: Long = -1
   var lastWriteComplete: Long = -1
-  var lastReadInflight: Long = -1
-  var lastWriteInflight: Long = -1
+  var lastReadInFlight: Long = -1
+  var lastWriteInFlight: Long = -1
 
   def addObserver(observer: DeviceObserver): Unit = {
     observers.add(observer)
@@ -65,14 +65,14 @@ class ObservedDevice(val deviceInfo: DeviceInfo, conf: 
CelebornConf, workerSourc
 
   def notifyObserversOnError(mountPoints: List[String], diskStatus: 
DiskStatus): Unit =
     this.synchronized {
-      mountPoints.foreach { case mountPoint =>
+      mountPoints.foreach { mountPoint =>
         diskInfos.get(mountPoint).setStatus(diskStatus)
       }
       // observer.notifyDeviceError might remove itself from observers,
       // so we need to use tmpObservers
       val tmpObservers = new util.HashSet[DeviceObserver](observers)
-      tmpObservers.asScala.foreach { ob =>
-        mountPoints.foreach { case mountPoint =>
+      tmpObservers.forEach { ob: DeviceObserver =>
+        mountPoints.foreach { mountPoint =>
           ob.notifyError(mountPoint, diskStatus)
         }
       }
@@ -80,23 +80,21 @@ class ObservedDevice(val deviceInfo: DeviceInfo, conf: 
CelebornConf, workerSourc
 
   def notifyObserversOnNonCriticalError(mountPoints: List[String], diskStatus: 
DiskStatus): Unit =
     this.synchronized {
-      val nonCriticalErrorSetFunc = new util.function.Function[DiskStatus, 
util.Set[Long]] {
-        override def apply(t: DiskStatus): util.Set[Long] = {
+      nonCriticalErrors.computeIfAbsent(
+        diskStatus,
+        (_: DiskStatus) => {
           val set = ConcurrentHashMap.newKeySet[Long]()
-          workerSource.addGauge(
-            s"Device_${deviceInfo.name}_${diskStatus.toMetric}_Count",
-            _ => set.size())
+          
workerSource.addGauge(s"Device_${deviceInfo.name}_${diskStatus.toMetric}_Count")
 { () =>
+            set.size()
+          }
           set
-        }
-      }
-      nonCriticalErrors.computeIfAbsent(diskStatus, 
nonCriticalErrorSetFunc).add(
-        System.currentTimeMillis())
-      mountPoints.foreach { case mountPoint =>
+        }).add(System.currentTimeMillis())
+      mountPoints.foreach { mountPoint =>
         diskInfos.get(mountPoint).setStatus(diskStatus)
       }
       val tmpObservers = new util.HashSet[DeviceObserver](observers)
-      tmpObservers.asScala.foreach { ob =>
-        mountPoints.foreach { case mountPoint =>
+      tmpObservers.forEach { ob: DeviceObserver =>
+        mountPoints.foreach { mountPoint =>
           ob.notifyNonCriticalError(mountPoint, diskStatus)
         }
       }
@@ -105,17 +103,17 @@ class ObservedDevice(val deviceInfo: DeviceInfo, conf: 
CelebornConf, workerSourc
   def notifyObserversOnHealthy(mountPoint: String): Unit = this.synchronized {
     diskInfos.get(mountPoint).setStatus(DiskStatus.HEALTHY)
     val tmpObservers = new util.HashSet[DeviceObserver](observers)
-    tmpObservers.asScala.foreach(ob => {
+    tmpObservers.forEach { ob: DeviceObserver =>
       ob.notifyHealthy(mountPoint)
-    })
+    }
   }
 
   def notifyObserversOnHighDiskUsage(mountPoint: String): Unit = 
this.synchronized {
     diskInfos.get(mountPoint).setStatus(DiskStatus.HIGH_DISK_USAGE)
     val tmpObservers = new util.HashSet[DeviceObserver](observers)
-    tmpObservers.asScala.foreach(ob => {
+    tmpObservers.forEach { ob: DeviceObserver =>
       ob.notifyHighDiskUsage(mountPoint)
-    })
+    }
   }
 
   /**
@@ -141,33 +139,33 @@ class ObservedDevice(val deviceInfo: DeviceInfo, conf: 
CelebornConf, workerSourc
         if (lastReadComplete == -1) {
           lastReadComplete = readComplete
           lastWriteComplete = writeComplete
-          lastReadInflight = readInflight
-          lastWriteInflight = writeInflight
+          lastReadInFlight = readInflight
+          lastWriteInFlight = writeInflight
           false
         } else {
           val isReadHang = lastReadComplete == readComplete &&
-            readInflight >= lastReadInflight && lastReadInflight > 0
+            readInflight >= lastReadInFlight && lastReadInFlight > 0
           val isWriteHang = lastWriteComplete == writeComplete &&
-            writeInflight >= lastWriteInflight && lastWriteInflight > 0
+            writeInflight >= lastWriteInFlight && lastWriteInFlight > 0
 
           if (isReadHang || isWriteHang) {
-            logger.info(s"Result of DeviceInfo.checkIoHang, DeviceName: 
${deviceInfo.name}" +
+            logInfo(s"Result of DeviceInfo.checkIoHang, DeviceName: 
${deviceInfo.name}" +
               s"($readComplete,$writeComplete,$readInflight,$writeInflight)\t" 
+
-              
s"($lastReadComplete,$lastWriteComplete,$lastReadInflight,$lastWriteInflight)\t"
 +
+              
s"($lastReadComplete,$lastWriteComplete,$lastReadInFlight,$lastWriteInFlight)\t"
 +
               s"Observer cnt: ${observers.size()}")
-            logger.error(s"IO Hang! ReadHang: $isReadHang, WriteHang: 
$isWriteHang")
+            logError(s"IO Hang! ReadHang: $isReadHang, WriteHang: 
$isWriteHang")
           }
 
           lastReadComplete = readComplete
           lastWriteComplete = writeComplete
-          lastReadInflight = readInflight
-          lastWriteInflight = writeInflight
+          lastReadInFlight = readInflight
+          lastWriteInFlight = writeInflight
 
           isReadHang || isWriteHang
         }
       } catch {
         case e: Exception =>
-          logger.warn(s"Encounter Exception when check IO hang for device 
${deviceInfo.name}", e)
+          logWarning(s"Encounter Exception when check IO hang for device 
${deviceInfo.name}", e)
           // we should only return true if we have direct evidence that the 
device is hang
           false
       } finally {
diff --git 
a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/congestcontrol/TestCongestionController.java
 
b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/congestcontrol/TestCongestionController.java
index fbb79fde7..f9d564079 100644
--- 
a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/congestcontrol/TestCongestionController.java
+++ 
b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/congestcontrol/TestCongestionController.java
@@ -116,14 +116,14 @@ public class TestCongestionController {
 
     Assert.assertTrue(
         isGaugeExist(
-            WorkerSource.UserProduceSpeed(),
+            WorkerSource.USER_PRODUCE_SPEED(),
             JavaConverters.mapAsJavaMapConverter(user.toMap()).asJava()));
 
     Thread.sleep(userInactiveTimeMills * 2);
 
     Assert.assertFalse(
         isGaugeExist(
-            WorkerSource.UserProduceSpeed(),
+            WorkerSource.USER_PRODUCE_SPEED(),
             JavaConverters.mapAsJavaMapConverter(user.toMap()).asJava()));
   }
 
diff --git 
a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/FileWriterSuiteJ.java
 
b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/FileWriterSuiteJ.java
index 6a4ec4774..64091a1ee 100644
--- 
a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/FileWriterSuiteJ.java
+++ 
b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/FileWriterSuiteJ.java
@@ -131,9 +131,9 @@ public class FileWriterSuiteJ {
     MemoryManager.initialize(conf);
   }
 
-  public static void setupChunkServer(FileInfo info) throws Exception {
+  public static void setupChunkServer(FileInfo info) {
     FetchHandler handler =
-        new FetchHandler(transConf) {
+        new FetchHandler(transConf.getCelebornConf(), transConf) {
           @Override
           public StorageManager storageManager() {
             return new StorageManager(CONF, source);
@@ -210,9 +210,9 @@ public class FileWriterSuiteJ {
     final Semaphore sem = new Semaphore(0);
 
     final FetchResult res = new FetchResult();
-    res.successChunks = Collections.synchronizedSet(new HashSet<Integer>());
-    res.failedChunks = Collections.synchronizedSet(new HashSet<Integer>());
-    res.buffers = Collections.synchronizedList(new 
LinkedList<ManagedBuffer>());
+    res.successChunks = Collections.synchronizedSet(new HashSet<>());
+    res.failedChunks = Collections.synchronizedSet(new HashSet<>());
+    res.buffers = Collections.synchronizedList(new LinkedList<>());
 
     ChunkReceivedCallback callback =
         new ChunkReceivedCallback() {
diff --git 
a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/DeviceMonitorSuite.scala
 
b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/DeviceMonitorSuite.scala
index 96e906f52..ab031f87d 100644
--- 
a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/DeviceMonitorSuite.scala
+++ 
b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/DeviceMonitorSuite.scala
@@ -392,13 +392,13 @@ class DeviceMonitorSuite extends AnyFunSuite {
       deviceMonitor2.init()
 
       val metrics1 = workerSource2.gauges().filter(
-        
_.name.startsWith(WorkerSource.DeviceOSTotalCapacity)).sortBy(_.labels("device"))
+        
_.name.startsWith(WorkerSource.DEVICE_OS_TOTAL_CAPACITY)).sortBy(_.labels("device"))
       val metrics2 = workerSource2.gauges().filter(
-        
_.name.startsWith(WorkerSource.DeviceOSFreeCapacity)).sortBy(_.labels("device"))
+        
_.name.startsWith(WorkerSource.DEVICE_OS_FREE_CAPACITY)).sortBy(_.labels("device"))
       val metrics3 = workerSource2.gauges().filter(
-        
_.name.startsWith(WorkerSource.DeviceCelebornTotalCapacity)).sortBy(_.labels("device"))
+        
_.name.startsWith(WorkerSource.DEVICE_CELEBORN_TOTAL_CAPACITY)).sortBy(_.labels("device"))
       val metrics4 = workerSource2.gauges().filter(
-        
_.name.startsWith(WorkerSource.DeviceCelebornFreeCapacity)).sortBy(_.labels("device"))
+        
_.name.startsWith(WorkerSource.DEVICE_CELEBORN_FREE_CAPACITY)).sortBy(_.labels("device"))
 
       assertEquals("vda", metrics1.head.labels("device"))
       assertEquals(1395864371200L, metrics1.head.gauge.getValue)


Reply via email to