This is an automated email from the ASF dual-hosted git repository.
agura pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 85cf73f IGNITE-12124 Fixed possible NullPointerException/Error
related to the cache stop with configured TTL
85cf73f is described below
commit 85cf73fb964152b1edd5baa730bcf9e34e761819
Author: sk0x50 <[email protected]>
AuthorDate: Fri Nov 29 15:16:06 2019 +0300
IGNITE-12124 Fixed possible NullPointerException/Error related to the cache
stop with configured TTL
Signed-off-by: Andrey Gura <[email protected]>
---
.../processors/cache/GridCacheProcessor.java | 6 ++
.../cache/GridCacheSharedTtlCleanupManager.java | 94 ++++++++++++++--------
.../processors/cache/GridCacheTtlManager.java | 9 +++
.../cache/persistence/db/IgnitePdsWithTtlTest.java | 8 ++
4 files changed, 85 insertions(+), 32 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 18a4212..2f1e5bc 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -2718,6 +2718,12 @@ public class GridCacheProcessor extends
GridProcessorAdapter {
context().tm().rollbackTransactionsForStoppingCache(action.descriptor().cacheId());
+ // TTL manager has to be unregistered before
the checkpointReadLock is acquired.
+ GridCacheAdapter<?, ?> cache =
caches.get(action.request().cacheName());
+
+ if (cache != null)
+ cache.context().ttl().unregister();
+
sharedCtx.database().checkpointReadLock();
try {
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedTtlCleanupManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedTtlCleanupManager.java
index 4094559..bd9b675 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedTtlCleanupManager.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedTtlCleanupManager.java
@@ -17,13 +17,15 @@
package org.apache.ignite.internal.processors.cache;
-import java.util.List;
-import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReentrantLock;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.failure.FailureContext;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
-import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.NodeStoppingException;
+import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.internal.util.worker.GridWorker;
import org.apache.ignite.thread.IgniteThread;
@@ -44,17 +46,15 @@ public class GridCacheSharedTtlCleanupManager extends
GridCacheSharedManagerAdap
/** Cleanup worker. */
private CleanupWorker cleanupWorker;
- /** Mutex on worker thread creation. */
- private final Object mux = new Object();
+ /** Lock on worker thread creation. */
+ private final ReentrantLock lock = new ReentrantLock();
- /** List of registered ttl managers. */
- private List<GridCacheTtlManager> mgrs = new CopyOnWriteArrayList<>();
+ /** Map of registered ttl managers, where the cache id is used as the key.
*/
+ private final Map<Integer, GridCacheTtlManager> mgrs = new
ConcurrentHashMap<>();
/** {@inheritDoc} */
@Override protected void onKernalStop0(boolean cancel) {
- synchronized (mux) {
- stopCleanupWorker();
- }
+ stopCleanupWorker();
}
/**
@@ -63,12 +63,10 @@ public class GridCacheSharedTtlCleanupManager extends
GridCacheSharedManagerAdap
* @param mgr ttl manager of cache.
* */
public void register(GridCacheTtlManager mgr) {
- synchronized (mux) {
- if (cleanupWorker == null)
- startCleanupWorker();
+ if (mgrs.isEmpty())
+ startCleanupWorker();
- mgrs.add(mgr);
- }
+ mgrs.put(mgr.context().cacheId(), mgr);
}
/**
@@ -77,12 +75,10 @@ public class GridCacheSharedTtlCleanupManager extends
GridCacheSharedManagerAdap
* @param mgr ttl manager of cache.
* */
public void unregister(GridCacheTtlManager mgr) {
- synchronized (mux) {
- mgrs.remove(mgr);
+ mgrs.remove(mgr.context().cacheId());
- if (mgrs.isEmpty())
- stopCleanupWorker();
- }
+ if (mgrs.isEmpty())
+ stopCleanupWorker();
}
/**
@@ -91,27 +87,51 @@ public class GridCacheSharedTtlCleanupManager extends
GridCacheSharedManagerAdap
public boolean eagerTtlEnabled() {
assert cctx != null : "Manager is not started";
- return cleanupWorker != null;
+ lock.lock();
+
+ try {
+ return cleanupWorker != null;
+ }
+ finally {
+ lock.unlock();
+ }
}
/**
*
*/
private void startCleanupWorker() {
- cleanupWorker = new CleanupWorker();
+ lock.lock();
- new IgniteThread(cleanupWorker).start();
+ try {
+ if (cleanupWorker != null)
+ return;
+
+ cleanupWorker = new CleanupWorker();
+
+ new IgniteThread(cleanupWorker).start();
+ }
+ finally {
+ lock.unlock();
+ }
}
/**
*
*/
private void stopCleanupWorker() {
- if (null != cleanupWorker) {
- U.cancel(cleanupWorker);
- U.join(cleanupWorker, log);
+ lock.lock();
+
+ try {
+ if (null != cleanupWorker) {
+ U.cancel(cleanupWorker);
+ U.join(cleanupWorker, log);
- cleanupWorker = null;
+ cleanupWorker = null;
+ }
+ }
+ finally {
+ lock.unlock();
}
}
@@ -143,14 +163,24 @@ public class GridCacheSharedTtlCleanupManager extends
GridCacheSharedManagerAdap
assert !cctx.kernalContext().recoveryMode();
+ final AtomicBoolean expiredRemains = new AtomicBoolean();
+
while (!isCancelled()) {
- boolean expiredRemains = false;
+ expiredRemains.set(false);
- for (GridCacheTtlManager mgr : mgrs) {
+ for (Map.Entry<Integer, GridCacheTtlManager> mgr :
mgrs.entrySet()) {
updateHeartbeat();
- if (mgr.expire(CLEANUP_WORKER_ENTRIES_PROCESS_LIMIT))
- expiredRemains = true;
+ Integer processedCacheID = mgr.getKey();
+
+ // Need to be sure that the cache to be processed will
not be unregistered and,
+ // therefore, stopped during the process of expiration
is in progress.
+ mgrs.computeIfPresent(processedCacheID, (id, m) -> {
+ if (m.expire(CLEANUP_WORKER_ENTRIES_PROCESS_LIMIT))
+ expiredRemains.set(true);
+
+ return m;
+ });
if (isCancelled())
return;
@@ -158,7 +188,7 @@ public class GridCacheSharedTtlCleanupManager extends
GridCacheSharedManagerAdap
updateHeartbeat();
- if (!expiredRemains)
+ if (!expiredRemains.get())
U.sleep(CLEANUP_WORKER_SLEEP_INTERVAL);
onIdle();
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java
index fa6385d..72e403b 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java
@@ -121,6 +121,15 @@ public class GridCacheTtlManager extends
GridCacheManagerAdapter {
@Override protected void onKernalStop0(boolean cancel) {
if (pendingEntries != null)
pendingEntries.clear();
+ }
+
+ /**
+ * Unregister this TTL manager of cache from periodical check on expired
entries.
+ */
+ public void unregister() {
+ // Ignoring attempt to unregister manager that has never been started.
+ if (!starting.get())
+ return;
cctx.shared().ttl().unregister(this);
}
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsWithTtlTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsWithTtlTest.java
index afe2b34..e04f2a5 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsWithTtlTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsWithTtlTest.java
@@ -172,6 +172,8 @@ public class IgnitePdsWithTtlTest extends
GridCommonAbstractTest {
waitAndCheckExpired(srv, srv.cache(CACHE_NAME + "-" + (cacheCnt - 1)));
+ srv.cluster().active(false);
+
stopAllGrids();
}
@@ -206,6 +208,8 @@ public class IgnitePdsWithTtlTest extends
GridCommonAbstractTest {
waitAndCheckExpired(srv, cache);
+ srv.cluster().active(false);
+
stopAllGrids();
}
@@ -230,6 +234,8 @@ public class IgnitePdsWithTtlTest extends
GridCommonAbstractTest {
waitAndCheckExpired(srv, cache);
+ srv.cluster().active(false);
+
stopAllGrids();
}
@@ -268,6 +274,8 @@ public class IgnitePdsWithTtlTest extends
GridCommonAbstractTest {
stopGrid(1);
startGrid(1);
+
+ srv.cluster().active(false);
}
finally {
stopAllGrids();