This is an automated email from the ASF dual-hosted git repository.
namelchev pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 0e978105b0d IGNITE-20507 Fixed StoredCacheData removing when a node
filter is set (#10970)
0e978105b0d is described below
commit 0e978105b0d52ee8c675e683baca17d9e6ab575d
Author: Vladimir Steshin <[email protected]>
AuthorDate: Thu Oct 5 15:50:34 2023 +0300
IGNITE-20507 Fixed StoredCacheData removing when a node filter is set
(#10970)
---
.../GridCommandHandlerIndexForceRebuildTest.java | 8 -
.../processors/cache/ClusterCachesInfo.java | 15 ++
.../processors/cache/GridCacheProcessor.java | 33 +++-
.../internal/processors/cache/GridCacheUtils.java | 30 ---
.../cache/persistence/tree/util/PageHandler.java | 9 -
.../ignite/cache/NodeWithFilterRestartTest.java | 210 ++++++++++++++++++---
6 files changed, 225 insertions(+), 80 deletions(-)
diff --git
a/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerIndexForceRebuildTest.java
b/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerIndexForceRebuildTest.java
index 34aa9b51239..806794d73f1 100644
---
a/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerIndexForceRebuildTest.java
+++
b/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerIndexForceRebuildTest.java
@@ -61,7 +61,6 @@ import static java.lang.String.valueOf;
import static
org.apache.ignite.internal.commandline.CommandHandler.EXIT_CODE_INVALID_ARGUMENTS;
import static
org.apache.ignite.internal.commandline.CommandHandler.EXIT_CODE_OK;
import static org.apache.ignite.internal.management.api.CommandUtils.INDENT;
-import static
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.DFLT_STORE_DIR;
import static org.apache.ignite.internal.util.IgniteUtils.max;
import static org.apache.ignite.testframework.GridTestUtils.assertContains;
import static org.apache.ignite.testframework.GridTestUtils.getFieldValue;
@@ -239,13 +238,6 @@ public class GridCommandHandlerIndexForceRebuildTest
extends GridCommandHandlerA
grid(LAST_NODE_NUM).destroyCache("cacheWithNodeFilter");
awaitPartitionMapExchange();
-
- // TODO Remove after IGNITE-20507.
- // Cleaning cache meta being kept.
- for (Ignite ig : G.allGrids()) {
- U.delete(U.resolveWorkDirectory(U.defaultWorkDirectory(),
DFLT_STORE_DIR + '/' + ig.name()
- + "/cache-cacheWithNodeFilter", false));
- }
}
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
index 086443b7732..307c86eb1cf 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
@@ -1773,6 +1773,21 @@ public class ClusterCachesInfo {
return null;
}
+ /**
+ * Looks up the descriptor of the given cache among the caches marked for deletion.
+ * Walks through the values of {@code markedForDeletionCaches} and returns the first descriptor
+ * registered under the given cache name.
+ *
+ * @param cacheName Cache name.
+ * @return Descriptor of the cache marked for deletion or {@code null} if no such descriptor is found.
+ */
+ public @Nullable DynamicCacheDescriptor markedForDeletionCache(String
cacheName) {
+ // Find the "earliest" available descriptor (relies on the iteration order of markedForDeletionCaches - TODO confirm it is ordered).
+ for (Map<String, DynamicCacheDescriptor> descriptors :
markedForDeletionCaches.values()) {
+ DynamicCacheDescriptor desc = descriptors.get(cacheName);
+
+ if (desc != null)
+ return desc;
+ }
+
+ return null;
+ }
+
/**
* Save dynamic cache descriptor on disk.
*
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index e223f45e6ec..482ea647e8a 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -1086,15 +1086,8 @@ public class GridCacheProcessor extends
GridProcessorAdapter {
U.stopLifecycleAware(log, lifecycleAwares(ctx.group(),
cache.configuration(), ctx.store().configuredStore()));
- if (callDestroy && CU.storeCacheConfig(sharedCtx, ctx.config())) {
- try {
- locCfgMgr.removeCacheData(new
StoredCacheData(ctx.config()));
- }
- catch (IgniteCheckedException e) {
- U.error(log, "Failed to delete cache configuration data
while destroying cache" +
- "[cache=" + ctx.name() + "]", e);
- }
- }
+ if (callDestroy)
+ removeCacheConfig(ctx.config());
if (log.isInfoEnabled()) {
if (ctx.group().sharedGroup())
@@ -1108,6 +1101,19 @@ public class GridCacheProcessor extends
GridProcessorAdapter {
}
}
+ /**
+ * Removes the stored cache data built from the given cache configuration through {@code locCfgMgr}.
+ * Nothing is done unless {@code CU.storeCacheConfig(sharedCtx, cacheCfg)} returns {@code true}.
+ * An {@code IgniteCheckedException} raised by the removal is logged and not rethrown.
+ *
+ * @param cacheCfg Configuration of the cache whose stored data should be removed.
+ */
+ private void removeCacheConfig(CacheConfiguration<?, ?> cacheCfg) {
+ if (CU.storeCacheConfig(sharedCtx, cacheCfg)) {
+ try {
+ locCfgMgr.removeCacheData(new StoredCacheData(cacheCfg));
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to delete cache configuration data while
destroying cache" +
+ "[cache=" + cacheCfg.getName() + "]", e);
+ }
+ }
+ }
+
/**
* @param cache Cache.
* @throws IgniteCheckedException If failed.
@@ -2607,9 +2613,16 @@ public class GridCacheProcessor extends
GridProcessorAdapter {
stopCache(cache, true, callDestroy, clearCache, clearDbObjects);
}
- else
+ else {
// Try to unregister query structures for not started caches.
ctx.query().onCacheStop(cacheName);
+
+ // Cache adapter may not exist due to the node filter.
+ DynamicCacheDescriptor cacheToDelete = callDestroy ?
cachesInfo.markedForDeletionCache(cacheName) : null;
+
+ if (cacheToDelete != null)
+ removeCacheConfig(cacheToDelete.cacheConfiguration());
+ }
}
/**
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
index c1680aa89ab..8878d158e79 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
@@ -334,29 +334,6 @@ public class GridCacheUtils {
}
};
- /** Transaction entry to key. */
- private static final IgniteClosure tx2key = new C1<IgniteTxEntry,
Object>() {
- @Override public Object apply(IgniteTxEntry e) {
- return e.key();
- }
-
- @Override public String toString() {
- return "Cache transaction entry to key converter.";
- }
- };
-
- /** Transaction entry to key. */
- private static final IgniteClosure txCol2key = new
C1<Collection<IgniteTxEntry>, Collection<Object>>() {
- @SuppressWarnings( {"unchecked"})
- @Override public Collection<Object> apply(Collection<IgniteTxEntry> e)
{
- return F.viewReadOnly(e, tx2key);
- }
-
- @Override public String toString() {
- return "Cache transaction entry collection to key collection
converter.";
- }
- };
-
/** Converts transaction to XID version. */
private static final IgniteClosure tx2xidVer = new C1<IgniteInternalTx,
GridCacheVersion>() {
@Override public GridCacheVersion apply(IgniteInternalTx tx) {
@@ -368,13 +345,6 @@ public class GridCacheUtils {
}
};
- /** Converts tx entry to entry. */
- private static final IgniteClosure tx2entry = new C1<IgniteTxEntry,
GridCacheEntryEx>() {
- @Override public GridCacheEntryEx apply(IgniteTxEntry e) {
- return e.cached();
- }
- };
-
/** Transaction entry to key. */
private static final IgniteClosure entry2key = new C1<GridCacheEntryEx,
KeyCacheObject>() {
@Override public KeyCacheObject apply(GridCacheEntryEx e) {
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/util/PageHandler.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/util/PageHandler.java
index 5fb36db276a..39ce3b4873f 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/util/PageHandler.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/util/PageHandler.java
@@ -539,13 +539,4 @@ public abstract class PageHandler<X, R> {
public static void copyMemory(long srcAddr, long srcOff, long dstAddr,
long dstOff, long cnt) {
GridUnsafe.copyMemory(null, srcAddr + srcOff, null, dstAddr + dstOff,
cnt);
}
-
- /**
- * @param addr Address.
- * @param off Offset.
- * @param len Length.
- */
- public static void zeroMemory(long addr, int off, int len) {
- GridUnsafe.zeroMemory(addr + off, len);
- }
}
diff --git
a/modules/core/src/test/java/org/apache/ignite/cache/NodeWithFilterRestartTest.java
b/modules/core/src/test/java/org/apache/ignite/cache/NodeWithFilterRestartTest.java
index d701a5eae67..9ff7fcd78ae 100644
---
a/modules/core/src/test/java/org/apache/ignite/cache/NodeWithFilterRestartTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/cache/NodeWithFilterRestartTest.java
@@ -17,10 +17,17 @@
package org.apache.ignite.cache;
+import java.io.File;
import java.util.Arrays;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.IgniteException;
import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.cluster.ClusterState;
import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
@@ -28,6 +35,7 @@ import
org.apache.ignite.internal.TestRecordingCommunicationSpi;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage;
import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiPredicate;
import org.apache.ignite.lang.IgnitePredicate;
@@ -38,6 +46,8 @@ import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Test;
+import static org.apache.ignite.testframework.GridTestUtils.assertThrows;
+
/**
*
*/
@@ -45,42 +55,74 @@ public class NodeWithFilterRestartTest extends
GridCommonAbstractTest {
/** */
private static final TcpDiscoveryVmIpFinder IP_FINDER = new
TcpDiscoveryVmIpFinder(true);
+ /** If {@code true}, {@code getConfiguration} enables persistence for the default data region. */
+ private boolean persistence;
+
+ /** If {@code true}, {@code getConfiguration} sets the {@code FILTER=true} user attribute on the started node. */
+ private boolean testAttribute;
+
+ /** If {@code true} (default), {@code getConfiguration} installs the message-blocking communication SPI setup and marks node 5 with {@code FILTER}. */
+ private boolean blockPme = true;
+
/** {@inheritDoc} */
- @Override protected IgniteConfiguration getConfiguration(String
igniteInstanceName) throws Exception {
- IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+ @Override protected void afterTest() throws Exception {
+ super.afterTest();
- if (getTestIgniteInstanceName(5).equals(igniteInstanceName))
- cfg.setUserAttributes(F.asMap("FILTER", "true"));
+ stopAllGrids();
-// if (getTestIgniteInstanceName(3).equals(igniteInstanceName))
-// cfg.setUserAttributes(F.asMap("FILTER", "true"));
+ cleanPersistenceDir();
+ }
+
+ /** {@inheritDoc} Additionally calls {@code cleanPersistenceDir()} before each test. */
+ @Override protected void beforeTest() throws Exception {
+ super.beforeTest();
+
+ cleanPersistenceDir();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String
igniteInstanceName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
cfg.setDiscoverySpi(new TcpDiscoverySpi().setIpFinder(IP_FINDER));
- if (getTestIgniteInstanceName(0).equals(igniteInstanceName)) {
- TestRecordingCommunicationSpi commSpi = new
TestRecordingCommunicationSpi();
+ if (blockPme) {
+ if (getTestIgniteInstanceName(5).equals(igniteInstanceName))
+ cfg.setUserAttributes(F.asMap("FILTER", "true"));
+
+ if (getTestIgniteInstanceName(0).equals(igniteInstanceName)) {
+ TestRecordingCommunicationSpi commSpi = new
TestRecordingCommunicationSpi();
- commSpi.blockMessages(new IgniteBiPredicate<ClusterNode,
Message>() {
- /** {@inheritDoc} */
- @Override public boolean apply(ClusterNode node, Message msg) {
- if (msg instanceof GridDhtPartitionsFullMessage &&
(node.id().getLeastSignificantBits() & 0xFFFF) == 5) {
- GridDhtPartitionsFullMessage fullMsg =
(GridDhtPartitionsFullMessage)msg;
+ commSpi.blockMessages(new IgniteBiPredicate<ClusterNode,
Message>() {
+ /** {@inheritDoc} */
+ @Override public boolean apply(ClusterNode node, Message
msg) {
+ if (msg instanceof GridDhtPartitionsFullMessage &&
(node.id().getLeastSignificantBits() & 0xFFFF) == 5) {
+ GridDhtPartitionsFullMessage fullMsg =
(GridDhtPartitionsFullMessage)msg;
- if (fullMsg.exchangeId() != null &&
fullMsg.topologyVersion().equals(new AffinityTopologyVersion(8, 0))) {
- info("Going to block message [node=" + node + ",
msg=" + msg + ']');
+ if (fullMsg.exchangeId() != null &&
fullMsg.topologyVersion().equals(new AffinityTopologyVersion(8, 0))) {
+ info("Going to block message [node=" + node +
", msg=" + msg + ']');
- return true;
+ return true;
+ }
}
+
+ return false;
}
+ });
- return false;
- }
- });
+ cfg.setCommunicationSpi(commSpi);
+ }
+ else
+ cfg.setCommunicationSpi(new TestRecordingCommunicationSpi());
+ }
- cfg.setCommunicationSpi(commSpi);
+ if (persistence) {
+ cfg.setDataStorageConfiguration(new DataStorageConfiguration().
+ setDefaultDataRegionConfiguration(new
DataRegionConfiguration().setPersistenceEnabled(true)));
}
- else
- cfg.setCommunicationSpi(new TestRecordingCommunicationSpi());
+
+ if (testAttribute)
+ cfg.setUserAttributes(F.asMap("FILTER", "true"));
return cfg;
}
@@ -136,10 +178,132 @@ public class NodeWithFilterRestartTest extends
GridCommonAbstractTest {
}
}
+ /**
+ * Checks that the stored configuration file of a cache with a node filter is removed on cache destroy
+ * both from a node started via {@code startFilteredGrid} and from a node started after the baseline
+ * topology was set, and that these nodes can be restarted afterwards. The create/destroy cycle is
+ * repeated once more after the restart.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testNodeRejoinsClusterAfterFilteredCacheRemoved() throws
Exception {
+ blockPme = false;
+ persistence = true;
+ testAttribute = true;
+
+ Ignite ig = startGrids(2);
+
+ int filteredGridIdx = G.allGrids().size();
+
+ startFilteredGrid(filteredGridIdx);
+
+ grid(0).cluster().state(ClusterState.ACTIVE);
+
+ ig.cluster().baselineAutoAdjustEnabled(false);
+
+
grid(0).cluster().setBaselineTopology(grid(0).cluster().topologyVersion());
+
+ int nonBaselineIdx = G.allGrids().size();
+
+ startGrid(nonBaselineIdx);
+
+ CacheConfiguration cacheCfg = createAndFillCache();
+
+ File cacheMetaPath1 =
grid(filteredGridIdx).context().cache().configManager().cacheConfigurationFile(cacheCfg);
+ File cacheMetaPath2 =
grid(nonBaselineIdx).context().cache().configManager().cacheConfigurationFile(cacheCfg);
+
+ assertTrue(cacheMetaPath1.exists() && cacheMetaPath1.isFile());
+ assertTrue(cacheMetaPath2.exists() && cacheMetaPath2.isFile());
+
+ grid(0).destroyCache(DEFAULT_CACHE_NAME);
+ awaitPartitionMapExchange();
+
+ assertFalse(cacheMetaPath1.exists() && cacheMetaPath1.isFile());
+ assertFalse(cacheMetaPath2.exists() && cacheMetaPath2.isFile());
+
+ // Try to simply restart both grids.
+ stopGrid(filteredGridIdx);
+ stopGrid(nonBaselineIdx);
+
+ startFilteredGrid(filteredGridIdx);
+ startGrid(nonBaselineIdx);
+
+ createAndFillCache();
+
+ assertTrue(cacheMetaPath1.exists() && cacheMetaPath1.isFile());
+
+ // Test again with the local cache proxy.
+ assertEquals(100,
grid(filteredGridIdx).cache(DEFAULT_CACHE_NAME).size());
+
+ grid(0).destroyCache(DEFAULT_CACHE_NAME);
+ awaitPartitionMapExchange();
+
+ assertFalse(cacheMetaPath1.exists() && cacheMetaPath1.isFile());
+
+ stopGrid(filteredGridIdx);
+ startFilteredGrid(filteredGridIdx);
+ }
+
+ /**
+ * Ensures that a cache with a node filter is not lost when all nodes are restarted.
+ * After the full restart only the node started via {@code startFilteredGrid} is brought up first:
+ * creating a cache with the same name on it must fail, and once the remaining nodes are back
+ * the 100 loaded entries must be visible from grid 0 and grid 1.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testAllNodesRestarted() throws Exception {
+ blockPme = false;
+ persistence = true;
+ testAttribute = true;
+
+ startFilteredGrid(0);
+ startGrid(1);
+ startGrid(2);
+
+ grid(1).cluster().state(ClusterState.ACTIVE);
+
+ createAndFillCache();
+
+ stopAllGrids();
+
+ startFilteredGrid(0);
+
+ grid(0).cluster().state(ClusterState.ACTIVE);
+
+ assertThrows(null, () -> {
+
grid(0).createCache(defaultCacheConfiguration().setName(DEFAULT_CACHE_NAME));
+ }, IgniteException.class, "cache with the same name is already
started");
+
+ startGrid(1);
+ startGrid(2);
+
+ assertEquals(100, grid(0).cache(DEFAULT_CACHE_NAME).size());
+ assertEquals(100, grid(1).cache(DEFAULT_CACHE_NAME).size());
+ }
+
+ /**
+ * Starts a grid while {@code testAttribute} is switched off, so that {@code getConfiguration} does not
+ * set the {@code FILTER} user attribute through that flag and {@code NodeFilter} does not accept
+ * the started node (provided {@code blockPme} is off as well).
+ * {@code testAttribute} is set to {@code true} afterwards regardless of its previous value.
+ * NOTE(review): the flag is not restored if {@code startGrid} throws.
+ *
+ * @param idx Index of the grid to start.
+ * @return Started grid.
+ * @throws Exception If the grid failed to start.
+ */
+ private IgniteEx startFilteredGrid(int idx) throws Exception {
+ testAttribute = false;
+
+ IgniteEx res = startGrid(idx);
+
+ testAttribute = true;
+
+ return res;
+ }
+
+ /**
+ * Creates a cache on grid 1 with one backup, {@code ATOMIC} atomicity mode and {@code NodeFilter}
+ * as the node filter, then streams 100 entries (keys 0..99, each value equal to its key) through
+ * a data streamer of grid 1 opened for {@code DEFAULT_CACHE_NAME}.
+ * Assumes {@code defaultCacheConfiguration()} names the cache {@code DEFAULT_CACHE_NAME} - TODO confirm.
+ *
+ * @return Configuration the cache was created with.
+ * @throws InterruptedException Declared by the method signature.
+ */
+ private CacheConfiguration createAndFillCache() throws
InterruptedException {
+ final CacheConfiguration<Object, Object> cfg =
defaultCacheConfiguration()
+ .setBackups(1)
+ .setAtomicityMode(CacheAtomicityMode.ATOMIC)
+ .setNodeFilter(new NodeFilter());
+
+ grid(1).createCache(cfg);
+
+ try (IgniteDataStreamer<Integer, Integer> ds =
grid(1).dataStreamer(DEFAULT_CACHE_NAME)) {
+ for (int i = 0; i < 100; ++i)
+ ds.addData(i, i);
+ }
+
+ return cfg;
+ }
+
/**
*
*/
- private static class NodeFilter implements IgnitePredicate<ClusterNode> {
+ private static final class NodeFilter implements
IgnitePredicate<ClusterNode> {
/** {@inheritDoc} */
@Override public boolean apply(ClusterNode clusterNode) {
return "true".equals(clusterNode.attribute("FILTER"));