branch: externals/futur
commit 44112d22595f1bc3df580236ecb92f7c649d4132
Author: Stefan Monnier <[email protected]>
Commit: Stefan Monnier <[email protected]>
futur.el: Begin implementing bound on parallelism
* futur.el (futur-process-max): New custom.
(futur--process-active, futur--process-waiting): New vars.
(futur--process-bounded, futur--process-next): New functions.
(futur-blocker-abort) <waiting>: New method.
(futur--process-make): Rename from `futur-process-make`.
---
futur.el | 42 ++++++++++++++++++++++++++++++++++++++++--
1 file changed, 40 insertions(+), 2 deletions(-)
diff --git a/futur.el b/futur.el
index 2096c8d6c9..12fda77f5e 100644
--- a/futur.el
+++ b/futur.el
@@ -600,6 +600,43 @@ If IDLE is non-nil, then wait for that amount of idle
time."
;;;; Processes
+(defcustom futur-process-max
+ (if (fboundp 'num-processors) (num-processors) 2) ; Default to 2 when `num-processors' is not defined.
+ "Maximum number of concurrent subprocesses."
+ :type 'integer)
+
+(defvar futur--process-active nil ; Pushed to by `futur--process-bounded', pruned by `futur--process-next'.
+ "List of active process-futures.")
+
+(defvar futur--process-waiting nil ; Entries are (FUTURE . CALL) conses, as pushed by `futur--process-bounded'.
+ "List of process-futures waiting to start.")
+
+(defun futur--process-bounded (&rest args)
+  "Run the call described by ARGS unless `futur-process-max' is reached.
+ARGS is a function followed by the arguments to pass to it.
+When `futur--process-active' holds fewer than `futur-process-max'
+elements, make the call right away, push the future it returns onto
+`futur--process-active', register a callback on it that calls
+`futur--process-next', and return that future.
+Otherwise return a fresh future whose blocker is the symbol `waiting',
+after pushing it together with ARGS onto `futur--process-waiting'."
+  (cond
+   ((>= (length futur--process-active) futur-process-max)
+    ;; No free slot: hand back a placeholder and remember the call.
+    (let ((placeholder (futur--waiting 'waiting)))
+      (push (cons placeholder args) futur--process-waiting)
+      placeholder))
+   (t
+    (let ((started (apply #'funcall args)))
+      (push started futur--process-active)
+      (futur-register-callback
+       started
+       (oclosure-lambda (futur--aux) (_ _) (futur--process-next started)))
+      started))))
+
+(defun futur--process-next (done)
+  "Remove DONE from `futur--process-active' and start one queued call.
+Pop entries off `futur--process-waiting' until one is found whose
+future still matches the `futur--waiting' pattern.  Re-issue that
+entry's call through `futur--process-bounded' and forward the outcome
+of the resulting future to the queued future, then stop.
+Entries whose future no longer matches are dropped."
+  (setq futur--process-active (delq done futur--process-active))
+  (cl-block nil
+    (while futur--process-waiting
+      (pcase-let ((`(,fut . ,call) (pop futur--process-waiting)))
+        (pcase fut
+          ((futur--waiting)
+           (let ((new (apply #'futur--process-bounded call)))
+             ;; Deliver to FUT, the placeholder that was handed out when
+             ;; the call was queued.  Delivering to NEW from NEW's own
+             ;; callback would leave FUT pending forever.
+             (futur-register-callback
+              new (lambda (err val) (futur--deliver fut err val)))
+             ;; DONE freed a single slot, so start at most one call.
+             (cl-return))))))))
+
+(cl-defmethod futur-blocker-abort ((_blocker (eql 'waiting)) _error)
+  "Do nothing and return nil when asked to abort the `waiting' blocker."
+  nil)
+
(defun futur--process-completed-p (proc)
(memq (process-status proc) '(exit signal closed failed)))
@@ -607,7 +644,7 @@ If IDLE is non-nil, then wait for that amount of idle time."
(when (futur--process-completed-p proc)
(futur-deliver-value futur (process-exit-status proc))))
-(defun futur-process-make (&rest args)
+(defun futur--process-make (&rest args)
"Create a process and return a future that delivers its exit code.
The ARGS are like those of `make-process' except that they can't include
`:sentinel' because that is used internally."
@@ -642,7 +679,7 @@ The DISPLAY argument is ignored: redisplay always happens."
(`(:file ,(and file (pred stringp)))
(setq destination (expand-file-name file)))
(`(,_ . ,_) (error "Separate handling of stderr is not supported yet")))
- (let* ((futur (futur-process-make
+ (let* ((futur (futur--process-make
:name program
:command (cons program args)
:coding (if (stringp destination) '(binary . nil))
@@ -651,6 +688,7 @@ The DISPLAY argument is ignored: redisplay always happens."
:filter (if (bufferp destination) nil
#'futur-process-call--filter)))
(proc (pcase-exhaustive futur ((futur--waiting blocker) blocker))))
+ (push futur futur--process-active)
(when (stringp destination)
(write-region "" nil destination nil 'silent))
(pcase-exhaustive infile