Author: rhs
Date: Mon Mar 12 19:56:39 2012
New Revision: 1299813

URL: http://svn.apache.org/viewvc?rev=1299813&view=rev
Log:
cleaned up driver code; integrated sasl engine into driver; fleshed out sasl 
impl a bit more

Modified:
    qpid/proton/proton-c/include/proton/driver.h
    qpid/proton/proton-c/include/proton/engine.h
    qpid/proton/proton-c/include/proton/sasl.h
    qpid/proton/proton-c/include/proton/value.h
    qpid/proton/proton-c/src/dispatcher/dispatcher.c
    qpid/proton/proton-c/src/dispatcher/dispatcher.h
    qpid/proton/proton-c/src/driver.c
    qpid/proton/proton-c/src/engine/engine.c
    qpid/proton/proton-c/src/proton.c
    qpid/proton/proton-c/src/sasl/sasl-internal.h
    qpid/proton/proton-c/src/sasl/sasl.c
    qpid/proton/proton-c/src/types/array.c
    qpid/proton/proton-c/src/types/binary.c
    qpid/proton/proton-c/src/types/decode.c
    qpid/proton/proton-c/src/types/symbol.c
    qpid/proton/proton-c/src/types/value.c

Modified: qpid/proton/proton-c/include/proton/driver.h
URL: 
http://svn.apache.org/viewvc/qpid/proton/proton-c/include/proton/driver.h?rev=1299813&r1=1299812&r2=1299813&view=diff
==============================================================================
--- qpid/proton/proton-c/include/proton/driver.h (original)
+++ qpid/proton/proton-c/include/proton/driver.h Mon Mar 12 19:56:39 2012
@@ -23,10 +23,12 @@
  */
 
 #include <proton/engine.h>
+#include <proton/sasl.h>
 #include <stdlib.h>
 
 typedef struct pn_driver_t pn_driver_t;
 typedef struct pn_selectable_t pn_selectable_t;
+typedef void (pn_callback_t)(pn_selectable_t *);
 
 #define PN_SEL_RD (0x0001)
 #define PN_SEL_WR (0x0002)
@@ -36,13 +38,11 @@ void pn_driver_run(pn_driver_t *d);
 void pn_driver_stop(pn_driver_t *d);
 void pn_driver_destroy(pn_driver_t *d);
 
-pn_selectable_t *pn_acceptor(pn_driver_t *driver, char *host, char *port,
-                             void (*cb)(pn_connection_t*, void*),
-                             void* context);
-pn_selectable_t *pn_connector(pn_driver_t *driver, char *host, char *port,
-                              void (*cb)(pn_connection_t*, void*),
-                              void* context);
-
+pn_selectable_t *pn_acceptor(pn_driver_t *driver, char *host, char *port, pn_callback_t *callback, void* context);
+pn_selectable_t *pn_connector(pn_driver_t *driver, char *host, char *port, pn_callback_t *callback, void* context);
+pn_sasl_t *pn_selectable_sasl(pn_selectable_t *sel);
+pn_connection_t *pn_selectable_connection(pn_selectable_t *sel);
+void *pn_selectable_context(pn_selectable_t *sel);
 void pn_selectable_destroy(pn_selectable_t *sel);
 
 #endif /* driver.h */

Modified: qpid/proton/proton-c/include/proton/engine.h
URL: 
http://svn.apache.org/viewvc/qpid/proton/proton-c/include/proton/engine.h?rev=1299813&r1=1299812&r2=1299813&view=diff
==============================================================================
--- qpid/proton/proton-c/include/proton/engine.h (original)
+++ qpid/proton/proton-c/include/proton/engine.h Mon Mar 12 19:56:39 2012
@@ -73,7 +73,8 @@ pn_endpoint_t *pn_endpoint_next(pn_endpo
                                 pn_endpoint_state_t remote);
 
 // transport
-#define EOS (-1)
+#define PN_EOS (-1)
+#define PN_ERR (-2)
 ssize_t pn_input(pn_transport_t *transport, char *bytes, size_t available);
 ssize_t pn_output(pn_transport_t *transport, char *bytes, size_t size);
 time_t pn_tick(pn_transport_t *engine, time_t now);

Modified: qpid/proton/proton-c/include/proton/sasl.h
URL: 
http://svn.apache.org/viewvc/qpid/proton/proton-c/include/proton/sasl.h?rev=1299813&r1=1299812&r2=1299813&view=diff
==============================================================================
--- qpid/proton/proton-c/include/proton/sasl.h (original)
+++ qpid/proton/proton-c/include/proton/sasl.h Mon Mar 12 19:56:39 2012
@@ -30,4 +30,17 @@ typedef struct pn_sasl_t pn_sasl_t;
 typedef enum {SASL_NONE=-1, SASL_OK=0, SASL_AUTH=1, SASL_SYS=2, SASL_PERM=3,
               SASL_TEMP=4} pn_sasl_outcome_t;
 
+pn_sasl_t *pn_sasl();
+void pn_sasl_client(pn_sasl_t *sasl, const char *username, const char *password);
+void pn_sasl_server(pn_sasl_t *sasl);
+void pn_sasl_auth(pn_sasl_t *sasl, pn_sasl_outcome_t outcome);
+bool pn_sasl_init(pn_sasl_t *sasl);
+const char *pn_sasl_mechanism(pn_sasl_t *sasl);
+pn_binary_t *pn_sasl_challenge(pn_sasl_t *sasl);
+pn_binary_t *pn_sasl_response(pn_sasl_t *sasl);
+pn_sasl_outcome_t pn_sasl_outcome(pn_sasl_t *sasl);
+ssize_t pn_sasl_input(pn_sasl_t *sasl, char *bytes, size_t available);
+ssize_t pn_sasl_output(pn_sasl_t *sasl, char *bytes, size_t size);
+void pn_sasl_destroy(pn_sasl_t *sasl);
+
 #endif /* sasl.h */

Modified: qpid/proton/proton-c/include/proton/value.h
URL: 
http://svn.apache.org/viewvc/qpid/proton/proton-c/include/proton/value.h?rev=1299813&r1=1299812&r2=1299813&view=diff
==============================================================================
--- qpid/proton/proton-c/include/proton/value.h (original)
+++ qpid/proton/proton-c/include/proton/value.h Mon Mar 12 19:56:39 2012
@@ -132,6 +132,7 @@ size_t pn_format_sizeof_list(pn_list_t *
 size_t pn_format_sizeof_map(pn_map_t *map);
 size_t pn_format_sizeof_tag(pn_tag_t *tag);
 
+int pn_format_symbol(char **pos, char *limit, pn_symbol_t *sym);
 int pn_format_binary(char **pos, char *limit, pn_binary_t *binary);
 int pn_format_array(char **pos, char *limit, pn_array_t *array);
 int pn_format_list(char **pos, char *limit, pn_list_t *list);
@@ -150,6 +151,7 @@ void pn_free_array(pn_array_t *a);
 void pn_free_list(pn_list_t *l);
 void pn_free_map(pn_map_t *m);
 void pn_free_tag(pn_tag_t *t);
+void pn_free_symbol(pn_symbol_t *s);
 void pn_free_binary(pn_binary_t *b);
 void pn_free_string(pn_string_t *s);
 
@@ -170,14 +172,15 @@ void pn_visit_tag(pn_tag_t *t, void (*vi
 #define pn_to_bool(V) ((V).u.as_boolean)
 #define pn_to_string(V) ((V).u.as_string)
 #define pn_to_binary(V) ((V).u.as_binary)
+#define pn_to_symbol(V) ((V).u.as_symbol)
 
 /* symbol */
-pn_symbol_t *pn_symbol(char *name);
+pn_symbol_t *pn_symbol(const char *name);
+pn_symbol_t *pn_symboln(const char *name, size_t size);
 size_t pn_symbol_size(pn_symbol_t *s);
-char *pn_symbol_name(pn_symbol_t *s);
+const char *pn_symbol_name(pn_symbol_t *s);
 pn_symbol_t *pn_symbol_dup(pn_symbol_t *s);
 
-
 /* string */
 
 pn_string_t *pn_string(wchar_t *wcs);
@@ -188,7 +191,7 @@ wchar_t *pn_string_wcs(pn_string_t *str)
 
 pn_binary_t *pn_binary(char *bytes, size_t size);
 size_t pn_binary_size(pn_binary_t *b);
-char *pn_binary_bytes(pn_binary_t *b);
+const char *pn_binary_bytes(pn_binary_t *b);
 pn_binary_t *pn_binary_dup(pn_binary_t *b);
 
 /* arrays */

Modified: qpid/proton/proton-c/src/dispatcher/dispatcher.c
URL: 
http://svn.apache.org/viewvc/qpid/proton/proton-c/src/dispatcher/dispatcher.c?rev=1299813&r1=1299812&r2=1299813&view=diff
==============================================================================
--- qpid/proton/proton-c/src/dispatcher/dispatcher.c (original)
+++ qpid/proton/proton-c/src/dispatcher/dispatcher.c Mon Mar 12 19:56:39 2012
@@ -26,10 +26,11 @@
 #include <proton/framing.h>
 #include "dispatcher.h"
 
-pn_dispatcher_t *pn_dispatcher(void *context)
+pn_dispatcher_t *pn_dispatcher(uint8_t frame_type, void *context)
 {
   pn_dispatcher_t *disp = calloc(sizeof(pn_dispatcher_t), 1);
 
+  disp->frame_type = frame_type;
   disp->context = context;
 
   disp->channel = 0;
@@ -160,7 +161,7 @@ void pn_post_frame(pn_dispatcher_t *disp
                    .value = pn_from_list(disp->output_args) };
   pn_trace(disp, ch, OUT, performative, disp->output_args, disp->output_payload,
            disp->output_size);
-  pn_frame_t frame = {0};
+  pn_frame_t frame = {disp->frame_type};
   char bytes[pn_encode_sizeof(pn_from_tag(&tag)) + disp->output_size];
   size_t size = pn_encode(pn_from_tag(&tag), bytes);
   for (int i = 0; i < pn_list_size(disp->output_args); i++)

Modified: qpid/proton/proton-c/src/dispatcher/dispatcher.h
URL: 
http://svn.apache.org/viewvc/qpid/proton/proton-c/src/dispatcher/dispatcher.h?rev=1299813&r1=1299812&r2=1299813&view=diff
==============================================================================
--- qpid/proton/proton-c/src/dispatcher/dispatcher.h (original)
+++ qpid/proton/proton-c/src/dispatcher/dispatcher.h Mon Mar 12 19:56:39 2012
@@ -35,6 +35,7 @@ typedef void (pn_action_t)(pn_dispatcher
 struct pn_dispatcher_t {
   pn_action_t *actions[256];
   const char *names[256];
+  uint8_t frame_type;
   uint16_t channel;
   uint8_t code;
   pn_list_t *args;
@@ -50,7 +51,7 @@ struct pn_dispatcher_t {
   char scratch[SCRATCH];
 };
 
-pn_dispatcher_t *pn_dispatcher(void *context);
+pn_dispatcher_t *pn_dispatcher(uint8_t frame_type, void *context);
 void pn_dispatcher_destroy(pn_dispatcher_t *disp);
 void pn_dispatcher_action(pn_dispatcher_t *disp, uint8_t code, const char *name,
                           pn_action_t *action);

Modified: qpid/proton/proton-c/src/driver.c
URL: 
http://svn.apache.org/viewvc/qpid/proton/proton-c/src/driver.c?rev=1299813&r1=1299812&r2=1299813&view=diff
==============================================================================
--- qpid/proton/proton-c/src/driver.c (original)
+++ qpid/proton/proton-c/src/driver.c Mon Mar 12 19:56:39 2012
@@ -31,6 +31,7 @@
 #include <unistd.h>
 
 #include <proton/driver.h>
+#include <proton/sasl.h>
 #include "util.h"
 
 
@@ -40,10 +41,12 @@ struct pn_driver_t {
   pn_selectable_t *head;
   pn_selectable_t *tail;
   size_t size;
-  int ctrl[2];//pipe for updating selectable status
+  int ctrl[2]; //pipe for updating selectable status
   bool stopping;
 };
 
+#define IO_BUF_SIZE (4*1024)
+
 struct pn_selectable_t {
   pn_driver_t *driver;
   pn_selectable_t *next;
@@ -51,35 +54,24 @@ struct pn_selectable_t {
   int fd;
   int status;
   time_t wakeup;
-  void (*readable)(pn_selectable_t *s);
-  void (*writable)(pn_selectable_t *s);
-  time_t (*tick)(pn_selectable_t *s, time_t now);
-  void (*destroy)(pn_selectable_t *s);
+  pn_callback_t *read;
+  pn_callback_t *write;
+  time_t (*tick)(pn_selectable_t *sel, time_t now);
+  size_t input_size;
+  char input[IO_BUF_SIZE];
+  size_t output_size;
+  char output[IO_BUF_SIZE];
+  pn_sasl_t *sasl;
+  pn_connection_t *connection;
+  pn_transport_t *transport;
+  ssize_t (*process_input)(pn_selectable_t *sel);
+  ssize_t (*process_output)(pn_selectable_t *sel);
+  pn_callback_t *callback;
   void *context;
 };
 
 /* Impls */
 
-pn_driver_t *pn_driver()
-{
-  pn_driver_t *d = malloc(sizeof(pn_driver_t));
-  if (!d) return NULL;
-  d->head = NULL;
-  d->tail = NULL;
-  d->size = 0;
-  d->ctrl[0] = 0;
-  d->ctrl[1] = 0;
-  d ->stopping = false;
-  return d;
-}
-
-void pn_driver_destroy(pn_driver_t *d)
-{
-  while (d->head)
-    pn_selectable_destroy(d->head);
-  free(d);
-}
-
 static void pn_driver_add(pn_driver_t *d, pn_selectable_t *s)
 {
   LL_ADD(d->head, d->tail, s);
@@ -94,246 +86,333 @@ static void pn_driver_remove(pn_driver_t
   d->size--;
 }
 
-void pn_driver_run(pn_driver_t *d)
-{
-  int i, nfds = 0;
-  struct pollfd *fds = NULL;
-
-  if (pipe(d->ctrl)) {
-      perror("Can't create control pipe");
-  }
-  while (!d->stopping)
-  {
-    int n = d->size;
-    if (n == 0) break;
-    if (n > nfds) {
-      fds = realloc(fds, (n+1)*sizeof(struct pollfd));
-      nfds = n;
-    }
-
-    pn_selectable_t *s = d->head;
-    for (i = 0; i < n; i++)
-    {
-      fds[i].fd = s->fd;
-      fds[i].events = (s->status & PN_SEL_RD ? POLLIN : 0) |
-        (s->status & PN_SEL_WR ? POLLOUT : 0);
-      fds[i].revents = 0;
-      if (s->tick) {
-        // XXX
-        s->tick(s, 0);
-      }
-      s = s->next;
-    }
-    fds[n].fd = d->ctrl[0];
-    fds[n].events = POLLIN;
-    fds[n].revents = 0;
-
-    DIE_IFE(poll(fds, n+1, -1));
-
-    s = d->head;
-    for (i = 0; i < n; i++)
-    {
-      if (fds[i].revents & POLLIN)
-        s->readable(s);
-      if (fds[i].revents & POLLOUT)
-        s->writable(s);
-      s = s->next;
-    }
-
-    if (fds[n].revents & POLLIN) {
-      //clear the pipe
-      char buffer[512];
-      while (read(d->ctrl[0], buffer, 512) == 512);
-    }
-  }
-
-  close(d->ctrl[0]);
-  close(d->ctrl[1]);
-  free(fds);
-}
-
-void pn_driver_stop(pn_driver_t *d)
-{
-  d->stopping = true;
-  write(d->ctrl[1], "x", 1);
-}
+static void pn_selectable_read(pn_selectable_t *sel);
+static void pn_selectable_write(pn_selectable_t *sel);
+static time_t pn_selectable_tick(pn_selectable_t *sel, time_t now);
+
+static ssize_t pn_selectable_read_sasl_header(pn_selectable_t *sel);
+static ssize_t pn_selectable_read_sasl(pn_selectable_t *sel);
+static ssize_t pn_selectable_read_amqp_header(pn_selectable_t *sel);
+static ssize_t pn_selectable_read_amqp(pn_selectable_t *sel);
+static ssize_t pn_selectable_write_sasl_header(pn_selectable_t *sel);
+static ssize_t pn_selectable_write_sasl(pn_selectable_t *sel);
+static ssize_t pn_selectable_write_amqp_header(pn_selectable_t *sel);
+static ssize_t pn_selectable_write_amqp(pn_selectable_t *sel);
 
-static pn_selectable_t *pn_selectable()
+pn_selectable_t *pn_selectable(pn_driver_t *driver, int fd, pn_callback_t *callback, void *context)
 {
   pn_selectable_t *s = malloc(sizeof(pn_selectable_t));
   if (!s) return NULL;
-  s->driver = NULL;
+  s->driver = driver;
   s->next = NULL;
   s->prev = NULL;
+  s->fd = fd;
   s->status = 0;
   s->wakeup = 0;
-  s->readable = NULL;
-  s->writable = NULL;
-  s->tick = NULL;
-  s->destroy = NULL;
-  s->context = NULL;
+  s->read = pn_selectable_read;
+  s->write = pn_selectable_write;
+  s->tick = pn_selectable_tick;
+  s->input_size = 0;
+  s->output_size = 0;
+  s->sasl = pn_sasl();
+  s->connection = pn_connection();
+  s->transport = pn_transport(s->connection);
+  s->process_input = pn_selectable_read_sasl_header;
+  s->process_output = pn_selectable_write_sasl_header;
+  s->callback = callback;
+  s->context = context;
+
+  pn_driver_add(driver, s);
+
   return s;
 }
 
-void pn_selectable_destroy(pn_selectable_t *s)
+pn_sasl_t *pn_selectable_sasl(pn_selectable_t *sel)
 {
-  if (s->driver) pn_driver_remove(s->driver, s);
-  if (s->destroy) s->destroy(s);
-  free(s);
+  return sel->sasl;
 }
 
-// engine related
+pn_connection_t *pn_selectable_connection(pn_selectable_t *sel)
+{
+  return sel->connection;
+}
 
-#define IO_BUF_SIZE (4*1024)
+void *pn_selectable_context(pn_selectable_t *sel)
+{
+  return sel->context;
+}
 
-struct pn_engine_ctx {
-  pn_connection_t *connection;
-  pn_transport_t *transport;
-  int in_size;
-  int out_size;
-  char input[IO_BUF_SIZE];
-  char output[IO_BUF_SIZE];
-  void (*callback)(pn_connection_t*, void*);
-  void *context;
-};
+void pn_selectable_destroy(pn_selectable_t *sel)
+{
+  if (sel->driver) pn_driver_remove(sel->driver, sel);
+  if (sel->connection) pn_destroy((pn_endpoint_t *) sel->connection);
+  if (sel->sasl) pn_sasl_destroy(sel->sasl);
+  free(sel);
+}
 
-static void pn_selectable_engine_close(pn_selectable_t *sel)
+static void pn_selectable_close(pn_selectable_t *sel)
 {
+  // XXX: should probably signal engine and callback here
   sel->status = 0;
   if (close(sel->fd) == -1)
     perror("close");
-  pn_driver_remove(sel->driver, sel);
-  pn_selectable_destroy(sel);
 }
 
-static struct pn_engine_ctx *pn_selectable_engine_read(pn_selectable_t *sel)
+static void pn_selectable_consume(pn_selectable_t *sel, int n)
+{
+  sel->input_size -= n;
+  memmove(sel->input, sel->input + n, sel->input_size);
+}
+
+static void pn_selectable_read(pn_selectable_t *sel)
 {
-  struct pn_engine_ctx *ctx = sel->context;
-  ssize_t n = recv(sel->fd, ctx->input + ctx->in_size, IO_BUF_SIZE - ctx->in_size, 0);
+  ssize_t n = recv(sel->fd, sel->input + sel->input_size, IO_BUF_SIZE - sel->input_size, 0);
 
   if (n <= 0) {
     printf("disconnected: %zi\n", n);
-    pn_selectable_engine_close(sel);
-    return NULL;
+    pn_selectable_close(sel);
+    pn_selectable_destroy(sel);
   } else {
-    ctx->in_size += n;
+    sel->input_size += n;
+  }
+
+  while (sel->input_size > 0) {
+    n = sel->process_input(sel);
+    if (n > 0) {
+      pn_selectable_consume(sel, n);
+    } else if (n == 0) {
+      return;
+    } else {
+      if (n != PN_EOS) printf("error in process_input: %zi\n", n);
+      pn_selectable_close(sel);
+      pn_selectable_destroy(sel);
+      return;
+    }
   }
-  return ctx;
 }
 
-static void pn_selectable_engine_consume(struct pn_engine_ctx *ctx, int n)
+static ssize_t pn_selectable_read_sasl_header(pn_selectable_t *sel)
 {
-  ctx->in_size -= n;
-  memmove(ctx->input, ctx->input + n, ctx->in_size);
+  if (sel->input_size >= 8) {
+    if (memcmp(sel->input, "AMQP\x03\x01\x00\x00", 8)) {
+      fprintf(stderr, "sasl header missmatch\n");
+      return PN_ERR;
+    } else {
+      fprintf(stderr, "    <- AMQP SASL 1.0\n");
+      sel->process_input = pn_selectable_read_sasl;
+      return 8;
+    }
+  }
+
+  return 0;
 }
 
-static void pn_engine_readable_input(pn_selectable_t *sel, struct pn_engine_ctx *ctx)
+static ssize_t pn_selectable_read_sasl(pn_selectable_t *sel)
 {
-  pn_transport_t *transport = ctx->transport;
-  ssize_t n = pn_input(transport, ctx->input, ctx->in_size);
-  if (n < 0) {
-    if (n != EOS) {
-      printf("error: %zi\n", n);
-    }
-    pn_selectable_engine_close(sel);
+  pn_sasl_t *sasl = sel->sasl;
+  ssize_t n = pn_sasl_input(sasl, sel->input, sel->input_size);
+  if (n == PN_EOS) {
+    sel->process_input = pn_selectable_read_amqp_header;
+    return sel->process_input(sel);
   } else {
-    pn_selectable_engine_consume(ctx, n);
+    return n;
+  }
+}
+
+static ssize_t pn_selectable_read_amqp_header(pn_selectable_t *sel)
+{
+  if (sel->input_size >= 8) {
+    if (memcmp(sel->input, "AMQP\x00\x01\x00\x00", 8)) {
+      fprintf(stderr, "amqp header missmatch\n");
+      return PN_ERR;
+    } else {
+      fprintf(stderr, "    <- AMQP 1.0\n");
+      sel->process_input = pn_selectable_read_amqp;
+      return 8;
+    }
   }
+
+  return 0;
 }
 
-static void pn_engine_readable(pn_selectable_t *sel)
+static ssize_t pn_selectable_read_amqp(pn_selectable_t *sel)
 {
-  struct pn_engine_ctx *ctx = pn_selectable_engine_read(sel);
-  if (ctx) pn_engine_readable_input(sel, ctx);
+  pn_transport_t *transport = sel->transport;
+  return pn_input(transport, sel->input, sel->input_size);
 }
 
-static void pn_engine_readable_hdr(pn_selectable_t *sel)
+static char *pn_selectable_output(pn_selectable_t *sel)
 {
-  struct pn_engine_ctx *ctx = pn_selectable_engine_read(sel);
+  return sel->output + sel->output_size;
+}
 
-  if (!ctx)
-    return;
+static size_t pn_selectable_available(pn_selectable_t *sel)
+{
+  return IO_BUF_SIZE - sel->output_size;
+}
 
-  if (ctx->in_size >= 8) {
-    if (memcmp(ctx->input, "AMQP\x00\x01\x00\x00", 8)) {
-      printf("header missmatch");
-      pn_selectable_engine_close(sel);
+static void pn_selectable_write(pn_selectable_t *sel)
+{
+  while (pn_selectable_available(sel) > 0) {
+    ssize_t n = sel->process_output(sel);
+    if (n > 0) {
+      sel->output_size += n;
+    } else if (n == 0) {
+      break;
     } else {
-      pn_selectable_engine_consume(ctx, 8);
-      sel->readable = &pn_engine_readable;
-      pn_engine_readable_input(sel, ctx);
+      if (n != PN_EOS) fprintf(stderr, "error in process_output: %zi", n);
+      pn_selectable_close(sel);
+      pn_selectable_destroy(sel);
+      return;
     }
   }
-}
 
-static void pn_engine_writable(pn_selectable_t *sel)
-{
-  struct pn_engine_ctx *ctx = sel->context;
-  pn_transport_t *transport = ctx->transport;
-  ssize_t n = pn_output(transport, ctx->output + ctx->out_size, IO_BUF_SIZE - ctx->out_size);
-  if (n < 0) {
-    printf("internal error: %zi", n);
-    pn_selectable_engine_close(sel);
-  } else {
-    ctx->out_size += n;
-    n = send(sel->fd, ctx->output, ctx->out_size, 0);
+  if (sel->output_size > 0) {
+    ssize_t n = send(sel->fd, sel->output, sel->output_size, 0);
     if (n < 0) {
       // XXX
-      perror("writable");
+      perror("send");
+      pn_selectable_close(sel);
+      pn_selectable_destroy(sel);
+      return;
     } else {
-      ctx->out_size -= n;
-      memmove(ctx->output, ctx->output + n, ctx->out_size);
+      sel->output_size -= n;
+      memmove(sel->output, sel->output + n, sel->output_size);
     }
-    if (ctx->out_size)
+
+    if (sel->output_size)
       sel->status |= PN_SEL_WR;
     else
       sel->status &= ~PN_SEL_WR;
   }
 }
 
-static time_t pn_selectable_engine_tick(pn_selectable_t *sel, time_t now)
+static ssize_t pn_selectable_write_sasl_header(pn_selectable_t *sel)
+{
+  fprintf(stderr, "    -> AMQP SASL 1.0\n");
+  memmove(pn_selectable_output(sel), "AMQP\x03\x01\x00\x00", 8);
+  sel->process_output = pn_selectable_write_sasl;
+  return 8;
+}
+
+static ssize_t pn_selectable_write_sasl(pn_selectable_t *sel)
+{
+  pn_sasl_t *sasl = sel->sasl;
+  ssize_t n = pn_sasl_output(sasl, pn_selectable_output(sel), pn_selectable_available(sel));
+  if (n == PN_EOS) {
+    sel->process_output = pn_selectable_write_amqp_header;
+    return sel->process_output(sel);
+  } else {
+    return n;
+  }
+}
+
+static ssize_t pn_selectable_write_amqp_header(pn_selectable_t *sel)
 {
-  struct pn_engine_ctx *ctx = sel->context;
-  time_t result = pn_tick(ctx->transport, now);
-  if (ctx->callback) ctx->callback(ctx->connection, ctx->context);
-  pn_engine_writable(sel);
+  fprintf(stderr, "    -> AMQP 1.0\n");
+  memmove(pn_selectable_output(sel), "AMQP\x00\x01\x00\x00", 8);
+  sel->process_output = pn_selectable_write_amqp;
+  return 8;
+}
+
+static ssize_t pn_selectable_write_amqp(pn_selectable_t *sel)
+{
+  pn_transport_t *transport = sel->transport;
+  return pn_output(transport, pn_selectable_output(sel), pn_selectable_available(sel));
+}
+
+static time_t pn_selectable_tick(pn_selectable_t *sel, time_t now)
+{
+  // XXX: should probably have a function pointer for this and switch it with different layers
+  time_t result = pn_tick(sel->transport, now);
+  if (sel->callback) sel->callback(sel);
+  pn_selectable_write(sel);
   return result;
 }
 
-static void pn_engine_destroy(pn_selectable_t *s)
+pn_driver_t *pn_driver()
+{
+  pn_driver_t *d = malloc(sizeof(pn_driver_t));
+  if (!d) return NULL;
+  d->head = NULL;
+  d->tail = NULL;
+  d->size = 0;
+  d->ctrl[0] = 0;
+  d->ctrl[1] = 0;
+  d ->stopping = false;
+  return d;
+}
+
+void pn_driver_destroy(pn_driver_t *d)
+{
+  while (d->head)
+    pn_selectable_destroy(d->head);
+  free(d);
+}
+
+void pn_driver_run(pn_driver_t *d)
 {
-  struct pn_engine_ctx *ctx = s->context;
-  if (ctx) {
-    pn_destroy((pn_endpoint_t *)ctx->connection);
-    free(ctx);
-    s->context = NULL;
+  int i, nfds = 0;
+  struct pollfd *fds = NULL;
+
+  if (pipe(d->ctrl)) {
+      perror("Can't create control pipe");
   }
+  while (!d->stopping)
+  {
+    int n = d->size;
+    if (n == 0) break;
+    if (n > nfds) {
+      fds = realloc(fds, (n+1)*sizeof(struct pollfd));
+      nfds = n;
+    }
+
+    pn_selectable_t *s = d->head;
+    for (i = 0; i < n; i++)
+    {
+      fds[i].fd = s->fd;
+      fds[i].events = (s->status & PN_SEL_RD ? POLLIN : 0) |
+        (s->status & PN_SEL_WR ? POLLOUT : 0);
+      fds[i].revents = 0;
+      // XXX
+      s->tick(s, 0);
+      s = s->next;
+    }
+    fds[n].fd = d->ctrl[0];
+    fds[n].events = POLLIN;
+    fds[n].revents = 0;
+
+    DIE_IFE(poll(fds, n+1, -1));
+
+    s = d->head;
+    for (i = 0; i < n; i++)
+    {
+      if (fds[i].revents & POLLIN)
+        s->read(s);
+      if (fds[i].revents & POLLOUT)
+        s->write(s);
+      s = s->next;
+    }
+
+    if (fds[n].revents & POLLIN) {
+      //clear the pipe
+      char buffer[512];
+      while (read(d->ctrl[0], buffer, 512) == 512);
+    }
+  }
+
+  close(d->ctrl[0]);
+  close(d->ctrl[1]);
+  free(fds);
 }
 
-static pn_selectable_t *pn_selectable_engine(int sock, pn_connection_t *conn,
-                                             void (*cb)(pn_connection_t*, void*), void* ctx)
-{
-  pn_selectable_t *sel = pn_selectable();
-  sel->fd = sock;
-  sel->readable = &pn_engine_readable_hdr;
-  sel->writable = &pn_engine_writable;
-  sel->destroy = &pn_engine_destroy;
-  sel->tick = &pn_selectable_engine_tick;
-  sel->status = PN_SEL_RD | PN_SEL_WR;
-  struct pn_engine_ctx *sctx = malloc(sizeof(struct pn_engine_ctx));
-  sctx->connection = conn;
-  sctx->transport = pn_transport(conn);
-  sctx->in_size = 0;
-  memmove(sctx->output, "AMQP\x00\x01\x00\x00", 8);
-  sctx->out_size = 8;
-  sctx->callback = cb;
-  sctx->context = ctx;
-  sel->context = sctx;
-  return sel;
+void pn_driver_stop(pn_driver_t *d)
+{
+  d->stopping = true;
+  write(d->ctrl[1], "x", 1);
 }
 
-pn_selectable_t *pn_connector(pn_driver_t *drv, char *host, char *port,
-                              void (*cb)(pn_connection_t*, void*), void* ctx)
+pn_selectable_t *pn_connector(pn_driver_t *driver, char *host, char *port, pn_callback_t *callback, void *context)
 {
   struct addrinfo *addr;
   int code = getaddrinfo(host, port, NULL, &addr);
@@ -353,10 +432,9 @@ pn_selectable_t *pn_connector(pn_driver_
 
   freeaddrinfo(addr);
 
-  pn_connection_t *conn = pn_connection();
-  pn_selectable_t *s = pn_selectable_engine(sock, conn, cb, ctx);
+  pn_selectable_t *s = pn_selectable(driver, sock, callback, context);
+  s->status = PN_SEL_RD | PN_SEL_WR;
 
-  pn_driver_add(drv, s);
   printf("Connected to %s:%s\n", host, port);
   return s;
 }
@@ -378,17 +456,16 @@ static void do_accept(pn_selectable_t *s
         perror("close");
     } else {
       printf("accepted from %s:%s\n", host, serv);
-      pn_connection_t *conn = pn_connection();
-      struct pn_engine_ctx *ctx = s->context;
-      pn_selectable_t *a = pn_selectable_engine(sock, conn, ctx->callback, ctx->context);
+      pn_selectable_t *a = pn_selectable(s->driver, sock, s->callback, s->context);
       a->status = PN_SEL_RD | PN_SEL_WR;
-      pn_driver_add(s->driver, a);
     }
   }
 }
 
-pn_selectable_t *pn_acceptor(pn_driver_t *drv, char *host, char *port,
-                             void (*cb)(pn_connection_t*, void*), void* context)
+static void do_nothing(pn_selectable_t *s) {}
+static time_t never_tick(pn_selectable_t *s, time_t now) { return 0; }
+
+pn_selectable_t *pn_acceptor(pn_driver_t *driver, char *host, char *port, pn_callback_t *callback, void* context)
 {
   struct addrinfo *addr;
   int code = getaddrinfo(host, port, NULL, &addr);
@@ -415,17 +492,13 @@ pn_selectable_t *pn_acceptor(pn_driver_t
   if (listen(sock, 50) == -1)
     return NULL;
 
-  pn_selectable_t *s = pn_selectable();
-  s->fd = sock;
-  s->readable = &do_accept;
-  s->writable = NULL;
+  // XXX: should factor into pure selectable and separate subclass
+  pn_selectable_t *s = pn_selectable(driver, sock, callback, context);
+  s->read = do_accept;
+  s->write = do_nothing;
+  s->tick = never_tick;
   s->status = PN_SEL_RD;
-  struct pn_engine_ctx *ctx = malloc(sizeof(struct pn_engine_ctx));
-  ctx->callback = cb;
-  ctx->context = context;
-  s->context = ctx;
 
-  pn_driver_add(drv, s);
   printf("Listening on %s:%s\n", host, port);
   return s;
 }

Modified: qpid/proton/proton-c/src/engine/engine.c
URL: 
http://svn.apache.org/viewvc/qpid/proton/proton-c/src/engine/engine.c?rev=1299813&r1=1299812&r2=1299813&view=diff
==============================================================================
--- qpid/proton/proton-c/src/engine/engine.c (original)
+++ qpid/proton/proton-c/src/engine/engine.c Mon Mar 12 19:56:39 2012
@@ -260,13 +260,22 @@ void pn_clear_tag(pn_delivery_t *deliver
   }
 }
 
+void pn_clear_bytes(pn_delivery_t *delivery)
+{
+  if (delivery->capacity) {
+    free(delivery->bytes);
+    delivery->bytes = NULL;
+    delivery->capacity = 0;
+  }
+}
+
 void pn_free_deliveries(pn_delivery_t *delivery)
 {
   while (delivery)
   {
     pn_delivery_t *next = delivery->link_next;
     pn_clear_tag(delivery);
-    if (delivery->capacity) free(delivery->bytes);
+    pn_clear_bytes(delivery);
     free(delivery);
     delivery = next;
   }
@@ -548,7 +557,7 @@ void pn_transport_init(pn_transport_t *t
 {
   pn_endpoint_init(&transport->endpoint, TRANSPORT, transport->connection);
 
-  transport->disp = pn_dispatcher(transport);
+  transport->disp = pn_dispatcher(0, transport);
 
   pn_dispatcher_action(transport->disp, OPEN, "OPEN", pn_do_open);
   pn_dispatcher_action(transport->disp, BEGIN, "BEGIN", pn_do_begin);
@@ -804,7 +813,7 @@ void pn_real_settle(pn_delivery_t *deliv
   // TODO: what if we settle the current delivery?
   LL_ADD_PFX(link->settled_head, link->settled_tail, delivery, link_);
   pn_clear_tag(delivery);
-  delivery->size = 0;
+  pn_clear_bytes(delivery);
 }
 
 void pn_full_settle(pn_delivery_buffer_t *db, pn_delivery_t *delivery)
@@ -1053,12 +1062,12 @@ void pn_do_close(pn_dispatcher_t *disp)
 ssize_t pn_input(pn_transport_t *transport, char *bytes, size_t available)
 {
   if (transport->endpoint.local_state == CLOSED) {
-    return EOS;
+    return PN_EOS;
   }
 
   if (transport->endpoint.remote_state == CLOSED) {
     pn_do_error(transport, "amqp:connection:framing-error", "data after close");
-    return EOS;
+    return PN_ERR;
   }
 
   return pn_dispatcher_input(transport->disp, bytes, available);
@@ -1375,7 +1384,7 @@ ssize_t pn_output(pn_transport_t *transp
   pn_process(transport);
 
   if (!transport->disp->available && transport->endpoint.local_state == CLOSED) {
-    return EOS;
+    return PN_EOS;
   }
 
   // XXX: errors?

Modified: qpid/proton/proton-c/src/proton.c
URL: 
http://svn.apache.org/viewvc/qpid/proton/proton-c/src/proton.c?rev=1299813&r1=1299812&r2=1299813&view=diff
==============================================================================
--- qpid/proton/proton-c/src/proton.c (original)
+++ qpid/proton/proton-c/src/proton.c Mon Mar 12 19:56:39 2012
@@ -76,9 +76,37 @@ struct server_context {
   int count;
 };
 
-void server_callback(pn_connection_t *conn, void *context)
+void server_callback(pn_selectable_t *sel)
 {
-  struct server_context *ctx = context;
+  pn_sasl_t *sasl = pn_selectable_sasl(sel);
+
+  if (!pn_sasl_init(sasl)) {
+    pn_sasl_server(sasl);
+  }
+
+  switch (pn_sasl_outcome(sasl)) {
+  case SASL_NONE:
+    {
+      const char *mech = pn_sasl_mechanism(sasl);
+      if (mech && !strcmp(mech, "PLAIN")) {
+        pn_binary_t *response = pn_sasl_response(sasl);
+        char buf[1024];
+        pn_format(buf, 1024, pn_from_binary(response));
+        printf("response = %s\n", buf);
+        pn_sasl_auth(sasl, SASL_OK);
+        break;
+      } else {
+        return;
+      }
+    }
+  case SASL_OK:
+    break;
+  default:
+    return;
+  }
+
+  pn_connection_t *conn = pn_selectable_connection(sel);
+  struct server_context *ctx = pn_selectable_context(sel);
   char tagstr[1024];
   char msg[1024];
 
@@ -187,11 +215,31 @@ struct client_context {
   int recv_count;
   int send_count;
   pn_driver_t *driver;
+  const char *username;
+  const char *password;
 };
 
-void client_callback(pn_connection_t *connection, void *context)
+void client_callback(pn_selectable_t *sel)
 {
-  struct client_context *ctx = context;
+  struct client_context *ctx = pn_selectable_context(sel);
+
+  pn_sasl_t *sasl = pn_selectable_sasl(sel);
+  if (!pn_sasl_init(sasl)) {
+    pn_sasl_client(sasl, ctx->username, ctx->password);
+  }
+
+  switch (pn_sasl_outcome(sasl)) {
+  case SASL_NONE:
+    return;
+  case SASL_OK:
+    break;
+  default:
+    fprintf(stderr, "auth failed\n");
+    pn_driver_stop(ctx->driver);
+    return;
+  }
+
+  pn_connection_t *connection = pn_selectable_connection(sel);
   char tagstr[1024];
   char msg[1024];
 
@@ -302,6 +350,12 @@ int main(int argc, char **argv)
   pn_driver_t *drv = pn_driver();
   if (argc > 1) {
     struct client_context ctx = {false, 10, 10, drv};
+    ctx.username = argv[1];
+    if (argc > 2) {
+      ctx.password = argv[2];
+    } else {
+      ctx.password = "";
+    }
     if (!pn_connector(drv, "0.0.0.0", "5672", client_callback, &ctx)) perror("proton");
   } else {
     struct server_context ctx = {0};

Modified: qpid/proton/proton-c/src/sasl/sasl-internal.h
URL: 
http://svn.apache.org/viewvc/qpid/proton/proton-c/src/sasl/sasl-internal.h?rev=1299813&r1=1299812&r2=1299813&view=diff
==============================================================================
--- qpid/proton/proton-c/src/sasl/sasl-internal.h (original)
+++ qpid/proton/proton-c/src/sasl/sasl-internal.h Mon Mar 12 19:56:39 2012
@@ -29,6 +29,10 @@
 
 struct pn_sasl_t {
   pn_dispatcher_t *disp;
+  bool init;
+  pn_symbol_t *mechanism;
+  pn_binary_t *challenge;
+  pn_binary_t *response;
   pn_sasl_outcome_t outcome;
   char scratch[SCRATCH];
 };

Modified: qpid/proton/proton-c/src/sasl/sasl.c
URL: 
http://svn.apache.org/viewvc/qpid/proton/proton-c/src/sasl/sasl.c?rev=1299813&r1=1299812&r2=1299813&view=diff
==============================================================================
--- qpid/proton/proton-c/src/sasl/sasl.c (original)
+++ qpid/proton/proton-c/src/sasl/sasl.c Mon Mar 12 19:56:39 2012
@@ -24,27 +24,72 @@
 #include <string.h>
 #include <proton/framing.h>
 #include <proton/value.h>
+#include <proton/engine.h> // XXX: just needed for PN_EOS
 #include "sasl-internal.h"
 #include "../protocol.h"
 
 void pn_do_init(pn_dispatcher_t *disp);
 void pn_do_mechanisms(pn_dispatcher_t *disp);
+void pn_do_challenge(pn_dispatcher_t *disp);
+void pn_do_response(pn_dispatcher_t *disp);
 void pn_do_outcome(pn_dispatcher_t *disp);
 
 pn_sasl_t *pn_sasl()
 {
   pn_sasl_t *sasl = malloc(sizeof(pn_sasl_t));
-  sasl->disp = pn_dispatcher(sasl);
+  sasl->disp = pn_dispatcher(1, sasl);
 
   pn_dispatcher_action(sasl->disp, SASL_INIT, "SASL-INIT", pn_do_init);
   pn_dispatcher_action(sasl->disp, SASL_MECHANISMS, "SASL-MECHANISMS", pn_do_mechanisms);
-  // XXX: challenge/response
+  pn_dispatcher_action(sasl->disp, SASL_CHALLENGE, "SASL-CHALLENGE", pn_do_challenge);
+  pn_dispatcher_action(sasl->disp, SASL_RESPONSE, "SASL-RESPONSE", pn_do_response);
   pn_dispatcher_action(sasl->disp, SASL_OUTCOME, "SASL-OUTCOME", pn_do_outcome);
 
+  sasl->init = false;
   sasl->outcome = SASL_NONE;
+  sasl->mechanism = NULL;
+  sasl->challenge = NULL;
+  sasl->response = NULL;
+
   return sasl;
 }
 
+void pn_sasl_mechanism_set(pn_sasl_t *sasl, const char *mechanism)
+{
+  if (sasl->mechanism) pn_free_symbol(sasl->mechanism);
+  sasl->mechanism = pn_symbol(mechanism);
+}
+
+const char *pn_sasl_mechanism(pn_sasl_t *sasl)
+{
+  if (sasl->mechanism)
+    return pn_symbol_name(sasl->mechanism);
+  else
+    return NULL;
+}
+
+void pn_sasl_challenge_set(pn_sasl_t *sasl, pn_binary_t *challenge)
+{
+  if (sasl->challenge) pn_free_binary(sasl->challenge);
+  sasl->challenge = pn_binary_dup(challenge);
+}
+
+pn_binary_t *pn_sasl_challenge(pn_sasl_t *sasl)
+{
+  return sasl->challenge;
+}
+
+void pn_sasl_response_set(pn_sasl_t *sasl, pn_binary_t *response)
+{
+  if (sasl->response) pn_free_binary(sasl->response);
+  sasl->response = pn_binary_dup(response);
+}
+
+pn_binary_t *pn_sasl_response(pn_sasl_t *sasl)
+{
+  return sasl->response;
+}
+
 void pn_sasl_client(pn_sasl_t *sasl, const char *username, const char *password)
 {
   size_t usize = strlen(username);
@@ -61,21 +106,56 @@ void pn_sasl_client(pn_sasl_t *sasl, con
   pn_field(sasl->disp, SASL_INIT_MECHANISM, pn_from_symbol(pn_symbol("PLAIN")));
   pn_field(sasl->disp, SASL_INIT_INITIAL_RESPONSE, pn_from_binary(pn_binary(iresp, size)));
   pn_post_frame(sasl->disp, 0, SASL_INIT);
+  sasl->init = true;
+}
+
+void pn_sasl_server(pn_sasl_t *sasl)
+{
+  pn_init_frame(sasl->disp);
+  pn_field(sasl->disp, SASL_MECHANISMS_SASL_SERVER_MECHANISMS, pn_value("@s[s]", "PLAIN"));
+  pn_post_frame(sasl->disp, 0, SASL_MECHANISMS);
+  sasl->init = true;
+}
+
+void pn_sasl_auth(pn_sasl_t *sasl, pn_sasl_outcome_t outcome)
+{
+  sasl->outcome = outcome;
+
+  pn_init_frame(sasl->disp);
+  pn_field(sasl->disp, SASL_OUTCOME_CODE, pn_value("B", outcome));
+  pn_post_frame(sasl->disp, 0, SASL_OUTCOME);
+}
+
+bool pn_sasl_init(pn_sasl_t *sasl)
+{
+  return sasl->init;
 }
 
 void pn_sasl_destroy(pn_sasl_t *sasl)
 {
+  pn_sasl_mechanism_set(sasl, NULL);
+  pn_sasl_challenge_set(sasl, NULL);
+  pn_sasl_response_set(sasl, NULL);
   pn_dispatcher_destroy(sasl->disp);
+  free(sasl);
 }
 
 ssize_t pn_sasl_input(pn_sasl_t *sasl, char *bytes, size_t available)
 {
-  return pn_dispatcher_input(sasl->disp, bytes, available);
+  if (sasl->outcome != SASL_NONE) {
+    return PN_EOS;
+  } else {
+    return pn_dispatcher_input(sasl->disp, bytes, available);
+  }
 }
 
 ssize_t pn_sasl_output(pn_sasl_t *sasl, char *bytes, size_t size)
 {
-  return pn_dispatcher_output(sasl->disp, bytes, size);
+  if (sasl->disp->available == 0 && sasl->outcome != SASL_NONE) {
+    return PN_EOS;
+  } else {
+    return pn_dispatcher_output(sasl->disp, bytes, size);
+  }
 }
 
 pn_sasl_outcome_t pn_sasl_outcome(pn_sasl_t *sasl)
@@ -85,8 +165,10 @@ pn_sasl_outcome_t pn_sasl_outcome(pn_sas
 
 void pn_do_init(pn_dispatcher_t *disp)
 {
-  //pn_sasl_t *sasl = disp->context;
-  
+  pn_sasl_t *sasl = disp->context;
+  pn_symbol_t *mech = pn_to_symbol(pn_list_get(disp->args, SASL_INIT_MECHANISM));
+  pn_sasl_mechanism_set(sasl, pn_symbol_name(mech));
+  pn_sasl_response_set(sasl, pn_to_binary(pn_list_get(disp->args, SASL_INIT_INITIAL_RESPONSE)));
 }
 
 void pn_do_mechanisms(pn_dispatcher_t *disp)
@@ -95,8 +177,18 @@ void pn_do_mechanisms(pn_dispatcher_t *d
   
 }
 
+void pn_do_challenge(pn_dispatcher_t *disp)
+{
+  
+}
+
+void pn_do_response(pn_dispatcher_t *resp)
+{
+  
+}
+
 void pn_do_outcome(pn_dispatcher_t *disp)
 {
   pn_sasl_t *sasl = disp->context;
-  sasl->outcome = pn_to_int32(pn_list_get(disp->args, SASL_OUTCOME_CODE));
+  sasl->outcome = pn_to_uint8(pn_list_get(disp->args, SASL_OUTCOME_CODE));
 }

Modified: qpid/proton/proton-c/src/types/array.c
URL: 
http://svn.apache.org/viewvc/qpid/proton/proton-c/src/types/array.c?rev=1299813&r1=1299812&r2=1299813&view=diff
==============================================================================
--- qpid/proton/proton-c/src/types/array.c (original)
+++ qpid/proton/proton-c/src/types/array.c Mon Mar 12 19:56:39 2012
@@ -41,6 +41,7 @@ static char type_to_code(enum TYPE type)
   case FLOAT: return 'f';
   case DOUBLE: return 'd';
   case CHAR: return 'C';
+  case SYMBOL: return 's';
   case STRING: return 'S';
   case BINARY: return 'z';
   case LIST: return 't';
@@ -66,6 +67,7 @@ static uint8_t type_to_amqp_code(enum TY
   case ULONG: return PNE_ULONG;
   case FLOAT: return PNE_FLOAT;
   case DOUBLE: return PNE_DOUBLE;
+  case SYMBOL: return PNE_SYM32;
   case STRING: return PNE_STR32_UTF8;
   case BINARY: return PNE_VBIN32;
   case LIST: return PNE_LIST32;

Modified: qpid/proton/proton-c/src/types/binary.c
URL: 
http://svn.apache.org/viewvc/qpid/proton/proton-c/src/types/binary.c?rev=1299813&r1=1299812&r2=1299813&view=diff
==============================================================================
--- qpid/proton/proton-c/src/types/binary.c (original)
+++ qpid/proton/proton-c/src/types/binary.c Mon Mar 12 19:56:39 2012
@@ -44,7 +44,7 @@ size_t pn_binary_size(pn_binary_t *b)
   return b->size;
 }
 
-char *pn_binary_bytes(pn_binary_t *b)
+const char *pn_binary_bytes(pn_binary_t *b)
 {
   return b->bytes;
 }
@@ -69,7 +69,10 @@ int pn_compare_binary(pn_binary_t *a, pn
 
 pn_binary_t *pn_binary_dup(pn_binary_t *b)
 {
-  return pn_binary(b->bytes, b->size);
+  if (b)
+    return pn_binary(b->bytes, b->size);
+  else
+    return NULL;
 }
 
 int pn_format_binary(char **pos, char *limit, pn_binary_t *binary)

Modified: qpid/proton/proton-c/src/types/decode.c
URL: 
http://svn.apache.org/viewvc/qpid/proton/proton-c/src/types/decode.c?rev=1299813&r1=1299812&r2=1299813&view=diff
==============================================================================
--- qpid/proton/proton-c/src/types/decode.c (original)
+++ qpid/proton/proton-c/src/types/decode.c Mon Mar 12 19:56:39 2012
@@ -51,8 +51,8 @@ static enum TYPE amqp_code_to_type(uint8
   case PNE_VBIN32: return BINARY;
   case PNE_STR8_UTF8:
   case PNE_STR32_UTF8: return STRING;
-    //  case PNE_SYM8:
-    //  case PNE_SYM32: return SYMBOL;
+  case PNE_SYM8:
+  case PNE_SYM32: return SYMBOL;
   case PNE_LIST0:
   case PNE_LIST8:
   case PNE_LIST32: return LIST;
@@ -205,9 +205,9 @@ void pn_decode_utf8(void *ctx, size_t si
   value->u.as_string = pn_string(buf);
 }
 void pn_decode_symbol(void *ctx, size_t size, char *bytes) {
-  //  pn_value_t *value = next_value(ctx);
-  //  value->type = SYMBOL;
-  //  value->u.as_symbol = {.size = size, .bytes = bytes};
+  pn_value_t *value = next_value(ctx);
+  value->type = SYMBOL;
+  value->u.as_symbol = pn_symboln(bytes, size);
 }
 
 void pn_decode_start_array(void *ctx, size_t count, uint8_t code) {

Modified: qpid/proton/proton-c/src/types/symbol.c
URL: 
http://svn.apache.org/viewvc/qpid/proton/proton-c/src/types/symbol.c?rev=1299813&r1=1299812&r2=1299813&view=diff
==============================================================================
--- qpid/proton/proton-c/src/types/symbol.c (original)
+++ qpid/proton/proton-c/src/types/symbol.c Mon Mar 12 19:56:39 2012
@@ -25,13 +25,22 @@
 #include <stdio.h>
 #include "value-internal.h"
 
-pn_symbol_t *pn_symbol(char *name)
+pn_symbol_t *pn_symbol(const char *name)
 {
-  size_t size = strlen(name);
-  pn_symbol_t *sym = malloc(sizeof(pn_symbol_t) + size + 1);
-  sym->size = size;
-  strcpy(sym->name, name);
-  return sym;
+  return pn_symboln(name, name ? strlen(name) : 0);
+}
+
+pn_symbol_t *pn_symboln(const char *name, size_t size)
+{
+  if (name) {
+    pn_symbol_t *sym = malloc(sizeof(pn_symbol_t) + size + 1);
+    sym->size = size;
+    strncpy(sym->name, name, size);
+    sym->name[size] = '\0';
+    return sym;
+  } else {
+    return NULL;
+  }
 }
 
 void pn_free_symbol(pn_symbol_t *s)
@@ -44,7 +53,7 @@ size_t pn_symbol_size(pn_symbol_t *s)
   return s->size;
 }
 
-char *pn_symbol_name(pn_symbol_t *s)
+const char *pn_symbol_name(pn_symbol_t *s)
 {
   return s->name;
 }

Modified: qpid/proton/proton-c/src/types/value.c
URL: 
http://svn.apache.org/viewvc/qpid/proton/proton-c/src/types/value.c?rev=1299813&r1=1299812&r2=1299813&view=diff
==============================================================================
--- qpid/proton/proton-c/src/types/value.c (original)
+++ qpid/proton/proton-c/src/types/value.c Mon Mar 12 19:56:39 2012
@@ -176,6 +176,7 @@ static enum TYPE code_to_type(char c)
   case 'f': return FLOAT;
   case 'd': return DOUBLE;
   case 'C': return CHAR;
+  case 's': return SYMBOL;
   case 'S': return STRING;
   case 'z': return BINARY;
   case 't': return LIST;
@@ -250,6 +251,10 @@ int pn_vscan(pn_value_t *value, const ch
       value->type = CHAR;
       value->u.as_char = va_arg(ap, wchar_t);
       break;
+    case 's':
+      value->type = SYMBOL;
+      value->u.as_symbol = pn_symbol(va_arg(ap, char *));
+      break;
     case 'S':
       value->type = STRING;
       wchar_t *wcs = va_arg(ap, wchar_t *);
@@ -458,7 +463,8 @@ int pn_format_value(char **pos, char *li
       if ((e = pn_fmt(pos, limit, "%lc", v.u.as_char))) return e;
       break;
     case SYMBOL:
-      if ((e = pn_fmt(pos, limit, "%s", v.u.as_symbol))) return e;
+      if ((e = pn_format_symbol(pos, limit, v.u.as_symbol))) return e;
+      break;
     case STRING:
       if ((e = pn_fmt(pos, limit, "%ls", v.u.as_string->wcs))) return e;
       break;
@@ -544,6 +550,8 @@ size_t pn_format_sizeof(pn_value_t v)
   case ULONG:
   case DOUBLE:
     return 64;
+  case SYMBOL:
+    return v.u.as_symbol->size;
   case STRING:
     return 4*v.u.as_string->size;
   case BINARY:
@@ -703,7 +711,9 @@ void pn_free_value(pn_value_t v)
   case ULONG:
   case DOUBLE:
   case REF:
+    break;
   case SYMBOL:
+    pn_free_symbol(v.u.as_symbol);
     break;
   case STRING:
     pn_free_string(v.u.as_string);



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to