Repository: ignite
Updated Branches:
  refs/heads/master 20ffb7641 -> 27786a05e


IGNITE-10556 Skip WAL records deserialization on recovery if possible - Fixes 
#5624.

Signed-off-by: Pavel Kovalenko <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/27786a05
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/27786a05
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/27786a05

Branch: refs/heads/master
Commit: 27786a05e2993334c5db403ed59fdaa0c36b0722
Parents: 20ffb76
Author: Pavel Kovalenko <[email protected]>
Authored: Wed Dec 12 17:52:02 2018 +0300
Committer: Pavel Kovalenko <[email protected]>
Committed: Wed Dec 12 17:52:02 2018 +0300

----------------------------------------------------------------------
 .../encryption/example-encrypted-store.xml      |   2 +
 .../pagemem/wal/IgniteWriteAheadLogManager.java |  16 +
 .../internal/pagemem/wal/WALIterator.java       |   6 +-
 .../internal/pagemem/wal/record/DataRecord.java |   5 +-
 .../pagemem/wal/record/MvccTxRecord.java        |   8 +-
 .../internal/pagemem/wal/record/WALRecord.java  | 167 ++++++----
 .../GridCacheDatabaseSharedManager.java         | 310 ++++++++++++-------
 .../persistence/GridCacheOffheapManager.java    |  11 +
 .../wal/AbstractWalRecordsIterator.java         |  11 +
 .../wal/FileWriteAheadLogManager.java           |  11 +-
 .../serializer/RecordSerializerFactoryImpl.java |   7 +-
 .../wal/serializer/RecordV1Serializer.java      |   3 +-
 .../wal/serializer/RecordV2Serializer.java      |   3 +-
 .../db/IgniteLogicalRecoveryTest.java           |  17 +-
 .../persistence/pagemem/NoOpWALManager.java     |   7 +
 15 files changed, 400 insertions(+), 184 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/27786a05/examples/config/encryption/example-encrypted-store.xml
----------------------------------------------------------------------
diff --git a/examples/config/encryption/example-encrypted-store.xml 
b/examples/config/encryption/example-encrypted-store.xml
index e526ae3..7ce5482 100644
--- a/examples/config/encryption/example-encrypted-store.xml
+++ b/examples/config/encryption/example-encrypted-store.xml
@@ -28,6 +28,8 @@
                 <property name="defaultDataRegionConfiguration">
                     <bean 
class="org.apache.ignite.configuration.DataRegionConfiguration">
                         <property name="persistenceEnabled" value="true"/>
+
+                        <property name="maxSize" value="#{128 * 1024 * 1024}"/>
                     </bean>
                 </property>
             </bean>

http://git-wip-us.apache.org/repos/asf/ignite/blob/27786a05/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/IgniteWriteAheadLogManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/IgniteWriteAheadLogManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/IgniteWriteAheadLogManager.java
index 679eec9..8a4d3a7 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/IgniteWriteAheadLogManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/IgniteWriteAheadLogManager.java
@@ -24,6 +24,8 @@ import 
org.apache.ignite.internal.pagemem.wal.record.WALRecord;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedManager;
 import 
org.apache.ignite.internal.processors.cache.persistence.StorageException;
 import 
org.apache.ignite.internal.processors.cluster.IgniteChangeGlobalStateSupport;
+import org.apache.ignite.lang.IgniteBiPredicate;
+import org.jetbrains.annotations.Nullable;
 
 /**
  *
@@ -102,6 +104,20 @@ public interface IgniteWriteAheadLogManager extends 
GridCacheSharedManager, Igni
     public WALIterator replay(WALPointer start) throws IgniteCheckedException, 
StorageException;
 
     /**
+     * Invoke this method to iterate over the written log entries.
+     *
+     * @param start Optional WAL pointer from which to start iteration.
+     * @param recordDeserializeFilter Specify a filter to skip WAL records. 
Those records will not be explicitly deserialized.
+     * @return Records iterator.
+     * @throws IgniteCheckedException If failed to start iteration.
+     * @throws StorageException If IO error occurred while reading WAL entries.
+     */
+    public WALIterator replay(
+        WALPointer start,
+        @Nullable IgniteBiPredicate<WALRecord.RecordType, WALPointer> 
recordDeserializeFilter
+    ) throws IgniteCheckedException, StorageException;
+
+    /**
      * Invoke this method to reserve WAL history since provided pointer and 
prevent it's deletion.
      *
      * @param start WAL pointer.

http://git-wip-us.apache.org/repos/asf/ignite/blob/27786a05/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/WALIterator.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/WALIterator.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/WALIterator.java
index 14fdfda..b3c9726 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/WALIterator.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/WALIterator.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.pagemem.wal;
 
+import java.util.Optional;
 import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
 import org.apache.ignite.internal.util.lang.GridCloseableIterator;
 import org.apache.ignite.lang.IgniteBiTuple;
@@ -25,5 +26,8 @@ import org.apache.ignite.lang.IgniteBiTuple;
  *
  */
 public interface WALIterator extends 
GridCloseableIterator<IgniteBiTuple<WALPointer, WALRecord>> {
-    // Iterator alias.
+    /**
+     * @return Pointer of last read valid record. Empty if no records were 
read.
+     */
+    public Optional<WALPointer> lastRead();
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/27786a05/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/DataRecord.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/DataRecord.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/DataRecord.java
index d5ab53a..ef6c3ba 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/DataRecord.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/DataRecord.java
@@ -78,9 +78,12 @@ public class DataRecord extends TimeStampRecord {
 
     /**
      * @param writeEntries Write entries.
+     * @return {@code this} for chaining.
      */
-    public void setWriteEntries(List<DataEntry> writeEntries) {
+    public DataRecord setWriteEntries(List<DataEntry> writeEntries) {
         this.writeEntries = writeEntries;
+
+        return this;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/27786a05/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/MvccTxRecord.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/MvccTxRecord.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/MvccTxRecord.java
index 82c4409..86ad983 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/MvccTxRecord.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/MvccTxRecord.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.pagemem.wal.record;
 import java.util.Collection;
 import java.util.Map;
 import org.apache.ignite.internal.processors.cache.mvcc.MvccVersion;
+import org.apache.ignite.internal.processors.cache.mvcc.txlog.TxLog;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.transactions.TransactionState;
@@ -29,7 +30,7 @@ import org.jetbrains.annotations.Nullable;
  * Logical data record indented for MVCC transaction related actions.<br>
  * This record is marker of prepare, commit, and rollback transactions.
  */
-public class MvccTxRecord extends TxRecord {
+public class MvccTxRecord extends TxRecord implements WalRecordCacheGroupAware 
{
     /** Transaction mvcc snapshot version. */
     private final MvccVersion mvccVer;
 
@@ -86,6 +87,11 @@ public class MvccTxRecord extends TxRecord {
     }
 
     /** {@inheritDoc} */
+    @Override public int groupId() {
+        return TxLog.TX_LOG_CACHE_ID;
+    }
+
+    /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(MvccTxRecord.class, this, "super", super.toString());
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/27786a05/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java
index ad8a2a4..5d72768 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java
@@ -19,9 +19,12 @@ package org.apache.ignite.internal.pagemem.wal.record;
 
 import org.apache.ignite.configuration.WALMode;
 import org.apache.ignite.internal.pagemem.wal.WALPointer;
+import 
org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.typedef.internal.S;
 
+import static 
org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordPurpose.*;
+
 /**
  * Log entry abstract class.
  */
@@ -32,126 +35,126 @@ public abstract class WALRecord {
      */
     public enum RecordType {
         /** */
-        TX_RECORD,
+        TX_RECORD (LOGICAL),
 
         /** */
-        PAGE_RECORD,
+        PAGE_RECORD (PHYSICAL),
 
         /** */
-        DATA_RECORD,
+        DATA_RECORD (LOGICAL),
 
         /** Checkpoint (begin) record */
-        CHECKPOINT_RECORD,
+        CHECKPOINT_RECORD (PHYSICAL),
 
         /** WAL segment header record. */
-        HEADER_RECORD,
+        HEADER_RECORD (INTERNAL),
 
         // Delta records.
 
         /** */
-        INIT_NEW_PAGE_RECORD,
+        INIT_NEW_PAGE_RECORD (PHYSICAL),
 
         /** */
-        DATA_PAGE_INSERT_RECORD,
+        DATA_PAGE_INSERT_RECORD (PHYSICAL),
 
         /** */
-        DATA_PAGE_INSERT_FRAGMENT_RECORD,
+        DATA_PAGE_INSERT_FRAGMENT_RECORD (PHYSICAL),
 
         /** */
-        DATA_PAGE_REMOVE_RECORD,
+        DATA_PAGE_REMOVE_RECORD (PHYSICAL),
 
         /** */
-        DATA_PAGE_SET_FREE_LIST_PAGE,
+        DATA_PAGE_SET_FREE_LIST_PAGE (PHYSICAL),
 
         /** */
-        BTREE_META_PAGE_INIT_ROOT,
+        BTREE_META_PAGE_INIT_ROOT (PHYSICAL),
 
         /** */
-        BTREE_META_PAGE_ADD_ROOT,
+        BTREE_META_PAGE_ADD_ROOT (PHYSICAL),
 
         /** */
-        BTREE_META_PAGE_CUT_ROOT,
+        BTREE_META_PAGE_CUT_ROOT (PHYSICAL),
 
         /** */
-        BTREE_INIT_NEW_ROOT,
+        BTREE_INIT_NEW_ROOT (PHYSICAL),
 
         /** */
-        BTREE_PAGE_RECYCLE,
+        BTREE_PAGE_RECYCLE (PHYSICAL),
 
         /** */
-        BTREE_PAGE_INSERT,
+        BTREE_PAGE_INSERT (PHYSICAL),
 
         /** */
-        BTREE_FIX_LEFTMOST_CHILD,
+        BTREE_FIX_LEFTMOST_CHILD (PHYSICAL),
 
         /** */
-        BTREE_FIX_COUNT,
+        BTREE_FIX_COUNT (PHYSICAL),
 
         /** */
-        BTREE_PAGE_REPLACE,
+        BTREE_PAGE_REPLACE (PHYSICAL),
 
         /** */
-        BTREE_PAGE_REMOVE,
+        BTREE_PAGE_REMOVE (PHYSICAL),
 
         /** */
-        BTREE_PAGE_INNER_REPLACE,
+        BTREE_PAGE_INNER_REPLACE (PHYSICAL),
 
         /** */
-        BTREE_FIX_REMOVE_ID,
+        BTREE_FIX_REMOVE_ID (PHYSICAL),
 
         /** */
-        BTREE_FORWARD_PAGE_SPLIT,
+        BTREE_FORWARD_PAGE_SPLIT (PHYSICAL),
 
         /** */
-        BTREE_EXISTING_PAGE_SPLIT,
+        BTREE_EXISTING_PAGE_SPLIT (PHYSICAL),
 
         /** */
-        BTREE_PAGE_MERGE,
+        BTREE_PAGE_MERGE (PHYSICAL),
 
         /** */
-        PAGES_LIST_SET_NEXT,
+        PAGES_LIST_SET_NEXT (PHYSICAL),
 
         /** */
-        PAGES_LIST_SET_PREVIOUS,
+        PAGES_LIST_SET_PREVIOUS (PHYSICAL),
 
         /** */
-        PAGES_LIST_INIT_NEW_PAGE,
+        PAGES_LIST_INIT_NEW_PAGE (PHYSICAL),
 
         /** */
-        PAGES_LIST_ADD_PAGE,
+        PAGES_LIST_ADD_PAGE (PHYSICAL),
 
         /** */
-        PAGES_LIST_REMOVE_PAGE,
+        PAGES_LIST_REMOVE_PAGE (PHYSICAL),
 
         /** */
-        META_PAGE_INIT,
+        META_PAGE_INIT (PHYSICAL),
 
         /** */
-        PARTITION_META_PAGE_UPDATE_COUNTERS,
+        PARTITION_META_PAGE_UPDATE_COUNTERS (PHYSICAL),
 
         /** Memory recovering start marker */
         MEMORY_RECOVERY,
 
         /** */
-        TRACKING_PAGE_DELTA,
+        TRACKING_PAGE_DELTA (PHYSICAL),
 
         /** Meta page update last successful snapshot id. */
-        META_PAGE_UPDATE_LAST_SUCCESSFUL_SNAPSHOT_ID,
+        META_PAGE_UPDATE_LAST_SUCCESSFUL_SNAPSHOT_ID (MIXED),
 
         /** Meta page update last successful full snapshot id. */
-        META_PAGE_UPDATE_LAST_SUCCESSFUL_FULL_SNAPSHOT_ID,
+        META_PAGE_UPDATE_LAST_SUCCESSFUL_FULL_SNAPSHOT_ID (MIXED),
 
         /** Meta page update next snapshot id. */
-        META_PAGE_UPDATE_NEXT_SNAPSHOT_ID,
+        META_PAGE_UPDATE_NEXT_SNAPSHOT_ID (MIXED),
 
         /** Meta page update last allocated index. */
-        META_PAGE_UPDATE_LAST_ALLOCATED_INDEX,
+        META_PAGE_UPDATE_LAST_ALLOCATED_INDEX (MIXED),
 
         /** Partition meta update state. */
-        PART_META_UPDATE_STATE,
+        PART_META_UPDATE_STATE (MIXED),
 
         /** Page list meta reset count record. */
-        PAGE_LIST_META_RESET_COUNT_RECORD,
+        PAGE_LIST_META_RESET_COUNT_RECORD (PHYSICAL),
 
         /** Switch segment record.
          *  Marker record for indicate end of segment.
@@ -160,22 +163,22 @@ public abstract class WALRecord {
          *  that one byte in the end,then we write SWITCH_SEGMENT_RECORD as 
marker end of segment.
          *  No need write CRC or WAL pointer for this record. It is byte 
marker record.
          *  */
-        SWITCH_SEGMENT_RECORD,
+        SWITCH_SEGMENT_RECORD (INTERNAL),
 
         /** */
-        DATA_PAGE_UPDATE_RECORD,
+        DATA_PAGE_UPDATE_RECORD (PHYSICAL),
 
         /** init */
-        BTREE_META_PAGE_INIT_ROOT2,
+        BTREE_META_PAGE_INIT_ROOT2 (PHYSICAL),
 
         /** Partition destroy. */
-        PARTITION_DESTROY,
+        PARTITION_DESTROY (PHYSICAL),
 
         /** Snapshot record. */
         SNAPSHOT,
 
         /** Metastore data record. */
-        METASTORE_DATA_RECORD,
+        METASTORE_DATA_RECORD (LOGICAL),
 
         /** Exchange record. */
         EXCHANGE,
@@ -184,28 +187,57 @@ public abstract class WALRecord {
         RESERVED,
 
         /** Rotated id part record. */
-        ROTATED_ID_PART_RECORD,
+        ROTATED_ID_PART_RECORD (PHYSICAL),
 
         /** */
-        MVCC_DATA_PAGE_MARK_UPDATED_RECORD,
+        MVCC_DATA_PAGE_MARK_UPDATED_RECORD (PHYSICAL),
 
         /** */
-        MVCC_DATA_PAGE_TX_STATE_HINT_UPDATED_RECORD,
+        MVCC_DATA_PAGE_TX_STATE_HINT_UPDATED_RECORD (PHYSICAL),
 
         /** */
-        MVCC_DATA_PAGE_NEW_TX_STATE_HINT_UPDATED_RECORD,
+        MVCC_DATA_PAGE_NEW_TX_STATE_HINT_UPDATED_RECORD (PHYSICAL),
 
         /** Encrypted WAL-record. */
-        ENCRYPTED_RECORD,
+        ENCRYPTED_RECORD (PHYSICAL),
 
         /** Ecnrypted data record. */
-        ENCRYPTED_DATA_RECORD,
+        ENCRYPTED_DATA_RECORD (LOGICAL),
 
         /** Mvcc data record. */
-        MVCC_DATA_RECORD,
+        MVCC_DATA_RECORD (LOGICAL),
 
         /** Mvcc Tx state change record. */
-        MVCC_TX_RECORD;
+        MVCC_TX_RECORD (LOGICAL);
+
+        /**
+         * When you're adding a new record don't forget to choose record 
purpose explicitly
+         * if record is needed for physical or logical recovery.
+         * By default the purpose of record is {@link RecordPurpose#CUSTOM} 
and this record will not be used in recovery process.
+         * For more information read description of {@link RecordPurpose}.
+         */
+        private final RecordPurpose purpose;
+
+        /**
+         * @param purpose Purpose.
+         */
+        RecordType(RecordPurpose purpose) {
+            this.purpose = purpose;
+        }
+
+        /**
+         * Default constructor.
+         */
+        RecordType() {
+            this(CUSTOM);
+        }
+
+        /**
+         * @return Purpose of record.
+         */
+        public RecordPurpose purpose() {
+            return purpose;
+        }
 
         /** */
         private static final RecordType[] VALS = RecordType.values();
@@ -223,6 +255,37 @@ public abstract class WALRecord {
         public static final int STOP_ITERATION_RECORD_TYPE = 0;
     }
 
+    /**
+     * Record purposes set.
+     */
+    public enum RecordPurpose {
+        /**
+         * Internal records are needed for correct iterating over WAL 
structure.
+         * These records will never be returned to user during WAL iteration.
+         */
+        INTERNAL,
+        /**
+         * Physical records are needed to correctly recover the physical state 
of {@link org.apache.ignite.internal.pagemem.PageMemory}.
+         * {@link 
org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager#restoreBinaryMemory(org.apache.ignite.lang.IgnitePredicate,
 org.apache.ignite.lang.IgniteBiPredicate)}.
+         */
+        PHYSICAL,
+        /**
+         * Logical records are needed to replay logical updates since last 
checkpoint.
+         * {@link 
GridCacheDatabaseSharedManager#applyLogicalUpdates(org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager.CheckpointStatus,
 org.apache.ignite.lang.IgnitePredicate, 
org.apache.ignite.lang.IgniteBiPredicate, boolean)}
+         */
+        LOGICAL,
+        /**
+         * Physical-logical records are used both for physical and logical 
recovery.
+         * Usually these records contain meta-information about partitions.
+         * NOTE: It is not recommended to use this type without a strong reason.
+         */
+        MIXED,
+        /**
+         * Custom records are needed for any custom iterations over WAL in 
various components.
+         */
+        CUSTOM
+    }
+
     /** */
     private int size;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/27786a05/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
index fd56262..73c5fbb 100755
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
@@ -40,6 +40,7 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
@@ -175,6 +176,7 @@ import static 
org.apache.ignite.failure.FailureType.CRITICAL_ERROR;
 import static org.apache.ignite.failure.FailureType.SYSTEM_WORKER_TERMINATION;
 import static 
org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.CHECKPOINT_RECORD;
 import static 
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.TMP_FILE_MATCHER;
+import static 
org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.METASTORE_DATA_RECORD;
 import static org.apache.ignite.internal.util.IgniteUtils.checkpointBufferSize;
 
 /**
@@ -724,11 +726,11 @@ public class GridCacheDatabaseSharedManager extends 
IgniteCacheDatabaseSharedMan
             try {
                 dataRegion(METASTORE_DATA_REGION_NAME).pageMemory().start();
 
-                performBinaryMemoryRestore(status, g -> 
MetaStorage.METASTORAGE_CACHE_ID == g, false);
+                performBinaryMemoryRestore(status, onlyMetastorageGroup(), 
physicalRecords(), false);
 
                 metaStorage = createMetastorage(true);
 
-                applyLogicalUpdates(status, g -> 
MetaStorage.METASTORAGE_CACHE_ID == g, false, true);
+                applyLogicalUpdates(status, onlyMetastorageGroup(), 
onlyMetastorageRecords(), false);
 
                 fillWalDisabledGroups();
 
@@ -911,10 +913,14 @@ public class GridCacheDatabaseSharedManager extends 
IgniteCacheDatabaseSharedMan
 
     /**
      * @param cacheGroupsPredicate Cache groups to restore.
+     * @param recordTypePredicate Filter records by type.
      * @return Last seen WAL pointer during binary memory recovery.
      * @throws IgniteCheckedException If failed.
      */
-    private RestoreBinaryState restoreBinaryMemory(Predicate<Integer> 
cacheGroupsPredicate) throws IgniteCheckedException {
+    private RestoreBinaryState restoreBinaryMemory(
+        IgnitePredicate<Integer> cacheGroupsPredicate,
+        IgniteBiPredicate<WALRecord.RecordType, WALPointer> recordTypePredicate
+    ) throws IgniteCheckedException {
         long time = System.currentTimeMillis();
 
         try {
@@ -927,9 +933,14 @@ public class GridCacheDatabaseSharedManager extends 
IgniteCacheDatabaseSharedMan
 
             // First, bring memory to the last consistent checkpoint state if 
needed.
             // This method should return a pointer to the last valid record in 
the WAL.
-            RestoreBinaryState binaryState = 
performBinaryMemoryRestore(status, cacheGroupsPredicate, true);
+            RestoreBinaryState binaryState = performBinaryMemoryRestore(
+                status,
+                cacheGroupsPredicate,
+                recordTypePredicate,
+                true
+            );
 
-            WALPointer restored = binaryState.lastReadRecordPointer();
+            WALPointer restored = 
binaryState.lastReadRecordPointer().map(FileWALPointer::next).orElse(null);
 
             if (restored == null && 
!status.endPtr.equals(CheckpointStatus.NULL_PTR)) {
                 throw new StorageException("The memory cannot be restored. The 
critical part of WAL archive is missing " +
@@ -1918,7 +1929,8 @@ public class GridCacheDatabaseSharedManager extends 
IgniteCacheDatabaseSharedMan
 
             // Restore binary memory for all not WAL disabled cache groups.
             restoreBinaryMemory(
-                    g -> !initiallyGlobalWalDisabledGrps.contains(g) && 
!initiallyLocalWalDisabledGrps.contains(g)
+                groupsWithEnabledWal(),
+                physicalRecords()
             );
 
             if (recoveryVerboseLogging && log.isInfoEnabled()) {
@@ -1930,10 +1942,10 @@ public class GridCacheDatabaseSharedManager extends 
IgniteCacheDatabaseSharedMan
             CheckpointStatus status = readCheckpointStatus();
 
             RestoreLogicalState logicalState = applyLogicalUpdates(
-                    status,
-                    g -> !initiallyGlobalWalDisabledGrps.contains(g) && 
!initiallyLocalWalDisabledGrps.contains(g),
-                    true,
-                    false
+                status,
+                groupsWithEnabledWal(),
+                logicalRecords(),
+                true
             );
 
             if (recoveryVerboseLogging && log.isInfoEnabled()) {
@@ -1942,7 +1954,7 @@ public class GridCacheDatabaseSharedManager extends 
IgniteCacheDatabaseSharedMan
                 dumpPartitionsInfo(cctx, log);
             }
 
-            walTail = tailPointer(logicalState.lastRead);
+            walTail = 
tailPointer(logicalState.lastReadRecordPointer().orElse(null));
 
             cctx.wal().onDeActivate(kctx);
         }
@@ -2002,20 +2014,21 @@ public class GridCacheDatabaseSharedManager extends 
IgniteCacheDatabaseSharedMan
      * @throws IgniteCheckedException If failed.
      */
     private WALPointer tailPointer(WALPointer from) throws 
IgniteCheckedException {
-        WALPointer lastRead = from;
+        WALIterator it = cctx.wal().replay(from);
 
-        try (WALIterator it = cctx.wal().replay(from)) {
+        try {
             while (it.hasNextX()) {
                 IgniteBiTuple<WALPointer, WALRecord> rec = it.nextX();
 
                 if (rec == null)
                     break;
-
-                lastRead = rec.get1();
             }
         }
+        finally {
+            it.close();
+        }
 
-        return lastRead != null ? lastRead.next() : null;
+        return it.lastRead().map(WALPointer::next).orElse(null);
     }
 
     /**
@@ -2051,7 +2064,8 @@ public class GridCacheDatabaseSharedManager extends 
IgniteCacheDatabaseSharedMan
      */
     private RestoreBinaryState performBinaryMemoryRestore(
         CheckpointStatus status,
-        Predicate<Integer> cacheGroupsPredicate,
+        IgnitePredicate<Integer> cacheGroupsPredicate,
+        IgniteBiPredicate<WALRecord.RecordType, WALPointer> 
recordTypePredicate,
         boolean finalizeState
     ) throws IgniteCheckedException {
         if (log.isInfoEnabled())
@@ -2074,13 +2088,15 @@ public class GridCacheDatabaseSharedManager extends 
IgniteCacheDatabaseSharedMan
 
         long lastArchivedSegment = cctx.wal().lastArchivedSegment();
 
-        RestoreBinaryState restoreBinaryState = new RestoreBinaryState(status, 
lastArchivedSegment, cacheGroupsPredicate);
+        WALIterator it = cctx.wal().replay(status.endPtr, recordTypePredicate);
+
+        RestoreBinaryState restoreBinaryState = new RestoreBinaryState(status, 
it, lastArchivedSegment, cacheGroupsPredicate);
 
         int applied = 0;
 
-        try (WALIterator it = cctx.wal().replay(status.endPtr)) {
+        try {
             while (it.hasNextX()) {
-                WALRecord rec = restoreBinaryState.next(it);
+                WALRecord rec = restoreBinaryState.next();
 
                 if (rec == null)
                     break;
@@ -2194,11 +2210,14 @@ public class GridCacheDatabaseSharedManager extends 
IgniteCacheDatabaseSharedMan
                 }
             }
         }
+        finally {
+            it.close();
+        }
 
         if (!finalizeState)
             return null;
 
-        FileWALPointer lastReadPtr = 
restoreBinaryState.lastReadRecordPointer();
+        FileWALPointer lastReadPtr = 
restoreBinaryState.lastReadRecordPointer().orElse(null);
 
         if (status.needRestoreMemory()) {
             if (restoreBinaryState.needApplyBinaryUpdate())
@@ -2215,9 +2234,6 @@ public class GridCacheDatabaseSharedManager extends 
IgniteCacheDatabaseSharedMan
 
         cpHistory.initialize(retreiveHistory());
 
-        // Move pointer position to the end of last read record.
-        restoreBinaryState.lastRead = lastReadPtr != null ? lastReadPtr.next() 
: lastReadPtr;
-
         return restoreBinaryState;
     }
 
@@ -2275,7 +2291,7 @@ public class GridCacheDatabaseSharedManager extends 
IgniteCacheDatabaseSharedMan
 
                 switch (rec.type()) {
                     case MVCC_DATA_RECORD:
-                        case DATA_RECORD:
+                    case DATA_RECORD:
                         checkpointReadLock();
 
                         try {
@@ -2311,24 +2327,24 @@ public class GridCacheDatabaseSharedManager extends 
IgniteCacheDatabaseSharedMan
 
                         break;
 
-                        case MVCC_TX_RECORD:
-                            checkpointReadLock();
+                    case MVCC_TX_RECORD:
+                        checkpointReadLock();
 
-                            try {
-                                MvccTxRecord txRecord = (MvccTxRecord)rec;
+                        try {
+                            MvccTxRecord txRecord = (MvccTxRecord)rec;
 
-                                byte txState = 
convertToTxState(txRecord.state());
+                            byte txState = convertToTxState(txRecord.state());
 
-                                
cctx.coordinators().updateState(txRecord.mvccVersion(), txState, false);
-                            }
-                            catch (IgniteCheckedException e) {
-                                throw new IgniteException(e);
-                            }
-                            finally {
-                                checkpointReadUnlock();
-                            }
+                            
cctx.coordinators().updateState(txRecord.mvccVersion(), txState, false);
+                        }
+                        catch (IgniteCheckedException e) {
+                            throw new IgniteException(e);
+                        }
+                        finally {
+                            checkpointReadUnlock();
+                        }
 
-                            break;
+                        break;
 
                     default:
                         // Skip other records.
@@ -2344,9 +2360,9 @@ public class GridCacheDatabaseSharedManager extends 
IgniteCacheDatabaseSharedMan
      */
     private RestoreLogicalState applyLogicalUpdates(
         CheckpointStatus status,
-        Predicate<Integer> cacheGroupsPredicate,
-        boolean skipFieldLookup,
-        boolean metaStoreOnly
+        IgnitePredicate<Integer> cacheGroupsPredicate,
+        IgniteBiPredicate<WALRecord.RecordType, WALPointer> 
recordTypePredicate,
+        boolean skipFieldLookup
     ) throws IgniteCheckedException {
         if (log.isInfoEnabled())
             log.info("Applying lost cache updates since last checkpoint record 
[lastMarked="
@@ -2355,17 +2371,19 @@ public class GridCacheDatabaseSharedManager extends 
IgniteCacheDatabaseSharedMan
         if (skipFieldLookup)
             cctx.kernalContext().query().skipFieldLookup(true);
 
-        long lastArchivedSegment = cctx.wal().lastArchivedSegment();
-
-        RestoreLogicalState restoreLogicalState = new 
RestoreLogicalState(lastArchivedSegment, cacheGroupsPredicate);
-
         long start = U.currentTimeMillis();
 
         int applied = 0;
 
-        try (WALIterator it = cctx.wal().replay(status.startPtr)) {
+        long lastArchivedSegment = cctx.wal().lastArchivedSegment();
+
+        WALIterator it = cctx.wal().replay(status.startPtr, 
recordTypePredicate);
+
+        RestoreLogicalState restoreLogicalState = new RestoreLogicalState(it, 
lastArchivedSegment, cacheGroupsPredicate);
+
+        try {
             while (it.hasNextX()) {
-                WALRecord rec = restoreLogicalState.next(it);
+                WALRecord rec = restoreLogicalState.next();
 
                 if (rec == null)
                     break;
@@ -2373,6 +2391,7 @@ public class GridCacheDatabaseSharedManager extends 
IgniteCacheDatabaseSharedMan
                 switch (rec.type()) {
                     case MVCC_DATA_RECORD:
                     case DATA_RECORD:
+                    case ENCRYPTED_DATA_RECORD:
                         DataRecord dataRec = (DataRecord)rec;
 
                         for (DataEntry dataEntry : dataRec.writeEntries()) {
@@ -2393,6 +2412,15 @@ public class GridCacheDatabaseSharedManager extends 
IgniteCacheDatabaseSharedMan
 
                         break;
 
+                    case MVCC_TX_RECORD:
+                        MvccTxRecord txRecord = (MvccTxRecord)rec;
+
+                        byte txState = convertToTxState(txRecord.state());
+
+                        
cctx.coordinators().updateState(txRecord.mvccVersion(), txState, false);
+
+                        break;
+
                     case PART_META_UPDATE_STATE:
                         PartitionMetaStateRecord metaStateRecord = 
(PartitionMetaStateRecord)rec;
 
@@ -2418,6 +2446,7 @@ public class GridCacheDatabaseSharedManager extends 
IgniteCacheDatabaseSharedMan
                     case META_PAGE_UPDATE_NEXT_SNAPSHOT_ID:
                     case META_PAGE_UPDATE_LAST_SUCCESSFUL_SNAPSHOT_ID:
                     case META_PAGE_UPDATE_LAST_SUCCESSFUL_FULL_SNAPSHOT_ID:
+                    case META_PAGE_UPDATE_LAST_ALLOCATED_INDEX:
                         PageDeltaRecord rec0 = (PageDeltaRecord) rec;
 
                         PageMemoryEx pageMem = 
getPageMemoryForCacheGroup(rec0.groupId());
@@ -2443,24 +2472,14 @@ public class GridCacheDatabaseSharedManager extends 
IgniteCacheDatabaseSharedMan
 
                         break;
 
-                    case MVCC_TX_RECORD:
-                        if (metaStoreOnly)
-                            continue;
-
-                        MvccTxRecord txRecord = (MvccTxRecord)rec;
-
-                        byte txState = convertToTxState(txRecord.state());
-
-                        
cctx.coordinators().updateState(txRecord.mvccVersion(), txState, false);
-
-                        break;
-
                     default:
                         // Skip other records.
                 }
             }
         }
         finally {
+            it.close();
+
             if (skipFieldLookup)
                 cctx.kernalContext().query().skipFieldLookup(false);
         }
@@ -4660,6 +4679,18 @@ public class GridCacheDatabaseSharedManager extends 
IgniteCacheDatabaseSharedMan
         assert pageStore != null : "Persistent cache should have initialize 
page store manager.";
 
         for (int p = 0; p < grp.affinity().partitions(); p++) {
+            if (grp.topology().localPartition(p) != null) {
+                GridDhtLocalPartition part = grp.topology().localPartition(p);
+
+                log.info("Partition [grp=" + grp.cacheOrGroupName()
+                    + ", id=" + p
+                    + ", state=" + part.state()
+                    + ", counter=" + part.updateCounter()
+                    + ", size=" + part.fullSize() + "]");
+
+                continue;
+            }
+
             if (!pageStore.exists(grp.groupId(), p))
                 continue;
 
@@ -4707,13 +4738,16 @@ public class GridCacheDatabaseSharedManager extends 
IgniteCacheDatabaseSharedMan
      * Recovery lifecycle for read-write metastorage.
      */
     private class MetastorageRecoveryLifecycle implements 
DatabaseLifecycleListener {
+        /** {@inheritDoc} */
         @Override public void 
beforeBinaryMemoryRestore(IgniteCacheDatabaseSharedManager mgr) throws 
IgniteCheckedException {
             cctx.pageStore().initializeForMetastorage();
         }
 
+        /** {@inheritDoc} */
         @Override public void afterBinaryMemoryRestore(
             IgniteCacheDatabaseSharedManager mgr,
-            RestoreBinaryState restoreState) throws IgniteCheckedException {
+            RestoreBinaryState restoreState
+        ) throws IgniteCheckedException {
             assert metaStorage == null;
 
             metaStorage = createMetastorage(false);
@@ -4721,44 +4755,84 @@ public class GridCacheDatabaseSharedManager extends 
IgniteCacheDatabaseSharedMan
     }
 
     /**
-     * Abstract class for create restore context.
+     * @return Cache group predicate that passes only Metastorage cache group 
id.
+     */
+    private IgnitePredicate<Integer> onlyMetastorageGroup() {
+        return groupId -> MetaStorage.METASTORAGE_CACHE_ID == groupId;
+    }
+
+    /**
+     * @return Cache group predicate that passes only cache groups with 
enabled WAL.
+     */
+    private IgnitePredicate<Integer> groupsWithEnabledWal() {
+        return groupId -> !initiallyGlobalWalDisabledGrps.contains(groupId)
+            && !initiallyLocalWalDisabledGrps.contains(groupId);
+    }
+
+    /**
+     * @return WAL records predicate that passes only Metastorage data records.
+     */
+    private IgniteBiPredicate<WALRecord.RecordType, WALPointer> 
onlyMetastorageRecords() {
+        return (type, ptr) -> type == METASTORE_DATA_RECORD;
+    }
+
+    /**
+     * @return WAL records predicate that passes only physical and mixed WAL 
records.
+     */
+    private IgniteBiPredicate<WALRecord.RecordType, WALPointer> 
physicalRecords() {
+        return (type, ptr) -> type.purpose() == 
WALRecord.RecordPurpose.PHYSICAL
+            || type.purpose() == WALRecord.RecordPurpose.MIXED;
+    }
+
+    /**
+     * @return WAL records predicate that passes only logical and mixed WAL 
records.
+     */
+    private IgniteBiPredicate<WALRecord.RecordType, WALPointer> 
logicalRecords() {
+        return (type, ptr) -> type.purpose() == WALRecord.RecordPurpose.LOGICAL
+            || type.purpose() == WALRecord.RecordPurpose.MIXED;
+    }
+
+    /**
+     * Abstract class to create restore context.
      */
     private abstract class RestoreStateContext {
         /** Last archived segment. */
         protected final long lastArchivedSegment;
 
-        /** Last read record WAL pointer. */
-        protected FileWALPointer lastRead;
+        /** WAL iterator. */
+        private final WALIterator iterator;
 
         /** Only {@link WalRecordCacheGroupAware} records satisfied this 
predicate will be applied. */
-        private final Predicate<Integer> cacheGroupPredicate;
-
-        /** Set to {@code true} if data records should be skipped. */
-        private final boolean skipDataRecords;
+        private final IgnitePredicate<Integer> cacheGroupPredicate;
 
         /**
+         * @param iterator WAL iterator.
          * @param lastArchivedSegment Last archived segment index.
+         * @param cacheGroupPredicate Cache groups predicate.
          */
-        public RestoreStateContext(long lastArchivedSegment, 
Predicate<Integer> cacheGroupPredicate, boolean skipDataRecords) {
+        protected RestoreStateContext(
+            WALIterator iterator,
+            long lastArchivedSegment,
+            IgnitePredicate<Integer> cacheGroupPredicate
+        ) {
+            this.iterator = iterator;
             this.lastArchivedSegment = lastArchivedSegment;
             this.cacheGroupPredicate = cacheGroupPredicate;
-            this.skipDataRecords = skipDataRecords;
         }
 
         /**
          * Advance iterator to the next record.
          *
-         * @param it WAL iterator.
          * @return WALRecord entry.
          * @throws IgniteCheckedException If CRC check fail during binary 
recovery state or another exception occurring.
          */
-        public WALRecord next(WALIterator it) throws IgniteCheckedException {
+        public WALRecord next() throws IgniteCheckedException {
             try {
                 for (;;) {
-                    if (!it.hasNextX())
+                    if (!iterator.hasNextX())
                         return null;
 
-                    IgniteBiTuple<WALPointer, WALRecord> tup = it.nextX();
+                    IgniteBiTuple<WALPointer, WALRecord> tup = 
iterator.nextX();
 
                     if (tup == null)
                         return null;
@@ -4767,45 +4841,19 @@ public class GridCacheDatabaseSharedManager extends 
IgniteCacheDatabaseSharedMan
 
                     WALPointer ptr = tup.get1();
 
-                    lastRead = (FileWALPointer)ptr;
-
                     rec.position(ptr);
 
-                    // Filter out records.
+                    // Filter out records by group id.
                     if (rec instanceof WalRecordCacheGroupAware) {
-                        WalRecordCacheGroupAware groupAwareRecord = 
(WalRecordCacheGroupAware) rec;
+                        WalRecordCacheGroupAware grpAwareRecord = 
(WalRecordCacheGroupAware) rec;
 
-                        if 
(!cacheGroupPredicate.test(groupAwareRecord.groupId()))
+                        if 
(!cacheGroupPredicate.apply(grpAwareRecord.groupId()))
                             continue;
                     }
 
-                    switch (rec.type()) {
-                        case METASTORE_DATA_RECORD:
-                        case MVCC_DATA_RECORD:
-                        case DATA_RECORD:
-                            if (skipDataRecords)
-                                continue;
-
-                            if (rec instanceof DataRecord) {
-                                DataRecord dataRecord = (DataRecord) rec;
-
-                                // Filter data entries by group id.
-                                List<DataEntry> filteredEntries = 
dataRecord.writeEntries().stream()
-                                        .filter(entry -> {
-                                            int cacheId = entry.cacheId();
-
-                                            return cctx.cacheContext(cacheId) 
!= null && cacheGroupPredicate.test(cctx.cacheContext(cacheId).groupId());
-                                        })
-                                        .collect(Collectors.toList());
-
-                                dataRecord.setWriteEntries(filteredEntries);
-                            }
-
-                            break;
-
-                        default:
-                            break;
-                    }
+                    // Filter out data entries by group id.
+                    if (rec instanceof DataRecord)
+                        rec = filterEntriesByGroupId((DataRecord) rec);
 
                     return rec;
                 }
@@ -4827,21 +4875,37 @@ public class GridCacheDatabaseSharedManager extends 
IgniteCacheDatabaseSharedMan
         }
 
         /**
+         * Filters out data entries from the given data record that do not satisfy 
{@link #cacheGroupPredicate}.
+         *
+         * @param record Original data record.
+         * @return Data record with filtered data entries.
+         */
+        private DataRecord filterEntriesByGroupId(DataRecord record) {
+            List<DataEntry> filteredEntries = record.writeEntries().stream()
+                .filter(entry -> {
+                    int cacheId = entry.cacheId();
+
+                    return cctx.cacheContext(cacheId) != null && 
cacheGroupPredicate.apply(cctx.cacheContext(cacheId).groupId());
+                })
+                .collect(Collectors.toList());
+
+            return record.setWriteEntries(filteredEntries);
+        }
+
+        /**
          *
          * @return Last read WAL record pointer.
          */
-        public FileWALPointer lastReadRecordPointer() {
-            return lastRead;
+        public Optional<FileWALPointer> lastReadRecordPointer() {
+            return iterator.lastRead().map(ptr -> (FileWALPointer)ptr);
         }
 
         /**
          *
          * @return Flag indicates need throws CRC exception or not.
          */
-        public boolean throwsCRCError(){
-            FileWALPointer lastReadPtr = lastRead;
-
-            return lastReadPtr != null && lastReadPtr.index() <= 
lastArchivedSegment;
+        public boolean throwsCRCError() {
+            return lastReadRecordPointer().filter(ptr -> ptr.index() <= 
lastArchivedSegment).isPresent();
         }
     }
 
@@ -4857,10 +4921,17 @@ public class GridCacheDatabaseSharedManager extends 
IgniteCacheDatabaseSharedMan
 
         /**
          * @param status Checkpoint status.
+         * @param iterator WAL iterator.
          * @param lastArchivedSegment Last archived segment index.
+         * @param cacheGroupsPredicate Cache groups predicate.
          */
-        public RestoreBinaryState(CheckpointStatus status, long 
lastArchivedSegment, Predicate<Integer> cacheGroupsPredicate) {
-            super(lastArchivedSegment, cacheGroupsPredicate, true);
+        public RestoreBinaryState(
+            CheckpointStatus status,
+            WALIterator iterator,
+            long lastArchivedSegment,
+            IgnitePredicate<Integer> cacheGroupsPredicate
+        ) {
+            super(iterator, lastArchivedSegment, cacheGroupsPredicate);
 
             this.status = status;
             this.needApplyBinaryUpdates = status.needRestoreMemory();
@@ -4869,12 +4940,11 @@ public class GridCacheDatabaseSharedManager extends 
IgniteCacheDatabaseSharedMan
         /**
          * Advance iterator to the next record.
          *
-         * @param it WAL iterator.
          * @return WALRecord entry.
          * @throws IgniteCheckedException If CRC check fail during binary 
recovery state or another exception occurring.
          */
-        @Override public WALRecord next(WALIterator it) throws 
IgniteCheckedException {
-            WALRecord rec = super.next(it);
+        @Override public WALRecord next() throws IgniteCheckedException {
+            WALRecord rec = super.next();
 
             if (rec == null)
                 return null;
@@ -4911,7 +4981,7 @@ public class GridCacheDatabaseSharedManager extends 
IgniteCacheDatabaseSharedMan
          */
         @Override public boolean throwsCRCError() {
             log.info("Throws CRC error check [needApplyBinaryUpdates=" + 
needApplyBinaryUpdates +
-                ", lastArchivedSegment=" + lastArchivedSegment + ", lastRead=" 
+ lastRead + ']');
+                ", lastArchivedSegment=" + lastArchivedSegment + ", lastRead=" 
+ lastReadRecordPointer() + ']');
 
             if (needApplyBinaryUpdates)
                 return true;
@@ -4930,8 +5000,8 @@ public class GridCacheDatabaseSharedManager extends 
IgniteCacheDatabaseSharedMan
         /**
          * @param lastArchivedSegment Last archived segment index.
          */
-        public RestoreLogicalState(long lastArchivedSegment, 
Predicate<Integer> cacheGroupsPredicate) {
-            super(lastArchivedSegment, cacheGroupsPredicate, false);
+        public RestoreLogicalState(WALIterator iterator, long 
lastArchivedSegment, IgnitePredicate<Integer> cacheGroupsPredicate) {
+            super(iterator, lastArchivedSegment, cacheGroupsPredicate);
         }
 
         /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/27786a05/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
index 04255c0..f78428d 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
@@ -194,6 +194,17 @@ public class GridCacheOffheapManager extends 
IgniteCacheOffheapManagerImpl imple
             }
         }
 
+        syncMetadata(execSvc, ctx, needSnapshot);
+    }
+
+    /**
+     * Syncs and saves meta-information of all data structures to page memory.
+     *
+     * @param execSvc Executor service to run save process
+     * @param ctx Checkpoint listener context.
+     * @throws IgniteCheckedException If failed.
+     */
+    private void syncMetadata(Executor execSvc, Context ctx, boolean 
needSnapshot) throws IgniteCheckedException {
         if (execSvc == null) {
             reuseList.saveMetadata();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/27786a05/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java
index 3cbe577..f37b154 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java
@@ -22,6 +22,7 @@ import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.nio.ByteOrder;
+import java.util.Optional;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.internal.pagemem.wal.WALIterator;
@@ -92,6 +93,9 @@ public abstract class AbstractWalRecordsIterator
     /** Factory to provide I/O interfaces for read primitives with files. */
     private final SegmentFileInputFactory segmentFileInputFactory;
 
+    /** Position of last read valid record. */
+    private WALPointer lastRead;
+
     /**
      * @param log Logger.
      * @param sharedCtx Shared context.
@@ -154,6 +158,8 @@ public abstract class AbstractWalRecordsIterator
                 curRec = advanceRecord(currWalSegment);
 
                 if (curRec != null) {
+                    lastRead = curRec.get1();
+
                     if (curRec.get2().type() == null)
                         continue; // Record was skipped by filter of current 
serializer, should read next record.
 
@@ -183,6 +189,11 @@ public abstract class AbstractWalRecordsIterator
         }
     }
 
+    /** {@inheritDoc} */
+    @Override public Optional<WALPointer> lastRead() {
+        return Optional.ofNullable(lastRead);
+    }
+
     /**
      * @param tailReachedException Tail reached exception.
      * @param currWalSegment Current WAL segment read handler.

http://git-wip-us.apache.org/repos/asf/ignite/blob/27786a05/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
index fad1ec1..addbbd3 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
@@ -114,6 +114,7 @@ import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.internal.util.worker.GridWorker;
+import org.apache.ignite.lang.IgniteBiPredicate;
 import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.lang.IgnitePredicate;
@@ -839,6 +840,14 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
 
     /** {@inheritDoc} */
     @Override public WALIterator replay(WALPointer start) throws 
IgniteCheckedException, StorageException {
+        return replay(start, null);
+    }
+
+    /** {@inheritDoc} */
+    @Override public WALIterator replay(
+        WALPointer start,
+        @Nullable IgniteBiPredicate<WALRecord.RecordType, WALPointer> 
recordDeserializeFilter
+    ) throws IgniteCheckedException, StorageException {
         assert start == null || start instanceof FileWALPointer : "Invalid 
start pointer: " + start;
 
         FileWriteHandle hnd = currentHandle();
@@ -855,7 +864,7 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
             (FileWALPointer)start,
             end,
             dsCfg,
-            new RecordSerializerFactoryImpl(cctx),
+            new 
RecordSerializerFactoryImpl(cctx).recordDeserializeFilter(recordDeserializeFilter),
             ioFactory,
             archiver,
             decompressor,

http://git-wip-us.apache.org/repos/asf/ignite/blob/27786a05/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordSerializerFactoryImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordSerializerFactoryImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordSerializerFactoryImpl.java
index c149817..96b78e6 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordSerializerFactoryImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordSerializerFactoryImpl.java
@@ -22,6 +22,7 @@ import 
org.apache.ignite.internal.pagemem.wal.record.MarshalledRecord;
 import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.lang.IgniteBiPredicate;
+import org.jetbrains.annotations.Nullable;
 
 /**
  *
@@ -34,7 +35,7 @@ public class RecordSerializerFactoryImpl implements 
RecordSerializerFactory {
     private boolean needWritePointer;
 
     /** Read record filter. */
-    private IgniteBiPredicate<WALRecord.RecordType, WALPointer> 
recordDeserializeFilter;
+    private @Nullable IgniteBiPredicate<WALRecord.RecordType, WALPointer> 
recordDeserializeFilter;
 
     /**
      * Marshalled mode flag.
@@ -56,7 +57,7 @@ public class RecordSerializerFactoryImpl implements 
RecordSerializerFactory {
      */
     public RecordSerializerFactoryImpl(
         GridCacheSharedContext cctx,
-        IgniteBiPredicate<WALRecord.RecordType, WALPointer> readTypeFilter
+        @Nullable IgniteBiPredicate<WALRecord.RecordType, WALPointer> 
readTypeFilter
     ) {
         this.cctx = cctx;
         this.recordDeserializeFilter = readTypeFilter;
@@ -114,7 +115,7 @@ public class RecordSerializerFactoryImpl implements 
RecordSerializerFactory {
 
     /** {@inheritDoc} */
     @Override public RecordSerializerFactoryImpl recordDeserializeFilter(
-        IgniteBiPredicate<WALRecord.RecordType, WALPointer> readTypeFilter
+        @Nullable IgniteBiPredicate<WALRecord.RecordType, WALPointer> 
readTypeFilter
     ) {
         this.recordDeserializeFilter = readTypeFilter;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/27786a05/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV1Serializer.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV1Serializer.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV1Serializer.java
index ee5a1e2..5b36d9e 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV1Serializer.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV1Serializer.java
@@ -144,7 +144,8 @@ public class RecordV1Serializer implements RecordSerializer 
{
 
             rec.position(ptr);
 
-            if (recordFilter != null && !recordFilter.apply(rec.type(), ptr))
+            if (recType.purpose() != WALRecord.RecordPurpose.INTERNAL
+                && recordFilter != null && !recordFilter.apply(rec.type(), 
ptr))
                 return FilteredRecord.INSTANCE;
             else if (marshalledMode) {
                 ByteBuffer buf = heapTlb.get();

http://git-wip-us.apache.org/repos/asf/ignite/blob/27786a05/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV2Serializer.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV2Serializer.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV2Serializer.java
index d27a331..0d78d08 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV2Serializer.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV2Serializer.java
@@ -122,7 +122,8 @@ public class RecordV2Serializer implements RecordSerializer 
{
                     ", expected pointer [idx=" + exp.index() + ", offset=" + 
exp.fileOffset() + "]");
             }
 
-            if (recordFilter != null && !recordFilter.apply(recType, ptr)) {
+            if (recType.purpose() != WALRecord.RecordPurpose.INTERNAL
+                && recordFilter != null && !recordFilter.apply(recType, ptr)) {
                 int toSkip = ptr.length() - REC_TYPE_SIZE - 
FILE_WAL_POINTER_SIZE - CRC_SIZE;
 
                 assert toSkip >= 0 : "Too small saved record length: " + ptr;

http://git-wip-us.apache.org/repos/asf/ignite/blob/27786a05/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgniteLogicalRecoveryTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgniteLogicalRecoveryTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgniteLogicalRecoveryTest.java
index c629420..b919e41 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgniteLogicalRecoveryTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgniteLogicalRecoveryTest.java
@@ -26,6 +26,7 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.function.Predicate;
@@ -49,6 +50,8 @@ import org.apache.ignite.failure.StopNodeFailureHandler;
 import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheGroupIdMessage;
 import org.apache.ignite.internal.processors.cache.GridCacheUtils;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage;
 import 
org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
@@ -261,7 +264,7 @@ public class IgniteLogicalRecoveryTest extends 
GridCommonAbstractTest {
      */
     @Test
     public void testRecoveryWithMvccCaches() throws Exception {
-        fail("https://issues.apache.org/jira/browse/IGNITE-10052");
+        fail("https://issues.apache.org/jira/browse/IGNITE-10582");
 
         List<CacheConfiguration> dynamicCaches = Lists.newArrayList(
             cacheConfiguration(DYNAMIC_CACHE_PREFIX + 0, 
CacheMode.PARTITIONED, CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT),
@@ -409,13 +412,21 @@ public class IgniteLogicalRecoveryTest extends 
GridCommonAbstractTest {
 
         List<Ignite> nodes = G.allGrids();
 
-        for (Ignite node : nodes) {
+        for (final Ignite node : nodes) {
             TestRecordingCommunicationSpi spi = 
TestRecordingCommunicationSpi.spi(node);
 
+            Set<Integer> mvccCaches = ((IgniteEx) 
node).context().cache().cacheGroups().stream()
+                .flatMap(group -> group.caches().stream())
+                .filter(cache -> cache.config().getAtomicityMode() == 
CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT)
+                .map(GridCacheContext::groupId)
+                .collect(Collectors.toSet());
+
             List<Integer> rebalancedGroups = 
spi.recordedMessages(true).stream()
                 .map(msg -> (GridDhtPartitionDemandMessage) msg)
-                .map(msg -> msg.groupId())
+                .map(GridCacheGroupIdMessage::groupId)
                 .filter(grpId -> grpId != sysCacheGroupId)
+                //TODO: remove the following filter once failover for MVCC is 
fixed.
+                .filter(grpId -> !mvccCaches.contains(grpId))
                 .distinct()
                 .collect(Collectors.toList());
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/27786a05/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/NoOpWALManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/NoOpWALManager.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/NoOpWALManager.java
index ea3ed2f..8d854bb 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/NoOpWALManager.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/NoOpWALManager.java
@@ -26,7 +26,9 @@ import 
org.apache.ignite.internal.pagemem.wal.record.RolloverType;
 import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import 
org.apache.ignite.internal.processors.cache.persistence.StorageException;
+import org.apache.ignite.lang.IgniteBiPredicate;
 import org.apache.ignite.lang.IgniteFuture;
+import org.jetbrains.annotations.Nullable;
 
 /**
  *
@@ -73,6 +75,11 @@ public class NoOpWALManager implements 
IgniteWriteAheadLogManager {
     }
 
     /** {@inheritDoc} */
+    @Override public WALIterator replay(WALPointer start, @Nullable 
IgniteBiPredicate<WALRecord.RecordType, WALPointer> recordDeserializeFilter) 
throws IgniteCheckedException, StorageException {
+        return null;
+    }
+
+    /** {@inheritDoc} */
     @Override public boolean reserve(WALPointer start) {
         return false;
     }

Reply via email to