This is an automated email from the ASF dual-hosted git repository.
cliffjansen pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git
The following commit(s) were added to refs/heads/main by this push:
new cb637b79e PROTON-2748: Raw connection async close fix and tests. First
part of pull 402
cb637b79e is described below
commit cb637b79ee2a4cb74c5332975c510f672c2c05fa
Author: Clifford Jansen <[email protected]>
AuthorDate: Mon Sep 11 23:00:12 2023 -0700
PROTON-2748: Raw connection async close fix and tests. First part of pull
402
---
c/src/proactor/epoll_raw_connection.c | 96 ++++---
c/src/proactor/raw_connection-internal.h | 2 +
c/src/proactor/raw_connection.c | 35 ++-
c/tests/raw_wake_test.cpp | 417 ++++++++++++++++++++++++++++++-
4 files changed, 499 insertions(+), 51 deletions(-)
diff --git a/c/src/proactor/epoll_raw_connection.c
b/c/src/proactor/epoll_raw_connection.c
index 9b85b15f1..c0e731066 100644
--- a/c/src/proactor/epoll_raw_connection.c
+++ b/c/src/proactor/epoll_raw_connection.c
@@ -50,7 +50,8 @@ struct praw_connection_t {
struct addrinfo *ai; /* Current connect address */
bool connected;
bool disconnected;
- bool batch_empty;
+ bool hup_detected;
+ bool read_check;
};
static void psocket_error(praw_connection_t *rc, int err, const char* msg) {
@@ -318,10 +319,7 @@ static pn_event_t *pni_raw_batch_next(pn_event_batch_t
*batch) {
unlock(&rc->task.mutex);
if (waking) pni_raw_wake(raw);
- pn_event_t *e = pni_raw_event_next(raw);
- if (!e || pn_event_type(e) == PN_RAW_CONNECTION_DISCONNECTED)
- rc->batch_empty = true;
- return e;
+ return pni_raw_event_next(raw);
}
task_t *pni_psocket_raw_task(psocket_t* ps) {
@@ -393,10 +391,10 @@ pn_event_batch_t *pni_raw_connection_process(task_t *t,
uint32_t io_events, bool
if (rc->disconnected) {
pni_raw_connect_failed(&rc->raw_connection);
unlock(&rc->task.mutex);
- rc->batch_empty = false;
return &rc->batch;
}
if (events & (EPOLLHUP | EPOLLERR)) {
+ // Continuation of praw_connection_maybe_connect_lh() logic.
// A wake can be the first event. Otherwise, wait for connection to
complete.
bool event_pending = task_wake ||
pni_raw_wake_is_pending(&rc->raw_connection) ||
pn_collector_peek(rc->raw_connection.collector);
t->working = event_pending;
@@ -405,35 +403,46 @@ pn_event_batch_t *pni_raw_connection_process(task_t *t,
uint32_t io_events, bool
}
if (events & EPOLLOUT)
praw_connection_connected_lh(rc);
+ unlock(&rc->task.mutex);
+ return &rc->batch;
}
unlock(&rc->task.mutex);
- if (events & EPOLLIN) pni_raw_read(&rc->raw_connection, fd, rcv, set_error);
+ if (events & EPOLLERR) {
+ // Read and write sides closed via RST. Tear down immediately.
+ int soerr;
+ socklen_t soerrlen = sizeof(soerr);
+ int ec = getsockopt(fd, SOL_SOCKET, SO_ERROR, &soerr, &soerrlen);
+ if (ec == 0 && soerr) {
+ psocket_error(rc, soerr, "async disconnect");
+ }
+ pni_raw_async_disconnect(&rc->raw_connection);
+ return &rc->batch;
+ }
+ if (events & EPOLLHUP) {
+ rc->hup_detected = true;
+ }
+
+ if (events & (EPOLLIN | EPOLLRDHUP) || rc->read_check) {
+ pni_raw_read(&rc->raw_connection, fd, rcv, set_error);
+ rc->read_check = false;
+ }
if (events & EPOLLOUT) pni_raw_write(&rc->raw_connection, fd, snd,
set_error);
- rc->batch_empty = false;
return &rc->batch;
}
void pni_raw_connection_done(praw_connection_t *rc) {
bool notify = false;
bool ready = false;
- bool have_event = false;
-
- // If !batch_empty, can't be sure state machine up to date, so reschedule
task if necessary.
- if (!rc->batch_empty) {
- if (pn_collector_peek(rc->raw_connection.collector))
- have_event = true;
- else {
- pn_event_t *e = pni_raw_batch_next(&rc->batch);
- // State machine up to date.
- if (e) {
- have_event = true;
- // Sole event. Can put back without order issues.
- // Edge case, performance not important.
- pn_collector_put(rc->raw_connection.collector, pn_event_class(e),
pn_event_context(e), pn_event_type(e));
- }
- }
- }
+ pn_raw_connection_t *raw = &rc->raw_connection;
+ int fd = rc->psocket.epoll_io.fd;
+
+ // Try write
+ if (pni_raw_can_write(raw)) pni_raw_write(raw, fd, snd, set_error);
+ pni_raw_process_shutdown(raw, fd, shutr, shutw);
+
+ // Update state machine and check for possible pending event.
+ bool have_event = pni_raw_batch_has_events(raw);
lock(&rc->task.mutex);
pn_proactor_t *p = rc->task.proactor;
@@ -442,24 +451,31 @@ void pni_raw_connection_done(praw_connection_t *rc) {
// The task may be in the ready state even if we've got no raw connection
// wakes outstanding because we dealt with it already in pni_raw_batch_next()
notify = (pni_task_wake_pending(&rc->task) || have_event) &&
schedule(&rc->task);
- ready = rc->task.ready;
+ ready = rc->task.ready; // No need to poll. Already scheduled.
unlock(&rc->task.mutex);
- pn_raw_connection_t *raw = &rc->raw_connection;
- int fd = rc->psocket.epoll_io.fd;
- pni_raw_process_shutdown(raw, fd, shutr, shutw);
- int wanted =
- (pni_raw_can_read(raw) ? EPOLLIN : 0) |
- (pni_raw_can_write(raw) ? EPOLLOUT : 0);
- if (wanted) {
- rc->psocket.epoll_io.wanted = wanted;
- rearm_polling(&rc->psocket.epoll_io, p->epollfd); // TODO: check for error
+ bool finished_disconnect = raw->state==conn_fini && !ready &&
!raw->disconnectpending;
+ if (finished_disconnect) {
+ // If we're closed and we've sent the disconnect then close
+ pni_raw_finalize(raw);
+ praw_connection_cleanup(rc);
+ } else if (ready) {
+ // Already scheduled to run. Skip poll. Remember if we want a read.
+ rc->read_check = pni_raw_can_read(raw);
+ } else if (!rc->connected) {
+ // Connect logic has already armed the socket.
} else {
- bool finished_disconnect = raw->state==conn_fini && !ready &&
!raw->disconnectpending;
- if (finished_disconnect) {
- // If we're closed and we've sent the disconnect then close
- pni_raw_finalize(raw);
- praw_connection_cleanup(rc);
+ // Must poll for IO.
+ int wanted =
+ (pni_raw_can_read(raw) ? (EPOLLIN | EPOLLRDHUP) : 0) |
+ (pni_raw_can_write(raw) ? EPOLLOUT : 0);
+
+ // wanted == 0 implies we block until either application wake() or
EPOLLHUP | EPOLLERR.
+ // If wanted == 0 and hup_detected, blocking not possible, so skip arming
until
+ // application provides read buffers.
+ if (wanted || !rc->hup_detected) {
+ rc->psocket.epoll_io.wanted = wanted;
+ rearm_polling(&rc->psocket.epoll_io, p->epollfd); // TODO: check for
error
}
}
diff --git a/c/src/proactor/raw_connection-internal.h
b/c/src/proactor/raw_connection-internal.h
index 47b0ea925..cdfd5a852 100644
--- a/c/src/proactor/raw_connection-internal.h
+++ b/c/src/proactor/raw_connection-internal.h
@@ -134,9 +134,11 @@ void pni_raw_write_close(pn_raw_connection_t *conn);
void pni_raw_read(pn_raw_connection_t *conn, int sock, long (*recv)(int,
void*, size_t), void (*set_error)(pn_raw_connection_t *, const char *, int));
void pni_raw_write(pn_raw_connection_t *conn, int sock, long (*send)(int,
const void*, size_t), void (*set_error)(pn_raw_connection_t *, const char *,
int));
void pni_raw_process_shutdown(pn_raw_connection_t *conn, int sock, int
(*shutdown_rd)(int), int (*shutdown_wr)(int));
+void pni_raw_async_disconnect(pn_raw_connection_t *conn);
bool pni_raw_can_read(pn_raw_connection_t *conn);
bool pni_raw_can_write(pn_raw_connection_t *conn);
pn_event_t *pni_raw_event_next(pn_raw_connection_t *conn);
+bool pni_raw_batch_has_events(pn_raw_connection_t *conn);
void pni_raw_initialize(pn_raw_connection_t *conn);
void pni_raw_finalize(pn_raw_connection_t *conn);
diff --git a/c/src/proactor/raw_connection.c b/c/src/proactor/raw_connection.c
index 3d8b976c6..89fbbd1dd 100644
--- a/c/src/proactor/raw_connection.c
+++ b/c/src/proactor/raw_connection.c
@@ -669,12 +669,14 @@ bool pni_raw_can_write(pn_raw_connection_t *conn) {
return !pni_raw_wdrained(conn) && conn->wbuffer_first_towrite;
}
-pn_event_t *pni_raw_event_next(pn_raw_connection_t *conn) {
+bool pni_raw_batch_has_events(pn_raw_connection_t *conn) {
+ // If collector empty, advance state machine as necessary and generate next
event.
+ // Return true if at least one event is available.
assert(conn);
do {
- pn_event_t *event = pn_collector_next(conn->collector);
+ pn_event_t *event = pn_collector_peek(conn->collector);
if (event) {
- return pni_log_event(conn, event);
+ return true;
} else if (conn->connectpending) {
pni_raw_put_event(conn, PN_RAW_CONNECTION_CONNECTED);
conn->connectpending = false;
@@ -716,11 +718,20 @@ pn_event_t *pni_raw_event_next(pn_raw_connection_t *conn)
{
pni_raw_put_event(conn, PN_RAW_CONNECTION_NEED_READ_BUFFERS);
conn->rrequestedbuffers = true;
} else {
- return NULL;
+ return false;
}
} while (true);
}
+pn_event_t *pni_raw_event_next(pn_raw_connection_t *conn) {
+ if (pni_raw_batch_has_events(conn)) {
+ pn_event_t* event = pn_collector_next(conn->collector);
+ assert(event);
+ return pni_log_event(conn, event);
+ }
+ return NULL;
+}
+
void pni_raw_read_close(pn_raw_connection_t *conn) {
// If already fully closed nothing to do
if (pni_raw_rwclosed(conn)) {
@@ -781,6 +792,22 @@ void pni_raw_close(pn_raw_connection_t *conn) {
}
}
+void pni_raw_async_disconnect(pn_raw_connection_t *conn) {
+ if (pni_raw_rwclosed(conn))
+ return;
+
+ if (!pni_raw_rclosed(conn)) {
+ conn->state = pni_raw_new_state(conn, conn_read_closed);
+ conn->rclosedpending = true;
+ }
+ if (!pni_raw_wclosed(conn)) {
+ pni_raw_release_buffers(conn);
+ conn->state = pni_raw_new_state(conn, conn_write_closed);
+ conn->wclosedpending = true;
+ }
+ pni_raw_disconnect(conn);
+}
+
bool pn_raw_connection_is_read_closed(pn_raw_connection_t *conn) {
assert(conn);
return pni_raw_rclosed(conn);
diff --git a/c/tests/raw_wake_test.cpp b/c/tests/raw_wake_test.cpp
index 4a5dc23d3..80ddc2028 100644
--- a/c/tests/raw_wake_test.cpp
+++ b/c/tests/raw_wake_test.cpp
@@ -28,11 +28,22 @@
#include <sys/socket.h>
#include <unistd.h>
#include <errno.h>
+#include <arpa/inet.h>
#endif
#include <string.h>
-// WAKE tests require a running proactor.
+// Raw connection tests driven by a proactor.
+
+// These tests often cheat by directly calling API functions that
+// would normally be called in an event callback for thread safety
+// reasons. This can usually work because the proactors and API calls
+// are all called from a single thread so there is no contention, but
+// the raw connection may require a wake so that the state machine and
+// polling mask can be updated. Note that wakes stop working around
+// the time the raw connection thinks it is about to be fully closed,
+// so close operations may need to be done in event callbacks to
+// avoid wake uncertainty.
#include "../src/proactor/proactor-internal.h"
#include "./pn_test_proactor.hpp"
@@ -45,14 +56,32 @@ namespace {
class common_handler : public handler {
bool close_on_wake_;
+ bool write_close_on_wake_;
+ bool stop_on_wake_;
+ bool abort_on_wake_;
+ int closed_read_count_;
+ int closed_write_count_;
+ int disconnect_count_;
+ bool disconnect_error_;
pn_raw_connection_t *last_server_;
+ pn_raw_buffer_t write_buff_;
public:
- explicit common_handler() : close_on_wake_(false), last_server_(0) {}
+ explicit common_handler() : close_on_wake_(false), write_close_on_wake_(0),
stop_on_wake_(false),
+ abort_on_wake_(false), closed_read_count_(0),
closed_write_count_(0),
+ disconnect_count_(0), disconnect_error_(false),
+ last_server_(0), write_buff_({0}) {}
void set_close_on_wake(bool b) { close_on_wake_ = b; }
-
+ void set_write_close_on_wake(bool b) { write_close_on_wake_ = b; }
+ void set_stop_on_wake(bool b) { stop_on_wake_ = b; }
+ void set_abort_on_wake(bool b) { abort_on_wake_ = b; }
+ int closed_read_count() { return closed_read_count_; }
+ int closed_write_count() { return closed_write_count_; }
+ int disconnect_count() { return disconnect_count_; }
+ bool disconnect_error() { return disconnect_error_; }
pn_raw_connection_t *last_server() { return last_server_; }
+ void set_write_on_wake(pn_raw_buffer_t *b) { write_buff_ = *b; }
bool handle(pn_event_t *e) override {
switch (pn_event_type(e)) {
@@ -71,13 +100,41 @@ public:
} break;
case PN_RAW_CONNECTION_WAKE: {
- if (close_on_wake_) {
- pn_raw_connection_t *rc = pn_event_raw_connection(e);
+ if (abort_on_wake_) abort();
+ pn_raw_connection_t *rc = pn_event_raw_connection(e);
+
+ if (write_buff_.size) {
+ // Add the buff for writing before any close operation.
+ CHECK(pn_raw_connection_write_buffers(rc, &write_buff_, 1) == 1);
+ write_buff_.size = 0;
+ }
+ if (write_close_on_wake_)
+ pn_raw_connection_write_close(rc);
+ if (close_on_wake_)
pn_raw_connection_close(rc);
+ return stop_on_wake_;
+ } break;
+
+ case PN_RAW_CONNECTION_DISCONNECTED: {
+ disconnect_count_++;
+ pn_raw_connection_t *rc = pn_event_raw_connection(e);
+ pn_condition_t *cond = pn_raw_connection_condition(rc);
+ if (disconnect_count_ == 1 && pn_condition_is_set(cond)) {
+ const char *nm = pn_condition_get_name(cond);
+ const char *ds = pn_condition_get_description(cond);
+ if (nm && strlen(nm) > 0 && ds && strlen(ds) > 0)
+ disconnect_error_ = true;
}
- return true;
+ return false;
} break;
+ case PN_RAW_CONNECTION_CLOSED_READ:
+ closed_read_count_++;
+ return false;
+
+ case PN_RAW_CONNECTION_CLOSED_WRITE:
+ closed_write_count_++;
+ return false;
default:
return false;
@@ -85,9 +142,127 @@ public:
}
};
+static const size_t buffsz = 128;
+
+// Basic test consisting of
+// client is an OS socket.
+// server is a pn_raw_connection_t with one shared read/write buffer.
+// pn_listener_t used to put the two together.
+struct basic_test {
+ common_handler h;
+ proactor p;
+ pn_listener_t *l;
+ int sockfd; // client
+ pn_raw_connection_t *server_rc;
+ char buff[buffsz];
+ bool buff_in_use;
+
+ basic_test() : h(), p(&h) {
+ l = p.listen();
+ REQUIRE_RUN(p, PN_LISTENER_OPEN);
+ sockfd = socket(AF_INET, SOCK_STREAM, 0);
+ REQUIRE(sockfd >= 0);
+ struct sockaddr_in laddr;
+ memset(&laddr, 0, sizeof(laddr));
+ laddr.sin_family = AF_INET;
+ laddr.sin_port = htons(atoi(pn_test::listening_port(l).c_str()));
+ laddr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
+ connect(sockfd, (const struct sockaddr*) &laddr, sizeof(laddr));
+
+ REQUIRE_RUN(p, PN_LISTENER_ACCEPT);
+ server_rc = h.last_server();
+ REQUIRE_RUN(p, PN_RAW_CONNECTION_NEED_READ_BUFFERS);
+ pn_raw_buffer_t rb = {0, buff, buffsz, 0, 0};
+ CHECK(pn_raw_connection_give_read_buffers(server_rc, &rb, 1) == 1);
+ buff_in_use = true;
+
+ pn_raw_connection_wake(server_rc);
+ REQUIRE_RUN(p, PN_RAW_CONNECTION_WAKE);
+ CHECK(pn_proactor_get(p) == NULL); /* idle */
+ }
+
+ ~basic_test() {
+ pn_listener_close(l);
+ REQUIRE_RUN(p, PN_LISTENER_CLOSE);
+ REQUIRE_RUN(p, PN_PROACTOR_INACTIVE);
+ if (sockfd >= 0) close(sockfd);
+ bool sanity = h.closed_read_count() == 1 && h.closed_write_count() == 1 &&
+ h.disconnect_count() == 1;
+ REQUIRE(sanity == true);
+ }
+
+ void socket_write_close() {
+ if (sockfd < 0) return;
+ shutdown(sockfd, SHUT_WR);
+ }
+
+ void socket_graceful_close() {
+ if (sockfd < 0) return;
+ close(sockfd);
+ sockfd = -1;
+ }
+
+ bool socket_hard_close() {
+ // RST (not FIN), hard/abort close
+ if (sockfd < 0) return false;
+ struct linger lngr;
+ lngr.l_onoff = 1;
+ lngr.l_linger = 0;
+ if (sockfd < 0) return false;
+ if (setsockopt(sockfd, SOL_SOCKET, SO_LINGER, &lngr, sizeof(lngr)) == 0) {
+ if (close(sockfd) == 0) {
+ sockfd = -1;
+ return true;
+ }
+ }
+ return false;
+ }
+
+ void drain_read_buffer() {
+ assert(buff_in_use);
+ send(sockfd, "FOO", 3, 0);
+ REQUIRE_RUN(p, PN_RAW_CONNECTION_READ);
+ pn_raw_buffer_t rb = {0};
+ REQUIRE(pn_raw_connection_take_read_buffers(server_rc, &rb, 1) == 1);
+ REQUIRE(rb.size == 3);
+ buff_in_use = false;
+ }
+
+ void give_read_buffer() {
+ assert(!buff_in_use);
+ pn_raw_buffer_t rb = {0, buff, buffsz, 0, 0};
+ CHECK(pn_raw_connection_give_read_buffers(server_rc, &rb, 1) == 1);
+ buff_in_use = true;
+ }
+
+ void write_next_wake(const char *m) {
+ assert(!buff_in_use);
+ pn_raw_buffer_t rb = {0, buff, buffsz, 0, 0};
+ size_t l = strlen(m);
+ assert(l < buffsz);
+ strcpy(rb.bytes, m);
+ rb.size = l;
+ h.set_write_on_wake(&rb);
+ }
+
+ int drain_events() {
+ int ec = 0;
+ pn_event_batch_t *batch = NULL;
+ while (batch = pn_proactor_get(p.get())) {
+ pn_event_t *e;
+ while (e = pn_event_batch_next(batch)) {
+ ec++;
+ h.dispatch(e);
+ }
+ pn_proactor_done(p.get(), batch);
+ }
+ return ec;
+ }
+};
} // namespace
+
// Test waking up a connection that is idle
TEST_CASE("proactor_raw_connection_wake") {
common_handler h;
@@ -104,7 +279,7 @@ TEST_CASE("proactor_raw_connection_wake") {
REQUIRE_RUN(p, PN_RAW_CONNECTION_NEED_READ_BUFFERS);
REQUIRE_RUN(p, PN_RAW_CONNECTION_NEED_READ_BUFFERS);
CHECK(pn_proactor_get(p) == NULL); /* idle */
- pn_raw_connection_wake(rc);
+ pn_raw_connection_wake(rc);
REQUIRE_RUN(p, PN_RAW_CONNECTION_WAKE);
CHECK(pn_proactor_get(p) == NULL); /* idle */
@@ -119,3 +294,231 @@ TEST_CASE("proactor_raw_connection_wake") {
REQUIRE_RUN(p, PN_LISTENER_CLOSE);
REQUIRE_RUN(p, PN_PROACTOR_INACTIVE);
}
+
+// Normal close
+TEST_CASE("raw_connection_graceful_close") {
+ struct basic_test x;
+ x.socket_graceful_close();
+ REQUIRE_RUN(x.p, PN_RAW_CONNECTION_CLOSED_READ);
+ x.h.set_close_on_wake(true);
+ pn_raw_connection_wake(x.server_rc);
+ REQUIRE_RUN(x.p, PN_RAW_CONNECTION_DISCONNECTED);
+ REQUIRE(x.h.disconnect_error() == false);
+}
+
+// HARD close
+TEST_CASE("raw_connection_hardclose") {
+ struct basic_test x;
+ x.socket_hard_close();
+ REQUIRE_RUN(x.p, PN_RAW_CONNECTION_CLOSED_READ);
+ REQUIRE_RUN(x.p, PN_RAW_CONNECTION_DISCONNECTED);
+ REQUIRE(x.h.disconnect_error() == true);
+}
+
+// HARD close, no read buffer
+TEST_CASE("raw_connection_hardclose_nrb") {
+ struct basic_test x;
+ // Drain read buffer without replenishing
+ x.drain_read_buffer();
+ x.drain_events();
+ CHECK(pn_proactor_get(x.p) == NULL); /* idle */
+ x.socket_hard_close();
+ REQUIRE_RUN(x.p, PN_RAW_CONNECTION_CLOSED_READ);
+ REQUIRE_RUN(x.p, PN_RAW_CONNECTION_DISCONNECTED);
+ REQUIRE(x.h.disconnect_error() == true);
+}
+
+// HARD close after read close
+TEST_CASE("raw_connection_readclose_then_hardclose") {
+ struct basic_test x;
+ x.socket_write_close();
+ REQUIRE_RUN(x.p, PN_RAW_CONNECTION_CLOSED_READ);
+ x.drain_events();
+ REQUIRE(x.h.disconnect_count() == 0);
+ x.socket_hard_close();
+ REQUIRE_RUN(x.p, PN_RAW_CONNECTION_DISCONNECTED);
+ REQUIRE(x.h.disconnect_error() == true);
+}
+
+// HARD close after read close, no read buffer
+TEST_CASE("raw_connection_readclose_then_hardclose_nrb") {
+ struct basic_test x;
+ // Drain read buffer without replenishing
+ x.drain_read_buffer();
+ x.drain_events();
+ CHECK(pn_proactor_get(x.p) == NULL); /* idle */
+ // Shut of read side should be ignored with no read buffer.
+ x.socket_write_close();
+ CHECK(pn_proactor_get(x.p) == NULL); /* still idle */
+
+ // Confirm raw connection shuts down, even with no read buffer
+ x.socket_hard_close();
+ REQUIRE_RUN(x.p, PN_RAW_CONNECTION_CLOSED_READ);
+ REQUIRE_RUN(x.p, PN_RAW_CONNECTION_DISCONNECTED);
+ REQUIRE(x.h.disconnect_error() == true);
+}
+
+// Normal close on socket delays CLOSED_READ event until application makes
read buffers available
+TEST_CASE("raw_connection_delay_readclose") {
+ struct basic_test x;
+ x.drain_read_buffer();
+ x.socket_graceful_close();
+ x.drain_events();
+ REQUIRE(x.h.closed_read_count() == 0);
+
+ x.give_read_buffer();
+ pn_raw_connection_wake(x.server_rc);
+ REQUIRE_RUN(x.p, PN_RAW_CONNECTION_WAKE);
+ REQUIRE_RUN(x.p, PN_RAW_CONNECTION_CLOSED_READ);
+ REQUIRE(x.h.closed_read_count() == 1);
+
+ x.h.set_close_on_wake(true);
+ pn_raw_connection_wake(x.server_rc);
+ REQUIRE_RUN(x.p, PN_RAW_CONNECTION_WAKE);
+ REQUIRE_RUN(x.p, PN_RAW_CONNECTION_DISCONNECTED);
+}
+
+TEST_CASE("raw_connection_rst_on_write") {
+ struct basic_test x;
+ x.drain_read_buffer();
+
+ // Send some data
+ x.write_next_wake("foo");
+ pn_raw_connection_wake(x.server_rc);
+ REQUIRE_RUN(x.p, PN_RAW_CONNECTION_WRITTEN);
+ pn_raw_buffer_t rb = {0};
+ CHECK(pn_raw_connection_take_written_buffers(x.server_rc, &rb, 1) == 1);
+ char b[buffsz];
+ REQUIRE(recv(x.sockfd, b, buffsz, 0) == 3);
+
+ // Repeat, with closed peer socket.
+ x.socket_graceful_close();
+ x.write_next_wake("bar");
+ pn_raw_connection_wake(x.server_rc);
+ // Write or subsequent poll should fail EPIPE
+ REQUIRE_RUN(x.p, PN_RAW_CONNECTION_DISCONNECTED);
+ REQUIRE(x.h.disconnect_error() == true);
+}
+
+// One sided close. No cooperation from peer.
+TEST_CASE("raw_connection_full_close") {
+ struct basic_test x;
+ x.h.set_close_on_wake(true);
+ pn_raw_connection_wake(x.server_rc);
+ // No send/recv/close/shutdown activity from peer socket.
+ REQUIRE_RUN(x.p, PN_RAW_CONNECTION_WAKE);
+ REQUIRE_RUN(x.p, PN_RAW_CONNECTION_DISCONNECTED);
+}
+
+// As above. No read buffer.
+TEST_CASE("raw_connection_full_close_nrb") {
+ struct basic_test x;
+ x.drain_read_buffer();
+ x.h.set_close_on_wake(true);
+ pn_raw_connection_wake(x.server_rc);
+ // No send/recv/close/shutdown activity from peer socket.
+ REQUIRE_RUN(x.p, PN_RAW_CONNECTION_DISCONNECTED);
+}
+
+// One sided close, pending write.
+TEST_CASE("raw_connection_close_wdrain") {
+ struct basic_test x;
+ x.drain_read_buffer();
+ // write and then close on next wake
+ x.write_next_wake("fubar");
+ x.h.set_close_on_wake(true);
+ pn_raw_connection_wake(x.server_rc);
+ // No send/recv/close/shutdown activity from peer socket.
+ REQUIRE_RUN(x.p, PN_RAW_CONNECTION_WAKE);
+ REQUIRE_RUN(x.p, PN_RAW_CONNECTION_DISCONNECTED);
+ // Now check fubar made it
+ char b[buffsz];
+ REQUIRE(recv(x.sockfd, b, buffsz, 0) == 5);
+ REQUIRE(strncmp("fubar", b, 5) == 0);
+}
+
+// One sided write_close then close.
+TEST_CASE("raw_connection_wclose_full_close") {
+ struct basic_test x;
+ x.h.set_write_close_on_wake(true);
+ pn_raw_connection_wake(x.server_rc);
+ REQUIRE_RUN(x.p, PN_RAW_CONNECTION_WAKE);
+ REQUIRE_RUN(x.p, PN_RAW_CONNECTION_CLOSED_WRITE);
+ x.drain_events();
+ REQUIRE(x.h.closed_read_count() == 0);
+
+ x.h.set_write_close_on_wake(false);
+ x.h.set_close_on_wake(true);
+ pn_raw_connection_wake(x.server_rc);
+ // No send/recv/close/shutdown activity from peer socket.
+ REQUIRE_RUN(x.p, PN_RAW_CONNECTION_WAKE);
+ REQUIRE_RUN(x.p, PN_RAW_CONNECTION_DISCONNECTED);
+}
+
+TEST_CASE("raw_connection_wclose_full_close_nrb") {
+ struct basic_test x;
+ x.drain_read_buffer();
+ x.h.set_write_close_on_wake(true);
+ pn_raw_connection_wake(x.server_rc);
+ REQUIRE_RUN(x.p, PN_RAW_CONNECTION_WAKE);
+ REQUIRE_RUN(x.p, PN_RAW_CONNECTION_CLOSED_WRITE);
+ x.drain_events();
+ REQUIRE(x.h.closed_read_count() == 0);
+
+ x.h.set_write_close_on_wake(false);
+ x.h.set_close_on_wake(true);
+ pn_raw_connection_wake(x.server_rc);
+ // No send/recv/close/shutdown activity from peer socket.
+ REQUIRE_RUN(x.p, PN_RAW_CONNECTION_WAKE);
+ REQUIRE_RUN(x.p, PN_RAW_CONNECTION_DISCONNECTED);
+}
+
+TEST_CASE("raw_connection_wclose_full_close_wdrain") {
+ struct basic_test x;
+ x.drain_read_buffer();
+ // write and then wclose then close on next wake
+ x.write_next_wake("bar");
+ x.h.set_write_close_on_wake(true);
+ x.h.set_close_on_wake(true);
+ pn_raw_connection_wake(x.server_rc);
+ REQUIRE_RUN(x.p, PN_RAW_CONNECTION_WAKE);
+ // No send/recv/close/shutdown activity from peer socket.
+ REQUIRE_RUN(x.p, PN_RAW_CONNECTION_DISCONNECTED);
+ // Now check bar made it
+ char b[buffsz];
+ REQUIRE(recv(x.sockfd, b, buffsz, 0) == 3);
+ REQUIRE(strncmp("bar", b, 3) == 0);
+}
+
+// Half closes each direction. Raw connection then peer.
+TEST_CASE("raw_connection_wclose_then_rclose") {
+ struct basic_test x;
+ x.h.set_write_close_on_wake(true);
+ pn_raw_connection_wake(x.server_rc);
+ x.drain_events();
+ REQUIRE(x.h.closed_write_count() == 1);
+ REQUIRE(x.h.closed_read_count() == 0);
+
+ char b[buffsz];
+ REQUIRE(recv(x.sockfd, b, buffsz, 0) == 0); // EOF
+ x.socket_write_close();
+ REQUIRE_RUN(x.p, PN_RAW_CONNECTION_DISCONNECTED);
+ REQUIRE(x.h.closed_read_count() == 1);
+}
+
+// As above but peer first then raw connection.
+TEST_CASE("raw_connection_rclose_then_wclose") {
+ struct basic_test x;
+ x.socket_write_close();
+ x.drain_events();
+ REQUIRE(x.h.closed_read_count() == 1);
+ REQUIRE(x.h.closed_write_count() == 0);
+
+ x.h.set_write_close_on_wake(true);
+ pn_raw_connection_wake(x.server_rc);
+ REQUIRE_RUN(x.p, PN_RAW_CONNECTION_DISCONNECTED);
+ char b[buffsz];
+ REQUIRE(recv(x.sockfd, b, buffsz, 0) == 0); // EOF
+ REQUIRE(x.h.closed_write_count() == 1);
+}
+
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]