This is an automated email from the ASF dual-hosted git repository.
av pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 098ddecab6e IGNITE-26602 Snapshot component optimizations for MultiDC
(#12562)
098ddecab6e is described below
commit 098ddecab6e35d564f495adb3b751f6a14c3a276
Author: Anton Vinogradov <[email protected]>
AuthorDate: Wed Dec 10 15:59:15 2025 +0300
IGNITE-26602 Snapshot component optimizations for MultiDC (#12562)
---
.../snapshot/SnapshotRestoreProcess.java | 21 ++-
.../IgniteSnapshotRestoreFromRemoteMdcTest.java | 165 +++++++++++++++++++++
.../ignite/testsuites/IgniteSnapshotTestSuite.java | 2 +
3 files changed, 184 insertions(+), 4 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java
index 70460e3b525..1686f3d7028 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java
@@ -1577,7 +1577,7 @@ public class SnapshotRestoreProcess {
* @param metas Map of snapshot metadata distribution across the cluster.
* @return Map of cache partitions per each node.
*/
- private static Map<UUID, Map<Integer, Set<Integer>>> snapshotAffinity(
+ private Map<UUID, Map<Integer, Set<Integer>>> snapshotAffinity(
Map<UUID, List<SnapshotMetadata>> metas,
BiPredicate<Integer, Integer> filter
) {
@@ -1586,10 +1586,23 @@ public class SnapshotRestoreProcess {
List<UUID> nodes = new ArrayList<>(metas.keySet());
Collections.shuffle(nodes);
- Map<UUID, List<SnapshotMetadata>> shuffleMetas = new LinkedHashMap<>();
- nodes.forEach(k -> shuffleMetas.put(k, metas.get(k)));
+ Map<UUID, List<SnapshotMetadata>> orderedMetas = new LinkedHashMap<>();
- for (Map.Entry<UUID, List<SnapshotMetadata>> e :
shuffleMetas.entrySet()) {
+ String locDc = ctx.discovery().localNode().dataCenterId();
+
+ if (locDc != null) {
+ List<UUID> sameDcNodes = nodes.stream()
+ .map(uuid -> ctx.discovery().node(uuid))
+ .filter(node -> Objects.equals(node.dataCenterId(), locDc))
+ .map(ClusterNode::id)
+ .collect(Collectors.toList());
+
+ sameDcNodes.forEach(k -> orderedMetas.put(k, metas.get(k))); //
Getting same DC files first.
+ }
+
+ nodes.forEach(k -> orderedMetas.put(k, metas.get(k)));
+
+ for (Map.Entry<UUID, List<SnapshotMetadata>> e :
orderedMetas.entrySet()) {
UUID nodeId = e.getKey();
for (SnapshotMetadata meta :
ofNullable(e.getValue()).orElse(Collections.emptyList())) {
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotRestoreFromRemoteMdcTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotRestoreFromRemoteMdcTest.java
new file mode 100644
index 00000000000..9a63f2fb83b
--- /dev/null
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotRestoreFromRemoteMdcTest.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.snapshot;
+
+import java.io.IOException;
+import java.nio.file.DirectoryStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Collections;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cluster.ClusterState;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.apache.ignite.internal.util.IgniteUtils.defaultWorkDirectory;
+
+/** */
+public class IgniteSnapshotRestoreFromRemoteMdcTest extends
AbstractSnapshotSelfTest {
+ /** Cache. */
+ private static final String CACHE = "cache";
+
+ /** */
+ private static final String DC_ID_0 = "DC_ID_0";
+
+ /** */
+ private static final String DC_ID_1 = "DC_ID_1";
+
+ /** @throws Exception If fails. */
+ @Before
+ public void before() throws Exception {
+ cleanPersistenceDir();
+ cleanupDedicatedPersistenceDirs();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String
igniteInstanceName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+ cfg.setCommunicationSpi(new TestRecordingCommunicationSpi());
+
+ return cfg;
+ }
+
+ /** @throws Exception If failed. */
+ @Test
+ public void testMdcAwareSnapshotFromCurrentDc() throws Exception {
+ testMdcAwareSnapshot(true);
+ }
+
+ /** @throws Exception If failed. */
+ @Test
+ public void testMdcAwareSnapshotFromBothDc() throws Exception {
+ testMdcAwareSnapshot(false);
+ }
+
+ /** @throws Exception If failed. */
+ private void testMdcAwareSnapshot(boolean replicatedCache) throws
Exception {
+ Ignite supplier = startGridWithCustomWorkdir("supplier", DC_ID_0);
+
+ IgniteEx other = startGridWithCustomWorkdir("other_dc_node", DC_ID_1);
+
+ other.cluster().state(ClusterState.ACTIVE);
+
+ fillCache(other, replicatedCache);
+
+ snp(other).createSnapshot(SNAPSHOT_NAME, null, false,
false).get(TIMEOUT);
+
+ other.cache(CACHE).destroy();
+
+ awaitPartitionMapExchange();
+
+ Ignite demander = startGridWithCustomWorkdir("demander", DC_ID_0);
+
+ resetBaselineTopology();
+
+ TestRecordingCommunicationSpi spi =
TestRecordingCommunicationSpi.spi(demander);
+
+ AtomicInteger supReqCnt = new AtomicInteger();
+ AtomicInteger otherReqCnt = new AtomicInteger();
+
+ spi.blockMessages((node, message) -> {
+ if (message instanceof SnapshotFilesRequestMessage) {
+ if (node.id().equals(supplier.cluster().localNode().id()))
+ supReqCnt.incrementAndGet();
+ else if (node.id().equals(other.cluster().localNode().id()))
+ otherReqCnt.incrementAndGet();
+ }
+
+ return false;
+ });
+
+ other.snapshot().restoreSnapshot(SNAPSHOT_NAME,
Collections.singleton(CACHE)).get(60_000);
+
+ assertTrue(supReqCnt.get() > 0);
+ assertEquals(!replicatedCache, otherReqCnt.get() > 0);
+
+ assertCacheKeys(other.cache(CACHE), CACHE_KEYS_RANGE);
+ }
+
+ /** */
+ private void fillCache(IgniteEx ignite, boolean replicatedCache) {
+ CacheConfiguration<Integer, Object> ccfg = new CacheConfiguration<>();
+
+ ccfg.setName(CACHE);
+
+ if (replicatedCache)
+ ccfg.setCacheMode(CacheMode.REPLICATED); // Fill all nodes with
partitions.
+
+ ignite.createCache(ccfg);
+
+ try (IgniteDataStreamer<Integer, Object> ds =
ignite.dataStreamer(ccfg.getName())) {
+ for (int i = 0; i < CACHE_KEYS_RANGE; i++)
+ ds.addData(i, valueBuilder().apply(i));
+ }
+ }
+
+ /** */
+ private IgniteEx startGridWithCustomWorkdir(String instanceName, String
dcId) throws Exception {
+ IgniteConfiguration cfg = getConfiguration(instanceName)
+
.setUserAttributes(F.asMap(IgniteSystemProperties.IGNITE_DATA_CENTER_ID, dcId));
+
+ cfg.setWorkDirectory(Paths.get(defaultWorkDirectory(),
U.maskForFileName(instanceName)).toString());
+
+ return startGrid(cfg);
+ }
+
+ /** */
+ protected static void cleanupDedicatedPersistenceDirs() {
+ try (DirectoryStream<Path> ds =
Files.newDirectoryStream(Path.of(defaultWorkDirectory()))) {
+ for (Path dir : ds)
+ U.delete(dir);
+ }
+ catch (IOException | IgniteCheckedException e) {
+ throw new IgniteException(e);
+ }
+ }
+}
diff --git
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSnapshotTestSuite.java
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSnapshotTestSuite.java
index ce1ab028910..a3c2567e3d2 100644
---
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSnapshotTestSuite.java
+++
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSnapshotTestSuite.java
@@ -27,6 +27,7 @@ import
org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteCl
import
org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotMXBeanTest;
import
org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManagerSelfTest;
import
org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotRemoteRequestTest;
+import
org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotRestoreFromRemoteMdcTest;
import
org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotRestoreFromRemoteTest;
import
org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotRollingUpgradeTest;
import
org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotWithMetastorageTest;
@@ -51,6 +52,7 @@ public class IgniteSnapshotTestSuite {
GridTestUtils.addTestIfNeeded(suite,
IgniteSnapshotManagerSelfTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite,
IgniteClusterSnapshotSelfTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite,
IgniteSnapshotRemoteRequestTest.class, ignoredTests);
+ GridTestUtils.addTestIfNeeded(suite,
IgniteSnapshotRestoreFromRemoteMdcTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite,
IgniteClusterSnapshotCheckTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite,
IgniteSnapshotWithMetastorageTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite, IgniteSnapshotMXBeanTest.class,
ignoredTests);