Repository: ignite Updated Branches: refs/heads/ignite-6149 9d90972df -> 7a9943265
ignite-6149 Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/7a994326 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/7a994326 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/7a994326 Branch: refs/heads/ignite-6149 Commit: 7a99432655307acf7c5e190e20952322f68e0e4f Parents: 9d90972 Author: sboikov <[email protected]> Authored: Wed Sep 13 12:44:29 2017 +0300 Committer: sboikov <[email protected]> Committed: Wed Sep 13 13:40:25 2017 +0300 ---------------------------------------------------------------------- .../cache/IgniteCacheOffheapManagerImpl.java | 21 ++-- .../cache/mvcc/CacheMvccTransactionsTest.java | 109 +++++++++++++++++++ 2 files changed, 121 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/7a994326/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java index 3e699ac..a507985 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java @@ -1375,6 +1375,8 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager while (cur.next()) { CacheDataRow oldVal = cur.get(); + assert oldVal.link() != 0 : oldVal; + if (activeTxs != null && oldVal.mvccCoordinatorVersion() == mvccVer.coordinatorVersion() && activeTxs.contains(oldVal.mvccCounter())) { if (waitTxs == null) @@ -1384,17 +1386,22 @@ public class IgniteCacheOffheapManagerImpl implements 
IgniteCacheOffheapManager waitTxs.add(oldVal.mvccCounter()); } - else if (!first) { + else { + // Should not delete oldest version which is less than cleanup version. int cmp = compare(oldVal, mvccVer.coordinatorVersion(), mvccVer.cleanupVersion()); if (cmp <= 0) { - boolean rmvd = dataTree.removex(oldVal); + if (first) + first = false; + else { + boolean rmvd = dataTree.removex(oldVal); + + assert rmvd; - assert rmvd; + rowStore.removeRow(oldVal.link()); + } } } - - first = false; } return waitTxs; @@ -1629,10 +1636,6 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager @Override public CacheDataRow mvccFind(GridCacheContext cctx, KeyCacheObject key, MvccCoordinatorVersion ver) throws IgniteCheckedException { -// log.info("mvccFind [k=" + key.value(cctx.cacheObjectContext(), false) + -// ", topVer=" + ver.topologyVersion() + -// ", cntr=" + ver.counter() + ']'); - key.valueBytes(cctx.cacheObjectContext()); int cacheId = grp.sharedGroup() ? cctx.cacheId() : CU.UNDEFINED_CACHE_ID; http://git-wip-us.apache.org/repos/asf/ignite/blob/7a994326/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java index a5fd61e..002da40 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java @@ -43,6 +43,7 @@ import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteKernal; import org.apache.ignite.internal.TestRecordingCommunicationSpi; 
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetRequest; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFinishRequest; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFinishResponse; import org.apache.ignite.internal.util.lang.GridInClosure3; @@ -472,6 +473,8 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { final Integer key1 = primaryKey(ignite(0).cache(cache.getName())); final Integer key2 = primaryKey(ignite(1).cache(cache.getName())); + info("Test keys [key1=" + key1 + ", key2=" + key2 + ']'); + try (Transaction tx = ignite.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { cache.put(key1, 1); cache.put(key2, 1); @@ -582,6 +585,112 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { /** * @throws Exception If failed. */ + public void testCleanupWaitsForGet() throws Exception { + boolean vals[] = {true, false}; + + for (boolean otherPuts : vals) { + for (boolean putOnStart : vals) { + cleanupWaitsForGet(otherPuts, putOnStart); + + afterTest(); + } + } + } + + /** + * @param otherPuts {@code True} to update unrelated keys to increment mvcc counter. + * @param putOnStart {@code True} to put data in cache before getAll. + * @throws Exception If failed. 
+ */ + private void cleanupWaitsForGet(boolean otherPuts, final boolean putOnStart) throws Exception { + info("cleanupWaitsForGet [otherPuts=" + otherPuts + ", putOnStart=" + putOnStart + "]"); + + testSpi = true; + + client = false; + + final Ignite srv = startGrid(0); + + client = true; + + final Ignite client = startGrid(1); + + awaitPartitionMapExchange(); + + final IgniteCache<Object, Object> srvCache = + srv.createCache(cacheConfiguration(PARTITIONED, FULL_SYNC, 0, 16)); + + final Integer key1 = 1; + final Integer key2 = 2; + + if (putOnStart) { + try (Transaction tx = srv.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { + srvCache.put(key1, 0); + srvCache.put(key2, 0); + + tx.commit(); + } + } + + if (otherPuts) { + for (int i = 0; i < 3; i++) { + try (Transaction tx = srv.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { + srvCache.put(1_000_000 + i, 99); + + tx.commit(); + } + } + } + + TestRecordingCommunicationSpi clientSpi = TestRecordingCommunicationSpi.spi(client); + + clientSpi.blockMessages(GridNearGetRequest.class, getTestIgniteInstanceName(0)); + + IgniteInternalFuture<?> getFut = GridTestUtils.runAsync(new Callable<Void>() { + @Override public Void call() throws Exception { + IgniteCache<Integer, Integer> cache = client.cache(srvCache.getName()); + + Map<Integer, Integer> vals = cache.getAll(F.asSet(key1, key2)); + + if (putOnStart) { + assertEquals(2, vals.size()); + assertEquals(0, (Object)vals.get(key1)); + assertEquals(0, (Object)vals.get(key2)); + } + else + assertEquals(0, vals.size()); + + return null; + } + }, "get-thread"); + + clientSpi.waitForBlocked(); + + for (int i = 0; i < 5; i++) { + try (Transaction tx = srv.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { + srvCache.put(key1, i + 1); + srvCache.put(key2, i + 1); + + tx.commit(); + } + } + + clientSpi.stopBlock(true); + + getFut.get(); + + IgniteCache<Integer, Integer> cache = client.cache(srvCache.getName()); + + Map<Integer, Integer> vals = 
cache.getAll(F.asSet(key1, key2)); + + assertEquals(2, vals.size()); + assertEquals(5, (Object)vals.get(key1)); + assertEquals(5, (Object)vals.get(key2)); + } + + /** + * @throws Exception If failed. + */ public void testPutAllGetAll_SingleNode() throws Exception { putAllGetAll(1, 0, 0, 64); }
