Repository: ignite Updated Branches: refs/heads/ignite-2610 [created] 1adc02a58
IGNITE-2610 - WIP Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/1adc02a5 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/1adc02a5 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/1adc02a5 Branch: refs/heads/ignite-2610 Commit: 1adc02a584c51008fa17f75271fd3e94912043e9 Parents: 46b6a76 Author: Alexey Goncharuk <[email protected]> Authored: Mon Feb 15 18:03:36 2016 +0300 Committer: Alexey Goncharuk <[email protected]> Committed: Mon Feb 15 18:03:36 2016 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheMapEntry.java | 47 +-- .../processors/cache/GridCacheTtlManager.java | 73 +++- .../distributed/dht/GridDhtCacheEntry.java | 2 +- .../atomic/GridDhtAtomicOffHeapCacheEntry.java | 9 + .../distributed/near/GridNearCacheEntry.java | 4 +- .../cache/query/GridCacheQueryManager.java | 367 ++++++++++++------- .../IgniteCacheAtomicExpiryPolicyTest.java | 5 + ...gniteCacheAtomicOffheapExpiryPolicyTest.java | 35 ++ .../IgniteCacheExpiryPolicyAbstractTest.java | 245 +++++++++---- 9 files changed, 549 insertions(+), 238 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/1adc02a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index c1eeb5e..bf75fc7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@ -529,7 +529,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme val = cctx.kernalContext().cacheObjects().prepareForCache(val, cctx); // Set unswapped value. - update(val, e.expireTime(), e.ttl(), e.version()); + update(val, e.expireTime(), e.ttl(), e.version(), false); // Must update valPtr again since update() will reset it. if (cctx.offheapTiered() && e.offheapPointer() > 0) @@ -945,7 +945,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme boolean hadValPtr = hasOffHeapPointer(); // Don't change version for read-through. - update(ret, expTime, ttl, nextVer); + update(ret, expTime, ttl, nextVer, true); if (hadValPtr && cctx.offheapTiered()) cctx.swap().removeOffheap(key); @@ -1040,7 +1040,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme deletedUnlocked(true); } - update(ret, expTime, ttl, nextVer); + update(ret, expTime, ttl, nextVer, true); touch = true; @@ -1194,7 +1194,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme if (updateCntr != null && updateCntr != 0) updateCntr0 = updateCntr; - update(val, expireTime, ttl, newVer); + update(val, expireTime, ttl, newVer, true); drReplicate(drType, val, newVer); @@ -1356,7 +1356,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme boolean hadValPtr = hasOffHeapPointer(); - update(null, 0, 0, newVer); + update(null, 0, 0, newVer, true); if (cctx.offheapTiered() && hadValPtr) { boolean rmv = cctx.swap().removeOffheap(key); @@ -1572,7 +1572,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme else clearIndex(null); - update(old, expireTime, ttl, ver); + update(old, expireTime, ttl, ver, true); } // Apply metrics. @@ -1719,7 +1719,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme assert ttl != CU.TTL_ZERO; - update(updated, expireTime, ttl, ver); + update(updated, expireTime, ttl, ver, true); if (evt) { CacheObject evtOld = null; @@ -1756,7 +1756,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme // in load methods without actually holding entry lock. clearIndex(old); - update(null, CU.TTL_ETERNAL, CU.EXPIRE_TIME_ETERNAL, ver); + update(null, CU.TTL_ETERNAL, CU.EXPIRE_TIME_ETERNAL, ver, true); if (cctx.offheapTiered() && hasValPtr) { boolean rmv = cctx.swap().removeOffheap(key); @@ -2131,7 +2131,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme else clearIndex(null); - update(oldVal, initExpireTime, initTtl, ver); + update(oldVal, initExpireTime, initTtl, ver, true); if (deletedUnlocked() && oldVal != null && !isInternal()) deletedUnlocked(false); @@ -2345,7 +2345,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme // in load methods without actually holding entry lock. updateIndex(updated, newExpireTime, newVer, oldVal); - update(updated, newExpireTime, newTtl, newVer); + update(updated, newExpireTime, newTtl, newVer, true); updateCntr0 = nextPartCounter(topVer); @@ -2430,7 +2430,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme boolean hasValPtr = hasOffHeapPointer(); // Clear value on backup. Entry will be removed from cache when it got evicted from queue. - update(null, CU.TTL_ETERNAL, CU.EXPIRE_TIME_ETERNAL, newVer); + update(null, CU.TTL_ETERNAL, CU.EXPIRE_TIME_ETERNAL, newVer, true); assert newSysTtl == CU.TTL_NOT_CHANGED; assert newSysExpireTime == CU.EXPIRE_TIME_CALCULATE; @@ -2707,7 +2707,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme if (cctx.deferredDelete() && !isStartVersion() && !detached() && !isInternal()) { if (!deletedUnlocked()) { - update(null, 0L, 0L, ver); + update(null, 0L, 0L, ver, true); deletedUnlocked(true); @@ -2895,24 +2895,24 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme * @param ttl Time to live. * @param ver Update version. */ - protected final void update(@Nullable CacheObject val, long expireTime, long ttl, GridCacheVersion ver) { + protected final void update(@Nullable CacheObject val, long expireTime, long ttl, GridCacheVersion ver, boolean addTracked) { assert ver != null; assert Thread.holdsLock(this); assert ttl != CU.TTL_ZERO && ttl != CU.TTL_NOT_CHANGED && ttl >= 0 : ttl; long oldExpireTime = expireTimeExtras(); - if (oldExpireTime != 0 && expireTime != oldExpireTime && cctx.config().isEagerTtl()) + if (addTracked && oldExpireTime != 0 && (expireTime != oldExpireTime || isStartVersion()) && cctx.config().isEagerTtl()) cctx.ttl().removeTrackedEntry(this); value(val); ttlAndExpireTimeExtras(ttl, expireTime); - if (expireTime != 0 && (expireTime != oldExpireTime || isStartVersion()) && cctx.config().isEagerTtl()) - cctx.ttl().addTrackedEntry(this); - this.ver = ver; + + if (addTracked && expireTime != 0 && (expireTime != oldExpireTime || isStartVersion()) && cctx.config().isEagerTtl()) + cctx.ttl().addTrackedEntry(this); } /** @@ -2950,7 +2950,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme /** * @param ttl Time to live. */ - private void updateTtl(long ttl) { + protected void updateTtl(long ttl) { assert ttl >= 0 || ttl == CU.TTL_ZERO : ttl; assert Thread.holdsLock(this); @@ -3224,7 +3224,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme @Override public synchronized CacheObject rawPut(CacheObject val, long ttl) { CacheObject old = this.val; - update(val, CU.toExpireTime(ttl), ttl, nextVersion()); + update(val, CU.toExpireTime(ttl), ttl, nextVersion(), true); return old; } @@ -3252,7 +3252,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme updateIndex(val, expTime, ver, null); // Version does not change for load ops. - update(val, expTime, ttl, ver); + update(val, expTime, ttl, ver, true); boolean skipQryNtf = false; @@ -3338,7 +3338,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme update(val, unswapped.expireTime(), unswapped.ttl(), - unswapped.version() + unswapped.version(), + true ); return true; @@ -3397,7 +3398,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme } // Version does not change for load ops. - update(val, expTime, ttl, newVer); + update(val, expTime, ttl, newVer, true); return newVer; } @@ -3612,7 +3613,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme if (!obsolete()) { if (cctx.deferredDelete() && !detached() && !isInternal()) { if (!deletedUnlocked()) { - update(null, 0L, 0L, ver0 = ver); + update(null, 0L, 0L, ver0 = ver, true); deletedUnlocked(true); http://git-wip-us.apache.org/repos/asf/ignite/blob/1adc02a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java index bdb1f18..d119317 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java @@ -18,11 +18,13 @@ package org.apache.ignite.internal.processors.cache; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.util.GridConcurrentSkipListSet; import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.CU; +import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.internal.util.worker.GridWorker; import org.apache.ignite.thread.IgniteThread; @@ -74,6 +76,9 @@ public class GridCacheTtlManager extends GridCacheManagerAdapter { * @param entry Entry to add. */ public void addTrackedEntry(GridCacheMapEntry entry) { + assert Thread.holdsLock(entry); + assert cleanupWorker != null; + pendingEntries.add(new EntryWrapper(entry)); } @@ -82,10 +87,18 @@ public class GridCacheTtlManager extends GridCacheManagerAdapter { */ public void removeTrackedEntry(GridCacheMapEntry entry) { assert Thread.holdsLock(entry); + assert cleanupWorker != null; pendingEntries.remove(new EntryWrapper(entry)); } + /** + * @return The size of pending entries. + */ + public int pendingSize() { + return pendingEntries.sizex(); + } + /** {@inheritDoc} */ @Override public void printMemoryStats() { X.println(">>>"); @@ -150,6 +163,46 @@ public class GridCacheTtlManager extends GridCacheManagerAdapter { } /** + * @param cctx Cache context. + * @param key1 Left key to compare. + * @param key2 Right key to compare. + * @return Comparison result. + */ + private static int compareKeys(GridCacheContext cctx, CacheObject key1, CacheObject key2) { + int key1Hash = key1.hashCode(); + int key2Hash = key2.hashCode(); + + int res = Integer.compare(key1Hash, key2Hash); + + if (res == 0) { + key1 = (CacheObject)cctx.unwrapTemporary(key1); + key2 = (CacheObject)cctx.unwrapTemporary(key2); + + try { + byte[] key1ValBytes = key1.valueBytes(cctx.cacheObjectContext()); + byte[] key2ValBytes = key2.valueBytes(cctx.cacheObjectContext()); + + // Must not do fair array comparison. + res = Integer.compare(key1ValBytes.length, key2ValBytes.length); + + if (res == 0) { + for (int i = 0; i < key1ValBytes.length; i++) { + res = Byte.compare(key1ValBytes[i], key2ValBytes[i]); + + if (res != 0) + break; + } + } + } + catch (IgniteCheckedException e) { + throw new IgniteException(e); + } + } + + return res; + } + + /** * Entry wrapper. */ private static class EntryWrapper implements Comparable<EntryWrapper> { @@ -174,8 +227,12 @@ public class GridCacheTtlManager extends GridCacheManagerAdapter { @Override public int compareTo(EntryWrapper o) { int res = Long.compare(expireTime, o.expireTime); - if (res == 0) - res = Long.compare(entry.startVersion(), o.entry.startVersion()); + if (res == 0) { + // Must compare entries of the same cache. + assert entry.context() == o.entry.context(); + + res = compareKeys(entry.context(), entry.key(), o.entry.key()); + } return res; } @@ -190,7 +247,7 @@ public class GridCacheTtlManager extends GridCacheManagerAdapter { EntryWrapper that = (EntryWrapper)o; - return expireTime == that.expireTime && entry.startVersion() == that.entry.startVersion(); + return expireTime == that.expireTime && compareKeys(entry.context(), entry.key(), that.entry.key()) == 0; } @@ -198,10 +255,15 @@ public class GridCacheTtlManager extends GridCacheManagerAdapter { @Override public int hashCode() { int res = (int)(expireTime ^ (expireTime >>> 32)); - res = 31 * res + (int)(entry.startVersion() ^ (entry.startVersion() >>> 32)); + res = 31 * res + entry.key().hashCode(); return res; } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(EntryWrapper.class, this); + } } /** @@ -230,10 +292,9 @@ public class GridCacheTtlManager extends GridCacheManagerAdapter { @Override public boolean add(EntryWrapper e) { boolean res = super.add(e); - assert res; + assert res : "Failed to add entry wrapper:" + e; size.increment(); - return res; } http://git-wip-us.apache.org/repos/asf/ignite/blob/1adc02a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java index 14e3d3e..fae8219 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java @@ -580,7 +580,7 @@ public class GridDhtCacheEntry extends GridDistributedCacheEntry { clearIndex(prev); // Give to GC. - update(null, 0L, 0L, ver); + update(null, 0L, 0L, ver, true); if (swap) { releaseSwap(); http://git-wip-us.apache.org/repos/asf/ignite/blob/1adc02a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicOffHeapCacheEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicOffHeapCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicOffHeapCacheEntry.java index 85cfb80..9321449 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicOffHeapCacheEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicOffHeapCacheEntry.java @@ -21,6 +21,8 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheObject; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.KeyCacheObject; +import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; +import org.jetbrains.annotations.Nullable; /** * DHT atomic cache entry for off-heap tiered or off-heap values modes. @@ -52,6 +54,13 @@ public class GridDhtAtomicOffHeapCacheEntry extends GridDhtAtomicCacheEntry { } /** {@inheritDoc} */ + @Override protected void updateTtl(long ttl) { + super.updateTtl(ttl); + + valPtr = 0; + } + + /** {@inheritDoc} */ @Override protected long offHeapPointer() { return valPtr; } http://git-wip-us.apache.org/repos/asf/ignite/blob/1adc02a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java index 026fb4d..943a91a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java @@ -166,7 +166,7 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry { if (isNew() || !valid(topVer)) { // Version does not change for load ops. - update(e.value(), e.expireTime(), e.ttl(), e.isNew() ? ver : e.version()); + update(e.value(), e.expireTime(), e.ttl(), e.isNew() ? ver : e.version(), true); if (cctx.deferredDelete() && !isNew() && !isInternal()) { boolean deleted = val == null; @@ -402,7 +402,7 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry { if (this.dhtVer == null || this.dhtVer.compareTo(dhtVer) < 0) { primaryNode(primaryNodeId, topVer); - update(val, expireTime, ttl, ver); + update(val, expireTime, ttl, ver, true); if (cctx.deferredDelete() && !isInternal()) { boolean deleted = val == null; http://git-wip-us.apache.org/repos/asf/ignite/blob/1adc02a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java index 0d8f795..751279d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java @@ -809,161 +809,67 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte prj0 = prj0.keepBinary(); - final IgniteInternalCache<K, V> prj = prj0; + final IgniteInternalCache prj = prj0; final IgniteBiPredicate<K, V> keyValFilter = qry.scanFilter(); try { injectResources(keyValFilter); - final GridDhtCacheAdapter dht = cctx.isLocal() ? null : (cctx.isNear() ? cctx.near().dht() : cctx.dht()); - - final GridCacheAdapter cache = dht != null ? dht : cctx.cache(); - final ExpiryPolicy plc = cctx.expiry(); final AffinityTopologyVersion topVer = cctx.affinity().affinityTopologyVersion(); final boolean backups = qry.includeBackups() || cctx.isReplicated(); - final GridCloseableIteratorAdapter<IgniteBiTuple<K, V>> heapIt = - new GridCloseableIteratorAdapter<IgniteBiTuple<K, V>>() { - private IgniteBiTuple<K, V> next; - - private IgniteCacheExpiryPolicy expiryPlc = cctx.cache().expiryPolicy(plc); - - private Iterator<K> iter; + Iterator<KeyCacheObject> keyIter; - private GridDhtLocalPartition locPart; + GridDhtLocalPartition locPart = null; - { - Integer part = qry.partition(); + Integer part = qry.partition(); - if (part == null || dht == null) - iter = backups ? prj.keySetx().iterator() : prj.primaryKeySet().iterator(); - else if (part < 0 || part >= cctx.affinity().partitions()) - iter = F.emptyIterator(); - else { - locPart = dht.topology().localPartition(part, topVer, false); - - // double check for owning state - if (locPart == null || locPart.state() != OWNING || !locPart.reserve() || - locPart.state() != OWNING) - throw new GridDhtUnreservedPartitionException(part, - cctx.affinity().affinityTopologyVersion(), "Partition can not be reserved"); - - iter = new Iterator<K>() { - private Iterator<KeyCacheObject> iter0 = locPart.keySet().iterator(); + if (part == null || cctx.isLocal()) + keyIter = backups ? prj.keySetx().iterator() : prj.primaryKeySet().iterator(); + else if (part < 0 || part >= cctx.affinity().partitions()) + keyIter = F.emptyIterator(); + else { + final GridDhtCacheAdapter dht = cctx.isNear() ? cctx.near().dht() : cctx.dht(); - @Override public boolean hasNext() { - return iter0.hasNext(); - } + locPart = dht.topology().localPartition(part, topVer, false); - @Override public K next() { - KeyCacheObject key = iter0.next(); + // double check for owning state + if (locPart == null || locPart.state() != OWNING || !locPart.reserve() || locPart.state() != OWNING) + throw new GridDhtUnreservedPartitionException(part, cctx.affinity().affinityTopologyVersion(), + "Partition can not be reserved"); - return (K)cctx.unwrapBinaryIfNeeded(key, true); - } + final GridDhtLocalPartition locPart0 = locPart; - @Override public void remove() { - iter0.remove(); - } - }; - } + keyIter = new Iterator<KeyCacheObject>() { + private Iterator<KeyCacheObject> iter0 = locPart0.keySet().iterator(); - advance(); + @Override public boolean hasNext() { + return iter0.hasNext(); } - @Override public boolean onHasNext() { - return next != null; + @Override public KeyCacheObject next() { + return iter0.next(); } - @Override public IgniteBiTuple<K, V> onNext() { - if (next == null) - throw new NoSuchElementException(); - - IgniteBiTuple<K, V> next0 = next; - - advance(); - - return next0; + @Override public void remove() { + iter0.remove(); } + }; + } - private void advance() { - IgniteBiTuple<K, V> next0 = null; - - while (iter.hasNext()) { - next0 = null; - - K key = iter.next(); - - V val; - - try { - GridCacheEntryEx entry = cache.peekEx(key); - - CacheObject cacheVal = - entry != null ? entry.peek(true, false, false, topVer, expiryPlc) : null; - - val = (V)cctx.cacheObjectContext().unwrapBinaryIfNeeded(cacheVal, true); - } - catch (GridCacheEntryRemovedException e) { - val = null; - } - catch (IgniteCheckedException e) { - if (log.isDebugEnabled()) - log.debug("Failed to peek value: " + e); - - val = null; - } - - if (dht != null && expiryPlc != null && expiryPlc.readyToFlush(100)) { - dht.sendTtlUpdateRequest(expiryPlc); - - expiryPlc = cctx.cache().expiryPolicy(plc); - } - - if (val != null) { - next0 = F.t(key, val); - - if (checkPredicate(next0)) - break; - else - next0 = null; - } - } - - next = next0 != null ? - new IgniteBiTuple<>(next0.getKey(), next0.getValue()) : - null; - - if (next == null) - sendTtlUpdate(); - } + final GridDhtLocalPartition locPart0 = locPart; + final GridCloseableIteratorAdapter<IgniteBiTuple<K, V>> heapIt = + new PeekValueExpiryAwareIterator<K, V>(keyIter, plc, topVer, keyValFilter, qry.keepBinary()) { @Override protected void onClose() { - sendTtlUpdate(); + super.onClose(); - if (locPart != null) - locPart.release(); - } - - private void sendTtlUpdate() { - if (dht != null && expiryPlc != null) { - dht.sendTtlUpdateRequest(expiryPlc); - - expiryPlc = null; - } - } - - private boolean checkPredicate(Map.Entry<K, V> e) { - if (keyValFilter != null) { - Map.Entry<K, V> e0 = (Map.Entry<K, V>)cctx.unwrapBinaryIfNeeded(e, qry.keepBinary()); - - return keyValFilter.apply(e0.getKey(), e0.getValue()); - } - - return true; + if (locPart0 != null) + locPart0.release(); } }; @@ -975,10 +881,10 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte iters.add(heapIt); if (cctx.isOffHeapEnabled()) - iters.add(offheapIterator(qry, backups)); + iters.add(offheapIterator(qry, topVer, backups, plc)); if (cctx.swap().swapEnabled()) - iters.add(swapIterator(qry, backups)); + iters.add(swapIterator(qry, topVer, backups, plc)); it = new CompoundIterator<>(iters); } @@ -1032,8 +938,12 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte * @return Swap iterator. * @throws IgniteCheckedException If failed. */ - private GridIterator<IgniteBiTuple<K, V>> swapIterator(GridCacheQueryAdapter<?> qry, boolean backups) - throws IgniteCheckedException { + private GridIterator<IgniteBiTuple<K, V>> swapIterator( + GridCacheQueryAdapter<?> qry, + AffinityTopologyVersion topVer, + boolean backups, + ExpiryPolicy expPlc + ) throws IgniteCheckedException { IgniteBiPredicate<K, V> filter = qry.scanFilter(); Integer part = qry.partition(); @@ -1041,6 +951,14 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte Iterator<Map.Entry<byte[], byte[]>> it = part == null ? cctx.swap().rawSwapIterator(true, backups) : cctx.swap().rawSwapIterator(part); + if (expPlc != null) + return scanExpiryIterator( + it, + topVer, + filter, + expPlc, + qry.keepBinary()); + return scanIterator(it, filter, qry.keepBinary()); } @@ -1049,9 +967,23 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte * @param backups Include backups. * @return Offheap iterator. */ - private GridIterator<IgniteBiTuple<K, V>> offheapIterator(GridCacheQueryAdapter<?> qry, boolean backups) { + private GridIterator<IgniteBiTuple<K, V>> offheapIterator( + GridCacheQueryAdapter<?> qry, + AffinityTopologyVersion topVer, + boolean backups, + ExpiryPolicy expPlc + ) { IgniteBiPredicate<K, V> filter = qry.scanFilter(); + if (expPlc != null) { + return scanExpiryIterator( + cctx.swap().rawOffHeapIterator(qry.partition(), true, backups), + topVer, + filter, + expPlc, + qry.keepBinary()); + } + if (cctx.offheapTiered() && filter != null) { OffheapIteratorClosure c = new OffheapIteratorClosure(filter, qry.keepBinary()); @@ -1125,6 +1057,38 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte }; } + private GridIteratorAdapter<IgniteBiTuple<K, V>> scanExpiryIterator( + final Iterator<Map.Entry<byte[], byte[]>> it, + AffinityTopologyVersion topVer, + @Nullable final IgniteBiPredicate<K, V> filter, + ExpiryPolicy expPlc, + final boolean keepBinary + ) { + Iterator <KeyCacheObject> keyIter = new Iterator<KeyCacheObject>() { + /** {@inheritDoc} */ + @Override public boolean hasNext() { + return it.hasNext(); + } + + /** {@inheritDoc} */ + @Override public KeyCacheObject next() { + try { + return cctx.toCacheKeyObject(it.next().getKey()); + } + catch (IgniteCheckedException e) { + throw new IgniteException(e); + } + } + + /** {@inheritDoc} */ + @Override public void remove() { + it.remove(); + } + }; + + return new PeekValueExpiryAwareIterator<>(keyIter, expPlc, topVer, filter, keepBinary); + } + /** * @param o Object to inject resources to. * @throws IgniteCheckedException If failure occurred while injecting resources. @@ -3115,4 +3079,145 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte false, keepBinary); } + + private class PeekValueExpiryAwareIterator<K, V> extends GridCloseableIteratorAdapter<IgniteBiTuple<K, V>> { + /** */ + private final ExpiryPolicy plc; + + /** */ + private final GridCacheAdapter cache; + + /** */ + private final AffinityTopologyVersion topVer; + + /** */ + private final GridDhtCacheAdapter dht; + + /** */ + private final IgniteBiPredicate<K, V> keyValFilter; + + /** */ + private final boolean keepBinary; + + /** */ + private IgniteBiTuple<K, V> next; + + /** */ + private IgniteCacheExpiryPolicy expiryPlc; + + /** */ + private Iterator<KeyCacheObject> keyIt; + + public PeekValueExpiryAwareIterator( + Iterator<KeyCacheObject> keyIt, + ExpiryPolicy plc, + AffinityTopologyVersion topVer, + IgniteBiPredicate<K, V> keyValFilter, + boolean keepBinary + ) { + this.keyIt = keyIt; + this.plc = plc; + this.topVer = topVer; + this.keyValFilter = keyValFilter; + + dht = cctx.isLocal() ? null : (cctx.isNear() ? cctx.near().dht() : cctx.dht()); + cache = dht != null ? dht : cctx.cache(); + + this.keepBinary = keepBinary; + expiryPlc = cctx.cache().expiryPolicy(plc); + + advance(); + } + + @Override public boolean onHasNext() { + return next != null; + } + + @Override public IgniteBiTuple<K, V> onNext() { + if (next == null) + throw new NoSuchElementException(); + + IgniteBiTuple<K, V> next0 = next; + + advance(); + + return next0; + } + + private void advance() { + IgniteBiTuple<K, V> next0 = null; + + while (keyIt.hasNext()) { + next0 = null; + + KeyCacheObject key = keyIt.next(); + + V val; + + try { + GridCacheEntryEx entry = cache.peekEx(key); + + CacheObject cacheVal = + entry != null ? entry.peek(true, false, false, topVer, expiryPlc) : null; + + val = (V)cctx.cacheObjectContext().unwrapBinaryIfNeeded(cacheVal, true); + } + catch (GridCacheEntryRemovedException e) { + val = null; + } + catch (IgniteCheckedException e) { + if (log.isDebugEnabled()) + log.debug("Failed to peek value: " + e); + + val = null; + } + + if (dht != null && expiryPlc != null && expiryPlc.readyToFlush(100)) { + dht.sendTtlUpdateRequest(expiryPlc); + + expiryPlc = cctx.cache().expiryPolicy(plc); + } + + if (val != null) { + next0 = F.t( + (K)cctx.unwrapBinaryIfNeeded(key, keepBinary), + (V)cctx.unwrapBinaryIfNeeded(val, keepBinary)); + + if (checkPredicate(next0)) + break; + else + next0 = null; + } + } + + next = next0 != null ? + new IgniteBiTuple<>(next0.getKey(), next0.getValue()) : + null; + + if (next == null) + sendTtlUpdate(); + } + + @Override protected void onClose() { + sendTtlUpdate(); + } + + private void sendTtlUpdate() { + if (dht != null && expiryPlc != null) { + dht.sendTtlUpdateRequest(expiryPlc); + + expiryPlc = null; + } + } + + private boolean checkPredicate(Map.Entry<K, V> e) { + if (keyValFilter != null) { + Map.Entry<K, V> e0 = (Map.Entry<K, V>)cctx.unwrapBinaryIfNeeded(e, keepBinary); + + return keyValFilter.apply(e0.getKey(), e0.getValue()); + } + + return true; + } + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/1adc02a5/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheAtomicExpiryPolicyTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheAtomicExpiryPolicyTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheAtomicExpiryPolicyTest.java index c2ee607..7ff3f26 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheAtomicExpiryPolicyTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheAtomicExpiryPolicyTest.java @@ -54,4 +54,9 @@ public class IgniteCacheAtomicExpiryPolicyTest extends IgniteCacheExpiryPolicyAb @Override protected NearCacheConfiguration nearConfiguration() { return null; } + + /** {@inheritDoc} */ + @Override public void testAccess() throws Exception { + super.testAccess(); + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/1adc02a5/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheAtomicOffheapExpiryPolicyTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheAtomicOffheapExpiryPolicyTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheAtomicOffheapExpiryPolicyTest.java new file mode 100644 index 0000000..f218c14 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheAtomicOffheapExpiryPolicyTest.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.expiry; + +import org.apache.ignite.cache.CacheMemoryMode; + +/** + * + */ +public class IgniteCacheAtomicOffheapExpiryPolicyTest extends IgniteCacheAtomicExpiryPolicyTest { + /** {@inheritDoc} */ + @Override protected CacheMemoryMode memoryMode() { + return CacheMemoryMode.OFFHEAP_TIERED; + } + + /** {@inheritDoc} */ + @Override public void testAccess() throws Exception { + super.testAccess(); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/1adc02a5/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java index 7d22206..f57d860 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java @@ -33,21 +33,30 @@ import javax.cache.expiry.CreatedExpiryPolicy; import javax.cache.expiry.Duration; import javax.cache.expiry.EternalExpiryPolicy; import javax.cache.expiry.ExpiryPolicy; +import javax.cache.expiry.ModifiedExpiryPolicy; import javax.cache.processor.EntryProcessor; import javax.cache.processor.MutableEntry; import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; +import org.apache.ignite.cache.CacheMemoryMode; import org.apache.ignite.cache.CachePeekMode; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.NearCacheConfiguration; +import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.IgniteKernal; import org.apache.ignite.internal.processors.cache.GridCacheAdapter; import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; +import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException; import org.apache.ignite.internal.processors.cache.IgniteCacheAbstractTest; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvalidPartitionException; import org.apache.ignite.internal.util.lang.GridAbsPredicate; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.PA; +import org.apache.ignite.internal.util.typedef.PAX; import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.transactions.Transaction; import org.apache.ignite.transactions.TransactionConcurrency; @@ -95,7 +104,6 @@ public abstract class IgniteCacheExpiryPolicyAbstractTest extends IgniteCacheAbs storeMap.clear(); } - /** {@inheritDoc} */ @Override protected CacheConfiguration cacheConfiguration(String gridName) throws Exception { CacheConfiguration cfg = super.cacheConfiguration(gridName); @@ -105,6 +113,11 @@ public abstract class IgniteCacheExpiryPolicyAbstractTest extends IgniteCacheAbs cfg.setExpiryPolicyFactory(factory); + cfg.setMemoryMode(memoryMode()); + + if (memoryMode() == CacheMemoryMode.OFFHEAP_TIERED) + cfg.setOffHeapMaxMemory(0); + if (disableEagerTtl) cfg.setEagerTtl(false); @@ -112,7 +125,44 @@ public abstract class IgniteCacheExpiryPolicyAbstractTest extends IgniteCacheAbs } /** - * @throws Exception If failed. + * @return Cache memory mode. + */ + protected CacheMemoryMode memoryMode() { + return CacheMemoryMode.ONHEAP_TIERED; + } + + /** + * @throws Exception if failed. + */ + public void testCreateUpdate0() throws Exception { + startGrids(1); + + long ttl = 60L; + + final String key = "key1"; + + final IgniteCache<String, String> cache = jcache(); + + for (int i = 0; i < 1000; i++) { + final IgniteCache<String, String> cache0 = cache.withExpiryPolicy(new ModifiedExpiryPolicy(new Duration(TimeUnit.HOURS, ttl))); + + cache0.put(key, key); + + info("PUT DONE"); + } + + int pSize = grid(0).context().cache().internalCache(null).context().ttl().pendingSize(); + + assertTrue("Too many pending entries: " + pSize, pSize <= 1); + + cache.remove(key); + + pSize = grid(0).context().cache().internalCache(null).context().ttl().pendingSize(); + + assertEquals(0, pSize); + } + + /** * @throws Exception If failed. */ public void testZeroOnCreate() throws Exception { factory = CreatedExpiryPolicy.factoryOf(Duration.ZERO); @@ -307,48 +357,50 @@ public abstract class IgniteCacheExpiryPolicyAbstractTest extends IgniteCacheAbs startGrids(); - for (final Integer key : keys()) { - log.info("Test access [key=" + key + ']'); - - access(key); - } - - accessGetAll(); - - for (final Integer key : keys()) { - log.info("Test filterAccessRemove access [key=" + key + ']'); - - filterAccessRemove(key); - } - - for (final Integer key : keys()) { - log.info("Test filterAccessReplace access [key=" + key + ']'); - - filterAccessReplace(key); - } - - if (atomicityMode() == TRANSACTIONAL) { - TransactionConcurrency[] txModes = {PESSIMISTIC}; - - for (TransactionConcurrency txMode : txModes) { - for (final Integer key : keys()) { - log.info("Test txGet [key=" + key + ", txMode=" + txMode + ']'); - - txGet(key, txMode); - } - } - - for (TransactionConcurrency txMode : txModes) { - log.info("Test txGetAll [txMode=" + txMode + ']'); - - txGetAll(txMode); - } - } +// for (final Integer key : keys()) { +// log.info("Test access [key=" + key + ']'); +// +// access(key); +// } +// +// accessGetAll(); +// +// for (final Integer key : keys()) { +// log.info("Test filterAccessRemove access [key=" + key + ']'); +// +// filterAccessRemove(key); +// } +// +// for (final Integer key : keys()) { +// log.info("Test filterAccessReplace access [key=" + key + ']'); +// +// filterAccessReplace(key); +// } +// +// if (atomicityMode() == TRANSACTIONAL) { +// TransactionConcurrency[] txModes = {PESSIMISTIC}; +// +// for (TransactionConcurrency txMode : txModes) { +// for (final Integer key : keys()) { +// log.info("Test txGet [key=" + key + ", txMode=" + txMode + ']'); +// +// txGet(key, txMode); +// } +// } +// +// for (TransactionConcurrency txMode : txModes) { +// log.info("Test txGetAll [txMode=" + txMode + ']'); +// +// txGetAll(txMode); +// } +// } IgniteCache<Integer, Integer> cache = jcache(0); Collection<Integer> putKeys = keys(); + info("Put keys: " + putKeys); + for (final Integer key : putKeys) cache.put(key, key); @@ -359,10 +411,15 @@ public abstract class IgniteCacheExpiryPolicyAbstractTest extends IgniteCacheAbs while (it.hasNext()) itKeys.add(it.next().getKey()); + info("It keys: " + itKeys); + assertTrue(itKeys.size() >= putKeys.size()); - for (Integer key : itKeys) + for (Integer key : itKeys) { + info("Checking iterator key: " + key); + checkTtl(key, 62_000L, true); + } } /** @@ -1016,7 +1073,7 @@ public abstract class IgniteCacheExpiryPolicyAbstractTest extends IgniteCacheAbs ClusterNode node = grid(i).cluster().localNode(); for (Integer key : keys) { - Object val = jcache(i).localPeek(key, CachePeekMode.ONHEAP); + Object val = jcache(i).localPeek(key, CachePeekMode.ONHEAP, CachePeekMode.OFFHEAP); if (val != null) { log.info("Unexpected value [grid=" + i + @@ -1059,51 +1116,53 @@ public abstract class IgniteCacheExpiryPolicyAbstractTest extends IgniteCacheAbs GridCacheAdapter<Object, Object> cache = grid.context().cache().internalCache(); - GridCacheEntryEx e = cache.peekEx(key); + if (cache.context().isNear()) + cache = cache.context().near().dht(); - if (e == null && cache.context().isNear()) - e = cache.context().near().dht().peekEx(key); + while (true) { + try { + GridCacheEntryEx e = memoryMode() == CacheMemoryMode.ONHEAP_TIERED ? + cache.peekEx(key) : cache.entryEx(key); - if (e != null && e.deleted()) { - assertEquals(0, e.ttl()); + if (e != null && e.deleted()) { + assertEquals(0, e.ttl()); - assertFalse(cache.affinity().isPrimaryOrBackup(grid.localNode(), key)); + assertFalse(cache.affinity().isPrimaryOrBackup(grid.localNode(), key)); - continue; - } + continue; + } - if (e == null) - assertTrue("Not found " + key, !cache.affinity().isPrimaryOrBackup(grid.localNode(), key)); - else { - found = true; + if (e == null) + assertTrue("Not found " + key, !cache.affinity().isPrimaryOrBackup(grid.localNode(), key)); + else { + e.unswap(); - if (wait) { - final GridCacheEntryEx e0 = e; + found = true; - GridTestUtils.waitForCondition(new PA() { - @Override public boolean apply() { - try { - return e0.ttl() == ttl; - } - catch (Exception e) { - fail("Unexpected error: " + e); + if (wait) + waitTtl(cache, key, ttl); - return true; - } - } - }, 3000); - } + boolean primary = cache.affinity().isPrimary(grid.localNode(), key); + boolean backup = cache.affinity().isBackup(grid.localNode(), key); - boolean primary = cache.affinity().isPrimary(grid.localNode(), key); - boolean backup = cache.affinity().isBackup(grid.localNode(), key); + assertEquals("Unexpected ttl [grid=" + i + ", key=" + key + ", e=" + e + + ", primary=" + primary + ", backup=" + backup + ']', ttl, e.ttl()); - assertEquals("Unexpected ttl [grid=" + i + ", key=" + key + ", e=" + e + - ", primary=" + primary + ", backup=" + backup + ']', ttl, e.ttl()); + if (ttl > 0) + assertTrue(e.expireTime() > 0); + else + assertEquals(0, e.expireTime()); + } - if (ttl > 0) - assertTrue(e.expireTime() > 0); - else - assertEquals(0, e.expireTime()); + break; + } + catch (GridCacheEntryRemovedException ignore) { + // Retry. + } + catch (GridDhtInvalidPartitionException ignore) { + // No need to check. + break; + } } } @@ -1111,6 +1170,40 @@ public abstract class IgniteCacheExpiryPolicyAbstractTest extends IgniteCacheAbs } /** + * @param cache Cache. + * @param key Key. + * @param ttl TTL to wait. + * @throws IgniteInterruptedCheckedException If wait has been interrupted. + */ + private void waitTtl(final GridCacheAdapter<Object, Object> cache, final Object key, final long ttl) + throws IgniteInterruptedCheckedException { + GridTestUtils.waitForCondition(new PAX() { + @Override public boolean applyx() throws IgniteCheckedException { + GridCacheEntryEx entry = null; + + while (true) { + try { + entry = memoryMode() == CacheMemoryMode.ONHEAP_TIERED ? + cache.peekEx(key) : cache.entryEx(key); + + assert entry != null; + + entry.unswap(); + + return entry.ttl() == ttl; + } + catch (GridCacheEntryRemovedException ignore) { + // Retry. + } + catch (GridDhtInvalidPartitionException ignore) { + return true; + } + } + } + }, 3000); + } + + /** * */ private static class GetEntryProcessor implements EntryProcessor<Integer, Integer, Integer> { @@ -1153,6 +1246,8 @@ public abstract class IgniteCacheExpiryPolicyAbstractTest extends IgniteCacheAbs /** {@inheritDoc} */ @Override public Duration getExpiryForAccess() { + U.dumpStack(); + return access != null ? new Duration(TimeUnit.MILLISECONDS, access) : null; }
