This is an automated email from the ASF dual-hosted git repository.

vatamane pushed a commit to branch remove-unused-option-in-couch-workers
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 3f34786df752f00de4e2fb7fd51deef66db0542a
Author: Nick Vatamaniuc <[email protected]>
AuthorDate: Sun Jan 26 20:10:15 2025 -0500

    Remove multi_workers option from couch_work_queue
    
    Nothing was using it so let's remove it. "Deleted code is debugged code" as
    they say.
    
    It turns out its unit tests never covered the actual use case of the work
    queue -- sending terms back and forth; they just tested binaries. So add a
    few tests for terms and cover some other error cases, thus increasing test
    coverage to 100%.
---
 src/couch/src/couch_work_queue.erl              |  46 ++++----
 src/couch/test/eunit/couch_work_queue_tests.erl | 137 ++++++++++++++----------
 2 files changed, 101 insertions(+), 82 deletions(-)

diff --git a/src/couch/src/couch_work_queue.erl 
b/src/couch/src/couch_work_queue.erl
index ccd9d993a..f7d3fd9ab 100644
--- a/src/couch/src/couch_work_queue.erl
+++ b/src/couch/src/couch_work_queue.erl
@@ -29,9 +29,8 @@
     max_items,
     items = 0,
     size = 0,
-    work_waiters = [],
-    close_on_dequeue = false,
-    multi_workers = false
+    worker = undefined,
+    close_on_dequeue = false
 }).
 
 new(Options) ->
@@ -72,15 +71,17 @@ close(Wq) ->
 init(Options) ->
     Q = #q{
         max_size = couch_util:get_value(max_size, Options, nil),
-        max_items = couch_util:get_value(max_items, Options, nil),
-        multi_workers = couch_util:get_value(multi_workers, Options, false)
+        max_items = couch_util:get_value(max_items, Options, nil)
     },
     {ok, Q, hibernate}.
 
-terminate(_Reason, #q{work_waiters = Workers}) ->
-    lists:foreach(fun({W, _}) -> gen_server:reply(W, closed) end, Workers).
+terminate(_Reason, #q{worker = undefined}) ->
+    ok;
+terminate(_Reason, #q{worker = {W, _}}) ->
+    gen_server:reply(W, closed),
+    ok.
 
-handle_call({queue, Item, Size}, From, #q{work_waiters = []} = Q0) ->
+handle_call({queue, Item, Size}, From, #q{worker = undefined} = Q0) ->
     Q = Q0#q{
         size = Q0#q.size + Size,
         items = Q0#q.items + 1,
@@ -95,23 +96,20 @@ handle_call({queue, Item, Size}, From, #q{work_waiters = 
[]} = Q0) ->
         false ->
             {reply, ok, Q, hibernate}
     end;
-handle_call({queue, Item, _}, _From, #q{work_waiters = [{W, _Max} | Rest]} = 
Q) ->
+handle_call({queue, Item, _}, _From, #q{worker = {W, _Max}} = Q) ->
     gen_server:reply(W, {ok, [Item]}),
-    {reply, ok, Q#q{work_waiters = Rest}, hibernate};
-handle_call({dequeue, Max}, From, Q) ->
-    #q{work_waiters = Workers, multi_workers = Multi, items = Count} = Q,
-    case {Workers, Multi} of
-        {[_ | _], false} ->
-            exit("Only one caller allowed to wait for this work at a time");
-        {[_ | _], true} ->
-            {noreply, Q#q{work_waiters = Workers ++ [{From, Max}]}};
-        _ ->
-            case Count of
-                0 ->
-                    {noreply, Q#q{work_waiters = Workers ++ [{From, Max}]}};
-                C when C > 0 ->
-                    deliver_queue_items(Max, Q)
-            end
+    {reply, ok, Q#q{worker = undefined}, hibernate};
+handle_call({dequeue, _Max}, _From, #q{worker = {_, _}}) ->
+    % Something went wrong - the same or a different worker is
+    % trying to dequeue an item. We only allow one worker to wait
+    % for work at a time, so we exit with an error.
+    exit(multiple_workers_error);
+handle_call({dequeue, Max}, From, #q{worker = undefined, items = Count} = Q) ->
+    case Count of
+        0 ->
+            {noreply, Q#q{worker = {From, Max}}};
+        C when C > 0 ->
+            deliver_queue_items(Max, Q)
     end;
 handle_call(item_count, _From, Q) ->
     {reply, Q#q.items, Q};
diff --git a/src/couch/test/eunit/couch_work_queue_tests.erl 
b/src/couch/test/eunit/couch_work_queue_tests.erl
index 7c3854f61..ab0ef9353 100644
--- a/src/couch/test/eunit/couch_work_queue_tests.erl
+++ b/src/couch/test/eunit/couch_work_queue_tests.erl
@@ -31,27 +31,12 @@ setup_max_size() ->
 setup_max_items_and_size() ->
     setup([{max_size, 160}, {max_items, 3}]).
 
-setup_multi_workers() ->
-    {Q, Producer, Consumer1} = setup([
-        {max_size, 160},
-        {max_items, 3},
-        {multi_workers, true}
-    ]),
-    Consumer2 = spawn_consumer(Q),
-    Consumer3 = spawn_consumer(Q),
-    {Q, Producer, [Consumer1, Consumer2, Consumer3]}.
-
-teardown({Q, Producer, Consumers}) when is_list(Consumers) ->
+teardown({Q, Producer, Consumer}) ->
     % consume all to unblock and let producer/consumer stop without timeout
-    [consume(Consumer, all) || Consumer <- Consumers],
-
+    consume(Consumer, all),
     ok = close_queue(Q),
     ok = stop(Producer, "producer"),
-    R = [stop(Consumer, "consumer") || Consumer <- Consumers],
-    R = [ok || _ <- Consumers],
-    ok;
-teardown({Q, Producer, Consumer}) ->
-    teardown({Q, Producer, [Consumer]}).
+    ok = stop(Consumer, "consumer").
 
 single_consumer_test_() ->
     {
@@ -87,29 +72,16 @@ single_consumer_test_() ->
         ]
     }.
 
-multiple_consumers_test_() ->
-    {
-        "Single producer and multiple consumers",
-        [
-            {
-                "Queue with max size of 160 bytes and 3 max items",
-                {
-                    foreach,
-                    fun setup_multi_workers/0,
-                    fun teardown/1,
-                    common_cases() ++ multiple_consumers()
-                }
-            }
-        ]
-    }.
-
 common_cases() ->
     [
         fun should_block_consumer_on_dequeue_from_empty_queue/1,
         fun should_consume_right_item/1,
         fun should_timeout_on_close_non_empty_queue/1,
         fun should_not_block_producer_for_non_empty_queue_after_close/1,
-        fun should_be_closed/1
+        fun should_be_closed/1,
+        fun should_produce_consume_any_term/1,
+        fun should_crash_on_multiple_consumers/1,
+        fun should_crash_on_random_messages/1
     ].
 
 single_consumer_max_item_count() ->
@@ -134,13 +106,6 @@ single_consumer_max_size() ->
 single_consumer_max_items_and_size() ->
     single_consumer_max_item_count() ++ single_consumer_max_size().
 
-multiple_consumers() ->
-    [
-        fun should_have_zero_size_for_new_queue/1,
-        fun should_have_no_items_for_new_queue/1,
-        fun should_increase_queue_size_on_produce/1
-    ].
-
 should_have_no_items_for_new_queue({Q, _, _}) ->
     ?_assertEqual(0, couch_work_queue:item_count(Q)).
 
@@ -279,19 +244,6 @@ 
should_not_block_producer_for_non_empty_queue_after_close({Q, Producer, _}) ->
 
     ?_assertEqual({ok, 1, 1}, {Pong, Size, Count}).
 
-should_be_closed({Q, _, Consumers}) when is_list(Consumers) ->
-    ok = close_queue(Q),
-
-    [consume(C, 1) || C <- Consumers],
-
-    LastConsumerItems = [last_consumer_items(C) || C <- Consumers],
-    ItemsCount = couch_work_queue:item_count(Q),
-    Size = couch_work_queue:size(Q),
-
-    ?_assertEqual(
-        {[closed, closed, closed], closed, closed},
-        {LastConsumerItems, ItemsCount, Size}
-    );
 should_be_closed({Q, _, Consumer}) ->
     ok = close_queue(Q),
 
@@ -306,6 +258,48 @@ should_be_closed({Q, _, Consumer}) ->
         {LastConsumerItems, ItemsCount, Size}
     ).
 
+should_produce_consume_any_term({Q, Producer, Consumer}) ->
+    Potato = produce_term(Q, Producer, potato, false),
+    Tomato = produce_term(Q, Producer, {tomato}, false),
+    Carrot = produce_term(Q, Producer, [carrot], false),
+
+    consume(Consumer, 2),
+    {ok, Items1} = last_consumer_items(Consumer),
+    ?_assertEqual([Potato, Tomato], Items1),
+
+    consume(Consumer, 1),
+    {ok, Items2} = last_consumer_items(Consumer),
+    ?_assertEqual([Carrot], Items2).
+
+should_crash_on_multiple_consumers({_Q, _Producer, _Consumer}) ->
+    % Do not want to crash the set up Q as that would terminate
+    % the test process, so we make a new one and unlink it
+    {ok, Q} = couch_work_queue:new([]),
+    unlink(Q),
+    Ref = monitor(process, Q),
+    Pid1 = spawn(fun() -> couch_work_queue:dequeue(Q, 1) end),
+    Pid2 = spawn(fun() -> couch_work_queue:dequeue(Q, 1) end),
+    Reason =
+        receive
+            {'DOWN', Ref, _, _, Res} -> Res
+        end,
+    exit(Pid1, kill),
+    exit(Pid2, kill),
+    ?_assertEqual(multiple_workers_error, Reason).
+
+should_crash_on_random_messages({_Q, _Producer, _Consumer}) ->
+    % Do not want to crash the set up Q as that would terminate
+    % the test process, so we make a new one and unlink it
+    {ok, Q} = couch_work_queue:new([]),
+    unlink(Q),
+    Ref = monitor(process, Q),
+    Q ! eggplant,
+    Reason =
+        receive
+            {'DOWN', Ref, _, _, Res} -> Res
+        end,
+    ?_assertEqual(eggplant, Reason).
+
 close_queue(Q) ->
     test_util:stop_sync(
         Q,
@@ -329,7 +323,10 @@ consumer_loop(Parent, Q, PrevItem) ->
         {last_item, Ref} ->
             Parent ! {item, Ref, PrevItem},
             consumer_loop(Parent, Q, PrevItem);
-        {consume, N} ->
+        {consume, all} ->
+            Result = couch_work_queue:dequeue(Q),
+            consumer_loop(Parent, Q, Result);
+        {consume, N} when is_integer(N) ->
             Result = couch_work_queue:dequeue(Q, N),
             consumer_loop(Parent, Q, Result)
     end.
@@ -345,7 +342,11 @@ producer_loop(Parent, Q) ->
         {ping, Ref} ->
             Parent ! {pong, Ref},
             producer_loop(Parent, Q);
-        {produce, Ref, Size} ->
+        {produce_term, Ref, Term} ->
+            Parent ! {item, Ref, Term},
+            ok = couch_work_queue:queue(Q, Term),
+            producer_loop(Parent, Q);
+        {produce_binary, Ref, Size} ->
             Item = crypto:strong_rand_bytes(Size),
             Parent ! {item, Ref, Item},
             ok = couch_work_queue:queue(Q, Item),
@@ -365,10 +366,30 @@ last_consumer_items(Consumer) ->
         timeout
     end.
 
+produce_term(Q, Producer, Term, Wait) ->
+    Ref = make_ref(),
+    ItemsCount = couch_work_queue:item_count(Q),
+    Producer ! {produce_term, Ref, Term},
+    receive
+        {item, Ref, Item} when Wait ->
+            ok = wait_increment(Q, ItemsCount),
+            Item;
+        {item, Ref, Item} ->
+            Item
+    after ?TIMEOUT ->
+        erlang:error(
+            {assertion_failed, [
+                {module, ?MODULE},
+                {line, ?LINE},
+                {reason, "Timeout asking producer to produce a term"}
+            ]}
+        )
+    end.
+
 produce(Q, Producer, Size, Wait) ->
     Ref = make_ref(),
     ItemsCount = couch_work_queue:item_count(Q),
-    Producer ! {produce, Ref, Size},
+    Producer ! {produce_binary, Ref, Size},
     receive
         {item, Ref, Item} when Wait ->
             ok = wait_increment(Q, ItemsCount),

Reply via email to