This is an automated email from the ASF dual-hosted git repository.

sk0x50 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 9a5c27e5370 IGNITE-16789 Fixed an issue that led to errors in the discovery thread and cluster instability
9a5c27e5370 is described below

commit 9a5c27e5370d9c0d44114c262cb1c9f4f47901bf
Author: Slava Koptilin <[email protected]>
AuthorDate: Tue Apr 5 16:24:49 2022 +0300

    IGNITE-16789 Fixed an issue that led to errors in the discovery thread and cluster instability
---
 .../cache/CacheAffinitySharedManager.java          |  10 +-
 .../internal/processors/cache/CachesRegistry.java  |   7 +-
 .../processors/cache/ClusterCachesInfo.java        | 129 ++++++++++++++++++---
 .../processors/cache/GridCacheProcessor.java       |   2 +
 .../IgniteAbstractDynamicCacheStartFailTest.java   |  69 ++++++++++-
 5 files changed, 185 insertions(+), 32 deletions(-)

diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
index a5a35ce8a0e..06d6f07be9f 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
@@ -850,7 +850,7 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
 
         assert res.isDone() : "There should be no caches to start: " + 
exchActions;
 
-        processCacheStopRequests(fut, crd, exchActions, true);
+        processCacheStopRequests(fut, crd, exchActions);
 
         cctx.cache().forceCloseCaches(fut.initialVersion(), exchActions);
     }
@@ -882,7 +882,7 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
 
         processCacheStartRequests(fut, crd, exchActions);
 
-        Set<Integer> stoppedGrps = processCacheStopRequests(fut, crd, 
exchActions, false);
+        Set<Integer> stoppedGrps = processCacheStopRequests(fut, crd, 
exchActions);
 
         if (stoppedGrps != null) {
             AffinityTopologyVersion notifyTopVer = null;
@@ -1068,14 +1068,12 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
      * @param fut Exchange future.
      * @param crd Coordinator flag.
      * @param exchActions Cache change requests.
-     * @param forceClose Force close flag.
      * @return Set of cache groups to be stopped.
      */
     private Set<Integer> processCacheStopRequests(
         GridDhtPartitionsExchangeFuture fut,
         boolean crd,
-        final ExchangeActions exchActions,
-        boolean forceClose
+        final ExchangeActions exchActions
     ) {
         assert exchActions != null && !exchActions.empty() : exchActions;
 
@@ -1091,8 +1089,6 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
             if (data.descriptor().config().getCacheMode() != LOCAL) {
                 CacheGroupHolder cacheGrp = 
grpHolders.remove(data.descriptor().groupId());
 
-                assert !crd || (cacheGrp != null || forceClose) : 
data.descriptor();
-
                 if (cacheGrp != null) {
                     if (stoppedGrps == null)
                         stoppedGrps = new HashSet<>();
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CachesRegistry.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CachesRegistry.java
index 6010cc1d627..f4b4fce8311 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CachesRegistry.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CachesRegistry.java
@@ -183,11 +183,8 @@ public class CachesRegistry {
      * @return Future that will be completed when all unregistered cache 
configurations will be persisted.
      */
     public IgniteInternalFuture<?> update(ExchangeActions exchActions) {
-        for (ExchangeActions.CacheGroupActionData stopAction : 
exchActions.cacheGroupsToStop()) {
-            CacheGroupDescriptor rmvd = 
unregisterGroup(stopAction.descriptor().groupId());
-
-            assert rmvd != null : stopAction.descriptor().cacheOrGroupName();
-        }
+        for (ExchangeActions.CacheGroupActionData stopAction : 
exchActions.cacheGroupsToStop())
+            unregisterGroup(stopAction.descriptor().groupId());
 
         for (ExchangeActions.CacheActionData req : 
exchActions.cacheStopRequests())
             unregisterCache(req.descriptor().cacheId());
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
index 15b488319fc..616ad6a0242 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
@@ -119,6 +119,13 @@ public class ClusterCachesInfo {
     /** */
     private final GridKernalContext ctx;
 
+    /**
+     * Map that contains cache descriptors that were removed from {@link #registeredCaches} due to a cache stop request.
+     * Such descriptors are removed from this map only after the whole cache stop process is finished.
+     */
+    private final ConcurrentNavigableMap<AffinityTopologyVersion, Map<String, 
DynamicCacheDescriptor>>
+        markedForDeletionCaches = new ConcurrentSkipListMap<>();
+
     /**
      * Map contains cache group descriptors that were removed from {@link 
#registeredCacheGrps} due to cache stop request.
      * Such descriptors will be removed from the map only after whole cache 
stop process is finished.
@@ -560,19 +567,48 @@ public class ClusterCachesInfo {
      * @param topVer Current topology version.
      */
     public void onCacheChangeRequested(DynamicCacheChangeFailureMessage 
failMsg, AffinityTopologyVersion topVer) {
+        AffinityTopologyVersion actualTopVer = 
failMsg.exchangeId().topologyVersion();
+
         ExchangeActions exchangeActions = new ExchangeActions();
 
-        List<DynamicCacheChangeRequest> requests = new 
ArrayList<>(failMsg.cacheNames().size());
+        List<T2<DynamicCacheChangeRequest, DynamicCacheDescriptor>> descs = 
new ArrayList<>(failMsg.cacheNames().size());
 
         for (String cacheName : failMsg.cacheNames()) {
-            DynamicCacheDescriptor cacheDescr = 
registeredCaches.get(cacheName);
+            DynamicCacheDescriptor cacheDescr = null;
+
+            // The required cache descriptor may have already been deleted in the following scenario:
+            // 1 - DynamicCacheChangeBatch (start a new user cache with name "A").
+            // 2 - At the same time, the user initiates destroying the cache that should be started in step 1.
+            // 3 - The corresponding exchange (see step 1) results in an error, so the start should be rolled back.
+            // 4 - When DynamicCacheChangeFailureMessage is received, the required cache descriptor is already deleted,
+            //     and therefore we need to scan the markedForDeletion collections in order to find the right descriptor.
+
+            // Find the "earliest" available descriptor.
+            for (Map<String, DynamicCacheDescriptor> descriptors : 
markedForDeletionCaches.tailMap(actualTopVer).values()) {
+                cacheDescr = descriptors.get(cacheName);
+
+                if (cacheDescr != null)
+                    break;
+            }
+
+            if (cacheDescr == null)
+                cacheDescr = registeredCaches.get(cacheName);
 
             assert cacheDescr != null : "Dynamic cache descriptor is missing 
[cacheName=" + cacheName + "]";
 
-            requests.add(DynamicCacheChangeRequest.stopRequest(ctx, cacheName, 
cacheDescr.sql(), true));
+            DynamicCacheChangeRequest req = 
DynamicCacheChangeRequest.stopRequest(ctx, cacheName, cacheDescr.sql(), true);
+
+            descs.add(new T2<>(req, cacheDescr));
         }
 
-        processCacheChangeRequests(exchangeActions, requests, topVer, false);
+        CacheChangeProcessResult res = new CacheChangeProcessResult();
+
+        for (T2<DynamicCacheChangeRequest, DynamicCacheDescriptor> desc : 
descs) {
+            DynamicCacheChangeRequest req = desc.get1();
+            DynamicCacheDescriptor cacheDesc = desc.get2();
+
+            processStopCacheRequest(exchangeActions, req, res, 
req.cacheName(), cacheDesc, actualTopVer, true);
+        }
 
         failMsg.exchangeActions(exchangeActions);
     }
@@ -798,7 +834,7 @@ public class ClusterCachesInfo {
                     return;
                 }
 
-                if (!processStopCacheRequest(exchangeActions, req, res, 
cacheName, desc, topVer.nextMinorVersion()))
+                if (!processStopCacheRequest(exchangeActions, req, res, 
cacheName, desc, topVer.nextMinorVersion(), false))
                     return;
 
                 needExchange = true;
@@ -821,6 +857,10 @@ public class ClusterCachesInfo {
      * @param cacheName Cache name.
      * @param desc Dynamic cache descriptor.
      * @param topVer Topology version that will be applied after the 
corresponding partition map exchange.
+     * @param checkForAlreadyDeleted {@code true} indicates that the given cache descriptor may have already been deleted.
+     *                              This is possible when the {@code DynamicCacheChangeFailureMessage}
+     *                              is received after the {@code DynamicCacheChangeRequest}
+     *                              that is responsible for stopping the cache.
      * @return {@code true} if stop request can be proceed.
      */
     private boolean processStopCacheRequest(
@@ -829,7 +869,8 @@ public class ClusterCachesInfo {
         CacheChangeProcessResult res,
         String cacheName,
         DynamicCacheDescriptor desc,
-        AffinityTopologyVersion topVer
+        AffinityTopologyVersion topVer,
+        boolean checkForAlreadyDeleted
     ) {
         if (ctx.cache().context().snapshotMgr().isSnapshotCreating()) {
             IgniteCheckedException err = new 
IgniteCheckedException(SNP_IN_PROGRESS_ERR_MSG);
@@ -843,7 +884,34 @@ public class ClusterCachesInfo {
             return false;
         }
 
-        DynamicCacheDescriptor old = registeredCaches.remove(cacheName);
+        boolean alreadyRemovedDesc = false;
+        DynamicCacheDescriptor old = null;
+
+        if (checkForAlreadyDeleted) {
+            // Find the "earliest" available descriptor.
+            for (Map<String, DynamicCacheDescriptor> descriptors : 
markedForDeletionCaches.tailMap(topVer).values()) {
+                old = descriptors.get(cacheName);
+
+                if (old != null)
+                    break;
+            }
+
+            alreadyRemovedDesc = old != null;
+        }
+
+        if (old == null) {
+            old = registeredCaches.get(cacheName);
+
+            markedForDeletionCaches
+                .computeIfAbsent(topVer, map -> new ConcurrentHashMap<>())
+                .put(cacheName, old);
+
+            registeredCaches.remove(cacheName);
+
+            ctx.discovery().removeCacheFilter(cacheName);
+
+            alreadyRemovedDesc = false;
+        }
 
         assert old != null && old == desc : "Dynamic cache map was 
concurrently modified [req=" + req + ']';
 
@@ -855,24 +923,44 @@ public class ClusterCachesInfo {
             restartingCaches.put(cacheName, restartId == null ? NULL_OBJECT : 
restartId);
         }
 
-        ctx.discovery().removeCacheFilter(cacheName);
-
         exchangeActions.addCacheToStop(req, desc);
 
-        CacheGroupDescriptor grpDesc = registeredCacheGrps.get(desc.groupId());
+        boolean alreadyRemovedGrp = false;
+        CacheGroupDescriptor grpDesc = null;
+
+        if (checkForAlreadyDeleted) {
+            // Find the "earliest" available descriptor.
+            for (Map<Integer, CacheGroupDescriptor> descriptors : 
markedForDeletionCacheGrps.tailMap(topVer).values()) {
+                grpDesc = descriptors.get(desc.groupId());
+
+                if (grpDesc != null)
+                    break;
+            }
+
+            alreadyRemovedGrp = grpDesc != null;
+        }
+
+        if (grpDesc == null) {
+            grpDesc = registeredCacheGrps.get(desc.groupId());
+
+            alreadyRemovedGrp = false;
+        }
 
         assert grpDesc != null && grpDesc.groupId() == desc.groupId() : desc;
 
-        grpDesc.onCacheStopped(desc.cacheName(), desc.cacheId());
+        if (!alreadyRemovedDesc)
+            grpDesc.onCacheStopped(desc.cacheName(), desc.cacheId());
 
         if (!grpDesc.hasCaches()) {
-            markedForDeletionCacheGrps
-                .computeIfAbsent(topVer, (map) -> new ConcurrentHashMap<>())
-                .put(grpDesc.groupId(), grpDesc);
+            if (!alreadyRemovedGrp) {
+                markedForDeletionCacheGrps
+                    .computeIfAbsent(topVer, (map) -> new 
ConcurrentHashMap<>())
+                    .put(grpDesc.groupId(), grpDesc);
 
-            registeredCacheGrps.remove(grpDesc.groupId());
+                ctx.discovery().removeCacheGroup(grpDesc);
+            }
 
-            ctx.discovery().removeCacheGroup(grpDesc);
+            registeredCacheGrps.remove(grpDesc.groupId());
 
             exchangeActions.addCacheGroupToStop(grpDesc, req.destroy());
 
@@ -1648,6 +1736,15 @@ public class ClusterCachesInfo {
      * @param topVer Topology version.
      */
     public void cleanupRemovedCaches(AffinityTopologyVersion topVer) {
+        markedForDeletionCaches.headMap(topVer, true).clear();
+    }
+
+    /**
+     * Cleans up cache group descriptors that belong to {@code topVer} and earlier versions.
+     *
+     * @param topVer Topology version.
+     */
+    public void cleanupRemovedCacheGroups(AffinityTopologyVersion topVer) {
         markedForDeletionCacheGrps.headMap(topVer, true).clear();
     }
 
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index c3db29bb3b4..c5efbf8bec7 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -2894,6 +2894,8 @@ public class GridCacheProcessor extends 
GridProcessorAdapter {
         if (!sharedCtx.kernalContext().clientNode())
             sharedCtx.database().onCacheGroupsStopped(grpsToStop);
 
+        cachesInfo.cleanupRemovedCacheGroups(topVer);
+
         if (exchActions.deactivate())
             sharedCtx.deactivate();
     }
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteAbstractDynamicCacheStartFailTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteAbstractDynamicCacheStartFailTest.java
index b4b1df5c955..5cb444b8338 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteAbstractDynamicCacheStartFailTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteAbstractDynamicCacheStartFailTest.java
@@ -63,12 +63,19 @@ import org.apache.ignite.cluster.BaselineNode;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage;
 import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.resources.IgniteInstanceResource;
 import org.apache.ignite.testframework.GridTestUtils;
 import org.junit.Test;
 
+import static 
org.apache.ignite.testframework.GridTestUtils.assertThrowsWithCause;
+import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
+
 /**
  * Tests the recovery after a dynamic cache start failure.
  */
@@ -611,6 +618,58 @@ public abstract class 
IgniteAbstractDynamicCacheStartFailTest extends GridCacheA
     }
 
     /** */
+    @Test
+    public void testStartAndStopFailedCache() throws Exception {
+        IgniteEx client = startClientGrid(gridCount());
+
+        awaitPartitionMapExchange();
+
+        TestRecordingCommunicationSpi spi1 = 
TestRecordingCommunicationSpi.spi(grid(1));
+
+        // Block partition map exchange related to the start of a new cache.
+        spi1.blockMessages((node, msg) -> msg instanceof 
GridDhtPartitionsSingleMessage);
+
+        IgniteInternalFuture<?> startFut = GridTestUtils.runAsync(() -> {
+            // Create a new cache that should lead to an error during PME.
+            CacheConfiguration<?, ?> cfg =
+                createCacheConfigsWithBrokenCacheStore(true, 0, 0, 1, 
false).get(0);
+
+            cfg.setName(DYNAMIC_CACHE_NAME);
+
+            // The cache creation is expected to fail.
+            client.getOrCreateCache(cfg);
+        });
+
+        // Make sure that PME is blocked.
+        spi1.waitForBlocked();
+
+        AffinityTopologyVersion currVer = 
grid(0).context().discovery().topologyVersionEx();
+        AffinityTopologyVersion expVer = currVer.nextMinorVersion();
+
+        // Let's try to destroy the cache that is being started.
+        IgniteInternalFuture<?> stopFut = GridTestUtils.runAsync(() -> 
client.destroyCache(DYNAMIC_CACHE_NAME));
+
+        // Wait for the next minor topology version, which means that the DynamicCacheChangeBatch was processed by the discovery thread.
+        assertTrue(
+            "Failed to wait for DynamicCacheChangeBatch message (destroy)",
+            waitForCondition(() -> 
grid(0).context().discovery().topologyVersionEx().equals(expVer), 
getTestTimeout()));
+
+        // Unblock the process of creating the cache.
+        spi1.stopBlock();
+
+        // Creating a new cache should throw CacheException.
+        assertThrowsWithCause(() -> startFut.get(getTestTimeout()), 
CacheException.class);
+
+        stopFut.get(getTestTimeout());
+    }
+
+    /**
+     * Initiates the creation of new caches from the node with the given {@code initiatorId};
+     * the cache creation is expected to fail.
+     *
+     * @param cfgs Cache configurations.
+     * @param initiatorId Index of the node used to initiate the cache start.
+     */
     protected void testDynamicCacheStart(final Collection<CacheConfiguration> 
cfgs, final int initiatorId) {
         assert initiatorId < gridCount();
 
@@ -630,14 +689,16 @@ public abstract class 
IgniteAbstractDynamicCacheStartFailTest extends GridCacheA
 
     /** {@inheritDoc} */
     @Override protected IgniteConfiguration getConfiguration(String 
igniteInstanceName) throws Exception {
-        IgniteConfiguration res = super.getConfiguration(igniteInstanceName);
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
 
         if (mbSrv == null)
-            mbSrv = new FailureMBeanServer(res.getMBeanServer());
+            mbSrv = new FailureMBeanServer(cfg.getMBeanServer());
 
-        res.setMBeanServer(mbSrv);
+        cfg.setMBeanServer(mbSrv);
 
-        return res;
+        cfg.setCommunicationSpi(new TestRecordingCommunicationSpi());
+
+        return cfg;
     }
 
     /** {@inheritDoc} */

Reply via email to