Basic implementation of the QMP connection and related tests.
Signed-off-by: Andrea Spadaccini <[email protected]>
---
lib/hypervisor/hv_kvm.py | 239 +++++++++++++++++++++++++++++
test/ganeti.hypervisor.hv_kvm_unittest.py | 102 ++++++++++++
2 files changed, 341 insertions(+), 0 deletions(-)
diff --git a/lib/hypervisor/hv_kvm.py b/lib/hypervisor/hv_kvm.py
index f660966..df2fdc1 100644
--- a/lib/hypervisor/hv_kvm.py
+++ b/lib/hypervisor/hv_kvm.py
@@ -34,6 +34,8 @@ import pwd
import struct
import fcntl
import shutil
+import socket
+import StringIO
from ganeti import utils
from ganeti import constants
@@ -127,6 +129,229 @@ def _OpenTap(vnet_hdr=True):
return (ifname, tapfd)
+class QmpMessage:
+ """QEMU Messaging Protocol (QMP) message.
+
+ """
+
+ def __init__(self, data):
+ """Creates a new QMP message based on the passed data.
+
+ """
+ if not isinstance(data, dict):
+ raise TypeError("QmpMessage must be initialized with a dict")
+
+ self.data = data
+
+ def __getitem__(self, field_name):
+ """Get the value of the required field if present, or None.
+
+ Overrides the [] operator to provide access to the message data,
+ returning None if the required item is not in the message
+ @return: the value of the field_name field, or None if field_name
+ is not contained in the message
+
+ """
+
+ if field_name in self.data:
+ return self.data[field_name]
+
+ return None
+
+ def __setitem__(self, field_name, field_value):
+ """Set the value of the required field_name to field_value.
+
+ """
+ self.data[field_name] = field_value
+
+ @staticmethod
+ def BuildFromJsonString(json_string):
+ """Build a QmpMessage from a JSON encoded string.
+
+ @type json_string: str
+ @param json_string: JSON string representing the message
+ @rtype: L{QmpMessage}
+ @return: a L{QmpMessage} built from json_string
+
+ """
+ # Parse the string
+ data = serializer.LoadJson(json_string)
+ return QmpMessage(data)
+
+ def __str__(self):
+ # The protocol expects the JSON object to be sent as a single
+ # line, hence the need for indent=False.
+ return serializer.DumpJson(self.data, indent=False)
+
+ def __eq__(self, other):
+ # When comparing two QmpMessages, we are interested in comparing
+ # their internal representation of the message data
+ return self.data == other.data
+
+
+class QmpConnection:
+ """Connection to the QEMU Monitor using the QEMU Monitor Protocol (QMP).
+
+ """
+ _FIRST_MESSAGE_KEY = "QMP"
+ _EVENT_KEY = "event"
+ _ERROR_KEY = "error"
+ _ERROR_CLASS_KEY = "class"
+ _ERROR_DATA_KEY = "data"
+ _ERROR_DESC_KEY = "desc"
+ _EXECUTE_KEY = "execute"
+ _ARGUMENTS_KEY = "arguments"
+ _CAPABILITIES_COMMAND = "qmp_capabilities"
+ _MESSAGE_END_TOKEN = "\r\n"
+
+ def __init__(self, monitor_filename):
+ """Instantiates the QmpConnection object.
+
+ @type monitor_filename: string
+ @param monitor_filename: the filename of the UNIX raw socket on which the
+ QMP monitor is listening
+
+ """
+ self.monitor_filename = monitor_filename
+ self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+ # We want to fail if the server doesn't send a complete message
+ # in a reasonable amount of time
+ self.sock.settimeout(5)
+ self._connected = False
+
+ def _check_connection(self):
+ """Make sure that the connection is established.
+
+ """
+ if not self._connected:
+ raise errors.ProgrammerError("To use a QmpConnection you need to first"
+ " invoke connect() on it")
+
+ def connect(self):
+ """Connects to the QMP monitor.
+
+ Connects to the UNIX socket and makes sure that we can actually send and
+ receive data to the kvm instance via QMP.
+
+ @raise errors.HypervisorError: when there are communication errors
+ @raise errors.ProgrammerError: when there are data serialization errors
+
+ """
+ self.sock.connect(self.monitor_filename)
+ self._connected = True
+
+ # Check if we receive a correct greeting message from the server
+ # (As per the QEMU Protocol Specification 0.1 - section 2.2)
+ greeting = self._Recv()
+ if not greeting[self._FIRST_MESSAGE_KEY]:
+ self._connected = False
+ raise errors.HypervisorError("kvm: qmp communication error (wrong"
+ " server greeting")
+
+ # Let's put the monitor in command mode using the qmp_capabilities
+ # command, or else no command will be executable.
+ # (As per the QEMU Protocol Specification 0.1 - section 4)
+ self.Execute(self._CAPABILITIES_COMMAND)
+
+ def _Recv(self):
+ """Receives a message from QMP and decodes the received JSON object.
+
+ @rtype: QmpMessage
+ @return: the received message
+ @raise errors.HypervisorError: when there are communication errors
+ @raise errors.ProgrammerError: when there are data serialization errors
+
+ """
+ self._check_connection()
+ recv_buffer = StringIO.StringIO()
+ try:
+ while True:
+ data = self.sock.recv(4096)
+ if not data:
+ break
+ recv_buffer.write(data)
+
+ # Check if we got the message end token (CRLF, as per the QEMU Protocol
+ # Specification 0.1 - Section 2.1.1)
+ if self._MESSAGE_END_TOKEN in data:
+ break
+
+ except socket.timeout, err:
+ raise errors.HypervisorError("Timeout while receiving a QMP message: "
+ "%s" % (err))
+ except socket.error, err:
+ raise errors.HypervisorError("Unable to receive data from KVM using the"
+ " QMP protocol: %s" % err)
+
+ received_data = recv_buffer.getvalue()
+ recv_buffer.close()
+
+ try:
+ message = QmpMessage.BuildFromJsonString(received_data)
+ except Exception, err:
+ raise errors.ProgrammerError("QMP data serialization error: %s" % err)
+ return message
+
+ def _Send(self, message):
+ """Encodes and sends a message to KVM using QMP.
+
+ @type message: QmpMessage
+ @param message: message to send to KVM
+ @raise errors.HypervisorError: when there are communication errors
+ @raise errors.ProgrammerError: when there are data serialization errors
+
+ """
+ self._check_connection()
+ try:
+ message_str = str(message)
+ except Exception, err:
+ raise errors.ProgrammerError("QMP data deserialization error: %s" % err)
+
+ try:
+ self.sock.sendall(message_str)
+ except socket.timeout, err:
+ raise errors.HypervisorError("Timeout while sending a QMP message: "
+ "%s (%s)" % (err.string, err.errno))
+ except socket.error, err:
+ raise errors.HypervisorError("Unable to send data from KVM using the"
+ " QMP protocol: %s" % err)
+
+ def Execute(self, command, arguments=None):
+ """Executes a QMP command and returns the response of the server.
+
+ @type command: str
+ @param command: the command to execute
+ @type arguments: dict
+ @param arguments: dictionary of arguments to be passed to the command
+ @rtype: dict
+ @return: dictionary representing the received JSON object
+ @raise errors.HypervisorError: when there are communication errors
+ @raise errors.ProgrammerError: when there are data serialization errors
+
+ """
+ self._check_connection()
+ message = QmpMessage({self._EXECUTE_KEY: command})
+ if arguments:
+ message[self._ARGUMENTS_KEY] = arguments
+ self._Send(message)
+
+ # Events can occur between the sending of the command and the reception
+ # of the response, so we need to filter out messages with the event key.
+ while True:
+ response = self._Recv()
+ err = response[self._ERROR_KEY]
+ if err:
+ raise errors.HypervisorError("kvm: error executing the %s"
+ " command: %s (%s, %s):" %
+ (command,
+ err[self._ERROR_DESC_KEY],
+ err[self._ERROR_CLASS_KEY],
+ err[self._ERROR_DATA_KEY]))
+
+ elif not response[self._EVENT_KEY]:
+ return response
+
+
class KVMHypervisor(hv_base.BaseHypervisor):
"""KVM hypervisor interface"""
CAN_MIGRATE = True
@@ -325,6 +550,13 @@ class KVMHypervisor(hv_base.BaseHypervisor):
"""
return utils.PathJoin(cls._CTRL_DIR, "%s.serial" % instance_name)
+ @classmethod
+ def _InstanceQmpMonitor(cls, instance_name):
+ """Returns the instance serial QMP socket name
+
+ """
+ return utils.PathJoin(cls._CTRL_DIR, "%s.qmp" % instance_name)
+
@staticmethod
def _SocatUnixConsoleParams():
"""Returns the correct parameters for socat
@@ -396,6 +628,7 @@ class KVMHypervisor(hv_base.BaseHypervisor):
utils.RemoveFile(pidfile)
utils.RemoveFile(cls._InstanceMonitor(instance_name))
utils.RemoveFile(cls._InstanceSerial(instance_name))
+ utils.RemoveFile(cls._InstanceQmpMonitor(instance_name))
utils.RemoveFile(cls._InstanceKVMRuntime(instance_name))
utils.RemoveFile(cls._InstanceKeymapFile(instance_name))
uid_file = cls._InstanceUidFile(instance_name)
@@ -939,6 +1172,12 @@ class KVMHypervisor(hv_base.BaseHypervisor):
utils.EnsureDirs([(self._InstanceChrootDir(name),
constants.SECURE_DIR_MODE)])
+ # Automatically enable QMP if version is >= 0.14
+ if (v_major, v_min) >= (0, 14):
+ logging.debug("Enabling QMP")
+ kvm_cmd.extend(["-qmp", "unix:%s,server,nowait" %
+ self._InstanceQmpMonitor(instance.name)])
+
# Configure the network now for starting instances and bridged interfaces,
# during FinalizeMigration for incoming instances' routed interfaces
for nic_seq, nic in enumerate(kvm_nics):
diff --git a/test/ganeti.hypervisor.hv_kvm_unittest.py b/test/ganeti.hypervisor.hv_kvm_unittest.py
index d9e5e73..4a7ce54 100755
--- a/test/ganeti.hypervisor.hv_kvm_unittest.py
+++ b/test/ganeti.hypervisor.hv_kvm_unittest.py
@@ -21,8 +21,12 @@
"""Script for testing the hypervisor.hv_kvm module"""
+import threading
import unittest
+import socket
+import os
+from ganeti import serializer
from ganeti import constants
from ganeti import compat
from ganeti import objects
@@ -33,6 +37,104 @@ from ganeti.hypervisor import hv_kvm
import testutils
+class QmpStub(threading.Thread):
+ """Stub for a QMP endpoint for a KVM instance
+
+ """
+ _QMP_BANNER_DATA = {"QMP": {"version": {
+ "package": "",
+ "qemu": {"micro": 50, "minor": 13, "major": 0},
+ "capabilities": [],
+ }}}
+ _EMPTY_RESPONSE = {"return": []}
+
+ def __init__(self, socket_filename, test_script):
+ """Creates a QMP stub
+
+ - it checks each received message for JSON validity;
+
+ - it plays back the list of messages given in the responses parameter
+
+ @type filename: string
+ @param filename: filename of the UNIX socket that will be created by this
+ class and used for the communication
+ """
+ threading.Thread.__init__(self)
+ self.socket_filename = socket_filename
+ self.script = test_script
+
+ self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+ self.socket.bind(self.socket_filename)
+ self.socket.listen(1)
+
+ def run(self):
+ conn, addr = self.socket.accept()
+
+ # Send the banner as the first thing
+ conn.send(self.encode_string(self._QMP_BANNER_DATA))
+
+ # Expect qmp_capabilities and return an empty response
+ conn.recv(4096)
+ conn.send(self.encode_string(self._EMPTY_RESPONSE))
+
+ while True:
+ # We ignore the expected message, as the purpose of this object is not
+ # to verify the correctness of the communication but to act as a
+ # partner for the SUT (System Under Test, that is QmpConnection)
+ msg = conn.recv(4096)
+ if not msg:
+ break
+
+ _, response = self.script.pop(0)
+ conn.send(self.encode_string(response))
+ conn.close()
+
+ def encode_string(self, message):
+ return serializer.DumpJson(message, indent = False) + \
+ hv_kvm.QmpConnection._MESSAGE_END_TOKEN
+
+
+class TestQmpMessage(testutils.GanetiTestCase):
+ def testSerialization(self):
+ test_data = {"execute": "command", "arguments": ["a", "b", "c"]}
+ message = hv_kvm.QmpMessage(test_data)
+
+ for k, v in test_data.items():
+ self.failUnless(message[k] == v)
+
+ rebuilt_message = hv_kvm.QmpMessage.BuildFromJsonString(str(message))
+ self.failUnless(rebuilt_message == message)
+
+
+class TestQmp(testutils.GanetiTestCase):
+ def testQmp(self):
+ # List of request/response test message exchanges.
+ test_script = [
+ (
+ {"execute": "query_commands", "arguments": []},
+ {"return": [{"name": "quit"}, {"name": "eject"}]},
+ ),
+ ]
+
+ self._executeTestScript(test_script)
+
+ def _executeTestScript(self, script):
+ # Set up the stub
+ socket_filename = self._CreateTempFile()
+ os.remove(socket_filename)
+ qmp_stub = QmpStub(socket_filename, script)
+ qmp_stub.start()
+
+ # Set up the QMP connection
+ qmp_connection = hv_kvm.QmpConnection(socket_filename)
+ qmp_connection.connect()
+
+ # Format the script
+ for message, expected_response in script:
+ response = qmp_connection.Execute(script)
+ self.failUnless(response == hv_kvm.QmpMessage(expected_response))
+
+
class TestConsole(unittest.TestCase):
def _Test(self, instance, hvparams):
cons = hv_kvm.KVMHypervisor.GetInstanceConsole(instance, hvparams, {})
--
1.7.3.1