This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new ed9cb08dfe9 KAFKA-17977 Remove new_consumer from E2E (#17798)
ed9cb08dfe9 is described below
commit ed9cb08dfe9178d7427f354fb0b87a964233cd22
Author: PoAn Yang <[email protected]>
AuthorDate: Fri Nov 15 16:26:26 2024 +0800
KAFKA-17977 Remove new_consumer from E2E (#17798)
Reviewers: Chia-Ping Tsai <[email protected]>
---
tests/kafkatest/services/console_consumer.py | 6 +-----
tests/kafkatest/services/kafka/kafka.py | 2 +-
tests/kafkatest/tests/core/compatibility_test_new_broker_test.py | 6 ++----
tests/kafkatest/tests/core/kraft_upgrade_test.py | 2 +-
tests/kafkatest/tests/core/quorum_reconfiguration_test.py | 4 ++--
5 files changed, 7 insertions(+), 13 deletions(-)
diff --git a/tests/kafkatest/services/console_consumer.py
b/tests/kafkatest/services/console_consumer.py
index cc8a0d31997..3e65efc3483 100644
--- a/tests/kafkatest/services/console_consumer.py
+++ b/tests/kafkatest/services/console_consumer.py
@@ -59,7 +59,7 @@ class ConsoleConsumer(KafkaPathResolverMixin, JmxMixin,
BackgroundThreadService)
"collect_default": False}
}
- def __init__(self, context, num_nodes, kafka, topic,
group_id="test-consumer-group", new_consumer=True,
+ def __init__(self, context, num_nodes, kafka, topic,
group_id="test-consumer-group",
message_validator=None, from_beginning=True,
consumer_timeout_ms=None, version=DEV_BRANCH,
client_id="console-consumer", print_key=False,
jmx_object_names=None, jmx_attributes=None,
enable_systest_events=False, stop_timeout_sec=35,
print_timestamp=False, print_partition=False,
@@ -72,7 +72,6 @@ class ConsoleConsumer(KafkaPathResolverMixin, JmxMixin,
BackgroundThreadService)
num_nodes: number of nodes to use (this should be
1)
kafka: kafka service
topic: consume from this topic
- new_consumer: use new Kafka consumer if True
message_validator: function which returns message or None
from_beginning: consume from beginning if True, else
from the end
consumer_timeout_ms: corresponds to consumer.timeout.ms.
consumer process ends if time between
@@ -96,7 +95,6 @@ class ConsoleConsumer(KafkaPathResolverMixin, JmxMixin,
BackgroundThreadService)
root=ConsoleConsumer.PERSISTENT_ROOT)
BackgroundThreadService.__init__(self, context, num_nodes)
self.kafka = kafka
- self.new_consumer = new_consumer
self.group_id = group_id
self.args = {
'topic': topic,
@@ -145,8 +143,6 @@ class ConsoleConsumer(KafkaPathResolverMixin, JmxMixin,
BackgroundThreadService)
"""Return the start command appropriate for the given node."""
args = self.args.copy()
args['broker_list'] =
self.kafka.bootstrap_servers(self.security_config.security_protocol)
- if not self.new_consumer:
- args['zk_connect'] = self.kafka.zk_connect_setting()
args['stdout'] = ConsoleConsumer.STDOUT_CAPTURE
args['stderr'] = ConsoleConsumer.STDERR_CAPTURE
args['log_dir'] = ConsoleConsumer.LOG_DIR
diff --git a/tests/kafkatest/services/kafka/kafka.py
b/tests/kafkatest/services/kafka/kafka.py
index 1d9444da204..07f5d44ef2c 100644
--- a/tests/kafkatest/services/kafka/kafka.py
+++ b/tests/kafkatest/services/kafka/kafka.py
@@ -881,11 +881,11 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin,
Service):
kafka_storage_script = self.path.script("kafka-storage.sh", node)
cmd = "%s format --ignore-formatted --config %s --cluster-id %s" %
(kafka_storage_script, KafkaService.CONFIG_FILE, config_property.CLUSTER_ID)
if self.dynamicRaftQuorum:
- cmd += " --feature kraft.version=1"
if self.node_quorum_info.has_controller_role:
if self.standalone_controller_bootstrapped:
cmd += " --no-initial-controllers"
else:
+ cmd += " --feature kraft.version=1"
cmd += " --standalone"
self.standalone_controller_bootstrapped = True
self.logger.info("Running log directory format command...\n%s" %
cmd)
diff --git a/tests/kafkatest/tests/core/compatibility_test_new_broker_test.py
b/tests/kafkatest/tests/core/compatibility_test_new_broker_test.py
index e25080fa61b..ec4a878f032 100644
--- a/tests/kafkatest/tests/core/compatibility_test_new_broker_test.py
+++ b/tests/kafkatest/tests/core/compatibility_test_new_broker_test.py
@@ -66,9 +66,7 @@ class
ClientCompatibilityTestNewBroker(ProduceConsumeValidateTest):
@matrix(producer_version=[str(LATEST_3_8)],
consumer_version=[str(LATEST_3_8)], compression_types=[["none"]],
timestamp_type=[str("CreateTime")], metadata_quorum=quorum.all_non_upgrade)
@matrix(producer_version=[str(LATEST_3_9)],
consumer_version=[str(LATEST_3_9)], compression_types=[["none"]],
timestamp_type=[str("CreateTime")], metadata_quorum=quorum.all_non_upgrade)
@matrix(producer_version=[str(LATEST_2_1)],
consumer_version=[str(LATEST_2_1)], compression_types=[["zstd"]],
timestamp_type=[str("CreateTime")], metadata_quorum=quorum.all_non_upgrade)
- def test_compatibility(self, producer_version, consumer_version,
compression_types, new_consumer=True, timestamp_type=None,
metadata_quorum=quorum.zk):
- if not new_consumer and metadata_quorum != quorum.zk:
- raise Exception("ZooKeeper-based consumers are not supported when
using a KRaft metadata quorum")
+ def test_compatibility(self, producer_version, consumer_version,
compression_types, timestamp_type=None, metadata_quorum=quorum.zk):
self.kafka = KafkaService(self.test_context, num_nodes=3, zk=self.zk,
version=DEV_BRANCH, topics={self.topic: {
"partitions": 3,
"replication-factor": 3,
@@ -86,7 +84,7 @@ class
ClientCompatibilityTestNewBroker(ProduceConsumeValidateTest):
version=KafkaVersion(producer_version))
self.consumer = ConsoleConsumer(self.test_context, self.num_consumers,
self.kafka,
- self.topic, consumer_timeout_ms=30000,
new_consumer=new_consumer,
+ self.topic, consumer_timeout_ms=30000,
message_validator=is_int,
version=KafkaVersion(consumer_version))
self.run_produce_consume_validate(lambda: wait_until(
diff --git a/tests/kafkatest/tests/core/kraft_upgrade_test.py
b/tests/kafkatest/tests/core/kraft_upgrade_test.py
index b9ef1617f63..dc0dc261d1c 100644
--- a/tests/kafkatest/tests/core/kraft_upgrade_test.py
+++ b/tests/kafkatest/tests/core/kraft_upgrade_test.py
@@ -100,7 +100,7 @@ class TestKRaftUpgrade(ProduceConsumeValidateTest):
compression_types=["none"],
version=KafkaVersion(from_kafka_version))
self.consumer = ConsoleConsumer(self.test_context, self.num_consumers,
self.kafka,
- self.topic, new_consumer=True,
consumer_timeout_ms=30000,
+ self.topic, consumer_timeout_ms=30000,
message_validator=is_int,
version=KafkaVersion(from_kafka_version))
self.run_produce_consume_validate(core_test_action=lambda:
self.perform_version_change(from_kafka_version))
cluster_id = self.kafka.cluster_id()
diff --git a/tests/kafkatest/tests/core/quorum_reconfiguration_test.py
b/tests/kafkatest/tests/core/quorum_reconfiguration_test.py
index 80cf535021c..10aa1d6a792 100644
--- a/tests/kafkatest/tests/core/quorum_reconfiguration_test.py
+++ b/tests/kafkatest/tests/core/quorum_reconfiguration_test.py
@@ -120,7 +120,7 @@ class TestQuorumReconfiguration(ProduceConsumeValidateTest):
message_validator=is_int,
compression_types=["none"],
version=DEV_BRANCH,
offline_nodes=[inactive_controller])
self.consumer = ConsoleConsumer(self.test_context, self.num_consumers,
self.kafka,
- self.topic, new_consumer=True,
consumer_timeout_ms=30000,
+ self.topic, consumer_timeout_ms=30000,
message_validator=is_int,
version=DEV_BRANCH)
# Perform reconfigurations
self.run_produce_consume_validate(
@@ -161,7 +161,7 @@ class TestQuorumReconfiguration(ProduceConsumeValidateTest):
message_validator=is_int,
compression_types=["none"],
version=DEV_BRANCH)
self.consumer = ConsoleConsumer(self.test_context, self.num_consumers,
self.kafka,
- self.topic, new_consumer=True,
consumer_timeout_ms=30000,
+ self.topic, consumer_timeout_ms=30000,
message_validator=is_int,
version=DEV_BRANCH)
# Perform reconfigurations
self.run_produce_consume_validate(