Author: rhs
Date: Sat Nov 14 16:08:05 2009
New Revision: 836200

URL: http://svn.apache.org/viewvc?rev=836200&view=rev
Log:
added support for sender/receiver delete, made tests clean up after themselves, 
split logging of raw bytes and unencoded ops into distinct categories

Modified:
    qpid/trunk/qpid/python/qpid/driver.py
    qpid/trunk/qpid/python/qpid/messaging.py
    qpid/trunk/qpid/python/qpid/tests/messaging.py

Modified: qpid/trunk/qpid/python/qpid/driver.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/driver.py?rev=836200&r1=836199&r2=836200&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/driver.py (original)
+++ qpid/trunk/qpid/python/qpid/driver.py Sat Nov 14 16:08:05 2009
@@ -30,6 +30,8 @@
 from util import connect
 
 log = getLogger("qpid.messaging")
+rawlog = getLogger("qpid.messaging.io.raw")
+opslog = getLogger("qpid.messaging.io.ops")
 
 def addr2reply_to(addr):
   name, subject, options = address.parse(addr)
@@ -101,10 +103,11 @@
 
   def write_query(self, query, handler):
     id = self.sent
-    query.sync = True
     self.write_cmd(query, lambda: handler(self.results.pop(id)))
 
   def write_cmd(self, cmd, completion=noop):
+    if completion != noop:
+      cmd.sync = True
     if self.detached:
       raise Exception("detached")
     cmd.id = self.sent
@@ -195,9 +198,9 @@
     try:
       data = self._socket.recv(64*1024)
       if data:
-        log.debug("READ: %r", data)
+        rawlog.debug("READ: %r", data)
       else:
-        log.debug("ABORTED: %s", self._socket.getpeername())
+        rawlog.debug("ABORTED: %s", self._socket.getpeername())
         error = "connection aborted"
         recoverable = True
     except socket.error, e:
@@ -219,7 +222,7 @@
         self._op_dec.write(*self._seg_dec.read())
         for op in self._op_dec.read():
           self.assign_id(op)
-          log.debug("RCVD: %r", op)
+          opslog.debug("RCVD: %r", op)
           op.dispatch(self)
       except VersionError, e:
         error = e
@@ -244,7 +247,7 @@
   def writeable(self):
     try:
       n = self._socket.send(self._buf)
-      log.debug("SENT: %r", self._buf[:n])
+      rawlog.debug("SENT: %r", self._buf[:n])
       self._buf = self._buf[n:]
     except socket.error, e:
       self._error(e, True)
@@ -268,7 +271,7 @@
       self.connection.error = (err,)
 
   def write_op(self, op):
-    log.debug("SENT: %r", op)
+    opslog.debug("SENT: %r", op)
     self._op_enc.write(op)
     self._seg_enc.write(*self._op_enc.read())
     self._frame_enc.write(*self._seg_enc.read())
@@ -446,6 +449,7 @@
     _snd = self._attachments.get(snd)
     if _snd is None and not snd.closing and not snd.closed:
       _snd = Attachment(snd)
+      _snd.closing = False
 
       if snd.target is None:
         snd.error = ("target is None",)
@@ -488,7 +492,7 @@
           return
 
         if result.not_found:
-          if _snd.options.get("create") in ("always", "receiver"):
+          if _snd.options.get("create") in ("always", "sender"):
             sst.write_cmd(QueueDeclare(queue=_snd.name, durable=DURABLE_DEFAULT))
             _snd._exchange = ""
             _snd._routing_key = _snd.name
@@ -503,9 +507,15 @@
       sst.write_query(ExchangeQuery(name=_snd.name), do_exchange_q)
       self._attachments[snd] = _snd
 
-    if snd.closing and not snd.closed:
-      del self._attachments[snd]
-      snd.closed = True
+    if snd.closing and not (snd.closed or _snd.closing):
+      _snd.closing = True
+      def do_unlink():
+        del self._attachments[snd]
+        snd.closed = True
+      if _snd.options.get("delete") in ("always", "sender"):
+        self.delete(sst, _snd.name, do_unlink)
+      else:
+        do_unlink()
 
   def link_in(self, rcv):
     sst = self._attachments.get(rcv.session)
@@ -584,16 +594,37 @@
     if rcv.closing and not rcv.closed:
       if rcv.linked:
         if not _rcv.canceled:
-          def close_rcv():
+          def do_unlink():
             del self._attachments[rcv]
             rcv.closed = True
-          sst.write_cmd(MessageCancel(rcv.destination, sync=True), close_rcv)
+          if _rcv.options.get("delete") in ("always", "receiver"):
+            sst.write_cmd(MessageCancel(rcv.destination))
+            self.delete(sst, _rcv.name, do_unlink)
+          else:
+            sst.write_cmd(MessageCancel(rcv.destination), do_unlink)
           _rcv.canceled = True
       else:
         rcv.closed = True
 
+  def delete(self, sst, name, completion):
+    def do_queue_delq(result):
+      if sst.detached:
+        return
+      if result.queue:
+        sst.write_cmd(QueueDelete(name), completion)
+      else:
+        completion()
+    def do_exchange_delq(result):
+      if sst.detached:
+        return
+      if result.not_found:
+        sst.write_query(QueueQuery(name), do_queue_delq)
+      else:
+        sst.write_cmd(ExchangeDelete(name), completion)
+    sst.write_query(ExchangeQuery(name), do_exchange_delq)
+
   def process(self, ssn):
-    if ssn.closing: return
+    if ssn.closed or ssn.closing: return
 
     sst = self._attachments[ssn]
 
@@ -625,7 +656,7 @@
             ssn.acked.remove(m)
             if not ssn.transactional:
               sst.acked.remove(m)
-        sst.write_cmd(MessageAccept(ids, sync=True), ack_ack)
+        sst.write_cmd(MessageAccept(ids), ack_ack)
         sst.acked.extend(messages)
 
     if ssn.committing and not sst.committing:
@@ -635,7 +666,7 @@
         ssn.committed = True
         ssn.aborting = False
         ssn.aborted = False
-      sst.write_cmd(TxCommit(sync=True), commit_ok)
+      sst.write_cmd(TxCommit(), commit_ok)
       sst.committing = True
 
     if ssn.aborting and not sst.aborting:
@@ -647,7 +678,7 @@
           sst.executed.add_range(range)
         sst.write_op(SessionCompleted(sst.executed))
         sst.write_cmd(MessageRelease(ids))
-        sst.write_cmd(TxRollback(sync=True), do_rb_ok)
+        sst.write_cmd(TxRollback(), do_rb_ok)
 
       def do_rb_ok():
         del ssn.incoming[:]
@@ -670,7 +701,7 @@
 
       for rcv in ssn.receivers:
         sst.write_cmd(MessageStop(rcv.destination))
-      sst.write_cmd(ExecutionSync(sync=True), do_rb)
+      sst.write_cmd(ExecutionSync(), do_rb)
 
   def grant(self, rcv):
     sst = self._attachments[rcv.session]
@@ -702,7 +733,7 @@
         rcv.impending = rcv.received
         _rcv.draining = False
         self.grant(rcv)
-      sst.write_cmd(MessageStop(rcv.destination, sync=True), do_stop)
+      sst.write_cmd(MessageStop(rcv.destination), do_stop)
 
     if rcv.draining:
       _rcv.draining = True
@@ -711,7 +742,7 @@
         rcv.granted = rcv.impending
         _rcv.draining = False
         rcv.draining = False
-      sst.write_cmd(MessageFlush(rcv.destination, sync=True), do_flush)
+      sst.write_cmd(MessageFlush(rcv.destination), do_flush)
 
 
   def process_receiver(self, rcv):
@@ -758,7 +789,7 @@
       sst.outgoing_idx -= 1
       assert msg == m
     sst.write_cmd(MessageTransfer(destination=_snd._exchange, headers=(dp, mp),
-                                  payload=body, sync=True), msg_acked)
+                                  payload=body), msg_acked)
 
   def do_message_transfer(self, xfr):
     sst = self.get_sst(xfr)

Modified: qpid/trunk/qpid/python/qpid/messaging.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/messaging.py?rev=836200&r1=836199&r2=836200&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/messaging.py (original)
+++ qpid/trunk/qpid/python/qpid/messaging.py Sat Nov 14 16:08:05 2009
@@ -436,14 +436,15 @@
     """
     Close the session.
     """
+    # XXX: should be able to express this condition through API calls
+    self._ewait(lambda: not self.outgoing and not self.acked)
+
     for link in self.receivers + self.senders:
       link.close()
 
     self.closing = True
     self._wakeup()
     self._ewait(lambda: self.closed)
-    # XXX: should be able to express this condition through API calls
-    self._ewait(lambda: not self.outgoing and not self.acked)
     self.connection._remove_session(self)
 
 class SendError(SessionError):
@@ -557,10 +558,12 @@
     """
     Close the Sender.
     """
-    # XXX: should make driver do something here
-    if not self.closed:
+    self.closing = True
+    self._wakeup()
+    try:
+      self.session._ewait(lambda: self.closed)
+    finally:
       self.session.senders.remove(self)
-      self.closed = True
 
 class ReceiveError(SessionError):
   pass

Modified: qpid/trunk/qpid/python/qpid/tests/messaging.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/tests/messaging.py?rev=836200&r1=836199&r2=836200&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/tests/messaging.py (original)
+++ qpid/trunk/qpid/python/qpid/tests/messaging.py Sat Nov 14 16:08:05 2009
@@ -25,7 +25,8 @@
 from qpid.tests import Test
 from qpid.harness import Skipped
 from qpid.messaging import Connection, ConnectError, Disconnected, Empty, \
-    InsufficientCapacity, Message, ReceiveError, SendError, UNLIMITED, uuid4
+    InsufficientCapacity, Message, ReceiveError, SendError, SessionError, \
+    UNLIMITED, uuid4
 from Queue import Queue, Empty as QueueEmpty
 
 class Base(Test):
@@ -66,7 +67,7 @@
       return "%s[%s, %s]" % (base, count, self.test_id)
 
   def ping(self, ssn):
-    PING_Q = 'ping-queue; {create: always}'
+    PING_Q = 'ping-queue; {create: always, delete: always}'
     # send a message
     sender = ssn.sender(PING_Q, durable=self.durable())
     content = self.content("ping")
@@ -173,7 +174,8 @@
     self.conn.close()
     assert not self.conn.connected()
 
-ACK_Q = 'test-ack-queue; {create: always}'
+ACK_QC = 'test-ack-queue; {create: always}'
+ACK_QD = 'test-ack-queue; {delete: always}'
 
 class SessionTests(Base):
 
@@ -185,7 +187,7 @@
     return self.conn.session()
 
   def testSender(self):
-    snd = self.ssn.sender('test-snd-queue; {create: always}',
+    snd = self.ssn.sender('test-snd-queue; {create: sender, delete: receiver}',
                           durable=self.durable())
     snd2 = self.ssn.sender(snd.target, durable=self.durable())
     assert snd is not snd2
@@ -199,7 +201,7 @@
     self.ssn.acknowledge(msg)
 
   def testReceiver(self):
-    rcv = self.ssn.receiver('test-rcv-queue; {create: always}')
+    rcv = self.ssn.receiver('test-rcv-queue; {create: always, delete: always}')
     rcv2 = self.ssn.receiver(rcv.source)
     assert rcv is not rcv2
     rcv2.close()
@@ -212,7 +214,7 @@
     self.ssn.acknowledge(msg)
 
   def testNextReceiver(self):
-    ADDR = 'test-next-rcv-queue; {create: always}'
+    ADDR = 'test-next-rcv-queue; {create: always, delete: always}'
     rcv1 = self.ssn.receiver(ADDR, capacity=UNLIMITED)
     rcv2 = self.ssn.receiver(ADDR, capacity=UNLIMITED)
     rcv3 = self.ssn.receiver(ADDR, capacity=UNLIMITED)
@@ -241,14 +243,14 @@
   # empty on setup, and possibly also to drain queues on teardown
   def ackTest(self, acker, ack_capacity=None):
     # send a bunch of messages
-    snd = self.ssn.sender(ACK_Q, durable=self.durable())
+    snd = self.ssn.sender(ACK_QC, durable=self.durable())
     contents = [self.content("ackTest", i) for i in range(15)]
     for c in contents:
       snd.send(c)
 
     # drain the queue, verify the messages are there and then close
     # without acking
-    rcv = self.ssn.receiver(ACK_Q)
+    rcv = self.ssn.receiver(ACK_QC)
     self.drain(rcv, expected=contents)
     self.ssn.close()
 
@@ -257,7 +259,7 @@
     self.ssn = self.conn.session()
     if ack_capacity is not None:
       self.ssn.ack_capacity = ack_capacity
-    rcv = self.ssn.receiver(ACK_Q)
+    rcv = self.ssn.receiver(ACK_QC)
     self.drain(rcv, expected=contents)
     acker(self.ssn)
     self.ssn.close()
@@ -265,7 +267,7 @@
     # drain the queue a final time and verify that the messages were
     # dequeued
     self.ssn = self.conn.session()
-    rcv = self.ssn.receiver(ACK_Q)
+    rcv = self.ssn.receiver(ACK_QD)
     self.assertEmpty(rcv)
 
   def testAcknowledge(self):
@@ -283,7 +285,7 @@
         pass
     finally:
       self.ssn.ack_capacity = UNLIMITED
-      self.drain(self.ssn.receiver(ACK_Q))
+      self.drain(self.ssn.receiver(ACK_QD))
       self.ssn.acknowledge()
 
   def testAcknowledgeAsyncAckCap1(self):
@@ -306,8 +308,8 @@
     return contents
 
   def txTest(self, commit):
-    TX_Q = 'test-tx-queue; {create: always}'
-    TX_Q_COPY = 'test-tx-queue-copy; {create: always}'
+    TX_Q = 'test-tx-queue; {create: sender, delete: receiver}'
+    TX_Q_COPY = 'test-tx-queue-copy; {create: always, delete: always}'
     txssn = self.conn.session(transactional=True)
     contents = self.send(self.ssn, TX_Q, "txTest", 3)
     txrcv = txssn.receiver(TX_Q)
@@ -337,7 +339,7 @@
     self.txTest(False)
 
   def txTestSend(self, commit):
-    TX_SEND_Q = 'test-tx-send-queue; {create: always}'
+    TX_SEND_Q = 'test-tx-send-queue; {create: sender, delete: receiver}'
     txssn = self.conn.session(transactional=True)
     contents = self.send(txssn, TX_SEND_Q, "txTestSend", 3)
     rcv = self.ssn.receiver(TX_SEND_Q)
@@ -360,11 +362,12 @@
     self.txTestSend(False)
 
   def txTestAck(self, commit):
-    TX_ACK_Q = 'test-tx-ack-queue; {create: always}'
+    TX_ACK_QC = 'test-tx-ack-queue; {create: always}'
+    TX_ACK_QD = 'test-tx-ack-queue; {delete: always}'
     txssn = self.conn.session(transactional=True)
-    txrcv = txssn.receiver(TX_ACK_Q)
+    txrcv = txssn.receiver(TX_ACK_QC)
     self.assertEmpty(txrcv)
-    contents = self.send(self.ssn, TX_ACK_Q, "txTestAck", 3)
+    contents = self.send(self.ssn, TX_ACK_QC, "txTestAck", 3)
     assert contents == self.drain(txrcv)
 
     if commit:
@@ -382,11 +385,11 @@
     txssn.close()
 
     txssn = self.conn.session(transactional=True)
-    txrcv = txssn.receiver(TX_ACK_Q)
+    txrcv = txssn.receiver(TX_ACK_QC)
     assert contents == self.drain(txrcv)
     txssn.acknowledge()
     txssn.commit()
-    rcv = self.ssn.receiver(TX_ACK_Q)
+    rcv = self.ssn.receiver(TX_ACK_QD)
     self.assertEmpty(rcv)
     txssn.close()
     self.assertEmpty(rcv)
@@ -405,7 +408,7 @@
     except Disconnected:
       pass
 
-RECEIVER_Q = 'test-receiver-queue; {create: always}'
+RECEIVER_Q = 'test-receiver-queue; {create: always, delete: always}'
 
 class ReceiverTests(Base):
 
@@ -482,28 +485,6 @@
 
     self.ssn.acknowledge()
 
-  def testPending(self):
-    self.rcv.capacity = UNLIMITED
-    assert self.rcv.pending() == 0
-
-    for i in range(3):
-      self.send("testPending", i)
-    self.sleep()
-    assert self.rcv.pending() == 3
-
-    for i in range(3, 10):
-      self.send("testPending", i)
-    self.sleep()
-    assert self.rcv.pending() == 10
-
-    self.drain(self.rcv, limit=3)
-    assert self.rcv.pending() == 7
-
-    self.drain(self.rcv)
-    assert self.rcv.pending() == 0
-
-    self.ssn.acknowledge()
-
   def testCapacity(self):
     self.rcv.capacity = 5
     self.assertPending(self.rcv, 0)
@@ -537,8 +518,75 @@
 
     self.ssn.acknowledge()
 
+  def testPending(self):
+    self.rcv.capacity = UNLIMITED
+    assert self.rcv.pending() == 0
+
+    for i in range(3):
+      self.send("testPending", i)
+    self.sleep()
+    assert self.rcv.pending() == 3
+
+    for i in range(3, 10):
+      self.send("testPending", i)
+    self.sleep()
+    assert self.rcv.pending() == 10
+
+    self.drain(self.rcv, limit=3)
+    assert self.rcv.pending() == 7
+
+    self.drain(self.rcv)
+    assert self.rcv.pending() == 0
+
+    self.ssn.acknowledge()
+
   # XXX: need testClose
 
+class AddressTests(Base):
+
+  def setup_connection(self):
+    return Connection.open(self.broker.host, self.broker.port,
+                           reconnect=self.reconnect())
+
+  def setup_session(self):
+    return self.conn.session()
+
+  def testDeleteBySender(self):
+    snd = self.ssn.sender("test-delete; {create: always}")
+    snd.send("ping")
+    snd.close()
+    snd = self.ssn.sender("test-delete; {delete: always}")
+    snd.send("ping")
+    snd.close()
+    snd = self.ssn.sender("test-delete")
+    try:
+      snd.send("ping")
+    except SendError, e:
+      assert "no such queue" in str(e)
+
+  def testDeleteByReceiver(self):
+    rcv = self.ssn.receiver("test-delete; {create: always, delete: always}")
+    try:
+      rcv.fetch(0)
+    except Empty:
+      pass
+    rcv.close()
+
+    try:
+      self.ssn.receiver("test-delete")
+    except SendError, e:
+      assert "no such queue" in str(e)
+
+  def testDeleteSpecial(self):
+    snd = self.ssn.sender("amq.topic; {delete: always}")
+    snd.send("asdf")
+    try:
+      snd.close()
+    except SessionError, e:
+      assert "Cannot delete default exchange" in str(e)
+    # XXX: need to figure out close after error
+    self.conn._remove_session(self.ssn)
+
 NOSUCH_Q = "this-queue-should-not-exist"
 UNPARSEABLE_ADDR = "name/subject; {bad options"
 UNLEXABLE_ADDR = "\0x0\0x1\0x2\0x3"
@@ -606,7 +654,7 @@
     self.fetchErrorTest(UNLEXABLE_ADDR, ReceiveError,
                         lambda e: "unrecognized characters" in str(e))
 
-SENDER_Q = 'test-sender-q; {create: always}'
+SENDER_Q = 'test-sender-q; {create: always, delete: always}'
 
 class SenderTests(Base):
 
@@ -715,7 +763,7 @@
     m.content = u"<html/>"
     assert m.content_type == "text/html; charset=utf8"
 
-ECHO_Q = 'test-message-echo-queue; {create: always}'
+ECHO_Q = 'test-message-echo-queue; {create: always, delete: always}'
 
 class MessageEchoTests(Base):
 



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:[email protected]

Reply via email to