I've pushed recoco.consumer to the carp branch which has a couple generic consumer classes. Using those, you can write a simple component like:
from pox.core import core from pox.lib.recoco.consumer import BaseConsumer import pox.openflow.libopenflow_01 as of class MyConsumer (BaseConsumer): def _do_work (self, work): # Work is (dpid,packet_in) dpid,packet_in = work po = of.ofp_packet_out(data = packet_in) po.actions.append(of.ofp_action_output(port = of.OFPP_FLOOD)) connection = core.openflow.getConnection(dpid) if connection is None: self.log.warn("Connection lost before sending packet") return connection.send(po) def handle_PacketIn (event): consumer.add_work((event.dpid,event.ofp)) def launch (): global consumer consumer = MyConsumer() core.openflow.addListenerByName("PacketIn", handle_PacketIn) Here, the normal OpenFlow event Task is acting like a producer, pushing work items (which in this case are a DPID and a packet_in) to a simple consumer which just floods the packet back out. So it's a dumb hub, but written producer-consumer style. BaseConsumer's initializer has some parameters for maximum batch size and Task priority. More comments below. On Mar 23, 2013, at 3:59 PM, Tmusic wrote: > I'm still a bit confused about the work producing and consuming as it's > implemented now. So there is the main task loop in OpenFlow_01_Task which > loops over the connections and calls the read function on each of them. This > read functions calls the appropriate handler which in its turn fires the > appropriate event on the pox core (which are then further handled). So > everything would be processed connection by connection... > > But if I understand you correctly, the handlers called by the read function > put the jobs in a queue, which is than emptied by a separate task loop (which > I can't find at the moment). Can you give a hint where (in the code) the task > loop runs that empties the queue and where the filling of the queue exactly > happens? Ah, we have miscommunicated. There's no queue in general. The OpenFlow events are entirely (or almost entirely?) raised by a single Task with a select loop (OpenFlow_01_Task or whatever). It raises them directly. The miscommunication I believe stems from me saying, "The OpenFlow event handlers are producers that fill a work queue and then you have a consumer in the form of a recoco Task that tries to drain the queue." I wasn't describing how it works now. I was describing the solution to your problem. The example above *does* implement this. The idea being rather than do expensive processing directly in the handlers, you're better off handling them quickly by just shoving them off at a work queue which can try to handle them later (and perhaps with more flexible priorities). The new BaseConsumer/FlexConsumer classes are meant to simplify this pattern. (They're based on the producer/consumer example I posed a few days ago, but now it's generic.) -- Murphy