Author: rhs
Date: Tue Jan 19 21:29:33 2010
New Revision: 900967

URL: http://svn.apache.org/viewvc?rev=900967&view=rev
Log:
fixed bug in destination/receiver correlation

Modified:
    qpid/trunk/qpid/python/qpid/driver.py
    qpid/trunk/qpid/python/qpid/messaging.py
    qpid/trunk/qpid/python/qpid/tests/messaging.py

Modified: qpid/trunk/qpid/python/qpid/driver.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/driver.py?rev=900967&r1=900966&r2=900967&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/driver.py (original)
+++ qpid/trunk/qpid/python/qpid/driver.py Tue Jan 19 21:29:33 2010
@@ -101,6 +101,8 @@
 
     # XXX: need to periodically exchange completion/known_completion
 
+    self.destinations = {}
+
   def write_query(self, query, handler):
     id = self.sent
     self.write_cmd(query, lambda: handler(self.results.pop(id)))
@@ -500,6 +502,8 @@
     _rcv = self._attachments.get(rcv)
     if _rcv is None and not rcv.closing and not rcv.closed:
       _rcv = Attachment(rcv)
+      _rcv.destination = str(rcv.id)
+      sst.destinations[_rcv.destination] = _rcv
       _rcv.canceled = False
       _rcv.draining = False
 
@@ -525,7 +529,7 @@
 
       def do_link(type, subtype):
         if type == "topic":
-          _rcv._queue = "%s.%s" % (rcv.session.name, rcv.destination)
+          _rcv._queue = "%s.%s" % (rcv.session.name, _rcv.destination)
           sst.write_cmd(QueueDeclare(queue=_rcv._queue, durable=DURABLE_DEFAULT, exclusive=True, auto_delete=True))
           filter = _rcv.options.get("filter")
           if _rcv.subject is None and filter is None:
@@ -543,8 +547,8 @@
         elif type == "queue":
           _rcv._queue = _rcv.name
 
-        sst.write_cmd(MessageSubscribe(queue=_rcv._queue, destination=rcv.destination))
-        sst.write_cmd(MessageSetFlowMode(rcv.destination, flow_mode.credit))
+        sst.write_cmd(MessageSubscribe(queue=_rcv._queue, destination=_rcv.destination))
+        sst.write_cmd(MessageSetFlowMode(_rcv.destination, flow_mode.credit))
         rcv.linked = True
 
       self.resolve_declare(sst, _rcv, "receiver", do_link)
@@ -554,12 +558,13 @@
       if not _rcv.canceled:
         def do_unlink():
           del self._attachments[rcv]
+          del sst.destinations[_rcv.destination]
           rcv.closed = True
         if _rcv.options.get("delete") in ("always", "receiver"):
-          sst.write_cmd(MessageCancel(rcv.destination))
+          sst.write_cmd(MessageCancel(_rcv.destination))
           self.delete(sst, _rcv.name, do_unlink)
         else:
-          sst.write_cmd(MessageCancel(rcv.destination), do_unlink)
+          sst.write_cmd(MessageCancel(_rcv.destination), do_unlink)
         _rcv.canceled = True
 
   def resolve_declare(self, sst, lnk, dir, action):
@@ -725,7 +730,8 @@
         sst.aborting = False
 
       for rcv in ssn.receivers:
-        sst.write_cmd(MessageStop(rcv.destination))
+        _rcv = self._attachments[rcv]
+        sst.write_cmd(MessageStop(_rcv.destination))
       sst.write_cmd(ExecutionSync(), do_rb)
 
   def grant(self, rcv):
@@ -745,12 +751,12 @@
       delta = max(rcv.granted, rcv.received) - rcv.impending
 
     if delta is UNLIMITED:
-      sst.write_cmd(MessageFlow(rcv.destination, credit_unit.byte, UNLIMITED.value))
-      sst.write_cmd(MessageFlow(rcv.destination, credit_unit.message, UNLIMITED.value))
+      sst.write_cmd(MessageFlow(_rcv.destination, credit_unit.byte, UNLIMITED.value))
+      sst.write_cmd(MessageFlow(_rcv.destination, credit_unit.message, UNLIMITED.value))
       rcv.impending = UNLIMITED
     elif delta > 0:
-      sst.write_cmd(MessageFlow(rcv.destination, credit_unit.byte, UNLIMITED.value))
-      sst.write_cmd(MessageFlow(rcv.destination, credit_unit.message, delta))
+      sst.write_cmd(MessageFlow(_rcv.destination, credit_unit.byte, UNLIMITED.value))
+      sst.write_cmd(MessageFlow(_rcv.destination, credit_unit.message, delta))
       rcv.impending += delta
     elif delta < 0 and not rcv.draining:
       _rcv.draining = True
@@ -758,7 +764,7 @@
         rcv.impending = rcv.received
         _rcv.draining = False
         self.grant(rcv)
-      sst.write_cmd(MessageStop(rcv.destination), do_stop)
+      sst.write_cmd(MessageStop(_rcv.destination), do_stop)
 
     if rcv.draining:
       _rcv.draining = True
@@ -767,7 +773,7 @@
         rcv.granted = rcv.impending
         _rcv.draining = False
         rcv.draining = False
-      sst.write_cmd(MessageFlush(rcv.destination), do_flush)
+      sst.write_cmd(MessageFlush(_rcv.destination), do_flush)
 
 
   def process_receiver(self, rcv):
@@ -821,7 +827,7 @@
     ssn = sst.session
 
     msg = self._decode(xfr)
-    rcv = ssn.receivers[int(xfr.destination)]
+    rcv = sst.destinations[xfr.destination].target
     msg._receiver = rcv
     if rcv.impending is not UNLIMITED:
       assert rcv.received < rcv.impending, "%s, %s" % (rcv.received, rcv.impending)

Modified: qpid/trunk/qpid/python/qpid/messaging.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/messaging.py?rev=900967&r1=900966&r2=900967&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/messaging.py (original)
+++ qpid/trunk/qpid/python/qpid/messaging.py Tue Jan 19 21:29:33 2010
@@ -432,7 +432,9 @@
     self.aborting = False
     self.aborted = False
 
+    self.next_sender_id = 0
     self.senders = []
+    self.next_receiver_id = 0
     self.receivers = []
     self.outgoing = []
     self.incoming = []
@@ -477,7 +479,8 @@
     @rtype: Sender
     @return: a new Sender for the specified target
     """
-    sender = Sender(self, len(self.senders), target, options)
+    sender = Sender(self, self.next_sender_id, target, options)
+    self.next_sender_id += 1
     self.senders.append(sender)
     self._wakeup()
     # XXX: because of the lack of waiting here we can end up getting
@@ -497,7 +500,8 @@
     @rtype: Receiver
     @return: a new Receiver for the specified source
     """
-    receiver = Receiver(self, len(self.receivers), source, options)
+    receiver = Receiver(self, self.next_receiver_id, source, options)
+    self.next_receiver_id += 1
     self.receivers.append(receiver)
     self._wakeup()
     return receiver
@@ -630,9 +634,9 @@
   Sends outgoing messages.
   """
 
-  def __init__(self, session, index, target, options):
+  def __init__(self, session, id, target, options):
     self.session = session
-    self.index = index
+    self.id = id
     self.target = target
     self.options = options
     self.capacity = options.get("capacity", UNLIMITED)
@@ -753,10 +757,9 @@
   fetched with L{fetch}.
   """
 
-  def __init__(self, session, index, source, options):
+  def __init__(self, session, id, source, options):
     self.session = session
-    self.index = index
-    self.destination = str(self.index)
+    self.id = id
     self.source = source
     self.options = options
 

Modified: qpid/trunk/qpid/python/qpid/tests/messaging.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/tests/messaging.py?rev=900967&r1=900966&r2=900967&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/tests/messaging.py (original)
+++ qpid/trunk/qpid/python/qpid/tests/messaging.py Tue Jan 19 21:29:33 2010
@@ -541,6 +541,28 @@
 
     self.ssn.acknowledge()
 
+  def testDoubleClose(self):
+    m1 = self.content("testDoubleClose", 1)
+    m2 = self.content("testDoubleClose", 2)
+
+    snd = self.ssn.sender("""test-double-close; {
+  create: always,
+  delete: sender,
+  node-properties: {
+    type: topic
+  }
+}
+""")
+    r1 = self.ssn.receiver(snd.target)
+    r2 = self.ssn.receiver(snd.target)
+    snd.send(m1)
+    self.drain(r1, expected=[m1])
+    self.drain(r2, expected=[m1])
+    r1.close()
+    snd.send(m2)
+    self.drain(r2, expected=[m2])
+    r2.close()
+
   # XXX: need testClose
 
 class AddressTests(Base):



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:[email protected]

Reply via email to