This is an automated email from the ASF dual-hosted git repository.
timoninmaxim pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new e18237504f9 IGNITE-26372 [ducktests] Add service for Kafka (#12314)
e18237504f9 is described below
commit e18237504f92b79284665a1911364c679479e6ba
Author: Sergey Korotkov <[email protected]>
AuthorDate: Wed Sep 3 16:57:18 2025 +0700
IGNITE-26372 [ducktests] Add service for Kafka (#12314)
---
modules/ducktests/tests/docker/Dockerfile | 10 +
.../ducktests/tests/docker/requirements-dev.txt | 2 +-
modules/ducktests/tests/docker/requirements.txt | 1 +
.../services/kafka/__init__.py} | 7 +-
.../tests/ignitetest/services/kafka/kafka.py | 232 +++++++++++++++++++++
.../services/kafka/templates/log4j.properties.j2 | 33 +++
.../services/kafka/templates/server.properties.j2 | 22 ++
.../tests/ignitetest/services/utils/ignite_spec.py | 2 +-
.../tests/ignitetest/services/zk/zookeeper.py | 2 +-
.../ducktests/tests/ignitetest/tests/smoke_test.py | 18 ++
.../tests/ignitetest/utils/ignite_test.py | 2 +-
.../ducktests/tests/ignitetest/utils/version.py | 2 +-
12 files changed, 324 insertions(+), 9 deletions(-)
diff --git a/modules/ducktests/tests/docker/Dockerfile
b/modules/ducktests/tests/docker/Dockerfile
index 1a6b87e60b5..e6880e37115 100644
--- a/modules/ducktests/tests/docker/Dockerfile
+++ b/modules/ducktests/tests/docker/Dockerfile
@@ -69,6 +69,16 @@ RUN cd /opt && curl -O
$APACHE_ARCHIVE/zookeeper/$ZOOKEEPER_NAME/$ZOOKEEPER_RELE
&& tar xvf $ZOOKEEPER_RELEASE_ARTIFACT && rm $ZOOKEEPER_RELEASE_ARTIFACT
RUN mv /opt/$ZOOKEEPER_RELEASE_NAME /opt/$ZOOKEEPER_NAME
+# Install Kafka.
+# KAFKA_RELEASE_NAME expands to "kafka_2.13-<KAFKA_VERSION>": it is used both as the base name
+# of the release archive (<name>.tgz) and as the name of the directory moved below.
+ARG KAFKA_VERSION="3.9.1"
+ARG KAFKA_NAME="kafka"
+ARG KAFKA_RELEASE_NAME="${KAFKA_NAME}_2.13-$KAFKA_VERSION"
+ARG KAFKA_RELEASE_ARTIFACT="$KAFKA_RELEASE_NAME.tgz"
+# Print the download URL into the build output.
+RUN echo $APACHE_ARCHIVE/kafka/$KAFKA_VERSION/$KAFKA_RELEASE_ARTIFACT
+# Download the archive into /opt, unpack it there and delete the archive.
+RUN cd /opt && curl -O
$APACHE_ARCHIVE/kafka/$KAFKA_VERSION/$KAFKA_RELEASE_ARTIFACT \
+ && tar xvf $KAFKA_RELEASE_ARTIFACT && rm $KAFKA_RELEASE_ARTIFACT
+# Rename the unpacked directory to /opt/kafka-<KAFKA_VERSION>.
+RUN mv /opt/$KAFKA_RELEASE_NAME /opt/$KAFKA_NAME-$KAFKA_VERSION
+
# The version of Kibosh to use for testing.
# If you update this, also update vagrant/base.sh
ARG KIBOSH_VERSION="8841dd392e6fbf02986e2fb1f1ebf04df344b65a"
diff --git a/modules/ducktests/tests/docker/requirements-dev.txt
b/modules/ducktests/tests/docker/requirements-dev.txt
index 08626a4b106..f868e43d674 100644
--- a/modules/ducktests/tests/docker/requirements-dev.txt
+++ b/modules/ducktests/tests/docker/requirements-dev.txt
@@ -15,4 +15,4 @@
-r requirements.txt
pytest==6.2.5
-flake8==3.8.3
+flake8==6.1.0
diff --git a/modules/ducktests/tests/docker/requirements.txt
b/modules/ducktests/tests/docker/requirements.txt
index e07096d488a..aba4f25a86d 100644
--- a/modules/ducktests/tests/docker/requirements.txt
+++ b/modules/ducktests/tests/docker/requirements.txt
@@ -15,3 +15,4 @@
filelock==3.8.2
ducktape==0.13.0
+looseversion==1.3.0
diff --git a/modules/ducktests/tests/docker/requirements-dev.txt
b/modules/ducktests/tests/ignitetest/services/kafka/__init__.py
similarity index 91%
copy from modules/ducktests/tests/docker/requirements-dev.txt
copy to modules/ducktests/tests/ignitetest/services/kafka/__init__.py
index 08626a4b106..f33fab7169f 100644
--- a/modules/ducktests/tests/docker/requirements-dev.txt
+++ b/modules/ducktests/tests/ignitetest/services/kafka/__init__.py
@@ -12,7 +12,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-
--r requirements.txt
-pytest==6.2.5
-flake8==3.8.3
+"""
+This module contains classes and utilities to start a Kafka cluster.
+"""
diff --git a/modules/ducktests/tests/ignitetest/services/kafka/kafka.py
b/modules/ducktests/tests/ignitetest/services/kafka/kafka.py
new file mode 100644
index 00000000000..b4d43887dac
--- /dev/null
+++ b/modules/ducktests/tests/ignitetest/services/kafka/kafka.py
@@ -0,0 +1,232 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+from looseversion import LooseVersion
+from typing import NamedTuple
+
+from ducktape.utils.util import wait_until
+
+from ignitetest.services.utils.ducktests_service import DucktestsService
+from ignitetest.services.utils.log_utils import monitor_log
+from ignitetest.services.utils.path import PathAware
+
+
+class KafkaSettings:
+    """
+    Settings for kafka nodes.
+
+    Recognized keyword arguments:
+    zookeeper_connection_string - ZooKeeper connection string (None if omitted);
+    port - broker port (9092 if omitted);
+    version - Kafka version as a string or a version object ("3.9.1" if omitted or falsy).
+    """
+    def __init__(self, **kwargs):
+        # Not configurable through kwargs: both start as None.
+        self.host = None
+        self.broker_id = None
+
+        self.port = kwargs.get("port", 9092)
+        self.zookeeper_connection_string = kwargs.get("zookeeper_connection_string")
+
+        # A missing or falsy version falls back to the default release; strings are parsed,
+        # any other truthy value is stored as is.
+        requested = kwargs.get("version") or "3.9.1"
+        self.version = LooseVersion(requested) if isinstance(requested, str) else requested
+
+
+class KafkaService(DucktestsService, PathAware):
+    """
+    Kafka service.
+
+    Runs one Kafka broker per service node. Every broker is configured from the same
+    KafkaSettings object and from the rendered server.properties.j2 / log4j.properties.j2 templates.
+    """
+    LOG_FILENAME = "kafka.log"
+
+    def __init__(self, context, num_nodes, settings: KafkaSettings, start_timeout_sec=60):
+        """
+        :param context: Test context.
+        :param num_nodes: Number of nodes to run brokers on.
+        :param settings: Kafka settings; the same object is reused for every broker.
+        :param start_timeout_sec: Timeout (per node) of waiting for the broker start message in the log.
+        """
+        super().__init__(context, num_nodes)
+        self.settings = settings
+        self.start_timeout_sec = start_timeout_sec
+        self.init_logs_attribute()
+
+    @property
+    def product(self):
+        """
+        :return: Product name in the form "kafka-<version>".
+        """
+        return "%s-%s" % ("kafka", self.settings.version)
+
+    @property
+    def globals(self):
+        """
+        :return: Globals of the test context.
+        """
+        return self.context.globals
+
+    @property
+    def log_config_file(self):
+        """
+        :return: Path of the log4j configuration file inside the config directory.
+        """
+        return os.path.join(self.config_dir, "log4j.properties")
+
+    @property
+    def config_file(self):
+        """
+        :return: Path of the broker configuration file inside the config directory.
+        """
+        return os.path.join(self.config_dir, "server.properties")
+
+    def start(self, **kwargs):
+        """
+        Start all brokers, then wait until each of them reports a successful start in its log.
+        """
+        super().start(**kwargs)
+        self.logger.info("Waiting for Kafka ...")
+
+        for node in self.nodes:
+            self.await_kafka(node, self.start_timeout_sec)
+
+        self.logger.info("Kafka cluster is formed.")
+
+    def start_node(self, node, **kwargs):
+        """
+        Render the broker and log4j configs to the node and launch the broker in background.
+        :param node: Kafka service node.
+        """
+        idx = self.idx(node)
+
+        self.logger.info("Starting Kafka broker %d on %s", idx, node.account.hostname)
+
+        self.init_persistent(node)
+
+        # The settings object is shared by all nodes: host and broker id are overwritten
+        # for the current node right before its config is rendered.
+        self.settings.host = node.account.externally_routable_ip
+        self.settings.broker_id = idx
+
+        config_file = self.render('server.properties.j2', settings=self.settings, data_dir=self.work_dir)
+        node.account.create_file(self.config_file, config_file)
+        self.logger.info("Kafka config %s", config_file)
+
+        log_config_file = self.render('log4j.properties.j2', log_dir=self.log_dir)
+        node.account.create_file(self.log_config_file, log_config_file)
+
+        # The main class comes from java_class_name() so that the launched class and
+        # the one looked up by pids() cannot diverge.
+        start_cmd = f"nohup java -Dlog4j.configuration=file:{self.log_config_file} " \
+                    f"-cp {os.path.join(self.home_dir, 'libs')}/*:{self.config_dir} " \
+                    f"{self.java_class_name()} {self.config_file} >/tmp/log 2>&1 &"
+
+        node.account.ssh(start_cmd)
+
+    def wait_node(self, node, timeout_sec=20):
+        """
+        Wait until no broker process is found on the node.
+        :param node: Kafka service node.
+        :param timeout_sec: Wait timeout in seconds.
+        :return: True if no broker process is found on the node after the wait.
+        """
+        wait_until(lambda: not self.alive(node), timeout_sec=timeout_sec)
+
+        return not self.alive(node)
+
+    def await_kafka(self, node, timeout):
+        """
+        Await kafka broker started on node.
+        :param node: Kafka service node.
+        :param timeout: Wait timeout.
+        """
+        with monitor_log(node, self.log_file, from_the_beginning=True) as monitor:
+            monitor.wait_until("KafkaServer.*started",
+                               timeout_sec=timeout,
+                               err_msg=f"Kafka cluster was not formed on {node.account.hostname}")
+
+    def create_topic(self, name, partitions=1, replication_factor=1, retention_ms=None):
+        """
+        Create kafka topic. The command is executed on the first service node.
+        :param name: Topic name
+        :param partitions: Number of partitions
+        :param replication_factor: Replication factor
+        :param retention_ms: Retention in milliseconds; the option is not passed at all if None
+        """
+        create_topic_cmd = f"{os.path.join(self.home_dir, 'bin', 'kafka-topics.sh')} --create " \
+                           f"--topic {name} --bootstrap-server {self.connection_string()} " \
+                           f"--partitions {partitions} --replication-factor {replication_factor}"
+
+        if retention_ms is not None:
+            create_topic_cmd += f" --config retention.ms={retention_ms}"
+
+        self.nodes[0].account.ssh(create_topic_cmd)
+
+    def offsets(self, topics=None):
+        """
+        Return offset info for all consumer groups and partitions.
+        The command is executed on the first service node.
+        :param topics: List of topics to process. Return info about all topics if None.
+        :return: List of ConsumerOffsetInfo. A column printed as '-' is returned as None.
+        """
+        # Pipeline: drop empty lines, drop header lines (any line containing GROUP) and
+        # squeeze runs of spaces so that every row can be split on a single space.
+        # 's/ +/ /g' is used because a pattern that can match the empty string (' *')
+        # would insert a space between every two characters.
+        kafka_consumer_groups_cmd = \
+            f"{os.path.join(self.home_dir, 'bin', 'kafka-consumer-groups.sh')} --describe " \
+            f"--all-groups --bootstrap-server {self.connection_string()} 2>/dev/null |" \
+            "grep . | grep -v GROUP | sed -E 's/ +/ /g'"
+
+        def optional(value, convert=str):
+            # '-' is the placeholder for an absent value in any column.
+            return None if value == '-' else convert(value)
+
+        def callback(line):
+            fields = line.strip().split(' ')
+
+            return ConsumerOffsetInfo(
+                group=fields[0],
+                topic=fields[1],
+                part=int(fields[2]),
+                # Offsets and lag are parsed leniently: int('-') would raise ValueError.
+                current_offset=optional(fields[3], int),
+                log_end_offset=optional(fields[4], int),
+                lag=optional(fields[5], int),
+                consumer_id=optional(fields[6]),
+                host=optional(fields[7]),
+                client_id=optional(fields[8])
+            )
+
+        offsets = self.nodes[0].account.ssh_capture(kafka_consumer_groups_cmd, callback=callback)
+
+        return [o for o in offsets if not topics or (o.topic in topics)]
+
+    @property
+    def log_file(self):
+        """
+        :return: current log file of node.
+        """
+        return os.path.join(self.log_dir, self.LOG_FILENAME)
+
+    @staticmethod
+    def java_class_name():
+        """ The class name of the Kafka broker. """
+        return "kafka.Kafka"
+
+    def pids(self, node):
+        """
+        Get pids of kafka service node.
+        :param node: Kafka service node.
+        :return: List of pids.
+        """
+        return node.account.java_pids(self.java_class_name())
+
+    def alive(self, node):
+        """
+        Check if kafka service node is alive.
+        :param node: Kafka service node.
+        :return: True if node is alive
+        """
+        return len(self.pids(node)) > 0
+
+    def connection_string(self):
+        """
+        Form a connection string to kafka cluster.
+        :return: Comma separated list of "<hostname>:<port>" for all service nodes.
+        """
+        return ','.join(f"{node.account.hostname}:{self.settings.port}" for node in self.nodes)
+
+    def stop_node(self, node, force_stop=True, **kwargs):
+        """
+        Stop the broker on the node.
+        :param node: Kafka service node.
+        :param force_stop: If True the process is killed without a clean shutdown.
+        :param kwargs: "allow_fail" (False by default) is passed to the kill command.
+        """
+        idx = self.idx(node)
+        self.logger.info("Stopping %s node %d on %s", type(self).__name__, idx, node.account.hostname)
+
+        allow_fail = kwargs.get("allow_fail", False)
+
+        node.account.kill_process("kafka", clean_shutdown=not force_stop, allow_fail=allow_fail)
+
+    def clean_node(self, node, **kwargs):
+        """
+        Clean the node: run the base class cleanup, then remove the persistent root directory.
+        :param node: Kafka service node.
+        """
+        super().clean_node(node, **kwargs)
+
+        self.logger.info("Cleaning Kafka node %d on %s", self.idx(node), node.account.hostname)
+        node.account.ssh(f"rm -rf -- {self.persistent_root}", allow_fail=False)
+
+class ConsumerOffsetInfo(NamedTuple):
+ """
+ Consumer offset record.
+
+ One row of the `kafka-consumer-groups.sh --describe --all-groups` output
+ as parsed by KafkaService.offsets(): fields follow the column order of that row.
+ """
+ # Consumer group name (column 1).
+ group: str
+ # Topic name (column 2).
+ topic: str
+ # Partition number (column 3).
+ part: int
+ # Columns 4-6 of the row.
+ current_offset: int
+ log_end_offset: int
+ lag: int
+ # Columns 7-9 of the row; the parser stores None instead of the '-' placeholder.
+ consumer_id: str
+ host: str
+ client_id: str
diff --git
a/modules/ducktests/tests/ignitetest/services/kafka/templates/log4j.properties.j2
b/modules/ducktests/tests/ignitetest/services/kafka/templates/log4j.properties.j2
new file mode 100644
index 00000000000..8517212f116
--- /dev/null
+++
b/modules/ducktests/tests/ignitetest/services/kafka/templates/log4j.properties.j2
@@ -0,0 +1,33 @@
+{#
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+
+ Log4j configuration of a Kafka broker: the root logger sends INFO and above
+ to the FILE appender, which writes to the file log_dir/LOG_FILENAME.
+
+ Template variables:
+ log_dir - passed explicitly by KafkaService.start_node when rendering;
+ LOG_FILENAME - not passed explicitly by that call; presumably resolved by the
+ renderer from the KafkaService.LOG_FILENAME attribute (TODO confirm).
+
+ NOTE(review): kafka.log.maxfilesize and kafka.log.maxbackupindex are defined
+ but not referenced by any appender property in this file.
+ NOTE(review): DatePattern is set on a plain FileAppender; it looks like
+ a leftover of a rolling appender configuration - confirm it has any effect.
+#}
+
+kafka.root.logger=INFO, FILE
+
+kafka.log.dir={{ log_dir }}
+kafka.log.file={{ LOG_FILENAME }}
+kafka.log.threshold=INFO
+kafka.log.maxfilesize=256MB
+kafka.log.maxbackupindex=20
+
+log4j.rootLogger=${kafka.root.logger}
+
+log4j.appender.FILE=org.apache.log4j.FileAppender
+log4j.appender.FILE.Threshold=${kafka.log.threshold}
+log4j.appender.FILE.File=${kafka.log.dir}/${kafka.log.file}
+log4j.appender.FILE.DatePattern='.'yyyy-MM-dd-HH
+log4j.appender.FILE.layout=org.apache.log4j.PatternLayout
+log4j.appender.FILE.layout.ConversionPattern=[%d] %p %m (%c)%n
diff --git
a/modules/ducktests/tests/ignitetest/services/kafka/templates/server.properties.j2
b/modules/ducktests/tests/ignitetest/services/kafka/templates/server.properties.j2
new file mode 100644
index 00000000000..18ae089ad47
--- /dev/null
+++
b/modules/ducktests/tests/ignitetest/services/kafka/templates/server.properties.j2
@@ -0,0 +1,22 @@
+{#
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+
+ Kafka broker configuration rendered once per node by KafkaService.start_node.
+
+ Template variables:
+ settings - KafkaSettings object; broker_id and host are assigned for the
+ current node right before rendering, port and zookeeper_connection_string
+ come from the settings passed to the service;
+ data_dir - the work directory of the service, used as the value of log.dirs.
+
+ NOTE(review): offsets.topic.replication.factor is fixed to 1 regardless of the
+ number of brokers - presumably to let the offsets topic be created on
+ a single-broker cluster; confirm.
+#}
+
+broker.id={{ settings.broker_id }}
+listeners=PLAINTEXT://{{ settings.host }}:{{ settings.port }}
+zookeeper.connect={{ settings.zookeeper_connection_string }}
+offsets.topic.replication.factor=1
+log.dirs={{ data_dir }}
diff --git a/modules/ducktests/tests/ignitetest/services/utils/ignite_spec.py
b/modules/ducktests/tests/ignitetest/services/utils/ignite_spec.py
index 223757b081d..87fe8b6732d 100644
--- a/modules/ducktests/tests/ignitetest/services/utils/ignite_spec.py
+++ b/modules/ducktests/tests/ignitetest/services/utils/ignite_spec.py
@@ -33,7 +33,7 @@ from ignitetest.services.utils.config_template import
IgniteClientConfigTemplate
from ignitetest.services.utils.jvm_utils import create_jvm_settings,
merge_jvm_settings
from ignitetest.services.utils.path import get_home_dir, IgnitePathAware
from ignitetest.services.utils.ssl.ssl_params import is_ssl_enabled
-from ignitetest.services.utils.metrics.metrics import
is_opencensus_metrics_enabled, configure_opencensus_metrics,\
+from ignitetest.services.utils.metrics.metrics import
is_opencensus_metrics_enabled, configure_opencensus_metrics, \
is_jmx_metrics_enabled, configure_jmx_metrics
from ignitetest.services.utils.jmx_remote.jmx_remote_params import
get_jmx_remote_params
from ignitetest.utils.ignite_test import JFR_ENABLED, SAFEPOINT_LOGS_ENABLED
diff --git a/modules/ducktests/tests/ignitetest/services/zk/zookeeper.py
b/modules/ducktests/tests/ignitetest/services/zk/zookeeper.py
index fc6e03caba4..b3a8b689ef6 100644
--- a/modules/ducktests/tests/ignitetest/services/zk/zookeeper.py
+++ b/modules/ducktests/tests/ignitetest/services/zk/zookeeper.py
@@ -18,7 +18,7 @@ This module contains classes and utilities to start zookeeper
cluster for testin
"""
import os.path
-from distutils.version import LooseVersion
+from looseversion import LooseVersion
from ducktape.utils.util import wait_until
diff --git a/modules/ducktests/tests/ignitetest/tests/smoke_test.py
b/modules/ducktests/tests/ignitetest/tests/smoke_test.py
index 7ace6ac9228..7e0be2e3f75 100644
--- a/modules/ducktests/tests/ignitetest/tests/smoke_test.py
+++ b/modules/ducktests/tests/ignitetest/tests/smoke_test.py
@@ -19,6 +19,7 @@ This module contains smoke tests that checks that services
work
from ignitetest.services.ignite import IgniteService
from ignitetest.services.ignite_app import IgniteApplicationService
+from ignitetest.services.kafka.kafka import KafkaService, KafkaSettings
from ignitetest.services.utils.ignite_configuration.discovery import
from_ignite_cluster
from ignitetest.services.utils.ignite_configuration import IgniteConfiguration
from ignitetest.services.zk.zookeeper import ZookeeperService
@@ -74,3 +75,20 @@ class SmokeServicesTest(IgniteTest):
zookeeper = ZookeeperService(self.test_context, num_nodes=3)
zookeeper.start()
zookeeper.stop()
+
+ @cluster(num_nodes=6)
+ def test_kafka_start_stop(self):
+ """
+ Test that KafkaService starts and stops correctly.
+
+ Requests 6 nodes: 3 for the ZooKeeper service and 3 for the Kafka service.
+ """
+ zookeeper = ZookeeperService(self.test_context, num_nodes=3)
+ zookeeper.start()
+
+ # Brokers are configured with the connection string of the ZooKeeper service started above.
+ kafka_settings =
KafkaSettings(zookeeper_connection_string=zookeeper.connection_string())
+ kafka = KafkaService(self.test_context, num_nodes=3,
settings=kafka_settings)
+ kafka.start()
+
+ # Creating a topic checks that the started brokers accept a request.
+ kafka.create_topic("topic")
+
+ # Stop in the reverse order of start: Kafka first, then ZooKeeper.
+ kafka.stop()
+ zookeeper.stop()
diff --git a/modules/ducktests/tests/ignitetest/utils/ignite_test.py
b/modules/ducktests/tests/ignitetest/utils/ignite_test.py
index 67b205f0006..1feda7859ea 100644
--- a/modules/ducktests/tests/ignitetest/utils/ignite_test.py
+++ b/modules/ducktests/tests/ignitetest/utils/ignite_test.py
@@ -61,7 +61,7 @@ class IgniteTest(Test):
Basic ignite test.
"""
def __init__(self, test_context):
- assert isinstance(test_context, IgniteTestContext),\
+ assert isinstance(test_context, IgniteTestContext), \
"any IgniteTest MUST BE decorated with the
@ignitetest.utils.cluster decorator"
super().__init__(test_context=test_context)
diff --git a/modules/ducktests/tests/ignitetest/utils/version.py
b/modules/ducktests/tests/ignitetest/utils/version.py
index 929d53e3729..781ef33aa3e 100644
--- a/modules/ducktests/tests/ignitetest/utils/version.py
+++ b/modules/ducktests/tests/ignitetest/utils/version.py
@@ -17,7 +17,7 @@
Module contains ignite version utility class.
"""
import re
-from distutils.version import LooseVersion
+from looseversion import LooseVersion
from ignitetest import __version__