This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.0 by this push:
     new 3f74712  KAFKA-13015: Ducktape System Tests for Metadata Snapshots 
(#11053)
3f74712 is described below

commit 3f747129f46af44dc078c26bb6d555ac132c21c3
Author: Niket <[email protected]>
AuthorDate: Fri Jul 23 16:28:21 2021 -0700

    KAFKA-13015: Ducktape System Tests for Metadata Snapshots (#11053)
    
    This PR implements system tests in ducktape to test the ability of brokers 
and controllers to generate
    and consume snapshots and catch up with the metadata log.
    
    Reviewers: Colin P. McCabe <[email protected]>, José Armando García Sancio 
<[email protected]>
---
 tests/kafkatest/services/kafka/config.py          |   5 +
 tests/kafkatest/services/kafka/config_property.py |   6 +
 tests/kafkatest/services/kafka/kafka.py           |   6 +-
 tests/kafkatest/tests/core/snapshot_test.py       | 251 ++++++++++++++++++++++
 4 files changed, 267 insertions(+), 1 deletion(-)

diff --git a/tests/kafkatest/services/kafka/config.py 
b/tests/kafkatest/services/kafka/config.py
index d440fcf..da5b4a2 100644
--- a/tests/kafkatest/services/kafka/config.py
+++ b/tests/kafkatest/services/kafka/config.py
@@ -24,6 +24,11 @@ class KafkaConfig(dict):
     DEFAULTS = {
         config_property.SOCKET_RECEIVE_BUFFER_BYTES: 65536,
         config_property.LOG_DIRS: 
"/mnt/kafka/kafka-data-logs-1,/mnt/kafka/kafka-data-logs-2",
+        config_property.METADATA_LOG_DIR: "/mnt/kafka/kafka-metadata-logs",
+        config_property.METADATA_LOG_SEGMENT_BYTES: str(9*1024*1024), # 9 MB
+        config_property.METADATA_LOG_BYTES_BETWEEN_SNAPSHOTS: 
str(10*1024*1024), # 10 MB
+        config_property.METADATA_LOG_RETENTION_BYTES: str(10*1024*1024), # 10 
MB
+        config_property.METADATA_LOG_SEGMENT_MS: str(1*60*1000) # one minute
     }
 
     def __init__(self, **kwargs):
diff --git a/tests/kafkatest/services/kafka/config_property.py 
b/tests/kafkatest/services/kafka/config_property.py
index 42243cf..de11422 100644
--- a/tests/kafkatest/services/kafka/config_property.py
+++ b/tests/kafkatest/services/kafka/config_property.py
@@ -41,6 +41,12 @@ LOG_RETENTION_CHECK_INTERVAL_MS = 
"log.retention.check.interval.ms"
 LOG_RETENTION_MS = "log.retention.ms"
 LOG_CLEANER_ENABLE = "log.cleaner.enable"
 
# Broker/controller properties controlling the KRaft cluster metadata log.
# Small segment/retention values let system tests force frequent rolls and
# snapshot generation deterministically.
METADATA_LOG_DIR = "metadata.log.dir"
METADATA_LOG_SEGMENT_BYTES = "metadata.log.segment.bytes"
METADATA_LOG_SEGMENT_MS = "metadata.log.segment.ms"
METADATA_LOG_RETENTION_BYTES = "metadata.max.retention.bytes"
METADATA_LOG_BYTES_BETWEEN_SNAPSHOTS = "metadata.log.max.record.bytes.between.snapshots"
+
 AUTO_CREATE_TOPICS_ENABLE = "auto.create.topics.enable"
 
 ZOOKEEPER_CONNECT = "zookeeper.connect"
diff --git a/tests/kafkatest/services/kafka/kafka.py 
b/tests/kafkatest/services/kafka/kafka.py
index f017d87..9901362 100644
--- a/tests/kafkatest/services/kafka/kafka.py
+++ b/tests/kafkatest/services/kafka/kafka.py
@@ -154,6 +154,9 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, 
Service):
     DATA_LOG_DIR_1 = "%s-1" % (DATA_LOG_DIR_PREFIX)
     DATA_LOG_DIR_2 = "%s-2" % (DATA_LOG_DIR_PREFIX)
     CONFIG_FILE = os.path.join(PERSISTENT_ROOT, "kafka.properties")
+    METADATA_LOG_DIR = os.path.join (PERSISTENT_ROOT, "kafka-metadata-logs")
+    METADATA_SNAPSHOT_SEARCH_STR = "%s/__cluster_metadata-0/*.checkpoint" % 
METADATA_LOG_DIR
+    METADATA_FIRST_LOG = "%s/__cluster_metadata-0/00000000000000000000.log" % 
METADATA_LOG_DIR
     # Kafka Authorizer
     ACL_AUTHORIZER = "kafka.security.authorizer.AclAuthorizer"
     HEAP_DUMP_FILE = os.path.join(PERSISTENT_ROOT, "kafka_heap_dump.bin")
@@ -308,7 +311,8 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, 
Service):
                     jmx_attributes=jmx_attributes,
                     listener_security_config=listener_security_config,
                     extra_kafka_opts=extra_kafka_opts, tls_version=tls_version,
-                    remote_kafka=self, 
allow_zk_with_kraft=self.allow_zk_with_kraft
+                    remote_kafka=self, 
allow_zk_with_kraft=self.allow_zk_with_kraft,
+                    server_prop_overrides=server_prop_overrides
                 )
                 self.controller_quorum = self.remote_controller_quorum
 
diff --git a/tests/kafkatest/tests/core/snapshot_test.py 
b/tests/kafkatest/tests/core/snapshot_test.py
new file mode 100644
index 0000000..1218cff
--- /dev/null
+++ b/tests/kafkatest/tests/core/snapshot_test.py
@@ -0,0 +1,251 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.mark import parametrize, matrix
+from ducktape.mark.resource import cluster
+from ducktape.utils.util import wait_until
+
+from kafkatest.services.kafka import quorum
+from kafkatest.services.console_consumer import ConsoleConsumer
+from kafkatest.services.kafka import KafkaService
+from kafkatest.services.kafka import config_property
+from kafkatest.services.verifiable_producer import VerifiableProducer
+from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
+from kafkatest.utils import is_int
+import random
+
class TestSnapshots(ProduceConsumeValidateTest):
    """Validate that KRaft brokers and controllers can generate metadata
    snapshots, load them, and catch up with the metadata log.

    setUp() configures a tiny metadata log segment/retention size so that
    snapshots are generated deterministically, then waits until every
    controller node has produced a snapshot AND deleted the first metadata
    log segment -- guaranteeing any later observer must load a snapshot.
    """

    TOPIC_NAME_PREFIX = "test_topic_"

    def __init__(self, test_context):
        super(TestSnapshots, self).__init__(test_context=test_context)
        # Running count of topics created so far; used to generate unique
        # sequentially-numbered topic names across the whole test.
        self.topics_created = 0
        self.topic = "test_topic"
        self.partitions = 3
        self.replication_factor = 3
        self.num_nodes = 3

        # Producer and consumer
        self.producer_throughput = 1000
        self.num_producers = 1
        self.num_consumers = 1

        security_protocol = 'PLAINTEXT'
        # Custom config to ensure a snapshot will be generated deterministically:
        # small segment/retention sizes plus a short roll interval force the
        # metadata log to roll, snapshot, and truncate quickly.
        self.kafka = KafkaService(self.test_context, self.num_nodes, zk=None,
                                  topics={self.topic: {"partitions": self.partitions,
                                                       "replication-factor": self.replication_factor,
                                                       'configs': {"min.insync.replicas": 2}}},
                                  server_prop_overrides=[
                                      [config_property.METADATA_LOG_DIR, KafkaService.METADATA_LOG_DIR],
                                      [config_property.METADATA_LOG_SEGMENT_MS, "10000"],
                                      [config_property.METADATA_LOG_RETENTION_BYTES, "2048"],
                                      [config_property.METADATA_LOG_BYTES_BETWEEN_SNAPSHOTS, "2048"]
                                  ])

        self.kafka.interbroker_security_protocol = security_protocol
        self.kafka.security_protocol = security_protocol

    def setUp(self):
        # Start the cluster and ensure that a snapshot is generated
        self.logger.info("Starting the cluster and running until snapshot creation")

        assert quorum.for_test(self.test_context) in quorum.all_kraft, \
                "Snapshot test should be run in KRaft modes only"

        self.kafka.start()

        # Create enough topics to generate metadata records that fill the
        # (deliberately tiny) metadata log and trigger snapshotting.
        topic_count = 10
        self.topics_created += self.create_n_topics(topic_count)

        if self.kafka.remote_controller_quorum:
            self.controller_nodes = self.kafka.remote_controller_quorum.nodes
        else:
            # Colocated quorum: the first num_nodes_controller_role brokers
            # also act as controllers.
            self.controller_nodes = self.kafka.nodes[:self.kafka.num_nodes_controller_role]

        # Waiting for snapshot creation and first log segment
        # cleanup on all controller nodes
        for node in self.controller_nodes:
            self.logger.debug("Waiting for snapshot on: %s" % self.kafka.who_am_i(node))
            self.wait_for_log_segment_delete(node)
            self.wait_for_snapshot(node)
        self.logger.debug("Verified Snapshots exist on controller nodes")

    def create_n_topics(self, topic_count):
        """Create topic_count new topics with sequentially-numbered names.

        Numbering continues from self.topics_created so repeated calls never
        reuse a name. Returns the number of topics created so the caller can
        advance self.topics_created.
        """
        # Bug fix: the original iterated range(self.topics_created, topic_count),
        # which creates NO topics at all once self.topics_created >= topic_count
        # (i.e. on every call after setUp) while still returning topic_count.
        for i in range(self.topics_created, self.topics_created + topic_count):
            topic = "%s%d" % (TestSnapshots.TOPIC_NAME_PREFIX, i)
            self.logger.debug("Creating topic %s" % topic)
            topic_cfg = {
                "topic": topic,
                "partitions": self.partitions,
                "replication-factor": self.replication_factor,
                "configs": {"min.insync.replicas": 2}
            }
            self.kafka.create_topic(topic_cfg)
        self.logger.debug("Created %d more topics" % topic_count)
        return topic_count

    def wait_for_log_segment_delete(self, node):
        """Block until the first metadata log segment disappears from node."""
        file_path = self.kafka.METADATA_FIRST_LOG
        # Wait until the first log segment in metadata log is marked for deletion
        wait_until(lambda: not self.file_exists(node, file_path),
                   timeout_sec=100,
                   backoff_sec=1, err_msg="Not able to verify cleanup of log file %s in a reasonable amount of time" % file_path)

    def wait_for_snapshot(self, node):
        """Block until at least one snapshot (.checkpoint) file shows up on node."""
        # Wait for a snapshot file to show up
        file_path = self.kafka.METADATA_SNAPSHOT_SEARCH_STR
        wait_until(lambda: self.file_exists(node, file_path),
                   timeout_sec=100,
                   backoff_sec=1, err_msg="Not able to verify snapshot existence in a reasonable amount of time")

    def file_exists(self, node, file_path):
        """Return True if file_path (may contain shell globs) matches any file on node."""
        self.logger.debug("Checking if file %s exists" % file_path)
        cmd = "ls %s" % file_path
        files = node.account.ssh_output(cmd, allow_fail=True, combine_stderr=False)

        # Bug fix: the original tested "len(files) is 0" -- an identity
        # comparison with an int literal that is only accidentally correct
        # (and a SyntaxWarning on modern Python). Use equality instead.
        if len(files) == 0:
            self.logger.debug("File %s does not exist" % file_path)
            return False
        else:
            self.logger.debug("File %s was found" % file_path)
            return True

    def validate_success(self, topic=None):
        """Produce to and consume from topic (creating one if None) and validate.

        Producing to a freshly created topic proves the restarted node has
        caught up with the latest metadata state.
        """
        if topic is None:
            # Create a new topic
            topic = "%s%d" % (TestSnapshots.TOPIC_NAME_PREFIX, self.topics_created)
            self.topics_created += self.create_n_topics(topic_count=1)

        # Produce to the newly created topic to ensure broker has caught up
        self.producer = VerifiableProducer(self.test_context, self.num_producers, self.kafka,
                                           topic, throughput=self.producer_throughput,
                                           message_validator=is_int)

        self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka,
                                        topic, consumer_timeout_ms=30000,
                                        message_validator=is_int)
        self.start_producer_and_consumer()
        self.stop_producer_and_consumer()
        self.validate()

    @cluster(num_nodes=9)
    @matrix(metadata_quorum=quorum.all_kraft)
    def test_broker(self, metadata_quorum=quorum.colocated_kraft):
        """ Test the ability of a broker to consume metadata snapshots
        and to recover the cluster metadata state using them

        The test ensures that there is at least one snapshot created on
        the controller quorum during the setup phase and that at least the first
        log segment in the metadata log has been marked for deletion, thereby ensuring
        that any observer of the log needs to always load a snapshot to catch
        up to the current metadata state.

        Each scenario is a progression over the previous one.
        The scenarios build on top of each other by:
        * Loading a snapshot
        * Loading a snapshot and some delta records
        * Loading a snapshot and delta and ensuring that the most recent metadata state
          has been caught up.

        Even though a subsequent scenario covers the previous one, they are all
        left in the test to make debugging a failure of the test easier
        e.g. if the first scenario passes and the second fails, it hints towards
        a problem with the application of delta records while catching up
        """

        # Scenario -- Re-init broker after cleaning up all persistent state
        node = random.choice(self.kafka.nodes)
        self.logger.debug("Scenario: kill-clean-start on broker node %s", self.kafka.who_am_i(node))
        self.kafka.clean_node(node)
        self.kafka.start_node(node)

        # Scenario -- Re-init broker after cleaning up all persistent state
        # Create some metadata changes for the broker to consume as well.
        node = random.choice(self.kafka.nodes)
        self.logger.debug("Scenario: kill-clean-create_topics-start on broker node %s", self.kafka.who_am_i(node))
        self.kafka.clean_node(node)
        # Now modify the cluster to create more metadata changes
        self.topics_created += self.create_n_topics(topic_count=10)
        self.kafka.start_node(node)

        # Scenario -- Re-init broker after cleaning up all persistent state
        # And ensure that the broker has replicated the metadata log
        node = random.choice(self.kafka.nodes)
        self.logger.debug("Scenario: kill-clean-start-verify-produce on broker node %s", self.kafka.who_am_i(node))
        self.kafka.clean_node(node)
        self.kafka.start_node(node)
        # Create a topic where the affected broker must be the leader
        broker_topic = "%s%d" % (TestSnapshots.TOPIC_NAME_PREFIX, self.topics_created)
        self.topics_created += 1
        self.logger.debug("Creating topic %s" % broker_topic)
        topic_cfg = {
            "topic": broker_topic,
            "replica-assignment": self.kafka.idx(node),
            "configs": {"min.insync.replicas": 1}
        }
        self.kafka.create_topic(topic_cfg)

        # Produce to the newly created topic and make sure it works.
        self.validate_success(broker_topic)

    @cluster(num_nodes=9)
    @matrix(metadata_quorum=quorum.all_kraft)
    def test_controller(self, metadata_quorum=quorum.colocated_kraft):
        """ Test the ability of controllers to consume metadata snapshots
        and to recover the cluster metadata state using them

        The test ensures that there is at least one snapshot created on
        the controller quorum during the setup phase and that at least the first
        log segment in the metadata log has been marked for deletion, thereby ensuring
        that any observer of the log needs to always load a snapshot to catch
        up to the current metadata state.

        Each scenario is a progression over the previous one.
        The scenarios build on top of each other by:
        * Loading a snapshot
        * Loading a snapshot and some delta records
        * Loading a snapshot and delta and ensuring that the most recent metadata state
          has been caught up.

        Even though a subsequent scenario covers the previous one, they are all
        left in the test to make debugging a failure of the test easier
        e.g. if the first scenario passes and the second fails, it hints towards
        a problem with the application of delta records while catching up
        """

        # Scenario -- Re-init controllers with a clean kafka dir
        self.logger.debug("Scenario: kill-clean-start controller node")
        for node in self.controller_nodes:
            self.logger.debug("Restarting node: %s", self.kafka.controller_quorum.who_am_i(node))
            self.kafka.controller_quorum.clean_node(node)
            self.kafka.controller_quorum.start_node(node)

        # Scenario -- Re-init controllers with a clean kafka dir and
        # make metadata changes while they are down.
        # This will force the entire quorum to load from snapshots
        # and verify the quorum's ability to catch up to the latest metadata
        # (fixed a stray unfilled "%s" placeholder in the original debug message)
        self.logger.debug("Scenario: kill-clean-create_topics-start on controller nodes")
        for node in self.controller_nodes:
            self.logger.debug("Restarting node: %s", self.kafka.controller_quorum.who_am_i(node))
            self.kafka.controller_quorum.clean_node(node)
            # Now modify the cluster to create more metadata changes
            self.topics_created += self.create_n_topics(topic_count=5)
            self.kafka.controller_quorum.start_node(node)

        # Produce to a newly created topic and make sure it works.
        self.validate_success()

Reply via email to