This is an automated email from the ASF dual-hosted git repository.
alexpl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new f05862b IGNITE-6930 FreeLists onheap caching to minimize count of WAL
records
f05862b is described below
commit f05862b5ef4f6a54d503a549a73651f7df7f5dde
Author: Aleksey Plekhanov <[email protected]>
AuthorDate: Mon Oct 14 20:33:31 2019 +0300
IGNITE-6930 FreeLists onheap caching to minimize count of WAL records
---
.../org/apache/ignite/IgniteSystemProperties.java | 8 +
.../processors/cache/mvcc/txlog/TxLog.java | 8 +-
.../cache/persistence/GridCacheOffheapManager.java | 17 +-
.../IgniteCacheDatabaseSharedManager.java | 3 +-
.../persistence/freelist/AbstractFreeList.java | 34 +-
.../cache/persistence/freelist/CacheFreeList.java | 7 +-
.../cache/persistence/freelist/PagesList.java | 439 ++++++++++++++++++++-
.../cache/persistence/metastorage/MetaStorage.java | 10 +-
.../partstorage/PartitionMetaStorage.java | 2 +-
.../partstorage/PartitionMetaStorageImpl.java | 6 +-
.../persistence/tree/reuse/ReuseListImpl.java | 20 +-
.../db/checkpoint/CheckpointFreeListTest.java | 74 ++++
...eWalFlushMultiNodeFailoverAbstractSelfTest.java | 7 +-
.../db/wal/WalRecoveryTxLogicalRecordsTest.java | 113 ++++--
.../persistence/freelist/FreeListCachingTest.java | 218 ++++++++++
.../database/BPlusTreeReuseSelfTest.java | 10 +-
.../processors/database/CacheFreeListSelfTest.java | 3 +-
.../ignite/testsuites/IgnitePdsTestSuite2.java | 3 +
18 files changed, 903 insertions(+), 79 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
index ad394e8..600495e 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
@@ -1277,6 +1277,14 @@ public final class IgniteSystemProperties {
"IGNITE_TRANSACTION_TIME_DUMP_SAMPLES_PER_SECOND_LIMIT";
/**
+ * Disables onheap caching of pages lists (free lists and reuse lists).
+ * If persistence is enabled, changes to page lists are not stored to page memory immediately: they are cached in an
+ * onheap buffer and flushed to page memory on a checkpoint. This property allows disabling such onheap caching.
+ * Default value is <code>false</code>.
+ */
+ public static final String IGNITE_PAGES_LIST_DISABLE_ONHEAP_CACHING =
"IGNITE_PAGES_LIST_DISABLE_ONHEAP_CACHING";
+
+
+ /**
* Enforces singleton.
*/
private IgniteSystemProperties() {
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/txlog/TxLog.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/txlog/TxLog.java
index 0c659f9..3d2a8fd 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/txlog/TxLog.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/txlog/TxLog.java
@@ -26,6 +26,7 @@ import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.metric.IoStatisticsHolderNoOp;
import org.apache.ignite.internal.pagemem.PageIdUtils;
import org.apache.ignite.internal.pagemem.PageMemory;
import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager;
@@ -177,7 +178,8 @@ public class TxLog implements DbCheckpointListener {
wal,
reuseListRoot,
isNew,
- txLogReuseListLockLsnr
+ txLogReuseListLockLsnr,
+ ctx
);
tree = new TxLogTree(
@@ -236,11 +238,11 @@ public class TxLog implements DbCheckpointListener {
private void saveReuseListMetadata(Context ctx) throws
IgniteCheckedException {
Executor executor = ctx.executor();
if (executor == null)
- reuseList.saveMetadata();
+ reuseList.saveMetadata(IoStatisticsHolderNoOp.INSTANCE);
else {
executor.execute(() -> {
try {
- reuseList.saveMetadata();
+ reuseList.saveMetadata(IoStatisticsHolderNoOp.INSTANCE);
}
catch (IgniteCheckedException e) {
throw new IgniteException(e);
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
index df847ab..b321321 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
@@ -159,7 +159,8 @@ public class GridCacheOffheapManager extends
IgniteCacheOffheapManagerImpl imple
ctx.wal(),
reuseListRoot.pageId().pageId(),
reuseListRoot.isAllocated(),
-
diagnosticMgr.pageLockTracker().createPageLockTracker(reuseListName)
+
diagnosticMgr.pageLockTracker().createPageLockTracker(reuseListName),
+ ctx.kernalContext()
);
RootPage metastoreRoot = metas.treeRoot;
@@ -254,7 +255,7 @@ public class GridCacheOffheapManager extends
IgniteCacheOffheapManagerImpl imple
*/
private void syncMetadata(Context ctx, Executor execSvc, boolean
needSnapshot) throws IgniteCheckedException {
if (execSvc == null) {
- reuseList.saveMetadata();
+ reuseList.saveMetadata(grp.statisticsHolderData());
for (CacheDataStore store : partDataStores.values())
saveStoreMetadata(store, ctx, false, needSnapshot);
@@ -262,7 +263,7 @@ public class GridCacheOffheapManager extends
IgniteCacheOffheapManagerImpl imple
else {
execSvc.execute(() -> {
try {
- reuseList.saveMetadata();
+ reuseList.saveMetadata(grp.statisticsHolderData());
}
catch (IgniteCheckedException e) {
throw new IgniteException(e);
@@ -294,7 +295,7 @@ public class GridCacheOffheapManager extends
IgniteCacheOffheapManagerImpl imple
RowStore rowStore0 = store.rowStore();
if (rowStore0 != null) {
- ((CacheFreeList)rowStore0.freeList()).saveMetadata();
+
((CacheFreeList)rowStore0.freeList()).saveMetadata(grp.statisticsHolderData());
PartitionMetaStorage<SimpleDataRow> partStore =
store.partStorage();
@@ -385,7 +386,7 @@ public class GridCacheOffheapManager extends
IgniteCacheOffheapManagerImpl imple
}
if (changed)
- partStore.saveMetadata();
+ partStore.saveMetadata(grp.statisticsHolderData());
changed |= io.setUpdateCounter(partMetaPageAddr,
updCntr);
changed |= io.setGlobalRemoveId(partMetaPageAddr,
rmvId);
@@ -1692,7 +1693,8 @@ public class GridCacheOffheapManager extends
IgniteCacheOffheapManagerImpl imple
ctx.wal(),
reuseRoot.pageId().pageId(),
reuseRoot.isAllocated(),
-
ctx.diagnostic().pageLockTracker().createPageLockTracker(freeListName)
+
ctx.diagnostic().pageLockTracker().createPageLockTracker(freeListName),
+ ctx.kernalContext()
) {
/** {@inheritDoc} */
@Override protected long allocatePageNoReuse() throws
IgniteCheckedException {
@@ -1715,7 +1717,8 @@ public class GridCacheOffheapManager extends
IgniteCacheOffheapManagerImpl imple
ctx.wal(),
partMetastoreReuseListRoot.pageId().pageId(),
partMetastoreReuseListRoot.isAllocated(),
-
ctx.diagnostic().pageLockTracker().createPageLockTracker(partitionMetaStoreName)
+
ctx.diagnostic().pageLockTracker().createPageLockTracker(partitionMetaStoreName),
+ ctx.kernalContext()
) {
/** {@inheritDoc} */
@Override protected long allocatePageNoReuse() throws
IgniteCheckedException {
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java
index 73459dd..a1a7913 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java
@@ -267,7 +267,8 @@ public class IgniteCacheDatabaseSharedManager extends
GridCacheSharedManagerAdap
persistenceEnabled ? cctx.wal() : null,
0L,
true,
- lsnr
+ lsnr,
+ cctx.kernalContext()
);
freeListMap.put(memPlcCfg.getName(), freeList);
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/freelist/AbstractFreeList.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/freelist/AbstractFreeList.java
index cb32219..f28a6d0 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/freelist/AbstractFreeList.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/freelist/AbstractFreeList.java
@@ -17,10 +17,12 @@
package org.apache.ignite.internal.processors.cache.persistence.freelist;
+import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.atomic.AtomicReferenceArray;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.metric.IoStatisticsHolder;
import org.apache.ignite.internal.metric.IoStatisticsHolderNoOp;
import org.apache.ignite.internal.pagemem.PageIdAllocator;
@@ -77,6 +79,9 @@ public abstract class AbstractFreeList<T extends Storable>
extends PagesList imp
/** */
private final AtomicReferenceArray<Stripe[]> buckets = new
AtomicReferenceArray<>(BUCKETS);
+ /** Onheap bucket page list caches. */
+ private final AtomicReferenceArray<PagesCache> bucketCaches = new
AtomicReferenceArray<>(BUCKETS);
+
/** */
private final int MIN_SIZE_FOR_DATA_PAGE;
@@ -427,9 +432,10 @@ public abstract class AbstractFreeList<T extends Storable>
extends PagesList imp
IgniteWriteAheadLogManager wal,
long metaPageId,
boolean initNew,
- PageLockListener lockLsnr
+ PageLockListener lockLsnr,
+ GridKernalContext ctx
) throws IgniteCheckedException {
- super(cacheId, name, memPlc.pageMemory(), BUCKETS, wal, metaPageId,
lockLsnr);
+ super(cacheId, name, memPlc.pageMemory(), BUCKETS, wal, metaPageId,
lockLsnr, ctx);
rmvRow = new RemoveRowHandler(cacheId == 0);
@@ -540,6 +546,11 @@ public abstract class AbstractFreeList<T extends Storable>
extends PagesList imp
return bucket;
}
+ /** {@inheritDoc} */
+ @Override protected int getBucketIndex(int freeSpace) {
+ // Pages whose free space does not exceed MIN_PAGE_FREE_SPACE are not tracked by any bucket (-1).
+ return freeSpace > MIN_PAGE_FREE_SPACE ? bucket(freeSpace, false) : -1;
+ }
+
/**
* @param part Partition.
* @return Page ID.
@@ -830,7 +841,14 @@ public abstract class AbstractFreeList<T extends Storable>
extends PagesList imp
/** {@inheritDoc} */
@Override protected boolean casBucket(int bucket, Stripe[] exp, Stripe[]
upd) {
- return buckets.compareAndSet(bucket, exp, upd);
+ boolean res = buckets.compareAndSet(bucket, exp, upd);
+
+ // Trace every CAS attempt on the bucket stripes, including failed ones.
+ if (log.isDebugEnabled()) {
+ log.debug("CAS bucket [list=" + name + ", bucket=" + bucket + ",
old=" + Arrays.toString(exp) +
+ ", new=" + Arrays.toString(upd) + ", res=" + res + ']');
+ }
+
+ return res;
}
/** {@inheritDoc} */
@@ -838,6 +856,16 @@ public abstract class AbstractFreeList<T extends Storable>
extends PagesList imp
return bucket == REUSE_BUCKET;
}
+ /** {@inheritDoc} */
+ @Override protected PagesCache getBucketCache(int bucket, boolean create) {
+ PagesCache pagesCache = bucketCaches.get(bucket);
+
+ // Lazily install a cache; if another thread wins the CAS race, use the instance it installed.
+ if (pagesCache == null && create &&
!bucketCaches.compareAndSet(bucket, null, pagesCache = new PagesCache()))
+ pagesCache = bucketCaches.get(bucket);
+
+ return pagesCache;
+ }
+
/**
* @return Number of empty data pages in free list.
*/
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/freelist/CacheFreeList.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/freelist/CacheFreeList.java
index 45e0c48..3f7051f 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/freelist/CacheFreeList.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/freelist/CacheFreeList.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.processors.cache.persistence.freelist;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.metric.IoStatisticsHolder;
import org.apache.ignite.internal.pagemem.PageIdUtils;
import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager;
@@ -48,7 +49,8 @@ public class CacheFreeList extends
AbstractFreeList<CacheDataRow> {
IgniteWriteAheadLogManager wal,
long metaPageId,
boolean initNew,
- PageLockListener lockLsnr
+ PageLockListener lockLsnr,
+ GridKernalContext ctx
) throws IgniteCheckedException {
super(
cacheId,
@@ -59,7 +61,8 @@ public class CacheFreeList extends
AbstractFreeList<CacheDataRow> {
wal,
metaPageId,
initNew,
- lockLsnr
+ lockLsnr,
+ ctx
);
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/freelist/PagesList.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/freelist/PagesList.java
index a4767b3..19df27e 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/freelist/PagesList.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/freelist/PagesList.java
@@ -21,10 +21,15 @@ import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.managers.communication.GridIoPolicy;
+import org.apache.ignite.internal.metric.IoStatisticsHolder;
+import org.apache.ignite.internal.metric.IoStatisticsHolderNoOp;
import org.apache.ignite.internal.pagemem.PageIdUtils;
import org.apache.ignite.internal.pagemem.PageMemory;
import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager;
@@ -45,8 +50,6 @@ import
org.apache.ignite.internal.processors.cache.persistence.tree.io.IOVersion
import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
import
org.apache.ignite.internal.processors.cache.persistence.tree.reuse.ReuseBag;
import
org.apache.ignite.internal.processors.cache.persistence.tree.util.PageHandler;
-import org.apache.ignite.internal.metric.IoStatisticsHolder;
-import org.apache.ignite.internal.metric.IoStatisticsHolderNoOp;
import
org.apache.ignite.internal.processors.cache.persistence.tree.util.PageLockListener;
import org.apache.ignite.internal.util.GridArrays;
import org.apache.ignite.internal.util.GridLongList;
@@ -77,6 +80,10 @@ public abstract class PagesList extends DataStructure {
Math.max(8, Runtime.getRuntime().availableProcessors()));
/** */
+ private final boolean pagesListCachingDisabledSysProp =
+
IgniteSystemProperties.getBoolean(IgniteSystemProperties.IGNITE_PAGES_LIST_DISABLE_ONHEAP_CACHING,
false);
+
+ /** */
protected final AtomicLong[] bucketsSize;
/** */
@@ -91,9 +98,18 @@ public abstract class PagesList extends DataStructure {
/** Name (for debug purposes). */
protected final String name;
+ /** Flag to enable/disable onheap list caching. */
+ private volatile boolean onheapListCachingEnabled;
+
/** */
private final PageHandler<Void, Boolean> cutTail = new CutTail();
+ /** */
+ private final PageHandler<Void, Boolean> putBucket = new PutBucket();
+
+ /** Logger. */
+ protected final IgniteLogger log;
+
/**
*
*/
@@ -128,6 +144,41 @@ public abstract class PagesList extends DataStructure {
}
/**
+ * Page handler that moves a data page from the onheap pages cache back to a page memory pages list.
+ * <p>
+ * The handler argument is the bucket the page was cached in. The target bucket is recalculated from the page's
+ * current free space, and the page is put into it unless it no longer belongs to any bucket.
+ * Always returns {@code TRUE}.
+ */
+ private final class PutBucket extends PageHandler<Void, Boolean> {
+ /** {@inheritDoc} */
+ @Override public Boolean run(
+ int cacheId,
+ long pageId,
+ long page,
+ long pageAddr,
+ PageIO iox,
+ Boolean walPlc,
+ Void ignore,
+ int oldBucket,
+ IoStatisticsHolder statHolder
+ ) throws IgniteCheckedException {
+ // The page leaves the onheap cache of the old bucket: undo the size increment made when it was cached.
+ decrementBucketSize(oldBucket);
+
+ // Recalculate bucket because page free space can be changed concurrently.
+ int freeSpace = ((AbstractDataPageIO)iox).getFreeSpace(pageAddr);
+
+ int newBucket = getBucketIndex(freeSpace);
+
+ if (newBucket != oldBucket && log.isDebugEnabled()) {
+ log.debug("Bucket changed when moving from heap to PageMemory
[list=" + name + ", oldBucket=" + oldBucket +
+ ", newBucket=" + newBucket + ", pageId=" + pageId + ']');
+ }
+
+ // -1 means the page doesn't belong to any bucket anymore, so it is not put back.
+ if (newBucket >= 0)
+ put(null, pageId, page, pageAddr, newBucket, statHolder);
+
+ return TRUE;
+ }
+ }
+
+ /**
* @param cacheId Cache ID.
* @param name Name (for debug purpose).
* @param pageMem Page memory.
@@ -142,7 +193,8 @@ public abstract class PagesList extends DataStructure {
int buckets,
IgniteWriteAheadLogManager wal,
long metaPageId,
- PageLockListener lockLsnr
+ PageLockListener lockLsnr,
+ GridKernalContext ctx
) {
super(cacheId, null, pageMem, wal, lockLsnr);
@@ -154,6 +206,10 @@ public abstract class PagesList extends DataStructure {
for (int i = 0; i < buckets; i++)
bucketsSize[i] = new AtomicLong();
+
+ onheapListCachingEnabled = isCachingApplicable();
+
+ log = ctx.log(PagesList.class);
}
/**
@@ -258,19 +314,30 @@ public abstract class PagesList extends DataStructure {
}
/**
+ * Checks whether onheap caching can be used for this pages list. An in-memory data region is recognized by
+ * the absence of a WAL manager ({@code wal == null}).
+ *
+ * @return {@code True} if onheap caching is applicable for this pages
list. {@code False} if caching is disabled
+ * explicitly by system property or if page list belongs to in-memory data
region (in this case onheap caching
+ * makes no sense).
+ */
+ private boolean isCachingApplicable() {
+ return !pagesListCachingDisabledSysProp && (wal != null);
+ }
+
+ /**
* Save metadata without exclusive lock on it.
*
* @throws IgniteCheckedException If failed.
*/
- public void saveMetadata() throws IgniteCheckedException {
+ public void saveMetadata(IoStatisticsHolder statHolder) throws
IgniteCheckedException {
long nextPageId = metaPageId;
assert nextPageId != 0;
+ flushBucketsCache(statHolder);
+
if (!changed)
return;
- //This guaranteed that any concurrently changes of list will be
detected.
+ // This guaranteed that any concurrently changes of list will be
detected.
changed = false;
try {
@@ -279,13 +346,55 @@ public abstract class PagesList extends DataStructure {
markUnusedPagesDirty(unusedPageId);
}
catch (Throwable e) {
- changed = true;//Return changed flag due to exception.
+ changed = true; // Return changed flag due to exception.
throw e;
}
}
/**
+ * Flushes onheap cached pages lists to page memory.
+ * <p>
+ * Onheap caching of newly put pages is switched off while the flush is in progress and switched back on afterwards.
+ * Pages that could not be locked are returned to the onheap cache.
+ *
+ * @param statHolder Statistics holder to track IO operations.
+ * @throws IgniteCheckedException If failed.
+ */
+ private void flushBucketsCache(IoStatisticsHolder statHolder) throws
IgniteCheckedException {
+ if (!isCachingApplicable())
+ return;
+
+ onheapListCachingEnabled = false;
+
+ try {
+ for (int bucket = 0; bucket < buckets; bucket++) {
+ PagesCache pagesCache = getBucketCache(bucket, false);
+
+ if (pagesCache == null)
+ continue;
+
+ GridLongList pages = pagesCache.flush();
+
+ if (pages != null) {
+ for (int i = 0; i < pages.size(); i++) {
+ long pageId = pages.get(i);
+
+ if (log.isDebugEnabled()) {
+ log.debug("Move page from heap to PageMemory
[list=" + name + ", bucket=" + bucket +
+ ", pageId=" + pageId + ']');
+ }
+
+ Boolean res = write(pageId, putBucket, bucket, null,
statHolder);
+
+ if (res == null) {
+ // Return page to onheap pages list if can't lock it.
+ // NOTE(review): the result of add() is ignored. If the cache was refilled concurrently (a put that
+ // read the caching flag before it was cleared) and is full, the page is presumably in neither the
+ // onheap cache nor a page memory list - confirm this is acceptable.
+ pagesCache.add(pageId);
+ }
+ }
+ }
+ }
+ }
+ finally {
+ onheapListCachingEnabled = true;
+ }
+ }
+
+ /**
* Write free list data to page memory.
*
* @param nextPageId First free page id.
@@ -408,6 +517,13 @@ public abstract class PagesList extends DataStructure {
}
/**
+ * Gets bucket index by page free space.
+ *
+ * @param freeSpace Page free space.
+ * @return Bucket index or -1 if page doesn't belong to any bucket.
+ */
+ protected abstract int getBucketIndex(int freeSpace);
+
+ /**
* @param bucket Bucket index.
* @return Bucket.
*/
@@ -428,6 +544,12 @@ public abstract class PagesList extends DataStructure {
protected abstract boolean isReuseBucket(int bucket);
/**
+ * @param bucket Bucket index.
+ * @param create Whether to create the bucket cache if it doesn't exist yet.
+ * @return Bucket cache. Callers in this class handle {@code null} when {@code create} is {@code false}.
+ */
+ protected abstract PagesCache getBucketCache(int bucket, boolean create);
+
+ /**
* @param io IO.
* @param prevId Previous page ID.
* @param prev Previous page buffer.
@@ -494,6 +616,11 @@ public abstract class PagesList extends DataStructure {
for (; ; ) {
Stripe[] tails = getBucket(bucket);
+ if (log.isDebugEnabled()) {
+ log.debug("Update tail [list=" + name + ", bucket=" +
bucket + ", oldTailId=" + oldTailId +
+ ", newTailId=" + newTailId + ", tails=" +
Arrays.toString(tails));
+ }
+
// Tail must exist to be updated.
assert !F.isEmpty(tails) : "Missing tails [bucket=" + bucket +
", tails=" + Arrays.toString(tails) +
", metaPage=" + U.hexLong(metaPageId) + ']';
@@ -674,6 +801,17 @@ public abstract class PagesList extends DataStructure {
if (bag != null && bag.isEmpty()) // Skip allocating stripe for empty
bag.
return;
+ if (bag == null && onheapListCachingEnabled &&
+ putDataPage(getBucketCache(bucket, true), dataId, dataPage,
dataAddr, bucket)) {
+ // Successfully put page to the onheap pages list cache.
+ if (log.isDebugEnabled()) {
+ log.debug("Put page to pages list cache [list=" + name + ",
bucket=" + bucket +
+ ", dataId=" + dataId + ']');
+ }
+
+ return;
+ }
+
for (int lockAttempt = 0; ;) {
Stripe stripe = getPageForPut(bucket, bag);
@@ -731,6 +869,11 @@ public abstract class PagesList extends DataStructure {
putDataPage(tailId, tailPage, tailAddr, io, dataId,
dataPage, dataAddr, bucket, statHolder);
if (ok) {
+ if (log.isDebugEnabled()) {
+ log.debug("Put page to pages list [list=" + name +
", bucket=" + bucket +
+ ", dataId=" + dataId + ", tailId=" + tailId +
']');
+ }
+
stripe.empty = false;
return;
@@ -794,6 +937,42 @@ public abstract class PagesList extends DataStructure {
}
/**
+ * Puts a data page to the onheap pages cache of the bucket.
+ * <p>
+ * On success the bucket size is incremented and the page's free list page ID is reset to {@code 0L}. That value
+ * marks the page as cached onheap rather than linked into a page memory pages list.
+ *
+ * @param pagesCache Onheap pages cache of the bucket.
+ * @param dataId Data page ID.
+ * @param dataPage Data page pointer.
+ * @param dataAddr Data page address.
+ * @param bucket Bucket.
+ * @return {@code true} If succeeded, {@code false} if the pages cache is full.
+ * @throws IgniteCheckedException If failed.
+ */
+ private boolean putDataPage(
+ PagesCache pagesCache,
+ final long dataId,
+ final long dataPage,
+ final long dataAddr,
+ int bucket
+ ) throws IgniteCheckedException {
+ if (pagesCache.add(dataId)) {
+ incrementBucketSize(bucket);
+
+ AbstractDataPageIO dataIO = PageIO.getPageIO(dataAddr);
+
+ if (dataIO.getFreeListPageId(dataAddr) != 0L) {
+ dataIO.setFreeListPageId(dataAddr, 0L);
+
+ // Actually, there is no real need for this WAL record, but it
has relatively low cost and provides
+ // anytime consistency between page memory and WAL (without
this record WAL is consistent with
+ // page memory only at the time of checkpoint, but it doesn't
affect recovery guarantees).
+ if (needWalDeltaRecord(dataId, dataPage, null))
+ wal.log(new DataPageSetFreeListPageRecord(grpId, dataId,
0L));
+ }
+
+ return true;
+ }
+ else
+ return false;
+ }
+
+ /**
* @param pageId Page ID.
* @param page Page pointer.
* @param pageAddr Page address.
@@ -1101,6 +1280,21 @@ public abstract class PagesList extends DataStructure {
*/
protected long takeEmptyPage(int bucket, @Nullable IOVersions initIoVers,
IoStatisticsHolder statHolder) throws IgniteCheckedException {
+ PagesCache pagesCache = getBucketCache(bucket, false);
+
+ long pageId;
+
+ if (pagesCache != null && (pageId = pagesCache.poll()) != 0L) {
+ decrementBucketSize(bucket);
+
+ if (log.isDebugEnabled()) {
+ log.debug("Take page from pages list cache [list=" + name + ",
bucket=" + bucket +
+ ", pageId=" + pageId + ']');
+ }
+
+ return pageId;
+ }
+
for (int lockAttempt = 0; ;) {
Stripe stripe = getPageForTake(bucket);
@@ -1151,7 +1345,7 @@ public abstract class PagesList extends DataStructure {
continue;
}
- long pageId = io.takeAnyPage(tailAddr);
+ pageId = io.takeAnyPage(tailAddr);
if (pageId != 0L) {
decrementBucketSize(bucket);
@@ -1229,6 +1423,11 @@ public abstract class PagesList extends DataStructure {
reuseList.addForRecycle(new SingletonReuseBag(recycleId));
}
+ if (log.isDebugEnabled()) {
+ log.debug("Take page from pages list [list=" + name + ",
bucket=" + bucket +
+ ", dataPageId=" + dataPageId + ", tailId=" + tailId +
']');
+ }
+
return dataPageId;
}
finally {
@@ -1303,9 +1502,38 @@ public abstract class PagesList extends DataStructure {
throws IgniteCheckedException {
final long pageId = dataIO.getFreeListPageId(dataAddr);
- assert pageId != 0;
+ if (pageId == 0L) { // Page cached in onheap list.
+ assert isCachingApplicable() : "pageId==0L, but caching is not
applicable for this pages list: " + name;
+
+ PagesCache pagesCache = getBucketCache(bucket, false);
+
+ // Pages cache can be null here if page was taken for put from
free list concurrently.
+ if (pagesCache == null || !pagesCache.removePage(dataId)) {
+ if (log.isDebugEnabled()) {
+ log.debug("Remove page from pages list cache failed
[list=" + name + ", bucket=" + bucket +
+ ", dataId=" + dataId + "]: " + ((pagesCache == null) ?
"cache is null" : "page not found"));
+ }
+
+ return false;
+ }
+
+ decrementBucketSize(bucket);
+
+ if (log.isDebugEnabled()) {
+ log.debug("Remove page from pages list cache [list=" + name +
", bucket=" + bucket +
+ ", dataId=" + dataId + ']');
+ }
+
+ return true;
+ }
+
+ if (log.isDebugEnabled()) {
+ log.debug("Remove page from pages list [list=" + name + ",
bucket=" + bucket + ", dataId=" + dataId +
+ ", pageId=" + pageId + ']');
+ }
final long page = acquirePage(pageId, statHolder);
+
try {
long nextId;
@@ -1640,6 +1868,196 @@ public abstract class PagesList extends DataStructure {
}
}
+ /**
+ * Onheap cache of a pages list.
+ * <p>
+ * Page IDs are distributed across {@code STRIPES_COUNT} stripes by the low bits of the page ID. Each stripe is guarded
+ * by its own mutex, so operations on different stripes don't contend with each other.
+ */
+ @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
+ public static class PagesCache {
+ /** Pages cache max size. */
+ private static final int MAX_SIZE =
+ IgniteSystemProperties.getInteger("IGNITE_PAGES_LIST_CACHING_MAX_CACHE_SIZE", 64);
+
+ /** Stripes count. Must be power of 2. */
+ private static final int STRIPES_COUNT =
+ IgniteSystemProperties.getInteger("IGNITE_PAGES_LIST_CACHING_STRIPES_COUNT", 4);
+
+ /** Threshold of flush calls on empty cache to allow GC of stripes (flush invoked twice per checkpoint). */
+ private static final int EMPTY_FLUSH_GC_THRESHOLD =
+ IgniteSystemProperties.getInteger("IGNITE_PAGES_LIST_CACHING_EMPTY_FLUSH_GC_THRESHOLD", 10);
+
+ /** Mutexes for each stripe. */
+ private final Object[] stripeLocks = new Object[STRIPES_COUNT];
+
+ /** Page lists. */
+ private final GridLongList[] stripes = new GridLongList[STRIPES_COUNT];
+
+ /** Atomic updater for nextStripeIdx field. */
+ private static final AtomicIntegerFieldUpdater<PagesCache> nextStripeUpdater = AtomicIntegerFieldUpdater
+ .newUpdater(PagesCache.class, "nextStripeIdx");
+
+ /** Atomic updater for size field. */
+ private static final AtomicIntegerFieldUpdater<PagesCache> sizeUpdater = AtomicIntegerFieldUpdater
+ .newUpdater(PagesCache.class, "size");
+
+ /** Access counter to provide round-robin stripes polling. */
+ private volatile int nextStripeIdx;
+
+ /** Cache size. */
+ private volatile int size;
+
+ /** Count of flush calls with empty cache. */
+ private volatile int emptyFlushCnt;
+
+ /**
+ * Default constructor.
+ */
+ public PagesCache() {
+ assert U.isPow2(STRIPES_COUNT) : STRIPES_COUNT;
+
+ for (int i = 0; i < STRIPES_COUNT; i++)
+ stripeLocks[i] = new Object();
+ }
+
+ /**
+ * Remove page from the list.
+ *
+ * @param pageId Page id.
+ * @return {@code True} if page was found and successfully removed, {@code false} if page not found.
+ */
+ public boolean removePage(long pageId) {
+ int stripeIdx = (int)pageId & (STRIPES_COUNT - 1);
+
+ synchronized (stripeLocks[stripeIdx]) {
+ GridLongList stripe = stripes[stripeIdx];
+
+ boolean rmvd = stripe != null && stripe.removeValue(0, pageId) >= 0;
+
+ if (rmvd)
+ sizeUpdater.decrementAndGet(this);
+
+ return rmvd;
+ }
+ }
+
+ /**
+ * Poll next page from the list. Stripes are visited in round-robin order.
+ *
+ * @return Page ID, or {@code 0L} if the cache is empty.
+ */
+ public long poll() {
+ if (size == 0)
+ return 0L;
+
+ for (int i = 0; i < STRIPES_COUNT; i++) {
+ // Masking keeps the index valid even after the counter overflows to negative values.
+ int stripeIdx = nextStripeUpdater.getAndIncrement(this) & (STRIPES_COUNT - 1);
+
+ synchronized (stripeLocks[stripeIdx]) {
+ GridLongList stripe = stripes[stripeIdx];
+
+ if (stripe != null && !stripe.isEmpty()) {
+ sizeUpdater.decrementAndGet(this);
+
+ return stripe.remove();
+ }
+ }
+ }
+
+ return 0L;
+ }
+
+ /**
+ * Flush all stripes to one list and clear stripes.
+ * <p>
+ * After {@code EMPTY_FLUSH_GC_THRESHOLD} flushes of an empty cache, empty stripes are released so they can be
+ * garbage collected.
+ *
+ * @return Flushed page IDs, or {@code null} if there was nothing to flush.
+ */
+ @SuppressWarnings("NonAtomicOperationOnVolatileField")
+ public GridLongList flush() {
+ GridLongList res = null;
+
+ if (size == 0) {
+ boolean stripesChanged = false;
+
+ if (++emptyFlushCnt >= EMPTY_FLUSH_GC_THRESHOLD) {
+ for (int i = 0; i < STRIPES_COUNT; i++) {
+ synchronized (stripeLocks[i]) {
+ GridLongList stripe = stripes[i];
+
+ if (stripe != null) {
+ if (stripe.isEmpty())
+ stripes[i] = null;
+ else {
+ // Pages were concurrently added to the stripe.
+ stripesChanged = true;
+
+ break;
+ }
+ }
+ }
+ }
+ }
+
+ if (!stripesChanged)
+ return null;
+ }
+
+ emptyFlushCnt = 0;
+
+ for (int i = 0; i < STRIPES_COUNT; i++) {
+ synchronized (stripeLocks[i]) {
+ GridLongList stripe = stripes[i];
+
+ if (stripe != null && !stripe.isEmpty()) {
+ if (res == null)
+ res = new GridLongList(size);
+
+ sizeUpdater.addAndGet(this, -stripe.size());
+
+ res.addAll(stripe);
+
+ stripe.clear();
+ }
+ }
+ }
+
+ return res;
+ }
+
+ /**
+ * Add pageId to the tail of the list.
+ * <p>
+ * Only the stripe mutex is taken. An instance-wide lock is not needed: the per-stripe limit
+ * ({@code MAX_SIZE / STRIPES_COUNT}) already bounds the total cache size by {@code MAX_SIZE}. Other methods
+ * synchronize only on stripe mutexes as well.
+ *
+ * @param pageId Page id.
+ * @return {@code True} if page can be added, {@code false} if list is full.
+ */
+ public boolean add(long pageId) {
+ assert pageId != 0L;
+
+ // Racy fast path; the authoritative limit check is the per-stripe one below.
+ if (size >= MAX_SIZE)
+ return false;
+
+ int stripeIdx = (int)pageId & (STRIPES_COUNT - 1);
+
+ synchronized (stripeLocks[stripeIdx]) {
+ GridLongList stripe = stripes[stripeIdx];
+
+ // The stripe may have been released by flush(); recreate it lazily.
+ if (stripe == null)
+ stripes[stripeIdx] = stripe = new GridLongList(MAX_SIZE / STRIPES_COUNT);
+
+ if (stripe.size() >= MAX_SIZE / STRIPES_COUNT)
+ return false;
+ else {
+ stripe.add(pageId);
+
+ sizeUpdater.incrementAndGet(this);
+
+ return true;
+ }
+ }
+ }
+
+ /**
+ * @return Cache size.
+ */
+ public int size() {
+ return size;
+ }
+ }
+
/**
*
*/
@@ -1676,5 +2094,10 @@ public abstract class PagesList extends DataStructure {
@Override public int hashCode() {
return Objects.hash(tailId, empty);
}
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ // Only the tail page ID is printed; this is what Arrays.toString(tails) shows in the debug logging of tail updates.
+ return Long.toString(tailId);
+ }
}
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/MetaStorage.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/MetaStorage.java
index 40a2a98..8db6d2c 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/MetaStorage.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/MetaStorage.java
@@ -39,6 +39,7 @@ import java.util.stream.Stream;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.metric.IoStatisticsHolderNoOp;
import org.apache.ignite.internal.pagemem.FullPageId;
import org.apache.ignite.internal.pagemem.PageIdAllocator;
import org.apache.ignite.internal.pagemem.PageIdUtils;
@@ -251,7 +252,8 @@ public class MetaStorage implements DbCheckpointListener,
ReadWriteMetastorage {
wal,
reuseListRoot.pageId().pageId(),
reuseListRoot.isAllocated(),
-
diagnosticMgr.pageLockTracker().createPageLockTracker(freeListName)
+
diagnosticMgr.pageLockTracker().createPageLockTracker(freeListName),
+ cctx.kernalContext()
) {
@Override protected long allocatePageNoReuse() throws
IgniteCheckedException {
return pageMem.allocatePage(grpId, partId, FLAG_DATA);
@@ -599,14 +601,14 @@ public class MetaStorage implements DbCheckpointListener,
ReadWriteMetastorage {
Executor executor = ctx.executor();
if (executor == null) {
- partStorage.saveMetadata();
+ partStorage.saveMetadata(IoStatisticsHolderNoOp.INSTANCE);
saveStoreMetadata();
}
else {
executor.execute(() -> {
try {
- partStorage.saveMetadata();
+ partStorage.saveMetadata(IoStatisticsHolderNoOp.INSTANCE);
}
catch (IgniteCheckedException e) {
throw new IgniteException(e);
@@ -626,7 +628,7 @@ public class MetaStorage implements DbCheckpointListener,
ReadWriteMetastorage {
/** {@inheritDoc} */
@Override public void beforeCheckpointBegin(Context ctx) throws
IgniteCheckedException {
- partStorage.saveMetadata();
+ partStorage.saveMetadata(IoStatisticsHolderNoOp.INSTANCE);
}
/** {@inheritDoc} */
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/partstorage/PartitionMetaStorage.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/partstorage/PartitionMetaStorage.java
index 40aafae..da17fd1 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/partstorage/PartitionMetaStorage.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/partstorage/PartitionMetaStorage.java
@@ -47,5 +47,5 @@ public interface PartitionMetaStorage<T extends Storable> {
/**
* Saves storage metadata.
*/
- public void saveMetadata() throws IgniteCheckedException;
+ public void saveMetadata(IoStatisticsHolder statHolder) throws
IgniteCheckedException;
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/partstorage/PartitionMetaStorageImpl.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/partstorage/PartitionMetaStorageImpl.java
index b6402ad..db1c796 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/partstorage/PartitionMetaStorageImpl.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/partstorage/PartitionMetaStorageImpl.java
@@ -19,6 +19,7 @@ package
org.apache.ignite.internal.processors.cache.persistence.partstorage;
import java.nio.ByteBuffer;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.pagemem.PageUtils;
import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager;
import org.apache.ignite.internal.processors.cache.IncompleteObject;
@@ -56,9 +57,10 @@ public class PartitionMetaStorageImpl<T extends Storable>
extends AbstractFreeLi
IgniteWriteAheadLogManager wal,
long metaPageId,
boolean initNew,
- PageLockListener lsnr
+ PageLockListener lsnr,
+ GridKernalContext ctx
) throws IgniteCheckedException {
- super(cacheId, name, memMetrics, memPlc, reuseList, wal, metaPageId,
initNew, lsnr);
+ super(cacheId, name, memMetrics, memPlc, reuseList, wal, metaPageId,
initNew, lsnr, ctx);
}
/**
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/reuse/ReuseListImpl.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/reuse/ReuseListImpl.java
index 34b5779..088a6ee 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/reuse/ReuseListImpl.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/reuse/ReuseListImpl.java
@@ -19,6 +19,7 @@ package
org.apache.ignite.internal.processors.cache.persistence.tree.reuse;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.pagemem.PageMemory;
import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager;
import
org.apache.ignite.internal.processors.cache.persistence.freelist.PagesList;
@@ -36,6 +37,9 @@ public class ReuseListImpl extends PagesList implements
ReuseList {
/** */
private volatile Stripe[] bucket;
+ /** Onheap pages cache. */
+ private final PagesCache bucketCache = new PagesCache();
+
/**
* @param cacheId Cache ID.
* @param name Name (for debug purpose).
@@ -52,7 +56,8 @@ public class ReuseListImpl extends PagesList implements
ReuseList {
IgniteWriteAheadLogManager wal,
long metaPageId,
boolean initNew,
- PageLockListener lockLsnr
+ PageLockListener lockLsnr,
+ GridKernalContext ctx
) throws IgniteCheckedException {
super(
cacheId,
@@ -61,7 +66,8 @@ public class ReuseListImpl extends PagesList implements
ReuseList {
1,
wal,
metaPageId,
- lockLsnr
+ lockLsnr,
+ ctx
);
reuseList = this;
@@ -97,11 +103,21 @@ public class ReuseListImpl extends PagesList implements
ReuseList {
}
/** {@inheritDoc} */
+ @Override protected int getBucketIndex(int freeSpace) {
+ // This list keeps a single bucket (casBucket and getBucketCache ignore the bucket index), so every page maps to bucket 0.
+ return 0;
+ }
+
+ /** {@inheritDoc} */
@Override protected boolean casBucket(int bucket, Stripe[] exp, Stripe[]
upd) {
return bucketUpdater.compareAndSet(this, exp, upd);
}
/** {@inheritDoc} */
+ @Override protected PagesCache getBucketCache(int bucket, boolean create) {
+ // The single onheap cache is created together with the list (field initializer), so both arguments are ignored.
+ return bucketCache;
+ }
+
+ /** {@inheritDoc} */
@Override public String toString() {
return "ReuseList [name=" + name + ']';
}
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/checkpoint/CheckpointFreeListTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/checkpoint/CheckpointFreeListTest.java
index e83148b..100a959 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/checkpoint/CheckpointFreeListTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/checkpoint/CheckpointFreeListTest.java
@@ -32,11 +32,14 @@ import java.util.Random;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.CacheWriteSynchronizationMode;
@@ -45,6 +48,7 @@ import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteKernal;
import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
import
org.apache.ignite.internal.processors.cache.persistence.GridCacheOffheapManager;
@@ -56,6 +60,7 @@ import
org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.GridTestUtils.SF;
+import org.apache.ignite.testframework.junits.WithSystemProperty;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Test;
@@ -211,9 +216,15 @@ public class CheckpointFreeListTest extends
GridCommonAbstractTest {
}
/**
+ * Note: the test assumes that the PDS size does not change between the first checkpoint and the state after
+ * several node stops. This no longer holds with free-list caching, since only the final free-list state is
+ * persisted on checkpoint: buckets that were changed but are currently empty are not persisted, so the PDS
+ * size is smaller after the first checkpoint.
+ * The test therefore makes sense only with caching disabled.
+ *
* @throws Exception if fail.
*/
@Test
+ @WithSystemProperty(key =
IgniteSystemProperties.IGNITE_PAGES_LIST_DISABLE_ONHEAP_CACHING, value = "true")
public void testRestoreFreeListCorrectlyAfterRandomStop() throws Exception
{
IgniteEx ignite0 = startGrid(0);
ignite0.cluster().active(true);
@@ -298,6 +309,69 @@ public class CheckpointFreeListTest extends
GridCommonAbstractTest {
}
/**
+     * Checks that the free-list works and that its onheap pages cache is flushed correctly under a highly
+     * concurrent load spanning several checkpoints.
+     * <p>
+     * 20 writer threads keep overwriting a small key set with values of varying size, so each put replaces a row
+     * of a different size. Meanwhile the main thread forces checkpoints in a loop.
+     *
+     * @throws Throwable If failed, including the first error raised by any writer thread.
+     */
+    @Test
+    public void testFreeListUnderLoadMultipleCheckpoints() throws Throwable {
+        IgniteEx ignite = startGrid(0);
+
+        ignite.cluster().active(true);
+
+        int minValSize = 64;
+        int maxValSize = 128;
+        int valsCnt = maxValSize - minValSize;
+        int keysCnt = 1_000;
+
+        byte[][] vals = new byte[valsCnt][];
+
+        for (int i = 0; i < valsCnt; i++)
+            vals[i] = new byte[minValSize + i];
+
+        IgniteCache<Object, Object> cache = ignite.createCache(new CacheConfiguration<>(DEFAULT_CACHE_NAME)
+            .setAffinity(new RendezvousAffinityFunction().setPartitions(2)) // Maximize contention per partition.
+            .setAtomicityMode(CacheAtomicityMode.ATOMIC));
+
+        AtomicBoolean done = new AtomicBoolean();
+        AtomicReference<Throwable> error = new AtomicReference<>();
+
+        IgniteInternalFuture<Long> fut = GridTestUtils.runMultiThreadedAsync(() -> {
+            Random rnd = new Random();
+
+            try {
+                while (!done.get()) {
+                    int key = rnd.nextInt(keysCnt);
+                    byte[] val = vals[rnd.nextInt(valsCnt)];
+
+                    // Put with changed value size - worst case for free list, since row will be removed first and
+                    // then inserted again.
+                    cache.put(key, val);
+                }
+            }
+            catch (Throwable t) {
+                // Keep the first failure: errors in other writers are usually consequences of it.
+                error.compareAndSet(null, t);
+            }
+        }, 20, "cache-put");
+
+        int checkpointsCnt = SF.applyLB(10, 2);
+
+        try {
+            for (int i = 0; i < checkpointsCnt && error.get() == null; i++) {
+                forceCheckpoint(ignite);
+
+                doSleep(1_000L);
+            }
+        }
+        finally {
+            // Always stop and join the writers, even if a checkpoint failed, so they cannot outlive the test.
+            done.set(true);
+
+            fut.get();
+        }
+
+        stopAllGrids();
+
+        if (error.get() != null)
+            throw error.get();
+    }
+
+ /**
* @param entriesToPut Entiries to put.
* @param nodeStartBarrier Marker of node was started.
*/
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalFlushMultiNodeFailoverAbstractSelfTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalFlushMultiNodeFailoverAbstractSelfTest.java
index 3c76b47..59f6bc0 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalFlushMultiNodeFailoverAbstractSelfTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalFlushMultiNodeFailoverAbstractSelfTest.java
@@ -185,11 +185,14 @@ public abstract class
IgniteWalFlushMultiNodeFailoverAbstractSelfTest extends Gr
IgniteCache<Object, Object> cache = ig.cache(DEFAULT_CACHE_NAME);
+ // We should have value size large enough to switch WAL segment by
ITRS/4 puts.
+ String valPrefix = "testValue" + new String(new
char[512]).replace('\0', '#');
+
for (int i = 0; i < ITRS; i++) {
while (!Thread.currentThread().isInterrupted()) {
try (Transaction tx = ig.transactions().txStart(
TransactionConcurrency.PESSIMISTIC,
TransactionIsolation.REPEATABLE_READ)) {
- cache.put(i, "testValue" + i);
+ cache.put(i, valPrefix + i);
tx.commit();
@@ -240,7 +243,7 @@ public abstract class
IgniteWalFlushMultiNodeFailoverAbstractSelfTest extends Gr
cache = grid0.cache(DEFAULT_CACHE_NAME);
for (int i = 0; i < ITRS; i++)
- assertEquals(cache.get(i), "testValue" + i);
+ assertEquals(cache.get(i), valPrefix + i);
}
/** */
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalRecoveryTxLogicalRecordsTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalRecoveryTxLogicalRecordsTest.java
index 5b7a38b..bc547ef 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalRecoveryTxLogicalRecordsTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalRecoveryTxLogicalRecordsTest.java
@@ -24,6 +24,7 @@ import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReferenceArray;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
@@ -43,12 +44,14 @@ import
org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.metric.IoStatisticsHolderNoOp;
import org.apache.ignite.internal.pagemem.PageIdAllocator;
import org.apache.ignite.internal.pagemem.store.PageStore;
import org.apache.ignite.internal.pagemem.wal.record.RollbackRecord;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheGroupContext;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheProcessor;
import org.apache.ignite.internal.processors.cache.IgniteCacheOffheapManager;
import org.apache.ignite.internal.processors.cache.IgniteRebalanceIterator;
import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtDemandedPartitionsMap;
@@ -708,8 +711,9 @@ public class WalRecoveryTxLogicalRecordsTest extends
GridCommonAbstractTest {
* @throws Exception If failed.
*/
private List<Integer> allocatedPages(Ignite ignite, String cacheName)
throws Exception {
- FilePageStoreManager storeMgr =
-
(FilePageStoreManager)((IgniteEx)ignite).context().cache().context().pageStore();
+ GridCacheProcessor cacheProc = ((IgniteEx)ignite).context().cache();
+
+ FilePageStoreManager storeMgr =
(FilePageStoreManager)cacheProc.context().pageStore();
int parts = ignite.affinity(cacheName).partitions();
@@ -718,6 +722,22 @@ public class WalRecoveryTxLogicalRecordsTest extends
GridCommonAbstractTest {
for (int p = 0; p < parts; p++) {
PageStore store = storeMgr.getStore(CU.cacheId(cacheName), p);
+ cacheProc.context().database().checkpointReadLock();
+
+ try {
+ GridDhtLocalPartition part =
cacheProc.cache(cacheName).context().topology().localPartition(p);
+
+ if (part.dataStore().rowStore() != null) {
+ AbstractFreeList freeList =
(AbstractFreeList)part.dataStore().rowStore().freeList();
+
+ // Flush free-list onheap cache to page memory.
+ freeList.saveMetadata(IoStatisticsHolderNoOp.INSTANCE);
+ }
+ }
+ finally {
+ cacheProc.context().database().checkpointReadUnlock();
+ }
+
store.sync();
res.add(store.pages());
@@ -971,10 +991,10 @@ public class WalRecoveryTxLogicalRecordsTest extends
GridCommonAbstractTest {
ids[i] = bucket[i].tailId;
}
-// AtomicIntegerArray cnts = GridTestUtils.getFieldValue(reuseList,
PagesList.class, "cnts");
-// assertEquals(1, cnts.length());
+ AtomicLong[] bucketsSize = GridTestUtils.getFieldValue(reuseList,
PagesList.class, "bucketsSize");
+ assertEquals(1, bucketsSize.length);
- return new T2<>(ids, 0);
+ return new T2<>(ids, (int)bucketsSize[0].get());
}
/**
@@ -1017,10 +1037,12 @@ public class WalRecoveryTxLogicalRecordsTest extends
GridCommonAbstractTest {
/**
* @param ignite Node.
* @param cacheName Cache name.
- * @return Cache free lists data.
+ * @return Cache free lists data (partition number to map of buckets to
tails and buckets size).
*/
- private Map<Integer, T2<Map<Integer, long[]>, int[]>>
getFreeListData(Ignite ignite, String cacheName) {
- GridCacheContext ctx =
((IgniteEx)ignite).context().cache().cache(cacheName).context();
+ private Map<Integer, T2<Map<Integer, long[]>, int[]>>
getFreeListData(Ignite ignite, String cacheName) throws IgniteCheckedException {
+ GridCacheProcessor cacheProc = ((IgniteEx)ignite).context().cache();
+
+ GridCacheContext ctx = cacheProc.cache(cacheName).context();
List<GridDhtLocalPartition> parts = ctx.topology().localPartitions();
@@ -1032,37 +1054,44 @@ public class WalRecoveryTxLogicalRecordsTest extends
GridCommonAbstractTest {
boolean foundNonEmpty = false;
boolean foundTails = false;
- for (GridDhtLocalPartition part : parts) {
- AbstractFreeList freeList =
GridTestUtils.getFieldValue(part.dataStore(), "freeList");
+ cacheProc.context().database().checkpointReadLock();
- if (freeList == null)
- // Lazy store.
- continue;
+ try {
+ for (GridDhtLocalPartition part : parts) {
+ AbstractFreeList freeList =
(AbstractFreeList)part.dataStore().rowStore().freeList();
- AtomicReferenceArray<PagesList.Stripe[]> buckets =
GridTestUtils.getFieldValue(freeList,
- AbstractFreeList.class, "buckets");
- //AtomicIntegerArray cnts = GridTestUtils.getFieldValue(freeList,
PagesList.class, "cnts");
+ if (freeList == null)
+ // Lazy store.
+ continue;
- assertNotNull(buckets);
- //assertNotNull(cnts);
- assertTrue(buckets.length() > 0);
- //assertEquals(cnts.length(), buckets.length());
+ // Flush free-list onheap cache to page memory.
+ freeList.saveMetadata(IoStatisticsHolderNoOp.INSTANCE);
- Map<Integer, long[]> tailsPerBucket = new HashMap<>();
+ AtomicReferenceArray<PagesList.Stripe[]> buckets =
GridTestUtils.getFieldValue(freeList,
+ AbstractFreeList.class, "buckets");
- for (int i = 0; i < buckets.length(); i++) {
- PagesList.Stripe[] tails = buckets.get(i);
+ AtomicLong[] bucketsSize =
GridTestUtils.getFieldValue(freeList, PagesList.class, "bucketsSize");
- long ids[] = null;
+ assertNotNull(buckets);
+ assertNotNull(bucketsSize);
+ assertTrue(buckets.length() > 0);
+ assertEquals(bucketsSize.length, buckets.length());
- if (tails != null) {
- ids = new long[tails.length];
+ Map<Integer, long[]> tailsPerBucket = new HashMap<>();
- for (int j = 0; j < tails.length; j++)
- ids[j] = tails[j].tailId;
- }
+ for (int i = 0; i < buckets.length(); i++) {
+ PagesList.Stripe[] tails = buckets.get(i);
- tailsPerBucket.put(i, ids);
+ long ids[] = null;
+
+ if (tails != null) {
+ ids = new long[tails.length];
+
+ for (int j = 0; j < tails.length; j++)
+ ids[j] = tails[j].tailId;
+ }
+
+ tailsPerBucket.put(i, ids);
if (tails != null) {
assertTrue(tails.length > 0);
@@ -1071,19 +1100,23 @@ public class WalRecoveryTxLogicalRecordsTest extends
GridCommonAbstractTest {
}
}
-// int[] cntsPerBucket = new int[cnts.length()];
-//
-// for (int i = 0; i < cnts.length(); i++) {
-// cntsPerBucket[i] = cnts.get(i);
-//
-// if (cntsPerBucket[i] > 0)
-// foundNonEmpty = true;
-// }
+ int[] cntsPerBucket = new int[bucketsSize.length];
+
+ for (int i = 0; i < bucketsSize.length; i++) {
+ cntsPerBucket[i] = (int)bucketsSize[i].get();
- res.put(part.id(), new T2<>(tailsPerBucket, (int[])null));
+ if (cntsPerBucket[i] > 0)
+ foundNonEmpty = true;
+ }
+
+ res.put(part.id(), new T2<>(tailsPerBucket, cntsPerBucket));
+ }
+ }
+ finally {
+ cacheProc.context().database().checkpointReadUnlock();
}
- //assertTrue(foundNonEmpty);
+ assertTrue(foundNonEmpty);
assertTrue(foundTails);
return res;
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/freelist/FreeListCachingTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/freelist/FreeListCachingTest.java
new file mode 100644
index 0000000..6e97d89
--- /dev/null
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/freelist/FreeListCachingTest.java
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.freelist;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.processors.cache.GridCacheProcessor;
+import
org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
+import
org.apache.ignite.internal.processors.cache.persistence.GridCacheOffheapManager;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+
+/**
+ * Tests onheap caching of free-lists. While checkpoints are disabled, non-reuse free-list buckets are expected to
+ * live only in onheap pages caches. A checkpoint is expected to flush those caches to page memory without
+ * changing bucket sizes.
+ */
+public class FreeListCachingTest extends GridCommonAbstractTest {
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ super.beforeTest();
+
+ stopAllGrids();
+
+ cleanPersistenceDir();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ stopAllGrids();
+
+ cleanPersistenceDir();
+
+ super.afterTest();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String
igniteInstanceName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+ cfg.setConsistentId(igniteInstanceName);
+
+ // Persistent default data region with a 300 MB max size.
+ cfg.setDataStorageConfiguration(new DataStorageConfiguration()
+ .setDefaultDataRegionConfiguration(new DataRegionConfiguration()
+ .setPersistenceEnabled(true)
+ .setMaxSize(300L * 1024 * 1024)
+ ));
+
+ return cfg;
+ }
+
+ /**
+ * Checks free-list bucket sizes and onheap pages caches of every partition data store through four phases:
+ * <ol>
+ * <li>With checkpoints disabled, put and immediately remove keys with growing values: only the reuse bucket
+ * may be (and must be) non-empty.</li>
+ * <li>Put values of various sizes and remove some of them. Non-reuse buckets must hold pages in total, have no
+ * page-memory stripes, and be fully represented by their onheap caches.</li>
+ * <li>Enable checkpoints and force one. Non-empty buckets must get page-memory stripes, onheap caches must be
+ * empty, and bucket sizes must not change.</li>
+ * <li>Disable checkpoints again and put more values: some buckets of every partition must be cached onheap.</li>
+ * </ol>
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testFreeListCaching() throws Exception {
+ IgniteEx ignite = startGrid(0);
+
+ ignite.cluster().active(true);
+
+ int partCnt = 10;
+
+ GridCacheProcessor cacheProc = ignite.context().cache();
+ GridCacheDatabaseSharedManager dbMgr =
(GridCacheDatabaseSharedManager)cacheProc.context().database();
+
+ // Disable checkpoints so that onheap caches are not flushed before the assertions below.
+ dbMgr.enableCheckpoints(false).get();
+
+ IgniteCache<Object, Object> cache = ignite.createCache(new
CacheConfiguration<>(DEFAULT_CACHE_NAME)
+ .setAffinity(new
RendezvousAffinityFunction().setPartitions(partCnt))
+ .setAtomicityMode(CacheAtomicityMode.ATOMIC));
+
+ GridCacheOffheapManager offheap =
(GridCacheOffheapManager)cacheProc.cache(DEFAULT_CACHE_NAME).context().group()
+ .offheap();
+
+ // Phase 1: put and immediately remove each key, with the value size growing with i.
+ for (int i = 0; i < 5_000; i++) {
+ for (int p = 0; p < partCnt; p++) {
+ Integer key = i * partCnt + p;
+ cache.put(key, new byte[i + 1]);
+ cache.remove(key);
+ }
+ }
+
+ offheap.cacheDataStores().forEach(cacheData -> {
+ PagesList list = (PagesList)cacheData.rowStore().freeList();
+
+ AtomicLong[] bucketsSize = list.bucketsSize;
+
+ // All buckets except reuse bucket must be empty after puts and
removes of the same key.
+ for (int i = 0; i < bucketsSize.length; i++) {
+ if (list.isReuseBucket(i))
+ assertTrue(bucketsSize[i].get() > 0);
+ else
+ assertEquals(0, bucketsSize[i].get());
+ }
+ });
+
+ // Phase 2: put keys for i in [0, 100) with values of (i + p) * 10 bytes, then remove the keys of every even i below 50.
+ for (int i = 0; i < 100; i++) {
+ for (int p = 0; p < partCnt; p++)
+ cache.put(i * partCnt + p, new byte[(i + p) * 10]);
+ }
+
+ for (int i = 0; i < 50; i += 2) {
+ for (int p = 0; p < partCnt; p++)
+ cache.remove(i * partCnt + p);
+ }
+
+ // Bucket sizes per partition ID, remembered to verify that the checkpoint below does not change them.
+ Map<Integer, List<Long>> partsBucketsSize = new HashMap<>();
+
+ offheap.cacheDataStores().forEach(cacheData -> {
+ PagesList list = (PagesList)cacheData.rowStore().freeList();
+
+ AtomicLong[] bucketsSize = list.bucketsSize;
+
+ List<Long> bucketsSizeList = new ArrayList<>(bucketsSize.length);
+
+ partsBucketsSize.put(cacheData.partId(), bucketsSizeList);
+
+ long notReuseSize = 0;
+
+ for (int i = 0; i < bucketsSize.length; i++) {
+ bucketsSizeList.add(bucketsSize[i].get());
+
+ PagesList.Stripe[] bucket = list.getBucket(i);
+
+ // All buckets are expected to be cached onheap except reuse
bucket, since reuse bucket is also used
+ // by indexes bypassing caching.
+ if (!list.isReuseBucket(i)) {
+ notReuseSize += bucketsSize[i].get();
+
+ assertNull("Expected null bucket [partId=" +
cacheData.partId() + ", i=" + i + ", bucket=" +
+ bucket + ']', bucket);
+
+ PagesList.PagesCache pagesCache = list.getBucketCache(i,
false);
+
+ assertEquals("Wrong pages cache size [partId=" +
cacheData.partId() + ", i=" + i + ']',
+ bucketsSize[i].get(), pagesCache == null ? 0 :
pagesCache.size());
+ }
+ }
+
+ assertTrue(notReuseSize > 0);
+ });
+
+ // Phase 3: enable checkpoints and force one.
+ dbMgr.enableCheckpoints(true).get();
+
+ forceCheckpoint(ignite);
+
+ offheap.cacheDataStores().forEach(cacheData -> {
+ PagesList list = (PagesList)cacheData.rowStore().freeList();
+
+ AtomicLong[] bucketsSize = list.bucketsSize;
+
+ for (int i = 0; i < bucketsSize.length; i++) {
+ long bucketSize = bucketsSize[i].get();
+
+ PagesList.Stripe[] bucket = list.getBucket(i);
+
+ // After checkpoint all buckets must flush onheap cache to
page memory.
+ if (bucketSize > 0) {
+ assertNotNull("Expected not null bucket [partId=" +
cacheData.partId() + ", i=" + i + ']',
+ bucket);
+ }
+
+ PagesList.PagesCache pagesCache = list.getBucketCache(i,
false);
+
+ assertEquals("Wrong pages cache size [partId=" +
cacheData.partId() + ", i=" + i + ']',
+ 0, pagesCache == null ? 0 : pagesCache.size());
+
+ assertEquals("Bucket size changed after checkpoint [partId=" +
cacheData.partId() + ", i=" + i + ']',
+ (long)partsBucketsSize.get(cacheData.partId()).get(i),
bucketSize);
+ }
+ });
+
+ // Phase 4: with checkpoints disabled again, new puts must populate onheap caches.
+ dbMgr.enableCheckpoints(false).get();
+
+ for (int i = 0; i < 50; i++) {
+ for (int p = 0; p < partCnt; p++)
+ cache.put(i * partCnt + p, new byte[(i + p) * 10]);
+ }
+
+ offheap.cacheDataStores().forEach(cacheData -> {
+ PagesList list = (PagesList)cacheData.rowStore().freeList();
+
+ int totalCacheSize = 0;
+
+ for (int i = 0; i < list.bucketsSize.length; i++) {
+ PagesList.PagesCache pagesCache = list.getBucketCache(i,
false);
+
+ totalCacheSize += pagesCache == null ? 0 : pagesCache.size();
+ }
+
+ assertTrue("Some buckets should be cached [partId=" +
cacheData.partId() + ']', totalCacheSize > 0);
+ });
+ }
+}
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/database/BPlusTreeReuseSelfTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/database/BPlusTreeReuseSelfTest.java
index e8ba160..2c7d514 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/processors/database/BPlusTreeReuseSelfTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/database/BPlusTreeReuseSelfTest.java
@@ -20,12 +20,14 @@ package org.apache.ignite.internal.processors.database;
import java.util.HashSet;
import java.util.Set;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.pagemem.PageMemory;
import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager;
import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
import
org.apache.ignite.internal.processors.cache.persistence.tree.reuse.ReuseList;
import
org.apache.ignite.internal.processors.cache.persistence.tree.reuse.ReuseListImpl;
import
org.apache.ignite.internal.processors.cache.persistence.tree.util.PageLockListener;
+import org.apache.ignite.testframework.junits.GridTestKernalContext;
import static org.apache.ignite.internal.pagemem.PageIdUtils.effectivePageId;
@@ -46,7 +48,8 @@ public class BPlusTreeReuseSelfTest extends BPlusTreeSelfTest
{
pageMem,
null,
rootId,
- initNew
+ initNew,
+ new GridTestKernalContext(log)
);
}
@@ -77,9 +80,10 @@ public class BPlusTreeReuseSelfTest extends
BPlusTreeSelfTest {
PageMemory pageMem,
IgniteWriteAheadLogManager wal,
long metaPageId,
- boolean initNew
+ boolean initNew,
+ GridKernalContext ctx
) throws IgniteCheckedException {
- super(cacheId, name, pageMem, wal, metaPageId, initNew, new
TestPageLockListener());
+ super(cacheId, name, pageMem, wal, metaPageId, initNew, new
TestPageLockListener(), ctx);
}
/**
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/database/CacheFreeListSelfTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/database/CacheFreeListSelfTest.java
index b9957e1..e4a8c9b 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/processors/database/CacheFreeListSelfTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/database/CacheFreeListSelfTest.java
@@ -528,7 +528,8 @@ public class CacheFreeListSelfTest extends
GridCommonAbstractTest {
null,
metaPageId,
true,
- null
+ null,
+ new GridTestKernalContext(log)
);
}
diff --git
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
index 15b32d8..3cdfff2 100644
---
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
+++
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
@@ -76,6 +76,7 @@ import
org.apache.ignite.internal.processors.cache.persistence.db.wal.crc.Ignite
import
org.apache.ignite.internal.processors.cache.persistence.db.wal.crc.IgniteStandaloneWalIteratorInvalidCrcTest;
import
org.apache.ignite.internal.processors.cache.persistence.db.wal.crc.IgniteWithoutArchiverWalIteratorInvalidCrcTest;
import
org.apache.ignite.internal.processors.cache.persistence.db.wal.reader.IgniteWalReaderTest;
+import
org.apache.ignite.internal.processors.cache.persistence.freelist.FreeListCachingTest;
import
org.apache.ignite.internal.processors.cache.persistence.wal.reader.FilteredWalIteratorTest;
import
org.apache.ignite.internal.processors.cache.persistence.wal.reader.StandaloneWalRecordsIteratorTest;
import
org.apache.ignite.internal.processors.cache.persistence.wal.scanner.WalScannerTest;
@@ -215,6 +216,8 @@ public class IgnitePdsTestSuite2 {
GridTestUtils.addTestIfNeeded(suite, CheckpointFreeListTest.class,
ignoredTests);
+ GridTestUtils.addTestIfNeeded(suite, FreeListCachingTest.class,
ignoredTests);
+
GridTestUtils.addTestIfNeeded(suite,
IgniteWalIteratorSwitchSegmentTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite,
IgniteWalIteratorExceptionDuringReadTest.class, ignoredTests);