Add JobTypes in WorkflowConfig and improve monitor performance
Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/c8ec6246 Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/c8ec6246 Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/c8ec6246 Branch: refs/heads/master Commit: c8ec6246ffdbd377e2ec1c58404ed1bbffb076d1 Parents: ad760ae Author: Junkai Xue <[email protected]> Authored: Mon Sep 25 14:39:04 2017 -0700 Committer: Junkai Xue <[email protected]> Committed: Mon Sep 25 14:39:04 2017 -0700 ---------------------------------------------------------------------- .../monitoring/mbeans/ClusterStatusMonitor.java | 15 ++++----- .../java/org/apache/helix/task/TaskDriver.java | 35 ++++++++++++++++---- 2 files changed, 34 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/c8ec6246/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java index 468a0ce..bae3a60 100644 --- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java +++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java @@ -483,12 +483,13 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean { if (workflow.isEmpty()) { continue; } - Set<String> allJobs = driver.getWorkflowConfig(workflow).getJobDag().getAllNodes(); + WorkflowConfig workflowConfig = driver.getWorkflowConfig(workflow); + Set<String> allJobs = workflowConfig.getJobDag().getAllNodes(); WorkflowContext workflowContext = driver.getWorkflowContext(workflow); for (String job : allJobs) { - TaskState currentState = - workflowContext == null ? TaskState.NOT_STARTED : workflowContext.getJobState(job); - updateJobGauges(driver.getJobConfig(job), currentState); + TaskState currentState = workflowContext == null ? TaskState.NOT_STARTED : workflowContext.getJobState(job); + updateJobGauges(workflowConfig.getJobTypes() == null ? null : workflowConfig.getJobTypes().get(job), + currentState); } } } @@ -499,13 +500,9 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean { _perTypeJobMonitorMap.get(jobType).updateJobCounters(to); } - private void updateJobGauges(JobConfig jobConfig, TaskState current) { + private void updateJobGauges(String jobType, TaskState current) { // When first time for WorkflowRebalancer call, jobconfig may not ready. // Thus only check it for gauge. - if (jobConfig == null) { - return; - } - String jobType = jobConfig.getJobType(); jobType = preProcessJobMonitor(jobType); _perTypeJobMonitorMap.get(jobType).updateJobGauge(current); } http://git-wip-us.apache.org/repos/asf/helix/blob/c8ec6246/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java index d3dba5b..a639cd0 100644 --- a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java +++ b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java @@ -202,19 +202,28 @@ public class TaskDriver { LOG.info("Starting workflow " + flow.getName()); flow.validate(); - // first, add workflow config. - if (!TaskUtil.setResourceConfig(_accessor, flow.getName(), - new WorkflowConfig(flow.getWorkflowConfig(), flow.getName()))) { - LOG.error("Failed to add workflow configuration for workflow " + flow.getName()); - } + WorkflowConfig newWorkflowConfig = + new WorkflowConfig.Builder().setConfigMap(flow.getResourceConfigMap()) + .setWorkflowId(flow.getName()).build(); - // then add all job configs. + Map<String, String> jobTypes = new HashMap<String, String>(); + // add all job configs. for (String job : flow.getJobConfigs().keySet()) { JobConfig.Builder jobCfgBuilder = JobConfig.Builder.fromMap(flow.getJobConfigs().get(job)); if (flow.getTaskConfigs() != null && flow.getTaskConfigs().containsKey(job)) { jobCfgBuilder.addTaskConfigs(flow.getTaskConfigs().get(job)); } - addJobConfig(job, jobCfgBuilder.build()); + JobConfig jobCfg = jobCfgBuilder.build(); + if (jobCfg.getJobType() != null) { + jobTypes.put(job, jobCfg.getJobType()); + } + addJobConfig(job, jobCfg); + } + newWorkflowConfig.setJobTypes(jobTypes); + + // add workflow config. + if (!TaskUtil.setResourceConfig(_accessor, flow.getName(), newWorkflowConfig)) { + LOG.error("Failed to add workflow configuration for workflow " + flow.getName()); } // Finally add workflow resource. @@ -549,6 +558,7 @@ public class TaskDriver { // add job config first. addJobConfig(namespacedJobName, jobConfig); + final String jobType = jobConfig.getJobType(); // Add the job to the end of the queue in the DAG DataUpdater<ZNRecord> updater = new DataUpdater<ZNRecord>() { @@ -580,6 +590,17 @@ public class TaskDriver { jobDag.addParentToChild(candidate, namespacedJobName); } + // Add job type if job type is not null + if (jobType != null) { + Map<String, String> jobTypes = + currentData.getMapField(WorkflowConfig.WorkflowConfigProperty.JobTypes.name()); + if (jobTypes == null) { + jobTypes = new HashMap<String, String>(); + } + jobTypes.put(jobName, jobType); + currentData.setMapField(WorkflowConfig.WorkflowConfigProperty.JobTypes.name(), jobTypes); + } + // Save the updated DAG try { currentData
