Repository: ignite Updated Branches: refs/heads/ignite-3478 af0c3bc21 -> 7a4baba58
ignite-3478 Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/7a4baba5 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/7a4baba5 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/7a4baba5 Branch: refs/heads/ignite-3478 Commit: 7a4baba58609a99ee2da22dcf0ffca937581a4ce Parents: af0c3bc Author: sboikov <[email protected]> Authored: Mon Sep 25 12:56:55 2017 +0300 Committer: sboikov <[email protected]> Committed: Mon Sep 25 12:56:55 2017 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheProcessor.java | 3 +- .../mvcc/CacheCoordinatorsSharedManager.java | 162 ++++++++++++------- .../mvcc/MvccCoordinatorVersionResponse.java | 4 + .../cache/mvcc/CacheMvccTransactionsTest.java | 4 +- 4 files changed, 116 insertions(+), 57 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/7a4baba5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index 6ef78db..dc24586 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -1121,7 +1121,8 @@ public class GridCacheProcessor extends GridProcessorAdapter { (cfg.getGroupName() != null ? ", group=" + cfg.getGroupName() : "") + ", memoryPolicyName=" + memPlcName + ", mode=" + cfg.getCacheMode() + - ", atomicity=" + cfg.getAtomicityMode() + ']'); + ", atomicity=" + cfg.getAtomicityMode() + + ", mvcc=" + cacheCtx.mvccEnabled() + ']'); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/7a4baba5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java index 641e6d4..c46a624 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java @@ -17,13 +17,12 @@ package org.apache.ignite.internal.processors.cache.mvcc; -import java.util.HashMap; -import java.util.Map; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; @@ -39,7 +38,6 @@ import org.apache.ignite.internal.managers.discovery.DiscoCache; import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.GridCacheSharedManagerAdapter; -import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxLocalAdapter; import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.util.GridAtomicLong; @@ -86,7 +84,7 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager private final ConcurrentSkipListMap<Long, GridCacheVersion> activeTxs = new ConcurrentSkipListMap<>(); /** */ - private final Map<Long, Integer> activeQueries = new HashMap<>(); + private final ConcurrentMap<Long, AtomicInteger> activeQueries = new ConcurrentHashMap<>(); /** */ private final ConcurrentMap<Long, MvccVersionFuture> verFuts = new ConcurrentHashMap<>(); @@ -95,6 +93,9 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager private final ConcurrentMap<Long, WaitAckFuture> ackFuts = new ConcurrentHashMap<>(); /** */ + private ConcurrentMap<Long, WaitTxFuture> waitTxFuts = new ConcurrentHashMap<>(); + + /** */ private final AtomicLong futIdCntr = new AtomicLong(); /** */ @@ -475,7 +476,7 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager * @param txId Transaction ID. * @return Counter. */ - private synchronized MvccCoordinatorVersionResponse assignTxCounter(GridCacheVersion txId, long futId) { + private MvccCoordinatorVersionResponse assignTxCounter(GridCacheVersion txId, long futId) { assert crdVer != 0; long nextCtr = mvccCntr.incrementAndGet(); @@ -508,76 +509,112 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager private void onTxDone(Long txCntr) { GridFutureAdapter fut; // TODO IGNITE-3478. - synchronized (this) { - GridCacheVersion ver = activeTxs.remove(txCntr); + GridCacheVersion ver = activeTxs.remove(txCntr); - assert ver != null; + assert ver != null; - committedCntr.setIfGreater(txCntr); + committedCntr.setIfGreater(txCntr); - fut = waitTxFuts.remove(txCntr); - } + fut = waitTxFuts.remove(txCntr); if (fut != null) fut.onDone(); } + static boolean increment(AtomicInteger cntr) { + for (;;) { + int current = cntr.get(); + + if (current == 0) + return false; + + if (cntr.compareAndSet(current, current + 1)) + return true; + } + } + /** * @param qryNodeId Node initiated query. * @return Counter for query. */ - private synchronized MvccCoordinatorVersionResponse assignQueryCounter(UUID qryNodeId, long futId) { + private MvccCoordinatorVersionResponse assignQueryCounter(UUID qryNodeId, long futId) { assert crdVer != 0; - Long mvccCntr = committedCntr.get(); - MvccCoordinatorVersionResponse res = new MvccCoordinatorVersionResponse(); - Long trackCntr = mvccCntr; + Long mvccCntr; - for (Long txVer : activeTxs.keySet()) { - if (txVer < trackCntr) - trackCntr = txVer; + for(;;) { + mvccCntr = committedCntr.get(); - res.addTx(txVer); - } + Long trackCntr = mvccCntr; - Integer queries = activeQueries.get(trackCntr); + for (Long txVer : activeTxs.keySet()) { + if (txVer < trackCntr) + trackCntr = txVer; - if (queries != null) - activeQueries.put(trackCntr, queries + 1); - else - activeQueries.put(trackCntr, 1); + res.addTx(txVer); + } + + registerActiveQuery(trackCntr); + + if (committedCntr.get() == mvccCntr) + break; + else { + res.resetTransactionsCount(); + + onQueryDone(trackCntr); + } + } res.init(futId, crdVer, mvccCntr, COUNTER_NA); return res; } + private void registerActiveQuery(Long cntr) { + for (;;) { + AtomicInteger qryCnt = activeQueries.get(cntr); + + if (qryCnt != null) { + boolean inc = increment(qryCnt); + + if (!inc) { + activeQueries.remove(mvccCntr, qryCnt); + + continue; + } + } + else { + qryCnt = new AtomicInteger(1); + + if (activeQueries.putIfAbsent(cntr, qryCnt) != null) + continue; + } + + break; + } + } + /** * @param mvccCntr Query counter. */ - private synchronized void onQueryDone(long mvccCntr) { - Integer queries = activeQueries.get(mvccCntr); + private void onQueryDone(long mvccCntr) { + AtomicInteger cntr = activeQueries.get(mvccCntr); - assert queries != null : mvccCntr; + assert cntr != null : mvccCntr; - int left = queries - 1; + int left = cntr.decrementAndGet(); assert left >= 0 : left; if (left == 0) { - Integer rmvd = activeQueries.remove(mvccCntr); + boolean rmv = activeQueries.remove(mvccCntr, cntr); - assert rmvd != null; + assert rmv; } - else - activeQueries.put(mvccCntr, left); } - /** */ - private Map<Long, GridFutureAdapter> waitTxFuts = new HashMap<>(); // TODO IGNITE-3478. - /** * @param msg Message. */ @@ -586,37 +623,38 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager GridLongList txs = msg.transactions(); - // TODO IGNITE-3478. - GridCompoundFuture fut = null; + GridCompoundFuture resFut = null; - synchronized (this) { - for (int i = 0; i < txs.size(); i++) { - long txId = txs.get(i); + for (int i = 0; i < txs.size(); i++) { + Long txId = txs.get(i); - if (activeTxs.containsKey(txId)) { - GridFutureAdapter fut0 = waitTxFuts.get(txId); + WaitTxFuture fut = waitTxFuts.get(txId); - if (fut0 == null) { - fut0 = new GridFutureAdapter(); + if (fut == null) { + WaitTxFuture old = waitTxFuts.putIfAbsent(txId, fut = new WaitTxFuture(txId)); - waitTxFuts.put(txId, fut0); - } + if (old != null) + fut = old; + } - if (fut == null) - fut = new GridCompoundFuture(); + if (!activeTxs.containsKey(txId)) + fut.onDone(); - fut.add(fut0); - } + if (!fut.isDone()) { + if (resFut == null) + resFut = new GridCompoundFuture(); + + resFut.add(fut); } } - if (fut != null) - fut.markInitialized(); + if (resFut != null) + resFut.markInitialized(); - if (fut == null || fut.isDone()) + if (resFut == null || resFut.isDone()) sendFutureResponse(nodeId, msg); else { - fut.listen(new IgniteInClosure<IgniteInternalFuture>() { + resFut.listen(new IgniteInClosure<IgniteInternalFuture>() { @Override public void apply(IgniteInternalFuture fut) { sendFutureResponse(nodeId, msg); } @@ -944,4 +982,18 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager } } + /** + * + */ + private static class WaitTxFuture extends GridFutureAdapter { + /** */ + private final long txId; + + /** + * @param txId Transaction ID. + */ + WaitTxFuture(long txId) { + this.txId = txId; + } + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/7a4baba5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorVersionResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorVersionResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorVersionResponse.java index 9d61a6d..04ef8d8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorVersionResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorVersionResponse.java @@ -78,6 +78,10 @@ public class MvccCoordinatorVersionResponse implements MvccCoordinatorMessage, M txs[txsCnt++] = txId; } + void resetTransactionsCount() { + txsCnt = 0; + } + /** {@inheritDoc} */ @Override public int size() { return txsCnt; http://git-wip-us.apache.org/repos/asf/ignite/blob/7a4baba5/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java index 1c37171..6b01aef 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java @@ -1671,9 +1671,11 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { } /** + * TODO IGNITE-3478. + * * @throws Exception If failed. */ - public void testReadInProgressCoordinatorFails() throws Exception { + public void _testReadInProgressCoordinatorFails() throws Exception { testSpi = true; startGrids(4);
