pespin has uploaded this change for review. ( 
https://gerrit.osmocom.org/c/libosmo-netif/+/38955?usp=email )


Change subject: stream_cli: Add API osmo_stream_cli_set_tx_queue_max_length()
......................................................................

stream_cli: Add API osmo_stream_cli_set_tx_queue_max_length()

Change-Id: I3935fb933fe6136d68a9403eebbaf2616c2e5578
---
M TODO-RELEASE
M include/osmocom/netif/stream.h
M src/stream_cli.c
3 files changed, 37 insertions(+), 6 deletions(-)



  git pull ssh://gerrit.osmocom.org:29418/libosmo-netif refs/changes/55/38955/1

diff --git a/TODO-RELEASE b/TODO-RELEASE
index 4f59788..74a0abb 100644
--- a/TODO-RELEASE
+++ b/TODO-RELEASE
@@ -8,3 +8,4 @@
 # If any interfaces have been removed or changed since the last public 
release: c:r:0.
 #library       what                    description / commit summary line
 libosmo-netif add API osmo_stream_cli_set_{ip_dscp,priority}(), 
osmo_stream_srv_link_set_{ip_dscp,priority}()
+libosmo-netif add API osmo-stream_cli_set_tx_queue_max_length()
diff --git a/include/osmocom/netif/stream.h b/include/osmocom/netif/stream.h
index 6edf915..3b3e04e 100644
--- a/include/osmocom/netif/stream.h
+++ b/include/osmocom/netif/stream.h
@@ -208,6 +208,7 @@
 void osmo_stream_cli_set_data(struct osmo_stream_cli *cli, void *data);
 void osmo_stream_cli_set_reconnect_timeout(struct osmo_stream_cli *cli, int 
timeout);
 void *osmo_stream_cli_get_data(struct osmo_stream_cli *cli);
+int osmo_stream_cli_set_tx_queue_max_length(struct osmo_stream_cli *cli, 
unsigned int size);
 char *osmo_stream_cli_get_sockname(const struct osmo_stream_cli *cli);
 struct osmo_fd *osmo_stream_cli_get_ofd(struct osmo_stream_cli *cli);
 int osmo_stream_cli_get_fd(const struct osmo_stream_cli *cli);
diff --git a/src/stream_cli.c b/src/stream_cli.c
index 88d5f9e..a00f32a 100644
--- a/src/stream_cli.c
+++ b/src/stream_cli.c
@@ -96,7 +96,9 @@
                struct osmo_fd                  ofd;
                struct osmo_io_fd               *iofd;
        };
-       struct llist_head               tx_queue;
+       struct llist_head               tx_queue; /* osmo_ofd mode (only): 
Queue of msgbs */
+       unsigned int                    tx_queue_count; /* osmo_ofd mode 
(only): Current amount of msgbs queued */
+       unsigned int                    tx_queue_max_length; /* Max amount of 
msgbs which can be enqueued */
        struct osmo_timer_list          timer;
        enum osmo_stream_cli_state      state;
        char                            *addr[OSMO_STREAM_MAX_ADDRS];
@@ -321,12 +323,11 @@
        struct msgb *msg;
        int ret;

-       if (llist_empty(&cli->tx_queue)) {
+       msg = msgb_dequeue_count(&cli->tx_queue, &cli->tx_queue_count);
+       if (!msg) { /* done, tx_queue empty */
                osmo_fd_write_disable(&cli->ofd);
                return 0;
        }
-       msg = llist_first_entry(&cli->tx_queue, struct msgb, list);
-       llist_del(&msg->list);

        if (!osmo_stream_cli_is_connected(cli)) {
                LOGSCLI(cli, LOGL_ERROR, "send: not connected, dropping 
data!\n");
@@ -367,6 +368,7 @@
                /* Update msgb and re-add it at the start of the queue: */
                msgb_pull(msg, ret);
                llist_add(&msg->list, &cli->tx_queue);
+               cli->tx_queue_count++;
                return 0;
        }

@@ -376,6 +378,7 @@
                if (err == EAGAIN) {
                        /* Re-add at the start of the queue to re-attempt: */
                        llist_add(&msg->list, &cli->tx_queue);
+                       cli->tx_queue_count++;
                        return 0;
                }
                msgb_free(msg);
@@ -510,6 +513,7 @@
        cli->reconnect_timeout = 5;     /* default is 5 seconds. */
        cli->segmentation_cb = NULL;
        INIT_LLIST_HEAD(&cli->tx_queue);
+       cli->tx_queue_max_length = 1024; /* Default tx queue size, msgbs. */

        cli->ma_pars.sctp.version = 0;

@@ -863,6 +867,24 @@
        return cli->data;
 }

+/*! Set the maximum length queue of the stream client.
+ *  \param[in] cli Stream Client to modify
+ *  \param[in] size maximum amount of msgbs which can be queued in the 
internal tx queue.
+ *  \returns 0 on success, negative on error.
+ *
+ *  The maximum length queue default value is 1024 msgbs. */
+int osmo_stream_cli_set_tx_queue_max_length(struct osmo_stream_cli *cli, 
unsigned int size)
+{
+       cli->tx_queue_max_length = size;
+
+       if (cli->iofd) /* Otherwise, this will be done in 
osmo_stream_cli_open() */
+               osmo_iofd_set_txqueue_max_length(cli->iofd, 
cli->tx_queue_max_length);
+
+       /* XXX: Here, in OSMO_STREAM_MODE_OSMO_FD mode we could check current
+        * size of cli->tx_queue and shrink it from the front or back... */
+       return 0;
+}
+
 /*! Retrieve the stream client socket description.
  *  Calling this function will build a string that describes the socket in 
terms of its local/remote
  *  address/port.  The returned name is stored in a static buffer; it is hence 
not re-entrant or thread-safe.
@@ -944,6 +966,7 @@
        OSMO_ASSERT(!stream_cli_close(cli));
        osmo_timer_del(&cli->timer);
        msgb_queue_free(&cli->tx_queue);
+       cli->tx_queue_count = 0;
        /* if we are in a user callback, delay freeing. */
        if (cli->in_cb_mask != 0) {
                LOGSCLI(cli, LOGL_DEBUG, "delay free() in_cb_mask=0x%02x\n", 
cli->in_cb_mask);
@@ -1204,8 +1227,8 @@
                if (!cli->iofd)
                        goto error_close_socket;

+               osmo_iofd_set_txqueue_max_length(cli->iofd, 
cli->tx_queue_max_length);
                osmo_iofd_notify_connected(cli->iofd);
-
                configure_cli_segmentation_cb(cli, cli->segmentation_cb);
 
                if (osmo_iofd_register(cli->iofd, fd) < 0)
@@ -1253,7 +1276,12 @@

        switch (cli->mode) {
        case OSMO_STREAM_MODE_OSMO_FD:
-               msgb_enqueue(&cli->tx_queue, msg);
+               if (cli->tx_queue_count >= cli->tx_queue_max_length) {
+                       LOGSCLI(cli, LOGL_ERROR, "send: tx queue full, dropping 
msg!\n");
+                       msgb_free(msg);
+                       return;
+               }
+               msgb_enqueue_count(&cli->tx_queue, msg, &cli->tx_queue_count);
                osmo_fd_write_enable(&cli->ofd);
                break;
        case OSMO_STREAM_MODE_OSMO_IO:
@@ -1350,6 +1378,7 @@
        switch (cli->mode) {
        case OSMO_STREAM_MODE_OSMO_FD:
                msgb_queue_free(&cli->tx_queue);
+               cli->tx_queue_count = 0;
                /* If in state 'connecting', keep WRITE flag up to receive
                * socket connection signal and then transition to 
STATE_CONNECTED: */
                if (cli->state == STREAM_CLI_STATE_CONNECTED)

--
To view, visit https://gerrit.osmocom.org/c/libosmo-netif/+/38955?usp=email
To unsubscribe, or for help writing mail filters, visit 
https://gerrit.osmocom.org/settings?usp=email

Gerrit-MessageType: newchange
Gerrit-Project: libosmo-netif
Gerrit-Branch: master
Gerrit-Change-Id: I3935fb933fe6136d68a9403eebbaf2616c2e5578
Gerrit-Change-Number: 38955
Gerrit-PatchSet: 1
Gerrit-Owner: pespin <[email protected]>

Reply via email to