pespin has submitted this change. ( 
https://gerrit.osmocom.org/c/libosmo-netif/+/38911?usp=email )

Change subject: stream_cli: Support destroy object within user callback
......................................................................

stream_cli: Support destroy object within user callback

This commit improves the code in stream_cli to allow the user calling
osmo_stream_cli_destroy() while in a callback.

For instance, the following scenario:
read_cb(len=0)
 osmo_stream_cli_reconnect()
  osmo_stream_cli_close()
   disconnect_cb
    [user calls osmo_stream_cli_destroy()]
 [popped stack does read_cb()]
   [user uses freed msg]

This allows using stream_cli by users handling destruction and reconnect
of stream_ci objects on their own, such as libosmo-abis
e1_input/sign_link stack.

Change-Id: I952938474fa2780bf3c906cbdffb2d024b03c1b7
---
M src/stream_cli.c
M tests/stream/stream_test.err
2 files changed, 149 insertions(+), 45 deletions(-)

Approvals:
  daniel: Looks good to me, approved
  Jenkins Builder: Verified
  laforge: Looks good to me, but someone else must approve




diff --git a/src/stream_cli.c b/src/stream_cli.c
index 9fc8101..88d5f9e 100644
--- a/src/stream_cli.c
+++ b/src/stream_cli.c
@@ -83,6 +83,11 @@
 #define OSMO_STREAM_CLI_F_RECONF       (1 << 0)
 #define OSMO_STREAM_CLI_F_NODELAY      (1 << 1)

+/* Mark whether the object is currently in a user callback. */
+#define IN_CB_MASK_CONNECT_CB          (1 << 0)
+#define IN_CB_MASK_DISCONNECT_CB       (1 << 1)
+#define IN_CB_MASK_READ_CB             (1 << 2)
+
 struct osmo_stream_cli {
        char *name;
        char sockname[OSMO_SOCK_NAME_MAXLEN];
@@ -114,12 +119,33 @@
        int                             flags;
        int                             reconnect_timeout;
        struct osmo_sock_init2_multiaddr_pars ma_pars;
+       uint8_t                         in_cb_mask; /* IN_CB_MASK_* */
+       bool                            delay_free;
 };

 /*! \addtogroup stream_cli
  *  @{
  */

+/* return true if freed */
+static inline bool free_delayed_if_needed(struct osmo_stream_cli *cli)
+{
+       /* Nobody requested delayed free, skip */
+       if (!cli->delay_free)
+               return false;
+       /* Check for other callbacks active in case we were e.g. in:
+       * read_cb() -> [user] -> osmo_steam_client_close() -> disconnect_cb() 
--> [user] --> osmo_stream_client_destroy()
+       * or:
+       * connect_cb() -> [user] -> osmo_steam_client_close() -> 
disconnect_cb() --> [user] --> osmo_stream_client_destroy()
+       */
+       if (cli->in_cb_mask != 0)
+               return false;
+
+       LOGSCLI(cli, LOGL_DEBUG, "free(delayed)\n");
+       talloc_free(cli);
+       return true;
+}
+
 static void stream_cli_close_iofd(struct osmo_stream_cli *cli)
 {
        if (!cli->iofd)
@@ -140,18 +166,21 @@

 /*! Close an Osmocom Stream Client.
  *  \param[in] cli Osmocom Stream Client to be closed
+ *  \return true if stream was freed due to disconnect_cb, false otherwise
  *  We unregister the socket fd from the osmocom select() loop
  *  abstraction and close the socket */
-void osmo_stream_cli_close(struct osmo_stream_cli *cli)
+static bool stream_cli_close(struct osmo_stream_cli *cli)
 {
        int old_state = cli->state;
+       LOGSCLI(cli, LOGL_DEBUG, "close()\n");

+       /* This guards against reentrant close through disconnect_cb(): */
        if (cli->state == STREAM_CLI_STATE_CLOSED)
-               return;
+               return false;
        if (cli->state == STREAM_CLI_STATE_WAIT_RECONNECT) {
                osmo_timer_del(&cli->timer);
                cli->state = STREAM_CLI_STATE_CLOSED;
-               return;
+               return false;
        }


@@ -172,10 +201,45 @@
         * Also, if reconnect is disabled by user, notify the user that 
connect() failed: */
        if (old_state == STREAM_CLI_STATE_CONNECTED ||
            (old_state == STREAM_CLI_STATE_CONNECTING && cli->reconnect_timeout 
< 0)) {
-               LOGSCLI(cli, LOGL_DEBUG, "connection closed\n");
+               cli->in_cb_mask |= IN_CB_MASK_DISCONNECT_CB;
                if (cli->disconnect_cb)
                        cli->disconnect_cb(cli);
+               cli->in_cb_mask &= ~IN_CB_MASK_DISCONNECT_CB;
+               return free_delayed_if_needed(cli);
        }
+       return false;
+}
+
+/*! Close an Osmocom Stream Client.
+ *  \param[in] cli Osmocom Stream Client to be closed
+ *  We unregister the socket fd from the osmocom select() loop
+ *  abstraction and close the socket */
+void osmo_stream_cli_close(struct osmo_stream_cli *cli)
+{
+       stream_cli_close(cli);
+}
+
+/*! Re-connect an Osmocom Stream Client.
+ *  If re-connection is enabled for this client
+ *  (which is the case unless negative timeout was explicitly set via 
osmo_stream_cli_set_reconnect_timeout() call),
+ *  we close any existing connection (if any) and schedule a re-connect timer 
*/
+static bool stream_cli_reconnect(struct osmo_stream_cli *cli)
+{
+       bool freed = stream_cli_close(cli);
+
+       if (freed)
+               return true;
+
+       if (cli->reconnect_timeout < 0) {
+               LOGSCLI(cli, LOGL_INFO, "not reconnecting, disabled\n");
+               return false;
+       }
+
+       cli->state = STREAM_CLI_STATE_WAIT_RECONNECT;
+       LOGSCLI(cli, LOGL_INFO, "retrying reconnect in %d seconds...\n",
+               cli->reconnect_timeout);
+       osmo_timer_schedule(&cli->timer, cli->reconnect_timeout, 0);
+       return false;
 }

 /*! Re-connect an Osmocom Stream Client.
@@ -184,17 +248,7 @@
  *  we close any existing connection (if any) and schedule a re-connect timer 
*/
 void osmo_stream_cli_reconnect(struct osmo_stream_cli *cli)
 {
-       osmo_stream_cli_close(cli);
-
-       if (cli->reconnect_timeout < 0) {
-               LOGSCLI(cli, LOGL_INFO, "not reconnecting, disabled\n");
-               return;
-       }
-
-       cli->state = STREAM_CLI_STATE_WAIT_RECONNECT;
-       LOGSCLI(cli, LOGL_INFO, "retrying reconnect in %d seconds...\n",
-               cli->reconnect_timeout);
-       osmo_timer_schedule(&cli->timer, cli->reconnect_timeout, 0);
+       stream_cli_reconnect(cli);
 }

 /*! Check if Osmocom Stream Client is in connected state.
@@ -247,12 +301,16 @@
        return cli->iofd;
 }

-static void stream_cli_read(struct osmo_stream_cli *cli)
+/* Return true if read_cb caused a delayed_free, hence cli not available 
anymore. */
+static bool stream_cli_read(struct osmo_stream_cli *cli)
 {
        LOGSCLI(cli, LOGL_DEBUG, "message received\n");

+       cli->in_cb_mask |= IN_CB_MASK_READ_CB;
        if (cli->read_cb)
                cli->read_cb(cli);
+       cli->in_cb_mask &= ~IN_CB_MASK_READ_CB;
+       return free_delayed_if_needed(cli);
 }

 static int stream_cli_write(struct osmo_stream_cli *cli)
@@ -321,7 +379,7 @@
                        return 0;
                }
                msgb_free(msg);
-               osmo_stream_cli_reconnect(cli);
+               stream_cli_reconnect(cli);
                return 0;
        }

@@ -357,13 +415,13 @@

        if (ret < 0) {
                LOGSCLI(cli, LOGL_ERROR, "connect failed (%d)\n", res);
-               osmo_stream_cli_reconnect(cli);
+               stream_cli_reconnect(cli);
                return;
        }
        ret = getsockopt(fd, SOL_SOCKET, SO_ERROR, &error, &len);
        if (ret >= 0 && error > 0) {
                LOGSCLI(cli, LOGL_ERROR, "connect so_error (%d)\n", error);
-               osmo_stream_cli_reconnect(cli);
+               stream_cli_reconnect(cli);
                return;
        }

@@ -392,8 +450,11 @@
        default:
                break;
        }
+       cli->in_cb_mask |= IN_CB_MASK_CONNECT_CB;
        if (cli->connect_cb)
                cli->connect_cb(cli);
+       cli->in_cb_mask &= ~IN_CB_MASK_CONNECT_CB;
+       free_delayed_if_needed(cli);
 }

 static int osmo_stream_cli_fd_cb(struct osmo_fd *ofd, unsigned int what)
@@ -407,7 +468,8 @@
        case STREAM_CLI_STATE_CONNECTED:
                if (what & OSMO_FD_READ) {
                        LOGSCLI(cli, LOGL_DEBUG, "connected read\n");
-                       stream_cli_read(cli);
+                       if (stream_cli_read(cli) == true)
+                               break; /* cli (and hence ofd) freed, done. */
                }
                if (what & OSMO_FD_WRITE) {
                        LOGSCLI(cli, LOGL_DEBUG, "connected write\n");
@@ -457,6 +519,7 @@
 static void stream_cli_iofd_read_cb(struct osmo_io_fd *iofd, int res, struct 
msgb *msg)
 {
        struct osmo_stream_cli *cli  = osmo_iofd_get_data(iofd);
+       bool freed;

        switch (cli->state) {
        case STREAM_CLI_STATE_CONNECTING:
@@ -468,21 +531,29 @@
                case -EPIPE:
                case -ECONNRESET:
                        LOGSCLI(cli, LOGL_ERROR, "lost connection with srv 
(%d)\n", res);
-                       osmo_stream_cli_reconnect(cli);
+                       freed = stream_cli_reconnect(cli);
                        break;
                case 0:
                        LOGSCLI(cli, LOGL_NOTICE, "connection closed with 
srv\n");
-                       osmo_stream_cli_reconnect(cli);
+                       freed = stream_cli_reconnect(cli);
                        break;
                default:
                        LOGSCLI(cli, LOGL_DEBUG, "received %d bytes from 
srv\n", res);
+                       freed = false;
                        break;
                }
+               if (freed)
+                       return; /* msg was also freed as part of the talloc 
tree. */
                /* Notify user of new data or error: */
-               if (cli->iofd_read_cb)
-                       cli->iofd_read_cb(cli, res, msg);
-               else
+               if (!cli->iofd_read_cb) {
                        msgb_free(msg);
+                       return;
+               }
+               cli->in_cb_mask |= IN_CB_MASK_READ_CB;
+               cli->iofd_read_cb(cli, res, msg);
+               cli->in_cb_mask &= ~IN_CB_MASK_READ_CB;
+               OSMO_ASSERT(cli->in_cb_mask == 0);
+               free_delayed_if_needed(cli);
                break;
        default:
                osmo_panic("%s() called with unexpected state %d\n", __func__, 
cli->state);
@@ -499,8 +570,8 @@
                break;
        case STREAM_CLI_STATE_CONNECTED:
                if (msg && res <= 0) {
-                       osmo_stream_cli_reconnect(cli);
                        LOGSCLI(cli, LOGL_ERROR, "received error %d in response 
to send\n", res);
+                       stream_cli_reconnect(cli);
                }
                break;
        default:
@@ -519,6 +590,7 @@
 static void stream_cli_iofd_recvmsg_cb(struct osmo_io_fd *iofd, int res, 
struct msgb *msg, const struct msghdr *msgh)
 {
        struct osmo_stream_cli *cli  = osmo_iofd_get_data(iofd);
+       bool freed;

        res = stream_iofd_sctp_recvmsg_trailer(iofd, msg, res, msgh);

@@ -532,20 +604,28 @@
                case -EPIPE:
                case -ECONNRESET:
                        LOGSCLI(cli, LOGL_ERROR, "lost connection with srv 
(%d)\n", res);
-                       osmo_stream_cli_reconnect(cli);
+                       freed = stream_cli_reconnect(cli);
                        break;
                case 0:
                        LOGSCLI(cli, LOGL_NOTICE, "connection closed with 
srv\n");
-                       osmo_stream_cli_reconnect(cli);
+                       freed = stream_cli_reconnect(cli);
                        break;
                default:
+                       freed = false;
                        break;
                }
+               if (freed)
+                       return; /* msg was also freed as part of the talloc 
tree. */
                /* Notify user of new data or error: */
-               if (cli->iofd_read_cb)
-                       cli->iofd_read_cb(cli, res, msg);
-               else
+               if (!cli->iofd_read_cb) {
                        msgb_free(msg);
+                       return;
+               }
+               cli->in_cb_mask |= IN_CB_MASK_READ_CB;
+               cli->iofd_read_cb(cli, res, msg);
+               cli->in_cb_mask &= ~IN_CB_MASK_READ_CB;
+               OSMO_ASSERT(cli->in_cb_mask == 0);
+               free_delayed_if_needed(cli);
                break;
        default:
                osmo_panic("%s() called with unexpected state %d\n", __func__, 
cli->state);
@@ -860,10 +940,18 @@
  *  \param[in] cli Stream Client to destroy */
 void osmo_stream_cli_destroy(struct osmo_stream_cli *cli)
 {
-       osmo_stream_cli_close(cli);
+       LOGSCLI(cli, LOGL_DEBUG, "destroy()\n");
+       OSMO_ASSERT(!stream_cli_close(cli));
        osmo_timer_del(&cli->timer);
        msgb_queue_free(&cli->tx_queue);
-       talloc_free(cli);
+       /* if we are in a user callback, delay freeing. */
+       if (cli->in_cb_mask != 0) {
+               LOGSCLI(cli, LOGL_DEBUG, "delay free() in_cb_mask=0x%02x\n", 
cli->in_cb_mask);
+               cli->delay_free = true;
+       } else {
+               LOGSCLI(cli, LOGL_DEBUG, "free(destroy)\n");
+               talloc_free(cli);
+       }
 }

 /*! DEPRECATED: use osmo_stream_cli_set_reconnect_timeout() or 
osmo_stream_cli_reconnect() instead!
@@ -877,8 +965,10 @@
        int ret;

        /* we are reconfiguring this socket, close existing first. */
-       if ((cli->flags & OSMO_STREAM_CLI_F_RECONF) && cli->ofd.fd >= 0)
-               osmo_stream_cli_close(cli);
+       if ((cli->flags & OSMO_STREAM_CLI_F_RECONF) && cli->ofd.fd >= 0) {
+               if (stream_cli_close(cli) == true)
+                       return -ENAVAIL; /* freed */
+       }

        cli->flags &= ~OSMO_STREAM_CLI_F_RECONF;

@@ -904,7 +994,7 @@
        if (ret < 0) {
                LOGSCLI(cli, LOGL_ERROR, "connect: socket creation error 
(%d)\n", ret);
                if (reconnect)
-                       osmo_stream_cli_reconnect(cli);
+                       stream_cli_reconnect(cli);
                return ret;
        }
        osmo_fd_setup(&cli->ofd, ret, OSMO_FD_READ | OSMO_FD_WRITE, 
osmo_stream_cli_fd_cb, cli, 0);
@@ -1031,8 +1121,10 @@
        unsigned int local_addrcnt;

        /* we are reconfiguring this socket, close existing first. */
-       if ((cli->flags & OSMO_STREAM_CLI_F_RECONF) && 
osmo_stream_cli_get_fd(cli) >= 0)
-               osmo_stream_cli_close(cli);
+       if ((cli->flags & OSMO_STREAM_CLI_F_RECONF) && 
osmo_stream_cli_get_fd(cli) >= 0) {
+               if (stream_cli_close(cli) == true)
+                       return -ENAVAIL; /* freed */
+       }

        cli->flags &= ~OSMO_STREAM_CLI_F_RECONF;

@@ -1075,7 +1167,7 @@

        if (ret < 0) {
                LOGSCLI(cli, LOGL_ERROR, "connect: socket creation error 
(%d)\n", ret);
-               osmo_stream_cli_reconnect(cli);
+               stream_cli_reconnect(cli);
                return ret;
        }

@@ -1239,7 +1331,7 @@
                        LOGSCLI(cli, LOGL_ERROR, "lost connection with srv 
(%d)\n", errno);
                else
                        LOGSCLI(cli, LOGL_ERROR, "recv failed (%d)\n", errno);
-               osmo_stream_cli_reconnect(cli);
+               stream_cli_reconnect(cli);
                return ret;
        } else if (ret == 0) {
                LOGSCLI(cli, LOGL_ERROR, "connection closed with srv\n");
diff --git a/tests/stream/stream_test.err b/tests/stream/stream_test.err
index 5180409..8950706 100644
--- a/tests/stream/stream_test.err
+++ b/tests/stream/stream_test.err
@@ -1,3 +1,4 @@
+CLICONN(cli_test,){CLOSED} close()
 SRV(srv_link_test,127.0.0.11:1111) accept()ed new link from 127.0.0.1:8976
 CLICONN(cli_test,r=127.0.0.11:1111<->l=127.0.0.1:8976){CONNECTING} connection 
established

@@ -32,7 +33,7 @@
 CLICONN(cli_test,r=127.0.0.11:1111<->l=127.0.0.1:8976){CONNECTED} connected 
read
 CLICONN(cli_test,r=127.0.0.11:1111<->l=127.0.0.1:8976){CONNECTED} message 
received
 CLICONN(cli_test,r=127.0.0.11:1111<->l=127.0.0.1:8976){CONNECTED} connection 
closed with srv
-CLICONN(cli_test,r=127.0.0.11:1111<->l=127.0.0.1:8976){CLOSED} connection 
closed
+CLICONN(cli_test,r=127.0.0.11:1111<->l=127.0.0.1:8976){CONNECTED} close()
 CLICONN(cli_test,r=127.0.0.11:1111<->l=127.0.0.1:8976){WAIT_RECONNECT} 
retrying reconnect in 9 seconds...

 {11.000008} autoreconnecting test step 4 [client OK, server OK], FD reg 0
@@ -41,7 +42,7 @@
 {11.000009} autoreconnecting test step 3 [client OK, server OK], FD reg 1
 SRV(srv_link_test,127.0.0.11:1111) accept()ed new link from 127.0.0.1:8976
 CLICONN(cli_test,r=127.0.0.11:1111<->l=127.0.0.1:8976){CONNECTING} connection 
established
-CLICONN(cli_test,r=127.0.0.11:1111<->l=127.0.0.1:8976){CLOSED} connection 
closed
+CLICONN(cli_test,r=127.0.0.11:1111<->l=127.0.0.1:8976){CONNECTED} close()

 {11.000010} autoreconnecting test step 2 [client OK, server OK], FD reg 0
 SRVCONN(srv_test,r=127.0.0.1:8976<->l=127.0.0.11:1111) connected read/write 
(what=0x1)
@@ -49,6 +50,10 @@
 SRVCONN(srv_test,r=127.0.0.1:8976<->l=127.0.0.11:1111) connection closed with 
client

 {11.000011} autoreconnecting test step 1 [client OK, server NA], FD reg 0
+CLICONN(cli_test,r=127.0.0.11:1111<->l=127.0.0.1:8976){CLOSED} destroy()
+CLICONN(cli_test,r=127.0.0.11:1111<->l=127.0.0.1:8976){CLOSED} close()
+CLICONN(cli_test,r=127.0.0.11:1111<->l=127.0.0.1:8976){CLOSED} free(destroy)
+CLICONN(cli_test,){CLOSED} close()
 SRV(srv_link_test,127.0.0.11:1111) accept()ed new link from 127.0.0.1:8976
 CLICONN(cli_test,r=127.0.0.11:1111<->l=127.0.0.1:8976){CONNECTING} connection 
established

@@ -83,10 +88,13 @@
 CLICONN(cli_test,r=127.0.0.11:1111<->l=127.0.0.1:8976){CONNECTED} connected 
read
 CLICONN(cli_test,r=127.0.0.11:1111<->l=127.0.0.1:8976){CONNECTED} message 
received
 CLICONN(cli_test,r=127.0.0.11:1111<->l=127.0.0.1:8976){CONNECTED} connection 
closed with srv
-CLICONN(cli_test,r=127.0.0.11:1111<->l=127.0.0.1:8976){CLOSED} connection 
closed
+CLICONN(cli_test,r=127.0.0.11:1111<->l=127.0.0.1:8976){CONNECTED} close()
 CLICONN(cli_test,r=127.0.0.11:1111<->l=127.0.0.1:8976){CLOSED} not 
reconnecting, disabled

 {20.000019} non-reconnecting test step 0 [client OK, server OK], FD reg 0
+CLICONN(cli_test,r=127.0.0.11:1111<->l=127.0.0.1:8976){CLOSED} destroy()
+CLICONN(cli_test,r=127.0.0.11:1111<->l=127.0.0.1:8976){CLOSED} close()
+CLICONN(cli_test,r=127.0.0.11:1111<->l=127.0.0.1:8976){CLOSED} free(destroy)
 SRV(srv_link_test,127.0.0.11:1111) accept()ed new link from 127.0.0.1:8977
 CLICONN(,r=127.0.0.11:1111<->l=127.0.0.1:8977){CONNECTING} connection 
established
 SRVCONN(,r=127.0.0.1:8977<->l=127.0.0.11:1111) received 24 bytes from client
@@ -98,7 +106,9 @@
 SRVCONN(,r=127.0.0.1:8977<->l=127.0.0.11:1111) received 10 bytes from client
 SRVCONN(,r=127.0.0.1:8977<->l=127.0.0.11:1111) received 10 bytes from client
 SRVCONN(,r=127.0.0.1:8977<->l=127.0.0.11:1111) received 10 bytes from client
-CLICONN(,r=127.0.0.11:1111<->l=127.0.0.1:8977){CLOSED} connection closed
+CLICONN(,r=127.0.0.11:1111<->l=127.0.0.1:8977){CONNECTED} destroy()
+CLICONN(,r=127.0.0.11:1111<->l=127.0.0.1:8977){CONNECTED} close()
+CLICONN(,r=127.0.0.11:1111<->l=127.0.0.1:8977){CLOSED} free(destroy)
 SRV(srv_link_test,127.0.0.11:1112) accept()ed new link from 127.0.0.1:8977
 CLICONN(,r=127.0.0.11:1112<->l=127.0.0.1:8977){CONNECTING} connection 
established
 SRVCONN(,r=127.0.0.1:8977<->l=127.0.0.11:1112) connected write
@@ -112,4 +122,6 @@
 CLICONN(,r=127.0.0.11:1112<->l=127.0.0.1:8977){CONNECTED} received 10 bytes 
from srv
 CLICONN(,r=127.0.0.11:1112<->l=127.0.0.1:8977){CONNECTED} received 10 bytes 
from srv
 CLICONN(,r=127.0.0.11:1112<->l=127.0.0.1:8977){CONNECTED} received 10 bytes 
from srv
-CLICONN(,r=127.0.0.11:1112<->l=127.0.0.1:8977){CLOSED} connection closed
+CLICONN(,r=127.0.0.11:1112<->l=127.0.0.1:8977){CONNECTED} destroy()
+CLICONN(,r=127.0.0.11:1112<->l=127.0.0.1:8977){CONNECTED} close()
+CLICONN(,r=127.0.0.11:1112<->l=127.0.0.1:8977){CLOSED} free(destroy)

--
To view, visit https://gerrit.osmocom.org/c/libosmo-netif/+/38911?usp=email
To unsubscribe, or for help writing mail filters, visit 
https://gerrit.osmocom.org/settings?usp=email

Gerrit-MessageType: merged
Gerrit-Project: libosmo-netif
Gerrit-Branch: master
Gerrit-Change-Id: I952938474fa2780bf3c906cbdffb2d024b03c1b7
Gerrit-Change-Number: 38911
Gerrit-PatchSet: 6
Gerrit-Owner: pespin <[email protected]>
Gerrit-Reviewer: Jenkins Builder
Gerrit-Reviewer: daniel <[email protected]>
Gerrit-Reviewer: fixeria <[email protected]>
Gerrit-Reviewer: laforge <[email protected]>
Gerrit-Reviewer: osmith <[email protected]>
Gerrit-Reviewer: pespin <[email protected]>

Reply via email to