This is an automated email from the ASF dual-hosted git repository.

agura pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 85cf73f  IGNITE-12124 Fixed possible NullPointerException/Error related to the cache stop with configured TTL
85cf73f is described below

commit 85cf73fb964152b1edd5baa730bcf9e34e761819
Author: sk0x50 <[email protected]>
AuthorDate: Fri Nov 29 15:16:06 2019 +0300

    IGNITE-12124 Fixed possible NullPointerException/Error related to the cache stop with configured TTL
    
    Signed-off-by: Andrey Gura <[email protected]>
---
 .../processors/cache/GridCacheProcessor.java       |  6 ++
 .../cache/GridCacheSharedTtlCleanupManager.java    | 94 ++++++++++++++--------
 .../processors/cache/GridCacheTtlManager.java      |  9 +++
 .../cache/persistence/db/IgnitePdsWithTtlTest.java |  8 ++
 4 files changed, 85 insertions(+), 32 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 18a4212..2f1e5bc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -2718,6 +2718,12 @@ public class GridCacheProcessor extends 
GridProcessorAdapter {
 
                                 
context().tm().rollbackTransactionsForStoppingCache(action.descriptor().cacheId());
 
+                                // TTL manager has to be unregistered before the checkpointReadLock is acquired.
+                                GridCacheAdapter<?, ?> cache = 
caches.get(action.request().cacheName());
+
+                                if (cache != null)
+                                    cache.context().ttl().unregister();
+
                                 sharedCtx.database().checkpointReadLock();
 
                                 try {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedTtlCleanupManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedTtlCleanupManager.java
index 4094559..bd9b675 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedTtlCleanupManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedTtlCleanupManager.java
@@ -17,13 +17,15 @@
 
 package org.apache.ignite.internal.processors.cache;
 
-import java.util.List;
-import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReentrantLock;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.failure.FailureContext;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
-import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.NodeStoppingException;
+import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.internal.util.worker.GridWorker;
 import org.apache.ignite.thread.IgniteThread;
@@ -44,17 +46,15 @@ public class GridCacheSharedTtlCleanupManager extends 
GridCacheSharedManagerAdap
     /** Cleanup worker. */
     private CleanupWorker cleanupWorker;
 
-    /** Mutex on worker thread creation. */
-    private final Object mux = new Object();
+    /** Lock on worker thread creation. */
+    private final ReentrantLock lock = new ReentrantLock();
 
-    /** List of registered ttl managers. */
-    private List<GridCacheTtlManager> mgrs = new CopyOnWriteArrayList<>();
+    /** Map of registered ttl managers, where the cache id is used as the key. */
+    private final Map<Integer, GridCacheTtlManager> mgrs = new 
ConcurrentHashMap<>();
 
     /** {@inheritDoc} */
     @Override protected void onKernalStop0(boolean cancel) {
-        synchronized (mux) {
-            stopCleanupWorker();
-        }
+        stopCleanupWorker();
     }
 
     /**
@@ -63,12 +63,10 @@ public class GridCacheSharedTtlCleanupManager extends 
GridCacheSharedManagerAdap
      * @param mgr ttl manager of cache.
      * */
     public void register(GridCacheTtlManager mgr) {
-        synchronized (mux) {
-            if (cleanupWorker == null)
-                startCleanupWorker();
+        if (mgrs.isEmpty())
+            startCleanupWorker();
 
-            mgrs.add(mgr);
-        }
+        mgrs.put(mgr.context().cacheId(), mgr);
     }
 
     /**
@@ -77,12 +75,10 @@ public class GridCacheSharedTtlCleanupManager extends 
GridCacheSharedManagerAdap
      * @param mgr ttl manager of cache.
      * */
     public void unregister(GridCacheTtlManager mgr) {
-        synchronized (mux) {
-            mgrs.remove(mgr);
+        mgrs.remove(mgr.context().cacheId());
 
-            if (mgrs.isEmpty())
-                stopCleanupWorker();
-        }
+        if (mgrs.isEmpty())
+            stopCleanupWorker();
     }
 
     /**
@@ -91,27 +87,51 @@ public class GridCacheSharedTtlCleanupManager extends 
GridCacheSharedManagerAdap
     public boolean eagerTtlEnabled() {
         assert cctx != null : "Manager is not started";
 
-        return cleanupWorker != null;
+        lock.lock();
+
+        try {
+            return cleanupWorker != null;
+        }
+        finally {
+            lock.unlock();
+        }
     }
 
     /**
      *
      */
     private void startCleanupWorker() {
-        cleanupWorker = new CleanupWorker();
+        lock.lock();
 
-        new IgniteThread(cleanupWorker).start();
+        try {
+            if (cleanupWorker != null)
+                return;
+
+            cleanupWorker = new CleanupWorker();
+
+            new IgniteThread(cleanupWorker).start();
+        }
+        finally {
+            lock.unlock();
+        }
     }
 
     /**
      *
      */
     private void stopCleanupWorker() {
-        if (null != cleanupWorker) {
-            U.cancel(cleanupWorker);
-            U.join(cleanupWorker, log);
+        lock.lock();
+
+        try {
+            if (null != cleanupWorker) {
+                U.cancel(cleanupWorker);
+                U.join(cleanupWorker, log);
 
-            cleanupWorker = null;
+                cleanupWorker = null;
+            }
+        }
+        finally {
+            lock.unlock();
         }
     }
 
@@ -143,14 +163,24 @@ public class GridCacheSharedTtlCleanupManager extends 
GridCacheSharedManagerAdap
 
                 assert !cctx.kernalContext().recoveryMode();
 
+                final AtomicBoolean expiredRemains = new AtomicBoolean();
+
                 while (!isCancelled()) {
-                    boolean expiredRemains = false;
+                    expiredRemains.set(false);
 
-                    for (GridCacheTtlManager mgr : mgrs) {
+                    for (Map.Entry<Integer, GridCacheTtlManager> mgr : 
mgrs.entrySet()) {
                         updateHeartbeat();
 
-                        if (mgr.expire(CLEANUP_WORKER_ENTRIES_PROCESS_LIMIT))
-                            expiredRemains = true;
+                        Integer processedCacheID = mgr.getKey();
+
+                        // Need to be sure that the cache to be processed will not be unregistered and,
+                        // therefore, stopped during the process of expiration is in progress.
+                        mgrs.computeIfPresent(processedCacheID, (id, m) -> {
+                            if (m.expire(CLEANUP_WORKER_ENTRIES_PROCESS_LIMIT))
+                                expiredRemains.set(true);
+
+                            return m;
+                        });
 
                         if (isCancelled())
                             return;
@@ -158,7 +188,7 @@ public class GridCacheSharedTtlCleanupManager extends 
GridCacheSharedManagerAdap
 
                     updateHeartbeat();
 
-                    if (!expiredRemains)
+                    if (!expiredRemains.get())
                         U.sleep(CLEANUP_WORKER_SLEEP_INTERVAL);
 
                     onIdle();
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java
index fa6385d..72e403b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java
@@ -121,6 +121,15 @@ public class GridCacheTtlManager extends 
GridCacheManagerAdapter {
     @Override protected void onKernalStop0(boolean cancel) {
         if (pendingEntries != null)
             pendingEntries.clear();
+    }
+
+    /**
+     * Unregister this TTL manager of cache from periodical check on expired entries.
+     */
+    public void unregister() {
+        // Ignoring attempt to unregister manager that has never been started.
+        if (!starting.get())
+            return;
 
         cctx.shared().ttl().unregister(this);
     }
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsWithTtlTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsWithTtlTest.java
index afe2b34..e04f2a5 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsWithTtlTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsWithTtlTest.java
@@ -172,6 +172,8 @@ public class IgnitePdsWithTtlTest extends 
GridCommonAbstractTest {
 
         waitAndCheckExpired(srv, srv.cache(CACHE_NAME + "-" + (cacheCnt - 1)));
 
+        srv.cluster().active(false);
+
         stopAllGrids();
     }
 
@@ -206,6 +208,8 @@ public class IgnitePdsWithTtlTest extends 
GridCommonAbstractTest {
 
         waitAndCheckExpired(srv, cache);
 
+        srv.cluster().active(false);
+
         stopAllGrids();
     }
 
@@ -230,6 +234,8 @@ public class IgnitePdsWithTtlTest extends 
GridCommonAbstractTest {
 
         waitAndCheckExpired(srv, cache);
 
+        srv.cluster().active(false);
+
         stopAllGrids();
     }
 
@@ -268,6 +274,8 @@ public class IgnitePdsWithTtlTest extends 
GridCommonAbstractTest {
 
             stopGrid(1);
             startGrid(1);
+
+            srv.cluster().active(false);
         }
         finally {
             stopAllGrids();

Reply via email to