This is an automated email from the ASF dual-hosted git repository.
zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new a1be02b4f [CELEBORN-757] Improve metrics method signature and code
style
a1be02b4f is described below
commit a1be02b4fa696ca62f8f258161aad0859aad8fd2
Author: Cheng Pan <[email protected]>
AuthorDate: Mon Jul 3 11:56:43 2023 +0800
[CELEBORN-757] Improve metrics method signature and code style
### What changes were proposed in this pull request?
- Improve the gauge method definition, i.e.
before
```
def addGauge[T](name: String, f: Unit => T, labels: Map[String, String])
```
after
```
def addGauge[T](name: String, labels: Map[String, String])(f: () => T)
```
which improves the caller-side code style
```
addGauge(name, labels) { () =>
...
}
```
- remove unnecessary Java/Scala collection conversions. Since Scala 2.11
does not support SAM conversion, an extra implicit function is required.
- leverage Logging to defer message evaluation
- UPPER_CASE string constants
### Why are the changes needed?
Improve code quality and performance (maybe)
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Pass GA.
Closes #1670 from pan3793/CELEBORN-757.
Authored-by: Cheng Pan <[email protected]>
Signed-off-by: zky.zhoukeyong <[email protected]>
---
.../celeborn/common/meta/FileManagedBuffers.java | 3 +-
.../common/network/client/TransportClient.java | 2 +-
.../common/network/util/NettyMemoryMetrics.java | 16 +--
.../org/apache/celeborn/common/util/JavaUtils.java | 8 +-
.../celeborn/common/identity/UserIdentifier.scala | 6 +
.../apache/celeborn/common/meta/WorkerInfo.scala | 2 +-
.../celeborn/common/metrics/MetricLabels.scala | 8 +-
.../common/metrics/sink/PrometheusServlet.scala | 14 +-
.../common/metrics/source/AbstractSource.scala | 61 ++++-----
.../common/metrics/source/JVMCPUSource.scala | 4 +-
.../celeborn/common/metrics/source/Source.scala | 2 +-
.../celeborn/common/util/FunctionConverter.scala | 9 +-
.../metrics/source/CelebornSourceSuite.scala | 24 ++--
.../celeborn/service/deploy/master/Master.scala | 48 ++++---
.../service/deploy/master/MasterSource.scala | 18 +--
.../congestcontrol/CongestionController.java | 15 +-
.../service/deploy/worker/storage/FileWriter.java | 2 +-
.../worker/storage/PartitionFilesSorter.java | 4 +-
.../service/deploy/worker/Controller.scala | 6 +-
.../service/deploy/worker/FetchHandler.scala | 55 ++++----
.../service/deploy/worker/PushDataHandler.scala | 54 ++++----
.../celeborn/service/deploy/worker/Worker.scala | 151 ++++++++++++---------
.../service/deploy/worker/WorkerSource.scala | 148 ++++++++++----------
.../deploy/worker/storage/DeviceMonitor.scala | 64 ++++-----
.../service/deploy/worker/storage/Flusher.scala | 2 +-
.../deploy/worker/storage/ObservedDevice.scala | 88 ++++++------
.../congestcontrol/TestCongestionController.java | 4 +-
.../deploy/worker/storage/FileWriterSuiteJ.java | 10 +-
.../deploy/worker/storage/DeviceMonitorSuite.scala | 8 +-
29 files changed, 421 insertions(+), 415 deletions(-)
diff --git
a/common/src/main/java/org/apache/celeborn/common/meta/FileManagedBuffers.java
b/common/src/main/java/org/apache/celeborn/common/meta/FileManagedBuffers.java
index 72cb3ee59..41fd89049 100644
---
a/common/src/main/java/org/apache/celeborn/common/meta/FileManagedBuffers.java
+++
b/common/src/main/java/org/apache/celeborn/common/meta/FileManagedBuffers.java
@@ -41,8 +41,7 @@ public class FileManagedBuffers {
offsets[i] = chunkOffsets.get(i);
}
} else {
- offsets = new long[1];
- offsets[0] = 0;
+ offsets = new long[] {0};
}
this.conf = conf;
}
diff --git
a/common/src/main/java/org/apache/celeborn/common/network/client/TransportClient.java
b/common/src/main/java/org/apache/celeborn/common/network/client/TransportClient.java
index 699f1ac78..00c25b56b 100644
---
a/common/src/main/java/org/apache/celeborn/common/network/client/TransportClient.java
+++
b/common/src/main/java/org/apache/celeborn/common/network/client/TransportClient.java
@@ -241,7 +241,7 @@ public class TransportClient implements Closeable {
try {
return result.get(timeoutMs, TimeUnit.MILLISECONDS);
} catch (Exception e) {
- throw new IOException("Exception in sendRpcSync to:" +
this.getSocketAddress(), e);
+ throw new IOException("Exception in sendRpcSync to: " +
this.getSocketAddress(), e);
}
}
diff --git
a/common/src/main/java/org/apache/celeborn/common/network/util/NettyMemoryMetrics.java
b/common/src/main/java/org/apache/celeborn/common/network/util/NettyMemoryMetrics.java
index 2d457f01f..35f4f14a4 100644
---
a/common/src/main/java/org/apache/celeborn/common/network/util/NettyMemoryMetrics.java
+++
b/common/src/main/java/org/apache/celeborn/common/network/util/NettyMemoryMetrics.java
@@ -91,12 +91,12 @@ public class NettyMemoryMetrics {
logger.debug("setup netty metrics");
source.addGauge(
MetricRegistry.name(metricPrefix, "usedHeapMemory"),
- pooledAllocatorMetric::usedHeapMemory,
- labels);
+ labels,
+ pooledAllocatorMetric::usedHeapMemory);
source.addGauge(
MetricRegistry.name(metricPrefix, "usedDirectMemory"),
- pooledAllocatorMetric::usedDirectMemory,
- labels);
+ labels,
+ pooledAllocatorMetric::usedDirectMemory);
if (verboseMetricsEnabled) {
int directArenaIndex = 0;
for (PoolArenaMetric metric : pooledAllocatorMetric.directArenas()) {
@@ -133,26 +133,26 @@ public class NettyMemoryMetrics {
if (returnType.equals(int.class)) {
source.addGauge(
metricName,
+ labels,
() -> {
try {
return (Integer) m.invoke(arenaMetric);
} catch (Exception e) {
return -1; // Swallow the exceptions.
}
- },
- labels);
+ });
} else if (returnType.equals(long.class)) {
source.addGauge(
metricName,
+ labels,
() -> {
try {
return (Long) m.invoke(arenaMetric);
} catch (Exception e) {
return -1L; // Swallow the exceptions.
}
- },
- labels);
+ });
}
}
}
diff --git
a/common/src/main/java/org/apache/celeborn/common/util/JavaUtils.java
b/common/src/main/java/org/apache/celeborn/common/util/JavaUtils.java
index 727003e73..493ea4369 100644
--- a/common/src/main/java/org/apache/celeborn/common/util/JavaUtils.java
+++ b/common/src/main/java/org/apache/celeborn/common/util/JavaUtils.java
@@ -427,18 +427,18 @@ public class JavaUtils {
public static <K, V> ConcurrentHashMap<K, V> newConcurrentHashMap() {
if (SystemUtils.isJavaVersionAtLeast(JavaVersion.JAVA_9)) {
- return new ConcurrentHashMap();
+ return new ConcurrentHashMap<>();
} else {
- return new ConcurrentHashMapForJDK8();
+ return new ConcurrentHashMapForJDK8<>();
}
}
public static <K, V> ConcurrentHashMap<K, V> newConcurrentHashMap(
Map<? extends K, ? extends V> m) {
if (SystemUtils.isJavaVersionAtLeast(JavaVersion.JAVA_9)) {
- return new ConcurrentHashMap(m);
+ return new ConcurrentHashMap<>(m);
} else {
- return new ConcurrentHashMapForJDK8(m);
+ return new ConcurrentHashMapForJDK8<>(m);
}
}
diff --git
a/common/src/main/scala/org/apache/celeborn/common/identity/UserIdentifier.scala
b/common/src/main/scala/org/apache/celeborn/common/identity/UserIdentifier.scala
index 0353ec7cd..cac8b258a 100644
---
a/common/src/main/scala/org/apache/celeborn/common/identity/UserIdentifier.scala
+++
b/common/src/main/scala/org/apache/celeborn/common/identity/UserIdentifier.scala
@@ -17,6 +17,10 @@
package org.apache.celeborn.common.identity
+import java.util.{Map => JMap}
+
+import scala.collection.JavaConverters._
+
import org.apache.celeborn.common.exception.CelebornException
import org.apache.celeborn.common.internal.Logging
@@ -30,6 +34,8 @@ case class UserIdentifier(tenantId: String, name: String) {
Map("tenantId" -> tenantId, "name" -> name)
}
+ def toJMap: JMap[String, String] = toMap.asJava
+
override def toString: String = {
s"`$tenantId`.`$name`"
}
diff --git
a/common/src/main/scala/org/apache/celeborn/common/meta/WorkerInfo.scala
b/common/src/main/scala/org/apache/celeborn/common/meta/WorkerInfo.scala
index e4753884e..92f6c91ed 100644
--- a/common/src/main/scala/org/apache/celeborn/common/meta/WorkerInfo.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/meta/WorkerInfo.scala
@@ -120,7 +120,7 @@ class WorkerInfo(
unknownDiskSlots.remove(shuffleKey)
}
- def getShuffleKeySet(): util.HashSet[String] = this.synchronized {
+ def getShuffleKeySet: util.HashSet[String] = this.synchronized {
val shuffleKeySet = new util.HashSet[String]()
diskInfos.values().asScala.foreach { diskInfo =>
shuffleKeySet.addAll(diskInfo.getShuffleKeySet())
diff --git
a/common/src/main/scala/org/apache/celeborn/common/metrics/MetricLabels.scala
b/common/src/main/scala/org/apache/celeborn/common/metrics/MetricLabels.scala
index a591d1671..0883c5e9e 100644
---
a/common/src/main/scala/org/apache/celeborn/common/metrics/MetricLabels.scala
+++
b/common/src/main/scala/org/apache/celeborn/common/metrics/MetricLabels.scala
@@ -25,12 +25,6 @@ private[metrics] trait MetricLabels {
object MetricLabels {
def labelString(labels: Map[String, String]): String = {
- "{" +
- labels
- .map { case (key: String, value: String) => s"""$key="$value"""" }
- .toList
- .sorted
- .mkString(", ") +
- "}"
+ labels.map { case (k, v) => s"""$k="$v"""" }.toArray.sorted.mkString("{",
",", "}")
}
}
diff --git
a/common/src/main/scala/org/apache/celeborn/common/metrics/sink/PrometheusServlet.scala
b/common/src/main/scala/org/apache/celeborn/common/metrics/sink/PrometheusServlet.scala
index c6356ba4e..6913457af 100644
---
a/common/src/main/scala/org/apache/celeborn/common/metrics/sink/PrometheusServlet.scala
+++
b/common/src/main/scala/org/apache/celeborn/common/metrics/sink/PrometheusServlet.scala
@@ -38,10 +38,8 @@ class PrometheusServlet(
new PrometheusHttpRequestHandler(servletPath, this)
}
- def getMetricsSnapshot(): String = {
- val sb = new StringBuilder()
- sources.foreach(source => sb.append(source.getMetrics()))
- sb.toString()
+ def getMetricsSnapshot: String = {
+ sources.map(_.getMetrics).mkString
}
override def start(): Unit = {}
@@ -57,10 +55,10 @@ class PrometheusHttpRequestHandler(
prometheusServlet: PrometheusServlet) extends Logging {
def handleRequest(uri: String): String = {
- uri match {
- case `path` =>
- prometheusServlet.getMetricsSnapshot()
- case _ => s"Unknown uri ${uri}!"
+ if (uri == path) {
+ prometheusServlet.getMetricsSnapshot
+ } else {
+ s"Unknown path $uri!"
}
}
}
diff --git
a/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala
b/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala
index d28986bb0..13a7b77cb 100644
---
a/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala
+++
b/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala
@@ -17,6 +17,7 @@
package org.apache.celeborn.common.metrics.source
+import java.util.{ArrayList => JArrayList, List => JList, Map => JMap}
import java.util.concurrent.{ConcurrentHashMap, ConcurrentLinkedQueue,
ScheduledExecutorService, TimeUnit}
import scala.collection.JavaConverters._
@@ -29,6 +30,8 @@ import org.apache.celeborn.common.CelebornConf
import org.apache.celeborn.common.internal.Logging
import org.apache.celeborn.common.metrics.{MetricLabels,
ResettableSlidingWindowReservoir, RssHistogram, RssTimer}
import org.apache.celeborn.common.util.{JavaUtils, ThreadUtils, Utils}
+// Can Remove this if celeborn don't support scala211 in future
+import org.apache.celeborn.common.util.FunctionConverter._
case class NamedCounter(name: String, counter: Counter, labels: Map[String,
String])
extends MetricLabels
@@ -51,55 +54,50 @@ abstract class AbstractSource(conf: CelebornConf, role:
String)
val metricsCollectCriticalEnabled: Boolean =
conf.metricsCollectCriticalEnabled
- final val metricsCapacity = conf.metricsCapacity
+ val metricsCapacity: Int = conf.metricsCapacity
val innerMetrics: ConcurrentLinkedQueue[String] = new
ConcurrentLinkedQueue[String]()
val timerSupplier = new TimerSupplier(metricsSlidingWindowSize)
val metricsCleaner: ScheduledExecutorService =
-
ThreadUtils.newDaemonSingleThreadScheduledExecutor(s"worker-metrics-cleaner")
+
ThreadUtils.newDaemonSingleThreadScheduledExecutor("worker-metrics-cleaner")
- val roleLabel = "role" -> role
- val staticLabels = conf.metricsExtraLabels + roleLabel
- val staticLabelsString = MetricLabels.labelString(staticLabels)
+ val roleLabel: (String, String) = "role" -> role
+ val staticLabels: Map[String, String] = conf.metricsExtraLabels + roleLabel
+ val staticLabelsString: String = MetricLabels.labelString(staticLabels)
- protected val namedGauges: java.util.List[NamedGauge[_]] =
- new java.util.ArrayList[NamedGauge[_]]()
+ protected val namedGauges: JList[NamedGauge[_]] = new
JArrayList[NamedGauge[_]]()
def addGauge[T](
name: String,
- gauge: Gauge[T],
- labels: Map[String, String]): Unit = {
+ labels: Map[String, String],
+ gauge: Gauge[T]): Unit = {
namedGauges.add(NamedGauge(name, gauge, labels ++ staticLabels))
}
def addGauge[T](
name: String,
- gauge: Gauge[T],
- labels: java.util.Map[String, String]): Unit = {
- addGauge(name, gauge, labels.asScala.toMap)
+ labels: JMap[String, String],
+ gauge: Gauge[T]): Unit = {
+ addGauge(name, labels.asScala.toMap, gauge)
}
- def addGauge[T](
- name: String,
- f: Unit => T,
- labels: Map[String, String]): Unit = {
+ def addGauge[T](name: String, labels: Map[String, String] = Map.empty)(f: ()
=> T): Unit = {
val metricNameWithLabel = metricNameWithCustomizedLabels(name, labels)
if (!metricRegistry.getGauges.containsKey(metricNameWithLabel)) {
val supplier: MetricRegistry.MetricSupplier[Gauge[_]] = new
GaugeSupplier[T](f)
val gauge = metricRegistry.gauge(metricNameWithLabel, supplier)
- addGauge(name, gauge, labels)
+ addGauge(name, labels, gauge)
}
}
- def addGauge[T](name: String, f: Unit => T): Unit = addGauge(name, f,
Map.empty[String, String])
-
def addGauge[T](name: String, gauge: Gauge[T]): Unit = {
- addGauge(name, gauge, Map.empty[String, String])
+ addGauge(name, Map.empty[String, String], gauge)
}
- protected val namedTimers =
+ protected val namedTimers
+ : ConcurrentHashMap[String, (NamedTimer, ConcurrentHashMap[String,
Long])] =
JavaUtils.newConcurrentHashMap[String, (NamedTimer,
ConcurrentHashMap[String, Long])]()
def addTimer(name: String): Unit = addTimer(name, Map.empty[String, String])
@@ -107,13 +105,14 @@ abstract class AbstractSource(conf: CelebornConf, role:
String)
def addTimer(name: String, labels: Map[String, String]): Unit = {
val metricNameWithLabel = metricNameWithCustomizedLabels(name, labels)
if (!metricRegistry.getTimers.containsKey(metricNameWithLabel)) {
- val timer =
- metricRegistry.timer(metricNameWithLabel, timerSupplier)
- namedTimers.putIfAbsent(
+ val timer = metricRegistry.timer(metricNameWithLabel, timerSupplier)
+ namedTimers.computeIfAbsent(
metricNameWithLabel,
- (
- NamedTimer(name, timer, labels ++ staticLabels),
- JavaUtils.newConcurrentHashMap[String, Long]()))
+ (_: String) => {
+ val namedTimer = NamedTimer(name, timer, labels ++ staticLabels)
+ val values = JavaUtils.newConcurrentHashMap[String, Long]()
+ (namedTimer, values)
+ })
}
}
@@ -418,10 +417,6 @@ class TimerSupplier(val slidingWindowSize: Int)
}
}
-class GaugeSupplier[T](f: Unit => T) extends
MetricRegistry.MetricSupplier[Gauge[_]] {
- override def newMetric(): Gauge[T] = {
- new Gauge[T] {
- override def getValue: T = f(())
- }
- }
+class GaugeSupplier[T](f: () => T) extends
MetricRegistry.MetricSupplier[Gauge[_]] {
+ override def newMetric(): Gauge[T] = new Gauge[T] { override def getValue: T
= f() }
}
diff --git
a/common/src/main/scala/org/apache/celeborn/common/metrics/source/JVMCPUSource.scala
b/common/src/main/scala/org/apache/celeborn/common/metrics/source/JVMCPUSource.scala
index 3b38ec2d3..1b251f0a1 100644
---
a/common/src/main/scala/org/apache/celeborn/common/metrics/source/JVMCPUSource.scala
+++
b/common/src/main/scala/org/apache/celeborn/common/metrics/source/JVMCPUSource.scala
@@ -32,7 +32,7 @@ class JVMCPUSource(conf: CelebornConf, role: String) extends
AbstractSource(conf
import JVMCPUSource._
addGauge(
- JVMCPUTime,
+ JVM_CPU_TIME,
new Gauge[Long] {
val mBean: MBeanServer = ManagementFactory.getPlatformMBeanServer
val name = new ObjectName("java.lang", "type", "OperatingSystem")
@@ -51,5 +51,5 @@ class JVMCPUSource(conf: CelebornConf, role: String) extends
AbstractSource(conf
}
object JVMCPUSource {
- val JVMCPUTime = "JVMCPUTime"
+ val JVM_CPU_TIME = "JVMCPUTime"
}
diff --git
a/common/src/main/scala/org/apache/celeborn/common/metrics/source/Source.scala
b/common/src/main/scala/org/apache/celeborn/common/metrics/source/Source.scala
index 2e02efada..eff7cf744 100644
---
a/common/src/main/scala/org/apache/celeborn/common/metrics/source/Source.scala
+++
b/common/src/main/scala/org/apache/celeborn/common/metrics/source/Source.scala
@@ -26,6 +26,6 @@ trait Source {
def startTimer(metricsName: String, key: String): Unit
def stopTimer(metricsName: String, key: String): Unit
def incCounter(metricsName: String, incV: Long): Unit
- def getMetrics(): String
+ def getMetrics: String
def destroy(): Unit
}
diff --git
a/common/src/main/scala/org/apache/celeborn/common/util/FunctionConverter.scala
b/common/src/main/scala/org/apache/celeborn/common/util/FunctionConverter.scala
index 8eef121f4..6c091cb49 100644
---
a/common/src/main/scala/org/apache/celeborn/common/util/FunctionConverter.scala
+++
b/common/src/main/scala/org/apache/celeborn/common/util/FunctionConverter.scala
@@ -17,6 +17,8 @@
package org.apache.celeborn.common.util
+import java.util.function.Consumer
+
import scala.language.implicitConversions
/**
@@ -24,11 +26,16 @@ import scala.language.implicitConversions
*/
object FunctionConverter {
- implicit def scalaFunctionToJava[From, To](function: (From) => To)
+ implicit def scalaFunctionToJava[From, To](function: From => To)
: java.util.function.Function[From, To] = {
new java.util.function.Function[From, To] {
override def apply(input: From): To = function(input)
}
}
+ implicit def scalaConsumerToJava[T](consumer: T => AnyVal):
java.util.function.Consumer[T] = {
+ new Consumer[T] {
+ override def accept(t: T): Unit = consumer(t)
+ }
+ }
}
diff --git
a/common/src/test/scala/org/apache/celeborn/common/metrics/source/CelebornSourceSuite.scala
b/common/src/test/scala/org/apache/celeborn/common/metrics/source/CelebornSourceSuite.scala
index 96735c45d..89330bee9 100644
---
a/common/src/test/scala/org/apache/celeborn/common/metrics/source/CelebornSourceSuite.scala
+++
b/common/src/test/scala/org/apache/celeborn/common/metrics/source/CelebornSourceSuite.scala
@@ -34,8 +34,8 @@ class CelebornSourceSuite extends CelebornFunSuite {
val user1 = Map("user" -> "user1")
val user2 = Map("user" -> "user2")
val user3 = Map("user" -> "user3")
- mockSource.addGauge("Gauge1", _ => 1000)
- mockSource.addGauge("Gauge2", _ => 2000, user1)
+ mockSource.addGauge("Gauge1") { () => 1000 }
+ mockSource.addGauge("Gauge2", user1) { () => 2000 }
mockSource.addCounter("Counter1")
mockSource.addCounter("Counter2", user2)
// test operation with and without label
@@ -53,16 +53,14 @@ class CelebornSourceSuite extends CelebornFunSuite {
val res = mockSource.getMetrics()
var extraLabelsStr = extraLabels
if (extraLabels.nonEmpty) {
- extraLabelsStr = extraLabels + ", "
+ extraLabelsStr = extraLabels + ","
}
val exp1 = s"""metrics_Gauge1_Value{${extraLabelsStr}role="mock"} 1000"""
- val exp2 =
- s"""metrics_Gauge2_Value{${extraLabelsStr}role="mock", user="user1"}
2000"""
+ val exp2 =
s"""metrics_Gauge2_Value{${extraLabelsStr}role="mock",user="user1"} 2000"""
val exp3 = s"""metrics_Counter1_Count{${extraLabelsStr}role="mock"} 3000"""
- val exp4 =
- s"""metrics_Counter2_Count{${extraLabelsStr}role="mock", user="user2"}
4000"""
+ val exp4 =
s"""metrics_Counter2_Count{${extraLabelsStr}role="mock",user="user2"} 4000"""
val exp5 = s"""metrics_Timer1_Count{${extraLabelsStr}role="mock"} 1"""
- val exp6 = s"""metrics_Timer2_Count{${extraLabelsStr}role="mock",
user="user3"} 1"""
+ val exp6 =
s"""metrics_Timer2_Count{${extraLabelsStr}role="mock",user="user3"} 1"""
assert(res.contains(exp1))
assert(res.contains(exp2))
@@ -76,19 +74,19 @@ class CelebornSourceSuite extends CelebornFunSuite {
val conf = new CelebornConf()
// label's is normal
conf.set(CelebornConf.METRICS_EXTRA_LABELS.key, "l1=v1,l2=v2,l3=v3")
- val extraLabels = """l1="v1", l2="v2", l3="v3""""
+ val extraLabels = """l1="v1",l2="v2",l3="v3""""
createAbstractSourceAndCheck(conf, extraLabels)
// labels' kv not correct
- assertThrows[IllegalArgumentException]({
+ assertThrows[IllegalArgumentException] {
conf.set(CelebornConf.METRICS_EXTRA_LABELS.key, "l1=v1,l2=")
- val extraLabels2 = """l1="v1", l2="v2", l3="v3""""
+ val extraLabels2 = """l1="v1",l2="v2",l3="v3""""
createAbstractSourceAndCheck(conf, extraLabels2)
- })
+ }
// there are spaces in labels
conf.set(CelebornConf.METRICS_EXTRA_LABELS.key, " l1 = v1, l2 =v2 ,l3
=v3 ")
- val extraLabels3 = """l1="v1", l2="v2", l3="v3""""
+ val extraLabels3 = """l1="v1",l2="v2",l3="v3""""
createAbstractSourceAndCheck(conf, extraLabels3)
}
diff --git
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
index 1c1c1908d..41c3a9657 100644
---
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
+++
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
@@ -141,14 +141,16 @@ private[celeborn] class Master(
// init and register master metrics
val resourceConsumptionSource = new ResourceConsumptionSource(conf)
private val masterSource = new MasterSource(conf)
- masterSource.addGauge(
- MasterSource.RegisteredShuffleCount,
- _ => statusSystem.registeredShuffle.size())
- masterSource.addGauge(MasterSource.ExcludedWorkerCount, _ =>
statusSystem.excludedWorkers.size())
- masterSource.addGauge(MasterSource.WorkerCount, _ =>
statusSystem.workers.size())
- masterSource.addGauge(MasterSource.LostWorkerCount, _ =>
statusSystem.lostWorkers.size())
- masterSource.addGauge(MasterSource.PartitionSize, _ =>
statusSystem.estimatedPartitionSize)
- masterSource.addGauge(MasterSource.IsActiveMaster, _ => isMasterActive)
+ masterSource.addGauge(MasterSource.REGISTERED_SHUFFLE_COUNT) { () =>
+ statusSystem.registeredShuffle.size
+ }
+ masterSource.addGauge(MasterSource.EXCLUDED_WORKER_COUNT) { () =>
+ statusSystem.excludedWorkers.size
+ }
+ masterSource.addGauge(MasterSource.WORKER_COUNT) { () =>
statusSystem.workers.size }
+ masterSource.addGauge(MasterSource.LOST_WORKER_COUNT) { () =>
statusSystem.lostWorkers.size }
+ masterSource.addGauge(MasterSource.PARTITION_SIZE) { () =>
statusSystem.estimatedPartitionSize }
+ masterSource.addGauge(MasterSource.IS_ACTIVE_MASTER) { () => isMasterActive }
metricsSystem.registerSource(resourceConsumptionSource)
metricsSystem.registerSource(masterSource)
@@ -540,7 +542,7 @@ private[celeborn] class Master(
val availableWorkers = workersAvailable()
// offer slots
val slots =
- masterSource.sample(MasterSource.OfferSlotsTime,
s"offerSlots-${Random.nextInt()}") {
+ masterSource.sample(MasterSource.OFFER_SLOTS_TIME,
s"offerSlots-${Random.nextInt()}") {
statusSystem.workers.synchronized {
if (slotsAssignPolicy == SlotsAssignPolicy.LOADAWARE &&
!conf.hasHDFSStorage) {
SlotsAllocator.offerSlotsLoadAware(
@@ -710,22 +712,18 @@ private[celeborn] class Master(
userIdentifier: UserIdentifier,
context: RpcCallContext): Unit = {
- resourceConsumptionSource.addGauge(
- "diskFileCount",
- _ => computeUserResourceConsumption(userIdentifier).diskFileCount,
- userIdentifier.toMap)
- resourceConsumptionSource.addGauge(
- "diskBytesWritten",
- _ => computeUserResourceConsumption(userIdentifier).diskBytesWritten,
- userIdentifier.toMap)
- resourceConsumptionSource.addGauge(
- "hdfsFileCount",
- _ => computeUserResourceConsumption(userIdentifier).hdfsFileCount,
- userIdentifier.toMap)
- resourceConsumptionSource.addGauge(
- "hdfsBytesWritten",
- _ => computeUserResourceConsumption(userIdentifier).hdfsBytesWritten,
- userIdentifier.toMap)
+ resourceConsumptionSource.addGauge("diskFileCount", userIdentifier.toMap)
{ () =>
+ computeUserResourceConsumption(userIdentifier).diskFileCount
+ }
+ resourceConsumptionSource.addGauge("diskBytesWritten",
userIdentifier.toMap) { () =>
+ computeUserResourceConsumption(userIdentifier).diskBytesWritten
+ }
+ resourceConsumptionSource.addGauge("hdfsFileCount", userIdentifier.toMap)
{ () =>
+ computeUserResourceConsumption(userIdentifier).hdfsFileCount
+ }
+ resourceConsumptionSource.addGauge("hdfsBytesWritten",
userIdentifier.toMap) { () =>
+ computeUserResourceConsumption(userIdentifier).hdfsBytesWritten
+ }
val userResourceConsumption =
computeUserResourceConsumption(userIdentifier)
val quota = quotaManager.getQuota(userIdentifier)
diff --git
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/MasterSource.scala
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/MasterSource.scala
index 5c3862cce..cd3f1f0ea 100644
---
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/MasterSource.scala
+++
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/MasterSource.scala
@@ -21,29 +21,29 @@ import org.apache.celeborn.common.CelebornConf
import org.apache.celeborn.common.internal.Logging
import org.apache.celeborn.common.metrics.MetricsSystem
import org.apache.celeborn.common.metrics.source.AbstractSource
-import org.apache.celeborn.service.deploy.master.MasterSource.OfferSlotsTime
+import org.apache.celeborn.service.deploy.master.MasterSource.OFFER_SLOTS_TIME
class MasterSource(conf: CelebornConf)
extends AbstractSource(conf, MetricsSystem.ROLE_MASTER) with Logging {
override val sourceName = s"master"
- addTimer(OfferSlotsTime)
+ addTimer(OFFER_SLOTS_TIME)
// start cleaner
startCleaner()
}
object MasterSource {
- val WorkerCount = "WorkerCount"
+ val WORKER_COUNT = "WorkerCount"
- val LostWorkerCount = "LostWorkers"
+ val LOST_WORKER_COUNT = "LostWorkers"
- val ExcludedWorkerCount = "ExcludedWorkerCount"
+ val EXCLUDED_WORKER_COUNT = "ExcludedWorkerCount"
- val RegisteredShuffleCount = "RegisteredShuffleCount"
+ val REGISTERED_SHUFFLE_COUNT = "RegisteredShuffleCount"
- val IsActiveMaster = "IsActiveMaster"
+ val IS_ACTIVE_MASTER = "IsActiveMaster"
- val PartitionSize = "PartitionSize"
+ val PARTITION_SIZE = "PartitionSize"
- val OfferSlotsTime = "OfferSlotsTime"
+ val OFFER_SLOTS_TIME = "OfferSlotsTime"
}
diff --git
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/congestcontrol/CongestionController.java
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/congestcontrol/CongestionController.java
index 4249d896d..6b6ecbb3c 100644
---
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/congestcontrol/CongestionController.java
+++
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/congestcontrol/CongestionController.java
@@ -75,10 +75,10 @@ public class CongestionController {
this::removeInactiveUsers, 0, userInactiveTimeMills,
TimeUnit.MILLISECONDS);
this.workerSource.addGauge(
- WorkerSource.PotentialConsumeSpeed(), this::getPotentialConsumeSpeed);
+ WorkerSource.POTENTIAL_CONSUME_SPEED(),
this::getPotentialConsumeSpeed);
this.workerSource.addGauge(
- WorkerSource.WorkerConsumeSpeed(),
consumedBufferStatusHub::avgBytesPerSec);
+ WorkerSource.WORKER_CONSUME_SPEED(),
consumedBufferStatusHub::avgBytesPerSec);
}
public static synchronized CongestionController initialize(
@@ -162,8 +162,7 @@ public class CongestionController {
long avgConsumeSpeed = getPotentialConsumeSpeed();
if (logger.isDebugEnabled()) {
logger.debug(
- "The user {}, produceSpeed is {},"
- + " while consumeSpeed is {}, need to congest it: {}",
+ "The user {}, produceSpeed is {}, while consumeSpeed is {}, need
to congest it: {}",
userIdentifier,
userProduceSpeed,
avgConsumeSpeed,
@@ -184,9 +183,9 @@ public class CongestionController {
BufferStatusHub bufferStatusHub = new
BufferStatusHub(sampleTimeWindowSeconds);
UserBufferInfo userInfo = new UserBufferInfo(currentTimeMillis,
bufferStatusHub);
workerSource.addGauge(
- WorkerSource.UserProduceSpeed(),
- () -> getUserProduceSpeed(userInfo),
- userIdentifier.toMap());
+ WorkerSource.USER_PRODUCE_SPEED(),
+ userIdentifier.toJMap(),
+ () -> getUserProduceSpeed(userInfo));
return userInfo;
});
@@ -241,7 +240,7 @@ public class CongestionController {
UserBufferInfo userBufferInfo = next.getValue();
if (currentTimeMillis - userBufferInfo.getTimestamp() >=
userInactiveTimeMills) {
userBufferStatuses.remove(userIdentifier);
- workerSource.removeGauge(WorkerSource.UserProduceSpeed(),
userIdentifier.toMap());
+ workerSource.removeGauge(WorkerSource.USER_PRODUCE_SPEED(),
userIdentifier.toMap());
logger.info("User {} has been expired, remove from rate limit list",
userIdentifier);
}
}
diff --git
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/FileWriter.java
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/FileWriter.java
index 79c2d885b..7e831fb33 100644
---
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/FileWriter.java
+++
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/FileWriter.java
@@ -357,7 +357,7 @@ public abstract class FileWriter implements DeviceObserver {
String metricsName = null;
String fileAbsPath = null;
if (source.metricsCollectCriticalEnabled()) {
- metricsName = WorkerSource.TakeBufferTime();
+ metricsName = WorkerSource.TAKE_BUFFER_TIME();
fileAbsPath = fileInfo.getFilePath();
source.startTimer(metricsName, fileAbsPath);
}
diff --git
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java
index 9ab516408..f7963c5d6 100644
---
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java
+++
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java
@@ -525,7 +525,7 @@ public class PartitionFilesSorter extends
ShuffleRecoverHelper {
}
public void sort() throws InterruptedException {
- source.startTimer(WorkerSource.SortTime(), fileId);
+ source.startTimer(WorkerSource.SORT_TIME(), fileId);
try {
initializeFiles();
@@ -594,7 +594,7 @@ public class PartitionFilesSorter extends
ShuffleRecoverHelper {
sorting.remove(fileId);
}
}
- source.stopTimer(WorkerSource.SortTime(), fileId);
+ source.stopTimer(WorkerSource.SORT_TIME(), fileId);
}
private void initializeFiles() throws IOException {
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala
index 0ee3753e4..8ad590160 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala
@@ -92,7 +92,7 @@ private[deploy] class Controller(
userIdentifier,
pushDataTimeout) =>
val shuffleKey = Utils.makeShuffleKey(applicationId, shuffleId)
- workerSource.sample(WorkerSource.ReserveSlotsTime, shuffleKey) {
+ workerSource.sample(WorkerSource.RESERVE_SLOTS_TIME, shuffleKey) {
logDebug(s"Received ReserveSlots request, $shuffleKey, " +
s"primary partitions:
${primaryLocations.asScala.map(_.getUniqueId).mkString(",")}; " +
s"replica partitions:
${replicaLocations.asScala.map(_.getUniqueId).mkString(",")}.")
@@ -389,7 +389,7 @@ private[deploy] class Controller(
} else {
logInfo(s"Start commitFiles for ${shuffleKey}")
commitInfo.status = CommitInfo.COMMIT_INPROCESS
- workerSource.startTimer(WorkerSource.CommitFilesTime, shuffleKey)
+ workerSource.startTimer(WorkerSource.COMMIT_FILES_TIME, shuffleKey)
}
}
@@ -528,7 +528,7 @@ private[deploy] class Controller(
}
context.reply(response)
- workerSource.stopTimer(WorkerSource.CommitFilesTime, shuffleKey)
+ workerSource.stopTimer(WorkerSource.COMMIT_FILES_TIME, shuffleKey)
}
if (future != null) {
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala
index a7370d252..950b5208a 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala
@@ -26,6 +26,7 @@ import java.util.function.Consumer
import com.google.common.base.Throwables
import io.netty.util.concurrent.{Future, GenericFutureListener}
+import org.apache.celeborn.common.CelebornConf
import org.apache.celeborn.common.internal.Logging
import org.apache.celeborn.common.meta.{FileInfo, FileManagedBuffers}
import org.apache.celeborn.common.network.buffer.NioManagedBuffer
@@ -38,14 +39,14 @@ import org.apache.celeborn.common.protocol.PartitionType
import org.apache.celeborn.common.util.ExceptionUtils
import org.apache.celeborn.service.deploy.worker.storage.{ChunkStreamManager,
CreditStreamManager, PartitionFilesSorter, StorageManager}
-class FetchHandler(val conf: TransportConf) extends BaseMessageHandler with
Logging {
- var celebornConf = conf.getCelebornConf
+class FetchHandler(val conf: CelebornConf, val transportConf: TransportConf)
+ extends BaseMessageHandler with Logging {
var chunkStreamManager = new ChunkStreamManager()
val creditStreamManager = new CreditStreamManager(
- celebornConf.partitionReadBuffersMin,
- celebornConf.partitionReadBuffersMax,
- celebornConf.creditStreamThreadsPerMountpoint,
- celebornConf.readBuffersToTriggerReadMin)
+ conf.partitionReadBuffersMin,
+ conf.partitionReadBuffersMax,
+ conf.creditStreamThreadsPerMountpoint,
+ conf.readBuffersToTriggerReadMin)
var workerSource: WorkerSource = _
var storageManager: StorageManager = _
var partitionsSorter: PartitionFilesSorter = _
@@ -54,13 +55,13 @@ class FetchHandler(val conf: TransportConf) extends
BaseMessageHandler with Logg
def init(worker: Worker): Unit = {
this.workerSource = worker.workerSource
- workerSource.addGauge(
- WorkerSource.CreditStreamCount,
- _ => creditStreamManager.getStreamsCount)
+ workerSource.addGauge(WorkerSource.CREDIT_STREAM_COUNT) { () =>
+ creditStreamManager.getStreamsCount
+ }
- workerSource.addGauge(
- WorkerSource.ActiveMapPartitionCount,
- _ => creditStreamManager.getActiveMapPartitionCount)
+ workerSource.addGauge(WorkerSource.ACTIVE_MAP_PARTITION_COUNT) { () =>
+ creditStreamManager.getActiveMapPartitionCount
+ }
this.storageManager = worker.storageManager
this.partitionsSorter = worker.partitionsSorter
@@ -111,10 +112,10 @@ class FetchHandler(val conf: TransportConf) extends
BaseMessageHandler with Logg
new String(openStreamWithCredit.fileName, StandardCharsets.UTF_8))
}
// metrics start
- workerSource.startTimer(WorkerSource.OpenStreamTime, shuffleKey)
+ workerSource.startTimer(WorkerSource.OPEN_STREAM_TIME, shuffleKey)
try {
var fileInfo = getRawFileInfo(shuffleKey, fileName)
- try fileInfo.getPartitionType() match {
+ try fileInfo.getPartitionType match {
case PartitionType.REDUCE =>
val startMapIndex = msg.asInstanceOf[OpenStream].startMapIndex
val endMapIndex = msg.asInstanceOf[OpenStream].endMapIndex
@@ -134,7 +135,7 @@ class FetchHandler(val conf: TransportConf) extends
BaseMessageHandler with Logg
request.requestId,
new NioManagedBuffer(streamHandle.toByteBuffer)))
} else {
- val buffers = new FileManagedBuffers(fileInfo, conf)
+ val buffers = new FileManagedBuffers(fileInfo, transportConf)
val fetchTimeMetrics =
storageManager.getFetchTimeMetric(fileInfo.getFile)
val streamId = chunkStreamManager.registerStream(
shuffleKey,
@@ -161,8 +162,7 @@ class FetchHandler(val conf: TransportConf) extends
BaseMessageHandler with Logg
val bufferStreamHandle = new StreamHandle(streamId, 0)
client.getChannel.writeAndFlush(new RpcResponse(
request.requestId,
- new NioManagedBuffer(bufferStreamHandle
- .toByteBuffer)))
+ new NioManagedBuffer(bufferStreamHandle.toByteBuffer)))
}
}
@@ -180,12 +180,12 @@ class FetchHandler(val conf: TransportConf) extends
BaseMessageHandler with Logg
handleRpcIOException(client, request.requestId, e)
} finally {
// metrics end
- workerSource.stopTimer(WorkerSource.OpenStreamTime, shuffleKey)
+ workerSource.stopTimer(WorkerSource.OPEN_STREAM_TIME, shuffleKey)
request.body().release()
}
} catch {
case ioe: IOException =>
- workerSource.stopTimer(WorkerSource.OpenStreamTime, shuffleKey)
+ workerSource.stopTimer(WorkerSource.OPEN_STREAM_TIME, shuffleKey)
handleRpcIOException(client, request.requestId, ioe)
}
}
@@ -215,15 +215,14 @@ class FetchHandler(val conf: TransportConf) extends
BaseMessageHandler with Logg
s" to fetch block ${req.streamChunkSlice}")
val chunksBeingTransferred = chunkStreamManager.chunksBeingTransferred
- if (chunksBeingTransferred >
celebornConf.shuffleIoMaxChunksBeingTransferred) {
+ if (chunksBeingTransferred > conf.shuffleIoMaxChunksBeingTransferred) {
val message = "Worker is too busy. The number of chunks being
transferred " +
s"$chunksBeingTransferred exceeds
celeborn.shuffle.maxChunksBeingTransferred " +
- s"${celebornConf.shuffleIoMaxChunksBeingTransferred}."
+ s"${conf.shuffleIoMaxChunksBeingTransferred}."
logError(message)
- client.getChannel.writeAndFlush(
- new ChunkFetchFailure(req.streamChunkSlice, message))
+ client.getChannel.writeAndFlush(new
ChunkFetchFailure(req.streamChunkSlice, message))
} else {
- workerSource.startTimer(WorkerSource.FetchChunkTime, req.toString)
+ workerSource.startTimer(WorkerSource.FETCH_CHUNK_TIME, req.toString)
val fetchTimeMetric =
chunkStreamManager.getFetchTimeMetric(req.streamChunkSlice.streamId)
val fetchBeginTime = System.nanoTime()
try {
@@ -240,19 +239,19 @@ class FetchHandler(val conf: TransportConf) extends
BaseMessageHandler with Logg
if (fetchTimeMetric != null) {
fetchTimeMetric.update(System.nanoTime() - fetchBeginTime)
}
- workerSource.stopTimer(WorkerSource.FetchChunkTime, req.toString)
+ workerSource.stopTimer(WorkerSource.FETCH_CHUNK_TIME,
req.toString)
}
})
} catch {
case e: Exception =>
logError(
- String.format(s"Error opening block ${req.streamChunkSlice} for
request from" +
- s" ${NettyUtils.getRemoteAddress(client.getChannel)}"),
+ s"Error opening block ${req.streamChunkSlice} for request from " +
+ NettyUtils.getRemoteAddress(client.getChannel),
e)
client.getChannel.writeAndFlush(new ChunkFetchFailure(
req.streamChunkSlice,
Throwables.getStackTraceAsString(e)))
- workerSource.stopTimer(WorkerSource.FetchChunkTime, req.toString)
+ workerSource.stopTimer(WorkerSource.FETCH_CHUNK_TIME, req.toString)
}
}
}
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
index 12783265f..cddb17032 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
@@ -147,13 +147,13 @@ class PushDataHandler extends BaseMessageHandler with
Logging {
if (isPrimary) {
new RpcResponseCallbackWithTimer(
workerSource,
- WorkerSource.PrimaryPushDataTime,
+ WorkerSource.PRIMARY_PUSH_DATA_TIME,
key,
callback)
} else {
new RpcResponseCallbackWithTimer(
workerSource,
- WorkerSource.ReplicaPushDataTime,
+ WorkerSource.REPLICA_PUSH_DATA_TIME,
key,
callback)
}
@@ -227,7 +227,7 @@ class PushDataHandler extends BaseMessageHandler with
Logging {
logError(
s"While handling PushData, throw $cause, fileWriter $fileWriter has
exception.",
exception)
- workerSource.incCounter(WorkerSource.WriteDataFailCount)
+ workerSource.incCounter(WorkerSource.WRITE_DATA_FAIL_COUNT)
callbackWithTimer.onFailure(new CelebornIOException(cause))
return
}
@@ -250,7 +250,7 @@ class PushDataHandler extends BaseMessageHandler with
Logging {
peer.getReplicatePort)
if (unavailablePeers.containsKey(peerWorker)) {
pushData.body().release()
-
workerSource.incCounter(WorkerSource.ReplicateDataCreateConnectionFailCount)
+
workerSource.incCounter(WorkerSource.REPLICATE_DATA_CREATE_CONNECTION_FAIL_COUNT)
logError(
s"PushData replication failed caused by unavailable peer for
partitionLocation: $location")
callbackWithTimer.onFailure(
@@ -298,13 +298,13 @@ class PushDataHandler extends BaseMessageHandler with
Logging {
// 2. Throw PUSH_DATA_TIMEOUT_REPLICA by TransportResponseHandler
// 3. Throw IOException by channel, convert to
PUSH_DATA_CONNECTION_EXCEPTION_REPLICA
if
(e.getMessage.startsWith(StatusCode.PUSH_DATA_WRITE_FAIL_REPLICA.name())) {
-
workerSource.incCounter(WorkerSource.ReplicateDataWriteFailCount)
+
workerSource.incCounter(WorkerSource.REPLICATE_DATA_WRITE_FAIL_COUNT)
callbackWithTimer.onFailure(e)
} else if
(e.getMessage.startsWith(StatusCode.PUSH_DATA_TIMEOUT_REPLICA.name())) {
- workerSource.incCounter(WorkerSource.ReplicateDataTimeoutCount)
+
workerSource.incCounter(WorkerSource.REPLICATE_DATA_TIMEOUT_COUNT)
callbackWithTimer.onFailure(e)
} else {
-
workerSource.incCounter(WorkerSource.ReplicateDataConnectionExceptionCount)
+
workerSource.incCounter(WorkerSource.REPLICATE_DATA_CONNECTION_EXCEPTION_COUNT)
callbackWithTimer.onFailure(
new
CelebornIOException(StatusCode.PUSH_DATA_CONNECTION_EXCEPTION_REPLICA))
}
@@ -322,7 +322,7 @@ class PushDataHandler extends BaseMessageHandler with
Logging {
case e: Exception =>
pushData.body().release()
unavailablePeers.put(peerWorker, System.currentTimeMillis())
-
workerSource.incCounter(WorkerSource.ReplicateDataCreateConnectionFailCount)
+
workerSource.incCounter(WorkerSource.REPLICATE_DATA_CREATE_CONNECTION_FAIL_COUNT)
logError(
s"PushData replication failed during connecting peer for
partitionLocation: $location",
e)
@@ -395,13 +395,13 @@ class PushDataHandler extends BaseMessageHandler with
Logging {
if (isPrimary) {
new RpcResponseCallbackWithTimer(
workerSource,
- WorkerSource.PrimaryPushDataTime,
+ WorkerSource.PRIMARY_PUSH_DATA_TIME,
key,
callback)
} else {
new RpcResponseCallbackWithTimer(
workerSource,
- WorkerSource.ReplicaPushDataTime,
+ WorkerSource.REPLICA_PUSH_DATA_TIME,
key,
callback)
}
@@ -491,7 +491,7 @@ class PushDataHandler extends BaseMessageHandler with
Logging {
logError(
s"While handling PushMergedData, throw $cause, fileWriter
$fileWriterWithException has exception.",
fileWriterWithException.getException)
- workerSource.incCounter(WorkerSource.WriteDataFailCount)
+ workerSource.incCounter(WorkerSource.WRITE_DATA_FAIL_COUNT)
callbackWithTimer.onFailure(new CelebornIOException(cause))
return
}
@@ -512,7 +512,7 @@ class PushDataHandler extends BaseMessageHandler with
Logging {
peer.getReplicatePort)
if (unavailablePeers.containsKey(peerWorker)) {
pushMergedData.body().release()
-
workerSource.incCounter(WorkerSource.ReplicateDataCreateConnectionFailCount)
+
workerSource.incCounter(WorkerSource.REPLICATE_DATA_CREATE_CONNECTION_FAIL_COUNT)
logError(
s"PushMergedData replication failed caused by unavailable peer
for partitionLocation: $location")
callbackWithTimer.onFailure(
@@ -554,13 +554,13 @@ class PushDataHandler extends BaseMessageHandler with
Logging {
// 2. Throw PUSH_DATA_TIMEOUT_REPLICA by TransportResponseHandler
// 3. Throw IOException by channel, convert to
PUSH_DATA_CONNECTION_EXCEPTION_REPLICA
if
(e.getMessage.startsWith(StatusCode.PUSH_DATA_WRITE_FAIL_REPLICA.name())) {
-
workerSource.incCounter(WorkerSource.ReplicateDataWriteFailCount)
+
workerSource.incCounter(WorkerSource.REPLICATE_DATA_WRITE_FAIL_COUNT)
callbackWithTimer.onFailure(e)
} else if
(e.getMessage.startsWith(StatusCode.PUSH_DATA_TIMEOUT_REPLICA.name())) {
- workerSource.incCounter(WorkerSource.ReplicateDataTimeoutCount)
+
workerSource.incCounter(WorkerSource.REPLICATE_DATA_TIMEOUT_COUNT)
callbackWithTimer.onFailure(e)
} else {
-
workerSource.incCounter(WorkerSource.ReplicateDataConnectionExceptionCount)
+
workerSource.incCounter(WorkerSource.REPLICATE_DATA_CONNECTION_EXCEPTION_COUNT)
callbackWithTimer.onFailure(
new
CelebornIOException(StatusCode.PUSH_DATA_CONNECTION_EXCEPTION_REPLICA))
}
@@ -583,7 +583,7 @@ class PushDataHandler extends BaseMessageHandler with
Logging {
case e: Exception =>
pushMergedData.body().release()
unavailablePeers.put(peerWorker, System.currentTimeMillis())
-
workerSource.incCounter(WorkerSource.ReplicateDataCreateConnectionFailCount)
+
workerSource.incCounter(WorkerSource.REPLICATE_DATA_CREATE_CONNECTION_FAIL_COUNT)
logError(
s"PushMergedData replication failed during connecting peer for
partitionLocation: $location",
e)
@@ -750,9 +750,9 @@ class PushDataHandler extends BaseMessageHandler with
Logging {
val key = s"${pushData.requestId}"
if (isPrimary) {
- workerSource.startTimer(WorkerSource.PrimaryPushDataTime, key)
+ workerSource.startTimer(WorkerSource.PRIMARY_PUSH_DATA_TIME, key)
} else {
- workerSource.startTimer(WorkerSource.ReplicaPushDataTime, key)
+ workerSource.startTimer(WorkerSource.REPLICA_PUSH_DATA_TIME, key)
}
// find FileWriter responsible for the data
@@ -770,7 +770,7 @@ class PushDataHandler extends BaseMessageHandler with
Logging {
pushData.requestId,
null,
location,
- if (isPrimary) WorkerSource.PrimaryPushDataTime else
WorkerSource.ReplicaPushDataTime,
+ if (isPrimary) WorkerSource.PRIMARY_PUSH_DATA_TIME else
WorkerSource.REPLICA_PUSH_DATA_TIME,
callback)
if (locationIsNull(
@@ -869,11 +869,13 @@ class PushDataHandler extends BaseMessageHandler with
Logging {
val (workerSourcePrimary, workerSourceReplica) =
messageType match {
case Type.PUSH_DATA_HAND_SHAKE =>
- (WorkerSource.PrimaryPushDataHandshakeTime,
WorkerSource.ReplicaPushDataHandshakeTime)
+ (
+ WorkerSource.PRIMARY_PUSH_DATA_HANDSHAKE_TIME,
+ WorkerSource.REPLICA_PUSH_DATA_HANDSHAKE_TIME)
case Type.REGION_START =>
- (WorkerSource.PrimaryRegionStartTime,
WorkerSource.ReplicaRegionStartTime)
+ (WorkerSource.PRIMARY_REGION_START_TIME,
WorkerSource.REPLICA_REGION_START_TIME)
case Type.REGION_FINISH =>
- (WorkerSource.PrimaryRegionFinishTime,
WorkerSource.ReplicaRegionFinishTime)
+ (WorkerSource.PRIMARY_REGION_FINISH_TIME,
WorkerSource.REPLICA_REGION_FINISH_TIME)
case _ => throw new IllegalArgumentException(s"Not support
$messageType yet")
}
@@ -973,18 +975,18 @@ class PushDataHandler extends BaseMessageHandler with
Logging {
}
messageType match {
case Type.PUSH_DATA_HAND_SHAKE =>
- workerSource.incCounter(WorkerSource.PushDataHandshakeFailCount)
+ workerSource.incCounter(WorkerSource.PUSH_DATA_HANDSHAKE_FAIL_COUNT)
callback.onFailure(new CelebornIOException(
StatusCode.PUSH_DATA_HANDSHAKE_FAIL_REPLICA,
e))
case Type.REGION_START =>
- workerSource.incCounter(WorkerSource.RegionStartFailCount)
+ workerSource.incCounter(WorkerSource.REGION_START_FAIL_COUNT)
callback.onFailure(new
CelebornIOException(StatusCode.REGION_START_FAIL_REPLICA, e))
case Type.REGION_FINISH =>
- workerSource.incCounter(WorkerSource.RegionFinishFailCount)
+ workerSource.incCounter(WorkerSource.REGION_FINISH_FAIL_COUNT)
callback.onFailure(new
CelebornIOException(StatusCode.REGION_FINISH_FAIL_REPLICA, e))
case _ =>
- workerSource.incCounter(WorkerSource.ReplicateDataFailCount)
+ workerSource.incCounter(WorkerSource.REPLICATE_DATA_FAIL_COUNT)
if (e.isInstanceOf[CelebornIOException]) {
callback.onFailure(e)
} else {
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
index 9d1d9f1dc..572d03247 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
@@ -19,7 +19,7 @@ package org.apache.celeborn.service.deploy.worker
import java.io.File
import java.lang.{Long => JLong}
-import java.util.{HashMap => JHashMap, HashSet => JHashSet}
+import java.util.{HashMap => JHashMap, HashSet => JHashSet, Map => JMap}
import java.util.concurrent._
import java.util.concurrent.atomic.{AtomicBoolean, AtomicIntegerArray}
@@ -43,6 +43,8 @@ import
org.apache.celeborn.common.protocol.message.ControlMessages._
import org.apache.celeborn.common.quota.ResourceConsumption
import org.apache.celeborn.common.rpc._
import org.apache.celeborn.common.util.{JavaUtils, ShutdownHookManager,
ThreadUtils, Utils}
+// Can be removed once Celeborn drops support for Scala 2.11 in the future
+import org.apache.celeborn.common.util.FunctionConverter._
import org.apache.celeborn.server.common.{HttpService, Service}
import
org.apache.celeborn.service.deploy.worker.congestcontrol.CongestionController
import org.apache.celeborn.service.deploy.worker.memory.{ChannelsLimiter,
MemoryManager}
@@ -60,7 +62,7 @@ private[celeborn] class Worker(
override val metricsSystem: MetricsSystem =
MetricsSystem.createMetricsSystem(serviceName, conf,
MetricsSystem.SERVLET_PATH)
- val rpcEnv = RpcEnv.create(
+ val rpcEnv: RpcEnv = RpcEnv.create(
RpcNameConstants.WORKER_SYS,
workerArgs.host,
workerArgs.host,
@@ -77,8 +79,8 @@ private[celeborn] class Worker(
private val gracefulShutdown = conf.workerGracefulShutdown
assert(
!gracefulShutdown || (gracefulShutdown &&
- conf.workerRpcPort != 0 && conf.workerFetchPort != 0 &&
- conf.workerPushPort != 0 && conf.workerReplicatePort != 0),
+ conf.workerRpcPort > 0 && conf.workerFetchPort > 0 &&
+ conf.workerPushPort > 0 && conf.workerReplicatePort > 0),
"If enable graceful shutdown, the worker should use stable server port.")
if (gracefulShutdown) {
try {
@@ -102,7 +104,7 @@ private[celeborn] class Worker(
val storageManager = new StorageManager(conf, workerSource)
- val memoryManager = MemoryManager.initialize(conf)
+ val memoryManager: MemoryManager = MemoryManager.initialize(conf)
memoryManager.registerMemoryListener(storageManager)
val partitionsSorter = new PartitionFilesSorter(memoryManager, conf,
workerSource)
@@ -169,7 +171,7 @@ private[celeborn] class Worker(
val numThreads =
conf.workerFetchIoThreads.getOrElse(storageManager.totalFlusherThread)
val transportConf =
Utils.fromCelebornConf(conf, TransportModuleConstants.FETCH_MODULE,
numThreads)
- fetchHandler = new FetchHandler(transportConf)
+ fetchHandler = new FetchHandler(conf, transportConf)
val transportContext: TransportContext =
new TransportContext(
transportConf,
@@ -181,23 +183,26 @@ private[celeborn] class Worker(
}
private val pushPort = pushServer.getPort
+ assert(pushPort > 0, "worker push bind port should be positive")
+
private val fetchPort = fetchServer.getPort
- private val replicatePort = replicateServer.getPort
+ assert(fetchPort > 0, "worker fetch bind port should be positive")
- assert(pushPort > 0)
- assert(fetchPort > 0)
- assert(replicatePort > 0)
+ private val replicatePort = replicateServer.getPort
+ assert(replicatePort > 0, "worker replica bind port should be positive")
storageManager.updateDiskInfos()
+
// WorkerInfo's diskInfos is a reference to storageManager.diskInfos
val diskInfos = JavaUtils.newConcurrentHashMap[String, DiskInfo]()
- storageManager.disksSnapshot().foreach { case diskInfo =>
+ storageManager.disksSnapshot().foreach { diskInfo =>
diskInfos.put(diskInfo.mountPoint, diskInfo)
}
// need to ensure storageManager has recovered fileinfos data if enable
graceful shutdown before retrieve consumption
- val userResourceConsumption = JavaUtils.newConcurrentHashMap[UserIdentifier,
ResourceConsumption](
- storageManager.userResourceConsumptionSnapshot().asJava)
+ val userResourceConsumption: ConcurrentHashMap[UserIdentifier,
ResourceConsumption] =
+ JavaUtils.newConcurrentHashMap[UserIdentifier, ResourceConsumption](
+ storageManager.userResourceConsumptionSnapshot().asJava)
val workerInfo =
new WorkerInfo(
@@ -211,63 +216,83 @@ private[celeborn] class Worker(
// whether this Worker registered to Master successfully
val registered = new AtomicBoolean(false)
- val shuffleMapperAttempts = JavaUtils.newConcurrentHashMap[String,
AtomicIntegerArray]()
- val shufflePartitionType = JavaUtils.newConcurrentHashMap[String,
PartitionType]
- var shufflePushDataTimeout = JavaUtils.newConcurrentHashMap[String, Long]
+ val shuffleMapperAttempts: ConcurrentHashMap[String, AtomicIntegerArray] =
+ JavaUtils.newConcurrentHashMap[String, AtomicIntegerArray]()
+ val shufflePartitionType: ConcurrentHashMap[String, PartitionType] =
+ JavaUtils.newConcurrentHashMap[String, PartitionType]
+ var shufflePushDataTimeout: ConcurrentHashMap[String, Long] =
+ JavaUtils.newConcurrentHashMap[String, Long]
val partitionLocationInfo = new WorkerPartitionLocationInfo
- val shuffleCommitInfos =
+ val shuffleCommitInfos: ConcurrentHashMap[String, ConcurrentHashMap[Long,
CommitInfo]] =
JavaUtils.newConcurrentHashMap[String, ConcurrentHashMap[Long,
CommitInfo]]()
private val masterClient = new MasterClient(rpcEnv, conf)
// (workerInfo -> last connect timeout timestamp)
- val unavailablePeers = JavaUtils.newConcurrentHashMap[WorkerInfo, Long]()
+ val unavailablePeers: ConcurrentHashMap[WorkerInfo, Long] =
+ JavaUtils.newConcurrentHashMap[WorkerInfo, Long]()
// Threads
- private val forwardMessageScheduler =
+ private val forwardMessageScheduler: ScheduledExecutorService =
ThreadUtils.newDaemonSingleThreadScheduledExecutor("worker-forward-message-scheduler")
private var sendHeartbeatTask: ScheduledFuture[_] = _
- private var checkFastfailTask: ScheduledFuture[_] = _
- val replicateThreadPool = ThreadUtils.newDaemonCachedThreadPool(
- "worker-replicate-data",
- conf.workerReplicateThreads)
- val commitThreadPool = ThreadUtils.newDaemonCachedThreadPool(
- "Worker-CommitFiles",
- conf.workerCommitThreads)
- val asyncReplyPool =
ThreadUtils.newDaemonSingleThreadScheduledExecutor("async-reply")
+ private var checkFastFailTask: ScheduledFuture[_] = _
+
+ val replicateThreadPool: ThreadPoolExecutor =
+ ThreadUtils.newDaemonCachedThreadPool("worker-replicate-data",
conf.workerReplicateThreads)
+ val commitThreadPool: ThreadPoolExecutor =
+ ThreadUtils.newDaemonCachedThreadPool("Worker-CommitFiles",
conf.workerCommitThreads)
+ val asyncReplyPool: ScheduledExecutorService =
+ ThreadUtils.newDaemonSingleThreadScheduledExecutor("async-reply")
val timer = new HashedWheelTimer()
// Configs
- private val HEARTBEAT_MILLIS = conf.workerHeartbeatTimeout / 4
- private val REPLICATE_FAST_FAIL_DURATION =
conf.workerReplicateFastFailDuration
+ private val heartbeatInterval = conf.workerHeartbeatTimeout / 4
+ private val replicaFastFailDuration = conf.workerReplicateFastFailDuration
private val cleanTaskQueue = new LinkedBlockingQueue[JHashSet[String]]
var cleaner: Thread = _
- workerSource.addGauge(
- WorkerSource.RegisteredShuffleCount,
- _ => workerInfo.getShuffleKeySet.size())
- workerSource.addGauge(WorkerSource.SlotsAllocated, _ =>
workerInfo.allocationsInLastHour())
- workerSource.addGauge(WorkerSource.SortMemory, _ =>
memoryManager.getSortMemoryCounter.get())
- workerSource.addGauge(WorkerSource.SortingFiles, _ =>
partitionsSorter.getSortingCount)
- workerSource.addGauge(WorkerSource.SortedFiles, _ =>
partitionsSorter.getSortedCount)
- workerSource.addGauge(WorkerSource.SortedFileSize, _ =>
partitionsSorter.getSortedSize)
- workerSource.addGauge(WorkerSource.DiskBuffer, _ =>
memoryManager.getDiskBufferCounter.get())
- workerSource.addGauge(WorkerSource.NettyMemory, _ =>
memoryManager.getNettyUsedDirectMemory())
- workerSource.addGauge(WorkerSource.PausePushDataCount, _ =>
memoryManager.getPausePushDataCounter)
- workerSource.addGauge(
- WorkerSource.PausePushDataAndReplicateCount,
- _ => memoryManager.getPausePushDataAndReplicateCounter)
- workerSource.addGauge(
- WorkerSource.BufferStreamReadBuffer,
- _ => memoryManager.getReadBufferCounter())
- workerSource.addGauge(
- WorkerSource.ReadBufferDispatcherRequestsLength,
- _ => memoryManager.dispatchRequestsLength)
- workerSource.addGauge(
- WorkerSource.ReadBufferAllocatedCount,
- _ => memoryManager.getAllocatedReadBuffers)
+ workerSource.addGauge(WorkerSource.REGISTERED_SHUFFLE_COUNT) { () =>
+ workerInfo.getShuffleKeySet.size
+ }
+ workerSource.addGauge(WorkerSource.SLOTS_ALLOCATED) { () =>
+ workerInfo.allocationsInLastHour()
+ }
+ workerSource.addGauge(WorkerSource.SORT_MEMORY) { () =>
+ memoryManager.getSortMemoryCounter.get()
+ }
+ workerSource.addGauge(WorkerSource.SORTING_FILES) { () =>
+ partitionsSorter.getSortingCount
+ }
+ workerSource.addGauge(WorkerSource.SORTED_FILES) { () =>
+ partitionsSorter.getSortedCount
+ }
+ workerSource.addGauge(WorkerSource.SORTED_FILE_SIZE) { () =>
+ partitionsSorter.getSortedSize
+ }
+ workerSource.addGauge(WorkerSource.DISK_BUFFER) { () =>
+ memoryManager.getDiskBufferCounter.get()
+ }
+ workerSource.addGauge(WorkerSource.NETTY_MEMORY) { () =>
+ memoryManager.getNettyUsedDirectMemory
+ }
+ workerSource.addGauge(WorkerSource.PAUSE_PUSH_DATA_COUNT) { () =>
+ memoryManager.getPausePushDataCounter
+ }
+ workerSource.addGauge(WorkerSource.PAUSE_PUSH_DATA_AND_REPLICATE_COUNT) { ()
=>
+ memoryManager.getPausePushDataAndReplicateCounter
+ }
+ workerSource.addGauge(WorkerSource.BUFFER_STREAM_READ_BUFFER) { () =>
+ memoryManager.getReadBufferCounter
+ }
+ workerSource.addGauge(WorkerSource.READ_BUFFER_DISPATCHER_REQUESTS_LENGTH) {
() =>
+ memoryManager.dispatchRequestsLength
+ }
+ workerSource.addGauge(WorkerSource.READ_BUFFER_ALLOCATED_COUNT) { () =>
+ memoryManager.getAllocatedReadBuffers
+ }
private def heartbeatToMaster(): Unit = {
val activeShuffleKeys = new JHashSet[String]()
@@ -321,26 +346,24 @@ private[celeborn] class Worker(
// start heartbeat
sendHeartbeatTask = forwardMessageScheduler.scheduleAtFixedRate(
new Runnable {
- override def run(): Unit = Utils.tryLogNonFatalError {
- heartbeatToMaster()
- }
+ override def run(): Unit = Utils.tryLogNonFatalError {
heartbeatToMaster() }
},
- HEARTBEAT_MILLIS,
- HEARTBEAT_MILLIS,
+ heartbeatInterval,
+ heartbeatInterval,
TimeUnit.MILLISECONDS)
- checkFastfailTask = forwardMessageScheduler.scheduleAtFixedRate(
+ checkFastFailTask = forwardMessageScheduler.scheduleAtFixedRate(
new Runnable {
override def run(): Unit = Utils.tryLogNonFatalError {
- unavailablePeers.entrySet().asScala.foreach { entry =>
- if (System.currentTimeMillis() - entry.getValue >
REPLICATE_FAST_FAIL_DURATION) {
+ unavailablePeers.entrySet().forEach { entry: JMap.Entry[WorkerInfo,
Long] =>
+ if (System.currentTimeMillis() - entry.getValue >
replicaFastFailDuration) {
unavailablePeers.remove(entry.getKey)
}
}
}
},
0,
- REPLICATE_FAST_FAIL_DURATION,
+ replicaFastFailDuration,
TimeUnit.MILLISECONDS)
cleaner = new Thread("Cleaner") {
@@ -377,9 +400,9 @@ private[celeborn] class Worker(
sendHeartbeatTask.cancel(true)
sendHeartbeatTask = null
}
- if (checkFastfailTask != null) {
- checkFastfailTask.cancel(true)
- checkFastfailTask = null
+ if (checkFastFailTask != null) {
+ checkFastFailTask.cancel(true)
+ checkFastFailTask = null
}
forwardMessageScheduler.shutdownNow()
replicateThreadPool.shutdownNow()
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala
index e29fbff03..87c755517 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala
@@ -26,102 +26,98 @@ class WorkerSource(conf: CelebornConf) extends
AbstractSource(conf, MetricsSyste
import WorkerSource._
// add counters
- addCounter(WriteDataFailCount)
- addCounter(ReplicateDataFailCount)
- addCounter(ReplicateDataWriteFailCount)
- addCounter(ReplicateDataCreateConnectionFailCount)
- addCounter(ReplicateDataConnectionExceptionCount)
- addCounter(ReplicateDataTimeoutCount)
+ addCounter(WRITE_DATA_FAIL_COUNT)
+ addCounter(REPLICATE_DATA_FAIL_COUNT)
+ addCounter(REPLICATE_DATA_WRITE_FAIL_COUNT)
+ addCounter(REPLICATE_DATA_CREATE_CONNECTION_FAIL_COUNT)
+ addCounter(REPLICATE_DATA_CONNECTION_EXCEPTION_COUNT)
+ addCounter(REPLICATE_DATA_TIMEOUT_COUNT)
- addCounter(PushDataHandshakeFailCount)
- addCounter(RegionStartFailCount)
- addCounter(RegionFinishFailCount)
+ addCounter(PUSH_DATA_HANDSHAKE_FAIL_COUNT)
+ addCounter(REGION_START_FAIL_COUNT)
+ addCounter(REGION_FINISH_FAIL_COUNT)
// add Timers
- addTimer(CommitFilesTime)
- addTimer(ReserveSlotsTime)
- addTimer(FlushDataTime)
- addTimer(PrimaryPushDataTime)
- addTimer(ReplicaPushDataTime)
-
- addTimer(PrimaryPushDataHandshakeTime)
- addTimer(ReplicaPushDataHandshakeTime)
- addTimer(PrimaryRegionStartTime)
- addTimer(ReplicaRegionStartTime)
- addTimer(PrimaryRegionFinishTime)
- addTimer(ReplicaRegionFinishTime)
-
- addTimer(FetchChunkTime)
- addTimer(OpenStreamTime)
- addTimer(TakeBufferTime)
- addTimer(SortTime)
+ addTimer(COMMIT_FILES_TIME)
+ addTimer(RESERVE_SLOTS_TIME)
+ addTimer(FLUSH_DATA_TIME)
+ addTimer(PRIMARY_PUSH_DATA_TIME)
+ addTimer(REPLICA_PUSH_DATA_TIME)
+
+ addTimer(PRIMARY_PUSH_DATA_HANDSHAKE_TIME)
+ addTimer(REPLICA_PUSH_DATA_HANDSHAKE_TIME)
+ addTimer(PRIMARY_REGION_START_TIME)
+ addTimer(REPLICA_REGION_START_TIME)
+ addTimer(PRIMARY_REGION_FINISH_TIME)
+ addTimer(REPLICA_REGION_FINISH_TIME)
+
+ addTimer(FETCH_CHUNK_TIME)
+ addTimer(OPEN_STREAM_TIME)
+ addTimer(TAKE_BUFFER_TIME)
+ addTimer(SORT_TIME)
// start cleaner thread
startCleaner()
}
object WorkerSource {
- val CommitFilesTime = "CommitFilesTime"
-
- val ReserveSlotsTime = "ReserveSlotsTime"
-
- val FlushDataTime = "FlushDataTime"
-
- val OpenStreamTime = "OpenStreamTime"
-
- val FetchChunkTime = "FetchChunkTime"
+ val COMMIT_FILES_TIME = "CommitFilesTime"
+ val RESERVE_SLOTS_TIME = "ReserveSlotsTime"
+ val FLUSH_DATA_TIME = "FlushDataTime"
+ val OPEN_STREAM_TIME = "OpenStreamTime"
+ val FETCH_CHUNK_TIME = "FetchChunkTime"
// push data
- val PrimaryPushDataTime = "PrimaryPushDataTime"
- val ReplicaPushDataTime = "ReplicaPushDataTime"
- val WriteDataFailCount = "WriteDataFailCount"
- val ReplicateDataFailCount = "ReplicateDataFailCount"
- val ReplicateDataWriteFailCount = "ReplicateDataWriteFailCount"
- val ReplicateDataCreateConnectionFailCount =
"ReplicateDataCreateConnectionFailCount"
- val ReplicateDataConnectionExceptionCount =
"ReplicateDataConnectionExceptionCount"
- val ReplicateDataTimeoutCount = "ReplicateDataTimeoutCount"
- val PushDataHandshakeFailCount = "PushDataHandshakeFailCount"
- val RegionStartFailCount = "RegionStartFailCount"
- val RegionFinishFailCount = "RegionFinishFailCount"
- val PrimaryPushDataHandshakeTime = "PrimaryPushDataHandshakeTime"
- val ReplicaPushDataHandshakeTime = "ReplicaPushDataHandshakeTime"
- val PrimaryRegionStartTime = "PrimaryRegionStartTime"
- val ReplicaRegionStartTime = "ReplicaRegionStartTime"
- val PrimaryRegionFinishTime = "PrimaryRegionFinishTime"
- val ReplicaRegionFinishTime = "ReplicaRegionFinishTime"
+ val PRIMARY_PUSH_DATA_TIME = "PrimaryPushDataTime"
+ val REPLICA_PUSH_DATA_TIME = "ReplicaPushDataTime"
+ val WRITE_DATA_FAIL_COUNT = "WriteDataFailCount"
+ val REPLICATE_DATA_FAIL_COUNT = "ReplicateDataFailCount"
+ val REPLICATE_DATA_WRITE_FAIL_COUNT = "ReplicateDataWriteFailCount"
+ val REPLICATE_DATA_CREATE_CONNECTION_FAIL_COUNT =
"ReplicateDataCreateConnectionFailCount"
+ val REPLICATE_DATA_CONNECTION_EXCEPTION_COUNT =
"ReplicateDataConnectionExceptionCount"
+ val REPLICATE_DATA_TIMEOUT_COUNT = "ReplicateDataTimeoutCount"
+ val PUSH_DATA_HANDSHAKE_FAIL_COUNT = "PushDataHandshakeFailCount"
+ val REGION_START_FAIL_COUNT = "RegionStartFailCount"
+ val REGION_FINISH_FAIL_COUNT = "RegionFinishFailCount"
+ val PRIMARY_PUSH_DATA_HANDSHAKE_TIME = "PrimaryPushDataHandshakeTime"
+ val REPLICA_PUSH_DATA_HANDSHAKE_TIME = "ReplicaPushDataHandshakeTime"
+ val PRIMARY_REGION_START_TIME = "PrimaryRegionStartTime"
+ val REPLICA_REGION_START_TIME = "ReplicaRegionStartTime"
+ val PRIMARY_REGION_FINISH_TIME = "PrimaryRegionFinishTime"
+ val REPLICA_REGION_FINISH_TIME = "ReplicaRegionFinishTime"
// flush
- val TakeBufferTime = "TakeBufferTime"
+ val TAKE_BUFFER_TIME = "TakeBufferTime"
- val RegisteredShuffleCount = "RegisteredShuffleCount"
+ val REGISTERED_SHUFFLE_COUNT = "RegisteredShuffleCount"
// slots
- val SlotsAllocated = "SlotsAllocated"
+ val SLOTS_ALLOCATED = "SlotsAllocated"
// memory
- val NettyMemory = "NettyMemory"
- val SortTime = "SortTime"
- val SortMemory = "SortMemory"
- val SortingFiles = "SortingFiles"
- val SortedFiles = "SortedFiles"
- val SortedFileSize = "SortedFileSize"
- val DiskBuffer = "DiskBuffer"
- val PausePushDataCount = "PausePushData"
- val PausePushDataAndReplicateCount = "PausePushDataAndReplicate"
- val BufferStreamReadBuffer = "BufferStreamReadBuffer"
- val ReadBufferDispatcherRequestsLength = "ReadBufferDispatcherRequestsLength"
- val ReadBufferAllocatedCount = "ReadBufferAllocatedCount"
- val CreditStreamCount = "CreditStreamCount"
- val ActiveMapPartitionCount = "ActiveMapPartitionCount"
+ val NETTY_MEMORY = "NettyMemory"
+ val SORT_TIME = "SortTime"
+ val SORT_MEMORY = "SortMemory"
+ val SORTING_FILES = "SortingFiles"
+ val SORTED_FILES = "SortedFiles"
+ val SORTED_FILE_SIZE = "SortedFileSize"
+ val DISK_BUFFER = "DiskBuffer"
+ val PAUSE_PUSH_DATA_COUNT = "PausePushData"
+ val PAUSE_PUSH_DATA_AND_REPLICATE_COUNT = "PausePushDataAndReplicate"
+ val BUFFER_STREAM_READ_BUFFER = "BufferStreamReadBuffer"
+ val READ_BUFFER_DISPATCHER_REQUESTS_LENGTH =
"ReadBufferDispatcherRequestsLength"
+ val READ_BUFFER_ALLOCATED_COUNT = "ReadBufferAllocatedCount"
+ val CREDIT_STREAM_COUNT = "CreditStreamCount"
+ val ACTIVE_MAP_PARTITION_COUNT = "ActiveMapPartitionCount"
// local device
- val DeviceOSFreeCapacity = "DeviceOSFreeBytes"
- val DeviceOSTotalCapacity = "DeviceOSTotalBytes"
- val DeviceCelebornFreeCapacity = "DeviceCelebornFreeBytes"
- val DeviceCelebornTotalCapacity = "DeviceCelebornTotalBytes"
+ val DEVICE_OS_FREE_CAPACITY = "DeviceOSFreeBytes"
+ val DEVICE_OS_TOTAL_CAPACITY = "DeviceOSTotalBytes"
+ val DEVICE_CELEBORN_FREE_CAPACITY = "DeviceCelebornFreeBytes"
+ val DEVICE_CELEBORN_TOTAL_CAPACITY = "DeviceCelebornTotalBytes"
// Congestion control
- val PotentialConsumeSpeed = "PotentialConsumeSpeed"
- val UserProduceSpeed = "UserProduceSpeed"
- val WorkerConsumeSpeed = "WorkerConsumeSpeed"
+ val POTENTIAL_CONSUME_SPEED = "PotentialConsumeSpeed"
+ val USER_PRODUCE_SPEED = "UserProduceSpeed"
+ val WORKER_CONSUME_SPEED = "WorkerConsumeSpeed"
}
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/DeviceMonitor.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/DeviceMonitor.scala
index 47b85c8fe..aeea6c232 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/DeviceMonitor.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/DeviceMonitor.scala
@@ -28,6 +28,7 @@ import org.apache.commons.io.FileUtils
import org.slf4j.LoggerFactory
import org.apache.celeborn.common.CelebornConf
+import org.apache.celeborn.common.internal.Logging
import org.apache.celeborn.common.meta.{DeviceInfo, DiskInfo, DiskStatus}
import org.apache.celeborn.common.metrics.source.AbstractSource
import org.apache.celeborn.common.util.{ThreadUtils, Utils}
@@ -52,8 +53,7 @@ class LocalDeviceMonitor(
observer: DeviceObserver,
deviceInfos: util.Map[String, DeviceInfo],
diskInfos: util.Map[String, DiskInfo],
- workerSource: AbstractSource) extends DeviceMonitor {
- val logger = LoggerFactory.getLogger(classOf[LocalDeviceMonitor])
+ workerSource: AbstractSource) extends DeviceMonitor with Logging {
// (deviceName -> ObservedDevice)
var observedDevices: util.Map[DeviceInfo, ObservedDevice] = _
@@ -71,14 +71,14 @@ class LocalDeviceMonitor(
def init(): Unit = {
this.observedDevices = new util.HashMap[DeviceInfo, ObservedDevice]()
deviceInfos.asScala.filter(!_._2.deviceStatAvailable).foreach { case
(deviceName, _) =>
- logger.warn(s"Device monitor may not work properly on $deviceName " +
+ logWarning(s"Device monitor may not work properly on $deviceName " +
s"because device $deviceName not exists.")
}
- deviceInfos.asScala.foreach(entry => {
+ deviceInfos.asScala.foreach { entry =>
val observedDevice = new ObservedDevice(entry._2, conf, workerSource)
observedDevice.addObserver(observer)
observedDevices.put(entry._2, observedDevice)
- })
+ }
diskInfos
.asScala
.values
@@ -87,22 +87,18 @@ class LocalDeviceMonitor(
.foreach { case (deviceInfo: DeviceInfo, diskInfos: List[DiskInfo]) =>
val deviceLabel = Map("device" -> deviceInfo.name)
def usage = DeviceMonitor.getDiskUsageInfos(diskInfos.head)
- workerSource.addGauge(
- s"${WorkerSource.DeviceOSTotalCapacity}",
- _ => usage(usage.length - 5).toLong,
- deviceLabel)
- workerSource.addGauge(
- s"${WorkerSource.DeviceOSFreeCapacity}",
- _ => usage(usage.length - 3).toLong,
- deviceLabel)
- workerSource.addGauge(
- s"${WorkerSource.DeviceCelebornTotalCapacity}",
- _ => diskInfos.map(_.configuredUsableSpace).sum,
- deviceLabel)
- workerSource.addGauge(
- s"${WorkerSource.DeviceCelebornFreeCapacity}",
- _ => diskInfos.map(_.actualUsableSpace).sum,
- deviceLabel)
+ workerSource.addGauge(WorkerSource.DEVICE_OS_TOTAL_CAPACITY,
deviceLabel) { () =>
+ usage(usage.length - 5).toLong
+ }
+ workerSource.addGauge(WorkerSource.DEVICE_OS_FREE_CAPACITY,
deviceLabel) { () =>
+ usage(usage.length - 3).toLong
+ }
+ workerSource.addGauge(WorkerSource.DEVICE_CELEBORN_TOTAL_CAPACITY,
deviceLabel) { () =>
+ diskInfos.map(_.configuredUsableSpace).sum
+ }
+ workerSource.addGauge(WorkerSource.DEVICE_CELEBORN_FREE_CAPACITY,
deviceLabel) { () =>
+ diskInfos.map(_.actualUsableSpace).sum
+ }
}
}
@@ -110,7 +106,7 @@ class LocalDeviceMonitor(
diskChecker.scheduleAtFixedRate(
new Runnable {
override def run(): Unit = {
- logger.debug("Device check start")
+ logDebug("Device check start")
try {
observedDevices.values().asScala.foreach(device => {
val mountPoints = device.diskInfos.keySet.asScala.toList
@@ -125,7 +121,7 @@ class LocalDeviceMonitor(
}
val nonCriticalErrorSum =
device.nonCriticalErrors.values().asScala.map(_.size).sum
if (nonCriticalErrorSum > device.notifyErrorThreshold) {
- logger.error(s"Device ${device.deviceInfo.name} has
accumulated $nonCriticalErrorSum non-critical " +
+ logError(s"Device ${device.deviceInfo.name} has accumulated
$nonCriticalErrorSum non-critical " +
s"error within the past
${Utils.msDurationToString(device.notifyErrorExpireTimeout)} , its sum has " +
s"exceed the threshold (${device.notifyErrorThreshold}),
device monitor will notify error to " +
s"observed device.")
@@ -133,18 +129,17 @@ class LocalDeviceMonitor(
device.notifyObserversOnError(mountPoints,
DiskStatus.CRITICAL_ERROR)
} else {
if (checkIoHang && device.ioHang()) {
- logger.error(s"Encounter device io hang error!" +
+ logError(s"Encounter device io hang error!" +
s"${device.deviceInfo.name}, notify observers")
device.notifyObserversOnNonCriticalError(mountPoints,
DiskStatus.IO_HANG)
} else {
device.diskInfos.values().asScala.foreach { diskInfo =>
if (checkDiskUsage && DeviceMonitor.highDiskUsage(conf,
diskInfo)) {
- logger.error(
- s"${diskInfo.mountPoint} high_disk_usage error, notify
observers")
+ logError(s"${diskInfo.mountPoint} high_disk_usage error,
notify observers")
device.notifyObserversOnHighDiskUsage(diskInfo.mountPoint)
} else if (checkReadWrite &&
DeviceMonitor.readWriteError(conf, diskInfo.dirs.head)) {
- logger.error(s"${diskInfo.mountPoint} read-write error,
notify observers")
+ logError(s"${diskInfo.mountPoint} read-write error,
notify observers")
// We think that if one dir in device has read-write
problem, if possible all
// dirs in this device have the problem
device.notifyObserversOnNonCriticalError(
@@ -159,7 +154,7 @@ class LocalDeviceMonitor(
})
} catch {
case t: Throwable =>
- logger.error("Device check failed.", t)
+ logError("Device check failed.", t)
}
}
},
@@ -190,7 +185,7 @@ class LocalDeviceMonitor(
mountPoint: String,
e: IOException,
diskStatus: DiskStatus): Unit = {
- logger.error(s"Receive non-critical exception, disk: $mountPoint, $e")
+ logError(s"Receive non-critical exception, disk: $mountPoint, $e")
observedDevices.get(diskInfos.get(mountPoint).deviceInfo)
.notifyObserversOnNonCriticalError(List(mountPoint), diskStatus)
}
@@ -202,8 +197,7 @@ class LocalDeviceMonitor(
}
}
-object DeviceMonitor {
- val logger = LoggerFactory.getLogger(classOf[DeviceMonitor])
+object DeviceMonitor extends Logging {
val deviceCheckThreadPool =
ThreadUtils.newDaemonCachedThreadPool("device-check-thread", 5)
def createDeviceMonitor(
@@ -217,14 +211,14 @@ object DeviceMonitor {
val monitor =
new LocalDeviceMonitor(conf, deviceObserver, deviceInfos, diskInfos,
workerSource)
monitor.init()
- logger.info("Device monitor init success")
+ logInfo("Device monitor init success")
monitor
} else {
EmptyDeviceMonitor
}
} catch {
case t: Throwable =>
- logger.error("Device monitor init failed.", t)
+ logError("Device monitor init failed.", t)
throw t
}
}
@@ -251,7 +245,7 @@ object DeviceMonitor {
val highDiskUsage =
freeSpace.toLong < conf.workerDiskReserveSize ||
diskInfo.actualUsableSpace <= 0
if (highDiskUsage) {
- logger.warn(s"${diskInfo.mountPoint} usage is above threshold." +
+ logWarning(s"${diskInfo.mountPoint} usage is above threshold." +
s" Disk usage(Report by
OS):{total:${Utils.bytesToString(totalSpace.toLong)}," +
s" free:${Utils.bytesToString(freeSpace.toLong)},
used_percent:$used_percent} " +
s"usage(Report by Celeborn):{" +
@@ -305,7 +299,7 @@ object DeviceMonitor {
}
} catch {
case t: Throwable =>
- logger.error(s"Disk dir $dataDir cannot read or write", t)
+ logError(s"Disk dir $dataDir cannot read or write", t)
true
}
})(false)(
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/Flusher.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/Flusher.scala
index 6cb872cde..e364869a3 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/Flusher.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/Flusher.scala
@@ -62,7 +62,7 @@ abstract private[worker] class Flusher(
while (!stopFlag.get()) {
val task = workingQueues(index).take()
val key = s"Flusher-$this-${rand.nextInt()}"
- workerSource.sample(WorkerSource.FlushDataTime, key) {
+ workerSource.sample(WorkerSource.FLUSH_DATA_TIME, key) {
if (!task.notifier.hasException) {
try {
val flushBeginTime = System.nanoTime()
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/ObservedDevice.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/ObservedDevice.scala
index 510f06667..b1d47b491 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/ObservedDevice.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/ObservedDevice.scala
@@ -19,41 +19,41 @@ package org.apache.celeborn.service.deploy.worker.storage
import java.io.File
import java.util
-import java.util.{Set => jSet}
+import java.util.{Set => JSet}
import java.util.concurrent.ConcurrentHashMap
import scala.collection.JavaConverters._
import scala.io.Source
-import org.slf4j.LoggerFactory
-
import org.apache.celeborn.common.CelebornConf
+import org.apache.celeborn.common.internal.Logging
import org.apache.celeborn.common.meta.{DeviceInfo, DiskInfo, DiskStatus}
import org.apache.celeborn.common.metrics.source.AbstractSource
+// Can be removed if Celeborn drops Scala 2.11 support in the future
+import org.apache.celeborn.common.util.FunctionConverter._
import org.apache.celeborn.common.util.JavaUtils
-class ObservedDevice(val deviceInfo: DeviceInfo, conf: CelebornConf,
workerSource: AbstractSource) {
-
- val logger = LoggerFactory.getLogger(classOf[ObservedDevice])
+class ObservedDevice(val deviceInfo: DeviceInfo, conf: CelebornConf,
workerSource: AbstractSource)
+ extends Logging {
val diskInfos = JavaUtils.newConcurrentHashMap[String, DiskInfo]()
- deviceInfo.diskInfos.foreach { case diskInfo =>
- diskInfos.put(diskInfo.mountPoint, diskInfo)
- }
- val observers: jSet[DeviceObserver] =
ConcurrentHashMap.newKeySet[DeviceObserver]()
+ deviceInfo.diskInfos.foreach { diskInfo =>
diskInfos.put(diskInfo.mountPoint, diskInfo) }
- val sysBlockDir = conf.workerDiskMonitorSysBlockDir
+ val observers: JSet[DeviceObserver] =
ConcurrentHashMap.newKeySet[DeviceObserver]()
+
+ val sysBlockDir: String = conf.workerDiskMonitorSysBlockDir
val statFile = new File(s"$sysBlockDir/${deviceInfo.name}/stat")
val inFlightFile = new File(s"$sysBlockDir/${deviceInfo.name}/inflight")
- val nonCriticalErrors = JavaUtils.newConcurrentHashMap[DiskStatus,
util.Set[Long]]()
- val notifyErrorThreshold = conf.workerDiskMonitorNotifyErrorThreshold
- val notifyErrorExpireTimeout = conf.workerDiskMonitorNotifyErrorExpireTimeout
+ val nonCriticalErrors: ConcurrentHashMap[DiskStatus, JSet[Long]] =
+ JavaUtils.newConcurrentHashMap[DiskStatus, JSet[Long]]()
+ val notifyErrorThreshold: Int = conf.workerDiskMonitorNotifyErrorThreshold
+ val notifyErrorExpireTimeout: Long =
conf.workerDiskMonitorNotifyErrorExpireTimeout
var lastReadComplete: Long = -1
var lastWriteComplete: Long = -1
- var lastReadInflight: Long = -1
- var lastWriteInflight: Long = -1
+ var lastReadInFlight: Long = -1
+ var lastWriteInFlight: Long = -1
def addObserver(observer: DeviceObserver): Unit = {
observers.add(observer)
@@ -65,14 +65,14 @@ class ObservedDevice(val deviceInfo: DeviceInfo, conf:
CelebornConf, workerSourc
def notifyObserversOnError(mountPoints: List[String], diskStatus:
DiskStatus): Unit =
this.synchronized {
- mountPoints.foreach { case mountPoint =>
+ mountPoints.foreach { mountPoint =>
diskInfos.get(mountPoint).setStatus(diskStatus)
}
// observer.notifyDeviceError might remove itself from observers,
// so we need to use tmpObservers
val tmpObservers = new util.HashSet[DeviceObserver](observers)
- tmpObservers.asScala.foreach { ob =>
- mountPoints.foreach { case mountPoint =>
+ tmpObservers.forEach { ob: DeviceObserver =>
+ mountPoints.foreach { mountPoint =>
ob.notifyError(mountPoint, diskStatus)
}
}
@@ -80,23 +80,21 @@ class ObservedDevice(val deviceInfo: DeviceInfo, conf:
CelebornConf, workerSourc
def notifyObserversOnNonCriticalError(mountPoints: List[String], diskStatus:
DiskStatus): Unit =
this.synchronized {
- val nonCriticalErrorSetFunc = new util.function.Function[DiskStatus,
util.Set[Long]] {
- override def apply(t: DiskStatus): util.Set[Long] = {
+ nonCriticalErrors.computeIfAbsent(
+ diskStatus,
+ (_: DiskStatus) => {
val set = ConcurrentHashMap.newKeySet[Long]()
- workerSource.addGauge(
- s"Device_${deviceInfo.name}_${diskStatus.toMetric}_Count",
- _ => set.size())
+
workerSource.addGauge(s"Device_${deviceInfo.name}_${diskStatus.toMetric}_Count")
{ () =>
+ set.size()
+ }
set
- }
- }
- nonCriticalErrors.computeIfAbsent(diskStatus,
nonCriticalErrorSetFunc).add(
- System.currentTimeMillis())
- mountPoints.foreach { case mountPoint =>
+ }).add(System.currentTimeMillis())
+ mountPoints.foreach { mountPoint =>
diskInfos.get(mountPoint).setStatus(diskStatus)
}
val tmpObservers = new util.HashSet[DeviceObserver](observers)
- tmpObservers.asScala.foreach { ob =>
- mountPoints.foreach { case mountPoint =>
+ tmpObservers.forEach { ob: DeviceObserver =>
+ mountPoints.foreach { mountPoint =>
ob.notifyNonCriticalError(mountPoint, diskStatus)
}
}
@@ -105,17 +103,17 @@ class ObservedDevice(val deviceInfo: DeviceInfo, conf:
CelebornConf, workerSourc
def notifyObserversOnHealthy(mountPoint: String): Unit = this.synchronized {
diskInfos.get(mountPoint).setStatus(DiskStatus.HEALTHY)
val tmpObservers = new util.HashSet[DeviceObserver](observers)
- tmpObservers.asScala.foreach(ob => {
+ tmpObservers.forEach { ob: DeviceObserver =>
ob.notifyHealthy(mountPoint)
- })
+ }
}
def notifyObserversOnHighDiskUsage(mountPoint: String): Unit =
this.synchronized {
diskInfos.get(mountPoint).setStatus(DiskStatus.HIGH_DISK_USAGE)
val tmpObservers = new util.HashSet[DeviceObserver](observers)
- tmpObservers.asScala.foreach(ob => {
+ tmpObservers.forEach { ob: DeviceObserver =>
ob.notifyHighDiskUsage(mountPoint)
- })
+ }
}
/**
@@ -141,33 +139,33 @@ class ObservedDevice(val deviceInfo: DeviceInfo, conf:
CelebornConf, workerSourc
if (lastReadComplete == -1) {
lastReadComplete = readComplete
lastWriteComplete = writeComplete
- lastReadInflight = readInflight
- lastWriteInflight = writeInflight
+ lastReadInFlight = readInflight
+ lastWriteInFlight = writeInflight
false
} else {
val isReadHang = lastReadComplete == readComplete &&
- readInflight >= lastReadInflight && lastReadInflight > 0
+ readInflight >= lastReadInFlight && lastReadInFlight > 0
val isWriteHang = lastWriteComplete == writeComplete &&
- writeInflight >= lastWriteInflight && lastWriteInflight > 0
+ writeInflight >= lastWriteInFlight && lastWriteInFlight > 0
if (isReadHang || isWriteHang) {
- logger.info(s"Result of DeviceInfo.checkIoHang, DeviceName:
${deviceInfo.name}" +
+ logInfo(s"Result of DeviceInfo.checkIoHang, DeviceName:
${deviceInfo.name}" +
s"($readComplete,$writeComplete,$readInflight,$writeInflight)\t"
+
-
s"($lastReadComplete,$lastWriteComplete,$lastReadInflight,$lastWriteInflight)\t"
+
+
s"($lastReadComplete,$lastWriteComplete,$lastReadInFlight,$lastWriteInFlight)\t"
+
s"Observer cnt: ${observers.size()}")
- logger.error(s"IO Hang! ReadHang: $isReadHang, WriteHang:
$isWriteHang")
+ logError(s"IO Hang! ReadHang: $isReadHang, WriteHang:
$isWriteHang")
}
lastReadComplete = readComplete
lastWriteComplete = writeComplete
- lastReadInflight = readInflight
- lastWriteInflight = writeInflight
+ lastReadInFlight = readInflight
+ lastWriteInFlight = writeInflight
isReadHang || isWriteHang
}
} catch {
case e: Exception =>
- logger.warn(s"Encounter Exception when check IO hang for device
${deviceInfo.name}", e)
+ logWarning(s"Encounter Exception when check IO hang for device
${deviceInfo.name}", e)
// we should only return true if we have direct evidence that the
device is hang
false
} finally {
diff --git
a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/congestcontrol/TestCongestionController.java
b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/congestcontrol/TestCongestionController.java
index fbb79fde7..f9d564079 100644
---
a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/congestcontrol/TestCongestionController.java
+++
b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/congestcontrol/TestCongestionController.java
@@ -116,14 +116,14 @@ public class TestCongestionController {
Assert.assertTrue(
isGaugeExist(
- WorkerSource.UserProduceSpeed(),
+ WorkerSource.USER_PRODUCE_SPEED(),
JavaConverters.mapAsJavaMapConverter(user.toMap()).asJava()));
Thread.sleep(userInactiveTimeMills * 2);
Assert.assertFalse(
isGaugeExist(
- WorkerSource.UserProduceSpeed(),
+ WorkerSource.USER_PRODUCE_SPEED(),
JavaConverters.mapAsJavaMapConverter(user.toMap()).asJava()));
}
diff --git
a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/FileWriterSuiteJ.java
b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/FileWriterSuiteJ.java
index 6a4ec4774..64091a1ee 100644
---
a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/FileWriterSuiteJ.java
+++
b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/FileWriterSuiteJ.java
@@ -131,9 +131,9 @@ public class FileWriterSuiteJ {
MemoryManager.initialize(conf);
}
- public static void setupChunkServer(FileInfo info) throws Exception {
+ public static void setupChunkServer(FileInfo info) {
FetchHandler handler =
- new FetchHandler(transConf) {
+ new FetchHandler(transConf.getCelebornConf(), transConf) {
@Override
public StorageManager storageManager() {
return new StorageManager(CONF, source);
@@ -210,9 +210,9 @@ public class FileWriterSuiteJ {
final Semaphore sem = new Semaphore(0);
final FetchResult res = new FetchResult();
- res.successChunks = Collections.synchronizedSet(new HashSet<Integer>());
- res.failedChunks = Collections.synchronizedSet(new HashSet<Integer>());
- res.buffers = Collections.synchronizedList(new
LinkedList<ManagedBuffer>());
+ res.successChunks = Collections.synchronizedSet(new HashSet<>());
+ res.failedChunks = Collections.synchronizedSet(new HashSet<>());
+ res.buffers = Collections.synchronizedList(new LinkedList<>());
ChunkReceivedCallback callback =
new ChunkReceivedCallback() {
diff --git
a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/DeviceMonitorSuite.scala
b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/DeviceMonitorSuite.scala
index 96e906f52..ab031f87d 100644
---
a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/DeviceMonitorSuite.scala
+++
b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/DeviceMonitorSuite.scala
@@ -392,13 +392,13 @@ class DeviceMonitorSuite extends AnyFunSuite {
deviceMonitor2.init()
val metrics1 = workerSource2.gauges().filter(
-
_.name.startsWith(WorkerSource.DeviceOSTotalCapacity)).sortBy(_.labels("device"))
+
_.name.startsWith(WorkerSource.DEVICE_OS_TOTAL_CAPACITY)).sortBy(_.labels("device"))
val metrics2 = workerSource2.gauges().filter(
-
_.name.startsWith(WorkerSource.DeviceOSFreeCapacity)).sortBy(_.labels("device"))
+
_.name.startsWith(WorkerSource.DEVICE_OS_FREE_CAPACITY)).sortBy(_.labels("device"))
val metrics3 = workerSource2.gauges().filter(
-
_.name.startsWith(WorkerSource.DeviceCelebornTotalCapacity)).sortBy(_.labels("device"))
+
_.name.startsWith(WorkerSource.DEVICE_CELEBORN_TOTAL_CAPACITY)).sortBy(_.labels("device"))
val metrics4 = workerSource2.gauges().filter(
-
_.name.startsWith(WorkerSource.DeviceCelebornFreeCapacity)).sortBy(_.labels("device"))
+
_.name.startsWith(WorkerSource.DEVICE_CELEBORN_FREE_CAPACITY)).sortBy(_.labels("device"))
assertEquals("vda", metrics1.head.labels("device"))
assertEquals(1395864371200L, metrics1.head.gauge.getValue)