Hello,

This is the (first) patch with the Tracker class implementation, based on the host plugin test.


It is meant to be used as a base class for helper classes used to write XML-RPC (API) tests for LDAP-based plugins, and to replace the Declarative class, which is used for
most of the XML-RPC tests at the moment.

For example usage, take a look at the host plugin test.
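
A minimal sketch of the subclassing pattern, for readers without the IPA tree at hand. Everything in it is an assumption made for illustration: `FakeBackend`, `MiniTracker`, `ThingTracker` and the `thing_*` commands are hypothetical, and `MiniTracker` is only a reduced stand-in for the Tracker class in the patch (no `dn` handling, no fixture, no `find` or `update`). It is not the FreeIPA code itself.

```python
import functools


class NotFound(Exception):
    """Stand-in for ipalib.errors.NotFound (assumption for this sketch)."""


class FakeBackend(object):
    """Hypothetical in-memory replacement for the IPA API."""

    def __init__(self):
        self.entries = {}

    def run(self, name, key, **options):
        if name == 'thing_add':
            self.entries[key] = dict(cn=[key], **options)
            return dict(value=key, result=dict(self.entries[key]))
        if key not in self.entries:
            raise NotFound(key)
        if name == 'thing_del':
            del self.entries[key]
            return dict(value=[key], result=dict(failed=[]))
        if name == 'thing_show':
            return dict(value=key, result=dict(self.entries[key]))
        raise ValueError(name)


class MiniTracker(object):
    """Reduced stand-in for Tracker: create, delete and retrieve only."""

    def __init__(self, backend):
        self.backend = backend
        self.exists = False
        self.attrs = {}

    def make_command(self, name, *args, **options):
        return functools.partial(self.backend.run, name, *args, **options)

    def ensure_exists(self):
        if not self.exists:
            self.create()

    def ensure_missing(self):
        if self.exists:
            self.delete()

    def create(self):
        self.ensure_missing()
        self.track_create()
        self.check_create(self.make_create_command()())

    def delete(self):
        self.ensure_exists()
        self.exists, self.attrs = False, {}
        self.check_delete(self.make_delete_command()())

    def retrieve(self):
        self.ensure_exists()
        self.check_retrieve(self.make_retrieve_command()())


class ThingTracker(MiniTracker):
    """Hypothetical subclass for an imaginary 'thing' plugin."""

    def __init__(self, backend, name):
        super(ThingTracker, self).__init__(backend)
        self.name = name

    def make_create_command(self):
        return self.make_command('thing_add', self.name,
                                 description=[u'test'])

    def make_delete_command(self):
        return self.make_command('thing_del', self.name)

    def make_retrieve_command(self):
        return self.make_command('thing_show', self.name)

    def track_create(self):
        # Record what the server is expected to return from now on.
        self.attrs = dict(cn=[self.name], description=[u'test'])
        self.exists = True

    def check_create(self, result):
        assert result == dict(value=self.name, result=self.attrs)

    def check_delete(self, result):
        assert result == dict(value=[self.name], result=dict(failed=[]))

    def check_retrieve(self, result):
        assert result == dict(value=self.name, result=self.attrs)


tracker = ThingTracker(FakeBackend(), u'thing1')
tracker.retrieve()  # creates the entry first, since tracker.exists is False
tracker.delete()
```

The point of the split is that the subclass supplies only the plugin-specific commands and assertions, while the ensure/track/run/check sequence stays in the base class.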

Cheers,
Milan
From a0cba2ffaafb7cc44ccfef4f00f047df1f91cf37 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Milan=20Kub=C3=ADk?= <mku...@redhat.com>
Date: Mon, 25 May 2015 16:05:46 +0200
Subject: [PATCH] Abstract the HostTracker class from host plugin test

Implements a base class to help test LDAP based plugins.

The class has been decoupled from the original host plugin test
and moved to a separate module, ipatests.test_xmlrpc.ldaptracker.

https://fedorahosted.org/freeipa/ticket/5032
---
 ipatests/test_xmlrpc/ldaptracker.py      | 287 +++++++++++++++++++++++++++++++
 ipatests/test_xmlrpc/test_host_plugin.py | 155 +----------------
 2 files changed, 292 insertions(+), 150 deletions(-)
 create mode 100644 ipatests/test_xmlrpc/ldaptracker.py

diff --git a/ipatests/test_xmlrpc/ldaptracker.py b/ipatests/test_xmlrpc/ldaptracker.py
new file mode 100644
index 0000000000000000000000000000000000000000..97412440446a0cc56e283e58fb731a31c4d9458f
--- /dev/null
+++ b/ipatests/test_xmlrpc/ldaptracker.py
@@ -0,0 +1,287 @@
+#
+# Copyright (C) 2015  FreeIPA Contributors see COPYING for license
+#
+
+"""
+Implements a base class to track changes to an LDAP object.
+"""
+
+import functools
+
+from ipalib import api, errors
+from ipapython.dn import DN
+from ipapython.version import API_VERSION
+
+
+class Tracker(object):
+    """Wraps and tracks modifications to a plugin LDAP entry object
+
+    Stores a copy of the state of a plugin entry object and allows checking
+    that the state in the database is the same as expected.
+    This allows creating independent tests: the individual tests check
+    that the relevant changes have been made. At the same time
+    the entry doesn't need to be recreated and cleaned up for each test.
+
+    Two attributes are used for tracking: ``exists`` (true if the entry is
+    supposed to exist) and ``attrs`` (a dict of LDAP attributes that are
+    expected to be returned from IPA commands).
+
+    For commonly used operations, there is a helper method, e.g.
+    ``create``, ``update``, or ``find``, that does these steps:
+
+    * ensure the entry exists (or does not exist, for "create")
+    * store the expected modifications
+    * get the IPA command to run, and run it
+    * check that the result matches the expected state
+
+    Tests that require customization of these steps are expected to do them
+    manually, using lower-level methods.
+    Especially the first step (ensure the entry exists) is important for
+    achieving independent tests.
+
+    The Tracker object also stores information about the entry, e.g.
+    ``dn``, ``rdn`` and ``name``; the latter two are derived from ``dn``.
+
+    To use this class, the programmer must subclass it and provide the
+    implementation of the following methods:
+
+     * make_*_command   -- implementing the API call for particular plugin
+                           and operation (add, delete, ...)
+                           These methods should use the make_command method
+     * check_* commands -- an assertion for a plugin command (CRUD)
+     * track_create     -- to make an internal representation of the
+                           entry
+
+    Apart from overriding these methods, the subclass must provide the
+    distinguished name of the entry in the `self.dn` property.
+
+    It is also required to override the class variables defining the sets
+    of LDAP attributes/keys for these operations specific to the plugin
+    being implemented. See the host plugin test for an example.
+
+    The implementation of these methods is not strictly enforced.
+    A missing method will therefore cause a NotImplementedError at
+    runtime.
+    """
+    retrieve_keys = None
+    retrieve_all_keys = None
+    create_keys = None
+    update_keys = None
+    managedby_keys = None
+    allowedto_keys = None
+
+    _override_me_msg = "This method needs to be overridden in a subclass"
+
+    def __init__(self, default_version=None):
+        self.api = api
+        self.default_version = default_version or API_VERSION
+        self._dn = None
+
+        self.exists = False
+
+    @property
+    def dn(self):
+        """A property containing the distinguished name of the entry."""
+        if not self._dn:
+            raise ValueError('The DN must be set in the init method.')
+        return self._dn
+
+    @dn.setter
+    def dn(self, value):
+        if not isinstance(value, DN):
+            raise ValueError('The value must be an instance of DN.')
+        self._dn = value
+
+    @property
+    def rdn(self):
+        return self.dn[0]
+
+    @property
+    def name(self):
+        """Property holding the name of the entry in LDAP.
+
+        This property is computed at runtime.
+        """
+        return self.rdn.value
+
+    def filter_attrs(self, keys):
+        """Return a dict of expected attrs, filtered by the given keys"""
+        if not getattr(self, 'attrs', None):
+            raise RuntimeError('The tracker instance has no attributes.')
+        return {k: v for k, v in self.attrs.items() if k in keys}
+
+    def run_command(self, name, *args, **options):
+        """Run the given IPA command
+
+        Logs the command using print for easier debugging
+        """
+        cmd = self.api.Command[name]
+
+        options.setdefault('version', self.default_version)
+
+        args_repr = ', '.join(
+            [repr(a) for a in args] +
+            ['%s=%r' % item for item in options.items()])
+        try:
+            result = cmd(*args, **options)
+        except Exception as e:
+            print 'Ran command: %s(%s): %s: %s' % (cmd, args_repr,
+                                                   type(e).__name__, e)
+            raise
+        else:
+            print 'Ran command: %s(%s): OK' % (cmd, args_repr)
+        return result
+
+    def make_command(self, name, *args, **options):
+        """Make a functools.partial function to run the given command"""
+        return functools.partial(self.run_command, name, *args, **options)
+
+    def make_fixture(self, request):
+        """Make a pytest fixture for this tracker
+
+        The fixture ensures the plugin entry does not exist before
+        and after the tests that use it.
+        """
+        del_command = self.make_delete_command()
+        try:
+            del_command()
+        except errors.NotFound:
+            pass
+
+        def cleanup():
+            existed = self.exists
+            try:
+                del_command()
+            except errors.NotFound:
+                if existed:
+                    raise
+            self.exists = False
+
+        request.addfinalizer(cleanup)
+
+        return self
+
+    def ensure_exists(self):
+        """If the entry does not exist (according to tracker state), create it
+        """
+        if not self.exists:
+            self.create(force=True)
+
+    def ensure_missing(self):
+        """If the entry exists (according to tracker state), delete it
+        """
+        if self.exists:
+            self.delete()
+
+    def make_create_command(self, force=True):
+        """Make function that creates the plugin entry object."""
+        raise NotImplementedError(self._override_me_msg)
+
+    def make_delete_command(self):
+        """Make function that deletes the plugin entry object."""
+        raise NotImplementedError(self._override_me_msg)
+
+    def make_retrieve_command(self, all=False, raw=False):
+        """Make function that retrieves the entry using ${CMD}_show"""
+        raise NotImplementedError(self._override_me_msg)
+
+    def make_find_command(self, *args, **kwargs):
+        """Make function that finds the entry using ${CMD}_find
+
+        Note that the name (or other search terms) needs to be specified
+        in arguments.
+        """
+        raise NotImplementedError(self._override_me_msg)
+
+    def make_update_command(self, updates):
+        """Make function that modifies the entry using ${CMD}_mod"""
+        raise NotImplementedError(self._override_me_msg)
+
+    def create(self, force=True):
+        """Helper function to create an entry and check the result"""
+        self.ensure_missing()
+        self.track_create()
+        command = self.make_create_command(force=force)
+        result = command()
+        self.check_create(result)
+
+    def track_create(self):
+        """Update expected state for entry creation
+
+        The method should look similar to the following
+        example from the host plugin.
+
+        self.attrs = dict(
+            dn=self.dn,
+            fqdn=[self.fqdn],
+            description=[self.description],
+            ... # all required attributes
+        )
+        self.exists = True
+        """
+        raise NotImplementedError(self._override_me_msg)
+
+    def check_create(self, result):
+        """Check plugin's add command result"""
+        raise NotImplementedError(self._override_me_msg)
+
+    def delete(self):
+        """Helper function to delete an entry and check the result"""
+        self.ensure_exists()
+        self.track_delete()
+        command = self.make_delete_command()
+        result = command()
+        self.check_delete(result)
+
+    def track_delete(self):
+        """Update expected state for entry deletion"""
+        self.exists = False
+        self.attrs = {}
+
+    def check_delete(self, result):
+        """Check plugin's `del` command result"""
+        raise NotImplementedError(self._override_me_msg)
+
+    def retrieve(self, all=False, raw=False):
+        """Helper function to retrieve an entry and check the result"""
+        self.ensure_exists()
+        command = self.make_retrieve_command(all=all, raw=raw)
+        result = command()
+        self.check_retrieve(result, all=all, raw=raw)
+
+    def check_retrieve(self, result, all=False, raw=False):
+        """Check the plugin's `show` command result"""
+        raise NotImplementedError(self._override_me_msg)
+
+    def find(self, all=False, raw=False):
+        """Helper function to search for this entry and check the result"""
+        self.ensure_exists()
+        command = self.make_find_command(self.name, all=all, raw=raw)
+        result = command()
+        self.check_find(result, all=all, raw=raw)
+
+    def check_find(self, result, all=False, raw=False):
+        """Check the plugin's `find` command result"""
+        raise NotImplementedError(self._override_me_msg)
+
+    def update(self, updates, expected_updates=None):
+        """Helper function to update this entry and check the result
+
+        The ``updates`` are used as options to the *_mod command,
+        and the self.attrs is updated with this dict.
+        Additionally, self.attrs is updated with ``expected_updates``.
+        """
+        if expected_updates is None:
+            expected_updates = {}
+
+        self.ensure_exists()
+        command = self.make_update_command(updates)
+        result = command()
+        self.attrs.update(updates)
+        self.attrs.update(expected_updates)
+        self.check_update(result, extra_keys=set(updates.keys()) |
+                                             set(expected_updates.keys()))
+
+    def check_update(self, result, extra_keys=()):
+        """Check the plugin's `mod` command result"""
+        raise NotImplementedError(self._override_me_msg)
diff --git a/ipatests/test_xmlrpc/test_host_plugin.py b/ipatests/test_xmlrpc/test_host_plugin.py
index 2b256772ec6a64cff6040fb2eb2b745bebcaecd7..e46e502d2db397be457d193e2802c379e4f6c289 100644
--- a/ipatests/test_xmlrpc/test_host_plugin.py
+++ b/ipatests/test_xmlrpc/test_host_plugin.py
@@ -26,7 +26,6 @@ Test the `ipalib.plugins.host` module.
 import os
 import tempfile
 import base64
-import functools
 
 import pytest
 
@@ -34,6 +33,7 @@ from ipapython import ipautil
 from ipalib import api, errors, x509
 from ipapython.dn import DN
 from ipapython.dnsutil import DNSName
+from ipatests.test_xmlrpc.ldaptracker import Tracker
 from ipatests.test_xmlrpc.xmlrpc_test import (XMLRPC_test,
     fuzzy_uuid, fuzzy_digits, fuzzy_hash, fuzzy_date, fuzzy_issuer,
     fuzzy_hex, raises_exact)
@@ -41,7 +41,6 @@ from ipatests.test_xmlrpc.test_user_plugin import get_group_dn
 from ipatests.test_xmlrpc import objectclasses
 from ipatests.test_xmlrpc.testcert import get_testcert
 from ipatests.util import assert_deepequal
-from ipapython.version import API_VERSION
 
 # Constants DNS integration tests
 # TODO: Use tracker fixtures for zones/records/users/groups
@@ -96,33 +95,12 @@ hostgroup1_dn = DN(('cn',hostgroup1),('cn','hostgroups'),('cn','accounts'),
                     api.env.basedn)
 
 
-class HostTracker(object):
+class HostTracker(Tracker):
     """Wraps and tracks modifications to a Host object
 
-    Stores a copy of state of a Host object, and allows checking that
-    the state in the database is the same as expected.
-    This allows creating independent tests: the individual tests check
-    that the relevant changes have been made. At the same time
-    the Host doesn't heet to be recreated and cleaned up for each test.
+    Implements the helper functions for host plugin.
 
-    Two attributes are used for tracking: ``exists`` (true if the Host is
-    supposed to exist) and ``attrs`` (a dict of LDAP attributes that are
-    expected to be returned from IPA commands).
-
-    For commonly used operations, there is a helper method, e.g.
-    ``create``, ``update``, or ``find``, that does these steps:
-
-    * ensure the Host exists (or does not exist, for "create")
-    * store the expected modifications
-    * get the IPA command to run, and run it
-    * check that the result matches the expected state
-
-    Tests that require customization of these steps are expected to do them
-    manually, using lower-level methods.
-    Especially the first step (ensure the Host exists) is important for
-    achieving independent tests.
-
-    The HostTracker object also stores information about the host, e.g.
+    The HostTracker object stores information about the host, e.g.
     ``fqdn`` and ``dn``.
     """
     retrieve_keys = {
@@ -148,8 +126,7 @@ class HostTracker(object):
     allowedto_keys = retrieve_keys - {'has_keytab', 'has_password'}
 
     def __init__(self, name, fqdn=None, default_version=None):
-        self.api = api
-        self.default_version = default_version or API_VERSION
+        super(HostTracker, self).__init__(default_version=default_version)
 
         self.shortname = name
         if fqdn:
@@ -162,75 +139,6 @@ class HostTracker(object):
         self.description = u'Test host <%s>' % name
         self.location = u'Undisclosed location <%s>' % name
 
-        self.exists = False
-
-    def filter_attrs(self, keys):
-        """Return a dict of expected attrs, filtered by the given keys"""
-        return {k: v for k, v in self.attrs.items() if k in keys}
-
-    def run_command(self, name, *args, **options):
-        """Run the given IPA command
-
-        Logs the command using print for easier debugging
-        """
-        cmd = self.api.Command[name]
-
-        options.setdefault('version', self.default_version)
-
-        args_repr = ', '.join(
-            [repr(a) for a in args] +
-            ['%s=%r' % item for item in options.items()])
-        try:
-            result = cmd(*args, **options)
-        except Exception as e:
-            print 'Ran command: %s(%s): %s: %s' % (cmd, args_repr,
-                                                    type(e).__name__, e)
-            raise
-        else:
-            print 'Ran command: %s(%s): OK' % (cmd, args_repr)
-        return result
-
-    def make_command(self, name, *args, **options):
-        """Make a functools.partial function to run the given command"""
-        return functools.partial(self.run_command, name, *args, **options)
-
-    def make_fixture(self, request):
-        """Make a pytest fixture for this tracker
-
-        The fixture ensures the host does not exist before and after the tests
-        that use it.
-        """
-        del_command = self.make_delete_command()
-        try:
-            del_command()
-        except errors.NotFound:
-            pass
-
-        def cleanup():
-            existed = self.exists
-            try:
-                del_command()
-            except errors.NotFound:
-                if existed:
-                    raise
-            self.exists = False
-
-        request.addfinalizer(cleanup)
-
-        return self
-
-    def ensure_exists(self):
-        """If the host does not exist (according to tracker state), create it
-        """
-        if not self.exists:
-            self.create(force=True)
-
-    def ensure_missing(self):
-        """If the host exists (according to tracker state), delete it
-        """
-        if self.exists:
-            self.delete()
-
     def make_create_command(self, force=True):
         """Make function that creates this host using host_add"""
         return self.make_command('host_add', self.fqdn,
@@ -258,14 +166,6 @@ class HostTracker(object):
         """Make function that modifies the host using host_mod"""
         return self.make_command('host_mod', self.fqdn, **updates)
 
-    def create(self, force=True):
-        """Helper function to create a host and check the result"""
-        self.ensure_missing()
-        self.track_create()
-        command = self.make_create_command(force=force)
-        result = command()
-        self.check_create(result)
-
     def track_create(self):
         """Update expected state for host creation"""
         self.attrs = dict(
@@ -295,19 +195,6 @@ class HostTracker(object):
             result=self.filter_attrs(self.create_keys),
         ), result)
 
-    def delete(self):
-        """Helper function to delete a host and check the result"""
-        self.ensure_exists()
-        self.track_delete()
-        command = self.make_delete_command()
-        result = command()
-        self.check_delete(result)
-
-    def track_delete(self):
-        """Update expected state for host deletion"""
-        self.exists = False
-        self.attrs = {}
-
     def check_delete(self, result):
         """Check `host_del` command result"""
         assert_deepequal(dict(
@@ -316,13 +203,6 @@ class HostTracker(object):
             result=dict(failed=[]),
         ), result)
 
-    def retrieve(self, all=False, raw=False):
-        """Helper function to retrieve a host and check the result"""
-        self.ensure_exists()
-        command = self.make_retrieve_command(all=all, raw=raw)
-        result = command()
-        self.check_retrieve(result, all=all, raw=raw)
-
     def check_retrieve(self, result, all=False, raw=False):
         """Check `host_show` command result"""
         if all:
@@ -335,13 +215,6 @@ class HostTracker(object):
             result=expected,
         ), result)
 
-    def find(self, all=False, raw=False):
-        """Helper function to search for this hosts and check the result"""
-        self.ensure_exists()
-        command = self.make_find_command(self.fqdn, all=all, raw=raw)
-        result = command()
-        self.check_find(result, all=all, raw=raw)
-
     def check_find(self, result, all=False, raw=False):
         """Check `host_find` command result"""
         if all:
@@ -355,24 +228,6 @@ class HostTracker(object):
             result=[expected],
         ), result)
 
-    def update(self, updates, expected_updates=None):
-        """Helper function to update this hosts and check the result
-
-        The ``updates`` are used as options to the *_mod command,
-        and the self.attrs is updated with this dict.
-        Additionally, self.attrs is updated with ``expected_updates``.
-        """
-        if expected_updates is None:
-            expected_updates = {}
-
-        self.ensure_exists()
-        command = self.make_update_command(updates)
-        result = command()
-        self.attrs.update(updates)
-        self.attrs.update(expected_updates)
-        self.check_update(result, extra_keys=set(updates.keys()) |
-                                             set(expected_updates.keys()))
-
     def check_update(self, result, extra_keys=()):
         """Check `host_update` command result"""
         assert_deepequal(dict(
-- 
1.9.3
