This is an automated email from the ASF dual-hosted git repository.
sk0x50 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 0ab09d603da IGNITE-26479 Fix data node filter update when changing
node version (#6651)
0ab09d603da is described below
commit 0ab09d603da216c91c55fc30ff5459d16faad6bf
Author: Denis Chudov <[email protected]>
AuthorDate: Sat Oct 11 01:12:31 2025 +0300
IGNITE-26479 Fix data node filter update when changing node version (#6651)
---
modules/compatibility-tests/build.gradle | 2 +
...tionRaftLogOnAnotherNodesCompatibilityTest.java | 25 ++-
.../internal/ItDataNodesCompatibilityTest.java | 165 +++++++++++++++++
.../org/apache/ignite/internal/IgniteCluster.java | 9 +
...niteDistributionZoneManagerNodeRestartTest.java | 35 +---
.../rebalance/ItRebalanceDistributedTest.java | 1 +
.../distributionzones/DataNodesManager.java | 198 ++++++++++++++++++---
.../distributionzones/DataNodesMapSerializer.java | 69 +++++++
.../distributionzones/DistributionZoneManager.java | 51 ++----
.../distributionzones/DistributionZonesUtil.java | 25 ++-
.../rebalance/DistributionZoneRebalanceEngine.java | 24 +--
.../DistributionZoneRebalanceEngineV2.java | 24 +--
.../BaseDistributionZoneManagerTest.java | 3 +
.../distributionzones/DataNodesManagerTest.java | 7 +-
.../DistributionZoneRebalanceEngineTest.java | 2 -
.../internal/metastorage/dsl/Operations.java | 2 +-
.../MetastorageCommandsCompatibilityTest.java | 2 -
.../partition/replicator/fixtures/Node.java | 1 +
.../runner/app/ItIgniteNodeRestartTest.java | 1 +
.../org/apache/ignite/internal/app/IgniteImpl.java | 1 +
20 files changed, 503 insertions(+), 144 deletions(-)
diff --git a/modules/compatibility-tests/build.gradle
b/modules/compatibility-tests/build.gradle
index 8978c90ccb0..73179666392 100644
--- a/modules/compatibility-tests/build.gradle
+++ b/modules/compatibility-tests/build.gradle
@@ -42,6 +42,8 @@ dependencies {
integrationTestImplementation testFixtures(project(':ignite-runner'))
integrationTestImplementation project(':ignite-api')
integrationTestImplementation project(':ignite-client')
+ integrationTestImplementation project(':ignite-catalog')
+ integrationTestImplementation project(':ignite-distribution-zones')
integrationTestImplementation project(':ignite-runner')
integrationTestImplementation project(':ignite-cli')
integrationTestImplementation project(':ignite-page-memory')
diff --git
a/modules/compatibility-tests/src/integrationTest/java/org/apache/ignite/internal/ItApplyPartitionRaftLogOnAnotherNodesCompatibilityTest.java
b/modules/compatibility-tests/src/integrationTest/java/org/apache/ignite/internal/ItApplyPartitionRaftLogOnAnotherNodesCompatibilityTest.java
index 7976df6edfa..4fdd41c6feb 100644
---
a/modules/compatibility-tests/src/integrationTest/java/org/apache/ignite/internal/ItApplyPartitionRaftLogOnAnotherNodesCompatibilityTest.java
+++
b/modules/compatibility-tests/src/integrationTest/java/org/apache/ignite/internal/ItApplyPartitionRaftLogOnAnotherNodesCompatibilityTest.java
@@ -17,11 +17,16 @@
package org.apache.ignite.internal;
+import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl;
+import static
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.hasSize;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import java.util.function.Predicate;
import org.apache.ignite.Ignite;
-import org.junit.jupiter.api.Disabled;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedClass;
import org.junit.jupiter.params.provider.MethodSource;
@@ -64,7 +69,6 @@ public class
ItApplyPartitionRaftLogOnAnotherNodesCompatibilityTest extends Comp
});
}
- @Disabled("https://issues.apache.org/jira/browse/IGNITE-26479")
@Test
void testIncreaseReplicas() throws Exception {
cluster.stop();
@@ -74,9 +78,24 @@ public class
ItApplyPartitionRaftLogOnAnotherNodesCompatibilityTest extends Comp
sql(String.format("ALTER ZONE %s SET REPLICAS=3,
DATA_NODES_FILTER='$..*'", ZONE_NAME));
// Let's wait for replication to complete on other nodes.
- Thread.sleep(3_000);
+ waitForZoneState(ZONE_NAME, zone -> zone.replicas() == 3);
assertThat(sql(cluster.node(1), String.format("SELECT * FROM %s",
TABLE_NAME)), hasSize(10));
assertThat(sql(cluster.node(2), String.format("SELECT * FROM %s",
TABLE_NAME)), hasSize(10));
}
+
+ private void waitForZoneState(String zoneName,
Predicate<CatalogZoneDescriptor> predicate)
+ throws InterruptedException {
+ assertTrue(waitForCondition(() -> {
+ boolean tested = true;
+
+ for (Ignite n : cluster.nodes()) {
+ IgniteImpl node = unwrapIgniteImpl(n);
+ CatalogZoneDescriptor zone =
node.catalogManager().activeCatalog(node.clock().currentLong()).zone(zoneName);
+ tested = tested && predicate.test(zone);
+ }
+
+ return tested;
+ }, 10_0000));
+ }
}
diff --git
a/modules/compatibility-tests/src/integrationTest/java/org/apache/ignite/internal/ItDataNodesCompatibilityTest.java
b/modules/compatibility-tests/src/integrationTest/java/org/apache/ignite/internal/ItDataNodesCompatibilityTest.java
new file mode 100644
index 00000000000..6eb5258e9ff
--- /dev/null
+++
b/modules/compatibility-tests/src/integrationTest/java/org/apache/ignite/internal/ItDataNodesCompatibilityTest.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl;
+import static
org.apache.ignite.internal.catalog.commands.CatalogUtils.DEFAULT_FILTER;
+import static
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Predicate;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedClass;
+import org.junit.jupiter.params.provider.MethodSource;
+
+/** Compatibility tests for data nodes and data nodes history. */
+@ParameterizedClass
+@MethodSource("baseVersions")
+public class ItDataNodesCompatibilityTest extends CompatibilityTestBase {
+ private static final String ZONE_NAME = "TEST_ZONE";
+
+ private static final String TABLE_NAME = "TEST_TABLE";
+
+ @Override
+ protected int nodesCount() {
+ return 3;
+ }
+
+ @Override
+ protected boolean restartWithCurrentEmbeddedVersion() {
+ return false;
+ }
+
+ @Override
+ protected void setupBaseVersion(Ignite baseIgnite) {
+ String createZoneDdl = String.format(
+ "CREATE ZONE %s WITH PARTITIONS=1, REPLICAS=1,
STORAGE_PROFILES='default', DATA_NODES_FILTER='$[?(@.nodeIndex == \"0\")]'",
+ ZONE_NAME
+ );
+
+ sql(baseIgnite, createZoneDdl);
+
+ sql(baseIgnite, String.format("CREATE TABLE %s(ID INT PRIMARY KEY, VAL
VARCHAR) ZONE %s", TABLE_NAME, ZONE_NAME));
+
+ String insertDml = String.format("INSERT INTO %s (ID, VAL) VALUES (?,
?) ", TABLE_NAME);
+
+ baseIgnite.transactions().runInTransaction(tx -> {
+ for (int i = 0; i < 10; i++) {
+ sql(baseIgnite, tx, insertDml, i, "str_" + i);
+ }
+ });
+ }
+
+ /**
+ * Tests that data nodes are correctly recovered on the newer version of
the cluster.
+ *
+ * <ul>
+ * <li>writes data nodes on the old cluster, in legacy format, with some
data nodes filter</li>
+ * <li>starts the new version of the cluster and checks that data nodes are
recovered correctly</li>
+ * <li>changes the filter to default and waits for data nodes
recalculation</li>
+ * <li>changes the filter once again, to another filter, and waits for
data nodes recalculation</li>
+ * <li>restarts the cluster once again</li>
+ * <li>checks that the latest data nodes version is still active</li>
+ * </ul>
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ void testDataNodesChange() throws Exception {
+ cluster.stop();
+
+ int initialNodesCount = nodesCount();
+ cluster.startEmbedded(initialNodesCount, false);
+
+ IgniteImpl node = unwrapIgniteImpl(cluster.node(0));
+ int zoneId =
node.catalogManager().activeCatalog(node.clock().currentLong()).zone(ZONE_NAME).id();
+
+ log.info("Test: zoneId=" + zoneId);
+
+ Set<String> dataNodes = dataNodes(node, zoneId);
+
+ assertEquals(Set.of(cluster.node(0).name()), dataNodes);
+
+ log.info("Test: changing filter to default.");
+
+ sql(String.format(
+ "ALTER ZONE %s SET DATA_NODES_FILTER='%s'",
+ ZONE_NAME,
+ DEFAULT_FILTER
+ ));
+
+ // Wait until the default filter is visible in the active catalog on every node.
+ waitForZoneState(ZONE_NAME, zone ->
zone.filter().equals(DEFAULT_FILTER));
+
+ waitForCondition(() -> dataNodes(node, zoneId).size() == nodesCount(),
10_000);
+
+ log.info("Test: changing filter.");
+
+ String newFilter = "$[?(@.nodeIndex == \"1\")]";
+ sql(String.format(
+ "ALTER ZONE %s SET DATA_NODES_FILTER='%s'",
+ ZONE_NAME,
+ newFilter
+ ));
+
+ // Wait until the new filter is visible in the active catalog on every node.
+ waitForZoneState(ZONE_NAME, zone -> zone.filter().equals(newFilter));
+
+ waitForCondition(() -> dataNodes(node,
zoneId).equals(Set.of(cluster.node(1).name())), 10_000);
+
+ // Check that we read the new data nodes after one more restart.
+ cluster.stop();
+
+ cluster.startEmbedded(initialNodesCount, false);
+
+ IgniteImpl restartedNode = unwrapIgniteImpl(cluster.node(0));
+
+ Set<String> dataNodesAfterRestart = dataNodes(restartedNode, zoneId);
+
+ assertEquals(Set.of(cluster.node(1).name()), dataNodesAfterRestart);
+ }
+
+ private static Set<String> dataNodes(IgniteImpl node, int zoneId) {
+ CompletableFuture<Set<String>> fut =
node.distributionZoneManager().currentDataNodes(zoneId);
+ assertThat(fut, willCompleteSuccessfully());
+ return fut.join();
+ }
+
+ private void waitForZoneState(String zoneName,
Predicate<CatalogZoneDescriptor> predicate)
+ throws InterruptedException {
+ assertTrue(waitForCondition(() -> {
+ boolean tested = true;
+
+ for (Ignite n : cluster.nodes()) {
+ IgniteImpl node = unwrapIgniteImpl(n);
+ CatalogZoneDescriptor zone =
node.catalogManager().activeCatalog(node.clock().currentLong()).zone(zoneName);
+ tested = tested && predicate.test(zone);
+ }
+
+ return tested;
+ }, 10_0000));
+ }
+}
diff --git
a/modules/compatibility-tests/src/testFixtures/java/org/apache/ignite/internal/IgniteCluster.java
b/modules/compatibility-tests/src/testFixtures/java/org/apache/ignite/internal/IgniteCluster.java
index 594ea267cf9..6413cba72aa 100644
---
a/modules/compatibility-tests/src/testFixtures/java/org/apache/ignite/internal/IgniteCluster.java
+++
b/modules/compatibility-tests/src/testFixtures/java/org/apache/ignite/internal/IgniteCluster.java
@@ -285,6 +285,15 @@ public class IgniteCluster {
return
clusterConfiguration.nodeNamingStrategy().nodeName(clusterConfiguration,
nodeIndex);
}
+ /**
+ * Returns cluster nodes.
+ *
+ * @return Cluster nodes.
+ */
+ public List<Ignite> nodes() {
+ return nodes;
+ }
+
private ServerRegistration startEmbeddedNode(
@Nullable TestInfo testInfo,
int nodeIndex,
diff --git
a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/distributionzones/ItIgniteDistributionZoneManagerNodeRestartTest.java
b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/distributionzones/ItIgniteDistributionZoneManagerNodeRestartTest.java
index 8f610f9dff2..3673a1138f9 100644
---
a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/distributionzones/ItIgniteDistributionZoneManagerNodeRestartTest.java
+++
b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/distributionzones/ItIgniteDistributionZoneManagerNodeRestartTest.java
@@ -72,7 +72,6 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
-import java.util.UUID;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
@@ -310,6 +309,7 @@ public class ItIgniteDistributionZoneManagerNodeRestartTest
extends BaseIgniteRe
DistributionZoneManager distributionZoneManager = new
DistributionZoneManager(
name,
+ () -> clusterSvc.topologyService().localMember().id(),
revisionUpdater,
metastore,
logicalTopologyService,
@@ -384,39 +384,6 @@ public class
ItIgniteDistributionZoneManagerNodeRestartTest extends BaseIgniteRe
startScaleDownBlocking = false;
}
- @Test
- public void testNodeAttributesRestoredAfterRestart() throws Exception {
- PartialNode node = startPartialNode(0);
-
- node.logicalTopology().putNode(A);
- node.logicalTopology().putNode(B);
- node.logicalTopology().putNode(C);
-
- createZone(getCatalogManager(node), ZONE_NAME, IMMEDIATE_TIMER_VALUE,
IMMEDIATE_TIMER_VALUE, null, DEFAULT_STORAGE_PROFILE);
-
- int zoneId = getZoneId(node, ZONE_NAME);
-
- DistributionZoneManager distributionZoneManager =
getDistributionZoneManager(node);
- CatalogManager catalogManager = getCatalogManager(node);
-
- assertDataNodesFromManager(distributionZoneManager,
metastore::appliedRevision, catalogManager::latestCatalogVersion, zoneId,
- Set.of(A, B, C), TIMEOUT_MILLIS);
-
- Map<UUID, NodeWithAttributes> nodeAttributesBeforeRestart =
distributionZoneManager.nodesAttributes();
-
- node.stop();
-
- node = startPartialNode(0);
-
- distributionZoneManager = getDistributionZoneManager(node);
-
- Map<UUID, NodeWithAttributes> nodeAttributesAfterRestart =
distributionZoneManager.nodesAttributes();
-
- assertEquals(3, nodeAttributesAfterRestart.size());
-
- assertEquals(nodeAttributesBeforeRestart, nodeAttributesAfterRestart);
- }
-
@Test
public void testLogicalTopologyRestoredAfterRestart() throws Exception {
PartialNode node = startPartialNode(0);
diff --git
a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
index f1dd90ab055..80f853be282 100644
---
a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
+++
b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
@@ -1556,6 +1556,7 @@ public class ItRebalanceDistributedTest extends
BaseIgniteAbstractTest {
distributionZoneManager = new DistributionZoneManager(
name,
+ () -> clusterService.topologyService().localMember().id(),
registry,
metaStorageManager,
logicalTopologyService,
diff --git
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DataNodesManager.java
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DataNodesManager.java
index cd3bad009bc..5cd91acfc7e 100644
---
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DataNodesManager.java
+++
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DataNodesManager.java
@@ -33,16 +33,19 @@ import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil
import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.createZoneManagerExecutor;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.dataNodeHistoryContextFromValues;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.deserializeLogicalTopologySet;
+import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.deserializeNodesAttributes;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.filterDataNodes;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.nodeNames;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneDataNodesHistoryKey;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneDataNodesHistoryPrefix;
+import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneDataNodesKey;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonePartitionResetTimerKey;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneScaleDownTimerKey;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneScaleDownTimerPrefix;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneScaleUpTimerKey;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneScaleUpTimerPrefix;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesLogicalTopologyKey;
+import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesNodesAttributes;
import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.extractZoneId;
import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
import static org.apache.ignite.internal.metastorage.dsl.Conditions.and;
@@ -57,6 +60,7 @@ import static
org.apache.ignite.internal.metastorage.dsl.Operations.put;
import static org.apache.ignite.internal.metastorage.dsl.Operations.remove;
import static org.apache.ignite.internal.metastorage.dsl.Statements.iif;
import static org.apache.ignite.internal.util.CollectionUtils.union;
+import static org.apache.ignite.internal.util.CompletableFutures.allOf;
import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
import static org.apache.ignite.internal.util.ExceptionUtils.hasCause;
import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
@@ -65,6 +69,7 @@ import static
org.apache.ignite.internal.util.IgniteUtils.shutdownAndAwaitTermin
import static org.apache.ignite.internal.util.IgniteUtils.startsWith;
import static org.apache.ignite.lang.ErrorGroups.Common.NODE_STOPPING_ERR;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
@@ -72,6 +77,7 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
+import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledFuture;
@@ -79,6 +85,7 @@ import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.IntSupplier;
+import java.util.function.Supplier;
import org.apache.ignite.internal.catalog.CatalogManager;
import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
import
org.apache.ignite.internal.distributionzones.DataNodesHistory.DataNodesHistorySerializer;
@@ -102,11 +109,14 @@ import
org.apache.ignite.internal.metastorage.WatchListener;
import org.apache.ignite.internal.metastorage.dsl.Condition;
import org.apache.ignite.internal.metastorage.dsl.Iif;
import org.apache.ignite.internal.metastorage.dsl.Operation;
+import org.apache.ignite.internal.metastorage.dsl.OperationType;
+import org.apache.ignite.internal.metastorage.dsl.Operations;
import org.apache.ignite.internal.metastorage.dsl.StatementResult;
import org.apache.ignite.internal.metastorage.dsl.Update;
import org.apache.ignite.internal.thread.IgniteThreadFactory;
import org.apache.ignite.internal.thread.StripedScheduledThreadPoolExecutor;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.internal.util.Lazy;
import org.jetbrains.annotations.Nullable;
import org.jetbrains.annotations.TestOnly;
import org.jetbrains.annotations.VisibleForTesting;
@@ -148,7 +158,7 @@ public class DataNodesManager {
/** Executor for scheduling tasks for scale up and scale down processes. */
private final StripedScheduledThreadPoolExecutor executor;
- private final String localNodeName;
+ private final Lazy<UUID> localNodeId;
private final WatchListener scaleUpTimerPrefixListener;
@@ -162,10 +172,13 @@ public class DataNodesManager {
private final IntSupplier partitionDistributionResetTimeoutSupplier;
+ private final Supplier<Set<NodeWithAttributes>>
latestLogicalTopologyProvider;
+
/**
* Constructor.
*
* @param nodeName Local node name.
+ * @param nodeIdSupplier Node id supplier.
* @param busyLock External busy lock.
* @param metaStorageManager Meta storage manager.
* @param catalogManager Catalog manager.
@@ -173,24 +186,28 @@ public class DataNodesManager {
* @param failureProcessor Failure processor.
* @param partitionResetClosure Closure to reset partitions.
* @param partitionDistributionResetTimeoutSupplier Supplier for partition
distribution reset timeout.
+ * @param latestLogicalTopologyProvider Provider of the latest logical
topology.
*/
public DataNodesManager(
String nodeName,
+ Supplier<UUID> nodeIdSupplier,
IgniteSpinBusyLock busyLock,
MetaStorageManager metaStorageManager,
CatalogManager catalogManager,
ClockService clockService,
FailureProcessor failureProcessor,
BiConsumer<Long, Integer> partitionResetClosure,
- IntSupplier partitionDistributionResetTimeoutSupplier
+ IntSupplier partitionDistributionResetTimeoutSupplier,
+ Supplier<Set<NodeWithAttributes>> latestLogicalTopologyProvider
) {
this.metaStorageManager = metaStorageManager;
this.catalogManager = catalogManager;
this.clockService = clockService;
this.failureProcessor = failureProcessor;
- this.localNodeName = nodeName;
+ this.localNodeId = new Lazy<>(nodeIdSupplier);
this.partitionResetClosure = partitionResetClosure;
this.partitionDistributionResetTimeoutSupplier =
partitionDistributionResetTimeoutSupplier;
+ this.latestLogicalTopologyProvider = latestLogicalTopologyProvider;
this.busyLock = busyLock;
executor = createZoneManagerExecutor(
@@ -223,10 +240,13 @@ public class DataNodesManager {
allKeys.add(zoneScaleUpTimerKey(zone.id()));
allKeys.add(zoneScaleDownTimerKey(zone.id()));
allKeys.add(zonePartitionResetTimerKey(zone.id()));
+ allKeys.add(zoneDataNodesKey(zone.id()));
descriptors.put(zone.id(), zone);
}
+ List<CompletableFuture<?>> legacyInitFutures = new ArrayList<>();
+
return metaStorageManager.getAll(allKeys)
.thenAccept(entriesMap -> {
for (CatalogZoneDescriptor zone : descriptors.values()) {
@@ -234,10 +254,19 @@ public class DataNodesManager {
Entry scaleUpEntry =
entriesMap.get(zoneScaleUpTimerKey(zone.id()));
Entry scaleDownEntry =
entriesMap.get(zoneScaleDownTimerKey(zone.id()));
Entry partitionResetEntry =
entriesMap.get(zonePartitionResetTimerKey(zone.id()));
+ Entry legacyDataNodesEntry =
entriesMap.get(zoneDataNodesKey(zone.id()));
if (missingEntry(historyEntry)) {
- // Not critical because if we have no history in
this map, we look into meta storage.
- LOG.warn("Couldn't recover data nodes history for
zone [id={}, historyEntry={}].", zone.id(), historyEntry);
+ if (missingEntry(legacyDataNodesEntry)) {
+ // Not critical because if we have no history
in this map, we look into meta storage.
+ LOG.warn("Couldn't recover data nodes history
for zone [id={}, historyEntry={}].", zone.id(), historyEntry);
+ } else {
+
legacyInitFutures.add(initZoneWithLegacyDataNodes(
+ zone,
+ legacyDataNodesEntry.value(),
+ recoveryRevision
+ ));
+ }
continue;
}
@@ -258,7 +287,17 @@ public class DataNodesManager {
onScaleDownTimerChange(zone, scaleDownTimer);
restorePartitionResetTimer(zone.id(), scaleDownTimer,
recoveryRevision);
}
- });
+ })
+ .thenCompose(unused -> allOf(legacyInitFutures)
+ .handle((v, e) -> {
+ if (e != null) {
+ LOG.warn("Could not recover legacy data nodes
for zone.", e);
+ }
+
+ return nullCompletedFuture();
+ })
+ )
+ .thenAccept(unused -> { /* No-op. */ });
}
private static boolean missingEntry(Entry e) {
@@ -324,7 +363,8 @@ public class DataNodesManager {
newLogicalTopology,
oldLogicalTopology,
dataNodesHistoryContext
- ))
+ )),
+ false
);
}
@@ -369,7 +409,7 @@ public class DataNodesManager {
.collect(toSet());
Set<NodeWithAttributes> removedNodes =
latestDataNodes.dataNodes().stream()
- .filter(node -> !newLogicalTopology.contains(node) &&
!Objects.equals(node.nodeName(), localNodeName))
+ .filter(node -> !newLogicalTopology.contains(node) &&
!Objects.equals(node.nodeId(), localNodeId.get()))
.filter(node -> !scaleDownTimer.nodes().contains(node))
.collect(toSet());
@@ -408,7 +448,7 @@ public class DataNodesManager {
)
);
- List<Operation> operations = List.of(
+ List<Operation> operations = operations(
addNewEntryToDataNodesHistory(zoneId, dataNodesHistory,
currentDataNodes.timestamp(),
currentDataNodes.dataNodes(), addMandatoryEntry),
renewTimer(zoneScaleUpTimerKey(zoneId), scaleUpTimerToSave),
@@ -518,7 +558,8 @@ public class DataNodesManager {
timestamp,
logicalTopology,
dataNodesHistoryContext
- ))
+ )),
+ true
);
}
@@ -549,7 +590,7 @@ public class DataNodesManager {
return DataNodesHistoryMetaStorageOperation.builder()
.zoneId(zoneId)
.condition(dataNodesHistoryEqualToOrNotExists(zoneId,
dataNodesHistory))
- .operations(List.of(
+ .operations(operations(
addNewEntryToDataNodesHistory(zoneId,
dataNodesHistory, timestamp, dataNodes),
clearTimer(zoneScaleUpTimerKey(zoneId)),
clearTimer(zoneScaleDownTimerKey(zoneId)),
@@ -587,7 +628,8 @@ public class DataNodesManager {
zoneDescriptor,
timestamp,
dataNodesHistoryContext
- ))
+ )),
+ true
);
}
@@ -638,7 +680,7 @@ public class DataNodesManager {
)
);
- List<Operation> operations = List.of(
+ List<Operation> operations = operations(
addNewEntryToDataNodesHistory(zoneId, dataNodesHistory,
currentDataNodes.timestamp(),
currentDataNodes.dataNodes()),
renewTimer(zoneScaleUpTimerKey(zoneId), scaleUpTimerToSave),
@@ -686,7 +728,8 @@ public class DataNodesManager {
return () -> doOperation(
zoneDescriptor,
List.of(zoneDataNodesHistoryKey(zoneId),
scheduledTimer.metaStorageKey()),
- dataNodesHistoryContext -> applyTimerClosure0(zoneDescriptor,
scheduledTimer, dataNodesHistoryContext)
+ dataNodesHistoryContext -> applyTimerClosure0(zoneDescriptor,
scheduledTimer, dataNodesHistoryContext),
+ true
);
}
@@ -745,7 +788,7 @@ public class DataNodesManager {
)
)
.operations(
- List.of(
+ operations(
addNewEntryToDataNodesHistory(
zoneId,
dataNodesHistory,
@@ -992,6 +1035,21 @@ public class DataNodesManager {
return completedFuture(dataNodeHistoryContextFromValues(entries));
}
+ private CompletableFuture<DataNodesHistoryContext>
ensureContextIsPresentAndInitZoneIfNeeded(
+ @Nullable DataNodesHistoryContext context,
+ List<ByteArray> keys,
+ int zoneId
+ ) {
+ if (context == null) {
+ // Probably this is a transition from an older version of the cluster,
so we need to initialize the zone according to the
+ // current set of meta storage entries.
+ return initZone(zoneId)
+ .thenCompose(ignored -> getDataNodeHistoryContextMs(keys));
+ } else {
+ return completedFuture(context);
+ }
+ }
+
@Nullable
private static <T> T deserializeEntry(@Nullable Entry e, Function<byte[],
T> deserializer) {
if (e == null || e.value() == null || e.empty() || e.tombstone()) {
@@ -1007,14 +1065,16 @@ public class DataNodesManager {
* @param zone Zone descriptor.
* @param keysToRead Keys to read from meta storage, to get {@link
DataNodesHistoryContext}.
* @param operation Operation.
+ * @param ensureContextIsPresent If true, then ensures that {@link
DataNodesHistoryContext} is not {@code null}.
* @return Future reflecting the completion of meta storage operation.
*/
private CompletableFuture<Void> doOperation(
CatalogZoneDescriptor zone,
List<ByteArray> keysToRead,
- Function<DataNodesHistoryContext,
CompletableFuture<DataNodesHistoryMetaStorageOperation>> operation
+ Function<DataNodesHistoryContext,
CompletableFuture<DataNodesHistoryMetaStorageOperation>> operation,
+ boolean ensureContextIsPresent
) {
- return msInvokeWithRetry(msGetter ->
msGetter.get(keysToRead).thenCompose(operation), zone);
+ return msInvokeWithRetry(msGetter ->
msGetter.get(keysToRead).thenCompose(operation), zone, ensureContextIsPresent);
}
private CompletableFuture<Void> msInvokeWithRetry(
@@ -1022,9 +1082,10 @@ public class DataNodesManager {
DataNodeHistoryContextMetaStorageGetter,
CompletableFuture<DataNodesHistoryMetaStorageOperation>
> metaStorageOperationSupplier,
- CatalogZoneDescriptor zone
+ CatalogZoneDescriptor zone,
+ boolean ensureContextIsPresent
) {
- return msInvokeWithRetry(metaStorageOperationSupplier,
MAX_ATTEMPTS_ON_RETRY, zone);
+ return msInvokeWithRetry(metaStorageOperationSupplier,
MAX_ATTEMPTS_ON_RETRY, zone, ensureContextIsPresent);
}
/**
@@ -1042,7 +1103,8 @@ public class DataNodesManager {
CompletableFuture<DataNodesHistoryMetaStorageOperation>
> metaStorageOperationSupplier,
int attemptsLeft,
- CatalogZoneDescriptor zone
+ CatalogZoneDescriptor zone,
+ boolean ensureContextIsPresent
) {
if (attemptsLeft <= 0) {
throw new AssertionError("Failed to perform meta storage invoke,
maximum number of attempts reached [zone=" + zone + "].");
@@ -1050,10 +1112,14 @@ public class DataNodesManager {
// Get locally on the first attempt, otherwise it means that invoke
has failed because of different value in meta storage,
// so we need to retrieve the value from meta storage.
- DataNodeHistoryContextMetaStorageGetter msGetter = attemptsLeft ==
MAX_ATTEMPTS_ON_RETRY
+ DataNodeHistoryContextMetaStorageGetter msGetter0 = attemptsLeft ==
MAX_ATTEMPTS_ON_RETRY
? this::getDataNodeHistoryContextMsLocally
: this::getDataNodeHistoryContextMs;
+ DataNodeHistoryContextMetaStorageGetter msGetter =
ensureContextIsPresent
+ ? keys -> msGetter0.get(keys).thenCompose(context ->
ensureContextIsPresentAndInitZoneIfNeeded(context, keys, zone.id()))
+ : msGetter0;
+
CompletableFuture<DataNodesHistoryMetaStorageOperation>
metaStorageOperationFuture =
metaStorageOperationSupplier.apply(msGetter);
@@ -1070,7 +1136,12 @@ public class DataNodesManager {
return nullCompletedFuture();
} else {
- return
msInvokeWithRetry(metaStorageOperationSupplier, attemptsLeft - 1, zone);
+ return msInvokeWithRetry(
+ metaStorageOperationSupplier,
+ attemptsLeft - 1,
+ zone,
+ ensureContextIsPresent
+ );
}
})
.whenComplete((v, e) -> {
@@ -1100,6 +1171,53 @@ public class DataNodesManager {
int zoneId,
HybridTimestamp timestamp,
Set<NodeWithAttributes> dataNodes
+ ) {
+ return initZone(zoneId, timestamp, dataNodes, false);
+ }
+
+ private CompletableFuture<?> initZoneWithLegacyDataNodes(
+ CatalogZoneDescriptor zone,
+ byte[] legacyDataNodesBytes,
+ long recoveryRevision
+ ) {
+ Entry nodeAttributesEntry =
metaStorageManager.getLocally(zonesNodesAttributes(), recoveryRevision);
+ Map<UUID, NodeWithAttributes> nodesAttributes =
deserializeNodesAttributes(nodeAttributesEntry.value());
+
+ Set<NodeWithAttributes> unfilteredDataNodes =
DataNodesMapSerializer.deserialize(legacyDataNodesBytes).keySet().stream()
+ .map(node -> {
+ NodeWithAttributes nwa =
nodesAttributes.get(node.nodeId());
+
+ if (nwa == null) {
+ return new NodeWithAttributes(node.nodeName(),
node.nodeId(), null);
+ } else {
+ Map<String, String> userAttributes =
nwa.userAttributes();
+ List<String> storageProfiles = nwa.storageProfiles();
+ return new NodeWithAttributes(node.nodeName(),
node.nodeId(), userAttributes, storageProfiles);
+ }
+ })
+ .collect(toSet());
+
+ Set<NodeWithAttributes> dataNodes =
filterDataNodes(unfilteredDataNodes, zone);
+
+ LOG.info("Recovering data nodes of distribution zone from legacy data
nodes [zoneId={}, unfilteredDataNodes={}, "
+ + "filter='{}', dataNodes={}]", zone.id(),
nodeNames(unfilteredDataNodes), zone.filter(), nodeNames(dataNodes));
+
+ return initZone(zone.id(), clockService.current(), dataNodes, true);
+ }
+
+ private CompletableFuture<?> initZone(int zoneId) {
+ CatalogZoneDescriptor zone = zoneDescriptor(zoneId);
+ Set<NodeWithAttributes> topologyNodes =
latestLogicalTopologyProvider.get();
+ Set<NodeWithAttributes> filteredNodes = filterDataNodes(topologyNodes,
zone);
+
+ return initZone(zoneId, clockService.now(), filteredNodes, true);
+ }
+
+ private CompletableFuture<?> initZone(
+ int zoneId,
+ HybridTimestamp timestamp,
+ Set<NodeWithAttributes> dataNodes,
+ boolean removeLegacyDataNodes
) {
if (!busyLock.enterBusy()) {
throw new IgniteInternalException(NODE_STOPPING_ERR, new
NodeStoppingException());
@@ -1112,12 +1230,14 @@ public class DataNodesManager {
notTombstone(zoneDataNodesHistoryKey(zoneId))
);
- Update update = ops(
+ Update update = new Operations(operations(
addNewEntryToDataNodesHistory(zoneId, new
DataNodesHistory(), timestamp, dataNodes),
clearTimer(zoneScaleUpTimerKey(zoneId)),
clearTimer(zoneScaleDownTimerKey(zoneId)),
- clearTimer(zonePartitionResetTimerKey(zoneId))
- ).yield(true);
+ clearTimer(zonePartitionResetTimerKey(zoneId)),
+ removeLegacyDataNodes ? remove(zoneDataNodesKey(zoneId)) :
noop(),
+ removeLegacyDataNodes ? remove(zonesNodesAttributes()) :
noop()
+ )).yield(true);
Iif iif = iif(condition, update, ops().yield(false));
@@ -1306,6 +1426,34 @@ public class DataNodesManager {
return nullCompletedFuture();
}
+ private CatalogZoneDescriptor zoneDescriptor(int zoneId) {
+ CatalogZoneDescriptor zone =
catalogManager.catalog(catalogManager.latestCatalogVersion()).zone(zoneId);
+
+ if (zone == null) {
+ throw new DistributionZoneNotFoundException(zoneId);
+ }
+
+ return zone;
+ }
+
+ /**
+ * Utility method that creates a list of operations, filtering out NO_OP
operations.
+ *
+ * @param operations Operations.
+ * @return Operations list.
+ */
+ private static List<Operation> operations(Operation... operations) {
+ List<Operation> res = new ArrayList<>();
+
+ for (Operation op : operations) {
+ if (op.type() != OperationType.NO_OP) {
+ res.add(op);
+ }
+ }
+
+ return res;
+ }
+
@TestOnly
public ZoneTimers zoneTimers(int zoneId) {
return zoneTimers.computeIfAbsent(zoneId, k -> new ZoneTimers(zoneId,
executor));
diff --git
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DataNodesMapSerializer.java
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DataNodesMapSerializer.java
new file mode 100644
index 00000000000..e0c51e8313b
--- /dev/null
+++
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DataNodesMapSerializer.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.distributionzones;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import
org.apache.ignite.internal.distributionzones.DataNodesHistory.DataNodesHistorySerializer;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.internal.util.io.IgniteDataInput;
+import org.apache.ignite.internal.util.io.IgniteDataOutput;
+import org.apache.ignite.internal.versioned.VersionedSerialization;
+import org.apache.ignite.internal.versioned.VersionedSerializer;
+
+/**
+ * {@link VersionedSerializer} for data nodes' maps. Deprecated: {@link
DataNodesHistorySerializer} should be used instead;
+ * this class is preserved for backward compatibility.
+ */
+@Deprecated
+public class DataNodesMapSerializer extends VersionedSerializer<Map<Node,
Integer>> {
+ /** Serializer instance. */
+ public static final DataNodesMapSerializer INSTANCE = new
DataNodesMapSerializer();
+
+ private final NodeSerializer nodeSerializer = NodeSerializer.INSTANCE;
+
+ @Override
+ protected void writeExternalData(Map<Node, Integer> map, IgniteDataOutput
out) throws IOException {
+ throw new UnsupportedOperationException("Data nodes map is a legacy
structure that should not be used anymore.");
+ }
+
+ @Override
+ protected Map<Node, Integer> readExternalData(byte protoVer,
IgniteDataInput in) throws IOException {
+ int size = in.readVarIntAsInt();
+
+ Map<Node, Integer> map = new HashMap<>(IgniteUtils.capacity(size));
+ for (int i = 0; i < size; i++) {
+ Node node = nodeSerializer.readExternal(in);
+ int count = in.readVarIntAsInt();
+
+ map.put(node, count);
+ }
+
+ return map;
+ }
+
+ /**
+ * Deserializes a map from bytes.
+ *
+ * @param bytes Bytes.
+ */
+ public static Map<Node, Integer> deserialize(byte[] bytes) {
+ return VersionedSerialization.fromBytes(bytes, INSTANCE);
+ }
+}
diff --git
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
index 47f4949fe25..365f8ae812a 100644
---
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
+++
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
@@ -40,7 +40,6 @@ import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil
import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesLogicalTopologyKey;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesLogicalTopologyPrefix;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesLogicalTopologyVersionKey;
-import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesNodesAttributes;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesRecoverableStateRevision;
import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestamp;
import static org.apache.ignite.internal.metastorage.dsl.Conditions.notExists;
@@ -69,6 +68,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Predicate;
+import java.util.function.Supplier;
import org.apache.ignite.internal.catalog.CatalogManager;
import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
import org.apache.ignite.internal.catalog.events.AlterZoneEventParameters;
@@ -150,15 +150,6 @@ public class DistributionZoneManager extends
*/
private final ConcurrentSkipListMap<Long, Set<NodeWithAttributes>>
logicalTopologyByRevision = new ConcurrentSkipListMap<>();
- /**
- * Local mapping of {@code nodeId} -> node's attributes, where {@code
nodeId} is a node id, that changes between restarts.
- * This map is updated every time we receive a topology event in a {@code
topologyWatchListener}.
- * TODO: https://issues.apache.org/jira/browse/IGNITE-24608 properly clean
up this map
- *
- * @see <a
href="https://github.com/apache/ignite-3/blob/main/modules/distribution-zones/tech-notes/filters.md">Filter
documentation</a>
- */
- private Map<UUID, NodeWithAttributes> nodesAttributes = new
ConcurrentHashMap<>();
-
/** Watch listener for logical topology keys. */
private final WatchListener topologyWatchListener;
@@ -190,6 +181,7 @@ public class DistributionZoneManager extends
@TestOnly
public DistributionZoneManager(
String nodeName,
+ Supplier<UUID> nodeIdSupplier,
RevisionListenerRegistry registry,
MetaStorageManager metaStorageManager,
LogicalTopologyService logicalTopologyService,
@@ -200,6 +192,7 @@ public class DistributionZoneManager extends
) {
this(
nodeName,
+ nodeIdSupplier,
registry,
metaStorageManager,
logicalTopologyService,
@@ -216,6 +209,7 @@ public class DistributionZoneManager extends
* Creates a new distribution zone manager.
*
* @param nodeName Node name.
+ * @param nodeIdSupplier Node id supplier.
* @param registry Registry for versioned values.
* @param metaStorageManager Meta Storage manager.
* @param logicalTopologyService Logical topology service.
@@ -226,6 +220,7 @@ public class DistributionZoneManager extends
*/
public DistributionZoneManager(
String nodeName,
+ Supplier<UUID> nodeIdSupplier,
RevisionListenerRegistry registry,
MetaStorageManager metaStorageManager,
LogicalTopologyService logicalTopologyService,
@@ -266,13 +261,15 @@ public class DistributionZoneManager extends
dataNodesManager = new DataNodesManager(
nodeName,
+ nodeIdSupplier,
busyLock,
metaStorageManager,
catalogManager,
clockService,
failureProcessor,
this::fireTopologyReduceLocalEvent,
- partitionDistributionResetTimeoutConfiguration::currentValue
+ partitionDistributionResetTimeoutConfiguration::currentValue,
+ this::logicalTopology
);
this.metricManager = metricManager;
@@ -378,6 +375,10 @@ public class DistributionZoneManager extends
return dataNodesManager.dataNodes(zoneId, timestamp, catalogVersion);
}
+ public static Set<Node> dataNodes(Map<Node, Integer> dataNodesMap) {
+ return dataNodesMap.entrySet().stream().filter(e -> e.getValue() >
0).map(Map.Entry::getKey).collect(toSet());
+ }
+
private CompletableFuture<Void>
onUpdateScaleUpBusy(AlterZoneEventParameters parameters) {
HybridTimestamp timestamp =
metaStorageManager.timestampByRevisionLocally(parameters.causalityToken());
@@ -523,25 +524,13 @@ public class DistributionZoneManager extends
private void restoreGlobalStateFromLocalMetaStorage(long recoveryRevision)
{
Entry lastHandledTopologyEntry =
metaStorageManager.getLocally(zonesLastHandledTopology(), recoveryRevision);
- Entry nodeAttributesEntry =
metaStorageManager.getLocally(zonesNodesAttributes(), recoveryRevision);
-
if (lastHandledTopologyEntry.value() != null) {
- // We save zonesLastHandledTopology and zonesNodesAttributes in
Meta Storage in a one batch, so it is impossible
- // that one value is not null, but other is null.
- assert nodeAttributesEntry.value() != null;
-
logicalTopologyByRevision.put(recoveryRevision,
deserializeLogicalTopologySet(lastHandledTopologyEntry.value()));
-
- nodesAttributes =
DistributionZonesUtil.deserializeNodesAttributes(nodeAttributesEntry.value());
}
assert lastHandledTopologyEntry.value() == null
||
logicalTopology(recoveryRevision).equals(deserializeLogicalTopologySet(lastHandledTopologyEntry.value()))
: "Initial value of logical topology was changed after
initialization from the Meta Storage manager.";
-
- assert nodeAttributesEntry.value() == null
- ||
nodesAttributes.equals(DistributionZonesUtil.deserializeNodesAttributes(nodeAttributesEntry.value()))
- : "Initial value of nodes' attributes was changed after
initialization from the Meta Storage manager.";
}
/**
@@ -605,8 +594,7 @@ public class DistributionZoneManager extends
}
/**
- * Reaction on an update of logical topology. In this method {@link
DistributionZoneManager#logicalTopology},
- * {@link DistributionZoneManager#nodesAttributes} are updated.
+ * Reaction on an update of logical topology. In this method {@link
DistributionZoneManager#logicalTopology} is updated.
* This fields are saved to Meta Storage, also timers are scheduled.
* Note that all futures of Meta Storage updates that happen in this
method are returned from this method.
*
@@ -638,8 +626,6 @@ public class DistributionZoneManager extends
futures.add(f);
}
- newLogicalTopology.forEach(n -> nodesAttributes.put(n.nodeId(), n));
-
futures.add(saveRecoverableStateToMetastorage(revision,
newLogicalTopology));
return allOf(futures.toArray(CompletableFuture[]::new));
@@ -667,7 +653,6 @@ public class DistributionZoneManager extends
Set<NodeWithAttributes> newLogicalTopology
) {
Operation[] puts = {
- put(zonesNodesAttributes(),
NodesAttributesSerializer.serialize(nodesAttributes())),
put(zonesRecoverableStateRevision(),
longToBytesKeepingOrder(revision)),
put(
zonesLastHandledTopology(),
@@ -740,16 +725,6 @@ public class DistributionZoneManager extends
additionalNodeFilter = filter;
}
- /**
- * Returns local mapping of {@code nodeId} -> node's attributes, where
{@code nodeId} is a node id, that changes between restarts.
- * This map is updated every time we receive a topology event in a {@code
topologyWatchListener}.
- *
- * @return Mapping {@code nodeId} -> node's attributes.
- */
- public Map<UUID, NodeWithAttributes> nodesAttributes() {
- return nodesAttributes;
- }
-
public Set<NodeWithAttributes> logicalTopology() {
return logicalTopology(Long.MAX_VALUE);
}
diff --git
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZonesUtil.java
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZonesUtil.java
index d82cd68fc04..7c12db6d81e 100644
---
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZonesUtil.java
+++
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZonesUtil.java
@@ -73,6 +73,10 @@ public class DistributionZonesUtil {
/** Key prefix for zone's data nodes history. */
public static final String DISTRIBUTION_ZONE_DATA_NODES_HISTORY_PREFIX =
DISTRIBUTION_ZONE_DATA_NODES_PREFIX + "history.";
+ /** Key prefix for zone's data nodes. Deprecated, preserved for backward
compatibility. */
+ @Deprecated
+ public static final String DISTRIBUTION_ZONE_DATA_NODES_VALUE_PREFIX =
DISTRIBUTION_ZONE_DATA_NODES_PREFIX + "value.";
+
/** Key prefix for zone's data nodes history, in {@code byte[]}
representation. */
public static final byte[]
DISTRIBUTION_ZONE_DATA_NODES_HISTORY_PREFIX_BYTES =
DISTRIBUTION_ZONE_DATA_NODES_HISTORY_PREFIX.getBytes(StandardCharsets.UTF_8);
@@ -119,7 +123,11 @@ public class DistributionZonesUtil {
/** ByteArray representation of {@link
DistributionZonesUtil#DISTRIBUTION_ZONES_LOGICAL_TOPOLOGY}. */
private static final ByteArray DISTRIBUTION_ZONES_LOGICAL_TOPOLOGY_KEY =
new ByteArray(DISTRIBUTION_ZONES_LOGICAL_TOPOLOGY);
- /** ByteArray representation of {@link
DistributionZonesUtil#DISTRIBUTION_ZONES_NODES_ATTRIBUTES}. */
+ /**
+ * ByteArray representation of {@link
DistributionZonesUtil#DISTRIBUTION_ZONES_NODES_ATTRIBUTES}.
+ * Deprecated, preserved for backward compatibility.
+ */
+ @Deprecated
private static final ByteArray DISTRIBUTION_ZONES_NODES_ATTRIBUTES_KEY =
new ByteArray(DISTRIBUTION_ZONES_NODES_ATTRIBUTES);
/** ByteArray representation of {@link
DistributionZonesUtil#DISTRIBUTION_ZONES_RECOVERABLE_STATE_REVISION}. */
@@ -167,6 +175,18 @@ public class DistributionZonesUtil {
return new ByteArray(DISTRIBUTION_ZONE_DATA_NODES_HISTORY_PREFIX);
}
+ /**
+ * ByteArray representation of {@link
DistributionZonesUtil#DISTRIBUTION_ZONE_DATA_NODES_VALUE_PREFIX}.
+ * Preserved for backward compatibility.
+ *
+ * @param zoneId Zone id.
+ * @return ByteArray representation.
+ */
+ @Deprecated
+ public static ByteArray zoneDataNodesKey(int zoneId) {
+ return new ByteArray(DISTRIBUTION_ZONE_DATA_NODES_VALUE_PREFIX +
zoneId);
+ }
+
/**
* ByteArray representation of {@link
DistributionZonesUtil#DISTRIBUTION_ZONE_DATA_NODES_HISTORY_PREFIX}.
*
@@ -261,8 +281,9 @@ public class DistributionZonesUtil {
}
/**
- * The key that represents nodes' attributes in Meta Storage.
+ * The key that represents nodes' attributes in Meta Storage. Deprecated,
preserved for backward compatibility.
*/
+ @Deprecated
public static ByteArray zonesNodesAttributes() {
return DISTRIBUTION_ZONES_NODES_ATTRIBUTES_KEY;
}
diff --git
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngine.java
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngine.java
index e91de125c40..91cc90b8ec7 100644
---
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngine.java
+++
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngine.java
@@ -18,7 +18,6 @@
package org.apache.ignite.internal.distributionzones.rebalance;
import static java.util.concurrent.CompletableFuture.allOf;
-import static java.util.stream.Collectors.toSet;
import static
org.apache.ignite.internal.catalog.events.CatalogEvent.ZONE_ALTER;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.DISTRIBUTION_ZONE_DATA_NODES_HISTORY_PREFIX_BYTES;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.filterDataNodes;
@@ -32,9 +31,7 @@ import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFu
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
-import java.util.Map;
import java.util.Set;
-import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
@@ -46,7 +43,6 @@ import
org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
import org.apache.ignite.internal.catalog.events.AlterZoneEventParameters;
import org.apache.ignite.internal.components.NodeProperties;
import org.apache.ignite.internal.distributionzones.DistributionZoneManager;
-import org.apache.ignite.internal.distributionzones.Node;
import org.apache.ignite.internal.distributionzones.NodeWithAttributes;
import
org.apache.ignite.internal.distributionzones.utils.CatalogAlterZoneEventListener;
import org.apache.ignite.internal.hlc.HybridTimestamp;
@@ -207,17 +203,13 @@ public class DistributionZoneRebalanceEngine {
private WatchListener createDistributionZonesDataNodesListener() {
return evt -> IgniteUtils.inBusyLockAsync(busyLock, () -> {
- Set<NodeWithAttributes> dataNodesWithAttributes =
parseDataNodes(evt.entryEvent().newEntry().value(), evt.timestamp());
+ Set<NodeWithAttributes> dataNodes =
parseDataNodes(evt.entryEvent().newEntry().value(), evt.timestamp());
- if (dataNodesWithAttributes == null) {
+ if (dataNodes == null) {
// The zone was removed so data nodes was removed too.
return nullCompletedFuture();
}
- Set<Node> dataNodes = dataNodesWithAttributes.stream()
- .map(NodeWithAttributes::node)
- .collect(toSet());
-
int zoneId = extractZoneId(evt.entryEvent().newEntry().key(),
DISTRIBUTION_ZONE_DATA_NODES_HISTORY_PREFIX_BYTES);
// It is safe to get the latest version of the catalog as we are
in the metastore thread.
@@ -235,21 +227,17 @@ public class DistributionZoneRebalanceEngine {
return nullCompletedFuture();
}
- Map<UUID, NodeWithAttributes> nodesAttributes =
distributionZoneManager.nodesAttributes();
-
- Set<String> filteredDataNodes =
nodeNames(filterDataNodes(dataNodesWithAttributes, zoneDescriptor));
+ Set<String> filteredDataNodes =
nodeNames(filterDataNodes(dataNodes, zoneDescriptor));
if (LOG.isInfoEnabled()) {
var matchedNodes = new ArrayList<NodeWithAttributes>();
var filteredOutNodes = new ArrayList<NodeWithAttributes>();
- for (Node dataNode : dataNodes) {
- NodeWithAttributes nodeWithAttributes =
nodesAttributes.get(dataNode.nodeId());
-
+ for (NodeWithAttributes dataNode : dataNodes) {
if (filteredDataNodes.contains(dataNode.nodeName())) {
- matchedNodes.add(nodeWithAttributes);
+ matchedNodes.add(dataNode);
} else {
- filteredOutNodes.add(nodeWithAttributes);
+ filteredOutNodes.add(dataNode);
}
}
diff --git
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngineV2.java
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngineV2.java
index fccd99868d0..3aab7a67201 100644
---
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngineV2.java
+++
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngineV2.java
@@ -18,7 +18,6 @@
package org.apache.ignite.internal.distributionzones.rebalance;
import static java.util.concurrent.CompletableFuture.allOf;
-import static java.util.stream.Collectors.toSet;
import static
org.apache.ignite.internal.catalog.events.CatalogEvent.ZONE_ALTER;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.DISTRIBUTION_ZONE_DATA_NODES_HISTORY_PREFIX_BYTES;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.filterDataNodes;
@@ -31,9 +30,7 @@ import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFu
import java.util.ArrayList;
import java.util.List;
-import java.util.Map;
import java.util.Set;
-import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
@@ -43,7 +40,6 @@ import org.apache.ignite.internal.catalog.CatalogService;
import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
import org.apache.ignite.internal.catalog.events.AlterZoneEventParameters;
import org.apache.ignite.internal.distributionzones.DistributionZoneManager;
-import org.apache.ignite.internal.distributionzones.Node;
import org.apache.ignite.internal.distributionzones.NodeWithAttributes;
import
org.apache.ignite.internal.distributionzones.utils.CatalogAlterZoneEventListener;
import org.apache.ignite.internal.hlc.HybridTimestamp;
@@ -143,17 +139,13 @@ public class DistributionZoneRebalanceEngineV2 {
private WatchListener createDistributionZonesDataNodesListener() {
return evt -> IgniteUtils.inBusyLockAsync(busyLock, () -> {
- Set<NodeWithAttributes> dataNodesWithAttributes =
parseDataNodes(evt.entryEvent().newEntry().value(), evt.timestamp());
+ Set<NodeWithAttributes> dataNodes =
parseDataNodes(evt.entryEvent().newEntry().value(), evt.timestamp());
- if (dataNodesWithAttributes == null) {
+ if (dataNodes == null) {
// The zone was removed so data nodes were removed too.
return nullCompletedFuture();
}
- Set<Node> dataNodes = dataNodesWithAttributes.stream()
- .map(NodeWithAttributes::node)
- .collect(toSet());
-
int zoneId = extractZoneId(evt.entryEvent().newEntry().key(),
DISTRIBUTION_ZONE_DATA_NODES_HISTORY_PREFIX_BYTES);
// It is safe to get the latest version of the catalog as we are
in the metastore thread.
@@ -172,21 +164,17 @@ public class DistributionZoneRebalanceEngineV2 {
return nullCompletedFuture();
}
- Map<UUID, NodeWithAttributes> nodesAttributes =
distributionZoneManager.nodesAttributes();
-
- Set<String> filteredDataNodes =
nodeNames(filterDataNodes(dataNodesWithAttributes, zoneDescriptor));
+ Set<String> filteredDataNodes =
nodeNames(filterDataNodes(dataNodes, zoneDescriptor));
if (LOG.isInfoEnabled()) {
var matchedNodes = new ArrayList<NodeWithAttributes>();
var filteredOutNodes = new ArrayList<NodeWithAttributes>();
- for (Node dataNode : dataNodes) {
- NodeWithAttributes nodeWithAttributes =
nodesAttributes.get(dataNode.nodeId());
-
+ for (NodeWithAttributes dataNode : dataNodes) {
if (filteredDataNodes.contains(dataNode.nodeName())) {
- matchedNodes.add(nodeWithAttributes);
+ matchedNodes.add(dataNode);
} else {
- filteredOutNodes.add(nodeWithAttributes);
+ filteredOutNodes.add(dataNode);
}
}
diff --git
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/BaseDistributionZoneManagerTest.java
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/BaseDistributionZoneManagerTest.java
index f62b8735bd2..56ad9199de3 100644
---
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/BaseDistributionZoneManagerTest.java
+++
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/BaseDistributionZoneManagerTest.java
@@ -33,6 +33,7 @@ import static org.mockito.Mockito.when;
import java.util.ArrayList;
import java.util.List;
+import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.ignite.internal.catalog.CatalogManager;
@@ -100,6 +101,7 @@ public abstract class BaseDistributionZoneManagerTest
extends BaseIgniteAbstract
@BeforeEach
void setUp() throws Exception {
String nodeName = "test";
+ UUID nodeId = UUID.randomUUID();
var readOperationForCompactionTracker = new
ReadOperationForCompactionTracker();
@@ -130,6 +132,7 @@ public abstract class BaseDistributionZoneManagerTest
extends BaseIgniteAbstract
distributionZoneManager = new DistributionZoneManager(
nodeName,
+ () -> nodeId,
revisionUpdater,
metaStorageManager,
new LogicalTopologyServiceImpl(topology, cmgManager),
diff --git
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DataNodesManagerTest.java
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DataNodesManagerTest.java
index 9d22d1024a8..7af0871496e 100644
---
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DataNodesManagerTest.java
+++
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DataNodesManagerTest.java
@@ -39,6 +39,7 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -138,15 +139,19 @@ public class DataNodesManagerTest extends
BaseIgniteAbstractTest {
ClockService clockService = new TestClockService(clock, new
ClockWaiter(NODE_NAME, clock, scheduledExecutorService));
+ UUID nodeId = UUID.randomUUID();
+
dataNodesManager = new DataNodesManager(
NODE_NAME,
+ () -> nodeId,
new IgniteSpinBusyLock(),
metaStorageManager,
catalogManager,
clockService,
new NoOpFailureManager(),
partitionResetClosure,
- () -> 1
+ () -> 1,
+ Collections::emptySet
);
currentTopology = new HashSet<>(Set.of(A, B));
diff --git
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngineTest.java
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngineTest.java
index bea0f9f70ef..389a040e467 100644
---
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngineTest.java
+++
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngineTest.java
@@ -161,8 +161,6 @@ public class DistributionZoneRebalanceEngineTest extends
IgniteAbstractTest {
id(5), new NodeWithAttributes("node5", id(5), Map.of(),
List.of(DEFAULT_STORAGE_PROFILE))
);
-
when(distributionZoneManager.nodesAttributes()).thenReturn(nodeWithAttributesMap);
-
doAnswer(invocation -> {
ByteArray key = invocation.getArgument(0);
diff --git
a/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/dsl/Operations.java
b/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/dsl/Operations.java
index b274c3224b9..9ff08baffde 100644
---
a/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/dsl/Operations.java
+++
b/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/dsl/Operations.java
@@ -35,7 +35,7 @@ public final class Operations {
private static final MetaStorageMessagesFactory MSG_FACTORY = new
MetaStorageMessagesFactory();
/** No-op operation singleton. */
- private static final Operation NO_OP =
MSG_FACTORY.operation().type(OperationType.NO_OP).key(EMPTY_BYTE_BUFFER).build();
+ private static final Operation NO_OP =
MSG_FACTORY.operation().type(OperationType.NO_OP).build();
/** Operations. */
private final List<Operation> operations;
diff --git
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/command/MetastorageCommandsCompatibilityTest.java
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/command/MetastorageCommandsCompatibilityTest.java
index 3d8e5cf8059..a9d4887c6c3 100644
---
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/command/MetastorageCommandsCompatibilityTest.java
+++
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/command/MetastorageCommandsCompatibilityTest.java
@@ -43,7 +43,6 @@ import org.apache.ignite.internal.raft.Marshaller;
import org.apache.ignite.internal.raft.util.ThreadLocalOptimizedMarshaller;
import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
/**
@@ -161,7 +160,6 @@ public class MetastorageCommandsCompatibilityTest extends
BaseIgniteAbstractTest
}
@Test
- @Disabled("https://issues.apache.org/jira/browse/IGNITE-26664")
void testMultiInvokeCommand() {
MultiInvokeCommand command = decodeCommand(
"cAzfAQxHAAAAAAAAAAAqAAAAAAAAAEXfAQnfAQvfAQgC3wEGAwVwdXQxAwMFdmFsMd8BBwMCAd8BBd8BBd8BAgMLdG9tYnN0b25lMRDfAQIDDm5vdFRvbWJzd"
diff --git
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java
index 8458a276da2..5233c985ce6 100644
---
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java
+++
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java
@@ -705,6 +705,7 @@ public class Node {
distributionZoneManager = new DistributionZoneManager(
name,
+ () -> clusterService.topologyService().localMember().id(),
registry,
metaStorageManager,
logicalTopologyService,
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
index 737109f25e5..6f3454d6d65 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
@@ -709,6 +709,7 @@ public class ItIgniteNodeRestartTest extends
BaseIgniteRestartTest {
DistributionZoneManager distributionZoneManager = new
DistributionZoneManager(
name,
+ () -> clusterSvc.topologyService().localMember().id(),
registry,
metaStorageMgr,
logicalTopologyService,
diff --git
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index 0f87090dfbc..b4086f773a5 100644
---
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -993,6 +993,7 @@ public class IgniteImpl implements Ignite {
distributionZoneManager = new DistributionZoneManager(
name,
+ () -> clusterSvc.topologyService().localMember().id(),
registry,
metaStorageMgr,
logicalTopologyService,