Repository: ignite Updated Branches: refs/heads/ignite-2.2 bdaeecca9 -> 42c336827
IGNITE-6204 Backport optimizations of checkpointing algorithm into 2.2 (cherry picked from commit 737874f) Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/8ad99680 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/8ad99680 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/8ad99680 Branch: refs/heads/ignite-2.2 Commit: 8ad996802c8a9898fd22fdf74bd1c97120f985ed Parents: bdaeecc Author: Ivan Rakov <[email protected]> Authored: Tue Aug 29 13:24:20 2017 +0300 Committer: Anton Vinogradov <[email protected]> Committed: Fri Sep 15 11:26:45 2017 +0300 ---------------------------------------------------------------------- .../configuration/CheckpointWriteOrder.java | 33 ++++++ .../PersistentStoreConfiguration.java | 31 ++++- .../internal/pagemem/store/PageStore.java | 5 + .../GridCacheDatabaseSharedManager.java | 106 +++++++++++++---- .../cache/persistence/file/FilePageStore.java | 56 +++++---- .../persistence/file/FilePageStoreFactory.java | 35 ++++++ .../persistence/file/FilePageStoreManager.java | 17 +-- .../cache/persistence/file/FilePageStoreV2.java | 53 +++++++++ .../file/FileVersionCheckingFactory.java | 116 +++++++++++++++++++ ...gnitePdsRecoveryAfterFileCorruptionTest.java | 2 +- ...nitePersistenceSequentialCheckpointTest.java | 44 +++++++ .../IgnitePersistentStoreCacheGroupsTest.java | 31 ++--- 12 files changed, 461 insertions(+), 68 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/8ad99680/modules/core/src/main/java/org/apache/ignite/configuration/CheckpointWriteOrder.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/CheckpointWriteOrder.java b/modules/core/src/main/java/org/apache/ignite/configuration/CheckpointWriteOrder.java new file mode 100644 index 0000000..31feaf6 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/configuration/CheckpointWriteOrder.java @@ -0,0 +1,33 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ +package org.apache.ignite.configuration; + +/** + * This enum defines order of writing pages to disk storage during checkpoint. + */ +public enum CheckpointWriteOrder { + /** + * Pages are written in order provided by checkpoint pages collection iterator (which is basically a hashtable). + */ + RANDOM, + + /** + * All checkpoint pages are collected into single list and sorted by page index. + * Provides almost sequential disk writes, which can be much faster on some SSD models. + */ + SEQUENTIAL +} http://git-wip-us.apache.org/repos/asf/ignite/blob/8ad99680/modules/core/src/main/java/org/apache/ignite/configuration/PersistentStoreConfiguration.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/PersistentStoreConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/PersistentStoreConfiguration.java index e8a0ff4..888bf42 100644 --- a/modules/core/src/main/java/org/apache/ignite/configuration/PersistentStoreConfiguration.java +++ b/modules/core/src/main/java/org/apache/ignite/configuration/PersistentStoreConfiguration.java @@ -16,12 +16,11 @@ */ package org.apache.ignite.configuration; +import java.io.Serializable; import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory; import org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIOFactory; import org.apache.ignite.internal.util.typedef.internal.S; -import java.io.Serializable; - /** * Configures Apache Ignite Persistent store. */ @@ -45,7 +44,10 @@ public class PersistentStoreConfiguration implements Serializable { public static final int DFLT_RATE_TIME_INTERVAL_MILLIS = 60_000; /** Default number of checkpointing threads. */ - public static final int DFLT_CHECKPOINTING_THREADS = 1; + public static final int DFLT_CHECKPOINTING_THREADS = 4; + + /** Default checkpoint write order. */ + public static final CheckpointWriteOrder DFLT_CHECKPOINT_WRITE_ORDER = CheckpointWriteOrder.SEQUENTIAL; /** Default number of checkpoints to be kept in WAL after checkpoint is finished */ public static final int DFLT_WAL_HISTORY_SIZE = 20; @@ -95,6 +97,9 @@ public class PersistentStoreConfiguration implements Serializable { /** */ private int checkpointingThreads = DFLT_CHECKPOINTING_THREADS; + /** Checkpoint write order. */ + private CheckpointWriteOrder checkpointWriteOrder = DFLT_CHECKPOINT_WRITE_ORDER; + /** Number of checkpoints to keep */ private int walHistSize = DFLT_WAL_HISTORY_SIZE; @@ -587,6 +592,26 @@ public class PersistentStoreConfiguration implements Serializable { return walAutoArchiveAfterInactivity; } + /** + * This property defines order of writing pages to disk storage during checkpoint. + * + * @return Checkpoint write order. + */ + public CheckpointWriteOrder getCheckpointWriteOrder() { + return checkpointWriteOrder; + } + + /** + * This property defines order of writing pages to disk storage during checkpoint. + * + * @param checkpointWriteOrder Checkpoint write order. + */ + public PersistentStoreConfiguration setCheckpointWriteOrder(CheckpointWriteOrder checkpointWriteOrder) { + this.checkpointWriteOrder = checkpointWriteOrder; + + return this; + } + /** {@inheritDoc} */ @Override public String toString() { return S.toString(PersistentStoreConfiguration.class, this); http://git-wip-us.apache.org/repos/asf/ignite/blob/8ad99680/modules/core/src/main/java/org/apache/ignite/internal/pagemem/store/PageStore.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/store/PageStore.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/store/PageStore.java index 4698a6b..f6e577c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/store/PageStore.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/store/PageStore.java @@ -95,4 +95,9 @@ public interface PageStore { * @throws IgniteCheckedException If sync failed (IO error occurred). */ public void ensure() throws IgniteCheckedException; + + /** + * @return Page store version. + */ + public int version(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/8ad99680/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java index d147f36..351ec71 100755 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java @@ -49,7 +49,6 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; @@ -65,6 +64,7 @@ import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.IgniteSystemProperties; import org.apache.ignite.PersistenceMetrics; +import org.apache.ignite.configuration.CheckpointWriteOrder; import org.apache.ignite.configuration.DataPageEvictionMode; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.configuration.MemoryConfiguration; @@ -137,6 +137,7 @@ import org.apache.ignite.internal.util.worker.GridWorker; import org.apache.ignite.lang.IgniteBiTuple; import org.apache.ignite.mxbean.PersistenceMetricsMXBean; import org.apache.ignite.thread.IgniteThread; +import org.apache.ignite.thread.IgniteThreadPoolExecutor; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; @@ -384,11 +385,12 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan long cpBufSize = persistenceCfg.getCheckpointingPageBufferSize(); if (persistenceCfg.getCheckpointingThreads() > 1) - asyncRunner = new ThreadPoolExecutor( + asyncRunner = new IgniteThreadPoolExecutor( + "checkpoint-runner", + cctx.igniteInstanceName(), persistenceCfg.getCheckpointingThreads(), persistenceCfg.getCheckpointingThreads(), - 30L, - TimeUnit.SECONDS, + 30_000, new LinkedBlockingQueue<Runnable>() ); @@ -1301,8 +1303,8 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan boolean apply = status.needRestoreMemory(); if (apply) { - U.quietAndWarn(log, "Ignite node crashed in the middle of checkpoint. Will restore memory state and " + - "enforce checkpoint on node start."); + U.quietAndWarn(log, "Ignite node stopped in the middle of checkpoint. Will restore memory state and " + + "finish checkpoint on node start."); cctx.pageStore().beginRecover(); } @@ -2082,12 +2084,14 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan WALPointer cpPtr = null; - GridMultiCollectionWrapper<FullPageId> cpPages; - final CheckpointProgress curr; + IgniteBiTuple<Collection<GridMultiCollectionWrapper<FullPageId>>, Integer> cpPagesTuple; + tracker.onLockWaitStart(); + boolean hasPages; + checkpointLock.writeLock().lock(); try { @@ -2150,19 +2154,11 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan if (curr.nextSnapshot) snapshotMgr.onMarkCheckPointBegin(curr.snapshotOperation, map); - IgniteBiTuple<Collection<GridMultiCollectionWrapper<FullPageId>>, Integer> tup = beginAllCheckpoints(); - - // Todo it maybe more optimally - Collection<FullPageId> cpPagesList = new ArrayList<>(tup.get2()); - - for (GridMultiCollectionWrapper<FullPageId> col : tup.get1()) { - for (int i = 0; i < col.collectionsSize(); i++) - cpPagesList.addAll(col.innerCollection(i)); - } + cpPagesTuple = beginAllCheckpoints(); - cpPages = new GridMultiCollectionWrapper<>(cpPagesList); + hasPages = hasPageForWrite(cpPagesTuple.get1()); - if (!F.isEmpty(cpPages)) { + if (hasPages) { // No page updates for this checkpoint are allowed from now on. cpPtr = cctx.wal().log(cpRec); @@ -2178,7 +2174,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan curr.cpBeginFut.onDone(); - if (!F.isEmpty(cpPages)) { + if (hasPages) { assert cpPtr != null; // Sync log outside the checkpoint write lock. @@ -2196,6 +2192,8 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan checkpointHist.addCheckpointEntry(cpEntry); + GridMultiCollectionWrapper<FullPageId> cpPages = splitAndSortCpPagesIfNeeded(cpPagesTuple); + if (printCheckpointStats) if (log.isInfoEnabled()) log.info(String.format("Checkpoint started [checkpointId=%s, startPtr=%s, checkpointLockWait=%dms, " + @@ -2227,6 +2225,24 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan } /** + * Check that at least one collection is not empty. + * + * @param cpPagesCollWrapper Collection of {@link GridMultiCollectionWrapper} checkpoint pages. + */ + private boolean hasPageForWrite(Collection<GridMultiCollectionWrapper<FullPageId>> cpPagesCollWrapper) { + boolean hasPages = false; + + for (Collection c : cpPagesCollWrapper) + if (!c.isEmpty()) { + hasPages = true; + + break; + } + + return hasPages; + } + + /** * @return tuple with collections of FullPageIds obtained from each PageMemory and overall number of dirty pages. */ private IgniteBiTuple<Collection<GridMultiCollectionWrapper<FullPageId>>, Integer> beginAllCheckpoints() { @@ -2293,6 +2309,56 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan } } + /** + * Reorders list of checkpoint pages and splits them into needed number of sublists according to + * {@link PersistentStoreConfiguration#getCheckpointingThreads()} and + * {@link PersistentStoreConfiguration#getCheckpointWriteOrder()}. + * + * @param cpPagesTuple Checkpoint pages tuple. + */ + private GridMultiCollectionWrapper<FullPageId> splitAndSortCpPagesIfNeeded( + IgniteBiTuple<Collection<GridMultiCollectionWrapper<FullPageId>>, Integer> cpPagesTuple + ) { + List<FullPageId> cpPagesList = new ArrayList<>(cpPagesTuple.get2()); + + for (GridMultiCollectionWrapper<FullPageId> col : cpPagesTuple.get1()) { + for (int i = 0; i < col.collectionsSize(); i++) + cpPagesList.addAll(col.innerCollection(i)); + } + + if (persistenceCfg.getCheckpointWriteOrder() == CheckpointWriteOrder.SEQUENTIAL) { + Collections.sort(cpPagesList, new Comparator<FullPageId>() { + @Override public int compare(FullPageId o1, FullPageId o2) { + int cmp = Long.compare(o1.groupId(), o2.groupId()); + if (cmp != 0) + return cmp; + + return Long.compare(PageIdUtils.effectivePageId(o1.pageId()), + PageIdUtils.effectivePageId(o2.pageId())); + } + }); + } + + int cpThreads = persistenceCfg.getCheckpointingThreads(); + + int pagesSubLists = cpThreads == 1 ? 1 : cpThreads * 4; + // Splitting pages to (threads * 4) subtasks. If any thread will be faster, it will help slower threads. + + Collection[] pagesSubListArr = new Collection[pagesSubLists]; + + for (int i = 0; i < pagesSubLists; i++) { + int totalSize = cpPagesList.size(); + + int from = totalSize * i / (pagesSubLists); + + int to = totalSize * (i + 1) / (pagesSubLists); + + pagesSubListArr[i] = cpPagesList.subList(from, to); + } + + return new GridMultiCollectionWrapper<FullPageId>(pagesSubListArr); + } + /** Pages write task */ private class WriteCheckpointPages implements Runnable { /** */ http://git-wip-us.apache.org/repos/asf/ignite/blob/8ad99680/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java index a7ca13c..e6c5379 100755 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java @@ -45,10 +45,10 @@ public class FilePageStore implements PageStore { private static final long SIGNATURE = 0xF19AC4FE60C530B8L; /** File version. */ - private static final int VERSION = 1; + public static final int VERSION = 1; /** Allocated field offset. */ - public static final int HEADER_SIZE = 8/*SIGNATURE*/ + 4/*VERSION*/ + 1/*type*/ + 4/*page size*/; + static final int HEADER_SIZE = 8/*SIGNATURE*/ + 4/*VERSION*/ + 1/*type*/ + 4/*page size*/; /** */ private final File cfgFile; @@ -57,7 +57,7 @@ public class FilePageStore implements PageStore { private final byte type; /** Database configuration. */ - private final MemoryConfiguration dbCfg; + protected final MemoryConfiguration dbCfg; /** Factory to provide I/O interfaces for read/write operations with files */ private final FileIOFactory ioFactory; @@ -103,20 +103,36 @@ public class FilePageStore implements PageStore { /** {@inheritDoc} */ @Override public boolean exists() { - return cfgFile.exists() && cfgFile.length() > HEADER_SIZE; + return cfgFile.exists() && cfgFile.length() > headerSize(); } /** + * Size of page store header. + */ + public int headerSize() { + return HEADER_SIZE; + } + + /** + * Page store version. + */ + public int version() { + return VERSION; + } + + /** + * Creates header for current version file store. Doesn't init the store. + * * @param type Type. * @param pageSize Page size. * @return Byte buffer instance. */ - public static ByteBuffer header(byte type, int pageSize) { - ByteBuffer hdr = ByteBuffer.allocate(HEADER_SIZE).order(ByteOrder.LITTLE_ENDIAN); + public ByteBuffer header(byte type, int pageSize) { + ByteBuffer hdr = ByteBuffer.allocate(headerSize()).order(ByteOrder.LITTLE_ENDIAN); hdr.putLong(SIGNATURE); - hdr.putInt(VERSION); + hdr.putInt(version()); hdr.put(type); @@ -142,7 +158,7 @@ public class FilePageStore implements PageStore { } //there is 'super' page in every file - return HEADER_SIZE + dbCfg.getPageSize(); + return headerSize() + dbCfg.getPageSize(); } /** @@ -150,7 +166,7 @@ public class FilePageStore implements PageStore { */ private long checkFile() throws IgniteCheckedException { try { - ByteBuffer hdr = ByteBuffer.allocate(HEADER_SIZE).order(ByteOrder.LITTLE_ENDIAN); + ByteBuffer hdr = ByteBuffer.allocate(headerSize()).order(ByteOrder.LITTLE_ENDIAN); while (hdr.remaining() > 0) fileIO.read(hdr); @@ -166,9 +182,9 @@ public class FilePageStore implements PageStore { int ver = hdr.getInt(); - if (VERSION != ver) + if (version() != ver) throw new IgniteCheckedException("Failed to verify store file (invalid file version)" + - " [expectedVersion=" + VERSION + + " [expectedVersion=" + version() + ", fileVersion=" + ver + "]"); byte type = hdr.get(); @@ -187,10 +203,10 @@ public class FilePageStore implements PageStore { long fileSize = cfgFile.length(); - if (fileSize == HEADER_SIZE) // Every file has a special meta page. - fileSize = pageSize + HEADER_SIZE; + if (fileSize == headerSize()) // Every file has a special meta page. + fileSize = pageSize + headerSize(); - if ((fileSize - HEADER_SIZE) % pageSize != 0) + if ((fileSize - headerSize()) % pageSize != 0) throw new IgniteCheckedException("Failed to verify store file (invalid file size)" + " [fileSize=" + U.hexLong(fileSize) + ", pageSize=" + U.hexLong(pageSize) + ']'); @@ -346,9 +362,9 @@ public class FilePageStore implements PageStore { init(); try { - assert buf.remaining() == HEADER_SIZE; + assert buf.remaining() == headerSize(); - int len = HEADER_SIZE; + int len = headerSize(); long off = 0; @@ -425,7 +441,7 @@ public class FilePageStore implements PageStore { long off = pageOffset(pageId); - assert (off >= 0 && off + pageSize <= allocated.get() + HEADER_SIZE) || recover : + assert (off >= 0 && off + pageSize <= allocated.get() + headerSize()) || recover : "off=" + U.hexLong(off) + ", allocated=" + U.hexLong(allocated.get()) + ", pageId=" + U.hexLong(pageId); assert pageBuf.capacity() == pageSize; @@ -463,7 +479,7 @@ public class FilePageStore implements PageStore { /** {@inheritDoc} */ @Override public long pageOffset(long pageId) { - return (long) PageIdUtils.pageIndex(pageId) * pageSize + HEADER_SIZE; + return (long) PageIdUtils.pageIndex(pageId) * pageSize + headerSize(); } /** {@inheritDoc} */ @@ -494,7 +510,7 @@ public class FilePageStore implements PageStore { long off = allocPage(); - return off / pageSize; + return (off - headerSize()) / pageSize; } /** @@ -519,6 +535,6 @@ public class FilePageStore implements PageStore { if (!inited) return 0; - return (int)(allocated.get() / pageSize); + return (int)(allocated.get() - headerSize()) / pageSize; } } http://git-wip-us.apache.org/repos/asf/ignite/blob/8ad99680/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreFactory.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreFactory.java new file mode 100644 index 0000000..d97ab26 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreFactory.java @@ -0,0 +1,35 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.ignite.internal.processors.cache.persistence.file; + +import java.io.File; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.pagemem.PageIdAllocator; + +/** + * + */ +public interface FilePageStoreFactory { + /** + * Creates instance of FilePageStore based on given file. + * + * @param type Data type, can be {@link PageIdAllocator#FLAG_IDX} or {@link PageIdAllocator#FLAG_DATA}. + * @param file File Page store file. + */ + public FilePageStore createPageStore(byte type, File file) throws IgniteCheckedException; +} http://git-wip-us.apache.org/repos/asf/ignite/blob/8ad99680/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java index e2ad070..0041ea6 100755 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java @@ -365,21 +365,16 @@ public class FilePageStoreManager extends GridCacheSharedManagerAdapter implemen if (dirExisted && !idxFile.exists()) grpsWithoutIdx.add(grpDesc.groupId()); - FilePageStore idxStore = new FilePageStore( - PageMemory.FLAG_IDX, - idxFile, - pstCfg.getFileIOFactory(), - cctx.kernalContext().config().getMemoryConfiguration()); + FileVersionCheckingFactory pageStoreFactory = new FileVersionCheckingFactory( + pstCfg.getFileIOFactory(), igniteCfg.getMemoryConfiguration()); + + FilePageStore idxStore = pageStoreFactory.createPageStore(PageMemory.FLAG_IDX, idxFile); FilePageStore[] partStores = new FilePageStore[grpDesc.config().getAffinity().partitions()]; for (int partId = 0; partId < partStores.length; partId++) { - FilePageStore partStore = new FilePageStore( - PageMemory.FLAG_DATA, - new File(cacheWorkDir, String.format(PART_FILE_TEMPLATE, partId)), - pstCfg.getFileIOFactory(), - cctx.kernalContext().config().getMemoryConfiguration() - ); + FilePageStore partStore = pageStoreFactory.createPageStore( + PageMemory.FLAG_DATA, new File(cacheWorkDir, String.format(PART_FILE_TEMPLATE, partId))); partStores[partId] = partStore; } http://git-wip-us.apache.org/repos/asf/ignite/blob/8ad99680/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreV2.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreV2.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreV2.java new file mode 100644 index 0000000..5d044ec --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreV2.java @@ -0,0 +1,53 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ +package org.apache.ignite.internal.processors.cache.persistence.file; + +import java.io.File; +import org.apache.ignite.configuration.MemoryConfiguration; + +/** + * + */ +public class FilePageStoreV2 extends FilePageStore { + /** File version. */ + public static final int VERSION = 2; + + /** Header size. */ + private final int hdrSize; + + /** + * @param type Type. + * @param file File. + * @param factory Factory. + * @param cfg Config. + */ + public FilePageStoreV2(byte type, File file, FileIOFactory factory, MemoryConfiguration cfg) { + super(type, file, factory, cfg); + + hdrSize = cfg.getPageSize(); + } + + /** {@inheritDoc} */ + @Override public int headerSize() { + return hdrSize; + } + + /** {@inheritDoc} */ + @Override public int version() { + return VERSION; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/8ad99680/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileVersionCheckingFactory.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileVersionCheckingFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileVersionCheckingFactory.java new file mode 100644 index 0000000..53bd802 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileVersionCheckingFactory.java @@ -0,0 +1,116 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.ignite.internal.processors.cache.persistence.file; + +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.configuration.MemoryConfiguration; + +/** + * Checks version in files if it's present on the disk, creates store with latest version otherwise. + */ +public class FileVersionCheckingFactory implements FilePageStoreFactory { + /** Property to override latest version. Should be used only in tests. */ + public static final String LATEST_VERSION_OVERRIDE_PROPERTY = "file.page.store.latest.version.override"; + + /** Latest page store version. */ + public final static int LATEST_VERSION = 2; + + /** Factory to provide I/O interfaces for read/write operations with files. */ + private final FileIOFactory fileIOFactory; + + /** Memory configuration. */ + private final MemoryConfiguration memCfg; + + /** + * @param fileIOFactory File io factory. + * @param memCfg Memory configuration. + */ + public FileVersionCheckingFactory( + FileIOFactory fileIOFactory, MemoryConfiguration memCfg) { + this.fileIOFactory = fileIOFactory; + this.memCfg = memCfg; + } + + /** {@inheritDoc} */ + @Override public FilePageStore createPageStore(byte type, File file) throws IgniteCheckedException { + if (!file.exists()) + return createPageStore(type, file, latestVersion()); + + try (FileIO fileIO = fileIOFactory.create(file, "r")) { + int minHdr = FilePageStore.HEADER_SIZE; + + if (fileIO.size() < minHdr) + return createPageStore(type, file, latestVersion()); + + ByteBuffer hdr = ByteBuffer.allocate(minHdr).order(ByteOrder.LITTLE_ENDIAN); + + while (hdr.remaining() > 0) + fileIO.read(hdr); + + hdr.rewind(); + + hdr.getLong(); // Read signature + + int ver = hdr.getInt(); + + return createPageStore(type, file, ver); + } + catch (IOException e) { + throw new IgniteCheckedException("Error while creating file page store [file=" + file + "]:", e); + } + } + + /** + * Resolves latest page store version. + */ + public int latestVersion() { + int latestVer = LATEST_VERSION; + + try { + latestVer = Integer.parseInt(System.getProperty(LATEST_VERSION_OVERRIDE_PROPERTY)); + } catch (NumberFormatException e) { + // No override. + } + + return latestVer; + } + + /** + * Instantiates specific version of FilePageStore. + * + * @param type Type. + * @param file File. + * @param ver Version. + */ + public FilePageStore createPageStore(byte type, File file, int ver) throws IgniteCheckedException { + switch (ver) { + case FilePageStore.VERSION: + return new FilePageStore(type, file, fileIOFactory, memCfg); + + case FilePageStoreV2.VERSION: + return new FilePageStoreV2(type, file, fileIOFactory, memCfg); + + default: + throw new IllegalArgumentException("Unknown version of file page store: " + ver); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/8ad99680/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsRecoveryAfterFileCorruptionTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsRecoveryAfterFileCorruptionTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsRecoveryAfterFileCorruptionTest.java index c248c35..11d5eef 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsRecoveryAfterFileCorruptionTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsRecoveryAfterFileCorruptionTest.java @@ -194,7 +194,7 @@ public class IgnitePdsRecoveryAfterFileCorruptionTest extends GridCommonAbstract long size = fileIO.size(); - fileIO.write(ByteBuffer.allocate((int)size - FilePageStore.HEADER_SIZE), FilePageStore.HEADER_SIZE); + fileIO.write(ByteBuffer.allocate((int)size - filePageStore.headerSize()), filePageStore.headerSize()); fileIO.force(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/8ad99680/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePersistenceSequentialCheckpointTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePersistenceSequentialCheckpointTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePersistenceSequentialCheckpointTest.java new file mode 100644 index 0000000..9295000 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePersistenceSequentialCheckpointTest.java @@ -0,0 +1,44 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ +package org.apache.ignite.internal.processors.cache.persistence; + +import org.apache.ignite.configuration.CheckpointWriteOrder; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.configuration.PersistentStoreConfiguration; +import org.apache.ignite.configuration.WALMode; + +/** + * + */ +public class IgnitePersistenceSequentialCheckpointTest extends IgnitePersistentStoreCacheGroupsTest { + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + cfg.setPersistentStoreConfiguration(new PersistentStoreConfiguration() + .setWalMode(WALMode.LOG_ONLY) + .setCheckpointingThreads(4) + .setCheckpointWriteOrder(CheckpointWriteOrder.SEQUENTIAL)); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected int entriesCount() { + return 1000; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/8ad99680/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePersistentStoreCacheGroupsTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePersistentStoreCacheGroupsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePersistentStoreCacheGroupsTest.java index a945c73..b39b8cb 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePersistentStoreCacheGroupsTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePersistentStoreCacheGroupsTest.java @@ -87,7 +87,7 @@ public class IgnitePersistentStoreCacheGroupsTest extends GridCommonAbstractTest MemoryConfiguration memCfg = new MemoryConfiguration(); memCfg.setPageSize(1024); - memCfg.setDefaultMemoryPolicySize(10 * 1024 * 1024); + memCfg.setDefaultMemoryPolicySize(100 * 1024 * 1024); cfg.setMemoryConfiguration(memCfg); @@ -115,6 +115,11 @@ public class IgnitePersistentStoreCacheGroupsTest extends GridCommonAbstractTest super.afterTest(); } + /** Entries count. */ + protected int entriesCount() { + return 10; + } + /** * @throws Exception If failed. */ @@ -236,7 +241,7 @@ public class IgnitePersistentStoreCacheGroupsTest extends GridCommonAbstractTest for (String cacheName : caches) { IgniteCache<Object, Object> cache = node.cache(cacheName).withExpiryPolicy(plc); - for (int i = 0; i < 10; i++) + for (int i = 0; i < entriesCount(); i++) cache.put(i, cacheName + i); } @@ -253,10 +258,10 @@ public class IgnitePersistentStoreCacheGroupsTest extends GridCommonAbstractTest for (String cacheName : caches) { IgniteCache<Object, Object> cache = node.cache(cacheName); - for (int i = 0; i < 10; i++) + for (int i = 0; i < entriesCount(); i++) assertEquals(cacheName + i, cache.get(i)); - assertEquals(10, cache.size()); + assertEquals(entriesCount(), cache.size()); } // Wait for expiration. @@ -340,7 +345,7 @@ public class IgnitePersistentStoreCacheGroupsTest extends GridCommonAbstractTest for (String cacheName : caches) { IgniteCache<Object, Object> cache = node.cache(cacheName); - for (int i = 0; i < 10; i++) + for (int i = 0; i < entriesCount(); i++) cache.put(i, new Person("" + i, cacheName)); } } @@ -353,10 +358,10 @@ public class IgnitePersistentStoreCacheGroupsTest extends GridCommonAbstractTest for (String cacheName : caches) { IgniteCache<Object, Object> cache = node.cache(cacheName); - for (int i = 0; i < 10; i++) + for (int i = 0; i < entriesCount(); i++) assertEquals(new Person("" + i, cacheName), cache.get(i)); - assertEquals(10, cache.size()); + assertEquals(entriesCount(), cache.size()); } } @@ -373,10 +378,10 @@ public class IgnitePersistentStoreCacheGroupsTest extends GridCommonAbstractTest List<Cache.Entry<Integer, Person>> persons = cache.query(qry.setArgs(cacheName)).getAll(); - for (int i = 0; i < 10; i++) + for (int i = 0; i < entriesCount(); i++) assertEquals(new Person("" + i, cacheName), persons.get(i).getValue()); - assertEquals(10, persons.size()); + assertEquals(entriesCount(), persons.size()); } } @@ -413,13 +418,13 @@ public class IgnitePersistentStoreCacheGroupsTest extends GridCommonAbstractTest for (String cacheName : caches) { IgniteCache<Object, Object> cache = node.cache(cacheName); - for (int i = 0; i < 10; i++) { + for (int i = 0; i < entriesCount(); i++) { cache.put(i, cacheName + i); assertEquals(cacheName + i, cache.get(i)); } - assertEquals(10, cache.size()); + assertEquals(entriesCount(), cache.size()); } stopAllGrids(); @@ -433,10 +438,10 @@ public class IgnitePersistentStoreCacheGroupsTest extends GridCommonAbstractTest for (String cacheName : caches) { IgniteCache<Object, Object> cache = node.cache(cacheName); - for (int i = 0; i < 10; i++) + for (int i = 0; i < entriesCount(); i++) assertEquals(cacheName + i, cache.get(i)); - assertEquals(10, cache.size()); + assertEquals(entriesCount(), cache.size()); } }
