This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 0322764ab27 KAFKA-17460 Remove downgrade_test.py (#18038)
0322764ab27 is described below
commit 0322764ab279cb4c6761343c6917339e6d20f2e9
Author: mingdaoy <[email protected]>
AuthorDate: Thu Dec 5 07:07:01 2024 +0800
KAFKA-17460 Remove downgrade_test.py (#18038)
Reviewers: Chia-Ping Tsai <[email protected]>
---
tests/kafkatest/tests/core/downgrade_test.py | 170 ---------------------------
1 file changed, 170 deletions(-)
diff --git a/tests/kafkatest/tests/core/downgrade_test.py
b/tests/kafkatest/tests/core/downgrade_test.py
deleted file mode 100644
index 44134b05a8f..00000000000
--- a/tests/kafkatest/tests/core/downgrade_test.py
+++ /dev/null
@@ -1,170 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from ducktape.mark import parametrize, matrix
-from ducktape.mark.resource import cluster
-from ducktape.utils.util import wait_until
-
-from kafkatest.services.kafka import config_property
-from kafkatest.tests.end_to_end import EndToEndTest
-from kafkatest.version import LATEST_2_4, LATEST_2_5, \
- LATEST_2_6, LATEST_2_7, LATEST_2_8, LATEST_3_0, LATEST_3_1, LATEST_3_2,
LATEST_3_3, LATEST_3_4, LATEST_3_5, \
- LATEST_3_6, LATEST_3_7, LATEST_3_8, LATEST_3_9, DEV_BRANCH, KafkaVersion
-
-class TestDowngrade(EndToEndTest):
- PARTITIONS = 3
- REPLICATION_FACTOR = 3
-
- TOPIC_CONFIG = {
- "partitions": PARTITIONS,
- "replication-factor": REPLICATION_FACTOR,
- "configs": {"min.insync.replicas": 2}
- }
-
- def __init__(self, test_context):
- super(TestDowngrade, self).__init__(test_context=test_context,
topic_config=self.TOPIC_CONFIG)
-
- def upgrade_from(self, kafka_version):
- for node in self.kafka.nodes:
- self.kafka.stop_node(node)
- node.version = DEV_BRANCH
- node.config[config_property.INTER_BROKER_PROTOCOL_VERSION] =
str(kafka_version)
- node.config[config_property.MESSAGE_FORMAT_VERSION] =
str(kafka_version)
- self.kafka.start_node(node)
- self.wait_until_rejoin()
-
- def downgrade_to(self, kafka_version):
- for node in self.kafka.nodes:
- self.kafka.stop_node(node)
- node.version = kafka_version
- del node.config[config_property.INTER_BROKER_PROTOCOL_VERSION]
- del node.config[config_property.MESSAGE_FORMAT_VERSION]
- self.kafka.start_node(node)
- self.wait_until_rejoin()
-
- def setup_services(self, kafka_version, compression_types,
security_protocol, static_membership):
- self.create_zookeeper_if_necessary()
- self.zk.start()
-
- self.create_kafka(num_nodes=3,
- security_protocol=security_protocol,
- interbroker_security_protocol=security_protocol,
- version=kafka_version)
- self.kafka.start()
-
- self.create_producer(log_level="DEBUG",
- compression_types=compression_types,
- version=kafka_version)
- self.producer.start()
-
- self.create_consumer(log_level="DEBUG",
- version=kafka_version,
- static_membership=static_membership)
-
- self.consumer.start()
-
- def wait_until_rejoin(self):
- for partition in range(0, self.PARTITIONS):
- wait_until(lambda: len(self.kafka.isr_idx_list(self.topic,
partition)) == self.REPLICATION_FACTOR,
- timeout_sec=60, backoff_sec=1, err_msg="Replicas did not
rejoin the ISR in a reasonable amount of time")
-
- @cluster(num_nodes=7)
- @parametrize(version=str(LATEST_3_9), compression_types=["snappy"])
- @parametrize(version=str(LATEST_3_9), compression_types=["zstd"],
security_protocol="SASL_SSL")
- @matrix(version=[str(LATEST_3_9)], compression_types=[["none"]],
static_membership=[False, True])
- @parametrize(version=str(LATEST_3_8), compression_types=["snappy"])
- @parametrize(version=str(LATEST_3_8), compression_types=["zstd"],
security_protocol="SASL_SSL")
- @matrix(version=[str(LATEST_3_8)], compression_types=[["none"]],
static_membership=[False, True])
- @parametrize(version=str(LATEST_3_7), compression_types=["snappy"])
- @parametrize(version=str(LATEST_3_7), compression_types=["zstd"],
security_protocol="SASL_SSL")
- @matrix(version=[str(LATEST_3_7)], compression_types=[["none"]],
static_membership=[False, True])
- @parametrize(version=str(LATEST_3_6), compression_types=["snappy"])
- @parametrize(version=str(LATEST_3_6), compression_types=["zstd"],
security_protocol="SASL_SSL")
- @matrix(version=[str(LATEST_3_6)], compression_types=[["none"]],
static_membership=[False, True])
- @parametrize(version=str(LATEST_3_5), compression_types=["snappy"])
- @parametrize(version=str(LATEST_3_5), compression_types=["zstd"],
security_protocol="SASL_SSL")
- @matrix(version=[str(LATEST_3_5)], compression_types=[["none"]],
static_membership=[False, True])
- @parametrize(version=str(LATEST_3_4), compression_types=["snappy"])
- @parametrize(version=str(LATEST_3_4), compression_types=["zstd"],
security_protocol="SASL_SSL")
- @matrix(version=[str(LATEST_3_4)], compression_types=[["none"]],
static_membership=[False, True])
- @parametrize(version=str(LATEST_3_3), compression_types=["snappy"])
- @parametrize(version=str(LATEST_3_3), compression_types=["zstd"],
security_protocol="SASL_SSL")
- @matrix(version=[str(LATEST_3_3)], compression_types=[["none"]],
static_membership=[False, True])
- @parametrize(version=str(LATEST_3_2), compression_types=["snappy"])
- @parametrize(version=str(LATEST_3_2), compression_types=["zstd"],
security_protocol="SASL_SSL")
- @matrix(version=[str(LATEST_3_2)], compression_types=[["none"]],
static_membership=[False, True])
- @parametrize(version=str(LATEST_3_1), compression_types=["snappy"])
- @parametrize(version=str(LATEST_3_1), compression_types=["zstd"],
security_protocol="SASL_SSL")
- @matrix(version=[str(LATEST_3_1)], compression_types=[["none"]],
static_membership=[False, True])
- @parametrize(version=str(LATEST_3_0), compression_types=["snappy"])
- @parametrize(version=str(LATEST_3_0), compression_types=["zstd"],
security_protocol="SASL_SSL")
- @matrix(version=[str(LATEST_3_0)], compression_types=[["none"]],
static_membership=[False, True])
- @parametrize(version=str(LATEST_2_8), compression_types=["snappy"])
- @parametrize(version=str(LATEST_2_8), compression_types=["zstd"],
security_protocol="SASL_SSL")
- @matrix(version=[str(LATEST_2_8)], compression_types=[["none"]],
static_membership=[False, True])
- @parametrize(version=str(LATEST_2_7), compression_types=["lz4"])
- @parametrize(version=str(LATEST_2_7), compression_types=["zstd"],
security_protocol="SASL_SSL")
- @matrix(version=[str(LATEST_2_7)], compression_types=[["none"]],
static_membership=[False, True])
- @parametrize(version=str(LATEST_2_6), compression_types=["lz4"])
- @parametrize(version=str(LATEST_2_6), compression_types=["zstd"],
security_protocol="SASL_SSL")
- @matrix(version=[str(LATEST_2_6)], compression_types=[["none"]],
static_membership=[False, True])
- @matrix(version=[str(LATEST_2_5)], compression_types=[["none"]],
static_membership=[False, True])
- @parametrize(version=str(LATEST_2_5), compression_types=["zstd"],
security_protocol="SASL_SSL")
- # static membership was introduced with a buggy verifiable console
consumer which
- # required static membership to be enabled
- @parametrize(version=str(LATEST_2_4), compression_types=["none"],
static_membership=True)
- @parametrize(version=str(LATEST_2_4), compression_types=["zstd"],
security_protocol="SASL_SSL", static_membership=True)
- def test_upgrade_and_downgrade(self, version, compression_types,
security_protocol="PLAINTEXT",
- static_membership=False):
- """Test upgrade and downgrade of Kafka cluster from old versions to
the current version
-
- `version` is the Kafka version to upgrade from and downgrade back to
-
- Downgrades are supported to any version which is at or above the
current
- `inter.broker.protocol.version` (IBP). For example, if a user upgrades
from 1.1 to 2.3,
- but they leave the IBP set to 1.1, then downgrading to any version at
1.1 or higher is
- supported.
-
- This test case verifies that producers and consumers continue working
during
- the course of an upgrade and downgrade.
-
- - Start 3 node broker cluster on version 'kafka_version'
- - Start producer and consumer in the background
- - Roll the cluster to upgrade to the current version with IBP set to
'kafka_version'
- - Roll the cluster to downgrade back to 'kafka_version'
- - Finally, validate that every message acked by the producer was
consumed by the consumer
- """
- kafka_version = KafkaVersion(version)
-
- self.setup_services(kafka_version, compression_types,
security_protocol, static_membership)
- self.await_startup()
-
- start_topic_id = self.kafka.topic_id(self.topic)
-
- self.logger.info("First pass bounce - rolling upgrade")
- self.upgrade_from(kafka_version)
- self.await_consumed_records(min_records=5000)
-
- upgrade_topic_id = self.kafka.topic_id(self.topic)
- assert start_topic_id == upgrade_topic_id
-
- self.logger.info("Second pass bounce - rolling downgrade")
- num_records_acked = self.producer.num_acked
- self.downgrade_to(kafka_version)
- self.run_validation(min_records=num_records_acked+5000)
-
- downgrade_topic_id = self.kafka.topic_id(self.topic)
- assert upgrade_topic_id == downgrade_topic_id
- assert self.kafka.check_protocol_errors(self)