This is an automated email from the ASF dual-hosted git repository.

feiwang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new d9984c9e0 [CELEBORN-1800] Introduce ApplicationTotalCount and 
ApplicationFallbackCount metric to record the total and fallback count of 
application
d9984c9e0 is described below

commit d9984c9e0e4ce6f763cc43b265b500bc97f05497
Author: SteNicholas <[email protected]>
AuthorDate: Mon May 19 07:20:00 2025 -0700

    [CELEBORN-1800] Introduce ApplicationTotalCount and 
ApplicationFallbackCount metric to record the total and fallback count of 
application
    
    ### What changes were proposed in this pull request?
    
    Introduce `ApplicationTotalCount` and `ApplicationFallbackCount` metrics to 
record the total count and the fallback count of applications.
    
    ### Why are the changes needed?
    
    There is no metric that records the total count of applications running 
with Celeborn shuffle or the engine built-in shuffle, nor the fallback count of 
applications. Meanwhile, the fallback of Flink shuffle is decided at job 
granularity rather than shuffle granularity.
    
    Follow-up of 
https://github.com/apache/celeborn/pull/3012#issuecomment-2553488532.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    - `DefaultMetaSystemSuiteJ#testShuffleAndApplicationCountWithFallback`
    - `RatisMasterStatusSystemSuiteJ#testShuffleAndApplicationCountWithFallback`
    
    Closes #3026 from SteNicholas/CELEBORN-1800.
    
    Lead-authored-by: SteNicholas <[email protected]>
    Co-authored-by: Wang, Fei <[email protected]>
    Signed-off-by: Wang, Fei <[email protected]>
---
 assets/grafana/celeborn-dashboard.json             | 186 ++++++++++++++++++++-
 .../celeborn/plugin/flink/RemoteShuffleMaster.java |  11 +-
 .../shuffle/celeborn/SparkShuffleManager.java      |   1 +
 .../CelebornShuffleFallbackPolicyRunner.scala      |  31 ++--
 .../shuffle/celeborn/SparkShuffleManager.java      |   2 +
 .../CelebornShuffleFallbackPolicyRunner.scala      |  28 +++-
 .../celeborn/client/ApplicationHeartbeater.scala   |  15 +-
 .../apache/celeborn/client/LifecycleManager.scala  |  26 ++-
 common/src/main/proto/TransportMessages.proto      |   4 +
 .../common/protocol/message/ControlMessages.scala  |   8 +
 .../apache/celeborn/common/util/PbSerDeUtils.scala |   4 +
 docs/monitoring.md                                 |  44 ++---
 .../master/clustermeta/AbstractMetaManager.java    |  27 ++-
 .../master/clustermeta/IMetadataHandler.java       |   2 +
 .../clustermeta/SingleMasterMetaManager.java       |  11 +-
 .../master/clustermeta/ha/HAMasterMetaManager.java |   4 +
 .../deploy/master/clustermeta/ha/MetaHandler.java  |  18 +-
 master/src/main/proto/Resource.proto               |   2 +
 .../celeborn/service/deploy/master/Master.scala    |  20 ++-
 .../service/deploy/master/MasterSource.scala       |   5 +-
 .../clustermeta/DefaultMetaSystemSuiteJ.java       |  65 +++++--
 .../ha/RatisMasterStatusSystemSuiteJ.java          |  64 +++++--
 22 files changed, 476 insertions(+), 102 deletions(-)

diff --git a/assets/grafana/celeborn-dashboard.json 
b/assets/grafana/celeborn-dashboard.json
index 8e98e6402..d98edb8b6 100644
--- a/assets/grafana/celeborn-dashboard.json
+++ b/assets/grafana/celeborn-dashboard.json
@@ -1472,7 +1472,7 @@
             "type": "prometheus",
             "uid": "${DS_PROMETHEUS}"
           },
-          "description": "The total count of shuffle including celeborn 
shuffle and spark built-in shuffle.",
+          "description": "The total count of shuffle including celeborn 
shuffle and engine built-in shuffle.",
           "fieldConfig": {
             "defaults": {
               "color": {
@@ -1652,7 +1652,7 @@
             "type": "prometheus",
             "uid": "${DS_PROMETHEUS}"
           },
-          "description": "The ApplyCompletedIndex of current master node in HA 
mode.",
+          "description": "The total count of application running with celeborn 
shuffle and engine built-in shuffle.",
           "fieldConfig": {
             "defaults": {
               "color": {
@@ -1710,6 +1710,186 @@
             "x": 0,
             "y": 90
           },
+          "id": 239,
+          "options": {
+            "legend": {
+              "calcs": [],
+              "displayMode": "list",
+              "placement": "bottom",
+              "showLegend": true
+            },
+            "tooltip": {
+              "maxHeight": 600,
+              "mode": "single",
+              "sort": "none"
+            }
+          },
+          "targets": [
+            {
+              "datasource": {
+                "type": "prometheus",
+                "uid": "${DS_PROMETHEUS}"
+              },
+              "expr": 
"metrics_ApplicationTotalCount_Value{instance=~\"${instance}\"}",
+              "legendFormat": "${baseLegend}",
+              "range": true,
+              "refId": "A"
+            }
+          ],
+          "title": "metrics_ApplicationTotalCount_Value",
+          "type": "timeseries"
+        },
+        {
+          "datasource": {
+            "type": "prometheus",
+            "uid": "${DS_PROMETHEUS}"
+          },
+          "description": "The count of application fallbacks.",
+          "fieldConfig": {
+            "defaults": {
+              "color": {
+                "mode": "palette-classic"
+              },
+              "custom": {
+                "axisCenteredZero": false,
+                "axisColorMode": "text",
+                "axisLabel": "",
+                "axisPlacement": "auto",
+                "barAlignment": 0,
+                "drawStyle": "line",
+                "fillOpacity": 0,
+                "gradientMode": "none",
+                "hideFrom": {
+                  "legend": false,
+                  "tooltip": false,
+                  "viz": false
+                },
+                "lineInterpolation": "linear",
+                "lineWidth": 1,
+                "pointSize": 5,
+                "scaleDistribution": {
+                  "type": "linear"
+                },
+                "showPoints": "auto",
+                "spanNulls": false,
+                "stacking": {
+                  "group": "A",
+                  "mode": "none"
+                },
+                "thresholdsStyle": {
+                  "mode": "off"
+                }
+              },
+              "mappings": [],
+              "thresholds": {
+                "mode": "absolute",
+                "steps": [
+                  {
+                    "color": "green"
+                  }
+                ]
+              }
+            },
+            "overrides": []
+          },
+          "gridPos": {
+            "h": 8,
+            "w": 12,
+            "x": 12,
+            "y": 90
+          },
+          "id": 240,
+          "options": {
+            "legend": {
+              "calcs": [],
+              "displayMode": "list",
+              "placement": "bottom",
+              "showLegend": true
+            },
+            "tooltip": {
+              "maxHeight": 600,
+              "mode": "single",
+              "sort": "none"
+            }
+          },
+          "targets": [
+            {
+              "datasource": {
+                "type": "prometheus",
+                "uid": "${DS_PROMETHEUS}"
+              },
+              "expr": 
"metrics_ApplicationFallbackCount_Value{instance=~\"${instance}\"}",
+              "legendFormat": "${baseLegend}",
+              "range": true,
+              "refId": "A"
+            }
+          ],
+          "title": "metrics_ApplicationFallbackCount_Value",
+          "type": "timeseries"
+        },
+        {
+          "datasource": {
+            "type": "prometheus",
+            "uid": "${DS_PROMETHEUS}"
+          },
+          "description": "The ApplyCompletedIndex of current master node in HA 
mode.",
+          "fieldConfig": {
+            "defaults": {
+              "color": {
+                "mode": "palette-classic"
+              },
+              "custom": {
+                "axisCenteredZero": false,
+                "axisColorMode": "text",
+                "axisLabel": "",
+                "axisPlacement": "auto",
+                "barAlignment": 0,
+                "drawStyle": "line",
+                "fillOpacity": 0,
+                "gradientMode": "none",
+                "hideFrom": {
+                  "legend": false,
+                  "tooltip": false,
+                  "viz": false
+                },
+                "lineInterpolation": "linear",
+                "lineWidth": 1,
+                "pointSize": 5,
+                "scaleDistribution": {
+                  "type": "linear"
+                },
+                "showPoints": "auto",
+                "spanNulls": false,
+                "stacking": {
+                  "group": "A",
+                  "mode": "none"
+                },
+                "thresholdsStyle": {
+                  "mode": "off"
+                }
+              },
+              "mappings": [],
+              "thresholds": {
+                "mode": "absolute",
+                "steps": [
+                  {
+                    "color": "green"
+                  },
+                  {
+                    "color": "red",
+                    "value": 80
+                  }
+                ]
+              }
+            },
+            "overrides": []
+          },
+          "gridPos": {
+            "h": 8,
+            "w": 12,
+            "x": 0,
+            "y": 98
+          },
           "id": 118,
           "options": {
             "legend": {
@@ -1802,7 +1982,7 @@
             "h": 8,
             "w": 12,
             "x": 12,
-            "y": 90
+            "y": 98
           },
           "id": 136,
           "options": {
diff --git 
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleMaster.java
 
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleMaster.java
index cb1a9e7f9..3c65da82a 100644
--- 
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleMaster.java
+++ 
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleMaster.java
@@ -104,13 +104,17 @@ public class RemoteShuffleMaster implements 
ShuffleMaster<ShuffleDescriptor> {
       }
     }
 
+    lifecycleManager.applicationCount().increment();
     try {
       if (nettyShuffleServiceFactory != null) {
         Optional<ShuffleFallbackPolicy> shuffleFallbackPolicy =
             ShuffleFallbackPolicyRunner.getActivatedFallbackPolicy(context, 
conf, lifecycleManager);
         if (shuffleFallbackPolicy.isPresent()) {
           LOG.warn("Fallback to vanilla Flink NettyShuffleMaster for job: 
{}.", jobID);
-          jobFallbackPolicies.put(jobID, 
shuffleFallbackPolicy.get().getClass().getName());
+          String jobFallbackPolicy = 
shuffleFallbackPolicy.get().getClass().getName();
+          jobFallbackPolicies.put(jobID, jobFallbackPolicy);
+          lifecycleManager.computeFallbackCounts(
+              lifecycleManager.applicationFallbackCounts(), jobFallbackPolicy);
           nettyShuffleMaster().registerJob(context);
           return;
         }
@@ -158,9 +162,8 @@ public class RemoteShuffleMaster implements 
ShuffleMaster<ShuffleDescriptor> {
           String jobFallbackPolicy = jobFallbackPolicies.get(jobID);
           if (jobFallbackPolicy != null) {
             try {
-              lifecycleManager
-                  .shuffleFallbackCounts()
-                  .compute(jobFallbackPolicy, (key, value) -> value == null ? 
1L : value + 1L);
+              lifecycleManager.computeFallbackCounts(
+                  lifecycleManager.shuffleFallbackCounts(), jobFallbackPolicy);
               return nettyShuffleMaster()
                   .registerPartitionWithProducer(jobID, partitionDescriptor, 
producerDescriptor)
                   .get();
diff --git 
a/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
 
b/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
index 41fe777b7..97c393f3b 100644
--- 
a/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
+++ 
b/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
@@ -97,6 +97,7 @@ public class SparkShuffleManager implements ShuffleManager {
         if (lifecycleManager == null) {
           appUniqueId = celebornConf.appUniqueIdWithUUIDSuffix(appId);
           lifecycleManager = new LifecycleManager(appUniqueId, celebornConf);
+          lifecycleManager.applicationCount().increment();
           
lifecycleManager.registerCancelShuffleCallback(SparkUtils::cancelShuffle);
           if (celebornConf.clientStageRerunEnabled()) {
             MapOutputTrackerMaster mapOutputTracker =
diff --git 
a/client-spark/spark-2/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleFallbackPolicyRunner.scala
 
b/client-spark/spark-2/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleFallbackPolicyRunner.scala
index 4f501eff3..b5d54ef4d 100644
--- 
a/client-spark/spark-2/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleFallbackPolicyRunner.scala
+++ 
b/client-spark/spark-2/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleFallbackPolicyRunner.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.shuffle.celeborn
 
+import java.util.concurrent.ConcurrentHashMap
 import java.util.function.BiFunction
 
 import scala.collection.JavaConverters._
@@ -44,19 +45,27 @@ class CelebornShuffleFallbackPolicyRunner(conf: 
CelebornConf) extends Logging {
         throw new CelebornIOException(
           "Fallback to spark built-in shuffle implementation is prohibited.")
       } else {
-        lifecycleManager.shuffleFallbackCounts.compute(
-          fallbackPolicy.get.getClass.getName,
-          new BiFunction[String, java.lang.Long, java.lang.Long] {
-            override def apply(k: String, v: java.lang.Long): java.lang.Long = 
{
-              if (v == null) {
-                1L
-              } else {
-                v + 1L
-              }
-            }
-          })
+        val shuffleFallbackPolicy = fallbackPolicy.get.getClass.getName
+        computeFallbackCounts(shuffleFallbackPolicy, 
lifecycleManager.shuffleFallbackCounts)
+        computeFallbackCounts(shuffleFallbackPolicy, 
lifecycleManager.applicationFallbackCounts)
       }
     }
     fallbackPolicy.isDefined
   }
+
+  private def computeFallbackCounts(
+      fallbackPolicy: String,
+      fallbackCounts: ConcurrentHashMap[String, java.lang.Long]) = {
+    fallbackCounts.compute(
+      fallbackPolicy,
+      new BiFunction[String, java.lang.Long, java.lang.Long] {
+        override def apply(k: String, v: java.lang.Long): java.lang.Long = {
+          if (v == null) {
+            1L
+          } else {
+            v + 1L
+          }
+        }
+      })
+  }
 }
diff --git 
a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
 
b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
index a3e75cd10..8c099b29f 100644
--- 
a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
+++ 
b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
@@ -139,6 +139,7 @@ public class SparkShuffleManager implements ShuffleManager {
         if (lifecycleManager == null) {
           appUniqueId = celebornConf.appUniqueIdWithUUIDSuffix(appId);
           lifecycleManager = new LifecycleManager(appUniqueId, celebornConf);
+          lifecycleManager.applicationCount().increment();
           
lifecycleManager.registerCancelShuffleCallback(SparkUtils::cancelShuffle);
           if (celebornConf.clientStageRerunEnabled()) {
             MapOutputTrackerMaster mapOutputTracker =
@@ -234,6 +235,7 @@ public class SparkShuffleManager implements ShuffleManager {
 
   @Override
   public void stop() {
+    sortShuffleIds.clear();
     if (shuffleClient != null) {
       shuffleClient.shutdown();
       ShuffleClient.reset();
diff --git 
a/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleFallbackPolicyRunner.scala
 
b/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleFallbackPolicyRunner.scala
index a78b642f2..37dbcac80 100644
--- 
a/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleFallbackPolicyRunner.scala
+++ 
b/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleFallbackPolicyRunner.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.shuffle.celeborn
 
+import java.util.concurrent.ConcurrentHashMap
+
 import scala.collection.JavaConverters._
 
 import org.apache.spark.ShuffleDependency
@@ -42,17 +44,25 @@ class CelebornShuffleFallbackPolicyRunner(conf: 
CelebornConf) extends Logging {
         throw new CelebornIOException(
           "Fallback to spark built-in shuffle implementation is prohibited.")
       } else {
-        lifecycleManager.shuffleFallbackCounts.compute(
-          fallbackPolicy.get.getClass.getName,
-          (_, v) => {
-            if (v == null) {
-              1L
-            } else {
-              v + 1L
-            }
-          })
+        val shuffleFallbackPolicy = fallbackPolicy.get.getClass.getName
+        computeFallbackCounts(shuffleFallbackPolicy, 
lifecycleManager.shuffleFallbackCounts)
+        computeFallbackCounts(shuffleFallbackPolicy, 
lifecycleManager.applicationFallbackCounts)
       }
     }
     fallbackPolicy.isDefined
   }
+
+  private def computeFallbackCounts(
+      fallbackPolicy: String,
+      fallbackCounts: ConcurrentHashMap[String, java.lang.Long]) = {
+    fallbackCounts.compute(
+      fallbackPolicy,
+      (_, v) => {
+        if (v == null) {
+          1L
+        } else {
+          v + 1L
+        }
+      })
+  }
 }
diff --git 
a/client/src/main/scala/org/apache/celeborn/client/ApplicationHeartbeater.scala 
b/client/src/main/scala/org/apache/celeborn/client/ApplicationHeartbeater.scala
index 3d8d23019..94c33d146 100644
--- 
a/client/src/main/scala/org/apache/celeborn/client/ApplicationHeartbeater.scala
+++ 
b/client/src/main/scala/org/apache/celeborn/client/ApplicationHeartbeater.scala
@@ -37,7 +37,9 @@ class ApplicationHeartbeater(
     appId: String,
     conf: CelebornConf,
     masterClient: MasterClient,
-    shuffleMetrics: () => ((Long, Long), (Long, Map[String, java.lang.Long])),
+    shuffleMetrics: () => (
+        (Long, Long),
+        (Long, Long, Map[String, java.lang.Long], Map[String, 
java.lang.Long])),
     workerStatusTracker: WorkerStatusTracker,
     registeredShuffles: ConcurrentHashMap.KeySetView[Int, java.lang.Boolean],
     cancelAllActiveStages: String => Unit) extends Logging {
@@ -61,10 +63,15 @@ class ApplicationHeartbeater(
             require(masterClient != null, "When sending a heartbeat, client 
shouldn't be null.")
             val (
               (tmpTotalWritten, tmpTotalFileCount),
-              (tmpShuffleCount, tmpShuffleFallbackCounts)) = shuffleMetrics()
+              (
+                tmpShuffleCount,
+                tmpApplicationCount,
+                tmpShuffleFallbackCounts,
+                tmpApplicationFallbackCounts)) = shuffleMetrics()
             logInfo("Send app heartbeat with " +
               s"written: ${Utils.bytesToString(tmpTotalWritten)}, file count: 
$tmpTotalFileCount, " +
-              s"shuffle count: $tmpShuffleCount, shuffle fallback counts: 
$tmpShuffleFallbackCounts")
+              s"shuffle count: $tmpShuffleCount, shuffle fallback counts: 
$tmpShuffleFallbackCounts, " +
+              s"application count: $tmpApplicationCount, application fallback 
counts: $tmpApplicationFallbackCounts")
             // UserResourceConsumption and DiskInfo are eliminated from 
WorkerInfo
             // during serialization of HeartbeatFromApplication
             val appHeartbeat =
@@ -73,7 +80,9 @@ class ApplicationHeartbeater(
                 tmpTotalWritten,
                 tmpTotalFileCount,
                 tmpShuffleCount,
+                tmpApplicationCount,
                 tmpShuffleFallbackCounts.asJava,
+                tmpApplicationFallbackCounts.asJava,
                 workerStatusTracker.getNeedCheckedWorkers().toList.asJava,
                 ZERO_UUID,
                 true)
diff --git 
a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala 
b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
index bcb018570..ae692f908 100644
--- a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
+++ b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
@@ -24,7 +24,7 @@ import java.util
 import java.util.{function, List => JList}
 import java.util.concurrent._
 import java.util.concurrent.atomic.{AtomicInteger, LongAdder}
-import java.util.function.{BiConsumer, Consumer}
+import java.util.function.{BiConsumer, BiFunction, Consumer}
 
 import scala.collection.JavaConverters._
 import scala.collection.generic.CanBuildFrom
@@ -91,7 +91,9 @@ class LifecycleManager(val appUniqueId: String, val conf: 
CelebornConf) extends
 
   val registeredShuffle = ConcurrentHashMap.newKeySet[Int]()
   val shuffleCount = new LongAdder()
+  val applicationCount = new LongAdder()
   val shuffleFallbackCounts = JavaUtils.newConcurrentHashMap[String, 
java.lang.Long]()
+  val applicationFallbackCounts = JavaUtils.newConcurrentHashMap[String, 
java.lang.Long]()
   // maintain each shuffle's map relation of WorkerInfo and partition location
   val shuffleAllocatedWorkers = new ShuffleAllocatedWorkers
   // shuffle id -> (partitionId -> newest PartitionLocation)
@@ -223,15 +225,17 @@ class LifecycleManager(val appUniqueId: String, val conf: 
CelebornConf) extends
       masterClient,
       () => {
         commitManager.commitMetrics() ->
-          (shuffleCount.sumThenReset(), resetShuffleFallbackCounts())
+          (shuffleCount.sumThenReset(), applicationCount.sumThenReset(), 
resetFallbackCounts(
+            shuffleFallbackCounts), 
resetFallbackCounts(applicationFallbackCounts))
       },
       workerStatusTracker,
       registeredShuffle,
       reason => cancelAllActiveStages(reason))
-  private def resetShuffleFallbackCounts(): Map[String, java.lang.Long] = {
+  private def resetFallbackCounts(counts: ConcurrentHashMap[String, 
java.lang.Long])
+      : Map[String, java.lang.Long] = {
     val fallbackCounts = new util.HashMap[String, java.lang.Long]()
-    shuffleFallbackCounts.keys().asScala.foreach { key =>
-      Option(shuffleFallbackCounts.remove(key)).filter(_ > 
0).foreach(fallbackCounts.put(key, _))
+    counts.keys().asScala.foreach { key =>
+      Option(counts.remove(key)).filter(_ > 0).foreach(fallbackCounts.put(key, 
_))
     }
     fallbackCounts.asScala.toMap
   }
@@ -1927,4 +1931,16 @@ class LifecycleManager(val appUniqueId: String, val 
conf: CelebornConf) extends
       case _ =>
     }
   }
+
+  def computeFallbackCounts(
+      fallbackCounts: ConcurrentHashMap[String, java.lang.Long],
+      fallbackPolicy: String): Unit = {
+    fallbackCounts.compute(
+      fallbackPolicy,
+      new BiFunction[String, java.lang.Long, java.lang.Long] {
+        override def apply(k: String, v: java.lang.Long): java.lang.Long = {
+          if (v == null) 1L else v + 1L
+        }
+      })
+  }
 }
diff --git a/common/src/main/proto/TransportMessages.proto 
b/common/src/main/proto/TransportMessages.proto
index c120b9952..4ca924582 100644
--- a/common/src/main/proto/TransportMessages.proto
+++ b/common/src/main/proto/TransportMessages.proto
@@ -462,6 +462,8 @@ message PbHeartbeatFromApplication {
   bool shouldResponse = 6;
   int64 shuffleCount = 7;
   map<string, int64> shuffleFallbackCounts = 8;
+  int64 applicationCount = 9;
+  map<string, int64> applicationFallbackCounts = 10;
 }
 
 message PbHeartbeatFromApplicationResponse {
@@ -698,6 +700,8 @@ message PbSnapshotMetaInfo {
   repeated PbWorkerInfo decommissionWorkers = 17;
   int64 shuffleTotalCount = 18;
   map<string, int64> shuffleFallbackCounts = 19;
+  int64 applicationTotalCount = 20;
+  map<string, int64> applicationFallbackCounts = 21;
 }
 
 message PbOpenStream {
diff --git 
a/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
 
b/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
index ccd5d79a2..009c3a4fc 100644
--- 
a/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
+++ 
b/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
@@ -422,7 +422,9 @@ object ControlMessages extends Logging {
       totalWritten: Long,
       fileCount: Long,
       shuffleCount: Long,
+      applicationCount: Long,
       shuffleFallbackCounts: util.Map[String, java.lang.Long],
+      applicationFallbackCounts: util.Map[String, java.lang.Long],
       needCheckedWorkerList: util.List[WorkerInfo],
       override var requestId: String = ZERO_UUID,
       shouldResponse: Boolean = false) extends MasterRequestMessage
@@ -840,7 +842,9 @@ object ControlMessages extends Logging {
           totalWritten,
           fileCount,
           shuffleCount,
+          applicationCount,
           shuffleFallbackCounts,
+          applicationFallbackCounts,
           needCheckedWorkerList,
           requestId,
           shouldResponse) =>
@@ -850,7 +854,9 @@ object ControlMessages extends Logging {
         .setTotalWritten(totalWritten)
         .setFileCount(fileCount)
         .setShuffleCount(shuffleCount)
+        .setApplicationCount(applicationCount)
         .putAllShuffleFallbackCounts(shuffleFallbackCounts)
+        .putAllApplicationFallbackCounts(applicationFallbackCounts)
         .addAllNeedCheckedWorkerList(needCheckedWorkerList.asScala.map(
           PbSerDeUtils.toPbWorkerInfo(_, true, true)).toList.asJava)
         .setShouldResponse(shouldResponse)
@@ -1262,7 +1268,9 @@ object ControlMessages extends Logging {
           pbHeartbeatFromApplication.getTotalWritten,
           pbHeartbeatFromApplication.getFileCount,
           pbHeartbeatFromApplication.getShuffleCount,
+          pbHeartbeatFromApplication.getApplicationCount,
           pbHeartbeatFromApplication.getShuffleFallbackCountsMap,
+          pbHeartbeatFromApplication.getApplicationFallbackCountsMap,
           new util.ArrayList[WorkerInfo](
             pbHeartbeatFromApplication.getNeedCheckedWorkerListList.asScala
               .map(PbSerDeUtils.fromPbWorkerInfo).toList.asJava),
diff --git 
a/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala 
b/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala
index 6ba547684..7f51e158f 100644
--- a/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala
@@ -428,7 +428,9 @@ object PbSerDeUtils {
       partitionTotalWritten: java.lang.Long,
       partitionTotalFileCount: java.lang.Long,
       shuffleTotalCount: java.lang.Long,
+      applicationTotalCount: java.lang.Long,
       shuffleFallbackCounts: java.util.Map[String, java.lang.Long],
+      applicationFallbackCounts: java.util.Map[String, java.lang.Long],
       lostWorkers: ConcurrentHashMap[WorkerInfo, java.lang.Long],
       shutdownWorkers: java.util.Set[WorkerInfo],
       workerEventInfos: ConcurrentHashMap[WorkerInfo, WorkerEventInfo],
@@ -449,7 +451,9 @@ object PbSerDeUtils {
       .setPartitionTotalWritten(partitionTotalWritten)
       .setPartitionTotalFileCount(partitionTotalFileCount)
       .setShuffleTotalCount(shuffleTotalCount)
+      .setApplicationTotalCount(applicationTotalCount)
       .putAllShuffleFallbackCounts(shuffleFallbackCounts)
+      .putAllApplicationFallbackCounts(applicationFallbackCounts)
       .putAllLostWorkers(lostWorkers.asScala.map {
         case (worker: WorkerInfo, time: java.lang.Long) => (worker.toUniqueId, 
time)
       }.asJava)
diff --git a/docs/monitoring.md b/docs/monitoring.md
index 95cefcf92..d2f8ced8f 100644
--- a/docs/monitoring.md
+++ b/docs/monitoring.md
@@ -92,27 +92,29 @@ These metrics are exposed by Celeborn master.
 
   - namespace=master
     
-    | Metric Name                  | Description                               
                                        |
-    
|------------------------------|-----------------------------------------------------------------------------------|
-    | RegisteredShuffleCount       | The count of registered shuffle.          
                                        |
-    | DeviceCelebornFreeBytes      | The actual usable space of Celeborn 
available workers for device.                 |
-    | DeviceCelebornTotalBytes     | The total space of Celeborn for device.   
                                        |
-    | RunningApplicationCount      | The count of running applications.        
                                        |
-    | ActiveShuffleSize            | The active shuffle size of workers.       
                                        |
-    | ActiveShuffleFileCount       | The active shuffle file count of workers. 
                                        |
-    | ShuffleTotalCount            | The total count of shuffle including 
celeborn shuffle and spark built-in shuffle. |
-    | ShuffleFallbackCount         | The count of shuffle fallbacks.           
                                        |
-    | WorkerCount                  | The count of active workers.              
                                        |
-    | LostWorkerCount              | The count of workers in lost list.        
                                        |
-    | ExcludedWorkerCount          | The count of workers in excluded list.    
                                        |
-    | AvailableWorkerCount         | The count of workers in available list.   
                                        |
-    | ShutdownWorkerCount          | The count of workers in shutdown list.    
                                        |
-    | DecommissionWorkerCount      | The count of workers in decommission 
list.                                        |
-    | IsActiveMaster               | Whether the current master is active.     
                                        |
-    | RatisApplyCompletedIndex     | The ApplyCompletedIndex of the current 
master node in HA mode.                    |
-    | RatisApplyCompletedIndexDiff | The difference value of 
ApplyCompletedIndex of the master nodes in HA mode.       |
-    | PartitionSize                | The size of estimated shuffle partition.  
                                        |
-    | OfferSlotsTime               | The time for masters to handle 
`RequestSlots` request when registering shuffle.   |
+    | Metric Name                  | Description                               
                                                |
+    
|------------------------------|-------------------------------------------------------------------------------------------|
+    | RegisteredShuffleCount       | The count of registered shuffle.          
                                                |
+    | DeviceCelebornFreeBytes      | The actual usable space of Celeborn 
available workers for device.                         |
+    | DeviceCelebornTotalBytes     | The total space of Celeborn for device.   
                                                |
+    | RunningApplicationCount      | The count of running applications.        
                                                |
+    | ActiveShuffleSize            | The active shuffle size of workers.       
                                                |
+    | ActiveShuffleFileCount       | The active shuffle file count of workers. 
                                                |
+    | ShuffleTotalCount            | The total count of shuffle including 
celeborn shuffle and engine built-in shuffle.        |
+    | ShuffleFallbackCount         | The count of shuffle fallbacks.           
                                                |
+    | ApplicationTotalCount        | The total count of application running 
with celeborn shuffle and engine built-in shuffle. |
+    | ApplicationFallbackCount     | The count of application fallbacks.       
                                                |
+    | WorkerCount                  | The count of active workers.              
                                                |
+    | LostWorkerCount              | The count of workers in lost list.        
                                                |
+    | ExcludedWorkerCount          | The count of workers in excluded list.    
                                                |
+    | AvailableWorkerCount         | The count of workers in available list.   
                                                |
+    | ShutdownWorkerCount          | The count of workers in shutdown list.    
                                                |
+    | DecommissionWorkerCount      | The count of workers in decommission 
list.                                                |
+    | IsActiveMaster               | Whether the current master is active.     
                                                |
+    | RatisApplyCompletedIndex     | The ApplyCompletedIndex of the current 
master node in HA mode.                            |
+    | RatisApplyCompletedIndexDiff | The difference value of 
ApplyCompletedIndex of the master nodes in HA mode.               |
+    | PartitionSize                | The size of estimated shuffle partition.  
                                                |
+    | OfferSlotsTime               | The time for masters to handle 
`RequestSlots` request when registering shuffle.           |
 
   - namespace=CPU
 
diff --git 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
index 2acaea57b..6bee54125 100644
--- 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
+++ 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
@@ -92,7 +92,9 @@ public abstract class AbstractMetaManager implements 
IMetadataHandler {
   public final LongAdder partitionTotalWritten = new LongAdder();
   public final LongAdder partitionTotalFileCount = new LongAdder();
   public final LongAdder shuffleTotalCount = new LongAdder();
+  public final LongAdder applicationTotalCount = new LongAdder();
   public final Map<String, Long> shuffleFallbackCounts = 
JavaUtils.newConcurrentHashMap();
+  public final Map<String, Long> applicationFallbackCounts = 
JavaUtils.newConcurrentHashMap();
 
   public final ConcurrentHashMap<String, ApplicationMeta> applicationMetas =
       JavaUtils.newConcurrentHashMap();
@@ -152,12 +154,16 @@ public abstract class AbstractMetaManager implements 
IMetadataHandler {
       long totalWritten,
       long fileCount,
       long shuffleCount,
-      Map<String, Long> shuffleFallbackCounts) {
+      long applicationCount,
+      Map<String, Long> shuffleFallbackCounts,
+      Map<String, Long> applicationFallbackCounts) {
     appHeartbeatTime.put(appId, time);
     partitionTotalWritten.add(totalWritten);
     partitionTotalFileCount.add(fileCount);
     shuffleTotalCount.add(shuffleCount);
-    addShuffleFallbackCounts(shuffleFallbackCounts);
+    applicationTotalCount.add(applicationCount);
+    addFallbackCounts(this.shuffleFallbackCounts, shuffleFallbackCounts);
+    addFallbackCounts(this.applicationFallbackCounts, 
applicationFallbackCounts);
   }
 
   public void updateAppLostMeta(String appId) {
@@ -364,7 +370,9 @@ public abstract class AbstractMetaManager implements 
IMetadataHandler {
                 partitionTotalWritten.sum(),
                 partitionTotalFileCount.sum(),
                 shuffleTotalCount.sum(),
+                applicationTotalCount.sum(),
                 shuffleFallbackCounts,
+                applicationFallbackCounts,
                 lostWorkers,
                 shutdownWorkers,
                 workerEventInfos,
@@ -464,7 +472,10 @@ public abstract class AbstractMetaManager implements 
IMetadataHandler {
       partitionTotalWritten.add(snapshotMetaInfo.getPartitionTotalWritten());
       
partitionTotalFileCount.add(snapshotMetaInfo.getPartitionTotalFileCount());
       shuffleTotalCount.add(snapshotMetaInfo.getShuffleTotalCount());
-      addShuffleFallbackCounts(snapshotMetaInfo.getShuffleFallbackCountsMap());
+      applicationTotalCount.add(snapshotMetaInfo.getApplicationTotalCount());
+      addFallbackCounts(shuffleFallbackCounts, 
snapshotMetaInfo.getShuffleFallbackCountsMap());
+      addFallbackCounts(
+          applicationFallbackCounts, 
snapshotMetaInfo.getApplicationFallbackCountsMap());
 
       snapshotMetaInfo
           .getApplicationMetasMap()
@@ -505,7 +516,9 @@ public abstract class AbstractMetaManager implements 
IMetadataHandler {
     partitionTotalWritten.reset();
     partitionTotalFileCount.reset();
     shuffleTotalCount.reset();
+    applicationTotalCount.reset();
     shuffleFallbackCounts.clear();
+    applicationFallbackCounts.clear();
     workerEventInfos.clear();
     applicationMetas.clear();
   }
@@ -609,10 +622,10 @@ public abstract class AbstractMetaManager implements 
IMetadataHandler {
     return 
registeredAppAndShuffles.values().stream().mapToInt(Set::size).sum();
   }
 
-  private void addShuffleFallbackCounts(Map<String, Long> fallbackCounts) {
-    for (String fallbackPolicy : fallbackCounts.keySet()) {
-      shuffleFallbackCounts.compute(
-          fallbackPolicy, (k, v) -> v == null ? fallbackCounts.get(k) : v + 
fallbackCounts.get(k));
+  private void addFallbackCounts(Map<String, Long> fallbackCounts, Map<String, 
Long> counts) {
+    for (String fallbackPolicy : counts.keySet()) {
+      fallbackCounts.compute(
+          fallbackPolicy, (k, v) -> v == null ? counts.get(k) : v + 
counts.get(k));
     }
   }
 
diff --git 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/IMetadataHandler.java
 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/IMetadataHandler.java
index c25135634..34fd74e34 100644
--- 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/IMetadataHandler.java
+++ 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/IMetadataHandler.java
@@ -43,7 +43,9 @@ public interface IMetadataHandler {
       long totalWritten,
       long fileCount,
       long shuffleCount,
+      long applicationCount,
       Map<String, Long> shuffleFallbackCounts,
+      Map<String, Long> applicationFallbackCounts,
       long time,
       String requestId);
 
diff --git 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/SingleMasterMetaManager.java
 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/SingleMasterMetaManager.java
index 779ef4683..4d4fa3b1a 100644
--- 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/SingleMasterMetaManager.java
+++ 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/SingleMasterMetaManager.java
@@ -75,11 +75,20 @@ public class SingleMasterMetaManager extends 
AbstractMetaManager {
       long totalWritten,
       long fileCount,
       long shuffleCount,
+      long applicationCount,
       Map<String, Long> shuffleFallbackCounts,
+      Map<String, Long> applicationFallbackCounts,
       long time,
       String requestId) {
     updateAppHeartbeatMeta(
-        appId, time, totalWritten, fileCount, shuffleCount, 
shuffleFallbackCounts);
+        appId,
+        time,
+        totalWritten,
+        fileCount,
+        shuffleCount,
+        applicationCount,
+        shuffleFallbackCounts,
+        applicationFallbackCounts);
   }
 
   @Override
diff --git 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAMasterMetaManager.java
 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAMasterMetaManager.java
index 6d72627f8..e3319ffc1 100644
--- 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAMasterMetaManager.java
+++ 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAMasterMetaManager.java
@@ -132,7 +132,9 @@ public class HAMasterMetaManager extends 
AbstractMetaManager {
       long totalWritten,
       long fileCount,
       long shuffleCount,
+      long applicationCount,
       Map<String, Long> shuffleFallbackCounts,
+      Map<String, Long> applicationFallbackCounts,
       long time,
       String requestId) {
     try {
@@ -147,7 +149,9 @@ public class HAMasterMetaManager extends 
AbstractMetaManager {
                       .setTotalWritten(totalWritten)
                       .setFileCount(fileCount)
                       .setShuffleCount(shuffleCount)
+                      .setApplicationCount(applicationCount)
                       .putAllShuffleFallbackCounts(shuffleFallbackCounts)
+                      
.putAllApplicationFallbackCounts(applicationFallbackCounts)
                       .build())
               .build());
     } catch (CelebornRuntimeException e) {
diff --git 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MetaHandler.java
 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MetaHandler.java
index 44a82d8d3..abe21a4de 100644
--- 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MetaHandler.java
+++ 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MetaHandler.java
@@ -141,7 +141,12 @@ public class MetaHandler {
           long totalWritten = 
request.getAppHeartbeatRequest().getTotalWritten();
           long fileCount = request.getAppHeartbeatRequest().getFileCount();
           long shuffleCount = 
request.getAppHeartbeatRequest().getShuffleCount();
-          LOG.debug("Handle app heartbeat for {} with shuffle count {}", 
appId, shuffleCount);
+          long applicationCount = 
request.getAppHeartbeatRequest().getApplicationCount();
+          LOG.debug(
+              "Handle app heartbeat for {} with shuffle count {} and 
application count {}",
+              appId,
+              shuffleCount,
+              applicationCount);
           Map<String, Long> shuffleFallbackCounts =
               request.getAppHeartbeatRequest().getShuffleFallbackCountsMap();
           if (CollectionUtils.isNotEmpty(shuffleFallbackCounts)) {
@@ -150,8 +155,17 @@ public class MetaHandler {
                 shuffleFallbackCounts.values().stream().mapToLong(v -> 
v).sum(),
                 appId);
           }
+          Map<String, Long> applicationFallbackCounts =
+              
request.getAppHeartbeatRequest().getApplicationFallbackCountsMap();
           metaSystem.updateAppHeartbeatMeta(
-              appId, time, totalWritten, fileCount, shuffleCount, 
shuffleFallbackCounts);
+              appId,
+              time,
+              totalWritten,
+              fileCount,
+              shuffleCount,
+              applicationCount,
+              shuffleFallbackCounts,
+              applicationFallbackCounts);
           break;
 
         case AppLost:
diff --git a/master/src/main/proto/Resource.proto 
b/master/src/main/proto/Resource.proto
index 8837ea494..c8e9c2aee 100644
--- a/master/src/main/proto/Resource.proto
+++ b/master/src/main/proto/Resource.proto
@@ -132,6 +132,8 @@ message AppHeartbeatRequest {
   optional int64 shuffleCount = 5;
   map<string, int64> shuffleFallbackCounts = 6;
   HeartbeatInfo heartbeatInfo = 7;
+  optional int64 applicationCount = 8;
+  map<string, int64> applicationFallbackCounts = 9;
 }
 
 message AppLostRequest {
diff --git 
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala 
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
index 1400a832e..dfe4e03a1 100644
--- 
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
+++ 
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
@@ -271,10 +271,18 @@ private[celeborn] class Master(
     statusSystem.shuffleTotalCount.sum()
   }
 
+  masterSource.addGauge(MasterSource.APPLICATION_TOTAL_COUNT) { () =>
+    statusSystem.applicationTotalCount.sum()
+  }
+
   masterSource.addGauge(MasterSource.SHUFFLE_FALLBACK_COUNT) { () =>
     statusSystem.shuffleFallbackCounts.values().asScala.map(_.longValue()).sum
   }
 
+  masterSource.addGauge(MasterSource.APPLICATION_FALLBACK_COUNT) { () =>
+    
statusSystem.applicationFallbackCounts.values().asScala.map(_.longValue()).sum
+  }
+
   masterSource.addGauge(MasterSource.DEVICE_CELEBORN_TOTAL_CAPACITY) { () =>
     statusSystem.workersMap.values().asScala.toList.map(_.totalSpace()).sum
   }
@@ -414,8 +422,10 @@ private[celeborn] class Master(
           appId,
           totalWritten,
           fileCount,
-          shuffleFallbackCount,
+          shuffleCount,
+          applicationCount,
           shuffleFallbackCounts,
+          applicationFallbackCounts,
           needCheckedWorkerList,
           requestId,
           shouldResponse) =>
@@ -428,8 +438,10 @@ private[celeborn] class Master(
           appId,
           totalWritten,
           fileCount,
-          shuffleFallbackCount,
+          shuffleCount,
+          applicationCount,
           shuffleFallbackCounts,
+          applicationFallbackCounts,
           needCheckedWorkerList,
           requestId,
           shouldResponse))
@@ -1126,7 +1138,9 @@ private[celeborn] class Master(
       totalWritten: Long,
       fileCount: Long,
       shuffleCount: Long,
+      applicationCount: Long,
       shuffleFallbackCounts: util.Map[String, java.lang.Long],
+      applicationFallbackCounts: util.Map[String, java.lang.Long],
       needCheckedWorkerList: util.List[WorkerInfo],
       requestId: String,
       shouldResponse: Boolean): Unit = {
@@ -1135,7 +1149,9 @@ private[celeborn] class Master(
       totalWritten,
       fileCount,
       shuffleCount,
+      applicationCount,
       shuffleFallbackCounts,
+      applicationFallbackCounts,
       System.currentTimeMillis(),
       requestId)
     gaugeShuffleFallbackCounts()
diff --git 
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/MasterSource.scala
 
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/MasterSource.scala
index dcc764ebb..aa28fd900 100644
--- 
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/MasterSource.scala
+++ 
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/MasterSource.scala
@@ -46,10 +46,13 @@ object MasterSource {
 
   val REGISTERED_SHUFFLE_COUNT = "RegisteredShuffleCount"
   val SHUFFLE_FALLBACK_COUNT = "ShuffleFallbackCount"
-  // The total count including RegisteredShuffleCount(celeborn shuffle) and 
ShuffleFallbackCount(spark built-in shuffle).
+  // The total count including RegisteredShuffleCount(celeborn shuffle) and 
ShuffleFallbackCount(engine built-in shuffle).
   val SHUFFLE_TOTAL_COUNT = "ShuffleTotalCount"
 
   val RUNNING_APPLICATION_COUNT = "RunningApplicationCount"
+  val APPLICATION_FALLBACK_COUNT = "ApplicationFallbackCount"
+  // The total count including RunningApplicationCount(celeborn shuffle) and 
ApplicationFallbackCount(engine built-in shuffle).
+  val APPLICATION_TOTAL_COUNT = "ApplicationTotalCount"
 
   val IS_ACTIVE_MASTER = "IsActiveMaster"
 
diff --git 
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java
 
b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java
index 78695460b..4f4361ef0 100644
--- 
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java
+++ 
b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java
@@ -643,11 +643,13 @@ public class DefaultMetaSystemSuiteJ {
   @Test
   public void testHandleAppHeartbeat() {
     Long dummy = 1235L;
-    statusSystem.handleAppHeartbeat(APPID1, 1, 1, 0, new HashMap<>(), dummy, 
getNewReqeustId());
+    statusSystem.handleAppHeartbeat(
+        APPID1, 1, 1, 1, 1, new HashMap<>(), new HashMap<>(), dummy, 
getNewReqeustId());
     assertEquals(dummy, statusSystem.appHeartbeatTime.get(APPID1));
 
     String appId2 = "app02";
-    statusSystem.handleAppHeartbeat(appId2, 1, 1, 0, new HashMap<>(), dummy, 
getNewReqeustId());
+    statusSystem.handleAppHeartbeat(
+        appId2, 1, 1, 2, 2, new HashMap<>(), new HashMap<>(), dummy, 
getNewReqeustId());
     assertEquals(dummy, statusSystem.appHeartbeatTime.get(appId2));
 
     assertEquals(2, statusSystem.appHeartbeatTime.size());
@@ -819,24 +821,28 @@ public class DefaultMetaSystemSuiteJ {
 
     Long dummy = 1235L;
     statusSystem.handleAppHeartbeat(
-        APPID1, 10000000000l, 1, 0, new HashMap<>(), dummy, getNewReqeustId());
+        APPID1, 10000000000l, 1, 1, 1, new HashMap<>(), new HashMap<>(), 
dummy, getNewReqeustId());
     String appId2 = "app02";
-    statusSystem.handleAppHeartbeat(appId2, 1, 1, 0, new HashMap<>(), dummy, 
getNewReqeustId());
+    statusSystem.handleAppHeartbeat(
+        appId2, 1, 1, 2, 2, new HashMap<>(), new HashMap<>(), dummy, 
getNewReqeustId());
 
     // Max size
     statusSystem.handleUpdatePartitionSize();
     Assert.assertEquals(statusSystem.estimatedPartitionSize, 
conf.maxPartitionSizeToEstimate());
 
     statusSystem.handleAppHeartbeat(
-        APPID1, 1000000000l, 1, 0, new HashMap<>(), dummy, getNewReqeustId());
-    statusSystem.handleAppHeartbeat(appId2, 1, 1, 0, new HashMap<>(), dummy, 
getNewReqeustId());
+        APPID1, 1000000000l, 1, 1, 1, new HashMap<>(), new HashMap<>(), dummy, 
getNewReqeustId());
+    statusSystem.handleAppHeartbeat(
+        appId2, 1, 1, 2, 2, new HashMap<>(), new HashMap<>(), dummy, 
getNewReqeustId());
 
     // Size between minEstimateSize -> maxEstimateSize
     statusSystem.handleUpdatePartitionSize();
     Assert.assertEquals(statusSystem.estimatedPartitionSize, 500000000);
 
-    statusSystem.handleAppHeartbeat(APPID1, 1000l, 1, 0, new HashMap<>(), 
dummy, getNewReqeustId());
-    statusSystem.handleAppHeartbeat(appId2, 1000l, 1, 0, new HashMap<>(), 
dummy, getNewReqeustId());
+    statusSystem.handleAppHeartbeat(
+        APPID1, 1000l, 1, 1, 1, new HashMap<>(), new HashMap<>(), dummy, 
getNewReqeustId());
+    statusSystem.handleAppHeartbeat(
+        appId2, 1000l, 1, 2, 2, new HashMap<>(), new HashMap<>(), dummy, 
getNewReqeustId());
 
     // Min size
     statusSystem.handleUpdatePartitionSize();
@@ -909,22 +915,47 @@ public class DefaultMetaSystemSuiteJ {
   }
 
   @Test
-  public void testShuffleCountWithFallback() {
+  public void testShuffleAndApplicationCountWithFallback() {
     statusSystem.shuffleTotalCount.reset();
+    statusSystem.applicationTotalCount.reset();
     statusSystem.shuffleFallbackCounts.clear();
+    statusSystem.applicationFallbackCounts.clear();
 
     Long dummy = 1235L;
     Map<String, Long> shuffleFallbackCounts = new HashMap<>();
-    shuffleFallbackCounts.put(POLICY1, 1L);
+    Map<String, Long> applicationFallbackCounts = new HashMap<>();
+    shuffleFallbackCounts.put(POLICY1, 2L);
+    applicationFallbackCounts.put(POLICY1, 1L);
     statusSystem.handleAppHeartbeat(
-        APPID1, 1000l, 1, 1, shuffleFallbackCounts, dummy, getNewReqeustId());
+        APPID1,
+        1000l,
+        1,
+        2,
+        1,
+        shuffleFallbackCounts,
+        applicationFallbackCounts,
+        dummy,
+        getNewReqeustId());
     shuffleFallbackCounts.put(POLICY1, 1L);
-    shuffleFallbackCounts.put(POLICY2, 1L);
+    shuffleFallbackCounts.put(POLICY2, 2L);
+    applicationFallbackCounts.put(POLICY1, 1L);
+    applicationFallbackCounts.put(POLICY2, 1L);
     statusSystem.handleAppHeartbeat(
-        APPID1, 1000l, 1, 2, shuffleFallbackCounts, dummy, getNewReqeustId());
-
-    assertEquals(statusSystem.shuffleTotalCount.longValue(), 3);
-    assertEquals(statusSystem.shuffleFallbackCounts.get(POLICY1).longValue(), 
2);
-    assertEquals(statusSystem.shuffleFallbackCounts.get(POLICY2).longValue(), 
1);
+        APPID1,
+        1000l,
+        1,
+        3,
+        2,
+        shuffleFallbackCounts,
+        applicationFallbackCounts,
+        dummy,
+        getNewReqeustId());
+
+    assertEquals(statusSystem.shuffleTotalCount.longValue(), 5);
+    assertEquals(statusSystem.applicationTotalCount.longValue(), 3);
+    assertEquals(statusSystem.shuffleFallbackCounts.get(POLICY1).longValue(), 
3);
+    assertEquals(statusSystem.shuffleFallbackCounts.get(POLICY2).longValue(), 
2);
+    
assertEquals(statusSystem.applicationFallbackCounts.get(POLICY1).longValue(), 
2);
+    
assertEquals(statusSystem.applicationFallbackCounts.get(POLICY2).longValue(), 
1);
   }
 }
diff --git 
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
 
b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
index 0f7f752cb..be644869f 100644
--- 
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
+++ 
b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
@@ -17,6 +17,7 @@
 
 package org.apache.celeborn.service.deploy.master.clustermeta.ha;
 
+import static org.junit.Assert.assertEquals;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.when;
 
@@ -964,14 +965,16 @@ public class RatisMasterStatusSystemSuiteJ {
     Assert.assertNotNull(statusSystem);
 
     long dummy = 1235L;
-    statusSystem.handleAppHeartbeat(APPID1, 1, 1, 0, new HashMap<>(), dummy, 
getNewReqeustId());
+    statusSystem.handleAppHeartbeat(
+        APPID1, 1, 1, 1, 1, new HashMap<>(), new HashMap<>(), dummy, 
getNewReqeustId());
     Thread.sleep(3000L);
     Assert.assertEquals(Long.valueOf(dummy), 
STATUSSYSTEM1.appHeartbeatTime.get(APPID1));
     Assert.assertEquals(Long.valueOf(dummy), 
STATUSSYSTEM2.appHeartbeatTime.get(APPID1));
     Assert.assertEquals(Long.valueOf(dummy), 
STATUSSYSTEM3.appHeartbeatTime.get(APPID1));
 
     String appId2 = "app02";
-    statusSystem.handleAppHeartbeat(appId2, 1, 1, 0, new HashMap<>(), dummy, 
getNewReqeustId());
+    statusSystem.handleAppHeartbeat(
+        appId2, 1, 1, 2, 2, new HashMap<>(), new HashMap<>(), dummy, 
getNewReqeustId());
     Thread.sleep(3000L);
 
     Assert.assertEquals(Long.valueOf(dummy), 
STATUSSYSTEM1.appHeartbeatTime.get(appId2));
@@ -1382,24 +1385,28 @@ public class RatisMasterStatusSystemSuiteJ {
 
     Long dummy = 1235L;
     statusSystem.handleAppHeartbeat(
-        APPID1, 10000000000l, 1, 0, new HashMap<>(), dummy, getNewReqeustId());
+        APPID1, 10000000000l, 1, 1, 1, new HashMap<>(), new HashMap<>(), 
dummy, getNewReqeustId());
     String appId2 = "app02";
-    statusSystem.handleAppHeartbeat(appId2, 1, 1, 0, new HashMap<>(), dummy, 
getNewReqeustId());
+    statusSystem.handleAppHeartbeat(
+        appId2, 1, 1, 2, 2, new HashMap<>(), new HashMap<>(), dummy, 
getNewReqeustId());
 
     // Max size
     statusSystem.handleUpdatePartitionSize();
     Assert.assertEquals(statusSystem.estimatedPartitionSize, 
conf.maxPartitionSizeToEstimate());
 
     statusSystem.handleAppHeartbeat(
-        APPID1, 1000000000l, 1, 0, new HashMap<>(), dummy, getNewReqeustId());
-    statusSystem.handleAppHeartbeat(appId2, 1, 1, 0, new HashMap<>(), dummy, 
getNewReqeustId());
+        APPID1, 1000000000l, 1, 1, 1, new HashMap<>(), new HashMap<>(), dummy, 
getNewReqeustId());
+    statusSystem.handleAppHeartbeat(
+        appId2, 1, 1, 2, 2, new HashMap<>(), new HashMap<>(), dummy, 
getNewReqeustId());
 
     // Size between minEstimateSize -> maxEstimateSize
     statusSystem.handleUpdatePartitionSize();
     Assert.assertEquals(statusSystem.estimatedPartitionSize, 500000000);
 
-    statusSystem.handleAppHeartbeat(APPID1, 1000l, 1, 0, new HashMap<>(), 
dummy, getNewReqeustId());
-    statusSystem.handleAppHeartbeat(appId2, 1000l, 1, 0, new HashMap<>(), 
dummy, getNewReqeustId());
+    statusSystem.handleAppHeartbeat(
+        APPID1, 1000l, 1, 1, 1, new HashMap<>(), new HashMap<>(), dummy, 
getNewReqeustId());
+    statusSystem.handleAppHeartbeat(
+        appId2, 1000l, 1, 2, 2, new HashMap<>(), new HashMap<>(), dummy, 
getNewReqeustId());
 
     // Min size
     statusSystem.handleUpdatePartitionSize();
@@ -1582,24 +1589,49 @@ public class RatisMasterStatusSystemSuiteJ {
   }
 
   @Test
-  public void testShuffleCountWithFallback() {
+  public void testShuffleAndApplicationCountWithFallback() {
     AbstractMetaManager statusSystem = pickLeaderStatusSystem();
     Assert.assertNotNull(statusSystem);
     statusSystem.shuffleTotalCount.reset();
+    statusSystem.applicationTotalCount.reset();
     statusSystem.shuffleFallbackCounts.clear();
+    statusSystem.applicationFallbackCounts.clear();
 
     Long dummy = 1235L;
     Map<String, Long> shuffleFallbackCounts = new HashMap<>();
-    shuffleFallbackCounts.put(POLICY1, 1L);
+    Map<String, Long> applicationFallbackCounts = new HashMap<>();
+    shuffleFallbackCounts.put(POLICY1, 2L);
+    applicationFallbackCounts.put(POLICY1, 1L);
     statusSystem.handleAppHeartbeat(
-        APPID1, 1000l, 1, 1, shuffleFallbackCounts, dummy, getNewReqeustId());
+        APPID1,
+        1000l,
+        1,
+        2,
+        1,
+        shuffleFallbackCounts,
+        applicationFallbackCounts,
+        dummy,
+        getNewReqeustId());
     shuffleFallbackCounts.put(POLICY1, 1L);
-    shuffleFallbackCounts.put(POLICY2, 1L);
+    shuffleFallbackCounts.put(POLICY2, 2L);
+    applicationFallbackCounts.put(POLICY1, 1L);
+    applicationFallbackCounts.put(POLICY2, 1L);
     statusSystem.handleAppHeartbeat(
-        APPID1, 1000l, 1, 2, shuffleFallbackCounts, dummy, getNewReqeustId());
+        APPID1,
+        1000l,
+        1,
+        3,
+        2,
+        shuffleFallbackCounts,
+        applicationFallbackCounts,
+        dummy,
+        getNewReqeustId());
 
-    Assert.assertEquals(statusSystem.shuffleTotalCount.longValue(), 3);
-    
Assert.assertEquals(statusSystem.shuffleFallbackCounts.get(POLICY1).longValue(),
 2);
-    
Assert.assertEquals(statusSystem.shuffleFallbackCounts.get(POLICY2).longValue(),
 1);
+    assertEquals(statusSystem.shuffleTotalCount.longValue(), 5);
+    assertEquals(statusSystem.applicationTotalCount.longValue(), 3);
+    assertEquals(statusSystem.shuffleFallbackCounts.get(POLICY1).longValue(), 
3);
+    assertEquals(statusSystem.shuffleFallbackCounts.get(POLICY2).longValue(), 
2);
+    
assertEquals(statusSystem.applicationFallbackCounts.get(POLICY1).longValue(), 
2);
+    
assertEquals(statusSystem.applicationFallbackCounts.get(POLICY2).longValue(), 
1);
   }
 }

Reply via email to