branch: main
commit 52abfc170ed705c845d5a42b4ba9f0488383a190
Author: Romain GARBAGE <[email protected]>
AuthorDate: Mon Feb 24 15:38:08 2025 +0100
base: Add event-log agent.
* src/cuirass/base.scm (event-log-service, spawn-event-log-service): New
variables.
* tests/base.scm: Add tests for the event-log-service agent.
Signed-off-by: Ludovic Courtès <[email protected]>
---
src/cuirass/base.scm | 71 +++++++++++++++++++++++++++++++++++++++++++++++++++-
tests/base.scm | 46 ++++++++++++++++++++++++++++++++++
2 files changed, 116 insertions(+), 1 deletion(-)
diff --git a/src/cuirass/base.scm b/src/cuirass/base.scm
index 631e81f..fa3481f 100644
--- a/src/cuirass/base.scm
+++ b/src/cuirass/base.scm
@@ -73,6 +73,7 @@
spawn-remote-builder
spawn-channel-update-service
spawn-jobset-evaluator
+ spawn-event-log-service
spawn-jobset-registry
spawn-gc-root-cleaner
spawn-build-maintainer
@@ -113,7 +114,11 @@
;;; 'guix-daemon' process, while the remote build delegates builds to
;;; 'cuirass remote-server'.
;;;
-;;; - Each jobset as an associated "monitor"; it requests channel updates,
+;;; - The "event-log" agent receives events from the different agents. It
+;;; also dispatches these events to the agents which subscribed to event
+;;; notification.
+;;;
+;;; - Each jobset has an associated "monitor"; it requests channel updates,
;;; evaluations, and builds to the actors above. It also receives requests
;;; such as evaluation triggers that can come, for example, from the
;;; /jobset/NAME/hook/evaluate HTTP endpoint.
@@ -770,6 +775,70 @@ concurrently; it sends derivation build requests to
BUILDER."
max-parallel-evaluations))
channel))
+;;;
+;;; Logging events
+;;;
+
+(define event-log-buffer-size
+ (make-parameter 1000))
+
+(define (event-log-service channel)
+ "Keep events received on CHANNEL in a circular buffer, keeping track of
+notification subscriptions."
+ (lambda ()
+ (define events (ring-buffer (event-log-buffer-size)))
+
+ (let loop ((events events)
+ (subscribers '()))
+ (match (get-message channel)
+ (`(subscribe ,channel)
+ (loop events (cons channel subscribers)))
+ (`(unsubscribe ,channel)
+ (loop events (delq channel subscribers)))
+ (`(new-event ,event)
+ ;; Events are stored as a list in the format
+ ;; (event-type timestamp rest-of-data)
+ (let* ((data (match event
+ ((event-type . rest)
+ (append `(,event-type ,(current-time time-utc))
+ rest))))
+ (events (ring-buffer-insert data events)))
+ (match event
+ ;; This is what is received from the builders. This code aims to
+ ;; rebuild the original build object related to the derivation
+ ;; (when it exists), so it can be used by other agents.
+ (`(derivation-built ,derivation ,status)
+ (spawn-fiber
+ (lambda ()
+ (let ((build (db-get-build derivation)))
+ (when build
+ (put-message channel
+ `(new-event (build-status-changed
,build))))))))
+ (_ #t))
+ ;; For now, every new event is sent to all subscribers.
+ (for-each (lambda (reply)
+ (put-message reply data))
+ subscribers)
+ (loop events subscribers)))
+ (`(recent-events ,reply)
+ (put-message reply (ring-buffer->list events))
+ (loop events subscribers))
+ ;; Catchall for malformed messages.
+ (message
+ (log-error "malformed message sent to the event-log-service: ~s"
+ message)
+ (loop events subscribers))))))
+
+(define (spawn-event-log-service)
+ "Spawn an actor responsible for centralizing events."
+ (let ((channel (make-channel)))
+ (spawn-fiber (event-log-service channel))
+ channel))
+
+;;;
+;;; Monitoring jobsets
+;;;
+
(define %jobset-trigger-rate-window
;; Window (seconds) over which the jobset trigger rate is computed.
(* 5 60)) ;5 minutes
diff --git a/tests/base.scm b/tests/base.scm
index 1cfbecb..e3709ee 100644
--- a/tests/base.scm
+++ b/tests/base.scm
@@ -17,6 +17,10 @@
;;; along with Cuirass. If not, see <http://www.gnu.org/licenses/>.
(use-modules (cuirass base)
+ (fibers)
+ (fibers channels)
+ (ice-9 match)
+ (srfi srfi-1)
(srfi srfi-64))
(test-begin "base")
@@ -25,4 +29,46 @@
'wrong-type-arg
(%package-cachedir #f))
+(test-equal "event-log recent-events"
+ ;; Elements are in reverse order since they come from a ring-buffer.
+ '((test-event "Third message")
+ (test-event "Second message")
+ (test-event "First message"))
+ (run-fibers
+ (lambda ()
+ (let ((event-log (spawn-event-log-service))
+ (reply-channel (make-channel)))
+ (put-message event-log '(new-event (test-event "First message")))
+ (put-message event-log '(new-event (test-event "Second message")))
+ (put-message event-log '(new-event (test-event "Third message")))
+ (put-message event-log `(recent-events ,reply-channel))
+ (filter-map (match-lambda
+ ((type timestamp value)
+ ;; Drop timestamp.
+ (list type value))
+ (_ #f))
+ (get-message reply-channel))))))
+
+(test-equal "event-log recent-events (empty buffer)"
+ '()
+ (run-fibers
+ (lambda ()
+ (let ((event-log (spawn-event-log-service))
+ (reply-channel (make-channel)))
+ (put-message event-log `(recent-events ,reply-channel))
+ (get-message reply-channel)))))
+
+(test-equal "event-log subscribe"
+ '(test-event "test-value")
+ (run-fibers
+ (lambda ()
+ (let ((event-log (spawn-event-log-service))
+ (reply (make-channel)))
+ (put-message event-log `(subscribe ,reply))
+ (put-message event-log '(new-event (test-event "test-value")))
+ (match (get-message reply)
+ ((type timestamp value)
+ ;; Drop timestamp.
+ (list type value)))))))
+
(test-end)