This is an automated email from the ASF dual-hosted git repository.
nizhikov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 63f1a80d83d IGNITE-20722 Support skipCopies for DumpReader (#11014)
63f1a80d83d is described below
commit 63f1a80d83da4ae5e3122057826e92ff4b6e3178
Author: yurinaryshkin <[email protected]>
AuthorDate: Mon Oct 30 16:38:56 2023 +0300
IGNITE-20722 Support skipCopies for DumpReader (#11014)
---
.../java/org/apache/ignite/dump/DumpReader.java | 12 ++++++
.../ignite/dump/DumpReaderConfiguration.java | 40 +++++++++++++++++-
.../snapshot/dump/AbstractCacheDumpTest.java | 9 +++--
.../snapshot/dump/IgniteCacheDumpSelfTest.java | 47 ++++++++++++++++++++--
4 files changed, 100 insertions(+), 8 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/dump/DumpReader.java
b/modules/core/src/main/java/org/apache/ignite/dump/DumpReader.java
index 34522d4809c..d2ed811ac3f 100644
--- a/modules/core/src/main/java/org/apache/ignite/dump/DumpReader.java
+++ b/modules/core/src/main/java/org/apache/ignite/dump/DumpReader.java
@@ -21,6 +21,7 @@ import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -108,11 +109,22 @@ public class DumpReader implements Runnable {
AtomicBoolean skip = new AtomicBoolean(false);
+ Map<Integer, Set<Integer>> groups = cfg.skipCopies() ? new
HashMap<>() : null;
+
+ if (groups != null)
+ grpToNodes.keySet().forEach(grpId -> groups.put(grpId, new
HashSet<>()));
+
for (Map.Entry<Integer, List<String>> e :
grpToNodes.entrySet()) {
int grp = e.getKey();
for (String node : e.getValue()) {
for (int part : dump.partitions(node, grp)) {
+ if (groups != null && !groups.get(grp).add(part)) {
+ log.info("Skip copy partition [node=" + node +
", grp=" + grp + ", part=" + part + ']');
+
+ continue;
+ }
+
Runnable consumePart = () -> {
if (skip.get()) {
if (log.isDebugEnabled()) {
diff --git
a/modules/core/src/main/java/org/apache/ignite/dump/DumpReaderConfiguration.java
b/modules/core/src/main/java/org/apache/ignite/dump/DumpReaderConfiguration.java
index 20da0ef0547..1b43554c3f0 100644
---
a/modules/core/src/main/java/org/apache/ignite/dump/DumpReaderConfiguration.java
+++
b/modules/core/src/main/java/org/apache/ignite/dump/DumpReaderConfiguration.java
@@ -58,12 +58,15 @@ public class DumpReaderConfiguration {
/** Cache group names. */
private String[] cacheGroupNames;
+ /** Skip copies. */
+ private final boolean skipCopies;
+
/**
* @param dir Root dump directory.
* @param cnsmr Dump consumer.
*/
public DumpReaderConfiguration(File dir, DumpConsumer cnsmr) {
- this(dir, cnsmr, DFLT_THREAD_CNT, DFLT_TIMEOUT, true, true, null);
+ this(dir, cnsmr, DFLT_THREAD_CNT, DFLT_TIMEOUT, true, true, null,
false);
}
/**
@@ -75,8 +78,35 @@ public class DumpReaderConfiguration {
* @param keepBinary If {@code true} then don't deserialize {@link
KeyCacheObject} and {@link CacheObject}.
* @param cacheGroupNames Cache group names.
*/
- public DumpReaderConfiguration(File dir, DumpConsumer cnsmr, int thCnt,
Duration timeout, boolean failFast, boolean keepBinary,
+ public DumpReaderConfiguration(File dir,
+ DumpConsumer cnsmr,
+ int thCnt,
+ Duration timeout,
+ boolean failFast,
+ boolean keepBinary,
String[] cacheGroupNames
+ ) {
+ this(dir, cnsmr, thCnt, timeout, failFast, keepBinary,
cacheGroupNames, false);
+ }
+
+ /**
+ * @param dir Root dump directory.
+ * @param cnsmr Dump consumer.
+ * @param thCnt Count of threads to consume dumped partitions.
+ * @param timeout Timeout of dump reader invocation.
+ * @param failFast Stop processing partitions if consumer fail to process
one.
+ * @param keepBinary If {@code true} then don't deserialize {@link
KeyCacheObject} and {@link CacheObject}.
+ * @param cacheGroupNames Cache group names.
+ * @param skipCopies Skip copies.
+ */
+ public DumpReaderConfiguration(File dir,
+ DumpConsumer cnsmr,
+ int thCnt,
+ Duration timeout,
+ boolean failFast,
+ boolean keepBinary,
+ String[] cacheGroupNames,
+ boolean skipCopies
) {
this.dir = dir;
this.cnsmr = cnsmr;
@@ -85,6 +115,7 @@ public class DumpReaderConfiguration {
this.failFast = failFast;
this.keepBinary = keepBinary;
this.cacheGroupNames = cacheGroupNames;
+ this.skipCopies = skipCopies;
}
/** @return Root dump directiory. */
@@ -121,4 +152,9 @@ public class DumpReaderConfiguration {
public String[] cacheGroupNames() {
return cacheGroupNames;
}
+
+ /** @return Skip copies. */
+ public boolean skipCopies() {
+ return skipCopies;
+ }
}
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/AbstractCacheDumpTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/AbstractCacheDumpTest.java
index 657d3ddc6f3..269939c8e2a 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/AbstractCacheDumpTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/AbstractCacheDumpTest.java
@@ -291,7 +291,8 @@ public abstract class AbstractCacheDumpTest extends
GridCommonAbstractTest {
new HashSet<>(Arrays.asList(DEFAULT_CACHE_NAME, CACHE_0, CACHE_1)),
KEYS_CNT + (onlyPrimary ? 0 : KEYS_CNT * backups),
2 * (KEYS_CNT + (onlyPrimary ? 0 : KEYS_CNT * backups)),
- KEYS_CNT);
+ KEYS_CNT,
+ false);
}
/** */
@@ -302,7 +303,8 @@ public abstract class AbstractCacheDumpTest extends
GridCommonAbstractTest {
Set<String> expectedFoundCaches,
int expectedDfltDumpSz,
int expectedGrpDumpSz,
- int expectedCount
+ int expectedCount,
+ boolean skipCopies
) throws Exception {
checkDumpWithCommand(ign, name, backups);
@@ -408,7 +410,8 @@ public abstract class AbstractCacheDumpTest extends
GridCommonAbstractTest {
DFLT_THREAD_CNT, DFLT_TIMEOUT,
true,
false,
- cacheGroupNames
+ cacheGroupNames,
+ skipCopies
),
log
).run();
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/IgniteCacheDumpSelfTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/IgniteCacheDumpSelfTest.java
index 9ca07130366..564b590c878 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/IgniteCacheDumpSelfTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/IgniteCacheDumpSelfTest.java
@@ -163,7 +163,9 @@ public class IgniteCacheDumpSelfTest extends
AbstractCacheDumpTest {
new HashSet<>(Arrays.asList(CACHE_0, CACHE_1)),
0,
2 * (KEYS_CNT + (onlyPrimary ? 0 : KEYS_CNT * backups)),
- 0);
+ 0,
+ false
+ );
checkDump(
ign,
@@ -172,7 +174,8 @@ public class IgniteCacheDumpSelfTest extends
AbstractCacheDumpTest {
new HashSet<>(Arrays.asList(DEFAULT_CACHE_NAME)),
KEYS_CNT + (onlyPrimary ? 0 : KEYS_CNT * backups),
0,
- KEYS_CNT
+ KEYS_CNT,
+ false
);
checkDump(
@@ -182,7 +185,45 @@ public class IgniteCacheDumpSelfTest extends
AbstractCacheDumpTest {
new HashSet<>(Arrays.asList(DEFAULT_CACHE_NAME, CACHE_0,
CACHE_1)),
KEYS_CNT + (onlyPrimary ? 0 : KEYS_CNT * backups),
2 * (KEYS_CNT + (onlyPrimary ? 0 : KEYS_CNT * backups)),
- KEYS_CNT
+ KEYS_CNT,
+ false
+ );
+ }
+ finally {
+ snpPoolSz = 1;
+ }
+ }
+
+ /** */
+ @Test
+ public void testSkipCopies() throws Exception {
+ snpPoolSz = 4;
+
+ try {
+ IgniteEx ign = startGridAndFillCaches();
+
+ createDump(ign);
+
+ checkDump(
+ ign,
+ DMP_NAME,
+ null,
+ new HashSet<>(Arrays.asList(DEFAULT_CACHE_NAME, CACHE_0,
CACHE_1)),
+ KEYS_CNT + (onlyPrimary ? 0 : KEYS_CNT * backups),
+ 2 * (KEYS_CNT + (onlyPrimary ? 0 : KEYS_CNT * backups)),
+ KEYS_CNT,
+ false
+ );
+
+ checkDump(
+ ign,
+ DMP_NAME,
+ null,
+ new HashSet<>(Arrays.asList(DEFAULT_CACHE_NAME, CACHE_0,
CACHE_1)),
+ KEYS_CNT,
+ 2 * KEYS_CNT,
+ KEYS_CNT,
+ true
);
}
finally {