Repository: ignite
Updated Branches:
  refs/heads/ignite-5578 95a41e885 -> 25520bf97


ignite-5578


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/25520bf9
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/25520bf9
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/25520bf9

Branch: refs/heads/ignite-5578
Commit: 25520bf97f03218529bb8ae39ba9510453567793
Parents: 95a41e8
Author: sboikov <[email protected]>
Authored: Fri Aug 4 10:43:55 2017 +0300
Committer: sboikov <[email protected]>
Committed: Fri Aug 4 11:39:30 2017 +0300

----------------------------------------------------------------------
 .../cache/CacheAffinitySharedManager.java       | 22 +++--
 .../processors/cache/GridCacheIoManager.java    | 18 ++++
 .../dht/GridClientPartitionTopology.java        |  6 +-
 .../dht/GridDhtPartitionTopologyImpl.java       | 99 +++++++++-----------
 .../GridDhtPartitionsExchangeFuture.java        | 10 +-
 .../datastreamer/DataStreamerImpl.java          |  9 +-
 .../distributed/CacheExchangeMergeTest.java     |  9 +-
 7 files changed, 99 insertions(+), 74 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/25520bf9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
index d02df5c..b99f054 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
@@ -186,9 +186,11 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
         }
 
         if (!CU.clientNode(node) && (type == EVT_NODE_FAILED || type == 
EVT_NODE_JOINED || type == EVT_NODE_LEFT)) {
-            assert lastAffVer == null || topVer.compareTo(lastAffVer) > 0;
+            synchronized (mux) {
+                assert lastAffVer == null || topVer.compareTo(lastAffVer) > 0;
 
-            lastAffVer = topVer;
+                lastAffVer = topVer;
+            }
         }
     }
 
@@ -266,7 +268,7 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
         CacheAffinityChangeMessage msg = null;
 
         synchronized (mux) {
-            if (waitInfo == null)
+            if (waitInfo == null || !waitInfo.topVer.equals(lastAffVer) )
                 return;
 
             Map<Integer, UUID> partWait = waitInfo.waitGrps.get(checkGrpId);
@@ -305,14 +307,14 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
                     }
                 }
             }
-        }
 
-        try {
-            if (msg != null)
-                cctx.discovery().sendCustomEvent(msg);
-        }
-        catch (IgniteCheckedException e) {
-            U.error(log, "Failed to send affinity change message.", e);
+            try {
+                if (msg != null)
+                    cctx.discovery().sendCustomEvent(msg);
+            }
+            catch (IgniteCheckedException e) {
+                U.error(log, "Failed to send affinity change message.", e);
+            }
         }
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/25520bf9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index 91872e8..6529795 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -480,6 +480,24 @@ public class GridCacheIoManager extends 
GridCacheSharedManagerAdapter {
     }
 
     /**
+     * @return Acquired read lock, or {@code null} if it cannot be taken immediately or node is stopping.
+     */
+    @Nullable public Lock readLock() {
+        Lock lock = rw.readLock();
+
+        if (!lock.tryLock())
+            return null;
+
+        if (stopping) {
+            lock.unlock();
+
+            return null;
+        }
+
+        return lock;
+    }
+
+    /**
      *
      */
     public void writeLock() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/25520bf9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
index 12f3fb3..c934df0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
@@ -313,7 +313,11 @@ public class GridClientPartitionTopology implements 
GridDhtPartitionTopology {
         GridDhtPartitionExchangeId exchId = exchFut.exchangeId();
 
         if (exchFut.context().events().hasServerLeft()) {
-            for (DiscoveryEvent evt : exchFut.context().events().events()) {
+            List<DiscoveryEvent> evts0 = exchFut.context().events().events();
+
+            for (int i = 0; i < evts0.size(); i++) {
+                DiscoveryEvent evt = evts0.get(i);
+
                 if (ExchangeDiscoveryEvents.serverLeftEvent(evt))
                     removeNode(evt.eventNode().id());
             }

http://git-wip-us.apache.org/repos/asf/ignite/blob/25520bf9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index 0f804d9..3ecb443a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -109,12 +109,12 @@ public class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
     /** */
     private volatile AffinityTopologyVersion diffFromAffinityVer = 
AffinityTopologyVersion.NONE;
 
-    /** */
-    private volatile AffinityTopologyVersion readyTopVer = 
AffinityTopologyVersion.NONE;
-
-    /** */
+    /** Last started exchange version (always >= readyTopVer). */
     private volatile AffinityTopologyVersion lastTopChangeVer = 
AffinityTopologyVersion.NONE;
 
+    /** Last finished exchange version. */
+    private volatile AffinityTopologyVersion readyTopVer = 
AffinityTopologyVersion.NONE;
+
     /** Discovery cache. */
     private volatile DiscoCache discoCache;
 
@@ -261,8 +261,8 @@ public class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
         return topVer;
     }
 
-    @Override
-    public AffinityTopologyVersion lastTopologyChangeVersion() {
+    /** {@inheritDoc} */
+    @Override public AffinityTopologyVersion lastTopologyChangeVersion() {
         AffinityTopologyVersion topVer = this.lastTopChangeVer;
 
         assert topVer.topologyVersion() > 0 : "Invalid topology version 
[topVer=" + topVer +
@@ -322,9 +322,9 @@ public class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
 
             assert grp.affinity().lastVersion().equals(affVer) :
                 "Invalid affinity [topVer=" + grp.affinity().lastVersion() +
-                    ", grp=" + grp.cacheOrGroupName() +
-                    ", affVer=" + affVer +
-                    ", fut=" + exchFut + ']';
+                ", grp=" + grp.cacheOrGroupName() +
+                ", affVer=" + affVer +
+                ", fut=" + exchFut + ']';
 
             int num = grp.affinity().partitions();
 
@@ -505,8 +505,12 @@ public class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
                     }
 
                     if (evts.hasServerLeft()) {
-                        for (DiscoveryEvent evt : evts.events()) {
-                            if (evts.serverLeftEvent(evt))
+                        List<DiscoveryEvent> evts0 = evts.events();
+
+                        for (int i = 0; i < evts0.size(); i++) {
+                            DiscoveryEvent evt = evts0.get(i);
+
+                            if (ExchangeDiscoveryEvents.serverLeftEvent(evt))
                                 removeNode(evt.eventNode().id());
                         }
                     }
@@ -581,7 +585,7 @@ public class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
             if (stopping)
                 return false;
 
-            assert readyTopVer.topologyVersion() > 0 : readyTopVer;
+            assert readyTopVer.initialized() : readyTopVer;
             assert lastTopChangeVer.equals(readyTopVer);
 
             if (log.isDebugEnabled())
@@ -659,12 +663,12 @@ public class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
                 }
             }
 
-            List<List<ClusterNode>> aff = 
grp.affinity().readyAssignments(topVer);
+            AffinityAssignment aff = grp.affinity().readyAffinity(topVer);
 
             if (node2part != null && node2part.valid())
-                changed |= checkEvictions(updateSeq, topVer, aff);
+                changed |= checkEvictions(updateSeq, aff);
 
-            updateRebalanceVersion(aff);
+            updateRebalanceVersion(aff.assignment());
 
             consistencyCheck();
         }
@@ -1156,7 +1160,7 @@ public class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
         lock.writeLock().lock();
 
         try {
-            if (stopping || lastTopChangeVer == null)
+            if (stopping || !lastTopChangeVer.initialized())
                 return false;
 
             if (incomeCntrMap != null) {
@@ -1185,13 +1189,6 @@ public class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
             if (exchangeVer != null) {
                 // Ignore if exchange already finished or new exchange started.
                 if (readyTopVer.compareTo(exchangeVer) > 0 || 
lastTopChangeVer.compareTo(exchangeVer) > 0) {
-                    if (log.isDebugEnabled()) {
-                        log.debug("Stale exchange id for full partition map 
update (will ignore) [" +
-                            "lastTopChange=" + lastTopChangeVer +
-                            ", readTopVer=" + readyTopVer +
-                            ", exchVer=" + exchangeVer + ']');
-                    }
-
                     U.warn(log, "Stale exchange id for full partition map 
update (will ignore) [" +
                         "lastTopChange=" + lastTopChangeVer +
                         ", readTopVer=" + readyTopVer +
@@ -1202,13 +1199,6 @@ public class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
             }
 
             if (msgTopVer != null && lastTopChangeVer.compareTo(msgTopVer) > 
0) {
-                if (log.isDebugEnabled()) {
-                    log.debug("Stale version for full partition map update 
message (will ignore) [" +
-                        "lastTopChange=" + lastTopChangeVer +
-                        ", readTopVer=" + readyTopVer +
-                        ", msgVer=" + msgTopVer + ']');
-                }
-
                 U.warn(log, "Stale version for full partition map update 
message (will ignore) [" +
                     "lastTopChange=" + lastTopChangeVer +
                     ", readTopVer=" + readyTopVer +
@@ -1382,12 +1372,12 @@ public class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
             long updateSeq = this.updateSeq.incrementAndGet();
 
             if (readyTopVer.initialized() && 
readyTopVer.equals(lastTopChangeVer)) {
-                List<List<ClusterNode>> aff = 
grp.affinity().readyAssignments(readyTopVer);
+                AffinityAssignment  aff = 
grp.affinity().readyAffinity(readyTopVer);
 
                 if (exchangeVer == null)
-                    changed |= checkEvictions(updateSeq, readyTopVer, aff);
+                    changed |= checkEvictions(updateSeq, aff);
 
-                updateRebalanceVersion(aff);
+                updateRebalanceVersion(aff.assignment());
             }
 
             consistencyCheck();
@@ -1490,14 +1480,7 @@ public class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
                 return false;
 
             if (!force) {
-                if (lastTopChangeVer != null && exchId != null && 
lastTopChangeVer.compareTo(exchId.topologyVersion()) > 0) {
-                    if (log.isDebugEnabled()) {
-                        log.debug("Stale exchange id for single partition map 
update (will ignore) [" +
-                            "lastTopChange=" + lastTopChangeVer +
-                            ", readTopVer=" + readyTopVer +
-                            ", exch=" + exchId.topologyVersion() + ']');
-                    }
-
+                if (lastTopChangeVer.initialized() && exchId != null && 
lastTopChangeVer.compareTo(exchId.topologyVersion()) > 0) {
                     U.warn(log, "Stale exchange id for single partition map 
update (will ignore) [" +
                         "lastTopChange=" + lastTopChangeVer +
                         ", readTopVer=" + readyTopVer +
@@ -1518,11 +1501,6 @@ public class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
                     parts.updateSequence(cur.updateSequence(), 
cur.topologyVersion());
             }
             else  if (isStaleUpdate(cur, parts)) {
-                if (log.isDebugEnabled()) {
-                    log.debug("Stale update for single partition map update 
(will ignore) [exchId=" + exchId +
-                        ", curMap=" + cur + ", newMap=" + parts + ']');
-                }
-
                 U.warn(log, "Stale update for single partition map update 
(will ignore) [exchId=" + exchId +
                     ", curMap=" + cur +
                     ", newMap=" + parts + ']');
@@ -1541,7 +1519,7 @@ public class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
 
             node2part.put(parts.nodeId(), parts);
 
-            // During exchange calculate diff after all messages are received 
and affinity initialized.
+            // During exchange diff is calculated after all messages are 
received and affinity initialized.
             if (exchId == null && !grp.isReplicated()) {
                 if (readyTopVer.initialized() && 
readyTopVer.compareTo(diffFromAffinityVer) >= 0) {
                     AffinityAssignment affAssignment = 
grp.affinity().readyAffinity(readyTopVer);
@@ -1589,12 +1567,12 @@ public class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
             }
 
             if (readyTopVer.initialized() && 
readyTopVer.equals(lastTopChangeVer)) {
-                List<List<ClusterNode>> aff = 
grp.affinity().assignments(readyTopVer);
+                AffinityAssignment aff = 
grp.affinity().readyAffinity(readyTopVer);
 
                 if (exchId == null)
-                    changed |= checkEvictions(updateSeq, readyTopVer, aff);
+                    changed |= checkEvictions(updateSeq, aff);
 
-                updateRebalanceVersion(aff);
+                updateRebalanceVersion(aff.assignment());
             }
 
             consistencyCheck();
@@ -1635,6 +1613,9 @@ public class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
         }
     }
 
+    /**
+     * @param aff Affinity.
+     */
     private void createMovingPartitions(AffinityAssignment aff) {
         for (Map.Entry<UUID, GridDhtPartitionMap> e : node2part.entrySet()) {
             GridDhtPartitionMap map = e.getValue();
@@ -1644,6 +1625,10 @@ public class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
         }
     }
 
+    /**
+     * @param map Node partition state map.
+     * @param parts Partitions assigned to node.
+     */
     private void addMoving(GridDhtPartitionMap map, Set<Integer> parts) {
         if (F.isEmpty(parts))
             return;
@@ -1813,7 +1798,7 @@ public class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
                 }
             }
 
-            checkEvictions(updSeq, resTopVer, 
grp.affinity().readyAssignments(resTopVer));
+            checkEvictions(updSeq, grp.affinity().readyAffinity(resTopVer));
 
             grp.needsRecovery(false);
         }
@@ -1914,7 +1899,7 @@ public class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
      * @param aff Affinity assignments.
      * @return Checks if any of the local partitions need to be evicted.
      */
-    private boolean checkEvictions(long updateSeq, AffinityTopologyVersion 
affVer, List<List<ClusterNode>> aff) {
+    private boolean checkEvictions(long updateSeq, AffinityAssignment aff) {
         boolean changed = false;
 
         UUID locId = ctx.localNodeId();
@@ -1931,7 +1916,7 @@ public class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
                 List<ClusterNode> affNodes = aff.get(p);
 
                 if (!affNodes.contains(ctx.localNode())) {
-                    List<ClusterNode> nodes = nodes(p, affVer, OWNING, null);
+                    List<ClusterNode> nodes = nodes(p, aff.topologyVersion(), 
OWNING, null);
                     Collection<UUID> nodeIds = F.nodeIds(nodes);
 
                     // If all affinity nodes are owners, then evict partition 
from local node.
@@ -1940,7 +1925,7 @@ public class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
 
                         part.rent(false);
 
-                        updateSeq = updateLocal(part.id(), part.state(), 
updateSeq, affVer);
+                        updateSeq = updateLocal(part.id(), part.state(), 
updateSeq, aff.topologyVersion());
 
                         changed = true;
 
@@ -1965,7 +1950,10 @@ public class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
 
                                     part.rent(false);
 
-                                    updateSeq = updateLocal(part.id(), 
part.state(), updateSeq, affVer);
+                                    updateSeq = updateLocal(part.id(),
+                                        part.state(),
+                                        updateSeq,
+                                        aff.topologyVersion());
 
                                     changed = true;
 
@@ -1991,6 +1979,7 @@ public class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
      * @param p Partition.
      * @param state State.
      * @param updateSeq Update sequence.
+     * @param affVer Affinity version.
      * @return Update sequence.
      */
     @SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"})

http://git-wip-us.apache.org/repos/asf/ignite/blob/25520bf9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 546b17b..67e41b3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -34,6 +34,7 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
@@ -2999,14 +3000,19 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
 
                                         newCrdFut.listen(new 
CI1<IgniteInternalFuture>() {
                                             @Override public void 
apply(IgniteInternalFuture fut) {
-                                                if (isDone() || !enterBusy())
+                                                if (isDone())
+                                                    return;
+
+                                                Lock lock = 
cctx.io().readLock();
+
+                                                if (lock == null)
                                                     return;
 
                                                 try {
                                                     
onBecomeCoordinator((InitNewCoordinatorFuture) fut);
                                                 }
                                                 finally {
-                                                    leaveBusy();
+                                                    lock.unlock();
                                                 }
                                             }
                                         });

http://git-wip-us.apache.org/repos/asf/ignite/blob/25520bf9/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
index afaa81d..e2ee0b1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
@@ -768,9 +768,12 @@ public class DataStreamerImpl<K, V> implements 
IgniteDataStreamer<K, V>, Delayed
             }
 
             try {
-                AffinityTopologyVersion topVer = allowOverwrite() || 
cctx.isLocal() ?
-                        
ctx.cache().context().exchange().readyAffinityVersion() :
-                        cctx.topology().readyTopologyVersion();
+                AffinityTopologyVersion topVer;
+
+                if (!cctx.isLocal())
+                    topVer = 
ctx.cache().context().exchange().lastTopologyFuture().get();
+                else
+                    topVer = 
ctx.cache().context().exchange().readyAffinityVersion();
 
                 for (DataStreamerEntry entry : entries) {
                     List<ClusterNode> nodes;

http://git-wip-us.apache.org/repos/asf/ignite/blob/25520bf9/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java
index d9d7e08..3149385 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java
@@ -1315,7 +1315,7 @@ public class CacheExchangeMergeTest extends 
GridCommonAbstractTest {
                     if 
(cache.getConfiguration(CacheConfiguration.class).getAtomicityMode() == 
TRANSACTIONAL) {
                         for (TransactionConcurrency concurrency : 
TransactionConcurrency.values()) {
                             for (TransactionIsolation isolation : 
TransactionIsolation.values())
-                                checkNodeCaches(node, cache, concurrency, 
isolation);
+                                checkNodeCaches(err, node, cache, concurrency, 
isolation);
                         }
                     }
                 }
@@ -1327,12 +1327,15 @@ public class CacheExchangeMergeTest extends 
GridCommonAbstractTest {
     }
 
     /**
+     * @param err Error message.
      * @param node Node.
      * @param cache Cache.
      * @param concurrency Transaction concurrency.
      * @param isolation Transaction isolation.
      */
-    private void checkNodeCaches(Ignite node,
+    private void checkNodeCaches(
+        String err,
+        Ignite node,
         IgniteCache<Object, Object> cache,
         TransactionConcurrency concurrency,
         TransactionIsolation isolation) {
@@ -1362,7 +1365,7 @@ public class CacheExchangeMergeTest extends 
GridCommonAbstractTest {
         }
 
         for (Map.Entry<Object, Object> e : map.entrySet())
-            assertEquals(e.getValue(), cache.get(e.getKey()));
+            assertEquals(err, e.getValue(), cache.get(e.getKey()));
     }
 
     /**

Reply via email to