This is an automated email from the ASF dual-hosted git repository.

alexpl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 2ccb962aded IGNITE-21929 Fix pending tree extra cleanup on entries 
expiration - Fixes #11300.
2ccb962aded is described below

commit 2ccb962adedbae93c521776a2f62307d2e8b21b7
Author: Yuri Naryshkin <[email protected]>
AuthorDate: Fri Apr 5 10:46:21 2024 +0300

    IGNITE-21929 Fix pending tree extra cleanup on entries expiration - Fixes 
#11300.
    
    Signed-off-by: Aleksey Plekhanov <[email protected]>
---
 .../apache/ignite/internal/binary/BinaryUtils.java |  23 +----
 .../cache/IgniteCacheOffheapManagerImpl.java       |  37 ++++++-
 .../cache/GridCacheTtlManagerSelfTest.java         | 111 +++++++++++++++++++++
 3 files changed, 148 insertions(+), 23 deletions(-)

diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryUtils.java 
b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryUtils.java
index f513d3ddd2f..72790b94317 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryUtils.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryUtils.java
@@ -67,13 +67,7 @@ import org.apache.ignite.binary.BinaryType;
 import org.apache.ignite.binary.Binarylizable;
 import org.apache.ignite.internal.binary.builder.BinaryLazyValue;
 import org.apache.ignite.internal.binary.streams.BinaryInputStream;
-import org.apache.ignite.internal.processors.cache.CacheObjectByteArrayImpl;
-import org.apache.ignite.internal.processors.cache.CacheObjectImpl;
-import org.apache.ignite.internal.processors.cache.KeyCacheObjectImpl;
-import 
org.apache.ignite.internal.processors.cacheobject.PlatformCacheObjectImpl;
-import 
org.apache.ignite.internal.processors.cacheobject.UserCacheObjectByteArrayImpl;
-import org.apache.ignite.internal.processors.cacheobject.UserCacheObjectImpl;
-import 
org.apache.ignite.internal.processors.cacheobject.UserKeyCacheObjectImpl;
+import org.apache.ignite.internal.processors.cache.CacheObject;
 import org.apache.ignite.internal.util.MutableSingletonList;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.U;
@@ -738,20 +732,7 @@ public class BinaryUtils {
      * @return True if this is an object of a known type.
      */
     public static boolean knownCacheObject(Object obj) {
-        if (obj == null)
-            return false;
-
-        Class<?> cls = obj.getClass();
-
-        return cls == KeyCacheObjectImpl.class ||
-            cls == BinaryObjectImpl.class ||
-            cls == CacheObjectImpl.class ||
-            cls == CacheObjectByteArrayImpl.class ||
-            cls == BinaryEnumObjectImpl.class ||
-            cls == UserKeyCacheObjectImpl.class ||
-            cls == UserCacheObjectImpl.class ||
-            cls == PlatformCacheObjectImpl.class ||
-            cls == UserCacheObjectByteArrayImpl.class;
+        return obj instanceof CacheObject;
     }
 
     /**
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
index dbb2a3601ca..754aeeee880 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
@@ -1138,7 +1138,8 @@ public class IgniteCacheOffheapManagerImpl implements 
IgniteCacheOffheapManager
                     if (obsoleteVer == null)
                         obsoleteVer = cctx.cache().nextVersion();
 
-                    GridCacheEntryEx entry = cctx.cache().entryEx(row.key);
+                    GridCacheEntryEx entry = cctx.cache().entryEx(row.key 
instanceof KeyCacheObjectImpl
+                        ? new 
ExpiredKeyCacheObject((KeyCacheObjectImpl)row.key, row.expireTime, row.link) : 
row.key);
 
                     if (entry != null)
                         c.apply(entry, obsoleteVer);
@@ -1718,7 +1719,11 @@ public class IgniteCacheOffheapManagerImpl implements 
IgniteCacheOffheapManager
          */
         private void finishRemove(GridCacheContext cctx, KeyCacheObject key, 
@Nullable CacheDataRow oldRow) throws IgniteCheckedException {
             if (oldRow != null) {
-                clearPendingEntries(cctx, oldRow);
+                if (!(key instanceof ExpiredKeyCacheObject)
+                    || ((ExpiredKeyCacheObject)key).expireTime != 
oldRow.expireTime()
+                    || ((ExpiredKeyCacheObject)key).link != oldRow.link()
+                )
+                    clearPendingEntries(cctx, oldRow);
 
                 decrementSize(cctx.cacheId());
             }
@@ -1977,4 +1982,32 @@ public class IgniteCacheOffheapManagerImpl implements 
IgniteCacheOffheapManager
             return null;
         }
     }
+
+    /**
+     * This entry key is used to indicate that an expired entry has already 
been deleted from
+     * PendingEntriesTree and doesn't need to participate in 
PendingEntriesTree cleanup again.
+     */
+    private static class ExpiredKeyCacheObject extends KeyCacheObjectImpl {
+        /** Serial version uid. */
+        private static final long serialVersionUID = 0L;
+
+        /** */
+        private long expireTime;
+
+        /** */
+        private long link;
+
+        /** */
+        private ExpiredKeyCacheObject(KeyCacheObjectImpl keyCacheObj, long 
expireTime, long link) {
+            super(keyCacheObj.val, keyCacheObj.valBytes, 
keyCacheObj.partition());
+
+            this.expireTime = expireTime;
+
+            this.link = link;
+        }
+
+        /** */
+        public ExpiredKeyCacheObject() {
+        }
+    }
 }
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManagerSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManagerSelfTest.java
index 217fb107bc1..151bb9f130d 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManagerSelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManagerSelfTest.java
@@ -17,12 +17,26 @@
 
 package org.apache.ignite.internal.processors.cache;
 
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import javax.cache.expiry.CreatedExpiryPolicy;
 import javax.cache.expiry.Duration;
 import javax.cache.expiry.TouchedExpiryPolicy;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
 import org.apache.ignite.cache.CacheMode;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.IgniteKernal;
+import org.apache.ignite.internal.metric.IoStatisticsHolder;
+import org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
+import 
org.apache.ignite.internal.processors.cache.persistence.tree.util.PageHandler;
+import org.apache.ignite.internal.processors.cache.tree.PendingEntriesTree;
 import org.apache.ignite.internal.util.typedef.CAX;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.testframework.GridTestUtils;
@@ -104,4 +118,101 @@ public class GridCacheTtlManagerSelfTest extends 
GridCommonAbstractTest {
             stopAllGrids();
         }
     }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testPartitionedRemove() throws Exception {
+        checkRemove(PARTITIONED);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testReplicatedRemove() throws Exception {
+        checkRemove(REPLICATED);
+    }
+
+    /**
+     * @param mode Cache mode.
+     * @throws Exception If failed.
+     */
+    private void checkRemove(CacheMode mode) throws Exception {
+        cacheMode = mode;
+
+        Map<String, Integer> calls = new ConcurrentHashMap<>();
+
+        BPlusTree.testHndWrapper = (tree, hnd) -> {
+            if (tree instanceof PendingEntriesTree) {
+                return new PageHandler<Object, BPlusTree.Result>() {
+                    @Override public BPlusTree.Result run(
+                        int cacheId,
+                        long pageId,
+                        long page,
+                        long pageAddr,
+                        PageIO io,
+                        Boolean walPlc,
+                        Object arg,
+                        int lvl,
+                        IoStatisticsHolder statHolder
+                    ) throws IgniteCheckedException {
+                        calls.merge(arg.getClass().getSimpleName(), 1, 
Integer::sum);
+
+                        return ((PageHandler<Object, 
BPlusTree.Result>)hnd).run(cacheId, pageId, page, pageAddr, io,
+                            walPlc, arg, lvl, statHolder);
+                    }
+
+                    @Override public boolean releaseAfterWrite(
+                        int cacheId,
+                        long pageId,
+                        long page,
+                        long pageAddr,
+                        Object arg,
+                        int intArg
+                    ) {
+                        return ((PageHandler<Object, BPlusTree.Result>)hnd)
+                            .releaseAfterWrite(cacheId, pageId, page, 
pageAddr, arg, intArg);
+                    }
+                };
+            }
+
+            return hnd;
+        };
+
+        try (IgniteEx g = startGrid(0)) {
+            final String key = "key";
+
+            final int records = 1500;
+
+            IgniteCache<Object, Object> cache = 
g.cache(DEFAULT_CACHE_NAME).withExpiryPolicy(
+                new CreatedExpiryPolicy(new Duration(MILLISECONDS, 1000)));
+
+            IntStream.range(0, records).forEach(x -> cache.put(key + x, x));
+
+            assertTrue(GridTestUtils.waitForCondition(
+                () -> {
+                    try {
+                        return 
g.context().cache().cache(DEFAULT_CACHE_NAME).context().ttl().pendingSize() == 
0;
+                    }
+                    catch (Exception e) {
+                        throw new IgniteException(e);
+                    }
+                }, 5_000L)
+            );
+
+            log.info("Invocation counts\n" + calls.keySet().stream()
+                .map(k -> k + ": " + calls.get(k))
+                .collect(Collectors.joining("\n")));
+
+            assertNotNull(calls.get("RemoveRange"));
+            assertNull(calls.get("Remove"));
+
+            IntStream.range(0, records).forEach(x -> assertNull(cache.get(key 
+ x)));
+        }
+        finally {
+            BPlusTree.testHndWrapper = null;
+        }
+    }
 }

Reply via email to