branch: externals/crdt commit 692d2cc4dba825429ce31158cc5219b58d25f6da Author: Qiantan Hong <qh...@mit.edu> Commit: Qiantan Hong <qh...@mit.edu>
initial work for lazily pulling buffer --- HACKING.org | 14 ++- crdt.el | 327 ++++++++++++++++++++++++++++++++++++------------------------ 2 files changed, 206 insertions(+), 135 deletions(-) diff --git a/HACKING.org b/HACKING.org index 374f5ad..59c3c04 100644 --- a/HACKING.org +++ b/HACKING.org @@ -50,7 +50,7 @@ be invoked with no argument in any CRDT shared buffer to access or modify that s - cursor :: body takes the form - =(site-id point-position-hint point-crdt-id mark-position-hint mark-crdt-id)= + =(buffer-name site-id point-position-hint point-crdt-id mark-position-hint mark-crdt-id)= =*-crdt-id= can be either a CRDT ID, or - =nil=, which means clear the point/mark - =""=, which means =(point-max)= @@ -85,11 +85,15 @@ be invoked with no argument in any CRDT shared buffer to access or modify that s - =content= is the string in the buffer - =crdt-id-list= is generated from =CRDT--DUMP-IDS= - - desync :: - Indicates that the server has stopped sharing a buffer. - body takes the form =(buffer-name)= + - add :: + Indicates that the server has started sharing some buffers. + body takes the form =buffer-name-list= + + - remove :: + Indicates that the server has stopped sharing some buffers. + body takes the form =buffer-name-list= - - resync :: + - get :: (TODO) Request the server to resend =sync= message for a buffer. body takes the form =(buffer-name)= diff --git a/crdt.el b/crdt.el index 95bc576..5f6d46e 100644 --- a/crdt.el +++ b/crdt.el @@ -1,4 +1,4 @@ -;;; crdt.el --- collaborative editing using Conflict-free Replicated Data Types +;;; crdt.el --- collaborative editing using Conflict-free Replicated Data Types -*- lexical-binding: t; -*- ;; ;; Copyright (C) 2020 Qiantan Hong ;; @@ -298,8 +298,6 @@ to avoid recusive calling of CRDT synchronization functions.") (crdt--defvar-session crdt--contact-table nil "A hash table that maps SITE-ID to CRDT--CONTACT-METADATAs.") -(defvar-local crdt--active-users-tmp nil) - (cl-defstruct (crdt--overlay-metadata (:constructor crdt--make-overlay-metadata (lamport-timestamp species front-advance rear-advance plist)) @@ -316,6 +314,8 @@ to avoid recusive calling of CRDT synchronization functions.") (crdt--defvar-permanent-local crdt--buffer-network-name) +(crdt--defvar-permanent-local crdt--buffer-sync-callback) + (crdt--defvar-session crdt--local-name nil) (crdt--defvar-session crdt--session-name nil) @@ -340,15 +340,29 @@ to avoid recusive calling of CRDT synchronization functions.") ;;; crdt-mode +(defun crdt--kill-buffer-hook () + (when crdt--buffer-network-name + (puthash crdt--buffer-network-name nil (crdt--buffer-table)) + (crdt--broadcast-maybe (crdt--format-message + `(cursor ,crdt--buffer-network-name + ,(crdt--local-id) nil nil nil nil))) + (when (eq (crdt--focused-buffer-name) crdt--buffer-network-name) + (crdt--broadcast-maybe (crdt--format-message + `(focus ,(crdt--local-id) nil))) + (setf (crdt--focused-buffer-name) nil)) + (crdt--refresh-users-maybe))) + (defun crdt--install-hooks () (add-hook 'after-change-functions #'crdt--after-change nil t) (add-hook 'before-change-functions #'crdt--before-change nil t) - (add-hook 'post-command-hook #'crdt--post-command nil t)) + (add-hook 'post-command-hook #'crdt--post-command nil t) + (add-hook 'kill-buffer-hook #'crdt--kill-buffer-hook nil t)) (defun crdt--uninstall-hooks () (remove-hook 'after-change-functions #'crdt--after-change t) (remove-hook 'before-change-functions #'crdt--before-change t) - (remove-hook 'post-command-hook #'crdt--post-command t)) + (remove-hook 'post-command-hook #'crdt--post-command t) + (remove-hook 'kill-buffer-hook #'crdt--kill-buffer-hook t)) (defsubst crdt--clear-pseudo-cursor-table () (when crdt--pseudo-cursor-table @@ -369,6 +383,46 @@ to avoid recusive calling of CRDT synchronization functions.") (crdt--clear-pseudo-cursor-table) (setq crdt--overlay-table nil))) +;;; Shared buffer utils +(defun crdt--with-buffer-name-make-body (name body fallback) + `(let (crdt-buffer) + (setq crdt-buffer (gethash ,name crdt--buffer-table)) + (if (and crdt-buffer (buffer-live-p crdt-buffer)) + (with-current-buffer crdt-buffer + ,@body) + (unless (process-contact crdt--network-process :server) + (setq crdt-buffer (generate-new-buffer (format "crdt - %s" ,name))) + (puthash ,name crdt-buffer crdt--buffer-table) + (let ((status-buffer (current-buffer))) + (with-current-buffer crdt-buffer + (setq crdt--buffer-network-name ,name) + (setq crdt--status-buffer status-buffer) + (crdt-mode) + ,(funcall fallback name body))))))) + +(defmacro crdt--with-buffer-name (name &rest body) + "Find CRDT shared buffer associated with NAME and evaluate BODY in it. +Must be called when CURRENT-BUFFER is a CRDT status buffer. +If such buffer doesn't exist yet, do nothing." + `(let (crdt-buffer) + (setq crdt-buffer (gethash ,name crdt--buffer-table)) + (when (and crdt-buffer (buffer-live-p crdt-buffer)) + (with-current-buffer crdt-buffer + (save-excursion + (widen) + ,@body))))) + +(defmacro crdt--with-buffer-name-pull (name &rest body) + (crdt--with-buffer-name-make-body + name body + (lambda (name body) + `(progn + (crdt--broadcast-maybe (crdt--format-message `(get ,,name))) + (let ((crdt--inhibit-update t)) + (insert "Synchronizing with server...")) + (setq crdt--buffer-sync-callback + (lambda () + ,@body)))))) ;;; Session menu (defun crdt--session-menu-goto () @@ -443,12 +497,19 @@ Otherwise use a dedicated buffer for displaying active users on CRDT-BUFFER." ;;; Buffer menu (defun crdt--buffer-menu-goto () (interactive) - (switch-to-buffer-other-window (tabulated-list-get-id))) + (let ((name (tabulated-list-get-id))) + (with-current-buffer crdt--status-buffer + (crdt--with-buffer-name-pull name + (switch-to-buffer-other-window (current-buffer)))))) (defun crdt--buffer-menu-kill () (interactive) - (with-current-buffer (tabulated-list-get-id) - (crdt-stop-share-buffer))) + (if (crdt--server-p) + (let ((name (tabulated-list-get-id))) + (with-current-buffer crdt--status-buffer + (crdt--with-buffer-name name + (crdt-stop-share-buffer)))) + (message "Only server can stop sharing a buffer."))) (defvar crdt-buffer-menu-mode-map (let ((map (make-sparse-keymap))) @@ -458,7 +519,7 @@ Otherwise use a dedicated buffer for displaying active users on CRDT-BUFFER." (define-derived-mode crdt-buffer-menu-mode tabulated-list-mode "CRDT User List" - (setq tabulated-list-format [("Buffer" 15 t) + (setq tabulated-list-format [("Local Buffer" 15 t) ("Network Name" 15 t) ("Users" 15 t)])) @@ -493,25 +554,22 @@ Otherwise use a dedicated buffer for displaying active users on CRDT-BUFFER." (with-current-buffer display-buffer (crdt-buffer-menu-mode) (setq tabulated-list-entries nil) - (maphash (lambda (k v) - (crdt--with-current-buffer - (gethash (crdt--contact-metadata-focused-buffer-name v) - (crdt--buffer-table)) - (push (crdt--contact-metadata-display-name v) crdt--active-users-tmp))) - (crdt--contact-table)) - (crdt--with-current-buffer - (gethash (crdt--focused-buffer-name) (crdt--buffer-table)) - (push (crdt--local-name) - crdt--active-users-tmp)) - (maphash (lambda (k v) - (push (list v (vector (buffer-name v) k - (mapconcat #'identity - (with-current-buffer v crdt--active-users-tmp) - ", "))) - tabulated-list-entries) - (with-current-buffer v - (setq crdt--active-users-tmp nil))) - (crdt--buffer-table)) + (let ((tmp-hashtable (make-hash-table :test 'equal))) + (maphash (lambda (k v) + (push (crdt--contact-metadata-display-name v) + (gethash (crdt--contact-metadata-focused-buffer-name v) + tmp-hashtable))) + (crdt--contact-table)) + (push (crdt--local-name) + (gethash (crdt--focused-buffer-name) + tmp-hashtable)) + (maphash (lambda (k v) + (push (list k (vector (if (and v (buffer-live-p v)) + (buffer-name v) + "--") + k (mapconcat #'identity (gethash k tmp-hashtable) ", "))) + tabulated-list-entries)) + (crdt--buffer-table))) (tabulated-list-init-header) (tabulated-list-print))) @@ -520,20 +578,24 @@ Otherwise use a dedicated buffer for displaying active users on CRDT-BUFFER." (crdt-refresh-buffers (crdt--buffer-menu-buffer))) (crdt--refresh-sessions-maybe)) - ;;; User menu (defun crdt--user-menu-goto () (interactive) - (let* ((site-id (tabulated-list-get-id)) - (focused-buffer - (with-current-buffer crdt--status-buffer - (gethash - (crdt--contact-metadata-focused-buffer-name - (gethash site-id crdt--contact-table)) - crdt--buffer-table)))) - (switch-to-buffer-other-window focused-buffer) - (when site-id - (goto-char (overlay-start (car (gethash site-id crdt--pseudo-cursor-table))))))) + (let ((site-id (tabulated-list-get-id))) + (if (eq site-id (crdt--local-id)) + (switch-to-buffer-other-window + (gethash (crdt--focused-buffer-name) (crdt--buffer-table))) + (unless + (cl-block nil + (let* ((metadata (or (gethash site-id (crdt--contact-table)) (cl-return))) + (buffer-name (or (crdt--contact-metadata-focused-buffer-name metadata) (cl-return)))) + (with-current-buffer crdt--status-buffer + (crdt--with-buffer-name-pull + buffer-name + (switch-to-buffer-other-window (current-buffer)) + (ignore-errors (goto-char (overlay-start (car (gethash site-id crdt--pseudo-cursor-table))))) + t)))) + (message "Doesn't have position information for this user yet."))))) (defvar crdt-user-menu-mode-map (let ((map (make-sparse-keymap))) @@ -790,7 +852,7 @@ Start the search from POS." (point-max))) (defun crdt--remote-cursor (site-id point-position-hint point-crdt-id mark-position-hint mark-crdt-id) - (when site-id + (when (and site-id (not (eq site-id (crdt--local-id)))) (let ((ov-pair (gethash site-id crdt--pseudo-cursor-table))) (if point-crdt-id (let* ((point (crdt--id-to-pos point-crdt-id point-position-hint)) @@ -955,7 +1017,27 @@ to server when WITHOUT is T." `(cursor ,crdt--buffer-network-name ,site-id ,point ,point-id-base64 ,mark ,mark-id-base64)))))) crdt--pseudo-cursor-table) - (process-send-string process (crdt--format-message (crdt--local-cursor nil))))) + (process-send-string process (crdt--format-message (crdt--local-cursor nil))) + + ;; synchronize tracked overlay + (maphash (lambda (k ov) + (let ((meta (overlay-get ov 'crdt-meta))) + (process-send-string + process + (crdt--format-message (crdt--overlay-add-message + (car k) (cdr k) + (crdt--overlay-metadata-species meta) + (crdt--overlay-metadata-front-advance meta) + (crdt--overlay-metadata-rear-advance meta) + (overlay-start ov) + (overlay-end ov)))) + (cl-loop for (prop value) on (crdt--overlay-metadata-plist meta) by #'cddr + do (process-send-string + process + (crdt--format-message `(overlay-put ,(car k) ,(cdr k) ,prop ,value)))))) + crdt--overlay-table) + + (process-send-string process (crdt--format-message `(ready ,crdt--buffer-network-name))))) (defun crdt--greet-client (process) (with-current-buffer (process-get process 'status-buffer) @@ -970,9 +1052,8 @@ to server when WITHOUT is T." `(login ,client-id ,crdt--session-name))) (cl-incf crdt--next-client-id)) - (maphash (lambda (k buffer) - (crdt--sync-buffer-to-client buffer process)) - crdt--buffer-table) + (process-send-string process (crdt--format-message + (cons 'add (hash-table-keys crdt--buffer-table)))) ;; synchronize contact (maphash (lambda (k v) (process-send-string @@ -991,48 +1072,7 @@ to server when WITHOUT is T." (let ((contact-message `(contact ,client-id ,(process-get process 'client-name) ,(process-contact process :host) ,(process-contact process :service)))) - (crdt-process-message contact-message process)) - ;; synchronize tracked overlay - (maphash (lambda (k buffer) - (with-current-buffer buffer - (maphash (lambda (k ov) - (let ((meta (overlay-get ov 'crdt-meta))) - (process-send-string - process - (crdt--format-message (crdt--overlay-add-message - (car k) (cdr k) - (crdt--overlay-metadata-species meta) - (crdt--overlay-metadata-front-advance meta) - (crdt--overlay-metadata-rear-advance meta) - (overlay-start ov) - (overlay-end ov)))) - (cl-loop for (prop value) on (crdt--overlay-metadata-plist meta) by #'cddr - do (process-send-string - process - (crdt--format-message `(overlay-put ,(car k) ,(cdr k) ,prop ,value)))))) - crdt--overlay-table))) - crdt--buffer-table)))) - -(defmacro crdt--with-buffer-name (name &rest body) - "Find CRDT shared buffer associated with NAME and evaluate BODY in it. -Must be called when CURRENT-BUFFER is a CRDT status buffer." - `(let (crdt-buffer) - (setq crdt-buffer (gethash ,name crdt--buffer-table)) - (if crdt-buffer - (with-current-buffer crdt-buffer - (save-excursion - (widen) - ,@body)) - (unless (process-contact crdt--network-process :server) - (setq crdt-buffer (generate-new-buffer (format "crdt - %s" ,name))) - (puthash ,name crdt-buffer crdt--buffer-table) - (with-current-buffer crdt-buffer - (setq crdt--buffer-network-name ,name) - (setq crdt--status-buffer (process-get process 'status-buffer)) - (crdt-mode) - (save-excursion - (widen) - ,@body)))))) + (crdt-process-message contact-message process))))) (cl-defgeneric crdt-process-message (message process)) @@ -1070,29 +1110,53 @@ Must be called when CURRENT-BUFFER is a CRDT status buffer." (defsubst crdt--server-p () (process-contact (crdt--network-process) :server)) +(cl-defmethod crdt-process-message ((message (head get)) process) + (cl-destructuring-bind (buffer-name) (cdr message) + (let ((buffer (gethash buffer-name crdt--buffer-table))) + (if (and buffer (buffer-live-p buffer)) + (crdt--sync-buffer-to-client buffer process) + (process-send-string process (crdt--format-message `(desync ,buffer-name))))))) + (cl-defmethod crdt-process-message ((message (head sync)) process) (unless (crdt--server-p) ; server shouldn't receive this (cl-destructuring-bind (buffer-name mode . ids) (cdr message) (crdt--with-buffer-name buffer-name - (erase-buffer) - (if (fboundp mode) - (unless (eq major-mode mode) - (funcall mode) ; trust your server... - (crdt-mode)) - (message "Server uses %s, but not available locally." mode)) - (crdt--load-ids ids))) + (let ((crdt--inhibit-update t)) + (erase-buffer) + (if (fboundp mode) + (unless (eq major-mode mode) + (funcall mode) ; trust your server... + (crdt-mode)) + (message "Server uses %s, but not available locally." mode)) + (crdt--load-ids ids)))) (crdt--refresh-buffers-maybe))) -(cl-defmethod crdt-process-message ((message (head desync)) process) - (cl-destructuring-bind (buffer-name) (cdr message) +(cl-defmethod crdt-process-message ((message (head ready)) process) + (unless (crdt--server-p) ; server shouldn't receive this + (cl-destructuring-bind (buffer-name) (cdr message) + (crdt--with-buffer-name + buffer-name + (when crdt--buffer-sync-callback + (funcall crdt--buffer-sync-callback) + (setq crdt--buffer-sync-callback nil)))))) + +(cl-defmethod crdt-process-message ((message (head add)) process) + (dolist (buffer-name (cdr message)) + (unless (gethash buffer-name crdt--buffer-table) + (puthash buffer-name nil crdt--buffer-table)))) + +(cl-defmethod crdt-process-message ((message (head remove)) process) + (dolist (buffer-name (cdr message)) (let ((buffer (gethash buffer-name crdt--buffer-table))) (when buffer - (with-current-buffer buffer - (crdt-mode 0) - (setq crdt--status-buffer nil)) - (remhash buffer-name crdt--buffer-table) - (message "Server stopped sharing %s." buffer-name)))) + (when (buffer-live-p buffer) + (with-current-buffer buffer + (crdt-mode 0) + (setq crdt--status-buffer nil))) + (remhash buffer-name crdt--buffer-table)))) + (message "Server stopped sharing %s." + (mapconcat #'identity (cdr message) ", ")) (crdt--broadcast-maybe (crdt--format-message message) (when process (process-get process 'client-id))) (crdt--refresh-buffers-maybe)) @@ -1136,9 +1200,9 @@ Must be called when CURRENT-BUFFER is a CRDT status buffer." (site-id buffer-name) (cdr message) (let ((existing-item (gethash site-id crdt--contact-table))) (setf (crdt--contact-metadata-focused-buffer-name existing-item) buffer-name)) - (when (and (= site-id 0) (not crdt--focused-buffer-name)) - (setq crdt--focused-buffer-name buffer-name) - (switch-to-buffer (gethash buffer-name crdt--buffer-table))) + ;; (when (and (= site-id 0) (not crdt--focused-buffer-name)) + ;; (setq crdt--focused-buffer-name buffer-name) + ;; (switch-to-buffer (gethash buffer-name crdt--buffer-table))) (crdt--refresh-users-maybe)) (crdt--broadcast-maybe (crdt--format-message message) (process-get process 'client-id))) @@ -1282,8 +1346,8 @@ If SESSION-NAME is empty, use the buffer name of the current buffer." (if (crdt--server-p) (let ((buffer-name crdt--buffer-network-name)) (with-current-buffer crdt--status-buffer - (let ((desync-message `(desync ,buffer-name))) - (crdt-process-message desync-message nil)))) + (let ((remove-message `(remove ,buffer-name))) + (crdt-process-message remove-message nil)))) (message "Only server can stop sharing a buffer.")) (message "Not a CRDT shared buffer."))) @@ -1342,9 +1406,11 @@ Disconnect if it's a client session, or stop serving if it's a server session." (kill-buffer (process-buffer client)))) (when crdt--user-menu-buffer (kill-buffer crdt--user-menu-buffer)) + (when crdt--buffer-menu-buffer + (kill-buffer crdt--buffer-menu-buffer)) (maphash (lambda (k v) - (when (buffer-live-p v) + (when (and v (buffer-live-p v)) (with-current-buffer v (setq crdt--status-buffer nil) (crdt-mode 0)))) @@ -1391,30 +1457,31 @@ Open a new buffer to display the shared content." port))) (unless name (setq name (crdt--read-name))) - (setq crdt--status-buffer - (with-current-buffer (generate-new-buffer "*crdt-client*") - (setq crdt--local-name name) - (condition-case err - (setq crdt--network-process - (make-network-process - :name "CRDT Client" - :buffer (current-buffer) - :host address - :family 'ipv4 - :service port - :filter #'crdt--network-filter - :sentinel #'crdt--client-process-sentinel - :plist `(status-buffer ,(current-buffer)))) - (t (kill-buffer (current-buffer)) - (signal (car err) (cdr err)))) - (setq crdt--session-name (format "%s:%s" address port)) - (push (current-buffer) crdt--session-list) - (setq crdt--local-clock 0) - (process-send-string crdt--network-process - (crdt--format-message `(hello ,name))) - (setq crdt--contact-table (make-hash-table :test 'equal)) - (setq crdt--buffer-table (make-hash-table :test 'equal)) - (setq crdt--status-buffer (current-buffer))))) + (crdt-list-buffer (with-current-buffer + (with-current-buffer (generate-new-buffer "*crdt-client*") + (setq crdt--local-name name) + (condition-case err + (setq crdt--network-process + (make-network-process + :name "CRDT Client" + :buffer (current-buffer) + :host address + :family 'ipv4 + :service port + :filter #'crdt--network-filter + :sentinel #'crdt--client-process-sentinel + :plist `(status-buffer ,(current-buffer)))) + (t (kill-buffer (current-buffer)) + (signal (car err) (cdr err)))) + (setq crdt--session-name (format "%s:%s" address port)) + (push (current-buffer) crdt--session-list) + (setq crdt--local-clock 0) + (process-send-string crdt--network-process + (crdt--format-message `(hello ,name))) + (setq crdt--contact-table (make-hash-table :test 'equal)) + (setq crdt--buffer-table (make-hash-table :test 'equal)) + (setq crdt--status-buffer (current-buffer))) + ))) (defun crdt-test-client () (interactive)