This is an automated email from the ASF dual-hosted git repository. irakov pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push: new e17887b IGNITE-12654 Some of rentingFutures in GridDhtPartitionTopologyImpl may accumulate a huge number of eviction callbacks - Fixes #7399. e17887b is described below commit e17887bfbff7ddf8d58b9c376acca0382a184c15 Author: Slava Koptilin <slava.kopti...@gmail.com> AuthorDate: Wed Feb 12 20:43:26 2020 +0300 IGNITE-12654 Some of rentingFutures in GridDhtPartitionTopologyImpl may accumulate a huge number of eviction callbacks - Fixes #7399. Signed-off-by: Ivan Rakov <ira...@apache.org> --- .../dht/topology/GridDhtLocalPartition.java | 16 ++- .../dht/topology/GridDhtPartitionTopologyImpl.java | 28 +++-- ...eScheduleResendPartitionsAfterEvictionTest.java | 135 +++++++++++++++++++++ .../ignite/testsuites/IgniteCacheTestSuite7.java | 2 + 4 files changed, 171 insertions(+), 10 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtLocalPartition.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtLocalPartition.java index ff0b0f2..13b7811 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtLocalPartition.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtLocalPartition.java @@ -653,12 +653,26 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements * @return Future to signal that this node is no longer an owner or backup. */ public IgniteInternalFuture<?> rent(boolean updateSeq) { + return rent(updateSeq, true); + } + + /** + * Initiates partition eviction process. + * + * If partition has reservations, eviction will be delayed and continued after all reservations will be released. + * + * @param updateSeq If {@code true} topology update sequence will be updated after eviction is finished. 
+ * @param alwaysReturnRentingFut If {@code true} renting future is returned in any way. + * @return Future to signal that this node is no longer an owner or backup or null if corresponding partition + * state is {@code RENTING} or {@code EVICTED}. + */ + public IgniteInternalFuture<?> rent(boolean updateSeq, boolean alwaysReturnRentingFut) { long state0 = this.state.get(); GridDhtPartitionState partState = getPartState(state0); if (partState == RENTING || partState == EVICTED) - return rent; + return alwaysReturnRentingFut ? rent : null; delayedRentingTopVer = ctx.exchange().readyAffinityVersion().topologyVersion(); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java index 4156878..d1eae4f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java @@ -70,6 +70,7 @@ import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.SB; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteInClosure; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; @@ -2436,6 +2437,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { * @param aff Affinity assignments. * @return {@code True} if there are local partitions need to be evicted. 
*/ + @SuppressWarnings("unchecked") private boolean checkEvictions(long updateSeq, AffinityAssignment aff) { if (!ctx.kernalContext().state().evictionsAllowed()) return false; @@ -2465,9 +2467,11 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { if (nodeIds.containsAll(F.nodeIds(affNodes))) { GridDhtPartitionState state0 = part.state(); - IgniteInternalFuture<?> rentFut = part.rent(false); + // There is no need to track a renting future of a partition which is already renting/evicted. + IgniteInternalFuture<?> rentFut = part.rent(false, false); - rentingFutures.add(rentFut); + if (rentFut != null) + rentingFutures.add(rentFut); updateSeq = updateLocal(part.id(), part.state(), updateSeq, aff.topologyVersion()); @@ -2496,9 +2500,12 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { if (locId.equals(n.id())) { GridDhtPartitionState state0 = part.state(); - IgniteInternalFuture<?> rentFut = part.rent(false); + // There is no need to track a renting future of a partition + // which is already renting/evicted. 
+ IgniteInternalFuture<?> rentFut = part.rent(false, false); - rentingFutures.add(rentFut); + if (rentFut != null) + rentingFutures.add(rentFut); updateSeq = updateLocal(part.id(), part.state(), updateSeq, aff.topologyVersion()); @@ -2522,15 +2529,15 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { if (!rentingFutures.isEmpty()) { final AtomicInteger rentingPartitions = new AtomicInteger(rentingFutures.size()); - for (IgniteInternalFuture<?> rentingFuture : rentingFutures) { - rentingFuture.listen(f -> { + IgniteInClosure c = new IgniteInClosure() { + @Override public void apply(Object o) { int remaining = rentingPartitions.decrementAndGet(); if (remaining == 0) { lock.writeLock().lock(); try { - this.updateSeq.incrementAndGet(); + GridDhtPartitionTopologyImpl.this.updateSeq.incrementAndGet(); if (log.isDebugEnabled()) log.debug("Partitions have been scheduled to resend [reason=" + @@ -2542,8 +2549,11 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { lock.writeLock().unlock(); } } - }); - } + } + }; + + for (IgniteInternalFuture<?> rentingFuture : rentingFutures) + rentingFuture.listen(c); } return hasEvictedPartitions; diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheScheduleResendPartitionsAfterEvictionTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheScheduleResendPartitionsAfterEvictionTest.java new file mode 100644 index 0000000..009ab31 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheScheduleResendPartitionsAfterEvictionTest.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.dht; + +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.ignite.IgniteDataStreamer; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.DataRegionConfiguration; +import org.apache.ignite.configuration.DataStorageConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.processors.cache.CacheGroupContext; +import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition; +import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopologyImpl; +import org.apache.ignite.internal.processors.cache.distributed.dht.topology.PartitionsEvictManagerAbstractTest; +import org.apache.ignite.internal.util.IgniteUtils; +import org.apache.ignite.internal.util.future.GridFutureAdapter; +import org.apache.ignite.internal.util.typedef.internal.CU; +import org.apache.ignite.testframework.GridTestUtils; +import org.junit.Test; + +import static org.apache.ignite.cluster.ClusterState.ACTIVE; + +/** + * + */ +public class GridCacheScheduleResendPartitionsAfterEvictionTest extends 
PartitionsEvictManagerAbstractTest { + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + cfg.setDataStorageConfiguration(new DataStorageConfiguration() + .setDefaultDataRegionConfiguration(new DataRegionConfiguration().setPersistenceEnabled(false))); + + CacheConfiguration cc = new CacheConfiguration(DEFAULT_CACHE_NAME).setBackups(1); + + cfg.setCacheConfiguration(cc); + + return cfg; + } + + /** + * Checks that listeners that schedule resending of partitions after eviction are not added to the renting future + * uncontrollably. At most one listener is expected. + * + * @throws Exception If failed. + */ + @Test + @SuppressWarnings("unchecked") + public void testRentingFuturesListenersNotGrowingUncontrollably() throws Exception { + IgniteEx node1 = startGrid(0); + + startGrid(1); + + node1.cluster().baselineAutoAdjustEnabled(false); + + CountDownLatch latch = new CountDownLatch(1); + + subscribeEvictionQueueAtLatch(node1, latch, false); + + node1.cluster().state(ACTIVE); + + node1.getOrCreateCache(DEFAULT_CACHE_NAME); + try (IgniteDataStreamer streamer = node1.dataStreamer(DEFAULT_CACHE_NAME)) { + streamer.allowOverwrite(true); + + for (int i = 0; i < 100_000; i++) + streamer.addData(i, i); + } + + final AtomicInteger nodeIdx = new AtomicInteger(1); + + IgniteInternalFuture res = GridTestUtils.runMultiThreadedAsync(() -> { + try { + startGrid(nodeIdx.incrementAndGet()); + resetBaselineTopology(); + } + catch (Exception e) { + fail("Failed to start rebalance."); + } + }, + 3, + "rebalanceThread"); + + // Give some time for rebalance to start. + Thread.sleep(3000); + + latch.countDown(); + + // And some extra time for collecting callbacks. 
+ Thread.sleep(100); + + CacheGroupContext grpCtx = node1.context().cache().cacheGroup(CU.cacheId(DEFAULT_CACHE_NAME)); + + assertNotNull(grpCtx); + + GridDhtPartitionTopologyImpl top = (GridDhtPartitionTopologyImpl)grpCtx.topology(); + + List<GridDhtLocalPartition> locParts = top.localPartitions(); + + for (GridDhtLocalPartition localPartition : locParts) { + GridFutureAdapter partRentFut = IgniteUtils.field(localPartition, "rent"); + + int lsnrCnt = 0; + for (Object waitNode = IgniteUtils.field(partRentFut, "state"); + waitNode != null; waitNode = IgniteUtils.field(waitNode, "next")) { + if (IgniteUtils.field(waitNode, "val") != null) + lsnrCnt++; + } + + // At most one listener is expected. + assertTrue(lsnrCnt <= 1); + } + + res.get(); + } +} diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite7.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite7.java index 0bfbc4b..1cdd5fc 100755 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite7.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite7.java @@ -41,6 +41,7 @@ import org.apache.ignite.internal.processors.cache.distributed.Cache64kPartition import org.apache.ignite.internal.processors.cache.distributed.CacheDataLossOnPartitionMoveTest; import org.apache.ignite.internal.processors.cache.distributed.CachePartitionLostWhileClearingTest; import org.apache.ignite.internal.processors.cache.distributed.CacheRentingStateRepairTest; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridCacheScheduleResendPartitionsAfterEvictionTest; import org.apache.ignite.internal.processors.cache.distributed.dht.IgniteCacheStartWithLoadTest; import org.apache.ignite.internal.processors.cache.distributed.rebalancing.GridCacheRebalancingPartitionCountersTest; import org.apache.ignite.internal.processors.cache.distributed.rebalancing.GridCacheRebalancingWithAsyncClearingTest; @@ -97,6 
+98,7 @@ public class IgniteCacheTestSuite7 { GridTestUtils.addTestIfNeeded(suite, Cache64kPartitionsTest.class, ignoredTests); GridTestUtils.addTestIfNeeded(suite, GridCacheRebalancingPartitionCountersTest.class, ignoredTests); GridTestUtils.addTestIfNeeded(suite, GridCacheRebalancingWithAsyncClearingTest.class, ignoredTests); + GridTestUtils.addTestIfNeeded(suite, GridCacheScheduleResendPartitionsAfterEvictionTest.class, ignoredTests); GridTestUtils.addTestIfNeeded(suite, IgnitePdsCacheAssignmentNodeRestartsTest.class, ignoredTests); GridTestUtils.addTestIfNeeded(suite, TxRollbackAsyncWithPersistenceTest.class, ignoredTests);