This is an automated email from the ASF dual-hosted git repository.
cmccabe pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.0 by this push:
new 3f74712 KAFKA-13015: Ducktape System Tests for Metadata Snapshots
(#11053)
3f74712 is described below
commit 3f747129f46af44dc078c26bb6d555ac132c21c3
Author: Niket <[email protected]>
AuthorDate: Fri Jul 23 16:28:21 2021 -0700
KAFKA-13015: Ducktape System Tests for Metadata Snapshots (#11053)
This PR implements system tests in ducktape to test the ability of brokers
and controllers to generate
and consume snapshots and catch up with the metadata log.
Reviewers: Colin P. McCabe <[email protected]>, José Armando García Sancio
<[email protected]>
---
tests/kafkatest/services/kafka/config.py | 5 +
tests/kafkatest/services/kafka/config_property.py | 6 +
tests/kafkatest/services/kafka/kafka.py | 6 +-
tests/kafkatest/tests/core/snapshot_test.py | 251 ++++++++++++++++++++++
4 files changed, 267 insertions(+), 1 deletion(-)
diff --git a/tests/kafkatest/services/kafka/config.py
b/tests/kafkatest/services/kafka/config.py
index d440fcf..da5b4a2 100644
--- a/tests/kafkatest/services/kafka/config.py
+++ b/tests/kafkatest/services/kafka/config.py
@@ -24,6 +24,11 @@ class KafkaConfig(dict):
DEFAULTS = {
config_property.SOCKET_RECEIVE_BUFFER_BYTES: 65536,
config_property.LOG_DIRS:
"/mnt/kafka/kafka-data-logs-1,/mnt/kafka/kafka-data-logs-2",
+ config_property.METADATA_LOG_DIR: "/mnt/kafka/kafka-metadata-logs",
+ config_property.METADATA_LOG_SEGMENT_BYTES: str(9*1024*1024), # 9 MB
+ config_property.METADATA_LOG_BYTES_BETWEEN_SNAPSHOTS:
str(10*1024*1024), # 10 MB
+ config_property.METADATA_LOG_RETENTION_BYTES: str(10*1024*1024), # 10
MB
+ config_property.METADATA_LOG_SEGMENT_MS: str(1*60*1000) # one minute
}
def __init__(self, **kwargs):
diff --git a/tests/kafkatest/services/kafka/config_property.py
b/tests/kafkatest/services/kafka/config_property.py
index 42243cf..de11422 100644
--- a/tests/kafkatest/services/kafka/config_property.py
+++ b/tests/kafkatest/services/kafka/config_property.py
@@ -41,6 +41,12 @@ LOG_RETENTION_CHECK_INTERVAL_MS =
"log.retention.check.interval.ms"
LOG_RETENTION_MS = "log.retention.ms"
LOG_CLEANER_ENABLE = "log.cleaner.enable"
+METADATA_LOG_DIR = "metadata.log.dir"
+METADATA_LOG_SEGMENT_BYTES = "metadata.log.segment.bytes"
+METADATA_LOG_RETENTION_BYTES = "metadata.max.retention.bytes"
+METADATA_LOG_SEGMENT_MS = "metadata.log.segment.ms"
+METADATA_LOG_BYTES_BETWEEN_SNAPSHOTS =
"metadata.log.max.record.bytes.between.snapshots"
+
AUTO_CREATE_TOPICS_ENABLE = "auto.create.topics.enable"
ZOOKEEPER_CONNECT = "zookeeper.connect"
diff --git a/tests/kafkatest/services/kafka/kafka.py
b/tests/kafkatest/services/kafka/kafka.py
index f017d87..9901362 100644
--- a/tests/kafkatest/services/kafka/kafka.py
+++ b/tests/kafkatest/services/kafka/kafka.py
@@ -154,6 +154,9 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin,
Service):
DATA_LOG_DIR_1 = "%s-1" % (DATA_LOG_DIR_PREFIX)
DATA_LOG_DIR_2 = "%s-2" % (DATA_LOG_DIR_PREFIX)
CONFIG_FILE = os.path.join(PERSISTENT_ROOT, "kafka.properties")
+ METADATA_LOG_DIR = os.path.join (PERSISTENT_ROOT, "kafka-metadata-logs")
+ METADATA_SNAPSHOT_SEARCH_STR = "%s/__cluster_metadata-0/*.checkpoint" %
METADATA_LOG_DIR
+ METADATA_FIRST_LOG = "%s/__cluster_metadata-0/00000000000000000000.log" %
METADATA_LOG_DIR
# Kafka Authorizer
ACL_AUTHORIZER = "kafka.security.authorizer.AclAuthorizer"
HEAP_DUMP_FILE = os.path.join(PERSISTENT_ROOT, "kafka_heap_dump.bin")
@@ -308,7 +311,8 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin,
Service):
jmx_attributes=jmx_attributes,
listener_security_config=listener_security_config,
extra_kafka_opts=extra_kafka_opts, tls_version=tls_version,
- remote_kafka=self,
allow_zk_with_kraft=self.allow_zk_with_kraft
+ remote_kafka=self,
allow_zk_with_kraft=self.allow_zk_with_kraft,
+ server_prop_overrides=server_prop_overrides
)
self.controller_quorum = self.remote_controller_quorum
diff --git a/tests/kafkatest/tests/core/snapshot_test.py
b/tests/kafkatest/tests/core/snapshot_test.py
new file mode 100644
index 0000000..1218cff
--- /dev/null
+++ b/tests/kafkatest/tests/core/snapshot_test.py
@@ -0,0 +1,251 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.mark import parametrize, matrix
+from ducktape.mark.resource import cluster
+from ducktape.utils.util import wait_until
+
+from kafkatest.services.kafka import quorum
+from kafkatest.services.console_consumer import ConsoleConsumer
+from kafkatest.services.kafka import KafkaService
+from kafkatest.services.kafka import config_property
+from kafkatest.services.verifiable_producer import VerifiableProducer
+from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
+from kafkatest.utils import is_int
+import random
+
class TestSnapshots(ProduceConsumeValidateTest):
    """System tests verifying that KRaft brokers and controllers can generate
    metadata snapshots and use them to catch up with the metadata log."""

    TOPIC_NAME_PREFIX = "test_topic_"

    def __init__(self, test_context):
        super(TestSnapshots, self).__init__(test_context=test_context)
        self.topics_created = 0
        self.topic = "test_topic"
        self.partitions = 3
        self.replication_factor = 3
        self.num_nodes = 3

        # Producer and consumer
        self.producer_throughput = 1000
        self.num_producers = 1
        self.num_consumers = 1

        protocol = 'PLAINTEXT'
        # Custom config so that a snapshot is generated deterministically:
        # short segment roll time, small retention and a low byte threshold
        # between snapshots.
        self.kafka = KafkaService(
            self.test_context, self.num_nodes, zk=None,
            topics={self.topic: {"partitions": self.partitions,
                                 "replication-factor": self.replication_factor,
                                 'configs': {"min.insync.replicas": 2}}},
            server_prop_overrides=[
                [config_property.METADATA_LOG_DIR, KafkaService.METADATA_LOG_DIR],
                [config_property.METADATA_LOG_SEGMENT_MS, "10000"],
                [config_property.METADATA_LOG_RETENTION_BYTES, "2048"],
                [config_property.METADATA_LOG_BYTES_BETWEEN_SNAPSHOTS, "2048"]
            ])

        self.kafka.interbroker_security_protocol = protocol
        self.kafka.security_protocol = protocol
+
+ def setUp(self):
+ # Start the cluster and ensure that a snapshot is generated
+ self.logger.info("Starting the cluster and running until snapshot
creation")
+
+ assert quorum.for_test(self.test_context) in quorum.all_kraft, \
+ "Snapshot test should be run Kraft Modes only"
+
+ self.kafka.start()
+
+ topic_count = 10
+ self.topics_created += self.create_n_topics(topic_count)
+
+ if self.kafka.remote_controller_quorum:
+ self.controller_nodes = self.kafka.remote_controller_quorum.nodes
+ else:
+ self.controller_nodes =
self.kafka.nodes[:self.kafka.num_nodes_controller_role]
+
+ # Waiting for snapshot creation and first log segment
+ # cleanup on all controller nodes
+ for node in self.controller_nodes:
+ self.logger.debug("Waiting for snapshot on: %s" %
self.kafka.who_am_i(node))
+ self.wait_for_log_segment_delete(node)
+ self.wait_for_snapshot(node)
+ self.logger.debug("Verified Snapshots exist on controller nodes")
+
+ def create_n_topics(self, topic_count):
+ for i in range(self.topics_created, topic_count):
+ topic = "%s%d" % (TestSnapshots.TOPIC_NAME_PREFIX, i)
+ self.logger.debug("Creating topic %s" % topic)
+ topic_cfg = {
+ "topic": topic,
+ "partitions": self.partitions,
+ "replication-factor": self.replication_factor,
+ "configs": {"min.insync.replicas": 2}
+ }
+ self.kafka.create_topic(topic_cfg)
+ self.logger.debug("Created %d more topics" % topic_count)
+ return topic_count
+
+ def wait_for_log_segment_delete(self, node):
+ file_path = self.kafka.METADATA_FIRST_LOG
+ # Wait until the first log segment in metadata log is marked for
deletion
+ wait_until(lambda: not self.file_exists(node, file_path),
+ timeout_sec=100,
+ backoff_sec=1, err_msg="Not able to verify cleanup of log
file %s in a reasonable amount of time" % file_path)
+
+ def wait_for_snapshot(self, node):
+ # Wait for a snapshot file to show up
+ file_path = self.kafka.METADATA_SNAPSHOT_SEARCH_STR
+ wait_until(lambda: self.file_exists(node, file_path),
+ timeout_sec=100,
+ backoff_sec=1, err_msg="Not able to verify snapshot
existence in a reasonable amount of time")
+
+ def file_exists(self, node, file_path):
+ # Check if the first log segment is cleaned up
+ self.logger.debug("Checking if file %s exists" % file_path)
+ cmd = "ls %s" % file_path
+ files = node.account.ssh_output(cmd, allow_fail=True,
combine_stderr=False)
+
+ if len(files) is 0:
+ self.logger.debug("File %s does not exist" % file_path)
+ return False
+ else:
+ self.logger.debug("File %s was found" % file_path)
+ return True
+
+ def validate_success(self, topic = None):
+ if topic is None:
+ # Create a new topic
+ topic = "%s%d" % (TestSnapshots.TOPIC_NAME_PREFIX,
self.topics_created)
+ self.topics_created += self.create_n_topics(topic_count=1)
+
+ # Produce to the newly created topic to ensure broker has caught up
+ self.producer = VerifiableProducer(self.test_context,
self.num_producers, self.kafka,
+ topic,
throughput=self.producer_throughput,
+ message_validator=is_int)
+
+ self.consumer = ConsoleConsumer(self.test_context, self.num_consumers,
self.kafka,
+ topic, consumer_timeout_ms=30000,
+ message_validator=is_int)
+ self.start_producer_and_consumer()
+ self.stop_producer_and_consumer()
+ self.validate()
+
+ @cluster(num_nodes=9)
+ @matrix(metadata_quorum=quorum.all_kraft)
+ def test_broker(self, metadata_quorum=quorum.colocated_kraft):
+ """ Test the ability of a broker to consume metadata snapshots
+ and to recover the cluster metadata state using them
+
+ The test ensures that that there is atleast one snapshot created on
+ the controller quorum during the setup phase and that at least the
first
+ log segment in the metadata log has been marked for deletion, thereby
ensuring
+ that any observer of the log needs to always load a snapshot to catch
+ up to the current metadata state.
+
+ Each scenario is a progression over the previous one.
+ The scenarios build on top of each other by:
+ * Loading a snapshot
+ * Loading and snapshot and some delta records
+ * Loading a snapshot and delta and ensuring that the most recent
metadata state
+ has been caught up.
+
+ Even though a subsequent scenario covers the previous one, they are all
+ left in the test to make debugging a failure of the test easier
+ e.g. if the first scenario passes and the second fails, it hints
towards
+ a problem with the application of delta records while catching up
+ """
+
+ # Scenario -- Re-init broker after cleaning up all persistent state
+ node = random.choice(self.kafka.nodes)
+ self.logger.debug("Scenario: kill-clean-start on broker node %s",
self.kafka.who_am_i(node))
+ self.kafka.clean_node(node)
+ self.kafka.start_node(node)
+
+ # Scenario -- Re-init broker after cleaning up all persistent state
+ # Create some metadata changes for the broker to consume as well.
+ node = random.choice(self.kafka.nodes)
+ self.logger.debug("Scenario: kill-clean-create_topics-start on broker
node %s", self.kafka.who_am_i(node))
+ self.kafka.clean_node(node)
+ # Now modify the cluster to create more metadata changes
+ self.topics_created += self.create_n_topics(topic_count=10)
+ self.kafka.start_node(node)
+
+ # Scenario -- Re-init broker after cleaning up all persistent state
+ # And ensure that the broker has replicated the metadata log
+ node = random.choice(self.kafka.nodes)
+ self.logger.debug("Scenario: kill-clean-start-verify-produce on broker
node %s", self.kafka.who_am_i(node))
+ self.kafka.clean_node(node)
+ self.kafka.start_node(node)
+ # Create a topic where the affected broker must be the leader
+ broker_topic = "%s%d" % (TestSnapshots.TOPIC_NAME_PREFIX,
self.topics_created)
+ self.topics_created += 1
+ self.logger.debug("Creating topic %s" % broker_topic)
+ topic_cfg = {
+ "topic": broker_topic,
+ "replica-assignment": self.kafka.idx(node),
+ "configs": {"min.insync.replicas": 1}
+ }
+ self.kafka.create_topic(topic_cfg)
+
+ # Produce to the newly created topic and make sure it works.
+ self.validate_success(broker_topic)
+
+ @cluster(num_nodes=9)
+ @matrix(metadata_quorum=quorum.all_kraft)
+ def test_controller(self, metadata_quorum=quorum.colocated_kraft):
+ """ Test the ability of controllers to consume metadata snapshots
+ and to recover the cluster metadata state using them
+
+ The test ensures that that there is atleast one snapshot created on
+ the controller quorum during the setup phase and that at least the
first
+ log segment in the metadata log has been marked for deletion, thereby
ensuring
+ that any observer of the log needs to always load a snapshot to catch
+ up to the current metadata state.
+
+ Each scenario is a progression over the previous one.
+ The scenarios build on top of each other by:
+ * Loading a snapshot
+ * Loading and snapshot and some delta records
+ * Loading a snapshot and delta and ensuring that the most recent
metadata state
+ has been caught up.
+
+ Even though a subsequent scenario covers the previous one, they are all
+ left in the test to make debugging a failure of the test easier
+ e.g. if the first scenario passes and the second fails, it hints
towards
+ a problem with the application of delta records while catching up
+ """
+
+ # Scenario -- Re-init controllers with a clean kafka dir
+ self.logger.debug("Scenario: kill-clean-start controller node")
+ for node in self.controller_nodes:
+ self.logger.debug("Restarting node: %s",
self.kafka.controller_quorum.who_am_i(node))
+ self.kafka.controller_quorum.clean_node(node)
+ self.kafka.controller_quorum.start_node(node)
+
+ # Scenario -- Re-init controllers with a clean kafka dir and
+ # make metadata changes while they are down.
+ # This will force the entire quorum to load from snapshots
+ # and verify the quorum's ability to catch up to the latest metadata
+ self.logger.debug("Scenario: kill-clean-create_topics-start on
controller node %s")
+ for node in self.controller_nodes:
+ self.logger.debug("Restarting node: %s",
self.kafka.controller_quorum.who_am_i(node))
+ self.kafka.controller_quorum.clean_node(node)
+ # Now modify the cluster to create more metadata changes
+ self.topics_created += self.create_n_topics(topic_count=5)
+ self.kafka.controller_quorum.start_node(node)
+
+ # Produce to a newly created topic and make sure it works.
+ self.validate_success()