This is an automated email from the ASF dual-hosted git repository.
cmccabe pushed a commit to branch 4.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.0 by this push:
new 6cad956d1af MINOR: update truncation test (#18952)
6cad956d1af is described below
commit 6cad956d1afd3ffb2f6b15dbeb2f0c868b6b7f11
Author: Calvin Liu <[email protected]>
AuthorDate: Mon Feb 24 12:32:29 2025 -0800
MINOR: update truncation test (#18952)
Reduce min.insync.replicas to 1 for the truncation test in order to bypass the
protection introduced by KIP-966.
Reviewers: David Jacot <[email protected]>, Colin P. McCabe
<[email protected]>, Chia-Ping Tsai <[email protected]>
---
tests/kafkatest/services/kafka/kafka.py | 23 +++++------------------
tests/kafkatest/tests/client/truncation_test.py | 12 ++++++++++--
2 files changed, 15 insertions(+), 20 deletions(-)
diff --git a/tests/kafkatest/services/kafka/kafka.py
b/tests/kafkatest/services/kafka/kafka.py
index 018bf382529..2ae8e20ec51 100644
--- a/tests/kafkatest/services/kafka/kafka.py
+++ b/tests/kafkatest/services/kafka/kafka.py
@@ -1341,22 +1341,6 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin,
Service):
self.logger.info("Running alter message format command...\n%s" % cmd)
node.account.ssh(cmd)
- def set_unclean_leader_election(self, topic, value=True, node=None):
- if node is None:
- node = self.nodes[0]
- if value is True:
- self.logger.info("Enabling unclean leader election for topic %s",
topic)
- else:
- self.logger.info("Disabling unclean leader election for topic %s",
topic)
-
- force_use_zk_connection = not
self.all_nodes_configs_command_uses_bootstrap_server()
-
- cmd = fix_opts_for_new_jvm(node)
- cmd += "%s --entity-name %s --entity-type topics --alter --add-config
unclean.leader.election.enable=%s" % \
- (self.kafka_configs_cmd_with_optional_security_settings(node,
force_use_zk_connection), topic, str(value).lower())
- self.logger.info("Running alter unclean leader command...\n%s" % cmd)
- node.account.ssh(cmd)
-
def kafka_acls_cmd_with_optional_security_settings(self, node,
force_use_zk_connection, kafka_security_protocol = None,
override_command_config = None):
if self.quorum_info.using_kraft and not self.quorum_info.has_brokers:
raise Exception("Must invoke kafka-acls against a broker, not a
KRaft controller")
@@ -1602,11 +1586,14 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin,
Service):
describe_output = self.describe_topic(topic, node,
offline_nodes=offline_nodes)
self.logger.debug(describe_output)
requested_partition_line =
self._describe_topic_line_for_partition(partition, describe_output)
- # e.g. Topic: test_topic Partition: 0 Leader: 3
Replicas: 3,2 Isr: 3,2
+ # e.g. Topic: test_topic Partition: 0 Leader: 3
Replicas: 3,2 Isr: 3,2 Elr: 4 LastKnownElr: 5
if not requested_partition_line:
raise Exception("Error finding partition state for topic %s
and partition %d." % (topic, partition))
isr_csv = requested_partition_line.split()[9] # 10th column from
above
- isr_idx_list = [int(i) for i in isr_csv.split(",")]
+ if isr_csv == "Elr:":
+ isr_idx_list = []
+ else:
+ isr_idx_list = [int(i) for i in isr_csv.split(",")]
self.logger.info("Isr for topic %s and partition %d is now: %s" %
(topic, partition, isr_idx_list))
return isr_idx_list
diff --git a/tests/kafkatest/tests/client/truncation_test.py
b/tests/kafkatest/tests/client/truncation_test.py
index 3a091c01a57..34575f33ba2 100644
--- a/tests/kafkatest/tests/client/truncation_test.py
+++ b/tests/kafkatest/tests/client/truncation_test.py
@@ -27,7 +27,9 @@ class TruncationTest(VerifiableConsumerTest):
TOPICS = {
TOPIC: {
'partitions': NUM_PARTITIONS,
- 'replication-factor': 2
+ 'replication-factor': 2,
+ 'configs': {"min.insync.replicas": 1,
+ "unclean.leader.election.enable": True}
}
}
GROUP_ID = "truncation-test"
@@ -80,6 +82,9 @@ class TruncationTest(VerifiableConsumerTest):
isr = self.kafka.isr_idx_list(self.TOPIC, 0)
node1 = self.kafka.get_node(isr[0])
self.kafka.stop_node(node1)
+ wait_until(lambda: len(self.kafka.isr_idx_list(self.TOPIC, 0)) == 1,
+ timeout_sec=30,
+ err_msg="The ISR update taking too long")
self.logger.info("Reduced ISR to one node, consumer is at %s",
consumer.current_position(tp))
# Ensure remaining ISR member has a little bit of data
@@ -112,7 +117,10 @@ class TruncationTest(VerifiableConsumerTest):
pre_truncation_pos = consumer.current_position(tp)
- self.kafka.set_unclean_leader_election(self.TOPIC)
+ wait_until(lambda: len(self.kafka.isr_idx_list(self.TOPIC, 0)) == 1,
+ timeout_sec=30,
+ err_msg="The unclean leader election takes too long")
+
self.logger.info("New unclean leader, consumer is at %s",
consumer.current_position(tp))
# Wait for truncation to be detected