laforge has submitted this change. ( 
https://gerrit.osmocom.org/c/libosmocore/+/20296 )

Change subject: logging: Change stderr + file target to use non-blocking write
......................................................................

logging: Change stderr + file target to use non-blocking write

So far, we used blocking, buffered stdio output (fprintf() followed by
fflush()) to write to stderr and file targets.  This causes problems if
there are [slow] consumers causing delays, such as gnome-terminal (when
the program is started interactively) or systemd/journald (where we
observe the process being blocked for 64..128ms on stderr writes).

This patch introduces stderr/file based logging via write_queue
and osmo_select_main(), i.e. it switches from glibc-buffered, blocking
writes to internally buffered, non-blocking writes.

* when osmo_stderr_target is created via application.c, we create it
  in blocking stream mode for backwards compatibility, particularly
  for [smaller] programs that don't use osmo_select_main()

* when the VTY code encounters 'log stderr' or 'log file FILENAME',
  we switch that respective target to non-blocking write-queue mode,
  as this means the application is in fact using osmo_select_main()

* The config file can now state 'log stderr blocking-io' or
  'log file FILENAME blocking-io' to explicitly enforce using blocking
  stream based I/O

* The application can at any time use the new API functions
  log_target_file_switch_to_stream() and
  log_target_file_switch_to_wqueue() to switch either way

Closes: OS#4311
Change-Id: Ia58fd78535c41b3da3aeb7733aadc785ace610da
---
M include/osmocom/core/logging.h
M src/logging.c
M src/vty/logging_vty.c
M tests/logging/logging_vty_test.c
4 files changed, 313 insertions(+), 22 deletions(-)

Approvals:
  Jenkins Builder: Verified
  pespin: Looks good to me, but someone else must approve
  osmith: Looks good to me, but someone else must approve
  laforge: Looks good to me, approved



diff --git a/include/osmocom/core/logging.h b/include/osmocom/core/logging.h
index 343f976..a554adc 100644
--- a/include/osmocom/core/logging.h
+++ b/include/osmocom/core/logging.h
@@ -297,8 +297,11 @@

        union {
                struct {
+                       /* direct, blocking output via stdio */
                        FILE *out;
                        const char *fname;
+                       /* indirect output via write_queue and 
osmo_select_main() */
+                       struct osmo_wqueue *wqueue;
                } tgt_file;

                struct {
@@ -408,6 +411,8 @@
 struct log_target *log_target_create_systemd(bool raw);
 void log_target_systemd_set_raw(struct log_target *target, bool raw);
 int log_target_file_reopen(struct log_target *tgt);
+int log_target_file_switch_to_stream(struct log_target *tgt);
+int log_target_file_switch_to_wqueue(struct log_target *tgt);
 int log_targets_reopen(void);

 void log_add_target(struct log_target *target);
diff --git a/src/logging.c b/src/logging.c
index 1d139d5..5c6b993 100644
--- a/src/logging.c
+++ b/src/logging.c
@@ -35,6 +35,7 @@
 #include <stdlib.h>
 #include <stdio.h>
 #include <string.h>
+#include <unistd.h>

 #ifdef HAVE_STRINGS_H
 #include <strings.h>
@@ -57,6 +58,9 @@

 #include <time.h>
 #include <sys/time.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <fcntl.h>
 #include <errno.h>
 #include <pthread.h>

@@ -65,12 +69,17 @@
 #include <osmocom/core/logging.h>
 #include <osmocom/core/timer.h>
 #include <osmocom/core/thread.h>
+#include <osmocom/core/select.h>
+#include <osmocom/core/write_queue.h>

 #include <osmocom/vty/logging.h>       /* for LOGGING_STR. */

 /* maximum length of the log string of a single log event (typically  line) */
 #define MAX_LOG_SIZE   4096

+/* maximum number of log statements we queue in file/stderr target write queue 
*/
+#define LOG_WQUEUE_LEN 1024
+
 osmo_static_assert(_LOG_CTX_COUNT <= ARRAY_SIZE(((struct 
log_context*)NULL)->ctx),
                   enum_logging_ctx_items_fit_in_struct_log_context);
 osmo_static_assert(_LOG_FLT_COUNT <= ARRAY_SIZE(((struct 
log_target*)NULL)->filter_data),
@@ -904,11 +913,32 @@
 }

 #if (!EMBEDDED)
+static void _file_output_stream(struct log_target *target, unsigned int level,
+                        const char *log)
+{
+       OSMO_ASSERT(target->tgt_file.out);
+       fputs(log, target->tgt_file.out);
+       fflush(target->tgt_file.out);
+}
+
+/* output via non-blocking write_queue, doing internal buffering */
 static void _file_output(struct log_target *target, unsigned int level,
                         const char *log)
 {
-       fprintf(target->tgt_file.out, "%s", log);
-       fflush(target->tgt_file.out);
+       int len = strlen(log);
+       struct msgb *msg;
+
+       OSMO_ASSERT(target->tgt_file.wqueue);
+       msg = msgb_alloc_c(target->tgt_file.wqueue, len, "log_file_msg");
+       if (!msg)
+               return;
+
+       /* we simply enqueue the log message to a write queue here, to avoid 
any blocking
+        * writes on the output file.  The write queue will tell us once the 
file is writable
+        * and call _file_wq_write_cb() */
+       memcpy(msg->data, log, len);
+       msgb_put(msg, len);
+       osmo_wqueue_enqueue_quiet(target->tgt_file.wqueue, msg);
 }
 #endif

@@ -972,7 +1002,7 @@

        target->type = LOG_TGT_TYPE_STDERR;
        target->tgt_file.out = stderr;
-       target->output = _file_output;
+       target->output = _file_output_stream;
        return target;
 #else
        return NULL;
@@ -980,11 +1010,28 @@
 }

 #if (!EMBEDDED)
-/*! Create a new file-based log target
+/* write-queue tells us we should write another msgb (log line) to the output 
fd */
+static int _file_wq_write_cb(struct osmo_fd *ofd, struct msgb *msg)
+{
+       int rc;
+
+       rc = write(ofd->fd, msgb_data(msg), msgb_length(msg));
+       if (rc < 0)
+               return rc;
+       if (rc != msgb_length(msg)) {
+               /* pull the number of bytes we have already written */
+               msgb_pull(msg, rc);
+               /* ask write_queue to re-insert the msgb at the head of the 
queue */
+               return -EAGAIN;
+       }
+       return 0;
+}
+
+/*! Create a new file-based log target using buffered, blocking stream output
  *  \param[in] fname File name of the new log file
  *  \returns Log target in case of success, NULL otherwise
  */
-struct log_target *log_target_create_file(const char *fname)
+struct log_target *log_target_create_file_stream(const char *fname)
 {
        struct log_target *target;

@@ -998,9 +1045,193 @@
                log_target_destroy(target);
                return NULL;
        }
+       target->output = _file_output_stream;
+       target->tgt_file.fname = talloc_strdup(target, fname);

+       return target;
+}
+
+/*! switch from non-blocking/write-queue to blocking + buffered stream output
+ *  \param[in] target log target which we should switch
+ *  \return 0 on success; 1 if already switched before; negative on error */
+int log_target_file_switch_to_stream(struct log_target *target)
+{
+       struct osmo_wqueue *wq;
+       const char *name;
+
+       if (!target)
+               return -ENODEV;
+
+       /* this only works for file/stderr targets */
+       switch (target->type) {
+       case LOG_TGT_TYPE_FILE:
+               name = target->tgt_file.fname;
+               break;
+       case LOG_TGT_TYPE_STDERR:
+               name = "stderr";
+               break;
+       default:
+               return -EINVAL;
+       }
+
+       if (target->tgt_file.out) {
+               /* target has already been switched over */
+               return 1;
+       }
+
+       LOGP(DLGLOBAL, LOGL_INFO, "Switching log target '%s' to blocking stream 
I/O\n", name);
+
+       wq = target->tgt_file.wqueue;
+       OSMO_ASSERT(wq);
+
+       /* re-open output as stream */
+       if (target->type == LOG_TGT_TYPE_STDERR)
+               target->tgt_file.out = stderr;
+       else
+               target->tgt_file.out = fopen(target->tgt_file.fname, "a");
+       if (!target->tgt_file.out) {
+               LOGP(DLGLOBAL, LOGL_ERROR, "Cannot open log target '%s' as 
blocking stream I/O: %s\n",
+                    name, strerror(errno));
+               return -EIO;
+       }
+
+       /* synchronously write anything left in the queue */
+       while (!llist_empty(&wq->msg_queue)) {
+               struct msgb *msg = msgb_dequeue(&wq->msg_queue);
+               fwrite(msgb_data(msg), msgb_length(msg), 1, 
target->tgt_file.out);
+               msgb_free(msg);
+       }
+
+       /* now that everything succeeded, we can finally close the old output 
fd */
+       if (target->type == LOG_TGT_TYPE_FILE) {
+               osmo_fd_unregister(&wq->bfd);
+               close(wq->bfd.fd);
+       }
+
+       /* release the queue itself */
+       talloc_free(wq);
+       target->tgt_file.wqueue = NULL;
+       target->output = _file_output_stream;
+       target->raw_output = NULL;
+
+       return 0;
+}
+
+/*! switch from blocking + buffered file output to non-blocking write-queue 
based output.
+ *  \param[in] target log target which we should switch
+ *  \return 0 on success; 1 if already switched before; negative on error */
+int log_target_file_switch_to_wqueue(struct log_target *target)
+{
+       struct osmo_wqueue *wq;
+       const char *name;
+       int rc;
+
+       if (!target)
+               return -ENODEV;
+
+       /* this only works for file/stderr targets */
+       switch (target->type) {
+       case LOG_TGT_TYPE_FILE:
+               name = target->tgt_file.fname;
+               break;
+       case LOG_TGT_TYPE_STDERR:
+               name = "stderr";
+               break;
+       default:
+               return -EINVAL;
+       }
+
+       if (!target->tgt_file.out) {
+               /* target has already been switched over */
+               return 1;
+       }
+
+       LOGP(DLGLOBAL, LOGL_INFO, "Switching log target '%s' to non-blocking 
I/O\n", name);
+
+       /* we create a ~640kB sized talloc pool within the write-queue to 
ensure individual
+        * log lines (stored as msgbs) will not result in malloc() calls, 
and also to
+        * reduce the OOM probability within logging, as the pool is already 
allocated */
+       wq = talloc_pooled_object(target, struct osmo_wqueue, LOG_WQUEUE_LEN,
+                                 LOG_WQUEUE_LEN*(sizeof(struct msgb)+512));
+       if (!wq)
+               return -ENOMEM;
+       osmo_wqueue_init(wq, LOG_WQUEUE_LEN);
+
+       fflush(target->tgt_file.out);
+       if (target->type == LOG_TGT_TYPE_FILE) {
+               rc = open(target->tgt_file.fname, 
O_WRONLY|O_APPEND|O_CREAT|O_NONBLOCK, 0660);
+               if (rc < 0) {
+                       LOGP(DLGLOBAL, LOGL_ERROR, "Cannot open log target '%s' 
as non-blocking I/O: %s\n",
+                            name, strerror(errno));
+                       talloc_free(wq);
+                       return -errno;
+               }
+       } else {
+               rc = STDERR_FILENO;
+       }
+       wq->bfd.fd = rc;
+       wq->bfd.when = OSMO_FD_WRITE;
+       wq->write_cb = _file_wq_write_cb;
+
+       rc = osmo_fd_register(&wq->bfd);
+       if (rc < 0) {
+               talloc_free(wq);
+               return -EIO;
+       }
+       target->tgt_file.wqueue = wq;
        target->output = _file_output;

+       /* now that everything succeeded, we can finally close the old output 
stream */
+       if (target->type == LOG_TGT_TYPE_FILE)
+               fclose(target->tgt_file.out);
+       target->tgt_file.out = NULL;
+
+       return 0;
+}
+
+/*! Create a new file-based log target using non-blocking write_queue
+ *  \param[in] fname File name of the new log file
+ *  \returns Log target in case of success, NULL otherwise
+ */
+struct log_target *log_target_create_file(const char *fname)
+{
+       struct log_target *target;
+       struct osmo_wqueue *wq;
+       int rc;
+
+       target = log_target_create();
+       if (!target)
+               return NULL;
+
+       target->type = LOG_TGT_TYPE_FILE;
+       /* we create a ~640kB sized talloc pool within the write-queue to 
ensure individual
+        * log lines (stored as msgbs) will not result in malloc() calls, 
and also to
+        * reduce the OOM probability within logging, as the pool is already 
allocated */
+       wq = talloc_pooled_object(target, struct osmo_wqueue, LOG_WQUEUE_LEN,
+                                 LOG_WQUEUE_LEN*(sizeof(struct msgb)+512));
+       if (!wq) {
+               log_target_destroy(target);
+               return NULL;
+       }
+       osmo_wqueue_init(wq, LOG_WQUEUE_LEN);
+       wq->bfd.fd = open(fname, O_WRONLY|O_APPEND|O_CREAT|O_NONBLOCK, 0660);
+       if (wq->bfd.fd < 0) {
+               talloc_free(wq);
+               log_target_destroy(target);
+               return NULL;
+       }
+       wq->bfd.when = OSMO_FD_WRITE;
+       wq->write_cb = _file_wq_write_cb;
+
+       rc = osmo_fd_register(&wq->bfd);
+       if (rc < 0) {
+               talloc_free(wq);
+               log_target_destroy(target);
+               return NULL;
+       }
+
+       target->tgt_file.wqueue = wq;
+       target->output = _file_output;
        target->tgt_file.fname = talloc_strdup(target, fname);

        return target;
@@ -1040,17 +1271,33 @@
  *  \param[in] target log target to unregister, close and delete */
 void log_target_destroy(struct log_target *target)
 {
-
        /* just in case, to make sure we don't have any references */
        log_del_target(target);

 #if (!EMBEDDED)
+       struct osmo_wqueue *wq;
        switch (target->type) {
        case LOG_TGT_TYPE_FILE:
-               if (target->tgt_file.out == NULL)
-                       break;
-               fclose(target->tgt_file.out);
-               target->tgt_file.out = NULL;
+       case LOG_TGT_TYPE_STDERR:
+               if (target->tgt_file.out) {
+                       if (target->type == LOG_TGT_TYPE_FILE)
+                               fclose(target->tgt_file.out);
+                       target->tgt_file.out = NULL;
+               }
+               wq = target->tgt_file.wqueue;
+               if (wq) {
+                       if (wq->bfd.fd >= 0) {
+                               if (target->type == LOG_TGT_TYPE_FILE)
+                                       close(wq->bfd.fd);
+                               wq->bfd.fd = -1;
+                       }
+                       osmo_fd_unregister(&wq->bfd);
+                       osmo_wqueue_clear(wq);
+                       talloc_free(wq);
+                       target->tgt_file.wqueue = NULL;
+               }
+               talloc_free((void *)target->tgt_file.fname);
+               target->tgt_file.fname = NULL;
                break;
 #ifdef HAVE_SYSLOG_H
        case LOG_TGT_TYPE_SYSLOG:
@@ -1071,13 +1318,33 @@
  *  \returns 0 in case of success; negative otherwise */
 int log_target_file_reopen(struct log_target *target)
 {
-       fclose(target->tgt_file.out);
+       struct osmo_wqueue *wq;
+       int rc;

-       target->tgt_file.out = fopen(target->tgt_file.fname, "a");
-       if (!target->tgt_file.out)
-               return -errno;
+       OSMO_ASSERT(target->type == LOG_TGT_TYPE_FILE || target->type == 
LOG_TGT_TYPE_STDERR);
+       OSMO_ASSERT(target->tgt_file.out || target->tgt_file.wqueue);

-       /* we assume target->output already to be set */
+       if (target->tgt_file.out) {
+               fclose(target->tgt_file.out);
+               target->tgt_file.out = fopen(target->tgt_file.fname, "a");
+               if (!target->tgt_file.out)
+                       return -errno;
+       } else {
+               wq = target->tgt_file.wqueue;
+               osmo_fd_unregister(&wq->bfd);
+               if (wq->bfd.fd >= 0) {
+                       close(wq->bfd.fd);
+                       wq->bfd.fd = -1;
+               }
+
+               rc = open(target->tgt_file.fname, 
O_WRONLY|O_APPEND|O_CREAT|O_NONBLOCK, 0660);
+               if (rc < 0)
+                       return -errno;
+               wq->bfd.fd = rc;
+               rc = osmo_fd_register(&wq->bfd);
+               if (rc < 0)
+                       return rc;
+       }

        return 0;
 }
diff --git a/src/vty/logging_vty.c b/src/vty/logging_vty.c
index 3b131c2..d33fe5e 100644
--- a/src/vty/logging_vty.c
+++ b/src/vty/logging_vty.c
@@ -835,8 +835,9 @@
 }

 DEFUN(cfg_log_stderr, cfg_log_stderr_cmd,
-       "log stderr",
-       LOG_STR "Logging via STDERR of the process\n")
+       "log stderr [blocking-io]",
+       LOG_STR "Logging via STDERR of the process\n"
+       "Use blocking, synchronous I/O\n")
 {
        struct log_target *tgt;

@@ -852,6 +853,11 @@
                log_add_target(tgt);
        }

+       if (argc > 0 && !strcmp(argv[0], "blocking-io"))
+               log_target_file_switch_to_stream(tgt);
+       else
+               log_target_file_switch_to_wqueue(tgt);
+
        vty->index = tgt;
        vty->node = CFG_LOG_NODE;

@@ -878,8 +884,9 @@
 }

 DEFUN(cfg_log_file, cfg_log_file_cmd,
-       "log file .FILENAME",
-       LOG_STR "Logging to text file\n" "Filename\n")
+       "log file FILENAME [blocking-io]",
+       LOG_STR "Logging to text file\n" "Filename\n"
+       "Use blocking, synchronous I/O\n")
 {
        const char *fname = argv[0];
        struct log_target *tgt;
@@ -896,6 +903,11 @@
                log_add_target(tgt);
        }

+       if (argc > 1 && !strcmp(argv[1], "blocking-io"))
+               log_target_file_switch_to_stream(tgt);
+       else
+               log_target_file_switch_to_wqueue(tgt);
+
        vty->index = tgt;
        vty->node = CFG_LOG_NODE;

@@ -904,7 +916,7 @@


 DEFUN(cfg_no_log_file, cfg_no_log_file_cmd,
-       "no log file .FILENAME",
+       "no log file FILENAME",
        NO_STR LOG_STR "Logging to text file\n" "Filename\n")
 {
        const char *fname = argv[0];
@@ -979,7 +991,10 @@
                return 1;
                break;
        case LOG_TGT_TYPE_STDERR:
-               vty_out(vty, "log stderr%s", VTY_NEWLINE);
+               if (tgt->tgt_file.wqueue)
+                       vty_out(vty, "log stderr%s", VTY_NEWLINE);
+               else
+                       vty_out(vty, "log stderr blocking-io%s", VTY_NEWLINE);
                break;
        case LOG_TGT_TYPE_SYSLOG:
 #ifdef HAVE_SYSLOG_H
@@ -990,7 +1005,10 @@
 #endif
                break;
        case LOG_TGT_TYPE_FILE:
-               vty_out(vty, "log file %s%s", tgt->tgt_file.fname, VTY_NEWLINE);
+               if (tgt->tgt_file.wqueue)
+                       vty_out(vty, "log file %s%s", tgt->tgt_file.fname, 
VTY_NEWLINE);
+               else
+                       vty_out(vty, "log file %s blocking-io%s", 
tgt->tgt_file.fname, VTY_NEWLINE);
                break;
        case LOG_TGT_TYPE_STRRB:
                vty_out(vty, "log alarms %zu%s",
diff --git a/tests/logging/logging_vty_test.c b/tests/logging/logging_vty_test.c
index e7019f6..078555e 100644
--- a/tests/logging/logging_vty_test.c
+++ b/tests/logging/logging_vty_test.c
@@ -254,6 +254,7 @@
        log_set_print_category_hex(osmo_stderr_target, 0);
        log_set_print_level(osmo_stderr_target, 1);
        log_set_print_filename2(osmo_stderr_target, LOG_FILENAME_NONE);
+       log_target_file_switch_to_wqueue(osmo_stderr_target);

        if (cmdline_config.config_file) {
                rc = vty_read_config_file(cmdline_config.config_file, NULL);

--
To view, visit https://gerrit.osmocom.org/c/libosmocore/+/20296
To unsubscribe, or for help writing mail filters, visit 
https://gerrit.osmocom.org/settings

Gerrit-Project: libosmocore
Gerrit-Branch: master
Gerrit-Change-Id: Ia58fd78535c41b3da3aeb7733aadc785ace610da
Gerrit-Change-Number: 20296
Gerrit-PatchSet: 16
Gerrit-Owner: laforge <[email protected]>
Gerrit-Reviewer: Jenkins Builder
Gerrit-Reviewer: laforge <[email protected]>
Gerrit-Reviewer: osmith <[email protected]>
Gerrit-Reviewer: pespin <[email protected]>
Gerrit-MessageType: merged

Reply via email to