This is an automated email from the ASF dual-hosted git repository.

jxue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/master by this push:
     new e26b2dc  Move task framework related update to async operation
e26b2dc is described below

commit e26b2dcc1c0722c3368e6f07d13bd539bd8cf6dc
Author: Junkai Xue <[email protected]>
AuthorDate: Mon Feb 25 17:11:33 2019 -0800

    Move task framework related update to async operation
    
    Move task framework related update to async operation
---
 .../helix/controller/GenericHelixController.java   |  5 -----
 .../controller/stages/ReadClusterDataStage.java    | 14 +++++++++++--
 .../monitoring/mbeans/ClusterStatusMonitor.java    | 24 +++++++++++++---------
 3 files changed, 26 insertions(+), 17 deletions(-)

diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
 
b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
index bd049f8..855a4c4 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
@@ -572,11 +572,6 @@ public class GenericHelixController implements 
IdealStateChangeListener,
               _lastPipelineEndTimestamp
               - startTime) + " ms"));
       logger.info(sb.toString());
-    } else if (_isMonitoring) {
-      // report workflow status
-      TaskDriver driver = new TaskDriver(manager);
-      _clusterStatusMonitor.refreshWorkflowsStatus(driver);
-      _clusterStatusMonitor.refreshJobsStatus(driver);
     }
 
     // If event handling happens before controller deactivate, the process may 
write unnecessary
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/stages/ReadClusterDataStage.java
 
b/helix-core/src/main/java/org/apache/helix/controller/stages/ReadClusterDataStage.java
index 2a9e4c3..db1609c 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/stages/ReadClusterDataStage.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/stages/ReadClusterDataStage.java
@@ -61,9 +61,9 @@ public class ReadClusterDataStage extends AbstractBaseStage {
     HelixDataAccessor dataAccessor = manager.getHelixDataAccessor();
     _cache.refresh(dataAccessor);
     final ClusterConfig clusterConfig = cache.getClusterConfig();
+    final ClusterStatusMonitor clusterStatusMonitor =
+        event.getAttribute(AttributeName.clusterStatusMonitor.name());
     if (!_cache.isTaskCache()) {
-      final ClusterStatusMonitor clusterStatusMonitor =
-          event.getAttribute(AttributeName.clusterStatusMonitor.name());
       asyncExecute(_cache.getAsyncTasksThreadPool(), new Callable<Object>() {
         @Override public Object call() {
           // Update the cluster status gauges
@@ -104,6 +104,16 @@ public class ReadClusterDataStage extends 
AbstractBaseStage {
           return null;
         }
       });
+    } else {
+      asyncExecute(_cache.getAsyncTasksThreadPool(), new Callable<Object>() {
+        @Override
+        public Object call() {
+          clusterStatusMonitor.refreshWorkflowsStatus(_cache);
+          clusterStatusMonitor.refreshJobsStatus(_cache);
+          LogUtil.logDebug(logger, _eventId, "Workflow/Job gauge status 
successfully refreshed");
+          return null;
+        }
+      });
     }
     event.addAttribute(AttributeName.ClusterDataCache.name(), _cache);
   }
diff --git 
a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
 
b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
index 1455032..e0b7b28 100644
--- 
a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
+++ 
b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
@@ -40,6 +40,7 @@ import com.google.common.base.Joiner;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import org.apache.helix.controller.stages.BestPossibleStateOutput;
+import org.apache.helix.controller.stages.ClusterDataCache;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.InstanceConfig;
@@ -550,19 +551,20 @@ public class ClusterStatusMonitor implements 
ClusterStatusMonitorMBean {
     }
   }
 
-  public void refreshWorkflowsStatus(TaskDriver driver) {
+  public void refreshWorkflowsStatus(ClusterDataCache cache) {
     for (Map.Entry<String, WorkflowMonitor> workflowMonitor : 
_perTypeWorkflowMonitorMap
         .entrySet()) {
       workflowMonitor.getValue().resetGauges();
     }
 
-    Map<String, WorkflowConfig> workflowConfigMap = driver.getWorkflows();
+    Map<String, WorkflowConfig> workflowConfigMap = 
cache.getWorkflowConfigMap();
     for (String workflow : workflowConfigMap.keySet()) {
       if (workflowConfigMap.get(workflow).isRecurring() || workflow.isEmpty()) 
{
         continue;
       }
-      WorkflowContext workflowContext = driver.getWorkflowContext(workflow);
-      TaskState currentState = workflowContext == null ? TaskState.NOT_STARTED 
: workflowContext.getWorkflowState();
+      WorkflowContext workflowContext = cache.getWorkflowContext(workflow);
+      TaskState currentState =
+          workflowContext == null ? TaskState.NOT_STARTED : 
workflowContext.getWorkflowState();
       updateWorkflowGauges(workflowConfigMap.get(workflow), currentState);
     }
   }
@@ -607,23 +609,25 @@ public class ClusterStatusMonitor implements 
ClusterStatusMonitorMBean {
     return workflowType;
   }
 
-  public void refreshJobsStatus(TaskDriver driver) {
+  public void refreshJobsStatus(ClusterDataCache cache) {
     for (Map.Entry<String, JobMonitor> jobMonitor : 
_perTypeJobMonitorMap.entrySet()) {
       jobMonitor.getValue().resetJobGauge();
     }
-    for (String workflow : driver.getWorkflows().keySet()) {
+    for (String workflow : cache.getWorkflowConfigMap().keySet()) {
       if (workflow.isEmpty()) {
         continue;
       }
-      WorkflowConfig workflowConfig = driver.getWorkflowConfig(workflow);
+      WorkflowConfig workflowConfig = cache.getWorkflowConfig(workflow);
       if (workflowConfig == null) {
         continue;
       }
       Set<String> allJobs = workflowConfig.getJobDag().getAllNodes();
-      WorkflowContext workflowContext = driver.getWorkflowContext(workflow);
+      WorkflowContext workflowContext = cache.getWorkflowContext(workflow);
       for (String job : allJobs) {
-        TaskState currentState = workflowContext == null ? 
TaskState.NOT_STARTED : workflowContext.getJobState(job);
-        updateJobGauges(workflowConfig.getJobTypes() == null ? null : 
workflowConfig.getJobTypes().get(job),
+        TaskState currentState =
+            workflowContext == null ? TaskState.NOT_STARTED : 
workflowContext.getJobState(job);
+        updateJobGauges(
+            workflowConfig.getJobTypes() == null ? null : 
workflowConfig.getJobTypes().get(job),
             currentState);
       }
     }

Reply via email to