ignite-3478

Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/af0c3bc2
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/af0c3bc2
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/af0c3bc2

Branch: refs/heads/ignite-3479
Commit: af0c3bc2190804cbb39618bacd6e8f262f12a11b
Parents: 9ae39c4
Author: sboikov <[email protected]>
Authored: Fri Sep 22 16:55:24 2017 +0300
Committer: sboikov <[email protected]>
Committed: Fri Sep 22 16:55:24 2017 +0300

----------------------------------------------------------------------
 .../near/GridNearTxFinishAndAckFuture.java      |  2 +-
 .../near/GridNearTxFinishFuture.java            |  2 +-
 .../mvcc/CacheCoordinatorsSharedManager.java    | 48 ++++++++------------
 .../cache/mvcc/CoordinatorTxAckRequest.java     | 18 ++++----
 4 files changed, 31 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/af0c3bc2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishAndAckFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishAndAckFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishAndAckFuture.java
index 8474ab7..7d03d46 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishAndAckFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishAndAckFuture.java
@@ -59,7 +59,7 @@ public class GridNearTxFinishAndAckFuture extends 
GridFutureAdapter<IgniteIntern
                         assert crd != null;
 
                         IgniteInternalFuture<Void> ackFut = 
fut.context().coordinators().ackTxCommit(
-                            crd, tx.nearXidVersion());
+                            crd, tx.mvccCoordinatorVersion());
 
                         ackFut.listen(new 
IgniteInClosure<IgniteInternalFuture<Void>>() {
                             @Override public void 
apply(IgniteInternalFuture<Void> ackFut) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/af0c3bc2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
index 5f18e9b..347a694 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
@@ -425,7 +425,7 @@ public final class GridNearTxFinishFuture<K, V> extends 
GridCacheCompoundIdentit
 
             assert crd != null;
 
-            cctx.coordinators().ackTxRollback(crd, tx.nearXidVersion());
+            cctx.coordinators().ackTxRollback(crd, 
tx.mvccCoordinatorVersion());
         }
 
         try {

http://git-wip-us.apache.org/repos/asf/ignite/blob/af0c3bc2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java
index 0050659..641e6d4 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java
@@ -22,6 +22,7 @@ import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.ignite.IgniteCheckedException;
@@ -82,7 +83,7 @@ public class CacheCoordinatorsSharedManager<K, V> extends 
GridCacheSharedManager
     private final GridAtomicLong committedCntr = new GridAtomicLong(1L);
 
     /** */
-    private final ConcurrentHashMap<GridCacheVersion, Long> activeTxs = new 
ConcurrentHashMap<>();
+    private final ConcurrentSkipListMap<Long, GridCacheVersion> activeTxs = 
new ConcurrentSkipListMap<>();
 
     /** */
     private final Map<Long, Integer> activeQueries = new HashMap<>();
@@ -268,12 +269,12 @@ public class CacheCoordinatorsSharedManager<K, V> extends 
GridCacheSharedManager
 
     /**
      * @param crd Coordinator.
-     * @param txId Transaction ID.
+     * @param mvccVer Transaction version.
      * @return Acknowledge future.
      */
-    public IgniteInternalFuture<Void> ackTxCommit(ClusterNode crd, 
GridCacheVersion txId) {
+    public IgniteInternalFuture<Void> ackTxCommit(ClusterNode crd, 
MvccCoordinatorVersion mvccVer) {
         assert crd != null;
-        assert txId != null;
+        assert mvccVer != null;
 
         WaitAckFuture fut = new WaitAckFuture(futIdCntr.incrementAndGet(), 
crd, true);
 
@@ -282,7 +283,7 @@ public class CacheCoordinatorsSharedManager<K, V> extends 
GridCacheSharedManager
         try {
             cctx.gridIO().sendToGridTopic(crd,
                 MSG_TOPIC,
-                new CoordinatorTxAckRequest(fut.id, txId),
+                new CoordinatorTxAckRequest(fut.id, mvccVer.counter()),
                 MSG_POLICY);
         }
         catch (ClusterTopologyCheckedException e) {
@@ -299,10 +300,10 @@ public class CacheCoordinatorsSharedManager<K, V> extends 
GridCacheSharedManager
 
     /**
      * @param crd Coordinator.
-     * @param txId Transaction ID.
+     * @param mvccVer Transaction version.
      */
-    public void ackTxRollback(ClusterNode crd, GridCacheVersion txId) {
-        CoordinatorTxAckRequest msg = new CoordinatorTxAckRequest(0, txId);
+    public void ackTxRollback(ClusterNode crd, MvccCoordinatorVersion mvccVer) 
{
+        CoordinatorTxAckRequest msg = new CoordinatorTxAckRequest(0, 
mvccVer.counter());
 
         msg.skipResponse(true);
 
@@ -424,7 +425,7 @@ public class CacheCoordinatorsSharedManager<K, V> extends 
GridCacheSharedManager
      * @param msg Message.
      */
     private void processCoordinatorTxAckRequest(UUID nodeId, 
CoordinatorTxAckRequest msg) {
-        onTxDone(msg.txId());
+        onTxDone(msg.txCounter());
 
         if (STAT_CNTRS)
             statCntrs[2].update();
@@ -482,10 +483,10 @@ public class CacheCoordinatorsSharedManager<K, V> extends 
GridCacheSharedManager
         // TODO IGNITE-3478 sorted? + change GridLongList.writeTo?
         MvccCoordinatorVersionResponse res = new 
MvccCoordinatorVersionResponse();
 
-        for (Long txVer : activeTxs.values())
+        for (Long txVer : activeTxs.keySet())
             res.addTx(txVer);
 
-        Object old = activeTxs.put(txId, nextCtr);
+        Object old = activeTxs.put(nextCtr, txId);
 
         assert old == null : txId;
 
@@ -502,19 +503,19 @@ public class CacheCoordinatorsSharedManager<K, V> extends 
GridCacheSharedManager
     }
 
     /**
-     * @param txId Transaction ID.
+     * @param txCntr Counter assigned to transaction.
      */
-    private void onTxDone(GridCacheVersion txId) {
+    private void onTxDone(Long txCntr) {
         GridFutureAdapter fut; // TODO IGNITE-3478.
 
         synchronized (this) {
-            Long cntr = activeTxs.remove(txId);
+            GridCacheVersion ver = activeTxs.remove(txCntr);
 
-            assert cntr != null;
+            assert ver != null;
 
-            committedCntr.setIfGreater(cntr);
+            committedCntr.setIfGreater(txCntr);
 
-            fut = waitTxFuts.remove(cntr);
+            fut = waitTxFuts.remove(txCntr);
         }
 
         if (fut != null)
@@ -534,7 +535,7 @@ public class CacheCoordinatorsSharedManager<K, V> extends 
GridCacheSharedManager
 
         Long trackCntr = mvccCntr;
 
-        for (Long txVer : activeTxs.values()) {
+        for (Long txVer : activeTxs.keySet()) {
             if (txVer < trackCntr)
                 trackCntr = txVer;
 
@@ -592,7 +593,7 @@ public class CacheCoordinatorsSharedManager<K, V> extends 
GridCacheSharedManager
             for (int i = 0; i < txs.size(); i++) {
                 long txId = txs.get(i);
 
-                if (hasActiveTx(txId)) {
+                if (activeTxs.containsKey(txId)) {
                     GridFutureAdapter fut0 = waitTxFuts.get(txId);
 
                     if (fut0 == null) {
@@ -643,15 +644,6 @@ public class CacheCoordinatorsSharedManager<K, V> extends 
GridCacheSharedManager
         }
     }
 
-    private boolean hasActiveTx(long txId) {
-        for (Long id : activeTxs.values()) {
-            if (id == txId)
-                return true;
-        }
-
-        return false;
-    }
-
     /**
      * @param topVer Topology version.
      * @return MVCC coordinator for given topology version.

http://git-wip-us.apache.org/repos/asf/ignite/blob/af0c3bc2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorTxAckRequest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorTxAckRequest.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorTxAckRequest.java
index 071a411..14cd6a9 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorTxAckRequest.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorTxAckRequest.java
@@ -38,7 +38,7 @@ public class CoordinatorTxAckRequest implements 
MvccCoordinatorMessage {
     private long futId;
 
     /** */
-    private GridCacheVersion txId;
+    private long txCntr;
 
     /** */
     private byte flags;
@@ -52,11 +52,11 @@ public class CoordinatorTxAckRequest implements 
MvccCoordinatorMessage {
 
     /**
      * @param futId Future ID.
-     * @param txId Transaction ID.
+     * @param txCntr Counter assigned to transaction.
      */
-    CoordinatorTxAckRequest(long futId, GridCacheVersion txId) {
+    CoordinatorTxAckRequest(long futId, long txCntr) {
         this.futId = futId;
-        this.txId = txId;
+        this.txCntr = txCntr;
     }
 
     /** {@inheritDoc} */
@@ -94,10 +94,10 @@ public class CoordinatorTxAckRequest implements 
MvccCoordinatorMessage {
     }
 
     /**
-     * @return Transaction ID.s
+     * @return Counter assigned to transaction.
      */
-    public GridCacheVersion txId() {
-        return txId;
+    public long txCounter() {
+        return txCntr;
     }
 
     /** {@inheritDoc} */
@@ -125,7 +125,7 @@ public class CoordinatorTxAckRequest implements 
MvccCoordinatorMessage {
                 writer.incrementState();
 
             case 2:
-                if (!writer.writeMessage("txId", txId))
+                if (!writer.writeLong("txCntr", txCntr))
                     return false;
 
                 writer.incrementState();
@@ -160,7 +160,7 @@ public class CoordinatorTxAckRequest implements 
MvccCoordinatorMessage {
                 reader.incrementState();
 
             case 2:
-                txId = reader.readMessage("txId");
+                txCntr = reader.readLong("txCntr");
 
                 if (!reader.isLastRead())
                     return false;

Reply via email to