This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 5e4ae06d12 MINOR: fix flaky test test_standby_tasks_rebalance (#12428)
5e4ae06d12 is described below
commit 5e4ae06d129f01d94ed27f73ea311a2adb0477e7
Author: Hao Li <[email protected]>
AuthorDate: Thu Jul 21 12:12:29 2022 -0700
MINOR: fix flaky test test_standby_tasks_rebalance (#12428)
* Description
In this test, when the third processor joins, other rebalance scenarios can
occur: for example, a follow-up JoinGroup request may happen before the
SyncGroup response is received by one of the processors, and the tasks
previously assigned to that processor are then lost during the new JoinGroup
request. This can result in standby tasks being assigned as 3, 1, 2. This PR
relaxes the expected assignment from exactly 2, 2, 2 to a range of [1-3].
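The effect of the relaxed check can be sketched with a small standalone example (hypothetical helper name; the real test passes the pattern to wait_for_verification against the node's stdout file, where it is matched as a regular expression):

```python
import re

# The character class [1-3] accepts any standby count from 1 to 3,
# instead of requiring exactly 2.
PATTERN = re.compile(r"ACTIVE_TASKS:2 STANDBY_TASKS:[1-3]")

def saw_expected_assignment(stdout: str) -> bool:
    """Return True if any line of the processor's stdout matches the pattern."""
    return any(PATTERN.search(line) for line in stdout.splitlines())

# The previously expected even assignment still matches ...
assert saw_expected_assignment("ACTIVE_TASKS:2 STANDBY_TASKS:2")
# ... and so do the uneven-but-legitimate splits such as 3, 1, 2.
assert saw_expected_assignment("ACTIVE_TASKS:2 STANDBY_TASKS:3")
assert saw_expected_assignment("ACTIVE_TASKS:2 STANDBY_TASKS:1")
# Anything outside the range still fails the check.
assert not saw_expected_assignment("ACTIVE_TASKS:2 STANDBY_TASKS:4")
```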
* Some background from Guozhang:
I talked to @hao Li offline and also inspected the code a bit, and tl;dr is
that I think the code logic is correct (i.e. we do not really have a bug), but
we need to relax the test verification a little bit. The general idea behind
the subscription info is that:
When a client joins the group, its subscription tries to encode all its
currently assigned active and standby tasks, which are used as the prev active
and standby tasks by the assignor in order to achieve some stickiness.
When a client drops all its active/standby tasks due to errors, it does not
actually report all-empty in its subscription; instead it checks its local
state directory (you can see that from TaskManager#getTaskOffsetSums, which
populates the taskOffsetSum). For an active task, the encoded offset is "-2",
a.k.a. LATEST_OFFSET; for a standby task, it is an actual numerical offset.
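That encoding can be illustrated with a simplified sketch (hypothetical function and task names; the real logic is Java in TaskManager#getTaskOffsetSums, and only the LATEST_OFFSET sentinel value of -2 comes from the description above):

```python
# Sentinel offset reported for active tasks in the subscription info.
LATEST_OFFSET = -2

def encode_task_offset_sums(active_tasks, standby_offsets):
    """Sketch of the subscription encoding: active tasks report the
    LATEST_OFFSET sentinel, standby tasks report their actual offset sums."""
    offset_sums = {task: LATEST_OFFSET for task in active_tasks}
    offset_sums.update(standby_offsets)  # real numerical offsets
    return offset_sums

sums = encode_task_offset_sums(
    active_tasks=["0_0", "0_1", "0_2"],
    standby_offsets={"1_0": 1500, "1_1": 1720, "1_2": 1390},
)
# Active tasks carry the sentinel; standbys carry real offsets.
assert sums["0_0"] == LATEST_OFFSET
assert sums["1_1"] == 1720
```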
So in this case proc2, which drops all its active and standby tasks, would
still report every task that has some local state; and since it previously
owned all six tasks (three as active and three as standby), it would report
all six as standbys. When that happens, the resulting assignment, as @hao Li
verified, is indeed the uneven one.
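Put differently, after the error the subscription is rebuilt from whatever is checkpointed on disk, so every previously owned task resurfaces as a previous standby (a hypothetical sketch with made-up task ids and offsets, not the real Java code):

```python
def tasks_reported_from_state_dir(state_dir_offsets):
    """After an error drops all assigned tasks, the subscription is built
    from the local state directory: every task with checkpointed state is
    reported with a numerical offset, which the assignor treats as a
    previous standby task."""
    return dict(state_dir_offsets)

# proc2 previously owned all six tasks (three active, three standby),
# so all six still have local state and all six are reported as standbys.
prev_owned = {"0_0": 900, "0_1": 880, "0_2": 910,
              "1_0": 400, "1_1": 395, "1_2": 410}
reported = tasks_reported_from_state_dir(prev_owned)
assert len(reported) == 6
```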
So I think the actual "issue" here is that proc2 is a bit late sending its
sync-group request: the previous rebalance has already completed and a
follow-up rebalance has already been triggered, in which case the resulting
uneven assignment is indeed expected. Such a scenario, though not common, is
still legitimate, since in practice all kinds of timing skew across instances
can happen. So I think we should just relax our verification here, i.e. just
making sure that each [...]
Reviewers: Suhas Satish <[email protected]>, Guozhang Wang <[email protected]>
---
.../tests/streams/streams_standby_replica_test.py | 19 +++++++++----------
1 file changed, 9 insertions(+), 10 deletions(-)
diff --git a/tests/kafkatest/tests/streams/streams_standby_replica_test.py b/tests/kafkatest/tests/streams/streams_standby_replica_test.py
index a8c07513c1..c0e5953f73 100644
--- a/tests/kafkatest/tests/streams/streams_standby_replica_test.py
+++ b/tests/kafkatest/tests/streams/streams_standby_replica_test.py
@@ -73,9 +73,9 @@ class StreamsStandbyTask(BaseStreamsTest):
         processor_3.start()
-        self.wait_for_verification(processor_1, "ACTIVE_TASKS:2 STANDBY_TASKS:2", processor_1.STDOUT_FILE)
-        self.wait_for_verification(processor_2, "ACTIVE_TASKS:2 STANDBY_TASKS:2", processor_2.STDOUT_FILE)
-        self.wait_for_verification(processor_3, "ACTIVE_TASKS:2 STANDBY_TASKS:2", processor_3.STDOUT_FILE)
+        self.wait_for_verification(processor_1, "ACTIVE_TASKS:2 STANDBY_TASKS:[1-3]", processor_1.STDOUT_FILE)
+        self.wait_for_verification(processor_2, "ACTIVE_TASKS:2 STANDBY_TASKS:[1-3]", processor_2.STDOUT_FILE)
+        self.wait_for_verification(processor_3, "ACTIVE_TASKS:2 STANDBY_TASKS:[1-3]", processor_3.STDOUT_FILE)
         processor_1.stop()
@@ -93,9 +93,9 @@ class StreamsStandbyTask(BaseStreamsTest):
         processor_2.start()
-        self.wait_for_verification(processor_1, "ACTIVE_TASKS:2 STANDBY_TASKS:2", processor_1.STDOUT_FILE)
-        self.wait_for_verification(processor_2, "ACTIVE_TASKS:2 STANDBY_TASKS:2", processor_2.STDOUT_FILE)
-        self.wait_for_verification(processor_3, "ACTIVE_TASKS:2 STANDBY_TASKS:2", processor_3.STDOUT_FILE, num_lines=2)
+        self.wait_for_verification(processor_1, "ACTIVE_TASKS:2 STANDBY_TASKS:[1-3]", processor_1.STDOUT_FILE)
+        self.wait_for_verification(processor_2, "ACTIVE_TASKS:2 STANDBY_TASKS:[1-3]", processor_2.STDOUT_FILE)
+        self.wait_for_verification(processor_3, "ACTIVE_TASKS:2 STANDBY_TASKS:[1-3]", processor_3.STDOUT_FILE, num_lines=2)
         processor_3.stop()
@@ -112,10 +112,9 @@ class StreamsStandbyTask(BaseStreamsTest):
         self.wait_for_verification(processor_2, "ACTIVE_TASKS:3 STANDBY_TASKS:3", processor_2.STDOUT_FILE, num_lines=2)
         processor_1.start()
-
-        self.wait_for_verification(processor_1, "ACTIVE_TASKS:2 STANDBY_TASKS:2", processor_1.STDOUT_FILE)
-        self.wait_for_verification(processor_3, "ACTIVE_TASKS:2 STANDBY_TASKS:2", processor_3.STDOUT_FILE)
-        self.wait_for_verification(processor_2, "ACTIVE_TASKS:2 STANDBY_TASKS:2", processor_2.STDOUT_FILE, num_lines=2)
+        self.wait_for_verification(processor_1, "ACTIVE_TASKS:2 STANDBY_TASKS:[1-3]", processor_1.STDOUT_FILE)
+        self.wait_for_verification(processor_2, "ACTIVE_TASKS:2 STANDBY_TASKS:[1-3]", processor_2.STDOUT_FILE, num_lines=2)
+        self.wait_for_verification(processor_3, "ACTIVE_TASKS:2 STANDBY_TASKS:[1-3]", processor_3.STDOUT_FILE)
         self.assert_consume(self.client_id, "assert all messages consumed from %s" % self.streams_sink_topic_1,
                             self.streams_sink_topic_1, self.num_messages)