Hi Lucas, thank you for moving the multi-host migration framework (migration_multihost_framework). I'm worried about the circular deps too; I ran into dependency problems myself a few weeks ago. I found only one typo in the patch, marked inline below (rm_clone_image should be rm_cloned_image).
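In case it is useful, this is the quick check I use to confirm the cycle is really gone. It is only a sketch: it assumes a fresh interpreter and the usual autotest.client.virt module paths, so adjust as needed.

    import sys

    # Importing virt_utils on its own should no longer pull in the modules
    # that closed the old cycle (virt_storage / virt_env_process).
    from autotest.client.virt import virt_utils

    leftovers = [name for name in sys.modules
                 if name.endswith(('virt_storage', 'virt_env_process'))]
    assert not leftovers, "circular deps still around: %s" % leftovers
    print("virt_utils imported cleanly")

On the unpatched tree this should fail, since virt_utils imported virt_storage directly and virt_storage imports virt_utils back.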
----- Original Message ----- > With this, we remove all dependencies on virt modules in > virt_utils, eliminating a source of circular deps. Also, > move some of the auxiliary multi host migration classes > to virt_storage, where they are more appropriate. > > Signed-off-by: Lucas Meneghel Rodrigues <l...@redhat.com> > --- > client/tests/kvm/tests/migration_multi_host.py | 4 +- > client/tests/kvm/tests/migration_multi_host_fd.py | 4 +- > .../migration_multi_host_with_file_transfer.py | 7 +- > .../migration_multi_host_with_speed_measurement.py | 6 +- > client/virt/virt_storage.py | 21 +- > client/virt/virt_test_utils.py | 474 > ++++++++++++++++++- > client/virt/virt_utils.py | 492 > +------------------- > 7 files changed, 505 insertions(+), 503 deletions(-) > > diff --git a/client/tests/kvm/tests/migration_multi_host.py > b/client/tests/kvm/tests/migration_multi_host.py > index 187c0a0..2557933 100644 > --- a/client/tests/kvm/tests/migration_multi_host.py > +++ b/client/tests/kvm/tests/migration_multi_host.py > @@ -1,4 +1,4 @@ > -from autotest.client.virt import virt_utils > +from autotest.client.virt import virt_test_utils > > > def run_migration_multi_host(test, params, env): > @@ -12,7 +12,7 @@ def run_migration_multi_host(test, params, env): > @param params: Dictionary with test parameters. > @param env: Dictionary with the test environment. > """ > - class TestMultihostMigration(virt_utils.MultihostMigration): > + class > TestMultihostMigration(virt_test_utils.MultihostMigration): > def __init__(self, test, params, env): > super(TestMultihostMigration, self).__init__(test, > params, env) > > diff --git a/client/tests/kvm/tests/migration_multi_host_fd.py > b/client/tests/kvm/tests/migration_multi_host_fd.py > index 6f3c72b..04bb758 100644 > --- a/client/tests/kvm/tests/migration_multi_host_fd.py > +++ b/client/tests/kvm/tests/migration_multi_host_fd.py > @@ -1,5 +1,5 @@ > import logging, socket, time, errno, os, fcntl > -from autotest.client.virt import virt_utils > +from autotest.client.virt import virt_test_utils, virt_utils > from autotest.client.shared.syncdata import SyncData > > def run_migration_multi_host_fd(test, params, env): > @@ -14,7 +14,7 @@ def run_migration_multi_host_fd(test, params, env): > @param params: Dictionary with test parameters. > @param env: Dictionary with the test environment. > """ > - class TestMultihostMigrationFd(virt_utils.MultihostMigration): > + class > TestMultihostMigrationFd(virt_test_utils.MultihostMigration): > def __init__(self, test, params, env): > super(TestMultihostMigrationFd, self).__init__(test, > params, env) > > diff --git > a/client/tests/kvm/tests/migration_multi_host_with_file_transfer.py > b/client/tests/kvm/tests/migration_multi_host_with_file_transfer.py > index 976b268..bbe025f 100644 > --- > a/client/tests/kvm/tests/migration_multi_host_with_file_transfer.py > +++ > b/client/tests/kvm/tests/migration_multi_host_with_file_transfer.py > @@ -2,7 +2,8 @@ import logging, threading > from autotest.client import utils as client_utils > from autotest.client.shared import utils, error > from autotest.client.shared.syncdata import SyncData > -from autotest.client.virt import virt_env_process, virt_utils, > virt_remote > +from autotest.client.virt import virt_env_process, virt_test_utils, > virt_remote > +from autotest.client.virt import virt_utils > > > @error.context_aware > @@ -65,7 +66,7 @@ def > run_migration_multi_host_with_file_transfer(test, params, env): > #Count of migration during file transfer. 
> migrate_count = int(params.get("migrate_count", "3")) > > - class TestMultihostMigration(virt_utils.MultihostMigration): > + class > TestMultihostMigration(virt_test_utils.MultihostMigration): > def __init__(self, test, params, env): > super(TestMultihostMigration, self).__init__(test, > params, env) > self.vm = None > @@ -85,7 +86,7 @@ def > run_migration_multi_host_with_file_transfer(test, params, env): > @param mig_data: object with migration data. > """ > for vm in mig_data.vms: > - if not virt_utils.guest_active(vm): > + if not virt_test_utils.guest_active(vm): > raise error.TestFail("Guest not active after > migration") > > logging.info("Migrated guest appears to be running") > diff --git > a/client/tests/kvm/tests/migration_multi_host_with_speed_measurement.py > b/client/tests/kvm/tests/migration_multi_host_with_speed_measurement.py > index 887cfe3..9ef255e 100644 > --- > a/client/tests/kvm/tests/migration_multi_host_with_speed_measurement.py > +++ > b/client/tests/kvm/tests/migration_multi_host_with_speed_measurement.py > @@ -1,8 +1,8 @@ > import os, re, logging, time, socket > -from autotest.client.virt import virt_utils > from autotest.client.shared import error, utils > from autotest.client.shared.barrier import listen_server > from autotest.client.shared.syncdata import SyncData > +from autotest.client.virt import virt_test_utils, virt_utils > > > def run_migration_multi_host_with_speed_measurement(test, params, > env): > @@ -71,7 +71,7 @@ def > run_migration_multi_host_with_speed_measurement(test, params, env): > > return mig_stat > > - class TestMultihostMigration(virt_utils.MultihostMigration): > + class > TestMultihostMigration(virt_test_utils.MultihostMigration): > def __init__(self, test, params, env): > super(TestMultihostMigration, self).__init__(test, > params, env) > self.mig_stat = None > @@ -117,7 +117,7 @@ def > run_migration_multi_host_with_speed_measurement(test, params, env): > session = > vm.wait_for_login(timeout=self.login_timeout) > > virt_utils.install_cpuflags_util_on_vm(test, vm, > install_path, > - extra_flags="-msse3 > -msse2") > + > extra_flags="-msse3 > -msse2") > > cmd = ("%s/cpuflags-test --stressmem %d" % > (os.path.join(install_path, "test_cpu_flags"), > vm_mem / 2)) > diff --git a/client/virt/virt_storage.py > b/client/virt/virt_storage.py > index a56f50a..1c89521 100644 > --- a/client/virt/virt_storage.py > +++ b/client/virt/virt_storage.py > @@ -9,7 +9,26 @@ import logging, os, shutil, re > from autotest.client import utils > import virt_utils, virt_vm > > -# Functions for handling virtual machine image files > + > +def preprocess_images(bindir, params, env): > + # Clone master image form vms. 
> + for vm_name in params.get("vms").split(): > + vm = env.get_vm(vm_name) > + if vm: > + vm.destroy(free_mac_addresses=False) > + vm_params = params.object_params(vm_name) > + for image in vm_params.get("master_images_clone").split(): > + image_obj = QemuImg(params, bindir, image) > + image_obj.clone_image(params, vm_name, image, bindir) > + > + > +def postprocess_images(bindir, params): > + for vm in params.get("vms").split(): > + vm_params = params.object_params(vm) > + for image in vm_params.get("master_images_clone").split(): > + image_obj = QemuImg(params, bindir, image) > + image_obj.rm_clone_image(params, vm, image, bindir) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ image_obj.rm_cloned_image > + > > def get_image_blkdebug_filename(params, root_dir): > """ > diff --git a/client/virt/virt_test_utils.py > b/client/virt/virt_test_utils.py > index 60781b8..efcabbb 100644 > --- a/client/virt/virt_test_utils.py > +++ b/client/virt/virt_test_utils.py > @@ -27,7 +27,8 @@ from Queue import Queue > from autotest.client.shared import error, global_config > from autotest.client import utils > from autotest.client.tools import scan_results > -import aexpect, virt_utils, virt_vm, virt_remote > +from autotest.client.shared.syncdata import SyncData, > SyncListenServer > +import aexpect, virt_utils, virt_vm, virt_remote, virt_storage, > virt_env_process > > GLOBAL_CONFIG = global_config.global_config > > @@ -298,6 +299,475 @@ def migrate(vm, env=None, mig_timeout=3600, > mig_protocol="tcp", > return vm > > > +def guest_active(vm): > + o = vm.monitor.info("status") > + if isinstance(o, str): > + return "status: running" in o > + else: > + if "status" in o: > + return o.get("status") == "running" > + else: > + return o.get("running") > + > + > +class MigrationData(object): > + def __init__(self, params, srchost, dsthost, vms_name, > params_append): > + """ > + Class that contains data needed for one migration. > + """ > + self.params = params.copy() > + self.params.update(params_append) > + > + self.source = False > + if params.get("hostid") == srchost: > + self.source = True > + > + self.destination = False > + if params.get("hostid") == dsthost: > + self.destination = True > + > + self.src = srchost > + self.dst = dsthost > + self.hosts = [srchost, dsthost] > + self.mig_id = {'src': srchost, 'dst': dsthost, "vms": > vms_name} > + self.vms_name = vms_name > + self.vms = [] > + self.vm_ports = None > + > + > + def is_src(self): > + """ > + @return: True if host is source. > + """ > + return self.source > + > + > + def is_dst(self): > + """ > + @return: True if host is destination. > + """ > + return self.destination > + > + > +class MultihostMigration(object): > + """ > + Class that provides a framework for multi-host migration. > + > + Migration can be run both synchronously and asynchronously. > + To specify what is going to happen during the multi-host > + migration, it is necessary to reimplement the method > + migration_scenario. It is possible to start multiple migrations > + in separate threads, since self.migrate is thread safe. > + > + Only one test using multihost migration framework should be > + started on one machine otherwise it is necessary to solve the > + problem with listen server port. > + > + Multihost migration starts SyncListenServer through which > + all messages are transfered, since the multiple hosts can > + be in diferent states. > + > + Class SyncData is used to transfer data over network or > + synchronize the migration process. 
Synchronization sessions > + are recognized by session_id. > + > + It is important to note that, in order to have multi-host > + migration, one needs shared guest image storage. The simplest > + case is when the guest images are on an NFS server. > + > + Example: > + class TestMultihostMigration(virt_utils.MultihostMigration): > + def __init__(self, test, params, env): > + super(testMultihostMigration, self).__init__(test, > params, env) > + > + def migration_scenario(self): > + srchost = self.params.get("hosts")[0] > + dsthost = self.params.get("hosts")[1] > + > + def worker(mig_data): > + vm = env.get_vm("vm1") > + session = > vm.wait_for_login(timeout=self.login_timeout) > + session.sendline("nohup dd if=/dev/zero > of=/dev/null &") > + session.cmd("killall -0 dd") > + > + def check_worker(mig_data): > + vm = env.get_vm("vm1") > + session = > vm.wait_for_login(timeout=self.login_timeout) > + session.cmd("killall -9 dd") > + > + # Almost synchronized migration, waiting to end it. > + # Work is started only on first VM. > + self.migrate_wait(["vm1", "vm2"], srchost, dsthost, > + worker, check_worker) > + > + # Migration started in different threads. > + # It allows to start multiple migrations > simultaneously. > + mig1 = self.migrate(["vm1"], srchost, dsthost, > + worker, check_worker) > + mig2 = self.migrate(["vm2"], srchost, dsthost) > + mig2.join() > + mig1.join() > + > + mig = TestMultihostMigration(test, params, env) > + mig.run() > + """ > + def __init__(self, test, params, env, preprocess_env=True): > + self.test = test > + self.params = params > + self.env = env > + self.hosts = params.get("hosts") > + self.hostid = params.get('hostid', "") > + self.comm_port = int(params.get("comm_port", 13234)) > + vms_count = len(params["vms"].split()) > + > + self.login_timeout = int(params.get("login_timeout", 360)) > + self.disk_prepare_timeout = > int(params.get("disk_prepare_timeout", > + 160 * vms_count)) > + self.finish_timeout = int(params.get("finish_timeout", > + 120 * vms_count)) > + > + self.new_params = None > + > + if params.get("clone_master") == "yes": > + self.clone_master = True > + else: > + self.clone_master = False > + > + self.mig_timeout = int(params.get("mig_timeout")) > + # Port used to communicate info between source and > destination > + self.regain_ip_cmd = params.get("regain_ip_cmd", "dhclient") > + > + self.vm_lock = threading.Lock() > + > + self.sync_server = None > + if self.clone_master: > + self.sync_server = SyncListenServer() > + > + if preprocess_env: > + self.preprocess_env() > + self._hosts_barrier(self.hosts, self.hosts, > 'disk_prepared', > + self.disk_prepare_timeout) > + > + > + def migration_scenario(self): > + """ > + Multi Host migration_scenario is started from method run > where the > + exceptions are checked. It is not necessary to take care of > + cleaning up after test crash or finish. > + """ > + raise NotImplementedError > + > + > + def migrate_vms_src(self, mig_data): > + """ > + Migrate vms source. > + > + @param mig_Data: Data for migration. > + > + For change way how machine migrates is necessary > + re implement this method. > + """ > + def mig_wrapper(vm, dsthost, vm_ports): > + vm.migrate(dest_host=dsthost, > remote_port=vm_ports[vm.name]) > + > + logging.info("Start migrating now...") > + multi_mig = [] > + for vm in mig_data.vms: > + multi_mig.append((mig_wrapper, (vm, mig_data.dst, > + mig_data.vm_ports))) > + virt_utils.parallel(multi_mig) > + > + > + def migrate_vms_dest(self, mig_data): > + """ > + Migrate vms destination. 
This function is started on dest > host during > + migration. > + > + @param mig_Data: Data for migration. > + """ > + pass > + > + > + def __del__(self): > + if self.sync_server: > + self.sync_server.close() > + > + > + def master_id(self): > + return self.hosts[0] > + > + > + def _hosts_barrier(self, hosts, session_id, tag, timeout): > + logging.debug("Barrier timeout: %d tags: %s" % (timeout, > tag)) > + tags = SyncData(self.master_id(), self.hostid, hosts, > + "%s,%s,barrier" % (str(session_id), tag), > + self.sync_server).sync(tag, timeout) > + logging.debug("Barrier tag %s" % (tags)) > + > + > + def preprocess_env(self): > + """ > + Prepare env to start vms. > + """ > + virt_storage.preprocess_images(self.test.bindir, > self.params, self.env) > + > + > + def _check_vms_source(self, mig_data): > + for vm in mig_data.vms: > + vm.wait_for_login(timeout=self.login_timeout) > + > + sync = SyncData(self.master_id(), self.hostid, > mig_data.hosts, > + mig_data.mig_id, self.sync_server) > + mig_data.vm_ports = sync.sync(timeout=120)[mig_data.dst] > + logging.info("Received from destination the migration port > %s", > + str(mig_data.vm_ports)) > + > + > + def _check_vms_dest(self, mig_data): > + mig_data.vm_ports = {} > + for vm in mig_data.vms: > + logging.info("Communicating to source migration port > %s", > + vm.migration_port) > + mig_data.vm_ports[vm.name] = vm.migration_port > + > + SyncData(self.master_id(), self.hostid, > + mig_data.hosts, mig_data.mig_id, > + self.sync_server).sync(mig_data.vm_ports, > timeout=120) > + > + > + def _prepare_params(self, mig_data): > + """ > + Prepare separate params for vm migration. > + > + @param vms_name: List of vms. > + """ > + new_params = mig_data.params.copy() > + new_params["vms"] = " ".join(mig_data.vms_name) > + return new_params > + > + > + def _check_vms(self, mig_data): > + """ > + Check if vms are started correctly. > + > + @param vms: list of vms. > + @param source: Must be True if is source machine. > + """ > + logging.info("Try check vms %s" % (mig_data.vms_name)) > + for vm in mig_data.vms_name: > + if not self.env.get_vm(vm) in mig_data.vms: > + mig_data.vms.append(self.env.get_vm(vm)) > + for vm in mig_data.vms: > + logging.info("Check vm %s on host %s" % (vm.name, > self.hostid)) > + vm.verify_alive() > + > + if mig_data.is_src(): > + self._check_vms_source(mig_data) > + else: > + self._check_vms_dest(mig_data) > + > + > + def prepare_for_migration(self, mig_data, migration_mode): > + """ > + Prepare destination of migration for migration. > + > + @param mig_data: Class with data necessary for migration. > + @param migration_mode: Migration mode for prepare machine. > + """ > + new_params = self._prepare_params(mig_data) > + > + new_params['migration_mode'] = migration_mode > + new_params['start_vm'] = 'yes' > + self.vm_lock.acquire() > + virt_env_process.process(self.test, new_params, self.env, > + virt_env_process.preprocess_image, > + virt_env_process.preprocess_vm) > + self.vm_lock.release() > + > + self._check_vms(mig_data) > + > + > + def migrate_vms(self, mig_data): > + """ > + Migrate vms. > + """ > + if mig_data.is_src(): > + self.migrate_vms_src(mig_data) > + else: > + self.migrate_vms_dest(mig_data) > + > + > + def check_vms(self, mig_data): > + """ > + Check vms after migrate. > + > + @param mig_data: object with migration data. 
> + """ > + for vm in mig_data.vms: > + if not guest_active(vm): > + raise error.TestFail("Guest not active after > migration") > + > + logging.info("Migrated guest appears to be running") > + > + logging.info("Logging into migrated guest after > migration...") > + for vm in mig_data.vms: > + session_serial = vm.wait_for_serial_login(timeout= > + > self.login_timeout) > + #There is sometime happen that system sends some message > on > + #serial console and IP renew command block test. Because > + #there must be added "sleep" in IP renew command. > + session_serial.cmd(self.regain_ip_cmd) > + vm.wait_for_login(timeout=self.login_timeout) > + > + > + def postprocess_env(self): > + """ > + Kill vms and delete cloned images. > + """ > + virt_storage.postprocess_images(self.test.bindir, > self.params) > + > + > + def migrate(self, vms_name, srchost, dsthost, start_work=None, > + check_work=None, mig_mode="tcp", > params_append=None): > + """ > + Migrate machine from srchost to dsthost. It executes > start_work on > + source machine before migration and executes check_work on > dsthost > + after migration. > + > + Migration execution progress: > + > + source host | dest host > + -------------------------------------------------------- > + prepare guest on both sides of migration > + - start machine and check if machine works > + - synchronize transfer data needed for migration > + -------------------------------------------------------- > + start work on source guests | wait for migration > + -------------------------------------------------------- > + migrate guest to dest host. > + wait on finish migration synchronization > + -------------------------------------------------------- > + | check work on vms > + -------------------------------------------------------- > + wait for sync on finish migration > + > + @param vms_name: List of vms. > + @param srchost: src host id. > + @param dsthost: dst host id. > + @param start_work: Function started before migration. > + @param check_work: Function started after migration. > + @param mig_mode: Migration mode. > + @param params_append: Append params to self.params only for > migration. 
> + """ > + def migrate_wrap(vms_name, srchost, dsthost, > start_work=None, > + check_work=None, params_append=None): > + logging.info("Starting migrate vms %s from host %s to > %s" % > + (vms_name, srchost, dsthost)) > + error = None > + mig_data = MigrationData(self.params, srchost, dsthost, > + vms_name, params_append) > + try: > + try: > + if mig_data.is_src(): > + self.prepare_for_migration(mig_data, None) > + elif self.hostid == dsthost: > + self.prepare_for_migration(mig_data, > mig_mode) > + else: > + return > + > + if mig_data.is_src(): > + if start_work: > + start_work(mig_data) > + > + self.migrate_vms(mig_data) > + > + timeout = 30 > + if not mig_data.is_src(): > + timeout = self.mig_timeout > + self._hosts_barrier(mig_data.hosts, > mig_data.mig_id, > + 'mig_finished', timeout) > + > + if mig_data.is_dst(): > + self.check_vms(mig_data) > + if check_work: > + check_work(mig_data) > + > + except: > + error = True > + raise > + finally: > + if not error: > + self._hosts_barrier(self.hosts, > + mig_data.mig_id, > + 'test_finihed', > + self.finish_timeout) > + > + def wait_wrap(vms_name, srchost, dsthost): > + mig_data = MigrationData(self.params, srchost, dsthost, > vms_name, > + None) > + timeout = (self.login_timeout + self.mig_timeout + > + self.finish_timeout) > + > + self._hosts_barrier(self.hosts, mig_data.mig_id, > + 'test_finihed', timeout) > + > + if (self.hostid in [srchost, dsthost]): > + mig_thread = utils.InterruptedThread(migrate_wrap, > (vms_name, > + > srchost, > + > dsthost, > + > start_work, > + > check_work, > + > params_append)) > + else: > + mig_thread = utils.InterruptedThread(wait_wrap, > (vms_name, > + > srchost, > + > dsthost)) > + mig_thread.start() > + return mig_thread > + > + > + def migrate_wait(self, vms_name, srchost, dsthost, > start_work=None, > + check_work=None, mig_mode="tcp", > params_append=None): > + """ > + Migrate machine from srchost to dsthost and wait for finish. > + It executes start_work on source machine before migration > and executes > + check_work on dsthost after migration. > + > + @param vms_name: List of vms. > + @param srchost: src host id. > + @param dsthost: dst host id. > + @param start_work: Function which is started before > migration. > + @param check_work: Function which is started after > + done of migration. > + """ > + self.migrate(vms_name, srchost, dsthost, start_work, > check_work, > + mig_mode, params_append).join() > + > + > + def cleanup(self): > + """ > + Cleanup env after test. > + """ > + if self.clone_master: > + self.sync_server.close() > + self.postprocess_env() > + > + > + def run(self): > + """ > + Start multihost migration scenario. > + After scenario is finished or if scenario crashed it calls > postprocess > + machines and cleanup env. > + """ > + try: > + self.migration_scenario() > + > + self._hosts_barrier(self.hosts, self.hosts, > 'all_test_finihed', > + self.finish_timeout) > + finally: > + self.cleanup() > + > + > def stop_windows_service(session, service, timeout=120): > """ > Stop a Windows service using sc. 
> @@ -910,6 +1380,7 @@ def run_virt_sub_test(test, params, env, > sub_type=None, tag=None): > params = params.object_params(tag) > run_func(test, params, env) > > + > def pin_vm_threads(vm, node): > """ > Pin VM threads to single cpu of a numa node > @@ -921,6 +1392,7 @@ def pin_vm_threads(vm, node): > for i in vm.vcpu_threads: > logging.info("pin vcpu thread(%s) to cpu(%s)" % (i, > node.pin_cpu(i))) > > + > def service_setup(vm, session, dir): > > params = vm.get_params() > diff --git a/client/virt/virt_utils.py b/client/virt/virt_utils.py > index 1284fc2..46d4598 100644 > --- a/client/virt/virt_utils.py > +++ b/client/virt/virt_utils.py > @@ -5,13 +5,11 @@ Virtualization test utility functions. > """ > > import time, string, random, socket, os, signal, re, logging, > commands, cPickle > -import fcntl, shelve, ConfigParser, threading, sys, UserDict, > inspect, tarfile > +import fcntl, shelve, ConfigParser, sys, UserDict, inspect, tarfile > import struct, shutil, glob, HTMLParser, urllib, traceback, platform > from autotest.client import utils, os_dep > from autotest.client.shared import error, logging_config > from autotest.client.shared import logging_manager, git > -from autotest.client.shared.syncdata import SyncData, > SyncListenServer > -import virt_env_process, virt_storage > > try: > import koji > @@ -4199,494 +4197,6 @@ def generate_mac_address_simple(): > return mac > > > -def guest_active(vm): > - o = vm.monitor.info("status") > - if isinstance(o, str): > - return "status: running" in o > - else: > - if "status" in o: > - return o.get("status") == "running" > - else: > - return o.get("running") > - > - > -def preprocess_images(bindir, params, env): > - # Clone master image form vms. > - for vm_name in params.get("vms").split(): > - vm = env.get_vm(vm_name) > - if vm: > - vm.destroy(free_mac_addresses=False) > - vm_params = params.object_params(vm_name) > - for image in vm_params.get("master_images_clone").split(): > - image_obj = virt_storage.QemuImg(params, bindir, image) > - image_obj.clone_image(params, vm_name, image, bindir) > - > - > -def postprocess_images(bindir, params): > - for vm in params.get("vms").split(): > - vm_params = params.object_params(vm) > - for image in vm_params.get("master_images_clone").split(): > - image_obj = virt_storage.QemuImg(params, bindir, image) > - image_obj.rm_clone_image(params, vm, image, bindir) > - > - > -class MigrationData(object): > - def __init__(self, params, srchost, dsthost, vms_name, > params_append): > - """ > - Class that contains data needed for one migration. > - """ > - self.params = params.copy() > - self.params.update(params_append) > - > - self.source = False > - if params.get("hostid") == srchost: > - self.source = True > - > - self.destination = False > - if params.get("hostid") == dsthost: > - self.destination = True > - > - self.src = srchost > - self.dst = dsthost > - self.hosts = [srchost, dsthost] > - self.mig_id = {'src': srchost, 'dst': dsthost, "vms": > vms_name} > - self.vms_name = vms_name > - self.vms = [] > - self.vm_ports = None > - > - > - def is_src(self): > - """ > - @return: True if host is source. > - """ > - return self.source > - > - > - def is_dst(self): > - """ > - @return: True if host is destination. > - """ > - return self.destination > - > - > -class MultihostMigration(object): > - """ > - Class that provides a framework for multi-host migration. > - > - Migration can be run both synchronously and asynchronously. 
> - To specify what is going to happen during the multi-host > - migration, it is necessary to reimplement the method > - migration_scenario. It is possible to start multiple migrations > - in separate threads, since self.migrate is thread safe. > - > - Only one test using multihost migration framework should be > - started on one machine otherwise it is necessary to solve the > - problem with listen server port. > - > - Multihost migration starts SyncListenServer through which > - all messages are transfered, since the multiple hosts can > - be in diferent states. > - > - Class SyncData is used to transfer data over network or > - synchronize the migration process. Synchronization sessions > - are recognized by session_id. > - > - It is important to note that, in order to have multi-host > - migration, one needs shared guest image storage. The simplest > - case is when the guest images are on an NFS server. > - > - Example: > - class TestMultihostMigration(virt_utils.MultihostMigration): > - def __init__(self, test, params, env): > - super(testMultihostMigration, self).__init__(test, > params, env) > - > - def migration_scenario(self): > - srchost = self.params.get("hosts")[0] > - dsthost = self.params.get("hosts")[1] > - > - def worker(mig_data): > - vm = env.get_vm("vm1") > - session = > vm.wait_for_login(timeout=self.login_timeout) > - session.sendline("nohup dd if=/dev/zero > of=/dev/null &") > - session.cmd("killall -0 dd") > - > - def check_worker(mig_data): > - vm = env.get_vm("vm1") > - session = > vm.wait_for_login(timeout=self.login_timeout) > - session.cmd("killall -9 dd") > - > - # Almost synchronized migration, waiting to end it. > - # Work is started only on first VM. > - self.migrate_wait(["vm1", "vm2"], srchost, dsthost, > - worker, check_worker) > - > - # Migration started in different threads. > - # It allows to start multiple migrations > simultaneously. > - mig1 = self.migrate(["vm1"], srchost, dsthost, > - worker, check_worker) > - mig2 = self.migrate(["vm2"], srchost, dsthost) > - mig2.join() > - mig1.join() > - > - mig = TestMultihostMigration(test, params, env) > - mig.run() > - """ > - def __init__(self, test, params, env, preprocess_env=True): > - self.test = test > - self.params = params > - self.env = env > - self.hosts = params.get("hosts") > - self.hostid = params.get('hostid', "") > - self.comm_port = int(params.get("comm_port", 13234)) > - vms_count = len(params["vms"].split()) > - > - self.login_timeout = int(params.get("login_timeout", 360)) > - self.disk_prepare_timeout = > int(params.get("disk_prepare_timeout", > - 160 * vms_count)) > - self.finish_timeout = int(params.get("finish_timeout", > - 120 * vms_count)) > - > - self.new_params = None > - > - if params.get("clone_master") == "yes": > - self.clone_master = True > - else: > - self.clone_master = False > - > - self.mig_timeout = int(params.get("mig_timeout")) > - # Port used to communicate info between source and > destination > - self.regain_ip_cmd = params.get("regain_ip_cmd", "dhclient") > - > - self.vm_lock = threading.Lock() > - > - self.sync_server = None > - if self.clone_master: > - self.sync_server = SyncListenServer() > - > - if preprocess_env: > - self.preprocess_env() > - self._hosts_barrier(self.hosts, self.hosts, > 'disk_prepared', > - self.disk_prepare_timeout) > - > - > - def migration_scenario(self): > - """ > - Multi Host migration_scenario is started from method run > where the > - exceptions are checked. 
It is not necessary to take care of > - cleaning up after test crash or finish. > - """ > - raise NotImplementedError > - > - > - def migrate_vms_src(self, mig_data): > - """ > - Migrate vms source. > - > - @param mig_Data: Data for migration. > - > - For change way how machine migrates is necessary > - re implement this method. > - """ > - def mig_wrapper(vm, dsthost, vm_ports): > - vm.migrate(dest_host=dsthost, > remote_port=vm_ports[vm.name]) > - > - logging.info("Start migrating now...") > - multi_mig = [] > - for vm in mig_data.vms: > - multi_mig.append((mig_wrapper, (vm, mig_data.dst, > - mig_data.vm_ports))) > - parallel(multi_mig) > - > - > - def migrate_vms_dest(self, mig_data): > - """ > - Migrate vms destination. This function is started on dest > host during > - migration. > - > - @param mig_Data: Data for migration. > - """ > - pass > - > - > - def __del__(self): > - if self.sync_server: > - self.sync_server.close() > - > - > - def master_id(self): > - return self.hosts[0] > - > - > - def _hosts_barrier(self, hosts, session_id, tag, timeout): > - logging.debug("Barrier timeout: %d tags: %s" % (timeout, > tag)) > - tags = SyncData(self.master_id(), self.hostid, hosts, > - "%s,%s,barrier" % (str(session_id), tag), > - self.sync_server).sync(tag, timeout) > - logging.debug("Barrier tag %s" % (tags)) > - > - > - def preprocess_env(self): > - """ > - Prepare env to start vms. > - """ > - preprocess_images(self.test.bindir, self.params, self.env) > - > - > - def _check_vms_source(self, mig_data): > - for vm in mig_data.vms: > - vm.wait_for_login(timeout=self.login_timeout) > - > - sync = SyncData(self.master_id(), self.hostid, > mig_data.hosts, > - mig_data.mig_id, self.sync_server) > - mig_data.vm_ports = sync.sync(timeout=120)[mig_data.dst] > - logging.info("Received from destination the migration port > %s", > - str(mig_data.vm_ports)) > - > - > - def _check_vms_dest(self, mig_data): > - mig_data.vm_ports = {} > - for vm in mig_data.vms: > - logging.info("Communicating to source migration port > %s", > - vm.migration_port) > - mig_data.vm_ports[vm.name] = vm.migration_port > - > - SyncData(self.master_id(), self.hostid, > - mig_data.hosts, mig_data.mig_id, > - self.sync_server).sync(mig_data.vm_ports, > timeout=120) > - > - > - def _prepare_params(self, mig_data): > - """ > - Prepare separate params for vm migration. > - > - @param vms_name: List of vms. > - """ > - new_params = mig_data.params.copy() > - new_params["vms"] = " ".join(mig_data.vms_name) > - return new_params > - > - > - def _check_vms(self, mig_data): > - """ > - Check if vms are started correctly. > - > - @param vms: list of vms. > - @param source: Must be True if is source machine. > - """ > - logging.info("Try check vms %s" % (mig_data.vms_name)) > - for vm in mig_data.vms_name: > - if not self.env.get_vm(vm) in mig_data.vms: > - mig_data.vms.append(self.env.get_vm(vm)) > - for vm in mig_data.vms: > - logging.info("Check vm %s on host %s" % (vm.name, > self.hostid)) > - vm.verify_alive() > - > - if mig_data.is_src(): > - self._check_vms_source(mig_data) > - else: > - self._check_vms_dest(mig_data) > - > - > - def prepare_for_migration(self, mig_data, migration_mode): > - """ > - Prepare destination of migration for migration. > - > - @param mig_data: Class with data necessary for migration. > - @param migration_mode: Migration mode for prepare machine. 
> - """ > - new_params = self._prepare_params(mig_data) > - > - new_params['migration_mode'] = migration_mode > - new_params['start_vm'] = 'yes' > - self.vm_lock.acquire() > - virt_env_process.process(self.test, new_params, self.env, > - virt_env_process.preprocess_image, > - virt_env_process.preprocess_vm) > - self.vm_lock.release() > - > - self._check_vms(mig_data) > - > - > - def migrate_vms(self, mig_data): > - """ > - Migrate vms. > - """ > - if mig_data.is_src(): > - self.migrate_vms_src(mig_data) > - else: > - self.migrate_vms_dest(mig_data) > - > - > - def check_vms(self, mig_data): > - """ > - Check vms after migrate. > - > - @param mig_data: object with migration data. > - """ > - for vm in mig_data.vms: > - if not guest_active(vm): > - raise error.TestFail("Guest not active after > migration") > - > - logging.info("Migrated guest appears to be running") > - > - logging.info("Logging into migrated guest after > migration...") > - for vm in mig_data.vms: > - session_serial = vm.wait_for_serial_login(timeout= > - > self.login_timeout) > - #There is sometime happen that system sends some message > on > - #serial console and IP renew command block test. Because > - #there must be added "sleep" in IP renew command. > - session_serial.cmd(self.regain_ip_cmd) > - vm.wait_for_login(timeout=self.login_timeout) > - > - > - def postprocess_env(self): > - """ > - Kill vms and delete cloned images. > - """ > - postprocess_images(self.test.bindir, self.params) > - > - > - def migrate(self, vms_name, srchost, dsthost, start_work=None, > - check_work=None, mig_mode="tcp", > params_append=None): > - """ > - Migrate machine from srchost to dsthost. It executes > start_work on > - source machine before migration and executes check_work on > dsthost > - after migration. > - > - Migration execution progress: > - > - source host | dest host > - -------------------------------------------------------- > - prepare guest on both sides of migration > - - start machine and check if machine works > - - synchronize transfer data needed for migration > - -------------------------------------------------------- > - start work on source guests | wait for migration > - -------------------------------------------------------- > - migrate guest to dest host. > - wait on finish migration synchronization > - -------------------------------------------------------- > - | check work on vms > - -------------------------------------------------------- > - wait for sync on finish migration > - > - @param vms_name: List of vms. > - @param srchost: src host id. > - @param dsthost: dst host id. > - @param start_work: Function started before migration. > - @param check_work: Function started after migration. > - @param mig_mode: Migration mode. > - @param params_append: Append params to self.params only for > migration. 
> - """ > - def migrate_wrap(vms_name, srchost, dsthost, > start_work=None, > - check_work=None, params_append=None): > - logging.info("Starting migrate vms %s from host %s to > %s" % > - (vms_name, srchost, dsthost)) > - error = None > - mig_data = MigrationData(self.params, srchost, dsthost, > - vms_name, params_append) > - try: > - try: > - if mig_data.is_src(): > - self.prepare_for_migration(mig_data, None) > - elif self.hostid == dsthost: > - self.prepare_for_migration(mig_data, > mig_mode) > - else: > - return > - > - if mig_data.is_src(): > - if start_work: > - start_work(mig_data) > - > - self.migrate_vms(mig_data) > - > - timeout = 30 > - if not mig_data.is_src(): > - timeout = self.mig_timeout > - self._hosts_barrier(mig_data.hosts, > mig_data.mig_id, > - 'mig_finished', timeout) > - > - if mig_data.is_dst(): > - self.check_vms(mig_data) > - if check_work: > - check_work(mig_data) > - > - except: > - error = True > - raise > - finally: > - if not error: > - self._hosts_barrier(self.hosts, > - mig_data.mig_id, > - 'test_finihed', > - self.finish_timeout) > - > - def wait_wrap(vms_name, srchost, dsthost): > - mig_data = MigrationData(self.params, srchost, dsthost, > vms_name, > - None) > - timeout = (self.login_timeout + self.mig_timeout + > - self.finish_timeout) > - > - self._hosts_barrier(self.hosts, mig_data.mig_id, > - 'test_finihed', timeout) > - > - if (self.hostid in [srchost, dsthost]): > - mig_thread = utils.InterruptedThread(migrate_wrap, > (vms_name, > - > srchost, > - > dsthost, > - > start_work, > - > check_work, > - > params_append)) > - else: > - mig_thread = utils.InterruptedThread(wait_wrap, > (vms_name, > - > srchost, > - > dsthost)) > - mig_thread.start() > - return mig_thread > - > - > - def migrate_wait(self, vms_name, srchost, dsthost, > start_work=None, > - check_work=None, mig_mode="tcp", > params_append=None): > - """ > - Migrate machine from srchost to dsthost and wait for finish. > - It executes start_work on source machine before migration > and executes > - check_work on dsthost after migration. > - > - @param vms_name: List of vms. > - @param srchost: src host id. > - @param dsthost: dst host id. > - @param start_work: Function which is started before > migration. > - @param check_work: Function which is started after > - done of migration. > - """ > - self.migrate(vms_name, srchost, dsthost, start_work, > check_work, > - mig_mode, params_append).join() > - > - > - def cleanup(self): > - """ > - Cleanup env after test. > - """ > - if self.clone_master: > - self.sync_server.close() > - self.postprocess_env() > - > - > - def run(self): > - """ > - Start multihost migration scenario. > - After scenario is finished or if scenario crashed it calls > postprocess > - machines and cleanup env. > - """ > - try: > - self.migration_scenario() > - > - self._hosts_barrier(self.hosts, self.hosts, > 'all_test_finihed', > - self.finish_timeout) > - finally: > - self.cleanup() > - > def get_ip_address_by_interface(ifname): > """ > returns ip address by interface > -- > 1.7.10.2 > > _______________________________________________ > Autotest mailing list > Autotest@test.kernel.org > http://test.kernel.org/cgi-bin/mailman/listinfo/autotest > _______________________________________________ Autotest mailing list Autotest@test.kernel.org http://test.kernel.org/cgi-bin/mailman/listinfo/autotest