This is an automated email from the ASF dual-hosted git repository.
sdanilov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 2405febe72 IGNITE-18240 Store logical topology as a single KV entry
instead of a number of entries (#1371)
2405febe72 is described below
commit 2405febe72fd295a1044bcdb6787b7f6a8389d7a
Author: Roman Puchkovskiy <[email protected]>
AuthorDate: Thu Nov 24 17:36:06 2022 +0400
IGNITE-18240 Store logical topology as a single KV entry instead of a
number of entries (#1371)
---
.../management/raft/RaftStorageManager.java | 128 ++++++++++++++-------
.../raft/commands/NodesLeaveCommand.java | 2 +-
.../AbstractClusterStateStorageManagerTest.java | 39 +++++++
3 files changed, 126 insertions(+), 43 deletions(-)
diff --git a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/RaftStorageManager.java b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/RaftStorageManager.java
index f3c94cbfea..77c79da236 100644
--- a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/RaftStorageManager.java
+++ b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/RaftStorageManager.java
@@ -18,20 +18,25 @@
package org.apache.ignite.internal.cluster.management.raft;
import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.Collections.emptySet;
+import static java.util.function.Function.identity;
import static java.util.stream.Collectors.toList;
+import static java.util.stream.Collectors.toMap;
import static org.apache.ignite.internal.util.ArrayUtils.BYTE_EMPTY_ARRAY;
import static org.apache.ignite.internal.util.ByteUtils.fromBytes;
import static org.apache.ignite.internal.util.ByteUtils.toBytes;
+import java.io.Serializable;
import java.nio.ByteBuffer;
import java.nio.file.Path;
-import java.util.Arrays;
import java.util.Collection;
+import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.internal.cluster.management.ClusterState;
+import org.apache.ignite.internal.tostring.IgniteToStringInclude;
+import org.apache.ignite.internal.tostring.S;
import org.apache.ignite.internal.util.Cursor;
-import org.apache.ignite.lang.IgniteInternalException;
import org.apache.ignite.network.ClusterNode;
import org.jetbrains.annotations.Nullable;
@@ -42,16 +47,20 @@ class RaftStorageManager {
/** Storage key for the CMG state. */
private static final byte[] CMG_STATE_KEY = "cmg_state".getBytes(UTF_8);
- /** Prefix for the keys for logical topology nodes. */
- private static final byte[] LOGICAL_TOPOLOGY_PREFIX = "logical_".getBytes(UTF_8);
+ /** Storage key for the logical topology. */
+ private static final byte[] LOGICAL_TOPOLOGY_KEY = "logical".getBytes(UTF_8);
/** Prefix for validation tokens. */
private static final byte[] VALIDATED_NODE_PREFIX = "validation_".getBytes(UTF_8);
private final ClusterStateStorage storage;
+ private volatile LogicalTopology currentLogicalTopology;
+
RaftStorageManager(ClusterStateStorage storage) {
this.storage = storage;
+
+ currentLogicalTopology = readLogicalTopology();
}
/**
@@ -79,11 +88,13 @@ class RaftStorageManager {
* Retrieves the current logical topology.
*/
Collection<ClusterNode> getLogicalTopology() {
- try (Cursor<ClusterNode> cursor = storage.getWithPrefix(LOGICAL_TOPOLOGY_PREFIX, (k, v) -> fromBytes(v))) {
- return cursor.stream().collect(toList());
- } catch (Exception e) {
- throw new IgniteInternalException("Unable to get data from storage", e);
- }
+ return currentLogicalTopology.nodes;
+ }
+
+ private LogicalTopology readLogicalTopology() {
+ byte[] bytes = storage.get(LOGICAL_TOPOLOGY_KEY);
+
+ return bytes == null ? LogicalTopology.INITIAL : fromBytes(bytes);
}
/**
@@ -92,16 +103,17 @@ class RaftStorageManager {
* @param node Node to save.
*/
void putLogicalTopologyNode(ClusterNode node) {
- byte[] nodeNameBytes = node.name().getBytes(UTF_8);
-
- byte[] nodeIdBytes = node.id().getBytes(UTF_8);
+ replaceLogicalTopologyWith(currentLogicalTopology.addNode(node));
+ }
- byte[] key = logicalTopologyKey(nodeNameBytes, nodeIdBytes);
+ private void replaceLogicalTopologyWith(LogicalTopology newTopology) {
+ if (newTopology == currentLogicalTopology) {
+ return;
+ }
- // Replace all nodes with the same consistent ID.
- byte[] prefix = Arrays.copyOf(key, key.length - nodeIdBytes.length);
+ storage.put(LOGICAL_TOPOLOGY_KEY, toBytes(newTopology));
- storage.replaceAll(prefix, key, toBytes(node));
+ currentLogicalTopology = newTopology;
}
/**
@@ -110,36 +122,14 @@ class RaftStorageManager {
* @param nodes Nodes to remove.
*/
void removeLogicalTopologyNodes(Set<ClusterNode> nodes) {
- Collection<byte[]> keys = nodes.stream()
- .map(RaftStorageManager::logicalTopologyKey)
- .collect(toList());
-
- storage.removeAll(keys);
+ replaceLogicalTopologyWith(currentLogicalTopology.removeNodesByIds(nodes));
}
/**
* Returns {@code true} if a given node is present in the logical topology or {@code false} otherwise.
*/
boolean isNodeInLogicalTopology(ClusterNode node) {
- byte[] value = storage.get(logicalTopologyKey(node));
-
- return value != null;
- }
-
- private static byte[] logicalTopologyKey(ClusterNode node) {
- byte[] nodeNameBytes = node.name().getBytes(UTF_8);
-
- byte[] nodeIdBytes = node.id().getBytes(UTF_8);
-
- return logicalTopologyKey(nodeNameBytes, nodeIdBytes);
- }
-
- private static byte[] logicalTopologyKey(byte[] nodeNameBytes, byte[] nodeIdBytes) {
- return ByteBuffer.allocate(LOGICAL_TOPOLOGY_PREFIX.length + nodeNameBytes.length + nodeIdBytes.length)
- .put(LOGICAL_TOPOLOGY_PREFIX)
- .put(nodeNameBytes)
- .put(nodeIdBytes)
- .array();
+ return currentLogicalTopology.containsNodeById(node);
}
/**
@@ -185,8 +175,6 @@ class RaftStorageManager {
try (cursor) {
return cursor.stream().collect(toList());
- } catch (Exception e) {
- throw new IgniteInternalException("Unable to get data from storage", e);
}
}
@@ -207,5 +195,61 @@ class RaftStorageManager {
*/
void restoreSnapshot(Path snapshotPath) {
storage.restoreSnapshot(snapshotPath);
+
+ currentLogicalTopology = readLogicalTopology();
+ }
+
+ private static class LogicalTopology implements Serializable {
+ private static final long serialVersionUID = 0L;
+
+ private static final LogicalTopology INITIAL = new LogicalTopology(0, emptySet());
+
+ private final long version;
+
+ @IgniteToStringInclude
+ private final Set<ClusterNode> nodes;
+
+ private LogicalTopology(long version, Collection<ClusterNode> nodes) {
+ this.version = version;
+ this.nodes = Set.copyOf(nodes);
+ }
+
+ LogicalTopology addNode(ClusterNode nodeToAdd) {
+ Map<String, ClusterNode> map = nodes.stream().collect(toMap(ClusterNode::name, identity()));
+
+ ClusterNode oldNode = map.put(nodeToAdd.name(), nodeToAdd);
+ if (oldNode != null && oldNode.id().equals(nodeToAdd.id())) {
+ // We already have this node, nothing needs to be changed.
+ return this;
+ }
+
+ return new LogicalTopology(version + 1, map.values());
+ }
+
+ LogicalTopology removeNodesByIds(Set<ClusterNode> nodesToRemove) {
+ Map<String, ClusterNode> mapById = nodes.stream().collect(toMap(ClusterNode::id, identity()));
+
+ int originalSize = mapById.size();
+
+ for (ClusterNode nodeToRemove : nodesToRemove) {
+ mapById.remove(nodeToRemove.id());
+ }
+
+ if (mapById.size() == originalSize) {
+ // Nothing was actually removed.
+ return this;
+ }
+
+ return new LogicalTopology(version + 1, mapById.values());
+ }
+
+ boolean containsNodeById(ClusterNode needle) {
+ return nodes.stream().anyMatch(node -> node.id().equals(needle.id()));
+ }
+
+ @Override
+ public String toString() {
+ return S.toString(LogicalTopology.class, this);
+ }
}
}
diff --git a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/commands/NodesLeaveCommand.java b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/commands/NodesLeaveCommand.java
index 07dcdad9c2..501b0429da 100644
--- a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/commands/NodesLeaveCommand.java
+++ b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/commands/NodesLeaveCommand.java
@@ -24,7 +24,7 @@ import org.apache.ignite.network.annotations.Transferable;
import org.apache.ignite.raft.client.WriteCommand;
/**
- * Command that gets executed when a node needs to be removed from the logical topology.
+ * Command that gets executed when nodes need to be removed from the logical topology.
*/
@Transferable(CmgMessageGroup.Commands.NODES_LEAVE)
public interface NodesLeaveCommand extends WriteCommand, NetworkMessage {
diff --git a/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/raft/AbstractClusterStateStorageManagerTest.java b/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/raft/AbstractClusterStateStorageManagerTest.java
index 63e695b37d..3959f07814 100644
--- a/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/raft/AbstractClusterStateStorageManagerTest.java
+++ b/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/raft/AbstractClusterStateStorageManagerTest.java
@@ -25,10 +25,14 @@ import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
import java.nio.file.Path;
+import java.util.Collection;
import java.util.List;
import java.util.Set;
import org.apache.ignite.internal.cluster.management.network.messages.CmgMessagesFactory;
@@ -221,4 +225,39 @@ public abstract class AbstractClusterStateStorageManagerTest {
assertThat(storageManager.getValidatedNodeIds(), containsInAnyOrder("node2"));
}
+
+ @Test
+ void logicalTopologyAdditionUsesNameAsNodeKey() {
+ storageManager.putLogicalTopologyNode(new ClusterNode("id1", "node", new NetworkAddress("host", 1000)));
+
+ storageManager.putLogicalTopologyNode(new ClusterNode("id2", "node", new NetworkAddress("host", 1000)));
+
+ Collection<ClusterNode> topology = storageManager.getLogicalTopology();
+
+ assertThat(topology, hasSize(1));
+
+ assertThat(topology.iterator().next().id(), is("id2"));
+ }
+
+ @Test
+ void logicalTopologyRemovalUsesIdAsNodeKey() {
+ storageManager.putLogicalTopologyNode(new ClusterNode("id1", "node", new NetworkAddress("host", 1000)));
+
+ storageManager.removeLogicalTopologyNodes(Set.of(new ClusterNode("id2", "node", new NetworkAddress("host", 1000))));
+
+ assertThat(storageManager.getLogicalTopology(), hasSize(1));
+ assertThat(storageManager.getLogicalTopology().iterator().next().id(), is("id1"));
+
+ storageManager.removeLogicalTopologyNodes(Set.of(new ClusterNode("id1", "another-name", new NetworkAddress("host", 1000))));
+
+ assertThat(storageManager.getLogicalTopology(), is(empty()));
+ }
+
+ @Test
+ void inLogicalTopologyTestUsesIdAsNodeKey() {
+ storageManager.putLogicalTopologyNode(new ClusterNode("id1", "node", new NetworkAddress("host", 1000)));
+
+ assertTrue(storageManager.isNodeInLogicalTopology(new ClusterNode("id1", "node", new NetworkAddress("host", 1000))));
+ assertFalse(storageManager.isNodeInLogicalTopology(new ClusterNode("another-id", "node", new NetworkAddress("host", 1000))));
+ }
}