Repository: ignite Updated Branches: refs/heads/master 3f1e5d1f5 -> 675d697ec
IGNITE-8390 Correct assertion for historical rebalance - Fixes #3917. Signed-off-by: Alexey Goncharuk <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/675d697e Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/675d697e Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/675d697e Branch: refs/heads/master Commit: 675d697ecaa63e8fe53bc0ccf1c9a76c920a2ca1 Parents: 3f1e5d1 Author: Pavel Kovalenko <[email protected]> Authored: Thu Apr 26 16:06:52 2018 +0300 Committer: Alexey Goncharuk <[email protected]> Committed: Thu Apr 26 16:06:52 2018 +0300 ---------------------------------------------------------------------- .../dht/preloader/GridDhtPartitionSupplier.java | 2 +- .../GridDhtPartitionSupplyMessage.java | 5 +- .../db/wal/IgniteWalRebalanceTest.java | 63 ++++++++++++++++++-- 3 files changed, 62 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/675d697e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java index a3ee305..84e6828 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java @@ -378,7 +378,7 @@ class GridDhtPartitionSupplier { info.cacheId(row.cacheId()); if (preloadPred == null || preloadPred.apply(info)) - s.addEntry0(part, info, grp.shared(), grp.cacheObjectContext()); + s.addEntry0(part, iter.historical(part), info, grp.shared(), grp.cacheObjectContext()); else { if (log.isDebugEnabled()) log.debug("Rebalance predicate evaluated to false (will not send " + http://git-wip-us.apache.org/repos/asf/ignite/blob/675d697e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java index 4ae5acd..77baa38 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java @@ -209,15 +209,16 @@ public class GridDhtPartitionSupplyMessage extends GridCacheGroupIdMessage imple /** * @param p Partition. + * @param historical {@code True} if partition rebalancing using WAL history. * @param info Entry to add. * @param ctx Cache shared context. * @param cacheObjCtx Cache object context. * @throws IgniteCheckedException If failed. */ - void addEntry0(int p, GridCacheEntryInfo info, GridCacheSharedContext ctx, CacheObjectContext cacheObjCtx) throws IgniteCheckedException { + void addEntry0(int p, boolean historical, GridCacheEntryInfo info, GridCacheSharedContext ctx, CacheObjectContext cacheObjCtx) throws IgniteCheckedException { assert info != null; assert info.key() != null : info; - assert info.value() != null : info; + assert info.value() != null || historical : info; // Need to call this method to initialize info properly. marshalInfo(info, ctx, cacheObjCtx); http://git-wip-us.apache.org/repos/asf/ignite/blob/675d697e/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRebalanceTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRebalanceTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRebalanceTest.java index 6387dac..23dda26 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRebalanceTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRebalanceTest.java @@ -17,7 +17,7 @@ package org.apache.ignite.internal.processors.cache.persistence.db.wal; -import java.util.concurrent.TimeUnit; +import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; import org.apache.ignite.cache.CacheAtomicityMode; import org.apache.ignite.cache.CacheMode; @@ -30,6 +30,7 @@ import org.apache.ignite.configuration.DataStorageConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.configuration.WALMode; import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.util.typedef.G; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; @@ -48,6 +49,8 @@ public class IgniteWalRebalanceTest extends GridCommonAbstractTest { IgniteConfiguration cfg = super.getConfiguration(gridName); + cfg.setConsistentId(gridName); + CacheConfiguration<Integer, IndexedObject> ccfg = new CacheConfiguration<>(CACHE_NAME); ccfg.setAtomicityMode(CacheAtomicityMode.ATOMIC); @@ -87,9 +90,11 @@ public class IgniteWalRebalanceTest extends GridCommonAbstractTest { } /** + * Test simple WAL historical rebalance. + * * @throws Exception if failed. */ - public void test() throws Exception { + public void testSimple() throws Exception { IgniteEx ig0 = startGrid(0); IgniteEx ig1 = startGrid(1); final int entryCnt = 10_000; @@ -112,12 +117,60 @@ public class IgniteWalRebalanceTest extends GridCommonAbstractTest { ig1 = startGrid(1); - IgniteCache<Object, Object> cache1 = ig1.cache(CACHE_NAME); + awaitPartitionMapExchange(); + + for (Ignite ig : G.allGrids()) { + IgniteCache<Object, Object> cache1 = ig.cache(CACHE_NAME); + + for (int k = 0; k < entryCnt; k++) + assertEquals(new IndexedObject(k + 1), cache1.get(k)); + } + } + + /** + * Test that cache entry removes are rebalanced properly using WAL. + * + * @throws Exception If failed. + */ + public void testRebalanceRemoves() throws Exception { + IgniteEx ig0 = startGrid(0); + IgniteEx ig1 = startGrid(1); + final int entryCnt = 10_000; + + ig0.cluster().active(true); - cache1.rebalance().get(2, TimeUnit.MINUTES); + IgniteCache<Object, Object> cache = ig0.cache(CACHE_NAME); for (int k = 0; k < entryCnt; k++) - assertEquals(new IndexedObject(k + 1), cache.get(k)); + cache.put(k, new IndexedObject(k)); + + forceCheckpoint(); + + stopGrid(1, false); + + for (int k = 0; k < entryCnt; k++) { + if (k % 3 != 2) + cache.put(k, new IndexedObject(k + 1)); + else // Spread removes across all partitions. + cache.remove(k); + } + + forceCheckpoint(); + + ig1 = startGrid(1); + + awaitPartitionMapExchange(); + + for (Ignite ig : G.allGrids()) { + IgniteCache<Object, Object> cache1 = ig.cache(CACHE_NAME); + + for (int k = 0; k < entryCnt; k++) { + if (k % 3 != 2) + assertEquals(new IndexedObject(k + 1), cache1.get(k)); + else + assertNull(cache1.get(k)); + } + } } /**
