ignite-3484
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/91b99117 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/91b99117 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/91b99117 Branch: refs/heads/ignite-3484 Commit: 91b9911731a387a3199ddbbc22704bc14af09995 Parents: c966451 Author: sboikov <[email protected]> Authored: Wed Sep 6 12:22:22 2017 +0300 Committer: sboikov <[email protected]> Committed: Wed Sep 6 17:34:10 2017 +0300 ---------------------------------------------------------------------- .../cache/IgniteCacheOffheapManagerImpl.java | 10 +- .../mvcc/CacheCoordinatorsSharedManager.java | 113 +++-- .../processors/cache/tree/CacheDataTree.java | 2 +- .../cache/mvcc/CacheMvccTransactionsTest.java | 424 ++++++++++++++++++- 4 files changed, 515 insertions(+), 34 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/91b99117/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java index ed52b85..9a4b17b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java @@ -1552,18 +1552,20 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager int cacheId = grp.sharedGroup() ? cctx.cacheId() : CU.UNDEFINED_CACHE_ID; - // TODO IGNITE-3484: need special findCeiling method. + // TODO IGNITE-3484: need special method. GridCursor<CacheDataRow> cur = dataTree.find(new MvccSearchRow(cacheId, key, topVer, mvccCntr), - null, - CacheDataRowAdapter.RowData.NO_KEY); + new MvccSearchRow(cacheId, key, topVer + 1, mvccCntr)/*, + CacheDataRowAdapter.RowData.NO_KEY*/); CacheDataRow row = null; if (cur.next()) row = cur.get(); - afterRowFound(row, key); + assert row == null || key.equals(row.key()); + + //afterRowFound(row, key); return row; } http://git-wip-us.apache.org/repos/asf/ignite/blob/91b99117/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java index 807d18a..f780922 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java @@ -17,6 +17,8 @@ package org.apache.ignite.internal.processors.cache.mvcc; +import java.util.HashMap; +import java.util.Map; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -34,6 +36,7 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.GridCacheSharedManagerAdapter; import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; +import org.apache.ignite.internal.util.GridAtomicLong; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.U; @@ -52,15 +55,18 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager private final CoordinatorAssignmentHistory assignHist = new CoordinatorAssignmentHistory(); /** */ - private final AtomicLong mvccCntr = new AtomicLong(0L); + private final AtomicLong mvccCntr = new AtomicLong(1L); /** */ - private final AtomicLong committedCntr = new AtomicLong(0L); + private final GridAtomicLong committedCntr = new GridAtomicLong(1L); /** */ private final ConcurrentHashMap<GridCacheVersion, Long> activeTxs = new ConcurrentHashMap<>(); /** */ + private final Map<Long, Integer> activeQueries = new HashMap<>(); + + /** */ private final ConcurrentMap<Long, MvccCounterFuture> cntrFuts = new ConcurrentHashMap<>(); /** */ @@ -210,21 +216,6 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager } } - - /** - * @param txId Transaction ID. - * @return Counter. - */ - private long assignTxCounter(GridCacheVersion txId) { - long nextCtr = mvccCntr.incrementAndGet(); - - Object old = activeTxs.put(txId, nextCtr); - - assert old == null : txId; - - return nextCtr; - } - /** * @param nodeId Sender node ID. * @param msg Message. @@ -322,7 +313,7 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager * @param msg Message. */ private void processCoordinatorTxAckRequest(UUID nodeId, CoordinatorTxAckRequest msg) { - activeTxs.remove(msg.txId()); + onTxDone(msg.txId()); if (!msg.skipResponse()) { try { @@ -359,19 +350,93 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager } /** + * @param txId Transaction ID. + * @return Counter. + */ + private synchronized long assignTxCounter(GridCacheVersion txId) { + long nextCtr = mvccCntr.getAndIncrement(); + + Object old = activeTxs.put(txId, nextCtr); + + assert old == null : txId; + + return nextCtr; + } + + /** + * @param txId Transaction ID. + */ + private synchronized void onTxDone(GridCacheVersion txId) { + Long cntr = activeTxs.remove(txId); + + assert cntr != null; + + committedCntr.setIfGreater(cntr); + } + + /** * @param qryNodeId Node initiated query. * @return Counter for query. */ - private long assignQueryCounter(UUID qryNodeId) { - // TODO IGNITE-3478 - return 3; + private synchronized long assignQueryCounter(UUID qryNodeId) { + Long mvccCntr = committedCntr.get(); + + Long minActive = minActiveTx(); + + if (minActive != null && minActive < mvccCntr) + mvccCntr = minActive - 1; + + Integer queries = activeQueries.get(mvccCntr); + + if (queries != null) + activeQueries.put(mvccCntr, queries + 1); + else + activeQueries.put(mvccCntr, 1); + + return mvccCntr; } /** - * @param cntr Query counter. + * @param mvccCntr Query counter. */ - private void onQueryDone(long cntr) { - // TODO IGNITE-3478 + private synchronized void onQueryDone(long mvccCntr) { + Integer queries = activeQueries.get(mvccCntr); + + assert queries != null : mvccCntr; + + int left = queries - 1; + + assert left >= 0 : left; + + if (left == 0) + activeQueries.remove(mvccCntr); + } + + private synchronized long cleanupVersion() { + long cntr = committedCntr.get(); + + Long minActive = minActiveTx(); + + if (minActive != null && minActive < cntr) + cntr = minActive - 1; + + for (Long qryCntr : activeQueries.keySet()) { + if (qryCntr <= cntr) + cntr = qryCntr - 1; + } + + return cntr; + } + + @Nullable private Long minActiveTx() { + Long min = null; + + for (Map.Entry<GridCacheVersion, Long> e : activeTxs.entrySet()) { + if (min == null || e.getValue() < min) + min = e.getValue(); + } + + return min; } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/91b99117/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java index e846768..4b4860b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java @@ -163,7 +163,7 @@ public class CacheDataTree extends BPlusTree<CacheSearchRow, CacheDataRow> { cmp = Long.compare(row.mvccUpdateTopologyVersion(), mvccTopVer); if (cmp != 0) - return 0; + return cmp; long mvccCntr = io.getMvccUpdateCounter(pageAddr, idx); http://git-wip-us.apache.org/repos/asf/ignite/blob/91b99117/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java index 19f1dc7..4c6b206 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java @@ -18,21 +18,34 @@ package org.apache.ignite.internal.processors.cache.mvcc; import java.util.ArrayList; +import java.util.HashMap; import java.util.HashSet; +import java.util.LinkedHashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.TreeSet; +import java.util.concurrent.Callable; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteTransactions; import org.apache.ignite.cache.CacheMode; import org.apache.ignite.cache.CacheWriteSynchronizationMode; +import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteKernal; +import org.apache.ignite.internal.util.lang.GridInClosure3; import org.apache.ignite.internal.util.typedef.CI1; +import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.G; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; @@ -158,7 +171,7 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { startGridsMultiThreaded(SRVS); try { - for (CacheConfiguration<Integer, Integer> ccfg : cacheConfigurations()) { + for (CacheConfiguration<Object, Object> ccfg : cacheConfigurations()) { logCacheInfo(ccfg); ignite(0).createCache(ccfg); @@ -211,10 +224,383 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { } /** + * @throws Exception If failed. + */ + public void testSimplePutGetAll() throws Exception { + Ignite node = startGrid(0); + + IgniteTransactions txs = node.transactions(); + + final IgniteCache<Object, Object> cache = node.createCache(cacheConfiguration(PARTITIONED, FULL_SYNC, 0)); + + final int KEYS = 10_000; + + Set<Integer> keys = new HashSet<>(); + + for (int k = 0; k < KEYS; k++) + keys.add(k); + + Map<Object, Object> map = cache.getAll(keys); + + assertTrue(map.isEmpty()); + + for (int v = 0; v < 3; v++) { + try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) { + for (int k = 0; k < KEYS; k++) { + if (k % 2 == 0) + cache.put(k, v); + } + + tx.commit(); + } + + map = cache.getAll(keys); + + for (int k = 0; k < KEYS; k++) { + if (k % 2 == 0) + assertEquals(v, map.get(k)); + else + assertNull(map.get(k)); + } + + assertEquals(KEYS / 2, map.size()); + + try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) { + map = cache.getAll(keys); + + for (int k = 0; k < KEYS; k++) { + if (k % 2 == 0) + assertEquals(v, map.get(k)); + else + assertNull(map.get(k)); + } + + assertEquals(KEYS / 2, map.size()); + + tx.commit(); + } + } + } + + /** + * @throws Exception If failed. + */ + public void testPutAllGetAll() throws Exception { + final int RANGE = 20; + + final long time = 10_000; + + final int writers = 4; + + final int readers = 4; + + GridInClosure3<Integer, IgniteCache<Object, Object>, AtomicBoolean> writer = + new GridInClosure3<Integer, IgniteCache<Object, Object>, AtomicBoolean>() { + @Override public void apply(Integer idx, IgniteCache<Object, Object> cache, AtomicBoolean stop) { + final IgniteTransactions txs = cache.unwrap(Ignite.class).transactions(); + + ThreadLocalRandom rnd = ThreadLocalRandom.current(); + + int min = idx * RANGE; + int max = min + RANGE; + + info("Thread range [min=" + min + ", max=" + max + ']'); + + Map<Integer, Integer> map = new HashMap<>(); + + int v = idx * 1_000_000; + + while (!stop.get()) { + while (map.size() < RANGE) + map.put(rnd.nextInt(min, max), v); + + try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) { + cache.putAll(map); + + tx.commit(); + } + + map.clear(); + + v++; + } + + info("Writer done, updates: " + v); + } + }; + + GridInClosure3<Integer, IgniteCache<Object, Object>, AtomicBoolean> reader = + new GridInClosure3<Integer, IgniteCache<Object, Object>, AtomicBoolean>() { + @Override public void apply(Integer idx, IgniteCache<Object, Object> cache, AtomicBoolean stop) { + ThreadLocalRandom rnd = ThreadLocalRandom.current(); + + Set<Integer> keys = new LinkedHashSet<>(); + + Map<Integer, Set<Integer>> uniqueReads = new HashMap<>(); + + for (int i = 0; i < writers; i++) + uniqueReads.put(i, new HashSet<Integer>()); + + while (!stop.get()) { + int range = rnd.nextInt(0, writers); + + int min = range * RANGE; + int max = min + RANGE; + + while (keys.size() < RANGE) + keys.add(rnd.nextInt(min, max)); + + Map<Object, Object> map = cache.getAll(keys); + + assertTrue("Invalid map size: " + map.size(), + map.isEmpty() || map.size() == RANGE); + + Integer val0 = null; + + for (Map.Entry<Object, Object> e: map.entrySet()) { + Object val = e.getValue(); + + assertNotNull(val); + + if (val0 == null) { + uniqueReads.get(range).add((Integer)val); + + val0 = (Integer)val; + } + else { + if (!F.eq(val0, val)) { + assertEquals("Unexpected value [range=" + range + ", key=" + e.getKey() + ']', + val0, + val); + } + } + } + + keys.clear(); + } + + info("Reader done, unique reads: "); + + for (Map.Entry<Integer, Set<Integer>> e : uniqueReads.entrySet()) + info("Range [idx=" + e.getKey() + ", uniqueReads=" + e.getValue().size() + ']'); + } + }; + + readWriteTest(time, writers, readers, null, writer, reader); + } + + /** + * @throws Exception If failed. + */ + public void testAccountsSumGetAll() throws Exception { + final int ACCOUNTS = 20; + + final int ACCOUNT_START_VAL = 1000; + + final long time = 10_000; + + final int writers = 1; + + final int readers = 1; + + final IgniteInClosure<IgniteCache<Object, Object>> init = new IgniteInClosure<IgniteCache<Object, Object>>() { + @Override public void apply(IgniteCache<Object, Object> cache) { + final IgniteTransactions txs = cache.unwrap(Ignite.class).transactions(); + + Map<Integer, Account> accounts = new HashMap<>(); + + for (int i = 0; i < ACCOUNTS; i++) + accounts.put(i, new Account(ACCOUNT_START_VAL)); + + try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) { + cache.putAll(accounts); + + tx.commit(); + } + } + }; + + GridInClosure3<Integer, IgniteCache<Object, Object>, AtomicBoolean> writer = + new GridInClosure3<Integer, IgniteCache<Object, Object>, AtomicBoolean>() { + @Override public void apply(Integer idx, IgniteCache<Object, Object> cache, AtomicBoolean stop) { + final IgniteTransactions txs = cache.unwrap(Ignite.class).transactions(); + + ThreadLocalRandom rnd = ThreadLocalRandom.current(); + + int cnt = 0; + + while (!stop.get()) { + cnt++; + + try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) { + Integer id1 = rnd.nextInt(ACCOUNTS); + Integer id2 = rnd.nextInt(ACCOUNTS); + + if (id1.equals(id2)) + continue; + + Account a1; + Account a2; + + TreeSet<Integer> keys = new TreeSet<>(); + + keys.add(id1); + keys.add(id2); + + Map<Object, Object> accounts = cache.getAll(keys); + + a1 = (Account)accounts.get(id1); + a2 = (Account)accounts.get(id1); + + assertNotNull(a1); + assertNotNull(a2); + + cache.put(id1, new Account(a1.val + 1)); + cache.put(id2, new Account(a2.val - 1)); + + tx.commit(); + } + } + + info("Writer finished, updates: " + cnt); + } + }; + + GridInClosure3<Integer, IgniteCache<Object, Object>, AtomicBoolean> reader = + new GridInClosure3<Integer, IgniteCache<Object, Object>, AtomicBoolean>() { + @Override public void apply(Integer idx, IgniteCache<Object, Object> cache, AtomicBoolean stop) { + ThreadLocalRandom rnd = ThreadLocalRandom.current(); + + Set<Integer> keys = new LinkedHashSet<>(); + + while (!stop.get()) { + while (keys.size() < ACCOUNTS) + keys.add(rnd.nextInt(ACCOUNTS)); + + Map<Object, Object> accounts = cache.getAll(keys); + + assertEquals(ACCOUNTS, accounts.size()); + + int sum = 0; + + for (int i = 0; i < ACCOUNTS; i++) { + Account account = (Account)accounts.get(i); + + assertNotNull(account); + + sum += account.val; + } + + assertEquals(ACCOUNTS * ACCOUNT_START_VAL, sum); + } + + if (idx == 0) { + Map<Object, Object> accounts = cache.getAll(keys); + + int sum = 0; + + for (int i = 0; i < ACCOUNTS; i++) { + Account account = (Account)accounts.get(i); + + info("Account [id=" + i + ", val=" + account.val + ']'); + + sum += account.val; + } + + info("Sum: " + sum); + } + } + }; + + readWriteTest(time, writers, readers, init, writer, reader); + } + + /** + * @param time Test time. + * @param writers Number of writers. + * @param readers Number of readers. + * @param writer Writers threads closure. + * @param reader Readers threads closure. + * @throws Exception If failed. + */ + private void readWriteTest(final long time, + final int writers, + final int readers, + IgniteInClosure<IgniteCache<Object, Object>> init, + final GridInClosure3<Integer, IgniteCache<Object, Object>, AtomicBoolean> writer, + final GridInClosure3<Integer, IgniteCache<Object, Object>, AtomicBoolean> reader) throws Exception { + final Ignite ignite = startGrid(0); + + final IgniteCache<Object, Object> cache = ignite.createCache(cacheConfiguration(PARTITIONED, FULL_SYNC, 0)); + + if (init != null) + init.apply(cache); + + final long stopTime = U.currentTimeMillis() + time; + + final AtomicBoolean stop = new AtomicBoolean(); + + try { + final AtomicInteger writerIdx = new AtomicInteger(); + + IgniteInternalFuture<?> writeFut = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() { + @Override public Void call() throws Exception { + try { + int idx = writerIdx.getAndIncrement(); + + writer.apply(idx, cache, stop); + } + catch (Throwable e) { + error("Unexpected error: " + e, e); + + stop.set(true); + + fail("Unexpected error: " + e); + } + + return null; + } + }, writers, "writer"); + + final AtomicInteger readerIdx = new AtomicInteger(); + + IgniteInternalFuture<?> readFut = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() { + @Override public Void call() throws Exception { + try { + int idx = readerIdx.getAndIncrement(); + + reader.apply(idx, cache, stop); + } + catch (Throwable e) { + error("Unexpected error: " + e, e); + + stop.set(true); + + fail("Unexpected error: " + e); + } + + return null; + } + }, readers, "reader"); + + while (System.currentTimeMillis() < stopTime && !stop.get()) + Thread.sleep(1000); + + stop.set(true); + + writeFut.get(); + readFut.get(); + } + finally { + stop.set(true); + } + } + + /** * @return Cache configurations. */ - private List<CacheConfiguration<Integer, Integer>> cacheConfigurations() { - List<CacheConfiguration<Integer, Integer>> ccfgs = new ArrayList<>(); + private List<CacheConfiguration<Object, Object>> cacheConfigurations() { + List<CacheConfiguration<Object, Object>> ccfgs = new ArrayList<>(); ccfgs.add(cacheConfiguration(PARTITIONED, FULL_SYNC, 0)); ccfgs.add(cacheConfiguration(PARTITIONED, FULL_SYNC, 1)); @@ -262,16 +648,17 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { * @param backups Number of backups. * @return Cache configuration. */ - private CacheConfiguration<Integer, Integer> cacheConfiguration( + private CacheConfiguration<Object, Object> cacheConfiguration( CacheMode cacheMode, CacheWriteSynchronizationMode syncMode, int backups) { - CacheConfiguration<Integer, Integer> ccfg = new CacheConfiguration<>(DEFAULT_CACHE_NAME); + CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>(DEFAULT_CACHE_NAME); ccfg.setCacheMode(cacheMode); ccfg.setAtomicityMode(TRANSACTIONAL); ccfg.setWriteSynchronizationMode(syncMode); ccfg.setMvccEnabled(true); + ccfg.setAffinity(new RendezvousAffinityFunction(false, 1)); if (cacheMode == PARTITIONED) ccfg.setBackups(backups); @@ -299,4 +686,31 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { assertTrue(ackFuts.isEmpty()); } } + + /** + * + */ + static class Account { + /** */ + private final int val; + + /** + * @param val Value. + */ + public Account(int val) { + this.val = val; + } + + /** + * @return Value. + */ + public int value() { + return val; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(Account.class, this); + } + } }
