This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch 4.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.0 by this push:
     new 6cad956d1af MINOR: update truncation test (#18952)
6cad956d1af is described below

commit 6cad956d1afd3ffb2f6b15dbeb2f0c868b6b7f11
Author: Calvin Liu <[email protected]>
AuthorDate: Mon Feb 24 12:32:29 2025 -0800

    MINOR: update truncation test (#18952)
    
    Reduce min.insync.replicas to 1 for the truncation test in order to bypass
the protection introduced by KIP-966.
    
    Reviewers: David Jacot <[email protected]>, Colin P. McCabe 
<[email protected]>, Chia-Ping Tsai <[email protected]>
---
 tests/kafkatest/services/kafka/kafka.py         | 23 +++++------------------
 tests/kafkatest/tests/client/truncation_test.py | 12 ++++++++++--
 2 files changed, 15 insertions(+), 20 deletions(-)

diff --git a/tests/kafkatest/services/kafka/kafka.py 
b/tests/kafkatest/services/kafka/kafka.py
index 018bf382529..2ae8e20ec51 100644
--- a/tests/kafkatest/services/kafka/kafka.py
+++ b/tests/kafkatest/services/kafka/kafka.py
@@ -1341,22 +1341,6 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, 
Service):
         self.logger.info("Running alter message format command...\n%s" % cmd)
         node.account.ssh(cmd)
 
-    def set_unclean_leader_election(self, topic, value=True, node=None):
-        if node is None:
-            node = self.nodes[0]
-        if value is True:
-            self.logger.info("Enabling unclean leader election for topic %s", 
topic)
-        else:
-            self.logger.info("Disabling unclean leader election for topic %s", 
topic)
-
-        force_use_zk_connection = not 
self.all_nodes_configs_command_uses_bootstrap_server()
-
-        cmd = fix_opts_for_new_jvm(node)
-        cmd += "%s --entity-name %s --entity-type topics --alter --add-config 
unclean.leader.election.enable=%s" % \
-              (self.kafka_configs_cmd_with_optional_security_settings(node, 
force_use_zk_connection), topic, str(value).lower())
-        self.logger.info("Running alter unclean leader command...\n%s" % cmd)
-        node.account.ssh(cmd)
-
     def kafka_acls_cmd_with_optional_security_settings(self, node, 
force_use_zk_connection, kafka_security_protocol = None, 
override_command_config = None):
         if self.quorum_info.using_kraft and not self.quorum_info.has_brokers:
             raise Exception("Must invoke kafka-acls against a broker, not a 
KRaft controller")
@@ -1602,11 +1586,14 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, 
Service):
             describe_output = self.describe_topic(topic, node, 
offline_nodes=offline_nodes)
             self.logger.debug(describe_output)
             requested_partition_line = 
self._describe_topic_line_for_partition(partition, describe_output)
-            # e.g. Topic: test_topic   Partition: 0    Leader: 3       
Replicas: 3,2   Isr: 3,2
+            # e.g. Topic: test_topic   Partition: 0    Leader: 3       
Replicas: 3,2   Isr: 3,2    Elr: 4  LastKnownElr: 5
             if not requested_partition_line:
                 raise Exception("Error finding partition state for topic %s 
and partition %d." % (topic, partition))
             isr_csv = requested_partition_line.split()[9] # 10th column from 
above
-            isr_idx_list = [int(i) for i in isr_csv.split(",")]
+            if isr_csv == "Elr:":
+                isr_idx_list = []
+            else:
+                isr_idx_list = [int(i) for i in isr_csv.split(",")]
 
         self.logger.info("Isr for topic %s and partition %d is now: %s" % 
(topic, partition, isr_idx_list))
         return isr_idx_list
diff --git a/tests/kafkatest/tests/client/truncation_test.py 
b/tests/kafkatest/tests/client/truncation_test.py
index 3a091c01a57..34575f33ba2 100644
--- a/tests/kafkatest/tests/client/truncation_test.py
+++ b/tests/kafkatest/tests/client/truncation_test.py
@@ -27,7 +27,9 @@ class TruncationTest(VerifiableConsumerTest):
     TOPICS = {
         TOPIC: {
             'partitions': NUM_PARTITIONS,
-            'replication-factor': 2
+            'replication-factor': 2,
+            'configs': {"min.insync.replicas": 1,
+                        "unclean.leader.election.enable": True}
         }
     }
     GROUP_ID = "truncation-test"
@@ -80,6 +82,9 @@ class TruncationTest(VerifiableConsumerTest):
         isr = self.kafka.isr_idx_list(self.TOPIC, 0)
         node1 = self.kafka.get_node(isr[0])
         self.kafka.stop_node(node1)
+        wait_until(lambda: len(self.kafka.isr_idx_list(self.TOPIC, 0)) == 1,
+                   timeout_sec=30,
+                   err_msg="The ISR update taking too long")
         self.logger.info("Reduced ISR to one node, consumer is at %s", 
consumer.current_position(tp))
 
         # Ensure remaining ISR member has a little bit of data
@@ -112,7 +117,10 @@ class TruncationTest(VerifiableConsumerTest):
 
         pre_truncation_pos = consumer.current_position(tp)
 
-        self.kafka.set_unclean_leader_election(self.TOPIC)
+        wait_until(lambda: len(self.kafka.isr_idx_list(self.TOPIC, 0)) == 1,
+           timeout_sec=30,
+           err_msg="The unclean leader election takes too long")
+
         self.logger.info("New unclean leader, consumer is at %s", 
consumer.current_position(tp))
 
         # Wait for truncation to be detected

Reply via email to