wingo pushed a commit to branch wip-whippet
in repository guile.

commit dd3953ef1a8aaa6207c572d152f2faf876b82646
Author: Andy Wingo <wi...@igalia.com>
AuthorDate: Mon Jul 8 10:42:58 2024 +0200

    Factor trace deque out to shared-worklist.h
    
    Also increase alignment to account for cache line prefetcher.
---
 src/gc-align.h        |   5 +
 src/parallel-tracer.h | 271 +++-----------------------------------------------
 src/shared-worklist.h | 259 +++++++++++++++++++++++++++++++++++++++++++++++
 3 files changed, 280 insertions(+), 255 deletions(-)

diff --git a/src/gc-align.h b/src/gc-align.h
index 117d1cb47..c0758b1e0 100644
--- a/src/gc-align.h
+++ b/src/gc-align.h
@@ -14,4 +14,9 @@ static inline uintptr_t align_up(uintptr_t addr, size_t 
align) {
   return align_down(addr + align - 1, align);
 }
 
+// Poor man's equivalent of std::hardware_destructive_interference_size.
+#define AVOID_FALSE_SHARING 128
+#define ALIGNED_TO_AVOID_FALSE_SHARING \
+  __attribute__((aligned(AVOID_FALSE_SHARING)))
+
 #endif // GC_ALIGN_H
diff --git a/src/parallel-tracer.h b/src/parallel-tracer.h
index b88f1e792..3b4a8d0f1 100644
--- a/src/parallel-tracer.h
+++ b/src/parallel-tracer.h
@@ -9,249 +9,9 @@
 #include "assert.h"
 #include "debug.h"
 #include "gc-inline.h"
+#include "shared-worklist.h"
 #include "spin.h"
 
-// The Chase-Lev work-stealing deque, as initially described in "Dynamic
-// Circular Work-Stealing Deque" (Chase and Lev, SPAA'05)
-// (https://www.dre.vanderbilt.edu/~schmidt/PDF/work-stealing-dequeue.pdf)
-// and improved with C11 atomics in "Correct and Efficient Work-Stealing
-// for Weak Memory Models" (Lê et al, PPoPP'13)
-// (http://www.di.ens.fr/%7Ezappa/readings/ppopp13.pdf).
-
-struct trace_buf {
-  unsigned log_size;
-  size_t size;
-  uintptr_t *data;
-};
-
-// Min size: 8 kB on 64-bit systems, 4 kB on 32-bit.
-#define trace_buf_min_log_size ((unsigned) 10)
-// Max size: 2 GB on 64-bit systems, 1 GB on 32-bit.
-#define trace_buf_max_log_size ((unsigned) 28)
-
-static int
-trace_buf_init(struct trace_buf *buf, unsigned log_size) {
-  ASSERT(log_size >= trace_buf_min_log_size);
-  ASSERT(log_size <= trace_buf_max_log_size);
-  size_t size = (1 << log_size) * sizeof(uintptr_t);
-  void *mem = mmap(NULL, size, PROT_READ|PROT_WRITE,
-                   MAP_PRIVATE|MAP_ANONYMOUS, -1, 0);
-  if (mem == MAP_FAILED) {
-    perror("Failed to grow work-stealing dequeue");
-    DEBUG("Failed to allocate %zu bytes", size);
-    return 0;
-  }
-  buf->log_size = log_size;
-  buf->size = 1 << log_size;
-  buf->data = mem;
-  return 1;
-}
-  
-static inline size_t
-trace_buf_size(struct trace_buf *buf) {
-  return buf->size;
-}
-
-static inline size_t
-trace_buf_byte_size(struct trace_buf *buf) {
-  return trace_buf_size(buf) * sizeof(uintptr_t);
-}
-
-static void
-trace_buf_release(struct trace_buf *buf) {
-  if (buf->data)
-    madvise(buf->data, trace_buf_byte_size(buf), MADV_DONTNEED);
-}
-
-static void
-trace_buf_destroy(struct trace_buf *buf) {
-  if (buf->data) {
-    munmap(buf->data, trace_buf_byte_size(buf));
-    buf->data = NULL;
-    buf->log_size = 0;
-    buf->size = 0;
-  }
-}
-
-static inline struct gc_ref
-trace_buf_get(struct trace_buf *buf, size_t i) {
-  return gc_ref(atomic_load_explicit(&buf->data[i & (buf->size - 1)],
-                                     memory_order_relaxed));
-}
-
-static inline void
-trace_buf_put(struct trace_buf *buf, size_t i, struct gc_ref ref) {
-  return atomic_store_explicit(&buf->data[i & (buf->size - 1)],
-                               gc_ref_value(ref),
-                               memory_order_relaxed);
-}
-
-static inline int
-trace_buf_grow(struct trace_buf *from, struct trace_buf *to,
-              size_t b, size_t t) {
-  if (from->log_size == trace_buf_max_log_size)
-    return 0;
-  if (!trace_buf_init (to, from->log_size + 1))
-    return 0;
-  for (size_t i=t; i<b; i++)
-    trace_buf_put(to, i, trace_buf_get(from, i));
-  return 1;
-}
-
-// Chase-Lev work-stealing deque.  One thread pushes data into the deque
-// at the bottom, and many threads compete to steal data from the top.
-struct trace_deque {
-  // Ensure bottom and top are on different cache lines.
-  union {
-    atomic_size_t bottom;
-    char bottom_padding[64];
-  };
-  union {
-    atomic_size_t top;
-    char top_padding[64];
-  };
-  atomic_int active; // Which trace_buf is active.
-  struct trace_buf bufs[(trace_buf_max_log_size - trace_buf_min_log_size) + 1];
-};
-
-#define LOAD_RELAXED(loc) atomic_load_explicit(loc, memory_order_relaxed)
-#define STORE_RELAXED(loc, o) atomic_store_explicit(loc, o, 
memory_order_relaxed)
-
-#define LOAD_ACQUIRE(loc) atomic_load_explicit(loc, memory_order_acquire)
-#define STORE_RELEASE(loc, o) atomic_store_explicit(loc, o, 
memory_order_release)
-
-#define LOAD_CONSUME(loc) atomic_load_explicit(loc, memory_order_consume)
-
-static int
-trace_deque_init(struct trace_deque *q) {
-  memset(q, 0, sizeof (*q));
-  int ret = trace_buf_init(&q->bufs[0], trace_buf_min_log_size);
-  // Note, this fence isn't in the paper, I added it out of caution.
-  atomic_thread_fence(memory_order_release);
-  return ret;
-}
-
-static void
-trace_deque_release(struct trace_deque *q) {
-  for (int i = LOAD_RELAXED(&q->active); i >= 0; i--)
-    trace_buf_release(&q->bufs[i]);
-}
-
-static void
-trace_deque_destroy(struct trace_deque *q) {
-  for (int i = LOAD_RELAXED(&q->active); i >= 0; i--)
-    trace_buf_destroy(&q->bufs[i]);
-}
-
-static int
-trace_deque_grow(struct trace_deque *q, int cur, size_t b, size_t t) {
-  if (!trace_buf_grow(&q->bufs[cur], &q->bufs[cur + 1], b, t)) {
-    fprintf(stderr, "failed to grow deque!!\n");
-    GC_CRASH();
-  }
-
-  cur++;
-  STORE_RELAXED(&q->active, cur);
-  return cur;
-}
-
-static void
-trace_deque_push(struct trace_deque *q, struct gc_ref x) {
-  size_t b = LOAD_RELAXED(&q->bottom);
-  size_t t = LOAD_ACQUIRE(&q->top);
-  int active = LOAD_RELAXED(&q->active);
-
-  ssize_t size = b - t;
-  if (size > trace_buf_size(&q->bufs[active]) - 1) /* Full queue. */
-    active = trace_deque_grow(q, active, b, t);
-
-  trace_buf_put(&q->bufs[active], b, x);
-  atomic_thread_fence(memory_order_release);
-  STORE_RELAXED(&q->bottom, b + 1);
-}
-
-static void
-trace_deque_push_many(struct trace_deque *q, struct gc_ref *objv, size_t 
count) {
-  size_t b = LOAD_RELAXED(&q->bottom);
-  size_t t = LOAD_ACQUIRE(&q->top);
-  int active = LOAD_RELAXED(&q->active);
-
-  ssize_t size = b - t;
-  while (size > trace_buf_size(&q->bufs[active]) - count) /* Full queue. */
-    active = trace_deque_grow(q, active, b, t);
-
-  for (size_t i = 0; i < count; i++)
-    trace_buf_put(&q->bufs[active], b + i, objv[i]);
-  atomic_thread_fence(memory_order_release);
-  STORE_RELAXED(&q->bottom, b + count);
-}
-
-static struct gc_ref
-trace_deque_try_pop(struct trace_deque *q) {
-  size_t b = LOAD_RELAXED(&q->bottom);
-  int active = LOAD_RELAXED(&q->active);
-  STORE_RELAXED(&q->bottom, b - 1);
-  atomic_thread_fence(memory_order_seq_cst);
-  size_t t = LOAD_RELAXED(&q->top);
-  struct gc_ref x;
-  ssize_t size = b - t;
-  if (size > 0) { // Non-empty queue.
-    x = trace_buf_get(&q->bufs[active], b - 1);
-    if (size == 1) { // Single last element in queue.
-      if (!atomic_compare_exchange_strong_explicit(&q->top, &t, t + 1,
-                                                   memory_order_seq_cst,
-                                                   memory_order_relaxed))
-        // Failed race.
-        x = gc_ref_null();
-      STORE_RELAXED(&q->bottom, b);
-    }
-  } else { // Empty queue.
-    x = gc_ref_null();
-    STORE_RELAXED(&q->bottom, b);
-  }
-  return x;
-}
-
-static struct gc_ref
-trace_deque_steal(struct trace_deque *q) {
-  while (1) {
-    size_t t = LOAD_ACQUIRE(&q->top);
-    atomic_thread_fence(memory_order_seq_cst);
-    size_t b = LOAD_ACQUIRE(&q->bottom);
-    ssize_t size = b - t;
-    if (size <= 0)
-      return gc_ref_null();
-    int active = LOAD_CONSUME(&q->active);
-    struct gc_ref ref = trace_buf_get(&q->bufs[active], t);
-    if (!atomic_compare_exchange_strong_explicit(&q->top, &t, t + 1,
-                                                 memory_order_seq_cst,
-                                                 memory_order_relaxed))
-      // Failed race.
-      continue;
-    return ref;
-  }
-}
-
-static ssize_t
-trace_deque_size(struct trace_deque *q) {
-  size_t t = LOAD_ACQUIRE(&q->top);
-  atomic_thread_fence(memory_order_seq_cst);
-  size_t b = LOAD_ACQUIRE(&q->bottom);
-  ssize_t size = b - t;
-  return size;
-}
-
-static int
-trace_deque_can_steal(struct trace_deque *q) {
-  return trace_deque_size(q) > 0;
-}
-
-#undef LOAD_RELAXED
-#undef STORE_RELAXED
-#undef LOAD_ACQUIRE
-#undef STORE_RELEASE
-#undef LOAD_CONSUME
-
 #define LOCAL_TRACE_QUEUE_SIZE 1024
 #define LOCAL_TRACE_QUEUE_MASK (LOCAL_TRACE_QUEUE_SIZE - 1)
 #define LOCAL_TRACE_QUEUE_SHARE_AMOUNT (LOCAL_TRACE_QUEUE_SIZE * 3 / 4)
@@ -319,7 +79,7 @@ struct trace_worker {
   pthread_t thread;
   enum trace_worker_state state;
   pthread_mutex_t lock;
-  struct trace_deque deque;
+  struct shared_worklist deque;
 };
 
 #define TRACE_WORKERS_MAX_COUNT 8
@@ -335,7 +95,7 @@ struct tracer {
 
 struct local_tracer {
   struct trace_worker *worker;
-  struct trace_deque *share_deque;
+  struct shared_worklist *share_deque;
   struct local_trace_queue local;
 };
 
@@ -350,7 +110,7 @@ trace_worker_init(struct trace_worker *worker, struct 
gc_heap *heap,
   worker->steal_id = 0;
   worker->thread = 0;
   pthread_mutex_init(&worker->lock, NULL);
-  return trace_deque_init(&worker->deque);
+  return shared_worklist_init(&worker->deque);
 }
 
 static void trace_worker_trace(struct trace_worker *worker);
@@ -416,7 +176,7 @@ static void tracer_prepare(struct gc_heap *heap) {
 static void tracer_release(struct gc_heap *heap) {
   struct tracer *tracer = heap_tracer(heap);
   for (size_t i = 0; i < tracer->worker_count; i++)
-    trace_deque_release(&tracer->workers[i].deque);
+    shared_worklist_release(&tracer->workers[i].deque);
 }
 
 static inline void
@@ -453,7 +213,7 @@ tracer_share(struct local_tracer *trace) {
   while (to_share) {
     struct gc_ref *objv;
     size_t count = local_trace_queue_pop_many(&trace->local, &objv, to_share);
-    trace_deque_push_many(trace->share_deque, objv, count);
+    shared_worklist_push_many(trace->share_deque, objv, count);
     to_share -= count;
   }
   tracer_maybe_unpark_workers(heap_tracer(trace->worker->heap));
@@ -476,13 +236,13 @@ tracer_visit(struct gc_edge edge, struct gc_heap *heap, 
void *trace_data) {
 static struct gc_ref
 tracer_steal_from_worker(struct tracer *tracer, size_t id) {
   ASSERT(id < tracer->worker_count);
-  return trace_deque_steal(&tracer->workers[id].deque);
+  return shared_worklist_steal(&tracer->workers[id].deque);
 }
 
 static int
 tracer_can_steal_from_worker(struct tracer *tracer, size_t id) {
   ASSERT(id < tracer->worker_count);
-  return trace_deque_can_steal(&tracer->workers[id].deque);
+  return shared_worklist_can_steal(&tracer->workers[id].deque);
 }
 
 static struct gc_ref
@@ -502,7 +262,8 @@ trace_worker_steal_from_any(struct trace_worker *worker, 
struct tracer *tracer)
 }
 
 static int
-trace_worker_can_steal_from_any(struct trace_worker *worker, struct tracer 
*tracer) {
+trace_worker_can_steal_from_any(struct trace_worker *worker,
+                                struct tracer *tracer) {
   DEBUG("tracer #%zu: checking if any worker has tasks\n", worker->id);
   for (size_t i = 0; i < tracer->worker_count; i++) {
     int res = tracer_can_steal_from_worker(tracer, worker->steal_id);
@@ -561,7 +322,7 @@ trace_worker_steal(struct local_tracer *trace) {
   // something from the worker's own queue.
   {
     DEBUG("tracer #%zu: trying to pop worker's own deque\n", worker->id);
-    struct gc_ref obj = trace_deque_try_pop(&worker->deque);
+    struct gc_ref obj = shared_worklist_try_pop(&worker->deque);
     if (gc_ref_is_heap_object(obj))
       return obj;
   }
@@ -610,15 +371,15 @@ trace_worker_trace(struct trace_worker *worker) {
 
 static inline void
 tracer_enqueue_root(struct tracer *tracer, struct gc_ref ref) {
-  struct trace_deque *worker0_deque = &tracer->workers[0].deque;
-  trace_deque_push(worker0_deque, ref);
+  struct shared_worklist *worker0_deque = &tracer->workers[0].deque;
+  shared_worklist_push(worker0_deque, ref);
 }
 
 static inline void
 tracer_enqueue_roots(struct tracer *tracer, struct gc_ref *objv,
                      size_t count) {
-  struct trace_deque *worker0_deque = &tracer->workers[0].deque;
-  trace_deque_push_many(worker0_deque, objv, count);
+  struct shared_worklist *worker0_deque = &tracer->workers[0].deque;
+  shared_worklist_push_many(worker0_deque, objv, count);
 }
 
 static inline void
@@ -629,7 +390,7 @@ tracer_trace(struct gc_heap *heap) {
 
   ssize_t parallel_threshold =
     LOCAL_TRACE_QUEUE_SIZE - LOCAL_TRACE_QUEUE_SHARE_AMOUNT;
-  if (trace_deque_size(&tracer->workers[0].deque) >= parallel_threshold) {
+  if (shared_worklist_size(&tracer->workers[0].deque) >= parallel_threshold) {
     DEBUG("waking workers\n");
     tracer_unpark_all_workers(tracer);
   } else {
diff --git a/src/shared-worklist.h b/src/shared-worklist.h
new file mode 100644
index 000000000..6d5ed3315
--- /dev/null
+++ b/src/shared-worklist.h
@@ -0,0 +1,259 @@
+#ifndef SHARED_WORKLIST_H
+#define SHARED_WORKLIST_H
+
+#include <stdatomic.h>
+#include <sys/mman.h>
+#include <unistd.h>
+
+#include "assert.h"
+#include "debug.h"
+#include "gc-align.h"
+#include "gc-inline.h"
+#include "spin.h"
+
+// The Chase-Lev work-stealing deque, as initially described in "Dynamic
+// Circular Work-Stealing Deque" (Chase and Lev, SPAA'05)
+// (https://www.dre.vanderbilt.edu/~schmidt/PDF/work-stealing-dequeue.pdf)
+// and improved with C11 atomics in "Correct and Efficient Work-Stealing
+// for Weak Memory Models" (Lê et al, PPoPP'13)
+// (http://www.di.ens.fr/%7Ezappa/readings/ppopp13.pdf).
+
+struct shared_worklist_buf {
+  unsigned log_size;
+  size_t size;
+  uintptr_t *data;
+};
+
+// Min size: 8 kB on 64-bit systems, 4 kB on 32-bit.
+#define shared_worklist_buf_min_log_size ((unsigned) 10)
+// Max size: 2 GB on 64-bit systems, 1 GB on 32-bit.
+#define shared_worklist_buf_max_log_size ((unsigned) 28)
+
+static int
+shared_worklist_buf_init(struct shared_worklist_buf *buf, unsigned log_size) {
+  ASSERT(log_size >= shared_worklist_buf_min_log_size);
+  ASSERT(log_size <= shared_worklist_buf_max_log_size);
+  size_t size = (1 << log_size) * sizeof(uintptr_t);
+  void *mem = mmap(NULL, size, PROT_READ|PROT_WRITE,
+                   MAP_PRIVATE|MAP_ANONYMOUS, -1, 0);
+  if (mem == MAP_FAILED) {
+    perror("Failed to grow work-stealing dequeue");
+    DEBUG("Failed to allocate %zu bytes", size);
+    return 0;
+  }
+  buf->log_size = log_size;
+  buf->size = 1 << log_size;
+  buf->data = mem;
+  return 1;
+}
+  
+static inline size_t
+shared_worklist_buf_size(struct shared_worklist_buf *buf) {
+  return buf->size;
+}
+
+static inline size_t
+shared_worklist_buf_byte_size(struct shared_worklist_buf *buf) {
+  return shared_worklist_buf_size(buf) * sizeof(uintptr_t);
+}
+
+static void
+shared_worklist_buf_release(struct shared_worklist_buf *buf) {
+  if (buf->data)
+    madvise(buf->data, shared_worklist_buf_byte_size(buf), MADV_DONTNEED);
+}
+
+static void
+shared_worklist_buf_destroy(struct shared_worklist_buf *buf) {
+  if (buf->data) {
+    munmap(buf->data, shared_worklist_buf_byte_size(buf));
+    buf->data = NULL;
+    buf->log_size = 0;
+    buf->size = 0;
+  }
+}
+
+static inline struct gc_ref
+shared_worklist_buf_get(struct shared_worklist_buf *buf, size_t i) {
+  return gc_ref(atomic_load_explicit(&buf->data[i & (buf->size - 1)],
+                                     memory_order_relaxed));
+}
+
+static inline void
+shared_worklist_buf_put(struct shared_worklist_buf *buf, size_t i,
+                        struct gc_ref ref) {
+  return atomic_store_explicit(&buf->data[i & (buf->size - 1)],
+                               gc_ref_value(ref),
+                               memory_order_relaxed);
+}
+
+static inline int
+shared_worklist_buf_grow(struct shared_worklist_buf *from,
+                         struct shared_worklist_buf *to, size_t b, size_t t) {
+  if (from->log_size == shared_worklist_buf_max_log_size)
+    return 0;
+  if (!shared_worklist_buf_init (to, from->log_size + 1))
+    return 0;
+  for (size_t i=t; i<b; i++)
+    shared_worklist_buf_put(to, i, shared_worklist_buf_get(from, i));
+  return 1;
+}
+
+// Chase-Lev work-stealing deque.  One thread pushes data into the deque
+// at the bottom, and many threads compete to steal data from the top.
+struct shared_worklist {
+  // Ensure bottom and top are on different cache lines.
+  union {
+    atomic_size_t bottom;
+    char bottom_padding[AVOID_FALSE_SHARING];
+  };
+  union {
+    atomic_size_t top;
+    char top_padding[AVOID_FALSE_SHARING];
+  };
+  atomic_int active; // Which shared_worklist_buf is active.
+  struct shared_worklist_buf bufs[(shared_worklist_buf_max_log_size -
+                                   shared_worklist_buf_min_log_size) + 1];
+};
+
+#define LOAD_RELAXED(loc) atomic_load_explicit(loc, memory_order_relaxed)
+#define STORE_RELAXED(loc, o) atomic_store_explicit(loc, o, 
memory_order_relaxed)
+
+#define LOAD_ACQUIRE(loc) atomic_load_explicit(loc, memory_order_acquire)
+#define STORE_RELEASE(loc, o) atomic_store_explicit(loc, o, 
memory_order_release)
+
+#define LOAD_CONSUME(loc) atomic_load_explicit(loc, memory_order_consume)
+
+static int
+shared_worklist_init(struct shared_worklist *q) {
+  memset(q, 0, sizeof (*q));
+  int ret = shared_worklist_buf_init(&q->bufs[0],
+                                     shared_worklist_buf_min_log_size);
+  // Note, this fence isn't in the paper, I added it out of caution.
+  atomic_thread_fence(memory_order_release);
+  return ret;
+}
+
+static void
+shared_worklist_release(struct shared_worklist *q) {
+  for (int i = LOAD_RELAXED(&q->active); i >= 0; i--)
+    shared_worklist_buf_release(&q->bufs[i]);
+}
+
+static void
+shared_worklist_destroy(struct shared_worklist *q) {
+  for (int i = LOAD_RELAXED(&q->active); i >= 0; i--)
+    shared_worklist_buf_destroy(&q->bufs[i]);
+}
+
+static int
+shared_worklist_grow(struct shared_worklist *q, int cur, size_t b, size_t t) {
+  if (!shared_worklist_buf_grow(&q->bufs[cur], &q->bufs[cur + 1], b, t)) {
+    fprintf(stderr, "failed to grow deque!!\n");
+    GC_CRASH();
+  }
+
+  cur++;
+  STORE_RELAXED(&q->active, cur);
+  return cur;
+}
+
+static void
+shared_worklist_push(struct shared_worklist *q, struct gc_ref x) {
+  size_t b = LOAD_RELAXED(&q->bottom);
+  size_t t = LOAD_ACQUIRE(&q->top);
+  int active = LOAD_RELAXED(&q->active);
+
+  ssize_t size = b - t;
+  if (size > shared_worklist_buf_size(&q->bufs[active]) - 1)
+    active = shared_worklist_grow(q, active, b, t); /* Full queue; grow. */
+
+  shared_worklist_buf_put(&q->bufs[active], b, x);
+  atomic_thread_fence(memory_order_release);
+  STORE_RELAXED(&q->bottom, b + 1);
+}
+
+static void
+shared_worklist_push_many(struct shared_worklist *q, struct gc_ref *objv,
+                          size_t count) {
+  size_t b = LOAD_RELAXED(&q->bottom);
+  size_t t = LOAD_ACQUIRE(&q->top);
+  int active = LOAD_RELAXED(&q->active);
+
+  ssize_t size = b - t;
+  while (size > shared_worklist_buf_size(&q->bufs[active]) - count)
+    active = shared_worklist_grow(q, active, b, t); /* Full queue; grow. */
+
+  for (size_t i = 0; i < count; i++)
+    shared_worklist_buf_put(&q->bufs[active], b + i, objv[i]);
+  atomic_thread_fence(memory_order_release);
+  STORE_RELAXED(&q->bottom, b + count);
+}
+
+static struct gc_ref
+shared_worklist_try_pop(struct shared_worklist *q) {
+  size_t b = LOAD_RELAXED(&q->bottom);
+  int active = LOAD_RELAXED(&q->active);
+  STORE_RELAXED(&q->bottom, b - 1);
+  atomic_thread_fence(memory_order_seq_cst);
+  size_t t = LOAD_RELAXED(&q->top);
+  struct gc_ref x;
+  ssize_t size = b - t;
+  if (size > 0) { // Non-empty queue.
+    x = shared_worklist_buf_get(&q->bufs[active], b - 1);
+    if (size == 1) { // Single last element in queue.
+      if (!atomic_compare_exchange_strong_explicit(&q->top, &t, t + 1,
+                                                   memory_order_seq_cst,
+                                                   memory_order_relaxed))
+        // Failed race.
+        x = gc_ref_null();
+      STORE_RELAXED(&q->bottom, b);
+    }
+  } else { // Empty queue.
+    x = gc_ref_null();
+    STORE_RELAXED(&q->bottom, b);
+  }
+  return x;
+}
+
+static struct gc_ref
+shared_worklist_steal(struct shared_worklist *q) {
+  while (1) {
+    size_t t = LOAD_ACQUIRE(&q->top);
+    atomic_thread_fence(memory_order_seq_cst);
+    size_t b = LOAD_ACQUIRE(&q->bottom);
+    ssize_t size = b - t;
+    if (size <= 0)
+      return gc_ref_null();
+    int active = LOAD_CONSUME(&q->active);
+    struct gc_ref ref = shared_worklist_buf_get(&q->bufs[active], t);
+    if (!atomic_compare_exchange_strong_explicit(&q->top, &t, t + 1,
+                                                 memory_order_seq_cst,
+                                                 memory_order_relaxed))
+      // Failed race.
+      continue;
+    return ref;
+  }
+}
+
+static ssize_t
+shared_worklist_size(struct shared_worklist *q) {
+  size_t t = LOAD_ACQUIRE(&q->top);
+  atomic_thread_fence(memory_order_seq_cst);
+  size_t b = LOAD_ACQUIRE(&q->bottom);
+  ssize_t size = b - t;
+  return size;
+}
+
+static int
+shared_worklist_can_steal(struct shared_worklist *q) {
+  return shared_worklist_size(q) > 0;
+}
+
+#undef LOAD_RELAXED
+#undef STORE_RELAXED
+#undef LOAD_ACQUIRE
+#undef STORE_RELEASE
+#undef LOAD_CONSUME
+
+#endif // SHARED_WORKLIST_H

Reply via email to