This is an automated email from the ASF dual-hosted git repository.
av pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new a1c13de0318 IGNITE-20682 [ducktests] Add the extention point to the
rebalance tests (#11002)
a1c13de0318 is described below
commit a1c13de0318872448e28b91a0d1d4c4657da8af2
Author: Sergey Korotkov <[email protected]>
AuthorDate: Mon Oct 23 16:34:48 2023 +0700
IGNITE-20682 [ducktests] Add the extention point to the rebalance tests
(#11002)
---
.../ducktest/tests/DataGenerationApplication.java | 40 ++++++++++++++--
.../ignitetest/tests/rebalance/in_memory_test.py | 20 ++++----
.../ignitetest/tests/rebalance/persistent_test.py | 54 ++++++++++++----------
.../tests/ignitetest/tests/rebalance/util.py | 30 +++++++++++-
modules/ducktests/tests/ignitetest/tests/util.py | 6 ++-
5 files changed, 110 insertions(+), 40 deletions(-)
diff --git
a/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/DataGenerationApplication.java
b/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/DataGenerationApplication.java
index 216fbf72d62..e96d76d72c2 100644
---
a/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/DataGenerationApplication.java
+++
b/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/DataGenerationApplication.java
@@ -17,9 +17,12 @@
package org.apache.ignite.internal.ducktest.tests;
+import java.io.ByteArrayOutputStream;
import java.util.ArrayList;
+import java.util.Base64;
import java.util.Collections;
import java.util.List;
+import java.util.Optional;
import java.util.concurrent.ThreadLocalRandom;
import com.fasterxml.jackson.databind.JsonNode;
import org.apache.ignite.IgniteCache;
@@ -54,6 +57,13 @@ public class DataGenerationApplication extends
IgniteAwareApplication {
if (jsonNode.has("indexCount"))
idxCnt = jsonNode.get("indexCount").asInt();
+ byte[] dataPattern = null;
+
+ if (jsonNode.has("dataPatternBase64"))
+ dataPattern =
Optional.ofNullable(jsonNode.get("dataPatternBase64").asText(null))
+ .map(b64 -> Base64.getDecoder().decode(b64))
+ .orElse(null);
+
markInitialized();
for (int i = 1; i <= cacheCnt; i++) {
@@ -82,7 +92,7 @@ public class DataGenerationApplication extends
IgniteAwareApplication {
IgniteCache<Integer, BinaryObject> cache =
ignite.getOrCreateCache(ccfg);
- generateCacheData(cache.getName(), entrySize, from, to, idxCnt);
+ generateCacheData(cache.getName(), entrySize, from, to, idxCnt,
dataPattern);
}
markFinished();
@@ -93,16 +103,38 @@ public class DataGenerationApplication extends
IgniteAwareApplication {
* @param entrySize Entry size.
* @param from From key.
* @param to To key.
+ * @param dataPattern If not-null pattern is used to fill the entry data
field.
+ * It is filled with random data otherwise.
*/
- private void generateCacheData(String cacheName, int entrySize, int from,
int to, int idxCnt) {
+ private void generateCacheData(String cacheName, int entrySize, int from,
int to, int idxCnt, byte[] dataPattern) {
int flushEach = MAX_STREAMER_DATA_SIZE / entrySize +
(MAX_STREAMER_DATA_SIZE % entrySize == 0 ? 0 : 1);
int logEach = (to - from) / 10;
BinaryObjectBuilder builder = ignite.binary().builder(VAL_TYPE);
- byte[] data = new byte[entrySize];
+ byte[] data;
+
+ if (dataPattern != null) {
+ ByteArrayOutputStream buf = new ByteArrayOutputStream(entrySize);
+
+ int curPos = 0;
+
+ while (curPos < entrySize) {
+ int len = Math.min(dataPattern.length, entrySize - curPos);
+
+ buf.write(dataPattern, 0, len);
- ThreadLocalRandom.current().nextBytes(data);
+ curPos += len;
+ }
+
+ assert buf.size() == entrySize;
+
+ data = buf.toByteArray();
+ }
+ else {
+ data = new byte[entrySize];
+ ThreadLocalRandom.current().nextBytes(data);
+ }
try (IgniteDataStreamer<Integer, BinaryObject> stmr =
ignite.dataStreamer(cacheName)) {
for (int i = from; i < to; i++) {
diff --git
a/modules/ducktests/tests/ignitetest/tests/rebalance/in_memory_test.py
b/modules/ducktests/tests/ignitetest/tests/rebalance/in_memory_test.py
index d1213d65ffd..9ad55243ce1 100644
--- a/modules/ducktests/tests/ignitetest/tests/rebalance/in_memory_test.py
+++ b/modules/ducktests/tests/ignitetest/tests/rebalance/in_memory_test.py
@@ -21,14 +21,13 @@ from ducktape.mark import defaults
from ignitetest.services.ignite import IgniteService
from ignitetest.services.utils.ignite_configuration.discovery import
from_ignite_cluster
from ignitetest.tests.rebalance.util import start_ignite, get_result,
TriggerEvent, NUM_NODES, \
- await_rebalance_start, RebalanceParams
-from ignitetest.tests.util import preload_data, DataGenerationParams
+ await_rebalance_start, BaseRebalanceTest
+from ignitetest.tests.util import preload_data
from ignitetest.utils import cluster, ignite_versions
-from ignitetest.utils.ignite_test import IgniteTest
from ignitetest.utils.version import DEV_BRANCH, LATEST
-class RebalanceInMemoryTest(IgniteTest):
+class RebalanceInMemoryTest(BaseRebalanceTest):
"""
Tests rebalance scenarios in in-memory mode.
"""
@@ -81,12 +80,12 @@ class RebalanceInMemoryTest(IgniteTest):
:param throttle: rebalanceThrottle config property.
:return: Rebalance and data preload stats.
"""
- reb_params = RebalanceParams(trigger_event=trigger_event,
thread_pool_size=thread_pool_size,
- batch_size=batch_size,
batches_prefetch_count=batches_prefetch_count,
- throttle=throttle)
+ reb_params = self.get_reb_params(trigger_event=trigger_event,
thread_pool_size=thread_pool_size,
+ batch_size=batch_size,
batches_prefetch_count=batches_prefetch_count,
+ throttle=throttle)
- data_gen_params = DataGenerationParams(backups=backups,
cache_count=cache_count, entry_count=entry_count,
- entry_size=entry_size,
preloaders=preloaders)
+ data_gen_params = self.get_data_gen_params(backups=backups,
cache_count=cache_count, entry_count=entry_count,
+ entry_size=entry_size,
preloaders=preloaders)
ignites = start_ignite(self.test_context, ignite_version, reb_params,
data_gen_params)
@@ -100,7 +99,8 @@ class RebalanceInMemoryTest(IgniteTest):
rebalance_nodes = ignites.nodes[:-1]
else:
ignite = IgniteService(self.test_context,
-
ignites.config._replace(discovery_spi=from_ignite_cluster(ignites)),
num_nodes=1)
+
ignites.config._replace(discovery_spi=from_ignite_cluster(ignites)),
num_nodes=1,
+ modules=reb_params.modules)
ignite.start()
rebalance_nodes = ignite.nodes
diff --git
a/modules/ducktests/tests/ignitetest/tests/rebalance/persistent_test.py
b/modules/ducktests/tests/ignitetest/tests/rebalance/persistent_test.py
index 1aba0d71e3f..0599316d322 100644
--- a/modules/ducktests/tests/ignitetest/tests/rebalance/persistent_test.py
+++ b/modules/ducktests/tests/ignitetest/tests/rebalance/persistent_test.py
@@ -24,14 +24,13 @@ from ignitetest.services.utils.control_utility import
ControlUtility
from ignitetest.services.utils.ignite_aware import IgniteAwareService
from ignitetest.services.utils.ignite_configuration.discovery import
from_ignite_cluster
from ignitetest.tests.rebalance.util import NUM_NODES, start_ignite,
TriggerEvent, \
- get_result, check_type_of_rebalancing, await_rebalance_start,
RebalanceParams
-from ignitetest.tests.util import preload_data, DataGenerationParams
+ get_result, check_type_of_rebalancing, await_rebalance_start,
BaseRebalanceTest
+from ignitetest.tests.util import preload_data
from ignitetest.utils import cluster, ignite_versions
-from ignitetest.utils.ignite_test import IgniteTest
from ignitetest.utils.version import DEV_BRANCH, LATEST
-class RebalancePersistentTest(IgniteTest):
+class RebalancePersistentTest(BaseRebalanceTest):
"""
Tests rebalance scenarios in persistent mode.
"""
@@ -45,12 +44,12 @@ class RebalancePersistentTest(IgniteTest):
Tests rebalance on node join.
"""
- reb_params = RebalanceParams(trigger_event=TriggerEvent.NODE_JOIN,
thread_pool_size=thread_pool_size,
- batch_size=batch_size,
batches_prefetch_count=batches_prefetch_count,
- throttle=throttle, persistent=True)
+ reb_params = self.get_reb_params(trigger_event=TriggerEvent.NODE_JOIN,
thread_pool_size=thread_pool_size,
+ batch_size=batch_size,
batches_prefetch_count=batches_prefetch_count,
+ throttle=throttle)
- data_gen_params = DataGenerationParams(backups=backups,
cache_count=cache_count, entry_count=entry_count,
- entry_size=entry_size,
preloaders=preloaders)
+ data_gen_params = self.get_data_gen_params(backups=backups,
cache_count=cache_count, entry_count=entry_count,
+ entry_size=entry_size,
preloaders=preloaders)
ignites = start_ignite(self.test_context, ignite_version, reb_params,
data_gen_params)
@@ -64,7 +63,7 @@ class RebalancePersistentTest(IgniteTest):
data_gen_params=data_gen_params)
new_node = IgniteService(self.test_context,
ignites.config._replace(discovery_spi=from_ignite_cluster(ignites)),
- num_nodes=1)
+ num_nodes=1, modules=reb_params.modules)
new_node.start()
control_utility.add_to_baseline(new_node.nodes)
@@ -93,12 +92,12 @@ class RebalancePersistentTest(IgniteTest):
Tests rebalance on node left.
"""
- reb_params = RebalanceParams(trigger_event=TriggerEvent.NODE_LEFT,
thread_pool_size=thread_pool_size,
- batch_size=batch_size,
batches_prefetch_count=batches_prefetch_count,
- throttle=throttle, persistent=True)
+ reb_params = self.get_reb_params(trigger_event=TriggerEvent.NODE_LEFT,
thread_pool_size=thread_pool_size,
+ batch_size=batch_size,
batches_prefetch_count=batches_prefetch_count,
+ throttle=throttle)
- data_gen_params = DataGenerationParams(backups=backups,
cache_count=cache_count, entry_count=entry_count,
- entry_size=entry_size,
preloaders=preloaders)
+ data_gen_params = self.get_data_gen_params(backups=backups,
cache_count=cache_count, entry_count=entry_count,
+ entry_size=entry_size,
preloaders=preloaders)
ignites = start_ignite(self.test_context, ignite_version, reb_params,
data_gen_params)
@@ -142,15 +141,15 @@ class RebalancePersistentTest(IgniteTest):
preload_entries = 10_000
- reb_params = RebalanceParams(trigger_event=TriggerEvent.NODE_JOIN,
thread_pool_size=thread_pool_size,
- batch_size=batch_size,
batches_prefetch_count=batches_prefetch_count,
- throttle=throttle, persistent=True,
-
jvm_opts=['-DIGNITE_PDS_WAL_REBALANCE_THRESHOLD=0',
-
'-DIGNITE_PREFER_WAL_REBALANCE=true']
- )
+ reb_params = self.get_reb_params(trigger_event=TriggerEvent.NODE_JOIN,
thread_pool_size=thread_pool_size,
+ batch_size=batch_size,
batches_prefetch_count=batches_prefetch_count,
+ throttle=throttle,
+
jvm_opts=['-DIGNITE_PDS_WAL_REBALANCE_THRESHOLD=0',
+
'-DIGNITE_PREFER_WAL_REBALANCE=true']
+ )
- data_gen_params = DataGenerationParams(backups=backups,
cache_count=cache_count, entry_count=entry_count,
- entry_size=entry_size,
preloaders=preloaders)
+ data_gen_params = self.get_data_gen_params(backups=backups,
cache_count=cache_count, entry_count=entry_count,
+ entry_size=entry_size,
preloaders=preloaders)
ignites = start_ignite(self.test_context, ignite_version, reb_params,
data_gen_params)
@@ -163,7 +162,9 @@ class RebalancePersistentTest(IgniteTest):
self.test_context,
preloader_config,
java_class_name="org.apache.ignite.internal.ducktest.tests.DataGenerationApplication",
- params={"backups": 1, "cacheCount": 1, "entrySize": 1, "from": 0,
"to": preload_entries}
+ modules=data_gen_params.modules,
+ params={"backups": 1, "cacheCount": 1, "entrySize": 1, "from": 0,
"to": preload_entries,
+ "dataPatternBase64": data_gen_params.data_pattern_base64}
)
preloader.run()
@@ -202,6 +203,11 @@ class RebalancePersistentTest(IgniteTest):
return result
+ def get_reb_params(self, **kwargs):
+ return super().get_reb_params(**kwargs)._replace(
+ persistent=True
+ )
+
def await_and_check_rebalance(service: IgniteService, rebalance_nodes: list =
None, is_full: bool = True):
"""
diff --git a/modules/ducktests/tests/ignitetest/tests/rebalance/util.py
b/modules/ducktests/tests/ignitetest/tests/rebalance/util.py
index 18ebb173c5a..9a5351dd238 100644
--- a/modules/ducktests/tests/ignitetest/tests/rebalance/util.py
+++ b/modules/ducktests/tests/ignitetest/tests/rebalance/util.py
@@ -29,6 +29,7 @@ from ignitetest.services.utils.ignite_configuration import
IgniteConfiguration,
from ignitetest.services.utils.ignite_configuration.data_storage import
DataRegionConfiguration
from ignitetest.tests.util import DataGenerationParams
from ignitetest.utils.enum import constructible
+from ignitetest.utils.ignite_test import IgniteTest
from ignitetest.utils.version import IgniteVersion
NUM_NODES = 4
@@ -54,6 +55,8 @@ class RebalanceParams(NamedTuple):
throttle: int = None
persistent: bool = False
jvm_opts: list = None
+ modules: list = []
+ plugins: list = []
class RebalanceMetrics(NamedTuple):
@@ -102,9 +105,13 @@ def start_ignite(test_context, ignite_version: str,
rebalance_params: RebalanceP
rebalance_batches_prefetch_count=rebalance_params.batches_prefetch_count,
rebalance_throttle=rebalance_params.throttle)
+ if rebalance_params.plugins:
+ node_config = node_config._replace(plugins=[*node_config.plugins,
*rebalance_params.plugins])
+
ignites = IgniteService(test_context, config=node_config,
num_nodes=node_count if
rebalance_params.trigger_event else node_count - 1,
- jvm_opts=rebalance_params.jvm_opts)
+ jvm_opts=rebalance_params.jvm_opts,
+ modules=rebalance_params.modules)
ignites.start()
return ignites
@@ -247,3 +254,24 @@ def check_type_of_rebalancing(rebalance_nodes: list,
is_full: bool = True):
assert msg in i, i
return output
+
+
+class BaseRebalanceTest(IgniteTest):
+ """
+ Base class for rebalance tests.
+ """
+ def get_reb_params(self, **kwargs):
+ """
+ Create rebalance parameters.
+ :param kwargs: RebalanceParams cstor parameters.
+ :return: instance of RebalanceParams.
+ """
+ return RebalanceParams(**kwargs)
+
+ def get_data_gen_params(self, **kwargs):
+ """
+ Create parameters for data generation application.
+ :param kwargs: DataGenerationParams cstor parameters.
+ :return: instance of DataGenerationParams.
+ """
+ return DataGenerationParams(**kwargs)
diff --git a/modules/ducktests/tests/ignitetest/tests/util.py
b/modules/ducktests/tests/ignitetest/tests/util.py
index b8192a3443b..bc40cf860a4 100644
--- a/modules/ducktests/tests/ignitetest/tests/util.py
+++ b/modules/ducktests/tests/ignitetest/tests/util.py
@@ -35,6 +35,8 @@ class DataGenerationParams(NamedTuple):
entry_size: int = 50_000
preloaders: int = 1
index_count: int = 0
+ data_pattern_base64: str = None
+ modules: list = []
@property
def data_region_max_size(self):
@@ -79,8 +81,10 @@ def preload_data(context, config, data_gen_params:
DataGenerationParams, timeout
"entrySize": data_gen_params.entry_size,
"from": _from,
"to": _to,
- "indexCount": data_gen_params.index_count
+ "indexCount": data_gen_params.index_count,
+ "dataPatternBase64": data_gen_params.data_pattern_base64
},
+ modules=data_gen_params.modules,
shutdown_timeout_sec=timeout)
app.start_async()