This is an automated email from the ASF dual-hosted git repository.
vatamane pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/couchdb.git
The following commit(s) were added to refs/heads/main by this push:
new 82321a579 Improve fabric streams cleanup on error and timeouts
82321a579 is described below
commit 82321a579758e6660aa0d241761fd3b89041027a
Author: Nick Vatamaniuc <[email protected]>
AuthorDate: Fri Aug 2 02:25:04 2024 -0400
Improve fabric streams cleanup on error and timeouts
Previously, we performed cleanup only for specific errors such as
`ddoc_updated` and `insufficient_storage`. In case of other errors, or
timeouts, there was a chance we would leak workers waiting to be either
started or canceled. Those workers would then wait around until the
5 minute rexi timeout fired, and then they would emit an error in the
logs. It's not a big deal, as that happens on errors only and the
processes are all waiting in receive. However, they do hold a Db handle
open, so they can waste resources from that point of view.
To fix that, this commit extends cleanup to other errors and timeouts.
Moreover, in case of timeouts, we log fabric worker timeout errors. In
order to do that, we previously exported the `fabric_streams` internal
`#stream_acc` record to every `fabric_streams` user. That's a bit untidy,
so make the timeout error return only the defunct workers; that way we
avoid leaking the `#stream_acc` record outside the `fabric_streams`
module.
Related to https://github.com/apache/couchdb/issues/5127
---
.../src/couch_replicator_fabric.erl | 10 +---
src/fabric/include/fabric.hrl | 8 ----
src/fabric/src/fabric_streams.erl | 55 +++++++++++++++++++---
src/fabric/src/fabric_view_all_docs.erl | 10 +---
src/fabric/src/fabric_view_changes.erl | 11 +----
src/fabric/src/fabric_view_map.erl | 12 +----
src/fabric/src/fabric_view_reduce.erl | 12 +----
7 files changed, 59 insertions(+), 59 deletions(-)
diff --git a/src/couch_replicator/src/couch_replicator_fabric.erl b/src/couch_replicator/src/couch_replicator_fabric.erl
index 43321f26f..cb441fea7 100644
--- a/src/couch_replicator/src/couch_replicator_fabric.erl
+++ b/src/couch_replicator/src/couch_replicator_fabric.erl
@@ -34,14 +34,8 @@ docs(DbName, Options, QueryArgs, Callback, Acc) ->
after
fabric_streams:cleanup(Workers)
end;
- {timeout, NewState} ->
- DefunctWorkers = fabric_util:remove_done_workers(
- NewState#stream_acc.workers, waiting
- ),
- fabric_util:log_timeout(
- DefunctWorkers,
- "replicator docs"
- ),
+ {timeout, DefunctWorkers} ->
+ fabric_util:log_timeout(DefunctWorkers, "replicator docs"),
Callback({error, timeout}, Acc);
{error, Error} ->
Callback({error, Error}, Acc)
diff --git a/src/fabric/include/fabric.hrl b/src/fabric/include/fabric.hrl
index dd312f028..6312741c2 100644
--- a/src/fabric/include/fabric.hrl
+++ b/src/fabric/include/fabric.hrl
@@ -32,14 +32,6 @@
update_seq
}).
--record(stream_acc, {
- workers,
- ready,
- start_fun,
- replacements,
- ring_opts
-}).
-
-record(view_row, {key, id, value, doc, worker}).
-type row_property_key() :: id | key | value | doc | worker.
diff --git a/src/fabric/src/fabric_streams.erl b/src/fabric/src/fabric_streams.erl
index 3f14cde45..85a4978c4 100644
--- a/src/fabric/src/fabric_streams.erl
+++ b/src/fabric/src/fabric_streams.erl
@@ -23,9 +23,16 @@
add_worker_to_cleaner/2
]).
--include_lib("fabric/include/fabric.hrl").
-include_lib("mem3/include/mem3.hrl").
+-record(stream_acc, {
+ workers,
+ ready,
+ start_fun,
+ replacements,
+ ring_opts
+}).
+
-define(WORKER_CLEANER, fabric_worker_cleaner).
% This is the streams equivalent of fabric_util:submit_jobs/4. Besides
@@ -77,7 +84,12 @@ start(Workers0, Keypos, StartFun, Replacements, RingOpts) ->
Workers
),
{ok, AckedWorkers};
+ {timeout, #stream_acc{workers = Defunct}} ->
+ cleanup(Workers0),
+ DefunctWorkers = fabric_util:remove_done_workers(Defunct, waiting),
+ {timeout, DefunctWorkers};
Else ->
+ cleanup(Workers0),
Else
end.
@@ -165,10 +177,7 @@ handle_stream_start(rexi_STREAM_INIT, {Worker, From}, St)
->
{stop, St#stream_acc{workers = [], ready = Ready1}}
end
end;
-handle_stream_start({ok, Error}, _, St) when Error == ddoc_updated; Error ==
insufficient_storage ->
- WaitingWorkers = [W || {W, _} <- St#stream_acc.workers],
- ReadyWorkers = [W || {W, _} <- St#stream_acc.ready],
- cleanup(WaitingWorkers ++ ReadyWorkers),
+handle_stream_start({ok, Error}, _, _) when Error == ddoc_updated; Error ==
insufficient_storage ->
{stop, Error};
handle_stream_start(Else, _, _) ->
exit({invalid_stream_start, Else}).
@@ -236,7 +245,9 @@ worker_cleaner_test_() ->
?TDEF_FE(should_clean_additional_worker_too),
?TDEF_FE(coordinator_is_killed_if_client_disconnects),
?TDEF_FE(coordinator_is_not_killed_if_client_is_connected),
- ?TDEF_FE(submit_jobs_sets_up_cleaner)
+ ?TDEF_FE(submit_jobs_sets_up_cleaner),
+ ?TDEF_FE(cleanup_called_on_timeout),
+ ?TDEF_FE(cleanup_called_on_error)
]
}
}.
@@ -442,7 +453,39 @@ submit_jobs_sets_up_cleaner(_) ->
?assert(is_process_alive(Cleaner))
end.
+cleanup_called_on_timeout(_) ->
+ Ref1 = make_ref(),
+ Ref2 = make_ref(),
+ W1 = #shard{node = 'n1', ref = Ref1},
+ W2 = #shard{node = 'n2', ref = Ref2},
+ Workers = [W1, W2],
+ meck:expect(rexi_utils, recv, fun(_, _, _, Acc, _, _) ->
+ {timeout, Acc#stream_acc{workers = [{W2, waiting}]}}
+ end),
+ meck:reset(fabric_util),
+ Res = start(Workers, #shard.ref, undefined, undefined, []),
+ ?assertEqual({timeout, [W2]}, Res),
+ ?assert(meck:called(fabric_util, cleanup, 1)).
+
+cleanup_called_on_error(_) ->
+ Ref1 = make_ref(),
+ Ref2 = make_ref(),
+ W1 = #shard{node = 'n1', ref = Ref1},
+ W2 = #shard{node = 'n2', ref = Ref2},
+ Workers = [W1, W2],
+ meck:expect(rexi_utils, recv, fun(_, _, _, _, _, _) ->
+ {error, foo}
+ end),
+ meck:reset(fabric_util),
+ Res = start(Workers, #shard.ref, undefined, undefined, []),
+ ?assertEqual({error, foo}, Res),
+ ?assert(meck:called(fabric_util, cleanup, 1)).
+
setup() ->
+ ok = meck:new(rexi_utils, [passthrough]),
+ ok = meck:new(config, [passthrough]),
+ ok = meck:new(fabric_util, [passthrough]),
+ meck:expect(config, get, fun(_, _, Default) -> Default end),
ok = meck:expect(rexi, kill_all, fun(_) -> ok end),
% Speed up disconnect socket timeout for the test to 200 msec
ok = meck:expect(chttpd_util, mochiweb_client_req_check_msec, 0, 200).
diff --git a/src/fabric/src/fabric_view_all_docs.erl b/src/fabric/src/fabric_view_all_docs.erl
index 3a03357c2..2d0133acb 100644
--- a/src/fabric/src/fabric_view_all_docs.erl
+++ b/src/fabric/src/fabric_view_all_docs.erl
@@ -37,14 +37,8 @@ go(Db, Options, #mrargs{keys = undefined} = QueryArgs,
Callback, Acc) ->
after
fabric_streams:cleanup(Workers)
end;
- {timeout, NewState} ->
- DefunctWorkers = fabric_util:remove_done_workers(
- NewState#stream_acc.workers, waiting
- ),
- fabric_util:log_timeout(
- DefunctWorkers,
- "all_docs"
- ),
+ {timeout, DefunctWorkers} ->
+ fabric_util:log_timeout(DefunctWorkers, "all_docs"),
Callback({error, timeout}, Acc);
{error, Error} ->
Callback({error, Error}, Acc)
diff --git a/src/fabric/src/fabric_view_changes.erl b/src/fabric/src/fabric_view_changes.erl
index 85bc7370c..410c057c2 100644
--- a/src/fabric/src/fabric_view_changes.erl
+++ b/src/fabric/src/fabric_view_changes.erl
@@ -199,15 +199,8 @@ send_changes(DbName, ChangesArgs, Callback, PackedSeqs,
AccIn, Timeout) ->
after
fabric_streams:cleanup(Workers)
end;
- {timeout, NewState} ->
- DefunctWorkers = fabric_util:remove_done_workers(
- NewState#stream_acc.workers,
- waiting
- ),
- fabric_util:log_timeout(
- DefunctWorkers,
- "changes"
- ),
+ {timeout, DefunctWorkers} ->
+ fabric_util:log_timeout(DefunctWorkers, "changes"),
throw({error, timeout});
{error, Reason} ->
throw({error, Reason});
diff --git a/src/fabric/src/fabric_view_map.erl b/src/fabric/src/fabric_view_map.erl
index 6f13270a9..cc8ed6cf1 100644
--- a/src/fabric/src/fabric_view_map.erl
+++ b/src/fabric/src/fabric_view_map.erl
@@ -16,7 +16,6 @@
-include_lib("fabric/include/fabric.hrl").
-include_lib("mem3/include/mem3.hrl").
--include_lib("couch/include/couch_db.hrl").
-include_lib("couch_mrview/include/couch_mrview.hrl").
go(DbName, Options, GroupId, View, Args, Callback, Acc, VInfo) when
@@ -66,15 +65,8 @@ go(Db, Options, DDoc, View, Args0, Callback, Acc, VInfo) ->
after
fabric_streams:cleanup(Workers)
end;
- {timeout, NewState} ->
- DefunctWorkers = fabric_util:remove_done_workers(
- NewState#stream_acc.workers,
- waiting
- ),
- fabric_util:log_timeout(
- DefunctWorkers,
- "map_view"
- ),
+ {timeout, DefunctWorkers} ->
+ fabric_util:log_timeout(DefunctWorkers, "map_view"),
Callback({error, timeout}, Acc);
{error, Error} ->
Callback({error, Error}, Acc)
diff --git a/src/fabric/src/fabric_view_reduce.erl b/src/fabric/src/fabric_view_reduce.erl
index 04d73bd94..3206d01a4 100644
--- a/src/fabric/src/fabric_view_reduce.erl
+++ b/src/fabric/src/fabric_view_reduce.erl
@@ -16,7 +16,6 @@
-include_lib("fabric/include/fabric.hrl").
-include_lib("mem3/include/mem3.hrl").
--include_lib("couch/include/couch_db.hrl").
-include_lib("couch_mrview/include/couch_mrview.hrl").
go(DbName, GroupId, View, Args, Callback, Acc0, VInfo) when is_binary(GroupId)
->
@@ -55,15 +54,8 @@ go(Db, DDoc, VName, Args, Callback, Acc, VInfo) ->
after
fabric_streams:cleanup(Workers)
end;
- {timeout, NewState} ->
- DefunctWorkers = fabric_util:remove_done_workers(
- NewState#stream_acc.workers,
- waiting
- ),
- fabric_util:log_timeout(
- DefunctWorkers,
- "reduce_view"
- ),
+ {timeout, DefunctWorkers} ->
+ fabric_util:log_timeout(DefunctWorkers, "reduce_view"),
Callback({error, timeout}, Acc);
{error, Error} ->
Callback({error, Error}, Acc)