Hi Colleagues:
I recently updated to Stackless 2.6.1 and Twisted 8.2. I executed the following
programme that works fine under Stackless 2.5.2 and Twisted (including 8.2) and
I received the following error:
traceback (most recent call last):
File "ToyProcessor5.py", line 60, in twistedReactor
reactor.run()
File "/usr/local/lib/python2.6/site-packages/twisted/internet/base.py"
, line 1048, in run
self.mainLoop()
--- <exception caught here> ---
File "/usr/local/lib/python2.6/site-packages/twisted/internet/base.py"
, line 1057, in mainLoop
self.runUntilCurrent()
File "/usr/local/lib/python2.6/site-packages/twisted/internet/base.py"
, line 707, in runUntilCurrent
log.deferr()
File "/usr/local/lib/python2.6/site-packages/twisted/python/log.py", l
ine 153, in err
_stuff = failure.Failure()
File "/usr/local/lib/python2.6/site-packages/twisted/python/failure.py
", line 265, in __init__
parentCs = reflect.allYourBase(self.type)
", line 542, in allYourBase
accumulateBases(classObj, l, baseClass)
File
"/usr/local/lib/python2.6/site-packages/twisted/python/reflect.py", line 550,
in accumulateBases
accumulateBases(base, l, baseClass)
File
"/usr/local/lib/python2.6/site-packages/twisted/python/reflect.py", line 550,
in accumulateBases
accumulateBases(base, l, baseClass)
File
"/usr/local/lib/python2.6/site-packages/twisted/python/reflect.py", line 550,
in accumulateBases
accumulateBases(base, l, baseClass)
File
"/usr/local/lib/python2.6/site-packages/twisted/python/reflect.py", line 550,
in accumulateBases
accumulateBases(base, l, baseClass)
exceptions.RuntimeError: maximum recursion depth exceeded
Similarily written programmes fail in the same fashion. What is suspicious is
that there is no recursion in the offending section. Even if I create one
worker tasklet, I get the same error.
def twistedReactor():
l = task.LoopingCall(stackless.schedule)
l.start(.01)
reactor.run()
however, if I change
l = task.LoopingCall(stackless.schedule)
to
l = task.LoopingCall(tick)
and tick is
def tick():
stackless.schedule()
the programme works. Although a work around, I would like to find the real
problem.
What particular worries me is when I created small test examples, I was not
able to recreate the problem. Something else is going on....
What I would appreciate is some hints as to what may be happening.
- I normally don't expect "Maximum Recursion Depth" errors in Stackless.
- What is the reflect.allYourBases stuff?
- What is log.deferr? (I don't recall seeing that method).
If I get few clues, it would make it easier for me to write new tests and zero
in on the problem. Hopefully the problem is with my code rather than Stackless
2.6.1 and/or Twisted 8.2
I have included some sample code. Unfortunately ToyProcessor5.py is a bit large
(I have newer code that is smaller but requires more files)
Cheers,
Andrew
#!/usr/bin/env python
"""
ToyProcessor.py
Andrew Francis
April 3rd, 2008
The purpose of this programme is to illustrate the techniques
used in the presentation "Adventures in Stackless Python / Twisted Integration"
<song>Dead Letter and the Infinite - Wintersleep </song>
"""
import stackless
import time
import sys
from twisted.internet.defer import Deferred
from twisted.python.failure import Failure
from twisted.internet import reactor
from twisted.web import client
from twisted.web import http
from twisted.python import log, logfile
from twisted.internet import task
RECEIVE_RESPONSE = 0
REPLY_RESPONSE = 1
INVOKE_RESPONSE = 2
WAIT_RESPONSE = 3
REQUEST_ERROR = -1
REPLY_ERROR = -1
__DEBUG__ = False
class CorrelationNotFound:
def __init__(self, data):
self.data = data
class Request(object):
pass
class Response(object):
pass
"""
run the Twisted Reactor in its own tasklet
also run a task that makes the Twisted Reactor yield to the Stackless scheduler
"""
def twistedReactor():
l = task.LoopingCall(stackless.schedule)
l.start(.01)
reactor.run()
class InvokeResponse(Response):
def __init__(self, requestId, message, isError = False):
self.type = INVOKE_RESPONSE
self.requestId = requestId
self.message = message
self.isError = isError
return
class ReceiveResponse(Response):
def __init__(self, path, message, channel):
self.type = RECEIVE_RESPONSE
self.path = path
self.message = message
self.channel = channel
return
class ReplyRequest(Request):
def __init__(self, requestId, message, isError = False):
self.requestId = requestId
self.message = message
self.isError = isError
class ReplyResponse(Response):
def __init__(self, requestId, isError = False):
self.type = REPLY_RESPONSE
self.requestId = requestId
self.isError = isError
return
class WaitResponse(Response):
def __init__(self, requestId):
self.type = WAIT_RESPONSE
self.requestId = requestId
return
"""
Process corresponds to a WS-BPEL process
"""
class Process(object):
def __init__(self, url, messageExchange):
self.url = url
self.messageExchange = messageExchange
self.scopeId = Process.processor.__generateScope__()
self.replyMessage = "<html><head></head><body>hello world from process " + str(self.scopeId) + "</body></html>"
return
def execute(self):
log.msg("Process " + str(self.scopeId) + " started")
self.processor.receive(self.scopeId, self.url, self.messageExchange, self.__class__)
#result = self.processor.invoke("http://localhost")
self.processor.reply(self.scopeId, self.messageExchange, self.replyMessage)
return
class AlarmProcess(object):
def __init__(self, period):
self.period = period
self.scopeId = Process.processor.__generateScope__()
return
def execute(self):
log.msg("Process " + str(self.scopeId) + " started")
while (1):
self.processor.wait(self.scopeId, self.period)
#result = self.processor.invoke("http://localhost")
log.msg("Tick")
return
"""
Correlations are used to match incoming messages to the corresponding receive
activities
"""
class Correlation(object):
def __init__(self, path, messageExchange, requestId):
self.path = path
self.messageExchange = messageExchange
self.requestId = requestId
self.aChannel = None
return
def __eq__(self, other):
return self.path == other
def __repr__(self):
return self.path + " " + "requestId: " + str(self.requestId) + " " + self.messageExchange
def getChannel(self):
return self.aChannel
def setChannel(self, channel):
self.aChannel = channel
channel = property(getChannel, setChannel)
class ToyProcessor(object):
def __init__(self, channel):
self.responseChannel = channel
self.requestId = -1
self.scopeId = 0
self.requestTable = {}
self.correlations = []
self.messageExchangeStates = {}
self.flag = True
return
"""
level 0 calls
"""
def createProcess(self, factory, *args,**kwargs):
stackless.tasklet(factory(*args, **kwargs).execute)()
def invoke(self, url):
return self.__processRequest__(self.__invokeRequest__, url, self.responseChannel)
def receive(self, eventScope, url, messageExchange, factory):
result = self.__processRequest__(self.__receiveRequest__, eventScope, url, messageExchange)
# and create a new daemon
if factory != None:
log.msg("creating replacement daemon")
self.createProcess(factory, url, messageExchange)
return result
def reply(self, eventScope, messageExchange, message):
self.__processRequest__(self.__replyRequest__, eventScope, messageExchange, message)
def wait(self, eventScope, period):
self.__processRequest__(self.__waitRequest__, eventScope, period)
"""
level 1 calls
"""
def __rewrite__(self, scope, messageExchange):
return str(scope) + ":" + messageExchange
def __processRequest__(self, callable, *args, **kwargs):
channel = stackless.channel()
requestId = self.__addRequest__(channel)
callable(requestId, *args, **kwargs)
result = channel.receive()
self.__deleteRequest__(requestId)
return result
def __processResponse__(self, response):
try:
if response.type == RECEIVE_RESPONSE:
requestId, message = self.__receiveResponse__(response)
elif response.type == REPLY_RESPONSE:
requestId, message = self.__replyResponse__(response)
elif response.type == INVOKE_RESPONSE:
requestId, message = self.__invokeResponse__(response)
elif response.type == WAIT_RESPONSE:
requestId, message = self.__waitResponse__(response)
else:
log.message("something funny has happened " + str(requestId))
print response
reactor.stop
if requestId != REQUEST_ERROR:
self.requestTable[requestId].send(message)
except:
log.msg(sys.exc_info())
def __generateScope__(self):
self.scopeId += 1
return self.scopeId
def __addRequest__(self, channel):
self.requestId = self.requestId + 1
self.requestTable[self.requestId] = channel
return self.requestId
def __deleteRequest__(self, requestId):
del self.requestTable[requestId]
return
def __invokeRequest__(self, requestId, url, responseChannel):
log.msg("INVOKE_REQUEST Started " + str(requestId) + " " + url)
Connection(requestId, responseChannel).connect(url)
log.msg("INVOKE_REQUEST Finished " + str(requestId))
def __invokeResponse__(self, response):
log.msg("INVOKE_RESPONSE Started " + str(response.requestId))
return response.requestId, response.message
def __receiveRequest__(self, requestId, scope, path, messageExchange):
log.msg("__RECEIVE_REQUEST___ STARTED " + str(requestId) + " pid:" + str(scope))
self.__addCorrelation__(requestId, path, self.__rewrite__(scope, messageExchange))
log.msg("__RECEIVE_REQUEST__ FINISHED__" + str(requestId) + " pid:" + str(scope))
def __receiveResponse__(self, response):
log.msg("RECEIVE_RESPONSE STARTED")
try:
correlation = self.__removeCorrelation__(response.path)
self.messageExchangeStates[correlation.messageExchange] = response.channel
except CorrelationNotFound, E:
response.channel.send(ReplyRequest(None,"<html><head><body>Path not found</body></html>",True))
return REQUEST_ERROR, None
else:
log.msg("RECEIVE_RESPONSE ENDED " + str(correlation.requestId))
return correlation.requestId, response.message
def __replyRequest__(self, requestId, scope, messageExchange, message):
log.msg("REPLY_REQUEST STARTED")
newMessageExchange = self.__rewrite__(scope, messageExchange)
channel = self.messageExchangeStates[newMessageExchange]
del self.messageExchangeStates[newMessageExchange]
log.msg("REPLY_REQUEST SENDING MESSAGE")
channel.send(ReplyRequest(requestId, message))
log.msg("REQUEST_REQUEST FINISHED")
return
def __replyResponse__(self, response):
return response.requestId, None
def __waitRequest__(self, requestId, scope, period):
def alarm(requestId):
self.responseChannel.send(WaitResponse(requestId))
reactor.callLater(period, alarm, requestId)
def __waitResponse__(self, response):
return response.requestId, None
def __addCorrelation__(self, requestId, path, messageExchange):
log.msg("entering __addCorrelation__")
self.correlations.append(Correlation(path, messageExchange, requestId))
log.msg("exiting __addCorrelation__")
def __removeCorrelation__(self, path):
try:
log.msg("Looking for correlation =>" + path)
i = self.correlations.index(path)
print "found =>", self.correlations[i]
except ValueError:
raise CorrelationNotFound(path)
return self.correlations.pop(i)
def execute(self):
while (self.flag):
log.msg("processor - waiting for message")
self.__processResponse__(self.responseChannel.receive())
"""
level 2
"""
class MyRequestHandler(http.Request):
"""
it is necessary to run the body of the request handler in its own tasklet
as to prevent the entire reactor from blocking on a channel
"""
def process(self):
stackless.tasklet(self.doWork)()
def doWork(self):
channel = stackless.channel()
receiveRequest = ReceiveResponse(self.path, self.content.read(), channel)
MyRequestHandler.responseChannel.send(receiveRequest)
log.msg("REQUEST HANDLER WAITING ON CHANNEL")
reply = channel.receive()
if reply.isError:
self.setResponseCode(http.NOT_FOUND)
theReplyId = REPLY_ERROR
else:
theReplyId = reply.requestId
self.write(reply.message.encode("utf-8"))
self.finish()
log.msg("REQUEST HANDLER WRITING REPLY " + str(theReplyId))
MyRequestHandler.responseChannel.send(ReplyResponse(theReplyId))
log.msg("REQUEST HANDLER COMPLETED " + str(theReplyId))
return
class MyHttp(http.HTTPChannel):
requestFactory = MyRequestHandler
class MyHttpFactory(http.HTTPFactory):
protocol = MyHttp
class Connection(object):
def __init__(self, requestId, channel):
self.channel = channel
self.requestId = requestId
def __handleConnection__(self, result):
log.msg("INVOKE RESPONSE " + str(self.requestId))
self.channel.send(InvokeResponse(self.requestId, result))
def __handleError__(self, failure):
self.channel.send(InvokeResponse(self.requestId, failure, True))
def connect(self, url):
client.getPage(url).addCallback(self.__handleConnection__).addErrback(self.__handleError__)
if __name__ == "__main__":
log.startLogging(sys.stdout)
log.msg("test starting Solution")
log.msg("test starting now")
"""
responses are send on a channel shared by Twisted and the ToyProcessor
"""
responseChannel = stackless.channel()
processor = ToyProcessor(responseChannel)
MyRequestHandler.responseChannel = responseChannel
MyRequestHandler.processor = processor
#make all Process instances inherit a processor
#should create an abstract Process
Process.processor = processor
AlarmProcess.processor = processor
"""
okay start the processor
"""
stackless.tasklet(processor.execute)()
"""
okay, let us run some processes
"""
for i in range(0,1000):
processor.createProcess(Process, "/" + str(i), "message" + str(i))
processor.createProcess(AlarmProcess, 10)
"""
and create a HTTP server
"""
reactor.listenTCP(8000, MyHttpFactory())
stackless.tasklet(twistedReactor)()
while (stackless.getruncount() > 1):
stackless.schedule()
log.msg("programme has abnormally terminated?")
#!/usr/bin/env python
"""
BadProcessor.py
Andrew Francis
January 31th, 2009
Track down error in Python 2.6.1/Twisted 8.2
This version should work
<song> Wrapped Around My Finger -The Police </song>
"""
import stackless
import time
import sys
from twisted.internet import defer
from twisted.python.failure import Failure
from twisted.internet import reactor
from twisted.web import client
from twisted.web import http
from twisted.python import log, logfile
from twisted.internet import task
from BufferedChannel import BufferedChannel
REQUEST_ERROR = -1
REPLY_ERROR = -1
OKAY = 200
MAX_PROCESSES = 2
__DEBUG__ = False
def tick():
while(True):
print "tick"
stackless.schedule()
class BadProcessor(object):
def __init__(self):
return
def startNetworking(self):
"""
run the Twisted Reactor in its own tasklet
also run a task that makes the Twisted Reactor yield to the Stackless
scheduler
"""
l = task.LoopingCall(stackless.schedule)
l.start(.01)
reactor.run()
if __name__ == "__main__":
processor = BadProcessor()
stackless.tasklet(processor.startNetworking)()
stackless.tasklet(tick)()
stackless.run()
#!/usr/bin/env python
"""
GoodBPELProcessor.py
Andrew Francis
January 31th, 2009
Track down error in Python 2.6.1/Twisted 8.2
This version should work
<song> Hardcore UFOs -Guided By Voices </song>
"""
import stackless
import time
import sys
from twisted.internet import defer
from twisted.python.failure import Failure
from twisted.internet import reactor
from twisted.web import client
from twisted.web import http
from twisted.python import log, logfile
from twisted.internet import task
from BufferedChannel import BufferedChannel
REQUEST_ERROR = -1
REPLY_ERROR = -1
OKAY = 200
MAX_PROCESSES = 2
__DEBUG__ = False
class GoodProcessor(object):
def __init__(self):
return
def tick(self):
print "tick"
stackless.schedule()
def startNetworking(self):
"""
run the Twisted Reactor in its own tasklet
also run a task that makes the Twisted Reactor yield to the Stackless
scheduler
"""
l = task.LoopingCall(self.tick)
l.start(.01)
reactor.run()
if __name__ == "__main__":
processor = GoodProcessor()
stackless.tasklet(processor.startNetworking)()
stackless.run()
_______________________________________________
Stackless mailing list
[email protected]
http://www.stackless.com/mailman/listinfo/stackless