This is an automated email from the ASF dual-hosted git repository.

alexpl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 819485eb389 IGNITE-19141 Fix node failure on rebalancing during caches stop - Fixes #10613.
819485eb389 is described below

commit 819485eb38989f9ba15502d75cd1f7e24b7dff40
Author: Aleksey Plekhanov <[email protected]>
AuthorDate: Fri Mar 31 10:22:29 2023 +0300

    IGNITE-19141 Fix node failure on rebalancing during caches stop - Fixes #10613.
    
    Signed-off-by: Aleksey Plekhanov <[email protected]>
---
 .../processors/cache/CacheGroupContext.java        |   3 +
 .../dht/preloader/GridDhtPartitionDemander.java    |  25 ++---
 .../dht/preloader/GridDhtPartitionSupplier.java    |   4 +
 .../IgniteRebalanceRepeatingCacheStopTest.java     | 125 +++++++++++++++++++++
 .../ignite/testsuites/IgnitePdsTestSuite4.java     |   2 +
 5 files changed, 141 insertions(+), 18 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java
index 38b9bd1ef15..8e948c4fcda 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java
@@ -443,6 +443,9 @@ public class CacheGroupContext {
     public GridCacheContext singleCacheContext() {
         List<GridCacheContext> caches = this.caches;
 
+        if (caches.isEmpty()) // Cache stopped.
+            return null;
+
         assert !sharedGroup() && caches.size() == 1 :
             "stopping=" + ctx.kernalContext().isStopping() + ", groupName=" + ccfg.getGroupName() +
                 ", caches=" + caches;
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
index b5f42214ad3..0ecf17eeee8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -567,26 +567,15 @@ public class GridDhtPartitionDemander {
 
             fut.receivedBytes.addAndGet(supplyMsg.messageSize());
 
-            if (grp.sharedGroup()) {
-                for (GridCacheContext cctx : grp.caches()) {
-                    if (cctx.statisticsEnabled()) {
-                        long keysCnt = supplyMsg.keysForCache(cctx.cacheId());
-
-                        if (keysCnt != -1)
-                            cctx.cache().metrics0().onRebalancingKeysCountEstimateReceived(keysCnt);
-
-                        // Can not be calculated per cache.
-                        cctx.cache().metrics0().onRebalanceBatchReceived(supplyMsg.messageSize());
-                    }
-                }
-            }
-            else {
-                GridCacheContext cctx = grp.singleCacheContext();
-
+            for (GridCacheContext cctx : grp.caches()) {
                 if (cctx.statisticsEnabled()) {
-                    if (supplyMsg.estimatedKeysCount() != -1)
-                        cctx.cache().metrics0().onRebalancingKeysCountEstimateReceived(supplyMsg.estimatedKeysCount());
+                    long keysCnt = grp.sharedGroup() ? supplyMsg.keysForCache(cctx.cacheId()) :
+                        supplyMsg.estimatedKeysCount();
+
+                    if (keysCnt != -1)
+                        cctx.cache().metrics0().onRebalancingKeysCountEstimateReceived(keysCnt);
 
+                    // Can not be calculated per cache.
                     cctx.cache().metrics0().onRebalanceBatchReceived(supplyMsg.messageSize());
                 }
             }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
index d6cbd8e1267..0cdcb8a61f7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
@@ -520,6 +520,10 @@ public class GridDhtPartitionSupplier {
                     + supplyRoutineInfo(topicId, nodeId, demandMsg) + ']', t1);
             }
 
+            // There can be errors in case of concurrent caches stop. Do not trigger failure handler in these cases.
+            if (!grp.hasCaches())
+                return;
+
             // If fallback to full rebalance is possible then let's try to switch to it
             // instead of triggering failure handler.
             if (!fallbackToFullRebalance) {
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/IgniteRebalanceRepeatingCacheStopTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/IgniteRebalanceRepeatingCacheStopTest.java
new file mode 100644
index 00000000000..11efe4acd41
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/IgniteRebalanceRepeatingCacheStopTest.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.rebalancing;
+
+import java.util.Collection;
+import java.util.function.Consumer;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cluster.ClusterState;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.failure.StopNodeFailureHandler;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+/**
+ * Test rebalance during repeating caches stop (due to deactivation or explicit call).
+ */
+@RunWith(Parameterized.class)
+public class IgniteRebalanceRepeatingCacheStopTest extends GridCommonAbstractTest {
+    /** */
+    @Parameterized.Parameter()
+    public boolean sharedGrp;
+
+    /** */
+    @Parameterized.Parameter(1)
+    public boolean pds = true;
+
+    /** */
+    @Parameterized.Parameters(name = "sharedGroup={0}, persistence={1}")
+    public static Collection<Object[]> parameters() {
+        return F.asList(
+            new Object[] {Boolean.FALSE, Boolean.FALSE},
+            new Object[] {Boolean.FALSE, Boolean.TRUE},
+            new Object[] {Boolean.TRUE, Boolean.FALSE},
+            new Object[] {Boolean.TRUE, Boolean.TRUE}
+        );
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        return super.getConfiguration(igniteInstanceName)
+            .setFailureHandler(new StopNodeFailureHandler())
+            .setDataStorageConfiguration(new DataStorageConfiguration()
+                .setDefaultDataRegionConfiguration(new DataRegionConfiguration()
+                    .setPersistenceEnabled(pds)));
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        super.afterTest();
+
+        stopAllGrids();
+
+        cleanPersistenceDir();
+    }
+
+    /** */
+    @Test
+    public void testRebalanceOnDeactivate() throws Exception {
+        doTest(ignite -> {
+            ignite.cluster().state(ClusterState.INACTIVE);
+
+            ignite.cluster().state(ClusterState.ACTIVE);
+        });
+    }
+
+    /** */
+    @Test
+    public void testRebalanceOnCacheStop() throws Exception {
+        doTest(ignite -> ignite.cache(DEFAULT_CACHE_NAME).destroy());
+    }
+
+    /** */
+    private void doTest(Consumer<Ignite> action) throws Exception {
+        IgniteEx ignite0 = startGrid(0);
+        IgniteEx ignite1 = startGrid(1);
+        ignite0.cluster().state(ClusterState.ACTIVE);
+        ignite0.cluster().baselineAutoAdjustEnabled(false);
+
+        for (int i = 0; i < 10; i++) {
+            IgniteCache<Integer, Integer> cache = ignite0.getOrCreateCache(
+                new CacheConfiguration<Integer, Integer>(DEFAULT_CACHE_NAME)
+                    .setBackups(1)
+                    .setGroupName(sharedGrp ? "grp" : null)
+                    .setAffinity(new RendezvousAffinityFunction(false, 1)));
+
+            cache.clear();
+
+            stopGrid(0);
+
+            try (IgniteDataStreamer<Integer, Integer> streamer = ignite1.dataStreamer(DEFAULT_CACHE_NAME)) {
+                for (int j = 0; j < 100_000; j++)
+                    streamer.addData(j, j);
+            }
+
+            ignite0 = startGrid(0);
+
+            action.accept(ignite0);
+        }
+    }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite4.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite4.java
index 351b99f1826..986ca50a1f1 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite4.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite4.java
@@ -31,6 +31,7 @@ import org.apache.ignite.cache.ResetLostPartitionTest;
 import org.apache.ignite.internal.processors.cache.IgniteClusterActivateDeactivateTestWithPersistenceAndMemoryReuse;
 import org.apache.ignite.internal.processors.cache.distributed.CachePageWriteLockUnlockTest;
 import org.apache.ignite.internal.processors.cache.distributed.rebalancing.IgniteRebalanceOnCachesStoppingOrDestroyingTest;
+import org.apache.ignite.internal.processors.cache.distributed.rebalancing.IgniteRebalanceRepeatingCacheStopTest;
 import org.apache.ignite.internal.processors.cache.persistence.CorruptedTreeFailureHandlingTest;
 import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsCheckpointMapSnapshotTest;
 import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsContinuousRestartTestWithSharedGroupAndIndexes;
@@ -77,6 +78,7 @@ public class IgnitePdsTestSuite4 {
         GridTestUtils.addTestIfNeeded(suite, ResetLostPartitionTest.class, ignoredTests);
         GridTestUtils.addTestIfNeeded(suite, RebalanceAfterResettingLostPartitionTest.class, ignoredTests);
         GridTestUtils.addTestIfNeeded(suite, IgniteRebalanceOnCachesStoppingOrDestroyingTest.class, ignoredTests);
+        GridTestUtils.addTestIfNeeded(suite, IgniteRebalanceRepeatingCacheStopTest.class, ignoredTests);
         GridTestUtils.addTestIfNeeded(suite, CachePageWriteLockUnlockTest.class, ignoredTests);
         GridTestUtils.addTestIfNeeded(suite, IgnitePdsCacheWalDisabledOnRebalancingTest.class, ignoredTests);
         GridTestUtils.addTestIfNeeded(suite, IgnitePdsStartWIthEmptyArchive.class, ignoredTests);

Reply via email to