IGNITE-10242 Pause ongoing rebalance on cache group stopping Signed-off-by: Pavel Kovalenko <[email protected]>
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/025a0360 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/025a0360 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/025a0360 Branch: refs/heads/ignite-10044 Commit: 025a0360c86a5ceb8c3eef2f514c29846081d4c2 Parents: 9199aac Author: Ivan Daschinskiy <[email protected]> Authored: Thu Dec 6 18:51:24 2018 +0300 Committer: Pavel Kovalenko <[email protected]> Committed: Thu Dec 6 18:51:24 2018 +0300 ---------------------------------------------------------------------- .../GridCachePartitionExchangeManager.java | 4 +- .../processors/cache/GridCachePreloader.java | 10 + .../cache/GridCachePreloaderAdapter.java | 10 + .../processors/cache/GridCacheProcessor.java | 106 ++++--- .../dht/preloader/GridDhtPartitionDemander.java | 3 + .../GridDhtPartitionSupplyMessage.java | 3 + .../GridDhtPartitionsExchangeFuture.java | 8 +- .../preloader/GridDhtPartitionsFullMessage.java | 5 +- .../dht/preloader/GridDhtPreloader.java | 51 +++- .../ignite/internal/util/IgniteUtils.java | 66 +++++ ...balanceOnCachesStoppingOrDestroyingTest.java | 279 +++++++++++++++++++ .../ignite/testsuites/IgnitePdsTestSuite4.java | 3 + 12 files changed, 498 insertions(+), 50 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/025a0360/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index 6dad367..0a0e709 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -437,9 +437,11 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana return; } + else + U.error(log, "Unsupported message type: " + m.getClass().getName()); } - U.error(log, "Unsupported message type: " + m.getClass().getName()); + U.warn(log, "Cache group with id=" + m.groupId() + " is stopped or absent"); } finally { leaveBusy(); http://git-wip-us.apache.org/repos/asf/ignite/blob/025a0360/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java index d629e94..6ac26c9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java @@ -192,4 +192,14 @@ public interface GridCachePreloader { * Dumps debug information. */ public void dumpDebugInfo(); + + /** + * Pause preloader. + */ + public void pause(); + + /** + * Resume preloader. + */ + public void resume(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/025a0360/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java index c5e4a81..f16305c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java @@ -181,4 +181,14 @@ public class GridCachePreloaderAdapter implements GridCachePreloader { @Override public void dumpDebugInfo() { // No-op. } + + /** {@inheritDoc} */ + @Override public void pause() { + // No-op + } + + /** {@inheritDoc} */ + @Override public void resume() { + // No-op + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/025a0360/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index b49c697..375dd12 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -73,6 +73,7 @@ import org.apache.ignite.internal.IgniteTransactionsEx; import org.apache.ignite.internal.binary.BinaryContext; import org.apache.ignite.internal.binary.BinaryMarshaller; import org.apache.ignite.internal.binary.GridBinaryMarshaller; +import org.apache.ignite.internal.managers.communication.GridIoPolicy; import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; import org.apache.ignite.internal.pagemem.store.IgnitePageStoreManager; import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager; @@ -2113,10 +2114,8 @@ public class GridCacheProcessor extends GridProcessorAdapter { else { Map<StartCacheInfo, GridCacheContext> cacheContexts = new ConcurrentHashMap<>(); - int parallelismLvl = sharedCtx.kernalContext().config().getSystemThreadPoolSize(); - // Reserve at least 2 threads for system operations. - parallelismLvl = Math.max(1, parallelismLvl - 2); + int parallelismLvl = U.availableThreadCount(ctx, GridIoPolicy.SYSTEM_POOL, 2); doInParallel( parallelismLvl, @@ -2908,7 +2907,9 @@ public class GridCacheProcessor extends GridProcessorAdapter { * @param exchActions Change requests. */ private void processCacheStopRequestOnExchangeDone(ExchangeActions exchActions) { - // Force checkpoint if there is any cache stop request + // Reserve at least 2 threads for system operations. + int parallelismLvl = U.availableThreadCount(ctx, GridIoPolicy.SYSTEM_POOL, 2); + if (!exchActions.cacheStopRequests().isEmpty()) { try { sharedCtx.database().waitForCheckpoint("caches stop"); @@ -2918,63 +2919,88 @@ public class GridCacheProcessor extends GridProcessorAdapter { } } - for (ExchangeActions.CacheActionData action : exchActions.cacheStopRequests()) { - CacheGroupContext gctx = cacheGrps.get(action.descriptor().groupId()); + List<IgniteBiTuple<CacheGroupContext, Boolean>> grpToStop = exchActions.cacheGroupsToStop().stream() + .filter(a -> cacheGrps.containsKey(a.descriptor().groupId())) + .map(a -> F.t(cacheGrps.get(a.descriptor().groupId()), a.destroy())) + .collect(Collectors.toList()); - // Cancel all operations blocking gateway - if (gctx != null) { - final String msg = "Failed to wait for topology update, cache group is stopping."; + Map<Integer, List<ExchangeActions.CacheActionData>> cachesToStop = exchActions.cacheStopRequests().stream() + .collect(Collectors.groupingBy(action -> action.descriptor().groupId())); - // If snapshot operation in progress we must throw CacheStoppedException - // for correct cache proxy restart. For more details see - // IgniteCacheProxy.cacheException() - gctx.affinity().cancelFutures(new CacheStoppedException(msg)); - } + try { + doInParallel( + parallelismLvl, + sharedCtx.kernalContext().getSystemExecutorService(), + cachesToStop.entrySet(), + cachesToStopByGrp -> { + CacheGroupContext gctx = cacheGrps.get(cachesToStopByGrp.getKey()); - stopGateway(action.request()); + if (gctx != null) + gctx.preloader().pause(); - sharedCtx.database().checkpointReadLock(); + try { - try { - prepareCacheStop(action.request().cacheName(), action.request().destroy()); - } - finally { - sharedCtx.database().checkpointReadUnlock(); - } + if (gctx != null) { + final String msg = "Failed to wait for topology update, cache group is stopping."; + + // If snapshot operation in progress we must throw CacheStoppedException + // for correct cache proxy restart. For more details see + // IgniteCacheProxy.cacheException() + gctx.affinity().cancelFutures(new CacheStoppedException(msg)); + } + + for (ExchangeActions.CacheActionData action: cachesToStopByGrp.getValue()) { + stopGateway(action.request()); + + sharedCtx.database().checkpointReadLock(); + + try { + prepareCacheStop(action.request().cacheName(), action.request().destroy()); + } + finally { + sharedCtx.database().checkpointReadUnlock(); + } + } + } + finally { + if (gctx != null) + gctx.preloader().resume(); + } + + return null; + } + ); + } + catch (IgniteCheckedException e) { + String msg = "Failed to stop caches"; + + log.error(msg, e); + + throw new IgniteException(msg, e); } sharedCtx.database().checkpointReadLock(); try { // Do not invoke checkpoint listeners for groups are going to be destroyed to prevent metadata corruption. - for (ExchangeActions.CacheGroupActionData action : exchActions.cacheGroupsToStop()) { - Integer groupId = action.descriptor().groupId(); - CacheGroupContext grp = cacheGrps.get(groupId); + grpToStop.forEach(grp -> { + CacheGroupContext gctx = grp.getKey(); - if (grp != null && grp.persistenceEnabled() && sharedCtx.database() instanceof GridCacheDatabaseSharedManager) { + if (gctx != null && gctx.persistenceEnabled() && sharedCtx.database() instanceof GridCacheDatabaseSharedManager) { GridCacheDatabaseSharedManager mngr = (GridCacheDatabaseSharedManager)sharedCtx.database(); - mngr.removeCheckpointListener((DbCheckpointListener)grp.offheap()); + mngr.removeCheckpointListener((DbCheckpointListener)gctx.offheap()); } - } + }); } finally { sharedCtx.database().checkpointReadUnlock(); } - List<IgniteBiTuple<CacheGroupContext, Boolean>> stoppedGroups = new ArrayList<>(); - - for (ExchangeActions.CacheGroupActionData action : exchActions.cacheGroupsToStop()) { - Integer groupId = action.descriptor().groupId(); - - if (cacheGrps.containsKey(groupId)) { - stoppedGroups.add(F.t(cacheGrps.get(groupId), action.destroy())); - - stopCacheGroup(groupId); - } - } + for (IgniteBiTuple<CacheGroupContext, Boolean> grp : grpToStop) + stopCacheGroup(grp.get1().groupId()); if (!sharedCtx.kernalContext().clientNode()) - sharedCtx.database().onCacheGroupsStopped(stoppedGroups); + sharedCtx.database().onCacheGroupsStopped(grpToStop); if (exchActions.deactivate()) sharedCtx.deactivate(); http://git-wip-us.apache.org/repos/asf/ignite/blob/025a0360/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java index a75fae7..ddbb3b1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java @@ -906,6 +906,9 @@ public class GridDhtPartitionDemander { try { GridCacheContext cctx = grp.sharedGroup() ? ctx.cacheContext(entry.cacheId()) : grp.singleCacheContext(); + if (cctx == null) + return true; + if (cctx.isNear()) cctx = cctx.dhtCache().context(); http://git-wip-us.apache.org/repos/asf/ignite/blob/025a0360/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java index 3034fb9..7e281e5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java @@ -249,6 +249,9 @@ public class GridDhtPartitionSupplyMessage extends GridCacheGroupIdMessage imple CacheGroupContext grp = ctx.cache().cacheGroup(grpId); + if (grp == null) + return; + for (CacheEntryInfoCollection col : infos().values()) { List<GridCacheEntryInfo> entries = col.infos(); http://git-wip-us.apache.org/repos/asf/ignite/blob/025a0360/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index ffc55a9..89e03a2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -3533,10 +3533,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte * failed to send update counter deltas to backup. */ private void finalizePartitionCounters() { - int parallelismLvl = cctx.kernalContext().config().getSystemThreadPoolSize(); - // Reserve at least 2 threads for system operations. - parallelismLvl = Math.max(1, parallelismLvl - 2); + int parallelismLvl = U.availableThreadCount(cctx.kernalContext(), GridIoPolicy.SYSTEM_POOL, 2); long time = System.currentTimeMillis(); @@ -3965,10 +3963,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte long time = System.currentTimeMillis(); - int parallelismLvl = cctx.kernalContext().config().getSystemThreadPoolSize(); - // Reserve at least 2 threads for system operations. - parallelismLvl = Math.max(1, parallelismLvl - 2); + int parallelismLvl = U.availableThreadCount(cctx.kernalContext(), GridIoPolicy.SYSTEM_POOL, 2); try { doInParallel( http://git-wip-us.apache.org/repos/asf/ignite/blob/025a0360/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java index fbaa241..a2cecb8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java @@ -31,6 +31,7 @@ import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.GridDirectMap; import org.apache.ignite.internal.GridDirectTransient; +import org.apache.ignite.internal.managers.communication.GridIoPolicy; import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; @@ -415,7 +416,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa if (marshal) { // Reserve at least 2 threads for system operations. - int parallelismLvl = Math.max(1, ctx.kernalContext().config().getSystemThreadPoolSize() - 2); + int parallelismLvl = U.availableThreadCount(ctx.kernalContext(), GridIoPolicy.SYSTEM_POOL, 2); Collection<Object> objectsToMarshall = new ArrayList<>(); @@ -509,7 +510,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa Collection<byte[]> objectsToUnmarshall = new ArrayList<>(); // Reserve at least 2 threads for system operations. - int parallelismLvl = Math.max(1, ctx.kernalContext().config().getSystemThreadPoolSize() - 2); + int parallelismLvl = U.availableThreadCount(ctx.kernalContext(), GridIoPolicy.SYSTEM_POOL, 2); if (partsBytes != null && parts == null) objectsToUnmarshall.add(partsBytes); http://git-wip-us.apache.org/repos/asf/ignite/blob/025a0360/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java index c8705d0..e92a240 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java @@ -20,13 +20,16 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.preloader; import java.util.ArrayList; import java.util.Collection; import java.util.List; +import java.util.Queue; import java.util.UUID; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.NodeStoppingException; +import org.apache.ignite.internal.managers.communication.GridIoPolicy; import org.apache.ignite.internal.processors.affinity.AffinityAssignment; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheGroupContext; @@ -42,7 +45,9 @@ import org.apache.ignite.internal.util.future.GridCompoundFuture; import org.apache.ignite.internal.util.future.GridFinishedFuture; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.lang.GridPlainRunnable; +import org.apache.ignite.internal.util.lang.GridTuple3; import org.apache.ignite.internal.util.typedef.CI1; +import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.lang.IgnitePredicate; import org.jetbrains.annotations.Nullable; @@ -80,6 +85,12 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { private final ReadWriteLock demandLock = new ReentrantReadWriteLock(); /** */ + private boolean paused; + + /** */ + private Queue<GridTuple3<Integer, UUID, GridDhtPartitionSupplyMessage>> pausedDemanderQueue = new ConcurrentLinkedQueue<>(); + + /** */ private boolean stopped; /** @@ -357,7 +368,10 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { demandLock.readLock().lock(); try { - demander.handleSupplyMessage(idx, id, s); + if (paused) + pausedDemanderQueue.add(F.t(idx, id, s)); + else + demander.handleSupplyMessage(idx, id, s); } finally { demandLock.readLock().unlock(); @@ -562,6 +576,41 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { } /** {@inheritDoc} */ + @Override public void pause() { + demandLock.writeLock().lock(); + + try { + paused = true; + } + finally { + demandLock.writeLock().unlock(); + } + } + + /** {@inheritDoc} */ + @Override public void resume() { + demandLock.writeLock().lock(); + + try { + final List<GridTuple3<Integer, UUID, GridDhtPartitionSupplyMessage>> msgToProc = + new ArrayList<>(pausedDemanderQueue); + + pausedDemanderQueue.clear(); + + final GridDhtPreloader preloader = this; + + ctx.kernalContext().closure().runLocalSafe(() -> msgToProc.forEach( + m -> preloader.handleSupplyMessage(m.get1(), m.get2(), m.get3()) + ), GridIoPolicy.SYSTEM_POOL); + + paused = false; + } + finally { + demandLock.writeLock().unlock(); + } + } + + /** {@inheritDoc} */ @Override public void dumpDebugInfo() { // No-op } http://git-wip-us.apache.org/repos/asf/ignite/blob/025a0360/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java index 946378d..6da5c6e 100755 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java @@ -55,6 +55,7 @@ import org.apache.ignite.internal.compute.ComputeTaskCancelledCheckedException; import org.apache.ignite.internal.compute.ComputeTaskTimeoutCheckedException; import org.apache.ignite.internal.events.DiscoveryCustomEvent; import org.apache.ignite.internal.managers.communication.GridIoManager; +import org.apache.ignite.internal.managers.communication.GridIoPolicy; import org.apache.ignite.internal.managers.deployment.GridDeploymentInfo; import org.apache.ignite.internal.mxbean.IgniteStandardMXBean; import org.apache.ignite.internal.processors.cache.CacheClassLoaderMarker; @@ -10604,6 +10605,71 @@ public abstract class IgniteUtils { } /** + * @param ctx Kernal context. + * @param plc IO Policy. + * @param reserved Thread to reserve. + * @return Number of available threads in executor service for {@code plc}. If {@code plc} + * is invalid, return {@code 1}. + */ + public static int availableThreadCount(GridKernalContext ctx, byte plc, int reserved) { + IgniteConfiguration cfg = ctx.config(); + + int parallelismLvl; + + switch (plc) { + case GridIoPolicy.P2P_POOL: + parallelismLvl = cfg.getPeerClassLoadingThreadPoolSize(); + + break; + + case GridIoPolicy.SYSTEM_POOL: + parallelismLvl = cfg.getSystemThreadPoolSize(); + + break; + + case GridIoPolicy.PUBLIC_POOL: + parallelismLvl = cfg.getPublicThreadPoolSize(); + + break; + + case GridIoPolicy.MANAGEMENT_POOL: + parallelismLvl = cfg.getManagementThreadPoolSize(); + + break; + + case GridIoPolicy.UTILITY_CACHE_POOL: + parallelismLvl = cfg.getUtilityCacheThreadPoolSize(); + + break; + + case GridIoPolicy.IGFS_POOL: + parallelismLvl = cfg.getIgfsThreadPoolSize(); + + break; + + case GridIoPolicy.SERVICE_POOL: + parallelismLvl = cfg.getServiceThreadPoolSize(); + + break; + + case GridIoPolicy.DATA_STREAMER_POOL: + parallelismLvl = cfg.getDataStreamerThreadPoolSize(); + + break; + + case GridIoPolicy.QUERY_POOL: + parallelismLvl = cfg.getQueryThreadPoolSize(); + + break; + + default: + parallelismLvl = -1; + } + + return Math.max(1, parallelismLvl - reserved); + } + + /** * Execute operation on data in parallel. * * @param executorSvc Service for parallel execution. http://git-wip-us.apache.org/repos/asf/ignite/blob/025a0360/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/IgniteRebalanceOnCachesStoppingOrDestroyingTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/IgniteRebalanceOnCachesStoppingOrDestroyingTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/IgniteRebalanceOnCachesStoppingOrDestroyingTest.java new file mode 100644 index 0000000..97f8d45 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/IgniteRebalanceOnCachesStoppingOrDestroyingTest.java @@ -0,0 +1,279 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.rebalancing; + +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.ThreadLocalRandom; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteDataStreamer; +import org.apache.ignite.IgniteException; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.DataRegionConfiguration; +import org.apache.ignite.configuration.DataStorageConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.configuration.TransactionConfiguration; +import org.apache.ignite.configuration.WALMode; +import org.apache.ignite.failure.StopNodeFailureHandler; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.IgniteInterruptedCheckedException; +import org.apache.ignite.internal.managers.communication.GridIoMessage; +import org.apache.ignite.internal.processors.cache.GridCacheGroupIdMessage; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage; +import org.apache.ignite.internal.util.lang.IgniteThrowableConsumer; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.CU; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteInClosure; +import org.apache.ignite.plugin.extensions.communication.Message; +import org.apache.ignite.spi.IgniteSpiException; +import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +/** + * + */ +public class IgniteRebalanceOnCachesStoppingOrDestroyingTest extends GridCommonAbstractTest { + /** */ + private static final String CACHE_1 = "cache_1"; + + /** */ + private static final String CACHE_2 = "cache_2"; + + /** */ + private static final String CACHE_3 = "cache_3"; + + /** */ + private static final String CACHE_4 = "cache_4"; + + /** */ + private static final String GROUP_1 = "group_1"; + + /** */ + private static final String GROUP_2 = "group_2"; + + /** */ + private static final int REBALANCE_BATCH_SIZE = 50 * 1024; + + /** */ + private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + super.beforeTest(); + + cleanPersistenceDir(); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + super.afterTest(); + + stopAllGrids(); + + cleanPersistenceDir(); + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + TcpDiscoverySpi spi = new TcpDiscoverySpi(); + + spi.setIpFinder(ipFinder); + + cfg.setDiscoverySpi(spi); + + cfg.setCommunicationSpi(new RebalanceBlockingSPI()); + + cfg.setFailureHandler(new StopNodeFailureHandler()); + + cfg.setRebalanceThreadPoolSize(4); + + cfg.setTransactionConfiguration(new TransactionConfiguration() + .setDefaultTxTimeout(1000)); + + cfg.setDataStorageConfiguration( + new DataStorageConfiguration() + .setWalMode(WALMode.LOG_ONLY) + .setDefaultDataRegionConfiguration( + new DataRegionConfiguration() + .setPersistenceEnabled(true) + .setMaxSize(100L * 1024 * 1024))); + + return cfg; + } + + /** + * + */ + public void testStopCachesOnDeactivation() throws Exception { + performTest(ig -> { + ig.cluster().active(false); + + // Add to escape possible long waiting in awaitPartitionMapExchange due to {@link CacheAffinityChangeMessage}. + ig.cluster().active(true); + + return null; + }); + } + + /** + * + */ + public void testDestroySpecificCachesInDifferentCacheGroups() throws Exception { + performTest(ig -> { + ig.destroyCaches(Arrays.asList(CACHE_1, CACHE_3)); + + return null; + }); + } + + /** + * + */ + public void testDestroySpecificCacheAndCacheGroup() throws Exception { + performTest(ig -> { + ig.destroyCaches(Arrays.asList(CACHE_1, CACHE_3, CACHE_4)); + + return null; + }); + } + + /** + * @param testAction Action that trigger stop or destroy of caches. + */ + private void performTest(IgniteThrowableConsumer<Ignite, Void> testAction) throws Exception { + IgniteEx ig0 = (IgniteEx)startGrids(2); + + ig0.cluster().active(true); + + stopGrid(1); + + loadData(ig0); + + startGrid(1); + + runLoad(ig0); + + testAction.accept(ig0); + + U.sleep(1000); + + awaitPartitionMapExchange(true, true, null, true); + + assertNull(grid(1).context().failure().failureContext()); + } + + /** + * @param ig Ig. + */ + private void loadData(Ignite ig) { + List<CacheConfiguration> configs = Stream.of( + F.t(CACHE_1, GROUP_1), + F.t(CACHE_2, GROUP_1), + F.t(CACHE_3, GROUP_2), + F.t(CACHE_4, GROUP_2) + ).map(names -> new CacheConfiguration<>(names.get1()) + .setGroupName(names.get2()) + .setRebalanceBatchSize(REBALANCE_BATCH_SIZE) + .setCacheMode(CacheMode.REPLICATED) + ).collect(Collectors.toList()); + + ig.getOrCreateCaches(configs); + + configs.forEach(cfg -> { + try (IgniteDataStreamer<Object, Object> streamer = ig.dataStreamer(cfg.getName())) { + for (int i = 0; i < 3_000; i++) + streamer.addData(i, new byte[1024]); + + streamer.flush(); + } + }); + } + + /** + * @param ig Ignite instance. + */ + private void runLoad(Ignite ig) throws Exception{ + GridTestUtils.runMultiThreaded(new Runnable() { + @Override public void run() { + String cacheName = F.rand(CACHE_1, CACHE_2, CACHE_3, CACHE_4); + + IgniteCache cache = ig.cache(cacheName); + + for (int i = 0; i < 3_000; i++) { + int idx = ThreadLocalRandom.current().nextInt(3_000); + + cache.put(idx, new byte[1024]); + } + } + }, 4, "load-thread"); + } + + /** + * + */ + private static class RebalanceBlockingSPI extends TcpCommunicationSpi { + /** */ + public static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** {@inheritDoc} */ + @Override public void sendMessage(ClusterNode node, Message msg) throws IgniteSpiException { + slowDownMessage(msg); + + super.sendMessage(node, msg); + + } + + /** {@inheritDoc} */ + @Override public void sendMessage(ClusterNode node, Message msg, + IgniteInClosure<IgniteException> ackC) throws IgniteSpiException { + slowDownMessage(msg); + + super.sendMessage(node, msg, ackC); + } + + /** + * @param msg Message. + */ + private void slowDownMessage(Message msg) { + if (msg instanceof GridIoMessage && ((GridIoMessage)msg).message() instanceof GridDhtPartitionSupplyMessage) { + int grpId = ((GridCacheGroupIdMessage)((GridIoMessage)msg).message()).groupId(); + + if (grpId == CU.cacheId(GROUP_1) || grpId == CU.cacheId(GROUP_2)) { + try { + U.sleep(50); + } + catch (IgniteInterruptedCheckedException e) { + e.printStackTrace(); + } + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/025a0360/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite4.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite4.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite4.java index 11f0219..d01f1ed 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite4.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite4.java @@ -21,6 +21,7 @@ import junit.framework.TestSuite; import org.apache.ignite.cache.ResetLostPartitionTest; import org.apache.ignite.internal.processors.cache.IgniteClusterActivateDeactivateTestWithPersistenceAndMemoryReuse; import org.apache.ignite.internal.processors.cache.distributed.CachePageWriteLockUnlockTest; +import org.apache.ignite.internal.processors.cache.distributed.rebalancing.IgniteRebalanceOnCachesStoppingOrDestroyingTest; import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsContinuousRestartTestWithSharedGroupAndIndexes; import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsRecoveryAfterFileCorruptionTest; import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsTaskCancelingTest; @@ -52,6 +53,8 @@ public class IgnitePdsTestSuite4 extends TestSuite { suite.addTestSuite(ResetLostPartitionTest.class); + suite.addTestSuite(IgniteRebalanceOnCachesStoppingOrDestroyingTest.class); + suite.addTestSuite(CachePageWriteLockUnlockTest.class); suite.addTestSuite(IgnitePdsCacheWalDisabledOnRebalancingTest.class);
