This is an automated email from the ASF dual-hosted git repository. vatamane pushed a commit to branch remove-unused-option-in-couch-workers in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit 7f278f42d2ca10acf5451a4cf5997d706d7c40b6 Author: Nick Vatamaniuc <[email protected]> AuthorDate: Sun Jan 26 20:10:15 2025 -0500 Remove multi_workers option from couch_work_queue Nothing was using it so let's remove it. "Deleted code is debugged code" as they say. It turns out its unit tests never covered the actual use case of the work queue -- sending terms back and forth, it just tested binaries. So add a few tests for terms and check some other error cases, thus increasing test coverage to 100%. --- src/couch/src/couch_work_queue.erl | 46 ++++---- src/couch/test/eunit/couch_work_queue_tests.erl | 135 ++++++++++++++---------- 2 files changed, 99 insertions(+), 82 deletions(-) diff --git a/src/couch/src/couch_work_queue.erl b/src/couch/src/couch_work_queue.erl index ccd9d993a..f7d3fd9ab 100644 --- a/src/couch/src/couch_work_queue.erl +++ b/src/couch/src/couch_work_queue.erl @@ -29,9 +29,8 @@ max_items, items = 0, size = 0, - work_waiters = [], - close_on_dequeue = false, - multi_workers = false + worker = undefined, + close_on_dequeue = false }). new(Options) -> @@ -72,15 +71,17 @@ close(Wq) -> init(Options) -> Q = #q{ max_size = couch_util:get_value(max_size, Options, nil), - max_items = couch_util:get_value(max_items, Options, nil), - multi_workers = couch_util:get_value(multi_workers, Options, false) + max_items = couch_util:get_value(max_items, Options, nil) }, {ok, Q, hibernate}. -terminate(_Reason, #q{work_waiters = Workers}) -> - lists:foreach(fun({W, _}) -> gen_server:reply(W, closed) end, Workers). +terminate(_Reason, #q{worker = undefined}) -> + ok; +terminate(_Reason, #q{worker = {W, _}}) -> + gen_server:reply(W, closed), + ok. 
-handle_call({queue, Item, Size}, From, #q{work_waiters = []} = Q0) -> +handle_call({queue, Item, Size}, From, #q{worker = undefined} = Q0) -> Q = Q0#q{ size = Q0#q.size + Size, items = Q0#q.items + 1, @@ -95,23 +96,20 @@ handle_call({queue, Item, Size}, From, #q{work_waiters = []} = Q0) -> false -> {reply, ok, Q, hibernate} end; -handle_call({queue, Item, _}, _From, #q{work_waiters = [{W, _Max} | Rest]} = Q) -> +handle_call({queue, Item, _}, _From, #q{worker = {W, _Max}} = Q) -> gen_server:reply(W, {ok, [Item]}), - {reply, ok, Q#q{work_waiters = Rest}, hibernate}; -handle_call({dequeue, Max}, From, Q) -> - #q{work_waiters = Workers, multi_workers = Multi, items = Count} = Q, - case {Workers, Multi} of - {[_ | _], false} -> - exit("Only one caller allowed to wait for this work at a time"); - {[_ | _], true} -> - {noreply, Q#q{work_waiters = Workers ++ [{From, Max}]}}; - _ -> - case Count of - 0 -> - {noreply, Q#q{work_waiters = Workers ++ [{From, Max}]}}; - C when C > 0 -> - deliver_queue_items(Max, Q) - end + {reply, ok, Q#q{worker = undefined}, hibernate}; +handle_call({dequeue, _Max}, _From, #q{worker = {_, _}}) -> + % Something went wrong - the same or a different worker is + % trying to dequeue an item. We only allow one worker to wait + % for work at a time, so we exit with an error. + exit(multiple_workers_error); +handle_call({dequeue, Max}, From, #q{worker = undefined, items = Count} = Q) -> + case Count of + 0 -> + {noreply, Q#q{worker = {From, Max}}}; + C when C > 0 -> + deliver_queue_items(Max, Q) end; handle_call(item_count, _From, Q) -> {reply, Q#q.items, Q}; diff --git a/src/couch/test/eunit/couch_work_queue_tests.erl b/src/couch/test/eunit/couch_work_queue_tests.erl index 7c3854f61..c41cf598e 100644 --- a/src/couch/test/eunit/couch_work_queue_tests.erl +++ b/src/couch/test/eunit/couch_work_queue_tests.erl @@ -31,27 +31,12 @@ setup_max_size() -> setup_max_items_and_size() -> setup([{max_size, 160}, {max_items, 3}]). 
-setup_multi_workers() -> - {Q, Producer, Consumer1} = setup([ - {max_size, 160}, - {max_items, 3}, - {multi_workers, true} - ]), - Consumer2 = spawn_consumer(Q), - Consumer3 = spawn_consumer(Q), - {Q, Producer, [Consumer1, Consumer2, Consumer3]}. - -teardown({Q, Producer, Consumers}) when is_list(Consumers) -> +teardown({Q, Producer, Consumer}) -> % consume all to unblock and let producer/consumer stop without timeout - [consume(Consumer, all) || Consumer <- Consumers], - + consume(Consumer, all), ok = close_queue(Q), ok = stop(Producer, "producer"), - R = [stop(Consumer, "consumer") || Consumer <- Consumers], - R = [ok || _ <- Consumers], - ok; -teardown({Q, Producer, Consumer}) -> - teardown({Q, Producer, [Consumer]}). + ok = stop(Consumer, "consumer"). single_consumer_test_() -> { @@ -87,29 +72,16 @@ single_consumer_test_() -> ] }. -multiple_consumers_test_() -> - { - "Single producer and multiple consumers", - [ - { - "Queue with max size of 160 bytes and 3 max items", - { - foreach, - fun setup_multi_workers/0, - fun teardown/1, - common_cases() ++ multiple_consumers() - } - } - ] - }. - common_cases() -> [ fun should_block_consumer_on_dequeue_from_empty_queue/1, fun should_consume_right_item/1, fun should_timeout_on_close_non_empty_queue/1, fun should_not_block_producer_for_non_empty_queue_after_close/1, - fun should_be_closed/1 + fun should_be_closed/1, + fun should_produce_consume_any_term/1, + fun should_crash_on_multiple_consumers/1, + fun should_crash_on_random_messages/1 ]. single_consumer_max_item_count() -> @@ -134,13 +106,6 @@ single_consumer_max_size() -> single_consumer_max_items_and_size() -> single_consumer_max_item_count() ++ single_consumer_max_size(). -multiple_consumers() -> - [ - fun should_have_zero_size_for_new_queue/1, - fun should_have_no_items_for_new_queue/1, - fun should_increase_queue_size_on_produce/1 - ]. - should_have_no_items_for_new_queue({Q, _, _}) -> ?_assertEqual(0, couch_work_queue:item_count(Q)). 
@@ -279,19 +244,6 @@ should_not_block_producer_for_non_empty_queue_after_close({Q, Producer, _}) -> ?_assertEqual({ok, 1, 1}, {Pong, Size, Count}). -should_be_closed({Q, _, Consumers}) when is_list(Consumers) -> - ok = close_queue(Q), - - [consume(C, 1) || C <- Consumers], - - LastConsumerItems = [last_consumer_items(C) || C <- Consumers], - ItemsCount = couch_work_queue:item_count(Q), - Size = couch_work_queue:size(Q), - - ?_assertEqual( - {[closed, closed, closed], closed, closed}, - {LastConsumerItems, ItemsCount, Size} - ); should_be_closed({Q, _, Consumer}) -> ok = close_queue(Q), @@ -306,6 +258,46 @@ should_be_closed({Q, _, Consumer}) -> {LastConsumerItems, ItemsCount, Size} ). +should_produce_consume_any_term({Q, Producer, Consumer}) -> + Potato = produce_term(Q, Producer, potato, false), + Tomato = produce_term(Q, Producer, {tomato}, false), + Carrot = produce_term(Q, Producer, [carrot], false), + + consume(Consumer, 2), + {ok, Items1} = last_consumer_items(Consumer), + ?_assertEqual([Potato, Tomato], Items1), + + consume(Consumer, 1), + {ok, Items2} = last_consumer_items(Consumer), + ?_assertEqual([Carrot], Items2). + +should_crash_on_multiple_consumers({_Q, _Producer, _Consumer}) -> + % Do not want to crash the test, so set up a new Q process + {ok, Q} = couch_work_queue:new([]), + unlink(Q), + Ref = monitor(process, Q), + Pid1 = spawn(fun() -> couch_work_queue:dequeue(Q, 1) end), + Pid2 = spawn(fun() -> couch_work_queue:dequeue(Q, 1) end), + Reason = + receive + {'DOWN', Ref, _, _, Res} -> Res + end, + exit(Pid1, kill), + exit(Pid2, kill), + ?_assertEqual(multiple_workers_error, Reason). + +should_crash_on_random_messages({_Q, _Producer, _Consumer}) -> + % Do not want to crash the test, so set up a new Q process + {ok, Q} = couch_work_queue:new([]), + unlink(Q), + Ref = monitor(process, Q), + Q ! eggplant, + Reason = + receive + {'DOWN', Ref, _, _, Res} -> Res + end, + ?_assertEqual(eggplant, Reason). 
+ close_queue(Q) -> test_util:stop_sync( Q, @@ -329,7 +321,10 @@ consumer_loop(Parent, Q, PrevItem) -> {last_item, Ref} -> Parent ! {item, Ref, PrevItem}, consumer_loop(Parent, Q, PrevItem); - {consume, N} -> + {consume, all} -> + Result = couch_work_queue:dequeue(Q), + consumer_loop(Parent, Q, Result); + {consume, N} when is_integer(N) -> Result = couch_work_queue:dequeue(Q, N), consumer_loop(Parent, Q, Result) end. @@ -345,7 +340,11 @@ producer_loop(Parent, Q) -> {ping, Ref} -> Parent ! {pong, Ref}, producer_loop(Parent, Q); - {produce, Ref, Size} -> + {produce_term, Ref, Term} -> + Parent ! {item, Ref, Term}, + ok = couch_work_queue:queue(Q, Term), + producer_loop(Parent, Q); + {produce_binary, Ref, Size} -> Item = crypto:strong_rand_bytes(Size), Parent ! {item, Ref, Item}, ok = couch_work_queue:queue(Q, Item), @@ -365,10 +364,30 @@ last_consumer_items(Consumer) -> timeout end. +produce_term(Q, Producer, Term, Wait) -> + Ref = make_ref(), + ItemsCount = couch_work_queue:item_count(Q), + Producer ! {produce_term, Ref, Term}, + receive + {item, Ref, Item} when Wait -> + ok = wait_increment(Q, ItemsCount), + Item; + {item, Ref, Item} -> + Item + after ?TIMEOUT -> + erlang:error( + {assertion_failed, [ + {module, ?MODULE}, + {line, ?LINE}, + {reason, "Timeout asking producer to produce a term"} + ]} + ) + end. + produce(Q, Producer, Size, Wait) -> Ref = make_ref(), ItemsCount = couch_work_queue:item_count(Q), - Producer ! {produce, Ref, Size}, + Producer ! {produce_binary, Ref, Size}, receive {item, Ref, Item} when Wait -> ok = wait_increment(Q, ItemsCount),
