Change lock allocation in mcpu to honor the required minimum
of locks to allocate in opportunistic lock allocation.

Signed-off-by: Klaus Aehlig <[email protected]>
Reviewed-by: Hrvoje Ribicic <[email protected]>

Cherry-picked from f3f1fc574671d41fefb05661b01d4ff93312eef7.

Signed-off-by: Klaus Aehlig <[email protected]>
---
 lib/mcpu.py | 28 ++++++++++++++++++++--------
 1 file changed, 20 insertions(+), 8 deletions(-)

diff --git a/lib/mcpu.py b/lib/mcpu.py
index 7e061cd..f08c3c9 100644
--- a/lib/mcpu.py
+++ b/lib/mcpu.py
@@ -335,7 +335,8 @@ class Processor(object):
     if not self._enable_locks:
       raise errors.ProgrammerError("Attempted to use disabled locks")
 
-  def _AcquireLocks(self, level, names, shared, opportunistic, timeout):
+  def _AcquireLocks(self, level, names, shared, opportunistic, timeout,
+                    opportunistic_count=1):
     """Acquires locks via the Ganeti lock manager.
 
     @type level: int
@@ -396,12 +397,10 @@ class Processor(object):
     else:
       request = [[lock, "exclusive"] for lock in locks]
 
-    if opportunistic:
-      logging.debug("Opportunistically acquring some of %s for %s.",
-                    locks, self._wconfdcontext)
-      locks = self.wconfd.Client().OpportunisticLockUnion(self._wconfdcontext,
-                                                          request)
-    elif timeout is None:
+    if timeout is None:
+      ## Note: once we are so desperate for locks to request them
+      ## unconditionally, we no longer care about an original plan
+      ## to acquire locks opportunistically.
       logging.info("Definitely requesting %s for %s",
                    request, self._wconfdcontext)
       ## The only way to be sure of not getting starved is to sequentially
@@ -415,6 +414,17 @@ class Processor(object):
           if not pending:
             break
           time.sleep(10.0 * random.random())
+
+    elif opportunistic:
+      logging.debug("For %ss trying to opportunistically acquire"
+                    "  at least %d of %s for %s.",
+                    timeout, opportunistic_count, locks, self._wconfdcontext)
+      locks = utils.SimpleRetry(
+        lambda l: l != [], self.wconfd.Client().GuardedOpportunisticLockUnion,
+        2.0, timeout, args=[opportunistic_count, self._wconfdcontext, request])
+      logging.debug("Managed to get the following locks: %s", locks)
+      if locks == []:
+        raise LockAcquireTimeout()
     else:
       logging.debug("Trying %ss to request %s for %s",
                     timeout, request, self._wconfdcontext)
@@ -538,6 +548,7 @@ class Processor(object):
       lu.DeclareLocks(level)
       share = lu.share_locks[level]
       opportunistic = lu.opportunistic_locks[level]
+      opportunistic_count = lu.opportunistic_locks_count[level]
 
       try:
         assert adding_locks ^ acquiring_locks, \
@@ -553,7 +564,8 @@ class Processor(object):
           use_opportunistic = False
 
         self._AcquireLocks(level, needed_locks, share, use_opportunistic,
-                           calc_timeout())
+                           calc_timeout(),
+                           opportunistic_count=opportunistic_count)
         lu.wconfdlocks = self.wconfd.Client().ListLocks(self._wconfdcontext)
 
         result = self._LockAndExecLU(lu, level + 1, calc_timeout)
-- 
2.0.0.526.g5318336

Reply via email to