Hi Phillip,
I am not sure we really need the mock reactor. Twisted provides very good trial test support for creating local loopbacks between Twisted clients and Twisted servers. What is the issue with running and stopping a reactor? To my knowledge this should not be a problem. Also, if all the unit tests are running in the same process, why would you need to stop and start the reactor?

I have attached my recent submissions to twisted core which include a pop3TestServer, a pop3Client, and a unittest illustrating how to set up local client server communication.


Thoughts?




Phillip J. Eby wrote:

Hi. I've heard recently that there are some tests that ideally would need to run under the Twisted reactor, in order to properly exercise the functionality under test. However, there are a number of issues including the possible need for multiple uses of run/stop, dependency on external servers, test duration, etc.

There is, however, a relatively straightforward solution: use a mock reactor. I've successfully used this approach in the past to test event-driven libraries, although it was only with a subset of the full Twisted reactor capabilities. A mock reactor can be stopped, reset, and started as many times as you like, because it doesn't rely on hooking a GUI event loop, running in a separate thread, or anything like that.

A mock reactor can run in "simulated time", which means that it uses a time() function that runs faster than "real" time. For example, if you schedule a callback, and there's no pending simulated I/O or other scheduled calls, the simulated time jumps ahead to the next scheduled callback.
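
A minimal sketch of the simulated-time idea, written in Python 3 with only the standard library. The SimulatedClock class, its call_later and run methods, and the wake helper are hypothetical names for illustration; they are not Twisted's reactor API or anything from PEAK.

```python
import heapq
import itertools


class SimulatedClock:
    """Hypothetical stand-in for a reactor's timing calls (not Twisted's API)."""

    def __init__(self):
        self.now = 0.0
        self._queue = []                    # heap of (due, sequence, func, args)
        self._counter = itertools.count()   # tie-breaker: FIFO order for equal times

    def time(self):
        return self.now

    def call_later(self, delay, func, *args):
        entry = (self.now + delay, next(self._counter), func, args)
        heapq.heappush(self._queue, entry)

    def run(self):
        """Run until nothing is scheduled, jumping the clock to each due time."""
        while self._queue:
            due, _, func, args = heapq.heappop(self._queue)
            self.now = due                  # jump ahead instead of sleeping
            func(*args)


clock = SimulatedClock()
fired = []


def wake(name, interval, remaining):
    # Record the simulated time of the wakeup, then reschedule if needed.
    fired.append((clock.time(), name))
    if remaining > 1:
        clock.call_later(interval, wake, name, interval, remaining - 1)


clock.call_later(60, wake, "minutely", 60, 3)
clock.call_later(3600, wake, "hourly", 3600, 1)
clock.run()   # returns immediately even though an hour is simulated
print(fired)
```

Because nothing ever sleeps, the recorded times are exact and the run is repeatable, so a test can compare them against the expected schedule directly.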

One additional side benefit of simulated time is that it's deterministic and therefore can be reliably reproduced in repeated tests. In PEAK, for example, I once wrote tests for some components that might be compared to WakeupCallers in Chandler. I had several scheduled to wake up on various intervals, and the test then verified that they had run at all the times they should have. Since the time is simulated, there were no rounding errors or clock precision to take into account, and the tests could run instantaneously whether they were simulating seconds, minutes, or even hours of scheduling operations.

A mock reactor for Chandler tests could also use my "mockets" (fake sockets) library in order to avoid doing any "real" network I/O, allowing servers to listen and clients to connect to addresses on a virtual network that exists only in the process' memory, thereby avoiding the complexity of using external processes to set up and tear down servers or depending on other servers being up and having connectivity to them.
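
To make the "virtual network in memory" idea concrete, here is a rough Python 3 sketch that wires a client and a server object together without any real sockets. It is not the mockets API; MemoryTransport, connect, GreetingServer, and QuittingClient are all hypothetical names, and the protocol objects only imitate the general shape of Twisted protocols.

```python
from collections import deque


class MemoryTransport:
    """Hypothetical in-memory transport: write() queues data for the peer."""

    def __init__(self, pending):
        self.peer = None           # protocol object on the other end
        self._pending = pending    # delivery queue shared by both directions
        self.closed = False

    def write(self, data):
        if not self.closed:
            self._pending.append((self.peer, data))

    def lose_connection(self):
        self.closed = True


def connect(client, server):
    """Join two protocol-like objects and deliver data until both are idle."""
    pending = deque()
    to_server, to_client = MemoryTransport(pending), MemoryTransport(pending)
    to_server.peer, to_client.peer = server, client
    client.transport, server.transport = to_server, to_client
    server.connection_made()
    client.connection_made()
    while pending:
        target, data = pending.popleft()
        target.data_received(data)


class GreetingServer:
    def connection_made(self):
        self.transport.write(b"+OK test server ready\r\n")

    def data_received(self, data):
        if data.strip().upper() == b"QUIT":
            self.transport.write(b"+OK bye\r\n")
            self.transport.lose_connection()


class QuittingClient:
    def __init__(self):
        self.received = []

    def connection_made(self):
        pass

    def data_received(self, data):
        self.received.append(data)
        if len(self.received) == 1:   # greeting arrived, so ask to quit
            self.transport.write(b"QUIT\r\n")


client = QuittingClient()
connect(client, GreetingServer())
print(client.received)
```

The whole exchange happens inside one function call in one process, so there is no server to start, no port to reserve, and no external connectivity to depend on.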

Although I don't have a complete mock reactor implementation, I do have most of the prerequisites and experience that would be needed to implement one, if anybody is interested. So, if you have things (like Zanshin, Chandler client protocols, etc.) that need reactor-based testing, and would be interested in helping me test a mock reactor for your test cases, let me know. It's also possible that this could be a joint project with the Twisted folks; as early as last year, Itamar expressed an interest in allowing test reactors to run using simulated time:

http://twistedmatrix.com/pipermail/twisted-python/2004-January/006982.html

And I would be surprised if they're not interested in having a mocket-based reactor as well. The last hacking I did on Twisted was around 1.1, so it might take me some time to get familiar with the 2.0 reactor interfaces. However, unless there are major differences I don't expect it to be difficult to do; Twisted is designed to isolate code from the underlying transport mechanism in use. About the only "interesting" part would likely be SSL/TLS, since I doubt M2Crypto and OpenSSL will want to talk to mocket objects instead of real sockets. It might be necessary to create mock SSL "Transport" objects as well as a mock reactor.

_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

Open Source Applications Foundation "Dev" mailing list
http://lists.osafoundation.org/mailman/listinfo/dev



--
Brian Kirsch - Email Framework Engineer
Open Source Applications Foundation
543 Howard St. 5th Floor
San Francisco, CA 94105
(415) 946-3056
http://www.osafoundation.org

# -*- test-case-name: twisted.mail.test.test_pop3client -*-
# Copyright (c) 2001-2004 Divmod Inc.
# See LICENSE for details.

"""POP3 client protocol implementation

Don't use this module directly.  Use twisted.mail.pop3 instead.

@author U{Jp Calderone<mailto:[EMAIL PROTECTED]>}

API Stability: Unstable
"""

import re, md5

from twisted.python import log
from twisted.internet import defer
from twisted.protocols import basic
from twisted.protocols import policies
from twisted.internet import error
from twisted.internet import interfaces

OK = '+OK'
ERR = '-ERR'

class POP3ClientError(Exception):
    """Base class for all exceptions raised by POP3Client.
    """

class InsecureAuthenticationDisallowed(POP3ClientError):
    """Secure authentication was required but no mechanism could be found.
    """

class TLSError(POP3ClientError):
    """TLS could not be started because no context factory was available
    or the transport does not support TLS.
    """

class TLSNotSupportedError(POP3ClientError):
    """TLS was requested but the server does not support it.
    """

class OptionNotSupportedError(POP3ClientError):
    """A requested option is not supported.
    """


class ServerErrorResponse(POP3ClientError):
    """The server returned an error response to a request.
    """
    def __init__(self, reason, consumer=None):
        POP3ClientError.__init__(self, reason)
        self.consumer = consumer

class LineTooLong(POP3ClientError):
    """The server sent an extremely long line.
    """

class _ListSetter:
    # Internal helper.  POP3 responses sometimes occur in the
    # form of a list of lines containing two pieces of data,
    # a message index and a value of some sort.  When a message
    # is deleted, it is omitted from these responses.  The
    # setitem method of this class is meant to be called with
    # these two values.  In the cases where indexes are skipped,
    # it takes care of padding out the missing values with None.
    def __init__(self, L):
        self.L = L
    def setitem(self, (item, value)):
        diff = item - len(self.L) + 1
        if diff > 0:
            self.L.extend([None] * diff)
        self.L[item] = value


def _statXform(line):
    # Parse a STAT response
    numMsgs, totalSize = line.split(None, 1)
    return int(numMsgs), int(totalSize)


def _listXform(line):
    # Parse a LIST response
    index, size = line.split(None, 1)
    return int(index) - 1, int(size)


def _uidXform(line):
    # Parse a UIDL response
    index, uid = line.split(None, 1)
    return int(index) - 1, uid

def _codeStatusSplit(line):
    # Parse an +OK or -ERR response
    parts = line.split(' ', 1)
    if len(parts) == 1:
        return parts[0], ''
    return parts

def _dotUnquoter(line):
    """
    '.' characters which begin a line of a message are doubled to avoid
    confusing with the terminating '.\r\n' sequence.  This function unquotes
    them.
    """
    if line.startswith('..'):
        return line[1:]
    return line

class POP3Client(basic.LineOnlyReceiver, policies.TimeoutMixin):
    """POP3 client protocol implementation class

    Instances of this class provide a convenient, efficient API for
    retrieving and deleting messages from a POP3 server.
    """

    # Capabilities are not allowed to change during the session
    # So cache the first response and use that for all later
    # lookups
    _capCache = None

    # Whether STARTTLS has been issued successfully yet or not.
    startedTLS = False

    # Indicate whether login() should be allowed if the server
    # offers no authentication challenge and if our transport
    # does not offer any protection via encryption.
    allowInsecureLogin = False

    # Regular expression to search for in the challenge string in the server
    # greeting line.
    challengeMagicRe = re.compile('(<[^>]+>)')

    # Challenge received from the server
    serverChallenge = None

    # List of pending calls.
    # We are a pipelining API but don't actually
    # support pipelining on the network yet.
    _blockedQueue = None

    # The Deferred to which the very next result will go.
    waiting = None

    # Number of seconds to wait before timing out a connection.
    # If the number is <= 0 no timeout checking will be performed.
    timeout = 0

    # Overrides the LineOnlyReceiver default to allow longer lines.
    MAX_LENGTH = 16384 * 2

    def __init__(self, contextFactory = None):
        self.context = contextFactory
        self.timedOut = False


    def _blocked(self, f, *a):
        # Internal helper.  If commands are being blocked, append
        # the given command and arguments to a list and return a Deferred
        # that will be chained with the return value of the function
        # when it eventually runs.  Otherwise, set up for commands to be
        # blocked and return None.
        if self._blockedQueue is not None:
            d = defer.Deferred()
            self._blockedQueue.append((d, f, a))
            return d
        self._blockedQueue = []
        return None

    def _unblock(self):
        # Internal helper.  Indicate that a function has completed.
        # If there are blocked commands, run the next one.  If there
        # are not, set up for the next command to not be blocked.
        if self._blockedQueue == []:
            self._blockedQueue = None
        elif self._blockedQueue is not None:
            d, f, a = self._blockedQueue.pop(0)

            d2 = f(*a)
            d2.chainDeferred(d)

    def sendShort(self, cmd, args):
        # Internal helper.  Send a command to which a short response
        # is expected.  Return a Deferred that fires when the response
        # is received.  Block all further commands from being sent until
        # the response is received.  Transition the state to SHORT.
        d = self._blocked(self.sendShort, cmd, args)
        if d is not None:
            return d

        if args:
            self.sendLine(cmd + ' ' + args)
        else:
            self.sendLine(cmd)
        self.state = 'SHORT'
        self.waiting = defer.Deferred()
        return self.waiting

    def sendLong(self, cmd, args, consumer, xform):
        # Internal helper.  Send a command to which a multiline
        # response is expected.  Return a Deferred that fires when
        # the entire response is received.  Block all further commands
        # from being sent until the entire response is received.
        # Transition the state to LONG_INITIAL.
        d = self._blocked(self.sendLong, cmd, args, consumer, xform)
        if d is not None:
            return d

        if args:
            self.sendLine(cmd + ' ' + args)
        else:
            self.sendLine(cmd)
        self.state = 'LONG_INITIAL'
        self.xform = xform
        self.consumer = consumer
        self.waiting = defer.Deferred()
        return self.waiting

    # Twisted protocol callback
    def connectionMade(self):
        if self.timeout > 0:
            self.setTimeout(self.timeout)

        self.state = 'WELCOME'

    def timeoutConnection(self):
        self.timedOut = True
        self.transport.loseConnection()

    def connectionLost(self, reason):
        if self.timeout > 0:
            self.setTimeout(None)

        if self.timedOut:
            reason = error.TimeoutError()
            self.timedOut = False

        d = []
        if self.waiting is not None:
            d.append(self.waiting)
            self.waiting = None
        if self._blockedQueue is not None:
            d.extend([deferred for (deferred, f, a) in self._blockedQueue])
            self._blockedQueue = None
        for w in d:
            w.errback(reason)

    def lineReceived(self, line):
        if self.timeout > 0:
            self.resetTimeout()

        state = self.state
        self.state = None
        state = getattr(self, 'state_' + state)(line) or state
        if self.state is None:
            self.state = state

    def lineLengthExceeded(self, buffer):
        # XXX - We need to be smarter about this
        if self.waiting is not None:
            waiting, self.waiting = self.waiting, None
            waiting.errback(LineTooLong())
        self.transport.loseConnection()

    # POP3 Client state logic - don't touch this.
    def state_WELCOME(self, line):
        # WELCOME is the first state.  The server sends one line of text
        # greeting us, possibly with an APOP challenge.  Transition the
        # state to WAITING.
        code, status = _codeStatusSplit(line)
        if code != OK:
            #XXX: Should raise some kind of error here
            self.transport.loseConnection()
        else:
            m = self.challengeMagicRe.search(status)

            if m is not None:
                self.serverChallenge = m.group(1)

            self.serverGreeting(self.serverChallenge)

        return 'WAITING'

    def state_WAITING(self, line):
        # The server isn't supposed to send us anything in this state.
        log.msg("Illegal line from server: " + repr(line))

    def state_SHORT(self, line):
        # This is the state we are in when waiting for a single
        # line response.  Parse it and fire the appropriate callback
        # or errback.  Transition the state back to WAITING.
        deferred, self.waiting = self.waiting, None
        self._unblock()
        code, status = _codeStatusSplit(line)
        if code == OK:
            deferred.callback(status)
        else:
            deferred.errback(ServerErrorResponse(status))
        return 'WAITING'

    def state_LONG_INITIAL(self, line):
        # This is the state we are in when waiting for the first
        # line of a long response.  Parse it and transition the
        # state to LONG if it is an okay response; if it is an
        # error response, fire an errback, clean up the things
        # waiting for a long response, and transition the state
        # to WAITING.
        code, status = _codeStatusSplit(line)
        if code == OK:
            return 'LONG'
        consumer = self.consumer
        deferred = self.waiting
        self.consumer = self.waiting = self.xform = None
        self._unblock()
        deferred.errback(ServerErrorResponse(status, consumer))
        return 'WAITING'

    def state_LONG(self, line):
        # This is the state for each line of a long response.
        # If it is the last line, finish things, fire the
        # Deferred, and transition the state to WAITING.
        # Otherwise, pass the line to the consumer.
        if line == '.':
            consumer = self.consumer
            deferred = self.waiting
            self.consumer = self.waiting = self.xform = None
            self._unblock()
            deferred.callback(consumer)
            return 'WAITING'
        else:
            if self.xform is not None:
                self.consumer(self.xform(line))
            else:
                self.consumer(line)
            return 'LONG'

    def serverGreeting(self, challenge):
        """Called when the server has sent us a greeting.

        @type challenge: C{str} or C{None}
        @param challenge: The challenge extracted from the server greeting,
        or None if the greeting contained no challenge.  A POP3 server which
        implements the APOP command will include a timestamp challenge in
        its banner greeting (RFC 1939).
        """

    def startTLS(self, contextFactory=None):
        """
        Initiates a 'STLS' request and negotiates the TLS / SSL
        Handshake.

        @param contextFactory: The TLS / SSL Context Factory to
        leverage.  If the contextFactory is None the POP3Client will
        either use the current TLS / SSL Context Factory or attempt to
        create a new one.

        @type contextFactory: C{ssl.ClientContextFactory}

        @return: A Deferred which fires when the transport has been
        secured according to the given contextFactory, or which fails
        if the transport cannot be secured.
        """

        if self._capCache is None:
            d = self.capabilities()

        else:
            d = defer.succeed(self._capCache)

        d.addCallback(self._startTLS, contextFactory)
        return d


    def _startTLS(self, caps, contextFactory):
        assert not self.startedTLS, (
            "Client and Server are currently communicating via TLS")

        if contextFactory is None:
            contextFactory = self._getContextFactory()

        if contextFactory is None:
            return defer.fail(TLSError(
                "POP3Client requires a TLS context to "
                "initiate the STARTTLS handshake"))

        if 'STLS' not in caps:
            return defer.fail(TLSNotSupportedError(
                "Server does not support secure communication "
                "via TLS / SSL"))

        tls = interfaces.ITLSTransport(self.transport, None)

        if tls is None:
            return defer.fail(TLSError(
                "POP3Client transport does not implement "
                "interfaces.ITLSTransport"))

        d = self.sendShort('STLS', None)
        d.addCallback(self._startedTLS, contextFactory)
        d.addCallback(lambda _: self.capabilities())
        return d

    def _startedTLS(self, result, context):
        self.transport.startTLS(context)
        self._capCache = None
        self.startedTLS = True
        self.context = context
        return result

    def _getContextFactory(self):
        if self.context is not None:
            return self.context
        try:
            from twisted.internet import ssl
        except ImportError:
            return None
        else:
            context = ssl.ClientContextFactory()
            context.method = ssl.SSL.TLSv1_METHOD
            return context

    # External hooks - call these (most of 'em anyway)
    def login(self, username, password):
        """Log into the server.

        If APOP is available it will be used.  Otherwise, if
        TLS is available a 'STLS' session will be started and
        plaintext login will proceed.  Otherwise, if the
        instance attribute allowInsecureLogin is set to True,
        insecure plaintext login will proceed.  Otherwise,
        InsecureAuthenticationDisallowed will be raised
        (asynchronously).

        @param username: The username with which to log in.
        @param password: The password with which to log in.

        @rtype: C{Deferred}
        @return: A deferred which fires when login has
        completed.
        """
        if self._capCache is None:
            d = self.capabilities()

        else:
            d = defer.succeed(self._capCache)

        d.addCallback(self._login, username, password)
        return d

    def _login(self, caps, username, password):
        if self.serverChallenge is not None:
            return self._apop(username, password, self.serverChallenge)

        tryTLS = 'STLS' in caps

        # If our transport supports switching to TLS, we might want to
        # try to switch to TLS.
        tlsableTransport = interfaces.ITLSTransport(
            self.transport, default=None) is not None

        # If our transport is not already using TLS, we might want to
        # try to switch to TLS.
        nontlsTransport = interfaces.ISSLTransport(
            self.transport, default=None) is None

        if (not self.startedTLS and tryTLS and tlsableTransport
                and nontlsTransport):
            d = self.startTLS()

            d.addCallback(self._loginTLS, username, password)
            return d

        elif self.startedTLS or self.allowInsecureLogin:
            return self._plaintext(username, password)
        else:
            return defer.fail(InsecureAuthenticationDisallowed())

    def _loginTLS(self, res, username, password):
        return self._plaintext(username, password)

    def _plaintext(self, username, password):
        # Internal helper.  Send a username/password pair, returning a Deferred
        # that fires when both have succeeded or fails when the server rejects
        # either.
        return self.user(username).addCallback(
            lambda r: self.password(password))

    def _apop(self, username, password, challenge):
        # Internal helper.  Computes and sends an APOP response.  Returns
        # a Deferred that fires when the server responds to the response.
        digest = md5.new(challenge + password).hexdigest()
        return self.apop(username, digest)

    def apop(self, username, digest):
        """Perform APOP login.

        This should be used in special circumstances only, when it is
        known that the server supports APOP authentication, and APOP
        authentication is absolutely required.  For the common case,
        use L{login} instead.

        @param username: The username with which to log in.
        @param digest: The challenge response to authenticate with.
        """
        return self.sendShort('APOP', username + ' ' + digest)

    def user(self, username):
        """Send the user command.

        This performs the first half of plaintext login.  Unless this
        is absolutely required, use the L{login} method instead.

        @param username: The username with which to log in.
        """
        return self.sendShort('USER', username)

    def password(self, password):
        """Send the password command.

        This performs the second half of plaintext login.  Unless this
        is absolutely required, use the L{login} method instead.

        @param password: The plaintext password with which to authenticate.
        """
        return self.sendShort('PASS', password)

    def delete(self, index):
        """Delete a message from the server.

        @type index: C{int}
        @param index: The index of the message to delete.
        This is 0-based.

        @rtype: C{Deferred}
        @return: A deferred which fires when the delete command
        is successful, or fails if the server returns an error.
        """
        return self.sendShort('DELE', str(index + 1))

    def _consumeOrSetItem(self, cmd, args, consumer, xform):
        # Internal helper.  Send a long command.  If no consumer is
        # provided, create a consumer that puts results into a list
        # and return a Deferred that fires with that list when it
        # is complete.
        if consumer is None:
            L = []
            consumer = _ListSetter(L).setitem
            return self.sendLong(cmd, args, consumer, xform).addCallback(
                lambda r: L)
        return self.sendLong(cmd, args, consumer, xform)

    def _consumeOrAppend(self, cmd, args, consumer, xform):
        # Internal helper.  Send a long command.  If no consumer is
        # provided, create a consumer that appends results to a list
        # and return a Deferred that fires with that list when it is
        # complete.
        if consumer is None:
            L = []
            consumer = L.append
            return self.sendLong(cmd, args, consumer, xform).addCallback(
                lambda r: L)
        return self.sendLong(cmd, args, consumer, xform)

    def capabilities(self, useCache=1):
        """Retrieve the capabilities supported by this server.
        """
        if useCache and self._capCache is not None:
            return defer.succeed(self._capCache)

        #Reset the Capabilities Cache
        self._capCache = {}

        d = self._consumeOrAppend('CAPA', None, self._capsConsumer, None)
        # The CAPA command is not supported by some POP servers.  If an
        # error occurs we still want the same behavior.
        d.addBoth(self._cbCapabilities)
        return d

    def _cbCapabilities(self, result):
        """Returns the Capabilities to the caller"""
        return self._capCache


    def _capsConsumer(self, line):
        tmp = line.split()

        size = len(tmp)

        if size == 0:
            return

        if size == 1:
            self._capCache[tmp[0]] = None
        else:
            self._capCache[tmp[0]] = tmp[1:]

    def noop(self):
        return self.sendShort("NOOP", None)

    def rset(self):
        return self.sendShort("RSET", None)

    def retrieve(self, index, consumer=None, lines=None):
        """Retrieve a message from the server.

        If L{consumer} is not None, it will be called with
        each line of the message as it is received.  Otherwise,
        the returned Deferred will be fired with a list of all
        the lines when the message has been completely received.
        """
        idx = str(index + 1)
        if lines is None:
            return self._consumeOrAppend('RETR', idx, consumer, _dotUnquoter)

        return self._consumeOrAppend(
            'TOP', '%s %d' % (idx, lines), consumer, _dotUnquoter)


    def stat(self):
        """Issue a 'STAT' request, which is allowed in the TRANSACTION
        state (RFC 1939).

        The returned Deferred will be fired with a tuple containing the
        number of messages in the maildrop and the size of the maildrop
        in octets.
        """
        return self.sendShort('STAT', None).addCallback(_statXform)

    def listSize(self, consumer=None):
        """Retrieve a list of the size of all messages on the server.

        If L{consumer} is not None, it will be called with two-tuples
        of message index number and message size as they are received.
        Otherwise, a Deferred which will fire with a list of B{only}
        message sizes will be returned.  For messages which have been
        deleted, None will be used in place of the message size.
        """
        return self._consumeOrSetItem('LIST', None, consumer, _listXform)

    def listUID(self, consumer=None):
        """Retrieve a list of the UIDs of all messages on the server.

        If L{consumer} is not None, it will be called with two-tuples
        of message index number and message UID as they are received.
        Otherwise, a Deferred which will fire with a list of B{only}
        message UIDs will be returned.  For messages which have been
        deleted, None will be used in place of the message UID.
        """

        return self._consumeOrSetItem('UIDL', None, consumer, _uidXform)

    def quit(self):
        """Disconnect from the server.
        """
        return self.sendShort('QUIT', None)

__all__ = [
    # Exceptions
    'InsecureAuthenticationDisallowed', 'LineTooLong', 'POP3ClientError',
    'ServerErrorResponse', 'TLSError', 'TLSNotSupportedError',
    'OptionNotSupportedError',

    # Protocol classes
    'POP3Client']
#!/usr/local/bin/python
from twisted.internet.protocol import Factory
from twisted.protocols import basic
from twisted.internet import reactor
import sys

USER = "test"
PASS = "twisted"

PORT = 1100

SSL_SUPPORT = True
UIDL_SUPPORT = True
INVALID_SERVER_RESPONSE = False
INVALID_CAPABILITY_RESPONSE = False
INVALID_LOGIN_RESPONSE = False
DENY_CONNECTION = False
DROP_CONNECTION = False
BAD_TLS_RESPONSE = False
TIMEOUT_RESPONSE = False
TIMEOUT_DEFERRED = False
SLOW_GREETING = False

"""Commands"""
CONNECTION_MADE = "+OK POP3 localhost v2003.83 server ready" 

CAPABILITIES = [
"TOP",
"LOGIN-DELAY 180",
"USER",
"SASL LOGIN"
]

CAPABILITIES_SSL = "STLS"
CAPABILITIES_UIDL = "UIDL"

 
INVALID_RESPONSE = "-ERR Unknown request"
VALID_RESPONSE = "+OK Command Completed"
AUTH_DECLINED = "-ERR LOGIN failed"
AUTH_ACCEPTED = "+OK Mailbox open, 0 messages"
TLS_ERROR = "-ERR server side error start TLS handshake"
LOGOUT_COMPLETE = "+OK quit completed"
NOT_LOGGED_IN = "-ERR Unknown AUTHORIZATION state command"
STAT = "+OK 0 0"
UIDL = "+OK Unique-ID listing follows\r\n."
LIST = "+OK Mailbox scan listing follows\r\n."
CAP_START = "+OK Capability list follows:"


class POP3TestServer(basic.LineReceiver):
    def __init__(self, contextFactory = None):
        self.loggedIn = False
        self.caps = None
        self.tmpUser = None
        self.ctx = contextFactory 

    def sendSTATResp(self, req):
        self.sendLine(STAT)

    def sendUIDLResp(self, req):
        self.sendLine(UIDL)

    def sendLISTResp(self, req):
        self.sendLine(LIST)

    def sendCapabilities(self):
        # Rebuild the list on every request so that a repeated CAPA
        # command does not append duplicate entries.
        self.caps = [CAP_START]

        if UIDL_SUPPORT:
            self.caps.append(CAPABILITIES_UIDL)

        if SSL_SUPPORT:
            self.caps.append(CAPABILITIES_SSL)

        for cap in CAPABILITIES:
            self.caps.append(cap)
        resp = '\r\n'.join(self.caps)
        resp += "\r\n."

        self.sendLine(resp)


    def connectionMade(self):
        if DENY_CONNECTION:
            self.transport.loseConnection()
            return

        if SLOW_GREETING:
            reactor.callLater(20, self.sendGreeting)

        else:
            self.sendGreeting()

    def sendGreeting(self):
        self.sendLine(CONNECTION_MADE)

    def lineReceived(self, line):
        # Error conditions
        if TIMEOUT_RESPONSE:
            # Do not respond to the client's request.
            return

        if DROP_CONNECTION:
            self.transport.loseConnection()
            return

        elif "CAPA" in line.upper():
            if INVALID_CAPABILITY_RESPONSE:
                self.sendLine(INVALID_RESPONSE)
            else:
                self.sendCapabilities()

        elif "STLS" in line.upper() and SSL_SUPPORT:
            self.startTLS()

        elif "USER" in line.upper():
            if INVALID_LOGIN_RESPONSE:
                self.sendLine(INVALID_RESPONSE)
                return

            resp = None
            try:
                self.tmpUser = line.split(" ")[1]
                resp = VALID_RESPONSE
            except:
                resp = AUTH_DECLINED

            self.sendLine(resp)

        elif "PASS" in line.upper():
            resp = None
            try:
                pwd = line.split(" ")[1]

                if self.tmpUser is None or pwd is None:
                    resp = AUTH_DECLINED
                elif self.tmpUser == USER and pwd == PASS:
                    resp = AUTH_ACCEPTED
                    self.loggedIn = True
                else:
                    resp = AUTH_DECLINED
            except:
                resp = AUTH_DECLINED

            self.sendLine(resp)

        elif "QUIT" in line.upper():
            self.loggedIn = False
            self.sendLine(LOGOUT_COMPLETE)
            self.disconnect()

        elif INVALID_SERVER_RESPONSE:
            self.sendLine(INVALID_RESPONSE)

        elif not self.loggedIn:
            self.sendLine(NOT_LOGGED_IN)

        elif "NOOP" in line.upper():
            self.sendLine(VALID_RESPONSE)

        elif "STAT" in line.upper():
            if TIMEOUT_DEFERRED:
                return
            self.sendLine(STAT)

        elif "LIST" in line.upper():
            if TIMEOUT_DEFERRED:
                return
            self.sendLine(LIST)

        elif "UIDL" in line.upper():
            if TIMEOUT_DEFERRED:
                return
            elif not UIDL_SUPPORT:
                self.sendLine(INVALID_RESPONSE)
                return

            self.sendLine(UIDL)

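    def startTLS(self):
        if BAD_TLS_RESPONSE:
            # Simulate a server-side failure of the STLS request.
            self.sendLine(TLS_ERROR)
            return
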
        if self.ctx is None:
            self.getContext()

        if SSL_SUPPORT and self.ctx is not None:
            self.sendLine('+OK Begin TLS negotiation now')
            self.transport.startTLS(self.ctx)
        else:
            self.sendLine('+OK TLS not available')

    def disconnect(self):
        self.transport.loseConnection()

    def getContext(self):
        try:
            from twisted.internet import ssl
        except ImportError:
            self.ctx = None
        else:
            self.ctx = ssl.ClientContextFactory()
            self.ctx.method = ssl.SSL.TLSv1_METHOD


usage = """popServer.py [arg] (default is Standard POP Server with no messages)
no_ssl  - Start with no SSL support
no_uidl - Start with no UIDL support
bad_resp - Send a non-RFC compliant response to the Client
bad_cap_resp - Send a non-RFC compliant response when the Client sends a
               'CAPA' request
bad_login_resp - Send a non-RFC compliant response when the Client sends a
                 'LOGIN' request
deny - Deny the connection
drop - Drop the connection after sending the greeting
bad_tls - Send a bad response to a STARTTLS
timeout - Do not return a response to a Client request
to_deferred - Do not return a response to a STAT, LIST, or UIDL request.
              This will test Deferred callback handling
slow - Wait 20 seconds after the connection is made to return a Server Greeting
"""

def printMessage(msg):
    print "Server Starting in %s mode" % msg

def processArg(arg):

    if arg.lower() == 'no_ssl':
        global SSL_SUPPORT
        SSL_SUPPORT = False
        printMessage("NON-SSL")

    elif arg.lower() == 'no_uidl':
        global UIDL_SUPPORT
        UIDL_SUPPORT = False
        printMessage("NON-UIDL")

    elif arg.lower() == 'bad_resp':
        global INVALID_SERVER_RESPONSE
        INVALID_SERVER_RESPONSE = True
        printMessage("Invalid Server Response")

    elif arg.lower() == 'bad_cap_resp':
        global INVALID_CAPABILITY_RESPONSE
        INVALID_CAPABILITY_RESPONSE = True
        printMessage("Invalid Capability Response")

    elif arg.lower() == 'bad_login_resp':
        global INVALID_LOGIN_RESPONSE
        INVALID_LOGIN_RESPONSE = True
        printMessage("Invalid Login Response")

    elif arg.lower() == 'deny':
        global DENY_CONNECTION 
        DENY_CONNECTION = True
        printMessage("Deny Connection")

    elif arg.lower() == 'drop':
        global DROP_CONNECTION 
        DROP_CONNECTION = True
        printMessage("Drop Connection")


    elif arg.lower() == 'bad_tls':
        global BAD_TLS_RESPONSE 
        BAD_TLS_RESPONSE = True
        printMessage("Bad TLS Response")

    elif arg.lower() == 'timeout':
        global TIMEOUT_RESPONSE
        TIMEOUT_RESPONSE = True
        printMessage("Timeout Response")

    elif arg.lower() == 'to_deferred':
        global TIMEOUT_DEFERRED
        TIMEOUT_DEFERRED = True
        printMessage("Timeout Deferred Response")

    elif arg.lower() == 'slow':
        global SLOW_GREETING
        SLOW_GREETING = True
        printMessage("Slow Greeting")

    elif arg.lower() == '--help':
        print usage
        sys.exit()

    else:
        print usage
        sys.exit()

def main():

    if len(sys.argv) < 2:
        printMessage("POP3 with no messages")
    else:
        args = sys.argv[1:]

        for arg in args:
            processArg(arg)

    f = Factory()
    f.protocol = POP3TestServer
    reactor.listenTCP(PORT, f)
    reactor.run()

if __name__ == '__main__':
    main()
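
For what it's worth, the long if/elif chain in processArg could also be written table-driven. This is only a minimal sketch covering a few of the modes: MODES, apply_mode, and the flags dict are hypothetical names for illustration and are not part of the attached server, which sets module-level globals instead.

```python
# Hypothetical table-driven variant of processArg (illustration only).
# Each mode name maps to (flag name, value to set, label to print).
MODES = {
    'no_ssl': ('SSL_SUPPORT', False, 'NON-SSL'),
    'no_uidl': ('UIDL_SUPPORT', False, 'NON-UIDL'),
    'timeout': ('TIMEOUT_RESPONSE', True, 'Timeout Response'),
    'to_deferred': ('TIMEOUT_DEFERRED', True, 'Timeout Deferred Response'),
}


def apply_mode(arg, flags):
    """Set the flag selected by arg in the flags dict.

    Returns the label for the mode, or None if the mode is unknown
    (the caller would then print the usage text and exit).
    """
    entry = MODES.get(arg.lower())
    if entry is None:
        return None
    name, value, label = entry
    flags[name] = value
    return label
```

With this shape, adding a mode means adding one table row rather than another elif branch, and the label can no longer drift out of sync with the flag it describes.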
# -*- test-case-name: twisted.mail.test.test_pop3client -*-
# Copyright (c) 2001-2004 Divmod Inc.
# See LICENSE for details.

from twisted.mail.pop3 import AdvancedPOP3Client as POP3Client
from twisted.mail.pop3 import InsecureAuthenticationDisallowed
from twisted.mail.pop3 import ServerErrorResponse
from twisted.protocols import loopback
from twisted.internet import defer, error

from twisted.trial import unittest
from twisted.test.proto_helpers import StringTransport
from twisted.protocols import basic
import pop3TestServer

try:
    from twisted.test.ssl_helpers import ClientTLSContext, ServerTLSContext
except ImportError:
    ClientTLSContext = ServerTLSContext = None

capCache = {"TOP": None, "LOGIN-DELAY": "180", "UIDL": None, \
            "STLS": None, "USER": None, "SASL": "LOGIN"}
def setUp():
    p = POP3Client()

    p._capCache = capCache

    t = StringTransport()
    p.makeConnection(t)
    return p, t

def strip(f):
    return lambda result, f=f: f()

class POP3ClientLoginTestCase(unittest.TestCase):
    def testOkUser(self):
        p, t = setUp()
        d = p.user("username")
        self.assertEquals(t.value(), "USER username\r\n")
        p.dataReceived("+OK send password\r\n")
        return d.addCallback(unittest.assertEqual, "send password")

    def testBadUser(self):
        p, t = setUp()
        d = p.user("username")
        self.assertEquals(t.value(), "USER username\r\n")
        p.dataReceived("-ERR account suspended\r\n")
        exc = self.assertRaises(ServerErrorResponse, unittest.wait, d)
        self.assertEquals(exc.args[0], "account suspended")

    def testOkPass(self):
        p, t = setUp()
        d = p.password("password")
        self.assertEquals(t.value(), "PASS password\r\n")
        p.dataReceived("+OK you're in!\r\n")
        return d.addCallback(unittest.assertEqual, "you're in!")

    def testBadPass(self):
        p, t = setUp()
        d = p.password("password")
        self.assertEquals(t.value(), "PASS password\r\n")
        p.dataReceived("-ERR go away\r\n")
        exc = self.assertRaises(ServerErrorResponse, unittest.wait, d)
        self.assertEquals(exc.args[0], "go away")

    def testOkLogin(self):
        p, t = setUp()
        p.allowInsecureLogin = True
        d = p.login("username", "password")
        self.assertEquals(t.value(), "USER username\r\n")
        p.dataReceived("+OK go ahead\r\n")
        self.assertEquals(t.value(), "USER username\r\nPASS password\r\n")
        p.dataReceived("+OK password accepted\r\n")
        return d.addCallback(unittest.assertEqual, "password accepted")

    def testBadPasswordLogin(self):
        p, t = setUp()
        p.allowInsecureLogin = True
        d = p.login("username", "password")
        self.assertEquals(t.value(), "USER username\r\n")
        p.dataReceived("+OK waiting on you\r\n")
        self.assertEquals(t.value(), "USER username\r\nPASS password\r\n")
        p.dataReceived("-ERR bogus login\r\n")
        exc = self.assertRaises(ServerErrorResponse, unittest.wait, d)
        self.assertEquals(exc.args[0], "bogus login")

    def testBadUsernameLogin(self):
        p, t = setUp()
        p.allowInsecureLogin = True
        d = p.login("username", "password")
        self.assertEquals(t.value(), "USER username\r\n")
        p.dataReceived("-ERR bogus login\r\n")
        exc = self.assertRaises(ServerErrorResponse, unittest.wait, d)
        self.assertEquals(exc.args[0], "bogus login")

    def testServerGreeting(self):
        p, t = setUp()
        # Make sure it *isn't* in the instance dict, just for sanity
        self.failIfIn('serverChallenge', vars(p))
        p.dataReceived("+OK lalala this has no challenge\r\n")
        # Make sure it *is* in the instance dict and that it is None
        self.assertEquals(p.serverChallenge, None)

    def testServerGreetingWithChallenge(self):
        p, t = setUp()
        # Make sure it *isn't* in the instance dict, just for sanity
        self.failIfIn('serverChallenge', vars(p))
        p.dataReceived("+OK <here is the challenge>\r\n")
        # Make sure it *is* in the instance dict and is what we sent
        self.assertEquals(vars(p)['serverChallenge'], "<here is the challenge>")

    def testAPOP(self):
        p, t = setUp()
        p.dataReceived("+OK <challenge string goes here>\r\n")
        d = p.login("username", "password")
        self.assertEquals(
            t.value(),
            "APOP username f34f1e464d0d7927607753129cabe39a\r\n")
        p.dataReceived("+OK Welcome!\r\n")
        return d.addCallback(unittest.assertEqual, "Welcome!")

    def testInsecureLoginRaisesException(self):
        p, t = setUp()
        p.dataReceived("+OK Howdy\r\n")
        d = p.login("username", "password")
        self.failIf(t.value())
        self.assertRaises(InsecureAuthenticationDisallowed, unittest.wait, d)

class ListConsumer:
    def __init__(self):
        self.data = {}

    def consume(self, (item, value)):
        self.data.setdefault(item, []).append(value)

class MessageConsumer:
    def __init__(self):
        self.data = []

    def consume(self, line):
        self.data.append(line)

class POP3ClientListTestCase(unittest.TestCase):
    def testListSize(self):
        p, t = setUp()
        d = p.listSize()
        self.assertEquals(t.value(), "LIST\r\n")
        p.dataReceived("+OK Here it comes\r\n")
        p.dataReceived("1 3\r\n2 2\r\n3 1\r\n.\r\n")
        return d.addCallback(unittest.assertEqual, [3, 2, 1])

    def testListSizeWithConsumer(self):
        p, t = setUp()
        c = ListConsumer()
        f = c.consume
        d = p.listSize(f)
        self.assertEquals(t.value(), "LIST\r\n")
        p.dataReceived("+OK Here it comes\r\n")
        p.dataReceived("1 3\r\n2 2\r\n3 1\r\n")
        self.assertEquals(c.data, {0: [3], 1: [2], 2: [1]})
        p.dataReceived("5 3\r\n6 2\r\n7 1\r\n")
        self.assertEquals(c.data, {0: [3], 1: [2], 2: [1],
                                   4: [3], 5: [2], 6: [1]})
        p.dataReceived(".\r\n")
        return d.addCallback(unittest.assertIdentical, f)

    def testFailedListSize(self):
        p, t = setUp()
        d = p.listSize()
        self.assertEquals(t.value(), "LIST\r\n")
        p.dataReceived("-ERR Fatal doom server exploded\r\n")
        exc = self.assertRaises(ServerErrorResponse, unittest.wait, d)
        self.assertEquals(exc.args[0], "Fatal doom server exploded")

    def testListUID(self):
        p, t = setUp()
        d = p.listUID()
        self.assertEquals(t.value(), "UIDL\r\n")
        p.dataReceived("+OK Here it comes\r\n")
        p.dataReceived("1 abc\r\n2 def\r\n3 ghi\r\n.\r\n")
        return d.addCallback(unittest.assertEqual, ["abc", "def", "ghi"])

    def testListUIDWithConsumer(self):
        p, t = setUp()
        c = ListConsumer()
        f = c.consume
        d = p.listUID(f)
        self.assertEquals(t.value(), "UIDL\r\n")
        p.dataReceived("+OK Here it comes\r\n")
        p.dataReceived("1 xyz\r\n2 abc\r\n5 mno\r\n")
        self.assertEquals(c.data, {0: ["xyz"], 1: ["abc"], 4: ["mno"]})
        p.dataReceived(".\r\n")
        return d.addCallback(unittest.assertIdentical, f)

    def testFailedListUID(self):
        p, t = setUp()
        d = p.listUID()
        self.assertEquals(t.value(), "UIDL\r\n")
        p.dataReceived("-ERR Fatal doom server exploded\r\n")
        exc = self.assertRaises(ServerErrorResponse, unittest.wait, d)
        self.assertEquals(exc.args[0], "Fatal doom server exploded")

class POP3ClientMessageTestCase(unittest.TestCase):
    def testRetrieve(self):
        p, t = setUp()
        d = p.retrieve(7)
        self.assertEquals(t.value(), "RETR 8\r\n")
        p.dataReceived("+OK Message incoming\r\n")
        p.dataReceived("La la la here is message text\r\n")
        p.dataReceived("..Further message text tra la la\r\n")
        p.dataReceived(".\r\n")
        return d.addCallback(
            unittest.assertEqual, 
            ["La la la here is message text",
             ".Further message text tra la la"])

    def testRetrieveWithConsumer(self):
        p, t = setUp()
        c = MessageConsumer()
        f = c.consume
        d = p.retrieve(7, f)
        self.assertEquals(t.value(), "RETR 8\r\n")
        p.dataReceived("+OK Message incoming\r\n")
        p.dataReceived("La la la here is message text\r\n")
        p.dataReceived("..Further message text\r\n.\r\n")
        self.assertIdentical(unittest.wait(d), f)
        self.assertEquals(c.data, ["La la la here is message text",
                                   ".Further message text"])

    def testPartialRetrieve(self):
        p, t = setUp()
        d = p.retrieve(7, lines=2)
        self.assertEquals(t.value(), "TOP 8 2\r\n")
        p.dataReceived("+OK 2 lines on the way\r\n")
        p.dataReceived("Line the first!  Woop\r\n")
        p.dataReceived("Line the last!  Bye\r\n")
        p.dataReceived(".\r\n")
        return d.addCallback(
            unittest.assertEqual,
            ["Line the first!  Woop",
             "Line the last!  Bye"])

    def testPartialRetrieveWithConsumer(self):
        p, t = setUp()
        c = MessageConsumer()
        f = c.consume
        d = p.retrieve(7, f, lines=2)
        self.assertEquals(t.value(), "TOP 8 2\r\n")
        p.dataReceived("+OK 2 lines on the way\r\n")
        p.dataReceived("Line the first!  Woop\r\n")
        p.dataReceived("Line the last!  Bye\r\n")
        p.dataReceived(".\r\n")
        self.assertIdentical(unittest.wait(d), f)
        self.assertEquals(c.data, ["Line the first!  Woop",
                                   "Line the last!  Bye"])

    def testFailedRetrieve(self):
        p, t = setUp()
        d = p.retrieve(0)
        self.assertEquals(t.value(), "RETR 1\r\n")
        p.dataReceived("-ERR Fatal doom server exploded\r\n")
        exc = self.assertRaises(ServerErrorResponse, unittest.wait, d)
        self.assertEquals(exc.args[0], "Fatal doom server exploded")

class POP3ClientMiscTestCase(unittest.TestCase):
    def testCapability(self):
        p, t = setUp()
        d = p.capabilities(useCache=0)
        self.assertEquals(t.value(), "CAPA\r\n")
        p.dataReceived("+OK Capabilities on the way\r\n")
        p.dataReceived("X\r\nY\r\nZ\r\n.\r\n")
        return d.addCallback(unittest.assertEqual,
                             {"X": None, "Y": None, "Z": None})

    def testCapabilityError(self):
        p, t = setUp()
        d = p.capabilities(useCache=0)
        self.assertEquals(t.value(), "CAPA\r\n")
        p.dataReceived("-ERR This server is lame!\r\n")
        exc = self.assertRaises(ServerErrorResponse, unittest.wait, d)
        self.assertEquals(exc.args[0], "This server is lame!")

    def testStat(self):
        p, t = setUp()
        d = p.stat()
        self.assertEquals(t.value(), "STAT\r\n")
        p.dataReceived("+OK 1 1212\r\n")
        return d.addCallback(unittest.assertEqual, (1, 1212))

    def testStatError(self):
        p, t = setUp()
        d = p.stat()
        self.assertEquals(t.value(), "STAT\r\n")
        p.dataReceived("-ERR This server is lame!\r\n")
        exc = self.assertRaises(ServerErrorResponse, unittest.wait, d)
        self.assertEquals(exc.args[0], "This server is lame!")

    def testNoop(self):
        p, t = setUp()
        d = p.noop()
        self.assertEquals(t.value(), "NOOP\r\n")
        p.dataReceived("+OK No-op to you too!\r\n")
        return d.addCallback(unittest.assertEqual, "No-op to you too!")

    def testNoopError(self):
        p, t = setUp()
        d = p.noop()
        self.assertEquals(t.value(), "NOOP\r\n")
        p.dataReceived("-ERR This server is lame!\r\n")
        exc = self.assertRaises(ServerErrorResponse, unittest.wait, d)
        self.assertEquals(exc.args[0], "This server is lame!")

    def testRset(self):
        p, t = setUp()
        d = p.rset()
        self.assertEquals(t.value(), "RSET\r\n")
        p.dataReceived("+OK Reset state\r\n")
        return d.addCallback(unittest.assertEqual, "Reset state")

    def testRsetError(self):
        p, t = setUp()
        d = p.rset()
        self.assertEquals(t.value(), "RSET\r\n")
        p.dataReceived("-ERR This server is lame!\r\n")
        exc = self.assertRaises(ServerErrorResponse, unittest.wait, d)
        self.assertEquals(exc.args[0], "This server is lame!")

    def testDelete(self):
        p, t = setUp()
        d = p.delete(3)
        self.assertEquals(t.value(), "DELE 4\r\n")
        p.dataReceived("+OK Hasta la vista\r\n")
        return d.addCallback(unittest.assertEqual, "Hasta la vista")

    def testDeleteError(self):
        p, t = setUp()
        d = p.delete(3)
        self.assertEquals(t.value(), "DELE 4\r\n")
        p.dataReceived("-ERR Winner is not you.\r\n")
        exc = self.assertRaises(ServerErrorResponse, unittest.wait, d)
        self.assertEquals(exc.args[0], "Winner is not you.")


class SimpleClient(POP3Client):
    def __init__(self, deferred, contextFactory = None):
        POP3Client.__init__(self, contextFactory)
        self.deferred = deferred
        self.allowInsecureLogin = True

    def serverGreeting(self, challenge):
        self.deferred.callback(None)

class POP3HelperMixin:
    serverCTX = None
    clientCTX = None

    def setUp(self):
        d = defer.Deferred()
        self.server = pop3TestServer.POP3TestServer(
            contextFactory=self.serverCTX)
        self.client = SimpleClient(d, contextFactory=self.clientCTX)
        self.client.timeout = 30
        self.connected = d

    def tearDown(self):
        del self.server
        del self.client
        del self.connected

    def _cbStopClient(self, ignore):
        self.client.transport.loseConnection()

    def _ebGeneral(self, failure):
        self.client.transport.loseConnection()
        self.server.transport.loseConnection()
        failure.printTraceback(open('failure.log', 'w'))
        failure.printTraceback()
        raise failure.value

    def loopback(self):
        loopback.loopbackTCP(self.server, self.client, noisy=False)

class POP3TLSTestCase(POP3HelperMixin, unittest.TestCase):
    serverCTX = ServerTLSContext and ServerTLSContext()
    clientCTX = ClientTLSContext and ClientTLSContext()

    def testStartTLS(self):
        def login():
            #this will startTLS automatically
            return self.client.login('test', 'twisted')

        def quit():
            return self.client.quit()

        # strip() wraps each zero-argument step so that it ignores the
        # previous callback's result. Add each step to the chain only once.
        methods = [login, quit]
        map(self.connected.addCallback, map(strip, methods))
        self.connected.addCallbacks(self._cbStopClient, self._ebGeneral)
        self.loopback()

class POP3TimeoutTestCase(POP3HelperMixin, unittest.TestCase):
    def testTimeout(self):
        def login():
            #the server never answers, so this login should time out
            d = self.client.login('test', 'twisted')
            d.addErrback(timedOut)
            return d

        def timedOut(failure):
            self._cbStopClient(None)
            failure.trap(error.TimeoutError)

        def quit():
            return self.client.quit()

        self.client.timeout = 3
        #No need to leverage SSL for timeout test
        pop3TestServer.SSL_SUPPORT = False

        #Tell the server to not return a response to client.
        #This will trigger a timeout.
        pop3TestServer.TIMEOUT_RESPONSE = True

        # strip() wraps each zero-argument step so that it ignores the
        # previous callback's result. Add each step to the chain only once.
        methods = [login, quit]
        map(self.connected.addCallback, map(strip, methods))
        self.connected.addCallback(self._cbStopClient)
        self.connected.addErrback(self._ebGeneral)
        self.loopback()

if ClientTLSContext is None:
    for case in (POP3TLSTestCase,):
        case.skip = "OpenSSL not present"
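
One note on what testRetrieve and testRetrieveWithConsumer are checking: POP3 (RFC 1939) ends a multi-line response with a line containing only ".", and any body line that itself starts with "." is sent with an extra leading "." that the client must strip. Here is a minimal standalone sketch of that rule. The function name unstuff is hypothetical; this is not how AdvancedPOP3Client implements it, just the rule the tests exercise.

```python
def unstuff(raw):
    """Split a raw multi-line POP3 body into lines, undoing dot-stuffing.

    Stops at the terminating "." line; anything after it is ignored.
    """
    lines = []
    for line in raw.split("\r\n"):
        if line == ".":
            # A lone dot terminates the multi-line response.
            break
        if line.startswith("."):
            # The sender prepended one dot; remove exactly one.
            line = line[1:]
        lines.append(line)
    return lines
```

Fed the same bytes as testRetrieve ("..Further message text tra la la"), this yields a line beginning with a single dot, which is what the test asserts the client returns.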
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

Open Source Applications Foundation "Dev" mailing list
http://lists.osafoundation.org/mailman/listinfo/dev
