On Fri, Oct 9, 2009 at 4:57 PM, Michael Hanselmann <[email protected]> wrote:
>
> With this patch all timeouts are pre-calculated. The interface of
> the _LockTimeoutStrategy class is also changed a bit; NextAttempt
> now returns a new instance.
> ---
> lib/mcpu.py | 115 +++++++++++++++++++++++++-----------------
> test/ganeti.mcpu_unittest.py | 16 +++---
> 2 files changed, 78 insertions(+), 53 deletions(-)
>
> diff --git a/lib/mcpu.py b/lib/mcpu.py
> index 1ad93ca..42b4d4f 100644
> --- a/lib/mcpu.py
> +++ b/lib/mcpu.py
> @@ -46,84 +46,105 @@ class _LockAcquireTimeout(Exception):
> """
>
>
> -class _LockTimeoutStrategy(object):
> +def _CalculateLockAttemptTimeouts():
> + """Calculate timeouts for lock attempts.
> +
> + """
> + running_sum = 0
> + result = []
> +
> + # Wait for a total of at least 150s before doing a blocking acquire
> + while running_sum < 150.0:
> + try:
> + prev_timeout = result[-1]
> + except IndexError:
> + prev_timeout = 0.5
Wouldn't it take fewer lines to start with result = [0.75] (and running_sum = 0.75) and drop the try/except here?
> +
> + timeout = prev_timeout * 1.5
> +
> + # Cap timeout at 10 seconds. This gives other jobs a chance to run
> + # even if we're still trying to get our locks, before finally moving
> + # to a blocking acquire.
> + if timeout > 10.0:
> + timeout = 10.0
> +
> + elif timeout < 0.1:
> + # Lower boundary for safety
> + timeout = 0.1
> +
> + running_sum += timeout
> + result.append(timeout)
> +
> + return result
> +
> +
> +class _LockAttemptTimeoutStrategy(object):
> """Class with lock acquire timeout strategy.
>
> """
> __slots__ = [
> - "_attempts",
> + "_attempt",
> "_random_fn",
> "_start_time",
> + "_time_fn",
> ]
>
> - _MAX_ATTEMPTS = 10
> - """How many retries before going into blocking mode"""
> + _TIMEOUT_PER_ATTEMPT = _CalculateLockAttemptTimeouts()
>
> - _ATTEMPT_FACTOR = 1.75
> - """Factor between attempts"""
> -
> - def __init__(self, _random_fn=None):
> + def __init__(self, attempt=0, _time_fn=time.time,
> _random_fn=random.random):
> """Initializes this class.
>
> + @type attempt: int
> + @param attempt: Current attempt number
> + @param _time_fn: Time function for unittests
I'm wondering: do we need to pass _time_fn in at all?
In the unit test we could set mcpu.time to a mock and avoid all this special-casing...
> @param _random_fn: Random number generator for unittests
>
Same here for _random_fn
> """
> object.__init__(self)
>
> - self._start_time = None
> - self._attempts = 0
> + if attempt < 0:
> + raise ValueError("Attempt must be zero or positive")
>
> - if _random_fn is None:
> - self._random_fn = random.random
> - else:
> - self._random_fn = _random_fn
> + self._attempt = attempt
> + self._time_fn = _time_fn
> + self._random_fn = _random_fn
> +
> + self._start_time = None
>
> def NextAttempt(self):
> - """Advances to the next attempt.
> + """Returns the strategy for the next attempt.
>
> """
> - assert self._attempts >= 0
> - self._attempts += 1
> + return _LockAttemptTimeoutStrategy(attempt=self._attempt + 1,
> + _time_fn=self._time_fn,
> + _random_fn=self._random_fn)
>
> def CalcRemainingTimeout(self):
> """Returns the remaining timeout.
>
> """
> - assert self._attempts >= 0
> -
> - if self._attempts == self._MAX_ATTEMPTS:
> - # Only blocking acquires after 10 retries
> + try:
> + timeout = self._TIMEOUT_PER_ATTEMPT[self._attempt]
> + except IndexError:
> + # No more timeouts, do blocking acquire
> return None
>
> - if self._attempts > self._MAX_ATTEMPTS:
> - raise RuntimeError("Blocking acquire ran into timeout")
> -
> # Get start time on first calculation
> if self._start_time is None:
> - self._start_time = time.time()
> + self._start_time = self._time_fn()
>
> # Calculate remaining time for this attempt
> - timeout = (self._start_time + (self._ATTEMPT_FACTOR ** self._attempts) -
> - time.time())
> -
> - if timeout > 10.0:
> - # Cap timeout at 10 seconds. This gives other jobs a chance to run
> - # even if we're still trying to get our locks, before finally moving
> - # to a blocking acquire.
> - timeout = 10.0
> -
> - elif timeout < 0.1:
> - # Lower boundary
> - timeout = 0.1
> + remaining_timeout = self._start_time + timeout - self._time_fn()
>
> # Add a small variation (-/+ 5%) to timeouts. This helps in situations
> # where two or more jobs are fighting for the same lock(s).
> - variation_range = timeout * 0.1
> - timeout += (self._random_fn() * variation_range) - (variation_range * 0.5)
> + variation_range = remaining_timeout * 0.1
> + remaining_timeout += ((self._random_fn() * variation_range) -
> + (variation_range * 0.5))
>
> - assert timeout >= 0.0, "Timeout must be positive"
> + assert remaining_timeout >= 0.0, "Timeout must be positive"
>
> - return timeout
> + return remaining_timeout
>
>
> class OpExecCbBase:
> @@ -414,16 +435,17 @@ class Processor(object):
> if lu_class is None:
> raise errors.OpCodeUnknown("Unknown opcode")
>
> - timeout_strategy = _LockTimeoutStrategy()
> - calc_timeout = timeout_strategy.CalcRemainingTimeout
> + timeout_strategy = _LockAttemptTimeoutStrategy()
>
> while True:
> try:
> + acquire_timeout = timeout_strategy.CalcRemainingTimeout()
> +
> # Acquire the Big Ganeti Lock exclusively if this LU requires it,
> # and in a shared fashion otherwise (to prevent concurrent run with
> # an exclusive LU.
> if self._AcquireLocks(locking.LEVEL_CLUSTER, locking.BGL,
> - not lu_class.REQ_BGL, calc_timeout()) is None:
> + not lu_class.REQ_BGL, acquire_timeout) is None:
> raise _LockAcquireTimeout()
>
> try:
> @@ -431,7 +453,8 @@ class Processor(object):
> lu.ExpandNames()
> assert lu.needed_locks is not None, "needed_locks not set by LU"
>
> - return self._LockAndExecLU(lu, locking.LEVEL_INSTANCE, calc_timeout)
> + return self._LockAndExecLU(lu, locking.LEVEL_INSTANCE,
> + timeout_strategy.CalcRemainingTimeout)
> finally:
> self.context.glm.release(locking.LEVEL_CLUSTER)
>
> @@ -439,7 +462,7 @@ class Processor(object):
> # Timeout while waiting for lock, try again
> pass
>
> - timeout_strategy.NextAttempt()
> + timeout_strategy = timeout_strategy.NextAttempt()
>
> finally:
> self._cbs = None
> diff --git a/test/ganeti.mcpu_unittest.py b/test/ganeti.mcpu_unittest.py
> index 02183e1..4185a69 100755
> --- a/test/ganeti.mcpu_unittest.py
> +++ b/test/ganeti.mcpu_unittest.py
> @@ -27,25 +27,27 @@ import unittest
> from ganeti import mcpu
>
>
> -class TestLockTimeoutStrategy(unittest.TestCase):
> +class TestLockAttemptTimeoutStrategy(unittest.TestCase):
> def testConstants(self):
> - self.assert_(mcpu._LockTimeoutStrategy._MAX_ATTEMPTS > 0)
> - self.assert_(mcpu._LockTimeoutStrategy._ATTEMPT_FACTOR > 1.0)
> + tpa = mcpu._LockAttemptTimeoutStrategy._TIMEOUT_PER_ATTEMPT
> + self.assert_(len(tpa) > 10)
> + self.assert_(sum(tpa) > 150.0)
>
> def testSimple(self):
> - strat = mcpu._LockTimeoutStrategy(_random_fn=lambda: 0.5)
> + strat = mcpu._LockAttemptTimeoutStrategy(_random_fn=lambda: 0.5,
> + _time_fn=lambda: 0.0)
>
> - self.assertEqual(strat._attempts, 0)
> + self.assertEqual(strat._attempt, 0)
>
> prev = None
> - for _ in range(strat._MAX_ATTEMPTS):
> + for _ in range(len(mcpu._LockAttemptTimeoutStrategy._TIMEOUT_PER_ATTEMPT)):
> timeout = strat.CalcRemainingTimeout()
> self.assert_(timeout is not None)
>
> self.assert_(timeout <= 10.0)
> self.assert_(prev is None or timeout >= prev)
>
> - strat.NextAttempt()
> + strat = strat.NextAttempt()
>
> prev = timeout
>
> --
> 1.6.4.3
>
>
LGTM for the rest
Thanks,
Guido