On Mo, 2017-03-20 at 15:38 +0100, Jeff Burdges wrote:
> 
> Christian has expressed an interest in GNUnet becoming modular over
> scheduler for asynchronous IO operations.  There are two primary
> targets
> for this : 
> -  Qt's scheduler does not play well with others.  We must support it
> for Qt applications.
> -  The Rust community rejected all C schedulers as unsafe, or more
> precisely "slow if wrapped safely", so they wrote their own
> asynchronous
> IO stack.

I'm planning to write a secushare API in Rust which uses the
Future/Stream types from the futures crate in combination with a Core
event loop (tokio_core crate).
We are using Qt, too through the Rust bindings from https://github.com/
White-Oak/qml-rust, so integrating Qt's event loop with GNUnet is not
our goal right now. In the end it should be possible to integrate any
event loop with GNUnet of course.

Did I understand it correctly that modifying the GNUnet scheduler means
you're planning to not do IPC with the GNUnet services from Rust
anymore? That would be good news to me because it's easier to maintain
the Rust bindings when they call the API functions (which are supposed
to change rarely).

> 
> We should imho target the Rust scheduler first because doing so will
> be
> more fun, so I'll talk about doing that.  The question is : 
> 
> Where should we plug into their asynchronous IO stack?  Including,
> where
> does their stack look most like GNUnet's existing asynchronous IO
> stack? 

I was playing around a bit with the GNUnet scheduler already to open it
up to other event loops (see the attached patch). It's a quick try and
probably needs to be improved. Basically I introduced two new API
functions:

- GNUNET_SCHEDULER_set_work_callback: allows setting a callback which
is called after select returned. The callback is expected to schedule a
task in the external event loop. After that the GNUnet event loop
blocks

- GNUNET_SCHEDULER_do_work: the scheduled task is expected to run this
function, so all the available GNUnet tasks can be executed. After that
GNUnet's event loop continues to run

When it comes to how to schedule a task in the tokio_core event loop, I
was thinking of a 'Stream' (https://docs.rs/futures/0.1.11/futures/stre
am/trait.Stream.html) which is spawned before the GNUnet scheduler is
started. Communication with that stream should be possible with the
task::park and task::unpark (see https://docs.rs/futures/0.1.11/futures
/task/fn.park.html).

This is my first idea and it's possible that I overlooked something,
the tokio.rs stuff is not the easiest to see through.

> 
> 
> In essence, the two choices is between living on top of tokio-core
> directly, or using the higher level abstractions in tokio-proto too.
> We'd use https://docs.rs/tokio-core/0.1.6/tokio_core/ for sure, but I
> do
> not know if we'd use https://docs.rs/tokio-proto/0.1.1/tokio_proto/
> too.

So far tokio_core seemed sufficient to me.

> 
> Anything marked with a ? is something GNUnet overall needs, but Rust
> services should not need because another service like gnunet-arm or
> transport does everything we need.  In particular, I think TLS and
> DNS
> might push us up the stack onto tokio-proto, but other GNUnet
> processes
> do everything we need there.

I'm confused. Why would GNUnet need tokio-tls or trust-dns?

lurchi.
diff --git a/src/include/gnunet_scheduler_lib.h b/src/include/gnunet_scheduler_lib.h
index 2be1858..9705b49 100644
--- a/src/include/gnunet_scheduler_lib.h
+++ b/src/include/gnunet_scheduler_lib.h
@@ -134,6 +134,16 @@ typedef void
 
 
 /**
+ * Signature of the function called to scheduler work in an external
+ * event loop.
+ *
+ * @param cls closure
+ */
+typedef void
+(*GNUNET_SCHEDULER_WorkCallback) (void *work_cb_cls);
+
+
+/**
  * Signature of the select function used by the scheduler.
  * #GNUNET_NETWORK_socket_select matches it.
  *
@@ -577,6 +587,30 @@ void
 GNUNET_SCHEDULER_set_select (GNUNET_SCHEDULER_select new_select,
                              void *new_select_cls);
 
+/**
+ * Sets a callback function to call when the event loop has tasks to
+ * execute. If a callback was set the event loop blocks when tasks are ready
+ * to be run until GNUNET_SCHEDULER_do_work was called. The callback
+ * function is expected to schedule a task in the external event loop and 
+ * return. That task has to call GNUNET_SCHEDULER_do_work. After that the
+ * GNUnet event loop will continue to run.
+ *
+ * @param work_cb the function called when work is available
+ * @param work_cb_cls the closure passed to work_cb
+ */
+void
+GNUNET_SCHEDULER_set_work_callback (GNUNET_SCHEDULER_WorkCallback work_cb,
+                                    void *work_cb_cls);
+
+/**
+ * Runs tasks that are ready in the GNUnet event loop. This function must be
+ * called in an external event loop (in a different thread). It only makes
+ * makes sense to call this function once after the work callback has been
+ * called (see GNUNET_SCHEDULER_set_work_callback).
+ */
+void
+GNUNET_SCHEDULER_do_work (void);
+
 
 #if 0                           /* keep Emacsens' auto-indent happy */
 {
diff --git a/src/util/scheduler.c b/src/util/scheduler.c
index 409a094..e1e4cb7 100644
--- a/src/util/scheduler.c
+++ b/src/util/scheduler.c
@@ -280,6 +280,29 @@ static struct GNUNET_SCHEDULER_TaskContext tc;
  */
 static void *scheduler_select_cls;
 
+/**
+ * Function to notify an external event loop that tasks are ready
+ */
+static GNUNET_SCHEDULER_WorkCallback work_cb;
+
+/**
+ * Closure for #work_cb
+ */
+static void *work_cb_cls;
+
+/**
+ * event loop state
+ */
+static struct GNUNET_NETWORK_FDSet *rs;
+static struct GNUNET_NETWORK_FDSet *ws;
+static struct GNUNET_TIME_Relative timeout;
+static int select_ret;
+static unsigned long long last_tr;
+static unsigned int busy_wait_warning;
+static const struct GNUNET_DISK_FileHandle *pr;
+static struct GNUNET_DISK_PipeHandle *work_done_pipe_handle;
+static char signal_buffer;
+
 
 /**
  * Sets the select function to use in the scheduler (scheduler_select).
@@ -719,6 +742,90 @@ check_lifeness ()
 }
 
 
+static void
+do_work (void)
+{
+  if (select_ret == GNUNET_SYSERR)
+  {
+    if (errno == EINTR)
+      return;
+
+    LOG_STRERROR (GNUNET_ERROR_TYPE_ERROR, "select");
+#ifndef MINGW
+#if USE_LSOF
+    char lsof[512];
+
+    snprintf (lsof, sizeof (lsof), "lsof -p %d", getpid ());
+    (void) close (1);
+    (void) dup2 (2, 1);
+    if (0 != system (lsof))
+      LOG_STRERROR (GNUNET_ERROR_TYPE_WARNING,
+                    "system");
+#endif
+#endif
+#if DEBUG_FDS
+    struct GNUNET_SCHEDULER_Task *t;
+
+    for (t = pending_head; NULL != t; t = t->next)
+    {
+      if (-1 != t->read_fd)
+      {
+        int flags = fcntl (t->read_fd, F_GETFD);
+        if ((flags == -1) && (errno == EBADF))
+          {
+            LOG (GNUNET_ERROR_TYPE_ERROR,
+                 "Got invalid file descriptor %d!\n",
+                 t->read_fd);
+            dump_backtrace (t);
+          }
+      }
+      if (-1 != t->write_fd)
+        {
+          int flags = fcntl (t->write_fd, F_GETFD);
+          if ((flags == -1) && (errno == EBADF))
+            {
+              LOG (GNUNET_ERROR_TYPE_ERROR,
+                   "Got invalid file descriptor %d!\n",
+                   t->write_fd);
+      	      dump_backtrace (t);
+            }
+        }
+    }
+#endif
+    GNUNET_assert (0);
+    return;
+  }
+
+  if ( (0 == select_ret) &&
+       (0 == timeout.rel_value_us) &&
+       (busy_wait_warning > 16) )
+  {
+    LOG (GNUNET_ERROR_TYPE_WARNING,
+         "Looks like we're busy waiting...\n");
+    short_wait (100);                /* mitigate */
+  }
+  check_ready (rs, ws);
+  run_ready (rs, ws);
+  if (GNUNET_NETWORK_fdset_handle_isset (rs, pr))
+  {
+    /* consume the signal */
+    GNUNET_DISK_file_read (pr, &signal_buffer, sizeof (signal_buffer));
+    /* mark all active tasks as ready due to shutdown */
+    GNUNET_SCHEDULER_shutdown ();
+  }
+  if (last_tr == tasks_run)
+  {
+    short_wait (1);
+    busy_wait_warning++;
+  }
+  else
+  {
+    last_tr = tasks_run;
+    busy_wait_warning = 0;
+  }
+}
+
+
 /**
  * Initialize and run scheduler.  This function will return when all
  * tasks have completed.  On systems with signals, receiving a SIGTERM
@@ -737,10 +844,10 @@ void
 GNUNET_SCHEDULER_run (GNUNET_SCHEDULER_TaskCallback task,
                       void *task_cls)
 {
-  struct GNUNET_NETWORK_FDSet *rs;
-  struct GNUNET_NETWORK_FDSet *ws;
-  struct GNUNET_TIME_Relative timeout;
-  int ret;
+  //struct GNUNET_NETWORK_FDSet *rs;
+  //struct GNUNET_NETWORK_FDSet *ws;
+  //struct GNUNET_TIME_Relative timeout;
+  //int ret;
   struct GNUNET_SIGNAL_Context *shc_int;
   struct GNUNET_SIGNAL_Context *shc_term;
 #if (SIGTERM != GNUNET_TERM_SIG)
@@ -752,10 +859,10 @@ GNUNET_SCHEDULER_run (GNUNET_SCHEDULER_TaskCallback task,
   struct GNUNET_SIGNAL_Context *shc_hup;
   struct GNUNET_SIGNAL_Context *shc_pipe;
 #endif
-  unsigned long long last_tr;
-  unsigned int busy_wait_warning;
-  const struct GNUNET_DISK_FileHandle *pr;
-  char c;
+  //unsigned long long last_tr;
+  //unsigned int busy_wait_warning;
+  //const struct GNUNET_DISK_FileHandle *pr;
+  //char c;
 
   GNUNET_assert (NULL == active_task);
   rs = GNUNET_NETWORK_fdset_create ();
@@ -769,6 +876,11 @@ GNUNET_SCHEDULER_run (GNUNET_SCHEDULER_TaskCallback task,
   pr = GNUNET_DISK_pipe_handle (shutdown_pipe_handle,
                                 GNUNET_DISK_PIPE_END_READ);
   GNUNET_assert (NULL != pr);
+  work_done_pipe_handle = GNUNET_DISK_pipe (GNUNET_YES,
+                                            GNUNET_NO,
+                                            GNUNET_NO,
+                                            GNUNET_NO);
+  GNUNET_assert (NULL != work_done_pipe_handle);
   my_pid = getpid ();
   LOG (GNUNET_ERROR_TYPE_DEBUG,
        "Registering signal handlers\n");
@@ -777,7 +889,7 @@ GNUNET_SCHEDULER_run (GNUNET_SCHEDULER_TaskCallback task,
   shc_term = GNUNET_SIGNAL_handler_install (SIGTERM,
 					    &sighandler_shutdown);
 #if (SIGTERM != GNUNET_TERM_SIG)
-  shc_gterm = GNUNET_SIGNAL_handler_install (GNUNET_TERM_SIG,
+  shc_gterm = GNUNEi_SIGNAL_handler_install (GNUNET_TERM_SIG,
 					     &sighandler_shutdown);
 #endif
 #ifndef MINGW
@@ -814,94 +926,117 @@ GNUNET_SCHEDULER_run (GNUNET_SCHEDULER_TaskCallback task,
       timeout = GNUNET_TIME_UNIT_ZERO;
     }
     if (NULL == scheduler_select)
-      ret = GNUNET_NETWORK_socket_select (rs,
-                                          ws,
-                                          NULL,
-                                          timeout);
+      select_ret = GNUNET_NETWORK_socket_select (rs,
+                                                 ws,
+                                                 NULL,
+                                                 timeout);
     else
-      ret = scheduler_select (scheduler_select_cls,
-                              rs,
-                              ws,
-                              NULL,
-                              timeout);
-    if (ret == GNUNET_SYSERR)
+      select_ret = scheduler_select (scheduler_select_cls,
+                                     rs,
+                                     ws,
+                                     NULL,
+                                     timeout);
+
+    if (GNUNET_SYSERR == select_ret && errno == EINTR)
+      continue;
+    
+    if (NULL == work_cb)
     {
-      if (errno == EINTR)
-        continue;
-
-      LOG_STRERROR (GNUNET_ERROR_TYPE_ERROR, "select");
-#ifndef MINGW
-#if USE_LSOF
-      char lsof[512];
-
-      snprintf (lsof, sizeof (lsof), "lsof -p %d", getpid ());
-      (void) close (1);
-      (void) dup2 (2, 1);
-      if (0 != system (lsof))
-        LOG_STRERROR (GNUNET_ERROR_TYPE_WARNING,
-                      "system");
-#endif
-#endif
-#if DEBUG_FDS
-      struct GNUNET_SCHEDULER_Task *t;
-
-      for (t = pending_head; NULL != t; t = t->next)
-      {
-        if (-1 != t->read_fd)
-        {
-          int flags = fcntl (t->read_fd, F_GETFD);
-          if ((flags == -1) && (errno == EBADF))
-            {
-              LOG (GNUNET_ERROR_TYPE_ERROR,
-                   "Got invalid file descriptor %d!\n",
-                   t->read_fd);
-	      dump_backtrace (t);
-            }
-        }
-        if (-1 != t->write_fd)
-          {
-            int flags = fcntl (t->write_fd, F_GETFD);
-            if ((flags == -1) && (errno == EBADF))
-              {
-                LOG (GNUNET_ERROR_TYPE_ERROR,
-                     "Got invalid file descriptor %d!\n",
-                     t->write_fd);
-		dump_backtrace (t);
-              }
-          }
-      }
-#endif
-      GNUNET_assert (0);
-      break;
+      do_work(); 
     }
 
-    if ( (0 == ret) &&
-         (0 == timeout.rel_value_us) &&
-         (busy_wait_warning > 16) )
-    {
-      LOG (GNUNET_ERROR_TYPE_WARNING,
-           "Looks like we're busy waiting...\n");
-      short_wait (100);                /* mitigate */
-    }
-    check_ready (rs, ws);
-    run_ready (rs, ws);
-    if (GNUNET_NETWORK_fdset_handle_isset (rs, pr))
-    {
-      /* consume the signal */
-      GNUNET_DISK_file_read (pr, &c, sizeof (c));
-      /* mark all active tasks as ready due to shutdown */
-      GNUNET_SCHEDULER_shutdown ();
-    }
-    if (last_tr == tasks_run)
-    {
-      short_wait (1);
-      busy_wait_warning++;
-    }
     else
     {
-      last_tr = tasks_run;
-      busy_wait_warning = 0;
+      static char c;
+      work_cb (work_cb_cls);
+      GNUNET_DISK_file_read (GNUNET_DISK_pipe_handle (work_done_pipe_handle,
+                                                      GNUNET_DISK_PIPE_END_READ),
+                             &c, sizeof (c));
     }
+
+    if (GNUNET_SYSERR == select_ret)
+      break;
+    /// start
+//    if (ret == GNUNET_SYSERR)
+//    {
+//      if (errno == EINTR)
+//        continue;
+//
+//      LOG_STRERROR (GNUNET_ERROR_TYPE_ERROR, "select");
+//#ifndef MINGW
+//#if USE_LSOF
+//      char lsof[512];
+//
+//      snprintf (lsof, sizeof (lsof), "lsof -p %d", getpid ());
+//      (void) close (1);
+//      (void) dup2 (2, 1);
+//      if (0 != system (lsof))
+//        LOG_STRERROR (GNUNET_ERROR_TYPE_WARNING,
+//                      "system");
+//#endif
+//#endif
+//#if DEBUG_FDS
+//      struct GNUNET_SCHEDULER_Task *t;
+//
+//      for (t = pending_head; NULL != t; t = t->next)
+//      {
+//        if (-1 != t->read_fd)
+//        {
+//          int flags = fcntl (t->read_fd, F_GETFD);
+//          if ((flags == -1) && (errno == EBADF))
+//            {
+//              LOG (GNUNET_ERROR_TYPE_ERROR,
+//                   "Got invalid file descriptor %d!\n",
+//                   t->read_fd);
+//	      dump_backtrace (t);
+//            }
+//        }
+//        if (-1 != t->write_fd)
+//          {
+//            int flags = fcntl (t->write_fd, F_GETFD);
+//            if ((flags == -1) && (errno == EBADF))
+//              {
+//                LOG (GNUNET_ERROR_TYPE_ERROR,
+//                     "Got invalid file descriptor %d!\n",
+//                     t->write_fd);
+//		dump_backtrace (t);
+//              }
+//          }
+//      }
+//#endif
+//      GNUNET_assert (0);
+//      break;
+//    }
+//
+//    if ( (0 == ret) &&
+//         (0 == timeout.rel_value_us) &&
+//         (busy_wait_warning > 16) )
+//    {
+//      LOG (GNUNET_ERROR_TYPE_WARNING,
+//           "Looks like we're busy waiting...\n");
+//      short_wait (100);                /* mitigate */
+//    }
+//    check_ready (rs, ws);
+//    run_ready (rs, ws);
+//    if (GNUNET_NETWORK_fdset_handle_isset (rs, pr))
+//    {
+//      /* consume the signal */
+//      GNUNET_DISK_file_read (pr, &c, sizeof (c));
+//      /* mark all active tasks as ready due to shutdown */
+//      GNUNET_SCHEDULER_shutdown ();
+//    }
+//    if (last_tr == tasks_run)
+//    {
+//      short_wait (1);
+//      busy_wait_warning++;
+//    }
+//    else
+//    {
+//      last_tr = tasks_run;
+//      busy_wait_warning = 0;
+//    }
+//
+    /// end
   }
   GNUNET_SIGNAL_handler_uninstall (shc_int);
   GNUNET_SIGNAL_handler_uninstall (shc_term);
@@ -1750,4 +1885,46 @@ GNUNET_SCHEDULER_add_select (enum GNUNET_SCHEDULER_Priority prio,
   return t;
 }
 
+
+/**
+ * Sets a callback function to call when the event loop has tasks to
+ * execute. If a callback was set the event loop blocks when tasks are ready
+ * to be run until GNUNET_SCHEDULER_do_work was called. The callback
+ * function is expected to schedule a task in the external event loop and 
+ * return. That task has to call GNUNET_SCHEDULER_do_work. After that the
+ * GNUnet event loop will continue to run.
+ *
+ * @param work_cb the function called when work is available
+ * @param work_cb_cls the closure passed to work_cb
+ */
+void
+GNUNET_SCHEDULER_set_work_callback (GNUNET_SCHEDULER_WorkCallback work_cb,
+                                    void *work_cb_cls)
+{
+  work_cb = work_cb;
+  work_cb_cls = work_cb_cls;
+}
+
+
+/**
+ * Runs tasks that are ready in the GNUnet event loop. This function must be
+ * called in an external event loop (in a different thread). It only makes
+ * makes sense to call this function once after the work callback has been
+ * called (see GNUNET_SCHEDULER_set_work_callback).
+ */
+void
+GNUNET_SCHEDULER_do_work (void)
+{
+  static char c;
+
+  if (NULL == work_done_pipe_handle)
+    return;
+  
+  do_work (); 
+  GNUNET_DISK_file_write (GNUNET_DISK_pipe_handle (work_done_pipe_handle,
+                                                   GNUNET_DISK_PIPE_END_WRITE),
+                          &c, sizeof (c));           
+}
+
+
 /* end of scheduler.c */

Attachment: signature.asc
Description: This is a digitally signed message part

_______________________________________________
GNUnet-developers mailing list
[email protected]
https://lists.gnu.org/mailman/listinfo/gnunet-developers

Reply via email to