This is an automated email from the ASF dual-hosted git repository.

ethanfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new f1bda46de [CELEBORN-1680] Introduce ShuffleFallbackCount metrics
f1bda46de is described below

commit f1bda46de40438376ad344a05e6c0c4c8eb15fbf
Author: Wang, Fei <[email protected]>
AuthorDate: Thu Nov 7 11:42:17 2024 +0800

    [CELEBORN-1680] Introduce ShuffleFallbackCount metrics
    
    ### What changes were proposed in this pull request?
    
    As the title says, introduce the `metrics_ShuffleFallbackCount_Value` metric.
    
    ### Why are the changes needed?
    To provide insight into how many shuffles fall back to Spark's built-in 
shuffle service. This helps us deprecate the ESS progressively.
    
    Currently, we plan to set 
`celeborn.client.spark.shuffle.fallback.numPartitionsThreshold` so that shuffles 
with too many shuffle partitions (for example, 50k) fall back.
    
    In the future, we plan to limit the maximum acceptable number of shuffle 
partitions so that bad jobs are rejected and do not impact the health of the 
Celeborn master.
    
    ### Does this PR introduce _any_ user-facing change?
    Yes, new metrics.
    
    ### How was this patch tested?
    UT.
    <img width="1188" alt="image" 
src="https://github.com/user-attachments/assets/8193c12c-5dc9-4783-b64b-6a8449a1bea4">
    
    Closes #2866 from turboFei/record_fallback.
    
    Lead-authored-by: Wang, Fei <[email protected]>
    Co-authored-by: Fei Wang <[email protected]>
    Signed-off-by: mingji <[email protected]>
---
 assets/grafana/celeborn-dashboard.json             | 92 ++++++++++++++++++++++
 .../shuffle/celeborn/SparkShuffleManager.java      |  1 +
 .../celeborn/client/ApplicationHeartbeater.scala   |  8 +-
 .../apache/celeborn/client/LifecycleManager.scala  |  5 +-
 common/src/main/proto/TransportMessages.proto      |  2 +
 .../common/metrics/source/AbstractSource.scala     | 59 ++++++++++++++
 .../common/protocol/message/ControlMessages.scala  |  4 +
 .../apache/celeborn/common/util/PbSerDeUtils.scala |  2 +
 docs/monitoring.md                                 |  1 +
 .../master/clustermeta/AbstractMetaManager.java    |  8 +-
 .../master/clustermeta/IMetadataHandler.java       |  7 +-
 .../clustermeta/SingleMasterMetaManager.java       |  9 ++-
 .../master/clustermeta/ha/HAMasterMetaManager.java |  8 +-
 .../deploy/master/clustermeta/ha/MetaHandler.java  |  7 +-
 master/src/main/proto/Resource.proto               |  1 +
 .../celeborn/service/deploy/master/Master.scala    |  8 ++
 .../service/deploy/master/MasterSource.scala       |  2 +
 .../clustermeta/DefaultMetaSystemSuiteJ.java       | 27 +++++--
 .../ha/RatisMasterStatusSystemSuiteJ.java          | 29 +++++--
 19 files changed, 253 insertions(+), 27 deletions(-)

diff --git a/assets/grafana/celeborn-dashboard.json 
b/assets/grafana/celeborn-dashboard.json
index b9297b88c..15d81a725 100644
--- a/assets/grafana/celeborn-dashboard.json
+++ b/assets/grafana/celeborn-dashboard.json
@@ -1466,6 +1466,98 @@
           ],
           "title": "metrics_LostWorkerCount_Value",
           "type": "timeseries"
+        },
+        {
+          "datasource": {
+            "type": "prometheus",
+            "uid": "${DS_PROMETHEUS}"
+          },
+          "description": "The count of shuffle fallbacks.",
+          "fieldConfig": {
+            "defaults": {
+              "color": {
+                "mode": "palette-classic"
+              },
+              "custom": {
+                "axisCenteredZero": false,
+                "axisColorMode": "text",
+                "axisLabel": "",
+                "axisPlacement": "auto",
+                "barAlignment": 0,
+                "drawStyle": "line",
+                "fillOpacity": 0,
+                "gradientMode": "none",
+                "hideFrom": {
+                  "legend": false,
+                  "tooltip": false,
+                  "viz": false
+                },
+                "lineInterpolation": "linear",
+                "lineWidth": 1,
+                "pointSize": 5,
+                "scaleDistribution": {
+                  "type": "linear"
+                },
+                "showPoints": "auto",
+                "spanNulls": false,
+                "stacking": {
+                  "group": "A",
+                  "mode": "none"
+                },
+                "thresholdsStyle": {
+                  "mode": "off"
+                }
+              },
+              "mappings": [],
+              "thresholds": {
+                "mode": "absolute",
+                "steps": [
+                  {
+                    "color": "green"
+                  },
+                  {
+                    "color": "red",
+                    "value": 80
+                  }
+                ]
+              }
+            },
+            "overrides": []
+          },
+          "gridPos": {
+            "h": 8,
+            "w": 12,
+            "x": 0,
+            "y": 34
+          },
+          "id": 218,
+          "options": {
+            "legend": {
+              "calcs": [],
+              "displayMode": "list",
+              "placement": "bottom",
+              "showLegend": true
+            },
+            "tooltip": {
+              "maxHeight": 600,
+              "mode": "single",
+              "sort": "none"
+            }
+          },
+          "targets": [
+            {
+              "datasource": {
+                "type": "prometheus",
+                "uid": "${DS_PROMETHEUS}"
+              },
+              "expr": 
"metrics_ShuffleFallbackCount_Value{instance=~\"${instance}\"}",
+              "legendFormat": "${baseLegend}",
+              "range": true,
+              "refId": "A"
+            }
+          ],
+          "title": "metrics_ShuffleFallbackCount_Value",
+          "type": "timeseries"
         }
       ],
       "title": "Master",
diff --git 
a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
 
b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
index fdb13102d..cbebde096 100644
--- 
a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
+++ 
b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
@@ -162,6 +162,7 @@ public class SparkShuffleManager implements ShuffleManager {
     initializeLifecycleManager(appId);
 
     if (fallbackPolicyRunner.applyFallbackPolicies(dependency, 
lifecycleManager)) {
+      lifecycleManager.shuffleFallbackCount().increment();
       if (conf.getBoolean("spark.dynamicAllocation.enabled", false)
           && !conf.getBoolean("spark.shuffle.service.enabled", false)) {
         logger.error(
diff --git 
a/client/src/main/scala/org/apache/celeborn/client/ApplicationHeartbeater.scala 
b/client/src/main/scala/org/apache/celeborn/client/ApplicationHeartbeater.scala
index b558df596..ee337deb6 100644
--- 
a/client/src/main/scala/org/apache/celeborn/client/ApplicationHeartbeater.scala
+++ 
b/client/src/main/scala/org/apache/celeborn/client/ApplicationHeartbeater.scala
@@ -37,7 +37,7 @@ class ApplicationHeartbeater(
     appId: String,
     conf: CelebornConf,
     masterClient: MasterClient,
-    shuffleMetrics: () => (Long, Long),
+    shuffleMetrics: () => ((Long, Long), Long),
     workerStatusTracker: WorkerStatusTracker,
     registeredShuffles: ConcurrentHashMap.KeySetView[Int, java.lang.Boolean],
     cancelAllActiveStages: String => Unit) extends Logging {
@@ -59,9 +59,10 @@ class ApplicationHeartbeater(
         override def run(): Unit = {
           try {
             require(masterClient != null, "When sending a heartbeat, client 
shouldn't be null.")
-            val (tmpTotalWritten, tmpTotalFileCount) = shuffleMetrics()
+            val ((tmpTotalWritten, tmpTotalFileCount), 
tmpShuffleFallbackCount) = shuffleMetrics()
             logInfo("Send app heartbeat with " +
-              s"written: ${Utils.bytesToString(tmpTotalWritten)}, file count: 
$tmpTotalFileCount")
+              s"written: ${Utils.bytesToString(tmpTotalWritten)}, file count: 
$tmpTotalFileCount, " +
+              s"shuffle fallback count: $tmpShuffleFallbackCount")
             // UserResourceConsumption and DiskInfo are eliminated from 
WorkerInfo
             // during serialization of HeartbeatFromApplication
             val appHeartbeat =
@@ -69,6 +70,7 @@ class ApplicationHeartbeater(
                 appId,
                 tmpTotalWritten,
                 tmpTotalFileCount,
+                tmpShuffleFallbackCount,
                 workerStatusTracker.getNeedCheckedWorkers().toList.asJava,
                 ZERO_UUID,
                 true)
diff --git 
a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala 
b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
index afab1a56e..bf3ccf63d 100644
--- a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
+++ b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
@@ -23,7 +23,7 @@ import java.security.SecureRandom
 import java.util
 import java.util.{function, List => JList}
 import java.util.concurrent._
-import java.util.concurrent.atomic.AtomicInteger
+import java.util.concurrent.atomic.{AtomicInteger, LongAdder}
 import java.util.function.{BiConsumer, Consumer}
 
 import scala.collection.JavaConverters._
@@ -84,6 +84,7 @@ class LifecycleManager(val appUniqueId: String, val conf: 
CelebornConf) extends
   private val unregisterShuffleTime = JavaUtils.newConcurrentHashMap[Int, 
Long]()
 
   val registeredShuffle = ConcurrentHashMap.newKeySet[Int]()
+  val shuffleFallbackCount = new LongAdder()
   // maintain each shuffle's map relation of WorkerInfo and partition location
   val shuffleAllocatedWorkers = new ShuffleAllocatedWorkers
   // shuffle id -> (partitionId -> newest PartitionLocation)
@@ -209,7 +210,7 @@ class LifecycleManager(val appUniqueId: String, val conf: 
CelebornConf) extends
       appUniqueId,
       conf,
       masterClient,
-      () => commitManager.commitMetrics(),
+      () => commitManager.commitMetrics() -> 
shuffleFallbackCount.sumThenReset(),
       workerStatusTracker,
       registeredShuffle,
       reason => cancelAllActiveStages(reason))
diff --git a/common/src/main/proto/TransportMessages.proto 
b/common/src/main/proto/TransportMessages.proto
index 9ed21d180..1228c1732 100644
--- a/common/src/main/proto/TransportMessages.proto
+++ b/common/src/main/proto/TransportMessages.proto
@@ -442,6 +442,7 @@ message PbHeartbeatFromApplication {
   string requestId = 4;
   repeated PbWorkerInfo needCheckedWorkerList = 5;
   bool shouldResponse = 6;
+  int64 shuffleFallbackCount = 7;
 }
 
 message PbHeartbeatFromApplicationResponse {
@@ -674,6 +675,7 @@ message PbSnapshotMetaInfo {
   map<string, PbWorkerEventInfo> workerEventInfos = 15;
   map<string, PbApplicationMeta> applicationMetas = 16;
   repeated PbWorkerInfo decommissionWorkers = 17;
+  int64 shuffleTotalFallbackCount = 18;
 }
 
 message PbOpenStream {
diff --git 
a/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala
 
b/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala
index 95562c911..f000c9052 100644
--- 
a/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala
+++ 
b/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala
@@ -39,6 +39,9 @@ case class NamedCounter(name: String, counter: Counter, 
labels: Map[String, Stri
 case class NamedGauge[T](name: String, gauge: Gauge[T], labels: Map[String, 
String])
   extends MetricLabels
 
+case class NamedMeter(name: String, meter: Meter, labels: Map[String, String])
+  extends MetricLabels
+
 case class NamedHistogram(name: String, histogram: Histogram, labels: 
Map[String, String])
   extends MetricLabels
 
@@ -79,6 +82,9 @@ abstract class AbstractSource(conf: CelebornConf, role: 
String)
   protected val namedGauges: ConcurrentHashMap[String, NamedGauge[_]] =
     JavaUtils.newConcurrentHashMap[String, NamedGauge[_]]()
 
+  protected val namedMeters: ConcurrentHashMap[String, NamedMeter] =
+    JavaUtils.newConcurrentHashMap[String, NamedMeter]()
+
   def addGauge[T](
       name: String,
       labels: Map[String, String],
@@ -112,6 +118,33 @@ abstract class AbstractSource(conf: CelebornConf, role: 
String)
     addGauge(name, Map.empty[String, String], gauge)
   }
 
+  def addMeter(
+      name: String,
+      labels: Map[String, String],
+      meter: Meter): Unit = {
+    namedMeters.putIfAbsent(
+      metricNameWithCustomizedLabels(name, labels),
+      NamedMeter(name, meter, labels ++ staticLabels))
+  }
+
+  def addMeter(
+      name: String,
+      labels: JMap[String, String],
+      meter: Meter): Unit = {
+    addMeter(name, labels.asScala.toMap, meter)
+  }
+
+  def addMeter(name: String, labels: Map[String, String] = Map.empty)(f: () => 
Long): Unit = {
+    addMeter(
+      name,
+      labels,
+      metricRegistry.meter(metricNameWithCustomizedLabels(name, labels), new 
MeterSupplier(f)))
+  }
+
+  def addMeter(name: String, meter: Meter): Unit = {
+    addMeter(name, Map.empty[String, String], meter)
+  }
+
   protected val namedTimers
       : ConcurrentHashMap[String, (NamedTimer, ConcurrentHashMap[String, 
Long])] =
     JavaUtils.newConcurrentHashMap[String, (NamedTimer, 
ConcurrentHashMap[String, Long])]()
@@ -152,6 +185,10 @@ abstract class AbstractSource(conf: CelebornConf, role: 
String)
     namedGauges.values().asScala.toList
   }
 
+  def meters(): List[NamedMeter] = {
+    namedMeters.values().asScala.toList
+  }
+
   def histograms(): List[NamedHistogram] = {
     List.empty[NamedHistogram]
   }
@@ -321,6 +358,22 @@ abstract class AbstractSource(conf: CelebornConf, role: 
String)
     updateInnerMetrics(sb.toString())
   }
 
+  def recordMeter(nm: NamedMeter): Unit = {
+    val timestamp = System.currentTimeMillis
+    val sb = new StringBuilder
+    val label = nm.labelString
+    sb.append(s"${normalizeKey(nm.name)}Count$label ${nm.meter.getCount} 
$timestamp\n")
+    sb.append(s"${normalizeKey(nm.name)}MeanRate$label ${nm.meter.getMeanRate} 
$timestamp\n")
+    sb.append(
+      s"${normalizeKey(nm.name)}OneMinuteRate$label 
${nm.meter.getOneMinuteRate} $timestamp\n")
+    sb.append(
+      s"${normalizeKey(nm.name)}FiveMinuteRate$label 
${nm.meter.getFiveMinuteRate} $timestamp\n")
+    sb.append(
+      s"${normalizeKey(nm.name)}FifteenMinuteRate$label 
${nm.meter.getFifteenMinuteRate} $timestamp\n")
+
+    updateInnerMetrics(sb.toString())
+  }
+
   def recordHistogram(nh: NamedHistogram): Unit = {
     val timestamp = System.currentTimeMillis
     val sb = new mutable.StringBuilder
@@ -377,6 +430,7 @@ abstract class AbstractSource(conf: CelebornConf, role: 
String)
     innerMetrics.synchronized {
       counters().foreach(c => recordCounter(c))
       gauges().foreach(g => recordGauge(g))
+      meters().foreach(m => recordMeter(m))
       histograms().foreach(h => {
         recordHistogram(h)
         h.asInstanceOf[CelebornHistogram].reservoir
@@ -400,6 +454,7 @@ abstract class AbstractSource(conf: CelebornConf, role: 
String)
     metricsCleaner.shutdown()
     namedCounters.clear()
     namedGauges.clear()
+    namedMeters.clear()
     namedTimers.clear()
     innerMetrics.clear()
     metricRegistry.removeMatching(new MetricFilter {
@@ -436,3 +491,7 @@ class TimerSupplier(val slidingWindowSize: Int)
 class GaugeSupplier[T](f: () => T) extends 
MetricRegistry.MetricSupplier[Gauge[_]] {
   override def newMetric(): Gauge[T] = new Gauge[T] { override def getValue: T 
= f() }
 }
+
+class MeterSupplier(f: () => Long) extends 
MetricRegistry.MetricSupplier[Meter] {
+  override def newMetric(): Meter = new Meter { override def getCount: Long = 
f() }
+}
diff --git 
a/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
 
b/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
index 4a96184b6..24d3fe601 100644
--- 
a/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
+++ 
b/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
@@ -412,6 +412,7 @@ object ControlMessages extends Logging {
       appId: String,
       totalWritten: Long,
       fileCount: Long,
+      shuffleFallbackCount: Long,
       needCheckedWorkerList: util.List[WorkerInfo],
       override var requestId: String = ZERO_UUID,
       shouldResponse: Boolean = false) extends MasterRequestMessage
@@ -809,6 +810,7 @@ object ControlMessages extends Logging {
           appId,
           totalWritten,
           fileCount,
+          shuffleFallbackCount,
           needCheckedWorkerList,
           requestId,
           shouldResponse) =>
@@ -817,6 +819,7 @@ object ControlMessages extends Logging {
         .setRequestId(requestId)
         .setTotalWritten(totalWritten)
         .setFileCount(fileCount)
+        .setShuffleFallbackCount(shuffleFallbackCount)
         .addAllNeedCheckedWorkerList(needCheckedWorkerList.asScala.map(
           PbSerDeUtils.toPbWorkerInfo(_, true, true)).toList.asJava)
         .setShouldResponse(shouldResponse)
@@ -1209,6 +1212,7 @@ object ControlMessages extends Logging {
           pbHeartbeatFromApplication.getAppId,
           pbHeartbeatFromApplication.getTotalWritten,
           pbHeartbeatFromApplication.getFileCount,
+          pbHeartbeatFromApplication.getShuffleFallbackCount,
           new util.ArrayList[WorkerInfo](
             pbHeartbeatFromApplication.getNeedCheckedWorkerListList.asScala
               .map(PbSerDeUtils.fromPbWorkerInfo).toList.asJava),
diff --git 
a/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala 
b/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala
index f9f01b9a7..9a14a3dee 100644
--- a/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala
@@ -459,6 +459,7 @@ object PbSerDeUtils {
       workers: java.util.Set[WorkerInfo],
       partitionTotalWritten: java.lang.Long,
       partitionTotalFileCount: java.lang.Long,
+      shuffleTotalFallbackCount: java.lang.Long,
       appDiskUsageMetricSnapshots: Array[AppDiskUsageSnapShot],
       currentAppDiskUsageMetricsSnapshot: AppDiskUsageSnapShot,
       lostWorkers: ConcurrentHashMap[WorkerInfo, java.lang.Long],
@@ -480,6 +481,7 @@ object PbSerDeUtils {
       .addAllWorkers(workers.asScala.map(toPbWorkerInfo(_, true, 
false)).asJava)
       .setPartitionTotalWritten(partitionTotalWritten)
       .setPartitionTotalFileCount(partitionTotalFileCount)
+      .setShuffleTotalFallbackCount(shuffleTotalFallbackCount)
       // appDiskUsageMetricSnapshots can have null values,
       // protobuf repeated value can't support null value in list.
       .addAllAppDiskUsageMetricSnapshots(appDiskUsageMetricSnapshots.filter(_ 
!= null)
diff --git a/docs/monitoring.md b/docs/monitoring.md
index b037f6a0f..c07c16d1e 100644
--- a/docs/monitoring.md
+++ b/docs/monitoring.md
@@ -100,6 +100,7 @@ These metrics are exposed by Celeborn master.
     | RunningApplicationCount  | The count of running applications.            
                                  |
     | ActiveShuffleSize        | The active shuffle size of workers.           
                                  |
     | ActiveShuffleFileCount   | The active shuffle file count of workers.     
                                  |
+    | ShuffleFallbackCount     | The count of shuffle fallbacks.               
                                  |
     | WorkerCount              | The count of active workers.                  
                                  |
     | LostWorkerCount          | The count of workers in lost list.            
                                  |
     | ExcludedWorkerCount      | The count of workers in excluded list.        
                                  |
diff --git 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
index f1d8a37e8..80988cfff 100644
--- 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
+++ 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
@@ -85,6 +85,7 @@ public abstract class AbstractMetaManager implements 
IMetadataHandler {
   public double unhealthyDiskRatioThreshold;
   public final LongAdder partitionTotalWritten = new LongAdder();
   public final LongAdder partitionTotalFileCount = new LongAdder();
+  public final LongAdder shuffleTotalFallbackCount = new LongAdder();
   public AppDiskUsageMetric appDiskUsageMetric = null;
 
   public final ConcurrentHashMap<String, ApplicationMeta> applicationMetas =
@@ -139,10 +140,12 @@ public abstract class AbstractMetaManager implements 
IMetadataHandler {
     }
   }
 
-  public void updateAppHeartbeatMeta(String appId, long time, long 
totalWritten, long fileCount) {
+  public void updateAppHeartbeatMeta(
+      String appId, long time, long totalWritten, long fileCount, long 
shuffleFallbackCount) {
     appHeartbeatTime.put(appId, time);
     partitionTotalWritten.add(totalWritten);
     partitionTotalFileCount.add(fileCount);
+    shuffleTotalFallbackCount.add(shuffleFallbackCount);
   }
 
   public void updateAppLostMeta(String appId) {
@@ -316,6 +319,7 @@ public abstract class AbstractMetaManager implements 
IMetadataHandler {
                 new HashSet(workersMap.values()),
                 partitionTotalWritten.sum(),
                 partitionTotalFileCount.sum(),
+                shuffleTotalFallbackCount.sum(),
                 appDiskUsageMetric.snapShots(),
                 appDiskUsageMetric.currentSnapShot().get(),
                 lostWorkers,
@@ -416,6 +420,7 @@ public abstract class AbstractMetaManager implements 
IMetadataHandler {
 
       partitionTotalWritten.add(snapshotMetaInfo.getPartitionTotalWritten());
       
partitionTotalFileCount.add(snapshotMetaInfo.getPartitionTotalFileCount());
+      
shuffleTotalFallbackCount.add(snapshotMetaInfo.getShuffleTotalFallbackCount());
       appDiskUsageMetric.restoreFromSnapshot(
           snapshotMetaInfo.getAppDiskUsageMetricSnapshotsList().stream()
               .map(PbSerDeUtils::fromPbAppDiskUsageSnapshot)
@@ -457,6 +462,7 @@ public abstract class AbstractMetaManager implements 
IMetadataHandler {
     workerLostEvents.clear();
     partitionTotalWritten.reset();
     partitionTotalFileCount.reset();
+    shuffleTotalFallbackCount.reset();
     workerEventInfos.clear();
     applicationMetas.clear();
   }
diff --git 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/IMetadataHandler.java
 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/IMetadataHandler.java
index 9e17728d6..e765a9d4d 100644
--- 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/IMetadataHandler.java
+++ 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/IMetadataHandler.java
@@ -39,7 +39,12 @@ public interface IMetadataHandler {
   void handleBatchUnRegisterShuffles(List<String> shuffleKeys, String 
requestId);
 
   void handleAppHeartbeat(
-      String appId, long totalWritten, long fileCount, long time, String 
requestId);
+      String appId,
+      long totalWritten,
+      long fileCount,
+      long shuffleFallbackCount,
+      long time,
+      String requestId);
 
   void handleAppLost(String appId, String requestId);
 
diff --git 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/SingleMasterMetaManager.java
 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/SingleMasterMetaManager.java
index 228f9c3db..04c74e51e 100644
--- 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/SingleMasterMetaManager.java
+++ 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/SingleMasterMetaManager.java
@@ -73,8 +73,13 @@ public class SingleMasterMetaManager extends 
AbstractMetaManager {
 
   @Override
   public void handleAppHeartbeat(
-      String appId, long totalWritten, long fileCount, long time, String 
requestId) {
-    updateAppHeartbeatMeta(appId, time, totalWritten, fileCount);
+      String appId,
+      long totalWritten,
+      long fileCount,
+      long shuffleFallbackCount,
+      long time,
+      String requestId) {
+    updateAppHeartbeatMeta(appId, time, totalWritten, fileCount, 
shuffleFallbackCount);
   }
 
   @Override
diff --git 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAMasterMetaManager.java
 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAMasterMetaManager.java
index fd229c904..a87470d9d 100644
--- 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAMasterMetaManager.java
+++ 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAMasterMetaManager.java
@@ -130,7 +130,12 @@ public class HAMasterMetaManager extends 
AbstractMetaManager {
 
   @Override
   public void handleAppHeartbeat(
-      String appId, long totalWritten, long fileCount, long time, String 
requestId) {
+      String appId,
+      long totalWritten,
+      long fileCount,
+      long shuffleFallbackCount,
+      long time,
+      String requestId) {
     try {
       ratisServer.submitRequest(
           ResourceRequest.newBuilder()
@@ -142,6 +147,7 @@ public class HAMasterMetaManager extends 
AbstractMetaManager {
                       .setTime(time)
                       .setTotalWritten(totalWritten)
                       .setFileCount(fileCount)
+                      .setShuffleFallbackCount(shuffleFallbackCount)
                       .build())
               .build());
     } catch (CelebornRuntimeException e) {
diff --git 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MetaHandler.java
 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MetaHandler.java
index 4ec1dac37..626378f0e 100644
--- 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MetaHandler.java
+++ 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MetaHandler.java
@@ -141,7 +141,12 @@ public class MetaHandler {
           long time = request.getAppHeartbeatRequest().getTime();
           long totalWritten = 
request.getAppHeartbeatRequest().getTotalWritten();
           long fileCount = request.getAppHeartbeatRequest().getFileCount();
-          metaSystem.updateAppHeartbeatMeta(appId, time, totalWritten, 
fileCount);
+          long shuffleFallbackCount = 
request.getAppHeartbeatRequest().getShuffleFallbackCount();
+          if (shuffleFallbackCount > 0) {
+            LOG.warn("{} shuffle fallbacks in app {}", shuffleFallbackCount, 
appId);
+          }
+          metaSystem.updateAppHeartbeatMeta(
+              appId, time, totalWritten, fileCount, shuffleFallbackCount);
           break;
 
         case AppLost:
diff --git a/master/src/main/proto/Resource.proto 
b/master/src/main/proto/Resource.proto
index acb1d6097..1e65943a9 100644
--- a/master/src/main/proto/Resource.proto
+++ b/master/src/main/proto/Resource.proto
@@ -120,6 +120,7 @@ message AppHeartbeatRequest {
   required int64 time = 2;
   required int64 totalWritten = 3;
   required int64 fileCount = 4;
+  optional int64 shuffleFallbackCount = 5;
 }
 
 message AppLostRequest {
diff --git 
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala 
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
index 117b4875a..519d36019 100644
--- 
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
+++ 
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
@@ -261,6 +261,10 @@ private[celeborn] class Master(
       }).sum()
   }
 
+  masterSource.addGauge(MasterSource.SHUFFLE_FALLBACK_COUNT) { () =>
+    statusSystem.shuffleTotalFallbackCount.sum()
+  }
+
   masterSource.addGauge(MasterSource.DEVICE_CELEBORN_TOTAL_CAPACITY) { () =>
     statusSystem.workersMap.values().asScala.toList.map(_.totalSpace()).sum
   }
@@ -391,6 +395,7 @@ private[celeborn] class Master(
           appId,
           totalWritten,
           fileCount,
+          fallbackShuffles,
           needCheckedWorkerList,
           requestId,
           shouldResponse) =>
@@ -403,6 +408,7 @@ private[celeborn] class Master(
           appId,
           totalWritten,
           fileCount,
+          fallbackShuffles,
           needCheckedWorkerList,
           requestId,
           shouldResponse))
@@ -1094,6 +1100,7 @@ private[celeborn] class Master(
       appId: String,
       totalWritten: Long,
       fileCount: Long,
+      shuffleFallbackCount: Long,
       needCheckedWorkerList: util.List[WorkerInfo],
       requestId: String,
       shouldResponse: Boolean): Unit = {
@@ -1101,6 +1108,7 @@ private[celeborn] class Master(
       appId,
       totalWritten,
       fileCount,
+      shuffleFallbackCount,
       System.currentTimeMillis(),
       requestId)
     val unknownWorkers = needCheckedWorkerList.asScala.filterNot(w =>
diff --git 
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/MasterSource.scala
 
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/MasterSource.scala
index b2e725244..13f35cc84 100644
--- 
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/MasterSource.scala
+++ 
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/MasterSource.scala
@@ -55,6 +55,8 @@ object MasterSource {
 
   val ACTIVE_SHUFFLE_FILE_COUNT = "ActiveShuffleFileCount"
 
+  val SHUFFLE_FALLBACK_COUNT = "ShuffleFallbackCount"
+
   val OFFER_SLOTS_TIME = "OfferSlotsTime"
 
   // Capacity
diff --git 
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java
 
b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java
index cde62c4cf..810f12d19 100644
--- 
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java
+++ 
b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java
@@ -637,11 +637,11 @@ public class DefaultMetaSystemSuiteJ {
   @Test
   public void testHandleAppHeartbeat() {
     Long dummy = 1235L;
-    statusSystem.handleAppHeartbeat(APPID1, 1, 1, dummy, getNewReqeustId());
+    statusSystem.handleAppHeartbeat(APPID1, 1, 1, 0, dummy, getNewReqeustId());
     assertEquals(dummy, statusSystem.appHeartbeatTime.get(APPID1));
 
     String appId2 = "app02";
-    statusSystem.handleAppHeartbeat(appId2, 1, 1, dummy, getNewReqeustId());
+    statusSystem.handleAppHeartbeat(appId2, 1, 1, 0, dummy, getNewReqeustId());
     assertEquals(dummy, statusSystem.appHeartbeatTime.get(appId2));
 
     assertEquals(2, statusSystem.appHeartbeatTime.size());
@@ -811,23 +811,23 @@ public class DefaultMetaSystemSuiteJ {
     Assert.assertEquals(statusSystem.estimatedPartitionSize, 
conf.initialEstimatedPartitionSize());
 
     Long dummy = 1235L;
-    statusSystem.handleAppHeartbeat(APPID1, 10000000000l, 1, dummy, 
getNewReqeustId());
+    statusSystem.handleAppHeartbeat(APPID1, 10000000000l, 1, 0, dummy, 
getNewReqeustId());
     String appId2 = "app02";
-    statusSystem.handleAppHeartbeat(appId2, 1, 1, dummy, getNewReqeustId());
+    statusSystem.handleAppHeartbeat(appId2, 1, 1, 0, dummy, getNewReqeustId());
 
     // Max size
     statusSystem.handleUpdatePartitionSize();
     Assert.assertEquals(statusSystem.estimatedPartitionSize, 
conf.maxPartitionSizeToEstimate());
 
-    statusSystem.handleAppHeartbeat(APPID1, 1000000000l, 1, dummy, 
getNewReqeustId());
-    statusSystem.handleAppHeartbeat(appId2, 1, 1, dummy, getNewReqeustId());
+    statusSystem.handleAppHeartbeat(APPID1, 1000000000l, 1, 0, dummy, 
getNewReqeustId());
+    statusSystem.handleAppHeartbeat(appId2, 1, 1, 0, dummy, getNewReqeustId());
 
     // Size between minEstimateSize -> maxEstimateSize
     statusSystem.handleUpdatePartitionSize();
     Assert.assertEquals(statusSystem.estimatedPartitionSize, 500000000);
 
-    statusSystem.handleAppHeartbeat(APPID1, 1000l, 1, dummy, 
getNewReqeustId());
-    statusSystem.handleAppHeartbeat(appId2, 1000l, 1, dummy, 
getNewReqeustId());
+    statusSystem.handleAppHeartbeat(APPID1, 1000l, 1, 0, dummy, 
getNewReqeustId());
+    statusSystem.handleAppHeartbeat(appId2, 1000l, 1, 0, dummy, 
getNewReqeustId());
 
     // Min size
     statusSystem.handleUpdatePartitionSize();
@@ -897,4 +897,15 @@ public class DefaultMetaSystemSuiteJ {
     assertEquals(1, statusSystem.decommissionWorkers.size());
     assertTrue(statusSystem.excludedWorkers.isEmpty());
   }
+
+  @Test
+  public void testShuffleFallbackCount() {
+    statusSystem.shuffleTotalFallbackCount.reset();
+
+    Long dummy = 1235L;
+    statusSystem.handleAppHeartbeat(APPID1, 1000l, 1, 1, dummy, 
getNewReqeustId());
+    statusSystem.handleAppHeartbeat(APPID1, 1000l, 1, 2, dummy, 
getNewReqeustId());
+
+    assertEquals(statusSystem.shuffleTotalFallbackCount.longValue(), 3);
+  }
 }
diff --git a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
index e4170a02c..ae70ed54b 100644
--- a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
+++ b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
@@ -946,14 +946,14 @@ public class RatisMasterStatusSystemSuiteJ {
     Assert.assertNotNull(statusSystem);
 
     long dummy = 1235L;
-    statusSystem.handleAppHeartbeat(APPID1, 1, 1, dummy, getNewReqeustId());
+    statusSystem.handleAppHeartbeat(APPID1, 1, 1, 0, dummy, getNewReqeustId());
     Thread.sleep(3000L);
     Assert.assertEquals(Long.valueOf(dummy), 
STATUSSYSTEM1.appHeartbeatTime.get(APPID1));
     Assert.assertEquals(Long.valueOf(dummy), 
STATUSSYSTEM2.appHeartbeatTime.get(APPID1));
     Assert.assertEquals(Long.valueOf(dummy), 
STATUSSYSTEM3.appHeartbeatTime.get(APPID1));
 
     String appId2 = "app02";
-    statusSystem.handleAppHeartbeat(appId2, 1, 1, dummy, getNewReqeustId());
+    statusSystem.handleAppHeartbeat(appId2, 1, 1, 0, dummy, getNewReqeustId());
     Thread.sleep(3000L);
 
     Assert.assertEquals(Long.valueOf(dummy), 
STATUSSYSTEM1.appHeartbeatTime.get(appId2));
@@ -1315,23 +1315,23 @@ public class RatisMasterStatusSystemSuiteJ {
     Assert.assertEquals(statusSystem.estimatedPartitionSize, 
conf.initialEstimatedPartitionSize());
 
     Long dummy = 1235L;
-    statusSystem.handleAppHeartbeat(APPID1, 10000000000l, 1, dummy, 
getNewReqeustId());
+    statusSystem.handleAppHeartbeat(APPID1, 10000000000l, 1, 0, dummy, 
getNewReqeustId());
     String appId2 = "app02";
-    statusSystem.handleAppHeartbeat(appId2, 1, 1, dummy, getNewReqeustId());
+    statusSystem.handleAppHeartbeat(appId2, 1, 1, 0, dummy, getNewReqeustId());
 
     // Max size
     statusSystem.handleUpdatePartitionSize();
     Assert.assertEquals(statusSystem.estimatedPartitionSize, 
conf.maxPartitionSizeToEstimate());
 
-    statusSystem.handleAppHeartbeat(APPID1, 1000000000l, 1, dummy, 
getNewReqeustId());
-    statusSystem.handleAppHeartbeat(appId2, 1, 1, dummy, getNewReqeustId());
+    statusSystem.handleAppHeartbeat(APPID1, 1000000000l, 1, 0, dummy, 
getNewReqeustId());
+    statusSystem.handleAppHeartbeat(appId2, 1, 1, 0, dummy, getNewReqeustId());
 
     // Size between minEstimateSize -> maxEstimateSize
     statusSystem.handleUpdatePartitionSize();
     Assert.assertEquals(statusSystem.estimatedPartitionSize, 500000000);
 
-    statusSystem.handleAppHeartbeat(APPID1, 1000l, 1, dummy, 
getNewReqeustId());
-    statusSystem.handleAppHeartbeat(appId2, 1000l, 1, dummy, 
getNewReqeustId());
+    statusSystem.handleAppHeartbeat(APPID1, 1000l, 1, 0, dummy, 
getNewReqeustId());
+    statusSystem.handleAppHeartbeat(appId2, 1000l, 1, 0, dummy, 
getNewReqeustId());
 
     // Min size
     statusSystem.handleUpdatePartitionSize();
@@ -1501,4 +1501,17 @@ public class RatisMasterStatusSystemSuiteJ {
     Assert.assertEquals(0, STATUSSYSTEM2.excludedWorkers.size());
     Assert.assertEquals(0, STATUSSYSTEM3.excludedWorkers.size());
   }
+
+  @Test
+  public void testShuffleFallbackCount() {
+    AbstractMetaManager statusSystem = pickLeaderStatusSystem();
+    Assert.assertNotNull(statusSystem);
+    statusSystem.shuffleTotalFallbackCount.reset();
+
+    Long dummy = 1235L;
+    statusSystem.handleAppHeartbeat(APPID1, 1000l, 1, 1, dummy, 
getNewReqeustId());
+    statusSystem.handleAppHeartbeat(APPID1, 1000l, 1, 2, dummy, 
getNewReqeustId());
+
+    Assert.assertEquals(statusSystem.shuffleTotalFallbackCount.longValue(), 3);
+  }
 }


Reply via email to