This is an automated email from the ASF dual-hosted git repository.
alexpl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new b9644393603 IGNITE-22069 Optimize persistent entries expiration -
Fixes #11319.
b9644393603 is described below
commit b96443936038a0fece129a6112ece79964473f83
Author: Aleksey Plekhanov <[email protected]>
AuthorDate: Sat Apr 27 10:43:53 2024 +0300
IGNITE-22069 Optimize persistent entries expiration - Fixes #11319.
Signed-off-by: Aleksey Plekhanov <[email protected]>
---
.../jmh/cache/JmhCacheExpireBenchmark.java | 16 ++-
.../internal/processors/cache/GridCacheUtils.java | 10 --
.../cache/IgniteCacheOffheapManagerImpl.java | 48 +++++----
.../cache/persistence/GridCacheOffheapManager.java | 73 +++++++------
.../cache/persistence/tree/BPlusTree.java | 37 +++++--
.../cache/transactions/TransactionProxyImpl.java | 13 ++-
.../processors/cache/tree/PendingEntriesTree.java | 12 +++
.../cache/GridCacheTtlManagerSelfTest.java | 27 +++++
.../expiry/IgniteCacheExpiryPolicyTestSuite.java | 3 +-
.../cache/expiry/PendingTreeCleaningTest.java | 118 +++++++++++++++++++++
10 files changed, 281 insertions(+), 76 deletions(-)
diff --git
a/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/cache/JmhCacheExpireBenchmark.java
b/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/cache/JmhCacheExpireBenchmark.java
index 4c25bbc12b4..79b9bf12a78 100644
---
a/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/cache/JmhCacheExpireBenchmark.java
+++
b/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/cache/JmhCacheExpireBenchmark.java
@@ -24,7 +24,10 @@ import javax.cache.expiry.Duration;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.Ignition;
+import org.apache.ignite.cluster.ClusterState;
import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
@@ -33,6 +36,7 @@ import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
@@ -66,6 +70,10 @@ public class JmhCacheExpireBenchmark {
/** Cache with expire policy. */
private IgniteCache<Integer, Integer> cacheExp;
+ /** Persistence enabled. */
+ @Param({"FALSE", "TRUE"})
+ private String persistence;
+
/** */
@Benchmark
public void putWithExpire() {
@@ -87,7 +95,13 @@ public class JmhCacheExpireBenchmark {
*/
@Setup(Level.Trial)
public void setup() {
- ignite = Ignition.start(new
IgniteConfiguration().setIgniteInstanceName("test"));
+ ignite = Ignition.start(new
IgniteConfiguration().setIgniteInstanceName("test")
+ .setDataStorageConfiguration(new
DataStorageConfiguration().setDefaultDataRegionConfiguration(
+ new
DataRegionConfiguration().setPersistenceEnabled(Boolean.parseBoolean(persistence))
+ ))
+ );
+
+ ignite.cluster().state(ClusterState.ACTIVE);
cacheReg = ignite.getOrCreateCache(new
CacheConfiguration<>("CACHE_REG"));
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
index dd87b339ed3..0a12ac3959c 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
@@ -860,16 +860,6 @@ public class GridCacheUtils {
ctx.ttl().expire(TTL_BATCH_SIZE);
}
- /**
- * @param ctx Shared cache context.
- */
- public static <K, V> void unwindEvicts(GridCacheSharedContext<K, V> ctx) {
- assert ctx != null;
-
- for (GridCacheContext<K, V> cacheCtx : ctx.cacheContexts())
- unwindEvicts(cacheCtx);
- }
-
/**
* @param asc {@code True} for ascending.
* @return Descending order comparator.
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
index c1f23ebe791..edfe8d0bc75 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
@@ -271,13 +271,7 @@ public class IgniteCacheOffheapManagerImpl implements
IgniteCacheOffheapManager
if (pendingEntries != null) {
PendingRow row = new PendingRow(cacheId);
- GridCursor<PendingRow> cursor = pendingEntries.find(row,
row, PendingEntriesTree.WITHOUT_KEY);
-
- while (cursor.next()) {
- boolean res = pendingEntries.removex(cursor.get());
-
- assert res;
- }
+ while (pendingEntries.removex(row, row, -1));
}
}
}
@@ -1135,26 +1129,36 @@ public class IgniteCacheOffheapManagerImpl implements
IgniteCacheOffheapManager
return 0;
try {
- List<PendingRow> rows = pendingEntries.remove(
- new PendingRow(cacheId, Long.MIN_VALUE, 0), new
PendingRow(cacheId, U.currentTimeMillis(), 0), amount);
+ int cleared = 0;
+
+ do {
+ List<PendingRow> rows = pendingEntries.remove(new
PendingRow(cacheId, Long.MIN_VALUE, 0),
+ new PendingRow(cacheId, U.currentTimeMillis(), 0),
amount - cleared);
+
+ if (rows.isEmpty())
+ break;
- for (PendingRow row : rows) {
- if (row.key.partition() == -1)
- row.key.partition(cctx.affinity().partition(row.key));
+ for (PendingRow row : rows) {
+ if (row.key.partition() == -1)
+
row.key.partition(cctx.affinity().partition(row.key));
- assert row.key != null && row.link != 0 && row.expireTime
!= 0 : row;
+ assert row.key != null && row.link != 0 &&
row.expireTime != 0 : row;
- if (obsoleteVer == null)
- obsoleteVer = cctx.cache().nextVersion();
+ if (obsoleteVer == null)
+ obsoleteVer = cctx.cache().nextVersion();
- GridCacheEntryEx entry = cctx.cache().entryEx(row.key
instanceof KeyCacheObjectImpl
- ? new
ExpiredKeyCacheObject((KeyCacheObjectImpl)row.key, row.expireTime, row.link) :
row.key);
+ GridCacheEntryEx entry = cctx.cache().entryEx(row.key
instanceof KeyCacheObjectImpl
+ ? new
ExpiredKeyCacheObject((KeyCacheObjectImpl)row.key, row.expireTime, row.link) :
row.key);
+
+ if (entry != null)
+ c.apply(entry, obsoleteVer);
+ }
- if (entry != null)
- c.apply(entry, obsoleteVer);
+ cleared += rows.size();
}
+ while (amount < 0 || cleared < amount);
- return rows.size();
+ return cleared;
}
finally {
busyLock.leaveBusy();
@@ -1977,7 +1981,7 @@ public class IgniteCacheOffheapManagerImpl implements
IgniteCacheOffheapManager
* This entry key is used to indicate that an expired entry has already
been deleted from
* PendingEntriesTree and doesn't need to participate in
PendingEntriesTree cleanup again.
*/
- private static class ExpiredKeyCacheObject extends KeyCacheObjectImpl {
+ protected static class ExpiredKeyCacheObject extends KeyCacheObjectImpl {
/** Serial version uid. */
private static final long serialVersionUID = 0L;
@@ -1988,7 +1992,7 @@ public class IgniteCacheOffheapManagerImpl implements
IgniteCacheOffheapManager
private long link;
/** */
- private ExpiredKeyCacheObject(KeyCacheObjectImpl keyCacheObj, long
expireTime, long link) {
+ public ExpiredKeyCacheObject(KeyCacheObjectImpl keyCacheObj, long
expireTime, long link) {
super(keyCacheObj.val, keyCacheObj.valBytes,
keyCacheObj.partition());
this.expireTime = expireTime;
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
index 12fd42aed61..9de2efe0e31 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
@@ -30,6 +30,7 @@ import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
+import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Stream;
@@ -67,6 +68,7 @@ import
org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.GridCacheTtlManager;
import
org.apache.ignite.internal.processors.cache.IgniteCacheOffheapManagerImpl;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.KeyCacheObjectImpl;
import org.apache.ignite.internal.processors.cache.PartitionUpdateCounter;
import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionPartialCountersMap;
import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteHistoricalIterator;
@@ -1107,7 +1109,28 @@ public class GridCacheOffheapManager extends
IgniteCacheOffheapManagerImpl imple
try {
int cleared = 0;
+ // Use random shift to reduce contention.
+ int shift =
ThreadLocalRandom.current().nextInt(F.size(cacheDataStores().iterator()));
+
+ int cnt = 0;
+ for (CacheDataStore store : cacheDataStores()) {
+ if (cnt++ < shift) // On the first iteration skip entries
before <shift>.
+ continue;
+
+ cleared += ((GridCacheDataStore)store).purgeExpired(cctx, c,
unwindThrottlingTimeout, amount - cleared);
+
+ if (amount != -1 && cleared >= amount)
+ return true;
+ }
+
+ if (shift == 0)
+ return false;
+
+ cnt = 0;
for (CacheDataStore store : cacheDataStores()) {
+ if (cnt++ >= shift) // On the second iteration skip entries
after <shift>.
+ break;
+
cleared += ((GridCacheDataStore)store).purgeExpired(cctx, c,
unwindThrottlingTimeout, amount - cleared);
if (amount != -1 && cleared >= amount)
@@ -2625,17 +2648,7 @@ public class GridCacheOffheapManager extends
IgniteCacheOffheapManagerImpl imple
if (pendingTree != null) {
PendingRow row = new PendingRow(cacheId);
- GridCursor<PendingRow> cursor = pendingTree.find(row, row,
PendingEntriesTree.WITHOUT_KEY);
-
- while (cursor.next()) {
- PendingRow row0 = cursor.get();
-
- assert row0.link != 0 : row;
-
- boolean res = pendingTree.removex(row0);
-
- assert res;
- }
+ while (pendingTree.removex(row, row, -1));
}
delegate0.clear(cacheId);
@@ -2723,43 +2736,37 @@ public class GridCacheOffheapManager extends
IgniteCacheOffheapManagerImpl imple
long now = U.currentTimeMillis();
- GridCursor<PendingRow> cur;
-
- if (grp.sharedGroup())
- cur = pendingTree.find(new PendingRow(cctx.cacheId()),
new PendingRow(cctx.cacheId(), now, 0));
- else
- cur = pendingTree.find(null, new
PendingRow(CU.UNDEFINED_CACHE_ID, now, 0));
-
- if (!cur.next())
- return 0;
-
- GridCacheVersion obsoleteVer = null;
+ int cacheId = grp.sharedGroup() ? cctx.cacheId() :
CU.UNDEFINED_CACHE_ID;
int cleared = 0;
do {
- PendingRow row = cur.get();
+ List<PendingRow> rows = pendingTree.remove(new
PendingRow(cacheId, Long.MIN_VALUE, 0),
+ new PendingRow(cacheId, now, 0), amount - cleared);
+
+ if (rows.isEmpty())
+ break;
- if (amount != -1 && cleared > amount)
- return cleared;
+ for (PendingRow row : rows) {
+ row.key.partition(partId);
- assert row.key != null && row.link != 0 &&
row.expireTime != 0 : row;
+ assert row.key != null && row.link != 0 &&
row.expireTime != 0 : row;
- row.key.partition(partId);
+ GridCacheVersion obsoleteVer = null;
- if (pendingTree.removex(row)) {
if (obsoleteVer == null)
obsoleteVer = cctx.cache().nextVersion();
- GridCacheEntryEx e1 =
cctx.cache().entryEx(row.key);
+ GridCacheEntryEx entry =
cctx.cache().entryEx(row.key instanceof KeyCacheObjectImpl
+ ? new
ExpiredKeyCacheObject((KeyCacheObjectImpl)row.key, row.expireTime, row.link) :
row.key);
- if (e1 != null)
- c.apply(e1, obsoleteVer);
+ if (entry != null)
+ c.apply(entry, obsoleteVer);
}
- cleared++;
+ cleared += rows.size();
}
- while (cur.next());
+ while (amount < 0 || cleared < amount);
return cleared;
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java
index 055138e9172..9816a18aeba 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java
@@ -724,7 +724,7 @@ public abstract class BPlusTree<L, T extends L> extends
DataStructure implements
// Search row should point to the rightmost element, otherwise we
won't find it on the inner node.
if (res == FOUND && r.needReplaceInner == TRUE)
- r.row = getRow(io, leafAddr, highIdx);
+ r.row = getRow(io, leafAddr, highIdx, r.x);
return res;
}
@@ -2160,7 +2160,7 @@ public abstract class BPlusTree<L, T extends L> extends
DataStructure implements
assert canGetRowFromInner : "Not supported";
assert limit >= 0 : limit;
- RemoveRange rmvOp = new RemoveRange(lower, upper, true, limit);
+ RemoveRange rmvOp = new RemoveRange(lower, upper, true, null, limit);
doRemove(rmvOp);
@@ -2169,6 +2169,20 @@ public abstract class BPlusTree<L, T extends L> extends
DataStructure implements
return Collections.unmodifiableList(rmvOp.removedRows);
}
+ /**
+ * @param lower Lower bound (inclusive).
+ * @param upper Upper bound (inclusive).
+ * @param x Implementation specific argument.
+ * @param limit Limit of processed entries by single call, {@code 0} or
negative value for no limit.
+ * @return {@code True} if removed at least one row.
+ * @throws IgniteCheckedException If failed.
+ */
+ protected boolean removex(L lower, L upper, Object x, int limit) throws
IgniteCheckedException {
+ Boolean res = (Boolean)doRemove(new RemoveRange(lower, upper, false,
x, limit));
+
+ return res != null ? res : false;
+ }
+
/** {@inheritDoc} */
@Override public void invoke(L row, Object z, InvokeClosure<T> c) throws
IgniteCheckedException {
checkDestroyed();
@@ -4716,6 +4730,9 @@ public abstract class BPlusTree<L, T extends L> extends
DataStructure implements
/** */
final boolean needOld;
+ /** */
+ final Object x;
+
/** */
final PageHandler<Remove, Result> rmvFromLeafHnd;
@@ -4724,18 +4741,20 @@ public abstract class BPlusTree<L, T extends L> extends
DataStructure implements
* @param needOld {@code True} If need return old value.
*/
private Remove(L row, boolean needOld) {
- this(row, needOld, rmvFromLeaf);
+ this(row, needOld, null, rmvFromLeaf);
}
/**
* @param row Row.
* @param needOld {@code True} If need return old value.
+ * @param x Implementation specific argument.
* @param rmvFromLeaf Remove from leaf page handler.
*/
- private Remove(L row, boolean needOld, PageHandler<Remove, Result>
rmvFromLeaf) {
+ private Remove(L row, boolean needOld, Object x, PageHandler<Remove,
Result> rmvFromLeaf) {
super(row);
this.needOld = needOld;
+ this.x = x;
rmvFromLeafHnd = rmvFromLeaf;
}
@@ -5193,7 +5212,7 @@ public abstract class BPlusTree<L, T extends L> extends
DataStructure implements
assert !isRemoved() : "already removed";
// Detach the row.
- rmvd = needOld ? getRow(io, pageAddr, idx) : (T)Boolean.TRUE;
+ rmvd = needOld ? getRow(io, pageAddr, idx, x) : (T)Boolean.TRUE;
doRemove(pageId, page, pageAddr, walPlc, io, cnt, idx);
@@ -6602,9 +6621,11 @@ public abstract class BPlusTree<L, T extends L> extends
DataStructure implements
* @param lower Lower bound (inclusive).
* @param upper Upper bound (inclusive).
* @param needOld {@code True} If need return old value.
+ * @param x Implementation specific argument, {@code null} always
means that we need a full detached data row.
+ * @param limit Limit of processed entries by single call, {@code 0}
or negative value for no limit.
*/
- protected RemoveRange(L lower, L upper, boolean needOld, int limit) {
- super(lower, needOld, rmvRangeFromLeaf);
+ protected RemoveRange(L lower, L upper, boolean needOld, Object x, int
limit) {
+ super(lower, needOld, x, rmvRangeFromLeaf);
this.lower = lower;
this.upper = upper;
@@ -6667,7 +6688,7 @@ public abstract class BPlusTree<L, T extends L> extends
DataStructure implements
// Delete from right to left to reduce the number of items moved
during the delete operation.
for (int i = highIdx; i >= idx; i--) {
if (needOld)
- removedRows.add(getRow(io, pageAddr, i));
+ removedRows.add(getRow(io, pageAddr, i, x));
doRemove(pageId, page, pageAddr, walPlc, io, cnt - highIdx +
i, i);
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TransactionProxyImpl.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TransactionProxyImpl.java
index 3cc390e1477..7a2bd504d8b 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TransactionProxyImpl.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TransactionProxyImpl.java
@@ -26,10 +26,12 @@ import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteClientDisconnectedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import
org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
import org.apache.ignite.internal.processors.tracing.MTC;
import org.apache.ignite.internal.processors.tracing.Span;
+import org.apache.ignite.internal.util.GridIntList;
import org.apache.ignite.internal.util.future.IgniteFinishedFutureImpl;
import org.apache.ignite.internal.util.future.IgniteFutureImpl;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
@@ -144,7 +146,16 @@ public class TransactionProxyImpl<K, V> implements
TransactionProxy, Externaliza
*/
private void leave() {
try {
- CU.unwindEvicts(cctx);
+ GridIntList cacheIds = tx.txState().cacheIds();
+
+ for (int i = 0; i < cacheIds.size(); i++) {
+ int cacheId = cacheIds.get(i);
+
+ GridCacheContext<K, V> ctx = cctx.cacheContext(cacheId);
+
+ if (ctx != null)
+ CU.unwindEvicts(ctx);
+ }
tx.leaveSystemSection();
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/PendingEntriesTree.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/PendingEntriesTree.java
index 282237e050a..a54c357e51c 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/PendingEntriesTree.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/PendingEntriesTree.java
@@ -120,6 +120,18 @@ public class PendingEntriesTree extends
BPlusTree<PendingRow, PendingRow> {
return Long.compare(link, row.link);
}
+ /**
+ * @param lower Lower bound (inclusive).
+ * @param upper Upper bound (inclusive).
+ * @param limit Limit of processed entries by single call, {@code 0} or
negative value for no limit.
+ * @return {@code True} if removed at least one row.
+ * @throws IgniteCheckedException If failed.
+ */
+ public boolean removex(PendingRow lower, PendingRow upper, int limit)
throws IgniteCheckedException {
+ return removex(lower, upper, WITHOUT_KEY, limit);
+ }
+
+
/** {@inheritDoc} */
@Override public PendingRow getRow(BPlusIO<PendingRow> io, long pageAddr,
int idx, Object flag)
throws IgniteCheckedException {
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManagerSelfTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManagerSelfTest.java
index 151bb9f130d..a3c9acdfa86 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManagerSelfTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManagerSelfTest.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.processors.cache;
+import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
@@ -28,7 +29,10 @@ import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cluster.ClusterState;
import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteKernal;
@@ -38,10 +42,13 @@ import
org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
import
org.apache.ignite.internal.processors.cache.persistence.tree.util.PageHandler;
import org.apache.ignite.internal.processors.cache.tree.PendingEntriesTree;
import org.apache.ignite.internal.util.typedef.CAX;
+import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.apache.ignite.cache.CacheMode.PARTITIONED;
@@ -50,14 +57,28 @@ import static org.apache.ignite.cache.CacheMode.REPLICATED;
/**
* TTL manager self test.
*/
+@RunWith(Parameterized.class)
public class GridCacheTtlManagerSelfTest extends GridCommonAbstractTest {
/** Test cache mode. */
protected CacheMode cacheMode;
+ /** */
+ @Parameterized.Parameter
+ public boolean pds;
+
+ /** */
+ @Parameterized.Parameters(name = "pds={0}")
+ public static Collection<?> parameters() {
+ return F.asList(false, true);
+ }
+
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String
igniteInstanceName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+ cfg.setDataStorageConfiguration(new
DataStorageConfiguration().setDefaultDataRegionConfiguration(
+ new DataRegionConfiguration().setPersistenceEnabled(pds)));
+
CacheConfiguration ccfg = new CacheConfiguration(DEFAULT_CACHE_NAME);
ccfg.setCacheMode(cacheMode);
@@ -93,6 +114,8 @@ public class GridCacheTtlManagerSelfTest extends
GridCommonAbstractTest {
final IgniteKernal g = (IgniteKernal)startGrid(0);
+ g.cluster().state(ClusterState.ACTIVE);
+
try {
final String key = "key";
@@ -116,6 +139,7 @@ public class GridCacheTtlManagerSelfTest extends
GridCommonAbstractTest {
}
finally {
stopAllGrids();
+ cleanPersistenceDir();
}
}
@@ -186,6 +210,8 @@ public class GridCacheTtlManagerSelfTest extends
GridCommonAbstractTest {
final int records = 1500;
+ g.cluster().state(ClusterState.ACTIVE);
+
IgniteCache<Object, Object> cache =
g.cache(DEFAULT_CACHE_NAME).withExpiryPolicy(
new CreatedExpiryPolicy(new Duration(MILLISECONDS, 1000)));
@@ -213,6 +239,7 @@ public class GridCacheTtlManagerSelfTest extends
GridCommonAbstractTest {
}
finally {
BPlusTree.testHndWrapper = null;
+ cleanPersistenceDir();
}
}
}
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyTestSuite.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyTestSuite.java
index 4900dad3892..3e0cdb5ee33 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyTestSuite.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyTestSuite.java
@@ -57,7 +57,8 @@ import org.junit.runners.Suite;
IgniteCacheExpireWhileRebalanceTest.class,
- ExpiryPolicyInfoLoggingTest.class
+ ExpiryPolicyInfoLoggingTest.class,
+ PendingTreeCleaningTest.class,
})
public class IgniteCacheExpiryPolicyTestSuite {
}
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/PendingTreeCleaningTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/PendingTreeCleaningTest.java
new file mode 100644
index 00000000000..37cccc54cab
--- /dev/null
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/PendingTreeCleaningTest.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.expiry;
+
+import java.util.Collection;
+import javax.cache.configuration.FactoryBuilder;
+import javax.cache.expiry.CreatedExpiryPolicy;
+import javax.cache.expiry.Duration;
+import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.cluster.ClusterState;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.processors.cache.CacheGroupContext;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+/**
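+ * Checks that destroying one cache of a cache group removes only that cache's rows from {@code expiredSize()}.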
+ */
+@RunWith(Parameterized.class)
+public class PendingTreeCleaningTest extends GridCommonAbstractTest {
+ /** */
+ @Parameterized.Parameter
+ public boolean pds;
+
+ /** */
+ @Parameterized.Parameters(name = "pds={0}")
+ public static Collection<?> parameters() {
+ return F.asList(false, true);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String
igniteInstanceName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+ DataStorageConfiguration dsCfg = new
DataStorageConfiguration().setDefaultDataRegionConfiguration(
+ new DataRegionConfiguration().setPersistenceEnabled(pds));
+
+ cfg.setDataStorageConfiguration(dsCfg);
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ stopAllGrids();
+
+ cleanPersistenceDir();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ stopAllGrids();
+
+ cleanPersistenceDir();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testPendingTreeCleaningOnCacheDestroy() throws Exception {
+ IgniteEx ignite = startGrid();
+
+ ignite.cluster().state(ClusterState.ACTIVE);
+
+ String cache1 = "cache1";
+ String cache2 = "cache2";
+ String grp = "grp";
+
+ ignite.getOrCreateCache(new
CacheConfiguration<>(cache1).setGroupName(grp)
+ .setExpiryPolicyFactory(FactoryBuilder.factoryOf(new
CreatedExpiryPolicy(Duration.ONE_DAY))));
+
+ ignite.getOrCreateCache(new
CacheConfiguration<>(cache2).setGroupName(grp)
+ .setExpiryPolicyFactory(FactoryBuilder.factoryOf(new
CreatedExpiryPolicy(Duration.ONE_DAY))));
+
+ int cnt = 10_000;
+
+ try (IgniteDataStreamer<Object, Object> ds =
ignite.dataStreamer(cache1)) {
+ for (int i = 0; i < cnt; i++)
+ ds.addData(i, i);
+ }
+
+ try (IgniteDataStreamer<Object, Object> ds =
ignite.dataStreamer(cache2)) {
+ for (int i = 0; i < cnt; i++)
+ ds.addData(i, i);
+ }
+
+ CacheGroupContext gctx =
ignite.context().cache().cache(cache1).context().group();
+
+ assertEquals(cnt * 2, gctx.offheap().expiredSize());
+
+ ignite.destroyCache(cache2);
+
+ assertEquals(cnt, gctx.offheap().expiredSize());
+ }
+}