LGTM, thanks

On Wed, Jun 11, 2014 at 6:05 PM, 'Klaus Aehlig' via ganeti-devel <
[email protected]> wrote:

> Given that the Python job queue now only handles a single job,
> there is no need to keep track on whether it accepts new jobs.
> Hence remove that code. Also remove the corresponding tests.
>
> Signed-off-by: Klaus Aehlig <[email protected]>
> ---
>  lib/jqueue/__init__.py            |  40 --------
>  test/py/ganeti.jqueue_unittest.py | 190
> --------------------------------------
>  2 files changed, 230 deletions(-)
>
> diff --git a/lib/jqueue/__init__.py b/lib/jqueue/__init__.py
> index 0258f9d..1f61d58 100644
> --- a/lib/jqueue/__init__.py
> +++ b/lib/jqueue/__init__.py
> @@ -83,12 +83,6 @@ class CancelJob(Exception):
>    """
>
>
> -class QueueShutdown(Exception):
> -  """Special exception to abort a job when the job queue is shutting down.
> -
> -  """
> -
> -
>  def TimeStampNow():
>    """Returns the current timestamp.
>
> @@ -612,11 +606,6 @@ class _OpExecCallbacks(mcpu.OpExecCbBase):
>        logging.debug("Canceling opcode")
>        raise CancelJob()
>
> -    # See if queue is shutting down
> -    if not self._queue.AcceptingJobsUnlocked():
> -      logging.debug("Queue is shutting down")
> -      raise QueueShutdown()
> -
>    @locking.ssynchronized(_QUEUE, shared=1)
>    def NotifyStart(self):
>      """Mark the opcode as running, not lock-waiting.
> @@ -1168,10 +1157,6 @@ class _JobProcessor(object):
>        if op.status == constants.OP_STATUS_CANCELING:
>          return (constants.OP_STATUS_CANCELING, None)
>
> -      # Queue is shutting down, return to queued
> -      if not self.queue.AcceptingJobsUnlocked():
> -        return (constants.OP_STATUS_QUEUED, None)
> -
>        # Stay in waitlock while trying to re-acquire lock
>        return (constants.OP_STATUS_WAITING, None)
>      except CancelJob:
> @@ -1179,14 +1164,6 @@ class _JobProcessor(object):
>        assert op.status == constants.OP_STATUS_CANCELING
>        return (constants.OP_STATUS_CANCELING, None)
>
> -    except QueueShutdown:
> -      logging.exception("%s: Queue is shutting down", opctx.log_prefix)
> -
> -      assert op.status == constants.OP_STATUS_WAITING
> -
> -      # Job hadn't been started yet, so it should return to the queue
> -      return (constants.OP_STATUS_QUEUED, None)
> -
>      except Exception, err: # pylint: disable=W0703
>        logging.exception("%s: Caught exception in %s",
>                          opctx.log_prefix, opctx.summary)
> @@ -1665,9 +1642,6 @@ class JobQueue(object):
>      self.acquire = self._lock.acquire
>      self.release = self._lock.release
>
> -    # Accept jobs by default
> -    self._accepting_jobs = True
> -
>      # Read serial file
>      self._last_serial = jstore.ReadSerial()
>      assert self._last_serial is not None, ("Serial file was modified
> between"
> @@ -2512,22 +2486,8 @@ class JobQueue(object):
>      @return: Whether there are any running jobs
>
>      """
> -    if self._accepting_jobs:
> -      self._accepting_jobs = False
> -
>      return self._wpool.HasRunningTasks()
>
> -  def AcceptingJobsUnlocked(self):
> -    """Returns whether jobs are accepted.
> -
> -    Once L{PrepareShutdown} has been called, no new jobs are accepted and
> the
> -    queue is shutting down.
> -
> -    @rtype: bool
> -
> -    """
> -    return self._accepting_jobs
> -
>    @locking.ssynchronized(_LOCK)
>    def Shutdown(self):
>      """Stops the job queue.
> diff --git a/test/py/ganeti.jqueue_unittest.py b/test/py/
> ganeti.jqueue_unittest.py
> index 1aa3ec9..7ff14a0 100755
> --- a/test/py/ganeti.jqueue_unittest.py
> +++ b/test/py/ganeti.jqueue_unittest.py
> @@ -715,7 +715,6 @@ class _FakeQueueForProc:
>      self._acquired = False
>      self._updates = []
>      self._submitted = []
> -    self._accepting_jobs = True
>
>      self._submit_count = itertools.count(1000)
>
> @@ -751,12 +750,6 @@ class _FakeQueueForProc:
>      self._submitted.extend(zip(job_ids, jobs))
>      return job_ids
>
> -  def StopAcceptingJobs(self):
> -    self._accepting_jobs = False
> -
> -  def AcceptingJobsUnlocked(self):
> -    return self._accepting_jobs
> -
>
>  class _FakeExecOpCodeForProc:
>    def __init__(self, queue, before_start, after_start):
> @@ -1154,10 +1147,6 @@ class TestJobProcessor(unittest.TestCase,
> _JobProcessorTestUtils):
>
>      self._TestCancelWhileSomething(fn)
>
> -  def testCancelDuringQueueShutdown(self):
> -    queue = self._TestCancelWhileSomething(lambda q:
> q.StopAcceptingJobs())
> -    self.assertFalse(queue.AcceptingJobsUnlocked())
> -
>    def testCancelWhileRunning(self):
>      # Tests canceling a job with finished opcodes and more, unprocessed
> ones
>      queue = _FakeQueueForProc()
> @@ -1204,185 +1193,6 @@ class TestJobProcessor(unittest.TestCase,
> _JobProcessorTestUtils):
>                        ["Res0", "Job canceled by request",
>                         "Job canceled by request"]])
>
> -  def _TestQueueShutdown(self, queue, opexec, job, runcount):
> -    self.assertTrue(queue.AcceptingJobsUnlocked())
> -
> -    # Simulate shutdown
> -    queue.StopAcceptingJobs()
> -
> -    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
> -                     jqueue._JobProcessor.DEFER)
> -
> -    # Check result
> -    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
> -    self.assertEqual(job.GetInfo(["status"]),
> [constants.JOB_STATUS_QUEUED])
> -    self.assertFalse(job.cur_opctx)
> -    self.assertTrue(job.start_timestamp)
> -    self.assertFalse(job.end_timestamp)
> -    self.assertEqual(job.start_timestamp, job.ops[0].start_timestamp)
> -    self.assertTrue(compat.all(op.start_timestamp and op.end_timestamp
> -                               for op in job.ops[:runcount]))
> -    self.assertFalse(job.ops[runcount].end_timestamp)
> -    self.assertFalse(compat.any(op.start_timestamp or op.end_timestamp
> -                                for op in job.ops[(runcount + 1):]))
> -    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
> -                     [(([constants.OP_STATUS_SUCCESS] * runcount) +
> -                       ([constants.OP_STATUS_QUEUED] *
> -                        (len(job.ops) - runcount))),
> -                      (["Res%s" % i for i in range(runcount)] +
> -                       ([None] * (len(job.ops) - runcount)))])
> -
> -    # Must have been written and replicated
> -    self.assertEqual(queue.GetNextUpdate(), (job, True))
> -    self.assertRaises(IndexError, queue.GetNextUpdate)
> -
> -  def testQueueShutdownWhileRunning(self):
> -    # Tests shutting down the queue while a job is running
> -    queue = _FakeQueueForProc()
> -
> -    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
> -           for i in range(3)]
> -
> -    # Create job
> -    job_id = 2718211587
> -    job = self._CreateJob(queue, job_id, ops)
> -
> -    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
> -
> -    opexec = _FakeExecOpCodeForProc(queue, None, None)
> -
> -    self.assertRaises(IndexError, queue.GetNextUpdate)
> -
> -    # Run one opcode
> -    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
> -                     jqueue._JobProcessor.DEFER)
> -
> -    # Job goes back to queued
> -    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
> -    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
> -                     [[constants.OP_STATUS_SUCCESS,
> -                       constants.OP_STATUS_QUEUED,
> -                       constants.OP_STATUS_QUEUED],
> -                      ["Res0", None, None]])
> -    self.assertFalse(job.cur_opctx)
> -
> -    # Writes for waiting, running and result
> -    for _ in range(3):
> -      self.assertEqual(queue.GetNextUpdate(), (job, True))
> -
> -    # Run second opcode
> -    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
> -                     jqueue._JobProcessor.DEFER)
> -
> -    # Job goes back to queued
> -    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
> -    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
> -                     [[constants.OP_STATUS_SUCCESS,
> -                       constants.OP_STATUS_SUCCESS,
> -                       constants.OP_STATUS_QUEUED],
> -                      ["Res0", "Res1", None]])
> -    self.assertFalse(job.cur_opctx)
> -
> -    # Writes for waiting, running and result
> -    for _ in range(3):
> -      self.assertEqual(queue.GetNextUpdate(), (job, True))
> -
> -    self._TestQueueShutdown(queue, opexec, job, 2)
> -
> -  def testQueueShutdownWithLockTimeout(self):
> -    # Tests shutting down while a lock acquire times out
> -    queue = _FakeQueueForProc()
> -
> -    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
> -           for i in range(3)]
> -
> -    # Create job
> -    job_id = 1304231178
> -    job = self._CreateJob(queue, job_id, ops)
> -
> -    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
> -
> -    acquire_timeout = False
> -
> -    def _BeforeStart(timeout, priority):
> -      self.assertFalse(queue.IsAcquired())
> -      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
> -      if acquire_timeout:
> -        raise mcpu.LockAcquireTimeout()
> -
> -    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, None)
> -
> -    self.assertRaises(IndexError, queue.GetNextUpdate)
> -
> -    # Run one opcode
> -    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
> -                     jqueue._JobProcessor.DEFER)
> -
> -    # Job goes back to queued
> -    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
> -    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
> -                     [[constants.OP_STATUS_SUCCESS,
> -                       constants.OP_STATUS_QUEUED,
> -                       constants.OP_STATUS_QUEUED],
> -                      ["Res0", None, None]])
> -    self.assertFalse(job.cur_opctx)
> -
> -    # Writes for waiting, running and result
> -    for _ in range(3):
> -      self.assertEqual(queue.GetNextUpdate(), (job, True))
> -
> -    # The next opcode should have expiring lock acquires
> -    acquire_timeout = True
> -
> -    self._TestQueueShutdown(queue, opexec, job, 1)
> -
> -  def testQueueShutdownWhileInQueue(self):
> -    # This should never happen in reality (no new jobs are started by the
> -    # workerpool once a shutdown has been initiated), but it's better to
> test
> -    # the job processor for this scenario
> -    queue = _FakeQueueForProc()
> -
> -    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
> -           for i in range(5)]
> -
> -    # Create job
> -    job_id = 2031
> -    job = self._CreateJob(queue, job_id, ops)
> -
> -    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
> -    self.assertRaises(IndexError, queue.GetNextUpdate)
> -
> -    self.assertFalse(job.start_timestamp)
> -    self.assertFalse(job.end_timestamp)
> -    self.assertTrue(compat.all(op.status == constants.OP_STATUS_QUEUED
> -                               for op in job.ops))
> -
> -    opexec = _FakeExecOpCodeForProc(queue, None, None)
> -    self._TestQueueShutdown(queue, opexec, job, 0)
> -
> -  def testQueueShutdownWhileWaitlockInQueue(self):
> -    queue = _FakeQueueForProc()
> -
> -    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
> -           for i in range(5)]
> -
> -    # Create job
> -    job_id = 53125685
> -    job = self._CreateJob(queue, job_id, ops)
> -
> -    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
> -
> -    job.ops[0].status = constants.OP_STATUS_WAITING
> -
> -    assert len(job.ops) == 5
> -
> -    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
> -
> -    self.assertRaises(IndexError, queue.GetNextUpdate)
> -
> -    opexec = _FakeExecOpCodeForProc(queue, None, None)
> -    self._TestQueueShutdown(queue, opexec, job, 0)
> -
>    def testPartiallyRun(self):
>      # Tests calling the processor on a job that's been partially run
> before the
>      # program was restarted
> --
> 2.0.0.526.g5318336
>
>

Reply via email to