This is an automated email from the ASF dual-hosted git repository.
alexpl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 2ccb962aded IGNITE-21929 Fix pending tree extra cleanup on entries expiration - Fixes #11300.
2ccb962aded is described below
commit 2ccb962adedbae93c521776a2f62307d2e8b21b7
Author: Yuri Naryshkin <[email protected]>
AuthorDate: Fri Apr 5 10:46:21 2024 +0300
IGNITE-21929 Fix pending tree extra cleanup on entries expiration - Fixes #11300.
Signed-off-by: Aleksey Plekhanov <[email protected]>
---
.../apache/ignite/internal/binary/BinaryUtils.java | 23 +----
.../cache/IgniteCacheOffheapManagerImpl.java | 37 ++++++-
.../cache/GridCacheTtlManagerSelfTest.java | 111 +++++++++++++++++++++
3 files changed, 148 insertions(+), 23 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryUtils.java
index f513d3ddd2f..72790b94317 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryUtils.java
@@ -67,13 +67,7 @@ import org.apache.ignite.binary.BinaryType;
import org.apache.ignite.binary.Binarylizable;
import org.apache.ignite.internal.binary.builder.BinaryLazyValue;
import org.apache.ignite.internal.binary.streams.BinaryInputStream;
-import org.apache.ignite.internal.processors.cache.CacheObjectByteArrayImpl;
-import org.apache.ignite.internal.processors.cache.CacheObjectImpl;
-import org.apache.ignite.internal.processors.cache.KeyCacheObjectImpl;
-import org.apache.ignite.internal.processors.cacheobject.PlatformCacheObjectImpl;
-import org.apache.ignite.internal.processors.cacheobject.UserCacheObjectByteArrayImpl;
-import org.apache.ignite.internal.processors.cacheobject.UserCacheObjectImpl;
-import org.apache.ignite.internal.processors.cacheobject.UserKeyCacheObjectImpl;
+import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.util.MutableSingletonList;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
@@ -738,20 +732,7 @@ public class BinaryUtils {
* @return True if this is an object of a known type.
*/
public static boolean knownCacheObject(Object obj) {
- if (obj == null)
- return false;
-
- Class<?> cls = obj.getClass();
-
- return cls == KeyCacheObjectImpl.class ||
- cls == BinaryObjectImpl.class ||
- cls == CacheObjectImpl.class ||
- cls == CacheObjectByteArrayImpl.class ||
- cls == BinaryEnumObjectImpl.class ||
- cls == UserKeyCacheObjectImpl.class ||
- cls == UserCacheObjectImpl.class ||
- cls == PlatformCacheObjectImpl.class ||
- cls == UserCacheObjectByteArrayImpl.class;
+ return obj instanceof CacheObject;
}
/**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
index dbb2a3601ca..754aeeee880 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
@@ -1138,7 +1138,8 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
if (obsoleteVer == null)
obsoleteVer = cctx.cache().nextVersion();
- GridCacheEntryEx entry = cctx.cache().entryEx(row.key);
+ GridCacheEntryEx entry = cctx.cache().entryEx(row.key instanceof KeyCacheObjectImpl
+ ? new ExpiredKeyCacheObject((KeyCacheObjectImpl)row.key, row.expireTime, row.link) : row.key);
if (entry != null)
c.apply(entry, obsoleteVer);
@@ -1718,7 +1719,11 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
*/
private void finishRemove(GridCacheContext cctx, KeyCacheObject key,
@Nullable CacheDataRow oldRow) throws IgniteCheckedException {
if (oldRow != null) {
- clearPendingEntries(cctx, oldRow);
+ if (!(key instanceof ExpiredKeyCacheObject)
+ || ((ExpiredKeyCacheObject)key).expireTime != oldRow.expireTime()
+ || ((ExpiredKeyCacheObject)key).link != oldRow.link()
+ )
+ clearPendingEntries(cctx, oldRow);
decrementSize(cctx.cacheId());
}
@@ -1977,4 +1982,32 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
return null;
}
}
+
+ /**
+ * This entry key is used to indicate that an expired entry has already been deleted from
+ * PendingEntriesTree and doesn't need to participate in PendingEntriesTree cleanup again.
+ */
+ private static class ExpiredKeyCacheObject extends KeyCacheObjectImpl {
+ /** Serial version uid. */
+ private static final long serialVersionUID = 0L;
+
+ /** */
+ private long expireTime;
+
+ /** */
+ private long link;
+
+ /** */
+ private ExpiredKeyCacheObject(KeyCacheObjectImpl keyCacheObj, long expireTime, long link) {
+ super(keyCacheObj.val, keyCacheObj.valBytes, keyCacheObj.partition());
+
+ this.expireTime = expireTime;
+
+ this.link = link;
+ }
+
+ /** */
+ public ExpiredKeyCacheObject() {
+ }
+ }
}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManagerSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManagerSelfTest.java
index 217fb107bc1..151bb9f130d 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManagerSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManagerSelfTest.java
@@ -17,12 +17,26 @@
package org.apache.ignite.internal.processors.cache;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import javax.cache.expiry.CreatedExpiryPolicy;
import javax.cache.expiry.Duration;
import javax.cache.expiry.TouchedExpiryPolicy;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteKernal;
+import org.apache.ignite.internal.metric.IoStatisticsHolder;
+import org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
+import org.apache.ignite.internal.processors.cache.persistence.tree.util.PageHandler;
+import org.apache.ignite.internal.processors.cache.tree.PendingEntriesTree;
import org.apache.ignite.internal.util.typedef.CAX;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.testframework.GridTestUtils;
@@ -104,4 +118,101 @@ public class GridCacheTtlManagerSelfTest extends GridCommonAbstractTest {
stopAllGrids();
}
}
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testPartitionedRemove() throws Exception {
+ checkRemove(PARTITIONED);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testReplicatedRemove() throws Exception {
+ checkRemove(REPLICATED);
+ }
+
+ /**
+ * @param mode Cache mode.
+ * @throws Exception If failed.
+ */
+ private void checkRemove(CacheMode mode) throws Exception {
+ cacheMode = mode;
+
+ Map<String, Integer> calls = new ConcurrentHashMap<>();
+
+ BPlusTree.testHndWrapper = (tree, hnd) -> {
+ if (tree instanceof PendingEntriesTree) {
+ return new PageHandler<Object, BPlusTree.Result>() {
+ @Override public BPlusTree.Result run(
+ int cacheId,
+ long pageId,
+ long page,
+ long pageAddr,
+ PageIO io,
+ Boolean walPlc,
+ Object arg,
+ int lvl,
+ IoStatisticsHolder statHolder
+ ) throws IgniteCheckedException {
+ calls.merge(arg.getClass().getSimpleName(), 1, Integer::sum);
+
+ return ((PageHandler<Object, BPlusTree.Result>)hnd).run(cacheId, pageId, page, pageAddr, io,
+ walPlc, arg, lvl, statHolder);
+ }
+
+ @Override public boolean releaseAfterWrite(
+ int cacheId,
+ long pageId,
+ long page,
+ long pageAddr,
+ Object arg,
+ int intArg
+ ) {
+ return ((PageHandler<Object, BPlusTree.Result>)hnd)
+ .releaseAfterWrite(cacheId, pageId, page, pageAddr, arg, intArg);
+ }
+ };
+ }
+
+ return hnd;
+ };
+
+ try (IgniteEx g = startGrid(0)) {
+ final String key = "key";
+
+ final int records = 1500;
+
+ IgniteCache<Object, Object> cache = g.cache(DEFAULT_CACHE_NAME).withExpiryPolicy(
+ new CreatedExpiryPolicy(new Duration(MILLISECONDS, 1000)));
+
+ IntStream.range(0, records).forEach(x -> cache.put(key + x, x));
+
+ assertTrue(GridTestUtils.waitForCondition(
+ () -> {
+ try {
+ return g.context().cache().cache(DEFAULT_CACHE_NAME).context().ttl().pendingSize() == 0;
+ }
+ catch (Exception e) {
+ throw new IgniteException(e);
+ }
+ }, 5_000L)
+ );
+
+ log.info("Invocation counts\n" + calls.keySet().stream()
+ .map(k -> k + ": " + calls.get(k))
+ .collect(Collectors.joining("\n")));
+
+ assertNotNull(calls.get("RemoveRange"));
+ assertNull(calls.get("Remove"));
+
+ IntStream.range(0, records).forEach(x -> assertNull(cache.get(key + x)));
+ }
+ finally {
+ BPlusTree.testHndWrapper = null;
+ }
+ }
}