IGNITE-9479 Fixed spontaneous rebalance during cache start and improved logging level - Fixes #4691.
Signed-off-by: Alexey Goncharuk <alexey.goncha...@gmail.com> Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d5432c00 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d5432c00 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d5432c00 Branch: refs/heads/ignite-5960 Commit: d5432c005c378edaaa1f9a21a42241ecea15023e Parents: 445d375 Author: Alexey Goncharuk <alexey.goncha...@gmail.com> Authored: Thu Sep 6 16:30:00 2018 +0300 Committer: Alexey Goncharuk <alexey.goncha...@gmail.com> Committed: Thu Sep 6 16:30:00 2018 +0300 ---------------------------------------------------------------------- .../dht/preloader/GridDhtForceKeysFuture.java | 36 ++-- .../dht/preloader/GridDhtPartitionDemander.java | 22 +-- .../dht/preloader/GridDhtPartitionSupplier.java | 4 +- .../GridDhtPartitionsExchangeFuture.java | 10 +- .../dht/preloader/GridDhtPreloader.java | 43 ++--- .../dht/IgniteCacheStartWithLoadTest.java | 165 +++++++++++++++++++ .../testsuites/IgniteCacheTestSuite7.java | 2 + 7 files changed, 213 insertions(+), 69 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/d5432c00/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java index 52691f0..3b03958 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java @@ -274,8 +274,8 @@ public final class GridDhtForceKeysFuture<K, V> extends GridCompoundFuture<Objec assert !n.id().equals(loc.id()); - if (log.isDebugEnabled()) - log.debug("Sending force key request [cacheName=" + cctx.name() + "node=" + n.id() + + if (log.isTraceEnabled()) + log.trace("Sending force key request [cacheName=" + cctx.name() + "node=" + n.id() + ", req=" + req + ']'); cctx.io().send(n, req, cctx.ioPolicy()); @@ -310,10 +310,10 @@ public final class GridDhtForceKeysFuture<K, V> extends GridCompoundFuture<Objec try { if (e != null && !e.isNewLocked()) { - if (log.isDebugEnabled()) { + if (log.isTraceEnabled()) { int part = cctx.affinity().partition(key); - log.debug("Will not rebalance key (entry is not new) [cacheName=" + cctx.name() + + log.trace("Will not rebalance key (entry is not new) [cacheName=" + cctx.name() + ", key=" + key + ", part=" + part + ", locId=" + cctx.nodeId() + ']'); } @@ -322,8 +322,8 @@ public final class GridDhtForceKeysFuture<K, V> extends GridCompoundFuture<Objec } } catch (GridCacheEntryRemovedException ignore) { - if (log.isDebugEnabled()) - log.debug("Received removed DHT entry for force keys request [entry=" + e + + if (log.isTraceEnabled()) + log.trace("Received removed DHT entry for force keys request [entry=" + e + ", locId=" + cctx.nodeId() + ']'); } @@ -333,8 +333,8 @@ public final class GridDhtForceKeysFuture<K, V> extends GridCompoundFuture<Objec new ArrayList<>(F.view(top.owners(part, topVer), F.notIn(exc))); if (owners.isEmpty() || (owners.contains(loc) && cctx.rebalanceEnabled())) { - if (log.isDebugEnabled()) - log.debug("Will not rebalance key (local node is owner) [key=" + key + ", part=" + part + + if (log.isTraceEnabled()) + log.trace("Will not rebalance key (local node is owner) [key=" + key + ", part=" + part + "topVer=" + topVer + ", locId=" + cctx.nodeId() + ']'); // Key is already rebalanced. @@ -344,8 +344,8 @@ public final class GridDhtForceKeysFuture<K, V> extends GridCompoundFuture<Objec // Create partition. GridDhtLocalPartition locPart = top.localPartition(part, topVer, false); - if (log.isDebugEnabled()) - log.debug("Mapping local partition [loc=" + cctx.localNodeId() + ", topVer" + topVer + + if (log.isTraceEnabled()) + log.trace("Mapping local partition [loc=" + cctx.localNodeId() + ", topVer" + topVer + ", part=" + locPart + ", owners=" + owners + ", allOwners=" + U.toShortString(top.owners(part)) + ']'); if (locPart == null) @@ -362,8 +362,8 @@ public final class GridDhtForceKeysFuture<K, V> extends GridCompoundFuture<Objec pick = F.first(F.view(owners, F.remoteNodes(loc.id()))); if (pick == null) { - if (log.isDebugEnabled()) - log.debug("Will not rebalance key (no nodes to request from with rebalancing disabled) [key=" + + if (log.isTraceEnabled()) + log.trace("Will not rebalance key (no nodes to request from with rebalancing disabled) [key=" + key + ", part=" + part + ", locId=" + cctx.nodeId() + ']'); return mappings; @@ -378,15 +378,15 @@ public final class GridDhtForceKeysFuture<K, V> extends GridCompoundFuture<Objec mappedKeys.add(key); - if (log.isDebugEnabled()) - log.debug("Will rebalance key from node [cacheName=" + cctx.name() + ", key=" + key + ", part=" + + if (log.isTraceEnabled()) + log.trace("Will rebalance key from node [cacheName=" + cctx.name() + ", key=" + key + ", part=" + part + ", node=" + pick.id() + ", locId=" + cctx.nodeId() + ']'); } else if (locPart.state() != OWNING) invalidParts.add(part); else { - if (log.isDebugEnabled()) - log.debug("Will not rebalance key (local partition is not MOVING) [cacheName=" + cctx.name() + + if (log.isTraceEnabled()) + log.trace("Will not rebalance key (local partition is not MOVING) [cacheName=" + cctx.name() + ", key=" + key + ", part=" + locPart + ", locId=" + cctx.nodeId() + ']'); } @@ -563,8 +563,8 @@ public final class GridDhtForceKeysFuture<K, V> extends GridCompoundFuture<Objec return; } catch (GridCacheEntryRemovedException ignore) { - if (log.isDebugEnabled()) - log.debug("Trying to rebalance removed entry (will ignore) [cacheName=" + + if (log.isTraceEnabled()) + log.trace("Trying to rebalance removed entry (will ignore) [cacheName=" + cctx.name() + ", entry=" + entry + ']'); } finally { http://git-wip-us.apache.org/repos/asf/ignite/blob/d5432c00/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java index a0f82ea..851fcc9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java @@ -761,9 +761,9 @@ public class GridDhtPartitionDemander { GridCacheEntryInfo entry = infos.next(); if (!preloadEntry(node, p, entry, topVer)) { - if (log.isDebugEnabled()) - log.debug("Got entries for invalid partition during " + - "preloading (will skip) [p=" + p + ", entry=" + entry + ']'); + if (log.isTraceEnabled()) + log.trace("Got entries for invalid partition during " + + "preloading (will skip) [p=" + p + ", entry=" + entry + ']'); break; } @@ -877,8 +877,8 @@ public class GridDhtPartitionDemander { cached = cctx.cache().entryEx(entry.key()); - if (log.isDebugEnabled()) - log.debug("Rebalancing key [key=" + entry.key() + ", part=" + p + ", node=" + from.id() + ']'); + if (log.isTraceEnabled()) + log.trace("Rebalancing key [key=" + entry.key() + ", part=" + p + ", node=" + from.id() + ']'); if (preloadPred == null || preloadPred.apply(entry)) { if (cached.initialValue( @@ -905,17 +905,17 @@ public class GridDhtPartitionDemander { else { cached.touch(topVer); // Start tracking. - if (log.isDebugEnabled()) - log.debug("Rebalancing entry is already in cache (will ignore) [key=" + cached.key() + + if (log.isTraceEnabled()) + log.trace("Rebalancing entry is already in cache (will ignore) [key=" + cached.key() + ", part=" + p + ']'); } } - else if (log.isDebugEnabled()) - log.debug("Rebalance predicate evaluated to false for entry (will ignore): " + entry); + else if (log.isTraceEnabled()) + log.trace("Rebalance predicate evaluated to false for entry (will ignore): " + entry); } catch (GridCacheEntryRemovedException ignored) { - if (log.isDebugEnabled()) - log.debug("Entry has been concurrently removed while rebalancing (will ignore) [key=" + + if (log.isTraceEnabled()) + log.trace("Entry has been concurrently removed while rebalancing (will ignore) [key=" + cached.key() + ", part=" + p + ']'); } catch (GridDhtInvalidPartitionException ignored) { http://git-wip-us.apache.org/repos/asf/ignite/blob/d5432c00/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java index 7ce4f7e..524d02d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java @@ -402,8 +402,8 @@ class GridDhtPartitionSupplier { if (preloadPred == null || preloadPred.apply(info)) s.addEntry0(part, iter.historical(part), info, grp.shared(), grp.cacheObjectContext()); else { - if (log.isDebugEnabled()) - log.debug("Rebalance predicate evaluated to false (will not send " + + if (log.isTraceEnabled()) + log.trace("Rebalance predicate evaluated to false (will not send " + "cache entry): " + info); } http://git-wip-us.apache.org/repos/asf/ignite/blob/d5432c00/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index 357f3d7..e66511c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -1225,8 +1225,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte boolean distributed = true; - // Do not perform distributed partition release in case of cluster activation or caches start. - if (activateCluster() || hasCachesToStart()) + // Do not perform distributed partition release in case of cluster activation. + if (activateCluster()) distributed = false; // On first phase we wait for finishing all local tx updates, atomic updates and lock releases on all nodes. @@ -2920,11 +2920,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte } } - // Don't validate partitions state in case of caches start. - boolean skipValidation = hasCachesToStart(); - - if (!skipValidation) - validatePartitionsState(); + validatePartitionsState(); if (firstDiscoEvt.type() == EVT_DISCOVERY_CUSTOM_EVT) { assert firstDiscoEvt instanceof DiscoveryCustomEvent; http://git-wip-us.apache.org/repos/asf/ignite/blob/d5432c00/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java index 7cf55a3..f886767 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java @@ -313,7 +313,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { msg.partitions().addHistorical(p, part.initialUpdateCounter(), countersMap.updateCounter(p), partCnt); } else { - Collection<ClusterNode> picked = pickOwners(p, topVer); + List<ClusterNode> picked = remoteOwners(p, topVer); if (picked.isEmpty()) { top.own(part); @@ -330,7 +330,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { log.debug("Owning partition as there are no other owners: " + part); } else { - ClusterNode n = F.rand(picked); + ClusterNode n = picked.get(0); GridDhtPartitionDemandMessage msg = assignments.get(n); @@ -359,42 +359,23 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { } /** - * Picks owners for specified partition {@code p} from affinity. + * Returns remote owners (excluding local node) for specified partition {@code p}. * * @param p Partition. * @param topVer Topology version. - * @return Picked owners. + * @return Nodes owning this partition. */ - private Collection<ClusterNode> pickOwners(int p, AffinityTopologyVersion topVer) { - Collection<ClusterNode> affNodes = grp.affinity().cachedAffinity(topVer).get(p); - - int affCnt = affNodes.size(); - - Collection<ClusterNode> rmts = remoteOwners(p, topVer); + private List<ClusterNode> remoteOwners(int p, AffinityTopologyVersion topVer) { + List<ClusterNode> owners = grp.topology().owners(p, topVer); - int rmtCnt = rmts.size(); + List<ClusterNode> res = new ArrayList<>(owners.size()); - if (rmtCnt <= affCnt) - return rmts; - - List<ClusterNode> sorted = new ArrayList<>(rmts); - - // Sort in descending order, so nodes with higher order will be first. - Collections.sort(sorted, CU.nodeComparator(false)); - - // Pick newest nodes. - return sorted.subList(0, affCnt); - } + for (ClusterNode owner : owners) { + if (!owner.id().equals(ctx.localNodeId())) + res.add(owner); + } - /** - * Returns remote owners (excluding local node) for specified partition {@code p}. - * - * @param p Partition. - * @param topVer Topology version. - * @return Nodes owning this partition. - */ - private Collection<ClusterNode> remoteOwners(int p, AffinityTopologyVersion topVer) { - return F.view(grp.topology().owners(p, topVer), F.remoteNodes(ctx.localNodeId())); + return res; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/d5432c00/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCacheStartWithLoadTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCacheStartWithLoadTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCacheStartWithLoadTest.java new file mode 100644 index 0000000..acccc5b --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCacheStartWithLoadTest.java @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.dht; + +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.DataRegionConfiguration; +import org.apache.ignite.configuration.DataStorageConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.apache.ignite.transactions.Transaction; +import org.apache.ignite.transactions.TransactionConcurrency; +import org.apache.ignite.transactions.TransactionIsolation; +import org.junit.Assert; + +/** + * + */ +public class IgniteCacheStartWithLoadTest extends GridCommonAbstractTest { + /** */ + static final String CACHE_NAME = "tx_repl"; + + @Override + protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + cfg.setConsistentId(igniteInstanceName); + + CacheConfiguration ccfg = new CacheConfiguration().setName(CACHE_NAME) + .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL) + .setCacheMode(CacheMode.REPLICATED) + .setDataRegionName("ds") + .setAffinity(new RendezvousAffinityFunction(false, 32)); + + DataStorageConfiguration dsCfg = new DataStorageConfiguration() + .setDataRegionConfigurations( + new DataRegionConfiguration() + .setName("ds") + .setPersistenceEnabled(true) + .setMaxSize(1024 * 1024 * 1024) + ); + + cfg.setDataStorageConfiguration(dsCfg); + cfg.setCacheConfiguration(ccfg); + + return cfg; + } + + /** + * @throws Exception if failed. + */ + public void testNoRebalanceDuringCacheStart() throws Exception { + IgniteEx crd = (IgniteEx)startGrids(4); + + crd.cluster().active(true); + + AtomicBoolean txLoadStop = new AtomicBoolean(); + + AtomicInteger txLoaderNo = new AtomicInteger(0); + + IgniteInternalFuture txLoadFuture = GridTestUtils.runMultiThreadedAsync(() -> { + Ignite node = grid(txLoaderNo.getAndIncrement()); + IgniteCache<Object, Object> cache = node.cache(CACHE_NAME); + ThreadLocalRandom rnd = ThreadLocalRandom.current(); + + final int keys = 5; + final int keysSpace = 10_000; + + while (!txLoadStop.get()) { + try (Transaction tx = node.transactions().txStart(TransactionConcurrency.PESSIMISTIC, TransactionIsolation.REPEATABLE_READ)) { + for (int it = 0; it < keys; it++) { + int key = rnd.nextInt(keysSpace); + byte[] value = new byte[2048]; + + cache.put(key, value); + } + tx.commit(); + + U.sleep(10); + } + catch (Throwable t) { + log.warning("Unexpected exception during tx load.", t); + } + } + }, 4, "tx-loader"); + + AtomicBoolean hasRebalance = new AtomicBoolean(); + + AtomicBoolean cacheRestartStop = new AtomicBoolean(); + + IgniteInternalFuture cacheRestartFuture = GridTestUtils.runAsync(() -> { + Ignite node = grid(0); + + final String tmpCacheName = "tmp"; + + while (!cacheRestartStop.get()) { + try { + node.getOrCreateCache(tmpCacheName); + + boolean hasMoving = false; + + for (int i = 0; i < 4; i++) { + hasMoving |= grid(i).cachex(CACHE_NAME).context().topology().hasMovingPartitions(); + } + + if (hasMoving) { + log.error("Cache restarter has been stopped because rebalance is triggered for stable caches."); + + hasRebalance.set(true); + + return; + } + + node.destroyCache(tmpCacheName); + + U.sleep(10_000); + } + catch (Throwable t) { + log.warning("Unexpected exception during caches restart.", t); + } + } + }); + + U.sleep(60_000); + + cacheRestartStop.set(true); + txLoadStop.set(true); + + cacheRestartFuture.get(); + txLoadFuture.get(); + + Assert.assertFalse(hasRebalance.get()); + } + + /** {@inheritDoc} */ + @Override protected long getTestTimeout() { + return 5 * 60 * 1000; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/d5432c00/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite7.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite7.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite7.java index 3a35950..0a2f86e 100755 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite7.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite7.java @@ -35,6 +35,7 @@ import org.apache.ignite.internal.processors.cache.WalModeChangeSelfTest; import org.apache.ignite.internal.processors.cache.distributed.Cache64kPartitionsTest; import org.apache.ignite.internal.processors.cache.distributed.CacheRentingStateRepairTest; import org.apache.ignite.internal.processors.cache.distributed.CacheDataLossOnPartitionMoveTest; +import org.apache.ignite.internal.processors.cache.distributed.dht.IgniteCacheStartWithLoadTest; import org.apache.ignite.internal.processors.cache.distributed.rebalancing.GridCacheRebalancingPartitionCountersTest; import org.apache.ignite.internal.processors.cache.distributed.rebalancing.GridCacheRebalancingWithAsyncClearingTest; import org.apache.ignite.internal.processors.cache.eviction.paged.PageEvictionMultinodeMixedRegionsTest; @@ -65,6 +66,7 @@ public class IgniteCacheTestSuite7 extends TestSuite { TestSuite suite = new TestSuite("IgniteCache With Persistence Test Suite"); suite.addTestSuite(CheckpointBufferDeadlockTest.class); + suite.addTestSuite(IgniteCacheStartWithLoadTest.class); suite.addTestSuite(AuthenticationConfigurationClusterTest.class); suite.addTestSuite(AuthenticationProcessorSelfTest.class);