This is an automated email from the ASF dual-hosted git repository. hulee pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/helix.git
commit 8ba4c9c19981fffe3c958c6851e2b8b8bf90bfbb Author: Hunter Lee <[email protected]> AuthorDate: Thu Mar 28 12:30:09 2019 -0700 TASK2.0: Add performance metrics to JobMonitor We want to add more metrics to Task Framework so that the user could understand what's going on in case of a slowdown, or get a general sense of how fast the workload is moving. Changelist: 1. Add SubmissionToProcessDelay 2. Add SubmissionToScheduleDelay 3. Add ControllerInducedDelay (for testing) 4. Add JobLatencyGauge 5. Change regular metrics to Dynamic metrics in JobMonitor 6. Add an integration test: TestTaskPerformanceMetrics --- .../monitoring/mbeans/ClusterStatusMonitor.java | 35 ++-- .../apache/helix/monitoring/mbeans/JobMonitor.java | 220 +++++++++++++-------- .../apache/helix/task/AbstractTaskDispatcher.java | 159 ++++++++++++++- .../java/org/apache/helix/task/JobContext.java | 37 +++- .../java/org/apache/helix/task/JobDispatcher.java | 14 +- .../org/apache/helix/task/WorkflowContext.java | 2 +- .../mbeans/TestTaskPerformanceMetrics.java | 151 ++++++++++++++ 7 files changed, 508 insertions(+), 110 deletions(-) diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java index 4616f9e..1d778ee 100644 --- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java +++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java @@ -641,10 +641,22 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean { jobType = preProcessJobMonitor(jobType); JobMonitor jobMonitor = _perTypeJobMonitorMap.get(jobType); if (jobMonitor != null) { - jobMonitor.updateJobCounters(to, latency); + jobMonitor.updateJobMetricsWithLatency(to, latency); } } + /** + * TODO: Separate Workflow/Job Monitors from ClusterStatusMonitor because ClusterStatusMonitor is + * getting too big. 
+ * Returns the appropriate JobMonitor for the given type. If it does not exist, create one and + * return it. + * @param jobType the job type; it is passed through preProcessJobMonitor before the lookup + * @return the JobMonitor for the job type, or null if no monitor could be created for it + */ + public JobMonitor getJobMonitor(String jobType) { + return _perTypeJobMonitorMap.get(preProcessJobMonitor(jobType)); + } + private void updateJobGauges(String jobType, TaskState current) { // When first time for WorkflowRebalancer call, jobconfig may not ready. // Thus only check it for gauge. @@ -662,13 +674,17 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean { synchronized (_perTypeJobMonitorMap) { if (!_perTypeJobMonitorMap.containsKey(jobType)) { - JobMonitor monitor = new JobMonitor(_clusterName, jobType); + String jobMonitorBeanName = getJobBeanName(jobType); + JobMonitor monitor = null; try { - registerJob(monitor); - } catch (MalformedObjectNameException e) { + monitor = new JobMonitor(_clusterName, jobType, getObjectName(jobMonitorBeanName)); + monitor.register(); // Necessary for dynamic metrics + } catch (Exception e) { LOG.error("Failed to register job type : " + jobType, e); } - _perTypeJobMonitorMap.put(jobType, monitor); + if (monitor != null) { + _perTypeJobMonitorMap.put(jobType, monitor); + } } } return jobType; @@ -789,17 +805,12 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean { } } - private void registerJob(JobMonitor jobMonitor) throws MalformedObjectNameException { - String jobBeanName = getJobBeanName(jobMonitor.getJobType()); - register(jobMonitor, getObjectName(jobBeanName)); - } - - private void unregisterAllJobs() throws MalformedObjectNameException { + private void unregisterAllJobs() { synchronized (_perTypeJobMonitorMap) { Iterator<Map.Entry<String, JobMonitor>> jobIter = _perTypeJobMonitorMap.entrySet().iterator(); while (jobIter.hasNext()) { Map.Entry<String, JobMonitor> jobEntry = jobIter.next(); - unregister(getObjectName(getJobBeanName(jobEntry.getKey()))); + jobEntry.getValue().unregister(); jobIter.remove(); } } diff --git 
a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/JobMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/JobMonitor.java index 39108cf..6589e96 100644 --- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/JobMonitor.java +++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/JobMonitor.java @@ -19,82 +19,82 @@ package org.apache.helix.monitoring.mbeans; * under the License. */ +import com.codahale.metrics.Histogram; +import com.codahale.metrics.SlidingTimeWindowArrayReservoir; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; +import javax.management.JMException; +import javax.management.ObjectName; +import org.apache.helix.monitoring.mbeans.dynamicMBeans.DynamicMBeanProvider; +import org.apache.helix.monitoring.mbeans.dynamicMBeans.DynamicMetric; +import org.apache.helix.monitoring.mbeans.dynamicMBeans.HistogramDynamicMetric; +import org.apache.helix.monitoring.mbeans.dynamicMBeans.SimpleDynamicMetric; import org.apache.helix.task.TaskState; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - -public class JobMonitor implements JobMonitorMBean { +public class JobMonitor extends DynamicMBeanProvider { private static final String JOB_KEY = "Job"; private static final Logger LOG = LoggerFactory.getLogger(JobMonitor.class); private static final long DEFAULT_RESET_INTERVAL_MS = 60 * 60 * 1000; // 1 hour + // For registering dynamic metrics + private final ObjectName _initObjectName; + private String _clusterName; private String _jobType; - - private long _successfullJobCount; - private long _failedJobCount; - private long _abortedJobCount; - private long _existingJobGauge; - private long _queuedJobGauge; - private long _runningJobGauge; - private long _maximumJobLatencyGauge; - private long _jobLatencyCount; private long _lastResetTime; - public JobMonitor(String clusterName, String jobType) { + // Counters + private SimpleDynamicMetric<Long> _successfulJobCount; 
+ private SimpleDynamicMetric<Long> _failedJobCount; + private SimpleDynamicMetric<Long> _abortedJobCount; + + // Gauges + private SimpleDynamicMetric<Long> _existingJobGauge; + private SimpleDynamicMetric<Long> _queuedJobGauge; + private SimpleDynamicMetric<Long> _runningJobGauge; + @Deprecated // To be removed (replaced by jobLatencyGauge Histogram) + private SimpleDynamicMetric<Long> _maximumJobLatencyGauge; + @Deprecated // To be removed (replaced by jobLatencyGauge Histogram) + private SimpleDynamicMetric<Long> _jobLatencyCount; + + // Histogram + private HistogramDynamicMetric _jobLatencyGauge; + private HistogramDynamicMetric _submissionToProcessDelayGauge; + private HistogramDynamicMetric _submissionToScheduleDelayGauge; + private HistogramDynamicMetric _controllerInducedDelayGauge; + + public JobMonitor(String clusterName, String jobType, ObjectName objectName) { _clusterName = clusterName; _jobType = jobType; - _successfullJobCount = 0L; - _failedJobCount = 0L; - _abortedJobCount = 0L; - _existingJobGauge = 0L; - _queuedJobGauge = 0L; - _runningJobGauge = 0L; + _initObjectName = objectName; _lastResetTime = System.currentTimeMillis(); - _jobLatencyCount = 0L; - _maximumJobLatencyGauge = 0L; - } - - @Override - public long getSuccessfulJobCount() { - return _successfullJobCount; - } - - @Override - public long getFailedJobCount() { - return _failedJobCount; - } - - @Override - public long getAbortedJobCount() { - return _abortedJobCount; - } - - @Override - public long getExistingJobGauge() { - return _existingJobGauge; - } - - @Override - public long getQueuedJobGauge() { - return _queuedJobGauge; - } - - @Override - public long getRunningJobGauge() { - return _runningJobGauge; - } - @Override - public long getMaximumJobLatencyGauge() { - return _maximumJobLatencyGauge; - } - - @Override - public long getJobLatencyCount() { - return _jobLatencyCount; + // Instantiate simple dynamic metrics + _successfulJobCount = new 
SimpleDynamicMetric("SuccessfulJobCount", 0L); + _failedJobCount = new SimpleDynamicMetric("FailedJobCount", 0L); + _abortedJobCount = new SimpleDynamicMetric("AbortedJobCount", 0L); + _existingJobGauge = new SimpleDynamicMetric("ExistingJobGauge", 0L); + _queuedJobGauge = new SimpleDynamicMetric("QueuedJobGauge", 0L); + _runningJobGauge = new SimpleDynamicMetric("RunningJobGauge", 0L); + _maximumJobLatencyGauge = new SimpleDynamicMetric("MaximumJobLatencyGauge", 0L); + _jobLatencyCount = new SimpleDynamicMetric("JobLatencyCount", 0L); + + // Instantiate histogram dynamic metrics + _jobLatencyGauge = new HistogramDynamicMetric("JobLatencyGauge", new Histogram( + new SlidingTimeWindowArrayReservoir(DEFAULT_RESET_INTERVAL_MS, TimeUnit.MILLISECONDS))); + _submissionToProcessDelayGauge = + new HistogramDynamicMetric("SubmissionToProcessDelayGauge", new Histogram( + new SlidingTimeWindowArrayReservoir(DEFAULT_RESET_INTERVAL_MS, TimeUnit.MILLISECONDS))); + _submissionToScheduleDelayGauge = + new HistogramDynamicMetric("SubmissionToScheduleDelayGauge", new Histogram( + new SlidingTimeWindowArrayReservoir(DEFAULT_RESET_INTERVAL_MS, TimeUnit.MILLISECONDS))); + _controllerInducedDelayGauge = + new HistogramDynamicMetric("ControllerInducedDelayGauge", new Histogram( + new SlidingTimeWindowArrayReservoir(DEFAULT_RESET_INTERVAL_MS, TimeUnit.MILLISECONDS))); } @Override @@ -110,38 +110,37 @@ public class JobMonitor implements JobMonitorMBean { * Update job counters with transition state * @param to The to state of job, cleaned by ZK when it is null */ - - public void updateJobCounters(TaskState to) { - updateJobCounters(to, 0); + public void updateJobMetricsWithLatency(TaskState to) { + updateJobMetricsWithLatency(to, 0); } - public void updateJobCounters(TaskState to, long latency) { + public void updateJobMetricsWithLatency(TaskState to, long latency) { // TODO maybe use separate TIMED_OUT counter later if (to.equals(TaskState.FAILED) || to.equals(TaskState.TIMED_OUT)) { - 
_failedJobCount++; + incrementSimpleDynamicMetric(_failedJobCount); } else if (to.equals(TaskState.COMPLETED)) { - _successfullJobCount++; - + incrementSimpleDynamicMetric(_successfulJobCount); // Only count succeeded jobs - _maximumJobLatencyGauge = Math.max(_maximumJobLatencyGauge, latency); - _jobLatencyCount += latency > 0 ? latency : 0; + _maximumJobLatencyGauge.updateValue(Math.max(_maximumJobLatencyGauge.getValue(), latency)); + if (latency > 0) { + incrementSimpleDynamicMetric(_jobLatencyCount, latency); + _jobLatencyGauge.updateValue(latency); + } } else if (to.equals(TaskState.ABORTED)) { - _abortedJobCount++; + incrementSimpleDynamicMetric(_abortedJobCount); } - - } /** * Reset job gauges */ public void resetJobGauge() { - _queuedJobGauge = 0L; - _existingJobGauge = 0L; - _runningJobGauge = 0L; + _queuedJobGauge.updateValue(0L); + _existingJobGauge.updateValue(0L); + _runningJobGauge.updateValue(0L); if (_lastResetTime + DEFAULT_RESET_INTERVAL_MS < System.currentTimeMillis()) { _lastResetTime = System.currentTimeMillis(); - _maximumJobLatencyGauge = 0L; + _maximumJobLatencyGauge.updateValue(0L); } } @@ -150,11 +149,76 @@ public class JobMonitor implements JobMonitorMBean { * @param to The current state of job */ public void updateJobGauge(TaskState to) { - _existingJobGauge++; + incrementSimpleDynamicMetric(_existingJobGauge); if (to == null || to.equals(TaskState.NOT_STARTED)) { - _queuedJobGauge++; + incrementSimpleDynamicMetric(_queuedJobGauge); } else if (to.equals(TaskState.IN_PROGRESS)) { - _runningJobGauge++; + incrementSimpleDynamicMetric(_runningJobGauge); } } + + /** + * Update SubmissionToProcessDelay to its corresponding HistogramDynamicMetric. + * @param delay + */ + public void updateSubmissionToProcessDelayGauge(long delay) { + _submissionToProcessDelayGauge.updateValue(delay); + } + + /** + * Update SubmissionToScheduleDelay to its corresponding HistogramDynamicMetric. 
+ * @param delay + */ + public void updateSubmissionToScheduleDelayGauge(long delay) { + _submissionToScheduleDelayGauge.updateValue(delay); + } + + /** + * Update ControllerInducedDelay to its corresponding HistogramDynamicMetric. + * @param delay + */ + public void updateControllerInducedDelayGauge(long delay) { + _controllerInducedDelayGauge.updateValue(delay); + } + + /** + * This method registers the dynamic metrics. + * @return this JobMonitor, after its metrics have been registered under its ObjectName + * @throws JMException + */ + @Override + public DynamicMBeanProvider register() throws JMException { + List<DynamicMetric<?, ?>> attributeList = new ArrayList<>(); + attributeList.add(_successfulJobCount); + attributeList.add(_failedJobCount); + attributeList.add(_abortedJobCount); + attributeList.add(_existingJobGauge); + attributeList.add(_queuedJobGauge); + attributeList.add(_runningJobGauge); + attributeList.add(_maximumJobLatencyGauge); + attributeList.add(_jobLatencyCount); + attributeList.add(_jobLatencyGauge); + attributeList.add(_submissionToProcessDelayGauge); + attributeList.add(_submissionToScheduleDelayGauge); + attributeList.add(_controllerInducedDelayGauge); + doRegister(attributeList, _initObjectName); + return this; + } + + /** + * NOTE: This method is neither thread-safe nor atomic. + * Increment the value of a given SimpleDynamicMetric by 1. + */ + private void incrementSimpleDynamicMetric(SimpleDynamicMetric<Long> metric) { + metric.updateValue(metric.getValue() + 1); + } + + /** + * NOTE: This method is neither thread-safe nor atomic. + * Increment the value of a given SimpleDynamicMetric by the given value.
+ */ + private void incrementSimpleDynamicMetric(SimpleDynamicMetric<Long> metric, long value) { + metric.updateValue(metric.getValue() + value); + } + } diff --git a/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java b/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java index a6345f6..4de8112 100644 --- a/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java +++ b/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java @@ -1,5 +1,24 @@ package org.apache.helix.task; +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + import com.google.common.collect.Maps; import com.google.common.collect.Sets; import java.util.ArrayList; @@ -11,10 +30,13 @@ import java.util.Map; import java.util.Set; import java.util.SortedSet; import java.util.TreeSet; +import java.util.concurrent.Callable; import org.apache.helix.HelixDefinedState; import org.apache.helix.HelixManager; import org.apache.helix.common.caches.TaskDataCache; +import org.apache.helix.controller.dataproviders.BaseControllerDataProvider; import org.apache.helix.controller.dataproviders.WorkflowControllerDataProvider; +import org.apache.helix.controller.pipeline.AbstractBaseStage; import org.apache.helix.controller.rebalancer.util.RebalanceScheduler; import org.apache.helix.controller.stages.BestPossibleStateOutput; import org.apache.helix.controller.stages.CurrentStateOutput; @@ -23,12 +45,14 @@ import org.apache.helix.model.Message; import org.apache.helix.model.Partition; import org.apache.helix.model.ResourceAssignment; import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor; +import org.apache.helix.monitoring.mbeans.JobMonitor; import org.apache.helix.task.assigner.AssignableInstance; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public abstract class AbstractTaskDispatcher { private static final Logger LOG = LoggerFactory.getLogger(AbstractTaskDispatcher.class); + private static final String TASK_LATENCY_TAG = "Latency"; // For connection management protected HelixManager _manager; @@ -470,12 +494,13 @@ public abstract class AbstractTaskDispatcher { // This is the actual assigning part protected void handleAdditionalTaskAssignment( Map<String, SortedSet<Integer>> prevInstanceToTaskAssignments, Set<String> excludedInstances, - String jobResource, CurrentStateOutput currStateOutput, JobContext jobCtx, JobConfig jobCfg, - WorkflowConfig workflowConfig, WorkflowContext workflowCtx, - WorkflowControllerDataProvider cache, ResourceAssignment prevTaskToInstanceStateAssignment, + String jobResource, 
CurrentStateOutput currStateOutput, JobContext jobCtx, + final JobConfig jobCfg, final WorkflowConfig workflowConfig, WorkflowContext workflowCtx, + final WorkflowControllerDataProvider cache, + ResourceAssignment prevTaskToInstanceStateAssignment, Map<String, Set<Integer>> assignedPartitions, Map<Integer, PartitionAssignment> paMap, Set<Integer> skippedPartitions, TaskAssignmentCalculator taskAssignmentCal, - Set<Integer> allPartitions, long currentTime, Collection<String> liveInstances) { + Set<Integer> allPartitions, final long currentTime, Collection<String> liveInstances) { // See if there was LiveInstance change and cache LiveInstances from this iteration of pipeline boolean existsLiveInstanceOrCurrentStateChange = @@ -602,7 +627,14 @@ public abstract class AbstractTaskDispatcher { excludeSet.add(pId); jobCtx.setAssignedParticipant(pId, instance); jobCtx.setPartitionState(pId, TaskPartitionState.INIT); - jobCtx.setPartitionStartTime(pId, System.currentTimeMillis()); + final long currentTimestamp = System.currentTimeMillis(); + jobCtx.setPartitionStartTime(pId, currentTimestamp); + if (jobCtx.getExecutionStartTime() == WorkflowContext.NOT_STARTED) { + // This means this is the very first task scheduled for this job + jobCtx.setExecutionStartTime(currentTimestamp); + reportSubmissionToScheduleDelay(cache, _clusterStatusMonitor, workflowConfig, jobCfg, + currentTimestamp); + } if (LOG.isDebugEnabled()) { LOG.debug(String.format("Setting task partition %s state to %s on instance %s.", pName, TaskPartitionState.RUNNING, instance)); @@ -792,12 +824,13 @@ public abstract class AbstractTaskDispatcher { } } - protected void markJobComplete(String jobName, JobContext jobContext, - WorkflowConfig workflowConfig, WorkflowContext workflowContext, - Map<String, JobConfig> jobConfigMap, WorkflowControllerDataProvider dataProvider) { + protected void markJobComplete(final String jobName, final JobContext jobContext, + final WorkflowConfig workflowConfig, WorkflowContext 
workflowContext, + final Map<String, JobConfig> jobConfigMap, + final WorkflowControllerDataProvider dataProvider) { finishJobInRuntimeJobDag(dataProvider.getTaskDataCache(), workflowConfig.getWorkflowId(), jobName); - long currentTime = System.currentTimeMillis(); + final long currentTime = System.currentTimeMillis(); workflowContext.setJobState(jobName, TaskState.COMPLETED); jobContext.setFinishTime(currentTime); if (isWorkflowFinished(workflowContext, workflowConfig, jobConfigMap, dataProvider)) { @@ -805,6 +838,13 @@ public abstract class AbstractTaskDispatcher { updateWorkflowMonitor(workflowContext, workflowConfig); } scheduleJobCleanUp(jobConfigMap.get(jobName), workflowConfig, currentTime); + + // Job has completed successfully so report ControllerInducedDelay + JobConfig jobConfig = jobConfigMap.get(jobName); + if (jobConfig != null) { + reportControllerInducedDelay(dataProvider, _clusterStatusMonitor, workflowConfig, jobConfig, + currentTime); + } } protected void markJobFailed(String jobName, JobContext jobContext, WorkflowConfig workflowConfig, @@ -1175,4 +1215,105 @@ public abstract class AbstractTaskDispatcher { jobName)); } } + + /** + * TODO: Move this logic to Task Framework metrics class for refactoring. + * Computes and passes on submissionToProcessDelay to the dynamic metric. 
+ * @param dataProvider + * @param clusterStatusMonitor + * @param workflowConfig + * @param jobConfig + * @param currentTimestamp + */ + protected static void reportSubmissionToProcessDelay(BaseControllerDataProvider dataProvider, + final ClusterStatusMonitor clusterStatusMonitor, final WorkflowConfig workflowConfig, + final JobConfig jobConfig, final long currentTimestamp) { + AbstractBaseStage.asyncExecute(dataProvider.getAsyncTasksThreadPool(), new Callable<Object>() { + @Override + public Object call() { + // Asynchronously update the appropriate JobMonitor + JobMonitor jobMonitor = clusterStatusMonitor + .getJobMonitor(TaskAssignmentCalculator.getQuotaType(workflowConfig, jobConfig)); + if (jobMonitor == null) { + return null; + } + + // Compute SubmissionToProcessDelay + long submissionToProcessDelay = currentTimestamp - jobConfig.getStat().getCreationTime(); + jobMonitor.updateSubmissionToProcessDelayGauge(submissionToProcessDelay); + return null; + } + }); + } + + /** + * TODO: Move this logic to Task Framework metrics class for refactoring. + * Computes and passes on submissionToScheduleDelay to the dynamic metric. 
+ * @param dataProvider + * @param clusterStatusMonitor + * @param workflowConfig + * @param jobConfig + * @param currentTimestamp + */ + private static void reportSubmissionToScheduleDelay(BaseControllerDataProvider dataProvider, + final ClusterStatusMonitor clusterStatusMonitor, final WorkflowConfig workflowConfig, + final JobConfig jobConfig, final long currentTimestamp) { + AbstractBaseStage.asyncExecute(dataProvider.getAsyncTasksThreadPool(), new Callable<Object>() { + @Override + public Object call() { + // Asynchronously update the appropriate JobMonitor + JobMonitor jobMonitor = clusterStatusMonitor + .getJobMonitor(TaskAssignmentCalculator.getQuotaType(workflowConfig, jobConfig)); + if (jobMonitor == null) { + return null; + } + + // Compute SubmissionToScheduleDelay + long submissionToStartDelay = currentTimestamp - jobConfig.getStat().getCreationTime(); + jobMonitor.updateSubmissionToScheduleDelayGauge(submissionToStartDelay); + return null; + } + }); + } + + /** + * TODO: Move this logic to Task Framework metrics class for refactoring. + * Computes and passes on controllerInducedDelay to the dynamic metric. 
+ * @param dataProvider + * @param clusterStatusMonitor + * @param workflowConfig + * @param jobConfig + * @param currentTimestamp + */ + private static void reportControllerInducedDelay(BaseControllerDataProvider dataProvider, + final ClusterStatusMonitor clusterStatusMonitor, final WorkflowConfig workflowConfig, + final JobConfig jobConfig, final long currentTimestamp) { + AbstractBaseStage.asyncExecute(dataProvider.getAsyncTasksThreadPool(), new Callable<Object>() { + @Override + public Object call() { + // Asynchronously update the appropriate JobMonitor + JobMonitor jobMonitor = clusterStatusMonitor + .getJobMonitor(TaskAssignmentCalculator.getQuotaType(workflowConfig, jobConfig)); + if (jobMonitor == null) { + return null; + } + + // Compute ControllerInducedDelay only if the workload is a test load + // NOTE: this metric cannot be computed for general user-submitted workloads because + // the actual runtime of the tasks vary, and there could exist multiple tasks per + // job + // NOTE: a test workload will have the "latency" field in the mapField of the + // JobConfig (taskConfig) + String firstTask = jobConfig.getTaskConfigMap().keySet().iterator().next(); + if (jobConfig.getTaskConfig(firstTask).getConfigMap().containsKey(TASK_LATENCY_TAG)) { + long taskDuration = + Long.valueOf(jobConfig.getTaskConfig(firstTask).getConfigMap().get(TASK_LATENCY_TAG)); + long controllerInducedDelay = + currentTimestamp - jobConfig.getStat().getCreationTime() - taskDuration; + jobMonitor.updateControllerInducedDelayGauge(controllerInducedDelay); + } + return null; + } + }); + } } diff --git a/helix-core/src/main/java/org/apache/helix/task/JobContext.java b/helix-core/src/main/java/org/apache/helix/task/JobContext.java index 2d878fb..c84f660 100644 --- a/helix-core/src/main/java/org/apache/helix/task/JobContext.java +++ b/helix-core/src/main/java/org/apache/helix/task/JobContext.java @@ -38,7 +38,7 @@ import com.google.common.collect.Sets; */ public class JobContext extends 
HelixProperty { private enum ContextProperties { - START_TIME, + START_TIME, // Time at which this JobContext was created STATE, NUM_ATTEMPTS, FINISH_TIME, @@ -47,7 +47,8 @@ public class JobContext extends HelixProperty { ASSIGNED_PARTICIPANT, NEXT_RETRY_TIME, INFO, - NAME + NAME, + EXECUTION_START_TIME, // Time at which the first task of this job got scheduled } public JobContext(ZNRecord record) { @@ -61,7 +62,7 @@ public class JobContext extends HelixProperty { public long getStartTime() { String tStr = _record.getSimpleField(ContextProperties.START_TIME.toString()); if (tStr == null) { - return WorkflowContext.UNSTARTED; + return WorkflowContext.NOT_STARTED; } return Long.parseLong(tStr); } @@ -141,11 +142,11 @@ public class JobContext extends HelixProperty { public long getPartitionStartTime(int p) { Map<String, String> map = getMapField(p); if (map == null) { - return WorkflowContext.UNSTARTED; + return WorkflowContext.NOT_STARTED; } String tStr = map.get(ContextProperties.START_TIME.toString()); if (tStr == null) { - return WorkflowContext.UNSTARTED; + return WorkflowContext.NOT_STARTED; } return Long.parseLong(tStr); } @@ -273,8 +274,30 @@ public class JobContext extends HelixProperty { } /** + * Only set the execution start time when it hasn't already been set. + * NOTE: This method is not thread-safe. However, it is okay because even if this does get written + * twice due to a race condition, that means the timestamps will be close enough to get a fairly + * good estimate for the execution start time. We do not want to affect the task status update + * performance and ultimately, this execution start time is an estimate in and of itself anyways. 
+ * @param t + */ + public void setExecutionStartTime(long t) { + String tStr = _record.getSimpleField(ContextProperties.EXECUTION_START_TIME.toString()); + if (tStr == null) { + _record.setSimpleField(ContextProperties.EXECUTION_START_TIME.toString(), String.valueOf(t)); + } + } + + public long getExecutionStartTime() { + String tStr = _record.getSimpleField(ContextProperties.EXECUTION_START_TIME.toString()); + if (tStr == null) { + return WorkflowContext.NOT_STARTED; + } + return Long.parseLong(tStr); + } + + /** * Get MapField for the given partition. - * * @param p * @return mapField for the partition, NULL if the partition has not scheduled yet. */ @@ -286,7 +309,7 @@ public class JobContext extends HelixProperty { String pStr = String.valueOf(p); Map<String, String> map = _record.getMapField(pStr); if (map == null && createIfNotPresent) { - map = new TreeMap<String, String>(); + map = new TreeMap<>(); _record.setMapField(pStr, map); } return map; diff --git a/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java b/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java index 6530f3a..56e04ac 100644 --- a/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java +++ b/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java @@ -29,8 +29,10 @@ import java.util.Set; import java.util.SortedSet; import java.util.TreeMap; import java.util.TreeSet; +import java.util.concurrent.Callable; import org.apache.helix.ZNRecord; import org.apache.helix.controller.dataproviders.WorkflowControllerDataProvider; +import org.apache.helix.controller.pipeline.AbstractBaseStage; import org.apache.helix.controller.stages.CurrentStateOutput; import org.apache.helix.model.Message; import org.apache.helix.model.Partition; @@ -54,7 +56,7 @@ public class JobDispatcher extends AbstractTaskDispatcher { public ResourceAssignment processJobStatusUpdateAndAssignment(String jobName, CurrentStateOutput currStateOutput, WorkflowContext workflowCtx) { // 
Fetch job configuration - JobConfig jobCfg = _dataProvider.getJobConfig(jobName); + final JobConfig jobCfg = _dataProvider.getJobConfig(jobName); if (jobCfg == null) { LOG.error("Job configuration is NULL for " + jobName); return buildEmptyAssignment(jobName, currStateOutput); @@ -62,7 +64,7 @@ public class JobDispatcher extends AbstractTaskDispatcher { String workflowResource = jobCfg.getWorkflow(); // Fetch workflow configuration and context - WorkflowConfig workflowCfg = _dataProvider.getWorkflowConfig(workflowResource); + final WorkflowConfig workflowCfg = _dataProvider.getWorkflowConfig(workflowResource); if (workflowCfg == null) { LOG.error("Workflow configuration is NULL for " + jobName); return buildEmptyAssignment(jobName, currStateOutput); @@ -113,13 +115,19 @@ public class JobDispatcher extends AbstractTaskDispatcher { JobContext jobCtx = _dataProvider.getJobContext(jobName); if (jobCtx == null) { jobCtx = new JobContext(new ZNRecord(TaskUtil.TASK_CONTEXT_KW)); - jobCtx.setStartTime(System.currentTimeMillis()); + final long currentTimestamp = System.currentTimeMillis(); + jobCtx.setStartTime(currentTimestamp); jobCtx.setName(jobName); // This job's JobContext has not been created yet. 
Since we are creating a new JobContext // here, we must also create its UserContentStore TaskUtil.createUserContent(_manager.getHelixPropertyStore(), jobName, new ZNRecord(TaskUtil.USER_CONTENT_NODE)); workflowCtx.setJobState(jobName, TaskState.IN_PROGRESS); + + // Since this job has been processed for the first time, we report SubmissionToProcessDelay + // here asynchronously + reportSubmissionToProcessDelay(_dataProvider, _clusterStatusMonitor, workflowCfg, jobCfg, + currentTimestamp); } if (!TaskState.TIMED_OUT.equals(workflowCtx.getJobState(jobName))) { diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowContext.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowContext.java index 0e91648..c915e3b 100644 --- a/helix-core/src/main/java/org/apache/helix/task/WorkflowContext.java +++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowContext.java @@ -50,7 +50,7 @@ public class WorkflowContext extends HelixProperty { NAME } - public static final int UNSTARTED = -1; + public static final int NOT_STARTED = -1; public static final int UNFINISHED = -1; public WorkflowContext(ZNRecord record) { diff --git a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestTaskPerformanceMetrics.java b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestTaskPerformanceMetrics.java new file mode 100644 index 0000000..2792488 --- /dev/null +++ b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestTaskPerformanceMetrics.java @@ -0,0 +1,151 @@ +package org.apache.helix.monitoring.mbeans; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import com.google.common.collect.ImmutableMap; +import java.lang.management.ManagementFactory; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import javax.management.MBeanAttributeInfo; +import javax.management.MBeanInfo; +import javax.management.MBeanServerConnection; +import javax.management.ObjectInstance; +import javax.management.ObjectName; +import javax.management.Query; +import javax.management.QueryExp; +import org.apache.helix.integration.manager.ClusterControllerManager; +import org.apache.helix.integration.task.MockTask; +import org.apache.helix.task.JobConfig; +import org.apache.helix.task.TaskConfig; +import org.apache.helix.task.TaskSynchronizedTestBase; +import org.apache.helix.task.Workflow; +import org.testng.Assert; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +/** + * Tests that performance profiling metrics via JobMonitorMBean are computed correctly. 
+ */ +public class TestTaskPerformanceMetrics extends TaskSynchronizedTestBase { + private static final long TASK_LATENCY = 100L; + // Platform MBean server from which the JobMonitor metric attributes are read + private static final MBeanServerConnection _server = ManagementFactory.getPlatformMBeanServer(); + private Map<String, Object> _beanValueMap = new HashMap<>(); + + @BeforeClass + public void beforeClass() throws Exception { + setSingleTestEnvironment(); + super.beforeClass(); + } + + /** + * Tests that the following metrics are dynamically emitted: + * SubmissionToProcessDelay, SubmissionToScheduleDelay + * ControllerInducedDelay + * The test schedules a workflow with 30 jobs, each with one task with TASK_LATENCY. + * AllowOverlapJobAssignment is false, so these jobs will be run in series, one at a time. + * With this setup, we can assume that the mean values of SubmissionToScheduleDelay and + * ControllerInducedDelay increase each time we poll at an interval longer than TASK_LATENCY. + * @throws Exception + */ + @Test + public void testTaskPerformanceMetrics() throws Exception { + // Create a workflow + JobConfig.Builder jobConfigBuilder = new JobConfig.Builder(); + TaskConfig.Builder taskConfigBuilder = new TaskConfig.Builder(); + List<TaskConfig> taskConfigs = new ArrayList<>(); + TaskConfig taskConfig = taskConfigBuilder.setTaskId("1").setCommand("Reindex").build(); + taskConfig.getConfigMap().put("Latency", Long.toString(TASK_LATENCY)); + taskConfigs.add(taskConfig); + jobConfigBuilder.addTaskConfigs(taskConfigs) + .setJobCommandConfigMap(ImmutableMap.of(MockTask.JOB_DELAY, Long.toString(TASK_LATENCY))); + Workflow.Builder workflowBuilder = new Workflow.Builder("wf"); + for (int i = 0; i < 30; i++) { + workflowBuilder.addJob("job_" + i, jobConfigBuilder); + } + Workflow workflow = workflowBuilder.build(); + + // Start the controller and start the workflow + _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME); + _controller.syncStart(); + _driver.start(workflow); + + // Confirm that there are metrics computed dynamically here and that they keep
increasing because jobs + // are processed one by one + double oldSubmissionToStartDelay = -1L; + double oldControllerInducedDelay = -1L; + for (int i = 0; i < 5; i++) { + // The dynamic metrics should generally be updated within 2 seconds or it would be too slow + Thread.sleep(2000L); + + extractMetrics(); + + // For SubmissionToProcessDelay, the value will stay constant because the Controller will + // create JobContext right away most of the time + Assert.assertTrue(_beanValueMap.containsKey("SubmissionToProcessDelayGauge.Mean")); + Assert.assertTrue(_beanValueMap.containsKey("SubmissionToScheduleDelayGauge.Mean")); + Assert.assertTrue(_beanValueMap.containsKey("ControllerInducedDelayGauge.Mean")); + + // Get the new values + double submissionToProcessDelay = + (double) _beanValueMap.get("SubmissionToProcessDelayGauge.Mean"); + double newSubmissionToScheduleDelay = + (double) _beanValueMap.get("SubmissionToScheduleDelayGauge.Mean"); + double newControllerInducedDelay = + (double) _beanValueMap.get("ControllerInducedDelayGauge.Mean"); + + Assert.assertTrue(submissionToProcessDelay > 0); + Assert.assertTrue(oldSubmissionToStartDelay < newSubmissionToScheduleDelay); + Assert.assertTrue(oldControllerInducedDelay < newControllerInducedDelay); + + oldSubmissionToStartDelay = newSubmissionToScheduleDelay; + oldControllerInducedDelay = newControllerInducedDelay; + } + } + + /** + * Queries for all MBeans from the MBean Server and only looks at the relevant MBean and gets its + * metric numbers. 
+ */ + private void extractMetrics() { + try { + QueryExp exp = Query.match(Query.attr("SensorName"), Query.value("*")); + Set<ObjectInstance> mbeans = new HashSet<>( + ManagementFactory.getPlatformMBeanServer().queryMBeans(new ObjectName(""), exp)); + for (ObjectInstance instance : mbeans) { + ObjectName beanName = instance.getObjectName(); + if (instance.getClassName().contains("JobMonitor")) { + MBeanInfo info = _server.getMBeanInfo(beanName); + MBeanAttributeInfo[] infos = info.getAttributes(); + for (MBeanAttributeInfo infoItem : infos) { + Object val = _server.getAttribute(beanName, infoItem.getName()); + _beanValueMap.put(infoItem.getName(), val); + } + } + } + } catch (Exception e) { + // update failed + } + } +}
