This is an automated email from the ASF dual-hosted git repository.
namelchev pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 6033d49deee IGNITE-26152 Fixed node stop hang during snapshot creation. (#12257)
6033d49deee is described below
commit 6033d49deee16cd96c9680971f1fc6f987e39eab
Author: Nikita Amelchev <[email protected]>
AuthorDate: Tue Oct 7 15:07:59 2025 +0300
IGNITE-26152 Fixed node stop hang during snapshot creation. (#12257)
---
.../IgniteAuthenticationProcessor.java | 2 +-
.../preloader/GridDhtPartitionsExchangeFuture.java | 2 +-
.../dht/preloader/PartitionsExchangeAware.java | 4 +-
.../snapshot/IgniteSnapshotManager.java | 50 +++++++++++-----------
.../processors/platform/PlatformContextImpl.java | 2 +-
.../distributed/PartitionsExchangeAwareTest.java | 6 ++-
.../topology/DelayedOwningDuringExchangeTest.java | 6 ++-
...nitePdsConsistencyOnDelayedPartitionOwning.java | 6 ++-
.../snapshot/IgniteClusterSnapshotSelfTest.java | 6 ++-
.../snapshot/IgniteSnapshotRemoteRequestTest.java | 15 ++++++-
10 files changed, 64 insertions(+), 35 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/authentication/IgniteAuthenticationProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/authentication/IgniteAuthenticationProcessor.java
index de96c525c83..22139ca7307 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/authentication/IgniteAuthenticationProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/authentication/IgniteAuthenticationProcessor.java
@@ -828,7 +828,7 @@ public class IgniteAuthenticationProcessor extends
GridProcessorAdapter implemen
}
/** {@inheritDoc} */
- @Override public void
onDoneBeforeTopologyUnlock(GridDhtPartitionsExchangeFuture fut) {
+ @Override public void
onDoneBeforeTopologyUnlock(GridDhtPartitionsExchangeFuture fut, @Nullable
Throwable err) {
activateFut.onDone();
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index d9715338ea2..35ac04f6385 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -2456,7 +2456,7 @@ public class GridDhtPartitionsExchangeFuture extends
GridDhtTopologyFutureAdapte
}
for (PartitionsExchangeAware comp :
cctx.exchange().exchangeAwareComponents())
- comp.onDoneBeforeTopologyUnlock(this);
+ comp.onDoneBeforeTopologyUnlock(this, err);
// Create and destroy caches and cache proxies.
cctx.cache().onExchangeDone(this, err);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/PartitionsExchangeAware.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/PartitionsExchangeAware.java
index 7318726ff15..b05efc826b5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/PartitionsExchangeAware.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/PartitionsExchangeAware.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
import
org.apache.ignite.internal.processors.cache.GridCachePartitionExchangeManager;
+import org.jetbrains.annotations.Nullable;
/**
* Interface which allows to subscribe a component for partition map exchange
events
@@ -53,8 +54,9 @@ public interface PartitionsExchangeAware {
* Guarantees that no updates were performed on local node since exchange
process started.
*
* @param fut Partition map exchange future.
+ * @param err Optional error, e.g. node stopping.
*/
- public default void
onDoneBeforeTopologyUnlock(GridDhtPartitionsExchangeFuture fut) {
+ public default void
onDoneBeforeTopologyUnlock(GridDhtPartitionsExchangeFuture fut, @Nullable
Throwable err) {
// No-op.
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
index 39b6982aa18..5f3070700a9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
@@ -650,42 +650,37 @@ public class IgniteSnapshotManager extends
GridCacheSharedManagerAdapter
}
/** {@inheritDoc} */
- @Override protected void stop0(boolean cancel) {
+ @Override protected void onKernalStop0(boolean cancel) {
busyLock.block();
- try {
- snpRmtMgr.stop();
+ snpRmtMgr.stop();
- IgniteCheckedException stopErr = new NodeStoppingException("Node
is stopping.");
+ IgniteCheckedException stopErr = new NodeStoppingException("Node is
stopping.");
- restoreCacheGrpProc.interrupt(stopErr);
- checkSnpProc.interrupt(stopErr);
+ restoreCacheGrpProc.interrupt(stopErr);
+ checkSnpProc.interrupt(stopErr);
- // Try stop all snapshot processing if not yet.
- for (AbstractSnapshotFutureTask<?> sctx : locSnpTasks.values())
- sctx.acceptException(new
NodeStoppingException(SNP_NODE_STOPPING_ERR_MSG));
+ // Try stop all snapshot processing if not yet.
+ for (AbstractSnapshotFutureTask<?> sctx : locSnpTasks.values())
+ sctx.acceptException(new
NodeStoppingException(SNP_NODE_STOPPING_ERR_MSG));
- locSnpTasks.clear();
+ locSnpTasks.clear();
- synchronized (snpOpMux) {
- if (clusterSnpFut != null) {
- clusterSnpFut.onDone(new
NodeStoppingException(SNP_NODE_STOPPING_ERR_MSG));
+ synchronized (snpOpMux) {
+ if (clusterSnpFut != null) {
+ clusterSnpFut.onDone(new
NodeStoppingException(SNP_NODE_STOPPING_ERR_MSG));
- clusterSnpFut = null;
- }
+ clusterSnpFut = null;
}
+ }
-
cctx.kernalContext().io().removeMessageListener(DFLT_INITIAL_SNAPSHOT_TOPIC);
-
cctx.kernalContext().io().removeTransmissionHandler(DFLT_INITIAL_SNAPSHOT_TOPIC);
+
cctx.kernalContext().io().removeMessageListener(DFLT_INITIAL_SNAPSHOT_TOPIC);
+
cctx.kernalContext().io().removeTransmissionHandler(DFLT_INITIAL_SNAPSHOT_TOPIC);
- if (discoLsnr != null)
-
cctx.kernalContext().event().removeDiscoveryEventListener(discoLsnr);
+ if (discoLsnr != null)
+
cctx.kernalContext().event().removeDiscoveryEventListener(discoLsnr);
- cctx.exchange().unregisterExchangeAwareComponent(this);
- }
- finally {
- busyLock.unblock();
- }
+ cctx.exchange().unregisterExchangeAwareComponent(this);
}
/** {@inheritDoc} */
@@ -2221,7 +2216,7 @@ public class IgniteSnapshotManager extends
GridCacheSharedManagerAdapter
}
/** {@inheritDoc} */
- @Override public void
onDoneBeforeTopologyUnlock(GridDhtPartitionsExchangeFuture fut) {
+ @Override public void
onDoneBeforeTopologyUnlock(GridDhtPartitionsExchangeFuture fut, @Nullable
Throwable err) {
if (clusterSnpReq == null || cctx.kernalContext().clientNode() ||
!isSnapshotOperation(fut.firstEvent()))
return;
@@ -2234,6 +2229,11 @@ public class IgniteSnapshotManager extends
GridCacheSharedManagerAdapter
if (task == null)
return;
+ else if (err != null) {
+ task.onDone(err);
+
+ return;
+ }
if (task.start()) {
cctx.database().forceNewCheckpoint(String.format("Start snapshot
operation: %s", snpReq.snapshotName()), lsnr -> {});
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContextImpl.java
index d684d02a325..2be1c9303b7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContextImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContextImpl.java
@@ -679,7 +679,7 @@ public class PlatformContextImpl implements
PlatformContext, PartitionsExchangeA
}
/** {@inheritDoc} */
- @Override public void
onDoneBeforeTopologyUnlock(GridDhtPartitionsExchangeFuture fut) {
+ @Override public void
onDoneBeforeTopologyUnlock(GridDhtPartitionsExchangeFuture fut, @Nullable
Throwable err) {
AffinityTopologyVersion ver = fut.topologyVersion();
if (ver != null) {
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/PartitionsExchangeAwareTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/PartitionsExchangeAwareTest.java
index e27cb1c4dc8..b8e988ab6a9 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/PartitionsExchangeAwareTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/PartitionsExchangeAwareTest.java
@@ -39,6 +39,7 @@ import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.jetbrains.annotations.Nullable;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -183,7 +184,10 @@ public class PartitionsExchangeAwareTest extends
GridCommonAbstractTest {
}
/** {@inheritDoc} */
- @Override public void
onDoneBeforeTopologyUnlock(GridDhtPartitionsExchangeFuture fut) {
+ @Override public void onDoneBeforeTopologyUnlock(
+ GridDhtPartitionsExchangeFuture fut,
+ @Nullable Throwable err
+ ) {
try {
onDoneBeforeUnlockReachedLatch.countDown();
onDoneBeforeUnlockWaitLatch.await();
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/DelayedOwningDuringExchangeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/DelayedOwningDuringExchangeTest.java
index 668ea6d0a79..2b390b14b8f 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/DelayedOwningDuringExchangeTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/DelayedOwningDuringExchangeTest.java
@@ -37,6 +37,7 @@ import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.WithSystemProperty;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.jetbrains.annotations.Nullable;
import org.junit.Test;
/**
@@ -130,7 +131,10 @@ public class DelayedOwningDuringExchangeTest extends
GridCommonAbstractTest {
wait(fut, 0);
}
- @Override public void
onDoneBeforeTopologyUnlock(GridDhtPartitionsExchangeFuture fut) {
+ @Override public void onDoneBeforeTopologyUnlock(
+ GridDhtPartitionsExchangeFuture fut,
+ @Nullable Throwable err
+ ) {
wait(fut, 1);
}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsConsistencyOnDelayedPartitionOwning.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsConsistencyOnDelayedPartitionOwning.java
index 8e6c559db25..3654b5f4447 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsConsistencyOnDelayedPartitionOwning.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsConsistencyOnDelayedPartitionOwning.java
@@ -49,6 +49,7 @@ import org.apache.ignite.lang.IgniteBiPredicate;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.jetbrains.annotations.Nullable;
import org.junit.Test;
import static
org.apache.ignite.internal.processors.cache.GridCacheUtils.cacheId;
@@ -203,7 +204,10 @@ public class IgnitePdsConsistencyOnDelayedPartitionOwning
extends GridCommonAbst
});
grid(1).context().cache().context().exchange().registerExchangeAwareComponent(new
PartitionsExchangeAware() {
- @Override public void
onDoneBeforeTopologyUnlock(GridDhtPartitionsExchangeFuture fut) {
+ @Override public void onDoneBeforeTopologyUnlock(
+ GridDhtPartitionsExchangeFuture fut,
+ @Nullable Throwable err
+ ) {
if (fut.initialVersion().equals(new AffinityTopologyVersion(7,
0))) {
topInitLatch.countDown();
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotSelfTest.java
index 024a9107010..93390746e6f 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotSelfTest.java
@@ -85,6 +85,7 @@ import org.apache.ignite.metric.MetricRegistry;
import org.apache.ignite.spi.metric.LongMetric;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.transactions.Transaction;
+import org.jetbrains.annotations.Nullable;
import org.junit.Before;
import org.junit.Test;
@@ -1134,7 +1135,10 @@ public class IgniteClusterSnapshotSelfTest extends
AbstractSnapshotSelfTest {
assertEquals("Exchange order violated: " +
fut.firstEvent(), 1, order.getAndIncrement());
}
- @Override public void
onDoneBeforeTopologyUnlock(GridDhtPartitionsExchangeFuture fut) {
+ @Override public void onDoneBeforeTopologyUnlock(
+ GridDhtPartitionsExchangeFuture fut,
+ @Nullable Throwable err
+ ) {
assertEquals("Exchange order violated: " +
fut.firstEvent(), 2, order.getAndIncrement());
}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotRemoteRequestTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotRemoteRequestTest.java
index 53e0a9a54ff..2e7813b8083 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotRemoteRequestTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotRemoteRequestTest.java
@@ -51,6 +51,7 @@ import
org.apache.ignite.internal.util.future.GridCompoundFuture;
import org.apache.ignite.internal.util.lang.GridAbsPredicate;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.testframework.GridTestUtils;
@@ -268,8 +269,18 @@ public class IgniteSnapshotRemoteRequestTest extends
IgniteClusterSnapshotRestor
latch.countDown();
- assertThrowsAnyCause(log, () -> fut.get(TIMEOUT),
ClusterTopologyCheckedException.class,
- "he node from which a snapshot has been requested left the grid");
+ try {
+ fut.get(TIMEOUT);
+ }
+ catch (Exception e) {
+ boolean expErr = X.hasCause(e, "The node from which a snapshot has
been requested left the grid",
+ ClusterTopologyCheckedException.class)
+ || X.hasCause(e, "Request cancelled. The snapshot operation
stopped on the remote node with an error: " +
+ "The operation is cancelled due to the local node is
stopping", IgniteCheckedException.class);
+
+ if (!expErr)
+ fail(e.getMessage());
+ }
}
/** @throws Exception If fails. */