Add throttling to prevent too many workflows/jobs created.

ZK has an issue where a large number of nodes under one path will prevent 
getChildNames from returning successfully.
This change is a workaround to minimize the problem until the ZK server-side 
fix is ready.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/51170220
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/51170220
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/51170220

Branch: refs/heads/master
Commit: 51170220b6be851d0bdc4c3977f0420259c4a8e9
Parents: ac39b39
Author: Jiajun Wang <[email protected]>
Authored: Fri Nov 3 15:54:04 2017 -0700
Committer: Junkai Xue <[email protected]>
Committed: Wed Jan 24 18:30:25 2018 -0800

----------------------------------------------------------------------
 .../java/org/apache/helix/task/TaskDriver.java  | 48 +++++++----
 .../integration/task/WorkflowGenerator.java     | 16 ++++
 .../helix/task/TestTaskCreateThrottling.java    | 88 ++++++++++++++++++++
 3 files changed, 136 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/51170220/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java 
b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
index 732a717..99fa761 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
@@ -19,23 +19,8 @@ package org.apache.helix.task;
  * under the License.
  */
 
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
 import org.I0Itec.zkclient.DataUpdater;
-import org.apache.helix.AccessOption;
-import org.apache.helix.ConfigAccessor;
-import org.apache.helix.HelixAdmin;
-import org.apache.helix.HelixDataAccessor;
-import org.apache.helix.HelixException;
-import org.apache.helix.HelixManager;
-import org.apache.helix.PropertyKey;
-import org.apache.helix.PropertyPathBuilder;
-import org.apache.helix.ZNRecord;
+import org.apache.helix.*;
 import org.apache.helix.controller.rebalancer.util.RebalanceScheduler;
 import org.apache.helix.manager.zk.ZKHelixAdmin;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
@@ -49,6 +34,8 @@ import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.*;
+
 /**
  * CLI for scheduling/canceling workflows
  */
@@ -70,11 +57,23 @@ public class TaskDriver {
   /** Default time out for monitoring workflow or job state */
   private final static int _defaultTimeout = 3 * 60 * 1000; /* 3 mins */
 
+  // HELIX-619 This is a temporary solution for too many ZK nodes issue.
+  // Limit workflows/jobs creation to prevent the problem.
+  //
+  // Note this limitation should be smaller than ZK capacity. If current nodes 
count already exceeds
+  // the CAP, the verification method will not throw exception since the 
getChildNames() call will
+  // return empty list.
+  //
+  // TODO Implement or configure the limitation in ZK server.
+  private final static int DEFAULT_CONFIGS_LIMITATION = 10000;
+  protected int _configsLimitation = DEFAULT_CONFIGS_LIMITATION;
+
   private final HelixDataAccessor _accessor;
   private final HelixPropertyStore<ZNRecord> _propertyStore;
   private final HelixAdmin _admin;
   private final String _clusterName;
 
+
   public TaskDriver(HelixManager manager) {
     this(manager.getClusterManagmentTool(), manager.getHelixDataAccessor(),
         manager.getHelixPropertyStore(), manager.getClusterName());
@@ -114,6 +113,8 @@ public class TaskDriver {
     LOG.info("Starting workflow " + flow.getName());
     flow.validate();
 
+    validateZKNodeLimitation(flow.getJobConfigs().keySet().size() + 1);
+
     WorkflowConfig newWorkflowConfig =
         new 
WorkflowConfig.Builder(flow.getWorkflowConfig()).setWorkflowId(flow.getName()).build();
 
@@ -324,6 +325,8 @@ public class TaskDriver {
       }
     }
 
+    validateZKNodeLimitation(1);
+
     // Create the job to ensure that it validates
     JobConfig jobConfig = jobBuilder.setWorkflow(queue).build();
     final String namespacedJobName = TaskUtil.getNamespacedJobName(queue, job);
@@ -786,4 +789,17 @@ public class TaskDriver {
       throws InterruptedException {
     return pollForJobState(workflowName, jobName, _defaultTimeout, states);
   }
+
+  /**
+   * Throw Exception if children nodes will exceed limitation after adding 
newNodesCount children.
+   * @param newConfigNodeCount
+   */
+  private void validateZKNodeLimitation(int newConfigNodeCount) {
+    List<String> resourceConfigs =
+        _accessor.getChildNames(_accessor.keyBuilder().resourceConfigs());
+    if (resourceConfigs.size() + newConfigNodeCount > _configsLimitation) {
+      throw new HelixException(
+          "Cannot create more workflows or jobs because there are already too 
many items created in the path CONFIGS.");
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/51170220/helix-core/src/test/java/org/apache/helix/integration/task/WorkflowGenerator.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/test/java/org/apache/helix/integration/task/WorkflowGenerator.java
 
b/helix-core/src/test/java/org/apache/helix/integration/task/WorkflowGenerator.java
index 38797d3..5db0431 100644
--- 
a/helix-core/src/test/java/org/apache/helix/integration/task/WorkflowGenerator.java
+++ 
b/helix-core/src/test/java/org/apache/helix/integration/task/WorkflowGenerator.java
@@ -76,4 +76,20 @@ public class WorkflowGenerator {
 
     return builder;
   }
+
+  public static Workflow.Builder 
generateDefaultRepeatedJobWorkflowBuilder(String workflowName, int jobCount) {
+    Workflow.Builder builder = new Workflow.Builder(workflowName);
+    JobConfig.Builder jobBuilder = 
JobConfig.Builder.fromMap(DEFAULT_JOB_CONFIG);
+    jobBuilder.setJobCommandConfigMap(DEFAULT_COMMAND_CONFIG);
+
+    builder.addJob(JOB_NAME_1, jobBuilder);
+
+    for (int i = 0; i < jobCount - 1; i++) {
+      String jobName = JOB_NAME_2 + "-" + i;
+      builder.addParentChildDependency(JOB_NAME_1, jobName);
+      builder.addJob(jobName, jobBuilder);
+    }
+
+    return builder;
+  }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/51170220/helix-core/src/test/java/org/apache/helix/task/TestTaskCreateThrottling.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/test/java/org/apache/helix/task/TestTaskCreateThrottling.java 
b/helix-core/src/test/java/org/apache/helix/task/TestTaskCreateThrottling.java
new file mode 100644
index 0000000..3cb1605
--- /dev/null
+++ 
b/helix-core/src/test/java/org/apache/helix/task/TestTaskCreateThrottling.java
@@ -0,0 +1,88 @@
+package org.apache.helix.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.HelixException;
+import org.apache.helix.integration.task.MockTask;
+import org.apache.helix.integration.task.TaskTestBase;
+import org.apache.helix.integration.task.TaskTestUtil;
+import org.apache.helix.integration.task.WorkflowGenerator;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class TestTaskCreateThrottling extends TaskTestBase {
+  @BeforeClass
+  public void beforeClass() throws Exception {
+    setSingleTestEnvironment();
+    super.beforeClass();
+    _driver._configsLimitation = 10;
+  }
+
+  @Test
+  public void testTaskCreatingThrottle() {
+    Workflow flow = WorkflowGenerator
+        .generateDefaultRepeatedJobWorkflowBuilder("hugeWorkflow", 
_driver._configsLimitation + 1)
+        .build();
+    try {
+      _driver.start(flow);
+      Assert.fail("Creating a huge workflow contains more jobs than expected 
should fail.");
+    } catch (HelixException e) {
+      // expected
+    }
+  }
+
+  @Test(dependsOnMethods = { "testTaskCreatingThrottle" })
+  public void testEnqueueJobsThrottle() throws InterruptedException {
+    List<String> jobs = new ArrayList<>();
+    // Use a short name for testing
+    JobQueue.Builder builder = TaskTestUtil.buildJobQueue("Q");
+    builder.setCapacity(Integer.MAX_VALUE);
+    JobConfig.Builder jobBuilder =
+        new 
JobConfig.Builder().setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB)
+            .setCommand(MockTask.TASK_COMMAND).setMaxAttemptsPerTask(2)
+            
.setJobCommandConfigMap(WorkflowGenerator.DEFAULT_COMMAND_CONFIG).setExpiry(1L);
+    for (int i = 0; i < _driver._configsLimitation - 5; i++) {
+      builder.enqueueJob("J" + i, jobBuilder);
+      jobs.add("J" + i);
+    }
+    JobQueue jobQueue = builder.build();
+    // check if large number of jobs smaller than the threshold is OK.
+    _driver.start(jobQueue);
+    _driver.stop(jobQueue.getName());
+    try {
+      for (int i = 0; i < _driver._configsLimitation; i++) {
+        _driver.enqueueJob(jobQueue.getName(), "EJ" + i, jobBuilder);
+        jobs.add("EJ" + i);
+      }
+      Assert.fail("Enqueuing a huge number of jobs should fail.");
+    } catch (HelixException e) {
+      // expected
+    }
+
+    for (String job : jobs) {
+      _driver.deleteJob(jobQueue.getName(), job);
+    }
+    _driver.delete(jobQueue.getName());
+  }
+}

Reply via email to