Repository: ignite Updated Branches: refs/heads/master 20ffb7641 -> 27786a05e
IGNITE-10556 Skip WAL records deserialization on recovery if possible - Fixes #5624. Signed-off-by: Pavel Kovalenko <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/27786a05 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/27786a05 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/27786a05 Branch: refs/heads/master Commit: 27786a05e2993334c5db403ed59fdaa0c36b0722 Parents: 20ffb76 Author: Pavel Kovalenko <[email protected]> Authored: Wed Dec 12 17:52:02 2018 +0300 Committer: Pavel Kovalenko <[email protected]> Committed: Wed Dec 12 17:52:02 2018 +0300 ---------------------------------------------------------------------- .../encryption/example-encrypted-store.xml | 2 + .../pagemem/wal/IgniteWriteAheadLogManager.java | 16 + .../internal/pagemem/wal/WALIterator.java | 6 +- .../internal/pagemem/wal/record/DataRecord.java | 5 +- .../pagemem/wal/record/MvccTxRecord.java | 8 +- .../internal/pagemem/wal/record/WALRecord.java | 167 ++++++---- .../GridCacheDatabaseSharedManager.java | 310 ++++++++++++------- .../persistence/GridCacheOffheapManager.java | 11 + .../wal/AbstractWalRecordsIterator.java | 11 + .../wal/FileWriteAheadLogManager.java | 11 +- .../serializer/RecordSerializerFactoryImpl.java | 7 +- .../wal/serializer/RecordV1Serializer.java | 3 +- .../wal/serializer/RecordV2Serializer.java | 3 +- .../db/IgniteLogicalRecoveryTest.java | 17 +- .../persistence/pagemem/NoOpWALManager.java | 7 + 15 files changed, 400 insertions(+), 184 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/27786a05/examples/config/encryption/example-encrypted-store.xml ---------------------------------------------------------------------- diff --git a/examples/config/encryption/example-encrypted-store.xml b/examples/config/encryption/example-encrypted-store.xml index e526ae3..7ce5482 100644 --- 
a/examples/config/encryption/example-encrypted-store.xml +++ b/examples/config/encryption/example-encrypted-store.xml @@ -28,6 +28,8 @@ <property name="defaultDataRegionConfiguration"> <bean class="org.apache.ignite.configuration.DataRegionConfiguration"> <property name="persistenceEnabled" value="true"/> + + <property name="maxSize" value="#{128 * 1024 * 1024}"/> </bean> </property> </bean> http://git-wip-us.apache.org/repos/asf/ignite/blob/27786a05/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/IgniteWriteAheadLogManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/IgniteWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/IgniteWriteAheadLogManager.java index 679eec9..8a4d3a7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/IgniteWriteAheadLogManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/IgniteWriteAheadLogManager.java @@ -24,6 +24,8 @@ import org.apache.ignite.internal.pagemem.wal.record.WALRecord; import org.apache.ignite.internal.processors.cache.GridCacheSharedManager; import org.apache.ignite.internal.processors.cache.persistence.StorageException; import org.apache.ignite.internal.processors.cluster.IgniteChangeGlobalStateSupport; +import org.apache.ignite.lang.IgniteBiPredicate; +import org.jetbrains.annotations.Nullable; /** * @@ -102,6 +104,20 @@ public interface IgniteWriteAheadLogManager extends GridCacheSharedManager, Igni public WALIterator replay(WALPointer start) throws IgniteCheckedException, StorageException; /** + * Invoke this method to iterate over the written log entries. + * + * @param start Optional WAL pointer from which to start iteration. + * @param recordDeserializeFilter Specify a filter to skip WAL records. Those records will not be explicitly deserialized. + * @return Records iterator. 
+ * @throws IgniteCheckedException If failed to start iteration. + * @throws StorageException If IO error occurred while reading WAL entries. + */ + public WALIterator replay( + WALPointer start, + @Nullable IgniteBiPredicate<WALRecord.RecordType, WALPointer> recordDeserializeFilter + ) throws IgniteCheckedException, StorageException; + + /** * Invoke this method to reserve WAL history since provided pointer and prevent it's deletion. * * @param start WAL pointer. http://git-wip-us.apache.org/repos/asf/ignite/blob/27786a05/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/WALIterator.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/WALIterator.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/WALIterator.java index 14fdfda..b3c9726 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/WALIterator.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/WALIterator.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.pagemem.wal; +import java.util.Optional; import org.apache.ignite.internal.pagemem.wal.record.WALRecord; import org.apache.ignite.internal.util.lang.GridCloseableIterator; import org.apache.ignite.lang.IgniteBiTuple; @@ -25,5 +26,8 @@ import org.apache.ignite.lang.IgniteBiTuple; * */ public interface WALIterator extends GridCloseableIterator<IgniteBiTuple<WALPointer, WALRecord>> { - // Iterator alias. + /** + * @return Pointer of last read valid record. Empty if no records were read. 
+ */ + public Optional<WALPointer> lastRead(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/27786a05/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/DataRecord.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/DataRecord.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/DataRecord.java index d5ab53a..ef6c3ba 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/DataRecord.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/DataRecord.java @@ -78,9 +78,12 @@ public class DataRecord extends TimeStampRecord { /** * @param writeEntries Write entries. + * @return {@code this} for chaining. */ - public void setWriteEntries(List<DataEntry> writeEntries) { + public DataRecord setWriteEntries(List<DataEntry> writeEntries) { this.writeEntries = writeEntries; + + return this; } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/27786a05/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/MvccTxRecord.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/MvccTxRecord.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/MvccTxRecord.java index 82c4409..86ad983 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/MvccTxRecord.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/MvccTxRecord.java @@ -20,6 +20,7 @@ package org.apache.ignite.internal.pagemem.wal.record; import java.util.Collection; import java.util.Map; import org.apache.ignite.internal.processors.cache.mvcc.MvccVersion; +import org.apache.ignite.internal.processors.cache.mvcc.txlog.TxLog; import 
org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.transactions.TransactionState; @@ -29,7 +30,7 @@ import org.jetbrains.annotations.Nullable; * Logical data record indented for MVCC transaction related actions.<br> * This record is marker of prepare, commit, and rollback transactions. */ -public class MvccTxRecord extends TxRecord { +public class MvccTxRecord extends TxRecord implements WalRecordCacheGroupAware { /** Transaction mvcc snapshot version. */ private final MvccVersion mvccVer; @@ -86,6 +87,11 @@ public class MvccTxRecord extends TxRecord { } /** {@inheritDoc} */ + @Override public int groupId() { + return TxLog.TX_LOG_CACHE_ID; + } + + /** {@inheritDoc} */ @Override public String toString() { return S.toString(MvccTxRecord.class, this, "super", super.toString()); } http://git-wip-us.apache.org/repos/asf/ignite/blob/27786a05/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java index ad8a2a4..5d72768 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java @@ -19,9 +19,12 @@ package org.apache.ignite.internal.pagemem.wal.record; import org.apache.ignite.configuration.WALMode; import org.apache.ignite.internal.pagemem.wal.WALPointer; +import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.typedef.internal.S; +import static 
org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordPurpose.*; + /** * Log entry abstract class. */ @@ -32,126 +35,126 @@ public abstract class WALRecord { */ public enum RecordType { /** */ - TX_RECORD, + TX_RECORD (LOGICAL), /** */ - PAGE_RECORD, + PAGE_RECORD (PHYSICAL), /** */ - DATA_RECORD, + DATA_RECORD (LOGICAL), /** Checkpoint (begin) record */ - CHECKPOINT_RECORD, + CHECKPOINT_RECORD (PHYSICAL), /** WAL segment header record. */ - HEADER_RECORD, + HEADER_RECORD (INTERNAL), // Delta records. /** */ - INIT_NEW_PAGE_RECORD, + INIT_NEW_PAGE_RECORD (PHYSICAL), /** */ - DATA_PAGE_INSERT_RECORD, + DATA_PAGE_INSERT_RECORD (PHYSICAL), /** */ - DATA_PAGE_INSERT_FRAGMENT_RECORD, + DATA_PAGE_INSERT_FRAGMENT_RECORD (PHYSICAL), /** */ - DATA_PAGE_REMOVE_RECORD, + DATA_PAGE_REMOVE_RECORD (PHYSICAL), /** */ - DATA_PAGE_SET_FREE_LIST_PAGE, + DATA_PAGE_SET_FREE_LIST_PAGE (PHYSICAL), /** */ - BTREE_META_PAGE_INIT_ROOT, + BTREE_META_PAGE_INIT_ROOT (PHYSICAL), /** */ - BTREE_META_PAGE_ADD_ROOT, + BTREE_META_PAGE_ADD_ROOT (PHYSICAL), /** */ - BTREE_META_PAGE_CUT_ROOT, + BTREE_META_PAGE_CUT_ROOT (PHYSICAL), /** */ - BTREE_INIT_NEW_ROOT, + BTREE_INIT_NEW_ROOT (PHYSICAL), /** */ - BTREE_PAGE_RECYCLE, + BTREE_PAGE_RECYCLE (PHYSICAL), /** */ - BTREE_PAGE_INSERT, + BTREE_PAGE_INSERT (PHYSICAL), /** */ - BTREE_FIX_LEFTMOST_CHILD, + BTREE_FIX_LEFTMOST_CHILD (PHYSICAL), /** */ - BTREE_FIX_COUNT, + BTREE_FIX_COUNT (PHYSICAL), /** */ - BTREE_PAGE_REPLACE, + BTREE_PAGE_REPLACE (PHYSICAL), /** */ - BTREE_PAGE_REMOVE, + BTREE_PAGE_REMOVE (PHYSICAL), /** */ - BTREE_PAGE_INNER_REPLACE, + BTREE_PAGE_INNER_REPLACE (PHYSICAL), /** */ - BTREE_FIX_REMOVE_ID, + BTREE_FIX_REMOVE_ID (PHYSICAL), /** */ - BTREE_FORWARD_PAGE_SPLIT, + BTREE_FORWARD_PAGE_SPLIT (PHYSICAL), /** */ - BTREE_EXISTING_PAGE_SPLIT, + BTREE_EXISTING_PAGE_SPLIT (PHYSICAL), /** */ - BTREE_PAGE_MERGE, + BTREE_PAGE_MERGE (PHYSICAL), /** */ - PAGES_LIST_SET_NEXT, + PAGES_LIST_SET_NEXT (PHYSICAL), /** */ - 
PAGES_LIST_SET_PREVIOUS, + PAGES_LIST_SET_PREVIOUS (PHYSICAL), /** */ - PAGES_LIST_INIT_NEW_PAGE, + PAGES_LIST_INIT_NEW_PAGE (PHYSICAL), /** */ - PAGES_LIST_ADD_PAGE, + PAGES_LIST_ADD_PAGE (PHYSICAL), /** */ - PAGES_LIST_REMOVE_PAGE, + PAGES_LIST_REMOVE_PAGE (PHYSICAL), /** */ - META_PAGE_INIT, + META_PAGE_INIT (PHYSICAL), /** */ - PARTITION_META_PAGE_UPDATE_COUNTERS, + PARTITION_META_PAGE_UPDATE_COUNTERS (PHYSICAL), /** Memory recovering start marker */ MEMORY_RECOVERY, /** */ - TRACKING_PAGE_DELTA, + TRACKING_PAGE_DELTA (PHYSICAL), /** Meta page update last successful snapshot id. */ - META_PAGE_UPDATE_LAST_SUCCESSFUL_SNAPSHOT_ID, + META_PAGE_UPDATE_LAST_SUCCESSFUL_SNAPSHOT_ID (MIXED), /** Meta page update last successful full snapshot id. */ - META_PAGE_UPDATE_LAST_SUCCESSFUL_FULL_SNAPSHOT_ID, + META_PAGE_UPDATE_LAST_SUCCESSFUL_FULL_SNAPSHOT_ID (MIXED), /** Meta page update next snapshot id. */ - META_PAGE_UPDATE_NEXT_SNAPSHOT_ID, + META_PAGE_UPDATE_NEXT_SNAPSHOT_ID (MIXED), /** Meta page update last allocated index. */ - META_PAGE_UPDATE_LAST_ALLOCATED_INDEX, + META_PAGE_UPDATE_LAST_ALLOCATED_INDEX (MIXED), /** Partition meta update state. */ - PART_META_UPDATE_STATE, + PART_META_UPDATE_STATE (MIXED), /** Page list meta reset count record. */ - PAGE_LIST_META_RESET_COUNT_RECORD, + PAGE_LIST_META_RESET_COUNT_RECORD (PHYSICAL), /** Switch segment record. * Marker record for indicate end of segment. @@ -160,22 +163,22 @@ public abstract class WALRecord { * that one byte in the end,then we write SWITCH_SEGMENT_RECORD as marker end of segment. * No need write CRC or WAL pointer for this record. It is byte marker record. * */ - SWITCH_SEGMENT_RECORD, + SWITCH_SEGMENT_RECORD (INTERNAL), /** */ - DATA_PAGE_UPDATE_RECORD, + DATA_PAGE_UPDATE_RECORD (PHYSICAL), /** init */ - BTREE_META_PAGE_INIT_ROOT2, + BTREE_META_PAGE_INIT_ROOT2 (PHYSICAL), /** Partition destroy. */ - PARTITION_DESTROY, + PARTITION_DESTROY (PHYSICAL), /** Snapshot record. 
*/ SNAPSHOT, /** Metastore data record. */ - METASTORE_DATA_RECORD, + METASTORE_DATA_RECORD (LOGICAL), /** Exchange record. */ EXCHANGE, @@ -184,28 +187,57 @@ public abstract class WALRecord { RESERVED, /** Rotated id part record. */ - ROTATED_ID_PART_RECORD, + ROTATED_ID_PART_RECORD (PHYSICAL), /** */ - MVCC_DATA_PAGE_MARK_UPDATED_RECORD, + MVCC_DATA_PAGE_MARK_UPDATED_RECORD (PHYSICAL), /** */ - MVCC_DATA_PAGE_TX_STATE_HINT_UPDATED_RECORD, + MVCC_DATA_PAGE_TX_STATE_HINT_UPDATED_RECORD (PHYSICAL), /** */ - MVCC_DATA_PAGE_NEW_TX_STATE_HINT_UPDATED_RECORD, + MVCC_DATA_PAGE_NEW_TX_STATE_HINT_UPDATED_RECORD (PHYSICAL), /** Encrypted WAL-record. */ - ENCRYPTED_RECORD, + ENCRYPTED_RECORD (PHYSICAL), /** Ecnrypted data record. */ - ENCRYPTED_DATA_RECORD, + ENCRYPTED_DATA_RECORD (LOGICAL), /** Mvcc data record. */ - MVCC_DATA_RECORD, + MVCC_DATA_RECORD (LOGICAL), /** Mvcc Tx state change record. */ - MVCC_TX_RECORD; + MVCC_TX_RECORD (LOGICAL); + + /** + * When you're adding a new record don't forget to choose record purpose explicitly + * if record is needed for physical or logical recovery. + * By default the purpose of record is {@link RecordPurpose#CUSTOM} and this record will not be used in recovery process. + * For more information read description of {@link RecordPurpose}. + */ + private final RecordPurpose purpose; + + /** + * @param purpose Purpose. + */ + RecordType(RecordPurpose purpose) { + this.purpose = purpose; + } + + /** + * Default constructor. + */ + RecordType() { + this(CUSTOM); + } + + /** + * @return Purpose of record. + */ + public RecordPurpose purpose() { + return purpose; + } /** */ private static final RecordType[] VALS = RecordType.values(); @@ -223,6 +255,37 @@ public abstract class WALRecord { public static final int STOP_ITERATION_RECORD_TYPE = 0; } + /** + * Record purposes set. + */ + public enum RecordPurpose { + /** + * Internal records are needed for correct iterating over WAL structure. 
+ * These records will never be returned to the user during WAL iteration. + */ + INTERNAL, + /** + * Physical records are needed to correctly recover the physical state of {@link org.apache.ignite.internal.pagemem.PageMemory}. + * {@link org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager#restoreBinaryMemory(org.apache.ignite.lang.IgnitePredicate, org.apache.ignite.lang.IgniteBiPredicate)}. + */ + PHYSICAL, + /** + * Logical records are needed to replay logical updates since last checkpoint. + * {@link GridCacheDatabaseSharedManager#applyLogicalUpdates(org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager.CheckpointStatus, org.apache.ignite.lang.IgnitePredicate, org.apache.ignite.lang.IgniteBiPredicate, boolean)} + */ + LOGICAL, + /** + * Physical-logical records are used both for physical and logical recovery. + * Usually these records contain meta-information about partitions. + * NOTE: Using this type without a strong reason is not recommended. + */ + MIXED, + /** + * Custom records are needed for any custom iterations over WAL in various components. 
+ */ + CUSTOM + } + /** */ private int size; http://git-wip-us.apache.org/repos/asf/ignite/blob/27786a05/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java index fd56262..73c5fbb 100755 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java @@ -40,6 +40,7 @@ import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; @@ -175,6 +176,7 @@ import static org.apache.ignite.failure.FailureType.CRITICAL_ERROR; import static org.apache.ignite.failure.FailureType.SYSTEM_WORKER_TERMINATION; import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.CHECKPOINT_RECORD; import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.TMP_FILE_MATCHER; +import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.METASTORE_DATA_RECORD; import static org.apache.ignite.internal.util.IgniteUtils.checkpointBufferSize; /** @@ -724,11 +726,11 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan try { dataRegion(METASTORE_DATA_REGION_NAME).pageMemory().start(); - performBinaryMemoryRestore(status, g -> MetaStorage.METASTORAGE_CACHE_ID == g, false); + performBinaryMemoryRestore(status, onlyMetastorageGroup(), physicalRecords(), false); metaStorage = 
createMetastorage(true); - applyLogicalUpdates(status, g -> MetaStorage.METASTORAGE_CACHE_ID == g, false, true); + applyLogicalUpdates(status, onlyMetastorageGroup(), onlyMetastorageRecords(), false); fillWalDisabledGroups(); @@ -911,10 +913,14 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan /** * @param cacheGroupsPredicate Cache groups to restore. + * @param recordTypePredicate Filter records by type. * @return Last seen WAL pointer during binary memory recovery. * @throws IgniteCheckedException If failed. */ - private RestoreBinaryState restoreBinaryMemory(Predicate<Integer> cacheGroupsPredicate) throws IgniteCheckedException { + private RestoreBinaryState restoreBinaryMemory( + IgnitePredicate<Integer> cacheGroupsPredicate, + IgniteBiPredicate<WALRecord.RecordType, WALPointer> recordTypePredicate + ) throws IgniteCheckedException { long time = System.currentTimeMillis(); try { @@ -927,9 +933,14 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan // First, bring memory to the last consistent checkpoint state if needed. // This method should return a pointer to the last valid record in the WAL. - RestoreBinaryState binaryState = performBinaryMemoryRestore(status, cacheGroupsPredicate, true); + RestoreBinaryState binaryState = performBinaryMemoryRestore( + status, + cacheGroupsPredicate, + recordTypePredicate, + true + ); - WALPointer restored = binaryState.lastReadRecordPointer(); + WALPointer restored = binaryState.lastReadRecordPointer().map(FileWALPointer::next).orElse(null); if (restored == null && !status.endPtr.equals(CheckpointStatus.NULL_PTR)) { throw new StorageException("The memory cannot be restored. The critical part of WAL archive is missing " + @@ -1918,7 +1929,8 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan // Restore binary memory for all not WAL disabled cache groups. 
restoreBinaryMemory( - g -> !initiallyGlobalWalDisabledGrps.contains(g) && !initiallyLocalWalDisabledGrps.contains(g) + groupsWithEnabledWal(), + physicalRecords() ); if (recoveryVerboseLogging && log.isInfoEnabled()) { @@ -1930,10 +1942,10 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan CheckpointStatus status = readCheckpointStatus(); RestoreLogicalState logicalState = applyLogicalUpdates( - status, - g -> !initiallyGlobalWalDisabledGrps.contains(g) && !initiallyLocalWalDisabledGrps.contains(g), - true, - false + status, + groupsWithEnabledWal(), + logicalRecords(), + true ); if (recoveryVerboseLogging && log.isInfoEnabled()) { @@ -1942,7 +1954,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan dumpPartitionsInfo(cctx, log); } - walTail = tailPointer(logicalState.lastRead); + walTail = tailPointer(logicalState.lastReadRecordPointer().orElse(null)); cctx.wal().onDeActivate(kctx); } @@ -2002,20 +2014,21 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan * @throws IgniteCheckedException If failed. */ private WALPointer tailPointer(WALPointer from) throws IgniteCheckedException { - WALPointer lastRead = from; + WALIterator it = cctx.wal().replay(from); - try (WALIterator it = cctx.wal().replay(from)) { + try { while (it.hasNextX()) { IgniteBiTuple<WALPointer, WALRecord> rec = it.nextX(); if (rec == null) break; - - lastRead = rec.get1(); } } + finally { + it.close(); + } - return lastRead != null ? 
lastRead.next() : null; + return it.lastRead().map(WALPointer::next).orElse(null); } /** @@ -2051,7 +2064,8 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan */ private RestoreBinaryState performBinaryMemoryRestore( CheckpointStatus status, - Predicate<Integer> cacheGroupsPredicate, + IgnitePredicate<Integer> cacheGroupsPredicate, + IgniteBiPredicate<WALRecord.RecordType, WALPointer> recordTypePredicate, boolean finalizeState ) throws IgniteCheckedException { if (log.isInfoEnabled()) @@ -2074,13 +2088,15 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan long lastArchivedSegment = cctx.wal().lastArchivedSegment(); - RestoreBinaryState restoreBinaryState = new RestoreBinaryState(status, lastArchivedSegment, cacheGroupsPredicate); + WALIterator it = cctx.wal().replay(status.endPtr, recordTypePredicate); + + RestoreBinaryState restoreBinaryState = new RestoreBinaryState(status, it, lastArchivedSegment, cacheGroupsPredicate); int applied = 0; - try (WALIterator it = cctx.wal().replay(status.endPtr)) { + try { while (it.hasNextX()) { - WALRecord rec = restoreBinaryState.next(it); + WALRecord rec = restoreBinaryState.next(); if (rec == null) break; @@ -2194,11 +2210,14 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan } } } + finally { + it.close(); + } if (!finalizeState) return null; - FileWALPointer lastReadPtr = restoreBinaryState.lastReadRecordPointer(); + FileWALPointer lastReadPtr = restoreBinaryState.lastReadRecordPointer().orElse(null); if (status.needRestoreMemory()) { if (restoreBinaryState.needApplyBinaryUpdate()) @@ -2215,9 +2234,6 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan cpHistory.initialize(retreiveHistory()); - // Move pointer position to the end of last read record. - restoreBinaryState.lastRead = lastReadPtr != null ? 
lastReadPtr.next() : lastReadPtr; - return restoreBinaryState; } @@ -2275,7 +2291,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan switch (rec.type()) { case MVCC_DATA_RECORD: - case DATA_RECORD: + case DATA_RECORD: checkpointReadLock(); try { @@ -2311,24 +2327,24 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan break; - case MVCC_TX_RECORD: - checkpointReadLock(); + case MVCC_TX_RECORD: + checkpointReadLock(); - try { - MvccTxRecord txRecord = (MvccTxRecord)rec; + try { + MvccTxRecord txRecord = (MvccTxRecord)rec; - byte txState = convertToTxState(txRecord.state()); + byte txState = convertToTxState(txRecord.state()); - cctx.coordinators().updateState(txRecord.mvccVersion(), txState, false); - } - catch (IgniteCheckedException e) { - throw new IgniteException(e); - } - finally { - checkpointReadUnlock(); - } + cctx.coordinators().updateState(txRecord.mvccVersion(), txState, false); + } + catch (IgniteCheckedException e) { + throw new IgniteException(e); + } + finally { + checkpointReadUnlock(); + } - break; + break; default: // Skip other records. 
@@ -2344,9 +2360,9 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan */ private RestoreLogicalState applyLogicalUpdates( CheckpointStatus status, - Predicate<Integer> cacheGroupsPredicate, - boolean skipFieldLookup, - boolean metaStoreOnly + IgnitePredicate<Integer> cacheGroupsPredicate, + IgniteBiPredicate<WALRecord.RecordType, WALPointer> recordTypePredicate, + boolean skipFieldLookup ) throws IgniteCheckedException { if (log.isInfoEnabled()) log.info("Applying lost cache updates since last checkpoint record [lastMarked=" @@ -2355,17 +2371,19 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan if (skipFieldLookup) cctx.kernalContext().query().skipFieldLookup(true); - long lastArchivedSegment = cctx.wal().lastArchivedSegment(); - - RestoreLogicalState restoreLogicalState = new RestoreLogicalState(lastArchivedSegment, cacheGroupsPredicate); - long start = U.currentTimeMillis(); int applied = 0; - try (WALIterator it = cctx.wal().replay(status.startPtr)) { + long lastArchivedSegment = cctx.wal().lastArchivedSegment(); + + WALIterator it = cctx.wal().replay(status.startPtr, recordTypePredicate); + + RestoreLogicalState restoreLogicalState = new RestoreLogicalState(it, lastArchivedSegment, cacheGroupsPredicate); + + try { while (it.hasNextX()) { - WALRecord rec = restoreLogicalState.next(it); + WALRecord rec = restoreLogicalState.next(); if (rec == null) break; @@ -2373,6 +2391,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan switch (rec.type()) { case MVCC_DATA_RECORD: case DATA_RECORD: + case ENCRYPTED_DATA_RECORD: DataRecord dataRec = (DataRecord)rec; for (DataEntry dataEntry : dataRec.writeEntries()) { @@ -2393,6 +2412,15 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan break; + case MVCC_TX_RECORD: + MvccTxRecord txRecord = (MvccTxRecord)rec; + + byte txState = convertToTxState(txRecord.state()); + + 
cctx.coordinators().updateState(txRecord.mvccVersion(), txState, false); + + break; + case PART_META_UPDATE_STATE: PartitionMetaStateRecord metaStateRecord = (PartitionMetaStateRecord)rec; @@ -2418,6 +2446,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan case META_PAGE_UPDATE_NEXT_SNAPSHOT_ID: case META_PAGE_UPDATE_LAST_SUCCESSFUL_SNAPSHOT_ID: case META_PAGE_UPDATE_LAST_SUCCESSFUL_FULL_SNAPSHOT_ID: + case META_PAGE_UPDATE_LAST_ALLOCATED_INDEX: PageDeltaRecord rec0 = (PageDeltaRecord) rec; PageMemoryEx pageMem = getPageMemoryForCacheGroup(rec0.groupId()); @@ -2443,24 +2472,14 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan break; - case MVCC_TX_RECORD: - if (metaStoreOnly) - continue; - - MvccTxRecord txRecord = (MvccTxRecord)rec; - - byte txState = convertToTxState(txRecord.state()); - - cctx.coordinators().updateState(txRecord.mvccVersion(), txState, false); - - break; - default: // Skip other records. } } } finally { + it.close(); + if (skipFieldLookup) cctx.kernalContext().query().skipFieldLookup(false); } @@ -4660,6 +4679,18 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan assert pageStore != null : "Persistent cache should have initialize page store manager."; for (int p = 0; p < grp.affinity().partitions(); p++) { + if (grp.topology().localPartition(p) != null) { + GridDhtLocalPartition part = grp.topology().localPartition(p); + + log.info("Partition [grp=" + grp.cacheOrGroupName() + + ", id=" + p + + ", state=" + part.state() + + ", counter=" + part.updateCounter() + + ", size=" + part.fullSize() + "]"); + + continue; + } + if (!pageStore.exists(grp.groupId(), p)) continue; @@ -4707,13 +4738,16 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan * Recovery lifecycle for read-write metastorage. 
*/ private class MetastorageRecoveryLifecycle implements DatabaseLifecycleListener { + /** {@inheritDoc} */ @Override public void beforeBinaryMemoryRestore(IgniteCacheDatabaseSharedManager mgr) throws IgniteCheckedException { cctx.pageStore().initializeForMetastorage(); } + /** {@inheritDoc} */ @Override public void afterBinaryMemoryRestore( IgniteCacheDatabaseSharedManager mgr, - RestoreBinaryState restoreState) throws IgniteCheckedException { + RestoreBinaryState restoreState + ) throws IgniteCheckedException { assert metaStorage == null; metaStorage = createMetastorage(false); @@ -4721,44 +4755,84 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan } /** - * Abstract class for create restore context. + * @return Cache group predicate that passes only Metastorage cache group id. + */ + private IgnitePredicate<Integer> onlyMetastorageGroup() { + return groupId -> MetaStorage.METASTORAGE_CACHE_ID == groupId; + } + + /** + * @return Cache group predicate that passes only cache groups with enabled WAL. + */ + private IgnitePredicate<Integer> groupsWithEnabledWal() { + return groupId -> !initiallyGlobalWalDisabledGrps.contains(groupId) + && !initiallyLocalWalDisabledGrps.contains(groupId); + } + + /** + * @return WAL records predicate that passes only Metastorage data records. + */ + private IgniteBiPredicate<WALRecord.RecordType, WALPointer> onlyMetastorageRecords() { + return (type, ptr) -> type == METASTORE_DATA_RECORD; + } + + /** + * @return WAL records predicate that passes only physical and mixed WAL records. + */ + private IgniteBiPredicate<WALRecord.RecordType, WALPointer> physicalRecords() { + return (type, ptr) -> type.purpose() == WALRecord.RecordPurpose.PHYSICAL + || type.purpose() == WALRecord.RecordPurpose.MIXED; + } + + /** + * @return WAL records predicate that passes only logical and mixed WAL records. 
+ */ + private IgniteBiPredicate<WALRecord.RecordType, WALPointer> logicalRecords() { + return (type, ptr) -> type.purpose() == WALRecord.RecordPurpose.LOGICAL + || type.purpose() == WALRecord.RecordPurpose.MIXED; + } + + /** + * Abstract class to create restore context. */ private abstract class RestoreStateContext { /** Last archived segment. */ protected final long lastArchivedSegment; - /** Last read record WAL pointer. */ - protected FileWALPointer lastRead; + /** WAL iterator. */ + private final WALIterator iterator; /** Only {@link WalRecordCacheGroupAware} records satisfied this predicate will be applied. */ - private final Predicate<Integer> cacheGroupPredicate; - - /** Set to {@code true} if data records should be skipped. */ - private final boolean skipDataRecords; + private final IgnitePredicate<Integer> cacheGroupPredicate; /** + * @param iterator WAL iterator. * @param lastArchivedSegment Last archived segment index. + * @param cacheGroupPredicate Cache groups predicate. */ - public RestoreStateContext(long lastArchivedSegment, Predicate<Integer> cacheGroupPredicate, boolean skipDataRecords) { + protected RestoreStateContext( + WALIterator iterator, + long lastArchivedSegment, + IgnitePredicate<Integer> cacheGroupPredicate + ) { + this.iterator = iterator; this.lastArchivedSegment = lastArchivedSegment; this.cacheGroupPredicate = cacheGroupPredicate; - this.skipDataRecords = skipDataRecords; } /** * Advance iterator to the next record. * - * @param it WAL iterator. * @return WALRecord entry. * @throws IgniteCheckedException If CRC check fail during binary recovery state or another exception occurring. 
*/ - public WALRecord next(WALIterator it) throws IgniteCheckedException { + public WALRecord next() throws IgniteCheckedException { try { for (;;) { - if (!it.hasNextX()) + if (!iterator.hasNextX()) return null; - IgniteBiTuple<WALPointer, WALRecord> tup = it.nextX(); + IgniteBiTuple<WALPointer, WALRecord> tup = iterator.nextX(); if (tup == null) return null; @@ -4767,45 +4841,19 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan WALPointer ptr = tup.get1(); - lastRead = (FileWALPointer)ptr; - rec.position(ptr); - // Filter out records. + // Filter out records by group id. if (rec instanceof WalRecordCacheGroupAware) { - WalRecordCacheGroupAware groupAwareRecord = (WalRecordCacheGroupAware) rec; + WalRecordCacheGroupAware grpAwareRecord = (WalRecordCacheGroupAware) rec; - if (!cacheGroupPredicate.test(groupAwareRecord.groupId())) + if (!cacheGroupPredicate.apply(grpAwareRecord.groupId())) continue; } - switch (rec.type()) { - case METASTORE_DATA_RECORD: - case MVCC_DATA_RECORD: - case DATA_RECORD: - if (skipDataRecords) - continue; - - if (rec instanceof DataRecord) { - DataRecord dataRecord = (DataRecord) rec; - - // Filter data entries by group id. - List<DataEntry> filteredEntries = dataRecord.writeEntries().stream() - .filter(entry -> { - int cacheId = entry.cacheId(); - - return cctx.cacheContext(cacheId) != null && cacheGroupPredicate.test(cctx.cacheContext(cacheId).groupId()); - }) - .collect(Collectors.toList()); - - dataRecord.setWriteEntries(filteredEntries); - } - - break; - - default: - break; - } + // Filter out data entries by group id. + if (rec instanceof DataRecord) + rec = filterEntriesByGroupId((DataRecord) rec); return rec; } @@ -4827,21 +4875,37 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan } /** + * Filters out data entries from the given data record that do not satisfy {@link #cacheGroupPredicate}. + * + * @param record Original data record. 
+ * @return Data record with filtered data entries. + */ + private DataRecord filterEntriesByGroupId(DataRecord record) { + List<DataEntry> filteredEntries = record.writeEntries().stream() + .filter(entry -> { + int cacheId = entry.cacheId(); + + return cctx.cacheContext(cacheId) != null && cacheGroupPredicate.apply(cctx.cacheContext(cacheId).groupId()); + }) + .collect(Collectors.toList()); + + return record.setWriteEntries(filteredEntries); + } + + /** * * @return Last read WAL record pointer. */ - public FileWALPointer lastReadRecordPointer() { - return lastRead; + public Optional<FileWALPointer> lastReadRecordPointer() { + return iterator.lastRead().map(ptr -> (FileWALPointer)ptr); } /** * * @return Flag indicates need throws CRC exception or not. */ - public boolean throwsCRCError(){ - FileWALPointer lastReadPtr = lastRead; - - return lastReadPtr != null && lastReadPtr.index() <= lastArchivedSegment; + public boolean throwsCRCError() { + return lastReadRecordPointer().filter(ptr -> ptr.index() <= lastArchivedSegment).isPresent(); } } @@ -4857,10 +4921,17 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan /** * @param status Checkpoint status. + * @param iterator WAL iterator. * @param lastArchivedSegment Last archived segment index. + * @param cacheGroupsPredicate Cache groups predicate. */ - public RestoreBinaryState(CheckpointStatus status, long lastArchivedSegment, Predicate<Integer> cacheGroupsPredicate) { - super(lastArchivedSegment, cacheGroupsPredicate, true); + public RestoreBinaryState( + CheckpointStatus status, + WALIterator iterator, + long lastArchivedSegment, + IgnitePredicate<Integer> cacheGroupsPredicate + ) { + super(iterator, lastArchivedSegment, cacheGroupsPredicate); this.status = status; this.needApplyBinaryUpdates = status.needRestoreMemory(); @@ -4869,12 +4940,11 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan /** * Advance iterator to the next record. 
* - * @param it WAL iterator. * @return WALRecord entry. * @throws IgniteCheckedException If CRC check fail during binary recovery state or another exception occurring. */ - @Override public WALRecord next(WALIterator it) throws IgniteCheckedException { - WALRecord rec = super.next(it); + @Override public WALRecord next() throws IgniteCheckedException { + WALRecord rec = super.next(); if (rec == null) return null; @@ -4911,7 +4981,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan */ @Override public boolean throwsCRCError() { log.info("Throws CRC error check [needApplyBinaryUpdates=" + needApplyBinaryUpdates + - ", lastArchivedSegment=" + lastArchivedSegment + ", lastRead=" + lastRead + ']'); + ", lastArchivedSegment=" + lastArchivedSegment + ", lastRead=" + lastReadRecordPointer() + ']'); if (needApplyBinaryUpdates) return true; @@ -4930,8 +5000,8 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan /** * @param lastArchivedSegment Last archived segment index. 
*/ - public RestoreLogicalState(long lastArchivedSegment, Predicate<Integer> cacheGroupsPredicate) { - super(lastArchivedSegment, cacheGroupsPredicate, false); + public RestoreLogicalState(WALIterator iterator, long lastArchivedSegment, IgnitePredicate<Integer> cacheGroupsPredicate) { + super(iterator, lastArchivedSegment, cacheGroupsPredicate); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/27786a05/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java index 04255c0..f78428d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java @@ -194,6 +194,17 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple } } + syncMetadata(execSvc, ctx, needSnapshot); + } + + /** + * Syncs and saves meta-information of all data structures to page memory. + * + * @param execSvc Executor service to run save process + * @param ctx Checkpoint listener context. + * @throws IgniteCheckedException If failed. 
+ */ + private void syncMetadata(Executor execSvc, Context ctx, boolean needSnapshot) throws IgniteCheckedException { if (execSvc == null) { reuseList.saveMetadata(); http://git-wip-us.apache.org/repos/asf/ignite/blob/27786a05/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java index 3cbe577..f37b154 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java @@ -22,6 +22,7 @@ import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; import java.nio.ByteOrder; +import java.util.Optional; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.internal.pagemem.wal.WALIterator; @@ -92,6 +93,9 @@ public abstract class AbstractWalRecordsIterator /** Factory to provide I/O interfaces for read primitives with files. */ private final SegmentFileInputFactory segmentFileInputFactory; + /** Position of last read valid record. */ + private WALPointer lastRead; + /** * @param log Logger. * @param sharedCtx Shared context. @@ -154,6 +158,8 @@ public abstract class AbstractWalRecordsIterator curRec = advanceRecord(currWalSegment); if (curRec != null) { + lastRead = curRec.get1(); + if (curRec.get2().type() == null) continue; // Record was skipped by filter of current serializer, should read next record. 
@@ -183,6 +189,11 @@ public abstract class AbstractWalRecordsIterator } } + /** {@inheritDoc} */ + @Override public Optional<WALPointer> lastRead() { + return Optional.ofNullable(lastRead); + } + /** * @param tailReachedException Tail reached exception. * @param currWalSegment Current WAL segment read handler. http://git-wip-us.apache.org/repos/asf/ignite/blob/27786a05/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java index fad1ec1..addbbd3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java @@ -114,6 +114,7 @@ import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.internal.util.worker.GridWorker; +import org.apache.ignite.lang.IgniteBiPredicate; import org.apache.ignite.lang.IgniteBiTuple; import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.lang.IgnitePredicate; @@ -839,6 +840,14 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl /** {@inheritDoc} */ @Override public WALIterator replay(WALPointer start) throws IgniteCheckedException, StorageException { + return replay(start, null); + } + + /** {@inheritDoc} */ + @Override public WALIterator replay( + WALPointer start, + @Nullable IgniteBiPredicate<WALRecord.RecordType, WALPointer> recordDeserializeFilter + ) throws IgniteCheckedException, StorageException { 
assert start == null || start instanceof FileWALPointer : "Invalid start pointer: " + start; FileWriteHandle hnd = currentHandle(); @@ -855,7 +864,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl (FileWALPointer)start, end, dsCfg, - new RecordSerializerFactoryImpl(cctx), + new RecordSerializerFactoryImpl(cctx).recordDeserializeFilter(recordDeserializeFilter), ioFactory, archiver, decompressor, http://git-wip-us.apache.org/repos/asf/ignite/blob/27786a05/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordSerializerFactoryImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordSerializerFactoryImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordSerializerFactoryImpl.java index c149817..96b78e6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordSerializerFactoryImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordSerializerFactoryImpl.java @@ -22,6 +22,7 @@ import org.apache.ignite.internal.pagemem.wal.record.MarshalledRecord; import org.apache.ignite.internal.pagemem.wal.record.WALRecord; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.lang.IgniteBiPredicate; +import org.jetbrains.annotations.Nullable; /** * @@ -34,7 +35,7 @@ public class RecordSerializerFactoryImpl implements RecordSerializerFactory { private boolean needWritePointer; /** Read record filter. */ - private IgniteBiPredicate<WALRecord.RecordType, WALPointer> recordDeserializeFilter; + private @Nullable IgniteBiPredicate<WALRecord.RecordType, WALPointer> recordDeserializeFilter; /** * Marshalled mode flag. 
@@ -56,7 +57,7 @@ public class RecordSerializerFactoryImpl implements RecordSerializerFactory { */ public RecordSerializerFactoryImpl( GridCacheSharedContext cctx, - IgniteBiPredicate<WALRecord.RecordType, WALPointer> readTypeFilter + @Nullable IgniteBiPredicate<WALRecord.RecordType, WALPointer> readTypeFilter ) { this.cctx = cctx; this.recordDeserializeFilter = readTypeFilter; @@ -114,7 +115,7 @@ public class RecordSerializerFactoryImpl implements RecordSerializerFactory { /** {@inheritDoc} */ @Override public RecordSerializerFactoryImpl recordDeserializeFilter( - IgniteBiPredicate<WALRecord.RecordType, WALPointer> readTypeFilter + @Nullable IgniteBiPredicate<WALRecord.RecordType, WALPointer> readTypeFilter ) { this.recordDeserializeFilter = readTypeFilter; http://git-wip-us.apache.org/repos/asf/ignite/blob/27786a05/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV1Serializer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV1Serializer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV1Serializer.java index ee5a1e2..5b36d9e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV1Serializer.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV1Serializer.java @@ -144,7 +144,8 @@ public class RecordV1Serializer implements RecordSerializer { rec.position(ptr); - if (recordFilter != null && !recordFilter.apply(rec.type(), ptr)) + if (recType.purpose() != WALRecord.RecordPurpose.INTERNAL + && recordFilter != null && !recordFilter.apply(rec.type(), ptr)) return FilteredRecord.INSTANCE; else if (marshalledMode) { ByteBuffer buf = heapTlb.get(); 
http://git-wip-us.apache.org/repos/asf/ignite/blob/27786a05/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV2Serializer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV2Serializer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV2Serializer.java index d27a331..0d78d08 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV2Serializer.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV2Serializer.java @@ -122,7 +122,8 @@ public class RecordV2Serializer implements RecordSerializer { ", expected pointer [idx=" + exp.index() + ", offset=" + exp.fileOffset() + "]"); } - if (recordFilter != null && !recordFilter.apply(recType, ptr)) { + if (recType.purpose() != WALRecord.RecordPurpose.INTERNAL + && recordFilter != null && !recordFilter.apply(recType, ptr)) { int toSkip = ptr.length() - REC_TYPE_SIZE - FILE_WAL_POINTER_SIZE - CRC_SIZE; assert toSkip >= 0 : "Too small saved record length: " + ptr; http://git-wip-us.apache.org/repos/asf/ignite/blob/27786a05/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgniteLogicalRecoveryTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgniteLogicalRecoveryTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgniteLogicalRecoveryTest.java index c629420..b919e41 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgniteLogicalRecoveryTest.java +++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgniteLogicalRecoveryTest.java @@ -26,6 +26,7 @@ import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ThreadLocalRandom; import java.util.function.Predicate; @@ -49,6 +50,8 @@ import org.apache.ignite.failure.StopNodeFailureHandler; import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.TestRecordingCommunicationSpi; +import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.cache.GridCacheGroupIdMessage; import org.apache.ignite.internal.processors.cache.GridCacheUtils; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage; import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager; @@ -261,7 +264,7 @@ public class IgniteLogicalRecoveryTest extends GridCommonAbstractTest { */ @Test public void testRecoveryWithMvccCaches() throws Exception { - fail("https://issues.apache.org/jira/browse/IGNITE-10052"); + fail("https://issues.apache.org/jira/browse/IGNITE-10582"); List<CacheConfiguration> dynamicCaches = Lists.newArrayList( cacheConfiguration(DYNAMIC_CACHE_PREFIX + 0, CacheMode.PARTITIONED, CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT), @@ -409,13 +412,21 @@ public class IgniteLogicalRecoveryTest extends GridCommonAbstractTest { List<Ignite> nodes = G.allGrids(); - for (Ignite node : nodes) { + for (final Ignite node : nodes) { TestRecordingCommunicationSpi spi = TestRecordingCommunicationSpi.spi(node); + Set<Integer> mvccCaches = ((IgniteEx) node).context().cache().cacheGroups().stream() + .flatMap(group -> group.caches().stream()) + .filter(cache -> cache.config().getAtomicityMode() == 
CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT) + .map(GridCacheContext::groupId) + .collect(Collectors.toSet()); + List<Integer> rebalancedGroups = spi.recordedMessages(true).stream() .map(msg -> (GridDhtPartitionDemandMessage) msg) - .map(msg -> msg.groupId()) + .map(GridCacheGroupIdMessage::groupId) .filter(grpId -> grpId != sysCacheGroupId) + //TODO: remove following filter when failover for MVCC will be fixed. + .filter(grpId -> !mvccCaches.contains(grpId)) .distinct() .collect(Collectors.toList()); http://git-wip-us.apache.org/repos/asf/ignite/blob/27786a05/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/NoOpWALManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/NoOpWALManager.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/NoOpWALManager.java index ea3ed2f..8d854bb 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/NoOpWALManager.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/NoOpWALManager.java @@ -26,7 +26,9 @@ import org.apache.ignite.internal.pagemem.wal.record.RolloverType; import org.apache.ignite.internal.pagemem.wal.record.WALRecord; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.persistence.StorageException; +import org.apache.ignite.lang.IgniteBiPredicate; import org.apache.ignite.lang.IgniteFuture; +import org.jetbrains.annotations.Nullable; /** * @@ -73,6 +75,11 @@ public class NoOpWALManager implements IgniteWriteAheadLogManager { } /** {@inheritDoc} */ + @Override public WALIterator replay(WALPointer start, @Nullable IgniteBiPredicate<WALRecord.RecordType, WALPointer> recordDeserializeFilter) throws IgniteCheckedException, StorageException { 
+ return null; + } + + /** {@inheritDoc} */ @Override public boolean reserve(WALPointer start) { return false; }
