Add throttling to prevent too many workflows/jobs from being created. ZooKeeper has an issue where a very large number of child nodes under a single path prevents getChildNames from returning successfully. This change is a workaround to minimize the problem until a fix is available on the ZooKeeper server side.
Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/51170220 Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/51170220 Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/51170220 Branch: refs/heads/master Commit: 51170220b6be851d0bdc4c3977f0420259c4a8e9 Parents: ac39b39 Author: Jiajun Wang <[email protected]> Authored: Fri Nov 3 15:54:04 2017 -0700 Committer: Junkai Xue <[email protected]> Committed: Wed Jan 24 18:30:25 2018 -0800 ---------------------------------------------------------------------- .../java/org/apache/helix/task/TaskDriver.java | 48 +++++++---- .../integration/task/WorkflowGenerator.java | 16 ++++ .../helix/task/TestTaskCreateThrottling.java | 88 ++++++++++++++++++++ 3 files changed, 136 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/51170220/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java index 732a717..99fa761 100644 --- a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java +++ b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java @@ -19,23 +19,8 @@ package org.apache.helix.task; * under the License. 
*/ -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; - import org.I0Itec.zkclient.DataUpdater; -import org.apache.helix.AccessOption; -import org.apache.helix.ConfigAccessor; -import org.apache.helix.HelixAdmin; -import org.apache.helix.HelixDataAccessor; -import org.apache.helix.HelixException; -import org.apache.helix.HelixManager; -import org.apache.helix.PropertyKey; -import org.apache.helix.PropertyPathBuilder; -import org.apache.helix.ZNRecord; +import org.apache.helix.*; import org.apache.helix.controller.rebalancer.util.RebalanceScheduler; import org.apache.helix.manager.zk.ZKHelixAdmin; import org.apache.helix.manager.zk.ZKHelixDataAccessor; @@ -49,6 +34,8 @@ import org.apache.helix.store.zk.ZkHelixPropertyStore; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.*; + /** * CLI for scheduling/canceling workflows */ @@ -70,11 +57,23 @@ public class TaskDriver { /** Default time out for monitoring workflow or job state */ private final static int _defaultTimeout = 3 * 60 * 1000; /* 3 mins */ + // HELIX-619 This is a temporary solution for too many ZK nodes issue. + // Limit workflows/jobs creation to prevent the problem. + // + // Note this limitation should be smaller than ZK capacity. If current nodes count already exceeds + // the CAP, the verification method will not throw exception since the getChildNames() call will + // return empty list. + // + // TODO Implement or configure the limitation in ZK server. 
+ private final static int DEFAULT_CONFIGS_LIMITATION = 10000; + protected int _configsLimitation = DEFAULT_CONFIGS_LIMITATION; + private final HelixDataAccessor _accessor; private final HelixPropertyStore<ZNRecord> _propertyStore; private final HelixAdmin _admin; private final String _clusterName; + public TaskDriver(HelixManager manager) { this(manager.getClusterManagmentTool(), manager.getHelixDataAccessor(), manager.getHelixPropertyStore(), manager.getClusterName()); @@ -114,6 +113,8 @@ public class TaskDriver { LOG.info("Starting workflow " + flow.getName()); flow.validate(); + validateZKNodeLimitation(flow.getJobConfigs().keySet().size() + 1); + WorkflowConfig newWorkflowConfig = new WorkflowConfig.Builder(flow.getWorkflowConfig()).setWorkflowId(flow.getName()).build(); @@ -324,6 +325,8 @@ public class TaskDriver { } } + validateZKNodeLimitation(1); + // Create the job to ensure that it validates JobConfig jobConfig = jobBuilder.setWorkflow(queue).build(); final String namespacedJobName = TaskUtil.getNamespacedJobName(queue, job); @@ -786,4 +789,17 @@ public class TaskDriver { throws InterruptedException { return pollForJobState(workflowName, jobName, _defaultTimeout, states); } + + /** + * Throw Exception if children nodes will exceed limitation after adding newNodesCount children. 
+ * @param newConfigNodeCount + */ + private void validateZKNodeLimitation(int newConfigNodeCount) { + List<String> resourceConfigs = + _accessor.getChildNames(_accessor.keyBuilder().resourceConfigs()); + if (resourceConfigs.size() + newConfigNodeCount > _configsLimitation) { + throw new HelixException( + "Cannot create more workflows or jobs because there are already too many items created in the path CONFIGS."); + } + } } http://git-wip-us.apache.org/repos/asf/helix/blob/51170220/helix-core/src/test/java/org/apache/helix/integration/task/WorkflowGenerator.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/WorkflowGenerator.java b/helix-core/src/test/java/org/apache/helix/integration/task/WorkflowGenerator.java index 38797d3..5db0431 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/task/WorkflowGenerator.java +++ b/helix-core/src/test/java/org/apache/helix/integration/task/WorkflowGenerator.java @@ -76,4 +76,20 @@ public class WorkflowGenerator { return builder; } + + public static Workflow.Builder generateDefaultRepeatedJobWorkflowBuilder(String workflowName, int jobCount) { + Workflow.Builder builder = new Workflow.Builder(workflowName); + JobConfig.Builder jobBuilder = JobConfig.Builder.fromMap(DEFAULT_JOB_CONFIG); + jobBuilder.setJobCommandConfigMap(DEFAULT_COMMAND_CONFIG); + + builder.addJob(JOB_NAME_1, jobBuilder); + + for (int i = 0; i < jobCount - 1; i++) { + String jobName = JOB_NAME_2 + "-" + i; + builder.addParentChildDependency(JOB_NAME_1, jobName); + builder.addJob(jobName, jobBuilder); + } + + return builder; + } } http://git-wip-us.apache.org/repos/asf/helix/blob/51170220/helix-core/src/test/java/org/apache/helix/task/TestTaskCreateThrottling.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/task/TestTaskCreateThrottling.java 
b/helix-core/src/test/java/org/apache/helix/task/TestTaskCreateThrottling.java new file mode 100644 index 0000000..3cb1605 --- /dev/null +++ b/helix-core/src/test/java/org/apache/helix/task/TestTaskCreateThrottling.java @@ -0,0 +1,88 @@ +package org.apache.helix.task; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +import org.apache.helix.HelixException; +import org.apache.helix.integration.task.MockTask; +import org.apache.helix.integration.task.TaskTestBase; +import org.apache.helix.integration.task.TaskTestUtil; +import org.apache.helix.integration.task.WorkflowGenerator; +import org.testng.Assert; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import java.util.ArrayList; +import java.util.List; + +public class TestTaskCreateThrottling extends TaskTestBase { + @BeforeClass + public void beforeClass() throws Exception { + setSingleTestEnvironment(); + super.beforeClass(); + _driver._configsLimitation = 10; + } + + @Test + public void testTaskCreatingThrottle() { + Workflow flow = WorkflowGenerator + .generateDefaultRepeatedJobWorkflowBuilder("hugeWorkflow", _driver._configsLimitation + 1) + .build(); + try { + _driver.start(flow); + Assert.fail("Creating a huge workflow contains more jobs than expected should fail."); + } catch (HelixException e) { + // expected + } + } + + @Test(dependsOnMethods = { "testTaskCreatingThrottle" }) + public void testEnqueueJobsThrottle() throws InterruptedException { + List<String> jobs = new ArrayList<>(); + // Use a short name for testing + JobQueue.Builder builder = TaskTestUtil.buildJobQueue("Q"); + builder.setCapacity(Integer.MAX_VALUE); + JobConfig.Builder jobBuilder = + new JobConfig.Builder().setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB) + .setCommand(MockTask.TASK_COMMAND).setMaxAttemptsPerTask(2) + .setJobCommandConfigMap(WorkflowGenerator.DEFAULT_COMMAND_CONFIG).setExpiry(1L); + for (int i = 0; i < _driver._configsLimitation - 5; i++) { + builder.enqueueJob("J" + i, jobBuilder); + jobs.add("J" + i); + } + JobQueue jobQueue = builder.build(); + // check if large number of jobs smaller than the threshold is OK. 
+ _driver.start(jobQueue); + _driver.stop(jobQueue.getName()); + try { + for (int i = 0; i < _driver._configsLimitation; i++) { + _driver.enqueueJob(jobQueue.getName(), "EJ" + i, jobBuilder); + jobs.add("EJ" + i); + } + Assert.fail("Enqueuing a huge number of jobs should fail."); + } catch (HelixException e) { + // expected + } + + for (String job : jobs) { + _driver.deleteJob(jobQueue.getName(), job); + } + _driver.delete(jobQueue.getName()); + } +}
