This is an automated email from the ASF dual-hosted git repository.
av pushed a commit to branch ignite-ducktape
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/ignite-ducktape by this push:
new 47d1311 IGNITE-13433 Benchmark confirms operation's latency drop
decrease on Cellular switch comparing to PME-free switch (#8232)
47d1311 is described below
commit 47d131124996cfc3f8a986a3b7406a364424bbf0
Author: Anton Vinogradov <[email protected]>
AuthorDate: Tue Sep 15 10:19:59 2020 +0300
IGNITE-13433 Benchmark confirms operation's latency drop decrease on
Cellular switch comparing to PME-free switch (#8232)
---
.../CellularPreparedTxStreamer.java | 92 +++++++++++++
.../cellular_affinity_test/CellularTxStreamer.java | 112 +++++++++++++++
.../DistributionChecker.java | 4 +-
.../SingleKeyTxStreamerApplication.java | 2 +-
.../ducktest/utils/IgniteAwareApplication.java | 153 ++++++++++++++++-----
modules/ducktests/tests/docker/run_tests.sh | 4 +-
.../tests/ignitetest/services/ignite_app.py | 54 ++++++--
.../tests/ignitetest/services/utils/ignite_spec.py | 9 +-
.../ignitetest/tests/add_node_rebalance_test.py | 6 -
.../ignitetest/tests/cellular_affinity_test.py | 120 +++++++++++++++-
.../tests/ignitetest/tests/pme_free_switch_test.py | 14 +-
.../tests/ignitetest/utils/ignite_test.py | 7 -
12 files changed, 484 insertions(+), 93 deletions(-)
diff --git
a/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/cellular_affinity_test/CellularPreparedTxStreamer.java
b/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/cellular_affinity_test/CellularPreparedTxStreamer.java
new file mode 100644
index 0000000..43424eb
--- /dev/null
+++
b/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/cellular_affinity_test/CellularPreparedTxStreamer.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.ducktest.tests.cellular_affinity_test;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.stream.Collectors;
+import com.fasterxml.jackson.databind.JsonNode;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.affinity.Affinity;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.ducktest.utils.IgniteAwareApplication;
+import
org.apache.ignite.internal.processors.cache.transactions.TransactionProxyImpl;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.transactions.Transaction;
+
+/**
+ * Prepares transactions at specified cell.
+ */
+public class CellularPreparedTxStreamer extends IgniteAwareApplication {
+ /** {@inheritDoc} */
+ @Override protected void run(JsonNode jsonNode) throws Exception {
+ final String cacheName = jsonNode.get("cacheName").asText();
+ final String attr = jsonNode.get("attr").asText();
+ final String cell = jsonNode.get("cell").asText();
+ final int txCnt = jsonNode.get("txCnt").asInt();
+
+ markInitialized();
+
+ waitForActivation();
+
+ IgniteCache<Integer, Integer> cache =
ignite.getOrCreateCache(cacheName);
+
+ log.info("Starting Prepared Txs...");
+
+ Affinity<Integer> aff = ignite.affinity(cacheName);
+
+ int cnt = 0;
+ int i = -1; // Negative keys to have no intersection with load.
+
+ while (cnt != txCnt && !terminated()) {
+ Collection<ClusterNode> nodes = aff.mapKeyToPrimaryAndBackups(i);
+
+ Map<Object, Long> stat = nodes.stream().collect(
+ Collectors.groupingBy(n -> n.attributes().get(attr),
Collectors.counting()));
+
+ assert 1 == stat.keySet().size() :
+ "Partition should be located on nodes from only one cell " +
+ "[key=" + i + ", nodes=" + nodes.size() + ", stat=" + stat
+ "]";
+
+ if (stat.containsKey(cell)) {
+ cnt++;
+
+ Transaction tx = ignite.transactions().txStart();
+
+ cache.put(i, i);
+
+ ((TransactionProxyImpl<?, ?>)tx).tx().prepare(true);
+
+ if (cnt % 100 == 0)
+ log.info("Long Tx prepared [key=" + i + ",cnt=" + cnt + ",
cell=" + stat.keySet() + "]");
+ }
+
+ i--;
+ }
+
+ log.info("All transactions prepared (" + cnt + ")");
+
+ while (!terminated()) {
+ log.info("Waiting for SIGTERM.");
+
+ U.sleep(1000);
+ }
+
+ markFinished();
+ }
+}
diff --git
a/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/cellular_affinity_test/CellularTxStreamer.java
b/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/cellular_affinity_test/CellularTxStreamer.java
new file mode 100644
index 0000000..ebfc6f2
--- /dev/null
+++
b/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/cellular_affinity_test/CellularTxStreamer.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.ducktest.tests.cellular_affinity_test;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Map;
+import java.util.stream.Collectors;
+import com.fasterxml.jackson.databind.JsonNode;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.affinity.Affinity;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.ducktest.utils.IgniteAwareApplication;
+
+/**
+ * Streams transactions to specified cell.
+ */
+public class CellularTxStreamer extends IgniteAwareApplication {
+ /** {@inheritDoc} */
+ @Override public void run(JsonNode jsonNode) throws Exception {
+ String cacheName = jsonNode.get("cacheName").asText();
+ int warmup = jsonNode.get("warmup").asInt();
+ String cell = jsonNode.get("cell").asText();
+ String attr = jsonNode.get("attr").asText();
+
+ markInitialized();
+
+ waitForActivation();
+
+ IgniteCache<Integer, Integer> cache =
ignite.getOrCreateCache(cacheName);
+
+ long[] max = new long[20];
+
+ Arrays.fill(max, -1);
+
+ int key = 0;
+
+ int cnt = 0;
+
+ long initTime = 0;
+
+ boolean record = false;
+
+ Affinity<Integer> aff = ignite.affinity(cacheName);
+
+ while (!terminated()) {
+ key++;
+
+ Collection<ClusterNode> nodes = aff.mapKeyToPrimaryAndBackups(key);
+
+ Map<Object, Long> stat = nodes.stream().collect(
+ Collectors.groupingBy(n -> n.attributes().get(attr),
Collectors.counting()));
+
+ if (!stat.containsKey(cell))
+ continue;
+
+ cnt++;
+
+ long start = System.currentTimeMillis();
+
+ cache.put(key, key);
+
+ long finish = System.currentTimeMillis();
+
+ long time = finish - start;
+
+ if (!record && cnt > warmup) {
+ record = true;
+
+ initTime = System.currentTimeMillis();
+
+ log.info("Warmup finished");
+ }
+
+ if (record) {
+ for (int i = 0; i < max.length; i++) {
+ if (max[i] <= time) {
+ System.arraycopy(max, i, max, i + 1, max.length - i -
1);
+
+ max[i] = time;
+
+ break;
+ }
+ }
+ }
+
+ if (cnt % 1000 == 0)
+ log.info("Application streamed " + cnt + " transactions
[worst_latency=" + Arrays.toString(max) + "]");
+ }
+
+ recordResult("WORST_LATENCY", Arrays.toString(max));
+ recordResult("STREAMED", cnt - warmup);
+ recordResult("MEASURE_DURATION", System.currentTimeMillis() -
initTime);
+
+ markFinished();
+ }
+}
diff --git
a/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/cellular_affinity_test/DistributionChecker.java
b/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/cellular_affinity_test/DistributionChecker.java
index 2daf63e..22b2c94 100644
---
a/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/cellular_affinity_test/DistributionChecker.java
+++
b/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/cellular_affinity_test/DistributionChecker.java
@@ -28,9 +28,7 @@ import
org.apache.ignite.internal.ducktest.utils.IgniteAwareApplication;
*
*/
public class DistributionChecker extends IgniteAwareApplication {
- /**
- * {@inheritDoc}
- */
+ /** {@inheritDoc} */
@Override protected void run(JsonNode jsonNode) {
String cacheName = jsonNode.get("cacheName").asText();
String attr = jsonNode.get("attr").asText();
diff --git
a/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/pme_free_switch_test/SingleKeyTxStreamerApplication.java
b/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/pme_free_switch_test/SingleKeyTxStreamerApplication.java
index 8c8be15..14b53a9 100644
---
a/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/pme_free_switch_test/SingleKeyTxStreamerApplication.java
+++
b/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/pme_free_switch_test/SingleKeyTxStreamerApplication.java
@@ -55,7 +55,7 @@ public class SingleKeyTxStreamerApplication extends
IgniteAwareApplication {
if (!record && cnt > warmup) {
record = true;
- initTime = System.currentTimeMillis();;
+ initTime = System.currentTimeMillis();
markInitialized();
}
diff --git
a/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/utils/IgniteAwareApplication.java
b/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/utils/IgniteAwareApplication.java
index 107787b..a74b53f 100644
---
a/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/utils/IgniteAwareApplication.java
+++
b/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/utils/IgniteAwareApplication.java
@@ -17,9 +17,14 @@
package org.apache.ignite.internal.ducktest.utils;
+import java.lang.management.ManagementFactory;
+import java.lang.management.ThreadInfo;
import com.fasterxml.jackson.databind.JsonNode;
import org.apache.ignite.Ignite;
+import org.apache.ignite.cluster.ClusterState;
+import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import
org.apache.ignite.internal.processors.cache.GridCachePartitionExchangeManager;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
@@ -55,8 +60,8 @@ public abstract class IgniteAwareApplication {
/** Terminated. */
private static volatile boolean terminated;
- /** Shutdown hook. */
- private static volatile Thread hook;
+ /** State mutex. */
+ private static final Object stateMux = new Object();
/** Ignite. */
protected Ignite ignite;
@@ -68,7 +73,7 @@ public abstract class IgniteAwareApplication {
* Default constructor.
*/
protected IgniteAwareApplication() {
- Runtime.getRuntime().addShutdownHook(hook = new Thread(() -> {
+ Runtime.getRuntime().addShutdownHook(new Thread(() -> {
log.info("SIGTERM recorded.");
if (!finished && !broken)
@@ -79,8 +84,13 @@ public abstract class IgniteAwareApplication {
if (log.isDebugEnabled())
log.debug("Waiting for graceful termination...");
+ int iter = 0;
+
while (!finished && !broken) {
- log.info("Waiting for graceful termination cycle...");
+ log.info("Waiting for graceful termination cycle... [iter=" +
++iter + "]");
+
+ if (iter == 100)
+ dumpThreads();
try {
U.sleep(100);
@@ -101,49 +111,72 @@ public abstract class IgniteAwareApplication {
* Used to marks as started to perform actions. Suitable for async runs.
*/
protected void markInitialized() {
- assert !inited;
+ log.info("Marking as initialized.");
- log.info(APP_INITED);
+ synchronized (stateMux) {
+ assert !inited;
+ assert !finished;
+ assert !broken;
- inited = true;
+ log.info(APP_INITED);
+
+ inited = true;
+ }
}
/**
*
*/
protected void markFinished() {
- assert !finished;
- assert !broken;
+ log.info("Marking as finished.");
- log.info(APP_FINISHED);
+ synchronized (stateMux) {
+ assert inited;
+ assert !finished;
+ assert !broken;
- if (!terminated())
- removeShutdownHook();
+ log.info(APP_FINISHED);
- finished = true;
+ finished = true;
+ }
}
/**
*
*/
- private void markBroken() {
- assert !finished;
- assert !broken;
+ protected void markBroken(Throwable th) {
+ log.info("Marking as broken.");
- log.info(APP_BROKEN);
+ synchronized (stateMux) {
+ if (broken) {
+ log.info("Already marked as broken.");
- removeShutdownHook();
+ return;
+ }
- broken = true;
+ recordResult("ERROR", th.toString());
+
+ assert !finished;
+
+ log.error(APP_BROKEN);
+
+ broken = true;
+ }
}
/**
*
*/
- private void removeShutdownHook() {
- Runtime.getRuntime().removeShutdownHook(hook);
+ private void terminate() {
+ log.info("Marking as initialized.");
+
+ synchronized (stateMux) {
+ assert !terminated;
+
+ log.info(APP_TERMINATED);
- log.info("Shutdown hook removed.");
+ terminated = true;
+ }
}
/**
@@ -157,17 +190,6 @@ public abstract class IgniteAwareApplication {
/**
*
*/
- private void terminate() {
- assert !terminated;
-
- log.info(APP_TERMINATED);
-
- terminated = true;
- }
-
- /**
- *
- */
protected boolean terminated() {
return terminated;
}
@@ -226,12 +248,71 @@ public abstract class IgniteAwareApplication {
catch (Throwable th) {
log.error("Unexpected Application failure... ", th);
- recordResult("ERROR", th.getMessage());
-
- markBroken();
+ if (!broken)
+ markBroken(th);
}
finally {
log.info("Application finished.");
}
}
+
+ /**
+ *
+ */
+ private static void dumpThreads() {
+ ThreadInfo[] infos =
ManagementFactory.getThreadMXBean().dumpAllThreads(true, true);
+
+ for (ThreadInfo info : infos) {
+ log.info(info.toString());
+
+ if ("main".equals(info.getThreadName())) {
+ StringBuilder sb = new StringBuilder();
+
+ sb.append("main\n");
+
+ for (StackTraceElement element : info.getStackTrace()) {
+ sb.append("\tat ").append(element.toString());
+ sb.append('\n');
+ }
+
+ log.info(sb.toString());
+ }
+ }
+ }
+
+ /**
+ *
+ */
+ protected void waitForActivation() throws
IgniteInterruptedCheckedException {
+ boolean newApi =
ignite.cluster().localNode().version().greaterThanEqual(2, 9, 0);
+
+ while (newApi ? ignite.cluster().state() != ClusterState.ACTIVE :
!ignite.cluster().active()) {
+ U.sleep(100);
+
+ log.info("Waiting for cluster activation");
+ }
+
+ log.info("Cluster Activated");
+ }
+
+ /**
+ *
+ */
+ protected void waitForRebalanced() throws
IgniteInterruptedCheckedException {
+ boolean possible =
ignite.cluster().localNode().version().greaterThanEqual(2, 8, 0);
+
+ if (possible) {
+ GridCachePartitionExchangeManager<?, ?> mgr =
((IgniteEx)ignite).context().cache().context().exchange();
+
+ while (!mgr.lastFinishedFuture().rebalanced()) {
+ U.sleep(1000);
+
+ log.info("Waiting for cluster rebalance finish");
+ }
+
+ log.info("Cluster Rebalanced");
+ }
+ else
+ throw new UnsupportedOperationException("Operation supported since
2.8.0");
+ }
}
diff --git a/modules/ducktests/tests/docker/run_tests.sh
b/modules/ducktests/tests/docker/run_tests.sh
index d8eb270..93a8b7c 100755
--- a/modules/ducktests/tests/docker/run_tests.sh
+++ b/modules/ducktests/tests/docker/run_tests.sh
@@ -21,7 +21,7 @@ SCRIPT_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
# DuckerUp parameters are specified with env variables
# Num of cotainers that ducktape will prepare for tests
-IGNITE_NUM_CONTAINERS=${IGNITE_NUM_CONTAINERS:-11}
+IGNITE_NUM_CONTAINERS=${IGNITE_NUM_CONTAINERS:-13}
# Image name to run nodes
default_image_name="ducker-ignite-openjdk-8"
@@ -55,7 +55,7 @@ The options are as follows:
Display this help message.
-n|--num-nodes
- Specify how many nodes to start. Default number of nodes to start: 11.
+ Specify how many nodes to start. Default number of nodes to start: 13 (12
+ 1 used by ducktape).
-j|--max-parallel
Specify max number of tests that can be run in parallel.
diff --git a/modules/ducktests/tests/ignitetest/services/ignite_app.py
b/modules/ducktests/tests/ignitetest/services/ignite_app.py
index b40d01a..9e396f2 100644
--- a/modules/ducktests/tests/ignitetest/services/ignite_app.py
+++ b/modules/ducktests/tests/ignitetest/services/ignite_app.py
@@ -19,6 +19,9 @@ This module contains the base class to build Ignite aware
application written on
import re
+# pylint: disable=W0622
+from ducktape.errors import TimeoutError
+
from ignitetest.services.utils.ignite_aware import IgniteAwareService
@@ -38,7 +41,7 @@ class IgniteApplicationService(IgniteAwareService):
self.servicejava_class_name = servicejava_class_name
self.java_class_name = java_class_name
self.timeout_sec = timeout_sec
- self.stop_timeout_sec = 10
+ self.params = params
def start(self):
super().start()
@@ -46,25 +49,46 @@ class IgniteApplicationService(IgniteAwareService):
self.logger.info("Waiting for Ignite aware Application (%s) to
start..." % self.java_class_name)
self.await_event("Topology snapshot", self.timeout_sec,
from_the_beginning=True)
-
self.await_event("IGNITE_APPLICATION_INITIALIZED\\|IGNITE_APPLICATION_BROKEN",
self.timeout_sec,
- from_the_beginning=True)
- try:
- self.await_event("IGNITE_APPLICATION_INITIALIZED", 1,
from_the_beginning=True)
- except Exception:
- raise Exception("Java application execution failed. %s" %
self.extract_result("ERROR")) from None
+ self.__check_status("IGNITE_APPLICATION_INITIALIZED",
timeout=self.timeout_sec)
- # pylint: disable=W0221
- def stop_node(self, node, clean_shutdown=True, timeout_sec=20):
- self.logger.info("%s Stopping node %s" % (self.__class__.__name__,
str(node.account)))
- node.account.kill_java_processes(self.servicejava_class_name,
clean_shutdown=clean_shutdown, allow_fail=True)
+ def stop_async(self, clean_shutdown=True):
+ """
+ Stops node in async way.
+ """
+ self.logger.info("%s Stopping node %s" % (self.__class__.__name__,
str(self.nodes[0].account)))
+ self.nodes[0].account.kill_java_processes(self.servicejava_class_name,
clean_shutdown=clean_shutdown,
+ allow_fail=True)
- stopped = self.wait_node(node, timeout_sec=self.stop_timeout_sec)
+ def await_stopped(self, timeout_sec=10):
+ """
+ Awaits node stop finish.
+ """
+ stopped = self.wait_node(self.nodes[0], timeout_sec=timeout_sec)
assert stopped, "Node %s: did not stop within the specified timeout of
%s seconds" % \
- (str(node.account), str(self.stop_timeout_sec))
+ (str(self.nodes[0].account), str(timeout_sec))
-
self.await_event("IGNITE_APPLICATION_FINISHED\\|IGNITE_APPLICATION_BROKEN",
from_the_beginning=True,
- timeout_sec=timeout_sec)
+ self.__check_status("IGNITE_APPLICATION_FINISHED", timeout=timeout_sec)
+
+ # pylint: disable=W0221
+ def stop_node(self, node, clean_shutdown=True, timeout_sec=10):
+ assert node == self.nodes[0]
+ self.stop_async(clean_shutdown)
+ self.await_stopped(timeout_sec)
+
+ def __check_status(self, desired, timeout=1):
+ self.await_event("%s\\|IGNITE_APPLICATION_BROKEN" % desired, timeout,
from_the_beginning=True)
+
+ try:
+ self.await_event("IGNITE_APPLICATION_BROKEN", 1,
from_the_beginning=True)
+ raise Exception("Java application execution failed. %s" %
self.extract_result("ERROR"))
+ except TimeoutError:
+ pass
+
+ try:
+ self.await_event(desired, 1, from_the_beginning=True)
+ except Exception:
+ raise Exception("Java application execution failed.") from None
def clean_node(self, node):
if self.alive(node):
diff --git a/modules/ducktests/tests/ignitetest/services/utils/ignite_spec.py
b/modules/ducktests/tests/ignitetest/services/utils/ignite_spec.py
index 124fefb..afb975a 100644
--- a/modules/ducktests/tests/ignitetest/services/utils/ignite_spec.py
+++ b/modules/ducktests/tests/ignitetest/services/utils/ignite_spec.py
@@ -22,11 +22,10 @@ import importlib
import json
from abc import ABCMeta, abstractmethod
-from ignitetest.services.utils.ignite_path import IgnitePath
from ignitetest.services.utils.config_template import
IgniteClientConfigTemplate, IgniteServerConfigTemplate
-from ignitetest.utils.version import DEV_BRANCH
-
+from ignitetest.services.utils.ignite_path import IgnitePath
from ignitetest.services.utils.ignite_persistence import IgnitePersistenceAware
+from ignitetest.utils.version import DEV_BRANCH
def resolve_spec(service, context, config, **kwargs):
@@ -147,6 +146,8 @@ class ApacheIgniteNodeSpec(IgniteNodeSpec,
IgnitePersistenceAware):
libs.append("log4j")
libs = list(map(lambda m: self.path.module(m) + "/*", libs))
+ libs.append(IgnitePath(DEV_BRANCH).module("ducktests") + "/*")
+
self.envs = {
'EXCLUDE_TEST_CLASSES': 'true',
'IGNITE_LOG_DIR': self.PERSISTENT_ROOT,
@@ -183,7 +184,7 @@ class ApacheIgniteApplicationSpec(IgniteApplicationSpec,
IgnitePersistenceAware)
}
self.jvm_opts.extend([
- "-DIGNITE_SUCCESS_FILE=" + self.PERSISTENT_ROOT + "/success_file ",
+ "-DIGNITE_SUCCESS_FILE=" + self.PERSISTENT_ROOT + "/success_file",
"-Dlog4j.configDebug=true",
"-DIGNITE_NO_SHUTDOWN_HOOK=true", # allows to perform operations
on app termination.
"-Xmx1G",
diff --git
a/modules/ducktests/tests/ignitetest/tests/add_node_rebalance_test.py
b/modules/ducktests/tests/ignitetest/tests/add_node_rebalance_test.py
index 0681424..82bc730 100644
--- a/modules/ducktests/tests/ignitetest/tests/add_node_rebalance_test.py
+++ b/modules/ducktests/tests/ignitetest/tests/add_node_rebalance_test.py
@@ -47,15 +47,11 @@ class AddNodeRebalanceTest(IgniteTest):
* Put data to it via IgniteClientApp.
* Start one more node and awaits for rebalance to finish.
"""
- self.stage("Start Ignite nodes")
-
node_config =
IgniteConfiguration(version=IgniteVersion(ignite_version))
ignites = IgniteService(self.test_context, config=node_config,
num_nodes=self.NUM_NODES - 1)
ignites.start()
- self.stage("Starting DataGenerationApplication")
-
# This client just put some data to the cache.
app_config = node_config._replace(client_mode=True,
discovery_spi=from_ignite_cluster(ignites))
IgniteApplicationService(self.test_context, config=app_config,
@@ -66,8 +62,6 @@ class AddNodeRebalanceTest(IgniteTest):
ignite = IgniteService(self.test_context,
node_config._replace(discovery_spi=from_ignite_cluster(ignites)),
num_nodes=1)
- self.stage("Starting Ignite node")
-
ignite.start()
start = self.monotonic()
diff --git a/modules/ducktests/tests/ignitetest/tests/cellular_affinity_test.py
b/modules/ducktests/tests/ignitetest/tests/cellular_affinity_test.py
index c957e43..3c38b22 100644
--- a/modules/ducktests/tests/ignitetest/tests/cellular_affinity_test.py
+++ b/modules/ducktests/tests/ignitetest/tests/cellular_affinity_test.py
@@ -22,11 +22,12 @@ from jinja2 import Template
from ignitetest.services.ignite import IgniteService
from ignitetest.services.ignite_app import IgniteApplicationService
+from ignitetest.services.utils.control_utility import ControlUtility
from ignitetest.services.utils.ignite_configuration import
IgniteConfiguration, IgniteClientConfiguration
from ignitetest.services.utils.ignite_configuration.discovery import
from_ignite_cluster
from ignitetest.utils import ignite_versions, version_if
from ignitetest.utils.ignite_test import IgniteTest
-from ignitetest.utils.version import DEV_BRANCH, IgniteVersion
+from ignitetest.utils.version import DEV_BRANCH, IgniteVersion, LATEST_2_8
# pylint: disable=W0223
@@ -40,6 +41,8 @@ class CellularAffinity(IgniteTest):
CACHE_NAME = "test-cache"
+ PREPARED_TX_CNT = 500 # possible amount at real cluster under load (per
cell).
+
CONFIG_TEMPLATE = """
<property name="cacheConfiguration">
<list>
@@ -55,6 +58,7 @@ class CellularAffinity(IgniteTest):
</property>
<property name="name" value="{{ cacheName }}"/>
<property name="backups" value="{{ nodes }}"/>
+ <property name="atomicityMode" value="TRANSACTIONAL"/>
</bean>
</list>
</property>
@@ -73,15 +77,17 @@ class CellularAffinity(IgniteTest):
@cluster(num_nodes=NUM_NODES * 3 + 1)
@version_if(lambda version: version >= DEV_BRANCH)
@ignite_versions(str(DEV_BRANCH))
- def test(self, ignite_version):
+ def test_distribution(self, ignite_version):
"""
- Test Cellular Affinity scenario (partition distribution).
+ Tests Cellular Affinity scenario (partition distribution).
"""
cell1 = self.start_cell(ignite_version, ['-D' +
CellularAffinity.ATTRIBUTE + '=1'])
self.start_cell(ignite_version, ['-D' + CellularAffinity.ATTRIBUTE +
'=2'], joined_cluster=cell1)
self.start_cell(ignite_version, ['-D' + CellularAffinity.ATTRIBUTE +
'=XXX', '-DRANDOM=42'],
joined_cluster=cell1)
+ ControlUtility(cell1, self.test_context).activate()
+
checker = IgniteApplicationService(
self.test_context,
IgniteClientConfiguration(version=IgniteVersion(ignite_version),
discovery_spi=from_ignite_cluster(cell1)),
@@ -92,15 +98,117 @@ class CellularAffinity(IgniteTest):
checker.run()
- def start_cell(self, version, jvm_opts, joined_cluster=None):
+ # pylint: disable=R0914
+ @cluster(num_nodes=NUM_NODES * (3 + 1))
+ @ignite_versions(str(DEV_BRANCH), str(LATEST_2_8))
+ def test_latency(self, ignite_version):
+ """
+ Tests Cellular switch tx latency.
+ """
+ data = {}
+
+ cell1, prepared_tx_loader1 =
self.start_cell_with_prepared_txs(ignite_version, "C1")
+ _, prepared_tx_loader2 =
self.start_cell_with_prepared_txs(ignite_version, "C2", joined_cluster=cell1)
+ _, prepared_tx_loader3 =
self.start_cell_with_prepared_txs(ignite_version, "C3", joined_cluster=cell1)
+
+ loaders = [prepared_tx_loader1, prepared_tx_loader2,
prepared_tx_loader3]
+
+ failed_loader = prepared_tx_loader3
+
+ tx_streamer1 = self.start_tx_streamer(ignite_version, "C1",
joined_cluster=cell1)
+ tx_streamer2 = self.start_tx_streamer(ignite_version, "C2",
joined_cluster=cell1)
+ tx_streamer3 = self.start_tx_streamer(ignite_version, "C3",
joined_cluster=cell1)
+
+ streamers = [tx_streamer1, tx_streamer2, tx_streamer3]
+
+ for streamer in streamers: # starts tx streaming with latency record
(with some warmup).
+ streamer.start()
+
+ ControlUtility(cell1,
self.test_context).disable_baseline_auto_adjust() # baseline set.
+ ControlUtility(cell1, self.test_context).activate()
+
+ for loader in loaders:
+ loader.await_event("All transactions prepared", 180,
from_the_beginning=True)
+
+ for streamer in streamers:
+ streamer.await_event("Warmup finished", 180,
from_the_beginning=True)
+
+ failed_loader.stop_async() # node left with prepared txs.
+
+ for streamer in streamers:
+ streamer.await_event("Node left topology\\|Node FAILED", 60,
from_the_beginning=True)
+
+ for streamer in streamers: # just an assertion that we have PME-free
switch.
+ streamer.await_event("exchangeFreeSwitch=true", 60,
from_the_beginning=True)
+
+ for streamer in streamers: # waiting for streaming continuation.
+ streamer.await_event("Application streamed", 60)
+
+ for streamer in streamers: # stops streaming and records results.
+ streamer.stop_async()
+
+ for streamer in streamers:
+ streamer.await_stopped()
+
+ cell = streamer.params["cell"]
+
+ data["[%s cell %s]" % ("alive" if cell is not
failed_loader.params["cell"] else "broken", cell)] = \
+ "worst_latency=%s, tx_streamed=%s, measure_duration=%s" % (
+ streamer.extract_result("WORST_LATENCY"),
streamer.extract_result("STREAMED"),
+ streamer.extract_result("MEASURE_DURATION"))
+
+ return data
+
+ def start_tx_streamer(self, version, cell, joined_cluster):
+ """
+ Starts transaction streamer.
+ """
+ return IgniteApplicationService(
+ self.test_context,
+ IgniteClientConfiguration(version=IgniteVersion(version),
properties=self.properties(),
+
discovery_spi=from_ignite_cluster(joined_cluster)),
+
java_class_name="org.apache.ignite.internal.ducktest.tests.cellular_affinity_test.CellularTxStreamer",
+ params={"cacheName": CellularAffinity.CACHE_NAME,
+ "attr": CellularAffinity.ATTRIBUTE,
+ "cell": cell,
+ "warmup": 10000},
+ timeout_sec=180)
+
+ def start_cell_with_prepared_txs(self, version, cell_id,
joined_cluster=None):
+ """
+ Starts cell with prepared transactions.
+ """
+ nodes = self.start_cell(version, ['-D' + CellularAffinity.ATTRIBUTE +
'=' + cell_id],
+ CellularAffinity.NUM_NODES - 1, joined_cluster)
+
+ prepared_tx_streamer = IgniteApplicationService( # last server node
at the cell.
+ self.test_context,
+ IgniteConfiguration(version=IgniteVersion(version),
properties=self.properties(),
+ discovery_spi=from_ignite_cluster(nodes)), #
Server node.
+ java_class_name=
+
"org.apache.ignite.internal.ducktest.tests.cellular_affinity_test.CellularPreparedTxStreamer",
+ params={"cacheName": CellularAffinity.CACHE_NAME,
+ "attr": CellularAffinity.ATTRIBUTE,
+ "cell": cell_id,
+ "txCnt": CellularAffinity.PREPARED_TX_CNT},
+ jvm_opts=['-D' + CellularAffinity.ATTRIBUTE + '=' + cell_id],
+ timeout_sec=180)
+
+ prepared_tx_streamer.start() # starts last server node and creates
prepared txs on it.
+
+ return nodes, prepared_tx_streamer
+
+ def start_cell(self, version, jvm_opts, nodes_cnt=NUM_NODES,
joined_cluster=None):
"""
Starts cell.
"""
- config = IgniteConfiguration(version=IgniteVersion(version),
properties=self.properties())
+ config = IgniteConfiguration(version=IgniteVersion(version),
properties=self.properties(),
+ cluster_state="INACTIVE")
+
if joined_cluster:
config =
config._replace(discovery_spi=from_ignite_cluster(joined_cluster))
- ignites = IgniteService(self.test_context, config,
num_nodes=CellularAffinity.NUM_NODES, jvm_opts=jvm_opts)
+ ignites = IgniteService(self.test_context, config,
num_nodes=nodes_cnt, jvm_opts=jvm_opts)
ignites.start()
diff --git a/modules/ducktests/tests/ignitetest/tests/pme_free_switch_test.py
b/modules/ducktests/tests/ignitetest/tests/pme_free_switch_test.py
index 395b24b..3b54762 100644
--- a/modules/ducktests/tests/ignitetest/tests/pme_free_switch_test.py
+++ b/modules/ducktests/tests/ignitetest/tests/pme_free_switch_test.py
@@ -43,12 +43,10 @@ class PmeFreeSwitchTest(IgniteTest):
@ignite_versions(str(DEV_BRANCH), str(LATEST_2_7))
def test(self, ignite_version):
"""
- Test PME free scenario (node stop).
+ Tests PME free scenario (node stop).
"""
data = {}
- self.stage("Starting nodes")
-
config = IgniteConfiguration(
version=IgniteVersion(ignite_version),
caches=[CacheConfiguration(name='test-cache', backups=2,
atomicity_mode='TRANSACTIONAL')]
@@ -58,8 +56,6 @@ class PmeFreeSwitchTest(IgniteTest):
ignites.start()
- self.stage("Starting long_tx_streamer")
-
client_config = config._replace(client_mode=True,
discovery_spi=from_ignite_cluster(ignites, slice(0, self.NUM_NODES - 1)))
@@ -71,8 +67,6 @@ class PmeFreeSwitchTest(IgniteTest):
long_tx_streamer.start()
- self.stage("Starting single_key_tx_streamer")
-
single_key_tx_streamer = IgniteApplicationService(
self.test_context,
client_config,
@@ -85,20 +79,14 @@ class PmeFreeSwitchTest(IgniteTest):
if IgniteVersion(ignite_version) >= V_2_8_0:
ControlUtility(ignites,
self.test_context).disable_baseline_auto_adjust()
- self.stage("Stopping server node")
-
ignites.stop_node(ignites.nodes[self.NUM_NODES - 1])
long_tx_streamer.await_event("Node left topology", 60,
from_the_beginning=True)
time.sleep(30) # keeping txs alive for 30 seconds.
- self.stage("Stopping long_tx_streamer")
-
long_tx_streamer.stop()
- self.stage("Stopping single_key_tx_streamer")
-
single_key_tx_streamer.stop()
data["Worst latency (ms)"] =
single_key_tx_streamer.extract_result("WORST_LATENCY")
diff --git a/modules/ducktests/tests/ignitetest/utils/ignite_test.py
b/modules/ducktests/tests/ignitetest/utils/ignite_test.py
index a4bfe7d..feb5993 100644
--- a/modules/ducktests/tests/ignitetest/utils/ignite_test.py
+++ b/modules/ducktests/tests/ignitetest/utils/ignite_test.py
@@ -29,13 +29,6 @@ class IgniteTest(Test):
def __init__(self, test_context):
super().__init__(test_context=test_context)
- def stage(self, msg):
- """
- Print stage mark.
- :param msg: Stage mark message.
- """
- self.logger.info("[TEST_STAGE] " + msg)
-
@staticmethod
def monotonic():
"""