branch: externals/phpinspect
commit 6678ba20c6f00df01370aa3cd0d13dc9456273f2
Author: Hugo Thunnissen <de...@hugot.nl>
Commit: Hugo Thunnissen <de...@hugot.nl>

    Implement async processing pipeline
---
 phpinspect-pipeline.el | 364 +++++++++++++++++++++++++++++++++++++++++++++++++
 phpinspect-queue.el    |  32 ++++-
 phpinspect-worker.el   |   4 +-
 3 files changed, 395 insertions(+), 5 deletions(-)

diff --git a/phpinspect-pipeline.el b/phpinspect-pipeline.el
new file mode 100644
index 0000000000..b4356bfc86
--- /dev/null
+++ b/phpinspect-pipeline.el
@@ -0,0 +1,364 @@
+;;; phpinspect-pipeline.el --- PHP parsing and completion package  -*- 
lexical-binding: t; -*-
+
+;; Copyright (C) 2021  Free Software Foundation, Inc
+
+;; Author: Hugo Thunnissen <de...@hugot.nl>
+;; Keywords: php, languages, tools, convenience
+;; Version: 0
+
+;; This program is free software; you can redistribute it and/or modify
+;; it under the terms of the GNU General Public License as published by
+;; the Free Software Foundation, either version 3 of the License, or
+;; (at your option) any later version.
+
+;; This program is distributed in the hope that it will be useful,
+;; but WITHOUT ANY WARRANTY; without even the implied warranty of
+;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+;; GNU General Public License for more details.
+
+;; You should have received a copy of the GNU General Public License
+;; along with this program.  If not, see <https://www.gnu.org/licenses/>.
+
+;;; Commentary:
+
+;;; Code:
+(require 'phpinspect-worker)
+(require 'phpinspect-queue)
+
+(define-error 'phpinspect-pipeline-incoming "Signal for incoming pipeline 
data")
+(define-error 'phpinspect-pipeline-error "Signal for pipeline errors")
+
+(cl-defstruct (phpinspect-pipeline-end (:constructor 
phpinspect-make-pipeline-end))
+  (value nil
+         :type any)
+  (thread nil
+          :type thread))
+
+(cl-defstruct (phpinspect-pipeline-thread (:constructor 
phpinspect-make-pipeline-thread))
+  (in-queue nil
+            :type phpinspect-queue)
+  (ended nil
+         :type boolean))
+
+(cl-defstruct (phpinspect-pipeline-ctx (:constructor 
phpinspect-make-pipeline-ctx))
+  (threads nil
+           :type alist))
+
+(cl-defmethod phpinspect-pipeline-ctx-register-thread ((ctx 
phpinspect-pipeline-ctx) thread in-queue)
+  (push (cons thread (phpinspect-make-pipeline-thread :in-queue in-queue))
+        (phpinspect-pipeline-ctx-threads ctx)))
+
+(cl-defmethod phpinspect-pipeline-ctx-get-thread ((ctx 
phpinspect-pipeline-ctx) thread)
+  (alist-get thread (phpinspect-pipeline-ctx-threads ctx)
+             nil nil #'eq))
+
+(cl-defmethod phpinspect-pipeline-ctx-register-end ((ctx 
phpinspect-pipeline-ctx) (end phpinspect-pipeline-end))
+  (let ((thread (phpinspect-pipeline-ctx-get-thread ctx 
(phpinspect-pipeline-end-thread end))))
+    (setf (phpinspect-pipeline-thread-ended thread) t)))
+
+(cl-defmethod phpinspect-pipeline-ctx-close ((ctx phpinspect-pipeline-ctx))
+  (let (errors err ended thread-live)
+    (dolist (thread (phpinspect-pipeline-ctx-threads ctx))
+      (setq err (thread-last-error (car thread))
+            ended (phpinspect-pipeline-thread-ended (cdr thread))
+            thread-live (thread-live-p (car thread)))
+
+      (when thread-live
+        (if ended
+            (setq errors (nconc errors (list (format "Thread %s ended 
pipeline, but is still running"
+                                                     (thread-name (car 
thread))))))
+          (setq errors (nconc errors (list (format "Thread %s is still running 
when pipeline is closing"
+                                                   (thread-name (car 
thread))))))))
+
+
+      (when (thread-last-error (car thread))
+        (setq errors (nconc errors (list (format "Thread %s signaled error: %s"
+                                                 (thread-name (car thread))
+                                                 (thread-last-error (car 
thread)))))))
+      (unless ended
+        (setq errors (nconc errors (list (format "Thread %s never ended"
+                                                 (thread-name (car 
thread)))))))
+
+      (when (thread-live-p (car thread))
+        (thread-signal (car thread) 'quit nil)))
+
+    (when errors
+      (signal 'phpinspect-pipeline-error errors))))
+
+
+(defmacro phpinspect-pipeline-emit (data)
+  `(throw 'phpinspect-pipeline-emit ,data))
+
+(defmacro phpinspect-pipeline-end (&optional value)
+  (if value
+      `(throw 'phpinspect-pipeline-emit
+              (phpinspect-make-pipeline-end :value ,value :thread 
(current-thread)))
+  `(throw 'phpinspect-pipeline-emit
+          (phpinspect-make-pipeline-end :thread (current-thread)))))
+
+(define-inline phpinspect-pipeline-pause ()
+  "Pause the current pipeline thread"
+  (inline-quote
+   (if (input-pending-p)
+       (let ((mx (make-mutex)))
+         (phpinspect-thread-pause 1 mx (make-condition-variable mx 
"phpinspect-pipeline-pause")))
+     (thread-yield))))
+
+(defmacro phpinspect-pipeline-generator (queue &rest body)
+  (declare (indent 1))
+
+  (let ((result-sym (gensym))
+        (queue-sym (gensym)))
+    `(let (,result-sym
+           (,queue-sym ,queue))
+       (while (setq ,result-sym (progn ,@body))
+         (phpinspect-queue-enqueue ,queue-sym ,result-sym)
+         (phpinspect-pipeline-pause))
+
+       (phpinspect-queue-enqueue ,queue-sym (phpinspect-make-pipeline-end 
:thread (current-thread))))))
+
+(defun phpinspect--chain-pipeline-steps (steps start-queue end-queue ctx)
+  (let ((result (gensym "result"))
+        (incoming (gensym "incoming"))
+        (outgoing (gensym "outgoing"))
+        (ctx-sym (gensym "ctx"))
+        body name step statement)
+    (while (setq step (pop steps))
+      (setq name (phpinspect--pipeline-step-name step))
+
+      (setq statement
+            (if (phpinspect--pipeline-step--context-var-name step)
+                `(,(phpinspect-pipeline-step-name name "create")
+                  ,incoming ,outgoing ,ctx-sym 
,(phpinspect--pipeline-step--context-var-name step))
+              `(,(phpinspect-pipeline-step-name name "create") ,incoming 
,outgoing ,ctx-sym)))
+      (setq body (nconc body `(,(if steps
+                                    `(setq ,outgoing (phpinspect-make-queue))
+                                  `(setq ,outgoing ,end-queue))
+                               (phpinspect-pipeline-ctx-register-thread 
,ctx-sym ,statement ,incoming)
+                               (setq ,incoming ,outgoing)))))
+
+    `(let ((,incoming ,start-queue) (,ctx-sym ,ctx) ,result ,outgoing)
+       ,@body)))
+
+(cl-defstruct (phpinspect--pipeline-step (:constructor 
phpinspect--make-pipeline-step))
+  (context nil
+           :type any
+           :documentation
+           "An object that is passed as first argument to all step executions")
+  (-context-var-name nil
+                     :type symbol
+                     :documentation
+                     "Variable name used to store context in")
+  (name nil
+        :type symbol
+        :documentation
+        "The name of this step"))
+
+(defmacro phpinspect-pipeline (&rest parameters)
+  (let (key value steps body let-vars)
+    (catch 'break
+      (while parameters
+        (setq key (pop parameters)
+              value (pop parameters))
+
+        (pcase key
+          (:into
+           (let ((parameters)
+                 (name)
+                 (construct-params))
+             (if (listp value)
+                 (progn
+                   (setq name (car value)
+                         parameters (cdr value)))
+               (setq name value))
+
+             (unless (symbolp name)
+               (error "Step name should be a symbol"))
+
+             (let (key value)
+               (while parameters
+                 (setq key (pop parameters)
+                       value (pop parameters))
+                 (when (eq :with-context key)
+                   (setq value `(quote ,value)))
+                 (setq key (intern (string-replace ":with-" ":" (symbol-name 
key))))
+                 (setq construct-params (nconc construct-params (list key 
value)))))
+             (push (eval `(phpinspect--make-pipeline-step ,@construct-params 
:name (quote ,name)))
+                   steps)))
+          (_ (if (keywordp key)
+                 (error "unexpected key %s" key)
+               (setq body `(,key))
+               (throw 'break nil))))))
+
+    (when value
+      (setq body (nconc body (list value))))
+
+    (when parameters
+      (setq body (nconc body parameters)))
+
+    (setq steps (nreverse steps))
+
+    (dolist (step steps)
+      (when (phpinspect--pipeline-step-context step)
+        (setf (phpinspect--pipeline-step--context-var-name step) (gensym 
"ctx"))
+        (push `(,(phpinspect--pipeline-step--context-var-name step)
+                ,(phpinspect--pipeline-step-context step))
+              let-vars)))
+
+    (let ((queue-sym (gensym "queue"))
+          (end-queue-sym (gensym "end-queue"))
+          (ctx-sym (gensym "ctx"))
+          (recv-sym (gensym))
+          (errors (gensym))
+          (result-sym (gensym))
+          (collecting-sym (gensym)))
+      `(progn
+             (when (eq main-thread (current-thread))
+               (error "Pipelines should not run in the main thread"))
+
+             (let* (,@let-vars
+                    (,ctx-sym (phpinspect-make-pipeline-ctx))
+                    (,queue-sym (phpinspect-make-queue))
+                    (,end-queue-sym (phpinspect-make-queue))
+                    (,collecting-sym t)
+                    ,recv-sym ,result-sym)
+
+               ,(phpinspect--chain-pipeline-steps steps queue-sym 
end-queue-sym ctx-sym)
+
+               (phpinspect-pipeline-generator ,queue-sym
+                 ,@body)
+
+               (while ,collecting-sym
+                 (ignore-error 'phpinspect-pipeline-incoming
+                     (progn
+                       (phpinspect-pipeline--register-wakeup-function 
,end-queue-sym)
+                       (while (not (phpinspect-pipeline-end-p
+                                    (setq ,recv-sym 
(phpinspect-pipeline-receive ,end-queue-sym))))
+                         (setq ,result-sym (nconc ,result-sym (list 
,recv-sym))))
+                       (setq ,collecting-sym nil))))
+
+               (phpinspect-pipeline-ctx-close ,ctx-sym)
+               ,result-sym)))))
+
+(defmacro phpinspect-pipeline-async (callback &rest parameters)
+  (declare (indent 1))
+  `(make-thread
+    (lambda ()
+      (condition-case err
+          (let ((result (phpinspect-pipeline ,@parameters)))
+            (funcall ,callback result nil))
+        (t (funcall ,callback nil err))))
+    "phpinspect-pipeline-async"))
+
+(define-inline phpinspect-pipeline-receive (queue)
+  (inline-letevals (queue)
+    `(or (phpinspect-queue-dequeue ,queue)
+         (let ((mx (make-mutex)))
+           (with-mutex mx
+             (condition-wait (make-condition-variable mx 
"phpinspect-pipeline-receive")))
+           (phpinspect-queue-dequeue ,queue)))))
+
+(defun phpinspect-pipeline-step-name (name &optional suffix)
+  (intern (concat (symbol-name name) (if suffix (concat "-" suffix) ""))))
+
+(define-inline phpinspect-pipeline--register-wakeup-function (queue)
+  (inline-quote
+   (let ((thread (current-thread)))
+     (setf (phpinspect-queue-subscription ,queue)
+           (lambda () (thread-signal thread 'phpinspect-pipeline-incoming 
nil))))))
+
+(defmacro phpinspect-define-pipeline-step (name function-name)
+  (unless (symbolp name)
+    (error "name must be a symbol"))
+
+  (unless (symbolp function-name)
+    (error "function-name must be a symbol"))
+
+  (let ((execute-function (phpinspect-pipeline-step-name name "execute"))
+        (constructor-function (phpinspect-pipeline-step-name name "create")))
+
+    `(progn
+       (define-inline ,execute-function (input &optional context)
+         (if context
+             (inline-quote
+              (catch 'phpinspect-pipeline-emit
+                ,(append `(,function-name) '(,context) '(,input))
+                nil))
+           (inline-quote
+            (catch 'phpinspect-pipeline-emit
+              ,(append `(,function-name) '(,input))
+              nil))))
+
+       (define-inline ,constructor-function (queue consumer-queue pipeline-ctx 
&optional context)
+         (inline-letevals (queue consumer-queue context)
+           (let ((thread-name ,(concat "phpinspect-pipeline-" (symbol-name 
name)))
+                 (statement (list (quote ,execute-function))))
+             ,@(list
+                '(let ((incoming (gensym "incoming"))
+                       (outgoing (gensym "outgoing"))
+                       (inc-queue (gensym "queue"))
+                       (out-queue (gensym "queue"))
+                       (context-sym (gensym "context"))
+                       (continue-running (gensym "continue-running"))
+                       (original-thread (gensym "original-thread"))
+                       (pctx-sym (gensym "pipeline-ctx"))
+                       (incoming-end (gensym "incoming-end"))
+                       (end (gensym "end")))
+
+                   (setq statement (nconc statement (list incoming)))
+                   (unless (and (inline-const-p context) (not 
(inline-const-val context)))
+                     (setq statement (nconc statement (list context-sym))))
+
+                   (inline-quote
+                    (let ((,original-thread (current-thread))
+                          (,inc-queue ,queue)
+                          (,out-queue ,consumer-queue)
+                          (,context-sym ,context)
+                          (,pctx-sym ,pipeline-ctx))
+                      (make-thread
+                       (lambda ()
+                         (let ((,continue-running t)
+                               ,incoming ,outgoing ,end ,incoming-end)
+
+                           (phpinspect-pipeline--register-wakeup-function 
,inc-queue)
+                           (while ,continue-running
+                             (condition-case err
+                                 (progn
+                                   (phpinspect-pipeline-pause)
+                                   (catch 'phpinspect-pipeline-break
+                                     (while ,continue-running
+                                       (setq ,incoming 
(phpinspect-pipeline-receive ,inc-queue))
+                                       (if (phpinspect-pipeline-end-p 
,incoming)
+                                           (progn
+                                             (setq ,incoming-end ,incoming)
+                                             (when 
(phpinspect-pipeline-end-value ,incoming)
+                                               (progn
+                                                 (setq ,incoming 
(phpinspect-pipeline-end-value ,incoming)
+                                                       ,outgoing ,statement)
+                                                 (phpinspect-queue-enqueue 
,out-queue ,outgoing 'no-notify)))
+
+                                             (setq ,end 
(phpinspect-make-pipeline-end :thread (current-thread)))
+                                             
(phpinspect-pipeline-ctx-register-end ,pctx-sym ,end)
+                                             (setq ,continue-running nil)
+                                             (phpinspect-queue-enqueue 
,out-queue ,end))
+
+                                         ;; Else
+                                         (setq ,outgoing ,statement)
+                                         (when (phpinspect-pipeline-end-p 
,outgoing)
+                                           (setq ,end 
(phpinspect-make-pipeline-end :thread (current-thread)))
+                                           
(phpinspect-pipeline-ctx-register-end ,pctx-sym ,end)
+                                           (setq ,continue-running nil))
+                                         (phpinspect-queue-enqueue ,out-queue 
,outgoing))
+
+                                       (when ,end
+                                         (throw 'phpinspect-pipeline-break 
nil)))))
+                               (phpinspect-pipeline-incoming)
+                               (t (message "Pipeline thread errored: %s" err)
+                                  (setq ,continue-running nil)
+                                  (phpinspect-pipeline-ctx-register-end
+                                   ,pctx-sym
+                                   (phpinspect-make-pipeline-end :thread 
(current-thread))))))))
+                       ,thread-name)))))))))))
+
+(provide 'phpinspect-pipeline)
+;;; phpinspect-pipeline.el ends here
diff --git a/phpinspect-queue.el b/phpinspect-queue.el
index 89d851b9c7..c64b7d5dfb 100644
--- a/phpinspect-queue.el
+++ b/phpinspect-queue.el
@@ -1,3 +1,28 @@
+;;; phpinspect-queue.el --- PHP parsing and completion package  -*- 
lexical-binding: t; -*-
+
+;; Copyright (C) 2021  Free Software Foundation, Inc
+
+;; Author: Hugo Thunnissen <de...@hugot.nl>
+;; Keywords: php, languages, tools, convenience
+;; Version: 0
+
+;; This program is free software; you can redistribute it and/or modify
+;; it under the terms of the GNU General Public License as published by
+;; the Free Software Foundation, either version 3 of the License, or
+;; (at your option) any later version.
+
+;; This program is distributed in the hope that it will be useful,
+;; but WITHOUT ANY WARRANTY; without even the implied warranty of
+;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+;; GNU General Public License for more details.
+
+;; You should have received a copy of the GNU General Public License
+;; along with this program.  If not, see <https://www.gnu.org/licenses/>.
+
+;;; Commentary:
+
+;;; Code:
+
 
 (cl-defstruct (phpinspect-queue
                (:constructor phpinspect-make-queue-generated))
@@ -11,7 +36,6 @@
         "The last item in the queue")
   (subscription  nil
                  :type function
-                 :read-only t
                 :documentation
                 "A function that should be called when items are
                 enqueued."))
@@ -42,7 +66,7 @@
 (cl-defmethod phpinspect-queue-last ((queue phpinspect-queue))
   (or (phpinspect-queue--last queue) (phpinspect-queue--first queue)))
 
-(cl-defmethod phpinspect-queue-enqueue ((queue phpinspect-queue) value)
+(cl-defmethod phpinspect-queue-enqueue ((queue phpinspect-queue) value 
&optional no-notify)
   "Add VALUE to the end of the queue that ITEM is part of."
   (let ((last (phpinspect-queue-last queue))
         (new-item (phpinspect-make-queue-item :value value)))
@@ -51,7 +75,8 @@
       (setf (phpinspect-queue-item-next last) new-item)
       (setf (phpinspect-queue-item-previous new-item) last))
     (setf (phpinspect-queue--last queue) new-item))
-  (when (phpinspect-queue-subscription queue)
+
+  (when (and (not no-notify) (phpinspect-queue-subscription queue))
     (funcall (phpinspect-queue-subscription queue))))
 
 (cl-defmethod phpinspect-queue-dequeue ((queue phpinspect-queue))
@@ -103,3 +128,4 @@ BODY can be any form."
     (phpinspect-queue-enqueue queue value)))
 
 (provide 'phpinspect-queue)
+;;; phpinspect-queue.el ends here
diff --git a/phpinspect-worker.el b/phpinspect-worker.el
index 1283256f21..8b5cd22f1f 100644
--- a/phpinspect-worker.el
+++ b/phpinspect-worker.el
@@ -122,7 +122,7 @@ already present in the queue."
 PAUSE-TIME must be the idle time that the thread should pause for.
 MX must be a mutex
 CONTINUE must be a condition-variable"
-  (phpinspect--log "Worker thead is paused for %d seconds" pause-time)
+  (phpinspect--log "Thread '%s' is paused for %d seconds" (thread-name 
(current-thread)) pause-time)
   (run-with-idle-timer
    pause-time
    nil
@@ -176,7 +176,7 @@ CONTINUE must be a condition-variable"
             ;; current buffer. Otherwise, the buffer associated with this 
thread
             ;; will be unkillable while the thread is running.
             (with-temp-buffer
-              (make-thread (phpinspect-worker-make-thread-function 
worker)))))))
+              (make-thread (phpinspect-worker-make-thread-function worker) 
"phpinspect-worker"))))))
 
 (cl-defmethod phpinspect-worker-start ((worker phpinspect-dynamic-worker))
   (phpinspect-worker-start (phpinspect-resolve-dynamic-worker worker)))

Reply via email to