Repository: cloudstack Updated Branches: refs/heads/pytest 9610685f4 -> 797fff165
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/145542d6/tools/pytest-xdist/xdist/dsession.py ---------------------------------------------------------------------- diff --git a/tools/pytest-xdist/xdist/dsession.py b/tools/pytest-xdist/xdist/dsession.py new file mode 100644 index 0000000..8385e76 --- /dev/null +++ b/tools/pytest-xdist/xdist/dsession.py @@ -0,0 +1,460 @@ +import difflib + +import pytest +import py +from xdist.slavemanage import NodeManager + + +queue = py.builtin._tryimport('queue', 'Queue') + + +class EachScheduling: + + def __init__(self, numnodes, log=None): + self.numnodes = numnodes + self.node2collection = {} + self.node2pending = {} + if log is None: + self.log = py.log.Producer("eachsched") + else: + self.log = log.eachsched + self.collection_is_completed = False + + def hasnodes(self): + return bool(self.node2pending) + + def addnode(self, node): + self.node2collection[node] = None + + def tests_finished(self): + if not self.collection_is_completed: + return False + return True + + def addnode_collection(self, node, collection): + assert not self.collection_is_completed + assert self.node2collection[node] is None + self.node2collection[node] = list(collection) + self.node2pending[node] = [] + if len(self.node2pending) >= self.numnodes: + self.collection_is_completed = True + + def remove_item(self, node, item_index, duration=0): + self.node2pending[node].remove(item_index) + + def remove_node(self, node): + # KeyError if we didn't get an addnode() yet + pending = self.node2pending.pop(node) + if not pending: + return + crashitem = self.node2collection[node][pending.pop(0)] + # XXX do or report something wrt the remaining per-node pending items? + return crashitem + + def init_distribute(self): + assert self.collection_is_completed + for node, pending in self.node2pending.items(): + node.send_runtest_all() + pending[:] = range(len(self.node2collection[node])) + + +class LoadScheduling: + def __init__(self, numnodes, log=None): + self.numnodes = numnodes + self.node2pending = {} + self.node2collection = {} + self.nodes = [] + self.pending = [] + if log is None: + self.log = py.log.Producer("loadsched") + else: + self.log = log.loadsched + self.collection_is_completed = False + + def hasnodes(self): + return bool(self.node2pending) + + def addnode(self, node): + self.node2pending[node] = [] + self.nodes.append(node) + + def tests_finished(self): + if not self.collection_is_completed: + return False + for pending in self.node2pending.values(): + if len(pending) >= 2: + return False + return True + + def addnode_collection(self, node, collection): + assert not self.collection_is_completed + assert node in self.node2pending + self.node2collection[node] = list(collection) + if len(self.node2collection) >= self.numnodes: + self.collection_is_completed = True + + def remove_item(self, node, item_index, duration=0): + self.node2pending[node].remove(item_index) + self.check_schedule(node, duration=duration) + + def check_schedule(self, node, duration=0): + if self.pending: + # how many nodes do we have? + num_nodes = len(self.node2pending) + # if our node goes below a heuristic minimum, fill it out to + # heuristic maximum + items_per_node_min = max( + 2, len(self.pending) // num_nodes // 4) + items_per_node_max = max( + 2, len(self.pending) // num_nodes // 2) + node_pending = self.node2pending[node] + if len(node_pending) < items_per_node_min: + if duration >= 0.1 and len(node_pending) >= 2: + # seems the node is doing long-running tests + # and has enough items to continue + # so let's rather wait with sending new items + return + num_send = items_per_node_max - len(node_pending) + self._send_tests(node, num_send) + + self.log("num items waiting for node:", len(self.pending)) + #self.log("node2pending:", self.node2pending) + + def remove_node(self, node): + self.nodes.remove(node) + pending = self.node2pending.pop(node) + if not pending: + return + # the node has crashed on the item if there are pending ones + # and we are told to remove the node + crashitem = self.collection[pending.pop(0)] + + # put the remaining items back to the general pending list + self.pending.extend(pending) + # see if some nodes can pick the remaining tests up already + for node in self.node2pending: + self.check_schedule(node) + return crashitem + + def init_distribute(self): + assert self.collection_is_completed + # XXX allow nodes to have different collections + if not self._check_nodes_have_same_collection(): + self.log('**Different tests collected, aborting run**') + return + + # all collections are the same, good. + # we now create an index + self.collection = list(self.node2collection.values())[0] + self.pending[:] = range(len(self.collection)) + if not self.collection: + return + + # how many items per node do we have about? + items_per_node = len(self.collection) // len(self.node2pending) + # take a fraction of tests for initial distribution + node_chunksize = max(items_per_node // 4, 2) + # and initialize each node with a chunk of tests + for node in self.nodes: + self._send_tests(node, node_chunksize) + + #f = open("/tmp/sent", "w") + def _send_tests(self, node, num): + tests_per_node = self.pending[:num] + #print >>self.f, "sent", node, tests_per_node + if tests_per_node: + del self.pending[:num] + self.node2pending[node].extend(tests_per_node) + node.send_runtest_some(tests_per_node) + + def _check_nodes_have_same_collection(self): + """ + Return True if all nodes have collected the same items, False otherwise. + This method also logs the collection differences as they are found. + """ + node_collection_items = list(self.node2collection.items()) + first_node, col = node_collection_items[0] + same_collection = True + for node, collection in node_collection_items[1:]: + msg = report_collection_diff( + col, + collection, + first_node.gateway.id, + node.gateway.id, + ) + if msg: + self.log(msg) + same_collection = False + + return same_collection + + +def report_collection_diff(from_collection, to_collection, from_id, to_id): + """Report the collected test difference between two nodes. + + :returns: detailed message describing the difference between the given + collections, or None if they are equal. + """ + if from_collection == to_collection: + return None + + diff = difflib.unified_diff( + from_collection, + to_collection, + fromfile=from_id, + tofile=to_id, + ) + error_message = py.builtin._totext( + 'Different tests were collected between {from_id} and {to_id}. ' + 'The difference is:\n' + '{diff}' + ).format(from_id=from_id, to_id=to_id, diff='\n'.join(diff)) + msg = "\n".join([x.rstrip() for x in error_message.split("\n")]) + return msg + + +class Interrupted(KeyboardInterrupt): + """ signals an immediate interruption. """ + +class DSession: + def __init__(self, config): + self.config = config + self.log = py.log.Producer("dsession") + if not config.option.debug: + py.log.setconsumer(self.log._keywords, None) + self.shuttingdown = False + self.countfailures = 0 + self.maxfail = config.getvalue("maxfail") + self.queue = queue.Queue() + self._failed_collection_errors = {} + try: + self.terminal = config.pluginmanager.getplugin("terminalreporter") + except KeyError: + self.terminal = None + else: + self.trdist = TerminalDistReporter(config) + config.pluginmanager.register(self.trdist, "terminaldistreporter") + + def report_line(self, line): + if self.terminal and self.config.option.verbose >= 0: + self.terminal.write_line(line) + + @pytest.mark.trylast + def pytest_sessionstart(self, session): + self.nodemanager = NodeManager(self.config) + self.nodemanager.setup_nodes(putevent=self.queue.put) + + def pytest_sessionfinish(self, session): + """ teardown any resources after a test run. """ + nm = getattr(self, 'nodemanager', None) # if not fully initialized + if nm is not None: + nm.teardown_nodes() + + def pytest_collection(self): + # prohibit collection of test items in master process + return True + + def pytest_runtestloop(self): + numnodes = len(self.nodemanager.specs) + dist = self.config.getvalue("dist") + if dist == "load": + self.sched = LoadScheduling(numnodes, log=self.log) + elif dist == "each": + self.sched = EachScheduling(numnodes, log=self.log) + else: + assert 0, dist + self.shouldstop = False + self.session_finished = False + while not self.session_finished: + self.loop_once() + if self.shouldstop: + raise Interrupted(str(self.shouldstop)) + return True + + def loop_once(self): + """ process one callback from one of the slaves. """ + while 1: + try: + eventcall = self.queue.get(timeout=2.0) + break + except queue.Empty: + continue + callname, kwargs = eventcall + assert callname, kwargs + method = "slave_" + callname + call = getattr(self, method) + self.log("calling method", method, kwargs) + call(**kwargs) + if self.sched.tests_finished(): + self.triggershutdown() + + # + # callbacks for processing events from slaves + # + + def slave_slaveready(self, node, slaveinfo): + node.slaveinfo = slaveinfo + node.slaveinfo['id'] = node.gateway.id + node.slaveinfo['spec'] = node.gateway.spec + self.config.hook.pytest_testnodeready(node=node) + self.sched.addnode(node) + if self.shuttingdown: + node.shutdown() + + def slave_slavefinished(self, node): + self.config.hook.pytest_testnodedown(node=node, error=None) + if node.slaveoutput['exitstatus'] == 2: # keyboard-interrupt + self.shouldstop = "%s received keyboard-interrupt" % (node,) + self.slave_errordown(node, "keyboard-interrupt") + return + crashitem = self.sched.remove_node(node) + assert not crashitem, (crashitem, node) + if self.shuttingdown and not self.sched.hasnodes(): + self.session_finished = True + + def slave_errordown(self, node, error): + self.config.hook.pytest_testnodedown(node=node, error=error) + try: + crashitem = self.sched.remove_node(node) + except KeyError: + pass + else: + if crashitem: + self.handle_crashitem(crashitem, node) + #self.report_line("item crashed on node: %s" % crashitem) + if not self.sched.hasnodes(): + self.session_finished = True + + def slave_collectionfinish(self, node, ids): + self.sched.addnode_collection(node, ids) + if self.terminal: + self.trdist.setstatus(node.gateway.spec, "[%d]" %(len(ids))) + + if self.sched.collection_is_completed: + if self.terminal: + self.trdist.ensure_show_status() + self.terminal.write_line("") + self.terminal.write_line("scheduling tests via %s" %( + self.sched.__class__.__name__)) + + self.sched.init_distribute() + + def slave_logstart(self, node, nodeid, location): + self.config.hook.pytest_runtest_logstart( + nodeid=nodeid, location=location) + + def slave_testreport(self, node, rep): + if not (rep.passed and rep.when != "call"): + if rep.when in ("setup", "call"): + self.sched.remove_item(node, rep.item_index, rep.duration) + #self.report_line("testreport %s: %s" %(rep.id, rep.status)) + rep.node = node + self.config.hook.pytest_runtest_logreport(report=rep) + self._handlefailures(rep) + + def slave_collectreport(self, node, rep): + if rep.failed: + self._failed_slave_collectreport(node, rep) + + def _failed_slave_collectreport(self, node, rep): + # Check we haven't already seen this report (from + # another slave). + if rep.longrepr not in self._failed_collection_errors: + self._failed_collection_errors[rep.longrepr] = True + self.config.hook.pytest_collectreport(report=rep) + self._handlefailures(rep) + + def _handlefailures(self, rep): + if rep.failed: + self.countfailures += 1 + if self.maxfail and self.countfailures >= self.maxfail: + self.shouldstop = "stopping after %d failures" % ( + self.countfailures) + + def triggershutdown(self): + self.log("triggering shutdown") + self.shuttingdown = True + for node in self.sched.node2pending: + node.shutdown() + + def handle_crashitem(self, nodeid, slave): + # XXX get more reporting info by recording pytest_runtest_logstart? + runner = self.config.pluginmanager.getplugin("runner") + fspath = nodeid.split("::")[0] + msg = "Slave %r crashed while running %r" %(slave.gateway.id, nodeid) + rep = runner.TestReport(nodeid, (fspath, None, fspath), (), + "failed", msg, "???") + rep.node = slave + self.config.hook.pytest_runtest_logreport(report=rep) + +class TerminalDistReporter: + def __init__(self, config): + self.config = config + self.tr = config.pluginmanager.getplugin("terminalreporter") + self._status = {} + self._lastlen = 0 + + def write_line(self, msg): + self.tr.write_line(msg) + + def ensure_show_status(self): + if not self.tr.hasmarkup: + self.write_line(self.getstatus()) + + def setstatus(self, spec, status, show=True): + self._status[spec.id] = status + if show and self.tr.hasmarkup: + self.rewrite(self.getstatus()) + + def getstatus(self): + parts = ["%s %s" %(spec.id, self._status[spec.id]) + for spec in self._specs] + return " / ".join(parts) + + def rewrite(self, line, newline=False): + pline = line + " " * max(self._lastlen-len(line), 0) + if newline: + self._lastlen = 0 + pline += "\n" + else: + self._lastlen = len(line) + self.tr.rewrite(pline, bold=True) + + def pytest_xdist_setupnodes(self, specs): + self._specs = specs + for spec in specs: + self.setstatus(spec, "I", show=False) + self.setstatus(spec, "I", show=True) + self.ensure_show_status() + + def pytest_xdist_newgateway(self, gateway): + if self.config.option.verbose > 0: + rinfo = gateway._rinfo() + version = "%s.%s.%s" % rinfo.version_info[:3] + self.rewrite("[%s] %s Python %s cwd: %s" % ( + gateway.id, rinfo.platform, version, rinfo.cwd), + newline=True) + self.setstatus(gateway.spec, "C") + + def pytest_testnodeready(self, node): + if self.config.option.verbose > 0: + d = node.slaveinfo + infoline = "[%s] Python %s" %( + d['id'], + d['version'].replace('\n', ' -- '),) + self.rewrite(infoline, newline=True) + self.setstatus(node.gateway.spec, "ok") + + def pytest_testnodedown(self, node, error): + if not error: + return + self.write_line("[%s] node down: %s" %(node.gateway.id, error)) + + #def pytest_xdist_rsyncstart(self, source, gateways): + # targets = ",".join([gw.id for gw in gateways]) + # msg = "[%s] rsyncing: %s" %(targets, source) + # self.write_line(msg) + #def pytest_xdist_rsyncfinish(self, source, gateways): + # targets = ", ".join(["[%s]" % gw.id for gw in gateways]) + # self.write_line("rsyncfinish: %s -> %s" %(source, targets)) + http://git-wip-us.apache.org/repos/asf/cloudstack/blob/145542d6/tools/pytest-xdist/xdist/looponfail.py ---------------------------------------------------------------------- diff --git a/tools/pytest-xdist/xdist/looponfail.py b/tools/pytest-xdist/xdist/looponfail.py new file mode 100644 index 0000000..e5675a2 --- /dev/null +++ b/tools/pytest-xdist/xdist/looponfail.py @@ -0,0 +1,230 @@ +""" + Implement -f aka looponfailing for py.test. + + NOTE that we try to avoid loading and depending on application modules + within the controlling process (the one that starts repeatedly test + processes) otherwise changes to source code can crash + the controlling process which should best never happen. +""" + +import py, pytest +import sys +import execnet + +def looponfail_main(config): + remotecontrol = RemoteControl(config) + rootdirs = config.getini("looponfailroots") + statrecorder = StatRecorder(rootdirs) + try: + while 1: + remotecontrol.loop_once() + if not remotecontrol.failures and remotecontrol.wasfailing: + continue # the last failures passed, let's immediately rerun all + repr_pytest_looponfailinfo( + failreports=remotecontrol.failures, + rootdirs=rootdirs) + statrecorder.waitonchange(checkinterval=2.0) + except KeyboardInterrupt: + print() + +class RemoteControl(object): + def __init__(self, config): + self.config = config + self.failures = [] + + def trace(self, *args): + if self.config.option.debug: + msg = " ".join([str(x) for x in args]) + py.builtin.print_("RemoteControl:", msg) + + def initgateway(self): + return execnet.makegateway("popen") + + def setup(self, out=None): + if out is None: + out = py.io.TerminalWriter() + if hasattr(self, 'gateway'): + raise ValueError("already have gateway %r" % self.gateway) + self.trace("setting up slave session") + self.gateway = self.initgateway() + self.channel = channel = self.gateway.remote_exec(init_slave_session, + args=self.config.args, + option_dict=vars(self.config.option), + ) + remote_outchannel = channel.receive() + def write(s): + out._file.write(s) + out._file.flush() + remote_outchannel.setcallback(write) + + def ensure_teardown(self): + if hasattr(self, 'channel'): + if not self.channel.isclosed(): + self.trace("closing", self.channel) + self.channel.close() + del self.channel + if hasattr(self, 'gateway'): + self.trace("exiting", self.gateway) + self.gateway.exit() + del self.gateway + + def runsession(self): + try: + self.trace("sending", self.failures) + self.channel.send(self.failures) + try: + return self.channel.receive() + except self.channel.RemoteError: + e = sys.exc_info()[1] + self.trace("ERROR", e) + raise + finally: + self.ensure_teardown() + + def loop_once(self): + self.setup() + self.wasfailing = self.failures and len(self.failures) + result = self.runsession() + failures, reports, collection_failed = result + if collection_failed: + pass # "Collection failed, keeping previous failure set" + else: + uniq_failures = [] + for failure in failures: + if failure not in uniq_failures: + uniq_failures.append(failure) + self.failures = uniq_failures + +def repr_pytest_looponfailinfo(failreports, rootdirs): + tr = py.io.TerminalWriter() + if failreports: + tr.sep("#", "LOOPONFAILING", bold=True) + for report in failreports: + if report: + tr.line(report, red=True) + tr.sep("#", "waiting for changes", bold=True) + for rootdir in rootdirs: + tr.line("### Watching: %s" %(rootdir,), bold=True) + + +def init_slave_session(channel, args, option_dict): + import os, sys + outchannel = channel.gateway.newchannel() + sys.stdout = sys.stderr = outchannel.makefile('w') + channel.send(outchannel) + # prune sys.path to not contain relative paths + newpaths = [] + for p in sys.path: + if p: + if not os.path.isabs(p): + p = os.path.abspath(p) + newpaths.append(p) + sys.path[:] = newpaths + + #fullwidth, hasmarkup = channel.receive() + from _pytest.config import Config + config = Config.fromdictargs(option_dict, list(args)) + config.args = args + from xdist.looponfail import SlaveFailSession + SlaveFailSession(config, channel).main() + +class SlaveFailSession: + def __init__(self, config, channel): + self.config = config + self.channel = channel + self.recorded_failures = [] + self.collection_failed = False + config.pluginmanager.register(self) + config.option.looponfail = False + config.option.usepdb = False + + def DEBUG(self, *args): + if self.config.option.debug: + print(" ".join(map(str, args))) + + def pytest_collection(self, session): + self.session = session + self.trails = self.current_command + hook = self.session.ihook + try: + items = session.perform_collect(self.trails or None) + except pytest.UsageError: + items = session.perform_collect(None) + hook.pytest_collection_modifyitems(session=session, config=session.config, items=items) + hook.pytest_collection_finish(session=session) + return True + + def pytest_runtest_logreport(self, report): + if report.failed: + self.recorded_failures.append(report) + + def pytest_collectreport(self, report): + if report.failed: + self.recorded_failures.append(report) + self.collection_failed = True + + def main(self): + self.DEBUG("SLAVE: received configuration, waiting for command trails") + try: + command = self.channel.receive() + except KeyboardInterrupt: + return # in the slave we can't do much about this + self.DEBUG("received", command) + self.current_command = command + self.config.hook.pytest_cmdline_main(config=self.config) + trails, failreports = [], [] + for rep in self.recorded_failures: + trails.append(rep.nodeid) + loc = rep.longrepr + loc = str(getattr(loc, 'reprcrash', loc)) + failreports.append(loc) + self.channel.send((trails, failreports, self.collection_failed)) + +class StatRecorder: + def __init__(self, rootdirlist): + self.rootdirlist = rootdirlist + self.statcache = {} + self.check() # snapshot state + + def fil(self, p): + return p.check(file=1, dotfile=0) and p.ext != ".pyc" + def rec(self, p): + return p.check(dotfile=0) + + def waitonchange(self, checkinterval=1.0): + while 1: + changed = self.check() + if changed: + return + py.std.time.sleep(checkinterval) + + def check(self, removepycfiles=True): + changed = False + statcache = self.statcache + newstat = {} + for rootdir in self.rootdirlist: + for path in rootdir.visit(self.fil, self.rec): + oldstat = statcache.pop(path, None) + try: + newstat[path] = curstat = path.stat() + except py.error.ENOENT: + if oldstat: + changed = True + else: + if oldstat: + if oldstat.mtime != curstat.mtime or \ + oldstat.size != curstat.size: + changed = True + py.builtin.print_("# MODIFIED", path) + if removepycfiles and path.ext == ".py": + pycfile = path + "c" + if pycfile.check(): + pycfile.remove() + + else: + changed = True + if statcache: + changed = True + self.statcache = newstat + return changed + http://git-wip-us.apache.org/repos/asf/cloudstack/blob/145542d6/tools/pytest-xdist/xdist/newhooks.py ---------------------------------------------------------------------- diff --git a/tools/pytest-xdist/xdist/newhooks.py b/tools/pytest-xdist/xdist/newhooks.py new file mode 100644 index 0000000..2034617 --- /dev/null +++ b/tools/pytest-xdist/xdist/newhooks.py @@ -0,0 +1,21 @@ + +def pytest_xdist_setupnodes(config, specs): + """ called before any remote node is set up. """ + +def pytest_xdist_newgateway(gateway): + """ called on new raw gateway creation. """ + +def pytest_xdist_rsyncstart(source, gateways): + """ called before rsyncing a directory to remote gateways takes place. """ + +def pytest_xdist_rsyncfinish(source, gateways): + """ called after rsyncing a directory to remote gateways takes place. """ + +def pytest_configure_node(node): + """ configure node information before it gets instantiated. """ + +def pytest_testnodeready(node): + """ Test Node is ready to operate. """ + +def pytest_testnodedown(node, error): + """ Test Node is down. """ http://git-wip-us.apache.org/repos/asf/cloudstack/blob/145542d6/tools/pytest-xdist/xdist/plugin.py ---------------------------------------------------------------------- diff --git a/tools/pytest-xdist/xdist/plugin.py b/tools/pytest-xdist/xdist/plugin.py new file mode 100644 index 0000000..bc32104 --- /dev/null +++ b/tools/pytest-xdist/xdist/plugin.py @@ -0,0 +1,131 @@ +import py +import pytest + +def pytest_addoption(parser): + group = parser.getgroup("xdist", "distributed and subprocess testing") + group._addoption('-f', '--looponfail', + action="store_true", dest="looponfail", default=False, + help="run tests in subprocess, wait for modified files " + "and re-run failing test set until all pass.") + group._addoption('-n', dest="numprocesses", metavar="numprocesses", + action="store", type="int", + help="shortcut for '--dist=load --tx=NUM*popen'") + group.addoption('--boxed', + action="store_true", dest="boxed", default=False, + help="box each test run in a separate process (unix)") + group._addoption('--dist', metavar="distmode", + action="store", choices=['load', 'each', 'no'], + type="choice", dest="dist", default="no", + help=("set mode for distributing tests to exec environments.\n\n" + "each: send each test to each available environment.\n\n" + "load: send each test to available environment.\n\n" + "(default) no: run tests inprocess, don't distribute.")) + group._addoption('--tx', dest="tx", action="append", default=[], + metavar="xspec", + help=("add a test execution environment. some examples: " + "--tx popen//python=python2.5 --tx socket=192.168.1.102:8888 " + "--tx [email protected]//chdir=testcache")) + group._addoption('-d', + action="store_true", dest="distload", default=False, + help="load-balance tests. shortcut for '--dist=load'") + group.addoption('--rsyncdir', action="append", default=[], metavar="DIR", + help="add directory for rsyncing to remote tx nodes.") + group.addoption('--rsyncignore', action="append", default=[], metavar="GLOB", + help="add expression for ignores when rsyncing to remote tx nodes.") + + parser.addini('rsyncdirs', 'list of (relative) paths to be rsynced for' + ' remote distributed testing.', type="pathlist") + parser.addini('rsyncignore', 'list of (relative) glob-style paths to be ignored ' + 'for rsyncing.', type="pathlist") + parser.addini("looponfailroots", type="pathlist", + help="directories to check for changes", default=[py.path.local()]) + +# ------------------------------------------------------------------------- +# distributed testing hooks +# ------------------------------------------------------------------------- +def pytest_addhooks(pluginmanager): + from xdist import newhooks + pluginmanager.addhooks(newhooks) + +# ------------------------------------------------------------------------- +# distributed testing initialization +# ------------------------------------------------------------------------- + +def pytest_cmdline_main(config): + check_options(config) + if config.getoption("looponfail"): + from xdist.looponfail import looponfail_main + looponfail_main(config) + return 2 # looponfail only can get stop with ctrl-C anyway + +def pytest_configure(config, __multicall__): + __multicall__.execute() + if config.getoption("dist") != "no": + from xdist.dsession import DSession + session = DSession(config) + config.pluginmanager.register(session, "dsession") + tr = config.pluginmanager.getplugin("terminalreporter") + tr.showfspath = False + +def check_options(config): + if config.option.numprocesses: + config.option.dist = "load" + config.option.tx = ['popen'] * int(config.option.numprocesses) + if config.option.distload: + config.option.dist = "load" + val = config.getvalue + if not val("collectonly"): + usepdb = config.option.usepdb # a core option + if val("looponfail"): + if usepdb: + raise pytest.UsageError("--pdb incompatible with --looponfail.") + elif val("dist") != "no": + if usepdb: + raise pytest.UsageError("--pdb incompatible with distributing tests.") + + +def pytest_runtest_protocol(item): + if item.config.getvalue("boxed"): + reports = forked_run_report(item) + for rep in reports: + item.ihook.pytest_runtest_logreport(report=rep) + return True + +def forked_run_report(item): + # for now, we run setup/teardown in the subprocess + # XXX optionally allow sharing of setup/teardown + from _pytest.runner import runtestprotocol + EXITSTATUS_TESTEXIT = 4 + import marshal + from xdist.remote import serialize_report + from xdist.slavemanage import unserialize_report + def runforked(): + try: + reports = runtestprotocol(item, log=False) + except KeyboardInterrupt: + py.std.os._exit(EXITSTATUS_TESTEXIT) + return marshal.dumps([serialize_report(x) for x in reports]) + + ff = py.process.ForkedFunc(runforked) + result = ff.waitfinish() + if result.retval is not None: + report_dumps = marshal.loads(result.retval) + return [unserialize_report("testreport", x) for x in report_dumps] + else: + if result.exitstatus == EXITSTATUS_TESTEXIT: + py.test.exit("forked test item %s raised Exit" %(item,)) + return [report_process_crash(item, result)] + +def report_process_crash(item, result): + path, lineno = item._getfslineno() + info = ("%s:%s: running the test CRASHED with signal %d" % + (path, lineno, result.signal)) + from _pytest import runner + call = runner.CallInfo(lambda: 0/0, "???") + call.excinfo = info + rep = runner.pytest_runtest_makereport(item, call) + if result.out: + rep.sections.append(("captured stdout", result.out)) + if result.err: + rep.sections.append(("captured stderr", result.err)) + return rep http://git-wip-us.apache.org/repos/asf/cloudstack/blob/145542d6/tools/pytest-xdist/xdist/remote.py ---------------------------------------------------------------------- diff --git a/tools/pytest-xdist/xdist/remote.py b/tools/pytest-xdist/xdist/remote.py new file mode 100644 index 0000000..a0b2cad --- /dev/null +++ b/tools/pytest-xdist/xdist/remote.py @@ -0,0 +1,147 @@ +""" + This module is executed in remote subprocesses and helps to + control a remote testing session and relay back information. + It assumes that 'py' is importable and does not have dependencies + on the rest of the xdist code. This means that the xdist-plugin + needs not to be installed in remote environments. +""" + +import sys, os + +class SlaveInteractor: + def __init__(self, config, channel): + self.config = config + self.slaveid = config.slaveinput.get('slaveid', "?") + self.log = py.log.Producer("slave-%s" % self.slaveid) + if not config.option.debug: + py.log.setconsumer(self.log._keywords, None) + self.channel = channel + config.pluginmanager.register(self) + + def sendevent(self, name, **kwargs): + self.log("sending", name, kwargs) + self.channel.send((name, kwargs)) + + def pytest_internalerror(self, excrepr): + for line in str(excrepr).split("\n"): + self.log("IERROR>", line) + + def pytest_sessionstart(self, session): + self.session = session + slaveinfo = getinfodict() + self.sendevent("slaveready", slaveinfo=slaveinfo) + + def pytest_sessionfinish(self, __multicall__, exitstatus): + self.config.slaveoutput['exitstatus'] = exitstatus + res = __multicall__.execute() + self.sendevent("slavefinished", slaveoutput=self.config.slaveoutput) + return res + + def pytest_collection(self, session): + self.sendevent("collectionstart") + + def pytest_runtestloop(self, session): + self.log("entering main loop") + torun = [] + while 1: + name, kwargs = self.channel.receive() + self.log("received command", name, kwargs) + if name == "runtests": + torun.extend(kwargs['indices']) + elif name == "runtests_all": + torun.extend(range(len(session.items))) + self.log("items to run:", torun) + # only run if we have an item and a next item + while len(torun) >= 2: + self.run_tests(torun) + if name == "shutdown": + if torun: + self.run_tests(torun) + break + return True + + def run_tests(self, torun): + items = self.session.items + self.item_index = torun.pop(0) + if torun: + nextitem = items[torun[0]] + else: + nextitem = None + self.config.hook.pytest_runtest_protocol( + item=items[self.item_index], + nextitem=nextitem) + + def pytest_collection_finish(self, session): + self.sendevent("collectionfinish", + topdir=str(session.fspath), + ids=[item.nodeid for item in session.items]) + + def pytest_runtest_logstart(self, nodeid, location): + self.sendevent("logstart", nodeid=nodeid, location=location) + + def pytest_runtest_logreport(self, report): + data = serialize_report(report) + data["item_index"] = self.item_index + assert self.session.items[self.item_index].nodeid == report.nodeid + self.sendevent("testreport", data=data) + + def pytest_collectreport(self, report): + data = serialize_report(report) + self.sendevent("collectreport", data=data) + +def serialize_report(rep): + import py + d = rep.__dict__.copy() + if hasattr(rep.longrepr, 'toterminal'): + d['longrepr'] = str(rep.longrepr) + else: + d['longrepr'] = rep.longrepr + for name in d: + if isinstance(d[name], py.path.local): + d[name] = str(d[name]) + elif name == "result": + d[name] = None # for now + return d + +def getinfodict(): + import platform + return dict( + version = sys.version, + version_info = tuple(sys.version_info), + sysplatform = sys.platform, + platform = platform.platform(), + executable = sys.executable, + cwd = os.getcwd(), + ) + +def remote_initconfig(option_dict, args): + from _pytest.config import Config + option_dict['plugins'].append("no:terminal") + config = Config.fromdictargs(option_dict, args) + config.option.looponfail = False + config.option.usepdb = False + config.option.dist = "no" + config.option.distload = False + config.option.numprocesses = None + config.args = args + return config + + +if __name__ == '__channelexec__': + channel = channel # noqa + # python3.2 is not concurrent import safe, so let's play it safe + # https://bitbucket.org/hpk42/pytest/issue/347/pytest-xdist-and-python-32 + if sys.version_info[:2] == (3,2): + os.environ["PYTHONDONTWRITEBYTECODE"] = "1" + slaveinput,args,option_dict = channel.receive() + importpath = os.getcwd() + sys.path.insert(0, importpath) # XXX only for remote situations + os.environ['PYTHONPATH'] = (importpath + os.pathsep + + os.environ.get('PYTHONPATH', '')) + #os.environ['PYTHONPATH'] = importpath + import py + config = remote_initconfig(option_dict, args) + config.slaveinput = slaveinput + config.slaveoutput = {} + interactor = SlaveInteractor(config, channel) + config.hook.pytest_cmdline_main(config=config) http://git-wip-us.apache.org/repos/asf/cloudstack/blob/145542d6/tools/pytest-xdist/xdist/slavemanage.py ---------------------------------------------------------------------- diff --git a/tools/pytest-xdist/xdist/slavemanage.py b/tools/pytest-xdist/xdist/slavemanage.py new file mode 100644 index 0000000..4f3b8c0 --- /dev/null +++ b/tools/pytest-xdist/xdist/slavemanage.py @@ -0,0 +1,316 @@ +import fnmatch +import os + +import py +import pytest +import execnet +import xdist.remote + +from _pytest import runner # XXX load dynamically + +class NodeManager(object): + EXIT_TIMEOUT = 10 + DEFAULT_IGNORES = ['.*', '*.pyc', '*.pyo', '*~'] + def __init__(self, config, specs=None, defaultchdir="pyexecnetcache"): + self.config = config + self._nodesready = py.std.threading.Event() + self.trace = self.config.trace.get("nodemanager") + self.group = execnet.Group() + if specs is None: + specs = self._getxspecs() + self.specs = [] + for spec in specs: + if not isinstance(spec, execnet.XSpec): + spec = execnet.XSpec(spec) + if not spec.chdir and not spec.popen: + spec.chdir = defaultchdir + self.group.allocate_id(spec) + self.specs.append(spec) + self.roots = self._getrsyncdirs() + self.rsyncoptions = self._getrsyncoptions() + + def rsync_roots(self): + """ make sure that all remote gateways + have the same set of roots in their + current directory. + """ + if self.roots: + # send each rsync root + for root in self.roots: + self.rsync(root, **self.rsyncoptions) + + def makegateways(self): + assert not list(self.group) + self.config.hook.pytest_xdist_setupnodes(config=self.config, + specs=self.specs) + for spec in self.specs: + gw = self.group.makegateway(spec) + self.config.hook.pytest_xdist_newgateway(gateway=gw) + + def setup_nodes(self, putevent): + self.makegateways() + self.rsync_roots() + self.trace("setting up nodes") + for gateway in self.group: + node = SlaveController(self, gateway, self.config, putevent) + gateway.node = node # to keep node alive + node.setup() + self.trace("started node %r" % node) + + def teardown_nodes(self): + self.group.terminate(self.EXIT_TIMEOUT) + + def _getxspecs(self): + xspeclist = [] + for xspec in self.config.getvalue("tx"): + i = xspec.find("*") + try: + num = int(xspec[:i]) + except ValueError: + xspeclist.append(xspec) + else: + xspeclist.extend([xspec[i+1:]] * num) + if not xspeclist: + raise pytest.UsageError( + "MISSING test execution (tx) nodes: please specify --tx") + return [execnet.XSpec(x) for x in xspeclist] + + def _getrsyncdirs(self): + for spec in self.specs: + if not spec.popen or spec.chdir: + break + else: + return [] + import pytest, _pytest + pytestpath = pytest.__file__.rstrip("co") + pytestdir = py.path.local(_pytest.__file__).dirpath() + config = self.config + candidates = [py._pydir,pytestpath,pytestdir] + candidates += config.option.rsyncdir + rsyncroots = config.getini("rsyncdirs") + if rsyncroots: + candidates.extend(rsyncroots) + roots = [] + for root in candidates: + root = py.path.local(root).realpath() + if not root.check(): + raise pytest.UsageError("rsyncdir doesn't exist: %r" %(root,)) + if root not in roots: + roots.append(root) + return roots + + def _getrsyncoptions(self): + """Get options to be passed for rsync.""" + ignores = list(self.DEFAULT_IGNORES) + ignores += self.config.option.rsyncignore + ignores += self.config.getini("rsyncignore") + + return { + 'ignores': ignores, + 'verbose': self.config.option.verbose, + } + + + def rsync(self, source, notify=None, verbose=False, ignores=None): + """ perform rsync to all remote hosts. + """ + rsync = HostRSync(source, verbose=verbose, ignores=ignores) + seen = py.builtin.set() + gateways = [] + for gateway in self.group: + spec = gateway.spec + if spec.popen and not spec.chdir: + # XXX this assumes that sources are python-packages + # and that adding the basedir does not hurt + gateway.remote_exec(""" + import sys ; sys.path.insert(0, %r) + """ % os.path.dirname(str(source))).waitclose() + continue + if spec not in seen: + def finished(): + if notify: + notify("rsyncrootready", spec, source) + rsync.add_target_host(gateway, finished=finished) + seen.add(spec) + gateways.append(gateway) + if seen: + self.config.hook.pytest_xdist_rsyncstart( + source=source, + gateways=gateways, + ) + rsync.send() + self.config.hook.pytest_xdist_rsyncfinish( + source=source, + gateways=gateways, + ) + +class HostRSync(execnet.RSync): + """ RSyncer that filters out common files + """ + def __init__(self, sourcedir, *args, **kwargs): + self._synced = {} + ignores= None + if 'ignores' in kwargs: + ignores = kwargs.pop('ignores') + self._ignores = ignores or [] + super(HostRSync, self).__init__(sourcedir=sourcedir, **kwargs) + + def filter(self, path): + path = py.path.local(path) + for x in self._ignores: + x = getattr(x, 'strpath', x) + if fnmatch.fnmatch(path.basename, x) or fnmatch.fnmatch(path.strpath, x): + return False + else: + return True + + def add_target_host(self, gateway, finished=None): + remotepath = os.path.basename(self._sourcedir) + super(HostRSync, self).add_target(gateway, remotepath, + finishedcallback=finished, + delete=True,) + + def _report_send_file(self, gateway, modified_rel_path): + if self._verbose: + path = os.path.basename(self._sourcedir) + "/" + modified_rel_path + remotepath = gateway.spec.chdir + py.builtin.print_('%s:%s <= %s' % + (gateway.spec, remotepath, path)) + + +def make_reltoroot(roots, args): + # XXX introduce/use public API for splitting py.test args + splitcode = "::" + l = [] + for arg in args: + parts = arg.split(splitcode) + fspath = py.path.local(parts[0]) + for root in roots: + x = fspath.relto(root) + if x or fspath == root: + parts[0] = root.basename + "/" + x + break + else: + raise ValueError("arg %s not relative to an rsync root" % (arg,)) + l.append(splitcode.join(parts)) + return l + +class SlaveController(object): + ENDMARK = -1 + + def __init__(self, nodemanager, gateway, config, putevent): + self.nodemanager = nodemanager + self.putevent = putevent + self.gateway = gateway + self.config = config + self.slaveinput = {'slaveid': gateway.id} + self._down = False + self.log = py.log.Producer("slavectl-%s" % gateway.id) + if not self.config.option.debug: + py.log.setconsumer(self.log._keywords, None) + + def __repr__(self): + return "<%s %s>" %(self.__class__.__name__, self.gateway.id,) + + def setup(self): + self.log("setting up slave session") + spec = self.gateway.spec + args = self.config.args + if not spec.popen or spec.chdir: + args = make_reltoroot(self.nodemanager.roots, args) + option_dict = vars(self.config.option) + if spec.popen: + name = "popen-%s" % self.gateway.id + basetemp = self.config._tmpdirhandler.getbasetemp() + option_dict['basetemp'] = str(basetemp.join(name)) + self.config.hook.pytest_configure_node(node=self) + self.channel = self.gateway.remote_exec(xdist.remote) + self.channel.send((self.slaveinput, args, option_dict)) + if self.putevent: + self.channel.setcallback(self.process_from_remote, + endmarker=self.ENDMARK) + + def ensure_teardown(self): + if hasattr(self, 'channel'): + if not self.channel.isclosed(): + self.log("closing", self.channel) + self.channel.close() + #del self.channel + if hasattr(self, 'gateway'): + self.log("exiting", self.gateway) + self.gateway.exit() + #del self.gateway + + def send_runtest_some(self, indices): + self.sendcommand("runtests", indices=indices) + + def send_runtest_all(self): + self.sendcommand("runtests_all",) + + def shutdown(self): + if not self._down: + try: + self.sendcommand("shutdown") + except IOError: + pass + + def sendcommand(self, name, **kwargs): + """ send a named parametrized command to the other side. """ + self.log("sending command %s(**%s)" % (name, kwargs)) + self.channel.send((name, kwargs)) + + def notify_inproc(self, eventname, **kwargs): + self.log("queuing %s(**%s)" % (eventname, kwargs)) + self.putevent((eventname, kwargs)) + + def process_from_remote(self, eventcall): + """ this gets called for each object we receive from + the other side and if the channel closes. + + Note that channel callbacks run in the receiver + thread of execnet gateways - we need to + avoid raising exceptions or doing heavy work. + """ + try: + if eventcall == self.ENDMARK: + err = self.channel._getremoteerror() + if not self._down: + if not err or isinstance(err, EOFError): + err = "Not properly terminated" # lost connection? + self.notify_inproc("errordown", node=self, error=err) + self._down = True + return + eventname, kwargs = eventcall + if eventname in ("collectionstart"): + self.log("ignoring %s(%s)" %(eventname, kwargs)) + elif eventname == "slaveready": + self.notify_inproc(eventname, node=self, **kwargs) + elif eventname == "slavefinished": + self._down = True + self.slaveoutput = kwargs['slaveoutput'] + self.notify_inproc("slavefinished", node=self) + elif eventname == "logstart": + self.notify_inproc(eventname, node=self, **kwargs) + elif eventname in ("testreport", "collectreport", "teardownreport"): + item_index = kwargs.pop("item_index", None) + rep = unserialize_report(eventname, kwargs['data']) + if item_index is not None: + rep.item_index = item_index + self.notify_inproc(eventname, node=self, rep=rep) + elif eventname == "collectionfinish": + self.notify_inproc(eventname, node=self, ids=kwargs['ids']) + else: + raise ValueError("unknown event: %s" %(eventname,)) + except KeyboardInterrupt: + # should not land in receiver-thread + raise + except: + excinfo = py.code.ExceptionInfo() + py.builtin.print_("!" * 20, excinfo) + self.config.pluginmanager.notify_exception(excinfo) + +def unserialize_report(name, reportdict): + if name == "testreport": + return runner.TestReport(**reportdict) + elif name == "collectreport": + return runner.CollectReport(**reportdict)
