Basic implementation of the QMP connection and related tests.

Signed-off-by: Andrea Spadaccini <[email protected]>
---
 Makefile.am                               |    1 +
 lib/hypervisor/hv_kvm.py                  |  157 +++++++++++++++++++++++++++++
 test/data/qmp-basic-query.txt             |    2 +
 test/ganeti.hypervisor.hv_kvm_unittest.py |  109 ++++++++++++++++++++
 4 files changed, 269 insertions(+), 0 deletions(-)
 create mode 100644 test/data/qmp-basic-query.txt

diff --git a/Makefile.am b/Makefile.am
index 4cd7ff9..8db0549 100644
--- a/Makefile.am
+++ b/Makefile.am
@@ -620,6 +620,7 @@ TEST_FILES = \
        test/data/ip-addr-show-lo-oneline-ipv6.txt \
        test/data/ip-addr-show-lo-oneline.txt \
        test/data/ip-addr-show-lo.txt \
+       test/data/qmp-basic-query.txt \
        test/data/proc_drbd8.txt \
        test/data/proc_drbd80-emptyline.txt \
        test/data/proc_drbd83.txt \
diff --git a/lib/hypervisor/hv_kvm.py b/lib/hypervisor/hv_kvm.py
index 04d2327..f012f01 100644
--- a/lib/hypervisor/hv_kvm.py
+++ b/lib/hypervisor/hv_kvm.py
@@ -34,6 +34,7 @@ import pwd
 import struct
 import fcntl
 import shutil
+import socket
 
 from ganeti import utils
 from ganeti import constants
@@ -46,6 +47,7 @@ from ganeti.hypervisor import hv_base
 from ganeti import netutils
 from ganeti.utils import wrapper as utils_wrapper
 
+import simplejson as json
 
 _KVM_NETWORK_SCRIPT = constants.SYSCONFDIR + "/ganeti/kvm-vif-bridge"
 
@@ -127,6 +129,149 @@ def _OpenTap(vnet_hdr=True):
   return (ifname, tapfd)
 
 
+class QmpMessage:
+  """Base class for QMP messages
+  """
+  def __init__(self, data):
+    """Creates a new QMP message based on the passed data.
+
+    """
+    if not isinstance(data, dict):
+      raise TypeError, "QmpMessage must be initialized with a dict"
+
+    self.data = data
+
+  def __getitem__(self, field_name):
+    """Override the dereference operator
+
+    Allows the user of the class to call it as if it was a dict.
+    Returns None if the key is not in the internal dictionary.
+    """
+
+    if field_name in self.data:
+      return self.data[field_name]
+
+    return None
+
+  def __setitem__(self, field_name, field_value):
+    self.data[field_name] = field_value
+
+  @staticmethod
+  def BuildFromJsonString(json_string):
+    # Parse the string
+    data = json.loads(json_string)
+    return QmpMessage(data)
+
+  def __str__(self):
+    return json.dumps(self.data)
+
+  def __eq__(self, other):
+    return self.data == other.data
+
+
+class QmpConnection:
+  """Represents a connection to the QEMU Monitor using the QEMU Monitor
+  Protocol (QMP).
+  """
+  _FIRST_MESSAGE_KEY = "QMP"
+  _EVENT_KEY = "event"
+  _ERROR_KEY = "error"
+  _CAPABILITIES_COMMAND = "qmp_capabilities"
+
+  def __init__(self, monitor_filename):
+    """Instantiates the QmpConnection object
+
+    @type monitor_filename: string
+    @param monitor_filename: the filename of the UNIX raw socket on which the
+                             QMP monitor is listening
+    """
+    self.monitor_filename = monitor_filename
+    self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+
+  def connect(self):
+    """Connects to the QMP monitor
+
+    Connects to the given UNIX socket filename and makes sure that we can
+    actually send and receive data to the kvm instance via QMP.
+    """
+    self.sock.connect(self.monitor_filename)
+    self.sock_file = self.sock.makefile()
+
+    # Check if we receive a correct greeting message from the server
+    # (As per the QEMU Protocol Specification 0.1 - section 2.2)
+    greeting = self._Recv()
+    if not greeting[self._FIRST_MESSAGE_KEY]:
+      raise errors.HypervisorError("kvm: qmp communication error (wrong"
+                                   " server greeting")
+
+    # Let's put the monitor in command mode using the qmp_capabilities
+    # command, or else no command will be executable.
+    # (As per the QEMU Protocol Specification 0.1 - section 4)
+    response = self.Execute(self._CAPABILITIES_COMMAND)
+    if response[self._ERROR_KEY]:
+      raise errors.HypervisorError("kvm: qmp communication error (impossible"
+                                   " to execute the qmp_capabilities command)")
+
+  def _Recv(self):
+    """Receives a message from QMP and decodes the received JSON object
+
+    @rtype: QmpMessage
+    @return: the received message
+    @raise errors.HypervisorError: when there are communication errors
+    @raise errors.ProgrammerError: when there are data serialization errors
+    """
+    try:
+      data = self.sock.recv(4096)
+      message = QmpMessage.BuildFromJsonString(data)
+      return message
+    except socket.error:
+      raise errors.HypervisorError("Unable to receive data from KVM using the"
+                                   " QMP protocol")
+    except Exception:
+      raise errors.ProgrammerError("QMP data serialization error")
+
+  def _Send(self, message):
+    """Encodes and sends a message to KVM using QMP
+
+    @type message: QmpMessage
+    @param message: message to send to KVM
+    @raise errors.HypervisorError: when there are communication errors
+    @raise errors.ProgrammerError: when there are data serialization errors
+    """
+    try:
+      self.sock.send(str(message))
+    except socket.error:
+      raise errors.HypervisorError("Unable to send data to KVM using the QMP"
+                                   " protocol")
+    except Exception:
+      raise errors.ProgrammerError("QMP data serialization error")
+
+  def Execute(self, command, arguments = None):
+    """Executes a QMP command and returns the response
+
+    @type command: string
+    @param command: the command to execute
+    @type arguments: dict
+    @param arguments: dictionary of arguments to be passed to the command
+    @rtype: dict
+    @return: dictionary representing the received JSON object
+    @raise errors.HypervisorError: when there are communication errors
+    @raise errors.ProgrammerError: when there are data serialization errors
+    """
+    message = QmpMessage({"execute": command, "arguments": arguments})
+    self._Send(message)
+
+    # Events can occur between the sending of the command and the reception
+    # of the response, so whe need to filter messages with the event key.
+    while True:
+      response = self._Recv()
+      if not response[self._EVENT_KEY]:
+        if response[self._ERROR_KEY]:
+          raise errors.ProgrammerError("kvm: got an error from QMP (%s)" %
+                                       response[self._ERROR_KEY])
+        return response
+
+
 class KVMHypervisor(hv_base.BaseHypervisor):
   """KVM hypervisor interface"""
   CAN_MIGRATE = True
@@ -325,6 +470,13 @@ class KVMHypervisor(hv_base.BaseHypervisor):
     """
     return utils.PathJoin(cls._CTRL_DIR, "%s.serial" % instance_name)
 
+  @classmethod
+  def _InstanceQmpMonitor(cls, instance_name):
+    """Returns the instance serial QMP socket name
+
+    """
+    return utils.PathJoin(cls._CTRL_DIR, "%s.qmp" % instance_name)
+
   @staticmethod
   def _SocatUnixConsoleParams():
     """Returns the correct parameters for socat
@@ -396,6 +548,7 @@ class KVMHypervisor(hv_base.BaseHypervisor):
     utils.RemoveFile(pidfile)
     utils.RemoveFile(cls._InstanceMonitor(instance_name))
     utils.RemoveFile(cls._InstanceSerial(instance_name))
+    utils.RemoveFile(cls._InstanceQmpMonitor(instance_name))
     utils.RemoveFile(cls._InstanceKVMRuntime(instance_name))
     utils.RemoveFile(cls._InstanceKeymapFile(instance_name))
     uid_file = cls._InstanceUidFile(instance_name)
@@ -720,6 +873,10 @@ class KVMHypervisor(hv_base.BaseHypervisor):
     else:
       kvm_cmd.extend(["-serial", "none"])
 
+    # TEST: adding QMP socket
+    kvm_cmd.extend(["-qmp", "unix:%s,server,nowait"%
+                    self._InstanceQmpMonitor(instance.name)])
+
     spice_bind = hvp[constants.HV_KVM_SPICE_BIND]
     spice_ip_version = None
     if spice_bind:
diff --git a/test/data/qmp-basic-query.txt b/test/data/qmp-basic-query.txt
new file mode 100644
index 0000000..3780b22
--- /dev/null
+++ b/test/data/qmp-basic-query.txt
@@ -0,0 +1,2 @@
+{"execute": "query-commands", "arguments": []}
+{"return": [{"name": "quit"}, {"name": "eject"}, {"name": "change"}, {"name": 
"screendump"}, {"name": "stop"}, {"name": "cont"}, {"name": "system_reset"}, 
{"name": "system_powerdown"}, {"name": "device_add"}, {"name": "device_del"}, 
{"name": "cpu"}, {"name": "memsave"}, {"name": "pmemsave"}, {"name": 
"migrate"}, {"name": "migrate_cancel"}, {"name": "migrate_set_speed"}, {"name": 
"client_migrate_info"}, {"name": "migrate_set_downtime"}, {"name": 
"netdev_add"}, {"name": "netdev_del"}, {"name": "block_resize"}, {"name": 
"balloon"}, {"name": "set_link"}, {"name": "getfd"}, {"name": "closefd"}, 
{"name": "block_passwd"}, {"name": "set_password"}, {"name": 
"expire_password"}, {"name": "qmp_capabilities"}, {"name": 
"human-monitor-command"}, {"name": "query-version"}, {"name": 
"query-commands"}, {"name": "query-chardev"}, {"name": "query-block"}, {"name": 
"query-blockstats"}, {"name": "query-cpus"}, {"name": "query-pci"}]}
diff --git a/test/ganeti.hypervisor.hv_kvm_unittest.py 
b/test/ganeti.hypervisor.hv_kvm_unittest.py
index d9e5e73..27262a3 100755
--- a/test/ganeti.hypervisor.hv_kvm_unittest.py
+++ b/test/ganeti.hypervisor.hv_kvm_unittest.py
@@ -21,7 +21,12 @@
 
 """Script for testing the hypervisor.hv_kvm module"""
 
+import threading
 import unittest
+import socket
+import os
+
+import simplejson as json
 
 from ganeti import constants
 from ganeti import compat
@@ -33,6 +38,110 @@ from ganeti.hypervisor import hv_kvm
 import testutils
 
 
+class QmpStub(threading.Thread):
+  """Stub for a QMP endpoint for a KVM instance
+
+  """
+  _QMP_BANNER = ('{"QMP": {"version": {"qemu":'
+                 '{"micro": 50, "minor": 13, "major": 0},'
+                 '"package": "", "capabilities": []}'
+                 '}}')
+
+  def __init__(self, socket_filename, script_filename):
+    """Creates a QMP stub
+
+    - it checks each received message for JSON validity;
+
+    - it plays back the list of messages given in the responses parameter
+
+    @type filename: string
+    @param filename: filename of the UNIX socket that will be created by this
+                     class and used for the communication
+    """
+    threading.Thread.__init__(self)
+    self.socket_filename = socket_filename
+    self.script = self.ReadScript(script_filename)
+
+    self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+    self.socket.bind(self.socket_filename)
+    self.socket.listen(1)
+
+  def run(self):
+    conn, addr = self.socket.accept()
+
+    # Send the banner as the first thing
+    conn.send(self._QMP_BANNER)
+
+    # Expect qmp_capabilities and return an empty response
+    conn.recv(4096)
+    empty_response = {"return": []}
+    conn.send(json.dumps(empty_response))
+
+    while True:
+      # We ignore the expected message, as the purpose of this object is not
+      # to verify the correctness of the communication but to act as a
+      # partner for the SUT (System Under Test, that is QmpConnection)
+      msg = conn.recv(4096)
+      if not msg:
+        break
+
+      _, response = self.script.pop(0)
+      conn.send(json.dumps(response))
+    conn.close()
+
+  @staticmethod
+  def ReadScript(script_filename):
+    test_data = testutils.GanetiTestCase._ReadTestData(script_filename)
+    script = [json.loads(line) for line in test_data.splitlines()]
+
+    # Check that the provided script has a response for each expected message
+    assert len(script) % 2 == 0
+
+    # Couple together request and responses
+    script = [(script[i], script[i + 1]) for i in range(0, len(script), 2)]
+    return script
+
+
+class TestQmpMessage(testutils.GanetiTestCase):
+  def testSerialization(self):
+    test_data = {"execute": "command", "arguments": ["a", "b", "c"]}
+    message = hv_kvm.QmpMessage(test_data)
+
+    for k, v in test_data.items():
+      self.failUnless(message[k] == v)
+
+    rebuilt_message = hv_kvm.QmpMessage.BuildFromJsonString(str(message))
+    self.failUnless(rebuilt_message == message)
+
+
+class TestQmp(testutils.GanetiTestCase):
+  def testQmp(self):
+    script_files = [
+      'qmp-basic-query.txt',
+    ]
+
+    for script in script_files:
+      self._executeTestScript(script)
+
+  def _executeTestScript(self, script_filename):
+    # Read the script and check that it is in the correct form (n*2 rows, each
+    # couple of rows is a request/response cycle)
+    script = QmpStub.ReadScript(script_filename)
+
+    # Set up the stub
+    socket_filename = self._CreateTempFile()
+    os.remove(socket_filename)
+    qmp_stub = QmpStub(socket_filename, script_filename)
+    qmp_stub.start()
+
+    # Set up the QMP connection
+    qmp_connection = hv_kvm.QmpConnection(socket_filename)
+    qmp_connection.connect()
+    for message, expected_response in script:
+      response = qmp_connection.Execute(script)
+      self.failUnless(response == hv_kvm.QmpMessage(expected_response))
+
+
 class TestConsole(unittest.TestCase):
   def _Test(self, instance, hvparams):
     cons = hv_kvm.KVMHypervisor.GetInstanceConsole(instance, hvparams, {})
-- 
1.7.3.1

Reply via email to