This is an automated email from the ASF dual-hosted git repository.

av pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 098ddecab6e IGNITE-26602 Snapshot component optimizations for MultiDC (#12562)
098ddecab6e is described below

commit 098ddecab6e35d564f495adb3b751f6a14c3a276
Author: Anton Vinogradov <[email protected]>
AuthorDate: Wed Dec 10 15:59:15 2025 +0300

    IGNITE-26602 Snapshot component optimizations for MultiDC (#12562)
---
 .../snapshot/SnapshotRestoreProcess.java           |  21 ++-
 .../IgniteSnapshotRestoreFromRemoteMdcTest.java    | 165 +++++++++++++++++++++
 .../ignite/testsuites/IgniteSnapshotTestSuite.java |   2 +
 3 files changed, 184 insertions(+), 4 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java
index 70460e3b525..1686f3d7028 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java
@@ -1577,7 +1577,7 @@ public class SnapshotRestoreProcess {
      * @param metas Map of snapshot metadata distribution across the cluster.
      * @return Map of cache partitions per each node.
      */
-    private static Map<UUID, Map<Integer, Set<Integer>>> snapshotAffinity(
+    private Map<UUID, Map<Integer, Set<Integer>>> snapshotAffinity(
         Map<UUID, List<SnapshotMetadata>> metas,
         BiPredicate<Integer, Integer> filter
     ) {
@@ -1586,10 +1586,23 @@ public class SnapshotRestoreProcess {
         List<UUID> nodes = new ArrayList<>(metas.keySet());
         Collections.shuffle(nodes);
 
-        Map<UUID, List<SnapshotMetadata>> shuffleMetas = new LinkedHashMap<>();
-        nodes.forEach(k -> shuffleMetas.put(k, metas.get(k)));
+        Map<UUID, List<SnapshotMetadata>> orderedMetas = new LinkedHashMap<>();
 
-        for (Map.Entry<UUID, List<SnapshotMetadata>> e : shuffleMetas.entrySet()) {
+        String locDc = ctx.discovery().localNode().dataCenterId();
+
+        if (locDc != null) {
+            List<UUID> sameDcNodes = nodes.stream()
+                .map(uuid -> ctx.discovery().node(uuid))
+                .filter(node -> Objects.equals(node.dataCenterId(), locDc))
+                .map(ClusterNode::id)
+                .collect(Collectors.toList());
+
+            sameDcNodes.forEach(k -> orderedMetas.put(k, metas.get(k))); // Getting same DC files first.
+        }
+
+        nodes.forEach(k -> orderedMetas.put(k, metas.get(k)));
+
+        for (Map.Entry<UUID, List<SnapshotMetadata>> e : orderedMetas.entrySet()) {
             UUID nodeId = e.getKey();
 
             for (SnapshotMetadata meta : ofNullable(e.getValue()).orElse(Collections.emptyList())) {
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotRestoreFromRemoteMdcTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotRestoreFromRemoteMdcTest.java
new file mode 100644
index 00000000000..9a63f2fb83b
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotRestoreFromRemoteMdcTest.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.snapshot;
+
+import java.io.IOException;
+import java.nio.file.DirectoryStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Collections;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cluster.ClusterState;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.apache.ignite.internal.util.IgniteUtils.defaultWorkDirectory;
+
+/** */
+public class IgniteSnapshotRestoreFromRemoteMdcTest extends AbstractSnapshotSelfTest {
+    /** Cache. */
+    private static final String CACHE = "cache";
+
+    /** */
+    private static final String DC_ID_0 = "DC_ID_0";
+
+    /** */
+    private static final String DC_ID_1 = "DC_ID_1";
+
+    /** @throws Exception If fails. */
+    @Before
+    public void before() throws Exception {
+        cleanPersistenceDir();
+        cleanupDedicatedPersistenceDirs();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        cfg.setCommunicationSpi(new TestRecordingCommunicationSpi());
+
+        return cfg;
+    }
+
+    /** @throws Exception If failed. */
+    @Test
+    public void testMdcAwareSnapshotFromCurrentDc() throws Exception {
+        testMdcAwareSnapshot(true);
+    }
+
+    /** @throws Exception If failed. */
+    @Test
+    public void testMdcAwareSnapshotFromBothDc() throws Exception {
+        testMdcAwareSnapshot(false);
+    }
+
+    /** @throws Exception If failed. */
+    private void testMdcAwareSnapshot(boolean replicatedCache) throws Exception {
+        Ignite supplier = startGridWithCustomWorkdir("supplier", DC_ID_0);
+
+        IgniteEx other = startGridWithCustomWorkdir("other_dc_node", DC_ID_1);
+
+        other.cluster().state(ClusterState.ACTIVE);
+
+        fillCache(other, replicatedCache);
+
+        snp(other).createSnapshot(SNAPSHOT_NAME, null, false, false).get(TIMEOUT);
+
+        other.cache(CACHE).destroy();
+
+        awaitPartitionMapExchange();
+
+        Ignite demander = startGridWithCustomWorkdir("demander", DC_ID_0);
+
+        resetBaselineTopology();
+
+        TestRecordingCommunicationSpi spi = TestRecordingCommunicationSpi.spi(demander);
+
+        AtomicInteger supReqCnt = new AtomicInteger();
+        AtomicInteger otherReqCnt = new AtomicInteger();
+
+        spi.blockMessages((node, message) -> {
+            if (message instanceof SnapshotFilesRequestMessage) {
+                if (node.id().equals(supplier.cluster().localNode().id()))
+                    supReqCnt.incrementAndGet();
+                else if (node.id().equals(other.cluster().localNode().id()))
+                    otherReqCnt.incrementAndGet();
+            }
+
+            return false;
+        });
+
+        other.snapshot().restoreSnapshot(SNAPSHOT_NAME, Collections.singleton(CACHE)).get(60_000);
+
+        assertTrue(supReqCnt.get() > 0);
+        assertEquals(!replicatedCache, otherReqCnt.get() > 0);
+
+        assertCacheKeys(other.cache(CACHE), CACHE_KEYS_RANGE);
+    }
+
+    /** */
+    private void fillCache(IgniteEx ignite, boolean replicatedCache) {
+        CacheConfiguration<Integer, Object> ccfg = new CacheConfiguration<>();
+
+        ccfg.setName(CACHE);
+
+        if (replicatedCache)
+            ccfg.setCacheMode(CacheMode.REPLICATED); // Fill all nodes with partitions.
+
+        ignite.createCache(ccfg);
+
+        try (IgniteDataStreamer<Integer, Object> ds = ignite.dataStreamer(ccfg.getName())) {
+            for (int i = 0; i < CACHE_KEYS_RANGE; i++)
+                ds.addData(i, valueBuilder().apply(i));
+        }
+    }
+
+    /** */
+    private IgniteEx startGridWithCustomWorkdir(String instanceName, String dcId) throws Exception {
+        IgniteConfiguration cfg = getConfiguration(instanceName)
+            .setUserAttributes(F.asMap(IgniteSystemProperties.IGNITE_DATA_CENTER_ID, dcId));
+
+        cfg.setWorkDirectory(Paths.get(defaultWorkDirectory(), U.maskForFileName(instanceName)).toString());
+
+        return startGrid(cfg);
+    }
+
+    /** */
+    protected static void cleanupDedicatedPersistenceDirs() {
+        try (DirectoryStream<Path> ds = Files.newDirectoryStream(Path.of(defaultWorkDirectory()))) {
+            for (Path dir : ds)
+                U.delete(dir);
+        }
+        catch (IOException | IgniteCheckedException e) {
+            throw new IgniteException(e);
+        }
+    }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSnapshotTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSnapshotTestSuite.java
index ce1ab028910..a3c2567e3d2 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSnapshotTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSnapshotTestSuite.java
@@ -27,6 +27,7 @@ import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteCl
 import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotMXBeanTest;
 import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManagerSelfTest;
 import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotRemoteRequestTest;
+import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotRestoreFromRemoteMdcTest;
 import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotRestoreFromRemoteTest;
 import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotRollingUpgradeTest;
 import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotWithMetastorageTest;
@@ -51,6 +52,7 @@ public class IgniteSnapshotTestSuite {
         GridTestUtils.addTestIfNeeded(suite, IgniteSnapshotManagerSelfTest.class, ignoredTests);
         GridTestUtils.addTestIfNeeded(suite, IgniteClusterSnapshotSelfTest.class, ignoredTests);
         GridTestUtils.addTestIfNeeded(suite, IgniteSnapshotRemoteRequestTest.class, ignoredTests);
+        GridTestUtils.addTestIfNeeded(suite, IgniteSnapshotRestoreFromRemoteMdcTest.class, ignoredTests);
         GridTestUtils.addTestIfNeeded(suite, IgniteClusterSnapshotCheckTest.class, ignoredTests);
         GridTestUtils.addTestIfNeeded(suite, IgniteSnapshotWithMetastorageTest.class, ignoredTests);
         GridTestUtils.addTestIfNeeded(suite, IgniteSnapshotMXBeanTest.class, ignoredTests);

Reply via email to