This is an automated email from the ASF dual-hosted git repository.
alexpl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 819485eb389 IGNITE-19141 Fix node failure on rebalancing during caches stop - Fixes #10613.
819485eb389 is described below
commit 819485eb38989f9ba15502d75cd1f7e24b7dff40
Author: Aleksey Plekhanov <[email protected]>
AuthorDate: Fri Mar 31 10:22:29 2023 +0300
IGNITE-19141 Fix node failure on rebalancing during caches stop - Fixes #10613.
Signed-off-by: Aleksey Plekhanov <[email protected]>
---
.../processors/cache/CacheGroupContext.java | 3 +
.../dht/preloader/GridDhtPartitionDemander.java | 25 ++---
.../dht/preloader/GridDhtPartitionSupplier.java | 4 +
.../IgniteRebalanceRepeatingCacheStopTest.java | 125 +++++++++++++++++++++
.../ignite/testsuites/IgnitePdsTestSuite4.java | 2 +
5 files changed, 141 insertions(+), 18 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java
index 38b9bd1ef15..8e948c4fcda 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java
@@ -443,6 +443,9 @@ public class CacheGroupContext {
public GridCacheContext singleCacheContext() {
List<GridCacheContext> caches = this.caches;
+ if (caches.isEmpty()) // Cache stopped.
+ return null;
+
assert !sharedGroup() && caches.size() == 1 :
"stopping=" + ctx.kernalContext().isStopping() + ", groupName=" +
ccfg.getGroupName() +
", caches=" + caches;
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
index b5f42214ad3..0ecf17eeee8 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -567,26 +567,15 @@ public class GridDhtPartitionDemander {
fut.receivedBytes.addAndGet(supplyMsg.messageSize());
- if (grp.sharedGroup()) {
- for (GridCacheContext cctx : grp.caches()) {
- if (cctx.statisticsEnabled()) {
- long keysCnt = supplyMsg.keysForCache(cctx.cacheId());
-
- if (keysCnt != -1)
-
cctx.cache().metrics0().onRebalancingKeysCountEstimateReceived(keysCnt);
-
- // Can not be calculated per cache.
-
cctx.cache().metrics0().onRebalanceBatchReceived(supplyMsg.messageSize());
- }
- }
- }
- else {
- GridCacheContext cctx = grp.singleCacheContext();
-
+ for (GridCacheContext cctx : grp.caches()) {
if (cctx.statisticsEnabled()) {
- if (supplyMsg.estimatedKeysCount() != -1)
-
cctx.cache().metrics0().onRebalancingKeysCountEstimateReceived(supplyMsg.estimatedKeysCount());
+ long keysCnt = grp.sharedGroup() ?
supplyMsg.keysForCache(cctx.cacheId()) :
+ supplyMsg.estimatedKeysCount();
+
+ if (keysCnt != -1)
+
cctx.cache().metrics0().onRebalancingKeysCountEstimateReceived(keysCnt);
+ // Can not be calculated per cache.
cctx.cache().metrics0().onRebalanceBatchReceived(supplyMsg.messageSize());
}
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
index d6cbd8e1267..0cdcb8a61f7 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
@@ -520,6 +520,10 @@ public class GridDhtPartitionSupplier {
+ supplyRoutineInfo(topicId, nodeId, demandMsg) + ']', t1);
}
+ // There can be errors in case of concurrent caches stop. Do not
trigger failure handler in these cases.
+ if (!grp.hasCaches())
+ return;
+
// If fallback to full rebalance is possible then let's try to
switch to it
// instead of triggering failure handler.
if (!fallbackToFullRebalance) {
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/IgniteRebalanceRepeatingCacheStopTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/IgniteRebalanceRepeatingCacheStopTest.java
new file mode 100644
index 00000000000..11efe4acd41
--- /dev/null
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/IgniteRebalanceRepeatingCacheStopTest.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.rebalancing;
+
+import java.util.Collection;
+import java.util.function.Consumer;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cluster.ClusterState;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.failure.StopNodeFailureHandler;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+/**
+ * Tests rebalance during repeated cache stops (caused by cluster deactivation or by an explicit cache destroy).
+ */
+@RunWith(Parameterized.class)
+public class IgniteRebalanceRepeatingCacheStopTest extends
GridCommonAbstractTest {
+ /** If {@code true}, the test cache is created in the cache group "grp"; otherwise no group name is set. */
+ @Parameterized.Parameter()
+ public boolean sharedGrp;
+
+ /** Value passed to {@code setPersistenceEnabled} for the default data region of every started node. */
+ @Parameterized.Parameter(1)
+ public boolean pds = true;
+
+ /** @return All four combinations of the {@code sharedGrp} and {@code pds} flags. */
+ @Parameterized.Parameters(name = "sharedGroup={0}, persistence={1}")
+ public static Collection<Object[]> parameters() {
+ return F.asList(
+ new Object[] {Boolean.FALSE, Boolean.FALSE},
+ new Object[] {Boolean.FALSE, Boolean.TRUE},
+ new Object[] {Boolean.TRUE, Boolean.FALSE},
+ new Object[] {Boolean.TRUE, Boolean.TRUE}
+ );
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String
igniteInstanceName) throws Exception {
+ return super.getConfiguration(igniteInstanceName)
+ .setFailureHandler(new StopNodeFailureHandler())
+ .setDataStorageConfiguration(new DataStorageConfiguration()
+ .setDefaultDataRegionConfiguration(new
DataRegionConfiguration()
+ .setPersistenceEnabled(pds)));
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ super.afterTest();
+
+ stopAllGrids();
+
+ cleanPersistenceDir();
+ }
+
+ /** Runs the scenario with cluster deactivation followed by re-activation as the cache-stopping action. */
+ @Test
+ public void testRebalanceOnDeactivate() throws Exception {
+ doTest(ignite -> {
+ ignite.cluster().state(ClusterState.INACTIVE);
+
+ ignite.cluster().state(ClusterState.ACTIVE);
+ });
+ }
+
+ /** Runs the scenario with destroying the default cache as the cache-stopping action. */
+ @Test
+ public void testRebalanceOnCacheStop() throws Exception {
+ doTest(ignite -> ignite.cache(DEFAULT_CACHE_NAME).destroy());
+ }
+
+ /** Ten times: gets or creates the cache, stops node 0, streams 100_000 entries via node 1, restarts node 0 and applies {@code action} to it. */
+ private void doTest(Consumer<Ignite> action) throws Exception {
+ IgniteEx ignite0 = startGrid(0);
+ IgniteEx ignite1 = startGrid(1);
+ ignite0.cluster().state(ClusterState.ACTIVE);
+ ignite0.cluster().baselineAutoAdjustEnabled(false);
+
+ for (int i = 0; i < 10; i++) {
+ IgniteCache<Integer, Integer> cache = ignite0.getOrCreateCache(
+ new CacheConfiguration<Integer, Integer>(DEFAULT_CACHE_NAME)
+ .setBackups(1)
+ .setGroupName(sharedGrp ? "grp" : null)
+ .setAffinity(new RendezvousAffinityFunction(false, 1)));
+
+ cache.clear();
+
+ stopGrid(0);
+
+ try (IgniteDataStreamer<Integer, Integer> streamer =
ignite1.dataStreamer(DEFAULT_CACHE_NAME)) {
+ for (int j = 0; j < 100_000; j++)
+ streamer.addData(j, j);
+ }
+
+ ignite0 = startGrid(0);
+
+ action.accept(ignite0);
+ }
+ }
+}
diff --git
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite4.java
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite4.java
index 351b99f1826..986ca50a1f1 100644
---
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite4.java
+++
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite4.java
@@ -31,6 +31,7 @@ import org.apache.ignite.cache.ResetLostPartitionTest;
import
org.apache.ignite.internal.processors.cache.IgniteClusterActivateDeactivateTestWithPersistenceAndMemoryReuse;
import
org.apache.ignite.internal.processors.cache.distributed.CachePageWriteLockUnlockTest;
import
org.apache.ignite.internal.processors.cache.distributed.rebalancing.IgniteRebalanceOnCachesStoppingOrDestroyingTest;
+import
org.apache.ignite.internal.processors.cache.distributed.rebalancing.IgniteRebalanceRepeatingCacheStopTest;
import
org.apache.ignite.internal.processors.cache.persistence.CorruptedTreeFailureHandlingTest;
import
org.apache.ignite.internal.processors.cache.persistence.IgnitePdsCheckpointMapSnapshotTest;
import
org.apache.ignite.internal.processors.cache.persistence.IgnitePdsContinuousRestartTestWithSharedGroupAndIndexes;
@@ -77,6 +78,7 @@ public class IgnitePdsTestSuite4 {
GridTestUtils.addTestIfNeeded(suite, ResetLostPartitionTest.class,
ignoredTests);
GridTestUtils.addTestIfNeeded(suite,
RebalanceAfterResettingLostPartitionTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite,
IgniteRebalanceOnCachesStoppingOrDestroyingTest.class, ignoredTests);
+ GridTestUtils.addTestIfNeeded(suite,
IgniteRebalanceRepeatingCacheStopTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite,
CachePageWriteLockUnlockTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite,
IgnitePdsCacheWalDisabledOnRebalancingTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite,
IgnitePdsStartWIthEmptyArchive.class, ignoredTests);