This is an automated email from the ASF dual-hosted git repository.
hulee pushed a commit to branch task_pool
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/task_pool by this push:
new 4841bec Add CRUD endpoints to TaskDriver for configurable thread pool
size support (#1011)
4841bec is described below
commit 4841bec6ae350d5098317ca6d3ecc1b57e61e4a8
Author: Neal Sun <[email protected]>
AuthorDate: Mon May 18 11:33:58 2020 -0700
Add CRUD endpoints to TaskDriver for configurable thread pool size support
(#1011)
We are adding CRUD endpoints to TaskDriver that support "setting target
thread pool sizes", "getting target thread pool sizes", "setting global target
thread pool sizes", "getting global target thread pool sizes", and "getting
current thread pool sizes".
---
.../java/org/apache/helix/task/TaskDriver.java | 87 +++++++++++++++++
.../java/org/apache/helix/task/TestTaskDriver.java | 108 +++++++++++++++++++++
2 files changed, 195 insertions(+)
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
index 506a06b..5f26f73 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
@@ -41,7 +41,10 @@ import org.apache.helix.SystemPropertyKeys;
import org.apache.helix.manager.zk.ZKHelixAdmin;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.IdealState;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.LiveInstance;
import org.apache.helix.model.ResourceConfig;
import org.apache.helix.model.builder.CustomModeISBuilder;
import org.apache.helix.store.HelixPropertyStore;
@@ -1137,4 +1140,88 @@ public class TaskDriver {
"Cannot create more workflows or jobs because there are already too
many items created in the path CONFIGS.");
}
}
+
+ /**
+ * Get the target task thread pool size of an instance, a value that's used
to construct the task
+ * thread pool and is created by users.
+ * @param instanceName - name of the instance
+ * @return the target task thread pool size of the instance
+ */
+ public int getTargetTaskThreadPoolSize(String instanceName) {
+ InstanceConfig instanceConfig = getInstanceConfig(instanceName);
+ return instanceConfig.getTargetTaskThreadPoolSize();
+ }
+
+ /**
+ * Set the target task thread pool size of an instance. The target task
thread pool size goes to
+ * InstanceConfig, and is used to construct the task thread pool. The
newly-set target task
+ * thread pool size will take effect upon a JVM restart.
+ * @param instanceName - name of the instance
+ * @param targetTaskThreadPoolSize - the target task thread pool size of the
instance
+ */
+ public void setTargetTaskThreadPoolSize(String instanceName, int
targetTaskThreadPoolSize) {
+ InstanceConfig instanceConfig = getInstanceConfig(instanceName);
+ instanceConfig.setTargetTaskThreadPoolSize(targetTaskThreadPoolSize);
+ }
+
+ private InstanceConfig getInstanceConfig(String instanceName) {
+ InstanceConfig instanceConfig =
+
_accessor.getProperty(_accessor.keyBuilder().instanceConfig(instanceName));
+ if (instanceConfig == null) {
+ throw new IllegalArgumentException(
+ "Failed to find InstanceConfig with provided instance name " +
instanceName + "!");
+ }
+ return instanceConfig;
+ }
+
+ /**
+ * Get the global target task thread pool size of the cluster, a value
that's used to construct
+ * task thread pools for the cluster's instances and is created by users.
+ * @return the global target task thread pool size of the cluster
+ */
+ public int getGlobalTargetTaskThreadPoolSize() {
+ ClusterConfig clusterConfig = getClusterConfig();
+ return clusterConfig.getGlobalTargetTaskThreadPoolSize();
+ }
+
+ /**
+ * Set the global target task thread pool size of the cluster. The global
target task thread pool
+ * size goes to ClusterConfig, and is applied to all instances of the
cluster. If an instance
+ * doesn't specify its target thread pool size in InstanceConfig, then this
value in ClusterConfig
+ * will be used to construct its task thread pool. The newly-set target task
thread pool size will
+ * take effect upon a JVM restart. If none of the global and per-instance
target thread pool sizes
+ * are set, a default size will be used.
+ * @param globalTargetTaskThreadPoolSize - the global target task thread
pool size of the cluster
+ */
+ public void setGlobalTargetTaskThreadPoolSize(int
globalTargetTaskThreadPoolSize) {
+ ClusterConfig clusterConfig = getClusterConfig();
+
clusterConfig.setGlobalTargetTaskThreadPoolSize(globalTargetTaskThreadPoolSize);
+ }
+
+ private ClusterConfig getClusterConfig() {
+ ClusterConfig clusterConfig =
_accessor.getProperty(_accessor.keyBuilder().clusterConfig());
+ if (clusterConfig == null) {
+ throw new IllegalStateException(
+ "Failed to find ClusterConfig for cluster " + _clusterName + "!");
+ }
+ return clusterConfig;
+ }
+
+ /**
+ * Get the current target task thread pool size of an instance. This value
reflects the current
+ * task thread pool size that's already created on the instance, and may be
different from the
+ * target thread pool size.
+ * @param instanceName - name of the instance
+ * @return the current task thread pool size of the instance
+ */
+ public int getCurrentTaskThreadPoolSize(String instanceName) {
+ LiveInstance liveInstance =
+
_accessor.getProperty(_accessor.keyBuilder().liveInstance(instanceName));
+ if (liveInstance == null) {
+ throw new IllegalArgumentException(
+ "Failed to find LiveInstance with provided instance name " +
instanceName + "!");
+ }
+
+ return liveInstance.getCurrentTaskThreadPoolSize();
+ }
}
diff --git a/helix-core/src/test/java/org/apache/helix/task/TestTaskDriver.java
b/helix-core/src/test/java/org/apache/helix/task/TestTaskDriver.java
new file mode 100644
index 0000000..dd0ef3d
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/task/TestTaskDriver.java
@@ -0,0 +1,108 @@
+package org.apache.helix.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.ConfigAccessor;
+import org.apache.helix.integration.task.TaskTestBase;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.InstanceConfig;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+public class TestTaskDriver extends TaskTestBase {
+ // Use a thread pool size that's different from the default value for test
+ private static final int TEST_THREAD_POOL_SIZE =
TaskConstants.DEFAULT_TASK_THREAD_POOL_SIZE + 1;
+ private static final String NON_EXISTENT_INSTANCE_NAME =
"NON_EXISTENT_INSTANCE_NAME";
+
+ private TaskDriver _taskDriver;
+ private ConfigAccessor _configAccessor;
+
+ @BeforeClass
+ public void beforeClass() throws Exception {
+ super.beforeClass();
+
+ _taskDriver = new TaskDriver(_controller);
+ _configAccessor = _controller.getConfigAccessor();
+ }
+
+ @Test
+ public void testGetTargetTaskThreadPoolSize() {
+ String validInstanceName = _participants[0].getInstanceName();
+ InstanceConfig instanceConfig =
+ _configAccessor.getInstanceConfig(CLUSTER_NAME, validInstanceName);
+ instanceConfig.setTargetTaskThreadPoolSize(TEST_THREAD_POOL_SIZE);
+ _configAccessor.setInstanceConfig(CLUSTER_NAME, validInstanceName,
instanceConfig);
+
+
Assert.assertEquals(_taskDriver.getTargetTaskThreadPoolSize(validInstanceName),
+ TEST_THREAD_POOL_SIZE);
+ }
+
+ @Test(dependsOnMethods = "testGetTargetTaskThreadPoolSize",
expectedExceptions = IllegalArgumentException.class)
+ public void testGetTargetTaskThreadPoolSizeWrongInstanceName() {
+ _taskDriver.getTargetTaskThreadPoolSize(NON_EXISTENT_INSTANCE_NAME);
+ }
+
+ @Test(dependsOnMethods = "testGetTargetTaskThreadPoolSizeWrongInstanceName")
+ public void testSetTargetTaskThreadPoolSize() {
+ String validInstanceName = _participants[0].getInstanceName();
+ _taskDriver.setTargetTaskThreadPoolSize(validInstanceName,
TEST_THREAD_POOL_SIZE);
+ InstanceConfig instanceConfig =
+ _configAccessor.getInstanceConfig(CLUSTER_NAME, validInstanceName);
+
+ Assert.assertEquals(instanceConfig.getTargetTaskThreadPoolSize(),
TEST_THREAD_POOL_SIZE);
+ }
+
+ @Test(dependsOnMethods = "testSetTargetTaskThreadPoolSize",
expectedExceptions = IllegalArgumentException.class)
+ public void testSetTargetTaskThreadPoolSizeWrongInstanceName() {
+ _taskDriver.setTargetTaskThreadPoolSize(NON_EXISTENT_INSTANCE_NAME,
TEST_THREAD_POOL_SIZE);
+ }
+
+ @Test(dependsOnMethods = "testSetTargetTaskThreadPoolSizeWrongInstanceName")
+ public void testGetGlobalTargetTaskThreadPoolSize() {
+ ClusterConfig clusterConfig =
_configAccessor.getClusterConfig(CLUSTER_NAME);
+ clusterConfig.setGlobalTargetTaskThreadPoolSize(TEST_THREAD_POOL_SIZE);
+ _configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig);
+
+ Assert.assertEquals(_taskDriver.getGlobalTargetTaskThreadPoolSize(),
TEST_THREAD_POOL_SIZE);
+ }
+
+ @Test(dependsOnMethods = "testGetGlobalTargetTaskThreadPoolSize")
+ public void testSetGlobalTargetTaskThreadPoolSize() {
+ _taskDriver.setGlobalTargetTaskThreadPoolSize(TEST_THREAD_POOL_SIZE);
+ ClusterConfig clusterConfig =
_configAccessor.getClusterConfig(CLUSTER_NAME);
+
+ Assert.assertEquals(clusterConfig.getGlobalTargetTaskThreadPoolSize(),
TEST_THREAD_POOL_SIZE);
+ }
+
+ @Test(dependsOnMethods = "testSetGlobalTargetTaskThreadPoolSize")
+ public void testGetCurrentTaskThreadPoolSize() {
+ String validInstanceName = _participants[0].getInstanceName();
+
+
Assert.assertEquals(_taskDriver.getCurrentTaskThreadPoolSize(validInstanceName),
+ TaskConstants.DEFAULT_TASK_THREAD_POOL_SIZE);
+ }
+
+ @Test(dependsOnMethods = "testGetCurrentTaskThreadPoolSize",
expectedExceptions = IllegalArgumentException.class)
+ public void testGetCurrentTaskThreadPoolSizeWrongInstanceName() {
+ _taskDriver.getCurrentTaskThreadPoolSize(NON_EXISTENT_INSTANCE_NAME);
+ }
+}