Repository: ignite Updated Branches: refs/heads/ignite-5937 bb969db04 -> 3255ce228
ignite-5937 Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/3255ce22 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/3255ce22 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/3255ce22 Branch: refs/heads/ignite-5937 Commit: 3255ce22813ffc056001ab78df4bf8a2a747365f Parents: bb969db Author: sboikov <[email protected]> Authored: Tue Oct 10 16:33:37 2017 +0300 Committer: sboikov <[email protected]> Committed: Tue Oct 10 18:21:14 2017 +0300 ---------------------------------------------------------------------- .../cache/IgniteCacheOffheapManager.java | 43 +++++-- .../cache/IgniteCacheOffheapManagerImpl.java | 32 ++++- .../distributed/dht/GridDhtCacheEntry.java | 5 +- .../persistence/GridCacheOffheapManager.java | 7 ++ .../cache/mvcc/CacheMvccTransactionsTest.java | 125 ++++++++++++++----- 5 files changed, 168 insertions(+), 44 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/3255ce22/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java index 9e3d0fb..2c070fc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java @@ -189,6 +189,21 @@ public interface IgniteCacheOffheapManager { throws IgniteCheckedException; /** + * @param entry Entry. + * @param val Value. + * @param ver Version. + * @param mvccVer Mvcc update version. + * @return {@code True} if value was inserted. + * @throws IgniteCheckedException If failed. + */ + public boolean mvccInitialValue( + GridCacheMapEntry entry, + @Nullable CacheObject val, + GridCacheVersion ver, + MvccCoordinatorVersion mvccVer + ) throws IgniteCheckedException; + + /** * @param primary {@code True} if on primary node. * @param entry Entry. * @param val Value. @@ -220,18 +235,10 @@ public interface IgniteCacheOffheapManager { /** * @param entry Entry. - * @param val Value. - * @param ver Version. - * @param mvccVer Mvcc update version. - * @return {@code True} if value was inserted. * @throws IgniteCheckedException If failed. */ - public boolean mvccInitialValue( - GridCacheMapEntry entry, - @Nullable CacheObject val, - GridCacheVersion ver, - MvccCoordinatorVersion mvccVer - ) throws IgniteCheckedException; + public void mvccRemoveAll(GridCacheMapEntry entry) + throws IgniteCheckedException; /** * @param cctx Cache context. @@ -542,6 +549,7 @@ public interface IgniteCacheOffheapManager { /** * @param cctx Cache context. + * @param primary {@code True} if update is executed on primary node. * @param key Key. * @param val Value. * @param ver Version. @@ -557,6 +565,14 @@ public interface IgniteCacheOffheapManager { GridCacheVersion ver, MvccCoordinatorVersion mvccVer) throws IgniteCheckedException; + /** + * @param cctx Cache context. + * @param primary {@code True} if update is executed on primary node. + * @param key Key. + * @param mvccVer Mvcc version. + * @return List of transactions to wait for. + * @throws IgniteCheckedException If failed. + */ @Nullable GridLongList mvccRemove( GridCacheContext cctx, boolean primary, @@ -568,6 +584,13 @@ public interface IgniteCacheOffheapManager { * @param key Key. * @throws IgniteCheckedException If failed. */ + void mvccRemoveAll(GridCacheContext cctx, KeyCacheObject key) throws IgniteCheckedException; + + /** + * @param cctx Cache context. + * @param key Key. + * @throws IgniteCheckedException If failed. + */ void updateIndexes(GridCacheContext cctx, KeyCacheObject key) throws IgniteCheckedException; /** http://git-wip-us.apache.org/repos/asf/ignite/blob/3255ce22/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java index 4fb5bfd..2bff203 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java @@ -416,8 +416,7 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager boolean primary, GridCacheMapEntry entry, MvccCoordinatorVersion mvccVer - ) - throws IgniteCheckedException { + ) throws IgniteCheckedException { return dataStore(entry.localPartition()).mvccRemove(entry.context(), primary, entry.key(), @@ -425,6 +424,11 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager } /** {@inheritDoc} */ + @Override public void mvccRemoveAll(GridCacheMapEntry entry) throws IgniteCheckedException { + dataStore(entry.localPartition()).mvccRemoveAll(entry.context(), entry.key()); + } + + /** {@inheritDoc} */ @Override public void updateIndexes(GridCacheContext cctx, KeyCacheObject key, GridDhtLocalPartition part) throws IgniteCheckedException { dataStore(part).updateIndexes(cctx, key); @@ -1558,6 +1562,30 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager } } + /** {@inheritDoc} */ + @Override public void mvccRemoveAll(GridCacheContext cctx, KeyCacheObject key) throws IgniteCheckedException { + key.valueBytes(cctx.cacheObjectContext()); + + int cacheId = grp.sharedGroup() ? cctx.cacheId() : CU.UNDEFINED_CACHE_ID; + + GridCursor<CacheDataRow> cur = dataTree.find( + new MvccSearchRow(cacheId, key, Long.MAX_VALUE, Long.MAX_VALUE), + new MvccSearchRow(cacheId, key, 1, 1), + CacheDataRowAdapter.RowData.KEY_ONLY); + + while (cur.next()) { + CacheDataRow row = cur.get(); + + assert row.link() != 0; + + boolean rmvd = dataTree.removex(row); + + assert rmvd; + + rowStore.removeRow(row.link()); + } + } + /** * @param cleanupRows Rows to cleanup. * @param findRmv {@code True} if need keep removed row entry. http://git-wip-us.apache.org/repos/asf/ignite/blob/3255ce22/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java index 77cc642..a3309a4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java @@ -585,7 +585,10 @@ public class GridDhtCacheEntry extends GridDistributedCacheEntry { ']'); } - removeValue(); + if (cctx.mvccEnabled()) + cctx.offheap().mvccRemoveAll(this); + else + removeValue(); // Give to GC. update(null, 0L, 0L, ver, true); http://git-wip-us.apache.org/repos/asf/ignite/blob/3255ce22/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java index 45b78d4..e5a9736 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java @@ -1294,6 +1294,13 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple } /** {@inheritDoc} */ + @Override public void mvccRemoveAll(GridCacheContext cctx, KeyCacheObject key) throws IgniteCheckedException { + CacheDataStore delegate = init0(false); + + delegate.mvccRemoveAll(cctx, key); + } + + /** {@inheritDoc} */ @Override public void updateIndexes(GridCacheContext cctx, KeyCacheObject key) throws IgniteCheckedException { CacheDataStore delegate = init0(false); http://git-wip-us.apache.org/repos/asf/ignite/blob/3255ce22/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java index 1abc116..35ceed1 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java @@ -1296,42 +1296,49 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { * @throws Exception If failed. */ public void testAccountsTxGetAll_SingleNode() throws Exception { - accountsTxGetAll(1, 0, 0, 64, ReadMode.GET_ALL); + accountsTxGetAll(1, 0, 0, 64, false, ReadMode.GET_ALL); } /** * @throws Exception If failed. */ public void testAccountsTxGetAll_SingleNode_SinglePartition() throws Exception { - accountsTxGetAll(1, 0, 0, 1, ReadMode.GET_ALL); + accountsTxGetAll(1, 0, 0, 1, false, ReadMode.GET_ALL); + } + + /** + * @throws Exception If failed. + */ + public void testAccountsTxGetAll_WithRemoves_SingleNode_SinglePartition() throws Exception { + accountsTxGetAll(1, 0, 0, 1, true, ReadMode.GET_ALL); } /** * @throws Exception If failed. */ public void testAccountsTxGetAll_ClientServer_Backups0() throws Exception { - accountsTxGetAll(4, 2, 0, 64, ReadMode.GET_ALL); + accountsTxGetAll(4, 2, 0, 64, false, ReadMode.GET_ALL); } /** * @throws Exception If failed. */ public void testAccountsTxGetAll_ClientServer_Backups1() throws Exception { - accountsTxGetAll(4, 2, 1, 64, ReadMode.GET_ALL); + accountsTxGetAll(4, 2, 1, 64, false, ReadMode.GET_ALL); } /** * @throws Exception If failed. */ public void testAccountsTxGetAll_ClientServer_Backups2() throws Exception { - accountsTxGetAll(4, 2, 2, 64, ReadMode.GET_ALL); + accountsTxGetAll(4, 2, 2, 64, false, ReadMode.GET_ALL); } /** * @throws Exception If failed. */ public void testAccountsTxScan_SingleNode_SinglePartition() throws Exception { - accountsTxGetAll(1, 0, 0, 1, ReadMode.SCAN); + accountsTxGetAll(1, 0, 0, 1, false, ReadMode.SCAN); } /** @@ -1339,6 +1346,7 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { * @param clients Number of client nodes. * @param cacheBackups Number of cache backups. * @param cacheParts Number of cache partitions. + * @param withRmvs If {@code true} then in addition to puts tests also executes removes. * @param readMode Read mode. * @throws Exception If failed. */ @@ -1347,6 +1355,7 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { final int clients, int cacheBackups, int cacheParts, + final boolean withRmvs, final ReadMode readMode ) throws Exception @@ -1376,6 +1385,8 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { } }; + final Set<Integer> rmvdIds = new HashSet<>(); + GridInClosure3<Integer, List<IgniteCache>, AtomicBoolean> writer = new GridInClosure3<Integer, List<IgniteCache>, AtomicBoolean>() { @Override public void apply(Integer idx, List<IgniteCache> caches, AtomicBoolean stop) { @@ -1400,8 +1411,8 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { keys.add(id1); keys.add(id2); - Integer cntr1; - Integer cntr2; + Integer cntr1 = null; + Integer cntr2 = null; try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) { MvccTestAccount a1; @@ -1412,28 +1423,74 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { a1 = accounts.get(id1); a2 = accounts.get(id2); - assertNotNull(a1); - assertNotNull(a2); + if (!withRmvs) { + assertNotNull(a1); + assertNotNull(a2); - cntr1 = a1.updateCnt + 1; - cntr2 = a2.updateCnt + 1; + cntr1 = a1.updateCnt + 1; + cntr2 = a2.updateCnt + 1; - cache.put(id1, new MvccTestAccount(a1.val + 1, cntr1)); - cache.put(id2, new MvccTestAccount(a2.val - 1, cntr2)); + cache.put(id1, new MvccTestAccount(a1.val + 1, cntr1)); + cache.put(id2, new MvccTestAccount(a2.val - 1, cntr2)); + } + else { + if (a1 != null || a2 != null) { + if (a1 != null && a2 != null) { + Integer rmvd = null; + + if (rnd.nextInt(10) == 0) { + synchronized (rmvdIds) { + if (rmvdIds.size() < 10) { + rmvd = rnd.nextBoolean() ? id1 : id2; + + assertTrue(rmvdIds.add(rmvd)); + } + } + } + + if (rmvd != null) { + cache.remove(rmvd); + + cache.put(rmvd.equals(id1) ? id2 : id1, + new MvccTestAccount(a1.val + a2.val, 1)); + } + else { + cache.put(id1, new MvccTestAccount(a1.val + 1, 1)); + cache.put(id2, new MvccTestAccount(a2.val - 1, 1)); + } + } + else { + if (a1 == null) { + cache.put(id1, new MvccTestAccount(100, 1)); + cache.put(id2, new MvccTestAccount(a2.val - 100, 1)); + + assertTrue(rmvdIds.remove(id1)); + } + else { + cache.put(id1, new MvccTestAccount(a1.val - 100, 1)); + cache.put(id2, new MvccTestAccount(100, 1)); + + assertTrue(rmvdIds.remove(id2)); + } + } + } + } tx.commit(); } - Map<Integer, MvccTestAccount> accounts = cache.getAll(keys); + if (!withRmvs) { + Map<Integer, MvccTestAccount> accounts = cache.getAll(keys); - MvccTestAccount a1 = accounts.get(id1); - MvccTestAccount a2 = accounts.get(id2); + MvccTestAccount a1 = accounts.get(id1); + MvccTestAccount a2 = accounts.get(id2); - assertNotNull(a1); - assertNotNull(a2); + assertNotNull(a1); + assertNotNull(a2); - assertTrue(a1.updateCnt >= cntr1); - assertTrue(a2.updateCnt >= cntr2); + assertTrue(a1.updateCnt >= cntr1); + assertTrue(a2.updateCnt >= cntr2); + } } info("Writer finished, updates: " + cnt); @@ -1469,23 +1526,26 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { else accounts = cache.getAll(keys); - assertEquals(ACCOUNTS, accounts.size()); + if (!withRmvs) + assertEquals(ACCOUNTS, accounts.size()); int sum = 0; for (int i = 0; i < ACCOUNTS; i++) { MvccTestAccount account = accounts.get(i); - assertNotNull(account); - - sum += account.val; + if (account != null) { + sum += account.val; - Integer cntr = lastUpdateCntrs.get(i); + Integer cntr = lastUpdateCntrs.get(i); - if (cntr != null) - assertTrue(cntr <= account.updateCnt); + if (cntr != null) + assertTrue(cntr <= account.updateCnt); - lastUpdateCntrs.put(i, cntr); + lastUpdateCntrs.put(i, cntr); + } + else + assertTrue(withRmvs); } assertEquals(ACCOUNTS * ACCOUNT_START_VAL, sum); @@ -1501,9 +1561,12 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { for (int i = 0; i < ACCOUNTS; i++) { MvccTestAccount account = accounts.get(i); - info("Account [id=" + i + ", val=" + account.val + ']'); + assertTrue(account != null || withRmvs); + + info("Account [id=" + i + ", val=" + (account != null ? account.val : null) + ']'); - sum += account.val; + if (account != null) + sum += account.val; } info("Sum: " + sum);
