This is an automated email from the ASF dual-hosted git repository. vatamane pushed a commit to branch remove-unused-option-in-couch-workers in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit 3f34786df752f00de4e2fb7fd51deef66db0542a Author: Nick Vatamaniuc <[email protected]> AuthorDate: Sun Jan 26 20:10:15 2025 -0500 Remove multi_workers option from couch_work_queue Nothing was using it so let's remove it. "Deleted code is debugged code" as they say. It turns out its unit tests never covered the actual use case of the work queue -- sending terms back and form, it just tested binaries. So add a few tests for terms and conver some other error cases, thus increasing test coverage to 100%. --- src/couch/src/couch_work_queue.erl | 46 ++++---- src/couch/test/eunit/couch_work_queue_tests.erl | 137 ++++++++++++++---------- 2 files changed, 101 insertions(+), 82 deletions(-) diff --git a/src/couch/src/couch_work_queue.erl b/src/couch/src/couch_work_queue.erl index ccd9d993a..f7d3fd9ab 100644 --- a/src/couch/src/couch_work_queue.erl +++ b/src/couch/src/couch_work_queue.erl @@ -29,9 +29,8 @@ max_items, items = 0, size = 0, - work_waiters = [], - close_on_dequeue = false, - multi_workers = false + worker = undefined, + close_on_dequeue = false }). new(Options) -> @@ -72,15 +71,17 @@ close(Wq) -> init(Options) -> Q = #q{ max_size = couch_util:get_value(max_size, Options, nil), - max_items = couch_util:get_value(max_items, Options, nil), - multi_workers = couch_util:get_value(multi_workers, Options, false) + max_items = couch_util:get_value(max_items, Options, nil) }, {ok, Q, hibernate}. -terminate(_Reason, #q{work_waiters = Workers}) -> - lists:foreach(fun({W, _}) -> gen_server:reply(W, closed) end, Workers). +terminate(_Reason, #q{worker = undefined}) -> + ok; +terminate(_Reason, #q{worker = {W, _}}) -> + gen_server:reply(W, closed), + ok. -handle_call({queue, Item, Size}, From, #q{work_waiters = []} = Q0) -> +handle_call({queue, Item, Size}, From, #q{worker = undefined} = Q0) -> Q = Q0#q{ size = Q0#q.size + Size, items = Q0#q.items + 1, @@ -95,23 +96,20 @@ handle_call({queue, Item, Size}, From, #q{work_waiters = []} = Q0) -> false -> {reply, ok, Q, hibernate} end; -handle_call({queue, Item, _}, _From, #q{work_waiters = [{W, _Max} | Rest]} = Q) -> +handle_call({queue, Item, _}, _From, #q{worker = {W, _Max}} = Q) -> gen_server:reply(W, {ok, [Item]}), - {reply, ok, Q#q{work_waiters = Rest}, hibernate}; -handle_call({dequeue, Max}, From, Q) -> - #q{work_waiters = Workers, multi_workers = Multi, items = Count} = Q, - case {Workers, Multi} of - {[_ | _], false} -> - exit("Only one caller allowed to wait for this work at a time"); - {[_ | _], true} -> - {noreply, Q#q{work_waiters = Workers ++ [{From, Max}]}}; - _ -> - case Count of - 0 -> - {noreply, Q#q{work_waiters = Workers ++ [{From, Max}]}}; - C when C > 0 -> - deliver_queue_items(Max, Q) - end + {reply, ok, Q#q{worker = undefined}, hibernate}; +handle_call({dequeue, _Max}, _From, #q{worker = {_, _}}) -> + % Something went wrong - the same or a different worker is + % trying to dequeue an item. We only only allow one worker to wait + % for work at a time, so we exit with an error. + exit(multiple_workers_error); +handle_call({dequeue, Max}, From, #q{worker = undefined, items = Count} = Q) -> + case Count of + 0 -> + {noreply, Q#q{worker = {From, Max}}}; + C when C > 0 -> + deliver_queue_items(Max, Q) end; handle_call(item_count, _From, Q) -> {reply, Q#q.items, Q}; diff --git a/src/couch/test/eunit/couch_work_queue_tests.erl b/src/couch/test/eunit/couch_work_queue_tests.erl index 7c3854f61..ab0ef9353 100644 --- a/src/couch/test/eunit/couch_work_queue_tests.erl +++ b/src/couch/test/eunit/couch_work_queue_tests.erl @@ -31,27 +31,12 @@ setup_max_size() -> setup_max_items_and_size() -> setup([{max_size, 160}, {max_items, 3}]). -setup_multi_workers() -> - {Q, Producer, Consumer1} = setup([ - {max_size, 160}, - {max_items, 3}, - {multi_workers, true} - ]), - Consumer2 = spawn_consumer(Q), - Consumer3 = spawn_consumer(Q), - {Q, Producer, [Consumer1, Consumer2, Consumer3]}. - -teardown({Q, Producer, Consumers}) when is_list(Consumers) -> +teardown({Q, Producer, Consumer}) -> % consume all to unblock and let producer/consumer stop without timeout - [consume(Consumer, all) || Consumer <- Consumers], - + consume(Consumer, all), ok = close_queue(Q), ok = stop(Producer, "producer"), - R = [stop(Consumer, "consumer") || Consumer <- Consumers], - R = [ok || _ <- Consumers], - ok; -teardown({Q, Producer, Consumer}) -> - teardown({Q, Producer, [Consumer]}). + ok = stop(Consumer, "consumer"). single_consumer_test_() -> { @@ -87,29 +72,16 @@ single_consumer_test_() -> ] }. -multiple_consumers_test_() -> - { - "Single producer and multiple consumers", - [ - { - "Queue with max size of 160 bytes and 3 max items", - { - foreach, - fun setup_multi_workers/0, - fun teardown/1, - common_cases() ++ multiple_consumers() - } - } - ] - }. - common_cases() -> [ fun should_block_consumer_on_dequeue_from_empty_queue/1, fun should_consume_right_item/1, fun should_timeout_on_close_non_empty_queue/1, fun should_not_block_producer_for_non_empty_queue_after_close/1, - fun should_be_closed/1 + fun should_be_closed/1, + fun should_produce_consume_any_term/1, + fun should_crash_on_multiple_consumers/1, + fun should_crash_on_random_messages/1 ]. single_consumer_max_item_count() -> @@ -134,13 +106,6 @@ single_consumer_max_size() -> single_consumer_max_items_and_size() -> single_consumer_max_item_count() ++ single_consumer_max_size(). -multiple_consumers() -> - [ - fun should_have_zero_size_for_new_queue/1, - fun should_have_no_items_for_new_queue/1, - fun should_increase_queue_size_on_produce/1 - ]. - should_have_no_items_for_new_queue({Q, _, _}) -> ?_assertEqual(0, couch_work_queue:item_count(Q)). @@ -279,19 +244,6 @@ should_not_block_producer_for_non_empty_queue_after_close({Q, Producer, _}) -> ?_assertEqual({ok, 1, 1}, {Pong, Size, Count}). -should_be_closed({Q, _, Consumers}) when is_list(Consumers) -> - ok = close_queue(Q), - - [consume(C, 1) || C <- Consumers], - - LastConsumerItems = [last_consumer_items(C) || C <- Consumers], - ItemsCount = couch_work_queue:item_count(Q), - Size = couch_work_queue:size(Q), - - ?_assertEqual( - {[closed, closed, closed], closed, closed}, - {LastConsumerItems, ItemsCount, Size} - ); should_be_closed({Q, _, Consumer}) -> ok = close_queue(Q), @@ -306,6 +258,48 @@ should_be_closed({Q, _, Consumer}) -> {LastConsumerItems, ItemsCount, Size} ). +should_produce_consume_any_term({Q, Producer, Consumer}) -> + Potato = produce_term(Q, Producer, potato, false), + Tomato = produce_term(Q, Producer, {tomato}, false), + Carrot = produce_term(Q, Producer, [carrot], false), + + consume(Consumer, 2), + {ok, Items1} = last_consumer_items(Consumer), + ?_assertEqual([Potato, Tomato], Items1), + + consume(Consumer, 1), + {ok, Items2} = last_consumer_items(Consumer), + ?_assertEqual([Carrot], Items2). + +should_crash_on_multiple_consumers({_Q, _Producer, _Consumer}) -> + % Do not want to crash the set up Q as that would terminate + % the test process, so we make a new one and unlink it + {ok, Q} = couch_work_queue:new([]), + unlink(Q), + Ref = monitor(process, Q), + Pid1 = spawn(fun() -> couch_work_queue:dequeue(Q, 1) end), + Pid2 = spawn(fun() -> couch_work_queue:dequeue(Q, 1) end), + Reason = + receive + {'DOWN', Ref, _, _, Res} -> Res + end, + exit(Pid1, kill), + exit(Pid2, kill), + ?_assertEqual(multiple_workers_error, Reason). + +should_crash_on_random_messages({_Q, _Producer, _Consumer}) -> + % Do not want to crash the set up Q as that would terminate + % the test process, so we make a new one and unlink it + {ok, Q} = couch_work_queue:new([]), + unlink(Q), + Ref = monitor(process, Q), + Q ! eggplant, + Reason = + receive + {'DOWN', Ref, _, _, Res} -> Res + end, + ?_assertEqual(eggplant, Reason). + close_queue(Q) -> test_util:stop_sync( Q, @@ -329,7 +323,10 @@ consumer_loop(Parent, Q, PrevItem) -> {last_item, Ref} -> Parent ! {item, Ref, PrevItem}, consumer_loop(Parent, Q, PrevItem); - {consume, N} -> + {consume, all} -> + Result = couch_work_queue:dequeue(Q), + consumer_loop(Parent, Q, Result); + {consume, N} when is_integer(N) -> Result = couch_work_queue:dequeue(Q, N), consumer_loop(Parent, Q, Result) end. @@ -345,7 +342,11 @@ producer_loop(Parent, Q) -> {ping, Ref} -> Parent ! {pong, Ref}, producer_loop(Parent, Q); - {produce, Ref, Size} -> + {produce_term, Ref, Term} -> + Parent ! {item, Ref, Term}, + ok = couch_work_queue:queue(Q, Term), + producer_loop(Parent, Q); + {produce_binary, Ref, Size} -> Item = crypto:strong_rand_bytes(Size), Parent ! {item, Ref, Item}, ok = couch_work_queue:queue(Q, Item), @@ -365,10 +366,30 @@ last_consumer_items(Consumer) -> timeout end. +produce_term(Q, Producer, Term, Wait) -> + Ref = make_ref(), + ItemsCount = couch_work_queue:item_count(Q), + Producer ! {produce_term, Ref, Term}, + receive + {item, Ref, Item} when Wait -> + ok = wait_increment(Q, ItemsCount), + Item; + {item, Ref, Item} -> + Item + after ?TIMEOUT -> + erlang:error( + {assertion_failed, [ + {module, ?MODULE}, + {line, ?LINE}, + {reason, "Timeout asking producer to produce a term"} + ]} + ) + end. + produce(Q, Producer, Size, Wait) -> Ref = make_ref(), ItemsCount = couch_work_queue:item_count(Q), - Producer ! {produce, Ref, Size}, + Producer ! {produce_binary, Ref, Size}, receive {item, Ref, Item} when Wait -> ok = wait_increment(Q, ItemsCount),
