Author: rhs
Date: Sat Mar 24 12:08:21 2012
New Revision: 1304777
URL: http://svn.apache.org/viewvc?rev=1304777&view=rev
Log:
modified driver interface to avoid callbacks; expanded swig interface
Modified:
qpid/proton/trunk/proton-c/cproton.i
qpid/proton/trunk/proton-c/include/proton/driver.h
qpid/proton/trunk/proton-c/src/driver.c
qpid/proton/trunk/proton-c/src/engine/engine.c
Modified: qpid/proton/trunk/proton-c/cproton.i
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/cproton.i?rev=1304777&r1=1304776&r2=1304777&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/cproton.i (original)
+++ qpid/proton/trunk/proton-c/cproton.i Sat Mar 24 12:08:21 2012
@@ -2,6 +2,9 @@
%{
/* Includes the header in the wrapper code */
#include <proton/engine.h>
+#include <proton/message.h>
+#include <proton/sasl.h>
+#include <proton/driver.h>
%}
typedef unsigned int size_t;
@@ -65,5 +68,61 @@ ssize_t pn_send(pn_link_t *transport, ch
%}
%ignore pn_delivery_tag;
+%rename(pn_message_data) wrap_pn_message_data;
+%inline %{
+ int wrap_pn_message_data(char *STRING, size_t LENGTH, char *OUTPUT, size_t *OUTPUT_SIZE) {
+ ssize_t sz = pn_message_data(OUTPUT, *OUTPUT_SIZE, STRING, LENGTH);
+ if (sz >= 0) {
+ *OUTPUT_SIZE = sz;
+ return 0;
+ } else {
+ *OUTPUT_SIZE = 0;
+ return sz;
+ }
+ }
+%}
+%ignore pn_message_data;
+
+%rename(pn_acceptor) wrap_pn_acceptor;
+%inline {
+ pn_selectable_t *wrap_pn_acceptor(pn_driver_t *driver, const char *host, const char *port, PyObject *context) {
+ Py_XINCREF(context);
+ return pn_acceptor(driver, host, port, NULL, context);
+ }
+}
+%ignore pn_acceptor;
+
+%rename(pn_connector) wrap_pn_connector;
+%inline {
+ pn_selectable_t *wrap_pn_connector(pn_driver_t *driver, const char *host, const char *port, PyObject *context) {
+ Py_XINCREF(context);
+ return pn_connector(driver, host, port, NULL, context);
+ }
+}
+%ignore pn_connector;
+
+%rename(pn_selectable_context) wrap_pn_selectable_context;
+%inline {
+ PyObject *wrap_pn_selectable_context(pn_selectable_t *sel) {
+ PyObject *result = pn_selectable_context(sel);
+ Py_XINCREF(result);
+ return result;
+ }
+}
+%ignore pn_selectable_context;
+
+%rename(pn_selectable_destroy) wrap_pn_selectable_destroy;
+%inline %{
+ void wrap_pn_selectable_destroy(pn_selectable_t *selectable) {
+ PyObject *obj = pn_selectable_context(selectable);
+ Py_XDECREF(obj);
+ pn_selectable_destroy(selectable);
+ }
+%}
+%ignore pn_selectable_destroy;
+
/* Parse the header file to generate wrappers */
%include "proton/engine.h"
+%include "proton/message.h"
+%include "proton/sasl.h"
+%include "proton/driver.h"
Modified: qpid/proton/trunk/proton-c/include/proton/driver.h
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/include/proton/driver.h?rev=1304777&r1=1304776&r2=1304777&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/include/proton/driver.h (original)
+++ qpid/proton/trunk/proton-c/include/proton/driver.h Sat Mar 24 12:08:21 2012
@@ -35,16 +35,22 @@ typedef void (pn_callback_t)(pn_selectab
pn_driver_t *pn_driver(void);
void pn_driver_trace(pn_driver_t *d, pn_trace_t trace);
+void pn_driver_wakeup(pn_driver_t *d);
+void pn_driver_wait(pn_driver_t *d);
+pn_selectable_t *pn_driver_next(pn_driver_t *d);
void pn_driver_run(pn_driver_t *d);
void pn_driver_stop(pn_driver_t *d);
void pn_driver_destroy(pn_driver_t *d);
-pn_selectable_t *pn_acceptor(pn_driver_t *driver, char *host, char *port, pn_callback_t *callback, void* context);
-pn_selectable_t *pn_connector(pn_driver_t *driver, char *host, char *port, pn_callback_t *callback, void* context);
+pn_selectable_t *pn_acceptor(pn_driver_t *driver, const char *host, const char *port,
+ pn_callback_t *callback, void* context);
+pn_selectable_t *pn_connector(pn_driver_t *driver, const char *host, const char *port,
+ pn_callback_t *callback, void* context);
void pn_selectable_trace(pn_selectable_t *sel, pn_trace_t trace);
pn_sasl_t *pn_selectable_sasl(pn_selectable_t *sel);
pn_connection_t *pn_selectable_connection(pn_selectable_t *sel);
void *pn_selectable_context(pn_selectable_t *sel);
+void pn_selectable_close(pn_selectable_t *sel);
void pn_selectable_destroy(pn_selectable_t *sel);
#endif /* driver.h */
Modified: qpid/proton/trunk/proton-c/src/driver.c
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/driver.c?rev=1304777&r1=1304776&r2=1304777&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/driver.c (original)
+++ qpid/proton/trunk/proton-c/src/driver.c Sat Mar 24 12:08:21 2012
@@ -40,7 +40,10 @@
struct pn_driver_t {
pn_selectable_t *head;
pn_selectable_t *tail;
+ pn_selectable_t *current;
size_t size;
+ size_t capacity;
+ struct pollfd *fds;
int ctrl[2]; //pipe for updating selectable status
bool stopping;
pn_trace_t trace;
@@ -52,6 +55,7 @@ struct pn_selectable_t {
pn_driver_t *driver;
pn_selectable_t *next;
pn_selectable_t *prev;
+ int idx;
int fd;
int status;
time_t wakeup;
@@ -82,6 +86,10 @@ static void pn_driver_add(pn_driver_t *d
static void pn_driver_remove(pn_driver_t *d, pn_selectable_t *s)
{
+ if (s == d->current) {
+ d->current = s->next;
+ }
+
LL_REMOVE(d->head, d->tail, s);
s->driver = NULL;
d->size--;
@@ -122,6 +130,7 @@ pn_selectable_t *pn_selectable(pn_driver
s->process_output = pn_selectable_write_sasl_header;
s->callback = callback;
s->context = context;
+ s->idx = 0;
pn_selectable_trace(s, driver->trace);
@@ -138,17 +147,17 @@ void pn_selectable_trace(pn_selectable_t
pn_sasl_t *pn_selectable_sasl(pn_selectable_t *sel)
{
- return sel->sasl;
+ return sel ? sel->sasl : NULL;
}
pn_connection_t *pn_selectable_connection(pn_selectable_t *sel)
{
- return sel->connection;
+ return sel ? sel->connection : NULL;
}
void *pn_selectable_context(pn_selectable_t *sel)
{
- return sel->context;
+ return sel ? sel->context : NULL;
}
void pn_selectable_destroy(pn_selectable_t *sel)
@@ -161,9 +170,11 @@ void pn_selectable_destroy(pn_selectable
free(sel);
}
-static void pn_selectable_close(pn_selectable_t *sel)
+void pn_selectable_close(pn_selectable_t *sel)
{
// XXX: should probably signal engine and callback here
+ if (!sel) return;
+
sel->status = 0;
if (close(sel->fd) == -1)
perror("close");
@@ -349,12 +360,21 @@ pn_driver_t *pn_driver()
if (!d) return NULL;
d->head = NULL;
d->tail = NULL;
+ d->current = NULL;
d->size = 0;
+ d->capacity = 0;
+ d->fds = NULL;
d->ctrl[0] = 0;
d->ctrl[1] = 0;
d->stopping = false;
d->trace = ((pn_env_bool("PN_TRACE_RAW") ? PN_TRACE_RAW : PN_TRACE_OFF) |
(pn_env_bool("PN_TRACE_FRM") ? PN_TRACE_FRM : PN_TRACE_OFF));
+
+ // XXX
+ if (pipe(d->ctrl)) {
+ perror("Can't create control pipe");
+ }
+
return d;
}
@@ -365,78 +385,105 @@ void pn_driver_trace(pn_driver_t *d, pn_
void pn_driver_destroy(pn_driver_t *d)
{
+ if (!d) return;
+
+ close(d->ctrl[0]);
+ close(d->ctrl[1]);
while (d->head)
pn_selectable_destroy(d->head);
+ free(d->fds);
free(d);
}
-void pn_driver_run(pn_driver_t *d)
+void pn_driver_wakeup(pn_driver_t *d)
{
- int i, nfds = 0;
- struct pollfd *fds = NULL;
+ write(d->ctrl[1], "x", 1);
+}
- if (pipe(d->ctrl)) {
- perror("Can't create control pipe");
+static void pn_driver_rebuild(pn_driver_t *d)
+{
+ if (d->size == 0) return;
+ while (d->capacity < d->size + 1) {
+ d->capacity = d->capacity ? 2*d->capacity : 16;
+ d->fds = realloc(d->fds, d->capacity*sizeof(struct pollfd));
}
- while (!d->stopping)
+
+ d->fds[0].fd = d->ctrl[0];
+ d->fds[0].events = POLLIN;
+ d->fds[0].revents = 0;
+
+ pn_selectable_t *s = d->head;
+ for (int i = 1; i <= d->size; i++)
{
- int n = d->size;
- if (n == 0) break;
- if (n > nfds) {
- fds = realloc(fds, (n+1)*sizeof(struct pollfd));
- nfds = n;
- }
+ d->fds[i].fd = s->fd;
+ d->fds[i].events = (s->status & PN_SEL_RD ? POLLIN : 0) |
+ (s->status & PN_SEL_WR ? POLLOUT : 0);
+ d->fds[i].revents = 0;
+ s->idx = i;
+ s = s->next;
+ }
- pn_selectable_t *s = d->head;
- for (i = 0; i < n; i++)
- {
- fds[i].fd = s->fd;
- fds[i].events = (s->status & PN_SEL_RD ? POLLIN : 0) |
- (s->status & PN_SEL_WR ? POLLOUT : 0);
- fds[i].revents = 0;
- // XXX
- s->tick(s, 0);
- s = s->next;
- }
- fds[n].fd = d->ctrl[0];
- fds[n].events = POLLIN;
- fds[n].revents = 0;
-
- DIE_IFE(poll(fds, n+1, -1));
-
- s = d->head;
- for (i = 0; i < n; i++)
- {
- // XXX: this is necessary because read or write might close the
- // selectable, should probably fix this by making them mark it
- // as closed and closing from this loop
- pn_selectable_t *next = s->next;
- if (fds[i].revents & POLLIN)
- s->read(s);
- if (fds[i].revents & POLLOUT)
- s->write(s);
- s = next;
- }
+}
- if (fds[n].revents & POLLIN) {
- //clear the pipe
- char buffer[512];
- while (read(d->ctrl[0], buffer, 512) == 512);
- }
+void pn_driver_wait(pn_driver_t *d) {
+ pn_driver_rebuild(d);
+
+ pn_selectable_t *s = d->head;
+ while (s) {
+ // XXX
+ s->tick(s, 0);
+ s = s->next;
}
- close(d->ctrl[0]);
- close(d->ctrl[1]);
- free(fds);
+ DIE_IFE(poll(d->fds, d->size+1, -1));
+
+ if (d->fds[0].revents & POLLIN) {
+ //clear the pipe
+ char buffer[512];
+ while (read(d->ctrl[0], buffer, 512) == 512);
+ }
+
+ d->current = d->head;
+}
+
+void pn_selectable_work(pn_selectable_t *s) {
+ // XXX: this is necessary because read or write might close the
+ // selectable, should probably fix this by making them mark it
+ // and keeping close/destroy/etc entirely outside the driver
+ int idx = s->idx;
+ pn_driver_t *d = s->driver;
+ if (d->fds[idx].revents & POLLIN)
+ s->read(s);
+ if (d->fds[idx].revents & POLLOUT)
+ s->write(s);
+}
+
+pn_selectable_t *pn_driver_next(pn_driver_t *d) {
+ pn_selectable_t *s = d->current;
+ if (s) {
+ d->current = s->next;
+ pn_selectable_work(s);
+ }
+ return s;
+}
+
+void pn_driver_run(pn_driver_t *d)
+{
+ while (!d->stopping)
+ {
+ pn_driver_wait(d);
+ while (pn_driver_next(d));
+ }
}
void pn_driver_stop(pn_driver_t *d)
{
d->stopping = true;
- write(d->ctrl[1], "x", 1);
+ pn_driver_wakeup(d);
}
-pn_selectable_t *pn_connector(pn_driver_t *driver, char *host, char *port, pn_callback_t *callback, void *context)
+pn_selectable_t *pn_connector(pn_driver_t *driver, const char *host, const char *port,
+ pn_callback_t *callback, void *context)
{
struct addrinfo *addr;
int code = getaddrinfo(host, port, NULL, &addr);
@@ -489,7 +536,8 @@ static void do_accept(pn_selectable_t *s
static void do_nothing(pn_selectable_t *s) {}
static time_t never_tick(pn_selectable_t *s, time_t now) { return 0; }
-pn_selectable_t *pn_acceptor(pn_driver_t *driver, char *host, char *port, pn_callback_t *callback, void* context)
+pn_selectable_t *pn_acceptor(pn_driver_t *driver, const char *host, const char *port,
+ pn_callback_t *callback, void* context)
{
struct addrinfo *addr;
int code = getaddrinfo(host, port, NULL, &addr);
Modified: qpid/proton/trunk/proton-c/src/engine/engine.c
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/engine/engine.c?rev=1304777&r1=1304776&r2=1304777&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/engine/engine.c (original)
+++ qpid/proton/trunk/proton-c/src/engine/engine.c Sat Mar 24 12:08:21 2012
@@ -443,11 +443,14 @@ void pn_connection_set_hostname(pn_conne
pn_delivery_t *pn_work_head(pn_connection_t *connection)
{
+ if (!connection) return NULL;
return connection->work_head;
}
pn_delivery_t *pn_work_next(pn_delivery_t *delivery)
{
+ if (!delivery) return NULL;
+
if (delivery->work)
return delivery->work_next;
else
@@ -888,6 +891,7 @@ pn_delivery_tag_t pn_delivery_tag(pn_del
pn_delivery_t *pn_current(pn_link_t *link)
{
+ if (!link) return NULL;
return link->current;
}
@@ -907,7 +911,7 @@ void pn_advance_receiver(pn_link_t *link
bool pn_advance(pn_link_t *link)
{
- if (link->current) {
+ if (link && link->current) {
pn_delivery_t *prev = link->current;
if (link->endpoint.type == SENDER) {
pn_advance_sender(link);
@@ -1568,6 +1572,7 @@ time_t pn_tick(pn_transport_t *engine, t
pn_link_t *pn_link(pn_delivery_t *delivery)
{
+ if (!delivery) return NULL;
return delivery->link;
}
@@ -1600,6 +1605,8 @@ void pn_disposition(pn_delivery_t *deliv
bool pn_writable(pn_delivery_t *delivery)
{
+ if (!delivery) return false;
+
pn_link_t *link = delivery->link;
return pn_is_sender(link) && pn_is_current(delivery) && link->credit > 0;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]