Repository: ignite Updated Branches: refs/heads/ignite-3479 921df28ce -> 6d409c6b9
ignite-3479 Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/6d409c6b Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/6d409c6b Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/6d409c6b Branch: refs/heads/ignite-3479 Commit: 6d409c6b959f7fb7e10ce2e09eac4fadd4798b71 Parents: 921df28 Author: sboikov <[email protected]> Authored: Mon Sep 25 12:11:52 2017 +0300 Committer: sboikov <[email protected]> Committed: Mon Sep 25 12:11:52 2017 +0300 ---------------------------------------------------------------------- .../mvcc/CacheCoordinatorsSharedManager.java | 77 +++++++++++--------- 1 file changed, 44 insertions(+), 33 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/6d409c6b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java index 6850bce..6b5f99c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java @@ -17,8 +17,6 @@ package org.apache.ignite.internal.processors.cache.mvcc; -import java.util.HashMap; -import java.util.Map; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -50,7 +48,6 @@ import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteInClosure; import org.jetbrains.annotations.Nullable; -import org.jsr166.ConcurrentHashMap8; import org.jsr166.LongAdder8; import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; @@ -87,7 +84,7 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager private final ConcurrentSkipListMap<Long, GridCacheVersion> activeTxs = new ConcurrentSkipListMap<>(); /** */ - private final ConcurrentHashMap8<Long, AtomicInteger> activeQueries = new ConcurrentHashMap8<>(); + private final ConcurrentMap<Long, AtomicInteger> activeQueries = new ConcurrentHashMap<>(); /** */ private final ConcurrentMap<Long, MvccVersionFuture> verFuts = new ConcurrentHashMap<>(); @@ -96,6 +93,9 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager private final ConcurrentMap<Long, WaitAckFuture> ackFuts = new ConcurrentHashMap<>(); /** */ + private ConcurrentMap<Long, WaitTxFuture> waitTxFuts = new ConcurrentHashMap<>(); + + /** */ private final AtomicLong futIdCntr = new AtomicLong(); /** */ @@ -476,7 +476,7 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager * @param txId Transaction ID. * @return Counter. */ - private synchronized MvccCoordinatorVersionResponse assignTxCounter(GridCacheVersion txId, long futId) { + private MvccCoordinatorVersionResponse assignTxCounter(GridCacheVersion txId, long futId) { assert crdVer != 0; long nextCtr = mvccCntr.incrementAndGet(); @@ -511,17 +511,15 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager private void onTxDone(Long txCntr) { GridFutureAdapter fut; // TODO IGNITE-3478. - synchronized (this) { - GridCacheVersion ver = activeTxs.remove(txCntr); + GridCacheVersion ver = activeTxs.remove(txCntr); - assert ver != null; + assert ver != null; - committedCntr.setIfGreater(txCntr); + committedCntr.setIfGreater(txCntr); - fut = waitTxFuts.remove(txCntr); + fut = waitTxFuts.remove(txCntr); - TestDebugLog.addMessage3("tx done", txCntr, null, null); - } + TestDebugLog.addMessage3("tx done", txCntr, null, null); if (fut != null) fut.onDone(); @@ -625,8 +623,20 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager } } - /** */ - private Map<Long, GridFutureAdapter> waitTxFuts = new HashMap<>(); // TODO IGNITE-3478. + /** + * + */ + private static class WaitTxFuture extends GridFutureAdapter { + /** */ + private final long txId; + + /** + * @param txId Transaction ID. + */ + WaitTxFuture(long txId) { + this.txId = txId; + } + } /** * @param msg Message. @@ -636,37 +646,38 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager GridLongList txs = msg.transactions(); - // TODO IGNITE-3478. - GridCompoundFuture fut = null; + GridCompoundFuture resFut = null; - synchronized (this) { - for (int i = 0; i < txs.size(); i++) { - long txId = txs.get(i); + for (int i = 0; i < txs.size(); i++) { + Long txId = txs.get(i); - if (activeTxs.containsKey(txId)) { - GridFutureAdapter fut0 = waitTxFuts.get(txId); + WaitTxFuture fut = waitTxFuts.get(txId); - if (fut0 == null) { - fut0 = new GridFutureAdapter(); + if (fut == null) { + WaitTxFuture old = waitTxFuts.putIfAbsent(txId, fut = new WaitTxFuture(txId)); - waitTxFuts.put(txId, fut0); - } + if (old != null) + fut = old; + } - if (fut == null) - fut = new GridCompoundFuture(); + if (!activeTxs.containsKey(txId)) + fut.onDone(); - fut.add(fut0); - } + if (!fut.isDone()) { + if (resFut == null) + resFut = new GridCompoundFuture(); + + resFut.add(fut); } } - if (fut != null) - fut.markInitialized(); + if (resFut != null) + resFut.markInitialized(); - if (fut == null || fut.isDone()) + if (resFut == null || resFut.isDone()) sendFutureResponse(nodeId, msg); else { - fut.listen(new IgniteInClosure<IgniteInternalFuture>() { + resFut.listen(new IgniteInClosure<IgniteInternalFuture>() { @Override public void apply(IgniteInternalFuture fut) { sendFutureResponse(nodeId, msg); }
