LGTM
On Wed, Jun 11, 2014 at 6:05 PM, 'Klaus Aehlig' via ganeti-devel < [email protected]> wrote: > Given that now each process only runs a single job, there is no need > anymore to keep track about the worker pool being active. Hence drop > the corresponding code. > > The only active use of the SetActive functionality is in the tests; > these tests, however, test priority handling in the worker pool, > which is not that important for single-worker worker pools anyway. > So we just remove the tests. > > Signed-off-by: Klaus Aehlig <[email protected]> > --- > lib/jqueue/__init__.py | 11 +- > lib/workerpool.py | 25 +--- > test/py/ganeti.workerpool_unittest.py | 250 > ---------------------------------- > 3 files changed, 4 insertions(+), 282 deletions(-) > > diff --git a/lib/jqueue/__init__.py b/lib/jqueue/__init__.py > index 89a745c..0258f9d 100644 > --- a/lib/jqueue/__init__.py > +++ b/lib/jqueue/__init__.py > @@ -2504,11 +2504,9 @@ class JobQueue(object): > def PrepareShutdown(self): > """Prepare to stop the job queue. > > - Disables execution of jobs in the workerpool and returns whether > there are > - any jobs currently running. If the latter is the case, the job queue > is not > - yet ready for shutdown. Once this function returns C{True} > L{Shutdown} can > - be called without interfering with any job. Queued and unfinished > jobs will > - be resumed next time. > + Returns whether there are any jobs currently running. If the latter > is the > + case, the job queue is not yet ready for shutdown. Once this function > + returns C{True} L{Shutdown} can be called without interfering with > any job. > > @rtype: bool > @return: Whether there are any running jobs > @@ -2517,9 +2515,6 @@ class JobQueue(object): > if self._accepting_jobs: > self._accepting_jobs = False > > - # Tell worker pool to stop processing pending tasks > - self._wpool.SetActive(False) > - > return self._wpool.HasRunningTasks() > > def AcceptingJobsUnlocked(self): > diff --git a/lib/workerpool.py b/lib/workerpool.py > index 6b558ce..734530a 100644 > --- a/lib/workerpool.py > +++ b/lib/workerpool.py > @@ -281,7 +281,6 @@ class WorkerPool(object): > self._last_worker_id = 0 > self._workers = [] > self._quiescing = False > - self._active = True > > # Terminating workers > self._termworkers = [] > @@ -447,28 +446,6 @@ class WorkerPool(object): > finally: > self._lock.release() > > - def SetActive(self, active): > - """Enable/disable processing of tasks. > - > - This is different from L{Quiesce} in the sense that this function just > - changes an internal flag and doesn't wait for the queue to be empty. > Tasks > - already being processed continue normally, but no new tasks will be > - started. New tasks can still be added. > - > - @type active: bool > - @param active: Whether tasks should be processed > - > - """ > - self._lock.acquire() > - try: > - self._active = active > - > - if active: > - # Tell all workers to continue processing > - self._pool_to_worker.notifyAll() > - finally: > - self._lock.release() > - > def _WaitForTaskUnlocked(self, worker): > """Waits for a task for a worker. > > @@ -481,7 +458,7 @@ class WorkerPool(object): > return _TERMINATE > > # If there's a pending task, return it immediately > - if self._active and self._tasks: > + if self._tasks: > # Get task from queue and tell pool about it > try: > task = heapq.heappop(self._tasks) > diff --git a/test/py/ganeti.workerpool_unittest.py b/test/py/ > ganeti.workerpool_unittest.py > index 8f35a69..a2cc476 100755 > --- a/test/py/ganeti.workerpool_unittest.py > +++ b/test/py/ganeti.workerpool_unittest.py > @@ -196,53 +196,6 @@ class TestWorkerpool(unittest.TestCase): > wp.TerminateWorkers() > self._CheckWorkerCount(wp, 0) > > - def testActive(self): > - ctx = CountingContext() > - wp = workerpool.WorkerPool("TestActive", 5, CountingBaseWorker) > - try: > - self._CheckWorkerCount(wp, 5) > - self.assertTrue(wp._active) > - > - # Process some tasks > - for _ in range(10): > - wp.AddTask((ctx, None)) > - > - wp.Quiesce() > - self._CheckNoTasks(wp) > - self.assertEquals(ctx.GetDoneTasks(), 10) > - > - # Repeat a few times > - for count in range(10): > - # Deactivate pool > - wp.SetActive(False) > - self._CheckNoTasks(wp) > - > - # Queue some more tasks > - for _ in range(10): > - wp.AddTask((ctx, None)) > - > - for _ in range(5): > - # Short delays to give other threads a chance to cause breakage > - time.sleep(.01) > - wp.AddTask((ctx, "Hello world %s" % 999)) > - self.assertFalse(wp._active) > - > - self.assertEquals(ctx.GetDoneTasks(), 10 + (count * 15)) > - > - # Start processing again > - wp.SetActive(True) > - self.assertTrue(wp._active) > - > - # Wait for tasks to finish > - wp.Quiesce() > - self._CheckNoTasks(wp) > - self.assertEquals(ctx.GetDoneTasks(), 10 + (count * 15) + 15) > - > - self._CheckWorkerCount(wp, 5) > - finally: > - wp.TerminateWorkers() > - self._CheckWorkerCount(wp, 0) > - > def testChecksum(self): > # Tests whether all tasks are run and, since we're only using a single > # thread, whether everything is started in order. > @@ -523,209 +476,6 @@ class TestWorkerpool(unittest.TestCase): > wp.TerminateWorkers() > self._CheckWorkerCount(wp, 0) > > - def testChangeTaskPriority(self): > - wp = workerpool.WorkerPool("Test", 1, PriorityWorker) > - try: > - self._CheckWorkerCount(wp, 1) > - > - ctx = PriorityContext() > - > - # Use static seed for this test > - rnd = random.Random(4727) > - > - # Disable processing of tasks > - wp.SetActive(False) > - > - # No task ID > - self.assertRaises(workerpool.NoSuchTask, wp.ChangeTaskPriority, > - None, 0) > - > - # Pre-generate task IDs and priorities > - count = 100 > - task_ids = range(0, count) > - priorities = range(200, 200 + count) * 2 > - > - rnd.shuffle(task_ids) > - rnd.shuffle(priorities) > - > - # Make sure there are some duplicate priorities, but not all > - priorities[count * 2 - 10:count * 2 - 1] = \ > - priorities[count - 10: count - 1] > - > - assert len(priorities) == 2 * count > - assert priorities[0:(count - 1)] != priorities[count:(2 * count - > 1)] > - > - # Add some tasks; this loop consumes the first half of all > previously > - # generated priorities > - for (idx, task_id) in enumerate(task_ids): > - wp.AddTask((ctx, idx), > - priority=priorities.pop(), > - task_id=task_id) > - > - self.assertEqual(len(wp._tasks), len(task_ids)) > - self.assertEqual(len(wp._taskdata), len(task_ids)) > - > - # Tasks have been added, so half of the priorities should have been > - # consumed > - assert len(priorities) == len(task_ids) > - > - # Change task priority > - expected = [] > - for ((idx, task_id), prio) in zip(enumerate(task_ids), priorities): > - wp.ChangeTaskPriority(task_id, prio) > - expected.append((prio, idx)) > - > - self.assertEqual(len(wp._taskdata), len(task_ids)) > - > - # Half the entries are now abandoned tasks > - self.assertEqual(len(wp._tasks), len(task_ids) * 2) > - > - assert len(priorities) == count > - assert len(task_ids) == count > - > - # Start processing > - wp.SetActive(True) > - > - # Wait for tasks to finish > - wp.Quiesce() > - > - self._CheckNoTasks(wp) > - > - for task_id in task_ids: > - # All tasks are done > - self.assertRaises(workerpool.NoSuchTask, wp.ChangeTaskPriority, > - task_id, 0) > - > - # Check result > - ctx.lock.acquire() > - try: > - self.assertEqual(ctx.result, sorted(expected)) > - finally: > - ctx.lock.release() > - > - self._CheckWorkerCount(wp, 1) > - finally: > - wp.TerminateWorkers() > - self._CheckWorkerCount(wp, 0) > - > - def testChangeTaskPriorityInteralStructures(self): > - wp = workerpool.WorkerPool("Test", 1, NotImplementedWorker) > - try: > - self._CheckWorkerCount(wp, 1) > - > - # Use static seed for this test > - rnd = random.Random(643) > - > - (num1, num2) = rnd.sample(range(1000), 2) > - > - # Disable processing of tasks > - wp.SetActive(False) > - > - self.assertFalse(wp._tasks) > - self.assertFalse(wp._taskdata) > - > - # No priority or task ID > - wp.AddTask(()) > - self.assertEqual(wp._tasks, [ > - [workerpool._DEFAULT_PRIORITY, 0, None, ()], > - ]) > - self.assertFalse(wp._taskdata) > - > - # No task ID > - wp.AddTask((), priority=7413) > - self.assertEqual(wp._tasks, [ > - [workerpool._DEFAULT_PRIORITY, 0, None, ()], > - [7413, 1, None, ()], > - ]) > - self.assertFalse(wp._taskdata) > - > - # Start adding real tasks > - wp.AddTask((), priority=10267659, task_id=num1) > - self.assertEqual(wp._tasks, [ > - [workerpool._DEFAULT_PRIORITY, 0, None, ()], > - [7413, 1, None, ()], > - [10267659, 2, num1, ()], > - ]) > - self.assertEqual(wp._taskdata, { > - num1: [10267659, 2, num1, ()], > - }) > - > - wp.AddTask((), priority=123, task_id=num2) > - self.assertEqual(sorted(wp._tasks), [ > - [workerpool._DEFAULT_PRIORITY, 0, None, ()], > - [123, 3, num2, ()], > - [7413, 1, None, ()], > - [10267659, 2, num1, ()], > - ]) > - self.assertEqual(wp._taskdata, { > - num1: [10267659, 2, num1, ()], > - num2: [123, 3, num2, ()], > - }) > - > - wp.ChangeTaskPriority(num1, 100) > - self.assertEqual(sorted(wp._tasks), [ > - [workerpool._DEFAULT_PRIORITY, 0, None, ()], > - [100, 2, num1, ()], > - [123, 3, num2, ()], > - [7413, 1, None, ()], > - [10267659, 2, num1, None], > - ]) > - self.assertEqual(wp._taskdata, { > - num1: [100, 2, num1, ()], > - num2: [123, 3, num2, ()], > - }) > - > - wp.ChangeTaskPriority(num2, 91337) > - self.assertEqual(sorted(wp._tasks), [ > - [workerpool._DEFAULT_PRIORITY, 0, None, ()], > - [100, 2, num1, ()], > - [123, 3, num2, None], > - [7413, 1, None, ()], > - [91337, 3, num2, ()], > - [10267659, 2, num1, None], > - ]) > - self.assertEqual(wp._taskdata, { > - num1: [100, 2, num1, ()], > - num2: [91337, 3, num2, ()], > - }) > - > - wp.ChangeTaskPriority(num1, 10139) > - self.assertEqual(sorted(wp._tasks), [ > - [workerpool._DEFAULT_PRIORITY, 0, None, ()], > - [100, 2, num1, None], > - [123, 3, num2, None], > - [7413, 1, None, ()], > - [10139, 2, num1, ()], > - [91337, 3, num2, ()], > - [10267659, 2, num1, None], > - ]) > - self.assertEqual(wp._taskdata, { > - num1: [10139, 2, num1, ()], > - num2: [91337, 3, num2, ()], > - }) > - > - # Change to the same priority once again > - wp.ChangeTaskPriority(num1, 10139) > - self.assertEqual(sorted(wp._tasks), [ > - [workerpool._DEFAULT_PRIORITY, 0, None, ()], > - [100, 2, num1, None], > - [123, 3, num2, None], > - [7413, 1, None, ()], > - [10139, 2, num1, None], > - [10139, 2, num1, ()], > - [91337, 3, num2, ()], > - [10267659, 2, num1, None], > - ]) > - self.assertEqual(wp._taskdata, { > - num1: [10139, 2, num1, ()], > - num2: [91337, 3, num2, ()], > - }) > - > - self._CheckWorkerCount(wp, 1) > - finally: > - wp.TerminateWorkers() > - self._CheckWorkerCount(wp, 0) > - > > if __name__ == "__main__": > testutils.GanetiTestProgram() > -- > 2.0.0.526.g5318336 > >
