This is an automated email from the ASF dual-hosted git repository.
jxue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/master by this push:
new e26b2dc Move task framework related update to async operation
e26b2dc is described below
commit e26b2dcc1c0722c3368e6f07d13bd539bd8cf6dc
Author: Junkai Xue <[email protected]>
AuthorDate: Mon Feb 25 17:11:33 2019 -0800
Move task framework related update to async operation
Move task framework related update to async operation
---
.../helix/controller/GenericHelixController.java | 5 -----
.../controller/stages/ReadClusterDataStage.java | 14 +++++++++++--
.../monitoring/mbeans/ClusterStatusMonitor.java | 24 +++++++++++++---------
3 files changed, 26 insertions(+), 17 deletions(-)
diff --git a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
index bd049f8..855a4c4 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
@@ -572,11 +572,6 @@ public class GenericHelixController implements IdealStateChangeListener,
_lastPipelineEndTimestamp - startTime) + " ms"));
logger.info(sb.toString());
- } else if (_isMonitoring) {
- // report workflow status
- TaskDriver driver = new TaskDriver(manager);
- _clusterStatusMonitor.refreshWorkflowsStatus(driver);
- _clusterStatusMonitor.refreshJobsStatus(driver);
}
// If event handling happens before controller deactivate, the process may write unnecessary
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ReadClusterDataStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ReadClusterDataStage.java
index 2a9e4c3..db1609c 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ReadClusterDataStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ReadClusterDataStage.java
@@ -61,9 +61,9 @@ public class ReadClusterDataStage extends AbstractBaseStage {
HelixDataAccessor dataAccessor = manager.getHelixDataAccessor();
_cache.refresh(dataAccessor);
final ClusterConfig clusterConfig = cache.getClusterConfig();
+ final ClusterStatusMonitor clusterStatusMonitor =
+ event.getAttribute(AttributeName.clusterStatusMonitor.name());
if (!_cache.isTaskCache()) {
- final ClusterStatusMonitor clusterStatusMonitor =
- event.getAttribute(AttributeName.clusterStatusMonitor.name());
asyncExecute(_cache.getAsyncTasksThreadPool(), new Callable<Object>() {
@Override public Object call() {
// Update the cluster status gauges
@@ -104,6 +104,16 @@ public class ReadClusterDataStage extends AbstractBaseStage {
return null;
}
});
+ } else {
+ asyncExecute(_cache.getAsyncTasksThreadPool(), new Callable<Object>() {
+ @Override
+ public Object call() {
+ clusterStatusMonitor.refreshWorkflowsStatus(_cache);
+ clusterStatusMonitor.refreshJobsStatus(_cache);
+ LogUtil.logDebug(logger, _eventId, "Workflow/Job gauge status successfully refreshed");
+ return null;
+ }
+ });
}
event.addAttribute(AttributeName.ClusterDataCache.name(), _cache);
}
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
index 1455032..e0b7b28 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
@@ -40,6 +40,7 @@ import com.google.common.base.Joiner;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.helix.controller.stages.BestPossibleStateOutput;
+import org.apache.helix.controller.stages.ClusterDataCache;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.InstanceConfig;
@@ -550,19 +551,20 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
}
}
- public void refreshWorkflowsStatus(TaskDriver driver) {
+ public void refreshWorkflowsStatus(ClusterDataCache cache) {
for (Map.Entry<String, WorkflowMonitor> workflowMonitor :
_perTypeWorkflowMonitorMap
.entrySet()) {
workflowMonitor.getValue().resetGauges();
}
- Map<String, WorkflowConfig> workflowConfigMap = driver.getWorkflows();
+ Map<String, WorkflowConfig> workflowConfigMap = cache.getWorkflowConfigMap();
for (String workflow : workflowConfigMap.keySet()) {
if (workflowConfigMap.get(workflow).isRecurring() || workflow.isEmpty()) {
continue;
}
- WorkflowContext workflowContext = driver.getWorkflowContext(workflow);
- TaskState currentState = workflowContext == null ? TaskState.NOT_STARTED : workflowContext.getWorkflowState();
+ WorkflowContext workflowContext = cache.getWorkflowContext(workflow);
+ TaskState currentState =
+ workflowContext == null ? TaskState.NOT_STARTED : workflowContext.getWorkflowState();
updateWorkflowGauges(workflowConfigMap.get(workflow), currentState);
}
}
@@ -607,23 +609,25 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
return workflowType;
}
- public void refreshJobsStatus(TaskDriver driver) {
+ public void refreshJobsStatus(ClusterDataCache cache) {
for (Map.Entry<String, JobMonitor> jobMonitor : _perTypeJobMonitorMap.entrySet()) {
jobMonitor.getValue().resetJobGauge();
}
- for (String workflow : driver.getWorkflows().keySet()) {
+ for (String workflow : cache.getWorkflowConfigMap().keySet()) {
if (workflow.isEmpty()) {
continue;
}
- WorkflowConfig workflowConfig = driver.getWorkflowConfig(workflow);
+ WorkflowConfig workflowConfig = cache.getWorkflowConfig(workflow);
if (workflowConfig == null) {
continue;
}
Set<String> allJobs = workflowConfig.getJobDag().getAllNodes();
- WorkflowContext workflowContext = driver.getWorkflowContext(workflow);
+ WorkflowContext workflowContext = cache.getWorkflowContext(workflow);
for (String job : allJobs) {
- TaskState currentState = workflowContext == null ? TaskState.NOT_STARTED : workflowContext.getJobState(job);
- updateJobGauges(workflowConfig.getJobTypes() == null ? null : workflowConfig.getJobTypes().get(job),
+ TaskState currentState =
+ workflowContext == null ? TaskState.NOT_STARTED : workflowContext.getJobState(job);
+ updateJobGauges(
+ workflowConfig.getJobTypes() == null ? null : workflowConfig.getJobTypes().get(job),
currentState);
}
}