Author: rhs
Date: Thu Dec 20 16:17:45 2012
New Revision: 1424564

URL: http://svn.apache.org/viewvc?rev=1424564&view=rev
Log:
Fixed messenger's windowing behavior. Removed accept mode as a configuration
option: with proper windowing behavior, setting the accept mode to manual
seems mostly equivalent to setting a non-zero incoming window.

Removed:
    
qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/messenger/AcceptMode.java
Modified:
    qpid/proton/trunk/proton-c/bindings/python/proton.py
    qpid/proton/trunk/proton-c/src/messenger.c
    
qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/messenger/Messenger.java
    
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java
    
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/TrackerQueue.java
    qpid/proton/trunk/proton-j/proton/src/main/scripts/proton.py
    qpid/proton/trunk/tests/proton_tests/messenger.py

Modified: qpid/proton/trunk/proton-c/bindings/python/proton.py
URL: 
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/bindings/python/proton.py?rev=1424564&r1=1424563&r2=1424564&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/bindings/python/proton.py (original)
+++ qpid/proton/trunk/proton-c/bindings/python/proton.py Thu Dec 20 16:17:45 
2012
@@ -90,15 +90,6 @@ STATUSES = {
 AUTOMATIC = Constant("AUTOMATIC")
 MANUAL = Constant("MANUAL")
 
-_ACCEPT_MODE2CONST = {
-  PN_ACCEPT_MODE_AUTO: AUTOMATIC,
-  PN_ACCEPT_MODE_MANUAL: MANUAL
-  }
-_CONST2ACCEPT_MODE = {
-  AUTOMATIC: PN_ACCEPT_MODE_AUTO,
-  MANUAL: PN_ACCEPT_MODE_MANUAL
-  }
-
 class Messenger(object):
   """
   The L{Messenger} class defines a high level interface for sending
@@ -259,21 +250,6 @@ The timeout property contains the defaul
 operations performed by the L{Messenger}.
 """)
 
-  def _get_accept_mode(self):
-    return _ACCEPT_MODE2CONST(pn_messenger_get_accept_mode(self._mng))
-
-  def _set_accept_mode(self, mode):
-    mode = _CONST2ACCEPT_MODE[mode]
-    self._check(pn_messenger_set_accept_mode(self._mng, mode))
-
-  accept_mode = property(_get_accept_mode, _set_accept_mode,
-                         doc="""
-The accept mode for the messenger. Can be set to AUTOMATIC or MANUAL.
-The default is AUTOMATIC. If set to MANUAL, then every incoming
-message must be accepted or rejected (either individually or
-cummulatively) via the accept and reject methods.
-""")
-
   def _get_incoming_window(self):
     return pn_messenger_get_incoming_window(self._mng)
 

Modified: qpid/proton/trunk/proton-c/src/messenger.c
URL: 
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/messenger.c?rev=1424564&r1=1424563&r2=1424564&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/messenger.c (original)
+++ qpid/proton/trunk/proton-c/src/messenger.c Thu Dec 20 16:17:45 2012
@@ -155,32 +155,17 @@ void pn_queue_del(pn_queue_t *queue, pn_
   }
 }
 
-pn_sequence_t pn_queue_add(pn_queue_t *queue, pn_delivery_t *delivery)
-{
-  pn_sequence_t id = queue->hwm++;
-  size_t offset = id - queue->lwm;
-  PN_ENSUREZ(queue->deliveries, queue->capacity, offset + 1);
-  assert(offset >= 0 && offset < queue->capacity);
-  queue->deliveries[offset] = delivery;
-  pn_delivery_set_context(delivery, (void *) (intptr_t) id);
-  pn_connection_t *conn =
-    pn_session_connection(pn_link_session(pn_delivery_link(delivery)));
-  pn_incref(conn);
-  return id;
-}
-
 void pn_queue_slide(pn_queue_t *queue)
 {
   if (queue->window >= 0) {
     while (queue->hwm - queue->lwm > queue->window) {
       pn_delivery_t *d = pn_queue_get(queue, queue->lwm);
       if (d) {
-        if (pn_delivery_local_state(d)) {
-          pn_delivery_settle(d);
-          pn_queue_del(queue, d);
-        } else {
-          break;
+        if (!pn_delivery_local_state(d)) {
+          pn_delivery_update(d, PN_ACCEPTED);
         }
+        pn_delivery_settle(d);
+        pn_queue_del(queue, d);
       } else {
         pn_queue_gc(queue);
       }
@@ -189,6 +174,21 @@ void pn_queue_slide(pn_queue_t *queue)
   pn_queue_gc(queue);
 }
 
+pn_sequence_t pn_queue_add(pn_queue_t *queue, pn_delivery_t *delivery)
+{
+  pn_sequence_t id = queue->hwm++;
+  size_t offset = id - queue->lwm;
+  PN_ENSUREZ(queue->deliveries, queue->capacity, offset + 1);
+  assert(offset >= 0 && offset < queue->capacity);
+  queue->deliveries[offset] = delivery;
+  pn_delivery_set_context(delivery, (void *) (intptr_t) id);
+  pn_connection_t *conn =
+    pn_session_connection(pn_link_session(pn_delivery_link(delivery)));
+  pn_incref(conn);
+  pn_queue_slide(queue);
+  return id;
+}
+
 int pn_queue_update(pn_queue_t *queue, pn_sequence_t id, pn_status_t status,
                     int flags, bool settle, bool match)
 {
@@ -229,7 +229,7 @@ int pn_queue_update(pn_queue_t *queue, p
     }
   }
 
-  pn_queue_slide(queue);
+  pn_queue_gc(queue);
 
   return 0;
 }
@@ -472,8 +472,6 @@ void pn_messenger_endpoints(pn_messenger
     d = pn_work_next(d);
   }
 
-  pn_queue_slide(&messenger->outgoing);
-
   if (pn_work_head(conn)) {
     return;
   }
@@ -989,7 +987,7 @@ pn_queue_t *pn_tracker_queue(pn_messenge
 
 static pn_status_t disp2status(pn_disposition_t disp)
 {
-  if (!disp) return PN_STATUS_PENDING;
+  if (!disp) return PN_STATUS_UNKNOWN;
 
   switch (disp) {
   case PN_ACCEPTED:
@@ -1127,22 +1125,10 @@ int pn_messenger_get(pn_messenger_t *mes
             return pn_error_format(messenger->error, err, "error decoding 
message: %s",
                                    pn_message_error(msg));
           } else {
-            if (messenger->accept_mode == PN_ACCEPT_MODE_AUTO) {
-              return pn_messenger_accept(messenger,
-                                         
pn_messenger_incoming_tracker(messenger),
-                                         0);
-            } else {
-              return 0;
-            }
-          }
-        } else {
-          if (messenger->accept_mode == PN_ACCEPT_MODE_AUTO) {
-            return pn_messenger_accept(messenger,
-                                       
pn_messenger_incoming_tracker(messenger),
-                                       0);
-          } else {
             return 0;
           }
+        } else {
+          return 0;
         }
       }
       d = pn_work_next(d);

Modified: 
qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/messenger/Messenger.java
URL: 
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/messenger/Messenger.java?rev=1424564&r1=1424563&r2=1424564&view=diff
==============================================================================
--- 
qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/messenger/Messenger.java
 (original)
+++ 
qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/messenger/Messenger.java
 Thu Dec 20 16:17:45 2012
@@ -145,9 +145,6 @@ public interface Messenger
      */
     int incoming();
 
-    AcceptMode getAcceptMode();
-    void setAcceptMode(AcceptMode mode);
-
     int getIncomingWindow();
     void setIncomingWindow(int window);
 

Modified: 
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java
URL: 
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java?rev=1424564&r1=1424563&r2=1424564&view=diff
==============================================================================
--- 
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java
 (original)
+++ 
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java
 Thu Dec 20 16:17:45 2012
@@ -43,7 +43,6 @@ import org.apache.qpid.proton.driver.imp
 import org.apache.qpid.proton.engine.impl.ConnectionImpl;
 import org.apache.qpid.proton.message.Message;
 import org.apache.qpid.proton.message.impl.MessageImpl;
-import org.apache.qpid.proton.messenger.AcceptMode;
 import org.apache.qpid.proton.messenger.Messenger;
 import org.apache.qpid.proton.messenger.MessengerException;
 import org.apache.qpid.proton.messenger.Status;
@@ -68,7 +67,6 @@ public class MessengerImpl implements Me
     private Driver _driver;
     private int _credit;
     private int _distributed;
-    private AcceptMode _acceptMode = AcceptMode.AUTO;
     private TrackerQueue _incoming = new TrackerQueue();
     private TrackerQueue _outgoing = new TrackerQueue();
 
@@ -194,9 +192,6 @@ public class MessengerImpl implements Me
                     Message message = new MessageImpl();
                     message.decode(_buffer, 0, size);
                     _incoming.add(delivery);
-                    if (_acceptMode == AcceptMode.AUTO) {
-                        _incoming.accept(incomingTracker());
-                    }
                     _distributed--;
                     delivery.getLink().advance();
                     return message;
@@ -250,15 +245,6 @@ public class MessengerImpl implements Me
     }
 
 
-    public AcceptMode getAcceptMode()
-    {
-        return _acceptMode;
-    }
-    public void setAcceptMode(AcceptMode mode)
-    {
-        _acceptMode = mode;
-    }
-
     public int getIncomingWindow()
     {
         return _incoming.getWindow();

Modified: 
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/TrackerQueue.java
URL: 
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/TrackerQueue.java?rev=1424564&r1=1424563&r2=1424564&view=diff
==============================================================================
--- 
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/TrackerQueue.java
 (original)
+++ 
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/TrackerQueue.java
 Thu Dec 20 16:17:45 2012
@@ -77,6 +77,7 @@ class TrackerQueue
         }
         int sequence = _hwm++;
         _deliveries.add(delivery);
+        slide();
     }
 
     Status getStatus(Tracker tracker)
@@ -139,7 +140,7 @@ class TrackerQueue
                 Delivery d = _deliveries.get(0);
                 if (d.getLocalState() == null)
                 {
-                    return;
+                    d.disposition(ACCEPTED);
                 }
 
                 d.settle();
@@ -186,7 +187,6 @@ class TrackerQueue
                 operation.apply(d);
             }
         }
-        slide();
     }
 
     private static interface DeliveryOperation

Modified: qpid/proton/trunk/proton-j/proton/src/main/scripts/proton.py
URL: 
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton/src/main/scripts/proton.py?rev=1424564&r1=1424563&r2=1424564&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/proton/src/main/scripts/proton.py (original)
+++ qpid/proton/trunk/proton-j/proton/src/main/scripts/proton.py Thu Dec 20 
16:17:45 2012
@@ -24,7 +24,7 @@ from org.apache.qpid.proton.engine.impl 
 from org.apache.qpid.proton.engine.impl.ssl import SslDomainImpl, 
SslPeerDetailsImpl
 from org.apache.qpid.proton.message import MessageFormat
 from org.apache.qpid.proton.message.impl import MessageImpl
-from org.apache.qpid.proton.messenger import AcceptMode, MessengerException, 
Status
+from org.apache.qpid.proton.messenger import MessengerException, Status
 from org.apache.qpid.proton.messenger.impl import MessengerImpl
 from org.apache.qpid.proton.amqp.messaging import Source, Target, Accepted, 
AmqpValue
 from org.apache.qpid.proton.amqp import UnsignedInteger
@@ -52,15 +52,6 @@ STATUSES = {
 MANUAL = "MANUAL"
 AUTOMATIC = "AUTOMATIC"
 
-_ACCEPT_MODE2CONST = {
-  AcceptMode.AUTO: AUTOMATIC,
-  AcceptMode.MANUAL: MANUAL
-  }
-_CONST2ACCEPT_MODE = {
-  AUTOMATIC: AcceptMode.AUTO,
-  MANUAL: AcceptMode.MANUAL
-  }
-
 class Endpoint(object):
 
   LOCAL_UNINIT = 1
@@ -553,12 +544,11 @@ class Messenger(object):
   def incoming(self):
     return self.impl.incoming()
 
-  def _get_accept_mode(self):
-    return _ACCEPT_MODE2CONST(self.impl.getAcceptMode())
-  def _set_accept_mode(self, mode):
-    mode = _CONST2ACCEPT_MODE[mode]
-    self.impl.setAcceptMode(mode)
-  accept_mode = property(_get_accept_mode, _set_accept_mode)
+  def _get_timeout(self):
+    return self.impl.getTimeout()
+  def _set_timeout(self, t):
+    self.impl.setTimeout(t)
+  timeout = property(_get_timeout, _set_timeout)
 
   def accept(self, tracker=None):
     if tracker is None:

Modified: qpid/proton/trunk/tests/proton_tests/messenger.py
URL: 
http://svn.apache.org/viewvc/qpid/proton/trunk/tests/proton_tests/messenger.py?rev=1424564&r1=1424563&r2=1424564&view=diff
==============================================================================
--- qpid/proton/trunk/tests/proton_tests/messenger.py (original)
+++ qpid/proton/trunk/tests/proton_tests/messenger.py Thu Dec 20 16:17:45 2012
@@ -136,7 +136,7 @@ class MessengerTest(Test):
       assert "unable to send to address: totally-bogus-address" in err, err
 
   def testOutgoingWindow(self):
-    self.server.accept_mode = MANUAL
+    self.server.incoming_window = 10
     self.start()
     msg = Message()
     msg.address="amqp://0.0.0.0:12345"
@@ -157,23 +157,28 @@ class MessengerTest(Test):
     for i in range(10):
       trackers.append(self.client.put(msg))
 
-    for t in trackers:
+    for i in range(5):
+      t = trackers[i]
+      assert self.client.status(t) is None, (t, self.client.status(t))
+
+    for i in range(5, 10):
+      t = trackers[i]
       assert self.client.status(t) is PENDING, (t, self.client.status(t))
 
     self.client.send()
 
-    count = 0
-    for t in trackers:
-      count += 1
-      if count > 5:
-        assert self.client.status(t) is ACCEPTED
-      else:
-        assert self.client.status(t) is None
+    for i in range(5):
+      t = trackers[i]
+      assert self.client.status(t) is None
+
+    for i in range(5, 10):
+      t = trackers[i]
+      assert self.client.status(t) is ACCEPTED
 
   def testReject(self, process_incoming=None):
     if process_incoming:
       self.process_incoming = process_incoming
-    self.server.accept_mode = MANUAL
+    self.server.incoming_window = 10
     self.start()
     msg = Message()
     msg.address="amqp://0.0.0.0:12345"
@@ -214,7 +219,7 @@ class MessengerTest(Test):
 
 
   def testIncomingWindow(self):
-    self.server.accept_mode = MANUAL
+    self.server.incoming_window = 10
     self.server.outgoing_window = 10
     self.start()
     msg = Message()
@@ -232,14 +237,15 @@ class MessengerTest(Test):
       assert self.client.status(t) is ACCEPTED, (t, self.client.status(t))
 
     self.client.incoming_window = 10
-
     remaining = 10
 
     trackers = []
     while remaining:
       self.client.recv(remaining)
       while self.client.incoming:
-        trackers.append(self.client.get())
+        t = self.client.get()
+        trackers.append(t)
+        self.client.accept(t)
         remaining -= 1
     for t in trackers:
       assert self.client.status(t) is ACCEPTED, (t, self.client.status(t))



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to