This is an automated email from the ASF dual-hosted git repository.

rexxiong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 169b6f697 [CELEBORN-1685] ShuffleFallbackPolicy supports 
ShuffleFallbackCount metric
169b6f697 is described below

commit 169b6f6973b2ee5093d91df0d2b573977efdc7ae
Author: SteNicholas <[email protected]>
AuthorDate: Mon Nov 11 10:37:25 2024 +0800

    [CELEBORN-1685] ShuffleFallbackPolicy supports ShuffleFallbackCount metric
    
    ### What changes were proposed in this pull request?
    
    1. `ShuffleFallbackPolicy` supports `ShuffleFallbackCount` metric to 
provide the shuffle fallback count of each fallback policy.
    2. Introduce `ShuffleTotalCount` metric to record the total count of 
shuffle.
    3. Fix Spark 2 not incrementing the shuffle count via `LifecycleManager`.
    
    ### Why are the changes needed?
    
    The implementations of `ShuffleFallbackPolicy` do not support the 
`ShuffleFallbackCount` metric at present. Meanwhile, Bilibili production 
practice needs the `ShuffleFallbackCount` of each `ShuffleFallbackPolicy`.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Cluster test.
    
    Closes #2891 from SteNicholas/CELEBORN-1685.
    
    Authored-by: SteNicholas <[email protected]>
    Signed-off-by: Shuang <[email protected]>
---
 assets/grafana/celeborn-dashboard.json             | 90 +++++++++++++++++++++-
 .../shuffle/celeborn/SparkShuffleManager.java      |  1 +
 .../CelebornShuffleFallbackPolicyRunner.scala      | 28 +++++--
 .../shuffle/celeborn/SparkShuffleManager.java      |  2 +-
 .../CelebornShuffleFallbackPolicyRunner.scala      | 24 ++++--
 .../celeborn/client/ApplicationHeartbeater.scala   | 11 ++-
 .../apache/celeborn/client/LifecycleManager.scala  | 15 +++-
 common/src/main/proto/TransportMessages.proto      |  6 +-
 .../common/protocol/message/ControlMessages.scala  | 12 ++-
 .../apache/celeborn/common/util/PbSerDeUtils.scala |  6 +-
 docs/monitoring.md                                 | 37 ++++-----
 .../master/clustermeta/AbstractMetaManager.java    | 29 +++++--
 .../master/clustermeta/IMetadataHandler.java       |  3 +-
 .../clustermeta/SingleMasterMetaManager.java       |  6 +-
 .../master/clustermeta/ha/HAMasterMetaManager.java |  6 +-
 .../deploy/master/clustermeta/ha/MetaHandler.java  | 16 ++--
 master/src/main/proto/Resource.proto               |  3 +-
 .../celeborn/service/deploy/master/Master.scala    | 29 +++++--
 .../service/deploy/master/MasterSource.scala       |  5 +-
 .../clustermeta/DefaultMetaSystemSuiteJ.java       | 41 ++++++----
 .../ha/RatisMasterStatusSystemSuiteJ.java          | 41 ++++++----
 21 files changed, 313 insertions(+), 98 deletions(-)

diff --git a/assets/grafana/celeborn-dashboard.json 
b/assets/grafana/celeborn-dashboard.json
index 15d81a725..6007e7297 100644
--- a/assets/grafana/celeborn-dashboard.json
+++ b/assets/grafana/celeborn-dashboard.json
@@ -1472,7 +1472,7 @@
             "type": "prometheus",
             "uid": "${DS_PROMETHEUS}"
           },
-          "description": "The count of shuffle fallbacks.",
+          "description": "The total count of shuffle including celeborn 
shuffle and spark built-in shuffle.",
           "fieldConfig": {
             "defaults": {
               "color": {
@@ -1544,6 +1544,94 @@
               "sort": "none"
             }
           },
+          "targets": [
+            {
+              "datasource": {
+                "type": "prometheus",
+                "uid": "${DS_PROMETHEUS}"
+              },
+              "expr": 
"metrics_ShuffleTotalCount_Value{instance=~\"${instance}\"}",
+              "legendFormat": "${baseLegend}",
+              "range": true,
+              "refId": "A"
+            }
+          ],
+          "title": "metrics_ShuffleTotalCount_Value",
+          "type": "timeseries"
+        },
+        {
+          "datasource": {
+            "type": "prometheus",
+            "uid": "${DS_PROMETHEUS}"
+          },
+          "description": "The count of shuffle fallbacks.",
+          "fieldConfig": {
+            "defaults": {
+              "color": {
+                "mode": "palette-classic"
+              },
+              "custom": {
+                "axisCenteredZero": false,
+                "axisColorMode": "text",
+                "axisLabel": "",
+                "axisPlacement": "auto",
+                "barAlignment": 0,
+                "drawStyle": "line",
+                "fillOpacity": 0,
+                "gradientMode": "none",
+                "hideFrom": {
+                  "legend": false,
+                  "tooltip": false,
+                  "viz": false
+                },
+                "lineInterpolation": "linear",
+                "lineWidth": 1,
+                "pointSize": 5,
+                "scaleDistribution": {
+                  "type": "linear"
+                },
+                "showPoints": "auto",
+                "spanNulls": false,
+                "stacking": {
+                  "group": "A",
+                  "mode": "none"
+                },
+                "thresholdsStyle": {
+                  "mode": "off"
+                }
+              },
+              "mappings": [],
+              "thresholds": {
+                "mode": "absolute",
+                "steps": [
+                  {
+                    "color": "green"
+                  }
+                ]
+              }
+            },
+            "overrides": []
+          },
+          "gridPos": {
+            "h": 8,
+            "w": 12,
+            "x": 12,
+            "y": 34
+          },
+          "id": 219,
+          "options": {
+            "legend": {
+              "calcs": [],
+              "displayMode": "list",
+              "placement": "bottom",
+              "showLegend": true
+            },
+            "tooltip": {
+              "maxHeight": 600,
+              "mode": "single",
+              "sort": "none"
+            }
+          },
           "targets": [
             {
               "datasource": {
diff --git 
a/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
 
b/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
index 8c25dc5c2..d450071f4 100644
--- 
a/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
+++ 
b/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
@@ -118,6 +118,7 @@ public class SparkShuffleManager implements ShuffleManager {
     String appId = SparkUtils.appUniqueId(dependency.rdd().context());
     initializeLifecycleManager(appId);
 
+    lifecycleManager.shuffleCount().increment();
     if (fallbackPolicyRunner.applyFallbackPolicies(dependency, 
lifecycleManager)) {
       logger.warn("Fallback to SortShuffleManager!");
       sortShuffleIds.add(shuffleId);
diff --git 
a/client-spark/spark-2/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleFallbackPolicyRunner.scala
 
b/client-spark/spark-2/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleFallbackPolicyRunner.scala
index c1e93ec76..3ba52ef4d 100644
--- 
a/client-spark/spark-2/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleFallbackPolicyRunner.scala
+++ 
b/client-spark/spark-2/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleFallbackPolicyRunner.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.shuffle.celeborn
 
+import java.util.function.BiFunction
+
 import scala.collection.JavaConverters._
 
 import org.apache.spark.ShuffleDependency
@@ -35,12 +37,26 @@ class CelebornShuffleFallbackPolicyRunner(conf: 
CelebornConf) extends Logging {
   def applyFallbackPolicies[K, V, C](
       dependency: ShuffleDependency[K, V, C],
       lifecycleManager: LifecycleManager): Boolean = {
-    val needFallback =
-      shuffleFallbackPolicies.exists(_.needFallback(dependency, conf, 
lifecycleManager))
-    if (needFallback && FallbackPolicy.NEVER.equals(shuffleFallbackPolicy)) {
-      throw new CelebornIOException(
-        "Fallback to spark built-in shuffle implementation is prohibited.")
+    val fallbackPolicy =
+      shuffleFallbackPolicies.find(_.needFallback(dependency, conf, 
lifecycleManager))
+    if (fallbackPolicy.isDefined) {
+      if (FallbackPolicy.NEVER.equals(shuffleFallbackPolicy)) {
+        throw new CelebornIOException(
+          "Fallback to spark built-in shuffle implementation is prohibited.")
+      } else {
+        lifecycleManager.shuffleFallbackCounts.compute(
+          fallbackPolicy.getClass.getName,
+          new BiFunction[String, java.lang.Long, java.lang.Long] {
+            override def apply(k: String, v: java.lang.Long): java.lang.Long = 
{
+              if (v == null) {
+                1L
+              } else {
+                v + 1L
+              }
+            }
+          })
+      }
     }
-    needFallback
+    fallbackPolicy.isDefined
   }
 }
diff --git 
a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
 
b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
index cbebde096..8541ad223 100644
--- 
a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
+++ 
b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
@@ -161,8 +161,8 @@ public class SparkShuffleManager implements ShuffleManager {
     String appId = SparkUtils.appUniqueId(dependency.rdd().context());
     initializeLifecycleManager(appId);
 
+    lifecycleManager.shuffleCount().increment();
     if (fallbackPolicyRunner.applyFallbackPolicies(dependency, 
lifecycleManager)) {
-      lifecycleManager.shuffleFallbackCount().increment();
       if (conf.getBoolean("spark.dynamicAllocation.enabled", false)
           && !conf.getBoolean("spark.shuffle.service.enabled", false)) {
         logger.error(
diff --git 
a/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleFallbackPolicyRunner.scala
 
b/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleFallbackPolicyRunner.scala
index c1e93ec76..8a2b7c889 100644
--- 
a/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleFallbackPolicyRunner.scala
+++ 
b/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleFallbackPolicyRunner.scala
@@ -35,12 +35,24 @@ class CelebornShuffleFallbackPolicyRunner(conf: 
CelebornConf) extends Logging {
   def applyFallbackPolicies[K, V, C](
       dependency: ShuffleDependency[K, V, C],
       lifecycleManager: LifecycleManager): Boolean = {
-    val needFallback =
-      shuffleFallbackPolicies.exists(_.needFallback(dependency, conf, 
lifecycleManager))
-    if (needFallback && FallbackPolicy.NEVER.equals(shuffleFallbackPolicy)) {
-      throw new CelebornIOException(
-        "Fallback to spark built-in shuffle implementation is prohibited.")
+    val fallbackPolicy =
+      shuffleFallbackPolicies.find(_.needFallback(dependency, conf, 
lifecycleManager))
+    if (fallbackPolicy.isDefined) {
+      if (FallbackPolicy.NEVER.equals(shuffleFallbackPolicy)) {
+        throw new CelebornIOException(
+          "Fallback to spark built-in shuffle implementation is prohibited.")
+      } else {
+        lifecycleManager.shuffleFallbackCounts.compute(
+          fallbackPolicy.getClass.getName,
+          (_, v) => {
+            if (v == null) {
+              1L
+            } else {
+              v + 1L
+            }
+          })
+      }
     }
-    needFallback
+    fallbackPolicy.isDefined
   }
 }
diff --git 
a/client/src/main/scala/org/apache/celeborn/client/ApplicationHeartbeater.scala 
b/client/src/main/scala/org/apache/celeborn/client/ApplicationHeartbeater.scala
index ee337deb6..3d8d23019 100644
--- 
a/client/src/main/scala/org/apache/celeborn/client/ApplicationHeartbeater.scala
+++ 
b/client/src/main/scala/org/apache/celeborn/client/ApplicationHeartbeater.scala
@@ -37,7 +37,7 @@ class ApplicationHeartbeater(
     appId: String,
     conf: CelebornConf,
     masterClient: MasterClient,
-    shuffleMetrics: () => ((Long, Long), Long),
+    shuffleMetrics: () => ((Long, Long), (Long, Map[String, java.lang.Long])),
     workerStatusTracker: WorkerStatusTracker,
     registeredShuffles: ConcurrentHashMap.KeySetView[Int, java.lang.Boolean],
     cancelAllActiveStages: String => Unit) extends Logging {
@@ -59,10 +59,12 @@ class ApplicationHeartbeater(
         override def run(): Unit = {
           try {
             require(masterClient != null, "When sending a heartbeat, client 
shouldn't be null.")
-            val ((tmpTotalWritten, tmpTotalFileCount), 
tmpShuffleFallbackCount) = shuffleMetrics()
+            val (
+              (tmpTotalWritten, tmpTotalFileCount),
+              (tmpShuffleCount, tmpShuffleFallbackCounts)) = shuffleMetrics()
             logInfo("Send app heartbeat with " +
               s"written: ${Utils.bytesToString(tmpTotalWritten)}, file count: 
$tmpTotalFileCount, " +
-              s"shuffle fallback count: $tmpShuffleFallbackCount")
+              s"shuffle count: $tmpShuffleCount, shuffle fallback counts: 
$tmpShuffleFallbackCounts")
             // UserResourceConsumption and DiskInfo are eliminated from 
WorkerInfo
             // during serialization of HeartbeatFromApplication
             val appHeartbeat =
@@ -70,7 +72,8 @@ class ApplicationHeartbeater(
                 appId,
                 tmpTotalWritten,
                 tmpTotalFileCount,
-                tmpShuffleFallbackCount,
+                tmpShuffleCount,
+                tmpShuffleFallbackCounts.asJava,
                 workerStatusTracker.getNeedCheckedWorkers().toList.asJava,
                 ZERO_UUID,
                 true)
diff --git 
a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala 
b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
index bf3ccf63d..47c27bced 100644
--- a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
+++ b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
@@ -84,7 +84,8 @@ class LifecycleManager(val appUniqueId: String, val conf: 
CelebornConf) extends
   private val unregisterShuffleTime = JavaUtils.newConcurrentHashMap[Int, 
Long]()
 
   val registeredShuffle = ConcurrentHashMap.newKeySet[Int]()
-  val shuffleFallbackCount = new LongAdder()
+  val shuffleCount = new LongAdder()
+  val shuffleFallbackCounts = JavaUtils.newConcurrentHashMap[String, 
java.lang.Long]()
   // maintain each shuffle's map relation of WorkerInfo and partition location
   val shuffleAllocatedWorkers = new ShuffleAllocatedWorkers
   // shuffle id -> (partitionId -> newest PartitionLocation)
@@ -210,10 +211,20 @@ class LifecycleManager(val appUniqueId: String, val conf: 
CelebornConf) extends
       appUniqueId,
       conf,
       masterClient,
-      () => commitManager.commitMetrics() -> 
shuffleFallbackCount.sumThenReset(),
+      () => {
+        commitManager.commitMetrics() ->
+          (shuffleCount.sumThenReset(), resetShuffleFallbackCounts())
+      },
       workerStatusTracker,
       registeredShuffle,
       reason => cancelAllActiveStages(reason))
+  private def resetShuffleFallbackCounts(): Map[String, java.lang.Long] = {
+    val fallbackCounts = new util.HashMap[String, java.lang.Long]()
+    shuffleFallbackCounts.keys().asScala.foreach { key =>
+      Option(shuffleFallbackCounts.remove(key)).filter(_ > 
0).foreach(fallbackCounts.put(key, _))
+    }
+    fallbackCounts.asScala.toMap
+  }
   private val changePartitionManager = new ChangePartitionManager(conf, this)
   private val releasePartitionManager = new ReleasePartitionManager(conf, this)
 
diff --git a/common/src/main/proto/TransportMessages.proto 
b/common/src/main/proto/TransportMessages.proto
index 1228c1732..be09e83c7 100644
--- a/common/src/main/proto/TransportMessages.proto
+++ b/common/src/main/proto/TransportMessages.proto
@@ -442,7 +442,8 @@ message PbHeartbeatFromApplication {
   string requestId = 4;
   repeated PbWorkerInfo needCheckedWorkerList = 5;
   bool shouldResponse = 6;
-  int64 shuffleFallbackCount = 7;
+  int64 shuffleCount = 7;
+  map<string, int64> shuffleFallbackCounts = 8;
 }
 
 message PbHeartbeatFromApplicationResponse {
@@ -675,7 +676,8 @@ message PbSnapshotMetaInfo {
   map<string, PbWorkerEventInfo> workerEventInfos = 15;
   map<string, PbApplicationMeta> applicationMetas = 16;
   repeated PbWorkerInfo decommissionWorkers = 17;
-  int64 shuffleTotalFallbackCount = 18;
+  int64 shuffleTotalCount = 18;
+  map<string, int64> shuffleFallbackCounts = 19;
 }
 
 message PbOpenStream {
diff --git 
a/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
 
b/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
index 24d3fe601..3ec9c699b 100644
--- 
a/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
+++ 
b/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
@@ -412,7 +412,8 @@ object ControlMessages extends Logging {
       appId: String,
       totalWritten: Long,
       fileCount: Long,
-      shuffleFallbackCount: Long,
+      shuffleCount: Long,
+      shuffleFallbackCounts: util.Map[String, java.lang.Long],
       needCheckedWorkerList: util.List[WorkerInfo],
       override var requestId: String = ZERO_UUID,
       shouldResponse: Boolean = false) extends MasterRequestMessage
@@ -810,7 +811,8 @@ object ControlMessages extends Logging {
           appId,
           totalWritten,
           fileCount,
-          shuffleFallbackCount,
+          shuffleCount,
+          shuffleFallbackCounts,
           needCheckedWorkerList,
           requestId,
           shouldResponse) =>
@@ -819,7 +821,8 @@ object ControlMessages extends Logging {
         .setRequestId(requestId)
         .setTotalWritten(totalWritten)
         .setFileCount(fileCount)
-        .setShuffleFallbackCount(shuffleFallbackCount)
+        .setShuffleCount(shuffleCount)
+        .putAllShuffleFallbackCounts(shuffleFallbackCounts)
         .addAllNeedCheckedWorkerList(needCheckedWorkerList.asScala.map(
           PbSerDeUtils.toPbWorkerInfo(_, true, true)).toList.asJava)
         .setShouldResponse(shouldResponse)
@@ -1212,7 +1215,8 @@ object ControlMessages extends Logging {
           pbHeartbeatFromApplication.getAppId,
           pbHeartbeatFromApplication.getTotalWritten,
           pbHeartbeatFromApplication.getFileCount,
-          pbHeartbeatFromApplication.getShuffleFallbackCount,
+          pbHeartbeatFromApplication.getShuffleCount,
+          pbHeartbeatFromApplication.getShuffleFallbackCountsMap,
           new util.ArrayList[WorkerInfo](
             pbHeartbeatFromApplication.getNeedCheckedWorkerListList.asScala
               .map(PbSerDeUtils.fromPbWorkerInfo).toList.asJava),
diff --git 
a/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala 
b/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala
index d2f07191e..0c638f241 100644
--- a/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala
@@ -466,7 +466,8 @@ object PbSerDeUtils {
       workers: java.util.Set[WorkerInfo],
       partitionTotalWritten: java.lang.Long,
       partitionTotalFileCount: java.lang.Long,
-      shuffleTotalFallbackCount: java.lang.Long,
+      shuffleTotalCount: java.lang.Long,
+      shuffleFallbackCounts: java.util.Map[String, java.lang.Long],
       appDiskUsageMetricSnapshots: Array[AppDiskUsageSnapShot],
       currentAppDiskUsageMetricsSnapshot: AppDiskUsageSnapShot,
       lostWorkers: ConcurrentHashMap[WorkerInfo, java.lang.Long],
@@ -488,7 +489,8 @@ object PbSerDeUtils {
       .addAllWorkers(workers.asScala.map(toPbWorkerInfo(_, true, 
false)).asJava)
       .setPartitionTotalWritten(partitionTotalWritten)
       .setPartitionTotalFileCount(partitionTotalFileCount)
-      .setShuffleTotalFallbackCount(shuffleTotalFallbackCount)
+      .setShuffleTotalCount(shuffleTotalCount)
+      .putAllShuffleFallbackCounts(shuffleFallbackCounts)
       // appDiskUsageMetricSnapshots can have null values,
       // protobuf repeated value can't support null value in list.
       .addAllAppDiskUsageMetricSnapshots(appDiskUsageMetricSnapshots.filter(_ 
!= null)
diff --git a/docs/monitoring.md b/docs/monitoring.md
index c07c16d1e..93e997e8e 100644
--- a/docs/monitoring.md
+++ b/docs/monitoring.md
@@ -92,24 +92,25 @@ These metrics are exposed by Celeborn master.
 
   - namespace=master
     
-    | Metric Name              | Description                                   
                                  |
-    
|--------------------------|---------------------------------------------------------------------------------|
-    | RegisteredShuffleCount   | The count of registered shuffle.              
                                  |
-    | DeviceCelebornFreeBytes  | The actual usable space of Celeborn for 
device.                                 |
-    | DeviceCelebornTotalBytes | The total space of Celeborn for device.       
                                  |
-    | RunningApplicationCount  | The count of running applications.            
                                  |
-    | ActiveShuffleSize        | The active shuffle size of workers.           
                                  |
-    | ActiveShuffleFileCount   | The active shuffle file count of workers.     
                                  |
-    | ShuffleFallbackCount     | The count of shuffle fallbacks.               
                                  |
-    | WorkerCount              | The count of active workers.                  
                                  |
-    | LostWorkerCount          | The count of workers in lost list.            
                                  |
-    | ExcludedWorkerCount      | The count of workers in excluded list.        
                                  |
-    | AvailableWorkerCount     | The count of workers in available list.       
                                  |
-    | ShutdownWorkerCount      | The count of workers in shutdown list.        
                                  |
-    | DecommissionWorkerCount  | The count of workers in decommission list.    
                                  |
-    | IsActiveMaster           | Whether the current master is active.         
                                  |
-    | PartitionSize            | The size of estimated shuffle partition.      
                                  |
-    | OfferSlotsTime           | The time for masters to handle `RequestSlots` 
request when registering shuffle. |
+    | Metric Name              | Description                                   
                                    |
+    
|--------------------------|-----------------------------------------------------------------------------------|
+    | RegisteredShuffleCount   | The count of registered shuffle.              
                                    |
+    | DeviceCelebornFreeBytes  | The actual usable space of Celeborn for 
device.                                   |
+    | DeviceCelebornTotalBytes | The total space of Celeborn for device.       
                                    |
+    | RunningApplicationCount  | The count of running applications.            
                                    |
+    | ActiveShuffleSize        | The active shuffle size of workers.           
                                    |
+    | ActiveShuffleFileCount   | The active shuffle file count of workers.     
                                    |
+    | ShuffleTotalCount        | The total count of shuffle including celeborn 
shuffle and spark built-in shuffle. |
+    | ShuffleFallbackCount     | The count of shuffle fallbacks.               
                                    |
+    | WorkerCount              | The count of active workers.                  
                                    |
+    | LostWorkerCount          | The count of workers in lost list.            
                                    |
+    | ExcludedWorkerCount      | The count of workers in excluded list.        
                                    |
+    | AvailableWorkerCount     | The count of workers in available list.       
                                    |
+    | ShutdownWorkerCount      | The count of workers in shutdown list.        
                                    |
+    | DecommissionWorkerCount  | The count of workers in decommission list.    
                                    |
+    | IsActiveMaster           | Whether the current master is active.         
                                    |
+    | PartitionSize            | The size of estimated shuffle partition.      
                                    |
+    | OfferSlotsTime           | The time for masters to handle `RequestSlots` 
request when registering shuffle.   |
 
   - namespace=CPU
 
diff --git 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
index 80988cfff..cede0284a 100644
--- 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
+++ 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
@@ -85,7 +85,8 @@ public abstract class AbstractMetaManager implements 
IMetadataHandler {
   public double unhealthyDiskRatioThreshold;
   public final LongAdder partitionTotalWritten = new LongAdder();
   public final LongAdder partitionTotalFileCount = new LongAdder();
-  public final LongAdder shuffleTotalFallbackCount = new LongAdder();
+  public final LongAdder shuffleTotalCount = new LongAdder();
+  public final Map<String, Long> shuffleFallbackCounts = 
JavaUtils.newConcurrentHashMap();
   public AppDiskUsageMetric appDiskUsageMetric = null;
 
   public final ConcurrentHashMap<String, ApplicationMeta> applicationMetas =
@@ -141,11 +142,17 @@ public abstract class AbstractMetaManager implements 
IMetadataHandler {
   }
 
   public void updateAppHeartbeatMeta(
-      String appId, long time, long totalWritten, long fileCount, long 
shuffleFallbackCount) {
+      String appId,
+      long time,
+      long totalWritten,
+      long fileCount,
+      long shuffleCount,
+      Map<String, Long> shuffleFallbackCounts) {
     appHeartbeatTime.put(appId, time);
     partitionTotalWritten.add(totalWritten);
     partitionTotalFileCount.add(fileCount);
-    shuffleTotalFallbackCount.add(shuffleFallbackCount);
+    shuffleTotalCount.add(shuffleCount);
+    addShuffleFallbackCounts(shuffleFallbackCounts);
   }
 
   public void updateAppLostMeta(String appId) {
@@ -319,7 +326,8 @@ public abstract class AbstractMetaManager implements 
IMetadataHandler {
                 new HashSet(workersMap.values()),
                 partitionTotalWritten.sum(),
                 partitionTotalFileCount.sum(),
-                shuffleTotalFallbackCount.sum(),
+                shuffleTotalCount.sum(),
+                shuffleFallbackCounts,
                 appDiskUsageMetric.snapShots(),
                 appDiskUsageMetric.currentSnapShot().get(),
                 lostWorkers,
@@ -420,7 +428,8 @@ public abstract class AbstractMetaManager implements 
IMetadataHandler {
 
       partitionTotalWritten.add(snapshotMetaInfo.getPartitionTotalWritten());
       
partitionTotalFileCount.add(snapshotMetaInfo.getPartitionTotalFileCount());
-      
shuffleTotalFallbackCount.add(snapshotMetaInfo.getShuffleTotalFallbackCount());
+      shuffleTotalCount.add(snapshotMetaInfo.getShuffleTotalCount());
+      addShuffleFallbackCounts(snapshotMetaInfo.getShuffleFallbackCountsMap());
       appDiskUsageMetric.restoreFromSnapshot(
           snapshotMetaInfo.getAppDiskUsageMetricSnapshotsList().stream()
               .map(PbSerDeUtils::fromPbAppDiskUsageSnapshot)
@@ -462,7 +471,8 @@ public abstract class AbstractMetaManager implements 
IMetadataHandler {
     workerLostEvents.clear();
     partitionTotalWritten.reset();
     partitionTotalFileCount.reset();
-    shuffleTotalFallbackCount.reset();
+    shuffleTotalCount.reset();
+    shuffleFallbackCounts.clear();
     workerEventInfos.clear();
     applicationMetas.clear();
   }
@@ -550,4 +560,11 @@ public abstract class AbstractMetaManager implements 
IMetadataHandler {
   public int registeredShuffleCount() {
     return 
registeredAppAndShuffles.values().stream().mapToInt(Set::size).sum();
   }
+
+  private void addShuffleFallbackCounts(Map<String, Long> fallbackCounts) {
+    for (String fallbackPolicy : fallbackCounts.keySet()) {
+      shuffleFallbackCounts.compute(
+          fallbackPolicy, (k, v) -> v == null ? fallbackCounts.get(k) : v + 
fallbackCounts.get(k));
+    }
+  }
 }
diff --git 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/IMetadataHandler.java
 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/IMetadataHandler.java
index e765a9d4d..5ad1907c1 100644
--- 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/IMetadataHandler.java
+++ 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/IMetadataHandler.java
@@ -42,7 +42,8 @@ public interface IMetadataHandler {
       String appId,
       long totalWritten,
       long fileCount,
-      long shuffleFallbackCount,
+      long shuffleCount,
+      Map<String, Long> shuffleFallbackCounts,
       long time,
       String requestId);
 
diff --git 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/SingleMasterMetaManager.java
 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/SingleMasterMetaManager.java
index 04c74e51e..033be796d 100644
--- 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/SingleMasterMetaManager.java
+++ 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/SingleMasterMetaManager.java
@@ -76,10 +76,12 @@ public class SingleMasterMetaManager extends 
AbstractMetaManager {
       String appId,
       long totalWritten,
       long fileCount,
-      long shuffleFallbackCount,
+      long shuffleCount,
+      Map<String, Long> shuffleFallbackCounts,
       long time,
       String requestId) {
-    updateAppHeartbeatMeta(appId, time, totalWritten, fileCount, 
shuffleFallbackCount);
+    updateAppHeartbeatMeta(
+        appId, time, totalWritten, fileCount, shuffleCount, 
shuffleFallbackCounts);
   }
 
   @Override
diff --git 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAMasterMetaManager.java
 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAMasterMetaManager.java
index a87470d9d..0c48419d6 100644
--- 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAMasterMetaManager.java
+++ 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAMasterMetaManager.java
@@ -133,7 +133,8 @@ public class HAMasterMetaManager extends 
AbstractMetaManager {
       String appId,
       long totalWritten,
       long fileCount,
-      long shuffleFallbackCount,
+      long shuffleCount,
+      Map<String, Long> shuffleFallbackCounts,
       long time,
       String requestId) {
     try {
@@ -147,7 +148,8 @@ public class HAMasterMetaManager extends 
AbstractMetaManager {
                       .setTime(time)
                       .setTotalWritten(totalWritten)
                       .setFileCount(fileCount)
-                      .setShuffleFallbackCount(shuffleFallbackCount)
+                      .setShuffleCount(shuffleCount)
+                      .putAllShuffleFallbackCounts(shuffleFallbackCounts)
                       .build())
               .build());
     } catch (CelebornRuntimeException e) {
diff --git 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MetaHandler.java
 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MetaHandler.java
index 626378f0e..5ce91baed 100644
--- 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MetaHandler.java
+++ 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MetaHandler.java
@@ -33,6 +33,7 @@ import org.apache.celeborn.common.meta.DiskInfo;
 import org.apache.celeborn.common.meta.WorkerInfo;
 import org.apache.celeborn.common.meta.WorkerStatus;
 import org.apache.celeborn.common.quota.ResourceConsumption;
+import org.apache.celeborn.common.util.CollectionUtils;
 import org.apache.celeborn.service.deploy.master.clustermeta.MetaUtil;
 import org.apache.celeborn.service.deploy.master.clustermeta.ResourceProtos;
 import 
org.apache.celeborn.service.deploy.master.clustermeta.ResourceProtos.ResourceResponse;
@@ -137,16 +138,21 @@ public class MetaHandler {
 
         case AppHeartbeat:
           appId = request.getAppHeartbeatRequest().getAppId();
-          LOG.debug("Handle app heartbeat for {}", appId);
           long time = request.getAppHeartbeatRequest().getTime();
           long totalWritten = 
request.getAppHeartbeatRequest().getTotalWritten();
           long fileCount = request.getAppHeartbeatRequest().getFileCount();
-          long shuffleFallbackCount = 
request.getAppHeartbeatRequest().getShuffleFallbackCount();
-          if (shuffleFallbackCount > 0) {
-            LOG.warn("{} shuffle fallbacks in app {}", shuffleFallbackCount, 
appId);
+          long shuffleCount = 
request.getAppHeartbeatRequest().getShuffleCount();
+          LOG.debug("Handle app heartbeat for {} with shuffle count {}", 
appId, shuffleCount);
+          Map<String, Long> shuffleFallbackCounts =
+              request.getAppHeartbeatRequest().getShuffleFallbackCountsMap();
+          if (CollectionUtils.isNotEmpty(shuffleFallbackCounts)) {
+            LOG.warn(
+                "{} shuffle fallbacks in app {}",
+                shuffleFallbackCounts.values().stream().mapToLong(v -> 
v).sum(),
+                appId);
           }
           metaSystem.updateAppHeartbeatMeta(
-              appId, time, totalWritten, fileCount, shuffleFallbackCount);
+              appId, time, totalWritten, fileCount, shuffleCount, 
shuffleFallbackCounts);
           break;
 
         case AppLost:
diff --git a/master/src/main/proto/Resource.proto 
b/master/src/main/proto/Resource.proto
index 1e65943a9..52b418912 100644
--- a/master/src/main/proto/Resource.proto
+++ b/master/src/main/proto/Resource.proto
@@ -120,7 +120,8 @@ message AppHeartbeatRequest {
   required int64 time = 2;
   required int64 totalWritten = 3;
   required int64 fileCount = 4;
-  optional int64 shuffleFallbackCount = 5;
+  optional int64 shuffleCount = 5;
+  map<string, int64> shuffleFallbackCounts = 6;
 }
 
 message AppLostRequest {
diff --git 
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala 
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
index 519d36019..0a95609e1 100644
--- 
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
+++ 
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
@@ -261,8 +261,12 @@ private[celeborn] class Master(
       }).sum()
   }
 
+  masterSource.addGauge(MasterSource.SHUFFLE_TOTAL_COUNT) { () =>
+    statusSystem.shuffleTotalCount.sum()
+  }
+
   masterSource.addGauge(MasterSource.SHUFFLE_FALLBACK_COUNT) { () =>
-    statusSystem.shuffleTotalFallbackCount.sum()
+    statusSystem.shuffleFallbackCounts.values().asScala.map(_.longValue()).sum
   }
 
   masterSource.addGauge(MasterSource.DEVICE_CELEBORN_TOTAL_CAPACITY) { () =>
@@ -395,7 +399,8 @@ private[celeborn] class Master(
           appId,
           totalWritten,
           fileCount,
-          fallbackShuffles,
+          shuffleFallbackCount,
+          shuffleFallbackCounts,
           needCheckedWorkerList,
           requestId,
           shouldResponse) =>
@@ -408,7 +413,8 @@ private[celeborn] class Master(
           appId,
           totalWritten,
           fileCount,
-          fallbackShuffles,
+          shuffleFallbackCount,
+          shuffleFallbackCounts,
           needCheckedWorkerList,
           requestId,
           shouldResponse))
@@ -1095,12 +1101,23 @@ private[celeborn] class Master(
     }
   }
 
+  private def gaugeShuffleFallbackCounts(): Unit = {
+    statusSystem.shuffleFallbackCounts.keySet().asScala.foreach { 
fallbackPolicy =>
+      masterSource.addGauge(
+        MasterSource.SHUFFLE_FALLBACK_COUNT,
+        Map("fallbackPolicy" -> fallbackPolicy)) { () =>
+        
Option(statusSystem.shuffleFallbackCounts.get(fallbackPolicy)).getOrElse(0L)
+      }
+    }
+  }
+
   private def handleHeartbeatFromApplication(
       context: RpcCallContext,
       appId: String,
       totalWritten: Long,
       fileCount: Long,
-      shuffleFallbackCount: Long,
+      shuffleCount: Long,
+      shuffleFallbackCounts: util.Map[String, java.lang.Long],
       needCheckedWorkerList: util.List[WorkerInfo],
       requestId: String,
       shouldResponse: Boolean): Unit = {
@@ -1108,9 +1125,11 @@ private[celeborn] class Master(
       appId,
       totalWritten,
       fileCount,
-      shuffleFallbackCount,
+      shuffleCount,
+      shuffleFallbackCounts,
       System.currentTimeMillis(),
       requestId)
+    gaugeShuffleFallbackCounts()
     val unknownWorkers = needCheckedWorkerList.asScala.filterNot(w =>
       statusSystem.workersMap.containsKey(w.toUniqueId())).asJava
     if (shouldResponse) {
diff --git 
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/MasterSource.scala
 
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/MasterSource.scala
index 13f35cc84..80bd1e5fb 100644
--- 
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/MasterSource.scala
+++ 
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/MasterSource.scala
@@ -44,6 +44,9 @@ object MasterSource {
   val DECOMMISSION_WORKER_COUNT = "DecommissionWorkerCount"
 
   val REGISTERED_SHUFFLE_COUNT = "RegisteredShuffleCount"
+  val SHUFFLE_FALLBACK_COUNT = "ShuffleFallbackCount"
+  // The total count including RegisteredShuffleCount(celeborn shuffle) and 
ShuffleFallbackCount(spark built-in shuffle).
+  val SHUFFLE_TOTAL_COUNT = "ShuffleTotalCount"
 
   val RUNNING_APPLICATION_COUNT = "RunningApplicationCount"
 
@@ -55,8 +58,6 @@ object MasterSource {
 
   val ACTIVE_SHUFFLE_FILE_COUNT = "ActiveShuffleFileCount"
 
-  val SHUFFLE_FALLBACK_COUNT = "ShuffleFallbackCount"
-
   val OFFER_SLOTS_TIME = "OfferSlotsTime"
 
   // Capacity
diff --git 
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java
 
b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java
index 810f12d19..095bb9496 100644
--- 
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java
+++ 
b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java
@@ -83,6 +83,8 @@ public class DefaultMetaSystemSuiteJ {
   private static final int REPLICATEPORT3 = 3114;
   private static final int INTERNALPORT3 = 3115;
   private static final String NETWORK_LOCATION3 = "networkLocation3";
+  private static final String POLICY1 = "ShufflePartitionsFallbackPolicy";
+  private static final String POLICY2 = "QuotaFallbackPolicy";
   private static final Map<String, DiskInfo> disks3 = new HashMap<>();
   private static final Map<UserIdentifier, ResourceConsumption> 
userResourceConsumption3 =
       new HashMap<>();
@@ -637,11 +639,11 @@ public class DefaultMetaSystemSuiteJ {
   @Test
   public void testHandleAppHeartbeat() {
     Long dummy = 1235L;
-    statusSystem.handleAppHeartbeat(APPID1, 1, 1, 0, dummy, getNewReqeustId());
+    statusSystem.handleAppHeartbeat(APPID1, 1, 1, 0, new HashMap<>(), dummy, 
getNewReqeustId());
     assertEquals(dummy, statusSystem.appHeartbeatTime.get(APPID1));
 
     String appId2 = "app02";
-    statusSystem.handleAppHeartbeat(appId2, 1, 1, 0, dummy, getNewReqeustId());
+    statusSystem.handleAppHeartbeat(appId2, 1, 1, 0, new HashMap<>(), dummy, 
getNewReqeustId());
     assertEquals(dummy, statusSystem.appHeartbeatTime.get(appId2));
 
     assertEquals(2, statusSystem.appHeartbeatTime.size());
@@ -811,23 +813,25 @@ public class DefaultMetaSystemSuiteJ {
     Assert.assertEquals(statusSystem.estimatedPartitionSize, 
conf.initialEstimatedPartitionSize());
 
     Long dummy = 1235L;
-    statusSystem.handleAppHeartbeat(APPID1, 10000000000l, 1, 0, dummy, 
getNewReqeustId());
+    statusSystem.handleAppHeartbeat(
+        APPID1, 10000000000l, 1, 0, new HashMap<>(), dummy, getNewReqeustId());
     String appId2 = "app02";
-    statusSystem.handleAppHeartbeat(appId2, 1, 1, 0, dummy, getNewReqeustId());
+    statusSystem.handleAppHeartbeat(appId2, 1, 1, 0, new HashMap<>(), dummy, 
getNewReqeustId());
 
     // Max size
     statusSystem.handleUpdatePartitionSize();
     Assert.assertEquals(statusSystem.estimatedPartitionSize, 
conf.maxPartitionSizeToEstimate());
 
-    statusSystem.handleAppHeartbeat(APPID1, 1000000000l, 1, 0, dummy, 
getNewReqeustId());
-    statusSystem.handleAppHeartbeat(appId2, 1, 1, 0, dummy, getNewReqeustId());
+    statusSystem.handleAppHeartbeat(
+        APPID1, 1000000000l, 1, 0, new HashMap<>(), dummy, getNewReqeustId());
+    statusSystem.handleAppHeartbeat(appId2, 1, 1, 0, new HashMap<>(), dummy, 
getNewReqeustId());
 
     // Size between minEstimateSize -> maxEstimateSize
     statusSystem.handleUpdatePartitionSize();
     Assert.assertEquals(statusSystem.estimatedPartitionSize, 500000000);
 
-    statusSystem.handleAppHeartbeat(APPID1, 1000l, 1, 0, dummy, 
getNewReqeustId());
-    statusSystem.handleAppHeartbeat(appId2, 1000l, 1, 0, dummy, 
getNewReqeustId());
+    statusSystem.handleAppHeartbeat(APPID1, 1000l, 1, 0, new HashMap<>(), 
dummy, getNewReqeustId());
+    statusSystem.handleAppHeartbeat(appId2, 1000l, 1, 0, new HashMap<>(), 
dummy, getNewReqeustId());
 
     // Min size
     statusSystem.handleUpdatePartitionSize();
@@ -899,13 +903,22 @@ public class DefaultMetaSystemSuiteJ {
   }
 
   @Test
-  public void testShuffleFallbackCount() {
-    statusSystem.shuffleTotalFallbackCount.reset();
+  public void testShuffleCountWithFallback() {
+    statusSystem.shuffleTotalCount.reset();
+    statusSystem.shuffleFallbackCounts.clear();
 
     Long dummy = 1235L;
-    statusSystem.handleAppHeartbeat(APPID1, 1000l, 1, 1, dummy, 
getNewReqeustId());
-    statusSystem.handleAppHeartbeat(APPID1, 1000l, 1, 2, dummy, 
getNewReqeustId());
-
-    assertEquals(statusSystem.shuffleTotalFallbackCount.longValue(), 3);
+    Map<String, Long> shuffleFallbackCounts = new HashMap<>();
+    shuffleFallbackCounts.put(POLICY1, 1L);
+    statusSystem.handleAppHeartbeat(
+        APPID1, 1000l, 1, 1, shuffleFallbackCounts, dummy, getNewReqeustId());
+    shuffleFallbackCounts.put(POLICY1, 1L);
+    shuffleFallbackCounts.put(POLICY2, 1L);
+    statusSystem.handleAppHeartbeat(
+        APPID1, 1000l, 1, 2, shuffleFallbackCounts, dummy, getNewReqeustId());
+
+    assertEquals(statusSystem.shuffleTotalCount.longValue(), 3);
+    assertEquals(statusSystem.shuffleFallbackCounts.get(POLICY1).longValue(), 
2);
+    assertEquals(statusSystem.shuffleFallbackCounts.get(POLICY2).longValue(), 
1);
   }
 }
diff --git 
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
 
b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
index ae70ed54b..b04a419f4 100644
--- 
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
+++ 
b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
@@ -255,6 +255,8 @@ public class RatisMasterStatusSystemSuiteJ {
   private static final String APPID1 = "appId1";
   private static final int SHUFFLEID1 = 1;
   private static final String SHUFFLEKEY1 = APPID1 + "-" + SHUFFLEID1;
+  private static final String POLICY1 = "ShufflePartitionsFallbackPolicy";
+  private static final String POLICY2 = "QuotaFallbackPolicy";
   private static final WorkerStatus workerStatus = 
WorkerStatus.normalWorkerStatus();
 
   private String getNewReqeustId() {
@@ -946,14 +948,14 @@ public class RatisMasterStatusSystemSuiteJ {
     Assert.assertNotNull(statusSystem);
 
     long dummy = 1235L;
-    statusSystem.handleAppHeartbeat(APPID1, 1, 1, 0, dummy, getNewReqeustId());
+    statusSystem.handleAppHeartbeat(APPID1, 1, 1, 0, new HashMap<>(), dummy, 
getNewReqeustId());
     Thread.sleep(3000L);
     Assert.assertEquals(Long.valueOf(dummy), 
STATUSSYSTEM1.appHeartbeatTime.get(APPID1));
     Assert.assertEquals(Long.valueOf(dummy), 
STATUSSYSTEM2.appHeartbeatTime.get(APPID1));
     Assert.assertEquals(Long.valueOf(dummy), 
STATUSSYSTEM3.appHeartbeatTime.get(APPID1));
 
     String appId2 = "app02";
-    statusSystem.handleAppHeartbeat(appId2, 1, 1, 0, dummy, getNewReqeustId());
+    statusSystem.handleAppHeartbeat(appId2, 1, 1, 0, new HashMap<>(), dummy, 
getNewReqeustId());
     Thread.sleep(3000L);
 
     Assert.assertEquals(Long.valueOf(dummy), 
STATUSSYSTEM1.appHeartbeatTime.get(appId2));
@@ -1315,23 +1317,25 @@ public class RatisMasterStatusSystemSuiteJ {
     Assert.assertEquals(statusSystem.estimatedPartitionSize, 
conf.initialEstimatedPartitionSize());
 
     Long dummy = 1235L;
-    statusSystem.handleAppHeartbeat(APPID1, 10000000000l, 1, 0, dummy, 
getNewReqeustId());
+    statusSystem.handleAppHeartbeat(
+        APPID1, 10000000000l, 1, 0, new HashMap<>(), dummy, getNewReqeustId());
     String appId2 = "app02";
-    statusSystem.handleAppHeartbeat(appId2, 1, 1, 0, dummy, getNewReqeustId());
+    statusSystem.handleAppHeartbeat(appId2, 1, 1, 0, new HashMap<>(), dummy, 
getNewReqeustId());
 
     // Max size
     statusSystem.handleUpdatePartitionSize();
     Assert.assertEquals(statusSystem.estimatedPartitionSize, 
conf.maxPartitionSizeToEstimate());
 
-    statusSystem.handleAppHeartbeat(APPID1, 1000000000l, 1, 0, dummy, 
getNewReqeustId());
-    statusSystem.handleAppHeartbeat(appId2, 1, 1, 0, dummy, getNewReqeustId());
+    statusSystem.handleAppHeartbeat(
+        APPID1, 1000000000l, 1, 0, new HashMap<>(), dummy, getNewReqeustId());
+    statusSystem.handleAppHeartbeat(appId2, 1, 1, 0, new HashMap<>(), dummy, 
getNewReqeustId());
 
     // Size between minEstimateSize -> maxEstimateSize
     statusSystem.handleUpdatePartitionSize();
     Assert.assertEquals(statusSystem.estimatedPartitionSize, 500000000);
 
-    statusSystem.handleAppHeartbeat(APPID1, 1000l, 1, 0, dummy, 
getNewReqeustId());
-    statusSystem.handleAppHeartbeat(appId2, 1000l, 1, 0, dummy, 
getNewReqeustId());
+    statusSystem.handleAppHeartbeat(APPID1, 1000l, 1, 0, new HashMap<>(), 
dummy, getNewReqeustId());
+    statusSystem.handleAppHeartbeat(appId2, 1000l, 1, 0, new HashMap<>(), 
dummy, getNewReqeustId());
 
     // Min size
     statusSystem.handleUpdatePartitionSize();
@@ -1503,15 +1507,24 @@ public class RatisMasterStatusSystemSuiteJ {
   }
 
   @Test
-  public void testShuffleFallbackCount() {
+  public void testShuffleCountWithFallback() {
     AbstractMetaManager statusSystem = pickLeaderStatusSystem();
     Assert.assertNotNull(statusSystem);
-    statusSystem.shuffleTotalFallbackCount.reset();
+    statusSystem.shuffleTotalCount.reset();
+    statusSystem.shuffleFallbackCounts.clear();
 
     Long dummy = 1235L;
-    statusSystem.handleAppHeartbeat(APPID1, 1000l, 1, 1, dummy, 
getNewReqeustId());
-    statusSystem.handleAppHeartbeat(APPID1, 1000l, 1, 2, dummy, 
getNewReqeustId());
-
-    Assert.assertEquals(statusSystem.shuffleTotalFallbackCount.longValue(), 3);
+    Map<String, Long> shuffleFallbackCounts = new HashMap<>();
+    shuffleFallbackCounts.put(POLICY1, 1L);
+    statusSystem.handleAppHeartbeat(
+        APPID1, 1000l, 1, 1, shuffleFallbackCounts, dummy, getNewReqeustId());
+    shuffleFallbackCounts.put(POLICY1, 1L);
+    shuffleFallbackCounts.put(POLICY2, 1L);
+    statusSystem.handleAppHeartbeat(
+        APPID1, 1000l, 1, 2, shuffleFallbackCounts, dummy, getNewReqeustId());
+
+    Assert.assertEquals(statusSystem.shuffleTotalCount.longValue(), 3);
+    
Assert.assertEquals(statusSystem.shuffleFallbackCounts.get(POLICY1).longValue(),
 2);
+    
Assert.assertEquals(statusSystem.shuffleFallbackCounts.get(POLICY2).longValue(),
 1);
   }
 }


Reply via email to