This is an automated email from the ASF dual-hosted git repository.
jxue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/master by this push:
new 698bee8 Add TF Available Threads Metrics (#1834)
698bee8 is described below
commit 698bee866d99cbb141bbbdfc581e2a98e2128ddf
Author: Neal Sun <[email protected]>
AuthorDate: Mon Aug 16 11:22:49 2021 -0700
Add TF Available Threads Metrics (#1834)
Add metrics about Task Framework available threads in the cluster per job
type.
---
.../stages/task/TaskSchedulingStage.java | 10 +++-
.../monitoring/mbeans/ClusterStatusMonitor.java | 12 +++++
.../apache/helix/monitoring/mbeans/JobMonitor.java | 11 ++++
.../helix/task/AssignableInstanceManager.java | 8 +++
.../mbeans/TestClusterStatusMonitor.java | 63 ++++++++++++++++++++++
5 files changed, 102 insertions(+), 2 deletions(-)
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/task/TaskSchedulingStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/task/TaskSchedulingStage.java
index 9746bf9..dbedf7b 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/task/TaskSchedulingStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/task/TaskSchedulingStage.java
@@ -81,13 +81,19 @@ public class TaskSchedulingStage extends AbstractBaseStage {
// Reset current INIT/RUNNING tasks on participants for throttling
cache.resetActiveTaskCount(currentStateOutput);
+ ClusterStatusMonitor clusterStatusMonitor =
+ event.getAttribute(AttributeName.clusterStatusMonitor.name());
buildQuotaBasedWorkflowPQsAndInitDispatchers(cache,
- (HelixManager) event.getAttribute(AttributeName.helixmanager.name()),
- (ClusterStatusMonitor) event.getAttribute(AttributeName.clusterStatusMonitor.name()));
+ (HelixManager) event.getAttribute(AttributeName.helixmanager.name()), clusterStatusMonitor);
final BestPossibleStateOutput bestPossibleStateOutput =
compute(event, resourceMap, currentStateOutput);
event.addAttribute(AttributeName.BEST_POSSIBLE_STATE.name(),
bestPossibleStateOutput);
+
+ if (clusterStatusMonitor != null) {
+ clusterStatusMonitor.updateAvailableThreadsPerJob(cache.getAssignableInstanceManager()
+ .getGlobalCapacityMap());
+ }
}
private BestPossibleStateOutput compute(ClusterEvent event, Map<String, Resource> resourceMap,
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
index a9c1811..621710e 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
@@ -757,6 +757,18 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
}
/**
+ * For each JobType, report their total available threads across all instances to corresponding
+ * jobMonitors
+ * @param threadCapacityMap
+ */
+ public void updateAvailableThreadsPerJob(Map<String, Integer> threadCapacityMap) {
+ for (String jobType : threadCapacityMap.keySet()) {
+ JobMonitor jobMonitor = getJobMonitor(jobType);
+ jobMonitor.updateAvailableThreadGauge((long) threadCapacityMap.get(jobType));
+ }
+ }
+
+ /**
* TODO: Separate Workflow/Job Monitors from ClusterStatusMonitor because ClusterStatusMonitor is
* getting too big.
* Returns the appropriate JobMonitor for the given type. If it does not exist, create one and
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/JobMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/JobMonitor.java
index 241199e..2ab407a 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/JobMonitor.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/JobMonitor.java
@@ -57,6 +57,7 @@ public class JobMonitor extends DynamicMBeanProvider {
private SimpleDynamicMetric<Long> _existingJobGauge;
private SimpleDynamicMetric<Long> _queuedJobGauge;
private SimpleDynamicMetric<Long> _runningJobGauge;
+ private SimpleDynamicMetric<Long> _availableThreadGauge;
@Deprecated // To be removed (replaced by jobLatencyGauge Histogram)
private SimpleDynamicMetric<Long> _maximumJobLatencyGauge;
@Deprecated // To be removed (replaced by jobLatencyGauge Histogram)
@@ -81,6 +82,7 @@ public class JobMonitor extends DynamicMBeanProvider {
_existingJobGauge = new SimpleDynamicMetric("ExistingJobGauge", 0L);
_queuedJobGauge = new SimpleDynamicMetric("QueuedJobGauge", 0L);
_runningJobGauge = new SimpleDynamicMetric("RunningJobGauge", 0L);
+ _availableThreadGauge = new SimpleDynamicMetric("AvailableThreadGauge", 0L);
_maximumJobLatencyGauge = new SimpleDynamicMetric("MaximumJobLatencyGauge", 0L);
_jobLatencyCount = new SimpleDynamicMetric("JobLatencyCount", 0L);
@@ -159,6 +161,14 @@ public class JobMonitor extends DynamicMBeanProvider {
}
/**
+ * Update the available thread count to the AvailableThreadGauge
+ * @param availableThreads
+ */
+ public void updateAvailableThreadGauge(long availableThreads) {
+ _availableThreadGauge.updateValue(availableThreads);
+ }
+
+ /**
* Update SubmissionToProcessDelay to its corresponding HistogramDynamicMetric.
* @param delay
*/
@@ -196,6 +206,7 @@ public class JobMonitor extends DynamicMBeanProvider {
attributeList.add(_existingJobGauge);
attributeList.add(_queuedJobGauge);
attributeList.add(_runningJobGauge);
+ attributeList.add(_availableThreadGauge);
attributeList.add(_maximumJobLatencyGauge);
attributeList.add(_jobLatencyCount);
attributeList.add(_jobLatencyGauge);
diff --git a/helix-core/src/main/java/org/apache/helix/task/AssignableInstanceManager.java b/helix-core/src/main/java/org/apache/helix/task/AssignableInstanceManager.java
index 88a12e9..d8b4820 100644
--- a/helix-core/src/main/java/org/apache/helix/task/AssignableInstanceManager.java
+++ b/helix-core/src/main/java/org/apache/helix/task/AssignableInstanceManager.java
@@ -461,6 +461,14 @@ public class AssignableInstanceManager {
}
/**
+ * Returns a mapping of: jobType -> available threads in all instances for this jobType
+ * @return globalThreadBasedQuotaMap
+ */
+ public Map<String, Integer> getGlobalCapacityMap() {
+ return Collections.unmodifiableMap(_globalThreadBasedQuotaMap);
+ }
+
+ /**
* Check remained global quota of certain quota type for skipping redundant computation
* @param quotaType
* @return
diff --git a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestClusterStatusMonitor.java b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestClusterStatusMonitor.java
index 9958b68..b083786 100644
--- a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestClusterStatusMonitor.java
+++ b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestClusterStatusMonitor.java
@@ -44,6 +44,11 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import org.apache.helix.TestHelper;
+import org.apache.helix.common.caches.TaskDataCache;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.task.AssignableInstanceManager;
+import org.apache.helix.task.assigner.TaskAssignResult;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.helix.controller.stages.BestPossibleStateOutput;
import org.apache.helix.model.BuiltInStateModelDefinitions;
@@ -56,10 +61,13 @@ import org.apache.helix.model.Resource;
import org.apache.helix.model.StateModelDefinition;
import org.apache.helix.tools.DefaultIdealStateCalculator;
import org.apache.helix.tools.StateModelConfigGenerator;
+import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.Test;
import org.testng.collections.Sets;
+import static org.mockito.Mockito.when;
+
public class TestClusterStatusMonitor {
private static final MBeanServerConnection _server =
ManagementFactory.getPlatformMBeanServer();
@@ -494,6 +502,61 @@ public class TestClusterStatusMonitor {
}
}
+ @Test
+ public void testRecordAvailableThreadsPerType() throws Exception {
+ String className = TestHelper.getTestClassName();
+ String methodName = TestHelper.getTestMethodName();
+ String clusterName = className + "_" + methodName;
+
+ ClusterStatusMonitor monitor = new ClusterStatusMonitor(clusterName);
+ monitor.active();
+ ObjectName clusterMonitorObjName = monitor.getObjectName(monitor.clusterBeanName());
+ Assert.assertTrue(_server.isRegistered(clusterMonitorObjName));
+
+ Map<String, InstanceConfig> instanceConfigMap = new HashMap<>();
+ Map<String, LiveInstance> liveInstanceMap = new HashMap<>();
+ for (int i = 0; i < 3; i++) {
+ String instanceName = "localhost_" + (12918 + i);
+ LiveInstance liveInstance = new LiveInstance(instanceName);
+ InstanceConfig instanceConfig = new InstanceConfig(instanceName);
+ liveInstanceMap.put(instanceName, liveInstance);
+ instanceConfigMap.put(instanceName, instanceConfig);
+ }
+
+ ClusterConfig clusterConfig = new ClusterConfig(clusterName);
+ clusterConfig.resetTaskQuotaRatioMap();
+ clusterConfig.setTaskQuotaRatio("type1", 30);
+ clusterConfig.setTaskQuotaRatio("type2", 10);
+
+ TaskDataCache taskDataCache = Mockito.mock(TaskDataCache.class);
+ when(taskDataCache.getJobConfigMap()).thenReturn(Collections.emptyMap());
+
+ AssignableInstanceManager assignableInstanceManager = new AssignableInstanceManager();
+ assignableInstanceManager.buildAssignableInstances(clusterConfig, taskDataCache,
+ liveInstanceMap, instanceConfigMap);
+
+ monitor.updateAvailableThreadsPerJob(assignableInstanceManager.getGlobalCapacityMap());
+ ObjectName type1ObjectName = monitor.getObjectName(monitor.getJobBeanName("type1"));
+ ObjectName type2ObjectName = monitor.getObjectName(monitor.getJobBeanName("type2"));
+ Assert.assertTrue(_server.isRegistered(type1ObjectName));
+ Assert.assertEquals(_server.getAttribute(type1ObjectName, "AvailableThreadGauge"), 90L);
+ Assert.assertTrue(_server.isRegistered(type2ObjectName));
+ Assert.assertEquals(_server.getAttribute(type2ObjectName, "AvailableThreadGauge"), 30L);
+
+ TaskAssignResult taskAssignResult = Mockito.mock(TaskAssignResult.class);
+ when(taskAssignResult.getQuotaType()).thenReturn("type1");
+ // Use non-existing instance to bypass the actual assignment, but still decrease thread counts
+ assignableInstanceManager.assign("UnknownInstance", taskAssignResult);
+ // Do it twice for type 1
+ assignableInstanceManager.assign("UnknownInstance", taskAssignResult);
+ when(taskAssignResult.getQuotaType()).thenReturn("type2");
+ assignableInstanceManager.assign("UnknownInstance", taskAssignResult);
+
+ monitor.updateAvailableThreadsPerJob(assignableInstanceManager.getGlobalCapacityMap());
+ Assert.assertEquals(_server.getAttribute(type1ObjectName, "AvailableThreadGauge"), 88L);
+ Assert.assertEquals(_server.getAttribute(type2ObjectName, "AvailableThreadGauge"), 29L);
+ }
+
private void verifyCapacityMetrics(ClusterStatusMonitor monitor, Map<String,
Double> maxUsageMap,
Map<String, Map<String, Integer>> instanceCapacityMap)
throws MalformedObjectNameException, IOException,
AttributeNotFoundException, MBeanException,