Author: rhs
Date: Sat Mar 24 12:08:21 2012
New Revision: 1304777

URL: http://svn.apache.org/viewvc?rev=1304777&view=rev
Log:
modified driver interface to avoid callbacks; expanded swig interface

Modified:
    qpid/proton/trunk/proton-c/cproton.i
    qpid/proton/trunk/proton-c/include/proton/driver.h
    qpid/proton/trunk/proton-c/src/driver.c
    qpid/proton/trunk/proton-c/src/engine/engine.c

Modified: qpid/proton/trunk/proton-c/cproton.i
URL: 
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/cproton.i?rev=1304777&r1=1304776&r2=1304777&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/cproton.i (original)
+++ qpid/proton/trunk/proton-c/cproton.i Sat Mar 24 12:08:21 2012
@@ -2,6 +2,9 @@
 %{
 /* Includes the header in the wrapper code */
 #include <proton/engine.h>
+#include <proton/message.h>
+#include <proton/sasl.h>
+#include <proton/driver.h>
 %}
 
 typedef unsigned int size_t;
@@ -65,5 +68,61 @@ ssize_t pn_send(pn_link_t *transport, ch
 %}
 %ignore pn_delivery_tag;
 
+%rename(pn_message_data) wrap_pn_message_data;
+%inline %{
+  int wrap_pn_message_data(char *STRING, size_t LENGTH, char *OUTPUT, size_t 
*OUTPUT_SIZE) {
+    ssize_t sz = pn_message_data(OUTPUT, *OUTPUT_SIZE, STRING, LENGTH);
+    if (sz >= 0) {
+      *OUTPUT_SIZE = sz;
+      return 0;
+    } else {
+      *OUTPUT_SIZE = 0;
+      return sz;
+    }
+  }
+%}
+%ignore pn_message_data;
+
+%rename(pn_acceptor) wrap_pn_acceptor;
+%inline {
+  pn_selectable_t *wrap_pn_acceptor(pn_driver_t *driver, const char *host, 
const char *port, PyObject *context) {
+    Py_XINCREF(context);
+    return pn_acceptor(driver, host, port, NULL, context);
+  }
+}
+%ignore pn_acceptor;
+
+%rename(pn_connector) wrap_pn_connector;
+%inline {
+  pn_selectable_t *wrap_pn_connector(pn_driver_t *driver, const char *host, 
const char *port, PyObject *context) {
+    Py_XINCREF(context);
+    return pn_connector(driver, host, port, NULL, context);
+  }
+}
+%ignore pn_connector;
+
+%rename(pn_selectable_context) wrap_pn_selectable_context;
+%inline {
+  PyObject *wrap_pn_selectable_context(pn_selectable_t *sel) {
+    PyObject *result = pn_selectable_context(sel);
+    Py_XINCREF(result);
+    return result;
+  }
+}
+%ignore pn_selectable_context;
+
+%rename(pn_selectable_destroy) wrap_pn_selectable_destroy;
+%inline %{
+  void wrap_pn_selectable_destroy(pn_selectable_t *selectable) {
+    PyObject *obj = pn_selectable_context(selectable);
+    Py_XDECREF(obj);
+    pn_selectable_destroy(selectable);
+  }
+%}
+%ignore pn_selectable_destroy;
+
 /* Parse the header file to generate wrappers */
 %include "proton/engine.h"
+%include "proton/message.h"
+%include "proton/sasl.h"
+%include "proton/driver.h"

Modified: qpid/proton/trunk/proton-c/include/proton/driver.h
URL: 
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/include/proton/driver.h?rev=1304777&r1=1304776&r2=1304777&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/include/proton/driver.h (original)
+++ qpid/proton/trunk/proton-c/include/proton/driver.h Sat Mar 24 12:08:21 2012
@@ -35,16 +35,22 @@ typedef void (pn_callback_t)(pn_selectab
 
 pn_driver_t *pn_driver(void);
 void pn_driver_trace(pn_driver_t *d, pn_trace_t trace);
+void pn_driver_wakeup(pn_driver_t *d);
+void pn_driver_wait(pn_driver_t *d);
+pn_selectable_t *pn_driver_next(pn_driver_t *d);
 void pn_driver_run(pn_driver_t *d);
 void pn_driver_stop(pn_driver_t *d);
 void pn_driver_destroy(pn_driver_t *d);
 
-pn_selectable_t *pn_acceptor(pn_driver_t *driver, char *host, char *port, 
pn_callback_t *callback, void* context);
-pn_selectable_t *pn_connector(pn_driver_t *driver, char *host, char *port, 
pn_callback_t *callback, void* context);
+pn_selectable_t *pn_acceptor(pn_driver_t *driver, const char *host, const char 
*port,
+                             pn_callback_t *callback, void* context);
+pn_selectable_t *pn_connector(pn_driver_t *driver, const char *host, const 
char *port,
+                              pn_callback_t *callback, void* context);
 void pn_selectable_trace(pn_selectable_t *sel, pn_trace_t trace);
 pn_sasl_t *pn_selectable_sasl(pn_selectable_t *sel);
 pn_connection_t *pn_selectable_connection(pn_selectable_t *sel);
 void *pn_selectable_context(pn_selectable_t *sel);
+void pn_selectable_close(pn_selectable_t *sel);
 void pn_selectable_destroy(pn_selectable_t *sel);
 
 #endif /* driver.h */

Modified: qpid/proton/trunk/proton-c/src/driver.c
URL: 
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/driver.c?rev=1304777&r1=1304776&r2=1304777&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/driver.c (original)
+++ qpid/proton/trunk/proton-c/src/driver.c Sat Mar 24 12:08:21 2012
@@ -40,7 +40,10 @@
 struct pn_driver_t {
   pn_selectable_t *head;
   pn_selectable_t *tail;
+  pn_selectable_t *current;
   size_t size;
+  size_t capacity;
+  struct pollfd *fds;
   int ctrl[2]; //pipe for updating selectable status
   bool stopping;
   pn_trace_t trace;
@@ -52,6 +55,7 @@ struct pn_selectable_t {
   pn_driver_t *driver;
   pn_selectable_t *next;
   pn_selectable_t *prev;
+  int idx;
   int fd;
   int status;
   time_t wakeup;
@@ -82,6 +86,10 @@ static void pn_driver_add(pn_driver_t *d
 
 static void pn_driver_remove(pn_driver_t *d, pn_selectable_t *s)
 {
+  if (s == d->current) {
+    d->current = s->next;
+  }
+
   LL_REMOVE(d->head, d->tail, s);
   s->driver = NULL;
   d->size--;
@@ -122,6 +130,7 @@ pn_selectable_t *pn_selectable(pn_driver
   s->process_output = pn_selectable_write_sasl_header;
   s->callback = callback;
   s->context = context;
+  s->idx = 0;
 
   pn_selectable_trace(s, driver->trace);
 
@@ -138,17 +147,17 @@ void pn_selectable_trace(pn_selectable_t
 
 pn_sasl_t *pn_selectable_sasl(pn_selectable_t *sel)
 {
-  return sel->sasl;
+  return sel ? sel->sasl : NULL;
 }
 
 pn_connection_t *pn_selectable_connection(pn_selectable_t *sel)
 {
-  return sel->connection;
+  return sel ? sel->connection : NULL;
 }
 
 void *pn_selectable_context(pn_selectable_t *sel)
 {
-  return sel->context;
+  return sel ? sel->context : NULL;
 }
 
 void pn_selectable_destroy(pn_selectable_t *sel)
@@ -161,9 +170,11 @@ void pn_selectable_destroy(pn_selectable
   free(sel);
 }
 
-static void pn_selectable_close(pn_selectable_t *sel)
+void pn_selectable_close(pn_selectable_t *sel)
 {
   // XXX: should probably signal engine and callback here
+  if (!sel) return;
+
   sel->status = 0;
   if (close(sel->fd) == -1)
     perror("close");
@@ -349,12 +360,21 @@ pn_driver_t *pn_driver()
   if (!d) return NULL;
   d->head = NULL;
   d->tail = NULL;
+  d->current = NULL;
   d->size = 0;
+  d->capacity = 0;
+  d->fds = NULL;
   d->ctrl[0] = 0;
   d->ctrl[1] = 0;
   d->stopping = false;
   d->trace = ((pn_env_bool("PN_TRACE_RAW") ? PN_TRACE_RAW : PN_TRACE_OFF) |
               (pn_env_bool("PN_TRACE_FRM") ? PN_TRACE_FRM : PN_TRACE_OFF));
+
+  // XXX
+  if (pipe(d->ctrl)) {
+    perror("Can't create control pipe");
+  }
+
   return d;
 }
 
@@ -365,78 +385,105 @@ void pn_driver_trace(pn_driver_t *d, pn_
 
 void pn_driver_destroy(pn_driver_t *d)
 {
+  if (!d) return;
+
+  close(d->ctrl[0]);
+  close(d->ctrl[1]);
   while (d->head)
     pn_selectable_destroy(d->head);
+  free(d->fds);
   free(d);
 }
 
-void pn_driver_run(pn_driver_t *d)
+void pn_driver_wakeup(pn_driver_t *d)
 {
-  int i, nfds = 0;
-  struct pollfd *fds = NULL;
+  write(d->ctrl[1], "x", 1);
+}
 
-  if (pipe(d->ctrl)) {
-      perror("Can't create control pipe");
+static void pn_driver_rebuild(pn_driver_t *d)
+{
+  if (d->size == 0) return;
+  while (d->capacity < d->size + 1) {
+    d->capacity = d->capacity ? 2*d->capacity : 16;
+    d->fds = realloc(d->fds, d->capacity*sizeof(struct pollfd));
   }
-  while (!d->stopping)
+
+  d->fds[0].fd = d->ctrl[0];
+  d->fds[0].events = POLLIN;
+  d->fds[0].revents = 0;
+
+  pn_selectable_t *s = d->head;
+  for (int i = 1; i <= d->size; i++)
   {
-    int n = d->size;
-    if (n == 0) break;
-    if (n > nfds) {
-      fds = realloc(fds, (n+1)*sizeof(struct pollfd));
-      nfds = n;
-    }
+    d->fds[i].fd = s->fd;
+    d->fds[i].events = (s->status & PN_SEL_RD ? POLLIN : 0) |
+      (s->status & PN_SEL_WR ? POLLOUT : 0);
+    d->fds[i].revents = 0;
+    s->idx = i;
+    s = s->next;
+  }
 
-    pn_selectable_t *s = d->head;
-    for (i = 0; i < n; i++)
-    {
-      fds[i].fd = s->fd;
-      fds[i].events = (s->status & PN_SEL_RD ? POLLIN : 0) |
-        (s->status & PN_SEL_WR ? POLLOUT : 0);
-      fds[i].revents = 0;
-      // XXX
-      s->tick(s, 0);
-      s = s->next;
-    }
-    fds[n].fd = d->ctrl[0];
-    fds[n].events = POLLIN;
-    fds[n].revents = 0;
-
-    DIE_IFE(poll(fds, n+1, -1));
-
-    s = d->head;
-    for (i = 0; i < n; i++)
-    {
-      // XXX: this is necessary because read or write might close the
-      // selectable, should probably fix this by making them mark it
-      // as closed and closing from this loop
-      pn_selectable_t *next = s->next;
-      if (fds[i].revents & POLLIN)
-        s->read(s);
-      if (fds[i].revents & POLLOUT)
-        s->write(s);
-      s = next;
-    }
+}
 
-    if (fds[n].revents & POLLIN) {
-      //clear the pipe
-      char buffer[512];
-      while (read(d->ctrl[0], buffer, 512) == 512);
-    }
+void pn_driver_wait(pn_driver_t *d) {
+  pn_driver_rebuild(d);
+
+  pn_selectable_t *s = d->head;
+  while (s) {
+    // XXX
+    s->tick(s, 0);
+    s = s->next;
   }
 
-  close(d->ctrl[0]);
-  close(d->ctrl[1]);
-  free(fds);
+  DIE_IFE(poll(d->fds, d->size+1, -1));
+
+  if (d->fds[0].revents & POLLIN) {
+    //clear the pipe
+    char buffer[512];
+    while (read(d->ctrl[0], buffer, 512) == 512);
+  }
+
+  d->current = d->head;
+}
+
+void pn_selectable_work(pn_selectable_t *s) {
+  // XXX: this is necessary because read or write might close the
+  // selectable, should probably fix this by making them mark it
+  // and keeping close/destroy/etc entirely outside the driver
+  int idx = s->idx;
+  pn_driver_t *d = s->driver;
+  if (d->fds[idx].revents & POLLIN)
+    s->read(s);
+  if (d->fds[idx].revents & POLLOUT)
+    s->write(s);
+}
+
+pn_selectable_t *pn_driver_next(pn_driver_t *d) {
+  pn_selectable_t *s = d->current;
+  if (s) {
+    d->current = s->next;
+    pn_selectable_work(s);
+  }
+  return s;
+}
+
+void pn_driver_run(pn_driver_t *d)
+{
+  while (!d->stopping)
+  {
+    pn_driver_wait(d);
+    while (pn_driver_next(d));
+  }
 }
 
 void pn_driver_stop(pn_driver_t *d)
 {
   d->stopping = true;
-  write(d->ctrl[1], "x", 1);
+  pn_driver_wakeup(d);
 }
 
-pn_selectable_t *pn_connector(pn_driver_t *driver, char *host, char *port, 
pn_callback_t *callback, void *context)
+pn_selectable_t *pn_connector(pn_driver_t *driver, const char *host, const 
char *port,
+                              pn_callback_t *callback, void *context)
 {
   struct addrinfo *addr;
   int code = getaddrinfo(host, port, NULL, &addr);
@@ -489,7 +536,8 @@ static void do_accept(pn_selectable_t *s
 static void do_nothing(pn_selectable_t *s) {}
 static time_t never_tick(pn_selectable_t *s, time_t now) { return 0; }
 
-pn_selectable_t *pn_acceptor(pn_driver_t *driver, char *host, char *port, 
pn_callback_t *callback, void* context)
+pn_selectable_t *pn_acceptor(pn_driver_t *driver, const char *host, const char 
*port,
+                             pn_callback_t *callback, void* context)
 {
   struct addrinfo *addr;
   int code = getaddrinfo(host, port, NULL, &addr);

Modified: qpid/proton/trunk/proton-c/src/engine/engine.c
URL: 
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/engine/engine.c?rev=1304777&r1=1304776&r2=1304777&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/engine/engine.c (original)
+++ qpid/proton/trunk/proton-c/src/engine/engine.c Sat Mar 24 12:08:21 2012
@@ -443,11 +443,14 @@ void pn_connection_set_hostname(pn_conne
 
 pn_delivery_t *pn_work_head(pn_connection_t *connection)
 {
+  if (!connection) return NULL;
   return connection->work_head;
 }
 
 pn_delivery_t *pn_work_next(pn_delivery_t *delivery)
 {
+  if (!delivery) return NULL;
+
   if (delivery->work)
     return delivery->work_next;
   else
@@ -888,6 +891,7 @@ pn_delivery_tag_t pn_delivery_tag(pn_del
 
 pn_delivery_t *pn_current(pn_link_t *link)
 {
+  if (!link) return NULL;
   return link->current;
 }
 
@@ -907,7 +911,7 @@ void pn_advance_receiver(pn_link_t *link
 
 bool pn_advance(pn_link_t *link)
 {
-  if (link->current) {
+  if (link && link->current) {
     pn_delivery_t *prev = link->current;
     if (link->endpoint.type == SENDER) {
       pn_advance_sender(link);
@@ -1568,6 +1572,7 @@ time_t pn_tick(pn_transport_t *engine, t
 
 pn_link_t *pn_link(pn_delivery_t *delivery)
 {
+  if (!delivery) return NULL;
   return delivery->link;
 }
 
@@ -1600,6 +1605,8 @@ void pn_disposition(pn_delivery_t *deliv
 
 bool pn_writable(pn_delivery_t *delivery)
 {
+  if (!delivery) return false;
+
   pn_link_t *link = delivery->link;
   return pn_is_sender(link) && pn_is_current(delivery) && link->credit > 0;
 }



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to