Francesco Romani has uploaded a new change for review.

Change subject: WIP: HACK: VDSM interface using JSON-RPC
......................................................................

WIP: HACK: VDSM interface using JSON-RPC

Bootstrap an hypervisor interface to VDSM using JSON-RPC.
This patch is a work in progress and it is published
as live demo and as base for further discussion.

Known issues:
- undocumented dependency on stomp.py.
  important side effect: no code sharing whatsoever with VDSM
  infrastrucutre

- hard coded assumptions: SSL enabled, localhost, VDSM port

- error treatment is just absent.

- copy/pasted ExpiringDict from http://gerrit.ovirt.org/#/c/36716/

- implementation is barely tested, and not polished at all

Change-Id: I7671dd151cc5342728eb864925242f1d04c47b05
Signed-off-by: Francesco Romani <[email protected]>
---
M mom/HypervisorInterfaces/Makefile.am
A mom/HypervisorInterfaces/vdsmrpcInterface.py
2 files changed, 386 insertions(+), 0 deletions(-)


  git pull ssh://gerrit.ovirt.org:29418/mom refs/changes/94/37294/1

diff --git a/mom/HypervisorInterfaces/Makefile.am 
b/mom/HypervisorInterfaces/Makefile.am
index e01c9f3..99e0a18 100644
--- a/mom/HypervisorInterfaces/Makefile.am
+++ b/mom/HypervisorInterfaces/Makefile.am
@@ -25,6 +25,7 @@
        __init__.py \
        libvirtInterface.py \
        vdsmInterface.py \
+       vdsmrpcInterface.py \
        $(NULL)
 
 clean-local: \
diff --git a/mom/HypervisorInterfaces/vdsmrpcInterface.py 
b/mom/HypervisorInterfaces/vdsmrpcInterface.py
new file mode 100644
index 0000000..763a630
--- /dev/null
+++ b/mom/HypervisorInterfaces/vdsmrpcInterface.py
@@ -0,0 +1,385 @@
+# Memory Overcommitment Manager
+# Copyright (C) 2015 Francesco Romani, Red Hat Inc.
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License version 2 as
+# published by the Free Software Foundation.
+#
+# This program is distributed in the hope that it will be useful, but
+# WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# General Public License for more details.
+#
+# You should have received a copy of the GNU General Public
+# License along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
+
+import contextlib
+import json
+import logging
+import os
+import sys
+import threading
+import time
+import traceback
+import uuid
+
+import stomp
+
+from mom.HypervisorInterfaces.HypervisorInterface import HypervisorInterface, \
+    HypervisorInterfaceError
+
+# Copied from lib/vdsm/vdscli.py
+PKIDIR = '/etc/pki/vdsm'
+KEYFILE = os.path.join(PKIDIR, 'keys/vdsmkey.pem')
+CERTFILE = os.path.join(PKIDIR, 'certs/vdsmcert.pem')
+CACERT = os.path.join(PKIDIR, 'certs/cacert.pem')
+
+HOST = '127.0.0.1'
+PORT = 54321
+DESTINATION = "/queue/_local/vdsm/requests"
+TIMEOUT = 10  # seconds
+
+
+class _Listener(stomp.ConnectionListener):
+    def __init__(self, enqueue):
+        super(_Listener, self,).__init__()
+        self._enqueue = enqueue
+        logging.info('Listener created')
+
+    def on_error(self, headers, message):
+        logging.error("Error: %s" % message)
+        # TODO: react in a saner manner
+        raise RuntimeError
+
+    def on_message(self, headers, message):
+#        logging.debug("Message: %s" % message)
+        self._enqueue(message)
+
+
+class Request(object):
+    def __init__(self, method, **params):
+        self._uuid = str(uuid.uuid4())
+        self._body = self._make(method, params)
+        self._answer = threading.Event()
+        self._message = {}
+
+    @property
+    def body(self):
+        return self._body
+
+    @property
+    def uuid(self):
+        return self._uuid
+
+    def send(self, logger, conn, dest=DESTINATION):
+        data = json.dumps(self._body)
+#        logger.debug('sending: %s', data) 
+        conn.send(body=data, destination=dest)
+        done = self._answer.wait(TIMEOUT)
+        if done:
+            return self._message
+        raise vdsmException(self._body, logger)
+
+    def recv(self, message):
+        self._message = message
+        self._answer.set()
+
+    def _make(self, method, params):
+        return {"id": self._uuid,
+                "jsonrpc": "2.0",
+                "method": method,
+                "params": params}
+        
+
+class vdsmrpcInterface(HypervisorInterface):
+
+    def __init__(self):
+        self.logger = logging.getLogger('mom.vdsmRpcInterface')
+        self._pending = {}
+        self._cache = _ExpiringDict(TIMEOUT)  # FIXME
+        self.logger.debug('Cache ready')
+        self._listener = _Listener(self._enqueue)
+        self.logger.debug('Listener ready')
+        self._connect()
+        self.logger.debug('Interface ready')
+
+    def _connect(self):
+        self.logger.info('connecting to JSON-RPC: %s:%i', HOST, PORT)
+        self._conn = stomp.Connection10(
+            host_and_ports=((HOST, PORT),),
+            use_ssl=True,
+            ssl_key_file=KEYFILE,
+            ssl_cert_file=CERTFILE,
+            ssl_ca_certs=CACERT)
+        self._conn.set_listener("", self._listener)
+        self.logger.info('starting connection')
+        self._conn.start()
+        self.logger.info('connection started')
+
+    @contextlib.contextmanager
+    def _track(self, req):
+        self._pending[req.uuid] = req
+        yield req
+        self._pending.pop(req.uuid, None)
+
+    def _enqueue(self, message):
+        try:
+            msg = json.loads(message)
+            req = self._pending[msg["id"]]
+        except KeyError:
+            self.logger.warning('spurious message: %s', message)
+        else:
+            req.recv(msg)
+
+    def _call(self, method, **params):
+        with self._track(Request(method, **params)) as req:
+            res = req.send(self.logger, self._conn)
+#            self.logger.debug('received: %s', res)
+            return res
+
+    def _getVmStats(self, vmId):
+        stats = self._cache.get(vmId)
+        if stats is None:
+            stats = self._call('VM.getStats', vmID=vmId)
+            self._cache[vmId] = stats
+        return stats
+
+    def getVmList(self):
+        vmIds = []
+        try:
+            response = self._call('Host.getVMFullList')
+            vmList = response['result']
+            vmIds = [vm['vmId']
+                    for vm in vmList
+                    if vm['status'] == 'Up']
+#            self.logger.debug('VM List: %s', vmIds)
+            return vmIds
+        except vdsmException, e:
+            e.handle_exception()
+            return None
+
+    def getVmMemoryStats(self, uuid):
+        ret = {}
+        try:
+            response = self._getVmStats(uuid)['result'][0]
+            usage = int(response['memUsage'])
+            if usage == 0:
+                msg = "The ovirt-guest-agent is not active"
+                raise HypervisorInterfaceError(msg)
+            stats = response['memoryStats']
+            if not stats:
+                msg = "Detailed guest memory stats are not available, " \
+                        "please upgrade guest agent"
+                raise HypervisorInterfaceError(msg)
+
+            ret['mem_available'] = int(stats['mem_total'])
+            ret['mem_unused'] = int(stats['mem_unused'])
+            ret['mem_free'] = int(stats['mem_free'])
+            ret['major_fault'] = int(stats['majflt'])
+            ret['minor_fault'] = int(stats['pageflt']) - int(stats['majflt'])
+            ret['swap_in'] = int(stats['swap_in'])
+            ret['swap_out'] = int(stats['swap_out'])
+
+            # get swap size and usage information if available
+            ret['swap_total'] = int(stats.get('swap_total', 0))
+            ret['swap_usage'] = int(stats.get('swap_usage', 0))
+
+            self.logger.debug('Memory stats: %s', ret)
+            return ret
+        except vdsmException, e:
+            raise HypervisorInterfaceError(e.msg)
+
+    def setVmBalloonTarget(self, uuid, target):
+        try:
+            self._call('VM.setBalloonTarget', vmID=uuid, target=target)
+        except vdsmException, e:
+            e.handle_exception()
+
+    def getVmInfo(self, vmId):
+        response = self._getVmStats(vmId)
+        stats = response['result'][0]
+        return {
+            'uuid': vmId,
+            'pid': stats['pid'],
+            'name': stats['vmName']}
+
+    def getStatsFields(self=None):
+        return set(['mem_available', 'mem_unused', 'mem_free',
+                    'major_fault', 'minor_fault', 'swap_in', 'swap_out',
+                    'swap_total', 'swap_usage'])
+
+    def getVmBalloonInfo(self, uuid):
+        try:
+            response = self._getVmStats(uuid)['result'][0]
+
+            balloon_info = response['balloonInfo']
+            if balloon_info:
+                # Make sure the values are numbers, VDSM is using str
+                # to avoid xml-rpc issues
+                # We are modifying the dict keys inside the loop so
+                # iterate over copy of the list with keys, also use
+                # list() to make this compatible with Python 3
+                for key in list(balloon_info.keys()):
+                    # Remove keys that are not important to MoM to make sure
+                    # the HypervisorInterface stays consistent between
+                    # libvirt and vdsm platforms.
+                    if key not in ("balloon_max", "balloon_min", 
"balloon_cur"):
+                        del balloon_info[key]
+                        continue
+                    balloon_info[key] = int(balloon_info[key])
+                return balloon_info
+        except vdsmException, e:
+            e.handle_exception()
+
+    def getVmCpuTuneInfo(self, uuid):
+        try:
+            ret = {}
+            response = self.getVmStats(uuid)['result'][0]
+
+            # Get user selection for vCPU limit
+            vcpuUserLimit = response.get('vcpuUserLimit', 100)
+            ret['vcpu_user_limit'] = vcpuUserLimit
+
+            # Get current vcpu tuning info
+            vcpuQuota = response.get('vcpuQuota', 0)
+            ret['vcpu_quota'] = vcpuQuota
+            vcpuPeriod = response.get('vcpuPeriod', 0)
+            ret['vcpu_period'] = vcpuPeriod
+
+            #Get num of vCPUs
+            vcpuCount = response.get('vcpuCount', None)
+            if vcpuCount == None:
+                return None
+            else:
+                ret['vcpu_count'] = vcpuCount
+
+            # Make sure the values are numbers, VDSM is using str
+            # to avoid xml-rpc issues
+            # We are modifying the dict keys inside the loop so
+            # iterate over copy of the list with keys, also use
+            # list() to make this compatible with Python 3
+            for key in list(ret.keys()):
+                ret[key] = int(ret[key])
+            return ret
+        except vdsmException, e:
+            e.handle_exception()
+
+    def setVmCpuTune(self, uuid, quota, period):
+        try:
+            self._call('VM.setCpuTuneQuota', vmID=uuid, quota=quota)
+        except vdsmException, e:
+            e.handle_exception()
+        try:
+            self._call('VM.setCpuTunePeriod', vmID=uuid, period=period)
+        except vdsmException, e:
+            e.handle_exception()
+
+    def ksmTune(self, tuningParams):
+        def write_value(fname, value):
+            try:
+                with open(fname, 'w') as f:
+                    f.write(str(value))
+            except IOError, (errno, strerror):
+                self.logger.warn("KSM: Failed to write %s: %s", fname, 
strerror)
+
+        for (key, val) in tuningParams.items():
+            write_value('/sys/kernel/mm/ksm/%s' % key, val)
+
+
+
+class vdsmException(Exception):
+
+    def __init__(self, msg, logger):
+        self.msg = msg
+        self.logger = logger
+
+    def handle_exception(self):
+        self.logger.error(self.msg)
+        self.logger.error(traceback.format_exc())
+
+
+def instance(config):
+    return vdsmrpcInterface()
+
+
+# copied from http://gerrit.ovirt.org/#/c/36716/
+class _ExpiringDict(object):
+    """
+    ExpiringDict behaves like a dict, but an expiration time
+    is attached to each key. Thread safe.
+
+    Parameters:
+    ttl: validity of the keys in `clock' time units since
+         the time of their inserion in the ExpiringDict
+    clock: time.time() or monotonic_time()-like callable.
+
+    Expired keys are transparently removed on the first attempt
+    to access them, using get, __getitem__ or __contains__.
+
+    Expired keys are considered absent, even though not yet
+    actually removed, for __len__.
+    This is done to not excessively slow down __len__.
+    """
+    def __init__(self, ttl, clock=time.time):
+        self._ttl = ttl
+        self._clock = clock
+        self._items = {}
+        self._lock = threading.Lock()
+
+    def get(self, key, default=None):
+        with self._lock:
+            value = self._get(key)
+            if value is not None:
+                return value
+            else:
+                return default
+
+    def clear(self):
+        self._items.clear()
+
+    def __setitem__(self, key, value):
+        with self._lock:
+            self._items[key] = self._clock() + self._ttl, value
+
+    def __getitem__(self, key):
+        with self._lock:
+            value = self._get(key)
+            if value is not None:
+                return value
+            else:
+                raise KeyError(key)
+
+    def __delitem__(self, key):
+        with self._lock:
+            del self._items[key]
+
+    def __contains__(self, key):
+        with self._lock:
+            return self._get(key) is not None
+
+    def __len__(self):
+        # reminder: __len__ is used as fallback by __nonzero__
+        with self._lock:
+            now = self._clock()
+            return sum(expiration > now
+                       for expiration, _
+                       in self._items.itervalues())
+
+    # private
+
+    def _get(self, key):
+        """
+        Return the None if invalid or expired key.
+        Automatically cleanup expired keys.
+        Caller must ensure proper locking.
+        """
+        if key not in self._items:
+            return None
+
+        expiration, value = self._items[key]
+        if expiration <= self._clock():
+            del self._items[key]
+            return None
+
+        return value


-- 
To view, visit http://gerrit.ovirt.org/37294
To unsubscribe, visit http://gerrit.ovirt.org/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I7671dd151cc5342728eb864925242f1d04c47b05
Gerrit-PatchSet: 1
Gerrit-Project: mom
Gerrit-Branch: master
Gerrit-Owner: Francesco Romani <[email protected]>
_______________________________________________
Engine-patches mailing list
[email protected]
http://lists.ovirt.org/mailman/listinfo/engine-patches

Reply via email to