wingo pushed a commit to branch wip-whippet
in repository guile.

commit 4d7041bfa9820a90f4fe234eea8ad183e1e8b2f2
Author: Andy Wingo <wi...@igalia.com>
AuthorDate: Sun Mar 13 13:54:58 2022 +0100

    Another attempt at parallel marking, avoiding the channel
    
    Not great though!
---
 parallel-marker.h | 104 ++++++++++++++++++++++++++++++++++++++++++++++++------
 1 file changed, 94 insertions(+), 10 deletions(-)
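
In brief: instead of overflowing the local mark queue into the per-worker
channel for the controller to rebalance, workers now proactively share
surplus work by pushing it straight onto the global deque (serialized by a
new deque_writer_lock), and idle workers steal from that deque directly.
Termination is detected cooperatively through a new active_markers counter
rather than by falling back to the controller.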

diff --git a/parallel-marker.h b/parallel-marker.h
index f935bc155..808a588ba 100644
--- a/parallel-marker.h
+++ b/parallel-marker.h
@@ -223,6 +223,8 @@ mark_deque_steal(struct mark_deque *q) {
 
 #define LOCAL_MARK_QUEUE_SIZE 64
 #define LOCAL_MARK_QUEUE_MASK 63
+#define LOCAL_MARK_QUEUE_SHARE_THRESHOLD 48
+#define LOCAL_MARK_QUEUE_SHARE_AMOUNT 32
 struct local_mark_queue {
   size_t read;
   size_t write;
@@ -237,13 +239,21 @@ static inline void
 local_mark_queue_poison(struct local_mark_queue *q) {
   q->read = 0; q->write = LOCAL_MARK_QUEUE_SIZE;
 }
+static inline size_t
+local_mark_queue_size(struct local_mark_queue *q) {
+  return q->write - q->read;
+}
 static inline int
 local_mark_queue_empty(struct local_mark_queue *q) {
-  return q->read == q->write;
+  return local_mark_queue_size(q) == 0;
+}
+static inline int
+local_mark_queue_should_share(struct local_mark_queue *q) {
+  return local_mark_queue_size(q) >= LOCAL_MARK_QUEUE_SHARE_THRESHOLD;
 }
 static inline int
 local_mark_queue_full(struct local_mark_queue *q) {
-  return q->read + LOCAL_MARK_QUEUE_SIZE == q->write;
+  return local_mark_queue_size(q) >= LOCAL_MARK_QUEUE_SIZE;
 }
 static inline void
 local_mark_queue_push(struct local_mark_queue *q, uintptr_t v) {
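
A note on the hunk above: the read/write cursors increase monotonically and,
judging by the LOCAL_MARK_QUEUE_MASK definition, are masked on access, so
local_mark_queue_size is simply the unsigned difference write - read and
stays correct across wraparound. The two new constants set the sharing
policy: once a worker's local queue reaches 48 entries it offers work to the
global deque, 32 entries at a time (see marker_share below). A sketch of the
access pattern the push/pop helpers presumably use (the data[] field name is
assumed; the struct body is cut off by the hunk):

    /* Hypothetical helper bodies, inferred from the mask definition: */
    q->data[q->write++ & LOCAL_MARK_QUEUE_MASK] = v;   /* push */
    v = q->data[q->read++ & LOCAL_MARK_QUEUE_MASK];    /* pop  */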
@@ -313,7 +323,9 @@ mark_notify_wait(struct mark_notify *notify) {
   }
 
   // Spurious wakeup is OK.
+  DEBUG("-- marker waiting\n");
   pthread_cond_wait(&notify->cond, &notify->lock);
+  DEBUG("-- marker woke\n");
   res = MARK_NOTIFY_WOKE;
   notify->pending = 0;
 
@@ -324,10 +336,12 @@ done:
 
 static void
 mark_notify_wake(struct mark_notify *notify) {
+  DEBUG("== notifying pending wake!\n");
   pthread_mutex_lock(&notify->lock);
   notify->pending = 1;
   pthread_cond_signal(&notify->cond);
   pthread_mutex_unlock(&notify->lock);
+  DEBUG("== notifying pending wake done\n");
 }
 
 // A mostly lock-free multi-producer, single consumer queue, largely
@@ -486,7 +500,9 @@ struct mark_worker {
 
 struct marker {
   struct mark_deque deque;
+  pthread_mutex_t deque_writer_lock;
   struct mark_channel overflow;
+  atomic_size_t active_markers;
   size_t worker_count;
   struct mark_worker workers[MARK_WORKERS_MAX_COUNT];
 };
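
Two new fields in the shared marker state. deque_writer_lock serializes
pushes onto the global deque: judging by the empty/abort sentinels around
mark_deque_steal above, the deque is a Chase-Lev-style work-stealing deque,
which tolerates many concurrent stealers but only one pusher at a time.
active_markers counts workers that still have, or may still find, work; the
termination protocol in mark_worker_steal below is built on it.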
@@ -586,6 +602,7 @@ marker_init(struct context *cx) {
   struct marker *marker = context_marker(cx);
   if (!mark_deque_init(&marker->deque))
     return 0;
+  pthread_mutex_init(&marker->deque_writer_lock, NULL);
   mark_channel_init(&marker->overflow);
   size_t desired_worker_count = 0;
   if (getenv("GC_MARKERS"))
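
marker_init gains initialization of the writer lock; the worker count still
comes from the GC_MARKERS environment variable visible in the surrounding
context, along the lines of:

    GC_MARKERS=4 ./a.out    # illustrative; the binary name is hypothetical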
@@ -615,16 +632,80 @@ static inline void trace_one(struct gcobj *obj, void *mark_data) ALWAYS_INLINE;
 static inline int mark_object(struct context *cx,
                               struct gcobj *obj) ALWAYS_INLINE;
 
+static inline void
+marker_share(struct local_marker *mark) {
+  struct marker *marker = context_marker(mark->cx);
+  DEBUG("marker %p: trying to share\n", mark->worker);
+  if (pthread_mutex_trylock(&marker->deque_writer_lock) != 0) {
+    DEBUG("marker %p: trylock failed\n", mark->worker);
+    if (local_mark_queue_full(&mark->local)) {
+      DEBUG("marker %p: forcing lock acquisition\n", mark->worker);
+      pthread_mutex_lock(&marker->deque_writer_lock);
+    } else
+      return;
+  }
+
+  DEBUG("marker %p: sharing\n", mark->worker);
+  for (size_t i = 0; i < LOCAL_MARK_QUEUE_SHARE_AMOUNT; i++)
+    mark_deque_push(&marker->deque, local_mark_queue_pop(&mark->local));
+
+  pthread_mutex_unlock(&marker->deque_writer_lock);
+}
+
 static inline void
 marker_visit(void **loc, void *mark_data) {
   struct local_marker *mark = mark_data;
   struct gcobj *obj = *loc;
   if (obj && mark_object(mark->cx, obj)) {
-    if (!local_mark_queue_full(&mark->local))
-      local_mark_queue_push(&mark->local, (uintptr_t)obj);
-    else {
-      mark_channel_write(&mark->worker->writer, (uintptr_t)obj);
+    if (local_mark_queue_should_share(&mark->local))
+      marker_share(mark);
+    local_mark_queue_push(&mark->local, (uintptr_t)obj);
+  }
+}
+
+static uintptr_t
+mark_worker_steal(struct local_marker *mark) {
+  DEBUG("marker %p: trying to steal\n", mark->worker);
+  while (1) {
+    uintptr_t addr = mark_deque_steal(mark->deque);
+    if (addr == mark_deque_empty) {
+      struct marker *marker = context_marker(mark->cx);
+      if (atomic_fetch_sub_explicit(&marker->active_markers, 1,
+                                    memory_order_relaxed) == 1) {
+        DEBUG("  ->> marker %p: DONE (no spinning) <<-\n", mark->worker);
+        return 0;
+      }
+      size_t spin_count = 0;
+      while (1) {
+        addr = mark_deque_steal(mark->deque);
+        if (addr != mark_deque_empty) {
+          DEBUG("marker %p: spinning got 0x%zx\n", mark->worker, addr);
+          atomic_fetch_add_explicit(&marker->active_markers, 1,
+                                    memory_order_relaxed);
+          break;
+        }
+        if (atomic_load_explicit(&marker->active_markers,
+                                 memory_order_relaxed) == 0) {
+          DEBUG("  ->> marker %p: DONE <<-\n", mark->worker);
+          return 0;
+        }
+        // spin
+        DEBUG("marker %p: spinning #%zu\n", mark->worker, spin_count);
+        if (spin_count < 10)
+          __builtin_ia32_pause();
+        else if (spin_count < 20)
+          sched_yield();
+        else if (spin_count < 40)
+          usleep(0);
+        else
+          usleep(1);
+        spin_count++;
+      }
     }
+    DEBUG("marker %p: stealing got 0x%zx\n", mark->worker, addr);
+    if (addr == mark_deque_abort)
+      continue;
+    return addr;
   }
 }
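
Notes on the three functions above. marker_share is best-effort: the trylock
means a worker that loses the race simply keeps its work, unless its local
queue is completely full, in which case it must block on the lock to make
room. marker_visit no longer touches the overflow channel at all; it shares
through the deque and then pushes locally, which is safe because sharing at
the 48-entry threshold (or on a forced full-queue share) always leaves the
64-entry queue with room.

mark_worker_steal carries the termination protocol. A worker that finds the
deque empty decrements active_markers; a fetch_sub returning 1 means it was
the last active worker, so all queues are drained and marking is finished.
Otherwise it spins, either re-incrementing the counter when it steals new
work or exiting once the counter reaches zero. The spin escalates from a CPU
pause hint through sched_yield to short sleeps. A minimal standalone sketch
of that ladder (the helper name is hypothetical, and __builtin_ia32_pause is
x86-only, so a portable build would want an architecture guard):

    #include <sched.h>      /* sched_yield */
    #include <unistd.h>     /* usleep */

    static void spin_backoff(size_t spin_count) {
      if (spin_count < 10)
        __builtin_ia32_pause();             /* cheap busy-wait hint */
      else if (spin_count < 20)
        sched_yield();                      /* cede the CPU to another thread */
      else
        usleep(spin_count < 40 ? 0 : 1);    /* escalate to brief sleeps */
    }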
 
@@ -643,11 +724,9 @@ mark_worker_mark(struct mark_worker *worker) {
     if (!local_mark_queue_empty(&mark.local)) {
       addr = local_mark_queue_pop(&mark.local);
     } else {
-      addr = mark_deque_steal(mark.deque);
-      if (addr == mark_deque_empty)
+      addr = mark_worker_steal(&mark);
+      if (!addr)
         break;
-      if (addr == mark_deque_abort)
-        continue;
     }
     trace_one((struct gcobj*)addr, &mark);
     n++;
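
The worker loop simplifies accordingly: it drains its local queue first and
calls mark_worker_steal only when that queue is empty. The mark_deque_abort
retry moved into mark_worker_steal, and a zero return is an unambiguous done
signal since no marked object lives at address 0.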
@@ -671,16 +750,21 @@ marker_trace(struct context *cx) {
   DEBUG("starting trace; %zu workers\n", marker->worker_count);
   while (1) {
     DEBUG("waking workers\n");
+    atomic_store_explicit(&marker->active_markers, marker->worker_count,
+                          memory_order_release);
     for (size_t i = 0; i < marker->worker_count; i++)
       mark_worker_request_mark(&marker->workers[i]);
 
     DEBUG("running controller loop\n");
     size_t n = 0;
     while (1) {
+      DEBUG("controller: popping\n");
       uintptr_t addr = mark_channel_pop(&marker->overflow);
+      DEBUG("controller: popped 0x%zx\n", addr);
       if (!addr)
         break;
       mark_deque_push(&marker->deque, addr);
+      DEBUG("controller: pushed to deque\n");
       n++;
     }
     DEBUG("controller loop done, %zu objects sent for rebalancing\n", n);
