# ignite-283: It works for ATOMIC cache!!!
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/e09ff861 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/e09ff861 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/e09ff861 Branch: refs/heads/ignite-283 Commit: e09ff8611bae9da8dadef82d30b8ac3af354d712 Parents: 857ed95 Author: vozerov-gridgain <[email protected]> Authored: Wed Feb 18 16:39:34 2015 +0300 Committer: vozerov-gridgain <[email protected]> Committed: Wed Feb 18 16:39:34 2015 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheMapEntry.java | 62 +++---- .../cache/GridCacheMapEntryOperationExpiry.java | 161 +++++++++++++++++++ .../dht/atomic/GridDhtAtomicCache.java | 3 - .../processors/cache/GridCacheTestEntryEx.java | 1 - 4 files changed, 193 insertions(+), 34 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e09ff861/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index 8314b0c..8427f3a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@ -1552,14 +1552,17 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> boolean verCheck, @Nullable IgnitePredicate<Cache.Entry<K, V>>[] filter, GridDrType drType, - long conflictTtl, - long conflictExpireTime, + long explicitTtl, + long explicitExpireTime, @Nullable GridCacheVersion conflictVer, 
boolean conflictResolve, boolean intercept, @Nullable UUID subjId, String taskName ) throws IgniteCheckedException, GridCacheEntryRemovedException, GridClosureException { + GridCacheMapEntryOperationExpiry expiry0 = + new GridCacheMapEntryOperationExpiry(expiryPlc, explicitTtl, explicitExpireTime); + assert cctx.atomic(); boolean res = true; @@ -1604,14 +1607,12 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> valBytes = null; } - // Get TTL and expire time (no special-purpose TTL values can be set for conflict). - assert conflictTtl != CU.TTL_ZERO && conflictTtl != CU.TTL_NOT_CHANGED && conflictTtl >= 0; - assert conflictExpireTime != CU.EXPIRE_TIME_CALCULATE && conflictExpireTime >= 0; + GridTuple3<Long, Long, Boolean> expiration = expiry0.ttlAndExpireTime(this); // Prepare old and new entries for conflict resolution. GridCacheVersionedEntryEx<K, V> oldEntry = versionedEntry(); GridCacheVersionedEntryEx<K, V> newEntry = new GridCachePlainVersionedEntry<>(key, (V)writeObj, - conflictTtl, conflictExpireTime, conflictVer); + expiration.get1(), expiration.get2(), conflictVer != null ? conflictVer : newVer); // Resolve conflict. conflictCtx = cctx.conflictResolve(oldEntry, newEntry, verCheck); @@ -1620,13 +1621,15 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> // Use old value? if (conflictCtx.isUseOld()) { + GridCacheVersion newConflictVer = conflictVer != null ? conflictVer : newVer; + // Handle special case with atomic comparator. - if (!isNew() && // Not initial value, - verCheck && // and atomic version check, - oldConflictVer.dataCenterId() == conflictVer.dataCenterId() && // and data centers are equal, - ATOMIC_VER_COMPARATOR.compare(oldConflictVer, conflictVer) == 0 && // and both versions are equal, - cctx.writeThrough() && // and store is enabled, - primary) // and we are primary. 
+ if (!isNew() && // Not initial value, + verCheck && // and atomic version check, + oldConflictVer.dataCenterId() == newConflictVer.dataCenterId() && // and data centers are equal, + ATOMIC_VER_COMPARATOR.compare(oldConflictVer, newConflictVer) == 0 && // and both versions are equal, + cctx.writeThrough() && // and store is enabled, + primary) // and we are primary. { V val = rawGetOrUnmarshalUnlocked(false); @@ -1664,9 +1667,6 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> // Update value is known at this point, so update operation type. op = writeObj != null ? GridCacheOperation.UPDATE : GridCacheOperation.DELETE; } - - newTtl = conflictCtx.ttl(); - newExpireTime = conflictCtx.expireTime(); } else // Nullify conflict version on this update, so that we will use regular version during next updates. @@ -1732,20 +1732,18 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> oldVal = (V)cctx.kernalContext().portable().detachPortable(oldVal); // Calculate initial TTL and expire time. - long initTtl = 0; - long initExpireTime = 0; + long initTtl; + long initExpireTime; - if (expiryPlc != null && oldVal != null) { - initTtl = expiryPlc.forCreate(); + if (expiry0.hasExpiry() && oldVal != null) { + IgniteBiTuple<Long, Long> initTtlAndExpireTime = expiry0.initialTtlAndExpireTime(); - if (initTtl == CU.TTL_ZERO) { - initTtl = CU.TTL_MINIMUM; - initExpireTime = CU.expireTimeInPast(); - } - else if (initTtl == CU.TTL_NOT_CHANGED) - initTtl = CU.TTL_ETERNAL; - else - initExpireTime = CU.toExpireTime(initTtl); + initTtl = initTtlAndExpireTime.get1(); + initExpireTime = initTtlAndExpireTime.get2(); + } + else { + initTtl = CU.TTL_ETERNAL; + initExpireTime = CU.EXPIRE_TIME_ETERNAL; } if (oldVal != null) @@ -1852,10 +1850,10 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> // Conflict context is null if there were no explicit conflict resolution. 
if (conflictCtx == null) { // Calculate TTL and expire time for local update. - if (conflictTtl != CU.TTL_NOT_CHANGED) { + if (explicitTtl != CU.TTL_NOT_CHANGED) { // TTL/expireTime was sent to us from node where conflict had been resolved. - newTtl = conflictTtl; - newExpireTime = conflictExpireTime; + newTtl = explicitTtl; + newExpireTime = explicitExpireTime; } else { if (expiryPlc != null) @@ -1881,6 +1879,10 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> newExpireTime = CU.toExpireTime(newTtl); } } + else { + newTtl = conflictCtx.ttl(); + newExpireTime = conflictCtx.expireTime(); + } } else { assert op == GridCacheOperation.DELETE; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e09ff861/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntryOperationExpiry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntryOperationExpiry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntryOperationExpiry.java new file mode 100644 index 0000000..592d846 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntryOperationExpiry.java @@ -0,0 +1,161 @@ +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.internal.util.lang.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.lang.*; + +import javax.cache.expiry.*; + +/** + * Holds the expiry policy and the explicit TTL / expire time passed to an operation and derives effective TTL and expire time from them. + */ +public class GridCacheMapEntryOperationExpiry { + /** Expiry policy; {@code null} when there is none. */ + private final IgniteCacheExpiryPolicy expiry; + + /** Explicit TTL; {@code CU.TTL_NOT_CHANGED} means no explicit TTL. */ + private final long explicitTtl; + + /** Explicit expire time; {@code CU.EXPIRE_TIME_CALCULATE} means no explicit expire time. */ + private final long explicitExpireTime; + + /** + * Stores the arguments as is, no validation is performed here. + * @param expiry Expiry policy, may be {@code null}. + * @param explicitTtl Explicit TTL or {@code CU.TTL_NOT_CHANGED}. + * @param explicitExpireTime Explicit expire time or {@code CU.EXPIRE_TIME_CALCULATE}. + */ + public 
GridCacheMapEntryOperationExpiry(IgniteCacheExpiryPolicy expiry, long explicitTtl, long explicitExpireTime) { + this.expiry = expiry; + this.explicitTtl = explicitTtl; + this.explicitExpireTime = explicitExpireTime; + } + + /** + * Checks whether an expiry policy is present. + * @return {@code true} if the expiry policy is not {@code null}. + */ + public boolean hasExpiry() { + return expiry != null; + } + + /** + * Gets the expiry policy passed to the constructor. + * @return Expiry policy, possibly {@code null}. + */ + public IgniteCacheExpiryPolicy expiry() { + return expiry; + } + + /** + * Checks whether an explicit TTL was passed. + * @return {@code true} if explicit TTL differs from {@code CU.TTL_NOT_CHANGED}. + */ + public boolean hasExplicitTtl() { + return explicitTtl != CU.TTL_NOT_CHANGED; + } + + /** + * Gets the explicit TTL passed to the constructor. + * @return Explicit TTL, unmodified. + */ + public long explicitTtl() { + return explicitTtl; + } + + /** + * Checks whether an explicit expire time was passed. + * @return {@code true} if explicit expire time differs from {@code CU.EXPIRE_TIME_CALCULATE}. + */ + public boolean hasExplicitExpireTime() { + return explicitExpireTime != CU.EXPIRE_TIME_CALCULATE; + } + + /** + * Gets the explicit expire time passed to the constructor. + * @return Explicit expire time, unmodified. + */ + public long explicitExpireTime() { + return explicitExpireTime; + } + + /** + * @return Tuple of initial TTL and expire time derived from {@code forCreate()} of the expiry policy; asserts {@link #hasExpiry()}. + */ + public IgniteBiTuple<Long, Long> initialTtlAndExpireTime() { + assert hasExpiry(); + + long initTtl = expiry().forCreate(); + long initExpireTime; + + if (initTtl == CU.TTL_ZERO) { + initTtl = CU.TTL_MINIMUM; + initExpireTime = CU.expireTimeInPast(); + } + else if (initTtl == CU.TTL_NOT_CHANGED) { + initTtl = CU.TTL_ETERNAL; + initExpireTime = CU.EXPIRE_TIME_ETERNAL; + } + else + initExpireTime = CU.toExpireTime(initTtl); + + return F.t(initTtl, initExpireTime); + } + + /** + * Calculates TTL, expire time and remove flag: explicit TTL wins, otherwise the expiry policy is asked, otherwise eternal values are used. + * @param entry Entry whose {@code hasValueUnlocked()} and {@code ttlExtras()} are consulted when the expiry policy is used. + * @return Tuple of TTL, expire time and a flag that is {@code true} only when the policy returned {@code CU.TTL_ZERO}. + */ + public GridTuple3<Long, Long, Boolean> ttlAndExpireTime(GridCacheMapEntry entry) { + long ttl; + long expireTime; + boolean rmv; + + if (hasExplicitTtl()) { + // TTL is set explicitly. + assert explicitTtl != CU.TTL_NOT_CHANGED && explicitTtl != CU.TTL_MINIMUM && explicitTtl >= 0L; + + ttl = explicitTtl; + expireTime = hasExplicitExpireTime() ? explicitExpireTime : CU.toExpireTime(explicitTtl); + rmv = false; + } + else { + // Need to calculate TTL. + if (hasExpiry()) { + // Expiry exists. + long sysTtl = entry.hasValueUnlocked() ? 
expiry.forUpdate() : expiry.forCreate(); + + if (sysTtl == CU.TTL_ZERO) { + // Entry must be expired immediately. + ttl = CU.TTL_MINIMUM; + expireTime = CU.expireTimeInPast(); + rmv = true; + } + else if (sysTtl == CU.TTL_NOT_CHANGED) { + // TTL is not changed: keep TTL reported by the entry, expire time is derived from it. + ttl = entry.ttlExtras(); + expireTime = CU.toExpireTime(ttl); + rmv = false; + } + else { + // TTL is changed. + assert sysTtl >= 0; + + ttl = sysTtl; + expireTime = CU.toExpireTime(ttl); + rmv = false; + } + } + else { + // No expiry, entry is immortal. + ttl = CU.TTL_ETERNAL; + expireTime = CU.EXPIRE_TIME_ETERNAL; + rmv = false; + } + } + + return F.t(ttl, expireTime, rmv); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e09ff861/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index 420b6f7..d8754d8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -1653,9 +1653,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { assert !(newConflictVer instanceof GridCacheVersionEx) : newConflictVer; - if (newConflictVer == null) - newConflictVer = ver; - boolean primary = !req.fastMap() || ctx.affinity().primary(ctx.localNode(), entry.key(), req.topologyVersion()); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e09ff861/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java 
---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java index 28e0d76..b5e226f 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java @@ -490,7 +490,6 @@ public class GridCacheTestEntryEx<K, V> extends GridMetadataAwareAdapter impleme rawPut((V)val, 0), (V)val, null, - null, 0L, 0L, null,
