Repository: ignite
Updated Branches:
  refs/heads/master 78c371208 -> 1bc605866


ignite-3621 Use single ttl cleanup worker thread for all caches


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/1bc60586
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/1bc60586
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/1bc60586

Branch: refs/heads/master
Commit: 1bc6058669023042f10d715e0736c6c87a521c56
Parents: 78c3712
Author: Andrey Martianov <amartia...@gridgain.com>
Authored: Tue Sep 20 17:41:49 2016 +0300
Committer: sboikov <sboi...@gridgain.com>
Committed: Tue Sep 20 17:41:49 2016 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheProcessor.java    |   2 +
 .../cache/GridCacheSharedContext.java           |  24 +++-
 .../cache/GridCacheSharedTtlCleanupManager.java | 132 +++++++++++++++++++
 .../processors/cache/GridCacheTtlManager.java   | 115 ++++------------
 .../GridCacheTtlManagerNotificationTest.java    | 107 ++++++++++++++-
 .../IgniteCacheExpiryPolicyTestSuite.java       |   2 +
 ...eCacheOnlyOneTtlCleanupThreadExistsTest.java | 102 ++++++++++++++
 .../loadtests/hashmap/GridCacheTestContext.java |   2 +
 8 files changed, 384 insertions(+), 102 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/1bc60586/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 6640db8..0a0b40a 100755
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -1870,6 +1870,7 @@ public class GridCacheProcessor extends 
GridProcessorAdapter {
         GridCachePartitionExchangeManager exchMgr = new 
GridCachePartitionExchangeManager();
         GridCacheIoManager ioMgr = new GridCacheIoManager();
         CacheAffinitySharedManager topMgr = new CacheAffinitySharedManager();
+        GridCacheSharedTtlCleanupManager ttl = new 
GridCacheSharedTtlCleanupManager();
 
         CacheJtaManagerAdapter jta = JTA.createOptional();
 
@@ -1882,6 +1883,7 @@ public class GridCacheProcessor extends 
GridProcessorAdapter {
             exchMgr,
             topMgr,
             ioMgr,
+            ttl,
             jta,
             storeSesLsnrs
         );

http://git-wip-us.apache.org/repos/asf/ignite/blob/1bc60586/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
index 0cdf0a4..8f39235 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
@@ -93,6 +93,9 @@ public class GridCacheSharedContext<K, V> {
     /** Affinity manager. */
     private CacheAffinitySharedManager affMgr;
 
+    /** Ttl cleanup manager. */
+    private GridCacheSharedTtlCleanupManager ttlMgr;
+
     /** Cache contexts map. */
     private ConcurrentMap<Integer, GridCacheContext<K, V>> ctxMap;
 
@@ -135,6 +138,7 @@ public class GridCacheSharedContext<K, V> {
      * @param exchMgr Exchange manager.
      * @param affMgr Affinity manager.
      * @param ioMgr IO manager.
+     * @param ttlMgr Ttl cleanup manager.
      * @param jtaMgr JTA manager.
      * @param storeSesLsnrs Store session listeners.
      */
@@ -147,12 +151,13 @@ public class GridCacheSharedContext<K, V> {
         GridCachePartitionExchangeManager<K, V> exchMgr,
         CacheAffinitySharedManager<K, V> affMgr,
         GridCacheIoManager ioMgr,
+        GridCacheSharedTtlCleanupManager ttlMgr,
         CacheJtaManagerAdapter jtaMgr,
         Collection<CacheStoreSessionListener> storeSesLsnrs
     ) {
         this.kernalCtx = kernalCtx;
 
-        setManagers(mgrs, txMgr, jtaMgr, verMgr, mvccMgr, depMgr, exchMgr, 
affMgr, ioMgr);
+        setManagers(mgrs, txMgr, jtaMgr, verMgr, mvccMgr, depMgr, exchMgr, 
affMgr, ioMgr, ttlMgr);
 
         this.storeSesLsnrs = storeSesLsnrs;
 
@@ -248,7 +253,8 @@ public class GridCacheSharedContext<K, V> {
             new GridCacheDeploymentManager<K, V>(),
             new GridCachePartitionExchangeManager<K, V>(),
             affMgr,
-            ioMgr);
+            ioMgr,
+            ttlMgr);
 
         this.mgrs = mgrs;
 
@@ -272,13 +278,14 @@ public class GridCacheSharedContext<K, V> {
     /**
      * @param mgrs Managers list.
      * @param txMgr Transaction manager.
+     * @param jtaMgr JTA manager.
      * @param verMgr Version manager.
      * @param mvccMgr MVCC manager.
      * @param depMgr Deployment manager.
      * @param exchMgr Exchange manager.
      * @param affMgr Affinity manager.
      * @param ioMgr IO manager.
-     * @param jtaMgr JTA manager.
+     * @param ttlMgr Ttl cleanup manager.
      */
     private void setManagers(List<GridCacheSharedManager<K, V>> mgrs,
         IgniteTxManager txMgr,
@@ -288,7 +295,8 @@ public class GridCacheSharedContext<K, V> {
         GridCacheDeploymentManager<K, V> depMgr,
         GridCachePartitionExchangeManager<K, V> exchMgr,
         CacheAffinitySharedManager affMgr,
-        GridCacheIoManager ioMgr) {
+        GridCacheIoManager ioMgr,
+        GridCacheSharedTtlCleanupManager ttlMgr) {
         this.mvccMgr = add(mgrs, mvccMgr);
         this.verMgr = add(mgrs, verMgr);
         this.txMgr = add(mgrs, txMgr);
@@ -297,6 +305,7 @@ public class GridCacheSharedContext<K, V> {
         this.exchMgr = add(mgrs, exchMgr);
         this.affMgr = add(mgrs, affMgr);
         this.ioMgr = add(mgrs, ioMgr);
+        this.ttlMgr = add(mgrs, ttlMgr);
     }
 
     /**
@@ -493,6 +502,13 @@ public class GridCacheSharedContext<K, V> {
     }
 
     /**
+     * @return Ttl cleanup manager.
+     * */
+    public GridCacheSharedTtlCleanupManager ttl() {
+        return ttlMgr;
+    }
+
+    /**
      * @return Cache deployment manager.
      */
     public GridCacheDeploymentManager<K, V> deploy() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/1bc60586/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedTtlCleanupManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedTtlCleanupManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedTtlCleanupManager.java
new file mode 100644
index 0000000..d7d2cad
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedTtlCleanupManager.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.internal.util.worker.GridWorker;
+import org.apache.ignite.thread.IgniteThread;
+
+/**
+ * Periodically removes expired entities from caches with {@link 
CacheConfiguration#isEagerTtl()} flag set.
+ */
+public class GridCacheSharedTtlCleanupManager extends 
GridCacheSharedManagerAdapter {
+    /** Ttl cleanup worker thread sleep interval, ms. */
+    private static final long CLEANUP_WORKER_SLEEP_INTERVAL = 500;
+
+    /** Limit of expired entries processed by worker for certain cache in one 
pass. */
+    private static final int CLEANUP_WORKER_ENTRIES_PROCESS_LIMIT = 1000;
+
+    /** Cleanup worker. */
+    private CleanupWorker cleanupWorker;
+
+    /** Mutex on worker thread creation. */
+    private final Object mux = new Object();
+
+    /** List of registered ttl managers. */
+    private List<GridCacheTtlManager> mgrs = new CopyOnWriteArrayList<>();
+
+    /** {@inheritDoc} */
+    @Override protected void onKernalStop0(boolean cancel) {
+        synchronized (mux) {
+            stopCleanupWorker();
+        }
+    }
+
+    /**
+     * Register ttl manager of cache for periodical check on expired entries.
+     *
+     * @param mgr ttl manager of cache.
+     * */
+    public void register(GridCacheTtlManager mgr) {
+        synchronized (mux) {
+            if (cleanupWorker == null)
+                startCleanupWorker();
+
+            mgrs.add(mgr);
+        }
+    }
+
+    /**
+     * Unregister ttl manager of cache from periodical check on expired 
entries.
+     *
+     * @param mgr ttl manager of cache.
+     * */
+    public void unregister(GridCacheTtlManager mgr) {
+        synchronized (mux) {
+            mgrs.remove(mgr);
+
+            if (mgrs.isEmpty())
+                stopCleanupWorker();
+        }
+    }
+
+    /**
+     *
+     */
+    private void startCleanupWorker() {
+        cleanupWorker = new CleanupWorker();
+
+        new IgniteThread(cleanupWorker).start();
+    }
+
+    /**
+     *
+     */
+    private void stopCleanupWorker() {
+        if (null != cleanupWorker) {
+            U.cancel(cleanupWorker);
+            U.join(cleanupWorker, log);
+
+            cleanupWorker = null;
+        }
+    }
+
+    /**
+     * Entry cleanup worker.
+     */
+    private class CleanupWorker extends GridWorker {
+        /**
+         * Creates cleanup worker.
+         */
+        CleanupWorker() {
+            super(cctx.gridName(), "ttl-cleanup-worker", 
cctx.logger(GridCacheSharedTtlCleanupManager.class));
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void body() throws InterruptedException, 
IgniteInterruptedCheckedException {
+            while (!isCancelled()) {
+                boolean expiredRemains = false;
+
+                for (GridCacheTtlManager mgr : mgrs) {
+                    if (mgr.expire(CLEANUP_WORKER_ENTRIES_PROCESS_LIMIT))
+                        expiredRemains = true;
+
+                    if (isCancelled())
+                        return;
+                }
+
+                if (!expiredRemains)
+                    U.sleep(CLEANUP_WORKER_SLEEP_INTERVAL);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/1bc60586/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java
index 8ff0358..996544f 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java
@@ -17,20 +17,15 @@
 
 package org.apache.ignite.internal.processors.cache;
 
-import java.util.concurrent.atomic.AtomicLongFieldUpdater;
-
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.configuration.CacheConfiguration;
-import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.util.GridConcurrentSkipListSet;
 import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.internal.util.worker.GridWorker;
-import org.apache.ignite.thread.IgniteThread;
 import org.jetbrains.annotations.Nullable;
 import org.jsr166.LongAdder8;
 
@@ -43,19 +38,6 @@ public class GridCacheTtlManager extends 
GridCacheManagerAdapter {
     /** Entries pending removal. */
     private final GridConcurrentSkipListSetEx pendingEntries = new 
GridConcurrentSkipListSetEx();
 
-    /** Cleanup worker. */
-    private CleanupWorker cleanupWorker;
-
-    /** Mutex. */
-    private final Object mux = new Object();
-
-    /** Next expire time. */
-    private volatile long nextExpireTime;
-
-    /** Next expire time updater. */
-    private static final AtomicLongFieldUpdater<GridCacheTtlManager> 
nextExpireTimeUpdater =
-        AtomicLongFieldUpdater.newUpdater(GridCacheTtlManager.class, 
"nextExpireTime");
-
     /** {@inheritDoc} */
     @Override protected void start0() throws IgniteCheckedException {
         boolean cleanupDisabled = cctx.kernalContext().isDaemon() ||
@@ -68,19 +50,14 @@ public class GridCacheTtlManager extends 
GridCacheManagerAdapter {
         if (cleanupDisabled)
             return;
 
-        cleanupWorker = new CleanupWorker();
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void onKernalStart0() throws IgniteCheckedException {
-        if (cleanupWorker != null)
-            new IgniteThread(cleanupWorker).start();
+        cctx.shared().ttl().register(this);
     }
 
     /** {@inheritDoc} */
     @Override protected void onKernalStop0(boolean cancel) {
-        U.cancel(cleanupWorker);
-        U.join(cleanupWorker, log);
+        pendingEntries.clear();
+
+        cctx.shared().ttl().unregister(this);
     }
 
     /**
@@ -90,27 +67,10 @@ public class GridCacheTtlManager extends 
GridCacheManagerAdapter {
      */
     public void addTrackedEntry(GridCacheMapEntry entry) {
         assert Thread.holdsLock(entry);
-        assert cleanupWorker != null;
 
         EntryWrapper e = new EntryWrapper(entry);
 
         pendingEntries.add(e);
-
-        while (true) {
-            long nextExpireTime = this.nextExpireTime;
-
-            if (e.expireTime < nextExpireTime) {
-                if (nextExpireTimeUpdater.compareAndSet(this, nextExpireTime, 
e.expireTime)) {
-                    synchronized (mux) {
-                        mux.notifyAll();
-                    }
-
-                    break;
-                }
-            }
-            else
-                break;
-        }
     }
 
     /**
@@ -118,7 +78,6 @@ public class GridCacheTtlManager extends 
GridCacheManagerAdapter {
      */
     public void removeTrackedEntry(GridCacheMapEntry entry) {
         assert Thread.holdsLock(entry);
-        assert cleanupWorker != null;
 
         pendingEntries.remove(new EntryWrapper(entry));
     }
@@ -141,15 +100,27 @@ public class GridCacheTtlManager extends 
GridCacheManagerAdapter {
      * Expires entries by TTL.
      */
     public void expire() {
+        expire(-1);
+    }
+
+    /**
+     * Processes specified amount of expired entries.
+     *
+     * @param amount Limit of processed entries by single call, {@code -1} for 
no limit.
+     * @return {@code True} if unprocessed expired entries remains.
+     */
+    public boolean expire(int amount) {
         long now = U.currentTimeMillis();
 
         GridCacheVersion obsoleteVer = null;
 
-        for (int size = pendingEntries.sizex(); size > 0; size--) {
+        int limit = (-1 != amount) ? amount : pendingEntries.sizex();
+
+        for (int cnt = limit; cnt > 0; cnt--) {
             EntryWrapper e = pendingEntries.firstx();
 
             if (e == null || e.expireTime > now)
-                return;
+                return false; // All expired entries are processed.
 
             if (pendingEntries.remove(e)) {
                 if (obsoleteVer == null)
@@ -158,7 +129,6 @@ public class GridCacheTtlManager extends 
GridCacheManagerAdapter {
                 if (log.isTraceEnabled())
                     log.trace("Trying to remove expired entry from cache: " + 
e);
 
-
                 boolean touch = false;
 
                 GridCacheEntryEx entry = e.ctx.cache().entryEx(e.key);
@@ -181,53 +151,14 @@ public class GridCacheTtlManager extends 
GridCacheManagerAdapter {
                     entry.context().evicts().touch(entry, null);
             }
         }
-    }
-
-    /**
-     * Entry cleanup worker.
-     */
-    private class CleanupWorker extends GridWorker {
-        /**
-         * Creates cleanup worker.
-         */
-        CleanupWorker() {
-            super(cctx.gridName(), "ttl-cleanup-worker-" + cctx.name(), 
cctx.logger(GridCacheTtlManager.class));
-        }
-
-        /** {@inheritDoc} */
-        @Override protected void body() throws InterruptedException, 
IgniteInterruptedCheckedException {
-            while (!isCancelled()) {
-                expire();
-
-                long waitTime;
-
-                while (true) {
-                    long curTime = U.currentTimeMillis();
 
-                    GridCacheTtlManager.EntryWrapper first = 
pendingEntries.firstx();
-
-                    if (first == null) {
-                        waitTime = 500;
-                        nextExpireTime = curTime + 500;
-                    }
-                    else {
-                        long expireTime = first.expireTime;
-
-                        waitTime = expireTime - curTime;
-                        nextExpireTime = expireTime;
-                    }
-
-                    synchronized (mux) {
-                        if (pendingEntries.firstx() == first) {
-                            if (waitTime > 0)
-                                mux.wait(waitTime);
+        if (amount != -1) {
+            EntryWrapper e = pendingEntries.firstx();
 
-                            break;
-                        }
-                    }
-                }
-            }
+            return e != null && e.expireTime <= now;
         }
+
+        return false;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/1bc60586/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManagerNotificationTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManagerNotificationTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManagerNotificationTest.java
index 85a491e..79f8a65 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManagerNotificationTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManagerNotificationTest.java
@@ -17,6 +17,8 @@
 
 package org.apache.ignite.internal.processors.cache;
 
+import java.util.ArrayList;
+import java.util.List;
 import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.atomic.AtomicInteger;
 import javax.cache.expiry.CreatedExpiryPolicy;
@@ -24,6 +26,7 @@ import javax.cache.expiry.Duration;
 import javax.cache.expiry.ExpiryPolicy;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteException;
 import org.apache.ignite.cache.CacheMode;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
@@ -44,6 +47,12 @@ import static java.util.concurrent.TimeUnit.SECONDS;
  *
  */
 public class GridCacheTtlManagerNotificationTest extends 
GridCommonAbstractTest {
+    /** Count of caches in multi caches test. */
+    private static final int CACHES_CNT = 10;
+
+    /** Prefix for cache name fir multi caches test. */
+    private static final String CACHE_PREFIX = "cache-";
+
     /** IP finder. */
     private static final TcpDiscoveryIpFinder IP_FINDER = new 
TcpDiscoveryVmIpFinder(true);
 
@@ -60,14 +69,30 @@ public class GridCacheTtlManagerNotificationTest extends 
GridCommonAbstractTest
 
         cfg.setDiscoverySpi(discoSpi);
 
+        CacheConfiguration[] ccfgs = new CacheConfiguration[CACHES_CNT + 1];
+
+        ccfgs[0] = createCacheConfiguration(null);
+
+        for (int i = 0; i < CACHES_CNT; i++)
+            ccfgs[i + 1] = createCacheConfiguration(CACHE_PREFIX + i);
+
+        cfg.setCacheConfiguration(ccfgs);
+
+        return cfg;
+    }
+
+    /**
+     * @param name Cache name.
+     * @return Cache configuration.
+     */
+    private CacheConfiguration createCacheConfiguration(String name) {
         CacheConfiguration ccfg = new CacheConfiguration();
 
         ccfg.setCacheMode(cacheMode);
         ccfg.setEagerTtl(true);
+        ccfg.setName(name);
 
-        cfg.setCacheConfiguration(ccfg);
-
-        return cfg;
+        return ccfg;
     }
 
     /**
@@ -104,8 +129,10 @@ public class GridCacheTtlManagerNotificationTest extends 
GridCommonAbstractTest
     }
 
     /**
-     * Add in several threads value to cache with different expiration policy.
-     * Wait for expiration of keys with small expiration duration.
+     * Adds in several threads value to cache with different expiration policy.
+     * Waits for expiration of keys with small expiration duration.
+     *
+     * @throws Exception If failed.
      */
     public void testThatNotificationWorkAsExpectedInMultithreadedMode() throws 
Exception {
         final CyclicBarrier barrier = new CyclicBarrier(21);
@@ -152,16 +179,83 @@ public class GridCacheTtlManagerNotificationTest extends 
GridCommonAbstractTest
         }
     }
 
+    /**
+     * Adds in several threads value to several caches with different 
expiration policy.
+     * Waits for expiration of keys with small expiration duration.
+     *
+     * @throws Exception If failed.
+     */
+    public void testThatNotificationWorkAsExpectedManyCaches() throws 
Exception {
+        final int smallDuration = 4_000;
+
+        final int cnt = 1_000;
+        final int cacheCnt = CACHES_CNT;
+        final int threadCnt = 2;
+
+        final CyclicBarrier barrier = new CyclicBarrier(2 * threadCnt * 
cacheCnt + 1);
+        final AtomicInteger keysRangeGen = new AtomicInteger();
+        final AtomicInteger evtCnt = new AtomicInteger(0);
+        final List<IgniteCache<Object, Object>> caches = new 
ArrayList<>(cacheCnt);
+
+        try (final Ignite g = startGrid(0)) {
+            for (int i = 0; i < cacheCnt; i++) {
+                IgniteCache<Object, Object> cache = g.cache("cache-" + i);
+
+                caches.add(cache);
+            }
+
+            g.events().localListen(new IgnitePredicate<Event>() {
+                @Override public boolean apply(Event evt) {
+                    evtCnt.incrementAndGet();
+
+                    return true;
+                }
+            }, EventType.EVT_CACHE_OBJECT_EXPIRED);
+
+            for (int i = 0; i < cacheCnt; i++) {
+                GridTestUtils.runMultiThreadedAsync(
+                    new CacheFiller(caches.get(i), 100_000, barrier, 
keysRangeGen, cnt),
+                    threadCnt,
+                    "put-large-duration");
+
+                GridTestUtils.runMultiThreadedAsync(
+                    new CacheFiller(caches.get(i), smallDuration, barrier, 
keysRangeGen, cnt),
+                    threadCnt,
+                    "put-small-duration");
+            }
+
+            barrier.await();
+
+            Thread.sleep(1_000);
+
+            barrier.await();
+
+            for (int i = 0; i < cacheCnt; i++)
+                assertEquals("Unexpected size of " + CACHE_PREFIX + i, 2 * 
threadCnt * cnt, caches.get(i).size());
+
+            Thread.sleep(2 * smallDuration);
+
+            for (int i = 0; i < cacheCnt; i++)
+                assertEquals("Unexpected size of " + CACHE_PREFIX + i, 
threadCnt * cnt, caches.get(i).size());
+
+            assertEquals("Unexpected count of expired entries", threadCnt * 
CACHES_CNT * cnt, evtCnt.get());
+        }
+    }
+
     /** */
     private static class CacheFiller implements Runnable {
         /** Barrier. */
         private final CyclicBarrier barrier;
+
         /** Keys range generator. */
         private final AtomicInteger keysRangeGenerator;
+
         /** Count. */
         private final int cnt;
+
         /** Cache. */
         private final IgniteCache<Object, Object> cache;
+
         /** Expiration duration. */
         private final int expirationDuration;
 
@@ -187,6 +281,7 @@ public class GridCacheTtlManagerNotificationTest extends 
GridCommonAbstractTest
                 barrier.await();
 
                 ExpiryPolicy plc1 = new CreatedExpiryPolicy(new 
Duration(MILLISECONDS, expirationDuration));
+
                 int keyStart = keysRangeGenerator.getAndIncrement() * cnt;
 
                 for (int i = keyStart; i < keyStart + cnt; i++)
@@ -195,7 +290,7 @@ public class GridCacheTtlManagerNotificationTest extends 
GridCommonAbstractTest
                 barrier.await();
             }
             catch (Exception e) {
-                e.printStackTrace();
+                throw new IgniteException(e);
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/1bc60586/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyTestSuite.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyTestSuite.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyTestSuite.java
index 28cb2da..e371dc7 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyTestSuite.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyTestSuite.java
@@ -72,7 +72,9 @@ public class IgniteCacheExpiryPolicyTestSuite extends 
TestSuite {
 
         suite.addTestSuite(IgniteCacheExpireAndUpdateConsistencyTest.class);
 
+        // Eager ttl expiration tests.
         suite.addTestSuite(GridCacheTtlManagerNotificationTest.class);
+        suite.addTestSuite(IgniteCacheOnlyOneTtlCleanupThreadExistsTest.class);
 
         return suite;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/1bc60586/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheOnlyOneTtlCleanupThreadExistsTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheOnlyOneTtlCleanupThreadExistsTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheOnlyOneTtlCleanupThreadExistsTest.java
new file mode 100644
index 0000000..84f5144
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheOnlyOneTtlCleanupThreadExistsTest.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.expiry;
+
+import org.apache.ignite.Ignite;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/**
+ * Checks that one and only one Ttl cleanup worker thread must exists, and only
+ * if at least one cache with set 'eagerTtl' flag exists.
+ */
+public class IgniteCacheOnlyOneTtlCleanupThreadExistsTest extends 
GridCommonAbstractTest {
+    /** */
+    private static final String CACHE_NAME1 = "cache-1";
+
+    /** */
+    private static final String CACHE_NAME2 = "cache-2";
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testOnlyOneTtlCleanupThreadExists() throws Exception {
+        try (final Ignite g = startGrid(0)) {
+            checkCleanupThreadExists(false);
+
+            g.createCache(createCacheConfiguration(CACHE_NAME1, false));
+
+            checkCleanupThreadExists(false);
+
+            g.createCache(createCacheConfiguration(CACHE_NAME2, true));
+
+            checkCleanupThreadExists(true);
+
+            g.destroyCache(CACHE_NAME1);
+
+            checkCleanupThreadExists(true);
+
+            g.createCache(createCacheConfiguration(CACHE_NAME1, true));
+
+            checkCleanupThreadExists(true);
+
+            g.destroyCache(CACHE_NAME1);
+
+            checkCleanupThreadExists(true);
+
+            g.destroyCache(CACHE_NAME2);
+
+            checkCleanupThreadExists(false);
+        }
+    }
+
+    /**
+     * @param name Cache name.
+     * @param eagerTtl Eager ttl falg.
+     * @return Cache configuration.
+     */
+    private CacheConfiguration createCacheConfiguration(String name, boolean 
eagerTtl) {
+        CacheConfiguration ccfg = new CacheConfiguration();
+
+        ccfg.setEagerTtl(eagerTtl);
+        ccfg.setName(name);
+
+        return ccfg;
+    }
+
+    /**
+     * @param exists {@code True} if ttl cleanup worker thread expected.
+     * @throws Exception If failed.
+     */
+    private void checkCleanupThreadExists(boolean exists) throws Exception {
+        int cnt = 0;
+
+        for (Thread t : Thread.getAllStackTraces().keySet()) {
+            if (t.getName().contains("ttl-cleanup-worker"))
+                cnt++;
+        }
+
+        if (cnt > 1)
+            fail("More then one ttl cleanup worker threads exists");
+
+        if (exists)
+            assertEquals("Ttl cleanup thread does not exist", cnt, 1);
+        else
+            assertEquals("Ttl cleanup thread exists", cnt, 0);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/1bc60586/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java
 
b/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java
index fb82e20..6c2c4c1 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java
@@ -34,6 +34,7 @@ import 
org.apache.ignite.internal.processors.cache.GridCachePartitionExchangeMan
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.GridCacheSwapManager;
 import org.apache.ignite.internal.processors.cache.GridCacheTtlManager;
+import 
org.apache.ignite.internal.processors.cache.GridCacheSharedTtlCleanupManager;
 import 
org.apache.ignite.internal.processors.cache.datastructures.CacheDataStructuresManager;
 import org.apache.ignite.internal.processors.cache.dr.GridOsCacheDrManager;
 import org.apache.ignite.internal.processors.cache.jta.CacheNoopJtaManager;
@@ -68,6 +69,7 @@ public class GridCacheTestContext<K, V> extends 
GridCacheContext<K, V> {
                 new GridCachePartitionExchangeManager<K, V>(),
                 new CacheAffinitySharedManager<K, V>(),
                 new GridCacheIoManager(),
+                new GridCacheSharedTtlCleanupManager(),
                 new CacheNoopJtaManager(),
                 null
             ),

Reply via email to