Repository: helix Updated Branches: refs/heads/master 047ad51e8 -> cc625065b
[HELIX-735] Make AssignmentCalculators non-static so that tests pass With the introduction of quota-based scheduling, every task that gets scheduled takes up a thread. However, previously these AssignmentCalculators (both generic and fixed, for generic jobs and targeted jobs respectively) were stateless, so they were instantiated statically. Since AssignmentCalculators are now stateful because they operate on AssignableInstances' quota profile, they were made non-static so that they would be re-instantiated on every pipeline run. This problem is specific to the testing environment where static variables live on from test to test, causing AssignmentCalculators to hold on to the very first reference to AssignableInstanceManager. Tasks were not being assigned and scheduled because the first set of AssignableInstances would get filled up and never get freed. Changelist: 1. Make AssignmentCalculators non-static 2. Adjust sleep duration for some tests for stability Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/cc625065 Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/cc625065 Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/cc625065 Branch: refs/heads/master Commit: cc625065bffced9a66566eeccb3055ec28a74611 Parents: 047ad51 Author: Hunter Lee <[email protected]> Authored: Mon Jul 16 15:48:38 2018 -0700 Committer: Hunter Lee <[email protected]> Committed: Tue Jul 17 11:46:04 2018 -0700 ---------------------------------------------------------------------- .../org/apache/helix/task/JobRebalancer.java | 15 +++------- .../task/TestQuotaBasedScheduling.java | 31 ++++++++++++++++---- .../integration/task/TestTaskThrottling.java | 4 +-- ...signableInstanceManagerControllerSwitch.java | 16 +++++----- 4 files changed, 40 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/helix/blob/cc625065/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java index c49a365..ddda41a 100644 --- a/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java +++ b/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java @@ -52,9 +52,6 @@ import com.google.common.collect.ImmutableMap; */ public class JobRebalancer extends TaskRebalancer { private static final Logger LOG = LoggerFactory.getLogger(JobRebalancer.class); - private static TaskAssignmentCalculator _fixTaskAssignmentCal; - private static TaskAssignmentCalculator _threadCountBasedTaskAssignmentCal; - private static final String PREV_RA_NODE = "PreviousResourceAssignment"; @Override @@ -434,14 +431,10 @@ public class JobRebalancer extends TaskRebalancer { private TaskAssignmentCalculator getAssignmentCalculator(JobConfig jobConfig, ClusterDataCache cache) { AssignableInstanceManager assignableInstanceManager = cache.getAssignableInstanceManager(); - if (_threadCountBasedTaskAssignmentCal == null) { - _threadCountBasedTaskAssignmentCal = new ThreadCountBasedTaskAssignmentCalculator( - new ThreadCountBasedTaskAssigner(), assignableInstanceManager); - } - if (_fixTaskAssignmentCal == null) { - _fixTaskAssignmentCal = new FixedTargetTaskAssignmentCalculator(assignableInstanceManager); + if (TaskUtil.isGenericTaskJob(jobConfig)) { + return new ThreadCountBasedTaskAssignmentCalculator(new ThreadCountBasedTaskAssigner(), + assignableInstanceManager); } - return TaskUtil.isGenericTaskJob(jobConfig) ? 
_threadCountBasedTaskAssignmentCal - : _fixTaskAssignmentCal; + return new FixedTargetTaskAssignmentCalculator(assignableInstanceManager); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/helix/blob/cc625065/helix-core/src/test/java/org/apache/helix/integration/task/TestQuotaBasedScheduling.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestQuotaBasedScheduling.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestQuotaBasedScheduling.java index 7f25693..8dd24db 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/task/TestQuotaBasedScheduling.java +++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestQuotaBasedScheduling.java @@ -51,11 +51,11 @@ import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; public class TestQuotaBasedScheduling extends TaskTestBase { - private static final long LONG_RUNNING_TASK_DURATION = 100000L; private static final String DEFAULT_QUOTA_TYPE = "DEFAULT"; private static final String JOB_COMMAND = "DummyCommand"; private Map<String, String> _jobCommandMap; private Map<String, Integer> _quotaTypeExecutionCount = new ConcurrentHashMap<>(); + private boolean _finishTask = false; @BeforeClass public void beforeClass() throws Exception { @@ -129,6 +129,7 @@ public class TestQuotaBasedScheduling extends TaskTestBase { @BeforeMethod public void beforeMethod() { _quotaTypeExecutionCount.clear(); + _finishTask = false; } /** @@ -280,6 +281,10 @@ public class TestQuotaBasedScheduling extends TaskTestBase { TaskState jobState = _driver.getWorkflowContext(workflowName).getJobState(workflowName + "_JOB_C"); Assert.assertEquals(jobState, TaskState.IN_PROGRESS); + + // Finish rest of the tasks + _finishTask = true; + Thread.sleep(2000L); } /** @@ -351,6 +356,10 @@ public class TestQuotaBasedScheduling extends TaskTestBase { // due to thread pool saturation 
TaskState secondWorkflowState = _driver.getWorkflowContext("secondWorkflow").getWorkflowState(); Assert.assertEquals(secondWorkflowState, TaskState.IN_PROGRESS); + + // Finish rest of the tasks + _finishTask = true; + Thread.sleep(2000L); } /** @@ -409,6 +418,7 @@ public class TestQuotaBasedScheduling extends TaskTestBase { * Tests that by repeatedly scheduling workflows and jobs that there is no thread leak when there * are a multidude of successful and failed tests. The number of total tasks run must be well * above the number of total thread capacity. + * Note: disabled because this is holding up mvn test due to its job/task load. * @throws InterruptedException */ @Test(dependsOnMethods = "testSchedulingWithoutQuota") @@ -452,6 +462,10 @@ public class TestQuotaBasedScheduling extends TaskTestBase { Assert.assertEquals(_driver.getWorkflowContext(_manager, workflowName).getWorkflowState(), state); } + + // Finish rest of the tasks + _finishTask = true; + Thread.sleep(2000L); } /** @@ -521,6 +535,10 @@ public class TestQuotaBasedScheduling extends TaskTestBase { Assert.assertEquals((int) _quotaTypeExecutionCount.get("A"), 5); Assert.assertEquals((int) _quotaTypeExecutionCount.get("B"), 5); Assert.assertFalse(_quotaTypeExecutionCount.containsKey(DEFAULT_QUOTA_TYPE)); + + // Finish rest of the tasks + _finishTask = true; + Thread.sleep(2000L); } /** @@ -605,10 +623,13 @@ public class TestQuotaBasedScheduling extends TaskTestBase { if (_quotaType != null) { _quotaTypeExecutionCount.put(_quotaType, _quotaTypeExecutionCount.get(_quotaType) + 1); } - try { - Thread.sleep(LONG_RUNNING_TASK_DURATION); - } catch (InterruptedException e) { - e.printStackTrace(); + // Only take long if finishTask is false + while (!_finishTask) { + try { + Thread.sleep(200L); + } catch (InterruptedException e) { + e.printStackTrace(); + } } return new TaskResult(TaskResult.Status.COMPLETED, generateInfoMessageForDebugging(_instanceName, _quotaType)); 
http://git-wip-us.apache.org/repos/asf/helix/blob/cc625065/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskThrottling.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskThrottling.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskThrottling.java index d25397a..592feda 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskThrottling.java +++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskThrottling.java @@ -63,7 +63,7 @@ public class TestTaskThrottling extends TaskTestBase { _driver.pollForJobState(flow.getName(), TaskUtil.getNamespacedJobName(flow.getName(), jobName1), TaskState.IN_PROGRESS); // Wait for tasks to be picked up - Thread.sleep(1500); + Thread.sleep(2000); Assert.assertEquals(countRunningPartition(flow, jobName1), numTasks); @@ -180,4 +180,4 @@ public class TestTaskThrottling extends TaskTestBase { .setInstanceConfig(CLUSTER_NAME, PARTICIPANT_PREFIX + "_" + (_startPort + i), instanceConfig); } } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/helix/blob/cc625065/helix-core/src/test/java/org/apache/helix/task/TestAssignableInstanceManagerControllerSwitch.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/task/TestAssignableInstanceManagerControllerSwitch.java b/helix-core/src/test/java/org/apache/helix/task/TestAssignableInstanceManagerControllerSwitch.java index ca6085e..21e36e0 100644 --- a/helix-core/src/test/java/org/apache/helix/task/TestAssignableInstanceManagerControllerSwitch.java +++ b/helix-core/src/test/java/org/apache/helix/task/TestAssignableInstanceManagerControllerSwitch.java @@ -18,6 +18,7 @@ package org.apache.helix.task; * specific language governing permissions and limitations * under the License. 
*/ + import com.google.common.collect.ImmutableMap; import java.util.ArrayList; import java.util.Collection; @@ -45,6 +46,12 @@ public class TestAssignableInstanceManagerControllerSwitch extends TaskTestBase private int numJobs = 2; private int numTasks = 3; + /** + * Tests the duality of two AssignableInstanceManager instances to model the + * situation where there is a Controller switch and AssignableInstanceManager is + * built back from scratch. + * @throws InterruptedException + */ @Test public void testControllerSwitch() throws InterruptedException { setupAndRunJobs(); @@ -71,7 +78,7 @@ public class TestAssignableInstanceManagerControllerSwitch extends TaskTestBase accessor.getChildValuesMap(accessor.keyBuilder().resourceConfigs(), true); // Wait for the job pipeline - Thread.sleep(100); + Thread.sleep(1000); taskDataCache.refresh(accessor, resourceConfigMap); // Create prev manager and build @@ -83,13 +90,6 @@ public class TestAssignableInstanceManagerControllerSwitch extends TaskTestBase Map<String, TaskAssignResult> prevTaskAssignResultMap = new HashMap<>(prevAssignableInstanceManager.getTaskAssignResultMap()); - // Stop the current controller - _controller.syncStop(); - // Start a new controller - String newControllerName = CONTROLLER_PREFIX + "_2"; - _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, newControllerName); - _controller.syncStart(); - // Generate a new AssignableInstanceManager taskDataCache.refresh(accessor, resourceConfigMap); AssignableInstanceManager newAssignableInstanceManager = new AssignableInstanceManager();
