This is an automated email from the ASF dual-hosted git repository.

ppa pushed a commit to branch ignite-2.13
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/ignite-2.13 by this push:
     new 8a1318c5d44 IGNITE-16118 Fixed a rare issue with updating the TTL in 
the near-cache/backup if the request was initiated from another backup (#9865)
8a1318c5d44 is described below

commit 8a1318c5d44246c0a42028d1333f568aa6b1e0b6
Author: Pavel Pereslegin <[email protected]>
AuthorDate: Wed Apr 6 12:14:12 2022 +0300

    IGNITE-16118 Fixed a rare issue with updating the TTL in the 
near-cache/backup if the request was initiated from another backup (#9865)
    
    (cherry picked from commit a81c3ac87d0957b7d2387cbaf92d142434d26a7e)
---
 .../processors/cache/GridCacheEntryEx.java         |   7 ++
 .../processors/cache/GridCacheMapEntry.java        |  51 ++++++--
 .../cache/distributed/dht/GridDhtCacheAdapter.java | 134 ++++++---------------
 .../processors/cache/GridCacheTestEntryEx.java     |   5 +
 4 files changed, 87 insertions(+), 110 deletions(-)

diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
index 6b5bd6274a4..a019a60f6e6 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
@@ -1074,6 +1074,13 @@ public interface GridCacheEntryEx {
      */
     public long ttl() throws GridCacheEntryRemovedException;
 
+    /**
+     * @param ver Version.
+     * @param expiryPlc Expiry policy.
+     * @throws GridCacheEntryRemovedException If entry was removed.
+     */
+    public void updateTtl(GridCacheVersion ver, IgniteCacheExpiryPolicy 
expiryPlc) throws GridCacheEntryRemovedException;
+
     /**
      * @param ver Version.
      * @param ttl Time to live.
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index 9131eb4e915..cbaeeb017d6 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -831,7 +831,7 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
             }
 
             if (ret != null && expiryPlc != null)
-                updateTtl(expiryPlc);
+                updateTtlUnlocked(version(), expiryPlc);
 
             if (retVer && resVer == null) {
                 resVer = (isNear() && cctx.transactional()) ? 
((GridNearCacheEntry)this).dhtVersion() : this.ver;
@@ -1996,7 +1996,7 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
 
                 if (!pass) {
                     if (expiryPlc != null && !readFromStore && 
!cctx.putIfAbsentFilter(filter) && hasValueUnlocked())
-                        updateTtl(expiryPlc);
+                        updateTtlUnlocked(expiryPlc);
 
                     Object val = retval ?
                         
cctx.cacheObjectContext().unwrapBinaryIfNeeded(CU.value(old, cctx, false), 
keepBinary, false, null)
@@ -2055,7 +2055,7 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
 
                 if (!entry.modified()) {
                     if (expiryPlc != null && !readFromStore && 
hasValueUnlocked())
-                        updateTtl(expiryPlc);
+                        updateTtlUnlocked(expiryPlc);
 
                     updateMetrics(READ, metrics, transformOp, old != null);
 
@@ -2994,34 +2994,37 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
      *
      * @param expiryPlc Expiry policy.
      */
-    private void updateTtl(ExpiryPolicy expiryPlc) throws 
IgniteCheckedException, GridCacheEntryRemovedException {
+    private void updateTtlUnlocked(ExpiryPolicy expiryPlc) throws 
IgniteCheckedException {
         long ttl = CU.toTtl(expiryPlc.getExpiryForAccess());
 
         if (ttl != CU.TTL_NOT_CHANGED)
-            updateTtl(ttl);
+            updateTtlUnlocked(ttl);
     }
 
     /**
      * Update TTL is it is changed.
      *
+     * @param ver Version.
      * @param expiryPlc Expiry policy.
      * @throws GridCacheEntryRemovedException If failed.
      */
-    private void updateTtl(IgniteCacheExpiryPolicy expiryPlc) throws 
GridCacheEntryRemovedException,
-        IgniteCheckedException {
+    private void updateTtlUnlocked(
+        GridCacheVersion ver,
+        IgniteCacheExpiryPolicy expiryPlc
+    ) throws GridCacheEntryRemovedException, IgniteCheckedException {
         long ttl = expiryPlc.forAccess();
 
         if (ttl != CU.TTL_NOT_CHANGED) {
-            updateTtl(ttl);
+            updateTtlUnlocked(ttl);
 
-            expiryPlc.ttlUpdated(key(), version(), hasReaders() ? 
((GridDhtCacheEntry)this).readers() : null);
+            expiryPlc.ttlUpdated(key(), ver, hasReaders() ? 
((GridDhtCacheEntry)this).readers() : null);
         }
     }
 
     /**
      * @param ttl Time to live.
      */
-    private void updateTtl(long ttl) throws IgniteCheckedException, 
GridCacheEntryRemovedException {
+    private void updateTtlUnlocked(long ttl) throws IgniteCheckedException {
         assert ttl >= 0 || ttl == CU.TTL_ZERO : ttl;
         assert lock.isHeldByCurrentThread();
 
@@ -3176,7 +3179,7 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
                     CacheObject val = this.val;
 
                     if (val != null && expiryPlc != null)
-                        updateTtl(expiryPlc);
+                        updateTtlUnlocked(version(), expiryPlc);
 
                     return val;
                 }
@@ -4202,6 +4205,30 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
         return cctx.tm().localTx();
     }
 
+    /** {@inheritDoc} */
+    @Override public void updateTtl(
+        GridCacheVersion ver,
+        IgniteCacheExpiryPolicy expiryPlc
+    ) throws GridCacheEntryRemovedException {
+        lockEntry();
+
+        try {
+            checkObsolete();
+
+            if (hasValueUnlocked()) {
+                try {
+                    updateTtlUnlocked(ver, expiryPlc);
+                }
+                catch (IgniteCheckedException e) {
+                    U.error(log, "Failed to update TTL: " + e, e);
+                }
+            }
+        }
+        finally {
+            unlockEntry();
+        }
+    }
+
     /** {@inheritDoc} */
     @Override public void updateTtl(@Nullable GridCacheVersion ver, long ttl) 
throws GridCacheEntryRemovedException {
         lockEntry();
@@ -4211,7 +4238,7 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
 
             if (hasValueUnlocked()) {
                 try {
-                    updateTtl(ttl);
+                    updateTtlUnlocked(ttl);
                 }
                 catch (IgniteCheckedException e) {
                     U.error(log, "Failed to update TTL: " + e, e);
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
index f6744e86b7d..90b8ec8ebce 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
@@ -992,7 +992,21 @@ public abstract class GridDhtCacheAdapter<K, V> extends 
GridDistributedCacheAdap
      * @param expiryPlc Expiry policy.
      */
     public void sendTtlUpdateRequest(@Nullable final IgniteCacheExpiryPolicy 
expiryPlc) {
-        if (expiryPlc != null && !F.isEmpty(expiryPlc.entries())) {
+        if (expiryPlc != null)
+            sendTtlUpdateRequest(expiryPlc, 
ctx.shared().exchange().readyAffinityVersion(), null);
+    }
+
+    /**
+     * @param expiryPlc Expiry policy.
+     * @param topVer Topology version.
+     * @param srcNodeId ID of the node that sent the initial ttl request.
+     */
+    private void sendTtlUpdateRequest(
+        IgniteCacheExpiryPolicy expiryPlc,
+        AffinityTopologyVersion topVer,
+        @Nullable UUID srcNodeId
+    ) {
+        if (!F.isEmpty(expiryPlc.entries())) {
             ctx.closures().runLocalSafe(new GridPlainRunnable() {
                 @SuppressWarnings({"ForLoopReplaceableByForEach"})
                 @Override public void run() {
@@ -1002,8 +1016,6 @@ public abstract class GridDhtCacheAdapter<K, V> extends 
GridDistributedCacheAdap
 
                     Map<ClusterNode, GridCacheTtlUpdateRequest> reqMap = new 
HashMap<>();
 
-                    AffinityTopologyVersion topVer = 
ctx.shared().exchange().readyAffinityVersion();
-
                     for (Map.Entry<KeyCacheObject, GridCacheVersion> e : 
entries.entrySet()) {
                         ClusterNode primaryNode = 
ctx.affinity().primaryByKey(e.getKey(), topVer);
 
@@ -1012,6 +1024,12 @@ public abstract class GridDhtCacheAdapter<K, V> extends 
GridDistributedCacheAdap
 
                             for (Iterator<ClusterNode> nodesIter = 
nodes.iterator(); nodesIter.hasNext(); ) {
                                 ClusterNode node = nodesIter.next();
+
+                                // There's no need to re-send ttl update 
request to the node
+                                // that sent the initial ttl update request.
+                                if (srcNodeId != null && 
srcNodeId.equals(node.id()))
+                                    continue;
+
                                 GridCacheTtlUpdateRequest req = 
reqMap.get(node);
 
                                 if (req == null) {
@@ -1024,6 +1042,10 @@ public abstract class GridDhtCacheAdapter<K, V> extends 
GridDistributedCacheAdap
                             }
                         }
                         else {
+                            // There is no need to re-send ttl update requests 
if we are not on the primary node.
+                            if (srcNodeId != null)
+                                continue;
+
                             GridCacheTtlUpdateRequest req = 
reqMap.get(primaryNode);
 
                             if (req == null) {
@@ -1077,122 +1099,38 @@ public abstract class GridDhtCacheAdapter<K, V> 
extends GridDistributedCacheAdap
         }
     }
 
-    /**
-     * @param srcNodeId The Id of a node that sends original ttl request.
-     * @param incomingReq Original ttl request.
-     */
-    private void sendTtlUpdateRequest(UUID srcNodeId, 
GridCacheTtlUpdateRequest incomingReq) {
-        ctx.closures().runLocalSafe(new GridPlainRunnable() {
-            @SuppressWarnings({"ForLoopReplaceableByForEach"})
-            @Override public void run() {
-                Map<ClusterNode, GridCacheTtlUpdateRequest> reqMap = new 
HashMap<>();
-
-                for (int i = 0; i < incomingReq.keys().size(); i++) {
-                    KeyCacheObject key = incomingReq.keys().get(i);
-
-                    // It's only required to broadcast ttl update requests if 
we are on primary node for given key.
-                    if (!ctx.affinity().primaryByKey(key, 
incomingReq.topologyVersion()).isLocal())
-                        continue;
-
-                    Collection<ClusterNode> nodes = 
ctx.affinity().backupsByKey(key, incomingReq.topologyVersion());
-
-                    for (Iterator<ClusterNode> nodesIter = nodes.iterator(); 
nodesIter.hasNext(); ) {
-                        ClusterNode node = nodesIter.next();
-
-                        // There's no need to send and update ttl request to 
the node that send us the initial
-                        // ttl update request.
-                        if (node.id().equals(srcNodeId))
-                            continue;
-
-                        GridCacheTtlUpdateRequest req = reqMap.get(node);
-
-                        if (req == null) {
-                            reqMap.put(node, req = new 
GridCacheTtlUpdateRequest(ctx.cacheId(),
-                                incomingReq.topologyVersion(),
-                                incomingReq.ttl()));
-                        }
-
-                        req.addEntry(key, incomingReq.version(i));
-                    }
-
-                    GridDhtCacheEntry entry = ctx.dht().entryExx(key, 
incomingReq.topologyVersion());
-
-                    Collection<UUID> readers = null;
-
-                    try {
-                        readers = entry.readers();
-                    }
-                    catch (GridCacheEntryRemovedException e) {
-                        U.error(log, "Failed to send TTL update request.", e);
-                    }
-
-                    for (UUID reader : readers) {
-                        // There's no need to send and update ttl request to 
the node that send us the initial
-                        // ttl update request.
-                        if (reader.equals(srcNodeId))
-                            continue;
-
-                        ClusterNode node = ctx.node(reader);
-
-                        if (node != null) {
-                            GridCacheTtlUpdateRequest req = reqMap.get(node);
-
-                            if (req == null) {
-                                reqMap.put(node, req = new 
GridCacheTtlUpdateRequest(ctx.cacheId(),
-                                    incomingReq.topologyVersion(),
-                                    incomingReq.ttl()));
-                            }
-
-                            req.addNearEntry(key, incomingReq.version(i));
-                        }
-                    }
-                }
-
-                for (Map.Entry<ClusterNode, GridCacheTtlUpdateRequest> req : 
reqMap.entrySet()) {
-                    try {
-                        ctx.io().send(req.getKey(), req.getValue(), 
ctx.ioPolicy());
-                    }
-                    catch (IgniteCheckedException e) {
-                        if (e instanceof ClusterTopologyCheckedException) {
-                            if (log.isDebugEnabled())
-                                log.debug("Failed to send TTC update request, 
node left: " + req.getKey());
-                        }
-                        else
-                            U.error(log, "Failed to send TTL update request.", 
e);
-                    }
-                }
-            }
-        });
-    }
-
     /**
      * @param req Request.
      */
     private void processTtlUpdateRequest(UUID srcNodeId, 
GridCacheTtlUpdateRequest req) {
+        final CacheExpiryPolicy expiryPlc = 
CacheExpiryPolicy.fromRemote(CU.TTL_NOT_CHANGED, req.ttl());
+
         if (req.keys() != null)
-            updateTtl(this, req.keys(), req.versions(), req.ttl());
+            updateTtl(this, req.keys(), req.versions(), expiryPlc);
 
         if (req.nearKeys() != null) {
             GridNearCacheAdapter<K, V> near = near();
 
             assert near != null;
 
-            updateTtl(near, req.nearKeys(), req.nearVersions(), req.ttl());
+            updateTtl(near, req.nearKeys(), req.nearVersions(), expiryPlc);
         }
 
-        sendTtlUpdateRequest(srcNodeId, req);
+        sendTtlUpdateRequest(expiryPlc, req.topologyVersion(), srcNodeId);
     }
 
     /**
      * @param cache Cache.
      * @param keys Entries keys.
      * @param vers Entries versions.
-     * @param ttl TTL.
+     * @param expiryPlc Expiry policy.
      */
-    private void updateTtl(GridCacheAdapter<K, V> cache,
+    private void updateTtl(
+        GridCacheAdapter<K, V> cache,
         List<KeyCacheObject> keys,
         List<GridCacheVersion> vers,
-        long ttl) {
+        IgniteCacheExpiryPolicy expiryPlc
+    ) {
         assert !F.isEmpty(keys);
         assert keys.size() == vers.size();
 
@@ -1209,7 +1147,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends 
GridDistributedCacheAdap
 
                             entry.unswap(false);
 
-                            entry.updateTtl(vers.get(i), ttl);
+                            entry.updateTtl(vers.get(i), expiryPlc);
 
                             break;
                         }
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
index b38dc0f4006..8be2c860b53 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
@@ -876,6 +876,11 @@ public class GridCacheTestEntryEx extends 
GridMetadataAwareAdapter implements Gr
         return ttl;
     }
 
+    /** @inheritDoc */
+    @Override public void updateTtl(GridCacheVersion ver, 
IgniteCacheExpiryPolicy expiryPlc) {
+        throw new UnsupportedOperationException();
+    }
+
     /** @inheritDoc */
     @Override public void updateTtl(GridCacheVersion ver, long ttl) {
         throw new UnsupportedOperationException();

Reply via email to