This is an automated email from the ASF dual-hosted git repository.
feiwang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new d9984c9e0 [CELEBORN-1800] Introduce ApplicationTotalCount and
ApplicationFallbackCount metric to record the total and fallback count of
application
d9984c9e0 is described below
commit d9984c9e0e4ce6f763cc43b265b500bc97f05497
Author: SteNicholas <[email protected]>
AuthorDate: Mon May 19 07:20:00 2025 -0700
[CELEBORN-1800] Introduce ApplicationTotalCount and
ApplicationFallbackCount metric to record the total and fallback count of
application
### What changes were proposed in this pull request?
Introduce the `ApplicationTotalCount` and `ApplicationFallbackCount` metrics to
record the total and fallback counts of applications.
### Why are the changes needed?
There is no metric that records the total count of applications running
with Celeborn shuffle and engine built-in shuffle, nor the fallback count of
applications. Meanwhile, the fallback of Flink shuffle is based on job
granularity rather than shuffle granularity.
Follow-up to
https://github.com/apache/celeborn/pull/3012#issuecomment-2553488532.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
- `DefaultMetaSystemSuiteJ#testShuffleAndApplicationCountWithFallback`
- `RatisMasterStatusSystemSuiteJ#testShuffleAndApplicationCountWithFallback`
Closes #3026 from SteNicholas/CELEBORN-1800.
Lead-authored-by: SteNicholas <[email protected]>
Co-authored-by: Wang, Fei <[email protected]>
Signed-off-by: Wang, Fei <[email protected]>
---
assets/grafana/celeborn-dashboard.json | 186 ++++++++++++++++++++-
.../celeborn/plugin/flink/RemoteShuffleMaster.java | 11 +-
.../shuffle/celeborn/SparkShuffleManager.java | 1 +
.../CelebornShuffleFallbackPolicyRunner.scala | 31 ++--
.../shuffle/celeborn/SparkShuffleManager.java | 2 +
.../CelebornShuffleFallbackPolicyRunner.scala | 28 +++-
.../celeborn/client/ApplicationHeartbeater.scala | 15 +-
.../apache/celeborn/client/LifecycleManager.scala | 26 ++-
common/src/main/proto/TransportMessages.proto | 4 +
.../common/protocol/message/ControlMessages.scala | 8 +
.../apache/celeborn/common/util/PbSerDeUtils.scala | 4 +
docs/monitoring.md | 44 ++---
.../master/clustermeta/AbstractMetaManager.java | 27 ++-
.../master/clustermeta/IMetadataHandler.java | 2 +
.../clustermeta/SingleMasterMetaManager.java | 11 +-
.../master/clustermeta/ha/HAMasterMetaManager.java | 4 +
.../deploy/master/clustermeta/ha/MetaHandler.java | 18 +-
master/src/main/proto/Resource.proto | 2 +
.../celeborn/service/deploy/master/Master.scala | 20 ++-
.../service/deploy/master/MasterSource.scala | 5 +-
.../clustermeta/DefaultMetaSystemSuiteJ.java | 65 +++++--
.../ha/RatisMasterStatusSystemSuiteJ.java | 64 +++++--
22 files changed, 476 insertions(+), 102 deletions(-)
diff --git a/assets/grafana/celeborn-dashboard.json
b/assets/grafana/celeborn-dashboard.json
index 8e98e6402..d98edb8b6 100644
--- a/assets/grafana/celeborn-dashboard.json
+++ b/assets/grafana/celeborn-dashboard.json
@@ -1472,7 +1472,7 @@
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
- "description": "The total count of shuffle including celeborn
shuffle and spark built-in shuffle.",
+ "description": "The total count of shuffle including celeborn
shuffle and engine built-in shuffle.",
"fieldConfig": {
"defaults": {
"color": {
@@ -1652,7 +1652,7 @@
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
- "description": "The ApplyCompletedIndex of current master node in HA
mode.",
+ "description": "The total count of application running with celeborn
shuffle and engine built-in shuffle.",
"fieldConfig": {
"defaults": {
"color": {
@@ -1710,6 +1710,186 @@
"x": 0,
"y": 90
},
+ "id": 239,
+ "options": {
+ "legend": {
+ "calcs": [],
+ "displayMode": "list",
+ "placement": "bottom",
+ "showLegend": true
+ },
+ "tooltip": {
+ "maxHeight": 600,
+ "mode": "single",
+ "sort": "none"
+ }
+ },
+ "targets": [
+ {
+ "datasource": {
+ "type": "prometheus",
+ "uid": "${DS_PROMETHEUS}"
+ },
+ "expr":
"metrics_ApplicationTotalCount_Value{instance=~\"${instance}\"}",
+ "legendFormat": "${baseLegend}",
+ "range": true,
+ "refId": "A"
+ }
+ ],
+ "title": "metrics_ApplicationTotalCount_Value",
+ "type": "timeseries"
+ },
+ {
+ "datasource": {
+ "type": "prometheus",
+ "uid": "${DS_PROMETHEUS}"
+ },
+ "description": "The count of application fallbacks.",
+ "fieldConfig": {
+ "defaults": {
+ "color": {
+ "mode": "palette-classic"
+ },
+ "custom": {
+ "axisCenteredZero": false,
+ "axisColorMode": "text",
+ "axisLabel": "",
+ "axisPlacement": "auto",
+ "barAlignment": 0,
+ "drawStyle": "line",
+ "fillOpacity": 0,
+ "gradientMode": "none",
+ "hideFrom": {
+ "legend": false,
+ "tooltip": false,
+ "viz": false
+ },
+ "lineInterpolation": "linear",
+ "lineWidth": 1,
+ "pointSize": 5,
+ "scaleDistribution": {
+ "type": "linear"
+ },
+ "showPoints": "auto",
+ "spanNulls": false,
+ "stacking": {
+ "group": "A",
+ "mode": "none"
+ },
+ "thresholdsStyle": {
+ "mode": "off"
+ }
+ },
+ "mappings": [],
+ "thresholds": {
+ "mode": "absolute",
+ "steps": [
+ {
+ "color": "green"
+ }
+ ]
+ }
+ },
+ "overrides": []
+ },
+ "gridPos": {
+ "h": 8,
+ "w": 12,
+ "x": 12,
+ "y": 90
+ },
+ "id": 240,
+ "options": {
+ "legend": {
+ "calcs": [],
+ "displayMode": "list",
+ "placement": "bottom",
+ "showLegend": true
+ },
+ "tooltip": {
+ "maxHeight": 600,
+ "mode": "single",
+ "sort": "none"
+ }
+ },
+ "targets": [
+ {
+ "datasource": {
+ "type": "prometheus",
+ "uid": "${DS_PROMETHEUS}"
+ },
+ "expr":
"metrics_ApplicationFallbackCount_Value{instance=~\"${instance}\"}",
+ "legendFormat": "${baseLegend}",
+ "range": true,
+ "refId": "A"
+ }
+ ],
+ "title": "metrics_ApplicationFallbackCount_Value",
+ "type": "timeseries"
+ },
+ {
+ "datasource": {
+ "type": "prometheus",
+ "uid": "${DS_PROMETHEUS}"
+ },
+ "description": "The ApplyCompletedIndex of current master node in HA
mode.",
+ "fieldConfig": {
+ "defaults": {
+ "color": {
+ "mode": "palette-classic"
+ },
+ "custom": {
+ "axisCenteredZero": false,
+ "axisColorMode": "text",
+ "axisLabel": "",
+ "axisPlacement": "auto",
+ "barAlignment": 0,
+ "drawStyle": "line",
+ "fillOpacity": 0,
+ "gradientMode": "none",
+ "hideFrom": {
+ "legend": false,
+ "tooltip": false,
+ "viz": false
+ },
+ "lineInterpolation": "linear",
+ "lineWidth": 1,
+ "pointSize": 5,
+ "scaleDistribution": {
+ "type": "linear"
+ },
+ "showPoints": "auto",
+ "spanNulls": false,
+ "stacking": {
+ "group": "A",
+ "mode": "none"
+ },
+ "thresholdsStyle": {
+ "mode": "off"
+ }
+ },
+ "mappings": [],
+ "thresholds": {
+ "mode": "absolute",
+ "steps": [
+ {
+ "color": "green"
+ },
+ {
+ "color": "red",
+ "value": 80
+ }
+ ]
+ }
+ },
+ "overrides": []
+ },
+ "gridPos": {
+ "h": 8,
+ "w": 12,
+ "x": 0,
+ "y": 98
+ },
"id": 118,
"options": {
"legend": {
@@ -1802,7 +1982,7 @@
"h": 8,
"w": 12,
"x": 12,
- "y": 90
+ "y": 98
},
"id": 136,
"options": {
diff --git
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleMaster.java
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleMaster.java
index cb1a9e7f9..3c65da82a 100644
---
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleMaster.java
+++
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleMaster.java
@@ -104,13 +104,17 @@ public class RemoteShuffleMaster implements
ShuffleMaster<ShuffleDescriptor> {
}
}
+ lifecycleManager.applicationCount().increment();
try {
if (nettyShuffleServiceFactory != null) {
Optional<ShuffleFallbackPolicy> shuffleFallbackPolicy =
ShuffleFallbackPolicyRunner.getActivatedFallbackPolicy(context,
conf, lifecycleManager);
if (shuffleFallbackPolicy.isPresent()) {
LOG.warn("Fallback to vanilla Flink NettyShuffleMaster for job:
{}.", jobID);
- jobFallbackPolicies.put(jobID,
shuffleFallbackPolicy.get().getClass().getName());
+ String jobFallbackPolicy =
shuffleFallbackPolicy.get().getClass().getName();
+ jobFallbackPolicies.put(jobID, jobFallbackPolicy);
+ lifecycleManager.computeFallbackCounts(
+ lifecycleManager.applicationFallbackCounts(), jobFallbackPolicy);
nettyShuffleMaster().registerJob(context);
return;
}
@@ -158,9 +162,8 @@ public class RemoteShuffleMaster implements
ShuffleMaster<ShuffleDescriptor> {
String jobFallbackPolicy = jobFallbackPolicies.get(jobID);
if (jobFallbackPolicy != null) {
try {
- lifecycleManager
- .shuffleFallbackCounts()
- .compute(jobFallbackPolicy, (key, value) -> value == null ?
1L : value + 1L);
+ lifecycleManager.computeFallbackCounts(
+ lifecycleManager.shuffleFallbackCounts(), jobFallbackPolicy);
return nettyShuffleMaster()
.registerPartitionWithProducer(jobID, partitionDescriptor,
producerDescriptor)
.get();
diff --git
a/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
b/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
index 41fe777b7..97c393f3b 100644
---
a/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
+++
b/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
@@ -97,6 +97,7 @@ public class SparkShuffleManager implements ShuffleManager {
if (lifecycleManager == null) {
appUniqueId = celebornConf.appUniqueIdWithUUIDSuffix(appId);
lifecycleManager = new LifecycleManager(appUniqueId, celebornConf);
+ lifecycleManager.applicationCount().increment();
lifecycleManager.registerCancelShuffleCallback(SparkUtils::cancelShuffle);
if (celebornConf.clientStageRerunEnabled()) {
MapOutputTrackerMaster mapOutputTracker =
diff --git
a/client-spark/spark-2/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleFallbackPolicyRunner.scala
b/client-spark/spark-2/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleFallbackPolicyRunner.scala
index 4f501eff3..b5d54ef4d 100644
---
a/client-spark/spark-2/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleFallbackPolicyRunner.scala
+++
b/client-spark/spark-2/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleFallbackPolicyRunner.scala
@@ -17,6 +17,7 @@
package org.apache.spark.shuffle.celeborn
+import java.util.concurrent.ConcurrentHashMap
import java.util.function.BiFunction
import scala.collection.JavaConverters._
@@ -44,19 +45,27 @@ class CelebornShuffleFallbackPolicyRunner(conf:
CelebornConf) extends Logging {
throw new CelebornIOException(
"Fallback to spark built-in shuffle implementation is prohibited.")
} else {
- lifecycleManager.shuffleFallbackCounts.compute(
- fallbackPolicy.get.getClass.getName,
- new BiFunction[String, java.lang.Long, java.lang.Long] {
- override def apply(k: String, v: java.lang.Long): java.lang.Long =
{
- if (v == null) {
- 1L
- } else {
- v + 1L
- }
- }
- })
+ val shuffleFallbackPolicy = fallbackPolicy.get.getClass.getName
+ computeFallbackCounts(shuffleFallbackPolicy,
lifecycleManager.shuffleFallbackCounts)
+ computeFallbackCounts(shuffleFallbackPolicy,
lifecycleManager.applicationFallbackCounts)
}
}
fallbackPolicy.isDefined
}
+
+ private def computeFallbackCounts(
+ fallbackPolicy: String,
+ fallbackCounts: ConcurrentHashMap[String, java.lang.Long]) = {
+ fallbackCounts.compute(
+ fallbackPolicy,
+ new BiFunction[String, java.lang.Long, java.lang.Long] {
+ override def apply(k: String, v: java.lang.Long): java.lang.Long = {
+ if (v == null) {
+ 1L
+ } else {
+ v + 1L
+ }
+ }
+ })
+ }
}
diff --git
a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
index a3e75cd10..8c099b29f 100644
---
a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
+++
b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
@@ -139,6 +139,7 @@ public class SparkShuffleManager implements ShuffleManager {
if (lifecycleManager == null) {
appUniqueId = celebornConf.appUniqueIdWithUUIDSuffix(appId);
lifecycleManager = new LifecycleManager(appUniqueId, celebornConf);
+ lifecycleManager.applicationCount().increment();
lifecycleManager.registerCancelShuffleCallback(SparkUtils::cancelShuffle);
if (celebornConf.clientStageRerunEnabled()) {
MapOutputTrackerMaster mapOutputTracker =
@@ -234,6 +235,7 @@ public class SparkShuffleManager implements ShuffleManager {
@Override
public void stop() {
+ sortShuffleIds.clear();
if (shuffleClient != null) {
shuffleClient.shutdown();
ShuffleClient.reset();
diff --git
a/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleFallbackPolicyRunner.scala
b/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleFallbackPolicyRunner.scala
index a78b642f2..37dbcac80 100644
---
a/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleFallbackPolicyRunner.scala
+++
b/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleFallbackPolicyRunner.scala
@@ -17,6 +17,8 @@
package org.apache.spark.shuffle.celeborn
+import java.util.concurrent.ConcurrentHashMap
+
import scala.collection.JavaConverters._
import org.apache.spark.ShuffleDependency
@@ -42,17 +44,25 @@ class CelebornShuffleFallbackPolicyRunner(conf:
CelebornConf) extends Logging {
throw new CelebornIOException(
"Fallback to spark built-in shuffle implementation is prohibited.")
} else {
- lifecycleManager.shuffleFallbackCounts.compute(
- fallbackPolicy.get.getClass.getName,
- (_, v) => {
- if (v == null) {
- 1L
- } else {
- v + 1L
- }
- })
+ val shuffleFallbackPolicy = fallbackPolicy.get.getClass.getName
+ computeFallbackCounts(shuffleFallbackPolicy,
lifecycleManager.shuffleFallbackCounts)
+ computeFallbackCounts(shuffleFallbackPolicy,
lifecycleManager.applicationFallbackCounts)
}
}
fallbackPolicy.isDefined
}
+
+ private def computeFallbackCounts(
+ fallbackPolicy: String,
+ fallbackCounts: ConcurrentHashMap[String, java.lang.Long]) = {
+ fallbackCounts.compute(
+ fallbackPolicy,
+ (_, v) => {
+ if (v == null) {
+ 1L
+ } else {
+ v + 1L
+ }
+ })
+ }
}
diff --git
a/client/src/main/scala/org/apache/celeborn/client/ApplicationHeartbeater.scala
b/client/src/main/scala/org/apache/celeborn/client/ApplicationHeartbeater.scala
index 3d8d23019..94c33d146 100644
---
a/client/src/main/scala/org/apache/celeborn/client/ApplicationHeartbeater.scala
+++
b/client/src/main/scala/org/apache/celeborn/client/ApplicationHeartbeater.scala
@@ -37,7 +37,9 @@ class ApplicationHeartbeater(
appId: String,
conf: CelebornConf,
masterClient: MasterClient,
- shuffleMetrics: () => ((Long, Long), (Long, Map[String, java.lang.Long])),
+ shuffleMetrics: () => (
+ (Long, Long),
+ (Long, Long, Map[String, java.lang.Long], Map[String,
java.lang.Long])),
workerStatusTracker: WorkerStatusTracker,
registeredShuffles: ConcurrentHashMap.KeySetView[Int, java.lang.Boolean],
cancelAllActiveStages: String => Unit) extends Logging {
@@ -61,10 +63,15 @@ class ApplicationHeartbeater(
require(masterClient != null, "When sending a heartbeat, client
shouldn't be null.")
val (
(tmpTotalWritten, tmpTotalFileCount),
- (tmpShuffleCount, tmpShuffleFallbackCounts)) = shuffleMetrics()
+ (
+ tmpShuffleCount,
+ tmpApplicationCount,
+ tmpShuffleFallbackCounts,
+ tmpApplicationFallbackCounts)) = shuffleMetrics()
logInfo("Send app heartbeat with " +
s"written: ${Utils.bytesToString(tmpTotalWritten)}, file count:
$tmpTotalFileCount, " +
- s"shuffle count: $tmpShuffleCount, shuffle fallback counts:
$tmpShuffleFallbackCounts")
+ s"shuffle count: $tmpShuffleCount, shuffle fallback counts:
$tmpShuffleFallbackCounts, " +
+ s"application count: $tmpApplicationCount, application fallback
counts: $tmpApplicationFallbackCounts")
// UserResourceConsumption and DiskInfo are eliminated from
WorkerInfo
// during serialization of HeartbeatFromApplication
val appHeartbeat =
@@ -73,7 +80,9 @@ class ApplicationHeartbeater(
tmpTotalWritten,
tmpTotalFileCount,
tmpShuffleCount,
+ tmpApplicationCount,
tmpShuffleFallbackCounts.asJava,
+ tmpApplicationFallbackCounts.asJava,
workerStatusTracker.getNeedCheckedWorkers().toList.asJava,
ZERO_UUID,
true)
diff --git
a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
index bcb018570..ae692f908 100644
--- a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
+++ b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
@@ -24,7 +24,7 @@ import java.util
import java.util.{function, List => JList}
import java.util.concurrent._
import java.util.concurrent.atomic.{AtomicInteger, LongAdder}
-import java.util.function.{BiConsumer, Consumer}
+import java.util.function.{BiConsumer, BiFunction, Consumer}
import scala.collection.JavaConverters._
import scala.collection.generic.CanBuildFrom
@@ -91,7 +91,9 @@ class LifecycleManager(val appUniqueId: String, val conf:
CelebornConf) extends
val registeredShuffle = ConcurrentHashMap.newKeySet[Int]()
val shuffleCount = new LongAdder()
+ val applicationCount = new LongAdder()
val shuffleFallbackCounts = JavaUtils.newConcurrentHashMap[String,
java.lang.Long]()
+ val applicationFallbackCounts = JavaUtils.newConcurrentHashMap[String,
java.lang.Long]()
// maintain each shuffle's map relation of WorkerInfo and partition location
val shuffleAllocatedWorkers = new ShuffleAllocatedWorkers
// shuffle id -> (partitionId -> newest PartitionLocation)
@@ -223,15 +225,17 @@ class LifecycleManager(val appUniqueId: String, val conf:
CelebornConf) extends
masterClient,
() => {
commitManager.commitMetrics() ->
- (shuffleCount.sumThenReset(), resetShuffleFallbackCounts())
+ (shuffleCount.sumThenReset(), applicationCount.sumThenReset(),
resetFallbackCounts(
+ shuffleFallbackCounts),
resetFallbackCounts(applicationFallbackCounts))
},
workerStatusTracker,
registeredShuffle,
reason => cancelAllActiveStages(reason))
- private def resetShuffleFallbackCounts(): Map[String, java.lang.Long] = {
+ private def resetFallbackCounts(counts: ConcurrentHashMap[String,
java.lang.Long])
+ : Map[String, java.lang.Long] = {
val fallbackCounts = new util.HashMap[String, java.lang.Long]()
- shuffleFallbackCounts.keys().asScala.foreach { key =>
- Option(shuffleFallbackCounts.remove(key)).filter(_ >
0).foreach(fallbackCounts.put(key, _))
+ counts.keys().asScala.foreach { key =>
+ Option(counts.remove(key)).filter(_ > 0).foreach(fallbackCounts.put(key,
_))
}
fallbackCounts.asScala.toMap
}
@@ -1927,4 +1931,16 @@ class LifecycleManager(val appUniqueId: String, val
conf: CelebornConf) extends
case _ =>
}
}
+
+ def computeFallbackCounts(
+ fallbackCounts: ConcurrentHashMap[String, java.lang.Long],
+ fallbackPolicy: String): Unit = {
+ fallbackCounts.compute(
+ fallbackPolicy,
+ new BiFunction[String, java.lang.Long, java.lang.Long] {
+ override def apply(k: String, v: java.lang.Long): java.lang.Long = {
+ if (v == null) 1L else v + 1L
+ }
+ })
+ }
}
diff --git a/common/src/main/proto/TransportMessages.proto
b/common/src/main/proto/TransportMessages.proto
index c120b9952..4ca924582 100644
--- a/common/src/main/proto/TransportMessages.proto
+++ b/common/src/main/proto/TransportMessages.proto
@@ -462,6 +462,8 @@ message PbHeartbeatFromApplication {
bool shouldResponse = 6;
int64 shuffleCount = 7;
map<string, int64> shuffleFallbackCounts = 8;
+ int64 applicationCount = 9;
+ map<string, int64> applicationFallbackCounts = 10;
}
message PbHeartbeatFromApplicationResponse {
@@ -698,6 +700,8 @@ message PbSnapshotMetaInfo {
repeated PbWorkerInfo decommissionWorkers = 17;
int64 shuffleTotalCount = 18;
map<string, int64> shuffleFallbackCounts = 19;
+ int64 applicationTotalCount = 20;
+ map<string, int64> applicationFallbackCounts = 21;
}
message PbOpenStream {
diff --git
a/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
b/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
index ccd5d79a2..009c3a4fc 100644
---
a/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
+++
b/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
@@ -422,7 +422,9 @@ object ControlMessages extends Logging {
totalWritten: Long,
fileCount: Long,
shuffleCount: Long,
+ applicationCount: Long,
shuffleFallbackCounts: util.Map[String, java.lang.Long],
+ applicationFallbackCounts: util.Map[String, java.lang.Long],
needCheckedWorkerList: util.List[WorkerInfo],
override var requestId: String = ZERO_UUID,
shouldResponse: Boolean = false) extends MasterRequestMessage
@@ -840,7 +842,9 @@ object ControlMessages extends Logging {
totalWritten,
fileCount,
shuffleCount,
+ applicationCount,
shuffleFallbackCounts,
+ applicationFallbackCounts,
needCheckedWorkerList,
requestId,
shouldResponse) =>
@@ -850,7 +854,9 @@ object ControlMessages extends Logging {
.setTotalWritten(totalWritten)
.setFileCount(fileCount)
.setShuffleCount(shuffleCount)
+ .setApplicationCount(applicationCount)
.putAllShuffleFallbackCounts(shuffleFallbackCounts)
+ .putAllApplicationFallbackCounts(applicationFallbackCounts)
.addAllNeedCheckedWorkerList(needCheckedWorkerList.asScala.map(
PbSerDeUtils.toPbWorkerInfo(_, true, true)).toList.asJava)
.setShouldResponse(shouldResponse)
@@ -1262,7 +1268,9 @@ object ControlMessages extends Logging {
pbHeartbeatFromApplication.getTotalWritten,
pbHeartbeatFromApplication.getFileCount,
pbHeartbeatFromApplication.getShuffleCount,
+ pbHeartbeatFromApplication.getApplicationCount,
pbHeartbeatFromApplication.getShuffleFallbackCountsMap,
+ pbHeartbeatFromApplication.getApplicationFallbackCountsMap,
new util.ArrayList[WorkerInfo](
pbHeartbeatFromApplication.getNeedCheckedWorkerListList.asScala
.map(PbSerDeUtils.fromPbWorkerInfo).toList.asJava),
diff --git
a/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala
b/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala
index 6ba547684..7f51e158f 100644
--- a/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala
@@ -428,7 +428,9 @@ object PbSerDeUtils {
partitionTotalWritten: java.lang.Long,
partitionTotalFileCount: java.lang.Long,
shuffleTotalCount: java.lang.Long,
+ applicationTotalCount: java.lang.Long,
shuffleFallbackCounts: java.util.Map[String, java.lang.Long],
+ applicationFallbackCounts: java.util.Map[String, java.lang.Long],
lostWorkers: ConcurrentHashMap[WorkerInfo, java.lang.Long],
shutdownWorkers: java.util.Set[WorkerInfo],
workerEventInfos: ConcurrentHashMap[WorkerInfo, WorkerEventInfo],
@@ -449,7 +451,9 @@ object PbSerDeUtils {
.setPartitionTotalWritten(partitionTotalWritten)
.setPartitionTotalFileCount(partitionTotalFileCount)
.setShuffleTotalCount(shuffleTotalCount)
+ .setApplicationTotalCount(applicationTotalCount)
.putAllShuffleFallbackCounts(shuffleFallbackCounts)
+ .putAllApplicationFallbackCounts(applicationFallbackCounts)
.putAllLostWorkers(lostWorkers.asScala.map {
case (worker: WorkerInfo, time: java.lang.Long) => (worker.toUniqueId,
time)
}.asJava)
diff --git a/docs/monitoring.md b/docs/monitoring.md
index 95cefcf92..d2f8ced8f 100644
--- a/docs/monitoring.md
+++ b/docs/monitoring.md
@@ -92,27 +92,29 @@ These metrics are exposed by Celeborn master.
- namespace=master
- | Metric Name | Description
|
-
|------------------------------|-----------------------------------------------------------------------------------|
- | RegisteredShuffleCount | The count of registered shuffle.
|
- | DeviceCelebornFreeBytes | The actual usable space of Celeborn
available workers for device. |
- | DeviceCelebornTotalBytes | The total space of Celeborn for device.
|
- | RunningApplicationCount | The count of running applications.
|
- | ActiveShuffleSize | The active shuffle size of workers.
|
- | ActiveShuffleFileCount | The active shuffle file count of workers.
|
- | ShuffleTotalCount | The total count of shuffle including
celeborn shuffle and spark built-in shuffle. |
- | ShuffleFallbackCount | The count of shuffle fallbacks.
|
- | WorkerCount | The count of active workers.
|
- | LostWorkerCount | The count of workers in lost list.
|
- | ExcludedWorkerCount | The count of workers in excluded list.
|
- | AvailableWorkerCount | The count of workers in available list.
|
- | ShutdownWorkerCount | The count of workers in shutdown list.
|
- | DecommissionWorkerCount | The count of workers in decommission
list. |
- | IsActiveMaster | Whether the current master is active.
|
- | RatisApplyCompletedIndex | The ApplyCompletedIndex of the current
master node in HA mode. |
- | RatisApplyCompletedIndexDiff | The difference value of
ApplyCompletedIndex of the master nodes in HA mode. |
- | PartitionSize | The size of estimated shuffle partition.
|
- | OfferSlotsTime | The time for masters to handle
`RequestSlots` request when registering shuffle. |
+ | Metric Name | Description
|
+
|------------------------------|-------------------------------------------------------------------------------------------|
+ | RegisteredShuffleCount | The count of registered shuffle.
|
+ | DeviceCelebornFreeBytes | The actual usable space of Celeborn
available workers for device. |
+ | DeviceCelebornTotalBytes | The total space of Celeborn for device.
|
+ | RunningApplicationCount | The count of running applications.
|
+ | ActiveShuffleSize | The active shuffle size of workers.
|
+ | ActiveShuffleFileCount | The active shuffle file count of workers.
|
+ | ShuffleTotalCount | The total count of shuffle including
celeborn shuffle and engine built-in shuffle. |
+ | ShuffleFallbackCount | The count of shuffle fallbacks.
|
+ | ApplicationTotalCount | The total count of application running
with celeborn shuffle and engine built-in shuffle. |
+ | ApplicationFallbackCount | The count of application fallbacks.
|
+ | WorkerCount | The count of active workers.
|
+ | LostWorkerCount | The count of workers in lost list.
|
+ | ExcludedWorkerCount | The count of workers in excluded list.
|
+ | AvailableWorkerCount | The count of workers in available list.
|
+ | ShutdownWorkerCount | The count of workers in shutdown list.
|
+ | DecommissionWorkerCount | The count of workers in decommission
list. |
+ | IsActiveMaster | Whether the current master is active.
|
+ | RatisApplyCompletedIndex | The ApplyCompletedIndex of the current
master node in HA mode. |
+ | RatisApplyCompletedIndexDiff | The difference value of
ApplyCompletedIndex of the master nodes in HA mode. |
+ | PartitionSize | The size of estimated shuffle partition.
|
+ | OfferSlotsTime | The time for masters to handle
`RequestSlots` request when registering shuffle. |
- namespace=CPU
diff --git
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
index 2acaea57b..6bee54125 100644
---
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
+++
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
@@ -92,7 +92,9 @@ public abstract class AbstractMetaManager implements
IMetadataHandler {
public final LongAdder partitionTotalWritten = new LongAdder();
public final LongAdder partitionTotalFileCount = new LongAdder();
public final LongAdder shuffleTotalCount = new LongAdder();
+ public final LongAdder applicationTotalCount = new LongAdder();
public final Map<String, Long> shuffleFallbackCounts =
JavaUtils.newConcurrentHashMap();
+ public final Map<String, Long> applicationFallbackCounts =
JavaUtils.newConcurrentHashMap();
public final ConcurrentHashMap<String, ApplicationMeta> applicationMetas =
JavaUtils.newConcurrentHashMap();
@@ -152,12 +154,16 @@ public abstract class AbstractMetaManager implements
IMetadataHandler {
long totalWritten,
long fileCount,
long shuffleCount,
- Map<String, Long> shuffleFallbackCounts) {
+ long applicationCount,
+ Map<String, Long> shuffleFallbackCounts,
+ Map<String, Long> applicationFallbackCounts) {
appHeartbeatTime.put(appId, time);
partitionTotalWritten.add(totalWritten);
partitionTotalFileCount.add(fileCount);
shuffleTotalCount.add(shuffleCount);
- addShuffleFallbackCounts(shuffleFallbackCounts);
+ applicationTotalCount.add(applicationCount);
+ addFallbackCounts(this.shuffleFallbackCounts, shuffleFallbackCounts);
+ addFallbackCounts(this.applicationFallbackCounts,
applicationFallbackCounts);
}
public void updateAppLostMeta(String appId) {
@@ -364,7 +370,9 @@ public abstract class AbstractMetaManager implements
IMetadataHandler {
partitionTotalWritten.sum(),
partitionTotalFileCount.sum(),
shuffleTotalCount.sum(),
+ applicationTotalCount.sum(),
shuffleFallbackCounts,
+ applicationFallbackCounts,
lostWorkers,
shutdownWorkers,
workerEventInfos,
@@ -464,7 +472,10 @@ public abstract class AbstractMetaManager implements
IMetadataHandler {
partitionTotalWritten.add(snapshotMetaInfo.getPartitionTotalWritten());
partitionTotalFileCount.add(snapshotMetaInfo.getPartitionTotalFileCount());
shuffleTotalCount.add(snapshotMetaInfo.getShuffleTotalCount());
- addShuffleFallbackCounts(snapshotMetaInfo.getShuffleFallbackCountsMap());
+ applicationTotalCount.add(snapshotMetaInfo.getApplicationTotalCount());
+ addFallbackCounts(shuffleFallbackCounts,
snapshotMetaInfo.getShuffleFallbackCountsMap());
+ addFallbackCounts(
+ applicationFallbackCounts,
snapshotMetaInfo.getApplicationFallbackCountsMap());
snapshotMetaInfo
.getApplicationMetasMap()
@@ -505,7 +516,9 @@ public abstract class AbstractMetaManager implements
IMetadataHandler {
partitionTotalWritten.reset();
partitionTotalFileCount.reset();
shuffleTotalCount.reset();
+ applicationTotalCount.reset();
shuffleFallbackCounts.clear();
+ applicationFallbackCounts.clear();
workerEventInfos.clear();
applicationMetas.clear();
}
@@ -609,10 +622,10 @@ public abstract class AbstractMetaManager implements
IMetadataHandler {
return
registeredAppAndShuffles.values().stream().mapToInt(Set::size).sum();
}
- private void addShuffleFallbackCounts(Map<String, Long> fallbackCounts) {
- for (String fallbackPolicy : fallbackCounts.keySet()) {
- shuffleFallbackCounts.compute(
- fallbackPolicy, (k, v) -> v == null ? fallbackCounts.get(k) : v +
fallbackCounts.get(k));
+ private void addFallbackCounts(Map<String, Long> fallbackCounts, Map<String,
Long> counts) {
+ for (String fallbackPolicy : counts.keySet()) {
+ fallbackCounts.compute(
+ fallbackPolicy, (k, v) -> v == null ? counts.get(k) : v +
counts.get(k));
}
}
diff --git
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/IMetadataHandler.java
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/IMetadataHandler.java
index c25135634..34fd74e34 100644
---
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/IMetadataHandler.java
+++
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/IMetadataHandler.java
@@ -43,7 +43,9 @@ public interface IMetadataHandler {
long totalWritten,
long fileCount,
long shuffleCount,
+ long applicationCount,
Map<String, Long> shuffleFallbackCounts,
+ Map<String, Long> applicationFallbackCounts,
long time,
String requestId);
diff --git
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/SingleMasterMetaManager.java
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/SingleMasterMetaManager.java
index 779ef4683..4d4fa3b1a 100644
---
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/SingleMasterMetaManager.java
+++
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/SingleMasterMetaManager.java
@@ -75,11 +75,20 @@ public class SingleMasterMetaManager extends
AbstractMetaManager {
long totalWritten,
long fileCount,
long shuffleCount,
+ long applicationCount,
Map<String, Long> shuffleFallbackCounts,
+ Map<String, Long> applicationFallbackCounts,
long time,
String requestId) {
updateAppHeartbeatMeta(
- appId, time, totalWritten, fileCount, shuffleCount,
shuffleFallbackCounts);
+ appId,
+ time,
+ totalWritten,
+ fileCount,
+ shuffleCount,
+ applicationCount,
+ shuffleFallbackCounts,
+ applicationFallbackCounts);
}
@Override
diff --git
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAMasterMetaManager.java
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAMasterMetaManager.java
index 6d72627f8..e3319ffc1 100644
---
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAMasterMetaManager.java
+++
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAMasterMetaManager.java
@@ -132,7 +132,9 @@ public class HAMasterMetaManager extends
AbstractMetaManager {
long totalWritten,
long fileCount,
long shuffleCount,
+ long applicationCount,
Map<String, Long> shuffleFallbackCounts,
+ Map<String, Long> applicationFallbackCounts,
long time,
String requestId) {
try {
@@ -147,7 +149,9 @@ public class HAMasterMetaManager extends
AbstractMetaManager {
.setTotalWritten(totalWritten)
.setFileCount(fileCount)
.setShuffleCount(shuffleCount)
+ .setApplicationCount(applicationCount)
.putAllShuffleFallbackCounts(shuffleFallbackCounts)
+
.putAllApplicationFallbackCounts(applicationFallbackCounts)
.build())
.build());
} catch (CelebornRuntimeException e) {
diff --git
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MetaHandler.java
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MetaHandler.java
index 44a82d8d3..abe21a4de 100644
---
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MetaHandler.java
+++
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MetaHandler.java
@@ -141,7 +141,12 @@ public class MetaHandler {
long totalWritten =
request.getAppHeartbeatRequest().getTotalWritten();
long fileCount = request.getAppHeartbeatRequest().getFileCount();
long shuffleCount =
request.getAppHeartbeatRequest().getShuffleCount();
- LOG.debug("Handle app heartbeat for {} with shuffle count {}",
appId, shuffleCount);
+ long applicationCount =
request.getAppHeartbeatRequest().getApplicationCount();
+ LOG.debug(
+ "Handle app heartbeat for {} with shuffle count {} and
application count {}",
+ appId,
+ shuffleCount,
+ applicationCount);
Map<String, Long> shuffleFallbackCounts =
request.getAppHeartbeatRequest().getShuffleFallbackCountsMap();
if (CollectionUtils.isNotEmpty(shuffleFallbackCounts)) {
@@ -150,8 +155,17 @@ public class MetaHandler {
shuffleFallbackCounts.values().stream().mapToLong(v ->
v).sum(),
appId);
}
+ Map<String, Long> applicationFallbackCounts =
+
request.getAppHeartbeatRequest().getApplicationFallbackCountsMap();
metaSystem.updateAppHeartbeatMeta(
- appId, time, totalWritten, fileCount, shuffleCount,
shuffleFallbackCounts);
+ appId,
+ time,
+ totalWritten,
+ fileCount,
+ shuffleCount,
+ applicationCount,
+ shuffleFallbackCounts,
+ applicationFallbackCounts);
break;
case AppLost:
diff --git a/master/src/main/proto/Resource.proto
b/master/src/main/proto/Resource.proto
index 8837ea494..c8e9c2aee 100644
--- a/master/src/main/proto/Resource.proto
+++ b/master/src/main/proto/Resource.proto
@@ -132,6 +132,8 @@ message AppHeartbeatRequest {
optional int64 shuffleCount = 5;
map<string, int64> shuffleFallbackCounts = 6;
HeartbeatInfo heartbeatInfo = 7;
+ optional int64 applicationCount = 8;
+ map<string, int64> applicationFallbackCounts = 9;
}
message AppLostRequest {
diff --git
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
index 1400a832e..dfe4e03a1 100644
---
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
+++
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
@@ -271,10 +271,18 @@ private[celeborn] class Master(
statusSystem.shuffleTotalCount.sum()
}
+ masterSource.addGauge(MasterSource.APPLICATION_TOTAL_COUNT) { () =>
+ statusSystem.applicationTotalCount.sum()
+ }
+
masterSource.addGauge(MasterSource.SHUFFLE_FALLBACK_COUNT) { () =>
statusSystem.shuffleFallbackCounts.values().asScala.map(_.longValue()).sum
}
+ masterSource.addGauge(MasterSource.APPLICATION_FALLBACK_COUNT) { () =>
+
statusSystem.applicationFallbackCounts.values().asScala.map(_.longValue()).sum
+ }
+
masterSource.addGauge(MasterSource.DEVICE_CELEBORN_TOTAL_CAPACITY) { () =>
statusSystem.workersMap.values().asScala.toList.map(_.totalSpace()).sum
}
@@ -414,8 +422,10 @@ private[celeborn] class Master(
appId,
totalWritten,
fileCount,
- shuffleFallbackCount,
+ shuffleCount,
+ applicationCount,
shuffleFallbackCounts,
+ applicationFallbackCounts,
needCheckedWorkerList,
requestId,
shouldResponse) =>
@@ -428,8 +438,10 @@ private[celeborn] class Master(
appId,
totalWritten,
fileCount,
- shuffleFallbackCount,
+ shuffleCount,
+ applicationCount,
shuffleFallbackCounts,
+ applicationFallbackCounts,
needCheckedWorkerList,
requestId,
shouldResponse))
@@ -1126,7 +1138,9 @@ private[celeborn] class Master(
totalWritten: Long,
fileCount: Long,
shuffleCount: Long,
+ applicationCount: Long,
shuffleFallbackCounts: util.Map[String, java.lang.Long],
+ applicationFallbackCounts: util.Map[String, java.lang.Long],
needCheckedWorkerList: util.List[WorkerInfo],
requestId: String,
shouldResponse: Boolean): Unit = {
@@ -1135,7 +1149,9 @@ private[celeborn] class Master(
totalWritten,
fileCount,
shuffleCount,
+ applicationCount,
shuffleFallbackCounts,
+ applicationFallbackCounts,
System.currentTimeMillis(),
requestId)
gaugeShuffleFallbackCounts()
diff --git
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/MasterSource.scala
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/MasterSource.scala
index dcc764ebb..aa28fd900 100644
---
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/MasterSource.scala
+++
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/MasterSource.scala
@@ -46,10 +46,13 @@ object MasterSource {
val REGISTERED_SHUFFLE_COUNT = "RegisteredShuffleCount"
val SHUFFLE_FALLBACK_COUNT = "ShuffleFallbackCount"
- // The total count including RegisteredShuffleCount(celeborn shuffle) and
ShuffleFallbackCount(spark built-in shuffle).
+ // The total count including RegisteredShuffleCount(celeborn shuffle) and
ShuffleFallbackCount(engine built-in shuffle).
val SHUFFLE_TOTAL_COUNT = "ShuffleTotalCount"
val RUNNING_APPLICATION_COUNT = "RunningApplicationCount"
+ val APPLICATION_FALLBACK_COUNT = "ApplicationFallbackCount"
+ // The total count including RunningApplicationCount(celeborn shuffle) and
ApplicationFallbackCount(engine built-in shuffle).
+ val APPLICATION_TOTAL_COUNT = "ApplicationTotalCount"
val IS_ACTIVE_MASTER = "IsActiveMaster"
diff --git
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java
b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java
index 78695460b..4f4361ef0 100644
---
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java
+++
b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java
@@ -643,11 +643,13 @@ public class DefaultMetaSystemSuiteJ {
@Test
public void testHandleAppHeartbeat() {
Long dummy = 1235L;
- statusSystem.handleAppHeartbeat(APPID1, 1, 1, 0, new HashMap<>(), dummy,
getNewReqeustId());
+ statusSystem.handleAppHeartbeat(
+ APPID1, 1, 1, 1, 1, new HashMap<>(), new HashMap<>(), dummy,
getNewReqeustId());
assertEquals(dummy, statusSystem.appHeartbeatTime.get(APPID1));
String appId2 = "app02";
- statusSystem.handleAppHeartbeat(appId2, 1, 1, 0, new HashMap<>(), dummy,
getNewReqeustId());
+ statusSystem.handleAppHeartbeat(
+ appId2, 1, 1, 2, 2, new HashMap<>(), new HashMap<>(), dummy,
getNewReqeustId());
assertEquals(dummy, statusSystem.appHeartbeatTime.get(appId2));
assertEquals(2, statusSystem.appHeartbeatTime.size());
@@ -819,24 +821,28 @@ public class DefaultMetaSystemSuiteJ {
Long dummy = 1235L;
statusSystem.handleAppHeartbeat(
- APPID1, 10000000000l, 1, 0, new HashMap<>(), dummy, getNewReqeustId());
+ APPID1, 10000000000l, 1, 1, 1, new HashMap<>(), new HashMap<>(),
dummy, getNewReqeustId());
String appId2 = "app02";
- statusSystem.handleAppHeartbeat(appId2, 1, 1, 0, new HashMap<>(), dummy,
getNewReqeustId());
+ statusSystem.handleAppHeartbeat(
+ appId2, 1, 1, 2, 2, new HashMap<>(), new HashMap<>(), dummy,
getNewReqeustId());
// Max size
statusSystem.handleUpdatePartitionSize();
Assert.assertEquals(statusSystem.estimatedPartitionSize,
conf.maxPartitionSizeToEstimate());
statusSystem.handleAppHeartbeat(
- APPID1, 1000000000l, 1, 0, new HashMap<>(), dummy, getNewReqeustId());
- statusSystem.handleAppHeartbeat(appId2, 1, 1, 0, new HashMap<>(), dummy,
getNewReqeustId());
+ APPID1, 1000000000l, 1, 1, 1, new HashMap<>(), new HashMap<>(), dummy,
getNewReqeustId());
+ statusSystem.handleAppHeartbeat(
+ appId2, 1, 1, 2, 2, new HashMap<>(), new HashMap<>(), dummy,
getNewReqeustId());
// Size between minEstimateSize -> maxEstimateSize
statusSystem.handleUpdatePartitionSize();
Assert.assertEquals(statusSystem.estimatedPartitionSize, 500000000);
- statusSystem.handleAppHeartbeat(APPID1, 1000l, 1, 0, new HashMap<>(),
dummy, getNewReqeustId());
- statusSystem.handleAppHeartbeat(appId2, 1000l, 1, 0, new HashMap<>(),
dummy, getNewReqeustId());
+ statusSystem.handleAppHeartbeat(
+ APPID1, 1000l, 1, 1, 1, new HashMap<>(), new HashMap<>(), dummy,
getNewReqeustId());
+ statusSystem.handleAppHeartbeat(
+ appId2, 1000l, 1, 2, 2, new HashMap<>(), new HashMap<>(), dummy,
getNewReqeustId());
// Min size
statusSystem.handleUpdatePartitionSize();
@@ -909,22 +915,47 @@ public class DefaultMetaSystemSuiteJ {
}
@Test
- public void testShuffleCountWithFallback() {
+ public void testShuffleAndApplicationCountWithFallback() {
statusSystem.shuffleTotalCount.reset();
+ statusSystem.applicationTotalCount.reset();
statusSystem.shuffleFallbackCounts.clear();
+ statusSystem.applicationFallbackCounts.clear();
Long dummy = 1235L;
Map<String, Long> shuffleFallbackCounts = new HashMap<>();
- shuffleFallbackCounts.put(POLICY1, 1L);
+ Map<String, Long> applicationFallbackCounts = new HashMap<>();
+ shuffleFallbackCounts.put(POLICY1, 2L);
+ applicationFallbackCounts.put(POLICY1, 1L);
statusSystem.handleAppHeartbeat(
- APPID1, 1000l, 1, 1, shuffleFallbackCounts, dummy, getNewReqeustId());
+ APPID1,
+ 1000l,
+ 1,
+ 2,
+ 1,
+ shuffleFallbackCounts,
+ applicationFallbackCounts,
+ dummy,
+ getNewReqeustId());
shuffleFallbackCounts.put(POLICY1, 1L);
- shuffleFallbackCounts.put(POLICY2, 1L);
+ shuffleFallbackCounts.put(POLICY2, 2L);
+ applicationFallbackCounts.put(POLICY1, 1L);
+ applicationFallbackCounts.put(POLICY2, 1L);
statusSystem.handleAppHeartbeat(
- APPID1, 1000l, 1, 2, shuffleFallbackCounts, dummy, getNewReqeustId());
-
- assertEquals(statusSystem.shuffleTotalCount.longValue(), 3);
- assertEquals(statusSystem.shuffleFallbackCounts.get(POLICY1).longValue(),
2);
- assertEquals(statusSystem.shuffleFallbackCounts.get(POLICY2).longValue(),
1);
+ APPID1,
+ 1000l,
+ 1,
+ 3,
+ 2,
+ shuffleFallbackCounts,
+ applicationFallbackCounts,
+ dummy,
+ getNewReqeustId());
+
+ assertEquals(statusSystem.shuffleTotalCount.longValue(), 5);
+ assertEquals(statusSystem.applicationTotalCount.longValue(), 3);
+ assertEquals(statusSystem.shuffleFallbackCounts.get(POLICY1).longValue(),
3);
+ assertEquals(statusSystem.shuffleFallbackCounts.get(POLICY2).longValue(),
2);
+
assertEquals(statusSystem.applicationFallbackCounts.get(POLICY1).longValue(),
2);
+
assertEquals(statusSystem.applicationFallbackCounts.get(POLICY2).longValue(),
1);
}
}
diff --git
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
index 0f7f752cb..be644869f 100644
---
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
+++
b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
@@ -17,6 +17,7 @@
package org.apache.celeborn.service.deploy.master.clustermeta.ha;
+import static org.junit.Assert.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.when;
@@ -964,14 +965,16 @@ public class RatisMasterStatusSystemSuiteJ {
Assert.assertNotNull(statusSystem);
long dummy = 1235L;
- statusSystem.handleAppHeartbeat(APPID1, 1, 1, 0, new HashMap<>(), dummy,
getNewReqeustId());
+ statusSystem.handleAppHeartbeat(
+ APPID1, 1, 1, 1, 1, new HashMap<>(), new HashMap<>(), dummy,
getNewReqeustId());
Thread.sleep(3000L);
Assert.assertEquals(Long.valueOf(dummy),
STATUSSYSTEM1.appHeartbeatTime.get(APPID1));
Assert.assertEquals(Long.valueOf(dummy),
STATUSSYSTEM2.appHeartbeatTime.get(APPID1));
Assert.assertEquals(Long.valueOf(dummy),
STATUSSYSTEM3.appHeartbeatTime.get(APPID1));
String appId2 = "app02";
- statusSystem.handleAppHeartbeat(appId2, 1, 1, 0, new HashMap<>(), dummy,
getNewReqeustId());
+ statusSystem.handleAppHeartbeat(
+ appId2, 1, 1, 2, 2, new HashMap<>(), new HashMap<>(), dummy,
getNewReqeustId());
Thread.sleep(3000L);
Assert.assertEquals(Long.valueOf(dummy),
STATUSSYSTEM1.appHeartbeatTime.get(appId2));
@@ -1382,24 +1385,28 @@ public class RatisMasterStatusSystemSuiteJ {
Long dummy = 1235L;
statusSystem.handleAppHeartbeat(
- APPID1, 10000000000l, 1, 0, new HashMap<>(), dummy, getNewReqeustId());
+ APPID1, 10000000000l, 1, 1, 1, new HashMap<>(), new HashMap<>(),
dummy, getNewReqeustId());
String appId2 = "app02";
- statusSystem.handleAppHeartbeat(appId2, 1, 1, 0, new HashMap<>(), dummy,
getNewReqeustId());
+ statusSystem.handleAppHeartbeat(
+ appId2, 1, 1, 2, 2, new HashMap<>(), new HashMap<>(), dummy,
getNewReqeustId());
// Max size
statusSystem.handleUpdatePartitionSize();
Assert.assertEquals(statusSystem.estimatedPartitionSize,
conf.maxPartitionSizeToEstimate());
statusSystem.handleAppHeartbeat(
- APPID1, 1000000000l, 1, 0, new HashMap<>(), dummy, getNewReqeustId());
- statusSystem.handleAppHeartbeat(appId2, 1, 1, 0, new HashMap<>(), dummy,
getNewReqeustId());
+ APPID1, 1000000000l, 1, 1, 1, new HashMap<>(), new HashMap<>(), dummy,
getNewReqeustId());
+ statusSystem.handleAppHeartbeat(
+ appId2, 1, 1, 2, 2, new HashMap<>(), new HashMap<>(), dummy,
getNewReqeustId());
// Size between minEstimateSize -> maxEstimateSize
statusSystem.handleUpdatePartitionSize();
Assert.assertEquals(statusSystem.estimatedPartitionSize, 500000000);
- statusSystem.handleAppHeartbeat(APPID1, 1000l, 1, 0, new HashMap<>(),
dummy, getNewReqeustId());
- statusSystem.handleAppHeartbeat(appId2, 1000l, 1, 0, new HashMap<>(),
dummy, getNewReqeustId());
+ statusSystem.handleAppHeartbeat(
+ APPID1, 1000l, 1, 1, 1, new HashMap<>(), new HashMap<>(), dummy,
getNewReqeustId());
+ statusSystem.handleAppHeartbeat(
+ appId2, 1000l, 1, 2, 2, new HashMap<>(), new HashMap<>(), dummy,
getNewReqeustId());
// Min size
statusSystem.handleUpdatePartitionSize();
@@ -1582,24 +1589,49 @@ public class RatisMasterStatusSystemSuiteJ {
}
@Test
- public void testShuffleCountWithFallback() {
+ public void testShuffleAndApplicationCountWithFallback() {
AbstractMetaManager statusSystem = pickLeaderStatusSystem();
Assert.assertNotNull(statusSystem);
statusSystem.shuffleTotalCount.reset();
+ statusSystem.applicationTotalCount.reset();
statusSystem.shuffleFallbackCounts.clear();
+ statusSystem.applicationFallbackCounts.clear();
Long dummy = 1235L;
Map<String, Long> shuffleFallbackCounts = new HashMap<>();
- shuffleFallbackCounts.put(POLICY1, 1L);
+ Map<String, Long> applicationFallbackCounts = new HashMap<>();
+ shuffleFallbackCounts.put(POLICY1, 2L);
+ applicationFallbackCounts.put(POLICY1, 1L);
statusSystem.handleAppHeartbeat(
- APPID1, 1000l, 1, 1, shuffleFallbackCounts, dummy, getNewReqeustId());
+ APPID1,
+ 1000l,
+ 1,
+ 2,
+ 1,
+ shuffleFallbackCounts,
+ applicationFallbackCounts,
+ dummy,
+ getNewReqeustId());
shuffleFallbackCounts.put(POLICY1, 1L);
- shuffleFallbackCounts.put(POLICY2, 1L);
+ shuffleFallbackCounts.put(POLICY2, 2L);
+ applicationFallbackCounts.put(POLICY1, 1L);
+ applicationFallbackCounts.put(POLICY2, 1L);
statusSystem.handleAppHeartbeat(
- APPID1, 1000l, 1, 2, shuffleFallbackCounts, dummy, getNewReqeustId());
+ APPID1,
+ 1000l,
+ 1,
+ 3,
+ 2,
+ shuffleFallbackCounts,
+ applicationFallbackCounts,
+ dummy,
+ getNewReqeustId());
- Assert.assertEquals(statusSystem.shuffleTotalCount.longValue(), 3);
-
Assert.assertEquals(statusSystem.shuffleFallbackCounts.get(POLICY1).longValue(),
2);
-
Assert.assertEquals(statusSystem.shuffleFallbackCounts.get(POLICY2).longValue(),
1);
+ assertEquals(statusSystem.shuffleTotalCount.longValue(), 5);
+ assertEquals(statusSystem.applicationTotalCount.longValue(), 3);
+ assertEquals(statusSystem.shuffleFallbackCounts.get(POLICY1).longValue(),
3);
+ assertEquals(statusSystem.shuffleFallbackCounts.get(POLICY2).longValue(),
2);
+
assertEquals(statusSystem.applicationFallbackCounts.get(POLICY1).longValue(),
2);
+
assertEquals(statusSystem.applicationFallbackCounts.get(POLICY2).longValue(),
1);
}
}