Repository: ignite
Updated Branches:
  refs/heads/ignite-6149 91bbb7cd2 -> 085a32190


ignite-6149


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/085a3219
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/085a3219
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/085a3219

Branch: refs/heads/ignite-6149
Commit: 085a3219088bc1610d1928c437f075a6fb9a4f9b
Parents: 91bbb7c
Author: sboikov <[email protected]>
Authored: Fri Sep 8 16:02:16 2017 +0300
Committer: sboikov <[email protected]>
Committed: Fri Sep 8 17:16:26 2017 +0300

----------------------------------------------------------------------
 .../communication/GridIoMessageFactory.java     |  14 +-
 .../cache/IgniteCacheOffheapManager.java        |  13 +-
 .../cache/IgniteCacheOffheapManagerImpl.java    |  59 +++++-
 .../mvcc/CacheCoordinatorsSharedManager.java    | 112 ++++++++++--
 .../cache/mvcc/CoordinatorFutureResponse.java   | 122 +++++++++++++
 .../cache/mvcc/CoordinatorTxAckResponse.java    | 122 -------------
 .../cache/mvcc/CoordinatorWaitTxsRequest.java   | 150 +++++++++++++++
 .../processors/cache/mvcc/MvccCounter.java      | 182 -------------------
 .../cache/persistence/CacheDataRow.java         |   2 +-
 .../cache/persistence/CacheDataRowAdapter.java  |   2 +-
 .../cache/persistence/CacheSearchRow.java       |   2 +-
 .../persistence/GridCacheOffheapManager.java    |   7 +-
 .../cache/tree/AbstractDataInnerIO.java         |   4 +-
 .../cache/tree/AbstractDataLeafIO.java          |   6 +-
 .../processors/cache/tree/CacheDataTree.java    |   6 +-
 .../processors/cache/tree/MvccDataRow.java      |  18 +-
 .../processors/cache/tree/MvccSearchRow.java    |  12 +-
 .../processors/cache/tree/SearchRow.java        |   2 +-
 .../cache/mvcc/CacheMvccTransactionsTest.java   |  93 ++++++++++
 .../database/FreeListImplSelfTest.java          |   4 +-
 .../processors/query/h2/opt/GridH2Row.java      |   2 +-
 21 files changed, 564 insertions(+), 370 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/085a3219/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index 1c1bfb7..cf3bd2a 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -105,10 +105,10 @@ import 
org.apache.ignite.internal.processors.cache.distributed.near.GridNearUnlo
 import 
org.apache.ignite.internal.processors.cache.mvcc.CoordinatorQueryAckRequest;
 import 
org.apache.ignite.internal.processors.cache.mvcc.CoordinatorQueryVersionRequest;
 import 
org.apache.ignite.internal.processors.cache.mvcc.CoordinatorTxAckRequest;
-import 
org.apache.ignite.internal.processors.cache.mvcc.CoordinatorTxAckResponse;
+import 
org.apache.ignite.internal.processors.cache.mvcc.CoordinatorFutureResponse;
 import 
org.apache.ignite.internal.processors.cache.mvcc.CoordinatorTxCounterRequest;
+import 
org.apache.ignite.internal.processors.cache.mvcc.CoordinatorWaitTxsRequest;
 import 
org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersionResponse;
-import org.apache.ignite.internal.processors.cache.mvcc.MvccCounter;
 import org.apache.ignite.internal.processors.cache.query.GridCacheQueryRequest;
 import 
org.apache.ignite.internal.processors.cache.query.GridCacheQueryResponse;
 import org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery;
@@ -893,7 +893,7 @@ public class GridIoMessageFactory implements MessageFactory 
{
                 break;
 
             case 132:
-                msg = new CoordinatorTxAckResponse();
+                msg = new CoordinatorFutureResponse();
 
                 break;
 
@@ -907,13 +907,13 @@ public class GridIoMessageFactory implements 
MessageFactory {
 
                 break;
 
-            case 135:
-                msg = new MvccCounter();
+            case 136:
+                msg = new MvccCoordinatorVersionResponse();
 
                 return msg;
 
-            case 136:
-                msg = new MvccCoordinatorVersionResponse();
+            case 137:
+                msg = new CoordinatorWaitTxsRequest();
 
                 return msg;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/085a3219/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
index a8c2f7e..7c4d209 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
@@ -29,6 +29,7 @@ import 
org.apache.ignite.internal.processors.cache.persistence.tree.reuse.ReuseL
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.util.GridAtomicLong;
+import org.apache.ignite.internal.util.GridLongList;
 import org.apache.ignite.internal.util.IgniteTree;
 import org.apache.ignite.internal.util.lang.GridCloseableIterator;
 import org.apache.ignite.internal.util.lang.GridCursor;
@@ -173,7 +174,15 @@ public interface IgniteCacheOffheapManager {
     public void invoke(GridCacheContext cctx, KeyCacheObject key, 
GridDhtLocalPartition part, OffheapInvokeClosure c)
         throws IgniteCheckedException;
 
-    public void mvccUpdate(GridCacheMapEntry entry,
+    /**
+     * @param entry Entry.
+     * @param val Value.
+     * @param ver Cache version.
+     * @param mvccVer Mvcc update version.
+     * @return Transactions to wait for before finishing current transaction.
+     * @throws IgniteCheckedException If failed.
+     */
+    @Nullable public GridLongList mvccUpdate(GridCacheMapEntry entry,
         CacheObject val,
         GridCacheVersion ver,
         MvccCoordinatorVersion mvccVer) throws IgniteCheckedException;
@@ -466,7 +475,7 @@ public interface IgniteCacheOffheapManager {
             long expireTime,
             @Nullable CacheDataRow oldRow) throws IgniteCheckedException;
 
-        void mvccUpdate(
+        GridLongList mvccUpdate(
             GridCacheContext cctx,
             KeyCacheObject key,
             CacheObject val,

http://git-wip-us.apache.org/repos/asf/ignite/blob/085a3219/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
index 9a8be390..5549c78 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
@@ -363,11 +363,11 @@ public class IgniteCacheOffheapManagerImpl implements 
IgniteCacheOffheapManager
     }
 
     /** {@inheritDoc} */
-    @Override public void mvccUpdate(GridCacheMapEntry entry,
+    @Override public GridLongList mvccUpdate(GridCacheMapEntry entry,
         CacheObject val,
         GridCacheVersion ver,
         MvccCoordinatorVersion mvccVer) throws IgniteCheckedException {
-        dataStore(entry.localPartition()).mvccUpdate(entry.context(),
+        return dataStore(entry.localPartition()).mvccUpdate(entry.context(),
             entry.key(),
             val,
             ver,
@@ -1302,8 +1302,17 @@ public class IgniteCacheOffheapManagerImpl implements 
IgniteCacheOffheapManager
             return dataRow;
         }
 
+        private int compare(CacheDataRow row, long crdVer, long mvccCntr) {
+            int cmp = Long.compare(row.mvccCoordinatorVersion(), crdVer);
+
+            if (cmp != 0)
+                return cmp;
+
+            return Long.compare(row.mvccUpdateCounter(), mvccCntr);
+        }
+
         /** {@inheritDoc} */
-        @Override public void mvccUpdate(GridCacheContext cctx,
+        @Override public GridLongList mvccUpdate(GridCacheContext cctx,
             KeyCacheObject key,
             CacheObject val,
             GridCacheVersion ver,
@@ -1336,7 +1345,45 @@ public class IgniteCacheOffheapManagerImpl implements 
IgniteCacheOffheapManager
                 if (grp.sharedGroup() && dataRow.cacheId() == 
CU.UNDEFINED_CACHE_ID)
                     dataRow.cacheId(cctx.cacheId());
 
-                dataTree.putx(dataRow);
+                boolean old = dataTree.putx(dataRow);
+
+                assert !old;
+
+                GridLongList activeTxs = mvccVer.activeTransactions();
+
+                // TODO IGNITE-3484: need special method.
+                GridCursor<CacheDataRow> cur = dataTree.find(
+                    new MvccSearchRow(cacheId, key, 
mvccVer.coordinatorVersion(), mvccVer.counter() - 1),
+                    new MvccSearchRow(cacheId, key, 1, 1));
+
+                GridLongList waitTxs = null;
+
+                boolean first = true;
+
+                while (cur.next()) {
+                    CacheDataRow oldVal = cur.get();
+
+                    if (activeTxs != null && oldVal.mvccCoordinatorVersion() 
== mvccVer.coordinatorVersion() &&
+                        activeTxs.contains(oldVal.mvccUpdateCounter())) {
+                        if (waitTxs == null)
+                            waitTxs = new GridLongList();
+
+                        waitTxs.add(oldVal.mvccUpdateCounter());
+                    }
+                    else if (!first) {
+                        int cmp = compare(oldVal, 
mvccVer.coordinatorVersion(), mvccVer.cleanupVersion());
+
+                        if (cmp <= 0) {
+                            boolean rmvd = dataTree.removex(oldVal);
+
+                            assert rmvd;
+                        }
+                    }
+
+                    first = false;
+                }
+
+                return waitTxs;
             }
             finally {
                 busyLock.leaveBusy();
@@ -1588,12 +1635,12 @@ public class IgniteCacheOffheapManagerImpl implements 
IgniteCacheOffheapManager
             while (cur.next()) {
                 CacheDataRow row0 = cur.get();
 
-                assert row0.mvccUpdateTopologyVersion() > 0 : row0;
+                assert row0.mvccCoordinatorVersion() > 0 : row0;
 
                 boolean visible;
 
                 if (txs != null) {
-                    visible = row0.mvccUpdateTopologyVersion() != 
ver.coordinatorVersion()
+                    visible = row0.mvccCoordinatorVersion() != 
ver.coordinatorVersion()
                         || !txs.contains(row0.mvccUpdateCounter());
                 }
                 else

http://git-wip-us.apache.org/repos/asf/ignite/blob/085a3219/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java
index ccd22d8..7034aca 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java
@@ -17,9 +17,7 @@
 
 package org.apache.ignite.internal.processors.cache.mvcc;
 
-import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
@@ -78,7 +76,7 @@ public class CacheCoordinatorsSharedManager<K, V> extends 
GridCacheSharedManager
     private final ConcurrentMap<Long, MvccVersionFuture> verFuts = new 
ConcurrentHashMap<>();
 
     /** */
-    private final ConcurrentMap<Long, TxAckFuture> ackFuts = new 
ConcurrentHashMap<>();
+    private final ConcurrentMap<Long, WaitAckFuture> ackFuts = new 
ConcurrentHashMap<>();
 
     /** */
     private final AtomicLong futIdCntr = new AtomicLong();
@@ -86,7 +84,7 @@ public class CacheCoordinatorsSharedManager<K, V> extends 
GridCacheSharedManager
     /** */
     private final CountDownLatch crdLatch = new CountDownLatch(1);
 
-    /** */
+    /** Topology version when local node was assigned as coordinator. */
     private long crdVer;
 
     /** {@inheritDoc} */
@@ -193,11 +191,39 @@ public class CacheCoordinatorsSharedManager<K, V> extends 
GridCacheSharedManager
 
     /**
      * @param crd Coordinator.
+     * @param txs Transaction IDs.
+     * @return Future.
+     */
+    public IgniteInternalFuture<Void> waitTxsFuture(ClusterNode crd, 
GridLongList txs) {
+        WaitAckFuture fut = new WaitAckFuture(futIdCntr.incrementAndGet(), 
crd);
+
+        ackFuts.put(fut.id, fut);
+
+        try {
+            cctx.gridIO().sendToGridTopic(crd,
+                TOPIC_CACHE_COORDINATOR,
+                new CoordinatorWaitTxsRequest(fut.id, txs),
+                SYSTEM_POOL);
+        }
+        catch (ClusterTopologyCheckedException e) {
+            if (ackFuts.remove(fut.id) != null)
+                fut.onDone(); // No need to ack, finish without error.
+        }
+        catch (IgniteCheckedException e) {
+            if (ackFuts.remove(fut.id) != null)
+                fut.onDone(e);
+        }
+
+        return fut;
+    }
+
+    /**
+     * @param crd Coordinator.
      * @param txId Transaction ID.
      * @return Acknowledge future.
      */
     public IgniteInternalFuture<Void> ackTxCommit(ClusterNode crd, 
GridCacheVersion txId) {
-        TxAckFuture fut = new TxAckFuture(futIdCntr.incrementAndGet(), crd);
+        WaitAckFuture fut = new WaitAckFuture(futIdCntr.incrementAndGet(), 
crd);
 
         ackFuts.put(fut.id, fut);
 
@@ -345,7 +371,7 @@ public class CacheCoordinatorsSharedManager<K, V> extends 
GridCacheSharedManager
             try {
                 cctx.gridIO().sendToGridTopic(nodeId,
                     TOPIC_CACHE_COORDINATOR,
-                    new CoordinatorTxAckResponse(msg.futureId()),
+                    new CoordinatorFutureResponse(msg.futureId()),
                     SYSTEM_POOL);
             }
             catch (ClusterTopologyCheckedException e) {
@@ -362,8 +388,8 @@ public class CacheCoordinatorsSharedManager<K, V> extends 
GridCacheSharedManager
      * @param nodeId Sender node ID.
      * @param msg Message.
      */
-    private void processCoordinatorTxAckResponse(UUID nodeId, 
CoordinatorTxAckResponse msg) {
-        TxAckFuture fut = ackFuts.remove(msg.futureId());
+    private void processCoordinatorTxAckResponse(UUID nodeId, 
CoordinatorFutureResponse msg) {
+        WaitAckFuture fut = ackFuts.remove(msg.futureId());
 
         if (fut != null)
             fut.onResponse();
@@ -399,14 +425,14 @@ public class CacheCoordinatorsSharedManager<K, V> extends 
GridCacheSharedManager
 
         assert old == null : txId;
 
-        long minQry = 0;
+        long cleanupVer = Long.MAX_VALUE;
 
-        for (Long qryCntr : activeQueries.keySet()) {
-            if (qryCntr < minQry)
-                minQry = qryCntr;
+        for (Long qryVer : activeQueries.keySet()) {
+            if (qryVer < cleanupVer)
+                cleanupVer = qryVer - 1;
         }
 
-        return new MvccCoordinatorVersionResponse(futId, crdVer, nextCtr, txs, 
minQry);
+        return new MvccCoordinatorVersionResponse(futId, crdVer, nextCtr, txs, 
cleanupVer);
     }
 
     /**
@@ -418,6 +444,8 @@ public class CacheCoordinatorsSharedManager<K, V> extends 
GridCacheSharedManager
         assert cntr != null;
 
         committedCntr.setIfGreater(cntr);
+
+        notifyAll(); // TODO IGNITE-3478.
     }
 
     /**
@@ -465,6 +493,52 @@ public class CacheCoordinatorsSharedManager<K, V> extends 
GridCacheSharedManager
     }
 
     /**
+     * @param msg Message.
+     */
+    private void processCoordinatorWaitTxsRequest(UUID nodeId, 
CoordinatorWaitTxsRequest msg) {
+        GridLongList txs = msg.transactions();
+
+        // TODO IGNITE-3478.
+        synchronized (this) {
+            for (int i = 0; i < txs.size(); i++) {
+                long txId = txs.get(i);
+
+                while (hasActiveTx(txId)) {
+                    try {
+                        wait();
+                    }
+                    catch (InterruptedException e) {
+                        e.printStackTrace();
+                    }
+                }
+            }
+        }
+        try {
+            cctx.gridIO().sendToGridTopic(nodeId,
+                TOPIC_CACHE_COORDINATOR,
+                new CoordinatorFutureResponse(msg.futureId()),
+                SYSTEM_POOL);
+        }
+        catch (ClusterTopologyCheckedException e) {
+            if (log.isDebugEnabled())
+                log.debug("Failed to send tx ack response, node left [msg=" + 
msg + ", node=" + nodeId + ']');
+        }
+        catch (IgniteCheckedException e) {
+            U.error(log, "Failed to send tx ack response [msg=" + msg + ", 
node=" + nodeId + ']', e);
+        }
+
+    }
+
+    private boolean hasActiveTx(long txId) {
+        for (Long id : activeTxs.values()) {
+            if (id == txId)
+                return true;
+        }
+
+        return false;
+    }
+
+    /**
      * @param topVer Topology version.
      * @return MVCC coordinator for given topology version.
      */
@@ -552,7 +626,7 @@ public class CacheCoordinatorsSharedManager<K, V> extends 
GridCacheSharedManager
     /**
      *
      */
-    private class TxAckFuture extends GridFutureAdapter<Void> {
+    private class WaitAckFuture extends GridFutureAdapter<Void> {
         /** */
         private final long id;
 
@@ -563,7 +637,7 @@ public class CacheCoordinatorsSharedManager<K, V> extends 
GridCacheSharedManager
          * @param id Future ID.
          * @param crd Coordinator.
          */
-        TxAckFuture(long id, ClusterNode crd) {
+        WaitAckFuture(long id, ClusterNode crd) {
             this.id = id;
             this.crd = crd;
         }
@@ -599,7 +673,7 @@ public class CacheCoordinatorsSharedManager<K, V> extends 
GridCacheSharedManager
             for (MvccVersionFuture fut : verFuts.values())
                 fut.onNodeLeft(nodeId);
 
-            for (TxAckFuture fut : ackFuts.values())
+            for (WaitAckFuture fut : ackFuts.values())
                 fut.onNodeLeft(nodeId);
         }
     }
@@ -631,14 +705,16 @@ public class CacheCoordinatorsSharedManager<K, V> extends 
GridCacheSharedManager
                 processCoordinatorTxCounterRequest(nodeId, 
(CoordinatorTxCounterRequest)msg);
             else if (msg instanceof CoordinatorTxAckRequest)
                 processCoordinatorTxAckRequest(nodeId, 
(CoordinatorTxAckRequest)msg);
-            else if (msg instanceof CoordinatorTxAckResponse)
-                processCoordinatorTxAckResponse(nodeId, 
(CoordinatorTxAckResponse)msg);
+            else if (msg instanceof CoordinatorFutureResponse)
+                processCoordinatorTxAckResponse(nodeId, 
(CoordinatorFutureResponse)msg);
             else if (msg instanceof CoordinatorQueryAckRequest)
                 
processCoordinatorQueryAckRequest((CoordinatorQueryAckRequest)msg);
             else if (msg instanceof CoordinatorQueryVersionRequest)
                 processCoordinatorQueryVersionRequest(nodeId, 
(CoordinatorQueryVersionRequest)msg);
             else if (msg instanceof MvccCoordinatorVersionResponse)
                 processCoordinatorQueryVersionResponse(nodeId, 
(MvccCoordinatorVersionResponse) msg);
+            else if (msg instanceof CoordinatorWaitTxsRequest)
+                processCoordinatorWaitTxsRequest(nodeId, 
(CoordinatorWaitTxsRequest)msg);
             else
                 U.warn(log, "Unexpected message received [node=" + nodeId + ", 
msg=" + msg + ']');
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/085a3219/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorFutureResponse.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorFutureResponse.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorFutureResponse.java
new file mode 100644
index 0000000..4033733
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorFutureResponse.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.mvcc;
+
+import java.nio.ByteBuffer;
+import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+/**
+ *
+ */
+public class CoordinatorFutureResponse implements MvccCoordinatorMessage {
+    /** */
+    private long futId;
+
+    /**
+     * Required by {@link GridIoMessageFactory}.
+     */
+    public CoordinatorFutureResponse() {
+        // No-op.
+    }
+
+    /**
+     * @param futId Future ID.
+     */
+    CoordinatorFutureResponse(long futId) {
+        this.futId = futId;
+    }
+
+    /**
+     * @return Future ID.
+     */
+    long futureId() {
+        return futId;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean waitForCoordinatorInit() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        writer.setBuffer(buf);
+
+        if (!writer.isHeaderWritten()) {
+            if (!writer.writeHeader(directType(), fieldsCount()))
+                return false;
+
+            writer.onHeaderWritten();
+        }
+
+        switch (writer.state()) {
+            case 0:
+                if (!writer.writeLong("futId", futId))
+                    return false;
+
+                writer.incrementState();
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        reader.setBuffer(buf);
+
+        if (!reader.beforeMessageRead())
+            return false;
+
+        switch (reader.state()) {
+            case 0:
+                futId = reader.readLong("futId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+        }
+
+        return reader.afterMessageRead(CoordinatorFutureResponse.class);
+    }
+
+    /** {@inheritDoc} */
+    @Override public short directType() {
+        return 132;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte fieldsCount() {
+        return 1;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onAckReceived() {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(CoordinatorFutureResponse.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/085a3219/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorTxAckResponse.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorTxAckResponse.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorTxAckResponse.java
deleted file mode 100644
index 059416c..0000000
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorTxAckResponse.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.mvcc;
-
-import java.nio.ByteBuffer;
-import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.plugin.extensions.communication.MessageReader;
-import org.apache.ignite.plugin.extensions.communication.MessageWriter;
-
-/**
- *
- */
-public class CoordinatorTxAckResponse implements MvccCoordinatorMessage {
-    /** */
-    private long futId;
-
-    /**
-     * Required by {@link GridIoMessageFactory}.
-     */
-    public CoordinatorTxAckResponse() {
-        // No-op.
-    }
-
-    /**
-     * @param futId Future ID.
-     */
-    CoordinatorTxAckResponse(long futId) {
-        this.futId = futId;
-    }
-
-    /**
-     * @return Future ID.
-     */
-    long futureId() {
-        return futId;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean waitForCoordinatorInit() {
-        return false;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
-        writer.setBuffer(buf);
-
-        if (!writer.isHeaderWritten()) {
-            if (!writer.writeHeader(directType(), fieldsCount()))
-                return false;
-
-            writer.onHeaderWritten();
-        }
-
-        switch (writer.state()) {
-            case 0:
-                if (!writer.writeLong("futId", futId))
-                    return false;
-
-                writer.incrementState();
-
-        }
-
-        return true;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
-        reader.setBuffer(buf);
-
-        if (!reader.beforeMessageRead())
-            return false;
-
-        switch (reader.state()) {
-            case 0:
-                futId = reader.readLong("futId");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-        }
-
-        return reader.afterMessageRead(CoordinatorTxAckResponse.class);
-    }
-
-    /** {@inheritDoc} */
-    @Override public short directType() {
-        return 132;
-    }
-
-    /** {@inheritDoc} */
-    @Override public byte fieldsCount() {
-        return 1;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void onAckReceived() {
-        // No-op.
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(CoordinatorTxAckResponse.class, this);
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/085a3219/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorWaitTxsRequest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorWaitTxsRequest.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorWaitTxsRequest.java
new file mode 100644
index 0000000..e66e2b9
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorWaitTxsRequest.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.mvcc;
+
+import java.nio.ByteBuffer;
+import org.apache.ignite.internal.util.GridLongList;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+/**
+ *
+ */
+public class CoordinatorWaitTxsRequest implements MvccCoordinatorMessage {
+    /** */
+    private long futId;
+
+    /** */
+    private GridLongList txs;
+
+    /**
+     *
+     */
+    public CoordinatorWaitTxsRequest() {
+        // No-op.
+    }
+
+    /**
+     * @param futId Future ID.
+     * @param txs Transactions to wait for.
+     */
+    public CoordinatorWaitTxsRequest(long futId, GridLongList txs) {
+        assert txs != null && txs.size() > 0 : txs;
+
+        this.futId = futId;
+        this.txs = txs;
+    }
+
+    /**
+     * @return Future ID.
+     */
+    long futureId() {
+        return futId;
+    }
+
+    /**
+     * @return Transactions to wait for.
+     */
+    GridLongList transactions() {
+        return txs;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean waitForCoordinatorInit() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        writer.setBuffer(buf);
+
+        if (!writer.isHeaderWritten()) {
+            if (!writer.writeHeader(directType(), fieldsCount()))
+                return false;
+
+            writer.onHeaderWritten();
+        }
+
+        switch (writer.state()) {
+            case 0:
+                if (!writer.writeLong("futId", futId))
+                    return false;
+
+                writer.incrementState();
+
+            case 1:
+                if (!writer.writeMessage("txs", txs))
+                    return false;
+
+                writer.incrementState();
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        reader.setBuffer(buf);
+
+        if (!reader.beforeMessageRead())
+            return false;
+
+        switch (reader.state()) {
+            case 0:
+                futId = reader.readLong("futId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 1:
+                txs = reader.readMessage("txs");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+        }
+
+        return reader.afterMessageRead(CoordinatorWaitTxsRequest.class);
+    }
+
+    /** {@inheritDoc} */
+    @Override public short directType() {
+        return 137;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte fieldsCount() {
+        return 2;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onAckReceived() {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(CoordinatorWaitTxsRequest.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/085a3219/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCounter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCounter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCounter.java
deleted file mode 100644
index 161e8d4..0000000
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCounter.java
+++ /dev/null
@@ -1,182 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.mvcc;
-
-import java.nio.ByteBuffer;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.plugin.extensions.communication.Message;
-import org.apache.ignite.plugin.extensions.communication.MessageReader;
-import org.apache.ignite.plugin.extensions.communication.MessageWriter;
-import org.jetbrains.annotations.NotNull;
-
-/**
- *
- */
-public class MvccCounter implements Comparable<MvccCounter>, Message {
-    /** */
-    private long crdVer;
-
-    /** */
-    private long cntr;
-
-    /** */
-    private long cleanupCntr;
-
-    /**
-     *
-     */
-    public MvccCounter() {
-        // No-op.
-    }
-
-    /**
-     * @param crdVer Coordinator version.
-     * @param cntr Coordinator counter.
-     */
-    public MvccCounter(long crdVer, long cntr, long cleanupCntr) {
-        assert crdVer > 0 : crdVer;
-        assert cntr != CacheCoordinatorsSharedManager.COUNTER_NA;
-
-        this.crdVer = crdVer;
-        this.cntr = cntr;
-    }
-
-    /** {@inheritDoc} */
-    @Override public int compareTo(@NotNull MvccCounter other) {
-        int cmp = Long.compare(crdVer, other.crdVer);
-
-        if (cmp != 0)
-            return cmp;
-
-        return Long.compare(cntr, other.cntr);
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean equals(Object o) {
-        if (this == o)
-            return true;
-
-        if (o == null || getClass() != o.getClass())
-            return false;
-
-        MvccCounter that = (MvccCounter) o;
-
-        return crdVer == that.crdVer && cntr == that.cntr;
-    }
-
-    /** {@inheritDoc} */
-    @Override public int hashCode() {
-        int res = (int) (crdVer ^ (crdVer >>> 32));
-
-        res = 31 * res + (int) (cntr ^ (cntr >>> 32));
-
-        return res;
-    }
-
-    /**
-     * @return Coordinator version.
-     */
-    public long coordinatorVersion() {
-        return crdVer;
-    }
-
-    /**
-     * @return Counters.
-     */
-    public long counter() {
-        return cntr;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
-        writer.setBuffer(buf);
-
-        if (!writer.isHeaderWritten()) {
-            if (!writer.writeHeader(directType(), fieldsCount()))
-                return false;
-
-            writer.onHeaderWritten();
-        }
-
-        switch (writer.state()) {
-            case 0:
-                if (!writer.writeLong("cntr", cntr))
-                    return false;
-
-                writer.incrementState();
-
-            case 1:
-                if (!writer.writeLong("crdVer", crdVer))
-                    return false;
-
-                writer.incrementState();
-
-        }
-
-        return true;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
-        reader.setBuffer(buf);
-
-        if (!reader.beforeMessageRead())
-            return false;
-
-        switch (reader.state()) {
-            case 0:
-                cntr = reader.readLong("cntr");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 1:
-                crdVer = reader.readLong("crdVer");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-        }
-
-        return reader.afterMessageRead(MvccCounter.class);
-    }
-
-    /** {@inheritDoc} */
-    @Override public short directType() {
-        return 135;
-    }
-
-    /** {@inheritDoc} */
-    @Override public byte fieldsCount() {
-        return 2;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void onAckReceived() {
-        // No-op.
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(MvccCounter.class, this);
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/085a3219/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheDataRow.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheDataRow.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheDataRow.java
index 5316eef..7c52c7d 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheDataRow.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheDataRow.java
@@ -55,7 +55,7 @@ public interface CacheDataRow extends CacheSearchRow {
      */
     public void key(KeyCacheObject key);
 
-    public long mvccUpdateTopologyVersion();
+    public long mvccCoordinatorVersion();
 
     public long mvccUpdateCounter();
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/085a3219/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheDataRowAdapter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheDataRowAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheDataRowAdapter.java
index 11da76d..4aef9f0 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheDataRowAdapter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheDataRowAdapter.java
@@ -573,7 +573,7 @@ public class CacheDataRowAdapter implements CacheDataRow {
     }
 
     /** {@inheritDoc} */
-    @Override public long mvccUpdateTopologyVersion() {
+    @Override public long mvccCoordinatorVersion() {
         return 0;
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/085a3219/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheSearchRow.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheSearchRow.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheSearchRow.java
index 7834a03..533d8f5 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheSearchRow.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheSearchRow.java
@@ -43,7 +43,7 @@ public interface CacheSearchRow {
      */
     public int cacheId();
 
-    public long mvccUpdateTopologyVersion();
+    public long mvccCoordinatorVersion();
 
     public long mvccUpdateCounter();
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/085a3219/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
index 5a88f9c..ffcfd8e 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
@@ -67,6 +67,7 @@ import 
org.apache.ignite.internal.processors.cache.tree.CacheDataRowStore;
 import org.apache.ignite.internal.processors.cache.tree.CacheDataTree;
 import org.apache.ignite.internal.processors.cache.tree.PendingEntriesTree;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.util.GridLongList;
 import org.apache.ignite.internal.util.GridUnsafe;
 import org.apache.ignite.internal.util.lang.GridCursor;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
@@ -830,7 +831,7 @@ public class GridCacheOffheapManager extends 
IgniteCacheOffheapManagerImpl imple
         }
 
         /** {@inheritDoc} */
-        @Override public long mvccUpdateTopologyVersion() {
+        @Override public long mvccCoordinatorVersion() {
             return 0; // TODO IGNITE-3478.
         }
     }
@@ -1247,14 +1248,14 @@ public class GridCacheOffheapManager extends 
IgniteCacheOffheapManagerImpl imple
         }
 
         /** {@inheritDoc} */
-        @Override public void mvccUpdate(GridCacheContext cctx,
+        @Override public GridLongList mvccUpdate(GridCacheContext cctx,
             KeyCacheObject key,
             CacheObject val,
             GridCacheVersion ver,
             MvccCoordinatorVersion mvccVer) throws IgniteCheckedException {
             CacheDataStore delegate = init0(false);
 
-            delegate.mvccUpdate(cctx, key, val, ver, mvccVer);
+            return delegate.mvccUpdate(cctx, key, val, ver, mvccVer);
         }
 
         /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/085a3219/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataInnerIO.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataInnerIO.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataInnerIO.java
index a1dacd0..d87b5ca 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataInnerIO.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataInnerIO.java
@@ -59,10 +59,10 @@ public abstract class AbstractDataInnerIO extends 
BPlusInnerIO<CacheSearchRow> i
         }
 
         if (storeMvccVersion()) {
-            assert row.mvccUpdateTopologyVersion() > 0 : row;
+            assert row.mvccCoordinatorVersion() > 0 : row;
             assert row.mvccUpdateCounter() != 
CacheCoordinatorsSharedManager.COUNTER_NA : row;
 
-            PageUtils.putLong(pageAddr, off, row.mvccUpdateTopologyVersion());
+            PageUtils.putLong(pageAddr, off, row.mvccCoordinatorVersion());
             off += 8;
 
             PageUtils.putLong(pageAddr, off, row.mvccUpdateCounter());

http://git-wip-us.apache.org/repos/asf/ignite/blob/085a3219/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataLeafIO.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataLeafIO.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataLeafIO.java
index bc27a21..0be84c6 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataLeafIO.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataLeafIO.java
@@ -58,13 +58,13 @@ public abstract class AbstractDataLeafIO extends 
BPlusLeafIO<CacheSearchRow> imp
         }
 
         if (storeMvccVersion()) {
-            long mvccUpdateTopVer = row.mvccUpdateTopologyVersion();
+            long mvccCrdVer = row.mvccCoordinatorVersion();
             long mvccUpdateCntr = row.mvccUpdateCounter();
 
-            assert mvccUpdateTopVer > 0 : mvccUpdateCntr;
+            assert mvccCrdVer > 0 : mvccCrdVer;
             assert mvccUpdateCntr != CacheCoordinatorsSharedManager.COUNTER_NA;
 
-            PageUtils.putLong(pageAddr, off, mvccUpdateTopVer);
+            PageUtils.putLong(pageAddr, off, mvccCrdVer);
             off += 8;
 
             PageUtils.putLong(pageAddr, off, mvccUpdateCntr);

http://git-wip-us.apache.org/repos/asf/ignite/blob/085a3219/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java
index 1fcf8dd..3bd0b02 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java
@@ -114,7 +114,7 @@ public class CacheDataTree extends 
BPlusTree<CacheSearchRow, CacheDataRow> {
     /** {@inheritDoc} */
     @Override protected int compare(BPlusIO<CacheSearchRow> iox, long 
pageAddr, int idx, CacheSearchRow row)
         throws IgniteCheckedException {
-        assert !grp.mvccEnabled() || row.mvccUpdateTopologyVersion() != 0;// 
|| row.getClass() == SearchRow.class;
+        assert !grp.mvccEnabled() || row.mvccCoordinatorVersion() != 0;// || 
row.getClass() == SearchRow.class;
 
         RowLinkIO io = (RowLinkIO)iox;
 
@@ -158,9 +158,9 @@ public class CacheDataTree extends 
BPlusTree<CacheSearchRow, CacheDataRow> {
         if (cmp != 0 || !grp.mvccEnabled())
             return 0;
 
-        long mvccTopVer = io.getMvccUpdateTopologyVersion(pageAddr, idx);
+        long mvccCrdVer = io.getMvccUpdateTopologyVersion(pageAddr, idx);
 
-        cmp = Long.compare(row.mvccUpdateTopologyVersion(), mvccTopVer);
+        cmp = Long.compare(row.mvccCoordinatorVersion(), mvccCrdVer);
 
         if (cmp != 0)
             return cmp;

http://git-wip-us.apache.org/repos/asf/ignite/blob/085a3219/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccDataRow.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccDataRow.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccDataRow.java
index 17cc9e0..a3d2ec4 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccDataRow.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccDataRow.java
@@ -28,7 +28,7 @@ import 
org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
  */
 public class MvccDataRow extends DataRow {
     /** */
-    private long mvccTopVer;
+    private long crdVer;
 
     /** */
     private long mvccCntr;
@@ -39,16 +39,16 @@ public class MvccDataRow extends DataRow {
      * @param link
      * @param part
      * @param rowData
-     * @param mvccTopVer
+     * @param crdVer
      * @param mvccCntr
      */
-    public MvccDataRow(CacheGroupContext grp, int hash, long link, int part, 
RowData rowData, long mvccTopVer, long mvccCntr) {
+    public MvccDataRow(CacheGroupContext grp, int hash, long link, int part, 
RowData rowData, long crdVer, long mvccCntr) {
         super(grp, hash, link, part, rowData);
 
-        assert mvccTopVer > 0 : mvccTopVer;
+        assert crdVer > 0 : crdVer;
         assert mvccCntr != CacheCoordinatorsSharedManager.COUNTER_NA;
 
-        this.mvccTopVer = mvccTopVer;
+        this.crdVer = crdVer;
         this.mvccCntr = mvccCntr;
     }
 
@@ -64,17 +64,17 @@ public class MvccDataRow extends DataRow {
         GridCacheVersion ver,
         int part,
         int cacheId,
-        long mvccTopVer,
+        long crdVer,
         long mvccCntr) {
         super(key, val, ver, part, 0L, cacheId);
 
         this.mvccCntr = mvccCntr;
-        this.mvccTopVer = mvccTopVer;
+        this.crdVer = crdVer;
     }
 
     /** {@inheritDoc} */
-    @Override public long mvccUpdateTopologyVersion() {
-        return mvccTopVer;
+    @Override public long mvccCoordinatorVersion() {
+        return crdVer;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/085a3219/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccSearchRow.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccSearchRow.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccSearchRow.java
index ae3da98..e6c5268 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccSearchRow.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccSearchRow.java
@@ -24,7 +24,7 @@ import 
org.apache.ignite.internal.processors.cache.KeyCacheObject;
  */
 public class MvccSearchRow extends SearchRow {
     /** */
-    private long mvccTopVer;
+    private long crdVer;
 
     /** */
     private long mvccCntr;
@@ -32,19 +32,19 @@ public class MvccSearchRow extends SearchRow {
     /**
      * @param cacheId
      * @param key
-     * @param mvccTopVer
+     * @param crdVer
      * @param mvccCntr
      */
-    public MvccSearchRow(int cacheId, KeyCacheObject key, long mvccTopVer, 
long mvccCntr) {
+    public MvccSearchRow(int cacheId, KeyCacheObject key, long crdVer, long 
mvccCntr) {
         super(cacheId, key);
 
-        this.mvccTopVer = mvccTopVer;
+        this.crdVer = crdVer;
         this.mvccCntr = mvccCntr;
     }
 
     /** {@inheritDoc} */
-    @Override public long mvccUpdateTopologyVersion() {
-        return mvccTopVer;
+    @Override public long mvccCoordinatorVersion() {
+        return crdVer;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/085a3219/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/SearchRow.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/SearchRow.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/SearchRow.java
index 77bcfec..6ab80d0 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/SearchRow.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/SearchRow.java
@@ -76,7 +76,7 @@ public class SearchRow implements CacheSearchRow {
     }
 
     /** {@inheritDoc} */
-    @Override public long mvccUpdateTopologyVersion() {
+    @Override public long mvccCoordinatorVersion() {
         return 0;
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/085a3219/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
index e7478dc..e3b751e 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
@@ -37,6 +37,7 @@ import org.apache.ignite.IgniteTransactions;
 import org.apache.ignite.cache.CacheMode;
 import org.apache.ignite.cache.CacheWriteSynchronizationMode;
 import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.IgniteInternalFuture;
@@ -50,7 +51,9 @@ import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.G;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiPredicate;
 import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.plugin.extensions.communication.Message;
 import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
@@ -354,6 +357,95 @@ public class CacheMvccTransactionsTest extends 
GridCommonAbstractTest {
     /**
      * @throws Exception If failed.
      */
+    public void testWaitPreviousTxAck() throws Exception {
+        testSpi = true;
+
+        startGrid(0);
+
+        client = true;
+
+        final Ignite ignite = startGrid(1);
+
+        final IgniteCache<Object, Object> cache =
+            ignite.createCache(cacheConfiguration(PARTITIONED, FULL_SYNC, 0, 
16));
+
+        try (Transaction tx = ignite.transactions().txStart(PESSIMISTIC, 
REPEATABLE_READ)) {
+            cache.put(1, 1);
+            cache.put(2, 1);
+            cache.put(3, 1);
+
+            tx.commit();
+        }
+
+        TestRecordingCommunicationSpi clientSpi = 
TestRecordingCommunicationSpi.spi(ignite);
+
+        clientSpi.blockMessages(new IgniteBiPredicate<ClusterNode, Message>() {
+            /** */
+            boolean block = true;
+
+            @Override public boolean apply(ClusterNode node, Message msg) {
+                if (block && msg instanceof CoordinatorTxAckRequest) {
+                    block = false;
+
+                    return true;
+                }
+
+                return false;
+            }
+        });
+
+        IgniteInternalFuture<?> txFut1 = GridTestUtils.runAsync(new 
Callable<Void>() {
+            @Override public Void call() throws Exception {
+                try (Transaction tx = 
ignite.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+                    cache.put(2, 2);
+                    cache.put(3, 2);
+
+                    tx.commit();
+                }
+
+                return null;
+            }
+        });
+
+        IgniteInternalFuture<?> txFut2 = GridTestUtils.runAsync(new 
Callable<Void>() {
+            @Override public Void call() throws Exception {
+                try (Transaction tx = 
ignite.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+                    cache.put(1, 3);
+                    cache.put(2, 3);
+
+                    tx.commit();
+                }
+
+                // Should see changes mady by both tx1 and tx2.
+                Map<Object, Object> res = cache.getAll(F.asSet(1, 2, 3));
+
+                assertEquals(3, res.get(1));
+                assertEquals(3, res.get(2));
+                assertEquals(2, res.get(3));
+
+                return null;
+            }
+        });
+
+        clientSpi.waitForBlocked();
+
+        Thread.sleep(1000);
+
+        clientSpi.stopBlock(true);
+
+        txFut1.get();
+        txFut2.get();
+
+        Map<Object, Object> res = cache.getAll(F.asSet(1, 2, 3));
+
+        assertEquals(3, res.get(1));
+        assertEquals(3, res.get(2));
+        assertEquals(2, res.get(3));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
     public void testPartialCommitGetAll() throws Exception {
         testSpi = true;
 
@@ -424,6 +516,7 @@ public class CacheMvccTransactionsTest extends 
GridCommonAbstractTest {
                 }
 
                 Set<Integer> keys = new HashSet<>();
+
                 keys.add(key1);
                 keys.add(key2);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/085a3219/modules/core/src/test/java/org/apache/ignite/internal/processors/database/FreeListImplSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/database/FreeListImplSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/database/FreeListImplSelfTest.java
index 384f7b9..6ae5d6b 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/database/FreeListImplSelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/database/FreeListImplSelfTest.java
@@ -427,12 +427,12 @@ public class FreeListImplSelfTest extends 
GridCommonAbstractTest {
         }
 
         /** {@inheritDoc} */
-        @Override public long mvccUpdateCounter() {
+        @Override public long mvccCoordinatorVersion() {
             return 0;
         }
 
         /** {@inheritDoc} */
-        @Override public long mvccUpdateTopologyVersion() {
+        @Override public long mvccUpdateCounter() {
             return 0;
         }
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/085a3219/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Row.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Row.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Row.java
index e3144b3..02e4df8 100644
--- 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Row.java
+++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Row.java
@@ -181,7 +181,7 @@ public abstract class GridH2Row implements 
GridSearchRowPointer, CacheDataRow, R
     }
 
     /** {@inheritDoc} */
-    @Override public long mvccUpdateTopologyVersion() {
+    @Override public long mvccCoordinatorVersion() {
         throw new UnsupportedOperationException();
     }
 

Reply via email to