Author: rhs
Date: Mon Nov 16 12:05:50 2009
New Revision: 880720

URL: http://svn.apache.org/viewvc?rev=880720&view=rev
Log:
added address support for specifying node type and properties

Modified:
    qpid/trunk/qpid/python/qpid/address.py
    qpid/trunk/qpid/python/qpid/driver.py
    qpid/trunk/qpid/python/qpid/ops.py
    qpid/trunk/qpid/python/qpid/tests/address.py
    qpid/trunk/qpid/python/qpid/tests/messaging.py

Modified: qpid/trunk/qpid/python/qpid/address.py
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/address.py?rev=880720&r1=880719&r2=880720&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/address.py (original)
+++ qpid/trunk/qpid/python/qpid/address.py Mon Nov 16 12:05:50 2009
@@ -38,7 +38,7 @@
 SLASH = Type("SLASH", r"/")
 COMMA = Type("COMMA", r",")
 NUMBER = Type("NUMBER", r'[+-]?[0-9]*\.?[0-9]+')
-ID = Type("ID", r'[a-zA-Z_][a-zA-Z0-9_]*')
+ID = Type("ID", r'[a-zA-Z_](?:[a-zA-Z0-9_-]*[a-zA-Z0-9_])?')
 STRING = Type("STRING", r""""(?:[^\\"]|\\.)*"|'(?:[^\\']|\\.)*'""")
 ESC = Type("ESC", 
r"\\[^ux]|\\x[0-9a-fA-F][0-9a-fA-F]|\\u[0-9a-fA-F][0-9a-fA-F][0-9a-fA-F][0-9a-fA-F]")
 SYM = Type("SYM", r"[....@$^!+-]")

Modified: qpid/trunk/qpid/python/qpid/driver.py
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/driver.py?rev=880720&r1=880719&r2=880720&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/driver.py (original)
+++ qpid/trunk/qpid/python/qpid/driver.py Mon Nov 16 12:05:50 2009
@@ -471,43 +471,20 @@
       if _snd.options is None:
         _snd.options = {}
 
-      def do_link():
-        snd.linked = True
-
-      def do_queue_q(result):
-        if sst.detached:
-          return
-
-        if result.queue:
+      def do_link(type, subtype):
+        if type == "topic":
+          _snd._exchange = _snd.name
+          _snd._routing_key = _snd.subject
+        elif type == "queue":
           _snd._exchange = ""
           _snd._routing_key = _snd.name
-          do_link()
-        else:
-          snd.error = ("no such queue: %s" % _snd.name,)
-          del self._attachments[snd]
-          snd.closed = True
 
-      def do_exchange_q(result):
-        if sst.detached:
-          return
-
-        if result.not_found:
-          if _snd.options.get("create") in ("always", "sender"):
-            sst.write_cmd(QueueDeclare(queue=_snd.name, 
durable=DURABLE_DEFAULT))
-            _snd._exchange = ""
-            _snd._routing_key = _snd.name
-          else:
-            sst.write_query(QueueQuery(queue=_snd.name), do_queue_q)
-            return
-        else:
-          _snd._exchange = _snd.name
-          _snd._routing_key = _snd.subject
-        do_link()
+        snd.linked = True
 
-      sst.write_query(ExchangeQuery(name=_snd.name), do_exchange_q)
+      self.resolve_declare(sst, _snd, "sender", do_link)
       self._attachments[snd] = _snd
 
-    if snd.closing and not (snd.closed or _snd.closing):
+    if snd.linked and snd.closing and not (snd.closed or _snd.closing):
       _snd.closing = True
       def do_unlink():
         del self._attachments[snd]
@@ -545,38 +522,13 @@
       if _rcv.options is None:
         _rcv.options = {}
 
-      def do_link():
-        sst.write_cmd(MessageSubscribe(queue=_rcv._queue, 
destination=rcv.destination))
-        sst.write_cmd(MessageSetFlowMode(rcv.destination, flow_mode.credit))
-        rcv.linked = True
-
-      def do_queue_q(result):
-        if sst.detached:
-          return
-        if result.queue:
-          _rcv._queue = _rcv.name
-          do_link()
-        else:
-          rcv.error = ("no such queue: %s" % _rcv.name,)
-          del self._attachments[rcv]
-          rcv.closed = True
-
-      def do_exchange_q(result):
-        if sst.detached:
-          return
-        if result.not_found:
-          if _rcv.options.get("create") in ("always", "receiver"):
-            _rcv._queue = _rcv.name
-            sst.write_cmd(QueueDeclare(queue=_rcv._queue, 
durable=DURABLE_DEFAULT))
-          else:
-            sst.write_query(QueueQuery(queue=_rcv.name), do_queue_q)
-            return
-        else:
+      def do_link(type, subtype):
+        if type == "topic":
           _rcv._queue = "%s.%s" % (rcv.session.name, rcv.destination)
           sst.write_cmd(QueueDeclare(queue=_rcv._queue, 
durable=DURABLE_DEFAULT, exclusive=True, auto_delete=True))
           filter = _rcv.options.get("filter")
           if _rcv.subject is None and filter is None:
-            f = FILTER_DEFAULTS[result.type]
+            f = FILTER_DEFAULTS[subtype]
           elif _rcv.subject and filter:
             # XXX
             raise Exception("can't supply both subject and filter")
@@ -587,41 +539,101 @@
           else:
             f = filter
           f._bind(sst, _rcv.name, _rcv._queue)
-        do_link()
-      sst.write_query(ExchangeQuery(name=_rcv.name), do_exchange_q)
+        elif type == "queue":
+          _rcv._queue = _rcv.name
+
+        sst.write_cmd(MessageSubscribe(queue=_rcv._queue, 
destination=rcv.destination))
+        sst.write_cmd(MessageSetFlowMode(rcv.destination, flow_mode.credit))
+        rcv.linked = True
+
+      self.resolve_declare(sst, _rcv, "receiver", do_link)
       self._attachments[rcv] = _rcv
 
-    if rcv.closing and not rcv.closed:
-      if rcv.linked:
-        if not _rcv.canceled:
-          def do_unlink():
-            del self._attachments[rcv]
-            rcv.closed = True
-          if _rcv.options.get("delete") in ("always", "receiver"):
-            sst.write_cmd(MessageCancel(rcv.destination))
-            self.delete(sst, _rcv.name, do_unlink)
-          else:
-            sst.write_cmd(MessageCancel(rcv.destination), do_unlink)
-          _rcv.canceled = True
-      else:
-        rcv.closed = True
+    if rcv.linked and rcv.closing and not rcv.closed:
+      if not _rcv.canceled:
+        def do_unlink():
+          del self._attachments[rcv]
+          rcv.closed = True
+        if _rcv.options.get("delete") in ("always", "receiver"):
+          sst.write_cmd(MessageCancel(rcv.destination))
+          self.delete(sst, _rcv.name, do_unlink)
+        else:
+          sst.write_cmd(MessageCancel(rcv.destination), do_unlink)
+        _rcv.canceled = True
 
-  def delete(self, sst, name, completion):
-    def do_queue_delq(result):
-      if sst.detached:
-        return
-      if result.queue:
-        sst.write_cmd(QueueDelete(name), completion)
+  def resolve_declare(self, sst, lnk, dir, action):
+    def do_resolved(er, qr):
+      if er.not_found and not qr.queue:
+        if lnk.options.get("create") in ("always", dir):
+          err = self.declare(sst, lnk.name, lnk.options, action)
+        else:
+          err = ("no such queue: %s" % lnk.name,)
+
+        if err:
+          tgt = lnk.target
+          tgt.error = err
+          del self._attachments[tgt]
+          tgt.closed = True
+          return
+      elif qr.queue:
+        action("queue", None)
       else:
-        completion()
-    def do_exchange_delq(result):
-      if sst.detached:
-        return
-      if result.not_found:
-        sst.write_query(QueueQuery(name), do_queue_delq)
+        action("topic", er.type)
+    self.resolve(sst, lnk.name, do_resolved)
+
+  def resolve(self, sst, name, action):
+    args = []
+    def do_result(r):
+      args.append(r)
+    def do_action(r):
+      do_result(r)
+      action(*args)
+    sst.write_query(ExchangeQuery(name), do_result)
+    sst.write_query(QueueQuery(name), do_action)
+
+  def declare(self, sst, name, options, action):
+    opts = dict(options)
+    props = dict(opts.pop("node-properties", {}))
+    durable = props.pop("durable", DURABLE_DEFAULT)
+    type = props.pop("type", "queue")
+    xprops = dict(props.pop("x-properties", {}))
+
+    if props:
+      return ("unrecognized option(s): %s" % "".join(props.keys()),)
+
+    if type == "topic":
+      cmd = ExchangeDeclare(exchange=name, durable=durable)
+    elif type == "queue":
+      cmd = QueueDeclare(queue=name, durable=durable)
+    else:
+      return ("unrecognized type, must be topic or queue: %s" % type,)
+
+    for f in cmd.FIELDS:
+      if f.name != "arguments" and xprops.has_key(f.name):
+        cmd[f.name] = xprops.pop(f.name)
+    if xprops:
+      cmd.arguments = xprops
+
+    if type == "topic":
+      if cmd.type is None:
+        cmd.type = "topic"
+      subtype = cmd.type
+    else:
+      subtype = None
+
+    def do_action():
+      action(type, subtype)
+    sst.write_cmd(cmd, do_action)
+
+  def delete(self, sst, name, action):
+    def do_delete(er, qr):
+      if not er.not_found:
+        sst.write_cmd(ExchangeDelete(name), action)
+      elif qr.queue:
+        sst.write_cmd(QueueDelete(name), action)
       else:
-        sst.write_cmd(ExchangeDelete(name), completion)
-    sst.write_query(ExchangeQuery(name), do_exchange_delq)
+        action()
+    self.resolve(sst, name, do_delete)
 
   def process(self, ssn):
     if ssn.closed or ssn.closing: return

Modified: qpid/trunk/qpid/python/qpid/ops.py
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/ops.py?rev=880720&r1=880719&r2=880720&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/ops.py (original)
+++ qpid/trunk/qpid/python/qpid/ops.py Mon Nov 16 12:05:50 2009
@@ -259,8 +259,8 @@
 COMMANDS = {}
 CONTROLS = {}
 
-for name, bases, dict in types:
-  t = type(name, bases, dict)
+for name, bases, _dict in types:
+  t = type(name, bases, _dict)
   vars[name] = t
 
   if issubclass(t, Command):

Modified: qpid/trunk/qpid/python/qpid/tests/address.py
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/tests/address.py?rev=880720&r1=880719&r2=880720&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/tests/address.py (original)
+++ qpid/trunk/qpid/python/qpid/tests/address.py Mon Nov 16 12:05:50 2009
@@ -18,10 +18,35 @@
 #
 
 from qpid.tests import Test
-from qpid.address import parse, ParseError
+from qpid.address import lex, parse, ParseError, EOF, ID, NUMBER, SYM, WSPACE
 
 class AddressTests(Test):
 
+  def lex(self, addr, *types):
+    toks = [t.type for t in lex(addr) if t.type not in (WSPACE, EOF)]
+    assert list(types) == toks, "expected %s, got %s" % (types, toks)
+
+  def testDashInId1(self):
+    self.lex("foo-bar", ID)
+
+  def testDashInId2(self):
+    self.lex("foo-3", ID)
+
+  def testDashAlone1(self):
+    self.lex("foo - bar", ID, SYM, ID)
+
+  def testDashAlone2(self):
+    self.lex("foo - 3", ID, SYM, NUMBER)
+
+  def testLeadingDash(self):
+    self.lex("-foo", SYM, ID)
+
+  def testTrailingDash(self):
+    self.lex("foo-", ID, SYM)
+
+  def testNegativeNum(self):
+    self.lex("-3", NUMBER)
+
   def valid(self, addr, name=None, subject=None, options=None):
     expected = (name, subject, options)
     got = parse(addr)

Modified: qpid/trunk/qpid/python/qpid/tests/messaging.py
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/tests/messaging.py?rev=880720&r1=880719&r2=880720&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/tests/messaging.py (original)
+++ qpid/trunk/qpid/python/qpid/tests/messaging.py Mon Nov 16 12:05:50 2009
@@ -201,7 +201,7 @@
     self.ssn.acknowledge(msg)
 
   def testReceiver(self):
-    rcv = self.ssn.receiver('test-rcv-queue; {create: always, delete: always}')
+    rcv = self.ssn.receiver('test-rcv-queue; {create: always}')
     rcv2 = self.ssn.receiver(rcv.source)
     assert rcv is not rcv2
     rcv2.close()
@@ -212,6 +212,7 @@
     msg = rcv.fetch(0)
     assert msg.content == content
     self.ssn.acknowledge(msg)
+    snd2 = self.ssn.receiver('test-rcv-queue; {delete: always}')
 
   def testNextReceiver(self):
     ADDR = 'test-next-rcv-queue; {create: always, delete: always}'
@@ -551,6 +552,44 @@
   def setup_session(self):
     return self.conn.session()
 
+  def testBadOption(self):
+    snd = self.ssn.sender("test-bad-option; {create: always, node-properties: 
{this-property-does-not-exist: 3}}")
+    try:
+      snd.send("ping")
+    except SendError, e:
+      assert "unrecognized option" in str(e)
+
+  def testCreateQueue(self):
+    snd = self.ssn.sender("test-create-queue; {create: always, delete: always, 
"
+                          "node-properties: {type: queue, durable: False, "
+                          "x-properties: {auto_delete: true}}}")
+    content = self.content("testCreateQueue")
+    snd.send(content)
+    rcv = self.ssn.receiver("test-create-queue")
+    self.drain(rcv, expected=[content])
+
+  def testCreateExchange(self):
+    snd = self.ssn.sender("test-create-exchange; {create: always, "
+                          "delete: always, node-properties: {type: topic, "
+                          "durable: False, x-properties: {auto_delete: 
true}}}")
+    snd.send("ping")
+    rcv1 = self.ssn.receiver("test-create-exchange/first")
+    rcv2 = self.ssn.receiver("test-create-exchange/second")
+    rcv3 = self.ssn.receiver("test-create-exchange")
+    for r in (rcv1, rcv2, rcv3):
+      try:
+        r.fetch(0)
+        assert False
+      except Empty:
+        pass
+    msg1 = Message(self.content("testCreateExchange", 1), subject="first")
+    msg2 = Message(self.content("testCreateExchange", 1), subject="second")
+    snd.send(msg1)
+    snd.send(msg2)
+    self.drain(rcv1, expected=[msg1.content])
+    self.drain(rcv2, expected=[msg2.content])
+    self.drain(rcv3, expected=[msg1.content, msg2.content])
+
   def testDeleteBySender(self):
     snd = self.ssn.sender("test-delete; {create: always}")
     snd.send("ping")



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:[email protected]

Reply via email to