This is an automated email from the ASF dual-hosted git repository. agoncharuk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push: new 6b9c247 IGNITE-10878 Filter cache descriptors received from grid if node is the first node in topology - Fixes #5853. 6b9c247 is described below commit 6b9c24786972174c5a3bf125d2cfb0644cf10957 Author: Sergey Chugunov <sergey.chugu...@gmail.com> AuthorDate: Wed Jan 23 11:20:46 2019 +0300 IGNITE-10878 Filter cache descriptors received from grid if node is the first node in topology - Fixes #5853. Signed-off-by: Alexey Goncharuk <alexey.goncha...@gmail.com> --- .../processors/cache/ClusterCachesInfo.java | 56 ++++++ .../processors/cache/GridCacheProcessor.java | 10 + ...gniteDiscoveryDataHandlingInNewClusterTest.java | 220 +++++++++++++++++++++ .../ignite/testsuites/IgniteCacheTestSuite4.java | 2 + 4 files changed, 288 insertions(+) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java index e3b67f2..7d5816a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java @@ -24,6 +24,7 @@ import java.util.Collections; import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -133,6 +134,61 @@ class ClusterCachesInfo { } /** + * Filters all dynamic cache descriptors and groups that were not presented on node start + * and were received with grid discovery data. + * + * @param localConfigData node's local cache configurations + * (both from static config and stored with persistent caches). + * + */ + public void filterDynamicCacheDescriptors(CacheJoinNodeDiscoveryData localConfigData) { + if (ctx.isDaemon()) + return; + + Map<String, CacheJoinNodeDiscoveryData.CacheInfo> caches = localConfigData.caches(); + + Iterator<Map.Entry<String, DynamicCacheDescriptor>> cachesIter = registeredCaches.entrySet().iterator(); + + while (cachesIter.hasNext()) { + Map.Entry<String, DynamicCacheDescriptor> e = cachesIter.next(); + + if (!caches.containsKey(e.getKey())) { + cachesIter.remove(); + + ctx.discovery().removeCacheFilter(e.getKey()); + } + } + + Iterator<Map.Entry<Integer, CacheGroupDescriptor>> grpsIter = registeredCacheGrps.entrySet().iterator(); + + while (grpsIter.hasNext()) { + Map.Entry<Integer, CacheGroupDescriptor> e = grpsIter.next(); + + boolean removeGrp = true; + + for (DynamicCacheDescriptor cacheDescr : registeredCaches.values()) { + if (cacheDescr.groupId() == e.getKey()) { + removeGrp = false; + + break; + } + } + + if (removeGrp) { + grpsIter.remove(); + + ctx.discovery().removeCacheGroup(e.getValue()); + } + } + + locJoinCachesCtx = new LocalJoinCachesContext( + locJoinCachesCtx.caches(), + locJoinCachesCtx.initCaches(), + registeredCacheGrps, + registeredCaches); + } + + /** * @param joinDiscoData Information about configured caches and templates. * @throws IgniteCheckedException If configuration validation failed. */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index 933a9e3..84dfe37 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -284,6 +284,9 @@ public class GridCacheProcessor extends GridProcessorAdapter { /** Tmp storage for meta migration. */ private MetaStorage.TmpStorage tmpStorage; + /** Node's local cache configurations (both from static configuration and from persistent caches). */ + private CacheJoinNodeDiscoveryData localConfigs; + /** * @param ctx Kernal context. */ @@ -783,6 +786,8 @@ public class GridCacheProcessor extends GridProcessorAdapter { startAllCachesOnClientStart() ); + localConfigs = discoData; + cachesInfo.onStart(discoData); } @@ -1982,6 +1987,11 @@ public class GridCacheProcessor extends GridProcessorAdapter { * @return Caches to be started when this node starts. */ @Nullable public LocalJoinCachesContext localJoinCachesContext() { + if (ctx.discovery().localNode().order() == 1 && localConfigs != null) + cachesInfo.filterDynamicCacheDescriptors(localConfigs); + + localConfigs = null; + return cachesInfo.localJoinCachesContext(); } diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDiscoveryDataHandlingInNewClusterTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDiscoveryDataHandlingInNewClusterTest.java new file mode 100644 index 0000000..2279751 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDiscoveryDataHandlingInNewClusterTest.java @@ -0,0 +1,220 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ignite.internal.processors.cache; + +import java.util.Map; +import org.apache.ignite.IgniteSystemProperties; +import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.DataRegionConfiguration; +import org.apache.ignite.configuration.DataStorageConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.lang.IgnitePredicate; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage; +import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddedMessage; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * + */ +@RunWith(JUnit4.class) +public class IgniteDiscoveryDataHandlingInNewClusterTest extends GridCommonAbstractTest { + /** */ + private static final String NODE_1_CONS_ID = "node01"; + + /** */ + private static final String NODE_2_CONS_ID = "node02"; + + /** */ + private static final String NODE_3_CONS_ID = "node03"; + + /** */ + private static final String STATIC_CACHE_NAME = "staticCache"; + + /** */ + private static final String DYNAMIC_CACHE_NAME_1 = "dynamicCache1"; + + /** */ + private static final String DYNAMIC_CACHE_NAME_2 = "dynamicCache2"; + + /** Group where static and dynamic caches reside. */ + private static final String GROUP_WITH_STATIC_CACHES = "group1"; + + /** Group where only dynamic caches reside. */ + private static final String GROUP_WITH_DYNAMIC_CACHES = "group2"; + + /** Node filter to pin dynamic caches to a specific node. */ + private static final IgnitePredicate<ClusterNode> nodeFilter = new IgnitePredicate<ClusterNode>() { + @Override public boolean apply(ClusterNode node) { + return node.consistentId().toString().contains(NODE_1_CONS_ID); + } + }; + + /** Discovery SPI aimed to fail node with it when another server node joins the topology. */ + private TcpDiscoverySpi failingOnNodeJoinSpi = new TcpDiscoverySpi() { + @Override protected void startMessageProcess(TcpDiscoveryAbstractMessage msg) { + if (msg instanceof TcpDiscoveryNodeAddedMessage) { + super.startMessageProcess(msg); + + throw new RuntimeException("Simulation of failure of node " + NODE_1_CONS_ID); + } + + super.startMessageProcess(msg); + } + }; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + if (igniteInstanceName.contains(NODE_1_CONS_ID)) { + failingOnNodeJoinSpi.setIpFinder(sharedStaticIpFinder); + failingOnNodeJoinSpi.setJoinTimeout(60_000); + + cfg.setDiscoverySpi(failingOnNodeJoinSpi); + } + + cfg.setConsistentId(igniteInstanceName); + + if (igniteInstanceName.contains("client")) + cfg.setClientMode(true); + else { + cfg.setDataStorageConfiguration( + new DataStorageConfiguration() + .setDefaultDataRegionConfiguration( + new DataRegionConfiguration() + .setInitialSize(10500000) + .setMaxSize(6659883008L) + .setPersistenceEnabled(false) + ) + ); + } + + CacheConfiguration staticCacheCfg = new CacheConfiguration(STATIC_CACHE_NAME) + .setGroupName(GROUP_WITH_STATIC_CACHES) + .setAffinity(new RendezvousAffinityFunction(false, 32)) + .setNodeFilter(nodeFilter); + + cfg.setCacheConfiguration(staticCacheCfg); + + return cfg; + } + + /** + * Verifies that new node received discovery data from stopped grid filters it out + * from GridCacheProcessor and GridDiscoveryManager internal structures. + * + * All subsequent servers and clients join topology successfully. + * + * See related ticket <a href="https://issues.apache.org/jira/browse/IGNITE-10878">IGNITE-10878</a>. + */ + @Test + public void testNewClusterFiltersDiscoveryDataReceivedFromStoppedCluster() throws Exception { + IgniteEx ig0 = startGrid(NODE_1_CONS_ID); + + prepareDynamicCaches(ig0); + + IgniteEx ig1 = startGrid(NODE_2_CONS_ID); + + verifyCachesAndGroups(ig1); + + IgniteEx ig2 = startGrid(NODE_3_CONS_ID); + + verifyCachesAndGroups(ig2); + + IgniteEx client = startGrid("client01"); + + verifyCachesAndGroups(client); + } + + /** */ + private void verifyCachesAndGroups(IgniteEx ignite) { + Map<String, DynamicCacheDescriptor> caches = ignite.context().cache().cacheDescriptors(); + + assertEquals(2, caches.size()); + caches.keySet().contains(GridCacheUtils.UTILITY_CACHE_NAME); + caches.keySet().contains(STATIC_CACHE_NAME); + + Map<Integer, CacheGroupDescriptor> groups = ignite.context().cache().cacheGroupDescriptors(); + + assertEquals(2, groups.size()); + + boolean defaultGroupFound = false; + boolean staticCachesGroupFound = false; + + for (CacheGroupDescriptor grpDesc : groups.values()) { + if (grpDesc.cacheOrGroupName().equals(GridCacheUtils.UTILITY_CACHE_NAME)) + defaultGroupFound = true; + else if (grpDesc.cacheOrGroupName().equals(GROUP_WITH_STATIC_CACHES)) + staticCachesGroupFound = true; + } + + assertTrue(String.format("Default group found: %b, static group found: %b", + defaultGroupFound, + staticCachesGroupFound), + defaultGroupFound && staticCachesGroupFound); + } + + /** */ + private void prepareDynamicCaches(IgniteEx ig) { + ig.getOrCreateCache(new CacheConfiguration<>(DYNAMIC_CACHE_NAME_1) + .setGroupName(GROUP_WITH_STATIC_CACHES) + .setAffinity(new RendezvousAffinityFunction(false, 32)) + .setNodeFilter(nodeFilter) + ); + + ig.getOrCreateCache(new CacheConfiguration<>(DYNAMIC_CACHE_NAME_2) + .setGroupName(GROUP_WITH_DYNAMIC_CACHES) + .setAffinity(new RendezvousAffinityFunction(false, 16)) + .setNodeFilter((IgnitePredicate<ClusterNode>)node -> node.consistentId().toString().contains(NODE_1_CONS_ID)) + ); + } + + /** + * Turns off printing stack trace on detecting critical failure to speed up tests. + */ + @BeforeClass + public static void setUpClass() { + System.setProperty(IgniteSystemProperties.IGNITE_DUMP_THREADS_ON_FAILURE, "false"); + } + + /** + * Restoring default value for printing stack trace setting. + */ + @AfterClass + public static void tearDownClass() { + System.setProperty(IgniteSystemProperties.IGNITE_DUMP_THREADS_ON_FAILURE, "true"); + } + + /** {@inheritDoc} */ + @After + @Override public void afterTest() throws Exception { + super.afterTest(); + + stopAllGrids(); + } + +} diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java index a1a2f42..6cfa534 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java @@ -85,6 +85,7 @@ import org.apache.ignite.internal.processors.cache.IgniteCacheTxPreloadNoWriteTe import org.apache.ignite.internal.processors.cache.IgniteCacheTxReplicatedPeekModesTest; import org.apache.ignite.internal.processors.cache.IgniteCacheTxStoreValueTest; import org.apache.ignite.internal.processors.cache.IgniteClientCacheInitializationFailTest; +import org.apache.ignite.internal.processors.cache.IgniteDiscoveryDataHandlingInNewClusterTest; import org.apache.ignite.internal.processors.cache.IgniteDynamicCacheFilterTest; import org.apache.ignite.internal.processors.cache.IgniteDynamicCacheMultinodeTest; import org.apache.ignite.internal.processors.cache.IgniteDynamicCacheStartCoordinatorFailoverTest; @@ -258,6 +259,7 @@ public class IgniteCacheTestSuite4 { GridTestUtils.addTestIfNeeded(suite, IgniteCacheCreatePutTest.class, ignoredTests); GridTestUtils.addTestIfNeeded(suite, CacheStartOnJoinTest.class, ignoredTests); GridTestUtils.addTestIfNeeded(suite, IgniteCacheStartTest.class, ignoredTests); + GridTestUtils.addTestIfNeeded(suite, IgniteDiscoveryDataHandlingInNewClusterTest.class, ignoredTests); GridTestUtils.addTestIfNeeded(suite, CacheDiscoveryDataConcurrentJoinTest.class, ignoredTests); GridTestUtils.addTestIfNeeded(suite, IgniteClientCacheInitializationFailTest.class, ignoredTests); GridTestUtils.addTestIfNeeded(suite, IgniteCacheFailedUpdateResponseTest.class, ignoredTests);