Repository: incubator-ignite Updated Branches: refs/heads/ignite-728 [created] 931aeef16
ignite-728 Need to reimplement CREATE-TIME-TTL as eviction policy Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/931aeef1 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/931aeef1 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/931aeef1 Branch: refs/heads/ignite-728 Commit: 931aeef16034d6c7ee5758268c2a43e756862b6b Parents: 0a28058 Author: agura <[email protected]> Authored: Thu Apr 23 21:26:31 2015 +0300 Committer: agura <[email protected]> Committed: Thu Apr 23 21:29:24 2015 +0300 ---------------------------------------------------------------------- .../eviction/sorted/SortedEvictionPolicy.java | 2 +- .../processors/cache/GridCacheMapEntry.java | 10 +- .../processors/cache/GridCacheTtlManager.java | 164 ++++++++++++------- .../processors/cache/GridCacheUtils.java | 5 +- 4 files changed, 114 insertions(+), 67 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/931aeef1/modules/core/src/main/java/org/apache/ignite/cache/eviction/sorted/SortedEvictionPolicy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/eviction/sorted/SortedEvictionPolicy.java b/modules/core/src/main/java/org/apache/ignite/cache/eviction/sorted/SortedEvictionPolicy.java index 0065244..7965c97 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/eviction/sorted/SortedEvictionPolicy.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/eviction/sorted/SortedEvictionPolicy.java @@ -381,7 +381,7 @@ public class SortedEvictionPolicy<K, V> implements EvictionPolicy<K, V>, SortedE private static final long serialVersionUID = 0L; /** Size. */ - private volatile LongAdder8 size = new LongAdder8(); + private final LongAdder8 size = new LongAdder8(); /** * @param comp Comparator. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/931aeef1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index c9f55f5..9dc61f9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@ -2515,7 +2515,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { /** {@inheritDoc} */ @Override public void onMarkedObsolete() { - // No-op. + cctx.ttl().removeTrackedEntry(this); } /** {@inheritDoc} */ @@ -3295,13 +3295,11 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { try { synchronized (this) { - CacheObject expiredVal = saveValueForIndexUnlocked(); - - boolean hasOldBytes = valPtr != 0; + if (expireTimeExtras() <= U.currentTimeMillis() && checkExpired()) { + CacheObject expiredVal = saveValueForIndexUnlocked(); - boolean expired = checkExpired(); + boolean hasOldBytes = valPtr != 0; - if (expired) { if (!obsolete()) { if (cctx.deferredDelete() && !detached() && !isInternal()) { if (!deletedUnlocked()) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/931aeef1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java index 5198b53..d8af2b9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java @@ -26,7 +26,8 @@ import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.internal.util.worker.*; import org.apache.ignite.thread.*; -import java.util.*; +import org.jetbrains.annotations.*; +import org.jsr166.*; /** * Eagerly removes expired entries from cache when {@link org.apache.ignite.configuration.CacheConfiguration#isEagerTtl()} flag is set. @@ -34,14 +35,11 @@ import java.util.*; @SuppressWarnings("NakedNotify") public class GridCacheTtlManager extends GridCacheManagerAdapter { /** Entries pending removal. */ - private final GridConcurrentSkipListSet<EntryWrapper> pendingEntries = new GridConcurrentSkipListSet<>(); + private final GridConcurrentSkipListSetEx pendingEntries = new GridConcurrentSkipListSetEx(); /** Cleanup worker thread. */ private CleanupWorker cleanupWorker; - /** Sync mutex. */ - private final Object mux = new Object(); - /** {@inheritDoc} */ @Override protected void start0() throws IgniteCheckedException { if (cctx.kernalContext().isDaemon() || !cctx.config().isEagerTtl()) @@ -68,24 +66,13 @@ public class GridCacheTtlManager extends GridCacheManagerAdapter { * @param entry Entry to add. */ public void addTrackedEntry(GridCacheMapEntry entry) { - EntryWrapper wrapper = new EntryWrapper(entry); - - pendingEntries.add(wrapper); - - // If entry is on the first position, notify waiting thread. - if (wrapper == pendingEntries.firstx()) { - synchronized (mux) { - mux.notifyAll(); - } - } + pendingEntries.add(new EntryWrapper(entry)); } /** * @param entry Entry to remove. */ public void removeTrackedEntry(GridCacheMapEntry entry) { - // Remove must be called while holding lock on entry before updating expire time. - // No need to wake up waiting thread in this case. pendingEntries.remove(new EntryWrapper(entry)); } @@ -97,6 +84,45 @@ public class GridCacheTtlManager extends GridCacheManagerAdapter { } /** + * Expires entries by TTL. + * + * @param sizeLimited Size limited. + */ + public void expire(boolean sizeLimited) { + long now = U.currentTimeMillis(); + + GridCacheVersion obsoleteVer = null; + + int size = pendingEntries.sizex(); + + while (!sizeLimited || size-- > 0) { + EntryWrapper e = pendingEntries.pollFirst(); + + if (e == null) + break; + + if (e.expireTime > now) { + pendingEntries.add(e); + + break; + } + + if (obsoleteVer == null) + obsoleteVer = cctx.versions().next(); + + if (log.isDebugEnabled()) + log.debug("Trying to remove expired entry from cache: " + e); + + if (e.entry.onTtlExpired(obsoleteVer)) { + e.entry.context().cache().removeEntry(e.entry); + + if (e.entry.context().cache().configuration().isStatisticsEnabled()) + e.entry.context().cache().metrics0().onEvict(); + } + } + } + + /** * Entry cleanup worker. */ private class CleanupWorker extends GridWorker { @@ -110,52 +136,18 @@ public class GridCacheTtlManager extends GridCacheManagerAdapter { /** {@inheritDoc} */ @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException { while (!isCancelled()) { - long now = U.currentTimeMillis(); - - GridCacheVersion obsoleteVer = null; - - for (Iterator<EntryWrapper> it = pendingEntries.iterator(); it.hasNext(); ) { - EntryWrapper wrapper = it.next(); - - if (wrapper.expireTime <= now) { - if (log.isDebugEnabled()) - log.debug("Trying to remove expired entry from cache: " + wrapper); - - if (obsoleteVer == null) - obsoleteVer = cctx.versions().next(); - - if (wrapper.entry.onTtlExpired(obsoleteVer)) - wrapper.entry.context().cache().removeEntry(wrapper.entry); + expire(false); - if (wrapper.entry.context().cache().configuration().isStatisticsEnabled()) - wrapper.entry.context().cache().metrics0().onEvict(); + EntryWrapper first = pendingEntries.firstx(); - it.remove(); - } - else - break; - } + if (first != null) { + long waitTime = first.expireTime - U.currentTimeMillis(); - synchronized (mux) { - while (true) { - // Access of the first element must be inside of - // synchronization block, so we don't miss out - // on thread notification events sent from - // 'addTrackedEntry(..)' method. - EntryWrapper first = pendingEntries.firstx(); - - if (first != null) { - long waitTime = first.expireTime - U.currentTimeMillis(); - - if (waitTime > 0) - mux.wait(waitTime); - else - break; - } - else - mux.wait(5000); - } + if (waitTime > 0) + U.sleep(waitTime); } + else + U.sleep(500); } } } @@ -214,4 +206,58 @@ public class GridCacheTtlManager extends GridCacheManagerAdapter { return res; } } + + /** + * Provides additional method {@code #sizex()}. NOTE: Only the following methods supports this addition: + * <ul> + * <li>{@code #add()}</li> + * <li>{@code #remove()}</li> + * <li>{@code #pollFirst()}</li> + * <ul/> + */ + private static class GridConcurrentSkipListSetEx extends GridConcurrentSkipListSet<EntryWrapper> { + /** */ + private static final long serialVersionUID = 0L; + + /** Size. */ + private final LongAdder8 size = new LongAdder8(); + + /** + * @return Size based on performed operations. + */ + public int sizex() { + return size.intValue(); + } + + /** {@inheritDoc} */ + @Override public boolean add(EntryWrapper e) { + boolean res = super.add(e); + + assert res; + + size.increment(); + + return res; + } + + /** {@inheritDoc} */ + @Override public boolean remove(Object o) { + boolean res = super.remove(o); + + if (res) + size.decrement(); + + return res; + } + + /** {@inheritDoc} */ + @Nullable @Override public EntryWrapper pollFirst() { + EntryWrapper e = super.pollFirst(); + + if (e != null) + size.decrement(); + + return e; + } + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/931aeef1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java index 51385ed..8ac0793 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java @@ -1196,6 +1196,8 @@ public class GridCacheUtils { if (ctx.isNear()) ctx.near().dht().context().evicts().unwind(); + + ctx.ttl().expire(true); } /** @@ -1205,11 +1207,12 @@ public class GridCacheUtils { assert ctx != null; for (GridCacheContext<K, V> cacheCtx : ctx.cacheContexts()) { - cacheCtx.evicts().unwind(); if (cacheCtx.isNear()) cacheCtx.near().dht().context().evicts().unwind(); + + cacheCtx.ttl().expire(true); } }
