IGNITE-8942 In some cases grid cannot be deactivated because of hanging CQ 
internal cleanup. - Fixes #4329.

Signed-off-by: Ivan Rakov <ira...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/08f98e38
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/08f98e38
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/08f98e38

Branch: refs/heads/ignite-8446
Commit: 08f98e3841ce751d490e1481f3572fc87e43b239
Parents: d7723dc
Author: ascherbakoff <alexey.scherbak...@gmail.com>
Authored: Wed Jul 11 17:09:05 2018 +0300
Committer: Ivan Rakov <ira...@apache.org>
Committed: Wed Jul 11 17:09:05 2018 +0300

----------------------------------------------------------------------
 .../internal/managers/discovery/DiscoCache.java |   4 +-
 .../discovery/GridDiscoveryManager.java         |  10 +-
 .../IgniteSequenceInternalCleanupTest.java      | 147 +++++++++++++++++++
 .../IgniteCacheDataStructuresSelfTestSuite.java |   3 +
 4 files changed, 157 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/08f98e38/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java
index fef44fa..0bb01f3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java
@@ -164,10 +164,8 @@ public class DiscoCache {
         this.consIdxToNodeId = consIdxToNodeId;
 
         aliveBaselineNodePred = new P1<BaselineNode>() {
-            @Override
-            public boolean apply(BaselineNode node) {
+            @Override public boolean apply(BaselineNode node) {
                 return node instanceof ClusterNode && alives.contains(((ClusterNode)node).id());
-
             }
         };
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/08f98e38/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index 15badf2..4122fd6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -1085,6 +1085,10 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
             /** {@inheritDoc} */
             @Override public Map<Integer, CacheMetrics> cacheMetrics() {
                 try {
+                    /** Caches should not be accessed while state transition is in progress. */
+                    if (ctx.state().clusterState().transition())
+                        return Collections.emptyMap();
+
                     Collection<GridCacheAdapter<?, ?>> caches = ctx.cache().internalCaches();
 
                     if (!F.isEmpty(caches)) {
@@ -1093,10 +1097,8 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
                         for (GridCacheAdapter<?, ?> cache : caches) {
                             if (cache.context().statisticsEnabled() &&
                                 cache.context().started() &&
-                                cache.context().affinity().affinityTopologyVersion().topologyVersion() > 0) {
-
+                                cache.context().affinity().affinityTopologyVersion().topologyVersion() > 0)
                                 metrics.put(cache.context().cacheId(), cache.localMetrics());
-                            }
                         }
 
                         return metrics;
@@ -3299,7 +3301,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
                 if (CU.affinityNode(node, grpAff.cacheFilter)) {
                     if (grpAff.persistentCacheGrp && bltNodes != null && !bltNodes.contains(node.id())) // Filter out.
                         continue;
-                    
+
                     List<ClusterNode> nodes = cacheGrpAffNodes.get(grpId);
 
                     if (nodes == null)

http://git-wip-us.apache.org/repos/asf/ignite/blob/08f98e38/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteSequenceInternalCleanupTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteSequenceInternalCleanupTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteSequenceInternalCleanupTest.java
new file mode 100644
index 0000000..d5a76f8
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteSequenceInternalCleanupTest.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.datastructures;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteAtomicSequence;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.configuration.AtomicConfiguration;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+
+/**
+ */
+public class IgniteSequenceInternalCleanupTest extends GridCommonAbstractTest {
+    /** */
+    public static final int GRIDS_CNT = 5;
+
+    /** */
+    public static final int SEQ_RESERVE = 50_000;
+
+    /** */
+    public static final int CACHES_CNT = 10;
+
+    /** */
+    protected static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        cfg.setClientMode("client".equals(igniteInstanceName));
+
+        cfg.setMetricsUpdateFrequency(10);
+
+        cfg.setActiveOnStart(false);
+
+        TcpDiscoverySpi spi = new TcpDiscoverySpi();
+
+        spi.setIpFinder(ipFinder);
+
+        cfg.setDiscoverySpi(spi);
+
+        AtomicConfiguration atomicCfg = atomicConfiguration();
+
+        assertNotNull(atomicCfg);
+
+        cfg.setAtomicConfiguration(atomicCfg);
+
+        List<CacheConfiguration> cacheCfg = new ArrayList<>();
+
+        for (int i = 0; i < CACHES_CNT; i++) {
+            cacheCfg.add(new CacheConfiguration("test" + i).
+                setStatisticsEnabled(true).
+                setCacheMode(PARTITIONED).
+                setAtomicityMode(TRANSACTIONAL).
+                setAffinity(new RendezvousAffinityFunction(false, 16)));
+        }
+
+        cfg.setCacheConfiguration(cacheCfg.toArray(new CacheConfiguration[cacheCfg.size()]));
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    protected AtomicConfiguration atomicConfiguration() {
+        AtomicConfiguration cfg = new AtomicConfiguration();
+
+        cfg.setCacheMode(PARTITIONED);
+        cfg.setBackups(1);
+        cfg.setAtomicSequenceReserveSize(SEQ_RESERVE);
+
+        return cfg;
+    }
+
+    /** */
+    public void testDeactivate() throws Exception {
+        try {
+            Ignite ignite = startGridsMultiThreaded(GRIDS_CNT);
+
+            ignite.cache("test0").put(0, 0);
+
+            int id = 0;
+
+            for (Ignite ig : G.allGrids()) {
+                IgniteAtomicSequence seq = ig.atomicSequence("testSeq", 0, true);
+
+                long id0 = seq.getAndIncrement();
+
+                assertEquals(id0, id);
+
+                id += SEQ_RESERVE;
+            }
+
+            doSleep(1000);
+
+            long puts = ignite.cache("test0").metrics().getCachePuts();
+
+            assertEquals(1, puts);
+
+            grid(GRIDS_CNT - 1).cluster().active(false);
+
+            ignite.cluster().active(true);
+
+            long putsAfter = ignite.cache("test0").metrics().getCachePuts();
+
+            assertEquals(0, putsAfter);
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        cleanPersistenceDir();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        cleanPersistenceDir();
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/08f98e38/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheDataStructuresSelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheDataStructuresSelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheDataStructuresSelfTestSuite.java
index 9583143..80e8319 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheDataStructuresSelfTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheDataStructuresSelfTestSuite.java
@@ -26,6 +26,7 @@ import org.apache.ignite.internal.processors.cache.datastructures.IgniteClientDa
 import org.apache.ignite.internal.processors.cache.datastructures.IgniteClientDiscoveryDataStructuresTest;
 import org.apache.ignite.internal.processors.cache.datastructures.IgniteDataStructureUniqueNameTest;
 import org.apache.ignite.internal.processors.cache.datastructures.IgniteDataStructureWithJobTest;
+import org.apache.ignite.internal.processors.cache.datastructures.IgniteSequenceInternalCleanupTest;
 import org.apache.ignite.internal.processors.cache.datastructures.SemaphoreFailoverNoWaitingAcquirerTest;
 import org.apache.ignite.internal.processors.cache.datastructures.SemaphoreFailoverSafeReleasePermitsTest;
 import org.apache.ignite.internal.processors.cache.datastructures.local.GridCacheLocalAtomicQueueApiSelfTest;
@@ -173,6 +174,8 @@ public class IgniteCacheDataStructuresSelfTestSuite extends TestSuite {
 
         suite.addTest(new TestSuite(IgnitePartitionedQueueNoBackupsTest.class));
 
+        suite.addTest(new TestSuite(IgniteSequenceInternalCleanupTest.class));
+
         suite.addTestSuite(AtomicCacheAffinityConfigurationTest.class);
 
         return suite;

Reply via email to