John Anderson wrote:

I agree with Brian. It also seems like our tests would be more valid if we were using real Twisted code instead of something else.

John

Phillip and Heikki you bring up some good points. In regards to the timeout issue any client and any server code used in testing should leverage the Twisted TimeoutMixin. This will force a timeout after a certain number of seconds specified. Phillip, you are correct that there is no concept of a simulated duration. But it is easy to lower the actual duration value. For example, in test_pop3client to lower the timeout value from 60 to 5 to force a shorter duration all I have to do is popClientInstance.timeout = 5. I guess my point was we may not need to add the extra burden of maintaining our own mock reator. I think it is still worth looking in to just leveraging the code twisted provides.




Brian Kirsch wrote:

Hi Phillip,
I am not sure if we really need the mock reactor. Twisted provides very good trial test support for creating local loopbacks between Twisted clients and Twisted servers. What is the issue with run and stop of a reactor? To my knowledge this should not be a problem. Also if all the unit tests were running in the same process why would you need to stop and start the reactor?

I have attached my recent submissions to twisted core which include a pop3TestServer, a pop3Client, and a unittest illustrating how to set up local client server communication.


Thoughts?




Phillip J. Eby wrote:

Hi. I've heard recently that there are some tests that ideally would need to run under the Twisted reactor, in order to properly exercise the functionality under test. However, there are a number of issues including the possible need for multiple uses of run/stop, dependency on external servers, test duration, etc.

There is, however, a relatively straightforward solution: use a mock reactor. I've successfully used this approach in the past to test event-driven libraries, although it was only with a subset of the full Twisted reactor capabilities. A mock reactor can be stopped, reset, and started as many times as you like, because it doesn't rely on hooking a GUI event loop, running in a separate thread, or anything like that.

A mock reactor can run in "simulated time", which means that it uses a time() function that runs faster than "real" time. For example, if you schedule a callback, and there's no pending simulated I/O or other scheduled calls, the simulated time jumps ahead to the next scheduled callback.

One additional side benefit of simulated time is that it's deterministic and therefore can be reliably reproduced in repeated tests. In PEAK, for example, I once wrote tests for some components that might be compared to WakeupCallers in Chandler. I had several scheduled to wake up on various intervals, and the test then verified that they had run at all the times they should have. Since the time is simulated, there were no rounding errors or clock precision to take into account, and the tests could instantaneously whether they were simulating seconds, minutes, or even hours of scheduling operations.

A mock reactor for Chandler tests could also use my "mockets" (fake sockets) library in order to avoid doing any "real" network I/O, allowing servers to listen and clients to connect to addresses on a virtual network that exists only in the process' memory, thereby avoiding the complexity of using external processes to set up and tear down servers or depending on other servers being up and having connectivity to them.

Although I don't have a complete mock reactor implementation, I do have most of the prerequisites and experience that would be needed to implement one, if anybody is interested. So, if you have things (like Zanshin, Chandler client protocols, etc.) that need reactor-based testing, and would be interested in helping me test a mock reactor for your test cases, let me know. It's also possible that this could be a joint project with the Twisted folks; as early as last year, Itamar expressed an interest in allowing test reactors to run using simulated time:

http://twistedmatrix.com/pipermail/twisted-python/2004-January/006982.html

And I would be surprised if they're not interested in having a mocket-based reactor as well. The last hacking I did on Twisted was around 1.1, so it might take me some time to get familiar with the 2.0 reactor interfaces. However, unless there are major differences I don't expect it to be difficult to do; Twisted is designed to isolate code from the underlying transport mechanism in use. About the only "interesting" part would likely be SSL/TLS, since I doubt M2Crypto and OpenSSL will want to talk to mocket objects instead of real sockets. It might be necessary to create mock SSL "Transport" objects as well as a mock reactor.

_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

Open Source Applications Foundation "Dev" mailing list
http://lists.osafoundation.org/mailman/listinfo/dev




------------------------------------------------------------------------

# -*- test-case-name: twisted.mail.test.test_pop3client -*-
# Copyright (c) 2001-2004 Divmod Inc.
# See LICENSE for details.

"""POP3 client protocol implementation

Don't use this module directly.  Use twisted.mail.pop3 instead.

@author U{Jp Calderone<mailto:[EMAIL PROTECTED]>}

API Stability: Unstable
"""

import re, md5

from twisted.python import log
from twisted.internet import defer
from twisted.protocols import basic
from twisted.protocols import policies
from twisted.internet import error
from twisted.internet import interfaces

OK = '+OK'
ERR = '-ERR'

class POP3ClientError(Exception):
   """Base class for all exceptions raised by POP3Client.
   """

class InsecureAuthenticationDisallowed(POP3ClientError):
   """Secure authentication was required but no mechanism could be found.
   """

class TLSError(POP3ClientError):
   """Secure authentication was required but no mechanism could be found.
   """

class TLSNotSupportedError(POP3ClientError):
   """Secure authentication was required but no mechanism could be found.
   """

class OptionNotSupportedError(POP3ClientError):
   """Secure authentication was required but no mechanism could be found.
   """


class ServerErrorResponse(POP3ClientError):
   """The server returned an error response to a request.
   """
   def __init__(self, reason, consumer=None):
       POP3ClientError.__init__(self, reason)
       self.consumer = consumer

class LineTooLong(POP3ClientError):
   """The server sent an extremely long line.
   """

class _ListSetter:
   # Internal helper.  POP3 responses sometimes occur in the
   # form of a list of lines containing two pieces of data,
   # a message index and a value of some sort.  When a message
   # is deleted, it is omitted from these responses.  The
   # setitem method of this class is meant to be called with
   # these two values.  In the cases where indexes are skipped,
   # it takes care of padding out the missing values with None.
   def __init__(self, L):
       self.L = L
   def setitem(self, (item, value)):
       diff = item - len(self.L) + 1
       if diff > 0:
           self.L.extend([None] * diff)
       self.L[item] = value


def _statXform(line):
   # Parse a STAT response
   numMsgs, totalSize = line.split(None, 1)
   return int(numMsgs), int(totalSize)


def _listXform(line):
   # Parse a LIST response
   index, size = line.split(None, 1)
   return int(index) - 1, int(size)


def _uidXform(line):
   # Parse a UIDL response
   index, uid = line.split(None, 1)
   return int(index) - 1, uid

def _codeStatusSplit(line):
   # Parse an +OK or -ERR response
   parts = line.split(' ', 1)
   if len(parts) == 1:
       return parts[0], ''
   return parts

def _dotUnquoter(line):
   """
   '.' characters which begin a line of a message are doubled to avoid
   confusing with the terminating '.\r\n' sequence.  This function unquotes
   them.
   """
   if line.startswith('..'):
       return line[1:]
   return line

class POP3Client(basic.LineOnlyReceiver, policies.TimeoutMixin):
   """POP3 client protocol implementation class

   Instances of this class provide a convenient, efficient API for
   retrieving and deleting messages from a POP3 server.
   """

   # Capabilities are not allowed to change during the session
   # So cache the first response and use that for all later
   # lookups
   _capCache = None

   # Whether STARTTLS has been issued successfully yet or not.
   startedTLS = False

   # Indicate whether login() should be allowed if the server
   # offers no authentication challenge and if our transport
   # does not offer any protection via encryption.
   allowInsecureLogin = False

   # Regular expression to search for in the challenge string in the server
   # greeting line.
   challengeMagicRe = re.compile('(<[^>]+>)')

   # Challenge received from the server
   serverChallenge = None

   # List of pending calls.
   # We are a pipelining API but don't actually
   # support pipelining on the network yet.
   _blockedQueue = None

   # The Deferred to which the very next result will go.
   waiting = None

   # Number of seconds to wait before timing out a connection.
   # If the number is <= 0 no timeout checking will be performed.
   timeout = 0

   #Overides LineOnlyReceiver to set a larger max length.
   MAX_LENGTH = 16384 * 2

   def __init__(self, contextFactory = None):
       self.context = contextFactory
       self.timedOut = False


   def _blocked(self, f, *a):
       # Internal helper.  If commands are being blocked, append
       # the given command and arguments to a list and return a Deferred
       # that will be chained with the return value of the function
       # when it eventually runs.  Otherwise, set up for commands to be

       # blocked and return None.
       if self._blockedQueue is not None:
           d = defer.Deferred()
           self._blockedQueue.append((d, f, a))
           return d
       self._blockedQueue = []
       return None

   def _unblock(self):
       # Internal helper.  Indicate that a function has completed.
       # If there are blocked commands, run the next one.  If there
       # are not, set up for the next command to not be blocked.
       if self._blockedQueue == []:
           self._blockedQueue = None
       elif self._blockedQueue is not None:
           d, f, a = self._blockedQueue.pop(0)

           d2 = f(*a)
           d2.chainDeferred(d)

   def sendShort(self, cmd, args):
       # Internal helper.  Send a command to which a short response
       # is expected.  Return a Deferred that fires when the response
       # is received.  Block all further commands from being sent until
       # the response is received.  Transition the state to SHORT.
       d = self._blocked(self.sendShort, cmd, args)
       if d is not None:
           return d

       if args:
           self.sendLine(cmd + ' ' + args)
       else:
           self.sendLine(cmd)
       self.state = 'SHORT'
       self.waiting = defer.Deferred()
       return self.waiting

   def sendLong(self, cmd, args, consumer, xform):
       # Internal helper.  Send a command to which a multiline
       # response is expected.  Return a Deferred that fires when
       # the entire response is received.  Block all further commands
       # from being sent until the entire response is received.
       # Transition the state to LONG_INITIAL.
       d = self._blocked(self.sendLong, cmd, args, consumer, xform)
       if d is not None:
           return d

       if args:
           self.sendLine(cmd + ' ' + args)
       else:
           self.sendLine(cmd)
       self.state = 'LONG_INITIAL'
       self.xform = xform
       self.consumer = consumer
       self.waiting = defer.Deferred()
       return self.waiting

   # Twisted protocol callback
   def connectionMade(self):
       if self.timeout > 0:
           self.setTimeout(self.timeout)

       self.state = 'WELCOME'

   def timeoutConnection(self):
       self.timedOut = True
       self.transport.loseConnection()

   def connectionLost(self, reason):
       if self.timeout > 0:
           self.setTimeout(None)

       if self.timedOut:
           reason = error.TimeoutError()
           self.timedOut = False

       d = []
       if self.waiting is not None:
           d.append(self.waiting)
           self.waiting = None
       if self._blockedQueue is not None:
           d.extend([deferred for (deferred, f, a) in self._blockedQueue])
           self._blockedQueue = None
       for w in d:
           w.errback(reason)

   def lineReceived(self, line):
       if self.timeout > 0:
           self.resetTimeout()

       state = self.state
       self.state = None
       state = getattr(self, 'state_' + state)(line) or state
       if self.state is None:
           self.state = state

   def lineLengthExceeded(self, buffer):
       # XXX - We need to be smarter about this
       if self.waiting is not None:
           waiting, self.waiting = self.waiting, None
           waiting.errback(LineTooLong())
       self.transport.loseConnection()

   # POP3 Client state logic - don't touch this.
   def state_WELCOME(self, line):
       # WELCOME is the first state.  The server sends one line of text
       # greeting us, possibly with an APOP challenge.  Transition the
       # state to WAITING.
       code, status = _codeStatusSplit(line)
       if code != OK:
           #XXX: Should raise some kind of error here
           self.transport.loseConnection()
       else:
           m = self.challengeMagicRe.search(status)

           if m is not None:
               self.serverChallenge = m.group(1)

           self.serverGreeting(self.serverChallenge)

       return 'WAITING'

   def state_WAITING(self, line):
       # The server isn't supposed to send us anything in this state.
       log.msg("Illegal line from server: " + repr(line))

   def state_SHORT(self, line):
       # This is the state we are in when waiting for a single
       # line response.  Parse it and fire the appropriate callback
       # or errback.  Transition the state back to WAITING.
       deferred, self.waiting = self.waiting, None
       self._unblock()
       code, status = _codeStatusSplit(line)
       if code == OK:
           deferred.callback(status)
       else:
           deferred.errback(ServerErrorResponse(status))
       return 'WAITING'

   def state_LONG_INITIAL(self, line):
       # This is the state we are in when waiting for the first
       # line of a long response.  Parse it and transition the
       # state to LONG if it is an okay response; if it is an
       # error response, fire an errback, clean up the things
       # waiting for a long response, and transition the state
       # to WAITING.
       code, status = _codeStatusSplit(line)
       if code == OK:
           return 'LONG'
       consumer = self.consumer
       deferred = self.waiting
       self.consumer = self.waiting = self.xform = None
       self._unblock()
       deferred.errback(ServerErrorResponse(status, consumer))
       return 'WAITING'

   def state_LONG(self, line):
       # This is the state for each line of a long response.
       # If it is the last line, finish things, fire the
       # Deferred, and transition the state to WAITING.
       # Otherwise, pass the line to the consumer.
       if line == '.':
           consumer = self.consumer
           deferred = self.waiting
           self.consumer = self.waiting = self.xform = None
           self._unblock()
           deferred.callback(consumer)
           return 'WAITING'
       else:
           if self.xform is not None:
               self.consumer(self.xform(line))
           else:
               self.consumer(line)
           return 'LONG'

   def serverGreeting(self, challenge):
       """Called when the server has sent us a greeting.

          @type challenge: C{Str} (None if no challenge returned in the Server 
Greeting)
          @param challenge: A POP3 server which implements the APOP command will
                            include a timestamp challenge in its banner 
greeting (RFC 1939).
                            .
       """

   def startTLS(self, contextFactory=None):
       """
       Initiates a 'STLS' request and negotiates the TLS / SSL
       Handshake.

       @param contextFactory: The TLS / SSL Context Factory to
       leverage.  If the contextFactory is None the POP3Client will
       either use the current TLS / SSL Context Factory or attempt to
       create a new one.

       @type contextFactory: C{ssl.ClientContextFactory}

       @return: A Deferred which fires when the transport has been
       secured according to the given contextFactory, or which fails
       if the transport cannot be secured.
       """

       if self._capCache is None:
           d = self.capabilities()

       else:
           d = defer.succeed(self._capCache)

       d.addCallback(self._startTLS, contextFactory)
       return d


   def _startTLS(self, caps, contextFactory):
       assert not self.startedTLS, "Client and Server are currently communicating 
via TLS"

       if contextFactory is None:
           contextFactory = self._getContextFactory()

       if contextFactory is None:
           return defer.fail(TLSError(
               "POP3Client requires a TLS context to "
               "initiate the STARTTLS handshake"))

       if 'STLS' not in caps:
           return defer.fail(TLSNotSupportedError(
               "Server does not support secure communication "
               "via TLS / SSL"))

       tls = interfaces.ITLSTransport(self.transport, None)

       if tls is None:
           return defer.fail(TLSError(
               "POP3Client transport does not implement "
               "interfaces.ITLSTransport"))

       d = self.sendShort('STLS', None)
       d.addCallback(self._startedTLS, contextFactory)
       d.addCallback(lambda _: self.capabilities())
       return d

   def _startedTLS(self, result, context):
       self.transport.startTLS(context)
       self._capCache = None
       self.startedTLS = True
       self.context = context
       return result

   def _getContextFactory(self):
       if self.context is not None:
           return self.context
       try:
           from twisted.internet import ssl
       except ImportError:
           return None
       else:
           context = ssl.ClientContextFactory()
           context.method = ssl.SSL.TLSv1_METHOD
           return context

   # External hooks - call these (most of 'em anyway)
   def login(self, username, password):
       """Log into the server.

       If APOP is available it will be used.  Otherwise, if
       TLS is available a 'STLS' session will be started and
       plaintext login will proceed.  Otherwise, if the
       instance attribute allowInsecureLogin is set to True,
       insecure plaintext login will proceed.  Otherwise,
       InsecureAuthenticationDisallowed will be raised
       (asynchronously).

       @param username: The username with which to log in.
       @param password: The password with which to log in.

       @rtype: C{Deferred}
       @return: A deferred which fires when login has
       completed.
       """
       if self._capCache is None:
           d = self.capabilities()

       else:
           d = defer.succeed(self._capCache)

       d.addCallback(self._login, username, password)
       return d

   def _login(self, caps, username, password):
       if self.serverChallenge is not None:
           return self._apop(username, password, self.serverChallenge)

       tryTLS = 'STLS' in caps

       #If our transport supports switching to TLS, we might want to try to 
switch to TLS.
       tlsableTransport = interfaces.ITLSTransport(self.transport, 
default=None) is not None

       # If our transport is not already using TLS, we might want to try to 
switch to TLS.
       nontlsTransport = interfaces.ISSLTransport(self.transport, default=None) 
is None

       if not self.startedTLS and tryTLS and tlsableTransport and 
nontlsTransport:
           d = self.startTLS()

           d.addCallback(self._loginTLS, username, password)
           return d

       elif self.startedTLS or self.allowInsecureLogin:
           return self._plaintext(username, password)
       else:
           return defer.fail(InsecureAuthenticationDisallowed())

   def _loginTLS(self, res, username, password):
       return self._plaintext(username, password)

   def _plaintext(self, username, password):
       # Internal helper.  Send a username/password pair, returning a Deferred
       # that fires when both have succeeded or fails when the server rejects
       # either.
       return self.user(username).addCallback(lambda r: self.password(password))

   def _apop(self, username, password, challenge):
       # Internal helper.  Computes and sends an APOP response.  Returns
       # a Deferred that fires when the server responds to the response.
       digest = md5.new(challenge + password).hexdigest()
       return self.apop(username, digest)

   def apop(self, username, digest):
       """Perform APOP login.

       This should be used in special circumstances only, when it is
       known that the server supports APOP authentication, and APOP
       authentication is absolutely required.  For the common case,
       use L{login} instead.

       @param username: The username with which to log in.
       @param digest: The challenge response to authenticate with.
       """
       return self.sendShort('APOP', username + ' ' + digest)

   def user(self, username):
       """Send the user command.

       This performs the first half of plaintext login.  Unless this
       is absolutely required, use the L{login} method instead.

       @param username: The username with which to log in.
       """
       return self.sendShort('USER', username)

   def password(self, password):
       """Send the password command.

       This performs the second half of plaintext login.  Unless this
       is absolutely required, use the L{login} method instead.

       @param password: The plaintext password with which to authenticate.
       """
       return self.sendShort('PASS', password)

   def delete(self, index):
       """Delete a message from the server.

       @type index: C{int}
       @param index: The index of the message to delete.
       This is 0-based.

       @rtype: C{Deferred}
       @return: A deferred which fires when the delete command
       is successful, or fails if the server returns an error.
       """
       return self.sendShort('DELE', str(index + 1))

   def _consumeOrSetItem(self, cmd, args, consumer, xform):
       # Internal helper.  Send a long command.  If no consumer is
       # provided, create a consumer that puts results into a list
       # and return a Deferred that fires with that list when it
       # is complete.
       if consumer is None:
           L = []
           consumer = _ListSetter(L).setitem
           return self.sendLong(cmd, args, consumer, xform).addCallback(lambda 
r: L)
       return self.sendLong(cmd, args, consumer, xform)

   def _consumeOrAppend(self, cmd, args, consumer, xform):
       # Internal helper.  Send a long command.  If no consumer is
       # provided, create a consumer that appends results to a list
       # and return a Deferred that fires with that list when it is
       # complete.
       if consumer is None:
           L = []
           consumer = L.append
           return self.sendLong(cmd, args, consumer, xform).addCallback(lambda 
r: L)
       return self.sendLong(cmd, args, consumer, xform)

   def capabilities(self, useCache=1):
       """Retrieve the capabilities supported by this server.
       """
       if useCache and self._capCache is not None:
           return defer.succeed(self._capCache)

       #Reset the Capabilities Cache
       self._capCache = {}

       d = self._consumeOrAppend('CAPA', None, self._capsConsumer, None)
       #cabilities is not supported by some POP servers. If an error
       #is thrown we still want the same behavior
       d.addBoth(self._cbCapabilities)
       return d

   def _cbCapabilities(self, result):
       """Returns the Capabilities to the caller"""
       return self._capCache


   def _capsConsumer(self, line):
       tmp = line.split()

       size = len(tmp)

       if size == 0:
           return

       if size == 1:
           self._capCache[tmp[0]] = None
       else:
           self._capCache[tmp[0]] = tmp[1:]

   def noop(self):
       return self.sendShort("NOOP", None)

   def rset(self):
       return self.sendShort("RSET", None)

   def retrieve(self, index, consumer=None, lines=None):
       """Retrieve a message from the server.

       If L{consumer} is not None, it will be called with
       each line of the message as it is received.  Otherwise,
       the returned Deferred will be fired with a list of all
       the lines when the message has been completely received.
       """
       idx = str(index + 1)
       if lines is None:
           return self._consumeOrAppend('RETR', idx, consumer, _dotUnquoter)

       return self._consumeOrAppend('TOP', '%s %d' % (idx, lines), consumer, 
_dotUnquoter)


   def stat(self):
       """Issues a 'STAT' request which is allowed in the TRANSACTION state 
(RFC 1939).
          The returned Deferred will be fired with a tuple containing the
          number or messages in the maildrop and the size of the
          maildrop in octets.
       """
       return self.sendShort('STAT', None).addCallback(_statXform)

   def listSize(self, consumer=None):
       """Retrieve a list of the size of all messages on the server.

       If L{consumer} is not None, it will be called with two-tuples
       of message index number and message size as they are received.
       Otherwise, a Deferred which will fire with a list of B{only}
       message sizes will be returned.  For messages which have been
       deleted, None will be used in place of the message size.
       """
       return self._consumeOrSetItem('LIST', None, consumer, _listXform)

   def listUID(self, consumer=None):
       """Retrieve a list of the UIDs of all messages on the server.

       If L{consumer} is not None, it will be called with two-tuples
       of message index number and message UID as they are received.
       Otherwise, a Deferred which will fire with of list of B{only}
       message UIDs will be returned.  For messages which have been
       deleted, None will be used in place of the message UID.
       """

       return self._consumeOrSetItem('UIDL', None, consumer, _uidXform)

   def quit(self):
       """Disconnect from the server.
       """
       return self.sendShort('QUIT', None)

__all__ = [
   # Exceptions
   'InsecureAuthenticationDisallowed', 'LineTooLong', 'POP3ClientError',
   'ServerErrorResponse', 'TLSError', 'TLSNotSupportedError', 
'OptionNotSupported',

   # Protocol classes
   'POP3Client']
------------------------------------------------------------------------

#!/usr/local/bin/python
from twisted.internet.protocol import Factory
from twisted.protocols import basic
from twisted.internet import reactor
import sys

USER = "test"
PASS = "twisted"

PORT = 1100

SSL_SUPPORT = True
UIDL_SUPPORT = True
INVALID_SERVER_RESPONSE = False
INVALID_CAPABILITY_RESPONSE = False
INVALID_LOGIN_RESPONSE = False
DENY_CONNECTION = False
DROP_CONNECTION = False
BAD_TLS_RESPONSE = False
TIMEOUT_RESPONSE = False
TIMEOUT_DEFERRED = False
SLOW_GREETING = False

"""Commands"""
CONNECTION_MADE = "+OK POP3 localhost v2003.83 server ready"
CAPABILITIES = [
"TOP",
"LOGIN-DELAY 180",
"USER",
"SASL LOGIN"
]

CAPABILITIES_SSL = "STLS"
CAPABILITIES_UIDL = "UIDL"


INVALID_RESPONSE = "-ERR Unknown request"
VALID_RESPONSE = "+OK Command Completed"
AUTH_DECLINED = "-ERR LOGIN failed"
AUTH_ACCEPTED = "+OK Mailbox open, 0 messages"
TLS_ERROR = "-ERR server side error start TLS handshake"
LOGOUT_COMPLETE = "+OK quit completed"
NOT_LOGGED_IN = "-ERR Unknown AUHORIZATION state command"
STAT = "+OK 0 0"
UIDL = "+OK Unique-ID listing follows\r\n."
LIST = "+OK Mailbox scan listing follows\r\n."
CAP_START = "+OK Capability list follows:"


class POP3TestServer(basic.LineReceiver):
   def __init__(self, contextFactory = None):
       self.loggedIn = False
       self.caps = None
       self.tmpUser = None
self.ctx = contextFactory
   def sendSTATResp(self, req):
       self.sendLine(STAT)

   def sendUIDLResp(self, req):
       self.sendLine(UIDL)

   def sendLISTResp(self, req):
       self.sendLine(LIST)

   def sendCapabilities(self):
       if self.caps is None:
           self.caps = [CAP_START]

       if UIDL_SUPPORT:
           self.caps.append(CAPABILITIES_UIDL)

       if SSL_SUPPORT:
           self.caps.append(CAPABILITIES_SSL)

       for cap in CAPABILITIES:
           self.caps.append(cap)
       resp = '\r\n'.join(self.caps)
       resp += "\r\n."

       self.sendLine(resp)


   def connectionMade(self):
       if DENY_CONNECTION:
           self.transport.loseConnection()
           return

       if SLOW_GREETING:
           reactor.callLater(20, self.sendGreeting)

       else:
           self.sendGreeting()

   def sendGreeting(self):
       self.sendLine(CONNECTION_MADE)

   def lineReceived(self, line):
       """Error Conditions"""
       if TIMEOUT_RESPONSE:
           """Do not respond to clients request"""
           return

       if DROP_CONNECTION:
           self.transport.loseConnection()
           return

       elif "CAPA" in line.upper():
           if INVALID_CAPABILITY_RESPONSE:
               self.sendLine(INVALID_RESPONSE)
           else:
               self.sendCapabilities()

       elif "STLS" in line.upper() and SSL_SUPPORT:
           self.startTLS()

       elif "USER" in line.upper():
           if INVALID_LOGIN_RESPONSE:
               self.sendLine(INVALID_RESPONSE)
               return

           resp = None
           try:
               self.tmpUser = line.split(" ")[1]
               resp = VALID_RESPONSE
           except:
               resp = AUTH_DECLINED

           self.sendLine(resp)

       elif "PASS" in line.upper():
           resp = None
           try:
               pwd = line.split(" ")[1]

               if self.tmpUser is None or pwd is None:
                   resp = AUTH_DECLINED
               elif self.tmpUser == USER and pwd == PASS:
                   resp = AUTH_ACCEPTED
                   self.loggedIn = True
               else:
                   resp = AUTH_DECLINED
           except:
               resp = AUTH_DECLINED

           self.sendLine(resp)

       elif "QUIT" in line.upper():
           self.loggedIn = False
           self.sendLine(LOGOUT_COMPLETE)
           self.disconnect()

       elif INVALID_SERVER_RESPONSE:
           self.sendLine(INVALID_RESPONSE)

       elif not self.loggedIn:
           self.sendLine(NOT_LOGGED_IN)

       elif "NOOP" in line.upper():
           self.sendLine(VALID_RESPONSE)

       elif "STAT" in line.upper():
           if TIMEOUT_DEFERRED:
               return
           self.sendLine(STAT)

       elif "LIST" in line.upper():
           if TIMEOUT_DEFERRED:
               return
           self.sendLine(LIST)

       elif "UIDL" in line.upper():
           if TIMEOUT_DEFERRED:
               return
           elif not UIDL_SUPPORT:
               self.sendLine(INVALID_RESPONSE)
               return

           self.sendLine(UIDL)

   def startTLS(self):
       if self.ctx is None:
           self.getContext()

       if SSL_SUPPORT and self.ctx is not None:
           self.sendLine('+OK Begin TLS negotiation now')
           self.transport.startTLS(self.ctx)
       else:
           self.sendLine('+OK TLS not available')

   def disconnect(self):
       self.transport.loseConnection()

   def getContext(self):
       try:
           from twisted.internet import ssl
       except ImportError:
          self.ctx = None
       else:
           self.ctx = ssl.ClientContextFactory()
           self.ctx.method = ssl.SSL.TLSv1_METHOD


usage = """popServer.py [arg] (default is Standard POP Server with no messages)
no_ssl  - Start with no SSL support
no_uidl - Start with no UIDL support
bad_resp - Send a non-RFC compliant response to the Client
bad_cap_resp - send a non-RFC compliant response when the Client sends a 
'CAPABILITY' request
bad_login_resp - send a non-RFC compliant response when the Client sends a 
'LOGIN' request
deny - Deny the connection
drop - Drop the connection after sending the greeting
bad_tls - Send a bad response to a STARTTLS
timeout - Do not return a response to a Client request
to_deferred - Do not return a response on a 'Select' request. This
             will test Deferred callback handling
slow - Wait 20 seconds after the connection is made to return a Server Greeting
"""

def printMessage(msg):
   print "Server Starting in %s mode" % msg

def processArg(arg):

   if arg.lower() == 'no_ssl':
       global SSL_SUPPORT
       SSL_SUPPORT = False
       printMessage("NON-SSL")

   elif arg.lower() == 'no_uidl':
       global UIDL_SUPPORT
       UIDL_SUPPORT = False
       printMessage("NON-UIDL")

   elif arg.lower() == 'bad_resp':
       global INVALID_SERVER_RESPONSE
       INVALID_SERVER_RESPONSE = True
       printMessage("Invalid Server Response")

   elif arg.lower() == 'bad_cap_resp':
       global INVALID_CAPABILITY_RESPONSE
       INVALID_CAPABILITY_RESPONSE = True
       printMessage("Invalid Capability Response")

   elif arg.lower() == 'bad_login_resp':
       global INVALID_LOGIN_RESPONSE
       INVALID_LOGIN_RESPONSE = True
       printMessage("Invalid Capability Response")

   elif arg.lower() == 'deny':
global DENY_CONNECTION DENY_CONNECTION = True
       printMessage("Deny Connection")

   elif arg.lower() == 'drop':
global DROP_CONNECTION DROP_CONNECTION = True
       printMessage("Drop Connection")


   elif arg.lower() == 'bad_tls':
global BAD_TLS_RESPONSE BAD_TLS_RESPONSE = True
       printMessage("Bad TLS Response")

   elif arg.lower() == 'timeout':
       global TIMEOUT_RESPONSE
       TIMEOUT_RESPONSE = True
       printMessage("Timeout Response")

   elif arg.lower() == 'to_deferred':
       global TIMEOUT_DEFERRED
       TIMEOUT_DEFERRED = True
       printMessage("Timeout Deferred Response")

   elif arg.lower() == 'slow':
       global SLOW_GREETING
       SLOW_GREETING = True
       printMessage("Slow Greeting")

   elif arg.lower() == '--help':
       print usage
       sys.exit()

   else:
       print usage
       sys.exit()

def main():

   if len(sys.argv) < 2:
       printMessage("POP3 with no messages")
   else:
       args = sys.argv[1:]

       for arg in args:
           processArg(arg)

   f = Factory()
   f.protocol = POP3TestServer
   reactor.listenTCP(PORT, f)
   reactor.run()

if __name__ == '__main__':
   main()
------------------------------------------------------------------------

# -*- test-case-name: twisted.mail.test.test_pop3client -*-
# Copyright (c) 2001-2004 Divmod Inc.
# See LICENSE for details.

from twisted.mail.pop3 import AdvancedPOP3Client as POP3Client
from twisted.mail.pop3 import InsecureAuthenticationDisallowed
from twisted.mail.pop3 import ServerErrorResponse
from twisted.protocols import loopback
from twisted.internet import defer

from twisted.trial import unittest
from twisted.test.proto_helpers import StringTransport
from twisted.protocols import basic
import pop3TestServer

try:
   from twisted.test.ssl_helpers import ClientTLSContext, ServerTLSContext
except ImportError:
   ClientTLSContext = ServerTLSContext = None

capCache = {"TOP": None, "LOGIN-DELAY": "180", "UIDL": None, \
           "STLS": None, "USER": None, "SASL": "LOGIN"}
def setUp():
   p = POP3Client()

   p._capCache = capCache

   t = StringTransport()
   p.makeConnection(t)
   return p, t

def strip(f):
   return lambda result, f=f: f()

class POP3ClientLoginTestCase(unittest.TestCase):
   def testOkUser(self):
       p, t = setUp()
       d = p.user("username")
       self.assertEquals(t.value(), "USER username\r\n")
       p.dataReceived("+OK send password\r\n")
       return d.addCallback(unittest.assertEqual, "send password")

   def testBadUser(self):
       p, t = setUp()
       d = p.user("username")
       self.assertEquals(t.value(), "USER username\r\n")
       p.dataReceived("-ERR account suspended\r\n")
       exc = self.assertRaises(ServerErrorResponse, unittest.wait, d)
       self.assertEquals(exc.args[0], "account suspended")

   def testOkPass(self):
       p, t = setUp()
       d = p.password("password")
       self.assertEquals(t.value(), "PASS password\r\n")
       p.dataReceived("+OK you're in!\r\n")
       return d.addCallback(unittest.assertEqual, "you're in!")

   def testBadPass(self):
       p, t = setUp()
       d = p.password("password")
       self.assertEquals(t.value(), "PASS password\r\n")
       p.dataReceived("-ERR go away\r\n")
       exc = self.assertRaises(ServerErrorResponse, unittest.wait, d)
       self.assertEquals(exc.args[0], "go away")

   def testOkLogin(self):
       p, t = setUp()
       p.allowInsecureLogin = True
       d = p.login("username", "password")
       self.assertEquals(t.value(), "USER username\r\n")
       p.dataReceived("+OK go ahead\r\n")
       self.assertEquals(t.value(), "USER username\r\nPASS password\r\n")
       p.dataReceived("+OK password accepted\r\n")
       return d.addCallback(unittest.assertEqual, "password accepted")

   def testBadPasswordLogin(self):
       p, t = setUp()
       p.allowInsecureLogin = True
       d = p.login("username", "password")
       self.assertEquals(t.value(), "USER username\r\n")
       p.dataReceived("+OK waiting on you\r\n")
       self.assertEquals(t.value(), "USER username\r\nPASS password\r\n")
       p.dataReceived("-ERR bogus login\r\n")
       exc = self.assertRaises(ServerErrorResponse, unittest.wait, d)
       self.assertEquals(exc.args[0], "bogus login")

   def testBadUsernameLogin(self):
       p, t = setUp()
       p.allowInsecureLogin = True
       d = p.login("username", "password")
       self.assertEquals(t.value(), "USER username\r\n")
       p.dataReceived("-ERR bogus login\r\n")
       exc = self.assertRaises(ServerErrorResponse, unittest.wait, d)
       self.assertEquals(exc.args[0], "bogus login")

   def testServerGreeting(self):
       p, t = setUp()
       # Make sure it *isn't* in the instance dict, just for sanity
       self.failIfIn('serverChallenge', vars(p))
       p.dataReceived("+OK lalala this has no challenge\r\n")
       # Make sure it *is* in the instance dict and that it is None
       self.assertEquals(p.serverChallenge, None)

   def testServerGreetingWithChallenge(self):
       p, t = setUp()
       # Make sure it *isn't* in the instance dict, just for sanity
       self.failIfIn('serverChallenge', vars(p))
       p.dataReceived("+OK <here is the challenge>\r\n")
       # Make sure it *is* in the instance dict and is what we sent
       self.assertEquals(vars(p)['serverChallenge'], "<here is the challenge>")

   def testAPOP(self):
       p, t = setUp()
       p.dataReceived("+OK <challenge string goes here>\r\n")
       d = p.login("username", "password")
       self.assertEquals(t.value(), "APOP username 
f34f1e464d0d7927607753129cabe39a\r\n")
       p.dataReceived("+OK Welcome!\r\n")
       return d.addCallback(unittest.assertEqual, "Welcome!")

   def testInsecureLoginRaisesException(self):
       p, t = setUp()
       p.dataReceived("+OK Howdy")
       d = p.login("username", "password")
       self.failIf(t.value())
       self.assertRaises(InsecureAuthenticationDisallowed, unittest.wait, d)

class ListConsumer:
   def __init__(self):
       self.data = {}

   def consume(self, (item, value)):
       self.data.setdefault(item, []).append(value)

class MessageConsumer:
   def __init__(self):
       self.data = []

   def consume(self, line):
       self.data.append(line)

class POP3ClientListTestCase(unittest.TestCase):
   def testListSize(self):
       p, t = setUp()
       d = p.listSize()
       self.assertEquals(t.value(), "LIST\r\n")
       p.dataReceived("+OK Here it comes\r\n")
       p.dataReceived("1 3\r\n2 2\r\n3 1\r\n.\r\n")
       return d.addCallback(unittest.assertEqual, [3, 2, 1])

   def testListSizeWithConsumer(self):
       p, t = setUp()
       c = ListConsumer()
       f = c.consume
       d = p.listSize(f)
       self.assertEquals(t.value(), "LIST\r\n")
       p.dataReceived("+OK Here it comes\r\n")
       p.dataReceived("1 3\r\n2 2\r\n3 1\r\n")
       self.assertEquals(c.data, {0: [3], 1: [2], 2: [1]})
       p.dataReceived("5 3\r\n6 2\r\n7 1\r\n")
       self.assertEquals(c.data, {0: [3], 1: [2], 2: [1], 4: [3], 5: [2], 6: 
[1]})
       p.dataReceived(".\r\n")
       return d.addCallback(unittest.assertIdentical, f)

   def testFailedListSize(self):
       p, t = setUp()
       d = p.listSize()
       self.assertEquals(t.value(), "LIST\r\n")
       p.dataReceived("-ERR Fatal doom server exploded\r\n")
       exc = self.assertRaises(ServerErrorResponse, unittest.wait, d)
       self.assertEquals(exc.args[0], "Fatal doom server exploded")

   def testListUID(self):
       p, t = setUp()
       d = p.listUID()
       self.assertEquals(t.value(), "UIDL\r\n")
       p.dataReceived("+OK Here it comes\r\n")
       p.dataReceived("1 abc\r\n2 def\r\n3 ghi\r\n.\r\n")
       return d.addCallback(unittest.assertEqual, ["abc", "def", "ghi"])

   def testListUIDWithConsumer(self):
       p, t = setUp()
       c = ListConsumer()
       f = c.consume
       d = p.listUID(f)
       self.assertEquals(t.value(), "UIDL\r\n")
       p.dataReceived("+OK Here it comes\r\n")
       p.dataReceived("1 xyz\r\n2 abc\r\n5 mno\r\n")
       self.assertEquals(c.data, {0: ["xyz"], 1: ["abc"], 4: ["mno"]})
       p.dataReceived(".\r\n")
       return d.addCallback(unittest.assertIdentical, f)

   def testFailedListUID(self):
       p, t = setUp()
       d = p.listUID()
       self.assertEquals(t.value(), "UIDL\r\n")
       p.dataReceived("-ERR Fatal doom server exploded\r\n")
       exc = self.assertRaises(ServerErrorResponse, unittest.wait, d)
       self.assertEquals(exc.args[0], "Fatal doom server exploded")

class POP3ClientMessageTestCase(unittest.TestCase):
   def testRetrieve(self):
       p, t = setUp()
       d = p.retrieve(7)
       self.assertEquals(t.value(), "RETR 8\r\n")
       p.dataReceived("+OK Message incoming\r\n")
       p.dataReceived("La la la here is message text\r\n")
       p.dataReceived("..Further message text tra la la\r\n")
       p.dataReceived(".\r\n")
       return d.addCallback(
unittest.assertEqual, ["La la la here is message text",
            ".Further message text tra la la"])

   def testRetrieveWithConsumer(self):
       p, t = setUp()
       c = MessageConsumer()
       f = c.consume
       d = p.retrieve(7, f)
       self.assertEquals(t.value(), "RETR 8\r\n")
       p.dataReceived("+OK Message incoming\r\n")
       p.dataReceived("La la la here is message text\r\n")
       p.dataReceived("..Further message text\r\n.\r\n")
       self.assertIdentical(unittest.wait(d), f)
       self.assertEquals(c.data, ["La la la here is message text",
                                  ".Further message text"])

   def testPartialRetrieve(self):
       p, t = setUp()
       d = p.retrieve(7, lines=2)
       self.assertEquals(t.value(), "TOP 8 2\r\n")
       p.dataReceived("+OK 2 lines on the way\r\n")
       p.dataReceived("Line the first!  Woop\r\n")
       p.dataReceived("Line the last!  Bye\r\n")
       p.dataReceived(".\r\n")
       return d.addCallback(
           unittest.assertEqual,
           ["Line the first!  Woop",
            "Line the last!  Bye"])

   def testPartialRetrieveWithConsumer(self):
       p, t = setUp()
       c = MessageConsumer()
       f = c.consume
       d = p.retrieve(7, f, lines=2)
       self.assertEquals(t.value(), "TOP 8 2\r\n")
       p.dataReceived("+OK 2 lines on the way\r\n")
       p.dataReceived("Line the first!  Woop\r\n")
       p.dataReceived("Line the last!  Bye\r\n")
       p.dataReceived(".\r\n")
       self.assertIdentical(unittest.wait(d), f)
       self.assertEquals(c.data, ["Line the first!  Woop",
                                  "Line the last!  Bye"])

   def testFailedRetrieve(self):
       p, t = setUp()
       d = p.retrieve(0)
       self.assertEquals(t.value(), "RETR 1\r\n")
       p.dataReceived("-ERR Fatal doom server exploded\r\n")
       exc = self.assertRaises(ServerErrorResponse, unittest.wait, d)
       self.assertEquals(exc.args[0], "Fatal doom server exploded")

class POP3ClientMiscTestCase(unittest.TestCase):
   def testCapability(self):
       p, t = setUp()
       d = p.capabilities(useCache=0)
       self.assertEquals(t.value(), "CAPA\r\n")
       p.dataReceived("+OK Capabilities on the way\r\n")
       p.dataReceived("X\r\nY\r\nZ\r\n.\r\n")
       return d.addCallback(unittest.assertEqual, {"X": None, "Y": None, "Z": 
None})

   def testCapabilityError(self):
       p, t = setUp()
       d = p.capabilities(useCache=0)
       self.assertEquals(t.value(), "CAPA\r\n")
       p.dataReceived("-ERR This server is lame!\r\n")
       exc = self.assertRaises(ServerErrorResponse, unittest.wait, d)
       self.assertEquals(exc.args[0], "This server is lame!")

   def testStat(self):
       p, t = setUp()
       d = p.stat()
       self.assertEquals(t.value(), "STAT\r\n")
       p.dataReceived("+OK 1 1212\r\n")
       return d.addCallback(unittest.assertEqual, (1, 1212))

   def testStatError(self):
       p, t = setUp()
       d = p.stat()
       self.assertEquals(t.value(), "STAT\r\n")
       p.dataReceived("-ERR This server is lame!\r\n")
       exc = self.assertRaises(ServerErrorResponse, unittest.wait, d)
       self.assertEquals(exc.args[0], "This server is lame!")

   def testNoop(self):
       p, t = setUp()
       d = p.noop()
       self.assertEquals(t.value(), "NOOP\r\n")
       p.dataReceived("+OK No-op to you too!\r\n")
       return d.addCallback(unittest.assertEqual, "No-op to you too!")

   def testNoopError(self):
       p, t = setUp()
       d = p.noop()
       self.assertEquals(t.value(), "NOOP\r\n")
       p.dataReceived("-ERR This server is lame!\r\n")
       exc = self.assertRaises(ServerErrorResponse, unittest.wait, d)
       self.assertEquals(exc.args[0], "This server is lame!")

   def testRset(self):
       p, t = setUp()
       d = p.rset()
       self.assertEquals(t.value(), "RSET\r\n")
       p.dataReceived("+OK Reset state\r\n")
       return d.addCallback(unittest.assertEqual, "Reset state")

   def testRsetError(self):
       p, t = setUp()
       d = p.rset()
       self.assertEquals(t.value(), "RSET\r\n")
       p.dataReceived("-ERR This server is lame!\r\n")
       exc = self.assertRaises(ServerErrorResponse, unittest.wait, d)
       self.assertEquals(exc.args[0], "This server is lame!")

   def testDelete(self):
       p, t = setUp()
       d = p.delete(3)
       self.assertEquals(t.value(), "DELE 4\r\n")
       p.dataReceived("+OK Hasta la vista\r\n")
       return d.addCallback(unittest.assertEqual, "Hasta la vista")

   def testDeleteError(self):
       p, t = setUp()
       d = p.delete(3)
       self.assertEquals(t.value(), "DELE 4\r\n")
       p.dataReceived("-ERR Winner is not you.\r\n")
       exc = self.assertRaises(ServerErrorResponse, unittest.wait, d)
       self.assertEquals(exc.args[0], "Winner is not you.")


class SimpleClient(POP3Client):
   def __init__(self, deferred, contextFactory = None):
       POP3Client.__init__(self, contextFactory)
       self.deferred = deferred
       self.allowInsecureLogin = True

   def serverGreeting(self, challenge):
       self.deferred.callback(None)

class POP3HelperMixin:
   serverCTX = None
   clientCTX = None

   def setUp(self):
       d = defer.Deferred()
       self.server = 
pop3TestServer.POP3TestServer(contextFactory=self.serverCTX)
       self.client = SimpleClient(d, contextFactory=self.clientCTX)
       self.client.timeout = 30
       self.connected = d

   def tearDown(self):
       del self.server
       del self.client
       del self.connected

   def _cbStopClient(self, ignore):
       self.client.transport.loseConnection()

   def _ebGeneral(self, failure):
       self.client.transport.loseConnection()
       self.server.transport.loseConnection()
       failure.printTraceback(open('failure.log', 'w'))
       failure.printTraceback()
       raise failure.value

   def loopback(self):
       loopback.loopbackTCP(self.server, self.client, noisy=False)

class POP3TLSTestCase(POP3HelperMixin, unittest.TestCase):
   serverCTX = ServerTLSContext and ServerTLSContext()
   clientCTX = ClientTLSContext and ClientTLSContext()

   def testStartTLS(self):
       def login():
           #this will startTLS automatically
           return self.client.login('test', 'twisted')

       def quit():
           return self.client.quit()

       methods = [login, quit]
       map(self.connected.addCallback, map(strip, methods))
       self.connected.addCallback(login)
       self.connected.addCallback(quit)
       self.connected.addCallbacks(self._cbStopClient, self._ebGeneral)
       self.loopback()

class POP3TimeoutTestCase(POP3HelperMixin, unittest.TestCase):
   def testTimeout(self):
       def login():
           #this will startTLS automatically
           d = self.client.login('test', 'twisted')
           d.addErrback(timedOut)
           return d

       def timedOut(failure):
           self._cbStopClient(None)
           failure.trap(error.TimeoutError)

       def quit():
           return self.client.quit()

       self.client.timeout = 3
       #No need to leverage SSL for timeout test
       pop3TestServer.SSL_SUPPORT = False

       #Tell the server to not return a response to client.
       #This will trigger a timeout.
       pop3TestServer.TIMEOUT_RESPONSE = True

       methods = [login, quit]
       map(self.connected.addCallback, map(strip, methods))
       self.connected.addCallback(login)
       self.connected.addCallback(quit)
       self.connected.addCallback(self._cbStopClient)
       self.connected.addErrback(self._ebGeneral)
       self.loopback()

if ClientTLSContext is None:
   for case in (POP3TLSTestCase,):
       case.skip = "OpenSSL not present"
------------------------------------------------------------------------

_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

Open Source Applications Foundation "Dev" mailing list
http://lists.osafoundation.org/mailman/listinfo/dev


--
Brian Kirsch - Email Framework Engineer
Open Source Applications Foundation
543 Howard St. 5th Floor
San Francisco, CA 94105
(415) 946-3056
http://www.osafoundation.org

_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

Open Source Applications Foundation "Dev" mailing list
http://lists.osafoundation.org/mailman/listinfo/dev

Reply via email to