Hi Phillip,
I am not sure if we really need the mock reactor. Twisted provides
very good trial test support for creating local loopbacks between
Twisted clients and Twisted servers. What is the issue with run and
stop of a reactor? To my knowledge this should not be a problem.
Also if all the unit tests were running in the same process why would
you need to stop and start the reactor?
I have attached my recent submissions to twisted core which include a
pop3TestServer, a pop3Client, and a unittest illustrating how to set
up local client server communication.
Thoughts?
Phillip J. Eby wrote:
Hi. I've heard recently that there are some tests that ideally
would need to run under the Twisted reactor, in order to properly
exercise the functionality under test. However, there are a number
of issues including the possible need for multiple uses of run/stop,
dependency on external servers, test duration, etc.
There is, however, a relatively straightforward solution: use a mock
reactor. I've successfully used this approach in the past to test
event-driven libraries, although it was only with a subset of the
full Twisted reactor capabilities. A mock reactor can be stopped,
reset, and started as many times as you like, because it doesn't
rely on hooking a GUI event loop, running in a separate thread, or
anything like that.
A mock reactor can run in "simulated time", which means that it uses
a time() function that runs faster than "real" time. For example,
if you schedule a callback, and there's no pending simulated I/O or
other scheduled calls, the simulated time jumps ahead to the next
scheduled callback.
One additional side benefit of simulated time is that it's
deterministic and therefore can be reliably reproduced in repeated
tests. In PEAK, for example, I once wrote tests for some components
that might be compared to WakeupCallers in Chandler. I had several
scheduled to wake up on various intervals, and the test then
verified that they had run at all the times they should have. Since
the time is simulated, there were no rounding errors or clock
precision to take into account, and the tests could instantaneously
whether they were simulating seconds, minutes, or even hours of
scheduling operations.
A mock reactor for Chandler tests could also use my "mockets" (fake
sockets) library in order to avoid doing any "real" network I/O,
allowing servers to listen and clients to connect to addresses on a
virtual network that exists only in the process' memory, thereby
avoiding the complexity of using external processes to set up and
tear down servers or depending on other servers being up and having
connectivity to them.
Although I don't have a complete mock reactor implementation, I do
have most of the prerequisites and experience that would be needed
to implement one, if anybody is interested. So, if you have things
(like Zanshin, Chandler client protocols, etc.) that need
reactor-based testing, and would be interested in helping me test a
mock reactor for your test cases, let me know. It's also possible
that this could be a joint project with the Twisted folks; as early
as last year, Itamar expressed an interest in allowing test reactors
to run using simulated time:
http://twistedmatrix.com/pipermail/twisted-python/2004-January/006982.html
And I would be surprised if they're not interested in having a
mocket-based reactor as well. The last hacking I did on Twisted was
around 1.1, so it might take me some time to get familiar with the
2.0 reactor interfaces. However, unless there are major differences
I don't expect it to be difficult to do; Twisted is designed to
isolate code from the underlying transport mechanism in use. About
the only "interesting" part would likely be SSL/TLS, since I doubt
M2Crypto and OpenSSL will want to talk to mocket objects instead of
real sockets. It might be necessary to create mock SSL "Transport"
objects as well as a mock reactor.
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
Open Source Applications Foundation "Dev" mailing list
http://lists.osafoundation.org/mailman/listinfo/dev
------------------------------------------------------------------------
# -*- test-case-name: twisted.mail.test.test_pop3client -*-
# Copyright (c) 2001-2004 Divmod Inc.
# See LICENSE for details.
"""POP3 client protocol implementation
Don't use this module directly. Use twisted.mail.pop3 instead.
@author U{Jp Calderone<mailto:[EMAIL PROTECTED]>}
API Stability: Unstable
"""
import re, md5
from twisted.python import log
from twisted.internet import defer
from twisted.protocols import basic
from twisted.protocols import policies
from twisted.internet import error
from twisted.internet import interfaces
OK = '+OK'
ERR = '-ERR'
class POP3ClientError(Exception):
"""Base class for all exceptions raised by POP3Client.
"""
class InsecureAuthenticationDisallowed(POP3ClientError):
"""Secure authentication was required but no mechanism could be found.
"""
class TLSError(POP3ClientError):
"""Secure authentication was required but no mechanism could be found.
"""
class TLSNotSupportedError(POP3ClientError):
"""Secure authentication was required but no mechanism could be found.
"""
class OptionNotSupportedError(POP3ClientError):
"""Secure authentication was required but no mechanism could be found.
"""
class ServerErrorResponse(POP3ClientError):
"""The server returned an error response to a request.
"""
def __init__(self, reason, consumer=None):
POP3ClientError.__init__(self, reason)
self.consumer = consumer
class LineTooLong(POP3ClientError):
"""The server sent an extremely long line.
"""
class _ListSetter:
# Internal helper. POP3 responses sometimes occur in the
# form of a list of lines containing two pieces of data,
# a message index and a value of some sort. When a message
# is deleted, it is omitted from these responses. The
# setitem method of this class is meant to be called with
# these two values. In the cases where indexes are skipped,
# it takes care of padding out the missing values with None.
def __init__(self, L):
self.L = L
def setitem(self, (item, value)):
diff = item - len(self.L) + 1
if diff > 0:
self.L.extend([None] * diff)
self.L[item] = value
def _statXform(line):
# Parse a STAT response
numMsgs, totalSize = line.split(None, 1)
return int(numMsgs), int(totalSize)
def _listXform(line):
# Parse a LIST response
index, size = line.split(None, 1)
return int(index) - 1, int(size)
def _uidXform(line):
# Parse a UIDL response
index, uid = line.split(None, 1)
return int(index) - 1, uid
def _codeStatusSplit(line):
# Parse an +OK or -ERR response
parts = line.split(' ', 1)
if len(parts) == 1:
return parts[0], ''
return parts
def _dotUnquoter(line):
"""
'.' characters which begin a line of a message are doubled to avoid
confusing with the terminating '.\r\n' sequence. This function unquotes
them.
"""
if line.startswith('..'):
return line[1:]
return line
class POP3Client(basic.LineOnlyReceiver, policies.TimeoutMixin):
"""POP3 client protocol implementation class
Instances of this class provide a convenient, efficient API for
retrieving and deleting messages from a POP3 server.
"""
# Capabilities are not allowed to change during the session
# So cache the first response and use that for all later
# lookups
_capCache = None
# Whether STARTTLS has been issued successfully yet or not.
startedTLS = False
# Indicate whether login() should be allowed if the server
# offers no authentication challenge and if our transport
# does not offer any protection via encryption.
allowInsecureLogin = False
# Regular expression to search for in the challenge string in the server
# greeting line.
challengeMagicRe = re.compile('(<[^>]+>)')
# Challenge received from the server
serverChallenge = None
# List of pending calls.
# We are a pipelining API but don't actually
# support pipelining on the network yet.
_blockedQueue = None
# The Deferred to which the very next result will go.
waiting = None
# Number of seconds to wait before timing out a connection.
# If the number is <= 0 no timeout checking will be performed.
timeout = 0
#Overides LineOnlyReceiver to set a larger max length.
MAX_LENGTH = 16384 * 2
def __init__(self, contextFactory = None):
self.context = contextFactory
self.timedOut = False
def _blocked(self, f, *a):
# Internal helper. If commands are being blocked, append
# the given command and arguments to a list and return a Deferred
# that will be chained with the return value of the function
# when it eventually runs. Otherwise, set up for commands to be
# blocked and return None.
if self._blockedQueue is not None:
d = defer.Deferred()
self._blockedQueue.append((d, f, a))
return d
self._blockedQueue = []
return None
def _unblock(self):
# Internal helper. Indicate that a function has completed.
# If there are blocked commands, run the next one. If there
# are not, set up for the next command to not be blocked.
if self._blockedQueue == []:
self._blockedQueue = None
elif self._blockedQueue is not None:
d, f, a = self._blockedQueue.pop(0)
d2 = f(*a)
d2.chainDeferred(d)
def sendShort(self, cmd, args):
# Internal helper. Send a command to which a short response
# is expected. Return a Deferred that fires when the response
# is received. Block all further commands from being sent until
# the response is received. Transition the state to SHORT.
d = self._blocked(self.sendShort, cmd, args)
if d is not None:
return d
if args:
self.sendLine(cmd + ' ' + args)
else:
self.sendLine(cmd)
self.state = 'SHORT'
self.waiting = defer.Deferred()
return self.waiting
def sendLong(self, cmd, args, consumer, xform):
# Internal helper. Send a command to which a multiline
# response is expected. Return a Deferred that fires when
# the entire response is received. Block all further commands
# from being sent until the entire response is received.
# Transition the state to LONG_INITIAL.
d = self._blocked(self.sendLong, cmd, args, consumer, xform)
if d is not None:
return d
if args:
self.sendLine(cmd + ' ' + args)
else:
self.sendLine(cmd)
self.state = 'LONG_INITIAL'
self.xform = xform
self.consumer = consumer
self.waiting = defer.Deferred()
return self.waiting
# Twisted protocol callback
def connectionMade(self):
if self.timeout > 0:
self.setTimeout(self.timeout)
self.state = 'WELCOME'
def timeoutConnection(self):
self.timedOut = True
self.transport.loseConnection()
def connectionLost(self, reason):
if self.timeout > 0:
self.setTimeout(None)
if self.timedOut:
reason = error.TimeoutError()
self.timedOut = False
d = []
if self.waiting is not None:
d.append(self.waiting)
self.waiting = None
if self._blockedQueue is not None:
d.extend([deferred for (deferred, f, a) in self._blockedQueue])
self._blockedQueue = None
for w in d:
w.errback(reason)
def lineReceived(self, line):
if self.timeout > 0:
self.resetTimeout()
state = self.state
self.state = None
state = getattr(self, 'state_' + state)(line) or state
if self.state is None:
self.state = state
def lineLengthExceeded(self, buffer):
# XXX - We need to be smarter about this
if self.waiting is not None:
waiting, self.waiting = self.waiting, None
waiting.errback(LineTooLong())
self.transport.loseConnection()
# POP3 Client state logic - don't touch this.
def state_WELCOME(self, line):
# WELCOME is the first state. The server sends one line of text
# greeting us, possibly with an APOP challenge. Transition the
# state to WAITING.
code, status = _codeStatusSplit(line)
if code != OK:
#XXX: Should raise some kind of error here
self.transport.loseConnection()
else:
m = self.challengeMagicRe.search(status)
if m is not None:
self.serverChallenge = m.group(1)
self.serverGreeting(self.serverChallenge)
return 'WAITING'
def state_WAITING(self, line):
# The server isn't supposed to send us anything in this state.
log.msg("Illegal line from server: " + repr(line))
def state_SHORT(self, line):
# This is the state we are in when waiting for a single
# line response. Parse it and fire the appropriate callback
# or errback. Transition the state back to WAITING.
deferred, self.waiting = self.waiting, None
self._unblock()
code, status = _codeStatusSplit(line)
if code == OK:
deferred.callback(status)
else:
deferred.errback(ServerErrorResponse(status))
return 'WAITING'
def state_LONG_INITIAL(self, line):
# This is the state we are in when waiting for the first
# line of a long response. Parse it and transition the
# state to LONG if it is an okay response; if it is an
# error response, fire an errback, clean up the things
# waiting for a long response, and transition the state
# to WAITING.
code, status = _codeStatusSplit(line)
if code == OK:
return 'LONG'
consumer = self.consumer
deferred = self.waiting
self.consumer = self.waiting = self.xform = None
self._unblock()
deferred.errback(ServerErrorResponse(status, consumer))
return 'WAITING'
def state_LONG(self, line):
# This is the state for each line of a long response.
# If it is the last line, finish things, fire the
# Deferred, and transition the state to WAITING.
# Otherwise, pass the line to the consumer.
if line == '.':
consumer = self.consumer
deferred = self.waiting
self.consumer = self.waiting = self.xform = None
self._unblock()
deferred.callback(consumer)
return 'WAITING'
else:
if self.xform is not None:
self.consumer(self.xform(line))
else:
self.consumer(line)
return 'LONG'
def serverGreeting(self, challenge):
"""Called when the server has sent us a greeting.
@type challenge: C{Str} (None if no challenge returned in the Server
Greeting)
@param challenge: A POP3 server which implements the APOP command will
include a timestamp challenge in its banner
greeting (RFC 1939).
.
"""
def startTLS(self, contextFactory=None):
"""
Initiates a 'STLS' request and negotiates the TLS / SSL
Handshake.
@param contextFactory: The TLS / SSL Context Factory to
leverage. If the contextFactory is None the POP3Client will
either use the current TLS / SSL Context Factory or attempt to
create a new one.
@type contextFactory: C{ssl.ClientContextFactory}
@return: A Deferred which fires when the transport has been
secured according to the given contextFactory, or which fails
if the transport cannot be secured.
"""
if self._capCache is None:
d = self.capabilities()
else:
d = defer.succeed(self._capCache)
d.addCallback(self._startTLS, contextFactory)
return d
def _startTLS(self, caps, contextFactory):
assert not self.startedTLS, "Client and Server are currently communicating
via TLS"
if contextFactory is None:
contextFactory = self._getContextFactory()
if contextFactory is None:
return defer.fail(TLSError(
"POP3Client requires a TLS context to "
"initiate the STARTTLS handshake"))
if 'STLS' not in caps:
return defer.fail(TLSNotSupportedError(
"Server does not support secure communication "
"via TLS / SSL"))
tls = interfaces.ITLSTransport(self.transport, None)
if tls is None:
return defer.fail(TLSError(
"POP3Client transport does not implement "
"interfaces.ITLSTransport"))
d = self.sendShort('STLS', None)
d.addCallback(self._startedTLS, contextFactory)
d.addCallback(lambda _: self.capabilities())
return d
def _startedTLS(self, result, context):
self.transport.startTLS(context)
self._capCache = None
self.startedTLS = True
self.context = context
return result
def _getContextFactory(self):
if self.context is not None:
return self.context
try:
from twisted.internet import ssl
except ImportError:
return None
else:
context = ssl.ClientContextFactory()
context.method = ssl.SSL.TLSv1_METHOD
return context
# External hooks - call these (most of 'em anyway)
def login(self, username, password):
"""Log into the server.
If APOP is available it will be used. Otherwise, if
TLS is available a 'STLS' session will be started and
plaintext login will proceed. Otherwise, if the
instance attribute allowInsecureLogin is set to True,
insecure plaintext login will proceed. Otherwise,
InsecureAuthenticationDisallowed will be raised
(asynchronously).
@param username: The username with which to log in.
@param password: The password with which to log in.
@rtype: C{Deferred}
@return: A deferred which fires when login has
completed.
"""
if self._capCache is None:
d = self.capabilities()
else:
d = defer.succeed(self._capCache)
d.addCallback(self._login, username, password)
return d
def _login(self, caps, username, password):
if self.serverChallenge is not None:
return self._apop(username, password, self.serverChallenge)
tryTLS = 'STLS' in caps
#If our transport supports switching to TLS, we might want to try to
switch to TLS.
tlsableTransport = interfaces.ITLSTransport(self.transport,
default=None) is not None
# If our transport is not already using TLS, we might want to try to
switch to TLS.
nontlsTransport = interfaces.ISSLTransport(self.transport, default=None)
is None
if not self.startedTLS and tryTLS and tlsableTransport and
nontlsTransport:
d = self.startTLS()
d.addCallback(self._loginTLS, username, password)
return d
elif self.startedTLS or self.allowInsecureLogin:
return self._plaintext(username, password)
else:
return defer.fail(InsecureAuthenticationDisallowed())
def _loginTLS(self, res, username, password):
return self._plaintext(username, password)
def _plaintext(self, username, password):
# Internal helper. Send a username/password pair, returning a Deferred
# that fires when both have succeeded or fails when the server rejects
# either.
return self.user(username).addCallback(lambda r: self.password(password))
def _apop(self, username, password, challenge):
# Internal helper. Computes and sends an APOP response. Returns
# a Deferred that fires when the server responds to the response.
digest = md5.new(challenge + password).hexdigest()
return self.apop(username, digest)
def apop(self, username, digest):
"""Perform APOP login.
This should be used in special circumstances only, when it is
known that the server supports APOP authentication, and APOP
authentication is absolutely required. For the common case,
use L{login} instead.
@param username: The username with which to log in.
@param digest: The challenge response to authenticate with.
"""
return self.sendShort('APOP', username + ' ' + digest)
def user(self, username):
"""Send the user command.
This performs the first half of plaintext login. Unless this
is absolutely required, use the L{login} method instead.
@param username: The username with which to log in.
"""
return self.sendShort('USER', username)
def password(self, password):
"""Send the password command.
This performs the second half of plaintext login. Unless this
is absolutely required, use the L{login} method instead.
@param password: The plaintext password with which to authenticate.
"""
return self.sendShort('PASS', password)
def delete(self, index):
"""Delete a message from the server.
@type index: C{int}
@param index: The index of the message to delete.
This is 0-based.
@rtype: C{Deferred}
@return: A deferred which fires when the delete command
is successful, or fails if the server returns an error.
"""
return self.sendShort('DELE', str(index + 1))
def _consumeOrSetItem(self, cmd, args, consumer, xform):
# Internal helper. Send a long command. If no consumer is
# provided, create a consumer that puts results into a list
# and return a Deferred that fires with that list when it
# is complete.
if consumer is None:
L = []
consumer = _ListSetter(L).setitem
return self.sendLong(cmd, args, consumer, xform).addCallback(lambda
r: L)
return self.sendLong(cmd, args, consumer, xform)
def _consumeOrAppend(self, cmd, args, consumer, xform):
# Internal helper. Send a long command. If no consumer is
# provided, create a consumer that appends results to a list
# and return a Deferred that fires with that list when it is
# complete.
if consumer is None:
L = []
consumer = L.append
return self.sendLong(cmd, args, consumer, xform).addCallback(lambda
r: L)
return self.sendLong(cmd, args, consumer, xform)
def capabilities(self, useCache=1):
"""Retrieve the capabilities supported by this server.
"""
if useCache and self._capCache is not None:
return defer.succeed(self._capCache)
#Reset the Capabilities Cache
self._capCache = {}
d = self._consumeOrAppend('CAPA', None, self._capsConsumer, None)
#cabilities is not supported by some POP servers. If an error
#is thrown we still want the same behavior
d.addBoth(self._cbCapabilities)
return d
def _cbCapabilities(self, result):
"""Returns the Capabilities to the caller"""
return self._capCache
def _capsConsumer(self, line):
tmp = line.split()
size = len(tmp)
if size == 0:
return
if size == 1:
self._capCache[tmp[0]] = None
else:
self._capCache[tmp[0]] = tmp[1:]
def noop(self):
return self.sendShort("NOOP", None)
def rset(self):
return self.sendShort("RSET", None)
def retrieve(self, index, consumer=None, lines=None):
"""Retrieve a message from the server.
If L{consumer} is not None, it will be called with
each line of the message as it is received. Otherwise,
the returned Deferred will be fired with a list of all
the lines when the message has been completely received.
"""
idx = str(index + 1)
if lines is None:
return self._consumeOrAppend('RETR', idx, consumer, _dotUnquoter)
return self._consumeOrAppend('TOP', '%s %d' % (idx, lines), consumer,
_dotUnquoter)
def stat(self):
"""Issues a 'STAT' request which is allowed in the TRANSACTION state
(RFC 1939).
The returned Deferred will be fired with a tuple containing the
number or messages in the maildrop and the size of the
maildrop in octets.
"""
return self.sendShort('STAT', None).addCallback(_statXform)
def listSize(self, consumer=None):
"""Retrieve a list of the size of all messages on the server.
If L{consumer} is not None, it will be called with two-tuples
of message index number and message size as they are received.
Otherwise, a Deferred which will fire with a list of B{only}
message sizes will be returned. For messages which have been
deleted, None will be used in place of the message size.
"""
return self._consumeOrSetItem('LIST', None, consumer, _listXform)
def listUID(self, consumer=None):
"""Retrieve a list of the UIDs of all messages on the server.
If L{consumer} is not None, it will be called with two-tuples
of message index number and message UID as they are received.
Otherwise, a Deferred which will fire with of list of B{only}
message UIDs will be returned. For messages which have been
deleted, None will be used in place of the message UID.
"""
return self._consumeOrSetItem('UIDL', None, consumer, _uidXform)
def quit(self):
"""Disconnect from the server.
"""
return self.sendShort('QUIT', None)
__all__ = [
# Exceptions
'InsecureAuthenticationDisallowed', 'LineTooLong', 'POP3ClientError',
'ServerErrorResponse', 'TLSError', 'TLSNotSupportedError',
'OptionNotSupported',
# Protocol classes
'POP3Client']
------------------------------------------------------------------------
#!/usr/local/bin/python
from twisted.internet.protocol import Factory
from twisted.protocols import basic
from twisted.internet import reactor
import sys
USER = "test"
PASS = "twisted"
PORT = 1100
SSL_SUPPORT = True
UIDL_SUPPORT = True
INVALID_SERVER_RESPONSE = False
INVALID_CAPABILITY_RESPONSE = False
INVALID_LOGIN_RESPONSE = False
DENY_CONNECTION = False
DROP_CONNECTION = False
BAD_TLS_RESPONSE = False
TIMEOUT_RESPONSE = False
TIMEOUT_DEFERRED = False
SLOW_GREETING = False
"""Commands"""
CONNECTION_MADE = "+OK POP3 localhost v2003.83 server ready"
CAPABILITIES = [
"TOP",
"LOGIN-DELAY 180",
"USER",
"SASL LOGIN"
]
CAPABILITIES_SSL = "STLS"
CAPABILITIES_UIDL = "UIDL"
INVALID_RESPONSE = "-ERR Unknown request"
VALID_RESPONSE = "+OK Command Completed"
AUTH_DECLINED = "-ERR LOGIN failed"
AUTH_ACCEPTED = "+OK Mailbox open, 0 messages"
TLS_ERROR = "-ERR server side error start TLS handshake"
LOGOUT_COMPLETE = "+OK quit completed"
NOT_LOGGED_IN = "-ERR Unknown AUHORIZATION state command"
STAT = "+OK 0 0"
UIDL = "+OK Unique-ID listing follows\r\n."
LIST = "+OK Mailbox scan listing follows\r\n."
CAP_START = "+OK Capability list follows:"
class POP3TestServer(basic.LineReceiver):
def __init__(self, contextFactory = None):
self.loggedIn = False
self.caps = None
self.tmpUser = None
self.ctx = contextFactory
def sendSTATResp(self, req):
self.sendLine(STAT)
def sendUIDLResp(self, req):
self.sendLine(UIDL)
def sendLISTResp(self, req):
self.sendLine(LIST)
def sendCapabilities(self):
if self.caps is None:
self.caps = [CAP_START]
if UIDL_SUPPORT:
self.caps.append(CAPABILITIES_UIDL)
if SSL_SUPPORT:
self.caps.append(CAPABILITIES_SSL)
for cap in CAPABILITIES:
self.caps.append(cap)
resp = '\r\n'.join(self.caps)
resp += "\r\n."
self.sendLine(resp)
def connectionMade(self):
if DENY_CONNECTION:
self.transport.loseConnection()
return
if SLOW_GREETING:
reactor.callLater(20, self.sendGreeting)
else:
self.sendGreeting()
def sendGreeting(self):
self.sendLine(CONNECTION_MADE)
def lineReceived(self, line):
"""Error Conditions"""
if TIMEOUT_RESPONSE:
"""Do not respond to clients request"""
return
if DROP_CONNECTION:
self.transport.loseConnection()
return
elif "CAPA" in line.upper():
if INVALID_CAPABILITY_RESPONSE:
self.sendLine(INVALID_RESPONSE)
else:
self.sendCapabilities()
elif "STLS" in line.upper() and SSL_SUPPORT:
self.startTLS()
elif "USER" in line.upper():
if INVALID_LOGIN_RESPONSE:
self.sendLine(INVALID_RESPONSE)
return
resp = None
try:
self.tmpUser = line.split(" ")[1]
resp = VALID_RESPONSE
except:
resp = AUTH_DECLINED
self.sendLine(resp)
elif "PASS" in line.upper():
resp = None
try:
pwd = line.split(" ")[1]
if self.tmpUser is None or pwd is None:
resp = AUTH_DECLINED
elif self.tmpUser == USER and pwd == PASS:
resp = AUTH_ACCEPTED
self.loggedIn = True
else:
resp = AUTH_DECLINED
except:
resp = AUTH_DECLINED
self.sendLine(resp)
elif "QUIT" in line.upper():
self.loggedIn = False
self.sendLine(LOGOUT_COMPLETE)
self.disconnect()
elif INVALID_SERVER_RESPONSE:
self.sendLine(INVALID_RESPONSE)
elif not self.loggedIn:
self.sendLine(NOT_LOGGED_IN)
elif "NOOP" in line.upper():
self.sendLine(VALID_RESPONSE)
elif "STAT" in line.upper():
if TIMEOUT_DEFERRED:
return
self.sendLine(STAT)
elif "LIST" in line.upper():
if TIMEOUT_DEFERRED:
return
self.sendLine(LIST)
elif "UIDL" in line.upper():
if TIMEOUT_DEFERRED:
return
elif not UIDL_SUPPORT:
self.sendLine(INVALID_RESPONSE)
return
self.sendLine(UIDL)
def startTLS(self):
if self.ctx is None:
self.getContext()
if SSL_SUPPORT and self.ctx is not None:
self.sendLine('+OK Begin TLS negotiation now')
self.transport.startTLS(self.ctx)
else:
self.sendLine('+OK TLS not available')
def disconnect(self):
self.transport.loseConnection()
def getContext(self):
try:
from twisted.internet import ssl
except ImportError:
self.ctx = None
else:
self.ctx = ssl.ClientContextFactory()
self.ctx.method = ssl.SSL.TLSv1_METHOD
usage = """popServer.py [arg] (default is Standard POP Server with no messages)
no_ssl - Start with no SSL support
no_uidl - Start with no UIDL support
bad_resp - Send a non-RFC compliant response to the Client
bad_cap_resp - send a non-RFC compliant response when the Client sends a
'CAPABILITY' request
bad_login_resp - send a non-RFC compliant response when the Client sends a
'LOGIN' request
deny - Deny the connection
drop - Drop the connection after sending the greeting
bad_tls - Send a bad response to a STARTTLS
timeout - Do not return a response to a Client request
to_deferred - Do not return a response on a 'Select' request. This
will test Deferred callback handling
slow - Wait 20 seconds after the connection is made to return a Server Greeting
"""
def printMessage(msg):
print "Server Starting in %s mode" % msg
def processArg(arg):
if arg.lower() == 'no_ssl':
global SSL_SUPPORT
SSL_SUPPORT = False
printMessage("NON-SSL")
elif arg.lower() == 'no_uidl':
global UIDL_SUPPORT
UIDL_SUPPORT = False
printMessage("NON-UIDL")
elif arg.lower() == 'bad_resp':
global INVALID_SERVER_RESPONSE
INVALID_SERVER_RESPONSE = True
printMessage("Invalid Server Response")
elif arg.lower() == 'bad_cap_resp':
global INVALID_CAPABILITY_RESPONSE
INVALID_CAPABILITY_RESPONSE = True
printMessage("Invalid Capability Response")
elif arg.lower() == 'bad_login_resp':
global INVALID_LOGIN_RESPONSE
INVALID_LOGIN_RESPONSE = True
printMessage("Invalid Capability Response")
elif arg.lower() == 'deny':
global DENY_CONNECTION
DENY_CONNECTION = True
printMessage("Deny Connection")
elif arg.lower() == 'drop':
global DROP_CONNECTION
DROP_CONNECTION = True
printMessage("Drop Connection")
elif arg.lower() == 'bad_tls':
global BAD_TLS_RESPONSE
BAD_TLS_RESPONSE = True
printMessage("Bad TLS Response")
elif arg.lower() == 'timeout':
global TIMEOUT_RESPONSE
TIMEOUT_RESPONSE = True
printMessage("Timeout Response")
elif arg.lower() == 'to_deferred':
global TIMEOUT_DEFERRED
TIMEOUT_DEFERRED = True
printMessage("Timeout Deferred Response")
elif arg.lower() == 'slow':
global SLOW_GREETING
SLOW_GREETING = True
printMessage("Slow Greeting")
elif arg.lower() == '--help':
print usage
sys.exit()
else:
print usage
sys.exit()
def main():
if len(sys.argv) < 2:
printMessage("POP3 with no messages")
else:
args = sys.argv[1:]
for arg in args:
processArg(arg)
f = Factory()
f.protocol = POP3TestServer
reactor.listenTCP(PORT, f)
reactor.run()
if __name__ == '__main__':
main()
------------------------------------------------------------------------
# -*- test-case-name: twisted.mail.test.test_pop3client -*-
# Copyright (c) 2001-2004 Divmod Inc.
# See LICENSE for details.
from twisted.mail.pop3 import AdvancedPOP3Client as POP3Client
from twisted.mail.pop3 import InsecureAuthenticationDisallowed
from twisted.mail.pop3 import ServerErrorResponse
from twisted.protocols import loopback
from twisted.internet import defer
from twisted.trial import unittest
from twisted.test.proto_helpers import StringTransport
from twisted.protocols import basic
import pop3TestServer
try:
from twisted.test.ssl_helpers import ClientTLSContext, ServerTLSContext
except ImportError:
ClientTLSContext = ServerTLSContext = None
capCache = {"TOP": None, "LOGIN-DELAY": "180", "UIDL": None, \
"STLS": None, "USER": None, "SASL": "LOGIN"}
def setUp():
p = POP3Client()
p._capCache = capCache
t = StringTransport()
p.makeConnection(t)
return p, t
def strip(f):
return lambda result, f=f: f()
class POP3ClientLoginTestCase(unittest.TestCase):
def testOkUser(self):
p, t = setUp()
d = p.user("username")
self.assertEquals(t.value(), "USER username\r\n")
p.dataReceived("+OK send password\r\n")
return d.addCallback(unittest.assertEqual, "send password")
def testBadUser(self):
p, t = setUp()
d = p.user("username")
self.assertEquals(t.value(), "USER username\r\n")
p.dataReceived("-ERR account suspended\r\n")
exc = self.assertRaises(ServerErrorResponse, unittest.wait, d)
self.assertEquals(exc.args[0], "account suspended")
def testOkPass(self):
p, t = setUp()
d = p.password("password")
self.assertEquals(t.value(), "PASS password\r\n")
p.dataReceived("+OK you're in!\r\n")
return d.addCallback(unittest.assertEqual, "you're in!")
def testBadPass(self):
p, t = setUp()
d = p.password("password")
self.assertEquals(t.value(), "PASS password\r\n")
p.dataReceived("-ERR go away\r\n")
exc = self.assertRaises(ServerErrorResponse, unittest.wait, d)
self.assertEquals(exc.args[0], "go away")
def testOkLogin(self):
p, t = setUp()
p.allowInsecureLogin = True
d = p.login("username", "password")
self.assertEquals(t.value(), "USER username\r\n")
p.dataReceived("+OK go ahead\r\n")
self.assertEquals(t.value(), "USER username\r\nPASS password\r\n")
p.dataReceived("+OK password accepted\r\n")
return d.addCallback(unittest.assertEqual, "password accepted")
def testBadPasswordLogin(self):
p, t = setUp()
p.allowInsecureLogin = True
d = p.login("username", "password")
self.assertEquals(t.value(), "USER username\r\n")
p.dataReceived("+OK waiting on you\r\n")
self.assertEquals(t.value(), "USER username\r\nPASS password\r\n")
p.dataReceived("-ERR bogus login\r\n")
exc = self.assertRaises(ServerErrorResponse, unittest.wait, d)
self.assertEquals(exc.args[0], "bogus login")
def testBadUsernameLogin(self):
p, t = setUp()
p.allowInsecureLogin = True
d = p.login("username", "password")
self.assertEquals(t.value(), "USER username\r\n")
p.dataReceived("-ERR bogus login\r\n")
exc = self.assertRaises(ServerErrorResponse, unittest.wait, d)
self.assertEquals(exc.args[0], "bogus login")
def testServerGreeting(self):
p, t = setUp()
# Make sure it *isn't* in the instance dict, just for sanity
self.failIfIn('serverChallenge', vars(p))
p.dataReceived("+OK lalala this has no challenge\r\n")
# Make sure it *is* in the instance dict and that it is None
self.assertEquals(p.serverChallenge, None)
def testServerGreetingWithChallenge(self):
p, t = setUp()
# Make sure it *isn't* in the instance dict, just for sanity
self.failIfIn('serverChallenge', vars(p))
p.dataReceived("+OK <here is the challenge>\r\n")
# Make sure it *is* in the instance dict and is what we sent
self.assertEquals(vars(p)['serverChallenge'], "<here is the challenge>")
def testAPOP(self):
p, t = setUp()
p.dataReceived("+OK <challenge string goes here>\r\n")
d = p.login("username", "password")
self.assertEquals(t.value(), "APOP username
f34f1e464d0d7927607753129cabe39a\r\n")
p.dataReceived("+OK Welcome!\r\n")
return d.addCallback(unittest.assertEqual, "Welcome!")
def testInsecureLoginRaisesException(self):
p, t = setUp()
p.dataReceived("+OK Howdy")
d = p.login("username", "password")
self.failIf(t.value())
self.assertRaises(InsecureAuthenticationDisallowed, unittest.wait, d)
class ListConsumer:
def __init__(self):
self.data = {}
def consume(self, (item, value)):
self.data.setdefault(item, []).append(value)
class MessageConsumer:
def __init__(self):
self.data = []
def consume(self, line):
self.data.append(line)
class POP3ClientListTestCase(unittest.TestCase):
def testListSize(self):
p, t = setUp()
d = p.listSize()
self.assertEquals(t.value(), "LIST\r\n")
p.dataReceived("+OK Here it comes\r\n")
p.dataReceived("1 3\r\n2 2\r\n3 1\r\n.\r\n")
return d.addCallback(unittest.assertEqual, [3, 2, 1])
def testListSizeWithConsumer(self):
p, t = setUp()
c = ListConsumer()
f = c.consume
d = p.listSize(f)
self.assertEquals(t.value(), "LIST\r\n")
p.dataReceived("+OK Here it comes\r\n")
p.dataReceived("1 3\r\n2 2\r\n3 1\r\n")
self.assertEquals(c.data, {0: [3], 1: [2], 2: [1]})
p.dataReceived("5 3\r\n6 2\r\n7 1\r\n")
self.assertEquals(c.data, {0: [3], 1: [2], 2: [1], 4: [3], 5: [2], 6:
[1]})
p.dataReceived(".\r\n")
return d.addCallback(unittest.assertIdentical, f)
def testFailedListSize(self):
p, t = setUp()
d = p.listSize()
self.assertEquals(t.value(), "LIST\r\n")
p.dataReceived("-ERR Fatal doom server exploded\r\n")
exc = self.assertRaises(ServerErrorResponse, unittest.wait, d)
self.assertEquals(exc.args[0], "Fatal doom server exploded")
def testListUID(self):
p, t = setUp()
d = p.listUID()
self.assertEquals(t.value(), "UIDL\r\n")
p.dataReceived("+OK Here it comes\r\n")
p.dataReceived("1 abc\r\n2 def\r\n3 ghi\r\n.\r\n")
return d.addCallback(unittest.assertEqual, ["abc", "def", "ghi"])
def testListUIDWithConsumer(self):
p, t = setUp()
c = ListConsumer()
f = c.consume
d = p.listUID(f)
self.assertEquals(t.value(), "UIDL\r\n")
p.dataReceived("+OK Here it comes\r\n")
p.dataReceived("1 xyz\r\n2 abc\r\n5 mno\r\n")
self.assertEquals(c.data, {0: ["xyz"], 1: ["abc"], 4: ["mno"]})
p.dataReceived(".\r\n")
return d.addCallback(unittest.assertIdentical, f)
def testFailedListUID(self):
p, t = setUp()
d = p.listUID()
self.assertEquals(t.value(), "UIDL\r\n")
p.dataReceived("-ERR Fatal doom server exploded\r\n")
exc = self.assertRaises(ServerErrorResponse, unittest.wait, d)
self.assertEquals(exc.args[0], "Fatal doom server exploded")
class POP3ClientMessageTestCase(unittest.TestCase):
def testRetrieve(self):
p, t = setUp()
d = p.retrieve(7)
self.assertEquals(t.value(), "RETR 8\r\n")
p.dataReceived("+OK Message incoming\r\n")
p.dataReceived("La la la here is message text\r\n")
p.dataReceived("..Further message text tra la la\r\n")
p.dataReceived(".\r\n")
return d.addCallback(
unittest.assertEqual,
["La la la here is message text",
".Further message text tra la la"])
def testRetrieveWithConsumer(self):
p, t = setUp()
c = MessageConsumer()
f = c.consume
d = p.retrieve(7, f)
self.assertEquals(t.value(), "RETR 8\r\n")
p.dataReceived("+OK Message incoming\r\n")
p.dataReceived("La la la here is message text\r\n")
p.dataReceived("..Further message text\r\n.\r\n")
self.assertIdentical(unittest.wait(d), f)
self.assertEquals(c.data, ["La la la here is message text",
".Further message text"])
def testPartialRetrieve(self):
p, t = setUp()
d = p.retrieve(7, lines=2)
self.assertEquals(t.value(), "TOP 8 2\r\n")
p.dataReceived("+OK 2 lines on the way\r\n")
p.dataReceived("Line the first! Woop\r\n")
p.dataReceived("Line the last! Bye\r\n")
p.dataReceived(".\r\n")
return d.addCallback(
unittest.assertEqual,
["Line the first! Woop",
"Line the last! Bye"])
def testPartialRetrieveWithConsumer(self):
p, t = setUp()
c = MessageConsumer()
f = c.consume
d = p.retrieve(7, f, lines=2)
self.assertEquals(t.value(), "TOP 8 2\r\n")
p.dataReceived("+OK 2 lines on the way\r\n")
p.dataReceived("Line the first! Woop\r\n")
p.dataReceived("Line the last! Bye\r\n")
p.dataReceived(".\r\n")
self.assertIdentical(unittest.wait(d), f)
self.assertEquals(c.data, ["Line the first! Woop",
"Line the last! Bye"])
def testFailedRetrieve(self):
p, t = setUp()
d = p.retrieve(0)
self.assertEquals(t.value(), "RETR 1\r\n")
p.dataReceived("-ERR Fatal doom server exploded\r\n")
exc = self.assertRaises(ServerErrorResponse, unittest.wait, d)
self.assertEquals(exc.args[0], "Fatal doom server exploded")
class POP3ClientMiscTestCase(unittest.TestCase):
def testCapability(self):
p, t = setUp()
d = p.capabilities(useCache=0)
self.assertEquals(t.value(), "CAPA\r\n")
p.dataReceived("+OK Capabilities on the way\r\n")
p.dataReceived("X\r\nY\r\nZ\r\n.\r\n")
return d.addCallback(unittest.assertEqual, {"X": None, "Y": None, "Z":
None})
def testCapabilityError(self):
p, t = setUp()
d = p.capabilities(useCache=0)
self.assertEquals(t.value(), "CAPA\r\n")
p.dataReceived("-ERR This server is lame!\r\n")
exc = self.assertRaises(ServerErrorResponse, unittest.wait, d)
self.assertEquals(exc.args[0], "This server is lame!")
def testStat(self):
p, t = setUp()
d = p.stat()
self.assertEquals(t.value(), "STAT\r\n")
p.dataReceived("+OK 1 1212\r\n")
return d.addCallback(unittest.assertEqual, (1, 1212))
def testStatError(self):
p, t = setUp()
d = p.stat()
self.assertEquals(t.value(), "STAT\r\n")
p.dataReceived("-ERR This server is lame!\r\n")
exc = self.assertRaises(ServerErrorResponse, unittest.wait, d)
self.assertEquals(exc.args[0], "This server is lame!")
def testNoop(self):
p, t = setUp()
d = p.noop()
self.assertEquals(t.value(), "NOOP\r\n")
p.dataReceived("+OK No-op to you too!\r\n")
return d.addCallback(unittest.assertEqual, "No-op to you too!")
def testNoopError(self):
p, t = setUp()
d = p.noop()
self.assertEquals(t.value(), "NOOP\r\n")
p.dataReceived("-ERR This server is lame!\r\n")
exc = self.assertRaises(ServerErrorResponse, unittest.wait, d)
self.assertEquals(exc.args[0], "This server is lame!")
def testRset(self):
p, t = setUp()
d = p.rset()
self.assertEquals(t.value(), "RSET\r\n")
p.dataReceived("+OK Reset state\r\n")
return d.addCallback(unittest.assertEqual, "Reset state")
def testRsetError(self):
p, t = setUp()
d = p.rset()
self.assertEquals(t.value(), "RSET\r\n")
p.dataReceived("-ERR This server is lame!\r\n")
exc = self.assertRaises(ServerErrorResponse, unittest.wait, d)
self.assertEquals(exc.args[0], "This server is lame!")
def testDelete(self):
p, t = setUp()
d = p.delete(3)
self.assertEquals(t.value(), "DELE 4\r\n")
p.dataReceived("+OK Hasta la vista\r\n")
return d.addCallback(unittest.assertEqual, "Hasta la vista")
def testDeleteError(self):
p, t = setUp()
d = p.delete(3)
self.assertEquals(t.value(), "DELE 4\r\n")
p.dataReceived("-ERR Winner is not you.\r\n")
exc = self.assertRaises(ServerErrorResponse, unittest.wait, d)
self.assertEquals(exc.args[0], "Winner is not you.")
class SimpleClient(POP3Client):
def __init__(self, deferred, contextFactory = None):
POP3Client.__init__(self, contextFactory)
self.deferred = deferred
self.allowInsecureLogin = True
def serverGreeting(self, challenge):
self.deferred.callback(None)
class POP3HelperMixin:
serverCTX = None
clientCTX = None
def setUp(self):
d = defer.Deferred()
self.server =
pop3TestServer.POP3TestServer(contextFactory=self.serverCTX)
self.client = SimpleClient(d, contextFactory=self.clientCTX)
self.client.timeout = 30
self.connected = d
def tearDown(self):
del self.server
del self.client
del self.connected
def _cbStopClient(self, ignore):
self.client.transport.loseConnection()
def _ebGeneral(self, failure):
self.client.transport.loseConnection()
self.server.transport.loseConnection()
failure.printTraceback(open('failure.log', 'w'))
failure.printTraceback()
raise failure.value
def loopback(self):
loopback.loopbackTCP(self.server, self.client, noisy=False)
class POP3TLSTestCase(POP3HelperMixin, unittest.TestCase):
serverCTX = ServerTLSContext and ServerTLSContext()
clientCTX = ClientTLSContext and ClientTLSContext()
def testStartTLS(self):
def login():
#this will startTLS automatically
return self.client.login('test', 'twisted')
def quit():
return self.client.quit()
methods = [login, quit]
map(self.connected.addCallback, map(strip, methods))
self.connected.addCallback(login)
self.connected.addCallback(quit)
self.connected.addCallbacks(self._cbStopClient, self._ebGeneral)
self.loopback()
class POP3TimeoutTestCase(POP3HelperMixin, unittest.TestCase):
def testTimeout(self):
def login():
#this will startTLS automatically
d = self.client.login('test', 'twisted')
d.addErrback(timedOut)
return d
def timedOut(failure):
self._cbStopClient(None)
failure.trap(error.TimeoutError)
def quit():
return self.client.quit()
self.client.timeout = 3
#No need to leverage SSL for timeout test
pop3TestServer.SSL_SUPPORT = False
#Tell the server to not return a response to client.
#This will trigger a timeout.
pop3TestServer.TIMEOUT_RESPONSE = True
methods = [login, quit]
map(self.connected.addCallback, map(strip, methods))
self.connected.addCallback(login)
self.connected.addCallback(quit)
self.connected.addCallback(self._cbStopClient)
self.connected.addErrback(self._ebGeneral)
self.loopback()
if ClientTLSContext is None:
for case in (POP3TLSTestCase,):
case.skip = "OpenSSL not present"
------------------------------------------------------------------------
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
Open Source Applications Foundation "Dev" mailing list
http://lists.osafoundation.org/mailman/listinfo/dev