The module lives in the virt directory, so the virt_ prefix in its
name is redundant. Move virt_scheduler.py to scheduler.py with no code
changes, and update its only user, the KVM control.parallel file, to
import the module under its new name.

Signed-off-by: Lucas Meneghel Rodrigues <[email protected]>
---
client/tests/kvm/control.parallel | 4 +-
client/virt/scheduler.py | 229 ++++++++++++++++++++++++++++++++++++++
client/virt/virt_scheduler.py | 229 --------------------------------------
3 files changed, 231 insertions(+), 231 deletions(-)
create mode 100644 client/virt/scheduler.py
delete mode 100644 client/virt/virt_scheduler.py
diff --git a/client/tests/kvm/control.parallel b/client/tests/kvm/control.parallel
index ca29c9b..a33365b 100644
--- a/client/tests/kvm/control.parallel
+++ b/client/tests/kvm/control.parallel
@@ -174,7 +174,7 @@ tests = list(parser.get_dicts())
 # -------------
 # Run the tests
 # -------------
-from autotest.client.virt import virt_scheduler
+from autotest.client.virt import scheduler
 from autotest.client import utils
 # total_cpus defaults to the number of CPUs reported by /proc/cpuinfo
@@ -185,7 +185,7 @@ total_mem = int(commands.getoutput("free -m").splitlines()[1].split()[1]) * 3/4
 num_workers = total_cpus
 # Start the scheduler and workers
-s = virt_scheduler.scheduler(tests, num_workers, total_cpus, total_mem, pwd)
+s = scheduler.scheduler(tests, num_workers, total_cpus, total_mem, pwd)
 job.parallel([s.scheduler],
              *[(s.worker, i, job.run_test) for i in range(num_workers)])
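
For clarity, the total_mem context in the hunk header above is computed
like this; a minimal sketch, assuming Python 2's commands module (which
the control file uses) and the standard "free -m" layout, with out and
mem_line as illustrative locals:

import commands  # Python 2 module, as used by control.parallel

# "free -m" prints a header row, then a "Mem:" row whose second column
# is the total memory in MB; three quarters of it (integer arithmetic
# under Python 2) is dedicated to tests.
out = commands.getoutput("free -m")
mem_line = out.splitlines()[1]      # e.g. "Mem:  7869  5012  2857 ..."
total_mem = int(mem_line.split()[1]) * 3 / 4
print(total_mem)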
diff --git a/client/virt/scheduler.py b/client/virt/scheduler.py
new file mode 100644
index 0000000..8353e5f
--- /dev/null
+++ b/client/virt/scheduler.py
@@ -0,0 +1,229 @@
+import os, select
+import virt_utils, virt_vm, aexpect
+
+
+class scheduler:
+ """
+ A scheduler that manages several parallel test execution pipelines on a
+ single host.
+ """
+
+ def __init__(self, tests, num_workers, total_cpus, total_mem, bindir):
+ """
+ Initialize the class.
+
+ @param tests: A list of test dictionaries.
+ @param num_workers: The number of workers (pipelines).
+ @param total_cpus: The total number of CPUs to dedicate to tests.
+ @param total_mem: The total amount of memory to dedicate to tests.
+ @param bindir: The directory where environment files reside.
+ """
+ self.tests = tests
+ self.num_workers = num_workers
+ self.total_cpus = total_cpus
+ self.total_mem = total_mem
+ self.bindir = bindir
+ # Pipes -- s stands for scheduler, w stands for worker
+ self.s2w = [os.pipe() for i in range(num_workers)]
+ self.w2s = [os.pipe() for i in range(num_workers)]
+ self.s2w_r = [os.fdopen(r, "r", 0) for r, w in self.s2w]
+ self.s2w_w = [os.fdopen(w, "w", 0) for r, w in self.s2w]
+ self.w2s_r = [os.fdopen(r, "r", 0) for r, w in self.w2s]
+ self.w2s_w = [os.fdopen(w, "w", 0) for r, w in self.w2s]
+ # "Personal" worker dicts contain modifications that are applied
+ # specifically to each worker. For example, each worker must use a
+ # different environment file and a different MAC address pool.
+ self.worker_dicts = [{"env": "env%d" % i} for i in range(num_workers)]
+
+
+ def worker(self, index, run_test_func):
+ """
+ The worker function.
+
+ Waits for commands from the scheduler and processes them.
+
+ @param index: The index of this worker (in the range 0..num_workers-1).
+ @param run_test_func: A function to be called to run a test
+ (e.g. job.run_test).
+ """
+ r = self.s2w_r[index]
+ w = self.w2s_w[index]
+ self_dict = self.worker_dicts[index]
+
+ # Inform the scheduler this worker is ready
+ w.write("ready\n")
+
+ while True:
+ cmd = r.readline().split()
+ if not cmd:
+ continue
+
+ # The scheduler wants this worker to run a test
+ if cmd[0] == "run":
+ test_index = int(cmd[1])
+ test = self.tests[test_index].copy()
+ test.update(self_dict)
+ test_iterations = int(test.get("iterations", 1))
+ status = run_test_func("kvm", params=test,
+ tag=test.get("shortname"),
+ iterations=test_iterations)
+ w.write("done %s %s\n" % (test_index, status))
+ w.write("ready\n")
+
+ # The scheduler wants this worker to free its used resources
+ elif cmd[0] == "cleanup":
+ env_filename = os.path.join(self.bindir, self_dict["env"])
+ env = virt_utils.Env(env_filename)
+ for obj in env.values():
+ if isinstance(obj, virt_vm.VM):
+ obj.destroy()
+ elif isinstance(obj, aexpect.Spawn):
+ obj.close()
+ env.save()
+ w.write("cleanup_done\n")
+ w.write("ready\n")
+
+ # There's no more work for this worker
+ elif cmd[0] == "terminate":
+ break
+
+
+ def scheduler(self):
+ """
+ The scheduler function.
+
+ Sends commands to workers, telling them to run tests, clean up or
+ terminate execution.
+ """
+ idle_workers = []
+ closing_workers = []
+ test_status = ["waiting"] * len(self.tests)
+ test_worker = [None] * len(self.tests)
+ used_cpus = [0] * self.num_workers
+ used_mem = [0] * self.num_workers
+
+ while True:
+ # Wait for a message from a worker
+ r, w, x = select.select(self.w2s_r, [], [])
+
+ someone_is_ready = False
+
+ for pipe in r:
+ worker_index = self.w2s_r.index(pipe)
+ msg = pipe.readline().split()
+ if not msg:
+ continue
+
+ # A worker is ready -- add it to the idle_workers list
+ if msg[0] == "ready":
+ idle_workers.append(worker_index)
+ someone_is_ready = True
+
+ # A worker completed a test
+ elif msg[0] == "done":
+ test_index = int(msg[1])
+ test = self.tests[test_index]
+ status = int(eval(msg[2]))
+ test_status[test_index] = ("fail", "pass")[status]
+                    # If the test failed, mark all dependent tests as "failed" too
+ if not status:
+ for i, other_test in enumerate(self.tests):
+ for dep in other_test.get("dep", []):
+ if dep in test["name"]:
+ test_status[i] = "fail"
+
+ # A worker is done shutting down its VMs and other processes
+ elif msg[0] == "cleanup_done":
+ used_cpus[worker_index] = 0
+ used_mem[worker_index] = 0
+ closing_workers.remove(worker_index)
+
+ if not someone_is_ready:
+ continue
+
+ for worker in idle_workers[:]:
+ # Find a test for this worker
+ test_found = False
+ for i, test in enumerate(self.tests):
+ # We only want "waiting" tests
+ if test_status[i] != "waiting":
+ continue
+ # Make sure the test isn't assigned to another worker
+ if test_worker[i] is not None and test_worker[i] != worker:
+ continue
+ # Make sure the test's dependencies are satisfied
+ dependencies_satisfied = True
+ for dep in test["dep"]:
+ dependencies = [j for j, t in enumerate(self.tests)
+ if dep in t["name"]]
+ bad_status_deps = [j for j in dependencies
+ if test_status[j] != "pass"]
+ if bad_status_deps:
+ dependencies_satisfied = False
+ break
+ if not dependencies_satisfied:
+ continue
+ # Make sure we have enough resources to run the test
+ test_used_cpus = int(test.get("used_cpus", 1))
+ test_used_mem = int(test.get("used_mem", 128))
+ # First make sure the other workers aren't using too many
+ # CPUs (not including the workers currently shutting down)
+ uc = (sum(used_cpus) - used_cpus[worker] -
+ sum(used_cpus[i] for i in closing_workers))
+ if uc and uc + test_used_cpus > self.total_cpus:
+ continue
+ # ... or too much memory
+ um = (sum(used_mem) - used_mem[worker] -
+ sum(used_mem[i] for i in closing_workers))
+ if um and um + test_used_mem > self.total_mem:
+ continue
+ # If we reached this point it means there are, or will
+ # soon be, enough resources to run the test
+ test_found = True
+ # Now check if the test can be run right now, i.e. if the
+ # other workers, including the ones currently shutting
+ # down, aren't using too many CPUs
+ uc = (sum(used_cpus) - used_cpus[worker])
+ if uc and uc + test_used_cpus > self.total_cpus:
+ continue
+ # ... or too much memory
+ um = (sum(used_mem) - used_mem[worker])
+ if um and um + test_used_mem > self.total_mem:
+ continue
+ # Everything is OK -- run the test
+ test_status[i] = "running"
+ test_worker[i] = worker
+ idle_workers.remove(worker)
+ # Update used_cpus and used_mem
+ used_cpus[worker] = test_used_cpus
+ used_mem[worker] = test_used_mem
+ # Assign all related tests to this worker
+ for j, other_test in enumerate(self.tests):
+ for other_dep in other_test["dep"]:
+ # All tests that depend on this test
+ if other_dep in test["name"]:
+ test_worker[j] = worker
+ break
+ # ... and all tests that share a dependency
+ # with this test
+ for dep in test["dep"]:
+ if dep in other_dep or other_dep in dep:
+ test_worker[j] = worker
+ break
+ # Tell the worker to run the test
+ self.s2w_w[worker].write("run %s\n" % i)
+ break
+
+ # If there won't be any tests for this worker to run soon, tell
+ # the worker to free its used resources
+ if not test_found and (used_cpus[worker] or used_mem[worker]):
+ self.s2w_w[worker].write("cleanup\n")
+ idle_workers.remove(worker)
+ closing_workers.append(worker)
+
+ # If there are no more new tests to run, terminate the workers and
+ # the scheduler
+ if len(idle_workers) == self.num_workers:
+ for worker in idle_workers:
+ self.s2w_w[worker].write("terminate\n")
+ break
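
The ready/run/done/terminate handshake that worker() and scheduler()
exchange over the pipe pairs can be exercised in isolation. A minimal,
self-contained sketch with a single worker and a dummy run function
standing in for job.run_test (all names below are illustrative, not
part of the module):

import os

def run_test(name):
    # Dummy stand-in for job.run_test; always reports success.
    print("worker: running %s" % name)
    return True

s2w = os.pipe()   # scheduler -> worker, like self.s2w[i]
w2s = os.pipe()   # worker -> scheduler, like self.w2s[i]

if os.fork() == 0:
    # Child plays the worker: announce readiness, then serve commands.
    r = os.fdopen(s2w[0], "r", 0)
    w = os.fdopen(w2s[1], "w", 0)
    w.write("ready\n")
    while True:
        cmd = r.readline().split()
        if not cmd or cmd[0] == "terminate":
            break
        if cmd[0] == "run":
            status = run_test("test%s" % cmd[1])
            w.write("done %s %s\n" % (cmd[1], status))
            w.write("ready\n")
    os._exit(0)

# Parent plays the scheduler: hand out two tests, then terminate.
r = os.fdopen(w2s[0], "r", 0)
w = os.fdopen(s2w[1], "w", 0)
pending = [0, 1]
while pending:
    msg = r.readline().split()
    if not msg:
        break
    if msg[0] == "ready":
        # The real scheduler would pick a waiting test whose dependencies
        # and CPU/memory needs allow it to run; here, just pop the next.
        w.write("run %d\n" % pending.pop(0))
    # "done <index> <status>" replies would update test_status here.
w.write("terminate\n")
os.wait()

As in __init__, the streams are opened unbuffered, so each
newline-terminated command is delivered to the peer immediately.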
diff --git a/client/virt/virt_scheduler.py b/client/virt/virt_scheduler.py
deleted file mode 100644
index 8353e5f..0000000
--- a/client/virt/virt_scheduler.py
+++ /dev/null
@@ -1,229 +0,0 @@
-import os, select
-import virt_utils, virt_vm, aexpect
-
-
-class scheduler:
- """
- A scheduler that manages several parallel test execution pipelines on a
- single host.
- """
-
- def __init__(self, tests, num_workers, total_cpus, total_mem, bindir):
- """
- Initialize the class.
-
- @param tests: A list of test dictionaries.
- @param num_workers: The number of workers (pipelines).
- @param total_cpus: The total number of CPUs to dedicate to tests.
- @param total_mem: The total amount of memory to dedicate to tests.
- @param bindir: The directory where environment files reside.
- """
- self.tests = tests
- self.num_workers = num_workers
- self.total_cpus = total_cpus
- self.total_mem = total_mem
- self.bindir = bindir
- # Pipes -- s stands for scheduler, w stands for worker
- self.s2w = [os.pipe() for i in range(num_workers)]
- self.w2s = [os.pipe() for i in range(num_workers)]
- self.s2w_r = [os.fdopen(r, "r", 0) for r, w in self.s2w]
- self.s2w_w = [os.fdopen(w, "w", 0) for r, w in self.s2w]
- self.w2s_r = [os.fdopen(r, "r", 0) for r, w in self.w2s]
- self.w2s_w = [os.fdopen(w, "w", 0) for r, w in self.w2s]
- # "Personal" worker dicts contain modifications that are applied
- # specifically to each worker. For example, each worker must use a
- # different environment file and a different MAC address pool.
- self.worker_dicts = [{"env": "env%d" % i} for i in range(num_workers)]
-
-
- def worker(self, index, run_test_func):
- """
- The worker function.
-
- Waits for commands from the scheduler and processes them.
-
- @param index: The index of this worker (in the range 0..num_workers-1).
- @param run_test_func: A function to be called to run a test
- (e.g. job.run_test).
- """
- r = self.s2w_r[index]
- w = self.w2s_w[index]
- self_dict = self.worker_dicts[index]
-
- # Inform the scheduler this worker is ready
- w.write("ready\n")
-
- while True:
- cmd = r.readline().split()
- if not cmd:
- continue
-
- # The scheduler wants this worker to run a test
- if cmd[0] == "run":
- test_index = int(cmd[1])
- test = self.tests[test_index].copy()
- test.update(self_dict)
- test_iterations = int(test.get("iterations", 1))
- status = run_test_func("kvm", params=test,
- tag=test.get("shortname"),
- iterations=test_iterations)
- w.write("done %s %s\n" % (test_index, status))
- w.write("ready\n")
-
- # The scheduler wants this worker to free its used resources
- elif cmd[0] == "cleanup":
- env_filename = os.path.join(self.bindir, self_dict["env"])
- env = virt_utils.Env(env_filename)
- for obj in env.values():
- if isinstance(obj, virt_vm.VM):
- obj.destroy()
- elif isinstance(obj, aexpect.Spawn):
- obj.close()
- env.save()
- w.write("cleanup_done\n")
- w.write("ready\n")
-
- # There's no more work for this worker
- elif cmd[0] == "terminate":
- break
-
-
- def scheduler(self):
- """
- The scheduler function.
-
- Sends commands to workers, telling them to run tests, clean up or
- terminate execution.
- """
- idle_workers = []
- closing_workers = []
- test_status = ["waiting"] * len(self.tests)
- test_worker = [None] * len(self.tests)
- used_cpus = [0] * self.num_workers
- used_mem = [0] * self.num_workers
-
- while True:
- # Wait for a message from a worker
- r, w, x = select.select(self.w2s_r, [], [])
-
- someone_is_ready = False
-
- for pipe in r:
- worker_index = self.w2s_r.index(pipe)
- msg = pipe.readline().split()
- if not msg:
- continue
-
- # A worker is ready -- add it to the idle_workers list
- if msg[0] == "ready":
- idle_workers.append(worker_index)
- someone_is_ready = True
-
- # A worker completed a test
- elif msg[0] == "done":
- test_index = int(msg[1])
- test = self.tests[test_index]
- status = int(eval(msg[2]))
- test_status[test_index] = ("fail", "pass")[status]
-                    # If the test failed, mark all dependent tests as "failed" too
- if not status:
- for i, other_test in enumerate(self.tests):
- for dep in other_test.get("dep", []):
- if dep in test["name"]:
- test_status[i] = "fail"
-
- # A worker is done shutting down its VMs and other processes
- elif msg[0] == "cleanup_done":
- used_cpus[worker_index] = 0
- used_mem[worker_index] = 0
- closing_workers.remove(worker_index)
-
- if not someone_is_ready:
- continue
-
- for worker in idle_workers[:]:
- # Find a test for this worker
- test_found = False
- for i, test in enumerate(self.tests):
- # We only want "waiting" tests
- if test_status[i] != "waiting":
- continue
- # Make sure the test isn't assigned to another worker
- if test_worker[i] is not None and test_worker[i] != worker:
- continue
- # Make sure the test's dependencies are satisfied
- dependencies_satisfied = True
- for dep in test["dep"]:
- dependencies = [j for j, t in enumerate(self.tests)
- if dep in t["name"]]
- bad_status_deps = [j for j in dependencies
- if test_status[j] != "pass"]
- if bad_status_deps:
- dependencies_satisfied = False
- break
- if not dependencies_satisfied:
- continue
- # Make sure we have enough resources to run the test
- test_used_cpus = int(test.get("used_cpus", 1))
- test_used_mem = int(test.get("used_mem", 128))
- # First make sure the other workers aren't using too many
- # CPUs (not including the workers currently shutting down)
- uc = (sum(used_cpus) - used_cpus[worker] -
- sum(used_cpus[i] for i in closing_workers))
- if uc and uc + test_used_cpus > self.total_cpus:
- continue
- # ... or too much memory
- um = (sum(used_mem) - used_mem[worker] -
- sum(used_mem[i] for i in closing_workers))
- if um and um + test_used_mem > self.total_mem:
- continue
- # If we reached this point it means there are, or will
- # soon be, enough resources to run the test
- test_found = True
- # Now check if the test can be run right now, i.e. if the
- # other workers, including the ones currently shutting
- # down, aren't using too many CPUs
- uc = (sum(used_cpus) - used_cpus[worker])
- if uc and uc + test_used_cpus > self.total_cpus:
- continue
- # ... or too much memory
- um = (sum(used_mem) - used_mem[worker])
- if um and um + test_used_mem > self.total_mem:
- continue
- # Everything is OK -- run the test
- test_status[i] = "running"
- test_worker[i] = worker
- idle_workers.remove(worker)
- # Update used_cpus and used_mem
- used_cpus[worker] = test_used_cpus
- used_mem[worker] = test_used_mem
- # Assign all related tests to this worker
- for j, other_test in enumerate(self.tests):
- for other_dep in other_test["dep"]:
- # All tests that depend on this test
- if other_dep in test["name"]:
- test_worker[j] = worker
- break
- # ... and all tests that share a dependency
- # with this test
- for dep in test["dep"]:
- if dep in other_dep or other_dep in dep:
- test_worker[j] = worker
- break
- # Tell the worker to run the test
- self.s2w_w[worker].write("run %s\n" % i)
- break
-
- # If there won't be any tests for this worker to run soon, tell
- # the worker to free its used resources
- if not test_found and (used_cpus[worker] or used_mem[worker]):
- self.s2w_w[worker].write("cleanup\n")
- idle_workers.remove(worker)
- closing_workers.append(worker)
-
- # If there are no more new tests to run, terminate the workers and
- # the scheduler
- if len(idle_workers) == self.num_workers:
- for worker in idle_workers:
- self.s2w_w[worker].write("terminate\n")
- break
--
1.7.11.2