# ignite-41
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/5b142ddd Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/5b142ddd Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/5b142ddd Branch: refs/heads/ignite-1 Commit: 5b142ddd2d54172510035a7a6a72adad7e133b40 Parents: 575dbae Author: sboikov <[email protected]> Authored: Mon Dec 22 16:50:58 2014 +0300 Committer: sboikov <[email protected]> Committed: Mon Dec 22 23:00:55 2014 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheAdapter.java | 26 +++-- .../cache/IgniteCacheExpiryPolicy.java | 7 +- .../GridDistributedCacheAdapter.java | 18 ++- .../GridDistributedTxRemoteAdapter.java | 12 +- .../distributed/dht/GridDhtLockFuture.java | 18 ++- .../distributed/dht/GridDhtLockRequest.java | 50 +++++++- .../dht/GridDhtTransactionalCacheAdapter.java | 42 +++++-- .../distributed/dht/GridDhtTxLocalAdapter.java | 16 ++- .../cache/distributed/dht/GridDhtTxRemote.java | 6 +- .../dht/atomic/GridDhtAtomicCache.java | 19 ++- .../dht/colocated/GridDhtColocatedCache.java | 87 +++++++++++--- .../colocated/GridDhtColocatedLockFuture.java | 23 +++- .../distributed/near/GridNearAtomicCache.java | 1 + .../distributed/near/GridNearCacheAdapter.java | 3 +- .../distributed/near/GridNearLockFuture.java | 11 +- .../distributed/near/GridNearLockRequest.java | 35 +++++- .../near/GridNearTransactionalCache.java | 28 +++-- .../cache/distributed/near/GridNearTxLocal.java | 116 +++++++++++++++++-- .../processors/cache/local/GridLocalCache.java | 11 +- .../processors/cache/local/GridLocalTx.java | 1 + .../local/atomic/GridLocalAtomicCache.java | 4 +- .../cache/transactions/IgniteTxHandler.java | 11 +- .../transactions/IgniteTxLocalAdapter.java | 82 ++++++++++--- .../IgniteCacheExpiryPolicyAbstractTest.java | 43 +++++-- .../GridCacheInterceptorAbstractSelfTest.java | 1 + 25 files changed, 545 insertions(+), 126 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5b142ddd/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java index 3780cbb..5295f20 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java @@ -533,6 +533,7 @@ public abstract class GridCacheAdapter<K, V> extends GridMetadataAwareAdapter im * @param retval Flag to return value. * @param isolation Transaction isolation. * @param invalidate Invalidate flag. + * @param filter TTL for read operation. * @param filter Optional filter. * @return Locks future. */ @@ -544,6 +545,7 @@ public abstract class GridCacheAdapter<K, V> extends GridMetadataAwareAdapter im boolean retval, IgniteTxIsolation isolation, boolean invalidate, + long accessTtl, IgnitePredicate<GridCacheEntry<K, V>>[] filter); /** @@ -1743,11 +1745,6 @@ public abstract class GridCacheAdapter<K, V> extends GridMetadataAwareAdapter im subjId = ctx.subjectIdPerCall(subjId, prj); - ExpiryPolicy expiryPlc = prj != null ? prj.expiry() : null; - - if (expiryPlc == null) - expiryPlc = ctx.expiry(); - return getAllAsync(keys, entry, !skipTx, @@ -1755,7 +1752,7 @@ public abstract class GridCacheAdapter<K, V> extends GridMetadataAwareAdapter im taskName, deserializePortable, forcePrimary, - GetExpiryPolicy.forPolicy(expiryPlc), + accessExpiryPolicy(prj != null ? prj.expiry() : null), filter); } @@ -4552,6 +4549,17 @@ public abstract class GridCacheAdapter<K, V> extends GridMetadataAwareAdapter im } /** + * @param plc Explicitly specified expiry policy for cache operation. + * @return Expiry policy wrapper. + */ + @Nullable public GetExpiryPolicy accessExpiryPolicy(@Nullable ExpiryPolicy plc) { + if (plc == null) + plc = ctx.expiry(); + + return GetExpiryPolicy.forPolicy(plc); + } + + /** * Cache operation. */ private abstract class SyncOp<T> { @@ -4901,10 +4909,8 @@ public abstract class GridCacheAdapter<K, V> extends GridMetadataAwareAdapter im return -1L; } - /** - * - */ - public synchronized void reset() { + /** {@inheritDoc} */ + @Override public synchronized void reset() { if (entries != null) entries.clear(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5b142ddd/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/IgniteCacheExpiryPolicy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/IgniteCacheExpiryPolicy.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/IgniteCacheExpiryPolicy.java index 59cd937..d603b35 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/IgniteCacheExpiryPolicy.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/IgniteCacheExpiryPolicy.java @@ -36,7 +36,7 @@ public interface IgniteCacheExpiryPolicy { public long forAccess(); /** - * Callback when entry's ttl is updated on access. + * Callback for ttl update on entry access. * * @param key Entry key. * @param keyBytes Entry key bytes. @@ -49,6 +49,11 @@ public interface IgniteCacheExpiryPolicy { @Nullable Collection<UUID> rdrs); /** + * Clears information about updated entries. + */ + public void reset(); + + /** * @return Entries with TTL updated on access. */ @Nullable public Map<Object, IgniteBiTuple<byte[], GridCacheVersion>> entries(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5b142ddd/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedCacheAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedCacheAdapter.java index e19fe92..2d01a82 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedCacheAdapter.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedCacheAdapter.java @@ -59,11 +59,12 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter boolean retval, IgniteTxIsolation isolation, boolean isInvalidate, + long accessTtl, IgnitePredicate<GridCacheEntry<K, V>>[] filter ) { assert tx != null; - return lockAllAsync(keys, timeout, tx, isInvalidate, isRead, retval, isolation, filter); + return lockAllAsync(keys, timeout, tx, isInvalidate, isRead, retval, isolation, accessTtl, filter); } /** {@inheritDoc} */ @@ -72,7 +73,7 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter IgniteTxLocalEx<K, V> tx = ctx.tm().userTxx(); // Return value flag is true because we choose to bring values for explicit locks. - return lockAllAsync(keys, timeout, tx, false, false, /*retval*/true, null, filter); + return lockAllAsync(keys, timeout, tx, false, false, /*retval*/true, null, -1L, filter); } /** @@ -83,12 +84,19 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter * @param isRead Indicates whether value is read or written. * @param retval Flag to return value. * @param isolation Transaction isolation. + * @param accessTtl TTL for read operation. * @param filter Optional filter. * @return Future for locks. */ - protected abstract IgniteFuture<Boolean> lockAllAsync(Collection<? extends K> keys, long timeout, - @Nullable IgniteTxLocalEx<K, V> tx, boolean isInvalidate, boolean isRead, boolean retval, - @Nullable IgniteTxIsolation isolation, IgnitePredicate<GridCacheEntry<K, V>>[] filter); + protected abstract IgniteFuture<Boolean> lockAllAsync(Collection<? extends K> keys, + long timeout, + @Nullable IgniteTxLocalEx<K, V> tx, + boolean isInvalidate, + boolean isRead, + boolean retval, + @Nullable IgniteTxIsolation isolation, + long accessTtl, + IgnitePredicate<GridCacheEntry<K, V>>[] filter); /** * @param key Key to remove. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5b142ddd/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java index 1abf714..2e87441 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java @@ -578,17 +578,7 @@ public class GridDistributedTxRemoteAdapter<K, V> extends IgniteTxAdapter<K, V> } } else if (op == READ) { - ExpiryPolicy expiry = txEntry.expiry(); - - if (expiry == null) - expiry = cacheCtx.expiry(); - - if (expiry != null) { - Duration duration = expiry.getExpiryForAccess(); - - if (duration != null) - cached.updateTtl(null, CU.toTtl(duration)); - } + assert near(); if (log.isDebugEnabled()) log.debug("Ignoring READ entry when committing: " + txEntry); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5b142ddd/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtLockFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtLockFuture.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtLockFuture.java index 5a9ee72..2970568 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtLockFuture.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtLockFuture.java @@ -121,6 +121,9 @@ public final class GridDhtLockFuture<K, V> extends GridCompoundIdentityFuture<Bo /** Pending locks. */ private final Collection<K> pendingLocks = new GridConcurrentHashSet<>(); + /** TTL for read operation. */ + private long accessTtl; + /** * Empty constructor required by {@link Externalizable}. */ @@ -138,6 +141,7 @@ public final class GridDhtLockFuture<K, V> extends GridCompoundIdentityFuture<Bo * @param timeout Lock acquisition timeout. * @param tx Transaction. * @param threadId Thread ID. + * @param accessTtl TTL for read operation. * @param filter Filter. */ public GridDhtLockFuture( @@ -150,10 +154,10 @@ public final class GridDhtLockFuture<K, V> extends GridCompoundIdentityFuture<Bo long timeout, GridDhtTxLocalAdapter<K, V> tx, long threadId, + long accessTtl, IgnitePredicate<GridCacheEntry<K, V>>[] filter) { super(cctx.kernalContext(), CU.boolReducer()); - assert cctx != null; assert nearNodeId != null; assert nearLockVer != null; assert topVer > 0; @@ -166,6 +170,7 @@ public final class GridDhtLockFuture<K, V> extends GridCompoundIdentityFuture<Bo this.timeout = timeout; this.filter = filter; this.tx = tx; + this.accessTtl = accessTtl; if (tx != null) tx.topologyVersion(topVer); @@ -202,6 +207,7 @@ public final class GridDhtLockFuture<K, V> extends GridCompoundIdentityFuture<Bo } /** + * @param cacheCtx Cache context. * @param invalidPart Partition to retry. */ void addInvalidPartition(GridCacheContext<K, V> cacheCtx, int invalidPart) { @@ -827,7 +833,8 @@ public final class GridDhtLockFuture<K, V> extends GridCompoundIdentityFuture<Bo inTx() ? tx.groupLockKey() : null, inTx() && tx.partitionLock(), inTx() ? tx.subjectId() : null, - inTx() ? tx.taskNameHash() : 0); + inTx() ? tx.taskNameHash() : 0, + read ? accessTtl : -1L); try { for (ListIterator<GridDhtCacheEntry<K, V>> it = dhtMapping.listIterator(); it.hasNext();) { @@ -838,11 +845,13 @@ public final class GridDhtLockFuture<K, V> extends GridCompoundIdentityFuture<Bo boolean invalidateRdr = e.readerId(n.id()) != null; + IgniteTxEntry<K, V> entry = tx != null ? tx.entry(e.txKey()) : null; + req.addDhtKey( e.key(), e.getOrMarshalKeyBytes(), tx != null ? tx.writeMap().get(e.txKey()) : null, - tx != null ? tx.entry(e.txKey()).drVersion() : null, + entry != null ? entry.drVersion() : null, invalidateRdr, cctx); @@ -906,7 +915,8 @@ public final class GridDhtLockFuture<K, V> extends GridCompoundIdentityFuture<Bo inTx() ? tx.groupLockKey() : null, inTx() && tx.partitionLock(), inTx() ? tx.subjectId() : null, - inTx() ? tx.taskNameHash() : 0); + inTx() ? tx.taskNameHash() : 0, + read ? accessTtl : -1L); try { for (ListIterator<GridDhtCacheEntry<K, V>> it = nearMapping.listIterator(); it.hasNext();) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5b142ddd/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtLockRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtLockRequest.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtLockRequest.java index cb4ef69..f72f921 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtLockRequest.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtLockRequest.java @@ -73,6 +73,9 @@ public class GridDhtLockRequest<K, V> extends GridDistributedLockRequest<K, V> { @GridDirectVersion(3) private BitSet preloadKeys; + /** TTL for read operation. */ + private long accessTtl; + /** * Empty constructor required for {@link Externalizable}. */ @@ -81,6 +84,7 @@ public class GridDhtLockRequest<K, V> extends GridDistributedLockRequest<K, V> { } /** + * @param cacheId Cache ID. * @param nodeId Node ID. * @param nearXidVer Near transaction ID. * @param threadId Thread ID. @@ -98,6 +102,9 @@ public class GridDhtLockRequest<K, V> extends GridDistributedLockRequest<K, V> { * @param txSize Expected transaction size. * @param grpLockKey Group lock key. * @param partLock {@code True} if partition lock. + * @param subjId Subject ID. + * @param taskNameHash Task name hash code. + * @param accessTtl TTL for read operation. */ public GridDhtLockRequest( int cacheId, @@ -119,10 +126,24 @@ public class GridDhtLockRequest<K, V> extends GridDistributedLockRequest<K, V> { @Nullable IgniteTxKey grpLockKey, boolean partLock, @Nullable UUID subjId, - int taskNameHash + int taskNameHash, + long accessTtl ) { - super(cacheId, nodeId, nearXidVer, threadId, futId, lockVer, isInTx, isRead, isolation, isInvalidate, timeout, - dhtCnt == 0 ? nearCnt : dhtCnt, txSize, grpLockKey, partLock); + super(cacheId, + nodeId, + nearXidVer, + threadId, + futId, + lockVer, + isInTx, + isRead, + isolation, + isInvalidate, + timeout, + dhtCnt == 0 ? nearCnt : dhtCnt, + txSize, + grpLockKey, + partLock); this.topVer = topVer; @@ -135,6 +156,7 @@ public class GridDhtLockRequest<K, V> extends GridDistributedLockRequest<K, V> { this.miniId = miniId; this.subjId = subjId; this.taskNameHash = taskNameHash; + this.accessTtl = accessTtl; } /** {@inheritDoc} */ @@ -239,6 +261,7 @@ public class GridDhtLockRequest<K, V> extends GridDistributedLockRequest<K, V> { } /** + * @param idx Key index. * @return {@code True} if need to preload key with given index. */ public boolean needPreloadKey(int idx) { @@ -282,6 +305,13 @@ public class GridDhtLockRequest<K, V> extends GridDistributedLockRequest<K, V> { return miniId; } + /** + * @return TTL for read operation. + */ + public long accessTtl() { + return accessTtl; + } + /** {@inheritDoc} * @param ctx*/ @Override public void prepareMarshal(GridCacheSharedContext<K, V> ctx) throws IgniteCheckedException { @@ -330,6 +360,7 @@ public class GridDhtLockRequest<K, V> extends GridDistributedLockRequest<K, V> { _clone.subjId = subjId; _clone.taskNameHash = taskNameHash; _clone.preloadKeys = preloadKeys; + _clone.accessTtl = accessTtl; } /** {@inheritDoc} */ @@ -417,6 +448,12 @@ public class GridDhtLockRequest<K, V> extends GridDistributedLockRequest<K, V> { commState.idx++; + case 32: + if (!commState.putLong(accessTtl)) + return false; + + commState.idx++; + } return true; @@ -526,6 +563,13 @@ public class GridDhtLockRequest<K, V> extends GridDistributedLockRequest<K, V> { commState.idx++; + case 32: + if (buf.remaining() < 8) + return false; + + accessTtl = commState.getLong(); + + commState.idx++; } return true; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5b142ddd/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java index 67c01b3..dc8ddcb 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java @@ -210,13 +210,14 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach tx.addWrite( ctx, - writeEntry == null ? (req.txRead() ? READ : NOOP) : writeEntry.op(), + writeEntry == null ? NOOP : writeEntry.op(), txKey, req.keyBytes() != null ? req.keyBytes().get(i) : null, writeEntry == null ? null : writeEntry.value(), writeEntry == null ? null : writeEntry.valueBytes(), writeEntry == null ? null : writeEntry.transformClosures(), - drVer); + drVer, + req.accessTtl()); if (req.groupLock()) tx.groupLockKey(txKey); @@ -547,8 +548,17 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach boolean isRead, boolean retval, IgniteTxIsolation isolation, + long accessTtl, IgnitePredicate<GridCacheEntry<K, V>>[] filter) { - return lockAllAsyncInternal(keys, timeout, txx, isInvalidate, isRead, retval, isolation, filter); + return lockAllAsyncInternal(keys, + timeout, + txx, + isInvalidate, + isRead, + retval, + isolation, + accessTtl, + filter); } /** @@ -561,6 +571,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach * @param isRead Read flag. * @param retval Return value flag. * @param isolation Transaction isolation. + * @param accessTtl TTL for read operation. * @param filter Optional filter. * @return Lock future. */ @@ -571,6 +582,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach boolean isRead, boolean retval, IgniteTxIsolation isolation, + long accessTtl, IgnitePredicate<GridCacheEntry<K, V>>[] filter) { if (keys == null || keys.isEmpty()) return new GridDhtFinishedFuture<>(ctx.kernalContext(), true); @@ -589,6 +601,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach timeout, tx, tx.threadId(), + accessTtl, filter); for (K key : keys) { @@ -637,6 +650,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach } /** + * @param cacheCtx Cache context. * @param nearNode Near node. * @param req Request. * @param filter0 Filter. @@ -705,6 +719,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach req.timeout(), tx, req.threadId(), + req.accessTtl(), filter); // Add before mapping. @@ -815,15 +830,16 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach req.drVersions(), req.messageId(), req.implicitTx(), - req.txRead()); + req.txRead(), + req.accessTtl()); final GridDhtTxLocal<K, V> t = tx; return new GridDhtEmbeddedFuture<>( txFut, new C2<GridCacheReturn<V>, Exception, IgniteFuture<GridNearLockResponse<K, V>>>() { - @Override public IgniteFuture<GridNearLockResponse<K, V>> apply(GridCacheReturn<V> o, - Exception e) { + @Override public IgniteFuture<GridNearLockResponse<K, V>> apply( + GridCacheReturn<V> o, Exception e) { if (e != null) e = U.unwrap(e); @@ -831,7 +847,11 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach // Create response while holding locks. final GridNearLockResponse<K, V> resp = createLockReply(nearNode, - entries, req, t, t.xidVersion(), e); + entries, + req, + t, + t.xidVersion(), + e); if (resp.error() == null && t.onePhaseCommit()) { assert t.implicit(); @@ -880,8 +900,12 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach else if (!b) e = new GridCacheLockTimeoutException(req.version()); - GridNearLockResponse<K, V> res = createLockReply(nearNode, entries, req, - null, mappedVer, e); + GridNearLockResponse<K, V> res = createLockReply(nearNode, + entries, + req, + null, + mappedVer, + e); sendLockReply(nearNode, null, req, res); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5b142ddd/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java index d20a4a3..b752178 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java @@ -461,12 +461,15 @@ public abstract class GridDhtTxLocalAdapter<K, V> extends IgniteTxLocalAdapter<K } /** + * @param cacheCtx Cache context. * @param entries Entries to lock. * @param writeEntries Write entries for implicit transactions mapped to one node. + * @param onePhaseCommit One phase commit flag. * @param drVers DR versions. * @param msgId Message ID. * @param implicit Implicit flag. * @param read Read flag. + * @param accessTtl TTL for read operation. * @return Lock future. */ IgniteFuture<GridCacheReturn<V>> lockAllAsync( @@ -477,7 +480,8 @@ public abstract class GridDhtTxLocalAdapter<K, V> extends IgniteTxLocalAdapter<K GridCacheVersion[] drVers, long msgId, boolean implicit, - final boolean read + final boolean read, + long accessTtl ) { try { checkValid(); @@ -547,6 +551,8 @@ public abstract class GridDhtTxLocalAdapter<K, V> extends IgniteTxLocalAdapter<K txEntry.drExpireTime(w.drExpireTime()); txEntry.expiry(w.expiry()); } + else if (read) + txEntry.ttl(accessTtl); txEntry.cached(cached, txEntry.keyBytes()); @@ -573,7 +579,7 @@ public abstract class GridDhtTxLocalAdapter<K, V> extends IgniteTxLocalAdapter<K if (log.isDebugEnabled()) log.debug("Lock keys: " + passedKeys); - return obtainLockAsync(cacheCtx, ret, passedKeys, read, skipped, null); + return obtainLockAsync(cacheCtx, ret, passedKeys, read, skipped, accessTtl, null); } catch (IgniteCheckedException e) { setRollbackOnly(); @@ -588,6 +594,7 @@ public abstract class GridDhtTxLocalAdapter<K, V> extends IgniteTxLocalAdapter<K * @param passedKeys Passed keys. * @param read {@code True} if read. * @param skipped Skipped keys. + * @param accessTtl TTL for read operation. * @param filter Entry write filter. * @return Future for lock acquisition. */ @@ -597,6 +604,7 @@ public abstract class GridDhtTxLocalAdapter<K, V> extends IgniteTxLocalAdapter<K final Collection<? extends K> passedKeys, final boolean read, final Set<K> skipped, + final long accessTtl, @Nullable final IgnitePredicate<GridCacheEntry<K, V>>[] filter) { if (log.isDebugEnabled()) log.debug("Before acquiring transaction lock on keys [passedKeys=" + passedKeys + ", skipped=" + @@ -614,6 +622,7 @@ public abstract class GridDhtTxLocalAdapter<K, V> extends IgniteTxLocalAdapter<K read, /*retval*/false, isolation, + accessTtl, CU.<K, V>empty()); return new GridEmbeddedFuture<>( @@ -631,7 +640,8 @@ public abstract class GridDhtTxLocalAdapter<K, V> extends IgniteTxLocalAdapter<K ret, /*remove*/false, /*retval*/false, - /*read*/true, + /*read*/read, + accessTtl, filter == null ? CU.<K, V>empty() : filter); return ret; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5b142ddd/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxRemote.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxRemote.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxRemote.java index a545bc4..97ec1af 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxRemote.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxRemote.java @@ -281,6 +281,7 @@ public class GridDhtTxRemote<K, V> extends GridDistributedTxRemoteAdapter<K, V> * @param valBytes Value bytes. * @param drVer Data center replication version. * @param clos Transform closures. + * @param ttl TTL. */ public void addWrite(GridCacheContext<K, V> cacheCtx, GridCacheOperation op, @@ -289,7 +290,8 @@ public class GridDhtTxRemote<K, V> extends GridDistributedTxRemoteAdapter<K, V> @Nullable V val, @Nullable byte[] valBytes, @Nullable Collection<IgniteClosure<V, V>> clos, - @Nullable GridCacheVersion drVer) { + @Nullable GridCacheVersion drVer, + long ttl) { checkInternal(key); if (isSystemInvalidate()) @@ -301,7 +303,7 @@ public class GridDhtTxRemote<K, V> extends GridDistributedTxRemoteAdapter<K, V> this, op, val, - -1L, + ttl, -1L, cached, drVer); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5b142ddd/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index 65099ef..0b5d638 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -576,6 +576,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { boolean isRead, boolean retval, @Nullable IgniteTxIsolation isolation, + long accessTtl, IgnitePredicate<GridCacheEntry<K, V>>[] filter) { return new FinishedLockFuture(new UnsupportedOperationException("Locks are not supported for " + "GridCacheAtomicityMode.ATOMIC mode (use GridCacheAtomicityMode.TRANSACTIONAL instead)")); @@ -734,8 +735,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { long topVer = ctx.affinity().affinityTopologyVersion(); - final GetExpiryPolicy expiry = - GetExpiryPolicy.forPolicy(expiryPlc != null ? expiryPlc : ctx.expiry()); + final GetExpiryPolicy expiry = accessExpiryPolicy(expiryPlc); // Optimisation: try to resolve value locally and escape 'get future' creation. if (!reload && !forcePrimary) { @@ -2741,9 +2741,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { /** {@inheritDoc} */ @Override public void ttlUpdated(Object key, - byte[] keyBytes, - GridCacheVersion ver, - @Nullable Collection<UUID> rdrs) { + byte[] keyBytes, + GridCacheVersion ver, + @Nullable Collection<UUID> rdrs) { if (entries == null) entries = new HashMap<>(); @@ -2767,6 +2767,15 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } /** {@inheritDoc} */ + @Override public void reset() { + if (entries != null) + entries.clear(); + + if (rdrsMap != null) + rdrsMap.clear(); + } + + /** {@inheritDoc} */ @Nullable @Override public Map<Object, IgniteBiTuple<byte[], GridCacheVersion>> entries() { return entries; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5b142ddd/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java index 7534fd5..9878768 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java @@ -184,7 +184,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte taskName, deserializePortable, filter, - prj != null ? prj.expiry() : null); + accessExpiryPolicy(prj != null ? prj.expiry() : null)); } /** {@inheritDoc} */ @@ -236,15 +236,15 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte String taskName, boolean deserializePortable, @Nullable IgnitePredicate<GridCacheEntry<K, V>>[] filter, - @Nullable ExpiryPolicy expiryPlc) { + @Nullable IgniteCacheExpiryPolicy expiryPlc) { if (keys == null || keys.isEmpty()) return new GridFinishedFuture<>(ctx.kernalContext(), Collections.<K, V>emptyMap()); if (keyCheck) validateCacheKeys(keys); - final GetExpiryPolicy expiry = - GetExpiryPolicy.forPolicy(expiryPlc != null ? expiryPlc : ctx.expiry()); + if (expiryPlc == null) + expiryPlc = accessExpiryPolicy(ctx.expiry()); // Optimisation: try to resolve value locally and escape 'get future' creation. if (!reload && !forcePrimary) { @@ -276,7 +276,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte null, taskName, filter, - expiry); + expiryPlc); // Entry was not in memory or in swap, so we remove it from cache. if (v == null) { @@ -325,14 +325,14 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte } if (success) { - sendTtlUpdateRequest(expiry); + sendTtlUpdateRequest(expiryPlc); return ctx.wrapCloneMap(new GridFinishedFuture<>(ctx.kernalContext(), locVals)); } } - if (expiry != null) - expiry.reset(); + if (expiryPlc != null) + expiryPlc.reset(); // Either reload or not all values are available locally. GridPartitionedGetFuture<K, V> fut = new GridPartitionedGetFuture<>(ctx, @@ -344,7 +344,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte subjId, taskName, deserializePortable, - expiry); + expiryPlc); fut.init(); @@ -363,13 +363,20 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte boolean isRead, boolean retval, @Nullable IgniteTxIsolation isolation, + long accessTtl, IgnitePredicate<GridCacheEntry<K, V>>[] filter) { assert tx == null || tx instanceof GridNearTxLocal; GridNearTxLocal<K, V> txx = (GridNearTxLocal<K, V>)tx; - GridDhtColocatedLockFuture<K, V> fut = new GridDhtColocatedLockFuture<>(ctx, keys, txx, isRead, retval, - timeout, filter); + GridDhtColocatedLockFuture<K, V> fut = new GridDhtColocatedLockFuture<>(ctx, + keys, + txx, + isRead, + retval, + timeout, + accessTtl, + filter); // Future will be added to mvcc only if it was mapped to remote nodes. fut.map(); @@ -570,6 +577,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte } /** + * @param cacheCtx Cache context. * @param tx Started colocated transaction (if any). * @param threadId Thread ID. * @param ver Lock version. @@ -577,6 +585,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte * @param keys Mapped keys. * @param txRead Tx read. * @param timeout Lock timeout. + * @param accessTtl TTL for read operation. * @param filter filter Optional filter. * @return Lock future. */ @@ -589,6 +598,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte final Collection<K> keys, final boolean txRead, final long timeout, + final long accessTtl, @Nullable final IgnitePredicate<GridCacheEntry<K, V>>[] filter ) { assert keys != null; @@ -601,7 +611,16 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte // Check for exception. keyFut.get(); - return lockAllAsync0(cacheCtx, tx, threadId, ver, topVer, keys, txRead, timeout, filter); + return lockAllAsync0(cacheCtx, + tx, + threadId, + ver, + topVer, + keys, + txRead, + timeout, + accessTtl, + filter); } catch (IgniteCheckedException e) { return new GridFinishedFuture<>(ctx.kernalContext(), e); @@ -614,7 +633,16 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte if (exx != null) return new GridDhtFinishedFuture<>(ctx.kernalContext(), exx); - return lockAllAsync0(cacheCtx, tx, threadId, ver, topVer, keys, txRead, timeout, filter); + return lockAllAsync0(cacheCtx, + tx, + threadId, + ver, + topVer, + keys, + txRead, + timeout, + accessTtl, + filter); } }, ctx.kernalContext()); @@ -622,6 +650,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte } /** + * @param cacheCtx Cache context. * @param tx Started colocated transaction (if any). * @param threadId Thread ID. * @param ver Lock version. @@ -629,19 +658,35 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte * @param keys Mapped keys. * @param txRead Tx read. * @param timeout Lock timeout. + * @param accessTtl TTL for read operation. * @param filter filter Optional filter. * @return Lock future. */ private IgniteFuture<Exception> lockAllAsync0( GridCacheContext<K, V> cacheCtx, - @Nullable final GridNearTxLocal<K, V> tx, long threadId, - final GridCacheVersion ver, final long topVer, final Collection<K> keys, final boolean txRead, - final long timeout, @Nullable final IgnitePredicate<GridCacheEntry<K, V>>[] filter) { + @Nullable final GridNearTxLocal<K, V> tx, + long threadId, + final GridCacheVersion ver, + final long topVer, + final Collection<K> keys, + final boolean txRead, + final long timeout, + final long accessTtl, + @Nullable final IgnitePredicate<GridCacheEntry<K, V>>[] filter) { int cnt = keys.size(); if (tx == null) { - GridDhtLockFuture<K, V> fut = new GridDhtLockFuture<>(ctx, ctx.localNodeId(), ver, topVer, cnt, txRead, - timeout, tx, threadId, filter); + GridDhtLockFuture<K, V> fut = new GridDhtLockFuture<>(ctx, + ctx.localNodeId(), + ver, + topVer, + cnt, + txRead, + timeout, + tx, + threadId, + accessTtl, + filter); // Add before mapping. if (!ctx.mvcc().addFuture(fut)) @@ -704,7 +749,11 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte if (log.isDebugEnabled()) log.debug("Performing colocated lock [tx=" + tx + ", keys=" + keys + ']'); - IgniteFuture<GridCacheReturn<V>> txFut = tx.lockAllAsync(cacheCtx, keys, tx.implicit(), txRead); + IgniteFuture<GridCacheReturn<V>> txFut = tx.lockAllAsync(cacheCtx, + keys, + tx.implicit(), + txRead, + accessTtl); return new GridDhtEmbeddedFuture<>( ctx.kernalContext(), http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5b142ddd/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java index 1a8f8d1..8c8a8e5 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java @@ -101,6 +101,9 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity /** Trackable flag (here may be non-volatile). */ private boolean trackable; + /** TTL for read operation. */ + private long accessTtl; + /** * Empty constructor required by {@link Externalizable}. */ @@ -115,6 +118,7 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity * @param read Read flag. * @param retval Flag to return value or not. * @param timeout Lock acquisition timeout. + * @param accessTtl TTL for read operation. * @param filter Filter. */ public GridDhtColocatedLockFuture( @@ -124,9 +128,10 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity boolean read, boolean retval, long timeout, + long accessTtl, IgnitePredicate<GridCacheEntry<K, V>>[] filter) { super(cctx.kernalContext(), CU.boolReducer()); - assert cctx != null; + assert keys != null; this.cctx = cctx; @@ -135,6 +140,7 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity this.read = read; this.retval = retval; this.timeout = timeout; + this.accessTtl = accessTtl; this.filter = filter; threadId = tx == null ? Thread.currentThread().getId() : tx.threadId(); @@ -710,7 +716,8 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity inTx() ? tx.groupLockKey() : null, inTx() && tx.partitionLock(), inTx() ? tx.subjectId() : null, - inTx() ? tx.taskNameHash() : 0); + inTx() ? tx.taskNameHash() : 0, + read ? accessTtl : -1L); mapping.request(req); } @@ -878,8 +885,16 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity if (log.isDebugEnabled()) log.debug("Before locally locking keys : " + keys); - IgniteFuture<Exception> fut = cctx.colocated().lockAllAsync(cctx, tx, threadId, lockVer, - topVer, keys, read, timeout, filter); + IgniteFuture<Exception> fut = cctx.colocated().lockAllAsync(cctx, + tx, + threadId, + lockVer, + topVer, + keys, + read, + timeout, + accessTtl, + filter); // Add new future. add(new GridEmbeddedFuture<>( http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5b142ddd/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearAtomicCache.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearAtomicCache.java index fa19607..d7e32b3 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearAtomicCache.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearAtomicCache.java @@ -631,6 +631,7 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> { boolean isRead, boolean retval, @Nullable IgniteTxIsolation isolation, + long accessTtl, IgnitePredicate<GridCacheEntry<K, V>>[] filter) { return dht.lockAllAsync(keys, timeout, filter); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5b142ddd/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearCacheAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearCacheAdapter.java index 83ac913..f3994bf 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearCacheAdapter.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearCacheAdapter.java @@ -279,8 +279,7 @@ public abstract class GridNearCacheAdapter<K, V> extends GridDistributedCacheAda IgniteTxLocalEx<K, V> txx = (tx != null && tx.local()) ? (IgniteTxLocalEx<K, V>)tx : null; - final GetExpiryPolicy expiry = - GetExpiryPolicy.forPolicy(expiryPlc != null ? expiryPlc : ctx.expiry()); + final GetExpiryPolicy expiry = accessExpiryPolicy(expiryPlc); GridNearGetFuture<K, V> fut = new GridNearGetFuture<>(ctx, keys, http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5b142ddd/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearLockFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearLockFuture.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearLockFuture.java index 2653fd0..f1f9a7f 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearLockFuture.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearLockFuture.java @@ -111,6 +111,9 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B @GridToStringExclude private List<GridDistributedCacheEntry<K, V>> entries; + /** TTL for read operation. */ + private long accessTtl; + /** * Empty constructor required by {@link Externalizable}. */ @@ -125,6 +128,7 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B * @param read Read flag. * @param retval Flag to return value or not. * @param timeout Lock acquisition timeout. + * @param accessTtl TTL for read operation. * @param filter Filter. */ public GridNearLockFuture( @@ -134,9 +138,10 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B boolean read, boolean retval, long timeout, + long accessTtl, IgnitePredicate<GridCacheEntry<K, V>>[] filter) { super(cctx.kernalContext(), CU.boolReducer()); - assert cctx != null; + assert keys != null; this.cctx = cctx; @@ -145,6 +150,7 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B this.read = read; this.retval = retval; this.timeout = timeout; + this.accessTtl = accessTtl; this.filter = filter; threadId = tx == null ? Thread.currentThread().getId() : tx.threadId(); @@ -852,7 +858,8 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B inTx() ? tx.groupLockKey() : null, inTx() && tx.partitionLock(), inTx() ? tx.subjectId() : null, - inTx() ? tx.taskNameHash() : 0); + inTx() ? tx.taskNameHash() : 0, + read ? accessTtl : -1L); mapping.request(req); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5b142ddd/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearLockRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearLockRequest.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearLockRequest.java index 0c7bd8c..6607a66 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearLockRequest.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearLockRequest.java @@ -74,6 +74,9 @@ public class GridNearLockRequest<K, V> extends GridDistributedLockRequest<K, V> /** Sync commit flag. */ private boolean syncCommit; + /** TTL for read operation. */ + private long accessTtl; + /** * Empty constructor required for {@link Externalizable}. */ @@ -82,6 +85,7 @@ public class GridNearLockRequest<K, V> extends GridDistributedLockRequest<K, V> } /** + * @param cacheId Cache ID. * @param topVer Topology version. * @param nodeId Node ID. * @param threadId Thread ID. @@ -96,8 +100,12 @@ public class GridNearLockRequest<K, V> extends GridDistributedLockRequest<K, V> * @param timeout Lock timeout. * @param keyCnt Number of keys. * @param txSize Expected transaction size. + * @param syncCommit Synchronous commit flag. * @param grpLockKey Group lock key if this is a group-lock transaction. * @param partLock If partition is locked. + * @param subjId Subject ID. + * @param taskNameHash Task name hash code. + * @param accessTtl TTL for read operation. */ public GridNearLockRequest( int cacheId, @@ -119,7 +127,8 @@ public class GridNearLockRequest<K, V> extends GridDistributedLockRequest<K, V> @Nullable IgniteTxKey grpLockKey, boolean partLock, @Nullable UUID subjId, - int taskNameHash + int taskNameHash, + long accessTtl ) { super( cacheId, @@ -146,6 +155,7 @@ public class GridNearLockRequest<K, V> extends GridDistributedLockRequest<K, V> this.syncCommit = syncCommit; this.subjId = subjId; this.taskNameHash = taskNameHash; + this.accessTtl = accessTtl; dhtVers = new GridCacheVersion[keyCnt]; } @@ -291,6 +301,13 @@ public class GridNearLockRequest<K, V> extends GridDistributedLockRequest<K, V> return true; } + /** + * @return TTL for read operation. + */ + public long accessTtl() { + return accessTtl; + } + /** {@inheritDoc} */ @Override public void prepareMarshal(GridCacheSharedContext<K, V> ctx) throws IgniteCheckedException { super.prepareMarshal(ctx); @@ -335,6 +352,7 @@ public class GridNearLockRequest<K, V> extends GridDistributedLockRequest<K, V> _clone.taskNameHash = taskNameHash; _clone.hasTransforms = hasTransforms; _clone.syncCommit = syncCommit; + _clone.accessTtl = accessTtl; } /** {@inheritDoc} */ @@ -460,6 +478,13 @@ public class GridNearLockRequest<K, V> extends GridDistributedLockRequest<K, V> return false; commState.idx++; + + case 35: + if (!commState.putLong(accessTtl)) + return false; + + commState.idx++; + } return true; @@ -607,6 +632,14 @@ public class GridNearLockRequest<K, V> extends GridDistributedLockRequest<K, V> syncCommit = commState.getBoolean(); commState.idx++; + + case 35: + if (buf.remaining() < 8) + return false; + + accessTtl = commState.getLong(); + + commState.idx++; } return true; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5b142ddd/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTransactionalCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTransactionalCache.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTransactionalCache.java index f09d5c2..839a5e9 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTransactionalCache.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTransactionalCache.java @@ -130,12 +130,14 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V> * @param keys Keys to load. * @param filter Filter. * @param deserializePortable Deserialize portable flag. + * @param expiryPlc Expiry policy. * @return Future. */ IgniteFuture<Map<K, V>> txLoadAsync(GridNearTxLocal<K, V> tx, @Nullable Collection<? extends K> keys, @Nullable IgnitePredicate<GridCacheEntry<K, V>>[] filter, - boolean deserializePortable) { + boolean deserializePortable, + @Nullable IgniteCacheExpiryPolicy expiryPlc) { assert tx != null; GridNearGetFuture<K, V> fut = new GridNearGetFuture<>(ctx, @@ -147,7 +149,7 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V> CU.subjectId(tx, ctx.shared()), tx.resolveTaskName(), deserializePortable, - null); + expiryPlc); // init() will register future for responses if it has remote mappings. fut.init(); @@ -393,11 +395,23 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V> } /** {@inheritDoc} */ - @Override protected IgniteFuture<Boolean> lockAllAsync(Collection<? extends K> keys, long timeout, - IgniteTxLocalEx<K, V> tx, boolean isInvalidate, boolean isRead, boolean retval, - IgniteTxIsolation isolation, IgnitePredicate<GridCacheEntry<K, V>>[] filter) { - GridNearLockFuture<K, V> fut = new GridNearLockFuture<>(ctx, keys, (GridNearTxLocal<K, V>)tx, isRead, - retval, timeout, filter); + @Override protected IgniteFuture<Boolean> lockAllAsync(Collection<? extends K> keys, + long timeout, + IgniteTxLocalEx<K, V> tx, + boolean isInvalidate, + boolean isRead, + boolean retval, + IgniteTxIsolation isolation, + long accessTtl, + IgnitePredicate<GridCacheEntry<K, V>>[] filter) { + GridNearLockFuture<K, V> fut = new GridNearLockFuture<>(ctx, + keys, + (GridNearTxLocal<K, V>)tx, + isRead, + retval, + timeout, + accessTtl, + filter); if (!ctx.mvcc().addFuture(fut)) throw new IllegalStateException("Duplicate future ID: " + fut); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5b142ddd/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxLocal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxLocal.java index 675933e..0d90831 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxLocal.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxLocal.java @@ -27,6 +27,7 @@ import org.gridgain.grid.util.typedef.internal.*; import org.jdk8.backport.*; import org.jetbrains.annotations.*; +import javax.cache.expiry.*; import java.io.*; import java.util.*; import java.util.concurrent.*; @@ -71,6 +72,9 @@ public class GridNearTxLocal<K, V> extends GridDhtTxLocalAdapter<K, V> { /** True if transaction contains colocated cache entries mapped to local node. */ private boolean colocatedLocallyMapped; + /** Info for entries accessed locally in optimistic transaction. */ + private Map<IgniteTxKey, IgniteCacheExpiryPolicy> accessMap; + /** * Empty constructor required for {@link Externalizable}. */ @@ -86,9 +90,13 @@ public class GridNearTxLocal<K, V> extends GridDhtTxLocalAdapter<K, V> { * @param concurrency Concurrency. * @param isolation Isolation. * @param timeout Timeout. + * @param invalidate + * @param storeEnabled * @param txSize Transaction size. * @param grpLockKey Group lock key if this is a group lock transaction. * @param partLock {@code True} if this is a group-lock transaction and the whole partition should be locked. + * @param subjId Subject ID. + * @param taskNameHash Task name hash code. */ public GridNearTxLocal( GridCacheSharedContext<K, V> ctx, @@ -263,12 +271,17 @@ public class GridNearTxLocal<K, V> extends GridDhtTxLocalAdapter<K, V> { /** {@inheritDoc} */ @Override public IgniteFuture<Boolean> loadMissing( GridCacheContext<K, V> cacheCtx, - boolean async, final Collection<? extends K> keys, + boolean async, + final Collection<? extends K> keys, boolean deserializePortable, final IgniteBiInClosure<K, V> c ) { if (cacheCtx.isNear()) { - return cacheCtx.nearTx().txLoadAsync(this, keys, CU.<K, V>empty(), deserializePortable).chain(new C1<IgniteFuture<Map<K, V>>, Boolean>() { + return cacheCtx.nearTx().txLoadAsync(this, + keys, + CU.<K, V>empty(), + deserializePortable, + accessPolicy(cacheCtx, keys)).chain(new C1<IgniteFuture<Map<K, V>>, Boolean>() { @Override public Boolean apply(IgniteFuture<Map<K, V>> f) { try { Map<K, V> map = f.get(); @@ -289,9 +302,15 @@ public class GridNearTxLocal<K, V> extends GridDhtTxLocalAdapter<K, V> { }); } else if (cacheCtx.isColocated()) { - return cacheCtx.colocated().loadAsync(keys, /*reload*/false, /*force primary*/false, topologyVersion(), - CU.subjectId(this, cctx), resolveTaskName(), deserializePortable, null, null) - .chain(new C1<IgniteFuture<Map<K, V>>, Boolean>() { + return cacheCtx.colocated().loadAsync(keys, + /*reload*/false, + /*force primary*/false, + topologyVersion(), + CU.subjectId(this, cctx), + resolveTaskName(), + deserializePortable, + null, + accessPolicy(cacheCtx, keys)).chain(new C1<IgniteFuture<Map<K, V>>, Boolean>() { @Override public Boolean apply(IgniteFuture<Map<K, V>> f) { try { Map<K, V> map = f.get(); @@ -529,8 +548,11 @@ public class GridNearTxLocal<K, V> extends GridDhtTxLocalAdapter<K, V> { * @param committedVers Committed versions. * @param rolledbackVers Rolled back versions. */ - void readyNearLocks(GridDistributedTxMapping<K, V> mapping, Collection<GridCacheVersion> pendingVers, - Collection<GridCacheVersion> committedVers, Collection<GridCacheVersion> rolledbackVers) { + void readyNearLocks(GridDistributedTxMapping<K, V> mapping, + Collection<GridCacheVersion> pendingVers, + Collection<GridCacheVersion> committedVers, + Collection<GridCacheVersion> rolledbackVers) + { Collection<IgniteTxEntry<K, V>> entries = groupLock() ? Collections.singletonList(groupLockEntry()) : F.concat(false, mapping.reads(), mapping.writes()); @@ -1044,8 +1066,11 @@ public class GridNearTxLocal<K, V> extends GridDhtTxLocalAdapter<K, V> { } /** {@inheritDoc} */ - public IgniteFuture<GridCacheReturn<V>> lockAllAsync(GridCacheContext<K, V> cacheCtx, final Collection<? extends K> keys, - boolean implicit, boolean read) { + public IgniteFuture<GridCacheReturn<V>> lockAllAsync(GridCacheContext<K, V> cacheCtx, + final Collection<? extends K> keys, + boolean implicit, + boolean read, + long accessTtl) { assert pessimistic(); try { @@ -1066,7 +1091,14 @@ public class GridNearTxLocal<K, V> extends GridDhtTxLocalAdapter<K, V> { log.debug("Before acquiring transaction lock on keys: " + keys); IgniteFuture<Boolean> fut = cacheCtx.colocated().lockAllAsyncInternal(keys, - lockTimeout(), this, isInvalidate(), read, /*retval*/false, isolation, CU.<K, V>empty()); + lockTimeout(), + this, + isInvalidate(), + read, + /*retval*/false, + isolation, + accessTtl, + CU.<K, V>empty()); return new GridEmbeddedFuture<>( fut, @@ -1136,6 +1168,70 @@ public class GridNearTxLocal<K, V> extends GridDhtTxLocalAdapter<K, V> { } /** {@inheritDoc} */ + @Override protected IgniteCacheExpiryPolicy accessPolicy(GridCacheContext ctx, + IgniteTxKey key, + @Nullable ExpiryPolicy expiryPlc) + { + assert optimistic(); + + if (expiryPlc == null) + expiryPlc = ctx.expiry(); + + if (expiryPlc != null) { + IgniteCacheExpiryPolicy plc = ctx.cache().accessExpiryPolicy(expiryPlc); + + if (plc != null) { + if (accessMap == null) + accessMap = new HashMap<>(); + + accessMap.put(key, plc); + } + + return plc; + } + + return null; + } + + /** + * @param cacheCtx Cache context. + * @param keys Keys. + * @return Expiry policy. + */ + private IgniteCacheExpiryPolicy accessPolicy(GridCacheContext<K, V> cacheCtx, Collection<? extends K> keys) { + if (accessMap != null) { + for (Map.Entry<IgniteTxKey, IgniteCacheExpiryPolicy> e : accessMap.entrySet()) { + if (e.getKey().cacheId() == cacheCtx.cacheId() && keys.contains(e.getKey().key())) + return e.getValue(); + } + } + + return null; + } + + /** {@inheritDoc} */ + @Override public void close() throws IgniteCheckedException { + super.close(); + + if (accessMap != null) { + assert optimistic(); + + for (Map.Entry<IgniteTxKey, IgniteCacheExpiryPolicy> e : accessMap.entrySet()) { + if (e.getValue().entries() != null) { + GridCacheContext cctx0 = cctx.cacheContext(e.getKey().cacheId()); + + if (cctx0.isNear()) + cctx0.near().dht().sendTtlUpdateRequest(e.getValue()); + else + cctx0.dht().sendTtlUpdateRequest(e.getValue()); + } + } + + accessMap = null; + } + } + + /** {@inheritDoc} */ @Override public String toString() { return S.toString(GridNearTxLocal.class, this, "mappings", mappings.keySet(), "super", super.toString()); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5b142ddd/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/GridLocalCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/GridLocalCache.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/GridLocalCache.java index 9ea91b9..c461894 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/GridLocalCache.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/GridLocalCache.java @@ -86,9 +86,14 @@ public class GridLocalCache<K, V> extends GridCacheAdapter<K, V> { } /** {@inheritDoc} */ - @Override public IgniteFuture<Boolean> txLockAsync(Collection<? extends K> keys, long timeout, - IgniteTxLocalEx<K, V> tx, boolean isRead, - boolean retval, IgniteTxIsolation isolation, boolean invalidate, + @Override public IgniteFuture<Boolean> txLockAsync(Collection<? extends K> keys, + long timeout, + IgniteTxLocalEx<K, V> tx, + boolean isRead, + boolean retval, + IgniteTxIsolation isolation, + boolean invalidate, + long accessTtl, IgnitePredicate<GridCacheEntry<K, V>>[] filter) { return lockAllAsync(keys, timeout, tx, filter); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5b142ddd/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/GridLocalTx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/GridLocalTx.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/GridLocalTx.java index 0226ff2..86fed36 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/GridLocalTx.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/GridLocalTx.java @@ -173,6 +173,7 @@ class GridLocalTx<K, V> extends IgniteTxLocalAdapter<K, V> { rollbackAsync().get(); } + /** {@inheritDoc} */ @Override public IgniteFuture<IgniteTx> rollbackAsync() { try { state(ROLLING_BACK); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5b142ddd/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/atomic/GridLocalAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/atomic/GridLocalAtomicCache.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/atomic/GridLocalAtomicCache.java index 88b6cfc..eaf0173 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/atomic/GridLocalAtomicCache.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/atomic/GridLocalAtomicCache.java @@ -594,8 +594,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { if (keyCheck) validateCacheKeys(keys); - final GetExpiryPolicy expiry = - GetExpiryPolicy.forPolicy(expiryPlc != null ? expiryPlc : ctx.expiry()); + final GetExpiryPolicy expiry = accessExpiryPolicy(expiryPlc); boolean success = true; @@ -1287,6 +1286,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { boolean retval, IgniteTxIsolation isolation, boolean invalidate, + long accessTtl, IgnitePredicate<GridCacheEntry<K, V>>[] filter) { return new GridFinishedFutureEx<>(new UnsupportedOperationException("Locks are not supported for " + "GridCacheAtomicityMode.ATOMIC mode (use GridCacheAtomicityMode.TRANSACTIONAL instead)")); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5b142ddd/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxHandler.java index 8d3e6a0..1d4b5d7 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxHandler.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxHandler.java @@ -1149,8 +1149,15 @@ public class IgniteTxHandler<K, V> { "(transaction has been completed): " + req.version()); } - tx.addWrite(cacheCtx, txEntry.op(), txEntry.txKey(), txEntry.keyBytes(), txEntry.value(), - txEntry.valueBytes(), txEntry.transformClosures(), txEntry.drVersion()); + tx.addWrite(cacheCtx, + txEntry.op(), + txEntry.txKey(), + txEntry.keyBytes(), + txEntry.value(), + txEntry.valueBytes(), + txEntry.transformClosures(), + txEntry.drVersion(), + txEntry.ttl()); if (!marked) { if (tx.markFinalizing(USER_FINISH)) http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5b142ddd/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxLocalAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxLocalAdapter.java index 8e742fd..27c1f44 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxLocalAdapter.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxLocalAdapter.java @@ -410,6 +410,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V> /** * Gets cache entry for given key. * + * @param cacheCtx Cache context. * @param key Key. * @return Cache entry. */ @@ -420,6 +421,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V> /** * Gets cache entry for given key and topology version. * + * @param cacheCtx Cache context. * @param key Key. * @param topVer Topology version. * @return Cache entry. @@ -996,6 +998,12 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V> } } + protected IgniteCacheExpiryPolicy accessPolicy(GridCacheContext ctx, + IgniteTxKey key, + @Nullable ExpiryPolicy expiryPlc) { + return null; + } + /** * Checks if there is a cached or swapped value for * {@link #getAllAsync(GridCacheContext, Collection, GridCacheEntryEx, boolean, IgnitePredicate[])} method. @@ -1156,6 +1164,9 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V> V val = null; if (!pessimistic() || readCommitted() || groupLock()) { + IgniteCacheExpiryPolicy accessPlc = + optimistic() ? accessPolicy(cacheCtx, txKey, expiryPlc) : null; + // This call will check for filter. val = entry.innerGet(this, /*swap*/true, @@ -1169,7 +1180,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V> null, resolveTaskName(), filter, - null); + accessPlc); if (val != null) { V val0 = val; @@ -1269,6 +1280,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V> * Loads all missed keys for * {@link #getAllAsync(GridCacheContext, Collection, GridCacheEntryEx, boolean, IgnitePredicate[])} method. * + * @param cacheCtx Cache context. * @param map Return map. * @param missedMap Missed keys. * @param redos Keys to retry. @@ -1466,7 +1478,8 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V> @Override public IgniteFuture<Map<K, V>> getAllAsync( final GridCacheContext<K, V> cacheCtx, Collection<? extends K> keys, - @Nullable GridCacheEntryEx<K, V> cached, final boolean deserializePortable, + @Nullable GridCacheEntryEx<K, V> cached, + final boolean deserializePortable, final IgnitePredicate<GridCacheEntry<K, V>>[] filter) { if (F.isEmpty(keys)) return new GridFinishedFuture<>(cctx.kernalContext(), Collections.<K, V>emptyMap()); @@ -1486,10 +1499,12 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V> GridCacheProjectionImpl<K, V> prj = cacheCtx.projectionPerCall(); + ExpiryPolicy expiryPlc = prj != null ? prj.expiry() : null; + final Collection<K> lockKeys = enlistRead(cacheCtx, keys, cached, - prj != null ? prj.expiry() : null, + expiryPlc, retMap, missed, keysCnt, @@ -1501,8 +1516,20 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V> // Handle locks. if (pessimistic() && !readCommitted() && !groupLock()) { - IgniteFuture<Boolean> fut = cacheCtx.cache().txLockAsync(lockKeys, lockTimeout(), this, true, true, - isolation, isInvalidate(), CU.<K, V>empty()); + if (expiryPlc == null) + expiryPlc = cacheCtx.expiry(); + + long accessTtl = expiryPlc != null ? CU.toTtl(expiryPlc.getExpiryForAccess()) : -1L; + + IgniteFuture<Boolean> fut = cacheCtx.cache().txLockAsync(lockKeys, + lockTimeout(), + this, + true, + true, + isolation, + isInvalidate(), + accessTtl, + CU.<K, V>empty()); PLC2<Map<K, V>> plc2 = new PLC2<Map<K, V>>() { @Override public IgniteFuture<Map<K, V>> postLock() throws IgniteCheckedException { @@ -2053,6 +2080,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V> * @param rmv {@code True} if remove. * @param retval Flag to return value or not. * @param read {@code True} if read. + * @param accessTtl TTL for read operation. * @param filter Filter to check entries. * @return Failed keys. * @throws IgniteCheckedException If error. @@ -2067,6 +2095,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V> boolean rmv, boolean retval, boolean read, + long accessTtl, IgnitePredicate<GridCacheEntry<K, V>>[] filter ) throws IgniteCheckedException { for (K k : keys) { @@ -2158,10 +2187,14 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V> } if (updateTtl) { - ExpiryPolicy expiryPlc = txEntry.expiry() != null ? txEntry.expiry() : cacheCtx.expiry(); + if (!read) { + ExpiryPolicy expiryPlc = txEntry.expiry() != null ? txEntry.expiry() : cacheCtx.expiry(); - if (expiryPlc != null) - txEntry.ttl(CU.toTtl(expiryPlc.getExpiryForAccess())); + if (expiryPlc != null) + txEntry.ttl(CU.toTtl(expiryPlc.getExpiryForAccess())); + } + else + txEntry.ttl(accessTtl); } break; // While. @@ -2336,8 +2369,15 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V> if (log.isDebugEnabled()) log.debug("Before acquiring transaction lock for put on keys: " + keys); - IgniteFuture<Boolean> fut = cacheCtx.cache().txLockAsync(keys, lockTimeout(), this, false, - retval, isolation, isInvalidate(), CU.<K, V>empty()); + IgniteFuture<Boolean> fut = cacheCtx.cache().txLockAsync(keys, + lockTimeout(), + this, + false, + retval, + isolation, + isInvalidate(), + -1L, + CU.<K, V>empty()); PLC1<GridCacheReturn<V>> plc1 = new PLC1<GridCacheReturn<V>>(ret) { @Override public GridCacheReturn<V> postLock(GridCacheReturn<V> ret) throws IgniteCheckedException { @@ -2355,6 +2395,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V> /*remove*/false, retval, /*read*/false, + -1L, filter); return ret; @@ -2530,8 +2571,15 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V> if (log.isDebugEnabled()) log.debug("Before acquiring transaction lock for remove on keys: " + passedKeys); - IgniteFuture<Boolean> fut = cacheCtx.cache().txLockAsync(passedKeys, lockTimeout(), this, false, retval, - isolation, isInvalidate(), CU.<K, V>empty()); + IgniteFuture<Boolean> fut = cacheCtx.cache().txLockAsync(passedKeys, + lockTimeout(), + this, + false, + retval, + isolation, + isInvalidate(), + -1L, + CU.<K, V>empty()); PLC1<GridCacheReturn<V>> plc1 = new PLC1<GridCacheReturn<V>>(ret) { @Override protected GridCacheReturn<V> postLock(GridCacheReturn<V> ret) throws IgniteCheckedException { @@ -2547,6 +2595,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V> /*remove*/true, retval, /*read*/false, + -1L, filter); return ret; @@ -2681,7 +2730,14 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V> // Lock group key in pessimistic mode only. return pessimistic() ? - cacheCtx.cache().txLockAsync(enlisted, lockTimeout(), this, false, false, isolation, isInvalidate(), + cacheCtx.cache().txLockAsync(enlisted, + lockTimeout(), + this, + false, + false, + isolation, + isInvalidate(), + -1L, CU.<K, V>empty()) : new GridFinishedFuture<>(cctx.kernalContext()); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5b142ddd/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java index 6100479..a57da71 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java @@ -163,40 +163,58 @@ public abstract class IgniteCacheExpiryPolicyAbstractTest extends IgniteCacheAbs } if (atomicityMode() == TRANSACTIONAL) { - for (final Integer key : keys()) { - log.info("Test txGet [key=" + key + ']'); + IgniteTxConcurrency[] txModes = {PESSIMISTIC}; - txGet(key); + for (IgniteTxConcurrency txMode : txModes) { + for (final Integer key : keys()) { + log.info("Test txGet [key=" + key + ", txMode=" + txMode + ']'); + + txGet(key, txMode); + } } - txGetAll(); + for (IgniteTxConcurrency txMode : txModes) { + log.info("Test txGetAll [txMode=" + txMode + ']'); + + txGetAll(txMode); + } } } /** * @param key Key. + * @param txMode Transaction concurrency mode. * @throws Exception If failed. */ - private void txGet(Integer key) throws Exception { + private void txGet(Integer key, IgniteTxConcurrency txMode) throws Exception { IgniteCache<Integer, Integer> cache = jcache(); cache.put(key, 1); checkTtl(key, 60_000L); - try (IgniteTx tx = ignite(0).transactions().txStart()) { + try (IgniteTx tx = ignite(0).transactions().txStart(txMode, REPEATABLE_READ)) { assertEquals((Integer)1, cache.get(key)); tx.commit(); } checkTtl(key, 62_000L, true); + + try (IgniteTx tx = ignite(0).transactions().txStart(txMode, REPEATABLE_READ)) { + assertEquals((Integer)1, cache.withExpiryPolicy(new TestPolicy(100L, 200L, 1000L)).get(key)); + + tx.commit(); + } + + checkTtl(key, 1000L, true); } /** + * @param txMode Transaction concurrency mode. * @throws Exception If failed. */ - private void txGetAll() throws Exception { + private void txGetAll(IgniteTxConcurrency txMode) throws Exception { IgniteCache<Integer, Integer> cache = jcache(0); Map<Integer, Integer> vals = new HashMap<>(); @@ -206,7 +224,7 @@ public abstract class IgniteCacheExpiryPolicyAbstractTest extends IgniteCacheAbs cache.putAll(vals); - try (IgniteTx tx = ignite(0).transactions().txStart()) { + try (IgniteTx tx = ignite(0).transactions().txStart(txMode, REPEATABLE_READ)) { assertEquals(vals, cache.getAll(vals.keySet())); tx.commit(); @@ -214,6 +232,15 @@ public abstract class IgniteCacheExpiryPolicyAbstractTest extends IgniteCacheAbs for (Integer key : vals.keySet()) checkTtl(key, 62_000L); + + try (IgniteTx tx = ignite(0).transactions().txStart(txMode, REPEATABLE_READ)) { + assertEquals(vals, cache.withExpiryPolicy(new TestPolicy(100L, 200L, 1000L)).getAll(vals.keySet())); + + tx.commit(); + } + + for (Integer key : vals.keySet()) + checkTtl(key, 1000L); } /**
