Basic implementation of the QMP connection and related tests.
Signed-off-by: Andrea Spadaccini <[email protected]>
---
lib/hypervisor/hv_kvm.py | 189 +++++++++++++++++++++++++++++
test/ganeti.hypervisor.hv_kvm_unittest.py | 98 +++++++++++++++
2 files changed, 287 insertions(+), 0 deletions(-)
diff --git a/lib/hypervisor/hv_kvm.py b/lib/hypervisor/hv_kvm.py
index 04d2327..622df48 100644
--- a/lib/hypervisor/hv_kvm.py
+++ b/lib/hypervisor/hv_kvm.py
@@ -34,6 +34,7 @@ import pwd
import struct
import fcntl
import shutil
+import socket
from ganeti import utils
from ganeti import constants
@@ -127,6 +128,186 @@ def _OpenTap(vnet_hdr=True):
return (ifname, tapfd)
+class QmpMessage:
+  """QEMU Messaging Protocol (QMP) message.
+
+  Thin wrapper around the dict holding the content of a QMP JSON object.
+  Fields are read and written with the [] operator; reading a field that is
+  not part of the message yields None instead of raising KeyError.
+
+  """
+  def __init__(self, data):
+    """Creates a new QMP message based on the passed data.
+
+    @type data: dict
+    @param data: the content of the message; it is stored by reference,
+        not copied
+    @raise TypeError: if data is not a dict
+
+    """
+    if not isinstance(data, dict):
+      # Call form of raise; the "raise Class, value" statement form is only
+      # understood by Python 2
+      raise TypeError("QmpMessage must be initialized with a dict")
+
+    self.data = data
+
+  def __getitem__(self, field_name):
+    """Get the value of the required field if present, or None.
+
+    Overrides the [] operator to provide access to the message data,
+    returning None if the required item is not in the message
+    @return: the value of the field_name field, or None if field_name
+             is not contained in the message
+
+    """
+    return self.data.get(field_name)
+
+  def __setitem__(self, field_name, field_value):
+    """Set the value of the required field_name to field_value.
+
+    """
+    self.data[field_name] = field_value
+
+  @staticmethod
+  def BuildFromJsonString(json_string):
+    """Build a QmpMessage from a JSON encoded string.
+
+    @type json_string: str
+    @param json_string: JSON string representing the message
+    @rtype: L{QmpMessage}
+    @return: a L{QmpMessage} built from json_string
+    @raise TypeError: if the decoded JSON value is not an object (dict)
+
+    """
+    # Parse the string
+    data = serializer.LoadJson(json_string)
+    return QmpMessage(data)
+
+  def __str__(self):
+    # The protocol expects the JSON object to be sent as a single
+    # line, hence the need for indent=False.
+    return serializer.DumpJson(self.data, indent=False)
+
+  def __eq__(self, other):
+    # When comparing two QmpMessages, we are interested in comparing
+    # their internal representation of the message data. Anything that is
+    # not a QmpMessage is left to the default comparison instead of failing
+    # with AttributeError on the missing "data" attribute.
+    if not isinstance(other, QmpMessage):
+      return NotImplemented
+    return self.data == other.data
+
+  def __ne__(self, other):
+    # Python 2 does not derive != from ==, so it has to be spelled out to
+    # keep the two operators consistent
+    result = self.__eq__(other)
+    if result is NotImplemented:
+      return result
+    return not result
+
+
+class QmpConnection:
+  """Connection to the QEMU Monitor using the QEMU Monitor Protocol (QMP).
+
+  Typical usage: build the object with the path of the monitor socket, call
+  connect() to perform the handshake, run commands through Execute() and
+  finally call close() to release the socket.
+
+  """
+  _FIRST_MESSAGE_KEY = "QMP"
+  _EVENT_KEY = "event"
+  _ERROR_KEY = "error"
+  _CAPABILITIES_COMMAND = "qmp_capabilities"
+  # Messages are split on LF; a CR preceding it is stripped before decoding,
+  # so CRLF-terminated messages are handled as well
+  _MESSAGE_END_TOKEN = "\n"
+  # Maximum amount of data requested from the socket with a single recv()
+  _RECV_SIZE = 4096
+
+  def __init__(self, monitor_filename):
+    """Instantiates the QmpConnection object.
+
+    @type monitor_filename: string
+    @param monitor_filename: the filename of the UNIX raw socket on which the
+                             QMP monitor is listening
+
+    """
+    self.monitor_filename = monitor_filename
+    self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+    self._connected = False
+    # Data read from the socket that has not been decoded into a message yet
+    self._buf = ""
+
+  def _check_connection(self):
+    """Makes sure connect() has been invoked on this object.
+
+    @raise errors.ProgrammerError: if the connection is not established
+
+    """
+    if not self._connected:
+      raise errors.ProgrammerError("To use a QmpConnection you need to first"
+                                   " invoke connect() on it")
+
+  def connect(self):
+    """Connects to the QMP monitor.
+
+    Connects to the UNIX socket and makes sure that we can actually send and
+    receive data to the kvm instance via QMP.
+
+    @raise errors.HypervisorError: when there are communication errors
+    @raise errors.ProgrammerError: when there are data serialization errors
+        or when the monitor rejects the qmp_capabilities command
+
+    """
+    try:
+      self.sock.connect(self.monitor_filename)
+    except socket.error:
+      raise errors.HypervisorError("kvm: qmp communication error (unable to"
+                                   " connect to %s)" % self.monitor_filename)
+
+    # _Recv() and Execute() refuse to work on a connection that is not
+    # flagged as established, so the flag is raised before the handshake and
+    # lowered again if the handshake does not complete
+    self._connected = True
+    handshake_done = False
+    try:
+      # Check if we receive a correct greeting message from the server
+      # (As per the QEMU Protocol Specification 0.1 - section 2.2)
+      greeting = self._Recv()
+      if not greeting[self._FIRST_MESSAGE_KEY]:
+        raise errors.HypervisorError("kvm: qmp communication error (wrong"
+                                     " server greeting)")
+
+      # Let's put the monitor in command mode using the qmp_capabilities
+      # command, or else no command will be executable.
+      # (As per the QEMU Protocol Specification 0.1 - section 4)
+      # Execute() itself raises if the monitor answers with an error.
+      self.Execute(self._CAPABILITIES_COMMAND)
+      handshake_done = True
+    finally:
+      if not handshake_done:
+        self._connected = False
+
+  def close(self):
+    """Closes the connection to the QMP monitor.
+
+    Any received data that has not been decoded yet is discarded.
+
+    """
+    self._connected = False
+    self._buf = ""
+    self.sock.close()
+
+  def _Decode(self, json_string):
+    """Decodes a JSON string into a QmpMessage.
+
+    @rtype: QmpMessage
+    @raise errors.ProgrammerError: when the string cannot be decoded
+
+    """
+    try:
+      return QmpMessage.BuildFromJsonString(json_string)
+    except Exception:
+      # Deliberately broad: every decoding failure is reported the same way
+      raise errors.ProgrammerError("QMP data serialization error")
+
+  def _PopMessage(self):
+    """Extracts the next complete message from the receive buffer.
+
+    @rtype: QmpMessage or None
+    @return: the oldest buffered message, or None if the buffer does not
+        contain a complete message
+    @raise errors.ProgrammerError: when a terminated line is not valid JSON
+
+    """
+    while self._MESSAGE_END_TOKEN in self._buf:
+      (line, self._buf) = self._buf.split(self._MESSAGE_END_TOKEN, 1)
+      line = line.strip()
+      # Blank lines between messages carry no information
+      if line:
+        return self._Decode(line)
+
+    pending = self._buf.strip()
+    if not pending:
+      return None
+
+    # No terminator seen yet. A peer sending a bare JSON object was accepted
+    # when each recv() was decoded on its own, so keep accepting it: if the
+    # buffered data already decodes it is a whole message, otherwise it is
+    # just incomplete and more data has to be read.
+    try:
+      message = QmpMessage.BuildFromJsonString(pending)
+    except Exception:
+      return None
+    self._buf = ""
+    return message
+
+  def _Recv(self):
+    """Receives a message from QMP and decodes the received JSON object.
+
+    Data is accumulated until a whole message is available, so messages
+    split across several reads and several messages delivered by a single
+    read are both handled.
+
+    @rtype: QmpMessage
+    @return: the received message
+    @raise errors.HypervisorError: when there are communication errors
+    @raise errors.ProgrammerError: when there are data serialization errors
+
+    """
+    self._check_connection()
+    while True:
+      message = self._PopMessage()
+      if message is not None:
+        return message
+
+      try:
+        data = self.sock.recv(self._RECV_SIZE)
+      except socket.error:
+        raise errors.HypervisorError("Unable to receive data from KVM using the"
+                                     " QMP protocol")
+
+      if not data:
+        # An empty read means that the peer closed the connection
+        if self._buf.strip():
+          # ... in the middle of something that never became a valid message
+          self._buf = ""
+          raise errors.ProgrammerError("QMP data serialization error")
+        raise errors.HypervisorError("Unable to receive data from KVM using the"
+                                     " QMP protocol")
+
+      self._buf += data
+
+  def _Send(self, message):
+    """Encodes and sends a message to KVM using QMP.
+
+    @type message: QmpMessage
+    @param message: message to send to KVM
+    @raise errors.HypervisorError: when there are communication errors
+    @raise errors.ProgrammerError: when there are data serialization errors
+
+    """
+    self._check_connection()
+    try:
+      payload = str(message)
+    except Exception:
+      # Deliberately broad: every encoding failure is reported the same way
+      raise errors.ProgrammerError("QMP data serialization error")
+
+    try:
+      # sendall() keeps writing until the whole message has been handed to
+      # the kernel, while send() may stop after a part of it
+      self.sock.sendall(payload)
+    except socket.error:
+      raise errors.HypervisorError("Unable to send data to KVM using the QMP"
+                                   " protocol")
+
+  def Execute(self, command, arguments=None):
+    """Executes a QMP command and returns the response of the server.
+
+    @type command: str
+    @param command: the command to execute
+    @type arguments: dict
+    @param arguments: dictionary of arguments to be passed to the command;
+        left out of the request when empty or None
+    @rtype: QmpMessage
+    @return: the first non-event message received after sending the command
+    @raise errors.HypervisorError: when there are communication errors
+    @raise errors.ProgrammerError: when there are data serialization errors
+        or when the response carries an error
+
+    """
+    self._check_connection()
+    message = QmpMessage({"execute": command})
+    if arguments:
+      message["arguments"] = arguments
+    self._Send(message)
+
+    # Events can occur between the sending of the command and the reception
+    # of the response, so we need to filter out messages with the event key.
+    while True:
+      response = self._Recv()
+      if response[self._EVENT_KEY]:
+        continue
+
+      error = response[self._ERROR_KEY]
+      if error:
+        raise errors.ProgrammerError("kvm: got an error from QMP (%s)" % error)
+      return response
+
+
class KVMHypervisor(hv_base.BaseHypervisor):
"""KVM hypervisor interface"""
CAN_MIGRATE = True
@@ -325,6 +506,13 @@ class KVMHypervisor(hv_base.BaseHypervisor):
"""
return utils.PathJoin(cls._CTRL_DIR, "%s.serial" % instance_name)
+ @classmethod
+ def _InstanceQmpMonitor(cls, instance_name):
+ """Returns the instance QMP socket name
+
+ The name is built by joining cls._CTRL_DIR and "<instance_name>.qmp".
+
+ @param instance_name: name of the instance the socket belongs to
+ @return: the value returned by utils.PathJoin for those two components
+
+ """
+ return utils.PathJoin(cls._CTRL_DIR, "%s.qmp" % instance_name)
+
@staticmethod
def _SocatUnixConsoleParams():
"""Returns the correct parameters for socat
@@ -396,6 +584,7 @@ class KVMHypervisor(hv_base.BaseHypervisor):
utils.RemoveFile(pidfile)
utils.RemoveFile(cls._InstanceMonitor(instance_name))
utils.RemoveFile(cls._InstanceSerial(instance_name))
+ utils.RemoveFile(cls._InstanceQmpMonitor(instance_name))
utils.RemoveFile(cls._InstanceKVMRuntime(instance_name))
utils.RemoveFile(cls._InstanceKeymapFile(instance_name))
uid_file = cls._InstanceUidFile(instance_name)
diff --git a/test/ganeti.hypervisor.hv_kvm_unittest.py
b/test/ganeti.hypervisor.hv_kvm_unittest.py
index d9e5e73..3b3eef0 100755
--- a/test/ganeti.hypervisor.hv_kvm_unittest.py
+++ b/test/ganeti.hypervisor.hv_kvm_unittest.py
@@ -21,8 +21,12 @@
"""Script for testing the hypervisor.hv_kvm module"""
+import threading
import unittest
+import socket
+import os
+from ganeti import serializer
from ganeti import constants
from ganeti import compat
from ganeti import objects
@@ -33,6 +37,100 @@ from ganeti.hypervisor import hv_kvm
import testutils
+class QmpStub(threading.Thread):
+  """Stub for a QMP endpoint for a KVM instance
+
+  The stub listens on a UNIX socket, serves a single client and plays back
+  canned responses; it never inspects what the client sends.
+
+  """
+  _QMP_BANNER_DATA = {"QMP": {"version": {
+                      "package": "",
+                      "qemu": {"micro": 50, "minor": 13, "major": 0},
+                      "capabilities": [],
+                      }}}
+  # Answer sent to the first request, which is expected to be
+  # qmp_capabilities
+  _EMPTY_RESPONSE = {"return": []}
+
+  def __init__(self, socket_filename, test_script):
+    """Creates a QMP stub
+
+    - it sends the QMP banner as soon as a client connects;
+
+    - it answers the first request with an empty response;
+
+    - it plays back, one per received request, the responses of the
+      (request, response) pairs given in the test_script parameter
+
+    @type socket_filename: string
+    @param socket_filename: filename of the UNIX socket that will be created
+                            by this class and used for the communication
+    @type test_script: list of (request, response) tuples
+    @param test_script: the message exchanges to play back; only the
+                        response of each pair is used
+
+    """
+    threading.Thread.__init__(self)
+    # Do not keep the interpreter alive if a failing test leaves this thread
+    # blocked in accept() or recv()
+    self.setDaemon(True)
+    self.socket_filename = socket_filename
+    # Private copy: run() consumes the entries, and the caller usually walks
+    # the same list while this thread is running
+    self.script = list(test_script)
+
+    self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+    self.socket.bind(self.socket_filename)
+    self.socket.listen(1)
+
+  def run(self):
+    """Serves one client, then closes both the connection and the listener.
+
+    """
+    (conn, _) = self.socket.accept()
+    try:
+      # Send the banner as the first thing
+      conn.sendall(serializer.DumpJson(self._QMP_BANNER_DATA, indent=False))
+
+      # Expect qmp_capabilities and return an empty response
+      conn.recv(4096)
+      conn.sendall(serializer.DumpJson(self._EMPTY_RESPONSE, indent=False))
+
+      while True:
+        # We ignore the expected message, as the purpose of this object is not
+        # to verify the correctness of the communication but to act as a
+        # partner for the SUT (System Under Test, that is QmpConnection)
+        msg = conn.recv(4096)
+        # Stop when the client goes away, or when it asks for more than the
+        # script provides (closing the connection makes the client fail
+        # instead of this thread dying on an empty list)
+        if not msg or not self.script:
+          break
+
+        (_, response) = self.script.pop(0)
+        conn.sendall(serializer.DumpJson(response, indent=False))
+    finally:
+      conn.close()
+      self.socket.close()
+
+
+class TestQmpMessage(testutils.GanetiTestCase):
+  """Tests field access and the JSON round trip of hv_kvm.QmpMessage"""
+  def testSerialization(self):
+    payload = {"execute": "command", "arguments": ["a", "b", "c"]}
+    original = hv_kvm.QmpMessage(payload)
+
+    # Every field of the source dict must be readable through []
+    for field_name in payload:
+      self.failUnless(original[field_name] == payload[field_name])
+
+    # Encoding to JSON and decoding again must give an equal message
+    json_text = str(original)
+    decoded = hv_kvm.QmpMessage.BuildFromJsonString(json_text)
+    self.failUnless(decoded == original)
+
+
+class TestQmp(testutils.GanetiTestCase):
+  """Tests hv_kvm.QmpConnection against a scripted QmpStub"""
+  def testQmp(self):
+    # List of request/response test message exchanges.
+    test_script = [
+      (
+        {"execute": "query_commands", "arguments": []},
+        {"return": [{"name": "quit"}, {"name": "eject"}]},
+      ),
+    ]
+
+    self._executeTestScript(test_script)
+
+  def _executeTestScript(self, script):
+    """Plays a script against a stub and checks each received response.
+
+    @type script: list of (request, response) tuples
+    @param script: each request is a dict with an "execute" key and an
+                   optional "arguments" key; each response is the dict the
+                   stub answers with
+
+    """
+    # Set up the stub; only the name of the temporary file is needed, the
+    # file itself is removed so that the stub can bind its socket to it
+    socket_filename = self._CreateTempFile()
+    os.remove(socket_filename)
+    # The stub consumes the script it is given, so hand it a copy and keep
+    # the original intact for the loop below
+    qmp_stub = QmpStub(socket_filename, list(script))
+    qmp_stub.start()
+
+    # Set up the QMP connection
+    qmp_connection = hv_kvm.QmpConnection(socket_filename)
+    qmp_connection.connect()
+
+    # Send each scripted request and compare the answer with the scripted
+    # response
+    for message, expected_response in script:
+      response = qmp_connection.Execute(message["execute"],
+                                        message.get("arguments"))
+      self.failUnless(response == hv_kvm.QmpMessage(expected_response))
+
+
class TestConsole(unittest.TestCase):
def _Test(self, instance, hvparams):
cons = hv_kvm.KVMHypervisor.GetInstanceConsole(instance, hvparams, {})
--
1.7.3.1