This is an automated email from the ASF dual-hosted git repository.

av pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new a1c13de0318 IGNITE-20682 [ducktests] Add the extention point to the 
rebalance tests (#11002)
a1c13de0318 is described below

commit a1c13de0318872448e28b91a0d1d4c4657da8af2
Author: Sergey Korotkov <serge.korot...@gmail.com>
AuthorDate: Mon Oct 23 16:34:48 2023 +0700

    IGNITE-20682 [ducktests] Add the extention point to the rebalance tests 
(#11002)
---
 .../ducktest/tests/DataGenerationApplication.java  | 40 ++++++++++++++--
 .../ignitetest/tests/rebalance/in_memory_test.py   | 20 ++++----
 .../ignitetest/tests/rebalance/persistent_test.py  | 54 ++++++++++++----------
 .../tests/ignitetest/tests/rebalance/util.py       | 30 +++++++++++-
 modules/ducktests/tests/ignitetest/tests/util.py   |  6 ++-
 5 files changed, 110 insertions(+), 40 deletions(-)

diff --git 
a/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/DataGenerationApplication.java
 
b/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/DataGenerationApplication.java
index 216fbf72d62..e96d76d72c2 100644
--- 
a/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/DataGenerationApplication.java
+++ 
b/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/DataGenerationApplication.java
@@ -17,9 +17,12 @@
 
 package org.apache.ignite.internal.ducktest.tests;
 
+import java.io.ByteArrayOutputStream;
 import java.util.ArrayList;
+import java.util.Base64;
 import java.util.Collections;
 import java.util.List;
+import java.util.Optional;
 import java.util.concurrent.ThreadLocalRandom;
 import com.fasterxml.jackson.databind.JsonNode;
 import org.apache.ignite.IgniteCache;
@@ -54,6 +57,13 @@ public class DataGenerationApplication extends 
IgniteAwareApplication {
         if (jsonNode.has("indexCount"))
             idxCnt = jsonNode.get("indexCount").asInt();
 
+        byte[] dataPattern = null;
+
+        if (jsonNode.has("dataPatternBase64"))
+            dataPattern = 
Optional.ofNullable(jsonNode.get("dataPatternBase64").asText(null))
+                .map(b64 -> Base64.getDecoder().decode(b64))
+                .orElse(null);
+
         markInitialized();
 
         for (int i = 1; i <= cacheCnt; i++) {
@@ -82,7 +92,7 @@ public class DataGenerationApplication extends 
IgniteAwareApplication {
 
             IgniteCache<Integer, BinaryObject> cache = 
ignite.getOrCreateCache(ccfg);
 
-            generateCacheData(cache.getName(), entrySize, from, to, idxCnt);
+            generateCacheData(cache.getName(), entrySize, from, to, idxCnt, 
dataPattern);
         }
 
         markFinished();
@@ -93,16 +103,38 @@ public class DataGenerationApplication extends 
IgniteAwareApplication {
      * @param entrySize Entry size.
      * @param from From key.
      * @param to To key.
+     * @param dataPattern If not-null pattern is used to fill the entry data 
field.
+     *                    It is filled with random data otherwise.
      */
-    private void generateCacheData(String cacheName, int entrySize, int from, 
int to, int idxCnt) {
+    private void generateCacheData(String cacheName, int entrySize, int from, 
int to, int idxCnt, byte[] dataPattern) {
         int flushEach = MAX_STREAMER_DATA_SIZE / entrySize + 
(MAX_STREAMER_DATA_SIZE % entrySize == 0 ? 0 : 1);
         int logEach = (to - from) / 10;
 
         BinaryObjectBuilder builder = ignite.binary().builder(VAL_TYPE);
 
-        byte[] data = new byte[entrySize];
+        byte[] data;
+
+        if (dataPattern != null) {
+            ByteArrayOutputStream buf = new ByteArrayOutputStream(entrySize);
+
+            int curPos = 0;
+
+            while (curPos < entrySize) {
+                int len = Math.min(dataPattern.length, entrySize - curPos);
+
+                buf.write(dataPattern, 0, len);
 
-        ThreadLocalRandom.current().nextBytes(data);
+                curPos += len;
+            }
+
+            assert buf.size() == entrySize;
+
+            data = buf.toByteArray();
+        }
+        else {
+            data = new byte[entrySize];
+            ThreadLocalRandom.current().nextBytes(data);
+        }
 
         try (IgniteDataStreamer<Integer, BinaryObject> stmr = 
ignite.dataStreamer(cacheName)) {
             for (int i = from; i < to; i++) {
diff --git 
a/modules/ducktests/tests/ignitetest/tests/rebalance/in_memory_test.py 
b/modules/ducktests/tests/ignitetest/tests/rebalance/in_memory_test.py
index d1213d65ffd..9ad55243ce1 100644
--- a/modules/ducktests/tests/ignitetest/tests/rebalance/in_memory_test.py
+++ b/modules/ducktests/tests/ignitetest/tests/rebalance/in_memory_test.py
@@ -21,14 +21,13 @@ from ducktape.mark import defaults
 from ignitetest.services.ignite import IgniteService
 from ignitetest.services.utils.ignite_configuration.discovery import 
from_ignite_cluster
 from ignitetest.tests.rebalance.util import start_ignite, get_result, 
TriggerEvent, NUM_NODES, \
-    await_rebalance_start, RebalanceParams
-from ignitetest.tests.util import preload_data, DataGenerationParams
+    await_rebalance_start, BaseRebalanceTest
+from ignitetest.tests.util import preload_data
 from ignitetest.utils import cluster, ignite_versions
-from ignitetest.utils.ignite_test import IgniteTest
 from ignitetest.utils.version import DEV_BRANCH, LATEST
 
 
-class RebalanceInMemoryTest(IgniteTest):
+class RebalanceInMemoryTest(BaseRebalanceTest):
     """
     Tests rebalance scenarios in in-memory mode.
     """
@@ -81,12 +80,12 @@ class RebalanceInMemoryTest(IgniteTest):
         :param throttle: rebalanceThrottle config property.
         :return: Rebalance and data preload stats.
         """
-        reb_params = RebalanceParams(trigger_event=trigger_event, 
thread_pool_size=thread_pool_size,
-                                     batch_size=batch_size, 
batches_prefetch_count=batches_prefetch_count,
-                                     throttle=throttle)
+        reb_params = self.get_reb_params(trigger_event=trigger_event, 
thread_pool_size=thread_pool_size,
+                                         batch_size=batch_size, 
batches_prefetch_count=batches_prefetch_count,
+                                         throttle=throttle)
 
-        data_gen_params = DataGenerationParams(backups=backups, 
cache_count=cache_count, entry_count=entry_count,
-                                               entry_size=entry_size, 
preloaders=preloaders)
+        data_gen_params = self.get_data_gen_params(backups=backups, 
cache_count=cache_count, entry_count=entry_count,
+                                                   entry_size=entry_size, 
preloaders=preloaders)
 
         ignites = start_ignite(self.test_context, ignite_version, reb_params, 
data_gen_params)
 
@@ -100,7 +99,8 @@ class RebalanceInMemoryTest(IgniteTest):
             rebalance_nodes = ignites.nodes[:-1]
         else:
             ignite = IgniteService(self.test_context,
-                                   
ignites.config._replace(discovery_spi=from_ignite_cluster(ignites)), 
num_nodes=1)
+                                   
ignites.config._replace(discovery_spi=from_ignite_cluster(ignites)), 
num_nodes=1,
+                                   modules=reb_params.modules)
             ignite.start()
             rebalance_nodes = ignite.nodes
 
diff --git 
a/modules/ducktests/tests/ignitetest/tests/rebalance/persistent_test.py 
b/modules/ducktests/tests/ignitetest/tests/rebalance/persistent_test.py
index 1aba0d71e3f..0599316d322 100644
--- a/modules/ducktests/tests/ignitetest/tests/rebalance/persistent_test.py
+++ b/modules/ducktests/tests/ignitetest/tests/rebalance/persistent_test.py
@@ -24,14 +24,13 @@ from ignitetest.services.utils.control_utility import 
ControlUtility
 from ignitetest.services.utils.ignite_aware import IgniteAwareService
 from ignitetest.services.utils.ignite_configuration.discovery import 
from_ignite_cluster
 from ignitetest.tests.rebalance.util import NUM_NODES, start_ignite, 
TriggerEvent, \
-    get_result, check_type_of_rebalancing, await_rebalance_start, 
RebalanceParams
-from ignitetest.tests.util import preload_data, DataGenerationParams
+    get_result, check_type_of_rebalancing, await_rebalance_start, 
BaseRebalanceTest
+from ignitetest.tests.util import preload_data
 from ignitetest.utils import cluster, ignite_versions
-from ignitetest.utils.ignite_test import IgniteTest
 from ignitetest.utils.version import DEV_BRANCH, LATEST
 
 
-class RebalancePersistentTest(IgniteTest):
+class RebalancePersistentTest(BaseRebalanceTest):
     """
     Tests rebalance scenarios in persistent mode.
     """
@@ -45,12 +44,12 @@ class RebalancePersistentTest(IgniteTest):
         Tests rebalance on node join.
         """
 
-        reb_params = RebalanceParams(trigger_event=TriggerEvent.NODE_JOIN, 
thread_pool_size=thread_pool_size,
-                                     batch_size=batch_size, 
batches_prefetch_count=batches_prefetch_count,
-                                     throttle=throttle, persistent=True)
+        reb_params = self.get_reb_params(trigger_event=TriggerEvent.NODE_JOIN, 
thread_pool_size=thread_pool_size,
+                                         batch_size=batch_size, 
batches_prefetch_count=batches_prefetch_count,
+                                         throttle=throttle)
 
-        data_gen_params = DataGenerationParams(backups=backups, 
cache_count=cache_count, entry_count=entry_count,
-                                               entry_size=entry_size, 
preloaders=preloaders)
+        data_gen_params = self.get_data_gen_params(backups=backups, 
cache_count=cache_count, entry_count=entry_count,
+                                                   entry_size=entry_size, 
preloaders=preloaders)
 
         ignites = start_ignite(self.test_context, ignite_version, reb_params, 
data_gen_params)
 
@@ -64,7 +63,7 @@ class RebalancePersistentTest(IgniteTest):
             data_gen_params=data_gen_params)
 
         new_node = IgniteService(self.test_context, 
ignites.config._replace(discovery_spi=from_ignite_cluster(ignites)),
-                                 num_nodes=1)
+                                 num_nodes=1, modules=reb_params.modules)
         new_node.start()
 
         control_utility.add_to_baseline(new_node.nodes)
@@ -93,12 +92,12 @@ class RebalancePersistentTest(IgniteTest):
         Tests rebalance on node left.
         """
 
-        reb_params = RebalanceParams(trigger_event=TriggerEvent.NODE_LEFT, 
thread_pool_size=thread_pool_size,
-                                     batch_size=batch_size, 
batches_prefetch_count=batches_prefetch_count,
-                                     throttle=throttle, persistent=True)
+        reb_params = self.get_reb_params(trigger_event=TriggerEvent.NODE_LEFT, 
thread_pool_size=thread_pool_size,
+                                         batch_size=batch_size, 
batches_prefetch_count=batches_prefetch_count,
+                                         throttle=throttle)
 
-        data_gen_params = DataGenerationParams(backups=backups, 
cache_count=cache_count, entry_count=entry_count,
-                                               entry_size=entry_size, 
preloaders=preloaders)
+        data_gen_params = self.get_data_gen_params(backups=backups, 
cache_count=cache_count, entry_count=entry_count,
+                                                   entry_size=entry_size, 
preloaders=preloaders)
 
         ignites = start_ignite(self.test_context, ignite_version, reb_params, 
data_gen_params)
 
@@ -142,15 +141,15 @@ class RebalancePersistentTest(IgniteTest):
 
         preload_entries = 10_000
 
-        reb_params = RebalanceParams(trigger_event=TriggerEvent.NODE_JOIN, 
thread_pool_size=thread_pool_size,
-                                     batch_size=batch_size, 
batches_prefetch_count=batches_prefetch_count,
-                                     throttle=throttle, persistent=True,
-                                     
jvm_opts=['-DIGNITE_PDS_WAL_REBALANCE_THRESHOLD=0',
-                                               
'-DIGNITE_PREFER_WAL_REBALANCE=true']
-                                     )
+        reb_params = self.get_reb_params(trigger_event=TriggerEvent.NODE_JOIN, 
thread_pool_size=thread_pool_size,
+                                         batch_size=batch_size, 
batches_prefetch_count=batches_prefetch_count,
+                                         throttle=throttle,
+                                         
jvm_opts=['-DIGNITE_PDS_WAL_REBALANCE_THRESHOLD=0',
+                                                   
'-DIGNITE_PREFER_WAL_REBALANCE=true']
+                                         )
 
-        data_gen_params = DataGenerationParams(backups=backups, 
cache_count=cache_count, entry_count=entry_count,
-                                               entry_size=entry_size, 
preloaders=preloaders)
+        data_gen_params = self.get_data_gen_params(backups=backups, 
cache_count=cache_count, entry_count=entry_count,
+                                                   entry_size=entry_size, 
preloaders=preloaders)
 
         ignites = start_ignite(self.test_context, ignite_version, reb_params, 
data_gen_params)
 
@@ -163,7 +162,9 @@ class RebalancePersistentTest(IgniteTest):
             self.test_context,
             preloader_config,
             
java_class_name="org.apache.ignite.internal.ducktest.tests.DataGenerationApplication",
-            params={"backups": 1, "cacheCount": 1, "entrySize": 1, "from": 0, 
"to": preload_entries}
+            modules=data_gen_params.modules,
+            params={"backups": 1, "cacheCount": 1, "entrySize": 1, "from": 0, 
"to": preload_entries,
+                    "dataPatternBase64": data_gen_params.data_pattern_base64}
         )
 
         preloader.run()
@@ -202,6 +203,11 @@ class RebalancePersistentTest(IgniteTest):
 
         return result
 
+    def get_reb_params(self, **kwargs):
+        return super().get_reb_params(**kwargs)._replace(
+            persistent=True
+        )
+
 
 def await_and_check_rebalance(service: IgniteService, rebalance_nodes: list = 
None, is_full: bool = True):
     """
diff --git a/modules/ducktests/tests/ignitetest/tests/rebalance/util.py 
b/modules/ducktests/tests/ignitetest/tests/rebalance/util.py
index 18ebb173c5a..9a5351dd238 100644
--- a/modules/ducktests/tests/ignitetest/tests/rebalance/util.py
+++ b/modules/ducktests/tests/ignitetest/tests/rebalance/util.py
@@ -29,6 +29,7 @@ from ignitetest.services.utils.ignite_configuration import 
IgniteConfiguration,
 from ignitetest.services.utils.ignite_configuration.data_storage import 
DataRegionConfiguration
 from ignitetest.tests.util import DataGenerationParams
 from ignitetest.utils.enum import constructible
+from ignitetest.utils.ignite_test import IgniteTest
 from ignitetest.utils.version import IgniteVersion
 
 NUM_NODES = 4
@@ -54,6 +55,8 @@ class RebalanceParams(NamedTuple):
     throttle: int = None
     persistent: bool = False
     jvm_opts: list = None
+    modules: list = []
+    plugins: list = []
 
 
 class RebalanceMetrics(NamedTuple):
@@ -102,9 +105,13 @@ def start_ignite(test_context, ignite_version: str, 
rebalance_params: RebalanceP
         
rebalance_batches_prefetch_count=rebalance_params.batches_prefetch_count,
         rebalance_throttle=rebalance_params.throttle)
 
+    if rebalance_params.plugins:
+        node_config = node_config._replace(plugins=[*node_config.plugins, 
*rebalance_params.plugins])
+
     ignites = IgniteService(test_context, config=node_config,
                             num_nodes=node_count if 
rebalance_params.trigger_event else node_count - 1,
-                            jvm_opts=rebalance_params.jvm_opts)
+                            jvm_opts=rebalance_params.jvm_opts,
+                            modules=rebalance_params.modules)
     ignites.start()
 
     return ignites
@@ -247,3 +254,24 @@ def check_type_of_rebalancing(rebalance_nodes: list, 
is_full: bool = True):
             assert msg in i, i
 
         return output
+
+
+class BaseRebalanceTest(IgniteTest):
+    """
+    Base class for rebalance tests.
+    """
+    def get_reb_params(self, **kwargs):
+        """
+        Create rebalance parameters.
+        :param kwargs: RebalanceParams cstor parameters.
+        :return: instance of RebalanceParams.
+        """
+        return RebalanceParams(**kwargs)
+
+    def get_data_gen_params(self, **kwargs):
+        """
+        Create parameters for data generation application.
+        :param kwargs: DataGenerationParams cstor parameters.
+        :return: instance of DataGenerationParams.
+        """
+        return DataGenerationParams(**kwargs)
diff --git a/modules/ducktests/tests/ignitetest/tests/util.py 
b/modules/ducktests/tests/ignitetest/tests/util.py
index b8192a3443b..bc40cf860a4 100644
--- a/modules/ducktests/tests/ignitetest/tests/util.py
+++ b/modules/ducktests/tests/ignitetest/tests/util.py
@@ -35,6 +35,8 @@ class DataGenerationParams(NamedTuple):
     entry_size: int = 50_000
     preloaders: int = 1
     index_count: int = 0
+    data_pattern_base64: str = None
+    modules: list = []
 
     @property
     def data_region_max_size(self):
@@ -79,8 +81,10 @@ def preload_data(context, config, data_gen_params: 
DataGenerationParams, timeout
                 "entrySize": data_gen_params.entry_size,
                 "from": _from,
                 "to": _to,
-                "indexCount": data_gen_params.index_count
+                "indexCount": data_gen_params.index_count,
+                "dataPatternBase64": data_gen_params.data_pattern_base64
             },
+            modules=data_gen_params.modules,
             shutdown_timeout_sec=timeout)
         app.start_async()
 

Reply via email to