Author: rhs
Date: Thu Oct  4 18:30:18 2012
New Revision: 1394200

URL: http://svn.apache.org/viewvc?rev=1394200&view=rev
Log:
PROTON-39: partial implementation of offered/available based on patch from Ted

Modified:
    qpid/proton/trunk/proton-c/bindings/python/proton.py
    qpid/proton/trunk/proton-c/include/proton/engine.h
    qpid/proton/trunk/proton-c/src/driver.c
    qpid/proton/trunk/proton-c/src/engine/engine-internal.h
    qpid/proton/trunk/proton-c/src/engine/engine.c
    qpid/proton/trunk/proton-j/src/main/scripts/proton.py
    qpid/proton/trunk/tests/proton_tests/engine.py

Modified: qpid/proton/trunk/proton-c/bindings/python/proton.py
URL: 
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/bindings/python/proton.py?rev=1394200&r1=1394199&r2=1394200&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/bindings/python/proton.py (original)
+++ qpid/proton/trunk/proton-c/bindings/python/proton.py Thu Oct  4 18:30:18 
2012
@@ -1273,6 +1273,10 @@ class Connection(Endpoint):
   def state(self):
     return pn_connection_state(self._conn)
 
+  @property
+  def writable(self):
+    return pn_connection_writable(self._conn)
+
   def session(self):
     return wrap_session(pn_session(self._conn))
 
@@ -1414,6 +1418,10 @@ class Link(Endpoint):
     return pn_link_credit(self._link)
 
   @property
+  def available(self):
+    return pn_link_available(self._link)
+
+  @property
   def queued(self):
     return pn_link_queued(self._link)
 
@@ -1422,6 +1430,9 @@ class Link(Endpoint):
 
 class Sender(Link):
 
+  def offered(self, n):
+    pn_link_offered(self._link, n)
+
   def send(self, bytes):
     return self._check(pn_link_send(self._link, bytes))
 

Modified: qpid/proton/trunk/proton-c/include/proton/engine.h
URL: 
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/include/proton/engine.h?rev=1394200&r1=1394199&r2=1394200&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/include/proton/engine.h (original)
+++ qpid/proton/trunk/proton-c/include/proton/engine.h Thu Oct  4 18:30:18 2012
@@ -225,6 +225,8 @@ pn_link_t *pn_link_head(pn_connection_t 
  */
 pn_link_t *pn_link_next(pn_link_t *link, pn_state_t state);
 
+bool pn_connection_writable(pn_connection_t *connection);
+
 void pn_connection_open(pn_connection_t *connection);
 void pn_connection_close(pn_connection_t *connection);
 void pn_connection_free(pn_connection_t *connection);
@@ -284,6 +286,7 @@ pn_delivery_t *pn_link_current(pn_link_t
 bool pn_link_advance(pn_link_t *link);
 int pn_link_credit(pn_link_t *link);
 int pn_link_queued(pn_link_t *link);
+int pn_link_available(pn_link_t *link);
 
 int pn_link_unsettled(pn_link_t *link);
 pn_delivery_t *pn_unsettled_head(pn_link_t *link);
@@ -296,7 +299,7 @@ void *pn_link_get_context(pn_link_t *lin
 void pn_link_set_context(pn_link_t *link, void *context);
 
 // sender
-//void pn_link_offer(pn_sender_t *sender, int credits);
+void pn_link_offered(pn_link_t *sender, int credit);
 ssize_t pn_link_send(pn_link_t *sender, const char *bytes, size_t n);
 void pn_link_drained(pn_link_t *sender);
 //void pn_link_abort(pn_sender_t *sender);

Modified: qpid/proton/trunk/proton-c/src/driver.c
URL: 
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/driver.c?rev=1394200&r1=1394199&r2=1394200&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/driver.c (original)
+++ qpid/proton/trunk/proton-c/src/driver.c Thu Oct  4 18:30:18 2012
@@ -687,9 +687,10 @@ static void pn_driver_rebuild(pn_driver_
   for (int i = 0; i < d->connector_count; i++)
   {
     if (!c->closed) {
+      bool has_writable_links = c->connection ? 
pn_connection_writable(c->connection) : false;
       d->fds[d->nfds].fd = c->fd;
       d->fds[d->nfds].events = (c->status & PN_SEL_RD ? POLLIN : 0) |
-        (c->status & PN_SEL_WR ? POLLOUT : 0);
+        ((has_writable_links || (c->status & PN_SEL_WR)) ? POLLOUT : 0);
       d->fds[d->nfds].revents = 0;
       c->idx = d->nfds;
       d->nfds++;

Modified: qpid/proton/trunk/proton-c/src/engine/engine-internal.h
URL: 
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/engine/engine-internal.h?rev=1394200&r1=1394199&r2=1394200&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/engine/engine-internal.h (original)
+++ qpid/proton/trunk/proton-c/src/engine/engine-internal.h Thu Oct  4 18:30:18 
2012
@@ -155,6 +155,7 @@ struct pn_link_t {
   pn_delivery_t *settled_head;
   pn_delivery_t *settled_tail;
   size_t unsettled_count;
+  pn_sequence_t available;
   pn_sequence_t credit;
   pn_sequence_t queued;
   bool drain;

Modified: qpid/proton/trunk/proton-c/src/engine/engine.c
URL: 
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/engine/engine.c?rev=1394200&r1=1394199&r2=1394200&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/engine/engine.c (original)
+++ qpid/proton/trunk/proton-c/src/engine/engine.c Thu Oct  4 18:30:18 2012
@@ -654,6 +654,25 @@ pn_link_t *pn_link_next(pn_link_t *link,
   return NULL;
 }
 
+bool pn_connection_writable(pn_connection_t *conn)
+{
+  if (!conn)
+    return false;
+
+  pn_endpoint_t *endpoint = conn->endpoint_head;
+
+  while (endpoint) {
+    if (pn_matches(endpoint, SENDER, PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE)) {
+      pn_link_t *link = (pn_link_t*) endpoint;
+      if ((link->available > 0) && (pn_link_credit(link) > 0))
+        return true;
+    }
+    endpoint = endpoint->endpoint_next;
+  }
+
+  return false;
+}
+
 pn_session_t *pn_session(pn_connection_t *conn)
 {
   if (!conn) return NULL;
@@ -814,6 +833,7 @@ void pn_link_init(pn_link_t *link, int t
   link->settled_head = link->settled_tail = NULL;
   link->unsettled_head = link->unsettled_tail = link->current = NULL;
   link->unsettled_count = 0;
+  link->available = 0;
   link->credit = 0;
   link->queued = 0;
   link->drain = false;
@@ -1072,6 +1092,11 @@ int pn_link_credit(pn_link_t *link)
   return link ? link->credit : 0;
 }
 
+int pn_link_available(pn_link_t *link)
+{
+  return link ? link->available : 0;
+}
+
 int pn_link_queued(pn_link_t *link)
 {
   return link ? link->queued : 0;
@@ -1660,7 +1685,7 @@ int pn_post_flow(pn_transport_t *transpo
 {
   ssn_state->incoming_window = 
pn_delivery_buffer_available(&ssn_state->incoming);
   bool link = (bool) state;
-  return pn_post_frame(transport->disp, ssn_state->local_channel, 
"DL[?IIII?I?I?In?o]", FLOW,
+  return pn_post_frame(transport->disp, ssn_state->local_channel, 
"DL[?IIII?I?I?II?o]", FLOW,
                        (int16_t) ssn_state->remote_channel >= 0, 
ssn_state->incoming_transfer_count,
                        ssn_state->incoming_window,
                        ssn_state->outgoing.next,
@@ -2072,6 +2097,11 @@ void pn_transport_trace(pn_transport_t *
   transport->disp->trace = trace;
 }
 
+void pn_link_offered(pn_link_t *sender, int credit)
+{
+  sender->available = credit;
+}
+
 ssize_t pn_link_send(pn_link_t *sender, const char *bytes, size_t n)
 {
   pn_delivery_t *current = pn_link_current(sender);

Modified: qpid/proton/trunk/proton-j/src/main/scripts/proton.py
URL: 
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/main/scripts/proton.py?rev=1394200&r1=1394199&r2=1394200&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/src/main/scripts/proton.py (original)
+++ qpid/proton/trunk/proton-j/src/main/scripts/proton.py Thu Oct  4 18:30:18 
2012
@@ -96,6 +96,10 @@ class Connection(Endpoint):
   def __init__(self, _impl=None):
     self.impl = _impl or ConnectionImpl()
 
+  @property
+  def writable(self):
+    raise Skipped()
+
   def session(self):
     return wrap_session(self.impl.session())
 
@@ -212,6 +216,10 @@ class Link(Endpoint):
     return self.impl.getCredit()
 
   @property
+  def available(self):
+    raise Skipped()
+
+  @property
   def queued(self):
     return self.impl.getQueued()
 
@@ -220,6 +228,9 @@ class Link(Endpoint):
 
 class Sender(Link):
 
+  def offered(self, n):
+    raise Skipped()
+
   def send(self, bytes):
     return self.impl.send(bytes, 0, len(bytes))
 

Modified: qpid/proton/trunk/tests/proton_tests/engine.py
URL: 
http://svn.apache.org/viewvc/qpid/proton/trunk/tests/proton_tests/engine.py?rev=1394200&r1=1394199&r2=1394200&view=diff
==============================================================================
--- qpid/proton/trunk/tests/proton_tests/engine.py (original)
+++ qpid/proton/trunk/tests/proton_tests/engine.py Thu Oct  4 18:30:18 2012
@@ -490,6 +490,28 @@ class CreditTest(Test):
   def teardown(self):
     self.cleanup()
 
+  def testOffered1(self):
+    credit = self.snd.credit
+    assert credit == 0, credit
+    assert not self.c1.writable  # No credit, no offer
+    self.snd.offered(5)
+    self.pump()
+    assert not self.c1.writable  # Offered but no receiver credit
+    self.rcv.flow(10)
+    self.pump()
+    assert self.c1.writable      # Offered and receiver credit
+
+  def testOffered2(self):
+    credit = self.snd.credit
+    assert credit == 0, credit
+    assert not self.c1.writable  # No credit, no offer
+    self.rcv.flow(10)
+    self.pump()
+    assert not self.c1.writable  # Receiver credit but no offer
+    self.snd.offered(5)
+    self.pump()
+    assert self.c1.writable      # Offered and receiver credit
+
   def testCreditSender(self):
     credit = self.snd.credit
     assert credit == 0, credit



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to