Martin Sivák has uploaded a new change for review.

Change subject: Refactor the storage access and creation and add support for block devices
......................................................................

Refactor the storage access and creation and add support for block devices

Change-Id: I965642683a663c5655342f06d9f3ac19533ca91b
Signed-off-by: Martin Sivak <[email protected]>
---
M ovirt_hosted_engine_ha/agent/constants.py.in
M ovirt_hosted_engine_ha/agent/hosted_engine.py
M ovirt_hosted_engine_ha/broker/listener.py
M ovirt_hosted_engine_ha/broker/storage_broker.py
M ovirt_hosted_engine_ha/client/client.py
M ovirt_hosted_engine_ha/env/constants.py.in
M ovirt_hosted_engine_ha/lib/Makefile.am
M ovirt_hosted_engine_ha/lib/brokerlink.py
A ovirt_hosted_engine_ha/lib/storage_backends.py
A ovirt_hosted_engine_ha/lib/storage_backends_test.py
10 files changed, 567 insertions(+), 70 deletions(-)


  git pull ssh://gerrit.ovirt.org:29418/ovirt-hosted-engine-ha refs/changes/24/25624/1

diff --git a/ovirt_hosted_engine_ha/agent/constants.py.in b/ovirt_hosted_engine_ha/agent/constants.py.in
index 6e8166c..eb06f89 100644
--- a/ovirt_hosted_engine_ha/agent/constants.py.in
+++ b/ovirt_hosted_engine_ha/agent/constants.py.in
@@ -41,6 +41,8 @@
 
 METADATA_BLOCK_BYTES = 512
 SERVICE_TYPE = 'hosted-engine'
+MD_EXTENSION = '.metadata'
+LOCKSPACE_EXTENSION = '.lockspace'
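+# combined with SERVICE_TYPE these form the on-storage service file names,
+# e.g. 'hosted-engine.metadata' and 'hosted-engine.lockspace'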
 BROKER_CONNECTION_RETRIES = 10
 HOST_ALIVE_TIMEOUT_SECS = 60
 ENGINE_RETRY_EXPIRATION_SECS = 600
diff --git a/ovirt_hosted_engine_ha/agent/hosted_engine.py b/ovirt_hosted_engine_ha/agent/hosted_engine.py
index 57ae407..d53f54d 100644
--- a/ovirt_hosted_engine_ha/agent/hosted_engine.py
+++ b/ovirt_hosted_engine_ha/agent/hosted_engine.py
@@ -153,8 +153,6 @@
         })
 
         self._sd_path = None
-        self._metadata_path = None
-
         self._sanlock_initialized = False
 
     @property
@@ -295,8 +293,12 @@
         error_count = 0
 
         # make sure everything is initialized
-        self._initialize_broker()
+        # VDSM has to be initialized first, because it prepares the
+        # storage domain connection
+        # The broker then initializes the pieces needed for metadata and
+        # leases, which are then used by sanlock
         self._initialize_vdsm()
+        self._initialize_broker()
         self._initialize_sanlock()
         self._initialize_domain_monitor()
 
@@ -312,8 +314,8 @@
 
             try:
                 # make sure everything is still initialized
-                self._initialize_broker()
                 self._initialize_vdsm()
+                self._initialize_broker()
                 self._initialize_sanlock()
                 self._initialize_domain_monitor()
 
@@ -383,6 +385,14 @@
                 raise
             else:
                 self._local_monitors[m['field']] = lm
+
+        # register storage domain info
+        sd_uuid = self._config.get(config.ENGINE, config.SD_UUID)
+        dom_type = self._config.get(config.ENGINE, config.DOMAIN_TYPE)
+        self._broker.set_storage_domain("fs",
+                                        sd_uuid=sd_uuid,
+                                        dom_type=dom_type)
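+        # Note: a block device based setup would presumably register the
+        # "block" backend here instead, passing the BlockBackend arguments
+        # (illustrative values, not part of this change):
+        #   self._broker.set_storage_domain("block",
+        #                                   block_dev_name="/dev/...",
+        #                                   dm_prefix="hosted-engine")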
+
         self._log.info("Broker initialized, all submonitors started")
 
     def _initialize_vdsm(self):
@@ -440,10 +450,8 @@
 
     def _initialize_sanlock(self):
         self._cond_start_service('sanlock')
-
-        self._metadata_dir = env_path.get_metadata_path(self._config)
-        lease_file = os.path.join(self._metadata_dir,
-                                  constants.SERVICE_TYPE + '.lockspace')
+        lease_file = self._broker.get_service_path(
+            constants.SERVICE_TYPE + constants.LOCKSPACE_EXTENSION)
         if not self._sanlock_initialized:
             lvl = logging.INFO
         else:
@@ -612,15 +620,13 @@
 
     def _push_to_storage(self, blocks):
         self._broker.put_stats_on_storage(
-            self._metadata_dir,
-            constants.SERVICE_TYPE,
+            constants.SERVICE_TYPE + constants.MD_EXTENSION,
             self._config.get(config.ENGINE, config.HOST_ID),
             blocks)
 
     def collect_stats(self):
         all_stats = self._broker.get_stats_from_storage(
-            self._metadata_dir,
-            constants.SERVICE_TYPE)
+            constants.SERVICE_TYPE + constants.MD_EXTENSION)
 
         data = {
             # Flag is set if the local agent discovers metadata too new for it
@@ -644,8 +650,7 @@
         }
 
         all_stats = self._broker.get_stats_from_storage(
-            self._metadata_dir,
-            constants.SERVICE_TYPE)
+            constants.SERVICE_TYPE + constants.MD_EXTENSION)
 
         # host_id 0 is a special case, representing global metadata
         if all_stats and 0 in all_stats:
diff --git a/ovirt_hosted_engine_ha/broker/listener.py b/ovirt_hosted_engine_ha/broker/listener.py
index 9b6a0d3..ceb2eda 100644
--- a/ovirt_hosted_engine_ha/broker/listener.py
+++ b/ovirt_hosted_engine_ha/broker/listener.py
@@ -162,7 +162,8 @@
                 data = util.socket_readline(self.request, self._log)
                 self._log.debug("Input: %s", data)
                 try:
-                    response = "success " + self._dispatch(data)
+                    response = ("success "
+                                + self._dispatch(self.client_address, data))
                 except RequestError as e:
                     response = "failure " + format(str(e))
                 self._log.debug("Response: %s", response)
@@ -210,6 +211,11 @@
                                 + "%d - %s", id, str(e))
         self._remove_monitor_conn_entry()
 
+        # cleanup storage
+        with self.server.sp_listener.storage_broker_instance_access_lock:
+            self.server.sp_listener.storage_broker_instance \
+                .cleanup(self.client_address)
+
         try:
             SocketServer.BaseRequestHandler.finish(self)
         except socket.error as e:
@@ -217,7 +223,7 @@
                 self._log.error("Error while closing connection",
                                 exc_info=True)
 
-    def _dispatch(self, data):
+    def _dispatch(self, client, data):
         """
         Parses and dispatches a request to the appropriate subsystem.
 
@@ -258,13 +264,26 @@
             options = self._get_options(tokens)
             with self.server.sp_listener.storage_broker_instance_access_lock:
                 stats = self.server.sp_listener.storage_broker_instance \
-                    .get_all_stats_for_service_type(**options)
+                    .get_all_stats_for_service_type(client, **options)
             return stats
         elif type == 'put-stats':
             options = self._get_options(tokens)
             with self.server.sp_listener.storage_broker_instance_access_lock:
                 self.server.sp_listener.storage_broker_instance \
-                    .put_stats(**options)
+                    .put_stats(client, **options)
+            return "ok"
+        elif type == 'service-path':
+            options = self._get_options(tokens)
+            with self.server.sp_listener.storage_broker_instance_access_lock:
+                path = self.server.sp_listener.storage_broker_instance \
+                    .get_service_path(client, **options)
+            return path
+        elif type == 'set-storage-domain':
+            sd_type = tokens.pop(0)
+            options = self._get_options(tokens)
+            with self.server.sp_listener.storage_broker_instance_access_lock:
+                self.server.sp_listener.storage_broker_instance \
+                    .set_storage_domain(client, sd_type, **options)
             return "ok"
         elif type == 'notify':
             options = self._get_options(tokens)
diff --git a/ovirt_hosted_engine_ha/broker/storage_broker.py b/ovirt_hosted_engine_ha/broker/storage_broker.py
index ade92f0..71c4df4 100644
--- a/ovirt_hosted_engine_ha/broker/storage_broker.py
+++ b/ovirt_hosted_engine_ha/broker/storage_broker.py
@@ -25,20 +25,43 @@
 
 from ..env import constants
 from ..lib.exceptions import RequestError
-from ..lib import util
+from ..lib.storage_backends import FilesystemBackend, BlockBackend
 
 
 class StorageBroker(object):
+
+    DOMAINTYPES = {
+        "fs": FilesystemBackend,
+        "block": BlockBackend
+    }
+
     def __init__(self):
         self._log = logging.getLogger("%s.StorageBroker" % __name__)
         self._storage_access_lock = threading.Lock()
+        self._backends = {}
 
-    def get_all_stats_for_service_type(self, storage_dir, service_type):
+    def set_storage_domain(self, client, sd_type, **kwargs):
+        """
+        The first thing any new broker client should do is configure
+        the storage backend it wants to use. The client can be any hashable
+        value, but it is usually the address of the agent connection to
+        the broker. The client value is provided by the broker logic.
+
+        :param sd_type: the type of backend the client wants to use
+        :type sd_type: str, currently only "fs" and "block" are supported
+        """
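+        # Usage sketch (placeholder values; the kwargs mirror the chosen
+        # backend's constructor):
+        #   set_storage_domain(client, "fs",
+        #                      sd_uuid="<sd_uuid>", dom_type="<dom_type>")
+        #   set_storage_domain(client, "block",
+        #                      block_dev_name="/dev/<device>",
+        #                      dm_prefix="<prefix>")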
+        if client in self._backends:
+            self._backends[client].disconnect()
+            del self._backends[client]
+        self._backends[client] = self.DOMAINTYPES[sd_type](**kwargs)
+        self._backends[client].connect()
+
+    def get_all_stats_for_service_type(self, client, service_type):
         """
         Reads all files in storage_dir for the given service_type, returning a
         space-delimited string of "<host_id>=<hex data>" for each host.
         """
-        d = self.get_raw_stats_for_service_type(storage_dir, service_type)
+        d = self.get_raw_stats_for_service_type(client, service_type)
         str_list = []
         for host_id in sorted(d.keys()):
             hex_data = base64.b16encode(d.get(host_id))
@@ -47,7 +70,7 @@
             str_list.append("{0}={1}".format(host_id, hex_data))
         return ' '.join(str_list)
 
-    def get_raw_stats_for_service_type(self, storage_dir, service_type):
+    def get_raw_stats_for_service_type(self, client, service_type):
         """
         Reads all files in storage_dir for the given service_type, returning a
         dict of "host_id: data" for each host
@@ -55,9 +78,9 @@
         Note: this method is called from the client as well as from
         self.get_all_stats_for_service_type().
         """
-        self._log.debug("Getting stats for service %s from %s",
-                        service_type, storage_dir)
-        path = os.path.join(storage_dir, self._get_filename(service_type))
+        path, offset = self._backends[client].filename(service_type)
+        self._log.debug("Getting stats for service %s from %s with offset %d",
+                        service_type, path, offset)
 
         # Use direct I/O if possible, to avoid the local filesystem cache
         # from hiding metadata file updates from other hosts.  For NFS, we
@@ -72,6 +95,7 @@
         try:
             with self._storage_access_lock:
                 f = os.open(path, direct_flag | os.O_RDONLY)
+                os.lseek(f, offset, os.SEEK_SET)
                 data = os.read(f, read_size)
                 os.close(f)
         except IOError as e:
@@ -83,7 +107,7 @@
                      for i in range(0, len(data), bs)
                      if data[i] != '\0'))
 
-    def put_stats(self, storage_dir, service_type, host_id, data):
+    def put_stats(self, client, service_type, host_id, data):
         """
         Writes to the storage in file <storage_dir>/<service-type>.metadata,
         storing the hex string data (e.g. 01bc4f[...]) in binary format.
@@ -96,8 +120,8 @@
         the file after the write.
         """
         host_id = int(host_id)
-        path = os.path.join(storage_dir, self._get_filename(service_type))
-        offset = host_id * constants.HOST_SEGMENT_BYTES
+        path, offset = self._backends[client].filename(service_type)
+        offset += host_id * constants.HOST_SEGMENT_BYTES
         self._log.debug("Writing stats for service %s, host id %d"
                         " to file %s, offset %d",
                         service_type, host_id, path, offset)
@@ -107,7 +131,6 @@
         with self._storage_access_lock:
             f = None
             try:
-                util.mkdir_recursive(storage_dir)
                 f = io.open(path, "r+b")
                 f.seek(offset, os.SEEK_SET)
                 f.write(byte_data)
@@ -122,7 +145,22 @@
 
         self._log.debug("Finished")
 
-    def _get_filename(self, service_type):
-        # Nothing special yet
-        # FIXME should escape special chars before production deployment
-        return "{0}.{1}".format(service_type, constants.MD_EXTENSION)
+    def get_service_path(self, client, service):
+        """
+        Returns the full path to a file or device that holds the data
+        for the specified service.
+
+        Client ID is provided by the broker logic.
+        """
+        return self._backends[client].filename(service)[0]
+
+    def cleanup(self, client):
+        """
+        After a client (such as the ha_agent) disconnects, its storage
+        backend needs to be released properly.
+
+        Client ID is provided by the broker logic.
+        """
+        if client in self._backends:
+            self._backends[client].disconnect()
+            del self._backends[client]
diff --git a/ovirt_hosted_engine_ha/client/client.py b/ovirt_hosted_engine_ha/client/client.py
index 508ff0a..db67bd6 100644
--- a/ovirt_hosted_engine_ha/client/client.py
+++ b/ovirt_hosted_engine_ha/client/client.py
@@ -18,7 +18,6 @@
 #
 
 import logging
-import os
 import time
 
 from ..agent import constants as agent_constants
@@ -81,22 +80,7 @@
             self._config = config.Config()
         broker = brokerlink.BrokerLink()
         with broker.connection():
-            stats = broker.get_stats_from_storage(
-                path.get_metadata_path(self._config),
-                constants.SERVICE_TYPE)
-
-        return self._parse_stats(stats, mode)
-
-    def get_all_stats_direct(self, dom_path, service_type, mode=StatModes.ALL):
-        """
-        Like get_all_stats(), but bypasses broker by directly accessing
-        storage.
-        """
-        from ..broker import storage_broker
-
-        sb = storage_broker.StorageBroker()
-        path = os.path.join(dom_path, constants.SD_METADATA_DIR)
-        stats = sb.get_raw_stats_for_service_type(path, service_type)
+            stats = broker.get_stats_from_storage(constants.SERVICE_TYPE)
 
         return self._parse_stats(stats, mode)
 
@@ -136,16 +120,6 @@
         """
         return self.get_all_stats(self.StatModes.HOST)
 
-    def get_all_host_stats_direct(self, dom_path, service_type):
-        """
-        Like get_all_host_stats(), but bypasses broker by directly accessing
-        storage.
-        """
-        return self.get_all_stats_direct(
-            dom_path,
-            service_type,
-            self.StatModes.HOST)
-
     def set_global_md_flag(self, flag, value):
         """
         Connects to HA broker and sets flags in global metadata, leaving
@@ -169,9 +143,7 @@
 
         broker = brokerlink.BrokerLink()
         with broker.connection():
-            all_stats = broker.get_stats_from_storage(
-                path.get_metadata_path(self._config),
-                constants.SERVICE_TYPE)
+            all_stats = broker.get_stats_from_storage(constants.SERVICE_TYPE)
 
             global_stats = all_stats.get(0)
             if global_stats and len(global_stats):
@@ -200,9 +172,7 @@
         host_id = int(self._config.get(config.ENGINE, config.HOST_ID))
         broker = brokerlink.BrokerLink()
         with broker.connection():
-            stats = broker.get_stats_from_storage(
-                path.get_metadata_path(self._config),
-                constants.SERVICE_TYPE)
+            stats = broker.get_stats_from_storage(constants.SERVICE_TYPE)
 
         score = 0
         if host_id in stats:
diff --git a/ovirt_hosted_engine_ha/env/constants.py.in b/ovirt_hosted_engine_ha/env/constants.py.in
index 73a2ba9..3fefd40 100644
--- a/ovirt_hosted_engine_ha/env/constants.py.in
+++ b/ovirt_hosted_engine_ha/env/constants.py.in
@@ -22,7 +22,6 @@
 METADATA_FEATURE_VERSION = 1
 METADATA_PARSE_VERSION = 1
 
-MD_EXTENSION = 'metadata'
 MAX_HOST_ID_SCAN = 64
 HOST_SEGMENT_BYTES = 4096
 METADATA_BLOCK_BYTES = 512
diff --git a/ovirt_hosted_engine_ha/lib/Makefile.am b/ovirt_hosted_engine_ha/lib/Makefile.am
index 22133a8..03660dc 100644
--- a/ovirt_hosted_engine_ha/lib/Makefile.am
+++ b/ovirt_hosted_engine_ha/lib/Makefile.am
@@ -33,6 +33,8 @@
        metadata.py \
        util.py \
        vds_client.py \
+       storage_backends.py \
+       storage_backends_test.py \
        $(NULL)
 
 SUBDIRS = \
diff --git a/ovirt_hosted_engine_ha/lib/brokerlink.py b/ovirt_hosted_engine_ha/lib/brokerlink.py
index b197d87..2888fb0 100644
--- a/ovirt_hosted_engine_ha/lib/brokerlink.py
+++ b/ovirt_hosted_engine_ha/lib/brokerlink.py
@@ -148,6 +148,33 @@
         self._log.debug("Success, status %s", response)
         return response
 
+    def get_service_path(self, service):
+        request = "service-path service={0}".format(service)
+        try:
+            response = self._checked_communicate(request)
+        except Exception as e:
+            self._log.error("Exception getting service path: %s", str(e))
+            raise RequestError("Failed to get service path: {0}"
+                               .format(str(e)))
+        self._log.debug("Success, service path %s", response)
+        return response
+
+    def set_storage_domain(self, sd_type, **options):
+        request = ["set-storage-domain {0}".format(sd_type)]
+        for (k, v) in options.iteritems():
+            request.append("{0}={1}".format(k, str(v)))
+        request = " ".join(request)
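+        # the resulting wire request looks roughly like
+        # "set-storage-domain fs sd_uuid=<uuid> dom_type=<type>"
+        # (the option names depend on the chosen backend)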
+
+        try:
+            response = self._checked_communicate(request)
+        except Exception as e:
+            raise RequestError("Failed to set storage domain {0}, "
+                               "options {1}: {2}"
+                               .format(sd_type, options, e))
+
+        self._log.info("Success, broker response: %s", response)
+        return response
+
     def put_stats_on_storage(self, storage_dir, service_type, host_id, data):
         """
         Puts data on the shared storage according to the parameters.
@@ -161,13 +188,13 @@
                    .format(storage_dir, service_type, host_id, hex_data))
         self._checked_communicate(request)
 
-    def get_stats_from_storage(self, storage_dir, service_type):
+    def get_stats_from_storage(self, service_type):
         """
         Returns data from the shared storage for all hosts of the specified
         service type.
         """
-        request = ("get-stats storage_dir={0} service_type={1}"
-                   .format(storage_dir, service_type))
+        request = ("get-stats service_type={0}"
+                   .format(service_type))
         result = self._checked_communicate(request)
 
         tokens = result.split()
diff --git a/ovirt_hosted_engine_ha/lib/storage_backends.py b/ovirt_hosted_engine_ha/lib/storage_backends.py
new file mode 100644
index 0000000..9bb2b38
--- /dev/null
+++ b/ovirt_hosted_engine_ha/lib/storage_backends.py
@@ -0,0 +1,334 @@
+from collections import namedtuple
+import os
+import struct
+import subprocess
+import zlib
+from ..env import constants
+from . import util
+import math
+from cStringIO import StringIO
+from operator import itemgetter
+
+
+class BlockBackendCorruptedException(Exception):
+    """
+    Exception raised by BlockBackend when the internal metadata
+    structure reports corrupted data (CRC mismatch).
+    """
+    pass
+
+
+class StorageBackend(object):
+    """
+    The base template for Storage backend classes.
+    """
+
+    def __init__(self):
+        # the atomic block size of the underlying storage
+        self._blocksize = 512
+
+    def connect(self):
+        """Initialize the storage."""
+        raise NotImplementedError()
+
+    def disconnect(self):
+        """Close the storage."""
+        raise NotImplementedError()
+
+    def filename(self, service):
+        """
+        Return a tuple with the filename to open and bytes to skip
+        to get to the metadata structures.
+        """
+        raise NotImplementedError()
+
+    @property
+    def blocksize(self):
+        return self._blocksize
+
+    def create(self, service_map):
+        """
+        Reinitialize the storage backend according to the service_map.
+        Each key of the map is a service name and its value is the size
+        of the required data area in bytes.
+        """
+        raise NotImplementedError()
+
+
+class FilesystemBackend(StorageBackend):
+    """
+    Backend for all filesystem based storage. This includes VDSM's LVM
+    block devices, because they are accessed through symlinks laid out in
+    the same directory structure that VDSM uses for NFS based storage
+    domains.
+    """
+    def __init__(self, sd_uuid, dom_type):
+        super(FilesystemBackend, self).__init__()
+        self._sd_uuid = sd_uuid
+        self._dom_type = dom_type
+        self._lv_based = False
+        self._storage_path = None
+
+    def filename(self, service):
+        fname = os.path.join(self._storage_path, service)
+        return (fname, 0)
+
+    def get_domain_path(self, sd_uuid, dom_type):
+        """
+        Return the path of the storage domain holding the engine VM.
+        """
+        parent = constants.SD_MOUNT_PARENT
+        if dom_type == 'glusterfs':
+            parent = os.path.join(parent, 'glusterSD')
+
+        for dname in os.listdir(parent):
+            path = os.path.join(parent, dname, sd_uuid)
+            if os.access(path, os.F_OK):
+                if dname == "blockSD":
+                    self._lv_based = True
+                return path
+        raise Exception("path to storage domain {0} not found in {1}"
+                        .format(sd_uuid, parent))
+
+    def connect(self):
+        self._lv_based = False
+        self._storage_path = os.path.join(self.get_domain_path(self._sd_uuid,
+                                                               self._dom_type),
+                                          constants.SD_METADATA_DIR)
+        util.mkdir_recursive(self._storage_path)
+
+        if not self._lv_based:
+            return
+
+        # create LV symlinks
+        uuid = self._sd_uuid
+        for lv in os.listdir(os.path.join("/dev", uuid)):
+            # skip all LVs that do not have proper name
+            if not lv.startswith(constants.SD_METADATA_DIR + "-"):
+                continue
+
+            # strip the prefix and use the rest as symlink name
+            service = lv.split(constants.SD_METADATA_DIR + "-", 1)[-1]
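+            # e.g. an LV named "<SD_METADATA_DIR>-hosted-engine.lockspace"
+            # (illustrative name) is exposed as the symlink
+            # "<storage_path>/hosted-engine.lockspace"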
+            os.symlink(os.path.join("/dev", uuid, lv),
+                       os.path.join(self._storage_path, service))
+
+    def disconnect(self):
+        pass
+
+    def lvcreate(self, vg_uuid, lv_name, size_bytes, popen=subprocess.Popen):
+        """
+        Call lvm lvcreate and ask it to create a Logical Volume in the
+        Storage Domain's Volume Group. It should be named lv_name
+        and be big enough to fit size_bytes into it.
+        """
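+        # roughly equivalent to running:
+        #   lvm lvcreate -L <size_bytes>B -n <lv_name> <vg_uuid>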
+        lvc = popen(stdin=subprocess.PIPE, stdout=subprocess.PIPE,
+                    stderr=subprocess.PIPE,
+                    args=["lvm", "lvcreate", "-L", str(size_bytes)+"B",
+                          "-n", lv_name, vg_uuid])
+        lvc.wait()
+
+    def create(self, service_map):
+        for service, size in service_map.iteritems():
+            service_path = os.path.join(self._storage_path, service)
+            if self._lv_based:
+                lvname = "-".join([constants.SD_METADATA_DIR, service])
+                self.lvcreate(self._sd_uuid, lvname, size)
+            else:
+                # file based storage
+                with open(service_path, "w") as f:
+                    # create a sparse file of the requested size by
+                    # writing a single null byte at its last position
+                    if size:
+                        f.seek(size - 1)
+                        f.write('\0')
+
+        # reconnect so all links are refreshed
+        self.disconnect()
+        self.connect()
+
+
+class BlockBackend(StorageBackend):
+    """
+    This backend uses a raw block device to expose the data. It requires
+    device mapper support to split the single device into a couple of
+    virtual devices.
+
+    It is intended for devices that are not managed by VDSM
+    or do not use LVM.
+
+    The structure is described using a table that starts at block 0
+    of the block device.
+
+    The format of that block is:
+
+    <the next chained block:64bit> - 0 means this is the last block
+    <service name used length: 1 Byte>
+    <service name: 63 Bytes>
+    <data area start block:64 bit>
+    <data area block length:64 bit>
+    ... data area records can be repeated if they fit into one block
+    ... if there is need for more data area records, one of the chained
+    ... blocks can add them to the same service name
+    128bit (16B) of 0s as a sentinel
+    32bit CRC32
+
+    This information is converted to Device Mapper table and used to create
+    the logical device files.
+    """
+
+    # Binary format specifications, all in network byte order
+    # The name supports only 63 characters
+    BlockInfo = namedtuple("BlockInfo", ("next", "name", "pieces", "valid"))
+    BlockStructHeader = struct.Struct("!Q64p")
+    BlockStructData = struct.Struct("!QQ")
+    BlockCRC = struct.Struct("!L")
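+    # Illustrative layout of one metadata block built from the structs
+    # above (placeholder values; see storage_backends_test.py for full
+    # examples):
+    #   BlockStructHeader.pack(0, "lockspace")  # last block, service name
+    #   BlockStructData.pack(1, 100)            # data area start, length
+    #   BlockStructData.pack(0, 0)              # sentinel
+    #   BlockCRC.pack(crc)                      # CRC32 of everything above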
+
+    def __init__(self, block_dev_name, dm_prefix):
+        super(BlockBackend, self).__init__()
+        self._block_dev_name = block_dev_name
+        self._dm_prefix = dm_prefix
+        self._services = {}
+
+    def parse_meta_block(self, block):
+        """
+        Parse one info block from the raw byte representation
+        to namedtuple BlockInfo.
+        """
+        next_block, name = self.BlockStructHeader.unpack_from(block, 0)
+        pieces = []
+        seek = self.BlockStructHeader.size
+        while True:
+            start, size = self.BlockStructData.unpack_from(block, seek)
+            seek += self.BlockStructData.size
+            # end of blocks section sentinel
+            if start == size and size == 0:
+                break
+            pieces.append((start, size))
+        crc = zlib.crc32(block[:seek]) & 0xffffffff
+        # the comma is important, unpack_from returns a single element tuple
+        expected_crc, = self.BlockCRC.unpack_from(block, seek)
+
+        return self.BlockInfo._make((next_block, name,
+                                     tuple(pieces), crc == expected_crc))
+
+    def get_services(self, block_device_fo):
+        """
+        Read all the info blocks from a block device and
+        assemble the services dictionary mapping
+        service name to a list of (data block start, size)
+        tuples.
+        """
+        offset = block_device_fo.tell()
+        services = {}
+        while True:
+            block = block_device_fo.read(self.blocksize)
+            parsed = self.parse_meta_block(block)
+            if not parsed.valid:
+                raise BlockBackendCorruptedException(
+                    "CRC for block ending at %d does not match data!"
+                    % block_device_fo.tell())
+            services.setdefault(parsed.name, [])
+            services[parsed.name].extend(parsed.pieces)
+            if parsed.next == 0:
+                break
+            else:
+                block_device_fo.seek(offset + parsed.next * self.blocksize, 0)
+        return services
+
+    def dm_name(self, service):
+        return os.path.join(self._dm_prefix, service)
+
+    def compute_dm_table(self, pieces):
+        """
+        Take a list of (start, size) tuples and create the string
+        representation of a device mapper table that can be passed
+        to dmsetup.
+        """
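+        # e.g. pieces ((1, 100), (102, 100)) on an illustrative device
+        # /dev/sdx produce:
+        #   0 100 linear /dev/sdx 1
+        #   100 100 linear /dev/sdx 102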
+        table = []
+        log_start = 0
+        for start, size in pieces:
+            table.append("%d %d linear %s %d"
+                         % (log_start, size, self._block_dev_name, start))
+            log_start += size
+        return "\n".join(table)
+
+    def connect(self):
+        with open(self._block_dev_name, "r") as bd:
+            self._services = self.get_services(bd)
+
+        for name, pieces in self._services.iteritems():
+            table = self.compute_dm_table(pieces)
+            self.dmcreate(self.dm_name(name), table)
+
+    def disconnect(self):
+        for name in self._services:
+            self.dmremove(self.dm_name(name))
+
+    def dmcreate(self, name, table, popen=subprocess.Popen):
+        """
+        Call dmsetup create <name> and pass it the table.
+        """
+        dm = popen(stdin=subprocess.PIPE, stdout=subprocess.PIPE,
+                   stderr=subprocess.PIPE,
+                   args=["dmsetup", "create", name])
+
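+        # dmsetup reads the table from its standard input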
+        dm.communicate(table)
+
+    def dmremove(self, name, popen=subprocess.Popen):
+        """
+        Call dmsetup remove to destroy the device.
+        """
+        dm = popen(stdin=subprocess.PIPE, stdout=subprocess.PIPE,
+                   stderr=subprocess.PIPE,
+                   args=["dmsetup", "remove", name])
+        stdout, stderr = dm.communicate()
+
+    def filename(self, service):
+        if service not in self._services:
+            return None
+        else:
+            return os.path.join("/dev/mapper", self.dm_name(service)), 0
+
+    def create_info_blocks(self, service_map):
+        def bc(size):
+            """
+            Return the number of blocks needed to accommodate
+            size bytes.
+            """
+            return int(math.ceil(size / float(self._blocksize)))
+
+        # first len(service_map) blocks will contain
+        # the information about services and their data locations
+        data_start = len(service_map)
+        info_blocks = []
+
+        # Linearize the list, put smaller services before bigger ones
+        service_list = service_map.items()
+        service_list.sort(key=itemgetter(1))
+
+        # create list of next ids that starts with 1, goes to the last
+        # index (size - 1) and then ends with 0
+        next_links = range(1, data_start) + [0]
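+        # e.g. for three services this yields [1, 2, 0]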
+        for next_id, (service, size) in zip(next_links, service_list):
+            block_len = bc(size)
+            raw_data = StringIO()
+            raw_data.write(self.BlockStructHeader.pack(next_id, service))
+            raw_data.write(self.BlockStructData.pack(data_start, block_len))
+            raw_data.write(self.BlockStructData.pack(0, 0))
+            crc = zlib.crc32(raw_data.getvalue()) & 0xffffffff
+            raw_data.write(self.BlockCRC.pack(crc))
+            info_blocks.append(raw_data.getvalue())
+            data_start += block_len
+
+        return info_blocks
+
+    def create(self, service_map):
+        info_blocks = self.create_info_blocks(service_map)
+        with open(self._block_dev_name, "w") as dev:
+            for idx, b in enumerate(info_blocks):
+                dev.seek(idx * self._blocksize)
+                dev.write(b)
+
+        self.disconnect()
+        self.connect()
diff --git a/ovirt_hosted_engine_ha/lib/storage_backends_test.py b/ovirt_hosted_engine_ha/lib/storage_backends_test.py
new file mode 100644
index 0000000..03fb131
--- /dev/null
+++ b/ovirt_hosted_engine_ha/lib/storage_backends_test.py
@@ -0,0 +1,101 @@
+import unittest
+import cStringIO as StringIO
+import struct
+import zlib
+
+from .storage_backends import BlockBackend
+
+
+class StorageBackendTests(unittest.TestCase):
+
+    def test_single_bad_block_decode(self):
+        raw = struct.pack("!Q64pQQQQQQL",
+                          1, "test",
+                          1, 100,
+                          102, 100,
+                          0, 0,
+                          0)
+        b = BlockBackend("/dev/null", "test-1")
+        block = b.parse_meta_block(raw)
+        self.assertEqual(block, BlockBackend.BlockInfo(
+            1, "test", ((1, 100), (102, 100)), False))
+
+    def test_service_creation(self):
+        b = BlockBackend("/dev/null", "test-1")
+        blocks = b.create_info_blocks({"test1": 300,
+                                       "test2": 512,
+                                       "test3": 1024*1024*50})
+
+        self.assertEqual(3, len(blocks))
+
+        test1 = struct.pack("!Q64pQQQQ",
+                            1, "test1",
+                            3, 1,
+                            0, 0)
+        test1crc = struct.pack("!L", zlib.crc32(test1) & 0xffffffff)
+        test2 = struct.pack("!Q64pQQQQ",
+                            2, "test2",
+                            4, 1,
+                            0, 0)
+        test2crc = struct.pack("!L", zlib.crc32(test2) & 0xffffffff)
+        test3 = struct.pack("!Q64pQQQQ",
+                            0, "test3",
+                            5, 102400,
+                            0, 0)
+        test3crc = struct.pack("!L", zlib.crc32(test3) & 0xffffffff)
+
+        expected = [
+            test1 + test1crc,
+            test2 + test2crc,
+            test3 + test3crc
+        ]
+
+        self.assertEqual(expected, blocks)
+
+    def test_single_good_block_decode(self):
+        raw = struct.pack("!Q64pQQQQQQ",
+                          1, "test",
+                          1, 100,
+                          102, 100,
+                          0, 0)
+        rawcrc = struct.pack("!L", zlib.crc32(raw) & 0xffffffff)
+        b = BlockBackend("/dev/null", "test-1")
+        block = b.parse_meta_block(raw+rawcrc)
+        self.assertEqual(block, BlockBackend.BlockInfo(
+            1, "test", ((1, 100), (102, 100)), True))
+
+    def test_dm_table(self):
+        block = BlockBackend.BlockInfo(1, "test", ((1, 100), (102, 100)), True)
+        b = BlockBackend("/dev/null", "test-1")
+        table = b.compute_dm_table(block.pieces)
+        expected = ("0 100 linear /dev/null 1\n"
+                    "100 100 linear /dev/null 102")
+        self.assertEqual(expected, table)
+
+    def test_get_services(self):
+        raw1 = struct.pack("!Q64pQQQQQQ",
+                           1, "test",
+                           1, 100,
+                           102, 100,
+                           0, 0)
+        raw1crc = struct.pack("!L", zlib.crc32(raw1) & 0xffffffff)
+
+        raw2 = struct.pack("!Q64pQQQQQQ",
+                           0, "test2",
+                           2, 200,
+                           202, 200,
+                           0, 0)
+        raw2crc = struct.pack("!L", zlib.crc32(raw2) & 0xffffffff)
+
+        b = BlockBackend("/dev/null", "test-1")
+        blockdev = StringIO.StringIO()
+        blockdev.write(raw1)
+        blockdev.write(raw1crc)
+        blockdev.seek(b.blocksize)
+        blockdev.write(raw2)
+        blockdev.write(raw2crc)
+        blockdev.seek(0)
+        expected = {'test': [(1, 100), (102, 100)],
+                    'test2': [(2, 200), (202, 200)]}
+        services = b.get_services(blockdev)
+        self.assertEqual(expected, services)


-- 
To view, visit http://gerrit.ovirt.org/25624
To unsubscribe, visit http://gerrit.ovirt.org/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I965642683a663c5655342f06d9f3ac19533ca91b
Gerrit-PatchSet: 1
Gerrit-Project: ovirt-hosted-engine-ha
Gerrit-Branch: master
Gerrit-Owner: Martin Sivák <[email protected]>