pespin has uploaded this change for review. ( 
https://gerrit.osmocom.org/c/osmo-pcap/+/39367?usp=email )


Change subject: server: Implement non-blocking write to pcap file with osmo_io
......................................................................

server: Implement non-blocking write to pcap file with osmo_io

Actual zero-copy msgb passing from read tcp socket will be implemented
in follow-up patches.

Change-Id: I098a9455a2a4cc626444e6fc13aa88c4cc9694f0
Related: SYS#7080
---
M include/osmo-pcap/osmo_pcap_server.h
M src/osmo_pcap_wr_file.c
M src/osmo_server_core.c
3 files changed, 171 insertions(+), 39 deletions(-)



  git pull ssh://gerrit.osmocom.org:29418/osmo-pcap refs/changes/67/39367/1

diff --git a/include/osmo-pcap/osmo_pcap_server.h 
b/include/osmo-pcap/osmo_pcap_server.h
index 9b93888..e71096b 100644
--- a/include/osmo-pcap/osmo_pcap_server.h
+++ b/include/osmo-pcap/osmo_pcap_server.h
@@ -29,6 +29,7 @@

 #include <pcap.h>

+#include <osmocom/core/osmo_io.h>
 #include <osmocom/core/select.h>
 #include <osmocom/core/linuxlist.h>
 #include <osmocom/core/write_queue.h>
@@ -73,20 +74,30 @@
        SERVER_CTR_NOCLIENT,
 };

+struct osmo_pcap_wr_file;
+typedef void (*osmo_pcap_wr_file_flush_completed_cb_t)(struct 
osmo_pcap_wr_file *wrf, void *data);
 struct osmo_pcap_wr_file {
+       struct llist_head entry; /* entry into 
(osmo_pcap_conn)->wrf_flushing_list */
        void *data; /* user backpointer */
        /* canonicalized absolute pathname of pcap file we write to */
        char *filename;
        /* file descriptor of the file we write to */
-       int local_fd;
+       struct osmo_io_fd *local_iofd;
        /* Current write offset of the file we write to (local_fd) */
        off_t wr_offset;
+       /* Number of bytes confirmed to be written, <=wr_offset */
+       off_t wr_completed;
+       osmo_pcap_wr_file_flush_completed_cb_t flush_completed_cb;
 };
 struct osmo_pcap_wr_file *osmo_pcap_wr_file_alloc(void *ctx, void *data);
 void osmo_pcap_wr_file_free(struct osmo_pcap_wr_file *wrf);
+void osmo_pcap_wr_file_set_flush_completed_cb(struct osmo_pcap_wr_file *wrf, 
osmo_pcap_wr_file_flush_completed_cb_t flush_completed_cb);
 int osmo_pcap_wr_file_open(struct osmo_pcap_wr_file *wrf, const char 
*filename, mode_t mode);
 void osmo_pcap_wr_file_close(struct osmo_pcap_wr_file *wrf);
-int osmo_pcap_wr_file_write(struct osmo_pcap_wr_file *wrf, const uint8_t 
*data, size_t len);
+int osmo_pcap_wr_file_write_msgb(struct osmo_pcap_wr_file *wrf, struct msgb 
*msg);
+bool osmo_pcap_wr_file_has_pending_writes(const struct osmo_pcap_wr_file *wrf);
+int osmo_pcap_wr_file_flush(struct osmo_pcap_wr_file *wrf, struct llist_head 
*wrf_flushing_list);
+bool osmo_pcap_wr_file_is_flushing(const struct osmo_pcap_wr_file *wrf);
 void osmo_pcap_wr_file_move_to_dir(struct osmo_pcap_wr_file *wrf, const char 
*dst_dirpath);

 struct osmo_pcap_conn {
@@ -103,6 +114,9 @@
        /* Remote connection */
        struct osmo_stream_srv *srv;
        struct osmo_pcap_wr_file *wrf;
+       /* list of osmo_pcap_wr_file->entry.
+        * wrf which we want to close but still have pending writes to be 
completed */
+       struct llist_head wrf_flushing_list;

        /* pcap stuff */
        enum osmo_pcap_fmt file_fmt;
diff --git a/src/osmo_pcap_wr_file.c b/src/osmo_pcap_wr_file.c
index a66e5e2..418a129 100644
--- a/src/osmo_pcap_wr_file.c
+++ b/src/osmo_pcap_wr_file.c
@@ -1,5 +1,5 @@
 /*
- * Write to a file
+ * Asynchronous non-blocking write to a file
  *
  * (C) 2025 by sysmocom - s.f.m.c. GmbH <[email protected]>
  * All Rights Reserved
@@ -32,14 +32,49 @@
 #include <osmo-pcap/common.h>
 #include <osmo-pcap/osmo_pcap_server.h>

+static void local_iofd_write_cb(struct osmo_io_fd *iofd, int res, struct msgb 
*msg)
+{
+       struct osmo_pcap_wr_file *wrf = osmo_iofd_get_data(iofd);
+
+       LOGP(DSERVER, LOGL_DEBUG, "%s: write_cb(%s, %d)\n", 
osmo_iofd_get_name(iofd), wrf->filename, res);
+       if (res <= 0) {
+               LOGP(DSERVER, LOGL_ERROR, "%s: Failed writing to file path='%s' 
fd=%d: %s (%d)\n",
+                    osmo_iofd_get_name(iofd), wrf->filename, 
osmo_iofd_get_fd(iofd), strerror(-res), res);
+               /* Trigger cb to tell user to free it, even if it was not being 
flushed.
+                * Special attention must be paid by the user regarding this 
code path, i.e.
+                * the user can't assume the wrf was actually in flushing state...
+                */
+               if (wrf->flush_completed_cb)
+                       wrf->flush_completed_cb(wrf, wrf->data);
+               /* wrf may be freed here. */
+               return;
+       }
+
+       wrf->wr_completed += res;
+
+       if (osmo_pcap_wr_file_is_flushing(wrf)) {
+               if (!osmo_pcap_wr_file_has_pending_writes(wrf)) {
+                       LOGP(DSERVER, LOGL_DEBUG, "%s: closing now after 
completed data write path='%s' fd=%d\n",
+                            osmo_iofd_get_name(iofd), wrf->filename, 
osmo_iofd_get_fd(iofd));
+                       if (wrf->flush_completed_cb)
+                               wrf->flush_completed_cb(wrf, wrf->data);
+                       /* wrf may be freed here. */
+                       return;
+               }
+       }
+}
+
 struct osmo_pcap_wr_file *osmo_pcap_wr_file_alloc(void *ctx, void *data)
 {
        struct osmo_pcap_wr_file *wrf = talloc_zero(ctx, struct 
osmo_pcap_wr_file);
        OSMO_ASSERT(wrf);

+       /* Initialize entry so that we can know whether we are included in a
+        * list in osmo_pcap_wr_file_is_flushing(): */
+       INIT_LLIST_HEAD(&wrf->entry);
        wrf->data = data;
-       wrf->local_fd = -1;
        wrf->wr_offset = 0;
+       wrf->wr_completed = 0;

        return wrf;
 }
@@ -49,22 +84,43 @@
        if (!wrf)
                return;
        osmo_pcap_wr_file_close(wrf);
+       if (osmo_pcap_wr_file_is_flushing(wrf))
+               llist_del(&wrf->entry);
        talloc_free(wrf);
 }

+void osmo_pcap_wr_file_set_flush_completed_cb(struct osmo_pcap_wr_file *wrf, 
osmo_pcap_wr_file_flush_completed_cb_t flush_completed_cb)
+{
+       wrf->flush_completed_cb = flush_completed_cb;
+}
+
 int osmo_pcap_wr_file_open(struct osmo_pcap_wr_file *wrf, const char 
*filename, mode_t mode)
 {
+       struct osmo_io_ops ioops = {
+               .read_cb = NULL,
+               .write_cb = local_iofd_write_cb,
+       };
        int rc;
        OSMO_ASSERT(filename);
-       OSMO_ASSERT(wrf->local_fd == -1);
+       OSMO_ASSERT(wrf->local_iofd == NULL);

-       rc = open(filename, O_CREAT|O_WRONLY|O_TRUNC, mode);
+       rc = open(filename, O_CREAT|O_WRONLY|O_TRUNC|O_NONBLOCK, mode);
        if (rc < 0) {
                LOGP(DSERVER, LOGL_ERROR, "Failed to open file '%s': %s\n",
                     filename, strerror(errno));
                return rc;
        }
-       wrf->local_fd = rc;
+
+       wrf->local_iofd = osmo_iofd_setup(wrf, rc, filename, 
OSMO_IO_FD_MODE_READ_WRITE,
+                                         &ioops, wrf);
+       if (!wrf->local_iofd)
+               return -EBADFD;
+       if (osmo_iofd_register(wrf->local_iofd, -1) < 0) {
+               osmo_iofd_free(wrf->local_iofd);
+               wrf->local_iofd = NULL;
+               return -ENAVAIL;
+       }
+
        wrf->filename = talloc_strdup(wrf, filename);
        OSMO_ASSERT(wrf->filename);
        return rc;
@@ -72,26 +128,55 @@

 void osmo_pcap_wr_file_close(struct osmo_pcap_wr_file *wrf)
 {
-       if (wrf->local_fd > 0) {
-               close(wrf->local_fd);
-               wrf->local_fd = -1;
-       }
+       osmo_iofd_free(wrf->local_iofd);
+       wrf->local_iofd = NULL;
 }

-int osmo_pcap_wr_file_write(struct osmo_pcap_wr_file *wrf, const uint8_t 
*data, size_t len)
+int osmo_pcap_wr_file_write_msgb(struct osmo_pcap_wr_file *wrf, struct msgb 
*msg)
 {
-       int rc = write(wrf->local_fd, data, len);
-       if (rc >= 0) {
-               wrf->wr_offset += rc;
-               if (rc != len) {
-                       LOGP(DSERVER, LOGL_ERROR, "Short write '%s': ret %d != 
%zu\n",
-                            wrf->filename, rc, len);
-                       return -1;
-               }
-       }
+       int rc = osmo_iofd_write_msgb(wrf->local_iofd, msg);
+       if (rc < 0)
+               return rc;
+       wrf->wr_offset += msgb_length(msg);
        return rc;
 }

+bool osmo_pcap_wr_file_has_pending_writes(const struct osmo_pcap_wr_file *wrf)
+{
+       return wrf->wr_completed < wrf->wr_offset;
+}
+
+/* Mark the wrf as done writing to it. It will be closed and freed
+ * asynchronously when all data has been written to it.
+ * wrf may be freed during the call to this function, so don't use it anymore. 
*/
+int osmo_pcap_wr_file_flush(struct osmo_pcap_wr_file *wrf, struct llist_head 
*wrf_flushing_list)
+{
+       if (osmo_pcap_wr_file_is_flushing(wrf)) {
+               LOGP(DSERVER, LOGL_ERROR, "Trying to flush a file which was 
already being flushed: '%s'\n",
+                    wrf->filename);
+               return -EINVAL;
+       }
+
+       if (!osmo_pcap_wr_file_has_pending_writes(wrf)) {
+               if (wrf->flush_completed_cb)
+                       wrf->flush_completed_cb(wrf, wrf->data);
+               /* wrf may be freed here. */
+               return 0;
+       }
+
+       /* Put it in the flushing list, it will be closed and freed once pending 
writes complete. */
+       llist_add_tail(&wrf->entry, wrf_flushing_list);
+       return 0;
+}
+
+/* whether we finished pushing more data to the wrf and we are waiting for it 
to
+ * finish writing before closing:
+ */
+bool osmo_pcap_wr_file_is_flushing(const struct osmo_pcap_wr_file *wrf)
+{
+       return !llist_empty(&wrf->entry);
+}
+
 /* Move file from current dir to dst_dirpath, and updates wrf->filename to 
point to new location. */
 void osmo_pcap_wr_file_move_to_dir(struct osmo_pcap_wr_file *wrf, const char 
*dst_dirpath)
 {
diff --git a/src/osmo_server_core.c b/src/osmo_server_core.c
index 782ead9..bc8c08c 100644
--- a/src/osmo_server_core.c
+++ b/src/osmo_server_core.c
@@ -116,6 +116,35 @@
                        0);
 }

+/* wrf has written all data and can safely be closed, rotated, etc. */
+static void osmo_pcap_wr_file_flush_completed_cb(struct osmo_pcap_wr_file 
*wrf, void *data)
+{
+       struct osmo_pcap_conn *conn = data;
+
+       if (wrf->wr_completed < wrf->wr_offset) {
+               LOGP(DSERVER, LOGL_NOTICE, "%s: Closing file with pending 
writes (%zu completed bytes < %zu wrote bytes)\n",
+                    wrf->filename, wrf->wr_completed, wrf->wr_offset);
+       }
+
+       if (!osmo_pcap_wr_file_is_flushing(wrf)) {
+               /* If it is not flushing, it probably is still assigned to conn;
+                * unassign it: */
+               if (conn->wrf == wrf)
+                       conn->wrf = NULL;
+       }
+
+       osmo_pcap_wr_file_close(wrf);
+
+       if (conn->server->completed_path)
+               osmo_pcap_wr_file_move_to_dir(wrf, 
conn->server->completed_path);
+
+       osmo_pcap_conn_event(conn, "closingtracefile", wrf->filename);
+       rate_ctr_inc2(conn->ctrg, PEER_CTR_PROTATE);
+       rate_ctr_inc2(conn->server->ctrg, SERVER_CTR_PROTATE);
+
+       osmo_pcap_wr_file_free(wrf);
+}
+
 static inline size_t calc_data_max_len(const struct osmo_pcap_server *server)
 {
        size_t data_max_len;
@@ -168,6 +197,7 @@
                return NULL;
        }

+       INIT_LLIST_HEAD(&conn->wrf_flushing_list);
        conn->data_max_len = calc_data_max_len(server);
        conn->data = talloc_zero_size(conn, sizeof(struct osmo_pcap_data) + 
conn->data_max_len);
        /* a bit nasty. we do not work with ids but names */
@@ -227,7 +257,11 @@

 void osmo_pcap_conn_free(struct osmo_pcap_conn *conn)
 {
+       struct osmo_pcap_wr_file *wrf;
        osmo_pcap_conn_close(conn);
+       /* We are freeing, make sure all files are processed even if we may be 
losing some data... */
+       while ((wrf = llist_first_entry_or_null(&conn->wrf_flushing_list, 
struct osmo_pcap_wr_file, entry)))
+               osmo_pcap_wr_file_flush_completed_cb(wrf, conn);
        llist_del(&conn->entry);
        talloc_free(conn);
 }
@@ -237,16 +271,8 @@
        if (!conn->wrf)
                return;

-       osmo_pcap_wr_file_close(conn->wrf);
-
-       if (conn->server->completed_path)
-               osmo_pcap_wr_file_move_to_dir(conn->wrf, 
conn->server->completed_path);
-
-       osmo_pcap_conn_event(conn, "closingtracefile", conn->wrf->filename);
-       rate_ctr_inc2(conn->ctrg, PEER_CTR_PROTATE);
-       rate_ctr_inc2(conn->server->ctrg, SERVER_CTR_PROTATE);
-
-       osmo_pcap_wr_file_free(conn->wrf);
+       osmo_pcap_wr_file_flush(conn->wrf, &conn->wrf_flushing_list);
+       /* conn->wrf may have been freed or moved to conn->wrf_flushing_list: */
        conn->wrf = NULL;
 }

@@ -298,9 +324,6 @@
        /* omit any storing/creation of the file */
        if (!conn->store) {
                update_last_write(conn, now);
-               /* TODO: Once we support async write, we'll want to schedule 
close here instead of freeing: */
-               osmo_pcap_wr_file_free(conn->wrf);
-               conn->wrf = NULL;
                return;
        }

@@ -323,16 +346,21 @@
        }

        conn->wrf = osmo_pcap_wr_file_alloc(conn, conn);
+       osmo_pcap_wr_file_set_flush_completed_cb(conn->wrf, 
osmo_pcap_wr_file_flush_completed_cb);
        rc = osmo_pcap_wr_file_open(conn->wrf, curr_filename, 
conn->server->permission_mask);
        talloc_free(curr_filename);
        if (rc < 0)
                return;

-       rc = osmo_pcap_wr_file_write(conn->wrf, conn->file_hdr, 
conn->file_hdr_len);
-       if (rc != conn->file_hdr_len) {
+       /* TODO: get msgb from conn object stored: */
+       struct msgb *msg = msgb_alloc_c(conn->wrf, conn->file_hdr_len, 
"local_iofd_hdr");
+       memcpy(msgb_put(msg, conn->file_hdr_len), conn->file_hdr, 
conn->file_hdr_len);
+
+       rc = osmo_pcap_wr_file_write_msgb(conn->wrf, msg);
+       if (rc < 0) {
                LOGP(DSERVER, LOGL_ERROR, "Failed to write the header: %d\n", 
errno);
-               osmo_pcap_wr_file_free(conn->wrf);
-               conn->wrf = NULL;
+               msgb_free(msg);
+               osmo_pcap_conn_close_trace(conn);
                return;
        }
        update_last_write(conn, now);
@@ -512,9 +540,14 @@
        if (!check_restart_pcap_max_size(conn, len))
                check_restart_pcap_localtime(conn, now);

-       rc = osmo_pcap_wr_file_write(conn->wrf, data, len);
+       /* TODO: get msgb from caller: */
+       struct msgb *msg = msgb_alloc_c(conn->wrf, len, "local_iofd_msg");
+       memcpy(msgb_put(msg, len), data, len);
+
+       rc = osmo_pcap_wr_file_write_msgb(conn->wrf, msg);
        if (rc < 0) {
                LOGP(DSERVER, LOGL_ERROR, "%s: Failed writing to file\n", 
conn->name);
+               msgb_free(msg);
                return -1;
        }
        update_last_write(conn, now);

--
To view, visit https://gerrit.osmocom.org/c/osmo-pcap/+/39367?usp=email
To unsubscribe, or for help writing mail filters, visit 
https://gerrit.osmocom.org/settings?usp=email

Gerrit-MessageType: newchange
Gerrit-Project: osmo-pcap
Gerrit-Branch: master
Gerrit-Change-Id: I098a9455a2a4cc626444e6fc13aa88c4cc9694f0
Gerrit-Change-Number: 39367
Gerrit-PatchSet: 1
Gerrit-Owner: pespin <[email protected]>

Reply via email to