This is an automated email from the ASF dual-hosted git repository.
hulee pushed a commit to branch task_pool
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/task_pool by this push:
new cb8fb76 Modify AssignableInstance for Configurable Thread Pool Size
(#1009)
cb8fb76 is described below
commit cb8fb76b8a58d86a03be3ee1a5a29be275e83e1a
Author: Neal Sun <[email protected]>
AuthorDate: Fri May 15 14:26:56 2020 -0700
Modify AssignableInstance for Configurable Thread Pool Size (#1009)
AssignableInstance used to assign a default value of 40 to its resource
capacity (resource capacity has only one field: TASK_EXEC_THREAD). With the
recent change related to configurable thread pool size, AssignableInstance
should respect the reported thread pool size in LiveInstance's.
---
.../helix/task/assigner/AssignableInstance.java | 2 +-
.../task/assigner/TestAssignableInstance.java | 28 +++++++++++++++-------
2 files changed, 20 insertions(+), 10 deletions(-)
diff --git
a/helix-core/src/main/java/org/apache/helix/task/assigner/AssignableInstance.java
b/helix-core/src/main/java/org/apache/helix/task/assigner/AssignableInstance.java
index 194db41..67199a3 100644
---
a/helix-core/src/main/java/org/apache/helix/task/assigner/AssignableInstance.java
+++
b/helix-core/src/main/java/org/apache/helix/task/assigner/AssignableInstance.java
@@ -113,7 +113,7 @@ public class AssignableInstance {
if (resourceCapacity == null) {
resourceCapacity = new HashMap<>();
resourceCapacity.put(LiveInstance.InstanceResourceType.TASK_EXEC_THREAD.name(),
- Integer.toString(TaskConstants.DEFAULT_TASK_THREAD_POOL_SIZE));
+ Integer.toString(_liveInstance.getCurrentTaskThreadPoolSize()));
logger.debug("No resource capacity provided in LiveInstance {}, assuming
default capacity: {}",
_instanceConfig.getInstanceName(), resourceCapacity);
}
diff --git
a/helix-core/src/test/java/org/apache/helix/task/assigner/TestAssignableInstance.java
b/helix-core/src/test/java/org/apache/helix/task/assigner/TestAssignableInstance.java
index 1dac153..fbbf06e 100644
---
a/helix-core/src/test/java/org/apache/helix/task/assigner/TestAssignableInstance.java
+++
b/helix-core/src/test/java/org/apache/helix/task/assigner/TestAssignableInstance.java
@@ -55,18 +55,20 @@ public class TestAssignableInstance extends
AssignerTestBase {
@Test
public void testInitializationWithQuotaUnset() {
+ int expectedCurrentTaskThreadPoolSize = 100;
+ LiveInstance liveInstance = createLiveInstance(null, null);
+
liveInstance.setCurrentTaskThreadPoolSize(expectedCurrentTaskThreadPoolSize);
+
// Initialize AssignableInstance with neither resource capacity nor quota
ratio provided
AssignableInstance ai = new AssignableInstance(createClusterConfig(null,
null, false),
- new InstanceConfig(testInstanceName), createLiveInstance(null, null));
+ new InstanceConfig(testInstanceName), liveInstance);
Assert.assertEquals(ai.getUsedCapacity().size(), 1);
Assert.assertEquals(
(int)
ai.getUsedCapacity().get(LiveInstance.InstanceResourceType.TASK_EXEC_THREAD.name())
- .get(AssignableInstance.DEFAULT_QUOTA_TYPE),
- 0);
+ .get(AssignableInstance.DEFAULT_QUOTA_TYPE), 0);
Assert.assertEquals(
(int)
ai.getTotalCapacity().get(LiveInstance.InstanceResourceType.TASK_EXEC_THREAD.name())
- .get(AssignableInstance.DEFAULT_QUOTA_TYPE),
- TaskConstants.DEFAULT_TASK_THREAD_POOL_SIZE);
+ .get(AssignableInstance.DEFAULT_QUOTA_TYPE),
expectedCurrentTaskThreadPoolSize);
Assert.assertEquals(ai.getCurrentAssignments().size(), 0);
}
@@ -91,10 +93,14 @@ public class TestAssignableInstance extends
AssignerTestBase {
@Test
public void testInitializationWithOnlyQuotaType() {
+ int expectedCurrentTaskThreadPoolSize = 100;
+ LiveInstance liveInstance = createLiveInstance(null, null);
+
liveInstance.setCurrentTaskThreadPoolSize(expectedCurrentTaskThreadPoolSize);
+
// Initialize AssignableInstance with only quota type provided
AssignableInstance ai =
new AssignableInstance(createClusterConfig(testQuotaTypes,
testQuotaRatio, false),
- new InstanceConfig(testInstanceName), createLiveInstance(null,
null));
+ new InstanceConfig(testInstanceName), liveInstance);
Assert.assertEquals(ai.getTotalCapacity().size(), 1);
Assert.assertEquals(ai.getUsedCapacity().size(), 1);
@@ -106,7 +112,7 @@ public class TestAssignableInstance extends
AssignerTestBase {
testQuotaTypes.length);
Assert.assertEquals(
ai.getTotalCapacity().get(LiveInstance.InstanceResourceType.TASK_EXEC_THREAD.name()),
-
calculateExpectedQuotaPerType(TaskConstants.DEFAULT_TASK_THREAD_POOL_SIZE,
testQuotaTypes,
+ calculateExpectedQuotaPerType(expectedCurrentTaskThreadPoolSize,
testQuotaTypes,
testQuotaRatio));
Assert.assertEquals(ai.getCurrentAssignments().size(), 0);
}
@@ -171,12 +177,16 @@ public class TestAssignableInstance extends
AssignerTestBase {
@Test
public void testNormalTryAssign() {
+ int testCurrentTaskThreadPoolSize = 100;
+ LiveInstance liveInstance = createLiveInstance(null, null);
+ liveInstance.setCurrentTaskThreadPoolSize(testCurrentTaskThreadPoolSize);
+
AssignableInstance ai = new AssignableInstance(createClusterConfig(null,
null, true),
- new InstanceConfig(testInstanceName), createLiveInstance(null, null));
+ new InstanceConfig(testInstanceName), liveInstance);
// When nothing is configured, we should use default quota type to assign
Map<String, TaskAssignResult> results = new HashMap<>();
- for (int i = 0; i < TaskConstants.DEFAULT_TASK_THREAD_POOL_SIZE; i++) {
+ for (int i = 0; i < testCurrentTaskThreadPoolSize; i++) {
String taskId = Integer.toString(i);
TaskConfig task = new TaskConfig("", null, taskId, null);
TaskAssignResult result = ai.tryAssign(task,
AssignableInstance.DEFAULT_QUOTA_TYPE);