IGNITE-4036 - Fix. Near cache is not expired together with corresponding server cache
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/f5e601e2 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/f5e601e2 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/f5e601e2 Branch: refs/heads/ignite-1.9 Commit: f5e601e2973bfa81593241e55e4b6f97c0e55c3c Parents: 6f6ff39 Author: dkarachentsev <[email protected]> Authored: Thu Jan 26 10:18:34 2017 +0300 Committer: dkarachentsev <[email protected]> Committed: Thu Jan 26 10:18:34 2017 +0300 ---------------------------------------------------------------------- .../processors/cache/EntryGetResult.java | 40 +++++- .../processors/cache/EntryGetWithTtlResult.java | 58 +++++++++ .../processors/cache/GridCacheAdapter.java | 69 +++++------ .../processors/cache/GridCacheContext.java | 122 +++++++++++++++---- .../processors/cache/GridCacheEntryEx.java | 2 +- .../processors/cache/GridCacheMapEntry.java | 29 +++-- .../distributed/dht/GridDhtCacheAdapter.java | 4 +- .../cache/distributed/dht/GridDhtGetFuture.java | 24 ++-- .../distributed/dht/GridDhtGetSingleFuture.java | 24 ++-- .../dht/GridPartitionedGetFuture.java | 19 ++- .../dht/GridPartitionedSingleGetFuture.java | 7 +- .../dht/atomic/GridDhtAtomicCache.java | 12 +- .../dht/colocated/GridDhtColocatedCache.java | 15 ++- .../distributed/near/GridNearGetFuture.java | 9 +- .../cache/distributed/near/GridNearTxLocal.java | 8 +- .../local/atomic/GridLocalAtomicCache.java | 13 +- .../transactions/IgniteTxLocalAdapter.java | 62 ++++++---- .../processors/cache/GridCacheTestEntryEx.java | 2 +- .../IgniteCacheExpiryPolicyAbstractTest.java | 44 ++++++- 19 files changed, 411 insertions(+), 152 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/f5e601e2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/EntryGetResult.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/EntryGetResult.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/EntryGetResult.java index a34ddae..9d06448 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/EntryGetResult.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/EntryGetResult.java @@ -24,7 +24,7 @@ import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; */ public class EntryGetResult { /** */ - private final CacheObject val; + private Object val; /** */ private final GridCacheVersion ver; @@ -35,18 +35,34 @@ public class EntryGetResult { /** * @param val Value. * @param ver Version. + * @param reserved Reserved flag. */ - EntryGetResult(CacheObject val, GridCacheVersion ver, boolean reserved) { + public EntryGetResult(Object val, GridCacheVersion ver, boolean reserved) { this.val = val; this.ver = ver; this.reserved = reserved; } /** + * @param val Value. + * @param ver Version. + */ + public EntryGetResult(Object val, GridCacheVersion ver) { + this(val, ver, false); + } + + /** * @return Value. */ - public CacheObject value() { - return val; + public <T> T value() { + return (T)val; + } + + /** + * @param val Value. + */ + public void value(Object val) { + this.val = val; } /** @@ -57,9 +73,23 @@ public class EntryGetResult { } /** - * @return Reserved flag, + * @return Reserved flag. */ public boolean reserved() { return reserved; } + + /** + * @return Entry expire time. + */ + public long expireTime() { + return 0L; + } + + /** + * @return Entry time to live. + */ + public long ttl() { + return 0L; + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/f5e601e2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/EntryGetWithTtlResult.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/EntryGetWithTtlResult.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/EntryGetWithTtlResult.java new file mode 100644 index 0000000..fddf16e --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/EntryGetWithTtlResult.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; + +/** + * + */ +public class EntryGetWithTtlResult extends EntryGetResult { + /** */ + private final long expireTime; + + /** */ + private final long ttl; + + /** + * @param val Value. + * @param ver Version. + * @param reserved Reserved flag. + * @param expireTime Entry expire time. + * @param ttl Entry time to live. + */ + public EntryGetWithTtlResult(Object val, GridCacheVersion ver, boolean reserved, long expireTime, long ttl) { + super(val, ver, reserved); + this.expireTime = expireTime; + this.ttl = ttl; + } + + /** + * @return Entry expire time. + */ + @Override public long expireTime() { + return expireTime; + } + + /** + * @return Entry time to live. + */ + @Override public long ttl() { + return ttl; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/f5e601e2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index dc8f030..11bf34b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -1421,12 +1421,13 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V if (keepBinary) key = (K)ctx.toCacheKeyObject(key); - T2<V, GridCacheVersion> t = (T2<V, GridCacheVersion>)get(key, !keepBinary, true); + EntryGetResult t + = (EntryGetResult)get(key, !keepBinary, true); CacheEntry<K, V> val = t != null ? new CacheEntryImplEx<>( keepBinary ? (K)ctx.unwrapBinaryIfNeeded(key, true, false) : key, - t.get1(), - t.get2()) + (V)t.value(), + t.version()) : null; if (ctx.config().getInterceptor() != null) { @@ -1434,7 +1435,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V V val0 = (V)ctx.config().getInterceptor().onGet(key, t != null ? val.getValue() : null); - val = (val0 != null) ? new CacheEntryImplEx<>(key, val0, t != null ? t.get2() : null) : null; + val = (val0 != null) ? new CacheEntryImplEx<>(key, val0, t != null ? t.version() : null) : null; } if (statsEnabled) @@ -1484,29 +1485,29 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V final K key0 = keepBinary ? (K)ctx.toCacheKeyObject(key) : key; - IgniteInternalFuture<T2<V, GridCacheVersion>> fut = - (IgniteInternalFuture<T2<V, GridCacheVersion>>)getAsync(key0, !keepBinary, true); + IgniteInternalFuture<EntryGetResult> fut = + (IgniteInternalFuture<EntryGetResult>)getAsync(key0, !keepBinary, true); final boolean intercept = ctx.config().getInterceptor() != null; IgniteInternalFuture<CacheEntry<K, V>> fr = fut.chain( - new CX1<IgniteInternalFuture<T2<V, GridCacheVersion>>, CacheEntry<K, V>>() { - @Override public CacheEntry<K, V> applyx(IgniteInternalFuture<T2<V, GridCacheVersion>> f) + new CX1<IgniteInternalFuture<EntryGetResult>, CacheEntry<K, V>>() { + @Override public CacheEntry<K, V> applyx(IgniteInternalFuture<EntryGetResult> f) throws IgniteCheckedException { - T2<V, GridCacheVersion> t = f.get(); + EntryGetResult t = f.get(); K key = keepBinary ? (K)ctx.unwrapBinaryIfNeeded(key0, true, false) : key0; CacheEntry val = t != null ? new CacheEntryImplEx<>( key, - t.get1(), - t.get2()) + t.value(), + t.version()) : null; if (intercept) { V val0 = (V)ctx.config().getInterceptor().onGet(key, t != null ? val.getValue() : null); - return val0 != null ? new CacheEntryImplEx(key, val0, t != null ? t.get2() : null) : null; + return val0 != null ? new CacheEntryImplEx(key, val0, t != null ? t.version() : null) : null; } else return val; @@ -1514,7 +1515,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V }); if (statsEnabled) - fut.listen(new UpdateGetTimeStatClosure<T2<V, GridCacheVersion>>(metrics0(), start)); + fut.listen(new UpdateGetTimeStatClosure<EntryGetResult>(metrics0(), start)); return fr; } @@ -1547,15 +1548,15 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V long start = statsEnabled ? System.nanoTime() : 0L; - Map<K, T2<V, GridCacheVersion>> map = (Map<K, T2<V, GridCacheVersion>>)getAll(keys, !ctx.keepBinary(), true); + Map<K, EntryGetResult> map = (Map<K, EntryGetResult>)getAll(keys, !ctx.keepBinary(), true); Collection<CacheEntry<K, V>> res = new HashSet<>(); if (ctx.config().getInterceptor() != null) res = interceptGetEntries(keys, map); else - for (Map.Entry<K, T2<V, GridCacheVersion>> e : map.entrySet()) - res.add(new CacheEntryImplEx<>(e.getKey(), e.getValue().get1(), e.getValue().get2())); + for (Map.Entry<K, EntryGetResult> e : map.entrySet()) + res.add(new CacheEntryImplEx<>(e.getKey(), (V)e.getValue().value(), e.getValue().version())); if (statsEnabled) metrics0().addGetTimeNanos(System.nanoTime() - start); @@ -1595,24 +1596,24 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V final long start = statsEnabled ? System.nanoTime() : 0L; - IgniteInternalFuture<Map<K, T2<V, GridCacheVersion>>> fut = - (IgniteInternalFuture<Map<K, T2<V, GridCacheVersion>>>) + IgniteInternalFuture<Map<K, EntryGetResult>> fut = + (IgniteInternalFuture<Map<K, EntryGetResult>>) ((IgniteInternalFuture)getAllAsync(keys, !ctx.keepBinary(), true)); final boolean intercept = ctx.config().getInterceptor() != null; IgniteInternalFuture<Collection<CacheEntry<K, V>>> rf = - fut.chain(new CX1<IgniteInternalFuture<Map<K, T2<V, GridCacheVersion>>>, Collection<CacheEntry<K, V>>>() { + fut.chain(new CX1<IgniteInternalFuture<Map<K, EntryGetResult>>, Collection<CacheEntry<K, V>>>() { @Override public Collection<CacheEntry<K, V>> applyx( - IgniteInternalFuture<Map<K, T2<V, GridCacheVersion>>> f) throws IgniteCheckedException { + IgniteInternalFuture<Map<K, EntryGetResult>> f) throws IgniteCheckedException { if (intercept) return interceptGetEntries(keys, f.get()); else { Map<K, CacheEntry<K, V>> res = U.newHashMap(f.get().size()); - for (Map.Entry<K, T2<V, GridCacheVersion>> e : f.get().entrySet()) + for (Map.Entry<K, EntryGetResult> e : f.get().entrySet()) res.put(e.getKey(), - new CacheEntryImplEx<>(e.getKey(), e.getValue().get1(), e.getValue().get2())); + new CacheEntryImplEx<>(e.getKey(), (V)e.getValue().value(), e.getValue().version())); return res.values(); } @@ -1620,7 +1621,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V }); if (statsEnabled) - fut.listen(new UpdateGetTimeStatClosure<Map<K, T2<V, GridCacheVersion>>>(metrics0(), start)); + fut.listen(new UpdateGetTimeStatClosure<Map<K, EntryGetResult>>(metrics0(), start)); return rf; } @@ -1675,7 +1676,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V */ @SuppressWarnings("IfMayBeConditional") private Collection<CacheEntry<K, V>> interceptGetEntries( - @Nullable Collection<? extends K> keys, Map<K, T2<V, GridCacheVersion>> map) { + @Nullable Collection<? extends K> keys, Map<K, EntryGetResult> map) { Map<K, CacheEntry<K, V>> res; if (F.isEmpty(keys)) { @@ -1690,11 +1691,11 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V assert interceptor != null; - for (Map.Entry<K, T2<V, GridCacheVersion>> e : map.entrySet()) { - V val = interceptor.onGet(e.getKey(), e.getValue().get1()); + for (Map.Entry<K, EntryGetResult> e : map.entrySet()) { + V val = interceptor.onGet(e.getKey(), (V)e.getValue().value()); if (val != null) - res.put(e.getKey(), new CacheEntryImplEx<>(e.getKey(), val, e.getValue().get2())); + res.put(e.getKey(), new CacheEntryImplEx<>(e.getKey(), val, e.getValue().version())); } if (map.size() != keys.size()) { // Not all requested keys were in cache. @@ -1976,12 +1977,12 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V if (res != null) { ctx.addResult(map, key, - res.value(), + res, skipVals, keepCacheObjects, deserializeBinary, true, - needVer ? res.version() : null); + needVer); if (tx == null || (!tx.implicit() && tx.isolation() == READ_COMMITTED)) ctx.evicts().touch(entry, topVer); @@ -2025,7 +2026,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V GridCacheEntryEx entry = entryEx(key); try { - T2<CacheObject, GridCacheVersion> verVal = entry.versionedValue( + EntryGetResult verVal = entry.versionedValue( cacheVal, res.version(), null, @@ -2035,19 +2036,19 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V if (log.isDebugEnabled()) log.debug("Set value loaded from store into entry [" + "oldVer=" + res.version() + - ", newVer=" + verVal.get2() + ", " + + ", newVer=" + verVal.version() + ", " + "entry=" + entry + ']'); // Don't put key-value pair into result map if value is null. - if (verVal.get1() != null) { + if (verVal.value() != null) { ctx.addResult(map, key, - verVal.get1(), + verVal, skipVals, keepCacheObjects, deserializeBinary, true, - needVer ? verVal.get2() : null); + needVer); } if (tx0 == null || (!tx0.implicit() && http://git-wip-us.apache.org/repos/asf/ignite/blob/f5e601e2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java index 424e325..6322f9f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java @@ -90,7 +90,6 @@ import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.typedef.C1; import org.apache.ignite.internal.util.typedef.F; -import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.GPC; @@ -1895,7 +1894,65 @@ public class GridCacheContext<K, V> implements Externalizable { boolean keepCacheObjects, boolean deserializeBinary, boolean cpy, - final GridCacheVersion ver) { + final GridCacheVersion ver, + final long expireTime, + final long ttl) { + // Creates EntryGetResult + addResult(map, key, val, skipVals, keepCacheObjects, deserializeBinary, cpy, null, + ver, expireTime, ttl, ver != null); + } + + /** + * @param map Map. + * @param key Key. + * @param getRes EntryGetResult. + * @param skipVals Skip values. + * @param keepCacheObjects Keep CacheObject. + * @param deserializeBinary Deserialize binary flag. + * @param cpy Copy flag. + * @param needVer Need version flag. + */ + @SuppressWarnings("unchecked") + public <K1, V1> void addResult(Map<K1, V1> map, + KeyCacheObject key, + EntryGetResult getRes, + boolean skipVals, + boolean keepCacheObjects, + boolean deserializeBinary, + boolean cpy, + boolean needVer) { + // Uses getRes as result. + addResult(map, key, getRes.<CacheObject>value(), skipVals, keepCacheObjects, deserializeBinary, cpy, getRes, + null, 0, 0, needVer); + } + + /** + * @param map Map. + * @param key Key. + * @param val Value. + * @param skipVals Skip values. + * @param keepCacheObjects Keep CacheObject. + * @param deserializeBinary Deserialize binary. + * @param cpy Copy flag. + * @param getRes EntryGetResult. + * @param ver Version. + * @param expireTime Entry expire time. + * @param ttl Entry TTL. + * @param needVer Need version flag. + */ + @SuppressWarnings("unchecked") + public <K1, V1> void addResult(Map<K1, V1> map, + KeyCacheObject key, + CacheObject val, + boolean skipVals, + boolean keepCacheObjects, + boolean deserializeBinary, + boolean cpy, + @Nullable EntryGetResult getRes, + final GridCacheVersion ver, + final long expireTime, + final long ttl, + boolean needVer) { assert key != null; assert val != null || skipVals; @@ -1907,32 +1964,53 @@ public class GridCacheContext<K, V> implements Externalizable { assert key0 != null : key; assert val0 != null : val; - map.put((K1)key0, ver != null ? (V1)new T2<>(val0, ver) : (V1)val0); + V1 v = createValue(ver, expireTime, ttl, val0, getRes, needVer); + + map.put((K1)key0, v); + } + else { + Object val0 = skipVals ? true : val; + + V1 v = createValue(ver, expireTime, ttl, val0, getRes, needVer); + + map.put((K1)key, v); } - else - map.put((K1)key, - (V1)(ver != null ? - (V1)new T2<>(skipVals ? true : val, ver) : - skipVals ? true : val)); } /** - * @param map Map. - * @param key Key. + * Creates new EntryGetResult or uses existing one. + * + * @param ver Version. + * @param expireTime Entry expire time. + * @param ttl Entry TTL. * @param val Value. - * @param skipVals Skip values flag. - * @param keepCacheObjects Keep cache objects flag. - * @param deserializeBinary Deserialize binary flag. - * @param cpy Copy flag. + * @param getRes EntryGetResult + * @param needVer Need version flag. + * @return EntryGetResult or value. */ - public <K1, V1> void addResult(Map<K1, V1> map, - KeyCacheObject key, - CacheObject val, - boolean skipVals, - boolean keepCacheObjects, - boolean deserializeBinary, - boolean cpy) { - addResult(map, key, val, skipVals, keepCacheObjects, deserializeBinary, cpy, null); + @SuppressWarnings("unchecked") + private <V1> V1 createValue(final GridCacheVersion ver, + final long expireTime, + final long ttl, + final Object val, + @Nullable final EntryGetResult getRes, + final boolean needVer) { + final V1 v; + + if (!needVer) + v = (V1) val; + else if (getRes == null) { + v = expireTime != 0 || ttl != 0 + ? (V1)new EntryGetWithTtlResult(val, ver, false, expireTime, ttl) + : (V1)new EntryGetResult(val, ver, false); + } + else { + getRes.value(val); + + v = (V1)getRes; + } + + return v; } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/f5e601e2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java index 3c42d53..ccd2285 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java @@ -761,7 +761,7 @@ public interface GridCacheEntryEx { * @throws IgniteCheckedException If index could not be updated. * @throws GridCacheEntryRemovedException If entry was removed. */ - public T2<CacheObject, GridCacheVersion> versionedValue(CacheObject val, + public EntryGetResult versionedValue(CacheObject val, @Nullable GridCacheVersion curVer, @Nullable GridCacheVersion newVer, @Nullable ReaderArguments readerArgs, http://git-wip-us.apache.org/repos/asf/ignite/blob/f5e601e2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index 7e26719..58b4ae3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@ -69,7 +69,6 @@ import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.F; -import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.T3; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; @@ -975,7 +974,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme assert !deferred; // If return value is consistent, then done. - res = retVer ? new EntryGetResult(ret, resVer, false) : ret; + res = retVer ? entryGetResult(ret, resVer, false) : ret; } else if (reserveForLoad && !obsolete) { assert !readThrough; @@ -986,7 +985,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme if (reserve) flags |= IS_EVICT_DISABLED; - res = new EntryGetResult(null, resVer, reserve); + res = entryGetResult(null, resVer, reserve); } } @@ -1092,6 +1091,20 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme return ret; } + /** + * Creates EntryGetResult or EntryGetWithTtlResult if expire time information exists. + * + * @param val Value. + * @param ver Version. + * @param reserve Reserve flag. + * @return EntryGetResult. + */ + private EntryGetResult entryGetResult(CacheObject val, GridCacheVersion ver, boolean reserve) { + return extras == null || extras.expireTime() == 0 + ? new EntryGetResult(val, ver, reserve) + : new EntryGetWithTtlResult(val, ver, reserve, rawExpireTime(), rawTtl()); + } + /** {@inheritDoc} */ @SuppressWarnings({"unchecked", "TooBroadScope"}) @Nullable @Override public final CacheObject innerReload() @@ -3382,7 +3395,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme } /** - * TODO: GG-4009: do we need to generate event and invalidate value? + * TODO: IGNITE-3500: do we need to generate event and invalidate value? * * @return {@code true} if expired. * @throws IgniteCheckedException In case of failure. @@ -3621,7 +3634,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme } /** {@inheritDoc} */ - @Override public synchronized T2<CacheObject, GridCacheVersion> versionedValue(CacheObject val, + @Override public synchronized EntryGetResult versionedValue(CacheObject val, GridCacheVersion curVer, GridCacheVersion newVer, @Nullable ReaderArguments readerArgs, @@ -3637,7 +3650,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme GridCacheMvcc mvcc = mvccExtras(); if (mvcc != null && !mvcc.isEmpty()) - return new T2<>(this.val, ver); + return entryGetResult(this.val, ver, false); if (newVer == null) newVer = cctx.versions().next(); @@ -3671,13 +3684,13 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme // Version does not change for load ops. update(val, expTime, ttl, newVer, true); - return new T2<>(val, newVer); + return entryGetResult(val, newVer, false); } assert !evictionDisabled() : this; } - return new T2<>(this.val, ver); + return entryGetResult(this.val, ver, false); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/f5e601e2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java index bc34df7..dcd379a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java @@ -41,6 +41,7 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheObject; import org.apache.ignite.internal.processors.cache.CacheOperationContext; import org.apache.ignite.internal.processors.cache.CachePeekModes; +import org.apache.ignite.internal.processors.cache.EntryGetResult; import org.apache.ignite.internal.processors.cache.GridCacheAdapter; import org.apache.ignite.internal.processors.cache.GridCacheClearAllRunnable; import org.apache.ignite.internal.processors.cache.GridCacheConcurrentMap; @@ -74,7 +75,6 @@ import org.apache.ignite.internal.util.typedef.CI1; import org.apache.ignite.internal.util.typedef.CI2; import org.apache.ignite.internal.util.typedef.CI3; import org.apache.ignite.internal.util.typedef.F; -import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; @@ -648,7 +648,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap * @param canRemap Can remap flag. * @return Get future. */ - IgniteInternalFuture<Map<KeyCacheObject, T2<CacheObject, GridCacheVersion>>> getDhtAllAsync( + IgniteInternalFuture<Map<KeyCacheObject, EntryGetResult>> getDhtAllAsync( Collection<KeyCacheObject> keys, @Nullable final ReaderArguments readerArgs, boolean readThrough, http://git-wip-us.apache.org/repos/asf/ignite/blob/f5e601e2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java index 3bf4489..8b92e9f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java @@ -31,6 +31,7 @@ import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheObject; +import org.apache.ignite.internal.processors.cache.EntryGetResult; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo; import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException; @@ -46,7 +47,6 @@ import org.apache.ignite.internal.util.lang.GridClosureException; import org.apache.ignite.internal.util.typedef.C2; import org.apache.ignite.internal.util.typedef.CI1; import org.apache.ignite.internal.util.typedef.F; -import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteUuid; @@ -390,7 +390,7 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col txFut.markInitialized(); } - IgniteInternalFuture<Map<KeyCacheObject, T2<CacheObject, GridCacheVersion>>> fut; + IgniteInternalFuture<Map<KeyCacheObject, EntryGetResult>> fut; if (txFut == null || txFut.isDone()) { fut = cache().getDhtAllAsync( @@ -411,8 +411,8 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col // transactions to complete. fut = new GridEmbeddedFuture<>( txFut, - new C2<Boolean, Exception, IgniteInternalFuture<Map<KeyCacheObject, T2<CacheObject, GridCacheVersion>>>>() { - @Override public IgniteInternalFuture<Map<KeyCacheObject, T2<CacheObject, GridCacheVersion>>> apply(Boolean b, Exception e) { + new C2<Boolean, Exception, IgniteInternalFuture<Map<KeyCacheObject, EntryGetResult>>>() { + @Override public IgniteInternalFuture<Map<KeyCacheObject, EntryGetResult>> apply(Boolean b, Exception e) { if (e != null) throw new GridClosureException(e); @@ -438,9 +438,9 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col } return new GridEmbeddedFuture<>( - new C2<Map<KeyCacheObject, T2<CacheObject, GridCacheVersion>>, Exception, Collection<GridCacheEntryInfo>>() { + new C2<Map<KeyCacheObject, EntryGetResult>, Exception, Collection<GridCacheEntryInfo>>() { @Override public Collection<GridCacheEntryInfo> apply( - Map<KeyCacheObject, T2<CacheObject, GridCacheVersion>> map, Exception e + Map<KeyCacheObject, EntryGetResult> map, Exception e ) { if (e != null) { onDone(e); @@ -458,14 +458,14 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col * @param map Map to convert. * @return List of infos. */ - private Collection<GridCacheEntryInfo> toEntryInfos(Map<KeyCacheObject, T2<CacheObject, GridCacheVersion>> map) { + private Collection<GridCacheEntryInfo> toEntryInfos(Map<KeyCacheObject, EntryGetResult> map) { if (map.isEmpty()) return Collections.emptyList(); Collection<GridCacheEntryInfo> infos = new ArrayList<>(map.size()); - for (Map.Entry<KeyCacheObject, T2<CacheObject, GridCacheVersion>> entry : map.entrySet()) { - T2<CacheObject, GridCacheVersion> val = entry.getValue(); + for (Map.Entry<KeyCacheObject, EntryGetResult> entry : map.entrySet()) { + EntryGetResult val = entry.getValue(); assert val != null; @@ -473,8 +473,10 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col info.cacheId(cctx.cacheId()); info.key(entry.getKey()); - info.value(skipVals ? null : val.get1()); - info.version(val.get2()); + info.value(skipVals ? null : (CacheObject)val.value()); + info.version(val.version()); + info.expireTime(val.expireTime()); + info.ttl(val.ttl()); infos.add(info); } http://git-wip-us.apache.org/repos/asf/ignite/blob/f5e601e2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java index 49bebd6..f3a27bf 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java @@ -30,6 +30,7 @@ import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.NodeStoppingException; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheObject; +import org.apache.ignite.internal.processors.cache.EntryGetResult; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo; import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException; @@ -39,7 +40,6 @@ import org.apache.ignite.internal.processors.cache.ReaderArguments; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.typedef.F; -import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.lang.IgniteUuid; @@ -348,7 +348,7 @@ public final class GridDhtGetSingleFuture<K, V> extends GridFutureAdapter<GridCa } } - IgniteInternalFuture<Map<KeyCacheObject, T2<CacheObject, GridCacheVersion>>> fut; + IgniteInternalFuture<Map<KeyCacheObject, EntryGetResult>> fut; if (rdrFut == null || rdrFut.isDone()) { fut = cache().getDhtAllAsync( @@ -375,7 +375,7 @@ public final class GridDhtGetSingleFuture<K, V> extends GridFutureAdapter<GridCa return; } - IgniteInternalFuture<Map<KeyCacheObject, T2<CacheObject, GridCacheVersion>>> fut0 = + IgniteInternalFuture<Map<KeyCacheObject, EntryGetResult>> fut0 = cache().getDhtAllAsync( Collections.singleton(key), args, @@ -403,11 +403,11 @@ public final class GridDhtGetSingleFuture<K, V> extends GridFutureAdapter<GridCa /** * @return Listener for get future. */ - @NotNull private IgniteInClosure<IgniteInternalFuture<Map<KeyCacheObject, T2<CacheObject, GridCacheVersion>>>> + @NotNull private IgniteInClosure<IgniteInternalFuture<Map<KeyCacheObject, EntryGetResult>>> createGetFutureListener() { - return new IgniteInClosure<IgniteInternalFuture<Map<KeyCacheObject, T2<CacheObject, GridCacheVersion>>>>() { + return new IgniteInClosure<IgniteInternalFuture<Map<KeyCacheObject, EntryGetResult>>>() { @Override public void apply( - IgniteInternalFuture<Map<KeyCacheObject, T2<CacheObject, GridCacheVersion>>> fut + IgniteInternalFuture<Map<KeyCacheObject, EntryGetResult>> fut ) { onResult(fut); } @@ -417,7 +417,7 @@ public final class GridDhtGetSingleFuture<K, V> extends GridFutureAdapter<GridCa /** * @param fut Completed future to finish this process with. */ - private void onResult(IgniteInternalFuture<Map<KeyCacheObject, T2<CacheObject, GridCacheVersion>>> fut) { + private void onResult(IgniteInternalFuture<Map<KeyCacheObject, EntryGetResult>> fut) { assert fut.isDone(); if (fut.error() != null) @@ -436,11 +436,11 @@ public final class GridDhtGetSingleFuture<K, V> extends GridFutureAdapter<GridCa * @param map Map to convert. * @return List of infos. */ - private GridCacheEntryInfo toEntryInfo(Map<KeyCacheObject, T2<CacheObject, GridCacheVersion>> map) { + private GridCacheEntryInfo toEntryInfo(Map<KeyCacheObject, EntryGetResult> map) { if (map.isEmpty()) return null; - T2<CacheObject, GridCacheVersion> val = map.get(key); + EntryGetResult val = map.get(key); assert val != null; @@ -448,8 +448,10 @@ public final class GridDhtGetSingleFuture<K, V> extends GridFutureAdapter<GridCa info.cacheId(cctx.cacheId()); info.key(key); - info.value(skipVals ? null : val.get1()); - info.version(val.get2()); + info.value(skipVals ? null : (CacheObject)val.value()); + info.version(val.version()); + info.expireTime(val.expireTime()); + info.ttl(val.ttl()); return info; } http://git-wip-us.apache.org/repos/asf/ignite/blob/f5e601e2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java index 5892b37..c41711c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java @@ -447,11 +447,12 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda if (entry != null) { boolean isNew = entry.isNewLocked(); + EntryGetResult getRes = null; CacheObject v = null; GridCacheVersion ver = null; if (needVer) { - EntryGetResult res = entry.innerGetVersioned( + getRes = entry.innerGetVersioned( null, null, /*swap*/true, @@ -465,9 +466,9 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda !deserializeBinary, null); - if (res != null) { - v = res.value(); - ver = res.version(); + if (getRes != null) { + v = getRes.value(); + ver = getRes.version(); } } else { @@ -501,7 +502,11 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda keepCacheObjects, deserializeBinary, true, - ver); + getRes, + ver, + 0, + 0, + needVer); return true; } @@ -560,7 +565,9 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda keepCacheObjects, deserializeBinary, false, - needVer ? info.version() : null); + needVer ? info.version() : null, + 0, + 0); } return map; http://git-wip-us.apache.org/repos/asf/ignite/blob/f5e601e2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java index 7c14f35..2b5624b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java @@ -51,7 +51,6 @@ import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.CI1; import org.apache.ignite.internal.util.typedef.CIX1; import org.apache.ignite.internal.util.typedef.F; -import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteProductVersion; @@ -615,7 +614,7 @@ public class GridPartitionedSingleGetFuture extends GridFutureAdapter<Object> im if (needVer) { assert ver != null || !res; - onDone(new T2<>(res, ver)); + onDone(new EntryGetResult(res, ver)); } else onDone(res); @@ -633,10 +632,10 @@ public class GridPartitionedSingleGetFuture extends GridFutureAdapter<Object> im if (!keepCacheObjects) { Object res = cctx.unwrapBinaryIfNeeded(val, !deserializeBinary); - onDone(needVer ? new T2<>(res, ver) : res); + onDone(needVer ? new EntryGetResult(res, ver) : res); } else - onDone(needVer ? new T2<>(val, ver) : val); + onDone(needVer ? new EntryGetResult(val, ver) : val); } else onDone(null); http://git-wip-us.apache.org/repos/asf/ignite/blob/f5e601e2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index 2f97bcc..72e1bb1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -1491,11 +1491,12 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { if (entry != null) { boolean isNew = entry.isNewLocked(); + EntryGetResult getRes = null; CacheObject v = null; GridCacheVersion ver = null; if (needVer) { - EntryGetResult res = entry.innerGetVersioned( + getRes = entry.innerGetVersioned( null, null, /*swap*/true, @@ -1509,9 +1510,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { true, null); - if (res != null) { - v = res.value(); - ver = res.version(); + if (getRes != null) { + v = getRes.value(); + ver = getRes.version(); } } else { @@ -1539,7 +1540,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { success = false; } else - ctx.addResult(locVals, key, v, skipVals, false, deserializeBinary, true, ver); + ctx.addResult(locVals, key, v, skipVals, false, deserializeBinary, true, + getRes, ver, 0, 0, needVer); } else success = false; http://git-wip-us.apache.org/repos/asf/ignite/blob/f5e601e2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java index 5ed30db..ccdc51d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java @@ -481,11 +481,12 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte if (entry != null) { boolean isNew = entry.isNewLocked(); + EntryGetResult getRes = null; CacheObject v = null; GridCacheVersion ver = null; if (needVer) { - EntryGetResult res = entry.innerGetVersioned( + getRes = entry.innerGetVersioned( null, null, /*swap*/true, @@ -499,9 +500,9 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte !deserializeBinary, null); - if (res != null) { - v = res.value(); - ver = res.version(); + if (getRes != null) { + v = getRes.value(); + ver = getRes.version(); } } else { @@ -540,7 +541,11 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte keepCacheObj, deserializeBinary, true, - ver); + getRes, + ver, + 0, + 0, + needVer); } } else http://git-wip-us.apache.org/repos/asf/ignite/blob/f5e601e2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java index 8c64e3e..cb47498 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java @@ -56,7 +56,6 @@ import org.apache.ignite.internal.util.typedef.CI1; import org.apache.ignite.internal.util.typedef.CIX1; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.P1; -import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; @@ -665,7 +664,7 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap if (keepCacheObjects) { K key0 = (K)key; V val0 = needVer ? - (V)new T2<>(skipVals ? true : v, ver) : + (V)new EntryGetResult(skipVals ? true : v, ver) : (V)(skipVals ? true : v); add(new GridFinishedFuture<>(Collections.singletonMap(key0, val0))); @@ -673,7 +672,7 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap else { K key0 = (K)cctx.unwrapBinaryIfNeeded(key, !deserializeBinary, false); V val0 = needVer ? - (V)new T2<>(!skipVals ? + (V)new EntryGetResult(!skipVals ? (V)cctx.unwrapBinaryIfNeeded(v, !deserializeBinary, false) : (V)Boolean.TRUE, ver) : !skipVals ? @@ -759,7 +758,9 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap keepCacheObjects, deserializeBinary, false, - needVer ? info.version() : null); + needVer ? info.version() : null, + 0, + 0); } catch (GridCacheEntryRemovedException ignore) { if (log.isDebugEnabled()) http://git-wip-us.apache.org/repos/asf/ignite/blob/f5e601e2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java index 67518ef..ae9edcd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java @@ -32,6 +32,7 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException; +import org.apache.ignite.internal.processors.cache.EntryGetResult; import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate; import org.apache.ignite.internal.processors.cache.GridCacheMvccFuture; import org.apache.ignite.internal.processors.cache.GridCacheReturn; @@ -60,7 +61,6 @@ import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.typedef.C1; import org.apache.ignite.internal.util.typedef.CI1; import org.apache.ignite.internal.util.typedef.F; -import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; @@ -476,10 +476,10 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { GridCacheVersion ver; if (needVer) { - T2<Object, GridCacheVersion> t = (T2)val; + EntryGetResult getRes = (EntryGetResult)val; - v = t.get1(); - ver = t.get2(); + v = getRes.value(); + ver = getRes.version(); } else { v = val; http://git-wip-us.apache.org/repos/asf/ignite/blob/f5e601e2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java index ee4f7a6..7da11b6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java @@ -513,7 +513,6 @@ public class GridLocalAtomicCache<K, V> extends GridLocalCache<K, V> { if (entry != null) { CacheObject v; - GridCacheVersion ver; if (needVer) { EntryGetResult res = entry.innerGetVersioned( @@ -531,18 +530,15 @@ public class GridLocalAtomicCache<K, V> extends GridLocalCache<K, V> { null); if (res != null) { - v = res.value(); - ver = res.version(); - ctx.addResult( vals, cacheKey, - v, + res, skipVals, false, deserializeBinary, true, - ver); + needVer); } else success = false; @@ -569,7 +565,10 @@ public class GridLocalAtomicCache<K, V> extends GridLocalCache<K, V> { skipVals, false, deserializeBinary, - true); + true, + null, + 0, + 0); } else success = false; http://git-wip-us.apache.org/repos/asf/ignite/blob/f5e601e2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java index a1c1123..777489e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java @@ -481,7 +481,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig GridCacheEntryEx entry = cacheCtx.cache().entryEx(key); try { - T2<CacheObject, GridCacheVersion> verVal = entry.versionedValue(cacheVal, + EntryGetResult verVal = entry.versionedValue(cacheVal, ver, null, null, @@ -490,11 +490,11 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig if (log.isDebugEnabled()) { log.debug("Set value loaded from store into entry [" + "oldVer=" + ver + - ", newVer=" + verVal.get2() + + ", newVer=" + verVal.version() + ", entry=" + entry + ']'); } - ver = verVal.get2(); + ver = verVal.version(); break; } @@ -1212,7 +1212,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig assert ver != null; } - cacheCtx.addResult(map, key, val, skipVals, keepCacheObjects, deserializeBinary, false, ver); + cacheCtx.addResult(map, key, val, skipVals, keepCacheObjects, deserializeBinary, false, + ver, 0, 0); } } else { @@ -1221,6 +1222,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig while (true) { try { GridCacheVersion readVer = null; + EntryGetResult getRes = null; Object transformClo = (txEntry.op() == TRANSFORM && @@ -1228,7 +1230,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig F.first(txEntry.entryProcessors()) : null; if (needVer) { - EntryGetResult res = txEntry.cached().innerGetVersioned( + getRes = txEntry.cached().innerGetVersioned( null, this, /*swap*/true, @@ -1242,9 +1244,9 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig txEntry.keepBinary(), null); - if (res != null) { - val = res.value(); - readVer = res.version(); + if (getRes != null) { + val = getRes.value(); + readVer = getRes.version(); } } else { @@ -1277,7 +1279,11 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig keepCacheObjects, deserializeBinary, false, - readVer); + getRes, + readVer, + 0, + 0, + needVer); } else missed.put(key, txEntry.cached().version()); @@ -1306,13 +1312,14 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig CacheObject val = null; GridCacheVersion readVer = null; + EntryGetResult getRes = null; if (!pessimistic() || readCommitted() && !skipVals) { IgniteCacheExpiryPolicy accessPlc = optimistic() ? accessPolicy(cacheCtx, txKey, expiryPlc) : null; if (needReadVer) { - EntryGetResult res = primaryLocal(entry) ? + getRes = primaryLocal(entry) ? entry.innerGetVersioned( null, this, @@ -1327,9 +1334,9 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig !deserializeBinary, null) : null; - if (res != null) { - val = res.value(); - readVer = res.version(); + if (getRes != null) { + val = getRes.value(); + readVer = getRes.version(); } } else { @@ -1356,7 +1363,11 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig keepCacheObjects, deserializeBinary, false, - needVer ? readVer : null); + getRes, + readVer, + 0, + 0, + needVer); } else missed.put(key, ver); @@ -1534,7 +1545,9 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig keepCacheObjects, deserializeBinary, false, - needVer ? loadVer : null); + needVer ? loadVer : null, + 0, + 0); } } else { @@ -1556,7 +1569,9 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig keepCacheObjects, deserializeBinary, false, - needVer ? loadVer : null); + needVer ? loadVer : null, + 0, + 0); } } } @@ -1663,6 +1678,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig CacheObject val = null; GridCacheVersion readVer = null; + EntryGetResult getRes = null; try { Object transformClo = @@ -1671,7 +1687,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig F.first(txEntry.entryProcessors()) : null; if (needVer) { - EntryGetResult res = cached.innerGetVersioned( + getRes = cached.innerGetVersioned( null, IgniteTxLocalAdapter.this, /*swap*/cacheCtx.isSwapOrOffheapEnabled(), @@ -1685,9 +1701,9 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig txEntry.keepBinary(), null); - if (res != null) { - val = res.value(); - readVer = res.version(); + if (getRes != null) { + val = getRes.value(); + readVer = getRes.version(); } } else{ @@ -1722,7 +1738,11 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig keepCacheObjects, deserializeBinary, false, - readVer); + getRes, + readVer, + 0, + 0, + needVer); if (readVer != null) txEntry.entryReadVersion(readVer); http://git-wip-us.apache.org/repos/asf/ignite/blob/f5e601e2/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java index 2954bdb..d46dee0 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java @@ -686,7 +686,7 @@ public class GridCacheTestEntryEx extends GridMetadataAwareAdapter implements Gr } /** @inheritDoc */ - @Override public T2<CacheObject, GridCacheVersion> versionedValue(CacheObject val, + @Override public EntryGetResult versionedValue(CacheObject val, GridCacheVersion curVer, GridCacheVersion newVer, @Nullable ReaderArguments readerArgs, http://git-wip-us.apache.org/repos/asf/ignite/blob/f5e601e2/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java index f22ca6d..b234631 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java @@ -36,12 +36,14 @@ import javax.cache.expiry.ExpiryPolicy; import javax.cache.expiry.ModifiedExpiryPolicy; import javax.cache.processor.EntryProcessor; import javax.cache.processor.MutableEntry; +import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.cache.CacheMemoryMode; import org.apache.ignite.cache.CachePeekMode; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.configuration.NearCacheConfiguration; import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.IgniteKernal; @@ -54,6 +56,7 @@ import org.apache.ignite.internal.util.lang.GridAbsPredicate; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.PAX; import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.transactions.Transaction; import org.apache.ignite.transactions.TransactionConcurrency; @@ -1015,6 +1018,45 @@ public abstract class IgniteCacheExpiryPolicyAbstractTest extends IgniteCacheAbs } /** + * Put entry to server node and check how its expires in client NearCache. + * + * @throws Exception If failed. + */ + public void testNearExpiresOnClient() throws Exception { + if(cacheMode() != PARTITIONED) + return; + + factory = CreatedExpiryPolicy.factoryOf(new Duration(TimeUnit.SECONDS,1)); + + nearCache = true; + + startGrids(); + + IgniteConfiguration clientCfg = getConfiguration("client").setClientMode(true); + + ((TcpDiscoverySpi)clientCfg.getDiscoverySpi()).setForceServerMode(false); + + Ignite client = startGrid("client", clientCfg); + + IgniteCache<Object, Object> cache = client.cache(null); + + Integer key = 1; + + // Put on server node. + jcache(0).put(key, 1); + + // Make entry cached in client NearCache. + assertEquals(1, cache.get(key)); + + assertEquals(1, cache.localPeek(key, CachePeekMode.NEAR)); + + waitExpired(key); + + // Check client NearCache. + assertNull(cache.localPeek(key, CachePeekMode.NEAR)); + } + + /** * @return Test keys. * @throws Exception If failed. */ @@ -1270,4 +1312,4 @@ public abstract class IgniteCacheExpiryPolicyAbstractTest extends IgniteCacheAbs return S.toString(TestPolicy.class, this); } } -} \ No newline at end of file +}
