This should improve the situation:

diff --git a/qa/qa_job_utils.py b/qa/qa_job_utils.py
index 1207e68..fc068e2 100644
--- a/qa/qa_job_utils.py
+++ b/qa/qa_job_utils.py
@@ -125,7 +125,7 @@ def _GetNodeUUIDMap(nodes):


 def _FindLockNames(locks):
-  """ Finds a mapping of internal lock names to the names of entities
locked.
+  """ Finds the ids and descriptions of locks that given locks can block.

   @type locks: dict of locking level to list
   @param locks: The locks that gnt-debug delay is holding.
@@ -133,6 +133,10 @@ def _FindLockNames(locks):
   @rtype: dict of string to string
   @return: The lock name to entity name map.

+  For a given set of locks, some internal locks (e.g. ALL_SET locks) can be
+  blocked even though they were not listed explicitly. This function has to take
+  care and list all locks that can be blocked by the locks given as parameters.
+
   """
   lock_map = {}

@@ -147,8 +151,8 @@ def _FindLockNames(locks):
     for name in name_uuid_map:
       lock_map["node/%s" % name_uuid_map[name]] = name

-    # With locking.ALL_SET being defined as None, only node_locks does not
-    # suffice
+    # If ALL_SET was requested explicitly, or there is at least one lock
+    # Note that locking.ALL_SET is None and hence the strange form of the if
     if node_locks == locking.ALL_SET or node_locks:
       lock_map["node/[lockset]"] = "joint node lock"




On Mon, Feb 24, 2014 at 12:46 PM, Petr Pudlák <[email protected]> wrote:

>
>
>
> On Tue, Feb 18, 2014 at 3:39 PM, Hrvoje Ribicic <[email protected]> wrote:
>
>> This patch adds threading to the RunWithLocks function, allowing one
>> thread to execute the QA test, and the other to monitor if it is being
>> blocked by locks set up during the test. If it is, terminate the
>> blocking job, and let the QA continue, reporting the test failure at
>> the very end.
>>
>> Signed-off-by: Hrvoje Ribicic <[email protected]>
>> ---
>>  qa/qa_job_utils.py | 130
>> +++++++++++++++++++++++++++++++++++++++++++++++++----
>>  1 file changed, 122 insertions(+), 8 deletions(-)
>>
>> diff --git a/qa/qa_job_utils.py b/qa/qa_job_utils.py
>> index bc4733c..03f8ee1 100644
>> --- a/qa/qa_job_utils.py
>> +++ b/qa/qa_job_utils.py
>> @@ -24,6 +24,8 @@
>>  """
>>
>>  import re
>> +import threading
>> +import time
>>
>>  from ganeti import constants
>>  from ganeti import locking
>> @@ -38,7 +40,7 @@ from qa_utils import AssertCommand, GetCommandOutput,
>> GetObjectInfo
>>  AVAILABLE_LOCKS = [locking.LEVEL_NODE, ]
>>
>>
>> -def _GetOutputFromMaster(cmd):
>> +def _GetOutputFromMaster(cmd, use_multiplexer=True, log_cmd=True):
>>    """ Gets the output of a command executed on master.
>>
>>    """
>> @@ -51,7 +53,8 @@ def _GetOutputFromMaster(cmd):
>>    # buildbot
>>    cmdstr += " 2>&1"
>>
>> -  return GetCommandOutput(qa_config.GetMasterNode().primary, cmdstr)
>> +  return GetCommandOutput(qa_config.GetMasterNode().primary, cmdstr,
>> +                          use_multiplexer=use_multiplexer,
>> log_cmd=log_cmd)
>>
>>
>>  def ExecuteJobProducingCommand(cmd):
>> @@ -107,6 +110,83 @@ def _TerminateDelayFunction(termination_socket):
>>    AssertCommand("echo a | socat -u stdin UNIX-CLIENT:%s" %
>> termination_socket)
>>
>>
>> +def _GetNodeUUIDMap(nodes):
>> +  """ Given a list of nodes, retrieves a mapping of their names to UUIDs.
>> +
>> +  @type nodes: list of string
>> +  @param nodes: The nodes to retrieve a map for. If empty, returns
>> information
>> +                for all the nodes.
>> +
>> +  """
>> +  cmd = ["gnt-node", "list", "--no-header", "-o", "name,uuid"]
>> +  cmd.extend(nodes)
>> +  output = _GetOutputFromMaster(cmd)
>> +  return dict(map(lambda x: x.split(), output.splitlines()))
>> +
>> +
>>
>
> As discussed offline, it'd be good to document this function in more
> detail so that it's easier to add more locks later. Perhaps there could be
> a comment section at the start of the module describing what's necessary to
> add another type of lock.
>
>
>> +def _FindLockNames(locks):
>> +  """ Finds a mapping of internal lock names to the names of entities
>> locked.
>> +
>> +  @type locks: dict of locking level to list
>> +  @param locks: The locks that gnt-debug delay is holding.
>> +
>> +  @rtype: dict of string to string
>> +  @return: The lock name to entity name map.
>> +
>> +  """
>> +  lock_map = {}
>> +
>> +  if locking.LEVEL_NODE in locks:
>> +    node_locks = locks[locking.LEVEL_NODE]
>> +    if node_locks == locking.ALL_SET:
>> +      # Empty list retrieves all info
>> +      name_uuid_map = _GetNodeUUIDMap([])
>> +    else:
>> +      name_uuid_map = _GetNodeUUIDMap(node_locks)
>> +
>> +    for name in name_uuid_map:
>> +      lock_map["node/%s" % name_uuid_map[name]] = name
>> +
>> +    # With locking.ALL_SET being defined as None, only node_locks does
>> not
>> +    # suffice
>> +    if node_locks == locking.ALL_SET or node_locks:
>> +      lock_map["node/[lockset]"] = "joint node lock"
>> +
>> +  #TODO add other lock types here when support for these is added
>> +  return lock_map
>> +
>> +
>> +def _GetBlockingLocks():
>> +  """ Finds out which locks are blocking jobs by invoking "gnt-debug
>> locks".
>> +
>> +  @rtype: list of string
>> +  @return: The names of the locks currently blocking any job.
>> +
>> +  """
>> +  # Due to mysterious issues when a SSH multiplexer is being used by two
>> +  # threads, we turn it off, and block most of the logging to improve the
>> +  # visibility of the other thread's output
>> +  locks_output = _GetOutputFromMaster("gnt-debug locks",
>> use_multiplexer=False,
>> +                                      log_cmd=False)
>> +
>> +  # The first non-empty line is the header, which we do not need
>> +  lock_lines = locks_output.splitlines()[1:]
>> +
>> +  blocking_locks = []
>> +  for lock_line in lock_lines:
>> +    components = lock_line.split()
>> +    if len(components) != 4:
>> +      raise qa_error.Error("Error while parsing gnt-debug locks output, "
>> +                           "line at fault is: %s" % lock_line)
>> +
>> +    lock_name, _, _, pending_jobs = components
>> +
>> +    if pending_jobs != '-':
>> +      blocking_locks.append(lock_name)
>> +
>> +  return blocking_locks
>> +
>> +
>>  # TODO: Can this be done as a decorator? Implement as needed.
>>  def RunWithLocks(fn, locks, timeout, *args, **kwargs):
>>    """ Runs the given function, acquiring a set of locks beforehand.
>> @@ -123,10 +203,11 @@ def RunWithLocks(fn, locks, timeout, *args,
>> **kwargs):
>>    test, to try and see if the function can run in parallel with other
>>    operations.
>>
>> -  The current version simply creates the locks, which expire after a
>> given
>> -  timeout, and attempts to invoke the provided function.
>> -
>> -  This will probably block the QA, and future versions will address this.
>> +  Locks are acquired by invoking a gnt-debug delay operation which can be
>> +  interrupted as needed. The QA test is then run in a separate thread,
>> with the
>> +  current thread observing jobs waiting for locks. When a job is spotted
>> waiting
>> +  for a lock held by the started delay operation, this is noted, and the
>> delay
>> +  is interrupted, allowing the QA test to continue.
>>
>>    A default timeout is not provided by design - the test creator must
>> make a
>>    good conservative estimate.
>> @@ -139,11 +220,44 @@ def RunWithLocks(fn, locks, timeout, *args,
>> **kwargs):
>>    # The watcher may interfere by issuing its own jobs - therefore pause
>> it
>>    AssertCommand(["gnt-cluster", "watcher", "pause", "12h"])
>>
>> +  # Find out the lock names prior to starting the delay function
>> +  lock_name_map = _FindLockNames(locks)
>> +
>>    termination_socket = _StartDelayFunction(locks, timeout)
>>
>> -  fn(*args, **kwargs)
>> +  qa_thread = threading.Thread(target=fn, args=args, kwargs=kwargs)
>> +  qa_thread.start()
>>
>> -  _TerminateDelayFunction(termination_socket)
>> +  blocking_owned_locks = []
>> +  test_blocked = False
>> +
>> +  try:
>> +    while qa_thread.isAlive():
>> +      blocking_locks = _GetBlockingLocks()
>> +      blocking_owned_locks = \
>> +        set(blocking_locks).intersection(set(lock_name_map))
>> +
>> +      if blocking_owned_locks:
>> +        test_blocked = True
>> +        _TerminateDelayFunction(termination_socket)
>> +        break
>> +
>> +      # The sleeping time has been set arbitrarily
>> +      time.sleep(5)
>> +  except:
>> +    # If anything goes wrong here, we should be responsible and
>> terminate the
>> +    # delay job
>> +    _TerminateDelayFunction(termination_socket)
>> +    raise
>>
>> +
>> +  qa_thread.join()
>> +
>> +  if test_blocked:
>> +    blocking_lock_names = map(lock_name_map.get, blocking_owned_locks)
>> +    raise qa_error.Error("QA test succeded, but was blocked by the
>> locks: %s" %
>> +                         ", ".join(blocking_lock_names))
>> +  else:
>> +    _TerminateDelayFunction(termination_socket)
>>
>>    # Revive the watcher
>>    AssertCommand(["gnt-cluster", "watcher", "continue"])
>> --
>> 1.9.0.rc1.175.g0b1dcb5
>>
>>
> LGTM, thanks.
>

Reply via email to