This is an automated email from the ASF dual-hosted git repository.

timoninmaxim pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 3084e45e6d6 IGNITE-26371 [ducktests] Add CDC extensions tests (#12356)
3084e45e6d6 is described below

commit 3084e45e6d6a6c8db6a3e3eb39167a389903adb0
Author: Sergey Korotkov <[email protected]>
AuthorDate: Mon Oct 13 19:30:28 2025 +0700

    IGNITE-26371 [ducktests] Add CDC extensions tests (#12356)
---
 modules/ducktests/README.md                        |  19 +
 .../tests/cdc/CdcContinuousUpdatesApplication.java |  98 ++++
 modules/ducktests/tests/docker/Dockerfile          |   2 +-
 modules/ducktests/tests/docker/ducker-ignite       |   8 +
 .../ignitetest/services/utils/cdc/cdc_helper.py    | 233 ++++++++++
 .../utils/cdc/ignite_to_ignite_cdc_helper.py       |  83 ++++
 .../cdc/ignite_to_ignite_client_cdc_helper.py      |  65 +++
 .../utils/cdc/ignite_to_kafka_cdc_helper.py        | 185 ++++++++
 .../services/utils/cdc/kafka_to_ignite.py          | 232 ++++++++++
 .../services/utils/cdc/templates/ignite_cdc.j2     |  44 ++
 .../cdc/templates/ignite_to_ignite_cdc_streamer.j2 |  44 ++
 .../ignite_to_ignite_client_cdc_streamer.j2        |  43 ++
 .../cdc/templates/ignite_to_kafka.properties.j2    |  19 +
 .../cdc/templates/ignite_to_kafka_cdc_streamer.j2  |  24 +
 .../cdc/templates/kafka_to_ignite.properties.j2    |  22 +
 .../utils/cdc/templates/properties_macro.j2        |  20 +
 .../ignitetest/services/utils/config_template.py   |   3 +-
 .../tests/ignitetest/services/utils/ignite_spec.py |  10 +-
 .../services/utils/templates/misc_macro.j2         |   2 +-
 .../tests/cdc/cdc_replication_abstract_test.py     | 496 +++++++++++++++++++++
 .../ignitetest/tests/cdc/cdc_replication_test.py   |  90 ++++
 21 files changed, 1738 insertions(+), 4 deletions(-)

diff --git a/modules/ducktests/README.md b/modules/ducktests/README.md
index 96c7708e69b..2bea2c532af 100644
--- a/modules/ducktests/README.md
+++ b/modules/ducktests/README.md
@@ -11,6 +11,23 @@ Structure of the `tests` directory is:
 - `./ignitetest/tests` contains tests.
 - `./checks` contains unit tests of utils, tests' decorators etc. 
 
+Some tests (like the CDC replication ones) require modules maintained in the 
+separate [ignite-extensions](https://github.com/apache/ignite-extensions) 
repository.
+
+To run these tests, the `ignite-extensions` working directory must be checked out
+next to the `ignite` one, at the same filesystem level. The expected directory
+layout is:
+- `./ignite` working directory checked out from the ignite repository 
(mentioned as `${IGNITE_HOME}` below) 
+- `./ignite-extensions` working directory checked out from the 
ignite-extensions repository
+
+The needed extension module should be built before tests run. 
+
+For example, for the CDC replication tests the `cdc-ext` module should be built as follows:
+```
+cd ${IGNITE_HOME}/../ignite-extensions
+mvn clean package -pl :ignite-cdc-ext -Pskip-docs -DskipTests
+```
+
 # Local run
 Docker is used to emulate distributed environment. Single container represents 
 a running node.
@@ -22,6 +39,7 @@ For development process requirements are `python` >= 3.7.
 ## Run tests
 - Change a current directory to`${IGNITE_HOME}`
 - Build Apache IGNITE invoking `${IGNITE_HOME}/scripts/build-module.sh 
ducktests`
+- (Optionally) Build the needed extension modules in the `${IGNITE_HOME}/../ignite-extensions` directory
 - Change a current directory to `${IGNITE_HOME}/modules/ducktests/tests`
 - Run tests in docker containers using a following command:
 ```
@@ -50,6 +68,7 @@ Custom cluster, Vagrant, K8s, Mesos, Docker, cloud providers, 
etc.
 ## Run tests
 - Change a current directory to`${IGNITE_HOME}`
 - Build Apache IGNITE invoking `${IGNITE_HOME}/scripts/build-module.sh 
ducktests`
+- (Optionally) Build the needed extension modules in the `${IGNITE_HOME}/../ignite-extensions` directory
 - Run tests using 
[Ducktape](https://ducktape-docs.readthedocs.io/en/latest/run_tests.html). \
   For example:
   ```
diff --git 
a/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/cdc/CdcContinuousUpdatesApplication.java
 
b/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/cdc/CdcContinuousUpdatesApplication.java
new file mode 100644
index 00000000000..cc6c28b835e
--- /dev/null
+++ 
b/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/cdc/CdcContinuousUpdatesApplication.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.ducktest.tests.cdc;
+
+import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.ThreadLocalRandom;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.internal.ducktest.utils.IgniteAwareApplication;
+
+/**
+ * Application continuously writes to cache spreading keys among clusters.
+ */
+public class CdcContinuousUpdatesApplication extends IgniteAwareApplication {
+    /** {@inheritDoc} */
+    @Override protected void run(JsonNode jsonNode) throws Exception {
+        String cacheName = jsonNode.get("cacheName").asText();
+
+        int range = jsonNode.get("range").asInt();
+
+        int clusterIdx = jsonNode.get("clusterIdx").asInt();
+
+        int clusterCnt = Optional.ofNullable(jsonNode.get("clusterCnt"))
+            .map(JsonNode::asInt)
+            .orElse(2);
+
+        long pacing = Optional.ofNullable(jsonNode.get("pacing"))
+            .map(JsonNode::asLong)
+            .orElse(0L);
+
+        assert clusterIdx < clusterCnt;
+
+        log.info("Test props:" +
+            " cacheName=" + cacheName +
+            " range=" + range +
+            " clusterCnt=" + clusterCnt +
+            " clusterIdx=" + clusterIdx);
+
+        IgniteCache<Integer, UUID> cache = ignite.getOrCreateCache(cacheName);
+        log.info("Node name: " + ignite.name() + " starting cache 
operations.");
+
+        markInitialized();
+
+        long putCnt = 0;
+
+        while (!terminated()) {
+            int key = ThreadLocalRandom.current().nextInt(range);
+
+            if (key % clusterCnt != clusterIdx)
+                continue;
+
+            UUID uuid = UUID.randomUUID();
+
+            cache.put(key, uuid);
+
+            putCnt++;
+
+            Thread.sleep(pacing);
+        }
+
+        long removeCnt = 0;
+
+        for (int i = 0; i < range / 4; i++) {
+            int key = ThreadLocalRandom.current().nextInt(range);
+
+            if (key % clusterCnt != clusterIdx)
+                continue;
+
+            cache.remove(key);
+
+            removeCnt++;
+        }
+
+        log.info("Finished cache operations [node=" + ignite.name() + ", 
putCnt=" + putCnt + ", removeCnt=" + removeCnt + "]");
+
+        recordResult("putCnt", putCnt);
+        recordResult("removeCnt", removeCnt);
+
+        markFinished();
+    }
+}
diff --git a/modules/ducktests/tests/docker/Dockerfile 
b/modules/ducktests/tests/docker/Dockerfile
index e6880e37115..72ea79a38f6 100644
--- a/modules/ducktests/tests/docker/Dockerfile
+++ b/modules/ducktests/tests/docker/Dockerfile
@@ -111,4 +111,4 @@ USER ducker
 CMD sudo service ssh start && tail -f /dev/null
 
 # Container port exposure
-EXPOSE 11211 47100 47500 49112 10800 8080 2888 3888 2181 1098 8082
+EXPOSE 11211 47100 47500 49112 10800 8080 2888 3888 2181 1098 8082 8083
diff --git a/modules/ducktests/tests/docker/ducker-ignite 
b/modules/ducktests/tests/docker/ducker-ignite
index 8872f61dd19..10d2b8f7a6d 100755
--- a/modules/ducktests/tests/docker/ducker-ignite
+++ b/modules/ducktests/tests/docker/ducker-ignite
@@ -304,6 +304,13 @@ docker_run() {
         done
     fi
 
+    # Mount the ignite-extensions sources directory if it exists.
+    local mount_ignite_ext=""
+    if [[ -d "${ignite_dir}/../ignite-extensions" ]]; then
+        local ignite_ext_dir="$( cd "${ignite_dir}/../ignite-extensions" && 
pwd )"
+        mount_ignite_ext="--mount 
type=bind,source="${ignite_ext_dir}",target=/opt/ignite-extensions,consistency=delegated"
+    fi
+
     # Invoke docker-run. We need privileged mode to be able to run iptables
     # and mount FUSE filesystems inside the container.  We also need it to
     # run iptables inside the container.
@@ -315,6 +322,7 @@ docker_run() {
         --memory=${docker_run_memory_limit} \
         --memory-swappiness=1 \
         --mount 
type=bind,source="${ignite_dir}",target=/opt/ignite-dev,consistency=delegated \
+        "${mount_ignite_ext}" \
         $DOCKER_OPTIONS \
         --name "${node}" \
         -- "${image_name}"
diff --git 
a/modules/ducktests/tests/ignitetest/services/utils/cdc/cdc_helper.py 
b/modules/ducktests/tests/ignitetest/services/utils/cdc/cdc_helper.py
new file mode 100644
index 00000000000..9965dbde21f
--- /dev/null
+++ b/modules/ducktests/tests/ignitetest/services/utils/cdc/cdc_helper.py
@@ -0,0 +1,233 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License
+
+"""
+This module contains base helper class for CDC configuration with different
+CDC consumers implemented in ignite extensions.
+"""
+
+import time
+from typing import NamedTuple
+
+from ducktape.cluster.remoteaccount import RemoteCommandError
+
+from ignitetest.services.utils.cdc.ignite_cdc import IgniteCdcUtility
+from ignitetest.services.utils.jmx_utils import JmxClient
+
+
+class CdcParams:
+    """
+    CDC parameters.
+    """
+    def __init__(self, caches=None, max_batch_size=None, only_primary=None,
+                 conflict_resolve_field=None, cdc_configuration=None):
+        self.caches = caches
+        self.max_batch_size = max_batch_size
+        self.only_primary = only_primary
+        self.conflict_resolve_field = conflict_resolve_field
+
+        self.cdc_configuration = CdcConfiguration() if cdc_configuration is 
None else cdc_configuration
+
+
+class CdcConfiguration(NamedTuple):
+    """
+    CDC configuration template parameters.
+    """
+    check_frequency: int = None
+    keep_binary: bool = None
+    lock_timeout: int = None
+    metric_exporter_spi: set = None
+
+
+class CdcContext:
+    """
+    CDC context.
+
+    Keeps information about CDC configuration needed to control the CDC 
(start, stop, wait etc.).
+    In particular stores references to ignite-cdc and source cluster services.
+    Different CDC extension helpers may add more fields.
+    """
+    def __init__(self):
+        self.cdc_params = None
+        self.ignite_cdc = None
+        self.source_cluster = None
+
+
+class CdcHelper:
+    """
+    Base CDC helper class for different CDC consumer extensions.
+    """
+    def configure(self, src_cluster, dst_cluster, cdc_params):
+        """
+        Configures the CDC. Updates the src_cluster service in place.
+
+        May be overridden by subclasses.
+
+        :param src_cluster: Ignite service for the source cluster.
+        :param dst_cluster: Ignite service for the destination cluster.
+        :param cdc_params: CDC test parameters.
+
+        :return: CDC context
+        """
+        ctx = CdcContext()
+
+        ctx.cdc_params = cdc_params
+        ctx.source_cluster = src_cluster
+
+        beans = self.get_src_cluster_cdc_ext_beans(src_cluster, dst_cluster, 
cdc_params, ctx)
+
+        src_cluster.config = src_cluster.config._replace(
+            ext_beans=[
+                *src_cluster.config.ext_beans,
+                *beans
+            ]
+        )
+
+        ctx.ignite_cdc = IgniteCdcUtility(src_cluster)
+
+        return ctx
+
+    def get_src_cluster_cdc_ext_beans(self, src_cluster, dst_cluster, 
cdc_params, ctx):
+        """
+        Returns list of CDC beans required to be created in the source Ignite 
cluster.
+        May update the CDC context in place.
+
+        Supposed to be overridden and extended by subclasses.
+
+        :param src_cluster: Ignite service for the source cluster.
+        :param dst_cluster: Ignite service for the destination cluster.
+        :param cdc_params: CDC test parameters.
+        :param ctx: CDC context.
+        :return: List of beans. Each bean is represented as a pair of j2 
template and params instance.
+        """
+        if cdc_params.cdc_configuration.metric_exporter_spi is None:
+            if src_cluster.config.metric_exporters is None:
+                src_cluster.config.metric_exporters = set()
+
+        
src_cluster.config.metric_exporters.add("org.apache.ignite.spi.metric.jmx.JmxMetricExporterSpi")
+
+        cdc_params.cdc_configuration = cdc_params.cdc_configuration._replace(
+            metric_exporter_spi=src_cluster.config.metric_exporters
+        )
+
+        return [("ignite_cdc.j2", cdc_params.cdc_configuration)]
+
+    def start_ignite_cdc(self, ctx):
+        """
+        Starts CDC.
+
+        Supposed to be overridden and extended by subclasses.
+        Default implementation starts process executing the CDC consumer 
(ignite_cdc.sh).
+
+        :param ctx: CDC context.
+        """
+        ctx.ignite_cdc.start()
+
+    def stop_ignite_cdc(self, ctx, timeout_sec):
+        """
+        Stops CDC.
+
+        Supposed to be overridden and extended by subclasses.
+        Default implementation stops process executing the CDC consumer 
(ignite_cdc.sh).
+
+        :param ctx: CDC context.
+        :param timeout_sec: Timeout.
+        :return: Arbitrary CDC consumer specific metrics (if any).
+        """
+        ctx.ignite_cdc.stop()
+
+        try:
+            ctx.source_cluster.await_event("WalRecordsConsumer stopped",
+                                           timeout_sec=timeout_sec, 
from_the_beginning=True,
+                                           log_file="ignite-cdc.log")
+        except TimeoutError:
+            ctx.ignite_cdc.stop(force_stop=True)
+
+        return {}
+
+    def wait_cdc(self, ctx, no_new_events_period_secs, timeout_sec):
+        """
+        Waits until all events are processed by the CDC streamer.
+
+        It's considered that all events are processed if no new events are 
processed
+        for last 'no_new_events_period_secs' seconds.
+
+        :param ctx: CDC context.
+        :param no_new_events_period_secs: Time period to wait for new events.
+        :param timeout_sec: Timeout.
+        """
+        wait_ignite_cdc_service(ctx.ignite_cdc, no_new_events_period_secs, 
timeout_sec)
+
+
+def wait_ignite_cdc_service(ignite_cdc, no_new_events_period_secs, 
timeout_sec):
+    """
+    Waits until all events are processed by the CDC streamer in the given service.
+
+    It's considered that all events are processed if no new events are 
processed
+    for last 'no_new_events_period_secs' seconds.
+
+    :param ignite_cdc: Service running the CDC.
+    :param no_new_events_period_secs: Time period to wait for new events.
+    :param timeout_sec: Timeout.
+    """
+    start = time.time()
+    end = start + timeout_sec
+
+    while True:
+        now = time.time()
+        if now > end:
+            raise TimeoutError(f"Timed out waiting {timeout_sec} seconds for 
ignite_cdc.sh to stream all data.")
+
+        last = last_ignite_cdc_event_time(ignite_cdc)
+
+        if last + no_new_events_period_secs < now:
+            return
+        else:
+            time.sleep(1)
+
+
+def last_ignite_cdc_event_time(ignite_cdc):
+    """
+    Requests timestamp (unix time in seconds) of the last CDC event processed
+    by the CDC streamer in service passed.
+
+    :param ignite_cdc: Service running the CDC.
+    :return: Timestamp of the last CDC event (unix time in seconds).
+    """
+    def last_event_time_on(node):
+        jmx_client = JmxClient(node)
+
+        if isinstance(ignite_cdc, IgniteCdcUtility):
+            main_java_class = ignite_cdc.APP_SERVICE_CLASS
+        else:
+            main_java_class = ignite_cdc.main_java_class
+
+        pids = ignite_cdc.pids(node, main_java_class)
+
+        if len(pids) == 0:
+            raise AssertionError("ignite_cdc java process is not found on 
node: " + node.account.hostname)
+
+        jmx_client.pid = pids[0]
+
+        try:
+            mbean = jmx_client.find_mbean('.*name=cdc.*')
+
+            return int(next(mbean.LastEventTime).strip())
+        except (StopIteration, RemoteCommandError):
+            ignite_cdc.logger.warn("Filed to read LastEventTime metric from 
ignite_cdc, node: " + node.account.hostname)
+
+            return -1
+
+    return max([last_event_time_on(node) for node in ignite_cdc.nodes]) / 1_000
diff --git 
a/modules/ducktests/tests/ignitetest/services/utils/cdc/ignite_to_ignite_cdc_helper.py
 
b/modules/ducktests/tests/ignitetest/services/utils/cdc/ignite_to_ignite_cdc_helper.py
new file mode 100644
index 00000000000..b41fb604e92
--- /dev/null
+++ 
b/modules/ducktests/tests/ignitetest/services/utils/cdc/ignite_to_ignite_cdc_helper.py
@@ -0,0 +1,83 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License
+
+from typing import NamedTuple
+
+from ignitetest.services.ignite_app import IgniteApplicationService
+from ignitetest.services.utils.cdc.cdc_helper import CdcHelper, CdcParams
+from ignitetest.services.utils.ignite_aware import IgniteAwareService
+from ignitetest.services.utils.ignite_configuration import IgniteConfiguration
+from ignitetest.services.utils.metrics.metrics import OPENCENSUS_TEMPLATE_FILE
+
+
+class IgniteToIgniteCdcHelper(CdcHelper):
+    """
+    CDC helper for the IgniteToIgniteCdcStreamer.
+    """
+    def get_src_cluster_cdc_ext_beans(self, src_cluster, dst_cluster, 
cdc_params, ctx):
+        beans: list = super().get_src_cluster_cdc_ext_beans(src_cluster, 
dst_cluster, cdc_params, ctx)
+
+        dst_cluster_client_config = dst_cluster.config._replace(
+            client_mode=True,
+            ssl_params=None,
+            ext_beans=[],
+            data_storage=None,
+        )
+
+        dummy_client = IgniteApplicationService(dst_cluster.context,
+                                                dst_cluster_client_config,
+                                                java_class_name="")
+        dst_cluster_client_config = 
dummy_client.spec.extend_config(dst_cluster_client_config)
+
+        remove_bean_by_template_name(dst_cluster_client_config.ext_beans, 
OPENCENSUS_TEMPLATE_FILE)
+
+        dst_cluster_client_config = 
dst_cluster_client_config._replace(metric_exporters={})
+
+        dummy_client.free()
+
+        params = IgniteToIgniteCdcStreamerTemplateParams(
+            dst_cluster,
+            dst_cluster_client_config,
+            cdc=cdc_params
+        )
+
+        beans.append((
+            "ignite_to_ignite_cdc_streamer.j2",
+            params
+        ))
+
+        return beans
+
+
+def remove_bean_by_template_name(beans, template_name):
+    """
+    Removes the first bean with the given template name from the list, if present.
+
+    :param beans: List of beans. Bean is a tuple (template_name, params).
+    :param template_name: Template file name.
+    """
+    bean = next((b for b in beans if b[0] == template_name), None)
+
+    if bean:
+        beans.remove(bean)
+
+
+class IgniteToIgniteCdcStreamerTemplateParams(NamedTuple):
+    """
+    IgniteToIgniteCdcStreamer template parameters.
+    """
+    dst_cluster: IgniteAwareService
+    dst_cluster_client_config: IgniteConfiguration
+    cdc: CdcParams
diff --git 
a/modules/ducktests/tests/ignitetest/services/utils/cdc/ignite_to_ignite_client_cdc_helper.py
 
b/modules/ducktests/tests/ignitetest/services/utils/cdc/ignite_to_ignite_client_cdc_helper.py
new file mode 100644
index 00000000000..4180512d985
--- /dev/null
+++ 
b/modules/ducktests/tests/ignitetest/services/utils/cdc/ignite_to_ignite_client_cdc_helper.py
@@ -0,0 +1,65 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License
+
+from typing import NamedTuple
+
+from ignitetest.services.ignite_app import IgniteApplicationService
+from ignitetest.services.utils.cdc.cdc_helper import CdcHelper, CdcParams
+from ignitetest.services.utils.ignite_aware import IgniteAwareService
+from ignitetest.services.utils.ignite_configuration import 
IgniteThinClientConfiguration
+
+
+class IgniteToIgniteClientCdcHelper(CdcHelper):
+    """
+    CDC helper for the IgniteToIgniteClientCdcStreamer.
+    """
+    def get_src_cluster_cdc_ext_beans(self, src_cluster, dst_cluster, 
cdc_params, ctx):
+        beans: list = super().get_src_cluster_cdc_ext_beans(src_cluster, 
dst_cluster, cdc_params, ctx)
+
+        addresses = [dst_cluster.nodes[0].account.hostname + ":" +
+                     
str(dst_cluster.config.client_connector_configuration.port)]
+
+        dst_cluster_client_config = IgniteThinClientConfiguration(
+            addresses=addresses,
+            version=dst_cluster.config.version)
+
+        dummy_client = IgniteApplicationService(dst_cluster.context,
+                                                dst_cluster_client_config,
+                                                java_class_name="")
+        dst_cluster_client_config = 
dummy_client.spec.extend_config(dst_cluster_client_config)
+
+        dummy_client.free()
+
+        params = IgniteToIgniteClientCdcStreamerTemplateParams(
+            dst_cluster,
+            dst_cluster_client_config,
+            cdc=cdc_params
+        )
+
+        beans.append((
+            "ignite_to_ignite_client_cdc_streamer.j2",
+            params
+        ))
+
+        return beans
+
+
+class IgniteToIgniteClientCdcStreamerTemplateParams(NamedTuple):
+    """
+    IgniteToIgniteClientCdcStreamer template parameters.
+    """
+    dst_cluster: IgniteAwareService
+    dst_cluster_client_config: IgniteThinClientConfiguration
+    cdc: CdcParams
diff --git 
a/modules/ducktests/tests/ignitetest/services/utils/cdc/ignite_to_kafka_cdc_helper.py
 
b/modules/ducktests/tests/ignitetest/services/utils/cdc/ignite_to_kafka_cdc_helper.py
new file mode 100644
index 00000000000..5aaf1314d8d
--- /dev/null
+++ 
b/modules/ducktests/tests/ignitetest/services/utils/cdc/ignite_to_kafka_cdc_helper.py
@@ -0,0 +1,185 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License
+
+import os
+import time
+
+from ignitetest.services.utils import IgniteServiceType
+from ignitetest.services.utils.cdc.cdc_helper import CdcHelper, CdcParams
+from ignitetest.services.utils.cdc.kafka_to_ignite import 
KafkaToIgniteService, KafkaPropertiesTemplate
+from ignitetest.utils.bean import BeanRef, Bean
+
+DEFAULT_KAFKA_PARTITIONS_COUNT = 16
+DEFAULT_KAFKA_TO_IGNITE_NODES = 1
+DEFAULT_KAFKA_TO_IGNITE_THREAD_COUNT = 8
+DEFAULT_KAFKA_TOPIC = "ignite"
+DEFAULT_KAFKA_METADATA_TOPIC = "ignite-metadata"
+
+
+class IgniteToKafkaCdcHelper(CdcHelper):
+    """
+    CDC helper for IgniteToKafkaCdcStreamer.
+    """
+    def configure(self, src_cluster, dst_cluster, cdc_params):
+        ctx = super().configure(src_cluster, dst_cluster, cdc_params)
+
+        src_cluster.spec = get_ignite_to_kafka_spec(src_cluster.spec.__class__,
+                                                    
cdc_params.kafka.connection_string(),
+                                                    src_cluster)
+        return ctx
+
+    def get_src_cluster_cdc_ext_beans(self, src_cluster, dst_cluster, 
cdc_params, ctx):
+        ctx.kafka_to_ignite = KafkaToIgniteService(
+            dst_cluster.context,
+            cdc_params.kafka,
+            dst_cluster,
+            cdc_params=cdc_params,
+            jvm_opts=dst_cluster.spec.jvm_opts,
+            merge_with_default=True,
+            modules=dst_cluster.modules
+        )
+
+        beans = super().get_src_cluster_cdc_ext_beans(src_cluster, 
dst_cluster, cdc_params, ctx)
+
+        beans.append((
+            "ignite_to_kafka_cdc_streamer.j2",
+            Bean("org.apache.ignite.cdc.kafka.IgniteToKafkaCdcStreamer",
+                 bean_id="cdcConsumer",
+                 kafka_properties=BeanRef("kafkaProperties"),
+                 caches=cdc_params.caches,
+                 max_batch_size=cdc_params.max_batch_size,
+                 only_primary=cdc_params.only_primary,
+                 kafka_partitions=cdc_params.kafka_partitions,
+                 kafka_request_timeout=cdc_params.kafka_request_timeout,
+                 metadata_topic=cdc_params.metadata_topic,
+                 topic=cdc_params.topic)
+        ))
+
+        return beans
+
+    def start_ignite_cdc(self, ctx):
+        ctx.cdc_params.kafka.create_topic(
+            name=ctx.cdc_params.topic,
+            partitions=ctx.cdc_params.kafka_partitions,
+            retention_ms=ctx.cdc_params.kafka_retention_ms)
+
+        ctx.cdc_params.kafka.create_topic(
+            name=ctx.cdc_params.metadata_topic,
+            partitions=ctx.cdc_params.kafka_partitions)
+
+        ctx.kafka_to_ignite.start()
+
+        super().start_ignite_cdc(ctx)
+
+    def stop_ignite_cdc(self, ctx, timeout_sec):
+        super().stop_ignite_cdc(ctx, timeout_sec)
+
+        start = time.time()
+
+        ctx.kafka_to_ignite.await_all_consumed(timeout_sec)
+
+        kafka_to_ignite_lag_sec = time.time() - start
+
+        ctx.kafka_to_ignite.stop()
+
+        metrics = {
+            "kafka_to_ignite_lag_sec": kafka_to_ignite_lag_sec
+        }
+
+        return metrics
+
+
+def get_ignite_to_kafka_spec(base, kafka_connection_string, service):
+    """
+    Dynamically create IgniteSpec subclass to run IgniteToKafkaCdcStreamer in 
scope of service provided.
+
+    :param base: Base IgniteSpec class.
+    :param kafka_connection_string: Kafka connection string.
+    :param service: IgniteService.
+    :return: IgniteSpec instance for IgniteToKafkaCdcStreamer.
+    """
+    class IgniteToKafkaSpec(base):
+        def libs(self):
+            libs = super().libs()
+
+            libs.extend([os.path.join(self.service.config_dir)])
+
+            return libs
+
+        def config_templates(self):
+            templates = super().config_templates()
+
+            templates.extend([
+                ("kafka.properties", 
KafkaPropertiesTemplate("ignite_to_kafka.properties.j2", {
+                    "kafka_connection_string": kafka_connection_string
+                }))])
+
+            return templates
+
+    return IgniteToKafkaSpec(service, service.spec.jvm_opts, 
merge_with_default=True)
+
+
+class KafkaCdcParams(CdcParams):
+    """
+    Parameters for Kafka CDC.
+    """
+    def __init__(self,
+                 kafka=None,
+                 kafka_partitions=None,
+                 kafka_request_timeout=None,
+                 topic=None,
+                 metadata_topic=None,
+                 kafka_to_ignite_client_type=None,
+                 kafka_to_ignite_nodes=None,
+                 kafka_to_ignite_thread_count=None,
+                 kafka_to_ignite_max_batch_size=None,
+                 kafka_to_ignite_metadata_consumer_group=None,
+                 kafka_to_ignite_kafka_consumer_poll_timeout=None,
+                 kafka_to_ignite_kafka_request_timeout=None,
+                 kafka_retention_ms=None,
+                 **kwargs):
+        super().__init__(**kwargs)
+
+        self.kafka = kafka
+
+        self.kafka_partitions = kafka_partitions \
+            if kafka_partitions is not None else DEFAULT_KAFKA_PARTITIONS_COUNT
+
+        self.kafka_request_timeout = kafka_request_timeout
+
+        self.topic = topic \
+            if topic is not None else DEFAULT_KAFKA_TOPIC
+
+        self.metadata_topic = metadata_topic \
+            if metadata_topic is not None else DEFAULT_KAFKA_METADATA_TOPIC
+
+        self.kafka_retention_ms = kafka_retention_ms
+
+        self.kafka_to_ignite_client_type = kafka_to_ignite_client_type \
+            if kafka_to_ignite_client_type is not None else 
IgniteServiceType.NODE
+
+        self.kafka_to_ignite_nodes = kafka_to_ignite_nodes \
+            if kafka_to_ignite_nodes is not None else 
DEFAULT_KAFKA_TO_IGNITE_NODES
+
+        self.kafka_to_ignite_thread_count = kafka_to_ignite_thread_count \
+            if kafka_to_ignite_thread_count is not None else 
DEFAULT_KAFKA_TO_IGNITE_THREAD_COUNT
+
+        self.kafka_to_ignite_max_batch_size = kafka_to_ignite_max_batch_size
+
+        self.kafka_to_ignite_metadata_consumer_group = 
kafka_to_ignite_metadata_consumer_group
+
+        self.kafka_to_ignite_kafka_consumer_poll_timeout = 
kafka_to_ignite_kafka_consumer_poll_timeout
+
+        self.kafka_to_ignite_kafka_request_timeout = 
kafka_to_ignite_kafka_request_timeout
diff --git 
a/modules/ducktests/tests/ignitetest/services/utils/cdc/kafka_to_ignite.py 
b/modules/ducktests/tests/ignitetest/services/utils/cdc/kafka_to_ignite.py
new file mode 100644
index 00000000000..1ffb501ecd1
--- /dev/null
+++ b/modules/ducktests/tests/ignitetest/services/utils/cdc/kafka_to_ignite.py
@@ -0,0 +1,232 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License
+
+"""
+This module contains kafka-to-ignite.sh utility wrapper.
+"""
+import os
+import time
+from copy import deepcopy
+
+from ignitetest.services.ignite import IgniteService
+from ignitetest.services.utils import IgniteServiceType
+from ignitetest.services.utils.config_template import ConfigTemplate
+from ignitetest.services.utils.ignite_configuration import 
IgniteThinClientConfiguration
+from ignitetest.services.utils.ignite_spec import envs_to_exports
+from ignitetest.utils.bean import Bean
+
+
+class KafkaToIgniteService(IgniteService):
+    """
+    Kafka to Ignite utility (kafka-to-ignite.sh) wrapper.
+    """
+    def __init__(self, context, kafka, dst_cluster, cdc_params, jvm_opts=None,
+                 merge_with_default=True, startup_timeout_sec=60, shutdown_timeout_sec=60, modules=None):
+        """
+        Create the kafka-to-ignite.sh service.
+
+        :param context: Test context, forwarded to IgniteService.
+        :param kafka: Kafka service; provides the connection string and is stored for offset checks.
+        :param dst_cluster: Destination cluster the client configuration is derived from.
+        :param cdc_params: CDC parameters (nodes count, partitions count, client type, ...).
+        :param jvm_opts: Forwarded to IgniteService.
+        :param merge_with_default: Forwarded to IgniteService.
+        :param startup_timeout_sec: Forwarded to IgniteService.
+        :param shutdown_timeout_sec: Forwarded to IgniteService.
+        :param modules: Optional list of extra modules; "cdc-ext" is always added.
+        """
+        assert cdc_params.kafka_to_ignite_nodes <= cdc_params.kafka_partitions, \
+            (f"number of nodes ({cdc_params.kafka_to_ignite_nodes}) more then "
+             f"kafka topic partitions ({cdc_params.kafka_partitions})")
+
+        # Build a fresh list rather than appending in place, so the list object
+        # passed in by the caller is never modified.
+        all_modules = [*(modules or []), "cdc-ext"]
+
+        super().__init__(context, self.get_config(dst_cluster, cdc_params.kafka_to_ignite_client_type),
+                         cdc_params.kafka_to_ignite_nodes, jvm_opts, merge_with_default, startup_timeout_sec,
+                         shutdown_timeout_sec, all_modules)
+
+        self.cdc_params = cdc_params
+
+        self.main_java_class = "org.apache.ignite.cdc.kafka.KafkaToIgniteCommandLineStartup"
+
+        # Replace the spec created by the base class with a subclass of the same spec class
+        # that knows how to launch kafka-to-ignite.sh.
+        self.spec = get_kafka_to_ignite_spec(self.spec.__class__,
+                                             kafka.connection_string(), self)
+
+        self.spec.jvm_opts += ["-Dlog4j.configurationFile=file:" + self.log_config_file]
+
+        self.kafka = kafka
+
+    def _prepare_configs(self, node):
+        """
+        Assign a contiguous range of kafka partitions to the node and prepare its configs.
+
+        Partitions are split as evenly as possible: the first
+        (kafka_partitions % num_nodes) nodes get one extra partition each.
+
+        :param node: Service node the configuration is prepared for.
+        """
+        # Convert the service index of the node to a zero-based one.
+        idx = self.idx(node) - 1
+
+        # Integer split. Using round(partitions / nodes) here may over-allocate:
+        # e.g. 8 partitions on 5 nodes gives 2 per node, leaving the last node an
+        # empty range (8..8), and 11 on 7 hands out ranges past the last partition.
+        parts_per_node, remainder = divmod(self.cdc_params.kafka_partitions, self.num_nodes)
+
+        parts_from = idx * parts_per_node + min(idx, remainder)
+        parts_to = parts_from + parts_per_node + (1 if idx < remainder else 0)
+
+        self.config = self.add_kafka_streamer_config(parts_from, parts_to)
+
+        super()._prepare_configs(node)
+
+    def add_kafka_streamer_config(self, parts_from, parts_to):
+        """
+        Build service configuration with the kafka-to-ignite streamer beans attached.
+
+        :param parts_from: Value for the streamer's kafka_parts_from.
+        :param parts_to: Value for the streamer's kafka_parts_to.
+        :return: Result of replacing ext_beans in the current service configuration.
+        """
+        params = self.cdc_params
+
+        streamer_bean = Bean(
+            "org.apache.ignite.cdc.kafka.KafkaToIgniteCdcStreamerConfiguration",
+            caches=params.caches,
+            max_batch_size=params.kafka_to_ignite_max_batch_size,
+            kafka_request_timeout=params.kafka_to_ignite_kafka_request_timeout,
+            kafka_consumer_poll_timeout=params.kafka_to_ignite_kafka_consumer_poll_timeout,
+            metadata_consumer_group=params.kafka_to_ignite_metadata_consumer_group,
+            metadata_topic=params.metadata_topic,
+            topic=params.topic,
+            thread_count=params.kafka_to_ignite_thread_count,
+            kafka_parts_to=parts_to,
+            kafka_parts_from=parts_from)
+
+        kafka_properties = {
+            "id": "kafkaProperties",
+            "location": "kafka.properties"
+        }
+
+        beans = [
+            ("bean.j2", streamer_bean),
+            ("properties_macro.j2", kafka_properties)
+        ]
+
+        return self.config._replace(ext_beans=beans)
+
+    @staticmethod
+    def get_config(dst_cluster, client_type):
+        """
+        Build the ignite client configuration used by kafka-to-ignite.sh.
+
+        :param dst_cluster: Destination cluster.
+        :param client_type: Client type.
+        :return: Destination cluster configuration switched to client mode without data storage
+            for IgniteServiceType.NODE, thin client configuration pointing to the first
+            destination node otherwise.
+        """
+        dst_config = dst_cluster.config
+
+        if client_type == IgniteServiceType.NODE:
+            return dst_config._replace(client_mode=True, data_storage=None)
+
+        first_node_host = dst_cluster.nodes[0].account.hostname
+        connector_port = str(dst_config.client_connector_configuration.port)
+
+        return IgniteThinClientConfiguration(
+            addresses=[first_node_host + ":" + connector_port],
+            version=dst_config.version
+        )
+
+    def await_started(self):
+        """
+        Blocks until the startup marker appears in the streamer log of kafka-to-ignite.sh.
+        """
+        self.logger.info("Waiting for KafkaToIgnite(s) to start ...")
+
+        started_marker = ">>> Kafka partitions"
+        streamer_log = "kafka-ignite-streamer.log"
+
+        self.await_event(started_marker, self.startup_timeout_sec,
+                         from_the_beginning=True, log_file=streamer_log)
+
+    def await_all_consumed(self, timeout_sec):
+        """
+        Polls kafka offsets of the data and metadata topics until no lag is left.
+
+        :param timeout_sec: Timeout.
+        :raises TimeoutError: If the lag is not zero after the timeout.
+        :raises RuntimeError: If some service node is not alive while the lag is not zero.
+        """
+        deadline = time.time() + timeout_sec
+
+        while True:
+            if time.time() > deadline:
+                raise TimeoutError(f"Timed out waiting {timeout_sec} seconds for kafka_to_ignite.sh "
+                                   f"to consume all events from kafka.")
+
+            offsets = self.kafka.offsets([self.cdc_params.topic, self.cdc_params.metadata_topic])
+
+            if all(o.lag == 0 for o in offsets):
+                self.logger.info("All events are consumed from kafka.")
+
+                return
+
+            # Every node is checked on each iteration, there is no short-circuit here.
+            dead_nodes = [n for n in self.nodes if not self.alive(n)]
+
+            if dead_nodes:
+                raise RuntimeError("Some kafka_to_ignite.sh instances are not alive while not "
+                                   "all events are consumed from kafka.")
+
+            for o in offsets:
+                if o.lag > 0:
+                    self.logger.debug(f"offsets: [topic={o.topic}, part={o.part}, "
+                                      f"lag={o.lag}]")
+
+            time.sleep(2)
+
+
+def get_kafka_to_ignite_spec(base, kafka_connection_string, service):
+    """
+    Dynamically create IgniteSpec subclass to run KafkaToIgniteCdcStreamer in scope of service provided.
+
+    The subclass is derived from the given base class and instantiated with the service
+    and the jvm options of the service's current spec.
+
+    :param base: Base IgniteSpec class.
+    :param kafka_connection_string: Kafka connection string.
+    :param service: IgniteService.
+    :return: IgniteSpec instance for kafka-to-ignite application.
+    """
+    class KafkaToIgniteSpec(base):
+        def command(self, _):
+            """
+            :return: Shell command that exports the environment, runs kafka-to-ignite.sh with the
+                config file path and jvm options in background and appends its output to console.log.
+            """
+            # Work on a copy so that the dict returned by envs() is not modified.
+            envs = deepcopy(self.envs())
+
+            envs["IGNITE_HOME"] = self.service.home_dir
+
+            cmd = "%s %s %s %s 2>&1 | tee -a %s &" % \
+                  (envs_to_exports(envs),
+                   self.script("kafka-to-ignite.sh"),
+                   self.config_file_path(),
+                   self._jvm_opts(),
+                   os.path.join(self.service.log_dir, "console.log"))
+
+            return cmd
+
+        def libs(self):
+            """
+            :return: Libs of the base spec plus the service config directory.
+            """
+            libs = super().libs()
+
+            libs.extend([os.path.join(self.service.config_dir)])
+
+            return libs
+
+        def config_templates(self):
+            """
+            :return: Config templates of the base spec plus the "kafka.properties" template
+                parameterized with the kafka connection string.
+            """
+            templates = super().config_templates()
+
+            templates.extend([
+                ("kafka.properties", 
KafkaPropertiesTemplate("kafka_to_ignite.properties.j2", {
+                    "kafka_connection_string": kafka_connection_string
+                }))])
+
+            return templates
+
+        def config_file_path(self):
+            """
+            :return: Service config file for the NODE service type, thin client config file otherwise.
+            """
+            return self.service.config_file if 
self.service.config.service_type == IgniteServiceType.NODE \
+                else self.service.thin_client_config_file
+
+        def script(self, cmd):
+            """
+            :param cmd: Script file name.
+            :return: Path to the script: under the cdc-ext module of the extensions home for
+                the dev version, whatever the service resolves otherwise.
+            """
+            if self.service.config.version.is_dev:
+                return os.path.join(self.service.spec.extensions_home(), 
"modules", "cdc-ext", "bin", cmd)
+            else:
+                return self.service.script(cmd)
+
+    return KafkaToIgniteSpec(service, service.spec.jvm_opts)
+
+
+class KafkaPropertiesTemplate(ConfigTemplate):
+    """
+    Kafka client configuration properties file.
+    """
+    def __init__(self, template_file_name, params):
+        """
+        :param template_file_name: Template file name, passed to ConfigTemplate.
+        :param params: Parameters stored as default_params of the template.
+        """
+        super().__init__(template_file_name)
+
+        self.default_params = params
diff --git 
a/modules/ducktests/tests/ignitetest/services/utils/cdc/templates/ignite_cdc.j2 
b/modules/ducktests/tests/ignitetest/services/utils/cdc/templates/ignite_cdc.j2
new file mode 100644
index 00000000000..a739a533dc8
--- /dev/null
+++ 
b/modules/ducktests/tests/ignitetest/services/utils/cdc/templates/ignite_cdc.j2
@@ -0,0 +1,44 @@
+{#
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+#}
+
+{% import 'misc_macro.j2' as misc_utils %}
+
+{# Renders the CdcConfiguration bean. The metricExporterSpi list is emitted only when not empty; #}
+{# checkFrequency, keepBinary and lockTimeout are emitted only when the parameter is not None. #}
+{% macro apply(config, params) %}
+    <bean class="org.apache.ignite.cdc.CdcConfiguration" autowire="byType">
+        {% if params.metric_exporter_spi | length > 0 %}
+            <property name="metricExporterSpi">
+                <list>
+                {% for exporter in params.metric_exporter_spi %}
+                    {{ misc_utils.bean(exporter) }}
+                {% endfor %}
+                </list>
+            </property>
+        {% endif %}
+
+        {% if params.check_frequency != None %}
+            <property name="checkFrequency" value="{{ params.check_frequency 
}}"/>
+        {% endif %}
+
+        {% if params.keep_binary != None %}
+            <property name="keepBinary" value="{{ params.keep_binary }}"/>
+        {% endif %}
+
+        {% if params.lock_timeout != None %}
+            <property name="lockTimeout" value="{{ params.lock_timeout }}"/>
+        {% endif %}
+    </bean>
+{% endmacro %}
diff --git 
a/modules/ducktests/tests/ignitetest/services/utils/cdc/templates/ignite_to_ignite_cdc_streamer.j2
 
b/modules/ducktests/tests/ignitetest/services/utils/cdc/templates/ignite_to_ignite_cdc_streamer.j2
new file mode 100644
index 00000000000..e5cc560efcb
--- /dev/null
+++ 
b/modules/ducktests/tests/ignitetest/services/utils/cdc/templates/ignite_to_ignite_cdc_streamer.j2
@@ -0,0 +1,44 @@
+{#
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+#}
+
+{% import 'ignite_configuration_macro.j2' as ignite_configuration %}
+{% import 'misc_macro.j2' as misc_utils %}
+
+{# Renders the "cdcConsumer" bean (IgniteToIgniteCdcStreamer): destination ignite configuration, #}
+{# optional maxBatchSize and onlyPrimary, the list of caches. The ext beans of the destination #}
+{# client configuration are rendered after the bean. #}
+{% macro apply(config, params) %}
+    <bean id="cdcConsumer" 
class="org.apache.ignite.cdc.IgniteToIgniteCdcStreamer">
+        <property name="destinationIgniteConfiguration">
+            {{ ignite_configuration.apply(params.dst_cluster_client_config, 
params.dst_cluster) }}
+        </property>
+
+        {% if params.cdc.max_batch_size != None %}
+        <property name="maxBatchSize" value="{{ params.cdc.max_batch_size }}"/>
+        {% endif %}
+
+        {% if params.cdc.only_primary != None %}
+        <property name="onlyPrimary" value="{{ params.cdc.only_primary }}"/>
+        {% endif %}
+
+        <property name="caches">
+            <list>
+                {% for v in params.cdc.caches %}
+                    <value>{{ v }}</value>
+                {% endfor %}
+            </list>
+        </property>
+    </bean>
+    {{ misc_utils.ext_beans(params.dst_cluster_client_config) }}
+{% endmacro %}
diff --git 
a/modules/ducktests/tests/ignitetest/services/utils/cdc/templates/ignite_to_ignite_client_cdc_streamer.j2
 
b/modules/ducktests/tests/ignitetest/services/utils/cdc/templates/ignite_to_ignite_client_cdc_streamer.j2
new file mode 100644
index 00000000000..0ec79852d2c
--- /dev/null
+++ 
b/modules/ducktests/tests/ignitetest/services/utils/cdc/templates/ignite_to_ignite_client_cdc_streamer.j2
@@ -0,0 +1,43 @@
+{#
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+#}
+
+{% import 'client_configuration_macro.j2' as client_configuration %}
+{% import 'misc_macro.j2' as misc_utils %}
+
+{# Renders the "cdcConsumer" bean (IgniteToIgniteClientCdcStreamer): destination thin client #}
+{# configuration, optional maxBatchSize and onlyPrimary, the list of caches. #}
+{% macro apply(config, params) %}
+    <bean id="cdcConsumer" 
class="org.apache.ignite.cdc.thin.IgniteToIgniteClientCdcStreamer">
+        <property name="destinationClientConfiguration">
+            {{ client_configuration.apply(params.dst_cluster_client_config, 
params.dst_cluster) }}
+        </property>
+
+        {% if params.cdc.max_batch_size != None %}
+        <property name="maxBatchSize" value="{{ params.cdc.max_batch_size }}"/>
+        {% endif %}
+
+        {% if params.cdc.only_primary != None %}
+        <property name="onlyPrimary" value="{{ params.cdc.only_primary }}"/>
+        {% endif %}
+
+        <property name="caches">
+            <list>
+                {% for v in params.cdc.caches %}
+                    <value>{{ v }}</value>
+                {% endfor %}
+            </list>
+        </property>
+    </bean>
+{% endmacro %}
diff --git 
a/modules/ducktests/tests/ignitetest/services/utils/cdc/templates/ignite_to_kafka.properties.j2
 
b/modules/ducktests/tests/ignitetest/services/utils/cdc/templates/ignite_to_kafka.properties.j2
new file mode 100644
index 00000000000..e686999d943
--- /dev/null
+++ 
b/modules/ducktests/tests/ignitetest/services/utils/cdc/templates/ignite_to_kafka.properties.j2
@@ -0,0 +1,19 @@
+{#
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+#}
+
+bootstrap.servers={{ kafka_connection_string }}
+request.timeout.ms=10000
diff --git 
a/modules/ducktests/tests/ignitetest/services/utils/cdc/templates/ignite_to_kafka_cdc_streamer.j2
 
b/modules/ducktests/tests/ignitetest/services/utils/cdc/templates/ignite_to_kafka_cdc_streamer.j2
new file mode 100644
index 00000000000..8fe9b095936
--- /dev/null
+++ 
b/modules/ducktests/tests/ignitetest/services/utils/cdc/templates/ignite_to_kafka_cdc_streamer.j2
@@ -0,0 +1,24 @@
+{#
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+#}
+
+{% import 'misc_macro.j2' as misc_utils %}
+
+{# Renders params as a bean via misc_utils.bean and declares the "kafkaProperties" #}
+{# properties loaded from kafka.properties. #}
+{% macro apply(config, params) %}
+    {{ misc_utils.bean(params) }}
+
+    <util:properties id="kafkaProperties" location="kafka.properties"/>
+{% endmacro %}
diff --git 
a/modules/ducktests/tests/ignitetest/services/utils/cdc/templates/kafka_to_ignite.properties.j2
 
b/modules/ducktests/tests/ignitetest/services/utils/cdc/templates/kafka_to_ignite.properties.j2
new file mode 100644
index 00000000000..e4c2749bcd1
--- /dev/null
+++ 
b/modules/ducktests/tests/ignitetest/services/utils/cdc/templates/kafka_to_ignite.properties.j2
@@ -0,0 +1,22 @@
+{#
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+#}
+
+bootstrap.servers={{ kafka_connection_string }}
+request.timeout.ms=10000
+group.id=kafka-to-ignite
+auto.offset.reset=earliest
+enable.auto.commit=false
diff --git 
a/modules/ducktests/tests/ignitetest/services/utils/cdc/templates/properties_macro.j2
 
b/modules/ducktests/tests/ignitetest/services/utils/cdc/templates/properties_macro.j2
new file mode 100644
index 00000000000..d47e5ae2cb5
--- /dev/null
+++ 
b/modules/ducktests/tests/ignitetest/services/utils/cdc/templates/properties_macro.j2
@@ -0,0 +1,20 @@
+{#
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+#}
+
+{# Renders a util:properties element with the id and location taken from the properties argument. #}
+{% macro apply(config, properties) %}
+    <util:properties id="{{ properties.id }}" location="{{ properties.location 
}}" />
+{% endmacro %}
diff --git 
a/modules/ducktests/tests/ignitetest/services/utils/config_template.py 
b/modules/ducktests/tests/ignitetest/services/utils/config_template.py
index 16a6a9a3533..69fb32966fe 100644
--- a/modules/ducktests/tests/ignitetest/services/utils/config_template.py
+++ b/modules/ducktests/tests/ignitetest/services/utils/config_template.py
@@ -22,12 +22,13 @@ from jinja2 import FileSystemLoader, Environment
 
 IGNITE_TEMPLATE_PATH = 
os.path.join(os.path.dirname(os.path.abspath(__file__)), "templates")
 ZK_TEMPLATE_PATH = os.path.join(os.path.dirname(os.path.abspath(__file__)), 
"..", "zk", "templates")
+CDC_TEMPLATE_PATH = os.path.join(os.path.dirname(os.path.abspath(__file__)), 
"cdc", "templates")
 DEFAULT_IGNITE_CONF = "ignite.xml.j2"
 DEFAULT_THIN_CLIENT_CONF = "thin_client_config.xml.j2"
 DEFAULT_THIN_JDBC_CONF = "thin_jdbc_config.xml.j2"
 DEFAULT_LOG4J2_CONF = "log4j2.xml.j2"
 
-TEMPLATE_PATHES = [IGNITE_TEMPLATE_PATH, ZK_TEMPLATE_PATH]
+TEMPLATE_PATHES = [IGNITE_TEMPLATE_PATH, ZK_TEMPLATE_PATH, CDC_TEMPLATE_PATH]
 
 
 class ConfigTemplate:
diff --git a/modules/ducktests/tests/ignitetest/services/utils/ignite_spec.py 
b/modules/ducktests/tests/ignitetest/services/utils/ignite_spec.py
index 87fe8b6732d..a99aa163026 100644
--- a/modules/ducktests/tests/ignitetest/services/utils/ignite_spec.py
+++ b/modules/ducktests/tests/ignitetest/services/utils/ignite_spec.py
@@ -205,6 +205,9 @@ class IgniteSpec(metaclass=ABCMeta):
 
         return [os.path.join(project_dir, module_path) for module_path in 
module_libs]
 
+    def extensions_home(self):
+        """
+        :return: Path to the "ignite-extensions" directory under the service install root.
+        """
+        return os.path.join(self.service.install_root, "ignite-extensions")
+
     def _module_libs(self, module_name):
         """
         Get list of paths to be added to classpath for the passed module for 
current spec.
@@ -212,7 +215,12 @@ class IgniteSpec(metaclass=ABCMeta):
         if module_name == "ducktests":
             return self.__get_module_libs(self.__home(str(DEV_BRANCH)), 
module_name, is_dev=True)
 
-        return self.__get_module_libs(self.__home(), module_name, 
self.service.config.version.is_dev)
+        if module_name.endswith("-ext") and self.service.config.version.is_dev:
+            home = self.extensions_home()
+        else:
+            home = self.__home()
+
+        return self.__get_module_libs(home, module_name, 
self.service.config.version.is_dev)
 
     @abstractmethod
     def command(self, node):
diff --git 
a/modules/ducktests/tests/ignitetest/services/utils/templates/misc_macro.j2 
b/modules/ducktests/tests/ignitetest/services/utils/templates/misc_macro.j2
index ca2c7e2a99b..d6b28e30ff4 100644
--- a/modules/ducktests/tests/ignitetest/services/utils/templates/misc_macro.j2
+++ b/modules/ducktests/tests/ignitetest/services/utils/templates/misc_macro.j2
@@ -79,7 +79,7 @@
                     </property>
                 {% elif value.ref | length > 0 %}
                     <property name="{{ name  | snake_to_camel }}" ref="{{ 
value.ref }}"/>
-                {% else %}
+                {% elif value != None %}
                     <property name="{{ name | snake_to_camel }}">
                         {{ bean(value) }}
                     </property>
diff --git 
a/modules/ducktests/tests/ignitetest/tests/cdc/cdc_replication_abstract_test.py 
b/modules/ducktests/tests/ignitetest/tests/cdc/cdc_replication_abstract_test.py
new file mode 100644
index 00000000000..08e7fdbd610
--- /dev/null
+++ 
b/modules/ducktests/tests/ignitetest/tests/cdc/cdc_replication_abstract_test.py
@@ -0,0 +1,496 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License
+
+import os
+import difflib
+import json
+from copy import copy, deepcopy
+from time import sleep
+
+from ignitetest.services.ignite import IgniteService
+from ignitetest.services.ignite_app import IgniteApplicationService
+from ignitetest.services.kafka.kafka import KafkaSettings, KafkaService
+from ignitetest.services.utils.cdc.cdc_helper import CdcParams
+from ignitetest.services.utils.cdc.ignite_to_kafka_cdc_helper import 
IgniteToKafkaCdcHelper
+from ignitetest.services.utils.control_utility import ControlUtility
+from ignitetest.services.utils.ignite_configuration import 
IgniteConfiguration, DataStorageConfiguration
+from ignitetest.services.utils.ignite_configuration.cache import 
CacheConfiguration
+from ignitetest.services.utils.ignite_configuration.discovery import 
from_ignite_cluster, TcpDiscoveryVmIpFinder, \
+    TcpDiscoverySpi
+from ignitetest.services.utils.ssl.client_connector_configuration import 
ClientConnectorConfiguration
+from ignitetest.services.zk.zookeeper import ZookeeperSettings, 
ZookeeperService
+from ignitetest.utils.bean import Bean
+from ignitetest.utils.ignite_test import IgniteTest
+from ignitetest.utils.version import IgniteVersion
+
+
+TEST_CACHE_NAME = "cdc-test-cache"
+JAVA_CLIENT_CLASS_NAME = 
"org.apache.ignite.internal.ducktest.tests.cdc.CdcContinuousUpdatesApplication"
+
+TEST_DURATION_SEC = 10
+RANGE = 5000
+DEFAULT_SERVER_NODES_COUNT = 2
+DEFAULT_CLIENT_NODES_COUNT = 1
+
+
+class CdcReplicationAbstractTest(IgniteTest):
+    """
+    Base class for CDC replication tests.
+    """
+    def __init__(self, test_context):
+        """
+        :param test_context: Test context, passed to IgniteTest.
+        """
+        super().__init__(test_context)
+
+        # NOTE(review): presumably switches the test clusters to persistent data storage;
+        # the flag is not read in this part of the file - confirm against its usages.
+        self.pds = True
+
+    def run(self, ignite_version, mode, cdc_helper, cdc_params=None):
+        """
+        Execute CDC replication scenario and verify that partitions of both clusters are the same.
+
+        :param ignite_version: Ignite version.
+        :param mode: "active-active" to run the active-active scenario, any other value
+            runs the active-passive one.
+        :param cdc_helper: CDC helper for particular CDC streamer implementation.
+        :param cdc_params: CDC test parameters. Default ones are created if None.
+        :return: CDC metrics if any.
+        """
+        cdc_params = CdcParams() if cdc_params is None else cdc_params
+
+        if cdc_params.caches is None:
+            cdc_params.caches = self.caches()
+
+        src_cluster = self.src_cluster(ignite_version)
+        dst_cluster = self.dst_cluster(ignite_version)
+
+        # Pick the scenario together with its default load.
+        if mode == "active-active":
+            scenario, load = self.run_active_active, self.default_active_active_load
+        else:
+            scenario, load = self.run_active_passive, self.default_active_passive_load
+
+        cdc_streamer_metrics = scenario(src_cluster, dst_cluster, cdc_helper, cdc_params, load)
+
+        self.logger.info(f"Cdc metrics:\n{json.dumps(cdc_streamer_metrics, indent=4)}")
+
+        partitions_match = self.check_partitions_are_same(src_cluster, dst_cluster)
+
+        if not partitions_match:
+            raise AssertionError("Partitions are different in source and destination clusters.")
+
+        src_cluster.stop()
+        dst_cluster.stop()
+
+        return cdc_streamer_metrics
+
+
+    def run_active_passive(self, src_cluster, dst_cluster, cdc_helper, cdc_params, do_load=None):
+        """
+        Active-passive scenario: CDC is set up for the source cluster only, the load is applied,
+        then the test waits for CDC to settle and stops it.
+
+        :param src_cluster: Source cluster.
+        :param dst_cluster: Destination cluster.
+        :param cdc_helper: CDC helper for particular CDC streamer implementation.
+        :param cdc_params: CDC test parameters.
+        :param do_load: Callable running client operations, invoked with both clusters and the
+            copies of their configurations. Default load is used if None.
+        :return: CDC metrics if any.
+        """
+        # Configurations are copied before the CDC setup is invoked.
+        src_cluster_config, dst_cluster_config = deepcopy(src_cluster.config), deepcopy(dst_cluster.config)
+
+        src_ctx = self.setup_active_passive(src_cluster, dst_cluster, cdc_helper, cdc_params)
+
+        load = self.default_active_passive_load if do_load is None else do_load
+
+        load(src_cluster, dst_cluster, src_cluster_config, dst_cluster_config)
+
+        cdc_helper.wait_cdc(src_ctx, no_new_events_period_secs=10, timeout_sec=300)
+
+        return cdc_helper.stop_ignite_cdc(src_ctx, timeout_sec=300)
+
+
+    def run_active_active(self, src_cluster, dst_cluster, cdc_helper, cdc_params, do_load=None):
+        """
+        Active-active scenario: CDC is set up in both directions, the load is applied,
+        then the test waits for CDC to settle on both sides and stops it.
+
+        :param src_cluster: Source cluster.
+        :param dst_cluster: Destination cluster.
+        :param cdc_helper: CDC helper for particular CDC streamer implementation.
+        :param cdc_params: CDC test parameters.
+        :param do_load: Callable running client operations, invoked with both clusters and the
+            copies of their configurations. Default load is used if None.
+        :return: Dict with CDC metrics of both sides under the "src" and "dst" keys.
+        """
+        # Configurations are copied before the CDC setup is invoked.
+        src_cluster_config, dst_cluster_config = deepcopy(src_cluster.config), deepcopy(dst_cluster.config)
+
+        src_ctx, dst_ctx = self.setup_active_active(src_cluster, dst_cluster, cdc_helper, cdc_params)
+
+        load = self.default_active_active_load if do_load is None else do_load
+
+        load(src_cluster, dst_cluster, src_cluster_config, dst_cluster_config)
+
+        for ctx in (src_ctx, dst_ctx):
+            cdc_helper.wait_cdc(ctx, no_new_events_period_secs=10, timeout_sec=300)
+
+        src_metrics = cdc_helper.stop_ignite_cdc(src_ctx, timeout_sec=300)
+        dst_metrics = cdc_helper.stop_ignite_cdc(dst_ctx, timeout_sec=300)
+
+        return {"src": src_metrics, "dst": dst_metrics}
+
+
+    def default_active_active_load(self, src_cluster, dst_cluster,
+                                   src_cluster_config, dst_cluster_config):
+        """
+        Default load for the active-active scenario: one client application per cluster
+        runs for TEST_DURATION_SEC seconds, both must report puts and removes.
+
+        :param src_cluster: Source cluster.
+        :param dst_cluster: Destination cluster.
+        :param src_cluster_config: Source cluster configuration.
+        :param dst_cluster_config: Destination cluster configuration.
+        """
+        targets = ((src_cluster, src_cluster_config), (dst_cluster, dst_cluster_config))
+
+        clients = []
+
+        # The position of the cluster in the list is passed to the application as "clusterIdx".
+        for cluster_idx, (cluster, cluster_config) in enumerate(targets):
+            clients.append(IgniteApplicationService(
+                self.test_context,
+                self.client_config(cluster, cluster_config),
+                java_class_name=JAVA_CLIENT_CLASS_NAME,
+                num_nodes=DEFAULT_CLIENT_NODES_COUNT,
+                params={"cacheName": TEST_CACHE_NAME, "pacing": 1, "clusterCnt": 2,
+                        "clusterIdx": cluster_idx, "range": RANGE},
+                modules=self.modules()))
+
+        for client in clients:
+            client.start()
+
+        sleep(TEST_DURATION_SEC)
+
+        for client in clients:
+            client.stop()
+
+        for client in clients:
+            for counter in ("putCnt", "removeCnt"):
+                assert int(client.extract_result(counter)) > 0
+
+
+    def default_active_passive_load(self, src_cluster, dst_cluster,
+                                    src_cluster_config, dst_cluster_config):
+        """
+        Default client operations against source cluster.
+
+        :param src_cluster: Source cluster.
+        :param dst_cluster: Destination cluster.
+        :param src_cluster_config: Source cluster configuration.
+        :param dst_cluster_config: Destination cluster configuration.
+        """
+        client = IgniteApplicationService(
+            self.test_context,
+            self.client_config(src_cluster, src_cluster_config),
+            java_class_name=JAVA_CLIENT_CLASS_NAME,
+            num_nodes=DEFAULT_CLIENT_NODES_COUNT,
+            params={"cacheName": TEST_CACHE_NAME, "pacing": 5, "clusterCnt": 
1, "clusterIdx": 0, "range": RANGE},
+            modules=self.modules())
+
+        client.start()
+
+        sleep(TEST_DURATION_SEC)
+
+        client.stop()
+
+        assert int(client.extract_result("putCnt")) > 0
+        assert int(client.extract_result("removeCnt")) > 0
+
+    def check_partitions_are_same(self, src_cluster, dst_cluster):
+        """
+        Compare partitions on source and destination clusters.
+
+        :param src_cluster: Source cluster.
+        :param dst_cluster: Destination cluster.
+        :return: True if there is no any divergence between partitions on 
source and destination clusters.
+        """
+        src_cluster_dump = self.dump_partitions(src_cluster)
+
+        dst_cluster_dump = self.dump_partitions(dst_cluster)
+
+        if src_cluster_dump != dst_cluster_dump:
+            def diff(source, destination):
+                return "".join(difflib.unified_diff(
+                    source.splitlines(True),
+                    destination.splitlines(True),
+                    "source",
+                    "destination",
+                    n=1)
+                )
+
+            self.logger.debug("Partitions are different in source and 
destination clusters:\n"
+                              f"{diff(src_cluster_dump, dst_cluster_dump)}")
+            return False
+        else:
+            return True
+
+    def dump_partitions(self, cluster):
+        """
+        Dump partitions info skipping the cluster-specific fields.
+        Saves original dump file in the service log directory.
+
+        :param cluster: Cluster to dump partitions from.
+        """
+        dump_filename = 
ControlUtility(cluster).idle_verify_dump(cluster.nodes[0])
+
+        orig_dump_filename = os.path.join(cluster.log_dir, 
'idle_verify_dump_orig.txt')
+
+        cluster.nodes[0].account.ssh(f"mv {dump_filename} 
{orig_dump_filename}")
+
+        processed_dump_filename = os.path.join(cluster.log_dir, 
'idle_verify_dump.txt')
+
+        cluster.nodes[0].account.ssh(
+            f"cat {orig_dump_filename} | "
+            f"sed -E 's/, partVerHash=[-0-9]+]/]/g' | "
+            f"sed -E 's/ consistentId=[^,]+,//g' > "
+            f"{processed_dump_filename}")
+
+        return cluster.nodes[0].account.ssh_output(f"cat 
{processed_dump_filename}").decode("utf-8")
+
+    def setup_active_passive(self, src_cluster, dst_cluster, cdc_helper, 
cdc_params):
+        """
+        Setup active-passive replication.
+
+        CDC is enabled on the source cluster only; the destination cluster gets the
+        conflict resolver plugin with cluster id "2". Both clusters are started and
+        activated (destination first), then the CDC streamer is started.
+
+        :param src_cluster: Source cluster.
+        :param dst_cluster: Destination cluster.
+        :param cdc_helper: CDC helper for particular CDC streamer implementation.
+        :param cdc_params: CDC test parameters.
+        :return: CDC context returned by cdc_helper.configure().
+        """
+        # Both helpers below replace the cluster config in place, so they must run before start().
+        enable_cdc(src_cluster)
+
+        setup_conflict_resolver(dst_cluster, "2", cdc_params)
+
+        # NOTE(review): presumably prepares the destination discovery settings so that
+        # cdc_helper.configure() can refer to the not yet started cluster - confirm.
+        dst_cluster.config.discovery_spi.prepare_on_start(cluster=dst_cluster)
+
+        ctx = cdc_helper.configure(src_cluster, dst_cluster, cdc_params)
+
+        dst_cluster.start()
+        ControlUtility(dst_cluster).activate()
+        self.on_dst_cluster_start(dst_cluster)
+
+        src_cluster.start()
+        ControlUtility(src_cluster).activate()
+        self.on_src_cluster_start(src_cluster)
+
+        # The streamer is started only after both clusters are up and active.
+        cdc_helper.start_ignite_cdc(ctx)
+
+        return ctx
+
+    def setup_active_active(self, src_cluster, dst_cluster, cdc_helper, 
cdc_params):
+        """
+        Setup active-active replication.
+
+        CDC is enabled and the conflict resolver plugin is installed on both clusters,
+        and a CDC streamer is configured and started for each direction.
+
+        :param src_cluster: Source cluster.
+        :param dst_cluster: Destination cluster.
+        :param cdc_helper: CDC helper for particular CDC streamer implementation.
+        :param cdc_params: CDC test parameters.
+        :return: Tuple of two CDC contexts: source-to-destination and destination-to-source.
+        """
+        enable_cdc(src_cluster)
+        enable_cdc(dst_cluster)
+
+        # Each cluster gets its own conflict resolver cluster id.
+        setup_conflict_resolver(src_cluster, "1", cdc_params)
+        setup_conflict_resolver(dst_cluster, "2", cdc_params)
+
+        # Shallow per-direction copies: the attributes reassigned below do not leak into cdc_params.
+        src_cdc_params = copy(cdc_params)
+        dst_cdc_params = copy(cdc_params)
+
+        # With the Kafka based helper each direction uses its own topic and metadata topic.
+        if isinstance(cdc_helper, IgniteToKafkaCdcHelper):
+            src_cdc_params.topic = src_cdc_params.topic + "-src-to-dst"
+            src_cdc_params.metadata_topic = src_cdc_params.metadata_topic + 
"-src-to-dst"
+
+            dst_cdc_params.topic = dst_cdc_params.topic + "-dst-to-src"
+            dst_cdc_params.metadata_topic = dst_cdc_params.metadata_topic + 
"-dst-to-src"
+
+        src_cluster.config.discovery_spi.prepare_on_start(cluster=src_cluster)
+        dst_cluster.config.discovery_spi.prepare_on_start(cluster=dst_cluster)
+
+        # NOTE(review): configure() apparently replaces the config of the cluster passed first.
+        # The pre-CDC source config is saved here and put back while the reverse direction is
+        # configured, presumably so that the source cluster is seen there as a plain
+        # destination - confirm against cdc_helper.configure().
+        src_config_orig = deepcopy(src_cluster.config)
+
+        src_ctx = cdc_helper.configure(src_cluster, dst_cluster, 
src_cdc_params)
+        src_config_cdc_configured = src_cluster.config
+
+        src_cluster.config = src_config_orig
+        dst_ctx = cdc_helper.configure(dst_cluster, src_cluster, 
dst_cdc_params)
+
+        # Restore the CDC-configured source config before the source cluster is started.
+        src_cluster.config = src_config_cdc_configured
+
+        src_cluster.start()
+        ControlUtility(src_cluster).activate()
+        self.on_src_cluster_start(src_cluster)
+
+        dst_cluster.start()
+        ControlUtility(dst_cluster).activate()
+        self.on_dst_cluster_start(dst_cluster)
+
+        # Streamers are started only after both clusters are up and active.
+        cdc_helper.start_ignite_cdc(src_ctx)
+        cdc_helper.start_ignite_cdc(dst_ctx)
+
+        return src_ctx, dst_ctx
+
+    def on_src_cluster_start(self, src_cluster):
+        """
+        Hook called right after the source cluster is started and activated.
+        Does nothing by default; to be overridden to perform some actions on source cluster start.
+
+        :param src_cluster: Source cluster.
+        """
+        pass
+
+    def on_dst_cluster_start(self, dst_cluster):
+        """
+        Hook called right after the destination cluster is started and activated.
+        Does nothing by default; to be overridden to perform some actions on destination cluster start.
+
+        :param dst_cluster: Destination cluster.
+        """
+        pass
+
+    def caches(self):
+        """
+        Names of the caches used in the test. By default it is the single TEST_CACHE_NAME cache.
+
+        :return: List of caches to be used in the test.
+        """
+        return [TEST_CACHE_NAME]
+
+    def start_kafka(self, kafka_nodes, zk_nodes=1):
+        """
+        Start Kafka and Zookeeper.
+
+        :param kafka_nodes: Kafka nodes count.
+        :param zk_nodes: Zookeeper nodes count.
+        :return: Zookeeper and Kafka services.
+        """
+        zk_settings = ZookeeperSettings()
+        zk = ZookeeperService(self.test_context, zk_nodes, 
settings=zk_settings)
+
+        kafka_settings = 
KafkaSettings(zookeeper_connection_string=zk.connection_string())
+        kafka = KafkaService(self.test_context, kafka_nodes, 
settings=kafka_settings)
+
+        zk.start_async()
+        kafka.start()
+
+        return zk, kafka
+
+    def stop_kafka(self, zk, kafka):
+        """
+        Stop Kafka and Zookeeper. Kafka is stopped first, then Zookeeper.
+
+        :param zk: Zookeeper service.
+        :param kafka: Kafka service.
+        """
+        kafka.stop(force_stop=False, allow_fail=True)
+
+        zk.stop(force_stop=False)
+
+    def src_cluster(self, ignite_version):
+        """
+        :return: Source cluster service.
+        """
+        return IgniteService(self.test_context, 
self.ignite_config(ignite_version, "src"),
+                             DEFAULT_SERVER_NODES_COUNT, 
modules=self.modules())
+
+    def dst_cluster(self, ignite_version):
+        """
+        :return: Destination cluster service.
+        """
+        return IgniteService(self.test_context, 
self.ignite_config(ignite_version, "dst"),
+                             DEFAULT_SERVER_NODES_COUNT, 
modules=self.modules())
+
+    def modules(self):
+        """
+        Modules passed to the Ignite services and client applications created by the test.
+
+        :return: List of modules to be used in the test.
+        """
+        return ["cdc-ext"]
+
+    def client_config(self, cluster, config):
+        """
+        Create client node configuration to run aginst cluster passed.
+
+        :param cluster: Cluster Ignite service.
+        :param config: Cluster Ignite configuration.
+        :return: Client node ignite configuration.
+        """
+        return config._replace(
+            client_mode=True,
+            data_storage=None,
+            ext_beans=[],
+            discovery_spi=from_ignite_cluster(cluster)
+        )
+
+    def ignite_config(self, ignite_version, ignite_instance_name):
+        """
+        Create ignite configuration for server nodes.
+
+        :param ignite_version: Ignite version.
+        :param ignite_instance_name: Ignite instance name.
+        :return: Server node ignite configuration.
+        """
+        config = IgniteConfiguration(
+            discovery_spi=TcpDiscoverySpi(ip_finder=TcpDiscoveryVmIpFinder()),
+            ignite_instance_name=ignite_instance_name,
+            version=IgniteVersion(ignite_version),
+            data_storage=DataStorageConfiguration(),
+            caches=[CacheConfiguration(name=TEST_CACHE_NAME)],
+            client_connector_configuration=ClientConnectorConfiguration()
+        )
+
+        if self.pds:
+            config = config._replace(
+                data_storage=config.data_storage._replace(
+                    default=config.data_storage.default._replace(
+                        persistence_enabled=True
+                    )
+                )
+            )
+
+        return config
+
+
+def enable_cdc(cluster):
+    """
+    Enable CDC on cluster. Changes cluster config in place.
+
+    :param cluster: Ignite cluster.
+    """
+    cluster.config = cluster.config._replace(
+        data_storage=cluster.config.data_storage._replace(
+            default=cluster.config.data_storage.default._replace(
+                cdc_enabled=True
+            )
+        )
+    )
+
+
+def setup_conflict_resolver(cluster, cluster_id, cdc_params: CdcParams):
+    """
+    Setup conflict resolver plugin. Changes cluster config in place.
+
+    :param cluster: Ignite cluster.
+    :param cluster_id: Cluster ID.
+    :param cdc_params: CDC test parameters.
+    """
+    cluster.config = cluster.config._replace(
+        plugins=[*cluster.config.plugins,
+                 ('bean.j2',
+                  
Bean("org.apache.ignite.cdc.conflictresolve.CacheVersionConflictResolverPluginProvider",
+                       cluster_id=cluster_id,
+                       
conflict_resolve_field=cdc_params.conflict_resolve_field,
+                       caches=cdc_params.caches))],
+    )
diff --git 
a/modules/ducktests/tests/ignitetest/tests/cdc/cdc_replication_test.py 
b/modules/ducktests/tests/ignitetest/tests/cdc/cdc_replication_test.py
new file mode 100644
index 00000000000..43d48524b75
--- /dev/null
+++ b/modules/ducktests/tests/ignitetest/tests/cdc/cdc_replication_test.py
@@ -0,0 +1,90 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License
+
+from ducktape.mark import defaults
+
+from ignitetest.services.utils import IgniteServiceType
+from ignitetest.services.utils.cdc.ignite_to_ignite_cdc_helper import 
IgniteToIgniteCdcHelper
+from ignitetest.services.utils.cdc.ignite_to_kafka_cdc_helper import 
KafkaCdcParams, \
+    IgniteToKafkaCdcHelper
+from ignitetest.services.utils.cdc.ignite_to_ignite_client_cdc_helper import 
IgniteToIgniteClientCdcHelper
+from ignitetest.tests.cdc.cdc_replication_abstract_test import 
CdcReplicationAbstractTest
+from ignitetest.utils import cluster, ignite_versions
+from ignitetest.utils.version import DEV_BRANCH
+
+WAL_FORCE_ARCHIVE_TIMEOUT_MS = 100
+
+
+class CdcReplicationTest(CdcReplicationAbstractTest):
+    """
+    CDC replication tests.
+    """
+    @cluster(num_nodes=6)
+    @ignite_versions(str(DEV_BRANCH))
+    @defaults(pds=[True, False], mode=["active-active", "active-passive"])
+    def ignite_to_ignite_test(self, ignite_version, pds, mode):
+        self.pds = pds
+
+        return self.run(ignite_version, mode, IgniteToIgniteCdcHelper())
+
+    @cluster(num_nodes=6)
+    @ignite_versions(str(DEV_BRANCH))
+    @defaults(pds=[True, False], mode=["active-active", "active-passive"])
+    def ignite_to_ignite_client_test(self, ignite_version, pds, mode):
+        self.pds = pds
+
+        return self.run(ignite_version, mode, IgniteToIgniteClientCdcHelper())
+
+    @cluster(num_nodes=10)
+    @ignite_versions(str(DEV_BRANCH))
+    @defaults(pds=[True, False], mode=["active-active", "active-passive"])
+    def ignite_to_kafka_to_ignite_test(self, ignite_version, pds, mode):
+        self.pds = pds
+
+        zk, kafka = self.start_kafka(kafka_nodes=1)
+
+        res = self.run(ignite_version, mode, IgniteToKafkaCdcHelper(), 
KafkaCdcParams(kafka=kafka))
+
+        self.stop_kafka(zk, kafka)
+
+        return res
+
+    @cluster(num_nodes=10)
+    @ignite_versions(str(DEV_BRANCH))
+    @defaults(pds=[True, False], mode=["active-active", "active-passive"])
+    def ignite_to_kafka_to_ignite_client_test(self, ignite_version, pds, mode):
+        self.pds = pds
+
+        zk, kafka = self.start_kafka(kafka_nodes=1)
+
+        res = self.run(ignite_version, mode, IgniteToKafkaCdcHelper(), 
KafkaCdcParams(
+            kafka=kafka,
+            kafka_to_ignite_client_type=IgniteServiceType.THIN_CLIENT
+        ))
+
+        self.stop_kafka(zk, kafka)
+
+        return res
+
+    def ignite_config(self, ignite_version, ignite_instance_name):
+        cfg = super().ignite_config(ignite_version, ignite_instance_name)
+
+        cfg = cfg._replace(
+            data_storage=cfg.data_storage._replace(
+                wal_force_archive_timeout=WAL_FORCE_ARCHIVE_TIMEOUT_MS
+            )
+        )
+
+        return cfg

Reply via email to