IgniteDiagnosticMessage: fixed TxEntriesInfoClosure, added tests.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/7e9457e7 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/7e9457e7 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/7e9457e7 Branch: refs/heads/ignite-2.1.2-exchange Commit: 7e9457e71200bfe6218f7805328d45a42918da92 Parents: b16f725 Author: Igor Seliverstov <[email protected]> Authored: Wed Jun 21 17:02:29 2017 +0300 Committer: sboikov <[email protected]> Committed: Wed Jun 21 17:02:29 2017 +0300 ---------------------------------------------------------------------- .../internal/IgniteDiagnosticMessage.java | 19 +- .../managers/IgniteDiagnosticMessagesTest.java | 288 +++++++++++++++++++ .../ignite/testframework/GridStringLogger.java | 35 ++- 3 files changed, 333 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/7e9457e7/modules/core/src/main/java/org/apache/ignite/internal/IgniteDiagnosticMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteDiagnosticMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteDiagnosticMessage.java index 4f37f53..8715e22 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteDiagnosticMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteDiagnosticMessage.java @@ -17,9 +17,11 @@ package org.apache.ignite.internal; +import java.io.IOException; import java.nio.ByteBuffer; import java.text.DateFormat; import java.text.SimpleDateFormat; +import java.util.ArrayList; import java.util.Collection; import java.util.Date; import java.util.HashSet; @@ -265,7 +267,7 @@ public class IgniteDiagnosticMessage implements Message { private final int cacheId; /** */ - private final Set<KeyCacheObject> keys; + private Collection<KeyCacheObject> keys; /** * @param cacheId Cache ID. @@ -321,6 +323,21 @@ public class IgniteDiagnosticMessage implements Message { this.keys.addAll(other0.keys); } + + /** + * @param out Output stream. + * @throws IOException If failed. + */ + private void writeObject(java.io.ObjectOutputStream out) + throws IOException { + /* + Transform to List, otherwise Set unmarshalling fails since need + call KeyCacheObject.finishUnmarshal before adding in Set. + */ + this.keys = new ArrayList<>(keys); + + out.defaultWriteObject(); + } } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/7e9457e7/modules/core/src/test/java/org/apache/ignite/internal/managers/IgniteDiagnosticMessagesTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/IgniteDiagnosticMessagesTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/IgniteDiagnosticMessagesTest.java index 08dbc66..1d1b519 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/managers/IgniteDiagnosticMessagesTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/IgniteDiagnosticMessagesTest.java @@ -17,20 +17,29 @@ package org.apache.ignite.internal.managers; +import java.util.List; import java.util.UUID; import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.configuration.NearCacheConfiguration; import org.apache.ignite.internal.IgniteDiagnosticPrepareContext; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteKernal; import org.apache.ignite.internal.TestRecordingCommunicationSpi; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareResponse; +import org.apache.ignite.internal.processors.cache.distributed.near.GridNearLockResponse; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetResponse; +import org.apache.ignite.internal.util.future.GridCompoundFuture; import org.apache.ignite.internal.util.future.GridFutureAdapter; +import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; @@ -40,12 +49,15 @@ import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; import org.apache.ignite.testframework.GridStringLogger; import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.apache.ignite.transactions.Transaction; import static org.apache.ignite.IgniteSystemProperties.IGNITE_LONG_OPERATIONS_DUMP_TIMEOUT; import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; import static org.apache.ignite.cache.CacheMode.PARTITIONED; import static org.apache.ignite.cache.CacheMode.REPLICATED; import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; +import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC; +import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ; /** * @@ -178,6 +190,282 @@ public class IgniteDiagnosticMessagesTest extends GridCommonAbstractTest { /** * @throws Exception If failed. */ + public void testSeveralLongRunningTxs() throws Exception { + int timeout = 3500; + + System.setProperty(IGNITE_LONG_OPERATIONS_DUMP_TIMEOUT, String.valueOf(timeout)); + + try { + testSpi = true; + + startGrid(0); + + GridStringLogger strLog = this.strLog = new GridStringLogger(); + + strLog.logLength(1024 * 100); + + startGrid(1); + + awaitPartitionMapExchange(); + + CacheConfiguration ccfg = new CacheConfiguration(DEFAULT_CACHE_NAME); + + ccfg.setWriteSynchronizationMode(FULL_SYNC); + ccfg.setCacheMode(PARTITIONED); + ccfg.setAtomicityMode(TRANSACTIONAL); + + final Ignite node0 = ignite(0); + final Ignite node1 = ignite(1); + + node0.createCache(ccfg); + + UUID id0 = node0.cluster().localNode().id(); + + TestRecordingCommunicationSpi.spi(node0).blockMessages(GridNearLockResponse.class, node1.name()); + + IgniteCache<Object, Object> cache = node0.cache(DEFAULT_CACHE_NAME); + + int txCnt = 4; + + final List<Integer> keys = primaryKeys(cache, txCnt, 0); + + final AtomicInteger idx = new AtomicInteger(); + + IgniteInternalFuture<Long> fut = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() { + @Override public Void call() throws Exception { + try (Transaction tx = node1.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { + IgniteCache<Object, Object> cache = node1.cache(DEFAULT_CACHE_NAME); + + Integer key = keys.get(idx.getAndIncrement() % keys.size()); + + cache.putIfAbsent(key, String.valueOf(key)); + + tx.commit(); + } + + return null; + } + }, txCnt * 2, "tx"); + + U.sleep(timeout * 2); + + assertFalse(fut.isDone()); + + TestRecordingCommunicationSpi.spi(node0).stopBlock(); + + fut.get(); + + String log = strLog.toString(); + + assertTrue(log.contains("Cache entries [cacheId=" + CU.cacheId(DEFAULT_CACHE_NAME) + + ", cacheName=" + DEFAULT_CACHE_NAME + "]:")); + assertTrue(countTxKeysInASingleBlock(log) == txCnt); + + assertTrue(log.contains("General node info [id=" + id0)); + } + finally { + System.clearProperty(IGNITE_LONG_OPERATIONS_DUMP_TIMEOUT); + } + } + + /** + * @param log Log. + * @return Count of keys in the first Cache entries block. + */ + private int countTxKeysInASingleBlock(String log) { + int idx1 = log.indexOf("Cache entries"); + int idx2 = log.indexOf("Local communication statistics"); + + assert idx1 != -1 && idx2 != -1; + + // The first cache entries info block. + String txInfo = log.substring(idx1, idx2); + + String srch = " Key ["; // Search string. + int len = 9; // Search string length. + + int idx0, cnt = 0; + + while ((idx0 = txInfo.indexOf(srch) + len) >= len) { + txInfo = txInfo.substring(idx0); + + cnt++; + } + + return cnt; + } + + /** + * @throws Exception If failed. + */ + public void testLongRunningTx() throws Exception { + System.setProperty(IGNITE_LONG_OPERATIONS_DUMP_TIMEOUT, "3500"); + + try { + startGrid(0); + + GridStringLogger strLog = this.strLog = new GridStringLogger(); + + startGrid(1); + + awaitPartitionMapExchange(); + + CacheConfiguration ccfg = new CacheConfiguration(DEFAULT_CACHE_NAME); + + ccfg.setWriteSynchronizationMode(FULL_SYNC); + ccfg.setCacheMode(PARTITIONED); + ccfg.setAtomicityMode(TRANSACTIONAL); + + final Ignite node0 = ignite(0); + final Ignite node1 = ignite(1); + + node0.createCache(ccfg); + + UUID id0 = node0.cluster().localNode().id(); + + final CountDownLatch l1 = new CountDownLatch(1); + final CountDownLatch l2 = new CountDownLatch(1); + + final AtomicReference<Integer> key = new AtomicReference<>(); + + GridCompoundFuture<Void, Void> fut = new GridCompoundFuture<>(); + + fut.add(GridTestUtils.runAsync(new Callable<Void>() { + @Override public Void call() throws Exception { + try (Transaction tx = node0.transactions().txStart()) { + IgniteCache<Object, Object> cache = node0.cache(DEFAULT_CACHE_NAME); + + key.set(primaryKey(cache)); + + cache.putIfAbsent(key.get(), "dummy val"); + + l1.countDown(); + l2.await(); + + tx.commit(); + } + + return null; + } + }, "tx-1")); + + fut.add(GridTestUtils.runAsync(new Callable<Void>() { + @Override public Void call() throws Exception { + try (Transaction tx = node1.transactions().txStart()) { + l1.await(); + + IgniteCache<Object, Object> cache = node1.cache(DEFAULT_CACHE_NAME); + + cache.replace(key.get(), "dummy val2"); + + tx.commit(); + } + + return null; + } + }, "tx-2")); + + fut.markInitialized(); + + U.sleep(10_000); + + assertFalse(fut.isDone()); + + l2.countDown(); + + fut.get(); + + String log = strLog.toString(); + + assertTrue(log.contains("Cache entries [cacheId=" + CU.cacheId(DEFAULT_CACHE_NAME) + ", cacheName=" + DEFAULT_CACHE_NAME + "]:")); + assertTrue(log.contains("General node info [id=" + id0)); + } + finally { + System.clearProperty(IGNITE_LONG_OPERATIONS_DUMP_TIMEOUT); + } + } + + /** + * @throws Exception If failed. + */ + public void testRemoteTx() throws Exception { + int timeout = 3500; + + System.setProperty(IGNITE_LONG_OPERATIONS_DUMP_TIMEOUT, String.valueOf(timeout)); + + try { + testSpi = true; + + startGrid(0); + + GridStringLogger strLog = this.strLog = new GridStringLogger(); + + strLog.logLength(1024 * 100); + + startGrid(1); + + awaitPartitionMapExchange(); + + CacheConfiguration ccfg = new CacheConfiguration(DEFAULT_CACHE_NAME); + + ccfg.setWriteSynchronizationMode(FULL_SYNC); + ccfg.setCacheMode(PARTITIONED); + ccfg.setAtomicityMode(TRANSACTIONAL); + ccfg.setBackups(1); + ccfg.setNearConfiguration(new NearCacheConfiguration()); + + final Ignite node0 = ignite(0); + final Ignite node1 = ignite(1); + + node0.createCache(ccfg); + + UUID id0 = node0.cluster().localNode().id(); + + TestRecordingCommunicationSpi.spi(node0).blockMessages(GridDhtTxPrepareResponse.class, node1.name()); + + int txCnt = 4; + + final List<Integer> keys = primaryKeys(node1.cache(DEFAULT_CACHE_NAME), txCnt, 0); + + final AtomicInteger idx = new AtomicInteger(); + + IgniteInternalFuture<Long> fut = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() { + @Override public Void call() throws Exception { + try (Transaction tx = node1.transactions().txStart()) { + IgniteCache<Object, Object> cache = node1.cache(DEFAULT_CACHE_NAME); + + Integer key = keys.get(idx.getAndIncrement()); + + cache.getAndPut(key, "new-" + key); + + tx.commit(); + } + + return null; + } + }, txCnt, "tx"); + + U.sleep(timeout * 2); + + assertFalse(fut.isDone()); + + TestRecordingCommunicationSpi.spi(node0).stopBlock(); + + fut.get(); + + String log = strLog.toString(); + + assertTrue(log.contains("Related transactions [")); + assertTrue(log.contains("General node info [id=" + id0)); + } + finally { + System.clearProperty(IGNITE_LONG_OPERATIONS_DUMP_TIMEOUT); + } + } + + /** + * @throws Exception If failed. + */ private void checkBasicDiagnosticInfo() throws Exception { startGrids(3); http://git-wip-us.apache.org/repos/asf/ignite/blob/7e9457e7/modules/core/src/test/java/org/apache/ignite/testframework/GridStringLogger.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/GridStringLogger.java b/modules/core/src/test/java/org/apache/ignite/testframework/GridStringLogger.java index 2a25542..ba85bf0 100644 --- a/modules/core/src/test/java/org/apache/ignite/testframework/GridStringLogger.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/GridStringLogger.java @@ -38,6 +38,9 @@ public class GridStringLogger implements IgniteLogger { private final boolean dbg; /** */ + private volatile int chars = CHAR_CNT; + + /** */ private final IgniteLogger echo; /** @@ -64,22 +67,38 @@ public class GridStringLogger implements IgniteLogger { } /** + * @param chars History buffer length. + */ + public void logLength(int chars) { + this.chars = chars; + } + + /** + * @return History buffer length. + */ + private int logLength() { + return chars; + } + + /** * @param msg Message to log. */ - private void log(String msg) { + private synchronized void log(String msg) { buf.append(msg).append(U.nl()); if (echo != null) echo.info("[GridStringLogger echo] " + msg); - if (buf.length() > CHAR_CNT) { + int logLength = logLength(); + + if (buf.length() > logLength) { if (echo != null) echo.warning("Cleaning GridStringLogger history."); - buf.delete(0, buf.length() - CHAR_CNT); + buf.delete(0, buf.length() - logLength); } - assert buf.length() <= CHAR_CNT; + assert buf.length() <= logLength; } /** {@inheritDoc} */ @@ -108,7 +127,7 @@ public class GridStringLogger implements IgniteLogger { } /** {@inheritDoc} */ - @Override public void warning(String msg, @Nullable Throwable e) { + @Override public synchronized void warning(String msg, @Nullable Throwable e) { log(msg); if (e != null) @@ -121,7 +140,7 @@ public class GridStringLogger implements IgniteLogger { } /** {@inheritDoc} */ - @Override public void error(String msg, @Nullable Throwable e) { + @Override public synchronized void error(String msg, @Nullable Throwable e) { log(msg); if (e != null) @@ -156,12 +175,12 @@ public class GridStringLogger implements IgniteLogger { /** * Resets logger. */ - public void reset() { + public synchronized void reset() { buf.setLength(0); } /** {@inheritDoc} */ - @Override public String toString() { + @Override public synchronized String toString() { return buf.toString(); } } \ No newline at end of file
