This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch 2.2
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.2 by this push:
new 79d0f55 MINOR: followup to Version Probing improvements2.2 (#7448)
79d0f55 is described below
commit 79d0f55ba7fe4e9d0a07026cb819b421fa0f7c00
Author: A. Sophie Blee-Goldman <[email protected]>
AuthorDate: Fri Oct 4 15:58:38 2019 -0700
MINOR: followup to Version Probing improvements2.2 (#7448)
Small follow-up to trunk PR #7426.
While debugging the 2.3 Version Probing (VP) PR, we realized we should
remove the leader-tracking from the VP system test altogether. We'd already
merged the corresponding trunk PR, so I made a quick new PR for trunk.
Reviewers: Guozhang Wang <[email protected]>
---
.../tests/streams/streams_upgrade_test.py | 44 ++--------------------
1 file changed, 3 insertions(+), 41 deletions(-)
diff --git a/tests/kafkatest/tests/streams/streams_upgrade_test.py
b/tests/kafkatest/tests/streams/streams_upgrade_test.py
index 7b9a310..c315f72 100644
--- a/tests/kafkatest/tests/streams/streams_upgrade_test.py
+++ b/tests/kafkatest/tests/streams/streams_upgrade_test.py
@@ -84,8 +84,6 @@ class StreamsUpgradeTest(Test):
'echo' : { 'partitions': 5 },
'data' : { 'partitions': 5 },
}
- self.leader = None
- self.leader_counter = {}
processed_msg = "processed [0-9]* records"
@@ -311,13 +309,6 @@ class StreamsUpgradeTest(Test):
self.processors = [self.processor1, self.processor2, self.processor3]
self.old_processors = [self.processor1, self.processor2,
self.processor3]
self.upgraded_processors = []
- for p in self.processors:
- self.leader_counter[p] = 2
-
- self.update_leader()
- for p in self.processors:
- self.leader_counter[p] = 0
- self.leader_counter[self.leader] = 3
counter = 1
current_generation = 3
@@ -342,25 +333,6 @@ class StreamsUpgradeTest(Test):
timeout_sec=60,
err_msg="Never saw output
'UPGRADE-TEST-CLIENT-CLOSED' on" + str(node.account))
- def update_leader(self):
- self.leader = None
- retries = 10
- while retries > 0:
- for p in self.processors:
- found = list(p.node.account.ssh_capture("grep \"Finished
assignment for group\" %s" % p.LOG_FILE, allow_fail=True))
- if len(found) >= self.leader_counter[p] + 1:
- self.leader = p
- self.leader_counter[p] = self.leader_counter[p] + 1
-
- if self.leader is None:
- retries = retries - 1
- time.sleep(5)
- else:
- break
-
- if self.leader is None:
- raise Exception("Could not identify leader")
-
def get_version_string(self, version):
if version.startswith("0") or version.startswith("1") \
or version.startswith("2.0") or version.startswith("2.1"):
@@ -524,7 +496,6 @@ class StreamsUpgradeTest(Test):
node.account.ssh("mv " + processor.STDOUT_FILE + " " +
processor.STDOUT_FILE + "." + str(counter), allow_fail=False)
node.account.ssh("mv " + processor.STDERR_FILE + " " +
processor.STDERR_FILE + "." + str(counter), allow_fail=False)
node.account.ssh("mv " + processor.LOG_FILE + " " +
processor.LOG_FILE + "." + str(counter), allow_fail=False)
- self.leader_counter[processor] = 0
with node.account.monitor_log(processor.LOG_FILE) as
log_monitor:
processor.set_upgrade_to("future_version")
@@ -540,11 +511,6 @@ class StreamsUpgradeTest(Test):
timeout_sec=60,
err_msg="Could not detect
FutureStreamsPartitionAssignor in " + str(node.account))
- if processor == self.leader:
- self.update_leader()
- else:
- self.leader_counter[self.leader] =
self.leader_counter[self.leader] + 1
-
monitors = {}
monitors[processor] = log_monitor
monitors[first_other_processor] = first_other_monitor
@@ -598,12 +564,8 @@ class StreamsUpgradeTest(Test):
if generation_synchronized == False:
raise Exception("Never saw all three processors have
the synchronized generation number")
- if processor == self.leader:
- self.update_leader()
- else:
- self.leader_counter[self.leader] =
self.leader_counter[self.leader] + 1
- if self.leader in self.old_processors or
len(self.old_processors) > 0:
+ if len(self.old_processors) > 0:
self.verify_metadata_no_upgraded_yet()
return current_generation
@@ -616,9 +578,9 @@ class StreamsUpgradeTest(Test):
def verify_metadata_no_upgraded_yet(self):
for p in self.processors:
- found = list(p.node.account.ssh_capture("grep \"Sent a version 4
subscription and group leader.s latest supported version is 5. Upgrading
subscription metadata version to 5 for next rebalance.\" " + p.LOG_FILE,
allow_fail=True))
+ found = list(p.node.account.ssh_capture("grep \"Sent a version 4
subscription and group.s latest commonly supported version is 5 (successful
version probing and end of rolling upgrade). Upgrading subscription metadata
version to 5 for next rebalance.\" " + p.LOG_FILE, allow_fail=True))
if len(found) > 0:
- raise Exception("Kafka Streams failed with 'group member
upgraded to metadata 4 too early'")
+ raise Exception("Kafka Streams failed with 'group member
upgraded to metadata 5 too early'")
def get_topics_count(self):
count = 0