Repository: ignite Updated Branches: refs/heads/master af03d8413 -> 2d63040fd
IGNITE-9244 Partition eviction should not take all threads in system pool Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/2d63040f Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/2d63040f Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/2d63040f Branch: refs/heads/master Commit: 2d63040fdf77bbc4515790d796fa86a8f4f6a66c Parents: af03d84 Author: Dmitriy Govorukhin <dmitriy.govoruk...@gmail.com> Authored: Mon Aug 13 11:59:27 2018 +0300 Committer: Alexey Goncharuk <alexey.goncha...@gmail.com> Committed: Mon Aug 13 11:59:27 2018 +0300 ---------------------------------------------------------------------- .../apache/ignite/IgniteSystemProperties.java | 5 + .../processors/cache/CacheGroupContext.java | 17 +- .../processors/cache/GridCacheProcessor.java | 3 + .../cache/GridCacheSharedContext.java | 48 +- .../distributed/dht/GridDhtLocalPartition.java | 3 +- .../dht/GridDhtPartitionsEvictor.java | 310 ---------- .../distributed/dht/PartitionsEvictManager.java | 566 +++++++++++++++++++ .../wal/reader/IgniteWalIteratorFactory.java | 2 +- .../wal/IgniteWalIteratorSwitchSegmentTest.java | 2 + .../pagemem/BPlusTreePageMemoryImplTest.java | 1 + .../BPlusTreeReuseListPageMemoryImplTest.java | 1 + .../pagemem/IndexStoragePageMemoryImplTest.java | 1 + .../pagemem/PageMemoryImplNoLoadTest.java | 1 + .../persistence/pagemem/PageMemoryImplTest.java | 1 + .../loadtests/hashmap/GridCacheTestContext.java | 2 + 15 files changed, 630 insertions(+), 333 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/2d63040f/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java index 
3b4aedb..21f6c5f 100644 --- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java +++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java @@ -942,6 +942,11 @@ public final class IgniteSystemProperties { public static final String IGNITE_THROTTLE_LOG_THRESHOLD = "IGNITE_THROTTLE_LOG_THRESHOLD"; /** + * Number of concurrent operation for evict partitions. + */ + public static final String IGNITE_EVICTION_PERMITS = "IGNITE_EVICTION_PERMITS"; + + /** * Enforces singleton. */ private IgniteSystemProperties() { http://git-wip-us.apache.org/repos/asf/ignite/blob/2d63040f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java index 7009575..c8fe283 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java @@ -41,7 +41,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtAffini import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtAffinityAssignmentResponse; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopologyImpl; -import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionsEvictor; +import org.apache.ignite.internal.processors.cache.distributed.dht.PartitionsEvictManager; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloader; import org.apache.ignite.internal.processors.cache.persistence.DataRegion; import 
org.apache.ignite.internal.processors.cache.persistence.GridCacheOffheapManager; @@ -126,10 +126,6 @@ public class CacheGroupContext { /** */ private GridCachePreloader preldr; - - /** Partition evictor. */ - private GridDhtPartitionsEvictor evictor; - /** */ private final DataRegion dataRegion; @@ -260,13 +256,6 @@ public class CacheGroupContext { } /** - * @return Partitions evictor. - */ - public GridDhtPartitionsEvictor evictor() { - return evictor; - } - - /** * @return IO policy for the given cache group. */ public byte ioPolicy() { @@ -733,7 +722,7 @@ public class CacheGroupContext { IgniteCheckedException err = new IgniteCheckedException("Failed to wait for topology update, cache (or node) is stopping."); - evictor.stop(); + ctx.evict().onCacheGroupStopped(this); aff.cancelFutures(err); @@ -909,8 +898,6 @@ public class CacheGroupContext { else preldr = new GridCachePreloaderAdapter(this); - evictor = new GridDhtPartitionsEvictor(this); - if (persistenceEnabled()) { try { offheapMgr = new GridCacheOffheapManager(); http://git-wip-us.apache.org/repos/asf/ignite/blob/2d63040f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index 09e2860..ed97982 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -81,6 +81,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCache; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition; import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology; +import org.apache.ignite.internal.processors.cache.distributed.dht.PartitionsEvictManager; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicCache; import org.apache.ignite.internal.processors.cache.distributed.dht.colocated.GridDhtColocatedCache; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.StopCachesOnClientReconnectExchangeTask; @@ -2463,6 +2464,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { GridCacheIoManager ioMgr = new GridCacheIoManager(); CacheAffinitySharedManager topMgr = new CacheAffinitySharedManager(); GridCacheSharedTtlCleanupManager ttl = new GridCacheSharedTtlCleanupManager(); + PartitionsEvictManager evict = new PartitionsEvictManager(); CacheJtaManagerAdapter jta = JTA.createOptional(); @@ -2481,6 +2483,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { topMgr, ioMgr, ttl, + evict, jta, storeSesLsnrs ); http://git-wip-us.apache.org/repos/asf/ignite/blob/2d63040f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java index 381d5bf..4b98060 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java @@ -45,6 +45,7 @@ import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology; import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture; +import org.apache.ignite.internal.processors.cache.distributed.dht.PartitionsEvictManager; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal; import org.apache.ignite.internal.processors.cache.jta.CacheJtaManagerAdapter; import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager; @@ -126,6 +127,9 @@ public class GridCacheSharedContext<K, V> { /** Ttl cleanup manager. */ private GridCacheSharedTtlCleanupManager ttlMgr; + /** */ + private PartitionsEvictManager evictMgr; + /** Cache contexts map. */ private ConcurrentHashMap<Integer, GridCacheContext<K, V>> ctxMap; @@ -199,13 +203,30 @@ public class GridCacheSharedContext<K, V> { CacheAffinitySharedManager<K, V> affMgr, GridCacheIoManager ioMgr, GridCacheSharedTtlCleanupManager ttlMgr, + PartitionsEvictManager evictMgr, CacheJtaManagerAdapter jtaMgr, Collection<CacheStoreSessionListener> storeSesLsnrs ) { this.kernalCtx = kernalCtx; - setManagers(mgrs, txMgr, jtaMgr, verMgr, mvccMgr, pageStoreMgr, walMgr, walStateMgr, dbMgr, snpMgr, depMgr, - exchMgr, affMgr, ioMgr, ttlMgr); + setManagers( + mgrs, + txMgr, + jtaMgr, + verMgr, + mvccMgr, + pageStoreMgr, + walMgr, + walStateMgr, + dbMgr, + snpMgr, + depMgr, + exchMgr, + affMgr, + ioMgr, + ttlMgr, + evictMgr + ); this.storeSesLsnrs = storeSesLsnrs; @@ -349,7 +370,9 @@ public class GridCacheSharedContext<K, V> { void onReconnected(boolean active) throws IgniteCheckedException { List<GridCacheSharedManager<K, V>> mgrs = new LinkedList<>(); - setManagers(mgrs, txMgr, + setManagers( + mgrs, + txMgr, jtaMgr, verMgr, mvccMgr, @@ -362,7 +385,9 @@ public class GridCacheSharedContext<K, V> { new GridCachePartitionExchangeManager<K, V>(), affMgr, ioMgr, - ttlMgr); + ttlMgr, + evictMgr + ); this.mgrs = mgrs; @@ -404,7 +429,8 @@ public class GridCacheSharedContext<K, V> { * @param ttlMgr Ttl cleanup manager. 
*/ @SuppressWarnings("unchecked") - private void setManagers(List<GridCacheSharedManager<K, V>> mgrs, + private void setManagers( + List<GridCacheSharedManager<K, V>> mgrs, IgniteTxManager txMgr, CacheJtaManagerAdapter jtaMgr, GridCacheVersionManager verMgr, @@ -418,7 +444,9 @@ public class GridCacheSharedContext<K, V> { GridCachePartitionExchangeManager<K, V> exchMgr, CacheAffinitySharedManager affMgr, GridCacheIoManager ioMgr, - GridCacheSharedTtlCleanupManager ttlMgr) { + GridCacheSharedTtlCleanupManager ttlMgr, + PartitionsEvictManager evictMgr + ) { this.mvccMgr = add(mgrs, mvccMgr); this.verMgr = add(mgrs, verMgr); this.txMgr = add(mgrs, txMgr); @@ -433,6 +461,7 @@ public class GridCacheSharedContext<K, V> { this.affMgr = add(mgrs, affMgr); this.ioMgr = add(mgrs, ioMgr); this.ttlMgr = add(mgrs, ttlMgr); + this.evictMgr = add(mgrs, evictMgr); } /** @@ -765,6 +794,13 @@ public class GridCacheSharedContext<K, V> { } /** + * @return Partition evict manager. + */ + public PartitionsEvictManager evict() { + return evictMgr; + } + + /** * @return Node ID. 
*/ public UUID localNodeId() { http://git-wip-us.apache.org/repos/asf/ignite/blob/2d63040f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java index e13c952..e1e9f8e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java @@ -684,7 +684,7 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements } } - grp.evictor().evictPartitionAsync(this); + ctx.evict().evictPartitionAsync(grp,this); } /** @@ -696,6 +696,7 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements return; clear = true; + clearAsync0(false); } http://git-wip-us.apache.org/repos/asf/ignite/blob/2d63040f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionsEvictor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionsEvictor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionsEvictor.java deleted file mode 100644 index 7206397..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionsEvictor.java +++ /dev/null @@ -1,310 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.ignite.internal.processors.cache.distributed.dht; - -import java.util.Queue; -import java.util.Set; -import java.util.concurrent.Callable; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.function.Function; -import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.IgniteLogger; -import org.apache.ignite.IgniteSystemProperties; -import org.apache.ignite.internal.IgniteInternalFuture; -import org.apache.ignite.internal.processors.cache.CacheGroupContext; -import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; -import org.apache.ignite.internal.util.GridConcurrentHashSet; -import org.apache.ignite.internal.util.future.GridFutureAdapter; -import org.apache.ignite.internal.util.typedef.internal.LT; -import org.apache.ignite.internal.util.typedef.internal.U; - -/** - * Class that serves asynchronous part eviction process. - * Only one partition from group can be evicted at the moment. - */ -public class GridDhtPartitionsEvictor { - /** Default eviction progress show frequency. */ - private static final int DEFAULT_SHOW_EVICTION_PROGRESS_FREQ_MS = 2 * 60 * 1000; // 2 Minutes. - - /** Eviction progress frequency property name. 
*/ - private static final String SHOW_EVICTION_PROGRESS_FREQ = "SHOW_EVICTION_PROGRESS_FREQ"; - - /** */ - private final GridCacheSharedContext<?, ?> ctx; - - /** */ - private final CacheGroupContext grp; - - /** */ - private final IgniteLogger log; - - /** Lock object. */ - private final Object mux = new Object(); - - /** Queue contains partitions scheduled for eviction. */ - private final DeduplicationQueue<Integer, GridDhtLocalPartition> evictionQueue = new DeduplicationQueue<>(GridDhtLocalPartition::id); - - /** - * Flag indicates that eviction process is running at the moment. - * This is needed to schedule partition eviction if there are no currently running self-scheduling eviction tasks. - * Guarded by {@link #mux}. - */ - private boolean evictionRunning; - - /** Flag indicates that eviction process has stopped. */ - private volatile boolean stop; - - /** Future for currently running partition eviction task. */ - private volatile GridFutureAdapter<Boolean> evictionFut; - - /** Eviction progress frequency in ms. */ - private final long evictionProgressFreqMs = IgniteSystemProperties.getLong(SHOW_EVICTION_PROGRESS_FREQ, - DEFAULT_SHOW_EVICTION_PROGRESS_FREQ_MS); - - /** Next time of show eviction progress. */ - private long nextShowProgressTime; - - /** - * Constructor. - * - * @param grp Cache group context. - */ - public GridDhtPartitionsEvictor(CacheGroupContext grp) { - assert grp != null; - - this.grp = grp; - this.ctx = grp.shared(); - - this.log = ctx.logger(getClass()); - } - - /** - * Adds partition to eviction queue and starts eviction process. - * - * @param part Partition to evict. - */ - public void evictPartitionAsync(GridDhtLocalPartition part) { - if (stop) - return; - - boolean added = evictionQueue.offer(part); - - if (!added) - return; - - synchronized (mux) { - if (!evictionRunning) { - nextShowProgressTime = U.currentTimeMillis() + evictionProgressFreqMs; - - scheduleNextPartitionEviction(); - } - } - } - - /** - * Stops eviction process. 
- * Method awaits last offered partition eviction. - */ - public void stop() { - stop = true; - - synchronized (mux) { - // Wait for last offered partition eviction completion. - IgniteInternalFuture<Boolean> evictionFut0 = evictionFut; - - if (evictionFut0 != null) { - try { - evictionFut0.get(); - } - catch (IgniteCheckedException e) { - if (log.isDebugEnabled()) - log.warning("Failed to await partition eviction during stopping", e); - } - } - } - } - - /** - * Gets next partition from the queue and schedules it for eviction. - */ - private void scheduleNextPartitionEviction() { - if (stop) - return; - - synchronized (mux) { - GridDhtLocalPartition next = evictionQueue.poll(); - - if (next != null) { - showProgress(); - - evictionFut = new GridFutureAdapter<>(); - - ctx.kernalContext().closure().callLocalSafe(new PartitionEvictionTask(next, () -> stop), true); - } - else - evictionRunning = false; - } - } - - /** - * Shows progress of eviction. - */ - private void showProgress() { - if (U.currentTimeMillis() >= nextShowProgressTime) { - int size = evictionQueue.size() + 1; // Queue size plus current partition. - - if (log.isInfoEnabled()) - log.info("Eviction in progress [grp=" + grp.cacheOrGroupName() - + ", remainingPartsCnt=" + size + "]"); - - nextShowProgressTime = U.currentTimeMillis() + evictionProgressFreqMs; - } - } - - /** - * Task for self-scheduled partition eviction / clearing. - */ - private class PartitionEvictionTask implements Callable<Boolean> { - /** Partition to evict. */ - private final GridDhtLocalPartition part; - - /** Eviction context. */ - private final EvictionContext evictionCtx; - - /** - * @param part Partition. - * @param evictionCtx Eviction context. 
- */ - public PartitionEvictionTask(GridDhtLocalPartition part, EvictionContext evictionCtx) { - this.part = part; - this.evictionCtx = evictionCtx; - } - - /** {@inheritDoc} */ - @Override public Boolean call() throws Exception { - if (stop) { - evictionFut.onDone(); - - return false; - } - - try { - boolean success = part.tryClear(evictionCtx); - - if (success) { - if (part.state() == GridDhtPartitionState.EVICTED && part.markForDestroy()) - part.destroy(); - } - else // Re-offer partition if clear was unsuccessful due to partition reservation. - evictionQueue.offer(part); - - // Complete eviction future before schedule new to prevent deadlock with - // simultaneous eviction stopping and scheduling new eviction. - evictionFut.onDone(); - - scheduleNextPartitionEviction(); - - return true; - } - catch (Throwable ex) { - evictionFut.onDone(ex); - - if (ctx.kernalContext().isStopping()) { - LT.warn(log, ex, "Partition eviction failed (current node is stopping).", - false, - true); - } - else - LT.error(log, ex, "Partition eviction failed, this can cause grid hang."); - } - - return false; - } - } - - /** - * Thread-safe blocking queue with items deduplication. - * - * @param <K> Key type of item used for deduplication. - * @param <V> Queue item type. - */ - private static class DeduplicationQueue<K, V> { - /** Queue. */ - private final Queue<V> queue; - - /** Unique items set. */ - private final Set<K> uniqueItems; - - /** Key mapping function. */ - private final Function<V, K> keyMappingFunction; - - /** - * Constructor. - * - * @param keyExtractor Function to extract a key from a queue item. - * This key is used for deduplication if some item has offered twice. - */ - public DeduplicationQueue(Function<V, K> keyExtractor) { - keyMappingFunction = keyExtractor; - queue = new LinkedBlockingQueue<>(); - uniqueItems = new GridConcurrentHashSet<>(); - } - - /** - * Offers item to the queue. - * - * @param item Item. 
- * @return {@code true} if item has been successfully offered to the queue, - * {@code false} if item was rejected because already exists in the queue. - */ - public boolean offer(V item) { - K key = keyMappingFunction.apply(item); - - if (uniqueItems.add(key)) { - queue.offer(item); - - return true; - } - - return false; - } - - /** - * Polls next item from queue. - * - * @return Next item or {@code null} if queue is empty. - */ - public V poll() { - V item = queue.poll(); - - if (item != null) { - K key = keyMappingFunction.apply(item); - - uniqueItems.remove(key); - } - - return item; - } - - /** - * @return Size of queue. - */ - public int size() { - return queue.size(); - } - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/2d63040f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/PartitionsEvictManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/PartitionsEvictManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/PartitionsEvictManager.java new file mode 100644 index 0000000..f76310d --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/PartitionsEvictManager.java @@ -0,0 +1,566 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.dht; + +import java.util.Collection; +import java.util.Comparator; +import java.util.Map; +import java.util.Queue; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.PriorityBlockingQueue; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.managers.communication.GridIoPolicy; +import org.apache.ignite.internal.processors.cache.CacheGroupContext; +import org.apache.ignite.internal.processors.cache.GridCacheSharedManagerAdapter; +import org.apache.ignite.internal.util.GridConcurrentHashSet; +import org.apache.ignite.internal.util.future.GridFutureAdapter; +import org.apache.ignite.internal.util.typedef.internal.LT; +import org.apache.ignite.internal.util.typedef.internal.U; + +import static org.apache.ignite.IgniteSystemProperties.IGNITE_EVICTION_PERMITS; +import static org.apache.ignite.IgniteSystemProperties.getInteger; +import static org.apache.ignite.IgniteSystemProperties.getLong; + +/** + * Class that serves asynchronous part eviction process. + * Multiple partition from group can be evicted at the same time. + */ +public class PartitionsEvictManager extends GridCacheSharedManagerAdapter { + + /** Default eviction progress show frequency. */ + private static final int DEFAULT_SHOW_EVICTION_PROGRESS_FREQ_MS = 2 * 60 * 1000; // 2 Minutes. 
+
+    /** Eviction progress frequency property name. */
+    private static final String SHOW_EVICTION_PROGRESS_FREQ = "SHOW_EVICTION_PROGRESS_FREQ";
+
+    /** Eviction thread pool policy. */
+    private static final byte EVICT_POOL_PLC = GridIoPolicy.SYSTEM_POOL;
+
+    /** Eviction progress frequency in ms. */
+    private final long evictionProgressFreqMs = getLong(SHOW_EVICTION_PROGRESS_FREQ, DEFAULT_SHOW_EVICTION_PROGRESS_FREQ_MS);
+
+    /** Configured number of permits, {@code -1} if {@code IGNITE_EVICTION_PERMITS} is not set. */
+    private final int confPermits = getInteger(IGNITE_EVICTION_PERMITS, -1);
+
+    /** Next time of show eviction progress. Guarded by {@link #mux}. */
+    private long nextShowProgressTime;
+
+    /** Eviction contexts of cache groups with pending partition evictions, keyed by group ID. */
+    private final Map<Integer, GroupEvictionContext> evictionGroupsMap = new ConcurrentHashMap<>();
+
+    /** Flag indicates that eviction process has stopped. */
+    private volatile boolean stop;
+
+    /** Check stop eviction context. */
+    private final EvictionContext sharedEvictionContext = () -> stop;
+
+    /** Number of maximum concurrent operations. */
+    private volatile int threads;
+
+    /** How many more eviction tasks may be started concurrently. Guarded by {@link #mux}. */
+    private volatile int permits;
+
+    /**
+     * Bucket queue for load balance partitions to the threads via count of partition size.
+     * Is not thread-safe: all methods must be called under {@link #mux} synchronization.
+     */
+    private volatile BucketQueue evictionQueue;
+
+    /** Lock object. */
+    private final Object mux = new Object();
+
+    /**
+     * Stops eviction process for group.
+     *
+     * Method awaits last offered partition eviction.
+     *
+     * @param grp Group context.
+     */
+    public void onCacheGroupStopped(CacheGroupContext grp) {
+        GroupEvictionContext groupEvictionContext = evictionGroupsMap.remove(grp.groupId());
+
+        if (groupEvictionContext != null) {
+            groupEvictionContext.stop();
+
+            groupEvictionContext.awaitFinishAll();
+        }
+    }
+
+    /**
+     * Adds partition to eviction queue and starts eviction process if permit available.
+     *
+     * @param grp Group context.
+     * @param part Partition to evict.
+     */
+    public void evictPartitionAsync(CacheGroupContext grp, GridDhtLocalPartition part) {
+        // Check node stop.
+        if (sharedEvictionContext.shouldStop())
+            return;
+
+        GroupEvictionContext groupEvictionContext = evictionGroupsMap.computeIfAbsent(
+            grp.groupId(), (k) -> new GroupEvictionContext(grp));
+
+        PartitionEvictionTask evictionTask = groupEvictionContext.createEvictPartitionTask(part);
+
+        // Partition is already queued or the group is stopping.
+        if (evictionTask == null)
+            return;
+
+        int bucket;
+
+        synchronized (mux) {
+            bucket = evictionQueue.offer(evictionTask);
+        }
+
+        scheduleNextPartitionEviction(bucket);
+    }
+
+    /**
+     * Polls queued eviction tasks (preferring the given bucket) and submits them to the
+     * eviction pool while permits are available. Each submitted task consumes exactly one
+     * permit, which is returned when the task's finish future completes.
+     *
+     * @param bucket Bucket to poll first.
+     */
+    private void scheduleNextPartitionEviction(int bucket) {
+        // Check node stop.
+        if (sharedEvictionContext.shouldStop())
+            return;
+
+        synchronized (mux) {
+            // Strictly "> 0": the permit is consumed below, so ">= 0" would over-subscribe by one task.
+            while (permits > 0) {
+                // If queue is empty there is nothing to do.
+                if (evictionQueue.isEmpty())
+                    return;
+
+                // Get task from bucket, or from any other bucket if this one is empty.
+                PartitionEvictionTask evictionTask = evictionQueue.poll(bucket);
+
+                if (evictionTask == null)
+                    evictionTask = evictionQueue.pollAny();
+
+                // If task not found no need to do some.
+                if (evictionTask == null)
+                    return;
+
+                // Print current eviction progress.
+                showProgress();
+
+                GroupEvictionContext groupEvictionContext = evictionTask.groupEvictionContext;
+
+                // Skip tasks of stopping groups or node; they have not taken a permit.
+                if (groupEvictionContext.shouldStop())
+                    continue;
+
+                // Get permit for this task.
+                permits--;
+
+                // Register task future, may need if group or node will be stopped.
+                groupEvictionContext.taskScheduled(evictionTask);
+
+                evictionTask.finishFut.listen(f -> {
+                    synchronized (mux) {
+                        // Return permit after task completed.
+                        permits++;
+                    }
+
+                    // Re-schedule new one task from the same bucket.
+                    scheduleNextPartitionEviction(bucket);
+                });
+
+                // Submit task to executor.
+                cctx.kernalContext()
+                    .closure()
+                    .runLocalSafe(evictionTask, EVICT_POOL_PLC);
+            }
+        }
+    }
+
+    /**
+     * Shows progress of eviction. Must be called under {@link #mux}.
+     */
+    private void showProgress() {
+        if (U.currentTimeMillis() >= nextShowProgressTime) {
+            int size = evictionQueue.size() + 1; // Queue size plus current partition.
+
+            if (log.isInfoEnabled())
+                log.info("Eviction in progress [permits=" + permits +
+                    ", threads=" + threads +
+                    ", groups=" + evictionGroupsMap.size() +
+                    ", remainingPartsToEvict=" + size + "]");
+
+            evictionGroupsMap.values().forEach(GroupEvictionContext::showProgress);
+
+            nextShowProgressTime = U.currentTimeMillis() + evictionProgressFreqMs;
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void start0() throws IgniteCheckedException {
+        super.start0();
+
+        // If property is not setup, calculate permits as parts of sys pool.
+        if (confPermits == -1) {
+            int sysPoolSize = cctx.kernalContext().config().getSystemThreadPoolSize();
+
+            threads = permits = sysPoolSize / 4;
+        }
+        else
+            threads = permits = confPermits;
+
+        // Avoid 0 (or negative) permits if sys pool size is less than 4 or the property is misconfigured;
+        // a negative value would also make the bucket array allocation fail.
+        if (threads <= 0)
+            threads = permits = 1;
+
+        if (log.isInfoEnabled())
+            log.info("Evict partition permits=" + permits);
+
+        evictionQueue = new BucketQueue(threads);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void stop0(boolean cancel) {
+        super.stop0(cancel);
+
+        stop = true;
+
+        Collection<GroupEvictionContext> evictionGrps = evictionGroupsMap.values();
+
+        evictionGrps.forEach(GroupEvictionContext::stop);
+
+        evictionGrps.forEach(GroupEvictionContext::awaitFinishAll);
+    }
+
+    /**
+     * Eviction state of a single cache group.
+     */
+    private class GroupEvictionContext implements EvictionContext {
+        /** */
+        private final CacheGroupContext grp;
+
+        /** Deduplicate set of ids of partitions queued (not yet scheduled) for eviction. */
+        private final Set<Integer> partIds = new GridConcurrentHashSet<>();
+
+        /** Futures of currently running partition eviction tasks, by partition id. */
+        private final Map<Integer, IgniteInternalFuture<?>> partsEvictFutures = new ConcurrentHashMap<>();
+
+        /** Flag indicates that eviction process has stopped for this group. */
+        private volatile boolean stop;
+
+        /** Total partition to evict. */
+        private final AtomicInteger totalTasks = new AtomicInteger();
+
+        /** Total partition evict in progress. Guarded by {@code this}. */
+        private int taskInProgress;
+
+        /**
+         * @param grp Group context.
+         */
+        private GroupEvictionContext(CacheGroupContext grp) {
+            this.grp = grp;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean shouldStop() {
+            return stop || sharedEvictionContext.shouldStop();
+        }
+
+        /**
+         * Creates eviction task for partition unless it is already queued or group is stopping.
+         *
+         * @param part Grid local partition.
+         * @return New eviction task, or {@code null} if partition must not be queued.
+         */
+        private PartitionEvictionTask createEvictPartitionTask(GridDhtLocalPartition part) {
+            if (shouldStop() || !partIds.add(part.id()))
+                return null;
+
+            totalTasks.incrementAndGet();
+
+            return new PartitionEvictionTask(part, this);
+        }
+
+        /**
+         * Registers a task that has just been taken from the queue for execution.
+         *
+         * @param task Partition eviction task.
+         */
+        private synchronized void taskScheduled(PartitionEvictionTask task) {
+            if (shouldStop())
+                return;
+
+            taskInProgress++;
+
+            GridFutureAdapter<?> fut = task.finishFut;
+
+            int partId = task.part.id();
+
+            // Partition has left the queue, so it may be offered again
+            // (e.g. re-offered after an unsuccessful clear).
+            partIds.remove(partId);
+
+            partsEvictFutures.put(partId, fut);
+
+            fut.listen(f -> {
+                synchronized (this) {
+                    taskInProgress--;
+
+                    partsEvictFutures.remove(partId, f);
+
+                    // Only remove this context, never a newer one registered for the same group.
+                    if (totalTasks.decrementAndGet() == 0)
+                        evictionGroupsMap.remove(grp.groupId(), this);
+                }
+            });
+        }
+
+        /**
+         * Stop eviction for group.
+         */
+        private void stop() {
+            stop = true;
+        }
+
+        /**
+         * Await evict finish.
+         */
+        private void awaitFinishAll() {
+            partsEvictFutures.forEach(this::awaitFinish);
+
+            evictionGroupsMap.remove(grp.groupId(), this);
+        }
+
+        /**
+         * Await evict finish partition.
+         *
+         * @param part Partition id.
+         * @param fut Eviction future of the partition.
+         */
+        private void awaitFinish(Integer part, IgniteInternalFuture<?> fut) {
+            // Wait for last offered partition eviction completion
+            try {
+                if (log.isInfoEnabled())
+                    log.info("Await partition evict, grpName=" + grp.cacheOrGroupName() +
+                        ", grpId=" + grp.groupId() + ", partId=" + part);
+
+                fut.get();
+            }
+            catch (IgniteCheckedException e) {
+                if (log.isDebugEnabled())
+                    log.warning("Failed to await partition eviction during stopping.", e);
+            }
+        }
+
+        /**
+         * Shows progress group of eviction.
+         */
+        private void showProgress() {
+            if (log.isInfoEnabled())
+                log.info("Group eviction in progress [grpName=" + grp.cacheOrGroupName() +
+                    ", grpId=" + grp.groupId() +
+                    ", remainingPartsToEvict=" + (totalTasks.get() - taskInProgress) +
+                    ", partsEvictInProgress=" + taskInProgress +
+                    ", totalParts= " + grp.topology().localPartitions().size() + "]");
+        }
+    }
+
+    /**
+     * Task for self-scheduled partition eviction / clearing.
+     */
+    private class PartitionEvictionTask implements Runnable {
+        /** Partition to evict. */
+        private final GridDhtLocalPartition part;
+
+        /** Partition size at task creation, used to balance buckets. */
+        private final long size;
+
+        /** Eviction context. */
+        private final GroupEvictionContext groupEvictionContext;
+
+        /** Completed when this task finishes: successfully, with error, or skipped due to stop. */
+        private final GridFutureAdapter<?> finishFut = new GridFutureAdapter<>();
+
+        /**
+         * @param part Partition.
+         * @param groupEvictionContext Eviction context of the partition's group.
+         */
+        private PartitionEvictionTask(
+            GridDhtLocalPartition part,
+            GroupEvictionContext groupEvictionContext
+        ) {
+            this.part = part;
+            this.groupEvictionContext = groupEvictionContext;
+
+            size = part.fullSize();
+        }
+
+        /** {@inheritDoc} */
+        @Override public void run() {
+            if (groupEvictionContext.shouldStop()) {
+                // Always complete the future: it returns the permit and unblocks awaitFinishAll().
+                finishFut.onDone();
+
+                return;
+            }
+
+            try {
+                boolean success = part.tryClear(groupEvictionContext);
+
+                if (success) {
+                    if (part.state() == GridDhtPartitionState.EVICTED && part.markForDestroy())
+                        part.destroy();
+                }
+
+                // Complete eviction future before schedule new to prevent deadlock with
+                // simultaneous eviction stopping and scheduling new eviction.
+                finishFut.onDone();
+
+                // Re-offer partition as a new task if clear was unsuccessful due to partition reservation.
+                // This task's future is already completed, so the task itself must never be queued again.
+                if (!success)
+                    evictPartitionAsync(groupEvictionContext.grp, part);
+            }
+            catch (Throwable ex) {
+                finishFut.onDone(ex);
+
+                if (cctx.kernalContext().isStopping()) {
+                    LT.warn(log, ex, "Partition eviction failed (current node is stopping).",
+                        false,
+                        true);
+                }
+                else
+                    LT.error(log, ex, "Partition eviction failed, this can cause grid hang.");
+            }
+        }
+    }
+
+    /**
+     * Eviction task queue split into buckets; a new task goes to the bucket with the smallest
+     * total queued partition size. Not thread-safe.
+     */
+    private class BucketQueue {
+        /** Queues contains partitions scheduled for eviction. */
+        private final Queue<PartitionEvictionTask>[] buckets;
+
+        /** Total size of the partitions queued in each bucket. */
+        private final long[] bucketSizes;
+
+        /**
+         * @param buckets Number of buckets.
+         */
+        BucketQueue(int buckets) {
+            this.buckets = new Queue[buckets];
+
+            for (int i = 0; i < buckets; i++)
+                this.buckets[i] = createEvictPartitionQueue();
+
+            bucketSizes = new long[buckets];
+        }
+
+        /**
+         * Poll eviction task from queue for specific bucket.
+         *
+         * @param bucket Bucket index.
+         * @return Partition evict task, or {@code null} if bucket queue is empty.
+         */
+        PartitionEvictionTask poll(int bucket) {
+            PartitionEvictionTask task = buckets[bucket].poll();
+
+            if (task != null)
+                bucketSizes[bucket] -= task.size;
+
+            return task;
+        }
+
+        /**
+         * Poll eviction task from queue (bucket is not specific).
+         *
+         * @return Partition evict task, or {@code null} if all buckets are empty.
+         */
+        PartitionEvictionTask pollAny() {
+            for (int bucket = 0; bucket < bucketSizes.length; bucket++) {
+                if (!buckets[bucket].isEmpty())
+                    return poll(bucket);
+            }
+
+            return null;
+        }
+
+        /**
+         * Offer task to queue.
+         *
+         * @param task Eviction task.
+         * @return Bucket index.
+         */
+        int offer(PartitionEvictionTask task) {
+            int bucket = calculateBucket();
+
+            buckets[bucket].offer(task);
+
+            bucketSizes[bucket] += task.size;
+
+            return bucket;
+        }
+
+        /**
+         * @return {@code True} if queue is empty, {@code false} if not empty.
+         */
+        boolean isEmpty() {
+            return size() == 0;
+        }
+
+        /**
+         * @return Queue size.
+         */
+        int size() {
+            int size = 0;
+
+            for (Queue<PartitionEvictionTask> queue : buckets)
+                size += queue.size();
+
+            return size;
+        }
+
+        /**
+         * @return Index of the bucket with the smallest total queued partition size.
+         */
+        private int calculateBucket() {
+            int min = 0;
+
+            for (int bucket = min; bucket < bucketSizes.length; bucket++) {
+                if (bucketSizes[min] > bucketSizes[bucket])
+                    min = bucket;
+            }
+
+            return min;
+        }
+
+        /**
+         * 0 - PRIORITY QUEUE (compare by partition size).
+         * default (any other values) - FIFO.
+         */
+        private static final byte QUEUE_TYPE = 1;
+
+        /**
+         *
+         * @return Queue for evict partitions.
+ */ + private Queue<PartitionEvictionTask> createEvictPartitionQueue() { + switch (QUEUE_TYPE) { + case 1: + return new PriorityBlockingQueue<>( + 1000, Comparator.comparingLong(p -> p.part.fullSize())); + default: + return new LinkedBlockingQueue<>(); + } + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/2d63040f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java index eeb52b3..aae9775 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java @@ -322,7 +322,7 @@ public class IgniteWalIteratorFactory { kernalCtx, null, null, null, null, null, null, dbMgr, null, null, null, null, null, - null, null, null + null, null,null, null ); } http://git-wip-us.apache.org/repos/asf/ignite/blob/2d63040f/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalIteratorSwitchSegmentTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalIteratorSwitchSegmentTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalIteratorSwitchSegmentTest.java index 00ed6f1..9dbef5d 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalIteratorSwitchSegmentTest.java +++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalIteratorSwitchSegmentTest.java @@ -142,6 +142,7 @@ public class IgniteWalIteratorSwitchSegmentTest extends GridCommonAbstractTest { null, null, null, + null, null) ).createSerializer(serVer); @@ -370,6 +371,7 @@ public class IgniteWalIteratorSwitchSegmentTest extends GridCommonAbstractTest { new GridCacheIoManager(), null, null, + null, null ); http://git-wip-us.apache.org/repos/asf/ignite/blob/2d63040f/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreePageMemoryImplTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreePageMemoryImplTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreePageMemoryImplTest.java index 3737204..7719b43 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreePageMemoryImplTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreePageMemoryImplTest.java @@ -60,6 +60,7 @@ public class BPlusTreePageMemoryImplTest extends BPlusTreeSelfTest { null, null, null, + null, null ); http://git-wip-us.apache.org/repos/asf/ignite/blob/2d63040f/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreeReuseListPageMemoryImplTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreeReuseListPageMemoryImplTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreeReuseListPageMemoryImplTest.java index c36ecce..71eb129 100644 --- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreeReuseListPageMemoryImplTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreeReuseListPageMemoryImplTest.java @@ -62,6 +62,7 @@ public class BPlusTreeReuseListPageMemoryImplTest extends BPlusTreeReuseSelfTest null, null, null, + null, null ); http://git-wip-us.apache.org/repos/asf/ignite/blob/2d63040f/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IndexStoragePageMemoryImplTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IndexStoragePageMemoryImplTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IndexStoragePageMemoryImplTest.java index 9087b1c..43fbb6e 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IndexStoragePageMemoryImplTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IndexStoragePageMemoryImplTest.java @@ -75,6 +75,7 @@ public class IndexStoragePageMemoryImplTest extends IndexStorageSelfTest { null, null, null, + null, null ); http://git-wip-us.apache.org/repos/asf/ignite/blob/2d63040f/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplNoLoadTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplNoLoadTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplNoLoadTest.java index 34fd93b..52aff0c 100644 --- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplNoLoadTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplNoLoadTest.java @@ -65,6 +65,7 @@ public class PageMemoryImplNoLoadTest extends PageMemoryNoLoadSelfTest { null, null, null, + null, null ); http://git-wip-us.apache.org/repos/asf/ignite/blob/2d63040f/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplTest.java index 3697c4c..000131a 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplTest.java @@ -297,6 +297,7 @@ public class PageMemoryImplTest extends GridCommonAbstractTest { null, null, null, + null, null ); http://git-wip-us.apache.org/repos/asf/ignite/blob/2d63040f/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java b/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java index 3933953..64d29e6 100644 --- a/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java +++ b/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java @@ -37,6 +37,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheSharedTtlCleanupMana import 
org.apache.ignite.internal.processors.cache.GridCacheTtlManager; import org.apache.ignite.internal.processors.cache.WalStateManager; import org.apache.ignite.internal.processors.cache.datastructures.CacheDataStructuresManager; +import org.apache.ignite.internal.processors.cache.distributed.dht.PartitionsEvictManager; import org.apache.ignite.internal.processors.cache.dr.GridOsCacheDrManager; import org.apache.ignite.internal.processors.cache.jta.CacheNoopJtaManager; import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager; @@ -78,6 +79,7 @@ public class GridCacheTestContext<K, V> extends GridCacheContext<K, V> { new CacheAffinitySharedManager<K, V>(), new GridCacheIoManager(), new GridCacheSharedTtlCleanupManager(), + new PartitionsEvictManager(), new CacheNoopJtaManager(), null ),