This is an automated email from the ASF dual-hosted git repository.
ethanfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new f1bda46de [CELEBORN-1680] Introduce ShuffleFallbackCount metrics
f1bda46de is described below
commit f1bda46de40438376ad344a05e6c0c4c8eb15fbf
Author: Wang, Fei <[email protected]>
AuthorDate: Thu Nov 7 11:42:17 2024 +0800
[CELEBORN-1680] Introduce ShuffleFallbackCount metrics
### What changes were proposed in this pull request?
As the title says, introduce the metric `metrics_ShuffleFallbackCount_Value`.
### Why are the changes needed?
To provide insight into how many shuffles fall back to Spark's built-in
shuffle service. This helps us deprecate the ESS progressively.
Currently, we plan to set
`celeborn.client.spark.shuffle.fallback.numPartitionsThreshold` (for example, to
50k) so that shuffles with too many partitions fall back.
In the future, we plan to limit the maximum acceptable number of shuffle
partitions, so that such jobs are rejected and do not impact the health of the
Celeborn master.
### Does this PR introduce _any_ user-facing change?
Yes, a new master metric: `ShuffleFallbackCount`.
### How was this patch tested?
Unit tests (`DefaultMetaSystemSuiteJ` and `RatisMasterStatusSystemSuiteJ`).
<img width="1188" alt="image"
src="https://github.com/user-attachments/assets/8193c12c-5dc9-4783-b64b-6a8449a1bea4">
Closes #2866 from turboFei/record_fallback.
Lead-authored-by: Wang, Fei <[email protected]>
Co-authored-by: Fei Wang <[email protected]>
Signed-off-by: mingji <[email protected]>
---
assets/grafana/celeborn-dashboard.json | 92 ++++++++++++++++++++++
.../shuffle/celeborn/SparkShuffleManager.java | 1 +
.../celeborn/client/ApplicationHeartbeater.scala | 8 +-
.../apache/celeborn/client/LifecycleManager.scala | 5 +-
common/src/main/proto/TransportMessages.proto | 2 +
.../common/metrics/source/AbstractSource.scala | 59 ++++++++++++++
.../common/protocol/message/ControlMessages.scala | 4 +
.../apache/celeborn/common/util/PbSerDeUtils.scala | 2 +
docs/monitoring.md | 1 +
.../master/clustermeta/AbstractMetaManager.java | 8 +-
.../master/clustermeta/IMetadataHandler.java | 7 +-
.../clustermeta/SingleMasterMetaManager.java | 9 ++-
.../master/clustermeta/ha/HAMasterMetaManager.java | 8 +-
.../deploy/master/clustermeta/ha/MetaHandler.java | 7 +-
master/src/main/proto/Resource.proto | 1 +
.../celeborn/service/deploy/master/Master.scala | 8 ++
.../service/deploy/master/MasterSource.scala | 2 +
.../clustermeta/DefaultMetaSystemSuiteJ.java | 27 +++++--
.../ha/RatisMasterStatusSystemSuiteJ.java | 29 +++++--
19 files changed, 253 insertions(+), 27 deletions(-)
diff --git a/assets/grafana/celeborn-dashboard.json
b/assets/grafana/celeborn-dashboard.json
index b9297b88c..15d81a725 100644
--- a/assets/grafana/celeborn-dashboard.json
+++ b/assets/grafana/celeborn-dashboard.json
@@ -1466,6 +1466,98 @@
],
"title": "metrics_LostWorkerCount_Value",
"type": "timeseries"
+ },
+ {
+ "datasource": {
+ "type": "prometheus",
+ "uid": "${DS_PROMETHEUS}"
+ },
+ "description": "The count of shuffle fallbacks.",
+ "fieldConfig": {
+ "defaults": {
+ "color": {
+ "mode": "palette-classic"
+ },
+ "custom": {
+ "axisCenteredZero": false,
+ "axisColorMode": "text",
+ "axisLabel": "",
+ "axisPlacement": "auto",
+ "barAlignment": 0,
+ "drawStyle": "line",
+ "fillOpacity": 0,
+ "gradientMode": "none",
+ "hideFrom": {
+ "legend": false,
+ "tooltip": false,
+ "viz": false
+ },
+ "lineInterpolation": "linear",
+ "lineWidth": 1,
+ "pointSize": 5,
+ "scaleDistribution": {
+ "type": "linear"
+ },
+ "showPoints": "auto",
+ "spanNulls": false,
+ "stacking": {
+ "group": "A",
+ "mode": "none"
+ },
+ "thresholdsStyle": {
+ "mode": "off"
+ }
+ },
+ "mappings": [],
+ "thresholds": {
+ "mode": "absolute",
+ "steps": [
+ {
+ "color": "green"
+ },
+ {
+ "color": "red",
+ "value": 80
+ }
+ ]
+ }
+ },
+ "overrides": []
+ },
+ "gridPos": {
+ "h": 8,
+ "w": 12,
+ "x": 0,
+ "y": 34
+ },
+ "id": 218,
+ "options": {
+ "legend": {
+ "calcs": [],
+ "displayMode": "list",
+ "placement": "bottom",
+ "showLegend": true
+ },
+ "tooltip": {
+ "maxHeight": 600,
+ "mode": "single",
+ "sort": "none"
+ }
+ },
+ "targets": [
+ {
+ "datasource": {
+ "type": "prometheus",
+ "uid": "${DS_PROMETHEUS}"
+ },
+ "expr":
"metrics_ShuffleFallbackCount_Value{instance=~\"${instance}\"}",
+ "legendFormat": "${baseLegend}",
+ "range": true,
+ "refId": "A"
+ }
+ ],
+ "title": "metrics_ShuffleFallbackCount_Value",
+ "type": "timeseries"
}
],
"title": "Master",
diff --git
a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
index fdb13102d..cbebde096 100644
---
a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
+++
b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
@@ -162,6 +162,7 @@ public class SparkShuffleManager implements ShuffleManager {
initializeLifecycleManager(appId);
if (fallbackPolicyRunner.applyFallbackPolicies(dependency,
lifecycleManager)) {
+ lifecycleManager.shuffleFallbackCount().increment();
if (conf.getBoolean("spark.dynamicAllocation.enabled", false)
&& !conf.getBoolean("spark.shuffle.service.enabled", false)) {
logger.error(
diff --git
a/client/src/main/scala/org/apache/celeborn/client/ApplicationHeartbeater.scala
b/client/src/main/scala/org/apache/celeborn/client/ApplicationHeartbeater.scala
index b558df596..ee337deb6 100644
---
a/client/src/main/scala/org/apache/celeborn/client/ApplicationHeartbeater.scala
+++
b/client/src/main/scala/org/apache/celeborn/client/ApplicationHeartbeater.scala
@@ -37,7 +37,7 @@ class ApplicationHeartbeater(
appId: String,
conf: CelebornConf,
masterClient: MasterClient,
- shuffleMetrics: () => (Long, Long),
+ shuffleMetrics: () => ((Long, Long), Long),
workerStatusTracker: WorkerStatusTracker,
registeredShuffles: ConcurrentHashMap.KeySetView[Int, java.lang.Boolean],
cancelAllActiveStages: String => Unit) extends Logging {
@@ -59,9 +59,10 @@ class ApplicationHeartbeater(
override def run(): Unit = {
try {
require(masterClient != null, "When sending a heartbeat, client
shouldn't be null.")
- val (tmpTotalWritten, tmpTotalFileCount) = shuffleMetrics()
+ val ((tmpTotalWritten, tmpTotalFileCount),
tmpShuffleFallbackCount) = shuffleMetrics()
logInfo("Send app heartbeat with " +
- s"written: ${Utils.bytesToString(tmpTotalWritten)}, file count:
$tmpTotalFileCount")
+ s"written: ${Utils.bytesToString(tmpTotalWritten)}, file count:
$tmpTotalFileCount, " +
+ s"shuffle fallback count: $tmpShuffleFallbackCount")
// UserResourceConsumption and DiskInfo are eliminated from
WorkerInfo
// during serialization of HeartbeatFromApplication
val appHeartbeat =
@@ -69,6 +70,7 @@ class ApplicationHeartbeater(
appId,
tmpTotalWritten,
tmpTotalFileCount,
+ tmpShuffleFallbackCount,
workerStatusTracker.getNeedCheckedWorkers().toList.asJava,
ZERO_UUID,
true)
diff --git
a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
index afab1a56e..bf3ccf63d 100644
--- a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
+++ b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
@@ -23,7 +23,7 @@ import java.security.SecureRandom
import java.util
import java.util.{function, List => JList}
import java.util.concurrent._
-import java.util.concurrent.atomic.AtomicInteger
+import java.util.concurrent.atomic.{AtomicInteger, LongAdder}
import java.util.function.{BiConsumer, Consumer}
import scala.collection.JavaConverters._
@@ -84,6 +84,7 @@ class LifecycleManager(val appUniqueId: String, val conf:
CelebornConf) extends
private val unregisterShuffleTime = JavaUtils.newConcurrentHashMap[Int,
Long]()
val registeredShuffle = ConcurrentHashMap.newKeySet[Int]()
+ val shuffleFallbackCount = new LongAdder()
// maintain each shuffle's map relation of WorkerInfo and partition location
val shuffleAllocatedWorkers = new ShuffleAllocatedWorkers
// shuffle id -> (partitionId -> newest PartitionLocation)
@@ -209,7 +210,7 @@ class LifecycleManager(val appUniqueId: String, val conf:
CelebornConf) extends
appUniqueId,
conf,
masterClient,
- () => commitManager.commitMetrics(),
+ () => commitManager.commitMetrics() ->
shuffleFallbackCount.sumThenReset(),
workerStatusTracker,
registeredShuffle,
reason => cancelAllActiveStages(reason))
diff --git a/common/src/main/proto/TransportMessages.proto
b/common/src/main/proto/TransportMessages.proto
index 9ed21d180..1228c1732 100644
--- a/common/src/main/proto/TransportMessages.proto
+++ b/common/src/main/proto/TransportMessages.proto
@@ -442,6 +442,7 @@ message PbHeartbeatFromApplication {
string requestId = 4;
repeated PbWorkerInfo needCheckedWorkerList = 5;
bool shouldResponse = 6;
+ int64 shuffleFallbackCount = 7;
}
message PbHeartbeatFromApplicationResponse {
@@ -674,6 +675,7 @@ message PbSnapshotMetaInfo {
map<string, PbWorkerEventInfo> workerEventInfos = 15;
map<string, PbApplicationMeta> applicationMetas = 16;
repeated PbWorkerInfo decommissionWorkers = 17;
+ int64 shuffleTotalFallbackCount = 18;
}
message PbOpenStream {
diff --git
a/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala
b/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala
index 95562c911..f000c9052 100644
---
a/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala
+++
b/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala
@@ -39,6 +39,9 @@ case class NamedCounter(name: String, counter: Counter,
labels: Map[String, Stri
case class NamedGauge[T](name: String, gauge: Gauge[T], labels: Map[String,
String])
extends MetricLabels
+case class NamedMeter(name: String, meter: Meter, labels: Map[String, String])
+ extends MetricLabels
+
case class NamedHistogram(name: String, histogram: Histogram, labels:
Map[String, String])
extends MetricLabels
@@ -79,6 +82,9 @@ abstract class AbstractSource(conf: CelebornConf, role:
String)
protected val namedGauges: ConcurrentHashMap[String, NamedGauge[_]] =
JavaUtils.newConcurrentHashMap[String, NamedGauge[_]]()
+ protected val namedMeters: ConcurrentHashMap[String, NamedMeter] =
+ JavaUtils.newConcurrentHashMap[String, NamedMeter]()
+
def addGauge[T](
name: String,
labels: Map[String, String],
@@ -112,6 +118,33 @@ abstract class AbstractSource(conf: CelebornConf, role:
String)
addGauge(name, Map.empty[String, String], gauge)
}
+ def addMeter(
+ name: String,
+ labels: Map[String, String],
+ meter: Meter): Unit = {
+ namedMeters.putIfAbsent(
+ metricNameWithCustomizedLabels(name, labels),
+ NamedMeter(name, meter, labels ++ staticLabels))
+ }
+
+ def addMeter(
+ name: String,
+ labels: JMap[String, String],
+ meter: Meter): Unit = {
+ addMeter(name, labels.asScala.toMap, meter)
+ }
+
+ def addMeter(name: String, labels: Map[String, String] = Map.empty)(f: () =>
Long): Unit = {
+ addMeter(
+ name,
+ labels,
+ metricRegistry.meter(metricNameWithCustomizedLabels(name, labels), new
MeterSupplier(f)))
+ }
+
+ def addMeter(name: String, meter: Meter): Unit = {
+ addMeter(name, Map.empty[String, String], meter)
+ }
+
protected val namedTimers
: ConcurrentHashMap[String, (NamedTimer, ConcurrentHashMap[String,
Long])] =
JavaUtils.newConcurrentHashMap[String, (NamedTimer,
ConcurrentHashMap[String, Long])]()
@@ -152,6 +185,10 @@ abstract class AbstractSource(conf: CelebornConf, role:
String)
namedGauges.values().asScala.toList
}
+ def meters(): List[NamedMeter] = {
+ namedMeters.values().asScala.toList
+ }
+
def histograms(): List[NamedHistogram] = {
List.empty[NamedHistogram]
}
@@ -321,6 +358,22 @@ abstract class AbstractSource(conf: CelebornConf, role:
String)
updateInnerMetrics(sb.toString())
}
+ def recordMeter(nm: NamedMeter): Unit = {
+ val timestamp = System.currentTimeMillis
+ val sb = new StringBuilder
+ val label = nm.labelString
+ sb.append(s"${normalizeKey(nm.name)}Count$label ${nm.meter.getCount}
$timestamp\n")
+ sb.append(s"${normalizeKey(nm.name)}MeanRate$label ${nm.meter.getMeanRate}
$timestamp\n")
+ sb.append(
+ s"${normalizeKey(nm.name)}OneMinuteRate$label
${nm.meter.getOneMinuteRate} $timestamp\n")
+ sb.append(
+ s"${normalizeKey(nm.name)}FiveMinuteRate$label
${nm.meter.getFiveMinuteRate} $timestamp\n")
+ sb.append(
+ s"${normalizeKey(nm.name)}FifteenMinuteRate$label
${nm.meter.getFifteenMinuteRate} $timestamp\n")
+
+ updateInnerMetrics(sb.toString())
+ }
+
def recordHistogram(nh: NamedHistogram): Unit = {
val timestamp = System.currentTimeMillis
val sb = new mutable.StringBuilder
@@ -377,6 +430,7 @@ abstract class AbstractSource(conf: CelebornConf, role:
String)
innerMetrics.synchronized {
counters().foreach(c => recordCounter(c))
gauges().foreach(g => recordGauge(g))
+ meters().foreach(m => recordMeter(m))
histograms().foreach(h => {
recordHistogram(h)
h.asInstanceOf[CelebornHistogram].reservoir
@@ -400,6 +454,7 @@ abstract class AbstractSource(conf: CelebornConf, role:
String)
metricsCleaner.shutdown()
namedCounters.clear()
namedGauges.clear()
+ namedMeters.clear()
namedTimers.clear()
innerMetrics.clear()
metricRegistry.removeMatching(new MetricFilter {
@@ -436,3 +491,7 @@ class TimerSupplier(val slidingWindowSize: Int)
class GaugeSupplier[T](f: () => T) extends
MetricRegistry.MetricSupplier[Gauge[_]] {
override def newMetric(): Gauge[T] = new Gauge[T] { override def getValue: T
= f() }
}
+
+class MeterSupplier(f: () => Long) extends
MetricRegistry.MetricSupplier[Meter] {
+ override def newMetric(): Meter = new Meter { override def getCount: Long =
f() }
+}
diff --git
a/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
b/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
index 4a96184b6..24d3fe601 100644
---
a/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
+++
b/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
@@ -412,6 +412,7 @@ object ControlMessages extends Logging {
appId: String,
totalWritten: Long,
fileCount: Long,
+ shuffleFallbackCount: Long,
needCheckedWorkerList: util.List[WorkerInfo],
override var requestId: String = ZERO_UUID,
shouldResponse: Boolean = false) extends MasterRequestMessage
@@ -809,6 +810,7 @@ object ControlMessages extends Logging {
appId,
totalWritten,
fileCount,
+ shuffleFallbackCount,
needCheckedWorkerList,
requestId,
shouldResponse) =>
@@ -817,6 +819,7 @@ object ControlMessages extends Logging {
.setRequestId(requestId)
.setTotalWritten(totalWritten)
.setFileCount(fileCount)
+ .setShuffleFallbackCount(shuffleFallbackCount)
.addAllNeedCheckedWorkerList(needCheckedWorkerList.asScala.map(
PbSerDeUtils.toPbWorkerInfo(_, true, true)).toList.asJava)
.setShouldResponse(shouldResponse)
@@ -1209,6 +1212,7 @@ object ControlMessages extends Logging {
pbHeartbeatFromApplication.getAppId,
pbHeartbeatFromApplication.getTotalWritten,
pbHeartbeatFromApplication.getFileCount,
+ pbHeartbeatFromApplication.getShuffleFallbackCount,
new util.ArrayList[WorkerInfo](
pbHeartbeatFromApplication.getNeedCheckedWorkerListList.asScala
.map(PbSerDeUtils.fromPbWorkerInfo).toList.asJava),
diff --git
a/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala
b/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala
index f9f01b9a7..9a14a3dee 100644
--- a/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala
@@ -459,6 +459,7 @@ object PbSerDeUtils {
workers: java.util.Set[WorkerInfo],
partitionTotalWritten: java.lang.Long,
partitionTotalFileCount: java.lang.Long,
+ shuffleTotalFallbackCount: java.lang.Long,
appDiskUsageMetricSnapshots: Array[AppDiskUsageSnapShot],
currentAppDiskUsageMetricsSnapshot: AppDiskUsageSnapShot,
lostWorkers: ConcurrentHashMap[WorkerInfo, java.lang.Long],
@@ -480,6 +481,7 @@ object PbSerDeUtils {
.addAllWorkers(workers.asScala.map(toPbWorkerInfo(_, true,
false)).asJava)
.setPartitionTotalWritten(partitionTotalWritten)
.setPartitionTotalFileCount(partitionTotalFileCount)
+ .setShuffleTotalFallbackCount(shuffleTotalFallbackCount)
// appDiskUsageMetricSnapshots can have null values,
// protobuf repeated value can't support null value in list.
.addAllAppDiskUsageMetricSnapshots(appDiskUsageMetricSnapshots.filter(_
!= null)
diff --git a/docs/monitoring.md b/docs/monitoring.md
index b037f6a0f..c07c16d1e 100644
--- a/docs/monitoring.md
+++ b/docs/monitoring.md
@@ -100,6 +100,7 @@ These metrics are exposed by Celeborn master.
| RunningApplicationCount | The count of running applications.
|
| ActiveShuffleSize | The active shuffle size of workers.
|
| ActiveShuffleFileCount | The active shuffle file count of workers.
|
+ | ShuffleFallbackCount | The count of shuffle fallbacks.
|
| WorkerCount | The count of active workers.
|
| LostWorkerCount | The count of workers in lost list.
|
| ExcludedWorkerCount | The count of workers in excluded list.
|
diff --git
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
index f1d8a37e8..80988cfff 100644
---
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
+++
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
@@ -85,6 +85,7 @@ public abstract class AbstractMetaManager implements
IMetadataHandler {
public double unhealthyDiskRatioThreshold;
public final LongAdder partitionTotalWritten = new LongAdder();
public final LongAdder partitionTotalFileCount = new LongAdder();
+ public final LongAdder shuffleTotalFallbackCount = new LongAdder();
public AppDiskUsageMetric appDiskUsageMetric = null;
public final ConcurrentHashMap<String, ApplicationMeta> applicationMetas =
@@ -139,10 +140,12 @@ public abstract class AbstractMetaManager implements
IMetadataHandler {
}
}
- public void updateAppHeartbeatMeta(String appId, long time, long
totalWritten, long fileCount) {
+ public void updateAppHeartbeatMeta(
+ String appId, long time, long totalWritten, long fileCount, long
shuffleFallbackCount) {
appHeartbeatTime.put(appId, time);
partitionTotalWritten.add(totalWritten);
partitionTotalFileCount.add(fileCount);
+ shuffleTotalFallbackCount.add(shuffleFallbackCount);
}
public void updateAppLostMeta(String appId) {
@@ -316,6 +319,7 @@ public abstract class AbstractMetaManager implements
IMetadataHandler {
new HashSet(workersMap.values()),
partitionTotalWritten.sum(),
partitionTotalFileCount.sum(),
+ shuffleTotalFallbackCount.sum(),
appDiskUsageMetric.snapShots(),
appDiskUsageMetric.currentSnapShot().get(),
lostWorkers,
@@ -416,6 +420,7 @@ public abstract class AbstractMetaManager implements
IMetadataHandler {
partitionTotalWritten.add(snapshotMetaInfo.getPartitionTotalWritten());
partitionTotalFileCount.add(snapshotMetaInfo.getPartitionTotalFileCount());
+
shuffleTotalFallbackCount.add(snapshotMetaInfo.getShuffleTotalFallbackCount());
appDiskUsageMetric.restoreFromSnapshot(
snapshotMetaInfo.getAppDiskUsageMetricSnapshotsList().stream()
.map(PbSerDeUtils::fromPbAppDiskUsageSnapshot)
@@ -457,6 +462,7 @@ public abstract class AbstractMetaManager implements
IMetadataHandler {
workerLostEvents.clear();
partitionTotalWritten.reset();
partitionTotalFileCount.reset();
+ shuffleTotalFallbackCount.reset();
workerEventInfos.clear();
applicationMetas.clear();
}
diff --git
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/IMetadataHandler.java
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/IMetadataHandler.java
index 9e17728d6..e765a9d4d 100644
---
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/IMetadataHandler.java
+++
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/IMetadataHandler.java
@@ -39,7 +39,12 @@ public interface IMetadataHandler {
void handleBatchUnRegisterShuffles(List<String> shuffleKeys, String
requestId);
void handleAppHeartbeat(
- String appId, long totalWritten, long fileCount, long time, String
requestId);
+ String appId,
+ long totalWritten,
+ long fileCount,
+ long shuffleFallbackCount,
+ long time,
+ String requestId);
void handleAppLost(String appId, String requestId);
diff --git
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/SingleMasterMetaManager.java
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/SingleMasterMetaManager.java
index 228f9c3db..04c74e51e 100644
---
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/SingleMasterMetaManager.java
+++
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/SingleMasterMetaManager.java
@@ -73,8 +73,13 @@ public class SingleMasterMetaManager extends
AbstractMetaManager {
@Override
public void handleAppHeartbeat(
- String appId, long totalWritten, long fileCount, long time, String
requestId) {
- updateAppHeartbeatMeta(appId, time, totalWritten, fileCount);
+ String appId,
+ long totalWritten,
+ long fileCount,
+ long shuffleFallbackCount,
+ long time,
+ String requestId) {
+ updateAppHeartbeatMeta(appId, time, totalWritten, fileCount,
shuffleFallbackCount);
}
@Override
diff --git
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAMasterMetaManager.java
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAMasterMetaManager.java
index fd229c904..a87470d9d 100644
---
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAMasterMetaManager.java
+++
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAMasterMetaManager.java
@@ -130,7 +130,12 @@ public class HAMasterMetaManager extends
AbstractMetaManager {
@Override
public void handleAppHeartbeat(
- String appId, long totalWritten, long fileCount, long time, String
requestId) {
+ String appId,
+ long totalWritten,
+ long fileCount,
+ long shuffleFallbackCount,
+ long time,
+ String requestId) {
try {
ratisServer.submitRequest(
ResourceRequest.newBuilder()
@@ -142,6 +147,7 @@ public class HAMasterMetaManager extends
AbstractMetaManager {
.setTime(time)
.setTotalWritten(totalWritten)
.setFileCount(fileCount)
+ .setShuffleFallbackCount(shuffleFallbackCount)
.build())
.build());
} catch (CelebornRuntimeException e) {
diff --git
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MetaHandler.java
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MetaHandler.java
index 4ec1dac37..626378f0e 100644
---
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MetaHandler.java
+++
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MetaHandler.java
@@ -141,7 +141,12 @@ public class MetaHandler {
long time = request.getAppHeartbeatRequest().getTime();
long totalWritten =
request.getAppHeartbeatRequest().getTotalWritten();
long fileCount = request.getAppHeartbeatRequest().getFileCount();
- metaSystem.updateAppHeartbeatMeta(appId, time, totalWritten,
fileCount);
+ long shuffleFallbackCount =
request.getAppHeartbeatRequest().getShuffleFallbackCount();
+ if (shuffleFallbackCount > 0) {
+ LOG.warn("{} shuffle fallbacks in app {}", shuffleFallbackCount,
appId);
+ }
+ metaSystem.updateAppHeartbeatMeta(
+ appId, time, totalWritten, fileCount, shuffleFallbackCount);
break;
case AppLost:
diff --git a/master/src/main/proto/Resource.proto
b/master/src/main/proto/Resource.proto
index acb1d6097..1e65943a9 100644
--- a/master/src/main/proto/Resource.proto
+++ b/master/src/main/proto/Resource.proto
@@ -120,6 +120,7 @@ message AppHeartbeatRequest {
required int64 time = 2;
required int64 totalWritten = 3;
required int64 fileCount = 4;
+ optional int64 shuffleFallbackCount = 5;
}
message AppLostRequest {
diff --git
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
index 117b4875a..519d36019 100644
---
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
+++
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
@@ -261,6 +261,10 @@ private[celeborn] class Master(
}).sum()
}
+ masterSource.addGauge(MasterSource.SHUFFLE_FALLBACK_COUNT) { () =>
+ statusSystem.shuffleTotalFallbackCount.sum()
+ }
+
masterSource.addGauge(MasterSource.DEVICE_CELEBORN_TOTAL_CAPACITY) { () =>
statusSystem.workersMap.values().asScala.toList.map(_.totalSpace()).sum
}
@@ -391,6 +395,7 @@ private[celeborn] class Master(
appId,
totalWritten,
fileCount,
+ fallbackShuffles,
needCheckedWorkerList,
requestId,
shouldResponse) =>
@@ -403,6 +408,7 @@ private[celeborn] class Master(
appId,
totalWritten,
fileCount,
+ fallbackShuffles,
needCheckedWorkerList,
requestId,
shouldResponse))
@@ -1094,6 +1100,7 @@ private[celeborn] class Master(
appId: String,
totalWritten: Long,
fileCount: Long,
+ shuffleFallbackCount: Long,
needCheckedWorkerList: util.List[WorkerInfo],
requestId: String,
shouldResponse: Boolean): Unit = {
@@ -1101,6 +1108,7 @@ private[celeborn] class Master(
appId,
totalWritten,
fileCount,
+ shuffleFallbackCount,
System.currentTimeMillis(),
requestId)
val unknownWorkers = needCheckedWorkerList.asScala.filterNot(w =>
diff --git
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/MasterSource.scala
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/MasterSource.scala
index b2e725244..13f35cc84 100644
---
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/MasterSource.scala
+++
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/MasterSource.scala
@@ -55,6 +55,8 @@ object MasterSource {
val ACTIVE_SHUFFLE_FILE_COUNT = "ActiveShuffleFileCount"
+ val SHUFFLE_FALLBACK_COUNT = "ShuffleFallbackCount"
+
val OFFER_SLOTS_TIME = "OfferSlotsTime"
// Capacity
diff --git
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java
b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java
index cde62c4cf..810f12d19 100644
---
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java
+++
b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java
@@ -637,11 +637,11 @@ public class DefaultMetaSystemSuiteJ {
@Test
public void testHandleAppHeartbeat() {
Long dummy = 1235L;
- statusSystem.handleAppHeartbeat(APPID1, 1, 1, dummy, getNewReqeustId());
+ statusSystem.handleAppHeartbeat(APPID1, 1, 1, 0, dummy, getNewReqeustId());
assertEquals(dummy, statusSystem.appHeartbeatTime.get(APPID1));
String appId2 = "app02";
- statusSystem.handleAppHeartbeat(appId2, 1, 1, dummy, getNewReqeustId());
+ statusSystem.handleAppHeartbeat(appId2, 1, 1, 0, dummy, getNewReqeustId());
assertEquals(dummy, statusSystem.appHeartbeatTime.get(appId2));
assertEquals(2, statusSystem.appHeartbeatTime.size());
@@ -811,23 +811,23 @@ public class DefaultMetaSystemSuiteJ {
Assert.assertEquals(statusSystem.estimatedPartitionSize,
conf.initialEstimatedPartitionSize());
Long dummy = 1235L;
- statusSystem.handleAppHeartbeat(APPID1, 10000000000l, 1, dummy,
getNewReqeustId());
+ statusSystem.handleAppHeartbeat(APPID1, 10000000000l, 1, 0, dummy,
getNewReqeustId());
String appId2 = "app02";
- statusSystem.handleAppHeartbeat(appId2, 1, 1, dummy, getNewReqeustId());
+ statusSystem.handleAppHeartbeat(appId2, 1, 1, 0, dummy, getNewReqeustId());
// Max size
statusSystem.handleUpdatePartitionSize();
Assert.assertEquals(statusSystem.estimatedPartitionSize,
conf.maxPartitionSizeToEstimate());
- statusSystem.handleAppHeartbeat(APPID1, 1000000000l, 1, dummy,
getNewReqeustId());
- statusSystem.handleAppHeartbeat(appId2, 1, 1, dummy, getNewReqeustId());
+ statusSystem.handleAppHeartbeat(APPID1, 1000000000l, 1, 0, dummy,
getNewReqeustId());
+ statusSystem.handleAppHeartbeat(appId2, 1, 1, 0, dummy, getNewReqeustId());
// Size between minEstimateSize -> maxEstimateSize
statusSystem.handleUpdatePartitionSize();
Assert.assertEquals(statusSystem.estimatedPartitionSize, 500000000);
- statusSystem.handleAppHeartbeat(APPID1, 1000l, 1, dummy,
getNewReqeustId());
- statusSystem.handleAppHeartbeat(appId2, 1000l, 1, dummy,
getNewReqeustId());
+ statusSystem.handleAppHeartbeat(APPID1, 1000l, 1, 0, dummy,
getNewReqeustId());
+ statusSystem.handleAppHeartbeat(appId2, 1000l, 1, 0, dummy,
getNewReqeustId());
// Min size
statusSystem.handleUpdatePartitionSize();
@@ -897,4 +897,15 @@ public class DefaultMetaSystemSuiteJ {
assertEquals(1, statusSystem.decommissionWorkers.size());
assertTrue(statusSystem.excludedWorkers.isEmpty());
}
+
+ @Test
+ public void testShuffleFallbackCount() {
+ statusSystem.shuffleTotalFallbackCount.reset();
+
+ Long dummy = 1235L;
+ statusSystem.handleAppHeartbeat(APPID1, 1000l, 1, 1, dummy,
getNewReqeustId());
+ statusSystem.handleAppHeartbeat(APPID1, 1000l, 1, 2, dummy,
getNewReqeustId());
+
+ assertEquals(statusSystem.shuffleTotalFallbackCount.longValue(), 3);
+ }
}
diff --git
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
index e4170a02c..ae70ed54b 100644
---
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
+++
b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
@@ -946,14 +946,14 @@ public class RatisMasterStatusSystemSuiteJ {
Assert.assertNotNull(statusSystem);
long dummy = 1235L;
- statusSystem.handleAppHeartbeat(APPID1, 1, 1, dummy, getNewReqeustId());
+ statusSystem.handleAppHeartbeat(APPID1, 1, 1, 0, dummy, getNewReqeustId());
Thread.sleep(3000L);
Assert.assertEquals(Long.valueOf(dummy),
STATUSSYSTEM1.appHeartbeatTime.get(APPID1));
Assert.assertEquals(Long.valueOf(dummy),
STATUSSYSTEM2.appHeartbeatTime.get(APPID1));
Assert.assertEquals(Long.valueOf(dummy),
STATUSSYSTEM3.appHeartbeatTime.get(APPID1));
String appId2 = "app02";
- statusSystem.handleAppHeartbeat(appId2, 1, 1, dummy, getNewReqeustId());
+ statusSystem.handleAppHeartbeat(appId2, 1, 1, 0, dummy, getNewReqeustId());
Thread.sleep(3000L);
Assert.assertEquals(Long.valueOf(dummy),
STATUSSYSTEM1.appHeartbeatTime.get(appId2));
@@ -1315,23 +1315,23 @@ public class RatisMasterStatusSystemSuiteJ {
Assert.assertEquals(statusSystem.estimatedPartitionSize,
conf.initialEstimatedPartitionSize());
Long dummy = 1235L;
- statusSystem.handleAppHeartbeat(APPID1, 10000000000l, 1, dummy,
getNewReqeustId());
+ statusSystem.handleAppHeartbeat(APPID1, 10000000000l, 1, 0, dummy,
getNewReqeustId());
String appId2 = "app02";
- statusSystem.handleAppHeartbeat(appId2, 1, 1, dummy, getNewReqeustId());
+ statusSystem.handleAppHeartbeat(appId2, 1, 1, 0, dummy, getNewReqeustId());
// Max size
statusSystem.handleUpdatePartitionSize();
Assert.assertEquals(statusSystem.estimatedPartitionSize,
conf.maxPartitionSizeToEstimate());
- statusSystem.handleAppHeartbeat(APPID1, 1000000000l, 1, dummy,
getNewReqeustId());
- statusSystem.handleAppHeartbeat(appId2, 1, 1, dummy, getNewReqeustId());
+ statusSystem.handleAppHeartbeat(APPID1, 1000000000l, 1, 0, dummy,
getNewReqeustId());
+ statusSystem.handleAppHeartbeat(appId2, 1, 1, 0, dummy, getNewReqeustId());
// Size between minEstimateSize -> maxEstimateSize
statusSystem.handleUpdatePartitionSize();
Assert.assertEquals(statusSystem.estimatedPartitionSize, 500000000);
- statusSystem.handleAppHeartbeat(APPID1, 1000l, 1, dummy,
getNewReqeustId());
- statusSystem.handleAppHeartbeat(appId2, 1000l, 1, dummy,
getNewReqeustId());
+ statusSystem.handleAppHeartbeat(APPID1, 1000l, 1, 0, dummy,
getNewReqeustId());
+ statusSystem.handleAppHeartbeat(appId2, 1000l, 1, 0, dummy,
getNewReqeustId());
// Min size
statusSystem.handleUpdatePartitionSize();
@@ -1501,4 +1501,17 @@ public class RatisMasterStatusSystemSuiteJ {
Assert.assertEquals(0, STATUSSYSTEM2.excludedWorkers.size());
Assert.assertEquals(0, STATUSSYSTEM3.excludedWorkers.size());
}
+
+ @Test
+ public void testShuffleFallbackCount() {
+ AbstractMetaManager statusSystem = pickLeaderStatusSystem();
+ Assert.assertNotNull(statusSystem);
+ statusSystem.shuffleTotalFallbackCount.reset();
+
+ Long dummy = 1235L;
+ statusSystem.handleAppHeartbeat(APPID1, 1000l, 1, 1, dummy,
getNewReqeustId());
+ statusSystem.handleAppHeartbeat(APPID1, 1000l, 1, 2, dummy,
getNewReqeustId());
+
+ Assert.assertEquals(statusSystem.shuffleTotalFallbackCount.longValue(), 3);
+ }
}