This is an automated email from the ASF dual-hosted git repository.

vatamane pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/couchdb.git


The following commit(s) were added to refs/heads/main by this push:
     new 82321a579 Improve fabric streams cleanup on error and timeouts
82321a579 is described below

commit 82321a579758e6660aa0d241761fd3b89041027a
Author: Nick Vatamaniuc <[email protected]>
AuthorDate: Fri Aug 2 02:25:04 2024 -0400

    Improve fabric streams cleanup on error and timeouts
    
    Previously, we performed cleanup only for specific errors such as
    `ddoc_updated`, and `insufficient_storage`. In case of other errors, or
    timeouts, there was a chance we would leak workers waiting to be either started
    or canceled. Those workers would then wait around until the 5 minute rexi
    timeout fires, and then they emit an error in the logs. It's not a big deal as
    that happens on errors only, and the processes are all waiting in receive,
    however, they do hold a Db handle open, so they can waste resources from that
    point of view.
    
    To fix that, this commit extends cleanup to other errors and timeouts.
    
    Moreover, in case of timeouts, we log fabric worker timeout errors. In order to
    do that we export the `fabric_streams` internal `#stream_acc` record to every
    `fabric_streams` user. That's a bit untidy, so make the timeout error return
    the defunct workers only, and so, we can avoid leaking the `#stream_acc` record
    outside the fabric_streams module.
    
    Related to https://github.com/apache/couchdb/issues/5127
---
 .../src/couch_replicator_fabric.erl                | 10 +---
 src/fabric/include/fabric.hrl                      |  8 ----
 src/fabric/src/fabric_streams.erl                  | 55 +++++++++++++++++++---
 src/fabric/src/fabric_view_all_docs.erl            | 10 +---
 src/fabric/src/fabric_view_changes.erl             | 11 +----
 src/fabric/src/fabric_view_map.erl                 | 12 +----
 src/fabric/src/fabric_view_reduce.erl              | 12 +----
 7 files changed, 59 insertions(+), 59 deletions(-)

diff --git a/src/couch_replicator/src/couch_replicator_fabric.erl b/src/couch_replicator/src/couch_replicator_fabric.erl
index 43321f26f..cb441fea7 100644
--- a/src/couch_replicator/src/couch_replicator_fabric.erl
+++ b/src/couch_replicator/src/couch_replicator_fabric.erl
@@ -34,14 +34,8 @@ docs(DbName, Options, QueryArgs, Callback, Acc) ->
                 after
                     fabric_streams:cleanup(Workers)
                 end;
-            {timeout, NewState} ->
-                DefunctWorkers = fabric_util:remove_done_workers(
-                    NewState#stream_acc.workers, waiting
-                ),
-                fabric_util:log_timeout(
-                    DefunctWorkers,
-                    "replicator docs"
-                ),
+            {timeout, DefunctWorkers} ->
+                fabric_util:log_timeout(DefunctWorkers, "replicator docs"),
                 Callback({error, timeout}, Acc);
             {error, Error} ->
                 Callback({error, Error}, Acc)
diff --git a/src/fabric/include/fabric.hrl b/src/fabric/include/fabric.hrl
index dd312f028..6312741c2 100644
--- a/src/fabric/include/fabric.hrl
+++ b/src/fabric/include/fabric.hrl
@@ -32,14 +32,6 @@
     update_seq
 }).
 
--record(stream_acc, {
-    workers,
-    ready,
-    start_fun,
-    replacements,
-    ring_opts
-}).
-
 -record(view_row, {key, id, value, doc, worker}).
 
 -type row_property_key() :: id | key | value | doc | worker.
diff --git a/src/fabric/src/fabric_streams.erl b/src/fabric/src/fabric_streams.erl
index 3f14cde45..85a4978c4 100644
--- a/src/fabric/src/fabric_streams.erl
+++ b/src/fabric/src/fabric_streams.erl
@@ -23,9 +23,16 @@
     add_worker_to_cleaner/2
 ]).
 
--include_lib("fabric/include/fabric.hrl").
 -include_lib("mem3/include/mem3.hrl").
 
+-record(stream_acc, {
+    workers,
+    ready,
+    start_fun,
+    replacements,
+    ring_opts
+}).
+
 -define(WORKER_CLEANER, fabric_worker_cleaner).
 
 % This is the streams equivalent of fabric_util:submit_jobs/4. Besides
@@ -77,7 +84,12 @@ start(Workers0, Keypos, StartFun, Replacements, RingOpts) ->
                 Workers
             ),
             {ok, AckedWorkers};
+        {timeout, #stream_acc{workers = Defunct}} ->
+            cleanup(Workers0),
+            DefunctWorkers = fabric_util:remove_done_workers(Defunct, waiting),
+            {timeout, DefunctWorkers};
         Else ->
+            cleanup(Workers0),
             Else
     end.
 
@@ -165,10 +177,7 @@ handle_stream_start(rexi_STREAM_INIT, {Worker, From}, St) ->
                     {stop, St#stream_acc{workers = [], ready = Ready1}}
             end
     end;
-handle_stream_start({ok, Error}, _, St) when Error == ddoc_updated; Error == insufficient_storage ->
-    WaitingWorkers = [W || {W, _} <- St#stream_acc.workers],
-    ReadyWorkers = [W || {W, _} <- St#stream_acc.ready],
-    cleanup(WaitingWorkers ++ ReadyWorkers),
+handle_stream_start({ok, Error}, _, _) when Error == ddoc_updated; Error == insufficient_storage ->
     {stop, Error};
 handle_stream_start(Else, _, _) ->
     exit({invalid_stream_start, Else}).
@@ -236,7 +245,9 @@ worker_cleaner_test_() ->
                 ?TDEF_FE(should_clean_additional_worker_too),
                 ?TDEF_FE(coordinator_is_killed_if_client_disconnects),
                 ?TDEF_FE(coordinator_is_not_killed_if_client_is_connected),
-                ?TDEF_FE(submit_jobs_sets_up_cleaner)
+                ?TDEF_FE(submit_jobs_sets_up_cleaner),
+                ?TDEF_FE(cleanup_called_on_timeout),
+                ?TDEF_FE(cleanup_called_on_error)
             ]
         }
     }.
@@ -442,7 +453,39 @@ submit_jobs_sets_up_cleaner(_) ->
         ?assert(is_process_alive(Cleaner))
     end.
 
+cleanup_called_on_timeout(_) ->
+    Ref1 = make_ref(),
+    Ref2 = make_ref(),
+    W1 = #shard{node = 'n1', ref = Ref1},
+    W2 = #shard{node = 'n2', ref = Ref2},
+    Workers = [W1, W2],
+    meck:expect(rexi_utils, recv, fun(_, _, _, Acc, _, _) ->
+        {timeout, Acc#stream_acc{workers = [{W2, waiting}]}}
+    end),
+    meck:reset(fabric_util),
+    Res = start(Workers, #shard.ref, undefined, undefined, []),
+    ?assertEqual({timeout, [W2]}, Res),
+    ?assert(meck:called(fabric_util, cleanup, 1)).
+
+cleanup_called_on_error(_) ->
+    Ref1 = make_ref(),
+    Ref2 = make_ref(),
+    W1 = #shard{node = 'n1', ref = Ref1},
+    W2 = #shard{node = 'n2', ref = Ref2},
+    Workers = [W1, W2],
+    meck:expect(rexi_utils, recv, fun(_, _, _, _, _, _) ->
+        {error, foo}
+    end),
+    meck:reset(fabric_util),
+    Res = start(Workers, #shard.ref, undefined, undefined, []),
+    ?assertEqual({error, foo}, Res),
+    ?assert(meck:called(fabric_util, cleanup, 1)).
+
 setup() ->
+    ok = meck:new(rexi_utils, [passthrough]),
+    ok = meck:new(config, [passthrough]),
+    ok = meck:new(fabric_util, [passthrough]),
+    meck:expect(config, get, fun(_, _, Default) -> Default end),
     ok = meck:expect(rexi, kill_all, fun(_) -> ok end),
     % Speed up disconnect socket timeout for the test to 200 msec
     ok = meck:expect(chttpd_util, mochiweb_client_req_check_msec, 0, 200).
diff --git a/src/fabric/src/fabric_view_all_docs.erl b/src/fabric/src/fabric_view_all_docs.erl
index 3a03357c2..2d0133acb 100644
--- a/src/fabric/src/fabric_view_all_docs.erl
+++ b/src/fabric/src/fabric_view_all_docs.erl
@@ -37,14 +37,8 @@ go(Db, Options, #mrargs{keys = undefined} = QueryArgs, Callback, Acc) ->
                 after
                     fabric_streams:cleanup(Workers)
                 end;
-            {timeout, NewState} ->
-                DefunctWorkers = fabric_util:remove_done_workers(
-                    NewState#stream_acc.workers, waiting
-                ),
-                fabric_util:log_timeout(
-                    DefunctWorkers,
-                    "all_docs"
-                ),
+            {timeout, DefunctWorkers} ->
+                fabric_util:log_timeout(DefunctWorkers, "all_docs"),
                 Callback({error, timeout}, Acc);
             {error, Error} ->
                 Callback({error, Error}, Acc)
diff --git a/src/fabric/src/fabric_view_changes.erl b/src/fabric/src/fabric_view_changes.erl
index 85bc7370c..410c057c2 100644
--- a/src/fabric/src/fabric_view_changes.erl
+++ b/src/fabric/src/fabric_view_changes.erl
@@ -199,15 +199,8 @@ send_changes(DbName, ChangesArgs, Callback, PackedSeqs, AccIn, Timeout) ->
                 after
                     fabric_streams:cleanup(Workers)
                 end;
-            {timeout, NewState} ->
-                DefunctWorkers = fabric_util:remove_done_workers(
-                    NewState#stream_acc.workers,
-                    waiting
-                ),
-                fabric_util:log_timeout(
-                    DefunctWorkers,
-                    "changes"
-                ),
+            {timeout, DefunctWorkers} ->
+                fabric_util:log_timeout(DefunctWorkers, "changes"),
                 throw({error, timeout});
             {error, Reason} ->
                 throw({error, Reason});
diff --git a/src/fabric/src/fabric_view_map.erl b/src/fabric/src/fabric_view_map.erl
index 6f13270a9..cc8ed6cf1 100644
--- a/src/fabric/src/fabric_view_map.erl
+++ b/src/fabric/src/fabric_view_map.erl
@@ -16,7 +16,6 @@
 
 -include_lib("fabric/include/fabric.hrl").
 -include_lib("mem3/include/mem3.hrl").
--include_lib("couch/include/couch_db.hrl").
 -include_lib("couch_mrview/include/couch_mrview.hrl").
 
 go(DbName, Options, GroupId, View, Args, Callback, Acc, VInfo) when
@@ -66,15 +65,8 @@ go(Db, Options, DDoc, View, Args0, Callback, Acc, VInfo) ->
                 after
                     fabric_streams:cleanup(Workers)
                 end;
-            {timeout, NewState} ->
-                DefunctWorkers = fabric_util:remove_done_workers(
-                    NewState#stream_acc.workers,
-                    waiting
-                ),
-                fabric_util:log_timeout(
-                    DefunctWorkers,
-                    "map_view"
-                ),
+            {timeout, DefunctWorkers} ->
+                fabric_util:log_timeout(DefunctWorkers, "map_view"),
                 Callback({error, timeout}, Acc);
             {error, Error} ->
                 Callback({error, Error}, Acc)
diff --git a/src/fabric/src/fabric_view_reduce.erl b/src/fabric/src/fabric_view_reduce.erl
index 04d73bd94..3206d01a4 100644
--- a/src/fabric/src/fabric_view_reduce.erl
+++ b/src/fabric/src/fabric_view_reduce.erl
@@ -16,7 +16,6 @@
 
 -include_lib("fabric/include/fabric.hrl").
 -include_lib("mem3/include/mem3.hrl").
--include_lib("couch/include/couch_db.hrl").
 -include_lib("couch_mrview/include/couch_mrview.hrl").
 
 go(DbName, GroupId, View, Args, Callback, Acc0, VInfo) when is_binary(GroupId) ->
@@ -55,15 +54,8 @@ go(Db, DDoc, VName, Args, Callback, Acc, VInfo) ->
                 after
                     fabric_streams:cleanup(Workers)
                 end;
-            {timeout, NewState} ->
-                DefunctWorkers = fabric_util:remove_done_workers(
-                    NewState#stream_acc.workers,
-                    waiting
-                ),
-                fabric_util:log_timeout(
-                    DefunctWorkers,
-                    "reduce_view"
-                ),
+            {timeout, DefunctWorkers} ->
+                fabric_util:log_timeout(DefunctWorkers, "reduce_view"),
                 Callback({error, timeout}, Acc);
             {error, Error} ->
                 Callback({error, Error}, Acc)

Reply via email to