On Fri, Oct 9, 2009 at 4:57 PM, Michael Hanselmann <[email protected]> wrote:
>
> With this patch all timeouts are pre-calculated. The interface of
> the _LockTimeoutStrategy class is also changed a bit; NextAttempt
> now returns a new instance.
> ---
>  lib/mcpu.py                  |  115 +++++++++++++++++++++++++-----------------
>  test/ganeti.mcpu_unittest.py |   16 +++---
>  2 files changed, 78 insertions(+), 53 deletions(-)
>
> diff --git a/lib/mcpu.py b/lib/mcpu.py
> index 1ad93ca..42b4d4f 100644
> --- a/lib/mcpu.py
> +++ b/lib/mcpu.py
> @@ -46,84 +46,105 @@ class _LockAcquireTimeout(Exception):
>   """
>
>
> -class _LockTimeoutStrategy(object):
> +def _CalculateLockAttemptTimeouts():
> +  """Calculate timeouts for lock attempts.
> +
> +  """
> +  running_sum = 0
> +  result = []
> +
> +  # Wait for a total of at least 150s before doing a blocking acquire
> +  while running_sum < 150.0:
> +    try:
> +      prev_timeout = result[-1]
> +    except IndexError:
> +      prev_timeout = 0.5

Wouldn't we use fewer lines by initializing result = [0.75] (and running_sum = 0.75 to match) and skipping the try/except here?

> +
> +    timeout = prev_timeout * 1.5
> +
> +    # Cap timeout at 10 seconds. This gives other jobs a chance to run
> +    # even if we're still trying to get our locks, before finally moving
> +    # to a blocking acquire.
> +    if timeout > 10.0:
> +      timeout = 10.0
> +
> +    elif timeout < 0.1:
> +      # Lower boundary for safety
> +      timeout = 0.1
> +
> +    running_sum += timeout
> +    result.append(timeout)
> +
> +  return result
> +
> +
> +class _LockAttemptTimeoutStrategy(object):
>   """Class with lock acquire timeout strategy.
>
>   """
>   __slots__ = [
> -    "_attempts",
> +    "_attempt",
>     "_random_fn",
>     "_start_time",
> +    "_time_fn",
>     ]
>
> -  _MAX_ATTEMPTS = 10
> -  """How many retries before going into blocking mode"""
> +  _TIMEOUT_PER_ATTEMPT = _CalculateLockAttemptTimeouts()
>
> -  _ATTEMPT_FACTOR = 1.75
> -  """Factor between attempts"""
> -
> -  def __init__(self, _random_fn=None):
> +  def __init__(self, attempt=0, _time_fn=time.time, _random_fn=random.random):
>     """Initializes this class.
>
> +    @type attempt: int
> +    @param attempt: Current attempt number
> +    @param _time_fn: Time function for unittests

I'm wondering: do we really need to pass _time_fn in at all?
In the unittest we could set mcpu.time to a mock and avoid all this special-casing...

>     @param _random_fn: Random number generator for unittests
>

Same here

>     """
>     object.__init__(self)
>
> -    self._start_time = None
> -    self._attempts = 0
> +    if attempt < 0:
> +      raise ValueError("Attempt must be zero or positive")
>
> -    if _random_fn is None:
> -      self._random_fn = random.random
> -    else:
> -      self._random_fn = _random_fn
> +    self._attempt = attempt
> +    self._time_fn = _time_fn
> +    self._random_fn = _random_fn
> +
> +    self._start_time = None
>
>   def NextAttempt(self):
> -    """Advances to the next attempt.
> +    """Returns the strategy for the next attempt.
>
>     """
> -    assert self._attempts >= 0
> -    self._attempts += 1
> +    return _LockAttemptTimeoutStrategy(attempt=self._attempt + 1,
> +                                       _time_fn=self._time_fn,
> +                                       _random_fn=self._random_fn)
>
>   def CalcRemainingTimeout(self):
>     """Returns the remaining timeout.
>
>     """
> -    assert self._attempts >= 0
> -
> -    if self._attempts == self._MAX_ATTEMPTS:
> -      # Only blocking acquires after 10 retries
> +    try:
> +      timeout = self._TIMEOUT_PER_ATTEMPT[self._attempt]
> +    except IndexError:
> +      # No more timeouts, do blocking acquire
>       return None
>
> -    if self._attempts > self._MAX_ATTEMPTS:
> -      raise RuntimeError("Blocking acquire ran into timeout")
> -
>     # Get start time on first calculation
>     if self._start_time is None:
> -      self._start_time = time.time()
> +      self._start_time = self._time_fn()
>
>     # Calculate remaining time for this attempt
> -    timeout = (self._start_time + (self._ATTEMPT_FACTOR ** self._attempts) -
> -               time.time())
> -
> -    if timeout > 10.0:
> -      # Cap timeout at 10 seconds. This gives other jobs a chance to run
> -      # even if we're still trying to get our locks, before finally moving
> -      # to a blocking acquire.
> -      timeout = 10.0
> -
> -    elif timeout < 0.1:
> -      # Lower boundary
> -      timeout = 0.1
> +    remaining_timeout = self._start_time + timeout - self._time_fn()
>
>     # Add a small variation (-/+ 5%) to timeouts. This helps in situations
>     # where two or more jobs are fighting for the same lock(s).
> -    variation_range = timeout * 0.1
> -    timeout += (self._random_fn() * variation_range) - (variation_range * 0.5)
> +    variation_range = remaining_timeout * 0.1
> +    remaining_timeout += ((self._random_fn() * variation_range) -
> +                          (variation_range * 0.5))
>
> -    assert timeout >= 0.0, "Timeout must be positive"
> +    assert remaining_timeout >= 0.0, "Timeout must be positive"
>
> -    return timeout
> +    return remaining_timeout
>
>
>  class OpExecCbBase:
> @@ -414,16 +435,17 @@ class Processor(object):
>       if lu_class is None:
>         raise errors.OpCodeUnknown("Unknown opcode")
>
> -      timeout_strategy = _LockTimeoutStrategy()
> -      calc_timeout = timeout_strategy.CalcRemainingTimeout
> +      timeout_strategy = _LockAttemptTimeoutStrategy()
>
>       while True:
>         try:
> +          acquire_timeout = timeout_strategy.CalcRemainingTimeout()
> +
>           # Acquire the Big Ganeti Lock exclusively if this LU requires it,
>           # and in a shared fashion otherwise (to prevent concurrent run with
>           # an exclusive LU.
>           if self._AcquireLocks(locking.LEVEL_CLUSTER, locking.BGL,
> -                                not lu_class.REQ_BGL, calc_timeout()) is None:
> +                                not lu_class.REQ_BGL, acquire_timeout) is None:
>             raise _LockAcquireTimeout()
>
>           try:
> @@ -431,7 +453,8 @@ class Processor(object):
>             lu.ExpandNames()
>             assert lu.needed_locks is not None, "needed_locks not set by LU"
>
> -            return self._LockAndExecLU(lu, locking.LEVEL_INSTANCE, calc_timeout)
> +            return self._LockAndExecLU(lu, locking.LEVEL_INSTANCE,
> +                                       timeout_strategy.CalcRemainingTimeout)
>           finally:
>             self.context.glm.release(locking.LEVEL_CLUSTER)
>
> @@ -439,7 +462,7 @@ class Processor(object):
>           # Timeout while waiting for lock, try again
>           pass
>
> -        timeout_strategy.NextAttempt()
> +        timeout_strategy = timeout_strategy.NextAttempt()
>
>     finally:
>       self._cbs = None
> diff --git a/test/ganeti.mcpu_unittest.py b/test/ganeti.mcpu_unittest.py
> index 02183e1..4185a69 100755
> --- a/test/ganeti.mcpu_unittest.py
> +++ b/test/ganeti.mcpu_unittest.py
> @@ -27,25 +27,27 @@ import unittest
>  from ganeti import mcpu
>
>
> -class TestLockTimeoutStrategy(unittest.TestCase):
> +class TestLockAttemptTimeoutStrategy(unittest.TestCase):
>   def testConstants(self):
> -    self.assert_(mcpu._LockTimeoutStrategy._MAX_ATTEMPTS > 0)
> -    self.assert_(mcpu._LockTimeoutStrategy._ATTEMPT_FACTOR > 1.0)
> +    tpa = mcpu._LockAttemptTimeoutStrategy._TIMEOUT_PER_ATTEMPT
> +    self.assert_(len(tpa) > 10)
> +    self.assert_(sum(tpa) > 150.0)
>
>   def testSimple(self):
> -    strat = mcpu._LockTimeoutStrategy(_random_fn=lambda: 0.5)
> +    strat = mcpu._LockAttemptTimeoutStrategy(_random_fn=lambda: 0.5,
> +                                             _time_fn=lambda: 0.0)
>
> -    self.assertEqual(strat._attempts, 0)
> +    self.assertEqual(strat._attempt, 0)
>
>     prev = None
> -    for _ in range(strat._MAX_ATTEMPTS):
> +    for _ in range(len(mcpu._LockAttemptTimeoutStrategy._TIMEOUT_PER_ATTEMPT)):
>       timeout = strat.CalcRemainingTimeout()
>       self.assert_(timeout is not None)
>
>       self.assert_(timeout <= 10.0)
>       self.assert_(prev is None or timeout >= prev)
>
> -      strat.NextAttempt()
> +      strat = strat.NextAttempt()
>
>       prev = timeout
>
> --
> 1.6.4.3
>
>

LGTM for the rest

Thanks,

Guido

Reply via email to