branch: externals/llm commit 8a384aa073a568ec276a066b8b58c9f13c217dc4 Author: Roman Scherer <ro...@burningswell.com> Commit: Roman Scherer <ro...@burningswell.com>
Run handler code via a timer --- llm-request-plz.el | 7 +-- plz-event-source.el | 9 ++- plz-media-type.el | 168 ++++++++++++++++++++++++++++------------------------ plz.el | 14 ++++- 4 files changed, 112 insertions(+), 86 deletions(-) diff --git a/llm-request-plz.el b/llm-request-plz.el index 7224e6dced..feae69f049 100644 --- a/llm-request-plz.el +++ b/llm-request-plz.el @@ -102,12 +102,7 @@ TIMEOUT is the number of seconds to wait for a response." (defun llm-request-plz--handle-error (error on-error) "Handle the ERROR with the ON-ERROR callback." - (cond ((plz-media-type-filter-error-p error) - (let ((cause (plz-media-type-filter-error-cause error)) - (response (plz-error-response error))) - (funcall on-error 'error - (format "Error with cause: %s, response: %s" cause response)))) - ((plz-error-curl-error error) + (cond ((plz-error-curl-error error) (let ((curl-error (plz-error-curl-error error))) (funcall on-error 'error (format "curl error code %d: %s" diff --git a/plz-event-source.el b/plz-event-source.el index e0a4b83a9b..942661f1a5 100644 --- a/plz-event-source.el +++ b/plz-event-source.el @@ -290,7 +290,14 @@ (with-slots (handlers) source (dolist (pair handlers) (when (equal (car pair) (oref event type)) - (funcall (cdr pair) source event))))) + (let ((timer (timer-create))) + (timer-set-time timer (current-time)) + (timer-set-function timer + (lambda (handler source event) + (with-temp-buffer + (funcall handler source event))) + (list (cdr pair) source event)) + (timer-activate timer)))))) (defun plz-event-source-dispatch-events (source events) "Dispatch the EVENTS to the listeners of event SOURCE." diff --git a/plz-media-type.el b/plz-media-type.el index b350a8f641..07ce937447 100644 --- a/plz-media-type.el +++ b/plz-media-type.el @@ -44,13 +44,6 @@ (require 'eieio) (require 'plz) -(define-error 'plz-media-type-filter-error - "plz-media-type: Error in process filter" - 'plz-error) - -(cl-defstruct (plz-media-type-filter-error (:include plz-error)) - cause) - (defclass plz-media-type () ((coding-system :documentation "The coding system to use for the media type." @@ -166,7 +159,43 @@ an alist of parameters." (defvar-local plz-media-type--response nil "The response of the process buffer.") -(defun plz-media-type-process-filter (process media-types chunk) +(defun plz-media-type--schedule (handler messages) + "Schedule MESSAGES to be processed with the HANDLER on a timer." + (cl-loop with time = (current-time) + for msg = (pop messages) while msg + do (let ((timer (timer-create))) + (timer-set-time timer time) + (timer-set-function timer + (lambda (handler msg) + (with-temp-buffer (funcall handler msg))) + (list handler msg)) + (timer-activate timer)))) + +(defun plz-media-type--parse-headers () + "Parse the HTTP response headers in the current buffer." + (forward-line 1) + (let ((limit (save-excursion + (re-search-forward plz-http-end-of-headers-regexp nil) + (point)))) + (cl-loop while (re-search-forward (rx bol (group (1+ (not (in ":")))) ":" (1+ blank) + (group (1+ (not (in "\r\n"))))) + limit t) + collect (cons (intern (downcase (match-string 1))) (match-string 2))))) + +(cl-defun plz-media-type--parse-response () + "Parse the response in the current buffer." + (when (re-search-forward plz-http-end-of-headers-regexp nil t) + (let ((end-of-headers (point))) + (goto-char (point-min)) + (when (looking-at plz-http-response-status-line-regexp) + (prog1 (make-plz-response + :version (string-to-number (match-string 1)) + :status (string-to-number (match-string 2)) + :headers (plz-media-type--parse-headers) + :body (buffer-substring end-of-headers (point-max))) + (goto-char end-of-headers)))))) + +(defun plz-media-type-process-filter (process media-types string) "The process filter that handles different content types. PROCESS is the process. @@ -174,37 +203,34 @@ PROCESS is the process. MEDIA-TYPES is an association list from media type to an instance of a content type class. -CHUNK is a part of the HTTP body." +STRING which is output just received from the process." (when (buffer-live-p (process-buffer process)) (with-current-buffer (process-buffer process) (let ((moving (= (point) (process-mark process)))) (if-let (media-type plz-media-type--current) (let ((response plz-media-type--response)) - (setf (plz-response-body response) chunk) + (setf (plz-response-body response) string) (plz-media-type-process media-type process response)) (progn (save-excursion (goto-char (process-mark process)) - (insert chunk) + (insert string) (set-marker (process-mark process) (point))) (goto-char (point-min)) - (when (re-search-forward plz-http-end-of-headers-regexp nil t) - (let ((body-start (point))) - (goto-char (point-min)) - (let* ((response (prog1 (plz--response) (widen))) - (media-type (plz-media-type-of-response media-types response))) - (setq-local plz-media-type--current media-type) - (setq-local plz-media-type--response - (make-plz-response - :headers (plz-response-headers response) - :status (plz-response-status response) - :version (plz-response-version response))) - (when-let (body (plz-response-body response)) - (when (> (length body) 0) - (setf (plz-response-body response) body) - (delete-region body-start (point-max)) - (set-marker (process-mark process) (point)) - (plz-media-type-process media-type process response)))))))) + (when-let (chunk (plz-media-type--parse-response)) + (delete-region (point) (point-max)) + (let ((media-type (plz-media-type-of-response media-types chunk))) + (setq-local plz-media-type--current media-type) + (setq-local plz-media-type--response + (make-plz-response + :headers (plz-response-headers chunk) + :status (plz-response-status chunk) + :version (plz-response-version chunk))) + (when-let (body (plz-response-body chunk)) + (when (> (length body) 0) + (setf (plz-response-body chunk) body) + (set-marker (process-mark process) (point)) + (plz-media-type-process media-type process chunk))))))) (when moving (goto-char (process-mark process))))))) @@ -339,24 +365,25 @@ will always be set to nil.") (defun plz-media-type:application/json-array--parse-stream (media-type) "Parse all lines of the newline delimited JSON MEDIA-TYPE in the PROCESS buffer." - (with-slots (handler) media-type + (let ((objects)) (unless plz-media-type--position (setq-local plz-media-type--position (point))) (goto-char plz-media-type--position) (when-let (result (plz-media-type:application/json-array--consume-next media-type)) (while result - (when (and (equal :array-element (car result)) - (functionp handler)) - (funcall handler (cdr result))) - (setq result (plz-media-type:application/json-array--consume-next media-type)))))) + (when (equal :array-element (car result)) + (push (cdr result) objects)) + (setq result (plz-media-type:application/json-array--consume-next media-type)))) + objects)) (cl-defmethod plz-media-type-process ((media-type plz-media-type:application/json-array) process chunk) "Process the CHUNK according to MEDIA-TYPE using PROCESS." - (ignore media-type) (cl-call-next-method media-type process chunk) - (plz-media-type:application/json-array--parse-stream media-type) - (set-marker (process-mark process) (point-max))) + (with-slots (handler) media-type + (let ((objects (plz-media-type:application/json-array--parse-stream media-type))) + (set-marker (process-mark process) (point-max)) + (plz-media-type--schedule handler objects)))) (cl-defmethod plz-media-type-then ((media-type plz-media-type:application/json-array) response) @@ -396,21 +423,24 @@ will always be set to nil.") (defun plz-media-type:application/x-ndjson--parse-stream (media-type) "Parse all lines of the newline delimited JSON MEDIA-TYPE in the PROCESS buffer." (with-slots (handler) media-type - (unless plz-media-type--position - (setq-local plz-media-type--position (point))) - (goto-char plz-media-type--position) - (when-let (object (plz-media-type:application/x-ndjson--parse-line media-type)) - (while object - (setq-local plz-media-type--position (point)) - (when (functionp handler) - (funcall handler object)) - (setq object (plz-media-type:application/x-ndjson--parse-line media-type)))))) + (let (objects) + (unless plz-media-type--position + (setq-local plz-media-type--position (point))) + (goto-char plz-media-type--position) + (when-let (object (plz-media-type:application/x-ndjson--parse-line media-type)) + (while object + (setq-local plz-media-type--position (point)) + (push object objects) + (setq object (plz-media-type:application/x-ndjson--parse-line media-type)))) + objects))) (cl-defmethod plz-media-type-process ((media-type plz-media-type:application/x-ndjson) process chunk) "Process the CHUNK according to MEDIA-TYPE using PROCESS." (cl-call-next-method media-type process chunk) - (plz-media-type:application/x-ndjson--parse-stream media-type)) + (with-slots (handler) media-type + (let ((objects (plz-media-type:application/x-ndjson--parse-stream media-type))) + (plz-media-type--schedule handler objects)))) (cl-defmethod plz-media-type-then ((media-type plz-media-type:application/x-ndjson) response) @@ -490,15 +520,18 @@ parsing the HTTP response body with the (defun plz-media-type--handle-sync-error (error media-types) "Handle the synchronous ERROR using MEDIA-TYPES." (cond - ((plz-media-type-filter-error-p error) - (signal 'plz-media-type-filter-error - (list (plz-media-type-filter-error-message error) - (plz-media-type-filter-error-response error) - (plz-media-type-filter-error-cause error)))) ((eq 'plz-http-error (car error)) (plz-media-type--handle-sync-http-error error media-types)) (t (signal (car error) (cdr error))))) +(defun plz-media-type--handle-sync-response (buffer) + "Handle a successful synchronous response in BUFFER." + (unwind-protect + (with-current-buffer buffer + (plz-media-type-then plz-media-type--current plz-media-type--response)) + (when (buffer-live-p buffer) + (kill-buffer buffer)))) + (cl-defun plz-media-type-request (method url @@ -616,7 +649,7 @@ not. (if-let (media-types (pcase as (`(media-types ,media-types) media-types))) - (let ((buffer) (filter-error)) + (let ((buffer)) (condition-case error (let* ((plz-curl-default-args (cons "--no-buffer" plz-curl-default-args)) (result (plz method url @@ -628,10 +661,9 @@ not. :else (lambda (error) (setq buffer (current-buffer)) (when (or (functionp else) (symbolp else)) - (funcall else (or filter-error - (plz-media-type-else - plz-media-type--current - error))))) + (funcall else (plz-media-type-else + plz-media-type--current + error)))) :finally (lambda () (unwind-protect (when (functionp finally) @@ -640,19 +672,8 @@ not. (kill-buffer buffer)))) :headers headers :noquery noquery - :process-filter (lambda (process chunk) - (condition-case cause - (plz-media-type-process-filter process media-types chunk) - (error - (let ((buffer (process-buffer process))) - (setq filter-error - (make-plz-media-type-filter-error - :cause cause - :message (format "error in process filter: %S" cause) - :response (when (buffer-live-p buffer) - (with-current-buffer buffer - plz-media-type--response)))) - (delete-process process))))) + :filter (lambda (process chunk) + (plz-media-type-process-filter process media-types chunk)) :timeout timeout :then (if (symbolp then) then @@ -663,17 +684,12 @@ not. plz-media-type--current plz-media-type--response)))))))) (cond ((bufferp result) - (unwind-protect - (with-current-buffer result - (plz-media-type-then plz-media-type--current plz-media-type--response)) - (when (buffer-live-p result) - (kill-buffer result)))) + (plz-media-type--handle-sync-response result)) ((processp result) result) (t (user-error "Unexpected response: %s" result)))) ;; TODO: How to kill the buffer for sync requests that raise an error? - (plz-error - (plz-media-type--handle-sync-error (or filter-error error) media-types)))) + (plz-error (plz-media-type--handle-sync-error error media-types)))) (apply #'plz (append (list method url) rest)))) ;;;; Footer diff --git a/plz.el b/plz.el index 3a9271bca6..5b5605bb59 100644 --- a/plz.el +++ b/plz.el @@ -254,7 +254,7 @@ connection phase and waiting to receive the response (the ;;;;; Public -(cl-defun plz (method url &rest rest &key headers body else finally noquery process-filter +(cl-defun plz (method url &rest rest &key headers body else filter finally noquery (as 'string) (then 'sync) (body-type 'text) (decode t decode-s) (connect-timeout plz-connect-timeout) (timeout plz-timeout)) @@ -330,6 +330,15 @@ from a host, respectively. NOQUERY is passed to `make-process', which see. +FILTER is an optional function to be used as the process filter +for the curl process. It can be used to handle HTTP responses in +a streaming way. The function must accept 2 arguments, the +process object running curl, and a string which is output +received from the process. The default process filter inserts +the output of the process into the process buffer. The provided +FILTER function should at least insert output up to the HTTP body +into the process buffer. + \(To silence checkdoc, we mention the internal argument REST.)" ;; FIXME(v0.8): Remove the note about error changes from the docstring. ;; FIXME(v0.8): Update error signals in docstring. @@ -404,7 +413,7 @@ NOQUERY is passed to `make-process', which see. :coding 'binary :command (append (list plz-curl-program) curl-command-line-args) :connection-type 'pipe - :filter process-filter + :filter filter :sentinel #'plz--sentinel :stderr stderr-process :noquery noquery)) @@ -755,7 +764,6 @@ argument passed to `plz--sentinel', which see." (pcase-exhaustive status ((or 0 "finished\n") ;; Curl exited normally: check HTTP status code. - (widen) (goto-char (point-min)) (plz--skip-proxy-headers) (while (plz--skip-redirect-headers))