Repository: ignite
Updated Branches:
  refs/heads/ignite-3478 40d435980 -> 9ae39c402
ignite-3478 Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/9ae39c40 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/9ae39c40 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/9ae39c40 Branch: refs/heads/ignite-3478 Commit: 9ae39c402f089bbea043af4d0bca97ffc9c335a0 Parents: 40d4359 Author: sboikov <[email protected]> Authored: Fri Sep 22 16:15:30 2017 +0300 Committer: sboikov <[email protected]> Committed: Fri Sep 22 16:15:30 2017 +0300 ---------------------------------------------------------------------- .../mvcc/CacheCoordinatorsSharedManager.java | 6 +- .../cache/mvcc/CacheMvccTransactionsTest.java | 88 ++++++++++++++------ 2 files changed, 65 insertions(+), 29 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/9ae39c40/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java index 34a15b1..0050659 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java @@ -64,7 +64,7 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager public static final long COUNTER_NA = 0L; /** */ - public static final boolean STAT_CNTRS = true; + private static final boolean STAT_CNTRS = false; /** */ private static final GridTopic MSG_TOPIC = TOPIC_CACHE_COORDINATOR; @@ -489,10 +489,10 @@ public class CacheCoordinatorsSharedManager<K, V> extends 
GridCacheSharedManager assert old == null : txId; - long cleanupVer = Long.MAX_VALUE; + long cleanupVer = committedCntr.get() - 1; for (Long qryVer : activeQueries.keySet()) { - if (qryVer < cleanupVer) + if (qryVer <= cleanupVer) cleanupVer = qryVer - 1; } http://git-wip-us.apache.org/repos/asf/ignite/blob/9ae39c40/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java index e5eb0ee..1c37171 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java @@ -50,6 +50,7 @@ import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetR import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetResponse; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFinishRequest; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFinishResponse; +import org.apache.ignite.internal.util.lang.GridAbsPredicate; import org.apache.ignite.internal.util.lang.GridInClosure3; import org.apache.ignite.internal.util.typedef.CI1; import org.apache.ignite.internal.util.typedef.F; @@ -242,29 +243,42 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { /** * @throws Exception If failed. 
*/ - public void testGetAll1() throws Exception { + public void testActiveQueriesCleanup() throws Exception { startGridsMultiThreaded(SRVS); - try { - client = true; + client = true; - Ignite ignite = startGrid(SRVS); + Ignite srv0 = startGrid(SRVS); - CacheConfiguration ccfg = cacheConfiguration(PARTITIONED, FULL_SYNC, 1, 512); + final int NODES = SRVS + 1; - IgniteCache<Integer, Integer> cache = ignite.createCache(ccfg); + CacheConfiguration ccfg = cacheConfiguration(PARTITIONED, FULL_SYNC, 1, 512); - Set<Integer> keys = new HashSet<>(); + srv0.createCache(ccfg); - keys.addAll(primaryKeys(ignite(0).cache(ccfg.getName()), 2)); + final long stopTime = System.currentTimeMillis() + 5000; - Map<Integer, Integer> res = cache.getAll(keys); + GridTestUtils.runMultiThreaded(new IgniteInClosure<Integer>() { + @Override public void apply(Integer idx) { + ThreadLocalRandom rnd = ThreadLocalRandom.current(); - verifyCoordinatorInternalState(); - } - finally { - stopAllGrids(); - } + IgniteCache cache = ignite(idx % NODES).cache(DEFAULT_CACHE_NAME); + + while (System.currentTimeMillis() < stopTime) { + int keyCnt = rnd.nextInt(10) + 1; + + Set<Integer> keys = new HashSet<>(); + + for (int i = 0; i < keyCnt; i++) + keys.add(rnd.nextInt()); + + cache.getAll(keys); + } + } + }, NODES * 2, "get-thread"); + + for (Ignite node : G.allGrids()) + checkActiveQueriesCleanup(node); } /** @@ -811,6 +825,18 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { * @throws Exception If failed. */ public void testCleanupWaitsForGet3() throws Exception { + for (int i = 0; i < 4; i++) { + cleanupWaitsForGet3(i + 1); + + afterTest(); + } + } + + /** + * @param updates Number of updates. + * @throws Exception If failed. 
+ */ + private void cleanupWaitsForGet3(int updates) throws Exception { /* Simulate case when coordinator assigned query version has active transaction, query is delayed, after this active transaction finish and the same key is @@ -833,7 +859,7 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { final Integer key1 = 1; final Integer key2 = 2; - for (int i = 0; i < 3; i++) { + for (int i = 0; i < updates; i++) { try (Transaction tx = client.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { cache.put(key1, i); cache.put(key2, i); @@ -874,7 +900,7 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { clientSpi.waitForBlocked(); - for (int i = 0; i < 3; i++) { + for (int i = 0; i < updates; i++) { try (Transaction tx = client.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { cache.put(key1, i + 3); @@ -913,7 +939,7 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { putFut.get(); - for (int i = 0; i < 3; i++) { + for (int i = 0; i < updates; i++) { try (Transaction tx = client.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { cache.put(key2, i + 4); @@ -2082,19 +2108,29 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { assertTrue(ackFuts.isEmpty()); // TODO IGNITE-3478 -// assertTrue(GridTestUtils.waitForCondition( -// new GridAbsPredicate() { -// @Override public boolean apply() { -// Map activeQrys = GridTestUtils.getFieldValue(crd, "activeQueries"); -// -// return activeQrys.isEmpty(); -// } -// }, 5000) -// ); + // checkActiveQueriesCleanup(node); } } /** + * @param node Node. + * @throws Exception If failed. 
+ */ + private void checkActiveQueriesCleanup(Ignite node) throws Exception { + final CacheCoordinatorsSharedManager crd = ((IgniteKernal)node).context().cache().context().coordinators(); + + assertTrue(GridTestUtils.waitForCondition( + new GridAbsPredicate() { + @Override public boolean apply() { + Map activeQrys = GridTestUtils.getFieldValue(crd, "activeQueries"); + + return activeQrys.isEmpty(); + } + }, 5000) + ); + } + + /** * @param caches Caches. * @param rnd Random. * @return Random cache.
