http://git-wip-us.apache.org/repos/asf/ignite/blob/3f33d6a5/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
index 8bf9e39..9da6876 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
@@ -27,13 +27,17 @@ import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.TreeMap;
 import java.util.TreeSet;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteCheckedException;
@@ -58,6 +62,8 @@ import 
org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetR
 import 
org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetResponse;
 import 
org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFinishRequest;
 import 
org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFinishResponse;
+import 
org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareRequest;
+import 
org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareResponse;
 import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
 import org.apache.ignite.internal.util.lang.GridAbsPredicate;
 import org.apache.ignite.internal.util.lang.GridInClosure3;
@@ -82,14 +88,21 @@ import 
org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
 import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.apache.ignite.transactions.Transaction;
+import org.apache.ignite.transactions.TransactionConcurrency;
+import org.apache.ignite.transactions.TransactionIsolation;
+import org.apache.ignite.transactions.TransactionOptimisticException;
 import org.jetbrains.annotations.Nullable;
+import org.junit.Assert;
 
 import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
 import static org.apache.ignite.cache.CacheMode.PARTITIONED;
 import static org.apache.ignite.cache.CacheMode.REPLICATED;
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC;
 import static 
org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+import static 
org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED;
 import static 
org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
+import static org.apache.ignite.transactions.TransactionIsolation.SERIALIZABLE;
 
 /**
  * TODO IGNITE-3478: extend tests to use single/multiple nodes, all tx types.
@@ -182,7 +195,38 @@ public class CacheMvccTransactionsTest extends 
GridCommonAbstractTest {
      * @throws Exception If failed.
      */
     public void testPessimisticTx1() throws Exception {
-        checkPessimisticTx(new CI1<IgniteCache<Integer, Integer>>() {
+        checkTx1(PESSIMISTIC, REPEATABLE_READ);
+    }
+
+    /**
+     * Runs the {@code checkTx1} scenario with {@code OPTIMISTIC} concurrency and {@code SERIALIZABLE} isolation.
+     *
+     * @throws Exception If failed.
+     */
+    public void testOptimisticSerializableTx1() throws Exception {
+        checkTx1(OPTIMISTIC, SERIALIZABLE);
+    }
+
+    /**
+     * Runs the {@code checkTx1} scenario with {@code OPTIMISTIC} concurrency and {@code REPEATABLE_READ} isolation.
+     *
+     * @throws Exception If failed.
+     */
+    public void testOptimisticRepeatableReadTx1() throws Exception {
+        checkTx1(OPTIMISTIC, REPEATABLE_READ);
+    }
+
+    /**
+     * Runs the {@code checkTx1} scenario with {@code OPTIMISTIC} concurrency and {@code READ_COMMITTED} isolation.
+     *
+     * @throws Exception If failed.
+     */
+    public void testOptimisticReadCommittedTx1() throws Exception {
+        checkTx1(OPTIMISTIC, READ_COMMITTED);
+    }
+
+    /**
+     * @param concurrency Transaction concurrency.
+     * @param isolation Transaction isolation.
+     * @throws Exception If failed.
+     */
+    private void checkTx1(final TransactionConcurrency concurrency, final 
TransactionIsolation isolation)
+        throws Exception {
+        checkTxWithAllCaches(new CI1<IgniteCache<Integer, Integer>>() {
             @Override public void apply(IgniteCache<Integer, Integer> cache) {
                 try {
                     IgniteTransactions txs = 
cache.unwrap(Ignite.class).transactions();
@@ -192,7 +236,7 @@ public class CacheMvccTransactionsTest extends 
GridCommonAbstractTest {
                     for (Integer key : keys) {
                         log.info("Test key: " + key);
 
-                        try (Transaction tx = txs.txStart(PESSIMISTIC, 
REPEATABLE_READ)) {
+                        try (Transaction tx = txs.txStart(concurrency, 
isolation)) {
                             Integer val = cache.get(key);
 
                             assertNull(val);
@@ -222,7 +266,24 @@ public class CacheMvccTransactionsTest extends 
GridCommonAbstractTest {
      * @throws Exception If failed.
      */
     public void testPessimisticTx2() throws Exception {
-        checkPessimisticTx(new CI1<IgniteCache<Integer, Integer>>() {
+        checkTx2(PESSIMISTIC, REPEATABLE_READ);
+    }
+
+    /**
+     * Runs the {@code checkTx2} scenario with {@code OPTIMISTIC} concurrency and {@code SERIALIZABLE} isolation.
+     *
+     * @throws Exception If failed.
+     */
+    public void testOptimisticSerializableTx2() throws Exception {
+        checkTx2(OPTIMISTIC, SERIALIZABLE);
+    }
+
+    /**
+     * @param concurrency Transaction concurrency.
+     * @param isolation Transaction isolation.
+     * @throws Exception If failed.
+     */
+    private void checkTx2(final TransactionConcurrency concurrency, final 
TransactionIsolation isolation)
+        throws Exception {
+        checkTxWithAllCaches(new CI1<IgniteCache<Integer, Integer>>() {
             @Override public void apply(IgniteCache<Integer, Integer> cache) {
                 try {
                     IgniteTransactions txs = 
cache.unwrap(Ignite.class).transactions();
@@ -232,7 +293,7 @@ public class CacheMvccTransactionsTest extends 
GridCommonAbstractTest {
                     for (Integer key : keys) {
                         log.info("Test key: " + key);
 
-                        try (Transaction tx = txs.txStart(PESSIMISTIC, 
REPEATABLE_READ)) {
+                        try (Transaction tx = txs.txStart(concurrency, 
isolation)) {
                             cache.put(key, key);
                             cache.put(key + 1, key + 1);
 
@@ -257,9 +318,15 @@ public class CacheMvccTransactionsTest extends 
GridCommonAbstractTest {
      * @param c Closure to run.
      * @throws Exception If failed.
      */
-    private void checkPessimisticTx(IgniteInClosure<IgniteCache<Integer, 
Integer>> c) throws Exception {
+    private void checkTxWithAllCaches(IgniteInClosure<IgniteCache<Integer, 
Integer>> c) throws Exception {
+        client = false;
+
         startGridsMultiThreaded(SRVS);
 
+        client = true;
+
+        startGrid(SRVS);
+
         try {
             for (CacheConfiguration<Object, Object> ccfg : 
cacheConfigurations()) {
                 logCacheInfo(ccfg);
@@ -289,6 +356,21 @@ public class CacheMvccTransactionsTest extends 
GridCommonAbstractTest {
      * @throws Exception If failed.
      */
     public void testActiveQueriesCleanup() throws Exception {
+        activeQueriesCleanup(false);
+    }
+
+    /**
+     * Runs the {@code activeQueriesCleanup} scenario with {@code tx = true}, i.e. with the reads performed
+     * inside transactions.
+     *
+     * @throws Exception If failed.
+     */
+    public void testActiveQueriesCleanupTx() throws Exception {
+        activeQueriesCleanup(true);
+    }
+
+    /**
+     * @param tx If {@code true} tests reads inside transaction.
+     * @throws Exception If failed.
+     */
+    private void activeQueriesCleanup(final boolean tx) throws Exception {
         startGridsMultiThreaded(SRVS);
 
         client = true;
@@ -307,7 +389,11 @@ public class CacheMvccTransactionsTest extends 
GridCommonAbstractTest {
             @Override public void apply(Integer idx) {
                 ThreadLocalRandom rnd = ThreadLocalRandom.current();
 
-                IgniteCache cache = ignite(idx % 
NODES).cache(DEFAULT_CACHE_NAME);
+                Ignite node = ignite(idx % NODES);
+
+                IgniteTransactions txs = node.transactions();
+
+                IgniteCache cache = node.cache(DEFAULT_CACHE_NAME);
 
                 while (System.currentTimeMillis() < stopTime) {
                     int keyCnt = rnd.nextInt(10) + 1;
@@ -317,7 +403,18 @@ public class CacheMvccTransactionsTest extends 
GridCommonAbstractTest {
                     for (int i = 0; i < keyCnt; i++)
                         keys.add(rnd.nextInt());
 
-                    cache.getAll(keys);
+                    if (tx) {
+                        // Named 'readTx' so it does not shadow the enclosing method's boolean 'tx' parameter.
+                        try (Transaction readTx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+                            cache.getAll(keys);
+
+                            // Finish the read-only transaction either way, chosen at random.
+                            if (rnd.nextBoolean())
+                                readTx.commit();
+                            else
+                                readTx.rollback();
+                        }
+                    }
+                    else
+                        cache.getAll(keys);
                 }
             }
         }, NODES * 2, "get-thread");
@@ -329,7 +426,107 @@ public class CacheMvccTransactionsTest extends 
GridCommonAbstractTest {
     /**
      * @throws Exception If failed.
      */
-    public void testSimplePutGetAll() throws Exception {
+    public void testTxReadIsolationSimple() throws Exception {
+        // Scenario, repeated for every cache configuration, isolation level and node:
+        // a reader opens an OPTIMISTIC transaction and reads key 0, then waits while srv0 commits
+        // separate transactions that set even keys to 1 and remove odd keys. A READ_COMMITTED reader
+        // is then expected to observe the updated state; other isolation levels must still see the
+        // initial values.
+        Ignite srv0 = startGrids(4);
+
+        client = true;
+
+        startGrid(4);
+
+        final int KEYS = 10;
+
+        // Loop-invariant initial state: every key maps to 0.
+        final Map<Integer, Integer> startVals = new HashMap<>();
+
+        for (int i = 0; i < KEYS; i++)
+            startVals.put(i, 0);
+
+        for (CacheConfiguration<Object, Object> ccfg : cacheConfigurations()) {
+            IgniteCache<Object, Object> cache0 = srv0.createCache(ccfg);
+
+            // Readers look the cache up by its actual name rather than assuming DEFAULT_CACHE_NAME.
+            final String cacheName = cache0.getName();
+
+            for (final TransactionIsolation isolation : TransactionIsolation.values()) {
+                for (final Ignite node : G.allGrids()) {
+                    info("Run test [node=" + node.name() + ", isolation=" + isolation + ']');
+
+                    // Reset every key to 0 so that each run starts from the same committed state.
+                    try (Transaction tx = srv0.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+                        cache0.putAll(startVals);
+
+                        tx.commit();
+                    }
+
+                    // Counted down by the reader after its first read.
+                    final CountDownLatch readStart = new CountDownLatch(1);
+
+                    // Counted down by this thread once all concurrent updates are committed.
+                    final CountDownLatch readProceed = new CountDownLatch(1);
+
+                    IgniteInternalFuture<?> fut = GridTestUtils.runAsync(new Callable<Void>() {
+                        @Override public Void call() throws Exception {
+                            IgniteCache<Object, Object> cache = node.cache(cacheName);
+
+                            try (Transaction tx = node.transactions().txStart(OPTIMISTIC, isolation)) {
+                                assertEquals(0, cache.get(0));
+
+                                readStart.countDown();
+
+                                assertTrue("Concurrent updates did not finish in time.",
+                                    readProceed.await(5, TimeUnit.SECONDS));
+
+                                if (isolation == READ_COMMITTED) {
+                                    // Odd keys were removed, even keys were set to 1.
+                                    assertNull(cache.get(1));
+
+                                    assertEquals(1, cache.get(2));
+
+                                    Map<Object, Object> res = cache.getAll(startVals.keySet());
+
+                                    assertEquals(startVals.size() / 2, res.size());
+
+                                    for (Map.Entry<Object, Object> e : res.entrySet())
+                                        assertEquals("Invalid value for key: " + e.getKey(), 1, e.getValue());
+                                }
+                                else {
+                                    // The concurrent updates must not be visible: all keys still map to 0.
+                                    assertEquals(0, cache.get(1));
+
+                                    assertEquals(0, cache.get(2));
+
+                                    Map<Object, Object> res = cache.getAll(startVals.keySet());
+
+                                    assertEquals(startVals.size(), res.size());
+
+                                    for (Map.Entry<Object, Object> e : res.entrySet())
+                                        assertEquals("Invalid value for key: " + e.getKey(), 0, e.getValue());
+                                }
+
+                                tx.rollback();
+                            }
+
+                            return null;
+                        }
+                    });
+
+                    if (!readStart.await(5, TimeUnit.SECONDS)) {
+                        // If the reader has already failed, rethrow its error: it is the real cause of the timeout.
+                        if (fut.isDone())
+                            fut.get();
+
+                        fail("Reader did not complete its first read in time.");
+                    }
+
+                    // Each key is updated in its own committed transaction while the reader's transaction is open.
+                    for (int i = 0; i < KEYS; i++) {
+                        try (Transaction tx = srv0.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+                            if (i % 2 == 0)
+                                cache0.put(i, 1);
+                            else
+                                cache0.remove(i);
+
+                            tx.commit();
+                        }
+                    }
+
+                    readProceed.countDown();
+
+                    // Propagates any assertion failure raised in the reader thread.
+                    fut.get();
+                }
+            }
+
+            srv0.destroyCache(cacheName);
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testPutGetAllSimple() throws Exception {
         Ignite node = startGrid(0);
 
         IgniteTransactions txs = node.transactions();
@@ -388,22 +585,22 @@ public class CacheMvccTransactionsTest extends 
GridCommonAbstractTest {
     /**
      * @throws Exception If failed.
      */
-    public void testSimplePutRemove() throws Exception {
-        simplePutRemove(false);
+    public void testPutRemoveSimple() throws Exception {
+        putRemoveSimple(false);
     }
 
     /**
      * @throws Exception If failed.
      */
-    public void testSimplePutRemove_LargeKeys() throws Exception {
-        simplePutRemove(true);
+    public void testPutRemoveSimple_LargeKeys() throws Exception {
+        putRemoveSimple(true);
     }
 
     /**
      * @throws Exception If failed.
      * @param largeKeys {@code True} to use large keys (not fitting in single 
page).
      */
-    private void simplePutRemove(boolean largeKeys) throws Exception {
+    private void putRemoveSimple(boolean largeKeys) throws Exception {
         Ignite node = startGrid(0);
 
         IgniteTransactions txs = node.transactions();
@@ -608,7 +805,7 @@ public class CacheMvccTransactionsTest extends 
GridCommonAbstractTest {
             boolean block = true;
 
             @Override public boolean apply(ClusterNode node, Message msg) {
-                if (block && msg instanceof CoordinatorTxAckRequest) {
+                if (block && msg instanceof CoordinatorAckRequestTx) {
                     block = false;
 
                     return true;
@@ -804,9 +1001,11 @@ public class CacheMvccTransactionsTest extends 
GridCommonAbstractTest {
 
         for (boolean otherPuts : vals) {
             for (boolean putOnStart : vals) {
-                cleanupWaitsForGet1(otherPuts, putOnStart);
+                for (boolean inTx : vals) {
+                    cleanupWaitsForGet1(otherPuts, putOnStart, inTx);
 
-                afterTest();
+                    afterTest();
+                }
             }
         }
     }
@@ -814,10 +1013,13 @@ public class CacheMvccTransactionsTest extends 
GridCommonAbstractTest {
     /**
      * @param otherPuts {@code True} to update unrelated keys to increment 
mvcc counter.
      * @param putOnStart {@code True} to put data in cache before getAll.
+     * @param inTx {@code True} to read inside transaction.
      * @throws Exception If failed.
      */
-    private void cleanupWaitsForGet1(boolean otherPuts, final boolean 
putOnStart) throws Exception {
-        info("cleanupWaitsForGet [otherPuts=" + otherPuts + ", putOnStart=" + 
putOnStart + "]");
+    private void cleanupWaitsForGet1(boolean otherPuts, final boolean 
putOnStart, final boolean inTx) throws Exception {
+        info("cleanupWaitsForGet [otherPuts=" + otherPuts +
+            ", putOnStart=" + putOnStart +
+            ", inTx=" + inTx + "]");
 
         testSpi = true;
 
@@ -864,7 +1066,18 @@ public class CacheMvccTransactionsTest extends 
GridCommonAbstractTest {
             @Override public Void call() throws Exception {
                 IgniteCache<Integer, Integer> cache = 
client.cache(srvCache.getName());
 
-                Map<Integer, Integer> vals = cache.getAll(F.asSet(key1, key2));
+
+                Map<Integer, Integer> vals;
+
+                if (inTx) {
+                    try (Transaction tx = 
client.transactions().txStart(OPTIMISTIC, SERIALIZABLE)) {
+                        vals = cache.getAll(F.asSet(key1, key2));
+
+                        tx.rollback();
+                    }
+                }
+                else
+                    vals = cache.getAll(F.asSet(key1, key2));
 
                 if (putOnStart) {
                     assertEquals(2, vals.size());
@@ -944,7 +1157,7 @@ public class CacheMvccTransactionsTest extends 
GridCommonAbstractTest {
 
         clientSpi.closure(new IgniteBiInClosure<ClusterNode, Message>() {
             @Override public void apply(ClusterNode node, Message msg) {
-                if (msg instanceof CoordinatorTxAckRequest)
+                if (msg instanceof CoordinatorAckRequestTx)
                     doSleep(2000);
             }
         });
@@ -1063,7 +1276,7 @@ public class CacheMvccTransactionsTest extends 
GridCommonAbstractTest {
             private boolean blocked;
 
             @Override public boolean apply(ClusterNode node, Message msg) {
-                if (!blocked && (msg instanceof CoordinatorTxAckRequest)) {
+                if (!blocked && (msg instanceof CoordinatorAckRequestTx)) {
                     blocked = true;
 
                     return true;
@@ -1142,46 +1355,53 @@ public class CacheMvccTransactionsTest extends 
GridCommonAbstractTest {
      * @throws Exception If failed.
      */
     public void testPutAllGetAll_SingleNode() throws Exception {
-        putAllGetAll(false, 1, 0, 0, 64);
+        putAllGetAll(null, 1, 0, 0, 64);
     }
 
     /**
      * @throws Exception If failed.
      */
     public void testPutAllGetAll_SingleNode_SinglePartition() throws Exception 
{
-        putAllGetAll(false, 1, 0, 0, 1);
+        putAllGetAll(null, 1, 0, 0, 1);
     }
 
     /**
      * @throws Exception If failed.
      */
     public void testPutAllGetAll_ClientServer_Backups0() throws Exception {
-        putAllGetAll(false, 4, 2, 0, 64);
+        putAllGetAll(null, 4, 2, 0, 64);
     }
 
     /**
      * @throws Exception If failed.
      */
     public void testPutAllGetAll_ClientServer_Backups1() throws Exception {
-        putAllGetAll(false, 4, 2, 1, 64);
+        putAllGetAll(null, 4, 2, 1, 64);
     }
 
     /**
      * @throws Exception If failed.
      */
     public void testPutAllGetAll_ClientServer_Backups2() throws Exception {
-        putAllGetAll(false, 4, 2, 2, 64);
+        putAllGetAll(null, 4, 2, 2, 64);
     }
 
     /**
      * @throws Exception If failed.
      */
     public void testPutAllGetAll_ClientServer_Backups1_RestartCoordinator() 
throws Exception {
-        putAllGetAll(true, 4, 2, 1, 64);
+        putAllGetAll(RestartMode.RESTART_CRD, 4, 2, 1, 64);
+    }
+
+    /**
+     * Runs the {@code putAllGetAll} scenario with restart mode {@code RestartMode.RESTART_RND_SRV},
+     * 4 server nodes, 2 client nodes and 1 cache backup.
+     *
+     * @throws Exception If failed.
+     */
+    public void testPutAllGetAll_ClientServer_Backups1_Restart() throws 
Exception {
+        putAllGetAll(RestartMode.RESTART_RND_SRV, 4, 2, 1, 64);
     }
 
     /**
-     * @param restartCrd Coordinator restart flag.
+     * @param restartMode Restart mode.
      * @param srvs Number of server nodes.
      * @param clients Number of client nodes.
      * @param cacheBackups Number of cache backups.
@@ -1189,7 +1409,7 @@ public class CacheMvccTransactionsTest extends 
GridCommonAbstractTest {
      * @throws Exception If failed.
      */
     private void putAllGetAll(
-        boolean restartCrd,
+        RestartMode restartMode,
         final int srvs,
         final int clients,
         int cacheBackups,
@@ -1202,9 +1422,9 @@ public class CacheMvccTransactionsTest extends 
GridCommonAbstractTest {
 
         final int readers = 4;
 
-        GridInClosure3<Integer, List<IgniteCache>, AtomicBoolean> writer =
-            new GridInClosure3<Integer, List<IgniteCache>, AtomicBoolean>() {
-            @Override public void apply(Integer idx, List<IgniteCache> caches, 
AtomicBoolean stop) {
+        GridInClosure3<Integer, List<TestCache>, AtomicBoolean> writer =
+            new GridInClosure3<Integer, List<TestCache>, AtomicBoolean>() {
+            @Override public void apply(Integer idx, List<TestCache> caches, 
AtomicBoolean stop) {
                 ThreadLocalRandom rnd = ThreadLocalRandom.current();
 
                 int min = idx * RANGE;
@@ -1222,30 +1442,35 @@ public class CacheMvccTransactionsTest extends 
GridCommonAbstractTest {
                     while (map.size() < RANGE)
                         map.put(rnd.nextInt(min, max), v);
 
-                    IgniteCache<Integer, Integer> cache = randomCache(caches, 
rnd);
+                    TestCache<Integer, Integer> cache = randomCache(caches, 
rnd);
 
-                    IgniteTransactions txs = 
cache.unwrap(Ignite.class).transactions();
+                    try {
+                        IgniteTransactions txs = 
cache.cache.unwrap(Ignite.class).transactions();
 
-                    try (Transaction tx = txs.txStart(PESSIMISTIC, 
REPEATABLE_READ)) {
-                        if (updated && rnd.nextBoolean()) {
-                            Map<Integer, Integer> res = 
cache.getAll(map.keySet());
+                        try (Transaction tx = txs.txStart(PESSIMISTIC, 
REPEATABLE_READ)) {
+                            if (updated && rnd.nextBoolean()) {
+                                Map<Integer, Integer> res = 
cache.cache.getAll(map.keySet());
 
-                            for (Integer k : map.keySet())
-                                assertEquals(v - 1, (Object)res.get(k));
-                        }
+                                for (Integer k : map.keySet())
+                                    assertEquals(v - 1, (Object)res.get(k));
+                            }
 
-                        cache.putAll(map);
+                            cache.cache.putAll(map);
 
-                        tx.commit();
+                            tx.commit();
 
-                        updated = true;
-                    }
+                            updated = true;
+                        }
 
-                    if (rnd.nextBoolean()) {
-                        Map<Integer, Integer> res = cache.getAll(map.keySet());
+                        if (rnd.nextBoolean()) {
+                            Map<Integer, Integer> res = 
cache.cache.getAll(map.keySet());
 
-                        for (Integer k : map.keySet())
-                            assertEquals(v, (Object)res.get(k));
+                            for (Integer k : map.keySet())
+                                assertEquals(v, (Object)res.get(k));
+                        }
+                    }
+                    finally {
+                        cache.readUnlock();
                     }
 
                     map.clear();
@@ -1257,9 +1482,9 @@ public class CacheMvccTransactionsTest extends 
GridCommonAbstractTest {
             }
         };
 
-        GridInClosure3<Integer, List<IgniteCache>, AtomicBoolean> reader =
-            new GridInClosure3<Integer, List<IgniteCache>, AtomicBoolean>() {
-                @Override public void apply(Integer idx, List<IgniteCache> 
caches, AtomicBoolean stop) {
+        GridInClosure3<Integer, List<TestCache>, AtomicBoolean> reader =
+            new GridInClosure3<Integer, List<TestCache>, AtomicBoolean>() {
+                @Override public void apply(Integer idx, List<TestCache> 
caches, AtomicBoolean stop) {
                     ThreadLocalRandom rnd = ThreadLocalRandom.current();
 
                     Set<Integer> keys = new LinkedHashSet<>();
@@ -1275,9 +1500,16 @@ public class CacheMvccTransactionsTest extends 
GridCommonAbstractTest {
                         while (keys.size() < RANGE)
                             keys.add(rnd.nextInt(min, max));
 
-                        IgniteCache<Integer, Integer> cache = 
randomCache(caches, rnd);
+                        TestCache<Integer, Integer> cache = 
randomCache(caches, rnd);
+
+                        Map<Integer, Integer> map;
 
-                        Map<Integer, Integer> map = cache.getAll(keys);
+                        try {
+                            map = cache.cache.getAll(keys);
+                        }
+                        finally {
+                            cache.readUnlock();
+                        }
 
                         assertTrue("Invalid map size: " + map.size(),
                             map.isEmpty() || map.size() == RANGE);
@@ -1314,7 +1546,7 @@ public class CacheMvccTransactionsTest extends 
GridCommonAbstractTest {
             };
 
         readWriteTest(
-            restartCrd,
+            restartMode,
             srvs,
             clients,
             cacheBackups,
@@ -1334,49 +1566,49 @@ public class CacheMvccTransactionsTest extends 
GridCommonAbstractTest {
      * @throws Exception If failed.
      */
     public void testAccountsTxGetAll_SingleNode() throws Exception {
-        accountsTxGetAll(1, 0, 0, 64, false, ReadMode.GET_ALL);
+        accountsTxReadAll(1, 0, 0, 64, false, ReadMode.GET_ALL);
     }
 
     /**
      * @throws Exception If failed.
      */
     public void testAccountsTxGetAll_SingleNode_SinglePartition() throws 
Exception {
-        accountsTxGetAll(1, 0, 0, 1, false, ReadMode.GET_ALL);
+        accountsTxReadAll(1, 0, 0, 1, false, ReadMode.GET_ALL);
     }
 
     /**
      * @throws Exception If failed.
      */
     public void testAccountsTxGetAll_WithRemoves_SingleNode_SinglePartition() 
throws Exception {
-        accountsTxGetAll(1, 0, 0, 1, true, ReadMode.GET_ALL);
+        accountsTxReadAll(1, 0, 0, 1, true, ReadMode.GET_ALL);
     }
 
     /**
      * @throws Exception If failed.
      */
     public void testAccountsTxGetAll_ClientServer_Backups0() throws Exception {
-        accountsTxGetAll(4, 2, 0, 64, false, ReadMode.GET_ALL);
+        accountsTxReadAll(4, 2, 0, 64, false, ReadMode.GET_ALL);
     }
 
     /**
      * @throws Exception If failed.
      */
     public void testAccountsTxGetAll_ClientServer_Backups1() throws Exception {
-        accountsTxGetAll(4, 2, 1, 64, false, ReadMode.GET_ALL);
+        accountsTxReadAll(4, 2, 1, 64, false, ReadMode.GET_ALL);
     }
 
     /**
      * @throws Exception If failed.
      */
     public void testAccountsTxGetAll_ClientServer_Backups2() throws Exception {
-        accountsTxGetAll(4, 2, 2, 64, false, ReadMode.GET_ALL);
+        accountsTxReadAll(4, 2, 2, 64, false, ReadMode.GET_ALL);
     }
 
     /**
      * @throws Exception If failed.
      */
     public void testAccountsTxScan_SingleNode_SinglePartition() throws 
Exception {
-        accountsTxGetAll(1, 0, 0, 1, false, ReadMode.SCAN);
+        accountsTxReadAll(1, 0, 0, 1, false, ReadMode.SCAN);
     }
 
     /**
@@ -1388,7 +1620,7 @@ public class CacheMvccTransactionsTest extends 
GridCommonAbstractTest {
      * @param readMode Read mode.
      * @throws Exception If failed.
      */
-    private void accountsTxGetAll(
+    private void accountsTxReadAll(
         final int srvs,
         final int clients,
         int cacheBackups,
@@ -1425,109 +1657,115 @@ public class CacheMvccTransactionsTest extends 
GridCommonAbstractTest {
 
         final Set<Integer> rmvdIds = new HashSet<>();
 
-        GridInClosure3<Integer, List<IgniteCache>, AtomicBoolean> writer =
-            new GridInClosure3<Integer, List<IgniteCache>, AtomicBoolean>() {
-                @Override public void apply(Integer idx, List<IgniteCache> 
caches, AtomicBoolean stop) {
+        GridInClosure3<Integer, List<TestCache>, AtomicBoolean> writer =
+            new GridInClosure3<Integer, List<TestCache>, AtomicBoolean>() {
+                @Override public void apply(Integer idx, List<TestCache> 
caches, AtomicBoolean stop) {
                     ThreadLocalRandom rnd = ThreadLocalRandom.current();
 
                     int cnt = 0;
 
                     while (!stop.get()) {
-                        IgniteCache<Integer, MvccTestAccount> cache = 
randomCache(caches, rnd);
-                        IgniteTransactions txs = 
cache.unwrap(Ignite.class).transactions();
-
-                        cnt++;
+                        TestCache<Integer, MvccTestAccount> cache = 
randomCache(caches, rnd);
 
-                        Integer id1 = rnd.nextInt(ACCOUNTS);
-                        Integer id2 = rnd.nextInt(ACCOUNTS);
+                        try {
+                            IgniteTransactions txs = 
cache.cache.unwrap(Ignite.class).transactions();
 
-                        while (id1.equals(id2))
-                            id2 = rnd.nextInt(ACCOUNTS);
+                            cnt++;
 
-                        TreeSet<Integer> keys = new TreeSet<>();
+                            Integer id1 = rnd.nextInt(ACCOUNTS);
+                            Integer id2 = rnd.nextInt(ACCOUNTS);
 
-                        keys.add(id1);
-                        keys.add(id2);
+                            while (id1.equals(id2))
+                                id2 = rnd.nextInt(ACCOUNTS);
 
-                        Integer cntr1 = null;
-                        Integer cntr2 = null;
+                            TreeSet<Integer> keys = new TreeSet<>();
 
-                        try (Transaction tx = txs.txStart(PESSIMISTIC, 
REPEATABLE_READ)) {
-                            MvccTestAccount a1;
-                            MvccTestAccount a2;
+                            keys.add(id1);
+                            keys.add(id2);
 
-                            Map<Integer, MvccTestAccount> accounts = 
cache.getAll(keys);
+                            Integer cntr1 = null;
+                            Integer cntr2 = null;
 
-                            a1 = accounts.get(id1);
-                            a2 = accounts.get(id2);
+                            try (Transaction tx = txs.txStart(PESSIMISTIC, 
REPEATABLE_READ)) {
+                                MvccTestAccount a1;
+                                MvccTestAccount a2;
 
-                            if (!withRmvs) {
-                                assertNotNull(a1);
-                                assertNotNull(a2);
+                                Map<Integer, MvccTestAccount> accounts = 
cache.cache.getAll(keys);
 
-                                cntr1 = a1.updateCnt + 1;
-                                cntr2 = a2.updateCnt + 1;
+                                a1 = accounts.get(id1);
+                                a2 = accounts.get(id2);
 
-                                cache.put(id1, new MvccTestAccount(a1.val + 1, 
cntr1));
-                                cache.put(id2, new MvccTestAccount(a2.val - 1, 
cntr2));
-                            }
-                            else {
-                                if (a1 != null || a2 != null) {
-                                    if (a1 != null && a2 != null) {
-                                        Integer rmvd = null;
+                                if (!withRmvs) {
+                                    assertNotNull(a1);
+                                    assertNotNull(a2);
 
-                                        if (rnd.nextInt(10) == 0) {
-                                            synchronized (rmvdIds) {
-                                                if (rmvdIds.size() < ACCOUNTS 
/ 2) {
-                                                    rmvd = rnd.nextBoolean() ? 
id1 : id2;
+                                    cntr1 = a1.updateCnt + 1;
+                                    cntr2 = a2.updateCnt + 1;
 
-                                                    
assertTrue(rmvdIds.add(rmvd));
+                                    cache.cache.put(id1, new 
MvccTestAccount(a1.val + 1, cntr1));
+                                    cache.cache.put(id2, new 
MvccTestAccount(a2.val - 1, cntr2));
+                                }
+                                else {
+                                    if (a1 != null || a2 != null) {
+                                        if (a1 != null && a2 != null) {
+                                            Integer rmvd = null;
+
+                                            if (rnd.nextInt(10) == 0) {
+                                                synchronized (rmvdIds) {
+                                                    if (rmvdIds.size() < 
ACCOUNTS / 2) {
+                                                        rmvd = 
rnd.nextBoolean() ? id1 : id2;
+
+                                                        
assertTrue(rmvdIds.add(rmvd));
+                                                    }
                                                 }
                                             }
-                                        }
 
-                                        if (rmvd != null) {
-                                            cache.remove(rmvd);
+                                            if (rmvd != null) {
+                                                cache.cache.remove(rmvd);
 
-                                            cache.put(rmvd.equals(id1) ? id2 : 
id1,
-                                                new MvccTestAccount(a1.val + 
a2.val, 1));
+                                                
cache.cache.put(rmvd.equals(id1) ? id2 : id1,
+                                                    new MvccTestAccount(a1.val 
+ a2.val, 1));
+                                            }
+                                            else {
+                                                cache.cache.put(id1, new 
MvccTestAccount(a1.val + 1, 1));
+                                                cache.cache.put(id2, new 
MvccTestAccount(a2.val - 1, 1));
+                                            }
                                         }
                                         else {
-                                            cache.put(id1, new 
MvccTestAccount(a1.val + 1, 1));
-                                            cache.put(id2, new 
MvccTestAccount(a2.val - 1, 1));
-                                        }
-                                    }
-                                    else {
-                                        if (a1 == null) {
-                                            cache.put(id1, new 
MvccTestAccount(100, 1));
-                                            cache.put(id2, new 
MvccTestAccount(a2.val - 100, 1));
+                                            if (a1 == null) {
+                                                cache.cache.put(id1, new 
MvccTestAccount(100, 1));
+                                                cache.cache.put(id2, new 
MvccTestAccount(a2.val - 100, 1));
 
-                                            assertTrue(rmvdIds.remove(id1));
-                                        }
-                                        else {
-                                            cache.put(id1, new 
MvccTestAccount(a1.val - 100, 1));
-                                            cache.put(id2, new 
MvccTestAccount(100, 1));
+                                                
assertTrue(rmvdIds.remove(id1));
+                                            }
+                                            else {
+                                                cache.cache.put(id1, new 
MvccTestAccount(a1.val - 100, 1));
+                                                cache.cache.put(id2, new 
MvccTestAccount(100, 1));
 
-                                            assertTrue(rmvdIds.remove(id2));
+                                                
assertTrue(rmvdIds.remove(id2));
+                                            }
                                         }
                                     }
                                 }
-                            }
 
-                            tx.commit();
-                        }
+                                tx.commit();
+                            }
 
-                        if (!withRmvs) {
-                            Map<Integer, MvccTestAccount> accounts = 
cache.getAll(keys);
+                            if (!withRmvs) {
+                                Map<Integer, MvccTestAccount> accounts = 
cache.cache.getAll(keys);
 
-                            MvccTestAccount a1 = accounts.get(id1);
-                            MvccTestAccount a2 = accounts.get(id2);
+                                MvccTestAccount a1 = accounts.get(id1);
+                                MvccTestAccount a2 = accounts.get(id2);
 
-                            assertNotNull(a1);
-                            assertNotNull(a2);
+                                assertNotNull(a1);
+                                assertNotNull(a2);
 
-                            assertTrue(a1.updateCnt >= cntr1);
-                            assertTrue(a2.updateCnt >= cntr2);
+                                assertTrue(a1.updateCnt >= cntr1);
+                                assertTrue(a2.updateCnt >= cntr2);
+                            }
+                        }
+                        finally {
+                            cache.readUnlock();
                         }
                     }
 
@@ -1535,9 +1773,9 @@ public class CacheMvccTransactionsTest extends 
GridCommonAbstractTest {
                 }
             };
 
-        GridInClosure3<Integer, List<IgniteCache>, AtomicBoolean> reader =
-            new GridInClosure3<Integer, List<IgniteCache>, AtomicBoolean>() {
-                @Override public void apply(Integer idx, List<IgniteCache> 
caches, AtomicBoolean stop) {
+        GridInClosure3<Integer, List<TestCache>, AtomicBoolean> reader =
+            new GridInClosure3<Integer, List<TestCache>, AtomicBoolean>() {
+                @Override public void apply(Integer idx, List<TestCache> 
caches, AtomicBoolean stop) {
                     ThreadLocalRandom rnd = ThreadLocalRandom.current();
 
                     Set<Integer> keys = new LinkedHashSet<>();
@@ -1548,21 +1786,26 @@ public class CacheMvccTransactionsTest extends 
GridCommonAbstractTest {
                         while (keys.size() < ACCOUNTS)
                             keys.add(rnd.nextInt(ACCOUNTS));
 
-                        IgniteCache<Integer, MvccTestAccount> cache = 
randomCache(caches, rnd);
+                        TestCache<Integer, MvccTestAccount> cache = 
randomCache(caches, rnd);
 
                         Map<Integer, MvccTestAccount> accounts;
 
-                        if (readMode == ReadMode.SCAN) {
-                            accounts = new HashMap<>();
+                        try {
+                            if (readMode == ReadMode.SCAN) {
+                                accounts = new HashMap<>();
 
-                            for (IgniteCache.Entry<Integer, MvccTestAccount> e 
: cache) {
-                                MvccTestAccount old = accounts.put(e.getKey(), 
e.getValue());
+                                for (IgniteCache.Entry<Integer, 
MvccTestAccount> e : cache.cache) {
+                                    MvccTestAccount old = 
accounts.put(e.getKey(), e.getValue());
 
-                                assertNull(old);
+                                    assertNull(old);
+                                }
                             }
+                            else
+                                accounts = cache.cache.getAll(keys);
+                        }
+                        finally {
+                            cache.readUnlock();
                         }
-                        else
-                            accounts = cache.getAll(keys);
 
                         if (!withRmvs)
                             assertEquals(ACCOUNTS, accounts.size());
@@ -1590,9 +1833,16 @@ public class CacheMvccTransactionsTest extends 
GridCommonAbstractTest {
                     }
 
                     if (idx == 0) {
-                        IgniteCache<Integer, MvccTestAccount> cache = 
randomCache(caches, rnd);
+                        TestCache<Integer, MvccTestAccount> cache = 
randomCache(caches, rnd);
+
+                        Map<Integer, MvccTestAccount> accounts;
 
-                        Map<Integer, MvccTestAccount> accounts = 
cache.getAll(keys);
+                        try {
+                            accounts = cache.cache.getAll(keys);
+                        }
+                        finally {
+                            cache.readUnlock();
+                        }
 
                         int sum = 0;
 
@@ -1613,7 +1863,7 @@ public class CacheMvccTransactionsTest extends 
GridCommonAbstractTest {
             };
 
         readWriteTest(
-            false,
+            null,
             srvs,
             clients,
             cacheBackups,
@@ -1629,130 +1879,411 @@ public class CacheMvccTransactionsTest extends 
GridCommonAbstractTest {
     /**
      * @throws Exception If failed.
      */
-    public void testUpdate_N_Objects_SingleNode_SinglePartition() throws 
Exception {
-        int[] nValues = {3, 5, 10};
-
-        for (int n : nValues) {
-            updateNObjectsTest(n, 1, 0, 0, 1, 10_000);
-
-            afterTest();
-        }
+    public void testPessimisticTxReadsSnapshot_SingleNode_SinglePartition() 
throws Exception {
+        txReadsSnapshot(1, 0, 0, 1, true);
     }
 
     /**
      * @throws Exception If failed.
      */
-    public void testUpdate_N_Objects_SingleNode() throws Exception {
-        int[] nValues = {3, 5, 10};
-
-        for (int n : nValues) {
-            updateNObjectsTest(n, 1, 0, 0, 64, 10_000);
+    public void testPessimisticTxReadsSnapshot_ClientServer() throws Exception 
{
+        txReadsSnapshot(4, 2, 1, 64, true);
+    }
 
-            afterTest();
-        }
+    /**
+     * @throws Exception If failed.
+     */
+    public void testOptimisticTxReadsSnapshot_SingleNode() throws Exception {
+        txReadsSnapshot(1, 0, 0, 64, false);
     }
 
     /**
-     * @throws Exception If failed
+     * @throws Exception If failed.
      */
-    public void testOperationsSequenceConsistency_SingleNode() throws 
Exception {
-        operationsSequenceConsistency(1, 0, 0, 64);
+    public void testOptimisticTxReadsSnapshot_SingleNode_SinglePartition() 
throws Exception {
+        txReadsSnapshot(1, 0, 0, 1, false);
     }
 
     /**
-     * TODO IGNITE-3478: enable when scan is fully implemented.
-     *
-     * @throws Exception If failed
+     * @throws Exception If failed.
      */
-//    public void testOperationsSequenceConsistency_ClientServer_Backups0() 
throws Exception {
-//        operationsSequenceConsistency(4, 2, 0, 64);
-//    }
+    public void testOptimisticTxReadsSnapshot_ClientServer() throws Exception {
+        txReadsSnapshot(4, 2, 1, 64, false);
+    }
 
     /**
      * @param srvs Number of server nodes.
      * @param clients Number of client nodes.
      * @param cacheBackups Number of cache backups.
      * @param cacheParts Number of cache partitions.
+     * @param pessimistic If {@code true} uses pessimistic tx, otherwise 
optimistic.
      * @throws Exception If failed.
      */
-    private void operationsSequenceConsistency(
+    private void txReadsSnapshot(
         final int srvs,
         final int clients,
         int cacheBackups,
-        int cacheParts
-    )
-        throws Exception
-    {
-        final int writers = 4;
-
-        final int readers = 4;
+        int cacheParts,
+        final boolean pessimistic
+    ) throws Exception {
+        final int ACCOUNTS = 20;
 
-        final long time = 10_000;
+        final int ACCOUNT_START_VAL = 1000;
 
-        final AtomicInteger keyCntr = new AtomicInteger();
+        final int writers = 4;
 
-        GridInClosure3<Integer, List<IgniteCache>, AtomicBoolean> writer =
-            new GridInClosure3<Integer, List<IgniteCache>, AtomicBoolean>() {
-                @Override public void apply(Integer idx, List<IgniteCache> 
caches, AtomicBoolean stop) {
-                    ThreadLocalRandom rnd = ThreadLocalRandom.current();
+        final int readers = 4;
 
-                    int cnt = 0;
+        final TransactionConcurrency concurrency;
+        final TransactionIsolation isolation;
 
-                    while (!stop.get()) {
-                        IgniteCache<Integer, Value> cache = 
randomCache(caches, rnd);
-                        IgniteTransactions txs = 
cache.unwrap(Ignite.class).transactions();
+        if (pessimistic) {
+            concurrency = PESSIMISTIC;
+            isolation = REPEATABLE_READ;
+        }
+        else {
+            concurrency = OPTIMISTIC;
+            isolation = SERIALIZABLE;
+        }
 
-                        Integer key = keyCntr.incrementAndGet();
+        final IgniteInClosure<IgniteCache<Object, Object>> init = new 
IgniteInClosure<IgniteCache<Object, Object>>() {
+            @Override public void apply(IgniteCache<Object, Object> cache) {
+                final IgniteTransactions txs = 
cache.unwrap(Ignite.class).transactions();
 
-                        try (Transaction tx = txs.txStart(PESSIMISTIC, 
REPEATABLE_READ)) {
-                            cache.put(key, new Value(idx, cnt++));
+                Map<Integer, MvccTestAccount> accounts = new HashMap<>();
 
-                            tx.commit();
-                        }
+                for (int i = 0; i < ACCOUNTS; i++)
+                    accounts.put(i, new MvccTestAccount(ACCOUNT_START_VAL, 1));
 
-                        if (key > 1_000_000)
-                            break;
-                    }
+                try (Transaction tx = txs.txStart(concurrency, isolation)) {
+                    cache.putAll(accounts);
 
-                    info("Writer finished, updates: " + cnt);
+                    tx.commit();
                 }
-            };
+            }
+        };
 
-        GridInClosure3<Integer, List<IgniteCache>, AtomicBoolean> reader =
-            new GridInClosure3<Integer, List<IgniteCache>, AtomicBoolean>() {
-                @Override public void apply(Integer idx, List<IgniteCache> 
caches, AtomicBoolean stop) {
+        GridInClosure3<Integer, List<TestCache>, AtomicBoolean> writer =
+            new GridInClosure3<Integer, List<TestCache>, AtomicBoolean>() {
+                @Override public void apply(Integer idx, List<TestCache> 
caches, AtomicBoolean stop) {
                     ThreadLocalRandom rnd = ThreadLocalRandom.current();
 
+                    int cnt = 0;
+
                     while (!stop.get()) {
-                        IgniteCache<Integer, Value> cache = 
randomCache(caches, rnd);
+                        TestCache<Integer, MvccTestAccount> cache = 
randomCache(caches, rnd);
 
-                        Map<Integer, TreeSet<Integer>> vals = new HashMap<>();
+                        try {
+                            IgniteTransactions txs = 
cache.cache.unwrap(Ignite.class).transactions();
 
-                        for (IgniteCache.Entry<Integer, Value> e : cache) {
-                            Value val = e.getValue();
+                            cnt++;
 
-                            assertNotNull(val);
+                            Integer id1 = rnd.nextInt(ACCOUNTS);
+                            Integer id2 = rnd.nextInt(ACCOUNTS);
 
-                            TreeSet<Integer> cntrs = vals.get(val.key);
+                            while (id1.equals(id2))
+                                id2 = rnd.nextInt(ACCOUNTS);
 
-                            if (cntrs == null)
-                                vals.put(val.key, cntrs = new TreeSet<>());
+                            TreeSet<Integer> keys = new TreeSet<>();
 
-                            boolean add = cntrs.add(val.cnt);
+                            keys.add(id1);
+                            keys.add(id2);
 
-                            assertTrue(add);
-                        }
+                            try (Transaction tx = txs.txStart(PESSIMISTIC, 
REPEATABLE_READ)) {
+                                MvccTestAccount a1;
+                                MvccTestAccount a2;
 
-                        for (TreeSet<Integer> readCntrs : vals.values()) {
-                            for (int i = 0; i < readCntrs.size(); i++)
-                                assertTrue(readCntrs.contains(i));
-                        }
+                                Map<Integer, MvccTestAccount> accounts = 
cache.cache.getAll(keys);
+
+                                a1 = accounts.get(id1);
+                                a2 = accounts.get(id2);
+
+                                assertNotNull(a1);
+                                assertNotNull(a2);
+
+                                cache.cache.put(id1, new 
MvccTestAccount(a1.val + 1, 1));
+                                cache.cache.put(id2, new 
MvccTestAccount(a2.val - 1, 1));
+
+                                tx.commit();
+                            }
+                        }
+                        finally {
+                            cache.readUnlock();
+                        }
+                    }
+
+                    info("Writer finished, updates: " + cnt);
+                }
+            };
+
+        GridInClosure3<Integer, List<TestCache>, AtomicBoolean> reader =
+            new GridInClosure3<Integer, List<TestCache>, AtomicBoolean>() {
+                @Override public void apply(Integer idx, List<TestCache> 
caches, AtomicBoolean stop) {
+                    ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+                    int cnt = 0;
+
+                    while (!stop.get()) {
+                        TestCache<Integer, MvccTestAccount> cache = 
randomCache(caches, rnd);
+                        IgniteTransactions txs = 
cache.cache.unwrap(Ignite.class).transactions();
+
+                        Map<Integer, MvccTestAccount> accounts = new 
HashMap<>();
+
+                        if (pessimistic) {
+                            try (Transaction tx = txs.txStart(concurrency, 
isolation)) {
+                                int remaining = ACCOUNTS;
+
+                                do {
+                                    int readCnt = rnd.nextInt(remaining) + 1;
+
+                                    Set<Integer> readKeys = new TreeSet<>();
+
+                                    for (int i = 0; i < readCnt; i++)
+                                        readKeys.add(accounts.size() + i);
+
+                                    Map<Integer, MvccTestAccount> readRes = 
cache.cache.getAll(readKeys);
+
+                                    assertEquals(readCnt, readRes.size());
+
+                                    accounts.putAll(readRes);
+
+                                    remaining = ACCOUNTS - accounts.size();
+                                }
+                                while (remaining > 0);
+
+                                validateSum(accounts);
+
+                                tx.commit();
+
+                                cnt++;
+                            }
+                            finally {
+                                cache.readUnlock();
+                            }
+                        }
+                        else {
+                            try (Transaction tx = txs.txStart(concurrency, 
isolation)) {
+                                int remaining = ACCOUNTS;
+
+                                do {
+                                    int readCnt = rnd.nextInt(remaining) + 1;
+
+                                    if (rnd.nextInt(3) == 0) {
+                                        for (int i = 0; i < readCnt; i++) {
+                                            Integer key = 
rnd.nextInt(ACCOUNTS);
+
+                                            MvccTestAccount account = 
cache.cache.get(key);
+
+                                            assertNotNull(account);
+
+                                            accounts.put(key, account);
+                                        }
+                                    }
+                                    else {
+                                        Set<Integer> readKeys = new 
LinkedHashSet<>();
+
+                                        for (int i = 0; i < readCnt; i++)
+                                            
readKeys.add(rnd.nextInt(ACCOUNTS));
+
+                                        Map<Integer, MvccTestAccount> readRes 
= cache.cache.getAll(readKeys);
+
+                                        assertEquals(readKeys.size(), 
readRes.size());
+
+                                        accounts.putAll(readRes);
+                                    }
+
+                                    remaining = ACCOUNTS - accounts.size();
+                                }
+                                while (remaining > 0);
+
+                                validateSum(accounts);
+
+                                cnt++;
+
+                                tx.commit();
+                            }
+                            catch (TransactionOptimisticException ignore) {
+                                // No-op.
+                            }
+                            finally {
+                                cache.readUnlock();
+                            }
+                        }
+                    }
+
+                    info("Reader finished, txs: " + cnt);
+                }
+
+                /**
+                 * @param accounts Read accounts.
+                 */
+                private void validateSum(Map<Integer, MvccTestAccount> 
accounts) {
+                    int sum = 0;
+
+                    for (int i = 0; i < ACCOUNTS; i++) {
+                        MvccTestAccount account = accounts.get(i);
+
+                        assertNotNull(account);
+
+                        sum += account.val;
+                    }
+
+                    assertEquals(ACCOUNTS * ACCOUNT_START_VAL, sum);
+                }
+            };
+
+        readWriteTest(
+            null,
+            srvs,
+            clients,
+            cacheBackups,
+            cacheParts,
+            writers,
+            readers,
+            DFLT_TEST_TIME,
+            init,
+            writer,
+            reader);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testUpdate_N_Objects_SingleNode_SinglePartition() throws 
Exception {
+        int[] nValues = {3, 5, 10};
+
+        for (int n : nValues) {
+            updateNObjectsTest(n, 1, 0, 0, 1, 10_000);
+
+            afterTest();
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testUpdate_N_Objects_SingleNode() throws Exception {
+        int[] nValues = {3, 5, 10};
+
+        for (int n : nValues) {
+            updateNObjectsTest(n, 1, 0, 0, 64, 10_000);
+
+            afterTest();
+        }
+    }
+
+    /**
+     * @throws Exception If failed
+     */
+    public void testOperationsSequenceConsistency_SingleNode() throws 
Exception {
+        operationsSequenceConsistency(1, 0, 0, 64);
+    }
+
+    /**
+     * TODO IGNITE-3478: enable when scan is fully implemented.
+     *
+     * @throws Exception If failed
+     */
+//    public void testOperationsSequenceConsistency_ClientServer_Backups0() 
throws Exception {
+//        operationsSequenceConsistency(4, 2, 0, 64);
+//    }
+
+    /**
+     * @param srvs Number of server nodes.
+     * @param clients Number of client nodes.
+     * @param cacheBackups Number of cache backups.
+     * @param cacheParts Number of cache partitions.
+     * @throws Exception If failed.
+     */
+    private void operationsSequenceConsistency(
+        final int srvs,
+        final int clients,
+        int cacheBackups,
+        int cacheParts
+    )
+        throws Exception
+    {
+        final int writers = 4;
+
+        final int readers = 4;
+
+        final long time = 10_000;
+
+        final AtomicInteger keyCntr = new AtomicInteger();
+
+        GridInClosure3<Integer, List<TestCache>, AtomicBoolean> writer =
+            new GridInClosure3<Integer, List<TestCache>, AtomicBoolean>() {
+                @Override public void apply(Integer idx, List<TestCache> 
caches, AtomicBoolean stop) {
+                    ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+                    int cnt = 0;
+
+                    while (!stop.get()) {
+                        TestCache<Integer, Value> cache = randomCache(caches, 
rnd);
+
+                        try {
+                            IgniteTransactions txs = 
cache.cache.unwrap(Ignite.class).transactions();
+
+                            Integer key = keyCntr.incrementAndGet();
+
+                            try (Transaction tx = txs.txStart(PESSIMISTIC, 
REPEATABLE_READ)) {
+                                cache.cache.put(key, new Value(idx, cnt++));
+
+                                tx.commit();
+                            }
+
+                            if (key > 1_000_000)
+                                break;
+                        }
+                        finally {
+                            cache.readUnlock();
+                        }
+                    }
+
+                    info("Writer finished, updates: " + cnt);
+                }
+            };
+
+        GridInClosure3<Integer, List<TestCache>, AtomicBoolean> reader =
+            new GridInClosure3<Integer, List<TestCache>, AtomicBoolean>() {
+                @Override public void apply(Integer idx, List<TestCache> 
caches, AtomicBoolean stop) {
+                    ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+                    while (!stop.get()) {
+                        TestCache<Integer, Value> cache = randomCache(caches, 
rnd);
+
+                        try {
+                            Map<Integer, TreeSet<Integer>> vals = new 
HashMap<>();
+
+                            for (IgniteCache.Entry<Integer, Value> e : 
cache.cache) {
+                                Value val = e.getValue();
+
+                                assertNotNull(val);
+
+                                TreeSet<Integer> cntrs = vals.get(val.key);
+
+                                if (cntrs == null)
+                                    vals.put(val.key, cntrs = new TreeSet<>());
+
+                                boolean add = cntrs.add(val.cnt);
+
+                                assertTrue(add);
+                            }
+
+                            for (TreeSet<Integer> readCntrs : vals.values()) {
+                                for (int i = 0; i < readCntrs.size(); i++)
+                                    assertTrue(readCntrs.contains(i));
+                            }
+                        }
+                        finally {
+                            cache.readUnlock();
+                        }
                     }
                 }
             };
 
         readWriteTest(
-            false,
+            null,
             srvs,
             clients,
             cacheBackups,
@@ -1766,6 +2297,147 @@ public class CacheMvccTransactionsTest extends 
GridCommonAbstractTest {
     }
 
     /**
+     * Stress scenario: 6 writer and 2 reader closures work against randomly chosen caches while
+     * {@code readWriteTest} is invoked with {@code RestartMode.RESTART_RND_SRV}; afterwards
+     * {@code checkActiveQueriesCleanup} is called for every node returned by {@code G.allGrids()}.
+     * Writers commit {@code putAll} batches under a randomly chosen transaction mode, readers only
+     * issue {@code getAll}; neither closure validates the values read, only that a map is returned.
+     * NOTE(review): the leading underscore in the method name goes with the TODO below; presumably
+     * it keeps the test runner from picking the method up. Confirm against the runner's naming rule.
+     *
+     * TODO IGNITE-3478 enable when recovery is implemented.
+     *
+     * @throws Exception If failed.
+     */
+    public void _testNodesRestartNoHang() throws Exception {
+        final int srvs = 4;
+        final int clients = 4;
+        final int writers = 6;
+        final int readers = 2;
+
+        final int KEYS = 100_000; // Both closures draw their keys from [0, KEYS).
+
+        GridInClosure3<Integer, List<TestCache>, AtomicBoolean> writer =
+            new GridInClosure3<Integer, List<TestCache>, AtomicBoolean>() {
+                @Override public void apply(Integer idx, List<TestCache> 
caches, AtomicBoolean stop) {
+                    ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+                    Map<Integer, Integer> map = new TreeMap<>(); // Batch to write; kept in ascending key order by TreeMap.
+
+                    int cnt = 0; // Iteration counter; also the value stored for every key of the current batch.
+
+                    while (!stop.get()) {
+                        int keys = rnd.nextInt(32) + 1; // Batch size in [1, 32].
+
+                        while (map.size() < keys)
+                            map.put(rnd.nextInt(KEYS), cnt);
+
+                        TestCache<Integer, Integer> cache = 
randomCache(caches, rnd);
+
+                        try {
+                            IgniteTransactions txs = 
cache.cache.unwrap(Ignite.class).transactions();
+
+                            TransactionConcurrency concurrency;
+                            TransactionIsolation isolation;
+
+                            switch (rnd.nextInt(3)) { // Uniformly picks one of the three transaction modes below.
+                                case 0: {
+                                    concurrency = PESSIMISTIC;
+                                    isolation = REPEATABLE_READ;
+
+                                    break;
+                                }
+                                case 1: {
+                                    concurrency = OPTIMISTIC;
+                                    isolation = REPEATABLE_READ;
+
+                                    break;
+                                }
+                                case 2: {
+                                    concurrency = OPTIMISTIC;
+                                    isolation = SERIALIZABLE;
+
+                                    break;
+                                }
+                                default: {
+                                    fail();
+
+                                    return; // Not reachable for nextInt(3); lets the compiler treat both locals as assigned.
+                                }
+                            }
+
+                            try (Transaction tx = txs.txStart(concurrency, 
isolation)) {
+                                if (rnd.nextBoolean()) { // Half of the transactions read the batch keys before writing.
+                                    Map<Integer, Integer> res = 
cache.cache.getAll(map.keySet());
+
+                                    assertNotNull(res);
+                                }
+
+                                cache.cache.putAll(map);
+
+                                tx.commit();
+                            }
+                            catch (TransactionOptimisticException e) {
+                                assertEquals(SERIALIZABLE, isolation); // An optimistic conflict is tolerated only in SERIALIZABLE mode.
+                            }
+                            catch (Exception e) {
+                                Assert.assertTrue("Unexpected error: " + e, 
X.hasCause(e, ClusterTopologyException.class)); // Any other failure must be caused by a topology change.
+                            }
+                        }
+                        finally {
+                            cache.readUnlock(); // NOTE(review): the matching lock acquisition is not visible here; presumably inside randomCache.
+                        }
+
+                        map.clear();
+
+                        cnt++; // Also incremented when the transaction failed with a tolerated exception.
+                    }
+
+                    info("Writer done, updates: " + cnt);
+                }
+            };
+
+        GridInClosure3<Integer, List<TestCache>, AtomicBoolean> reader =
+            new GridInClosure3<Integer, List<TestCache>, AtomicBoolean>() {
+                @Override public void apply(Integer idx, List<TestCache> 
caches, AtomicBoolean stop) {
+                    ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+                    Set<Integer> keys = new LinkedHashSet<>();
+
+                    while (!stop.get()) {
+                        int keyCnt = rnd.nextInt(64) + 1; // Number of keys to read, in [1, 64].
+
+                        while (keys.size() < keyCnt)
+                            keys.add(rnd.nextInt(KEYS));
+
+                        TestCache<Integer, Integer> cache = 
randomCache(caches, rnd);
+
+                        Map<Integer, Integer> map;
+
+                        try {
+                            map = cache.cache.getAll(keys);
+
+                            assertNotNull(map); // Only checks that the read returned; the values are not inspected.
+                        }
+                        finally {
+                            cache.readUnlock(); // NOTE(review): the matching lock acquisition is not visible here; presumably inside randomCache.
+                        }
+
+                        keys.clear();
+                    }
+                }
+            };
+
+        readWriteTest(
+            RestartMode.RESTART_RND_SRV,
+            srvs,
+            clients,
+            1, // NOTE(review): fourth argument; the other readWriteTest call visible in this patch passes cacheBackups here.
+            256, // NOTE(review): presumably the partition count; confirm against readWriteTest's signature.
+            writers,
+            readers,
+            DFLT_TEST_TIME,
+            null, // NOTE(review): meaning not visible in this hunk; confirm against readWriteTest's signature.
+            writer,
+            reader);
+
+        for (Ignite node : G.allGrids())
+            checkActiveQueriesCleanup(node);
+    }
+
+    /**
      * @throws Exception If failed.
      */
     public void testActiveQueryCleanupOnNodeFailure() throws Exception {
@@ -1783,7 +2455,7 @@ public class CacheMvccTransactionsTest extends 
GridCommonAbstractTest {
 
         srvSpi.blockMessages(GridNearGetResponse.class, 
getTestIgniteInstanceName(1));
 
-        
TestRecordingCommunicationSpi.spi(client).blockMessages(CoordinatorQueryAckRequest.class,
+        
TestRecordingCommunicationSpi.spi(client).blockMessages(CoordinatorAckRequestQuery.class,
             getTestIgniteInstanceName(0));
 
         IgniteInternalFuture<?> fut = GridTestUtils.runAsync(new 
Callable<Void>() {
@@ -1817,7 +2489,7 @@ public class CacheMvccTransactionsTest extends 
GridCommonAbstractTest {
     /**
      * @throws Exception If failed.
      */
-    public void testSimpleRebalance() throws Exception {
+    public void testRebalanceSimple() throws Exception {
         Ignite srv0 = startGrid(0);
 
         IgniteCache<Integer, Integer> cache =  (IgniteCache)srv0.createCache(
@@ -1880,59 +2552,177 @@ public class CacheMvccTransactionsTest extends 
GridCommonAbstractTest {
     /**
+     * Rebalance scenario with removed entries. On node 0, in three separate {@code PESSIMISTIC} /
+     * {@code REPEATABLE_READ} transactions: keys 0..99 are removed (nothing in this method writes
+     * them first), keys 100..199 are put with value equal to the key, and the even keys among them
+     * are removed again. Node 1 is then started, {@code awaitPartitionMapExchange()} is called, and
+     * the remaining odd keys are checked through {@code jcache(1)} both before and after node 0 is
+     * stopped.
+     *
      * @throws Exception If failed.
      */
-    public void testSimpleRebalanceWithRemovedValues() throws Exception {
+    public void testRebalanceWithRemovedValuesSimple() throws Exception {
         Ignite node = startGrid(0);
 
-        IgniteTransactions txs = node.transactions();
+        IgniteTransactions txs = node.transactions();
+
+        final IgniteCache<Object, Object> cache = 
node.createCache(cacheConfiguration(PARTITIONED, FULL_SYNC, 1, 64)); // NOTE(review): 1 and 64 are presumably backups and partitions; confirm in cacheConfiguration.
+
+        try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) {
+            for (int k = 0; k < 100; k++)
+                cache.remove(k); // These keys are not written anywhere in this method before being removed.
+
+            tx.commit();
+        }
+
+        Map<Object, Object> expVals = new HashMap<>(); // Expected final cache content, maintained alongside the updates.
+
+        try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) {
+            for (int k = 100; k < 200; k++) {
+                cache.put(k, k);
+
+                expVals.put(k, k);
+            }
+
+            tx.commit();
+        }
+
+        try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) {
+            for (int k = 100; k < 200; k++) {
+                if (k % 2 == 0) { // Remove every even key written by the previous transaction.
+                    cache.remove(k);
+
+                    expVals.remove(k);
+                }
+            }
+
+            tx.commit();
+        }
+
+        startGrid(1);
+
+        awaitPartitionMapExchange();
+
+        checkValues(expVals, jcache(1));
+
+        stopGrid(0); // After this only node 1 remains to serve the second check.
+
+        checkValues(expVals, jcache(1));
+    }
+
+    /**
+     * Runs {@code coordinatorFailureSimple} with a {@code PESSIMISTIC} / {@code REPEATABLE_READ} transaction.
+     *
+     * @throws Exception If failed.
+     */
+    public void testCoordinatorFailureSimplePessimisticTx() throws Exception {
+        coordinatorFailureSimple(PESSIMISTIC, REPEATABLE_READ);
+    }
+
+    /**
+     * Runs {@code coordinatorFailureSimple} with an {@code OPTIMISTIC} / {@code SERIALIZABLE} transaction.
+     *
+     * @throws Exception If failed.
+     */
+    public void testCoordinatorFailureSimpleSerializableTx() throws Exception {
+        coordinatorFailureSimple(OPTIMISTIC, SERIALIZABLE);
+    }
+
+    /**
+     * Runs {@code coordinatorFailureSimple} with an {@code OPTIMISTIC} / {@code REPEATABLE_READ} transaction.
+     *
+     * @throws Exception If failed.
+     */
+    public void testCoordinatorFailureSimpleOptimisticTx() throws Exception {
+        coordinatorFailureSimple(OPTIMISTIC, REPEATABLE_READ);
+    }
+
+    /**
+     * Scenario: three nodes are started with {@code startGrids(3)} plus node 3, which creates the cache.
+     * {@code MvccCoordinatorVersionResponse} messages from node 0 to that node are blocked, then a
+     * transaction that puts one key primary on node 1 and one key primary on node 2 is committed from a
+     * separate thread. Once a message is blocked, node 0 is stopped. The commit is expected to fail with
+     * {@code ClusterTopologyException} carrying a non-null {@code retryReadyFuture()}, which is awaited.
+     * Afterwards both keys must read as {@code null}; the same transaction is repeated and must store
+     * values 1 and 2.
+     * NOTE(review): node 0 is treated as the coordinator here going by the {@code crdSpi} name and the
+     * method name; the coordinator assignment itself is not visible in this hunk.
+     *
+     * @param concurrency Transaction concurrency.
+     * @param isolation Transaction isolation.
+     * @throws Exception If failed.
+     */
+    private void coordinatorFailureSimple(
+        final TransactionConcurrency concurrency,
+        final TransactionIsolation isolation
+    ) throws Exception {
+        testSpi = true;
+
+        startGrids(3);
+
+        client = true; // Assigns the test's field; the local named 'client' is declared only on the next statement.
+
+        final Ignite client = startGrid(3);
+
+        final IgniteCache cache = client.createCache(
+            cacheConfiguration(PARTITIONED, FULL_SYNC, 0, 
DFLT_PARTITION_COUNT));
+
+        final Integer key1 = primaryKey(jcache(1));
+        final Integer key2 = primaryKey(jcache(2));
+
+        TestRecordingCommunicationSpi crdSpi = 
TestRecordingCommunicationSpi.spi(ignite(0));
+
+        crdSpi.blockMessages(MvccCoordinatorVersionResponse.class, 
client.name());
+
+        IgniteInternalFuture fut = GridTestUtils.runAsync(new Callable() {
+            @Override public Object call() throws Exception {
+                try {
+                    try (Transaction tx = 
client.transactions().txStart(concurrency, isolation)) {
+                        cache.put(key1, 1);
+                        cache.put(key2, 2);
+
+                        tx.commit();
+                    }
 
-        final IgniteCache<Object, Object> cache = 
node.createCache(cacheConfiguration(PARTITIONED, FULL_SYNC, 1, 64));
+                    fail(); // Reached only if the transaction block completed without throwing.
+                }
+                catch (ClusterTopologyException e) {
+                    info("Expected exception: " + e);
 
-        try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) {
-            for (int k = 0; k < 100; k++)
-                cache.remove(k);
+                    assertNotNull(e.retryReadyFuture());
 
-            tx.commit();
-        }
+                    e.retryReadyFuture().get(); // Wait until the exception's retry future completes before returning.
+                }
 
-        Map<Object, Object> expVals = new HashMap<>();
+                return null;
+            }
+        }, "tx-thread");
 
-        try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) {
-            for (int k = 100; k < 200; k++) {
-                cache.put(k, k);
+        crdSpi.waitForBlocked(); // Proceed only after node 0 has a blocked response pending.
 
-                expVals.put(k, k);
-            }
+        stopGrid(0);
 
-            tx.commit();
-        }
+        fut.get();
 
-        try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) {
-            for (int k = 100; k < 200; k++) {
-                if (k % 2 == 0) {
-                    cache.remove(k);
+        assertNull(cache.get(key1)); // The failed transaction must not have left any value behind.
+        assertNull(cache.get(key2));
 
-                    expVals.remove(k);
-                }
-            }
+        try (Transaction tx = client.transactions().txStart(concurrency, 
isolation)) {
+            cache.put(key1, 1);
+            cache.put(key2, 2);
 
             tx.commit();
         }
 
-        startGrid(1);
-
-        awaitPartitionMapExchange();
+        assertEquals(1, cache.get(key1));
+        assertEquals(2, cache.get(key2));
+    }
 
-        checkValues(expVals, jcache(1));
+    /**
+     * Runs {@code txPrepareFailureSimple} with a {@code PESSIMISTIC} / {@code REPEATABLE_READ} transaction.
+     *
+     * @throws Exception If failed.
+     */
+    public void testTxPrepareFailureSimplePessimisticTx() throws Exception {
+        txPrepareFailureSimple(PESSIMISTIC, REPEATABLE_READ);
+    }
 
-        stopGrid(0);
+    /**
+     * Runs {@code txPrepareFailureSimple} with an {@code OPTIMISTIC} / {@code SERIALIZABLE} transaction.
+     *
+     * @throws Exception If failed.
+     */
+    public void testTxPrepareFailureSimpleSerializableTx() throws Exception {
+        txPrepareFailureSimple(OPTIMISTIC, SERIALIZABLE);
+    }
 
-        checkValues(expVals, jcache(1));
+    /**
+     * Runs {@code txPrepareFailureSimple} with an {@code OPTIMISTIC} / {@code REPEATABLE_READ} transaction.
+     *
+     * @throws Exception If failed.
+     */
+    public void testTxPrepareFailureSimpleOptimisticTx() throws Exception {
+        txPrepareFailureSimple(OPTIMISTIC, REPEATABLE_READ);
     }
 
     /**
+     * @param concurrency Transaction concurrency.
+     * @param isolation Transaction isolation.
      * @throws Exception If failed.
      */
-    public void testCoordinatorFailurePessimisticTx() throws Exception {
+    private void txPrepareFailureSimple(
+        final TransactionConcurrency concurrency,
+        final TransactionIsolation isolation
+    ) throws Exception {
         testSpi = true;
 
         startGrids(3);
@@ -1947,14 +2737,14 @@ public class CacheMvccTransactionsTest extends 
GridCommonAbstractTest {
         final Integer key1 = primaryKey(jcache(1));
         final Integer key2 = primaryKey(jcache(2));
 
-        TestRecordingCommunicationSpi crdSpi = 
TestRecordingCommunicationSpi.spi(ignite(0));
+        TestRecordingCommunicationSpi srv1Spi = 
TestRecordingCommunicationSpi.spi(ignite(1));
 
-        crdSpi.blockMessages(MvccCoordinatorVersionResponse.class, 
client.name());
+        srv1Spi.blockMessages(GridNearTxPrepareResponse.class, client.name());
 
         IgniteInternalFuture fut = GridTestUtils.runAsync(new Callable() {
             @Override public Object call() throws Exception {
                 try {
-                    try (Transaction tx = 
client.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+                    try (Transaction tx = 
client.transactions().txStart(concurrency, isolation)) {
                         cache.put(key1, 1);
                         cache.put(key2, 2);
 
@@ -1965,22 +2755,28 @@ public class CacheMvccTransactionsTest extends 
GridCommonAbstractTest {
                 }
                 catch (ClusterTopologyException e) {
                     info("Expected exception: " + e);
+
+                    assertNotNull(e.retryReadyFuture());
+
+                    e.retryReadyFuture().get();
                 }
 
                 return null;
             }
         }, "tx-thread");
 
-        crdSpi.waitForBlocked();
+        srv1Spi.waitForBlocked();
 
-        stopGrid(0);
+        assertFalse(fut.isDone());
+
+        stopGrid(1);
 
         fut.get();
 
         assertNull(cache.get(key1));
         assertNull(cache.get(key2));
 
-        try (Transaction tx = client.transactions().txStart(PESSIMISTIC, 
REPEATABLE_READ)) {
+        try (Transaction tx = client.transactions().txStart(concurrency, 
isolation)) {
             cache.put(key1, 1);
             cache.put(key2, 2);
 
@@ -1994,32 +2790,165 @@ public class CacheMvccTransactionsTest extends 
GridCommonAbstractTest {
     /**
+     * Scenario: two nodes are started with {@code startGrids(2)} plus node 2, which creates the cache.
+     * Every {@code GridNearTxPrepareRequest} sent by node 2 is blocked, then an {@code OPTIMISTIC} /
+     * {@code SERIALIZABLE} transaction doing {@code putAll} of keys 0..99 is committed from a separate
+     * thread. After two requests are blocked, node 3 is started while they are still held back. The
+     * transaction must not be finished at that point; the messages are then released, the transaction
+     * is awaited, and the values are checked on every node from {@code G.allGrids()}.
+     * NOTE(review): per the test name the topology change is expected to force a remap of the
+     * transaction; that mechanism is not visible in this test body.
+     *
      * @throws Exception If failed.
      */
-    public void testReadInProgressCoordinatorFailsSimple_FromServer() throws 
Exception {
-        for (int i = 1; i <= 3; i++) {
-            readInProgressCoordinatorFailsSimple(false, i);
+    public void testSerializableTxRemap() throws Exception {
+        testSpi = true;
 
-            afterTest();
+        startGrids(2);
+
+        client = true; // Assigns the test's field; the local named 'client' is declared only on the next statement.
+
+        final Ignite client = startGrid(2);
+
+        final IgniteCache cache = client.createCache(
+            cacheConfiguration(PARTITIONED, FULL_SYNC, 0, 
DFLT_PARTITION_COUNT));
+
+        final Map<Object, Object> vals = new HashMap<>();
+
+        for (int i = 0; i < 100; i++)
+            vals.put(i, i);
+
+        TestRecordingCommunicationSpi clientSpi = 
TestRecordingCommunicationSpi.spi(ignite(2));
+
+        clientSpi.blockMessages(new IgniteBiPredicate<ClusterNode, Message>() {
+            @Override public boolean apply(ClusterNode node, Message msg) {
+                return msg instanceof GridNearTxPrepareRequest; // Block prepare requests regardless of the target node.
+            }
+        });
+
+        IgniteInternalFuture fut = GridTestUtils.runAsync(new Callable() {
+            @Override public Object call() throws Exception {
+                try (Transaction tx = 
client.transactions().txStart(OPTIMISTIC, SERIALIZABLE)) {
+                    cache.putAll(vals);
+
+                    tx.commit();
+                }
+
+                return null;
+            }
+        }, "tx-thread");
+
+        clientSpi.waitForBlocked(2); // NOTE(review): presumably one prepare request per node started by startGrids(2); confirm.
+
+        this.client = false; // 'this.' is required: the local 'client' shadows the field from here on.
+
+        startGrid(3);
+
+        assertFalse(fut.isDone()); // The commit cannot have completed while its prepare requests are still blocked.
+
+        clientSpi.stopBlock();
+
+        fut.get();
+
+        for (Ignite node : G.allGrids())
+            checkValues(vals, node.cache(cache.getName()));
+    }
+
+    /**
+     * Runs {@code txInProgressCoordinatorChangeSimple} with {@code readOnly = false}.
+     *
+     * @throws Exception If failed.
+     */
+    public void testTxInProgressCoordinatorChangeSimple() throws Exception {
+        txInProgressCoordinatorChangeSimple(false);
+    }
+
+    /**
+     * Runs {@code txInProgressCoordinatorChangeSimple} with {@code readOnly = true}.
+     *
+     * @throws Exception If failed.
+     */
+    public void testTxInProgressCoordinatorChangeSimple_Readonly() throws 
Exception {
+        txInProgressCoordinatorChangeSimple(true);
+    }
+
+    /**
+     * Scenario: a {@code CoordinatorAssignClosure} is installed, nodes 0..3 are started, node 4 is started
+     * with the {@code client} flag set, and node 5 is started with {@code nodeAttr = CRD_ATTR}. The cache
+     * is created with a {@code CoordinatorNodeFilter}. Then, for each of the nodes 0..4 in turn, an
+     * {@code OPTIMISTIC} / {@code SERIALIZABLE} transaction is opened, {@code getAll} of keys 1, 2, 3 is
+     * asserted to be empty, key 0 is optionally written, and while the transaction is still open node
+     * {@code crdIdx + 1} is started and node {@code crdIdx} is stopped before the commit. After each
+     * transaction {@code checkActiveQueriesCleanup} is called for the node now at {@code crdIdx}.
+     * NOTE(review): the node at {@code crdIdx} is presumably the current coordinator, going by the
+     * variable and method names; the assignment logic is not visible in this hunk.
+     *
+     * @param readOnly If {@code true} tests read-only transaction.
+     * @throws Exception If failed.
+     */
+    private void txInProgressCoordinatorChangeSimple(boolean readOnly) throws 
Exception {
+        CacheCoordinatorsProcessor.coordinatorAssignClosure(new 
CoordinatorAssignClosure());
+
+        Ignite srv0 = startGrids(4);
+
+        client = true;
+
+        startGrid(4);
+
+        client = false;
+
+        nodeAttr = CRD_ATTR; // Not reset in this method, so it is still set when the loop below starts further nodes.
+
+        int crdIdx = 5;
+
+        startGrid(crdIdx);
+
+        srv0.createCache(cacheConfiguration(PARTITIONED, FULL_SYNC, 0, 
DFLT_PARTITION_COUNT).
+            setNodeFilter(new CoordinatorNodeFilter()));
+
+        Set<Integer> keys = F.asSet(1, 2, 3);
+
+        for (int i = 0; i < 5; i++) { // Nodes 0..4: the four nodes from startGrids(4) and the node started with client = true.
+            Ignite node = ignite(i);
+
+            info("Test with node: " + node.name());
+
+            IgniteCache cache = node.cache(DEFAULT_CACHE_NAME);
+
+            try (Transaction tx = node.transactions().txStart(OPTIMISTIC, 
SERIALIZABLE)) {
+                assertTrue(cache.getAll(keys).isEmpty()); // Keys 1..3 are never written in this method.
+
+                if (!readOnly)
+                    cache.put(0, 0);
+
+                startGrid(crdIdx + 1); // Start the replacement node before stopping the old one.
+
+                stopGrid(crdIdx);
+
+                crdIdx++;
+
+                tx.commit(); // Commit happens only after node crdIdx was replaced inside the open transaction.
+            }
+
+            checkActiveQueriesCleanup(ignite(crdIdx));
         }
     }
+    
+    /**
+     * Runs {@code readInProgressCoordinatorFailsSimple} with {@code fromClient = false}.
+     *
+     * @throws Exception If failed.
+     */
+    public void testReadInProgressCoordinatorFailsSimple_FromServer() throws 
Exception {
+        readInProgressCoordinatorFailsSimple(false);
+    }
 
     /**
+     * Runs {@code readInProgressCoordinatorFailsSimple} with {@code fromClient = true}.
+     *
      * @throws Exception If failed.
      */
     public void testReadInProgressCoordinatorFailsSimple_FromClient() throws 
Exception {
-        for (int i = 1; i <= 3; i++) {
-            readInProgressCoordinatorFailsSimple(true, i);
+        readInProgressCoordinatorFailsSimple(true);
+    }
 
-            afterTest();
+    /**
+     * Runs the three-argument {@code readInProgressCoordinatorFailsSimple} for every combination of
+     * {@code readInTx} in {false, true} and coordinator change count 1..3, calling {@code afterTest()}
+     * after each run.
+     *
+     * @param fromClient {@code True} if read from client node, otherwise from 
server node.
+     * @throws Exception If failed.
+     */
+    private void readInProgressCoordinatorFailsSimple(boolean fromClient) 
throws Exception {
+        for (boolean readInTx : new boolean[]{false, true}) {
+            for (int i = 1; i <= 3; i++) { // i is passed as the crdChangeCnt argument.
+                readInProgressCoordinatorFailsSimple(fromClient, i, readInTx);
+
+                afterTest(); // NOTE(review): presumably tears the grid down so the next combination starts clean; confirm in afterTest().
+            }
         }
     }
 
     /**
      * @param fromClient {@code True} if read from client node, otherwise from 
server node.
      * @param crdChangeCnt Number of coordinator changes.
+     * @param readInTx {@code True} to read inside transaction.
      * @throws Exception If failed.
      */
-    private void readInProgressCoordinatorFailsSimple(boolean fromClient, int 
crdChangeCnt) throws Exception {
-        info("readInProgressCoordinatorFailsSimple [fromClient=" + fromClient 
+ ", crdChangeCnt=" + crdChangeCnt + ']');
+    private void readInProgressCoordinatorFailsSimple(boolean fromClient, int 
crdChangeCnt, final boolean readInTx)
+        throws Exception
+    {
+        info("readInProgressCoordinatorFailsSimple [fromClient=" + fromClient +
+            ", crdChangeCnt=" + crdChangeCnt +
+            ", readInTx=" + readInTx + ']');
 
         testSpi = true;
 
@@ -2072,7 +3001,17 @@ public class CacheMvccTransactionsTest extends 
GridCommonAbstractTest {
 
         IgniteInternalFuture getFut = GridTestUtils.runAsync(new Callable() {
             @Override public Object call() throws Exception {
-                Map<Integer, Integer> res = cache.getAll(keys);
+                Map<Integer, Integer> res;
+
+                if (readInTx) {
+                    try (Transaction tx = 
getNode.transactions().txStart(OPTIMISTIC, SERIALIZABLE)) {
+                        res = cache.getAll(keys);
+
+                        tx.rollback();
+                    }
+                }
+                else
+                    res = cache.getAll(keys);
 
                 assertEquals(20, res.size());
 
@@ -2197,21 +3136,36 @@ public class CacheMvccTransactionsTest extends 
GridCommonAbstractTest {
      * @throws Exception If failed.
      */
     public void testReadInProgressCoordinatorFails() throws Exception {
-        readInProgressCoordinatorFails(false);
+        readInProgressCoordinatorFails(false, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testReadInsideTxInProgressCoordinatorFails() throws Exception {
+        readInProgressCoordinatorFails(false, true);
     }
 
     /**
      * @throws Exception If failed.
      */
     public void testReadInProgressCoordinatorFails_ReadDelay() throws 
Exception {
-        readInProgressCoordinatorFails(true);
+        readInProgressCoordinatorFails(true, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testReadInsideTxInProgressCoordinatorFails_ReadDelay() throws 
Exception {
+        readInProgressCoordinatorFails(true, true);
     }
 
     /**
      * @param readDelay {@code True} if delays get requests.
+     * @param readInTx {@code True} to read inside transaction.
      * @throws Exception If failed.
      */
-    private void readInProgressCoordinatorFails(boolean readDelay) throws 
Exception {
+    private void readInProgressCoordinatorFails(boolean readDelay, final 
boolean readInTx) throws Exception {
         final int COORD_NODES = 5;
         final int SRV_NODES = 4;
 
@@ -2284,7 +3238,17 @@ public class CacheMvccTransactionsTest extends 
GridCommonAbstractTest {
                             for (String cacheName : cacheNames) {
                                 IgniteCache cache = node.cache(cacheName);
 
-                                Map<Integer, Integer> res = 
cache.getAll(vals.keySet());
+                                Map<Integer, Integer> res;
+
+                                if (readInTx) {
+                                    try (Transaction tx = 
node.transactions().txStart(OPTIMISTIC, SERIALIZABLE)) {
+                                        res = cache.getAll(vals.keySet());
+
+                                        tx.rollback();
+                                    }
+                                }
+                                else
+                                    res = cache.getAll(vals.keySet());
 
                                 assertEquals(vals.size(), res.size());
 
@@ -2311,7 +3275,7 @@ public class CacheMvccTransactionsTest extends 
GridCommonAbstractTest {
                         throw e;
                     }
                 }
-            }, (SRV_NODES + 1) + 1, "get-thread");
+            }, ((SRV_NODES + 1) + 1) * 2, "get-thread");
 
             IgniteInternalFuture putFut1 = GridTestUtils.runAsync(new 
Callable() {
                 @Override public Void call() throws Exception {
@@ -2468,7 +3432,19 @@ public class CacheMvccTransactionsTest extends 
GridCommonAbstractTest {
         for (int i = 0; i < 10; i++)
             vals.put(i, val);
 
-        try (Transaction tx = putNode.transactions().txStart(PESSIMISTIC, 
REPEATABLE_READ)) {
+        TransactionConcurrency concurrency;
+        TransactionIsolation isolation;
+
+        if (ThreadLocalRandom.current().nextBoolean()) {
+            concurrency = PESSIMISTIC;
+            isolation = REPEATABLE_READ;
+        }
+        else {
+            concurrency = OPTIMISTIC;
+            isolation = SERIALIZABLE;
+        }
+
+        try (Transaction tx = putNode.transactions().txStart(concurrency, 
isolation)) {
             for (String cacheName : cacheNames)
                 putNode.cache(cacheName).putAll(vals);
 
@@ -2525,7 +3501,7 @@ public class CacheMvccTransactionsTest extends 
GridCommonAbstractTest {
         MvccCoordinator crd = null;
 
         for (Ignite node : G.allGrids()) {
-            CacheCoordinatorsProcessor crdProc = ((IgniteKernal) 
node).context().cache().context().coordinators();
+            CacheCoordinatorsProcessor crdProc = 
((IgniteKernal)node).context().cache().context().coordinators();
 
             MvccCoordinator crd0 = crdProc.currentCoordinator();
 
@@ -2704,16 +3680,16 @@ public class CacheMvccTransactionsTest extends 
GridCommonAbstractTest {
             }
         };
 
-        GridInClosure3<Integer, List<IgniteCache>, AtomicBoolean> writer =
-            new GridInClosure3<Integer, List<IgniteCache>, AtomicBoolean>() {
-                @Override public void apply(Integer idx, List<IgniteCache> 
caches, AtomicBoolean stop) {
+        GridInClosure3<Integer, List<TestCache>, AtomicBoolean> writer =
+            new GridInClosure3<Integer, List<TestCache>, AtomicBoolean>() {
+                @Override public void apply(Integer idx, List<TestCache> 
caches, AtomicBoolean stop) {
                     ThreadLocalRandom rnd = ThreadLocalRandom.current();
 
                     int cnt = 0;
 
                     while (!stop.get()) {
-                        IgniteCache<Integer, Integer> cache = 
randomCache(caches, rnd);
-                        IgniteTransactions txs = 
cache.unwrap(Ignite.class).transactions();
+                        TestCache<Integer, Integer> cache = 
randomCache(caches, rnd);
+                        IgniteTransactions txs = 
cache.cache.unwrap(Ignite.class).transactions();
 
                         TreeSet<Integer> keys = new TreeSet<>();
 
@@ -2721,7 +3697,7 @@ public class CacheMvccTransactionsTest extends 
GridCommonAbstractTest {
                             keys.add(rnd.nextInt(TOTAL));
 
                         try (Transaction tx = txs.txStart(PESSIMISTIC, 
REPEATABLE_READ)) {
-                            Map<Integer, Integer> curVals = cache.getAll(keys);
+                            Map<Integer, Integer> curVals = 
cache.cache.getAll(keys);
 
                             assertEquals(N, curVals.size());
 
@@ -2730,10 +3706,13 @@ public class CacheMvccTransactionsTest extends 
GridCommonAbstractTest {
                             for (Map.Entry<Integer, Integer> e : 
curVals.entrySet())
                                 newVals.put(e.getKey(), e.getValue() + 1);
 
-                            cache.putAll(newVals);
+                            cache.cache.putAll(newVals);
 
                             tx.commit();
                         }
+                        finally {
+                            cache.readUnlock();
+                        }
 
                         cnt++;
                     }
@@ -2742,9 +3721,9 @@ public class CacheMvccTransactionsTest extends 
GridCommonAbstractTest {
                 }
             };
 
-        GridInClosure3<Integer, List<IgniteCache>, AtomicBoolean> reader =
-            new GridInClosure3<Integer, List<IgniteCache>, AtomicBoolean>() {
-                @Override public void apply(Integer idx, List<IgniteCache> 
caches, AtomicBoolean stop) {
+        GridInClosure3<Integer, List<TestCache>, AtomicBoolean> reader =
+            new GridInClosure3<Integer, List<TestCache>, AtomicBoolean>() {
+                @Override public void apply(Integer idx, List<TestCache> 
caches, AtomicBoolean stop) {
                     ThreadLocalRandom rnd = ThreadLocalRandom.current();
 
                     Set<Integer> keys = new LinkedHashSet<>();
@@ -2753,9 +3732,16 @@ public class CacheMvccTransactionsTest extends 
GridCommonAbstractTest {
                         while (keys.size() < TOTAL)
                             keys.add(rnd.nextInt(TOTAL));
 
-                        IgniteCache<Integer, Integer> cache = 
randomCache(caches, rnd);
+                        TestCache<Integer, Integer> cache = 
randomCache(caches, rnd);
 
-                        Map<Integer, Integer> vals = cache.getAll(keys);
+                        Map<Integer, Integer> vals;
+
+                        try {
+                            vals = cache.cache.getAll(keys);
+                        }
+                        finally {
+                            cache.readUnlock();
+                        }
 
                         assertEquals(TOTAL, vals.size());
 
@@ -2773,9 +3759,16 @@ public class CacheMvccTransactionsTest extends 
GridCommonAbstractTest {
                     }
 
                     if (idx == 0) {
-                        IgniteCache<Integer, Integer> cache = 
randomCache(caches, rnd);
+                        TestCache<Integer, Integer> cache = 
randomCache(caches, rnd);
 
-                        Map<Integer, Integer> vals = cache.getAll(keys);
+                        Map<Integer, Integer> vals;
+
+                        try {
+                            vals = cache.cache.getAll(keys);
+                        }
+                        finally {
+                            cache.readUnlock();
+                

<TRUNCATED>

Reply via email to