Francesco Romani has uploaded a new change for review. Change subject: WIP: HACK: VDSM interface using JSON-RPC ......................................................................
WIP: HACK: VDSM interface using JSON-RPC Bootstrap a hypervisor interface to VDSM using JSON-RPC. This patch is a work in progress and it is published as a live demo and as a base for further discussion. Known issues: - undocumented dependency on stomp.py. Important side effect: no code sharing whatsoever with VDSM infrastructure - hard-coded assumptions: SSL enabled, localhost, VDSM port - error handling is entirely absent. - copy/pasted ExpiringDict from http://gerrit.ovirt.org/#/c/36716/ - implementation is barely tested, and not polished at all Change-Id: I7671dd151cc5342728eb864925242f1d04c47b05 Signed-off-by: Francesco Romani <[email protected]> --- M mom/HypervisorInterfaces/Makefile.am A mom/HypervisorInterfaces/vdsmrpcInterface.py 2 files changed, 386 insertions(+), 0 deletions(-) git pull ssh://gerrit.ovirt.org:29418/mom refs/changes/94/37294/1 diff --git a/mom/HypervisorInterfaces/Makefile.am b/mom/HypervisorInterfaces/Makefile.am index e01c9f3..99e0a18 100644 --- a/mom/HypervisorInterfaces/Makefile.am +++ b/mom/HypervisorInterfaces/Makefile.am @@ -25,6 +25,7 @@ __init__.py \ libvirtInterface.py \ vdsmInterface.py \ + vdsmrpcInterface.py \ $(NULL) clean-local: \ diff --git a/mom/HypervisorInterfaces/vdsmrpcInterface.py b/mom/HypervisorInterfaces/vdsmrpcInterface.py new file mode 100644 index 0000000..763a630 --- /dev/null +++ b/mom/HypervisorInterfaces/vdsmrpcInterface.py @@ -0,0 +1,385 @@ +# Memory Overcommitment Manager +# Copyright (C) 2015 Francesco Romani, Red Hat Inc. +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 2 as +# published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, but +# WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# General Public License for more details. 
+# +# You should have received a copy of the GNU General Public +# License along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + +import contextlib +import json +import logging +import os +import sys +import threading +import time +import traceback +import uuid + +import stomp + +from mom.HypervisorInterfaces.HypervisorInterface import HypervisorInterface, \ + HypervisorInterfaceError + +# Copied from lib/vdsm/vdscli.py +PKIDIR = '/etc/pki/vdsm' +KEYFILE = os.path.join(PKIDIR, 'keys/vdsmkey.pem') +CERTFILE = os.path.join(PKIDIR, 'certs/vdsmcert.pem') +CACERT = os.path.join(PKIDIR, 'certs/cacert.pem') + +HOST = '127.0.0.1' +PORT = 54321 +DESTINATION = "/queue/_local/vdsm/requests" +TIMEOUT = 10 # seconds + + +class _Listener(stomp.ConnectionListener): + def __init__(self, enqueue): + super(_Listener, self,).__init__() + self._enqueue = enqueue + logging.info('Listener created') + + def on_error(self, headers, message): + logging.error("Error: %s" % message) + # TODO: react in a saner manner + raise RuntimeError + + def on_message(self, headers, message): +# logging.debug("Message: %s" % message) + self._enqueue(message) + + +class Request(object): + def __init__(self, method, **params): + self._uuid = str(uuid.uuid4()) + self._body = self._make(method, params) + self._answer = threading.Event() + self._message = {} + + @property + def body(self): + return self._body + + @property + def uuid(self): + return self._uuid + + def send(self, logger, conn, dest=DESTINATION): + data = json.dumps(self._body) +# logger.debug('sending: %s', data) + conn.send(body=data, destination=dest) + done = self._answer.wait(TIMEOUT) + if done: + return self._message + raise vdsmException(self._body, logger) + + def recv(self, message): + self._message = message + self._answer.set() + + def _make(self, method, params): + return {"id": self._uuid, + "jsonrpc": "2.0", + "method": method, + "params": params} 
+ + +class vdsmrpcInterface(HypervisorInterface): + + def __init__(self): + self.logger = logging.getLogger('mom.vdsmRpcInterface') + self._pending = {} + self._cache = _ExpiringDict(TIMEOUT) # FIXME + self.logger.debug('Cache ready') + self._listener = _Listener(self._enqueue) + self.logger.debug('Listener ready') + self._connect() + self.logger.debug('Interface ready') + + def _connect(self): + self.logger.info('connecting to JSON-RPC: %s:%i', HOST, PORT) + self._conn = stomp.Connection10( + host_and_ports=((HOST, PORT),), + use_ssl=True, + ssl_key_file=KEYFILE, + ssl_cert_file=CERTFILE, + ssl_ca_certs=CACERT) + self._conn.set_listener("", self._listener) + self.logger.info('starting connection') + self._conn.start() + self.logger.info('connection started') + + @contextlib.contextmanager + def _track(self, req): + self._pending[req.uuid] = req + yield req + self._pending.pop(req.uuid, None) + + def _enqueue(self, message): + try: + msg = json.loads(message) + req = self._pending[msg["id"]] + except KeyError: + self.logger.warning('spurious message: %s', message) + else: + req.recv(msg) + + def _call(self, method, **params): + with self._track(Request(method, **params)) as req: + res = req.send(self.logger, self._conn) +# self.logger.debug('received: %s', res) + return res + + def _getVmStats(self, vmId): + stats = self._cache.get(vmId) + if stats is None: + stats = self._call('VM.getStats', vmID=vmId) + self._cache[vmId] = stats + return stats + + def getVmList(self): + vmIds = [] + try: + response = self._call('Host.getVMFullList') + vmList = response['result'] + vmIds = [vm['vmId'] + for vm in vmList + if vm['status'] == 'Up'] +# self.logger.debug('VM List: %s', vmIds) + return vmIds + except vdsmException, e: + e.handle_exception() + return None + + def getVmMemoryStats(self, uuid): + ret = {} + try: + response = self._getVmStats(uuid)['result'][0] + usage = int(response['memUsage']) + if usage == 0: + msg = "The ovirt-guest-agent is not active" + raise 
HypervisorInterfaceError(msg) + stats = response['memoryStats'] + if not stats: + msg = "Detailed guest memory stats are not available, " \ + "please upgrade guest agent" + raise HypervisorInterfaceError(msg) + + ret['mem_available'] = int(stats['mem_total']) + ret['mem_unused'] = int(stats['mem_unused']) + ret['mem_free'] = int(stats['mem_free']) + ret['major_fault'] = int(stats['majflt']) + ret['minor_fault'] = int(stats['pageflt']) - int(stats['majflt']) + ret['swap_in'] = int(stats['swap_in']) + ret['swap_out'] = int(stats['swap_out']) + + # get swap size and usage information if available + ret['swap_total'] = int(stats.get('swap_total', 0)) + ret['swap_usage'] = int(stats.get('swap_usage', 0)) + + self.logger.debug('Memory stats: %s', ret) + return ret + except vdsmException, e: + raise HypervisorInterfaceError(e.msg) + + def setVmBalloonTarget(self, uuid, target): + try: + self._call('VM.setBalloonTarget', vmID=uuid, target=target) + except vdsmException, e: + e.handle_exception() + + def getVmInfo(self, vmId): + response = self._getVmStats(vmId) + stats = response['result'][0] + return { + 'uuid': vmId, + 'pid': stats['pid'], + 'name': stats['vmName']} + + def getStatsFields(self=None): + return set(['mem_available', 'mem_unused', 'mem_free', + 'major_fault', 'minor_fault', 'swap_in', 'swap_out', + 'swap_total', 'swap_usage']) + + def getVmBalloonInfo(self, uuid): + try: + response = self._getVmStats(uuid)['result'][0] + + balloon_info = response['balloonInfo'] + if balloon_info: + # Make sure the values are numbers, VDSM is using str + # to avoid xml-rpc issues + # We are modifying the dict keys inside the loop so + # iterate over copy of the list with keys, also use + # list() to make this compatible with Python 3 + for key in list(balloon_info.keys()): + # Remove keys that are not important to MoM to make sure + # the HypervisorInterface stays consistent between + # libvirt and vdsm platforms. 
+ if key not in ("balloon_max", "balloon_min", "balloon_cur"): + del balloon_info[key] + continue + balloon_info[key] = int(balloon_info[key]) + return balloon_info + except vdsmException, e: + e.handle_exception() + + def getVmCpuTuneInfo(self, uuid): + try: + ret = {} + response = self.getVmStats(uuid)['result'][0] + + # Get user selection for vCPU limit + vcpuUserLimit = response.get('vcpuUserLimit', 100) + ret['vcpu_user_limit'] = vcpuUserLimit + + # Get current vcpu tuning info + vcpuQuota = response.get('vcpuQuota', 0) + ret['vcpu_quota'] = vcpuQuota + vcpuPeriod = response.get('vcpuPeriod', 0) + ret['vcpu_period'] = vcpuPeriod + + #Get num of vCPUs + vcpuCount = response.get('vcpuCount', None) + if vcpuCount == None: + return None + else: + ret['vcpu_count'] = vcpuCount + + # Make sure the values are numbers, VDSM is using str + # to avoid xml-rpc issues + # We are modifying the dict keys inside the loop so + # iterate over copy of the list with keys, also use + # list() to make this compatible with Python 3 + for key in list(ret.keys()): + ret[key] = int(ret[key]) + return ret + except vdsmException, e: + e.handle_exception() + + def setVmCpuTune(self, uuid, quota, period): + try: + self._call('VM.setCpuTuneQuota', vmID=uuid, quota=quota) + except vdsmException, e: + e.handle_exception() + try: + self._call('VM.setCpuTunePeriod', vmID=uuid, period=period) + except vdsmException, e: + e.handle_exception() + + def ksmTune(self, tuningParams): + def write_value(fname, value): + try: + with open(fname, 'w') as f: + f.write(str(value)) + except IOError, (errno, strerror): + self.logger.warn("KSM: Failed to write %s: %s", fname, strerror) + + for (key, val) in tuningParams.items(): + write_value('/sys/kernel/mm/ksm/%s' % key, val) + + + +class vdsmException(Exception): + + def __init__(self, msg, logger): + self.msg = msg + self.logger = logger + + def handle_exception(self): + self.logger.error(self.msg) + self.logger.error(traceback.format_exc()) + + +def 
instance(config): + return vdsmrpcInterface() + + +# copied from http://gerrit.ovirt.org/#/c/36716/ +class _ExpiringDict(object): + """ + ExpiringDict behaves like a dict, but an expiration time + is attached to each key. Thread safe. + + Parameters: + ttl: validity of the keys in `clock' time units since + the time of their insertion in the ExpiringDict + clock: time.time() or monotonic_time()-like callable. + + Expired keys are transparently removed on the first attempt + to access them, using get, __getitem__ or __contains__. + + Expired keys are considered absent, even though not yet + actually removed, for __len__. + This is done to not excessively slow down __len__. + """ + def __init__(self, ttl, clock=time.time): + self._ttl = ttl + self._clock = clock + self._items = {} + self._lock = threading.Lock() + + def get(self, key, default=None): + with self._lock: + value = self._get(key) + if value is not None: + return value + else: + return default + + def clear(self): + self._items.clear() + + def __setitem__(self, key, value): + with self._lock: + self._items[key] = self._clock() + self._ttl, value + + def __getitem__(self, key): + with self._lock: + value = self._get(key) + if value is not None: + return value + else: + raise KeyError(key) + + def __delitem__(self, key): + with self._lock: + del self._items[key] + + def __contains__(self, key): + with self._lock: + return self._get(key) is not None + + def __len__(self): + # reminder: __len__ is used as fallback by __nonzero__ + with self._lock: + now = self._clock() + return sum(expiration > now + for expiration, _ + in self._items.itervalues()) + + # private + + def _get(self, key): + """ + Return None if the key is invalid or expired. + Automatically clean up expired keys. + Caller must ensure proper locking. 
+ """ + if key not in self._items: + return None + + expiration, value = self._items[key] + if expiration <= self._clock(): + del self._items[key] + return None + + return value -- To view, visit http://gerrit.ovirt.org/37294 To unsubscribe, visit http://gerrit.ovirt.org/settings Gerrit-MessageType: newchange Gerrit-Change-Id: I7671dd151cc5342728eb864925242f1d04c47b05 Gerrit-PatchSet: 1 Gerrit-Project: mom Gerrit-Branch: master Gerrit-Owner: Francesco Romani <[email protected]> _______________________________________________ Engine-patches mailing list [email protected] http://lists.ovirt.org/mailman/listinfo/engine-patches
