Repository: ignite
Updated Branches:
  refs/heads/ignite-5578 ef038c4cd -> c7a606905


ignite-5578


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c7a60690
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c7a60690
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c7a60690

Branch: refs/heads/ignite-5578
Commit: c7a606905c7fd1ab40240f0cdc87947432759fdd
Parents: ef038c4
Author: sboikov <[email protected]>
Authored: Tue Jul 18 17:58:31 2017 +0300
Committer: sboikov <[email protected]>
Committed: Tue Jul 18 18:56:05 2017 +0300

----------------------------------------------------------------------
 .../internal/managers/discovery/DiscoCache.java | 13 ++++
 .../discovery/GridDiscoveryManager.java         | 21 +++++--
 .../cache/CacheAffinitySharedManager.java       | 63 +++++---------------
 .../dht/GridDhtPartitionTopologyImpl.java       |  3 +
 .../preloader/CacheGroupAffinityMessage.java    |  9 ++-
 .../distributed/CacheExchangeMergeTest.java     | 16 +++--
 6 files changed, 67 insertions(+), 58 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/c7a60690/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java
index f63c5f6..ac2d0b4 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java
@@ -25,6 +25,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cluster.DiscoveryDataClusterState;
 import org.apache.ignite.internal.util.GridConcurrentHashSet;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
@@ -85,6 +86,9 @@ public class DiscoCache {
     /** */
     private final IgniteProductVersion minNodeVer;
 
+    /** Topology version this discovery cache was created for. */
+    private final AffinityTopologyVersion topVer;
+
     /**
      * @param state Current cluster state.
      * @param loc Local node.
@@ -101,6 +105,7 @@ public class DiscoCache {
      * @param alives Alive nodes.
      */
     DiscoCache(
+        AffinityTopologyVersion topVer,
         DiscoveryDataClusterState state,
         ClusterNode loc,
         List<ClusterNode> rmtNodes,
@@ -114,6 +119,7 @@ public class DiscoCache {
         Map<Integer, List<ClusterNode>> cacheGrpAffNodes,
         Map<UUID, ClusterNode> nodeMap,
         Set<UUID> alives) {
+        this.topVer = topVer;
         this.state = state;
         this.loc = loc;
         this.rmtNodes = rmtNodes;
@@ -143,6 +149,13 @@ public class DiscoCache {
     }
 
     /**
+     * @return Topology version this discovery cache was created for.
+     */
+    public AffinityTopologyVersion version() {
+        return topVer;
+    }
+
+    /**
      * @return Minimum node version.
      */
     public IgniteProductVersion minimumNodeVersion() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/c7a60690/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index 9f5bd3f..adf4431 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -597,7 +597,10 @@ public class GridDiscoveryManager extends 
GridManagerAdapter<DiscoverySpi> {
                 ChangeGlobalStateFinishMessage stateFinishMsg = null;
 
                 if (locJoinEvt) {
-                    discoCache = createDiscoCache(ctx.state().clusterState(), 
locNode, topSnapshot);
+                    discoCache = createDiscoCache(new 
AffinityTopologyVersion(topVer, minorTopVer),
+                        ctx.state().clusterState(),
+                        locNode,
+                        topSnapshot);
 
                     transitionWaitFut = ctx.state().onLocalJoin(discoCache);
                 }
@@ -620,7 +623,10 @@ public class GridDiscoveryManager extends 
GridManagerAdapter<DiscoverySpi> {
                     else if (customMsg instanceof 
ChangeGlobalStateFinishMessage) {
                         
ctx.state().onStateFinishMessage((ChangeGlobalStateFinishMessage)customMsg);
 
-                        discoCache = 
createDiscoCache(ctx.state().clusterState(), locNode, topSnapshot);
+                        discoCache = createDiscoCache(topSnap.get().topVer,
+                            ctx.state().clusterState(),
+                            locNode,
+                            topSnapshot);
 
                         topSnap.set(new Snapshot(topSnap.get().topVer, 
discoCache));
 
@@ -668,7 +674,7 @@ public class GridDiscoveryManager extends 
GridManagerAdapter<DiscoverySpi> {
                 // event notifications, since SPI notifies manager about all 
events from this listener.
                 if (verChanged) {
                     if (discoCache == null)
-                        discoCache = 
createDiscoCache(ctx.state().clusterState(), locNode, topSnapshot);
+                        discoCache = createDiscoCache(nextTopVer, 
ctx.state().clusterState(), locNode, topSnapshot);
 
                     discoCacheHist.put(nextTopVer, discoCache);
 
@@ -739,7 +745,7 @@ public class GridDiscoveryManager extends 
GridManagerAdapter<DiscoverySpi> {
                     topHist.clear();
 
                     topSnap.set(new Snapshot(AffinityTopologyVersion.ZERO,
-                        createDiscoCache(ctx.state().clusterState(), locNode, 
Collections.<ClusterNode>emptySet())));
+                        createDiscoCache(AffinityTopologyVersion.ZERO, 
ctx.state().clusterState(), locNode, Collections.<ClusterNode>emptySet())));
                 }
                 else if (type == EVT_CLIENT_NODE_RECONNECTED) {
                     assert locNode.isClient() : locNode;
@@ -2149,7 +2155,9 @@ public class GridDiscoveryManager extends 
GridManagerAdapter<DiscoverySpi> {
      * @param topSnapshot Topology snapshot.
      * @return Newly created discovery cache.
      */
-    @NotNull private DiscoCache createDiscoCache(DiscoveryDataClusterState 
state,
+    @NotNull private DiscoCache createDiscoCache(
+        AffinityTopologyVersion topVer,
+        DiscoveryDataClusterState state,
         ClusterNode loc,
         Collection<ClusterNode> topSnapshot) {
         HashSet<UUID> alives = U.newHashSet(topSnapshot.size());
@@ -2226,6 +2234,7 @@ public class GridDiscoveryManager extends 
GridManagerAdapter<DiscoverySpi> {
         }
 
         return new DiscoCache(
+            topVer,
             state,
             loc,
             Collections.unmodifiableList(rmtNodes),
@@ -2368,7 +2377,7 @@ public class GridDiscoveryManager extends 
GridManagerAdapter<DiscoverySpi> {
                         discoWrk.addEvent(EVT_NODE_SEGMENTED,
                             AffinityTopologyVersion.NONE,
                             node,
-                            createDiscoCache(null, node, empty),
+                            createDiscoCache(AffinityTopologyVersion.NONE, 
null, node, empty),
                             empty,
                             null);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/c7a60690/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
index df7bf03..ba6a22b 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
@@ -89,14 +89,14 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
         
IgniteSystemProperties.getLong(IgniteSystemProperties.IGNITE_CLIENT_CACHE_CHANGE_MESSAGE_TIMEOUT,
 10_000);
 
     /** */
-    static final IgniteClosure<ClusterNode, UUID> NODE_TO_ID = new 
IgniteClosure<ClusterNode, UUID>() {
+    private static final IgniteClosure<ClusterNode, UUID> NODE_TO_ID = new 
IgniteClosure<ClusterNode, UUID>() {
         @Override public UUID apply(ClusterNode node) {
             return node.id();
         }
     };
 
     /** */
-    static final IgniteClosure<ClusterNode, Long> NODE_TO_ORDER = new 
IgniteClosure<ClusterNode, Long>() {
+    private static final IgniteClosure<ClusterNode, Long> NODE_TO_ORDER = new 
IgniteClosure<ClusterNode, Long>() {
         @Override public Long apply(ClusterNode node) {
             return node.order();
         }
@@ -105,9 +105,6 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
     /** Affinity information for all started caches (initialized on 
coordinator). */
     private ConcurrentMap<Integer, CacheGroupHolder> grpHolders = new 
ConcurrentHashMap<>();
 
-    /** Last topology version when affinity was calculated (updated from 
exchange thread). */
-    private AffinityTopologyVersion affCalcVer;
-
     /** Topology version which requires affinity re-calculation (set from 
discovery thread). */
     private AffinityTopologyVersion lastAffVer;
 
@@ -172,8 +169,6 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
             // Clean-up in case of client reconnect.
             caches.clear();
 
-            affCalcVer = null;
-
             lastAffVer = null;
 
             caches.init(cctx.cache().cacheGroupDescriptors(), 
cctx.cache().cacheDescriptors());
@@ -272,10 +267,6 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
             if (waitInfo == null)
                 return;
 
-            assert affCalcVer != null;
-            assert affCalcVer.equals(waitInfo.topVer) : "Invalid affinity 
version [calcVer=" + affCalcVer +
-                ", waitVer=" + waitInfo.topVer + ']';
-
             Map<Integer, UUID> partWait = waitInfo.waitGrps.get(checkGrpId);
 
             boolean rebalanced = true;
@@ -841,7 +832,7 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
         }
 
         if (stoppedGrps != null) {
-            boolean notify = false;
+            AffinityTopologyVersion notifyTopVer = null;
 
             synchronized (mux) {
                 if (waitInfo != null) {
@@ -849,7 +840,7 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
                         boolean rmv = waitInfo.waitGrps.remove(grpId) != null;
 
                         if (rmv) {
-                            notify = true;
+                            notifyTopVer = waitInfo.topVer;
 
                             waitInfo.assignments.remove(grpId);
                         }
@@ -857,8 +848,8 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
                 }
             }
 
-            if (notify) {
-                final AffinityTopologyVersion topVer = affCalcVer;
+            if (notifyTopVer != null) {
+                final AffinityTopologyVersion topVer = notifyTopVer;
 
                 cctx.kernalContext().closure().runLocalSafe(new Runnable() {
                     @Override public void run() {
@@ -948,16 +939,12 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
         boolean crd,
         final CacheAffinityChangeMessage msg)
         throws IgniteCheckedException {
-        assert affCalcVer != null || cctx.kernalContext().clientNode();
         assert msg.topologyVersion() != null && msg.exchangeId() == null : msg;
-        assert affCalcVer == null || affCalcVer.equals(msg.topologyVersion()) :
-            "Invalid version [affCalcVer=" + affCalcVer + ", msg=" + msg + ']';
 
         final AffinityTopologyVersion topVer = exchFut.topologyVersion();
 
         if (log.isDebugEnabled()) {
             log.debug("Process affinity change message [exchVer=" + 
exchFut.topologyVersion() +
-                ", affCalcVer=" + affCalcVer +
                 ", msgVer=" + msg.topologyVersion() + ']');
         }
 
@@ -1019,11 +1006,6 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
                     aff.clientEventTopologyChange(exchFut.discoveryEvent(), 
topVer);
             }
         });
-
-        synchronized (mux) {
-            if (affCalcVer == null)
-                affCalcVer = msg.topologyVersion();
-        }
     }
 
     /**
@@ -1267,6 +1249,8 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
 
         final Map<Object, List<List<ClusterNode>>> affCache = new HashMap<>();
 
+        log.info("mergeExchangesOnServerLeft [topVer=" + 
fut.context().events().discoveryCache().version() + ']');
+
         forAllCacheGroups(false, new 
IgniteInClosureX<GridAffinityAssignmentCache>() {
             @Override public void applyx(GridAffinityAssignmentCache aff) 
throws IgniteCheckedException {
                 ExchangeDiscoveryEvents evts = fut.context().events();
@@ -1301,10 +1285,6 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
                 aff.initialize(evts.topologyVersion(), cachedAssignment(aff, 
newAssignment, affCache));
             }
         });
-
-        synchronized (mux) {
-            affCalcVer = fut.context().events().waitRebalanceEventVersion();
-        }
     }
 
     public void onJoin(final GridDhtPartitionsExchangeFuture fut, 
GridDhtPartitionsFullMessage msg)
@@ -1345,18 +1325,14 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
                 }
             }
         });
-
-        if (!cctx.kernalContext().clientNode()) {
-            synchronized (mux) {
-                affCalcVer = fut.context().events().topologyVersion();
-            }
-        }
     }
 
     public void mergeExchangesOnServerJoin(GridDhtPartitionsExchangeFuture 
fut, boolean crd)
         throws IgniteCheckedException {
         final ExchangeDiscoveryEvents evts = fut.context().events();
 
+        log.info("mergeExchangesInitAffinityOnServerLeft [topVer=" + 
evts.discoveryCache().version() + ']');
+
         assert evts.serverJoin() && !evts.serverLeft();
 
         WaitRebalanceInfo waitRebalanceInfo = initAffinityOnNodeJoin(evts, 
crd);
@@ -1371,6 +1347,8 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
 
         assert evts.serverLeft();
 
+        log.info("mergeExchangesInitAffinityOnServerLeft [topVer=" + 
evts.discoveryCache().version() + ']');
+
         forAllRegisteredCacheGroups(new 
IgniteInClosureX<CacheGroupDescriptor>() {
             @Override public void applyx(CacheGroupDescriptor desc) throws 
IgniteCheckedException {
                 AffinityTopologyVersion topVer = evts.topologyVersion();
@@ -1381,10 +1359,6 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
             }
         });
 
-        synchronized (mux) {
-            affCalcVer = evts.waitRebalanceEventVersion();
-        }
-
         Map<Integer, Map<Integer, List<Long>>> diff = 
initAffinityOnNodeLeft0(evts.topologyVersion(),
             fut,
             NODE_TO_ORDER,
@@ -1433,8 +1407,6 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
     }
 
     private void setWaitRebalanceInfo(WaitRebalanceInfo waitRebalanceInfo, 
AffinityTopologyVersion topVer, boolean crd) {
-        affCalcVer = topVer;
-
         this.waitInfo = waitRebalanceInfo != null && 
!waitRebalanceInfo.empty() ? waitRebalanceInfo : null;
 
         WaitRebalanceInfo info = this.waitInfo;
@@ -1606,8 +1578,6 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
         }
 
         synchronized (mux) {
-            affCalcVer = fut.topologyVersion();
-
             this.waitInfo = null;
         }
 
@@ -1992,7 +1962,7 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
         throws IgniteCheckedException {
         final WaitRebalanceInfo waitRebalanceInfo = new 
WaitRebalanceInfo(fut.context().events().waitRebalanceEventVersion());
 
-        final Collection<ClusterNode> aliveNodes = 
cctx.discovery().nodes(topVer);
+        final Collection<ClusterNode> aliveNodes = 
fut.context().events().discoveryCache().serverNodes();
 
         final Map<Integer, Map<Integer, List<T>>> assignment = new HashMap<>();
 
@@ -2013,7 +1983,7 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
 
                 assert newAssignment != null;
 
-                List<List<ClusterNode>> newAssignment0 =  initAff ? new 
ArrayList<>(newAssignment) : null;
+                List<List<ClusterNode>> newAssignment0 = initAff ? new 
ArrayList<>(newAssignment) : null;
 
                 GridDhtPartitionTopology top = grpHolder.topology(fut);
 
@@ -2052,7 +2022,8 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
                                 for (int i = 1; i < curNodes.size(); i++) {
                                     ClusterNode curNode = curNodes.get(i);
 
-                                    if (top.partitionState(curNode.id(), p) == 
GridDhtPartitionState.OWNING) {
+                                    if (top.partitionState(curNode.id(), p) == 
GridDhtPartitionState.OWNING &&
+                                        aliveNodes.contains(curNode)) {
                                         newNodes0 = 
latePrimaryAssignment(grpHolder.affinity(),
                                             p,
                                             curNode,
@@ -2107,8 +2078,6 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
         });
 
         synchronized (mux) {
-            assert affCalcVer.equals(topVer);
-
             this.waitInfo = !waitRebalanceInfo.empty() ? waitRebalanceInfo : 
null;
 
             WaitRebalanceInfo info = this.waitInfo;

http://git-wip-us.apache.org/repos/asf/ignite/blob/c7a60690/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index 74156d9..52df4a6 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -1118,6 +1118,9 @@ public class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
             if (stopping)
                 return false;
 
+            if (exchangeVer == null && !topReadyFut.isDone())
+                return false;
+
             if (exchangeVer != null) {
                 assert exchangeVer.compareTo(topVer) >= 0 : exchangeVer;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/c7a60690/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CacheGroupAffinityMessage.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CacheGroupAffinityMessage.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CacheGroupAffinityMessage.java
index 69a507c..fcfec1d 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CacheGroupAffinityMessage.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CacheGroupAffinityMessage.java
@@ -135,6 +135,12 @@ public class CacheGroupAffinityMessage implements Message {
         return cachesAff;
     }
 
+    /**
+     * @param assign Nodes orders.
+     * @param nodesByOrder Nodes by order cache.
+     * @param discoCache Discovery data cache.
+     * @return Nodes list.
+     */
     public static List<ClusterNode> toNodes(GridLongList assign, Map<Long, 
ClusterNode> nodesByOrder, DiscoCache discoCache) {
         List<ClusterNode> assign0 = new ArrayList<>(assign.size());
 
@@ -146,7 +152,8 @@ public class CacheGroupAffinityMessage implements Message {
             if (affNode == null) {
                 affNode = discoCache.serverNodeByOrder(order);
 
-                assert affNode != null : order;
+                assert affNode != null : "Failed to find node by order 
[order=" + order +
+                    ", topVer=" + discoCache.version() + ']';
 
                 nodesByOrder.put(order, affNode);
             }

http://git-wip-us.apache.org/repos/asf/ignite/blob/c7a60690/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java
index 5e0d7e2..376224a 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java
@@ -67,8 +67,14 @@ public class CacheExchangeMergeTest extends 
GridCommonAbstractTest {
         }
 
         cfg.setCacheConfiguration(
-            cacheConfiguration("c1", ATOMIC),
-            cacheConfiguration("c2", TRANSACTIONAL));
+            cacheConfiguration("c1", ATOMIC, 0),
+            cacheConfiguration("c2", ATOMIC, 1),
+            cacheConfiguration("c3", ATOMIC, 2),
+            cacheConfiguration("c4", ATOMIC, 10),
+            cacheConfiguration("c5", TRANSACTIONAL, 0),
+            cacheConfiguration("c6", TRANSACTIONAL, 1),
+            cacheConfiguration("c7", TRANSACTIONAL, 2),
+            cacheConfiguration("c8", TRANSACTIONAL, 10));
 
         return cfg;
     }
@@ -83,13 +89,15 @@ public class CacheExchangeMergeTest extends 
GridCommonAbstractTest {
     /**
      * @param name Cache name.
      * @param atomicityMode Cache atomicity mode.
+     * @param backups Number of backups.
      * @return Cache configuration.
      */
-    private CacheConfiguration cacheConfiguration(String name, 
CacheAtomicityMode atomicityMode) {
+    private CacheConfiguration cacheConfiguration(String name, 
CacheAtomicityMode atomicityMode, int backups) {
         CacheConfiguration ccfg = new CacheConfiguration(name);
 
         ccfg.setAtomicityMode(atomicityMode);
         ccfg.setWriteSynchronizationMode(FULL_SYNC);
+        ccfg.setBackups(backups);
 
         return ccfg;
     }
@@ -246,7 +254,7 @@ public class CacheExchangeMergeTest extends 
GridCommonAbstractTest {
 
         assertTrue(nodes.size() > 0);
 
-        String[] cacheNames = {"c1", "c2"};
+        String[] cacheNames = {"c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8"};
 
         ThreadLocalRandom rnd = ThreadLocalRandom.current();
 

Reply via email to