I've pushed recoco.consumer, which has a couple of generic consumer classes, 
to the carp branch.  Using those, you can write a simple component like:

from pox.core import core
from pox.lib.recoco.consumer import BaseConsumer
import pox.openflow.libopenflow_01 as of

class MyConsumer (BaseConsumer):
  def _do_work (self, work):
    # Work is (dpid,packet_in)
    dpid,packet_in = work
    po = of.ofp_packet_out(data = packet_in)
    po.actions.append(of.ofp_action_output(port = of.OFPP_FLOOD))

    connection = core.openflow.getConnection(dpid)
    if connection is None:
      self.log.warn("Connection lost before sending packet")
      return
    connection.send(po)

def handle_PacketIn (event):
  consumer.add_work((event.dpid,event.ofp))

def launch ():
  global consumer
  consumer = MyConsumer()
  core.openflow.addListenerByName("PacketIn", handle_PacketIn)

Here, the normal OpenFlow event Task is acting like a producer, pushing work 
items (which in this case are a DPID and a packet_in) to a simple consumer 
which just floods the packet back out.  So it's a dumb hub, but written 
producer-consumer style.  BaseConsumer's initializer has some parameters for 
maximum batch size and Task priority.

More comments below.

On Mar 23, 2013, at 3:59 PM, Tmusic wrote:
> I'm still a bit confused about the work producing and consuming as it's 
> implemented now. So there is the main task loop in OpenFlow_01_Task which 
> loops over the connections and calls the read function on each of them. This 
> read function calls the appropriate handler which in its turn fires the 
> appropriate event on the pox core (which are then further handled). So 
> everything would be processed connection by connection...
> 
> But if I understand you correctly, the handlers called by the read function 
> put the jobs in a queue, which is then emptied by a separate task loop (which 
> I can't find at the moment). Can you give a hint where (in the code) the task 
> loop runs that empties the queue and where the filling of the queue exactly 
> happens?

Ah, we have miscommunicated.  There's no queue in general.  The OpenFlow events 
are entirely (or almost entirely?) raised by a single Task with a select loop 
(OpenFlow_01_Task or whatever).  It raises them directly.

The miscommunication, I believe, stems from me saying, "The OpenFlow event 
handlers are producers that fill a work queue and then you have a consumer in 
the form of a recoco Task that tries to drain the queue."  I wasn't describing 
how it works now.  I was describing the solution to your problem.

The example above *does* implement this.  The idea is that rather than doing 
expensive processing directly in the handlers, you're better off handling 
events quickly by just shoving them onto a work queue, which can try to handle 
them later (and perhaps with more flexible priorities).  The new 
BaseConsumer/FlexConsumer classes are meant to simplify this pattern.  (They're 
based on the producer/consumer example I posted a few days ago, but now it's 
generic.)
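
To make the pattern itself concrete, here is a minimal standalone sketch in 
plain Python.  It is not the recoco.consumer code and doesn't use POX at all: 
SketchConsumer, run_once, and handled are hypothetical names made up for 
illustration, and the batch_size handling is only my assumption of what a 
"maximum batch size" would roughly mean.  The point is just that the producer 
side does nothing but enqueue, and the consumer side drains a bounded number 
of items each time it gets to run.

```python
from collections import deque

class SketchConsumer(object):
    """Hypothetical stand-in for a work-queue consumer (not POX's BaseConsumer)."""

    def __init__(self, batch_size=0):
        self.queue = deque()
        # Assumption for this sketch: 0 means "no limit per run".
        self.batch_size = batch_size
        self.handled = []

    def add_work(self, work):
        # Producer side: cheap, just enqueue and return immediately.
        self.queue.append(work)

    def run_once(self):
        # Consumer side: handle at most batch_size items, then give up
        # control so that other tasks get a chance to run.
        limit = self.batch_size or len(self.queue)
        done = 0
        while self.queue and done < limit:
            self._do_work(self.queue.popleft())
            done += 1
        return done

    def _do_work(self, work):
        # The expensive processing would go here; this sketch just records it.
        self.handled.append(work)
```

In a real component, something like a recoco Task would call the equivalent of 
run_once() whenever it gets scheduled, while the event handlers only ever call 
add_work().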

-- Murphy
