Author: rhs
Date: Mon Mar 26 18:36:10 2012
New Revision: 1305486

URL: http://svn.apache.org/viewvc?rev=1305486&view=rev
Log:
updated driver interface

Modified:
    qpid/proton/trunk/proton-c/cproton.i
    qpid/proton/trunk/proton-c/include/proton/driver.h
    qpid/proton/trunk/proton-c/src/driver.c
    qpid/proton/trunk/proton-c/src/engine/engine.c
    qpid/proton/trunk/proton-c/src/proton.c

Modified: qpid/proton/trunk/proton-c/cproton.i
URL: 
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/cproton.i?rev=1305486&r1=1305485&r2=1305486&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/cproton.i (original)
+++ qpid/proton/trunk/proton-c/cproton.i Mon Mar 26 18:36:10 2012
@@ -83,43 +83,81 @@ ssize_t pn_send(pn_link_t *transport, ch
 %}
 %ignore pn_message_data;
 
-%rename(pn_acceptor) wrap_pn_acceptor;
+%rename(pn_listener) wrap_pn_listener;
 %inline {
-  pn_selectable_t *wrap_pn_acceptor(pn_driver_t *driver, const char *host, 
const char *port, PyObject *context) {
+  pn_listener_t *wrap_pn_listener(pn_driver_t *driver, const char *host, const 
char *port, PyObject *context) {
     Py_XINCREF(context);
-    return pn_acceptor(driver, host, port, NULL, context);
+    return pn_listener(driver, host, port, context);
   }
 }
-%ignore pn_acceptor;
+%ignore pn_listener;
+
+%rename(pn_listener_context) wrap_pn_listener_context;
+%inline {
+  PyObject *wrap_pn_listener_context(pn_listener_t *l) {
+    PyObject *result = pn_listener_context(l);
+    if (result) {
+      Py_INCREF(result);
+      return result;
+    } else {
+      Py_RETURN_NONE;
+    }
+  }
+}
+%ignore pn_listener_context;
+
+%rename(pn_listener_destroy) wrap_pn_listener_destroy;
+%inline %{
+  void wrap_pn_listener_destroy(pn_listener_t *l) {
+    PyObject *obj = pn_listener_context(l);
+    Py_XDECREF(obj);
+    pn_listener_destroy(l);
+  }
+%}
+%ignore pn_listener_destroy;
 
 %rename(pn_connector) wrap_pn_connector;
 %inline {
-  pn_selectable_t *wrap_pn_connector(pn_driver_t *driver, const char *host, 
const char *port, PyObject *context) {
+  pn_connector_t *wrap_pn_connector(pn_driver_t *driver, const char *host, 
const char *port, PyObject *context) {
     Py_XINCREF(context);
-    return pn_connector(driver, host, port, NULL, context);
+    return pn_connector(driver, host, port, context);
   }
 }
 %ignore pn_connector;
 
-%rename(pn_selectable_context) wrap_pn_selectable_context;
+%rename(pn_connector_context) wrap_pn_connector_context;
 %inline {
-  PyObject *wrap_pn_selectable_context(pn_selectable_t *sel) {
-    PyObject *result = pn_selectable_context(sel);
-    Py_XINCREF(result);
-    return result;
+  PyObject *wrap_pn_connector_context(pn_connector_t *c) {
+    PyObject *result = pn_connector_context(c);
+    if (result) {
+      Py_INCREF(result);
+      return result;
+    } else {
+      Py_RETURN_NONE;
+    }
+  }
+}
+%ignore pn_connector_context;
+
+%rename(pn_connector_set_context) wrap_pn_connector_set_context;
+%inline {
+  void wrap_pn_connector_set_context(pn_connector_t *ctor, PyObject *context) {
+    Py_XDECREF(pn_connector_context(ctor));
+    Py_XINCREF(context);
+    pn_connector_set_context(ctor, context);
   }
 }
-%ignore pn_selectable_context;
+%ignore pn_connector_set_context;
 
-%rename(pn_selectable_destroy) wrap_pn_selectable_destroy;
+%rename(pn_connector_destroy) wrap_pn_connector_destroy;
 %inline %{
-  void wrap_pn_selectable_destroy(pn_selectable_t *selectable) {
-    PyObject *obj = pn_selectable_context(selectable);
+  void wrap_pn_connector_destroy(pn_connector_t *c) {
+    PyObject *obj = pn_connector_context(c);
     Py_XDECREF(obj);
-    pn_selectable_destroy(selectable);
+    pn_connector_destroy(c);
   }
 %}
-%ignore pn_selectable_destroy;
+%ignore pn_connector_destroy;
 
 /* Parse the header file to generate wrappers */
 %include "proton/engine.h"

Modified: qpid/proton/trunk/proton-c/include/proton/driver.h
URL: 
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/include/proton/driver.h?rev=1305486&r1=1305485&r2=1305486&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/include/proton/driver.h (original)
+++ qpid/proton/trunk/proton-c/include/proton/driver.h Mon Mar 26 18:36:10 2012
@@ -24,33 +24,38 @@
 
 #include <proton/engine.h>
 #include <proton/sasl.h>
-#include <stdlib.h>
 
 typedef struct pn_driver_t pn_driver_t;
-typedef struct pn_selectable_t pn_selectable_t;
-typedef void (pn_callback_t)(pn_selectable_t *);
-
-#define PN_SEL_RD (0x0001)
-#define PN_SEL_WR (0x0002)
+typedef struct pn_listener_t pn_listener_t;
+typedef struct pn_connector_t pn_connector_t;
 
 pn_driver_t *pn_driver(void);
 void pn_driver_trace(pn_driver_t *d, pn_trace_t trace);
 void pn_driver_wakeup(pn_driver_t *d);
 void pn_driver_wait(pn_driver_t *d);
-pn_selectable_t *pn_driver_next(pn_driver_t *d);
-void pn_driver_run(pn_driver_t *d);
-void pn_driver_stop(pn_driver_t *d);
+pn_connector_t *pn_driver_listen(pn_driver_t *d);
+pn_connector_t *pn_driver_process(pn_driver_t *d);
 void pn_driver_destroy(pn_driver_t *d);
 
-pn_selectable_t *pn_acceptor(pn_driver_t *driver, const char *host, const char 
*port,
-                             pn_callback_t *callback, void* context);
-pn_selectable_t *pn_connector(pn_driver_t *driver, const char *host, const 
char *port,
-                              pn_callback_t *callback, void* context);
-void pn_selectable_trace(pn_selectable_t *sel, pn_trace_t trace);
-pn_sasl_t *pn_selectable_sasl(pn_selectable_t *sel);
-pn_connection_t *pn_selectable_connection(pn_selectable_t *sel);
-void *pn_selectable_context(pn_selectable_t *sel);
-void pn_selectable_close(pn_selectable_t *sel);
-void pn_selectable_destroy(pn_selectable_t *sel);
+pn_listener_t *pn_listener(pn_driver_t *driver, const char *host,
+                           const char *port, void* context);
+pn_listener_t *pn_listener_fd(pn_driver_t *driver, int fd, void *context);
+void pn_listener_trace(pn_listener_t *listener, pn_trace_t trace);
+void *pn_listener_context(pn_listener_t *listener);
+void pn_listener_close(pn_listener_t *listener);
+void pn_listener_destroy(pn_listener_t *listener);
+
+pn_connector_t *pn_connector(pn_driver_t *driver, const char *host,
+                             const char *port, void* context);
+pn_connector_t *pn_connector_fd(pn_driver_t *driver, int fd, void *context);
+void pn_connector_trace(pn_connector_t *ctor, pn_trace_t trace);
+pn_listener_t *pn_connector_listener(pn_connector_t *ctor);
+pn_sasl_t *pn_connector_sasl(pn_connector_t *ctor);
+pn_connection_t *pn_connector_connection(pn_connector_t *ctor);
+void *pn_connector_context(pn_connector_t *ctor);
+void pn_connector_set_context(pn_connector_t *ctor, void *context);
+void pn_connector_close(pn_connector_t *ctor);
+bool pn_connector_closed(pn_connector_t *ctor);
+void pn_connector_destroy(pn_connector_t *ctor);
 
 #endif /* driver.h */

Modified: qpid/proton/trunk/proton-c/src/driver.c
URL: 
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/driver.c?rev=1305486&r1=1305485&r2=1305486&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/driver.c (original)
+++ qpid/proton/trunk/proton-c/src/driver.c Mon Mar 26 18:36:10 2012
@@ -37,11 +37,18 @@
 
 /* Decls */
 
+#define PN_SEL_RD (0x0001)
+#define PN_SEL_WR (0x0002)
+
 struct pn_driver_t {
-  pn_selectable_t *head;
-  pn_selectable_t *tail;
-  pn_selectable_t *current;
-  size_t size;
+  pn_listener_t *listener_head;
+  pn_listener_t *listener_tail;
+  pn_listener_t *listener_next;
+  pn_connector_t *connector_head;
+  pn_connector_t *connector_tail;
+  pn_connector_t *connector_next;
+  size_t listener_count;
+  size_t connector_count;
   size_t capacity;
   struct pollfd *fds;
   int ctrl[2]; //pipe for updating selectable status
@@ -49,19 +56,29 @@ struct pn_driver_t {
   pn_trace_t trace;
 };
 
+struct pn_listener_t {
+  pn_driver_t *driver;
+  pn_listener_t *next;
+  pn_listener_t *prev;
+  int idx;
+  int fd;
+  void *context;
+};
+
 #define IO_BUF_SIZE (4*1024)
 
-struct pn_selectable_t {
+struct pn_connector_t {
   pn_driver_t *driver;
-  pn_selectable_t *next;
-  pn_selectable_t *prev;
+  pn_connector_t *next;
+  pn_connector_t *prev;
   int idx;
   int fd;
   int status;
+  bool closed;
   time_t wakeup;
-  pn_callback_t *read;
-  pn_callback_t *write;
-  time_t (*tick)(pn_selectable_t *sel, time_t now);
+  void (*read)(pn_connector_t *);
+  void (*write) (pn_connector_t *);
+  time_t (*tick)(pn_connector_t *sel, time_t now);
   size_t input_size;
   char input[IO_BUF_SIZE];
   size_t output_size;
@@ -69,160 +86,323 @@ struct pn_selectable_t {
   pn_sasl_t *sasl;
   pn_connection_t *connection;
   pn_transport_t *transport;
-  ssize_t (*process_input)(pn_selectable_t *sel);
-  ssize_t (*process_output)(pn_selectable_t *sel);
-  pn_callback_t *callback;
+  ssize_t (*process_input)(pn_connector_t *);
+  ssize_t (*process_output)(pn_connector_t *);
+  pn_listener_t *listener;
   void *context;
 };
 
 /* Impls */
 
-static void pn_driver_add(pn_driver_t *d, pn_selectable_t *s)
+// listener
+
+static void pn_driver_add_listener(pn_driver_t *d, pn_listener_t *l)
 {
-  LL_ADD(d->head, d->tail, s);
-  s->driver = d;
-  d->size++;
+  LL_ADD(d->listener_head, d->listener_tail, l);
+  l->driver = d;
+  d->listener_count++;
 }
 
-static void pn_driver_remove(pn_driver_t *d, pn_selectable_t *s)
+static void pn_driver_remove_listener(pn_driver_t *d, pn_listener_t *l)
 {
-  if (s == d->current) {
-    d->current = s->next;
+  if (l == d->listener_next) {
+    d->listener_next = l->next;
   }
 
-  LL_REMOVE(d->head, d->tail, s);
-  s->driver = NULL;
-  d->size--;
+  LL_REMOVE(d->listener_head, d->listener_tail, l);
+  l->driver = NULL;
+  d->listener_count--;
 }
 
-static void pn_selectable_read(pn_selectable_t *sel);
-static void pn_selectable_write(pn_selectable_t *sel);
-static time_t pn_selectable_tick(pn_selectable_t *sel, time_t now);
+pn_listener_t *pn_listener(pn_driver_t *driver, const char *host,
+                           const char *port, void* context)
+{
+  struct addrinfo *addr;
+  int code = getaddrinfo(host, port, NULL, &addr);
+  if (code) {
+    fprintf(stderr, "getaddrinfo: %s\n", gai_strerror(code));
+    return NULL;
+  }
+
+  int sock = socket(AF_INET, SOCK_STREAM, getprotobyname("tcp")->p_proto);
+  if (sock == -1)
+    return NULL;
+
+  int optval = 1;
+  if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof(optval)) == 
-1)
+    return NULL;
+
+  if (bind(sock, addr->ai_addr, addr->ai_addrlen) == -1) {
+    freeaddrinfo(addr);
+    return NULL;
+  }
+
+  freeaddrinfo(addr);
+
+  if (listen(sock, 50) == -1)
+    return NULL;
 
-static ssize_t pn_selectable_read_sasl_header(pn_selectable_t *sel);
-static ssize_t pn_selectable_read_sasl(pn_selectable_t *sel);
-static ssize_t pn_selectable_read_amqp_header(pn_selectable_t *sel);
-static ssize_t pn_selectable_read_amqp(pn_selectable_t *sel);
-static ssize_t pn_selectable_write_sasl_header(pn_selectable_t *sel);
-static ssize_t pn_selectable_write_sasl(pn_selectable_t *sel);
-static ssize_t pn_selectable_write_amqp_header(pn_selectable_t *sel);
-static ssize_t pn_selectable_write_amqp(pn_selectable_t *sel);
+  pn_listener_t *l = pn_listener_fd(driver, sock, context);
 
-pn_selectable_t *pn_selectable(pn_driver_t *driver, int fd, pn_callback_t 
*callback, void *context)
+  printf("Listening on %s:%s\n", host, port);
+  return l;
+}
+
+pn_listener_t *pn_listener_fd(pn_driver_t *driver, int fd, void *context)
 {
-  pn_selectable_t *s = malloc(sizeof(pn_selectable_t));
-  if (!s) return NULL;
-  s->driver = driver;
-  s->next = NULL;
-  s->prev = NULL;
-  s->fd = fd;
-  s->status = 0;
-  s->wakeup = 0;
-  s->read = pn_selectable_read;
-  s->write = pn_selectable_write;
-  s->tick = pn_selectable_tick;
-  s->input_size = 0;
-  s->output_size = 0;
-  s->sasl = pn_sasl();
-  s->connection = pn_connection();
-  s->transport = pn_transport(s->connection);
-  s->process_input = pn_selectable_read_sasl_header;
-  s->process_output = pn_selectable_write_sasl_header;
-  s->callback = callback;
-  s->context = context;
-  s->idx = 0;
+  pn_listener_t *l = malloc(sizeof(pn_listener_t));
+  if (!l) return NULL;
+  l->driver = driver;
+  l->next = NULL;
+  l->prev = NULL;
+  l->fd = fd;
+  l->context = context;
+  l->idx = 0;
 
-  pn_selectable_trace(s, driver->trace);
+  pn_driver_add_listener(driver, l);
+  return l;
+}
 
-  pn_driver_add(driver, s);
+void pn_listener_trace(pn_listener_t *l, pn_trace_t trace) {
+  // XXX
+}
 
-  return s;
+void *pn_listener_context(pn_listener_t *l) {
+  return l ? l->context : NULL;
 }
 
-void pn_selectable_trace(pn_selectable_t *sel, pn_trace_t trace)
+static pn_connector_t *pn_listener_accept(pn_listener_t *l)
 {
-  pn_sasl_trace(sel->sasl, trace);
-  pn_trace(sel->transport, trace);
+  struct sockaddr_in addr = {0};
+  addr.sin_family = AF_INET;
+  socklen_t addrlen = sizeof(addr);
+  int sock = accept(l->fd, (struct sockaddr *) &addr, &addrlen);
+  if (sock == -1) {
+    perror("accept");
+    return NULL;
+  } else {
+    char host[1024], serv[64];
+    int code;
+    if ((code = getnameinfo((struct sockaddr *) &addr, addrlen, host, 1024, 
serv, 64, 0))) {
+      fprintf(stderr, "getnameinfo: %s\n", gai_strerror(code));
+      if (close(sock) == -1)
+        perror("close");
+      return NULL;
+    } else {
+      printf("accepted from %s:%s\n", host, serv);
+      pn_connector_t *c = pn_connector_fd(l->driver, sock, NULL);
+      c->listener = l;
+      return c;
+    }
+  }
 }
 
-pn_sasl_t *pn_selectable_sasl(pn_selectable_t *sel)
+
+void pn_listener_close(pn_listener_t *l)
 {
-  return sel ? sel->sasl : NULL;
+  if (!l) return;
+
+  if (close(l->fd) == -1)
+    perror("close");
 }
 
-pn_connection_t *pn_selectable_connection(pn_selectable_t *sel)
+void pn_listener_destroy(pn_listener_t *l)
 {
-  return sel ? sel->connection : NULL;
+  if (!l) return;
+
+  if (l->driver) pn_driver_remove_listener(l->driver, l);
+  free(l);
+}
+
+// connector
+
+static void pn_driver_add_connector(pn_driver_t *d, pn_connector_t *c)
+{
+  LL_ADD(d->connector_head, d->connector_tail, c);
+  c->driver = d;
+  d->connector_count++;
 }
 
-void *pn_selectable_context(pn_selectable_t *sel)
+static void pn_driver_remove_connector(pn_driver_t *d, pn_connector_t *c)
 {
-  return sel ? sel->context : NULL;
+  if (c == d->connector_next) {
+    d->connector_next = c->next;
+  }
+
+  LL_REMOVE(d->connector_head, d->connector_tail, c);
+  c->driver = NULL;
+  d->connector_count--;
 }
 
-void pn_selectable_destroy(pn_selectable_t *sel)
+pn_connector_t *pn_connector(pn_driver_t *driver, const char *host,
+                             const char *port, void *context)
 {
-  if (!sel) return;
+  struct addrinfo *addr;
+  int code = getaddrinfo(host, port, NULL, &addr);
+  if (code) {
+    fprintf(stderr, "getaddrinfo: %s\n", gai_strerror(code));
+    return NULL;
+  }
 
-  if (sel->driver) pn_driver_remove(sel->driver, sel);
-  pn_connection_destroy(sel->connection);
-  pn_sasl_destroy(sel->sasl);
-  free(sel);
+  int sock = socket(AF_INET, SOCK_STREAM, getprotobyname("tcp")->p_proto);
+  if (sock == -1)
+    return NULL;
+
+  if (connect(sock, addr->ai_addr, addr->ai_addrlen) == -1) {
+    freeaddrinfo(addr);
+    return NULL;
+  }
+
+  freeaddrinfo(addr);
+
+  pn_connector_t *c = pn_connector_fd(driver, sock, context);
+  printf("Connected to %s:%s\n", host, port);
+  return c;
+}
+
+static void pn_connector_read(pn_connector_t *ctor);
+static void pn_connector_write(pn_connector_t *ctor);
+static time_t pn_connector_tick(pn_connector_t *ctor, time_t now);
+
+static ssize_t pn_connector_read_sasl_header(pn_connector_t *ctor);
+static ssize_t pn_connector_read_sasl(pn_connector_t *ctor);
+static ssize_t pn_connector_read_amqp_header(pn_connector_t *ctor);
+static ssize_t pn_connector_read_amqp(pn_connector_t *ctor);
+static ssize_t pn_connector_write_sasl_header(pn_connector_t *ctor);
+static ssize_t pn_connector_write_sasl(pn_connector_t *ctor);
+static ssize_t pn_connector_write_amqp_header(pn_connector_t *ctor);
+static ssize_t pn_connector_write_amqp(pn_connector_t *ctor);
+
+pn_connector_t *pn_connector_fd(pn_driver_t *driver, int fd, void *context)
+{
+  pn_connector_t *c = malloc(sizeof(pn_connector_t));
+  if (!c) return NULL;
+  c->driver = driver;
+  c->next = NULL;
+  c->prev = NULL;
+  c->fd = fd;
+  c->status = PN_SEL_RD | PN_SEL_WR;
+  c->closed = false;
+  c->wakeup = 0;
+  c->read = pn_connector_read;
+  c->write = pn_connector_write;
+  c->tick = pn_connector_tick;
+  c->input_size = 0;
+  c->output_size = 0;
+  c->sasl = pn_sasl();
+  c->connection = pn_connection();
+  c->transport = pn_transport(c->connection);
+  c->process_input = pn_connector_read_sasl_header;
+  c->process_output = pn_connector_write_sasl_header;
+  c->context = context;
+  c->listener = NULL;
+  c->idx = 0;
+
+  pn_connector_trace(c, driver->trace);
+
+  pn_driver_add_connector(driver, c);
+  return c;
+}
+
+void pn_connector_trace(pn_connector_t *ctor, pn_trace_t trace)
+{
+  pn_sasl_trace(ctor->sasl, trace);
+  pn_trace(ctor->transport, trace);
+}
+
+pn_sasl_t *pn_connector_sasl(pn_connector_t *ctor)
+{
+  return ctor ? ctor->sasl : NULL;
+}
+
+pn_connection_t *pn_connector_connection(pn_connector_t *ctor)
+{
+  return ctor ? ctor->connection : NULL;
+}
+
+void *pn_connector_context(pn_connector_t *ctor)
+{
+  return ctor ? ctor->context : NULL;
 }
 
-void pn_selectable_close(pn_selectable_t *sel)
+void pn_connector_set_context(pn_connector_t *ctor, void *context)
+{
+  if (!ctor) return;
+  ctor->context = context;
+}
+
+pn_listener_t *pn_connector_listener(pn_connector_t *ctor)
+{
+  return ctor ? ctor->listener : NULL;
+}
+
+void pn_connector_close(pn_connector_t *ctor)
 {
   // XXX: should probably signal engine and callback here
-  if (!sel) return;
+  if (!ctor) return;
 
-  sel->status = 0;
-  if (close(sel->fd) == -1)
+  ctor->status = 0;
+  if (close(ctor->fd) == -1)
     perror("close");
+  ctor->closed = true;
+}
+
+bool pn_connector_closed(pn_connector_t *ctor)
+{
+  return ctor ? ctor->closed : true;
+}
+
+void pn_connector_destroy(pn_connector_t *ctor)
+{
+  if (!ctor) return;
+
+  if (ctor->driver) pn_driver_remove_connector(ctor->driver, ctor);
+  pn_connection_destroy(ctor->connection);
+  pn_sasl_destroy(ctor->sasl);
+  free(ctor);
 }
 
-static void pn_selectable_consume(pn_selectable_t *sel, int n)
+static void pn_connector_consume(pn_connector_t *ctor, int n)
 {
-  sel->input_size -= n;
-  memmove(sel->input, sel->input + n, sel->input_size);
+  ctor->input_size -= n;
+  memmove(ctor->input, ctor->input + n, ctor->input_size);
 }
 
-static void pn_selectable_read(pn_selectable_t *sel)
+static void pn_connector_read(pn_connector_t *ctor)
 {
-  ssize_t n = recv(sel->fd, sel->input + sel->input_size, IO_BUF_SIZE - 
sel->input_size, 0);
+  ssize_t n = recv(ctor->fd, ctor->input + ctor->input_size, IO_BUF_SIZE - 
ctor->input_size, 0);
 
   if (n <= 0) {
     printf("disconnected: %zi\n", n);
-    pn_selectable_close(sel);
-    pn_selectable_destroy(sel);
+    pn_connector_close(ctor);
     return;
   } else {
-    sel->input_size += n;
+    ctor->input_size += n;
   }
 
-  while (sel->input_size > 0) {
-    n = sel->process_input(sel);
+  while (ctor->input_size > 0) {
+    n = ctor->process_input(ctor);
     if (n > 0) {
-      pn_selectable_consume(sel, n);
+      pn_connector_consume(ctor, n);
     } else if (n == 0) {
       return;
     } else {
       if (n != PN_EOS) printf("error in process_input: %zi\n", n);
-      pn_selectable_close(sel);
-      pn_selectable_destroy(sel);
+      pn_connector_close(ctor);
       return;
     }
   }
 }
 
-static ssize_t pn_selectable_read_sasl_header(pn_selectable_t *sel)
+static ssize_t pn_connector_read_sasl_header(pn_connector_t *ctor)
 {
-  if (sel->input_size >= 8) {
-    if (memcmp(sel->input, "AMQP\x03\x01\x00\x00", 8)) {
+  if (ctor->input_size >= 8) {
+    if (memcmp(ctor->input, "AMQP\x03\x01\x00\x00", 8)) {
       fprintf(stderr, "sasl header missmatch\n");
       return PN_ERR;
     } else {
       fprintf(stderr, "    <- AMQP SASL 1.0\n");
-      sel->process_input = pn_selectable_read_sasl;
+      ctor->process_input = pn_connector_read_sasl;
       return 8;
     }
   }
@@ -230,27 +410,29 @@ static ssize_t pn_selectable_read_sasl_h
   return 0;
 }
 
-static ssize_t pn_selectable_read_sasl(pn_selectable_t *sel)
+static ssize_t pn_connector_read_sasl(pn_connector_t *ctor)
 {
-  pn_sasl_t *sasl = sel->sasl;
-  ssize_t n = pn_sasl_input(sasl, sel->input, sel->input_size);
+  pn_sasl_t *sasl = ctor->sasl;
+  ssize_t n = pn_sasl_input(sasl, ctor->input, ctor->input_size);
   if (n == PN_EOS) {
-    sel->process_input = pn_selectable_read_amqp_header;
-    return sel->process_input(sel);
+    ctor->process_input = pn_connector_read_amqp_header;
+    return ctor->process_input(ctor);
   } else {
     return n;
   }
 }
 
-static ssize_t pn_selectable_read_amqp_header(pn_selectable_t *sel)
+static ssize_t pn_connector_read_amqp_header(pn_connector_t *ctor)
 {
-  if (sel->input_size >= 8) {
-    if (memcmp(sel->input, "AMQP\x00\x01\x00\x00", 8)) {
-      fprintf(stderr, "amqp header missmatch\n");
+  if (ctor->input_size >= 8) {
+    if (memcmp(ctor->input, "AMQP\x00\x01\x00\x00", 8)) {
+      fprintf(stderr, "amqp header missmatch: ");
+      pn_fprint_data(stderr, ctor->input, ctor->input_size);
+      fprintf(stderr, "\n");
       return PN_ERR;
     } else {
       fprintf(stderr, "    <- AMQP 1.0\n");
-      sel->process_input = pn_selectable_read_amqp;
+      ctor->process_input = pn_connector_read_amqp;
       return 8;
     }
   }
@@ -258,110 +440,123 @@ static ssize_t pn_selectable_read_amqp_h
   return 0;
 }
 
-static ssize_t pn_selectable_read_amqp(pn_selectable_t *sel)
+static ssize_t pn_connector_read_amqp(pn_connector_t *ctor)
 {
-  pn_transport_t *transport = sel->transport;
-  return pn_input(transport, sel->input, sel->input_size);
+  pn_transport_t *transport = ctor->transport;
+  return pn_input(transport, ctor->input, ctor->input_size);
 }
 
-static char *pn_selectable_output(pn_selectable_t *sel)
+static char *pn_connector_output(pn_connector_t *ctor)
 {
-  return sel->output + sel->output_size;
+  return ctor->output + ctor->output_size;
 }
 
-static size_t pn_selectable_available(pn_selectable_t *sel)
+static size_t pn_connector_available(pn_connector_t *ctor)
 {
-  return IO_BUF_SIZE - sel->output_size;
+  return IO_BUF_SIZE - ctor->output_size;
 }
 
-static void pn_selectable_write(pn_selectable_t *sel)
+static void pn_connector_write(pn_connector_t *ctor)
 {
-  while (pn_selectable_available(sel) > 0) {
-    ssize_t n = sel->process_output(sel);
+  while (pn_connector_available(ctor) > 0) {
+    ssize_t n = ctor->process_output(ctor);
     if (n > 0) {
-      sel->output_size += n;
+      ctor->output_size += n;
     } else if (n == 0) {
       break;
     } else {
       if (n != PN_EOS) fprintf(stderr, "error in process_output: %zi", n);
-      pn_selectable_close(sel);
-      pn_selectable_destroy(sel);
+      pn_connector_close(ctor);
       return;
     }
   }
 
-  if (sel->output_size > 0) {
-    ssize_t n = send(sel->fd, sel->output, sel->output_size, 0);
+  if (ctor->output_size > 0) {
+    ssize_t n = send(ctor->fd, ctor->output, ctor->output_size, 0);
     if (n < 0) {
       // XXX
       perror("send");
-      pn_selectable_close(sel);
-      pn_selectable_destroy(sel);
+      pn_connector_close(ctor);
       return;
     } else {
-      sel->output_size -= n;
-      memmove(sel->output, sel->output + n, sel->output_size);
+      ctor->output_size -= n;
+      memmove(ctor->output, ctor->output + n, ctor->output_size);
     }
 
-    if (sel->output_size)
-      sel->status |= PN_SEL_WR;
+    if (ctor->output_size)
+      ctor->status |= PN_SEL_WR;
     else
-      sel->status &= ~PN_SEL_WR;
+      ctor->status &= ~PN_SEL_WR;
   }
 }
 
-static ssize_t pn_selectable_write_sasl_header(pn_selectable_t *sel)
+static ssize_t pn_connector_write_sasl_header(pn_connector_t *ctor)
 {
   fprintf(stderr, "    -> AMQP SASL 1.0\n");
-  memmove(pn_selectable_output(sel), "AMQP\x03\x01\x00\x00", 8);
-  sel->process_output = pn_selectable_write_sasl;
+  memmove(pn_connector_output(ctor), "AMQP\x03\x01\x00\x00", 8);
+  ctor->process_output = pn_connector_write_sasl;
   return 8;
 }
 
-static ssize_t pn_selectable_write_sasl(pn_selectable_t *sel)
+static ssize_t pn_connector_write_sasl(pn_connector_t *ctor)
 {
-  pn_sasl_t *sasl = sel->sasl;
-  ssize_t n = pn_sasl_output(sasl, pn_selectable_output(sel), 
pn_selectable_available(sel));
+  pn_sasl_t *sasl = ctor->sasl;
+  ssize_t n = pn_sasl_output(sasl, pn_connector_output(ctor), 
pn_connector_available(ctor));
   if (n == PN_EOS) {
-    sel->process_output = pn_selectable_write_amqp_header;
-    return sel->process_output(sel);
+    ctor->process_output = pn_connector_write_amqp_header;
+    return ctor->process_output(ctor);
   } else {
     return n;
   }
 }
 
-static ssize_t pn_selectable_write_amqp_header(pn_selectable_t *sel)
+static ssize_t pn_connector_write_amqp_header(pn_connector_t *ctor)
 {
   fprintf(stderr, "    -> AMQP 1.0\n");
-  memmove(pn_selectable_output(sel), "AMQP\x00\x01\x00\x00", 8);
-  sel->process_output = pn_selectable_write_amqp;
-  pn_transport_open(sel->transport);
+  memmove(pn_connector_output(ctor), "AMQP\x00\x01\x00\x00", 8);
+  ctor->process_output = pn_connector_write_amqp;
+  pn_transport_open(ctor->transport);
   return 8;
 }
 
-static ssize_t pn_selectable_write_amqp(pn_selectable_t *sel)
+static ssize_t pn_connector_write_amqp(pn_connector_t *ctor)
 {
-  pn_transport_t *transport = sel->transport;
-  return pn_output(transport, pn_selectable_output(sel), 
pn_selectable_available(sel));
+  pn_transport_t *transport = ctor->transport;
+  return pn_output(transport, pn_connector_output(ctor), 
pn_connector_available(ctor));
 }
 
-static time_t pn_selectable_tick(pn_selectable_t *sel, time_t now)
+static time_t pn_connector_tick(pn_connector_t *ctor, time_t now)
 {
   // XXX: should probably have a function pointer for this and switch it with 
different layers
-  time_t result = pn_tick(sel->transport, now);
-  if (sel->callback) sel->callback(sel);
-  pn_selectable_write(sel);
+  time_t result = pn_tick(ctor->transport, now);
+  pn_connector_write(ctor);
   return result;
 }
 
+void pn_connector_work(pn_connector_t *c) {
+  int idx = c->idx;
+  if (!idx) return;
+  pn_driver_t *d = c->driver;
+  if (d->fds[idx].revents & POLLIN)
+    c->read(c);
+  if (d->fds[idx].revents & POLLOUT)
+    c->write(c);
+}
+
+// driver
+
 pn_driver_t *pn_driver()
 {
   pn_driver_t *d = malloc(sizeof(pn_driver_t));
   if (!d) return NULL;
-  d->head = NULL;
-  d->tail = NULL;
-  d->current = NULL;
-  d->size = 0;
+  d->listener_head = NULL;
+  d->listener_tail = NULL;
+  d->listener_next = NULL;
+  d->connector_head = NULL;
+  d->connector_tail = NULL;
+  d->connector_next = NULL;
+  d->listener_count = 0;
+  d->connector_count = 0;
   d->capacity = 0;
   d->fds = NULL;
   d->ctrl[0] = 0;
@@ -389,8 +584,10 @@ void pn_driver_destroy(pn_driver_t *d)
 
   close(d->ctrl[0]);
   close(d->ctrl[1]);
-  while (d->head)
-    pn_selectable_destroy(d->head);
+  while (d->connector_head)
+    pn_connector_destroy(d->connector_head);
+  while (d->listener_head)
+    pn_listener_destroy(d->listener_head);
   free(d->fds);
   free(d);
 }
@@ -402,8 +599,9 @@ void pn_driver_wakeup(pn_driver_t *d)
 
 static void pn_driver_rebuild(pn_driver_t *d)
 {
-  if (d->size == 0) return;
-  while (d->capacity < d->size + 1) {
+  size_t size = d->listener_count + d->connector_count;
+  if (size == 0) return;
+  while (d->capacity < size + 1) {
     d->capacity = d->capacity ? 2*d->capacity : 16;
     d->fds = realloc(d->fds, d->capacity*sizeof(struct pollfd));
   }
@@ -412,30 +610,40 @@ static void pn_driver_rebuild(pn_driver_
   d->fds[0].events = POLLIN;
   d->fds[0].revents = 0;
 
-  pn_selectable_t *s = d->head;
-  for (int i = 1; i <= d->size; i++)
-  {
-    d->fds[i].fd = s->fd;
-    d->fds[i].events = (s->status & PN_SEL_RD ? POLLIN : 0) |
-      (s->status & PN_SEL_WR ? POLLOUT : 0);
-    d->fds[i].revents = 0;
-    s->idx = i;
-    s = s->next;
+  pn_listener_t *l = d->listener_head;
+  for (int i = 0; i < d->listener_count; i++) {
+    int idx = 1 + i;
+    d->fds[idx].fd = l->fd;
+    d->fds[idx].events = POLLIN;
+    d->fds[idx].revents = 0;
+    l->idx = idx;
+    l = l->next;
   }
 
+  pn_connector_t *c = d->connector_head;
+  for (int i = 0; i < d->connector_count; i++)
+  {
+    int idx = 1 + d->listener_count + i;
+    d->fds[idx].fd = c->fd;
+    d->fds[idx].events = (c->status & PN_SEL_RD ? POLLIN : 0) |
+      (c->status & PN_SEL_WR ? POLLOUT : 0);
+    d->fds[idx].revents = 0;
+    c->idx = idx;
+    c = c->next;
+  }
 }
 
 void pn_driver_wait(pn_driver_t *d) {
   pn_driver_rebuild(d);
 
-  pn_selectable_t *s = d->head;
-  while (s) {
+  pn_connector_t *c = d->connector_head;
+  while (c) {
     // XXX
-    s->tick(s, 0);
-    s = s->next;
+    c->tick(c, 0);
+    c = c->next;
   }
 
-  DIE_IFE(poll(d->fds, d->size+1, -1));
+  DIE_IFE(poll(d->fds, 1 + d->listener_count + d->connector_count, -1));
 
   if (d->fds[0].revents & POLLIN) {
     //clear the pipe
@@ -443,134 +651,31 @@ void pn_driver_wait(pn_driver_t *d) {
     while (read(d->ctrl[0], buffer, 512) == 512);
   }
 
-  d->current = d->head;
-}
-
-void pn_selectable_work(pn_selectable_t *s) {
-  // XXX: this is necessary because read or write might close the
-  // selectable, should probably fix this by making them mark it
-  // and keeping close/destroy/etc entirely outside the driver
-  int idx = s->idx;
-  pn_driver_t *d = s->driver;
-  if (d->fds[idx].revents & POLLIN)
-    s->read(s);
-  if (d->fds[idx].revents & POLLOUT)
-    s->write(s);
+  d->listener_next = d->listener_head;
+  d->connector_next = d->connector_head;
 }
 
-pn_selectable_t *pn_driver_next(pn_driver_t *d) {
-  pn_selectable_t *s = d->current;
-  if (s) {
-    d->current = s->next;
-    pn_selectable_work(s);
-  }
-  return s;
-}
-
-void pn_driver_run(pn_driver_t *d)
-{
-  while (!d->stopping)
-  {
-    pn_driver_wait(d);
-    while (pn_driver_next(d));
-  }
-}
-
-void pn_driver_stop(pn_driver_t *d)
-{
-  d->stopping = true;
-  pn_driver_wakeup(d);
-}
-
-pn_selectable_t *pn_connector(pn_driver_t *driver, const char *host, const 
char *port,
-                              pn_callback_t *callback, void *context)
-{
-  struct addrinfo *addr;
-  int code = getaddrinfo(host, port, NULL, &addr);
-  if (code) {
-    fprintf(stderr, "getaddrinfo: %s\n", gai_strerror(code));
-    return NULL;
-  }
-
-  int sock = socket(AF_INET, SOCK_STREAM, getprotobyname("tcp")->p_proto);
-  if (sock == -1)
-    return NULL;
-
-  if (connect(sock, addr->ai_addr, addr->ai_addrlen) == -1) {
-    freeaddrinfo(addr);
-    return NULL;
-  }
-
-  freeaddrinfo(addr);
-
-  pn_selectable_t *s = pn_selectable(driver, sock, callback, context);
-  s->status = PN_SEL_RD | PN_SEL_WR;
-
-  printf("Connected to %s:%s\n", host, port);
-  return s;
-}
+pn_connector_t *pn_driver_listen(pn_driver_t *d) {
+  if (!d) return NULL;
 
-static void do_accept(pn_selectable_t *s)
-{
-  struct sockaddr_in addr = {0};
-  addr.sin_family = AF_INET;
-  socklen_t addrlen = sizeof(addr);
-  int sock = accept(s->fd, (struct sockaddr *) &addr, &addrlen);
-  if (sock == -1) {
-    perror("accept");
-  } else {
-    char host[1024], serv[64];
-    int code;
-    if ((code = getnameinfo((struct sockaddr *) &addr, addrlen, host, 1024, 
serv, 64, 0))) {
-      fprintf(stderr, "getnameinfo: %s\n", gai_strerror(code));
-      if (close(sock) == -1)
-        perror("close");
-    } else {
-      printf("accepted from %s:%s\n", host, serv);
-      pn_selectable_t *a = pn_selectable(s->driver, sock, s->callback, 
s->context);
-      a->status = PN_SEL_RD | PN_SEL_WR;
+  pn_listener_t *l = d->listener_next;
+  if (l) {
+    d->listener_next = l->next;
+    if (l->idx && d->fds[l->idx].revents & POLLIN) {
+      pn_connector_t *c = pn_listener_accept(l);
+      return c;
     }
   }
+  return NULL;
 }
 
-static void do_nothing(pn_selectable_t *s) {}
-static time_t never_tick(pn_selectable_t *s, time_t now) { return 0; }
-
-pn_selectable_t *pn_acceptor(pn_driver_t *driver, const char *host, const char 
*port,
-                             pn_callback_t *callback, void* context)
-{
-  struct addrinfo *addr;
-  int code = getaddrinfo(host, port, NULL, &addr);
-  if (code) {
-    fprintf(stderr, "getaddrinfo: %s\n", gai_strerror(code));
-    return NULL;
-  }
-
-  int sock = socket(AF_INET, SOCK_STREAM, getprotobyname("tcp")->p_proto);
-  if (sock == -1)
-    return NULL;
-
-  int optval = 1;
-  if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof(optval)) == 
-1)
-    return NULL;
+pn_connector_t *pn_driver_process(pn_driver_t *d) {
+  if (!d) return NULL;
 
-  if (bind(sock, addr->ai_addr, addr->ai_addrlen) == -1) {
-    freeaddrinfo(addr);
-    return NULL;
+  pn_connector_t *c = d->connector_next;
+  if (c) {
+    d->connector_next = c->next;
+    pn_connector_work(c);
   }
-
-  freeaddrinfo(addr);
-
-  if (listen(sock, 50) == -1)
-    return NULL;
-
-  // XXX: should factor into pure selectable and separate subclass
-  pn_selectable_t *s = pn_selectable(driver, sock, callback, context);
-  s->read = do_accept;
-  s->write = do_nothing;
-  s->tick = never_tick;
-  s->status = PN_SEL_RD;
-
-  printf("Listening on %s:%s\n", host, port);
-  return s;
+  return c;
 }

Modified: qpid/proton/trunk/proton-c/src/engine/engine.c
URL: 
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/engine/engine.c?rev=1305486&r1=1305485&r2=1305486&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/engine/engine.c (original)
+++ qpid/proton/trunk/proton-c/src/engine/engine.c Mon Mar 26 18:36:10 2012
@@ -31,7 +31,9 @@
 #include <stdarg.h>
 #include <stdio.h>
 
-wchar_t *wcsdup(const wchar_t *src)
+#define wcsdup my_wcsdup
+
+wchar_t *my_wcsdup(const wchar_t *src)
 {
   if (src) {
     wchar_t *dest = malloc((wcslen(src)+1)*sizeof(wchar_t));

Modified: qpid/proton/trunk/proton-c/src/proton.c
URL: 
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/proton.c?rev=1305486&r1=1305485&r2=1305486&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/proton.c (original)
+++ qpid/proton/trunk/proton-c/src/proton.c Mon Mar 26 18:36:10 2012
@@ -81,9 +81,9 @@ struct server_context {
   int count;
 };
 
-void server_callback(pn_selectable_t *sel)
+void server_callback(pn_connector_t *ctor)
 {
-  pn_sasl_t *sasl = pn_selectable_sasl(sel);
+  pn_sasl_t *sasl = pn_connector_sasl(ctor);
 
   if (!pn_sasl_init(sasl)) {
     pn_sasl_server(sasl);
@@ -110,8 +110,8 @@ void server_callback(pn_selectable_t *se
     return;
   }
 
-  pn_connection_t *conn = pn_selectable_connection(sel);
-  struct server_context *ctx = pn_selectable_context(sel);
+  pn_connection_t *conn = pn_connector_connection(ctor);
+  struct server_context *ctx = pn_connector_context(ctor);
   char tagstr[1024];
   char msg[1024];
   char data[1024];
@@ -200,6 +200,7 @@ void server_callback(pn_selectable_t *se
 
 struct client_context {
   bool init;
+  bool done;
   int recv_count;
   int send_count;
   pn_driver_t *driver;
@@ -210,11 +211,11 @@ struct client_context {
   wchar_t address[1024];
 };
 
-void client_callback(pn_selectable_t *sel)
+void client_callback(pn_connector_t *ctor)
 {
-  struct client_context *ctx = pn_selectable_context(sel);
+  struct client_context *ctx = pn_connector_context(ctor);
 
-  pn_sasl_t *sasl = pn_selectable_sasl(sel);
+  pn_sasl_t *sasl = pn_connector_sasl(ctor);
   if (!pn_sasl_init(sasl)) {
     pn_sasl_client(sasl, ctx->mechanism, ctx->username, ctx->password);
   }
@@ -226,11 +227,11 @@ void client_callback(pn_selectable_t *se
     break;
   default:
     fprintf(stderr, "auth failed\n");
-    pn_driver_stop(ctx->driver);
+    ctx->done = true;
     return;
   }
 
-  pn_connection_t *connection = pn_selectable_connection(sel);
+  pn_connection_t *connection = pn_connector_connection(ctor);
   char tagstr[1024];
   char msg[1024];
   char data[1024];
@@ -321,7 +322,7 @@ void client_callback(pn_selectable_t *se
   }
 
   if (pn_connection_state(connection) == (PN_LOCAL_CLOSED | PN_REMOTE_CLOSED)) 
{
-    pn_driver_stop(ctx->driver);
+    ctx->done = true;
   }
 }
 
@@ -375,19 +376,43 @@ int main(int argc, char **argv)
 
   pn_driver_t *drv = pn_driver();
   if (url) {
-    struct client_context ctx = {false, 10, 10, drv};
+    struct client_context ctx = {false, false, 10, 10, drv};
     ctx.username = user;
     ctx.password = pass;
     ctx.mechanism = mechanism;
     mbstowcs(ctx.hostname, host, 1024);
     mbstowcs(ctx.address, address, 1024);
-    if (!pn_connector(drv, host, port, client_callback, &ctx)) 
pn_fatal("connector failed\n");
+    if (!pn_connector(drv, host, port, &ctx)) pn_fatal("connector failed\n");
+    while (!ctx.done) {
+      pn_driver_wait(drv);
+      pn_connector_t *c;
+      while ((c = pn_driver_process(drv))) {
+        client_callback(c);
+        if (pn_connector_closed(c)) {
+          pn_connector_destroy(c);
+        }
+      }
+    }
   } else {
     struct server_context ctx = {0};
-    if (!pn_acceptor(drv, host, port, server_callback, &ctx)) 
pn_fatal("acceptor failed\n");
+    if (!pn_listener(drv, host, port, &ctx)) pn_fatal("listener failed\n");
+    while (true) {
+      pn_driver_wait(drv);
+      pn_connector_t *c;
+
+      while ((c = pn_driver_listen(drv))) {
+        pn_connector_set_context(c, &ctx);
+      }
+
+      while ((c = pn_driver_process(drv))) {
+        server_callback(c);
+        if (pn_connector_closed(c)) {
+          pn_connector_destroy(c);
+        }
+      }
+    }
   }
 
-  pn_driver_run(drv);
   pn_driver_destroy(drv);
 
   return 0;



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to