This is an automated email from the ASF dual-hosted git repository.

astitcher pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git


The following commit(s) were added to refs/heads/main by this push:
     new 523f28615 PROTON-2791: Add MSG_MORE send flag on raw connections
523f28615 is described below

commit 523f2861592f29873877463266ffab24f14aa708
Author: Cliff Jansen <[email protected]>
AuthorDate: Mon Feb 12 09:10:16 2024 -0800

    PROTON-2791: Add MSG_MORE send flag on raw connections
---
 c/src/proactor/epoll_raw_connection.c    | 6 ++++--
 c/src/proactor/raw_connection-internal.h | 2 +-
 c/src/proactor/raw_connection.c          | 5 +++--
 c/tests/raw_connection_test.cpp          | 8 +++++---
 4 files changed, 13 insertions(+), 8 deletions(-)

diff --git a/c/src/proactor/epoll_raw_connection.c 
b/c/src/proactor/epoll_raw_connection.c
index 7e4dcb4a4..b7547f9f9 100644
--- a/c/src/proactor/epoll_raw_connection.c
+++ b/c/src/proactor/epoll_raw_connection.c
@@ -350,8 +350,10 @@ task_t *pni_raw_connection_task(praw_connection_t *rc) {
   return &rc->task;
 }
 
-static long snd(int fd, const void* b, size_t s) {
-  return send(fd, b, s, MSG_NOSIGNAL | MSG_DONTWAIT);
+static long snd(int fd, const void* b, size_t s, bool more) {
+  int flags = MSG_NOSIGNAL | MSG_DONTWAIT;
+  if (more) flags |= MSG_MORE;
+  return send(fd, b, s, flags);
 }
 
 static long rcv(int fd, void* b, size_t s) {
diff --git a/c/src/proactor/raw_connection-internal.h 
b/c/src/proactor/raw_connection-internal.h
index fe0e29fe0..eb38e81d3 100644
--- a/c/src/proactor/raw_connection-internal.h
+++ b/c/src/proactor/raw_connection-internal.h
@@ -132,7 +132,7 @@ void pni_raw_close(pn_raw_connection_t *conn);
 void pni_raw_read_close(pn_raw_connection_t *conn);
 void pni_raw_write_close(pn_raw_connection_t *conn);
 void pni_raw_read(pn_raw_connection_t *conn, int sock, long (*recv)(int, 
void*, size_t), void (*set_error)(pn_raw_connection_t *, const char *, int));
-void pni_raw_write(pn_raw_connection_t *conn, int sock, long (*send)(int, 
const void*, size_t), void (*set_error)(pn_raw_connection_t *, const char *, 
int));
+void pni_raw_write(pn_raw_connection_t *conn, int sock, long (*send)(int, 
const void*, size_t, bool), void (*set_error)(pn_raw_connection_t *, const char 
*, int));
 void pni_raw_process_shutdown(pn_raw_connection_t *conn, int sock, int 
(*shutdown_rd)(int), int (*shutdown_wr)(int));
 void pni_raw_async_disconnect(pn_raw_connection_t *conn);
 bool pni_raw_can_read(pn_raw_connection_t *conn);
diff --git a/c/src/proactor/raw_connection.c b/c/src/proactor/raw_connection.c
index 0c2118e01..27a699617 100644
--- a/c/src/proactor/raw_connection.c
+++ b/c/src/proactor/raw_connection.c
@@ -566,7 +566,7 @@ finished_reading:
   return;
 }
 
-void pni_raw_write(pn_raw_connection_t *conn, int sock, long (*send)(int, 
const void*, size_t), void(*set_error)(pn_raw_connection_t *, const char *, 
int)) {
+void pni_raw_write(pn_raw_connection_t *conn, int sock, long (*send)(int, 
const void*, size_t, bool), void(*set_error)(pn_raw_connection_t *, const char 
*, int)) {
   assert(conn);
 
   if (pni_raw_wdrained(conn)) return;
@@ -578,7 +578,8 @@ void pni_raw_write(pn_raw_connection_t *conn, int sock, 
long (*send)(int, const
     assert(conn->wbuffers[p-1].type == buff_unwritten);
     char *bytes = 
conn->wbuffers[p-1].bytes+conn->wbuffers[p-1].offset+conn->unwritten_offset;
     size_t s = conn->wbuffers[p-1].size-conn->unwritten_offset;
-    int r = send(sock,  bytes, s);
+    bool more = conn->wbuffers[p-1].next != 0;
+    int r = send(sock,  bytes, s, more);
     if (r < 0) {
       // Interrupted system call try again
       switch (errno) {
diff --git a/c/tests/raw_connection_test.cpp b/c/tests/raw_connection_test.cpp
index 7378d7541..92cc2ad0f 100644
--- a/c/tests/raw_connection_test.cpp
+++ b/c/tests/raw_connection_test.cpp
@@ -87,10 +87,12 @@ namespace {
       ::shutdown(fd, SHUT_WR);
   }
 
-  long snd(int fd, const void* b, size_t s) {
+  long snd(int fd, const void* b, size_t s, bool more) {
     write_err = 0;
+    int flags = MSG_NOSIGNAL | MSG_DONTWAIT;
+    if (more) flags |= MSG_MORE;
     if (max_send_size && max_send_size < s) s = max_send_size;
-    return ::send(fd, b, s, MSG_NOSIGNAL | MSG_DONTWAIT);
+    return ::send(fd, b, s, flags);
   }
 
   int makepair(int fds[2]) {
@@ -164,7 +166,7 @@ namespace {
     return s;
   }
 
-  long snd(int fd, const void* b, size_t s){
+  long snd(int fd, const void* b, size_t s, bool /* more: unused */ ){
     CHECK(fd < buffers.size());
     write_err = 0;
     if (max_send_size && max_send_size < s) s = max_send_size;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to