branch: externals/phpinspect commit 6678ba20c6f00df01370aa3cd0d13dc9456273f2 Author: Hugo Thunnissen <de...@hugot.nl> Commit: Hugo Thunnissen <de...@hugot.nl>
Implement async processing pipeline --- phpinspect-pipeline.el | 364 +++++++++++++++++++++++++++++++++++++++++++++++++ phpinspect-queue.el | 32 ++++- phpinspect-worker.el | 4 +- 3 files changed, 395 insertions(+), 5 deletions(-) diff --git a/phpinspect-pipeline.el b/phpinspect-pipeline.el new file mode 100644 index 0000000000..b4356bfc86 --- /dev/null +++ b/phpinspect-pipeline.el @@ -0,0 +1,364 @@ +;;; phpinspect-pipeline.el --- PHP parsing and completion package -*- lexical-binding: t; -*- + +;; Copyright (C) 2021 Free Software Foundation, Inc + +;; Author: Hugo Thunnissen <de...@hugot.nl> +;; Keywords: php, languages, tools, convenience +;; Version: 0 + +;; This program is free software; you can redistribute it and/or modify +;; it under the terms of the GNU General Public License as published by +;; the Free Software Foundation, either version 3 of the License, or +;; (at your option) any later version. + +;; This program is distributed in the hope that it will be useful, +;; but WITHOUT ANY WARRANTY; without even the implied warranty of +;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +;; GNU General Public License for more details. + +;; You should have received a copy of the GNU General Public License +;; along with this program. If not, see <https://www.gnu.org/licenses/>. + +;;; Commentary: + +;;; Code: +(require 'phpinspect-worker) +(require 'phpinspect-queue) + +(define-error 'phpinspect-pipeline-incoming "Signal for incoming pipeline data") +(define-error 'phpinspect-pipeline-error "Signal for pipeline errors") + +(cl-defstruct (phpinspect-pipeline-end (:constructor phpinspect-make-pipeline-end)) + (value nil + :type any) + (thread nil + :type thread)) + +(cl-defstruct (phpinspect-pipeline-thread (:constructor phpinspect-make-pipeline-thread)) + (in-queue nil + :type phpinspect-queue) + (ended nil + :type boolean)) + +(cl-defstruct (phpinspect-pipeline-ctx (:constructor phpinspect-make-pipeline-ctx)) + (threads nil + :type alist)) + +(cl-defmethod phpinspect-pipeline-ctx-register-thread ((ctx phpinspect-pipeline-ctx) thread in-queue) + (push (cons thread (phpinspect-make-pipeline-thread :in-queue in-queue)) + (phpinspect-pipeline-ctx-threads ctx))) + +(cl-defmethod phpinspect-pipeline-ctx-get-thread ((ctx phpinspect-pipeline-ctx) thread) + (alist-get thread (phpinspect-pipeline-ctx-threads ctx) + nil nil #'eq)) + +(cl-defmethod phpinspect-pipeline-ctx-register-end ((ctx phpinspect-pipeline-ctx) (end phpinspect-pipeline-end)) + (let ((thread (phpinspect-pipeline-ctx-get-thread ctx (phpinspect-pipeline-end-thread end)))) + (setf (phpinspect-pipeline-thread-ended thread) t))) + +(cl-defmethod phpinspect-pipeline-ctx-close ((ctx phpinspect-pipeline-ctx)) + (let (errors err ended thread-live) + (dolist (thread (phpinspect-pipeline-ctx-threads ctx)) + (setq err (thread-last-error (car thread)) + ended (phpinspect-pipeline-thread-ended (cdr thread)) + thread-live (thread-live-p (car thread))) + + (when thread-live + (if ended + (setq errors (nconc errors (list (format "Thread %s ended pipeline, but is still running" + (thread-name (car thread)))))) + (setq errors (nconc errors (list (format "Thread %s is still running when pipeline is closing" + (thread-name (car thread)))))))) + + + (when (thread-last-error (car thread)) + (setq errors (nconc errors (list (format "Thread %s signaled error: %s" + (thread-name (car thread)) + (thread-last-error (car thread))))))) + (unless ended + (setq errors (nconc errors (list (format "Thread %s never ended" + (thread-name (car thread))))))) + + (when (thread-live-p (car thread)) + (thread-signal (car thread) 'quit nil))) + + (when errors + (signal 'phpinspect-pipeline-error errors)))) + + +(defmacro phpinspect-pipeline-emit (data) + `(throw 'phpinspect-pipeline-emit ,data)) + +(defmacro phpinspect-pipeline-end (&optional value) + (if value + `(throw 'phpinspect-pipeline-emit + (phpinspect-make-pipeline-end :value ,value :thread (current-thread))) + `(throw 'phpinspect-pipeline-emit + (phpinspect-make-pipeline-end :thread (current-thread))))) + +(define-inline phpinspect-pipeline-pause () + "Pause the current pipeline thread" + (inline-quote + (if (input-pending-p) + (let ((mx (make-mutex))) + (phpinspect-thread-pause 1 mx (make-condition-variable mx "phpinspect-pipeline-pause"))) + (thread-yield)))) + +(defmacro phpinspect-pipeline-generator (queue &rest body) + (declare (indent 1)) + + (let ((result-sym (gensym)) + (queue-sym (gensym))) + `(let (,result-sym + (,queue-sym ,queue)) + (while (setq ,result-sym (progn ,@body)) + (phpinspect-queue-enqueue ,queue-sym ,result-sym) + (phpinspect-pipeline-pause)) + + (phpinspect-queue-enqueue ,queue-sym (phpinspect-make-pipeline-end :thread (current-thread)))))) + +(defun phpinspect--chain-pipeline-steps (steps start-queue end-queue ctx) + (let ((result (gensym "result")) + (incoming (gensym "incoming")) + (outgoing (gensym "outgoing")) + (ctx-sym (gensym "ctx")) + body name step statement) + (while (setq step (pop steps)) + (setq name (phpinspect--pipeline-step-name step)) + + (setq statement + (if (phpinspect--pipeline-step--context-var-name step) + `(,(phpinspect-pipeline-step-name name "create") + ,incoming ,outgoing ,ctx-sym ,(phpinspect--pipeline-step--context-var-name step)) + `(,(phpinspect-pipeline-step-name name "create") ,incoming ,outgoing ,ctx-sym))) + (setq body (nconc body `(,(if steps + `(setq ,outgoing (phpinspect-make-queue)) + `(setq ,outgoing ,end-queue)) + (phpinspect-pipeline-ctx-register-thread ,ctx-sym ,statement ,incoming) + (setq ,incoming ,outgoing))))) + + `(let ((,incoming ,start-queue) (,ctx-sym ,ctx) ,result ,outgoing) + ,@body))) + +(cl-defstruct (phpinspect--pipeline-step (:constructor phpinspect--make-pipeline-step)) + (context nil + :type any + :documentation + "An object that is passed as first argument to all step executions") + (-context-var-name nil + :type symbol + :documentation + "Variable name used to store context in") + (name nil + :type symbol + :documentation + "The name of this step")) + +(defmacro phpinspect-pipeline (&rest parameters) + (let (key value steps body let-vars) + (catch 'break + (while parameters + (setq key (pop parameters) + value (pop parameters)) + + (pcase key + (:into + (let ((parameters) + (name) + (construct-params)) + (if (listp value) + (progn + (setq name (car value) + parameters (cdr value))) + (setq name value)) + + (unless (symbolp name) + (error "Step name should be a symbol")) + + (let (key value) + (while parameters + (setq key (pop parameters) + value (pop parameters)) + (when (eq :with-context key) + (setq value `(quote ,value))) + (setq key (intern (string-replace ":with-" ":" (symbol-name key)))) + (setq construct-params (nconc construct-params (list key value))))) + (push (eval `(phpinspect--make-pipeline-step ,@construct-params :name (quote ,name))) + steps))) + (_ (if (keywordp key) + (error "unexpected key %s" key) + (setq body `(,key)) + (throw 'break nil)))))) + + (when value + (setq body (nconc body (list value)))) + + (when parameters + (setq body (nconc body parameters))) + + (setq steps (nreverse steps)) + + (dolist (step steps) + (when (phpinspect--pipeline-step-context step) + (setf (phpinspect--pipeline-step--context-var-name step) (gensym "ctx")) + (push `(,(phpinspect--pipeline-step--context-var-name step) + ,(phpinspect--pipeline-step-context step)) + let-vars))) + + (let ((queue-sym (gensym "queue")) + (end-queue-sym (gensym "end-queue")) + (ctx-sym (gensym "ctx")) + (recv-sym (gensym)) + (errors (gensym)) + (result-sym (gensym)) + (collecting-sym (gensym))) + `(progn + (when (eq main-thread (current-thread)) + (error "Pipelines should not run in the main thread")) + + (let* (,@let-vars + (,ctx-sym (phpinspect-make-pipeline-ctx)) + (,queue-sym (phpinspect-make-queue)) + (,end-queue-sym (phpinspect-make-queue)) + (,collecting-sym t) + ,recv-sym ,result-sym) + + ,(phpinspect--chain-pipeline-steps steps queue-sym end-queue-sym ctx-sym) + + (phpinspect-pipeline-generator ,queue-sym + ,@body) + + (while ,collecting-sym + (ignore-error 'phpinspect-pipeline-incoming + (progn + (phpinspect-pipeline--register-wakeup-function ,end-queue-sym) + (while (not (phpinspect-pipeline-end-p + (setq ,recv-sym (phpinspect-pipeline-receive ,end-queue-sym)))) + (setq ,result-sym (nconc ,result-sym (list ,recv-sym)))) + (setq ,collecting-sym nil)))) + + (phpinspect-pipeline-ctx-close ,ctx-sym) + ,result-sym))))) + +(defmacro phpinspect-pipeline-async (callback &rest parameters) + (declare (indent 1)) + `(make-thread + (lambda () + (condition-case err + (let ((result (phpinspect-pipeline ,@parameters))) + (funcall ,callback result nil)) + (t (funcall ,callback nil err)))) + "phpinspect-pipeline-async")) + +(define-inline phpinspect-pipeline-receive (queue) + (inline-letevals (queue) + `(or (phpinspect-queue-dequeue ,queue) + (let ((mx (make-mutex))) + (with-mutex mx + (condition-wait (make-condition-variable mx "phpinspect-pipeline-receive"))) + (phpinspect-queue-dequeue ,queue))))) + +(defun phpinspect-pipeline-step-name (name &optional suffix) + (intern (concat (symbol-name name) (if suffix (concat "-" suffix) "")))) + +(define-inline phpinspect-pipeline--register-wakeup-function (queue) + (inline-quote + (let ((thread (current-thread))) + (setf (phpinspect-queue-subscription ,queue) + (lambda () (thread-signal thread 'phpinspect-pipeline-incoming nil)))))) + +(defmacro phpinspect-define-pipeline-step (name function-name) + (unless (symbolp name) + (error "name must be a symbol")) + + (unless (symbolp function-name) + (error "function-name must be a symbol")) + + (let ((execute-function (phpinspect-pipeline-step-name name "execute")) + (constructor-function (phpinspect-pipeline-step-name name "create"))) + + `(progn + (define-inline ,execute-function (input &optional context) + (if context + (inline-quote + (catch 'phpinspect-pipeline-emit + ,(append `(,function-name) '(,context) '(,input)) + nil)) + (inline-quote + (catch 'phpinspect-pipeline-emit + ,(append `(,function-name) '(,input)) + nil)))) + + (define-inline ,constructor-function (queue consumer-queue pipeline-ctx &optional context) + (inline-letevals (queue consumer-queue context) + (let ((thread-name ,(concat "phpinspect-pipeline-" (symbol-name name))) + (statement (list (quote ,execute-function)))) + ,@(list + '(let ((incoming (gensym "incoming")) + (outgoing (gensym "outgoing")) + (inc-queue (gensym "queue")) + (out-queue (gensym "queue")) + (context-sym (gensym "context")) + (continue-running (gensym "continue-running")) + (original-thread (gensym "original-thread")) + (pctx-sym (gensym "pipeline-ctx")) + (incoming-end (gensym "incoming-end")) + (end (gensym "end"))) + + (setq statement (nconc statement (list incoming))) + (unless (and (inline-const-p context) (not (inline-const-val context))) + (setq statement (nconc statement (list context-sym)))) + + (inline-quote + (let ((,original-thread (current-thread)) + (,inc-queue ,queue) + (,out-queue ,consumer-queue) + (,context-sym ,context) + (,pctx-sym ,pipeline-ctx)) + (make-thread + (lambda () + (let ((,continue-running t) + ,incoming ,outgoing ,end ,incoming-end) + + (phpinspect-pipeline--register-wakeup-function ,inc-queue) + (while ,continue-running + (condition-case err + (progn + (phpinspect-pipeline-pause) + (catch 'phpinspect-pipeline-break + (while ,continue-running + (setq ,incoming (phpinspect-pipeline-receive ,inc-queue)) + (if (phpinspect-pipeline-end-p ,incoming) + (progn + (setq ,incoming-end ,incoming) + (when (phpinspect-pipeline-end-value ,incoming) + (progn + (setq ,incoming (phpinspect-pipeline-end-value ,incoming) + ,outgoing ,statement) + (phpinspect-queue-enqueue ,out-queue ,outgoing 'no-notify))) + + (setq ,end (phpinspect-make-pipeline-end :thread (current-thread))) + (phpinspect-pipeline-ctx-register-end ,pctx-sym ,end) + (setq ,continue-running nil) + (phpinspect-queue-enqueue ,out-queue ,end)) + + ;; Else + (setq ,outgoing ,statement) + (when (phpinspect-pipeline-end-p ,outgoing) + (setq ,end (phpinspect-make-pipeline-end :thread (current-thread))) + (phpinspect-pipeline-ctx-register-end ,pctx-sym ,end) + (setq ,continue-running nil)) + (phpinspect-queue-enqueue ,out-queue ,outgoing)) + + (when ,end + (throw 'phpinspect-pipeline-break nil))))) + (phpinspect-pipeline-incoming) + (t (message "Pipeline thread errored: %s" err) + (setq ,continue-running nil) + (phpinspect-pipeline-ctx-register-end + ,pctx-sym + (phpinspect-make-pipeline-end :thread (current-thread)))))))) + ,thread-name))))))))))) + +(provide 'phpinspect-pipeline) +;;; phpinspect-pipeline.el ends here diff --git a/phpinspect-queue.el b/phpinspect-queue.el index 89d851b9c7..c64b7d5dfb 100644 --- a/phpinspect-queue.el +++ b/phpinspect-queue.el @@ -1,3 +1,28 @@ +;;; phpinspect-queue.el --- PHP parsing and completion package -*- lexical-binding: t; -*- + +;; Copyright (C) 2021 Free Software Foundation, Inc + +;; Author: Hugo Thunnissen <de...@hugot.nl> +;; Keywords: php, languages, tools, convenience +;; Version: 0 + +;; This program is free software; you can redistribute it and/or modify +;; it under the terms of the GNU General Public License as published by +;; the Free Software Foundation, either version 3 of the License, or +;; (at your option) any later version. + +;; This program is distributed in the hope that it will be useful, +;; but WITHOUT ANY WARRANTY; without even the implied warranty of +;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +;; GNU General Public License for more details. + +;; You should have received a copy of the GNU General Public License +;; along with this program. If not, see <https://www.gnu.org/licenses/>. + +;;; Commentary: + +;;; Code: + (cl-defstruct (phpinspect-queue (:constructor phpinspect-make-queue-generated)) @@ -11,7 +36,6 @@ "The last item in the queue") (subscription nil :type function - :read-only t :documentation "A function that should be called when items are enqueued.")) @@ -42,7 +66,7 @@ (cl-defmethod phpinspect-queue-last ((queue phpinspect-queue)) (or (phpinspect-queue--last queue) (phpinspect-queue--first queue))) -(cl-defmethod phpinspect-queue-enqueue ((queue phpinspect-queue) value) +(cl-defmethod phpinspect-queue-enqueue ((queue phpinspect-queue) value &optional no-notify) "Add VALUE to the end of the queue that ITEM is part of." (let ((last (phpinspect-queue-last queue)) (new-item (phpinspect-make-queue-item :value value))) @@ -51,7 +75,8 @@ (setf (phpinspect-queue-item-next last) new-item) (setf (phpinspect-queue-item-previous new-item) last)) (setf (phpinspect-queue--last queue) new-item)) - (when (phpinspect-queue-subscription queue) + + (when (and (not no-notify) (phpinspect-queue-subscription queue)) (funcall (phpinspect-queue-subscription queue)))) (cl-defmethod phpinspect-queue-dequeue ((queue phpinspect-queue)) @@ -103,3 +128,4 @@ BODY can be any form." (phpinspect-queue-enqueue queue value))) (provide 'phpinspect-queue) +;;; phpinspect-queue.el ends here diff --git a/phpinspect-worker.el b/phpinspect-worker.el index 1283256f21..8b5cd22f1f 100644 --- a/phpinspect-worker.el +++ b/phpinspect-worker.el @@ -122,7 +122,7 @@ already present in the queue." PAUSE-TIME must be the idle time that the thread should pause for. MX must be a mutex CONTINUE must be a condition-variable" - (phpinspect--log "Worker thead is paused for %d seconds" pause-time) + (phpinspect--log "Thread '%s' is paused for %d seconds" (thread-name (current-thread)) pause-time) (run-with-idle-timer pause-time nil @@ -176,7 +176,7 @@ CONTINUE must be a condition-variable" ;; current buffer. Otherwise, the buffer associated with this thread ;; will be unkillable while the thread is running. (with-temp-buffer - (make-thread (phpinspect-worker-make-thread-function worker))))))) + (make-thread (phpinspect-worker-make-thread-function worker) "phpinspect-worker")))))) (cl-defmethod phpinspect-worker-start ((worker phpinspect-dynamic-worker)) (phpinspect-worker-start (phpinspect-resolve-dynamic-worker worker)))