IGNITE-3699 CreatedExpiryPolicy doesn't work if entry is loaded from store
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/28d66dbc Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/28d66dbc Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/28d66dbc Branch: refs/heads/master Commit: 28d66dbc100b7ba299a48cce0f001a4070566978 Parents: c42b50c Author: Anton Vinogradov <[email protected]> Authored: Tue Jan 24 12:28:42 2017 +0300 Committer: Anton Vinogradov <[email protected]> Committed: Tue Jan 24 12:28:42 2017 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheAdapter.java | 30 +++--- .../GridDistributedCacheAdapter.java | 6 +- .../distributed/dht/GridDhtCacheAdapter.java | 6 +- .../distributed/dht/GridDhtLockFuture.java | 28 +++--- .../dht/GridDhtTransactionalCacheAdapter.java | 7 ++ .../distributed/dht/GridDhtTxLocalAdapter.java | 8 +- .../dht/GridPartitionedGetFuture.java | 1 + .../dht/GridPartitionedSingleGetFuture.java | 2 + .../dht/atomic/GridDhtAtomicCache.java | 1 + .../dht/colocated/GridDhtColocatedCache.java | 10 ++ .../colocated/GridDhtColocatedLockFuture.java | 10 +- .../distributed/near/GridNearAtomicCache.java | 1 + .../distributed/near/GridNearGetFuture.java | 1 + .../distributed/near/GridNearGetRequest.java | 77 +++++++++----- .../distributed/near/GridNearLockFuture.java | 7 ++ .../distributed/near/GridNearLockRequest.java | 81 ++++++++++----- .../near/GridNearSingleGetRequest.java | 57 ++++++++--- .../near/GridNearTransactionalCache.java | 2 + .../cache/distributed/near/GridNearTxLocal.java | 17 +++- .../processors/cache/local/GridLocalCache.java | 1 + .../local/atomic/GridLocalAtomicCache.java | 1 + .../transactions/IgniteTxLocalAdapter.java | 36 +++++-- .../cache/transactions/IgniteTxLocalEx.java | 3 + ...eCacheExpiryPolicyWithStoreAbstractTest.java | 100 +++++++++++++++---- .../IgniteCacheTxExpiryPolicyWithStoreTest.java | 21 ++++ 25 files changed, 381 insertions(+), 133 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/28d66dbc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index e414160..ecf9ea9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -528,6 +528,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V * @param retval Flag to return value. * @param isolation Transaction isolation. * @param invalidate Invalidate flag. + * @param createTtl TTL for create operation. * @param accessTtl TTL for read operation. * @return Locks future. */ @@ -539,6 +540,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V boolean retval, TransactionIsolation isolation, boolean invalidate, + long createTtl, long accessTtl); /** @@ -5754,28 +5756,28 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V } /** - * @param ttl Access TTL. + * @param createTtl Create TTL. + * @param accessTtl Access TTL. * @return Access expire policy. */ - @Nullable public static CacheExpiryPolicy forAccess(final long ttl) { - if (ttl == CU.TTL_NOT_CHANGED) + @Nullable public static CacheExpiryPolicy fromRemote(final long createTtl, final long accessTtl) { + if (createTtl == CU.TTL_NOT_CHANGED && accessTtl == CU.TTL_NOT_CHANGED) return null; return new CacheExpiryPolicy() { - @Override public long forAccess() { - return ttl; + @Override public long forCreate() { + return createTtl; } - }; - } - /** {@inheritDoc} */ - @Override public long forCreate() { - return CU.TTL_NOT_CHANGED; - } + @Override public long forAccess() { + return accessTtl; + } - /** {@inheritDoc} */ - @Override public long forUpdate() { - return CU.TTL_NOT_CHANGED; + /** {@inheritDoc} */ + @Override public long forUpdate() { + return CU.TTL_NOT_CHANGED; + } + }; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/28d66dbc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java index 03f6474..d89a468 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java @@ -102,11 +102,12 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter boolean retval, TransactionIsolation isolation, boolean isInvalidate, + long createTtl, long accessTtl ) { assert tx != null; - return lockAllAsync(keys, timeout, tx, isInvalidate, isRead, retval, isolation, accessTtl); + return lockAllAsync(keys, timeout, tx, isInvalidate, isRead, retval, isolation, createTtl, accessTtl); } /** {@inheritDoc} */ @@ -121,6 +122,7 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter false, /*retval*/true, null, + -1L, -1L); } @@ -132,6 +134,7 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter * @param isRead Indicates whether value is read or written. * @param retval Flag to return value. * @param isolation Transaction isolation. + * @param createTtl TTL for create operation. * @param accessTtl TTL for read operation. * @return Future for locks. */ @@ -142,6 +145,7 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter boolean isRead, boolean retval, @Nullable TransactionIsolation isolation, + long createTtl, long accessTtl); /** http://git-wip-us.apache.org/repos/asf/ignite/blob/28d66dbc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java index 63213e1..c9f7c5c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java @@ -758,7 +758,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap protected void processNearSingleGetRequest(final UUID nodeId, final GridNearSingleGetRequest req) { assert ctx.affinityNode(); - final CacheExpiryPolicy expiryPlc = CacheExpiryPolicy.forAccess(req.accessTtl()); + final CacheExpiryPolicy expiryPlc = CacheExpiryPolicy.fromRemote(req.createTtl(), req.accessTtl()); IgniteInternalFuture<GridCacheEntryInfo> fut = getDhtSingleAsync( @@ -858,9 +858,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap assert ctx.affinityNode(); assert !req.reload() : req; - long ttl = req.accessTtl(); - - final CacheExpiryPolicy expiryPlc = CacheExpiryPolicy.forAccess(ttl); + final CacheExpiryPolicy expiryPlc = CacheExpiryPolicy.fromRemote(req.createTtl(), req.accessTtl()); IgniteInternalFuture<Collection<GridCacheEntryInfo>> fut = getDhtAsync(nodeId, http://git-wip-us.apache.org/repos/asf/ignite/blob/28d66dbc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java index 3f35305..125b4f8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java @@ -28,7 +28,6 @@ import java.util.ListIterator; import java.util.Map; import java.util.UUID; import java.util.concurrent.atomic.AtomicReference; -import javax.cache.expiry.ExpiryPolicy; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.cluster.ClusterNode; @@ -157,6 +156,9 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean> /** Pending locks. */ private final Collection<KeyCacheObject> pendingLocks; + /** TTL for create operation. */ + private long createTtl; + /** TTL for read operation. */ private long accessTtl; @@ -195,6 +197,7 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean> long timeout, GridDhtTxLocalAdapter tx, long threadId, + long createTtl, long accessTtl, CacheEntryPredicate[] filter, boolean skipStore, @@ -215,6 +218,7 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean> this.timeout = timeout; this.filter = filter; this.tx = tx; + this.createTtl = createTtl; this.accessTtl = accessTtl; this.skipStore = skipStore; this.keepBinary = keepBinary; @@ -1062,22 +1066,16 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean> try { CacheObject val0 = cctx.toCacheObject(val); - long ttl = CU.TTL_ETERNAL; - long expireTime = CU.EXPIRE_TIME_ETERNAL; - - ExpiryPolicy expiry = cctx.expiry(); - - if (expiry != null) { - ttl = CU.toTtl(expiry.getExpiryForCreation()); + long ttl = createTtl; + long expireTime; - if (ttl == CU.TTL_ZERO) - expireTime = CU.expireTimeInPast(); - else { - if (ttl == CU.TTL_NOT_CHANGED) - ttl = CU.TTL_ETERNAL; + if (ttl == CU.TTL_ZERO) + expireTime = CU.expireTimeInPast(); + else { + if (ttl == CU.TTL_NOT_CHANGED) + ttl = CU.TTL_ETERNAL; - expireTime = CU.toExpireTime(ttl); - } + expireTime = CU.toExpireTime(ttl); } entry0.initialValue(val0, http://git-wip-us.apache.org/repos/asf/ignite/blob/28d66dbc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java index 01bc4e0..a9e3bc4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java @@ -677,6 +677,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach boolean isRead, boolean retval, TransactionIsolation isolation, + long createTtl, long accessTtl) { CacheOperationContext opCtx = ctx.operationContextPerCall(); @@ -688,6 +689,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach isRead, retval, isolation, + createTtl, accessTtl, CU.empty0(), opCtx != null && opCtx.skipStore(), @@ -704,6 +706,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach * @param isRead Read flag. * @param retval Return value flag. * @param isolation Transaction isolation. + * @param createTtl TTL for create operation. * @param accessTtl TTL for read operation. * @param filter Optional filter. * @param skipStore Skip store flag. @@ -716,6 +719,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach boolean isRead, boolean retval, TransactionIsolation isolation, + long createTtl, long accessTtl, CacheEntryPredicate[] filter, boolean skipStore, @@ -738,6 +742,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach timeout, tx, tx.threadId(), + createTtl, accessTtl, filter, skipStore, @@ -859,6 +864,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach req.timeout(), tx, req.threadId(), + req.createTtl(), req.accessTtl(), filter, req.skipStore(), @@ -1007,6 +1013,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach req.messageId(), req.txRead(), req.needReturnValue(), + req.createTtl(), req.accessTtl(), req.skipStore(), req.keepBinary()); http://git-wip-us.apache.org/repos/asf/ignite/blob/28d66dbc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java index 1d88d84..1823cce 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java @@ -148,7 +148,7 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter { storeEnabled, onePhaseCommit, txSize, - subjId, + subjId, taskNameHash ); @@ -533,6 +533,7 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter { * @param entries Entries to lock. * @param msgId Message ID. * @param read Read flag. + * @param createTtl TTL for create operation. * @param accessTtl TTL for read operation. * @param needRetVal Return value flag. * @param skipStore Skip store flag. @@ -545,6 +546,7 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter { long msgId, final boolean read, final boolean needRetVal, + long createTtl, long accessTtl, boolean skipStore, boolean keepBinary @@ -651,6 +653,7 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter { passedKeys, read, needRetVal, + createTtl, accessTtl, null, skipStore, @@ -669,6 +672,7 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter { * @param passedKeys Passed keys. * @param read {@code True} if read. * @param needRetVal Return value flag. + * @param createTtl TTL for create operation. * @param accessTtl TTL for read operation. * @param filter Entry write filter. * @param skipStore Skip store flag. @@ -680,6 +684,7 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter { final Collection<KeyCacheObject> passedKeys, final boolean read, final boolean needRetVal, + final long createTtl, final long accessTtl, @Nullable final CacheEntryPredicate[] filter, boolean skipStore, @@ -705,6 +710,7 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter { read, needRetVal, isolation, + createTtl, accessTtl, CU.empty0(), skipStore, http://git-wip-us.apache.org/repos/asf/ignite/blob/28d66dbc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java index 2e22d9e..7efe841 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java @@ -340,6 +340,7 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda topVer, subjId, taskName == null ? 0 : taskName.hashCode(), + expiryPlc != null ? expiryPlc.forCreate() : -1L, expiryPlc != null ? expiryPlc.forAccess() : -1L, skipVals, cctx.deploymentEnabled()); http://git-wip-us.apache.org/repos/asf/ignite/blob/28d66dbc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java index aeb7eba..a0b7940 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java @@ -280,6 +280,7 @@ public class GridPartitionedSingleGetFuture extends GridFutureAdapter<Object> im topVer, subjId, taskName == null ? 0 : taskName.hashCode(), + expiryPlc != null ? expiryPlc.forCreate() : -1L, expiryPlc != null ? expiryPlc.forAccess() : -1L, skipVals, /**add reader*/false, @@ -299,6 +300,7 @@ public class GridPartitionedSingleGetFuture extends GridFutureAdapter<Object> im topVer, subjId, taskName == null ? 0 : taskName.hashCode(), + expiryPlc != null ? expiryPlc.forCreate() : -1L, expiryPlc != null ? expiryPlc.forAccess() : -1L, skipVals, cctx.deploymentEnabled()); http://git-wip-us.apache.org/repos/asf/ignite/blob/28d66dbc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index 07b9dad..acfe141 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -885,6 +885,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { boolean isRead, boolean retval, @Nullable TransactionIsolation isolation, + long createTtl, long accessTtl) { return new FinishedLockFuture(new UnsupportedOperationException("Locks are not supported for " + "CacheAtomicityMode.ATOMIC mode (use CacheAtomicityMode.TRANSACTIONAL instead)")); http://git-wip-us.apache.org/repos/asf/ignite/blob/28d66dbc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java index 9cf400d..2d18a47 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java @@ -600,6 +600,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte boolean isRead, boolean retval, @Nullable TransactionIsolation isolation, + long createTtl, long accessTtl ) { assert tx == null || tx instanceof GridNearTxLocal : tx; @@ -614,6 +615,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte isRead, retval, timeout, + createTtl, accessTtl, CU.empty0(), opCtx != null && opCtx.skipStore(), @@ -861,6 +863,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte * @param txRead Tx read. * @param retval Return value flag. * @param timeout Lock timeout. + * @param createTtl TTL for create operation. * @param accessTtl TTL for read operation. * @param filter filter Optional filter. * @param skipStore Skip store flag. @@ -876,6 +879,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte final boolean txRead, final boolean retval, final long timeout, + final long createTtl, final long accessTtl, @Nullable final CacheEntryPredicate[] filter, final boolean skipStore, @@ -900,6 +904,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte txRead, retval, timeout, + createTtl, accessTtl, filter, skipStore, @@ -921,6 +926,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte txRead, retval, timeout, + createTtl, accessTtl, filter, skipStore, @@ -941,6 +947,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte * @param txRead Tx read. * @param retval Return value flag. * @param timeout Lock timeout. + * @param createTtl TTL for create operation. * @param accessTtl TTL for read operation. * @param filter filter Optional filter. * @param skipStore Skip store flag. @@ -956,6 +963,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte final boolean txRead, boolean retval, final long timeout, + final long createTtl, final long accessTtl, @Nullable final CacheEntryPredicate[] filter, boolean skipStore, @@ -973,6 +981,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte timeout, tx, threadId, + createTtl, accessTtl, filter, skipStore, @@ -1041,6 +1050,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte keys, retval, txRead, + createTtl, accessTtl, skipStore, keepBinary); http://git-wip-us.apache.org/repos/asf/ignite/blob/28d66dbc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java index 742f004..69b66f9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java @@ -145,6 +145,9 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture /** Trackable flag (here may be non-volatile). */ private boolean trackable; + /** TTL for create operation. */ + private final long createTtl; + /** TTL for read operation. */ private final long accessTtl; @@ -164,6 +167,7 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture * @param read Read flag. * @param retval Flag to return value or not. * @param timeout Lock acquisition timeout. + * @param createTtl TTL for create operation. * @param accessTtl TTL for read operation. * @param filter Filter. * @param skipStore Skip store flag. @@ -175,6 +179,7 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture boolean read, boolean retval, long timeout, + long createTtl, long accessTtl, CacheEntryPredicate[] filter, boolean skipStore, @@ -189,6 +194,7 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture this.read = read; this.retval = retval; this.timeout = timeout; + this.createTtl = createTtl; this.accessTtl = accessTtl; this.filter = filter; this.skipStore = skipStore; @@ -928,6 +934,7 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture inTx() && tx.syncMode() == FULL_SYNC, inTx() ? tx.subjectId() : null, inTx() ? tx.taskNameHash() : 0, + read ? createTtl : -1L, read ? accessTtl : -1L, skipStore, keepBinary, @@ -1104,7 +1111,7 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture /** * Locks given keys directly through dht cache. - * @param keys Collection of keys. + * @param keys Collection of keys. * @param topVer Topology version to lock on. */ private void lockLocally( @@ -1123,6 +1130,7 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture read, retval, timeout, + createTtl, accessTtl, filter, skipStore, http://git-wip-us.apache.org/repos/asf/ignite/blob/28d66dbc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java index 4350b3e..b843e4e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java @@ -586,6 +586,7 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> { boolean isRead, boolean retval, @Nullable TransactionIsolation isolation, + long createTtl, long accessTtl) { return dht.lockAllAsync(null, timeout); } http://git-wip-us.apache.org/repos/asf/ignite/blob/28d66dbc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java index b7fcbbd..6ac55f8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java @@ -374,6 +374,7 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap topVer, subjId, taskName == null ? 0 : taskName.hashCode(), + expiryPlc != null ? expiryPlc.forCreate() : -1L, expiryPlc != null ? expiryPlc.forAccess() : -1L, skipVals, cctx.deploymentEnabled()); http://git-wip-us.apache.org/repos/asf/ignite/blob/28d66dbc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java index 4272a4d..7ca2635 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java @@ -100,6 +100,9 @@ public class GridNearGetRequest extends GridCacheMessage implements GridCacheDep private int taskNameHash; /** TTL for read operation. */ + private long createTtl; + + /** TTL for read operation. */ private long accessTtl; /** @@ -121,6 +124,7 @@ public class GridNearGetRequest extends GridCacheMessage implements GridCacheDep * @param topVer Topology version. * @param subjId Subject ID. * @param taskNameHash Task name hash. + * @param createTtl New TTL to set after entry is created, -1 to leave unchanged. * @param accessTtl New TTL to set after entry is accessed, -1 to leave unchanged. * @param addDepInfo Deployment info. */ @@ -134,6 +138,7 @@ public class GridNearGetRequest extends GridCacheMessage implements GridCacheDep @NotNull AffinityTopologyVersion topVer, UUID subjId, int taskNameHash, + long createTtl, long accessTtl, boolean skipVals, boolean addDepInfo @@ -161,6 +166,7 @@ public class GridNearGetRequest extends GridCacheMessage implements GridCacheDep this.topVer = topVer; this.subjId = subjId; this.taskNameHash = taskNameHash; + this.createTtl = createTtl; this.accessTtl = accessTtl; this.skipVals = skipVals; this.addDepInfo = addDepInfo; @@ -238,6 +244,13 @@ public class GridNearGetRequest extends GridCacheMessage implements GridCacheDep } /** + * @return New TTL to set after entry is created, -1 to leave unchanged. + */ + public long createTtl() { + return createTtl; + } + + /** * @return New TTL to set after entry is accessed, -1 to leave unchanged. */ public long accessTtl() { @@ -325,73 +338,79 @@ public class GridNearGetRequest extends GridCacheMessage implements GridCacheDep writer.incrementState(); case 4: - if (!writer.writeCollection("flags", flags, MessageCollectionItemType.BOOLEAN)) + if (!writer.writeLong("createTtl", createTtl)) return false; writer.incrementState(); case 5: - if (!writer.writeIgniteUuid("futId", futId)) + if (!writer.writeCollection("flags", flags, MessageCollectionItemType.BOOLEAN)) return false; writer.incrementState(); case 6: - if (!writer.writeCollection("keys", keys, MessageCollectionItemType.MSG)) + if (!writer.writeIgniteUuid("futId", futId)) return false; writer.incrementState(); case 7: - if (!writer.writeIgniteUuid("miniId", miniId)) + if (!writer.writeCollection("keys", keys, MessageCollectionItemType.MSG)) return false; writer.incrementState(); case 8: - if (!writer.writeBoolean("readThrough", readThrough)) + if (!writer.writeIgniteUuid("miniId", miniId)) return false; writer.incrementState(); case 9: - if (!writer.writeBoolean("reload", reload)) + if (!writer.writeCollection("partIds", partIds, MessageCollectionItemType.INT)) return false; writer.incrementState(); case 10: - if (!writer.writeBoolean("skipVals", skipVals)) + if (!writer.writeBoolean("readThrough", readThrough)) return false; writer.incrementState(); case 11: - if (!writer.writeUuid("subjId", subjId)) + if (!writer.writeBoolean("reload", reload)) return false; writer.incrementState(); case 12: - if (!writer.writeInt("taskNameHash", taskNameHash)) + if (!writer.writeBoolean("skipVals", skipVals)) return false; writer.incrementState(); case 13: - if (!writer.writeMessage("topVer", topVer)) + if (!writer.writeUuid("subjId", subjId)) return false; writer.incrementState(); case 14: - if (!writer.writeMessage("ver", ver)) + if (!writer.writeInt("taskNameHash", taskNameHash)) return false; writer.incrementState(); case 15: - if (!writer.writeCollection("partIds", partIds, MessageCollectionItemType.INT)) + if (!writer.writeMessage("topVer", topVer)) + return false; + + writer.incrementState(); + + case 16: + if (!writer.writeMessage("ver", ver)) return false; writer.incrementState(); @@ -421,7 +440,7 @@ public class GridNearGetRequest extends GridCacheMessage implements GridCacheDep reader.incrementState(); case 4: - flags = reader.readCollection("flags", MessageCollectionItemType.BOOLEAN); + createTtl = reader.readLong("createTtl"); if (!reader.isLastRead()) return false; @@ -429,7 +448,7 @@ public class GridNearGetRequest extends GridCacheMessage implements GridCacheDep reader.incrementState(); case 5: - futId = reader.readIgniteUuid("futId"); + flags = reader.readCollection("flags", MessageCollectionItemType.BOOLEAN); if (!reader.isLastRead()) return false; @@ -437,7 +456,7 @@ public class GridNearGetRequest extends GridCacheMessage implements GridCacheDep reader.incrementState(); case 6: - keys = reader.readCollection("keys", MessageCollectionItemType.MSG); + futId = reader.readIgniteUuid("futId"); if (!reader.isLastRead()) return false; @@ -445,7 +464,7 @@ public class GridNearGetRequest extends GridCacheMessage implements GridCacheDep reader.incrementState(); case 7: - miniId = reader.readIgniteUuid("miniId"); + keys = reader.readCollection("keys", MessageCollectionItemType.MSG); if (!reader.isLastRead()) return false; @@ -453,7 +472,7 @@ public class GridNearGetRequest extends GridCacheMessage implements GridCacheDep reader.incrementState(); case 8: - readThrough = reader.readBoolean("readThrough"); + miniId = reader.readIgniteUuid("miniId"); if (!reader.isLastRead()) return false; @@ -461,7 +480,7 @@ public class GridNearGetRequest extends GridCacheMessage implements GridCacheDep reader.incrementState(); case 9: - reload = reader.readBoolean("reload"); + partIds = reader.readCollection("partIds", MessageCollectionItemType.INT); if (!reader.isLastRead()) return false; @@ -469,7 +488,7 @@ public class GridNearGetRequest extends GridCacheMessage implements GridCacheDep reader.incrementState(); case 10: - skipVals = reader.readBoolean("skipVals"); + readThrough = reader.readBoolean("readThrough"); if (!reader.isLastRead()) return false; @@ -477,7 +496,7 @@ public class GridNearGetRequest extends GridCacheMessage implements GridCacheDep reader.incrementState(); case 11: - subjId = reader.readUuid("subjId"); + reload = reader.readBoolean("reload"); if (!reader.isLastRead()) return false; @@ -485,7 +504,7 @@ public class GridNearGetRequest extends GridCacheMessage implements GridCacheDep reader.incrementState(); case 12: - taskNameHash = reader.readInt("taskNameHash"); + skipVals = reader.readBoolean("skipVals"); if (!reader.isLastRead()) return false; @@ -493,7 +512,7 @@ public class GridNearGetRequest extends GridCacheMessage implements GridCacheDep reader.incrementState(); case 13: - topVer = reader.readMessage("topVer"); + subjId = reader.readUuid("subjId"); if (!reader.isLastRead()) return false; @@ -501,7 +520,7 @@ public class GridNearGetRequest extends GridCacheMessage implements GridCacheDep reader.incrementState(); case 14: - ver = reader.readMessage("ver"); + taskNameHash = reader.readInt("taskNameHash"); if (!reader.isLastRead()) return false; @@ -509,7 +528,15 @@ public class GridNearGetRequest extends GridCacheMessage implements GridCacheDep reader.incrementState(); case 15: - partIds = reader.readCollection("partIds", MessageCollectionItemType.INT); + topVer = reader.readMessage("topVer"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 16: + ver = reader.readMessage("ver"); if (!reader.isLastRead()) return false; @@ -528,7 +555,7 @@ public class GridNearGetRequest extends GridCacheMessage implements GridCacheDep /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 16; + return 17; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/28d66dbc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java index 3d350f6..d7a0fb5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java @@ -148,6 +148,9 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean @GridToStringExclude private List<GridDistributedCacheEntry> entries; + /** TTL for create operation. */ + private long createTtl; + /** TTL for read operation. */ private long accessTtl; @@ -168,6 +171,7 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean * @param read Read flag. * @param retval Flag to return value or not. * @param timeout Lock acquisition timeout. + * @param createTtl TTL for create operation. * @param accessTtl TTL for read operation. * @param filter Filter. * @param skipStore skipStore @@ -180,6 +184,7 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean boolean read, boolean retval, long timeout, + long createTtl, long accessTtl, CacheEntryPredicate[] filter, boolean skipStore, @@ -195,6 +200,7 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean this.read = read; this.retval = retval; this.timeout = timeout; + this.createTtl = createTtl; this.accessTtl = accessTtl; this.filter = filter; this.skipStore = skipStore; @@ -1056,6 +1062,7 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean inTx() && tx.syncMode() == FULL_SYNC, inTx() ? tx.subjectId() : null, inTx() ? tx.taskNameHash() : 0, + read ? createTtl : -1L, read ? accessTtl : -1L, skipStore, keepBinary, http://git-wip-us.apache.org/repos/asf/ignite/blob/28d66dbc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java index 2e8cd6e..9e12153 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java @@ -80,6 +80,9 @@ public class GridNearLockRequest extends GridDistributedLockRequest { /** Sync commit flag. */ private boolean syncCommit; + /** TTL for create operation. */ + private long createTtl; + /** TTL for read operation. */ private long accessTtl; @@ -116,6 +119,7 @@ public class GridNearLockRequest extends GridDistributedLockRequest { * @param syncCommit Synchronous commit flag. * @param subjId Subject ID. * @param taskNameHash Task name hash code. + * @param createTtl TTL for create operation. * @param accessTtl TTL for read operation. * @param skipStore Skip store flag. * @param firstClientReq {@code True} if first lock request for lock operation sent from client node. @@ -141,6 +145,7 @@ public class GridNearLockRequest extends GridDistributedLockRequest { boolean syncCommit, @Nullable UUID subjId, int taskNameHash, + long createTtl, long accessTtl, boolean skipStore, boolean keepBinary, @@ -174,6 +179,7 @@ public class GridNearLockRequest extends GridDistributedLockRequest { this.syncCommit = syncCommit; this.subjId = subjId; this.taskNameHash = taskNameHash; + this.createTtl = createTtl; this.accessTtl = accessTtl; this.retVal = retVal; this.firstClientReq = firstClientReq; @@ -312,6 +318,13 @@ public class GridNearLockRequest extends GridDistributedLockRequest { } /** + * @return New TTL to set after entry is created, -1 to leave unchanged. + */ + public long createTtl() { + return createTtl; + } + + /** * @return TTL for read operation. */ public long accessTtl() { @@ -368,84 +381,90 @@ public class GridNearLockRequest extends GridDistributedLockRequest { writer.incrementState(); case 21: - if (!writer.writeObjectArray("dhtVers", dhtVers, MessageCollectionItemType.MSG)) + if (!writer.writeLong("createTtl", createTtl)) return false; writer.incrementState(); case 22: - if (!writer.writeObjectArray("filter", filter, MessageCollectionItemType.MSG)) + if (!writer.writeObjectArray("dhtVers", dhtVers, MessageCollectionItemType.MSG)) return false; writer.incrementState(); case 23: - if (!writer.writeBoolean("firstClientReq", firstClientReq)) + if (!writer.writeObjectArray("filter", filter, MessageCollectionItemType.MSG)) return false; writer.incrementState(); case 24: - if (!writer.writeBoolean("hasTransforms", hasTransforms)) + if (!writer.writeBoolean("firstClientReq", firstClientReq)) return false; writer.incrementState(); case 25: - if (!writer.writeBoolean("implicitSingleTx", implicitSingleTx)) + if (!writer.writeBoolean("hasTransforms", hasTransforms)) return false; writer.incrementState(); case 26: - if (!writer.writeBoolean("implicitTx", implicitTx)) + if (!writer.writeBoolean("implicitSingleTx", implicitSingleTx)) return false; writer.incrementState(); case 27: - if (!writer.writeIgniteUuid("miniId", miniId)) + if (!writer.writeBoolean("implicitTx", implicitTx)) return false; writer.incrementState(); case 28: - if (!writer.writeBoolean("onePhaseCommit", onePhaseCommit)) + if (!writer.writeIgniteUuid("miniId", miniId)) return false; writer.incrementState(); case 29: - if (!writer.writeBoolean("retVal", retVal)) + if (!writer.writeBoolean("onePhaseCommit", onePhaseCommit)) return false; writer.incrementState(); case 30: - if (!writer.writeUuid("subjId", subjId)) + if (!writer.writeBoolean("retVal", retVal)) return false; writer.incrementState(); case 31: - if (!writer.writeBoolean("syncCommit", syncCommit)) + if (!writer.writeUuid("subjId", subjId)) return false; writer.incrementState(); case 32: - if (!writer.writeInt("taskNameHash", taskNameHash)) + if (!writer.writeBoolean("syncCommit", syncCommit)) return false; writer.incrementState(); case 33: - if (!writer.writeMessage("topVer", topVer)) + if (!writer.writeInt("taskNameHash", taskNameHash)) return false; writer.incrementState(); case 34: + if (!writer.writeMessage("topVer", topVer)) + return false; + + writer.incrementState(); + + case 35: if (!writer.writeCollection("partIds", partIds, MessageCollectionItemType.INT)) return false; @@ -476,7 +495,7 @@ public class GridNearLockRequest extends GridDistributedLockRequest { reader.incrementState(); case 21: - dhtVers = reader.readObjectArray("dhtVers", MessageCollectionItemType.MSG, GridCacheVersion.class); + createTtl = reader.readLong("createTtl"); if (!reader.isLastRead()) return false; @@ -484,7 +503,7 @@ public class GridNearLockRequest extends GridDistributedLockRequest { reader.incrementState(); case 22: - filter = reader.readObjectArray("filter", MessageCollectionItemType.MSG, CacheEntryPredicate.class); + dhtVers = reader.readObjectArray("dhtVers", MessageCollectionItemType.MSG, GridCacheVersion.class); if (!reader.isLastRead()) return false; @@ -492,7 +511,7 @@ public class GridNearLockRequest extends GridDistributedLockRequest { reader.incrementState(); case 23: - firstClientReq = reader.readBoolean("firstClientReq"); + filter = reader.readObjectArray("filter", MessageCollectionItemType.MSG, CacheEntryPredicate.class); if (!reader.isLastRead()) return false; @@ -500,7 +519,7 @@ public class GridNearLockRequest extends GridDistributedLockRequest { reader.incrementState(); case 24: - hasTransforms = reader.readBoolean("hasTransforms"); + firstClientReq = reader.readBoolean("firstClientReq"); if (!reader.isLastRead()) return false; @@ -508,7 +527,7 @@ public class GridNearLockRequest extends GridDistributedLockRequest { reader.incrementState(); case 25: - implicitSingleTx = reader.readBoolean("implicitSingleTx"); + hasTransforms = reader.readBoolean("hasTransforms"); if (!reader.isLastRead()) return false; @@ -516,7 +535,7 @@ public class GridNearLockRequest extends GridDistributedLockRequest { reader.incrementState(); case 26: - implicitTx = reader.readBoolean("implicitTx"); + implicitSingleTx = reader.readBoolean("implicitSingleTx"); if (!reader.isLastRead()) return false; @@ -524,7 +543,7 @@ public class GridNearLockRequest extends GridDistributedLockRequest { reader.incrementState(); case 27: - miniId = reader.readIgniteUuid("miniId"); + implicitTx = reader.readBoolean("implicitTx"); if (!reader.isLastRead()) return false; @@ -532,7 +551,7 @@ public class GridNearLockRequest extends GridDistributedLockRequest { reader.incrementState(); case 28: - onePhaseCommit = reader.readBoolean("onePhaseCommit"); + miniId = reader.readIgniteUuid("miniId"); if (!reader.isLastRead()) return false; @@ -540,7 +559,7 @@ public class GridNearLockRequest extends GridDistributedLockRequest { reader.incrementState(); case 29: - retVal = reader.readBoolean("retVal"); + onePhaseCommit = reader.readBoolean("onePhaseCommit"); if (!reader.isLastRead()) return false; @@ -548,7 +567,7 @@ public class GridNearLockRequest extends GridDistributedLockRequest { reader.incrementState(); case 30: - subjId = reader.readUuid("subjId"); + retVal = reader.readBoolean("retVal"); if (!reader.isLastRead()) return false; @@ -556,7 +575,7 @@ public class GridNearLockRequest extends GridDistributedLockRequest { reader.incrementState(); case 31: - syncCommit = reader.readBoolean("syncCommit"); + subjId = reader.readUuid("subjId"); if (!reader.isLastRead()) return false; @@ -564,7 +583,7 @@ public class GridNearLockRequest extends GridDistributedLockRequest { reader.incrementState(); case 32: - taskNameHash = reader.readInt("taskNameHash"); + syncCommit = reader.readBoolean("syncCommit"); if (!reader.isLastRead()) return false; @@ -572,7 +591,7 @@ public class GridNearLockRequest extends GridDistributedLockRequest { reader.incrementState(); case 33: - topVer = reader.readMessage("topVer"); + taskNameHash = reader.readInt("taskNameHash"); if (!reader.isLastRead()) return false; @@ -580,6 +599,14 @@ public class GridNearLockRequest extends GridDistributedLockRequest { reader.incrementState(); case 34: + topVer = reader.readMessage("topVer"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 35: partIds = reader.readCollection("partIds", MessageCollectionItemType.INT); if (!reader.isLastRead()) @@ -599,7 +626,7 @@ public class GridNearLockRequest extends GridDistributedLockRequest { /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 35; + return 36; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/28d66dbc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetRequest.java index 7fc2b1e..8fe33d8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetRequest.java @@ -80,6 +80,9 @@ public class GridNearSingleGetRequest extends GridCacheMessage implements GridCa private int taskNameHash; /** TTL for read operation. */ + private long createTtl; + + /** TTL for read operation. */ private long accessTtl; /** @@ -99,6 +102,7 @@ public class GridNearSingleGetRequest extends GridCacheMessage implements GridCa * @param topVer Topology version. * @param subjId Subject ID. * @param taskNameHash Task name hash. + * @param createTtl New TTL to set after entry is created, -1 to leave unchanged. * @param accessTtl New TTL to set after entry is accessed, -1 to leave unchanged. * @param addReader Add reader flag. * @param needVer {@code True} if entry version is needed. @@ -112,6 +116,7 @@ public class GridNearSingleGetRequest extends GridCacheMessage implements GridCa @NotNull AffinityTopologyVersion topVer, UUID subjId, int taskNameHash, + long createTtl, long accessTtl, boolean skipVals, boolean addReader, @@ -127,6 +132,7 @@ public class GridNearSingleGetRequest extends GridCacheMessage implements GridCa this.topVer = topVer; this.subjId = subjId; this.taskNameHash = taskNameHash; + this.createTtl = createTtl; this.accessTtl = accessTtl; this.addDepInfo = addDepInfo; @@ -181,6 +187,13 @@ public class GridNearSingleGetRequest extends GridCacheMessage implements GridCa } /** + * @return New TTL to set after entry is created, -1 to leave unchanged. + */ + public long createTtl() { + return createTtl; + } + + /** * @return New TTL to set after entry is accessed, -1 to leave unchanged. */ public long accessTtl() { @@ -266,7 +279,7 @@ public class GridNearSingleGetRequest extends GridCacheMessage implements GridCa reader.incrementState(); case 4: - flags = reader.readByte("flags"); + createTtl = reader.readLong("createTtl"); if (!reader.isLastRead()) return false; @@ -274,7 +287,7 @@ public class GridNearSingleGetRequest extends GridCacheMessage implements GridCa reader.incrementState(); case 5: - futId = reader.readLong("futId"); + flags = reader.readByte("flags"); if (!reader.isLastRead()) return false; @@ -282,7 +295,7 @@ public class GridNearSingleGetRequest extends GridCacheMessage implements GridCa reader.incrementState(); case 6: - key = reader.readMessage("key"); + futId = reader.readLong("futId"); if (!reader.isLastRead()) return false; @@ -290,7 +303,7 @@ public class GridNearSingleGetRequest extends GridCacheMessage implements GridCa reader.incrementState(); case 7: - subjId = reader.readUuid("subjId"); + key = reader.readMessage("key"); if (!reader.isLastRead()) return false; @@ -298,7 +311,7 @@ public class GridNearSingleGetRequest extends GridCacheMessage implements GridCa reader.incrementState(); case 8: - taskNameHash = reader.readInt("taskNameHash"); + partId = reader.readInt("partId", -1); if (!reader.isLastRead()) return false; @@ -306,7 +319,7 @@ public class GridNearSingleGetRequest extends GridCacheMessage implements GridCa reader.incrementState(); case 9: - topVer = reader.readMessage("topVer"); + subjId = reader.readUuid("subjId"); if (!reader.isLastRead()) return false; @@ -314,7 +327,15 @@ public class GridNearSingleGetRequest extends GridCacheMessage implements GridCa reader.incrementState(); case 10: - partId = reader.readInt("partId", -1); + taskNameHash = reader.readInt("taskNameHash"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 11: + topVer = reader.readMessage("topVer"); if (!reader.isLastRead()) return false; @@ -348,43 +369,49 @@ public class GridNearSingleGetRequest extends GridCacheMessage implements GridCa writer.incrementState(); case 4: - if (!writer.writeByte("flags", flags)) + if (!writer.writeLong("createTtl", createTtl)) return false; writer.incrementState(); case 5: - if (!writer.writeLong("futId", futId)) + if (!writer.writeByte("flags", flags)) return false; writer.incrementState(); case 6: - if (!writer.writeMessage("key", key)) + if (!writer.writeLong("futId", futId)) return false; writer.incrementState(); case 7: - if (!writer.writeUuid("subjId", subjId)) + if (!writer.writeMessage("key", key)) return false; writer.incrementState(); case 8: - if (!writer.writeInt("taskNameHash", taskNameHash)) + if (!writer.writeInt("partId", partId)) return false; writer.incrementState(); case 9: - if (!writer.writeMessage("topVer", topVer)) + if (!writer.writeUuid("subjId", subjId)) return false; writer.incrementState(); case 10: - if (!writer.writeInt("partId", partId)) + if (!writer.writeInt("taskNameHash", taskNameHash)) + return false; + + writer.incrementState(); + + case 11: + if (!writer.writeMessage("topVer", topVer)) return false; writer.incrementState(); @@ -406,7 +433,7 @@ public class GridNearSingleGetRequest extends GridCacheMessage implements GridCa /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 11; + return 12; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/28d66dbc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java index 7ac3295..b3eb755 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java @@ -445,6 +445,7 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V> boolean isRead, boolean retval, TransactionIsolation isolation, + long createTtl, long accessTtl ) { CacheOperationContext opCtx = ctx.operationContextPerCall(); @@ -455,6 +456,7 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V> isRead, retval, timeout, + createTtl, accessTtl, CU.empty0(), opCtx != null && opCtx.skipStore(), http://git-wip-us.apache.org/repos/asf/ignite/blob/28d66dbc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java index 0730300..094c5fb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java @@ -329,15 +329,20 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { final boolean skipVals, final boolean needVer, boolean keepBinary, + final ExpiryPolicy expiryPlc, final GridInClosure3<KeyCacheObject, Object, GridCacheVersion> c ) { + IgniteCacheExpiryPolicy expiryPlc0 = optimistic() ? + accessPolicy(cacheCtx, keys) : + cacheCtx.cache().expiryPolicy(expiryPlc); + if (cacheCtx.isNear()) { return cacheCtx.nearTx().txLoadAsync(this, topVer, keys, readThrough, /*deserializeBinary*/false, - accessPolicy(cacheCtx, keys), + expiryPlc0, skipVals, needVer).chain(new C1<IgniteInternalFuture<Map<Object, Object>>, Void>() { @Override public Void apply(IgniteInternalFuture<Map<Object, Object>> f) { @@ -368,7 +373,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { CU.subjectId(this, cctx), resolveTaskName(), /*deserializeBinary*/false, - accessPolicy(cacheCtx, keys), + expiryPlc0, skipVals, /*can remap*/true, needVer, @@ -399,7 +404,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { CU.subjectId(this, cctx), resolveTaskName(), /*deserializeBinary*/false, - accessPolicy(cacheCtx, keys), + expiryPlc0, skipVals, /*can remap*/true, needVer, @@ -433,6 +438,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { skipVals, keepBinary, needVer, + expiryPlc, c); } } @@ -1161,6 +1167,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { * @param keys Keys. * @param retval Return value flag. * @param read Read flag. + * @param accessTtl Create ttl. * @param accessTtl Access ttl. * @param <K> Key type. * @param skipStore Skip store flag. @@ -1171,6 +1178,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { final Collection<? extends K> keys, boolean retval, boolean read, + long createTtl, long accessTtl, boolean skipStore, boolean keepBinary) { @@ -1205,6 +1213,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { read, retval, isolation, + createTtl, accessTtl, CU.empty0(), skipStore, @@ -1303,6 +1312,8 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { /** {@inheritDoc} */ @Override protected IgniteCacheExpiryPolicy accessPolicy(GridCacheContext cacheCtx, Collection<KeyCacheObject> keys) { + assert optimistic(); + if (accessMap != null) { for (Map.Entry<IgniteTxKey, IgniteCacheExpiryPolicy> e : accessMap.entrySet()) { if (e.getKey().cacheId() == cacheCtx.cacheId() && keys.contains(e.getKey().key())) http://git-wip-us.apache.org/repos/asf/ignite/blob/28d66dbc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java index 16a35d3..5b44d75 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java @@ -118,6 +118,7 @@ public class GridLocalCache<K, V> extends GridCacheAdapter<K, V> { boolean retval, TransactionIsolation isolation, boolean invalidate, + long createTtl, long accessTtl) { return lockAllAsync(keys, timeout, tx, CU.empty0()); } http://git-wip-us.apache.org/repos/asf/ignite/blob/28d66dbc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java index a26d2f3..656b52c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java @@ -1429,6 +1429,7 @@ public class GridLocalAtomicCache<K, V> extends GridLocalCache<K, V> { boolean retval, TransactionIsolation isolation, boolean invalidate, + long createTtl, long accessTtl) { return new GridFinishedFuture<>(new UnsupportedOperationException("Locks are not supported for " + "CacheAtomicityMode.ATOMIC mode (use CacheAtomicityMode.TRANSACTIONAL instead)")); http://git-wip-us.apache.org/repos/asf/ignite/blob/28d66dbc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java index f665eb8..3043ecc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java @@ -399,6 +399,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig boolean skipVals, boolean needVer, boolean keepBinary, + final ExpiryPolicy expiryPlc, final GridInClosure3<KeyCacheObject, Object, GridCacheVersion> c ) { assert cacheCtx.isLocal() : cacheCtx.name(); @@ -411,7 +412,9 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig } try { - IgniteCacheExpiryPolicy expiryPlc = accessPolicy(cacheCtx, keys); + IgniteCacheExpiryPolicy expiryPlc0 = optimistic() ? + accessPolicy(cacheCtx, keys) : + cacheCtx.cache().expiryPolicy(expiryPlc); Map<KeyCacheObject, GridCacheVersion> misses = null; @@ -436,7 +439,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig CU.subjectId(this, cctx), null, resolveTaskName(), - expiryPlc, + expiryPlc0, txEntry == null ? keepBinary : txEntry.keepBinary()); if (res == null) { @@ -1434,6 +1437,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig * @param skipVals Skip values flag. * @param keepCacheObjects Keep cache objects flag. * @param skipStore Skip store flag. + * @param expiryPlc Expiry policy. * @return Loaded key-value pairs. */ private <K, V> IgniteInternalFuture<Map<K, V>> checkMissed( @@ -1445,7 +1449,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig final boolean skipVals, final boolean keepCacheObjects, final boolean skipStore, - final boolean needVer + final boolean needVer, + final ExpiryPolicy expiryPlc ) { if (log.isDebugEnabled()) @@ -1474,6 +1479,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig skipVals, needReadVer, !deserializeBinary, + expiryPlc, new GridInClosure3<KeyCacheObject, Object, GridCacheVersion>() { @Override public void apply(KeyCacheObject key, Object val, GridCacheVersion loadVer) { if (isRollbackOnly()) { @@ -1598,6 +1604,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig expiryPlc = cacheCtx.expiry(); long accessTtl = expiryPlc != null ? CU.toTtl(expiryPlc.getExpiryForAccess()) : CU.TTL_NOT_CHANGED; + long createTtl = expiryPlc != null ? CU.toTtl(expiryPlc.getExpiryForCreation()) : CU.TTL_NOT_CHANGED; long timeout = remainingTime(); @@ -1611,8 +1618,11 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig true, isolation, isInvalidate(), + createTtl, accessTtl); + final ExpiryPolicy expiryPlc0 = expiryPlc; + PLC2<Map<K, V>> plc2 = new PLC2<Map<K, V>>() { @Override public IgniteInternalFuture<Map<K, V>> postLock() throws IgniteCheckedException { if (log.isDebugEnabled()) @@ -1734,7 +1744,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig skipVals, keepCacheObjects, skipStore, - needVer); + needVer, + expiryPlc0); } return new GridFinishedFuture<>(Collections.<K, V>emptyMap()); @@ -1807,7 +1818,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig skipVals, keepCacheObjects, skipStore, - needVer); + needVer, + expiryPlc); } return new GridFinishedFuture<>(retMap); @@ -2014,7 +2026,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig hasFilters, /*read through*/(entryProcessor != null || cacheCtx.config().isLoadPreviousValue()) && !skipStore, retval, - keepBinary); + keepBinary, + expiryPlc); } return new GridFinishedFuture<>(); @@ -2183,7 +2196,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig hasFilters, /*read through*/(invokeMap != null || cacheCtx.config().isLoadPreviousValue()) && !skipStore, retval, - keepBinary); + keepBinary, + expiryPlc); } return new GridFinishedFuture<>(); @@ -2203,6 +2217,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig * @param hasFilters {@code True} if filters not empty. * @param readThrough Read through flag. * @param retval Return value flag. + * @param expiryPlc Expiry policy. * @return Load future. */ private IgniteInternalFuture<Void> loadMissing( @@ -2216,7 +2231,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig final boolean hasFilters, final boolean readThrough, final boolean retval, - final boolean keepBinary) { + final boolean keepBinary, + final ExpiryPolicy expiryPlc) { GridInClosure3<KeyCacheObject, Object, GridCacheVersion> c = new GridInClosure3<KeyCacheObject, Object, GridCacheVersion>() { @Override public void apply(KeyCacheObject key, @@ -2290,6 +2306,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig /*skipVals*/singleRmv, needReadVer, keepBinary, + expiryPlc, c); } @@ -2952,6 +2969,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig retval, isolation, isInvalidate(), + -1L, -1L); PLC1<GridCacheReturn> plc1 = new PLC1<GridCacheReturn>(ret) { @@ -3130,6 +3148,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig retval, isolation, isInvalidate(), + -1L, -1L); PLC1<GridCacheReturn> plc1 = new PLC1<GridCacheReturn>(ret) { @@ -3424,6 +3443,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig retval, isolation, isInvalidate(), + -1L, -1L); PLC1<GridCacheReturn> plc1 = new PLC1<GridCacheReturn>(ret) { http://git-wip-us.apache.org/repos/asf/ignite/blob/28d66dbc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java index 9fb3558..f5687a0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java @@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.cache.transactions; import java.util.Collection; import java.util.Map; import javax.cache.Cache; +import javax.cache.expiry.ExpiryPolicy; import javax.cache.processor.EntryProcessor; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.IgniteInternalFuture; @@ -181,6 +182,7 @@ public interface IgniteTxLocalEx extends IgniteInternalTx { * @param skipVals Skip values flag. * @param needVer If {@code true} version is required for loaded values. * @param c Closure to be applied for loaded values. + * @param expiryPlc Expiry policy. * @return Future with {@code True} value if loading took place. */ public IgniteInternalFuture<Void> loadMissing( @@ -192,5 +194,6 @@ public interface IgniteTxLocalEx extends IgniteInternalTx { boolean skipVals, boolean needVer, boolean keepBinary, + final ExpiryPolicy expiryPlc, GridInClosure3<KeyCacheObject, Object, GridCacheVersion> c); } http://git-wip-us.apache.org/repos/asf/ignite/blob/28d66dbc/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyWithStoreAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyWithStoreAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyWithStoreAbstractTest.java index 58e6b02..1f6ec2d 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyWithStoreAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyWithStoreAbstractTest.java @@ -17,8 +17,11 @@ package org.apache.ignite.internal.processors.cache.expiry; -import java.util.ArrayList; -import java.util.List; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; import java.util.concurrent.TimeUnit; import javax.cache.configuration.Factory; import javax.cache.expiry.Duration; @@ -27,6 +30,7 @@ import javax.cache.integration.CompletionListenerFuture; import javax.cache.processor.EntryProcessor; import javax.cache.processor.MutableEntry; import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteTransactions; import org.apache.ignite.cache.CacheAtomicityMode; import org.apache.ignite.cache.CachePeekMode; import org.apache.ignite.cache.store.CacheStore; @@ -38,6 +42,9 @@ import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; import org.apache.ignite.internal.processors.cache.IgniteCacheAbstractTest; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.transactions.Transaction; +import org.apache.ignite.transactions.TransactionConcurrency; +import org.apache.ignite.transactions.TransactionIsolation; /** * @@ -179,38 +186,93 @@ public abstract class IgniteCacheExpiryPolicyWithStoreAbstractTest extends Ignit * @throws Exception If failed. */ public void testGetReadThrough() throws Exception { + getReadThrough(false, null, null); + getReadThrough(true, null, null); + } + + /** + * @throws Exception If failed. + */ + protected void getReadThrough(boolean withExcPlc, + TransactionConcurrency txConcurrency, + TransactionIsolation txIsolation) throws Exception { IgniteCache<Integer, Integer> cache = jcache(0); - List<Integer> keys = new ArrayList<>(); + if (withExcPlc) + cache = cache.withExpiryPolicy(new ExpiryPolicy() { + @Override public Duration getExpiryForCreation() { + return new Duration(TimeUnit.MILLISECONDS, 501); + } + + @Override public Duration getExpiryForAccess() { + return new Duration(TimeUnit.MILLISECONDS, 601); + } + + @Override public Duration getExpiryForUpdate() { + return new Duration(TimeUnit.MILLISECONDS, 701); + } + }); - keys.add(primaryKeys(cache, 1, 100_000).get(0)); - // TODO https://issues.apache.org/jira/browse/IGNITE-3699 - // TODO: test 'get' inside transactions, 'get' for cache.withAsyncPolicy. - //keys.add(backupKeys(cache, 1, 100_000).get(0)); - //keys.add(nearKeys(cache, 1, 100_000).get(0)); + Integer prim = primaryKeys(cache, 1, 1000).get(0); + Integer back = backupKeys(cache, 1, 1000).get(0); + Integer near = nearKeys(cache, 1, 1000).get(0); + + Set<Integer> prims = new HashSet<>(primaryKeys(cache, 10, prim + 1)); + Set<Integer> backs = new HashSet<>(backupKeys(cache, 10, back + 1)); + Set<Integer> nears = new HashSet<>(nearKeys(cache, 10, near + 1)); + + Set<Integer> keys = new HashSet<>(); + + keys.add(prim); + keys.add(back); + keys.add(near); + + keys.addAll(prims); + keys.addAll(backs); + keys.addAll(nears); for (Integer key : keys) - storeMap.put(key, 100); + storeMap.put(key, key); + + IgniteTransactions transactions = grid(0).transactions(); + + Transaction tx = txConcurrency != null ? transactions.txStart(txConcurrency, txIsolation) : null; try { - for (Integer key : keys) { - Integer res = cache.get(key); + Collection<Integer> singleKeys = new HashSet<>(); - assertEquals((Integer)100, res); + singleKeys.add(prim); + singleKeys.add(back); + singleKeys.add(near); - checkTtl(key, 500, true); + assertEquals(3, singleKeys.size()); - assertEquals((Integer)100, res); - } + for (Integer key : singleKeys) + assertEquals(key, cache.get(key)); - U.sleep(600); + Map<Integer, Integer> res = new HashMap<>(); + + res.putAll(cache.getAll(prims)); + res.putAll(cache.getAll(backs)); + res.putAll(cache.getAll(nears)); - for (Integer key : keys) - checkExpired(key); + assertEquals(30, res.size()); + + for (Map.Entry<Integer, Integer> e : res.entrySet()) + assertEquals(e.getKey(), e.getValue()); } finally { - cache.removeAll(); + if (tx != null) + tx.rollback(); } + + for (Integer key : keys) + checkTtl(key, withExcPlc ? 501 : 500, true); + + U.sleep(600); + + for (Integer key : keys) + checkExpired(key); } /**
