Hello Guix!

One of the main sources of slowness when talking to a remote daemon, as
with GUIX_DAEMON_SOCKET=guix://…, is the large number of RPCs, which
translate into lots of network round trips:

--8<---------------cut here---------------start------------->8---
$ GUIX_PROFILING=rpc ./pre-inst-env guix build inkscape -d --no-grafts
/gnu/store/iymxyy5sn0qrkivppl6fn0javnmr3nss-inkscape-0.92.1.drv
Remote procedure call summary: 1006 RPCs
  built-in-builders              ...     1
  add-to-store                   ...   136
  add-text-to-store              ...   869
--8<---------------cut here---------------end--------------->8---

In this example we’re making ~1,000 round trips; not good!

Before changing the protocol, one idea that came to mind is “RPC
pipelining”: send as many RPC requests as possible at once, then read all
the corresponding responses.

It turns out to require a small change in the daemon, but the patch
below demonstrates the idea: the client buffers all ‘add-text-to-store’
RPCs and writes them all at once when another RPC is made (because other
RPCs, which are not buffered, might depend on the effect of those
‘add-text-to-store’ RPCs) or when the connection is closed.  In practice,
on the example above, it manages to buffer all 869 RPCs and send them all
at once.

To estimate the effectiveness of this approach, I introduced a delay on
the loopback device with tc-netem(8) and measured the execution time (the
first run uses pipelining, the second doesn’t):

--8<---------------cut here---------------start------------->8---
$ sudo tc qdisc add dev lo root netem delay 150ms
$ time GUIX_DAEMON_SOCKET=guix://localhost ./pre-inst-env guix build inkscape -d --no-grafts
accepted connection from 127.0.0.1
/gnu/store/iymxyy5sn0qrkivppl6fn0javnmr3nss-inkscape-0.92.1.drv

;;; (flush-pending-rpcs 869)

real    0m47.796s
user    0m1.307s
sys     0m0.056s
$ time GUIX_DAEMON_SOCKET=guix://localhost guix build inkscape -d --no-grafts
accepted connection from 127.0.0.1
/gnu/store/iymxyy5sn0qrkivppl6fn0javnmr3nss-inkscape-0.92.1.drv

real    5m7.226s
user    0m1.392s
sys     0m0.056s
$ sudo tc qdisc del dev lo root
--8<---------------cut here---------------end--------------->8---

So the wall-clock time is divided by about 6 thanks to ‘add-text-to-store’
pipelining, but it’s still pretty high due to the 136 ‘add-to-store’
RPCs, which are still *not* pipelined.

It’s less clear what to do with these.  Buffering them would require
clients to compute the store file names of the files passed to
‘add-to-store’, which involves hashing the files themselves; that can be
quite costly, and redundant with what the daemon will eventually do
anyway.  The CPU cost might be offset when latency is high, but not when
latency is low.

Anyway, food for thought!

For now, if those using Guix on clusters are willing to test the patch
below (note that you need to run the patched guix-daemon as well), I’d
be interested in seeing how representative the above test is!

Ludo’.

diff --git a/guix/store.scm b/guix/store.scm
index b15da5485..1ba22cf2d 100644
--- a/guix/store.scm
+++ b/guix/store.scm
@@ -40,6 +40,7 @@
   #:use-module (ice-9 regex)
   #:use-module (ice-9 vlist)
   #:use-module (ice-9 popen)
+  #:use-module (ice-9 format)
   #:use-module (web uri)
   #:export (%daemon-socket-uri
             %gc-roots-directory
@@ -322,7 +323,7 @@
 
 (define-record-type <nix-server>
   (%make-nix-server socket major minor
-                    buffer flush
+                    buffer flush pending-rpcs
                     ats-cache atts-cache)
   nix-server?
   (socket nix-server-socket)
@@ -332,6 +333,10 @@
   (buffer nix-server-output-port)                 ;output port
   (flush  nix-server-flush-output)                ;thunk
 
+  ;; List of pending 'add-text-to-store' RPC arguments.
+  (pending-rpcs nix-server-pending-rpcs
+                set-nix-server-pending-rpcs!)
+
   ;; Caches.  We keep them per-connection, because store paths build
   ;; during the session are temporary GC roots kept for the duration of
   ;; the session.
@@ -509,7 +514,7 @@ for this connection will be pinned.  Return a server object."
                       (let ((conn (%make-nix-server port
                                                     (protocol-major v)
                                                     (protocol-minor v)
-                                                    output flush
+                                                    output flush '()
                                                     (make-hash-table 100)
                                                     (make-hash-table 100))))
                         (let loop ((done? (process-stderr conn)))
@@ -521,8 +526,17 @@ for this connection will be pinned.  Return a server object."
   (force-output (nix-server-output-port server))
   ((nix-server-flush-output server)))
 
+(define (flush-pending-rpcs server)
+  (let ((len (length (nix-server-pending-rpcs server))))
+    (when (> len 0)
+      (pk 'flush-pending-rpcs len)
+      (add-data-to-store/multiple server
+                                  (reverse (nix-server-pending-rpcs server)))
+      (set-nix-server-pending-rpcs! server '()))))
+
 (define (close-connection server)
   "Close the connection to SERVER."
+  (flush-pending-rpcs server)
   (close (nix-server-socket server)))
 
 (define-syntax-rule (with-store store exp ...)
@@ -811,6 +825,8 @@ bytevector) as its internal buffer, and a thunk to flush this output port."
        docstring
        (let* ((s (nix-server-socket server))
               (buffered (nix-server-output-port server)))
+         (unless (eq? 'name 'add-text-to-store)
+           (flush-pending-rpcs server))
          (record-operation 'name)
          (write-int (operation-id name) buffered)
          (write-arg type arg buffered)
@@ -822,6 +838,32 @@ bytevector) as its internal buffer, and a thunk to flush this output port."
            (or done? (loop (process-stderr server))))
          (values (read-arg return s) ...))))))
 
+
+(define-syntax operation-pipeline
+  (syntax-rules ()
+    "Define a client-side RPC stub for the given operation."
+    ((_ (name (type arg) ...) docstring return ...)
+     (lambda (server arg-list)
+       docstring
+       (let* ((s (nix-server-socket server))
+              (buffered (nix-server-output-port server)))
+         (record-operation 'name)
+         (for-each (match-lambda
+                     ((arg ...)
+                      (write-int (operation-id name) buffered)
+                      (write-arg type arg buffered)
+                      ...))
+                   arg-list)
+         (write-buffered-output server)
+
+         (map (lambda (_)
+                ;; Loop until the server is done sending error output.
+                (let loop ((done? (process-stderr server)))
+                  (or done? (loop (process-stderr server))))
+
+                (list (read-arg return s) ...))
+              arg-list))))))
+
 (define-syntax-rule (define-operation (name args ...)
                       docstring return ...)
   (define name
@@ -856,6 +898,20 @@ string).  Raise an error if no such path exists."
   "Return the info (hash, references, etc.) for PATH."
   path-info)
 
+(define add-data-to-store/multiple
+  (operation-pipeline
+   (add-text-to-store (string name) (bytevector text)
+                      (string-list references))
+   #f
+   store-path))
+
+(define (add-data-to-store/buffer server name bytes references)
+  (let ((pending (nix-server-pending-rpcs server)))
+    (set-nix-server-pending-rpcs! server
+                                  (cons (list name bytes references)
+                                        pending))
+    (text-output-path name bytes references)))
+
 (define add-data-to-store
   ;; A memoizing version of `add-to-store', to avoid repeated RPCs with
   ;; the very same arguments during a given session.
@@ -871,7 +927,7 @@ path."
       (let* ((args  `(,bytes ,name ,references))
              (cache (nix-server-add-text-to-store-cache server)))
         (or (hash-ref cache args)
-            (let ((path (add-text-to-store server name bytes references)))
+            (let ((path (add-data-to-store/buffer server name bytes references)))
               (hash-set! cache args path)
               path))))))
 
@@ -1485,6 +1541,16 @@ the derivation called NAME with hash HASH."
                   name
                   (string-append name "-" output))))
 
+(define (text-output-path name bv references)
+  "Return an output path for NAME, with contents BV and the given REFERENCES.
+The result is the same as that produced by 'add-data-to-store' with the same
+arguments."
+  (store-path (string-append "text"
+                             (string-join (sort references string<?)
+                                          ":" 'prefix))
+              (sha256 bv)
+              name))
+
 (define* (fixed-output-path name hash
                             #:key
                             (output "out")
diff --git a/nix/nix-daemon/nix-daemon.cc b/nix/nix-daemon/nix-daemon.cc
index 7d26b6135..72851e1cb 100644
--- a/nix/nix-daemon/nix-daemon.cc
+++ b/nix/nix-daemon/nix-daemon.cc
@@ -9,6 +9,7 @@
 #include "builtins.hh"
 
 #include <algorithm>
+#include <iostream>
 
 #include <cstring>
 #include <unistd.h>
@@ -79,8 +80,7 @@ static void tunnelStderr(const unsigned char * buf, size_t count)
 
 
 /* Return true if the remote side has closed its end of the
-   connection, false otherwise.  Should not be called on any socket on
-   which we expect input! */
+   connection, false otherwise.  */
 static bool isFarSideClosed(int socket)
 {
     struct timeval timeout;
@@ -95,17 +95,24 @@ static bool isFarSideClosed(int socket)
 
     if (!FD_ISSET(socket, &fds)) return false;
 
-    /* Destructive read to determine whether the select() marked the
-       socket as readable because there is actual input or because
-       we've reached EOF (i.e., a read of size 0 is available). */
-    char c;
-    int rd;
-    if ((rd = read(socket, &c, 1)) > 0)
-        throw Error("EOF expected (protocol error?)");
-    else if (rd == -1 && errno != ECONNRESET)
-        throw SysError("expected connection reset or EOF");
+    /* Check whether whether 'select' marked the socket as readable because
+       there is actual input or because we've reached EOF (i.e., a read of
+       size 0 is available).  */
+    char c; int rd;
+    do {
+	rd = recv(socket, &c, sizeof c, MSG_PEEK);
+    }
+    while (rd == -1 && errno == EINTR);
 
-    return true;
+    if (rd == -1) {
+	if (errno == ECONNRESET)
+	    /* Remote side is definitely closed.  */
+	    return true;
+	else
+	    throw SysError("while peeking client input");
+    }
+
+    return rd == 0;
 }
 
 
@@ -136,9 +143,6 @@ static void sigPollHandler(int sigNo)
                 const char * s = "SIGPOLL\n";
                 write(STDERR_FILENO, s, strlen(s));
             }
-        } else {
-            const char * s = "spurious SIGPOLL\n";
-            write(STDERR_FILENO, s, strlen(s));
         }
     }
     catch (Error & e) {
@@ -847,8 +851,8 @@ static void acceptConnection(int fdSocket)
 
 	  /* If we're on a TCP connection, disable Nagle's algorithm so that
 	     data is sent as soon as possible.  */
-	  (void) setsockopt(remote, SOL_TCP, TCP_NODELAY,
-			    &enabled, sizeof enabled);
+	  // (void) setsockopt(remote, SOL_TCP, TCP_NODELAY,
+	  // 		    &enabled, sizeof enabled);
 
 #if defined(TCP_QUICKACK)
 	  /* Enable TCP quick-ack if applicable; this might help a little.  */

Reply via email to