Martin Sivák has uploaded a new change for review. Change subject: Refactor the storage access and creation and add support for block devices ......................................................................
Refactor the storage access and creation and add support for block devices Change-Id: I965642683a663c5655342f06d9f3ac19533ca91b Signed-off-by: Martin Sivak <[email protected]> --- M ovirt_hosted_engine_ha/agent/constants.py.in M ovirt_hosted_engine_ha/agent/hosted_engine.py M ovirt_hosted_engine_ha/broker/listener.py M ovirt_hosted_engine_ha/broker/storage_broker.py M ovirt_hosted_engine_ha/client/client.py M ovirt_hosted_engine_ha/env/constants.py.in M ovirt_hosted_engine_ha/lib/Makefile.am M ovirt_hosted_engine_ha/lib/brokerlink.py A ovirt_hosted_engine_ha/lib/storage_backends.py A ovirt_hosted_engine_ha/lib/storage_backends_test.py 10 files changed, 567 insertions(+), 70 deletions(-) git pull ssh://gerrit.ovirt.org:29418/ovirt-hosted-engine-ha refs/changes/24/25624/1 diff --git a/ovirt_hosted_engine_ha/agent/constants.py.in b/ovirt_hosted_engine_ha/agent/constants.py.in index 6e8166c..eb06f89 100644 --- a/ovirt_hosted_engine_ha/agent/constants.py.in +++ b/ovirt_hosted_engine_ha/agent/constants.py.in @@ -41,6 +41,8 @@ METADATA_BLOCK_BYTES = 512 SERVICE_TYPE = 'hosted-engine' +MD_EXTENSION = '.metadata' +LOCKSPACE_EXTENSION = '.lockspace' BROKER_CONNECTION_RETRIES = 10 HOST_ALIVE_TIMEOUT_SECS = 60 ENGINE_RETRY_EXPIRATION_SECS = 600 diff --git a/ovirt_hosted_engine_ha/agent/hosted_engine.py b/ovirt_hosted_engine_ha/agent/hosted_engine.py index 57ae407..d53f54d 100644 --- a/ovirt_hosted_engine_ha/agent/hosted_engine.py +++ b/ovirt_hosted_engine_ha/agent/hosted_engine.py @@ -153,8 +153,6 @@ }) self._sd_path = None - self._metadata_path = None - self._sanlock_initialized = False @property @@ -295,8 +293,12 @@ error_count = 0 # make sure everything is initialized - self._initialize_broker() + # VDSM has to be initialized first, because it prepares the + # storage domain connection + # Broker then initializes the pieces needed fo metadata and leases + # which are then used by sanlock self._initialize_vdsm() + self._initialize_broker() self._initialize_sanlock() self._initialize_domain_monitor() @@ -312,8 +314,8 @@ try: # make sure everything is still initialized - self._initialize_broker() self._initialize_vdsm() + self._initialize_broker() self._initialize_sanlock() self._initialize_domain_monitor() @@ -383,6 +385,14 @@ raise else: self._local_monitors[m['field']] = lm + + # register storage domain info + sd_uuid = self._config.get(config.ENGINE, config.SD_UUID) + dom_type = self._config.get(config.ENGINE, config.DOMAIN_TYPE) + self._broker.set_storage_domain("fs", + sd_uuid=sd_uuid, + dom_type=dom_type) + self._log.info("Broker initialized, all submonitors started") def _initialize_vdsm(self): @@ -440,10 +450,8 @@ def _initialize_sanlock(self): self._cond_start_service('sanlock') - - self._metadata_dir = env_path.get_metadata_path(self._config) - lease_file = os.path.join(self._metadata_dir, - constants.SERVICE_TYPE + '.lockspace') + lease_file = self._broker.get_service_file( + constants.SERVICE_TYPE + constants.LOCKSPACE_EXTENSION) if not self._sanlock_initialized: lvl = logging.INFO else: @@ -612,15 +620,13 @@ def _push_to_storage(self, blocks): self._broker.put_stats_on_storage( - self._metadata_dir, - constants.SERVICE_TYPE, + constants.SERVICE_TYPE + constants.MD_EXTENSION, self._config.get(config.ENGINE, config.HOST_ID), blocks) def collect_stats(self): all_stats = self._broker.get_stats_from_storage( - self._metadata_dir, - constants.SERVICE_TYPE) + constants.SERVICE_TYPE + constants.MD_EXTENSION) data = { # Flag is set if the local agent discovers metadata too new for it @@ -644,8 +650,7 @@ } all_stats = self._broker.get_stats_from_storage( - self._metadata_dir, - constants.SERVICE_TYPE) + constants.SERVICE_TYPE + constants.MD_EXTENSION) # host_id 0 is a special case, representing global metadata if all_stats and 0 in all_stats: diff --git a/ovirt_hosted_engine_ha/broker/listener.py b/ovirt_hosted_engine_ha/broker/listener.py index 9b6a0d3..ceb2eda 100644 --- a/ovirt_hosted_engine_ha/broker/listener.py +++ b/ovirt_hosted_engine_ha/broker/listener.py @@ -162,7 +162,8 @@ data = util.socket_readline(self.request, self._log) self._log.debug("Input: %s", data) try: - response = "success " + self._dispatch(data) + response = ("success " + + self._dispatch(self.client_address, data)) except RequestError as e: response = "failure " + format(str(e)) self._log.debug("Response: %s", response) @@ -210,6 +211,11 @@ + "%d - %s", id, str(e)) self._remove_monitor_conn_entry() + # cleanup storage + with self.server.sp_listener.storage_broker_instance_access_lock: + self.server.sp_listener.storage_broker_instance \ + .cleanup(self.client_address) + try: SocketServer.BaseRequestHandler.finish(self) except socket.error as e: @@ -217,7 +223,7 @@ self._log.error("Error while closing connection", exc_info=True) - def _dispatch(self, data): + def _dispatch(self, client, data): """ Parses and dispatches a request to the appropriate subsystem. @@ -258,13 +264,26 @@ options = self._get_options(tokens) with self.server.sp_listener.storage_broker_instance_access_lock: stats = self.server.sp_listener.storage_broker_instance \ - .get_all_stats_for_service_type(**options) + .get_all_stats_for_service_type(client, **options) return stats elif type == 'put-stats': options = self._get_options(tokens) with self.server.sp_listener.storage_broker_instance_access_lock: self.server.sp_listener.storage_broker_instance \ - .put_stats(**options) + .put_stats(client, **options) + return "ok" + elif type == 'service-path': + options = self._get_options(tokens) + with self.server.sp_listener.storage_broker_instance_access_lock: + self.server.sp_listener.storage_broker_instance \ + .get_service_path(client, **options) + return "ok" + elif type == 'set-storage-domain': + sd_type = int(tokens.pop(0)) + options = self._get_options(tokens) + with self.server.sp_listener.storage_broker_instance_access_lock: + self.server.sp_listener.storage_broker_instance \ + .set_storage_domain(client, sd_type, **options) return "ok" elif type == 'notify': options = self._get_options(tokens) diff --git a/ovirt_hosted_engine_ha/broker/storage_broker.py b/ovirt_hosted_engine_ha/broker/storage_broker.py index ade92f0..71c4df4 100644 --- a/ovirt_hosted_engine_ha/broker/storage_broker.py +++ b/ovirt_hosted_engine_ha/broker/storage_broker.py @@ -25,20 +25,43 @@ from ..env import constants from ..lib.exceptions import RequestError -from ..lib import util +from ..lib.storage_backends import FilesystemBackend, BlockBackend class StorageBroker(object): + + DOMAINTYPES = { + "fs": FilesystemBackend, + "block": BlockBackend + } + def __init__(self): self._log = logging.getLogger("%s.StorageBroker" % __name__) self._storage_access_lock = threading.Lock() + self._backends = {} - def get_all_stats_for_service_type(self, storage_dir, service_type): + def set_storage_domain(self, client, sd_type, **kwargs): + """ + The first thing any new broker client should do is to configure + the storage it wants to use. Client is arbitrary hashable structure, + but usually is (host, ip) of the agent that opened the connection + to the broker. The client value is provided by the broker logic. + + :param sd_type: The type of backend the clients want to use + :type sd_type: Currently the only supported values are "fs" and "block" + """ + if client in self._backends: + self._backends[client].disconnect() + del self._backends[client] + self._backends[client] = self.DOMAINTYPES[sd_type](**kwargs) + self._backends[client].connect() + + def get_all_stats_for_service_type(self, client, service_type): """ Reads all files in storage_dir for the given service_type, returning a space-delimited string of "<host_id>=<hex data>" for each host. """ - d = self.get_raw_stats_for_service_type(storage_dir, service_type) + d = self.get_raw_stats_for_service_type(client, service_type) str_list = [] for host_id in sorted(d.keys()): hex_data = base64.b16encode(d.get(host_id)) @@ -47,7 +70,7 @@ str_list.append("{0}={1}".format(host_id, hex_data)) return ' '.join(str_list) - def get_raw_stats_for_service_type(self, storage_dir, service_type): + def get_raw_stats_for_service_type(self, client, service_type): """ Reads all files in storage_dir for the given service_type, returning a dict of "host_id: data" for each host @@ -55,9 +78,9 @@ Note: this method is called from the client as well as from self.get_all_stats_for_service_type(). """ - self._log.debug("Getting stats for service %s from %s", - service_type, storage_dir) - path = os.path.join(storage_dir, self._get_filename(service_type)) + path, offset = self._backends[client].filename(service_type) + self._log.debug("Getting stats for service %s from %s with offset %d", + service_type, path, offset) # Use direct I/O if possible, to avoid the local filesystem cache # from hiding metadata file updates from other hosts. For NFS, we @@ -72,6 +95,7 @@ try: with self._storage_access_lock: f = os.open(path, direct_flag | os.O_RDONLY) + os.lseek(f, offset, os.SEEK_SET) data = os.read(f, read_size) os.close(f) except IOError as e: @@ -83,7 +107,7 @@ for i in range(0, len(data), bs) if data[i] != '\0')) - def put_stats(self, storage_dir, service_type, host_id, data): + def put_stats(self, client, service_type, host_id, data): """ Writes to the storage in file <storage_dir>/<service-type>.metadata, storing the hex string data (e.g. 01bc4f[...]) in binary format. @@ -96,8 +120,8 @@ the file after the write. """ host_id = int(host_id) - path = os.path.join(storage_dir, self._get_filename(service_type)) - offset = host_id * constants.HOST_SEGMENT_BYTES + path, offset = self._backends[client].filename(service_type) + offset += host_id * constants.HOST_SEGMENT_BYTES self._log.debug("Writing stats for service %s, host id %d" " to file %s, offset %d", service_type, host_id, path, offset) @@ -107,7 +131,6 @@ with self._storage_access_lock: f = None try: - util.mkdir_recursive(storage_dir) f = io.open(path, "r+b") f.seek(offset, os.SEEK_SET) f.write(byte_data) @@ -122,7 +145,22 @@ self._log.debug("Finished") - def _get_filename(self, service_type): - # Nothing special yet - # FIXME should escape special chars before production deployment - return "{0}.{1}".format(service_type, constants.MD_EXTENSION) + def get_service_path(self, client, service): + """ + Returns the full path to a file or device that holds the data + for specified service. + + Client ID is provided by the broker logic. + """ + return self._backends[client].filename(service)[0] + + def cleanup(self, client): + """ + After client (like ha_agent) disconnects the storage backend + needs to be freed properly. + + Client ID is provided by the broker logic. + """ + if client in self._backends: + self._backends[client].disconnect() + del self._backends[client] diff --git a/ovirt_hosted_engine_ha/client/client.py b/ovirt_hosted_engine_ha/client/client.py index 508ff0a..db67bd6 100644 --- a/ovirt_hosted_engine_ha/client/client.py +++ b/ovirt_hosted_engine_ha/client/client.py @@ -18,7 +18,6 @@ # import logging -import os import time from ..agent import constants as agent_constants @@ -81,22 +80,7 @@ self._config = config.Config() broker = brokerlink.BrokerLink() with broker.connection(): - stats = broker.get_stats_from_storage( - path.get_metadata_path(self._config), - constants.SERVICE_TYPE) - - return self._parse_stats(stats, mode) - - def get_all_stats_direct(self, dom_path, service_type, mode=StatModes.ALL): - """ - Like get_all_stats(), but bypasses broker by directly accessing - storage. - """ - from ..broker import storage_broker - - sb = storage_broker.StorageBroker() - path = os.path.join(dom_path, constants.SD_METADATA_DIR) - stats = sb.get_raw_stats_for_service_type(path, service_type) + stats = broker.get_stats_from_storage(constants.SERVICE_TYPE) return self._parse_stats(stats, mode) @@ -136,16 +120,6 @@ """ return self.get_all_stats(self.StatModes.HOST) - def get_all_host_stats_direct(self, dom_path, service_type): - """ - Like get_all_host_stats(), but bypasses broker by directly accessing - storage. - """ - return self.get_all_stats_direct( - dom_path, - service_type, - self.StatModes.HOST) - def set_global_md_flag(self, flag, value): """ Connects to HA broker and sets flags in global metadata, leaving @@ -169,9 +143,7 @@ broker = brokerlink.BrokerLink() with broker.connection(): - all_stats = broker.get_stats_from_storage( - path.get_metadata_path(self._config), - constants.SERVICE_TYPE) + all_stats = broker.get_stats_from_storage(constants.SERVICE_TYPE) global_stats = all_stats.get(0) if global_stats and len(global_stats): @@ -200,9 +172,7 @@ host_id = int(self._config.get(config.ENGINE, config.HOST_ID)) broker = brokerlink.BrokerLink() with broker.connection(): - stats = broker.get_stats_from_storage( - path.get_metadata_path(self._config), - constants.SERVICE_TYPE) + stats = broker.get_stats_from_storage(constants.SERVICE_TYPE) score = 0 if host_id in stats: diff --git a/ovirt_hosted_engine_ha/env/constants.py.in b/ovirt_hosted_engine_ha/env/constants.py.in index 73a2ba9..3fefd40 100644 --- a/ovirt_hosted_engine_ha/env/constants.py.in +++ b/ovirt_hosted_engine_ha/env/constants.py.in @@ -22,7 +22,6 @@ METADATA_FEATURE_VERSION = 1 METADATA_PARSE_VERSION = 1 -MD_EXTENSION = 'metadata' MAX_HOST_ID_SCAN = 64 HOST_SEGMENT_BYTES = 4096 METADATA_BLOCK_BYTES = 512 diff --git a/ovirt_hosted_engine_ha/lib/Makefile.am b/ovirt_hosted_engine_ha/lib/Makefile.am index 22133a8..03660dc 100644 --- a/ovirt_hosted_engine_ha/lib/Makefile.am +++ b/ovirt_hosted_engine_ha/lib/Makefile.am @@ -33,6 +33,8 @@ metadata.py \ util.py \ vds_client.py \ + storage_backends.py \ + storage_backends_test.py \ $(NULL) SUBDIRS = \ diff --git a/ovirt_hosted_engine_ha/lib/brokerlink.py b/ovirt_hosted_engine_ha/lib/brokerlink.py index b197d87..2888fb0 100644 --- a/ovirt_hosted_engine_ha/lib/brokerlink.py +++ b/ovirt_hosted_engine_ha/lib/brokerlink.py @@ -148,6 +148,33 @@ self._log.debug("Success, status %s", response) return response + def get_service_path(self, service): + request = "service-path {0}".format(service) + try: + response = self._checked_communicate(request) + except Exception as e: + self._log.error("Exception getting service path: %s", str(e)) + raise RequestError("Failed to get service path: {0}" + .format(str(e))) + self._log.debug("Success, service path %s", response) + return response + + def set_storage_domain(self, sd_type, **options): + request = ["set-storage-domain {0}".format(sd_type)] + for (k, v) in options.iteritems(): + request.append("{0}={1}".format(k, str(v))) + request = " ".join(request) + + try: + response = self._checked_communicate(request) + except Exception as e: + raise RequestError("Failed to set storage domain {0}, " + "options {1}: {2}" + .format(sd_type, options, e)) + + self._log.info("Success, id %s", response) + return response + def put_stats_on_storage(self, storage_dir, service_type, host_id, data): """ Puts data on the shared storage according to the parameters. @@ -161,13 +188,13 @@ .format(storage_dir, service_type, host_id, hex_data)) self._checked_communicate(request) - def get_stats_from_storage(self, storage_dir, service_type): + def get_stats_from_storage(self, service_type): """ Returns data from the shared storage for all hosts of the specified service type. """ - request = ("get-stats storage_dir={0} service_type={1}" - .format(storage_dir, service_type)) + request = ("get-stats service_type={1}" + .format(service_type)) result = self._checked_communicate(request) tokens = result.split() diff --git a/ovirt_hosted_engine_ha/lib/storage_backends.py b/ovirt_hosted_engine_ha/lib/storage_backends.py new file mode 100644 index 0000000..9bb2b38 --- /dev/null +++ b/ovirt_hosted_engine_ha/lib/storage_backends.py @@ -0,0 +1,334 @@ +from collections import namedtuple +import os +import struct +import subprocess +import zlib +from ..env import constants +from . import util +import math +from cStringIO import StringIO +from operator import itemgetter + + +class BlockBackendCorruptedException(Exception): + """ + Exception raised by BlockBackend when the internal metadata + structure reports a corrupted data (CRC mismatch). + """ + pass + + +class StorageBackend(object): + """ + The base template for Storage backend classes. + """ + + def __init__(self): + # the atomic block size of the underlying storage + self._blocksize = 512 + + def connect(self): + """Initialize the storage.""" + raise NotImplementedError() + + def disconnect(self): + """Close the storage.""" + raise NotImplementedError() + + def filename(self, service): + """ + Return a tuple with the filename to open and bytes to skip + to get to the metadata structures. + """ + raise NotImplementedError() + + @property + def blocksize(self): + return self._blocksize + + def create(self, service_map): + """ + Reinitialize the storage backend according to the service_map. + Key represents service name and value contains the size of the + required block in Bytes. + """ + raise NotImplementedError() + + +class FilesystemBackend(StorageBackend): + """ + Backend for all filesystem based access structures. This + includes VDSM's LVM block devices as they are accessed using symlinks + in the same structure that VDSM uses for NFS based storage domains. + """ + def __init__(self, sd_uuid, dom_type): + super(FilesystemBackend, self).__init__() + self._sd_uuid = sd_uuid + self._dom_type = dom_type + self._lv_based = False + self._storage_path = None + + def filename(self, service): + fname = os.path.join(self._storage_path, service) + return (fname, 0) + + def get_domain_path(self, sd_uuid, dom_type): + """ + Return path of storage domain holding engine vm + """ + parent = constants.SD_MOUNT_PARENT + if dom_type == 'glusterfs': + parent = os.path.join(parent, 'glusterSD') + + for dname in os.listdir(parent): + path = os.path.join(parent, dname, sd_uuid) + if os.access(path, os.F_OK): + if dname == "blockSD": + self._lv_based = True + return path + raise Exception("path to storage domain {0} not found in {1}" + .format(sd_uuid, parent)) + + def connect(self): + self._lv_based = False + self._storage_path = os.path.join(self.get_domain_path(self._sd_uuid, + self._dom_type), + constants.SD_METADATA_DIR) + util.mkdir_recursive(self._storage_path) + + if not self._lv_based: + return + + # create LV symlinks + uuid = self._sd_uuid + for lv in os.listdir(os.path.join("/dev", uuid)): + # skip all LVs that do not have proper name + if not lv.startswith(constants.SD_METADATA_DIR + "-"): + continue + + # strip the prefix and use the rest as symlink name + service = lv.split(constants.SD_METADATA_DIR + "-", 1)[-1] + os.symlink(os.path.join("/dev", uuid, lv), + os.path.join(self._storage_path, service)) + + def disconnect(self): + pass + + def lvcreate(self, vg_uuid, lv_name, size_bytes, popen=subprocess.Popen): + """ + Call lvm lvcreate and ask it to create a Logical Volume in the + Storage Domain's Volume Group. It should be named lv_name + and be big enough to fit size_bytes into it. + """ + lvc = popen(stdin=subprocess.PIPE, stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + args=["lvm", "lvcreate", "-L", str(size_bytes)+"B", + "-n", lv_name, vg_uuid]) + lvc.wait() + + def create(self, service_map): + for service, size in service_map.iteritems(): + service_path = os.path.join(self._storage_path, service) + if self._lv_based: + lvname = "-".join([constants.SD_METADATA_DIR, service]) + self.lvcreate(self._sd_uuid, lvname, size) + else: + # file based storage + with open(service_path, "w") as f: + # create an empty file, possibly utilizing + # sparse files if size was provided + if size: + f.seek(size - 1) + f.write(0) + + # reconnect so all links are refreshed + self.disconnect() + self.connect() + + +class BlockBackend(StorageBackend): + """ + This uses a pure block device to expose the data. It requires device + mapper support to explode the single device to couple of virtual files. + + This is supposed to be used for devices that are not managed by VDSM + or do not use LVM. + + The structure is described using a table that starts at block 0 + of the block device. + + The format of that block is: + + <the next chained block:64bit> - 0 means this is the last block + <service name used length: 1 Byte> + <service name: 63 Bytes> + <data area start block:64 bit> + <data area block length:64 bit> + ... data area records can be repeated if they fit into one block + ... if there is need for more data area records, one of the chained + ... blocks can add them to the same service name + 128bit (16B) of 0s as a sentinel + 32bit CRC32 + + This information is converted to Device Mapper table and used to create + the logical device files. + """ + + # Binary format specifications, all in network byte order + # The name supports only 63 characters + BlockInfo = namedtuple("BlockInfo", ("next", "name", "pieces", "valid")) + BlockStructHeader = struct.Struct("!Q64p") + BlockStructData = struct.Struct("!QQ") + BlockCRC = struct.Struct("!L") + + def __init__(self, block_dev_name, dm_prefix): + super(BlockBackend, self).__init__() + self._block_dev_name = block_dev_name + self._dm_prefix = dm_prefix + self._services = {} + + def parse_meta_block(self, block): + """ + Parse one info block from the raw byte representation + to namedtuple BlockInfo. + """ + next_block, name = self.BlockStructHeader.unpack_from(block, 0) + pieces = [] + seek = self.BlockStructHeader.size + while True: + start, size = self.BlockStructData.unpack_from(block, seek) + seek += self.BlockStructData.size + # end of blocks section sentinel + if start == size and size == 0: + break + pieces.append((start, size)) + crc = zlib.crc32(block[:seek]) & 0xffffffff + # the comma is important, unpack_from returns a single element tuple + expected_crc, = self.BlockCRC.unpack_from(block, seek) + + return self.BlockInfo._make((next_block, name, + tuple(pieces), crc == expected_crc)) + + def get_services(self, block_device_fo): + """ + Read all the info blocks from a block device and + assemble the services dictionary mapping + service name to a list of (data block start, size) + tuples. + """ + offset = block_device_fo.tell() + services = {} + while True: + block = block_device_fo.read(self.blocksize) + parsed = self.parse_meta_block(block) + if not parsed.valid: + raise BlockBackendCorruptedException( + "CRC for block ending at %d does not match data!" + % block_device_fo.tell()) + services.setdefault(parsed.name, []) + services[parsed.name].extend(parsed.pieces) + if parsed.next == 0: + break + else: + block_device_fo.seek(offset + parsed.next * self.blocksize, 0) + return services + + def dm_name(self, service): + return os.path.join(self._dm_prefix, service) + + def compute_dm_table(self, pieces): + """ + Take a list of tuples in the form of (start, size) and + create the string representation of device mapper table + that can be used in dmsetup. + """ + table = [] + log_start = 0 + for start, size in pieces: + table.append("%d %d linear %s %d" + % (log_start, size, self._block_dev_name, start)) + log_start += size + return "\n".join(table) + + def connect(self): + with open(self._block_dev_name, "r") as bd: + self._services = self.get_services(bd) + + for name, pieces in self._services: + table = self.compute_dm_table(pieces) + self.dmcreate(name, table) + + def disconnect(self): + for name in self._services: + self.dmremove(name) + + def dmcreate(self, name, table, popen=subprocess.Popen): + """ + Call dmsetup create <name> and pass it the table. + """ + dm = popen(stdin=subprocess.PIPE, stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + args=["dmsetup", "create", name]) + print "Table for %s" % name + print table + print + + dm.communicate(table) + + def dmremove(self, name, popen=subprocess.Popen): + """ + Call dmsetup remove to destroy the device. + """ + dm = popen(stdin=subprocess.PIPE, stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + args=["dmsetup", "remove", name]) + stdout, stderr = dm.communicate() + + def filename(self, service): + if service not in self._services: + return None + else: + return os.path.join("/dev/mapper", self.dm_name(service)), 0 + + def create_info_blocks(self, service_map): + def bc(size): + """ + Return the number of blocks needed to accommodate size + number of Bytes. + """ + return int(math.ceil(size / float(self._blocksize))) + + # first len(service_map) blocks will contain + # the information about services and their data locations + data_start = len(service_map) + info_blocks = [] + + # Linearize the list, put smaller services before bigger ones + service_list = service_map.items() + service_list.sort(key=itemgetter(1)) + + # create list of next ids that starts with 1, goes to the last + # index (size - 1) and then ends with 0 + next_links = range(1, data_start) + [0] + for next_id, (service, size) in zip(next_links, service_list): + block_len = bc(size) + raw_data = StringIO() + raw_data.write(self.BlockStructHeader.pack(next_id, service)) + raw_data.write(self.BlockStructData.pack(data_start, block_len)) + raw_data.write(self.BlockStructData.pack(0, 0)) + crc = zlib.crc32(raw_data.getvalue()) & 0xffffffff + raw_data.write(self.BlockCRC.pack(crc)) + info_blocks.append(raw_data.getvalue()) + data_start += block_len + + return info_blocks + + def create(self, service_map): + info_blocks = self.create_info_blocks(service_map) + with open(self._block_dev_name, "w") as dev: + for idx, b in enumerate(info_blocks): + dev.seek(idx * self._blocksize) + dev.write(b) + + self.disconnect() + self.connect() diff --git a/ovirt_hosted_engine_ha/lib/storage_backends_test.py b/ovirt_hosted_engine_ha/lib/storage_backends_test.py new file mode 100644 index 0000000..03fb131 --- /dev/null +++ b/ovirt_hosted_engine_ha/lib/storage_backends_test.py @@ -0,0 +1,101 @@ +import unittest +import cStringIO as StringIO +import struct +import zlib + +from .storage_backends import BlockBackend + + +class StorageBackendTests(unittest.TestCase): + + def test_single_bad_block_decode(self): + raw = struct.pack("!Q64pQQQQQQL", + 1, "test", + 1, 100, + 102, 100, + 0, 0, + 0) + b = BlockBackend("/dev/null", "test-1") + block = b.parse_meta_block(raw) + self.assertEqual(block, BlockBackend.BlockInfo( + 1, "test", ((1, 100), (102, 100)), False)) + + def test_service_creation(self): + b = BlockBackend("/dev/null", "test-1") + blocks = b.create_info_blocks({"test1": 300, + "test2": 512, + "test3": 1024*1024*50}) + + self.assertEqual(3, len(blocks)) + + test1 = struct.pack("!Q64pQQQQ", + 1, "test1", + 3, 1, + 0, 0) + test1crc = struct.pack("!L", zlib.crc32(test1) & 0xffffffff) + test2 = struct.pack("!Q64pQQQQ", + 2, "test2", + 4, 1, + 0, 0) + test2crc = struct.pack("!L", zlib.crc32(test2) & 0xffffffff) + test3 = struct.pack("!Q64pQQQQ", + 0, "test3", + 5, 102400, + 0, 0) + test3crc = struct.pack("!L", zlib.crc32(test3) & 0xffffffff) + + expected = [ + test1 + test1crc, + test2 + test2crc, + test3 + test3crc + ] + + self.assertEqual(expected, blocks) + + def test_single_good_block_decode(self): + raw = struct.pack("!Q64pQQQQQQ", + 1, "test", + 1, 100, + 102, 100, + 0, 0) + rawcrc = struct.pack("!L", zlib.crc32(raw) & 0xffffffff) + b = BlockBackend("/dev/null", "test-1") + block = b.parse_meta_block(raw+rawcrc) + self.assertEqual(block, BlockBackend.BlockInfo( + 1, "test", ((1, 100), (102, 100)), True)) + + def test_dm_table(self): + block = BlockBackend.BlockInfo(1, "test", ((1, 100), (102, 100)), True) + b = BlockBackend("/dev/null", "test-1") + table = b.compute_dm_table(block.pieces) + expected = ("0 100 linear /dev/null 1\n" + "100 100 linear /dev/null 102") + self.assertEqual(expected, table) + + def test_get_services(self): + raw1 = struct.pack("!Q64pQQQQQQ", + 1, "test", + 1, 100, + 102, 100, + 0, 0) + raw1crc = struct.pack("!L", zlib.crc32(raw1) & 0xffffffff) + + raw2 = struct.pack("!Q64pQQQQQQ", + 0, "test2", + 2, 200, + 202, 200, + 0, 0) + raw2crc = struct.pack("!L", zlib.crc32(raw2) & 0xffffffff) + + b = BlockBackend("/dev/null", "test-1") + blockdev = StringIO.StringIO() + blockdev.write(raw1) + blockdev.write(raw1crc) + blockdev.seek(b.blocksize) + blockdev.write(raw2) + blockdev.write(raw2crc) + blockdev.seek(0) + expected = {'test': [(1, 100), (102, 100)], + 'test2': [(2, 200), (202, 200)]} + services = b.get_services(blockdev) + self.assertEqual(expected, services) -- To view, visit http://gerrit.ovirt.org/25624 To unsubscribe, visit http://gerrit.ovirt.org/settings Gerrit-MessageType: newchange Gerrit-Change-Id: I965642683a663c5655342f06d9f3ac19533ca91b Gerrit-PatchSet: 1 Gerrit-Project: ovirt-hosted-engine-ha Gerrit-Branch: master Gerrit-Owner: Martin Sivák <[email protected]> _______________________________________________ Engine-patches mailing list [email protected] http://lists.ovirt.org/mailman/listinfo/engine-patches
