This is an automated email from the ASF dual-hosted git repository.
ppa pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new a81c3ac87d0 IGNITE-16118 Fixed a rare issue with updating the TTL in
the near-cache/backup if the request was initiated from another backup (#9865)
a81c3ac87d0 is described below
commit a81c3ac87d0957b7d2387cbaf92d142434d26a7e
Author: Pavel Pereslegin <[email protected]>
AuthorDate: Wed Apr 6 12:14:12 2022 +0300
IGNITE-16118 Fixed a rare issue with updating the TTL in the
near-cache/backup if the request was initiated from another backup (#9865)
---
.../processors/cache/GridCacheEntryEx.java | 7 ++
.../processors/cache/GridCacheMapEntry.java | 51 ++++++--
.../cache/distributed/dht/GridDhtCacheAdapter.java | 134 ++++++---------------
.../processors/cache/GridCacheTestEntryEx.java | 5 +
4 files changed, 87 insertions(+), 110 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
index 6b5bd6274a4..a019a60f6e6 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
@@ -1074,6 +1074,13 @@ public interface GridCacheEntryEx {
*/
public long ttl() throws GridCacheEntryRemovedException;
+ /**
+ * @param ver Version.
+ * @param expiryPlc Expiry policy.
+ * @throws GridCacheEntryRemovedException If entry was removed.
+ */
+ public void updateTtl(GridCacheVersion ver, IgniteCacheExpiryPolicy
expiryPlc) throws GridCacheEntryRemovedException;
+
/**
* @param ver Version.
* @param ttl Time to live.
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index 9131eb4e915..cbaeeb017d6 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -831,7 +831,7 @@ public abstract class GridCacheMapEntry extends
GridMetadataAwareAdapter impleme
}
if (ret != null && expiryPlc != null)
- updateTtl(expiryPlc);
+ updateTtlUnlocked(version(), expiryPlc);
if (retVer && resVer == null) {
resVer = (isNear() && cctx.transactional()) ?
((GridNearCacheEntry)this).dhtVersion() : this.ver;
@@ -1996,7 +1996,7 @@ public abstract class GridCacheMapEntry extends
GridMetadataAwareAdapter impleme
if (!pass) {
if (expiryPlc != null && !readFromStore &&
!cctx.putIfAbsentFilter(filter) && hasValueUnlocked())
- updateTtl(expiryPlc);
+ updateTtlUnlocked(expiryPlc);
Object val = retval ?
cctx.cacheObjectContext().unwrapBinaryIfNeeded(CU.value(old, cctx, false),
keepBinary, false, null)
@@ -2055,7 +2055,7 @@ public abstract class GridCacheMapEntry extends
GridMetadataAwareAdapter impleme
if (!entry.modified()) {
if (expiryPlc != null && !readFromStore &&
hasValueUnlocked())
- updateTtl(expiryPlc);
+ updateTtlUnlocked(expiryPlc);
updateMetrics(READ, metrics, transformOp, old != null);
@@ -2994,34 +2994,37 @@ public abstract class GridCacheMapEntry extends
GridMetadataAwareAdapter impleme
*
* @param expiryPlc Expiry policy.
*/
- private void updateTtl(ExpiryPolicy expiryPlc) throws
IgniteCheckedException, GridCacheEntryRemovedException {
+ private void updateTtlUnlocked(ExpiryPolicy expiryPlc) throws
IgniteCheckedException {
long ttl = CU.toTtl(expiryPlc.getExpiryForAccess());
if (ttl != CU.TTL_NOT_CHANGED)
- updateTtl(ttl);
+ updateTtlUnlocked(ttl);
}
/**
* Update TTL is it is changed.
*
+ * @param ver Version.
* @param expiryPlc Expiry policy.
* @throws GridCacheEntryRemovedException If failed.
*/
- private void updateTtl(IgniteCacheExpiryPolicy expiryPlc) throws
GridCacheEntryRemovedException,
- IgniteCheckedException {
+ private void updateTtlUnlocked(
+ GridCacheVersion ver,
+ IgniteCacheExpiryPolicy expiryPlc
+ ) throws GridCacheEntryRemovedException, IgniteCheckedException {
long ttl = expiryPlc.forAccess();
if (ttl != CU.TTL_NOT_CHANGED) {
- updateTtl(ttl);
+ updateTtlUnlocked(ttl);
- expiryPlc.ttlUpdated(key(), version(), hasReaders() ?
((GridDhtCacheEntry)this).readers() : null);
+ expiryPlc.ttlUpdated(key(), ver, hasReaders() ?
((GridDhtCacheEntry)this).readers() : null);
}
}
/**
* @param ttl Time to live.
*/
- private void updateTtl(long ttl) throws IgniteCheckedException,
GridCacheEntryRemovedException {
+ private void updateTtlUnlocked(long ttl) throws IgniteCheckedException {
assert ttl >= 0 || ttl == CU.TTL_ZERO : ttl;
assert lock.isHeldByCurrentThread();
@@ -3176,7 +3179,7 @@ public abstract class GridCacheMapEntry extends
GridMetadataAwareAdapter impleme
CacheObject val = this.val;
if (val != null && expiryPlc != null)
- updateTtl(expiryPlc);
+ updateTtlUnlocked(version(), expiryPlc);
return val;
}
@@ -4202,6 +4205,30 @@ public abstract class GridCacheMapEntry extends
GridMetadataAwareAdapter impleme
return cctx.tm().localTx();
}
+ /** {@inheritDoc} */
+ @Override public void updateTtl(
+ GridCacheVersion ver,
+ IgniteCacheExpiryPolicy expiryPlc
+ ) throws GridCacheEntryRemovedException {
+ lockEntry();
+
+ try {
+ checkObsolete();
+
+ if (hasValueUnlocked()) {
+ try {
+ updateTtlUnlocked(ver, expiryPlc);
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to update TTL: " + e, e);
+ }
+ }
+ }
+ finally {
+ unlockEntry();
+ }
+ }
+
/** {@inheritDoc} */
@Override public void updateTtl(@Nullable GridCacheVersion ver, long ttl)
throws GridCacheEntryRemovedException {
lockEntry();
@@ -4211,7 +4238,7 @@ public abstract class GridCacheMapEntry extends
GridMetadataAwareAdapter impleme
if (hasValueUnlocked()) {
try {
- updateTtl(ttl);
+ updateTtlUnlocked(ttl);
}
catch (IgniteCheckedException e) {
U.error(log, "Failed to update TTL: " + e, e);
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
index f6744e86b7d..90b8ec8ebce 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
@@ -992,7 +992,21 @@ public abstract class GridDhtCacheAdapter<K, V> extends
GridDistributedCacheAdap
* @param expiryPlc Expiry policy.
*/
public void sendTtlUpdateRequest(@Nullable final IgniteCacheExpiryPolicy
expiryPlc) {
- if (expiryPlc != null && !F.isEmpty(expiryPlc.entries())) {
+ if (expiryPlc != null)
+ sendTtlUpdateRequest(expiryPlc,
ctx.shared().exchange().readyAffinityVersion(), null);
+ }
+
+ /**
+ * @param expiryPlc Expiry policy.
+ * @param topVer Topology version.
+ * @param srcNodeId ID of the node that sent the initial ttl request.
+ */
+ private void sendTtlUpdateRequest(
+ IgniteCacheExpiryPolicy expiryPlc,
+ AffinityTopologyVersion topVer,
+ @Nullable UUID srcNodeId
+ ) {
+ if (!F.isEmpty(expiryPlc.entries())) {
ctx.closures().runLocalSafe(new GridPlainRunnable() {
@SuppressWarnings({"ForLoopReplaceableByForEach"})
@Override public void run() {
@@ -1002,8 +1016,6 @@ public abstract class GridDhtCacheAdapter<K, V> extends
GridDistributedCacheAdap
Map<ClusterNode, GridCacheTtlUpdateRequest> reqMap = new
HashMap<>();
- AffinityTopologyVersion topVer =
ctx.shared().exchange().readyAffinityVersion();
-
for (Map.Entry<KeyCacheObject, GridCacheVersion> e :
entries.entrySet()) {
ClusterNode primaryNode =
ctx.affinity().primaryByKey(e.getKey(), topVer);
@@ -1012,6 +1024,12 @@ public abstract class GridDhtCacheAdapter<K, V> extends
GridDistributedCacheAdap
for (Iterator<ClusterNode> nodesIter =
nodes.iterator(); nodesIter.hasNext(); ) {
ClusterNode node = nodesIter.next();
+
+ // There's no need to re-send ttl update
request to the node
+ // that sent the initial ttl update request.
+ if (srcNodeId != null &&
srcNodeId.equals(node.id()))
+ continue;
+
GridCacheTtlUpdateRequest req =
reqMap.get(node);
if (req == null) {
@@ -1024,6 +1042,10 @@ public abstract class GridDhtCacheAdapter<K, V> extends
GridDistributedCacheAdap
}
}
else {
+ // There is no need to re-send ttl update requests
if we are not on the primary node.
+ if (srcNodeId != null)
+ continue;
+
GridCacheTtlUpdateRequest req =
reqMap.get(primaryNode);
if (req == null) {
@@ -1077,122 +1099,38 @@ public abstract class GridDhtCacheAdapter<K, V>
extends GridDistributedCacheAdap
}
}
- /**
- * @param srcNodeId The Id of a node that sends original ttl request.
- * @param incomingReq Original ttl request.
- */
- private void sendTtlUpdateRequest(UUID srcNodeId,
GridCacheTtlUpdateRequest incomingReq) {
- ctx.closures().runLocalSafe(new GridPlainRunnable() {
- @SuppressWarnings({"ForLoopReplaceableByForEach"})
- @Override public void run() {
- Map<ClusterNode, GridCacheTtlUpdateRequest> reqMap = new
HashMap<>();
-
- for (int i = 0; i < incomingReq.keys().size(); i++) {
- KeyCacheObject key = incomingReq.keys().get(i);
-
- // It's only required to broadcast ttl update requests if
we are on primary node for given key.
- if (!ctx.affinity().primaryByKey(key,
incomingReq.topologyVersion()).isLocal())
- continue;
-
- Collection<ClusterNode> nodes =
ctx.affinity().backupsByKey(key, incomingReq.topologyVersion());
-
- for (Iterator<ClusterNode> nodesIter = nodes.iterator();
nodesIter.hasNext(); ) {
- ClusterNode node = nodesIter.next();
-
- // There's no need to send and update ttl request to
the node that send us the initial
- // ttl update request.
- if (node.id().equals(srcNodeId))
- continue;
-
- GridCacheTtlUpdateRequest req = reqMap.get(node);
-
- if (req == null) {
- reqMap.put(node, req = new
GridCacheTtlUpdateRequest(ctx.cacheId(),
- incomingReq.topologyVersion(),
- incomingReq.ttl()));
- }
-
- req.addEntry(key, incomingReq.version(i));
- }
-
- GridDhtCacheEntry entry = ctx.dht().entryExx(key,
incomingReq.topologyVersion());
-
- Collection<UUID> readers = null;
-
- try {
- readers = entry.readers();
- }
- catch (GridCacheEntryRemovedException e) {
- U.error(log, "Failed to send TTL update request.", e);
- }
-
- for (UUID reader : readers) {
- // There's no need to send and update ttl request to
the node that send us the initial
- // ttl update request.
- if (reader.equals(srcNodeId))
- continue;
-
- ClusterNode node = ctx.node(reader);
-
- if (node != null) {
- GridCacheTtlUpdateRequest req = reqMap.get(node);
-
- if (req == null) {
- reqMap.put(node, req = new
GridCacheTtlUpdateRequest(ctx.cacheId(),
- incomingReq.topologyVersion(),
- incomingReq.ttl()));
- }
-
- req.addNearEntry(key, incomingReq.version(i));
- }
- }
- }
-
- for (Map.Entry<ClusterNode, GridCacheTtlUpdateRequest> req :
reqMap.entrySet()) {
- try {
- ctx.io().send(req.getKey(), req.getValue(),
ctx.ioPolicy());
- }
- catch (IgniteCheckedException e) {
- if (e instanceof ClusterTopologyCheckedException) {
- if (log.isDebugEnabled())
- log.debug("Failed to send TTC update request,
node left: " + req.getKey());
- }
- else
- U.error(log, "Failed to send TTL update request.",
e);
- }
- }
- }
- });
- }
-
/**
* @param req Request.
*/
private void processTtlUpdateRequest(UUID srcNodeId,
GridCacheTtlUpdateRequest req) {
+ final CacheExpiryPolicy expiryPlc =
CacheExpiryPolicy.fromRemote(CU.TTL_NOT_CHANGED, req.ttl());
+
if (req.keys() != null)
- updateTtl(this, req.keys(), req.versions(), req.ttl());
+ updateTtl(this, req.keys(), req.versions(), expiryPlc);
if (req.nearKeys() != null) {
GridNearCacheAdapter<K, V> near = near();
assert near != null;
- updateTtl(near, req.nearKeys(), req.nearVersions(), req.ttl());
+ updateTtl(near, req.nearKeys(), req.nearVersions(), expiryPlc);
}
- sendTtlUpdateRequest(srcNodeId, req);
+ sendTtlUpdateRequest(expiryPlc, req.topologyVersion(), srcNodeId);
}
/**
* @param cache Cache.
* @param keys Entries keys.
* @param vers Entries versions.
- * @param ttl TTL.
+ * @param expiryPlc Expiry policy.
*/
- private void updateTtl(GridCacheAdapter<K, V> cache,
+ private void updateTtl(
+ GridCacheAdapter<K, V> cache,
List<KeyCacheObject> keys,
List<GridCacheVersion> vers,
- long ttl) {
+ IgniteCacheExpiryPolicy expiryPlc
+ ) {
assert !F.isEmpty(keys);
assert keys.size() == vers.size();
@@ -1209,7 +1147,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends
GridDistributedCacheAdap
entry.unswap(false);
- entry.updateTtl(vers.get(i), ttl);
+ entry.updateTtl(vers.get(i), expiryPlc);
break;
}
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
index b38dc0f4006..8be2c860b53 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
@@ -876,6 +876,11 @@ public class GridCacheTestEntryEx extends
GridMetadataAwareAdapter implements Gr
return ttl;
}
+ /** @inheritDoc */
+ @Override public void updateTtl(GridCacheVersion ver,
IgniteCacheExpiryPolicy expiryPlc) {
+ throw new UnsupportedOperationException();
+ }
+
/** @inheritDoc */
@Override public void updateTtl(GridCacheVersion ver, long ttl) {
throw new UnsupportedOperationException();