This is an automated email from the ASF dual-hosted git repository. agoncharuk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push: new 51accac IGNITE-10876 Parallel execution of affinity changes on coordinator - Fixes #5942. 51accac is described below commit 51accac9f748689be72bfbce390a19fb8a6896f8 Author: Pavel Voronkin <pvoron...@gridgain.com> AuthorDate: Mon Feb 4 13:48:16 2019 +0300 IGNITE-10876 Parallel execution of affinity changes on coordinator - Fixes #5942. Signed-off-by: Alexey Goncharuk <alexey.goncha...@gmail.com> --- .../cache/GridCachePartitionExchangeManager.java | 17 -- .../dht/preloader/CacheGroupAffinityMessage.java | 25 ++- .../preloader/GridDhtPartitionsExchangeFuture.java | 193 +++++++++++++-------- .../dht/topology/GridDhtPartitionTopologyImpl.java | 139 +++++++-------- .../cache/distributed/CacheParallelStartTest.java | 54 +++--- 5 files changed, 232 insertions(+), 196 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index 525c41a..71a704c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -200,9 +200,6 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana /** */ private GridFutureAdapter<?> reconnectExchangeFut; - /** */ - private final Object interruptLock = new Object(); - /** * Partition map futures. * This set also contains already completed exchange futures to address race conditions when coordinator @@ -818,13 +815,6 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana } /** - * @return Interrupt lock. - */ - public Object interruptLock() { - return interruptLock; - } - - /** * @param grpId Cache group ID. * @return Topology. */ @@ -2639,13 +2629,6 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana } } - /** {@inheritDoc} */ - @Override public void cancel() { - synchronized (interruptLock) { - super.cancel(); - } - } - /** * Add custom exchange task. * diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CacheGroupAffinityMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CacheGroupAffinityMessage.java index 695eadc..ef56834 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CacheGroupAffinityMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CacheGroupAffinityMessage.java @@ -22,6 +22,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.Map; +import java.util.function.Function; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.GridDirectCollection; import org.apache.ignite.internal.GridDirectMap; @@ -135,25 +136,23 @@ public class CacheGroupAffinityMessage implements Message { } /** + * Fill Map of CacheGroupAffinityMessages. + * * @param cctx Context. * @param topVer Topology version. * @param affReq Cache group IDs. * @param cachesAff Optional already prepared affinity. - * @return Affinity. */ - static Map<Integer, CacheGroupAffinityMessage> createAffinityMessages( + static void createAffinityMessages( GridCacheSharedContext cctx, AffinityTopologyVersion topVer, Collection<Integer> affReq, - @Nullable Map<Integer, CacheGroupAffinityMessage> cachesAff + Map<Integer, CacheGroupAffinityMessage> cachesAff ) { assert !F.isEmpty(affReq) : affReq; - if (cachesAff == null) - cachesAff = U.newHashMap(affReq.size()); - for (Integer grpId : affReq) { - if (!cachesAff.containsKey(grpId)) { + cachesAff.computeIfAbsent(grpId, (integer) -> { GridAffinityAssignmentCache aff = cctx.affinity().groupAffinity(grpId); // If no coordinator group holder on the node, try fetch affinity from existing cache group. @@ -169,15 +168,13 @@ public class CacheGroupAffinityMessage implements Message { List<List<ClusterNode>> assign = aff.readyAssignments(topVer); - CacheGroupAffinityMessage msg = new CacheGroupAffinityMessage(assign, + return new CacheGroupAffinityMessage( + assign, aff.centralizedAffinityFunction() ? aff.idealAssignment() : null, - null); - - cachesAff.put(grpId, msg); - } + null + ); + }); } - - return cachesAff; } /** diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index 5cfa56e..f4043ab 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -36,6 +36,7 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; @@ -3108,25 +3109,31 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte * @param resTopVer Result topology version. */ private void detectLostPartitions(AffinityTopologyVersion resTopVer) { - boolean detected = false; + AtomicInteger detected = new AtomicInteger(); - synchronized (cctx.exchange().interruptLock()) { - if (Thread.currentThread().isInterrupted()) - return; - - for (CacheGroupContext grp : cctx.cache().cacheGroups()) { - if (!grp.isLocal()) { - // Do not trigger lost partition events on start. - boolean event = !localJoinExchange() && !activateCluster(); + try { + // Reserve at least 2 threads for system operations. + U.doInParallel( + U.availableThreadCount(cctx.kernalContext(), GridIoPolicy.SYSTEM_POOL, 2), + cctx.kernalContext().getSystemExecutorService(), + cctx.cache().cacheGroups(), + grp -> { + if (!grp.isLocal()) { + // Do not trigger lost partition events on start. + boolean evt = !localJoinExchange() && !activateCluster(); - boolean detectedOnGrp = grp.topology().detectLostPartitions(resTopVer, event ? events().lastEvent() : null); + if (grp.topology().detectLostPartitions(resTopVer, evt ? events().lastEvent() : null)) + detected.incrementAndGet(); + } - detected |= detectedOnGrp; - } - } + return null; + }); + } + catch (IgniteCheckedException e) { + throw new IgniteException(e); } - if (detected) { + if (detected.get() > 0) { if (log.isDebugEnabled()) log.debug("Partitions have been scheduled to resend [reason=" + "Lost partitions detect on " + resTopVer + "]"); @@ -3143,22 +3150,29 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte private void resetLostPartitions(Collection<String> cacheNames) { assert !exchCtx.mergeExchanges(); - synchronized (cctx.exchange().interruptLock()) { - if (Thread.currentThread().isInterrupted()) - return; - - for (CacheGroupContext grp : cctx.cache().cacheGroups()) { - if (grp.isLocal()) - continue; + try { + // Reserve at least 2 threads for system operations. + U.doInParallel( + U.availableThreadCount(cctx.kernalContext(), GridIoPolicy.SYSTEM_POOL, 2), + cctx.kernalContext().getSystemExecutorService(), + cctx.cache().cacheGroups(), + grp -> { + if (grp.isLocal()) + return null; - for (String cacheName : cacheNames) { - if (grp.hasCache(cacheName)) { - grp.topology().resetLostPartitions(initialVersion()); + for (String cacheName : cacheNames) { + if (grp.hasCache(cacheName)) { + grp.topology().resetLostPartitions(initialVersion()); - break; + break; + } } - } - } + + return null; + }); + } + catch (IgniteCheckedException e) { + throw new IgniteException(e); } } @@ -3295,6 +3309,9 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte Map<Integer, CacheGroupAffinityMessage> idealAffDiff = null; + // Reserve at least 2 threads for system operations. + int parallelismLvl = U.availableThreadCount(cctx.kernalContext(), GridIoPolicy.SYSTEM_POOL, 2); + if (exchCtx.mergeExchanges()) { synchronized (mux) { if (mergedJoinExchMsgs != null) { @@ -3315,51 +3332,65 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte else cctx.affinity().onServerJoinWithExchangeMergeProtocol(this, true); - for (CacheGroupDescriptor desc : cctx.affinity().cacheGroups().values()) { - if (desc.config().getCacheMode() == CacheMode.LOCAL) - continue; - CacheGroupContext grp = cctx.cache().cacheGroup(desc.groupId()); + U.doInParallel( + parallelismLvl, + cctx.kernalContext().getSystemExecutorService(), + cctx.affinity().cacheGroups().values(), + desc -> { + if (desc.config().getCacheMode() == CacheMode.LOCAL) + return null; - GridDhtPartitionTopology top = grp != null ? grp.topology() : - cctx.exchange().clientTopology(desc.groupId(), events().discoveryCache()); + CacheGroupContext grp = cctx.cache().cacheGroup(desc.groupId()); - top.beforeExchange(this, true, true); - } + GridDhtPartitionTopology top = grp != null ? grp.topology() : + cctx.exchange().clientTopology(desc.groupId(), events().discoveryCache()); + + top.beforeExchange(this, true, true); + + return null; + }); } timeBag.finishGlobalStage("Affinity recalculation (crd)"); - Map<Integer, CacheGroupAffinityMessage> joinedNodeAff = null; + Map<Integer, CacheGroupAffinityMessage> joinedNodeAff = new ConcurrentHashMap<>(cctx.cache().cacheGroups().size()); - for (Map.Entry<UUID, GridDhtPartitionsSingleMessage> e : msgs.entrySet()) { - GridDhtPartitionsSingleMessage msg = e.getValue(); + U.doInParallel( + parallelismLvl, + cctx.kernalContext().getSystemExecutorService(), + msgs.entrySet(), + entry -> { + GridDhtPartitionsSingleMessage msg = entry.getValue(); - // Apply update counters after all single messages are received. - for (Map.Entry<Integer, GridDhtPartitionMap> entry : msg.partitions().entrySet()) { - Integer grpId = entry.getKey(); + for (Map.Entry<Integer, GridDhtPartitionMap> e : msg.partitions().entrySet()) { + Integer grpId = e.getKey(); - CacheGroupContext grp = cctx.cache().cacheGroup(grpId); + CacheGroupContext grp = cctx.cache().cacheGroup(grpId); - GridDhtPartitionTopology top = grp != null ? grp.topology() : - cctx.exchange().clientTopology(grpId, events().discoveryCache()); + GridDhtPartitionTopology top = grp != null + ? grp.topology() + : cctx.exchange().clientTopology(grpId, events().discoveryCache()); - CachePartitionPartialCountersMap cntrs = msg.partitionUpdateCounters(grpId, - top.partitions()); + CachePartitionPartialCountersMap cntrs = msg.partitionUpdateCounters(grpId, top.partitions()); - if (cntrs != null) - top.collectUpdateCounters(cntrs); - } + if (cntrs != null) + top.collectUpdateCounters(cntrs); + } - Collection<Integer> affReq = msg.cacheGroupsAffinityRequest(); + Collection<Integer> affReq = msg.cacheGroupsAffinityRequest(); - if (affReq != null) { - joinedNodeAff = CacheGroupAffinityMessage.createAffinityMessages(cctx, - resTopVer, - affReq, - joinedNodeAff); + if (affReq != null) + CacheGroupAffinityMessage.createAffinityMessages( + cctx, + resTopVer, + affReq, + joinedNodeAff + ); + + return null; } - } + ); timeBag.finishGlobalStage("Collect update counters and create affinity messages"); @@ -3708,14 +3739,17 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte Collection<Integer> affReq = msg.cacheGroupsAffinityRequest(); + if (affReq != null) { - Map<Integer, CacheGroupAffinityMessage> aff = CacheGroupAffinityMessage.createAffinityMessages( + Map<Integer, CacheGroupAffinityMessage> cachesAff = U.newHashMap(affReq.size()); + + CacheGroupAffinityMessage.createAffinityMessages( cctx, finishState.resTopVer, affReq, - null); + cachesAff); - fullMsg.joinedNodeAffinity(aff); + fullMsg.joinedNodeAffinity(cachesAff); } if (!fullMsg.exchangeId().equals(msg.exchangeId())) { @@ -4572,21 +4606,38 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte Map<ClusterNode, GridDhtPartitionsSingleMessage> msgs = newCrdFut.messages(); if (!F.isEmpty(msgs)) { - Map<Integer, CacheGroupAffinityMessage> joinedNodeAff = null; + Map<Integer, CacheGroupAffinityMessage> joinedNodeAff = new ConcurrentHashMap<>(); - for (Map.Entry<ClusterNode, GridDhtPartitionsSingleMessage> e : msgs.entrySet()) { - this.msgs.put(e.getKey().id(), e.getValue()); + // Reserve at least 2 threads for system operations. + int parallelismLvl = U.availableThreadCount(cctx.kernalContext(), GridIoPolicy.SYSTEM_POOL, 2); - GridDhtPartitionsSingleMessage msg = e.getValue(); - - Collection<Integer> affReq = msg.cacheGroupsAffinityRequest(); + try { + U.doInParallel( + parallelismLvl, + cctx.kernalContext().getSystemExecutorService(), + msgs.entrySet(), + entry -> { + this.msgs.put(entry.getKey().id(), entry.getValue()); + + GridDhtPartitionsSingleMessage msg = entry.getValue(); + + Collection<Integer> affReq = msg.cacheGroupsAffinityRequest(); + + if (!F.isEmpty(affReq)) { + CacheGroupAffinityMessage.createAffinityMessages( + cctx, + fullMsg.resultTopologyVersion(), + affReq, + joinedNodeAff + ); + } - if (!F.isEmpty(affReq)) { - joinedNodeAff = CacheGroupAffinityMessage.createAffinityMessages(cctx, - fullMsg.resultTopologyVersion(), - affReq, - joinedNodeAff); - } + return null; + } + ); + } + catch (IgniteCheckedException e) { + throw new IgniteException(e); } Map<UUID, GridDhtPartitionsSingleMessage> mergedJoins = newCrdFut.mergedJoinExchangeMessages(); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java index b0decae..ba11df7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java @@ -505,108 +505,103 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { ctx.database().checkpointReadLock(); try { - synchronized (ctx.exchange().interruptLock()) { - if (Thread.currentThread().isInterrupted()) - throw new IgniteInterruptedCheckedException("Thread is interrupted: " + Thread.currentThread()); - - U.writeLock(lock); + U.writeLock(lock); - try { - if (stopping) - return; + try { + if (stopping) + return; - assert lastTopChangeVer.equals(exchFut.initialVersion()) : "Invalid topology version [topVer=" + lastTopChangeVer + - ", exchId=" + exchFut.exchangeId() + ']'; + assert lastTopChangeVer.equals(exchFut.initialVersion()) : "Invalid topology version [topVer=" + lastTopChangeVer + + ", exchId=" + exchFut.exchangeId() + ']'; - ExchangeDiscoveryEvents evts = exchFut.context().events(); + ExchangeDiscoveryEvents evts = exchFut.context().events(); - if (affReady) { - assert grp.affinity().lastVersion().equals(evts.topologyVersion()) : "Invalid affinity version [" + - "grp=" + grp.cacheOrGroupName() + - ", affVer=" + grp.affinity().lastVersion() + - ", evtsVer=" + evts.topologyVersion() + ']'; + if (affReady) { + assert grp.affinity().lastVersion().equals(evts.topologyVersion()) : "Invalid affinity version [" + + "grp=" + grp.cacheOrGroupName() + + ", affVer=" + grp.affinity().lastVersion() + + ", evtsVer=" + evts.topologyVersion() + ']'; - lastTopChangeVer = readyTopVer = evts.topologyVersion(); + lastTopChangeVer = readyTopVer = evts.topologyVersion(); - discoCache = evts.discoveryCache(); - } + discoCache = evts.discoveryCache(); + } - if (log.isDebugEnabled()) { - log.debug("Partition map beforeExchange [grp=" + grp.cacheOrGroupName() + - ", exchId=" + exchFut.exchangeId() + ", fullMap=" + fullMapString() + ']'); - } + if (log.isDebugEnabled()) { + log.debug("Partition map beforeExchange [grp=" + grp.cacheOrGroupName() + + ", exchId=" + exchFut.exchangeId() + ", fullMap=" + fullMapString() + ']'); + } - long updateSeq = this.updateSeq.incrementAndGet(); + long updateSeq = this.updateSeq.incrementAndGet(); - cntrMap.clear(); + cntrMap.clear(); - initializeFullMap(updateSeq); + initializeFullMap(updateSeq); - boolean grpStarted = exchFut.cacheGroupAddedOnExchange(grp.groupId(), grp.receivedFrom()); + boolean grpStarted = exchFut.cacheGroupAddedOnExchange(grp.groupId(), grp.receivedFrom()); - if (evts.hasServerLeft()) { - List<DiscoveryEvent> evts0 = evts.events(); + if (evts.hasServerLeft()) { + List<DiscoveryEvent> evts0 = evts.events(); - for (int i = 0; i < evts0.size(); i++) { - DiscoveryEvent evt = evts0.get(i); + for (int i = 0; i < evts0.size(); i++) { + DiscoveryEvent evt = evts0.get(i); - if (ExchangeDiscoveryEvents.serverLeftEvent(evt)) - removeNode(evt.eventNode().id()); - } + if (ExchangeDiscoveryEvents.serverLeftEvent(evt)) + removeNode(evt.eventNode().id()); } + } - if (grp.affinityNode()) { - if (grpStarted || - exchFut.firstEvent().type() == EVT_DISCOVERY_CUSTOM_EVT || - exchFut.serverNodeDiscoveryEvent()) { - - AffinityTopologyVersion affVer; - List<List<ClusterNode>> affAssignment; + if (grp.affinityNode()) { + if (grpStarted || + exchFut.firstEvent().type() == EVT_DISCOVERY_CUSTOM_EVT || + exchFut.serverNodeDiscoveryEvent()) { - if (affReady) { - affVer = evts.topologyVersion(); + AffinityTopologyVersion affVer; + List<List<ClusterNode>> affAssignment; - assert grp.affinity().lastVersion().equals(affVer) : - "Invalid affinity [topVer=" + grp.affinity().lastVersion() + - ", grp=" + grp.cacheOrGroupName() + - ", affVer=" + affVer + - ", fut=" + exchFut + ']'; + if (affReady) { + affVer = evts.topologyVersion(); - affAssignment = grp.affinity().readyAssignments(affVer); - } - else { - assert !exchFut.context().mergeExchanges(); + assert grp.affinity().lastVersion().equals(affVer) : + "Invalid affinity [topVer=" + grp.affinity().lastVersion() + + ", grp=" + grp.cacheOrGroupName() + + ", affVer=" + affVer + + ", fut=" + exchFut + ']'; - affVer = exchFut.initialVersion(); - affAssignment = grp.affinity().idealAssignment(); - } + affAssignment = grp.affinity().readyAssignments(affVer); + } + else { + assert !exchFut.context().mergeExchanges(); - initPartitions(affVer, affAssignment, exchFut, updateSeq); + affVer = exchFut.initialVersion(); + affAssignment = grp.affinity().idealAssignment(); } - } - consistencyCheck(); + initPartitions(affVer, affAssignment, exchFut, updateSeq); + } + } - if (updateMoving) { - assert grp.affinity().lastVersion().equals(evts.topologyVersion()); + consistencyCheck(); - createMovingPartitions(grp.affinity().readyAffinity(evts.topologyVersion())); - } + if (updateMoving) { + assert grp.affinity().lastVersion().equals(evts.topologyVersion()); - if (log.isDebugEnabled()) { - log.debug("Partition map after beforeExchange [grp=" + grp.cacheOrGroupName() + ", " + - "exchId=" + exchFut.exchangeId() + ", fullMap=" + fullMapString() + ']'); - } + createMovingPartitions(grp.affinity().readyAffinity(evts.topologyVersion())); + } - if (log.isTraceEnabled()) { - log.trace("Partition states after beforeExchange [grp=" + grp.cacheOrGroupName() - + ", exchId=" + exchFut.exchangeId() + ", states=" + dumpPartitionStates() + ']'); - } + if (log.isDebugEnabled()) { + log.debug("Partition map after beforeExchange [grp=" + grp.cacheOrGroupName() + ", " + + "exchId=" + exchFut.exchangeId() + ", fullMap=" + fullMapString() + ']'); } - finally { - lock.writeLock().unlock(); + + if (log.isTraceEnabled()) { + log.trace("Partition states after beforeExchange [grp=" + grp.cacheOrGroupName() + + ", exchId=" + exchFut.exchangeId() + ", states=" + dumpPartitionStates() + ']'); } } + finally { + lock.writeLock().unlock(); + } } finally { ctx.database().checkpointReadUnlock(); diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheParallelStartTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheParallelStartTest.java index f8ce7b4..d01d822 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheParallelStartTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheParallelStartTest.java @@ -38,7 +38,11 @@ import org.junit.Test; */ public class CacheParallelStartTest extends GridCommonAbstractTest { /** */ - private static final int CACHES_COUNT = 500; + private static final int CACHES_COUNT = 5000; + + /** */ + private static final int GROUPS_COUNT = 50; + /** */ private static final String STATIC_CACHE_PREFIX = "static-cache-"; @@ -54,11 +58,15 @@ public class CacheParallelStartTest extends GridCommonAbstractTest { cfg.setSystemThreadPoolSize(Runtime.getRuntime().availableProcessors() * 3); - long sz = 100 * 1024 * 1024; + long sz = 512 * 1024 * 1024; DataStorageConfiguration memCfg = new DataStorageConfiguration().setPageSize(1024) .setDefaultDataRegionConfiguration( - new DataRegionConfiguration().setPersistenceEnabled(false).setInitialSize(sz).setMaxSize(sz)) + new DataRegionConfiguration() + .setPersistenceEnabled(false) + .setInitialSize(sz) + .setMaxSize(sz) + ) .setWalMode(WALMode.LOG_ONLY).setCheckpointFrequency(24L * 60 * 60 * 1000); cfg.setDataStorageConfiguration(memCfg); @@ -66,7 +74,7 @@ public class CacheParallelStartTest extends GridCommonAbstractTest { ArrayList<Object> staticCaches = new ArrayList<>(CACHES_COUNT); for (int i = 0; i < CACHES_COUNT; i++) - staticCaches.add(cacheConfiguration(STATIC_CACHE_PREFIX + i)); + staticCaches.add(cacheConfiguration(STATIC_CACHE_PREFIX, i)); cfg.setCacheConfiguration(staticCaches.toArray(new CacheConfiguration[CACHES_COUNT])); @@ -77,12 +85,12 @@ public class CacheParallelStartTest extends GridCommonAbstractTest { * @param cacheName Cache name. * @return Cache configuration. */ - private CacheConfiguration cacheConfiguration(String cacheName) { + private CacheConfiguration cacheConfiguration(String cacheName, int i) { CacheConfiguration cfg = defaultCacheConfiguration(); - cfg.setName(cacheName); + cfg.setName(cacheName + i); cfg.setBackups(1); - cfg.setGroupName(STATIC_CACHE_CACHE_GROUP_NAME); + cfg.setGroupName(STATIC_CACHE_CACHE_GROUP_NAME + i % GROUPS_COUNT); cfg.setIndexedTypes(Long.class, Long.class); return cfg; @@ -175,20 +183,22 @@ public class CacheParallelStartTest extends GridCommonAbstractTest { * */ private void assertCaches(IgniteEx igniteEx) { - Collection<GridCacheContext> caches = igniteEx - .context() - .cache() - .cacheGroup(CU.cacheId(STATIC_CACHE_CACHE_GROUP_NAME)) - .caches(); - - assertEquals(CACHES_COUNT, caches.size()); - - @Nullable CacheGroupContext cacheGroup = igniteEx - .context() - .cache() - .cacheGroup(CU.cacheId(STATIC_CACHE_CACHE_GROUP_NAME)); - - for (GridCacheContext cacheContext : caches) - assertEquals(cacheContext.group(), cacheGroup); + for (int i = 0; i < GROUPS_COUNT; i++) { + Collection<GridCacheContext> caches = igniteEx + .context() + .cache() + .cacheGroup(CU.cacheId(STATIC_CACHE_CACHE_GROUP_NAME + i)) + .caches(); + + assertEquals(CACHES_COUNT / GROUPS_COUNT, caches.size()); + + @Nullable CacheGroupContext cacheGrp = igniteEx + .context() + .cache() + .cacheGroup(CU.cacheId(STATIC_CACHE_CACHE_GROUP_NAME + i)); + + for (GridCacheContext cacheContext : caches) + assertEquals(cacheContext.group(), cacheGrp); + } } }