This is an automated email from the ASF dual-hosted git repository.
sk0x50 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 9a5c27e5370 IGNITE-16789 Fixed an issue that led to errors in the
discovery thread and cluster instability
9a5c27e5370 is described below
commit 9a5c27e5370d9c0d44114c262cb1c9f4f47901bf
Author: Slava Koptilin <[email protected]>
AuthorDate: Tue Apr 5 16:24:49 2022 +0300
IGNITE-16789 Fixed an issue that led to errors in the discovery thread and
cluster instability
---
.../cache/CacheAffinitySharedManager.java | 10 +-
.../internal/processors/cache/CachesRegistry.java | 7 +-
.../processors/cache/ClusterCachesInfo.java | 129 ++++++++++++++++++---
.../processors/cache/GridCacheProcessor.java | 2 +
.../IgniteAbstractDynamicCacheStartFailTest.java | 69 ++++++++++-
5 files changed, 185 insertions(+), 32 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
index a5a35ce8a0e..06d6f07be9f 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
@@ -850,7 +850,7 @@ public class CacheAffinitySharedManager<K, V> extends
GridCacheSharedManagerAdap
assert res.isDone() : "There should be no caches to start: " +
exchActions;
- processCacheStopRequests(fut, crd, exchActions, true);
+ processCacheStopRequests(fut, crd, exchActions);
cctx.cache().forceCloseCaches(fut.initialVersion(), exchActions);
}
@@ -882,7 +882,7 @@ public class CacheAffinitySharedManager<K, V> extends
GridCacheSharedManagerAdap
processCacheStartRequests(fut, crd, exchActions);
- Set<Integer> stoppedGrps = processCacheStopRequests(fut, crd,
exchActions, false);
+ Set<Integer> stoppedGrps = processCacheStopRequests(fut, crd,
exchActions);
if (stoppedGrps != null) {
AffinityTopologyVersion notifyTopVer = null;
@@ -1068,14 +1068,12 @@ public class CacheAffinitySharedManager<K, V> extends
GridCacheSharedManagerAdap
* @param fut Exchange future.
* @param crd Coordinator flag.
* @param exchActions Cache change requests.
- * @param forceClose Force close flag.
* @return Set of cache groups to be stopped.
*/
private Set<Integer> processCacheStopRequests(
GridDhtPartitionsExchangeFuture fut,
boolean crd,
- final ExchangeActions exchActions,
- boolean forceClose
+ final ExchangeActions exchActions
) {
assert exchActions != null && !exchActions.empty() : exchActions;
@@ -1091,8 +1089,6 @@ public class CacheAffinitySharedManager<K, V> extends
GridCacheSharedManagerAdap
if (data.descriptor().config().getCacheMode() != LOCAL) {
CacheGroupHolder cacheGrp =
grpHolders.remove(data.descriptor().groupId());
- assert !crd || (cacheGrp != null || forceClose) :
data.descriptor();
-
if (cacheGrp != null) {
if (stoppedGrps == null)
stoppedGrps = new HashSet<>();
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CachesRegistry.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CachesRegistry.java
index 6010cc1d627..f4b4fce8311 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CachesRegistry.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CachesRegistry.java
@@ -183,11 +183,8 @@ public class CachesRegistry {
* @return Future that will be completed when all unregistered cache
configurations will be persisted.
*/
public IgniteInternalFuture<?> update(ExchangeActions exchActions) {
- for (ExchangeActions.CacheGroupActionData stopAction :
exchActions.cacheGroupsToStop()) {
- CacheGroupDescriptor rmvd =
unregisterGroup(stopAction.descriptor().groupId());
-
- assert rmvd != null : stopAction.descriptor().cacheOrGroupName();
- }
+ for (ExchangeActions.CacheGroupActionData stopAction :
exchActions.cacheGroupsToStop())
+ unregisterGroup(stopAction.descriptor().groupId());
for (ExchangeActions.CacheActionData req :
exchActions.cacheStopRequests())
unregisterCache(req.descriptor().cacheId());
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
index 15b488319fc..616ad6a0242 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
@@ -119,6 +119,13 @@ public class ClusterCachesInfo {
/** */
private final GridKernalContext ctx;
+ /**
+ * Map contains cache descriptors that were removed from {@link
#registeredCaches} due to cache stop request.
+ * Such descriptors will be removed from the map only after whole cache
stop process is finished.
+ */
+ private final ConcurrentNavigableMap<AffinityTopologyVersion, Map<String,
DynamicCacheDescriptor>>
+ markedForDeletionCaches = new ConcurrentSkipListMap<>();
+
/**
* Map contains cache group descriptors that were removed from {@link
#registeredCacheGrps} due to cache stop request.
* Such descriptors will be removed from the map only after whole cache
stop process is finished.
@@ -560,19 +567,48 @@ public class ClusterCachesInfo {
* @param topVer Current topology version.
*/
public void onCacheChangeRequested(DynamicCacheChangeFailureMessage
failMsg, AffinityTopologyVersion topVer) {
+ AffinityTopologyVersion actualTopVer =
failMsg.exchangeId().topologyVersion();
+
ExchangeActions exchangeActions = new ExchangeActions();
- List<DynamicCacheChangeRequest> requests = new
ArrayList<>(failMsg.cacheNames().size());
+ List<T2<DynamicCacheChangeRequest, DynamicCacheDescriptor>> descs =
new ArrayList<>(failMsg.cacheNames().size());
for (String cacheName : failMsg.cacheNames()) {
- DynamicCacheDescriptor cacheDescr =
registeredCaches.get(cacheName);
+ DynamicCacheDescriptor cacheDescr = null;
+
+ // The required cache descriptor may have already been deleted in
the following scenario:
+ // 1 - DynamicCacheChangeBatch(start a new user cache with name
"A")
+ // 2 - At the same time, the user initiates destroying the cache which
should be started in step 1.
+ // 3 - The corresponding exchange (see step 1) results in an
error, and so, the start should be rolled back.
+ // 4 - When DynamicCacheChangeFailureMessage is received, the required
cache descriptor is already deleted
+ // and therefore, we need to scan markedForDeletion
collections in order to find the right descriptor.
+
+ // Find the "earliest" available descriptor.
+ for (Map<String, DynamicCacheDescriptor> descriptors :
markedForDeletionCaches.tailMap(actualTopVer).values()) {
+ cacheDescr = descriptors.get(cacheName);
+
+ if (cacheDescr != null)
+ break;
+ }
+
+ if (cacheDescr == null)
+ cacheDescr = registeredCaches.get(cacheName);
assert cacheDescr != null : "Dynamic cache descriptor is missing
[cacheName=" + cacheName + "]";
- requests.add(DynamicCacheChangeRequest.stopRequest(ctx, cacheName,
cacheDescr.sql(), true));
+ DynamicCacheChangeRequest req =
DynamicCacheChangeRequest.stopRequest(ctx, cacheName, cacheDescr.sql(), true);
+
+ descs.add(new T2<>(req, cacheDescr));
}
- processCacheChangeRequests(exchangeActions, requests, topVer, false);
+ CacheChangeProcessResult res = new CacheChangeProcessResult();
+
+ for (T2<DynamicCacheChangeRequest, DynamicCacheDescriptor> desc :
descs) {
+ DynamicCacheChangeRequest req = desc.get1();
+ DynamicCacheDescriptor cacheDesc = desc.get2();
+
+ processStopCacheRequest(exchangeActions, req, res,
req.cacheName(), cacheDesc, actualTopVer, true);
+ }
failMsg.exchangeActions(exchangeActions);
}
@@ -798,7 +834,7 @@ public class ClusterCachesInfo {
return;
}
- if (!processStopCacheRequest(exchangeActions, req, res,
cacheName, desc, topVer.nextMinorVersion()))
+ if (!processStopCacheRequest(exchangeActions, req, res,
cacheName, desc, topVer.nextMinorVersion(), false))
return;
needExchange = true;
@@ -821,6 +857,10 @@ public class ClusterCachesInfo {
* @param cacheName Cache name.
* @param desc Dynamic cache descriptor.
* @param topVer Topology version that will be applied after the
corresponding partition map exchange.
+ * @param checkForAlreadyDeleted {@code true} indicates that the given
cache descriptor may have already been deleted.
+ * This is possible in the case when the
{@code DynamicCacheChangeFailureMessage}
+ * is received after the {@code
DynamicCacheChangeRequest},
+ * which is responsible for stopping the
cache.
* @return {@code true} if stop request can be proceed.
*/
private boolean processStopCacheRequest(
@@ -829,7 +869,8 @@ public class ClusterCachesInfo {
CacheChangeProcessResult res,
String cacheName,
DynamicCacheDescriptor desc,
- AffinityTopologyVersion topVer
+ AffinityTopologyVersion topVer,
+ boolean checkForAlreadyDeleted
) {
if (ctx.cache().context().snapshotMgr().isSnapshotCreating()) {
IgniteCheckedException err = new
IgniteCheckedException(SNP_IN_PROGRESS_ERR_MSG);
@@ -843,7 +884,34 @@ public class ClusterCachesInfo {
return false;
}
- DynamicCacheDescriptor old = registeredCaches.remove(cacheName);
+ boolean alreadyRemovedDesc = false;
+ DynamicCacheDescriptor old = null;
+
+ if (checkForAlreadyDeleted) {
+ // Find the "earliest" available descriptor.
+ for (Map<String, DynamicCacheDescriptor> descriptors :
markedForDeletionCaches.tailMap(topVer).values()) {
+ old = descriptors.get(cacheName);
+
+ if (old != null)
+ break;
+ }
+
+ alreadyRemovedDesc = old != null;
+ }
+
+ if (old == null) {
+ old = registeredCaches.get(cacheName);
+
+ markedForDeletionCaches
+ .computeIfAbsent(topVer, map -> new ConcurrentHashMap<>())
+ .put(cacheName, old);
+
+ registeredCaches.remove(cacheName);
+
+ ctx.discovery().removeCacheFilter(cacheName);
+
+ alreadyRemovedDesc = false;
+ }
assert old != null && old == desc : "Dynamic cache map was
concurrently modified [req=" + req + ']';
@@ -855,24 +923,44 @@ public class ClusterCachesInfo {
restartingCaches.put(cacheName, restartId == null ? NULL_OBJECT :
restartId);
}
- ctx.discovery().removeCacheFilter(cacheName);
-
exchangeActions.addCacheToStop(req, desc);
- CacheGroupDescriptor grpDesc = registeredCacheGrps.get(desc.groupId());
+ boolean alreadyRemovedGrp = false;
+ CacheGroupDescriptor grpDesc = null;
+
+ if (checkForAlreadyDeleted) {
+ // Find the "earliest" available descriptor.
+ for (Map<Integer, CacheGroupDescriptor> descriptors :
markedForDeletionCacheGrps.tailMap(topVer).values()) {
+ grpDesc = descriptors.get(desc.groupId());
+
+ if (grpDesc != null)
+ break;
+ }
+
+ alreadyRemovedGrp = grpDesc != null;
+ }
+
+ if (grpDesc == null) {
+ grpDesc = registeredCacheGrps.get(desc.groupId());
+
+ alreadyRemovedGrp = false;
+ }
assert grpDesc != null && grpDesc.groupId() == desc.groupId() : desc;
- grpDesc.onCacheStopped(desc.cacheName(), desc.cacheId());
+ if (!alreadyRemovedDesc)
+ grpDesc.onCacheStopped(desc.cacheName(), desc.cacheId());
if (!grpDesc.hasCaches()) {
- markedForDeletionCacheGrps
- .computeIfAbsent(topVer, (map) -> new ConcurrentHashMap<>())
- .put(grpDesc.groupId(), grpDesc);
+ if (!alreadyRemovedGrp) {
+ markedForDeletionCacheGrps
+ .computeIfAbsent(topVer, (map) -> new
ConcurrentHashMap<>())
+ .put(grpDesc.groupId(), grpDesc);
- registeredCacheGrps.remove(grpDesc.groupId());
+ ctx.discovery().removeCacheGroup(grpDesc);
+ }
- ctx.discovery().removeCacheGroup(grpDesc);
+ registeredCacheGrps.remove(grpDesc.groupId());
exchangeActions.addCacheGroupToStop(grpDesc, req.destroy());
@@ -1648,6 +1736,15 @@ public class ClusterCachesInfo {
* @param topVer Topology version.
*/
public void cleanupRemovedCaches(AffinityTopologyVersion topVer) {
+ markedForDeletionCaches.headMap(topVer, true).clear();
+ }
+
+ /**
+ * Cleans up cache group descriptors that belong to the {@code topVer} and
earlier.
+ *
+ * @param topVer Topology version.
+ */
+ public void cleanupRemovedCacheGroups(AffinityTopologyVersion topVer) {
markedForDeletionCacheGrps.headMap(topVer, true).clear();
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index c3db29bb3b4..c5efbf8bec7 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -2894,6 +2894,8 @@ public class GridCacheProcessor extends
GridProcessorAdapter {
if (!sharedCtx.kernalContext().clientNode())
sharedCtx.database().onCacheGroupsStopped(grpsToStop);
+ cachesInfo.cleanupRemovedCacheGroups(topVer);
+
if (exchActions.deactivate())
sharedCtx.deactivate();
}
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteAbstractDynamicCacheStartFailTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteAbstractDynamicCacheStartFailTest.java
index b4b1df5c955..5cb444b8338 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteAbstractDynamicCacheStartFailTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteAbstractDynamicCacheStartFailTest.java
@@ -63,12 +63,19 @@ import org.apache.ignite.cluster.BaselineNode;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.resources.IgniteInstanceResource;
import org.apache.ignite.testframework.GridTestUtils;
import org.junit.Test;
+import static
org.apache.ignite.testframework.GridTestUtils.assertThrowsWithCause;
+import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
+
/**
* Tests the recovery after a dynamic cache start failure.
*/
@@ -611,6 +618,58 @@ public abstract class
IgniteAbstractDynamicCacheStartFailTest extends GridCacheA
}
/** */
+ @Test
+ public void testStartAndStopFailedCache() throws Exception {
+ IgniteEx client = startClientGrid(gridCount());
+
+ awaitPartitionMapExchange();
+
+ TestRecordingCommunicationSpi spi1 =
TestRecordingCommunicationSpi.spi(grid(1));
+
+ // Block partition map exchange related to the start of a new cache.
+ spi1.blockMessages((node, msg) -> msg instanceof
GridDhtPartitionsSingleMessage);
+
+ IgniteInternalFuture<?> startFut = GridTestUtils.runAsync(() -> {
+ // Create a new cache that should lead to an error during PME.
+ CacheConfiguration<?, ?> cfg =
+ createCacheConfigsWithBrokenCacheStore(true, 0, 0, 1,
false).get(0);
+
+ cfg.setName(DYNAMIC_CACHE_NAME);
+
+ // It is expected that the process of creating the cache will
fail.
+ client.getOrCreateCache(cfg);
+ });
+
+ // Make sure that PME is blocked.
+ spi1.waitForBlocked();
+
+ AffinityTopologyVersion currVer =
grid(0).context().discovery().topologyVersionEx();
+ AffinityTopologyVersion expVer = currVer.nextMinorVersion();
+
+ // Let's try to destroy the cache that is being started.
+ IgniteInternalFuture<?> stopFut = GridTestUtils.runAsync(() ->
client.destroyCache(DYNAMIC_CACHE_NAME));
+
+ // Wait for the next minor topology version. It means
DynamicCacheChangeBatch was processed by disco thread.
+ assertTrue(
+ "Failed to wait for DynamicCacheChangeBatch message (destroy)",
+ waitForCondition(() ->
grid(0).context().discovery().topologyVersionEx().equals(expVer),
getTestTimeout()));
+
+ // Unblock the process of creating the cache.
+ spi1.stopBlock();
+
+ // Creating a new cache should throw CacheException.
+ assertThrowsWithCause(() -> startFut.get(getTestTimeout()),
CacheException.class);
+
+ stopFut.get(getTestTimeout());
+ }
+
+ /**
+ * Initiates creating new caches from the given {@code initiatorId} node
and
+ * it is expected that creating caches results in error.
+ *
+ * @param cfgs Cache configurations.
+ * @param initiatorId Node index to be used to initiate cache start.
+ */
protected void testDynamicCacheStart(final Collection<CacheConfiguration>
cfgs, final int initiatorId) {
assert initiatorId < gridCount();
@@ -630,14 +689,16 @@ public abstract class
IgniteAbstractDynamicCacheStartFailTest extends GridCacheA
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String
igniteInstanceName) throws Exception {
- IgniteConfiguration res = super.getConfiguration(igniteInstanceName);
+ IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
if (mbSrv == null)
- mbSrv = new FailureMBeanServer(res.getMBeanServer());
+ mbSrv = new FailureMBeanServer(cfg.getMBeanServer());
- res.setMBeanServer(mbSrv);
+ cfg.setMBeanServer(mbSrv);
- return res;
+ cfg.setCommunicationSpi(new TestRecordingCommunicationSpi());
+
+ return cfg;
}
/** {@inheritDoc} */