Repository: ignite
Updated Branches:
  refs/heads/ignite-2.5 b4cb2f0df -> b4cc9f2d4


http://git-wip-us.apache.org/repos/asf/ignite/blob/b4cc9f2d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/latch/LatchAckMessage.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/latch/LatchAckMessage.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/latch/LatchAckMessage.java
new file mode 100644
index 0000000..bad1b61
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/latch/LatchAckMessage.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.latch;
+
+import java.nio.ByteBuffer;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+/**
+ * Message is used to send acks for {@link Latch} instances management.
+ */
+public class LatchAckMessage implements Message {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Latch id. */
+    private String latchId;
+
+    /** Latch topology version. */
+    private AffinityTopologyVersion topVer;
+
+    /** Flag indicates that ack is final. */
+    private boolean isFinal;
+
+    /**
+     * Constructor.
+     *
+     * @param latchId Latch id.
+     * @param topVer Latch topology version.
+     * @param isFinal Final acknowledgement flag.
+     */
+    public LatchAckMessage(String latchId, AffinityTopologyVersion topVer, 
boolean isFinal) {
+        this.latchId = latchId;
+        this.topVer = topVer;
+        this.isFinal = isFinal;
+    }
+
+    /**
+     * Empty constructor for marshalling purposes.
+     */
+    public LatchAckMessage() {
+    }
+
+    /**
+     * @return Latch id.
+     */
+    public String latchId() {
+        return latchId;
+    }
+
+    /**
+     * @return Latch topology version.
+     */
+    public AffinityTopologyVersion topVer() {
+        return topVer;
+    }
+
+    /**
+     * @return {@code} if ack is final.
+     */
+    public boolean isFinal() {
+        return isFinal;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        writer.setBuffer(buf);
+
+        if (!writer.isHeaderWritten()) {
+            if (!writer.writeHeader(directType(), fieldsCount()))
+                return false;
+
+            writer.onHeaderWritten();
+        }
+
+        switch (writer.state()) {
+            case 0:
+                if (!writer.writeBoolean("isFinal", isFinal))
+                    return false;
+
+                writer.incrementState();
+
+            case 1:
+                if (!writer.writeString("latchId", latchId))
+                    return false;
+
+                writer.incrementState();
+
+            case 2:
+                if (!writer.writeMessage("topVer", topVer))
+                    return false;
+
+                writer.incrementState();
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        reader.setBuffer(buf);
+
+        if (!reader.beforeMessageRead())
+            return false;
+
+        switch (reader.state()) {
+            case 0:
+                isFinal = reader.readBoolean("isFinal");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 1:
+                latchId = reader.readString("latchId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 2:
+                topVer = reader.readMessage("topVer");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+        }
+
+        return reader.afterMessageRead(LatchAckMessage.class);
+    }
+
+    /** {@inheritDoc} */
+    @Override public short directType() {
+        return 135;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte fieldsCount() {
+        return 3;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onAckReceived() {
+        // No-op.
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/b4cc9f2d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index 7785605..33f84f0 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@ -3525,6 +3525,11 @@ public class GridNearTxLocal extends 
GridDhtTxLocalAdapter implements GridTimeou
 
                 U.error(log, "Failed to prepare transaction: " + this, e);
             }
+            catch (Throwable t) {
+                fut.onDone(t);
+
+                throw t;
+            }
 
             if (err != null)
                 fut.rollbackOnError(err);
@@ -3544,6 +3549,11 @@ public class GridNearTxLocal extends 
GridDhtTxLocalAdapter implements GridTimeou
 
                         U.error(log, "Failed to prepare transaction: " + this, 
e);
                     }
+                    catch (Throwable t) {
+                        fut.onDone(t);
+
+                        throw t;
+                    }
 
                     if (err != null)
                         fut.rollbackOnError(err);

http://git-wip-us.apache.org/repos/asf/ignite/blob/b4cc9f2d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
index 5cfd92d..68ec83d 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
@@ -189,7 +189,7 @@ public class GridCacheOffheapManager extends 
IgniteCacheOffheapManagerImpl imple
             freeList.saveMetadata();
 
             long updCntr = store.updateCounter();
-            int size = store.fullSize();
+            long size = store.fullSize();
             long rmvId = globalRemoveId().get();
 
             PageMemoryEx pageMem = (PageMemoryEx)grp.dataRegion().pageMemory();
@@ -318,7 +318,7 @@ public class GridCacheOffheapManager extends 
IgniteCacheOffheapManagerImpl imple
                                 partMetaId,
                                 updCntr,
                                 rmvId,
-                                size,
+                                (int)size, // TODO: Partition size may be long
                                 cntrsPageId,
                                 state == null ? -1 : (byte)state.ordinal(),
                                 pageCnt
@@ -549,7 +549,7 @@ public class GridCacheOffheapManager extends 
IgniteCacheOffheapManagerImpl imple
             final int grpId,
             final int partId,
             final int currAllocatedPageCnt,
-            final int partSize
+            final long partSize
     ) {
         if (part != null) {
             boolean reserved = part.reserve();
@@ -1301,7 +1301,7 @@ public class GridCacheOffheapManager extends 
IgniteCacheOffheapManagerImpl imple
         }
 
         /** {@inheritDoc} */
-        @Override public int fullSize() {
+        @Override public long fullSize() {
             try {
                 CacheDataStore delegate0 = init0(true);
 
@@ -1313,7 +1313,7 @@ public class GridCacheOffheapManager extends 
IgniteCacheOffheapManagerImpl imple
         }
 
         /** {@inheritDoc} */
-        @Override public int cacheSize(int cacheId) {
+        @Override public long cacheSize(int cacheId) {
             try {
                 CacheDataStore delegate0 = init0(true);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/b4cc9f2d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
index 9bfaaf3..945ef48 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
@@ -490,7 +490,7 @@ public abstract class IgniteTxAdapter extends 
GridMetadataAwareAdapter implement
     @Override public AffinityTopologyVersion topologyVersion() {
         AffinityTopologyVersion res = topVer;
 
-        if (res.equals(AffinityTopologyVersion.NONE)) {
+        if (res == null || res.equals(AffinityTopologyVersion.NONE)) {
             if (system()) {
                 AffinityTopologyVersion topVer = 
cctx.tm().lockedTopologyVersion(Thread.currentThread().getId(), this);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/b4cc9f2d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
index fbdeca1..9fb8777 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
@@ -545,10 +545,10 @@ public class IgniteTxManager extends 
GridCacheSharedManagerAdapter {
      * @param topVer Topology version.
      * @return Future that will be completed when all ongoing transactions are 
finished.
      */
-    public IgniteInternalFuture<Boolean> finishTxs(AffinityTopologyVersion 
topVer) {
+    public IgniteInternalFuture<Boolean> 
finishLocalTxs(AffinityTopologyVersion topVer) {
         GridCompoundFuture<IgniteInternalTx, Boolean> res =
             new CacheObjectsReleaseFuture<>(
-                "Tx",
+                "LocalTx",
                 topVer,
                 new IgniteReducer<IgniteInternalTx, Boolean>() {
                     @Override public boolean collect(IgniteInternalTx e) {
@@ -561,8 +561,9 @@ public class IgniteTxManager extends 
GridCacheSharedManagerAdapter {
                 });
 
         for (IgniteInternalTx tx : txs()) {
-            if (needWaitTransaction(tx, topVer))
+            if (needWaitTransaction(tx, topVer)) {
                 res.add(tx.finishFuture());
+            }
         }
 
         res.markInitialized();
@@ -571,6 +572,29 @@ public class IgniteTxManager extends 
GridCacheSharedManagerAdapter {
     }
 
     /**
+     * Creates a future that will wait for finishing all tx updates on backups 
after all local transactions are finished.
+     *
+     * NOTE:
+     * As we send finish request to backup nodes after transaction 
successfully completed on primary node
+     * it's important to ensure that all updates from primary to backup are 
finished or at least remote transaction has created on backup node.
+     *
+     * @param finishLocalTxsFuture Local transactions finish future.
+     * @param topVer Topology version.
+     * @return Future that will be completed when all ongoing transactions are 
finished.
+     */
+    public IgniteInternalFuture<?> finishAllTxs(IgniteInternalFuture<?> 
finishLocalTxsFuture, AffinityTopologyVersion topVer) {
+        final GridCompoundFuture finishAllTxsFuture = new 
CacheObjectsReleaseFuture("AllTx", topVer);
+
+        // After finishing all local updates, wait for finishing all tx 
updates on backups.
+        finishLocalTxsFuture.listen(future -> {
+            finishAllTxsFuture.add(cctx.mvcc().finishRemoteTxs(topVer));
+            finishAllTxsFuture.markInitialized();
+        });
+
+        return finishAllTxsFuture;
+    }
+
+    /**
      * @param tx Transaction.
      * @param topVer Exchange version.
      * @return {@code True} if need wait transaction for exchange.
@@ -1834,12 +1858,12 @@ public class IgniteTxManager extends 
GridCacheSharedManagerAdapter {
      * @return Finish future for related remote transactions.
      */
     @SuppressWarnings("unchecked")
-    public IgniteInternalFuture<?> remoteTxFinishFuture(GridCacheVersion 
nearVer) {
-        GridCompoundFuture<Void, Void> fut = new GridCompoundFuture<>();
+    public IgniteInternalFuture<IgniteInternalTx> 
remoteTxFinishFuture(GridCacheVersion nearVer) {
+        GridCompoundFuture<IgniteInternalTx, IgniteInternalTx> fut = new 
GridCompoundFuture<>();
 
         for (final IgniteInternalTx tx : txs()) {
             if (!tx.local() && nearVer.equals(tx.nearXidVersion()))
-                fut.add((IgniteInternalFuture) tx.finishFuture());
+                fut.add(tx.finishFuture());
         }
 
         fut.markInitialized();

http://git-wip-us.apache.org/repos/asf/ignite/blob/b4cc9f2d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheDhtLocalPartitionAfterRemoveSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheDhtLocalPartitionAfterRemoveSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheDhtLocalPartitionAfterRemoveSelfTest.java
index 7263656..702b188 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheDhtLocalPartitionAfterRemoveSelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheDhtLocalPartitionAfterRemoveSelfTest.java
@@ -76,7 +76,7 @@ public class CacheDhtLocalPartitionAfterRemoveSelfTest 
extends GridCommonAbstrac
             cache = grid(g).cache(DEFAULT_CACHE_NAME);
 
             for (GridDhtLocalPartition p : 
dht(cache).topology().localPartitions()) {
-                int size = p.dataStore().fullSize();
+                long size = p.dataStore().fullSize();
 
                 assertTrue("Unexpected size: " + size, size <= 32);
             }

http://git-wip-us.apache.org/repos/asf/ignite/blob/b4cc9f2d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java
index 468bbc8..6c570d7 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java
@@ -86,6 +86,7 @@ import org.apache.ignite.internal.util.lang.GridIterator;
 import org.apache.ignite.internal.util.lang.GridPlainCallable;
 import org.apache.ignite.internal.util.lang.gridfunc.ContainsPredicate;
 import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.G;
 import org.apache.ignite.internal.util.typedef.PA;
 import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.CU;

http://git-wip-us.apache.org/repos/asf/ignite/blob/b4cc9f2d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteExchangeLatchManagerCoordinatorFailTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteExchangeLatchManagerCoordinatorFailTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteExchangeLatchManagerCoordinatorFailTest.java
new file mode 100644
index 0000000..52cd033
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteExchangeLatchManagerCoordinatorFailTest.java
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.cache.datastructures;
+
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+import com.google.common.collect.Lists;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.latch.Latch;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.latch.ExchangeLatchManager;
+import org.apache.ignite.internal.util.future.GridCompoundFuture;
+import org.apache.ignite.lang.IgniteBiClosure;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Assert;
+
+/**
+ * Tests for {@link ExchangeLatchManager} functionality when latch coordinator 
is failed.
+ */
+public class IgniteExchangeLatchManagerCoordinatorFailTest extends 
GridCommonAbstractTest {
+    /** */
+    private static final String LATCH_NAME = "test";
+
+    /** 5 nodes. */
+    private final AffinityTopologyVersion latchTopVer = new 
AffinityTopologyVersion(5, 0);
+
+    /** Wait before latch creation. */
+    private final IgniteBiClosure<ExchangeLatchManager, CountDownLatch, 
Boolean> beforeCreate = (mgr, syncLatch) -> {
+        try {
+            syncLatch.countDown();
+            syncLatch.await();
+
+            Latch distributedLatch = mgr.getOrCreate(LATCH_NAME, latchTopVer);
+
+            distributedLatch.countDown();
+
+            distributedLatch.await();
+        } catch (Exception e) {
+            log.error("Unexpected exception", e);
+
+            return false;
+        }
+
+        return true;
+    };
+
+    /** Wait before latch count down. */
+    private final IgniteBiClosure<ExchangeLatchManager, CountDownLatch, 
Boolean> beforeCountDown = (mgr, syncLatch) -> {
+        try {
+            Latch distributedLatch = mgr.getOrCreate(LATCH_NAME, latchTopVer);
+
+            syncLatch.countDown();
+            syncLatch.await();
+
+            distributedLatch.countDown();
+
+            distributedLatch.await();
+        } catch (Exception e) {
+            log.error("Unexpected exception ", e);
+
+            return false;
+        }
+
+        return true;
+    };
+
+    /** Wait after all operations are successful. */
+    private final IgniteBiClosure<ExchangeLatchManager, CountDownLatch, 
Boolean> all = (mgr, syncLatch) -> {
+        try {
+            Latch distributedLatch = mgr.getOrCreate(LATCH_NAME, latchTopVer);
+
+            distributedLatch.countDown();
+
+            syncLatch.countDown();
+
+            distributedLatch.await();
+
+            syncLatch.await();
+        } catch (Exception e) {
+            log.error("Unexpected exception ", e);
+
+            return false;
+        }
+
+        return true;
+    };
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        stopAllGrids();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+    }
+
+    /**
+     * Test scenarios description:
+     *
+     * We have existing coordinator and 4 other nodes.
+     * Each node do following operations:
+     * 1) Create latch
+     * 2) Countdown latch
+     * 3) Await latch
+     *
+     * While nodes do the operations we shutdown coordinator and next oldest 
node become new coordinator.
+     * We should check that new coordinator properly restored latch and all 
nodes finished latch completion successfully after that.
+     *
+     * Each node before coordinator shutdown can be in 3 different states:
+     *
+     * State {@link #beforeCreate} - Node didn't create latch yet.
+     * State {@link #beforeCountDown} - Node created latch but didn't count 
down it yet.
+     * State {@link #all} - Node created latch and count downed it.
+     *
+     * We should check important cases when future coordinator is in one of 
these states, and other 3 nodes have 3 different states.
+     */
+
+    /**
+     * Scenario 1:
+     *
+     * Node 1 state -> {@link #beforeCreate}
+     * Node 2 state -> {@link #beforeCountDown}
+     * Node 3 state -> {@link #all}
+     * Node 4 state -> {@link #beforeCreate}
+     */
+    public void testCoordinatorFail1() throws Exception {
+        List<IgniteBiClosure<ExchangeLatchManager, CountDownLatch, Boolean>> 
nodeStates = Lists.newArrayList(
+            beforeCreate,
+            beforeCountDown,
+            all,
+            beforeCreate
+        );
+
+        doTestCoordinatorFail(nodeStates);
+    }
+
+    /**
+     * Scenario 2:
+     *
+     * Node 1 state -> {@link #beforeCountDown}
+     * Node 2 state -> {@link #beforeCountDown}
+     * Node 3 state -> {@link #all}
+     * Node 4 state -> {@link #beforeCreate}
+     */
+    public void testCoordinatorFail2() throws Exception {
+        List<IgniteBiClosure<ExchangeLatchManager, CountDownLatch, Boolean>> 
nodeStates = Lists.newArrayList(
+            beforeCountDown,
+            beforeCountDown,
+            all,
+            beforeCreate
+        );
+
+        doTestCoordinatorFail(nodeStates);
+    }
+
+    /**
+     * Scenario 3:
+     *
+     * Node 1 state -> {@link #all}
+     * Node 2 state -> {@link #beforeCountDown}
+     * Node 3 state -> {@link #all}
+     * Node 4 state -> {@link #beforeCreate}
+     */
+    public void testCoordinatorFail3() throws Exception {
+        List<IgniteBiClosure<ExchangeLatchManager, CountDownLatch, Boolean>> 
nodeStates = Lists.newArrayList(
+            all,
+            beforeCountDown,
+            all,
+            beforeCreate
+        );
+
+        doTestCoordinatorFail(nodeStates);
+    }
+
+    /**
+     * Test latch coordinator fail with specified scenarios.
+     *
+     * @param nodeScenarios Node scenarios.
+     * @throws Exception If failed.
+     */
+    private void 
doTestCoordinatorFail(List<IgniteBiClosure<ExchangeLatchManager, 
CountDownLatch, Boolean>> nodeScenarios) throws Exception {
+        IgniteEx crd = (IgniteEx) startGridsMultiThreaded(5);
+        crd.cluster().active(true);
+
+        // Latch to synchronize node states.
+        CountDownLatch syncLatch = new CountDownLatch(5);
+
+        GridCompoundFuture finishAllLatches = new GridCompoundFuture();
+
+        AtomicBoolean hasErrors = new AtomicBoolean();
+
+        for (int node = 1; node < 5; node++) {
+            IgniteEx grid = grid(node);
+            ExchangeLatchManager latchMgr = 
grid.context().cache().context().exchange().latch();
+            final int stateIdx = node - 1;
+
+            IgniteInternalFuture<?> fut = 
GridTestUtils.runMultiThreadedAsync(() -> {
+                boolean success = nodeScenarios.get(stateIdx).apply(latchMgr, 
syncLatch);
+                if (!success)
+                    hasErrors.set(true);
+            }, 1, "latch-runner-" + node);
+
+            finishAllLatches.add(fut);
+        }
+
+        finishAllLatches.markInitialized();
+
+        // Wait while all nodes reaches their states.
+        while (syncLatch.getCount() != 1) {
+            Thread.sleep(10);
+
+            if (hasErrors.get())
+                throw new Exception("All nodes should complete latches without 
errors");
+        }
+
+        crd.close();
+
+        // Resume progress for all nodes.
+        syncLatch.countDown();
+
+        // Wait for distributed latch completion.
+        finishAllLatches.get(5000);
+
+        Assert.assertFalse("All nodes should complete latches without errors", 
hasErrors.get());
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/b4cc9f2d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionsStateValidationTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionsStateValidationTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionsStateValidationTest.java
new file mode 100644
index 0000000..63d772a
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionsStateValidationTest.java
@@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.managers.communication.GridIoMessage;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.spi.IgniteSpiException;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
+import org.apache.ignite.transactions.TransactionConcurrency;
+import org.apache.ignite.transactions.TransactionIsolation;
+
+/**
+ *
+ */
+public class GridCachePartitionsStateValidationTest extends 
GridCommonAbstractTest {
+    /** Cache name. */
+    private static final String CACHE_NAME = "cache";
+
+    /** */
+    private boolean clientMode;
+
+    /** {@inheritDoc */
+    @Override protected IgniteConfiguration getConfiguration(String 
igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        cfg.setConsistentId(igniteInstanceName);
+
+        cfg.setCacheConfiguration(new CacheConfiguration(CACHE_NAME)
+                .setBackups(1)
+                .setAffinity(new RendezvousAffinityFunction(false, 32))
+        );
+
+        cfg.setCommunicationSpi(new 
SingleMessageInterceptorCommunicationSpi(2));
+
+        if (clientMode)
+            cfg.setClientMode(true);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc */
+    @Override protected void beforeTest() throws Exception {
+        stopAllGrids();
+    }
+
+    /** {@inheritDoc */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+
+        clientMode = false;
+    }
+
+    /**
+     * Test that partitions state validation works correctly.
+     *
+     * @throws Exception If failed.
+     */
+    public void testValidationIfPartitionCountersAreInconsistent() throws 
Exception {
+        IgniteEx ignite = (IgniteEx) startGrids(2);
+        ignite.cluster().active(true);
+
+        awaitPartitionMapExchange();
+
+        // Modify update counter for some partition.
+        for (GridDhtLocalPartition partition : 
ignite.cachex(CACHE_NAME).context().topology().localPartitions()) {
+            partition.updateCounter(100500L);
+            break;
+        }
+
+        // Trigger exchange.
+        startGrid(2);
+
+        awaitPartitionMapExchange();
+
+        // Nothing should happen (just log error message) and we're still able 
to put data to corrupted cache.
+        ignite.cache(CACHE_NAME).put(0, 0);
+
+        stopAllGrids();
+    }
+
+    /**
+     * Test that all nodes send correct {@link GridDhtPartitionsSingleMessage} 
with consistent update counters.
+     *
+     * @throws Exception If failed.
+     */
+    public void testPartitionCountersConsistencyOnExchange() throws Exception {
+        IgniteEx ignite = (IgniteEx) startGrids(4);
+        ignite.cluster().active(true);
+
+        awaitPartitionMapExchange();
+
+        final String atomicCacheName = "atomic-cache";
+        final String txCacheName = "tx-cache";
+
+        clientMode = true;
+
+        Ignite client = startGrid(4);
+
+        clientMode = false;
+
+        IgniteCache atomicCache = client.getOrCreateCache(new 
CacheConfiguration<>(atomicCacheName)
+            .setAtomicityMode(CacheAtomicityMode.ATOMIC)
+            
.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC)
+            .setBackups(2)
+            .setAffinity(new RendezvousAffinityFunction(false, 32))
+        );
+
+        IgniteCache txCache = client.getOrCreateCache(new 
CacheConfiguration<>(txCacheName)
+            .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL)
+            
.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC)
+            .setBackups(2)
+            .setAffinity(new RendezvousAffinityFunction(false, 32))
+        );
+
+        for (int it = 0; it < 10; it++) {
+            SingleMessageInterceptorCommunicationSpi spi = 
(SingleMessageInterceptorCommunicationSpi) 
ignite.configuration().getCommunicationSpi();
+            spi.clear();
+
+            // Stop load future.
+            final AtomicBoolean stop = new AtomicBoolean();
+
+            // Run atomic load.
+            IgniteInternalFuture atomicLoadFuture = 
GridTestUtils.runMultiThreadedAsync(() -> {
+                int k = 0;
+
+                while (!stop.get()) {
+                    k++;
+                    try {
+                        atomicCache.put(k, k);
+                    } catch (Exception ignored) {}
+                }
+            }, 1, "atomic-load");
+
+            // Run tx load.
+            IgniteInternalFuture txLoadFuture = 
GridTestUtils.runMultiThreadedAsync(() -> {
+                final int txOps = 5;
+
+                while (!stop.get()) {
+                    List<Integer> randomKeys = Stream.generate(() -> 
ThreadLocalRandom.current().nextInt(5))
+                        .limit(txOps)
+                        .sorted()
+                        .collect(Collectors.toList());
+
+                    try (Transaction tx = 
ignite.transactions().txStart(TransactionConcurrency.OPTIMISTIC, 
TransactionIsolation.READ_COMMITTED)) {
+                        for (Integer key : randomKeys)
+                            txCache.put(key, key);
+
+                        tx.commit();
+                    }
+                    catch (Exception ignored) { }
+                }
+            }, 4, "tx-load");
+
+            // Wait for some data.
+            Thread.sleep(1000);
+
+            // Prevent sending full message.
+            spi.blockFullMessage();
+
+            // Trigger exchange.
+            IgniteInternalFuture nodeStopFuture = GridTestUtils.runAsync(() -> 
stopGrid(3));
+
+            try {
+                spi.waitUntilAllSingleMessagesAreSent();
+
+                List<GridDhtPartitionsSingleMessage> interceptedMessages = 
spi.getMessages();
+
+                // Associate each message with existing node UUID.
+                Map<UUID, GridDhtPartitionsSingleMessage> messagesMap = new 
HashMap<>();
+                for (int i = 0; i < interceptedMessages.size(); i++)
+                    messagesMap.put(grid(i + 1).context().localNodeId(), 
interceptedMessages.get(i));
+
+                GridDhtPartitionsStateValidator validator = new 
GridDhtPartitionsStateValidator(ignite.context().cache().context());
+
+                // Validate partition update counters. If counters are not 
consistent, exception will be thrown.
+                
validator.validatePartitionsUpdateCounters(ignite.cachex(atomicCacheName).context().topology(),
 messagesMap, Collections.emptySet());
+                
validator.validatePartitionsUpdateCounters(ignite.cachex(txCacheName).context().topology(),
 messagesMap, Collections.emptySet());
+
+            } finally {
+                // Stop load and resume exchange.
+                spi.unblockFullMessage();
+
+                stop.set(true);
+
+                atomicLoadFuture.get();
+                txLoadFuture.get();
+                nodeStopFuture.get();
+            }
+
+            // Return grid to initial state.
+            startGrid(3);
+
+            awaitPartitionMapExchange();
+        }
+    }
+
+    /**
+     * SPI which intercepts single messages during exchange.
+     */
+    private static class SingleMessageInterceptorCommunicationSpi extends 
TcpCommunicationSpi {
+        /** */
+        private static final List<GridDhtPartitionsSingleMessage> messages = 
new CopyOnWriteArrayList<>();
+
+        /** Future completes when {@link #singleMessagesThreshold} messages 
are sent to coordinator. */
+        private static final GridFutureAdapter allSingleMessagesSent = new 
GridFutureAdapter();
+
+        /** A number of single messages we're waiting for send. */
+        private final int singleMessagesThreshold;
+
+        /** Latch which blocks full message sending. */
+        private volatile CountDownLatch blockFullMsgLatch;
+
+        /**
+         * Constructor.
+         */
+        private SingleMessageInterceptorCommunicationSpi(int 
singleMessagesThreshold) {
+            this.singleMessagesThreshold = singleMessagesThreshold;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void sendMessage(ClusterNode node, Message msg, 
IgniteInClosure<IgniteException> ackC) throws IgniteSpiException {
+            if (((GridIoMessage) msg).message() instanceof 
GridDhtPartitionsSingleMessage) {
+                GridDhtPartitionsSingleMessage singleMsg = 
(GridDhtPartitionsSingleMessage) ((GridIoMessage) msg).message();
+
+                // We're interesting for only exchange messages and when node 
is stopped.
+                if (singleMsg.exchangeId() != null && 
singleMsg.exchangeId().isLeft() && !singleMsg.client()) {
+                    messages.add(singleMsg);
+
+                    if (messages.size() == singleMessagesThreshold)
+                        allSingleMessagesSent.onDone();
+                }
+            }
+
+            try {
+                if (((GridIoMessage) msg).message() instanceof 
GridDhtPartitionsFullMessage) {
+                    if (blockFullMsgLatch != null)
+                        blockFullMsgLatch.await();
+                }
+            }
+            catch (Exception ignored) { }
+
+            super.sendMessage(node, msg, ackC);
+        }
+
+        /** */
+        public void clear() {
+            messages.clear();
+            allSingleMessagesSent.reset();
+        }
+
+        /** */
+        public List<GridDhtPartitionsSingleMessage> getMessages() {
+            return Collections.unmodifiableList(messages);
+        }
+
+        /** */
+        public void blockFullMessage() {
+            blockFullMsgLatch = new CountDownLatch(1);
+        }
+
+        /** */
+        public void unblockFullMessage() {
+            blockFullMsgLatch.countDown();
+        }
+
+        /** */
+        public void waitUntilAllSingleMessagesAreSent() throws 
IgniteCheckedException {
+            allSingleMessagesSent.get();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/b4cc9f2d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionsStateValidatorSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionsStateValidatorSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionsStateValidatorSelfTest.java
new file mode 100644
index 0000000..9ed8d54
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionsStateValidatorSelfTest.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage;
+import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Assert;
+import org.mockito.Matchers;
+import org.mockito.Mockito;
+
+/**
+ * Test correct behaviour of {@link GridDhtPartitionsStateValidator} class.
+ */
+public class GridCachePartitionsStateValidatorSelfTest extends 
GridCommonAbstractTest {
+    /** Mocks and stubs. */
+    private final UUID localNodeId = UUID.randomUUID();
+    /** */
+    private GridCacheSharedContext cctxMock;
+    /** */
+    private GridDhtPartitionTopology topologyMock;
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        // Prepare mocks.
+        cctxMock = Mockito.mock(GridCacheSharedContext.class);
+        Mockito.when(cctxMock.localNodeId()).thenReturn(localNodeId);
+
+        topologyMock = Mockito.mock(GridDhtPartitionTopology.class);
+        Mockito.when(topologyMock.partitionState(Matchers.any(), 
Matchers.anyInt())).thenReturn(GridDhtPartitionState.OWNING);
+        Mockito.when(topologyMock.groupId()).thenReturn(0);
+        Mockito.when(topologyMock.partitions()).thenReturn(3);
+
+        List<GridDhtLocalPartition> localPartitions = Lists.newArrayList(
+                partitionMock(0, 1, 1),
+                partitionMock(1, 2, 2),
+                partitionMock(2, 3, 3)
+        );
+        
Mockito.when(topologyMock.localPartitions()).thenReturn(localPartitions);
+        
Mockito.when(topologyMock.currentLocalPartitions()).thenReturn(localPartitions);
+    }
+
+    /**
+     * @return Partition mock with specified {@code id}, {@code updateCounter} 
and {@code size}.
+     */
+    private GridDhtLocalPartition partitionMock(int id, long updateCounter, 
long size) {
+        GridDhtLocalPartition partitionMock = 
Mockito.mock(GridDhtLocalPartition.class);
+        Mockito.when(partitionMock.id()).thenReturn(id);
+        Mockito.when(partitionMock.updateCounter()).thenReturn(updateCounter);
+        Mockito.when(partitionMock.fullSize()).thenReturn(size);
+        return partitionMock;
+    }
+
+    /**
+     * @return Message containing specified {@code countersMap}.
+     */
+    private GridDhtPartitionsSingleMessage fromUpdateCounters(Map<Integer, 
T2<Long, Long>> countersMap) {
+        GridDhtPartitionsSingleMessage msg = new 
GridDhtPartitionsSingleMessage();
+        msg.addPartitionUpdateCounters(0, countersMap);
+        return msg;
+    }
+
+    /**
+     * @return Message containing specified {@code sizesMap}.
+     */
+    private GridDhtPartitionsSingleMessage fromCacheSizes(Map<Integer, Long> 
sizesMap) {
+        GridDhtPartitionsSingleMessage msg = new 
GridDhtPartitionsSingleMessage();
+        msg.addPartitionSizes(0, sizesMap);
+        return msg;
+    }
+
+    /**
+     * Test partition update counters validation.
+     */
+    public void testPartitionCountersValidation() {
+        UUID remoteNode = UUID.randomUUID();
+        UUID ignoreNode = UUID.randomUUID();
+
+        // For partitions 0 and 2 (zero counter) we have inconsistent update 
counters.
+        Map<Integer, T2<Long, Long>> updateCountersMap = new HashMap<>();
+        updateCountersMap.put(0, new T2<>(2L, 2L));
+        updateCountersMap.put(1, new T2<>(2L, 2L));
+
+        // Form single messages map.
+        Map<UUID, GridDhtPartitionsSingleMessage> messages = new HashMap<>();
+        messages.put(remoteNode, fromUpdateCounters(updateCountersMap));
+        messages.put(ignoreNode, fromUpdateCounters(updateCountersMap));
+
+        GridDhtPartitionsStateValidator validator = new 
GridDhtPartitionsStateValidator(cctxMock);
+
+        // (partId, (nodeId, updateCounter))
+        Map<Integer, Map<UUID, Long>> result = 
validator.validatePartitionsUpdateCounters(topologyMock, messages, 
Sets.newHashSet(ignoreNode));
+
+        // Check that validation result contains all necessary information.
+        Assert.assertEquals(2, result.size());
+        Assert.assertTrue(result.containsKey(0));
+        Assert.assertTrue(result.containsKey(2));
+        Assert.assertTrue(result.get(0).get(localNodeId) == 1L);
+        Assert.assertTrue(result.get(0).get(remoteNode) == 2L);
+        Assert.assertTrue(result.get(2).get(localNodeId) == 3L);
+        Assert.assertTrue(result.get(2).get(remoteNode) == 0L);
+    }
+
+    /**
+     * Test partition cache sizes validation.
+     */
+    public void testPartitionCacheSizesValidation() {
+        UUID remoteNode = UUID.randomUUID();
+        UUID ignoreNode = UUID.randomUUID();
+
+        // For partitions 0 and 2 we have inconsistent cache sizes.
+        Map<Integer, Long> cacheSizesMap = new HashMap<>();
+        cacheSizesMap.put(0, 2L);
+        cacheSizesMap.put(1, 2L);
+        cacheSizesMap.put(2, 2L);
+
+        // Form single messages map.
+        Map<UUID, GridDhtPartitionsSingleMessage> messages = new HashMap<>();
+        messages.put(remoteNode, fromCacheSizes(cacheSizesMap));
+        messages.put(ignoreNode, fromCacheSizes(cacheSizesMap));
+
+        GridDhtPartitionsStateValidator validator = new 
GridDhtPartitionsStateValidator(cctxMock);
+
+        // (partId, (nodeId, cacheSize))
+        Map<Integer, Map<UUID, Long>> result = 
validator.validatePartitionsSizes(topologyMock, messages, 
Sets.newHashSet(ignoreNode));
+
+        // Check that validation result contains all necessary information.
+        Assert.assertEquals(2, result.size());
+        Assert.assertTrue(result.containsKey(0));
+        Assert.assertTrue(result.containsKey(2));
+        Assert.assertTrue(result.get(0).get(localNodeId) == 1L);
+        Assert.assertTrue(result.get(0).get(remoteNode) == 2L);
+        Assert.assertTrue(result.get(2).get(localNodeId) == 3L);
+        Assert.assertTrue(result.get(2).get(remoteNode) == 2L);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/b4cc9f2d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxOptimisticOnPartitionExchangeTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxOptimisticOnPartitionExchangeTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxOptimisticOnPartitionExchangeTest.java
new file mode 100644
index 0000000..03ea0f7
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxOptimisticOnPartitionExchangeTest.java
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.transactions;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.managers.communication.GridIoMessage;
+import org.apache.ignite.internal.processors.cache.GridCacheAffinityManager;
+import org.apache.ignite.internal.processors.cache.IgniteCacheProxy;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFinishRequest;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareRequest;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage;
+import 
org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFinishRequest;
+import 
org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareRequest;
+import org.apache.ignite.internal.util.lang.GridAbsPredicate;
+import org.apache.ignite.internal.util.typedef.T1;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.spi.IgniteSpiException;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
+import org.apache.ignite.transactions.TransactionIsolation;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static 
org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion.NONE;
+import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
+import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC;
+import static 
org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+import static 
org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED;
+import static 
org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
+import static org.apache.ignite.transactions.TransactionIsolation.SERIALIZABLE;
+
+/**
+ *
+ */
+public class TxOptimisticOnPartitionExchangeTest extends 
GridCommonAbstractTest {
+    /** Nodes count. */
+    private static final int NODES_CNT = 3;
+
+    /** Tx size. */
+    private static final int TX_SIZE = 20 * NODES_CNT;
+
+    /** Cache name. */
+    private static final String CACHE_NAME = "cache";
+
+    /** Logger started. */
+    private static volatile boolean msgInterception;
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        startGrids(NODES_CNT);
+
+        awaitPartitionMapExchange();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        super.afterTestsStopped();
+
+        stopAllGrids();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String 
igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        cfg.setCommunicationSpi(new TestCommunicationSpi(log()));
+
+        cfg.setCacheConfiguration(defaultCacheConfiguration()
+            .setName(CACHE_NAME)
+            .setAtomicityMode(TRANSACTIONAL)
+            .setWriteSynchronizationMode(FULL_SYNC)
+            .setCacheMode(PARTITIONED)
+            .setBackups(1));
+
+        return cfg;
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testConsistencyOnPartitionExchange() throws Exception {
+        doTest(SERIALIZABLE, true);
+        doTest(READ_COMMITTED, true);
+        doTest(SERIALIZABLE, false);
+        doTest(READ_COMMITTED, false);
+    }
+
+    /**
+     * @param isolation {@link TransactionIsolation}.
+     * @param txInitiatorPrimary False If the transaction does not use the 
keys of the node that initiated it.
+     * @throws Exception If failed.
+     */
+    @SuppressWarnings("unchecked")
+    public void doTest(final TransactionIsolation isolation, boolean 
txInitiatorPrimary) throws Exception {
+        final CountDownLatch txStarted = new CountDownLatch(1);
+
+        final IgniteCache cache = ignite(0).cache(CACHE_NAME);
+
+        final Map<Integer, Integer> txValues = new TreeMap<>();
+
+        ClusterNode node = ignite(0).cluster().node();
+
+        GridCacheAffinityManager affinity = 
((IgniteCacheProxy)cache).context().affinity();
+
+        for (int i = 0; txValues.size() < TX_SIZE; i++) {
+            if (!txInitiatorPrimary && node.equals(affinity.primaryByKey(i, 
NONE)))
+                continue;
+
+            txValues.put(i, i);
+        }
+
+        TestCommunicationSpi.init();
+
+        msgInterception = true;
+
+        IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(new 
Callable<Object>() {
+            @Override public Object call() {
+                try (Transaction tx = 
ignite(0).transactions().txStart(OPTIMISTIC, isolation)) {
+                    info(">>> TX started.");
+
+                    txStarted.countDown();
+
+                    cache.putAll(txValues);
+
+                    tx.commit();
+
+                    info(">>> TX committed.");
+                }
+
+                return null;
+            }
+        });
+
+        txStarted.await();
+
+        try {
+            info(">>> Grid starting.");
+
+            IgniteEx ignite = startGrid(NODES_CNT);
+
+            info(">>> Grid started.");
+
+            fut.get();
+
+            awaitPartitionMapExchange();
+
+            msgInterception = false;
+
+            IgniteCache<Object, Object> cacheStartedNode = 
ignite.cache(CACHE_NAME);
+
+            try (Transaction tx = ignite.transactions().txStart(PESSIMISTIC, 
REPEATABLE_READ)) {
+                Set<Object> keys = 
cacheStartedNode.getAll(txValues.keySet()).keySet();
+
+                assertEquals(txValues.keySet(), new TreeSet<>(keys));
+
+                tx.commit();
+            }
+        }
+        finally {
+            msgInterception = false;
+
+            stopGrid(NODES_CNT);
+        }
+    }
+
+    /**
+     *
+     */
+    @SuppressWarnings("ConstantConditions")
+    private static class TestCommunicationSpi extends TcpCommunicationSpi {
+        /** Partition single message sent from added node. */
+        private static volatile CountDownLatch partSingleMsgSentFromAddedNode;
+
+        /** Partition supply message sent count. */
+        private static final AtomicInteger partSupplyMsgSentCnt = new 
AtomicInteger();
+
+        /** Logger. */
+        private IgniteLogger log;
+
+        /**
+         * @param log Logger.
+         */
+        public TestCommunicationSpi(IgniteLogger log) {
+            this.log = log;
+        }
+
+        /**
+         *
+         */
+        public static void init() {
+            partSingleMsgSentFromAddedNode = new CountDownLatch(1);
+
+            partSupplyMsgSentCnt.set(0);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void sendMessage(
+            final ClusterNode node,
+            final Message msg,
+            final IgniteInClosure<IgniteException> ackC
+        ) throws IgniteSpiException {
+            if (msgInterception) {
+                if (msg instanceof GridIoMessage) {
+                    final Message msg0 = ((GridIoMessage)msg).message();
+
+                    String locNodeId = 
((IgniteEx)ignite).context().localNodeId().toString();
+
+                    int nodeIdx = 
Integer.parseInt(locNodeId.substring(locNodeId.length() - 3));
+
+                    if (nodeIdx == 0) {
+                        if (msg0 instanceof GridNearTxPrepareRequest || msg0 
instanceof GridDhtTxPrepareRequest) {
+                            GridTestUtils.runAsync(new Callable<Void>() {
+                                @Override public Void call() throws Exception {
+                                    partSingleMsgSentFromAddedNode.await();
+
+                                    sendMessage(node, msg, ackC, true);
+
+                                    return null;
+                                }
+                            });
+
+                            return;
+
+                        }
+                        else if (msg0 instanceof GridNearTxFinishRequest || 
msg0 instanceof GridDhtTxFinishRequest) {
+                            GridTestUtils.runAsync(new Callable<Void>() {
+                                @Override public Void call() throws Exception {
+                                    final T1<Integer> i = new T1<>(0);
+
+                                    while (waitForCondition(new 
GridAbsPredicate() {
+                                        @Override public boolean apply() {
+                                            return partSupplyMsgSentCnt.get() 
> i.get();
+                                        }
+                                    }, i.get() == 0 ? 5_000 : 500))
+                                        i.set(partSupplyMsgSentCnt.get());
+
+                                    sendMessage(node, msg, ackC, true);
+
+                                    return null;
+                                }
+                            });
+
+                            return;
+                        }
+                    }
+                    else if (nodeIdx == NODES_CNT && msg0 instanceof 
GridDhtPartitionsSingleMessage)
+                        partSingleMsgSentFromAddedNode.countDown();
+
+                    if (msg0 instanceof GridDhtPartitionSupplyMessage)
+                        partSupplyMsgSentCnt.incrementAndGet();
+                }
+            }
+
+            sendMessage(node, msg, ackC, msgInterception);
+        }
+
+        /**
+         * @param node Node.
+         * @param msg Message.
+         * @param ackC Ack closure.
+         * @param logMsg Log Messages.
+         */
+        private void sendMessage(
+            final ClusterNode node,
+            final Message msg,
+            final IgniteInClosure<IgniteException> ackC,
+            boolean logMsg
+        ) throws IgniteSpiException {
+            if (logMsg) {
+                String id = node.id().toString();
+                String locNodeId = 
((IgniteEx)ignite).context().localNodeId().toString();
+
+                Message msg0 = ((GridIoMessage)msg).message();
+
+                log.info(
+                    String.format(">>> Output msg[type=%s, fromNode= %s, 
toNode=%s]",
+                        msg0.getClass().getSimpleName(),
+                        locNodeId.charAt(locNodeId.length() - 1),
+                        id.charAt(id.length() - 1)
+                    )
+                );
+            }
+
+            super.sendMessage(node, msg, ackC);
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/b4cc9f2d/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
index bb397f7..0612615 100755
--- 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
@@ -133,6 +133,8 @@ import 
org.apache.ignite.internal.processors.cache.distributed.IgniteCrossCacheT
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridCacheAtomicNearCacheSelfTest;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridCacheColocatedTxExceptionSelfTest;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridCacheGlobalLoadTest;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridCachePartitionsStateValidationTest;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridCachePartitionsStateValidatorSelfTest;
 import 
org.apache.ignite.internal.processors.cache.distributed.near.GridCacheGetStoreErrorSelfTest;
 import 
org.apache.ignite.internal.processors.cache.distributed.near.GridCacheNearTxExceptionSelfTest;
 import 
org.apache.ignite.internal.processors.cache.distributed.replicated.GridCacheReplicatedTxExceptionSelfTest;
@@ -292,6 +294,8 @@ public class IgniteCacheTestSuite extends TestSuite {
         suite.addTestSuite(IgniteCacheSystemTransactionsSelfTest.class);
         suite.addTestSuite(CacheDeferredDeleteSanitySelfTest.class);
         suite.addTestSuite(CacheDeferredDeleteQueueTest.class);
+        suite.addTestSuite(GridCachePartitionsStateValidatorSelfTest.class);
+        suite.addTestSuite(GridCachePartitionsStateValidationTest.class);
 
         suite.addTest(IgniteCacheTcpClientDiscoveryTestSuite.suite());
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/b4cc9f2d/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java
 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java
index f8add30..415479d 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java
@@ -31,6 +31,7 @@ import 
org.apache.ignite.internal.processors.cache.ReplicatedTransactionalPessim
 import 
org.apache.ignite.internal.processors.cache.WalModeChangeAdvancedSelfTest;
 import 
org.apache.ignite.internal.processors.cache.WalModeChangeCoordinatorNotAffinityNodeSelfTest;
 import org.apache.ignite.internal.processors.cache.WalModeChangeSelfTest;
+import 
org.apache.ignite.internal.processors.cache.datastructures.IgniteExchangeLatchManagerCoordinatorFailTest;
 import 
org.apache.ignite.internal.processors.cache.distributed.CacheExchangeMergeTest;
 import 
org.apache.ignite.internal.processors.cache.distributed.CachePartitionStateTest;
 import 
org.apache.ignite.internal.processors.cache.distributed.GridCachePartitionEvictionDuringReadThroughSelfTest;
@@ -40,6 +41,7 @@ import 
org.apache.ignite.internal.processors.cache.distributed.IgniteOptimisticT
 import 
org.apache.ignite.internal.processors.cache.distributed.IgniteOptimisticTxSuspendResumeTest;
 import 
org.apache.ignite.internal.processors.cache.distributed.IgnitePessimisticTxSuspendResumeTest;
 import 
org.apache.ignite.internal.processors.cache.persistence.IgnitePdsCacheAssignmentNodeRestartsTest;
+import 
org.apache.ignite.internal.processors.cache.transactions.TxOptimisticOnPartitionExchangeTest;
 import 
org.apache.ignite.internal.processors.cache.transactions.TxRollbackOnTimeoutNearCacheTest;
 import 
org.apache.ignite.internal.processors.cache.transactions.TxRollbackOnTimeoutNoDeadlockDetectionTest;
 import 
org.apache.ignite.internal.processors.cache.transactions.TxRollbackOnTimeoutTest;
@@ -93,6 +95,10 @@ public class IgniteCacheTestSuite6 extends TestSuite {
         
suite.addTestSuite(PartitionedTransactionalOptimisticCacheGetsDistributionTest.class);
         
suite.addTestSuite(PartitionedTransactionalPessimisticCacheGetsDistributionTest.class);
 
+        suite.addTestSuite(TxOptimisticOnPartitionExchangeTest.class);
+
+        
suite.addTestSuite(IgniteExchangeLatchManagerCoordinatorFailTest.class);
+
         return suite;
     }
 }

Reply via email to