pespin has submitted this change. ( 
https://gerrit.osmocom.org/c/libosmo-netif/+/36574?usp=email )

Change subject: stream_{cli,srv}: Add 'res' param to read_cb2
......................................................................

stream_{cli,srv}: Add 'res' param to read_cb2

Notify user about read errors, similar to what is supported in the
earlier ofd cb backend of osmo_stream_cli/srv:
https://osmocom.org/issues/6405#note-15

Related: OS#6405
Fixes: 5fec34a9f20c3b8769373d1b28ae2062e5e2bdd6
Fixes: 0245cf5e07855abea72693272c55b50b5a93aff4
Change-Id: I395c75ff1e9904757ce1d767a9ac2f779593c4c8
---
M examples/ipa-stream-client.c
M examples/ipa-stream-server.c
M examples/stream-client.c
M examples/stream-server.c
M include/osmocom/netif/stream.h
M src/stream_cli.c
M src/stream_srv.c
M tests/stream/stream_test.c
8 files changed, 176 insertions(+), 58 deletions(-)

Approvals:
  jolly: Looks good to me, but someone else must approve
  osmith: Looks good to me, but someone else must approve
  fixeria: Looks good to me, but someone else must approve
  pespin: Looks good to me, approved
  Jenkins Builder: Verified




diff --git a/examples/ipa-stream-client.c b/examples/ipa-stream-client.c
index 720fe66..b58370e 100644
--- a/examples/ipa-stream-client.c
+++ b/examples/ipa-stream-client.c
@@ -102,13 +102,19 @@
        return 0;
 }

-static int read_cb(struct osmo_stream_cli *conn, struct msgb *msg)
+static int read_cb(struct osmo_stream_cli *conn, int res, struct msgb *msg)
 {
        int num;
        struct msg_sent *cur, *tmp, *found = NULL;

        LOGP(DIPATEST, LOGL_DEBUG, "received message from stream (payload 
len=%d)\n", msgb_length(msg));

+       if (res <= 0) {
+               LOGP(DIPATEST, LOGL_ERROR, "Event with no data! %d\n", res);
+               msgb_free(msg);
+               return 0;
+       }
+
        if (osmo_ipa_process_msg(msg) < 0) {
                LOGP(DIPATEST, LOGL_ERROR, "bad IPA message\n");
                msgb_free(msg);
diff --git a/examples/ipa-stream-server.c b/examples/ipa-stream-server.c
index 5317921..e87eab4 100644
--- a/examples/ipa-stream-server.c
+++ b/examples/ipa-stream-server.c
@@ -47,8 +47,15 @@
        exit(EXIT_SUCCESS);
 }

-int read_cb(struct osmo_stream_srv *conn, struct msgb *msg)
+int read_cb(struct osmo_stream_srv *conn, int res, struct msgb *msg)
 {
+       if (res <= 0) {
+               LOGP(DSTREAMTEST, LOGL_ERROR, "cannot receive message (%d)\n", 
res);
+               msgb_free(msg);
+               osmo_stream_srv_destroy(conn);
+               return -EBADF;
+       }
+
        LOGP(DSTREAMTEST, LOGL_DEBUG, "received message from stream (payload 
len=%d)\n", msgb_length(msg));

        osmo_ipa_msg_push_headers(msg, osmo_ipa_msgb_cb_proto(msg), 
osmo_ipa_msgb_cb_proto_ext(msg));
diff --git a/examples/stream-client.c b/examples/stream-client.c
index cae0153..6d20263 100644
--- a/examples/stream-client.c
+++ b/examples/stream-client.c
@@ -54,10 +54,17 @@
        return 0;
 }

-static int read_cb(struct osmo_stream_cli *conn, struct msgb *msg)
+static int read_cb(struct osmo_stream_cli *conn, int res, struct msgb *msg)
 {
        LOGP(DSTREAMTEST, LOGL_NOTICE, "receiving message from stream... ");

+       if (res < 0) {
+               LOGPC(DSTREAMTEST, LOGL_ERROR, "cannot receive message (res = 
%d)\n", res);
+               msgb_free(msg);
+               return 0;
+       }
+
+
        LOGPC(DSTREAMTEST, LOGL_NOTICE, "got %d bytes: %s\n", msg->len, 
msgb_hexdump(msg));

        msgb_free(msg);
diff --git a/examples/stream-server.c b/examples/stream-server.c
index f6332dc..5295c2b 100644
--- a/examples/stream-server.c
+++ b/examples/stream-server.c
@@ -3,6 +3,7 @@
 #include <stdlib.h>
 #include <string.h>
 #include <unistd.h>
+#include <errno.h>

 #include <osmocom/core/select.h>
 #include <osmocom/core/socket.h>
@@ -44,10 +45,20 @@
        signal(SIGINT, SIG_DFL);
 }

-int read_cb(struct osmo_stream_srv *conn, struct msgb *msg)
+int read_cb(struct osmo_stream_srv *conn, int res, struct msgb *msg)
 {
        LOGP(DSTREAMTEST, LOGL_NOTICE, "receiving message from stream... ");

+       if (res <= 0) {
+               if (res < 0)
+                       LOGPC(DSTREAMTEST, LOGL_ERROR, "cannot receive message: 
%s\n", strerror(-res));
+               else
+                       LOGPC(DSTREAMTEST, LOGL_ERROR, "client closed 
connection\n");
+               msgb_free(msg);
+               osmo_stream_srv_destroy(conn);
+               return -EBADF;
+       }
+
        LOGPC(DSTREAMTEST, LOGL_NOTICE, "got %d bytes: %s\n", msg->len, 
msgb_hexdump(msg));

        msgb_free(msg);
diff --git a/include/osmocom/netif/stream.h b/include/osmocom/netif/stream.h
index 398b277..3c4ec7e 100644
--- a/include/osmocom/netif/stream.h
+++ b/include/osmocom/netif/stream.h
@@ -102,7 +102,14 @@

 typedef int (*osmo_stream_srv_read_cb_t)(struct osmo_stream_srv *conn);
 typedef int (*osmo_stream_srv_closed_cb_t)(struct osmo_stream_srv *conn);
-typedef int (*osmo_stream_srv_read_cb2_t)(struct osmo_stream_srv *conn, struct 
msgb *msg);
+
+/*! Completion call-back function when something was read from from the stream 
client socket.
+ * \param[in] conn Stream Server that got receive event.
+ * \param[in] res return value of the read()/recvmsg()/... call, or -errno in 
case of error.
+ * \param[in] msg message buffer containing the read data. Ownership is 
transferred to the
+ * call-back, and it must make sure to msgb_free() it eventually! */
+typedef int (*osmo_stream_srv_read_cb2_t)(struct osmo_stream_srv *conn, int 
res, struct msgb *msg);
+
 typedef int (*osmo_stream_srv_segmentation_cb_t)(struct msgb *msg);

 struct osmo_stream_srv *osmo_stream_srv_create(void *ctx, struct 
osmo_stream_srv_link *link, int fd,
@@ -172,7 +179,14 @@
 typedef int (*osmo_stream_cli_connect_cb_t)(struct osmo_stream_cli *cli);
 typedef int (*osmo_stream_cli_disconnect_cb_t)(struct osmo_stream_cli *cli);
 typedef int (*osmo_stream_cli_read_cb_t)(struct osmo_stream_cli *cli);
-typedef int (*osmo_stream_cli_read_cb2_t)(struct osmo_stream_cli *cli, struct 
msgb *msg);
+
+/*! Completion call-back function when something was read from from the stream 
client socket.
+ * \param[in] cli Stream Client that got receive event.
+ * \param[in] res return value of the read()/recvmsg()/... call, or -errno in 
case of error.
+ * \param[in] msg message buffer containing the read data. Ownership is 
transferred to the
+ * call-back, and it must make sure to msgb_free() it eventually! */
+typedef int (*osmo_stream_cli_read_cb2_t)(struct osmo_stream_cli *cli, int 
res, struct msgb *msg);
+
 typedef int (*osmo_stream_cli_segmentation_cb_t)(struct msgb *msg);

 void osmo_stream_cli_set_name(struct osmo_stream_cli *cli, const char *name);
diff --git a/src/stream_cli.c b/src/stream_cli.c
index 62ea03e..d4067d6 100644
--- a/src/stream_cli.c
+++ b/src/stream_cli.c
@@ -448,14 +448,23 @@
                stream_cli_handle_connecting(cli, res);
                break;
        case STREAM_CLI_STATE_CONNECTED:
-               if (res <= 0) {
-                       LOGSCLI(cli, LOGL_NOTICE, "received result %d in 
response to read\n", res);
+               switch (res) {
+               case -EPIPE:
+               case -ECONNRESET:
+                       LOGSCLI(cli, LOGL_ERROR, "lost connection with srv 
(%d)\n", res);
                        osmo_stream_cli_reconnect(cli);
-                       msgb_free(msg);
+                       break;
+               case 0:
+                       LOGSCLI(cli, LOGL_NOTICE, "connection closed with 
srv\n");
+                       osmo_stream_cli_reconnect(cli);
+                       break;
+               default:
+                       LOGSCLI(cli, LOGL_DEBUG, "received %d bytes from 
srv\n", res);
                        break;
                }
+               /* Notify user of new data or error: */
                if (cli->iofd_read_cb)
-                       cli->iofd_read_cb(cli, msg);
+                       cli->iofd_read_cb(cli, res, msg);
                else
                        msgb_free(msg);
                break;
@@ -503,15 +512,22 @@
                stream_cli_handle_connecting(cli, res);
                break;
        case STREAM_CLI_STATE_CONNECTED:
-               if (res <= 0) {
-                       LOGSCLI(cli, LOGL_NOTICE, "received result %d in 
response to recvmsg\n", res);
+               switch (res) {
+               case -EPIPE:
+               case -ECONNRESET:
+                       LOGSCLI(cli, LOGL_ERROR, "lost connection with srv 
(%d)\n", res);
                        osmo_stream_cli_reconnect(cli);
-                       msgb_free(msg);
+                       break;
+               case 0:
+                       LOGSCLI(cli, LOGL_NOTICE, "connection closed with 
srv\n");
+                       osmo_stream_cli_reconnect(cli);
+                       break;
+               default:
                        break;
                }
-               /* Forward message to read callback, also if the connection 
failed. */
+               /* Notify user of new data or error: */
                if (cli->iofd_read_cb)
-                       cli->iofd_read_cb(cli, msg);
+                       cli->iofd_read_cb(cli, res, msg);
                else
                        msgb_free(msg);
                break;
diff --git a/src/stream_srv.c b/src/stream_srv.c
index 74193a4..dad6b7a 100644
--- a/src/stream_srv.c
+++ b/src/stream_srv.c
@@ -611,26 +611,31 @@
 static void stream_srv_iofd_read_cb(struct osmo_io_fd *iofd, int res, struct 
msgb *msg)
 {
        struct osmo_stream_srv *conn = osmo_iofd_get_data(iofd);
-       LOGSSRV(conn, LOGL_DEBUG, "message received (res=%d)\n", res);

-       if (OSMO_UNLIKELY(res <= 0)) {
-               /* This connection is dead, destroy it. */
-               msgb_free(msg);
-               osmo_stream_srv_destroy(conn);
-       } else {
-               if (conn->flags & OSMO_STREAM_SRV_F_FLUSH_DESTROY) {
-                       LOGSSRV(conn, LOGL_INFO, "Connection is being flushed 
and closed; ignoring received message\n");
-                       msgb_free(msg);
-                       if (osmo_iofd_txqueue_len(iofd) == 0)
-                               osmo_stream_srv_destroy(conn);
-                       return;
-               }
-
-               if (conn->iofd_read_cb)
-                       conn->iofd_read_cb(conn, msg);
-               else
-                       msgb_free(msg);
+       switch (res) {
+       case -EPIPE:
+       case -ECONNRESET:
+               LOGSSRV(conn, LOGL_ERROR, "lost connection with client (%d)\n", 
res);
+               break;
+       case 0:
+               LOGSSRV(conn, LOGL_NOTICE, "connection closed with client\n");
+               break;
+       default:
+               LOGSSRV(conn, LOGL_DEBUG, "received %d bytes from client\n", 
res);
+               break;
        }
+       if (OSMO_UNLIKELY(conn->flags & OSMO_STREAM_SRV_F_FLUSH_DESTROY)) {
+               LOGSSRV(conn, LOGL_INFO, "Connection is being flushed and 
closed; ignoring received message\n");
+               msgb_free(msg);
+               if (osmo_iofd_txqueue_len(iofd) == 0)
+                       osmo_stream_srv_destroy(conn);
+               return;
+       }
+
+       if (conn->iofd_read_cb)
+               conn->iofd_read_cb(conn, res, msg);
+       else
+               msgb_free(msg);
 }

 static void stream_srv_iofd_write_cb(struct osmo_io_fd *iofd, int res, struct 
msgb *msg)
@@ -658,27 +663,32 @@
        LOGSSRV(conn, LOGL_DEBUG, "message received (res=%d)\n", res);

        res = stream_iofd_sctp_recvmsg_trailer(iofd, msg, res, msgh);
-       if (res == -EAGAIN)
-               return;

-       if (OSMO_UNLIKELY(res <= 0)) {
-               /* This connection is dead, destroy it. */
-               msgb_free(msg);
-               osmo_stream_srv_destroy(conn);
-       } else {
-               if (conn->flags & OSMO_STREAM_SRV_F_FLUSH_DESTROY) {
-                       LOGSSRV(conn, LOGL_INFO, "Connection is being flushed 
and closed; ignoring received message\n");
-                       msgb_free(msg);
-                       if (osmo_iofd_txqueue_len(iofd) == 0)
-                               osmo_stream_srv_destroy(conn);
-                       return;
-               }
-
-               if (conn->iofd_read_cb)
-                       conn->iofd_read_cb(conn, msg);
-               else
-                       msgb_free(msg);
+       switch (res) {
+       case -EPIPE:
+       case -ECONNRESET:
+               LOGSSRV(conn, LOGL_ERROR, "lost connection with client (%d)\n", 
res);
+               break;
+       case 0:
+               LOGSSRV(conn, LOGL_NOTICE, "connection closed with client\n");
+               break;
+       default:
+               if (OSMO_LIKELY(res > 0))
+                       LOGSSRV(conn, LOGL_DEBUG, "received %u bytes from 
client\n", res);
+               break;
        }
+       if (OSMO_UNLIKELY(conn->flags & OSMO_STREAM_SRV_F_FLUSH_DESTROY)) {
+               LOGSSRV(conn, LOGL_INFO, "Connection is being flushed and 
closed; ignoring received message\n");
+               msgb_free(msg);
+               if (osmo_iofd_txqueue_len(iofd) == 0)
+                       osmo_stream_srv_destroy(conn);
+               return;
+       }
+
+       if (conn->iofd_read_cb)
+               conn->iofd_read_cb(conn, res, msg);
+       else
+               msgb_free(msg);
 }

 static const struct osmo_io_ops srv_ioops_sctp = {
diff --git a/tests/stream/stream_test.c b/tests/stream/stream_test.c
index d73dd1d..6e849ca 100644
--- a/tests/stream/stream_test.c
+++ b/tests/stream/stream_test.c
@@ -502,12 +502,19 @@

 static struct osmo_timer_list fragmented_send_tl_cli;

-static int test_segm_ipa_stream_srv_cli_read_cb(struct osmo_stream_cli *osc, 
struct msgb *msg)
+static int test_segm_ipa_stream_srv_cli_read_cb(struct osmo_stream_cli *osc, 
int res, struct msgb *msg)
 {
        unsigned char *data;
        struct ipa_head *h = (struct ipa_head *) msg->l1h;
        uint8_t ipac_msg_type = *msg->data;
        struct msgb *reply;
+
+       if (res < 0) {
+               fprintf(stderr, "cannot receive message (res = %d)\n", res);
+               msgb_free(msg);
+               return -ENOMSG;
+       }
+
        LOGCLI(osc, "Received message from stream (payload len = %" PRIu16 
")\n", msgb_length(msg));
        if (ipac_msg_type < 0 || 5 < ipac_msg_type) {
                fprintf(stderr, "Received unexpected IPAC message type 
%"PRIu8"\n", ipac_msg_type);
@@ -566,7 +573,7 @@
        return osc;
 }

-int test_segm_ipa_stream_srv_srv_read_cb(struct osmo_stream_srv *conn, struct 
msgb *msg)
+int test_segm_ipa_stream_srv_srv_read_cb(struct osmo_stream_srv *conn, int 
res, struct msgb *msg)
 {
        static unsigned msgnum_srv = 0;
        struct ipa_head *ih = (struct ipa_head *)msg->l1h;
@@ -574,6 +581,16 @@
        struct msgb *m;
        uint8_t msgt;

+       if (res <= 0) {
+               if (res < 0)
+                       LOGSRV(conn, "cannot receive message: %s\n", 
strerror(-res));
+               else
+                       LOGSRV(conn, "client closed connection\n");
+               msgb_free(msg);
+               osmo_stream_srv_destroy(conn);
+               return -EBADF;
+       }
+
        LOGSRV(conn, "[%u-srv] Received IPA message from stream (payload len = 
%" PRIu16 ")\n",
               ++msgnum_srv, msgb_length(msg));
        LOGSRV(conn, "\tmsg buff data (including stripped headers): %s\n",
@@ -699,12 +716,26 @@
        osmo_timer_schedule(&fragmented_send_tl_srv_destroy, 0, 2);
 }

-int test_segm_ipa_stream_cli_srv_read_cb(struct osmo_stream_srv *conn, struct 
msgb *msg)
+int test_segm_ipa_stream_cli_srv_read_cb(struct osmo_stream_srv *conn, int 
res, struct msgb *msg)
 {
        unsigned char *data;
        struct ipa_head *h = (struct ipa_head *) msg->l1h;
-       uint8_t ipa_msg_type = ((uint8_t *)h)[sizeof(struct ipa_head)];
-       struct msgb *reply = msgb_alloc_headroom(128, 0, "IPA reply");
+       uint8_t ipa_msg_type;
+       struct msgb *reply;
+
+       if (res <= 0) {
+               if (res < 0)
+                       LOGSRV(conn, "cannot receive message: %s\n", 
strerror(-res));
+               else
+                       LOGSRV(conn, "client closed connection\n");
+               msgb_free(msg);
+               osmo_stream_srv_destroy(conn);
+               return -EBADF;
+       }
+
+       ipa_msg_type = ((uint8_t *)h)[sizeof(struct ipa_head)];
+
+       reply = msgb_alloc_headroom(128, 0, "IPA reply");
        if (reply == NULL) {
                fprintf(stderr, "Cannot allocate message\n");
                return -ENOMEM;
@@ -778,7 +809,7 @@

 static bool test_segm_ipa_stream_cli_all_msgs_processed = false;

-static int test_segm_ipa_stream_cli_cli_read_cb(struct osmo_stream_cli *osc, 
struct msgb *msg)
+static int test_segm_ipa_stream_cli_cli_read_cb(struct osmo_stream_cli *osc, 
int res, struct msgb *msg)
 {
        static unsigned msgnum_cli = 0;
        unsigned char *data;

--
To view, visit https://gerrit.osmocom.org/c/libosmo-netif/+/36574?usp=email
To unsubscribe, or for help writing mail filters, visit 
https://gerrit.osmocom.org/settings

Gerrit-Project: libosmo-netif
Gerrit-Branch: master
Gerrit-Change-Id: I395c75ff1e9904757ce1d767a9ac2f779593c4c8
Gerrit-Change-Number: 36574
Gerrit-PatchSet: 5
Gerrit-Owner: pespin <[email protected]>
Gerrit-Reviewer: Jenkins Builder
Gerrit-Reviewer: fixeria <[email protected]>
Gerrit-Reviewer: jolly <[email protected]>
Gerrit-Reviewer: laforge <[email protected]>
Gerrit-Reviewer: osmith <[email protected]>
Gerrit-Reviewer: pespin <[email protected]>
Gerrit-MessageType: merged

Reply via email to