Repository: ignite
Updated Branches:
  refs/heads/master 3f1e5d1f5 -> 675d697ec


IGNITE-8390 Correct assertion for historical rebalance - Fixes #3917.

Signed-off-by: Alexey Goncharuk <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/675d697e
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/675d697e
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/675d697e

Branch: refs/heads/master
Commit: 675d697ecaa63e8fe53bc0ccf1c9a76c920a2ca1
Parents: 3f1e5d1
Author: Pavel Kovalenko <[email protected]>
Authored: Thu Apr 26 16:06:52 2018 +0300
Committer: Alexey Goncharuk <[email protected]>
Committed: Thu Apr 26 16:06:52 2018 +0300

----------------------------------------------------------------------
 .../dht/preloader/GridDhtPartitionSupplier.java |  2 +-
 .../GridDhtPartitionSupplyMessage.java          |  5 +-
 .../db/wal/IgniteWalRebalanceTest.java          | 63 ++++++++++++++++++--
 3 files changed, 62 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/675d697e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
index a3ee305..84e6828 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
@@ -378,7 +378,7 @@ class GridDhtPartitionSupplier {
                 info.cacheId(row.cacheId());
 
                 if (preloadPred == null || preloadPred.apply(info))
-                    s.addEntry0(part, info, grp.shared(), grp.cacheObjectContext());
+                    s.addEntry0(part, iter.historical(part), info, grp.shared(), grp.cacheObjectContext());
                 else {
                     if (log.isDebugEnabled())
                         log.debug("Rebalance predicate evaluated to false (will not send " +

http://git-wip-us.apache.org/repos/asf/ignite/blob/675d697e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java
index 4ae5acd..77baa38 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java
@@ -209,15 +209,16 @@ public class GridDhtPartitionSupplyMessage extends GridCacheGroupIdMessage imple
 
     /**
      * @param p Partition.
+     * @param historical {@code True} if partition rebalancing using WAL history.
      * @param info Entry to add.
      * @param ctx Cache shared context.
      * @param cacheObjCtx Cache object context.
      * @throws IgniteCheckedException If failed.
      */
-    void addEntry0(int p, GridCacheEntryInfo info, GridCacheSharedContext ctx, CacheObjectContext cacheObjCtx) throws IgniteCheckedException {
+    void addEntry0(int p, boolean historical, GridCacheEntryInfo info, GridCacheSharedContext ctx, CacheObjectContext cacheObjCtx) throws IgniteCheckedException {
         assert info != null;
         assert info.key() != null : info;
-        assert info.value() != null : info;
+        assert info.value() != null || historical : info;
 
         // Need to call this method to initialize info properly.
         marshalInfo(info, ctx, cacheObjCtx);

http://git-wip-us.apache.org/repos/asf/ignite/blob/675d697e/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRebalanceTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRebalanceTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRebalanceTest.java
index 6387dac..23dda26 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRebalanceTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRebalanceTest.java
@@ -17,7 +17,7 @@
 
 package org.apache.ignite.internal.processors.cache.persistence.db.wal;
 
-import java.util.concurrent.TimeUnit;
+import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.cache.CacheAtomicityMode;
 import org.apache.ignite.cache.CacheMode;
@@ -30,6 +30,7 @@ import org.apache.ignite.configuration.DataStorageConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.configuration.WALMode;
 import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.util.typedef.G;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 
@@ -48,6 +49,8 @@ public class IgniteWalRebalanceTest extends GridCommonAbstractTest {
 
         IgniteConfiguration cfg = super.getConfiguration(gridName);
 
+        cfg.setConsistentId(gridName);
+
         CacheConfiguration<Integer, IndexedObject> ccfg = new CacheConfiguration<>(CACHE_NAME);
 
         ccfg.setAtomicityMode(CacheAtomicityMode.ATOMIC);
@@ -87,9 +90,11 @@ public class IgniteWalRebalanceTest extends GridCommonAbstractTest {
     }
 
     /**
+     * Test simple WAL historical rebalance.
+     *
      * @throws Exception if failed.
      */
-    public void test() throws Exception {
+    public void testSimple() throws Exception {
         IgniteEx ig0 = startGrid(0);
         IgniteEx ig1 = startGrid(1);
         final int entryCnt = 10_000;
@@ -112,12 +117,60 @@ public class IgniteWalRebalanceTest extends GridCommonAbstractTest {
 
         ig1 = startGrid(1);
 
-        IgniteCache<Object, Object> cache1 = ig1.cache(CACHE_NAME);
+        awaitPartitionMapExchange();
+
+        for (Ignite ig : G.allGrids()) {
+            IgniteCache<Object, Object> cache1 = ig.cache(CACHE_NAME);
+
+            for (int k = 0; k < entryCnt; k++)
+                assertEquals(new IndexedObject(k + 1), cache1.get(k));
+        }
+    }
+
+    /**
+     * Test that cache entry removes are rebalanced properly using WAL.
+     *
+     * @throws Exception If failed.
+     */
+    public void testRebalanceRemoves() throws Exception {
+        IgniteEx ig0 = startGrid(0);
+        IgniteEx ig1 = startGrid(1);
+        final int entryCnt = 10_000;
+
+        ig0.cluster().active(true);
 
-        cache1.rebalance().get(2, TimeUnit.MINUTES);
+        IgniteCache<Object, Object> cache = ig0.cache(CACHE_NAME);
 
         for (int k = 0; k < entryCnt; k++)
-            assertEquals(new IndexedObject(k + 1), cache.get(k));
+            cache.put(k, new IndexedObject(k));
+
+        forceCheckpoint();
+
+        stopGrid(1, false);
+
+        for (int k = 0; k < entryCnt; k++) {
+            if (k % 3 != 2)
+                cache.put(k, new IndexedObject(k + 1));
+            else // Spread removes across all partitions.
+                cache.remove(k);
+        }
+
+        forceCheckpoint();
+
+        ig1 = startGrid(1);
+
+        awaitPartitionMapExchange();
+
+        for (Ignite ig : G.allGrids()) {
+            IgniteCache<Object, Object> cache1 = ig.cache(CACHE_NAME);
+
+            for (int k = 0; k < entryCnt; k++) {
+                if (k % 3 != 2)
+                    assertEquals(new IndexedObject(k + 1), cache1.get(k));
+                else
+                    assertNull(cache1.get(k));
+            }
+        }
     }
 
     /**

Reply via email to