http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/0bdba37d/c/tests/pn_test_proactor.cpp
----------------------------------------------------------------------
diff --git a/c/tests/pn_test_proactor.cpp b/c/tests/pn_test_proactor.cpp
new file mode 100644
index 0000000..0c06f72
--- /dev/null
+++ b/c/tests/pn_test_proactor.cpp
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "./pn_test_proactor.hpp"
+#include "./thread.h"
+
+#include <proton/condition.h>
+#include <proton/connection.h>
+#include <proton/delivery.h>
+#include <proton/event.h>
+#include <proton/link.h>
+#include <proton/listener.h>
+#include <proton/message.h>
+#include <proton/netaddr.h>
+#include <proton/object.h>
+#include <proton/proactor.h>
+#include <proton/transport.h>
+
+namespace pn_test {
+
+std::string listening_port(pn_listener_t *l) {
+  const pn_netaddr_t *na = pn_listener_addr(l);
+  char port[PN_MAX_ADDR];
+  pn_netaddr_host_port(na, NULL, 0, port, sizeof(port));
+  return port;
+}
+
+proactor::proactor(struct handler *h)
+    : auto_free<pn_proactor_t, pn_proactor_free>(pn_proactor()), handler(h) {}
+
+bool proactor::dispatch(pn_event_t *e) {
+  void *ctx = NULL;
+  if (pn_event_listener(e))
+    ctx = pn_listener_get_context(pn_event_listener(e));
+  else if (pn_event_connection(e))
+    ctx = pn_connection_get_context(pn_event_connection(e));
+  struct handler *h = ctx ? reinterpret_cast<struct handler *>(ctx) : handler;
+  bool ret = h ? h->dispatch(e) : false;
+  //  pn_test::handler doesn't know about listeners so save listener condition
+  //  here.
+  if (pn_event_listener(e)) {
+    pn_condition_copy(h->last_condition,
+                      pn_listener_condition(pn_event_listener(e)));
+  }
+  return ret;
+}
+
+// RAII for event batches
+class auto_batch {
+  pn_proactor_t *p_;
+  pn_event_batch_t *b_;
+
+public:
+  auto_batch(pn_proactor_t *p, pn_event_batch_t *b) : p_(p), b_(b) {}
+  ~auto_batch() {
+    if (b_) pn_proactor_done(p_, b_);
+  }
+  pn_event_t *next() { return b_ ? pn_event_batch_next(b_) : NULL; }
+  pn_event_batch_t *get() { return b_; }
+};
+
+pn_event_type_t proactor::run(pn_event_type_t stop) {
+  // This will hang in the underlying poll if test expectations are never met.
+  // Not ideal, but easier to debug than race conditions caused by buggy
+  // test-harness code that attempts to detect the problem and recover early.
+  // The larger test or CI harness will kill us after some time limit
+  while (true) {
+    auto_batch b(*this, pn_proactor_wait(*this));
+    if (b.get()) {
+      pn_event_t *e;
+      while ((e = b.next())) {
+        pn_event_type_t et = pn_event_type(e);
+        if (dispatch(e) || et == stop) return et;
+      }
+    }
+  }
+}
+
+pn_event_type_t proactor::flush(pn_event_type_t stop) {
+  auto_batch b(*this, pn_proactor_get(*this));
+  if (b.get()) {
+    pn_event_t *e;
+    while ((e = b.next())) {
+      pn_event_type_t et = pn_event_type(e);
+      if (dispatch(e) || et == stop) return et;
+    }
+  }
+  return PN_EVENT_NONE;
+}
+
+pn_event_type_t proactor::corun(proactor &other, pn_event_type_t stop) {
+  // We can't wait() on either proactor as it might be idle until
+  // something happens on the other, so spin between the two for a limited
+  // number of attempts that should be large enough if the test is going to
+  // past.
+  int spin_limit = 1000;
+  while (spin_limit > 0) {
+    pn_event_type_t et = flush(stop);
+    if (et) return et;
+    other.flush();
+    --spin_limit;
+  }
+  return PN_EVENT_NONE;
+}
+
+pn_event_type_t proactor::wait_next() {
+  // pn_proactor_wait() should never return an empty batch, so we shouldn't 
need
+  // a loop here. Due to bug https://issues.apache.org/jira/browse/PROTON-1964
+  // we need to re-wait if we get an empty batch.
+  //
+  // To reproduce PROTON-1964 remove the loop below and run
+  // TEST_CASE("proactor_proton_1586") from proactor_test.cpp
+  //
+  // You will pn_proactor_wait() return a non-NULL batch, but the
+  // first call to pn_event_batch_next() returns a NULL event.
+  //
+  while (true) {
+    auto_batch b(*this, pn_proactor_wait(*this));
+    pn_event_t *e = b.next();
+    if (e) {
+      dispatch(e);
+      return pn_event_type(e);
+    } // Try again on an empty batch.
+  }
+}
+
+pn_listener_t *proactor::listen(const std::string &addr,
+                                struct handler *handler) {
+  pn_listener_t *l = pn_listener();
+  pn_listener_set_context(l, handler);
+  pn_proactor_listen(*this, l, addr.c_str(), 4);
+  return l;
+}
+
+pn_connection_t *proactor::connect(const std::string &addr, struct handler *h,
+                                   pn_connection_t *c) {
+  if (!c) c = pn_connection();
+  if (h) pn_connection_set_context(c, h);
+  pn_proactor_connect2(*this, c, NULL, addr.c_str());
+  return c;
+}
+
+} // namespace pn_test

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/0bdba37d/c/tests/pn_test_proactor.hpp
----------------------------------------------------------------------
diff --git a/c/tests/pn_test_proactor.hpp b/c/tests/pn_test_proactor.hpp
new file mode 100644
index 0000000..3703910
--- /dev/null
+++ b/c/tests/pn_test_proactor.hpp
@@ -0,0 +1,120 @@
+#ifndef TESTS_PN_TEST_PROACTOR_HPP
+#define TESTS_PN_TEST_PROACTOR_HPP
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/// @file
+///
+/// Wrapper for driving proactor tests.
+
+#include "./pn_test.hpp"
+#include <proton/proactor.h>
+
+namespace pn_test {
+
+// Get the listening port, l must be open.
+std::string listening_port(pn_listener_t *l);
+
+// Test proactor with an optional global handler.
+// For connection and listener events, if pn_*_get_context() is non-NULL
+// then it is cast to a handler and used instead of the global one.
+struct proactor : auto_free<pn_proactor_t, pn_proactor_free> {
+  struct handler *handler;
+
+  proactor(struct handler *h = 0);
+
+  // Listen on addr using optional listener handler lh
+  pn_listener_t *listen(const std::string &addr = ":0", struct handler *lh = 
0);
+
+  // Connect to addr use optional handler, and optionally providing the
+  // connection object.
+  pn_connection_t *connect(const std::string &addr, struct handler *h = 0,
+                           pn_connection_t *c = 0);
+
+  // Connect to listenr's address useing optional handler.
+  pn_connection_t *connect(pn_listener_t *l, struct handler *h = 0,
+                           pn_connection_t *c = 0) {
+    return connect(":" + pn_test::listening_port(l), h, c);
+  }
+
+  // Accept a connection, associate with optional connection handler.
+  pn_connection_t *accept(pn_listener_t *l, struct handler *h = 0);
+
+  // Wait for events and dispatch them until:
+  // * A handler returns true.
+  // * The `stop` event type is handled.
+  // Return the event-type of the last event handled or PN_EVENT_NONE
+  // if something went wrong.
+  pn_event_type_t run(pn_event_type_t stop = PN_EVENT_NONE);
+
+  // Dispatch immediately-available events until:
+  // * A handler returns true.
+  // * The `stop` event type is handled.
+  // * All available events are flushed.
+  //
+  // Return PN_EVENT_NONE if all events were flushed, the event-type of the 
last
+  // event handled otherwise.
+  pn_event_type_t flush(pn_event_type_t stop = PN_EVENT_NONE);
+
+  // Alternate flushing this proactor and `other` until
+  // * A handler on this proactor returns true.
+  // * The `stop` event type is handled by this proactor.
+  // Return the event-type of the last event handled or PN_EVENT_NONE
+  // if something went wrong.
+  pn_event_type_t corun(proactor &other, pn_event_type_t stop = PN_EVENT_NONE);
+
+  // Wait for and handle a single event, return it's type.
+  pn_event_type_t wait_next();
+
+private:
+  bool dispatch(pn_event_t *e);
+};
+
+// CHECK/REQUIRE macros to run a proactor up to an expected event and
+// include the last condition in the error message if the expected event is not
+// returned.
+
+#define CHECK_RUN(P, E)                                                        
\
+  CHECKED_IF((E) == (P).run(E)) {}                                             
\
+  else if ((P).handler) {                                                      
\
+    FAIL_CHECK(*(P).handler->last_condition);                                  
\
+  }
+
+#define REQUIRE_RUN(P, E)                                                      
\
+  CHECKED_IF((E) == (P).run(E)) {}                                             
\
+  else if ((P).handler) {                                                      
\
+    FAIL(*(P).handler->last_condition);                                        
\
+  }
+
+#define CHECK_CORUN(P, O, E)                                                   
\
+  CHECKED_IF((E) == (P).corun(O, E)) {}                                        
\
+  else if ((P).handler) {                                                      
\
+    FAIL_CHECK(*(P).handler->last_condition);                                  
\
+  }
+
+#define REQUIRE_CORUN(P, E)                                                    
\
+  CHECKED_IF((E) == (P).corun(O, E)) {}                                        
\
+  else if ((P).handler_) {                                                     
\
+    FAIL(*(P).handler_->last_condition);                                       
\
+  }
+
+} // namespace pn_test
+
+#endif // TESTS_PN_TEST_PROACTOR_HPP

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/0bdba37d/c/tests/pn_test_test.cpp
----------------------------------------------------------------------
diff --git a/c/tests/pn_test_test.cpp b/c/tests/pn_test_test.cpp
new file mode 100644
index 0000000..1d51a43
--- /dev/null
+++ b/c/tests/pn_test_test.cpp
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+// Tests for the test framework
+
+#include "./pn_test.hpp"
+
+#include <proton/condition.h>
+#include <proton/error.h>
+#include <proton/event.h>
+
+using namespace pn_test;
+
+TEST_CASE("test_stringify") {
+  std::ostringstream o;
+  SECTION("event_type") {
+    o << PN_CONNECTION_INIT;
+    CHECK(o.str() == "PN_CONNECTION_INIT");
+    CHECK(Catch::toString(PN_CONNECTION_INIT) == "PN_CONNECTION_INIT");
+  }
+  SECTION("condition") {
+    pn_condition_t *c = pn_condition();
+    SECTION("empty") {
+      o << *c;
+      CHECK(o.str() == "pn_condition{}");
+      CHECK_THAT(*c, cond_empty());
+    }
+    SECTION("name-desc") {
+      pn_condition_set_name(c, "foo");
+      pn_condition_set_description(c, "bar");
+      o << *c;
+      CHECK(o.str() == "pn_condition{\"foo\", \"bar\"}");
+      CHECK_THAT(*c, cond_matches("foo", "bar"));
+      CHECK_THAT(*c, !cond_empty());
+    }
+    SECTION("desc-only") {
+      pn_condition_set_name(c, "foo");
+      o << *c;
+      CHECK(o.str() == "pn_condition{\"foo\", null}");
+      CHECK_THAT(*c, cond_matches("foo"));
+    }
+    pn_condition_free(c);
+  }
+  SECTION("error") {
+    pn_error_t *err = pn_error();
+    pn_error_format(err, PN_EOS, "foo");
+    o << *err;
+    pn_error_free(err);
+    CHECK(o.str() == "pn_error{PN_EOS, \"foo\"}");
+  }
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/0bdba37d/c/tests/proactor.c
----------------------------------------------------------------------
diff --git a/c/tests/proactor.c b/c/tests/proactor.c
deleted file mode 100644
index 9078ffc..0000000
--- a/c/tests/proactor.c
+++ /dev/null
@@ -1,1107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include "test_tools.h"
-#include "test_handler.h"
-#include "test_config.h"
-#include "../src/proactor/proactor-internal.h"
-
-#include <proton/condition.h>
-#include <proton/connection.h>
-#include <proton/event.h>
-#include <proton/listener.h>
-#include <proton/session.h>
-#include <proton/netaddr.h>
-#include <proton/proactor.h>
-#include <proton/ssl.h>
-#include <proton/transport.h>
-#include <stdio.h>
-#include <stdlib.h>
-#include <string.h>
-
-#define ARRAYLEN(A) (sizeof(A)/sizeof((A)[0]))
-
-/* Proactor and handler that take part in a test */
-typedef struct test_proactor_t {
-  test_handler_t handler;
-  pn_proactor_t *proactor;
-} test_proactor_t;
-
-static test_proactor_t test_proactor(test_t *t, test_handler_fn f) {
-  test_proactor_t tp;
-  test_handler_init(&tp.handler, t, f);
-  tp.proactor = pn_proactor();
-  TEST_ASSERT(tp.proactor);
-  return tp;
-}
-
-static void test_proactor_destroy(test_proactor_t *tp) {
-  pn_proactor_free(tp->proactor);
-}
-
-/* Set this to a pn_condition() to save condition data */
-pn_condition_t *last_condition = NULL;
-
-static void save_condition(pn_event_t *e) {
-  if (last_condition) {
-    pn_condition_t *cond = NULL;
-    if (pn_event_listener(e)) {
-      cond = pn_listener_condition(pn_event_listener(e));
-    } else {
-      cond = pn_event_condition(e);
-    }
-    if (cond) {
-      pn_condition_copy(last_condition, cond);
-    } else {
-      pn_condition_clear(last_condition);
-    }
-  }
-}
-
-/* Process events on a proactor array until a handler returns an event, or
- * all proactors return NULL
- */
-static pn_event_type_t test_proactors_get(test_proactor_t *tps, size_t n) {
-  if (last_condition) pn_condition_clear(last_condition);
-  while (true) {
-    bool busy = false;
-    for (test_proactor_t *tp = tps; tp < tps + n; ++tp) {
-      pn_event_batch_t *eb =  pn_proactor_get(tp->proactor);
-      if (eb) {
-        busy = true;
-        pn_event_type_t ret = PN_EVENT_NONE;
-        for (pn_event_t* e = pn_event_batch_next(eb); e; e = 
pn_event_batch_next(eb)) {
-          test_handler_log(&tp->handler, e);
-          save_condition(e);
-          ret = tp->handler.f(&tp->handler, e);
-          if (ret) break;
-        }
-        pn_proactor_done(tp->proactor, eb);
-        if (ret) return ret;
-      }
-    }
-    if (!busy) {
-      return PN_EVENT_NONE;
-    }
-  }
-}
-
-/* Run an array of proactors till a handler returns an event. */
-static pn_event_type_t test_proactors_run(test_proactor_t *tps, size_t n) {
-  pn_event_type_t e;
-  while ((e = test_proactors_get(tps, n)) == PN_EVENT_NONE)
-         ;
-  return e;
-}
-
-/* Run an array of proactors till a handler returns the desired event. */
-void test_proactors_run_until(test_proactor_t *tps, size_t n, pn_event_type_t 
want) {
-  while (test_proactors_get(tps, n) != want)
-         ;
-}
-
-/* Drain and discard outstanding events from an array of proactors */
-static void test_proactors_drain(test_proactor_t *tps, size_t n) {
-  while (test_proactors_get(tps, n))
-         ;
-}
-
-
-#define TEST_PROACTORS_GET(A) test_proactors_get((A), ARRAYLEN(A))
-#define TEST_PROACTORS_RUN(A) test_proactors_run((A), ARRAYLEN(A))
-#define TEST_PROACTORS_RUN_UNTIL(A, WANT) test_proactors_run_until((A), 
ARRAYLEN(A), WANT)
-#define TEST_PROACTORS_DRAIN(A) test_proactors_drain((A), ARRAYLEN(A))
-
-#define TEST_PROACTORS_DESTROY(A) do {           \
-    for (size_t i = 0; i < ARRAYLEN(A); ++i)       \
-      test_proactor_destroy((A)+i);             \
-  } while (0)
-
-
-#define MAX_STR 256
-struct addrinfo {
-  char host[MAX_STR];
-  char port[MAX_STR];
-  char connect[MAX_STR];
-  char host_port[MAX_STR];
-};
-
-struct addrinfo listener_info(pn_listener_t *l) {
-  struct addrinfo ai = {{0}};
-  const pn_netaddr_t *na = pn_listener_addr(l);
-  TEST_ASSERT(0 == pn_netaddr_host_port(na, ai.host, sizeof(ai.host), ai.port, 
sizeof(ai.port)));
-  for (na = pn_netaddr_next(na); na; na = pn_netaddr_next(na)) { /* Check that 
ports are consistent */
-    char port[MAX_STR];
-    TEST_ASSERT(0 == pn_netaddr_host_port(na, NULL, 0, port, sizeof(port)));
-    TEST_ASSERTF(0 == strcmp(port, ai.port), "%s !=  %s", port, ai.port);
-  }
-  (void)pn_proactor_addr(ai.connect, sizeof(ai.connect), "", ai.port); /* 
Address for connecting */
-  (void)pn_netaddr_str(na, ai.host_port, sizeof(ai.host_port)); /* host:port 
listening address */
-  return ai;
-}
-
-/* Return a pn_listener_t*, raise errors if not successful */
-pn_listener_t *test_listen(test_proactor_t *tp, const char *host) {
-  char addr[1024];
-  pn_listener_t *l = pn_listener();
-  (void)pn_proactor_addr(addr, sizeof(addr), host, "0");
-  pn_proactor_listen(tp->proactor, l, addr, 4);
-  TEST_ETYPE_EQUAL(tp->handler.t, PN_LISTENER_OPEN, test_proactors_run(tp, 1));
-  TEST_COND_EMPTY(tp->handler.t, last_condition);
-  return l;
-}
-
-
-/* Wait for the next single event, return its type */
-static pn_event_type_t wait_next(pn_proactor_t *proactor) {
-  pn_event_batch_t *events = pn_proactor_wait(proactor);
-  pn_event_type_t etype = pn_event_type(pn_event_batch_next(events));
-  pn_proactor_done(proactor, events);
-  return etype;
-}
-
-/* Test that interrupt and timeout events cause pn_proactor_wait() to return. 
*/
-static void test_interrupt_timeout(test_t *t) {
-  pn_proactor_t *p = pn_proactor();
-  TEST_CHECK(t, pn_proactor_get(p) == NULL); /* idle */
-  pn_proactor_interrupt(p);
-  TEST_ETYPE_EQUAL(t, PN_PROACTOR_INTERRUPT, wait_next(p));
-  TEST_CHECK(t, pn_proactor_get(p) == NULL); /* idle */
-
-  /* Set an immediate timeout */
-  pn_proactor_set_timeout(p, 0);
-  TEST_ETYPE_EQUAL(t, PN_PROACTOR_TIMEOUT, wait_next(p));
-  TEST_ETYPE_EQUAL(t, PN_PROACTOR_INACTIVE, wait_next(p)); /* Inactive because 
timeout expired */
-
-  /* Set a (very short) timeout */
-  pn_proactor_set_timeout(p, 1);
-  TEST_ETYPE_EQUAL(t, PN_PROACTOR_TIMEOUT, wait_next(p));
-  TEST_ETYPE_EQUAL(t, PN_PROACTOR_INACTIVE, wait_next(p));
-
-  /* Set and cancel a timeout, make sure we don't get the timeout event */
-  pn_proactor_set_timeout(p, 10000000);
-  pn_proactor_cancel_timeout(p);
-  TEST_ETYPE_EQUAL(t, PN_PROACTOR_INACTIVE, wait_next(p));
-  TEST_CHECK(t, pn_proactor_get(p) == NULL); /* idle */
-
-  pn_proactor_free(p);
-}
-
-/* Save the last connection accepted by the common_handler */
-pn_connection_t *last_accepted = NULL;
-
-/* Common handler for simple client/server interactions,  */
-static pn_event_type_t common_handler(test_handler_t *th, pn_event_t *e) {
-  pn_connection_t *c = pn_event_connection(e);
-  pn_listener_t *l = pn_event_listener(e);
-
-  switch (pn_event_type(e)) {
-
-    /* Stop on these events */
-   case PN_TRANSPORT_ERROR:
-   case PN_TRANSPORT_CLOSED:
-   case PN_PROACTOR_INACTIVE:
-   case PN_PROACTOR_TIMEOUT:
-   case PN_LISTENER_OPEN:
-    return pn_event_type(e);
-
-   case PN_LISTENER_ACCEPT:
-    last_accepted = pn_connection();
-    pn_listener_accept2(l, last_accepted, NULL);
-    pn_listener_close(l);       /* Only accept one connection */
-    return PN_EVENT_NONE;
-
-   case PN_CONNECTION_REMOTE_OPEN:
-    pn_connection_open(c);      /* Return the open (no-op if already open) */
-    return PN_EVENT_NONE;
-
-   case PN_SESSION_REMOTE_OPEN:
-    pn_session_open(pn_event_session(e));
-    return PN_EVENT_NONE;
-
-   case PN_LINK_REMOTE_OPEN:
-    pn_link_open(pn_event_link(e));
-    return PN_EVENT_NONE;
-
-   case PN_CONNECTION_REMOTE_CLOSE:
-    pn_connection_close(c);     /* Return the close */
-    return PN_EVENT_NONE;
-
-    /* Ignore these events */
-   case PN_CONNECTION_BOUND:
-   case PN_CONNECTION_INIT:
-   case PN_CONNECTION_LOCAL_CLOSE:
-   case PN_CONNECTION_LOCAL_OPEN:
-   case PN_LINK_INIT:
-   case PN_LINK_LOCAL_OPEN:
-   case PN_LISTENER_CLOSE:
-   case PN_SESSION_INIT:
-   case PN_SESSION_LOCAL_OPEN:
-   case PN_TRANSPORT:
-   case PN_TRANSPORT_HEAD_CLOSED:
-   case PN_TRANSPORT_TAIL_CLOSED:
-    return PN_EVENT_NONE;
-
-   default:
-    TEST_ERRORF(th->t, "unexpected event %s", 
pn_event_type_name(pn_event_type(e)));
-    return PN_EVENT_NONE;                   /* Fail the test but keep going */
-  }
-}
-
-/* Like common_handler but does not auto-close the listener after one accept,
-   and returns on LISTENER_CLOSE
-*/
-static pn_event_type_t listen_handler(test_handler_t *th, pn_event_t *e) {
-  switch (pn_event_type(e)) {
-   case PN_LISTENER_ACCEPT:
-    /* No automatic listener close/free for tests that accept multiple 
connections */
-    last_accepted = pn_connection();
-    pn_listener_accept2(pn_event_listener(e), last_accepted, NULL);
-    /* No automatic close */
-    return PN_EVENT_NONE;
-
-   case PN_LISTENER_CLOSE:
-    return PN_LISTENER_CLOSE;
-
-   default:
-    return common_handler(th, e);
-  }
-}
-
-/* close a connection when it is remote open */
-static pn_event_type_t open_close_handler(test_handler_t *th, pn_event_t *e) {
-  switch (pn_event_type(e)) {
-   case PN_CONNECTION_REMOTE_OPEN:
-    pn_connection_close(pn_event_connection(e));
-    return PN_EVENT_NONE;          /* common_handler will finish on 
TRANSPORT_CLOSED */
-   default:
-    return common_handler(th, e);
-  }
-}
-
-/* Test simple client/server connection with 2 proactors */
-static void test_client_server(test_t *t) {
-  test_proactor_t tps[] ={ test_proactor(t, open_close_handler), 
test_proactor(t, common_handler) };
-  pn_listener_t *l = test_listen(&tps[1], "");
-  /* Connect and wait for close at both ends */
-  pn_proactor_connect2(tps[0].proactor, NULL, NULL, listener_info(l).connect);
-  TEST_PROACTORS_RUN_UNTIL(tps, PN_TRANSPORT_CLOSED);
-  TEST_PROACTORS_RUN_UNTIL(tps, PN_TRANSPORT_CLOSED);  
-  TEST_PROACTORS_DESTROY(tps);
-}
-
-/* Return on connection open, close and return on wake */
-static pn_event_type_t open_wake_handler(test_handler_t *th, pn_event_t *e) {
-  switch (pn_event_type(e)) {
-   case PN_CONNECTION_REMOTE_OPEN:
-    return pn_event_type(e);
-   case PN_CONNECTION_WAKE:
-    pn_connection_close(pn_event_connection(e));
-    return pn_event_type(e);
-   default:
-    return common_handler(th, e);
-  }
-}
-
-/* Test waking up a connection that is idle */
-static void test_connection_wake(test_t *t) {
-  test_proactor_t tps[] =  { test_proactor(t, open_wake_handler), 
test_proactor(t,  listen_handler) };
-  pn_proactor_t *client = tps[0].proactor;
-  pn_listener_t *l = test_listen(&tps[1], "");
-
-  pn_connection_t *c = pn_connection();
-  pn_incref(c);                 /* Keep a reference for wake() after free */
-  pn_proactor_connect2(client, c, NULL, listener_info(l).connect);
-  TEST_ETYPE_EQUAL(t, PN_CONNECTION_REMOTE_OPEN, TEST_PROACTORS_RUN(tps));
-  TEST_CHECK(t, pn_proactor_get(client) == NULL); /* Should be idle */
-  pn_connection_wake(c);
-  TEST_ETYPE_EQUAL(t, PN_CONNECTION_WAKE, TEST_PROACTORS_RUN(tps));
-  TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, TEST_PROACTORS_RUN(tps));
-  TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, TEST_PROACTORS_RUN(tps)); /* Both 
ends */
-  /* The pn_connection_t is still valid so wake is legal but a no-op */
-  TEST_ETYPE_EQUAL(t, PN_PROACTOR_INACTIVE, TEST_PROACTORS_RUN(tps));
-  TEST_ETYPE_EQUAL(t, PN_EVENT_NONE, TEST_PROACTORS_GET(tps)); /* No more wake 
*/
-
-  /* Verify we don't get a wake after close even if they happen together */
-  pn_connection_t *c2 = pn_connection();
-  pn_proactor_connect2(client, c2, NULL, listener_info(l).connect);
-  TEST_ETYPE_EQUAL(t, PN_CONNECTION_REMOTE_OPEN, TEST_PROACTORS_RUN(tps));
-  pn_connection_wake(c2);
-  pn_proactor_disconnect(client, NULL);
-  pn_connection_wake(c2);
-
-  TEST_ETYPE_EQUAL(t, PN_TRANSPORT_ERROR, test_proactors_run(&tps[0], 1));
-  TEST_CONDITION(t, "amqp:connection:framing-error", "connection aborted", 
last_condition);
-  TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, test_proactors_run(&tps[0], 1));
-  TEST_ETYPE_EQUAL(t, PN_PROACTOR_INACTIVE, test_proactors_run(&tps[0], 1));
-  TEST_ETYPE_EQUAL(t, PN_EVENT_NONE, test_proactors_get(&tps[0], 1)); /* No 
late wake */
-
-  TEST_PROACTORS_DESTROY(tps);
-  /* The pn_connection_t is still valid so wake is legal but a no-op */
-  pn_connection_wake(c);
-  pn_decref(c);
-}
-
-/* Close the transport to abort a connection, i.e. close the socket without an 
AMQP close */
-static pn_event_type_t listen_abort_handler(test_handler_t *th, pn_event_t *e) 
{
-  switch (pn_event_type(e)) {
-   case PN_CONNECTION_REMOTE_OPEN:
-    /* Close the transport - abruptly closes the socket */
-    pn_transport_close_tail(pn_connection_transport(pn_event_connection(e)));
-    pn_transport_close_head(pn_connection_transport(pn_event_connection(e)));
-    return PN_EVENT_NONE;
-
-   default:
-    /* Don't auto-close the listener to keep the event sequences simple */
-    return listen_handler(th, e);
-  }
-}
-
-/* Verify that pn_transport_close_head/tail aborts a connection without an 
AMQP protocol close */
-static void test_abort(test_t *t) {
-  test_proactor_t tps[] = { test_proactor(t, open_close_handler), 
test_proactor(t, listen_abort_handler) };
-  pn_proactor_t *client = tps[0].proactor;
-  pn_listener_t *l = test_listen(&tps[1], "");
-  pn_proactor_connect2(client, NULL, NULL, listener_info(l).connect);
-
-  /* server transport closes */
-  if (TEST_ETYPE_EQUAL(t, PN_TRANSPORT_ERROR, TEST_PROACTORS_RUN(tps))) {
-    TEST_CONDITION(t, "amqp:connection:framing-error", "abort", 
last_condition);
-    TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, TEST_PROACTORS_RUN(tps));
-  }
-  /* client transport closes */
-  if (TEST_ETYPE_EQUAL(t, PN_TRANSPORT_ERROR, TEST_PROACTORS_RUN(tps))) {
-    TEST_CONDITION(t, "amqp:connection:framing-error", "abort", 
last_condition);
-    TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, TEST_PROACTORS_RUN(tps));
-  }
-
-  pn_listener_close(l);
-
-  while (TEST_PROACTORS_RUN(tps) != PN_PROACTOR_INACTIVE) {}
-  while (TEST_PROACTORS_RUN(tps) != PN_PROACTOR_INACTIVE) {}
-
-  /* Verify expected event sequences, no unexpected events */
-  TEST_HANDLER_EXPECT(
-    &tps[0].handler,
-    PN_CONNECTION_INIT, PN_CONNECTION_LOCAL_OPEN, PN_CONNECTION_BOUND,
-    PN_TRANSPORT_TAIL_CLOSED, PN_TRANSPORT_ERROR, PN_TRANSPORT_HEAD_CLOSED, 
PN_TRANSPORT_CLOSED,
-    PN_PROACTOR_INACTIVE,
-    0);
-
-  TEST_HANDLER_EXPECT(
-    &tps[1].handler,
-    PN_LISTENER_OPEN, PN_LISTENER_ACCEPT,
-    PN_CONNECTION_INIT, PN_CONNECTION_BOUND, PN_CONNECTION_REMOTE_OPEN,
-    PN_TRANSPORT_TAIL_CLOSED, PN_TRANSPORT_ERROR, PN_TRANSPORT_HEAD_CLOSED, 
PN_TRANSPORT_CLOSED,
-    PN_LISTENER_CLOSE,
-    PN_PROACTOR_INACTIVE,
-    0);
-
-  TEST_PROACTORS_DESTROY(tps);
-}
-
-/* Refuse a connection: abort before the AMQP open sequence begins. */
-static pn_event_type_t listen_refuse_handler(test_handler_t *th, pn_event_t 
*e) {
-  switch (pn_event_type(e)) {
-
-   case PN_CONNECTION_BOUND:
-    /* Close the transport - abruptly closes the socket */
-    pn_transport_close_tail(pn_connection_transport(pn_event_connection(e)));
-    pn_transport_close_head(pn_connection_transport(pn_event_connection(e)));
-    return PN_EVENT_NONE;
-
-   default:
-    /* Don't auto-close the listener to keep the event sequences simple */
-    return listen_handler(th, e);
-  }
-}
-
-/* Verify that pn_transport_close_head/tail aborts a connection without an 
AMQP protocol close */
-static void test_refuse(test_t *t) {
-  test_proactor_t tps[] = { test_proactor(t, open_close_handler), 
test_proactor(t, listen_refuse_handler) };
-  pn_proactor_t *client = tps[0].proactor;
-  pn_listener_t *l = test_listen(&tps[1], "");
-  pn_proactor_connect2(client, NULL, NULL, listener_info(l).connect);
-
-  /* client transport closes */
-  TEST_ETYPE_EQUAL(t, PN_TRANSPORT_ERROR, TEST_PROACTORS_RUN(tps)); /* client 
*/
-  TEST_COND_NAME(t, "amqp:connection:framing-error", last_condition);
-
-  pn_listener_close(l);
-  TEST_PROACTORS_RUN_UNTIL(tps, PN_PROACTOR_INACTIVE);
-  TEST_PROACTORS_RUN_UNTIL(tps, PN_PROACTOR_INACTIVE);
-
-  /* Verify expected event sequences, no unexpected events */
-  TEST_HANDLER_EXPECT(
-    &tps[0].handler,
-    PN_CONNECTION_INIT, PN_CONNECTION_LOCAL_OPEN, PN_CONNECTION_BOUND,
-    PN_TRANSPORT_TAIL_CLOSED, PN_TRANSPORT_ERROR, PN_TRANSPORT_HEAD_CLOSED, 
PN_TRANSPORT_CLOSED,
-    PN_PROACTOR_INACTIVE,
-    0);
-
-  TEST_HANDLER_EXPECT(
-    &tps[1].handler,
-    PN_LISTENER_OPEN, PN_LISTENER_ACCEPT,
-    PN_CONNECTION_INIT, PN_CONNECTION_BOUND,
-    PN_TRANSPORT_TAIL_CLOSED, PN_TRANSPORT_ERROR, PN_TRANSPORT_HEAD_CLOSED, 
PN_TRANSPORT_CLOSED,
-    PN_LISTENER_CLOSE,
-    PN_PROACTOR_INACTIVE,
-    0);
-
-  TEST_PROACTORS_DESTROY(tps);
-}
-
-/* Test that INACTIVE event is generated when last connections/listeners 
closes. */
-static void test_inactive(test_t *t) {
-  test_proactor_t tps[] =  { test_proactor(t, open_wake_handler), 
test_proactor(t, listen_handler) };
-  pn_proactor_t *client = tps[0].proactor, *server = tps[1].proactor;
-
-  /* Listen, connect, disconnect */
-  pn_listener_t *l = test_listen(&tps[1], "");
-  pn_connection_t *c = pn_connection();
-  pn_proactor_connect2(client, c, NULL, listener_info(l).connect);
-  TEST_ETYPE_EQUAL(t, PN_CONNECTION_REMOTE_OPEN, TEST_PROACTORS_RUN(tps));
-  pn_connection_wake(c);
-  TEST_ETYPE_EQUAL(t, PN_CONNECTION_WAKE, TEST_PROACTORS_RUN(tps));
-  /* Expect TRANSPORT_CLOSED from client and server, INACTIVE from client */
-  TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, TEST_PROACTORS_RUN(tps));
-  TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, TEST_PROACTORS_RUN(tps));
-  TEST_ETYPE_EQUAL(t, PN_PROACTOR_INACTIVE, TEST_PROACTORS_RUN(tps));
-
-  /* Immediate timer generates INACTIVE on client (no connections) */
-  pn_proactor_set_timeout(client, 0);
-  TEST_ETYPE_EQUAL(t, PN_PROACTOR_TIMEOUT, TEST_PROACTORS_RUN(tps));
-  TEST_ETYPE_EQUAL(t, PN_PROACTOR_INACTIVE, TEST_PROACTORS_RUN(tps));
-
-  /* Connect, set-timer, disconnect */
-  pn_proactor_set_timeout(client, 1000000);
-  c = pn_connection();
-  pn_proactor_connect2(client, c, NULL, listener_info(l).connect);
-  TEST_ETYPE_EQUAL(t, PN_CONNECTION_REMOTE_OPEN, TEST_PROACTORS_RUN(tps));
-  pn_connection_wake(c);
-  TEST_ETYPE_EQUAL(t, PN_CONNECTION_WAKE, TEST_PROACTORS_RUN(tps));
-  /* Expect TRANSPORT_CLOSED from client and server */
-  TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, TEST_PROACTORS_RUN(tps));
-  TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, TEST_PROACTORS_RUN(tps));
-  /* No INACTIVE till timer is cancelled */
-  TEST_CHECK(t, pn_proactor_get(server) == NULL);
-  pn_proactor_cancel_timeout(client);
-  TEST_ETYPE_EQUAL(t, PN_PROACTOR_INACTIVE, TEST_PROACTORS_RUN(tps));
-
-  /* Server won't be INACTIVE until listener is closed */
-  TEST_CHECK(t, pn_proactor_get(server) == NULL);
-  pn_listener_close(l);
-  TEST_ETYPE_EQUAL(t, PN_LISTENER_CLOSE, TEST_PROACTORS_RUN(tps));
-  TEST_ETYPE_EQUAL(t, PN_PROACTOR_INACTIVE, TEST_PROACTORS_RUN(tps));
-
-  TEST_PROACTORS_DESTROY(tps);
-}
-
-/* Tests for error handling */
-static void test_errors(test_t *t) {
-  test_proactor_t tps[] =  { test_proactor(t, open_wake_handler), 
test_proactor(t, listen_handler) };
-  pn_proactor_t *client = tps[0].proactor, *server = tps[1].proactor;
-
-  /* Invalid connect/listen service name */
-  pn_connection_t *c = pn_connection();
-  pn_proactor_connect2(client, c, NULL, "127.0.0.1:xxx");
-  TEST_ETYPE_EQUAL(t, PN_TRANSPORT_ERROR, TEST_PROACTORS_RUN(tps));
-  TEST_COND_DESC(t, "xxx", last_condition);
-  TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, TEST_PROACTORS_RUN(tps));
-  TEST_ETYPE_EQUAL(t, PN_PROACTOR_INACTIVE, TEST_PROACTORS_RUN(tps));
-
-  pn_proactor_listen(server, pn_listener(), "127.0.0.1:xxx", 1);
-  TEST_PROACTORS_RUN(tps);
-  TEST_HANDLER_EXPECT(&tps[1].handler, PN_LISTENER_CLOSE, 0); /* CLOSE only, 
no OPEN */
-  TEST_COND_DESC(t, "xxx", last_condition);
-  TEST_ETYPE_EQUAL(t, PN_PROACTOR_INACTIVE, TEST_PROACTORS_RUN(tps));
-
-  /* Invalid connect/listen host name */
-  c = pn_connection();
-  pn_proactor_connect2(client, c, NULL, "nosuch.example.com:");
-  TEST_ETYPE_EQUAL(t, PN_TRANSPORT_ERROR, TEST_PROACTORS_RUN(tps));
-  TEST_COND_DESC(t, "nosuch", last_condition);
-  TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, TEST_PROACTORS_RUN(tps));
-  TEST_ETYPE_EQUAL(t, PN_PROACTOR_INACTIVE, TEST_PROACTORS_RUN(tps));
-
-  test_handler_clear(&tps[1].handler, 0);
-  pn_proactor_listen(server, pn_listener(), "nosuch.example.com:", 1);
-  TEST_PROACTORS_RUN(tps);
-  TEST_HANDLER_EXPECT(&tps[1].handler, PN_LISTENER_CLOSE, 0); /* CLOSE only, 
no OPEN */
-  TEST_COND_DESC(t, "nosuch", last_condition);
-  TEST_ETYPE_EQUAL(t, PN_PROACTOR_INACTIVE, TEST_PROACTORS_RUN(tps));
-
-  /* Listen on a port already in use */
-  pn_listener_t *l = pn_listener();
-  pn_proactor_listen(server, l, ":0", 1);
-  TEST_ETYPE_EQUAL(t, PN_LISTENER_OPEN, TEST_PROACTORS_RUN(tps));
-  test_handler_clear(&tps[1].handler, 0);
-  struct addrinfo laddr = listener_info(l);
-  pn_proactor_listen(server, pn_listener(), laddr.connect, 1); /* Busy */
-  TEST_PROACTORS_RUN(tps);
-  TEST_HANDLER_EXPECT(&tps[1].handler, PN_LISTENER_CLOSE, 0); /* CLOSE only, 
no OPEN */
-  TEST_COND_NAME(t, "proton:io", last_condition);
-  pn_listener_close(l);
-  TEST_ETYPE_EQUAL(t, PN_LISTENER_CLOSE, TEST_PROACTORS_RUN(tps));
-  TEST_ETYPE_EQUAL(t, PN_PROACTOR_INACTIVE, TEST_PROACTORS_RUN(tps));
-
-  /* Connect with no listener */
-  c = pn_connection();
-  pn_proactor_connect2(client, c, NULL, laddr.connect);
-  if (TEST_ETYPE_EQUAL(t, PN_TRANSPORT_ERROR, TEST_PROACTORS_RUN(tps))) {
-    TEST_COND_DESC(t, "refused", last_condition);
-    TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, TEST_PROACTORS_RUN(tps));
-    TEST_ETYPE_EQUAL(t, PN_PROACTOR_INACTIVE, TEST_PROACTORS_RUN(tps));
-  }
-
-  TEST_PROACTORS_DESTROY(tps);
-}
-
-/* Closing the connection during PN_TRANSPORT_ERROR should be a no-op
- * Regression test for: https://issues.apache.org/jira/browse/PROTON-1586
- */
-static pn_event_type_t transport_close_connection_handler(test_handler_t *th, 
pn_event_t *e) {
-  switch (pn_event_type(e)) {
-   case PN_TRANSPORT_ERROR:
-    pn_connection_close(pn_event_connection(e));
-    break;
-   default:
-    return open_wake_handler(th, e);
-  }
-  return PN_EVENT_NONE;
-}
-
-/* Closing the connection during PN_TRANSPORT_ERROR due to connection failure 
should be a no-op
- * Regression test for: https://issues.apache.org/jira/browse/PROTON-1586
- */
-static void test_proton_1586(test_t *t) {
-  test_proactor_t tps[] =  { test_proactor(t, 
transport_close_connection_handler) };
-  pn_proactor_connect2(tps[0].proactor, NULL, NULL, ":yyy");
-  TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, TEST_PROACTORS_RUN(tps));
-  TEST_COND_DESC(t, ":yyy", last_condition);
-  test_handler_clear(&tps[0].handler, 0); /* Clear events */
-  /* There should be no events generated after PN_TRANSPORT_CLOSED */
-  TEST_ETYPE_EQUAL(t, PN_PROACTOR_INACTIVE, TEST_PROACTORS_RUN(tps));
-  TEST_HANDLER_EXPECT(&tps[0].handler,PN_PROACTOR_INACTIVE, 0);
-
-  TEST_PROACTORS_DESTROY(tps);
-}
-
-/* Test that we can control listen/select on ipv6/v4 and listen on both by 
default */
-static void test_ipv4_ipv6(test_t *t) {
-  test_proactor_t tps[] ={ test_proactor(t, open_close_handler), 
test_proactor(t, listen_handler) };
-  pn_proactor_t *client = tps[0].proactor, *server = tps[1].proactor;
-
-  /* Listen on all interfaces for IPv4 only. */
-  pn_listener_t *l4 = test_listen(&tps[1], "0.0.0.0");
-  TEST_PROACTORS_DRAIN(tps);
-
-  /* Empty address listens on both IPv4 and IPv6 on all interfaces */
-  pn_listener_t *l = test_listen(&tps[1], "");
-  TEST_PROACTORS_DRAIN(tps);
-
-#define EXPECT_CONNECT(LISTENER, HOST) do {                             \
-    char addr[1024];                                                    \
-    pn_proactor_addr(addr, sizeof(addr), HOST, listener_info(LISTENER).port); \
-    pn_proactor_connect2(client, NULL, NULL, addr);                     \
-    TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, TEST_PROACTORS_RUN(tps));  \
-    TEST_COND_EMPTY(t, last_condition);                                 \
-    TEST_PROACTORS_DRAIN(tps);                                          \
-  } while(0)
-
-  EXPECT_CONNECT(l4, "127.0.0.1"); /* v4->v4 */
-  EXPECT_CONNECT(l4, "");          /* local->v4*/
-
-  EXPECT_CONNECT(l, "127.0.0.1"); /* v4->all */
-  EXPECT_CONNECT(l, "");          /* local->all */
-
-  /* Listen on ipv6 loopback, if it fails skip ipv6 tests.
-
-     NOTE: Don't use the unspecified address "::" here - ipv6-disabled 
platforms
-     may allow listening on "::" without complaining. However they won't have a
-     local ipv6 loopback configured, so "::1" will force an error.
-  */
-  TEST_PROACTORS_DRAIN(tps);
-  pn_listener_t *l6 = pn_listener();
-  pn_proactor_listen(server, l6, "::1:0", 4);
-  pn_event_type_t e = TEST_PROACTORS_RUN(tps);
-  if (e == PN_LISTENER_OPEN && !pn_condition_is_set(last_condition)) {
-    TEST_PROACTORS_DRAIN(tps);
-
-    EXPECT_CONNECT(l6, "::1"); /* v6->v6 */
-    EXPECT_CONNECT(l6, "");    /* local->v6 */
-    EXPECT_CONNECT(l, "::1");  /* v6->all */
-
-    pn_listener_close(l6);
-  } else  {
-    const char *d = pn_condition_get_description(last_condition);
-    TEST_LOGF(t, "skip IPv6 tests: %s %s", pn_event_type_name(e), d ? d : "no 
condition");
-  }
-
-  pn_listener_close(l);
-  pn_listener_close(l4);
-  TEST_PROACTORS_DESTROY(tps);
-}
-
-/* Make sure we clean up released connections and open sockets correctly */
-static void test_release_free(test_t *t) {
-  test_proactor_t tps[] = { test_proactor(t, open_wake_handler), 
test_proactor(t, listen_handler) };
-  pn_proactor_t *client = tps[0].proactor;
-  pn_listener_t *l = test_listen(&tps[1], "");
-
-  /* leave one connection to the proactor  */
-  pn_proactor_connect2(client, NULL, NULL, listener_info(l).connect);
-  TEST_ETYPE_EQUAL(t, PN_CONNECTION_REMOTE_OPEN, TEST_PROACTORS_RUN(tps));
-
-  /* release c1 and free immediately */
-  pn_connection_t *c1 = pn_connection();
-  pn_proactor_connect2(client, c1, NULL, listener_info(l).connect);
-  TEST_ETYPE_EQUAL(t, PN_CONNECTION_REMOTE_OPEN, TEST_PROACTORS_RUN(tps));
-  pn_proactor_release_connection(c1); /* We free but socket should still be 
cleaned up */
-  pn_connection_free(c1);
-  TEST_CHECK(t, pn_proactor_get(client) == NULL); /* Should be idle */
-  TEST_PROACTORS_RUN_UNTIL(tps, PN_TRANSPORT_CLOSED);
-
-  /* release c2 and but don't free till after proactor free */
-  pn_connection_t *c2 = pn_connection();
-  pn_proactor_connect2(client, c2, NULL, listener_info(l).connect);
-  TEST_ETYPE_EQUAL(t, PN_CONNECTION_REMOTE_OPEN, TEST_PROACTORS_RUN(tps));
-  pn_proactor_release_connection(c2);
-  TEST_CHECK(t, pn_proactor_get(client) == NULL); /* Should be idle */
-  TEST_PROACTORS_RUN_UNTIL(tps, PN_TRANSPORT_CLOSED);
-
-  TEST_PROACTORS_DESTROY(tps);
-  pn_connection_free(c2);
-
-  /* Check freeing a listener or connection that was never given to a proactor 
*/
-  pn_listener_free(pn_listener());
-  pn_connection_free(pn_connection());
-}
-
-#define SSL_FILE(NAME) CMAKE_CURRENT_SOURCE_DIR "/ssl-certs/" NAME
-#define SSL_PW "tserverpw"
-/* Windows vs. OpenSSL certificates */
-#if defined(_WIN32)
-#  define CERTIFICATE(NAME) SSL_FILE(NAME "-certificate.p12")
-#  define SET_CREDENTIALS(DOMAIN, NAME)                                 \
-  pn_ssl_domain_set_credentials(DOMAIN, SSL_FILE(NAME "-full.p12"), "", SSL_PW)
-#else
-#  define CERTIFICATE(NAME) SSL_FILE(NAME "-certificate.pem")
-#  define SET_CREDENTIALS(DOMAIN, NAME)                                 \
-  pn_ssl_domain_set_credentials(DOMAIN, CERTIFICATE(NAME), SSL_FILE(NAME 
"-private-key.pem"), SSL_PW)
-#endif
-
-static pn_event_type_t ssl_handler(test_handler_t *h, pn_event_t *e) {
-  switch (pn_event_type(e)) {
-
-   case PN_CONNECTION_BOUND:
-    TEST_CHECK(h->t, 0 == pn_ssl_init(pn_ssl(pn_event_transport(e)), 
h->ssl_domain, NULL));
-    return PN_EVENT_NONE;
-
-   case PN_CONNECTION_REMOTE_OPEN: {
-     pn_ssl_t *ssl = pn_ssl(pn_event_transport(e));
-     TEST_CHECK(h->t, ssl);
-     char protocol[256];
-     TEST_CHECK(h->t, pn_ssl_get_protocol_name(ssl, protocol, 
sizeof(protocol)));
-     TEST_STR_IN(h->t, "TLS", protocol);
-     return PN_CONNECTION_REMOTE_OPEN;
-   }
-   default:
-    return PN_EVENT_NONE;
-  }
-}
-
-static pn_event_type_t ssl_server_handler(test_handler_t *h, pn_event_t *e) {
-  switch (pn_event_type(e)) {
-   case PN_CONNECTION_BOUND:
-    return ssl_handler(h, e);
-   case PN_CONNECTION_REMOTE_OPEN: {
-     pn_event_type_t et = ssl_handler(h, e);
-     pn_connection_open(pn_event_connection(e));
-     return et;
-   }
-   default:
-    return listen_handler(h, e);
-  }
-}
-
-static pn_event_type_t ssl_client_handler(test_handler_t *h, pn_event_t *e) {
-  switch (pn_event_type(e)) {
-   case PN_CONNECTION_BOUND:
-    return ssl_handler(h, e);
-   case PN_CONNECTION_REMOTE_OPEN: {
-     pn_event_type_t et = ssl_handler(h, e);
-     pn_connection_close(pn_event_connection(e));
-     return et;
-   }
-    break;
-   default:
-    return common_handler(h, e);
-  }
-}
-
-/* Test various SSL connections between proactors*/
-static void test_ssl(test_t *t) {
-  if (!pn_ssl_present()) {
-    TEST_LOGF(t, "Skip SSL test, no support");
-    return;
-  }
-
-  test_proactor_t tps[] ={ test_proactor(t, ssl_client_handler), 
test_proactor(t, ssl_server_handler) };
-  test_proactor_t *client = &tps[0], *server = &tps[1];
-  pn_ssl_domain_t *cd = client->handler.ssl_domain = 
pn_ssl_domain(PN_SSL_MODE_CLIENT);
-  pn_ssl_domain_t *sd =  server->handler.ssl_domain = 
pn_ssl_domain(PN_SSL_MODE_SERVER);
-  TEST_CHECK(t, 0 == SET_CREDENTIALS(sd, "tserver"));
-  pn_listener_t *l = test_listen(server, "");
-
-  /* Basic SSL connection */
-  pn_proactor_connect2(client->proactor, NULL, NULL, listener_info(l).connect);
-  /* Open ok at both ends */
-  TEST_ETYPE_EQUAL(t, PN_CONNECTION_REMOTE_OPEN, TEST_PROACTORS_RUN(tps));
-  TEST_COND_EMPTY(t, last_condition);
-  TEST_ETYPE_EQUAL(t, PN_CONNECTION_REMOTE_OPEN, TEST_PROACTORS_RUN(tps));
-  TEST_COND_EMPTY(t, last_condition);
-  TEST_PROACTORS_RUN_UNTIL(tps, PN_TRANSPORT_CLOSED);
-  TEST_PROACTORS_RUN_UNTIL(tps, PN_TRANSPORT_CLOSED);
-
-  /* Verify peer with good hostname */
-  TEST_INT_EQUAL(t, 0, pn_ssl_domain_set_trusted_ca_db(cd, 
CERTIFICATE("tserver")));
-  TEST_INT_EQUAL(t, 0, pn_ssl_domain_set_peer_authentication(cd, 
PN_SSL_VERIFY_PEER_NAME, NULL));
-  pn_connection_t *c = pn_connection();
-  pn_connection_set_hostname(c, "test_server");
-  pn_proactor_connect2(client->proactor, c, NULL, listener_info(l).connect);
-  TEST_ETYPE_EQUAL(t, PN_CONNECTION_REMOTE_OPEN, TEST_PROACTORS_RUN(tps));
-  TEST_COND_EMPTY(t, last_condition);
-  TEST_ETYPE_EQUAL(t, PN_CONNECTION_REMOTE_OPEN, TEST_PROACTORS_RUN(tps));
-  TEST_COND_EMPTY(t, last_condition);
-  TEST_PROACTORS_RUN_UNTIL(tps, PN_TRANSPORT_CLOSED);
-  TEST_PROACTORS_RUN_UNTIL(tps, PN_TRANSPORT_CLOSED);
-
-  /* Verify peer with bad hostname */
-  c = pn_connection();
-  pn_connection_set_hostname(c, "wrongname");
-  pn_proactor_connect2(client->proactor, c, NULL, listener_info(l).connect);
-  TEST_ETYPE_EQUAL(t, PN_TRANSPORT_ERROR, TEST_PROACTORS_RUN(tps));
-  TEST_CONDITION(t, "amqp:connection:framing-error", "SSL", last_condition);
-  TEST_PROACTORS_DRAIN(tps);
-
-  pn_ssl_domain_free(cd);
-  pn_ssl_domain_free(sd);
-  TEST_PROACTORS_DESTROY(tps);
-}
-
-static void test_proactor_addr(test_t *t) {
-  /* Test the address formatter */
-  char addr[PN_MAX_ADDR];
-  pn_proactor_addr(addr, sizeof(addr), "foo", "bar");
-  TEST_STR_EQUAL(t, "foo:bar", addr);
-  pn_proactor_addr(addr, sizeof(addr), "foo", "");
-  TEST_STR_EQUAL(t, "foo:", addr);
-  pn_proactor_addr(addr, sizeof(addr), "foo", NULL);
-  TEST_STR_EQUAL(t, "foo:", addr);
-  pn_proactor_addr(addr, sizeof(addr), "", "bar");
-  TEST_STR_EQUAL(t, ":bar", addr);
-  pn_proactor_addr(addr, sizeof(addr), NULL, "bar");
-  TEST_STR_EQUAL(t, ":bar", addr);
-  pn_proactor_addr(addr, sizeof(addr), "1:2:3:4", "5");
-  TEST_STR_EQUAL(t, "1:2:3:4:5", addr);
-  pn_proactor_addr(addr, sizeof(addr), "1:2:3:4", "");
-  TEST_STR_EQUAL(t, "1:2:3:4:", addr);
-  pn_proactor_addr(addr, sizeof(addr), "1:2:3:4", NULL);
-  TEST_STR_EQUAL(t, "1:2:3:4:", addr);
-}
-
-static void test_parse_addr(test_t *t) {
-  char buf[1024];
-  const char *host, *port;
-
-  TEST_CHECK(t, 0 == pni_parse_addr("foo:bar", buf, sizeof(buf), &host, 
&port));
-  TEST_STR_EQUAL(t, "foo", host);
-  TEST_STR_EQUAL(t, "bar", port);
-
-  TEST_CHECK(t, 0 == pni_parse_addr("foo:", buf, sizeof(buf), &host, &port));
-  TEST_STR_EQUAL(t, "foo", host);
-  TEST_STR_EQUAL(t, "5672", port);
-
-  TEST_CHECK(t, 0 == pni_parse_addr(":bar", buf, sizeof(buf), &host, &port));
-  TEST_CHECKF(t, NULL == host, "expected null, got: %s", host);
-  TEST_STR_EQUAL(t, "bar", port);
-
-  TEST_CHECK(t, 0 == pni_parse_addr(":", buf, sizeof(buf), &host, &port));
-  TEST_CHECKF(t, NULL == host, "expected null, got: %s", host);
-  TEST_STR_EQUAL(t, "5672", port);
-
-  TEST_CHECK(t, 0 == pni_parse_addr(":amqps", buf, sizeof(buf), &host, &port));
-  TEST_STR_EQUAL(t, "5671", port);
-
-  TEST_CHECK(t, 0 == pni_parse_addr(":amqp", buf, sizeof(buf), &host, &port));
-  TEST_STR_EQUAL(t, "5672", port);
-
-  TEST_CHECK(t, 0 == pni_parse_addr("::1:2:3", buf, sizeof(buf), &host, 
&port));
-  TEST_STR_EQUAL(t, "::1:2", host);
-  TEST_STR_EQUAL(t, "3", port);
-
-  TEST_CHECK(t, 0 == pni_parse_addr(":::", buf, sizeof(buf), &host, &port));
-  TEST_STR_EQUAL(t, "::", host);
-  TEST_STR_EQUAL(t, "5672", port);
-
-  TEST_CHECK(t, 0 == pni_parse_addr("", buf, sizeof(buf), &host, &port));
-  TEST_CHECKF(t, NULL == host, "expected null, got: %s", host);
-  TEST_STR_EQUAL(t, "5672", port);
-}
-
-/* Test pn_proactor_addr functions */
-
-static void test_netaddr(test_t *t) {
-  test_proactor_t tps[] ={ test_proactor(t, open_wake_handler), 
test_proactor(t, listen_handler) };
-  pn_proactor_t *client = tps[0].proactor;
-  /* Use IPv4 to get consistent results all platforms */
-  pn_listener_t *l = test_listen(&tps[1], "127.0.0.1");
-  pn_connection_t *c = pn_connection();
-  pn_proactor_connect2(client, c, NULL, listener_info(l).connect);
-  if (!TEST_ETYPE_EQUAL(t, PN_CONNECTION_REMOTE_OPEN, 
TEST_PROACTORS_RUN(tps))) {
-    TEST_COND_EMPTY(t, last_condition); /* Show the last condition */
-    return;                     /* don't continue if connection is closed */
-  }
-
-  /* client remote, client local, server remote and server local address 
strings */
-  char cr[1024], cl[1024], sr[1024], sl[1024];
-
-  pn_transport_t *ct = pn_connection_transport(c);
-  const pn_netaddr_t *na = pn_transport_remote_addr(ct);
-  pn_netaddr_str(na, cr, sizeof(cr));
-  TEST_STR_IN(t, listener_info(l).port, cr); /* remote address has listening 
port */
-
-  pn_connection_t *s = last_accepted; /* server side of the connection */
-
-  pn_transport_t *st = pn_connection_transport(s);
-  if (!TEST_CHECK(t, st)) return;
-  pn_netaddr_str(pn_transport_local_addr(st), sl, sizeof(sl));
-  TEST_STR_EQUAL(t, cr, sl);  /* client remote == server local */
-
-  pn_netaddr_str(pn_transport_local_addr(ct), cl, sizeof(cl));
-  pn_netaddr_str(pn_transport_remote_addr(st), sr, sizeof(sr));
-  TEST_STR_EQUAL(t, cl, sr);    /* client local == server remote */
-
-  char host[MAX_STR] = "";
-  char serv[MAX_STR] = "";
-  int err = pn_netaddr_host_port(na, host, sizeof(host), serv, sizeof(serv));
-  TEST_CHECK(t, 0 == err);
-  TEST_STR_EQUAL(t, "127.0.0.1", host);
-  TEST_STR_EQUAL(t, listener_info(l).port, serv);
-
-  /* Make sure you can use NULL, 0 to get length of address string without a 
crash */
-  size_t len = pn_netaddr_str(pn_transport_local_addr(ct), NULL, 0);
-  TEST_CHECKF(t, strlen(cl) == len, "%d != %d", strlen(cl), len);
-
-  TEST_PROACTORS_DRAIN(tps);
-  TEST_PROACTORS_DESTROY(tps);
-}
-
-/* Test pn_proactor_disconnect */
-static void test_disconnect(test_t *t) {
-  test_proactor_t tps[] ={ test_proactor(t, open_wake_handler), 
test_proactor(t, listen_handler) };
-  pn_proactor_t *client = tps[0].proactor, *server = tps[1].proactor;
-
-  /* Start two listeners */
-  pn_listener_t *l = test_listen(&tps[1], "");
-  pn_listener_t *l2 = test_listen(&tps[1], "");
-
-  /* Only wait for one connection to remote-open before disconnect */
-  pn_connection_t *c = pn_connection();
-  pn_proactor_connect2(client, c, NULL, listener_info(l).connect);
-  TEST_ETYPE_EQUAL(t, PN_CONNECTION_REMOTE_OPEN, TEST_PROACTORS_RUN(tps));
-  pn_connection_t *c2 = pn_connection();
-  pn_proactor_connect2(client, c2, NULL, listener_info(l2).connect);
-  TEST_ETYPE_EQUAL(t, PN_CONNECTION_REMOTE_OPEN, TEST_PROACTORS_RUN(tps));
-  TEST_PROACTORS_DRAIN(tps);
-
-  /* Disconnect the client proactor */
-  pn_condition_t *cond = pn_condition();
-  pn_condition_set_name(cond, "test-name");
-  pn_condition_set_description(cond, "test-description");
-  pn_proactor_disconnect(client, cond);
-  /* Verify expected client side first */
-  TEST_ETYPE_EQUAL(t, PN_TRANSPORT_ERROR, test_proactors_run(&tps[0], 1));
-  TEST_CONDITION(t, "test-name", "test-description", last_condition);
-  TEST_ETYPE_EQUAL(t, PN_TRANSPORT_ERROR, test_proactors_run(&tps[0], 1));
-  TEST_CONDITION(t, "test-name", "test-description", last_condition);
-  test_proactors_run_until(&tps[0], 1, PN_PROACTOR_INACTIVE);
-
-  /* Now check server sees the disconnects */
-  TEST_ETYPE_EQUAL(t, PN_TRANSPORT_ERROR, TEST_PROACTORS_RUN(tps));
-  TEST_CONDITION(t, "amqp:connection:framing-error", "connection aborted", 
last_condition);
-  TEST_ETYPE_EQUAL(t, PN_TRANSPORT_ERROR, TEST_PROACTORS_RUN(tps));
-  TEST_CONDITION(t, "amqp:connection:framing-error", "connection aborted", 
last_condition);
-  TEST_PROACTORS_DRAIN(tps);
-
-  /* Now disconnect the server end (the listeners) */
-  pn_proactor_disconnect(server, cond);
-  pn_condition_free(cond);
-
-  TEST_ETYPE_EQUAL(t, PN_LISTENER_CLOSE, TEST_PROACTORS_RUN(tps));
-  TEST_ETYPE_EQUAL(t, PN_LISTENER_CLOSE, TEST_PROACTORS_RUN(tps));
-  TEST_ETYPE_EQUAL(t, PN_PROACTOR_INACTIVE, TEST_PROACTORS_RUN(tps));
-
-  /* Make sure the proactors are still functional */
-  pn_listener_t *l3 = test_listen(&tps[1], "");
-  pn_proactor_connect2(client, NULL, NULL, listener_info(l3).connect);
-  TEST_ETYPE_EQUAL(t, PN_CONNECTION_REMOTE_OPEN, TEST_PROACTORS_RUN(tps));
-  pn_proactor_disconnect(client, NULL);
-
-  TEST_PROACTORS_DRAIN(tps);
-  TEST_PROACTORS_DESTROY(tps);
-}
-
-struct message_stream_context {
-  pn_link_t *sender;
-  pn_delivery_t *dlv;
-  pn_rwbytes_t send_buf, recv_buf;
-  ssize_t size, sent, received;
-  bool complete;
-};
-
-#define FRAME 512                   /* Smallest legal frame */
-#define CHUNK (FRAME + FRAME/2)     /* Chunk overflows frame */
-#define BODY (CHUNK*3 + CHUNK/2)    /* Body doesn't fit into chunks */
-
-static pn_event_type_t message_stream_handler(test_handler_t *th, pn_event_t 
*e) {
-  struct message_stream_context *ctx = (struct 
message_stream_context*)th->context;
-  switch (pn_event_type(e)) {
-   case PN_CONNECTION_BOUND:
-    pn_transport_set_max_frame(pn_event_transport(e), FRAME);
-    return PN_EVENT_NONE;
-
-   case PN_SESSION_INIT:
-    pn_session_set_incoming_capacity(pn_event_session(e), FRAME); /* Single 
frame incoming */
-    pn_session_set_outgoing_window(pn_event_session(e), 1);       /* Single 
frame outgoing */
-    return PN_EVENT_NONE;
-
-   case PN_LINK_REMOTE_OPEN:
-    common_handler(th, e);
-    if (pn_link_is_receiver(pn_event_link(e))) {
-      pn_link_flow(pn_event_link(e), 1);
-    } else {
-      ctx->sender = pn_event_link(e);
-    }
-    return PN_EVENT_NONE;
-
-   case PN_LINK_FLOW:           /* Start a delivery */
-    if (pn_link_is_sender(pn_event_link(e)) && !ctx->dlv) {
-      ctx->dlv = pn_delivery(pn_event_link(e), pn_dtag("x", 1));
-    }
-    return PN_LINK_FLOW;
-
-   case PN_CONNECTION_WAKE: {     /* Send a chunk */
-     ssize_t remains = ctx->size - ctx->sent;
-     ssize_t n = (CHUNK < remains) ? CHUNK : remains;
-     TEST_CHECK(th->t, n == pn_link_send(ctx->sender, ctx->send_buf.start + 
ctx->sent, n));
-     ctx->sent += n;
-     if (ctx->sent == ctx->size) {
-       TEST_CHECK(th->t, pn_link_advance(ctx->sender));
-     }
-     return PN_CONNECTION_WAKE;
-   }
-
-   case PN_DELIVERY: {          /* Receive a delivery - smaller than a chunk? 
*/
-     pn_delivery_t *dlv = pn_event_delivery(e);
-     if (pn_delivery_readable(dlv)) {
-       ssize_t n = pn_delivery_pending(dlv);
-       rwbytes_ensure(&ctx->recv_buf, ctx->received + n);
-       TEST_ASSERT(n == pn_link_recv(pn_event_link(e), ctx->recv_buf.start + 
ctx->received, n));
-       ctx->received += n;
-     }
-     ctx->complete = !pn_delivery_partial(dlv);
-     return PN_DELIVERY;
-   }
-
-   default:
-    return common_handler(th, e);
-  }
-}
-
-/* Test sending/receiving a message in chunks */
-static void test_message_stream(test_t *t) {
-  test_proactor_t tps[] ={
-    test_proactor(t, message_stream_handler),
-    test_proactor(t, message_stream_handler)
-  };
-  pn_proactor_t *client = tps[0].proactor;
-  pn_listener_t *l = test_listen(&tps[1], "");
-  struct message_stream_context ctx = { 0 };
-  tps[0].handler.context = &ctx;
-  tps[1].handler.context = &ctx;
-
-  /* Encode a large (not very) message to send in chunks */
-  char *body = (char*)malloc(BODY);
-  memset(body, 'x', BODY);
-  pn_message_t *m = pn_message();
-  pn_data_put_binary(pn_message_body(m), pn_bytes(BODY, body));
-  free(body);
-  ctx.size = message_encode(m, &ctx.send_buf);
-  pn_message_free(m);
-
-  pn_connection_t *c = pn_connection();
-  pn_proactor_connect2(client, c, NULL, listener_info(l).connect);
-  pn_session_t *ssn = pn_session(c);
-  pn_session_open(ssn);
-  pn_link_t *snd = pn_sender(ssn, "x");
-  pn_link_open(snd);
-  TEST_PROACTORS_RUN_UNTIL(tps, PN_LINK_FLOW);
-
-  /* Send and receive the message in chunks */
-  do {
-    pn_connection_wake(c);      /* Initiate send/receive of one chunk */
-    do {                        /* May be multiple receives for one send */
-      TEST_PROACTORS_RUN_UNTIL(tps, PN_DELIVERY);
-    } while (ctx.received < ctx.sent);
-  } while (!ctx.complete);
-  TEST_CHECK(t, ctx.received == ctx.size);
-  TEST_CHECK(t, ctx.sent == ctx.size);
-  TEST_CHECK(t, !memcmp(ctx.send_buf.start, ctx.recv_buf.start, ctx.size));
-
-  free(ctx.send_buf.start);
-  free(ctx.recv_buf.start);
-  TEST_PROACTORS_DESTROY(tps);
-}
-
-int main(int argc, char **argv) {
-  int failed = 0;
-  last_condition = pn_condition();
-  RUN_ARGV_TEST(failed, t, test_inactive(&t));
-  RUN_ARGV_TEST(failed, t, test_interrupt_timeout(&t));
-  RUN_ARGV_TEST(failed, t, test_errors(&t));
-  RUN_ARGV_TEST(failed, t, test_proton_1586(&t));
-  RUN_ARGV_TEST(failed, t, test_client_server(&t));
-  RUN_ARGV_TEST(failed, t, test_connection_wake(&t));
-  RUN_ARGV_TEST(failed, t, test_ipv4_ipv6(&t));
-  RUN_ARGV_TEST(failed, t, test_release_free(&t));
-#if !defined(_WIN32)
-  RUN_ARGV_TEST(failed, t, test_ssl(&t));
-#endif
-  RUN_ARGV_TEST(failed, t, test_proactor_addr(&t));
-  RUN_ARGV_TEST(failed, t, test_parse_addr(&t));
-  RUN_ARGV_TEST(failed, t, test_netaddr(&t));
-  RUN_ARGV_TEST(failed, t, test_disconnect(&t));
-  RUN_ARGV_TEST(failed, t, test_abort(&t));
-  RUN_ARGV_TEST(failed, t, test_refuse(&t));
-  RUN_ARGV_TEST(failed, t, test_message_stream(&t));
-  pn_condition_free(last_condition);
-  return failed;
-}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/0bdba37d/c/tests/proactor_test.cpp
----------------------------------------------------------------------
diff --git a/c/tests/proactor_test.cpp b/c/tests/proactor_test.cpp
new file mode 100644
index 0000000..971c231
--- /dev/null
+++ b/c/tests/proactor_test.cpp
@@ -0,0 +1,831 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "../src/proactor/proactor-internal.h"
+#include "./pn_test_proactor.hpp"
+#include "./test_config.h"
+
+#include <proton/condition.h>
+#include <proton/connection.h>
+#include <proton/delivery.h>
+#include <proton/event.h>
+#include <proton/link.h>
+#include <proton/listener.h>
+#include <proton/netaddr.h>
+#include <proton/proactor.h>
+#include <proton/session.h>
+#include <proton/ssl.h>
+#include <proton/transport.h>
+
+#include <string.h>
+
+#include <iostream>
+
+using namespace pn_test;
+using Catch::Matchers::Contains;
+using Catch::Matchers::Equals;
+
+/* Test that interrupt and timeout events cause pn_proactor_wait() to return. 
*/
+TEST_CASE("proactor_interrupt_timeout") {
+  proactor p;
+
+  CHECK(pn_proactor_get(p) == NULL); /* idle */
+  pn_proactor_interrupt(p);
+  CHECK(PN_PROACTOR_INTERRUPT == p.wait_next());
+  CHECK(pn_proactor_get(p) == NULL); /* idle */
+
+  /* Set an immediate timeout */
+  pn_proactor_set_timeout(p, 0);
+  CHECK(PN_PROACTOR_TIMEOUT == p.wait_next());
+  CHECK(PN_PROACTOR_INACTIVE == p.wait_next());
+
+  /* Set a (very short) timeout */
+  pn_proactor_set_timeout(p, 1);
+  CHECK(PN_PROACTOR_TIMEOUT == p.wait_next());
+  CHECK(PN_PROACTOR_INACTIVE == p.wait_next());
+
+  /* Set and cancel a timeout, make sure we don't get the timeout event */
+  pn_proactor_set_timeout(p, 10000000);
+  pn_proactor_cancel_timeout(p);
+  CHECK(PN_PROACTOR_INACTIVE == p.wait_next());
+  CHECK(pn_proactor_get(p) == NULL); /* idle */
+}
+
+namespace {
+
+class common_handler : public handler {
+  handler *accept_; // Handler for accepted connections
+
+public:
+  common_handler(handler *accept = 0) : accept_(accept) {}
+
+  bool handle(pn_event_t *e) CATCH_OVERRIDE {
+    switch (pn_event_type(e)) {
+      /* Always stop on these noteworthy events */
+    case PN_TRANSPORT_ERROR:
+    case PN_LISTENER_OPEN:
+    case PN_LISTENER_CLOSE:
+    case PN_PROACTOR_INACTIVE:
+      return true;
+
+    case PN_LISTENER_ACCEPT:
+      listener = pn_event_listener(e);
+      connection = pn_connection();
+      if (accept_) pn_connection_set_context(connection, accept_);
+      pn_listener_accept2(listener, connection, NULL);
+      return false;
+
+      // Return remote opens
+    case PN_CONNECTION_REMOTE_OPEN:
+      pn_connection_open(pn_event_connection(e));
+      return false;
+    case PN_SESSION_REMOTE_OPEN:
+      pn_session_open(pn_event_session(e));
+      return false;
+    case PN_LINK_REMOTE_OPEN:
+      pn_link_open(pn_event_link(e));
+      return false;
+
+      // Return remote closes
+    case PN_CONNECTION_REMOTE_CLOSE:
+      pn_connection_close(pn_event_connection(e));
+      return false;
+    case PN_SESSION_REMOTE_CLOSE:
+      pn_session_close(pn_event_session(e));
+      return false;
+    case PN_LINK_REMOTE_CLOSE:
+      pn_link_close(pn_event_link(e));
+      return false;
+
+    default:
+      return false;
+    }
+  }
+};
+
+/* close a connection when it is remote open */
+struct close_on_open_handler : public common_handler {
+  bool handle(pn_event_t *e) CATCH_OVERRIDE {
+    switch (pn_event_type(e)) {
+    case PN_CONNECTION_REMOTE_OPEN:
+      pn_connection_close(pn_event_connection(e));
+      return false;
+    default:
+      return common_handler::handle(e);
+    }
+  }
+};
+
+} // namespace
+
+/* Test simple client/server connection that opens and closes */
+TEST_CASE("proactor_connect") {
+  close_on_open_handler h;
+  proactor p(&h);
+  /* Connect and wait for close at both ends */
+  pn_listener_t *l = p.listen(":0", &h);
+  REQUIRE_RUN(p, PN_LISTENER_OPEN);
+  p.connect(l);
+  REQUIRE_RUN(p, PN_TRANSPORT_CLOSED);
+  REQUIRE_RUN(p, PN_TRANSPORT_CLOSED);
+}
+
+namespace {
+/* Return on connection open, close and return on wake */
+struct close_on_wake_handler : public common_handler {
+  bool handle(pn_event_t *e) CATCH_OVERRIDE {
+    switch (pn_event_type(e)) {
+    case PN_CONNECTION_WAKE:
+      pn_connection_close(pn_event_connection(e));
+      return true;
+    default:
+      return common_handler::handle(e);
+    }
+  }
+};
+} // namespace
+
+// Test waking up a connection that is idle
+TEST_CASE("proactor_connection_wake") {
+  common_handler h;
+  proactor p(&h);
+  close_on_wake_handler wh;
+  pn_listener_t *l = p.listen(":0");
+  REQUIRE_RUN(p, PN_LISTENER_OPEN);
+  pn_connection_t *c = p.connect(l, &wh);
+  pn_incref(c); /* Keep a reference for wake() after free */
+
+  REQUIRE_RUN(p, PN_CONNECTION_REMOTE_OPEN);
+  REQUIRE_RUN(p, PN_CONNECTION_REMOTE_OPEN);
+  CHECK(!pn_proactor_get(p)); /* Should be idle */
+  pn_connection_wake(c);
+  REQUIRE_RUN(p, PN_CONNECTION_WAKE);
+  REQUIRE_RUN(p, PN_TRANSPORT_CLOSED);
+  REQUIRE_RUN(p, PN_TRANSPORT_CLOSED); /* Both ends */
+
+  /* Verify we don't get a wake after close even if they happen together */
+  pn_connection_t *c2 = p.connect(l, &wh);
+  REQUIRE_RUN(p, PN_CONNECTION_REMOTE_OPEN);
+  pn_connection_wake(c2);
+  pn_proactor_disconnect(p, NULL);
+  pn_connection_wake(c2);
+
+  for (pn_event_type_t et = p.run(); et != PN_PROACTOR_INACTIVE; et = p.run()) 
{
+    switch (et) {
+    case PN_TRANSPORT_ERROR:
+    case PN_TRANSPORT_CLOSED:
+    case PN_LISTENER_CLOSE:
+      break; // expected
+    default:
+      FAIL("Unexpected event type: " << et);
+    }
+  }
+  // The pn_connection_t is still valid so wake is legal but a no-op.
+  // Make sure there's no memory error.
+  pn_connection_wake(c);
+  pn_decref(c);
+}
+
+namespace {
+struct abort_handler : public common_handler {
+  bool handle(pn_event_t *e) {
+    switch (pn_event_type(e)) {
+    case PN_CONNECTION_REMOTE_OPEN:
+      /* Close the transport - abruptly closes the socket */
+      pn_transport_close_tail(pn_connection_transport(pn_event_connection(e)));
+      pn_transport_close_head(pn_connection_transport(pn_event_connection(e)));
+      return false;
+
+    default:
+      return common_handler::handle(e);
+    }
+  }
+};
+} // namespace
+
+/* Verify that pn_transport_close_head/tail aborts a connection without an AMQP
+ * protocol close */
+TEST_CASE("proactor_abort") {
+  abort_handler sh; // Handle listener and server side of connection
+  proactor p(&sh);
+  pn_listener_t *l = p.listen();
+  REQUIRE_RUN(p, PN_LISTENER_OPEN);
+  common_handler ch; // Handle client side of connection
+  pn_connection_t *c = p.connect(l, &ch);
+
+  /* server transport closes */
+  REQUIRE_RUN(p, PN_TRANSPORT_ERROR);
+  CHECK_THAT(*sh.last_condition,
+             cond_matches("amqp:connection:framing-error", "abort"));
+
+  /* client transport closes */
+  REQUIRE_RUN(p, PN_TRANSPORT_ERROR);
+  CHECK_THAT(*ch.last_condition,
+             cond_matches("amqp:connection:framing-error", "abort"));
+  pn_listener_close(l);
+  REQUIRE_RUN(p, PN_LISTENER_CLOSE);
+  REQUIRE_RUN(p, PN_PROACTOR_INACTIVE);
+
+  /* Verify expected event sequences, no unexpected events */
+  CHECK_THAT(ETYPES(PN_CONNECTION_INIT, PN_CONNECTION_LOCAL_OPEN,
+                    PN_CONNECTION_BOUND, PN_TRANSPORT_TAIL_CLOSED,
+                    PN_TRANSPORT_ERROR, PN_TRANSPORT_HEAD_CLOSED,
+                    PN_TRANSPORT_CLOSED),
+             Equals(ch.log_clear()));
+  CHECK_THAT(ETYPES(PN_LISTENER_OPEN, PN_LISTENER_ACCEPT, PN_CONNECTION_INIT,
+                    PN_CONNECTION_BOUND, PN_CONNECTION_REMOTE_OPEN,
+                    PN_TRANSPORT_TAIL_CLOSED, PN_TRANSPORT_ERROR,
+                    PN_TRANSPORT_HEAD_CLOSED, PN_TRANSPORT_CLOSED,
+                    PN_LISTENER_CLOSE, PN_PROACTOR_INACTIVE),
+             Equals(sh.log_clear()));
+}
+
+/* Test that INACTIVE event is generated when last connections/listeners 
closes.
+ */
+TEST_CASE("proactor_inactive") {
+  close_on_wake_handler h;
+  proactor p(&h);
+
+  /* Listen, connect, disconnect */
+  pn_listener_t *l = p.listen();
+  REQUIRE_RUN(p, PN_LISTENER_OPEN);
+  pn_connection_t *c = p.connect(l, &h);
+  REQUIRE_RUN(p, PN_CONNECTION_REMOTE_OPEN);
+  REQUIRE_RUN(p, PN_CONNECTION_REMOTE_OPEN);
+  pn_connection_wake(c);
+  REQUIRE_RUN(p, PN_CONNECTION_WAKE);
+  /* Expect TRANSPORT_CLOSED both ends */
+  REQUIRE_RUN(p, PN_TRANSPORT_CLOSED);
+  REQUIRE_RUN(p, PN_TRANSPORT_CLOSED);
+  pn_listener_close(l);
+  REQUIRE_RUN(p, PN_LISTENER_CLOSE);
+  /* Immediate timer generates INACTIVE (no connections) */
+  pn_proactor_set_timeout(p, 0);
+  REQUIRE_RUN(p, PN_PROACTOR_TIMEOUT);
+  REQUIRE_RUN(p, PN_PROACTOR_INACTIVE);
+
+  /* Connect, set-timer, disconnect */
+  l = p.listen();
+  REQUIRE_RUN(p, PN_LISTENER_OPEN);
+  c = p.connect(l, &h);
+  pn_proactor_set_timeout(p, 1000000);
+  REQUIRE_RUN(p, PN_CONNECTION_REMOTE_OPEN);
+  REQUIRE_RUN(p, PN_CONNECTION_REMOTE_OPEN);
+  pn_connection_wake(c);
+  REQUIRE_RUN(p, PN_CONNECTION_WAKE);
+  /* Expect TRANSPORT_CLOSED from client and server */
+  REQUIRE_RUN(p, PN_TRANSPORT_CLOSED);
+  REQUIRE_RUN(p, PN_TRANSPORT_CLOSED);
+  pn_listener_close(l);
+  REQUIRE_RUN(p, PN_LISTENER_CLOSE);
+  /* No INACTIVE till timer is cancelled */
+  CHECK(!pn_proactor_get(p)); // idle
+  pn_proactor_cancel_timeout(p);
+  REQUIRE_RUN(p, PN_PROACTOR_INACTIVE);
+}
+
+/* Tests for error handling */
+TEST_CASE("proactor_errors") {
+  close_on_wake_handler h;
+  proactor p(&h);
+  /* Invalid connect/listen service name */
+  p.connect("127.0.0.1:xxx");
+  REQUIRE_RUN(p, PN_TRANSPORT_ERROR);
+  CHECK_THAT(*h.last_condition, cond_matches("proton:io", "xxx"));
+  REQUIRE_RUN(p, PN_TRANSPORT_CLOSED);
+  REQUIRE_RUN(p, PN_PROACTOR_INACTIVE);
+
+  pn_listener_t *l = pn_listener();
+  pn_proactor_listen(p, l, "127.0.0.1:xxx", 1);
+  REQUIRE_RUN(p, PN_LISTENER_CLOSE);
+  CHECK_THAT(*h.last_condition, cond_matches("proton:io", "xxx"));
+  REQUIRE_RUN(p, PN_PROACTOR_INACTIVE);
+
+  /* Invalid connect/listen host name */
+  p.connect("nosuch.example.com:");
+  REQUIRE_RUN(p, PN_TRANSPORT_ERROR);
+  CHECK_THAT(*h.last_condition, cond_matches("proton:io", "nosuch"));
+  REQUIRE_RUN(p, PN_TRANSPORT_CLOSED);
+  REQUIRE_RUN(p, PN_PROACTOR_INACTIVE);
+
+  pn_proactor_listen(p, pn_listener(), "nosuch.example.com:", 1);
+  REQUIRE_RUN(p, PN_LISTENER_CLOSE);
+  CHECK_THAT(*h.last_condition, cond_matches("proton:io", "nosuch"));
+  REQUIRE_RUN(p, PN_PROACTOR_INACTIVE);
+
+  /* Listen on a port already in use */
+  l = p.listen(":0");
+  REQUIRE_RUN(p, PN_LISTENER_OPEN);
+  std::string laddr = ":" + listening_port(l);
+  p.listen(laddr);
+  REQUIRE_RUN(p, PN_LISTENER_CLOSE);
+  CHECK_THAT(*h.last_condition, cond_matches("proton:io"));
+
+  pn_listener_close(l);
+  REQUIRE_RUN(p, PN_LISTENER_CLOSE);
+  REQUIRE_RUN(p, PN_PROACTOR_INACTIVE);
+
+  /* Connect with no listener */
+  p.connect(laddr);
+  REQUIRE_RUN(p, PN_TRANSPORT_ERROR);
+  CHECK_THAT(*h.last_condition, cond_matches("proton:io", "refused"));
+}
+
+namespace {
+/* Closing the connection during PN_TRANSPORT_ERROR should be a no-op
+ * Regression test for: https://issues.apache.org/jira/browse/PROTON-1586
+ */
+struct transport_close_connection_handler : public common_handler {
+  bool handle(pn_event_t *e) {
+    switch (pn_event_type(e)) {
+    case PN_TRANSPORT_ERROR:
+      pn_connection_close(pn_event_connection(e));
+      break;
+    default:
+      return common_handler::handle(e);
+    }
+    return PN_EVENT_NONE;
+  }
+};
+} // namespace
+
+/* Closing the connection during PN_TRANSPORT_ERROR due to connection failure
+ * should be a no-op. Regression test for:
+ * https://issues.apache.org/jira/browse/PROTON-1586
+ */
+TEST_CASE("proactor_proton_1586") {
+  transport_close_connection_handler h;
+  proactor p(&h);
+  p.connect(":yyy");
+  REQUIRE_RUN(p, PN_TRANSPORT_ERROR);
+  REQUIRE_RUN(p, PN_TRANSPORT_CLOSED);
+  CHECK_THAT(*h.last_condition, cond_matches("proton:io", ":yyy"));
+
+  // No events expected after PN_TRANSPORT_CLOSED, proactor is inactive.
+  CHECK(PN_PROACTOR_INACTIVE == p.wait_next());
+}
+
+/* Test that we can control listen/select on ipv6/v4 and listen on both by
+ * default */
+TEST_CASE("proactor_ipv4_ipv6") {
+  close_on_open_handler h;
+  proactor p(&h);
+
+  /* Listen on all interfaces for IPv4 only. */
+  pn_listener_t *l4 = p.listen("0.0.0.0");
+  REQUIRE_RUN(p, PN_LISTENER_OPEN);
+  /* Empty address listens on both IPv4 and IPv6 on all interfaces */
+  pn_listener_t *l = p.listen(":0");
+  REQUIRE_RUN(p, PN_LISTENER_OPEN);
+
+#define EXPECT_CONNECT(LISTENER, HOST)                                         
\
+  do {                                                                         
\
+    p.connect(std::string(HOST) + ":" + listening_port(LISTENER));             
\
+    REQUIRE_RUN(p, PN_TRANSPORT_CLOSED);                                       
\
+    CHECK_THAT(*h.last_condition, cond_empty());                               
\
+  } while (0)
+
+  EXPECT_CONNECT(l4, "127.0.0.1"); /* v4->v4 */
+  EXPECT_CONNECT(l4, "");          /* local->v4*/
+
+  EXPECT_CONNECT(l, "127.0.0.1"); /* v4->all */
+  EXPECT_CONNECT(l, "");          /* local->all */
+
+  /* Listen on ipv6 loopback, if it fails skip ipv6 tests.
+
+     NOTE: Don't use the unspecified address "::" here - ipv6-disabled 
platforms
+     may allow listening on "::" without complaining. However they won't have a
+     local ipv6 loopback configured, so "::1" will force an error.
+  */
+  pn_listener_t *l6 = pn_listener();
+  pn_proactor_listen(p, l6, "::1:0", 4);
+  pn_event_type_t e = p.run();
+  if (e == PN_LISTENER_OPEN && !pn_condition_is_set(h.last_condition)) {
+    EXPECT_CONNECT(l6, "::1"); /* v6->v6 */
+    EXPECT_CONNECT(l6, "");    /* local->v6 */
+    EXPECT_CONNECT(l, "::1");  /* v6->all */
+    pn_listener_close(l6);
+  } else {
+    WARN("skip IPv6 tests: %s %s" << e << *h.last_condition);
+  }
+
+  pn_listener_close(l);
+  pn_listener_close(l4);
+}
+
+/* Make sure we clean up released connections and open sockets
+ * correctly */
+TEST_CASE("proactor_release_free") {
+  common_handler h;
+  proactor p(&h);
+
+  pn_listener_t *l = p.listen(":0");
+  REQUIRE_RUN(p, PN_LISTENER_OPEN);
+  /* leave one connection to the proactor  */
+  pn_connection_t *c = p.connect(l);
+  REQUIRE_RUN(p, PN_CONNECTION_REMOTE_OPEN);
+  REQUIRE_RUN(p, PN_CONNECTION_REMOTE_OPEN);
+
+  {
+    /* release c1 and free immediately */
+    auto_free<pn_connection_t, pn_connection_free> c1(p.connect(l));
+    REQUIRE_RUN(p, PN_CONNECTION_REMOTE_OPEN);
+    REQUIRE_RUN(p, PN_CONNECTION_REMOTE_OPEN);
+    pn_proactor_release_connection(c1);
+  }
+  REQUIRE_RUN(p, PN_TRANSPORT_ERROR);
+  REQUIRE_RUN(p, PN_TRANSPORT_CLOSED);
+
+  /* release c2 and but don't free till after proactor free */
+  auto_free<pn_connection_t, pn_connection_free> c2(p.connect(l));
+  REQUIRE_RUN(p, PN_CONNECTION_REMOTE_OPEN);
+  REQUIRE_RUN(p, PN_CONNECTION_REMOTE_OPEN);
+  pn_proactor_release_connection(c2);
+  REQUIRE_RUN(p, PN_TRANSPORT_ERROR);
+  REQUIRE_RUN(p, PN_TRANSPORT_CLOSED);
+
+  // OK to free a listener/connection that was never used by a
+  // proactor.
+  pn_listener_free(pn_listener());
+  pn_connection_free(pn_connection());
+}
+
+#define SSL_FILE(NAME) CMAKE_CURRENT_SOURCE_DIR "/ssl-certs/" NAME
+#define SSL_PW "tserverpw"
+/* Windows vs. OpenSSL certificates */
+#if defined(_WIN32)
+#define CERTIFICATE(NAME) SSL_FILE(NAME "-certificate.p12")
+#define SET_CREDENTIALS(DOMAIN, NAME)                                          
\
+  pn_ssl_domain_set_credentials(DOMAIN, SSL_FILE(NAME "-full.p12"), "", SSL_PW)
+#else
+#define CERTIFICATE(NAME) SSL_FILE(NAME "-certificate.pem")
+#define SET_CREDENTIALS(DOMAIN, NAME)                                          
\
+  pn_ssl_domain_set_credentials(DOMAIN, CERTIFICATE(NAME),                     
\
+                                SSL_FILE(NAME "-private-key.pem"), SSL_PW)
+#endif
+
+namespace {
+
+struct ssl_handler : public common_handler {
+  auto_free<pn_ssl_domain_t, pn_ssl_domain_free> ssl_domain;
+
+  ssl_handler(pn_ssl_domain_t *d) : ssl_domain(d) {}
+
+  bool handle(pn_event_t *e) {
+    switch (pn_event_type(e)) {
+
+    case PN_CONNECTION_BOUND:
+      CHECK(0 == pn_ssl_init(pn_ssl(pn_event_transport(e)), ssl_domain, NULL));
+      return false;
+
+    case PN_CONNECTION_REMOTE_OPEN: {
+      pn_ssl_t *ssl = pn_ssl(pn_event_transport(e));
+      CHECK(ssl);
+      char protocol[256];
+      CHECK(pn_ssl_get_protocol_name(ssl, protocol, sizeof(protocol)));
+      CHECK_THAT(protocol, Contains("TLS"));
+      pn_connection_t *c = pn_event_connection(e);
+      if (pn_connection_state(c) & PN_LOCAL_ACTIVE) {
+        pn_connection_close(c); // Client closes on completion.
+      } else {
+        pn_connection_open(c); // Server returns the OPEN
+      }
+      return true;
+    }
+    default:
+      return common_handler::handle(e);
+    }
+  }
+};
+
+} // namespace
+
+/* Test various SSL connections between proactors*/
+TEST_CASE("proactor_ssl") {
+  if (!pn_ssl_present()) {
+    WARN("Skip SSL tests, not available");
+    return;
+  }
+
+  ssl_handler client(pn_ssl_domain(PN_SSL_MODE_CLIENT));
+  ssl_handler server(pn_ssl_domain(PN_SSL_MODE_SERVER));
+  CHECK(0 == SET_CREDENTIALS(server.ssl_domain, "tserver"));
+  proactor p;
+  common_handler listener(&server); // Use server for accepted connections
+  pn_listener_t *l = p.listen(":0", &listener);
+  REQUIRE_RUN(p, PN_LISTENER_OPEN);
+
+  /* Basic SSL connection */
+  p.connect(l, &client);
+  /* Open ok at both ends */
+  REQUIRE_RUN(p, PN_CONNECTION_REMOTE_OPEN);
+  REQUIRE_RUN(p, PN_CONNECTION_REMOTE_OPEN);
+  CHECK_THAT(*server.last_condition, cond_empty());
+  CHECK_THAT(*client.last_condition, cond_empty());
+  REQUIRE_RUN(p, PN_TRANSPORT_CLOSED);
+  REQUIRE_RUN(p, PN_TRANSPORT_CLOSED);
+
+  /* Verify peer with good hostname */
+  pn_ssl_domain_t *cd = client.ssl_domain;
+  REQUIRE(0 == pn_ssl_domain_set_trusted_ca_db(cd, CERTIFICATE("tserver")));
+  REQUIRE(0 == pn_ssl_domain_set_peer_authentication(
+                   cd, PN_SSL_VERIFY_PEER_NAME, NULL));
+  pn_connection_t *c = pn_connection();
+  pn_connection_set_hostname(c, "test_server");
+  p.connect(l, &client, c);
+  REQUIRE_RUN(p, PN_CONNECTION_REMOTE_OPEN);
+  REQUIRE_RUN(p, PN_CONNECTION_REMOTE_OPEN);
+  CHECK_THAT(*server.last_condition, cond_empty());
+  CHECK_THAT(*client.last_condition, cond_empty());
+  REQUIRE_RUN(p, PN_TRANSPORT_CLOSED);
+  REQUIRE_RUN(p, PN_TRANSPORT_CLOSED);
+
+  /* Verify peer with bad hostname */
+  c = pn_connection();
+  pn_connection_set_hostname(c, "wrongname");
+  p.connect(l, &client, c);
+  REQUIRE_RUN(p, PN_TRANSPORT_ERROR);
+  CHECK_THAT(*client.last_condition,
+             cond_matches("amqp:connection:framing-error", "SSL"));
+}
+
+TEST_CASE("proactor_addr") {
+  /* Test the address formatter */
+  char addr[PN_MAX_ADDR];
+  pn_proactor_addr(addr, sizeof(addr), "foo", "bar");
+  CHECK_THAT("foo:bar", Equals(addr));
+  pn_proactor_addr(addr, sizeof(addr), "foo", "");
+  CHECK_THAT("foo:", Equals(addr));
+  pn_proactor_addr(addr, sizeof(addr), "foo", NULL);
+  CHECK_THAT("foo:", Equals(addr));
+  pn_proactor_addr(addr, sizeof(addr), "", "bar");
+  CHECK_THAT(":bar", Equals(addr));
+  pn_proactor_addr(addr, sizeof(addr), NULL, "bar");
+  CHECK_THAT(":bar", Equals(addr));
+  pn_proactor_addr(addr, sizeof(addr), "1:2:3:4", "5");
+  CHECK_THAT("1:2:3:4:5", Equals(addr));
+  pn_proactor_addr(addr, sizeof(addr), "1:2:3:4", "");
+  CHECK_THAT("1:2:3:4:", Equals(addr));
+  pn_proactor_addr(addr, sizeof(addr), "1:2:3:4", NULL);
+  CHECK_THAT("1:2:3:4:", Equals(addr));
+}
+
+/* Test pn_proactor_addr functions */
+
+TEST_CASE("proactor_netaddr") {
+  common_handler h;
+  proactor p(&h);
+  /* Use IPv4 to get consistent results all platforms */
+  pn_listener_t *l = p.listen("127.0.0.1");
+  REQUIRE_RUN(p, PN_LISTENER_OPEN);
+  pn_connection_t *c = p.connect(l);
+  REQUIRE_RUN(p, PN_CONNECTION_REMOTE_OPEN);
+  REQUIRE_RUN(p, PN_CONNECTION_REMOTE_OPEN);
+
+  // client remote, client local, server remote and server
+  // local address strings
+  char cr[1024], cl[1024], sr[1024], sl[1024];
+
+  pn_transport_t *ct = pn_connection_transport(c);
+  const pn_netaddr_t *na = pn_transport_remote_addr(ct);
+  pn_netaddr_str(na, cr, sizeof(cr));
+  CHECK_THAT(cr, Contains(listening_port(l)));
+
+  pn_connection_t *s = h.connection; /* server side of the connection */
+
+  pn_transport_t *st = pn_connection_transport(s);
+  pn_netaddr_str(pn_transport_local_addr(st), sl, sizeof(sl));
+  CHECK_THAT(cr, Equals(sl)); /* client remote == server local */
+
+  pn_netaddr_str(pn_transport_local_addr(ct), cl, sizeof(cl));
+  pn_netaddr_str(pn_transport_remote_addr(st), sr, sizeof(sr));
+  CHECK_THAT(cl, Equals(sr)); /* client local == server remote */
+
+  char host[PN_MAX_ADDR] = "";
+  char serv[PN_MAX_ADDR] = "";
+  CHECK(0 == pn_netaddr_host_port(na, host, sizeof(host), serv, sizeof(serv)));
+  CHECK_THAT("127.0.0.1", Equals(host));
+  CHECK(listening_port(l) == serv);
+
+  /* Make sure you can use NULL, 0 to get length of address
+   * string without a crash */
+  size_t len = pn_netaddr_str(pn_transport_local_addr(ct), NULL, 0);
+  CHECK(strlen(cl) == len);
+}
+
+TEST_CASE("proactor_parse_addr") {
+  char buf[1024];
+  const char *host, *port;
+
+  CHECK(0 == pni_parse_addr("foo:bar", buf, sizeof(buf), &host, &port));
+  CHECK_THAT("foo", Equals(host));
+  CHECK_THAT("bar", Equals(port));
+
+  CHECK(0 == pni_parse_addr("foo:", buf, sizeof(buf), &host, &port));
+  CHECK_THAT("foo", Equals(host));
+  CHECK_THAT("5672", Equals(port));
+
+  CHECK(0 == pni_parse_addr(":bar", buf, sizeof(buf), &host, &port));
+  CHECK(NULL == host);
+  CHECK_THAT("bar", Equals(port));
+
+  CHECK(0 == pni_parse_addr(":", buf, sizeof(buf), &host, &port));
+  CHECK(NULL == host);
+  CHECK_THAT("5672", Equals(port));
+
+  CHECK(0 == pni_parse_addr(":amqps", buf, sizeof(buf), &host, &port));
+  CHECK_THAT("5671", Equals(port));
+
+  CHECK(0 == pni_parse_addr(":amqp", buf, sizeof(buf), &host, &port));
+  CHECK_THAT("5672", Equals(port));
+
+  CHECK(0 == pni_parse_addr("::1:2:3", buf, sizeof(buf), &host, &port));
+  CHECK_THAT("::1:2", Equals(host));
+  CHECK_THAT("3", Equals(port));
+
+  CHECK(0 == pni_parse_addr(":::", buf, sizeof(buf), &host, &port));
+  CHECK_THAT("::", Equals(host));
+  CHECK_THAT("5672", Equals(port));
+
+  CHECK(0 == pni_parse_addr("", buf, sizeof(buf), &host, &port));
+  CHECK(NULL == host);
+  CHECK_THAT("5672", Equals(port));
+}
+
+/* Test pn_proactor_disconnect */
+TEST_CASE("proactor_disconnect") {
+  common_handler ch, sh;
+  proactor client(&ch), server(&sh);
+
+  // Start two listeners on the server
+  pn_listener_t *l = server.listen(":0");
+  REQUIRE_RUN(server, PN_LISTENER_OPEN);
+  pn_listener_t *l2 = server.listen(":0");
+  REQUIRE_RUN(server, PN_LISTENER_OPEN);
+
+  // Two connections from client
+  pn_connection_t *c = client.connect(l);
+  CHECK_CORUN(client, server, PN_CONNECTION_REMOTE_OPEN);
+  pn_connection_t *c2 = client.connect(l);
+  CHECK_CORUN(client, server, PN_CONNECTION_REMOTE_OPEN);
+
+  /* Disconnect the client proactor */
+  auto_free<pn_condition_t, pn_condition_free> cond(pn_condition());
+  pn_condition_format(cond, "test-name", "test-description");
+  pn_proactor_disconnect(client, cond);
+
+  /* Verify expected client side first */
+  CHECK_CORUN(client, server, PN_TRANSPORT_ERROR);
+  CHECK_THAT(*client.handler->last_condition,
+             cond_matches("test-name", "test-description"));
+  CHECK_CORUN(client, server, PN_TRANSPORT_ERROR);
+  CHECK_THAT(*client.handler->last_condition,
+             cond_matches("test-name", "test-description"));
+  REQUIRE_RUN(client, PN_PROACTOR_INACTIVE);
+
+  /* Now check server sees the disconnects */
+  CHECK_CORUN(server, client, PN_TRANSPORT_ERROR);
+  CHECK_THAT(*server.handler->last_condition,
+             cond_matches("amqp:connection:framing-error", "aborted"));
+  CHECK_CORUN(server, client, PN_TRANSPORT_ERROR);
+  CHECK_THAT(*server.handler->last_condition,
+             cond_matches("amqp:connection:framing-error", "aborted"));
+
+  /* Now disconnect the server end (the listeners) */
+  pn_proactor_disconnect(server, NULL);
+  REQUIRE_RUN(server, PN_LISTENER_CLOSE);
+  REQUIRE_RUN(server, PN_LISTENER_CLOSE);
+  REQUIRE_RUN(server, PN_PROACTOR_INACTIVE);
+
+  /* Make sure the proactors are still functional */
+  pn_listener_t *l3 = server.listen(":0");
+  REQUIRE_RUN(server, PN_LISTENER_OPEN);
+  client.connect(l3);
+  CHECK_CORUN(client, server, PN_CONNECTION_REMOTE_OPEN);
+}
+
+namespace {
+const size_t FRAME = 512;                    /* Smallest legal frame */
+const ssize_t CHUNK = (FRAME + FRAME / 2);   /* Chunk overflows frame */
+const size_t BODY = (CHUNK * 3 + CHUNK / 2); /* Body doesn't fit into chunks */
+} // namespace
+
+struct message_stream_handler : public common_handler {
+  pn_link_t *sender;
+  pn_delivery_t *dlv;
+  pn_rwbytes_t send_buf, recv_buf;
+  ssize_t size, sent, received;
+  bool complete;
+
+  message_stream_handler()
+      : sender(), dlv(), send_buf(), recv_buf(), size(), sent(), received(),
+        complete() {}
+
+  bool handle(pn_event_t *e) {
+    switch (pn_event_type(e)) {
+    case PN_CONNECTION_BOUND:
+      pn_transport_set_max_frame(pn_event_transport(e), FRAME);
+      return false;
+
+    case PN_SESSION_INIT:
+      pn_session_set_incoming_capacity(pn_event_session(e),
+                                       FRAME); /* Single frame incoming */
+      pn_session_set_outgoing_window(pn_event_session(e),
+                                     1); /* Single frame outgoing */
+      return false;
+
+    case PN_LINK_REMOTE_OPEN:
+      common_handler::handle(e);
+      if (pn_link_is_receiver(pn_event_link(e))) {
+        pn_link_flow(pn_event_link(e), 1);
+      } else {
+        sender = pn_event_link(e);
+      }
+      return false;
+
+    case PN_LINK_FLOW: /* Start a delivery */
+      if (pn_link_is_sender(pn_event_link(e)) && !dlv) {
+        dlv = pn_delivery(pn_event_link(e), pn_dtag("x", 1));
+      }
+      return false;
+
+    case PN_CONNECTION_WAKE: { /* Send a chunk */
+      ssize_t remains = size - sent;
+      ssize_t n = (CHUNK < remains) ? CHUNK : remains;
+      CHECK(n == pn_link_send(sender, send_buf.start + sent, n));
+      sent += n;
+      if (sent == size) {
+        CHECK(pn_link_advance(sender));
+      }
+      return false;
+    }
+
+    case PN_DELIVERY: { /* Receive a delivery - smaller than a
+                           chunk? */
+      pn_delivery_t *dlv = pn_event_delivery(e);
+      if (pn_delivery_readable(dlv)) {
+        ssize_t n = pn_delivery_pending(dlv);
+        rwbytes_ensure(&recv_buf, received + n);
+        REQUIRE(n ==
+                pn_link_recv(pn_event_link(e), recv_buf.start + received, n));
+        received += n;
+      }
+      complete = !pn_delivery_partial(dlv);
+      return true;
+    }
+    default:
+      return common_handler::handle(e);
+    }
+  }
+};
+
+/* Test sending/receiving a message in chunks */
+TEST_CASE("proactor_message_stream") {
+  message_stream_handler h;
+  proactor p(&h);
+
+  pn_listener_t *l = p.listen(":0");
+  REQUIRE_RUN(p, PN_LISTENER_OPEN);
+
+  /* Encode a large (not very) message to send in chunks */
+  auto_free<pn_message_t, pn_message_free> m(pn_message());
+  pn_data_put_binary(pn_message_body(m), pn_bytes(std::string(BODY, 'x')));
+  h.size = pn_message_encode2(m, &h.send_buf);
+
+  pn_connection_t *c = p.connect(l);
+  pn_session_t *ssn = pn_session(c);
+  pn_session_open(ssn);
+  pn_link_t *snd = pn_sender(ssn, "x");
+  pn_link_open(snd);
+  REQUIRE_RUN(p, PN_LINK_FLOW);
+
+  /* Send and receive the message in chunks */
+  do {
+    pn_connection_wake(c); /* Initiate send/receive of one chunk */
+    do {                   /* May be multiple receives for one send */
+      REQUIRE_RUN(p, PN_DELIVERY);
+    } while (h.received < h.sent);
+  } while (!h.complete);
+  CHECK(h.received == h.size);
+  CHECK(h.sent == h.size);
+  CHECK(!memcmp(h.send_buf.start, h.recv_buf.start, h.size));
+
+  free(h.send_buf.start);
+  free(h.recv_buf.start);
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to