# ignite-41
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/c2a51321 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/c2a51321 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/c2a51321 Branch: refs/heads/ignite-1 Commit: c2a513218cb832390bdc6b228eaddb10af2c27a4 Parents: 7f1b3f0 Author: sboikov <[email protected]> Authored: Mon Dec 15 10:29:48 2014 +0300 Committer: sboikov <[email protected]> Committed: Mon Dec 15 17:45:23 2014 +0300 ---------------------------------------------------------------------- .../processors/cache/IgniteCacheProxy.java | 94 +++++---- .../processors/cache/GridCacheAdapter.java | 71 +++++-- .../processors/cache/GridCacheContext.java | 20 ++ .../processors/cache/GridCacheEntryEx.java | 5 +- .../processors/cache/GridCacheMapEntry.java | 138 +++++++++++--- .../processors/cache/GridCacheProcessor.java | 2 +- .../processors/cache/GridCacheProjectionEx.java | 12 ++ .../cache/GridCacheProjectionImpl.java | 83 ++++++-- .../processors/cache/GridCacheProxyImpl.java | 11 ++ .../distributed/GridCacheExpiryPolicy.java | 88 +++++++++ .../dht/atomic/GridDhtAtomicCache.java | 59 ++++-- .../dht/atomic/GridDhtAtomicUpdateFuture.java | 27 ++- .../dht/atomic/GridDhtAtomicUpdateRequest.java | 30 +-- .../dht/atomic/GridNearAtomicUpdateFuture.java | 17 +- .../dht/atomic/GridNearAtomicUpdateRequest.java | 37 ++-- .../atomic/GridNearAtomicUpdateResponse.java | 41 +--- .../distributed/near/GridNearAtomicCache.java | 19 +- .../processors/cache/IgniteCacheTest.java | 2 + .../expiry/IgniteCacheExpiryPolicyTest.java | 191 +++++++++++++++---- .../processors/cache/GridCacheTestEntryEx.java | 28 ++- 20 files changed, 749 insertions(+), 226 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c2a51321/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java index 5971d69..f1d27f0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java @@ -13,7 +13,6 @@ import org.apache.ignite.*; import org.apache.ignite.cache.*; import org.apache.ignite.cache.query.*; import org.apache.ignite.lang.*; -import org.gridgain.grid.*; import org.gridgain.grid.cache.*; import org.gridgain.grid.kernal.processors.cache.*; import org.gridgain.grid.util.tostring.*; @@ -36,25 +35,32 @@ public class IgniteCacheProxy<K, V> implements IgniteCache<K, V>, Externalizable /** */ private static final long serialVersionUID = 0L; - /** Context. */ + /** */ private GridCacheContext<K, V> ctx; /** Gateway. */ private GridCacheGateway<K, V> gate; - /** Cache. */ + /** Delegate. */ @GridToStringInclude - private GridCacheAdapter<K, V> delegate; + private GridCacheProjectionEx<K, V> delegate; + + /** Projection. */ + private GridCacheProjectionImpl<K, V> prj; /** * @param delegate Delegate. + * @param prj Projection. */ - public IgniteCacheProxy(GridCacheAdapter<K, V> delegate) { + public IgniteCacheProxy(GridCacheContext<K, V> ctx, + GridCacheProjectionEx<K, V> delegate, + @Nullable GridCacheProjectionImpl<K, V> prj) { + assert ctx != null; assert delegate != null; + this.ctx = ctx; this.delegate = delegate; - - ctx = delegate.context(); + this.prj = prj; gate = ctx.gate(); } @@ -73,8 +79,16 @@ public class IgniteCacheProxy<K, V> implements IgniteCache<K, V>, Externalizable /** {@inheritDoc} */ @Override public IgniteCache<K, V> withExpiryPolicy(ExpiryPolicy plc) { - // TODO IGNITE-1. - throw new UnsupportedOperationException(); + GridCacheProjectionImpl<K, V> prev = gate.enter(prj); + + try { + GridCacheProjectionEx<K, V> prj0 = prj != null ? prj.withExpiryPolicy(plc) : delegate.withExpiryPolicy(plc); + + return new IgniteCacheProxy<>(ctx, prj0, (GridCacheProjectionImpl<K, V>)prj0); + } + finally { + gate.leave(prev); + } } /** {@inheritDoc} */ @@ -92,7 +106,7 @@ public class IgniteCacheProxy<K, V> implements IgniteCache<K, V>, Externalizable /** {@inheritDoc} */ @Nullable @Override public V getAndPutIfAbsent(K key, V val) throws CacheException { try { - GridCacheProjectionImpl<K, V> prev = gate.enter(null); + GridCacheProjectionImpl<K, V> prev = gate.enter(prj); try { return delegate.putIfAbsent(key, val); @@ -126,7 +140,7 @@ public class IgniteCacheProxy<K, V> implements IgniteCache<K, V>, Externalizable /** {@inheritDoc} */ @Override public boolean isLocked(K key) { - GridCacheProjectionImpl<K, V> prev = gate.enter(null); + GridCacheProjectionImpl<K, V> prev = gate.enter(prj); try { return delegate.isLocked(key); @@ -138,7 +152,7 @@ public class IgniteCacheProxy<K, V> implements IgniteCache<K, V>, Externalizable /** {@inheritDoc} */ @Override public boolean isLockedByThread(K key) { - GridCacheProjectionImpl<K, V> prev = gate.enter(null); + GridCacheProjectionImpl<K, V> prev = gate.enter(prj); try { return delegate.isLockedByThread(key); @@ -162,7 +176,7 @@ public class IgniteCacheProxy<K, V> implements IgniteCache<K, V>, Externalizable /** {@inheritDoc} */ @Override public void localEvict(Collection<? extends K> keys) { - GridCacheProjectionImpl<K, V> prev = gate.enter(null); + GridCacheProjectionImpl<K, V> prev = gate.enter(prj); try { delegate.evictAll(keys); @@ -175,13 +189,23 @@ public class IgniteCacheProxy<K, V> implements IgniteCache<K, V>, Externalizable /** {@inheritDoc} */ @Nullable @Override public V localPeek(K key, CachePeekMode... peekModes) { // TODO IGNITE-1. - throw new UnsupportedOperationException(); + if (peekModes.length != 0) + throw new UnsupportedOperationException(); + + GridCacheProjectionImpl<K, V> prev = gate.enter(prj); + + try { + return delegate.peek(key); + } + finally { + gate.leave(prev); + } } /** {@inheritDoc} */ @Override public void localPromote(Set<? extends K> keys) throws CacheException { try { - GridCacheProjectionImpl<K, V> prev = gate.enter(null); + GridCacheProjectionImpl<K, V> prev = gate.enter(prj); try { delegate.promoteAll(keys); @@ -216,7 +240,7 @@ public class IgniteCacheProxy<K, V> implements IgniteCache<K, V>, Externalizable /** {@inheritDoc} */ @Override public V get(K key) { try { - GridCacheProjectionImpl<K, V> prev = gate.enter(null); + GridCacheProjectionImpl<K, V> prev = gate.enter(prj); try { return delegate.get(key); @@ -233,7 +257,7 @@ public class IgniteCacheProxy<K, V> implements IgniteCache<K, V>, Externalizable /** {@inheritDoc} */ @Override public Map<K, V> getAll(Set<? extends K> keys) { try { - GridCacheProjectionImpl<K, V> prev = gate.enter(null); + GridCacheProjectionImpl<K, V> prev = gate.enter(prj); try { return delegate.getAll(keys); @@ -264,10 +288,10 @@ public class IgniteCacheProxy<K, V> implements IgniteCache<K, V>, Externalizable /** {@inheritDoc} */ @Override public void put(K key, V val) { try { - GridCacheProjectionImpl<K, V> prev = gate.enter(null); + GridCacheProjectionImpl<K, V> prev = gate.enter(prj); try { - delegate.putx(key, val, null); + delegate.putx(key, val); } finally { gate.leave(prev); @@ -281,7 +305,7 @@ public class IgniteCacheProxy<K, V> implements IgniteCache<K, V>, Externalizable /** {@inheritDoc} */ @Override public V getAndPut(K key, V val) { try { - GridCacheProjectionImpl<K, V> prev = gate.enter(null); + GridCacheProjectionImpl<K, V> prev = gate.enter(prj); try { return delegate.put(key, val); @@ -298,10 +322,10 @@ public class IgniteCacheProxy<K, V> implements IgniteCache<K, V>, Externalizable /** {@inheritDoc} */ @Override public void putAll(Map<? extends K, ? extends V> map) { try { - GridCacheProjectionImpl<K, V> prev = gate.enter(null); + GridCacheProjectionImpl<K, V> prev = gate.enter(prj); try { - delegate.putAll(map, null); + delegate.putAll(map); } finally { gate.leave(prev); @@ -315,7 +339,7 @@ public class IgniteCacheProxy<K, V> implements IgniteCache<K, V>, Externalizable /** {@inheritDoc} */ @Override public boolean putIfAbsent(K key, V val) { try { - GridCacheProjectionImpl<K, V> prev = gate.enter(null); + GridCacheProjectionImpl<K, V> prev = gate.enter(prj); try { return delegate.putxIfAbsent(key, val); @@ -332,7 +356,7 @@ public class IgniteCacheProxy<K, V> implements IgniteCache<K, V>, Externalizable /** {@inheritDoc} */ @Override public boolean remove(K key) { try { - GridCacheProjectionImpl<K, V> prev = gate.enter(null); + GridCacheProjectionImpl<K, V> prev = gate.enter(prj); try { return delegate.removex(key); @@ -349,7 +373,7 @@ public class IgniteCacheProxy<K, V> implements IgniteCache<K, V>, Externalizable /** {@inheritDoc} */ @Override public boolean remove(K key, V oldVal) { try { - GridCacheProjectionImpl<K, V> prev = gate.enter(null); + GridCacheProjectionImpl<K, V> prev = gate.enter(prj); try { return delegate.remove(key, oldVal); @@ -366,10 +390,10 @@ public class IgniteCacheProxy<K, V> implements IgniteCache<K, V>, Externalizable /** {@inheritDoc} */ @Override public V getAndRemove(K key) { try { - GridCacheProjectionImpl<K, V> prev = gate.enter(null); + GridCacheProjectionImpl<K, V> prev = gate.enter(prj); try { - return delegate.remove(key, (GridCacheEntryEx<K, V>)null); + return delegate.remove(key); } finally { gate.leave(prev); @@ -383,7 +407,7 @@ public class IgniteCacheProxy<K, V> implements IgniteCache<K, V>, Externalizable /** {@inheritDoc} */ @Override public boolean replace(K key, V oldVal, V newVal) { try { - GridCacheProjectionImpl<K, V> prev = gate.enter(null); + GridCacheProjectionImpl<K, V> prev = gate.enter(prj); try { return delegate.replace(key, oldVal, newVal); @@ -400,7 +424,7 @@ public class IgniteCacheProxy<K, V> implements IgniteCache<K, V>, Externalizable /** {@inheritDoc} */ @Override public boolean replace(K key, V val) { try { - GridCacheProjectionImpl<K, V> prev = gate.enter(null); + GridCacheProjectionImpl<K, V> prev = gate.enter(prj); try { return delegate.replacex(key, val); @@ -417,7 +441,7 @@ public class IgniteCacheProxy<K, V> implements IgniteCache<K, V>, Externalizable /** {@inheritDoc} */ @Override public V getAndReplace(K key, V val) { try { - GridCacheProjectionImpl<K, V> prev = gate.enter(null); + GridCacheProjectionImpl<K, V> prev = gate.enter(prj); try { return delegate.replace(key, val); @@ -434,7 +458,7 @@ public class IgniteCacheProxy<K, V> implements IgniteCache<K, V>, Externalizable /** {@inheritDoc} */ @Override public void removeAll(Set<? extends K> keys) { try { - GridCacheProjectionImpl<K, V> prev = gate.enter(null); + GridCacheProjectionImpl<K, V> prev = gate.enter(prj); try { delegate.removeAll(keys); @@ -581,15 +605,17 @@ public class IgniteCacheProxy<K, V> implements IgniteCache<K, V>, Externalizable /** {@inheritDoc} */ @Override public void writeExternal(ObjectOutput out) throws IOException { + out.writeObject(ctx); out.writeObject(delegate); + out.writeObject(prj); } /** {@inheritDoc} */ @SuppressWarnings({"unchecked"}) @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - delegate = (GridCacheAdapter<K, V>)in.readObject(); - - ctx = delegate.context(); + ctx = (GridCacheContext<K, V>)in.readObject(); + delegate = (GridCacheProjectionEx<K, V>)in.readObject(); + prj = (GridCacheProjectionImpl<K, V>)in.readObject(); gate = ctx.gate(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c2a51321/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java index bf40a85..559949f 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java @@ -40,6 +40,7 @@ import org.gridgain.grid.util.typedef.internal.*; import org.jdk8.backport.*; import org.jetbrains.annotations.*; +import javax.cache.expiry.*; import java.io.*; import java.util.*; import java.util.concurrent.*; @@ -52,8 +53,6 @@ import static org.gridgain.grid.cache.GridCacheFlag.*; import static org.gridgain.grid.cache.GridCachePeekMode.*; import static org.gridgain.grid.cache.GridCacheTxConcurrency.*; import static org.gridgain.grid.cache.GridCacheTxIsolation.*; -import static org.gridgain.grid.cache.GridCacheTxState.*; -import static org.apache.ignite.events.IgniteEventType.*; import static org.gridgain.grid.kernal.GridClosureCallMode.*; import static org.gridgain.grid.kernal.processors.dr.GridDrType.*; import static org.gridgain.grid.kernal.processors.task.GridTaskThreadContextKey.*; @@ -367,8 +366,14 @@ public abstract class GridCacheAdapter<K, V> extends GridMetadataAwareAdapter im /** {@inheritDoc} */ @Override public GridCacheProjectionEx<K, V> forSubjectId(UUID subjId) { - GridCacheProjectionImpl<K, V> prj = new GridCacheProjectionImpl<>(this, ctx, null, null, - null, subjId, false); + GridCacheProjectionImpl<K, V> prj = new GridCacheProjectionImpl<>(this, + ctx, + null, + null, + null, + subjId, + false, + null); return new GridCacheProxyImpl<>(ctx, prj, prj); } @@ -378,8 +383,14 @@ public abstract class GridCacheAdapter<K, V> extends GridMetadataAwareAdapter im if (F.isEmpty(flags)) return this; - GridCacheProjectionImpl<K, V> prj = new GridCacheProjectionImpl<>(this, ctx, null, null, - EnumSet.copyOf(F.asList(flags)), null, false); + GridCacheProjectionImpl<K, V> prj = new GridCacheProjectionImpl<>(this, + ctx, + null, + null, + EnumSet.copyOf(F.asList(flags)), + null, + false, + null); return new GridCacheProxyImpl<>(ctx, prj, prj); } @@ -398,12 +409,31 @@ public abstract class GridCacheAdapter<K, V> extends GridMetadataAwareAdapter im null, null, null, - ctx.portableEnabled()); + ctx.portableEnabled(), + null); return new GridCacheProxyImpl<>((GridCacheContext<K1, V1>)ctx, prj, prj); } /** {@inheritDoc} */ + @Nullable @Override public ExpiryPolicy expiry() { + return null; + } + + /** {@inheritDoc} */ + @Override public GridCacheProjectionEx<K, V> withExpiryPolicy(ExpiryPolicy plc) { + return new GridCacheProjectionImpl<>( + this, + ctx, + null, + null, + null, + null, + ctx.portableEnabled(), + plc); + } + + /** {@inheritDoc} */ @SuppressWarnings({"unchecked", "RedundantCast"}) @Override public <K1, V1> GridCacheProjection<K1, V1> projection( Class<? super K1> keyType, @@ -423,8 +453,13 @@ public abstract class GridCacheAdapter<K, V> extends GridMetadataAwareAdapter im } GridCacheProjectionImpl<K1, V1> prj = new GridCacheProjectionImpl<>((GridCacheProjection<K1, V1>)this, - (GridCacheContext<K1, V1>)ctx, CU.<K1, V1>typeFilter(keyType, valType), /*filter*/null, /*flags*/null, - /*clientId*/null, false); + (GridCacheContext<K1, V1>)ctx, + CU.<K1, V1>typeFilter(keyType, valType), + /*filter*/null, + /*flags*/null, + /*clientId*/null, + false, + null); return new GridCacheProxyImpl<>((GridCacheContext<K1, V1>)ctx, prj, prj); } @@ -443,7 +478,14 @@ public abstract class GridCacheAdapter<K, V> extends GridMetadataAwareAdapter im } } - GridCacheProjectionImpl<K, V> prj = new GridCacheProjectionImpl<>(this, ctx, p, null, null, null, false); + GridCacheProjectionImpl<K, V> prj = new GridCacheProjectionImpl<>(this, + ctx, + p, + null, + null, + null, + false, + null); return new GridCacheProxyImpl<>(ctx, prj, prj); } @@ -463,7 +505,14 @@ public abstract class GridCacheAdapter<K, V> extends GridMetadataAwareAdapter im } GridCacheProjectionImpl<K, V> prj = new GridCacheProjectionImpl<>( - this, ctx, null, filter, null, null, false); + this, + ctx, + null, + filter, + null, + null, + false, + null); return new GridCacheProxyImpl<>(ctx, prj, prj); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c2a51321/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheContext.java index c6cb355..58fd1cd 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheContext.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheContext.java @@ -46,6 +46,8 @@ import org.gridgain.grid.util.offheap.unsafe.*; import org.gridgain.grid.util.tostring.*; import org.jetbrains.annotations.*; +import javax.cache.configuration.*; +import javax.cache.expiry.*; import java.io.*; import java.util.*; import java.util.concurrent.*; @@ -170,6 +172,9 @@ public class GridCacheContext<K, V> implements Externalizable { /** Cache ID. */ private int cacheId; + /** */ + private ExpiryPolicy expiryPlc; + /** * Empty constructor required for {@link Externalizable}. */ @@ -275,6 +280,20 @@ public class GridCacheContext<K, V> implements Externalizable { } else cacheId = 1; + + Factory<ExpiryPolicy> factory = cacheCfg.getExpiryPolicyFactory(); + + expiryPlc = factory.create(); + + if (expiryPlc instanceof EternalExpiryPolicy) + expiryPlc = null; + } + + /** + * @return Cache default {@link ExpiryPolicy}. + */ + @Nullable public ExpiryPolicy expiry() { + return expiryPlc; } /** @@ -1054,6 +1073,7 @@ public class GridCacheContext<K, V> implements Externalizable { /** * Gets thread local projection. + * * @return Projection per call. */ public GridCacheProjectionImpl<K, V> projectionPerCall() { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c2a51321/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheEntryEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheEntryEx.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheEntryEx.java index e94016f..fc8aaaa 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheEntryEx.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheEntryEx.java @@ -18,6 +18,7 @@ import org.gridgain.grid.kernal.processors.dr.*; import org.gridgain.grid.util.lang.*; import org.jetbrains.annotations.*; +import javax.cache.expiry.*; import java.util.*; /** @@ -390,7 +391,7 @@ public interface GridCacheEntryEx<K, V> extends GridMetadataAware { * @param valBytes Value bytes. Can be non-null only if operation is UPDATE. * @param writeThrough Write through flag. * @param retval Return value flag. - * @param ttl Time to live. + * @param expiryPlc Expiry policy. * @param evt Event flag. * @param metrics Metrics update flag. * @param primary If update is performed on primary node (the one which assigns version). @@ -422,7 +423,7 @@ public interface GridCacheEntryEx<K, V> extends GridMetadataAware { @Nullable byte[] valBytes, boolean writeThrough, boolean retval, - long ttl, + @Nullable ExpiryPolicy expiryPlc, boolean evt, boolean metrics, boolean primary, http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c2a51321/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java index 32c1485..b723d71 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java @@ -27,6 +27,7 @@ import org.gridgain.grid.util.typedef.internal.*; import org.jetbrains.annotations.*; import sun.misc.*; +import javax.cache.expiry.*; import java.io.*; import java.nio.*; import java.util.*; @@ -1615,6 +1616,38 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> return new IgniteBiTuple<>(res, cctx.<V>unwrapTemporary(interceptorRes != null ? interceptorRes.get2() : old)); } + /** + * @param expiryPlc Expiry policy. + * @param isNew {@code True} if entry is new. + * @return TTL. + */ + private static long ttlFromPolicy(@Nullable ExpiryPolicy expiryPlc, boolean isNew) { + if (expiryPlc == null) + return -1L; + + Duration duration = isNew ? expiryPlc.getExpiryForCreation() : expiryPlc.getExpiryForUpdate(); + + return toTtl(duration); + } + + private static long toTtl(Duration duration) { + if (duration == null) + return -1; + + if (duration.getDurationAmount() == 0) { + if (duration.isEternal()) + return 0; + + assert duration.isZero(); + + return 1L; + } + + assert duration.getTimeUnit() != null; + + return duration.getTimeUnit().toMillis(duration.getDurationAmount()); + } + /** {@inheritDoc} */ @Override public GridCacheUpdateAtomicResult<K, V> innerUpdate( GridCacheVersion newVer, @@ -1625,7 +1658,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> @Nullable byte[] valBytes, boolean writeThrough, boolean retval, - long ttl, + @Nullable ExpiryPolicy expiryPlc, boolean evt, boolean metrics, boolean primary, @@ -1668,7 +1701,14 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> Object transformClo = null; if (drResolve) { - drRes = cctx.dr().resolveAtomic(this, op, writeObj, valBytes, ttl, drTtl, drExpireTime, drVer); + drRes = cctx.dr().resolveAtomic(this, + op, + writeObj, + valBytes, + ttlFromPolicy(expiryPlc, isNew()), + drTtl, + drExpireTime, + drVer); if (drRes != null) { if (drRes.isUseOld()) { @@ -1730,26 +1770,6 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> "Invalid version for inner update [entry=" + this + ", newVer=" + newVer + ']'; } - if (drRes == null) { - // Calculate TTL and expire time for local update. - if (drTtl >= 0L) { - assert drExpireTime >= 0L; - - newTtl = drTtl; - newExpireTime = drExpireTime; - } - else { - assert drExpireTime == -1L; - - newTtl = ttl; - - if (newTtl < 0) - newTtl = ttlExtras(); - - newExpireTime = toExpireTime(newTtl); - } - } - // Possibly get old value form store. old = needVal ? rawGetOrUnmarshalUnlocked(!retval) : val; @@ -1777,7 +1797,14 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> if (!F.isEmptyOrNulls(filter)) { boolean pass = cctx.isAll(wrapFilterLocked(), filter); - if (!pass) + if (!pass) { + if (!isNew() && expiryPlc != null) { + Duration duration = expiryPlc.getExpiryForAccess(); + + if (duration != null) + updateTtl(toTtl(duration)); + } + return new GridCacheUpdateAtomicResult<>(false, retval ? old : null, null, @@ -1786,6 +1813,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> null, null, false); + } } // Apply metrics. @@ -1841,6 +1869,46 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> } } + if (drRes == null) { + // Calculate TTL and expire time for local update. + if (drTtl >= 0L) { + assert drExpireTime >= 0L; + + newTtl = drTtl; + newExpireTime = drExpireTime; + } + else { + assert drExpireTime == -1L; + + if (expiryPlc != null) { + if (!hadVal) { + Duration duration = expiryPlc.getExpiryForCreation(); + + if (duration != null && duration.isZero()) + return new GridCacheUpdateAtomicResult<>(false, + retval ? old : null, + null, + 0L, + -1L, + null, + null, + false); + + newTtl = toTtl(duration); + } + else + newTtl = toTtl(expiryPlc.getExpiryForUpdate()); + } + else + newTtl = -1L; + + if (newTtl < 0) + newTtl = ttlExtras(); + + newExpireTime = toExpireTime(newTtl); + } + } + // Try write-through. if (writeThrough) // Must persist inside synchronization in non-tx mode. @@ -1859,7 +1927,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> assert !deletedUnlocked() : "Invalid entry [entry=" + this + ", locNodeId=" + cctx.localNodeId() + ']'; - // Do not change size; + // Do not change size. } if (cctx.portableEnabled()) @@ -2393,6 +2461,28 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> } /** + * @param ttl Time to live. + */ + private void updateTtl(long ttl) { + assert Thread.holdsLock(this); + + if (ttl == -1L) + return; + + long expireTime = toExpireTime(ttl); + + long oldExpireTime = expireTimeExtras(); + + if (oldExpireTime != 0 && expireTime != oldExpireTime && cctx.config().isEagerTtl()) + cctx.ttl().removeTrackedEntry(this); + + ttlAndExpireTimeExtras(ttl, expireTime); + + if (expireTime != 0 && expireTime != oldExpireTime && cctx.config().isEagerTtl()) + cctx.ttl().addTrackedEntry(this); + } + + /** * @return {@code true} If value bytes should be stored. */ protected boolean isStoreValueBytes() { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c2a51321/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProcessor.java index a083805..5dd98b3 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProcessor.java @@ -1596,7 +1596,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { if (cache == null) throw new IllegalArgumentException("Cache is not configured: " + name); - return new IgniteCacheProxy<>(cache); + return new IgniteCacheProxy<>(cache.context(), cache, null); } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c2a51321/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProjectionEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProjectionEx.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProjectionEx.java index d8fbdfe..2544eb9 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProjectionEx.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProjectionEx.java @@ -16,6 +16,7 @@ import org.gridgain.grid.cache.store.*; import org.gridgain.grid.kernal.processors.cache.dr.*; import org.jetbrains.annotations.*; +import javax.cache.expiry.*; import java.util.*; /** @@ -380,4 +381,15 @@ public interface GridCacheProjectionEx<K, V> extends GridCacheProjection<K, V> { * @return Primary entry set. */ public Set<GridCacheEntry<K, V>> primaryEntrySetx(IgnitePredicate<GridCacheEntry<K, V>>... filter); + + /** + * @return {@link ExpiryPolicy} associated with this projection. + */ + public @Nullable ExpiryPolicy expiry(); + + /** + * @param plc {@link ExpiryPolicy} to associate with this projection. + * @return New projection based on this one, but with the specified expiry policy. + */ + public GridCacheProjectionEx<K, V> withExpiryPolicy(ExpiryPolicy plc); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c2a51321/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProjectionImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProjectionImpl.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProjectionImpl.java index 67eb9e8..9a73e64 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProjectionImpl.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProjectionImpl.java @@ -24,6 +24,7 @@ import org.gridgain.grid.util.typedef.*; import org.gridgain.grid.util.typedef.internal.*; import org.jetbrains.annotations.*; +import javax.cache.expiry.*; import java.io.*; import java.util.*; import java.util.concurrent.*; @@ -73,6 +74,9 @@ public class GridCacheProjectionImpl<K, V> implements GridCacheProjectionEx<K, V /** */ private boolean keepPortable; + /** */ + private ExpiryPolicy expiryPlc; + /** * Empty constructor required for {@link Externalizable}. */ @@ -95,7 +99,8 @@ public class GridCacheProjectionImpl<K, V> implements GridCacheProjectionEx<K, V @Nullable IgnitePredicate<? super GridCacheEntry<K, V>> entryFilter, @Nullable Set<GridCacheFlag> flags, @Nullable UUID subjId, - boolean keepPortable) { + boolean keepPortable, + @Nullable ExpiryPolicy expiryPlc) { assert parent != null; assert cctx != null; @@ -125,6 +130,8 @@ public class GridCacheProjectionImpl<K, V> implements GridCacheProjectionEx<K, V qry = new GridCacheQueriesImpl<>(cctx, this); this.keepPortable = keepPortable; + + this.expiryPlc = expiryPlc; } /** @@ -367,8 +374,14 @@ public class GridCacheProjectionImpl<K, V> implements GridCacheProjectionEx<K, V @Override public GridCacheProjectionEx<K, V> forSubjectId(UUID subjId) { A.notNull(subjId, "subjId"); - GridCacheProjectionImpl<K, V> prj = new GridCacheProjectionImpl<>(this, cctx, noNullKvFilter.kvFilter, - noNullEntryFilter.entryFilter, flags, subjId, keepPortable); + GridCacheProjectionImpl<K, V> prj = new GridCacheProjectionImpl<>(this, + cctx, + noNullKvFilter.kvFilter, + noNullEntryFilter.entryFilter, + flags, + subjId, + keepPortable, + expiryPlc); return new GridCacheProxyImpl<>(cctx, prj, prj); } @@ -415,7 +428,8 @@ public class GridCacheProjectionImpl<K, V> implements GridCacheProjectionEx<K, V (IgnitePredicate<GridCacheEntry>)noNullEntryFilter.entryFilter, flags, subjId, - keepPortable); + keepPortable, + expiryPlc); return new GridCacheProxyImpl((GridCacheContext<K1, V1>)cctx, prj, prj); } @@ -439,8 +453,14 @@ public class GridCacheProjectionImpl<K, V> implements GridCacheProjectionEx<K, V } } - GridCacheProjectionImpl<K, V> prj = new GridCacheProjectionImpl<>(this, cctx, kvFilter, - noNullEntryFilter.entryFilter, flags, subjId, keepPortable); + GridCacheProjectionImpl<K, V> prj = new GridCacheProjectionImpl<>(this, + cctx, + kvFilter, + noNullEntryFilter.entryFilter, + flags, + subjId, + keepPortable, + expiryPlc); return new GridCacheProxyImpl<>(cctx, prj, prj); } @@ -463,8 +483,14 @@ public class GridCacheProjectionImpl<K, V> implements GridCacheProjectionEx<K, V } } - GridCacheProjectionImpl<K, V> prj = new GridCacheProjectionImpl<>(this, cctx, noNullKvFilter.kvFilter, - filter, flags, subjId, keepPortable); + GridCacheProjectionImpl<K, V> prj = new GridCacheProjectionImpl<>(this, + cctx, + noNullKvFilter.kvFilter, + filter, + flags, + subjId, + keepPortable, + expiryPlc); return new GridCacheProxyImpl<>(cctx, prj, prj); } @@ -482,8 +508,14 @@ public class GridCacheProjectionImpl<K, V> implements GridCacheProjectionEx<K, V res.addAll(EnumSet.copyOf(F.asList(flags))); - GridCacheProjectionImpl<K, V> prj = new GridCacheProjectionImpl<>(this, cctx, noNullKvFilter.kvFilter, - noNullEntryFilter.entryFilter, res, subjId, keepPortable); + GridCacheProjectionImpl<K, V> prj = new GridCacheProjectionImpl<>(this, + cctx, + noNullKvFilter.kvFilter, + noNullEntryFilter.entryFilter, + res, + subjId, + keepPortable, + expiryPlc); return new GridCacheProxyImpl<>(cctx, prj, prj); } @@ -500,8 +532,14 @@ public class GridCacheProjectionImpl<K, V> implements GridCacheProjectionEx<K, V res.removeAll(EnumSet.copyOf(F.asList(flags))); - GridCacheProjectionImpl<K, V> prj = new GridCacheProjectionImpl<>(this, cctx, noNullKvFilter.kvFilter, - noNullEntryFilter.entryFilter, res, subjId, keepPortable); + GridCacheProjectionImpl<K, V> prj = new GridCacheProjectionImpl<>(this, + cctx, + noNullKvFilter.kvFilter, + noNullEntryFilter.entryFilter, + res, + subjId, + keepPortable, + expiryPlc); return new GridCacheProxyImpl<>(cctx, prj, prj); } @@ -516,7 +554,8 @@ public class GridCacheProjectionImpl<K, V> implements GridCacheProjectionEx<K, V (IgnitePredicate<GridCacheEntry>)noNullEntryFilter.entryFilter, flags, subjId, - true); + true, + expiryPlc); return new GridCacheProxyImpl<>((GridCacheContext<K1, V1>)cctx, prj, prj); } @@ -1242,6 +1281,24 @@ public class GridCacheProjectionImpl<K, V> implements GridCacheProjectionEx<K, V } /** {@inheritDoc} */ + @Override public @Nullable ExpiryPolicy expiry() { + return expiryPlc; + } + + /** {@inheritDoc} */ + @Override public GridCacheProjectionEx<K, V> withExpiryPolicy(ExpiryPolicy plc) { + return new GridCacheProjectionImpl<>( + this, + cctx, + noNullKvFilter.kvFilter, + noNullEntryFilter.entryFilter, + flags, + subjId, + true, + plc); + } + + /** {@inheritDoc} */ @Override public void writeExternal(ObjectOutput out) throws IOException { out.writeObject(cctx); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c2a51321/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProxyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProxyImpl.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProxyImpl.java index 8b6ade8..38cc2ca 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProxyImpl.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProxyImpl.java @@ -24,6 +24,7 @@ import org.gridgain.grid.util.tostring.*; import org.gridgain.grid.util.typedef.internal.*; import org.jetbrains.annotations.*; +import javax.cache.expiry.*; import java.io.*; import java.util.*; import java.util.concurrent.*; @@ -1879,6 +1880,16 @@ public class GridCacheProxyImpl<K, V> implements GridCacheProxy<K, V>, Externali } /** {@inheritDoc} */ + @Nullable @Override public ExpiryPolicy expiry() { + throw new UnsupportedOperationException(); + } + + /** {@inheritDoc} */ + @Override public GridCacheProjectionEx<K, V> withExpiryPolicy(ExpiryPolicy plc) { + throw new UnsupportedOperationException(); + } + + /** {@inheritDoc} */ @Override public String toString() { return S.toString(GridCacheProxyImpl.class, this); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c2a51321/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheExpiryPolicy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheExpiryPolicy.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheExpiryPolicy.java new file mode 100644 index 0000000..f7fe27a --- /dev/null +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheExpiryPolicy.java @@ -0,0 +1,88 @@ +/* @java.file.header */ + +/* _________ _____ __________________ _____ + * __ ____/___________(_)______ /__ ____/______ ____(_)_______ + * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ + */ + +package org.gridgain.grid.kernal.processors.cache.distributed; + +import javax.cache.expiry.*; +import java.io.*; +import java.util.concurrent.*; + +/** + * + */ +public class GridCacheExpiryPolicy implements ExpiryPolicy, Externalizable { + /** */ + private ExpiryPolicy plc; + + /** */ + private static final byte CREATE_TTL_MASK = 0x01; + + /** */ + private static final byte UPDATE_TTL_MASK = 0x02; + + /** */ + private Duration forCreate; + + /** */ + private Duration forUpdate; + + /** + * @param plc Expiry policy. + */ + public GridCacheExpiryPolicy(ExpiryPolicy plc) { + assert plc != null; + + this.plc = plc; + } + + /** {@inheritDoc} */ + @Override public Duration getExpiryForCreation() { + return forCreate; + } + + /** {@inheritDoc} */ + @Override public Duration getExpiryForAccess() { + assert false; + + return null; + } + + /** {@inheritDoc} */ + @Override public Duration getExpiryForUpdate() { + return forUpdate; + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + byte flags = 0; + + Duration create = plc.getExpiryForCreation(); + + if (create != null) + flags |= CREATE_TTL_MASK; + + Duration update = plc.getExpiryForUpdate(); + + if (update != null) + flags |= UPDATE_TTL_MASK; + + out.writeByte(flags); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + byte flags = in.readByte(); + + if ((flags & CREATE_TTL_MASK) != 0) + forCreate = new Duration(TimeUnit.MILLISECONDS, in.readLong()); + + if ((flags & UPDATE_TTL_MASK) != 0) + forUpdate = new Duration(TimeUnit.MILLISECONDS, in.readLong()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c2a51321/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index 4991adb..fd2d98d 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -33,6 +33,7 @@ import org.jdk8.backport.*; import org.jetbrains.annotations.*; import sun.misc.*; +import javax.cache.expiry.*; import java.io.*; import java.util.*; import java.util.concurrent.*; @@ -594,7 +595,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { ctx.checkSecurity(GridSecurityPermission.CACHE_PUT); - UUID subjId = ctx.subjectIdPerCall(null); + GridCacheProjectionImpl<K, V> prj = ctx.projectionPerCall(); + + UUID subjId = ctx.subjectIdPerCall(null); // TODO IGNITE-41. int taskNameHash = ctx.kernalContext().job().currentTaskNameHash(); @@ -611,7 +614,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { retval, rawRetval, cached, - ttl, + prj != null ? prj.expiry() : null, filter, subjId, taskNameHash); @@ -651,7 +654,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { ctx.checkSecurity(GridSecurityPermission.CACHE_REMOVE); - UUID subjId = ctx.subjectIdPerCall(null); + GridCacheProjectionImpl<K, V> prj = ctx.projectionPerCall(); + + UUID subjId = ctx.subjectIdPerCall(null); // TODO IGNITE-41. int taskNameHash = ctx.kernalContext().job().currentTaskNameHash(); @@ -667,7 +672,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { retval, rawRetval, cached, - 0, + (filter != null && prj != null) ? prj.expiry() : null, filter, subjId, taskNameHash); @@ -897,7 +902,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { boolean replicate = ctx.isDrEnabled(); - if (storeEnabled() && keys.size() > 1 && ctx.dr().receiveEnabled()) { + if (storeEnabled() && keys.size() > 1 && !ctx.dr().receiveEnabled()) { // This method can only be used when there are no replicated entries in the batch. UpdateBatchResult<K, V> updRes = updateWithBatch(node, hasNear, req, res, locked, ver, dhtFut, completionCb, replicate, taskName); @@ -1023,6 +1028,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { try { if (!checkFilter(entry, req, res)) { + // TODO IGNITE-41 update TTL. + if (log.isDebugEnabled()) log.debug("Entry did not pass the filter (will skip write) [entry=" + entry + ", filter=" + Arrays.toString(req.filter()) + ", res=" + res + ']'); @@ -1284,6 +1291,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { boolean intercept = ctx.config().getInterceptor() != null; + ExpiryPolicy expiry = req.expiry() != null ? req.expiry() : ctx.expiry(); + // Avoid iterator creation. for (int i = 0; i < keys.size(); i++) { K k = keys.get(i); @@ -1331,7 +1340,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { newValBytes, primary && storeEnabled(), req.returnValue(), - req.ttl(), + expiry, true, true, primary, @@ -1372,14 +1381,22 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { transformC = (IgniteClosure<V, V>)writeVal; if (!readersOnly) - dhtFut.addWriteEntry(entry, updRes.newValue(), newValBytes, transformC, - drExpireTime >= 0L ? ttl : -1L, drExpireTime, newDrVer, drExpireTime < 0L ? ttl : 0L); + dhtFut.addWriteEntry(entry, + updRes.newValue(), + newValBytes, + transformC, + drExpireTime >= 0L ? ttl : -1L, + drExpireTime, + newDrVer, + drExpireTime < 0L ? req.expiry() : null); if (!F.isEmpty(filteredReaders)) dhtFut.addNearWriteEntries(filteredReaders, entry, updRes.newValue(), newValBytes, - transformC, drExpireTime < 0L ? ttl : 0L); + transformC, drExpireTime < 0L ? req.expiry() : null); } else { + // TODO IGNITE-41 ttl could be changed. + if (log.isDebugEnabled()) log.debug("Entry did not pass the filter or conflict resolution (will skip write) " + "[entry=" + entry + ", filter=" + Arrays.toString(req.filter()) + ']'); @@ -1391,7 +1408,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { if (!ctx.affinity().belongs(node, entry.partition(), topVer)) { GridDrResolveResult<V> ctx = updRes.drResolveResult(); - res.nearTtl(updRes.newTtl()); + // TODO IGNITE-41 dr ttl for near cache. if (ctx != null && ctx.isMerge()) newValBytes = null; @@ -1524,6 +1541,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { boolean intercept = ctx.config().getInterceptor() != null; + ExpiryPolicy expiry = req.expiry() != null ? req.expiry() : ctx.expiry(); + // Avoid iterator creation. for (int i = 0; i < entries.size(); i++) { GridDhtCacheEntry<K, V> entry = entries.get(i); @@ -1562,12 +1581,12 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { null, false, false, - req.ttl(), + expiry, true, true, primary, ctx.config().getAtomicWriteOrderMode() == CLOCK, // Check version in CLOCK mode on primary node. - req.filter(), + req.filter(), // TODO IGNITE-41 filter already evaluated? replicate ? primary ? DR_PRIMARY : DR_BACKUP : DR_NONE, -1L, -1L, @@ -1605,11 +1624,11 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { IgniteClosure<V, V> transformC = transformMap == null ? null : transformMap.get(entry.key()); if (!batchRes.readersOnly()) - dhtFut.addWriteEntry(entry, writeVal, valBytes, transformC, -1, -1, null, req.ttl()); + dhtFut.addWriteEntry(entry, writeVal, valBytes, transformC, -1, -1, null, req.expiry()); if (!F.isEmpty(filteredReaders)) dhtFut.addNearWriteEntries(filteredReaders, entry, writeVal, valBytes, transformC, - req.ttl()); + req.expiry()); } if (hasNear) { @@ -1625,8 +1644,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { res.addNearValue(idx, writeVal, valBytes); } - res.nearTtl(req.ttl()); - if (writeVal != null || !entry.valueBytes().isNull()) { IgniteFuture<Boolean> f = entry.addReader(node.id(), req.messageId(), topVer); @@ -1861,9 +1878,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { drPutVals = new ArrayList<>(size); for (int i = 0; i < size; i++) { - Long ttl = req.drTtl(i); + long ttl = req.drTtl(i); - if (ttl == null) + if (ttl == -1L) drPutVals.add(new GridCacheDrInfo<>(req.value(i), req.drVersion(i))); else drPutVals.add(new GridCacheDrExpirationInfo<>(req.value(i), req.drVersion(i), ttl, @@ -1894,7 +1911,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { req.returnValue(), false, null, - req.ttl(), + req.expiry(), req.filter(), req.subjectId(), req.taskNameHash()); @@ -2020,6 +2037,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { String taskName = ctx.kernalContext().task().resolveTaskName(req.taskNameHash()); + ExpiryPolicy expiry = req.expiry() != null ? req.expiry() : ctx.expiry(); + for (int i = 0; i < req.size(); i++) { K key = req.key(i); @@ -2048,7 +2067,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { valBytes, /*write-through*/false, /*retval*/false, - req.ttl(), + expiry, /*event*/true, /*metrics*/true, /*primary*/false, http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c2a51321/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java index 982c777..3c7da7b 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java @@ -23,6 +23,7 @@ import org.gridgain.grid.util.typedef.internal.*; import org.jdk8.backport.*; import org.jetbrains.annotations.*; +import javax.cache.expiry.*; import java.io.*; import java.util.*; import java.util.concurrent.*; @@ -201,10 +202,16 @@ public class GridDhtAtomicUpdateFuture<K, V> extends GridFutureAdapter<Void> * @param drTtl DR TTL (optional). * @param drExpireTime DR expire time (optional). * @param drVer DR version (optional). - * @param ttl Time to live. + * @param expiryPlc Expiry policy. */ - public void addWriteEntry(GridDhtCacheEntry<K, V> entry, @Nullable V val, @Nullable byte[] valBytes, - IgniteClosure<V, V> transformC, long drTtl, long drExpireTime, @Nullable GridCacheVersion drVer, long ttl) { + public void addWriteEntry(GridDhtCacheEntry<K, V> entry, + @Nullable V val, + @Nullable byte[] valBytes, + IgniteClosure<V, V> transformC, + long drTtl, + long drExpireTime, + @Nullable GridCacheVersion drVer, + @Nullable ExpiryPolicy expiryPlc) { long topVer = updateReq.topologyVersion(); Collection<ClusterNode> dhtNodes = cctx.dht().topology().nodes(entry.partition(), topVer); @@ -230,7 +237,7 @@ public class GridDhtAtomicUpdateFuture<K, V> extends GridFutureAdapter<Void> writeVer, syncMode, topVer, - ttl, + expiryPlc, forceTransformBackups, this.updateReq.subjectId(), this.updateReq.taskNameHash()); @@ -249,10 +256,14 @@ public class GridDhtAtomicUpdateFuture<K, V> extends GridFutureAdapter<Void> * @param entry Entry. * @param val Value. * @param valBytes Value bytes. - * @param ttl Time to live. + * @param expiryPlc Expiry policy.. */ - public void addNearWriteEntries(Iterable<UUID> readers, GridDhtCacheEntry<K, V> entry, @Nullable V val, - @Nullable byte[] valBytes, IgniteClosure<V, V> transformC, long ttl) { + public void addNearWriteEntries(Iterable<UUID> readers, + GridDhtCacheEntry<K, V> entry, + @Nullable V val, + @Nullable byte[] valBytes, + IgniteClosure<V, V> transformC, + @Nullable ExpiryPolicy expiryPlc) { GridCacheWriteSynchronizationMode syncMode = updateReq.writeSynchronizationMode(); keys.add(entry.key()); @@ -276,7 +287,7 @@ public class GridDhtAtomicUpdateFuture<K, V> extends GridFutureAdapter<Void> writeVer, syncMode, topVer, - ttl, + expiryPlc, forceTransformBackups, this.updateReq.subjectId(), this.updateReq.taskNameHash()); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c2a51321/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java index ad3f8da..c3b0918 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java @@ -20,6 +20,7 @@ import org.gridgain.grid.util.tostring.*; import org.gridgain.grid.util.typedef.internal.*; import org.jetbrains.annotations.*; +import javax.cache.expiry.*; import java.io.*; import java.nio.*; import java.util.*; @@ -79,8 +80,11 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp /** Write synchronization mode. */ private GridCacheWriteSynchronizationMode syncMode; - /** Time to live. */ - private long ttl; + /** Expiry policy. */ + private ExpiryPolicy expiryPlc; + + /** Expiry policy bytes. */ + private byte[] expiryPlcBytes; /** Keys to update. */ @GridToStringInclude @@ -150,7 +154,7 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp * @param writeVer Write version for cache values. * @param syncMode Cache write synchronization mode. * @param topVer Topology version. - * @param ttl Time to live. + * @param expiryPlc Expiry policy. * @param forceTransformBackups Force transform backups flag. * @param subjId Subject ID. */ @@ -161,7 +165,7 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp GridCacheVersion writeVer, GridCacheWriteSynchronizationMode syncMode, long topVer, - long ttl, + @Nullable ExpiryPolicy expiryPlc, boolean forceTransformBackups, UUID subjId, int taskNameHash @@ -171,7 +175,7 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp this.futVer = futVer; this.writeVer = writeVer; this.syncMode = syncMode; - this.ttl = ttl; + this.expiryPlc = expiryPlc; this.topVer = topVer; this.forceTransformBackups = forceTransformBackups; this.subjId = subjId; @@ -360,10 +364,10 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp } /** - * @return Time to live. + * @return Expiry policy. */ - public long ttl() { - return ttl; + @Nullable public ExpiryPolicy expiry() { + return expiryPlc; } /** @@ -621,7 +625,7 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp _clone.drTtls = drTtls; _clone.drExpireTimes = drExpireTimes; _clone.syncMode = syncMode; - _clone.ttl = ttl; + _clone.expiryPlc = expiryPlc; _clone.nearKeys = nearKeys; _clone.nearKeyBytes = nearKeyBytes; _clone.nearVals = nearVals; @@ -742,7 +746,7 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp commState.idx++; case 11: - if (!commState.putLong(ttl)) + if (!commState.putByteArray(expiryPlcBytes)) return false; commState.idx++; @@ -1037,10 +1041,12 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp commState.idx++; case 11: - if (buf.remaining() < 8) + byte[] expiryPlcBytes0 = commState.getByteArray(); + + if (expiryPlcBytes0 == BYTE_ARR_NOT_READ) return false; - ttl = commState.getLong(); + expiryPlcBytes = expiryPlcBytes0; commState.idx++; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c2a51321/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java index d660112..4750462 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java @@ -26,6 +26,7 @@ import org.gridgain.grid.util.typedef.internal.*; import org.jdk8.backport.*; import org.jetbrains.annotations.*; +import javax.cache.expiry.*; import java.io.*; import java.util.*; import java.util.concurrent.*; @@ -93,8 +94,8 @@ public class GridNearAtomicUpdateFuture<K, V> extends GridFutureAdapter<Object> /** Cached entry if keys size is 1. */ private GridCacheEntryEx<K, V> cached; - /** Time to live. */ - private final long ttl; + /** Expiry policy. */ + private final ExpiryPolicy expiryPlc; /** Future map topology version. */ private long topVer; @@ -141,7 +142,7 @@ public class GridNearAtomicUpdateFuture<K, V> extends GridFutureAdapter<Object> futVer = null; retval = false; fastMap = false; - ttl = 0; + expiryPlc = null; filter = null; syncMode = null; op = null; @@ -162,7 +163,7 @@ public class GridNearAtomicUpdateFuture<K, V> extends GridFutureAdapter<Object> * @param retval Return value require flag. * @param rawRetval {@code True} if should return {@code GridCacheReturn} as future result. * @param cached Cached entry if keys size is 1. - * @param ttl Time to live. + * @param expiryPlc Expiry policy. * @param filter Entry filter. */ public GridNearAtomicUpdateFuture( @@ -177,7 +178,7 @@ public class GridNearAtomicUpdateFuture<K, V> extends GridFutureAdapter<Object> final boolean retval, final boolean rawRetval, @Nullable GridCacheEntryEx<K, V> cached, - long ttl, + @Nullable ExpiryPolicy expiryPlc, final IgnitePredicate<GridCacheEntry<K, V>>[] filter, UUID subjId, int taskNameHash @@ -201,7 +202,7 @@ public class GridNearAtomicUpdateFuture<K, V> extends GridFutureAdapter<Object> this.drRmvVals = drRmvVals; this.retval = retval; this.cached = cached; - this.ttl = ttl; + this.expiryPlc = expiryPlc; this.filter = filter; this.subjId = subjId; this.taskNameHash = taskNameHash; @@ -556,7 +557,7 @@ public class GridNearAtomicUpdateFuture<K, V> extends GridFutureAdapter<Object> op, retval, op == TRANSFORM && cctx.hasFlag(FORCE_TRANSFORM_BACKUP), - ttl, + expiryPlc, filter, subjId, taskNameHash); @@ -662,7 +663,7 @@ public class GridNearAtomicUpdateFuture<K, V> extends GridFutureAdapter<Object> op, retval, op == TRANSFORM && cctx.hasFlag(FORCE_TRANSFORM_BACKUP), - ttl, + expiryPlc, filter, subjId, taskNameHash); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c2a51321/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java index 00512ff..3eca7e2 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java @@ -14,12 +14,14 @@ import org.apache.ignite.lang.*; import org.gridgain.grid.cache.*; import org.gridgain.grid.kernal.*; import org.gridgain.grid.kernal.processors.cache.*; +import org.gridgain.grid.kernal.processors.cache.distributed.*; import org.gridgain.grid.util.*; import org.gridgain.grid.util.direct.*; import org.gridgain.grid.util.tostring.*; import org.gridgain.grid.util.typedef.internal.*; import org.jetbrains.annotations.*; +import javax.cache.expiry.*; import java.io.*; import java.nio.*; import java.util.*; @@ -88,8 +90,11 @@ public class GridNearAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> im /** Return value flag. */ private boolean retval; - /** Time to live. */ - private long ttl; + /** Expiry policy. */ + private ExpiryPolicy expiryPlc; + + /** Expiry policy bytes. */ + private byte[] expiryPlcBytes; /** Filter. */ @GridDirectTransient @@ -132,7 +137,7 @@ public class GridNearAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> im * @param syncMode Synchronization mode. * @param op Cache update operation. * @param retval Return value required flag. - * @param ttl Time to live. + * @param expiryPlc Expiry policy. * @param filter Optional filter for atomic check. */ public GridNearAtomicUpdateRequest( @@ -146,7 +151,7 @@ public class GridNearAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> im GridCacheOperation op, boolean retval, boolean forceTransformBackups, - long ttl, + ExpiryPolicy expiryPlc, @Nullable IgnitePredicate<GridCacheEntry<K, V>>[] filter, @Nullable UUID subjId, int taskNameHash @@ -162,7 +167,7 @@ public class GridNearAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> im this.op = op; this.retval = retval; this.forceTransformBackups = forceTransformBackups; - this.ttl = ttl; + this.expiryPlc = expiryPlc; this.filter = filter; this.subjId = subjId; this.taskNameHash = taskNameHash; @@ -240,10 +245,10 @@ public class GridNearAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> im } /** - * @return Time to live. + * @return Expiry policy. */ - public long ttl() { - return ttl; + public ExpiryPolicy expiry() { + return expiryPlc; } /** @@ -485,6 +490,9 @@ public class GridNearAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> im keyBytes = marshalCollection(keys, ctx); valBytes = marshalValuesCollection(vals, ctx); filterBytes = marshalFilter(filter, ctx); + + if (expiryPlc != null) + expiryPlcBytes = CU.marshal(ctx, new GridCacheExpiryPolicy(expiryPlc)); } /** {@inheritDoc} */ @@ -494,6 +502,9 @@ public class GridNearAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> im keys = unmarshalCollection(keyBytes, ctx, ldr); vals = unmarshalValueBytesCollection(valBytes, ctx, ldr); filter = unmarshalFilter(filterBytes, ctx, ldr); + + if (expiryPlcBytes != null) + expiryPlc = ctx.marshaller().unmarshal(expiryPlcBytes, ldr); } /** {@inheritDoc} */ @@ -527,7 +538,7 @@ public class GridNearAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> im _clone.drTtls = drTtls; _clone.drExpireTimes = drExpireTimes; _clone.retval = retval; - _clone.ttl = ttl; + _clone.expiryPlc = expiryPlc; _clone.filter = filter; _clone.filterBytes = filterBytes; _clone.hasPrimary = hasPrimary; @@ -688,7 +699,7 @@ public class GridNearAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> im commState.idx++; case 15: - if (!commState.putLong(ttl)) + if (!commState.putByteArray(expiryPlcBytes)) return false; commState.idx++; @@ -928,10 +939,12 @@ public class GridNearAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> im commState.idx++; case 15: - if (buf.remaining() < 8) + byte[] expiryPlcBytes0 = commState.getByteArray(); + + if (expiryPlcBytes0 == BYTE_ARR_NOT_READ) return false; - ttl = commState.getLong(); + expiryPlcBytes = expiryPlcBytes0; commState.idx++; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c2a51321/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java index fe16214..fd4d7dc 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java @@ -94,10 +94,6 @@ public class GridNearAtomicUpdateResponse<K, V> extends GridCacheMessage<K, V> i @GridDirectVersion(1) private GridCacheVersion nearVer; - /** Ttl to be used for originating node's near cache update. */ - @GridDirectVersion(1) - private long nearTtl; - /** * Empty constructor required by {@link Externalizable}. */ @@ -204,20 +200,6 @@ public class GridNearAtomicUpdateResponse<K, V> extends GridCacheMessage<K, V> i } /** - * @param ttl Time to live to be used for originating node's near cache update. - */ - public void nearTtl(long ttl) { - nearTtl = ttl; - } - - /** - * @return Time to live to be used for originating node's near cache update. - */ - public long nearTtl() { - return nearTtl; - } - - /** * @param nearVer Version generated on primary node to be used for originating node's near cache update. */ public void nearVersion(GridCacheVersion nearVer) { @@ -384,7 +366,6 @@ public class GridNearAtomicUpdateResponse<K, V> extends GridCacheMessage<K, V> i _clone.nearVals = nearVals; _clone.nearValBytes = nearValBytes; _clone.nearVer = nearVer; - _clone.nearTtl = nearTtl; } /** {@inheritDoc} */ @@ -461,12 +442,6 @@ public class GridNearAtomicUpdateResponse<K, V> extends GridCacheMessage<K, V> i commState.idx++; case 9: - if (!commState.putLong(nearTtl)) - return false; - - commState.idx++; - - case 10: if (nearValBytes != null) { if (commState.it == null) { if (!commState.putInt(nearValBytes.size())) @@ -493,7 +468,7 @@ public class GridNearAtomicUpdateResponse<K, V> extends GridCacheMessage<K, V> i commState.idx++; - case 11: + case 10: if (nearValsIdxs != null) { if (commState.it == null) { if (!commState.putInt(nearValsIdxs.size())) @@ -520,7 +495,7 @@ public class GridNearAtomicUpdateResponse<K, V> extends GridCacheMessage<K, V> i commState.idx++; - case 12: + case 11: if (!commState.putCacheVersion(nearVer)) return false; @@ -620,14 +595,6 @@ public class GridNearAtomicUpdateResponse<K, V> extends GridCacheMessage<K, V> i commState.idx++; case 9: - if (buf.remaining() < 8) - return false; - - nearTtl = commState.getLong(); - - commState.idx++; - - case 10: if (commState.readSize == -1) { if (buf.remaining() < 4) return false; @@ -656,7 +623,7 @@ public class GridNearAtomicUpdateResponse<K, V> extends GridCacheMessage<K, V> i commState.idx++; - case 11: + case 10: if (commState.readSize == -1) { if (buf.remaining() < 4) return false; @@ -685,7 +652,7 @@ public class GridNearAtomicUpdateResponse<K, V> extends GridCacheMessage<K, V> i commState.idx++; - case 12: + case 11: GridCacheVersion nearVer0 = commState.getCacheVersion(); if (nearVer0 == CACHE_VER_NOT_READ) http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c2a51321/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearAtomicCache.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearAtomicCache.java index ce5e19c..2aa32c3 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearAtomicCache.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearAtomicCache.java @@ -23,6 +23,7 @@ import org.gridgain.grid.util.typedef.*; import org.gridgain.grid.util.typedef.internal.*; import org.jetbrains.annotations.*; +import javax.cache.expiry.*; import java.io.*; import java.util.*; @@ -148,7 +149,13 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> { } try { - processNearAtomicUpdateResponse(ver, key, val, valBytes, res.nearTtl(), req.nodeId(), req.subjectId(), + processNearAtomicUpdateResponse(ver, + key, + val, + valBytes, + req.expiry(), + req.nodeId(), + req.subjectId(), taskName); } catch (IgniteCheckedException e) { @@ -162,7 +169,7 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> { * @param key Key. * @param val Value. * @param valBytes Value bytes. - * @param ttl Time to live. + * @param expiryPlc Expiry policy. * @param nodeId Node ID. * @throws IgniteCheckedException If failed. */ @@ -171,7 +178,7 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> { K key, @Nullable V val, @Nullable byte[] valBytes, - Long ttl, + ExpiryPolicy expiryPlc, UUID nodeId, UUID subjId, String taskName @@ -196,7 +203,7 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> { valBytes, /*write-through*/false, /*retval*/false, - ttl, + expiryPlc != null ? expiryPlc : ctx.expiry(), /*event*/true, /*metrics*/true, /*primary*/false, @@ -253,6 +260,8 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> { String taskName = ctx.kernalContext().task().resolveTaskName(req.taskNameHash()); + ExpiryPolicy expiry = req.expiry() != null ? req.expiry() : ctx.expiry(); + for (int i = 0; i < req.nearSize(); i++) { K key = req.nearKey(i); @@ -292,7 +301,7 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> { valBytes, /*write-through*/false, /*retval*/false, - req.ttl(), + expiry, /*event*/true, /*metrics*/true, /*primary*/false, http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c2a51321/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheTest.java index 88abfc4..ef0b4c3 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheTest.java @@ -46,6 +46,8 @@ public class IgniteCacheTest extends GridCommonAbstractTest { assert cnt >= 1 : "At least one grid must be started"; startGridsMultiThreaded(cnt); + + awaitPartitionMapExchange(); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c2a51321/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyTest.java index f72619b..0d22f62 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyTest.java @@ -10,15 +10,25 @@ package org.apache.ignite.internal.processors.cache.expiry; import org.apache.ignite.*; +import org.apache.ignite.cluster.*; import org.apache.ignite.internal.processors.cache.*; import org.gridgain.grid.cache.*; +import org.gridgain.grid.kernal.*; +import org.gridgain.grid.kernal.processors.cache.*; import org.gridgain.grid.util.lang.*; +import org.gridgain.grid.util.typedef.internal.*; import org.gridgain.testframework.*; +import org.jetbrains.annotations.*; import javax.cache.configuration.*; import javax.cache.expiry.*; +import java.util.*; import java.util.concurrent.*; +import static org.gridgain.grid.cache.GridCacheAtomicityMode.*; +import static org.gridgain.grid.cache.GridCacheDistributionMode.*; +import static org.gridgain.grid.cache.GridCacheMode.*; + /** * */ @@ -36,59 +46,82 @@ public class IgniteCacheExpiryPolicyTest extends IgniteCacheTest { stopAllGrids(); } - @Override - protected int gridCount() { + /** {@inheritDoc} */ + @Override protected int gridCount() { return 2; } /** - * + * @throws Exception If failed. */ - private class TestCreatedPolicy implements ExpiryPolicy { - /** */ - private final Duration duration; + public void testCreated() throws Exception { + factory = new FactoryBuilder.SingletonFactory<>(new TestPolicy(60_000L, null, null)); - /** - * @param ttl TTL for creation. - */ - TestCreatedPolicy(long ttl) { - this.duration = new Duration(TimeUnit.MILLISECONDS, ttl); - } + startGrids(); - /** {@inheritDoc} */ - @Override public Duration getExpiryForCreation() { - return duration; - } + Collection<Integer> keys = keys(); - /** {@inheritDoc} */ - @Override public Duration getExpiryForAccess() { - return null; - } + IgniteCache<Integer, Integer> cache = jcache(0); - /** {@inheritDoc} */ - @Override public Duration getExpiryForUpdate() { - return null; + for (final Integer key : keys) { + log.info("Test key: " + key); + + cache.put(key, 1); + + checkTtl(key, 60_000); + + for (int i = 0; i < gridCount(); i++) { + assertEquals((Integer)1, cache.get(key)); + + checkTtl(key, 60_000); + } + + cache.withExpiryPolicy(new TestPolicy(1000L, null, null)).put(key, 2); // Update, should not change TTL. + + checkTtl(key, 60_000); + + assertEquals((Integer)2, cache.get(key)); + + assertTrue(cache.remove(key)); + + cache.withExpiryPolicy(new TestPolicy(1000L, null, null)).put(key, 3); // Create with provided TTL. + + checkTtl(key, 1000); + + waitExpired(key); } } /** + * @return Test keys. * @throws Exception If failed. */ - public void testCreated() throws Exception { - factory = new FactoryBuilder.SingletonFactory<>(new TestCreatedPolicy(1000)); + private Collection<Integer> keys() throws Exception { + GridCache<Integer, Object> cache = cache(0); - startGrids(); + Collection<Integer> keys = new ArrayList<>(); - final Integer key = 1; + keys.add(primaryKey(cache)); - IgniteCache<Integer, Integer> cache = jcache(0); + if (gridCount() > 1) { + keys.add(backupKey(cache)); + + if (cache.configuration().getDistributionMode() == NEAR_PARTITIONED) + keys.add(nearKey(cache)); + } - cache.put(1, 1); + return keys; + } + /** + * @param key Key. + * @throws Exception If failed. + */ + private void waitExpired(final Integer key) throws Exception { GridTestUtils.waitForCondition(new GridAbsPredicate() { @Override public boolean apply() { for (int i = 0; i < gridCount(); i++) { - Object val = cache(i).peek(key); + Object val = jcache(i).localPeek(key); log.info("Value [grid=" + i + ", val=" + val + ']'); @@ -98,10 +131,50 @@ public class IgniteCacheExpiryPolicyTest extends IgniteCacheTest { return false; } - }, 2000); + }, 3000); + + GridCache<Integer, Object> cache = cache(0); + + for (int i = 0; i < gridCount(); i++) { + ClusterNode node = grid(i).cluster().localNode(); + + Object val = jcache(i).localPeek(key); + + log.info("Value [grid=" + i + + ", primary=" + cache.affinity().isPrimary(node, key) + + ", backup=" + cache.affinity().isBackup(node, key) + ']'); + + assertNull("Unexpected non-null value for grid " + i, val); + } for (int i = 0; i < gridCount(); i++) - assertNull("Unexpected non-null value for grid " + i, cache.get(key)); + assertNull("Unexpected non-null value for grid " + i, jcache(i).get(key)); + } + + /** + * @param key Key. + * @param ttl TTL. + * @throws Exception If failed. + */ + private void checkTtl(Object key, long ttl) throws Exception { + for (int i = 0; i < gridCount(); i++) { + GridKernal grid = (GridKernal)grid(i); + + GridCacheAdapter<Object, Object> cache = grid.context().cache().internalCache(); + + GridCacheEntryEx<Object, Object> e = cache.peekEx(key); + + if (e == null && cache.context().isNear()) + e = cache.context().near().dht().peekEx(key); + + if (e == null) { + assertTrue(i > 0); + + assertTrue(!cache.affinity().isPrimaryOrBackup(grid.localNode(), key)); + } + else + assertEquals("Unexpected ttl for grid " + i, ttl, e.ttl()); + } } /** {@inheritDoc} */ @@ -110,11 +183,61 @@ public class IgniteCacheExpiryPolicyTest extends IgniteCacheTest { GridCacheConfiguration cfg = super.cacheConfiguration(gridName); - cfg.setCacheMode(GridCacheMode.PARTITIONED); - cfg.setAtomicityMode(GridCacheAtomicityMode.ATOMIC); + cfg.setCacheMode(PARTITIONED); + cfg.setAtomicityMode(ATOMIC); + cfg.setBackups(1); + + cfg.setDistributionMode(PARTITIONED_ONLY); cfg.setExpiryPolicyFactory(factory); return cfg; } + + /** + * + */ + private class TestPolicy implements ExpiryPolicy { + /** */ + private Long create; + + /** */ + private Long access; + + /** */ + private Long update; + + /** + * @param create TTL for creation. + * @param access TTL for access. + * @param update TTL for update. + */ + TestPolicy(@Nullable Long create, + @Nullable Long access, + @Nullable Long update) { + this.create = create; + this.access = access; + this.update = update; + } + + /** {@inheritDoc} */ + @Override public Duration getExpiryForCreation() { + return create != null ? new Duration(TimeUnit.MILLISECONDS, create) : null; + } + + /** {@inheritDoc} */ + @Override public Duration getExpiryForAccess() { + return access != null ? new Duration(TimeUnit.MILLISECONDS, access) : null; + } + + /** {@inheritDoc} */ + @Override public Duration getExpiryForUpdate() { + return update != null ? new Duration(TimeUnit.MILLISECONDS, update) : null; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(TestPolicy.class, this); + } + } }
