Author: rhs
Date: Thu Dec 20 16:17:45 2012
New Revision: 1424564
URL: http://svn.apache.org/viewvc?rev=1424564&view=rev
Log:
fixed messenger's windowing behavior; removed accept mode as a configuration
option as with proper windowing behavior, setting the accept mode to manual
seems mostly equivalent to setting a non-zero incoming window
Removed:
qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/messenger/AcceptMode.java
Modified:
qpid/proton/trunk/proton-c/bindings/python/proton.py
qpid/proton/trunk/proton-c/src/messenger.c
qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/messenger/Messenger.java
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/TrackerQueue.java
qpid/proton/trunk/proton-j/proton/src/main/scripts/proton.py
qpid/proton/trunk/tests/proton_tests/messenger.py
Modified: qpid/proton/trunk/proton-c/bindings/python/proton.py
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/bindings/python/proton.py?rev=1424564&r1=1424563&r2=1424564&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/bindings/python/proton.py (original)
+++ qpid/proton/trunk/proton-c/bindings/python/proton.py Thu Dec 20 16:17:45
2012
@@ -90,15 +90,6 @@ STATUSES = {
AUTOMATIC = Constant("AUTOMATIC")
MANUAL = Constant("MANUAL")
-_ACCEPT_MODE2CONST = {
- PN_ACCEPT_MODE_AUTO: AUTOMATIC,
- PN_ACCEPT_MODE_MANUAL: MANUAL
- }
-_CONST2ACCEPT_MODE = {
- AUTOMATIC: PN_ACCEPT_MODE_AUTO,
- MANUAL: PN_ACCEPT_MODE_MANUAL
- }
-
class Messenger(object):
"""
The L{Messenger} class defines a high level interface for sending
@@ -259,21 +250,6 @@ The timeout property contains the defaul
operations performed by the L{Messenger}.
""")
- def _get_accept_mode(self):
- return _ACCEPT_MODE2CONST(pn_messenger_get_accept_mode(self._mng))
-
- def _set_accept_mode(self, mode):
- mode = _CONST2ACCEPT_MODE[mode]
- self._check(pn_messenger_set_accept_mode(self._mng, mode))
-
- accept_mode = property(_get_accept_mode, _set_accept_mode,
- doc="""
-The accept mode for the messenger. Can be set to AUTOMATIC or MANUAL.
-The default is AUTOMATIC. If set to MANUAL, then every incoming
-message must be accepted or rejected (either individually or
-cummulatively) via the accept and reject methods.
-""")
-
def _get_incoming_window(self):
return pn_messenger_get_incoming_window(self._mng)
Modified: qpid/proton/trunk/proton-c/src/messenger.c
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/messenger.c?rev=1424564&r1=1424563&r2=1424564&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/messenger.c (original)
+++ qpid/proton/trunk/proton-c/src/messenger.c Thu Dec 20 16:17:45 2012
@@ -155,32 +155,17 @@ void pn_queue_del(pn_queue_t *queue, pn_
}
}
-pn_sequence_t pn_queue_add(pn_queue_t *queue, pn_delivery_t *delivery)
-{
- pn_sequence_t id = queue->hwm++;
- size_t offset = id - queue->lwm;
- PN_ENSUREZ(queue->deliveries, queue->capacity, offset + 1);
- assert(offset >= 0 && offset < queue->capacity);
- queue->deliveries[offset] = delivery;
- pn_delivery_set_context(delivery, (void *) (intptr_t) id);
- pn_connection_t *conn =
- pn_session_connection(pn_link_session(pn_delivery_link(delivery)));
- pn_incref(conn);
- return id;
-}
-
void pn_queue_slide(pn_queue_t *queue)
{
if (queue->window >= 0) {
while (queue->hwm - queue->lwm > queue->window) {
pn_delivery_t *d = pn_queue_get(queue, queue->lwm);
if (d) {
- if (pn_delivery_local_state(d)) {
- pn_delivery_settle(d);
- pn_queue_del(queue, d);
- } else {
- break;
+ if (!pn_delivery_local_state(d)) {
+ pn_delivery_update(d, PN_ACCEPTED);
}
+ pn_delivery_settle(d);
+ pn_queue_del(queue, d);
} else {
pn_queue_gc(queue);
}
@@ -189,6 +174,21 @@ void pn_queue_slide(pn_queue_t *queue)
pn_queue_gc(queue);
}
+pn_sequence_t pn_queue_add(pn_queue_t *queue, pn_delivery_t *delivery)
+{
+ pn_sequence_t id = queue->hwm++;
+ size_t offset = id - queue->lwm;
+ PN_ENSUREZ(queue->deliveries, queue->capacity, offset + 1);
+ assert(offset >= 0 && offset < queue->capacity);
+ queue->deliveries[offset] = delivery;
+ pn_delivery_set_context(delivery, (void *) (intptr_t) id);
+ pn_connection_t *conn =
+ pn_session_connection(pn_link_session(pn_delivery_link(delivery)));
+ pn_incref(conn);
+ pn_queue_slide(queue);
+ return id;
+}
+
int pn_queue_update(pn_queue_t *queue, pn_sequence_t id, pn_status_t status,
int flags, bool settle, bool match)
{
@@ -229,7 +229,7 @@ int pn_queue_update(pn_queue_t *queue, p
}
}
- pn_queue_slide(queue);
+ pn_queue_gc(queue);
return 0;
}
@@ -472,8 +472,6 @@ void pn_messenger_endpoints(pn_messenger
d = pn_work_next(d);
}
- pn_queue_slide(&messenger->outgoing);
-
if (pn_work_head(conn)) {
return;
}
@@ -989,7 +987,7 @@ pn_queue_t *pn_tracker_queue(pn_messenge
static pn_status_t disp2status(pn_disposition_t disp)
{
- if (!disp) return PN_STATUS_PENDING;
+ if (!disp) return PN_STATUS_UNKNOWN;
switch (disp) {
case PN_ACCEPTED:
@@ -1127,22 +1125,10 @@ int pn_messenger_get(pn_messenger_t *mes
return pn_error_format(messenger->error, err, "error decoding
message: %s",
pn_message_error(msg));
} else {
- if (messenger->accept_mode == PN_ACCEPT_MODE_AUTO) {
- return pn_messenger_accept(messenger,
-
pn_messenger_incoming_tracker(messenger),
- 0);
- } else {
- return 0;
- }
- }
- } else {
- if (messenger->accept_mode == PN_ACCEPT_MODE_AUTO) {
- return pn_messenger_accept(messenger,
-
pn_messenger_incoming_tracker(messenger),
- 0);
- } else {
return 0;
}
+ } else {
+ return 0;
}
}
d = pn_work_next(d);
Modified:
qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/messenger/Messenger.java
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/messenger/Messenger.java?rev=1424564&r1=1424563&r2=1424564&view=diff
==============================================================================
---
qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/messenger/Messenger.java
(original)
+++
qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/messenger/Messenger.java
Thu Dec 20 16:17:45 2012
@@ -145,9 +145,6 @@ public interface Messenger
*/
int incoming();
- AcceptMode getAcceptMode();
- void setAcceptMode(AcceptMode mode);
-
int getIncomingWindow();
void setIncomingWindow(int window);
Modified:
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java?rev=1424564&r1=1424563&r2=1424564&view=diff
==============================================================================
---
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java
(original)
+++
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java
Thu Dec 20 16:17:45 2012
@@ -43,7 +43,6 @@ import org.apache.qpid.proton.driver.imp
import org.apache.qpid.proton.engine.impl.ConnectionImpl;
import org.apache.qpid.proton.message.Message;
import org.apache.qpid.proton.message.impl.MessageImpl;
-import org.apache.qpid.proton.messenger.AcceptMode;
import org.apache.qpid.proton.messenger.Messenger;
import org.apache.qpid.proton.messenger.MessengerException;
import org.apache.qpid.proton.messenger.Status;
@@ -68,7 +67,6 @@ public class MessengerImpl implements Me
private Driver _driver;
private int _credit;
private int _distributed;
- private AcceptMode _acceptMode = AcceptMode.AUTO;
private TrackerQueue _incoming = new TrackerQueue();
private TrackerQueue _outgoing = new TrackerQueue();
@@ -194,9 +192,6 @@ public class MessengerImpl implements Me
Message message = new MessageImpl();
message.decode(_buffer, 0, size);
_incoming.add(delivery);
- if (_acceptMode == AcceptMode.AUTO) {
- _incoming.accept(incomingTracker());
- }
_distributed--;
delivery.getLink().advance();
return message;
@@ -250,15 +245,6 @@ public class MessengerImpl implements Me
}
- public AcceptMode getAcceptMode()
- {
- return _acceptMode;
- }
- public void setAcceptMode(AcceptMode mode)
- {
- _acceptMode = mode;
- }
-
public int getIncomingWindow()
{
return _incoming.getWindow();
Modified:
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/TrackerQueue.java
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/TrackerQueue.java?rev=1424564&r1=1424563&r2=1424564&view=diff
==============================================================================
---
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/TrackerQueue.java
(original)
+++
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/TrackerQueue.java
Thu Dec 20 16:17:45 2012
@@ -77,6 +77,7 @@ class TrackerQueue
}
int sequence = _hwm++;
_deliveries.add(delivery);
+ slide();
}
Status getStatus(Tracker tracker)
@@ -139,7 +140,7 @@ class TrackerQueue
Delivery d = _deliveries.get(0);
if (d.getLocalState() == null)
{
- return;
+ d.disposition(ACCEPTED);
}
d.settle();
@@ -186,7 +187,6 @@ class TrackerQueue
operation.apply(d);
}
}
- slide();
}
private static interface DeliveryOperation
Modified: qpid/proton/trunk/proton-j/proton/src/main/scripts/proton.py
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton/src/main/scripts/proton.py?rev=1424564&r1=1424563&r2=1424564&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/proton/src/main/scripts/proton.py (original)
+++ qpid/proton/trunk/proton-j/proton/src/main/scripts/proton.py Thu Dec 20
16:17:45 2012
@@ -24,7 +24,7 @@ from org.apache.qpid.proton.engine.impl
from org.apache.qpid.proton.engine.impl.ssl import SslDomainImpl,
SslPeerDetailsImpl
from org.apache.qpid.proton.message import MessageFormat
from org.apache.qpid.proton.message.impl import MessageImpl
-from org.apache.qpid.proton.messenger import AcceptMode, MessengerException,
Status
+from org.apache.qpid.proton.messenger import MessengerException, Status
from org.apache.qpid.proton.messenger.impl import MessengerImpl
from org.apache.qpid.proton.amqp.messaging import Source, Target, Accepted,
AmqpValue
from org.apache.qpid.proton.amqp import UnsignedInteger
@@ -52,15 +52,6 @@ STATUSES = {
MANUAL = "MANUAL"
AUTOMATIC = "AUTOMATIC"
-_ACCEPT_MODE2CONST = {
- AcceptMode.AUTO: AUTOMATIC,
- AcceptMode.MANUAL: MANUAL
- }
-_CONST2ACCEPT_MODE = {
- AUTOMATIC: AcceptMode.AUTO,
- MANUAL: AcceptMode.MANUAL
- }
-
class Endpoint(object):
LOCAL_UNINIT = 1
@@ -553,12 +544,11 @@ class Messenger(object):
def incoming(self):
return self.impl.incoming()
- def _get_accept_mode(self):
- return _ACCEPT_MODE2CONST(self.impl.getAcceptMode())
- def _set_accept_mode(self, mode):
- mode = _CONST2ACCEPT_MODE[mode]
- self.impl.setAcceptMode(mode)
- accept_mode = property(_get_accept_mode, _set_accept_mode)
+ def _get_timeout(self):
+ return self.impl.getTimeout()
+ def _set_timeout(self, t):
+ self.impl.setTimeout(t)
+ timeout = property(_get_timeout, _set_timeout)
def accept(self, tracker=None):
if tracker is None:
Modified: qpid/proton/trunk/tests/proton_tests/messenger.py
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/tests/proton_tests/messenger.py?rev=1424564&r1=1424563&r2=1424564&view=diff
==============================================================================
--- qpid/proton/trunk/tests/proton_tests/messenger.py (original)
+++ qpid/proton/trunk/tests/proton_tests/messenger.py Thu Dec 20 16:17:45 2012
@@ -136,7 +136,7 @@ class MessengerTest(Test):
assert "unable to send to address: totally-bogus-address" in err, err
def testOutgoingWindow(self):
- self.server.accept_mode = MANUAL
+ self.server.incoming_window = 10
self.start()
msg = Message()
msg.address="amqp://0.0.0.0:12345"
@@ -157,23 +157,28 @@ class MessengerTest(Test):
for i in range(10):
trackers.append(self.client.put(msg))
- for t in trackers:
+ for i in range(5):
+ t = trackers[i]
+ assert self.client.status(t) is None, (t, self.client.status(t))
+
+ for i in range(5, 10):
+ t = trackers[i]
assert self.client.status(t) is PENDING, (t, self.client.status(t))
self.client.send()
- count = 0
- for t in trackers:
- count += 1
- if count > 5:
- assert self.client.status(t) is ACCEPTED
- else:
- assert self.client.status(t) is None
+ for i in range(5):
+ t = trackers[i]
+ assert self.client.status(t) is None
+
+ for i in range(5, 10):
+ t = trackers[i]
+ assert self.client.status(t) is ACCEPTED
def testReject(self, process_incoming=None):
if process_incoming:
self.process_incoming = process_incoming
- self.server.accept_mode = MANUAL
+ self.server.incoming_window = 10
self.start()
msg = Message()
msg.address="amqp://0.0.0.0:12345"
@@ -214,7 +219,7 @@ class MessengerTest(Test):
def testIncomingWindow(self):
- self.server.accept_mode = MANUAL
+ self.server.incoming_window = 10
self.server.outgoing_window = 10
self.start()
msg = Message()
@@ -232,14 +237,15 @@ class MessengerTest(Test):
assert self.client.status(t) is ACCEPTED, (t, self.client.status(t))
self.client.incoming_window = 10
-
remaining = 10
trackers = []
while remaining:
self.client.recv(remaining)
while self.client.incoming:
- trackers.append(self.client.get())
+ t = self.client.get()
+ trackers.append(t)
+ self.client.accept(t)
remaining -= 1
for t in trackers:
assert self.client.status(t) is ACCEPTED, (t, self.client.status(t))
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]