This is an automated email from the ASF dual-hosted git repository.
hulee pushed a commit to branch task_pool
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/task_pool by this push:
new ba9488b Allow Configurable Thread Pool Size in TaskStateModelFactory
(#973)
ba9488b is described below
commit ba9488b81f6a05d16ee7dd0c4cac1fdb1709a209
Author: Neal Sun <[email protected]>
AuthorDate: Tue May 12 20:13:01 2020 -0700
Allow Configurable Thread Pool Size in TaskStateModelFactory (#973)
This PR changes TaskStateModelFactory logic such that it now obtains the
thread pool size from InstanceConfig/ClusterConfig that may be specified by the
users. This PR also added setters and getters for InstanceConfig/ClusterConfig
for thread pool size fields. This PR also added thread pool size reporting
during LiveInstance creation, with setters and getters for LiveInstance.
---
.../helix/manager/zk/ParticipantManager.java | 3 +
.../apache/helix/manager/zk/ZKHelixManager.java | 6 +-
.../java/org/apache/helix/model/ClusterConfig.java | 41 ++++++-
.../org/apache/helix/model/InstanceConfig.java | 28 ++++-
.../java/org/apache/helix/model/LiveInstance.java | 23 +++-
.../java/org/apache/helix/task/TaskConstants.java | 7 ++
.../apache/helix/task/TaskStateModelFactory.java | 71 ++++++++++--
.../main/java/org/apache/helix/task/TaskUtil.java | 54 +++++++++
.../helix/task/assigner/AssignableInstance.java | 4 +-
.../main/java/org/apache/helix/util/HelixUtil.java | 10 ++
.../multizk/TestMultiZkHelixJavaApis.java | 4 +
.../helix/integration/task/TestTaskThreadLeak.java | 5 +-
.../helix/manager/zk/TestParticipantManager.java | 44 ++++++++
.../helix/manager/zk/TestZkClusterManager.java | 10 +-
.../org/apache/helix/model/TestClusterConfig.java | 24 ++++
.../org/apache/helix/model/TestInstanceConfig.java | 24 ++++
.../TestLiveInstance.java} | 59 ++++++----
.../helix/task/TestTaskStateModelFactory.java | 124 +++++++++++++++++++++
.../java/org/apache/helix/task/TestTaskUtil.java | 118 ++++++++++++++++++++
.../task/assigner/TestAssignableInstance.java | 8 +-
20 files changed, 615 insertions(+), 52 deletions(-)
diff --git
a/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java
b/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java
index 8bb54ff..8d6a99e 100644
---
a/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java
+++
b/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java
@@ -52,6 +52,7 @@ import org.apache.helix.model.StateModelDefinition;
import org.apache.helix.model.builder.HelixConfigScopeBuilder;
import org.apache.helix.participant.StateMachineEngine;
import
org.apache.helix.participant.statemachine.ScheduledTaskStateModelFactory;
+import org.apache.helix.task.TaskUtil;
import org.apache.helix.util.HelixUtil;
import org.apache.helix.zookeeper.api.client.HelixZkClient;
import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
@@ -254,6 +255,8 @@ public class ParticipantManager {
liveInstance.setSessionId(_sessionId);
liveInstance.setHelixVersion(_manager.getVersion());
liveInstance.setLiveInstance(ManagementFactory.getRuntimeMXBean().getName());
+ liveInstance.setCurrentTaskThreadPoolSize(
+ TaskUtil.getTargetThreadPoolSize(_zkclient, _clusterName,
_instanceName));
// LiveInstanceInfoProvider liveInstanceInfoProvider =
_manager._liveInstanceInfoProvider;
if (_liveInstanceInfoProvider != null) {
diff --git
a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
index e2429d7..9b1bfdd 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
@@ -1331,7 +1331,7 @@ public class ZKHelixManager implements HelixManager,
IZkStateListener {
* ZkConnections.
*/
private RealmAwareZkClient createSingleRealmZkClient() {
- final String shardingKey = buildShardingKey();
+ final String shardingKey =
HelixUtil.clusterNameToShardingKey(_clusterName);
PathBasedZkSerializer zkSerializer =
ChainedPathZkSerializer.builder(new ZNRecordSerializer()).build();
@@ -1390,8 +1390,4 @@ public class ZKHelixManager implements HelixManager,
IZkStateListener {
return zkClientFactory.buildZkClient(helixZkConnectionConfig,
helixZkClientConfig);
}
-
- private String buildShardingKey() {
- return _clusterName.charAt(0) == '/' ? _clusterName : "/" + _clusterName;
- }
}
diff --git a/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
b/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
index 165919c..d149182 100644
--- a/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
@@ -109,7 +109,14 @@ public class ClusterConfig extends HelixProperty {
//
https://github.com/apache/helix/wiki/Weight-aware-Globally-Evenly-distributed-Rebalancer#rebalance-coordinator
//
// Default to be true.
- GLOBAL_REBALANCE_ASYNC_MODE
+ GLOBAL_REBALANCE_ASYNC_MODE,
+
+ // The target size of task thread pools for each participant. If
participants specify their
+ // individual pool sizes in their InstanceConfig's, this value will NOT be
used; if participants
+ // don't specify their individual pool sizes, this value will be used for
all participants; if
+ // none of participants or the cluster define pool sizes,
+ // TaskConstants.DEFAULT_TASK_THREAD_POOL_SIZE will be used to create pool
sizes.
+ GLOBAL_TARGET_TASK_THREAD_POOL_SIZE
}
public enum GlobalRebalancePreferenceKey {
@@ -137,6 +144,7 @@ public class ClusterConfig extends HelixProperty {
private final static int MAX_REBALANCE_PREFERENCE = 10;
private final static int MIN_REBALANCE_PREFERENCE = 0;
public final static boolean DEFAULT_GLOBAL_REBALANCE_ASYNC_MODE_ENABLED =
true;
+ private static final int GLOBAL_TARGET_TASK_THREAD_POOL_SIZE_NOT_SET = -1;
/**
* Instantiate for a specific cluster
@@ -710,6 +718,37 @@ public class ClusterConfig extends HelixProperty {
}
/**
+ * Get the global target size of task thread pools. This values applies to
all participants in
+ * the cluster; it's only used if participants don't specify their
individual pool sizes in their
+ * InstanceConfig's. If none of participants or the cluster define pool
sizes,
+ * TaskConstants.DEFAULT_TASK_THREAD_POOL_SIZE will be used to create pool
sizes.
+ * @return the global target size of task thread pool
+ */
+ public int getGlobalTargetTaskThreadPoolSize() {
+ return _record
+
.getIntField(ClusterConfig.ClusterConfigProperty.GLOBAL_TARGET_TASK_THREAD_POOL_SIZE.name(),
+ GLOBAL_TARGET_TASK_THREAD_POOL_SIZE_NOT_SET);
+ }
+
+ /**
+ * Set the global target size of task thread pools for this cluster. This
values applies to all
+ * participants in the cluster; it's only used if participants don't specify
their individual
+ * pool sizes in their InstanceConfig's. If none of participants or the
cluster define pool sizes,
+ * TaskConstants.DEFAULT_TASK_THREAD_POOL_SIZE will be used to create pool
sizes.
+ * @param globalTargetTaskThreadPoolSize - the new global target task thread
pool size
+ * @throws IllegalArgumentException - when the provided new thread pool size
is negative
+ */
+ public void setGlobalTargetTaskThreadPoolSize(int
globalTargetTaskThreadPoolSize)
+ throws IllegalArgumentException {
+ if (globalTargetTaskThreadPoolSize < 0) {
+ throw new IllegalArgumentException("globalTargetTaskThreadPoolSize must
be non-negative!");
+ }
+ _record
+
.setIntField(ClusterConfig.ClusterConfigProperty.GLOBAL_TARGET_TASK_THREAD_POOL_SIZE.name(),
+ globalTargetTaskThreadPoolSize);
+ }
+
+ /**
* @return The required Instance Capacity Keys. If not configured, return an
empty list.
*/
public List<String> getInstanceCapacityKeys() {
diff --git
a/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java
b/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java
index 24e6154..010d943 100644
--- a/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java
@@ -56,11 +56,13 @@ public class InstanceConfig extends HelixProperty {
DOMAIN,
DELAY_REBALANCE_ENABLED,
MAX_CONCURRENT_TASK,
- INSTANCE_CAPACITY_MAP
+ INSTANCE_CAPACITY_MAP,
+ TARGET_TASK_THREAD_POOL_SIZE
}
public static final int WEIGHT_NOT_SET = -1;
public static final int MAX_CONCURRENT_TASK_NOT_SET = -1;
+ private static final int TARGET_TASK_THREAD_POOL_SIZE_NOT_SET = -1;
private static final Logger _logger =
LoggerFactory.getLogger(InstanceConfig.class.getName());
@@ -507,6 +509,30 @@ public class InstanceConfig extends HelixProperty {
}
/**
+ * Get the target size of task thread pool.
+ * @return the target size of task thread pool
+ */
+ public int getTargetTaskThreadPoolSize() {
+ return _record
+
.getIntField(InstanceConfig.InstanceConfigProperty.TARGET_TASK_THREAD_POOL_SIZE.name(),
+ TARGET_TASK_THREAD_POOL_SIZE_NOT_SET);
+ }
+
+ /**
+ * Set the target size of task thread pool.
+ * @param targetTaskThreadPoolSize - the new target task thread pool size
+ * @throws IllegalArgumentException - when the provided new thread pool size
is negative
+ */
+ public void setTargetTaskThreadPoolSize(int targetTaskThreadPoolSize)
+ throws IllegalArgumentException {
+ if (targetTaskThreadPoolSize < 0) {
+ throw new IllegalArgumentException("targetTaskThreadPoolSize must be
non-negative!");
+ }
+
_record.setIntField(InstanceConfig.InstanceConfigProperty.TARGET_TASK_THREAD_POOL_SIZE.name(),
+ targetTaskThreadPoolSize);
+ }
+
+ /**
* Get the instance capacity information from the map fields.
* @return data map if it exists, or empty map
*/
diff --git a/helix-core/src/main/java/org/apache/helix/model/LiveInstance.java
b/helix-core/src/main/java/org/apache/helix/model/LiveInstance.java
index 74260cc..d9945c7 100644
--- a/helix-core/src/main/java/org/apache/helix/model/LiveInstance.java
+++ b/helix-core/src/main/java/org/apache/helix/model/LiveInstance.java
@@ -22,6 +22,7 @@ package org.apache.helix.model;
import java.util.Map;
import org.apache.helix.HelixProperty;
+import org.apache.helix.task.TaskConstants;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -38,7 +39,8 @@ public class LiveInstance extends HelixProperty {
HELIX_VERSION,
LIVE_INSTANCE,
ZKPROPERTYTRANSFERURL,
- RESOURCE_CAPACITY
+ RESOURCE_CAPACITY,
+ CURRENT_TASK_THREAD_POOL_SIZE
}
/**
@@ -190,6 +192,25 @@ public class LiveInstance extends HelixProperty {
_record.setSimpleField(LiveInstanceProperty.ZKPROPERTYTRANSFERURL.toString(),
url);
}
+ /**
+ * Get the current task thread pool size of the instance. For backward
compatibility, return
+ * DEFAULT_TASK_THREAD_POOL_SIZE if it's not defined
+ * @return the current task thread pool size
+ */
+ public int getCurrentTaskThreadPoolSize() {
+ return
_record.getIntField(LiveInstanceProperty.CURRENT_TASK_THREAD_POOL_SIZE.name(),
+ TaskConstants.DEFAULT_TASK_THREAD_POOL_SIZE);
+ }
+
+ /**
+ * Set the current task thread pool size of the instance
+ * @param currentTaskThreadPoolSize the current task thread pool size
+ */
+ public void setCurrentTaskThreadPoolSize(int currentTaskThreadPoolSize) {
+
_record.setIntField(LiveInstanceProperty.CURRENT_TASK_THREAD_POOL_SIZE.name(),
+ currentTaskThreadPoolSize);
+ }
+
@Override
public boolean isValid() {
if (getEphemeralOwner() == null) {
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskConstants.java
b/helix-core/src/main/java/org/apache/helix/task/TaskConstants.java
index d17e29a..d9745b6 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskConstants.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskConstants.java
@@ -49,4 +49,11 @@ public class TaskConstants {
public static final String PREV_RA_NODE = "PreviousResourceAssignment";
public static final boolean DEFAULT_TASK_ENABLE_COMPRESSION = false;
+
+ /**
+ * The default task thread pool size that will be used to create thread
pools if target thread
+ * pool sizes are not defined in InstanceConfig or ClusterConfig; also used
as the current thread
+ * pool size default value if the current thread pool size is not defined in
LiveInstance
+ */
+ public final static int DEFAULT_TASK_THREAD_POOL_SIZE = 40;
}
diff --git
a/helix-core/src/main/java/org/apache/helix/task/TaskStateModelFactory.java
b/helix-core/src/main/java/org/apache/helix/task/TaskStateModelFactory.java
index f30dd9f..90873d5 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskStateModelFactory.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskStateModelFactory.java
@@ -19,6 +19,7 @@ package org.apache.helix.task;
* under the License.
*/
+import java.io.IOException;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@@ -27,12 +28,22 @@ import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicInteger;
import javax.management.JMException;
+import org.apache.helix.HelixException;
import org.apache.helix.HelixManager;
+import org.apache.helix.SystemPropertyKeys;
import org.apache.helix.monitoring.mbeans.ThreadPoolExecutorMonitor;
+import org.apache.helix.msdcommon.exception.InvalidRoutingDataException;
import org.apache.helix.participant.statemachine.StateModelFactory;
+import org.apache.helix.util.HelixUtil;
+import org.apache.helix.zookeeper.api.client.HelixZkClient;
+import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
+import org.apache.helix.zookeeper.datamodel.serializer.ZNRecordSerializer;
+import org.apache.helix.zookeeper.impl.client.FederatedZkClient;
+import org.apache.helix.zookeeper.impl.factory.SharedZkClientFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
/**
* Factory class for {@link TaskStateModel}.
*/
@@ -44,25 +55,31 @@ public class TaskStateModelFactory extends
StateModelFactory<TaskStateModel> {
private final ScheduledExecutorService _taskExecutor;
private final ScheduledExecutorService _timerTaskExecutor;
private ThreadPoolExecutorMonitor _monitor;
- public final static int TASK_THREADPOOL_SIZE = 40;
public TaskStateModelFactory(HelixManager manager, Map<String, TaskFactory>
taskFactoryRegistry) {
- this(manager, taskFactoryRegistry,
- Executors.newScheduledThreadPool(TASK_THREADPOOL_SIZE, new
ThreadFactory() {
- private AtomicInteger threadId = new AtomicInteger(0);
-
- @Override
- public Thread newThread(Runnable r) {
- return new Thread(r, "TaskStateModelFactory-task_thread-" +
threadId.getAndIncrement());
- }
- }));
+ this(manager, taskFactoryRegistry,
Executors.newScheduledThreadPool(TaskUtil
+ .getTargetThreadPoolSize(createZkClient(manager),
manager.getClusterName(),
+ manager.getInstanceName()), new ThreadFactory() {
+ private AtomicInteger threadId = new AtomicInteger(0);
+
+ @Override
+ public Thread newThread(Runnable r) {
+ return new Thread(r, "TaskStateModelFactory-task_thread-" +
threadId.getAndIncrement());
+ }
+ }));
}
+ // DO NOT USE! This size of provided thread pool will not be reflected to
controller
+ // properly, the controller may over schedule tasks to this participant.
Task Framework needs to
+ // have full control of the thread pool unlike the state transition thread
pool.
+ @Deprecated
public TaskStateModelFactory(HelixManager manager, Map<String, TaskFactory>
taskFactoryRegistry,
ScheduledExecutorService taskExecutor) {
_manager = manager;
_taskFactoryRegistry = taskFactoryRegistry;
_taskExecutor = taskExecutor;
+ // TODO: Hunter: I'm not sure why this needs to be a single thread
executor. We could certainly
+ // use more threads for timer tasks.
_timerTaskExecutor = Executors.newSingleThreadScheduledExecutor(new
ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
@@ -74,7 +91,7 @@ public class TaskStateModelFactory extends
StateModelFactory<TaskStateModel> {
_monitor = new
ThreadPoolExecutorMonitor(TaskConstants.STATE_MODEL_NAME,
(ThreadPoolExecutor) _taskExecutor);
} catch (JMException e) {
- LOG.warn("Error in creating ThreadPoolExecutorMonitor for
TaskStateModelFactory.");
+ LOG.warn("Error in creating ThreadPoolExecutorMonitor for
TaskStateModelFactory.", e);
}
}
}
@@ -102,4 +119,36 @@ public class TaskStateModelFactory extends
StateModelFactory<TaskStateModel> {
public boolean isTerminated() {
return _taskExecutor.isTerminated();
}
+
+ /*
+ * Create a RealmAwareZkClient to get thread pool sizes
+ */
+ protected static RealmAwareZkClient createZkClient(HelixManager manager) {
+ // TODO: revisit the logic here - we are creating a connection although we
already have a
+ // manager. We cannot use the connection within manager because some users
connect the manager
+ // after registering the state model factory (in which case we cannot use
manager's connection),
+ // and some connect the manager before registering the state model factory
(in which case we
+ //can use manager's connection). We need to think about the right order
and determine if we
+ //want to enforce it, which may cause backward incompatibility.
+ RealmAwareZkClient.RealmAwareZkClientConfig clientConfig =
+ new RealmAwareZkClient.RealmAwareZkClientConfig().setZkSerializer(new
ZNRecordSerializer());
+
+ if (Boolean.getBoolean(SystemPropertyKeys.MULTI_ZK_ENABLED)) {
+ String clusterName = manager.getClusterName();
+ String shardingKey = HelixUtil.clusterNameToShardingKey(clusterName);
+ RealmAwareZkClient.RealmAwareZkConnectionConfig connectionConfig =
+ new RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder()
+ .setRealmMode(RealmAwareZkClient.RealmMode.SINGLE_REALM)
+ .setZkRealmShardingKey(shardingKey).build();
+ try {
+ return new FederatedZkClient(connectionConfig, clientConfig);
+ } catch (IOException | InvalidRoutingDataException |
IllegalStateException e) {
+ throw new HelixException("Failed to create FederatedZkClient!", e);
+ }
+ }
+
+ return SharedZkClientFactory.getInstance().buildZkClient(
+ new
HelixZkClient.ZkConnectionConfig(manager.getMetadataStoreConnectionString()),
+ clientConfig.createHelixZkClientConfig().setZkSerializer(new
ZNRecordSerializer()));
+ }
}
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
index 917a69b..c67c14c 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
@@ -30,11 +30,17 @@ import java.util.Set;
import com.google.common.base.Joiner;
import com.google.common.collect.Sets;
import org.apache.helix.AccessOption;
+import org.apache.helix.ConfigAccessor;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixException;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixProperty;
+import org.apache.helix.InstanceType;
import org.apache.helix.PropertyKey;
+import org.apache.helix.manager.zk.ZKUtil;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import
org.apache.helix.controller.dataproviders.WorkflowControllerDataProvider;
import org.apache.helix.controller.rebalancer.util.RebalanceScheduler;
@@ -1045,4 +1051,52 @@ public class TaskUtil {
rebalanceScheduler.scheduleRebalance(manager, workflow, nextPurgeTime);
}
}
+
+ /**
+ * Get target thread pool size from InstanceConfig first; if InstanceConfig
doesn't exist or the
+ * value is undefined, try ClusterConfig; if the value is undefined in
ClusterConfig, fall back
+ * to the default value.
+ * @param zkClient - ZooKeeper connection for config reading
+ * @param clusterName - the cluster name for InstanceConfig and ClusterConfig
+ * @param instanceName - the instance name for InstanceConfig
+ * @return target thread pool size
+ */
+ public static int getTargetThreadPoolSize(RealmAwareZkClient zkClient,
String clusterName,
+ String instanceName) {
+ ConfigAccessor configAccessor = new ConfigAccessor(zkClient);
+
+ // Check instance config first for thread pool size
+ if (ZKUtil.isInstanceSetup(zkClient, clusterName, instanceName,
InstanceType.PARTICIPANT)) {
+ InstanceConfig instanceConfig =
configAccessor.getInstanceConfig(clusterName, instanceName);
+ if (instanceConfig != null) {
+ int targetTaskThreadPoolSize =
instanceConfig.getTargetTaskThreadPoolSize();
+ // Reject negative values. The pool size is only negative when it's
not set in
+ // InstanceConfig, or when the users bypassed the setter logic in
InstanceConfig. We treat
+ // negative values as the value is not set, and continue with
ClusterConfig.
+ if (targetTaskThreadPoolSize >= 0) {
+ return targetTaskThreadPoolSize;
+ }
+ } else {
+ LOG.warn(
+ "Got null as InstanceConfig for instance {} in cluster {}.
Continuing with ClusterConfig. ",
+ instanceName, clusterName);
+ }
+ }
+
+ ClusterConfig clusterConfig = configAccessor.getClusterConfig(clusterName);
+ if (clusterConfig != null) {
+ int globalTargetTaskThreadPoolSize =
clusterConfig.getGlobalTargetTaskThreadPoolSize();
+ // Reject negative values. The pool size is only negative when it's not
set in
+ // ClusterConfig, or when the users bypassed the setter logic in
ClusterConfig. We treat
+ // negative values as the value is not set, and continue with the
default value.
+ if (globalTargetTaskThreadPoolSize >= 0) {
+ return globalTargetTaskThreadPoolSize;
+ }
+ } else {
+ LOG.warn("Got null as ClusterConfig for cluster {}. Returning default
value: {}. ",
+ clusterName, TaskConstants.DEFAULT_TASK_THREAD_POOL_SIZE);
+ }
+
+ return TaskConstants.DEFAULT_TASK_THREAD_POOL_SIZE;
+ }
}
diff --git
a/helix-core/src/main/java/org/apache/helix/task/assigner/AssignableInstance.java
b/helix-core/src/main/java/org/apache/helix/task/assigner/AssignableInstance.java
index c1f2ef6..194db41 100644
---
a/helix-core/src/main/java/org/apache/helix/task/assigner/AssignableInstance.java
+++
b/helix-core/src/main/java/org/apache/helix/task/assigner/AssignableInstance.java
@@ -28,7 +28,7 @@ import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.LiveInstance;
import org.apache.helix.task.TaskConfig;
-import org.apache.helix.task.TaskStateModelFactory;
+import org.apache.helix.task.TaskConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -113,7 +113,7 @@ public class AssignableInstance {
if (resourceCapacity == null) {
resourceCapacity = new HashMap<>();
resourceCapacity.put(LiveInstance.InstanceResourceType.TASK_EXEC_THREAD.name(),
- Integer.toString(TaskStateModelFactory.TASK_THREADPOOL_SIZE));
+ Integer.toString(TaskConstants.DEFAULT_TASK_THREAD_POOL_SIZE));
logger.debug("No resource capacity provided in LiveInstance {}, assuming
default capacity: {}",
_instanceConfig.getInstanceName(), resourceCapacity);
}
diff --git a/helix-core/src/main/java/org/apache/helix/util/HelixUtil.java
b/helix-core/src/main/java/org/apache/helix/util/HelixUtil.java
index 3480fb6..bbf6446 100644
--- a/helix-core/src/main/java/org/apache/helix/util/HelixUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/util/HelixUtil.java
@@ -87,6 +87,16 @@ public final class HelixUtil {
return path.substring(path.lastIndexOf('/') + 1);
}
+ /**
+ * Convert a cluster name to a sharding key for routing purpose by adding a
"/" to the front.
+ * Check if the cluster name already has a "/" at the front; if so just
return it.
+ * @param clusterName - cluster name
+ * @return the sharding key corresponding the cluster name
+ */
+ public static String clusterNameToShardingKey(String clusterName) {
+ return clusterName.charAt(0) == '/' ? clusterName : "/" + clusterName;
+ }
+
public static String serializeByComma(List<String> objects) {
return Joiner.on(",").join(objects);
}
diff --git
a/helix-core/src/test/java/org/apache/helix/integration/multizk/TestMultiZkHelixJavaApis.java
b/helix-core/src/test/java/org/apache/helix/integration/multizk/TestMultiZkHelixJavaApis.java
index d4bdcf9..742bb76 100644
---
a/helix-core/src/test/java/org/apache/helix/integration/multizk/TestMultiZkHelixJavaApis.java
+++
b/helix-core/src/test/java/org/apache/helix/integration/multizk/TestMultiZkHelixJavaApis.java
@@ -73,6 +73,7 @@ import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.helix.zookeeper.datamodel.serializer.ZNRecordSerializer;
import org.apache.helix.zookeeper.impl.client.FederatedZkClient;
import org.apache.helix.zookeeper.impl.factory.DedicatedZkClientFactory;
+import org.apache.helix.zookeeper.util.HttpRoutingDataReader;
import org.apache.helix.zookeeper.zkclient.ZkServer;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
@@ -149,6 +150,9 @@ public class TestMultiZkHelixJavaApis {
System.setProperty(MetadataStoreRoutingConstants.MSDS_SERVER_ENDPOINT_KEY,
"http://" + msdsHostName + ":" + msdsPort + "/admin/v2/namespaces/" +
msdsNamespace);
+ // HttpRoutingDataReader's routing data may be set by other tests using
the same endpoint;
+ // reset() for good measure
+ HttpRoutingDataReader.reset();
// Create a FederatedZkClient for admin work
_zkClient =
new FederatedZkClient(new
RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder().build(),
diff --git
a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskThreadLeak.java
b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskThreadLeak.java
index 4e3e289..2f6b034 100644
---
a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskThreadLeak.java
+++
b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskThreadLeak.java
@@ -25,8 +25,8 @@ import org.apache.helix.TestHelper;
import org.apache.helix.model.IdealState;
import org.apache.helix.task.JobConfig;
import org.apache.helix.task.JobQueue;
+import org.apache.helix.task.TaskConstants;
import org.apache.helix.task.TaskState;
-import org.apache.helix.task.TaskStateModelFactory;
import org.apache.helix.task.TaskUtil;
import org.apache.helix.task.WorkflowConfig;
import org.testng.Assert;
@@ -73,7 +73,8 @@ public class TestTaskThreadLeak extends TaskTestBase {
int threadCountAfter = getThreadCount("TaskStateModelFactory");
Assert.assertTrue(
- (threadCountAfter - _threadCountBefore) <=
TaskStateModelFactory.TASK_THREADPOOL_SIZE + 1);
+ (threadCountAfter - _threadCountBefore) <=
TaskConstants.DEFAULT_TASK_THREAD_POOL_SIZE
+ + 1);
}
diff --git
a/helix-core/src/test/java/org/apache/helix/manager/zk/TestParticipantManager.java
b/helix-core/src/test/java/org/apache/helix/manager/zk/TestParticipantManager.java
index a03f4f2..1ebe747 100644
---
a/helix-core/src/test/java/org/apache/helix/manager/zk/TestParticipantManager.java
+++
b/helix-core/src/test/java/org/apache/helix/manager/zk/TestParticipantManager.java
@@ -28,7 +28,10 @@ import org.apache.helix.TestHelper;
import org.apache.helix.ZkTestHelper;
import org.apache.helix.common.ZkTestBase;
import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.LiveInstance;
+import org.apache.helix.task.TaskConstants;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
@@ -145,6 +148,47 @@ public class TestParticipantManager extends ZkTestBase {
deleteCluster(clusterName);
}
+ @Test(dependsOnMethods = "testSessionExpiryCreateLiveInstance")
+ public void testCurrentTaskThreadPoolSizeCreation() throws Exception {
+ // Using a pool sized different from the default value to verify
correctness
+ final int testThreadPoolSize = TaskConstants.DEFAULT_TASK_THREAD_POOL_SIZE
+ 1;
+
+ final String className = TestHelper.getTestClassName();
+ final String methodName = TestHelper.getTestMethodName();
+ final String clusterName = className + "_" + methodName;
+
+ final ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName,
+ new
ZkBaseDataAccessor.Builder<ZNRecord>().setZkAddress(ZK_ADDR).build());
+ final PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+
+ TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+ "localhost", // participant name prefix
+ "TestDB", // resource name prefix
+ 1, // resources
+ 10, // partitions per resource
+ 5, // number of nodes
+ 3, // replicas
+ "MasterSlave", true); // do rebalance
+
+ final String instanceName = "localhost_12918";
+ final MockParticipantManager manager =
+ new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
+
+ InstanceConfig instanceConfig =
accessor.getProperty(keyBuilder.instanceConfig(instanceName));
+ instanceConfig.setTargetTaskThreadPoolSize(testThreadPoolSize);
+ accessor.setProperty(keyBuilder.instanceConfig(instanceName),
instanceConfig);
+
+ manager.syncStart();
+
+ final LiveInstance liveInstance =
accessor.getProperty(keyBuilder.liveInstance(instanceName));
+ Assert.assertNotNull(liveInstance);
+ Assert.assertEquals(liveInstance.getCurrentTaskThreadPoolSize(),
testThreadPoolSize);
+
+ // Clean up.
+ manager.syncStop();
+ deleteCluster(clusterName);
+ }
+
/*
* Mocks PreConnectCallback to insert session expiry during
ParticipantManager#handleNewSession()
*/
diff --git
a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkClusterManager.java
b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkClusterManager.java
index 9c5b64b..cfbc42d 100644
---
a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkClusterManager.java
+++
b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkClusterManager.java
@@ -167,7 +167,7 @@ public class TestZkClusterManager extends ZkUnitTestBase {
accessor.getProperty(accessor.keyBuilder().liveInstance("localhost_0"));
Assert.assertTrue(liveInstance.getRecord().getListFields().size() == 0);
Assert.assertTrue(liveInstance.getRecord().getMapFields().size() == 0);
- Assert.assertTrue(liveInstance.getRecord().getSimpleFields().size() == 3);
+ Assert.assertTrue(liveInstance.getRecord().getSimpleFields().size() == 4);
manager.disconnect();
@@ -180,7 +180,7 @@ public class TestZkClusterManager extends ZkUnitTestBase {
liveInstance =
accessor.getProperty(accessor.keyBuilder().liveInstance("localhost_1"));
Assert.assertTrue(liveInstance.getRecord().getListFields().size() == 1);
Assert.assertTrue(liveInstance.getRecord().getMapFields().size() == 1);
- Assert.assertTrue(liveInstance.getRecord().getSimpleFields().size() == 4);
+ Assert.assertTrue(liveInstance.getRecord().getSimpleFields().size() == 5);
manager.disconnect();
@@ -193,7 +193,7 @@ public class TestZkClusterManager extends ZkUnitTestBase {
liveInstance =
accessor.getProperty(accessor.keyBuilder().liveInstance("localhost_2"));
Assert.assertTrue(liveInstance.getRecord().getListFields().size() == 1);
Assert.assertTrue(liveInstance.getRecord().getMapFields().size() == 1);
- Assert.assertTrue(liveInstance.getRecord().getSimpleFields().size() == 5);
+ Assert.assertTrue(liveInstance.getRecord().getSimpleFields().size() == 6);
Assert.assertFalse(liveInstance.getEphemeralOwner().equals("value"));
Assert.assertFalse(liveInstance.getLiveInstance().equals("value"));
@@ -208,7 +208,7 @@ public class TestZkClusterManager extends ZkUnitTestBase {
liveInstance =
accessor.getProperty(accessor.keyBuilder().liveInstance("localhost_3"));
Assert.assertTrue(liveInstance.getRecord().getListFields().size() == 1);
Assert.assertTrue(liveInstance.getRecord().getMapFields().size() == 1);
- Assert.assertTrue(liveInstance.getRecord().getSimpleFields().size() == 5);
+ Assert.assertTrue(liveInstance.getRecord().getSimpleFields().size() == 6);
Assert.assertFalse(liveInstance.getEphemeralOwner().equals("value"));
Assert.assertFalse(liveInstance.getLiveInstance().equals("value"));
String sessionId = liveInstance.getEphemeralOwner();
@@ -219,7 +219,7 @@ public class TestZkClusterManager extends ZkUnitTestBase {
liveInstance =
accessor.getProperty(accessor.keyBuilder().liveInstance("localhost_3"));
Assert.assertTrue(liveInstance.getRecord().getListFields().size() == 1);
Assert.assertTrue(liveInstance.getRecord().getMapFields().size() == 1);
- Assert.assertTrue(liveInstance.getRecord().getSimpleFields().size() == 5);
+ Assert.assertTrue(liveInstance.getRecord().getSimpleFields().size() == 6);
Assert.assertFalse(liveInstance.getEphemeralOwner().equals("value"));
Assert.assertFalse(liveInstance.getLiveInstance().equals("value"));
Assert.assertFalse(sessionId.equals(liveInstance.getEphemeralOwner()));
diff --git
a/helix-core/src/test/java/org/apache/helix/model/TestClusterConfig.java
b/helix-core/src/test/java/org/apache/helix/model/TestClusterConfig.java
index 8e4a016..ac2763b 100644
--- a/helix-core/src/test/java/org/apache/helix/model/TestClusterConfig.java
+++ b/helix-core/src/test/java/org/apache/helix/model/TestClusterConfig.java
@@ -70,6 +70,30 @@ public class TestClusterConfig {
}
@Test
+ public void testGetGlobalTargetTaskThreadPoolSize() {
+ ClusterConfig testConfig = new ClusterConfig("testId");
+ testConfig.getRecord().setIntField(
+
ClusterConfig.ClusterConfigProperty.GLOBAL_TARGET_TASK_THREAD_POOL_SIZE.name(),
100);
+
+ Assert.assertEquals(testConfig.getGlobalTargetTaskThreadPoolSize(), 100);
+ }
+
+ @Test
+ public void testSetGlobalTargetTaskThreadPoolSize() {
+ ClusterConfig testConfig = new ClusterConfig("testId");
+ testConfig.setGlobalTargetTaskThreadPoolSize(100);
+
+ Assert.assertEquals(testConfig.getRecord().getIntField(
+
ClusterConfig.ClusterConfigProperty.GLOBAL_TARGET_TASK_THREAD_POOL_SIZE.name(),
-1), 100);
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class)
+ public void testSetGlobalTargetTaskThreadPoolSizeIllegalArgument() {
+ ClusterConfig testConfig = new ClusterConfig("testId");
+ testConfig.setGlobalTargetTaskThreadPoolSize(-1);
+ }
+
+ @Test
public void testGetRebalancePreference() {
Map<ClusterConfig.GlobalRebalancePreferenceKey, Integer> preference = new
HashMap<>();
preference.put(EVENNESS, 5);
diff --git
a/helix-core/src/test/java/org/apache/helix/model/TestInstanceConfig.java
b/helix-core/src/test/java/org/apache/helix/model/TestInstanceConfig.java
index 9b47677..d0b2f58 100644
--- a/helix-core/src/test/java/org/apache/helix/model/TestInstanceConfig.java
+++ b/helix-core/src/test/java/org/apache/helix/model/TestInstanceConfig.java
@@ -123,4 +123,28 @@ public class TestInstanceConfig {
InstanceConfig testConfig = new InstanceConfig("testConfig");
testConfig.setInstanceCapacityMap(capacityDataMap);
}
+
+ @Test
+ public void testGetTargetTaskThreadPoolSize() {
+ InstanceConfig testConfig = new InstanceConfig("testConfig");
+ testConfig.getRecord().setIntField(
+
InstanceConfig.InstanceConfigProperty.TARGET_TASK_THREAD_POOL_SIZE.name(), 100);
+
+ Assert.assertEquals(testConfig.getTargetTaskThreadPoolSize(), 100);
+ }
+
+ @Test
+ public void testSetTargetTaskThreadPoolSize() {
+ InstanceConfig testConfig = new InstanceConfig("testConfig");
+ testConfig.setTargetTaskThreadPoolSize(100);
+
+ Assert.assertEquals(testConfig.getRecord().getIntField(
+
InstanceConfig.InstanceConfigProperty.TARGET_TASK_THREAD_POOL_SIZE.name(), -1),
100);
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class)
+ public void testSetTargetTaskThreadPoolSizeIllegalArgument() {
+ InstanceConfig testConfig = new InstanceConfig("testConfig");
+ testConfig.setTargetTaskThreadPoolSize(-1);
+ }
}
diff --git
a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZKLiveInstanceData.java
b/helix-core/src/test/java/org/apache/helix/model/TestLiveInstance.java
similarity index 81%
rename from
helix-core/src/test/java/org/apache/helix/manager/zk/TestZKLiveInstanceData.java
rename to helix-core/src/test/java/org/apache/helix/model/TestLiveInstance.java
index c3aace7..f53f9ec 100644
---
a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZKLiveInstanceData.java
+++ b/helix-core/src/test/java/org/apache/helix/model/TestLiveInstance.java
@@ -1,4 +1,4 @@
-package org.apache.helix.manager.zk;
+package org.apache.helix.model;
/*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -35,15 +35,27 @@ import org.apache.helix.LiveInstanceChangeListener;
import org.apache.helix.NotificationContext;
import org.apache.helix.PropertyKey;
import org.apache.helix.ZkUnitTestBase;
-import org.apache.helix.model.LiveInstance;
+import org.apache.helix.task.TaskConstants;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
-public class TestZKLiveInstanceData extends ZkUnitTestBase {
+public class TestLiveInstance extends ZkUnitTestBase {
private final String clusterName = CLUSTER_PREFIX + "_" +
getShortClassName();
+ @BeforeClass()
+ public void beforeClass() throws Exception {
+ _gSetupTool.addCluster(clusterName, true);
+ _gSetupTool
+ .addInstancesToCluster(clusterName, new String[] { "localhost:54321",
"localhost:54322" });
+ }
+
+ @AfterClass()
+ public void afterClass() throws Exception {
+ deleteCluster(clusterName);
+ }
+
@Test
public void testDataChange() throws Exception {
// Create an admin and add LiveInstanceChange listener to it
@@ -101,23 +113,6 @@ public class TestZKLiveInstanceData extends ZkUnitTestBase
{
Assert.assertTrue(instances.isEmpty(), "Expecting an empty list of live
instance");
adminManager.disconnect();
-
- }
-
- @BeforeClass()
- public void beforeClass() throws Exception {
- _gSetupTool.addCluster(clusterName, true);
- _gSetupTool
- .addInstancesToCluster(clusterName, new String[] { "localhost:54321",
"localhost:54322" });
- }
-
- @AfterClass()
- public void afterClass() throws Exception {
- deleteCluster(clusterName);
- }
-
- private String[] getArgs(String... args) {
- return args;
}
private List<LiveInstance> deepCopy(List<LiveInstance> instances) {
@@ -127,4 +122,28 @@ public class TestZKLiveInstanceData extends ZkUnitTestBase
{
}
return result;
}
+
+ @Test(dependsOnMethods = "testDataChange")
+ public void testGetCurrentTaskThreadPoolSize() {
+ LiveInstance testLiveInstance = new LiveInstance("testId");
+ testLiveInstance.getRecord()
+
.setIntField(LiveInstance.LiveInstanceProperty.CURRENT_TASK_THREAD_POOL_SIZE.name(),
100);
+
+ Assert.assertEquals(testLiveInstance.getCurrentTaskThreadPoolSize(), 100);
+ }
+
+ @Test(dependsOnMethods = "testGetCurrentTaskThreadPoolSize")
+ public void testGetCurrentTaskThreadPoolSizeDefault() {
+ LiveInstance testLiveInstance = new LiveInstance("testId");
+
+ Assert.assertEquals(testLiveInstance.getCurrentTaskThreadPoolSize(),
TaskConstants.DEFAULT_TASK_THREAD_POOL_SIZE);
+ }
+
+ @Test(dependsOnMethods = "testGetCurrentTaskThreadPoolSizeDefault")
+ public void testSetCurrentTaskThreadPoolSize() {
+ LiveInstance testLiveInstance = new LiveInstance("testId");
+ testLiveInstance.setCurrentTaskThreadPoolSize(100);
+
+ Assert.assertEquals(testLiveInstance.getCurrentTaskThreadPoolSize(), 100);
+ }
}
diff --git
a/helix-core/src/test/java/org/apache/helix/task/TestTaskStateModelFactory.java
b/helix-core/src/test/java/org/apache/helix/task/TestTaskStateModelFactory.java
new file mode 100644
index 0000000..514256f
--- /dev/null
+++
b/helix-core/src/test/java/org/apache/helix/task/TestTaskStateModelFactory.java
@@ -0,0 +1,124 @@
+package org.apache.helix.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.helix.SystemPropertyKeys;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.integration.task.TaskTestBase;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.msdcommon.constant.MetadataStoreRoutingConstants;
+import org.apache.helix.msdcommon.mock.MockMetadataStoreDirectoryServer;
+import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
+import org.apache.helix.zookeeper.util.HttpRoutingDataReader;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class TestTaskStateModelFactory extends TaskTestBase {
+ // This value has to be different from the default value to verify
correctness
+ private static final int TEST_TARGET_TASK_THREAD_POOL_SIZE =
+ TaskConstants.DEFAULT_TASK_THREAD_POOL_SIZE + 1;
+
+ @Test
+ public void testConfigAccessorCreationMultiZk() throws Exception {
+ MockParticipantManager anyParticipantManager = _participants[0];
+
+ InstanceConfig instanceConfig =
+
InstanceConfig.toInstanceConfig(anyParticipantManager.getInstanceName());
+
instanceConfig.setTargetTaskThreadPoolSize(TEST_TARGET_TASK_THREAD_POOL_SIZE);
+ anyParticipantManager.getConfigAccessor()
+ .setInstanceConfig(anyParticipantManager.getClusterName(),
+ anyParticipantManager.getInstanceName(), instanceConfig);
+
+ // Start a msds server
+ // TODO: Refactor all MSDS_SERVER_ENDPOINT creation in system property to
one place.
+ // Any test that modifies MSDS_SERVER_ENDPOINT system property and accesses
+ // HttpRoutingDataReader (ex. TestMultiZkHelixJavaApis and this test) will
cause the
+ // MSDS_SERVER_ENDPOINT system property to be recorded as final in
HttpRoutingDataReader; that
+ // means any test class that satisfies the aforementioned condition and is
executed first gets
+ // to "decide" the default msds endpoint. The only workaround is for all
these test classes to
+ // use the same default msds endpoint.
+ final String msdsHostName = "localhost";
+ final int msdsPort = 11117;
+ final String msdsNamespace = "multiZkTest";
+ Map<String, Collection<String>> routingData = new HashMap<>();
+ routingData
+ .put(ZK_ADDR, Collections.singletonList("/" +
anyParticipantManager.getClusterName()));
+ MockMetadataStoreDirectoryServer msds =
+ new MockMetadataStoreDirectoryServer(msdsHostName, msdsPort,
msdsNamespace, routingData);
+ msds.startServer();
+
+ // Save previously-set system configs
+ String prevMultiZkEnabled =
System.getProperty(SystemPropertyKeys.MULTI_ZK_ENABLED);
+ String prevMsdsServerEndpoint =
+
System.getProperty(MetadataStoreRoutingConstants.MSDS_SERVER_ENDPOINT_KEY);
+ // Turn on multiZk mode in System config
+ System.setProperty(SystemPropertyKeys.MULTI_ZK_ENABLED, "true");
+ // MSDS endpoint:
http://localhost:11117/admin/v2/namespaces/testTaskStateModelFactory
+ System.setProperty(MetadataStoreRoutingConstants.MSDS_SERVER_ENDPOINT_KEY,
+ "http://" + msdsHostName + ":" + msdsPort + "/admin/v2/namespaces/" +
msdsNamespace);
+
+ HttpRoutingDataReader.reset();
+ RealmAwareZkClient zkClient =
TaskStateModelFactory.createZkClient(anyParticipantManager);
+ Assert.assertEquals(TaskUtil
+ .getTargetThreadPoolSize(zkClient,
anyParticipantManager.getClusterName(),
+ anyParticipantManager.getInstanceName()),
TEST_TARGET_TASK_THREAD_POOL_SIZE);
+
+ // Restore system properties
+ if (prevMultiZkEnabled == null) {
+ System.clearProperty(SystemPropertyKeys.MULTI_ZK_ENABLED);
+ } else {
+ System.setProperty(SystemPropertyKeys.MULTI_ZK_ENABLED,
prevMultiZkEnabled);
+ }
+ if (prevMsdsServerEndpoint == null) {
+ System.clearProperty(SystemPropertyKeys.MSDS_SERVER_ENDPOINT_KEY);
+ } else {
+ System.setProperty(SystemPropertyKeys.MSDS_SERVER_ENDPOINT_KEY,
prevMsdsServerEndpoint);
+ }
+ msds.stopServer();
+ }
+
+ @Test(dependsOnMethods = "testConfigAccessorCreationMultiZk")
+ public void testConfigAccessorCreationSingleZk() {
+ MockParticipantManager anyParticipantManager = _participants[0];
+
+ // Save previously-set system configs
+ String prevMultiZkEnabled =
System.getProperty(SystemPropertyKeys.MULTI_ZK_ENABLED);
+ // Turn off multiZk mode in System config
+ System.setProperty(SystemPropertyKeys.MULTI_ZK_ENABLED, "false");
+
+ RealmAwareZkClient zkClient =
TaskStateModelFactory.createZkClient(anyParticipantManager);
+ Assert.assertEquals(TaskUtil
+ .getTargetThreadPoolSize(zkClient,
anyParticipantManager.getClusterName(),
+ anyParticipantManager.getInstanceName()),
TEST_TARGET_TASK_THREAD_POOL_SIZE);
+
+ // Restore system properties
+ if (prevMultiZkEnabled == null) {
+ System.clearProperty(SystemPropertyKeys.MULTI_ZK_ENABLED);
+ } else {
+ System.setProperty(SystemPropertyKeys.MULTI_ZK_ENABLED,
prevMultiZkEnabled);
+ }
+ }
+}
diff --git a/helix-core/src/test/java/org/apache/helix/task/TestTaskUtil.java
b/helix-core/src/test/java/org/apache/helix/task/TestTaskUtil.java
new file mode 100644
index 0000000..a6bfb69
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/task/TestTaskUtil.java
@@ -0,0 +1,118 @@
+package org.apache.helix.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixException;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.integration.task.TaskTestBase;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.InstanceConfig;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class TestTaskUtil extends TaskTestBase {
+ // This value has to be different from the default value to verify
correctness
+ private static final int TEST_TARGET_TASK_THREAD_POOL_SIZE =
+ TaskConstants.DEFAULT_TASK_THREAD_POOL_SIZE + 1;
+
+ @Test
+ public void testGetTaskThreadPoolSize() {
+ MockParticipantManager anyParticipantManager = _participants[0];
+
+ InstanceConfig instanceConfig =
+
InstanceConfig.toInstanceConfig(anyParticipantManager.getInstanceName());
+
instanceConfig.setTargetTaskThreadPoolSize(TEST_TARGET_TASK_THREAD_POOL_SIZE);
+ anyParticipantManager.getConfigAccessor()
+ .setInstanceConfig(anyParticipantManager.getClusterName(),
+ anyParticipantManager.getInstanceName(), instanceConfig);
+
+ ClusterConfig clusterConfig = new
ClusterConfig(anyParticipantManager.getClusterName());
+
clusterConfig.setGlobalTargetTaskThreadPoolSize(TEST_TARGET_TASK_THREAD_POOL_SIZE
+ 1);
+ anyParticipantManager.getConfigAccessor()
+ .setClusterConfig(anyParticipantManager.getClusterName(),
clusterConfig);
+
+
Assert.assertEquals(TaskUtil.getTargetThreadPoolSize(anyParticipantManager.getZkClient(),
+ anyParticipantManager.getClusterName(),
anyParticipantManager.getInstanceName()),
+ TEST_TARGET_TASK_THREAD_POOL_SIZE);
+ }
+
+ @Test(dependsOnMethods = "testGetTaskThreadPoolSize")
+ public void testGetTaskThreadPoolSizeInstanceConfigUndefined() {
+ MockParticipantManager anyParticipantManager = _participants[0];
+
+ InstanceConfig instanceConfig =
+
InstanceConfig.toInstanceConfig(anyParticipantManager.getInstanceName());
+ anyParticipantManager.getConfigAccessor()
+ .setInstanceConfig(anyParticipantManager.getClusterName(),
+ anyParticipantManager.getInstanceName(), instanceConfig);
+
+ ClusterConfig clusterConfig = new
ClusterConfig(anyParticipantManager.getClusterName());
+
clusterConfig.setGlobalTargetTaskThreadPoolSize(TEST_TARGET_TASK_THREAD_POOL_SIZE);
+ anyParticipantManager.getConfigAccessor()
+ .setClusterConfig(anyParticipantManager.getClusterName(),
clusterConfig);
+
+
Assert.assertEquals(TaskUtil.getTargetThreadPoolSize(anyParticipantManager.getZkClient(),
+ anyParticipantManager.getClusterName(),
anyParticipantManager.getInstanceName()),
+ TEST_TARGET_TASK_THREAD_POOL_SIZE);
+ }
+
+ @Test(dependsOnMethods = "testGetTaskThreadPoolSizeInstanceConfigUndefined")
+ public void testGetTaskThreadPoolSizeInstanceConfigDoesNotExist() {
+ MockParticipantManager anyParticipantManager = _participants[0];
+
+ HelixDataAccessor helixDataAccessor =
anyParticipantManager.getHelixDataAccessor();
+ helixDataAccessor.removeProperty(
+
helixDataAccessor.keyBuilder().instanceConfig(anyParticipantManager.getInstanceName()));
+
+ ClusterConfig clusterConfig = new
ClusterConfig(anyParticipantManager.getClusterName());
+
clusterConfig.setGlobalTargetTaskThreadPoolSize(TEST_TARGET_TASK_THREAD_POOL_SIZE);
+ anyParticipantManager.getConfigAccessor()
+ .setClusterConfig(anyParticipantManager.getClusterName(),
clusterConfig);
+
+
Assert.assertEquals(TaskUtil.getTargetThreadPoolSize(anyParticipantManager.getZkClient(),
+ anyParticipantManager.getClusterName(),
anyParticipantManager.getInstanceName()),
+ TEST_TARGET_TASK_THREAD_POOL_SIZE);
+ }
+
+ @Test(dependsOnMethods =
"testGetTaskThreadPoolSizeInstanceConfigDoesNotExist")
+ public void testGetTaskThreadPoolSizeClusterConfigUndefined() {
+ MockParticipantManager anyParticipantManager = _participants[0];
+
+ ClusterConfig clusterConfig = new
ClusterConfig(anyParticipantManager.getClusterName());
+ anyParticipantManager.getConfigAccessor()
+ .setClusterConfig(anyParticipantManager.getClusterName(),
clusterConfig);
+
+
Assert.assertEquals(TaskUtil.getTargetThreadPoolSize(anyParticipantManager.getZkClient(),
+ anyParticipantManager.getClusterName(),
anyParticipantManager.getInstanceName()),
+ TaskConstants.DEFAULT_TASK_THREAD_POOL_SIZE);
+ }
+
+ @Test(dependsOnMethods = "testGetTaskThreadPoolSizeClusterConfigUndefined",
expectedExceptions = HelixException.class)
+ public void testGetTaskThreadPoolSizeClusterConfigDoesNotExist() {
+ MockParticipantManager anyParticipantManager = _participants[0];
+
+ HelixDataAccessor helixDataAccessor =
anyParticipantManager.getHelixDataAccessor();
+
helixDataAccessor.removeProperty(helixDataAccessor.keyBuilder().clusterConfig());
+ TaskUtil.getTargetThreadPoolSize(anyParticipantManager.getZkClient(),
+ anyParticipantManager.getClusterName(),
anyParticipantManager.getInstanceName());
+ }
+}
diff --git
a/helix-core/src/test/java/org/apache/helix/task/assigner/TestAssignableInstance.java
b/helix-core/src/test/java/org/apache/helix/task/assigner/TestAssignableInstance.java
index 0e8b1d0..1dac153 100644
---
a/helix-core/src/test/java/org/apache/helix/task/assigner/TestAssignableInstance.java
+++
b/helix-core/src/test/java/org/apache/helix/task/assigner/TestAssignableInstance.java
@@ -26,7 +26,7 @@ import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.LiveInstance;
import org.apache.helix.task.TaskConfig;
-import org.apache.helix.task.TaskStateModelFactory;
+import org.apache.helix.task.TaskConstants;
import org.testng.Assert;
import org.testng.annotations.Test;
import org.testng.collections.Maps;
@@ -66,7 +66,7 @@ public class TestAssignableInstance extends AssignerTestBase {
Assert.assertEquals(
(int)
ai.getTotalCapacity().get(LiveInstance.InstanceResourceType.TASK_EXEC_THREAD.name())
.get(AssignableInstance.DEFAULT_QUOTA_TYPE),
- TaskStateModelFactory.TASK_THREADPOOL_SIZE);
+ TaskConstants.DEFAULT_TASK_THREAD_POOL_SIZE);
Assert.assertEquals(ai.getCurrentAssignments().size(), 0);
}
@@ -106,7 +106,7 @@ public class TestAssignableInstance extends
AssignerTestBase {
testQuotaTypes.length);
Assert.assertEquals(
ai.getTotalCapacity().get(LiveInstance.InstanceResourceType.TASK_EXEC_THREAD.name()),
-
calculateExpectedQuotaPerType(TaskStateModelFactory.TASK_THREADPOOL_SIZE,
testQuotaTypes,
+
calculateExpectedQuotaPerType(TaskConstants.DEFAULT_TASK_THREAD_POOL_SIZE,
testQuotaTypes,
testQuotaRatio));
Assert.assertEquals(ai.getCurrentAssignments().size(), 0);
}
@@ -176,7 +176,7 @@ public class TestAssignableInstance extends
AssignerTestBase {
// When nothing is configured, we should use default quota type to assign
Map<String, TaskAssignResult> results = new HashMap<>();
- for (int i = 0; i < TaskStateModelFactory.TASK_THREADPOOL_SIZE; i++) {
+ for (int i = 0; i < TaskConstants.DEFAULT_TASK_THREAD_POOL_SIZE; i++) {
String taskId = Integer.toString(i);
TaskConfig task = new TaskConfig("", null, taskId, null);
TaskAssignResult result = ai.tryAssign(task,
AssignableInstance.DEFAULT_QUOTA_TYPE);