This is an automated email from the ASF dual-hosted git repository.
rexxiong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 169b6f697 [CELEBORN-1685] ShuffleFallbackPolicy supports
ShuffleFallbackCount metric
169b6f697 is described below
commit 169b6f6973b2ee5093d91df0d2b573977efdc7ae
Author: SteNicholas <[email protected]>
AuthorDate: Mon Nov 11 10:37:25 2024 +0800
[CELEBORN-1685] ShuffleFallbackPolicy supports ShuffleFallbackCount metric
### What changes were proposed in this pull request?
1. `ShuffleFallbackPolicy` supports the `ShuffleFallbackCount` metric to
provide the shuffle fallback count of each fallback policy.
2. Introduce the `ShuffleTotalCount` metric to record the total count of
shuffles.
3. Fix the issue that Spark 2 does not increment the shuffle count via
`LifecycleManager`.
### Why are the changes needed?
The implementations of `ShuffleFallbackPolicy` do not support the
`ShuffleFallbackCount` metric at present. Meanwhile, Bilibili's production
practice needs the `ShuffleFallbackCount` of each `ShuffleFallbackPolicy`.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Cluster test.
Closes #2891 from SteNicholas/CELEBORN-1685.
Authored-by: SteNicholas <[email protected]>
Signed-off-by: Shuang <[email protected]>
---
assets/grafana/celeborn-dashboard.json | 90 +++++++++++++++++++++-
.../shuffle/celeborn/SparkShuffleManager.java | 1 +
.../CelebornShuffleFallbackPolicyRunner.scala | 28 +++++--
.../shuffle/celeborn/SparkShuffleManager.java | 2 +-
.../CelebornShuffleFallbackPolicyRunner.scala | 24 ++++--
.../celeborn/client/ApplicationHeartbeater.scala | 11 ++-
.../apache/celeborn/client/LifecycleManager.scala | 15 +++-
common/src/main/proto/TransportMessages.proto | 6 +-
.../common/protocol/message/ControlMessages.scala | 12 ++-
.../apache/celeborn/common/util/PbSerDeUtils.scala | 6 +-
docs/monitoring.md | 37 ++++-----
.../master/clustermeta/AbstractMetaManager.java | 29 +++++--
.../master/clustermeta/IMetadataHandler.java | 3 +-
.../clustermeta/SingleMasterMetaManager.java | 6 +-
.../master/clustermeta/ha/HAMasterMetaManager.java | 6 +-
.../deploy/master/clustermeta/ha/MetaHandler.java | 16 ++--
master/src/main/proto/Resource.proto | 3 +-
.../celeborn/service/deploy/master/Master.scala | 29 +++++--
.../service/deploy/master/MasterSource.scala | 5 +-
.../clustermeta/DefaultMetaSystemSuiteJ.java | 41 ++++++----
.../ha/RatisMasterStatusSystemSuiteJ.java | 41 ++++++----
21 files changed, 313 insertions(+), 98 deletions(-)
diff --git a/assets/grafana/celeborn-dashboard.json
b/assets/grafana/celeborn-dashboard.json
index 15d81a725..6007e7297 100644
--- a/assets/grafana/celeborn-dashboard.json
+++ b/assets/grafana/celeborn-dashboard.json
@@ -1472,7 +1472,7 @@
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
- "description": "The count of shuffle fallbacks.",
+ "description": "The total count of shuffle including celeborn
shuffle and spark built-in shuffle.",
"fieldConfig": {
"defaults": {
"color": {
@@ -1544,6 +1544,94 @@
"sort": "none"
}
},
+ "targets": [
+ {
+ "datasource": {
+ "type": "prometheus",
+ "uid": "${DS_PROMETHEUS}"
+ },
+ "expr":
"metrics_ShuffleTotalCount_Value{instance=~\"${instance}\"}",
+ "legendFormat": "${baseLegend}",
+ "range": true,
+ "refId": "A"
+ }
+ ],
+ "title": "metrics_ShuffleTotalCount_Value",
+ "type": "timeseries"
+ },
+ {
+ "datasource": {
+ "type": "prometheus",
+ "uid": "${DS_PROMETHEUS}"
+ },
+ "description": "The count of shuffle fallbacks.",
+ "fieldConfig": {
+ "defaults": {
+ "color": {
+ "mode": "palette-classic"
+ },
+ "custom": {
+ "axisCenteredZero": false,
+ "axisColorMode": "text",
+ "axisLabel": "",
+ "axisPlacement": "auto",
+ "barAlignment": 0,
+ "drawStyle": "line",
+ "fillOpacity": 0,
+ "gradientMode": "none",
+ "hideFrom": {
+ "legend": false,
+ "tooltip": false,
+ "viz": false
+ },
+ "lineInterpolation": "linear",
+ "lineWidth": 1,
+ "pointSize": 5,
+ "scaleDistribution": {
+ "type": "linear"
+ },
+ "showPoints": "auto",
+ "spanNulls": false,
+ "stacking": {
+ "group": "A",
+ "mode": "none"
+ },
+ "thresholdsStyle": {
+ "mode": "off"
+ }
+ },
+ "mappings": [],
+ "thresholds": {
+ "mode": "absolute",
+ "steps": [
+ {
+ "color": "green"
+ }
+ ]
+ }
+ },
+ "overrides": []
+ },
+ "gridPos": {
+ "h": 8,
+ "w": 12,
+ "x": 12,
+ "y": 34
+ },
+ "id": 219,
+ "options": {
+ "legend": {
+ "calcs": [],
+ "displayMode": "list",
+ "placement": "bottom",
+ "showLegend": true
+ },
+ "tooltip": {
+ "maxHeight": 600,
+ "mode": "single",
+ "sort": "none"
+ }
+ },
"targets": [
{
"datasource": {
diff --git
a/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
b/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
index 8c25dc5c2..d450071f4 100644
---
a/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
+++
b/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
@@ -118,6 +118,7 @@ public class SparkShuffleManager implements ShuffleManager {
String appId = SparkUtils.appUniqueId(dependency.rdd().context());
initializeLifecycleManager(appId);
+ lifecycleManager.shuffleCount().increment();
if (fallbackPolicyRunner.applyFallbackPolicies(dependency,
lifecycleManager)) {
logger.warn("Fallback to SortShuffleManager!");
sortShuffleIds.add(shuffleId);
diff --git
a/client-spark/spark-2/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleFallbackPolicyRunner.scala
b/client-spark/spark-2/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleFallbackPolicyRunner.scala
index c1e93ec76..3ba52ef4d 100644
---
a/client-spark/spark-2/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleFallbackPolicyRunner.scala
+++
b/client-spark/spark-2/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleFallbackPolicyRunner.scala
@@ -17,6 +17,8 @@
package org.apache.spark.shuffle.celeborn
+import java.util.function.BiFunction
+
import scala.collection.JavaConverters._
import org.apache.spark.ShuffleDependency
@@ -35,12 +37,26 @@ class CelebornShuffleFallbackPolicyRunner(conf:
CelebornConf) extends Logging {
def applyFallbackPolicies[K, V, C](
dependency: ShuffleDependency[K, V, C],
lifecycleManager: LifecycleManager): Boolean = {
- val needFallback =
- shuffleFallbackPolicies.exists(_.needFallback(dependency, conf,
lifecycleManager))
- if (needFallback && FallbackPolicy.NEVER.equals(shuffleFallbackPolicy)) {
- throw new CelebornIOException(
- "Fallback to spark built-in shuffle implementation is prohibited.")
+ val fallbackPolicy =
+ shuffleFallbackPolicies.find(_.needFallback(dependency, conf,
lifecycleManager))
+ if (fallbackPolicy.isDefined) {
+ if (FallbackPolicy.NEVER.equals(shuffleFallbackPolicy)) {
+ throw new CelebornIOException(
+ "Fallback to spark built-in shuffle implementation is prohibited.")
+ } else {
+ lifecycleManager.shuffleFallbackCounts.compute(
+ fallbackPolicy.getClass.getName,
+ new BiFunction[String, java.lang.Long, java.lang.Long] {
+ override def apply(k: String, v: java.lang.Long): java.lang.Long =
{
+ if (v == null) {
+ 1L
+ } else {
+ v + 1L
+ }
+ }
+ })
+ }
}
- needFallback
+ fallbackPolicy.isDefined
}
}
diff --git
a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
index cbebde096..8541ad223 100644
---
a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
+++
b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
@@ -161,8 +161,8 @@ public class SparkShuffleManager implements ShuffleManager {
String appId = SparkUtils.appUniqueId(dependency.rdd().context());
initializeLifecycleManager(appId);
+ lifecycleManager.shuffleCount().increment();
if (fallbackPolicyRunner.applyFallbackPolicies(dependency,
lifecycleManager)) {
- lifecycleManager.shuffleFallbackCount().increment();
if (conf.getBoolean("spark.dynamicAllocation.enabled", false)
&& !conf.getBoolean("spark.shuffle.service.enabled", false)) {
logger.error(
diff --git
a/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleFallbackPolicyRunner.scala
b/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleFallbackPolicyRunner.scala
index c1e93ec76..8a2b7c889 100644
---
a/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleFallbackPolicyRunner.scala
+++
b/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleFallbackPolicyRunner.scala
@@ -35,12 +35,24 @@ class CelebornShuffleFallbackPolicyRunner(conf:
CelebornConf) extends Logging {
def applyFallbackPolicies[K, V, C](
dependency: ShuffleDependency[K, V, C],
lifecycleManager: LifecycleManager): Boolean = {
- val needFallback =
- shuffleFallbackPolicies.exists(_.needFallback(dependency, conf,
lifecycleManager))
- if (needFallback && FallbackPolicy.NEVER.equals(shuffleFallbackPolicy)) {
- throw new CelebornIOException(
- "Fallback to spark built-in shuffle implementation is prohibited.")
+ val fallbackPolicy =
+ shuffleFallbackPolicies.find(_.needFallback(dependency, conf,
lifecycleManager))
+ if (fallbackPolicy.isDefined) {
+ if (FallbackPolicy.NEVER.equals(shuffleFallbackPolicy)) {
+ throw new CelebornIOException(
+ "Fallback to spark built-in shuffle implementation is prohibited.")
+ } else {
+ lifecycleManager.shuffleFallbackCounts.compute(
+ fallbackPolicy.getClass.getName,
+ (_, v) => {
+ if (v == null) {
+ 1L
+ } else {
+ v + 1L
+ }
+ })
+ }
}
- needFallback
+ fallbackPolicy.isDefined
}
}
diff --git
a/client/src/main/scala/org/apache/celeborn/client/ApplicationHeartbeater.scala
b/client/src/main/scala/org/apache/celeborn/client/ApplicationHeartbeater.scala
index ee337deb6..3d8d23019 100644
---
a/client/src/main/scala/org/apache/celeborn/client/ApplicationHeartbeater.scala
+++
b/client/src/main/scala/org/apache/celeborn/client/ApplicationHeartbeater.scala
@@ -37,7 +37,7 @@ class ApplicationHeartbeater(
appId: String,
conf: CelebornConf,
masterClient: MasterClient,
- shuffleMetrics: () => ((Long, Long), Long),
+ shuffleMetrics: () => ((Long, Long), (Long, Map[String, java.lang.Long])),
workerStatusTracker: WorkerStatusTracker,
registeredShuffles: ConcurrentHashMap.KeySetView[Int, java.lang.Boolean],
cancelAllActiveStages: String => Unit) extends Logging {
@@ -59,10 +59,12 @@ class ApplicationHeartbeater(
override def run(): Unit = {
try {
require(masterClient != null, "When sending a heartbeat, client
shouldn't be null.")
- val ((tmpTotalWritten, tmpTotalFileCount),
tmpShuffleFallbackCount) = shuffleMetrics()
+ val (
+ (tmpTotalWritten, tmpTotalFileCount),
+ (tmpShuffleCount, tmpShuffleFallbackCounts)) = shuffleMetrics()
logInfo("Send app heartbeat with " +
s"written: ${Utils.bytesToString(tmpTotalWritten)}, file count:
$tmpTotalFileCount, " +
- s"shuffle fallback count: $tmpShuffleFallbackCount")
+ s"shuffle count: $tmpShuffleCount, shuffle fallback counts:
$tmpShuffleFallbackCounts")
// UserResourceConsumption and DiskInfo are eliminated from
WorkerInfo
// during serialization of HeartbeatFromApplication
val appHeartbeat =
@@ -70,7 +72,8 @@ class ApplicationHeartbeater(
appId,
tmpTotalWritten,
tmpTotalFileCount,
- tmpShuffleFallbackCount,
+ tmpShuffleCount,
+ tmpShuffleFallbackCounts.asJava,
workerStatusTracker.getNeedCheckedWorkers().toList.asJava,
ZERO_UUID,
true)
diff --git
a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
index bf3ccf63d..47c27bced 100644
--- a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
+++ b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
@@ -84,7 +84,8 @@ class LifecycleManager(val appUniqueId: String, val conf:
CelebornConf) extends
private val unregisterShuffleTime = JavaUtils.newConcurrentHashMap[Int,
Long]()
val registeredShuffle = ConcurrentHashMap.newKeySet[Int]()
- val shuffleFallbackCount = new LongAdder()
+ val shuffleCount = new LongAdder()
+ val shuffleFallbackCounts = JavaUtils.newConcurrentHashMap[String,
java.lang.Long]()
// maintain each shuffle's map relation of WorkerInfo and partition location
val shuffleAllocatedWorkers = new ShuffleAllocatedWorkers
// shuffle id -> (partitionId -> newest PartitionLocation)
@@ -210,10 +211,20 @@ class LifecycleManager(val appUniqueId: String, val conf:
CelebornConf) extends
appUniqueId,
conf,
masterClient,
- () => commitManager.commitMetrics() ->
shuffleFallbackCount.sumThenReset(),
+ () => {
+ commitManager.commitMetrics() ->
+ (shuffleCount.sumThenReset(), resetShuffleFallbackCounts())
+ },
workerStatusTracker,
registeredShuffle,
reason => cancelAllActiveStages(reason))
+ private def resetShuffleFallbackCounts(): Map[String, java.lang.Long] = {
+ val fallbackCounts = new util.HashMap[String, java.lang.Long]()
+ shuffleFallbackCounts.keys().asScala.foreach { key =>
+ Option(shuffleFallbackCounts.remove(key)).filter(_ >
0).foreach(fallbackCounts.put(key, _))
+ }
+ fallbackCounts.asScala.toMap
+ }
private val changePartitionManager = new ChangePartitionManager(conf, this)
private val releasePartitionManager = new ReleasePartitionManager(conf, this)
diff --git a/common/src/main/proto/TransportMessages.proto
b/common/src/main/proto/TransportMessages.proto
index 1228c1732..be09e83c7 100644
--- a/common/src/main/proto/TransportMessages.proto
+++ b/common/src/main/proto/TransportMessages.proto
@@ -442,7 +442,8 @@ message PbHeartbeatFromApplication {
string requestId = 4;
repeated PbWorkerInfo needCheckedWorkerList = 5;
bool shouldResponse = 6;
- int64 shuffleFallbackCount = 7;
+ int64 shuffleCount = 7;
+ map<string, int64> shuffleFallbackCounts = 8;
}
message PbHeartbeatFromApplicationResponse {
@@ -675,7 +676,8 @@ message PbSnapshotMetaInfo {
map<string, PbWorkerEventInfo> workerEventInfos = 15;
map<string, PbApplicationMeta> applicationMetas = 16;
repeated PbWorkerInfo decommissionWorkers = 17;
- int64 shuffleTotalFallbackCount = 18;
+ int64 shuffleTotalCount = 18;
+ map<string, int64> shuffleFallbackCounts = 19;
}
message PbOpenStream {
diff --git
a/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
b/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
index 24d3fe601..3ec9c699b 100644
---
a/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
+++
b/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
@@ -412,7 +412,8 @@ object ControlMessages extends Logging {
appId: String,
totalWritten: Long,
fileCount: Long,
- shuffleFallbackCount: Long,
+ shuffleCount: Long,
+ shuffleFallbackCounts: util.Map[String, java.lang.Long],
needCheckedWorkerList: util.List[WorkerInfo],
override var requestId: String = ZERO_UUID,
shouldResponse: Boolean = false) extends MasterRequestMessage
@@ -810,7 +811,8 @@ object ControlMessages extends Logging {
appId,
totalWritten,
fileCount,
- shuffleFallbackCount,
+ shuffleCount,
+ shuffleFallbackCounts,
needCheckedWorkerList,
requestId,
shouldResponse) =>
@@ -819,7 +821,8 @@ object ControlMessages extends Logging {
.setRequestId(requestId)
.setTotalWritten(totalWritten)
.setFileCount(fileCount)
- .setShuffleFallbackCount(shuffleFallbackCount)
+ .setShuffleCount(shuffleCount)
+ .putAllShuffleFallbackCounts(shuffleFallbackCounts)
.addAllNeedCheckedWorkerList(needCheckedWorkerList.asScala.map(
PbSerDeUtils.toPbWorkerInfo(_, true, true)).toList.asJava)
.setShouldResponse(shouldResponse)
@@ -1212,7 +1215,8 @@ object ControlMessages extends Logging {
pbHeartbeatFromApplication.getAppId,
pbHeartbeatFromApplication.getTotalWritten,
pbHeartbeatFromApplication.getFileCount,
- pbHeartbeatFromApplication.getShuffleFallbackCount,
+ pbHeartbeatFromApplication.getShuffleCount,
+ pbHeartbeatFromApplication.getShuffleFallbackCountsMap,
new util.ArrayList[WorkerInfo](
pbHeartbeatFromApplication.getNeedCheckedWorkerListList.asScala
.map(PbSerDeUtils.fromPbWorkerInfo).toList.asJava),
diff --git
a/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala
b/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala
index d2f07191e..0c638f241 100644
--- a/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala
@@ -466,7 +466,8 @@ object PbSerDeUtils {
workers: java.util.Set[WorkerInfo],
partitionTotalWritten: java.lang.Long,
partitionTotalFileCount: java.lang.Long,
- shuffleTotalFallbackCount: java.lang.Long,
+ shuffleTotalCount: java.lang.Long,
+ shuffleFallbackCounts: java.util.Map[String, java.lang.Long],
appDiskUsageMetricSnapshots: Array[AppDiskUsageSnapShot],
currentAppDiskUsageMetricsSnapshot: AppDiskUsageSnapShot,
lostWorkers: ConcurrentHashMap[WorkerInfo, java.lang.Long],
@@ -488,7 +489,8 @@ object PbSerDeUtils {
.addAllWorkers(workers.asScala.map(toPbWorkerInfo(_, true,
false)).asJava)
.setPartitionTotalWritten(partitionTotalWritten)
.setPartitionTotalFileCount(partitionTotalFileCount)
- .setShuffleTotalFallbackCount(shuffleTotalFallbackCount)
+ .setShuffleTotalCount(shuffleTotalCount)
+ .putAllShuffleFallbackCounts(shuffleFallbackCounts)
// appDiskUsageMetricSnapshots can have null values,
// protobuf repeated value can't support null value in list.
.addAllAppDiskUsageMetricSnapshots(appDiskUsageMetricSnapshots.filter(_
!= null)
diff --git a/docs/monitoring.md b/docs/monitoring.md
index c07c16d1e..93e997e8e 100644
--- a/docs/monitoring.md
+++ b/docs/monitoring.md
@@ -92,24 +92,25 @@ These metrics are exposed by Celeborn master.
- namespace=master
- | Metric Name | Description
|
-
|--------------------------|---------------------------------------------------------------------------------|
- | RegisteredShuffleCount | The count of registered shuffle.
|
- | DeviceCelebornFreeBytes | The actual usable space of Celeborn for
device. |
- | DeviceCelebornTotalBytes | The total space of Celeborn for device.
|
- | RunningApplicationCount | The count of running applications.
|
- | ActiveShuffleSize | The active shuffle size of workers.
|
- | ActiveShuffleFileCount | The active shuffle file count of workers.
|
- | ShuffleFallbackCount | The count of shuffle fallbacks.
|
- | WorkerCount | The count of active workers.
|
- | LostWorkerCount | The count of workers in lost list.
|
- | ExcludedWorkerCount | The count of workers in excluded list.
|
- | AvailableWorkerCount | The count of workers in available list.
|
- | ShutdownWorkerCount | The count of workers in shutdown list.
|
- | DecommissionWorkerCount | The count of workers in decommission list.
|
- | IsActiveMaster | Whether the current master is active.
|
- | PartitionSize | The size of estimated shuffle partition.
|
- | OfferSlotsTime | The time for masters to handle `RequestSlots`
request when registering shuffle. |
+ | Metric Name | Description
|
+
|--------------------------|-----------------------------------------------------------------------------------|
+ | RegisteredShuffleCount | The count of registered shuffle.
|
+ | DeviceCelebornFreeBytes | The actual usable space of Celeborn for
device. |
+ | DeviceCelebornTotalBytes | The total space of Celeborn for device.
|
+ | RunningApplicationCount | The count of running applications.
|
+ | ActiveShuffleSize | The active shuffle size of workers.
|
+ | ActiveShuffleFileCount | The active shuffle file count of workers.
|
+ | ShuffleTotalCount | The total count of shuffle including celeborn
shuffle and spark built-in shuffle. |
+ | ShuffleFallbackCount | The count of shuffle fallbacks.
|
+ | WorkerCount | The count of active workers.
|
+ | LostWorkerCount | The count of workers in lost list.
|
+ | ExcludedWorkerCount | The count of workers in excluded list.
|
+ | AvailableWorkerCount | The count of workers in available list.
|
+ | ShutdownWorkerCount | The count of workers in shutdown list.
|
+ | DecommissionWorkerCount | The count of workers in decommission list.
|
+ | IsActiveMaster | Whether the current master is active.
|
+ | PartitionSize | The size of estimated shuffle partition.
|
+ | OfferSlotsTime | The time for masters to handle `RequestSlots`
request when registering shuffle. |
- namespace=CPU
diff --git
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
index 80988cfff..cede0284a 100644
---
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
+++
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
@@ -85,7 +85,8 @@ public abstract class AbstractMetaManager implements
IMetadataHandler {
public double unhealthyDiskRatioThreshold;
public final LongAdder partitionTotalWritten = new LongAdder();
public final LongAdder partitionTotalFileCount = new LongAdder();
- public final LongAdder shuffleTotalFallbackCount = new LongAdder();
+ public final LongAdder shuffleTotalCount = new LongAdder();
+ public final Map<String, Long> shuffleFallbackCounts =
JavaUtils.newConcurrentHashMap();
public AppDiskUsageMetric appDiskUsageMetric = null;
public final ConcurrentHashMap<String, ApplicationMeta> applicationMetas =
@@ -141,11 +142,17 @@ public abstract class AbstractMetaManager implements
IMetadataHandler {
}
public void updateAppHeartbeatMeta(
- String appId, long time, long totalWritten, long fileCount, long
shuffleFallbackCount) {
+ String appId,
+ long time,
+ long totalWritten,
+ long fileCount,
+ long shuffleCount,
+ Map<String, Long> shuffleFallbackCounts) {
appHeartbeatTime.put(appId, time);
partitionTotalWritten.add(totalWritten);
partitionTotalFileCount.add(fileCount);
- shuffleTotalFallbackCount.add(shuffleFallbackCount);
+ shuffleTotalCount.add(shuffleCount);
+ addShuffleFallbackCounts(shuffleFallbackCounts);
}
public void updateAppLostMeta(String appId) {
@@ -319,7 +326,8 @@ public abstract class AbstractMetaManager implements
IMetadataHandler {
new HashSet(workersMap.values()),
partitionTotalWritten.sum(),
partitionTotalFileCount.sum(),
- shuffleTotalFallbackCount.sum(),
+ shuffleTotalCount.sum(),
+ shuffleFallbackCounts,
appDiskUsageMetric.snapShots(),
appDiskUsageMetric.currentSnapShot().get(),
lostWorkers,
@@ -420,7 +428,8 @@ public abstract class AbstractMetaManager implements
IMetadataHandler {
partitionTotalWritten.add(snapshotMetaInfo.getPartitionTotalWritten());
partitionTotalFileCount.add(snapshotMetaInfo.getPartitionTotalFileCount());
-
shuffleTotalFallbackCount.add(snapshotMetaInfo.getShuffleTotalFallbackCount());
+ shuffleTotalCount.add(snapshotMetaInfo.getShuffleTotalCount());
+ addShuffleFallbackCounts(snapshotMetaInfo.getShuffleFallbackCountsMap());
appDiskUsageMetric.restoreFromSnapshot(
snapshotMetaInfo.getAppDiskUsageMetricSnapshotsList().stream()
.map(PbSerDeUtils::fromPbAppDiskUsageSnapshot)
@@ -462,7 +471,8 @@ public abstract class AbstractMetaManager implements
IMetadataHandler {
workerLostEvents.clear();
partitionTotalWritten.reset();
partitionTotalFileCount.reset();
- shuffleTotalFallbackCount.reset();
+ shuffleTotalCount.reset();
+ shuffleFallbackCounts.clear();
workerEventInfos.clear();
applicationMetas.clear();
}
@@ -550,4 +560,11 @@ public abstract class AbstractMetaManager implements
IMetadataHandler {
public int registeredShuffleCount() {
return
registeredAppAndShuffles.values().stream().mapToInt(Set::size).sum();
}
+
+ private void addShuffleFallbackCounts(Map<String, Long> fallbackCounts) {
+ for (String fallbackPolicy : fallbackCounts.keySet()) {
+ shuffleFallbackCounts.compute(
+ fallbackPolicy, (k, v) -> v == null ? fallbackCounts.get(k) : v +
fallbackCounts.get(k));
+ }
+ }
}
diff --git
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/IMetadataHandler.java
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/IMetadataHandler.java
index e765a9d4d..5ad1907c1 100644
---
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/IMetadataHandler.java
+++
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/IMetadataHandler.java
@@ -42,7 +42,8 @@ public interface IMetadataHandler {
String appId,
long totalWritten,
long fileCount,
- long shuffleFallbackCount,
+ long shuffleCount,
+ Map<String, Long> shuffleFallbackCounts,
long time,
String requestId);
diff --git
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/SingleMasterMetaManager.java
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/SingleMasterMetaManager.java
index 04c74e51e..033be796d 100644
---
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/SingleMasterMetaManager.java
+++
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/SingleMasterMetaManager.java
@@ -76,10 +76,12 @@ public class SingleMasterMetaManager extends
AbstractMetaManager {
String appId,
long totalWritten,
long fileCount,
- long shuffleFallbackCount,
+ long shuffleCount,
+ Map<String, Long> shuffleFallbackCounts,
long time,
String requestId) {
- updateAppHeartbeatMeta(appId, time, totalWritten, fileCount,
shuffleFallbackCount);
+ updateAppHeartbeatMeta(
+ appId, time, totalWritten, fileCount, shuffleCount,
shuffleFallbackCounts);
}
@Override
diff --git
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAMasterMetaManager.java
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAMasterMetaManager.java
index a87470d9d..0c48419d6 100644
---
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAMasterMetaManager.java
+++
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAMasterMetaManager.java
@@ -133,7 +133,8 @@ public class HAMasterMetaManager extends
AbstractMetaManager {
String appId,
long totalWritten,
long fileCount,
- long shuffleFallbackCount,
+ long shuffleCount,
+ Map<String, Long> shuffleFallbackCounts,
long time,
String requestId) {
try {
@@ -147,7 +148,8 @@ public class HAMasterMetaManager extends
AbstractMetaManager {
.setTime(time)
.setTotalWritten(totalWritten)
.setFileCount(fileCount)
- .setShuffleFallbackCount(shuffleFallbackCount)
+ .setShuffleCount(shuffleCount)
+ .putAllShuffleFallbackCounts(shuffleFallbackCounts)
.build())
.build());
} catch (CelebornRuntimeException e) {
diff --git
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MetaHandler.java
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MetaHandler.java
index 626378f0e..5ce91baed 100644
---
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MetaHandler.java
+++
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MetaHandler.java
@@ -33,6 +33,7 @@ import org.apache.celeborn.common.meta.DiskInfo;
import org.apache.celeborn.common.meta.WorkerInfo;
import org.apache.celeborn.common.meta.WorkerStatus;
import org.apache.celeborn.common.quota.ResourceConsumption;
+import org.apache.celeborn.common.util.CollectionUtils;
import org.apache.celeborn.service.deploy.master.clustermeta.MetaUtil;
import org.apache.celeborn.service.deploy.master.clustermeta.ResourceProtos;
import
org.apache.celeborn.service.deploy.master.clustermeta.ResourceProtos.ResourceResponse;
@@ -137,16 +138,21 @@ public class MetaHandler {
case AppHeartbeat:
appId = request.getAppHeartbeatRequest().getAppId();
- LOG.debug("Handle app heartbeat for {}", appId);
long time = request.getAppHeartbeatRequest().getTime();
long totalWritten =
request.getAppHeartbeatRequest().getTotalWritten();
long fileCount = request.getAppHeartbeatRequest().getFileCount();
- long shuffleFallbackCount =
request.getAppHeartbeatRequest().getShuffleFallbackCount();
- if (shuffleFallbackCount > 0) {
- LOG.warn("{} shuffle fallbacks in app {}", shuffleFallbackCount,
appId);
+ long shuffleCount =
request.getAppHeartbeatRequest().getShuffleCount();
+ LOG.debug("Handle app heartbeat for {} with shuffle count {}",
appId, shuffleCount);
+ Map<String, Long> shuffleFallbackCounts =
+ request.getAppHeartbeatRequest().getShuffleFallbackCountsMap();
+ if (CollectionUtils.isNotEmpty(shuffleFallbackCounts)) {
+ LOG.warn(
+ "{} shuffle fallbacks in app {}",
+ shuffleFallbackCounts.values().stream().mapToLong(v ->
v).sum(),
+ appId);
}
metaSystem.updateAppHeartbeatMeta(
- appId, time, totalWritten, fileCount, shuffleFallbackCount);
+ appId, time, totalWritten, fileCount, shuffleCount,
shuffleFallbackCounts);
break;
case AppLost:
diff --git a/master/src/main/proto/Resource.proto
b/master/src/main/proto/Resource.proto
index 1e65943a9..52b418912 100644
--- a/master/src/main/proto/Resource.proto
+++ b/master/src/main/proto/Resource.proto
@@ -120,7 +120,8 @@ message AppHeartbeatRequest {
required int64 time = 2;
required int64 totalWritten = 3;
required int64 fileCount = 4;
- optional int64 shuffleFallbackCount = 5;
+ optional int64 shuffleCount = 5;
+ map<string, int64> shuffleFallbackCounts = 6;
}
message AppLostRequest {
diff --git
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
index 519d36019..0a95609e1 100644
---
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
+++
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
@@ -261,8 +261,12 @@ private[celeborn] class Master(
}).sum()
}
+ masterSource.addGauge(MasterSource.SHUFFLE_TOTAL_COUNT) { () =>
+ statusSystem.shuffleTotalCount.sum()
+ }
+
masterSource.addGauge(MasterSource.SHUFFLE_FALLBACK_COUNT) { () =>
- statusSystem.shuffleTotalFallbackCount.sum()
+ statusSystem.shuffleFallbackCounts.values().asScala.map(_.longValue()).sum
}
masterSource.addGauge(MasterSource.DEVICE_CELEBORN_TOTAL_CAPACITY) { () =>
@@ -395,7 +399,8 @@ private[celeborn] class Master(
appId,
totalWritten,
fileCount,
- fallbackShuffles,
+ shuffleFallbackCount,
+ shuffleFallbackCounts,
needCheckedWorkerList,
requestId,
shouldResponse) =>
@@ -408,7 +413,8 @@ private[celeborn] class Master(
appId,
totalWritten,
fileCount,
- fallbackShuffles,
+ shuffleFallbackCount,
+ shuffleFallbackCounts,
needCheckedWorkerList,
requestId,
shouldResponse))
@@ -1095,12 +1101,23 @@ private[celeborn] class Master(
}
}
+ private def gaugeShuffleFallbackCounts(): Unit = {
+ statusSystem.shuffleFallbackCounts.keySet().asScala.foreach {
fallbackPolicy =>
+ masterSource.addGauge(
+ MasterSource.SHUFFLE_FALLBACK_COUNT,
+ Map("fallbackPolicy" -> fallbackPolicy)) { () =>
+
Option(statusSystem.shuffleFallbackCounts.get(fallbackPolicy)).getOrElse(0L)
+ }
+ }
+ }
+
private def handleHeartbeatFromApplication(
context: RpcCallContext,
appId: String,
totalWritten: Long,
fileCount: Long,
- shuffleFallbackCount: Long,
+ shuffleCount: Long,
+ shuffleFallbackCounts: util.Map[String, java.lang.Long],
needCheckedWorkerList: util.List[WorkerInfo],
requestId: String,
shouldResponse: Boolean): Unit = {
@@ -1108,9 +1125,11 @@ private[celeborn] class Master(
appId,
totalWritten,
fileCount,
- shuffleFallbackCount,
+ shuffleCount,
+ shuffleFallbackCounts,
System.currentTimeMillis(),
requestId)
+ gaugeShuffleFallbackCounts()
val unknownWorkers = needCheckedWorkerList.asScala.filterNot(w =>
statusSystem.workersMap.containsKey(w.toUniqueId())).asJava
if (shouldResponse) {
diff --git
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/MasterSource.scala
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/MasterSource.scala
index 13f35cc84..80bd1e5fb 100644
---
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/MasterSource.scala
+++
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/MasterSource.scala
@@ -44,6 +44,9 @@ object MasterSource {
val DECOMMISSION_WORKER_COUNT = "DecommissionWorkerCount"
val REGISTERED_SHUFFLE_COUNT = "RegisteredShuffleCount"
+ val SHUFFLE_FALLBACK_COUNT = "ShuffleFallbackCount"
+ // The total count including RegisteredShuffleCount(celeborn shuffle) and ShuffleFallbackCount(spark built-in shuffle).
+ val SHUFFLE_TOTAL_COUNT = "ShuffleTotalCount"
val RUNNING_APPLICATION_COUNT = "RunningApplicationCount"
@@ -55,8 +58,6 @@ object MasterSource {
val ACTIVE_SHUFFLE_FILE_COUNT = "ActiveShuffleFileCount"
- val SHUFFLE_FALLBACK_COUNT = "ShuffleFallbackCount"
-
val OFFER_SLOTS_TIME = "OfferSlotsTime"
// Capacity
diff --git
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java
b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java
index 810f12d19..095bb9496 100644
---
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java
+++
b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java
@@ -83,6 +83,8 @@ public class DefaultMetaSystemSuiteJ {
private static final int REPLICATEPORT3 = 3114;
private static final int INTERNALPORT3 = 3115;
private static final String NETWORK_LOCATION3 = "networkLocation3";
+ private static final String POLICY1 = "ShufflePartitionsFallbackPolicy";
+ private static final String POLICY2 = "QuotaFallbackPolicy";
private static final Map<String, DiskInfo> disks3 = new HashMap<>();
private static final Map<UserIdentifier, ResourceConsumption>
userResourceConsumption3 =
new HashMap<>();
@@ -637,11 +639,11 @@ public class DefaultMetaSystemSuiteJ {
@Test
public void testHandleAppHeartbeat() {
Long dummy = 1235L;
- statusSystem.handleAppHeartbeat(APPID1, 1, 1, 0, dummy, getNewReqeustId());
+ statusSystem.handleAppHeartbeat(APPID1, 1, 1, 0, new HashMap<>(), dummy,
getNewReqeustId());
assertEquals(dummy, statusSystem.appHeartbeatTime.get(APPID1));
String appId2 = "app02";
- statusSystem.handleAppHeartbeat(appId2, 1, 1, 0, dummy, getNewReqeustId());
+ statusSystem.handleAppHeartbeat(appId2, 1, 1, 0, new HashMap<>(), dummy,
getNewReqeustId());
assertEquals(dummy, statusSystem.appHeartbeatTime.get(appId2));
assertEquals(2, statusSystem.appHeartbeatTime.size());
@@ -811,23 +813,25 @@ public class DefaultMetaSystemSuiteJ {
Assert.assertEquals(statusSystem.estimatedPartitionSize,
conf.initialEstimatedPartitionSize());
Long dummy = 1235L;
- statusSystem.handleAppHeartbeat(APPID1, 10000000000l, 1, 0, dummy,
getNewReqeustId());
+ statusSystem.handleAppHeartbeat(
+ APPID1, 10000000000l, 1, 0, new HashMap<>(), dummy, getNewReqeustId());
String appId2 = "app02";
- statusSystem.handleAppHeartbeat(appId2, 1, 1, 0, dummy, getNewReqeustId());
+ statusSystem.handleAppHeartbeat(appId2, 1, 1, 0, new HashMap<>(), dummy,
getNewReqeustId());
// Max size
statusSystem.handleUpdatePartitionSize();
Assert.assertEquals(statusSystem.estimatedPartitionSize,
conf.maxPartitionSizeToEstimate());
- statusSystem.handleAppHeartbeat(APPID1, 1000000000l, 1, 0, dummy,
getNewReqeustId());
- statusSystem.handleAppHeartbeat(appId2, 1, 1, 0, dummy, getNewReqeustId());
+ statusSystem.handleAppHeartbeat(
+ APPID1, 1000000000l, 1, 0, new HashMap<>(), dummy, getNewReqeustId());
+ statusSystem.handleAppHeartbeat(appId2, 1, 1, 0, new HashMap<>(), dummy,
getNewReqeustId());
// Size between minEstimateSize -> maxEstimateSize
statusSystem.handleUpdatePartitionSize();
Assert.assertEquals(statusSystem.estimatedPartitionSize, 500000000);
- statusSystem.handleAppHeartbeat(APPID1, 1000l, 1, 0, dummy,
getNewReqeustId());
- statusSystem.handleAppHeartbeat(appId2, 1000l, 1, 0, dummy,
getNewReqeustId());
+ statusSystem.handleAppHeartbeat(APPID1, 1000l, 1, 0, new HashMap<>(),
dummy, getNewReqeustId());
+ statusSystem.handleAppHeartbeat(appId2, 1000l, 1, 0, new HashMap<>(),
dummy, getNewReqeustId());
// Min size
statusSystem.handleUpdatePartitionSize();
@@ -899,13 +903,22 @@ public class DefaultMetaSystemSuiteJ {
}
@Test
- public void testShuffleFallbackCount() {
- statusSystem.shuffleTotalFallbackCount.reset();
+ public void testShuffleCountWithFallback() {
+ statusSystem.shuffleTotalCount.reset();
+ statusSystem.shuffleFallbackCounts.clear();
Long dummy = 1235L;
- statusSystem.handleAppHeartbeat(APPID1, 1000l, 1, 1, dummy,
getNewReqeustId());
- statusSystem.handleAppHeartbeat(APPID1, 1000l, 1, 2, dummy,
getNewReqeustId());
-
- assertEquals(statusSystem.shuffleTotalFallbackCount.longValue(), 3);
+ Map<String, Long> shuffleFallbackCounts = new HashMap<>();
+ shuffleFallbackCounts.put(POLICY1, 1L);
+ statusSystem.handleAppHeartbeat(
+ APPID1, 1000l, 1, 1, shuffleFallbackCounts, dummy, getNewReqeustId());
+ shuffleFallbackCounts.put(POLICY1, 1L);
+ shuffleFallbackCounts.put(POLICY2, 1L);
+ statusSystem.handleAppHeartbeat(
+ APPID1, 1000l, 1, 2, shuffleFallbackCounts, dummy, getNewReqeustId());
+
+ assertEquals(statusSystem.shuffleTotalCount.longValue(), 3);
+ assertEquals(statusSystem.shuffleFallbackCounts.get(POLICY1).longValue(),
2);
+ assertEquals(statusSystem.shuffleFallbackCounts.get(POLICY2).longValue(),
1);
}
}
diff --git
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
index ae70ed54b..b04a419f4 100644
---
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
+++
b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
@@ -255,6 +255,8 @@ public class RatisMasterStatusSystemSuiteJ {
private static final String APPID1 = "appId1";
private static final int SHUFFLEID1 = 1;
private static final String SHUFFLEKEY1 = APPID1 + "-" + SHUFFLEID1;
+ private static final String POLICY1 = "ShufflePartitionsFallbackPolicy";
+ private static final String POLICY2 = "QuotaFallbackPolicy";
private static final WorkerStatus workerStatus =
WorkerStatus.normalWorkerStatus();
private String getNewReqeustId() {
@@ -946,14 +948,14 @@ public class RatisMasterStatusSystemSuiteJ {
Assert.assertNotNull(statusSystem);
long dummy = 1235L;
- statusSystem.handleAppHeartbeat(APPID1, 1, 1, 0, dummy, getNewReqeustId());
+ statusSystem.handleAppHeartbeat(APPID1, 1, 1, 0, new HashMap<>(), dummy,
getNewReqeustId());
Thread.sleep(3000L);
Assert.assertEquals(Long.valueOf(dummy),
STATUSSYSTEM1.appHeartbeatTime.get(APPID1));
Assert.assertEquals(Long.valueOf(dummy),
STATUSSYSTEM2.appHeartbeatTime.get(APPID1));
Assert.assertEquals(Long.valueOf(dummy),
STATUSSYSTEM3.appHeartbeatTime.get(APPID1));
String appId2 = "app02";
- statusSystem.handleAppHeartbeat(appId2, 1, 1, 0, dummy, getNewReqeustId());
+ statusSystem.handleAppHeartbeat(appId2, 1, 1, 0, new HashMap<>(), dummy,
getNewReqeustId());
Thread.sleep(3000L);
Assert.assertEquals(Long.valueOf(dummy),
STATUSSYSTEM1.appHeartbeatTime.get(appId2));
@@ -1315,23 +1317,25 @@ public class RatisMasterStatusSystemSuiteJ {
Assert.assertEquals(statusSystem.estimatedPartitionSize,
conf.initialEstimatedPartitionSize());
Long dummy = 1235L;
- statusSystem.handleAppHeartbeat(APPID1, 10000000000l, 1, 0, dummy,
getNewReqeustId());
+ statusSystem.handleAppHeartbeat(
+ APPID1, 10000000000l, 1, 0, new HashMap<>(), dummy, getNewReqeustId());
String appId2 = "app02";
- statusSystem.handleAppHeartbeat(appId2, 1, 1, 0, dummy, getNewReqeustId());
+ statusSystem.handleAppHeartbeat(appId2, 1, 1, 0, new HashMap<>(), dummy,
getNewReqeustId());
// Max size
statusSystem.handleUpdatePartitionSize();
Assert.assertEquals(statusSystem.estimatedPartitionSize,
conf.maxPartitionSizeToEstimate());
- statusSystem.handleAppHeartbeat(APPID1, 1000000000l, 1, 0, dummy,
getNewReqeustId());
- statusSystem.handleAppHeartbeat(appId2, 1, 1, 0, dummy, getNewReqeustId());
+ statusSystem.handleAppHeartbeat(
+ APPID1, 1000000000l, 1, 0, new HashMap<>(), dummy, getNewReqeustId());
+ statusSystem.handleAppHeartbeat(appId2, 1, 1, 0, new HashMap<>(), dummy,
getNewReqeustId());
// Size between minEstimateSize -> maxEstimateSize
statusSystem.handleUpdatePartitionSize();
Assert.assertEquals(statusSystem.estimatedPartitionSize, 500000000);
- statusSystem.handleAppHeartbeat(APPID1, 1000l, 1, 0, dummy,
getNewReqeustId());
- statusSystem.handleAppHeartbeat(appId2, 1000l, 1, 0, dummy,
getNewReqeustId());
+ statusSystem.handleAppHeartbeat(APPID1, 1000l, 1, 0, new HashMap<>(),
dummy, getNewReqeustId());
+ statusSystem.handleAppHeartbeat(appId2, 1000l, 1, 0, new HashMap<>(),
dummy, getNewReqeustId());
// Min size
statusSystem.handleUpdatePartitionSize();
@@ -1503,15 +1507,24 @@ public class RatisMasterStatusSystemSuiteJ {
}
@Test
- public void testShuffleFallbackCount() {
+ public void testShuffleCountWithFallback() {
AbstractMetaManager statusSystem = pickLeaderStatusSystem();
Assert.assertNotNull(statusSystem);
- statusSystem.shuffleTotalFallbackCount.reset();
+ statusSystem.shuffleTotalCount.reset();
+ statusSystem.shuffleFallbackCounts.clear();
Long dummy = 1235L;
- statusSystem.handleAppHeartbeat(APPID1, 1000l, 1, 1, dummy,
getNewReqeustId());
- statusSystem.handleAppHeartbeat(APPID1, 1000l, 1, 2, dummy,
getNewReqeustId());
-
- Assert.assertEquals(statusSystem.shuffleTotalFallbackCount.longValue(), 3);
+ Map<String, Long> shuffleFallbackCounts = new HashMap<>();
+ shuffleFallbackCounts.put(POLICY1, 1L);
+ statusSystem.handleAppHeartbeat(
+ APPID1, 1000l, 1, 1, shuffleFallbackCounts, dummy, getNewReqeustId());
+ shuffleFallbackCounts.put(POLICY1, 1L);
+ shuffleFallbackCounts.put(POLICY2, 1L);
+ statusSystem.handleAppHeartbeat(
+ APPID1, 1000l, 1, 2, shuffleFallbackCounts, dummy, getNewReqeustId());
+
+ Assert.assertEquals(statusSystem.shuffleTotalCount.longValue(), 3);
+
Assert.assertEquals(statusSystem.shuffleFallbackCounts.get(POLICY1).longValue(),
2);
+
Assert.assertEquals(statusSystem.shuffleFallbackCounts.get(POLICY2).longValue(),
1);
}
}