cbaines pushed a commit to branch master
in repository data-service.

commit 2a80304e0c6b9c3c6fbb65afe8f2249d9b23fc1c
Author: Christopher Baines <[email protected]>
AuthorDate: Sat Jul 20 00:20:20 2024 +0100

    Add worker thread utils
---
 guix-data-service/utils.scm | 232 ++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 232 insertions(+)

diff --git a/guix-data-service/utils.scm b/guix-data-service/utils.scm
index 736e24d..e5b4a45 100644
--- a/guix-data-service/utils.scm
+++ b/guix-data-service/utils.scm
@@ -29,6 +29,8 @@
   #:use-module (ice-9 ports internal)
   #:use-module (ice-9 suspendable-ports)
   #:use-module (lzlib)
+  #:use-module ((guix build syscalls)
+                #:select (set-thread-name))
   #:use-module (fibers)
   #:use-module (fibers channels)
   #:use-module (fibers operations)
@@ -48,6 +50,11 @@
             with-resource-from-pool
             resource-pool-stats
 
+            make-worker-thread-channel
+            %worker-thread-default-timeout
+            call-with-worker-thread
+            worker-thread-timeout-error?
+
             fibers-delay
             fibers-force
 
@@ -464,6 +471,231 @@ available.  Return the resource once PROC has returned."
           (raise-exception
            (make-resource-pool-timeout-error))))))
 
+;; Parameter holding the initializer's result while a worker thread is
+;; processing jobs; #f everywhere else.
+(define %worker-thread-args (make-parameter #f))
+
+(define* (make-worker-thread-channel initializer
+                                     #:key (parallelism 1)
+                                     (delay-logger (lambda _ #f))
+                                     (duration-logger (const #f))
+                                     destructor
+                                     lifetime
+                                     (log-exception? (const #t))
+                                     (expire-on-exception? #f)
+                                     (name "unnamed"))
+  "Start PARALLELISM worker threads and return two values: a channel used to
+offload work to them, and a vector holding, for each thread, the procedure it
+is currently running (or #f when it is idle).
+
+INITIALIZER is a thunk called in each worker thread.  The list it returns is
+used as the arguments for every procedure sent over the channel.  It is called
+again every second until it returns a true value.
+
+DELAY-LOGGER, if true, is called with the number of seconds a job waited
+before a worker picked it up.  DURATION-LOGGER, if true, is called with the
+number of seconds a job took and the procedure that was run.  DESTRUCTOR, if
+given, is applied to the initializer's result once a worker stops using it,
+after which the worker calls INITIALIZER again.  LIFETIME, if a number, limits
+how many jobs a worker handles before that happens.  LOG-EXCEPTION? is called
+with the exception raised by a job and decides whether it is logged.  When
+EXPIRE-ON-EXCEPTION? is true, a job that raises an exception also ends the
+current lifetime.  NAME is used in the thread names and in log messages."
+  (define thread-proc-vector
+    (make-vector parallelism #f))
+
+  (define (initializer/safe)
+    ;; Call INITIALIZER, logging any exception together with a backtrace.
+    ;; A false result, which includes a failed call, is retried.
+    (let ((args
+           (with-exception-handler
+               (lambda (exn)
+                 (simple-format
+                  (current-error-port)
+                  "exception running initializer in worker thread (~A): ~A:\n  ~A\n"
+                  name
+                  initializer
+                  exn)
+                 #f)
+             (lambda ()
+               (with-throw-handler #t
+                 initializer
+                 (lambda args
+                   (backtrace))))
+             #:unwind? #t)))
+
+      (if args
+          args
+          ;; never give up, just keep retrying
+          (begin
+            (sleep 1)
+            (initializer/safe)))))
+
+  (define (destructor/safe args)
+    ;; Apply DESTRUCTOR to ARGS.  A failure is logged together with a
+    ;; backtrace but is not retried, so the result is always #t.
+    (with-exception-handler
+        (lambda (exn)
+          (simple-format
+           (current-error-port)
+           "exception running destructor in worker thread (~A): ~A:\n  ~A\n"
+           name
+           destructor
+           exn)
+          #t)
+      (lambda ()
+        (with-throw-handler #t
+          (lambda ()
+            (apply destructor args)
+            #t)
+          (lambda _
+            (backtrace))))
+      #:unwind? #t))
+
+  (define (process thread-index channel args)
+    ;; Handle jobs from CHANNEL until the current lifetime ends.  A job is a
+    ;; list of a reply channel, the time it was sent and the procedure to
+    ;; apply to ARGS.
+    ;;
+    ;; NOTE(review): with a numeric LIFETIME of N this loop handles N + 2
+    ;; jobs, since it only stops once the counter has dropped below zero.
+    ;; Confirm whether that is intended before changing it.
+    (let loop ((current-lifetime lifetime))
+      (let ((exception?
+             (match (get-message channel)
+               (((? channel? reply) sent-time (? procedure? proc))
+                (let ((time-delay
+                       (- (get-internal-real-time)
+                          sent-time)))
+                  ;; Guarded in the same way as DURATION-LOGGER, so that
+                  ;; passing #f disables the logging rather than failing
+                  (when delay-logger
+                    (delay-logger (/ time-delay
+                                     internal-time-units-per-second)))
+
+                  (let* ((start-time (get-internal-real-time))
+                         (response
+                          (with-exception-handler
+                              ;; A failed job is reported as
+                              ;; ('worker-thread-error DURATION EXN)
+                              (lambda (exn)
+                                (list 'worker-thread-error
+                                      (/ (- (get-internal-real-time)
+                                            start-time)
+                                         internal-time-units-per-second)
+                                      exn))
+                            (lambda ()
+                              (vector-set! thread-proc-vector
+                                           thread-index
+                                           proc)
+                              (with-throw-handler #t
+                                (lambda ()
+                                  ;; A successful job is reported as
+                                  ;; (DURATION . VALUES)
+                                  (call-with-values
+                                      (lambda ()
+                                        (start-stack
+                                         'worker-thread
+                                         (apply proc args)))
+                                    (lambda vals
+                                      (cons (/ (- (get-internal-real-time)
+                                                  start-time)
+                                               internal-time-units-per-second)
+                                            vals))))
+                                (lambda args
+                                  (when (match args
+                                          (('%exception exn)
+                                           (log-exception? exn))
+                                          (_ #t))
+                                    (simple-format
+                                     (current-error-port)
+                                     "worker-thread: exception: ~A\n" args)
+                                    (backtrace)))))
+                            #:unwind? #t)))
+                    (put-message reply
+                                 response)
+
+                    (vector-set! thread-proc-vector
+                                 thread-index
+                                 #f)
+
+                    ;; The value of this match says whether the job failed
+                    (match response
+                      (('worker-thread-error duration _)
+                       (when duration-logger
+                         (duration-logger duration proc))
+                       #t)
+                      ((duration . _)
+                       (when duration-logger
+                         (duration-logger duration proc))
+                       #f))))))))
+        (unless (and expire-on-exception?
+                     exception?)
+          (if (number? current-lifetime)
+              (unless (< current-lifetime 0)
+                (loop (- current-lifetime 1)))
+              ;; No numeric lifetime, so keep going indefinitely
+              (loop #f))))))
+
+  (let ((channel (make-channel)))
+    (for-each
+     (lambda (thread-index)
+       (call-with-new-thread
+        (lambda ()
+          ;; Naming the thread is best effort, system errors are ignored
+          (catch 'system-error
+            (lambda ()
+              (set-thread-name
+               (string-append
+                name " w t "
+                (number->string thread-index))))
+            (const #t))
+
+          ;; Each iteration is one lifetime: process jobs with ARGS, run the
+          ;; destructor, then start again with freshly initialized arguments
+          (let init ((args (initializer/safe)))
+            (with-exception-handler
+                (lambda (exn)
+                  (simple-format
+                   (current-error-port)
+                   "worker-thread-channel: exception: ~A\n" exn))
+              (lambda ()
+                (parameterize ((%worker-thread-args args))
+                  (process thread-index channel args)))
+              #:unwind? #t)
+
+            (when destructor
+              (destructor/safe args))
+
+            (init (initializer/safe))))))
+     (iota parallelism))
+
+    (values channel
+            thread-proc-vector)))
+
+;; Exception type, derived from &error and without fields of its own, for
+;; jobs that could not be handed to a worker thread within the timeout.
+(define &worker-thread-timeout
+  (make-exception-type '&worker-thread-timeout &error '()))
+
+;; Constructor for &worker-thread-timeout exceptions
+(define make-worker-thread-timeout-error
+  (record-constructor
+   &worker-thread-timeout))
+
+;; Predicate recognising &worker-thread-timeout exceptions
+(define worker-thread-timeout-error?
+  (record-predicate
+   &worker-thread-timeout))
+
+;; Default for the #:timeout argument of call-with-worker-thread, which
+;; passes it to sleep-operation.
+(define %worker-thread-default-timeout (make-parameter 30))
+
+(define* (call-with-worker-thread channel proc #:key duration-logger
+                                  (timeout (%worker-thread-default-timeout)))
+  "Send PROC to a worker thread through CHANNEL and return the values PROC
+returns.  If already in a worker thread, apply PROC to that thread's
+arguments immediately instead.
+
+TIMEOUT, unless false, limits how long to wait for a worker thread to accept
+PROC; when it expires a &worker-thread-timeout exception is raised.  Waiting
+for the result is not limited.  An exception raised by PROC in the worker
+thread is raised again here.  DURATION-LOGGER, if given, is called with the
+duration reported by the worker thread."
+  (define (log-duration duration)
+    (when duration-logger
+      (duration-logger duration)))
+
+  (define (hand-over reply)
+    ;; Offer the job on CHANNEL.  The result is #t once a worker thread has
+    ;; taken it, or #f if TIMEOUT elapsed first.
+    (let ((offer
+           (wrap-operation
+            (put-operation channel
+                           (list reply
+                                 (get-internal-real-time)
+                                 proc))
+            (const #t))))
+      (perform-operation
+       (if timeout
+           (choice-operation
+            offer
+            (wrap-operation (sleep-operation timeout)
+                            (const #f)))
+           offer))))
+
+  (match (%worker-thread-args)
+    (#f
+     (let ((reply (make-channel)))
+       (unless (hand-over reply)
+         (raise-exception
+          (make-worker-thread-timeout-error)))
+
+       (match (get-message reply)
+         (('worker-thread-error duration exn)
+          (log-duration duration)
+          (raise-exception exn))
+         ((duration . result)
+          (log-duration duration)
+          (apply values result)))))
+    (worker-args
+     ;; Already running in a worker thread, so there is no need to go
+     ;; through the channel
+     (apply proc worker-args))))
+
 (define-record-type <fibers-promise>
   (make-fibers-promise thunk values-box evaluated-condition)
   fibers-promise?

Reply via email to