Repository: ignite Updated Branches: refs/heads/master 03cac751c -> aec3f91c2
IGNITE-9756 Fixed partition eviction deduplication logic - Fixes #4936. Signed-off-by: Alexey Goncharuk <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/aec3f91c Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/aec3f91c Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/aec3f91c Branch: refs/heads/master Commit: aec3f91c22c26d3ceb56769d6d7a13a92c9036d4 Parents: 03cac75 Author: pereslegin-pa <[email protected]> Authored: Wed Oct 17 19:58:29 2018 +0300 Committer: Alexey Goncharuk <[email protected]> Committed: Wed Oct 17 19:58:29 2018 +0300 ---------------------------------------------------------------------- .../dht/topology/PartitionsEvictManager.java | 49 ++++++++------------ 1 file changed, 20 insertions(+), 29 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/aec3f91c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/PartitionsEvictManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/PartitionsEvictManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/PartitionsEvictManager.java index 7f2a2a7..404e194 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/PartitionsEvictManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/PartitionsEvictManager.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.topology; import java.util.Collection; import java.util.Comparator; +import java.util.HashSet; import java.util.Map; import java.util.Queue; import java.util.Set; @@ -31,7 +32,6 @@ import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.managers.communication.GridIoPolicy; import org.apache.ignite.internal.processors.cache.CacheGroupContext; import org.apache.ignite.internal.processors.cache.GridCacheSharedManagerAdapter; -import org.apache.ignite.internal.util.GridConcurrentHashSet; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.typedef.internal.LT; import org.apache.ignite.internal.util.typedef.internal.U; @@ -111,28 +111,28 @@ public class PartitionsEvictManager extends GridCacheSharedManagerAdapter { * @param part Partition to evict. */ public void evictPartitionAsync(CacheGroupContext grp, GridDhtLocalPartition part) { - // Check node stop. - if (sharedEvictionContext.shouldStop()) - return; - GroupEvictionContext groupEvictionContext = evictionGroupsMap.computeIfAbsent( grp.groupId(), (k) -> new GroupEvictionContext(grp)); - PartitionEvictionTask evictionTask = groupEvictionContext.createEvictPartitionTask(part); - - if (evictionTask == null) + // Check node stop. + if (groupEvictionContext.shouldStop()) return; - if (log.isDebugEnabled()) - log.debug("Partition has been scheduled for eviction [grp=" + grp.cacheOrGroupName() - + ", p=" + part.id() + ", state=" + part.state() + "]"); - int bucket; synchronized (mux) { - bucket = evictionQueue.offer(evictionTask); + if (!groupEvictionContext.partIds.add(part.id())) + return; + + bucket = evictionQueue.offer(new PartitionEvictionTask(part, groupEvictionContext)); } + groupEvictionContext.totalTasks.incrementAndGet(); + + if (log.isDebugEnabled()) + log.debug("Partition has been scheduled for eviction [grp=" + grp.cacheOrGroupName() + + ", p=" + part.id() + ", state=" + part.state() + "]"); + scheduleNextPartitionEviction(bucket); } @@ -271,7 +271,7 @@ public class PartitionsEvictManager extends GridCacheSharedManagerAdapter { private final CacheGroupContext grp; /** Deduplicate set partition ids. */ - private final Set<Integer> partIds = new GridConcurrentHashSet<>(); + private final Set<Integer> partIds = new HashSet<>(); /** Future for currently running partition eviction task. */ private final Map<Integer, IgniteInternalFuture<?>> partsEvictFutures = new ConcurrentHashMap<>(); @@ -299,19 +299,6 @@ public class PartitionsEvictManager extends GridCacheSharedManagerAdapter { /** * - * @param part Grid local partition. - */ - private PartitionEvictionTask createEvictPartitionTask(GridDhtLocalPartition part){ - if (shouldStop() || !partIds.add(part.id())) - return null; - - totalTasks.incrementAndGet(); - - return new PartitionEvictionTask(part, this); - } - - /** - * * @param task Partition eviction task. */ private synchronized void taskScheduled(PartitionEvictionTask task) { @@ -324,6 +311,8 @@ public class PartitionsEvictManager extends GridCacheSharedManagerAdapter { int partId = task.part.id(); + partIds.remove(partId); + partsEvictFutures.put(partId, fut); fut.listen(f -> { @@ -429,12 +418,14 @@ public class PartitionsEvictManager extends GridCacheSharedManagerAdapter { if (part.state() == GridDhtPartitionState.EVICTED && part.markForDestroy()) part.destroy(); } - else // Re-offer partition if clear was unsuccessful due to partition reservation. - evictionQueue.offer(this); // Complete eviction future before schedule new to prevent deadlock with // simultaneous eviction stopping and scheduling new eviction. finishFut.onDone(); + + // Re-offer partition if clear was unsuccessful due to partition reservation. + if (!success) + evictPartitionAsync(groupEvictionCtx.grp, part); } catch (Throwable ex) { finishFut.onDone(ex);
