wingo pushed a commit to branch wip-whippet
in repository guile.

commit 7ce07de6701ccbf7cbd4be287326fab4cc728127
Author: Andy Wingo <wi...@igalia.com>
AuthorDate: Sat Mar 12 21:09:17 2022 +0100

    First crack at parallel marking
---
 Makefile          |   6 +-
 mark-sweep.h      |   4 +-
 parallel-marker.h | 417 +++++++++++++++++++++++++++++++++++++++++++++++++++---
 serial-marker.h   |   7 +-
 4 files changed, 404 insertions(+), 30 deletions(-)

diff --git a/Makefile b/Makefile
index 2846749e7..ecf35b3e3 100644
--- a/Makefile
+++ b/Makefile
@@ -2,7 +2,7 @@ TESTS=gcbench # MT_GCBench MT_GCBench2
 COLLECTORS=bdw semi mark-sweep parallel-mark-sweep
 
 CC=gcc
-CFLAGS=-Wall -O2 -g
+CFLAGS=-Wall -O2 -g -fno-strict-aliasing
 
 ALL_TESTS=$(foreach COLLECTOR,$(COLLECTORS),$(addprefix 
$(COLLECTOR)-,$(TESTS)))
 
@@ -15,10 +15,10 @@ semi-%: semi.h precise-roots.h %.c
        $(CC) $(CFLAGS) -I. -DNDEBUG -DGC_SEMI -o $@ $*.c
 
 mark-sweep-%: mark-sweep.h precise-roots.h serial-marker.h assert.h debug.h %.c
-       $(CC) $(CFLAGS) -I. -DNDEBUG -DGC_MARK_SWEEP -o $@ $*.c
+       $(CC) $(CFLAGS) -I. -Wno-unused -DNDEBUG -DGC_MARK_SWEEP -o $@ $*.c
 
 parallel-mark-sweep-%: mark-sweep.h precise-roots.h parallel-marker.h assert.h 
debug.h %.c
-       $(CC) $(CFLAGS) -I. -DNDEBUG -DGC_PARALLEL_MARK_SWEEP -o $@ $*.c
+       $(CC) $(CFLAGS) -I. -Wno-unused -DNDEBUG -DGC_PARALLEL_MARK_SWEEP 
-lpthread -o $@ $*.c
 
 check: $(addprefix test-$(TARGET),$(TARGETS))
 
diff --git a/mark-sweep.h b/mark-sweep.h
index 6241d8ab6..2c90f48eb 100644
--- a/mark-sweep.h
+++ b/mark-sweep.h
@@ -144,7 +144,7 @@ static inline int mark_object(struct context *cx, struct 
gcobj *obj) {
   return 1;
 }
 
-static void trace_one(struct gcobj *obj, void *mark_data) {
+static inline void trace_one(struct gcobj *obj, void *mark_data) {
   switch (tag_live_alloc_kind(obj->tag)) {
 #define SCAN_OBJECT(name, Name, NAME) \
     case ALLOC_KIND_##NAME: \
@@ -168,7 +168,7 @@ static void collect(struct context *cx) {
   marker_prepare(cx);
   for (struct handle *h = cx->roots; h; h = h->next)
     marker_visit_root(&h->v, cx);
-  marker_trace(cx, trace_one);
+  marker_trace(cx);
   marker_release(cx);
   DEBUG("done marking\n");
   cx->sweep = cx->heap_base;
diff --git a/parallel-marker.h b/parallel-marker.h
index 805522ae7..f935bc155 100644
--- a/parallel-marker.h
+++ b/parallel-marker.h
@@ -1,6 +1,7 @@
 #ifndef SERIAL_TRACE_H
 #define SERIAL_TRACE_H
 
+#include <pthread.h>
 #include <stdatomic.h>
 #include <sys/mman.h>
 #include <unistd.h>
@@ -253,23 +254,356 @@ local_mark_queue_pop(struct local_mark_queue *q) {
   return q->data[q->read++ & LOCAL_MARK_QUEUE_MASK];
 }
 
+struct mark_notify {
+  size_t notifiers;
+  int pending;
+  pthread_mutex_t lock;
+  pthread_cond_t cond;
+};
+
+static void
+mark_notify_init(struct mark_notify *notify) {
+  notify->notifiers = 0;
+  notify->pending = 0;
+  pthread_mutex_init(&notify->lock, NULL);
+  pthread_cond_init(&notify->cond, NULL);
+}
+
+static void
+mark_notify_destroy(struct mark_notify *notify) {
+  pthread_mutex_destroy(&notify->lock);
+  pthread_cond_destroy(&notify->cond);
+}
+
+static void
+mark_notify_add_notifier(struct mark_notify *notify) {
+  pthread_mutex_lock(&notify->lock);
+  notify->notifiers++;
+  pthread_mutex_unlock(&notify->lock);
+}
+
+static void
+mark_notify_remove_notifier(struct mark_notify *notify) {
+  pthread_mutex_lock(&notify->lock);
+  notify->notifiers--;
+  if (notify->notifiers == 0)
+    pthread_cond_signal(&notify->cond);
+  pthread_mutex_unlock(&notify->lock);
+}
+
+enum mark_notify_status {
+  MARK_NOTIFY_DONE,
+  MARK_NOTIFY_WOKE
+};
+static enum mark_notify_status
+mark_notify_wait(struct mark_notify *notify) {
+  enum mark_notify_status res;
+
+  pthread_mutex_lock(&notify->lock);
+
+  if (notify->pending) {
+    res = MARK_NOTIFY_WOKE;
+    notify->pending = 0;
+    goto done;
+  }
+
+  if (notify->notifiers == 0) {
+    res = MARK_NOTIFY_DONE;
+    goto done;
+  }
+
+  // Spurious wakeup is OK.
+  pthread_cond_wait(&notify->cond, &notify->lock);
+  res = MARK_NOTIFY_WOKE;
+  notify->pending = 0;
+
+done:
+  pthread_mutex_unlock(&notify->lock);
+  return res;
+}
+
+static void
+mark_notify_wake(struct mark_notify *notify) {
+  pthread_mutex_lock(&notify->lock);
+  notify->pending = 1;
+  pthread_cond_signal(&notify->cond);
+  pthread_mutex_unlock(&notify->lock);
+}
+
+// A mostly lock-free multi-producer, single consumer queue, largely
+// inspired by Rust's std::sync::channel.
+//
+// 
https://www.1024cores.net/home/lock-free-algorithms/queues/non-intrusive-mpsc-node-based-queue
+
+struct mark_channel_message {
+  struct mark_channel_message * _Atomic next;
+  // Payload will be zero only for free messages, and for the sentinel
+  // message.
+  atomic_uintptr_t payload;
+};
+
+#define MARK_CHANNEL_WRITER_MESSAGE_COUNT ((size_t)1024)
+
+struct mark_channel {
+  union {
+    struct mark_channel_message* _Atomic head;
+    char head_padding[64];
+  };
+  union {
+    atomic_size_t length;
+    char length_padding[64];
+  };
+  struct mark_channel_message* tail;
+  struct mark_channel_message sentinel;
+
+  struct mark_notify notify;
+};
+
+struct mark_channel_writer {
+  struct mark_channel_message messages[MARK_CHANNEL_WRITER_MESSAGE_COUNT];
+  size_t next_message;
+
+  struct mark_channel *channel;
+};
+
+static void
+mark_channel_init(struct mark_channel *ch) {
+  memset(ch, 0, sizeof(*ch));
+  atomic_init(&ch->head, &ch->sentinel);
+  atomic_init(&ch->length, 0);
+  mark_notify_init(&ch->notify);
+  ch->tail = &ch->sentinel;
+}
+
+static void
+mark_channel_destroy(struct mark_channel *ch) {
+  mark_notify_destroy(&ch->notify);
+}
+
+static void
+mark_channel_push(struct mark_channel *ch, struct mark_channel_message *msg) {
+  ASSERT(msg->payload);
+  atomic_store_explicit(&msg->next, NULL, memory_order_relaxed);
+
+  struct mark_channel_message *prev =
+    atomic_exchange_explicit(&ch->head, msg, memory_order_acq_rel);
+
+  atomic_store_explicit(&prev->next, msg, memory_order_release);
+
+  size_t old_length =
+    atomic_fetch_add_explicit(&ch->length, 1, memory_order_relaxed);
+  if (old_length == 0)
+    mark_notify_wake(&ch->notify);
+}
+
+static uintptr_t
+mark_channel_try_pop(struct mark_channel *ch) {
+  struct mark_channel_message *tail = ch->tail;
+  struct mark_channel_message *next =
+    atomic_load_explicit(&tail->next, memory_order_acquire);
+
+  if (next) {
+    ch->tail = next;
+    uintptr_t payload =
+      atomic_load_explicit(&next->payload, memory_order_acquire);
+    ASSERT(payload != 0);
+    // Indicate to the writer that the old tail node can now be re-used.
+    // Note though that the new tail node is floating garbage; its
+    // payload has been popped but the node itself is still part of the
+    // queue.  Care has to be taken to ensure that any remaining queue
+    // entries are popped before the associated channel writer's
+    // messages are deallocated.
+    atomic_store_explicit(&tail->payload, 0, memory_order_release);
+    atomic_fetch_sub_explicit(&ch->length, 1, memory_order_relaxed);
+    return payload;
+  }
+
+  // if (atomic_load_explicit(&ch->head) == tail) return EMPTY else 
INCONSISTENT
+  return 0;
+}
+
+static uintptr_t
+mark_channel_pop(struct mark_channel *ch) {
+  while (1) {
+    uintptr_t ret = mark_channel_try_pop(ch);
+    if (ret)
+      return ret;
+
+    if (atomic_load_explicit(&ch->length, memory_order_relaxed) == 0) {
+      if (mark_notify_wait(&ch->notify) == MARK_NOTIFY_DONE)
+        return 0;
+    }
+  }
+}
+
+static void
+mark_channel_writer_init(struct mark_channel *ch,
+                         struct mark_channel_writer *writer) {
+  memset(writer, 0, sizeof(*writer));
+  writer->channel = ch;
+}
+
+static void
+mark_channel_write(struct mark_channel_writer *writer, uintptr_t payload) {
+  ASSERT(payload);
+  struct mark_channel_message *msg = &writer->messages[writer->next_message];
+  while (atomic_load_explicit(&msg->payload, memory_order_acquire) != 0)
+    sched_yield();
+  writer->next_message++;
+  if (writer->next_message == MARK_CHANNEL_WRITER_MESSAGE_COUNT)
+    writer->next_message = 0;
+  atomic_store_explicit(&msg->payload, payload, memory_order_release);
+  mark_channel_push(writer->channel, msg);
+}
+
+static void
+mark_channel_writer_activate(struct mark_channel_writer *writer) {
+  mark_notify_add_notifier(&writer->channel->notify);
+}
+static void
+mark_channel_writer_deactivate(struct mark_channel_writer *writer) {
+  mark_notify_remove_notifier(&writer->channel->notify);
+}
+
+enum mark_worker_state {
+  MARK_WORKER_STOPPED,
+  MARK_WORKER_IDLE,
+  MARK_WORKER_MARKING,
+  MARK_WORKER_STOPPING,
+  MARK_WORKER_DEAD
+};
+
+struct mark_worker {
+  struct context *cx;
+  pthread_t thread;
+  enum mark_worker_state state;
+  pthread_mutex_t lock;
+  pthread_cond_t cond;
+  struct mark_channel_writer writer;
+};
+
+#define MARK_WORKERS_MAX_COUNT 8
+
 struct marker {
   struct mark_deque deque;
+  struct mark_channel overflow;
+  size_t worker_count;
+  struct mark_worker workers[MARK_WORKERS_MAX_COUNT];
 };
 
 struct local_marker {
-  struct local_mark_queue local;
+  struct mark_worker *worker;
   struct mark_deque *deque;
   struct context *cx;
+  struct local_mark_queue local;
 };
 
 struct context;
 static inline struct marker* context_marker(struct context *cx);
 
+static size_t number_of_current_processors(void) { return 1; }
+
+static void
+mark_worker_init(struct mark_worker *worker, struct context *cx,
+                   struct marker *marker) {
+  worker->cx = cx;
+  worker->thread = 0;
+  worker->state = MARK_WORKER_STOPPED;
+  pthread_mutex_init(&worker->lock, NULL);
+  pthread_cond_init(&worker->cond, NULL);
+  mark_channel_writer_init(&marker->overflow, &worker->writer);
+}
+
+static void mark_worker_mark(struct mark_worker *worker);
+
+static void*
+mark_worker_thread(void *data) {
+  struct mark_worker *worker = data;
+
+  pthread_mutex_lock(&worker->lock);
+  while (1) {
+    switch (worker->state) {
+    case MARK_WORKER_IDLE:
+      pthread_cond_wait(&worker->cond, &worker->lock);
+      break;
+    case MARK_WORKER_MARKING:
+      mark_worker_mark(worker);
+      worker->state = MARK_WORKER_IDLE;
+      break;
+    case MARK_WORKER_STOPPING:
+      worker->state = MARK_WORKER_DEAD;
+      pthread_mutex_unlock(&worker->lock);
+      return NULL;
+    default:
+      abort();
+    }
+  }
+}
+
+static int
+mark_worker_spawn(struct mark_worker *worker) {
+  pthread_mutex_lock(&worker->lock);
+  ASSERT(worker->state == MARK_WORKER_STOPPED);
+  worker->state = MARK_WORKER_IDLE;
+  pthread_mutex_unlock(&worker->lock);
+
+  if (pthread_create(&worker->thread, NULL, mark_worker_thread, worker)) {
+    perror("spawning marker thread failed");
+    worker->state = MARK_WORKER_STOPPED;
+    return 0;
+  }
+
+  return 1;
+}
+
+static void
+mark_worker_request_mark(struct mark_worker *worker) {
+  pthread_mutex_lock(&worker->lock);
+  ASSERT(worker->state == MARK_WORKER_IDLE);
+  mark_channel_writer_activate(&worker->writer);
+  worker->state = MARK_WORKER_MARKING;
+  pthread_cond_signal(&worker->cond);
+  pthread_mutex_unlock(&worker->lock);
+}  
+
+static void
+mark_worker_finished_marking(struct mark_worker *worker) {
+  // Signal controller that we are done with marking.
+  mark_channel_writer_deactivate(&worker->writer);
+}
+
+static void
+mark_worker_request_stop(struct mark_worker *worker) {
+  pthread_mutex_lock(&worker->lock);
+  ASSERT(worker->state == MARK_WORKER_IDLE);
+  worker->state = MARK_WORKER_STOPPING;
+  pthread_cond_signal(&worker->cond);
+  pthread_mutex_unlock(&worker->lock);
+}  
+
 static int
 marker_init(struct context *cx) {
-  return mark_deque_init(&context_marker(cx)->deque);
+  struct marker *marker = context_marker(cx);
+  if (!mark_deque_init(&marker->deque))
+    return 0;
+  mark_channel_init(&marker->overflow);
+  size_t desired_worker_count = 0;
+  if (getenv("GC_MARKERS"))
+    desired_worker_count = atoi(getenv("GC_MARKERS"));
+  if (desired_worker_count == 0)
+    desired_worker_count = number_of_current_processors();
+  if (desired_worker_count > MARK_WORKERS_MAX_COUNT)
+    desired_worker_count = MARK_WORKERS_MAX_COUNT;
+  for (size_t i = 0; i < desired_worker_count; i++) {
+    mark_worker_init(&marker->workers[i], cx, marker);
+    if (mark_worker_spawn(&marker->workers[i]))
+      marker->worker_count++;
+    else
+      break;
+  }
+  return marker->worker_count > 0;
 }
+
 static void marker_prepare(struct context *cx) {}
 static void marker_release(struct context *cx) {
   mark_deque_release(&context_marker(cx)->deque);
@@ -277,9 +611,7 @@ static void marker_release(struct context *cx) {
 
 struct gcobj;
 static inline void marker_visit(void **loc, void *mark_data) ALWAYS_INLINE;
-static inline void marker_trace(struct context *cx,
-                                void (*)(struct gcobj *, void *))
-  ALWAYS_INLINE;
+static inline void trace_one(struct gcobj *obj, void *mark_data) ALWAYS_INLINE;
 static inline int mark_object(struct context *cx,
                               struct gcobj *obj) ALWAYS_INLINE;
 
@@ -290,26 +622,22 @@ marker_visit(void **loc, void *mark_data) {
   if (obj && mark_object(mark->cx, obj)) {
     if (!local_mark_queue_full(&mark->local))
       local_mark_queue_push(&mark->local, (uintptr_t)obj);
-    else
-      mark_deque_push(mark->deque, (uintptr_t)obj);
+    else {
+      mark_channel_write(&mark->worker->writer, (uintptr_t)obj);
+    }
   }
 }
-static inline void
-marker_visit_root(void **loc, struct context *cx) {
-  struct local_marker mark;
-  local_mark_queue_poison(&mark.local);
-  mark.deque = &context_marker(cx)->deque;
-  mark.cx = cx;
-  marker_visit(loc, &mark);
-}
-static inline void
-marker_trace(struct context *cx,
-             void (*trace_one)(struct gcobj *, void *)) {
+
+static void
+mark_worker_mark(struct mark_worker *worker) {
   struct local_marker mark;
+  mark.worker = worker;
+  mark.deque = &context_marker(worker->cx)->deque;
+  mark.cx = worker->cx;
   local_mark_queue_init(&mark.local);
-  mark.deque = &context_marker(cx)->deque;
-  mark.cx = cx;
 
+  size_t n = 0;
+  DEBUG("marker %p: running mark loop\n", worker);
   while (1) {
     uintptr_t addr;
     if (!local_mark_queue_empty(&mark.local)) {
@@ -322,7 +650,56 @@ marker_trace(struct context *cx,
         continue;
     }
     trace_one((struct gcobj*)addr, &mark);
+    n++;
+  }
+  DEBUG("marker %p: done marking, %zu objects traced\n", worker, n);
+
+  mark_worker_finished_marking(worker);
+}
+
+static inline void
+marker_visit_root(void **loc, struct context *cx) {
+  struct gcobj *obj = *loc;
+  if (obj && mark_object(cx, obj))
+    mark_deque_push(&context_marker(cx)->deque, (uintptr_t)obj);
+}
+
+static inline void
+marker_trace(struct context *cx) {
+  struct marker *marker = context_marker(cx);
+
+  DEBUG("starting trace; %zu workers\n", marker->worker_count);
+  while (1) {
+    DEBUG("waking workers\n");
+    for (size_t i = 0; i < marker->worker_count; i++)
+      mark_worker_request_mark(&marker->workers[i]);
+
+    DEBUG("running controller loop\n");
+    size_t n = 0;
+    while (1) {
+      uintptr_t addr = mark_channel_pop(&marker->overflow);
+      if (!addr)
+        break;
+      mark_deque_push(&marker->deque, addr);
+      n++;
+    }
+    DEBUG("controller loop done, %zu objects sent for rebalancing\n", n);
+
+    // As in the ISMM'16 paper, it's possible that a worker decides to
+    // stop because the deque is empty, but actually there was an
+    // in-flight object in the mark channel that we hadn't been able to
+    // push yet.  Loop in that case.
+    {
+      uintptr_t addr = mark_deque_try_pop(&marker->deque);
+      if (addr == mark_deque_empty)
+        break;
+      DEBUG("--> controller looping again due to slop\n");
+      mark_deque_push(&marker->deque, addr);
+    }
   }
+  ASSERT(atomic_load(&marker->overflow.length) == 0);
+  ASSERT(atomic_load(&marker->overflow.head) == marker->overflow.tail);
+  DEBUG("trace finished\n");
 }
 
 #endif // SERIAL_MARK_H
diff --git a/serial-marker.h b/serial-marker.h
index 1c2e305a7..719ba1c51 100644
--- a/serial-marker.h
+++ b/serial-marker.h
@@ -126,9 +126,7 @@ static void marker_release(struct context *cx) {
 
 struct gcobj;
 static inline void marker_visit(void **loc, void *mark_data) ALWAYS_INLINE;
-static inline void marker_trace(struct context *cx,
-                                void (*)(struct gcobj *, void *))
-  ALWAYS_INLINE;
+static inline void trace_one(struct gcobj *obj, void *mark_data) ALWAYS_INLINE;
 static inline int mark_object(struct context *cx,
                               struct gcobj *obj) ALWAYS_INLINE;
 
@@ -144,8 +142,7 @@ marker_visit_root(void **loc, struct context *cx) {
   marker_visit(loc, cx);
 }
 static inline void
-marker_trace(struct context *cx,
-             void (*trace_one)(struct gcobj *, void *)) {
+marker_trace(struct context *cx) {
   struct gcobj *obj;
   while ((obj = mark_queue_pop(&context_marker(cx)->queue)))
     trace_one(obj, cx);

Reply via email to