This should improve the situation:
diff --git a/qa/qa_job_utils.py b/qa/qa_job_utils.py
index 1207e68..fc068e2 100644
--- a/qa/qa_job_utils.py
+++ b/qa/qa_job_utils.py
@@ -125,7 +125,7 @@ def _GetNodeUUIDMap(nodes):
def _FindLockNames(locks):
- """ Finds a mapping of internal lock names to the names of entities
- locked.
+ """ Finds the ids and descriptions of locks that given locks can block.
@type locks: dict of locking level to list
@param locks: The locks that gnt-debug delay is holding.
@@ -133,6 +133,10 @@ def _FindLockNames(locks):
@rtype: dict of string to string
@return: The lock name to entity name map.
+ For a given set of locks, some internal locks (e.g. ALL_SET locks) can be
+ blocked even though they were not listed explicitly. This function has to
+ take care and list all locks that can be blocked by the locks given as
+ parameters.
+
"""
lock_map = {}
@@ -147,8 +151,8 @@ def _FindLockNames(locks):
for name in name_uuid_map:
lock_map["node/%s" % name_uuid_map[name]] = name
- # With locking.ALL_SET being defined as None, only node_locks does not
- # suffice
+ # If ALL_SET was requested explicitly, or there is at least one lock
+ # Note that locking.ALL_SET is None and hence the strange form of the if
if node_locks == locking.ALL_SET or node_locks:
lock_map["node/[lockset]"] = "joint node lock"
On Mon, Feb 24, 2014 at 12:46 PM, Petr Pudlák <[email protected]> wrote:
>
>
>
> On Tue, Feb 18, 2014 at 3:39 PM, Hrvoje Ribicic <[email protected]> wrote:
>
>> This patch adds threading to the RunWithTests function, allowing one
>> thread to execute the QA test, and the other to monitor if it is being
>> blocked by locks set up during the test. If it is, terminate the
>> blocking job, and let the QA continue, reporting the test failure at
>> the very end.
>>
>> Signed-off-by: Hrvoje Ribicic <[email protected]>
>> ---
>> qa/qa_job_utils.py | 130 +++++++++++++++++++++++++++++++++++++++++++++++++----
>> 1 file changed, 122 insertions(+), 8 deletions(-)
>>
>> diff --git a/qa/qa_job_utils.py b/qa/qa_job_utils.py
>> index bc4733c..03f8ee1 100644
>> --- a/qa/qa_job_utils.py
>> +++ b/qa/qa_job_utils.py
>> @@ -24,6 +24,8 @@
>> """
>>
>> import re
>> +import threading
>> +import time
>>
>> from ganeti import constants
>> from ganeti import locking
>> @@ -38,7 +40,7 @@ from qa_utils import AssertCommand, GetCommandOutput,
>> GetObjectInfo
>> AVAILABLE_LOCKS = [locking.LEVEL_NODE, ]
>>
>>
>> -def _GetOutputFromMaster(cmd):
>> +def _GetOutputFromMaster(cmd, use_multiplexer=True, log_cmd=True):
>> """ Gets the output of a command executed on master.
>>
>> """
>> @@ -51,7 +53,8 @@ def _GetOutputFromMaster(cmd):
>> # buildbot
>> cmdstr += " 2>&1"
>>
>> - return GetCommandOutput(qa_config.GetMasterNode().primary, cmdstr)
>> + return GetCommandOutput(qa_config.GetMasterNode().primary, cmdstr,
>> + use_multiplexer=use_multiplexer,
>> log_cmd=log_cmd)
>>
>>
>> def ExecuteJobProducingCommand(cmd):
>> @@ -107,6 +110,83 @@ def _TerminateDelayFunction(termination_socket):
>> AssertCommand("echo a | socat -u stdin UNIX-CLIENT:%s" %
>> termination_socket)
>>
>>
>> +def _GetNodeUUIDMap(nodes):
>> + """ Given a list of nodes, retrieves a mapping of their names to UUIDs.
>> +
>> + @type nodes: list of string
>> + @param nodes: The nodes to retrieve a map for. If empty, returns
>> information
>> + for all the nodes.
>> +
>> + """
>> + cmd = ["gnt-node", "list", "--no-header", "-o", "name,uuid"]
>> + cmd.extend(nodes)
>> + output = _GetOutputFromMaster(cmd)
>> + return dict(map(lambda x: x.split(), output.splitlines()))
>> +
>> +
>>
>
> As discussed offline, it'd be good to document this function in more
> detail so that it's easier to add more locks later. Perhaps there could be
> a comment section at the start of the module describing what's necessary to
> add another type of locks.
>
>
>> +def _FindLockNames(locks):
>> + """ Finds a mapping of internal lock names to the names of entities
>> locked.
>> +
>> + @type locks: dict of locking level to list
>> + @param locks: The locks that gnt-debug delay is holding.
>> +
>> + @rtype: dict of string to string
>> + @return: The lock name to entity name map.
>> +
>> + """
>> + lock_map = {}
>> +
>> + if locking.LEVEL_NODE in locks:
>> + node_locks = locks[locking.LEVEL_NODE]
>> + if node_locks == locking.ALL_SET:
>> + # Empty list retrieves all info
>> + name_uuid_map = _GetNodeUUIDMap([])
>> + else:
>> + name_uuid_map = _GetNodeUUIDMap(node_locks)
>> +
>> + for name in name_uuid_map:
>> + lock_map["node/%s" % name_uuid_map[name]] = name
>> +
>> + # With locking.ALL_SET being defined as None, only node_locks does
>> not
>> + # suffice
>> + if node_locks == locking.ALL_SET or node_locks:
>> + lock_map["node/[lockset]"] = "joint node lock"
>> +
>> + #TODO add other lock types here when support for these is added
>> + return lock_map
>> +
>> +
>> +def _GetBlockingLocks():
>> + """ Finds out which locks are blocking jobs by invoking "gnt-debug
>> locks".
>> +
>> + @rtype: list of string
>> + @return: The names of the locks currently blocking any job.
>> +
>> + """
>> + # Due to mysterious issues when a SSH multiplexer is being used by two
>> + # threads, we turn it off, and block most of the logging to improve the
>> + # visibility of the other thread's output
>> + locks_output = _GetOutputFromMaster("gnt-debug locks",
>> use_multiplexer=False,
>> + log_cmd=False)
>> +
>> + # The first non-empty line is the header, which we do not need
>> + lock_lines = locks_output.splitlines()[1:]
>> +
>> + blocking_locks = []
>> + for lock_line in lock_lines:
>> + components = lock_line.split()
>> + if len(components) != 4:
>> + raise qa_error.Error("Error while parsing gnt-debug locks output, "
>> + "line at fault is: %s" % lock_line)
>> +
>> + lock_name, _, _, pending_jobs = components
>> +
>> + if pending_jobs != '-':
>> + blocking_locks.append(lock_name)
>> +
>> + return blocking_locks
>> +
>> +
>> # TODO: Can this be done as a decorator? Implement as needed.
>> def RunWithLocks(fn, locks, timeout, *args, **kwargs):
>> """ Runs the given function, acquiring a set of locks beforehand.
>> @@ -123,10 +203,11 @@ def RunWithLocks(fn, locks, timeout, *args,
>> **kwargs):
>> test, to try and see if the function can run in parallel with other
>> operations.
>>
>> - The current version simply creates the locks, which expire after a
>> given
>> - timeout, and attempts to invoke the provided function.
>> -
>> - This will probably block the QA, and future versions will address this.
>> + Locks are acquired by invoking a gnt-debug delay operation which can be
>> + interrupted as needed. The QA test is then run in a separate thread,
>> with the
>> + current thread observing jobs waiting for locks. When a job is spotted
>> waiting
>> + for a lock held by the started delay operation, this is noted, and the
>> delay
>> + is interrupted, allowing the QA test to continue.
>>
>> A default timeout is not provided by design - the test creator must
>> make a
>> good conservative estimate.
>> @@ -139,11 +220,44 @@ def RunWithLocks(fn, locks, timeout, *args,
>> **kwargs):
>> # The watcher may interfere by issuing its own jobs - therefore pause
>> it
>> AssertCommand(["gnt-cluster", "watcher", "pause", "12h"])
>>
>> + # Find out the lock names prior to starting the delay function
>> + lock_name_map = _FindLockNames(locks)
>> +
>> termination_socket = _StartDelayFunction(locks, timeout)
>>
>> - fn(*args, **kwargs)
>> + qa_thread = threading.Thread(target=fn, args=args, kwargs=kwargs)
>> + qa_thread.start()
>>
>> - _TerminateDelayFunction(termination_socket)
>> + blocking_owned_locks = []
>> + test_blocked = False
>> +
>> + try:
>> + while qa_thread.isAlive():
>> + blocking_locks = _GetBlockingLocks()
>> + blocking_owned_locks = \
>> + set(blocking_locks).intersection(set(lock_name_map))
>> +
>> + if blocking_owned_locks:
>> + test_blocked = True
>> + _TerminateDelayFunction(termination_socket)
>> + break
>> +
>> + # The sleeping time has been set arbitrarily
>> + time.sleep(5)
>> + except:
>> + # If anything goes wrong here, we should be responsible and
>> terminate the
>> + # delay job
>> + _TerminateDelayFunction(termination_socket)
>> + raise
>>
> +
>> + qa_thread.join()
>> +
>> + if test_blocked:
>> + blocking_lock_names = map(lock_name_map.get, blocking_owned_locks)
>> + raise qa_error.Error("QA test succeded, but was blocked by the
>> locks: %s" %
>> + ", ".join(blocking_lock_names))
>> + else:
>> + _TerminateDelayFunction(termination_socket)
>>
>> # Revive the watcher
>> AssertCommand(["gnt-cluster", "watcher", "continue"])
>> --
>> 1.9.0.rc1.175.g0b1dcb5
>>
>>
> LGTM, thanks.
>