This is an automated email from the ASF dual-hosted git repository.

nizhikov pushed a commit to branch cache_dumps
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/cache_dumps by this push:
     new 8018473c5f9 IGNITE-20472 Dump API implemented (#10953)
8018473c5f9 is described below

commit 8018473c5f918c8a9e7865b655e182c5226544e8
Author: Nikolay <[email protected]>
AuthorDate: Fri Oct 6 10:26:43 2023 +0300

    IGNITE-20472 Dump API implemented (#10953)
---
 .../java/org/apache/ignite/dump/DumpConsumer.java  |  83 ++++++
 .../persistence/snapshot => }/dump/DumpEntry.java  |  16 +-
 .../java/org/apache/ignite/dump/DumpReader.java    | 210 ++++++++++++++
 .../ignite/dump/DumpReaderConfiguration.java       | 112 ++++++++
 .../org/apache/ignite/internal/cdc/CdcMain.java    |  57 ++--
 .../org/apache/ignite/internal/cdc/CdcUtils.java   |  60 ++++
 .../pagemem/wal/record/UnwrapDataEntry.java        |  48 ++--
 .../snapshot/SnapshotPartitionsVerifyHandler.java  |  40 +--
 .../snapshot/dump/CreateDumpFutureTask.java        |  57 ++--
 .../cache/persistence/snapshot/dump/Dump.java      | 119 ++++++--
 .../snapshot/dump/DumpEntrySerializer.java         |  27 +-
 .../wal/reader/IgniteWalIteratorFactory.java       |  13 +-
 .../wal/reader/StandaloneGridKernalContext.java    |  18 ++
 .../snapshot/dump/AbstractCacheDumpTest.java       | 306 +++++++++++++++------
 .../snapshot/dump/IgniteCacheDumpSelfTest.java     |   9 +-
 .../dump/IgniteConcurrentCacheDumpTest.java        |   4 +-
 .../junits/GridTestKernalContext.java              |   5 +-
 .../ignite/internal/dump/DumpCacheConfigTest.java  | 108 +++++---
 18 files changed, 1031 insertions(+), 261 deletions(-)

diff --git 
a/modules/core/src/main/java/org/apache/ignite/dump/DumpConsumer.java 
b/modules/core/src/main/java/org/apache/ignite/dump/DumpConsumer.java
new file mode 100644
index 00000000000..aec3a02c997
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/dump/DumpConsumer.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.dump;
+
+import java.util.Iterator;
+import org.apache.ignite.binary.BinaryType;
+import org.apache.ignite.cdc.TypeMapping;
+import org.apache.ignite.internal.processors.cache.StoredCacheData;
+import 
org.apache.ignite.internal.processors.cache.persistence.snapshot.dump.Dump;
+import org.apache.ignite.lang.IgniteExperimental;
+
+/**
+ * Consumer of {@link Dump}.
+ * This consumer will receive all {@link DumpEntry} stored in cache dump 
during {@code IgniteDumpReader} application invocation.
+ * The lifecycle of the consumer is the following:
+ * <ul>
+ *     <li>Start of the consumer {@link #start()}.</li>
+ *     <li>Stop of the consumer {@link #stop()}.</li>
+ * </ul>
+ *
+ */
+@IgniteExperimental
+public interface DumpConsumer {
+    /**
+     * Starts the consumer.
+     */
+    void start();
+
+    /**
+     * Handles type mappings.
+     * @param mappings Mappings iterator.
+     */
+    void onMappings(Iterator<TypeMapping> mappings);
+
+    /**
+     * Handles binary types.
+     * @param types Binary types iterator.
+     */
+    void onTypes(Iterator<BinaryType> types);
+
+    /**
+     * Handles cache configs.
+     * Note, there can be several copies of cache config in the dump.
+     * This can happen if dump contains data from several nodes.
+     * @param caches Stored cache data.
+     */
+    void onCacheConfigs(Iterator<StoredCacheData> caches);
+
+    /**
+     * Handles cache data.
+     * This method can be invoced by several threads concurrently.
+     * Note, there can be several copies of group partition in the dump.
+     * This can happen if dump contains data from several nodes.
+     * In this case callback will be invoked several time for the same pair of 
[grp, part] values.
+     *
+     * @param grp Group id.
+     * @param part Partition.
+     * @param data Cache data iterator.
+     * @see DumpReaderConfiguration#threadCount()
+     */
+    void onPartition(int grp, int part, Iterator<DumpEntry> data);
+
+    /**
+     * Stops the consumer.
+     * This method can be invoked only after {@link #start()}.
+     */
+    void stop();
+}
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/DumpEntry.java
 b/modules/core/src/main/java/org/apache/ignite/dump/DumpEntry.java
similarity index 71%
rename from 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/DumpEntry.java
rename to modules/core/src/main/java/org/apache/ignite/dump/DumpEntry.java
index 5054c97806c..ca34f3cbace 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/DumpEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/dump/DumpEntry.java
@@ -15,14 +15,20 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.processors.cache.persistence.snapshot.dump;
+package org.apache.ignite.dump;
 
-import org.apache.ignite.internal.processors.cache.CacheObject;
-import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import java.util.Iterator;
+import 
org.apache.ignite.internal.processors.cache.persistence.snapshot.dump.Dump;
+import org.apache.ignite.lang.IgniteExperimental;
 
 /**
  * Single cache entry from dump.
+ *
+ * @see Dump#iterator(String, int, int)
+ * @see DumpConsumer#onPartition(int, int, Iterator)
+ * @see org.apache.ignite.IgniteSnapshot#createDump(String)
  */
+@IgniteExperimental
 public interface DumpEntry {
     /** @return Cache id. */
     public int cacheId();
@@ -31,8 +37,8 @@ public interface DumpEntry {
     public long expireTime();
 
     /** @return Key. */
-    public KeyCacheObject key();
+    public Object key();
 
     /** @return Value. */
-    public CacheObject value();
+    public Object value();
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/dump/DumpReader.java 
b/modules/core/src/main/java/org/apache/ignite/dump/DumpReader.java
new file mode 100644
index 00000000000..37923316617
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/dump/DumpReader.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.dump;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.GridLoggerProxy;
+import org.apache.ignite.internal.binary.BinaryUtils;
+import org.apache.ignite.internal.cdc.CdcMain;
+import 
org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotMetadata;
+import 
org.apache.ignite.internal.processors.cache.persistence.snapshot.dump.Dump;
+import 
org.apache.ignite.internal.processors.cache.persistence.snapshot.dump.Dump.DumpedPartitionIterator;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static 
org.apache.ignite.configuration.DataStorageConfiguration.DFLT_MARSHALLER_PATH;
+import static org.apache.ignite.internal.IgniteKernal.NL;
+import static org.apache.ignite.internal.IgniteKernal.SITE;
+import static org.apache.ignite.internal.IgniteVersionUtils.ACK_VER_STR;
+import static org.apache.ignite.internal.IgniteVersionUtils.COPYRIGHT;
+
+/**
+ * Dump Reader application.
+ * The application runs independently of Ignite node process and provides the 
ability to the {@link DumpConsumer} to consume
+ * all data stored in cache dump ({@link Dump})
+ */
+public class DumpReader implements Runnable {
+    /** Configuration. */
+    private final DumpReaderConfiguration cfg;
+
+    /** Log. */
+    private final IgniteLogger log;
+
+    /**
+     * @param cfg Dump reader configuration.
+     * @param log Logger.
+     */
+    public DumpReader(DumpReaderConfiguration cfg, IgniteLogger log) {
+        this.cfg = cfg;
+        this.log = log.getLogger(DumpReader.class);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void run() {
+        ackAsciiLogo();
+
+        try (Dump dump = new Dump(cfg.dumpRoot(), cfg.keepBinary(), false, 
log)) {
+            DumpConsumer cnsmr = cfg.consumer();
+
+            cnsmr.start();
+
+            try {
+                File[] files = new File(cfg.dumpRoot(), 
DFLT_MARSHALLER_PATH).listFiles(BinaryUtils::notTmpFile);
+
+                if (files != null)
+                    cnsmr.onMappings(CdcMain.typeMappingIterator(files, tm -> 
true));
+
+                cnsmr.onTypes(dump.types());
+
+                Map<Integer, List<String>> grpToNodes = new HashMap<>();
+
+                for (SnapshotMetadata meta : dump.metadata()) {
+                    for (Integer grp : meta.cacheGroupIds())
+                        grpToNodes.computeIfAbsent(grp, key -> new 
ArrayList<>()).add(meta.folderName());
+                }
+
+                cnsmr.onCacheConfigs(grpToNodes.entrySet().stream()
+                    .flatMap(e -> dump.configs(F.first(e.getValue()), 
e.getKey()).stream())
+                    .iterator());
+
+                ExecutorService execSvc = cfg.threadCount() > 1 ? 
Executors.newFixedThreadPool(cfg.threadCount()) : null;
+
+                AtomicBoolean skip = new AtomicBoolean(false);
+
+                for (Map.Entry<Integer, List<String>> e : 
grpToNodes.entrySet()) {
+                    int grp = e.getKey();
+
+                    for (String node : e.getValue()) {
+                        for (int part : dump.partitions(node, grp)) {
+                            Runnable consumePart = () -> {
+                                if (skip.get()) {
+                                    if (log.isDebugEnabled()) {
+                                        log.debug("Skip partition due to 
previous error [node=" + node + ", grp=" + grp +
+                                            ", part=" + part + ']');
+                                    }
+
+                                    return;
+                                }
+
+                                try (DumpedPartitionIterator iter = 
dump.iterator(node, grp, part)) {
+                                    if (log.isDebugEnabled()) {
+                                        log.debug("Consuming partition [node=" 
+ node + ", grp=" + grp +
+                                            ", part=" + part + ']');
+                                    }
+
+                                    cnsmr.onPartition(grp, part, iter);
+                                }
+                                catch (Exception ex) {
+                                    skip.set(cfg.failFast());
+
+                                    log.error("Error consuming partition 
[node=" + node + ", grp=" + grp +
+                                        ", part=" + part + ']', ex);
+
+                                    throw new IgniteException(ex);
+                                }
+                            };
+
+                            if (cfg.threadCount() > 1)
+                                execSvc.submit(consumePart);
+                            else
+                                consumePart.run();
+                        }
+                    }
+                }
+
+                if (cfg.threadCount() > 1) {
+                    execSvc.shutdown();
+
+                    boolean res = 
execSvc.awaitTermination(cfg.timeout().toMillis(), MILLISECONDS);
+
+                    if (!res) {
+                        log.warning("Dump processing tasks not finished after 
timeout. Cancelling");
+
+                        execSvc.shutdownNow();
+                    }
+                }
+            }
+            finally {
+                cnsmr.stop();
+            }
+        }
+        catch (Exception e) {
+            throw new IgniteException(e);
+        }
+    }
+
+    /** */
+    private void ackAsciiLogo() {
+        String ver = "ver. " + ACK_VER_STR;
+
+        if (log.isInfoEnabled()) {
+            log.info(NL + NL +
+                ">>>    __________  ________________  ___  __  ____  ______    
___  _______   ___  _______" + NL +
+                ">>>   /  _/ ___/ |/ /  _/_  __/ __/ / _ \\/ / / /  |/  / _ \\ 
 / _ \\/ __/ _ | / _ \\/ __/ _ \\" + NL +
+                ">>>  _/ // (_ /    // /  / / / _/  / // / /_/ / /|_/ / ___/ / 
, _/ _// __ |/ // / _// , _/" + NL +
+                ">>> /___/\\___/_/|_/___/ /_/ /___/ /____/\\____/_/  /_/_/    
/_/|_/___/_/ |_/____/___/_/|_|" + NL +
+                ">>> " + NL +
+                ">>> " + ver + NL +
+                ">>> " + COPYRIGHT + NL +
+                ">>> " + NL +
+                ">>> Ignite documentation: " + "http://"; + SITE + NL +
+                ">>> ConsistentId: " + cfg.dumpRoot() + NL +
+                ">>> Consumer: " + U.toStringSafe(cfg.consumer())
+            );
+        }
+
+        if (log.isQuiet()) {
+            U.quiet(false,
+                "   __________  ________________  ___  __  ____  ______    ___ 
 _______   ___  _______",
+                "  /  _/ ___/ |/ /  _/_  __/ __/ / _ \\/ / / /  |/  / _ \\  / 
_ \\/ __/ _ | / _ \\/ __/ _ \\",
+                " _/ // (_ /    // /  / / / _/  / // / /_/ / /|_/ / ___/ / , 
_/ _// __ |/ // / _// , _/",
+                "/___/\\___/_/|_/___/ /_/ /___/ /____/\\____/_/  /_/_/    
/_/|_/___/_/ |_/____/___/_/|_|",
+                "",
+                ver,
+                COPYRIGHT,
+                "",
+                "Ignite documentation: " + "http://"; + SITE,
+                "Dump: " + cfg.dumpRoot(),
+                "Consumer: " + U.toStringSafe(cfg.consumer()),
+                "",
+                "Quiet mode.");
+
+            String fileName = log.fileName();
+
+            if (fileName != null)
+                U.quiet(false, "  ^-- Logging to file '" + fileName + '\'');
+
+            if (log instanceof GridLoggerProxy)
+                U.quiet(false, "  ^-- Logging by '" + 
((GridLoggerProxy)log).getLoggerInfo() + '\'');
+
+            U.quiet(false,
+                "  ^-- To see **FULL** console log here add 
-DIGNITE_QUIET=false or \"-v\" to ignite-cdc.{sh|bat}",
+                "");
+        }
+    }
+}
diff --git 
a/modules/core/src/main/java/org/apache/ignite/dump/DumpReaderConfiguration.java
 
b/modules/core/src/main/java/org/apache/ignite/dump/DumpReaderConfiguration.java
new file mode 100644
index 00000000000..c967136fe81
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/dump/DumpReaderConfiguration.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.dump;
+
+import java.io.File;
+import java.time.Duration;
+import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.lang.IgniteExperimental;
+
+/**
+ * Configuration class of {@link DumpReader}.
+ *
+ * @see DumpReader
+ * @see DumpConsumer
+ */
+@IgniteExperimental
+public class DumpReaderConfiguration {
+    /** Default timeout. */
+    public static final Duration DFLT_TIMEOUT = Duration.ofDays(7);
+
+    /** Default thread count. */
+    public static final int DFLT_THREAD_CNT = 1;
+
+    /** Root dump directory. */
+    private final File dir;
+
+    /** Dump consumer. */
+    private final DumpConsumer cnsmr;
+
+    /** Count of threads to consume dumped partitions. */
+    private final int thCnt;
+
+    /** Timeout of dump reader. 1 week, by default. */
+    private final Duration timeout;
+
+    /** Stop processing partitions if consumer fail to process one. */
+    private final boolean failFast;
+
+    /** If {@code true} then don't deserialize {@link KeyCacheObject} and 
{@link CacheObject}. */
+    private final boolean keepBinary;
+
+    /**
+     * @param dir Root dump directory.
+     * @param cnsmr Dump consumer.
+     */
+    public DumpReaderConfiguration(File dir, DumpConsumer cnsmr) {
+        this(dir, cnsmr, DFLT_THREAD_CNT, DFLT_TIMEOUT, true, true);
+    }
+
+    /**
+     * @param dir Root dump directory.
+     * @param cnsmr Dump consumer.
+     * @param thCnt Count of threads to consume dumped partitions.
+     * @param timeout Timeout of dump reader invocation.
+     * @param failFast Stop processing partitions if consumer fail to process 
one.
+     * @param keepBinary If {@code true} then don't deserialize {@link 
KeyCacheObject} and {@link CacheObject}.
+     */
+    public DumpReaderConfiguration(File dir, DumpConsumer cnsmr, int thCnt, 
Duration timeout, boolean failFast, boolean keepBinary) {
+        this.dir = dir;
+        this.cnsmr = cnsmr;
+        this.thCnt = thCnt;
+        this.timeout = timeout;
+        this.failFast = failFast;
+        this.keepBinary = keepBinary;
+    }
+
+    /** @return Root dump directiory. */
+    public File dumpRoot() {
+        return dir;
+    }
+
+    /** @return Dump consumer instance. */
+    public DumpConsumer consumer() {
+        return cnsmr;
+    }
+
+    /** @return Count of threads to consume dumped partitions. */
+    public int threadCount() {
+        return thCnt;
+    }
+
+    /** @return Timeout of dump reader invocation. */
+    public Duration timeout() {
+        return timeout;
+    }
+
+    /** @return {@code True} if stop processing after first consumer error. */
+    public boolean failFast() {
+        return failFast;
+    }
+
+    /** @return If {@code true} then don't deserialize {@link KeyCacheObject} 
and {@link CacheObject}. */
+    public boolean keepBinary() {
+        return keepBinary;
+    }
+}
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java 
b/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java
index 1e73e75be22..da0e791a905 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java
@@ -31,6 +31,7 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Predicate;
 import java.util.stream.Stream;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
@@ -44,7 +45,6 @@ import org.apache.ignite.cdc.TypeMapping;
 import org.apache.ignite.configuration.DataRegionConfiguration;
 import org.apache.ignite.configuration.DataStorageConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.internal.GridComponent;
 import org.apache.ignite.internal.GridLoggerProxy;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.MarshallerContextImpl;
@@ -82,6 +82,8 @@ import static 
org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType
 import static 
org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.DATA_RECORD_V2;
 import static 
org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager.WAL_SEGMENT_FILE_FILTER;
 import static 
org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager.segmentIndex;
+import static 
org.apache.ignite.internal.processors.cache.persistence.wal.reader.StandaloneGridKernalContext.closeAll;
+import static 
org.apache.ignite.internal.processors.cache.persistence.wal.reader.StandaloneGridKernalContext.startAll;
 import static 
org.apache.ignite.internal.processors.metric.impl.MetricUtils.metricName;
 
 /**
@@ -328,8 +330,7 @@ public class CdcMain implements Runnable {
                 }
             }
             finally {
-                for (GridComponent comp : kctx)
-                    comp.stop(false);
+                closeAll(kctx);
 
                 if (log.isInfoEnabled())
                     log.info("Ignite Change Data Capture Application 
stopped.");
@@ -370,8 +371,7 @@ public class CdcMain implements Runnable {
 
         kctx.resource().setSpringContext(ctx);
 
-        for (GridComponent comp : kctx)
-            comp.start();
+        startAll(kctx);
 
         mreg = kctx.metric().registry("cdc");
 
@@ -647,27 +647,10 @@ public class CdcMain implements Runnable {
             if (files == null)
                 return;
 
-            Iterator<TypeMapping> changedMappings = Arrays.stream(files)
-                .map(f -> {
-                    String fileName = f.getName();
-
-                    int typeId = BinaryUtils.mappedTypeId(fileName);
-                    byte platformId = 
BinaryUtils.mappedFilePlatformId(fileName);
-
-                    T2<Integer, Byte> state = new T2<>(typeId, platformId);
-
-                    if (mappingsState.contains(state))
-                        return null;
-
-                    mappingsState.add(state);
-
-                    return (TypeMapping)new TypeMappingImpl(
-                        typeId,
-                        BinaryUtils.readMapping(f),
-                        platformId == 0 ? PlatformType.JAVA : 
PlatformType.DOTNET);
-                })
-                .filter(Objects::nonNull)
-                .iterator();
+            Iterator<TypeMapping> changedMappings = typeMappingIterator(
+                files,
+                tm -> mappingsState.add(new T2<>(tm.typeId(), 
(byte)tm.platformType().ordinal()))
+            );
 
             if (!changedMappings.hasNext())
                 return;
@@ -867,4 +850,26 @@ public class CdcMain implements Runnable {
     public static String cdcInstanceName(String igniteInstanceName) {
         return "cdc-" + igniteInstanceName;
     }
+
+    /**
+     * @param files Mapping files.
+     * @return Type mapping iterator.
+     */
+    public static Iterator<TypeMapping> typeMappingIterator(File[] files, 
Predicate<TypeMapping> filter) {
+        return Arrays.stream(files)
+            .map(f -> {
+                String fileName = f.getName();
+
+                int typeId = BinaryUtils.mappedTypeId(fileName);
+                byte platformId = BinaryUtils.mappedFilePlatformId(fileName);
+
+                return (TypeMapping)new TypeMappingImpl(
+                    typeId,
+                    BinaryUtils.readMapping(f),
+                    platformId == 0 ? PlatformType.JAVA : PlatformType.DOTNET);
+            })
+            .filter(filter)
+            .filter(Objects::nonNull)
+            .iterator();
+    }
 }
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcUtils.java 
b/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcUtils.java
new file mode 100644
index 00000000000..c53fc7fed98
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcUtils.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.cdc;
+
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cdc.TypeMapping;
+import org.apache.ignite.internal.binary.BinaryContext;
+import org.apache.ignite.internal.binary.BinaryMetadata;
+
+/**
+ * Methods to reuse various CDC like utilities.
+ */
+public class CdcUtils {
+    /**
+     * Register {@code meta}.
+     *
+     * @param ctx Binary context.
+     * @param log Logger.
+     * @param meta Binary metadata to register.
+     */
+    public static void registerBinaryMeta(BinaryContext ctx, IgniteLogger log, 
BinaryMetadata meta) {
+        ctx.updateMetadata(meta.typeId(), meta, false);
+
+        if (log.isInfoEnabled())
+            log.info("BinaryMeta [meta=" + meta + ']');
+    }
+
+    /**
+     * Register {@code mapping}.
+     *
+     * @param ctx Binary context.
+     * @param log Logger.
+     * @param mapping Type mapping to register.
+     */
+    public static void registerMapping(BinaryContext ctx, IgniteLogger log, 
TypeMapping mapping) {
+        assert mapping.platformType().ordinal() <= Byte.MAX_VALUE;
+
+        byte platformType = (byte)mapping.platformType().ordinal();
+
+        ctx.registerUserClassName(mapping.typeId(), mapping.typeName(), false, 
false, platformType);
+
+        if (log.isInfoEnabled())
+            log.info("Mapping [mapping=" + mapping + ']');
+    }
+}
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/UnwrapDataEntry.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/UnwrapDataEntry.java
index 216e7332fbd..f69f579084f 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/UnwrapDataEntry.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/UnwrapDataEntry.java
@@ -71,18 +71,7 @@ public class UnwrapDataEntry extends DataEntry implements 
UnwrappedDataEntry {
     /** {@inheritDoc} */
     @Override public Object unwrappedKey() {
         try {
-            if (keepBinary && key instanceof BinaryObject)
-                return key;
-
-            Object unwrapped = key.value(cacheObjValCtx, false);
-
-            if (unwrapped instanceof BinaryObject) {
-                if (keepBinary)
-                    return unwrapped;
-                unwrapped = ((BinaryObject)unwrapped).deserialize();
-            }
-
-            return unwrapped;
+            return unwrapKey(key, keepBinary, cacheObjValCtx);
         }
         catch (Exception e) {
             cacheObjValCtx.kernalContext().log(UnwrapDataEntry.class)
@@ -95,13 +84,7 @@ public class UnwrapDataEntry extends DataEntry implements 
UnwrappedDataEntry {
     /** {@inheritDoc} */
     @Override public Object unwrappedValue() {
         try {
-            if (val == null)
-                return null;
-
-            if (keepBinary && val instanceof BinaryObject)
-                return val;
-
-            return val.value(cacheObjValCtx, false);
+            return unwrapValue(val, keepBinary, cacheObjValCtx);
         }
         catch (Exception e) {
             cacheObjValCtx.kernalContext().log(UnwrapDataEntry.class)
@@ -110,6 +93,33 @@ public class UnwrapDataEntry extends DataEntry implements 
UnwrappedDataEntry {
         }
     }
 
+    /** */
+    public static Object unwrapKey(KeyCacheObject key, boolean keepBinary, 
CacheObjectValueContext cacheObjValCtx) {
+        if (keepBinary && key instanceof BinaryObject)
+            return key;
+
+        Object unwrapped = key.value(cacheObjValCtx, false);
+
+        if (unwrapped instanceof BinaryObject) {
+            if (keepBinary)
+                return unwrapped;
+            unwrapped = ((BinaryObject)unwrapped).deserialize();
+        }
+
+        return unwrapped;
+    }
+
+    /** */
+    public static Object unwrapValue(CacheObject val, boolean keepBinary, 
CacheObjectValueContext cacheObjValCtx) {
+        if (val == null)
+            return null;
+
+        if (keepBinary && val instanceof BinaryObject)
+            return val;
+
+        return val.value(cacheObjValCtx, false);
+    }
+
     /** {@inheritDoc} */
     @Override public String toString() {
         SB sb = new SB();
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyHandler.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyHandler.java
index 02be48a5620..c36917ee023 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyHandler.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyHandler.java
@@ -38,7 +38,7 @@ import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.internal.GridComponent;
+import org.apache.ignite.dump.DumpEntry;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.management.cache.IdleVerifyResultV2;
 import org.apache.ignite.internal.management.cache.PartitionKeyV2;
@@ -46,14 +46,15 @@ import 
org.apache.ignite.internal.managers.encryption.EncryptionCacheKeyProvider
 import org.apache.ignite.internal.managers.encryption.GroupKey;
 import org.apache.ignite.internal.managers.encryption.GroupKeyEncrypted;
 import org.apache.ignite.internal.pagemem.store.PageStore;
+import org.apache.ignite.internal.processors.cache.CacheObject;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.StoredCacheData;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
 import 
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStore;
 import 
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager;
 import 
org.apache.ignite.internal.processors.cache.persistence.metastorage.MetaStorage;
 import 
org.apache.ignite.internal.processors.cache.persistence.snapshot.dump.Dump;
-import 
org.apache.ignite.internal.processors.cache.persistence.snapshot.dump.DumpEntry;
 import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusIO;
 import 
org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusMetaIO;
 import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
@@ -82,6 +83,8 @@ import static 
org.apache.ignite.internal.processors.cache.persistence.file.FileP
 import static 
org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId.getTypeByPartId;
 import static 
org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.databaseRelativePath;
 import static 
org.apache.ignite.internal.processors.cache.persistence.snapshot.dump.CreateDumpFutureTask.DUMP_FILE_EXT;
+import static 
org.apache.ignite.internal.processors.cache.persistence.wal.reader.StandaloneGridKernalContext.closeAll;
+import static 
org.apache.ignite.internal.processors.cache.persistence.wal.reader.StandaloneGridKernalContext.startAll;
 import static 
org.apache.ignite.internal.processors.cache.verify.IdleVerifyUtility.calculatePartitionHash;
 import static 
org.apache.ignite.internal.processors.cache.verify.IdleVerifyUtility.checkPartitionsPageCrcSum;
 
@@ -197,8 +200,7 @@ public class SnapshotPartitionsVerifyHandler implements 
SnapshotHandler<Map<Part
 
         EncryptionCacheKeyProvider snpEncrKeyProvider = new 
SnapshotEncryptionKeyProvider(cctx.kernalContext(), grpDirs);
 
-        for (GridComponent comp : snpCtx)
-            comp.start();
+        startAll(snpCtx);
 
         try {
             U.doInParallel(
@@ -311,8 +313,7 @@ public class SnapshotPartitionsVerifyHandler implements 
SnapshotHandler<Map<Part
             throw t;
         }
         finally {
-            for (GridComponent comp : snpCtx)
-                comp.stop(true);
+            closeAll(snpCtx);
         }
 
         return res;
@@ -356,23 +357,12 @@ public class SnapshotPartitionsVerifyHandler implements 
SnapshotHandler<Map<Part
     private Map<PartitionKeyV2, PartitionHashRecordV2> checkDumpFiles(
         SnapshotHandlerContext opCtx,
         Set<File> partFiles
-    ) throws IgniteCheckedException {
-        GridKernalContext snpCtx = 
cctx.snapshotMgr().createStandaloneKernalContext(
-            cctx.kernalContext().compress(),
-            opCtx.snapshotDirectory(),
-            opCtx.metadata().folderName()
-        );
-
-        for (GridComponent comp : snpCtx)
-            comp.start();
-
-        try {
-            Dump dump = new Dump(snpCtx, opCtx.snapshotDirectory());
-
+    ) {
+        try (Dump dump = new Dump(opCtx.snapshotDirectory(), true, true, log)) 
{
             Collection<PartitionHashRecordV2> partitionHashRecordV2s = 
U.doInParallel(
                 cctx.snapshotMgr().snapshotExecutorService(),
                 partFiles,
-                part -> caclucateDumpedPartitionHash(dump, 
cacheGroupName(part.getParentFile()), partId(part.getName()))
+                part -> calculateDumpedPartitionHash(dump, 
cacheGroupName(part.getParentFile()), partId(part.getName()))
             );
 
             return 
partitionHashRecordV2s.stream().collect(Collectors.toMap(PartitionHashRecordV2::partitionKey,
 r -> r));
@@ -380,16 +370,12 @@ public class SnapshotPartitionsVerifyHandler implements 
SnapshotHandler<Map<Part
         catch (Throwable t) {
             log.error("Error executing handler: ", t);
 
-            throw t;
-        }
-        finally {
-            for (GridComponent comp : snpCtx)
-                comp.stop(true);
+            throw new IgniteException(t);
         }
     }
 
     /** */
-    private PartitionHashRecordV2 caclucateDumpedPartitionHash(Dump dump, 
String grpName, int part) {
+    private PartitionHashRecordV2 calculateDumpedPartitionHash(Dump dump, 
String grpName, int part) {
         if (skipHash()) {
             return new PartitionHashRecordV2(
                 new PartitionKeyV2(CU.cacheId(grpName), part, grpName),
@@ -413,7 +399,7 @@ public class SnapshotPartitionsVerifyHandler implements 
SnapshotHandler<Map<Part
                 while (iter.hasNext()) {
                     DumpEntry e = iter.next();
 
-                    ctx.update(e.key(), e.value(), null);
+                    ctx.update((KeyCacheObject)e.key(), 
(CacheObject)e.value(), null);
 
                     size++;
                 }
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/CreateDumpFutureTask.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/CreateDumpFutureTask.java
index e1f98c68404..f3bc78e457a 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/CreateDumpFutureTask.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/CreateDumpFutureTask.java
@@ -36,10 +36,12 @@ import java.util.concurrent.atomic.LongAdder;
 import java.util.concurrent.locks.LockSupport;
 import java.util.stream.Collectors;
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteDataStreamer;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.dump.DumpEntry;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheGroupContext;
 import org.apache.ignite.internal.processors.cache.CacheObject;
@@ -217,6 +219,7 @@ public class CreateDumpFutureTask extends 
AbstractCreateSnapshotFutureTask imple
         long start = System.currentTimeMillis();
 
         AtomicLong entriesCnt = new AtomicLong();
+        AtomicLong writtenEntriesCnt = new AtomicLong();
         AtomicLong changedEntriesCnt = new AtomicLong();
 
         String name = cctx.cache().cacheGroup(grp).cacheOrGroupName();
@@ -228,6 +231,7 @@ public class CreateDumpFutureTask extends 
AbstractCreateSnapshotFutureTask imple
 
         List<CompletableFuture<Void>> futs = grpParts.stream().map(part -> 
runAsync(() -> {
             long entriesCnt0 = 0;
+            long writtenEntriesCnt0 = 0;
 
             try (PartitionDumpContext dumpCtx = dumpContext(grp, part)) {
                 try (GridCloseableIterator<CacheDataRow> rows = 
gctx.offheap().reservedIterator(part, dumpCtx.topVer)) {
@@ -242,11 +246,14 @@ public class CreateDumpFutureTask extends 
AbstractCreateSnapshotFutureTask imple
                         int cache = row.cacheId() == 0 ? grp : row.cacheId();
 
                         if (dumpCtx.writeForIterator(cache, row.expireTime(), 
row.key(), row.value(), row.version()))
-                            entriesCnt0++;
+                            writtenEntriesCnt0++;
+
+                        entriesCnt0++;
                     }
                 }
 
                 entriesCnt.addAndGet(entriesCnt0);
+                writtenEntriesCnt.addAndGet(writtenEntriesCnt0);
                 changedEntriesCnt.addAndGet(dumpCtx.changedCnt.intValue());
 
                 if (log.isDebugEnabled()) {
@@ -254,8 +261,9 @@ public class CreateDumpFutureTask extends 
AbstractCreateSnapshotFutureTask imple
                         ", id=" + grp +
                         ", part=" + part +
                         ", time=" + (System.currentTimeMillis() - start) +
-                        ", iteratorEntriesCount=" + entriesCnt +
-                        ", changedEntriesCount=" + changedEntriesCnt + ']');
+                        ", iterEntriesCnt=" + entriesCnt +
+                        ", writtenIterEntriesCnt=" + entriesCnt +
+                        ", changedEntriesCnt=" + changedEntriesCnt + ']');
 
                 }
             }
@@ -270,8 +278,9 @@ public class CreateDumpFutureTask extends 
AbstractCreateSnapshotFutureTask imple
                 log.info("Finish group dump [name=" + name +
                     ", id=" + grp +
                     ", time=" + (System.currentTimeMillis() - start) +
-                    ", iteratorEntriesCount=" + entriesCnt +
-                    ", changedEntriesCount=" + changedEntriesCnt + ']');
+                    ", iterEntriesCnt=" + entriesCnt.get() +
+                    ", writtenIterEntriesCnt=" + writtenEntriesCnt.get() +
+                    ", changedEntriesCnt=" + changedEntriesCnt.get() + ']');
             }
         });
 
@@ -281,9 +290,15 @@ public class CreateDumpFutureTask extends 
AbstractCreateSnapshotFutureTask imple
     /** {@inheritDoc} */
     @Override public void beforeChange(GridCacheContext cctx, KeyCacheObject 
key, CacheObject val, long expireTime, GridCacheVersion ver) {
         try {
-            assert key.partition() != -1;
+            int part = key.partition();
+            int grp = cctx.groupId();
+
+            assert part != -1;
 
-            dumpContext(cctx.groupId(), 
key.partition()).writeChanged(cctx.cacheId(), expireTime, key, val, ver);
+            if (!parts.get(grp).contains(part))
+                return;
+
+            dumpContext(grp, part).writeChanged(cctx.cacheId(), expireTime, 
key, val, ver);
         }
         catch (IgniteException e) {
             acceptException(e);
@@ -369,6 +384,9 @@ public class CreateDumpFutureTask extends 
AbstractCreateSnapshotFutureTask imple
         /** Last version on time of dump start. Can be used only for primary. 
*/
         @Nullable final GridCacheVersion startVer;
 
+        /** Last version on time of dump start. Can be used only for primary. 
*/
+        final GridCacheVersion isolatedStreamerVer;
+
         /** Topology Version. */
         private final AffinityTopologyVersion topVer;
 
@@ -395,6 +413,7 @@ public class CreateDumpFutureTask extends 
AbstractCreateSnapshotFutureTask imple
                 topVer = gctx.topology().lastTopologyChangeVersion();
 
                 startVer = grpPrimaries.get(gctx.groupId()).contains(part) ? 
gctx.shared().versions().last() : null;
+                isolatedStreamerVer = 
cctx.versions().isolatedStreamerVersion();
 
                 serdes = new DumpEntrySerializer(thLocBufs);
                 changed = new HashMap<>();
@@ -422,13 +441,7 @@ public class CreateDumpFutureTask extends 
AbstractCreateSnapshotFutureTask imple
          * @param val Value before change.
          * @param ver Version before change.
          */
-        public void writeChanged(
-            int cache,
-            long expireTime,
-            KeyCacheObject key,
-            CacheObject val,
-            GridCacheVersion ver
-        ) {
+        public void writeChanged(int cache, long expireTime, KeyCacheObject 
key, CacheObject val, GridCacheVersion ver) {
             String reasonToSkip = null;
 
             if (closed) // Quick exit. Partition already saved in dump.
@@ -439,7 +452,7 @@ public class CreateDumpFutureTask extends 
AbstractCreateSnapshotFutureTask imple
                 try {
                     if (closed) // Partition already saved in dump.
                         reasonToSkip = "partition already saved";
-                    else if (startVer != null && ver.isGreater(startVer))
+                    else if (isAfterStart(ver))
                         reasonToSkip = "greater version";
                     else if (!changed.get(cache).add(key)) // Entry changed 
several time during dump.
                         reasonToSkip = "changed several times";
@@ -485,7 +498,7 @@ public class CreateDumpFutureTask extends 
AbstractCreateSnapshotFutureTask imple
         ) {
             boolean written = true;
 
-            if (startVer != null && ver.isGreater(startVer))
+            if (isAfterStart(ver))
                 written = false;
             else if (changed.get(cache).contains(key))
                 written = false;
@@ -519,6 +532,18 @@ public class CreateDumpFutureTask extends 
AbstractCreateSnapshotFutureTask imple
             }
         }
 
+        /**
+         * Note, usage of {@link IgniteDataStreamer} may lead to dump 
inconsistency.
+         * Because, streamer use the same {@link GridCacheVersion} for all 
entries.
+         * So, we can't efficiently filter out new entries and write them all 
to dump.
+         *
+         * @param ver Entry version.
+         * @return {@code True} if {@code ver} appeared after dump started.
+         */
+        private boolean isAfterStart(GridCacheVersion ver) {
+            return (startVer != null && ver.isGreater(startVer)) && 
!isolatedStreamerVer.equals(ver);
+        }
+
         /** {@inheritDoc} */
         @Override public void close() {
             synchronized (this) { // Prevent concurrent close invocation.
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/Dump.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/Dump.java
index b63b86ed63e..38baf38356b 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/Dump.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/Dump.java
@@ -25,6 +25,7 @@ import java.nio.ByteBuffer;
 import java.nio.file.Files;
 import java.nio.file.Paths;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
@@ -35,16 +36,24 @@ import java.util.concurrent.ConcurrentMap;
 import java.util.stream.Collectors;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.binary.BinaryObject;
+import org.apache.ignite.binary.BinaryType;
 import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
 import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.dump.DumpEntry;
 import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.processors.cache.CacheObject;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.StoredCacheData;
+import 
org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl;
 import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
 import 
org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
 import 
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager;
 import 
org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIOFactory;
 import 
org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotMetadata;
+import 
org.apache.ignite.internal.processors.cache.persistence.wal.reader.StandaloneGridKernalContext;
+import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.A;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.U;
@@ -60,17 +69,30 @@ import static 
org.apache.ignite.internal.processors.cache.persistence.file.FileP
 import static 
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.PART_FILE_PREFIX;
 import static 
org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.SNAPSHOT_METAFILE_EXT;
 import static 
org.apache.ignite.internal.processors.cache.persistence.snapshot.dump.CreateDumpFutureTask.DUMP_FILE_EXT;
+import static 
org.apache.ignite.internal.processors.cache.persistence.wal.reader.StandaloneGridKernalContext.closeAll;
+import static 
org.apache.ignite.internal.processors.cache.persistence.wal.reader.StandaloneGridKernalContext.startAll;
 
 /**
  * This class provides ability to work with saved cache dump.
  */
-public class Dump {
+public class Dump implements AutoCloseable {
+    /** Snapshot meta. */
+    private final List<SnapshotMetadata> metadata;
+
     /** Dump directory. */
     private final File dumpDir;
 
-    /** Kernal context. */
+    /**
+     * Kernal context for each node in dump.
+     */
     private final GridKernalContext cctx;
 
+    /** If {@code true} then return data in form of {@link BinaryObject}. */
+    private final boolean keepBinary;
+
+    /** If {@code true} then don't deserialize {@link KeyCacheObject} and 
{@link CacheObject}. */
+    private final boolean raw;
+
     /**
      * Map shared across all instances of {@link DumpEntrySerializer}.
      * We use per thread buffer because number of threads is fewer then number 
of partitions.
@@ -80,36 +102,77 @@ public class Dump {
     private final ConcurrentMap<Long, ByteBuffer> thLocBufs = new 
ConcurrentHashMap<>();
 
     /**
-     * @param cctx Kernal context.
      * @param dumpDir Dump directory.
+     * @param keepBinary If {@code true} then don't deserialize {@link 
KeyCacheObject} and {@link CacheObject}.
      */
-    public Dump(GridKernalContext cctx, File dumpDir) {
-        this.cctx = cctx;
+    public Dump(File dumpDir, boolean keepBinary, boolean raw, IgniteLogger 
log) {
+        A.ensure(dumpDir != null, "dump directory is null");
+        A.ensure(dumpDir.exists(), "dump directory not exists");
+
         this.dumpDir = dumpDir;
+        this.metadata = metadata(dumpDir);
+        this.keepBinary = keepBinary;
+        this.cctx = standaloneKernalContext(dumpDir, log);
+        this.raw = raw;
+    }
 
-        File binaryMeta = new File(dumpDir, DFLT_BINARY_METADATA_PATH);
+    /**
+     * @param dumpDir Dump directory.
+     * @param log Logger.
+     * @return Standalone kernal context.
+     */
+    private GridKernalContext standaloneKernalContext(File dumpDir, 
IgniteLogger log) {
+        File binaryMeta = 
CacheObjectBinaryProcessorImpl.binaryWorkDir(dumpDir.getAbsolutePath(), 
F.first(metadata).folderName());
         File marshaller = new File(dumpDir, DFLT_MARSHALLER_PATH);
 
-        A.ensure(dumpDir != null, "dump directory is null");
-        A.ensure(dumpDir.exists(), "dump directory not exists");
         A.ensure(binaryMeta.exists(), "binary metadata directory not exists");
         A.ensure(marshaller.exists(), "marshaller directory not exists");
+
+        try {
+            GridKernalContext kctx = new StandaloneGridKernalContext(log, 
binaryMeta, marshaller);
+
+            startAll(kctx);
+
+            return kctx;
+        }
+        catch (IgniteCheckedException e) {
+            throw new IgniteException(e);
+        }
+    }
+
+    /** @return Binary types iterator. */
+    public Iterator<BinaryType> types() {
+        return cctx.cacheObjects().metadata().iterator();
     }
 
     /** @return List of node directories. */
     public List<String> nodesDirectories() {
-        return Arrays.stream(new File(dumpDir, DFLT_STORE_DIR).listFiles(f -> 
f.isDirectory() &&
-            !(f.getAbsolutePath().endsWith(DFLT_BINARY_METADATA_PATH)
-                || 
f.getAbsolutePath().endsWith(DFLT_MARSHALLER_PATH)))).map(File::getName).collect(Collectors.toList());
+        File[] dirs = new File(dumpDir, DFLT_STORE_DIR).listFiles(f -> 
f.isDirectory()
+            && !(f.getAbsolutePath().endsWith(DFLT_BINARY_METADATA_PATH) || 
f.getAbsolutePath().endsWith(DFLT_MARSHALLER_PATH)));
+
+        if (dirs == null)
+            return Collections.emptyList();
+
+        return 
Arrays.stream(dirs).map(File::getName).collect(Collectors.toList());
     }
 
     /** @return List of snapshot metadata saved in {@link #dumpDir}. */
     public List<SnapshotMetadata> metadata() throws IOException, 
IgniteCheckedException {
-        JdkMarshaller marsh = 
MarshallerUtils.jdkMarshaller(cctx.igniteInstanceName());
+        return metadata;
+    }
 
-        ClassLoader clsLdr = U.resolveClassLoader(cctx.config());
+    /** @return List of snapshot metadata saved in {@link #dumpDir}. */
+    private static List<SnapshotMetadata> metadata(File dumpDir) {
+        JdkMarshaller marsh = MarshallerUtils.jdkMarshaller("fake-node");
+
+        ClassLoader clsLdr = U.resolveClassLoader(new IgniteConfiguration());
+
+        File[] files = dumpDir.listFiles(f -> 
f.getName().endsWith(SNAPSHOT_METAFILE_EXT));
 
-        return Arrays.stream(dumpDir.listFiles(f -> 
f.getName().endsWith(SNAPSHOT_METAFILE_EXT))).map(meta -> {
+        if (files == null)
+            return Collections.emptyList();
+
+        return Arrays.stream(files).map(meta -> {
             try (InputStream in = new 
BufferedInputStream(Files.newInputStream(meta.toPath()))) {
                 return marsh.<SnapshotMetadata>unmarshal(in, clsLdr);
             }
@@ -143,8 +206,13 @@ public class Dump {
      * @return Dump iterator.
      */
     public List<Integer> partitions(String node, int group) {
-        return Arrays.stream(dumpGroupDirectory(node, group)
-            .listFiles(f -> f.getName().startsWith(PART_FILE_PREFIX) && 
f.getName().endsWith(DUMP_FILE_EXT)))
+        File[] parts = dumpGroupDirectory(node, group)
+            .listFiles(f -> f.getName().startsWith(PART_FILE_PREFIX) && 
f.getName().endsWith(DUMP_FILE_EXT));
+
+        if (parts == null)
+            return Collections.emptyList();
+
+        return Arrays.stream(parts)
             .map(partFile -> 
Integer.parseInt(partFile.getName().replace(PART_FILE_PREFIX, 
"").replace(DUMP_FILE_EXT, "")))
             .collect(Collectors.toList());
     }
@@ -155,16 +223,6 @@ public class Dump {
      * @return Dump iterator.
      */
     public DumpedPartitionIterator iterator(String node, int group, int part) {
-        return iterator(node, group, part, true);
-    }
-
-    /**
-     * @param node Node directory name.
-     * @param group Group id.
-     * @param excludeDuplicates Skip entries that was dumped twice - by 
iterator and change listener.
-     * @return Dump iterator.
-     */
-    DumpedPartitionIterator iterator(String node, int group, int part, boolean 
excludeDuplicates) {
         FileIOFactory ioFactory = new RandomAccessFileIOFactory();
 
         FileIO dumpFile;
@@ -179,11 +237,13 @@ public class Dump {
         DumpEntrySerializer serializer = new DumpEntrySerializer(thLocBufs);
 
         serializer.kernalContext(cctx);
+        serializer.keepBinary(keepBinary);
+        serializer.raw(raw);
 
         return new DumpedPartitionIterator() {
             DumpEntry next;
 
-            Set<KeyCacheObject> partKeys = new HashSet<>();
+            Set<Object> partKeys = new HashSet<>();
 
             /** {@inheritDoc} */
             @Override public boolean hasNext() {
@@ -268,6 +328,11 @@ public class Dump {
         return grpDirs[0];
     }
 
+    /** {@inheritDoc} */
+    @Override public void close() throws Exception {
+        closeAll(cctx);
+    }
+
     /**
      * Closeable dump iterator.
      */
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/DumpEntrySerializer.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/DumpEntrySerializer.java
index 1e8b14d9886..52b33e65be4 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/DumpEntrySerializer.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/DumpEntrySerializer.java
@@ -22,7 +22,10 @@ import java.nio.ByteBuffer;
 import java.util.concurrent.ConcurrentMap;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
+import org.apache.ignite.binary.BinaryObject;
+import org.apache.ignite.dump.DumpEntry;
 import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.pagemem.wal.record.UnwrapDataEntry;
 import org.apache.ignite.internal.processors.cache.CacheObject;
 import org.apache.ignite.internal.processors.cache.CacheObjectContext;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
@@ -49,6 +52,12 @@ public class DumpEntrySerializer {
     /** Fake context. */
     private CacheObjectContext fakeCacheObjCtx;
 
+    /** If {@code true} then don't deserialize {@link KeyCacheObject} and 
{@link CacheObject}. */
+    private boolean raw = false;
+
+    /** If {@code true} then return data in form of {@link BinaryObject}. */
+    private boolean keepBinary;
+
     /**
      * @param thLocBufs Thread local buffers.
      */
@@ -62,6 +71,16 @@ public class DumpEntrySerializer {
         fakeCacheObjCtx = new CacheObjectContext(cctx, null, null, false, 
false, false, false, false);
     }
 
+    /** @param keepBinary If {@code true} then return data in form of {@link 
BinaryObject}. */
+    public void keepBinary(boolean keepBinary) {
+        this.keepBinary = keepBinary;
+    }
+
+    /** @param raw If {@code true} then don't deserialize {@link 
KeyCacheObject} and {@link CacheObject}. */
+    public void raw(boolean raw) {
+        this.raw = raw;
+    }
+
     /**
      * Dump entry structure:
      * <pre>
@@ -202,12 +221,12 @@ public class DumpEntrySerializer {
                 return expireTime;
             }
 
-            @Override public KeyCacheObject key() {
-                return key;
+            @Override public Object key() {
+                return raw ? key : UnwrapDataEntry.unwrapKey(key, keepBinary, 
fakeCacheObjCtx);
             }
 
-            @Override public CacheObject value() {
-                return val;
+            @Override public Object value() {
+                return raw ? val : UnwrapDataEntry.unwrapValue(val, 
keepBinary, fakeCacheObjCtx);
             }
         };
     }
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java
index 05c40db53bd..2535af7fadf 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java
@@ -34,7 +34,6 @@ import java.util.TreeSet;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.configuration.DataStorageConfiguration;
-import org.apache.ignite.internal.GridComponent;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.pagemem.wal.WALIterator;
 import org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType;
@@ -57,6 +56,8 @@ import static java.lang.System.arraycopy;
 import static java.nio.file.Files.walkFileTree;
 import static 
org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager.WAL_NAME_PATTERN;
 import static 
org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager.WAL_SEGMENT_FILE_COMPACTED_PATTERN;
+import static 
org.apache.ignite.internal.processors.cache.persistence.wal.reader.StandaloneGridKernalContext.closeAll;
+import static 
org.apache.ignite.internal.processors.cache.persistence.wal.reader.StandaloneGridKernalContext.startAll;
 import static 
org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordV1Serializer.HEADER_RECORD_SIZE;
 import static 
org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordV1Serializer.readPosition;
 
@@ -176,8 +177,7 @@ public class IgniteWalIteratorFactory {
         if (iteratorParametersBuilder.sharedCtx == null) {
             GridCacheSharedContext<?, ?> sctx = 
prepareSharedCtx(iteratorParametersBuilder);
 
-            for (GridComponent comp : sctx.kernalContext())
-                comp.start();
+            startAll(sctx.kernalContext());
 
             return new StandaloneWalRecordsIterator(
                 iteratorParametersBuilder.log == null ? log : 
iteratorParametersBuilder.log,
@@ -194,8 +194,7 @@ public class IgniteWalIteratorFactory {
                 @Override protected void onClose() throws 
IgniteCheckedException {
                     super.onClose();
 
-                    for (GridComponent comp : sctx.kernalContext())
-                        comp.stop(true);
+                    closeAll(sctx.kernalContext());
                 }
             };
         }
@@ -690,9 +689,9 @@ public class IgniteWalIteratorFactory {
     /**
      *
      */
-    private static class ConsoleLogger implements IgniteLogger {
+    public static class ConsoleLogger implements IgniteLogger {
         /** */
-        private static final ConsoleLogger INSTANCE = new ConsoleLogger();
+        public static final ConsoleLogger INSTANCE = new ConsoleLogger();
 
         /** */
         private static final PrintStream OUT = System.out;
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java
index 8c1f52a4c15..a166840f2fb 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java
@@ -694,4 +694,22 @@ public class StandaloneGridKernalContext implements 
GridKernalContext {
     @Override public Executor getAsyncContinuationExecutor() {
         return null;
     }
+
+    /**
+     * @param kctx Kernal context.
+     * @throws IgniteCheckedException In case of any error.
+     */
+    public static void startAll(GridKernalContext kctx) throws 
IgniteCheckedException {
+        for (GridComponent comp : kctx)
+            comp.start();
+    }
+
+    /**
+     * @param kctx Kernal context.
+     * @throws IgniteCheckedException In case of any error.
+     */
+    public static void closeAll(GridKernalContext kctx) throws 
IgniteCheckedException {
+        for (GridComponent comp : kctx)
+            comp.stop(true);
+    }
 }
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/AbstractCacheDumpTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/AbstractCacheDumpTest.java
index 5e7164c52fe..d4524b5eb1b 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/AbstractCacheDumpTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/AbstractCacheDumpTest.java
@@ -19,8 +19,8 @@ package 
org.apache.ignite.internal.processors.cache.persistence.snapshot.dump;
 
 import java.io.File;
 import java.util.ArrayList;
-import java.util.Comparator;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
@@ -31,24 +31,30 @@ import java.util.stream.IntStream;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteDataStreamer;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.Ignition;
+import org.apache.ignite.binary.BinaryType;
 import org.apache.ignite.cache.CacheAtomicityMode;
 import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cdc.TypeMapping;
 import org.apache.ignite.cluster.ClusterState;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.DataRegionConfiguration;
 import org.apache.ignite.configuration.DataStorageConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.dump.DumpConsumer;
+import org.apache.ignite.dump.DumpEntry;
+import org.apache.ignite.dump.DumpReader;
+import org.apache.ignite.dump.DumpReaderConfiguration;
 import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.processors.cache.CacheGroupContext;
-import org.apache.ignite.internal.processors.cache.CacheObjectContext;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.StoredCacheData;
 import 
org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotMetadata;
-import 
org.apache.ignite.internal.processors.cache.persistence.snapshot.dump.Dump.DumpedPartitionIterator;
+import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.U;
@@ -63,8 +69,11 @@ import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.dump.DumpReaderConfiguration.DFLT_THREAD_CNT;
+import static org.apache.ignite.dump.DumpReaderConfiguration.DFLT_TIMEOUT;
 import static 
org.apache.ignite.internal.processors.cache.GridCacheUtils.UTILITY_CACHE_NAME;
 import static 
org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.SNP_RUNNING_DIR_KEY;
+import static 
org.apache.ignite.internal.processors.cache.persistence.snapshot.dump.CreateDumpFutureTask.toLong;
 import static org.apache.ignite.platform.model.AccessLevel.SUPER;
 import static org.apache.ignite.testframework.GridTestUtils.runAsync;
 import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
@@ -88,7 +97,7 @@ public abstract class AbstractCacheDumpTest extends 
GridCommonAbstractTest {
     public static final String DMP_NAME = "dump";
 
     /** */
-    static final IntFunction<User> USER_FACTORY = i ->
+    public static final IntFunction<User> USER_FACTORY = i ->
         new User(i, ACL.values()[Math.abs(i) % ACL.values().length], new 
Role("Role" + i, SUPER));
 
     /** */
@@ -108,7 +117,15 @@ public abstract class AbstractCacheDumpTest extends 
GridCommonAbstractTest {
     public CacheAtomicityMode mode;
 
     /** */
-    @Parameterized.Parameters(name = 
"nodes={0},backups={1},persistence={2},mode={3}")
+    @Parameterized.Parameter(4)
+    public boolean useDataStreamer;
+
+    /** */
+    @Parameterized.Parameter(5)
+    public boolean onlyPrimary;
+
+    /** */
+    @Parameterized.Parameters(name = 
"nodes={0},backups={1},persistence={2},mode={3},useDataStreamer={4},onlyPrimary={5}")
     public static List<Object[]> params() {
         List<Object[]> params = new ArrayList<>();
 
@@ -116,10 +133,17 @@ public abstract class AbstractCacheDumpTest extends 
GridCommonAbstractTest {
             for (int backups : new int[]{0, 1})
                 for (boolean persistence : new boolean[]{true, false})
                     for (CacheAtomicityMode mode : 
CacheAtomicityMode._values()) {
-                        if (nodes == 1 && backups != 0)
-                            continue;
-
-                        params.add(new Object[]{nodes, backups, persistence, 
mode});
+                        for (boolean useDataStreamer : new boolean[]{true, 
false}) {
+                            if (nodes == 1 && backups != 0)
+                                continue;
+
+                            if (backups > 0) {
+                                params.add(new Object[]{nodes, backups, 
persistence, mode, useDataStreamer, false});
+                                params.add(new Object[]{nodes, backups, 
persistence, mode, useDataStreamer, true});
+                            }
+                            else
+                                params.add(new Object[]{nodes, backups, 
persistence, mode, useDataStreamer, false});
+                        }
                     }
 
         return params;
@@ -183,7 +207,7 @@ public abstract class AbstractCacheDumpTest extends 
GridCommonAbstractTest {
 
         ign.cluster().state(ClusterState.ACTIVE);
 
-        putData(ign.cache(DEFAULT_CACHE_NAME), ign.cache(CACHE_0), 
ign.cache(CACHE_1));
+        putData(cli.cache(DEFAULT_CACHE_NAME), cli.cache(CACHE_0), 
cli.cache(CACHE_1));
 
         return ign;
     }
@@ -205,7 +229,7 @@ public abstract class AbstractCacheDumpTest extends 
GridCommonAbstractTest {
             });
         }
 
-        IgniteInternalFuture<Object> dumpFut = runAsync(() -> 
createDump((IgniteEx)ignites.get(0)));
+        IgniteInternalFuture<Object> dumpFut = runAsync(() -> 
createDump((IgniteEx)F.first(ignites)));
 
         // Waiting while dump will be setup: task planned after change 
listener set.
         assertTrue(waitForCondition(() -> {
@@ -229,15 +253,30 @@ public abstract class AbstractCacheDumpTest extends 
GridCommonAbstractTest {
         IgniteCache<Object, Object> grpCache0,
         IgniteCache<Object, Object> grpCache1
     ) {
-        IntStream.range(0, KEYS_CNT).forEach(i -> {
-            cache.put(i, i);
-            grpCache0.put(i, USER_FACTORY.apply(i));
-            grpCache1.put(new Key(i), new Value(String.valueOf(i)));
-        });
+        if (useDataStreamer) {
+            try (
+                IgniteDataStreamer<Integer, Integer> _cache = 
cli.dataStreamer(cache.getName());
+                IgniteDataStreamer<Integer, User> _grpCache0 = 
cli.dataStreamer(grpCache0.getName());
+                IgniteDataStreamer<Key, Value> _grpCache1 = 
cli.dataStreamer(grpCache1.getName())
+            ) {
+                IntStream.range(0, KEYS_CNT).forEach(i -> {
+                    _cache.addData(i, i);
+                    _grpCache0.addData(i, USER_FACTORY.apply(i));
+                    _grpCache1.addData(new Key(i), new 
Value(String.valueOf(i)));
+                });
+            }
+        }
+        else {
+            IntStream.range(0, KEYS_CNT).forEach(i -> {
+                cache.put(i, i);
+                grpCache0.put(i, USER_FACTORY.apply(i));
+                grpCache1.put(new Key(i), new Value(String.valueOf(i)));
+            });
+        }
     }
 
     /** */
-    void checkDump(IgniteEx ign) throws Exception {
+    protected void checkDump(IgniteEx ign) throws Exception {
         checkDump(ign, DMP_NAME);
     }
 
@@ -265,101 +304,117 @@ public abstract class AbstractCacheDumpTest extends 
GridCommonAbstractTest {
 
         assertEquals(nodes, nodesDirs.size());
 
-        Set<Integer> keys = new HashSet<>();
-        int dfltDumpSz = 0;
-        int grpDumpSz = 0;
-
-        CacheObjectContext coCtx = 
ign.context().cache().context().cacheObjectContext(CU.cacheId(DEFAULT_CACHE_NAME));
-        CacheObjectContext coCtx0 = 
ign.context().cache().context().cacheObjectContext(CU.cacheId(CACHE_0));
-        CacheObjectContext coCtx1 = 
ign.context().cache().context().cacheObjectContext(CU.cacheId(CACHE_1));
-
-        for (String nodeDir : nodesDirs) {
-            List<StoredCacheData> ccfgs = dump.configs(nodeDir, 
CU.cacheId(DEFAULT_CACHE_NAME));
+        TestDumpConsumer cnsmr = new TestDumpConsumer() {
+            final Set<Integer> keys = new HashSet<>();
 
-            assertNotNull(ccfgs);
-            assertEquals(1, ccfgs.size());
+            final Set<Long> grpParts = new HashSet<>();
 
-            assertEquals(DEFAULT_CACHE_NAME, 
ccfgs.get(0).configuration().getName());
-            assertFalse(ccfgs.get(0).sql());
-            assertTrue(ccfgs.get(0).queryEntities().isEmpty());
+            int dfltDumpSz;
 
-            ccfgs = dump.configs(nodeDir, CU.cacheId(GRP));
+            int grpDumpSz;
 
-            assertNotNull(ccfgs);
-            assertEquals(2, ccfgs.size());
+            @Override public void onCacheConfigs(Iterator<StoredCacheData> 
caches) {
+                super.onCacheConfigs(caches);
 
-            ccfgs.sort(Comparator.comparing(d -> d.config().getName()));
+                boolean[] cachesFound = new boolean[3];
 
-            CacheConfiguration ccfg0 = ccfgs.get(0).configuration();
-            CacheConfiguration ccfg1 = ccfgs.get(1).configuration();
+                caches.forEachRemaining(data -> {
+                    if (data.config().getName().equals(DEFAULT_CACHE_NAME)) {
+                        assertFalse(cachesFound[0]);
+                        cachesFound[0] = true;
 
-            assertEquals(GRP, ccfg0.getGroupName());
-            assertEquals(CACHE_0, ccfg0.getName());
-
-            assertEquals(GRP, ccfg1.getGroupName());
-            assertEquals(CACHE_1, ccfg1.getName());
+                        assertEquals(DEFAULT_CACHE_NAME, 
data.config().getName());
+                        assertFalse(data.sql());
+                        assertTrue(data.queryEntities().isEmpty());
+                    }
+                    else if (data.config().getName().equals(CACHE_0)) {
+                        assertFalse(cachesFound[1]);
+                        cachesFound[1] = true;
+
+                        assertEquals(GRP, data.configuration().getGroupName());
+                        assertEquals(CACHE_0, data.configuration().getName());
+                        assertFalse(data.sql());
+                        assertTrue(data.queryEntities().isEmpty());
+                    }
+                    else if (data.config().getName().equals(CACHE_1)) {
+                        assertFalse(cachesFound[2]);
+                        cachesFound[2] = true;
+
+                        assertEquals(GRP, data.configuration().getGroupName());
+                        assertEquals(CACHE_1, data.configuration().getName());
+                        assertFalse(data.sql());
+                        assertTrue(data.queryEntities().isEmpty());
+                    }
+                    else
+                        throw new IgniteException("Unknown cache");
+                });
 
-            assertFalse(ccfgs.get(0).sql());
-            assertFalse(ccfgs.get(1).sql());
-            assertTrue(ccfgs.get(0).queryEntities().isEmpty());
-            assertTrue(ccfgs.get(1).queryEntities().isEmpty());
+                for (boolean found : cachesFound)
+                    assertTrue(found);
+            }
 
-            List<Integer> parts = dump.partitions(nodeDir, 
CU.cacheId(DEFAULT_CACHE_NAME));
+            @Override public void onPartition(int grp, int part, 
Iterator<DumpEntry> iter) {
+                if (onlyPrimary)
+                    assertTrue(grpParts.add(toLong(grp, part)));
 
-            for (int part : parts) {
-                try (DumpedPartitionIterator iter = dump.iterator(nodeDir, 
CU.cacheId(DEFAULT_CACHE_NAME), part)) {
+                if (grp == CU.cacheId(DEFAULT_CACHE_NAME)) {
                     while (iter.hasNext()) {
                         DumpEntry e = iter.next();
 
-                        checkDefaultCacheEntry(e, coCtx);
+                        checkDefaultCacheEntry(e);
 
-                        keys.add(e.key().<Integer>value(coCtx, true));
+                        keys.add((Integer)e.key());
 
                         dfltDumpSz++;
                     }
                 }
-            }
-
-            parts = dump.partitions(nodeDir, CU.cacheId(GRP));
-
-            for (int part : parts) {
-                try (DumpedPartitionIterator iter = dump.iterator(nodeDir, 
CU.cacheId(GRP), part)) {
+                else {
                     while (iter.hasNext()) {
                         DumpEntry e = iter.next();
 
-                        checkGroupEntry(e, coCtx0, coCtx1);
+                        assertNotNull(e);
+
+                        if (e.cacheId() == CU.cacheId(CACHE_0))
+                            assertEquals(USER_FACTORY.apply((Integer)e.key()), 
e.value());
+                        else
+                            assertEquals(((Key)e.key()).getId() + "", 
((Value)e.value()).getVal());
 
                         grpDumpSz++;
                     }
                 }
             }
-        }
 
-        assertEquals(KEYS_CNT + KEYS_CNT * backups, dfltDumpSz);
-        assertEquals(2 * (KEYS_CNT + KEYS_CNT * backups), grpDumpSz);
+            @Override public void check() {
+                super.check();
 
-        IntStream.range(0, KEYS_CNT).forEach(key -> 
assertTrue(keys.contains(key)));
-    }
-
-    /** */
-    protected void checkDefaultCacheEntry(DumpEntry e, CacheObjectContext 
coCtx) {
-        assertNotNull(e);
+                assertEquals(KEYS_CNT + (onlyPrimary ? 0 : KEYS_CNT * 
backups), dfltDumpSz);
+                assertEquals(2 * (KEYS_CNT + (onlyPrimary ? 0 : KEYS_CNT * 
backups)), grpDumpSz);
 
-        Integer key = e.key().<Integer>value(coCtx, true);
+                IntStream.range(0, KEYS_CNT).forEach(key -> 
assertTrue(keys.contains(key)));
+            }
+        };
 
-        assertEquals(key, e.value().<Integer>value(coCtx, true));
+        new DumpReader(
+            new DumpReaderConfiguration(
+                dumpDirectory(ign, name),
+                cnsmr,
+                DFLT_THREAD_CNT, DFLT_TIMEOUT,
+                true,
+                false
+            ),
+            log
+        ).run();
+
+        cnsmr.check();
     }
 
     /** */
-    protected void checkGroupEntry(DumpEntry e, CacheObjectContext coCtx0, 
CacheObjectContext coCtx1) {
+    protected void checkDefaultCacheEntry(DumpEntry e) {
         assertNotNull(e);
 
-        if (e.cacheId() == CU.cacheId(CACHE_0))
-            assertEquals(USER_FACTORY.apply(e.key().value(coCtx0, true)), 
e.value().value(coCtx0, true));
-        else {
-            assertNotNull(e.key().<Key>value(coCtx1, true));
-            assertNotNull(e.value().<Value>value(coCtx1, true));
-        }
+        Integer key = (Integer)e.key();
+
+        assertEquals(key, e.value());
     }
 
     /** */
@@ -419,18 +474,25 @@ public abstract class AbstractCacheDumpTest extends 
GridCommonAbstractTest {
     }
 
     /** */
-    void createDump(IgniteEx ign) {
+    protected void createDump(IgniteEx ign) {
         createDump(ign, DMP_NAME);
     }
 
     /** */
     public static Dump dump(IgniteEx ign, String name) throws 
IgniteCheckedException {
         return new Dump(
-            ign.context(),
-            new File(U.resolveWorkDirectory(U.defaultWorkDirectory(), 
ign.configuration().getSnapshotPath(), false), name)
+            dumpDirectory(ign, name),
+            true,
+            false,
+            log
         );
     }
 
+    /** */
+    public static File dumpDirectory(IgniteEx ign, String name) throws 
IgniteCheckedException {
+        return new File(U.resolveWorkDirectory(U.defaultWorkDirectory(), 
ign.configuration().getSnapshotPath(), false), name);
+    }
+
     /** */
     public static void checkDumpWithCommand(IgniteEx ign, String name, int 
backups) throws Exception {
         CacheGroupContext gctx = 
ign.context().cache().cacheGroup(CU.cacheId(DEFAULT_CACHE_NAME));
@@ -458,6 +520,88 @@ public abstract class AbstractCacheDumpTest extends 
GridCommonAbstractTest {
 
     /** */
     void createDump(IgniteEx ign, String name) {
-        ign.snapshot().createDump(name).get();
+        ign.context().cache().context().snapshotMgr().createSnapshot(name, 
null, false, true, true).get();
+    }
+
+    /** */
+    public abstract static class TestDumpConsumer implements DumpConsumer {
+        /** */
+        private boolean started;
+
+        /** */
+        private boolean stopped;
+
+        /** */
+        private boolean typesCb;
+
+        /** */
+        private boolean mappingcCb;
+
+        /** */
+        private boolean cacheCfgCb;
+
+        /** {@inheritDoc} */
+        @Override public void start() {
+            assertFalse(started);
+            assertFalse(mappingcCb);
+            assertFalse(typesCb);
+            assertFalse(cacheCfgCb);
+            assertFalse(stopped);
+
+            started = true;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onMappings(Iterator<TypeMapping> mappings) {
+            assertTrue(started);
+            assertFalse(mappingcCb);
+            assertFalse(typesCb);
+            assertFalse(cacheCfgCb);
+            assertFalse(stopped);
+
+            mappingcCb = true;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onTypes(Iterator<BinaryType> types) {
+            assertTrue(started);
+            assertTrue(mappingcCb);
+            assertFalse(typesCb);
+            assertFalse(cacheCfgCb);
+            assertFalse(stopped);
+
+            typesCb = true;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onCacheConfigs(Iterator<StoredCacheData> caches) 
{
+            assertTrue(started);
+            assertTrue(mappingcCb);
+            assertTrue(typesCb);
+            assertFalse(cacheCfgCb);
+            assertFalse(stopped);
+
+            cacheCfgCb = true;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void stop() {
+            assertTrue(started);
+            assertTrue(typesCb);
+            assertTrue(mappingcCb);
+            assertTrue(cacheCfgCb);
+            assertFalse(stopped);
+
+            stopped = true;
+        }
+
+        /** */
+        public void check() {
+            assertTrue(started);
+            assertTrue(typesCb);
+            assertTrue(mappingcCb);
+            assertTrue(cacheCfgCb);
+            assertTrue(stopped);
+        }
     }
 }
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/IgniteCacheDumpSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/IgniteCacheDumpSelfTest.java
index 893d94f77d9..9491a3f6534 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/IgniteCacheDumpSelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/IgniteCacheDumpSelfTest.java
@@ -36,9 +36,9 @@ import org.apache.ignite.IgniteException;
 import org.apache.ignite.cache.CacheEntryProcessor;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.dump.DumpEntry;
 import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.processors.cache.CacheObjectContext;
 import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
 import 
org.apache.ignite.internal.processors.cache.persistence.file.FileIODecorator;
 import 
org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
@@ -55,6 +55,7 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static 
org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.SNP_RUNNING_DIR_KEY;
 import static 
org.apache.ignite.internal.processors.cache.persistence.snapshot.dump.CreateDumpFutureTask.DUMP_FILE_EXT;
 import static org.apache.ignite.testframework.GridTestUtils.assertThrows;
+import static org.junit.Assume.assumeFalse;
 
 /** */
 public class IgniteCacheDumpSelfTest extends AbstractCacheDumpTest {
@@ -262,6 +263,8 @@ public class IgniteCacheDumpSelfTest extends 
AbstractCacheDumpTest {
     /** */
     @Test
     public void testDumpWithImplicitExpireTime() throws Exception {
+        assumeFalse(useDataStreamer);
+
         explicitTtl = false;
 
         doTestDumpWithExpiry();
@@ -452,8 +455,8 @@ public class IgniteCacheDumpSelfTest extends 
AbstractCacheDumpTest {
     }
 
     /** {@inheritDoc} */
-    @Override protected void checkDefaultCacheEntry(DumpEntry e, 
CacheObjectContext coCtx) {
-        super.checkDefaultCacheEntry(e, coCtx);
+    @Override protected void checkDefaultCacheEntry(DumpEntry e) {
+        super.checkDefaultCacheEntry(e);
 
         if (explicitTtl != null) {
             assertTrue("Expire time must be set", e.expireTime() != 0);
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/IgniteConcurrentCacheDumpTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/IgniteConcurrentCacheDumpTest.java
index f087e5ccf77..a08f92dae72 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/IgniteConcurrentCacheDumpTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/IgniteConcurrentCacheDumpTest.java
@@ -36,7 +36,7 @@ import org.junit.runners.Parameterized;
 /** */
 public class IgniteConcurrentCacheDumpTest extends AbstractCacheDumpTest {
     /** */
-    @Parameterized.Parameters(name = 
"nodes={0},backups={1},persistence={2},mode={3}")
+    @Parameterized.Parameters(name = 
"nodes={0},backups={1},persistence={2},mode={3},useDataStreamer={4},onlyPrimary={5}")
     public static List<Object[]> params() {
         List<Object[]> params = new ArrayList<>();
 
@@ -44,7 +44,7 @@ public class IgniteConcurrentCacheDumpTest extends 
AbstractCacheDumpTest {
             for (int backups : new int[]{1, 2})
                 for (boolean persistence : new boolean[]{true, false})
                     for (CacheAtomicityMode mode : 
CacheAtomicityMode._values())
-                        params.add(new Object[]{nodes, backups, persistence, 
mode});
+                        params.add(new Object[]{nodes, backups, persistence, 
mode, false, false});
 
         return params;
     }
diff --git 
a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridTestKernalContext.java
 
b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridTestKernalContext.java
index 1865af069b9..20d6a4bfbaf 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridTestKernalContext.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridTestKernalContext.java
@@ -40,6 +40,8 @@ import org.apache.ignite.plugin.PluginProvider;
 import org.apache.ignite.spi.metric.noop.NoopMetricExporterSpi;
 import org.apache.ignite.testframework.GridTestUtils;
 
+import static 
org.apache.ignite.internal.processors.cache.persistence.wal.reader.StandaloneGridKernalContext.startAll;
+
 /**
  * Test context.
  */
@@ -94,8 +96,7 @@ public class GridTestKernalContext extends 
GridKernalContextImpl {
      * @throws IgniteCheckedException If failed
      */
     public void start() throws IgniteCheckedException {
-        for (GridComponent comp : this)
-            comp.start();
+        startAll(this);
     }
 
     /**
diff --git 
a/modules/indexing/src/test/java/org/apache/ignite/internal/dump/DumpCacheConfigTest.java
 
b/modules/indexing/src/test/java/org/apache/ignite/internal/dump/DumpCacheConfigTest.java
index e111223dc7e..18a24fbd63b 100644
--- 
a/modules/indexing/src/test/java/org/apache/ignite/internal/dump/DumpCacheConfigTest.java
+++ 
b/modules/indexing/src/test/java/org/apache/ignite/internal/dump/DumpCacheConfigTest.java
@@ -17,24 +17,27 @@
 
 package org.apache.ignite.internal.dump;
 
+import java.io.File;
 import java.util.Collection;
-import java.util.List;
+import java.util.Iterator;
+import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.ignite.binary.BinaryObject;
+import org.apache.ignite.binary.BinaryType;
 import org.apache.ignite.cache.QueryEntity;
 import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.dump.DumpEntry;
+import org.apache.ignite.dump.DumpReader;
+import org.apache.ignite.dump.DumpReaderConfiguration;
 import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.processors.cache.StoredCacheData;
-import 
org.apache.ignite.internal.processors.cache.persistence.snapshot.dump.Dump;
-import 
org.apache.ignite.internal.processors.cache.persistence.snapshot.dump.Dump.DumpedPartitionIterator;
-import 
org.apache.ignite.internal.processors.cache.persistence.snapshot.dump.DumpEntry;
-import org.apache.ignite.internal.util.typedef.internal.CU;
+import 
org.apache.ignite.internal.processors.cache.persistence.snapshot.dump.AbstractCacheDumpTest.TestDumpConsumer;
+import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.junit.Test;
 
 import static org.apache.ignite.internal.cdc.SqlCdcTest.executeSql;
 import static 
org.apache.ignite.internal.processors.cache.persistence.snapshot.dump.AbstractCacheDumpTest.DMP_NAME;
 import static 
org.apache.ignite.internal.processors.cache.persistence.snapshot.dump.AbstractCacheDumpTest.KEYS_CNT;
-import static 
org.apache.ignite.internal.processors.cache.persistence.snapshot.dump.AbstractCacheDumpTest.dump;
 
 /** */
 public class DumpCacheConfigTest extends GridCommonAbstractTest {
@@ -69,57 +72,78 @@ public class DumpCacheConfigTest extends 
GridCommonAbstractTest {
     private void checkDump(IgniteEx srv, String name, boolean first) throws 
Exception {
         srv.snapshot().createDump(name).get(10_000L);
 
-        int grpId = CU.cacheId("T1");
-        int cnt = 0;
+        AtomicInteger cnt = new AtomicInteger();
 
-        Dump dump = dump(srv, name);
+        TestDumpConsumer cnsmr = new TestDumpConsumer() {
+            @Override public void onTypes(Iterator<BinaryType> types) {
+                super.onTypes(types);
 
-        for (String nodeDir : dump.nodesDirectories()) {
-            for (int part : dump.partitions(nodeDir, grpId)) {
-                try (DumpedPartitionIterator iter = dump.iterator(nodeDir, 
grpId, part)) {
-                    while (iter.hasNext()) {
-                        DumpEntry e = iter.next();
+                assertTrue(types.hasNext());
 
-                        assertNotNull(e);
+                BinaryType type = types.next();
 
-                        int id = e.key().value(null, false);
+                assertFalse(types.hasNext());
 
-                        BinaryObject val = (BinaryObject)e.value();
+                assertTrue(type.typeName().startsWith("SQL_PUBLIC_T1"));
+            }
 
-                        assertNotNull(val);
-                        assertEquals("Name-" + id, val.field("NAME"));
+            @Override public void onCacheConfigs(Iterator<StoredCacheData> 
caches) {
+                super.onCacheConfigs(caches);
 
-                        cnt++;
-                    }
-                }
-            }
+                assertTrue(caches.hasNext());
+
+                StoredCacheData data = caches.next();
+
+                assertFalse(caches.hasNext());
+
+                assertTrue(data.sql());
+
+                CacheConfiguration ccfg = data.config();
 
-            List<StoredCacheData> ccfgs = dump.configs(nodeDir, grpId);
+                assertEquals("T1", ccfg.getName());
 
-            assertNotNull(ccfgs);
-            assertEquals(1, ccfgs.size());
-            assertTrue(ccfgs.get(0).sql());
+                Collection<QueryEntity> qes = data.queryEntities();
 
-            CacheConfiguration ccfg = ccfgs.get(0).config();
+                assertNotNull(qes);
+                assertEquals(1, qes.size());
 
-            assertEquals("T1", ccfg.getName());
+                QueryEntity qe = qes.iterator().next();
 
-            Collection<QueryEntity> qes = ccfgs.get(0).queryEntities();
+                assertNotNull(qe);
+                assertEquals("T1", qe.getTableName());
+                assertEquals(first ? 2 : 3, qe.getFields().size());
+                assertTrue(qe.getFields().containsKey("ID"));
+                assertTrue(qe.getFields().containsKey("NAME"));
+                if (!first)
+                    assertTrue(qe.getFields().containsKey("ADDRESS"));
+            }
+
+            @Override public void onPartition(int grp, int part, 
Iterator<DumpEntry> data) {
+                while (data.hasNext()) {
+                    DumpEntry e = data.next();
+
+                    assertNotNull(e);
+
+                    BinaryObject val = (BinaryObject)e.value();
 
-            assertNotNull(qes);
-            assertEquals(1, qes.size());
+                    assertNotNull(val);
+                    assertEquals("Name-" + e.key(), val.field("NAME"));
+
+                    cnt.incrementAndGet();
+                }
+            }
+        };
 
-            QueryEntity qe = qes.iterator().next();
+        new DumpReader(
+            new DumpReaderConfiguration(
+                new File(U.resolveWorkDirectory(U.defaultWorkDirectory(), 
srv.configuration().getSnapshotPath(), false), name),
+                cnsmr
+            ),
+            log
+        ).run();
 
-            assertNotNull(qe);
-            assertEquals("T1", qe.getTableName());
-            assertEquals(first ? 2 : 3, qe.getFields().size());
-            assertTrue(qe.getFields().containsKey("ID"));
-            assertTrue(qe.getFields().containsKey("NAME"));
-            if (!first)
-                assertTrue(qe.getFields().containsKey("ADDRESS"));
-        }
+        assertEquals(first ? KEYS_CNT : (KEYS_CNT * 2), cnt.get());
 
-        assertEquals(first ? KEYS_CNT : (KEYS_CNT * 2), cnt);
+        cnsmr.check();
     }
 }


Reply via email to