http://git-wip-us.apache.org/repos/asf/ignite/blob/c076aee4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java index fb4ec1e..ce01431 100755 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java @@ -70,8 +70,6 @@ import org.apache.ignite.configuration.DataRegionConfiguration; import org.apache.ignite.configuration.DataStorageConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.configuration.NearCacheConfiguration; -import org.apache.ignite.events.DiscoveryEvent; -import org.apache.ignite.events.EventType; import org.apache.ignite.failure.FailureContext; import org.apache.ignite.failure.FailureType; import org.apache.ignite.internal.GridKernalContext; @@ -82,9 +80,8 @@ import org.apache.ignite.internal.NodeStoppingException; import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager; import org.apache.ignite.internal.mem.DirectMemoryProvider; import org.apache.ignite.internal.mem.DirectMemoryRegion; -import org.apache.ignite.internal.mem.file.MappedFileMemoryProvider; -import org.apache.ignite.internal.mem.unsafe.UnsafeMemoryProvider; import org.apache.ignite.internal.pagemem.FullPageId; +import org.apache.ignite.internal.pagemem.PageIdAllocator; import org.apache.ignite.internal.pagemem.PageIdUtils; import org.apache.ignite.internal.pagemem.PageMemory; import org.apache.ignite.internal.pagemem.PageUtils; @@ -96,19 +93,21 @@ import org.apache.ignite.internal.pagemem.wal.record.CacheState; import org.apache.ignite.internal.pagemem.wal.record.CheckpointRecord; import org.apache.ignite.internal.pagemem.wal.record.DataEntry; import org.apache.ignite.internal.pagemem.wal.record.DataRecord; +import org.apache.ignite.internal.pagemem.wal.record.MemoryRecoveryRecord; import org.apache.ignite.internal.pagemem.wal.record.MetastoreDataRecord; import org.apache.ignite.internal.pagemem.wal.record.PageSnapshot; import org.apache.ignite.internal.pagemem.wal.record.WALRecord; +import org.apache.ignite.internal.pagemem.wal.record.WalRecordCacheGroupAware; import org.apache.ignite.internal.pagemem.wal.record.delta.PageDeltaRecord; import org.apache.ignite.internal.pagemem.wal.record.delta.PartitionDestroyRecord; import org.apache.ignite.internal.pagemem.wal.record.delta.PartitionMetaStateRecord; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheGroupContext; import org.apache.ignite.internal.processors.cache.CacheGroupDescriptor; import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor; import org.apache.ignite.internal.processors.cache.ExchangeActions; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; -import org.apache.ignite.internal.processors.cache.StoredCacheData; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture; import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition; import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState; @@ -125,17 +124,16 @@ import org.apache.ignite.internal.processors.cache.persistence.metastorage.Metas import org.apache.ignite.internal.processors.cache.persistence.pagemem.CheckpointMetricsTracker; import org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryEx; import org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryImpl; +import org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId; import org.apache.ignite.internal.processors.cache.persistence.partstate.PartitionAllocationMap; +import org.apache.ignite.internal.processors.cache.persistence.partstate.PartitionRecoverState; import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteCacheSnapshotManager; import org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotOperation; import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO; -import org.apache.ignite.internal.processors.cache.persistence.tree.io.PagePartitionMetaIO; import org.apache.ignite.internal.processors.cache.persistence.wal.FileWALPointer; -import org.apache.ignite.internal.processors.cache.persistence.wal.crc.FastCrc; import org.apache.ignite.internal.processors.cache.persistence.wal.crc.IgniteDataIntegrityViolationException; import org.apache.ignite.internal.processors.port.GridPortRecord; import org.apache.ignite.internal.util.GridMultiCollectionWrapper; -import org.apache.ignite.internal.util.GridUnsafe; import org.apache.ignite.internal.util.IgniteUtils; import org.apache.ignite.internal.util.future.CountDownFuture; import org.apache.ignite.internal.util.future.GridCompoundFuture; @@ -167,11 +165,9 @@ import static java.nio.file.StandardOpenOption.READ; import static org.apache.ignite.IgniteSystemProperties.IGNITE_CHECKPOINT_READ_LOCK_TIMEOUT; import static org.apache.ignite.IgniteSystemProperties.IGNITE_PDS_SKIP_CRC; import static org.apache.ignite.IgniteSystemProperties.IGNITE_PDS_WAL_REBALANCE_THRESHOLD; -import static org.apache.ignite.configuration.DataStorageConfiguration.DFLT_DATA_REG_DEFAULT_NAME; import static org.apache.ignite.failure.FailureType.CRITICAL_ERROR; import static org.apache.ignite.failure.FailureType.SYSTEM_WORKER_TERMINATION; import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.CHECKPOINT_RECORD; -import static org.apache.ignite.internal.processors.cache.persistence.metastorage.MetaStorage.METASTORAGE_CACHE_ID; import static org.apache.ignite.internal.util.IgniteUtils.checkpointBufferSize; /** @@ -182,6 +178,9 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan /** */ public static final String IGNITE_PDS_CHECKPOINT_TEST_SKIP_SYNC = "IGNITE_PDS_CHECKPOINT_TEST_SKIP_SYNC"; + /** */ + public static final String IGNITE_PDS_SKIP_CHECKPOINT_ON_NODE_STOP = "IGNITE_PDS_SKIP_CHECKPOINT_ON_NODE_STOP"; + /** MemoryPolicyConfiguration name reserved for meta store. */ public static final String METASTORE_DATA_REGION_NAME = "metastoreMemPlc"; @@ -199,6 +198,9 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan private final String throttlingPolicyOverride = IgniteSystemProperties.getString( IgniteSystemProperties.IGNITE_OVERRIDE_WRITE_THROTTLING_ENABLED); + /** */ + private final boolean skipCheckpointOnNodeStop = IgniteSystemProperties.getBoolean(IGNITE_PDS_SKIP_CHECKPOINT_ON_NODE_STOP, false); + /** Checkpoint lock hold count. */ private static final ThreadLocal<Integer> CHECKPOINT_LOCK_HOLD_COUNT = new ThreadLocal<Integer>() { @Override protected Integer initialValue() { @@ -454,6 +456,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan cfg.setInitialSize(storageCfg.getSystemRegionInitialSize()); cfg.setMaxSize(storageCfg.getSystemRegionMaxSize()); cfg.setPersistenceEnabled(true); + return cfg; } @@ -496,15 +499,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan .resolveFolders() .getLockedFileLockHolder(); - fileLockHolder = preLocked == null ? - new FileLockHolder(storeMgr.workDir().getPath(), kernalCtx, log) : preLocked; - - if (log.isDebugEnabled()) - log.debug("Try to capture file lock [nodeId=" + - cctx.localNodeId() + " path=" + fileLockHolder.lockPath() + "]"); - - if (!fileLockHolder.isLocked()) - fileLockHolder.tryLock(lockWaitTime); + acquireFileLock(preLocked); cleanupTempCheckpointDirectory(); @@ -532,17 +527,20 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan /** {@inheritDoc} */ @Override public void cleanupRestoredCaches() { - if (dataRegionMap == null) + if (dataRegionMap.isEmpty()) return; for (CacheGroupDescriptor grpDesc : cctx.cache().cacheGroupDescriptors().values()) { String regionName = grpDesc.config().getDataRegionName(); - DataRegion region = dataRegionMap.get(regionName == null ? DFLT_DATA_REG_DEFAULT_NAME : regionName); + DataRegion region = regionName != null ? dataRegionMap.get(regionName) : dfltDataRegion; if (region == null) continue; + if (log.isInfoEnabled()) + log.info("Page memory " + region + " for " + grpDesc + " has invalidated."); + int partitions = grpDesc.config().getAffinity().partitions(); if (region.pageMemory() instanceof PageMemoryEx) { @@ -550,6 +548,8 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan for (int partId = 0; partId < partitions; partId++) memEx.invalidate(grpDesc.groupId(), partId); + + memEx.invalidate(grpDesc.groupId(), PageIdAllocator.INDEX_PARTITION); } } @@ -579,6 +579,39 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan } /** + * @param preLocked Pre-locked file lock holder. + */ + private void acquireFileLock(FileLockHolder preLocked) throws IgniteCheckedException { + if (cctx.kernalContext().clientNode()) + return; + + fileLockHolder = preLocked == null ? + new FileLockHolder(storeMgr.workDir().getPath(), cctx.kernalContext(), log) : preLocked; + + if (!fileLockHolder.isLocked()) { + if (log.isDebugEnabled()) + log.debug("Try to capture file lock [nodeId=" + + cctx.localNodeId() + " path=" + fileLockHolder.lockPath() + "]"); + + fileLockHolder.tryLock(lockWaitTime); + } + } + + /** + * + */ + private void releaseFileLock() { + if (cctx.kernalContext().clientNode() || fileLockHolder == null) + return; + + if (log.isDebugEnabled()) + log.debug("Release file lock [nodeId=" + + cctx.localNodeId() + " path=" + fileLockHolder.lockPath() + "]"); + + fileLockHolder.close(); + } + + /** * Retreives checkpoint history form specified {@code dir}. * * @return List of checkpoints. @@ -661,40 +694,18 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan /** */ private void readMetastore() throws IgniteCheckedException { try { - DataStorageConfiguration memCfg = cctx.kernalContext().config().getDataStorageConfiguration(); - - DataRegionConfiguration plcCfg = createMetastoreDataRegionConfig(memCfg); - - File allocPath = buildAllocPath(plcCfg); - - DirectMemoryProvider memProvider = allocPath == null ? - new UnsafeMemoryProvider(log) : - new MappedFileMemoryProvider( - log, - allocPath); - - DataRegionMetricsImpl memMetrics = new DataRegionMetricsImpl(plcCfg); - - PageMemoryEx storePageMem = (PageMemoryEx)createPageMemory(memProvider, memCfg, plcCfg, memMetrics, false); - - DataRegion regCfg = new DataRegion(storePageMem, plcCfg, memMetrics, createPageEvictionTracker(plcCfg, storePageMem)); - CheckpointStatus status = readCheckpointStatus(); - cctx.pageStore().initializeForMetastorage(); - - storePageMem.start(); - checkpointReadLock(); try { - restoreMemory(status, true, storePageMem, Collections.emptySet()); + dataRegion(METASTORE_DATA_REGION_NAME).pageMemory().start(); - metaStorage = new MetaStorage(cctx, regCfg, memMetrics, true); + performBinaryMemoryRestore(status, g -> MetaStorage.METASTORAGE_CACHE_ID == g, false); - metaStorage.init(this); + metaStorage = createMetastorage(true); - applyLastUpdates(status, true); + applyLogicalUpdates(status, g -> MetaStorage.METASTORAGE_CACHE_ID == g, false); fillWalDisabledGroups(); @@ -703,7 +714,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan finally { metaStorage = null; - storePageMem.stop(true); + dataRegion(METASTORE_DATA_REGION_NAME).pageMemory().stop(false); cctx.pageStore().cleanupPageStoreIfMatch(new Predicate<Integer>() { @Override public boolean test(Integer grpId) { @@ -729,21 +740,16 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan snapshotMgr = cctx.snapshot(); - if (!cctx.kernalContext().clientNode() && persistenceCfg.getCheckpointThreads() > 1) { - asyncRunner = new IgniteThreadPoolExecutor( - CHECKPOINT_RUNNER_THREAD_PREFIX, - cctx.igniteInstanceName(), - persistenceCfg.getCheckpointThreads(), - persistenceCfg.getCheckpointThreads(), - 30_000, - new LinkedBlockingQueue<>() - ); - } - - if (checkpointer == null) + if (!cctx.kernalContext().clientNode() && checkpointer == null) checkpointer = new Checkpointer(cctx.igniteInstanceName(), "db-checkpoint-thread", log); super.onActivate(ctx); + + if (!cctx.kernalContext().clientNode()) { + initializeCheckpointPool(); + + finishRecovery(); + } } /** {@inheritDoc} */ @@ -760,6 +766,21 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan stopping = false; } + /** + * + */ + private void initializeCheckpointPool() { + if (persistenceCfg.getCheckpointThreads() > 1) + asyncRunner = new IgniteThreadPoolExecutor( + CHECKPOINT_RUNNER_THREAD_PREFIX, + cctx.igniteInstanceName(), + persistenceCfg.getCheckpointThreads(), + persistenceCfg.getCheckpointThreads(), + 30_000, + new LinkedBlockingQueue<Runnable>() + ); + } + /** {@inheritDoc} */ @Override protected void registerMetricsMBeans(IgniteConfiguration cfg) { super.registerMetricsMBeans(cfg); @@ -798,8 +819,13 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan }; } - /** {@inheritDoc} */ - @Override public void onDoneRestoreBinaryMemory() throws IgniteCheckedException { + /** + * Restores last valid WAL pointer and resumes logging from that pointer. + * Re-creates metastorage if needed. + * + * @throws IgniteCheckedException If failed. + */ + private void finishRecovery() throws IgniteCheckedException { assert !cctx.kernalContext().clientNode(); long time = System.currentTimeMillis(); @@ -810,37 +836,24 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan for (DatabaseLifecycleListener lsnr : getDatabaseListeners(cctx.kernalContext())) lsnr.beforeResumeWalLogging(this); - cctx.pageStore().initializeForMetastorage(); - - CheckpointStatus status = readCheckpointStatus(); - - // Binary memory should be recovered at startup. - assert !status.needRestoreMemory() : status; - - WALPointer statusEndPtr = CheckpointStatus.NULL_PTR.equals(status.endPtr) ? null : status.endPtr; + // Try to resume logging since last finished checkpoint if possible. + if (walTail == null) { + CheckpointStatus status = readCheckpointStatus(); - // If binary memory recovery occurs resume from the last walTail in the other case from END checkpoint. - WALPointer walPtr = walTail == null ? statusEndPtr : walTail; + walTail = CheckpointStatus.NULL_PTR.equals(status.endPtr) ? null : status.endPtr; + } - cctx.wal().resumeLogging(walPtr); + cctx.wal().resumeLogging(walTail); walTail = null; - metaStorage = new MetaStorage( - cctx, - dataRegionMap.get(METASTORE_DATA_REGION_NAME), - (DataRegionMetricsImpl)memMetricsMap.get(METASTORE_DATA_REGION_NAME), - false - ); - - // Init metastore only after WAL logging resumed. Can't do it earlier because - // MetaStorage first initialization also touches WAL, look at #isWalDeltaRecordNeeded. - metaStorage.init(this); + // Recreate metastorage to refresh page memory state after deactivation. + if (metaStorage == null) + metaStorage = createMetastorage(false); notifyMetastorageReadyForReadWrite(); - for (DatabaseLifecycleListener lsnr : getDatabaseListeners(cctx.kernalContext())) - lsnr.afterMemoryRestore(this); + U.log(log, "Finish recovery performed in " + (System.currentTimeMillis() - time) + " ms."); } catch (IgniteCheckedException e) { if (X.hasCause(e, StorageException.class, IOException.class)) @@ -850,17 +863,35 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan } finally { checkpointReadUnlock(); - - U.log(log, "Resume logging performed in " + (System.currentTimeMillis() - time) + " ms."); } } /** - * @param cacheGrps Cache groups to restore. + * @param readOnly Metastorage read-only mode. + * @return Instance of Metastorage. + * @throws IgniteCheckedException If failed to create metastorage. + */ + private MetaStorage createMetastorage(boolean readOnly) throws IgniteCheckedException { + cctx.pageStore().initializeForMetastorage(); + + MetaStorage storage = new MetaStorage( + cctx, + dataRegion(METASTORE_DATA_REGION_NAME), + (DataRegionMetricsImpl) memMetricsMap.get(METASTORE_DATA_REGION_NAME), + readOnly + ); + + storage.init(this); + + return storage; + } + + /** + * @param cacheGroupsPredicate Cache groups to restore. * @return Last seen WAL pointer during binary memory recovery. * @throws IgniteCheckedException If failed. */ - protected WALPointer restoreBinaryMemory(Set<Integer> cacheGrps) throws IgniteCheckedException { + private WALPointer restoreBinaryMemory(Predicate<Integer> cacheGroupsPredicate) throws IgniteCheckedException { assert !cctx.kernalContext().clientNode(); long time = System.currentTimeMillis(); @@ -868,28 +899,23 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan checkpointReadLock(); try { - for (DatabaseLifecycleListener lsnr : getDatabaseListeners(cctx.kernalContext())) - lsnr.beforeBinaryMemoryRestore(this); - - cctx.pageStore().initializeForMetastorage(); - CheckpointStatus status = readCheckpointStatus(); // First, bring memory to the last consistent checkpoint state if needed. // This method should return a pointer to the last valid record in the WAL. - WALPointer tailWalPtr = restoreMemory(status, - false, - (PageMemoryEx)dataRegionMap.get(METASTORE_DATA_REGION_NAME).pageMemory(), - cacheGrps); + WALPointer restored = performBinaryMemoryRestore(status, cacheGroupsPredicate, true); - if (tailWalPtr == null && !status.endPtr.equals(CheckpointStatus.NULL_PTR)) { + if (restored == null && !status.endPtr.equals(CheckpointStatus.NULL_PTR)) { throw new StorageException("The memory cannot be restored. The critical part of WAL archive is missing " + - "[tailWalPtr=" + tailWalPtr + ", endPtr=" + status.endPtr + ']'); + "[tailWalPtr=" + restored + ", endPtr=" + status.endPtr + ']'); } - nodeStart(tailWalPtr); + nodeStart(restored); + + if (log.isInfoEnabled()) + log.info("Binary recovery performed in " + (System.currentTimeMillis() - time) + " ms."); - return tailWalPtr; + return restored; } catch (IgniteCheckedException e) { if (X.hasCause(e, StorageException.class, IOException.class)) @@ -899,9 +925,6 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan } finally { checkpointReadUnlock(); - - if (log.isInfoEnabled()) - log.info("Binary recovery performed in " + (System.currentTimeMillis() - time) + " ms."); } } @@ -1015,21 +1038,15 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan MBEAN_GROUP, MBEAN_NAME ); + + metaStorage = null; } /** {@inheritDoc} */ @Override protected void stop0(boolean cancel) { super.stop0(cancel); - if (!cctx.kernalContext().clientNode()) { - if (fileLockHolder != null) { - if (log.isDebugEnabled()) - log.debug("Release file lock [nodeId=" + - cctx.localNodeId() + " path=" + fileLockHolder.lockPath() + "]"); - - fileLockHolder.close(); - } - } + releaseFileLock(); } /** */ @@ -1322,34 +1339,30 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan } /** {@inheritDoc} */ - @Override public boolean beforeExchange(GridDhtPartitionsExchangeFuture fut) throws IgniteCheckedException { - DiscoveryEvent discoEvt = fut.firstEvent(); + @Override public void beforeExchange(GridDhtPartitionsExchangeFuture fut) throws IgniteCheckedException { + // Try to restore partition states. + if (fut.localJoinExchange() || fut.activateCluster() + || (fut.exchangeActions() != null && !F.isEmpty(fut.exchangeActions().cacheGroupsToStart()))) { + U.doInParallel( + cctx.kernalContext().getSystemExecutorService(), + cctx.cache().cacheGroups(), + cacheGroup -> { + if (cacheGroup.isLocal()) + return; + + cctx.database().checkpointReadLock(); - boolean joinEvt = discoEvt.type() == EventType.EVT_NODE_JOINED; - - boolean locNode = discoEvt.eventNode().isLocal(); - - boolean isSrvNode = !cctx.kernalContext().clientNode(); - - boolean clusterInTransitionStateToActive = fut.activateCluster(); - - boolean restored = false; - - long time = System.currentTimeMillis(); - - // In case of cluster activation or local join restore, restore whole manager state. - if (clusterInTransitionStateToActive || (joinEvt && locNode && isSrvNode)) { - restoreState(); - - restored = true; - } - // In case of starting groups, restore partition states only for these groups. - else if (fut.exchangeActions() != null && !F.isEmpty(fut.exchangeActions().cacheGroupsToStart())) { - Set<Integer> restoreGroups = fut.exchangeActions().cacheGroupsToStart().stream() - .map(actionData -> actionData.descriptor().groupId()) - .collect(Collectors.toSet()); + try { + cacheGroup.restorePartitionStates(Collections.emptyMap()); - restorePartitionStates(Collections.emptyMap(), restoreGroups); + if (cacheGroup.localStartVersion().equals(fut.initialVersion())) + cacheGroup.topology().afterStateRestored(fut.initialVersion()); + } + finally { + cctx.database().checkpointReadUnlock(); + } + } + ); } if (cctx.kernalContext().query().moduleEnabled()) { @@ -1366,11 +1379,6 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan } } } - - if (log.isInfoEnabled()) - log.info("Logical recovery performed in " + (System.currentTimeMillis() - time) + " ms."); - - return restored; } /** @@ -1661,52 +1669,6 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan CHECKPOINT_LOCK_HOLD_COUNT.set(CHECKPOINT_LOCK_HOLD_COUNT.get() - 1); } - /** - * Restores from last checkpoint and applies WAL changes since this checkpoint. - * - * @throws IgniteCheckedException If failed to restore database status from WAL. - */ - private void restoreState() throws IgniteCheckedException { - try { - CheckpointStatus status = readCheckpointStatus(); - - checkpointReadLock(); - - try { - applyLastUpdates(status, false); - } - finally { - checkpointReadUnlock(); - } - - snapshotMgr.restoreState(); - } - catch (StorageException e) { - throw new IgniteCheckedException(e); - } - } - - /** - * Called when all partitions have been fully restored and pre-created on node start. - * - * Starts checkpointing process and initiates first checkpoint. - * - * @throws IgniteCheckedException If first checkpoint has failed. - */ - @Override public void onStateRestored() throws IgniteCheckedException { - long time = System.currentTimeMillis(); - - new IgniteThread(cctx.igniteInstanceName(), "db-checkpoint-thread", checkpointer).start(); - - CheckpointProgressSnapshot chp = checkpointer.wakeupForCheckpoint(0, "node started"); - - if (chp != null) - chp.cpBeginFut.get(); - - if (log.isInfoEnabled()) - log.info("Checkpointer initilialzation performed in " + (System.currentTimeMillis() - time) + " ms."); - } - /** {@inheritDoc} */ @Override public synchronized Map<Integer, Map<Integer, Long>> reserveHistoryForExchange() { assert reservedForExchange == null : reservedForExchange; @@ -2005,39 +1967,146 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan if (kctx.clientNode()) return; - // Preform early regions startup before restoring state. - initAndStartRegions(kctx.config().getDataStorageConfiguration()); + checkpointReadLock(); + + try { + // Preform early regions startup before restoring state. + initAndStartRegions(kctx.config().getDataStorageConfiguration()); + + for (DatabaseLifecycleListener lsnr : getDatabaseListeners(kctx)) + lsnr.beforeBinaryMemoryRestore(this); + + log.info("Starting binary memory restore for: " + cctx.cache().cacheGroupDescriptors().keySet()); + + cctx.pageStore().initializeForMetastorage(); + + // Restore binary memory for all not WAL disabled cache groups. + WALPointer restored = restoreBinaryMemory( + g -> !initiallyGlobalWalDisabledGrps.contains(g) && !initiallyLocalWalDisabledGrps.contains(g) + ); + + if (restored != null) + U.log(log, "Binary memory state restored at node startup [restoredPtr=" + restored + ']'); + + for (DatabaseLifecycleListener lsnr : getDatabaseListeners(kctx)) + lsnr.afterBinaryMemoryRestore(this); + + cctx.wal().resumeLogging(restored); + + // We should log this record to ensure that node start marker pointer will be found in compacted segment. + cctx.wal().log(new MemoryRecoveryRecord(System.currentTimeMillis())); - // Only presistence caches to start. - for (DynamicCacheDescriptor desc : cctx.cache().cacheDescriptors().values()) { - if (CU.isPersistentCache(desc.cacheConfiguration(), cctx.gridConfig().getDataStorageConfiguration())) - storeMgr.initializeForCache(desc.groupDescriptor(), new StoredCacheData(desc.cacheConfiguration())); + assert metaStorage == null; + + metaStorage = createMetastorage(false); + + CheckpointStatus status = readCheckpointStatus(); + + RestoreLogicalState logicalState = applyLogicalUpdates( + status, + g -> !initiallyGlobalWalDisabledGrps.contains(g) && !initiallyLocalWalDisabledGrps.contains(g), + true + ); + + // Restore state for all groups. + restorePartitionStates(cctx.cache().cacheGroups(), logicalState.partitionRecoveryStates); + + walTail = tailPointer(logicalState.lastRead); + + cctx.wal().onDeActivate(kctx); } + catch (IgniteCheckedException e) { + releaseFileLock(); - final WALPointer restoredPtr = restoreBinaryMemory(cctx.cache().cacheGroupDescriptors().keySet()); + throw e; + } + finally { + checkpointReadUnlock(); + } + } - walTail = restoredPtr; + /** + * Calculates tail pointer for WAL at the end of logical recovery. + * + * @param from Start replay WAL from. + * @return Tail pointer. + * @throws IgniteCheckedException If failed. + */ + private WALPointer tailPointer(WALPointer from) throws IgniteCheckedException { + WALPointer lastRead = from; + + try (WALIterator it = cctx.wal().replay(from)) { + while (it.hasNextX()) { + IgniteBiTuple<WALPointer, WALRecord> rec = it.nextX(); - if (restoredPtr != null) - U.log(log, "Binary memory state restored at node startup [restoredPtr=" + restoredPtr + ']'); + if (rec == null) + break; + + lastRead = rec.get1(); + } + } + + return lastRead != null ? lastRead.next() : null; + } + + /** + * @param forGroups Cache groups. + * @param partitionStates Partition states. + * @throws IgniteCheckedException If failed. + */ + private void restorePartitionStates( + Collection<CacheGroupContext> forGroups, + Map<GroupPartitionId, PartitionRecoverState> partitionStates + ) throws IgniteCheckedException { + long startRestorePart = U.currentTimeMillis(); + + if (log.isInfoEnabled()) + log.info("Restoring partition state for local groups."); + + long totalProcessed = 0; + + for (CacheGroupContext grp : forGroups) + totalProcessed += grp.restorePartitionStates(partitionStates); + + if (log.isInfoEnabled()) + log.info("Finished restoring partition state for local groups [" + + "groupsProcessed" + forGroups.size() + + "partitionsProcessed=" + totalProcessed + + ", time=" + (U.currentTimeMillis() - startRestorePart) + "ms]"); + } + + /** + * Called when all partitions have been fully restored and pre-created on node start. + * + * Starts checkpointing process and initiates first checkpoint. + * + * @throws IgniteCheckedException If first checkpoint has failed. + */ + @Override public void onStateRestored(AffinityTopologyVersion topVer) throws IgniteCheckedException { + long time = System.currentTimeMillis(); + + new IgniteThread(cctx.igniteInstanceName(), "db-checkpoint-thread", checkpointer).start(); + + CheckpointProgressSnapshot chp = checkpointer.wakeupForCheckpoint(0, "node started"); + + if (chp != null) + chp.cpBeginFut.get(); + + if (log.isInfoEnabled()) + log.info("Checkpointer initilialzation performed in " + (System.currentTimeMillis() - time) + " ms."); } /** * @param status Checkpoint status. - * @param metastoreOnly If {@code True} restores Metastorage only. - * @param storePageMem Metastore page memory. - * @param cacheGrps Cache groups to restore. + * @param cacheGroupsPredicate Cache groups to restore. * @throws IgniteCheckedException If failed. * @throws StorageException In case I/O error occurred during operations with storage. */ - @Nullable private WALPointer restoreMemory( + private WALPointer performBinaryMemoryRestore( CheckpointStatus status, - boolean metastoreOnly, - PageMemoryEx storePageMem, - Set<Integer> cacheGrps + Predicate<Integer> cacheGroupsPredicate, + boolean finalizeState ) throws IgniteCheckedException { - assert !metastoreOnly || storePageMem != null; - if (log.isInfoEnabled()) log.info("Checking memory state [lastValidPos=" + status.endPtr + ", lastMarked=" + status.startPtr + ", lastCheckpointId=" + status.cpStartId + ']'); @@ -2045,8 +2114,9 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan boolean apply = status.needRestoreMemory(); if (apply) { - U.quietAndWarn(log, "Ignite node stopped in the middle of checkpoint. Will restore memory state and " + - "finish checkpoint on node start."); + if (finalizeState) + U.quietAndWarn(log, "Ignite node stopped in the middle of checkpoint. Will restore memory state and " + + "finish checkpoint on node start."); cctx.pageStore().beginRecover(); } @@ -2057,16 +2127,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan long lastArchivedSegment = cctx.wal().lastArchivedSegment(); - RestoreBinaryState restoreBinaryState = new RestoreBinaryState(status, lastArchivedSegment, log); - - // Always perform recovery at least meta storage cache. - Set<Integer> restoreGrps = new HashSet<>(Collections.singletonList(METASTORAGE_CACHE_ID)); - - if (!metastoreOnly && !F.isEmpty(cacheGrps)) { - restoreGrps.addAll(cacheGrps.stream() - .filter(g -> !initiallyGlobalWalDisabledGrps.contains(g) && !initiallyLocalWalDisabledGrps.contains(g)) - .collect(Collectors.toSet())); - } + RestoreBinaryState restoreBinaryState = new RestoreBinaryState(status, lastArchivedSegment, cacheGroupsPredicate); int applied = 0; @@ -2086,29 +2147,30 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan // several repetitive restarts and the same pages may have changed several times. int grpId = pageRec.fullPageId().groupId(); - if (restoreGrps.contains(grpId)) { - long pageId = pageRec.fullPageId().pageId(); + long pageId = pageRec.fullPageId().pageId(); - PageMemoryEx pageMem = grpId == METASTORAGE_CACHE_ID ? storePageMem : getPageMemoryForCacheGroup(grpId); + PageMemoryEx pageMem = getPageMemoryForCacheGroup(grpId); - long page = pageMem.acquirePage(grpId, pageId, true); + if (pageMem == null) + break; - try { - long pageAddr = pageMem.writeLock(grpId, pageId, page); + long page = pageMem.acquirePage(grpId, pageId, true); - try { - PageUtils.putBytes(pageAddr, 0, pageRec.pageData()); - } - finally { - pageMem.writeUnlock(grpId, pageId, page, null, true, true); - } + try { + long pageAddr = pageMem.writeLock(grpId, pageId, page); + + try { + PageUtils.putBytes(pageAddr, 0, pageRec.pageData()); } finally { - pageMem.releasePage(grpId, pageId, page); + pageMem.writeUnlock(grpId, pageId, page, null, true, true); } - - applied++; } + finally { + pageMem.releasePage(grpId, pageId, page); + } + + applied++; } break; @@ -2119,9 +2181,6 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan { int grpId = metaStateRecord.groupId(); - if (!restoreGrps.contains(grpId)) - continue; - int partId = metaStateRecord.partitionId(); GridDhtPartitionState state = GridDhtPartitionState.fromOrdinal(metaStateRecord.state()); @@ -2140,10 +2199,10 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan { int grpId = destroyRecord.groupId(); - if (!restoreGrps.contains(grpId)) - continue; + PageMemoryEx pageMem = getPageMemoryForCacheGroup(grpId); - PageMemoryEx pageMem = grpId == METASTORAGE_CACHE_ID ? storePageMem : getPageMemoryForCacheGroup(grpId); + if (pageMem == null) + break; pageMem.invalidate(grpId, destroyRecord.partitionId()); @@ -2158,37 +2217,38 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan int grpId = r.groupId(); - if (restoreGrps.contains(grpId)) { - long pageId = r.pageId(); + long pageId = r.pageId(); - PageMemoryEx pageMem = grpId == METASTORAGE_CACHE_ID ? storePageMem : getPageMemoryForCacheGroup(grpId); + PageMemoryEx pageMem = getPageMemoryForCacheGroup(grpId); - // Here we do not require tag check because we may be applying memory changes after - // several repetitive restarts and the same pages may have changed several times. - long page = pageMem.acquirePage(grpId, pageId, true); + if (pageMem == null) + break; - try { - long pageAddr = pageMem.writeLock(grpId, pageId, page); + // Here we do not require tag check because we may be applying memory changes after + // several repetitive restarts and the same pages may have changed several times. + long page = pageMem.acquirePage(grpId, pageId, true); - try { - r.applyDelta(pageMem, pageAddr); - } - finally { - pageMem.writeUnlock(grpId, pageId, page, null, true, true); - } + try { + long pageAddr = pageMem.writeLock(grpId, pageId, page); + + try { + r.applyDelta(pageMem, pageAddr); } finally { - pageMem.releasePage(grpId, pageId, page); + pageMem.writeUnlock(grpId, pageId, page, null, true, true); } - - applied++; } + finally { + pageMem.releasePage(grpId, pageId, page); + } + + applied++; } } } } - if (metastoreOnly) + if (!finalizeState) return null; WALPointer lastReadPtr = restoreBinaryState.lastReadRecordPointer(); @@ -2208,7 +2268,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan cpHistory.initialize(retreiveHistory()); - return lastReadPtr == null ? null : lastReadPtr.next(); + return lastReadPtr != null ? lastReadPtr.next() : null; } /** @@ -2219,6 +2279,9 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan * @throws IgniteCheckedException if no DataRegion is configured for a name obtained from cache descriptor. */ private PageMemoryEx getPageMemoryForCacheGroup(int grpId) throws IgniteCheckedException { + if (grpId == MetaStorage.METASTORAGE_CACHE_ID) + return (PageMemoryEx)dataRegion(METASTORE_DATA_REGION_NAME).pageMemory(); + // TODO IGNITE-7792 add generic mapping. if (grpId == TxLog.TX_LOG_CACHE_ID) return (PageMemoryEx)dataRegion(TxLog.TX_LOG_CACHE_NAME).pageMemory(); @@ -2229,7 +2292,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan CacheGroupDescriptor desc = sharedCtx.cache().cacheGroupDescriptors().get(grpId); if (desc == null) - throw new IgniteCheckedException("Failed to find cache group descriptor [grpId=" + grpId + ']'); + return null; String memPlcName = desc.config().getDataRegionName(); @@ -2242,13 +2305,13 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan * @param it WalIterator. * @param recPredicate Wal record filter. * @param entryPredicate Entry filter. - * @param partStates Partition to restore state. + * @param partitionRecoveryStates Partition to restore state. */ public void applyUpdatesOnRecovery( @Nullable WALIterator it, IgnitePredicate<IgniteBiTuple<WALPointer, WALRecord>> recPredicate, IgnitePredicate<DataEntry> entryPredicate, - Map<T2<Integer, Integer>, T2<Integer, Long>> partStates + Map<GroupPartitionId, PartitionRecoverState> partitionRecoveryStates ) throws IgniteCheckedException { cctx.walState().runWithOutWAL(() -> { if (it != null) { @@ -2306,7 +2369,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan checkpointReadLock(); try { - restorePartitionStates(partStates, null); + restorePartitionStates(cctx.cache().cacheGroups(), partitionRecoveryStates); } catch (IgniteCheckedException e) { throw new IgniteException(e); @@ -2319,31 +2382,30 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan /** * @param status Last registered checkpoint status. - * @param metastoreOnly If {@code True} only records related to metastorage will be processed. * @throws IgniteCheckedException If failed to apply updates. * @throws StorageException If IO exception occurred while reading write-ahead log. */ - private void applyLastUpdates(CheckpointStatus status, boolean metastoreOnly) throws IgniteCheckedException { + private RestoreLogicalState applyLogicalUpdates( + CheckpointStatus status, + Predicate<Integer> cacheGroupsPredicate, + boolean skipFieldLookup + ) throws IgniteCheckedException { if (log.isInfoEnabled()) log.info("Applying lost cache updates since last checkpoint record [lastMarked=" + status.startPtr + ", lastCheckpointId=" + status.cpStartId + ']'); - if (!metastoreOnly) + if (skipFieldLookup) cctx.kernalContext().query().skipFieldLookup(true); long lastArchivedSegment = cctx.wal().lastArchivedSegment(); - RestoreLogicalState restoreLogicalState = new RestoreLogicalState(lastArchivedSegment, log); + RestoreLogicalState restoreLogicalState = new RestoreLogicalState(lastArchivedSegment, cacheGroupsPredicate); long start = U.currentTimeMillis(); - int applied = 0; - Collection<Integer> ignoreGrps = metastoreOnly ? Collections.emptySet() : - F.concat(false, initiallyGlobalWalDisabledGrps, initiallyLocalWalDisabledGrps); + int applied = 0; try (WALIterator it = cctx.wal().replay(status.startPtr)) { - Map<T2<Integer, Integer>, T2<Integer, Long>> partStates = new HashMap<>(); - while (it.hasNextX()) { WALRecord rec = restoreLogicalState.next(it); @@ -2352,9 +2414,6 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan switch (rec.type()) { case DATA_RECORD: - if (metastoreOnly) - continue; - DataRecord dataRec = (DataRecord)rec; for (DataEntry dataEntry : dataRec.writeEntries()) { @@ -2366,27 +2425,27 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan if (cacheDesc == null) continue; - if (!ignoreGrps.contains(cacheDesc.groupId())) { - GridCacheContext cacheCtx = cctx.cacheContext(cacheId); + GridCacheContext cacheCtx = cctx.cacheContext(cacheId); - applyUpdate(cacheCtx, dataEntry); + applyUpdate(cacheCtx, dataEntry); - applied++; - } + applied++; } break; case PART_META_UPDATE_STATE: - if (metastoreOnly) - continue; - PartitionMetaStateRecord metaStateRecord = (PartitionMetaStateRecord)rec; - if (!ignoreGrps.contains(metaStateRecord.groupId())) { - partStates.put(new T2<>(metaStateRecord.groupId(), metaStateRecord.partitionId()), - new T2<>((int)metaStateRecord.state(), metaStateRecord.updateCounter())); - } + GroupPartitionId groupPartitionId = new GroupPartitionId( + metaStateRecord.groupId(), metaStateRecord.partitionId() + ); + + PartitionRecoverState state = new PartitionRecoverState( + (int)metaStateRecord.state(), metaStateRecord.updateCounter() + ); + + restoreLogicalState.partitionRecoveryStates.put(groupPartitionId, state); break; @@ -2400,13 +2459,13 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan case META_PAGE_UPDATE_NEXT_SNAPSHOT_ID: case META_PAGE_UPDATE_LAST_SUCCESSFUL_SNAPSHOT_ID: case META_PAGE_UPDATE_LAST_SUCCESSFUL_FULL_SNAPSHOT_ID: - if (metastoreOnly) - continue; - PageDeltaRecord rec0 = (PageDeltaRecord) rec; PageMemoryEx pageMem = getPageMemoryForCacheGroup(rec0.groupId()); + if (pageMem == null) + break; + long page = pageMem.acquirePage(rec0.groupId(), rec0.pageId(), true); try { @@ -2429,166 +2488,17 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan // Skip other records. } } - - if (!metastoreOnly) { - long startRestorePart = U.currentTimeMillis(); - - if (log.isInfoEnabled()) - log.info("Restoring partition state for local groups [cntPartStateWal=" - + partStates.size() + ", lastCheckpointId=" + status.cpStartId + ']'); - - long proc = restorePartitionStates(partStates, null); - - if (log.isInfoEnabled()) - log.info("Finished restoring partition state for local groups [cntProcessed=" + proc + - ", cntPartStateWal=" + partStates.size() + - ", time=" + (U.currentTimeMillis() - startRestorePart) + "ms]"); - } } finally { - if (!metastoreOnly) + if (skipFieldLookup) cctx.kernalContext().query().skipFieldLookup(false); } if (log.isInfoEnabled()) log.info("Finished applying WAL changes [updatesApplied=" + applied + ", time=" + (U.currentTimeMillis() - start) + " ms]"); - } - /** - * Initializes not empty partitions and restores their state from page memory or WAL. - * Partition states presented in page memory may be overriden by states restored from WAL {@code partStates}. - * - * @param partStates Partition states restored from WAL. - * @param onlyForGroups If not {@code null} restore states only for specified cache groups. - * @return cntParts Count of partitions processed. - * @throws IgniteCheckedException If failed to restore partition states. - */ - private long restorePartitionStates( - Map<T2<Integer, Integer>, T2<Integer, Long>> partStates, - @Nullable Set<Integer> onlyForGroups - ) throws IgniteCheckedException { - long cntParts = 0; - - for (CacheGroupContext grp : cctx.cache().cacheGroups()) { - if (grp.isLocal() || !grp.affinityNode()) { - // Local cache has no partitions and its states. - continue; - } - - if (!grp.dataRegion().config().isPersistenceEnabled()) - continue; - - if (onlyForGroups != null && !onlyForGroups.contains(grp.groupId())) - continue; - - int grpId = grp.groupId(); - - PageMemoryEx pageMem = (PageMemoryEx)grp.dataRegion().pageMemory(); - - for (int i = 0; i < grp.affinity().partitions(); i++) { - T2<Integer, Long> restore = partStates.get(new T2<>(grpId, i)); - - if (storeMgr.exists(grpId, i)) { - storeMgr.ensure(grpId, i); - - if (storeMgr.pages(grpId, i) <= 1) - continue; - - if (log.isDebugEnabled()) - log.debug("Creating partition on recovery (exists in page store) " + - "[grp=" + grp.cacheOrGroupName() + ", p=" + i + "]"); - - GridDhtLocalPartition part = grp.topology().forceCreatePartition(i); - - assert part != null; - - // TODO: https://issues.apache.org/jira/browse/IGNITE-6097 - grp.offheap().onPartitionInitialCounterUpdated(i, 0); - - checkpointReadLock(); - - try { - long partMetaId = pageMem.partitionMetaPageId(grpId, i); - long partMetaPage = pageMem.acquirePage(grpId, partMetaId); - - try { - long pageAddr = pageMem.writeLock(grpId, partMetaId, partMetaPage); - - boolean changed = false; - - try { - PagePartitionMetaIO io = PagePartitionMetaIO.VERSIONS.forPage(pageAddr); - - if (restore != null) { - int stateId = restore.get1(); - - io.setPartitionState(pageAddr, (byte)stateId); - - changed = updateState(part, stateId); - - if (stateId == GridDhtPartitionState.OWNING.ordinal() - || (stateId == GridDhtPartitionState.MOVING.ordinal() - && part.initialUpdateCounter() < restore.get2())) { - part.initialUpdateCounter(restore.get2()); - - changed = true; - } - - if (log.isDebugEnabled()) - log.debug("Restored partition state (from WAL) " + - "[grp=" + grp.cacheOrGroupName() + ", p=" + i + ", state=" + part.state() + - "updCntr=" + part.initialUpdateCounter() + "]"); - } - else { - changed = updateState(part, (int) io.getPartitionState(pageAddr)); - - if (log.isDebugEnabled()) - log.debug("Restored partition state (from page memory) " + - "[grp=" + grp.cacheOrGroupName() + ", p=" + i + ", state=" + part.state() + - "updCntr=" + part.initialUpdateCounter() + "]"); - } - } - finally { - pageMem.writeUnlock(grpId, partMetaId, partMetaPage, null, changed); - } - } - finally { - pageMem.releasePage(grpId, partMetaId, partMetaPage); - } - } - finally { - checkpointReadUnlock(); - } - } - else if (restore != null) { - if (log.isDebugEnabled()) - log.debug("Creating partition on recovery (exists in WAL) " + - "[grp=" + grp.cacheOrGroupName() + ", p=" + i + "]"); - - GridDhtLocalPartition part = grp.topology().forceCreatePartition(i); - - assert part != null; - - // TODO: https://issues.apache.org/jira/browse/IGNITE-6097 - grp.offheap().onPartitionInitialCounterUpdated(i, 0); - - updateState(part, restore.get1()); - - if (log.isDebugEnabled()) - log.debug("Restored partition state (from WAL) " + - "[grp=" + grp.cacheOrGroupName() + ", p=" + i + ", state=" + part.state() + - "updCntr=" + part.initialUpdateCounter() + "]"); - } - - cntParts++; - } - - // After partition states are restored, it is necessary to update internal data structures in topology. - grp.topology().afterStateRestored(grp.topology().lastTopologyChangeVersion()); - } - - return cntParts; + return restoreLogicalState; } /** @@ -2604,25 +2514,6 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan } /** - * @param part Partition to restore state for. - * @param stateId State enum ordinal. - * @return Updated flag. - */ - private boolean updateState(GridDhtLocalPartition part, int stateId) { - if (stateId != -1) { - GridDhtPartitionState state = GridDhtPartitionState.fromOrdinal(stateId); - - assert state != null; - - part.restoreState(state == GridDhtPartitionState.EVICTED ? GridDhtPartitionState.RENTING : state); - - return true; - } - - return false; - } - - /** * @param cacheCtx Cache context to apply an update. * @param dataEntry Data entry to apply. * @throws IgniteCheckedException If failed to restore. @@ -3143,6 +3034,13 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan while (!isCancelled()) { waitCheckpointEvent(); + if (skipCheckpointOnNodeStop && (isStopping() || shutdownNow)) { + if (log.isInfoEnabled()) + log.warning("Skipping last checkpoint because node is stopping."); + + return; + } + GridFutureAdapter<Void> enableChangeApplied = GridCacheDatabaseSharedManager.this.enableChangeApplied; if (enableChangeApplied != null) { @@ -3574,7 +3472,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan if (req != null) req.waitCompleted(); - if (log.isDebugEnabled()) + if (req != null && log.isDebugEnabled()) log.debug("Partition file destroy has cancelled [grpId=" + grpId + ", partId=" + partId + "]"); } @@ -4137,7 +4035,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan else { CacheGroupContext grp = context().cache().cacheGroup(grpId); - DataRegion region = grp != null ?grp .dataRegion() : null; + DataRegion region = grp != null ? grp.dataRegion() : null; if (region == null || !region.config().isPersistenceEnabled()) continue; @@ -4645,10 +4543,10 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan * */ private void fillWalDisabledGroups() { - MetaStorage meta = cctx.database().metaStorage(); + assert metaStorage != null; try { - Set<String> keys = meta.readForPredicate(WAL_KEY_PREFIX_PRED).keySet(); + Set<String> keys = metaStorage.readForPredicate(WAL_KEY_PREFIX_PRED).keySet(); if (keys.isEmpty()) return; @@ -4713,23 +4611,26 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan /** * Abstract class for create restore context. */ - public abstract static class RestoreStateContext { - /** */ - protected final IgniteLogger log; - + private abstract class RestoreStateContext { /** Last archived segment. */ protected final long lastArchivedSegment; /** Last read record WAL pointer. */ protected FileWALPointer lastRead; + /** Only {@link WalRecordCacheGroupAware} records satisfied this predicate will be applied. */ + private final Predicate<Integer> cacheGroupPredicate; + + /** Set to {@code true} if data records should be skipped. */ + private final boolean skipDataRecords; + /** * @param lastArchivedSegment Last archived segment index. - * @param log Ignite logger. */ - public RestoreStateContext(long lastArchivedSegment, IgniteLogger log) { + public RestoreStateContext(long lastArchivedSegment, Predicate<Integer> cacheGroupPredicate, boolean skipDataRecords) { this.lastArchivedSegment = lastArchivedSegment; - this.log = log; + this.cacheGroupPredicate = cacheGroupPredicate; + this.skipDataRecords = skipDataRecords; } /** @@ -4741,17 +4642,63 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan */ public WALRecord next(WALIterator it) throws IgniteCheckedException { try { - IgniteBiTuple<WALPointer, WALRecord> tup = it.nextX(); + for (;;) { + if (!it.hasNextX()) + return null; - WALRecord rec = tup.get2(); + IgniteBiTuple<WALPointer, WALRecord> tup = it.nextX(); - WALPointer ptr = tup.get1(); + if (tup == null) + return null; - lastRead = (FileWALPointer)ptr; + WALRecord rec = tup.get2(); - rec.position(ptr); + WALPointer ptr = tup.get1(); - return rec; + lastRead = (FileWALPointer)ptr; + + rec.position(ptr); + + // Filter out records. + if (rec instanceof WalRecordCacheGroupAware) { + WalRecordCacheGroupAware groupAwareRecord = (WalRecordCacheGroupAware) rec; + + if (!cacheGroupPredicate.test(groupAwareRecord.groupId())) + continue; + } + + switch (rec.type()) { + case METASTORE_DATA_RECORD: + case DATA_RECORD: + if (skipDataRecords) + continue; + + if (rec instanceof DataRecord) { + DataRecord dataRecord = (DataRecord) rec; + + // Filter data entries by group id. + List<DataEntry> filteredEntries = dataRecord.writeEntries().stream() + .filter(entry -> { + if (entry == null) + return false; + + int cacheId = entry.cacheId(); + + return cctx != null && cctx.cacheContext(cacheId) != null && cacheGroupPredicate.test(cctx.cacheContext(cacheId).groupId()); + }) + .collect(Collectors.toList()); + + dataRecord.setWriteEntries(filteredEntries); + } + + break; + + default: + break; + } + + return rec; + } } catch (IgniteCheckedException e) { boolean throwsCRCError = throwsCRCError(); @@ -4791,7 +4738,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan /** * Restore memory context. Tracks the safety of binary recovery. */ - public static class RestoreBinaryState extends RestoreStateContext { + private class RestoreBinaryState extends RestoreStateContext { /** Checkpoint status. */ private final CheckpointStatus status; @@ -4801,13 +4748,12 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan /** * @param status Checkpoint status. * @param lastArchivedSegment Last archived segment index. - * @param log Ignite logger. */ - public RestoreBinaryState(CheckpointStatus status, long lastArchivedSegment, IgniteLogger log) { - super(lastArchivedSegment, log); + public RestoreBinaryState(CheckpointStatus status, long lastArchivedSegment, Predicate<Integer> cacheGroupsPredicate) { + super(lastArchivedSegment, cacheGroupsPredicate, true); this.status = status; - needApplyBinaryUpdates = status.needRestoreMemory(); + this.needApplyBinaryUpdates = status.needRestoreMemory(); } /** @@ -4867,13 +4813,15 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan /** * Restore logical state context. Tracks the safety of logical recovery. */ - public static class RestoreLogicalState extends RestoreStateContext { + private class RestoreLogicalState extends RestoreStateContext { + /** States of partitions recovered during applying logical updates. */ + private final Map<GroupPartitionId, PartitionRecoverState> partitionRecoveryStates = new HashMap<>(); + /** * @param lastArchivedSegment Last archived segment index. - * @param log Ignite logger. */ - public RestoreLogicalState(long lastArchivedSegment, IgniteLogger log) { - super(lastArchivedSegment, log); + public RestoreLogicalState(long lastArchivedSegment, Predicate<Integer> cacheGroupsPredicate) { + super(lastArchivedSegment, cacheGroupsPredicate, false); } }
http://git-wip-us.apache.org/repos/asf/ignite/blob/c076aee4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java index 21bd454..4966bca 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java @@ -25,6 +25,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import javax.management.InstanceNotFoundException; import org.apache.ignite.DataRegionMetrics; import org.apache.ignite.DataStorageMetrics; @@ -45,6 +46,7 @@ import org.apache.ignite.internal.mem.unsafe.UnsafeMemoryProvider; import org.apache.ignite.internal.pagemem.PageMemory; import org.apache.ignite.internal.pagemem.impl.PageMemoryNoStoreImpl; import org.apache.ignite.internal.pagemem.wal.WALPointer; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheGroupContext; import org.apache.ignite.internal.processors.cache.GridCacheMapEntry; import org.apache.ignite.internal.processors.cache.GridCacheSharedManagerAdapter; @@ -91,13 +93,19 @@ public class IgniteCacheDatabaseSharedManager extends GridCacheSharedManagerAdap private static final long MAX_PAGE_MEMORY_INIT_SIZE_32_BIT = 2L * 1024 * 1024 * 1024; /** {@code True} to reuse memory on deactive. */ - private final boolean reuseMemory = IgniteSystemProperties.getBoolean(IGNITE_REUSE_MEMORY_ON_DEACTIVATE); + protected final boolean reuseMemory = IgniteSystemProperties.getBoolean(IGNITE_REUSE_MEMORY_ON_DEACTIVATE); + + /** */ + protected final Map<String, DataRegion> dataRegionMap = new ConcurrentHashMap<>(); + + /** Stores memory providers eligible for reuse. */ + private final Map<String, DirectMemoryProvider> memProviderMap = new ConcurrentHashMap<>(); /** */ private static final String MBEAN_GROUP_NAME = "DataRegionMetrics"; /** */ - protected volatile Map<String, DataRegion> dataRegionMap; + protected final Map<String, DataRegionMetrics> memMetricsMap = new ConcurrentHashMap<>(); /** */ private volatile boolean dataRegionsInitialized; @@ -106,9 +114,6 @@ public class IgniteCacheDatabaseSharedManager extends GridCacheSharedManagerAdap private volatile boolean dataRegionsStarted; /** */ - protected Map<String, DataRegionMetrics> memMetricsMap; - - /** */ protected DataRegion dfltDataRegion; /** */ @@ -123,8 +128,6 @@ public class IgniteCacheDatabaseSharedManager extends GridCacheSharedManagerAdap /** First eviction was warned flag. */ private volatile boolean firstEvictWarn; - /** Stores memory providers eligible for reuse. */ - private Map<String, DirectMemoryProvider> memProviderMap; /** {@inheritDoc} */ @Override protected void start0() throws IgniteCheckedException { @@ -267,6 +270,17 @@ public class IgniteCacheDatabaseSharedManager extends GridCacheSharedManagerAdap } /** + * + */ + private void startDataRegions() { + for (DataRegion region : dataRegionMap.values()) { + region.pageMemory().start(); + + region.evictionTracker().start(); + } + } + + /** * @param memCfg Database config. * @throws IgniteCheckedException If failed to initialize swap path. */ @@ -288,12 +302,6 @@ public class IgniteCacheDatabaseSharedManager extends GridCacheSharedManagerAdap protected void initDataRegions0(DataStorageConfiguration memCfg) throws IgniteCheckedException { DataRegionConfiguration[] dataRegionCfgs = memCfg.getDataRegionConfigurations(); - int dataRegions = dataRegionCfgs == null ? 0 : dataRegionCfgs.length; - - dataRegionMap = U.newHashMap(3 + dataRegions); - memMetricsMap = U.newHashMap(3 + dataRegions); - memProviderMap = reuseMemory ? U.newHashMap(3 + dataRegions) : null; - if (dataRegionCfgs != null) { for (DataRegionConfiguration dataRegionCfg : dataRegionCfgs) addDataRegion(memCfg, dataRegionCfg, dataRegionCfg.isPersistenceEnabled()); @@ -346,14 +354,14 @@ public class IgniteCacheDatabaseSharedManager extends GridCacheSharedManagerAdap DataRegionMetricsImpl memMetrics = new DataRegionMetricsImpl(dataRegionCfg, freeSpaceProvider(dataRegionCfg)); - DataRegion memPlc = initMemory(dataStorageCfg, dataRegionCfg, memMetrics, trackable); + DataRegion region = initMemory(dataStorageCfg, dataRegionCfg, memMetrics, trackable); - dataRegionMap.put(dataRegionName, memPlc); + dataRegionMap.put(dataRegionName, region); memMetricsMap.put(dataRegionName, memMetrics); if (dataRegionName.equals(dfltMemPlcName)) - dfltDataRegion = memPlc; + dfltDataRegion = region; else if (dataRegionName.equals(DFLT_DATA_REG_DEFAULT_NAME)) U.warn(log, "Data Region with name 'default' isn't used as a default. " + "Please, check Data Region configuration."); @@ -689,13 +697,6 @@ public class IgniteCacheDatabaseSharedManager extends GridCacheSharedManagerAdap } /** - * @throws IgniteCheckedException If fails. - */ - public void onDoneRestoreBinaryMemory() throws IgniteCheckedException { - // No-op. - } - - /** * Creates file with current timestamp and specific "node-started.bin" suffix * and writes into memory recovery pointer. * @@ -729,7 +730,7 @@ public class IgniteCacheDatabaseSharedManager extends GridCacheSharedManagerAdap if (memPlcName == null) return dfltDataRegion; - if (dataRegionMap == null) + if (dataRegionMap.isEmpty()) return null; DataRegion plc; @@ -856,11 +857,9 @@ public class IgniteCacheDatabaseSharedManager extends GridCacheSharedManagerAdap /** * @param discoEvt Before exchange for the given discovery event. - * - * @return {@code True} if partitions have been restored from persistent storage. */ - public boolean beforeExchange(GridDhtPartitionsExchangeFuture discoEvt) throws IgniteCheckedException { - return false; + public void beforeExchange(GridDhtPartitionsExchangeFuture discoEvt) throws IgniteCheckedException { + } /** @@ -878,7 +877,7 @@ public class IgniteCacheDatabaseSharedManager extends GridCacheSharedManagerAdap * * @throws IgniteCheckedException If failed. */ - public void onStateRestored() throws IgniteCheckedException { + public void onStateRestored(AffinityTopologyVersion topVer) throws IgniteCheckedException { // No-op. } @@ -1026,7 +1025,7 @@ public class IgniteCacheDatabaseSharedManager extends GridCacheSharedManagerAdap * * @return {@code True} if policy supports memory reuse. */ - private boolean supportsMemoryReuse(DataRegionConfiguration plcCfg) { + public boolean supportsMemoryReuse(DataRegionConfiguration plcCfg) { return reuseMemory && plcCfg.getSwapPath() == null; } @@ -1209,11 +1208,7 @@ public class IgniteCacheDatabaseSharedManager extends GridCacheSharedManagerAdap registerMetricsMBeans(cctx.gridConfig()); - for (DataRegion memPlc : dataRegionMap.values()) { - memPlc.pageMemory().start(); - - memPlc.evictionTracker().start(); - } + startDataRegions(); initPageMemoryDataStructures(cfg); @@ -1234,33 +1229,26 @@ public class IgniteCacheDatabaseSharedManager extends GridCacheSharedManagerAdap for (DatabaseLifecycleListener lsnr : getDatabaseListeners(cctx.kernalContext())) lsnr.beforeStop(this); - if (dataRegionMap != null) { - for (DataRegion memPlc : dataRegionMap.values()) { - memPlc.pageMemory().stop(shutdown); + for (DataRegion region : dataRegionMap.values()) { + region.pageMemory().stop(shutdown); - memPlc.evictionTracker().stop(); + region.evictionTracker().stop(); - unregisterMetricsMBean( - cctx.gridConfig(), - MBEAN_GROUP_NAME, - memPlc.memoryMetrics().getName() - ); + unregisterMetricsMBean( + cctx.gridConfig(), + MBEAN_GROUP_NAME, + region.memoryMetrics().getName() + ); } - dataRegionMap.clear(); + dataRegionMap.clear(); - dataRegionMap = null; + if (shutdown && memProviderMap != null) + memProviderMap.clear(); - if (shutdown && memProviderMap != null) { - memProviderMap.clear(); + dataRegionsInitialized = false; - memProviderMap = null; - } - - dataRegionsInitialized = false; - - dataRegionsStarted = false; - } + dataRegionsStarted = false; } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/c076aee4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/AsyncFileIOFactory.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/AsyncFileIOFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/AsyncFileIOFactory.java index 104697e..e0c545b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/AsyncFileIOFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/AsyncFileIOFactory.java @@ -22,10 +22,6 @@ import java.io.IOException; import java.nio.channels.AsynchronousFileChannel; import java.nio.file.OpenOption; -import static java.nio.file.StandardOpenOption.CREATE; -import static java.nio.file.StandardOpenOption.READ; -import static java.nio.file.StandardOpenOption.WRITE; - /** * File I/O factory which uses {@link AsynchronousFileChannel} based implementation of FileIO. */ @@ -37,11 +33,6 @@ public class AsyncFileIOFactory implements FileIOFactory { private transient volatile ThreadLocal<AsyncFileIO.ChannelOpFuture> holder = initHolder(); /** {@inheritDoc} */ - @Override public FileIO create(File file) throws IOException { - return create(file, CREATE, READ, WRITE); - } - - /** {@inheritDoc} */ @Override public FileIO create(File file, OpenOption... modes) throws IOException { if (holder == null) { synchronized (this) { http://git-wip-us.apache.org/repos/asf/ignite/blob/c076aee4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/EncryptedFileIOFactory.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/EncryptedFileIOFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/EncryptedFileIOFactory.java index 336aab6..b4b0389 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/EncryptedFileIOFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/EncryptedFileIOFactory.java @@ -76,13 +76,6 @@ public class EncryptedFileIOFactory implements FileIOFactory { } /** {@inheritDoc} */ - @Override public FileIO create(File file) throws IOException { - FileIO io = plainIOFactory.create(file); - - return new EncryptedFileIO(io, groupId, pageSize, headerSize, encMgr, encSpi); - } - - /** {@inheritDoc} */ @Override public FileIO create(File file, OpenOption... modes) throws IOException { FileIO io = plainIOFactory.create(file, modes); http://git-wip-us.apache.org/repos/asf/ignite/blob/c076aee4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIOFactory.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIOFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIOFactory.java index 2735185..b236000 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIOFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIOFactory.java @@ -22,6 +22,10 @@ import java.io.IOException; import java.io.Serializable; import java.nio.file.OpenOption; +import static java.nio.file.StandardOpenOption.CREATE; +import static java.nio.file.StandardOpenOption.READ; +import static java.nio.file.StandardOpenOption.WRITE; + /** * {@link FileIO} factory definition. */ @@ -33,7 +37,9 @@ public interface FileIOFactory extends Serializable { * @return File I/O interface. * @throws IOException If I/O interface creation was failed. */ - public FileIO create(File file) throws IOException; + default FileIO create(File file) throws IOException{ + return create(file, CREATE, READ, WRITE); + }; /** * Creates I/O interface for file with specified mode. @@ -43,5 +49,5 @@ public interface FileIOFactory extends Serializable { * @return File I/O interface. * @throws IOException If I/O interface creation was failed. */ - public FileIO create(File file, OpenOption... modes) throws IOException; + FileIO create(File file, OpenOption... modes) throws IOException; } http://git-wip-us.apache.org/repos/asf/ignite/blob/c076aee4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java index fdf4705..16d74c3 100755 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java @@ -258,6 +258,10 @@ public class FilePageStore implements PageStore { + ", delete=" + delete + "]", e); } finally { + allocatedTracker.updateTotalAllocatedPages(-1L * allocated.getAndSet(0) / pageSize); + + inited = false; + lock.writeLock().unlock(); } } @@ -542,7 +546,8 @@ public class FilePageStore implements PageStore { long off = pageOffset(pageId); assert (off >= 0 && off <= allocated.get()) || recover : - "off=" + U.hexLong(off) + ", allocated=" + U.hexLong(allocated.get()) + ", pageId=" + U.hexLong(pageId); + "off=" + U.hexLong(off) + ", allocated=" + U.hexLong(allocated.get()) + + ", pageId=" + U.hexLong(pageId) + ", file=" + cfgFile.getPath(); assert pageBuf.capacity() == pageSize; assert pageBuf.position() == 0; http://git-wip-us.apache.org/repos/asf/ignite/blob/c076aee4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/RandomAccessFileIOFactory.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/RandomAccessFileIOFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/RandomAccessFileIOFactory.java index 856ba1c..3fa3e2d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/RandomAccessFileIOFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/RandomAccessFileIOFactory.java @@ -21,10 +21,6 @@ import java.io.File; import java.io.IOException; import java.nio.file.OpenOption; -import static java.nio.file.StandardOpenOption.CREATE; -import static java.nio.file.StandardOpenOption.READ; -import static java.nio.file.StandardOpenOption.WRITE; - /** * File I/O factory which provides RandomAccessFileIO implementation of FileIO. */ @@ -33,11 +29,6 @@ public class RandomAccessFileIOFactory implements FileIOFactory { private static final long serialVersionUID = 0L; /** {@inheritDoc} */ - @Override public FileIO create(File file) throws IOException { - return create(file, CREATE, READ, WRITE); - } - - /** {@inheritDoc} */ @Override public FileIO create(File file, OpenOption... modes) throws IOException { return new RandomAccessFileIO(file, modes); } http://git-wip-us.apache.org/repos/asf/ignite/blob/c076aee4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/MetastorageLifecycleListener.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/MetastorageLifecycleListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/MetastorageLifecycleListener.java index 8ab418c..12cc446 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/MetastorageLifecycleListener.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/MetastorageLifecycleListener.java @@ -33,7 +33,7 @@ public interface MetastorageLifecycleListener { * * @param metastorage Read-only meta storage. */ - public void onReadyForRead(ReadOnlyMetastorage metastorage) throws IgniteCheckedException; + default void onReadyForRead(ReadOnlyMetastorage metastorage) throws IgniteCheckedException { }; /** * Fully functional metastore capable of performing reading and writing operations. @@ -43,5 +43,5 @@ public interface MetastorageLifecycleListener { * * @param metastorage Fully functional meta storage. */ - public void onReadyForReadWrite(ReadWriteMetastorage metastorage) throws IgniteCheckedException; + default void onReadyForReadWrite(ReadWriteMetastorage metastorage) throws IgniteCheckedException { }; } http://git-wip-us.apache.org/repos/asf/ignite/blob/c076aee4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/partstate/PartitionRecoverState.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/partstate/PartitionRecoverState.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/partstate/PartitionRecoverState.java new file mode 100644 index 0000000..4c7e4d7 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/partstate/PartitionRecoverState.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.persistence.partstate; + +/** + * Class holds state of partition during recovery process. + */ +public class PartitionRecoverState { + /** State id. */ + private final int stateId; + + /** Update counter. */ + private final long updateCounter; + + /** + * @param stateId State id. + * @param updateCounter Update counter. + */ + public PartitionRecoverState(int stateId, long updateCounter) { + this.stateId = stateId; + this.updateCounter = updateCounter; + } + + /** + * + */ + public int stateId() { + return stateId; + } + + /** + * + */ + public long updateCounter() { + return updateCounter; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/c076aee4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteCacheSnapshotManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteCacheSnapshotManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteCacheSnapshotManager.java index 16cc8f5..75aa983 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteCacheSnapshotManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteCacheSnapshotManager.java @@ -94,10 +94,6 @@ public class IgniteCacheSnapshotManager<T extends SnapshotOperation> extends Gri /** * */ - public void restoreState() throws IgniteCheckedException { - // No-op. - } - public boolean snapshotOperationInProgress(){ return false; }