Basic implementation of the QMP connection and related tests.
Signed-off-by: Andrea Spadaccini <[email protected]>
---
Makefile.am | 1 +
lib/hypervisor/hv_kvm.py | 157 +++++++++++++++++++++++++++++
test/data/qmp-basic-query.txt | 2 +
test/ganeti.hypervisor.hv_kvm_unittest.py | 109 ++++++++++++++++++++
4 files changed, 269 insertions(+), 0 deletions(-)
create mode 100644 test/data/qmp-basic-query.txt
diff --git a/Makefile.am b/Makefile.am
index 4cd7ff9..8db0549 100644
--- a/Makefile.am
+++ b/Makefile.am
@@ -620,6 +620,7 @@ TEST_FILES = \
test/data/ip-addr-show-lo-oneline-ipv6.txt \
test/data/ip-addr-show-lo-oneline.txt \
test/data/ip-addr-show-lo.txt \
+ test/data/qmp-basic-query.txt \
test/data/proc_drbd8.txt \
test/data/proc_drbd80-emptyline.txt \
test/data/proc_drbd83.txt \
diff --git a/lib/hypervisor/hv_kvm.py b/lib/hypervisor/hv_kvm.py
index 04d2327..f012f01 100644
--- a/lib/hypervisor/hv_kvm.py
+++ b/lib/hypervisor/hv_kvm.py
@@ -34,6 +34,7 @@ import pwd
import struct
import fcntl
import shutil
+import socket
from ganeti import utils
from ganeti import constants
@@ -46,6 +47,7 @@ from ganeti.hypervisor import hv_base
from ganeti import netutils
from ganeti.utils import wrapper as utils_wrapper
+import simplejson as json
_KVM_NETWORK_SCRIPT = constants.SYSCONFDIR + "/ganeti/kvm-vif-bridge"
@@ -127,6 +129,149 @@ def _OpenTap(vnet_hdr=True):
return (ifname, tapfd)
+class QmpMessage:
+ """Base class for QMP messages
+ """
+ def __init__(self, data):
+ """Creates a new QMP message based on the passed data.
+
+ """
+ if not isinstance(data, dict):
+ raise TypeError, "QmpMessage must be initialized with a dict"
+
+ self.data = data
+
+ def __getitem__(self, field_name):
+ """Override the dereference operator
+
+ Allows the user of the class to call it as if it was a dict.
+ Returns None if the key is not in the internal dictionary.
+ """
+
+ if field_name in self.data:
+ return self.data[field_name]
+
+ return None
+
+ def __setitem__(self, field_name, field_value):
+ self.data[field_name] = field_value
+
+ @staticmethod
+ def BuildFromJsonString(json_string):
+ # Parse the string
+ data = json.loads(json_string)
+ return QmpMessage(data)
+
+ def __str__(self):
+ return json.dumps(self.data)
+
+ def __eq__(self, other):
+ return self.data == other.data
+
+
+class QmpConnection:
+ """Represents a connection to the QEMU Monitor using the QEMU Monitor
+ Protocol (QMP).
+ """
+ _FIRST_MESSAGE_KEY = "QMP"
+ _EVENT_KEY = "event"
+ _ERROR_KEY = "error"
+ _CAPABILITIES_COMMAND = "qmp_capabilities"
+
+ def __init__(self, monitor_filename):
+ """Instantiates the QmpConnection object
+
+ @type monitor_filename: string
+ @param monitor_filename: the filename of the UNIX raw socket on which the
+ QMP monitor is listening
+ """
+ self.monitor_filename = monitor_filename
+ self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+
+ def connect(self):
+ """Connects to the QMP monitor
+
+ Connects to the given UNIX socket filename and makes sure that we can
+ actually send and receive data to the kvm instance via QMP.
+ """
+ self.sock.connect(self.monitor_filename)
+ self.sock_file = self.sock.makefile()
+
+ # Check if we receive a correct greeting message from the server
+ # (As per the QEMU Protocol Specification 0.1 - section 2.2)
+ greeting = self._Recv()
+ if not greeting[self._FIRST_MESSAGE_KEY]:
+ raise errors.HypervisorError("kvm: qmp communication error (wrong"
+ " server greeting")
+
+ # Let's put the monitor in command mode using the qmp_capabilities
+ # command, or else no command will be executable.
+ # (As per the QEMU Protocol Specification 0.1 - section 4)
+ response = self.Execute(self._CAPABILITIES_COMMAND)
+ if response[self._ERROR_KEY]:
+ raise errors.HypervisorError("kvm: qmp communication error (impossible"
+ " to execute the qmp_capabilities command)")
+
+ def _Recv(self):
+ """Receives a message from QMP and decodes the received JSON object
+
+ @rtype: QmpMessage
+ @return: the received message
+ @raise errors.HypervisorError: when there are communication errors
+ @raise errors.ProgrammerError: when there are data serialization errors
+ """
+ try:
+ data = self.sock.recv(4096)
+ message = QmpMessage.BuildFromJsonString(data)
+ return message
+ except socket.error:
+ raise errors.HypervisorError("Unable to receive data from KVM using the"
+ " QMP protocol")
+ except Exception:
+ raise errors.ProgrammerError("QMP data serialization error")
+
+ def _Send(self, message):
+ """Encodes and sends a message to KVM using QMP
+
+ @type message: QmpMessage
+ @param message: message to send to KVM
+ @raise errors.HypervisorError: when there are communication errors
+ @raise errors.ProgrammerError: when there are data serialization errors
+ """
+ try:
+ self.sock.send(str(message))
+ except socket.error:
+ raise errors.HypervisorError("Unable to send data to KVM using the QMP"
+ " protocol")
+ except Exception:
+ raise errors.ProgrammerError("QMP data serialization error")
+
+ def Execute(self, command, arguments = None):
+ """Executes a QMP command and returns the response
+
+ @type command: string
+ @param command: the command to execute
+ @type arguments: dict
+ @param arguments: dictionary of arguments to be passed to the command
+ @rtype: dict
+ @return: dictionary representing the received JSON object
+ @raise errors.HypervisorError: when there are communication errors
+ @raise errors.ProgrammerError: when there are data serialization errors
+ """
+ message = QmpMessage({"execute": command, "arguments": arguments})
+ self._Send(message)
+
+ # Events can occur between the sending of the command and the reception
+ # of the response, so whe need to filter messages with the event key.
+ while True:
+ response = self._Recv()
+ if not response[self._EVENT_KEY]:
+ if response[self._ERROR_KEY]:
+ raise errors.ProgrammerError("kvm: got an error from QMP (%s)" %
+ response[self._ERROR_KEY])
+ return response
+
+
class KVMHypervisor(hv_base.BaseHypervisor):
"""KVM hypervisor interface"""
CAN_MIGRATE = True
@@ -325,6 +470,13 @@ class KVMHypervisor(hv_base.BaseHypervisor):
"""
return utils.PathJoin(cls._CTRL_DIR, "%s.serial" % instance_name)
+ @classmethod
+ def _InstanceQmpMonitor(cls, instance_name):
+ """Returns the instance serial QMP socket name
+
+ """
+ return utils.PathJoin(cls._CTRL_DIR, "%s.qmp" % instance_name)
+
@staticmethod
def _SocatUnixConsoleParams():
"""Returns the correct parameters for socat
@@ -396,6 +548,7 @@ class KVMHypervisor(hv_base.BaseHypervisor):
utils.RemoveFile(pidfile)
utils.RemoveFile(cls._InstanceMonitor(instance_name))
utils.RemoveFile(cls._InstanceSerial(instance_name))
+ utils.RemoveFile(cls._InstanceQmpMonitor(instance_name))
utils.RemoveFile(cls._InstanceKVMRuntime(instance_name))
utils.RemoveFile(cls._InstanceKeymapFile(instance_name))
uid_file = cls._InstanceUidFile(instance_name)
@@ -720,6 +873,10 @@ class KVMHypervisor(hv_base.BaseHypervisor):
else:
kvm_cmd.extend(["-serial", "none"])
+ # TEST: adding QMP socket
+ kvm_cmd.extend(["-qmp", "unix:%s,server,nowait"%
+ self._InstanceQmpMonitor(instance.name)])
+
spice_bind = hvp[constants.HV_KVM_SPICE_BIND]
spice_ip_version = None
if spice_bind:
diff --git a/test/data/qmp-basic-query.txt b/test/data/qmp-basic-query.txt
new file mode 100644
index 0000000..3780b22
--- /dev/null
+++ b/test/data/qmp-basic-query.txt
@@ -0,0 +1,2 @@
+{"execute": "query-commands", "arguments": []}
+{"return": [{"name": "quit"}, {"name": "eject"}, {"name": "change"}, {"name":
"screendump"}, {"name": "stop"}, {"name": "cont"}, {"name": "system_reset"},
{"name": "system_powerdown"}, {"name": "device_add"}, {"name": "device_del"},
{"name": "cpu"}, {"name": "memsave"}, {"name": "pmemsave"}, {"name":
"migrate"}, {"name": "migrate_cancel"}, {"name": "migrate_set_speed"}, {"name":
"client_migrate_info"}, {"name": "migrate_set_downtime"}, {"name":
"netdev_add"}, {"name": "netdev_del"}, {"name": "block_resize"}, {"name":
"balloon"}, {"name": "set_link"}, {"name": "getfd"}, {"name": "closefd"},
{"name": "block_passwd"}, {"name": "set_password"}, {"name":
"expire_password"}, {"name": "qmp_capabilities"}, {"name":
"human-monitor-command"}, {"name": "query-version"}, {"name":
"query-commands"}, {"name": "query-chardev"}, {"name": "query-block"}, {"name":
"query-blockstats"}, {"name": "query-cpus"}, {"name": "query-pci"}]}
diff --git a/test/ganeti.hypervisor.hv_kvm_unittest.py
b/test/ganeti.hypervisor.hv_kvm_unittest.py
index d9e5e73..27262a3 100755
--- a/test/ganeti.hypervisor.hv_kvm_unittest.py
+++ b/test/ganeti.hypervisor.hv_kvm_unittest.py
@@ -21,7 +21,12 @@
"""Script for testing the hypervisor.hv_kvm module"""
+import threading
import unittest
+import socket
+import os
+
+import simplejson as json
from ganeti import constants
from ganeti import compat
@@ -33,6 +38,110 @@ from ganeti.hypervisor import hv_kvm
import testutils
+class QmpStub(threading.Thread):
+ """Stub for a QMP endpoint for a KVM instance
+
+ """
+ _QMP_BANNER = ('{"QMP": {"version": {"qemu":'
+ '{"micro": 50, "minor": 13, "major": 0},'
+ '"package": "", "capabilities": []}'
+ '}}')
+
+ def __init__(self, socket_filename, script_filename):
+ """Creates a QMP stub
+
+ - it checks each received message for JSON validity;
+
+ - it plays back the list of messages given in the responses parameter
+
+ @type filename: string
+ @param filename: filename of the UNIX socket that will be created by this
+ class and used for the communication
+ """
+ threading.Thread.__init__(self)
+ self.socket_filename = socket_filename
+ self.script = self.ReadScript(script_filename)
+
+ self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+ self.socket.bind(self.socket_filename)
+ self.socket.listen(1)
+
+ def run(self):
+ conn, addr = self.socket.accept()
+
+ # Send the banner as the first thing
+ conn.send(self._QMP_BANNER)
+
+ # Expect qmp_capabilities and return an empty response
+ conn.recv(4096)
+ empty_response = {"return": []}
+ conn.send(json.dumps(empty_response))
+
+ while True:
+ # We ignore the expected message, as the purpose of this object is not
+ # to verify the correctness of the communication but to act as a
+ # partner for the SUT (System Under Test, that is QmpConnection)
+ msg = conn.recv(4096)
+ if not msg:
+ break
+
+ _, response = self.script.pop(0)
+ conn.send(json.dumps(response))
+ conn.close()
+
+ @staticmethod
+ def ReadScript(script_filename):
+ test_data = testutils.GanetiTestCase._ReadTestData(script_filename)
+ script = [json.loads(line) for line in test_data.splitlines()]
+
+ # Check that the provided script has a response for each expected message
+ assert len(script) % 2 == 0
+
+ # Couple together request and responses
+ script = [(script[i], script[i + 1]) for i in range(0, len(script), 2)]
+ return script
+
+
+class TestQmpMessage(testutils.GanetiTestCase):
+ def testSerialization(self):
+ test_data = {"execute": "command", "arguments": ["a", "b", "c"]}
+ message = hv_kvm.QmpMessage(test_data)
+
+ for k, v in test_data.items():
+ self.failUnless(message[k] == v)
+
+ rebuilt_message = hv_kvm.QmpMessage.BuildFromJsonString(str(message))
+ self.failUnless(rebuilt_message == message)
+
+
+class TestQmp(testutils.GanetiTestCase):
+ def testQmp(self):
+ script_files = [
+ 'qmp-basic-query.txt',
+ ]
+
+ for script in script_files:
+ self._executeTestScript(script)
+
+ def _executeTestScript(self, script_filename):
+ # Read the script and check that it is in the correct form (n*2 rows, each
+ # couple of rows is a request/response cycle)
+ script = QmpStub.ReadScript(script_filename)
+
+ # Set up the stub
+ socket_filename = self._CreateTempFile()
+ os.remove(socket_filename)
+ qmp_stub = QmpStub(socket_filename, script_filename)
+ qmp_stub.start()
+
+ # Set up the QMP connection
+ qmp_connection = hv_kvm.QmpConnection(socket_filename)
+ qmp_connection.connect()
+ for message, expected_response in script:
+ response = qmp_connection.Execute(script)
+ self.failUnless(response == hv_kvm.QmpMessage(expected_response))
+
+
class TestConsole(unittest.TestCase):
def _Test(self, instance, hvparams):
cons = hv_kvm.KVMHypervisor.GetInstanceConsole(instance, hvparams, {})
--
1.7.3.1