This is an automated email from the ASF dual-hosted git repository.
astitcher pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git
The following commit(s) were added to refs/heads/main by this push:
new 523f28615 PROTON-2791: Add MSG_MORE send flag on raw connections
523f28615 is described below
commit 523f2861592f29873877463266ffab24f14aa708
Author: Cliff Jansen <[email protected]>
AuthorDate: Mon Feb 12 09:10:16 2024 -0800
PROTON-2791: Add MSG_MORE send flag on raw connections
---
c/src/proactor/epoll_raw_connection.c | 6 ++++--
c/src/proactor/raw_connection-internal.h | 2 +-
c/src/proactor/raw_connection.c | 5 +++--
c/tests/raw_connection_test.cpp | 8 +++++---
4 files changed, 13 insertions(+), 8 deletions(-)
diff --git a/c/src/proactor/epoll_raw_connection.c
b/c/src/proactor/epoll_raw_connection.c
index 7e4dcb4a4..b7547f9f9 100644
--- a/c/src/proactor/epoll_raw_connection.c
+++ b/c/src/proactor/epoll_raw_connection.c
@@ -350,8 +350,10 @@ task_t *pni_raw_connection_task(praw_connection_t *rc) {
return &rc->task;
}
-static long snd(int fd, const void* b, size_t s) {
- return send(fd, b, s, MSG_NOSIGNAL | MSG_DONTWAIT);
+static long snd(int fd, const void* b, size_t s, bool more) {
+ int flags = MSG_NOSIGNAL | MSG_DONTWAIT;
+ if (more) flags |= MSG_MORE;
+ return send(fd, b, s, flags);
}
static long rcv(int fd, void* b, size_t s) {
diff --git a/c/src/proactor/raw_connection-internal.h
b/c/src/proactor/raw_connection-internal.h
index fe0e29fe0..eb38e81d3 100644
--- a/c/src/proactor/raw_connection-internal.h
+++ b/c/src/proactor/raw_connection-internal.h
@@ -132,7 +132,7 @@ void pni_raw_close(pn_raw_connection_t *conn);
void pni_raw_read_close(pn_raw_connection_t *conn);
void pni_raw_write_close(pn_raw_connection_t *conn);
void pni_raw_read(pn_raw_connection_t *conn, int sock, long (*recv)(int,
void*, size_t), void (*set_error)(pn_raw_connection_t *, const char *, int));
-void pni_raw_write(pn_raw_connection_t *conn, int sock, long (*send)(int,
const void*, size_t), void (*set_error)(pn_raw_connection_t *, const char *,
int));
+void pni_raw_write(pn_raw_connection_t *conn, int sock, long (*send)(int,
const void*, size_t, bool), void (*set_error)(pn_raw_connection_t *, const char
*, int));
void pni_raw_process_shutdown(pn_raw_connection_t *conn, int sock, int
(*shutdown_rd)(int), int (*shutdown_wr)(int));
void pni_raw_async_disconnect(pn_raw_connection_t *conn);
bool pni_raw_can_read(pn_raw_connection_t *conn);
diff --git a/c/src/proactor/raw_connection.c b/c/src/proactor/raw_connection.c
index 0c2118e01..27a699617 100644
--- a/c/src/proactor/raw_connection.c
+++ b/c/src/proactor/raw_connection.c
@@ -566,7 +566,7 @@ finished_reading:
return;
}
-void pni_raw_write(pn_raw_connection_t *conn, int sock, long (*send)(int,
const void*, size_t), void(*set_error)(pn_raw_connection_t *, const char *,
int)) {
+void pni_raw_write(pn_raw_connection_t *conn, int sock, long (*send)(int,
const void*, size_t, bool), void(*set_error)(pn_raw_connection_t *, const char
*, int)) {
assert(conn);
if (pni_raw_wdrained(conn)) return;
@@ -578,7 +578,8 @@ void pni_raw_write(pn_raw_connection_t *conn, int sock,
long (*send)(int, const
assert(conn->wbuffers[p-1].type == buff_unwritten);
char *bytes =
conn->wbuffers[p-1].bytes+conn->wbuffers[p-1].offset+conn->unwritten_offset;
size_t s = conn->wbuffers[p-1].size-conn->unwritten_offset;
- int r = send(sock, bytes, s);
+ bool more = conn->wbuffers[p-1].next != 0;
+ int r = send(sock, bytes, s, more);
if (r < 0) {
// Interrupted system call try again
switch (errno) {
diff --git a/c/tests/raw_connection_test.cpp b/c/tests/raw_connection_test.cpp
index 7378d7541..92cc2ad0f 100644
--- a/c/tests/raw_connection_test.cpp
+++ b/c/tests/raw_connection_test.cpp
@@ -87,10 +87,12 @@ namespace {
::shutdown(fd, SHUT_WR);
}
- long snd(int fd, const void* b, size_t s) {
+ long snd(int fd, const void* b, size_t s, bool more) {
write_err = 0;
+ int flags = MSG_NOSIGNAL | MSG_DONTWAIT;
+ if (more) flags |= MSG_MORE;
if (max_send_size && max_send_size < s) s = max_send_size;
- return ::send(fd, b, s, MSG_NOSIGNAL | MSG_DONTWAIT);
+ return ::send(fd, b, s, flags);
}
int makepair(int fds[2]) {
@@ -164,7 +166,7 @@ namespace {
return s;
}
- long snd(int fd, const void* b, size_t s){
+ long snd(int fd, const void* b, size_t s, bool /* more: unused */ ){
CHECK(fd < buffers.size());
write_err = 0;
if (max_send_size && max_send_size < s) s = max_send_size;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]