Author: rhs
Date: Thu Oct 4 18:30:18 2012
New Revision: 1394200
URL: http://svn.apache.org/viewvc?rev=1394200&view=rev
Log:
PROTON-39: partial implementation of offered/available based on patch from Ted
Modified:
qpid/proton/trunk/proton-c/bindings/python/proton.py
qpid/proton/trunk/proton-c/include/proton/engine.h
qpid/proton/trunk/proton-c/src/driver.c
qpid/proton/trunk/proton-c/src/engine/engine-internal.h
qpid/proton/trunk/proton-c/src/engine/engine.c
qpid/proton/trunk/proton-j/src/main/scripts/proton.py
qpid/proton/trunk/tests/proton_tests/engine.py
Modified: qpid/proton/trunk/proton-c/bindings/python/proton.py
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/bindings/python/proton.py?rev=1394200&r1=1394199&r2=1394200&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/bindings/python/proton.py (original)
+++ qpid/proton/trunk/proton-c/bindings/python/proton.py Thu Oct 4 18:30:18 2012
@@ -1273,6 +1273,10 @@ class Connection(Endpoint):
def state(self):
return pn_connection_state(self._conn)
+ @property
+ def writable(self):
+ return pn_connection_writable(self._conn)
+
def session(self):
return wrap_session(pn_session(self._conn))
@@ -1414,6 +1418,10 @@ class Link(Endpoint):
return pn_link_credit(self._link)
@property
+ def available(self):
+ return pn_link_available(self._link)
+
+ @property
def queued(self):
return pn_link_queued(self._link)
@@ -1422,6 +1430,9 @@ class Link(Endpoint):
class Sender(Link):
+ def offered(self, n):
+ pn_link_offered(self._link, n)
+
def send(self, bytes):
return self._check(pn_link_send(self._link, bytes))
Modified: qpid/proton/trunk/proton-c/include/proton/engine.h
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/include/proton/engine.h?rev=1394200&r1=1394199&r2=1394200&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/include/proton/engine.h (original)
+++ qpid/proton/trunk/proton-c/include/proton/engine.h Thu Oct 4 18:30:18 2012
@@ -225,6 +225,8 @@ pn_link_t *pn_link_head(pn_connection_t
*/
pn_link_t *pn_link_next(pn_link_t *link, pn_state_t state);
+bool pn_connection_writable(pn_connection_t *connection);
+
void pn_connection_open(pn_connection_t *connection);
void pn_connection_close(pn_connection_t *connection);
void pn_connection_free(pn_connection_t *connection);
@@ -284,6 +286,7 @@ pn_delivery_t *pn_link_current(pn_link_t
bool pn_link_advance(pn_link_t *link);
int pn_link_credit(pn_link_t *link);
int pn_link_queued(pn_link_t *link);
+int pn_link_available(pn_link_t *link);
int pn_link_unsettled(pn_link_t *link);
pn_delivery_t *pn_unsettled_head(pn_link_t *link);
@@ -296,7 +299,7 @@ void *pn_link_get_context(pn_link_t *lin
void pn_link_set_context(pn_link_t *link, void *context);
// sender
-//void pn_link_offer(pn_sender_t *sender, int credits);
+void pn_link_offered(pn_link_t *sender, int credit);
ssize_t pn_link_send(pn_link_t *sender, const char *bytes, size_t n);
void pn_link_drained(pn_link_t *sender);
//void pn_link_abort(pn_sender_t *sender);
Modified: qpid/proton/trunk/proton-c/src/driver.c
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/driver.c?rev=1394200&r1=1394199&r2=1394200&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/driver.c (original)
+++ qpid/proton/trunk/proton-c/src/driver.c Thu Oct 4 18:30:18 2012
@@ -687,9 +687,10 @@ static void pn_driver_rebuild(pn_driver_
for (int i = 0; i < d->connector_count; i++)
{
if (!c->closed) {
+ bool has_writable_links = c->connection ? pn_connection_writable(c->connection) : false;
d->fds[d->nfds].fd = c->fd;
d->fds[d->nfds].events = (c->status & PN_SEL_RD ? POLLIN : 0) |
- (c->status & PN_SEL_WR ? POLLOUT : 0);
+ ((has_writable_links || (c->status & PN_SEL_WR)) ? POLLOUT : 0);
d->fds[d->nfds].revents = 0;
c->idx = d->nfds;
d->nfds++;
Modified: qpid/proton/trunk/proton-c/src/engine/engine-internal.h
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/engine/engine-internal.h?rev=1394200&r1=1394199&r2=1394200&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/engine/engine-internal.h (original)
+++ qpid/proton/trunk/proton-c/src/engine/engine-internal.h Thu Oct 4 18:30:18 2012
@@ -155,6 +155,7 @@ struct pn_link_t {
pn_delivery_t *settled_head;
pn_delivery_t *settled_tail;
size_t unsettled_count;
+ pn_sequence_t available;
pn_sequence_t credit;
pn_sequence_t queued;
bool drain;
Modified: qpid/proton/trunk/proton-c/src/engine/engine.c
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/engine/engine.c?rev=1394200&r1=1394199&r2=1394200&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/engine/engine.c (original)
+++ qpid/proton/trunk/proton-c/src/engine/engine.c Thu Oct 4 18:30:18 2012
@@ -654,6 +654,25 @@ pn_link_t *pn_link_next(pn_link_t *link,
return NULL;
}
+bool pn_connection_writable(pn_connection_t *conn)
+{
+ if (!conn)
+ return false;
+
+ pn_endpoint_t *endpoint = conn->endpoint_head;
+
+ while (endpoint) {
+ if (pn_matches(endpoint, SENDER, PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE)) {
+ pn_link_t *link = (pn_link_t*) endpoint;
+ if ((link->available > 0) && (pn_link_credit(link) > 0))
+ return true;
+ }
+ endpoint = endpoint->endpoint_next;
+ }
+
+ return false;
+}
+
pn_session_t *pn_session(pn_connection_t *conn)
{
if (!conn) return NULL;
@@ -814,6 +833,7 @@ void pn_link_init(pn_link_t *link, int t
link->settled_head = link->settled_tail = NULL;
link->unsettled_head = link->unsettled_tail = link->current = NULL;
link->unsettled_count = 0;
+ link->available = 0;
link->credit = 0;
link->queued = 0;
link->drain = false;
@@ -1072,6 +1092,11 @@ int pn_link_credit(pn_link_t *link)
return link ? link->credit : 0;
}
+int pn_link_available(pn_link_t *link)
+{
+ return link ? link->available : 0;
+}
+
int pn_link_queued(pn_link_t *link)
{
return link ? link->queued : 0;
@@ -1660,7 +1685,7 @@ int pn_post_flow(pn_transport_t *transpo
{
ssn_state->incoming_window = pn_delivery_buffer_available(&ssn_state->incoming);
bool link = (bool) state;
- return pn_post_frame(transport->disp, ssn_state->local_channel, "DL[?IIII?I?I?In?o]", FLOW,
+ return pn_post_frame(transport->disp, ssn_state->local_channel, "DL[?IIII?I?I?II?o]", FLOW,
(int16_t) ssn_state->remote_channel >= 0,
ssn_state->incoming_transfer_count,
ssn_state->incoming_window,
ssn_state->outgoing.next,
@@ -2072,6 +2097,11 @@ void pn_transport_trace(pn_transport_t *
transport->disp->trace = trace;
}
+void pn_link_offered(pn_link_t *sender, int credit)
+{
+ sender->available = credit;
+}
+
ssize_t pn_link_send(pn_link_t *sender, const char *bytes, size_t n)
{
pn_delivery_t *current = pn_link_current(sender);
Modified: qpid/proton/trunk/proton-j/src/main/scripts/proton.py
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/main/scripts/proton.py?rev=1394200&r1=1394199&r2=1394200&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/src/main/scripts/proton.py (original)
+++ qpid/proton/trunk/proton-j/src/main/scripts/proton.py Thu Oct 4 18:30:18 2012
@@ -96,6 +96,10 @@ class Connection(Endpoint):
def __init__(self, _impl=None):
self.impl = _impl or ConnectionImpl()
+ @property
+ def writable(self):
+ raise Skipped()
+
def session(self):
return wrap_session(self.impl.session())
@@ -212,6 +216,10 @@ class Link(Endpoint):
return self.impl.getCredit()
@property
+ def available(self):
+ raise Skipped()
+
+ @property
def queued(self):
return self.impl.getQueued()
@@ -220,6 +228,9 @@ class Link(Endpoint):
class Sender(Link):
+ def offered(self, n):
+ raise Skipped()
+
def send(self, bytes):
return self.impl.send(bytes, 0, len(bytes))
Modified: qpid/proton/trunk/tests/proton_tests/engine.py
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/tests/proton_tests/engine.py?rev=1394200&r1=1394199&r2=1394200&view=diff
==============================================================================
--- qpid/proton/trunk/tests/proton_tests/engine.py (original)
+++ qpid/proton/trunk/tests/proton_tests/engine.py Thu Oct 4 18:30:18 2012
@@ -490,6 +490,28 @@ class CreditTest(Test):
def teardown(self):
self.cleanup()
+ def testOffered1(self):
+ credit = self.snd.credit
+ assert credit == 0, credit
+ assert not self.c1.writable # No credit, no offer
+ self.snd.offered(5)
+ self.pump()
+ assert not self.c1.writable # Offered but no receiver credit
+ self.rcv.flow(10)
+ self.pump()
+ assert self.c1.writable # Offered and receiver credit
+
+ def testOffered2(self):
+ credit = self.snd.credit
+ assert credit == 0, credit
+ assert not self.c1.writable # No credit, no offer
+ self.rcv.flow(10)
+ self.pump()
+ assert not self.c1.writable # Receiver credit but no offer
+ self.snd.offered(5)
+ self.pump()
+ assert self.c1.writable # Offered and receiver credit
+
def testCreditSender(self):
credit = self.snd.credit
assert credit == 0, credit
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]