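With this patch, all lock attempt timeouts are pre-calculated once,
when the class is defined. The interface of the _LockTimeoutStrategy
class (now renamed to _LockAttemptTimeoutStrategy) also changes
slightly: NextAttempt now returns a new instance instead of advancing
the existing one.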
---
lib/mcpu.py | 115 +++++++++++++++++++++++++-----------------
test/ganeti.mcpu_unittest.py | 16 +++---
2 files changed, 78 insertions(+), 53 deletions(-)
diff --git a/lib/mcpu.py b/lib/mcpu.py
index 1ad93ca..42b4d4f 100644
--- a/lib/mcpu.py
+++ b/lib/mcpu.py
@@ -46,84 +46,105 @@ class _LockAcquireTimeout(Exception):
"""
-class _LockTimeoutStrategy(object):
+def _CalculateLockAttemptTimeouts():
+ """Calculate timeouts for lock attempts.
+
+ """
+ running_sum = 0
+ result = []
+
+ # Wait for a total of at least 150s before doing a blocking acquire
+ while running_sum < 150.0:
+ try:
+ prev_timeout = result[-1]
+ except IndexError:
+ prev_timeout = 0.5
+
+ timeout = prev_timeout * 1.5
+
+ # Cap timeout at 10 seconds. This gives other jobs a chance to run
+ # even if we're still trying to get our locks, before finally moving
+ # to a blocking acquire.
+ if timeout > 10.0:
+ timeout = 10.0
+
+ elif timeout < 0.1:
+ # Lower boundary for safety
+ timeout = 0.1
+
+ running_sum += timeout
+ result.append(timeout)
+
+ return result
+
+
+class _LockAttemptTimeoutStrategy(object):
"""Class with lock acquire timeout strategy.
"""
__slots__ = [
- "_attempts",
+ "_attempt",
"_random_fn",
"_start_time",
+ "_time_fn",
]
- _MAX_ATTEMPTS = 10
- """How many retries before going into blocking mode"""
+ _TIMEOUT_PER_ATTEMPT = _CalculateLockAttemptTimeouts()
- _ATTEMPT_FACTOR = 1.75
- """Factor between attempts"""
-
- def __init__(self, _random_fn=None):
+ def __init__(self, attempt=0, _time_fn=time.time, _random_fn=random.random):
"""Initializes this class.
+ @type attempt: int
+ @param attempt: Current attempt number
+ @param _time_fn: Time function for unittests
@param _random_fn: Random number generator for unittests
"""
object.__init__(self)
- self._start_time = None
- self._attempts = 0
+ if attempt < 0:
+ raise ValueError("Attempt must be zero or positive")
- if _random_fn is None:
- self._random_fn = random.random
- else:
- self._random_fn = _random_fn
+ self._attempt = attempt
+ self._time_fn = _time_fn
+ self._random_fn = _random_fn
+
+ self._start_time = None
def NextAttempt(self):
- """Advances to the next attempt.
+ """Returns the strategy for the next attempt.
"""
- assert self._attempts >= 0
- self._attempts += 1
+ return _LockAttemptTimeoutStrategy(attempt=self._attempt + 1,
+ _time_fn=self._time_fn,
+ _random_fn=self._random_fn)
def CalcRemainingTimeout(self):
"""Returns the remaining timeout.
"""
- assert self._attempts >= 0
-
- if self._attempts == self._MAX_ATTEMPTS:
- # Only blocking acquires after 10 retries
+ try:
+ timeout = self._TIMEOUT_PER_ATTEMPT[self._attempt]
+ except IndexError:
+ # No more timeouts, do blocking acquire
return None
- if self._attempts > self._MAX_ATTEMPTS:
- raise RuntimeError("Blocking acquire ran into timeout")
-
# Get start time on first calculation
if self._start_time is None:
- self._start_time = time.time()
+ self._start_time = self._time_fn()
# Calculate remaining time for this attempt
- timeout = (self._start_time + (self._ATTEMPT_FACTOR ** self._attempts) -
- time.time())
-
- if timeout > 10.0:
- # Cap timeout at 10 seconds. This gives other jobs a chance to run
- # even if we're still trying to get our locks, before finally moving
- # to a blocking acquire.
- timeout = 10.0
-
- elif timeout < 0.1:
- # Lower boundary
- timeout = 0.1
+ remaining_timeout = self._start_time + timeout - self._time_fn()
# Add a small variation (-/+ 5%) to timeouts. This helps in situations
# where two or more jobs are fighting for the same lock(s).
- variation_range = timeout * 0.1
- timeout += (self._random_fn() * variation_range) - (variation_range * 0.5)
+ variation_range = remaining_timeout * 0.1
+ remaining_timeout += ((self._random_fn() * variation_range) -
+ (variation_range * 0.5))
- assert timeout >= 0.0, "Timeout must be positive"
+ remaining_timeout = max(0.0, remaining_timeout)  # Clamp; may be overdue
- return timeout
+ return remaining_timeout
class OpExecCbBase:
@@ -414,16 +435,17 @@ class Processor(object):
if lu_class is None:
raise errors.OpCodeUnknown("Unknown opcode")
- timeout_strategy = _LockTimeoutStrategy()
- calc_timeout = timeout_strategy.CalcRemainingTimeout
+ timeout_strategy = _LockAttemptTimeoutStrategy()
while True:
try:
+ # Timeout for this attempt; None means a blocking acquire
+ acquire_timeout = timeout_strategy.CalcRemainingTimeout()
# Acquire the Big Ganeti Lock exclusively if this LU requires it,
# and in a shared fashion otherwise (to prevent concurrent run with
# an exclusive LU.
if self._AcquireLocks(locking.LEVEL_CLUSTER, locking.BGL,
- not lu_class.REQ_BGL, calc_timeout()) is None:
+ not lu_class.REQ_BGL, acquire_timeout) is None:
raise _LockAcquireTimeout()
try:
@@ -431,7 +453,8 @@ class Processor(object):
lu.ExpandNames()
assert lu.needed_locks is not None, "needed_locks not set by LU"
- return self._LockAndExecLU(lu, locking.LEVEL_INSTANCE, calc_timeout)
+ return self._LockAndExecLU(lu, locking.LEVEL_INSTANCE,
+ timeout_strategy.CalcRemainingTimeout)
finally:
self.context.glm.release(locking.LEVEL_CLUSTER)
@@ -439,7 +462,7 @@ class Processor(object):
# Timeout while waiting for lock, try again
pass
- timeout_strategy.NextAttempt()
+ timeout_strategy = timeout_strategy.NextAttempt()  # returns a new object
finally:
self._cbs = None
diff --git a/test/ganeti.mcpu_unittest.py b/test/ganeti.mcpu_unittest.py
index 02183e1..4185a69 100755
--- a/test/ganeti.mcpu_unittest.py
+++ b/test/ganeti.mcpu_unittest.py
@@ -27,25 +27,36 @@ import unittest
from ganeti import mcpu
-class TestLockTimeoutStrategy(unittest.TestCase):
+class TestLockAttemptTimeoutStrategy(unittest.TestCase):
def testConstants(self):
- self.assert_(mcpu._LockTimeoutStrategy._MAX_ATTEMPTS > 0)
- self.assert_(mcpu._LockTimeoutStrategy._ATTEMPT_FACTOR > 1.0)
+ tpa = mcpu._LockAttemptTimeoutStrategy._TIMEOUT_PER_ATTEMPT
+ self.assert_(len(tpa) > 10)
+ self.assert_(sum(tpa) > 150.0)
+
+ def testBlockingAfterLastAttempt(self):
+ tpa = mcpu._LockAttemptTimeoutStrategy._TIMEOUT_PER_ATTEMPT
+ strat = mcpu._LockAttemptTimeoutStrategy(attempt=len(tpa))
+ self.assertEqual(strat.CalcRemainingTimeout(), None)
+ self.assertEqual(strat.NextAttempt().CalcRemainingTimeout(), None)
+
+ def testNegativeAttempt(self):
+ self.assertRaises(ValueError, mcpu._LockAttemptTimeoutStrategy, attempt=-1)
def testSimple(self):
- strat = mcpu._LockTimeoutStrategy(_random_fn=lambda: 0.5)
+ strat = mcpu._LockAttemptTimeoutStrategy(_random_fn=lambda: 0.5,
+ _time_fn=lambda: 0.0)
- self.assertEqual(strat._attempts, 0)
+ self.assertEqual(strat._attempt, 0)
prev = None
- for _ in range(strat._MAX_ATTEMPTS):
+ for _ in range(len(mcpu._LockAttemptTimeoutStrategy._TIMEOUT_PER_ATTEMPT)):
timeout = strat.CalcRemainingTimeout()
self.assert_(timeout is not None)
self.assert_(timeout <= 10.0)
self.assert_(prev is None or timeout >= prev)
- strat.NextAttempt()
+ strat = strat.NextAttempt()
prev = timeout
--
1.6.4.3