Attached is a new version which fixes most of the issues found.

Comments below.

On Mon, 2015-01-12 at 15:53 +0100, Petr Viktorin wrote:
> On 01/06/2015 03:26 AM, Nathaniel McCallum wrote:
> > On Thu, 2014-11-20 at 11:13 -0500, Nathaniel McCallum wrote:
> >> >This tests the general workflow for OTP including most possible
> >> >token combinations. This includes 5872 tests. Further optimization
> >> >is possible to reduce the number of duplicate tests run.
> >> >
> >> >Things not yet tested:
> >> >* ipa-kdb
> >> >* ipa-otpd
> >> >* otptoken-sync
> >> >* RADIUS proxy
> >> >* token self-management
> >> >* type specific attributes
> > Attached is the latest iteration of the OTP test work. This now includes
> > all major cases except token self-management and RADIUS proxy (which
> > will come in its own patch). Token self-management is held up by the
> > fact that I can't get alternate ccaches to work with the API. I have
> > tried kinit-ing to an independent ccache and exporting KRB5CCNAME, but
> > this doesn't work for some reason I can't figure out.
> >
> > I ended up creating my own fixture mechanism. I'm not in love with it,
> > but it is simple and at least gets the scoping correct. It also
> > generates individual tests for each parameterized state, so the output
> > is both correct and obvious.
> >
> > I also implemented OTP myself. This isn't much code, but pyotp has a
> > major bug and is dead upstream. I'd like to migrate to
> > python-cryptography when it lands as a dependency of FreeIPA. But due to
> > timing issues, we can't land it now. This will be a small patch in the
> > future.
> >
> > Even with the caveats above, I feel like the test coverage provided by
> > this test is worth review/merge. As a rough estimate, I think this is
> > about 70% code coverage. Of the remaining coverage, I see:
> > * RADIUS proxy - 10%
> > * token self-management - 10%
> > * misc testable - 5%
> > * misc untestable - 5%
> >
> > All tests in this patch succeed on 4.1.2.
> 
> It doesn't work for me on the 4.2 branch, see the attached errors. Not 
> sure what I'm doing wrong.

This doesn't work at all on the 4.2 branch. This is because nose, like
the upstream test harnesses, executes yielded tests as they are yielded.
However, pytest does not. It collects all the yielded tests into a
single bundle and then runs them after the state of their fixtures is
already destroyed. This completely breaks the fixturize() decorator.
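
To make the failure mode concrete, here is a minimal sketch of the
difference. Resource, generate_tests and the two runners are hypothetical
stand-ins, not code from the patch or from either test runner; they only
model "run each test as it is yielded" versus "collect everything first,
then run".

```python
class Resource(object):
    """Hypothetical fixture that records whether it is currently active."""

    def __init__(self):
        self.active = False

    def __enter__(self):
        self.active = True
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.active = False


def generate_tests():
    # Yield (callable, argument) pairs while the fixture is entered,
    # in the same shape as the tests produced by a yield-based decorator.
    with Resource() as res:
        for _ in range(2):
            yield (lambda r: r.active), res


def run_as_yielded(gen):
    # Each test runs while the generator is suspended inside the
    # "with" block, so the fixture is still active.
    return [t[0](*t[1:]) for t in gen]


def run_after_collection(gen):
    # Exhausting the generator first leaves the "with" block, so
    # __exit__() has run before any test is called.
    collected = list(gen)
    return [t[0](*t[1:]) for t in collected]


live = run_as_yielded(generate_tests())
stale = run_after_collection(generate_tests())
```

Here live ends up as [True, True] and stale as [False, False]: the second
runner only ever sees fixtures that have already been torn down.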

Why are we using pytest again? It may offer nifty features, but it sucks
at integration testing.

> See some comments and nitpicks inline.
> 
> 
> >
> > freeipa-npmccallum-0081.1-Add-initial-tests-for-OTP.patch
> >
> >
> >  From c7b01ea4415db3847110ffe51a9bb5193072d1a8 Mon Sep 17 00:00:00 2001
> > From: Nathaniel McCallum <npmccal...@redhat.com>
> > Date: Thu, 20 Nov 2014 11:02:00 -0500
> > Subject: [PATCH] Add initial tests for OTP
> >
> > This tests the general workflow for OTP including most possible
> > token combinations. It includes tests for all token options,
> > enablement scenarios and token synchronization.
> > ---
> >   ipatests/test_xmlrpc/test_otptoken_plugin.py | 536 +++++++++++++++++++++++++++
> >   1 file changed, 536 insertions(+)
> >   create mode 100644 ipatests/test_xmlrpc/test_otptoken_plugin.py
> >
> > diff --git a/ipatests/test_xmlrpc/test_otptoken_plugin.py b/ipatests/test_xmlrpc/test_otptoken_plugin.py
> > new file mode 100644
> > index 0000000000000000000000000000000000000000..7a27d720cb9789c648e45aece1ebd0b6b751e3d8
> > --- /dev/null
> > +++ b/ipatests/test_xmlrpc/test_otptoken_plugin.py
> > @@ -0,0 +1,536 @@
> > +# Authors:
> > +#   Nathaniel McCallum <npmccal...@redhat.com>
> > +#
> > +# Copyright (C) 2014  Red Hat
> > +# see file 'COPYING' for use and warranty information
> > +#
> > +# This program is free software; you can redistribute it and/or modify
> > +# it under the terms of the GNU General Public License as published by
> > +# the Free Software Foundation, either version 3 of the License, or
> > +# (at your option) any later version.
> > +#
> > +# This program is distributed in the hope that it will be useful,
> > +# but WITHOUT ANY WARRANTY; without even the implied warranty of
> > +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
> > +# GNU General Public License for more details.
> > +#
> > +# You should have received a copy of the GNU General Public License
> > +# along with this program.  If not, see <http://www.gnu.org/licenses/>.
> > +
> > +"""
> > +Test the `ipalib/plugins/otptoken.py` module.
> > +"""
> > +
> > +import abc
> > +import base64
> > +import hashlib
> > +import hmac
> > +import os
> > +import unittest
> > +import urlparse
> > +import uuid
> > +import random
> > +import string
> > +import struct
> > +import subprocess
> > +import time
> > +import types
> > +
> > +from datetime import datetime, timedelta
> > +
> > +from ipalib import api
> > +from xmlrpc_test import XMLRPC_test, Declarative
> 
> ^ unused
> 
> > +from ipatests.util import assert_deepequal, raises
> 
> I recommend nose.tools.assert_raises rather than ipatests.util.raises -- 
> it's easier to debug if an unexpected error occurs.

Fixed.

> > +from ipapython.version import API_VERSION
> > +from ipapython.dn import DN
> > +import ldap
> > +
> > +
> > +class Fixture(object):
> > +    __metaclass__ = abc.ABCMeta
> > +
> > +    @property
> > +    def context(self):
> > +        try:
> > +            return self.__fixtures
> > +        except AttributeError:
> > +            return tuple()
> > +
> > +    @context.setter
> > +    def context(self, value):
> > +        self.__fixtures = value
> > +
> > +    @abc.abstractmethod
> > +    def __enter__(self):
> > +        raise NotImplementedError()
> > +
> > +    @abc.abstractmethod
> > +    def __exit__(self, type, value, traceback):
> > +        raise NotImplementedError()
> > +
> > +    def __repr__(self):
> > +        return '%s()' % self.__class__.__name__
> 
> It seems that this class could be written as:
> 
>      class Fixture(object):
>          context = ()
> 
> and all the rest except __repr__ is unnecessary.
> For documenting what subclasses should add/override, a mention in a 
> docstring might be more helpful than ABC stubs.

Fixed.

> > +def fixturize(*args):
> > +    def outer(func):
> > +        def inner(s, fixtures=args, ctx=()):
> > +            try:
> > +                fixture = fixtures[0]
> > +            except IndexError:
> > +                yield (lambda *a: func(s, *a),) + ctx
> > +            else:
> > +                for f in fixture if type(fixture) is tuple else (fixture,):
> > +                    if isinstance(f, Fixture):
> > +                        f.context = ctx
> > +
> > +                    if hasattr(f, "__enter__"):
> > +                        with f as tmp:
> > +                            for test in inner(s, fixtures[1:], ctx + (tmp,)):
> > +                                yield test
> > +                    else:
> > +                        for test in inner(s, fixtures[1:], ctx + (f,)):
> > +                            yield test
> > +
> > +        inner.__name__ = func.__name__
> > +        return inner
> > +    return outer
> 
> Well, that works. A docstring would be really nice though.

Fixed, with extensive documentation.
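
As a rough standalone illustration of the lifecycle ordering that the new
docstring describes, the sketch below expands the same kind of parameter
list and logs when each context is entered and left. expand() and Foo are
simplified, hypothetical stand-ins for fixturize() and a fixture; they
leave out the test method binding and the "fixtures" attribute.

```python
def expand(fixtures, ctx=()):
    """Yield one argument tuple per combination of parameters.

    Context managers are entered before their combinations are yielded
    and exited once those combinations are exhausted.
    """
    if not fixtures:
        yield ctx
        return

    head = fixtures[0]
    for f in head if isinstance(head, tuple) else (head,):
        if hasattr(f, '__enter__'):
            with f as entered:
                for combo in expand(fixtures[1:], ctx + (entered,)):
                    yield combo
        else:
            for combo in expand(fixtures[1:], ctx + (f,)):
                yield combo


class Foo(object):
    """Hypothetical fixture that logs its own setup and teardown."""

    def __init__(self, name, log):
        self.name = name
        self.log = log

    def __enter__(self):
        self.log.append('start ' + self.name)
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.log.append('end ' + self.name)


log = []
params = ((Foo('a', log), Foo('b', log)), (1, 2))
for foo, number in expand(params):
    # The "test" runs here, while foo is still entered.
    log.append(str(number))
```

Consuming the combinations as they are yielded gives the log
start a, 1, 2, end a, start b, 1, 2, end b, so parameters on the left
outlive parameters on the right.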

> > +def cmd(cmd, *args, **kwargs):
> > +    return api.Command[cmd](*args, version=API_VERSION, **kwargs)
> 
> This requires rpcclient to be connected. See the XMLRPC_test class for 
> the necessary setup.

Fixed.

> > +class AuthenticationError(Exception):
> > +    pass
> > +
> > +
> > +class Login(object):
> > +    def __repr__(self):
> > +        return '%s()' % self.__class__.__name__
> > +
> > +    def __init__(self):
> > +        self.__fails = 0
> > +
> > +    def __call__(self, uid, pwd):
> > +        try:
> > +            self.login(uid, pwd)
> > +            self.__fails = 0
> > +        except:
> > +            self.__fails += 1
> > +            if self.__fails > 2:
> > +                cmd('user_unlock', uid)
> > +                self.__fails = 0
> > +            raise
> 
> I'm not sure what you're doing here.

Documented. I'm keeping the account from getting locked when we test
authentication failures.
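
In isolation the idea looks roughly like this. AutoUnlockLogin,
always_fail and the unlock callback are hypothetical names used only for
this sketch; in the patch the unlock step is the 'user_unlock' command.

```python
class AutoUnlockLogin(object):
    """Hypothetical wrapper: unlock the account after `limit`
    consecutive failed logins, then re-raise the failure."""

    def __init__(self, login, unlock, limit=3):
        self._login = login
        self._unlock = unlock
        self._limit = limit
        self._fails = 0

    def __call__(self, uid, pwd):
        try:
            self._login(uid, pwd)
        except Exception:
            self._fails += 1
            if self._fails >= self._limit:
                self._unlock(uid)
                self._fails = 0
            raise  # The caller still sees the failure it is testing for.
        else:
            self._fails = 0


def always_fail(uid, pwd):
    # Stand-in for a login attempt with bad credentials.
    raise ValueError('bad credentials')


unlocked = []
login = AutoUnlockLogin(always_fail, unlocked.append)
for attempt in range(3):
    try:
        login('testuser', 'wrong')
    except ValueError:
        pass
```

After the third consecutive failure the unlock callback has been called
once for 'testuser' and the failure counter is back at zero, so a long run
of negative tests never leaves the account locked.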

> > +
> > +    def login(self, uid, pwd):
> > +        raise NotImplementedError()
> > +
> > +
> > +class LDAPLogin(Login):
> > +    def __init__(self):
> > +        super(LDAPLogin, self).__init__()
> > +
> > +        self.__conn = ldap.initialize('ldap://' + api.env.host)
> > +
> > +    def login(self, uid, pwd):
> > +        dn = DN(('uid', uid), api.env.container_user, api.env.basedn)
> > +        try:
> > +            self.__conn.simple_bind_s(str(dn), pwd)
> > +        except ldap.INVALID_CREDENTIALS as e:
> > +            raise AuthenticationError(e.message)
> > +
> > +
> > +class Krb5Login(Login):
> > +    def __init__(self):
> > +        super(Krb5Login, self).__init__()
> > +
> > +        self.__fast = subprocess.Popen(
> > +            ['/usr/bin/klist'],
> > +            stdout=subprocess.PIPE
> > +        ).communicate()[0].split('\n')[0].split(': ')[1]
> > +
> > +    def kinit(self, uid, pwd, newpwd=None, ccache="FILE:/dev/null"):
> > +        data = pwd + '\n'
> > +        if newpwd is not None:
> > +            data += newpwd + '\n' + newpwd + '\n'
> > +
> > +        p = subprocess.Popen(
> > +            ['/usr/bin/kinit',
> > +             '-T', self.__fast,
> > +             '-c', ccache,
> > +             uid + '@' + api.env.realm],
> > +            stdin=subprocess.PIPE,
> > +            stdout=subprocess.PIPE,
> > +            stderr=subprocess.PIPE
> > +        )
> > +        out, err = p.communicate(data)
> 
> Can you use ipautil.run here?

Fixed.

> > +        if p.returncode != 0:
> > +            raise AuthenticationError(out, err)
> > +
> > +    def login(self, uid, pwd):
> > +        self.kinit(uid, pwd)
> > +
> > +
> > +class User(Fixture):
> > +    def __init__(self, **kwargs):
> > +        self.username = u''.join(random.sample(string.lowercase, 20))
> > +        self.password = u''.join(random.sample(string.letters, 20))
> 
> Could you prefix the test user with `test`, so it's identifiable when it 
> doesn't get deleted for some reason?

Fixed.

> > +        self.__kwargs = kwargs
> > +
> > +    def __enter__(self):
> > +        cmd('user_add', self.username,
> > +            userpassword=self.password,
> > +            givenname=self.username[:len(self.username) / 2],
> > +            sn=self.username[len(self.username) / 2:],
> > +            **self.__kwargs)
> > +
> > +        # Change password
> > +        Krb5Login().kinit(self.username, self.password, self.password)
> > +        return self
> > +
> > +    def __exit__(self, type, value, traceback):
> > +        cmd('user_del', self.username)
> > +
> > +
> > +class Enablement(Fixture):
> > +    def __repr__(self):
> > +        return "Enablement(%s)" % ("user" if self.user else "global")
> > +
> > +    def __set_authtype(self, authtype):
> > +        if self.user:
> > +            for fixture in self.context:
> > +                if isinstance(fixture, User):
> > +                    cmd('user_mod', fixture.username, ipauserauthtype=authtype)
> > +                    return
> > +            assert False
> > +        else:
> > +            cmd('config_mod', ipauserauthtype=authtype)
> > +            time.sleep(60)  # Work around cfg update hack in ipa-kdb
> 
> The config_mod might raise EmptyModList which you should ignore.

Fixed.

> > +
> > +    def __init__(self, user=True):
> > +        self.user = user
> > +
> > +    def __enter__(self):
> > +        self.__set_authtype((u'otp',))
> > +        return self
> > +
> > +    def __exit__(self, type, value, traceback):
> > +        self.__set_authtype(())
> > +
> > +
> > +class Token(dict, Fixture):
> 
> Why is Token a dict, as opposed to having a "params" dict?

Because a token is just a collection of parameters. See how we define
the tokens in the fixtures.

> > +    @property
> > +    def type(self):
> > +        return self[u'type'].upper()
> > +
> > +    @property
> > +    def active(self):
> > +        if self.get(u'ipatokendisabled', False):
> > +            return False
> > +
> > +        nb = self.get(u'ipatokennotbefore', None)
> > +        if nb is not None and datetime.now() < nb:
> > +            return False
> > +
> > +        na = self.get(u'ipatokennotafter', None)
> > +        if na is not None and datetime.now() > na:
> > +            return False
> > +
> > +        return True
> > +
> > +    def __repr__(self):
> > +        prefixes = ['ipatoken' + x for x in ('totp', 'hotp', 'otp', '')]
> > +
> > +        args = {}
> > +        for k, v in self.items():
> > +            for prefix in prefixes:
> > +                if k.startswith(prefix):
> > +                    k = k[len(prefix):]
> > +                    break
> > +
> > +            if k in ('key', 'uniqueid', 'owner'):
> > +                continue
> > +
> > +            if isinstance(v, datetime):
> > +                v = v.strftime("%Y%m%d%H%M%SZ")
> > +
> > +            args[k] = v
> > +
> > +        return "%s(%s:%d|%d%s%s%r)" % (
> > +            args.pop('type').upper(),
> > +            args.pop('algorithm'),
> > +            args.pop('digits'),
> > +            args.pop('counter', args.pop('timestep', 0)),
> > +            ':' if self.type == 'TOTP' else '',
> > +            str(args.pop('clockoffset', '')),
> > +            args
> > +        )
> > +
> > +    def otp(self, at=0):
> > +        # I first attempted implementing this with pyotp. However, pyotp has
> > +        # a critical bug which appeared in testing. I fixed this bug and
> > +        # submitted it upstream: https://github.com/nathforge/pyotp/pull/9
> > +        #
> > +        # However, upstream pyotp appears to be dead. For now, I have
> > +        # implemented the algorithm myself. In the future, it would be nice
> > +        # to use python-cryptography here.
> > +
> > +        # If the token is time-based, calculate the counter from the time.
> > +        if self.type == u"TOTP":
> > +            intrvl = self[u'ipatokentotptimestep']
> > +            offset = self.get(u'ipatokentotpclockoffset', 0)
> > +            at = (time.time() + offset + intrvl * at) / intrvl
> > +
> > +        # Otherwise, just account for the specified counter offset.
> > +        elif self.type == u"HOTP":
> > +            if at < 0:  # Skip invalid test offsets.
> > +                raise unittest.SkipTest()
> > +            at += self.get(u'ipatokenhotpcounter', 0)
> > +
> > +        # Create the HMAC of the current counter
> > +        countr = struct.pack("!Q", at)
> > +        hasher = getattr(hashlib, self[u'ipatokenotpalgorithm'])
> 
> hashlib.new(self[u'ipatokenotpalgorithm']) should work

It doesn't. hmac.HMAC needs the hash constructor itself as its digestmod
argument, not a new hash instance.
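
For reference, here is a self-contained sketch of the same computation
outside the Token class. hotp() is a hypothetical helper, not the code in
the patch; it looks the constructor up with getattr() and hands it to
hmac.new(), and it is written to avoid ord() so it also runs on Python 3.
The key in the usage lines is the ASCII test secret from RFC 4226.

```python
import hashlib
import hmac
import struct


def hotp(key, counter, digits=6, algorithm='sha1'):
    """Compute an HOTP code (RFC 4226) for the given key and counter."""
    # Pass the hash constructor (e.g. hashlib.sha1), not an instance.
    hasher = getattr(hashlib, algorithm)
    message = struct.pack('!Q', counter)
    digest = bytearray(hmac.new(key, message, hasher).digest())

    # Dynamic truncation: the low nibble of the last byte selects the
    # offset of a 4-byte window; the top bit of that window is masked.
    offset = digest[-1] & 0x0f
    window = bytes(digest[offset:offset + 4])
    binary = struct.unpack('!I', window)[0] & 0x7fffffff

    return str(binary % (10 ** digits)).rjust(digits, '0')


secret = b'12345678901234567890'
first = hotp(secret, 0)
second = hotp(secret, 1)
```

With that secret, counters 0 and 1 give 755224 and 287082, which match the
test values published in RFC 4226.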

> > +        digest = hmac.HMAC(self[u'ipatokenotpkey'], countr, hasher).digest()
> > +
> > +        # Get the number of digits
> > +        digits = self[u'ipatokenotpdigits']
> > +
> > +        # Truncate the digest
> > +        offset = ord(digest[-1]) & 0xf
> > +        binary = (ord(digest[offset+0]) & 0x7f) << 0x18
> > +        binary |= (ord(digest[offset+1]) & 0xff) << 0x10
> > +        binary |= (ord(digest[offset+2]) & 0xff) << 0x08
> > +        binary |= (ord(digest[offset+3]) & 0xff) << 0x00
> > +        binary = binary % (10 ** digits)
> > +
> > +        return "0" * (digits - len(str(binary))) + str(binary)
> 
> str(binary).rjust(digits, '0')

Fixed.

> > +    def __init__(self, **kwargs):
> > +        super(Token, self).__init__(**kwargs)
> > +
> > +        # Add in default values.
> > +        self.setdefault(u'ipatokenuniqueid', unicode(uuid.uuid4()))
> > +        self.setdefault(u'ipatokenotpkey', os.urandom(20))
> > +        for i in range(len(api.Object['otptoken'].params)):
> > +            param = api.Object['otptoken'].params[i]
> > +            if param.default is not None and param.name:
> > +                self.setdefault(param.name, param.default)
> > +
> > +        # Remove defaults that don't apply.
> > +        types = {
> > +            "HOTP": (u'ipatokenhotpcounter',),
> > +            "TOTP": (u'ipatokentotptimestep', u'ipatokentotpclockoffset'),
> > +        }
> > +        for k, names in types.items():
> > +            if k != self.type:
> > +                for name in names:
> > +                    del self[name]
> > +
> > +    def __enter__(self):
> > +        kwargs = {}
> > +        kwargs.update(self)
> 
> kwargs = dict(self)

Fixed.

> > +        # Get the owner from the user fixture.
> > +        for fixture in self.context:
> > +            if isinstance(fixture, User):
> > +                kwargs[u'ipatokenowner'] = fixture.username
> > +                break
> > +
> > +        # Add the token.
> > +        result = cmd('otptoken_add', **kwargs)
> > +
> > +        try:
> > +            # Remove the URI and validate the rest of the return value.
> > +            uri = result.get('result', {}).pop('uri', None)
> > +            expected = {}
> > +            for k, v in kwargs.items():
> > +                if k == 'setattr':
> > +                    continue
> > +
> > +                if isinstance(v, bool):
> > +                    expected[k] = (unicode(v).upper(),)
> > +                elif isinstance(v, int):
> > +                    expected[k] = (unicode(v),)
> > +                else:
> > +                    expected[k] = (v,)
> > +            expected[u'type'] = expected[u'type'][0].upper()
> > +            expected[u'dn'] = u'ipatokenuniqueid=%s,cn=otp,dc=example,dc=com'
> > +            expected[u'dn'] %= self[u'ipatokenuniqueid']
> 
> That only works for dc=example,dc=com; use:
>      DN(('ipatokenuniqueid', self[u'ipatokenuniqueid']), 'cn=otp', api.env.basedn)

Fixed.

> > +            assert_deepequal({
> > +                u'summary': u'Added OTP token "%s"' % self[u'ipatokenuniqueid'],
> > +                u'result': expected,
> > +                u'value': self[u'ipatokenuniqueid'],
> > +            }, result)
> > +
> > +            # Validate the URI.
> > +            split = urlparse.urlsplit(uri)
> > +            assert split.scheme == u'otpauth'
> > +            assert split.netloc.upper() == self.type
> > +            assert split.path == '/%s@%s:%s' % (
> > +                kwargs[u'ipatokenowner'],
> > +                api.env.realm,
> > +                self[u'ipatokenuniqueid']
> > +            )
> > +
> > +            # Validate the query.
> > +            types = {
> > +                "HOTP": {u'counter': self.get(u'ipatokenhotpcounter')},
> > +                "TOTP": {u'period': self.get(u'ipatokentotptimestep')},
> > +            }
> > +            query = {
> > +                u'algorithm': self.get(u'ipatokenotpalgorithm'),
> > +                u'issuer': u'%s@%s' % (kwargs[u'ipatokenowner'], api.env.realm),
> > +                u'digits': self.get(u'ipatokenotpdigits'),
> > +                u'secret': base64.b32encode(self[u'ipatokenotpkey']),
> > +            }
> > +            query.update(types.get(self.type, {}))
> > +            assert_deepequal(
> > +                {k: (unicode(v),) for k, v in query.items()},
> > +                urlparse.parse_qs(split.query)
> > +            )
> > +        except:
> > +            cmd('otptoken_del', self[u'ipatokenuniqueid'])
> > +            raise
> > +
> > +        return self
> > +
> > +    def __exit__(self, type, value, traceback):
> > +        cmd('otptoken_del', self[u'ipatokenuniqueid'])
> > +
> > +
> > +def auth(success, login, user, token, pwd=None, at=0):
> > +    if pwd is None:
> > +        pwd = user.password
> > +
> > +    if isinstance(at, basestring):
> > +        code = at
> > +
> > +    else:
> > +        code = token.otp(at) if token else '123456'
> > +        if code is None:  # Skip invalid test offsets.
> > +            raise unittest.SkipTest()
> > +
> > +    if success:
> > +        login(user.username, pwd + code)
> > +    else:
> > +        raises(AuthenticationError, login, user.username, pwd + code)
> > +
> > +
> > +class TestEnablement:
> 
> Classes should derive from object. (Same for TestTokens, TestSync)

Fixed.

> > +    # This describes the success conditions for authentication.
> > +    #   1. If user-auth-type does not include otp: password-only.
> > +    #   2. Otherwise, if no token exists: password-only.
> > +    #   3. Otherwise: password AND code.
> > +    #
> > +    # All states not accounted for here MUST fail.
> > +    #
> > +    # Format: (enabled, token created, pwd given, token given): success
> > +    EXPECTATIONS = {
> > +        (False, False, True, False): True,  # Condition 1
> > +        (False, True,  True, False): True,  # Condition 1
> > +        (True,  False, True, False): True,  # Condition 2
> > +        (True,  True,  True, True):  True,  # Condition 3
> > +    }
> > +
> > +    @fixturize(Krb5Login(), User(), (
> > +            None,
> > +            Enablement(True),
> > +            Enablement(False)
> > +        ), (
> > +            None,
> > +            Token()
> > +        ), (
> > +            (True,  None, ''),        # Password-only
> > +            (True,  None, 0),         # Both
> > +            (False, '', 1),           # Code-only
> > +            (False, '', ''),          # Neither
> > +            (False, 'xxx', 2),        # Bad password
> > +            (False, None, '123456'),  # Bad code
> > +        )
> > +    )
> > +    def test(self, login, user, enablement, token, params):
> > +        success, pwd, at = params
> > +
> > +        if success:
> > +            success = self.EXPECTATIONS.get((
> > +                enablement is not None,
> > +                token is not None,
> > +                pwd is None,
> > +                isinstance(at, int)
> > +            ), False)
> > +
> > +        auth(success, login, user, token, pwd, at)
> > +
> > +
> > +class TestTokens:
> > +    TODAY = datetime.now().replace(microsecond=0)
> > +    TOMORROW = TODAY + timedelta(1)
> > +    YESTERDAY = TODAY - timedelta(1)
> > +
> > +    @fixturize(
> > +        Krb5Login(),
> > +        User(ipauserauthtype=(u'otp')), (
> > +            Token(type=u'hotp'),
> > +            Token(type=u'HOTP'),
> > +            Token(type=u'HOTP', ipatokenhotpcounter=1000),
> > +
> > +            Token(type=u'totp'),
> > +            Token(type=u'TOTP'),
> > +            Token(type=u'TOTP', ipatokentotptimestep=60),
> > +            Token(type=u'TOTP', ipatokentotpclockoffset=30000),
> > +
> > +            Token(ipatokenotpalgorithm=u'sha1'),
> > +            Token(ipatokenotpalgorithm=u'sha256'),
> > +            Token(ipatokenotpalgorithm=u'sha384'),
> > +            Token(ipatokenotpalgorithm=u'sha512'),
> > +
> > +            Token(ipatokenotpdigits=6),
> > +            Token(ipatokenotpdigits=8),
> > +
> > +            Token(ipatokendisabled=False),
> > +            Token(ipatokennotbefore=YESTERDAY),
> > +            Token(ipatokennotafter=TOMORROW),
> > +
> > +            Token(ipatokendisabled=True),
> > +            Token(ipatokennotbefore=TOMORROW),
> > +            Token(ipatokennotafter=YESTERDAY),
> > +        ), (
> > +            (False, -1000),  # Check distant past OTP
> > +            (True,  -2),     # Check past OTP
> > +            (True,   0),     # Check current OTP
> > +            (False,  0),     # Check duplicate OTP
> > +            (True,   1),     # Check next OTP
> > +            (True,   3),     # Check future OTP
> > +            (False,  1000),  # Check distant future OTP
> > +        )
> > +    )
> > +    def test(self, login, user, token, params):
> > +        auth(params[0] and token.active, login, user, token, None, params[1])
> > +
> > +
> > +class TestSync:
> > +    """NOTE: this test requires ca.crt in your confdir so that the client can
> > +validate the server's certificate."""
> > +
> > +    @fixturize(Krb5Login(), User(ipauserauthtype=(u'otp')), (
> > +            Token(type=u'hotp'),
> > +            Token(type=u'totp')
> > +        ), (
> > +            (False, True, 0),   # Normal token operation
> > +            (False, False, 7),  # Skip-ahead failure
> > +            (True, True, 7),    # Skip-ahead success after sync
> > +        )
> > +    )
> > +    def test(self, login, user, token, params):
> > +        sync, success, at = params
> > +
> > +        if sync:
> > +            cmd('otptoken_sync',
> > +                user=user.username,
> > +                password=user.password,
> > +                first_code=unicode(token.otp(at)),
> > +                second_code=unicode(token.otp(at + 1)))
> > +            at += 2
> > +
> > +        auth(success, login, user, token, None, at)
> > -- 2.1.0
> >
> >
> 
> 

From 347b87f422c2f7abf492d05f9d0811b3e473b67c Mon Sep 17 00:00:00 2001
From: Nathaniel McCallum <npmccal...@redhat.com>
Date: Thu, 20 Nov 2014 11:02:00 -0500
Subject: [PATCH] Add initial tests for OTP

This tests the general workflow for OTP including most possible
token combinations. It includes tests for all token options,
enablement scenarios and token synchronization.
---
 ipatests/test_xmlrpc/test_otptoken_plugin.py | 637 +++++++++++++++++++++++++++
 1 file changed, 637 insertions(+)
 create mode 100644 ipatests/test_xmlrpc/test_otptoken_plugin.py

diff --git a/ipatests/test_xmlrpc/test_otptoken_plugin.py b/ipatests/test_xmlrpc/test_otptoken_plugin.py
new file mode 100644
index 0000000000000000000000000000000000000000..d0f79c2a6aa0851145c9e5c114dd149af350b17d
--- /dev/null
+++ b/ipatests/test_xmlrpc/test_otptoken_plugin.py
@@ -0,0 +1,637 @@
+# Authors:
+#   Nathaniel McCallum <npmccal...@redhat.com>
+#
+# Copyright (C) 2014  Red Hat
+# see file 'COPYING' for use and warranty information
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+
+"""
+Test the `ipalib/plugins/otptoken.py` module.
+"""
+
+import abc
+import base64
+import hashlib
+import hmac
+import os
+import unittest
+import urlparse
+import uuid
+import random
+import string
+import struct
+import subprocess
+import time
+import types
+
+from datetime import datetime, timedelta
+
+
+from ipalib import api
+from ipalib.errors import EmptyModlist
+from ipatests.util import assert_deepequal
+from ipapython import ipautil
+from ipapython.version import API_VERSION
+from ipapython.dn import DN
+
+from nose.tools import assert_raises
+import ldap
+
+
+class Fixture(object):
+    fixtures = ()
+
+    def __repr__(self):
+        return '%s()' % self.__class__.__name__
+
+
+def fixturize(*args):
+    """This decorator is used to execute a test function multiple times within
+a given set of contexts. Let's look at some examples.
+
+    @fixturize(1, 2)
+    def test(a, b):
+        print a, b
+
+In the above example, test() will be executed once and will print "1 2".
+Let's move on to some more complex examples.
+
+    @fixturize(1, (2, 3))
+    def test(a, b):
+        print a, b
+
+If one of the parameters to the decorator is a tuple, multiple iterations are
+performed. In the above case, test() would execute twice, printing:
+    1 2
+    1 3
+
+This feature can be multiplied:
+
+    @fixturize(1, (2, 3), (4, 5))
+    def test(a, b, c):
+        print a, b, c
+
+The above would print:
+    1 2 4
+    1 2 5
+    1 3 4
+    1 3 5
+
+Parameters on the left have a longer lifecycle than on the right.
+
+Parameters can also have a context state. Take this example:
+
+    class Foo(object):
+        def __init__(self, name):
+            self.__name = name
+
+        def __enter__(self):
+            print "start", self.__name
+            return self
+
+        def __exit__(self, type, value, traceback):
+            print "end", self.__name
+
+
+    @fixturize(
+        (Foo("a"), Foo("b")),
+        (1, 2),
+    )
+    def test(file, number):
+        print number
+
+In this case, test() will be executed four times; once for each Foo() and
+number. When a new parameter comes into context, if an __enter__() method
+exists, it will be called. The same is true for __exit__() when it goes
+out of context.  So in this case, we would get this output:
+    start a
+    1
+    2
+    end a
+    start b
+    1
+    2
+    end b
+
+Again, notice that lifecycles are longer on the left.
+
+One last thing should be added. If the provided parameter has an attribute
+named "fixtures" it will be set to a tuple of the left-side active fixtures
+before calling __enter__(). This allows right-side parameters to use
+attributes of active left-side parameters when they are activated.
+
+For instance:
+
+    class ModifyUser(object):
+        def __init__(self, operation):
+            self.fixtures = ()
+            ...
+
+        def __enter__(self):
+            print self.fixtures[-1].name
+            ...
+        ...
+
+    @fixturize(
+        (User("alice"), User("bob")),
+        (ModifyUser("promote"), ModifyUser("demote")),
+    )
+    def test(user, modification):
+        ...
+
+In this example, when each ModifyUser() comes into context, it prints the name
+of the currently instantiated user. This means that parameters on the right
+can interact with the parameters on the left.
+"""
+    def outer(func):
+        def inner(s, fixtures=args, ctx=()):
+            try:
+                fixture = fixtures[0]
+            except IndexError:
+                yield (lambda *a: func(s, *a),) + ctx
+            else:
+                for f in fixture if type(fixture) is tuple else (fixture,):
+                    if hasattr(f, "fixtures"):
+                        f.fixtures = ctx
+
+                    if hasattr(f, "__enter__"):
+                        with f as tmp:
+                            for test in inner(s, fixtures[1:], ctx + (tmp,)):
+                                yield test
+                    else:
+                        for test in inner(s, fixtures[1:], ctx + (f,)):
+                            yield test
+
+        inner.__name__ = func.__name__
+        return inner
+    return outer
+
+
+def cmd(cmd, *args, **kwargs):
+    return api.Command[cmd](*args, version=API_VERSION, **kwargs)
+
+
+class AuthenticationError(Exception):
+    pass
+
+
+class Login(object):
+    """A base class for login methods.
+
+In particular, it implements a behavior which calls 'user_unlock'
+automatically to prevent locking of the account after a series of failed
+logins. This is needed because we are often testing for failures and we
+don't want false-positives. Nor do we want the account locked after testing
+failures several times in a row.
+"""
+
+    def __repr__(self):
+        return '%s()' % self.__class__.__name__
+
+    def __init__(self):
+        self.__fails = 0
+
+    def __call__(self, uid, pwd):
+        try:
+            self.login(uid, pwd)
+            self.__fails = 0
+        except:
+            self.__fails += 1
+            if self.__fails > 2:
+                cmd('user_unlock', uid)
+                self.__fails = 0
+            raise
+
+    def login(self, uid, pwd):
+        raise NotImplementedError()
+
+
+class LDAPLogin(Login):
+    "Log in using LDAP."
+
+    def __init__(self):
+        super(LDAPLogin, self).__init__()
+
+        self.__conn = ldap.initialize('ldap://' + api.env.host)
+
+    def login(self, uid, pwd):
+        dn = DN(('uid', uid), api.env.container_user, api.env.basedn)
+        try:
+            self.__conn.simple_bind_s(str(dn), pwd)
+        except ldap.INVALID_CREDENTIALS as e:
+            raise AuthenticationError(e.message)
+
+
+class Krb5Login(Login):
+    "Log in using Kerberos."
+
+    def __init__(self):
+        super(Krb5Login, self).__init__()
+
+        out = ipautil.run(['/usr/bin/klist'])[0]
+        self.__fast = out.split('\n')[0].split(': ')[1]
+
+    def kinit(self, uid, pwd, newpwd=None, ccache="FILE:/dev/null"):
+        data = pwd + '\n'
+        if newpwd is not None:
+            data += newpwd + '\n' + newpwd + '\n'
+
+        out, err, rc = ipautil.run(
+            ['/usr/bin/kinit',
+             '-T', self.__fast,
+             '-c', ccache,
+             uid + '@' + api.env.realm],
+            data,
+            False
+        )
+        if rc != 0:
+            raise AuthenticationError(out, err)
+
+    def login(self, uid, pwd):
+        self.kinit(uid, pwd)
+
+
+class User(Fixture):
+    def __init__(self, **kwargs):
+        self.username = u'test' + u''.join(random.sample(string.lowercase, 20))
+        self.password = u''.join(random.sample(string.letters, 20))
+        self.__kwargs = kwargs
+
+    def __enter__(self):
+        cmd('user_add', self.username,
+            userpassword=self.password,
+            givenname=self.username[:len(self.username) // 2],
+            sn=self.username[len(self.username) // 2:],
+            **self.__kwargs)
+
+        # Change password
+        Krb5Login().kinit(self.username, self.password, self.password)
+        return self
+
+    def __exit__(self, type, value, traceback):
+        cmd('user_del', self.username)
+
+
+class Enablement(Fixture):
+    def __repr__(self):
+        return "Enablement(%s)" % ("user" if self.user else "global")
+
+    def __set_authtype(self, authtype):
+        if self.user:
+            for fixture in self.fixtures:
+                if isinstance(fixture, User):
+                    cmd('user_mod', fixture.username, ipauserauthtype=authtype)
+                    return
+            assert False
+        else:
+            try:
+                cmd('config_mod', ipauserauthtype=authtype)
+                time.sleep(60)  # Work around cfg update hack in ipa-kdb
+            except EmptyModlist:
+                pass
+
+    def __init__(self, user=True):
+        self.user = user
+
+    def __enter__(self):
+        self.__set_authtype((u'otp',))
+        return self
+
+    def __exit__(self, type, value, traceback):
+        self.__set_authtype(())
+
+
+class Token(dict, Fixture):
+    @property
+    def type(self):
+        return self[u'type'].upper()
+
+    @property
+    def active(self):
+        if self.get(u'ipatokendisabled', False):
+            return False
+
+        nb = self.get(u'ipatokennotbefore', None)
+        if nb is not None and datetime.now() < nb:
+            return False
+
+        na = self.get(u'ipatokennotafter', None)
+        if na is not None and datetime.now() > na:
+            return False
+
+        return True
+
+    def __repr__(self):
+        prefixes = ['ipatoken' + x for x in ('totp', 'hotp', 'otp', '')]
+
+        args = {}
+        for k, v in self.items():
+            for prefix in prefixes:
+                if k.startswith(prefix):
+                    k = k[len(prefix):]
+                    break
+
+            if k in ('key', 'uniqueid', 'owner'):
+                continue
+
+            if isinstance(v, datetime):
+                v = v.strftime("%Y%m%d%H%M%SZ")
+
+            args[k] = v
+
+        return "%s(%s:%d|%d%s%s%r)" % (
+            args.pop('type').upper(),
+            args.pop('algorithm'),
+            args.pop('digits'),
+            args.pop('counter', args.pop('timestep', 0)),
+            ':' if self.type == 'TOTP' else '',
+            str(args.pop('clockoffset', '')),
+            args
+        )
+
+    def otp(self, at=0):
+        # I first attempted implementing this with pyotp. However, pyotp has
+        # a critical bug which appeared in testing. I fixed this bug and
+        # submitted it upstream: https://github.com/nathforge/pyotp/pull/9
+        #
+        # However, upstream pyotp appears to be dead. For now, I have
+        # implemented the algorithm myself. In the future, it would be nice
+        # to use python-cryptography here.
+
+        # If the token is time-based, calculate the counter from the time.
+        if self.type == u"TOTP":
+            intrvl = self[u'ipatokentotptimestep']
+            offset = self.get(u'ipatokentotpclockoffset', 0)
+            at = int((time.time() + offset + intrvl * at) / intrvl)
+
+        # Otherwise, just account for the specified counter offset.
+        elif self.type == u"HOTP":
+            if at < 0:  # Skip invalid test offsets.
+                raise unittest.SkipTest()
+            at += self.get(u'ipatokenhotpcounter', 0)
+
+        # Create the HMAC of the current counter
+        countr = struct.pack("!Q", at)
+        hasher = getattr(hashlib, self[u'ipatokenotpalgorithm'])
+        digest = hmac.HMAC(self[u'ipatokenotpkey'], countr, hasher).digest()
+
+        # Get the number of digits
+        digits = self[u'ipatokenotpdigits']
+
+        # Truncate the digest
+        offset = ord(digest[-1]) & 0xf
+        binary = (ord(digest[offset+0]) & 0x7f) << 0x18
+        binary |= (ord(digest[offset+1]) & 0xff) << 0x10
+        binary |= (ord(digest[offset+2]) & 0xff) << 0x08
+        binary |= (ord(digest[offset+3]) & 0xff) << 0x00
+        binary = binary % (10 ** digits)
+
+        return str(binary).rjust(digits, '0')
+
+    def __init__(self, **kwargs):
+        super(Token, self).__init__(**kwargs)
+
+        # Add in default values.
+        self.setdefault(u'ipatokenuniqueid', unicode(uuid.uuid4()))
+        self.setdefault(u'ipatokenotpkey', os.urandom(20))
+        for i in range(len(api.Object['otptoken'].params)):
+            param = api.Object['otptoken'].params[i]
+            if param.default is not None and param.name:
+                self.setdefault(param.name, param.default)
+
+        # Remove defaults that don't apply.
+        types = {
+            "HOTP": (u'ipatokenhotpcounter',),
+            "TOTP": (u'ipatokentotptimestep', u'ipatokentotpclockoffset'),
+        }
+        for k, names in types.items():
+            if k != self.type:
+                for name in names:
+                    del self[name]
+
+    def __enter__(self):
+        kwargs = dict(self)
+
+        # Get the owner from the user fixture.
+        for fixture in self.fixtures:
+            if isinstance(fixture, User):
+                kwargs[u'ipatokenowner'] = fixture.username
+                break
+
+        # Add the token.
+        result = cmd('otptoken_add', **kwargs)
+
+        try:
+            # Remove the URI and validate the rest of the return value.
+            uri = result.get('result', {}).pop('uri', None)
+            expected = {}
+            for k, v in kwargs.items():
+                if k == 'setattr':
+                    continue
+
+                if isinstance(v, bool):
+                    expected[k] = (unicode(v).upper(),)
+                elif isinstance(v, int):
+                    expected[k] = (unicode(v),)
+                else:
+                    expected[k] = (v,)
+            expected[u'type'] = expected[u'type'][0].upper()
+            expected[u'dn'] = DN(
+                ('ipatokenuniqueid', self[u'ipatokenuniqueid']),
+                'cn=otp',
+                api.env.basedn
+            )
+            assert_deepequal({
+                u'summary': u'Added OTP token "%s"' % self[u'ipatokenuniqueid'],
+                u'result': expected,
+                u'value': self[u'ipatokenuniqueid'],
+            }, result)
+
+            # Validate the URI.
+            split = urlparse.urlsplit(uri)
+            assert split.scheme == u'otpauth'
+            assert split.netloc.upper() == self.type
+            assert split.path == '/%s@%s:%s' % (
+                kwargs[u'ipatokenowner'],
+                api.env.realm,
+                self[u'ipatokenuniqueid']
+            )
+
+            # Validate the query.
+            types = {
+                "HOTP": {u'counter': self.get(u'ipatokenhotpcounter')},
+                "TOTP": {u'period': self.get(u'ipatokentotptimestep')},
+            }
+            query = {
+                u'algorithm': self.get(u'ipatokenotpalgorithm'),
+                u'issuer': u'%s@%s' % (kwargs[u'ipatokenowner'], api.env.realm),
+                u'digits': self.get(u'ipatokenotpdigits'),
+                u'secret': base64.b32encode(self[u'ipatokenotpkey']),
+            }
+            query.update(types.get(self.type, {}))
+            assert_deepequal(
+                {k: (unicode(v),) for k, v in query.items()},
+                urlparse.parse_qs(split.query)
+            )
+        except:
+            cmd('otptoken_del', self[u'ipatokenuniqueid'])
+            raise
+
+        return self
+
+    def __exit__(self, type, value, traceback):
+        cmd('otptoken_del', self[u'ipatokenuniqueid'])
+
+
+def auth(success, login, user, token, pwd=None, at=0):
+    if pwd is None:
+        pwd = user.password
+
+    if isinstance(at, basestring):
+        code = at
+
+    else:
+        code = token.otp(at) if token else '123456'
+        if code is None:  # Skip invalid test offsets.
+            raise unittest.SkipTest()
+
+    if success:
+        login(user.username, pwd + code)
+    else:
+        with assert_raises(AuthenticationError):
+            login(user.username, pwd + code)
+
+
+class TestEnablement(object):
+    # This describes the success conditions for authentication.
+    #   1. If user-auth-type does not include otp: password-only.
+    #   2. Otherwise, if no token exists: password-only.
+    #   3. Otherwise: password AND code.
+    #
+    # All states not accounted for here MUST fail.
+    #
+    # Format: (enabled, token created, pwd given, token given): success
+    EXPECTATIONS = {
+        (False, False, True, False): True,  # Condition 1
+        (False, True,  True, False): True,  # Condition 1
+        (True,  False, True, False): True,  # Condition 2
+        (True,  True,  True, True):  True,  # Condition 3
+    }
+
+    @fixturize(Krb5Login(), User(), (
+            None,
+            Enablement(True),
+            Enablement(False)
+        ), (
+            None,
+            Token()
+        ), (
+            (True,  None, ''),        # Password-only
+            (True,  None, 0),         # Both
+            (False, '', 1),           # Code-only
+            (False, '', ''),          # Neither
+            (False, 'xxx', 2),        # Bad password
+            (False, None, '123456'),  # Bad code
+        )
+    )
+    def test(self, login, user, enablement, token, params):
+        success, pwd, at = params
+
+        if success:
+            success = self.EXPECTATIONS.get((
+                enablement is not None,
+                token is not None,
+                pwd is None,
+                isinstance(at, int)
+            ), False)
+
+        auth(success, login, user, token, pwd, at)
+
+
+class TestTokens(object):
+    TODAY = datetime.now().replace(microsecond=0)
+    TOMORROW = TODAY + timedelta(1)
+    YESTERDAY = TODAY - timedelta(1)
+
+    @fixturize(
+        Krb5Login(),
+        User(ipauserauthtype=(u'otp',)), (
+            Token(type=u'hotp'),
+            Token(type=u'HOTP'),
+            Token(type=u'HOTP', ipatokenhotpcounter=1000),
+
+            Token(type=u'totp'),
+            Token(type=u'TOTP'),
+            Token(type=u'TOTP', ipatokentotptimestep=60),
+            Token(type=u'TOTP', ipatokentotpclockoffset=30000),
+
+            Token(ipatokenotpalgorithm=u'sha1'),
+            Token(ipatokenotpalgorithm=u'sha256'),
+            Token(ipatokenotpalgorithm=u'sha384'),
+            Token(ipatokenotpalgorithm=u'sha512'),
+
+            Token(ipatokenotpdigits=6),
+            Token(ipatokenotpdigits=8),
+
+            Token(ipatokendisabled=False),
+            Token(ipatokennotbefore=YESTERDAY),
+            Token(ipatokennotafter=TOMORROW),
+
+            Token(ipatokendisabled=True),
+            Token(ipatokennotbefore=TOMORROW),
+            Token(ipatokennotafter=YESTERDAY),
+        ), (
+            (False, -1000),  # Check distant past OTP
+            (True,  -2),     # Check past OTP
+            (True,   0),     # Check current OTP
+            (False,  0),     # Check duplicate OTP
+            (True,   1),     # Check next OTP
+            (True,   3),     # Check future OTP
+            (False,  1000),  # Check distant future OTP
+        )
+    )
+    def test(self, login, user, token, params):
+        auth(params[0] and token.active, login, user, token, None, params[1])
+
+
+class TestSync(object):
+    """NOTE: this test requires ca.crt in your confdir so that the client can
+validate the server's certificate."""
+
+    @fixturize(Krb5Login(), User(ipauserauthtype=(u'otp',)), (
+            Token(type=u'hotp'),
+            Token(type=u'totp')
+        ), (
+            (False, True, 0),   # Normal token operation
+            (False, False, 7),  # Skip-ahead failure
+            (True, True, 7),    # Skip-ahead success after sync
+        )
+    )
+    def test(self, login, user, token, params):
+        sync, success, at = params
+
+        if sync:
+            cmd('otptoken_sync',
+                user=user.username,
+                password=user.password,
+                first_code=unicode(token.otp(at)),
+                second_code=unicode(token.otp(at + 1)))
+            at += 2
+
+        auth(success, login, user, token, None, at)
+
+
+if not api.Backend.rpcclient.isconnected():
+    api.Backend.rpcclient.connect(fallback=False)
-- 
2.1.0
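
For anyone who wants to sanity-check the code generation in Token.otp() outside the test suite, here is a rough standalone sketch of the same HMAC-and-truncate steps. It is not part of the patch. It is written for Python 3 (the patch itself targets Python 2), and the function names hotp() and totp() and their parameters are made up for illustration. The time-based variant takes the current time as an argument so that it can be checked against fixed values.

```python
import hashlib
import hmac
import struct


def hotp(key, counter, digits=6, algorithm="sha1"):
    """Return the HOTP code for a bytes key and an integer counter."""
    # HMAC over the counter, packed as a big-endian unsigned 64-bit integer.
    hasher = getattr(hashlib, algorithm)
    digest = hmac.new(key, struct.pack("!Q", counter), hasher).digest()

    # Dynamic truncation: the low nibble of the last byte picks a 4-byte
    # window; the top bit of that window is masked off.
    offset = digest[-1] & 0x0F
    binary = struct.unpack("!I", digest[offset:offset + 4])[0] & 0x7FFFFFFF

    # Keep the requested number of decimal digits, left-padded with zeros.
    return str(binary % (10 ** digits)).rjust(digits, "0")


def totp(key, now, timestep=30, clockoffset=0, steps=0, **kwargs):
    """Return the TOTP code for time `now`, shifted by `steps` intervals."""
    # Same arithmetic as the test: (time + offset + timestep * at) / timestep,
    # reduced to an integer counter before it is packed.
    counter = int(now + clockoffset + timestep * steps) // timestep
    return hotp(key, counter, **kwargs)


if __name__ == "__main__":
    import time

    key = b"12345678901234567890"
    print(hotp(key, 0))
    print(totp(key, time.time()))
```

With the key "12345678901234567890", the published HOTP and TOTP test vectors make a convenient cross-check for any reimplementation like this one.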

_______________________________________________
Freeipa-devel mailing list
Freeipa-devel@redhat.com
https://www.redhat.com/mailman/listinfo/freeipa-devel