Hi,

this patch adds initial support for custom LDAP entry objects, as described in <http://freeipa.org/page/V3/LDAP_code>.

The LDAPEntry class implements the mapping object. The current version works like a dict, plus it has a DN property and validates and normalizes attribute names (there is no case preservation yet).

The LDAPEntryCompat class provides compatibility with old code that uses (dn, attrs) tuples. The reason why this is a separate class is that our code currently has 2 contradicting requirements on the entry object: tuple unpacking must work with it (i.e. iter(entry) yields dn and attribute dict), but it also must work as an argument to dict constructor (i.e. iter(entry) yields attribute names). This class will be removed once our code is converted to use LDAPEntry.

Honza

--
Jan Cholasta
>From 55d14475f58fe8e631c0cec63afa929b63762c74 Mon Sep 17 00:00:00 2001
From: Jan Cholasta <jchol...@redhat.com>
Date: Wed, 16 Jan 2013 14:14:58 +0100
Subject: [PATCH] Add custom mapping object for LDAP entry data.

---
 ipaserver/plugins/ldap2.py        | 93 ++++++++++++++++++++++++++++++++++++---
 tests/test_ipaserver/test_ldap.py | 37 +++++++++++++++-
 2 files changed, 123 insertions(+), 7 deletions(-)

diff --git a/ipaserver/plugins/ldap2.py b/ipaserver/plugins/ldap2.py
index 8e8e160..c3445a0 100644
--- a/ipaserver/plugins/ldap2.py
+++ b/ipaserver/plugins/ldap2.py
@@ -47,7 +47,6 @@ import ldap.filter as _ldap_filter
 import ldap.sasl as _ldap_sasl
 from ipapython.dn import DN, RDN
 from ipapython.ipautil import CIDict
-from collections import namedtuple
 from ipalib.errors import NetworkError, DatabaseError
 
 
@@ -75,6 +74,79 @@ from ipalib.request import context
 
 _debug_log_ldap = False
 
+class LDAPEntry(dict):
+    __slots__ = ('_dn',)
+
+    def __init__(self, _dn=None, _obj=None, **kwargs):
+        if isinstance(_dn, LDAPEntry):
+            assert _obj is None
+            _obj = _dn
+            self._dn = DN(_obj._dn)    #pylint: disable=E1103
+        else:
+            assert isinstance(_dn, DN)
+            if _obj is None:
+                _obj = {}
+            self._dn = _dn
+
+        super(LDAPEntry, self).__init__(self._init_iter(_obj, **kwargs))
+
+    @property
+    def dn(self):
+        return self._dn
+
+    @dn.setter
+    def dn(self, value):
+        assert isinstance(value, DN)
+        self._dn = value
+
+    def _attr_name(self, name):
+        if not isinstance(name, basestring):
+            raise TypeError(
+                "attribute name must be unicode or str, got %s object %r" % (
+                    name.__class__.__name__, name))
+        if isinstance(name, str):
+            name = name.decode('ascii')
+        return name.lower()
+
+    def _init_iter(self, _obj, **kwargs):
+        _obj = dict(_obj, **kwargs)
+        for (k, v) in _obj.iteritems():
+            yield (self._attr_name(k), v)
+
+    def __repr__(self):
+        dict_repr = super(LDAPEntry, self).__repr__()
+        return '%s(%s, %s)' % (type(self).__name__, repr(self._dn), dict_repr)
+
+    def copy(self):
+        return LDAPEntry(self)
+
+    def __setitem__(self, name, value):
+        super(LDAPEntry, self).__setitem__(self._attr_name(name), value)
+
+    def setdefault(self, name, default):
+        return super(LDAPEntry, self).setdefault(self._attr_name(name), default)
+
+    def update(self, _obj={}, **kwargs):
+        super(LDAPEntry, self).update(self._init_iter(_obj, **kwargs))
+
+    def __getitem__(self, name):
+        return super(LDAPEntry, self).__getitem__(self._attr_name(name))
+
+    def get(self, name, default=None):
+        return super(LDAPEntry, self).get(self._attr_name(name), default)
+
+    def __delitem__(self, name):
+        super(LDAPEntry, self).__delitem__(self._attr_name(name))
+
+    def pop(self, name, *default):
+        return super(LDAPEntry, self).pop(self._attr_name(name), *default)
+
+    def __contains__(self, name):
+        return super(LDAPEntry, self).__contains__(self._attr_name(name))
+
+    def has_key(self, name):
+        return super(LDAPEntry, self).has_key(self._attr_name(name))
+
 # Make python-ldap tuple style result compatible with Entry and Entity
 # objects by allowing access to the dn (tuple index 0) via the 'dn'
 # attribute name and the attr dict (tuple index 1) via the 'data'
@@ -82,7 +154,17 @@ _debug_log_ldap = False
 # r = result[0]
 # r[0] == r.dn
 # r[1] == r.data
-LDAPEntry = namedtuple('LDAPEntry', ['dn', 'data'])
+class LDAPEntryCompat(tuple):
+    def __new__(cls, entry):
+        return super(LDAPEntryCompat, cls).__new__(cls, (entry.dn, entry))
+
+    @property
+    def dn(self):
+        return self[0]
+
+    @property
+    def data(self):
+        return self[1]
 
 
 # Group Member types
@@ -454,14 +536,13 @@ class IPASimpleLDAPObject(object):
             original_dn = dn_tuple[0]
             original_attrs = dn_tuple[1]
 
-            ipa_dn = DN(original_dn)
-            ipa_attrs = dict()
+            ipa_entry = LDAPEntry(DN(original_dn))
 
             for attr, original_values in original_attrs.items():
                 target_type = self._SYNTAX_MAPPING.get(self.get_syntax(attr), unicode_from_utf8)
-                ipa_attrs[attr.lower()] = self.convert_value_list(attr, target_type, original_values)
+                ipa_entry[attr] = self.convert_value_list(attr, target_type, original_values)
 
-            ipa_result.append(LDAPEntry(ipa_dn, ipa_attrs))
+            ipa_result.append(LDAPEntryCompat(ipa_entry))
 
         if _debug_log_ldap:
             self.debug('ldap.result: %s', ipa_result)
diff --git a/tests/test_ipaserver/test_ldap.py b/tests/test_ipaserver/test_ldap.py
index cd3ba3c..c18739b 100644
--- a/tests/test_ipaserver/test_ldap.py
+++ b/tests/test_ipaserver/test_ldap.py
@@ -27,7 +27,7 @@
 
 import nose
 import os
-from ipaserver.plugins.ldap2 import ldap2
+from ipaserver.plugins.ldap2 import ldap2, LDAPEntry, LDAPEntryCompat
 from ipalib.plugins.service import service, service_show
 from ipalib.plugins.host import host
 import nss.nss as nss
@@ -145,3 +145,38 @@ class test_ldap(object):
         cert = cert[0]
         serial = unicode(x509.get_serial_number(cert, x509.DER))
         assert serial is not None
+
+    def test_entry(self):
+        """
+        Test the LDAPEntry class
+        """
+        cn1 = [u'test1']
+        cn2 = [u'test2']
+        dn1 = DN(('cn', cn1[0]))
+        dn2 = DN(('cn', cn2[0]))
+
+        e = LDAPEntry(dn1, cn=cn1)
+        assert e.dn is dn1
+        assert 'CN' in e
+        assert e['CN'] is cn1
+        assert e['CN'] is e[u'cn']
+
+        c = LDAPEntryCompat(e)
+        assert c.dn is e.dn
+        assert c.data is e
+        assert c[0] is e.dn
+        assert c[1] is e
+
+        e.dn = dn2
+        assert e.dn is dn2
+        assert c.dn is dn1
+        assert c.data is e
+
+        e['cn'] = cn2
+        assert 'CN' in e
+        assert e['CN'] is cn2
+        assert e['CN'] is e[u'cn']
+
+        del e['CN']
+        assert 'CN' not in e
+        assert u'cn' not in e
-- 
1.8.1

_______________________________________________
Freeipa-devel mailing list
Freeipa-devel@redhat.com
https://www.redhat.com/mailman/listinfo/freeipa-devel

Reply via email to