LGTM, thanks
On Wed, Jun 11, 2014 at 6:05 PM, 'Klaus Aehlig' via ganeti-devel < [email protected]> wrote: > Given that the Python job queue now only handles a single job, > there is no need to keep track on whether it accepts new jobs. > Hence remove that code. Also remove the corresponding tests. > > Signed-off-by: Klaus Aehlig <[email protected]> > --- > lib/jqueue/__init__.py | 40 -------- > test/py/ganeti.jqueue_unittest.py | 190 > -------------------------------------- > 2 files changed, 230 deletions(-) > > diff --git a/lib/jqueue/__init__.py b/lib/jqueue/__init__.py > index 0258f9d..1f61d58 100644 > --- a/lib/jqueue/__init__.py > +++ b/lib/jqueue/__init__.py > @@ -83,12 +83,6 @@ class CancelJob(Exception): > """ > > > -class QueueShutdown(Exception): > - """Special exception to abort a job when the job queue is shutting down. > - > - """ > - > - > def TimeStampNow(): > """Returns the current timestamp. > > @@ -612,11 +606,6 @@ class _OpExecCallbacks(mcpu.OpExecCbBase): > logging.debug("Canceling opcode") > raise CancelJob() > > - # See if queue is shutting down > - if not self._queue.AcceptingJobsUnlocked(): > - logging.debug("Queue is shutting down") > - raise QueueShutdown() > - > @locking.ssynchronized(_QUEUE, shared=1) > def NotifyStart(self): > """Mark the opcode as running, not lock-waiting. > @@ -1168,10 +1157,6 @@ class _JobProcessor(object): > if op.status == constants.OP_STATUS_CANCELING: > return (constants.OP_STATUS_CANCELING, None) > > - # Queue is shutting down, return to queued > - if not self.queue.AcceptingJobsUnlocked(): > - return (constants.OP_STATUS_QUEUED, None) > - > # Stay in waitlock while trying to re-acquire lock > return (constants.OP_STATUS_WAITING, None) > except CancelJob: > @@ -1179,14 +1164,6 @@ class _JobProcessor(object): > assert op.status == constants.OP_STATUS_CANCELING > return (constants.OP_STATUS_CANCELING, None) > > - except QueueShutdown: > - logging.exception("%s: Queue is shutting down", opctx.log_prefix) > - > - assert op.status == constants.OP_STATUS_WAITING > - > - # Job hadn't been started yet, so it should return to the queue > - return (constants.OP_STATUS_QUEUED, None) > - > except Exception, err: # pylint: disable=W0703 > logging.exception("%s: Caught exception in %s", > opctx.log_prefix, opctx.summary) > @@ -1665,9 +1642,6 @@ class JobQueue(object): > self.acquire = self._lock.acquire > self.release = self._lock.release > > - # Accept jobs by default > - self._accepting_jobs = True > - > # Read serial file > self._last_serial = jstore.ReadSerial() > assert self._last_serial is not None, ("Serial file was modified > between" > @@ -2512,22 +2486,8 @@ class JobQueue(object): > @return: Whether there are any running jobs > > """ > - if self._accepting_jobs: > - self._accepting_jobs = False > - > return self._wpool.HasRunningTasks() > > - def AcceptingJobsUnlocked(self): > - """Returns whether jobs are accepted. > - > - Once L{PrepareShutdown} has been called, no new jobs are accepted and > the > - queue is shutting down. > - > - @rtype: bool > - > - """ > - return self._accepting_jobs > - > @locking.ssynchronized(_LOCK) > def Shutdown(self): > """Stops the job queue. > diff --git a/test/py/ganeti.jqueue_unittest.py b/test/py/ > ganeti.jqueue_unittest.py > index 1aa3ec9..7ff14a0 100755 > --- a/test/py/ganeti.jqueue_unittest.py > +++ b/test/py/ganeti.jqueue_unittest.py > @@ -715,7 +715,6 @@ class _FakeQueueForProc: > self._acquired = False > self._updates = [] > self._submitted = [] > - self._accepting_jobs = True > > self._submit_count = itertools.count(1000) > > @@ -751,12 +750,6 @@ class _FakeQueueForProc: > self._submitted.extend(zip(job_ids, jobs)) > return job_ids > > - def StopAcceptingJobs(self): > - self._accepting_jobs = False > - > - def AcceptingJobsUnlocked(self): > - return self._accepting_jobs > - > > class _FakeExecOpCodeForProc: > def __init__(self, queue, before_start, after_start): > @@ -1154,10 +1147,6 @@ class TestJobProcessor(unittest.TestCase, > _JobProcessorTestUtils): > > self._TestCancelWhileSomething(fn) > > - def testCancelDuringQueueShutdown(self): > - queue = self._TestCancelWhileSomething(lambda q: > q.StopAcceptingJobs()) > - self.assertFalse(queue.AcceptingJobsUnlocked()) > - > def testCancelWhileRunning(self): > # Tests canceling a job with finished opcodes and more, unprocessed > ones > queue = _FakeQueueForProc() > @@ -1204,185 +1193,6 @@ class TestJobProcessor(unittest.TestCase, > _JobProcessorTestUtils): > ["Res0", "Job canceled by request", > "Job canceled by request"]]) > > - def _TestQueueShutdown(self, queue, opexec, job, runcount): > - self.assertTrue(queue.AcceptingJobsUnlocked()) > - > - # Simulate shutdown > - queue.StopAcceptingJobs() > - > - self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(), > - jqueue._JobProcessor.DEFER) > - > - # Check result > - self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED) > - self.assertEqual(job.GetInfo(["status"]), > [constants.JOB_STATUS_QUEUED]) > - self.assertFalse(job.cur_opctx) > - self.assertTrue(job.start_timestamp) > - self.assertFalse(job.end_timestamp) > - self.assertEqual(job.start_timestamp, job.ops[0].start_timestamp) > - self.assertTrue(compat.all(op.start_timestamp and op.end_timestamp > - for op in job.ops[:runcount])) > - self.assertFalse(job.ops[runcount].end_timestamp) > - self.assertFalse(compat.any(op.start_timestamp or op.end_timestamp > - for op in job.ops[(runcount + 1):])) > - self.assertEqual(job.GetInfo(["opstatus", "opresult"]), > - [(([constants.OP_STATUS_SUCCESS] * runcount) + > - ([constants.OP_STATUS_QUEUED] * > - (len(job.ops) - runcount))), > - (["Res%s" % i for i in range(runcount)] + > - ([None] * (len(job.ops) - runcount)))]) > - > - # Must have been written and replicated > - self.assertEqual(queue.GetNextUpdate(), (job, True)) > - self.assertRaises(IndexError, queue.GetNextUpdate) > - > - def testQueueShutdownWhileRunning(self): > - # Tests shutting down the queue while a job is running > - queue = _FakeQueueForProc() > - > - ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False) > - for i in range(3)] > - > - # Create job > - job_id = 2718211587 > - job = self._CreateJob(queue, job_id, ops) > - > - self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED) > - > - opexec = _FakeExecOpCodeForProc(queue, None, None) > - > - self.assertRaises(IndexError, queue.GetNextUpdate) > - > - # Run one opcode > - self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(), > - jqueue._JobProcessor.DEFER) > - > - # Job goes back to queued > - self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED) > - self.assertEqual(job.GetInfo(["opstatus", "opresult"]), > - [[constants.OP_STATUS_SUCCESS, > - constants.OP_STATUS_QUEUED, > - constants.OP_STATUS_QUEUED], > - ["Res0", None, None]]) > - self.assertFalse(job.cur_opctx) > - > - # Writes for waiting, running and result > - for _ in range(3): > - self.assertEqual(queue.GetNextUpdate(), (job, True)) > - > - # Run second opcode > - self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(), > - jqueue._JobProcessor.DEFER) > - > - # Job goes back to queued > - self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED) > - self.assertEqual(job.GetInfo(["opstatus", "opresult"]), > - [[constants.OP_STATUS_SUCCESS, > - constants.OP_STATUS_SUCCESS, > - constants.OP_STATUS_QUEUED], > - ["Res0", "Res1", None]]) > - self.assertFalse(job.cur_opctx) > - > - # Writes for waiting, running and result > - for _ in range(3): > - self.assertEqual(queue.GetNextUpdate(), (job, True)) > - > - self._TestQueueShutdown(queue, opexec, job, 2) > - > - def testQueueShutdownWithLockTimeout(self): > - # Tests shutting down while a lock acquire times out > - queue = _FakeQueueForProc() > - > - ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False) > - for i in range(3)] > - > - # Create job > - job_id = 1304231178 > - job = self._CreateJob(queue, job_id, ops) > - > - self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED) > - > - acquire_timeout = False > - > - def _BeforeStart(timeout, priority): > - self.assertFalse(queue.IsAcquired()) > - self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING) > - if acquire_timeout: > - raise mcpu.LockAcquireTimeout() > - > - opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, None) > - > - self.assertRaises(IndexError, queue.GetNextUpdate) > - > - # Run one opcode > - self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(), > - jqueue._JobProcessor.DEFER) > - > - # Job goes back to queued > - self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED) > - self.assertEqual(job.GetInfo(["opstatus", "opresult"]), > - [[constants.OP_STATUS_SUCCESS, > - constants.OP_STATUS_QUEUED, > - constants.OP_STATUS_QUEUED], > - ["Res0", None, None]]) > - self.assertFalse(job.cur_opctx) > - > - # Writes for waiting, running and result > - for _ in range(3): > - self.assertEqual(queue.GetNextUpdate(), (job, True)) > - > - # The next opcode should have expiring lock acquires > - acquire_timeout = True > - > - self._TestQueueShutdown(queue, opexec, job, 1) > - > - def testQueueShutdownWhileInQueue(self): > - # This should never happen in reality (no new jobs are started by the > - # workerpool once a shutdown has been initiated), but it's better to > test > - # the job processor for this scenario > - queue = _FakeQueueForProc() > - > - ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False) > - for i in range(5)] > - > - # Create job > - job_id = 2031 > - job = self._CreateJob(queue, job_id, ops) > - > - self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED) > - self.assertRaises(IndexError, queue.GetNextUpdate) > - > - self.assertFalse(job.start_timestamp) > - self.assertFalse(job.end_timestamp) > - self.assertTrue(compat.all(op.status == constants.OP_STATUS_QUEUED > - for op in job.ops)) > - > - opexec = _FakeExecOpCodeForProc(queue, None, None) > - self._TestQueueShutdown(queue, opexec, job, 0) > - > - def testQueueShutdownWhileWaitlockInQueue(self): > - queue = _FakeQueueForProc() > - > - ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False) > - for i in range(5)] > - > - # Create job > - job_id = 53125685 > - job = self._CreateJob(queue, job_id, ops) > - > - self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED) > - > - job.ops[0].status = constants.OP_STATUS_WAITING > - > - assert len(job.ops) == 5 > - > - self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING) > - > - self.assertRaises(IndexError, queue.GetNextUpdate) > - > - opexec = _FakeExecOpCodeForProc(queue, None, None) > - self._TestQueueShutdown(queue, opexec, job, 0) > - > def testPartiallyRun(self): > # Tests calling the processor on a job that's been partially run > before the > # program was restarted > -- > 2.0.0.526.g5318336 > >
