This is an automated email from the ASF dual-hosted git repository.

nizhikov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 63f1a80d83d IGNITE-20722 Support skipCopies for DumpReader (#11014)
63f1a80d83d is described below

commit 63f1a80d83da4ae5e3122057826e92ff4b6e3178
Author: yurinaryshkin <[email protected]>
AuthorDate: Mon Oct 30 16:38:56 2023 +0300

    IGNITE-20722 Support skipCopies for DumpReader (#11014)
---
 .../java/org/apache/ignite/dump/DumpReader.java    | 12 ++++++
 .../ignite/dump/DumpReaderConfiguration.java       | 40 +++++++++++++++++-
 .../snapshot/dump/AbstractCacheDumpTest.java       |  9 +++--
 .../snapshot/dump/IgniteCacheDumpSelfTest.java     | 47 ++++++++++++++++++++--
 4 files changed, 100 insertions(+), 8 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/dump/DumpReader.java b/modules/core/src/main/java/org/apache/ignite/dump/DumpReader.java
index 34522d4809c..d2ed811ac3f 100644
--- a/modules/core/src/main/java/org/apache/ignite/dump/DumpReader.java
+++ b/modules/core/src/main/java/org/apache/ignite/dump/DumpReader.java
@@ -21,6 +21,7 @@ import java.io.File;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -108,11 +109,22 @@ public class DumpReader implements Runnable {
 
                 AtomicBoolean skip = new AtomicBoolean(false);
 
+                Map<Integer, Set<Integer>> groups = cfg.skipCopies() ? new HashMap<>() : null;
+
+                if (groups != null)
+                    grpToNodes.keySet().forEach(grpId -> groups.put(grpId, new HashSet<>()));
+
                 for (Map.Entry<Integer, List<String>> e : grpToNodes.entrySet()) {
                     int grp = e.getKey();
 
                     for (String node : e.getValue()) {
                         for (int part : dump.partitions(node, grp)) {
+                            if (groups != null && !groups.get(grp).add(part)) {
+                                log.info("Skip copy partition [node=" + node + ", grp=" + grp + ", part=" + part + ']');
+
+                                continue;
+                            }
+
                             Runnable consumePart = () -> {
                                 if (skip.get()) {
                                     if (log.isDebugEnabled()) {
diff --git a/modules/core/src/main/java/org/apache/ignite/dump/DumpReaderConfiguration.java b/modules/core/src/main/java/org/apache/ignite/dump/DumpReaderConfiguration.java
index 20da0ef0547..1b43554c3f0 100644
--- a/modules/core/src/main/java/org/apache/ignite/dump/DumpReaderConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/dump/DumpReaderConfiguration.java
@@ -58,12 +58,15 @@ public class DumpReaderConfiguration {
     /** Cache group names. */
     private String[] cacheGroupNames;
 
+    /** Skip copies. */
+    private final boolean skipCopies;
+
     /**
      * @param dir Root dump directory.
      * @param cnsmr Dump consumer.
      */
     public DumpReaderConfiguration(File dir, DumpConsumer cnsmr) {
-        this(dir, cnsmr, DFLT_THREAD_CNT, DFLT_TIMEOUT, true, true, null);
+        this(dir, cnsmr, DFLT_THREAD_CNT, DFLT_TIMEOUT, true, true, null, false);
     }
 
     /**
@@ -75,8 +78,35 @@ public class DumpReaderConfiguration {
      * @param keepBinary If {@code true} then don't deserialize {@link KeyCacheObject} and {@link CacheObject}.
      * @param cacheGroupNames Cache group names.
      */
-    public DumpReaderConfiguration(File dir, DumpConsumer cnsmr, int thCnt, Duration timeout, boolean failFast, boolean keepBinary,
+    public DumpReaderConfiguration(File dir,
+        DumpConsumer cnsmr,
+        int thCnt,
+        Duration timeout,
+        boolean failFast,
+        boolean keepBinary,
         String[] cacheGroupNames
+    ) {
+        this(dir, cnsmr, thCnt, timeout, failFast, keepBinary, cacheGroupNames, false);
+    }
+
+    /**
+     * @param dir Root dump directory.
+     * @param cnsmr Dump consumer.
+     * @param thCnt Count of threads to consume dumped partitions.
+     * @param timeout Timeout of dump reader invocation.
+     * @param failFast Stop processing partitions if consumer fail to process one.
+     * @param keepBinary If {@code true} then don't deserialize {@link KeyCacheObject} and {@link CacheObject}.
+     * @param cacheGroupNames Cache group names.
+     * @param skipCopies Skip copies.
+     */
+    public DumpReaderConfiguration(File dir,
+        DumpConsumer cnsmr,
+        int thCnt,
+        Duration timeout,
+        boolean failFast,
+        boolean keepBinary,
+        String[] cacheGroupNames,
+        boolean skipCopies
     ) {
         this.dir = dir;
         this.cnsmr = cnsmr;
@@ -85,6 +115,7 @@ public class DumpReaderConfiguration {
         this.failFast = failFast;
         this.keepBinary = keepBinary;
         this.cacheGroupNames = cacheGroupNames;
+        this.skipCopies = skipCopies;
     }
 
     /** @return Root dump directiory. */
@@ -121,4 +152,9 @@ public class DumpReaderConfiguration {
     public String[] cacheGroupNames() {
         return cacheGroupNames;
     }
+
+    /** @return Skip copies. */
+    public boolean skipCopies() {
+        return skipCopies;
+    }
 }
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/AbstractCacheDumpTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/AbstractCacheDumpTest.java
index 657d3ddc6f3..269939c8e2a 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/AbstractCacheDumpTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/AbstractCacheDumpTest.java
@@ -291,7 +291,8 @@ public abstract class AbstractCacheDumpTest extends GridCommonAbstractTest {
             new HashSet<>(Arrays.asList(DEFAULT_CACHE_NAME, CACHE_0, CACHE_1)),
             KEYS_CNT + (onlyPrimary ? 0 : KEYS_CNT * backups),
             2 * (KEYS_CNT + (onlyPrimary ? 0 : KEYS_CNT * backups)),
-            KEYS_CNT);
+            KEYS_CNT,
+            false);
     }
 
     /** */
@@ -302,7 +303,8 @@ public abstract class AbstractCacheDumpTest extends GridCommonAbstractTest {
         Set<String> expectedFoundCaches,
         int expectedDfltDumpSz,
         int expectedGrpDumpSz,
-        int expectedCount
+        int expectedCount,
+        boolean skipCopies
     ) throws Exception {
         checkDumpWithCommand(ign, name, backups);
 
@@ -408,7 +410,8 @@ public abstract class AbstractCacheDumpTest extends GridCommonAbstractTest {
                 DFLT_THREAD_CNT, DFLT_TIMEOUT,
                 true,
                 false,
-                cacheGroupNames
+                cacheGroupNames,
+                skipCopies
             ),
             log
         ).run();
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/IgniteCacheDumpSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/IgniteCacheDumpSelfTest.java
index 9ca07130366..564b590c878 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/IgniteCacheDumpSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/IgniteCacheDumpSelfTest.java
@@ -163,7 +163,9 @@ public class IgniteCacheDumpSelfTest extends AbstractCacheDumpTest {
                 new HashSet<>(Arrays.asList(CACHE_0, CACHE_1)),
                 0,
                 2 * (KEYS_CNT + (onlyPrimary ? 0 : KEYS_CNT * backups)),
-                0);
+                0,
+                false
+            );
 
             checkDump(
                 ign,
@@ -172,7 +174,8 @@ public class IgniteCacheDumpSelfTest extends AbstractCacheDumpTest {
                 new HashSet<>(Arrays.asList(DEFAULT_CACHE_NAME)),
                 KEYS_CNT + (onlyPrimary ? 0 : KEYS_CNT * backups),
                 0,
-                KEYS_CNT
+                KEYS_CNT,
+                false
             );
 
             checkDump(
@@ -182,7 +185,45 @@ public class IgniteCacheDumpSelfTest extends AbstractCacheDumpTest {
                 new HashSet<>(Arrays.asList(DEFAULT_CACHE_NAME, CACHE_0, CACHE_1)),
                 KEYS_CNT + (onlyPrimary ? 0 : KEYS_CNT * backups),
                 2 * (KEYS_CNT + (onlyPrimary ? 0 : KEYS_CNT * backups)),
-                KEYS_CNT
+                KEYS_CNT,
+                false
+            );
+        }
+        finally {
+            snpPoolSz = 1;
+        }
+    }
+
+    /** */
+    @Test
+    public void testSkipCopies() throws Exception {
+        snpPoolSz = 4;
+
+        try {
+            IgniteEx ign = startGridAndFillCaches();
+
+            createDump(ign);
+
+            checkDump(
+                ign,
+                DMP_NAME,
+                null,
+                new HashSet<>(Arrays.asList(DEFAULT_CACHE_NAME, CACHE_0, CACHE_1)),
+                KEYS_CNT + (onlyPrimary ? 0 : KEYS_CNT * backups),
+                2 * (KEYS_CNT + (onlyPrimary ? 0 : KEYS_CNT * backups)),
+                KEYS_CNT,
+                false
+            );
+
+            checkDump(
+                ign,
+                DMP_NAME,
+                null,
+                new HashSet<>(Arrays.asList(DEFAULT_CACHE_NAME, CACHE_0, CACHE_1)),
+                KEYS_CNT,
+                2 * KEYS_CNT,
+                KEYS_CNT,
+                true
             );
         }
         finally {

Reply via email to