This is an automated email from the ASF dual-hosted git repository.

hulee pushed a commit to branch task_pool
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/task_pool by this push:
     new ba9488b  Allow Configurable Thread Pool Size in TaskStateModelFactory 
(#973)
ba9488b is described below

commit ba9488b81f6a05d16ee7dd0c4cac1fdb1709a209
Author: Neal Sun <[email protected]>
AuthorDate: Tue May 12 20:13:01 2020 -0700

    Allow Configurable Thread Pool Size in TaskStateModelFactory (#973)
    
    This PR changes TaskStateModelFactory logic such that it now obtains the 
thread pool size from InstanceConfig/ClusterConfig that may be specified by the 
users. This PR also added setters and getters for InstanceConfig/ClusterConfig 
for thread pool size fields. This PR also added thread pool size reporting 
during LiveInstance creation, with setters and getters for LiveInstance.
---
 .../helix/manager/zk/ParticipantManager.java       |   3 +
 .../apache/helix/manager/zk/ZKHelixManager.java    |   6 +-
 .../java/org/apache/helix/model/ClusterConfig.java |  41 ++++++-
 .../org/apache/helix/model/InstanceConfig.java     |  28 ++++-
 .../java/org/apache/helix/model/LiveInstance.java  |  23 +++-
 .../java/org/apache/helix/task/TaskConstants.java  |   7 ++
 .../apache/helix/task/TaskStateModelFactory.java   |  71 ++++++++++--
 .../main/java/org/apache/helix/task/TaskUtil.java  |  54 +++++++++
 .../helix/task/assigner/AssignableInstance.java    |   4 +-
 .../main/java/org/apache/helix/util/HelixUtil.java |  10 ++
 .../multizk/TestMultiZkHelixJavaApis.java          |   4 +
 .../helix/integration/task/TestTaskThreadLeak.java |   5 +-
 .../helix/manager/zk/TestParticipantManager.java   |  44 ++++++++
 .../helix/manager/zk/TestZkClusterManager.java     |  10 +-
 .../org/apache/helix/model/TestClusterConfig.java  |  24 ++++
 .../org/apache/helix/model/TestInstanceConfig.java |  24 ++++
 .../TestLiveInstance.java}                         |  59 ++++++----
 .../helix/task/TestTaskStateModelFactory.java      | 124 +++++++++++++++++++++
 .../java/org/apache/helix/task/TestTaskUtil.java   | 118 ++++++++++++++++++++
 .../task/assigner/TestAssignableInstance.java      |   8 +-
 20 files changed, 615 insertions(+), 52 deletions(-)

diff --git 
a/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java 
b/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java
index 8bb54ff..8d6a99e 100644
--- 
a/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java
+++ 
b/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java
@@ -52,6 +52,7 @@ import org.apache.helix.model.StateModelDefinition;
 import org.apache.helix.model.builder.HelixConfigScopeBuilder;
 import org.apache.helix.participant.StateMachineEngine;
 import 
org.apache.helix.participant.statemachine.ScheduledTaskStateModelFactory;
+import org.apache.helix.task.TaskUtil;
 import org.apache.helix.util.HelixUtil;
 import org.apache.helix.zookeeper.api.client.HelixZkClient;
 import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
@@ -254,6 +255,8 @@ public class ParticipantManager {
     liveInstance.setSessionId(_sessionId);
     liveInstance.setHelixVersion(_manager.getVersion());
     
liveInstance.setLiveInstance(ManagementFactory.getRuntimeMXBean().getName());
+    liveInstance.setCurrentTaskThreadPoolSize(
+        TaskUtil.getTargetThreadPoolSize(_zkclient, _clusterName, 
_instanceName));
 
     // LiveInstanceInfoProvider liveInstanceInfoProvider = 
_manager._liveInstanceInfoProvider;
     if (_liveInstanceInfoProvider != null) {
diff --git 
a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java 
b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
index e2429d7..9b1bfdd 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
@@ -1331,7 +1331,7 @@ public class ZKHelixManager implements HelixManager, 
IZkStateListener {
    * ZkConnections.
    */
   private RealmAwareZkClient createSingleRealmZkClient() {
-    final String shardingKey = buildShardingKey();
+    final String shardingKey = 
HelixUtil.clusterNameToShardingKey(_clusterName);
     PathBasedZkSerializer zkSerializer =
         ChainedPathZkSerializer.builder(new ZNRecordSerializer()).build();
 
@@ -1390,8 +1390,4 @@ public class ZKHelixManager implements HelixManager, 
IZkStateListener {
 
     return zkClientFactory.buildZkClient(helixZkConnectionConfig, 
helixZkClientConfig);
   }
-
-  private String buildShardingKey() {
-    return _clusterName.charAt(0) == '/' ? _clusterName : "/" + _clusterName;
-  }
 }
diff --git a/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java 
b/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
index 165919c..d149182 100644
--- a/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
@@ -109,7 +109,14 @@ public class ClusterConfig extends HelixProperty {
     // 
https://github.com/apache/helix/wiki/Weight-aware-Globally-Evenly-distributed-Rebalancer#rebalance-coordinator
     //
     // Default to be true.
-    GLOBAL_REBALANCE_ASYNC_MODE
+    GLOBAL_REBALANCE_ASYNC_MODE,
+
+    // The target size of task thread pools for each participant. If 
participants specify their
+    // individual pool sizes in their InstanceConfig's, this value will NOT be 
used; if participants
+    // don't specify their individual pool sizes, this value will be used for 
all participants; if
+    // none of participants or the cluster define pool sizes,
+    // TaskConstants.DEFAULT_TASK_THREAD_POOL_SIZE will be used to create pool 
sizes.
+    GLOBAL_TARGET_TASK_THREAD_POOL_SIZE
   }
 
   public enum GlobalRebalancePreferenceKey {
@@ -137,6 +144,7 @@ public class ClusterConfig extends HelixProperty {
   private final static int MAX_REBALANCE_PREFERENCE = 10;
   private final static int MIN_REBALANCE_PREFERENCE = 0;
   public final static boolean DEFAULT_GLOBAL_REBALANCE_ASYNC_MODE_ENABLED = 
true;
+  private static final int GLOBAL_TARGET_TASK_THREAD_POOL_SIZE_NOT_SET = -1;
 
   /**
    * Instantiate for a specific cluster
@@ -710,6 +718,37 @@ public class ClusterConfig extends HelixProperty {
   }
 
   /**
+   * Get the global target size of task thread pools. This values applies to 
all participants in
+   * the cluster; it's only used if participants don't specify their 
individual pool sizes in their
+   * InstanceConfig's. If none of participants or the cluster define pool 
sizes,
+   * TaskConstants.DEFAULT_TASK_THREAD_POOL_SIZE will be used to create pool 
sizes.
+   * @return the global target size of task thread pool
+   */
+  public int getGlobalTargetTaskThreadPoolSize() {
+    return _record
+        
.getIntField(ClusterConfig.ClusterConfigProperty.GLOBAL_TARGET_TASK_THREAD_POOL_SIZE.name(),
+            GLOBAL_TARGET_TASK_THREAD_POOL_SIZE_NOT_SET);
+  }
+
+  /**
+   * Set the global target size of task thread pools for this cluster. This 
values applies to all
+   * participants in the cluster; it's only used if participants don't specify 
their individual
+   * pool sizes in their InstanceConfig's. If none of participants or the 
cluster define pool sizes,
+   * TaskConstants.DEFAULT_TASK_THREAD_POOL_SIZE will be used to create pool 
sizes.
+   * @param globalTargetTaskThreadPoolSize - the new global target task thread 
pool size
+   * @throws IllegalArgumentException - when the provided new thread pool size 
is negative
+   */
+  public void setGlobalTargetTaskThreadPoolSize(int 
globalTargetTaskThreadPoolSize)
+      throws IllegalArgumentException {
+    if (globalTargetTaskThreadPoolSize < 0) {
+      throw new IllegalArgumentException("globalTargetTaskThreadPoolSize must 
be non-negative!");
+    }
+    _record
+        
.setIntField(ClusterConfig.ClusterConfigProperty.GLOBAL_TARGET_TASK_THREAD_POOL_SIZE.name(),
+            globalTargetTaskThreadPoolSize);
+  }
+
+  /**
    * @return The required Instance Capacity Keys. If not configured, return an 
empty list.
    */
   public List<String> getInstanceCapacityKeys() {
diff --git 
a/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java 
b/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java
index 24e6154..010d943 100644
--- a/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java
@@ -56,11 +56,13 @@ public class InstanceConfig extends HelixProperty {
     DOMAIN,
     DELAY_REBALANCE_ENABLED,
     MAX_CONCURRENT_TASK,
-    INSTANCE_CAPACITY_MAP
+    INSTANCE_CAPACITY_MAP,
+    TARGET_TASK_THREAD_POOL_SIZE
   }
 
   public static final int WEIGHT_NOT_SET = -1;
   public static final int MAX_CONCURRENT_TASK_NOT_SET = -1;
+  private static final int TARGET_TASK_THREAD_POOL_SIZE_NOT_SET = -1;
 
   private static final Logger _logger = 
LoggerFactory.getLogger(InstanceConfig.class.getName());
 
@@ -507,6 +509,30 @@ public class InstanceConfig extends HelixProperty {
   }
 
   /**
+   * Get the target size of task thread pool.
+   * @return the target size of task thread pool
+   */
+  public int getTargetTaskThreadPoolSize() {
+    return _record
+        
.getIntField(InstanceConfig.InstanceConfigProperty.TARGET_TASK_THREAD_POOL_SIZE.name(),
+            TARGET_TASK_THREAD_POOL_SIZE_NOT_SET);
+  }
+
+  /**
+   * Set the target size of task thread pool.
+   * @param targetTaskThreadPoolSize - the new target task thread pool size
+   * @throws IllegalArgumentException - when the provided new thread pool size 
is negative
+   */
+  public void setTargetTaskThreadPoolSize(int targetTaskThreadPoolSize)
+      throws IllegalArgumentException {
+    if (targetTaskThreadPoolSize < 0) {
+      throw new IllegalArgumentException("targetTaskThreadPoolSize must be 
non-negative!");
+    }
+    
_record.setIntField(InstanceConfig.InstanceConfigProperty.TARGET_TASK_THREAD_POOL_SIZE.name(),
+        targetTaskThreadPoolSize);
+  }
+
+  /**
    * Get the instance capacity information from the map fields.
    * @return data map if it exists, or empty map
    */
diff --git a/helix-core/src/main/java/org/apache/helix/model/LiveInstance.java 
b/helix-core/src/main/java/org/apache/helix/model/LiveInstance.java
index 74260cc..d9945c7 100644
--- a/helix-core/src/main/java/org/apache/helix/model/LiveInstance.java
+++ b/helix-core/src/main/java/org/apache/helix/model/LiveInstance.java
@@ -22,6 +22,7 @@ package org.apache.helix.model;
 import java.util.Map;
 
 import org.apache.helix.HelixProperty;
+import org.apache.helix.task.TaskConstants;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -38,7 +39,8 @@ public class LiveInstance extends HelixProperty {
     HELIX_VERSION,
     LIVE_INSTANCE,
     ZKPROPERTYTRANSFERURL,
-    RESOURCE_CAPACITY
+    RESOURCE_CAPACITY,
+    CURRENT_TASK_THREAD_POOL_SIZE
   }
 
   /**
@@ -190,6 +192,25 @@ public class LiveInstance extends HelixProperty {
     
_record.setSimpleField(LiveInstanceProperty.ZKPROPERTYTRANSFERURL.toString(), 
url);
   }
 
+  /**
+   * Get the current task thread pool size of the instance. For backward 
compatibility, return
+   * DEFAULT_TASK_THREAD_POOL_SIZE if it's not defined
+   * @return the current task thread pool size
+   */
+  public int getCurrentTaskThreadPoolSize() {
+    return 
_record.getIntField(LiveInstanceProperty.CURRENT_TASK_THREAD_POOL_SIZE.name(),
+        TaskConstants.DEFAULT_TASK_THREAD_POOL_SIZE);
+  }
+
+  /**
+   * Set the current task thread pool size of the instance
+   * @param currentTaskThreadPoolSize the current task thread pool size
+   */
+  public void setCurrentTaskThreadPoolSize(int currentTaskThreadPoolSize) {
+    
_record.setIntField(LiveInstanceProperty.CURRENT_TASK_THREAD_POOL_SIZE.name(),
+        currentTaskThreadPoolSize);
+  }
+
   @Override
   public boolean isValid() {
     if (getEphemeralOwner() == null) {
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskConstants.java 
b/helix-core/src/main/java/org/apache/helix/task/TaskConstants.java
index d17e29a..d9745b6 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskConstants.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskConstants.java
@@ -49,4 +49,11 @@ public class TaskConstants {
   public static final String PREV_RA_NODE = "PreviousResourceAssignment";
 
   public static final boolean DEFAULT_TASK_ENABLE_COMPRESSION = false;
+
+  /**
+   * The default task thread pool size that will be used to create thread 
pools if target thread
+   * pool sizes are not defined in InstanceConfig or ClusterConfig; also used 
as the current thread
+   * pool size default value if the current thread pool size is not defined in 
LiveInstance
+   */
+  public final static int DEFAULT_TASK_THREAD_POOL_SIZE = 40;
 }
diff --git 
a/helix-core/src/main/java/org/apache/helix/task/TaskStateModelFactory.java 
b/helix-core/src/main/java/org/apache/helix/task/TaskStateModelFactory.java
index f30dd9f..90873d5 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskStateModelFactory.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskStateModelFactory.java
@@ -19,6 +19,7 @@ package org.apache.helix.task;
  * under the License.
  */
 
+import java.io.IOException;
 import java.util.Map;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
@@ -27,12 +28,22 @@ import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.atomic.AtomicInteger;
 import javax.management.JMException;
 
+import org.apache.helix.HelixException;
 import org.apache.helix.HelixManager;
+import org.apache.helix.SystemPropertyKeys;
 import org.apache.helix.monitoring.mbeans.ThreadPoolExecutorMonitor;
+import org.apache.helix.msdcommon.exception.InvalidRoutingDataException;
 import org.apache.helix.participant.statemachine.StateModelFactory;
+import org.apache.helix.util.HelixUtil;
+import org.apache.helix.zookeeper.api.client.HelixZkClient;
+import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
+import org.apache.helix.zookeeper.datamodel.serializer.ZNRecordSerializer;
+import org.apache.helix.zookeeper.impl.client.FederatedZkClient;
+import org.apache.helix.zookeeper.impl.factory.SharedZkClientFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+
 /**
  * Factory class for {@link TaskStateModel}.
  */
@@ -44,25 +55,31 @@ public class TaskStateModelFactory extends 
StateModelFactory<TaskStateModel> {
   private final ScheduledExecutorService _taskExecutor;
   private final ScheduledExecutorService _timerTaskExecutor;
   private ThreadPoolExecutorMonitor _monitor;
-  public final static int TASK_THREADPOOL_SIZE = 40;
 
   public TaskStateModelFactory(HelixManager manager, Map<String, TaskFactory> 
taskFactoryRegistry) {
-    this(manager, taskFactoryRegistry,
-        Executors.newScheduledThreadPool(TASK_THREADPOOL_SIZE, new 
ThreadFactory() {
-          private AtomicInteger threadId = new AtomicInteger(0);
-
-          @Override
-          public Thread newThread(Runnable r) {
-            return new Thread(r, "TaskStateModelFactory-task_thread-" + 
threadId.getAndIncrement());
-          }
-        }));
+    this(manager, taskFactoryRegistry, 
Executors.newScheduledThreadPool(TaskUtil
+        .getTargetThreadPoolSize(createZkClient(manager), 
manager.getClusterName(),
+            manager.getInstanceName()), new ThreadFactory() {
+      private AtomicInteger threadId = new AtomicInteger(0);
+
+      @Override
+      public Thread newThread(Runnable r) {
+        return new Thread(r, "TaskStateModelFactory-task_thread-" + 
threadId.getAndIncrement());
+      }
+    }));
   }
 
+  // DO NOT USE! This size of provided thread pool will not be reflected to 
controller
+  // properly, the controller may over schedule tasks to this participant. 
Task Framework needs to
+  // have full control of the thread pool unlike the state transition thread 
pool.
+  @Deprecated
   public TaskStateModelFactory(HelixManager manager, Map<String, TaskFactory> 
taskFactoryRegistry,
       ScheduledExecutorService taskExecutor) {
     _manager = manager;
     _taskFactoryRegistry = taskFactoryRegistry;
     _taskExecutor = taskExecutor;
+    // TODO: Hunter: I'm not sure why this needs to be a single thread 
executor. We could certainly
+    // use more threads for timer tasks.
     _timerTaskExecutor = Executors.newSingleThreadScheduledExecutor(new 
ThreadFactory() {
       @Override
       public Thread newThread(Runnable r) {
@@ -74,7 +91,7 @@ public class TaskStateModelFactory extends 
StateModelFactory<TaskStateModel> {
         _monitor = new 
ThreadPoolExecutorMonitor(TaskConstants.STATE_MODEL_NAME,
             (ThreadPoolExecutor) _taskExecutor);
       } catch (JMException e) {
-        LOG.warn("Error in creating ThreadPoolExecutorMonitor for 
TaskStateModelFactory.");
+        LOG.warn("Error in creating ThreadPoolExecutorMonitor for 
TaskStateModelFactory.", e);
       }
     }
   }
@@ -102,4 +119,36 @@ public class TaskStateModelFactory extends 
StateModelFactory<TaskStateModel> {
   public boolean isTerminated() {
     return _taskExecutor.isTerminated();
   }
+
+  /*
+   * Create a RealmAwareZkClient to get thread pool sizes
+   */
+  protected static RealmAwareZkClient createZkClient(HelixManager manager) {
+    // TODO: revisit the logic here - we are creating a connection although we 
already have a
+    // manager. We cannot use the connection within manager because some users 
connect the manager
+    // after registering the state model factory (in which case we cannot use 
manager's connection),
+    // and some connect the manager before registering the state model factory 
(in which case we
+    //can use manager's connection). We need to think about the right order 
and determine if we
+    //want to enforce it, which may cause backward incompatibility.
+    RealmAwareZkClient.RealmAwareZkClientConfig clientConfig =
+        new RealmAwareZkClient.RealmAwareZkClientConfig().setZkSerializer(new 
ZNRecordSerializer());
+
+    if (Boolean.getBoolean(SystemPropertyKeys.MULTI_ZK_ENABLED)) {
+      String clusterName = manager.getClusterName();
+      String shardingKey = HelixUtil.clusterNameToShardingKey(clusterName);
+      RealmAwareZkClient.RealmAwareZkConnectionConfig connectionConfig =
+          new RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder()
+              .setRealmMode(RealmAwareZkClient.RealmMode.SINGLE_REALM)
+              .setZkRealmShardingKey(shardingKey).build();
+      try {
+        return new FederatedZkClient(connectionConfig, clientConfig);
+      } catch (IOException | InvalidRoutingDataException | 
IllegalStateException e) {
+        throw new HelixException("Failed to create FederatedZkClient!", e);
+      }
+    }
+
+    return SharedZkClientFactory.getInstance().buildZkClient(
+        new 
HelixZkClient.ZkConnectionConfig(manager.getMetadataStoreConnectionString()),
+        clientConfig.createHelixZkClientConfig().setZkSerializer(new 
ZNRecordSerializer()));
+  }
 }
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java 
b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
index 917a69b..c67c14c 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
@@ -30,11 +30,17 @@ import java.util.Set;
 import com.google.common.base.Joiner;
 import com.google.common.collect.Sets;
 import org.apache.helix.AccessOption;
+import org.apache.helix.ConfigAccessor;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixException;
 import org.apache.helix.HelixManager;
 import org.apache.helix.HelixProperty;
+import org.apache.helix.InstanceType;
 import org.apache.helix.PropertyKey;
+import org.apache.helix.manager.zk.ZKUtil;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import 
org.apache.helix.controller.dataproviders.WorkflowControllerDataProvider;
 import org.apache.helix.controller.rebalancer.util.RebalanceScheduler;
@@ -1045,4 +1051,52 @@ public class TaskUtil {
       rebalanceScheduler.scheduleRebalance(manager, workflow, nextPurgeTime);
     }
   }
+
+  /**
+   * Get target thread pool size from InstanceConfig first; if InstanceConfig 
doesn't exist or the
+   * value is undefined, try ClusterConfig; if the value is undefined in 
ClusterConfig, fall back
+   * to the default value.
+   * @param zkClient - ZooKeeper connection for config reading
+   * @param clusterName - the cluster name for InstanceConfig and ClusterConfig
+   * @param instanceName - the instance name for InstanceConfig
+   * @return target thread pool size
+   */
+  public static int getTargetThreadPoolSize(RealmAwareZkClient zkClient, 
String clusterName,
+      String instanceName) {
+    ConfigAccessor configAccessor = new ConfigAccessor(zkClient);
+
+    // Check instance config first for thread pool size
+    if (ZKUtil.isInstanceSetup(zkClient, clusterName, instanceName, 
InstanceType.PARTICIPANT)) {
+      InstanceConfig instanceConfig = 
configAccessor.getInstanceConfig(clusterName, instanceName);
+      if (instanceConfig != null) {
+        int targetTaskThreadPoolSize = 
instanceConfig.getTargetTaskThreadPoolSize();
+        // Reject negative values. The pool size is only negative when it's 
not set in
+        // InstanceConfig, or when the users bypassed the setter logic in 
InstanceConfig. We treat
+        // negative values as the value is not set, and continue with 
ClusterConfig.
+        if (targetTaskThreadPoolSize >= 0) {
+          return targetTaskThreadPoolSize;
+        }
+      } else {
+        LOG.warn(
+            "Got null as InstanceConfig for instance {} in cluster {}. 
Continuing with ClusterConfig. ",
+            instanceName, clusterName);
+      }
+    }
+
+    ClusterConfig clusterConfig = configAccessor.getClusterConfig(clusterName);
+    if (clusterConfig != null) {
+      int globalTargetTaskThreadPoolSize = 
clusterConfig.getGlobalTargetTaskThreadPoolSize();
+      // Reject negative values. The pool size is only negative when it's not 
set in
+      // ClusterConfig, or when the users bypassed the setter logic in 
ClusterConfig. We treat
+      // negative values as the value is not set, and continue with the 
default value.
+      if (globalTargetTaskThreadPoolSize >= 0) {
+        return globalTargetTaskThreadPoolSize;
+      }
+    } else {
+      LOG.warn("Got null as ClusterConfig for cluster {}. Returning default 
value: {}. ",
+          clusterName, TaskConstants.DEFAULT_TASK_THREAD_POOL_SIZE);
+    }
+
+    return TaskConstants.DEFAULT_TASK_THREAD_POOL_SIZE;
+  }
 }
diff --git 
a/helix-core/src/main/java/org/apache/helix/task/assigner/AssignableInstance.java
 
b/helix-core/src/main/java/org/apache/helix/task/assigner/AssignableInstance.java
index c1f2ef6..194db41 100644
--- 
a/helix-core/src/main/java/org/apache/helix/task/assigner/AssignableInstance.java
+++ 
b/helix-core/src/main/java/org/apache/helix/task/assigner/AssignableInstance.java
@@ -28,7 +28,7 @@ import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.model.LiveInstance;
 import org.apache.helix.task.TaskConfig;
-import org.apache.helix.task.TaskStateModelFactory;
+import org.apache.helix.task.TaskConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -113,7 +113,7 @@ public class AssignableInstance {
     if (resourceCapacity == null) {
       resourceCapacity = new HashMap<>();
       
resourceCapacity.put(LiveInstance.InstanceResourceType.TASK_EXEC_THREAD.name(),
-          Integer.toString(TaskStateModelFactory.TASK_THREADPOOL_SIZE));
+          Integer.toString(TaskConstants.DEFAULT_TASK_THREAD_POOL_SIZE));
       logger.debug("No resource capacity provided in LiveInstance {}, assuming 
default capacity: {}",
           _instanceConfig.getInstanceName(), resourceCapacity);
     }
diff --git a/helix-core/src/main/java/org/apache/helix/util/HelixUtil.java 
b/helix-core/src/main/java/org/apache/helix/util/HelixUtil.java
index 3480fb6..bbf6446 100644
--- a/helix-core/src/main/java/org/apache/helix/util/HelixUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/util/HelixUtil.java
@@ -87,6 +87,16 @@ public final class HelixUtil {
     return path.substring(path.lastIndexOf('/') + 1);
   }
 
+  /**
+   * Convert a cluster name to a sharding key for routing purpose by adding a 
"/" to the front.
+   * Check if the cluster name already has a "/" at the front; if so just 
return it.
+   * @param clusterName - cluster name
+   * @return the sharding key corresponding the cluster name
+   */
+  public static String clusterNameToShardingKey(String clusterName) {
+    return clusterName.charAt(0) == '/' ? clusterName : "/" + clusterName;
+  }
+
   public static String serializeByComma(List<String> objects) {
     return Joiner.on(",").join(objects);
   }
diff --git 
a/helix-core/src/test/java/org/apache/helix/integration/multizk/TestMultiZkHelixJavaApis.java
 
b/helix-core/src/test/java/org/apache/helix/integration/multizk/TestMultiZkHelixJavaApis.java
index d4bdcf9..742bb76 100644
--- 
a/helix-core/src/test/java/org/apache/helix/integration/multizk/TestMultiZkHelixJavaApis.java
+++ 
b/helix-core/src/test/java/org/apache/helix/integration/multizk/TestMultiZkHelixJavaApis.java
@@ -73,6 +73,7 @@ import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.apache.helix.zookeeper.datamodel.serializer.ZNRecordSerializer;
 import org.apache.helix.zookeeper.impl.client.FederatedZkClient;
 import org.apache.helix.zookeeper.impl.factory.DedicatedZkClientFactory;
+import org.apache.helix.zookeeper.util.HttpRoutingDataReader;
 import org.apache.helix.zookeeper.zkclient.ZkServer;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
@@ -149,6 +150,9 @@ public class TestMultiZkHelixJavaApis {
     System.setProperty(MetadataStoreRoutingConstants.MSDS_SERVER_ENDPOINT_KEY,
         "http://"; + msdsHostName + ":" + msdsPort + "/admin/v2/namespaces/" + 
msdsNamespace);
 
+    // HttpRoutingDataReader's routing data may be set by other tests using 
the same endpoint;
+    // reset() for good measure
+    HttpRoutingDataReader.reset();
     // Create a FederatedZkClient for admin work
     _zkClient =
         new FederatedZkClient(new 
RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder().build(),
diff --git 
a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskThreadLeak.java
 
b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskThreadLeak.java
index 4e3e289..2f6b034 100644
--- 
a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskThreadLeak.java
+++ 
b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskThreadLeak.java
@@ -25,8 +25,8 @@ import org.apache.helix.TestHelper;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.task.JobConfig;
 import org.apache.helix.task.JobQueue;
+import org.apache.helix.task.TaskConstants;
 import org.apache.helix.task.TaskState;
-import org.apache.helix.task.TaskStateModelFactory;
 import org.apache.helix.task.TaskUtil;
 import org.apache.helix.task.WorkflowConfig;
 import org.testng.Assert;
@@ -73,7 +73,8 @@ public class TestTaskThreadLeak extends TaskTestBase {
     int threadCountAfter = getThreadCount("TaskStateModelFactory");
 
     Assert.assertTrue(
-        (threadCountAfter - _threadCountBefore) <= 
TaskStateModelFactory.TASK_THREADPOOL_SIZE + 1);
+        (threadCountAfter - _threadCountBefore) <= 
TaskConstants.DEFAULT_TASK_THREAD_POOL_SIZE
+            + 1);
   }
 
 
diff --git 
a/helix-core/src/test/java/org/apache/helix/manager/zk/TestParticipantManager.java
 
b/helix-core/src/test/java/org/apache/helix/manager/zk/TestParticipantManager.java
index a03f4f2..1ebe747 100644
--- 
a/helix-core/src/test/java/org/apache/helix/manager/zk/TestParticipantManager.java
+++ 
b/helix-core/src/test/java/org/apache/helix/manager/zk/TestParticipantManager.java
@@ -28,7 +28,10 @@ import org.apache.helix.TestHelper;
 import org.apache.helix.ZkTestHelper;
 import org.apache.helix.common.ZkTestBase;
 import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.model.LiveInstance;
+import org.apache.helix.task.TaskConstants;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.Assert;
@@ -145,6 +148,47 @@ public class TestParticipantManager extends ZkTestBase {
     deleteCluster(clusterName);
   }
 
+  @Test(dependsOnMethods = "testSessionExpiryCreateLiveInstance")
+  public void testCurrentTaskThreadPoolSizeCreation() throws Exception {
+    // Using a pool sized different from the default value to verify 
correctness
+    final int testThreadPoolSize = TaskConstants.DEFAULT_TASK_THREAD_POOL_SIZE 
+ 1;
+
+    final String className = TestHelper.getTestClassName();
+    final String methodName = TestHelper.getTestMethodName();
+    final String clusterName = className + "_" + methodName;
+
+    final ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName,
+        new 
ZkBaseDataAccessor.Builder<ZNRecord>().setZkAddress(ZK_ADDR).build());
+    final PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+
+    TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+        "localhost", // participant name prefix
+        "TestDB", // resource name prefix
+        1, // resources
+        10, // partitions per resource
+        5, // number of nodes
+        3, // replicas
+        "MasterSlave", true); // do rebalance
+
+    final String instanceName = "localhost_12918";
+    final MockParticipantManager manager =
+        new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
+
+    InstanceConfig instanceConfig = 
accessor.getProperty(keyBuilder.instanceConfig(instanceName));
+    instanceConfig.setTargetTaskThreadPoolSize(testThreadPoolSize);
+    accessor.setProperty(keyBuilder.instanceConfig(instanceName), 
instanceConfig);
+
+    manager.syncStart();
+
+    final LiveInstance liveInstance = 
accessor.getProperty(keyBuilder.liveInstance(instanceName));
+    Assert.assertNotNull(liveInstance);
+    Assert.assertEquals(liveInstance.getCurrentTaskThreadPoolSize(), 
testThreadPoolSize);
+
+    // Clean up.
+    manager.syncStop();
+    deleteCluster(clusterName);
+  }
+
   /*
    * Mocks PreConnectCallback to insert session expiry during 
ParticipantManager#handleNewSession()
    */
diff --git 
a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkClusterManager.java
 
b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkClusterManager.java
index 9c5b64b..cfbc42d 100644
--- 
a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkClusterManager.java
+++ 
b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkClusterManager.java
@@ -167,7 +167,7 @@ public class TestZkClusterManager extends ZkUnitTestBase {
         
accessor.getProperty(accessor.keyBuilder().liveInstance("localhost_0"));
     Assert.assertTrue(liveInstance.getRecord().getListFields().size() == 0);
     Assert.assertTrue(liveInstance.getRecord().getMapFields().size() == 0);
-    Assert.assertTrue(liveInstance.getRecord().getSimpleFields().size() == 3);
+    Assert.assertTrue(liveInstance.getRecord().getSimpleFields().size() == 4);
 
     manager.disconnect();
 
@@ -180,7 +180,7 @@ public class TestZkClusterManager extends ZkUnitTestBase {
     liveInstance = 
accessor.getProperty(accessor.keyBuilder().liveInstance("localhost_1"));
     Assert.assertTrue(liveInstance.getRecord().getListFields().size() == 1);
     Assert.assertTrue(liveInstance.getRecord().getMapFields().size() == 1);
-    Assert.assertTrue(liveInstance.getRecord().getSimpleFields().size() == 4);
+    Assert.assertTrue(liveInstance.getRecord().getSimpleFields().size() == 5);
 
     manager.disconnect();
 
@@ -193,7 +193,7 @@ public class TestZkClusterManager extends ZkUnitTestBase {
     liveInstance = 
accessor.getProperty(accessor.keyBuilder().liveInstance("localhost_2"));
     Assert.assertTrue(liveInstance.getRecord().getListFields().size() == 1);
     Assert.assertTrue(liveInstance.getRecord().getMapFields().size() == 1);
-    Assert.assertTrue(liveInstance.getRecord().getSimpleFields().size() == 5);
+    Assert.assertTrue(liveInstance.getRecord().getSimpleFields().size() == 6);
     Assert.assertFalse(liveInstance.getEphemeralOwner().equals("value"));
     Assert.assertFalse(liveInstance.getLiveInstance().equals("value"));
 
@@ -208,7 +208,7 @@ public class TestZkClusterManager extends ZkUnitTestBase {
     liveInstance = 
accessor.getProperty(accessor.keyBuilder().liveInstance("localhost_3"));
     Assert.assertTrue(liveInstance.getRecord().getListFields().size() == 1);
     Assert.assertTrue(liveInstance.getRecord().getMapFields().size() == 1);
-    Assert.assertTrue(liveInstance.getRecord().getSimpleFields().size() == 5);
+    Assert.assertTrue(liveInstance.getRecord().getSimpleFields().size() == 6);
     Assert.assertFalse(liveInstance.getEphemeralOwner().equals("value"));
     Assert.assertFalse(liveInstance.getLiveInstance().equals("value"));
     String sessionId = liveInstance.getEphemeralOwner();
@@ -219,7 +219,7 @@ public class TestZkClusterManager extends ZkUnitTestBase {
     liveInstance = 
accessor.getProperty(accessor.keyBuilder().liveInstance("localhost_3"));
     Assert.assertTrue(liveInstance.getRecord().getListFields().size() == 1);
     Assert.assertTrue(liveInstance.getRecord().getMapFields().size() == 1);
-    Assert.assertTrue(liveInstance.getRecord().getSimpleFields().size() == 5);
+    Assert.assertTrue(liveInstance.getRecord().getSimpleFields().size() == 6);
     Assert.assertFalse(liveInstance.getEphemeralOwner().equals("value"));
     Assert.assertFalse(liveInstance.getLiveInstance().equals("value"));
     Assert.assertFalse(sessionId.equals(liveInstance.getEphemeralOwner()));
diff --git 
a/helix-core/src/test/java/org/apache/helix/model/TestClusterConfig.java 
b/helix-core/src/test/java/org/apache/helix/model/TestClusterConfig.java
index 8e4a016..ac2763b 100644
--- a/helix-core/src/test/java/org/apache/helix/model/TestClusterConfig.java
+++ b/helix-core/src/test/java/org/apache/helix/model/TestClusterConfig.java
@@ -70,6 +70,30 @@ public class TestClusterConfig {
   }
 
   @Test
+  public void testGetGlobalTargetTaskThreadPoolSize() {
+    ClusterConfig testConfig = new ClusterConfig("testId");
+    testConfig.getRecord().setIntField(
+        
ClusterConfig.ClusterConfigProperty.GLOBAL_TARGET_TASK_THREAD_POOL_SIZE.name(), 
100);
+
+    Assert.assertEquals(testConfig.getGlobalTargetTaskThreadPoolSize(), 100);
+  }
+
+  @Test
+  public void testSetGlobalTargetTaskThreadPoolSize() {
+    ClusterConfig testConfig = new ClusterConfig("testId");
+    testConfig.setGlobalTargetTaskThreadPoolSize(100);
+
+    Assert.assertEquals(testConfig.getRecord().getIntField(
+        
ClusterConfig.ClusterConfigProperty.GLOBAL_TARGET_TASK_THREAD_POOL_SIZE.name(), 
-1), 100);
+  }
+
+  @Test(expectedExceptions = IllegalArgumentException.class)
+  public void testSetGlobalTargetTaskThreadPoolSizeIllegalArgument() {
+    ClusterConfig testConfig = new ClusterConfig("testId");
+    testConfig.setGlobalTargetTaskThreadPoolSize(-1);
+  }
+
+  @Test
   public void testGetRebalancePreference() {
     Map<ClusterConfig.GlobalRebalancePreferenceKey, Integer> preference = new 
HashMap<>();
     preference.put(EVENNESS, 5);
diff --git 
a/helix-core/src/test/java/org/apache/helix/model/TestInstanceConfig.java 
b/helix-core/src/test/java/org/apache/helix/model/TestInstanceConfig.java
index 9b47677..d0b2f58 100644
--- a/helix-core/src/test/java/org/apache/helix/model/TestInstanceConfig.java
+++ b/helix-core/src/test/java/org/apache/helix/model/TestInstanceConfig.java
@@ -123,4 +123,28 @@ public class TestInstanceConfig {
     InstanceConfig testConfig = new InstanceConfig("testConfig");
     testConfig.setInstanceCapacityMap(capacityDataMap);
   }
+
+  @Test
+  public void testGetTargetTaskThreadPoolSize() {
+    InstanceConfig testConfig = new InstanceConfig("testConfig");
+    testConfig.getRecord().setIntField(
+        
InstanceConfig.InstanceConfigProperty.TARGET_TASK_THREAD_POOL_SIZE.name(), 100);
+
+    Assert.assertEquals(testConfig.getTargetTaskThreadPoolSize(), 100);
+  }
+
+  @Test
+  public void testSetTargetTaskThreadPoolSize() {
+    InstanceConfig testConfig = new InstanceConfig("testConfig");
+    testConfig.setTargetTaskThreadPoolSize(100);
+
+    Assert.assertEquals(testConfig.getRecord().getIntField(
+        
InstanceConfig.InstanceConfigProperty.TARGET_TASK_THREAD_POOL_SIZE.name(), -1), 
100);
+  }
+
+  @Test(expectedExceptions = IllegalArgumentException.class)
+  public void testSetTargetTaskThreadPoolSizeIllegalArgument() {
+    InstanceConfig testConfig = new InstanceConfig("testConfig");
+    testConfig.setTargetTaskThreadPoolSize(-1);
+  }
 }
diff --git 
a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZKLiveInstanceData.java
 b/helix-core/src/test/java/org/apache/helix/model/TestLiveInstance.java
similarity index 81%
rename from 
helix-core/src/test/java/org/apache/helix/manager/zk/TestZKLiveInstanceData.java
rename to helix-core/src/test/java/org/apache/helix/model/TestLiveInstance.java
index c3aace7..f53f9ec 100644
--- 
a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZKLiveInstanceData.java
+++ b/helix-core/src/test/java/org/apache/helix/model/TestLiveInstance.java
@@ -1,4 +1,4 @@
-package org.apache.helix.manager.zk;
+package org.apache.helix.model;
 
 /*
  * Licensed to the Apache Software Foundation (ASF) under one
@@ -35,15 +35,27 @@ import org.apache.helix.LiveInstanceChangeListener;
 import org.apache.helix.NotificationContext;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.ZkUnitTestBase;
-import org.apache.helix.model.LiveInstance;
+import org.apache.helix.task.TaskConstants;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
-public class TestZKLiveInstanceData extends ZkUnitTestBase {
+public class TestLiveInstance extends ZkUnitTestBase {
   private final String clusterName = CLUSTER_PREFIX + "_" + 
getShortClassName();
 
+  @BeforeClass()
+  public void beforeClass() throws Exception {
+    _gSetupTool.addCluster(clusterName, true);
+    _gSetupTool
+        .addInstancesToCluster(clusterName, new String[] { "localhost:54321", 
"localhost:54322" });
+  }
+
+  @AfterClass()
+  public void afterClass() throws Exception {
+    deleteCluster(clusterName);
+  }
+
   @Test
   public void testDataChange() throws Exception {
     // Create an admin and add LiveInstanceChange listener to it
@@ -101,23 +113,6 @@ public class TestZKLiveInstanceData extends ZkUnitTestBase 
{
     Assert.assertTrue(instances.isEmpty(), "Expecting an empty list of live 
instance");
 
     adminManager.disconnect();
-
-  }
-
-  @BeforeClass()
-  public void beforeClass() throws Exception {
-    _gSetupTool.addCluster(clusterName, true);
-    _gSetupTool
-        .addInstancesToCluster(clusterName, new String[] { "localhost:54321", 
"localhost:54322" });
-  }
-
-  @AfterClass()
-  public void afterClass() throws Exception {
-    deleteCluster(clusterName);
-  }
-
-  private String[] getArgs(String... args) {
-    return args;
   }
 
   private List<LiveInstance> deepCopy(List<LiveInstance> instances) {
@@ -127,4 +122,28 @@ public class TestZKLiveInstanceData extends ZkUnitTestBase 
{
     }
     return result;
   }
+
+  @Test(dependsOnMethods = "testDataChange")
+  public void testGetCurrentTaskThreadPoolSize() {
+    LiveInstance testLiveInstance = new LiveInstance("testId");
+    testLiveInstance.getRecord()
+        
.setIntField(LiveInstance.LiveInstanceProperty.CURRENT_TASK_THREAD_POOL_SIZE.name(),
 100);
+
+    Assert.assertEquals(testLiveInstance.getCurrentTaskThreadPoolSize(), 100);
+  }
+
+  @Test(dependsOnMethods = "testGetCurrentTaskThreadPoolSize")
+  public void testGetCurrentTaskThreadPoolSizeDefault() {
+    LiveInstance testLiveInstance = new LiveInstance("testId");
+
+    Assert.assertEquals(testLiveInstance.getCurrentTaskThreadPoolSize(), 
TaskConstants.DEFAULT_TASK_THREAD_POOL_SIZE);
+  }
+
+  @Test(dependsOnMethods = "testGetCurrentTaskThreadPoolSizeDefault")
+  public void testSetCurrentTaskThreadPoolSize() {
+    LiveInstance testLiveInstance = new LiveInstance("testId");
+    testLiveInstance.setCurrentTaskThreadPoolSize(100);
+
+    Assert.assertEquals(testLiveInstance.getCurrentTaskThreadPoolSize(), 100);
+  }
 }
diff --git 
a/helix-core/src/test/java/org/apache/helix/task/TestTaskStateModelFactory.java 
b/helix-core/src/test/java/org/apache/helix/task/TestTaskStateModelFactory.java
new file mode 100644
index 0000000..514256f
--- /dev/null
+++ 
b/helix-core/src/test/java/org/apache/helix/task/TestTaskStateModelFactory.java
@@ -0,0 +1,124 @@
+package org.apache.helix.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.helix.SystemPropertyKeys;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.integration.task.TaskTestBase;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.msdcommon.constant.MetadataStoreRoutingConstants;
+import org.apache.helix.msdcommon.mock.MockMetadataStoreDirectoryServer;
+import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
+import org.apache.helix.zookeeper.util.HttpRoutingDataReader;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class TestTaskStateModelFactory extends TaskTestBase {
+  // This value has to be different from the default value to verify 
correctness
+  private static final int TEST_TARGET_TASK_THREAD_POOL_SIZE =
+      TaskConstants.DEFAULT_TASK_THREAD_POOL_SIZE + 1;
+
+  @Test
+  public void testConfigAccessorCreationMultiZk() throws Exception {
+    MockParticipantManager anyParticipantManager = _participants[0];
+
+    InstanceConfig instanceConfig =
+        
InstanceConfig.toInstanceConfig(anyParticipantManager.getInstanceName());
+    
instanceConfig.setTargetTaskThreadPoolSize(TEST_TARGET_TASK_THREAD_POOL_SIZE);
+    anyParticipantManager.getConfigAccessor()
+        .setInstanceConfig(anyParticipantManager.getClusterName(),
+            anyParticipantManager.getInstanceName(), instanceConfig);
+
+    // Start a msds server
+    // TODO: Refactor all MSDS_SERVER_ENDPOINT creation in system property to 
one place.
+    // Any test that modifies MSDS_SERVER_ENDPOINT system property and accesses
+    // HttpRoutingDataReader (ex. TestMultiZkHelixJavaApis and this test) will 
cause the
+    // MSDS_SERVER_ENDPOINT system property to be recorded as final in 
HttpRoutingDataReader; that
+    // means any test class that satisfies the aforementioned condition and is 
executed first gets
+    // to "decide" the default msds endpoint. The only workaround is for all 
these test classes to
+    // use the same default msds endpoint.
+    final String msdsHostName = "localhost";
+    final int msdsPort = 11117;
+    final String msdsNamespace = "multiZkTest";
+    Map<String, Collection<String>> routingData = new HashMap<>();
+    routingData
+        .put(ZK_ADDR, Collections.singletonList("/" + 
anyParticipantManager.getClusterName()));
+    MockMetadataStoreDirectoryServer msds =
+        new MockMetadataStoreDirectoryServer(msdsHostName, msdsPort, 
msdsNamespace, routingData);
+    msds.startServer();
+
+    // Save previously-set system configs
+    String prevMultiZkEnabled = 
System.getProperty(SystemPropertyKeys.MULTI_ZK_ENABLED);
+    String prevMsdsServerEndpoint =
+        
System.getProperty(MetadataStoreRoutingConstants.MSDS_SERVER_ENDPOINT_KEY);
+    // Turn on multiZk mode in System config
+    System.setProperty(SystemPropertyKeys.MULTI_ZK_ENABLED, "true");
+    // MSDS endpoint: 
http://localhost:11117/admin/v2/namespaces/testTaskStateModelFactory
+    System.setProperty(MetadataStoreRoutingConstants.MSDS_SERVER_ENDPOINT_KEY,
+        "http://"; + msdsHostName + ":" + msdsPort + "/admin/v2/namespaces/" + 
msdsNamespace);
+
+    HttpRoutingDataReader.reset();
+    RealmAwareZkClient zkClient = 
TaskStateModelFactory.createZkClient(anyParticipantManager);
+    Assert.assertEquals(TaskUtil
+        .getTargetThreadPoolSize(zkClient, 
anyParticipantManager.getClusterName(),
+            anyParticipantManager.getInstanceName()), 
TEST_TARGET_TASK_THREAD_POOL_SIZE);
+
+    // Restore system properties
+    if (prevMultiZkEnabled == null) {
+      System.clearProperty(SystemPropertyKeys.MULTI_ZK_ENABLED);
+    } else {
+      System.setProperty(SystemPropertyKeys.MULTI_ZK_ENABLED, 
prevMultiZkEnabled);
+    }
+    if (prevMsdsServerEndpoint == null) {
+      System.clearProperty(SystemPropertyKeys.MSDS_SERVER_ENDPOINT_KEY);
+    } else {
+      System.setProperty(SystemPropertyKeys.MSDS_SERVER_ENDPOINT_KEY, 
prevMsdsServerEndpoint);
+    }
+    msds.stopServer();
+  }
+
+  @Test(dependsOnMethods = "testConfigAccessorCreationMultiZk")
+  public void testConfigAccessorCreationSingleZk() {
+    MockParticipantManager anyParticipantManager = _participants[0];
+
+    // Save previously-set system configs
+    String prevMultiZkEnabled = 
System.getProperty(SystemPropertyKeys.MULTI_ZK_ENABLED);
+    // Turn off multiZk mode in System config
+    System.setProperty(SystemPropertyKeys.MULTI_ZK_ENABLED, "false");
+
+    RealmAwareZkClient zkClient = 
TaskStateModelFactory.createZkClient(anyParticipantManager);
+    Assert.assertEquals(TaskUtil
+        .getTargetThreadPoolSize(zkClient, 
anyParticipantManager.getClusterName(),
+            anyParticipantManager.getInstanceName()), 
TEST_TARGET_TASK_THREAD_POOL_SIZE);
+
+    // Restore system properties
+    if (prevMultiZkEnabled == null) {
+      System.clearProperty(SystemPropertyKeys.MULTI_ZK_ENABLED);
+    } else {
+      System.setProperty(SystemPropertyKeys.MULTI_ZK_ENABLED, 
prevMultiZkEnabled);
+    }
+  }
+}
diff --git a/helix-core/src/test/java/org/apache/helix/task/TestTaskUtil.java 
b/helix-core/src/test/java/org/apache/helix/task/TestTaskUtil.java
new file mode 100644
index 0000000..a6bfb69
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/task/TestTaskUtil.java
@@ -0,0 +1,118 @@
+package org.apache.helix.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixException;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.integration.task.TaskTestBase;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.InstanceConfig;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class TestTaskUtil extends TaskTestBase {
+  // This value has to be different from the default value to verify 
correctness
+  private static final int TEST_TARGET_TASK_THREAD_POOL_SIZE =
+      TaskConstants.DEFAULT_TASK_THREAD_POOL_SIZE + 1;
+
+  @Test
+  public void testGetTaskThreadPoolSize() {
+    MockParticipantManager anyParticipantManager = _participants[0];
+
+    InstanceConfig instanceConfig =
+        
InstanceConfig.toInstanceConfig(anyParticipantManager.getInstanceName());
+    
instanceConfig.setTargetTaskThreadPoolSize(TEST_TARGET_TASK_THREAD_POOL_SIZE);
+    anyParticipantManager.getConfigAccessor()
+        .setInstanceConfig(anyParticipantManager.getClusterName(),
+            anyParticipantManager.getInstanceName(), instanceConfig);
+
+    ClusterConfig clusterConfig = new 
ClusterConfig(anyParticipantManager.getClusterName());
+    
clusterConfig.setGlobalTargetTaskThreadPoolSize(TEST_TARGET_TASK_THREAD_POOL_SIZE
 + 1);
+    anyParticipantManager.getConfigAccessor()
+        .setClusterConfig(anyParticipantManager.getClusterName(), 
clusterConfig);
+
+    
Assert.assertEquals(TaskUtil.getTargetThreadPoolSize(anyParticipantManager.getZkClient(),
+        anyParticipantManager.getClusterName(), 
anyParticipantManager.getInstanceName()),
+        TEST_TARGET_TASK_THREAD_POOL_SIZE);
+  }
+
+  @Test(dependsOnMethods = "testGetTaskThreadPoolSize")
+  public void testGetTaskThreadPoolSizeInstanceConfigUndefined() {
+    MockParticipantManager anyParticipantManager = _participants[0];
+
+    InstanceConfig instanceConfig =
+        
InstanceConfig.toInstanceConfig(anyParticipantManager.getInstanceName());
+    anyParticipantManager.getConfigAccessor()
+        .setInstanceConfig(anyParticipantManager.getClusterName(),
+            anyParticipantManager.getInstanceName(), instanceConfig);
+
+    ClusterConfig clusterConfig = new 
ClusterConfig(anyParticipantManager.getClusterName());
+    
clusterConfig.setGlobalTargetTaskThreadPoolSize(TEST_TARGET_TASK_THREAD_POOL_SIZE);
+    anyParticipantManager.getConfigAccessor()
+        .setClusterConfig(anyParticipantManager.getClusterName(), 
clusterConfig);
+
+    
Assert.assertEquals(TaskUtil.getTargetThreadPoolSize(anyParticipantManager.getZkClient(),
+        anyParticipantManager.getClusterName(), 
anyParticipantManager.getInstanceName()),
+        TEST_TARGET_TASK_THREAD_POOL_SIZE);
+  }
+
+  @Test(dependsOnMethods = "testGetTaskThreadPoolSizeInstanceConfigUndefined")
+  public void testGetTaskThreadPoolSizeInstanceConfigDoesNotExist() {
+    MockParticipantManager anyParticipantManager = _participants[0];
+
+    HelixDataAccessor helixDataAccessor = 
anyParticipantManager.getHelixDataAccessor();
+    helixDataAccessor.removeProperty(
+        
helixDataAccessor.keyBuilder().instanceConfig(anyParticipantManager.getInstanceName()));
+
+    ClusterConfig clusterConfig = new 
ClusterConfig(anyParticipantManager.getClusterName());
+    
clusterConfig.setGlobalTargetTaskThreadPoolSize(TEST_TARGET_TASK_THREAD_POOL_SIZE);
+    anyParticipantManager.getConfigAccessor()
+        .setClusterConfig(anyParticipantManager.getClusterName(), 
clusterConfig);
+
+    
Assert.assertEquals(TaskUtil.getTargetThreadPoolSize(anyParticipantManager.getZkClient(),
+        anyParticipantManager.getClusterName(), 
anyParticipantManager.getInstanceName()),
+        TEST_TARGET_TASK_THREAD_POOL_SIZE);
+  }
+
+  @Test(dependsOnMethods = 
"testGetTaskThreadPoolSizeInstanceConfigDoesNotExist")
+  public void testGetTaskThreadPoolSizeClusterConfigUndefined() {
+    MockParticipantManager anyParticipantManager = _participants[0];
+
+    ClusterConfig clusterConfig = new 
ClusterConfig(anyParticipantManager.getClusterName());
+    anyParticipantManager.getConfigAccessor()
+        .setClusterConfig(anyParticipantManager.getClusterName(), 
clusterConfig);
+
+    
Assert.assertEquals(TaskUtil.getTargetThreadPoolSize(anyParticipantManager.getZkClient(),
+        anyParticipantManager.getClusterName(), 
anyParticipantManager.getInstanceName()),
+        TaskConstants.DEFAULT_TASK_THREAD_POOL_SIZE);
+  }
+
+  @Test(dependsOnMethods = "testGetTaskThreadPoolSizeClusterConfigUndefined", 
expectedExceptions = HelixException.class)
+  public void testGetTaskThreadPoolSizeClusterConfigDoesNotExist() {
+    MockParticipantManager anyParticipantManager = _participants[0];
+
+    HelixDataAccessor helixDataAccessor = 
anyParticipantManager.getHelixDataAccessor();
+    
helixDataAccessor.removeProperty(helixDataAccessor.keyBuilder().clusterConfig());
+    TaskUtil.getTargetThreadPoolSize(anyParticipantManager.getZkClient(),
+        anyParticipantManager.getClusterName(), 
anyParticipantManager.getInstanceName());
+  }
+}
diff --git 
a/helix-core/src/test/java/org/apache/helix/task/assigner/TestAssignableInstance.java
 
b/helix-core/src/test/java/org/apache/helix/task/assigner/TestAssignableInstance.java
index 0e8b1d0..1dac153 100644
--- 
a/helix-core/src/test/java/org/apache/helix/task/assigner/TestAssignableInstance.java
+++ 
b/helix-core/src/test/java/org/apache/helix/task/assigner/TestAssignableInstance.java
@@ -26,7 +26,7 @@ import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.model.LiveInstance;
 import org.apache.helix.task.TaskConfig;
-import org.apache.helix.task.TaskStateModelFactory;
+import org.apache.helix.task.TaskConstants;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 import org.testng.collections.Maps;
@@ -66,7 +66,7 @@ public class TestAssignableInstance extends AssignerTestBase {
     Assert.assertEquals(
         (int) 
ai.getTotalCapacity().get(LiveInstance.InstanceResourceType.TASK_EXEC_THREAD.name())
             .get(AssignableInstance.DEFAULT_QUOTA_TYPE),
-        TaskStateModelFactory.TASK_THREADPOOL_SIZE);
+        TaskConstants.DEFAULT_TASK_THREAD_POOL_SIZE);
     Assert.assertEquals(ai.getCurrentAssignments().size(), 0);
   }
 
@@ -106,7 +106,7 @@ public class TestAssignableInstance extends 
AssignerTestBase {
         testQuotaTypes.length);
     Assert.assertEquals(
         
ai.getTotalCapacity().get(LiveInstance.InstanceResourceType.TASK_EXEC_THREAD.name()),
-        
calculateExpectedQuotaPerType(TaskStateModelFactory.TASK_THREADPOOL_SIZE, 
testQuotaTypes,
+        
calculateExpectedQuotaPerType(TaskConstants.DEFAULT_TASK_THREAD_POOL_SIZE, 
testQuotaTypes,
             testQuotaRatio));
     Assert.assertEquals(ai.getCurrentAssignments().size(), 0);
   }
@@ -176,7 +176,7 @@ public class TestAssignableInstance extends 
AssignerTestBase {
 
     // When nothing is configured, we should use default quota type to assign
     Map<String, TaskAssignResult> results = new HashMap<>();
-    for (int i = 0; i < TaskStateModelFactory.TASK_THREADPOOL_SIZE; i++) {
+    for (int i = 0; i < TaskConstants.DEFAULT_TASK_THREAD_POOL_SIZE; i++) {
       String taskId = Integer.toString(i);
       TaskConfig task = new TaskConfig("", null, taskId, null);
       TaskAssignResult result = ai.tryAssign(task, 
AssignableInstance.DEFAULT_QUOTA_TYPE);

Reply via email to