This is an automated email from the ASF dual-hosted git repository.
irakov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new c28cefb IGNITE-12340 Extend test coverage of ability to track
system/user time held in transaction - Fixes #7027.
c28cefb is described below
commit c28cefbdbf6e382ebf41bbdcd88eac6697db48a2
Author: Kirill Tkalenko <[email protected]>
AuthorDate: Thu Nov 28 21:20:51 2019 +0300
IGNITE-12340 Extend test coverage of ability to track system/user time held
in transaction - Fixes #7027.
Signed-off-by: Ivan Rakov <[email protected]>
---
.../GridTransactionsSystemUserTimeMetricsTest.java | 724 ++++++++++++++++++---
1 file changed, 629 insertions(+), 95 deletions(-)
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridTransactionsSystemUserTimeMetricsTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridTransactionsSystemUserTimeMetricsTest.java
index 2355f72..f6d03d5 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridTransactionsSystemUserTimeMetricsTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridTransactionsSystemUserTimeMetricsTest.java
@@ -16,9 +16,17 @@
*/
package org.apache.ignite.internal.processors.cache;
+import java.util.LinkedList;
+import java.util.List;
import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.LongStream;
+import javax.management.AttributeNotFoundException;
import javax.management.DynamicMBean;
+import javax.management.MBeanException;
+import javax.management.ReflectionException;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteException;
@@ -28,9 +36,10 @@ import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.TransactionsMXBeanImpl;
import org.apache.ignite.internal.managers.communication.GridIoMessage;
+import
org.apache.ignite.internal.processors.cache.distributed.near.GridNearLockRequest;
import
org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareRequest;
+import
org.apache.ignite.internal.processors.cache.mvcc.msg.MvccTxSnapshotRequest;
import
org.apache.ignite.internal.processors.cache.transactions.IgniteTxAdapter;
-import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.mxbean.TransactionsMXBean;
import org.apache.ignite.plugin.extensions.communication.Message;
@@ -65,7 +74,7 @@ import static
org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_REA
*
*/
@SystemPropertiesList(value = {
- @WithSystemProperty(key = IGNITE_LONG_TRANSACTION_TIME_DUMP_THRESHOLD,
value = "1000"),
+ @WithSystemProperty(key = IGNITE_LONG_TRANSACTION_TIME_DUMP_THRESHOLD,
value = "999"),
@WithSystemProperty(key =
IGNITE_TRANSACTION_TIME_DUMP_SAMPLES_COEFFICIENT, value = "1.0"),
@WithSystemProperty(key =
IGNITE_TRANSACTION_TIME_DUMP_SAMPLES_PER_SECOND_LIMIT, value = "5"),
@WithSystemProperty(key = IGNITE_LONG_OPERATIONS_DUMP_TIMEOUT, value =
"500")
@@ -87,50 +96,62 @@ public class GridTransactionsSystemUserTimeMetricsTest
extends GridCommonAbstrac
private static final long SYSTEM_DELAY = 1000;
/** */
- private static final int TX_COUNT_FOR_LOG_THROTTLING_CHECK = 4;
+ private static final long EPSILON = 300;
/** */
- private static final long LONG_TRAN_TIMEOUT = Math.min(SYSTEM_DELAY,
USER_DELAY);
-
- /** */
- private static final String TRANSACTION_TIME_DUMP_REGEX = ".*?ransaction
time dump .*";
+ private static final String TRANSACTION_TIME_DUMP_REGEX = ".*?ransaction
time dump .*?totalTime=[0-9]{1,4}, " +
+ "systemTime=[0-9]{1,4}, userTime=[0-9]{1,4},
cacheOperationsTime=[0-9]{1,4}.*";
/** */
private static final String ROLLBACK_TIME_DUMP_REGEX =
".*?Long transaction time dump
.*?cacheOperationsTime=[0-9]{1,4}.*?rollbackTime=[0-9]{1,4}.*";
/** */
- private static final String TRANSACTION_TIME_DUMPS_SKIPPED_REGEX =
- "Transaction time dumps skipped because of log throttling: " +
TX_COUNT_FOR_LOG_THROTTLING_CHECK / 2;
+ private LogListener logTxDumpLsnr = new
MessageOrderLogListener(TRANSACTION_TIME_DUMP_REGEX);
/** */
- private LogListener logTxDumpLsnr = new
MessageOrderLogListener(TRANSACTION_TIME_DUMP_REGEX);
+ private TransactionDumpListener transactionDumpLsnr = new
TransactionDumpListener(TRANSACTION_TIME_DUMP_REGEX);
/** */
- private final TransactionDumpListener transactionDumpLsnr = new
TransactionDumpListener(TRANSACTION_TIME_DUMP_REGEX);
+ private LogListener rollbackDumpLsnr = new
MessageOrderLogListener(ROLLBACK_TIME_DUMP_REGEX);
/** */
- private final TransactionDumpListener rollbackDumpLsnr = new
TransactionDumpListener(ROLLBACK_TIME_DUMP_REGEX);
+ private static CommonLogProxy testLog = new CommonLogProxy(null);
/** */
- private final TransactionDumpListener transactionDumpsSkippedLsnr =
- new TransactionDumpListener(TRANSACTION_TIME_DUMPS_SKIPPED_REGEX);
+ private final ListeningTestLogger listeningTestLog = new
ListeningTestLogger(false, log());
/** */
- private final ListeningTestLogger testLog = new ListeningTestLogger(false,
log());
+ private static IgniteLogger oldLog;
+
+ /** Flag which is set to true if we need to slow system time. */
+ private volatile boolean slowSystem;
+
+ /** Flag which is set to true if we need to simulate transaction failure.
*/
+ private volatile boolean simulateFailure;
+
+ /** */
+ private static boolean gridStarted = false;
/** */
- private volatile boolean slowPrepare;
+ private Ignite client;
+
+ /** */
+ private IgniteCache<Integer, Integer> cache;
+
+ /** */
+ private Callable<Object> txCallable = () -> {
+ Integer val = cache.get(1);
+
+ cache.put(1, val + 1);
+
+ return null;
+ };
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String
igniteInstanceName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
- testLog.registerListener(logTxDumpLsnr);
- testLog.registerListener(transactionDumpLsnr);
- testLog.registerListener(rollbackDumpLsnr);
- testLog.registerListener(transactionDumpsSkippedLsnr);
-
cfg.setGridLogger(testLog);
boolean isClient = igniteInstanceName.contains(CLIENT);
@@ -155,97 +176,325 @@ public class GridTransactionsSystemUserTimeMetricsTest
extends GridCommonAbstrac
}
/** */
- @Override protected void afterTest() throws Exception {
+ @Override protected void beforeTestsStarted() throws Exception {
+ super.beforeTestsStarted();
+
+ oldLog = GridTestUtils.getFieldValue(IgniteTxAdapter.class, "log");
+
+ GridTestUtils.setFieldValue(IgniteTxAdapter.class, "log", testLog);
+ }
+
+ /** */
+ @Override protected void afterTestsStopped() throws Exception {
+ GridTestUtils.setFieldValue(IgniteTxAdapter.class, "log", oldLog);
+
+ oldLog = null;
+
+ gridStarted = false;
+
stopAllGrids();
- super.afterTest();
+ super.afterTestsStopped();
}
/** */
- @Test
- public void testTransactionsSystemUserTime() throws Exception {
- Ignite ignite = startGrids(2);
+ @Override protected void beforeTest() throws Exception {
+ super.beforeTest();
- Ignite client = startGrid(CLIENT);
+ testLog.setImpl(listeningTestLog);
- IgniteLogger oldLog =
GridTestUtils.getFieldValue(IgniteTxAdapter.class, "log");
+ listeningTestLog.registerListener(logTxDumpLsnr);
+ listeningTestLog.registerListener(transactionDumpLsnr);
+ listeningTestLog.registerListener(rollbackDumpLsnr);
- GridTestUtils.setFieldValue(IgniteTxAdapter.class, "log", testLog);
+ if (!gridStarted) {
+ startGrids(2);
- try {
- assertTrue(client.configuration().isClientMode());
+ gridStarted = true;
+ }
- IgniteCache<Integer, Integer> cache =
client.getOrCreateCache(CACHE_NAME);
+ client = startGrid(CLIENT);
- cache.put(1, 1);
+ cache = client.getOrCreateCache(CACHE_NAME);
- Callable<Object> txCallable = () -> {
- Integer val = cache.get(1);
+ cache.put(1, 1);
- cache.put(1, val + 1);
+ applyJmxParameters(1000L, 0.0, 5);
+ }
- return null;
- };
+ /** */
+ @Override protected void afterTest() throws Exception {
+ stopGrid(CLIENT);
- DynamicMBean tranMBean = metricRegistry(CLIENT, null, TX_METRICS);
+ super.afterTest();
+ }
- //slow user
- slowPrepare = false;
+ /**
+ * Applies JMX parameters to client node in runtime. Parameters are
spreading through the cluster, so this method
+ * allows to change system/user time tracking without restarting the
cluster.
+ *
+ * @param threshold Long transaction time dump threshold.
+ * @param coefficient Transaction time dump samples coefficient.
+ * @param limit Transaction time dump samples per second limit.
+ * @return Transaction MX bean.
+ * @throws Exception If failed.
+ */
+ private TransactionsMXBean applyJmxParameters(Long threshold, Double
coefficient, Integer limit) throws Exception {
+ TransactionsMXBean tmMxBean = getMxBean(
+ CLIENT,
+ "Transactions",
+ TransactionsMXBean.class,
+ TransactionsMXBeanImpl.class
+ );
- doInTransaction(client, () -> {
- Integer val = cache.get(1);
+ if (threshold != null)
+ tmMxBean.setLongTransactionTimeDumpThreshold(threshold);
- doSleep(USER_DELAY);
+ if (coefficient != null)
+ tmMxBean.setTransactionTimeDumpSamplesCoefficient(coefficient);
- cache.put(1, val + 1);
+ if (limit != null)
+ tmMxBean.setTransactionTimeDumpSamplesPerSecondLimit(limit);
- return null;
+ return tmMxBean;
+ }
+
+ /**
+ * Allows to make N asynchronous transactions executing {@link
#txCallable} in separate thread pool,
+ * with given delay on user time for each transaction.
+ *
+ * @param client Client.
+ * @param txCnt Transactions count.
+ * @param userDelay User delay for each transaction.
+ */
+ private void doAsyncTransactions(Ignite client, int txCnt, long userDelay)
{
+ ExecutorService executorSrvc = Executors.newFixedThreadPool(txCnt);
+
+ for (int i = 0; i < txCnt; i++) {
+ executorSrvc.submit(() -> {
+ try {
+ doInTransaction(client, () -> {
+ doSleep(userDelay);
+
+ txCallable.call();
+
+ return null;
+ });
+ }
+ catch (Exception e) {
+ throw new RuntimeException(e);
+ }
});
+ }
- assertEquals(2, cache.get(1).intValue());
+ executorSrvc.shutdown();
+ }
- assertTrue((Long)tranMBean.getAttribute(METRIC_TOTAL_USER_TIME) >=
USER_DELAY);
- assertTrue((Long)tranMBean.getAttribute(METRIC_TOTAL_SYSTEM_TIME)
< LONG_TRAN_TIMEOUT);
+ /**
+ * Allows to run a transaction which executes {@link #txCallable} with
given system delay, user delay and
+ * mode.
+ *
+ * @param client Client.
+ * @param sysDelay System delay.
+ * @param userDelay User delay.
+ * @param mode Mode, see {@link TxTestMode}.
+ * @throws Exception If failed.
+ */
+ private void doTransaction(Ignite client, boolean sysDelay, boolean
userDelay, TxTestMode mode) throws Exception {
+ if (sysDelay)
+ slowSystem = true;
- try (Transaction tx = client.transactions().txStart(PESSIMISTIC,
REPEATABLE_READ)) {
- Integer val = cache.get(1);
+ if (mode == TxTestMode.FAIL)
+ simulateFailure = true;
+ try (Transaction tx = client.transactions().txStart(PESSIMISTIC,
REPEATABLE_READ)) {
+ if (userDelay)
doSleep(USER_DELAY);
- cache.put(1, val + 1);
+ txCallable.call();
+ if (mode == TxTestMode.ROLLBACK)
tx.rollback();
- }
+ else
+ tx.commit();
+ }
- assertEquals(2, cache.get(1).intValue());
+ slowSystem = false;
+ simulateFailure = false;
+ }
- assertTrue(rollbackDumpLsnr.check());
+ /**
+ * Allows to run a transaction which executes {@link #txCallable} with
given system delay, user delay and
+ * mode, also measures it's start time, completion time, gets MX bean with
metrics and gives it in a result.
+ *
+ * @param sysDelay System delay.
+ * @param userDelay User delay.
+ * @param mode Mode, see {@link TxTestMode}.
+ * @return Result, see {@link ClientTxTestResult}.
+ * @throws Exception If failed.
+ */
+ private ClientTxTestResult measureClientTransaction(boolean sysDelay,
boolean userDelay, TxTestMode mode) throws Exception {
+ logTxDumpLsnr.reset();
+ rollbackDumpLsnr.reset();
- //slow prepare
- slowPrepare = true;
+ long startTime = System.currentTimeMillis();
- doInTransaction(client, txCallable);
+ try {
+ doTransaction(client, sysDelay, userDelay, mode);
+ }
+ catch (Exception e) {
+ // Giving a time for transaction to rollback.
+ doSleep(500);
+ }
+
+ long completionTime = System.currentTimeMillis();
+
+ ClientTxTestResult res = new ClientTxTestResult(startTime,
completionTime, metricRegistry(CLIENT, null, TX_METRICS));
+
+ return res;
+ }
+
+ /**
+ * Checks that histogram long array is not null and is not empty.
+ *
+ * @param histogram Array.
+ * @param txCnt Total count of transactions, that should be presented in
histogram.
+ */
+ private void checkHistogram(long[] histogram, long txCnt) {
+ assertNotNull(histogram);
- assertTrue(logTxDumpLsnr.check());
+ long cnt = LongStream.of(histogram).sum();
- assertEquals(3, cache.get(1).intValue());
+ assertEquals("Must be " + txCnt + " transaction(s), actually were: " +
cnt + ". Histogram: " + histogram, txCnt, cnt);
+ }
+
+ /**
+ * Checks if metrics have correct values with given delay mode.
+ *
+ * @param res Should contains the result of transaction completion - start
time, completion time and MX bean
+ * from which metrics can be received.
+ * @param userDelayMode If true, we are checking metrics after transaction
with user delay. Otherwise,
+ * we are checking metrics after transaction with system delay.
+ * @throws MBeanException If getting of metric attribute failed.
+ * @throws AttributeNotFoundException If getting of metric attribute
failed.
+ * @throws ReflectionException If getting of metric attribute failed.
+ */
+ private void checkTxDelays(ClientTxTestResult res, boolean userDelayMode)
+ throws MBeanException, AttributeNotFoundException, ReflectionException
{
+ long userTime = (Long)res.mBean.getAttribute(METRIC_TOTAL_USER_TIME);
+ long sysTime = (Long)res.mBean.getAttribute(METRIC_TOTAL_SYSTEM_TIME);
+
+ if (userDelayMode) {
+ assertTrue(userTime >= USER_DELAY);
+ assertTrue(userTime < res.completionTime - res.startTime - sysTime
+ EPSILON);
+ assertTrue(sysTime >= 0);
+ assertTrue(sysTime < EPSILON);
+ }
+ else {
+ assertTrue(userTime >= 0);
+ assertTrue(userTime < EPSILON);
+ assertTrue(sysTime >= SYSTEM_DELAY);
+ assertTrue(sysTime < res.completionTime - res.startTime - userTime
+ EPSILON);
+ }
+
+
checkHistogram((long[])res.mBean.getAttribute(METRIC_SYSTEM_TIME_HISTOGRAM), 2);
+
checkHistogram((long[])res.mBean.getAttribute(METRIC_USER_TIME_HISTOGRAM), 2);
+ }
+
+ /**
+ * Test user time and system time with user delay on committed transaction.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testUserDelayOnCommittedTx() throws Exception {
+ ClientTxTestResult res = measureClientTransaction(false, true,
TxTestMode.COMMIT);
+
+ assertTrue(logTxDumpLsnr.check());
+
+ checkTxDelays(res, true);
+ }
- assertTrue((Long)tranMBean.getAttribute(METRIC_TOTAL_SYSTEM_TIME)
>= SYSTEM_DELAY);
+ /**
+ * Test user time and system time with user delay on rolled back
transaction.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testUserDelayOnRolledBackTx() throws Exception {
+ ClientTxTestResult res = measureClientTransaction(false, true,
TxTestMode.ROLLBACK);
- long[] sysTimeHisto =
(long[])tranMBean.getAttribute(METRIC_SYSTEM_TIME_HISTOGRAM);
- long[] userTimeHisto =
(long[])tranMBean.getAttribute(METRIC_USER_TIME_HISTOGRAM);
+ assertTrue(rollbackDumpLsnr.check());
- assertNotNull(sysTimeHisto);
- assertNotNull(userTimeHisto);
+ checkTxDelays(res, true);
+ }
- assertTrue(sysTimeHisto != null && sysTimeHisto.length > 0);
- assertTrue(userTimeHisto != null && userTimeHisto.length > 0);
+ /**
+ * Test user time and system time with user delay on failed transaction.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testUserDelayOnFailedTx() throws Exception {
+ ClientTxTestResult res = measureClientTransaction(false, true,
TxTestMode.FAIL);
- logTxDumpLsnr.reset();
+ assertTrue(rollbackDumpLsnr.check());
- //checking settings changing via JMX with second client
- Ignite client2 = startGrid(CLIENT_2);
+ checkTxDelays(res, true);
+ }
+
+ /**
+ * Test user time and system time with system delay on committed
transaction.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testSystemDelayOnCommittedTx() throws Exception {
+ ClientTxTestResult res = measureClientTransaction(true, false,
TxTestMode.COMMIT);
+ assertTrue(logTxDumpLsnr.check());
+
+ checkTxDelays(res, false);
+ }
+
+ /**
+ * Test user time and system time with system delay on rolled back
transaction.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testSystemDelayOnRolledBackTx() throws Exception {
+ ClientTxTestResult res = measureClientTransaction(true, false,
TxTestMode.ROLLBACK);
+
+ assertTrue(rollbackDumpLsnr.check());
+
+ checkTxDelays(res, false);
+ }
+
+ /**
+ * Test user time and system time with system delay on failed transaction.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testSystemDelayOnFailedTx() throws Exception {
+ ClientTxTestResult res = measureClientTransaction(true, false,
TxTestMode.FAIL);
+
+ assertTrue(rollbackDumpLsnr.check());
+
+ checkTxDelays(res, false);
+ }
+
+ /**
+ * Test that changing of JMX parameters spreads on cluster correctly.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testJmxParametersSpreading() throws Exception {
+ startGrid(CLIENT_2);
+
+ try {
TransactionsMXBean tmMxBean = getMxBean(
CLIENT,
"Transactions",
@@ -253,50 +502,216 @@ public class GridTransactionsSystemUserTimeMetricsTest
extends GridCommonAbstrac
TransactionsMXBeanImpl.class
);
- tmMxBean.setLongTransactionTimeDumpThreshold(0);
- tmMxBean.setTransactionTimeDumpSamplesCoefficient(0.0);
+ TransactionsMXBean tmMxBean2 = getMxBean(
+ CLIENT_2,
+ "Transactions",
+ TransactionsMXBean.class,
+ TransactionsMXBeanImpl.class
+ );
- doInTransaction(client2, txCallable);
+ int oldLimit =
tmMxBean.getTransactionTimeDumpSamplesPerSecondLimit();
+ long oldThreshold = tmMxBean.getLongTransactionTimeDumpThreshold();
+ double oldCoefficient =
tmMxBean.getTransactionTimeDumpSamplesCoefficient();
- assertFalse(logTxDumpLsnr.check());
+ try {
+ int newLimit = 1234;
+ long newThreshold = 99999;
+ double newCoefficient = 0.01;
- //testing dumps limit
+ tmMxBean.setTransactionTimeDumpSamplesPerSecondLimit(newLimit);
+ tmMxBean2.setLongTransactionTimeDumpThreshold(newThreshold);
+
tmMxBean.setTransactionTimeDumpSamplesCoefficient(newCoefficient);
- doSleep(1000);
+ assertEquals(newLimit,
tmMxBean2.getTransactionTimeDumpSamplesPerSecondLimit());
+ assertEquals(newThreshold,
tmMxBean.getLongTransactionTimeDumpThreshold());
+
assertTrue(tmMxBean2.getTransactionTimeDumpSamplesCoefficient() -
newCoefficient < 0.0001);
+ }
+ finally {
+ tmMxBean.setTransactionTimeDumpSamplesPerSecondLimit(oldLimit);
+ tmMxBean.setLongTransactionTimeDumpThreshold(oldThreshold);
+
tmMxBean.setTransactionTimeDumpSamplesCoefficient(oldCoefficient);
+ }
+ }
+ finally {
+ // CLIENT grid is stopped in afterTest.
+ stopGrid(CLIENT_2);
+ }
+ }
- transactionDumpLsnr.reset();
+ /**
+ * Tests that tx time dumps appear in log correctly and after tx
completion. Also checks that LRT dump
+ * now contains information about current system and user time.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testLongTransactionDumpLimit() throws Exception {
+ logTxDumpLsnr.reset();
+ transactionDumpLsnr.reset();
+
+ int txCnt = 10;
+
+ List<String> txLogLines = new LinkedList<>();
+
+ txLogLines.add("First 10 long running transactions \\[total=" + txCnt
+ "\\]");
+
+ for (int i = 0; i < txCnt; i++)
+ txLogLines.add(".*?>>> Transaction .*? systemTime=[0-4]{1,4},
userTime=[0-4]{1,4}.*");
+
+ LogListener lrtLogLsnr = new
MessageOrderLogListener(txLogLines.toArray(new String[0]));
- transactionDumpsSkippedLsnr.reset();
+ listeningTestLog.registerListener(lrtLogLsnr);
- tmMxBean.setTransactionTimeDumpSamplesCoefficient(1.0);
+ applyJmxParameters(5000L, null, txCnt);
-
tmMxBean.setTransactionTimeDumpSamplesPerSecondLimit(TX_COUNT_FOR_LOG_THROTTLING_CHECK
/ 2);
+ doAsyncTransactions(client, txCnt, 5200);
- slowPrepare = false;
+ doSleep(3000);
- for (int i = 0; i < TX_COUNT_FOR_LOG_THROTTLING_CHECK; i++)
- doInTransaction(client, txCallable);
+ assertFalse(logTxDumpLsnr.check());
+
+ doSleep(3000);
+
+ assertTrue(logTxDumpLsnr.check());
+ assertTrue(transactionDumpLsnr.check());
+ assertTrue(lrtLogLsnr.check());
+
+ assertEquals(txCnt, transactionDumpLsnr.value());
+ }
+
+ /**
+ * Tests transactions sampling with dumping 100% of transactions in log.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testSamplingCoefficient() throws Exception {
+ logTxDumpLsnr.reset();
+ transactionDumpLsnr.reset();
- assertEquals(TX_COUNT_FOR_LOG_THROTTLING_CHECK / 2,
transactionDumpLsnr.value());
+ int txCnt = 10;
- //testing skipped message in log
+ applyJmxParameters(null, 1.0, txCnt);
- doSleep(1000);
+ // Wait for a second to reset hit counter.
+ doSleep(1000);
+ for (int i = 0; i < txCnt; i++)
doInTransaction(client, txCallable);
- assertTrue(transactionDumpsSkippedLsnr.check());
+ assertTrue(logTxDumpLsnr.check());
+ assertTrue(transactionDumpLsnr.check());
- U.log(log, sysTimeHisto);
- U.log(log, userTimeHisto);
- }
- finally {
- GridTestUtils.setFieldValue(IgniteTxAdapter.class, "log", oldLog);
- }
+ assertEquals(txCnt, transactionDumpLsnr.value());
}
/**
+ * Tests transactions sampling with dumping 0% of transactions in log.
*
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testNoSamplingCoefficient() throws Exception {
+ logTxDumpLsnr.reset();
+
+ applyJmxParameters(null, 0.0, 10);
+
+ int txCnt = 10;
+
+ for (int i = 0; i < txCnt; i++)
+ doInTransaction(client, txCallable);
+
+ assertFalse(logTxDumpLsnr.check());
+ }
+
+ /**
+ * Tests transactions sampling with dumping 100% of transactions in log
but limited by 2 dump records per second.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testSamplingLimit() throws Exception {
+ logTxDumpLsnr.reset();
+ transactionDumpLsnr.reset();
+
+ int txCnt = 10;
+ int txDumpCnt = 2;
+
+ LogListener transactionDumpsSkippedLsnr = LogListener
+ .matches("Transaction time dumps skipped because of log
throttling: " + (txCnt - txDumpCnt))
+ .build();
+
+ listeningTestLog.registerListener(transactionDumpsSkippedLsnr);
+
+ applyJmxParameters(null, 1.0, txDumpCnt);
+
+ // Wait for a second to reset hit counter.
+ doSleep(1000);
+
+ for (int i = 0; i < txCnt; i++)
+ doInTransaction(client, txCallable);
+
+ // Wait for a second to reset hit counter.
+ doSleep(1000);
+
+ // One more sample to print information about skipped previous samples.
+ doInTransaction(client, txCallable);
+
+ assertTrue(logTxDumpLsnr.check());
+ assertTrue(transactionDumpLsnr.check());
+ assertTrue(transactionDumpsSkippedLsnr.check());
+
+ assertEquals(txDumpCnt + 1, transactionDumpLsnr.value());
+ }
+
+ /**
+ * Tests transactions sampling with dumping 100% of transactions in log
and no threshold timeout.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testSamplingNoThreshold() throws Exception {
+ logTxDumpLsnr.reset();
+ transactionDumpLsnr.reset();
+
+ int txCnt = 10;
+
+ applyJmxParameters(0L, 1.0, txCnt);
+
+ // Wait for a second to reset hit counter.
+ doSleep(1000);
+
+ for (int i = 0; i < txCnt; i++)
+ doInTransaction(client, txCallable);
+
+ assertTrue(logTxDumpLsnr.check());
+ assertTrue(transactionDumpLsnr.check());
+
+ assertEquals(txCnt, transactionDumpLsnr.value());
+ }
+
+ /**
+ * Tests transactions sampling with dumping 100% of transactions in log,
no threshold timeout but with limit of 5
+ * transactions per second.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testSamplingNoThresholdWithLimit() throws Exception {
+ logTxDumpLsnr.reset();
+
+ int txCnt = 10;
+
+ applyJmxParameters(0L, 0.0, 5);
+
+ for (int i = 0; i < txCnt; i++)
+ doInTransaction(client, txCallable);
+
+ assertFalse(logTxDumpLsnr.check());
+ }
+
+ /**
+ * Test communication SPI, allowing to simulate system delay on lock and
transaction failure on prepare.
*/
private class TestCommunicationSpi extends TcpCommunicationSpi {
/** {@inheritDoc} */
@@ -305,8 +720,21 @@ public class GridTransactionsSystemUserTimeMetricsTest
extends GridCommonAbstrac
if (msg instanceof GridIoMessage) {
Object msg0 = ((GridIoMessage)msg).message();
- if (slowPrepare && msg0 instanceof GridNearTxPrepareRequest)
- doSleep(SYSTEM_DELAY);
+ if (msg0 instanceof GridNearLockRequest || msg0 instanceof
MvccTxSnapshotRequest) {
+ if (slowSystem) {
+ slowSystem = false;
+
+ doSleep(SYSTEM_DELAY);
+ }
+ }
+
+ if (msg0 instanceof GridNearTxPrepareRequest) {
+ if (simulateFailure) {
+ simulateFailure = false;
+
+ throw new RuntimeException("Simulating prepare
failure.");
+ }
+ }
}
super.sendMessage(node, msg, ackClosure);
@@ -349,4 +777,110 @@ public class GridTransactionsSystemUserTimeMetricsTest
extends GridCommonAbstrac
return counter.get();
}
}
+
+ /**
+ * Enum to define transaction test mode.
+ */
+ enum TxTestMode {
+ /** If transaction should be committed. */
+ COMMIT,
+
+ /** If transaction should be rolled back. */
+ ROLLBACK,
+
+ /** If transaction should fail. */
+ FAIL
+ }
+
+ /**
+ * Result of running of a test transaction.
+ */
+ private static class ClientTxTestResult {
+ /** Start time. */
+ final long startTime;
+
+ /** Completion time. */
+ final long completionTime;
+
+ /** MX bean to receive metrics. */
+ final DynamicMBean mBean;
+
+ /** */
+ public ClientTxTestResult(long startTime, long completionTime,
DynamicMBean mBean) {
+ this.startTime = startTime;
+ this.completionTime = completionTime;
+ this.mBean = mBean;
+ }
+ }
+
+ /** */
+ private static class CommonLogProxy implements IgniteLogger {
+ /** */
+ private IgniteLogger impl;
+
+ /** */
+ public CommonLogProxy(IgniteLogger impl) {
+ this.impl = impl;
+ }
+
+ /** */
+ public void setImpl(IgniteLogger impl) {
+ this.impl = impl;
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteLogger getLogger(Object ctgr) {
+ return impl.getLogger(ctgr);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void trace(String msg) {
+ impl.trace(msg);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void debug(String msg) {
+ impl.debug(msg);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void info(String msg) {
+ impl.info(msg);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void warning(String msg, Throwable e) {
+ impl.warning(msg, e);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void error(String msg, Throwable e) {
+ impl.error(msg, e);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean isTraceEnabled() {
+ return impl.isTraceEnabled();
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean isDebugEnabled() {
+ return impl.isDebugEnabled();
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean isInfoEnabled() {
+ return impl.isInfoEnabled();
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean isQuiet() {
+ return impl.isQuiet();
+ }
+
+ /** {@inheritDoc} */
+ @Override public String fileName() {
+ return impl.fileName();
+ }
+ }
}