IGNITE-5701 - Some nodes have partitionUpdateCounter equal to 0 after rebalancing
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/993f7fbe Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/993f7fbe Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/993f7fbe Branch: refs/heads/master Commit: 993f7fbe1d49a524e2dee626aef72e16fd5d3cda Parents: 517a23d Author: Ilya Lantukh <ilant...@gridgain.com> Authored: Fri Jul 7 18:55:27 2017 +0300 Committer: Alexey Goncharuk <alexey.goncha...@gmail.com> Committed: Fri Jul 7 18:55:41 2017 +0300 ---------------------------------------------------------------------- .../apache/ignite/IgniteSystemProperties.java | 2 +- .../distributed/dht/GridDhtCacheEntry.java | 6 -- .../distributed/dht/GridDhtLocalPartition.java | 45 ------------ .../dht/GridDhtPartitionTopologyImpl.java | 16 ++++- .../dht/preloader/GridDhtPartitionDemander.java | 5 +- .../dht/preloader/GridDhtPartitionSupplier.java | 2 +- .../GridDhtPartitionSupplyMessage.java | 20 +++--- .../GridCacheDatabaseSharedManager.java | 2 +- .../IgnitePdsCacheRebalancingAbstractTest.java | 74 ++++++++++++++++++++ .../wal/IgniteWalHistoryReservationsTest.java | 29 ++++++-- 10 files changed, 131 insertions(+), 70 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/993f7fbe/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java index 085712a..35b0577 100644 --- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java +++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java @@ -626,7 +626,7 @@ public final class IgniteSystemProperties { /** * WAL rebalance threshold. */ - public static final String IGNITE_PDS_WAL_REBALANCE_THRESHOLD = "IGNITE_PDS_WAL_REBALANCE_THRESHOLD"; + public static final String IGNITE_PDS_WAL_REBALANCE_THRESHOLD = "IGNITE_PDS_WAL_REBALANCE_THRESHOLD"; /** Ignite page memory concurrency level. */ public static final String IGNITE_OFFHEAP_LOCK_CONCURRENCY_LEVEL = "IGNITE_OFFHEAP_LOCK_CONCURRENCY_LEVEL"; http://git-wip-us.apache.org/repos/asf/ignite/blob/993f7fbe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java index 57dd622..77cc642 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java @@ -123,12 +123,6 @@ public class GridDhtCacheEntry extends GridDistributedCacheEntry { } /** {@inheritDoc} */ - @Override protected void onUpdateFinished(long cntr) { - if (cctx.shared().database().persistenceEnabled()) - locPart.onUpdateReceived(cntr); - } - - /** {@inheritDoc} */ @Override public boolean isDht() { return true; } http://git-wip-us.apache.org/repos/asf/ignite/blob/993f7fbe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java index 8e42351..725822d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java @@ -157,13 +157,6 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements @GridToStringExclude private final CacheDataStore store; - /** Partition updates. */ - @GridToStringExclude - private final ConcurrentNavigableMap<Long, Boolean> updates = new ConcurrentSkipListMap<>(); - - /** Last applied update. */ - private final AtomicLong lastApplied = new AtomicLong(0); - /** Set if failed to move partition to RENTING state due to reservations, to be checked when * reservation is released. */ private volatile boolean shouldBeRenting; @@ -349,44 +342,6 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements } /** - * @return Last applied update. - */ - public long lastAppliedUpdate() { - return lastApplied.get(); - } - - /** - * @param cntr Received counter. - */ - public void onUpdateReceived(long cntr) { - boolean changed = updates.putIfAbsent(cntr, true) == null; - - if (!changed) - return; - - while (true) { - Map.Entry<Long, Boolean> entry = updates.firstEntry(); - - if (entry == null) - return; - - long first = entry.getKey(); - - long cntr0 = lastApplied.get(); - - if (first <= cntr0) - updates.remove(first); - else if (first == cntr0 + 1) - if (lastApplied.compareAndSet(cntr0, first)) - updates.remove(first); - else - break; - else - break; - } - } - - /** * @return If partition is moving or owning or renting. */ public boolean valid() { http://git-wip-us.apache.org/repos/asf/ignite/blob/993f7fbe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java index cf0dd5f..2f54810 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java @@ -673,6 +673,11 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { if (loc == null || loc.state() == EVICTED) { locParts.set(p, loc = new GridDhtLocalPartition(ctx, grp, p)); + T2<Long, Long> cntr = cntrMap.get(p); + + if (cntr != null) + loc.updateCounter(cntr.get2()); + if (ctx.pageStore() != null) { try { ctx.pageStore().onPartitionCreated(grp.groupId(), p); @@ -1334,11 +1339,12 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { if (cntr != null && cntr.get2() > part.updateCounter()) part.updateCounter(cntr.get2()); + else if (part.updateCounter() > 0) + this.cntrMap.put(part.id(), new T2<>(part.initialUpdateCounter(), part.updateCounter())); } } finally { lock.writeLock().unlock(); - } } @@ -1715,6 +1721,10 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { result.add(ctx.localNodeId()); } + U.warn(log, "Partition has been scheduled for rebalancing due to outdated update counter " + + "[nodeId=" + ctx.localNodeId() + "cacheOrGroupName=" + grp.cacheOrGroupName() + + ", partId=" + locPart.id() + ", haveHistory=" + haveHistory + "]"); + } } @@ -1731,6 +1741,10 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { result.add(e.getKey()); } } + + U.warn(log, "Partition has been scheduled for rebalancing due to outdated update counter " + + "[nodeId=" + ctx.localNodeId() + "cacheOrGroupName=" + grp.cacheOrGroupName() + + ", partId=" + locPart.id() + ", haveHistory=" + haveHistory + "]"); } if (updateSeq) http://git-wip-us.apache.org/repos/asf/ignite/blob/993f7fbe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java index e7e95b2..4f34aba 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java @@ -640,7 +640,7 @@ public class GridDhtPartitionDemander { assert part != null; - boolean last = supply.last().contains(p); + boolean last = supply.last().containsKey(p); if (part.state() == MOVING) { boolean reserved = part.reserve(); @@ -680,6 +680,9 @@ public class GridDhtPartitionDemander { // If message was last for this partition, // then we take ownership. if (last) { + if (supply.isClean(p)) + part.updateCounter(supply.last().get(p)); + top.own(part); fut.partitionDone(id, p); http://git-wip-us.apache.org/repos/asf/ignite/blob/993f7fbe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java index 1cc6c28..3ead982 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java @@ -414,7 +414,7 @@ class GridDhtPartitionSupplier { } // Mark as last supply message. - s.last(part); + s.last(part, loc.updateCounter()); phase = SupplyContextPhase.NEW; http://git-wip-us.apache.org/repos/asf/ignite/blob/993f7fbe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java index ef14a90..90d11f5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java @@ -56,8 +56,8 @@ public class GridDhtPartitionSupplyMessage extends GridCacheGroupIdMessage imple private AffinityTopologyVersion topVer; /** Partitions that have been fully sent. */ - @GridDirectCollection(int.class) - private Collection<Integer> last; + @GridDirectMap(keyType = int.class, valueType = long.class) + private Map<Integer, Long> last; /** Partitions which were not found. */ @GridToStringInclude @@ -128,19 +128,19 @@ public class GridDhtPartitionSupplyMessage extends GridCacheGroupIdMessage imple /** * @return Flag to indicate last message for partition. */ - Collection<Integer> last() { - return last == null ? Collections.<Integer>emptySet() : last; + Map<Integer, Long> last() { + return last == null ? Collections.<Integer, Long>emptyMap() : last; } /** * @param p Partition which was fully sent. */ - void last(int p) { + void last(int p, long cntr) { if (last == null) - last = new HashSet<>(); + last = new HashMap<>(); - if (last.add(p)) { - msgSize += 4; + if (last.put(p, cntr) == null) { + msgSize += 12; // If partition is empty, we need to add it. if (!infos().containsKey(p)) { @@ -304,7 +304,7 @@ public class GridDhtPartitionSupplyMessage extends GridCacheGroupIdMessage imple writer.incrementState(); case 7: - if (!writer.writeCollection("last", last, MessageCollectionItemType.INT)) + if (!writer.writeMap("last", last, MessageCollectionItemType.INT, MessageCollectionItemType.LONG)) return false; writer.incrementState(); @@ -382,7 +382,7 @@ public class GridDhtPartitionSupplyMessage extends GridCacheGroupIdMessage imple reader.incrementState(); case 7: - last = reader.readCollection("last", MessageCollectionItemType.INT); + last = reader.readMap("last", MessageCollectionItemType.INT, MessageCollectionItemType.LONG, false); if (!reader.isLastRead()) return false; http://git-wip-us.apache.org/repos/asf/ignite/blob/993f7fbe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java index 4af4daf..d64677e 100755 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java @@ -2126,7 +2126,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan CacheState state = new CacheState(locParts.size()); for (GridDhtLocalPartition part : grp.topology().currentLocalPartitions()) - state.addPartitionState(part.id(), part.dataStore().fullSize(), part.lastAppliedUpdate()); + state.addPartitionState(part.id(), part.dataStore().fullSize(), part.updateCounter()); cpRec.addCacheGroupState(grp.groupId(), state); } http://git-wip-us.apache.org/repos/asf/ignite/blob/993f7fbe/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCacheRebalancingAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCacheRebalancingAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCacheRebalancingAbstractTest.java index cbc2623..588b3ac 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCacheRebalancingAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCacheRebalancingAbstractTest.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.persistence; import java.io.Serializable; import java.util.Collections; +import java.util.HashMap; import java.util.LinkedHashMap; import java.util.Map; import java.util.concurrent.Callable; @@ -29,6 +30,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteDataStreamer; import org.apache.ignite.cache.CacheWriteSynchronizationMode; import org.apache.ignite.cache.PartitionLossPolicy; import org.apache.ignite.cache.QueryEntity; @@ -41,11 +43,13 @@ import org.apache.ignite.configuration.PersistentStoreConfiguration; import org.apache.ignite.configuration.WALMode; import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition; import org.apache.ignite.internal.util.typedef.G; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.apache.ignite.transactions.Transaction; @@ -489,6 +493,76 @@ public abstract class IgnitePdsCacheRebalancingAbstractTest extends GridCommonAb } /** + * @throws Exception If failed + */ + public void testPartitionCounterConsistencyOnUnstableTopology() throws Exception { + final Ignite ig = startGrids(4); + + ig.active(true); + + int k = 0; + + try (IgniteDataStreamer ds = ig.dataStreamer(cacheName)) { + ds.allowOverwrite(true); + + for (int k0 = k; k < k0 + 10_000; k++) + ds.addData(k, k); + } + + for (int t = 0; t < 10; t++) { + IgniteInternalFuture fut = GridTestUtils.runAsync(new Runnable() { + @Override public void run() { + try { + stopGrid(3); + + IgniteEx ig0 = startGrid(3); + + awaitPartitionMapExchange(); + + ig0.cache(cacheName).rebalance().get(); + } + catch (Exception e) { + throw new RuntimeException(e); + } + } + }); + + try (IgniteDataStreamer ds = ig.dataStreamer(cacheName)) { + ds.allowOverwrite(true); + + while (!fut.isDone()) { + ds.addData(k, k); + + k++; + + U.sleep(1); + } + } + + fut.get(); + + Map<Integer, Long> cntrs = new HashMap<>(); + + for (int g = 0; g < 4; g++) { + IgniteEx ig0 = grid(g); + + for (GridDhtLocalPartition part : ig0.cachex(cacheName).context().topology().currentLocalPartitions()) { + if (cntrs.containsKey(part.id())) + assertEquals(String.valueOf(part.id()), (long) cntrs.get(part.id()), part.updateCounter()); + else + cntrs.put(part.id(), part.updateCounter()); + } + + for (int k0 = 0; k0 < k; k0++) { + assertEquals(String.valueOf(k0), k0, ig0.cache(cacheName).get(k0)); + } + } + + assertEquals(ig.affinity(cacheName).partitions(), cntrs.size()); + } + } + + /** * */ private static class TestValue implements Serializable { http://git-wip-us.apache.org/repos/asf/ignite/blob/993f7fbe/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalHistoryReservationsTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalHistoryReservationsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalHistoryReservationsTest.java index 48d8c21..4bea63f 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalHistoryReservationsTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalHistoryReservationsTest.java @@ -31,6 +31,7 @@ import org.apache.ignite.configuration.MemoryConfiguration; import org.apache.ignite.configuration.MemoryPolicyConfiguration; import org.apache.ignite.configuration.PersistentStoreConfiguration; import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition; import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager; import org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager; import org.apache.ignite.internal.util.lang.GridAbsPredicate; @@ -201,7 +202,7 @@ public class IgniteWalHistoryReservationsTest extends GridCommonAbstractTest { int entryCnt = 10_000; - Ignite ig0 = startGrids(2); + IgniteEx ig0 = (IgniteEx) startGrids(2); ig0.active(true); @@ -219,7 +220,7 @@ public class IgniteWalHistoryReservationsTest extends GridCommonAbstractTest { forceCheckpoint(); - Ignite ig1 = startGrid(1); + IgniteEx ig1 = startGrid(1); IgniteCache<Integer, Integer> cache1 = ig1.cache("cache1"); @@ -236,6 +237,16 @@ public class IgniteWalHistoryReservationsTest extends GridCommonAbstractTest { assertEquals("k=" + k, k, cache1.get(k)); } } + + cache.rebalance().get(); + + for (int p = 0; p < ig1.affinity("cache1").partitions(); p++) { + GridDhtLocalPartition p0 = ig0.context().cache().cache("cache1").context().topology().localPartition(p); + GridDhtLocalPartition p1 = ig1.context().cache().cache("cache1").context().topology().localPartition(p); + + assertTrue(p0.updateCounter() > 0); + assertEquals(p0.updateCounter(), p1.updateCounter()); + } } /** @@ -244,7 +255,7 @@ public class IgniteWalHistoryReservationsTest extends GridCommonAbstractTest { public void testNodeIsClearedIfHistoryIsUnavailable() throws Exception { int entryCnt = 10_000; - Ignite ig0 = startGrids(2); + IgniteEx ig0 = (IgniteEx) startGrids(2); ig0.active(true); @@ -269,7 +280,7 @@ public class IgniteWalHistoryReservationsTest extends GridCommonAbstractTest { assertEquals("k=" + k, k, cache.get(k)); } - Ignite ig1 = startGrid(1); + IgniteEx ig1 = startGrid(1); IgniteCache<Integer, Integer> cache1 = ig1.cache("cache1"); @@ -286,6 +297,16 @@ public class IgniteWalHistoryReservationsTest extends GridCommonAbstractTest { assertEquals("k=" + k, k, cache1.get(k)); } } + + cache.rebalance().get(); + + for (int p = 0; p < ig1.affinity("cache1").partitions(); p++) { + GridDhtLocalPartition p0 = ig0.context().cache().cache("cache1").context().topology().localPartition(p); + GridDhtLocalPartition p1 = ig1.context().cache().cache("cache1").context().topology().localPartition(p); + + assertTrue(p0.updateCounter() > 0); + assertEquals(p0.updateCounter(), p1.updateCounter()); + } } /**