This is an automated email from the ASF dual-hosted git repository.

timoninmaxim pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new e18237504f9 IGNITE-26372 [ducktests] Add service for Kafka (#12314)
e18237504f9 is described below

commit e18237504f92b79284665a1911364c679479e6ba
Author: Sergey Korotkov <[email protected]>
AuthorDate: Wed Sep 3 16:57:18 2025 +0700

    IGNITE-26372 [ducktests] Add service for Kafka (#12314)
---
 modules/ducktests/tests/docker/Dockerfile          |  10 +
 .../ducktests/tests/docker/requirements-dev.txt    |   2 +-
 modules/ducktests/tests/docker/requirements.txt    |   1 +
 .../services/kafka/__init__.py}                    |   7 +-
 .../tests/ignitetest/services/kafka/kafka.py       | 232 +++++++++++++++++++++
 .../services/kafka/templates/log4j.properties.j2   |  33 +++
 .../services/kafka/templates/server.properties.j2  |  22 ++
 .../tests/ignitetest/services/utils/ignite_spec.py |   2 +-
 .../tests/ignitetest/services/zk/zookeeper.py      |   2 +-
 .../ducktests/tests/ignitetest/tests/smoke_test.py |  18 ++
 .../tests/ignitetest/utils/ignite_test.py          |   2 +-
 .../ducktests/tests/ignitetest/utils/version.py    |   2 +-
 12 files changed, 324 insertions(+), 9 deletions(-)

diff --git a/modules/ducktests/tests/docker/Dockerfile 
b/modules/ducktests/tests/docker/Dockerfile
index 1a6b87e60b5..e6880e37115 100644
--- a/modules/ducktests/tests/docker/Dockerfile
+++ b/modules/ducktests/tests/docker/Dockerfile
@@ -69,6 +69,16 @@ RUN cd /opt && curl -O 
$APACHE_ARCHIVE/zookeeper/$ZOOKEEPER_NAME/$ZOOKEEPER_RELE
  && tar xvf $ZOOKEEPER_RELEASE_ARTIFACT && rm $ZOOKEEPER_RELEASE_ARTIFACT
 RUN mv /opt/$ZOOKEEPER_RELEASE_NAME /opt/$ZOOKEEPER_NAME
 
+# Install Kafka
+ARG KAFKA_VERSION="3.9.1"
+ARG KAFKA_NAME="kafka"
+ARG KAFKA_RELEASE_NAME="${KAFKA_NAME}_2.13-$KAFKA_VERSION"
+ARG KAFKA_RELEASE_ARTIFACT="$KAFKA_RELEASE_NAME.tgz"
+RUN echo $APACHE_ARCHIVE/kafka/$KAFKA_VERSION/$KAFKA_RELEASE_ARTIFACT
+RUN cd /opt && curl -O 
$APACHE_ARCHIVE/kafka/$KAFKA_VERSION/$KAFKA_RELEASE_ARTIFACT \
+ && tar xvf $KAFKA_RELEASE_ARTIFACT && rm $KAFKA_RELEASE_ARTIFACT
+RUN mv /opt/$KAFKA_RELEASE_NAME /opt/$KAFKA_NAME-$KAFKA_VERSION
+
 # The version of Kibosh to use for testing.
 # If you update this, also update vagrant/base.sh
 ARG KIBOSH_VERSION="8841dd392e6fbf02986e2fb1f1ebf04df344b65a"
diff --git a/modules/ducktests/tests/docker/requirements-dev.txt 
b/modules/ducktests/tests/docker/requirements-dev.txt
index 08626a4b106..f868e43d674 100644
--- a/modules/ducktests/tests/docker/requirements-dev.txt
+++ b/modules/ducktests/tests/docker/requirements-dev.txt
@@ -15,4 +15,4 @@
 
 -r requirements.txt
 pytest==6.2.5
-flake8==3.8.3
+flake8==6.1.0
diff --git a/modules/ducktests/tests/docker/requirements.txt 
b/modules/ducktests/tests/docker/requirements.txt
index e07096d488a..aba4f25a86d 100644
--- a/modules/ducktests/tests/docker/requirements.txt
+++ b/modules/ducktests/tests/docker/requirements.txt
@@ -15,3 +15,4 @@
 
 filelock==3.8.2
 ducktape==0.13.0
+looseversion==1.3.0
diff --git a/modules/ducktests/tests/docker/requirements-dev.txt 
b/modules/ducktests/tests/ignitetest/services/kafka/__init__.py
similarity index 91%
copy from modules/ducktests/tests/docker/requirements-dev.txt
copy to modules/ducktests/tests/ignitetest/services/kafka/__init__.py
index 08626a4b106..f33fab7169f 100644
--- a/modules/ducktests/tests/docker/requirements-dev.txt
+++ b/modules/ducktests/tests/ignitetest/services/kafka/__init__.py
@@ -12,7 +12,6 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
--r requirements.txt
-pytest==6.2.5
-flake8==3.8.3
+"""
+This module contains classes and utilities to start Kafka cluster.
+"""
diff --git a/modules/ducktests/tests/ignitetest/services/kafka/kafka.py 
b/modules/ducktests/tests/ignitetest/services/kafka/kafka.py
new file mode 100644
index 00000000000..b4d43887dac
--- /dev/null
+++ b/modules/ducktests/tests/ignitetest/services/kafka/kafka.py
@@ -0,0 +1,232 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+from looseversion import LooseVersion
+from typing import NamedTuple
+
+from ducktape.utils.util import wait_until
+
+from ignitetest.services.utils.ducktests_service import DucktestsService
+from ignitetest.services.utils.log_utils import monitor_log
+from ignitetest.services.utils.path import PathAware
+
+
+class KafkaSettings:
+    """
+    Settings for kafka nodes.
+    """
+    def __init__(self, **kwargs):
+        self.zookeeper_connection_string = 
kwargs.get("zookeeper_connection_string")
+        self.port = kwargs.get("port", 9092)
+
+        self.host = None
+        self.broker_id = None
+
+        version = kwargs.get("version")
+        if version:
+            if isinstance(version, str):
+                version = LooseVersion(version)
+
+            self.version = version
+        else:
+            self.version = LooseVersion("3.9.1")
+
+
+class KafkaService(DucktestsService, PathAware):
+    """
+    Kafka service.
+    """
+    LOG_FILENAME = "kafka.log"
+
+    def __init__(self, context, num_nodes, settings: KafkaSettings, 
start_timeout_sec=60):
+        super().__init__(context, num_nodes)
+        self.settings = settings
+        self.start_timeout_sec = start_timeout_sec
+        self.init_logs_attribute()
+
+    @property
+    def product(self):
+        return "%s-%s" % ("kafka", self.settings.version)
+
+    @property
+    def globals(self):
+        return self.context.globals
+
+    @property
+    def log_config_file(self):
+        return os.path.join(self.config_dir, "log4j.properties")
+
+    @property
+    def config_file(self):
+        return os.path.join(self.config_dir, "server.properties")
+
+    def start(self, **kwargs):
+        super().start(**kwargs)
+        self.logger.info("Waiting for Kafka ...")
+
+        for node in self.nodes:
+            self.await_kafka(node, self.start_timeout_sec)
+
+        self.logger.info("Kafka cluster is formed.")
+
+    def start_node(self, node, **kwargs):
+        idx = self.idx(node)
+
+        self.logger.info("Starting Kafka broker %d on %s", idx, 
node.account.hostname)
+
+        self.init_persistent(node)
+
+        self.settings.host = node.account.externally_routable_ip
+        self.settings.broker_id = idx
+
+        config_file = self.render('server.properties.j2', 
settings=self.settings, data_dir=self.work_dir)
+        node.account.create_file(self.config_file, config_file)
+        self.logger.info("Kafka config %s", config_file)
+
+        log_config_file = self.render('log4j.properties.j2', 
log_dir=self.log_dir)
+        node.account.create_file(self.log_config_file, log_config_file)
+
+        start_cmd = f"nohup java 
-Dlog4j.configuration=file:{self.log_config_file} " \
+            f"-cp {os.path.join(self.home_dir, 'libs')}/*:{self.config_dir} " \
+            f"kafka.Kafka {self.config_file} >/tmp/log 2>&1 &"
+
+        node.account.ssh(start_cmd)
+
+    def wait_node(self, node, timeout_sec=20):
+        wait_until(lambda: not self.alive(node), timeout_sec=timeout_sec)
+
+        return not self.alive(node)
+
+    def await_kafka(self, node, timeout):
+        """
+        Await kafka broker started on node.
+        :param node: Kafka service node.
+        :param timeout: Wait timeout.
+        """
+        with monitor_log(node, self.log_file, from_the_beginning=True) as 
monitor:
+            monitor.wait_until("KafkaServer.*started",
+                               timeout_sec=timeout,
+                               err_msg=f"Kafka cluster was not formed on 
{node.account.hostname}")
+
+    def create_topic(self, name, partitions=1, replication_factor=1, 
retention_ms=None):
+        """
+        Create kafka topic
+        :param name: Topic name
+        :param partitions: Number of partitions
+        :param replication_factor: Replication factor
+        :param retention_ms: Retention in milliseconds
+        """
+        create_topic_cmd = f"{os.path.join(self.home_dir, 'bin', 
'kafka-topics.sh')} --create " \
+            f"--topic {name} --bootstrap-server {self.connection_string()} " \
+            f"--partitions {partitions} --replication-factor 
{replication_factor}"
+
+        if retention_ms is not None:
+            create_topic_cmd = create_topic_cmd + f" --config 
retention.ms={retention_ms}"
+
+        self.nodes[0].account.ssh(create_topic_cmd)
+
+    def offsets(self, topics=None):
+        """
+        Return offset info for all consumer groups and partitions.
+        :param topics: List of topics to process. Return info about all topics 
if None.
+        """
+        kafka_consumer_groups_cmd =\
+            f"{os.path.join(self.home_dir, 'bin', 'kafka-consumer-groups.sh')} 
--describe " \
+            f"--all-groups --bootstrap-server {self.connection_string()} 
2>/dev/null |" \
+            f"grep . | grep -v GROUP | sed -E 's/  */ /g'"
+
+        def callback(line):
+            fields = line.strip().split(' ')
+
+            return ConsumerOffsetInfo(
+                group=fields[0],
+                topic=fields[1],
+                part=int(fields[2]),
+                current_offset=int(fields[3]),
+                log_end_offset=int(fields[4]),
+                lag=int(fields[5]),
+                consumer_id=fields[6] if fields[6] != '-' else None,
+                host=fields[7] if fields[7] != '-' else None,
+                client_id=fields[8] if fields[8] != '-' else None
+            )
+
+        offsets = self.nodes[0].account.ssh_capture(kafka_consumer_groups_cmd, 
callback=callback)
+
+        return [o for o in offsets if not topics or (o.topic in topics)]
+
+    @property
+    def log_file(self):
+        """
+        :return: current log file of node.
+        """
+        return os.path.join(self.log_dir, self.LOG_FILENAME)
+
+    @staticmethod
+    def java_class_name():
+        """ The class name of the Kafka broker. """
+        return "kafka.Kafka"
+
+    def pids(self, node):
+        """
+        Get pids of kafka service node.
+        :param node: Kafka service node.
+        :return: List of pids.
+        """
+        return node.account.java_pids(self.java_class_name())
+
+    def alive(self, node):
+        """
+        Check if kafka service node is alive.
+        :param node: Kafka service node.
+        :return: True if node is alive
+        """
+        return len(self.pids(node)) > 0
+
+    def connection_string(self):
+        """
+        Form a connection string to kafka cluster.
+        :return: Connection string.
+        """
+        return ','.join([node.account.hostname + ":" + str(self.settings.port) 
for node in self.nodes])
+
+    def stop_node(self, node, force_stop=True, **kwargs):
+        idx = self.idx(node)
+        self.logger.info("Stopping %s node %d on %s" % (type(self).__name__, 
idx, node.account.hostname))
+
+        allow_fail = kwargs["allow_fail"] if "allow_fail" in kwargs else False
+
+        node.account.kill_process("kafka", clean_shutdown=not force_stop, 
allow_fail=allow_fail)
+
+    def clean_node(self, node, **kwargs):
+        super().clean_node(node, **kwargs)
+
+        self.logger.info("Cleaning Kafka node %d on %s", self.idx(node), 
node.account.hostname)
+        node.account.ssh(f"rm -rf -- {self.persistent_root}", allow_fail=False)
+
+
+class ConsumerOffsetInfo(NamedTuple):
+    """
+    Consumer offset record.
+    """
+    group: str
+    topic: str
+    part: int
+    current_offset: int
+    log_end_offset: int
+    lag: int
+    consumer_id: str
+    host: str
+    client_id: str
diff --git 
a/modules/ducktests/tests/ignitetest/services/kafka/templates/log4j.properties.j2
 
b/modules/ducktests/tests/ignitetest/services/kafka/templates/log4j.properties.j2
new file mode 100644
index 00000000000..8517212f116
--- /dev/null
+++ 
b/modules/ducktests/tests/ignitetest/services/kafka/templates/log4j.properties.j2
@@ -0,0 +1,33 @@
+{#
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+#}
+
+kafka.root.logger=INFO, FILE
+
+kafka.log.dir={{ log_dir }}
+kafka.log.file={{ LOG_FILENAME }}
+kafka.log.threshold=INFO
+kafka.log.maxfilesize=256MB
+kafka.log.maxbackupindex=20
+
+log4j.rootLogger=${kafka.root.logger}
+
+log4j.appender.FILE=org.apache.log4j.FileAppender
+log4j.appender.FILE.Threshold=${kafka.log.threshold}
+log4j.appender.FILE.File=${kafka.log.dir}/${kafka.log.file}
+log4j.appender.FILE.DatePattern='.'yyyy-MM-dd-HH
+log4j.appender.FILE.layout=org.apache.log4j.PatternLayout
+log4j.appender.FILE.layout.ConversionPattern=[%d] %p %m (%c)%n
diff --git 
a/modules/ducktests/tests/ignitetest/services/kafka/templates/server.properties.j2
 
b/modules/ducktests/tests/ignitetest/services/kafka/templates/server.properties.j2
new file mode 100644
index 00000000000..18ae089ad47
--- /dev/null
+++ 
b/modules/ducktests/tests/ignitetest/services/kafka/templates/server.properties.j2
@@ -0,0 +1,22 @@
+{#
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+#}
+
+broker.id={{ settings.broker_id }}
+listeners=PLAINTEXT://{{ settings.host }}:{{ settings.port }}
+zookeeper.connect={{ settings.zookeeper_connection_string }}
+offsets.topic.replication.factor=1
+log.dirs={{ data_dir }}
diff --git a/modules/ducktests/tests/ignitetest/services/utils/ignite_spec.py 
b/modules/ducktests/tests/ignitetest/services/utils/ignite_spec.py
index 223757b081d..87fe8b6732d 100644
--- a/modules/ducktests/tests/ignitetest/services/utils/ignite_spec.py
+++ b/modules/ducktests/tests/ignitetest/services/utils/ignite_spec.py
@@ -33,7 +33,7 @@ from ignitetest.services.utils.config_template import 
IgniteClientConfigTemplate
 from ignitetest.services.utils.jvm_utils import create_jvm_settings, 
merge_jvm_settings
 from ignitetest.services.utils.path import get_home_dir, IgnitePathAware
 from ignitetest.services.utils.ssl.ssl_params import is_ssl_enabled
-from ignitetest.services.utils.metrics.metrics import 
is_opencensus_metrics_enabled, configure_opencensus_metrics,\
+from ignitetest.services.utils.metrics.metrics import 
is_opencensus_metrics_enabled, configure_opencensus_metrics, \
     is_jmx_metrics_enabled, configure_jmx_metrics
 from ignitetest.services.utils.jmx_remote.jmx_remote_params import 
get_jmx_remote_params
 from ignitetest.utils.ignite_test import JFR_ENABLED, SAFEPOINT_LOGS_ENABLED
diff --git a/modules/ducktests/tests/ignitetest/services/zk/zookeeper.py 
b/modules/ducktests/tests/ignitetest/services/zk/zookeeper.py
index fc6e03caba4..b3a8b689ef6 100644
--- a/modules/ducktests/tests/ignitetest/services/zk/zookeeper.py
+++ b/modules/ducktests/tests/ignitetest/services/zk/zookeeper.py
@@ -18,7 +18,7 @@ This module contains classes and utilities to start zookeeper 
cluster for testin
 """
 
 import os.path
-from distutils.version import LooseVersion
+from looseversion import LooseVersion
 
 from ducktape.utils.util import wait_until
 
diff --git a/modules/ducktests/tests/ignitetest/tests/smoke_test.py 
b/modules/ducktests/tests/ignitetest/tests/smoke_test.py
index 7ace6ac9228..7e0be2e3f75 100644
--- a/modules/ducktests/tests/ignitetest/tests/smoke_test.py
+++ b/modules/ducktests/tests/ignitetest/tests/smoke_test.py
@@ -19,6 +19,7 @@ This module contains smoke tests that checks that services 
work
 
 from ignitetest.services.ignite import IgniteService
 from ignitetest.services.ignite_app import IgniteApplicationService
+from ignitetest.services.kafka.kafka import KafkaService, KafkaSettings
 from ignitetest.services.utils.ignite_configuration.discovery import 
from_ignite_cluster
 from ignitetest.services.utils.ignite_configuration import IgniteConfiguration
 from ignitetest.services.zk.zookeeper import ZookeeperService
@@ -74,3 +75,20 @@ class SmokeServicesTest(IgniteTest):
         zookeeper = ZookeeperService(self.test_context, num_nodes=3)
         zookeeper.start()
         zookeeper.stop()
+
+    @cluster(num_nodes=6)
+    def test_kafka_start_stop(self):
+        """
+        Test that KafkaService correctly starts and stops
+        """
+        zookeeper = ZookeeperService(self.test_context, num_nodes=3)
+        zookeeper.start()
+
+        kafka_settings = 
KafkaSettings(zookeeper_connection_string=zookeeper.connection_string())
+        kafka = KafkaService(self.test_context, num_nodes=3, 
settings=kafka_settings)
+        kafka.start()
+
+        kafka.create_topic("topic")
+
+        kafka.stop()
+        zookeeper.stop()
diff --git a/modules/ducktests/tests/ignitetest/utils/ignite_test.py 
b/modules/ducktests/tests/ignitetest/utils/ignite_test.py
index 67b205f0006..1feda7859ea 100644
--- a/modules/ducktests/tests/ignitetest/utils/ignite_test.py
+++ b/modules/ducktests/tests/ignitetest/utils/ignite_test.py
@@ -61,7 +61,7 @@ class IgniteTest(Test):
     Basic ignite test.
     """
     def __init__(self, test_context):
-        assert isinstance(test_context, IgniteTestContext),\
+        assert isinstance(test_context, IgniteTestContext), \
             "any IgniteTest MUST BE decorated with the 
@ignitetest.utils.cluster decorator"
 
         super().__init__(test_context=test_context)
diff --git a/modules/ducktests/tests/ignitetest/utils/version.py 
b/modules/ducktests/tests/ignitetest/utils/version.py
index 929d53e3729..781ef33aa3e 100644
--- a/modules/ducktests/tests/ignitetest/utils/version.py
+++ b/modules/ducktests/tests/ignitetest/utils/version.py
@@ -17,7 +17,7 @@
 Module contains ignite version utility class.
 """
 import re
-from distutils.version import LooseVersion
+from looseversion import LooseVersion
 
 from ignitetest import __version__
 

Reply via email to