This is an automated email from the ASF dual-hosted git repository.

alizamus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/master by this push:
     new ec214b2  Fix ondemand rebalance flooding and log flooding caused by dangling jobs (#1508)
ec214b2 is described below

commit ec214b2e809c6895382019243007e0167680502b
Author: Neal Sun <[email protected]>
AuthorDate: Wed Nov 4 09:44:08 2020 -0800

    Fix ondemand rebalance flooding and log flooding caused by dangling jobs (#1508)
    
    This commit changes runtime dag refresh logic to eliminate ondemand rebalance
    flooding caused by dangling jobs. This commit also changes log level to get rid of
    log flooding caused by missing target resources.
---
 .../apache/helix/common/caches/TaskDataCache.java  | 33 +----------
 .../stages/ResourceComputationStage.java           |  3 +-
 .../TestWorkflowControllerDataProvider.java        | 66 ++++++++++++++++++++++
 3 files changed, 70 insertions(+), 32 deletions(-)
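
The core of the fix is visible in the TaskDataCache hunk below: instead of scanning every JobConfig for DAG consistency, the cache now rebuilds a RuntimeJobDag only when the workflow config's ZNode version has moved. A minimal sketch of that guard, using the field names from TaskDataCache; the dag-side getVersion() is assumed for illustration, since the hunk ends mid-statement:

    // Rebuild a RuntimeJobDag only when its workflow config ZNode version changed.
    // _workflowConfigMap and _runtimeJobDagMap are the TaskDataCache fields;
    // getVersion() on the dag side is a hypothetical stand-in here.
    for (String workflowName : _workflowConfigMap.keySet()) {
      if (_runtimeJobDagMap.containsKey(workflowName)
          && _workflowConfigMap.get(workflowName).getRecord().getVersion()
              != _runtimeJobDagMap.get(workflowName).getVersion()) {
        workflowsUpdated.add(workflowName);
      }
    }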

diff --git a/helix-core/src/main/java/org/apache/helix/common/caches/TaskDataCache.java b/helix-core/src/main/java/org/apache/helix/common/caches/TaskDataCache.java
index f6f6a72..d400e78 100644
--- a/helix-core/src/main/java/org/apache/helix/common/caches/TaskDataCache.java
+++ b/helix-core/src/main/java/org/apache/helix/common/caches/TaskDataCache.java
@@ -116,36 +116,7 @@ public class TaskDataCache extends AbstractDataCache {
       }
     }
 
-    // The following 3 blocks is for finding a list of workflows whose JobDAGs have been changed
-    // because their RuntimeJobDags would need to be re-built
-    // newly added jobs
-    for (String jobName : newJobConfigs.keySet()) {
-      if (!_jobConfigMap.containsKey(jobName) && newJobConfigs.get(jobName).getWorkflow() != null) {
-        workflowsUpdated.add(newJobConfigs.get(jobName).getWorkflow());
-      }
-
-      // Only for JobQueues when a new job is enqueued, there exists a race condition where only
-      // JobConfig is updated and the RuntimeJobDag does not get updated because when the client
-      // (TaskDriver) submits, it creates JobConfig ZNode first and modifies its parent JobDag next.
-      // To ensure that they are both properly updated, check that workflow's DAG and existing
-      // JobConfigs are consistent for JobQueues
-      JobConfig jobConfig = newJobConfigs.get(jobName);
-      if (_workflowConfigMap.containsKey(jobConfig.getWorkflow())) {
-        WorkflowConfig workflowConfig = _workflowConfigMap.get(jobConfig.getWorkflow());
-        // Check that the job's parent workflow's DAG contains this job
-        if ((workflowConfig.isJobQueue() || !workflowConfig.isTerminable()) && !_runtimeJobDagMap
-            .get(workflowConfig.getWorkflowId()).getAllNodes().contains(jobName)) {
-          // Inconsistency between JobConfigs and DAGs found. Add the workflow to workflowsUpdated
-          // to rebuild the RuntimeJobDag
-          workflowsUpdated.add(jobConfig.getWorkflow());
-        }
-      }
-    }
-
-    // Removed jobs
-    // This block makes sure that the workflow config has been changed.
-    // This avoid the race condition where job config has been purged but job has not been deleted
-    // from JobDag yet
+    // If the workflow config has been updated, it's possible that the dag has been changed.
     for (String workflowName : _workflowConfigMap.keySet()) {
       if (_runtimeJobDagMap.containsKey(workflowName)) {
         if (_workflowConfigMap.get(workflowName).getRecord().getVersion() != _runtimeJobDagMap
@@ -189,7 +160,7 @@ public class TaskDataCache extends AbstractDataCache {
     for (String resourceName : childNames) {
       contextPaths.add(getTaskDataPath(resourceName, TaskDataType.CONTEXT));
     }
-    
+
     List<ZNRecord> contexts = accessor.getBaseDataAccessor().get(contextPaths, null, 0, true);
 
     for (int i = 0; i < contexts.size(); i++) {
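
The deleted comments above describe the underlying race: TaskDriver writes the JobConfig ZNode first and updates the parent JobDag second, so a client failure in between leaves a dangling job, i.e. a config with no DAG node. A hedged sketch of how that half-enqueued state can be reproduced, mirroring the new test further down; accessor, keyBuilder, queueName, jobBuilder, and the "DanglingJob" name are illustrative context, not part of this commit:

    // Create only the JobConfig ZNode, never touching the workflow's JobDag --
    // the half-completed enqueue that used to trigger ondemand rebalance flooding.
    String danglingJob = TaskUtil.getNamespacedJobName(queueName, "DanglingJob");
    JobConfig danglingConfig = new JobConfig(danglingJob, jobBuilder.build());
    accessor.getBaseDataAccessor().create(
        keyBuilder.resourceConfig(danglingJob).getPath(),
        danglingConfig.getRecord(), AccessOption.PERSISTENT);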
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java
index 284479c..7c2cac6 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java
@@ -164,7 +164,8 @@ public class ResourceComputationStage extends AbstractBaseStage {
       if (numPartitions == 0 && idealStates != null) {
         IdealState targetIs = idealStates.get(jobConfig.getTargetResource());
         if (targetIs == null) {
-          LOG.warn("Target resource " + jobConfig.getTargetResource() + " does not exist for job " + resourceName);
+          LOG.debug("Target resource " + jobConfig.getTargetResource() + " does not exist for job "
+              + resourceName);
         } else {
           numPartitions = targetIs.getPartitionSet().size();
         }
diff --git a/helix-core/src/test/java/org/apache/helix/controller/dataproviders/TestWorkflowControllerDataProvider.java b/helix-core/src/test/java/org/apache/helix/controller/dataproviders/TestWorkflowControllerDataProvider.java
index 312a1b3..d8f9c1d 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/dataproviders/TestWorkflowControllerDataProvider.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/dataproviders/TestWorkflowControllerDataProvider.java
@@ -19,10 +19,20 @@ package org.apache.helix.controller.dataproviders;
  * under the License.
  */
 
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+
+import org.apache.helix.AccessOption;
+import org.apache.helix.PropertyKey;
 import org.apache.helix.TestHelper;
 import org.apache.helix.integration.task.TaskTestBase;
+import org.apache.helix.integration.task.TaskTestUtil;
 import org.apache.helix.integration.task.WorkflowGenerator;
 import org.apache.helix.task.JobConfig;
+import org.apache.helix.task.JobQueue;
+import org.apache.helix.task.RuntimeJobDag;
+import org.apache.helix.task.TaskUtil;
 import org.apache.helix.task.Workflow;
 import org.testng.Assert;
 import org.testng.annotations.Test;
@@ -70,4 +80,60 @@ public class TestWorkflowControllerDataProvider extends TaskTestBase {
     Assert.assertTrue(expectedValuesAchieved);
 
   }
+
+  @Test (dependsOnMethods = "testResourceConfigRefresh")
+  public void testRuntimeDagRefresh() throws Exception {
+    String jobQueueName = TestHelper.getTestMethodName();
+    JobQueue.Builder builder = TaskTestUtil.buildJobQueue(jobQueueName);
+    JobConfig.Builder jobBuilder = JobConfig.Builder.fromMap(WorkflowGenerator.DEFAULT_JOB_CONFIG);
+    builder.enqueueJob(WorkflowGenerator.JOB_NAME_1, jobBuilder);
+    String jobName1 = TaskUtil.getNamespacedJobName(jobQueueName, WorkflowGenerator.JOB_NAME_1);
+    _driver.start(builder.build());
+
+    WorkflowControllerDataProvider cache =
+        new WorkflowControllerDataProvider("CLUSTER_" + TestHelper.getTestClassName());
+
+    Assert.assertTrue(TestHelper.verify(() -> {
+      cache.requireFullRefresh();
+      cache.refresh(_manager.getHelixDataAccessor());
+      return cache.getTaskDataCache().getJobConfig(jobName1) != null;
+    }, TestHelper.WAIT_DURATION));
+    RuntimeJobDag runtimeJobDag = cache.getTaskDataCache().getRuntimeJobDag(jobQueueName);
+    Assert.assertEquals(Collections.singleton(jobName1), runtimeJobDag.getAllNodes());
+
+    // Mimic job running
+    runtimeJobDag.getNextJob();
+
+    // Add job config without adding it to the dag
+    String danglingJobName = TaskUtil.getNamespacedJobName(jobQueueName, "DanglingJob");
+    JobConfig danglingJobConfig = new JobConfig(danglingJobName, jobBuilder.build());
+    PropertyKey.Builder keyBuilder = _manager.getHelixDataAccessor().keyBuilder();
+    _baseAccessor
+        .create(keyBuilder.resourceConfig(danglingJobName).getPath(), danglingJobConfig.getRecord(),
+            AccessOption.PERSISTENT);
+
+    // There shouldn't be a refresh to runtime dag. The dag should only contain one job and the job is inflight.
+    Assert.assertTrue(TestHelper.verify(() -> {
+      cache.requireFullRefresh();
+      cache.refresh(_manager.getHelixDataAccessor());
+      return cache.getTaskDataCache().getJobConfig(danglingJobName) != null;
+    }, TestHelper.WAIT_DURATION));
+    runtimeJobDag = cache.getTaskDataCache().getRuntimeJobDag(jobQueueName);
+    Assert.assertEquals(Collections.singleton(jobName1), runtimeJobDag.getAllNodes());
+    Assert.assertEquals(Collections.singleton(jobName1), runtimeJobDag.getInflightJobList());
+
+    _driver.enqueueJob(jobQueueName, WorkflowGenerator.JOB_NAME_2, jobBuilder);
+    String jobName2 = TaskUtil.getNamespacedJobName(jobQueueName, WorkflowGenerator.JOB_NAME_2);
+
+    // There should be a refresh to runtime dag.
+    Assert.assertTrue(TestHelper.verify(() -> {
+      cache.requireFullRefresh();
+      cache.refresh(_manager.getHelixDataAccessor());
+      return cache.getTaskDataCache().getJobConfig(jobName2) != null;
+    }, TestHelper.WAIT_DURATION));
+    runtimeJobDag = cache.getTaskDataCache().getRuntimeJobDag(jobQueueName);
+    Assert.assertEquals(new HashSet<>(Arrays.asList(jobName1, jobName2)),
+        runtimeJobDag.getAllNodes());
+    Assert.assertEquals(Collections.emptyList(), runtimeJobDag.getInflightJobList());
+  }
 }
