pespin has uploaded this change for review. ( 
https://gerrit.osmocom.org/c/libosmo-netif/+/38956?usp=email )


Change subject: stream_srv: Add API 
osmo_stream_srv_link_set_tx_queue_max_length()
......................................................................

stream_srv: Add API osmo_stream_srv_link_set_tx_queue_max_length()

Change-Id: I3c2deac7f7be0cf838834135a548cce70367a905
---
M TODO-RELEASE
M include/osmocom/netif/stream.h
M src/stream_srv.c
3 files changed, 33 insertions(+), 7 deletions(-)



  git pull ssh://gerrit.osmocom.org:29418/libosmo-netif refs/changes/56/38956/1

diff --git a/TODO-RELEASE b/TODO-RELEASE
index 74a0abb..268b142 100644
--- a/TODO-RELEASE
+++ b/TODO-RELEASE
@@ -8,4 +8,4 @@
 # If any interfaces have been removed or changed since the last public 
release: c:r:0.
 #library       what                    description / commit summary line
 libosmo-netif add API osmo_stream_cli_set_{ip_dscp,priority}(), 
osmo_stream_srv_link_set_{ip_dscp,priority}()
-libosmo-netif add API osmo-stream_cli_set_tx_queue_max_length()
+libosmo-netif add API osmo_stream_cli_set_tx_queue_max_length(), 
osmo_stream_srv_link_set_tx_queue_max_length()
diff --git a/include/osmocom/netif/stream.h b/include/osmocom/netif/stream.h
index 3b3e04e..a132a27 100644
--- a/include/osmocom/netif/stream.h
+++ b/include/osmocom/netif/stream.h
@@ -81,6 +81,7 @@
 void osmo_stream_srv_link_set_accept_cb(struct osmo_stream_srv_link *link, 
osmo_stream_srv_link_accept_cb_t accept_cb);
 void osmo_stream_srv_link_set_data(struct osmo_stream_srv_link *link, void 
*data);
 void *osmo_stream_srv_link_get_data(struct osmo_stream_srv_link *link);
+int osmo_stream_srv_link_set_tx_queue_max_length(struct osmo_stream_srv_link 
*link, unsigned int size);
 char *osmo_stream_srv_link_get_sockname(const struct osmo_stream_srv_link 
*link);
 struct osmo_fd *osmo_stream_srv_link_get_ofd(struct osmo_stream_srv_link 
*link);
 int osmo_stream_srv_link_get_fd(const struct osmo_stream_srv_link *link);
diff --git a/src/stream_srv.c b/src/stream_srv.c
index aa7edbf..4f9323b 100644
--- a/src/stream_srv.c
+++ b/src/stream_srv.c
@@ -87,6 +87,7 @@
        osmo_stream_srv_link_accept_cb_t accept_cb;
        void                    *data;
        int                     flags;
+       unsigned int            tx_queue_max_length; /* Max amount of msgbs 
which can be enqueued */
        struct osmo_sock_init2_multiaddr_pars ma_pars;
 };

@@ -203,6 +204,7 @@
        link->sk_domain = AF_UNSPEC;
        link->sk_type = SOCK_STREAM;
        link->proto = IPPROTO_TCP;
+       link->tx_queue_max_length = 1024; /* Default tx queue size, msgbs. */
        osmo_fd_setup(&link->ofd, -1, OSMO_FD_READ | OSMO_FD_WRITE, 
osmo_stream_srv_link_ofd_cb, link, 0);

        link->ma_pars.sctp.version = 0;
@@ -394,6 +396,18 @@
        return link->data;
 }

+/*! Set the maximum tx queue length of the stream servers accepted and allocated 
from this server link.
+ *  \param[in] link Stream Server Link to modify
+ *  \param[in] size maximum number of msgbs which can be queued in the 
internal tx queue.
+ *  \returns 0 on success, negative on error.
+ *
+ *  The default maximum tx queue length is 1024 msgbs. */
+int osmo_stream_srv_link_set_tx_queue_max_length(struct osmo_stream_srv_link 
*link, unsigned int size)
+{
+       link->tx_queue_max_length = size;
+       return 0;
+}
+
 /* Similar to osmo_sock_multiaddr_get_name_buf(), but aimed at listening 
sockets (only local part): */
 static char *get_local_sockname_buf(char *buf, size_t buf_len, const struct 
osmo_stream_srv_link *link)
 {
@@ -643,7 +657,8 @@
                struct osmo_fd                  ofd;
                struct osmo_io_fd               *iofd;
        };
-       struct llist_head               tx_queue;
+       struct llist_head               tx_queue; /* osmo_ofd mode (only): 
Queue of msgbs */
+       unsigned int                    tx_queue_count; /* osmo_ofd mode 
(only): Current amount of msgbs queued */
        osmo_stream_srv_closed_cb_t     closed_cb;
        osmo_stream_srv_read_cb_t       read_cb;
        osmo_stream_srv_read_cb2_t      iofd_read_cb;
@@ -769,12 +784,11 @@
        struct msgb *msg;
        int ret;

-       if (llist_empty(&conn->tx_queue)) {
+       msg = msgb_dequeue_count(&conn->tx_queue, &conn->tx_queue_count);
+       if (!msg) { /* done, tx_queue empty */
                osmo_fd_write_disable(&conn->ofd);
                return;
        }
-       msg = llist_first_entry(&conn->tx_queue, struct msgb, list);
-       llist_del(&msg->list);

        LOGSSRV(conn, LOGL_DEBUG, "sending %u bytes of data\n", msg->len);

@@ -811,6 +825,7 @@
                /* Update msgb and re-add it at the start of the queue: */
                msgb_pull(msg, ret);
                llist_add(&msg->list, &conn->tx_queue);
+               conn->tx_queue_count++;
                return;
        }

@@ -820,6 +835,7 @@
                if (err == EAGAIN) {
                        /* Re-add at the start of the queue to re-attempt: */
                        llist_add(&msg->list, &conn->tx_queue);
+                       conn->tx_queue_count++;
                        return;
                }
                msgb_free(msg);
@@ -923,6 +939,7 @@

        conn->mode = OSMO_STREAM_MODE_OSMO_IO;
        conn->srv = link;
+       conn->data = data;

        osmo_sock_get_name_buf(conn->sockname, sizeof(conn->sockname), fd);

@@ -939,7 +956,8 @@
                talloc_free(conn);
                return NULL;
        }
-       conn->data = data;
+
+       osmo_iofd_set_txqueue_max_length(conn->iofd, 
conn->srv->tx_queue_max_length);

        if (osmo_iofd_register(conn->iofd, fd) < 0) {
                LOGSSRV(conn, LOGL_ERROR, "could not register FD %d\n", fd);
@@ -1127,6 +1145,7 @@
                osmo_fd_unregister(&conn->ofd);
                close(conn->ofd.fd);
                msgb_queue_free(&conn->tx_queue);
+               conn->tx_queue_count = 0;
                conn->ofd.fd = -1;
                break;
        case OSMO_STREAM_MODE_OSMO_IO:
@@ -1158,7 +1177,12 @@

        switch (conn->mode) {
        case OSMO_STREAM_MODE_OSMO_FD:
-               msgb_enqueue(&conn->tx_queue, msg);
+               if (conn->tx_queue_count >= conn->srv->tx_queue_max_length) {
+                       LOGSSRV(conn, LOGL_ERROR, "send: tx queue full, 
dropping msg!\n");
+                       msgb_free(msg);
+                       return;
+               }
+               msgb_enqueue_count(&conn->tx_queue, msg, &conn->tx_queue_count);
                osmo_fd_write_enable(&conn->ofd);
                break;
        case OSMO_STREAM_MODE_OSMO_IO:
@@ -1249,6 +1273,7 @@
        switch (conn->mode) {
        case OSMO_STREAM_MODE_OSMO_FD:
                msgb_queue_free(&conn->tx_queue);
+               conn->tx_queue_count = 0;
                osmo_fd_write_disable(&conn->ofd);
                break;
        case OSMO_STREAM_MODE_OSMO_IO:

--
To view, visit https://gerrit.osmocom.org/c/libosmo-netif/+/38956?usp=email
To unsubscribe, or for help writing mail filters, visit 
https://gerrit.osmocom.org/settings?usp=email

Gerrit-MessageType: newchange
Gerrit-Project: libosmo-netif
Gerrit-Branch: master
Gerrit-Change-Id: I3c2deac7f7be0cf838834135a548cce70367a905
Gerrit-Change-Number: 38956
Gerrit-PatchSet: 1
Gerrit-Owner: pespin <[email protected]>

Reply via email to