This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new ed9cb08dfe9 KAFKA-17977 Remove new_consumer from E2E (#17798)
ed9cb08dfe9 is described below

commit ed9cb08dfe9178d7427f354fb0b87a964233cd22
Author: PoAn Yang <[email protected]>
AuthorDate: Fri Nov 15 16:26:26 2024 +0800

    KAFKA-17977 Remove new_consumer from E2E (#17798)
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 tests/kafkatest/services/console_consumer.py                     | 6 +-----
 tests/kafkatest/services/kafka/kafka.py                          | 2 +-
 tests/kafkatest/tests/core/compatibility_test_new_broker_test.py | 6 ++----
 tests/kafkatest/tests/core/kraft_upgrade_test.py                 | 2 +-
 tests/kafkatest/tests/core/quorum_reconfiguration_test.py        | 4 ++--
 5 files changed, 7 insertions(+), 13 deletions(-)

diff --git a/tests/kafkatest/services/console_consumer.py b/tests/kafkatest/services/console_consumer.py
index cc8a0d31997..3e65efc3483 100644
--- a/tests/kafkatest/services/console_consumer.py
+++ b/tests/kafkatest/services/console_consumer.py
@@ -59,7 +59,7 @@ class ConsoleConsumer(KafkaPathResolverMixin, JmxMixin, BackgroundThreadService)
             "collect_default": False}
     }
 
-    def __init__(self, context, num_nodes, kafka, topic, group_id="test-consumer-group", new_consumer=True,
+    def __init__(self, context, num_nodes, kafka, topic, group_id="test-consumer-group",
                  message_validator=None, from_beginning=True, consumer_timeout_ms=None, version=DEV_BRANCH,
                  client_id="console-consumer", print_key=False, jmx_object_names=None, jmx_attributes=None,
                  enable_systest_events=False, stop_timeout_sec=35, print_timestamp=False, print_partition=False,
@@ -72,7 +72,6 @@ class ConsoleConsumer(KafkaPathResolverMixin, JmxMixin, 
BackgroundThreadService)
             num_nodes:                  number of nodes to use (this should be 
1)
             kafka:                      kafka service
             topic:                      consume from this topic
-            new_consumer:               use new Kafka consumer if True
             message_validator:          function which returns message or None
             from_beginning:             consume from beginning if True, else 
from the end
             consumer_timeout_ms:        corresponds to consumer.timeout.ms. 
consumer process ends if time between
@@ -96,7 +95,6 @@ class ConsoleConsumer(KafkaPathResolverMixin, JmxMixin, 
BackgroundThreadService)
                           root=ConsoleConsumer.PERSISTENT_ROOT)
         BackgroundThreadService.__init__(self, context, num_nodes)
         self.kafka = kafka
-        self.new_consumer = new_consumer
         self.group_id = group_id
         self.args = {
             'topic': topic,
@@ -145,8 +143,6 @@ class ConsoleConsumer(KafkaPathResolverMixin, JmxMixin, 
BackgroundThreadService)
         """Return the start command appropriate for the given node."""
         args = self.args.copy()
         args['broker_list'] = 
self.kafka.bootstrap_servers(self.security_config.security_protocol)
-        if not self.new_consumer:
-            args['zk_connect'] = self.kafka.zk_connect_setting()
         args['stdout'] = ConsoleConsumer.STDOUT_CAPTURE
         args['stderr'] = ConsoleConsumer.STDERR_CAPTURE
         args['log_dir'] = ConsoleConsumer.LOG_DIR
diff --git a/tests/kafkatest/services/kafka/kafka.py b/tests/kafkatest/services/kafka/kafka.py
index 1d9444da204..07f5d44ef2c 100644
--- a/tests/kafkatest/services/kafka/kafka.py
+++ b/tests/kafkatest/services/kafka/kafka.py
@@ -881,11 +881,11 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, 
Service):
             kafka_storage_script = self.path.script("kafka-storage.sh", node)
             cmd = "%s format --ignore-formatted --config %s --cluster-id %s" % 
(kafka_storage_script, KafkaService.CONFIG_FILE, config_property.CLUSTER_ID)
             if self.dynamicRaftQuorum:
-                cmd += " --feature kraft.version=1"
                 if self.node_quorum_info.has_controller_role:
                     if self.standalone_controller_bootstrapped:
                         cmd += " --no-initial-controllers"
                     else:
+                        cmd += " --feature kraft.version=1"
                         cmd += " --standalone"
                         self.standalone_controller_bootstrapped = True
             self.logger.info("Running log directory format command...\n%s" % 
cmd)
diff --git a/tests/kafkatest/tests/core/compatibility_test_new_broker_test.py b/tests/kafkatest/tests/core/compatibility_test_new_broker_test.py
index e25080fa61b..ec4a878f032 100644
--- a/tests/kafkatest/tests/core/compatibility_test_new_broker_test.py
+++ b/tests/kafkatest/tests/core/compatibility_test_new_broker_test.py
@@ -66,9 +66,7 @@ class 
ClientCompatibilityTestNewBroker(ProduceConsumeValidateTest):
     @matrix(producer_version=[str(LATEST_3_8)], 
consumer_version=[str(LATEST_3_8)], compression_types=[["none"]], 
timestamp_type=[str("CreateTime")], metadata_quorum=quorum.all_non_upgrade)
     @matrix(producer_version=[str(LATEST_3_9)], 
consumer_version=[str(LATEST_3_9)], compression_types=[["none"]], 
timestamp_type=[str("CreateTime")], metadata_quorum=quorum.all_non_upgrade)
     @matrix(producer_version=[str(LATEST_2_1)], 
consumer_version=[str(LATEST_2_1)], compression_types=[["zstd"]], 
timestamp_type=[str("CreateTime")], metadata_quorum=quorum.all_non_upgrade)
-    def test_compatibility(self, producer_version, consumer_version, compression_types, new_consumer=True, timestamp_type=None, metadata_quorum=quorum.zk):
-        if not new_consumer and metadata_quorum != quorum.zk:
-            raise Exception("ZooKeeper-based consumers are not supported when using a KRaft metadata quorum")
+    def test_compatibility(self, producer_version, consumer_version, compression_types, timestamp_type=None, metadata_quorum=quorum.zk):
         self.kafka = KafkaService(self.test_context, num_nodes=3, zk=self.zk, 
version=DEV_BRANCH, topics={self.topic: {
                                                                     
"partitions": 3,
                                                                     
"replication-factor": 3,
@@ -86,7 +84,7 @@ class 
ClientCompatibilityTestNewBroker(ProduceConsumeValidateTest):
                                            
version=KafkaVersion(producer_version))
 
         self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka,
-                                        self.topic, consumer_timeout_ms=30000, new_consumer=new_consumer,
+                                        self.topic, consumer_timeout_ms=30000,
                                         message_validator=is_int, version=KafkaVersion(consumer_version))
 
         self.run_produce_consume_validate(lambda: wait_until(
diff --git a/tests/kafkatest/tests/core/kraft_upgrade_test.py b/tests/kafkatest/tests/core/kraft_upgrade_test.py
index b9ef1617f63..dc0dc261d1c 100644
--- a/tests/kafkatest/tests/core/kraft_upgrade_test.py
+++ b/tests/kafkatest/tests/core/kraft_upgrade_test.py
@@ -100,7 +100,7 @@ class TestKRaftUpgrade(ProduceConsumeValidateTest):
                                            compression_types=["none"],
                                            
version=KafkaVersion(from_kafka_version))
         self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka,
-                                        self.topic, new_consumer=True, consumer_timeout_ms=30000,
+                                        self.topic, consumer_timeout_ms=30000,
                                         message_validator=is_int, version=KafkaVersion(from_kafka_version))
         self.run_produce_consume_validate(core_test_action=lambda: 
self.perform_version_change(from_kafka_version))
         cluster_id = self.kafka.cluster_id()
diff --git a/tests/kafkatest/tests/core/quorum_reconfiguration_test.py b/tests/kafkatest/tests/core/quorum_reconfiguration_test.py
index 80cf535021c..10aa1d6a792 100644
--- a/tests/kafkatest/tests/core/quorum_reconfiguration_test.py
+++ b/tests/kafkatest/tests/core/quorum_reconfiguration_test.py
@@ -120,7 +120,7 @@ class TestQuorumReconfiguration(ProduceConsumeValidateTest):
                                            message_validator=is_int, 
compression_types=["none"],
                                            version=DEV_BRANCH, 
offline_nodes=[inactive_controller])
         self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka,
-                                        self.topic, new_consumer=True, consumer_timeout_ms=30000,
+                                        self.topic, consumer_timeout_ms=30000,
                                         message_validator=is_int, version=DEV_BRANCH)
         # Perform reconfigurations
         self.run_produce_consume_validate(
@@ -161,7 +161,7 @@ class TestQuorumReconfiguration(ProduceConsumeValidateTest):
                                            message_validator=is_int, 
compression_types=["none"],
                                            version=DEV_BRANCH)
         self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka,
-                                        self.topic, new_consumer=True, consumer_timeout_ms=30000,
+                                        self.topic, consumer_timeout_ms=30000,
                                         message_validator=is_int, version=DEV_BRANCH)
         # Perform reconfigurations
         self.run_produce_consume_validate(

Reply via email to