Repository: ignite
Updated Branches:
  refs/heads/ignite-3479 921df28ce -> 6d409c6b9


ignite-3479


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/6d409c6b
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/6d409c6b
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/6d409c6b

Branch: refs/heads/ignite-3479
Commit: 6d409c6b959f7fb7e10ce2e09eac4fadd4798b71
Parents: 921df28
Author: sboikov <[email protected]>
Authored: Mon Sep 25 12:11:52 2017 +0300
Committer: sboikov <[email protected]>
Committed: Mon Sep 25 12:11:52 2017 +0300

----------------------------------------------------------------------
 .../mvcc/CacheCoordinatorsSharedManager.java    | 77 +++++++++++---------
 1 file changed, 44 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/6d409c6b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java
index 6850bce..6b5f99c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java
@@ -17,8 +17,6 @@
 
 package org.apache.ignite.internal.processors.cache.mvcc;
 
-import java.util.HashMap;
-import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -50,7 +48,6 @@ import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteInClosure;
 import org.jetbrains.annotations.Nullable;
-import org.jsr166.ConcurrentHashMap8;
 import org.jsr166.LongAdder8;
 
 import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
@@ -87,7 +84,7 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager
     private final ConcurrentSkipListMap<Long, GridCacheVersion> activeTxs = 
new ConcurrentSkipListMap<>();
 
     /** */
-    private final ConcurrentHashMap8<Long, AtomicInteger> activeQueries = new 
ConcurrentHashMap8<>();
+    private final ConcurrentMap<Long, AtomicInteger> activeQueries = new 
ConcurrentHashMap<>();
 
     /** */
     private final ConcurrentMap<Long, MvccVersionFuture> verFuts = new 
ConcurrentHashMap<>();
@@ -96,6 +93,9 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager
     private final ConcurrentMap<Long, WaitAckFuture> ackFuts = new 
ConcurrentHashMap<>();
 
     /** */
+    private ConcurrentMap<Long, WaitTxFuture> waitTxFuts = new 
ConcurrentHashMap<>();
+
+    /** */
     private final AtomicLong futIdCntr = new AtomicLong();
 
     /** */
@@ -476,7 +476,7 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager
      * @param txId Transaction ID.
      * @return Counter.
      */
-    private synchronized MvccCoordinatorVersionResponse 
assignTxCounter(GridCacheVersion txId, long futId) {
+    private MvccCoordinatorVersionResponse assignTxCounter(GridCacheVersion 
txId, long futId) {
         assert crdVer != 0;
 
         long nextCtr = mvccCntr.incrementAndGet();
@@ -511,17 +511,15 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager
     private void onTxDone(Long txCntr) {
         GridFutureAdapter fut; // TODO IGNITE-3478.
 
-        synchronized (this) {
-            GridCacheVersion ver = activeTxs.remove(txCntr);
+        GridCacheVersion ver = activeTxs.remove(txCntr);
 
-            assert ver != null;
+        assert ver != null;
 
-            committedCntr.setIfGreater(txCntr);
+        committedCntr.setIfGreater(txCntr);
 
-            fut = waitTxFuts.remove(txCntr);
+        fut = waitTxFuts.remove(txCntr);
 
-            TestDebugLog.addMessage3("tx done", txCntr, null, null);
-        }
+        TestDebugLog.addMessage3("tx done", txCntr, null, null);
 
         if (fut != null)
             fut.onDone();
@@ -625,8 +623,20 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager
         }
     }
 
-    /** */
-    private Map<Long, GridFutureAdapter> waitTxFuts = new HashMap<>(); // TODO 
IGNITE-3478.
+    /**
+     *
+     */
+    private static class WaitTxFuture extends GridFutureAdapter {
+        /** */
+        private final long txId;
+
+        /**
+         * @param txId Transaction ID.
+         */
+        WaitTxFuture(long txId) {
+            this.txId = txId;
+        }
+    }
 
     /**
      * @param msg Message.
@@ -636,37 +646,38 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager
 
         GridLongList txs = msg.transactions();
 
-        // TODO IGNITE-3478.
-        GridCompoundFuture fut = null;
+        GridCompoundFuture resFut = null;
 
-        synchronized (this) {
-            for (int i = 0; i < txs.size(); i++) {
-                long txId = txs.get(i);
+        for (int i = 0; i < txs.size(); i++) {
+            Long txId = txs.get(i);
 
-                if (activeTxs.containsKey(txId)) {
-                    GridFutureAdapter fut0 = waitTxFuts.get(txId);
+            WaitTxFuture fut = waitTxFuts.get(txId);
 
-                    if (fut0 == null) {
-                        fut0 = new GridFutureAdapter();
+            if (fut == null) {
+                WaitTxFuture old = waitTxFuts.putIfAbsent(txId, fut = new 
WaitTxFuture(txId));
 
-                        waitTxFuts.put(txId, fut0);
-                    }
+                if (old != null)
+                    fut = old;
+            }
 
-                    if (fut == null)
-                        fut = new GridCompoundFuture();
+            if (!activeTxs.containsKey(txId))
+                fut.onDone();
 
-                    fut.add(fut0);
-                }
+            if (!fut.isDone()) {
+                if (resFut == null)
+                    resFut = new GridCompoundFuture();
+
+                resFut.add(fut);
             }
         }
 
-        if (fut != null)
-            fut.markInitialized();
+        if (resFut != null)
+            resFut.markInitialized();
 
-        if (fut == null || fut.isDone())
+        if (resFut == null || resFut.isDone())
             sendFutureResponse(nodeId, msg);
         else {
-            fut.listen(new IgniteInClosure<IgniteInternalFuture>() {
+            resFut.listen(new IgniteInClosure<IgniteInternalFuture>() {
                 @Override public void apply(IgniteInternalFuture fut) {
                     sendFutureResponse(nodeId, msg);
                 }

Reply via email to