This is an automated email from the ASF dual-hosted git repository. rnewson pushed a commit to branch ra in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit c81de8490830065667ff983b9bfc06d43108153a Author: Robert Newson <[email protected]> AuthorDate: Sun Sep 8 00:50:36 2024 +0100 couch write queue --- src/chttpd/src/chttpd_app.erl | 34 ++++++++++++++++++++++++++++ src/couch/src/couch_app.erl | 6 ----- src/couch/src/couch_write_queue.erl | 44 +++++++++++++++++++++++++++++++++++++ 3 files changed, 78 insertions(+), 6 deletions(-) diff --git a/src/chttpd/src/chttpd_app.erl b/src/chttpd/src/chttpd_app.erl index d7a5aef86..698ddbd34 100644 --- a/src/chttpd/src/chttpd_app.erl +++ b/src/chttpd/src/chttpd_app.erl @@ -15,7 +15,41 @@ -export([start/2, stop/1]). start(_Type, StartArgs) -> + start_ra(), chttpd_sup:start_link(StartArgs). stop(_State) -> ok. + +start_ra() -> + %% ensure Ra logs reach couch.log + logger:add_handler(couch_log, couch_logger_handler, #{}), + + %% start Ra + RaEnv = [ + {data_dir, config:get("couchdb", "ra_data_dir")} + ], + ra:start(RaEnv), + + %% create Ra clusters for ordering doc writes + [start_cluster(Nodes) || Nodes <- combinations(3, mem3:nodes())]. + +start_cluster(Nodes) -> + Name = list_to_atom( + "couch_write_queue_" ++ + lists:flatten(lists:join("_", [atom_to_list(N) || N <- lists:sort(Nodes)])) + ), + ra:start_cluster( + default, + Name, + {module, couch_write_queue, #{}}, + [{Name, N} || N <- Nodes] + ). + +%% https://rosettacode.org/wiki/Combinations#Erlang +combinations(0, _Nodes) -> + [[]]; +combinations(_, []) -> + []; +combinations(N, [Node | Rest]) -> + [[Node | List] || List <- combinations(N - 1, Rest)] ++ combinations(N, Rest). diff --git a/src/couch/src/couch_app.erl b/src/couch/src/couch_app.erl index fe6e48466..8cd8c8482 100644 --- a/src/couch/src/couch_app.erl +++ b/src/couch/src/couch_app.erl @@ -21,12 +21,6 @@ ]). start(_Type, _) -> - logger:add_handler(couch_log, couch_logger_handler, #{}), - RaEnv = [ - {data_dir, config:get("couchdb", "ra_data_dir")} - ], - ra:start(RaEnv), - case couch_sup:start_link() of {ok, _} = Resp -> {Time, _} = statistics(wall_clock), diff --git a/src/couch/src/couch_write_queue.erl b/src/couch/src/couch_write_queue.erl new file mode 100644 index 000000000..957e1bc5e --- /dev/null +++ b/src/couch/src/couch_write_queue.erl @@ -0,0 +1,44 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(couch_write_queue). + +-behaviour(ra_machine). + +-export([ + init/1, + apply/3, + init_aux/1, + handle_aux/5 +]). + +-include_lib("couch/include/couch_db.hrl"). + +init(_Conf) -> + nil. + +apply(_Meta, {update_docs, DbName, Docs} = Command, State) when is_binary(DbName), is_list(Docs) -> + {State, ok, [{aux, Command}]}. + +init_aux(_Name) -> + nil. + +handle_aux(_RaftState, cast, {update_docs, DbName, Docs}, AuxState, IntState) -> + {ok, Db} = couch_db:open(DbName, [{create_if_missing, true}, ?ADMIN_CTX]), + try + Reply = couch_db:update_docs(Db, Docs, [], ?INTERACTIVE_EDIT), + {reply, Reply, AuxState, IntState} + after + couch_db:close(Db) + end; +handle_aux(_RaftState, cast, _Command, AuxState, IntState) -> + {no_reply, AuxState, IntState}.
