Hi NRB,
Neutral Robot Boy wrote:
alright, so i'm still in 'beginner' mode with stackless here. i did a
bit of reading which suggested that stackless should be able to
distribute processing across multiple cores without trouble, and i
decided to write a really simple script and look at how much of a load
it puts on my cpu.
The stackless scheduler which you activate by calling stackless.run()
only runs in one thread. Each tasklet is added to that scheduler and
called in turn. No other core will be used.
A few months ago, Carlos Eduardo de Paulo posted a code example on this
list for Stackless MultiCore. I've reattached the code for you here. I
haven't tested it myself, though it looks good. You will need the
processing module:
http://pypi.python.org/pypi/processing
Hope that helps
Simon
import processing
from threading import local
import stackless
import time
# Per-thread storage. ManageSleepingTasklets stores its list of sleeping
# tasklets here as _locals.sleepingTasklets, and Sleep appends to it.
_locals = local()
# Loop flag tested by the manager tasklets (`while running:`).
running = True
def Sleep(secondsToWait):
    """Suspend the calling tasklet for at least ``secondsToWait`` seconds.

    A fresh channel is registered, together with its wake-up time, in the
    thread-local ``_locals.sleepingTasklets`` list; the caller then blocks on
    that channel until ManageSleepingTasklets sends on it.

    ``_locals.sleepingTasklets`` is created by ManageSleepingTasklets, so that
    tasklet must already have started in this thread.
    """
    wakeChannel = stackless.channel()
    wakeTime = time.time() + secondsToWait
    pending = _locals.sleepingTasklets
    pending.append((wakeTime, wakeChannel))
    # Keep the earliest deadline at index 0, which is where the manager looks.
    pending.sort()
    # Block until we get sent an awakening notification.
    wakeChannel.receive()
def ManageSleepingTasklets(threadID):
    """Wake tasklets parked by Sleep once their deadline has passed.

    Initialises the thread-local ``_locals.sleepingTasklets`` list and then
    loops while the module-level ``running`` flag is true.  On each pass at
    most one sleeper (the one at index 0) is woken, after which control is
    yielded with ``stackless.schedule()``.

    ``threadID`` is accepted for the caller's benefit; the body does not use
    it.
    """
    global running
    _locals.sleepingTasklets = []
    while running:
        sleepers = _locals.sleepingTasklets
        if sleepers:
            wakeTime, wakeChannel = sleepers[0]
            if wakeTime <= time.time():
                del sleepers[0]
                # We have to send something, but it doesn't matter what as
                # it is not used.
                wakeChannel.send(None)
        elif stackless.getruncount() == 1:
            # Give up if there are no more sleeping tasklets. Otherwise the
            # two threads keep on running endlessly.
            break
        stackless.schedule()
def processManagerTasklet(processID, q):
global running
while running:
if not q.empty():
func, args = q.get()
print "Process", processID, "received task", func, args
t = stackless.tasklet(func)(*args)
stackless.schedule()
def processSpawner(processID, q):
stackless.tasklet(ManageSleepingTasklets)(processID)
stackless.tasklet(processManagerTasklet)(processID, q)
stackless.run()
print "Exit process", processID
#--------------------------
# Task queues for the worker processes, keyed by core number (2..n_cores).
processesChannels = {}
# Task queue for core 1, which is served by mainManager.
mainTasks = processing.Queue()
n_cores = processing.cpuCount()
# Core that the next call to new() will dispatch to.
core = 1
for n in range(2, n_cores + 1):
    processesChannels[n] = processing.Queue()
def mainManager():
global running
while running:
#print "Main waiting tasks, active core:", core
if not mainTasks.empty():
func, args = mainTasks.get()
print "Main received task...", func, args
t = stackless.tasklet(func)(*args)
else:
stackless.schedule()
print "Exiting main manager"
running = False
def new(func, *args, **kw):
global core
print "Sending task to core:", core
if core == 1:
mainTasks.put((func, args))
elif core > 1:
processesChannels[core].put((func, args))
if core < n_cores:
core += 1
elif core == n_cores:
core = 1
def run():
    """Start one worker process per extra core, then run the main scheduler.

    Cores 2..n_cores each get a ``processing.Process`` running processSpawner
    with that core's queue.  The calling process then creates its own sleep
    manager (with ID 1) and mainManager, and enters ``stackless.run()``.
    """
    for workerID in range(2, n_cores + 1):
        worker = processing.Process(
            target=processSpawner,
            args=[workerID, processesChannels[workerID]])
        worker.start()
    stackless.tasklet(ManageSleepingTasklets)(1)
    stackless.tasklet(mainManager)()
    stackless.run()
import stackless
from stacklessmulticore import new, Sleep, run
def looping_tasklet(me):
n = 5
while n > 0:
n -= 1
print me, "looping", n
Sleep(1)
print me, "exiting"
def looping_tasklet_r(ch):
print "Spawned receiver.... will sleep for 3 seconds to do a receive()"
Sleep(3)
print "Woke up... receiving from channel...."
msg = ch.receive()
print "Received from other tasklet:", msg
print "Receiver ended..."
def looping_tasklet_s(ch):
print "Sending to other tasklet, I will block."
ch.send("Bla!!!!!")
print "Sender unblocked..."
def monitor(c):
while 1:
if c.balance != 0:
print "----> Monitor: ", c.balance, c.queue
stackless.schedule()
if __name__ == "__main__":
    c = stackless.channel()
    # Dispatch order matters: new() hands tasks out round-robin over cores.
    tasks = (
        (looping_tasklet, 1),
        (monitor, c),
        (looping_tasklet, 2),
        (looping_tasklet_r, c),
        (looping_tasklet_s, c),
        (looping_tasklet, 3),
        (looping_tasklet, 4),
        (looping_tasklet, 5),
    )
    for taskFunc, taskArg in tasks:
        new(taskFunc, taskArg)
    run()
import stackless
import time
from stacklessmulticore import *
def doStuff(mult=1):
st = time.time()
c = 0
for i in xrange(int(100000*mult)):
c = c + 1
stackless.schedule()
print 100000*mult, " operations took: " , time.time() - st , " seconds."
def sleepalittle(howmuch):
print "Started sleep for", howmuch, " seconds."
st = time.time()
Sleep(howmuch)
print "Woke after ", howmuch, " seconds. (", time.time()-st, ")"
if __name__ == "__main__":
    # Dispatch order matters: new() hands tasks out round-robin over cores.
    jobs = (
        (doStuff, 1),
        (sleepalittle, 5),
        (sleepalittle, 0.2),
        (doStuff, 2),
        (doStuff, 3),
        (sleepalittle, 2),
        (sleepalittle, 1),
        (sleepalittle, 3),
        (doStuff, 4),
        (doStuff, 5),
    )
    for jobFunc, jobArg in jobs:
        new(jobFunc, jobArg)
    run()
_______________________________________________
Stackless mailing list
[email protected]
http://www.stackless.com/mailman/listinfo/stackless