This is an automated email from the ASF dual-hosted git repository.
timoninmaxim pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 3084e45e6d6 IGNITE-26371 [ducktests] Add CDC extensions tests (#12356)
3084e45e6d6 is described below
commit 3084e45e6d6a6c8db6a3e3eb39167a389903adb0
Author: Sergey Korotkov <[email protected]>
AuthorDate: Mon Oct 13 19:30:28 2025 +0700
IGNITE-26371 [ducktests] Add CDC extensions tests (#12356)
---
modules/ducktests/README.md | 19 +
.../tests/cdc/CdcContinuousUpdatesApplication.java | 98 ++++
modules/ducktests/tests/docker/Dockerfile | 2 +-
modules/ducktests/tests/docker/ducker-ignite | 8 +
.../ignitetest/services/utils/cdc/cdc_helper.py | 233 ++++++++++
.../utils/cdc/ignite_to_ignite_cdc_helper.py | 83 ++++
.../cdc/ignite_to_ignite_client_cdc_helper.py | 65 +++
.../utils/cdc/ignite_to_kafka_cdc_helper.py | 185 ++++++++
.../services/utils/cdc/kafka_to_ignite.py | 232 ++++++++++
.../services/utils/cdc/templates/ignite_cdc.j2 | 44 ++
.../cdc/templates/ignite_to_ignite_cdc_streamer.j2 | 44 ++
.../ignite_to_ignite_client_cdc_streamer.j2 | 43 ++
.../cdc/templates/ignite_to_kafka.properties.j2 | 19 +
.../cdc/templates/ignite_to_kafka_cdc_streamer.j2 | 24 +
.../cdc/templates/kafka_to_ignite.properties.j2 | 22 +
.../utils/cdc/templates/properties_macro.j2 | 20 +
.../ignitetest/services/utils/config_template.py | 3 +-
.../tests/ignitetest/services/utils/ignite_spec.py | 10 +-
.../services/utils/templates/misc_macro.j2 | 2 +-
.../tests/cdc/cdc_replication_abstract_test.py | 496 +++++++++++++++++++++
.../ignitetest/tests/cdc/cdc_replication_test.py | 90 ++++
21 files changed, 1738 insertions(+), 4 deletions(-)
diff --git a/modules/ducktests/README.md b/modules/ducktests/README.md
index 96c7708e69b..2bea2c532af 100644
--- a/modules/ducktests/README.md
+++ b/modules/ducktests/README.md
@@ -11,6 +11,23 @@ Structure of the `tests` directory is:
- `./ignitetest/tests` contains tests.
- `./checks` contains unit tests of utils, tests' decorators etc.
+Some tests (like the CDC replication ones) require modules maintained in the
+separate [ignite-extensions](https://github.com/apache/ignite-extensions)
repository.
+
+To run these tests, the `ignite-extensions` working directory should be checked out
+next to the `ignite` one, at the same filesystem level. The expected directory
+structure is:
+- `./ignite` working directory checked out from the ignite repository
(mentioned as `${IGNITE_HOME}` below)
+- `./ignite-extensions` working directory checked out from the
ignite-extensions repository
+
+The needed extension module should be built before tests run.
+
+For example, for the CDC replication tests the `cdc-ext` module should be built as follows:
+```
+cd ${IGNITE_HOME}/../ignite-extensions
+mvn clean package -pl :ignite-cdc-ext -Pskip-docs -DskipTests
+```
+
# Local run
Docker is used to emulate distributed environment. Single container represents
a running node.
@@ -22,6 +39,7 @@ For development process requirements are `python` >= 3.7.
## Run tests
- Change a current directory to`${IGNITE_HOME}`
- Build Apache IGNITE invoking `${IGNITE_HOME}/scripts/build-module.sh
ducktests`
+- (Optionally) Build the needed extension modules in the `${IGNITE_HOME}/../ignite-extensions` directory
- Change a current directory to `${IGNITE_HOME}/modules/ducktests/tests`
- Run tests in docker containers using a following command:
```
@@ -50,6 +68,7 @@ Custom cluster, Vagrant, K8s, Mesos, Docker, cloud providers,
etc.
## Run tests
- Change a current directory to`${IGNITE_HOME}`
- Build Apache IGNITE invoking `${IGNITE_HOME}/scripts/build-module.sh
ducktests`
+- (Optionally) Build the needed extension modules in the `${IGNITE_HOME}/../ignite-extensions` directory
- Run tests using
[Ducktape](https://ducktape-docs.readthedocs.io/en/latest/run_tests.html). \
For example:
```
diff --git
a/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/cdc/CdcContinuousUpdatesApplication.java
b/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/cdc/CdcContinuousUpdatesApplication.java
new file mode 100644
index 00000000000..cc6c28b835e
--- /dev/null
+++
b/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/cdc/CdcContinuousUpdatesApplication.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.ducktest.tests.cdc;
+
+import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.ThreadLocalRandom;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.internal.ducktest.utils.IgniteAwareApplication;
+
+/**
+ * Application that keeps putting random values into a cache until termination is requested and then
+ * removes some of the keys. Only the keys whose remainder modulo the clusters count equals the cluster
+ * index are touched.
+ */
+public class CdcContinuousUpdatesApplication extends IgniteAwareApplication {
+    /** {@inheritDoc} */
+    @Override protected void run(JsonNode jsonNode) throws Exception {
+        String cacheName = jsonNode.get("cacheName").asText();
+        int range = jsonNode.get("range").asInt();
+        int clusterIdx = jsonNode.get("clusterIdx").asInt();
+
+        // Optional parameters: two clusters and no pause between puts unless configured otherwise.
+        int clusterCnt = Optional.ofNullable(jsonNode.get("clusterCnt")).map(JsonNode::asInt).orElse(2);
+        long pacing = Optional.ofNullable(jsonNode.get("pacing")).map(JsonNode::asLong).orElse(0L);
+
+        assert clusterIdx < clusterCnt;
+
+        log.info("Test props: cacheName=" + cacheName + " range=" + range +
+            " clusterCnt=" + clusterCnt + " clusterIdx=" + clusterIdx);
+
+        IgniteCache<Integer, UUID> cache = ignite.getOrCreateCache(cacheName);
+
+        log.info("Node name: " + ignite.name() + " starting cache operations.");
+
+        markInitialized();
+
+        long puts = 0;
+
+        // Update phase: runs until the application is asked to terminate.
+        while (!terminated()) {
+            int key = ThreadLocalRandom.current().nextInt(range);
+
+            // Keys belonging to other clusters are skipped without pausing.
+            if (key % clusterCnt == clusterIdx) {
+                cache.put(key, UUID.randomUUID());
+
+                puts++;
+
+                Thread.sleep(pacing);
+            }
+        }
+
+        long removes = 0;
+
+        // Remove phase: a fixed number of attempts, each one picks a random key.
+        int removeAttempts = range / 4;
+
+        for (int attempt = 0; attempt < removeAttempts; attempt++) {
+            int key = ThreadLocalRandom.current().nextInt(range);
+
+            if (key % clusterCnt == clusterIdx) {
+                cache.remove(key);
+
+                removes++;
+            }
+        }
+
+        log.info("Finished cache operations [node=" + ignite.name() + ", putCnt=" + puts + ", removeCnt=" + removes + "]");
+
+        recordResult("putCnt", puts);
+        recordResult("removeCnt", removes);
+
+        markFinished();
+    }
+}
diff --git a/modules/ducktests/tests/docker/Dockerfile
b/modules/ducktests/tests/docker/Dockerfile
index e6880e37115..72ea79a38f6 100644
--- a/modules/ducktests/tests/docker/Dockerfile
+++ b/modules/ducktests/tests/docker/Dockerfile
@@ -111,4 +111,4 @@ USER ducker
CMD sudo service ssh start && tail -f /dev/null
# Container port exposure
-EXPOSE 11211 47100 47500 49112 10800 8080 2888 3888 2181 1098 8082
+EXPOSE 11211 47100 47500 49112 10800 8080 2888 3888 2181 1098 8082 8083
diff --git a/modules/ducktests/tests/docker/ducker-ignite
b/modules/ducktests/tests/docker/ducker-ignite
index 8872f61dd19..10d2b8f7a6d 100755
--- a/modules/ducktests/tests/docker/ducker-ignite
+++ b/modules/ducktests/tests/docker/ducker-ignite
@@ -304,6 +304,13 @@ docker_run() {
done
fi
+ # Mount the ignite-extensions sources directory if it exists.
+ local mount_ignite_ext=""
+ if [[ -d "${ignite_dir}/../ignite-extensions" ]]; then
+ local ignite_ext_dir="$( cd "${ignite_dir}/../ignite-extensions" &&
pwd )"
+ mount_ignite_ext="--mount
type=bind,source="${ignite_ext_dir}",target=/opt/ignite-extensions,consistency=delegated"
+ fi
+
# Invoke docker-run. We need privileged mode to be able to run iptables
# and mount FUSE filesystems inside the container. We also need it to
# run iptables inside the container.
@@ -315,6 +322,7 @@ docker_run() {
--memory=${docker_run_memory_limit} \
--memory-swappiness=1 \
--mount
type=bind,source="${ignite_dir}",target=/opt/ignite-dev,consistency=delegated \
+ ${mount_ignite_ext} \
$DOCKER_OPTIONS \
--name "${node}" \
-- "${image_name}"
diff --git
a/modules/ducktests/tests/ignitetest/services/utils/cdc/cdc_helper.py
b/modules/ducktests/tests/ignitetest/services/utils/cdc/cdc_helper.py
new file mode 100644
index 00000000000..9965dbde21f
--- /dev/null
+++ b/modules/ducktests/tests/ignitetest/services/utils/cdc/cdc_helper.py
@@ -0,0 +1,233 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License
+
+"""
+This module contains base helper class for CDC configuration with different
+CDC consumers implemented in ignite extensions.
+"""
+
+import time
+from typing import NamedTuple
+
+from ducktape.cluster.remoteaccount import RemoteCommandError
+
+from ignitetest.services.utils.cdc.ignite_cdc import IgniteCdcUtility
+from ignitetest.services.utils.jmx_utils import JmxClient
+
+
+class CdcParams:
+ """
+ CDC parameters.
+ """
+ def __init__(self, caches=None, max_batch_size=None, only_primary=None,
+ conflict_resolve_field=None, cdc_configuration=None):
+ self.caches = caches
+ self.max_batch_size = max_batch_size
+ self.only_primary = only_primary
+ self.conflict_resolve_field = conflict_resolve_field
+
+ self.cdc_configuration = CdcConfiguration() if cdc_configuration is
None else cdc_configuration
+
+
+class CdcConfiguration(NamedTuple):
+ """
+ CDC configuration template parameters.
+
+ Every field defaults to None. An instance is handed over as the parameters
+ of the "ignite_cdc.j2" template by CdcHelper.get_src_cluster_cdc_ext_beans.
+ """
+ check_frequency: int = None
+ keep_binary: bool = None
+ lock_timeout: int = None
+ # When left as None, CdcHelper.get_src_cluster_cdc_ext_beans replaces it with
+ # the metric exporters of the source cluster configuration.
+ metric_exporter_spi: set = None
+
+
+class CdcContext:
+ """
+ CDC context.
+
+ Keeps information about CDC configuration needed to control the CDC
(start, stop, wait etc.).
+ In particular stores references to ignite-cdc and source cluster services.
+ Different CDC extension helpers may add more fields.
+ """
+ def __init__(self):
+ self.cdc_params = None
+ self.ignite_cdc = None
+ self.source_cluster = None
+
+
+class CdcHelper:
+ """
+ Base CDC helper class for different CDC consumer extensions.
+ """
+ def configure(self, src_cluster, dst_cluster, cdc_params):
+ """
+ Configures the CDC. Updates the src_cluster service in place.
+
+ May be overridden by subclasses.
+
+ :param src_cluster: Ignite service for the source cluster.
+ :param dst_cluster: Ignite service for the destination cluster.
+ :param cdc_params: CDC test parameters.
+
+ :return: CDC context
+ """
+ ctx = CdcContext()
+
+ ctx.cdc_params = cdc_params
+ ctx.source_cluster = src_cluster
+
+ beans = self.get_src_cluster_cdc_ext_beans(src_cluster, dst_cluster,
cdc_params, ctx)
+
+ src_cluster.config = src_cluster.config._replace(
+ ext_beans=[
+ *src_cluster.config.ext_beans,
+ *beans
+ ]
+ )
+
+ ctx.ignite_cdc = IgniteCdcUtility(src_cluster)
+
+ return ctx
+
+ def get_src_cluster_cdc_ext_beans(self, src_cluster, dst_cluster,
cdc_params, ctx):
+ """
+ Returns list of CDC beans required to be created in the source Ignite
cluster.
+ May update the CDC contex in place.
+
+ Supposed to be overridden and extended by subclasses.
+
+ :param src_cluster: Ignite service for the source cluster.
+ :param dst_cluster: Ignite service for the destination cluster.
+ :param cdc_params: CDC test parameters.
+ :param ctx: CDC context.
+ :return: List of beans. Each bean is represented as a pair of j2
template and params instance.
+ """
+ if cdc_params.cdc_configuration.metric_exporter_spi is None:
+ if src_cluster.config.metric_exporters is None:
+ src_cluster.config.metric_exporters = set()
+
+
src_cluster.config.metric_exporters.add("org.apache.ignite.spi.metric.jmx.JmxMetricExporterSpi")
+
+ cdc_params.cdc_configuration = cdc_params.cdc_configuration._replace(
+ metric_exporter_spi=src_cluster.config.metric_exporters
+ )
+
+ return [("ignite_cdc.j2", cdc_params.cdc_configuration)]
+
+ def start_ignite_cdc(self, ctx):
+ """
+ Starts CDC.
+
+ Supposed to be overridden and extended by subclasses.
+ Default implementation starts process executing the CDC consumer
(ignite_cdc.sh).
+
+ :param ctx: CDC context.
+ """
+ ctx.ignite_cdc.start()
+
+ def stop_ignite_cdc(self, ctx, timeout_sec):
+ """
+ Stops CDC.
+
+ Supposed to be overridden and extended by subclasses.
+ Default implementation stops process executing the CDC consumer
(ignite_cdc.sh).
+
+ :param ctx: CDC context.
+ :param timeout_sec: Timeout.
+ :return: Arbitrary CDC consumer specific metrics (if any).
+ """
+ ctx.ignite_cdc.stop()
+
+ try:
+ ctx.source_cluster.await_event("WalRecordsConsumer stopped",
+ timeout_sec=timeout_sec,
from_the_beginning=True,
+ log_file="ignite-cdc.log")
+ except TimeoutError:
+ ctx.ignite_cdc.stop(force_stop=True)
+
+ return {}
+
+ def wait_cdc(self, ctx, no_new_events_period_secs, timeout_sec):
+ """
+ Waits all events are processed by the CDC streamer.
+
+ It's considered that all events are processed if no new events are
processed
+ for last 'no_new_events_period_secs' seconds.
+
+ :param ctx: CDC context.
+ :param no_new_events_period_secs: Time period to wait for new events.
+ :param timeout_sec: Timeout.
+ """
+ wait_ignite_cdc_service(ctx.ignite_cdc, no_new_events_period_secs,
timeout_sec)
+
+
+def wait_ignite_cdc_service(ignite_cdc, no_new_events_period_secs,
timeout_sec):
+ """
+ Waits all events are processed by the CDC streamer in service passed.
+
+ It's considered that all events are processed if no new events are
processed
+ for last 'no_new_events_period_secs' seconds.
+
+ :param ignite_cdc: Service running the CDC.
+ :param no_new_events_period_secs: Time period to wait for new events.
+ :param timeout_sec: Timeout.
+ """
+ start = time.time()
+ end = start + timeout_sec
+
+ while True:
+ now = time.time()
+ if now > end:
+ raise TimeoutError(f"Timed out waiting {timeout_sec} seconds for
ignite_cdc.sh to stream all data.")
+
+ last = last_ignite_cdc_event_time(ignite_cdc)
+
+ if last + no_new_events_period_secs < now:
+ return
+ else:
+ time.sleep(1)
+
+
+def last_ignite_cdc_event_time(ignite_cdc):
+ """
+ Requests timestamp (unix time in seconds) of the last CDC event processed
+ by the CDC streamer in service passed.
+
+ :param ignite_cdc: Service running the CDC.
+ :return: Timestamp of the last CDC event (unix time in seconds).
+ """
+ def last_event_time_on(node):
+ jmx_client = JmxClient(node)
+
+ if isinstance(ignite_cdc, IgniteCdcUtility):
+ main_java_class = ignite_cdc.APP_SERVICE_CLASS
+ else:
+ main_java_class = ignite_cdc.main_java_class
+
+ pids = ignite_cdc.pids(node, main_java_class)
+
+ if len(pids) == 0:
+ raise AssertionError("ignite_cdc java process is not found on
node: " + node.account.hostname)
+
+ jmx_client.pid = pids[0]
+
+ try:
+ mbean = jmx_client.find_mbean('.*name=cdc.*')
+
+ return int(next(mbean.LastEventTime).strip())
+ except (StopIteration, RemoteCommandError):
+ ignite_cdc.logger.warn("Filed to read LastEventTime metric from
ignite_cdc, node: " + node.account.hostname)
+
+ return -1
+
+ return max([last_event_time_on(node) for node in ignite_cdc.nodes]) / 1_000
diff --git
a/modules/ducktests/tests/ignitetest/services/utils/cdc/ignite_to_ignite_cdc_helper.py
b/modules/ducktests/tests/ignitetest/services/utils/cdc/ignite_to_ignite_cdc_helper.py
new file mode 100644
index 00000000000..b41fb604e92
--- /dev/null
+++
b/modules/ducktests/tests/ignitetest/services/utils/cdc/ignite_to_ignite_cdc_helper.py
@@ -0,0 +1,83 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License
+
+from typing import NamedTuple
+
+from ignitetest.services.ignite_app import IgniteApplicationService
+from ignitetest.services.utils.cdc.cdc_helper import CdcHelper, CdcParams
+from ignitetest.services.utils.ignite_aware import IgniteAwareService
+from ignitetest.services.utils.ignite_configuration import IgniteConfiguration
+from ignitetest.services.utils.metrics.metrics import OPENCENSUS_TEMPLATE_FILE
+
+
+class IgniteToIgniteCdcHelper(CdcHelper):
+ """
+ CDC helper for the IgniteToIgniteCdcStreamer.
+ """
+ def get_src_cluster_cdc_ext_beans(self, src_cluster, dst_cluster,
cdc_params, ctx):
+ beans: list = super().get_src_cluster_cdc_ext_beans(src_cluster,
dst_cluster, cdc_params, ctx)
+
+ dst_cluster_client_config = dst_cluster.config._replace(
+ client_mode=True,
+ ssl_params=None,
+ ext_beans=[],
+ data_storage=None,
+ )
+
+ dummy_client = IgniteApplicationService(dst_cluster.context,
+ dst_cluster_client_config,
+ java_class_name="")
+ dst_cluster_client_config =
dummy_client.spec.extend_config(dst_cluster_client_config)
+
+ remove_bean_by_template_name(dst_cluster_client_config.ext_beans,
OPENCENSUS_TEMPLATE_FILE)
+
+ dst_cluster_client_config =
dst_cluster_client_config._replace(metric_exporters={})
+
+ dummy_client.free()
+
+ params = IgniteToIgniteCdcStreamerTemplateParams(
+ dst_cluster,
+ dst_cluster_client_config,
+ cdc=cdc_params
+ )
+
+ beans.append((
+ "ignite_to_ignite_cdc_streamer.j2",
+ params
+ ))
+
+ return beans
+
+
+def remove_bean_by_template_name(beans, template_name):
+ """
+ Removes bean from the list.
+
+ :param beans: List of beans. Bean is a tuple (template_name, params).
+ :param template_name: Template file name.
+ """
+ bean = next((b for b in beans if b[0] == template_name), None)
+
+ if bean:
+ beans.remove(bean)
+
+
+class IgniteToIgniteCdcStreamerTemplateParams(NamedTuple):
+ """
+ IgniteToIgniteCdcStreamer template parameters.
+
+ An instance is paired with the "ignite_to_ignite_cdc_streamer.j2" template
+ by IgniteToIgniteCdcHelper.get_src_cluster_cdc_ext_beans.
+ """
+ # Service of the destination cluster.
+ dst_cluster: IgniteAwareService
+ # Client node configuration derived from the destination cluster configuration.
+ dst_cluster_client_config: IgniteConfiguration
+ # CDC test parameters.
+ cdc: CdcParams
diff --git
a/modules/ducktests/tests/ignitetest/services/utils/cdc/ignite_to_ignite_client_cdc_helper.py
b/modules/ducktests/tests/ignitetest/services/utils/cdc/ignite_to_ignite_client_cdc_helper.py
new file mode 100644
index 00000000000..4180512d985
--- /dev/null
+++
b/modules/ducktests/tests/ignitetest/services/utils/cdc/ignite_to_ignite_client_cdc_helper.py
@@ -0,0 +1,65 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License
+
+from typing import NamedTuple
+
+from ignitetest.services.ignite_app import IgniteApplicationService
+from ignitetest.services.utils.cdc.cdc_helper import CdcHelper, CdcParams
+from ignitetest.services.utils.ignite_aware import IgniteAwareService
+from ignitetest.services.utils.ignite_configuration import
IgniteThinClientConfiguration
+
+
+class IgniteToIgniteClientCdcHelper(CdcHelper):
+ """
+ CDC helper for the IgniteToIgniteClientCdcStreamer.
+ """
+ def get_src_cluster_cdc_ext_beans(self, src_cluster, dst_cluster,
cdc_params, ctx):
+ beans: list = super().get_src_cluster_cdc_ext_beans(src_cluster,
dst_cluster, cdc_params, ctx)
+
+ addresses = [dst_cluster.nodes[0].account.hostname + ":" +
+
str(dst_cluster.config.client_connector_configuration.port)]
+
+ dst_cluster_client_config = IgniteThinClientConfiguration(
+ addresses=addresses,
+ version=dst_cluster.config.version)
+
+ dummy_client = IgniteApplicationService(dst_cluster.context,
+ dst_cluster_client_config,
+ java_class_name="")
+ dst_cluster_client_config =
dummy_client.spec.extend_config(dst_cluster_client_config)
+
+ dummy_client.free()
+
+ params = IgniteToIgniteClientCdcStreamerTemplateParams(
+ dst_cluster,
+ dst_cluster_client_config,
+ cdc=cdc_params
+ )
+
+ beans.append((
+ "ignite_to_ignite_client_cdc_streamer.j2",
+ params
+ ))
+
+ return beans
+
+
+class IgniteToIgniteClientCdcStreamerTemplateParams(NamedTuple):
+ """
+ IgniteToIgniteClientCdcStreamer template parameters.
+
+ An instance is paired with the "ignite_to_ignite_client_cdc_streamer.j2" template
+ by IgniteToIgniteClientCdcHelper.get_src_cluster_cdc_ext_beans.
+ """
+ # Service of the destination cluster.
+ dst_cluster: IgniteAwareService
+ # Thin client configuration used to connect to the destination cluster.
+ dst_cluster_client_config: IgniteThinClientConfiguration
+ # CDC test parameters.
+ cdc: CdcParams
diff --git
a/modules/ducktests/tests/ignitetest/services/utils/cdc/ignite_to_kafka_cdc_helper.py
b/modules/ducktests/tests/ignitetest/services/utils/cdc/ignite_to_kafka_cdc_helper.py
new file mode 100644
index 00000000000..5aaf1314d8d
--- /dev/null
+++
b/modules/ducktests/tests/ignitetest/services/utils/cdc/ignite_to_kafka_cdc_helper.py
@@ -0,0 +1,185 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License
+
+import os
+import time
+
+from ignitetest.services.utils import IgniteServiceType
+from ignitetest.services.utils.cdc.cdc_helper import CdcHelper, CdcParams
+from ignitetest.services.utils.cdc.kafka_to_ignite import
KafkaToIgniteService, KafkaPropertiesTemplate
+from ignitetest.utils.bean import BeanRef, Bean
+
+DEFAULT_KAFKA_PARTITIONS_COUNT = 16
+DEFAULT_KAFKA_TO_IGNITE_NODES = 1
+DEFAULT_KAFKA_TO_IGNITE_THREAD_COUNT = 8
+DEFAULT_KAFKA_TOPIC = "ignite"
+DEFAULT_KAFKA_METADATA_TOPIC = "ignite-metadata"
+
+
+class IgniteToKafkaCdcHelper(CdcHelper):
+ """
+ CDC helper for IgniteToKafkaCdcStreamer.
+ """
+ def configure(self, src_cluster, dst_cluster, cdc_params):
+ ctx = super().configure(src_cluster, dst_cluster, cdc_params)
+
+ src_cluster.spec = get_ignite_to_kafka_spec(src_cluster.spec.__class__,
+
cdc_params.kafka.connection_string(),
+ src_cluster)
+ return ctx
+
+ def get_src_cluster_cdc_ext_beans(self, src_cluster, dst_cluster,
cdc_params, ctx):
+ ctx.kafka_to_ignite = KafkaToIgniteService(
+ dst_cluster.context,
+ cdc_params.kafka,
+ dst_cluster,
+ cdc_params=cdc_params,
+ jvm_opts=dst_cluster.spec.jvm_opts,
+ merge_with_default=True,
+ modules=dst_cluster.modules
+ )
+
+ beans = super().get_src_cluster_cdc_ext_beans(src_cluster,
dst_cluster, cdc_params, ctx)
+
+ beans.append((
+ "ignite_to_kafka_cdc_streamer.j2",
+ Bean("org.apache.ignite.cdc.kafka.IgniteToKafkaCdcStreamer",
+ bean_id="cdcConsumer",
+ kafka_properties=BeanRef("kafkaProperties"),
+ caches=cdc_params.caches,
+ max_batch_size=cdc_params.max_batch_size,
+ only_primary=cdc_params.only_primary,
+ kafka_partitions=cdc_params.kafka_partitions,
+ kafka_request_timeout=cdc_params.kafka_request_timeout,
+ metadata_topic=cdc_params.metadata_topic,
+ topic=cdc_params.topic)
+ ))
+
+ return beans
+
+ def start_ignite_cdc(self, ctx):
+ ctx.cdc_params.kafka.create_topic(
+ name=ctx.cdc_params.topic,
+ partitions=ctx.cdc_params.kafka_partitions,
+ retention_ms=ctx.cdc_params.kafka_retention_ms)
+
+ ctx.cdc_params.kafka.create_topic(
+ name=ctx.cdc_params.metadata_topic,
+ partitions=ctx.cdc_params.kafka_partitions)
+
+ ctx.kafka_to_ignite.start()
+
+ super().start_ignite_cdc(ctx)
+
+ def stop_ignite_cdc(self, ctx, timeout_sec):
+ super().stop_ignite_cdc(ctx, timeout_sec)
+
+ start = time.time()
+
+ ctx.kafka_to_ignite.await_all_consumed(timeout_sec)
+
+ kafka_to_ignite_lag_sec = time.time() - start
+
+ ctx.kafka_to_ignite.stop()
+
+ metrics = {
+ "kafka_to_ignite_lag_sec": kafka_to_ignite_lag_sec
+ }
+
+ return metrics
+
+
+def get_ignite_to_kafka_spec(base, kafka_connection_string, service):
+ """
+ Dynamically create IgniteSpec subclass to run IgniteToKafkaCdcStreamer in
scope of service provided.
+
+ :param base: Base IgniteSpec class.
+ :param kafka_connection_string: Kafka connection string.
+ :param service: IgniteService.
+ :return: IgniteSpec instance for IgniteToKafkaCdcStreamer.
+ """
+ class IgniteToKafkaSpec(base):
+ def libs(self):
+ libs = super().libs()
+
+ libs.extend([os.path.join(self.service.config_dir)])
+
+ return libs
+
+ def config_templates(self):
+ templates = super().config_templates()
+
+ templates.extend([
+ ("kafka.properties",
KafkaPropertiesTemplate("ignite_to_kafka.properties.j2", {
+ "kafka_connection_string": kafka_connection_string
+ }))])
+
+ return templates
+
+ return IgniteToKafkaSpec(service, service.spec.jvm_opts,
merge_with_default=True)
+
+
+class KafkaCdcParams(CdcParams):
+ """
+ Parameters for Kafka CDC.
+ """
+ def __init__(self,
+ kafka=None,
+ kafka_partitions=None,
+ kafka_request_timeout=None,
+ topic=None,
+ metadata_topic=None,
+ kafka_to_ignite_client_type=None,
+ kafka_to_ignite_nodes=None,
+ kafka_to_ignite_thread_count=None,
+ kafka_to_ignite_max_batch_size=None,
+ kafka_to_ignite_metadata_consumer_group=None,
+ kafka_to_ignite_kafka_consumer_poll_timeout=None,
+ kafka_to_ignite_kafka_request_timeout=None,
+ kafka_retention_ms=None,
+ **kwargs):
+ super().__init__(**kwargs)
+
+ self.kafka = kafka
+
+ self.kafka_partitions = kafka_partitions \
+ if kafka_partitions is not None else DEFAULT_KAFKA_PARTITIONS_COUNT
+
+ self.kafka_request_timeout = kafka_request_timeout
+
+ self.topic = topic \
+ if topic is not None else DEFAULT_KAFKA_TOPIC
+
+ self.metadata_topic = metadata_topic \
+ if metadata_topic is not None else DEFAULT_KAFKA_METADATA_TOPIC
+
+ self.kafka_retention_ms = kafka_retention_ms
+
+ self.kafka_to_ignite_client_type = kafka_to_ignite_client_type \
+ if kafka_to_ignite_client_type is not None else
IgniteServiceType.NODE
+
+ self.kafka_to_ignite_nodes = kafka_to_ignite_nodes \
+ if kafka_to_ignite_nodes is not None else
DEFAULT_KAFKA_TO_IGNITE_NODES
+
+ self.kafka_to_ignite_thread_count = kafka_to_ignite_thread_count \
+ if kafka_to_ignite_thread_count is not None else
DEFAULT_KAFKA_TO_IGNITE_THREAD_COUNT
+
+ self.kafka_to_ignite_max_batch_size = kafka_to_ignite_max_batch_size
+
+ self.kafka_to_ignite_metadata_consumer_group =
kafka_to_ignite_metadata_consumer_group
+
+ self.kafka_to_ignite_kafka_consumer_poll_timeout =
kafka_to_ignite_kafka_consumer_poll_timeout
+
+ self.kafka_to_ignite_kafka_request_timeout =
kafka_to_ignite_kafka_request_timeout
diff --git
a/modules/ducktests/tests/ignitetest/services/utils/cdc/kafka_to_ignite.py
b/modules/ducktests/tests/ignitetest/services/utils/cdc/kafka_to_ignite.py
new file mode 100644
index 00000000000..1ffb501ecd1
--- /dev/null
+++ b/modules/ducktests/tests/ignitetest/services/utils/cdc/kafka_to_ignite.py
@@ -0,0 +1,232 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License
+
+"""
+This module contains kafka-to-ignite.sh utility wrapper.
+"""
+import os
+import time
+from copy import deepcopy
+
+from ignitetest.services.ignite import IgniteService
+from ignitetest.services.utils import IgniteServiceType
+from ignitetest.services.utils.config_template import ConfigTemplate
+from ignitetest.services.utils.ignite_configuration import
IgniteThinClientConfiguration
+from ignitetest.services.utils.ignite_spec import envs_to_exports
+from ignitetest.utils.bean import Bean
+
+
+class KafkaToIgniteService(IgniteService):
+ """
+ Kafka to Ignite utility (kafka-to-ignite.sh) wrapper.
+ """
+ def __init__(self, context, kafka, dst_cluster, cdc_params, jvm_opts=None,
+ merge_with_default=True, startup_timeout_sec=60,
shutdown_timeout_sec=60, modules=None):
+ def add_cdc_ext_module(_modules):
+ if _modules:
+ _modules.append("cdc-ext")
+ else:
+ _modules = ["cdc-ext"]
+
+ return _modules
+
+ assert cdc_params.kafka_to_ignite_nodes <=
cdc_params.kafka_partitions, \
+ (f"number of nodes ({cdc_params.kafka_to_ignite_nodes}) more then "
+ f"kafka topic partitions ({cdc_params.kafka_partitions})")
+
+ super().__init__(context, self.get_config(dst_cluster,
cdc_params.kafka_to_ignite_client_type),
+ cdc_params.kafka_to_ignite_nodes, jvm_opts,
merge_with_default, startup_timeout_sec,
+ shutdown_timeout_sec, add_cdc_ext_module(modules))
+
+ self.cdc_params = cdc_params
+
+ self.main_java_class =
"org.apache.ignite.cdc.kafka.KafkaToIgniteCommandLineStartup"
+
+ self.spec = get_kafka_to_ignite_spec(self.spec.__class__,
+ kafka.connection_string(), self)
+
+ self.spec.jvm_opts += ["-Dlog4j.configurationFile=file:" +
self.log_config_file]
+
+ self.kafka = kafka
+
+ def _prepare_configs(self, node):
+ parts_per_node = round(self.cdc_params.kafka_partitions /
self.num_nodes)
+
+ idx = self.idx(node) - 1
+
+ parts_from = idx * parts_per_node
+
+ if idx == self.num_nodes - 1:
+ parts_to = self.cdc_params.kafka_partitions
+ else:
+ parts_to = (idx + 1) * parts_per_node
+
+ self.config = self.add_kafka_streamer_config(parts_from, parts_to)
+
+ super()._prepare_configs(node)
+
+ def add_kafka_streamer_config(self, parts_from, parts_to):
+ ext_beans = [("bean.j2", Bean(
+
"org.apache.ignite.cdc.kafka.KafkaToIgniteCdcStreamerConfiguration",
+ caches=self.cdc_params.caches,
+ max_batch_size=self.cdc_params.kafka_to_ignite_max_batch_size,
+
kafka_request_timeout=self.cdc_params.kafka_to_ignite_kafka_request_timeout,
+
kafka_consumer_poll_timeout=self.cdc_params.kafka_to_ignite_kafka_consumer_poll_timeout,
+
metadata_consumer_group=self.cdc_params.kafka_to_ignite_metadata_consumer_group,
+ metadata_topic=self.cdc_params.metadata_topic,
+ topic=self.cdc_params.topic,
+ thread_count=self.cdc_params.kafka_to_ignite_thread_count,
+ kafka_parts_to=parts_to,
+ kafka_parts_from=parts_from)),
+ ("properties_macro.j2", {
+ "id": "kafkaProperties",
+ "location": "kafka.properties"
+ })
+ ]
+
+ return self.config._replace(ext_beans=ext_beans)
+
+ @staticmethod
+ def get_config(dst_cluster, client_type):
+ """
+ Returns kafka-to-ignite.sh ignite client configuration.
+
+ :param dst_cluster: Destination cluster.
+ :param client_type: Client type.
+ :return: Ignite client configuration.
+ """
+ if client_type == IgniteServiceType.NODE:
+ return dst_cluster.config._replace(
+ client_mode=True,
+ data_storage=None
+ )
+ else:
+ addresses = [dst_cluster.nodes[0].account.hostname + ":" +
+
str(dst_cluster.config.client_connector_configuration.port)]
+
+ return IgniteThinClientConfiguration(
+ addresses=addresses,
+ version=dst_cluster.config.version
+ )
+
+ def await_started(self):
+ """
+ Awaits kafka-to-ignite.sh started.
+ """
+ self.logger.info("Waiting for KafkaToIgnite(s) to start ...")
+
+ self.await_event(">>> Kafka partitions", self.startup_timeout_sec,
from_the_beginning=True,
+ log_file="kafka-ignite-streamer.log")
+
+ def await_all_consumed(self, timeout_sec):
+ """
+ Awaits all events are consumed from Kafka.
+
+ :param timeout_sec: Timeout.
+ """
+ start = time.time()
+ end = start + timeout_sec
+
+ while True:
+ now = time.time()
+ if now > end:
+ raise TimeoutError(f"Timed out waiting {timeout_sec} seconds
for kafka_to_ignite.sh "
+ f"to consume all events from kafka.")
+
+ offsets = self.kafka.offsets([
+ self.cdc_params.topic,
+ self.cdc_params.metadata_topic
+ ])
+
+ all_consumed = all(map(lambda o: o.lag == 0, offsets))
+
+ if all_consumed:
+ self.logger.info("All events are consumed from kafka.")
+
+ break
+ elif any([not self.alive(n) for n in self.nodes]):
+ raise RuntimeError("Some kafka_to_ignite.sh instances are not
alive while not "
+ "all events are consumed from kafka.")
+ else:
+ for offset in offsets:
+ if offset.lag > 0:
+ self.logger.debug(f"offsets: [topic={offset.topic},
part={offset.part}, "
+ f"lag={offset.lag}]")
+
+ time.sleep(2)
+
+
+def get_kafka_to_ignite_spec(base, kafka_connection_string, service):
+ """
+ Dynamically create IgniteSpec subclass to run KafkaToIgniteCdcStreamer in
scope of service provided.
+
+ :param base: Base IgniteSpec class.
+ :param kafka_connection_string: Kafka connection string.
+ :param service: IgniteService.
+ :return: IgniteSpec instance for kafka-to-ignite application.
+ """
+ class KafkaToIgniteSpec(base):
+ def command(self, _):
+ envs = deepcopy(self.envs())
+
+ envs["IGNITE_HOME"] = self.service.home_dir
+
+ cmd = "%s %s %s %s 2>&1 | tee -a %s &" % \
+ (envs_to_exports(envs),
+ self.script("kafka-to-ignite.sh"),
+ self.config_file_path(),
+ self._jvm_opts(),
+ os.path.join(self.service.log_dir, "console.log"))
+
+ return cmd
+
+ def libs(self):
+ libs = super().libs()
+
+ libs.extend([os.path.join(self.service.config_dir)])
+
+ return libs
+
+ def config_templates(self):
+ templates = super().config_templates()
+
+ templates.extend([
+ ("kafka.properties",
KafkaPropertiesTemplate("kafka_to_ignite.properties.j2", {
+ "kafka_connection_string": kafka_connection_string
+ }))])
+
+ return templates
+
+ def config_file_path(self):
+ return self.service.config_file if
self.service.config.service_type == IgniteServiceType.NODE \
+ else self.service.thin_client_config_file
+
+ def script(self, cmd):
+ if self.service.config.version.is_dev:
+ return os.path.join(self.service.spec.extensions_home(),
"modules", "cdc-ext", "bin", cmd)
+ else:
+ return self.service.script(cmd)
+
+ return KafkaToIgniteSpec(service, service.spec.jvm_opts)
+
+
+class KafkaPropertiesTemplate(ConfigTemplate):
+ """
+ Kafka client configuration properties file.
+
+ :param template_file_name: Template file name, passed to ConfigTemplate.
+ :param params: Stored as default_params of the template.
+ """
+ def __init__(self, template_file_name, params):
+ super().__init__(template_file_name)
+
+ self.default_params = params
diff --git
a/modules/ducktests/tests/ignitetest/services/utils/cdc/templates/ignite_cdc.j2
b/modules/ducktests/tests/ignitetest/services/utils/cdc/templates/ignite_cdc.j2
new file mode 100644
index 00000000000..a739a533dc8
--- /dev/null
+++
b/modules/ducktests/tests/ignitetest/services/utils/cdc/templates/ignite_cdc.j2
@@ -0,0 +1,44 @@
+{#
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+#}
+
+{% import 'misc_macro.j2' as misc_utils %}
+
+{#
+ apply(config, params): renders the CdcConfiguration bean.
+ The metricExporterSpi list is rendered only when params.metric_exporter_spi is not empty;
+ checkFrequency, keepBinary and lockTimeout are rendered only when the matching params
+ value is not None. The config argument is not used by this macro.
+#}
+{% macro apply(config, params) %}
+ <bean class="org.apache.ignite.cdc.CdcConfiguration" autowire="byType">
+ {% if params.metric_exporter_spi | length > 0 %}
+ <property name="metricExporterSpi">
+ <list>
+ {% for exporter in params.metric_exporter_spi %}
+ {{ misc_utils.bean(exporter) }}
+ {% endfor %}
+ </list>
+ </property>
+ {% endif %}
+
+ {% if params.check_frequency != None %}
+ <property name="checkFrequency" value="{{ params.check_frequency
}}"/>
+ {% endif %}
+
+ {% if params.keep_binary != None %}
+ <property name="keepBinary" value="{{ params.keep_binary }}"/>
+ {% endif %}
+
+ {% if params.lock_timeout != None %}
+ <property name="lockTimeout" value="{{ params.lock_timeout }}"/>
+ {% endif %}
+ </bean>
+{% endmacro %}
diff --git
a/modules/ducktests/tests/ignitetest/services/utils/cdc/templates/ignite_to_ignite_cdc_streamer.j2
b/modules/ducktests/tests/ignitetest/services/utils/cdc/templates/ignite_to_ignite_cdc_streamer.j2
new file mode 100644
index 00000000000..e5cc560efcb
--- /dev/null
+++
b/modules/ducktests/tests/ignitetest/services/utils/cdc/templates/ignite_to_ignite_cdc_streamer.j2
@@ -0,0 +1,44 @@
+{#
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+#}
+
+{% import 'ignite_configuration_macro.j2' as ignite_configuration %}
+{% import 'misc_macro.j2' as misc_utils %}
+
+{#
+ apply(config, params): renders the "cdcConsumer" bean of the IgniteToIgniteCdcStreamer class.
+ The destination Ignite configuration is rendered from params.dst_cluster_client_config and
+ params.dst_cluster; maxBatchSize and onlyPrimary are rendered only when the matching
+ params.cdc value is not None; caches are taken from params.cdc.caches.
+ Ext beans of params.dst_cluster_client_config are rendered after the consumer bean.
+ The config argument is not used by this macro.
+#}
+{% macro apply(config, params) %}
+ <bean id="cdcConsumer"
class="org.apache.ignite.cdc.IgniteToIgniteCdcStreamer">
+ <property name="destinationIgniteConfiguration">
+ {{ ignite_configuration.apply(params.dst_cluster_client_config,
params.dst_cluster) }}
+ </property>
+
+ {% if params.cdc.max_batch_size != None %}
+ <property name="maxBatchSize" value="{{ params.cdc.max_batch_size }}"/>
+ {% endif %}
+
+ {% if params.cdc.only_primary != None %}
+ <property name="onlyPrimary" value="{{ params.cdc.only_primary }}"/>
+ {% endif %}
+
+ <property name="caches">
+ <list>
+ {% for v in params.cdc.caches %}
+ <value>{{ v }}</value>
+ {% endfor %}
+ </list>
+ </property>
+ </bean>
+ {{ misc_utils.ext_beans(params.dst_cluster_client_config) }}
+{% endmacro %}
diff --git
a/modules/ducktests/tests/ignitetest/services/utils/cdc/templates/ignite_to_ignite_client_cdc_streamer.j2
b/modules/ducktests/tests/ignitetest/services/utils/cdc/templates/ignite_to_ignite_client_cdc_streamer.j2
new file mode 100644
index 00000000000..0ec79852d2c
--- /dev/null
+++
b/modules/ducktests/tests/ignitetest/services/utils/cdc/templates/ignite_to_ignite_client_cdc_streamer.j2
@@ -0,0 +1,43 @@
+{#
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+#}
+
+{% import 'client_configuration_macro.j2' as client_configuration %}
+{% import 'misc_macro.j2' as misc_utils %}
+
+{#
+ apply(config, params): renders the "cdcConsumer" bean of the IgniteToIgniteClientCdcStreamer class.
+ The destination client configuration is rendered from params.dst_cluster_client_config and
+ params.dst_cluster; maxBatchSize and onlyPrimary are rendered only when the matching
+ params.cdc value is not None; caches are taken from params.cdc.caches.
+ The config argument is not used by this macro.
+#}
+{% macro apply(config, params) %}
+ <bean id="cdcConsumer"
class="org.apache.ignite.cdc.thin.IgniteToIgniteClientCdcStreamer">
+ <property name="destinationClientConfiguration">
+ {{ client_configuration.apply(params.dst_cluster_client_config,
params.dst_cluster) }}
+ </property>
+
+ {% if params.cdc.max_batch_size != None %}
+ <property name="maxBatchSize" value="{{ params.cdc.max_batch_size }}"/>
+ {% endif %}
+
+ {% if params.cdc.only_primary != None %}
+ <property name="onlyPrimary" value="{{ params.cdc.only_primary }}"/>
+ {% endif %}
+
+ <property name="caches">
+ <list>
+ {% for v in params.cdc.caches %}
+ <value>{{ v }}</value>
+ {% endfor %}
+ </list>
+ </property>
+ </bean>
+{% endmacro %}
diff --git
a/modules/ducktests/tests/ignitetest/services/utils/cdc/templates/ignite_to_kafka.properties.j2
b/modules/ducktests/tests/ignitetest/services/utils/cdc/templates/ignite_to_kafka.properties.j2
new file mode 100644
index 00000000000..e686999d943
--- /dev/null
+++
b/modules/ducktests/tests/ignitetest/services/utils/cdc/templates/ignite_to_kafka.properties.j2
@@ -0,0 +1,19 @@
+{#
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+#}
+
+bootstrap.servers={{ kafka_connection_string }}
+request.timeout.ms=10000
diff --git
a/modules/ducktests/tests/ignitetest/services/utils/cdc/templates/ignite_to_kafka_cdc_streamer.j2
b/modules/ducktests/tests/ignitetest/services/utils/cdc/templates/ignite_to_kafka_cdc_streamer.j2
new file mode 100644
index 00000000000..8fe9b095936
--- /dev/null
+++
b/modules/ducktests/tests/ignitetest/services/utils/cdc/templates/ignite_to_kafka_cdc_streamer.j2
@@ -0,0 +1,24 @@
+{#
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+#}
+
+{% import 'misc_macro.j2' as misc_utils %}
+
+{#
+ apply(config, params): renders params as a bean via misc_utils.bean, followed by the
+ "kafkaProperties" util:properties element located at kafka.properties.
+ The config argument is not used by this macro.
+#}
+{% macro apply(config, params) %}
+ {{ misc_utils.bean(params) }}
+
+ <util:properties id="kafkaProperties" location="kafka.properties"/>
+{% endmacro %}
diff --git
a/modules/ducktests/tests/ignitetest/services/utils/cdc/templates/kafka_to_ignite.properties.j2
b/modules/ducktests/tests/ignitetest/services/utils/cdc/templates/kafka_to_ignite.properties.j2
new file mode 100644
index 00000000000..e4c2749bcd1
--- /dev/null
+++
b/modules/ducktests/tests/ignitetest/services/utils/cdc/templates/kafka_to_ignite.properties.j2
@@ -0,0 +1,22 @@
+{#
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+#}
+
+bootstrap.servers={{ kafka_connection_string }}
+request.timeout.ms=10000
+group.id=kafka-to-ignite
+auto.offset.reset=earliest
+enable.auto.commit=false
diff --git
a/modules/ducktests/tests/ignitetest/services/utils/cdc/templates/properties_macro.j2
b/modules/ducktests/tests/ignitetest/services/utils/cdc/templates/properties_macro.j2
new file mode 100644
index 00000000000..d47e5ae2cb5
--- /dev/null
+++
b/modules/ducktests/tests/ignitetest/services/utils/cdc/templates/properties_macro.j2
@@ -0,0 +1,20 @@
+{#
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+#}
+
+{#
+ apply(config, properties): renders a util:properties element with the id and location
+ taken from properties.id and properties.location.
+ The config argument is not used by this macro.
+#}
+{% macro apply(config, properties) %}
+ <util:properties id="{{ properties.id }}" location="{{ properties.location
}}" />
+{% endmacro %}
diff --git
a/modules/ducktests/tests/ignitetest/services/utils/config_template.py
b/modules/ducktests/tests/ignitetest/services/utils/config_template.py
index 16a6a9a3533..69fb32966fe 100644
--- a/modules/ducktests/tests/ignitetest/services/utils/config_template.py
+++ b/modules/ducktests/tests/ignitetest/services/utils/config_template.py
@@ -22,12 +22,13 @@ from jinja2 import FileSystemLoader, Environment
IGNITE_TEMPLATE_PATH =
os.path.join(os.path.dirname(os.path.abspath(__file__)), "templates")
ZK_TEMPLATE_PATH = os.path.join(os.path.dirname(os.path.abspath(__file__)),
"..", "zk", "templates")
+CDC_TEMPLATE_PATH = os.path.join(os.path.dirname(os.path.abspath(__file__)),
"cdc", "templates")
DEFAULT_IGNITE_CONF = "ignite.xml.j2"
DEFAULT_THIN_CLIENT_CONF = "thin_client_config.xml.j2"
DEFAULT_THIN_JDBC_CONF = "thin_jdbc_config.xml.j2"
DEFAULT_LOG4J2_CONF = "log4j2.xml.j2"
-TEMPLATE_PATHES = [IGNITE_TEMPLATE_PATH, ZK_TEMPLATE_PATH]
+TEMPLATE_PATHES = [IGNITE_TEMPLATE_PATH, ZK_TEMPLATE_PATH, CDC_TEMPLATE_PATH]
class ConfigTemplate:
diff --git a/modules/ducktests/tests/ignitetest/services/utils/ignite_spec.py
b/modules/ducktests/tests/ignitetest/services/utils/ignite_spec.py
index 87fe8b6732d..a99aa163026 100644
--- a/modules/ducktests/tests/ignitetest/services/utils/ignite_spec.py
+++ b/modules/ducktests/tests/ignitetest/services/utils/ignite_spec.py
@@ -205,6 +205,9 @@ class IgniteSpec(metaclass=ABCMeta):
return [os.path.join(project_dir, module_path) for module_path in
module_libs]
+ def extensions_home(self):
+ """
+ Get path to the "ignite-extensions" directory located in the service install root.
+ """
+ return os.path.join(self.service.install_root, "ignite-extensions")
+
def _module_libs(self, module_name):
"""
Get list of paths to be added to classpath for the passed module for
current spec.
@@ -212,7 +215,12 @@ class IgniteSpec(metaclass=ABCMeta):
if module_name == "ducktests":
return self.__get_module_libs(self.__home(str(DEV_BRANCH)),
module_name, is_dev=True)
- return self.__get_module_libs(self.__home(), module_name,
self.service.config.version.is_dev)
+ if module_name.endswith("-ext") and self.service.config.version.is_dev:
+ home = self.extensions_home()
+ else:
+ home = self.__home()
+
+ return self.__get_module_libs(home, module_name,
self.service.config.version.is_dev)
@abstractmethod
def command(self, node):
diff --git
a/modules/ducktests/tests/ignitetest/services/utils/templates/misc_macro.j2
b/modules/ducktests/tests/ignitetest/services/utils/templates/misc_macro.j2
index ca2c7e2a99b..d6b28e30ff4 100644
--- a/modules/ducktests/tests/ignitetest/services/utils/templates/misc_macro.j2
+++ b/modules/ducktests/tests/ignitetest/services/utils/templates/misc_macro.j2
@@ -79,7 +79,7 @@
</property>
{% elif value.ref | length > 0 %}
<property name="{{ name | snake_to_camel }}" ref="{{
value.ref }}"/>
- {% else %}
+ {% elif value != None %}
<property name="{{ name | snake_to_camel }}">
{{ bean(value) }}
</property>
diff --git
a/modules/ducktests/tests/ignitetest/tests/cdc/cdc_replication_abstract_test.py
b/modules/ducktests/tests/ignitetest/tests/cdc/cdc_replication_abstract_test.py
new file mode 100644
index 00000000000..08e7fdbd610
--- /dev/null
+++
b/modules/ducktests/tests/ignitetest/tests/cdc/cdc_replication_abstract_test.py
@@ -0,0 +1,496 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License
+
+import os
+import difflib
+import json
+from copy import copy, deepcopy
+from time import sleep
+
+from ignitetest.services.ignite import IgniteService
+from ignitetest.services.ignite_app import IgniteApplicationService
+from ignitetest.services.kafka.kafka import KafkaSettings, KafkaService
+from ignitetest.services.utils.cdc.cdc_helper import CdcParams
+from ignitetest.services.utils.cdc.ignite_to_kafka_cdc_helper import
IgniteToKafkaCdcHelper
+from ignitetest.services.utils.control_utility import ControlUtility
+from ignitetest.services.utils.ignite_configuration import
IgniteConfiguration, DataStorageConfiguration
+from ignitetest.services.utils.ignite_configuration.cache import
CacheConfiguration
+from ignitetest.services.utils.ignite_configuration.discovery import
from_ignite_cluster, TcpDiscoveryVmIpFinder, \
+ TcpDiscoverySpi
+from ignitetest.services.utils.ssl.client_connector_configuration import
ClientConnectorConfiguration
+from ignitetest.services.zk.zookeeper import ZookeeperSettings,
ZookeeperService
+from ignitetest.utils.bean import Bean
+from ignitetest.utils.ignite_test import IgniteTest
+from ignitetest.utils.version import IgniteVersion
+
+
+# Cache name passed to the load application as the "cacheName" parameter.
+TEST_CACHE_NAME = "cdc-test-cache"
+# Java class of the load application started by the default load methods.
+JAVA_CLIENT_CLASS_NAME =
"org.apache.ignite.internal.ducktest.tests.cdc.CdcContinuousUpdatesApplication"
+
+# Seconds the default load methods sleep between start and stop of the load applications.
+TEST_DURATION_SEC = 10
+# Passed to the load application as the "range" parameter.
+RANGE = 5000
+DEFAULT_SERVER_NODES_COUNT = 2
+# Nodes count of each load application service.
+DEFAULT_CLIENT_NODES_COUNT = 1
+
+
+class CdcReplicationAbstractTest(IgniteTest):
+ def __init__(self, test_context):
+ super().__init__(test_context)
+
+ self.pds = True
+
+ """
+ Base class for CDC replication tests.
+ """
+ def run(self, ignite_version, mode, cdc_helper, cdc_params=None):
+ """
+ Run CDC replication test.
+
+ :param ignite_version: Ignite version.
+ :param mode: Active-passive or active-active mode.
+ :param cdc_helper: CDC helper for particular CDC streamer
implementation.
+ :param cdc_params: CDC test parameters.
+ :return: CDC metrics if any.
+ """
+ if cdc_params is None:
+ cdc_params = CdcParams()
+
+ if cdc_params.caches is None:
+ cdc_params.caches = self.caches()
+
+ src_cluster = self.src_cluster(ignite_version)
+
+ dst_cluster = self.dst_cluster(ignite_version)
+
+ if mode == "active-active":
+ cdc_streamer_metrics = self.run_active_active(src_cluster,
dst_cluster, cdc_helper, cdc_params,
+
self.default_active_active_load)
+ else:
+ cdc_streamer_metrics = self.run_active_passive(src_cluster,
dst_cluster, cdc_helper, cdc_params,
+
self.default_active_passive_load)
+
+ self.logger.info(f"Cdc metrics:\n{json.dumps(cdc_streamer_metrics,
indent=4)}")
+
+ if not self.check_partitions_are_same(src_cluster, dst_cluster):
+ raise AssertionError("Partitions are different in source and
destination clusters.")
+
+ src_cluster.stop()
+
+ dst_cluster.stop()
+
+ return cdc_streamer_metrics
+
+ def run_active_passive(self, src_cluster, dst_cluster, cdc_helper,
cdc_params, do_load=None):
+ """
+ Run test in active-passive mode. Client operations are performed
against source cluster.
+
+ :param src_cluster: Source cluster.
+ :param dst_cluster: Destination cluster.
+ :param cdc_helper: CDC helper for particular CDC streamer
implementation.
+ :param cdc_params: CDC test parameters.
+ :param do_load: lambda function to run client operations against
source cluster. Use default if None.
+ :return: CDC metrics if any.
+ """
+ src_cluster_config = deepcopy(src_cluster.config)
+ dst_cluster_config = deepcopy(dst_cluster.config)
+
+ src_ctx = self.setup_active_passive(src_cluster, dst_cluster,
cdc_helper, cdc_params)
+
+ if do_load is None:
+ self.default_active_passive_load(src_cluster, dst_cluster,
src_cluster_config, dst_cluster_config)
+ else:
+ do_load(src_cluster, dst_cluster, src_cluster_config,
dst_cluster_config)
+
+ cdc_helper.wait_cdc(src_ctx, no_new_events_period_secs=10,
timeout_sec=300)
+
+ cdc_streamer_metrics = cdc_helper.stop_ignite_cdc(src_ctx,
timeout_sec=300)
+
+ return cdc_streamer_metrics
+
+ def run_active_active(self, src_cluster, dst_cluster, cdc_helper,
cdc_params, do_load=None):
+ """
+ Run test in active-active mode. Client operations are performed both
against source and destination clusters.
+
+ :param src_cluster: Source cluster.
+ :param dst_cluster: Destination cluster.
+ :param cdc_helper: CDC helper for particular CDC streamer
implementation.
+ :param cdc_params: CDC test parameters.
+ :param do_load: lambda function to run client operations against
source cluster. Use default if None.
+ :return: CDC metrics if any.
+ """
+ src_cluster_config = deepcopy(src_cluster.config)
+ dst_cluster_config = deepcopy(dst_cluster.config)
+
+ src_ctx, dst_ctx = self.setup_active_active(src_cluster, dst_cluster,
cdc_helper, cdc_params)
+
+ if do_load is None:
+ self.default_active_active_load(src_cluster, dst_cluster,
src_cluster_config, dst_cluster_config)
+ else:
+ do_load(src_cluster, dst_cluster, src_cluster_config,
dst_cluster_config)
+
+ cdc_helper.wait_cdc(src_ctx, no_new_events_period_secs=10,
timeout_sec=300)
+ cdc_helper.wait_cdc(dst_ctx, no_new_events_period_secs=10,
timeout_sec=300)
+
+ cdc_streamer_metrics = {
+ "src": cdc_helper.stop_ignite_cdc(src_ctx, timeout_sec=300),
+ "dst": cdc_helper.stop_ignite_cdc(dst_ctx, timeout_sec=300)
+ }
+
+ return cdc_streamer_metrics
+
+ def default_active_active_load(self, src_cluster, dst_cluster,
+ src_cluster_config, dst_cluster_config):
+ """
+ Default client operations against source and destination clusters.
+
+ :param src_cluster: Source cluster.
+ :param dst_cluster: Destination cluster.
+ :param src_cluster_config: Source cluster configuration.
+ :param dst_cluster_config: Destination cluster configuration.
+ """
+ client1 = IgniteApplicationService(
+ self.test_context,
+ self.client_config(src_cluster, src_cluster_config),
+ java_class_name=JAVA_CLIENT_CLASS_NAME,
+ num_nodes=DEFAULT_CLIENT_NODES_COUNT,
+ params={"cacheName": TEST_CACHE_NAME, "pacing": 1, "clusterCnt":
2, "clusterIdx": 0, "range": RANGE},
+ modules=self.modules())
+
+ client2 = IgniteApplicationService(
+ self.test_context,
+ self.client_config(dst_cluster, dst_cluster_config),
+ java_class_name=JAVA_CLIENT_CLASS_NAME,
+ num_nodes=DEFAULT_CLIENT_NODES_COUNT,
+ params={"cacheName": TEST_CACHE_NAME, "pacing": 1, "clusterCnt":
2, "clusterIdx": 1, "range": RANGE},
+ modules=self.modules())
+
+ client1.start()
+ client2.start()
+
+ sleep(TEST_DURATION_SEC)
+
+ client1.stop()
+ client2.stop()
+
+ assert int(client1.extract_result("putCnt")) > 0
+ assert int(client1.extract_result("removeCnt")) > 0
+
+ assert int(client2.extract_result("putCnt")) > 0
+ assert int(client2.extract_result("removeCnt")) > 0
+
+ def default_active_passive_load(self, src_cluster, dst_cluster,
+ src_cluster_config, dst_cluster_config):
+ """
+ Default client operations against source cluster.
+
+ :param src_cluster: Source cluster.
+ :param dst_cluster: Destination cluster.
+ :param src_cluster_config: Source cluster configuration.
+ :param dst_cluster_config: Destination cluster configuration.
+ """
+ client = IgniteApplicationService(
+ self.test_context,
+ self.client_config(src_cluster, src_cluster_config),
+ java_class_name=JAVA_CLIENT_CLASS_NAME,
+ num_nodes=DEFAULT_CLIENT_NODES_COUNT,
+ params={"cacheName": TEST_CACHE_NAME, "pacing": 5, "clusterCnt":
1, "clusterIdx": 0, "range": RANGE},
+ modules=self.modules())
+
+ client.start()
+
+ sleep(TEST_DURATION_SEC)
+
+ client.stop()
+
+ assert int(client.extract_result("putCnt")) > 0
+ assert int(client.extract_result("removeCnt")) > 0
+
+ def check_partitions_are_same(self, src_cluster, dst_cluster):
+ """
+ Compare partitions on source and destination clusters.
+
+ :param src_cluster: Source cluster.
+ :param dst_cluster: Destination cluster.
+ :return: True if there is no any divergence between partitions on
source and destination clusters.
+ """
+ src_cluster_dump = self.dump_partitions(src_cluster)
+
+ dst_cluster_dump = self.dump_partitions(dst_cluster)
+
+ if src_cluster_dump != dst_cluster_dump:
+ def diff(source, destination):
+ return "".join(difflib.unified_diff(
+ source.splitlines(True),
+ destination.splitlines(True),
+ "source",
+ "destination",
+ n=1)
+ )
+
+ self.logger.debug("Partitions are different in source and
destination clusters:\n"
+ f"{diff(src_cluster_dump, dst_cluster_dump)}")
+ return False
+ else:
+ return True
+
+ def dump_partitions(self, cluster):
+ """
+ Dump partitions info skipping the cluster-specific fields.
+ Saves original dump file in the service log directory.
+
+ :param cluster: Cluster to dump partitions from.
+ """
+ dump_filename =
ControlUtility(cluster).idle_verify_dump(cluster.nodes[0])
+
+ orig_dump_filename = os.path.join(cluster.log_dir,
'idle_verify_dump_orig.txt')
+
+ cluster.nodes[0].account.ssh(f"mv {dump_filename}
{orig_dump_filename}")
+
+ processed_dump_filename = os.path.join(cluster.log_dir,
'idle_verify_dump.txt')
+
+ cluster.nodes[0].account.ssh(
+ f"cat {orig_dump_filename} | "
+ f"sed -E 's/, partVerHash=[-0-9]+]/]/g' | "
+ f"sed -E 's/ consistentId=[^,]+,//g' > "
+ f"{processed_dump_filename}")
+
+ return cluster.nodes[0].account.ssh_output(f"cat
{processed_dump_filename}").decode("utf-8")
+
+ def setup_active_passive(self, src_cluster, dst_cluster, cdc_helper,
cdc_params):
+ """
+ Setup active-passive replication.
+
+ :param src_cluster: Source cluster.
+ :param dst_cluster: Destination cluster.
+ :param cdc_helper: CDC helper for a particular CDC streamer implementation.
+ :param cdc_params: CDC test parameters.
+ :return: CDC context.
+ """
+ enable_cdc(src_cluster)
+
+ setup_conflict_resolver(dst_cluster, "2", cdc_params)
+
+ dst_cluster.config.discovery_spi.prepare_on_start(cluster=dst_cluster)
+
+ ctx = cdc_helper.configure(src_cluster, dst_cluster, cdc_params)
+
+ dst_cluster.start()
+ ControlUtility(dst_cluster).activate()
+ self.on_dst_cluster_start(dst_cluster)
+
+ src_cluster.start()
+ ControlUtility(src_cluster).activate()
+ self.on_src_cluster_start(src_cluster)
+
+ cdc_helper.start_ignite_cdc(ctx)
+
+ return ctx
+
+ def setup_active_active(self, src_cluster, dst_cluster, cdc_helper,
cdc_params):
+ """
+ Setup active-active replication.
+
+ :param src_cluster: Source cluster.
+ :param dst_cluster: Destination cluster.
+ :param cdc_helper: CDC helper for a particular CDC streamer implementation.
+ :param cdc_params: CDC test parameters.
+ :return: CDC context.
+ """
+ enable_cdc(src_cluster)
+ enable_cdc(dst_cluster)
+
+ setup_conflict_resolver(src_cluster, "1", cdc_params)
+ setup_conflict_resolver(dst_cluster, "2", cdc_params)
+
+ src_cdc_params = copy(cdc_params)
+ dst_cdc_params = copy(cdc_params)
+
+ if isinstance(cdc_helper, IgniteToKafkaCdcHelper):
+ src_cdc_params.topic = src_cdc_params.topic + "-src-to-dst"
+ src_cdc_params.metadata_topic = src_cdc_params.metadata_topic +
"-src-to-dst"
+
+ dst_cdc_params.topic = dst_cdc_params.topic + "-dst-to-src"
+ dst_cdc_params.metadata_topic = dst_cdc_params.metadata_topic +
"-dst-to-src"
+
+ src_cluster.config.discovery_spi.prepare_on_start(cluster=src_cluster)
+ dst_cluster.config.discovery_spi.prepare_on_start(cluster=dst_cluster)
+
+ src_config_orig = deepcopy(src_cluster.config)
+
+ src_ctx = cdc_helper.configure(src_cluster, dst_cluster,
src_cdc_params)
+ src_config_cdc_configured = src_cluster.config
+
+ src_cluster.config = src_config_orig
+ dst_ctx = cdc_helper.configure(dst_cluster, src_cluster,
dst_cdc_params)
+
+ src_cluster.config = src_config_cdc_configured
+
+ src_cluster.start()
+ ControlUtility(src_cluster).activate()
+ self.on_src_cluster_start(src_cluster)
+
+ dst_cluster.start()
+ ControlUtility(dst_cluster).activate()
+ self.on_dst_cluster_start(dst_cluster)
+
+ cdc_helper.start_ignite_cdc(src_ctx)
+ cdc_helper.start_ignite_cdc(dst_ctx)
+
+ return src_ctx, dst_ctx
+
+ def on_src_cluster_start(self, src_cluster):
+ """
+ To be overridden to perform some actions on source cluster start.
+
+ :param src_cluster: Source cluster.
+ """
+ pass
+
+ def on_dst_cluster_start(self, dst_cluster):
+ """
+ To be overridden to perform some actions on destination cluster start.
+
+ :param dst_cluster: Destination cluster.
+ """
+ pass
+
+ def caches(self):
+ """
+ :return: List of caches to be used in the test.
+ """
+ return [TEST_CACHE_NAME]
+
+ def start_kafka(self, kafka_nodes, zk_nodes=1):
+ """
+ Start Kafka and Zookeeper.
+
+ :param kafka_nodes: Kafka nodes count.
+ :param zk_nodes: Zookeeper nodes count.
+ :return: Zookeeper and Kafka services.
+ """
+ zk_settings = ZookeeperSettings()
+ zk = ZookeeperService(self.test_context, zk_nodes,
settings=zk_settings)
+
+ kafka_settings =
KafkaSettings(zookeeper_connection_string=zk.connection_string())
+ kafka = KafkaService(self.test_context, kafka_nodes,
settings=kafka_settings)
+
+ zk.start_async()
+ kafka.start()
+
+ return zk, kafka
+
+ def stop_kafka(self, zk, kafka):
+ """
+ Stop Kafka and Zookeeper.
+ """
+ kafka.stop(force_stop=False, allow_fail=True)
+
+ zk.stop(force_stop=False)
+
+ def src_cluster(self, ignite_version):
+ """
+ :return: Source cluster service.
+ """
+ return IgniteService(self.test_context,
self.ignite_config(ignite_version, "src"),
+ DEFAULT_SERVER_NODES_COUNT,
modules=self.modules())
+
+ def dst_cluster(self, ignite_version):
+ """
+ :return: Destination cluster service.
+ """
+ return IgniteService(self.test_context,
self.ignite_config(ignite_version, "dst"),
+ DEFAULT_SERVER_NODES_COUNT,
modules=self.modules())
+
+ def modules(self):
+ """
+ :return: List of modules to be used in the test.
+ """
+ return ["cdc-ext"]
+
+ def client_config(self, cluster, config):
+ """
+ Create client node configuration to run against the passed cluster.
+
+ :param cluster: Cluster Ignite service.
+ :param config: Cluster Ignite configuration.
+ :return: Client node ignite configuration.
+ """
+ return config._replace(
+ client_mode=True,
+ data_storage=None,
+ ext_beans=[],
+ discovery_spi=from_ignite_cluster(cluster)
+ )
+
+ def ignite_config(self, ignite_version, ignite_instance_name):
+ """
+ Create ignite configuration for server nodes.
+
+ :param ignite_version: Ignite version.
+ :param ignite_instance_name: Ignite instance name.
+ :return: Server node ignite configuration.
+ """
+ config = IgniteConfiguration(
+ discovery_spi=TcpDiscoverySpi(ip_finder=TcpDiscoveryVmIpFinder()),
+ ignite_instance_name=ignite_instance_name,
+ version=IgniteVersion(ignite_version),
+ data_storage=DataStorageConfiguration(),
+ caches=[CacheConfiguration(name=TEST_CACHE_NAME)],
+ client_connector_configuration=ClientConnectorConfiguration()
+ )
+
+ if self.pds:
+ config = config._replace(
+ data_storage=config.data_storage._replace(
+ default=config.data_storage.default._replace(
+ persistence_enabled=True
+ )
+ )
+ )
+
+ return config
+
+
+def enable_cdc(cluster):
+ """
+ Enable CDC on cluster. Changes cluster config in place.
+
+ :param cluster: Ignite cluster.
+ """
+ cluster.config = cluster.config._replace(
+ data_storage=cluster.config.data_storage._replace(
+ default=cluster.config.data_storage.default._replace(
+ cdc_enabled=True
+ )
+ )
+ )
+
+
+def setup_conflict_resolver(cluster, cluster_id, cdc_params: CdcParams):
+ """
+ Setup conflict resolver plugin. Changes cluster config in place.
+
+ :param cluster: Ignite cluster.
+ :param cluster_id: Cluster ID.
+ :param cdc_params: CDC test parameters.
+ """
+ cluster.config = cluster.config._replace(
+ plugins=[*cluster.config.plugins,
+ ('bean.j2',
+
Bean("org.apache.ignite.cdc.conflictresolve.CacheVersionConflictResolverPluginProvider",
+ cluster_id=cluster_id,
+
conflict_resolve_field=cdc_params.conflict_resolve_field,
+ caches=cdc_params.caches))],
+ )
diff --git
a/modules/ducktests/tests/ignitetest/tests/cdc/cdc_replication_test.py
b/modules/ducktests/tests/ignitetest/tests/cdc/cdc_replication_test.py
new file mode 100644
index 00000000000..43d48524b75
--- /dev/null
+++ b/modules/ducktests/tests/ignitetest/tests/cdc/cdc_replication_test.py
@@ -0,0 +1,90 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License
+
+from ducktape.mark import defaults
+
+from ignitetest.services.utils import IgniteServiceType
+from ignitetest.services.utils.cdc.ignite_to_ignite_cdc_helper import
IgniteToIgniteCdcHelper
+from ignitetest.services.utils.cdc.ignite_to_kafka_cdc_helper import
KafkaCdcParams, \
+ IgniteToKafkaCdcHelper
+from ignitetest.services.utils.cdc.ignite_to_ignite_client_cdc_helper import
IgniteToIgniteClientCdcHelper
+from ignitetest.tests.cdc.cdc_replication_abstract_test import
CdcReplicationAbstractTest
+from ignitetest.utils import cluster, ignite_versions
+from ignitetest.utils.version import DEV_BRANCH
+
+WAL_FORCE_ARCHIVE_TIMEOUT_MS = 100
+
+
+class CdcReplicationTest(CdcReplicationAbstractTest):
+ """
+ CDC replication tests.
+ """
+ @cluster(num_nodes=6)
+ @ignite_versions(str(DEV_BRANCH))
+ @defaults(pds=[True, False], mode=["active-active", "active-passive"])
+ def ignite_to_ignite_test(self, ignite_version, pds, mode):
+ self.pds = pds
+
+ return self.run(ignite_version, mode, IgniteToIgniteCdcHelper())
+
+ @cluster(num_nodes=6)
+ @ignite_versions(str(DEV_BRANCH))
+ @defaults(pds=[True, False], mode=["active-active", "active-passive"])
+ def ignite_to_ignite_client_test(self, ignite_version, pds, mode):
+ self.pds = pds
+
+ return self.run(ignite_version, mode, IgniteToIgniteClientCdcHelper())
+
+ @cluster(num_nodes=10)
+ @ignite_versions(str(DEV_BRANCH))
+ @defaults(pds=[True, False], mode=["active-active", "active-passive"])
+ def ignite_to_kafka_to_ignite_test(self, ignite_version, pds, mode):
+ self.pds = pds
+
+ zk, kafka = self.start_kafka(kafka_nodes=1)
+
+ res = self.run(ignite_version, mode, IgniteToKafkaCdcHelper(),
KafkaCdcParams(kafka=kafka))
+
+ self.stop_kafka(zk, kafka)
+
+ return res
+
+ @cluster(num_nodes=10)
+ @ignite_versions(str(DEV_BRANCH))
+ @defaults(pds=[True, False], mode=["active-active", "active-passive"])
+ def ignite_to_kafka_to_ignite_client_test(self, ignite_version, pds, mode):
+ self.pds = pds
+
+ zk, kafka = self.start_kafka(kafka_nodes=1)
+
+ res = self.run(ignite_version, mode, IgniteToKafkaCdcHelper(),
KafkaCdcParams(
+ kafka=kafka,
+ kafka_to_ignite_client_type=IgniteServiceType.THIN_CLIENT
+ ))
+
+ self.stop_kafka(zk, kafka)
+
+ return res
+
+ def ignite_config(self, ignite_version, ignite_instance_name):
+ cfg = super().ignite_config(ignite_version, ignite_instance_name)
+
+ cfg = cfg._replace(
+ data_storage=cfg.data_storage._replace(
+ wal_force_archive_timeout=WAL_FORCE_ARCHIVE_TIMEOUT_MS
+ )
+ )
+
+ return cfg