This is an automated email from the ASF dual-hosted git repository.

vatamane pushed a commit to branch parallel-preads
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 0a74021f1c72938b6a4b6bbc5fb7dae12c6d5c24
Author: Nick Vatamaniuc <[email protected]>
AuthorDate: Mon Jan 13 12:12:28 2025 -0500

    Implement parallel preads
    
    Let clients issue concurrent pread calls without blocking each other or
    having to wait for all the writes and fsync calls.
    
    Even though at the POSIX level pread calls are thread-safe [1], Erlang OTP 
file
    backend forces a single controlling process for raw file handles. So, all 
our
    reads were always funnelled through the couch_file gen_server, having to 
queue
    up behind potentially slower writes. In particular this is problematic with
    remote file systems, where fsyncs and writes may take a lot longer while 
preads
    can hit the cache and return quicker.
    
    Parallel pread calls are implemented via a NIF which copies the pread and 
file
    closing bits from OTP's prim_file NIF [2]. Access to the shared handle is
    controlled via RW locks similar to how emmap does it [3]. Multiple readers 
can
    "read" acquire the RW lock and issue pread calls in parallel on the same 
file
    descriptor. If a writer acquires it, all the readers will have to wait for 
it.
    This kind of synchronization is necessary to carefully manage the closing
    state.
    
    In order to keep things simple, the write path and the opening and handling
    of the main couch_file aren't affected. The parallel pread bypass is a purely
    opportunistic optimization when it's enabled; if not enabled, reads proceed
    as they always did - through the gen_server.
    
    The cost of enabling it is using at most one extra file descriptor reference
    obtained via the dup() [4] system call from the main couch_file handle. Unlike
    another, newly opened file "description", the new "descriptor" is just a
    reference pointing to the exact same file description in the kernel and 
sharing
    all the buffers, position, modes, etc, with the main couch_file. The reason 
we
    need a new dup()-ed file descriptor is to manage closing very carefully. 
Since
    on POSIX systems file descriptors are just integers, it's very easy to
    accidentally read from an already closed and re-opened (by something else) 
file
    descriptor. That's why there are locks and a whole new file descriptor which
    our NIF controls.
    
    Another alternative was to use the exact same file descriptor as the main 
file,
    and then, after every single pread validate that the data was read from the
    same file by calling fstat and matching major/minor/inode numbers. Then also
    hoping that a pread on any random pipe/socket/stdio handle will never cause 
any
    issue, block or just quickly return an error.
    
    So far I have only checked that the cluster starts up, reads and writes go
    through, and a quick sequential benchmark indicates that plain, sequential
    reads and writes haven't gotten worse; they all seem to have improved a bit:
    
    ```
    > fabric_bench:go(#{q=>1, n=>1, doc_size=>small, docs=>100000}).
     *** Parameters
     * batch_size       : 1000
     * doc_size         : small
     * docs             : 100000
     * individual_docs  : 1000
     * n                : 1
     * q                : 1
    
     *** Environment
     * Nodes        : 1
     * Bench ver.   : 1
     * N            : 1
     * Q            : 1
     * OS           : unix/linux
    ```
    
    Each case was run 5 times and the best rate in ops/sec was picked, so higher
    is better:
    
    ```
                                                    Default  CFile
    
    * Add 100000 docs, ok:100/accepted:0     (Hz):   16000    16000
    * Get random doc 100000X                 (Hz):    4900     5800
    * All docs                               (Hz):  120000   140000
    * All docs w/ include_docs               (Hz):   24000    31000
    * Changes                                (Hz):   49000    51000
    * Single doc updates 1000X               (Hz):     380      410
    ```
    
    [1] https://www.man7.org/linux/man-pages/man2/pread.2.html
    [2] 
https://github.com/erlang/otp/blob/maint-25/erts/emulator/nifs/unix/unix_prim_file.c
    [3] https://github.com/saleyn/emmap
    [4] https://www.man7.org/linux/man-pages/man2/dup.2.html
---
 src/couch/.gitignore                     |   1 +
 src/couch/priv/couch_cfile/couch_cfile.c | 436 +++++++++++++++++++++++++++++++
 src/couch/rebar.config.script            |   9 +-
 src/couch/src/couch_cfile.erl            | 245 +++++++++++++++++
 src/couch/src/couch_file.erl             |  94 +++++--
 5 files changed, 761 insertions(+), 24 deletions(-)

diff --git a/src/couch/.gitignore b/src/couch/.gitignore
index 861974adb..dd39c7ddf 100644
--- a/src/couch/.gitignore
+++ b/src/couch/.gitignore
@@ -5,6 +5,7 @@ ebin/
 priv/couch_js/config.h
 priv/couchjs
 priv/couchspawnkillable
+priv/couch_cfile/*.d
 priv/*.exp
 priv/*.lib
 priv/*.dll
diff --git a/src/couch/priv/couch_cfile/couch_cfile.c 
b/src/couch/priv/couch_cfile/couch_cfile.c
new file mode 100644
index 000000000..5bd4f8e79
--- /dev/null
+++ b/src/couch/priv/couch_cfile/couch_cfile.c
@@ -0,0 +1,436 @@
+// Licensed under the Apache License, Version 2.0 (the "License"); you may not
+// use this file except in compliance with the License. You may obtain a copy 
of
+// the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+// License for the specific language governing permissions and limitations 
under
+// the License.
+
+// erl_driver.h is for erl_errno_id(errno)
+#include "erl_driver.h"
+#include "erl_nif.h"
+
+#ifndef _WIN32
+
+#include <errno.h>
+#include <fcntl.h>
+#include <sys/stat.h>
+#include <unistd.h>
+#include <stdbool.h>
+
+#endif
+
+// Resource type for the handles created by dup_nif(); set up in load().
+static ErlNifResourceType* HANDLE_T;
+// Process that receives {close, Fd} messages from handle_dtor(). It is set in
+// load() from the load_info term.
+static ErlNifPid JANITOR_PID;
+
+static ERL_NIF_TERM ATOM_EOF;
+static ERL_NIF_TERM ATOM_ERROR;
+static ERL_NIF_TERM ATOM_OK;
+static ERL_NIF_TERM ATOM_CLOSE;
+
+typedef int posix_errno_t;
+
+// State behind a couch_cfile handle resource.
+//
+// NOTE(review): `bool` comes from <stdbool.h>, which is only included when
+// _WIN32 is not defined, so this struct may not compile on Windows.
+typedef struct {
+    // Guards `closed` and `fd`: pread_nif/info_nif/eof_nif take it for
+    // reading, close_nif and handle_dtor take it for writing.
+    ErlNifRWLock *lock;
+    // Set once the dup()-ed fd has been closed or handed off for closing.
+    bool closed;
+    // The dup()-ed descriptor this handle reads from.
+    int fd;
+    // The descriptor `fd` was dup()-ed from; reported back by info_nif().
+    int old_fd;
+} handle_t;
+
+// Wrap `res` in the {ok, Res} success tuple.
+static ERL_NIF_TERM ok_tup(ErlNifEnv* env, ERL_NIF_TERM res)
+{
+    ERL_NIF_TERM tagged = enif_make_tuple2(env, ATOM_OK, res);
+    return tagged;
+}
+
+static ERL_NIF_TERM err_tup(ErlNifEnv *env, posix_errno_t posix_errno) {
+    ERL_NIF_TERM error = enif_make_atom(env, erl_errno_id(posix_errno));
+    return enif_make_tuple2(env, ATOM_ERROR, error);
+}
+
+// This is a copy from OTP. For the sake of keeping the same behavior we use
+// the same logic. If it changes in OTP, consider updating this as well.
+//
+// Advance the I/O vector array past `shift` bytes that were just transferred.
+// Fully consumed entries are skipped, and *iovlen is reduced accordingly. A
+// partially consumed entry has its base and length adjusted in place.
+//
+static void shift_iov(SysIOVec **iov, int *iovlen, ssize_t shift) {
+    SysIOVec *head_vec = (*iov);
+
+    while(shift > 0) {
+
+        if(shift < head_vec->iov_len) {
+            // Partially consumed entry: move its start forward.
+            head_vec->iov_base = (char*)head_vec->iov_base + shift;
+            head_vec->iov_len -= shift;
+            break;
+        } else {
+            // Entry fully consumed: move on to the next one.
+            shift -= head_vec->iov_len;
+            head_vec++;
+        }
+    }
+
+    (*iovlen) -= head_vec - (*iov);
+    (*iov) = head_vec;
+}
+
+// Copied from OTP to keep the same logic. Some differences are:
+//   - Use the plain pread only instead of preadv, as we don't load more than one
+//     scatter element anyway and preadv was causing compiler API warnings on MacOS
+//     due to slight type mismatches (const vs non-const iov vectors)
+//   - Instead of a handle struct, pass in fd and res_errno as separate args
+//
+// Reads into the iov entries starting at `offset`. It retries on EINTR and
+// keeps reading after short reads until the vectors are full or pread()
+// returns 0 (end of file) or an error.
+//
+// Returns:
+//   - the total number of bytes read, once the vectors are full or end of file
+//     is hit after some data was read;
+//   - 0 if nothing could be read;
+//   - a negative value if a pread() call fails, even after a partial read.
+//
+// *res_errno is always written from errno.
+//
+// NOTE(review): `offset` is a `long`. Where long is 32 bits, offsets beyond
+// 2 GiB are not representable.
+//
+static long efile_preadv(int fd, long offset, SysIOVec *iov, int iovlen, posix_errno_t* res_errno) {
+    unsigned long bytes_read;
+    long result;
+
+    bytes_read = 0;
+
+    do {
+        if(iovlen < 1) {
+            // All vectors filled.
+            result = 0;
+            break;
+        }
+        result = pread(fd, iov->iov_base, iov->iov_len, offset);
+        if(result > 0) {
+            shift_iov(&iov, &iovlen, result);
+            bytes_read += result;
+            offset += result;
+        }
+    } while(result > 0 || (result < 0 && errno == EINTR));
+
+    *res_errno = errno;
+
+    if(result == 0 && bytes_read > 0) {
+        return bytes_read;
+    }
+
+    return result;
+}
+
+// Look up the handle_t behind a couch_cfile resource term. Returns non-zero
+// and sets *h on success; returns 0 (leaving *h untouched) otherwise.
+static int get_handle(ErlNifEnv *env, ERL_NIF_TERM arg, handle_t** h)
+{
+    void *obj = NULL;
+    int found = enif_get_resource(env, arg, HANDLE_T, &obj);
+    if (found) {
+        *h = (handle_t*) obj;
+    }
+    return found;
+}
+
+#define LOCK        enif_rwlock_rwlock(hdl->lock)
+#define UNLOCK      enif_rwlock_rwunlock(hdl->lock)
+#define READ_LOCK   enif_rwlock_rlock(hdl->lock)
+#define READ_UNLOCK enif_rwlock_runlock(hdl->lock)
+
+
+// Duplicate a file descriptor. This doesn't open a new "file description"
+// entry, just creates another lightweight file "descriptor" id for it.
+// Position, buffers, permission bits are all shared with the main description
+// entry. See https://www.man7.org/linux/man-pages/man2/dup.2.html for details
+//
+// Arguments:
+//   fd : integer file descriptor to duplicate
+//
+// Returns {ok, Handle} with a new handle resource. Returns {error, einval}
+// for a negative or non-integer fd, or {error, Errno} if creating the lock,
+// dup() or the resource allocation fails. Raises badarg for a non-number.
+//
+static ERL_NIF_TERM dup_nif(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[])
+{
+#ifndef _WIN32
+    int fd, newfd;
+    posix_errno_t dup_errno;
+    handle_t* hdl;
+    ErlNifRWLock *lock;
+    ERL_NIF_TERM res;
+
+    if (argc != 1 || !enif_is_number(env, argv[0])) {
+        return enif_make_badarg(env);
+    }
+    if (!enif_get_int(env, argv[0], &fd) || fd < 0) {
+        return err_tup(env, EINVAL);
+    }
+
+    lock = enif_rwlock_create("couch_cfile:rwlock");
+    if (!lock) {
+        return err_tup(env, ENOMEM);
+    }
+
+    newfd = dup(fd);
+    if (newfd < 0) {
+        // Capture errno before the cleanup call below can overwrite it.
+        dup_errno = errno;
+        enif_rwlock_destroy(lock);
+        return err_tup(env, dup_errno);
+    }
+
+    hdl = (handle_t*) enif_alloc_resource(HANDLE_T, sizeof(handle_t));
+    if (!hdl) {
+        close(newfd);
+        enif_rwlock_destroy(lock);
+        return err_tup(env, ENOMEM);
+    }
+
+    hdl->lock = lock;
+    hdl->fd = newfd;
+    hdl->old_fd = fd;
+    hdl->closed = false;
+
+    // The returned term holds a reference to the resource. Release ours so
+    // that handle_dtor() runs once that term is garbage collected.
+    res = enif_make_resource(env, hdl);
+    enif_release_resource(hdl);
+    return ok_tup(env, res);
+#else
+    return err_tup(env, ENOTSUP);
+#endif
+}
+
+// Close the dup()-ed fd of a handle created by dup_nif().
+//
+// Returns ok. Returns {error, ebadf} if the handle was already closed, or
+// {error, Errno} if close() itself fails. Raises badarg for a non-handle.
+//
+static ERL_NIF_TERM close_nif(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[])
+{
+#ifndef _WIN32
+    handle_t* hdl;
+    int rc;
+    posix_errno_t close_errno = 0;
+
+    if (argc != 1 || !get_handle(env, argv[0], &hdl)) {
+        return enif_make_badarg(env);
+    }
+
+    // ------ Critical section start ------
+    // The exclusive lock waits for in-flight preads to finish. Preads that
+    // start afterwards see `closed` and fail with ebadf.
+    LOCK;
+    if (hdl->closed) {
+        UNLOCK;
+        return err_tup(env, EBADF);
+    }
+    // Mark the handle closed even if close() fails. POSIX leaves the fd's
+    // state unspecified after a failed close(), so it must not be retried:
+    // the number may already belong to another file.
+    hdl->closed = true;
+    rc = close(hdl->fd);
+    if (rc < 0) {
+        // Capture errno before unlocking, since the unlock may overwrite it.
+        close_errno = errno;
+    }
+    UNLOCK;
+    // ------ Critical section end ------
+
+    if (rc < 0) {
+        return err_tup(env, close_errno);
+    }
+    return ATOM_OK;
+#else
+    return err_tup(env, ENOTSUP);
+#endif
+}
+
+// Close a raw integer descriptor. This should be called from the janitor
+// process only, for fds that handle_dtor() hands over in {close, Fd}
+// messages.
+//
+// Returns ok. Returns {error, einval} for a negative or non-integer number,
+// or {error, Errno} if close() fails. Raises badarg for a non-number.
+//
+static ERL_NIF_TERM close_fd_nif(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[])
+{
+#ifndef _WIN32
+    int target_fd;
+
+    if (argc != 1 || !enif_is_number(env, argv[0])) {
+        return enif_make_badarg(env);
+    }
+    if (!enif_get_int(env, argv[0], &target_fd) || target_fd < 0) {
+        return err_tup(env, EINVAL);
+    }
+
+    if (close(target_fd) == 0) {
+        return ATOM_OK;
+    }
+    return err_tup(env, errno);
+#else
+    return err_tup(env, ENOTSUP);
+#endif
+}
+
+
+// Follows pread_nif_impl from prim_file_nif.c
+//
+// Arguments:
+//   ref : handle
+//   offset : position
+//   block_size : how many bytes to read
+//
+// Returns:
+//   - {ok, Binary}, which is shorter than block_size if end of file was hit
+//     mid-read;
+//   - the bare atom `eof` if nothing could be read at offset, the same shape
+//     OTP's file:pread/3 uses;
+//   - {error, ebadf} if the handle was closed;
+//   - {error, einval} for negative or non-integer numbers;
+//   - {error, Errno} if pread() fails.
+//
+static ERL_NIF_TERM pread_nif(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
+{
+#ifndef _WIN32
+    handle_t* hdl;
+    // enif_get_int64() writes an ErlNifSInt64. Use that type directly rather
+    // than assuming it matches `long`.
+    ErlNifSInt64 offset, block_size;
+    long bytes_read;
+    SysIOVec io_vec[1];
+    int pread_errno = 0;
+    ErlNifBinary result;
+
+    if (argc != 3
+        || !get_handle(env, argv[0], &hdl)
+        || !enif_is_number(env, argv[1])
+        || !enif_is_number(env, argv[2])
+    ) {
+        return enif_make_badarg(env);
+    }
+
+    if (!enif_get_int64(env, argv[1], &offset)
+        || !enif_get_int64(env, argv[2], &block_size)
+        || offset < 0
+        || block_size < 0
+    ) {
+        return err_tup(env, EINVAL);
+    }
+
+    if (!enif_alloc_binary((size_t) block_size, &result)) {
+        return err_tup(env, ENOMEM);
+    }
+
+    io_vec[0].iov_base = (char *)result.data;
+    io_vec[0].iov_len = result.size;
+
+    // ------ Critical section start ------
+    // The read lock is shared, so preads on the same handle run in parallel.
+    // close_nif() and handle_dtor() take it exclusively.
+    READ_LOCK;
+    if (hdl->closed) {
+        READ_UNLOCK;
+        enif_release_binary(&result);
+        return err_tup(env, EBADF);
+    }
+    bytes_read = efile_preadv(hdl->fd, offset, io_vec, 1, &pread_errno);
+    READ_UNLOCK;
+    // ------ Critical section end ------
+
+    if (bytes_read < 0) {
+        enif_release_binary(&result);
+        return err_tup(env, pread_errno);
+    }
+    if (bytes_read == 0) {
+        enif_release_binary(&result);
+        // Like OTP's file:pread/3, report end of file as a bare `eof` rather
+        // than {ok, eof}.
+        return ATOM_EOF;
+    }
+    // Short read (end of file reached mid-block): shrink the binary to fit.
+    if (bytes_read < block_size && !enif_realloc_binary(&result, bytes_read)) {
+        enif_release_binary(&result);
+        return err_tup(env, ENOMEM);
+    }
+    return ok_tup(env, enif_make_binary(env, &result));
+#else
+    return err_tup(env, ENOTSUP);
+#endif
+}
+
+// Return {ok, {Fd, OldFd}} for a handle. Fd is the dup()-ed descriptor and
+// OldFd is the descriptor it was dup()-ed from. Returns {error, ebadf} once
+// the handle has been closed, and raises badarg for a non-handle.
+//
+static ERL_NIF_TERM info_nif(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
+{
+#ifndef _WIN32
+    handle_t* hdl;
+    bool is_closed;
+    int cur_fd = -1;
+    int src_fd = -1;
+
+    if (argc != 1 || !get_handle(env, argv[0], &hdl)) {
+        return enif_make_badarg(env);
+    }
+
+    // Take a consistent snapshot of the handle fields under the read lock.
+    READ_LOCK;
+    is_closed = hdl->closed;
+    if (!is_closed) {
+        cur_fd = hdl->fd;
+        src_fd = hdl->old_fd;
+    }
+    READ_UNLOCK;
+
+    if (is_closed) {
+        return err_tup(env, EBADF);
+    }
+    return ok_tup(env, enif_make_tuple2(env,
+        enif_make_int(env, cur_fd),
+        enif_make_int(env, src_fd)
+    ));
+#else
+    return err_tup(env, ENOTSUP);
+#endif
+}
+
+// Return the eof marker: {ok, Size}, with the file size taken from fstat() on
+// the dup()-ed descriptor. Returns {error, ebadf} if the handle was closed,
+// or {error, Errno} if fstat() fails.
+//
+static ERL_NIF_TERM eof_nif(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
+{
+#ifndef _WIN32
+    handle_t* hdl;
+    struct stat data;
+    int rc;
+    posix_errno_t stat_errno = 0;
+
+    if (argc != 1 || !get_handle(env, argv[0], &hdl)) {
+        return enif_make_badarg(env);
+    }
+
+    // ------ Critical section start ------
+    READ_LOCK;
+    if (hdl->closed) {
+        READ_UNLOCK;
+        return err_tup(env, EBADF);
+    }
+    rc = fstat(hdl->fd, &data);
+    if (rc < 0) {
+        // Capture errno before unlocking, since the unlock may overwrite it.
+        stat_errno = errno;
+    }
+    READ_UNLOCK;
+    // ------ Critical section end ------
+
+    if (rc < 0) {
+        return err_tup(env, stat_errno);
+    }
+    return ok_tup(env, enif_make_int64(env, data.st_size));
+#else
+    return err_tup(env, ENOTSUP);
+#endif
+}
+
+static void handle_dtor(ErlNifEnv* env, void *obj)
+{
+    handle_t* hdl = (handle_t*) obj;
+    bool delay_close = false;
+    int fd;
+    ERL_NIF_TERM msg;
+
+    // ------ Critical section start ------
+    LOCK;
+    if (!hdl->closed) {
+        hdl->closed = true;
+        delay_close = true;
+    }
+    fd = hdl->fd;
+    UNLOCK;
+    // ------ Critical section end ------
+
+    enif_rwlock_destroy(hdl->lock);
+
+    // We cannot block the main scheduler in GC to close fds. NFS remote files
+    // could be delayed indefinitely so we, at least, want to be make sure it
+    // happens on a dirty scheduler. See OTP prim_file_nif as example of this
+    // pattern. There the delayed closing is kicked of from a monitor instead.
+    if(delay_close) {
+        msg = enif_make_tuple2(env, ATOM_CLOSE, enif_make_int(env, fd));
+        enif_send(env, &JANITOR_PID, NULL, msg);
+    }
+}
+
+// NIF load callback; upgrade() calls it as well. `pid` is the load_info
+// term: the janitor pid passed by couch_cfile:init/0.
+// Returns 0 on success, -1 on failure.
+//
+static int load(ErlNifEnv* env, void** priv_data, ERL_NIF_TERM pid)
+{
+    ErlNifResourceFlags flags;
+
+    if (!enif_get_local_pid(env, pid, &JANITOR_PID)) {
+        return -1;
+    }
+
+    // ERL_NIF_RT_CREATE on its own fails when the resource type already
+    // exists, which is the case when this runs from upgrade() during a module
+    // reload. Adding ERL_NIF_RT_TAKEOVER lets the new code adopt the existing
+    // type and its live handles. On a first load it still creates the type.
+    flags = (ErlNifResourceFlags)(ERL_NIF_RT_CREATE | ERL_NIF_RT_TAKEOVER);
+    HANDLE_T = enif_open_resource_type(env, NULL, "couch_cfile:hdl", handle_dtor, flags, NULL);
+    if (!HANDLE_T) {
+        return -1;
+    }
+
+    ATOM_EOF    = enif_make_atom(env, "eof");
+    ATOM_ERROR  = enif_make_atom(env, "error");
+    ATOM_OK     = enif_make_atom(env, "ok");
+    ATOM_CLOSE  = enif_make_atom(env, "close");
+
+    *priv_data = NULL;
+
+    return 0;
+}
+
+// NIF unload callback. There is nothing to free: load() leaves *priv_data
+// as NULL.
+static void unload(ErlNifEnv *env, void* priv_data)
+{
+}
+
+// NIF upgrade callback. Upgrades are only accepted when both the old and the
+// new priv_data are NULL, which is what load() leaves behind. It then reruns
+// load() with the new load_info. Returns 0 on success, -1 otherwise.
+//
+static int upgrade(ErlNifEnv *env, void** priv_data, void** old_priv_data, ERL_NIF_TERM load_info)
+{
+    if (*old_priv_data != NULL || *priv_data != NULL) {
+        return -1;
+    }
+    return load(env, priv_data, load_info) ? -1 : 0;
+}
+
+// Exported NIFs. Everything that makes file system calls runs on dirty IO
+// schedulers. info_nif only copies two ints out of the handle, so it runs on
+// a normal scheduler.
+// NOTE(review): info_nif still takes the read lock. It can therefore wait on
+// a close_nif() that holds the write lock across close().
+static ErlNifFunc funcs[] = {
+    {"dup_nif",      1, dup_nif,      ERL_NIF_DIRTY_JOB_IO_BOUND},
+    {"close_nif",    1, close_nif,    ERL_NIF_DIRTY_JOB_IO_BOUND},
+    {"close_fd_nif", 1, close_fd_nif, ERL_NIF_DIRTY_JOB_IO_BOUND},
+    {"pread_nif",    3, pread_nif,    ERL_NIF_DIRTY_JOB_IO_BOUND},
+    {"eof_nif",      1, eof_nif,      ERL_NIF_DIRTY_JOB_IO_BOUND},
+    {"info_nif",     1, info_nif}
+};
+
+ERL_NIF_INIT(couch_cfile, funcs, load, NULL, upgrade, unload);
diff --git a/src/couch/rebar.config.script b/src/couch/rebar.config.script
index e26b6b608..aad8d3a5e 100644
--- a/src/couch/rebar.config.script
+++ b/src/couch/rebar.config.script
@@ -295,6 +295,11 @@ IcuWinEnv = [{"CFLAGS", "$DRV_CFLAGS /DXP_WIN"},
 ComparePath = "priv/couch_ejson_compare.so".
 CompareSrc = ["priv/couch_ejson_compare/*.c"].
 
+% Build settings for the couch_cfile NIF (priv/couch_cfile.so). -Werror makes
+% any compiler warning fail the build. Swap in the commented-out line for a
+% debug build.
+CouchCFileEnv = [{"CFLAGS", "$CFLAGS -Wall -Werror -DNDEBUG -O3"}].
+%CouchCFileEnv = [{"CFLAGS", "$CFLAGS -Wall -Werror -DDEBUG -O"}].
+CouchCFilePath = "priv/couch_cfile.so".
+CouchCFileSrc = ["priv/couch_cfile/*.c"].
+
 SpidermonkeySpecs = case WithSpidermonkey of
     true -> [{".*", CouchJSPath, CouchJSSrc, [{env, CouchJSEnv}]}];
     false -> []
@@ -305,7 +310,9 @@ PortSpecs = SpidermonkeySpecs ++ [
         {"darwin", ComparePath, CompareSrc, [{env, IcuEnv ++ IcuDarwinEnv}]},
         {"linux",  ComparePath, CompareSrc, [{env, IcuEnv}]},
         {"bsd",   ComparePath, CompareSrc, [{env, IcuEnv ++ IcuBsdEnv}]},
-        {"win32",  ComparePath, CompareSrc, [{env, IcuWinEnv}]}
+        {"win32",  ComparePath, CompareSrc, [{env, IcuWinEnv}]},
+        {"(linux|bsd|darwin)", CouchCFilePath, CouchCFileSrc, [{env, 
CouchCFileEnv}]},
+        {"win32",  CouchCFilePath, CouchCFileSrc, []}
 ].
 
 %% hack required until switch to enc/rebar3
diff --git a/src/couch/src/couch_cfile.erl b/src/couch/src/couch_cfile.erl
new file mode 100644
index 000000000..3a4dae4ce
--- /dev/null
+++ b/src/couch/src/couch_cfile.erl
@@ -0,0 +1,245 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+% This module dup(licate)s raw file handles to create lightweight fds, exposing
+% only read operations, which are not tied to a single process. They also let
+% clients completely bypass the couch_file gen_server message queue to do reads.
+%
+% At the POSIX API level pread() functions are thread-safe so calls can be
+% issued in parallel by multiple threads. See these links to find out more
+% about dup() and pread():
+%
+%  - https://www.man7.org/linux/man-pages/man2/dup.2.html
+%  - https://www.man7.org/linux/man-pages/man2/pread.2.html
+%
+% Specifically note about pread:
+%  > The pread() and pwrite() system calls are especially useful in
+%    multithreaded applications. They allow multiple threads to
+%    perform I/O on the same file descriptor without being affected by
+%    changes to the file offset by other threads
+
+
+-module(couch_cfile).
+
+-export([
+    dup/1,
+    pread/2,
+    pread/3,
+    close/1,
+    position/2,
+    fd/1
+]).
+
+% Internal exports
+%
+-export([
+    janitor/0
+]).
+
+-on_load(init/0).
+
+-nifs([
+    dup_nif/1,
+    close_nif/1,
+    close_fd_nif/1,
+    pread_nif/3,
+    info_nif/1,
+    eof_nif/1
+]).
+
+-include_lib("kernel/include/file.hrl").
+
+% Dup(licate) an open file handle to obtain a read-only handle. The read-only
+% handle can be used by any process, not just the single controlling process.
+%
+% The input handle can be an already existing read-only handle created by a
+% previous dup/1 call. In that case a new fd is created, which can be closed
+% independently. It will still point to the same underlying file
+% "description" object in the kernel. A plain integer fd is accepted too.
+%
+% Handle objects returned from dup/1 follow the standard Erlang/OTP
+% #file_descriptor{} protocol, so they can in fact be transparently used by
+% the regular `file` module.
+%
+% Returns {ok, Handle} or {error, Reason}.
+%
+dup(#file_descriptor{module = Mod} = Fd) when
+    Mod =:= prim_file orelse Mod =:= ?MODULE
+->
+    case fd(Fd) of
+        {error, _} = FdError -> FdError;
+        {ok, FdInt} -> dup_and_wrap(Fd, FdInt)
+    end;
+dup(Fd) when is_integer(Fd) ->
+    dup_and_wrap(Fd, Fd);
+dup(_) ->
+    {error, einval}.
+
+% dup() the integer descriptor FdInt, then wrap the result with make_handle/2,
+% which sanity-checks it against Orig.
+dup_and_wrap(Orig, FdInt) ->
+    case dup_nif(FdInt) of
+        {ok, Ref} -> make_handle(Orig, Ref);
+        {error, _} = DupError -> DupError
+    end.
+
+% Close a handle returned by dup/1. Returns ok or {error, Reason}; any term
+% that is not a couch_cfile handle gives {error, einval}.
+close(#file_descriptor{module = ?MODULE, data = Data}) ->
+    #{handle := Handle} = Data,
+    Result = close_nif(Handle),
+    Result;
+close(_NotACFile) ->
+    {error, einval}.
+
+% Read Len bytes at Pos, following the file:pread/3 contract:
+%   - {ok, Data}, where Data may be shorter than Len near the end of the file;
+%   - eof, if nothing could be read at Pos;
+%   - {error, Reason}.
+%
+pread(#file_descriptor{module = ?MODULE, data = Data}, Pos, Len) ->
+    #{handle := Ref} = Data,
+    case pread_nif(Ref, Pos, Len) of
+        % The NIF may report end of file as {ok, eof}. Normalize it to the
+        % bare eof that file:pread/3 callers expect.
+        {ok, eof} -> eof;
+        Result -> Result
+    end;
+pread(_, _, _) ->
+    {error, einval}.
+
+% Read a list of {Pos, Len} locations in order. Returns {ok, Results}, where
+% each element is a binary or eof, or the first {error, Reason} encountered.
+pread(#file_descriptor{module = ?MODULE, data = Data}, LocNums) ->
+    #{handle := Handle} = Data,
+    pread_list(Handle, LocNums, []);
+pread(_Fd, _LocNums) ->
+    {error, einval}.
+
+% Only position(Fd, eof) is supported, as a read of the file size metadata.
+% It returns {ok, Size} without moving any file position. Anything else
+% returns {error, einval}.
+%
+position(#file_descriptor{module = ?MODULE, data = Data}, eof) ->
+    #{handle := Handle} = Data,
+    eof_nif(Handle);
+position(_Fd, _Location) ->
+    {error, einval}.
+
+% Return {ok, IntegerFd} for one of:
+%   - a plain integer;
+%   - a prim_file raw handle;
+%   - a couch_cfile handle (its own dup()-ed fd).
+% Returns {error, Reason} otherwise.
+%
+fd(Fd) when is_integer(Fd) ->
+    {ok, Fd};
+fd(#file_descriptor{module = prim_file} = RawFd) ->
+    case prim_file:get_handle(RawFd) of
+        {error, _} = Error -> Error;
+        <<FdInt:32/native-signed-integer>> -> {ok, FdInt}
+    end;
+fd(#file_descriptor{module = ?MODULE, data = Data}) ->
+    #{handle := Handle} = Data,
+    case info_nif(Handle) of
+        {error, _} = Error -> Error;
+        {ok, {FdInt, _OrigFd}} -> {ok, FdInt}
+    end;
+fd(_) ->
+    {error, einval}.
+
+% Internal helpers
+
+% Wrap a freshly dup()-ed NIF resource Ref in a #file_descriptor{}. Orig is
+% what it was dup()-ed from: a prim_file/couch_cfile handle or an integer fd.
+% Returns {ok, Handle}, or {error, einval} (after closing Ref) if the sanity
+% check fails.
+make_handle(#file_descriptor{module = Mod} = Orig, Ref) when
+    Mod =:= prim_file orelse Mod =:= ?MODULE
+->
+    checked_handle(Orig, Ref);
+make_handle(Orig, Ref) when is_integer(Orig) ->
+    checked_handle(Orig, Ref).
+
+% Build the handle and keep it only if sanity_check/2 accepts it.
+checked_handle(Orig, Ref) ->
+    Dup = #file_descriptor{module = ?MODULE, data = #{handle => Ref}},
+    case sanity_check(Orig, Dup) of
+        false ->
+            close_nif(Ref),
+            {error, einval};
+        true ->
+            {ok, Dup}
+    end.
+
+% Check that the dup()-ed handle Dup records Orig's current fd as its origin.
+% Returns true or false; it never raises for a bad or closed descriptor.
+sanity_check(Orig, #file_descriptor{} = Dup) ->
+    % Compare original and dup-ed saved origin fds. This should run after the
+    % dup call. Not sure how this could even fail (somehow the raw fd crashed
+    % and re-opened by someone else right before dup-ing?). Another note is
+    % that re-fetching both descriptors implicitly is asserting they haven't
+    % closed in the meantime.
+    case fd(Orig) of
+        {ok, Fd1} when is_integer(Fd1), Fd1 > -1 ->
+            #file_descriptor{module = ?MODULE, data = Data} = Dup,
+            #{handle := Ref} = Data,
+            case info_nif(Ref) of
+                {ok, {_, Fd2}} when is_integer(Fd2), Fd2 > -1 -> Fd1 =:= Fd2;
+                {ok, {_, _}} -> false;
+                {error, _} -> false
+            end;
+        {ok, _} ->
+            % An invalid (e.g. negative) original fd fails the check instead
+            % of crashing with case_clause.
+            false;
+        {error, _} ->
+            false
+    end.
+
+% This is copied from the OTP pread logic. Read each {Pos, Len} in order,
+% collecting the results (binaries or eof), and stop at the first error.
+%
+pread_list(_Fd, [], Acc) ->
+    {ok, lists:reverse(Acc)};
+pread_list(Fd, [{Pos, Len} | Rest], Acc) ->
+    case pread_nif(Fd, Pos, Len) of
+        {error, _} = Error ->
+            Error;
+        eof ->
+            pread_list(Fd, Rest, [eof | Acc]);
+        {ok, Bin} ->
+            pread_list(Fd, Rest, [Bin | Acc])
+    end.
+
+% Nif init (the on_load hook). Loads the couch_cfile shared library and
+% passes it the janitor pid as load_info.
+%
+% NOTE(review): ?MODULE is not an application name, so code:priv_dir/1
+% presumably always returns an error here. The priv path is then derived from
+% the location of this module's beam file.
+init() ->
+    LibDir =
+        case code:priv_dir(?MODULE) of
+            {error, _} ->
+                BeamDir = filename:dirname(code:which(?MODULE)),
+                filename:join(filename:dirname(BeamDir), "priv");
+            Dir ->
+                Dir
+        end,
+    LibPath = filename:join(LibDir, "couch_cfile"),
+    JanitorPid = spawn_janitor(),
+    erlang:load_nif(LibPath, JanitorPid).
+
+% Spawn a janitor process to run all the delayed close calls on the dirty IO
+% schedulers. This is what OTP does, so we stick to the same pattern in order
+% to avoid re-inventing the wheel. A janitor that is already registered under
+% ?MODULE is reused.
+%
+spawn_janitor() ->
+    Registered = whereis(?MODULE),
+    if
+        is_pid(Registered) ->
+            Registered;
+        true ->
+            NewPid = spawn(?MODULE, janitor, []),
+            register(?MODULE, NewPid),
+            NewPid
+    end.
+
+janitor() ->
+    % We want to crash the node if this process dies. This approximates the
+    % behavior of erts_internal:spawn_system_process/3.
+    InitPid = whereis(init),
+    link(InitPid),
+    loop().
+
+% Close each fd sent as {close, Fd} by the NIF resource destructor.
+% close_fd_nif/1 runs on a dirty IO scheduler. Close errors are ignored, and
+% any other message is discarded.
+loop() ->
+    receive
+        {close, Fd} when is_integer(Fd) ->
+            _ = close_fd_nif(Fd),
+            loop();
+        _Ignored ->
+            loop()
+    end.
+
+% Fallback stubs: erlang:load_nif/2 in init/0 replaces them with the C
+% implementations. Calling one while the library is not loaded raises
+% nif_not_loaded.
+dup_nif(_) ->
+    erlang:nif_error(nif_not_loaded).
+
+close_nif(_) ->
+    erlang:nif_error(nif_not_loaded).
+
+close_fd_nif(_) ->
+    erlang:nif_error(nif_not_loaded).
+
+pread_nif(_, _, _) ->
+    erlang:nif_error(nif_not_loaded).
+
+eof_nif(_) ->
+    erlang:nif_error(nif_not_loaded).
+
+info_nif(_) ->
+    erlang:nif_error(nif_not_loaded).
diff --git a/src/couch/src/couch_file.erl b/src/couch/src/couch_file.erl
index 8c7370688..fa6e60561 100644
--- a/src/couch/src/couch_file.erl
+++ b/src/couch/src/couch_file.erl
@@ -24,6 +24,9 @@
 -define(WRITE_XXHASH_CHECKSUMS_KEY, {?MODULE, write_xxhash_checksums}).
 -define(WRITE_XXHASH_CHECKSUMS_DEFAULT, false).
 
+-define(USE_CFILE_DEFAULT, false).
+-define(CFILE_HANDLE, cfile_handle).
+
 -type block_id() :: non_neg_integer().
 -type location() :: non_neg_integer().
 -type header_size() :: non_neg_integer().
@@ -191,11 +194,17 @@ pread_binaries(Fd, PosList) ->
     ZipFun = fun(Pos, {IoList, Checksum}) ->
         verify_checksum(Fd, Pos, iolist_to_binary(IoList), Checksum, false)
     end,
-    case ioq:call(Fd, {pread_iolists, PosList}, erlang:get(io_priority)) of
+    case pread_iolists(Fd, PosList) of
         {ok, DataAndChecksums} -> {ok, lists:zipwith(ZipFun, PosList, 
DataAndChecksums)};
         Error -> Error
     end.
 
+% Read the terms at PosList. If the couch_file process Fd has published a
+% couch_cfile handle, read directly from the calling process. Otherwise send
+% the request through ioq to the gen_server.
+pread_iolists(Fd, PosList) ->
+    case get_cfile(Fd) of
+        #file{} = CFile ->
+            pread(CFile, PosList);
+        undefined ->
+            Priority = erlang:get(io_priority),
+            ioq:call(Fd, {pread_iolists, PosList}, Priority)
+    end.
+
 append_terms(Fd, Terms) ->
     append_terms(Fd, Terms, []).
 
@@ -434,7 +443,7 @@ init({Filepath, Options, ReturnPid, Ref}) ->
                                     ok = fsync(Fd),
                                     maybe_track_open_os_files(Options),
                                     erlang:send_after(?INITIAL_WAIT, self(), 
maybe_close),
-                                    {ok, #file{fd = Fd, is_sys = IsSys}};
+                                    {ok, dup(#file{fd = Fd, is_sys = IsSys})};
                                 false ->
                                     ok = file:close(Fd),
                                     init_status_error(ReturnPid, Ref, {error, 
eexist})
@@ -442,7 +451,7 @@ init({Filepath, Options, ReturnPid, Ref}) ->
                         false ->
                             maybe_track_open_os_files(Options),
                             erlang:send_after(?INITIAL_WAIT, self(), 
maybe_close),
-                            {ok, #file{fd = Fd, is_sys = IsSys}}
+                            {ok, dup(#file{fd = Fd, is_sys = IsSys})}
                     end;
                 Error ->
                     init_status_error(ReturnPid, Ref, Error)
@@ -459,7 +468,7 @@ init({Filepath, Options, ReturnPid, Ref}) ->
                             maybe_track_open_os_files(Options),
                             {ok, Eof} = file:position(Fd, eof),
                             erlang:send_after(?INITIAL_WAIT, self(), 
maybe_close),
-                            {ok, #file{fd = Fd, eof = Eof, is_sys = IsSys}};
+                            {ok, dup(#file{fd = Fd, eof = Eof, is_sys = 
IsSys})};
                         Error ->
                             init_status_error(ReturnPid, Ref, Error)
                     end;
@@ -494,25 +503,7 @@ handle_call(close, _From, #file{fd = Fd} = File) ->
     {stop, normal, file:close(Fd), File#file{fd = nil}};
 handle_call({pread_iolists, PosL}, _From, File) ->
     update_read_timestamp(),
-    LocNums1 = [{Pos, 4} || Pos <- PosL],
-    DataSizes = read_multi_raw_iolists_int(File, LocNums1),
-    MapFun = fun({LenIoList, NextPos}) ->
-        case iolist_to_binary(LenIoList) of
-            % a checksum-prefixed term
-            <<1:1/integer, Len:31/integer>> -> {NextPos, Len + 16};
-            <<0:1/integer, Len:31/integer>> -> {NextPos, Len}
-        end
-    end,
-    LocNums2 = lists:map(MapFun, DataSizes),
-    Resps = read_multi_raw_iolists_int(File, LocNums2),
-    ZipFun = fun({LenIoList, _}, {FullIoList, _}) ->
-        case iolist_to_binary(LenIoList) of
-            <<1:1/integer, _:31/integer>> -> extract_checksum(FullIoList);
-            <<0:1/integer, _:31/integer>> -> {FullIoList, <<>>}
-        end
-    end,
-    Extracted = lists:zipwith(ZipFun, DataSizes, Resps),
-    {reply, {ok, Extracted}, File};
+    {reply, pread(File, PosL), File};
 handle_call(bytes, _From, #file{fd = Fd} = File) ->
     {reply, file:position(Fd, eof), File};
 handle_call({set_db_pid, Pid}, _From, #file{db_monitor = OldRef} = File) ->
@@ -597,6 +588,27 @@ format_status(_Opt, [PDict, #file{} = File]) ->
     {_Fd, FilePath} = couch_util:get_value(couch_file_fd, PDict),
     [{data, [{"State", File}, {"InitialFilePath", FilePath}]}].
 
+% Read the terms stored at each position in PosL using the #file{} record
+% File. Each term starts with a 4-byte header:
+%   - the top bit flags a checksum-prefixed term (16 extra bytes);
+%   - the low 31 bits hold the length.
+% Returns {ok, [{IoList, Checksum}]}, with Checksum =:= <<>> for terms that
+% have no checksum.
+%
+pread(#file{} = File, PosL) ->
+    SizeReads = read_multi_raw_iolists_int(File, [{Pos, 4} || Pos <- PosL]),
+    TermLocNums = lists:map(
+        fun({SizeIoList, TermPos}) ->
+            case iolist_to_binary(SizeIoList) of
+                % a checksum-prefixed term
+                <<1:1/integer, Len:31/integer>> -> {TermPos, Len + 16};
+                <<0:1/integer, Len:31/integer>> -> {TermPos, Len}
+            end
+        end,
+        SizeReads
+    ),
+    TermReads = read_multi_raw_iolists_int(File, TermLocNums),
+    Extracted = lists:zipwith(
+        fun({SizeIoList, _}, {TermIoList, _}) ->
+            case iolist_to_binary(SizeIoList) of
+                <<1:1/integer, _:31/integer>> -> extract_checksum(TermIoList);
+                <<0:1/integer, _:31/integer>> -> {TermIoList, <<>>}
+            end
+        end,
+        SizeReads,
+        TermReads
+    ),
+    {ok, Extracted}.
+
 fsync(Fd) ->
     T0 = erlang:monotonic_time(),
     % We do not rely on mtime/atime for our safety/consitency so we can use
@@ -911,6 +923,42 @@ generate_xxhash_checksums() ->
             Val
     end.
 
+% couch_cfile handling
+%
+% Return the couch_cfile-backed #file{} that the couch_file process FdPid
+% stored in its process dictionary (see dup/1), or undefined if there is none.
+%
+% The result is cached in the caller's process dictionary under
+% {?CFILE_HANDLE, FdPid}. A cached entry is dropped once FdPid is no longer
+% alive. Otherwise the cache would keep the dup()-ed descriptor, and with it
+% the underlying file, open for as long as the caller lives. Reads would also
+% keep succeeding for a couch_file process that is already gone. Callers then
+% fall back to the regular ioq:call/3 path.
+%
+% NOTE(review): is_process_alive/1 only accepts local pids, so FdPid is
+% assumed to be a local couch_file pid.
+%
+get_cfile(FdPid) when is_pid(FdPid) ->
+    case get({?CFILE_HANDLE, FdPid}) of
+        undefined ->
+            case couch_util:process_dict_get(FdPid, ?CFILE_HANDLE) of
+                undefined ->
+                    undefined;
+                #file{} = CFile ->
+                    put({?CFILE_HANDLE, FdPid}, CFile),
+                    CFile
+            end;
+        #file{} = CachedCFile ->
+            case is_process_alive(FdPid) of
+                true ->
+                    CachedCFile;
+                false ->
+                    erase({?CFILE_HANDLE, FdPid}),
+                    undefined
+            end
+    end.
+
+% If the [couchdb] use_cfile setting is enabled and we are not on Windows,
+% dup() File's raw handle with couch_cfile. A copy of File using that handle
+% is stored in this process' dictionary under ?CFILE_HANDLE, where
+% get_cfile/1 can find it. Always returns File unchanged; if dup() fails,
+% parallel reads simply stay off.
+%
+dup(#file{fd = Fd} = File) ->
+    Enabled = config:get_boolean("couchdb", "use_cfile", ?USE_CFILE_DEFAULT),
+    OnWindows =
+        case os:type() of
+            {win32, _} -> true;
+            {_, _} -> false
+        end,
+    case OnWindows orelse not Enabled of
+        true ->
+            File;
+        false ->
+            case couch_cfile:dup(Fd) of
+                {ok, CFd} ->
+                    % Can use atomics to share the exact eof position.
+                    % Effective infinity for now while kicking the tires
+                    CFile = File#file{fd = CFd, eof = 1 bsl 60},
+                    put(?CFILE_HANDLE, CFile),
+                    File;
+                {error, _} ->
+                    File
+            end
+    end.
+
 -ifdef(TEST).
 -include_lib("couch/include/couch_eunit.hrl").
 

Reply via email to