This is an automated email from the ASF dual-hosted git repository.
rpuch pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 7ff9534a53 IGNITE-23484 Do not use ByteUtils#toBytes in zones
management (#4594)
7ff9534a53 is described below
commit 7ff9534a53c17430272788a87605ba55ad83b11e
Author: Roman Puchkovskiy <[email protected]>
AuthorDate: Mon Oct 28 21:35:53 2024 +0400
IGNITE-23484 Do not use ByteUtils#toBytes in zones management (#4594)
---
.../ItDistributionZonesFiltersTest.java | 9 +-
...niteDistributionZoneManagerNodeRestartTest.java | 36 +++++---
.../distributionzones/AugmentationSerializer.java | 66 +++++++++++++++
.../distributionzones/DataNodesMapSerializer.java | 79 ++++++++++++++++++
.../distributionzones/DistributionZoneManager.java | 57 ++++++++-----
.../distributionzones/DistributionZonesUtil.java | 23 +++--
.../LogicalTopologySetSerializer.java | 75 +++++++++++++++++
.../ignite/internal/distributionzones/Node.java | 5 +-
.../internal/distributionzones/NodeSerializer.java | 46 ++++++++++
.../distributionzones/NodeWithAttributes.java | 5 +-
.../NodeWithAttributesSerializer.java | 85 +++++++++++++++++++
.../NodesAttributesSerializer.java | 88 ++++++++++++++++++++
.../TopologyAugmentationMapSerializer.java | 76 +++++++++++++++++
.../CausalityDataNodesEngine.java | 13 +--
.../DataNodesMapSerializerTest.java | 57 +++++++++++++
.../LogicalTopologySetSerializerTest.java | 93 +++++++++++++++++++++
.../NodeWithAttributesSerializerTest.java | 63 ++++++++++++++
.../NodesAttributesSerializerTest.java | 97 ++++++++++++++++++++++
.../TopologyAugmentationMapSerializerTest.java | 82 ++++++++++++++++++
.../DistributionZoneCausalityDataNodesTest.java | 15 ++--
.../DistributionZoneRebalanceEngineTest.java | 4 +-
.../DistributionZonesTestUtil.java | 10 +--
22 files changed, 1015 insertions(+), 69 deletions(-)
diff --git
a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/distributionzones/ItDistributionZonesFiltersTest.java
b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/distributionzones/ItDistributionZonesFiltersTest.java
index 149d3529bc..6db9a426b9 100644
---
a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/distributionzones/ItDistributionZonesFiltersTest.java
+++
b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/distributionzones/ItDistributionZonesFiltersTest.java
@@ -24,16 +24,15 @@ import static
org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl;
import static
org.apache.ignite.internal.catalog.commands.CatalogUtils.DEFAULT_FILTER;
import static
org.apache.ignite.internal.catalog.commands.CatalogUtils.IMMEDIATE_TIMER_VALUE;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil.assertValueInStorage;
+import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.deserializeDataNodesMap;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneDataNodesKey;
import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.pendingPartAssignmentsKey;
import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.stablePartAssignmentsKey;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
-import static org.apache.ignite.internal.util.ByteUtils.fromBytes;
import static org.apache.ignite.internal.util.ByteUtils.toBytes;
import static
org.apache.ignite.internal.util.CompletableFutures.falseCompletedFuture;
import static org.junit.jupiter.api.Assertions.assertTrue;
-import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
@@ -156,7 +155,7 @@ public class ItDistributionZonesFiltersTest extends
ClusterPerTestIntegrationTes
assertValueInStorage(
metaStorageManager,
zoneDataNodesKey(zoneId),
- (v) -> ((Map<Node, Integer>) fromBytes(v)).size(),
+ (v) -> deserializeDataNodesMap(v).size(),
4,
TIMEOUT_MILLIS
);
@@ -414,7 +413,7 @@ public class ItDistributionZonesFiltersTest extends
ClusterPerTestIntegrationTes
assertValueInStorage(
metaStorageManager,
zoneDataNodesKey(zoneId),
- (v) -> ((Map<Node, Integer>) fromBytes(v)).size(),
+ (v) -> deserializeDataNodesMap(v).size(),
expectedDataNodesSize,
TIMEOUT_MILLIS
);
@@ -431,7 +430,7 @@ public class ItDistributionZonesFiltersTest extends
ClusterPerTestIntegrationTes
assertValueInStorage(
metaStorageManager,
zoneDataNodesKey(zoneId),
- (v) -> ((Map<Node, Integer>) fromBytes(v)).size(),
+ (v) -> deserializeDataNodesMap(v).size(),
expectedDataNodesSize,
TIMEOUT_MILLIS
);
diff --git
a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/distributionzones/ItIgniteDistributionZoneManagerNodeRestartTest.java
b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/distributionzones/ItIgniteDistributionZoneManagerNodeRestartTest.java
index bf2806ed41..c3e9ec3ace 100644
---
a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/distributionzones/ItIgniteDistributionZoneManagerNodeRestartTest.java
+++
b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/distributionzones/ItIgniteDistributionZoneManagerNodeRestartTest.java
@@ -31,6 +31,7 @@ import static
org.apache.ignite.internal.distributionzones.DistributionZonesTest
import static
org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil.createZone;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil.getDefaultZone;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.dataNodes;
+import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.deserializeDataNodesMap;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.toDataNodesMap;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneDataNodesKey;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneScaleDownChangeTriggerKeyPrefix;
@@ -43,9 +44,7 @@ import static
org.apache.ignite.internal.testframework.IgniteTestUtils.testNodeN
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
-import static org.apache.ignite.internal.util.ByteUtils.fromBytes;
import static org.apache.ignite.internal.util.ByteUtils.toByteArray;
-import static org.apache.ignite.internal.util.ByteUtils.toBytes;
import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
import static org.apache.ignite.internal.util.IgniteUtils.startsWith;
import static org.hamcrest.MatcherAssert.assertThat;
@@ -127,7 +126,6 @@ import
org.apache.ignite.internal.network.scalecube.TestScaleCubeClusterServiceF
import
org.apache.ignite.internal.security.authentication.validator.AuthenticationProvidersValidatorImpl;
import org.apache.ignite.internal.testframework.TestIgnitionManager;
import org.apache.ignite.internal.thread.NamedThreadFactory;
-import org.apache.ignite.internal.util.ByteUtils;
import org.apache.ignite.internal.vault.VaultManager;
import org.apache.ignite.internal.worker.fixtures.NoOpCriticalWorkerRegistry;
import org.apache.ignite.network.NetworkAddress;
@@ -480,7 +478,7 @@ public class ItIgniteDistributionZoneManagerNodeRestartTest
extends BaseIgniteRe
assertTrue(waitForCondition(() ->
logicalTopology.equals(finalDistributionZoneManager.logicalTopology()),
TIMEOUT_MILLIS));
- assertValueInStorage(metastore, zonesLastHandledTopology(),
ByteUtils::fromBytes, logicalTopology, TIMEOUT_MILLIS);
+ assertValueInStorage(metastore, zonesLastHandledTopology(),
this::deserializeLogicalTopologySet, logicalTopology, TIMEOUT_MILLIS);
int zoneId = getDefaultZoneId(node);
@@ -506,7 +504,7 @@ public class ItIgniteDistributionZoneManagerNodeRestartTest
extends BaseIgniteRe
assertTrue(waitForCondition(() ->
newLogicalTopology.equals(finalDistributionZoneManager.logicalTopology()),
TIMEOUT_MILLIS));
- assertValueInStorage(metastore, zonesLastHandledTopology(),
ByteUtils::fromBytes, logicalTopology, TIMEOUT_MILLIS);
+ assertValueInStorage(metastore, zonesLastHandledTopology(),
this::deserializeLogicalTopologySet, logicalTopology, TIMEOUT_MILLIS);
node.stop();
@@ -520,7 +518,13 @@ public class
ItIgniteDistributionZoneManagerNodeRestartTest extends BaseIgniteRe
assertEquals(newLogicalTopology,
distributionZoneManager.logicalTopology());
- assertValueInStorage(metastore, zonesLastHandledTopology(),
ByteUtils::fromBytes, newLogicalTopology, TIMEOUT_MILLIS);
+ assertValueInStorage(
+ metastore,
+ zonesLastHandledTopology(),
+ this::deserializeLogicalTopologySet,
+ newLogicalTopology,
+ TIMEOUT_MILLIS
+ );
assertDataNodesFromManager(
distributionZoneManager,
@@ -572,7 +576,13 @@ public class
ItIgniteDistributionZoneManagerNodeRestartTest extends BaseIgniteRe
assertEquals(newLogicalTopology,
distributionZoneManager.logicalTopology());
- assertValueInStorage(metastore, zonesLastHandledTopology(),
ByteUtils::fromBytes, newLogicalTopology, TIMEOUT_MILLIS);
+ assertValueInStorage(
+ metastore,
+ zonesLastHandledTopology(),
+ this::deserializeLogicalTopologySet,
+ newLogicalTopology,
+ TIMEOUT_MILLIS
+ );
int zoneId = getDefaultZoneId(node);
@@ -609,7 +619,7 @@ public class ItIgniteDistributionZoneManagerNodeRestartTest
extends BaseIgniteRe
assertValueInStorage(
metastore,
zoneDataNodesKey(zoneId),
- (v) ->
dataNodes(fromBytes(v)).stream().map(Node::nodeName).collect(toSet()),
+ (v) ->
dataNodes(deserializeDataNodesMap(v)).stream().map(Node::nodeName).collect(toSet()),
Set.of(A.name(), B.name(), C.name()),
TIMEOUT_MILLIS
);
@@ -625,7 +635,7 @@ public class ItIgniteDistributionZoneManagerNodeRestartTest
extends BaseIgniteRe
ByteArray dataNodeKeyForZone = new ByteArray(dataNodeKey[0]);
// Here we remove data nodes value for newly created zone, so it
is tombstone
- metastore.put(dataNodeKeyForZone,
toBytes(toDataNodesMap(emptySet()))).get();
+ metastore.put(dataNodeKeyForZone,
DataNodesMapSerializer.serialize(toDataNodesMap(emptySet()))).get();
metastore.remove(dataNodeKeyForZone).get();
@@ -650,6 +660,10 @@ public class
ItIgniteDistributionZoneManagerNodeRestartTest extends BaseIgniteRe
assertThat(metastore.get(new
ByteArray(dataNodeKey[0])).thenApply(Entry::tombstone), willBe(true));
}
+ private Set<NodeWithAttributes> deserializeLogicalTopologySet(byte[]
bytes) {
+ return DistributionZonesUtil.deserializeLogicalTopologySet(bytes);
+ }
+
@ParameterizedTest(name = "defaultZone={0}")
@ValueSource(booleans = {true, false})
public void testLocalDataNodesAreRestoredAfterRestart(boolean defaultZone)
throws Exception {
@@ -751,7 +765,7 @@ public class ItIgniteDistributionZoneManagerNodeRestartTest
extends BaseIgniteRe
assertValueInStorage(
metastore,
zoneDataNodesKey(zoneId),
- (v) ->
dataNodes(fromBytes(v)).stream().map(Node::nodeName).collect(toSet()),
+ (v) ->
dataNodes(deserializeDataNodesMap(v)).stream().map(Node::nodeName).collect(toSet()),
Set.of(A.name(), B.name(), C.name()),
TIMEOUT_MILLIS
);
@@ -812,7 +826,7 @@ public class ItIgniteDistributionZoneManagerNodeRestartTest
extends BaseIgniteRe
assertValueInStorage(
metastore,
zoneDataNodesKey(zoneId),
- (v) ->
dataNodes(fromBytes(v)).stream().map(Node::nodeName).collect(toSet()),
+ (v) ->
dataNodes(deserializeDataNodesMap(v)).stream().map(Node::nodeName).collect(toSet()),
Set.of(A.name(), C.name()),
TIMEOUT_MILLIS
);
diff --git
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/AugmentationSerializer.java
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/AugmentationSerializer.java
new file mode 100644
index 0000000000..057802f2e3
--- /dev/null
+++
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/AugmentationSerializer.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.distributionzones;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+import
org.apache.ignite.internal.distributionzones.DistributionZoneManager.Augmentation;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.internal.util.io.IgniteDataInput;
+import org.apache.ignite.internal.util.io.IgniteDataOutput;
+import org.apache.ignite.internal.versioned.VersionedSerializer;
+
+/**
+ * {@link VersionedSerializer} for {@link Augmentation} instances.
+ */
+public class AugmentationSerializer extends VersionedSerializer<Augmentation> {
+ /** Serializer instance. */
+ public static final AugmentationSerializer INSTANCE = new
AugmentationSerializer();
+
+ private final NodeSerializer nodeSerializer = NodeSerializer.INSTANCE;
+
+ @Override
+ protected void writeExternalData(Augmentation augmentation,
IgniteDataOutput out) throws IOException {
+ out.writeVarInt(augmentation.nodes().size());
+ for (Node node : augmentation.nodes()) {
+ nodeSerializer.writeExternal(node, out);
+ }
+
+ out.writeBoolean(augmentation.addition());
+ }
+
+ @Override
+ protected Augmentation readExternalData(byte protoVer, IgniteDataInput in)
throws IOException {
+ Set<Node> nodes = readNodes(in);
+ boolean addition = in.readBoolean();
+
+ return new Augmentation(nodes, addition);
+ }
+
+ private Set<Node> readNodes(IgniteDataInput in) throws IOException {
+ int length = in.readVarIntAsInt();
+
+ Set<Node> nodes = new HashSet<>(IgniteUtils.capacity(length));
+ for (int i = 0; i < length; i++) {
+ nodes.add(nodeSerializer.readExternal(in));
+ }
+
+ return nodes;
+ }
+}
diff --git
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DataNodesMapSerializer.java
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DataNodesMapSerializer.java
new file mode 100644
index 0000000000..eace2f8b9f
--- /dev/null
+++
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DataNodesMapSerializer.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.distributionzones;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.internal.util.io.IgniteDataInput;
+import org.apache.ignite.internal.util.io.IgniteDataOutput;
+import org.apache.ignite.internal.versioned.VersionedSerialization;
+import org.apache.ignite.internal.versioned.VersionedSerializer;
+
+/**
+ * {@link VersionedSerializer} for data nodes' maps.
+ */
+public class DataNodesMapSerializer extends VersionedSerializer<Map<Node,
Integer>> {
+ /** Serializer instance. */
+ public static final DataNodesMapSerializer INSTANCE = new
DataNodesMapSerializer();
+
+ private final NodeSerializer nodeSerializer = NodeSerializer.INSTANCE;
+
+ @Override
+ protected void writeExternalData(Map<Node, Integer> map, IgniteDataOutput
out) throws IOException {
+ out.writeVarInt(map.size());
+ for (Map.Entry<Node, Integer> entry : map.entrySet()) {
+ nodeSerializer.writeExternal(entry.getKey(), out);
+ out.writeVarInt(entry.getValue());
+ }
+ }
+
+ @Override
+ protected Map<Node, Integer> readExternalData(byte protoVer,
IgniteDataInput in) throws IOException {
+ int size = in.readVarIntAsInt();
+
+ Map<Node, Integer> map = new HashMap<>(IgniteUtils.capacity(size));
+ for (int i = 0; i < size; i++) {
+ Node node = nodeSerializer.readExternal(in);
+ int count = in.readVarIntAsInt();
+
+ map.put(node, count);
+ }
+
+ return map;
+ }
+
+ /**
+ * Serializes a map to bytes.
+ *
+ * @param map Map to serialize.
+ */
+ public static byte[] serialize(Map<Node, Integer> map) {
+ return VersionedSerialization.toBytes(map, INSTANCE);
+ }
+
+ /**
+ * Deserializes a map from bytes.
+ *
+ * @param bytes Bytes.
+ */
+ public static Map<Node, Integer> deserialize(byte[] bytes) {
+ return VersionedSerialization.fromBytes(bytes, INSTANCE);
+ }
+}
diff --git
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
index 5911f02400..0f0803d8fa 100644
---
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
+++
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
@@ -35,6 +35,7 @@ import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil
import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.conditionForZoneRemoval;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.createZoneManagerExecutor;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.deleteDataNodesAndTriggerKeys;
+import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.deserializeLogicalTopologySet;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.extractChangeTriggerRevision;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.extractDataNodes;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.toDataNodesMap;
@@ -61,9 +62,7 @@ import static
org.apache.ignite.internal.metastorage.dsl.Operations.ops;
import static org.apache.ignite.internal.metastorage.dsl.Operations.put;
import static org.apache.ignite.internal.metastorage.dsl.Statements.iif;
import static
org.apache.ignite.internal.util.ByteUtils.bytesToLongKeepingOrder;
-import static org.apache.ignite.internal.util.ByteUtils.fromBytes;
import static
org.apache.ignite.internal.util.ByteUtils.longToBytesKeepingOrder;
-import static org.apache.ignite.internal.util.ByteUtils.toBytes;
import static org.apache.ignite.internal.util.ByteUtils.uuidToBytes;
import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
@@ -71,7 +70,6 @@ import static
org.apache.ignite.internal.util.IgniteUtils.inBusyLockAsync;
import static
org.apache.ignite.internal.util.IgniteUtils.shutdownAndAwaitTermination;
import static org.apache.ignite.lang.ErrorGroups.Common.NODE_STOPPING_ERR;
-import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
@@ -434,7 +432,7 @@ public class DistributionZoneManager implements
IgniteComponent {
// This case means that there won't any logical topology
updates before restart.
topologyAugmentationMap = new ConcurrentSkipListMap<>();
} else {
- topologyAugmentationMap =
fromBytes(topologyAugmentationMapLocalMetaStorage.value());
+ topologyAugmentationMap =
TopologyAugmentationMapSerializer.deserialize(topologyAugmentationMapLocalMetaStorage.value());
}
ZoneState zoneState = new ZoneState(executor,
topologyAugmentationMap);
@@ -528,7 +526,11 @@ public class DistributionZoneManager implements
IgniteComponent {
// Update data nodes for a zone only if the corresponding data
nodes keys weren't initialised in ms yet.
CompoundCondition triggerKeyCondition =
conditionForZoneCreation(zoneId);
- Update dataNodesAndTriggerKeyUpd =
updateDataNodesAndTriggerKeys(zoneId, revision,
toBytes(toDataNodesMap(dataNodes)));
+ Update dataNodesAndTriggerKeyUpd = updateDataNodesAndTriggerKeys(
+ zoneId,
+ revision,
+ DataNodesMapSerializer.serialize(toDataNodesMap(dataNodes))
+ );
Iif iif = iif(triggerKeyCondition, dataNodesAndTriggerKeyUpd,
ops().yield(false));
@@ -677,16 +679,17 @@ public class DistributionZoneManager implements
IgniteComponent {
// that one value is not null, but other is null.
assert nodeAttributesEntry.value() != null;
- logicalTopology = fromBytes(lastHandledTopologyEntry.value());
+ logicalTopology =
deserializeLogicalTopologySet(lastHandledTopologyEntry.value());
- nodesAttributes = fromBytes(nodeAttributesEntry.value());
+ nodesAttributes =
DistributionZonesUtil.deserializeNodesAttributes(nodeAttributesEntry.value());
}
- assert lastHandledTopologyEntry.value() == null ||
logicalTopology.equals(fromBytes(lastHandledTopologyEntry.value()))
+ assert lastHandledTopologyEntry.value() == null
+ ||
logicalTopology.equals(deserializeLogicalTopologySet(lastHandledTopologyEntry.value()))
: "Initial value of logical topology was changed after
initialization from the Meta Storage manager.";
assert nodeAttributesEntry.value() == null
- ||
nodesAttributes.equals(fromBytes(nodeAttributesEntry.value()))
+ ||
nodesAttributes.equals(DistributionZonesUtil.deserializeNodesAttributes(nodeAttributesEntry.value()))
: "Initial value of nodes' attributes was changed after
initialization from the Meta Storage manager.";
}
@@ -723,7 +726,7 @@ public class DistributionZoneManager implements
IgniteComponent {
} else if (Arrays.equals(e.key(),
zonesLogicalTopologyKey().bytes())) {
newLogicalTopologyBytes = e.value();
- newLogicalTopology =
fromBytes(newLogicalTopologyBytes);
+ newLogicalTopology =
deserializeLogicalTopologySet(newLogicalTopologyBytes);
}
}
@@ -831,18 +834,24 @@ public class DistributionZoneManager implements
IgniteComponent {
) {
Operation[] puts = new Operation[3 + zoneIds.size()];
- puts[0] = put(zonesNodesAttributes(), toBytes(nodesAttributes()));
+ puts[0] = put(zonesNodesAttributes(),
NodesAttributesSerializer.serialize(nodesAttributes()));
puts[1] = put(zonesRecoverableStateRevision(),
longToBytesKeepingOrder(revision));
- puts[2] = put(zonesLastHandledTopology(), toBytes(newLogicalTopology));
+ puts[2] = put(
+ zonesLastHandledTopology(),
+ LogicalTopologySetSerializer.serialize(newLogicalTopology)
+ );
int i = 3;
// TODO: https://issues.apache.org/jira/browse/IGNITE-19491 Properly
utilise topology augmentation map. Also this map
// TODO: can be saved only once for all zones.
for (Integer zoneId : zoneIds) {
- puts[i++] = put(zoneTopologyAugmentation(zoneId),
toBytes(zonesState.get(zoneId).topologyAugmentationMap()));
+ puts[i++] = put(
+ zoneTopologyAugmentation(zoneId),
+
TopologyAugmentationMapSerializer.serialize(zonesState.get(zoneId).topologyAugmentationMap())
+ );
}
Iif iif = iif(
@@ -982,7 +991,11 @@ public class DistributionZoneManager implements
IgniteComponent {
// Remove redundant nodes that are not presented in the data
nodes.
newDataNodes.entrySet().removeIf(e -> e.getValue() == 0);
- Update dataNodesAndTriggerKeyUpd =
updateDataNodesAndScaleUpTriggerKey(zoneId, revision, toBytes(newDataNodes));
+ Update dataNodesAndTriggerKeyUpd =
updateDataNodesAndScaleUpTriggerKey(
+ zoneId,
+ revision,
+ DataNodesMapSerializer.serialize(newDataNodes)
+ );
Iif iif = iif(
triggerScaleUpScaleDownKeysCondition(scaleUpTriggerRevision,
scaleDownTriggerRevision, zoneId),
@@ -1088,7 +1101,11 @@ public class DistributionZoneManager implements
IgniteComponent {
// Remove redundant nodes that are not presented in the data
nodes.
newDataNodes.entrySet().removeIf(e -> e.getValue() == 0);
- Update dataNodesAndTriggerKeyUpd =
updateDataNodesAndScaleDownTriggerKey(zoneId, revision, toBytes(newDataNodes));
+ Update dataNodesAndTriggerKeyUpd =
updateDataNodesAndScaleDownTriggerKey(
+ zoneId,
+ revision,
+ DataNodesMapSerializer.serialize(newDataNodes)
+ );
Iif iif = iif(
triggerScaleUpScaleDownKeysCondition(scaleUpTriggerRevision,
scaleDownTriggerRevision, zoneId),
@@ -1365,16 +1382,14 @@ public class DistributionZoneManager implements
IgniteComponent {
* Class stores the info about nodes that should be added or removed from
the data nodes of a zone.
* With flag {@code addition} we can track whether {@code nodeNames}
should be added or removed.
*/
- public static class Augmentation implements Serializable {
- private static final long serialVersionUID = -7957428671075739621L;
-
- /** Names of the node. */
+ public static class Augmentation {
+ /** Nodes. */
private final Set<Node> nodes;
/** Flag that indicates whether {@code nodeNames} should be added or
removed. */
private final boolean addition;
- Augmentation(Set<Node> nodes, boolean addition) {
+ public Augmentation(Set<Node> nodes, boolean addition) {
this.nodes = unmodifiableSet(nodes);
this.addition = addition;
}
@@ -1441,7 +1456,7 @@ public class DistributionZoneManager implements
IgniteComponent {
Entry topologyEntry =
metaStorageManager.getLocally(zonesLogicalTopologyKey(), recoveryRevision);
if (topologyEntry.value() != null) {
- Set<NodeWithAttributes> newLogicalTopology =
fromBytes(topologyEntry.value());
+ Set<NodeWithAttributes> newLogicalTopology =
deserializeLogicalTopologySet(topologyEntry.value());
long topologyRevision = topologyEntry.revision();
diff --git
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZonesUtil.java
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZonesUtil.java
index 176679ec8c..e8fed7ed5b 100644
---
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZonesUtil.java
+++
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZonesUtil.java
@@ -32,7 +32,6 @@ import static
org.apache.ignite.internal.metastorage.dsl.Operations.ops;
import static org.apache.ignite.internal.metastorage.dsl.Operations.put;
import static org.apache.ignite.internal.metastorage.dsl.Operations.remove;
import static
org.apache.ignite.internal.util.ByteUtils.bytesToLongKeepingOrder;
-import static org.apache.ignite.internal.util.ByteUtils.fromBytes;
import static
org.apache.ignite.internal.util.ByteUtils.longToBytesKeepingOrder;
import static org.apache.ignite.internal.util.ByteUtils.uuidToBytes;
@@ -63,7 +62,6 @@ import
org.apache.ignite.internal.metastorage.dsl.SimpleCondition;
import org.apache.ignite.internal.metastorage.dsl.Update;
import org.apache.ignite.internal.thread.NamedThreadFactory;
import org.apache.ignite.internal.thread.StripedScheduledThreadPoolExecutor;
-import org.apache.ignite.internal.util.ByteUtils;
import org.jetbrains.annotations.Nullable;
import org.jetbrains.annotations.TestOnly;
@@ -428,7 +426,10 @@ public class DistributionZonesUtil {
List<Operation> operations = new ArrayList<>();
operations.add(put(zonesLogicalTopologyVersionKey(),
longToBytesKeepingOrder(logicalTopology.version())));
- operations.add(put(zonesLogicalTopologyKey(),
ByteUtils.toBytes(topologyFromCmg)));
+ operations.add(put(
+ zonesLogicalTopologyKey(),
+ LogicalTopologySetSerializer.serialize(topologyFromCmg)
+ ));
if (updateClusterId) {
operations.add(put(zonesLogicalTopologyClusterIdKey(),
uuidToBytes(logicalTopology.clusterId())));
}
@@ -466,7 +467,19 @@ public class DistributionZonesUtil {
@Nullable
public static Set<Node> parseDataNodes(byte[] dataNodesBytes) {
- return dataNodesBytes == null ? null :
dataNodes(fromBytes(dataNodesBytes));
+ return dataNodesBytes == null ? null :
dataNodes(deserializeDataNodesMap(dataNodesBytes));
+ }
+
+ public static Map<Node, Integer> deserializeDataNodesMap(byte[] bytes) {
+ return DataNodesMapSerializer.deserialize(bytes);
+ }
+
+ public static Set<NodeWithAttributes> deserializeLogicalTopologySet(byte[]
bytes) {
+ return LogicalTopologySetSerializer.deserialize(bytes);
+ }
+
+ public static Map<UUID, NodeWithAttributes>
deserializeNodesAttributes(byte[] bytes) {
+ return NodesAttributesSerializer.deserialize(bytes);
}
/**
@@ -477,7 +490,7 @@ public class DistributionZonesUtil {
*/
static Map<Node, Integer> extractDataNodes(Entry dataNodesEntry) {
if (!dataNodesEntry.empty()) {
- return fromBytes(dataNodesEntry.value());
+ return deserializeDataNodesMap(dataNodesEntry.value());
} else {
return emptyMap();
}
diff --git
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/LogicalTopologySetSerializer.java
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/LogicalTopologySetSerializer.java
new file mode 100644
index 0000000000..93112962bf
--- /dev/null
+++
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/LogicalTopologySetSerializer.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.distributionzones;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.internal.util.io.IgniteDataInput;
+import org.apache.ignite.internal.util.io.IgniteDataOutput;
+import org.apache.ignite.internal.versioned.VersionedSerialization;
+import org.apache.ignite.internal.versioned.VersionedSerializer;
+
+/**
+ * {@link VersionedSerializer} for logical topology sets (represented with
{@code Set<NodeWithAttributes} instances).
+ */
+public class LogicalTopologySetSerializer extends
VersionedSerializer<Set<NodeWithAttributes>> {
+ /** Serializer instance. */
+ public static final LogicalTopologySetSerializer INSTANCE = new
LogicalTopologySetSerializer();
+
+ private final NodeWithAttributesSerializer nodeWithAttributesSerializer =
NodeWithAttributesSerializer.INSTANCE;
+
+ @Override
+ protected void writeExternalData(Set<NodeWithAttributes> topology,
IgniteDataOutput out) throws IOException {
+ out.writeVarInt(topology.size());
+ for (NodeWithAttributes node : topology) {
+ nodeWithAttributesSerializer.writeExternal(node, out);
+ }
+ }
+
+ @Override
+ protected Set<NodeWithAttributes> readExternalData(byte protoVer,
IgniteDataInput in) throws IOException {
+ int length = in.readVarIntAsInt();
+
+ Set<NodeWithAttributes> topology = new
HashSet<>(IgniteUtils.capacity(length));
+ for (int i = 0; i < length; i++) {
+ topology.add(nodeWithAttributesSerializer.readExternal(in));
+ }
+
+ return topology;
+ }
+
+ /**
+ * Serializes a set to bytes.
+ *
+ * @param set Set to serialize.
+ */
+ public static byte[] serialize(Set<NodeWithAttributes> set) {
+ return VersionedSerialization.toBytes(set, INSTANCE);
+ }
+
+ /**
+ * Deserializes a set from bytes.
+ *
+ * @param bytes Bytes.
+ */
+ public static Set<NodeWithAttributes> deserialize(byte[] bytes) {
+ return VersionedSerialization.fromBytes(bytes, INSTANCE);
+ }
+}
diff --git
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/Node.java
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/Node.java
index a05322c496..e9d69ee320 100644
---
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/Node.java
+++
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/Node.java
@@ -17,7 +17,6 @@
package org.apache.ignite.internal.distributionzones;
-import java.io.Serializable;
import java.util.UUID;
/**
@@ -26,9 +25,7 @@ import java.util.UUID;
* {@code nodeId} that is unique identifier of a node, that changes after a
restart.
* {@code nodeId} is needed to get node's attributes from the local state of
the distribution zone manager.
*/
-public class Node implements Serializable {
- private static final long serialVersionUID = 875461392587175703L;
-
+public class Node {
private final String nodeName;
private final UUID nodeId;
diff --git
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/NodeSerializer.java
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/NodeSerializer.java
new file mode 100644
index 0000000000..772d055a46
--- /dev/null
+++
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/NodeSerializer.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.distributionzones;
+
+import java.io.IOException;
+import java.util.UUID;
+import org.apache.ignite.internal.util.io.IgniteDataInput;
+import org.apache.ignite.internal.util.io.IgniteDataOutput;
+import org.apache.ignite.internal.versioned.VersionedSerializer;
+
+/**
+ * {@link VersionedSerializer} for {@link Node} instances.
+ */
+public class NodeSerializer extends VersionedSerializer<Node> {
+ /** Serializer instance. */
+ public static final NodeSerializer INSTANCE = new NodeSerializer();
+
+ @Override
+ protected void writeExternalData(Node node, IgniteDataOutput out) throws
IOException {
+ out.writeUTF(node.nodeName());
+ out.writeUuid(node.nodeId());
+ }
+
+ @Override
+ protected Node readExternalData(byte protoVer, IgniteDataInput in) throws
IOException {
+ String nodeName = in.readUTF();
+ UUID nodeId = in.readUuid();
+
+ return new Node(nodeName, nodeId);
+ }
+}
diff --git
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/NodeWithAttributes.java
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/NodeWithAttributes.java
index 1758ebc99c..23e616f553 100644
---
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/NodeWithAttributes.java
+++
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/NodeWithAttributes.java
@@ -17,7 +17,6 @@
package org.apache.ignite.internal.distributionzones;
-import java.io.Serializable;
import java.util.List;
import java.util.Map;
import java.util.UUID;
@@ -28,9 +27,7 @@ import org.jetbrains.annotations.Nullable;
* Structure that represents node with the attributes and which we store in
Meta Storage when we store logical topology.
* Light-weighted version of the {@link LogicalNode}.
*/
-public class NodeWithAttributes implements Serializable {
- private static final long serialVersionUID = -7778967985161743937L;
-
+public class NodeWithAttributes {
private final Node node;
private final Map<String, String> userAttributes;
diff --git
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/NodeWithAttributesSerializer.java
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/NodeWithAttributesSerializer.java
new file mode 100644
index 0000000000..e049b7389b
--- /dev/null
+++
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/NodeWithAttributesSerializer.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.distributionzones;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.internal.util.io.IgniteDataInput;
+import org.apache.ignite.internal.util.io.IgniteDataOutput;
+import org.apache.ignite.internal.versioned.VersionedSerializer;
+
+/**
+ * {@link VersionedSerializer} for {@link NodeWithAttributes} instances.
+ */
+public class NodeWithAttributesSerializer extends
VersionedSerializer<NodeWithAttributes> {
+ /** Serializer instance. */
+ public static final NodeWithAttributesSerializer INSTANCE = new
NodeWithAttributesSerializer();
+
+ private final NodeSerializer nodeSerializer = NodeSerializer.INSTANCE;
+
+ @Override
+ protected void writeExternalData(NodeWithAttributes node, IgniteDataOutput
out) throws IOException {
+ nodeSerializer.writeExternal(node.node(), out);
+
+ out.writeVarInt(node.userAttributes().size());
+ for (Map.Entry<String, String> attrEntry :
node.userAttributes().entrySet()) {
+ out.writeUTF(attrEntry.getKey());
+ out.writeUTF(attrEntry.getValue());
+ }
+
+ out.writeVarInt(node.storageProfiles().size());
+ for (String profile : node.storageProfiles()) {
+ out.writeUTF(profile);
+ }
+ }
+
+ @Override
+ protected NodeWithAttributes readExternalData(byte protoVer,
IgniteDataInput in) throws IOException {
+ Node node = nodeSerializer.readExternal(in);
+ Map<String, String> userAttributes = readUserAttributes(in);
+ List<String> storageProfiles = readStorageProfiles(in);
+
+ return new NodeWithAttributes(node.nodeName(), node.nodeId(),
userAttributes, storageProfiles);
+ }
+
+ private static Map<String, String> readUserAttributes(IgniteDataInput in)
throws IOException {
+ int length = in.readVarIntAsInt();
+
+ Map<String, String> attrs = new
HashMap<>(IgniteUtils.capacity(length));
+ for (int i = 0; i < length; i++) {
+ attrs.put(in.readUTF(), in.readUTF());
+ }
+
+ return attrs;
+ }
+
+ private static List<String> readStorageProfiles(IgniteDataInput in) throws
IOException {
+ int length = in.readVarIntAsInt();
+
+ List<String> storageProfiles = new ArrayList<>(length);
+ for (int i = 0; i < length; i++) {
+ storageProfiles.add(in.readUTF());
+ }
+
+ return storageProfiles;
+ }
+}
diff --git
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/NodesAttributesSerializer.java
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/NodesAttributesSerializer.java
new file mode 100644
index 0000000000..f3984daf34
--- /dev/null
+++
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/NodesAttributesSerializer.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.distributionzones;
+
+import static java.util.function.Function.identity;
+import static java.util.stream.Collectors.toMap;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.ignite.internal.util.io.IgniteDataInput;
+import org.apache.ignite.internal.util.io.IgniteDataOutput;
+import org.apache.ignite.internal.versioned.VersionedSerialization;
+import org.apache.ignite.internal.versioned.VersionedSerializer;
+
+/**
+ * {@link VersionedSerializer} for nodes attributes (represented with {@code
Map<UUID, NodeWithAttributes>}).
+ *
+ * <p>The map is deserialized as a {@link ConcurrentHashMap}.
+ */
+public class NodesAttributesSerializer extends VersionedSerializer<Map<UUID,
NodeWithAttributes>> {
+ /** Serializer instance. */
+ public static final NodesAttributesSerializer INSTANCE = new
NodesAttributesSerializer();
+
+ private final NodeWithAttributesSerializer nodeWithAttributesSerializer =
NodeWithAttributesSerializer.INSTANCE;
+
+ @Override
+ protected void writeExternalData(Map<UUID, NodeWithAttributes> map,
IgniteDataOutput out) throws IOException {
+ out.writeVarInt(map.size());
+ for (Map.Entry<UUID, NodeWithAttributes> entry : map.entrySet()) {
+ NodeWithAttributes value = entry.getValue();
+
+ assert entry.getKey().equals(value.nodeId()) : map;
+
+ nodeWithAttributesSerializer.writeExternal(value, out);
+ }
+ }
+
+ @Override
+ protected Map<UUID, NodeWithAttributes> readExternalData(byte protoVer,
IgniteDataInput in) throws IOException {
+ int length = in.readVarIntAsInt();
+
+ List<NodeWithAttributes> nodes = new ArrayList<>(length);
+ for (int i = 0; i < length; i++) {
+ nodes.add(nodeWithAttributesSerializer.readExternal(in));
+ }
+
+ return nodes.stream().collect(
+ toMap(NodeWithAttributes::nodeId, identity(), (x, y) -> y,
ConcurrentHashMap::new)
+ );
+ }
+
+ /**
+ * Serializes a map to bytes.
+ *
+ * @param map Map to serialize.
+ */
+ public static byte[] serialize(Map<UUID, NodeWithAttributes> map) {
+ return VersionedSerialization.toBytes(map, INSTANCE);
+ }
+
+ /**
+ * Deserializes a map from bytes.
+ *
+ * @param bytes Bytes.
+ */
+ public static Map<UUID, NodeWithAttributes> deserialize(byte[] bytes) {
+ return VersionedSerialization.fromBytes(bytes, INSTANCE);
+ }
+}
diff --git
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/TopologyAugmentationMapSerializer.java
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/TopologyAugmentationMapSerializer.java
new file mode 100644
index 0000000000..c403137227
--- /dev/null
+++
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/TopologyAugmentationMapSerializer.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.distributionzones;
+
+import java.io.IOException;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentSkipListMap;
+import
org.apache.ignite.internal.distributionzones.DistributionZoneManager.Augmentation;
+import org.apache.ignite.internal.util.io.IgniteDataInput;
+import org.apache.ignite.internal.util.io.IgniteDataOutput;
+import org.apache.ignite.internal.versioned.VersionedSerialization;
+import org.apache.ignite.internal.versioned.VersionedSerializer;
+
+/**
+ * {@link VersionedSerializer} for topology augmentation maps represented with
{@code ConcurrentSkipListMap<Long, Augmentation>}.
+ */
+public class TopologyAugmentationMapSerializer extends
VersionedSerializer<ConcurrentSkipListMap<Long, Augmentation>> {
+ /** Serializer instance. */
+ public static final TopologyAugmentationMapSerializer INSTANCE = new
TopologyAugmentationMapSerializer();
+
+ private final AugmentationSerializer augmentationSerializer =
AugmentationSerializer.INSTANCE;
+
+ @Override
+ protected void writeExternalData(ConcurrentSkipListMap<Long, Augmentation>
map, IgniteDataOutput out) throws IOException {
+ out.writeVarInt(map.size());
+ for (Entry<Long, Augmentation> entry : map.entrySet()) {
+ out.writeVarInt(entry.getKey());
+ augmentationSerializer.writeExternal(entry.getValue(), out);
+ }
+ }
+
+ @Override
+ protected ConcurrentSkipListMap<Long, Augmentation> readExternalData(byte
protoVer, IgniteDataInput in) throws IOException {
+ int length = in.readVarIntAsInt();
+
+ ConcurrentSkipListMap<Long, Augmentation> map = new
ConcurrentSkipListMap<>();
+ for (int i = 0; i < length; i++) {
+ map.put(in.readVarInt(), augmentationSerializer.readExternal(in));
+ }
+
+ return map;
+ }
+
+ /**
+ * Serializes a map to bytes.
+ *
+ * @param map Map to serialize.
+ */
+ public static byte[] serialize(ConcurrentSkipListMap<Long, Augmentation>
map) {
+ return VersionedSerialization.toBytes(map, INSTANCE);
+ }
+
+ /**
+ * Deserializes a map from bytes.
+ *
+ * @param bytes Bytes.
+ */
+ public static ConcurrentSkipListMap<Long, Augmentation> deserialize(byte[]
bytes) {
+ return VersionedSerialization.fromBytes(bytes, INSTANCE);
+ }
+}
diff --git
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/causalitydatanodes/CausalityDataNodesEngine.java
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/causalitydatanodes/CausalityDataNodesEngine.java
index 838224f922..5fb23cbb0b 100644
---
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/causalitydatanodes/CausalityDataNodesEngine.java
+++
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/causalitydatanodes/CausalityDataNodesEngine.java
@@ -21,13 +21,13 @@ import static java.lang.Math.max;
import static java.util.Collections.emptySet;
import static java.util.stream.Collectors.toSet;
import static
org.apache.ignite.internal.catalog.commands.CatalogUtils.IMMEDIATE_TIMER_VALUE;
+import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.deserializeLogicalTopologySet;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.filterDataNodes;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneDataNodesKey;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneScaleDownChangeTriggerKey;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneScaleUpChangeTriggerKey;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesLogicalTopologyKey;
import static
org.apache.ignite.internal.util.ByteUtils.bytesToLongKeepingOrder;
-import static org.apache.ignite.internal.util.ByteUtils.fromBytes;
import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
@@ -46,6 +46,7 @@ import
org.apache.ignite.internal.catalog.events.CatalogEventParameters;
import org.apache.ignite.internal.causality.IncrementalVersionedValue;
import org.apache.ignite.internal.causality.OutdatedTokenException;
import org.apache.ignite.internal.causality.VersionedValue;
+import org.apache.ignite.internal.distributionzones.DataNodesMapSerializer;
import org.apache.ignite.internal.distributionzones.DistributionZoneManager;
import
org.apache.ignite.internal.distributionzones.DistributionZoneManager.Augmentation;
import
org.apache.ignite.internal.distributionzones.DistributionZoneManager.ZoneState;
@@ -188,7 +189,7 @@ public class CausalityDataNodesEngine {
return emptySet();
}
- Set<NodeWithAttributes> logicalTopology =
fromBytes(topologyEntry.value());
+ Set<NodeWithAttributes> logicalTopology =
deserializeLogicalTopologySet(topologyEntry.value());
Set<Node> logicalTopologyNodes =
logicalTopology.stream().map(n -> n.node()).collect(toSet());
@@ -227,7 +228,9 @@ public class CausalityDataNodesEngine {
return emptySet();
}
- Set<Node> baseDataNodes =
DistributionZonesUtil.dataNodes(fromBytes(dataNodesEntry.value()));
+ Set<Node> baseDataNodes = DistributionZonesUtil.dataNodes(
+ DataNodesMapSerializer.deserialize(dataNodesEntry.value())
+ );
long scaleUpTriggerRevision =
bytesToLongKeepingOrder(scaleUpChangeTriggerKey.value());
long scaleDownTriggerRevision =
bytesToLongKeepingOrder(scaleDownChangeTriggerKey.value());
@@ -410,7 +413,7 @@ public class CausalityDataNodesEngine {
if (!topologyEntry.empty()) {
byte[] newerLogicalTopologyBytes = topologyEntry.value();
- newerLogicalTopology = fromBytes(newerLogicalTopologyBytes);
+ newerLogicalTopology =
deserializeLogicalTopologySet(newerLogicalTopologyBytes);
newerTopologyRevision = topologyEntry.revision();
@@ -426,7 +429,7 @@ public class CausalityDataNodesEngine {
} else {
byte[] olderLogicalTopologyBytes = topologyEntry.value();
- olderLogicalTopology =
fromBytes(olderLogicalTopologyBytes);
+ olderLogicalTopology =
deserializeLogicalTopologySet(olderLogicalTopologyBytes);
}
CatalogZoneDescriptor zoneDescriptor =
catalogManager.catalog(catalogVersion).zone(zoneId);
diff --git
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DataNodesMapSerializerTest.java
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DataNodesMapSerializerTest.java
new file mode 100644
index 0000000000..5edd1c1754
--- /dev/null
+++
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DataNodesMapSerializerTest.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.distributionzones;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.aMapWithSize;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasEntry;
+import static org.hamcrest.Matchers.is;
+
+import java.util.Base64;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.ignite.internal.versioned.VersionedSerialization;
+import org.junit.jupiter.api.Test;
+
+class DataNodesMapSerializerTest {
+ private final DataNodesMapSerializer serializer = new
DataNodesMapSerializer();
+
+ @Test
+ void serializationAndDeserialization() {
+ Map<Node, Integer> originalMap = Map.of(
+ new Node("a", new UUID(0x1234567890ABCDEFL,
0xFEDCBA0987654321L)), 1000,
+ new Node("b", new UUID(0xFEDCBA0987654321L,
0x1234567890ABCDEFL)), 2000
+ );
+
+ byte[] bytes = VersionedSerialization.toBytes(originalMap, serializer);
+ Map<Node, Integer> restoredMap =
VersionedSerialization.fromBytes(bytes, serializer);
+
+ assertThat(restoredMap, equalTo(originalMap));
+ }
+
+ @Test
+ void v1CanBeDeserialized() {
+ byte[] bytes =
Base64.getDecoder().decode("Ae++QwMB775DAmHvzauQeFY0EiFDZYcJutz+6QcB775DAmIhQ2WHCbrc/u/Nq5B4VjQS0Q8=");
+ Map<Node, Integer> restoredMap =
VersionedSerialization.fromBytes(bytes, serializer);
+
+ assertThat(restoredMap, is(aMapWithSize(2)));
+ assertThat(restoredMap, hasEntry(new Node("a", new
UUID(0x1234567890ABCDEFL, 0xFEDCBA0987654321L)), 1000));
+ assertThat(restoredMap, hasEntry(new Node("b", new
UUID(0xFEDCBA0987654321L, 0x1234567890ABCDEFL)), 2000));
+ }
+}
diff --git
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/LogicalTopologySetSerializerTest.java
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/LogicalTopologySetSerializerTest.java
new file mode 100644
index 0000000000..469dff5ef5
--- /dev/null
+++
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/LogicalTopologySetSerializerTest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.distributionzones;
+
+import static java.util.stream.Collectors.toSet;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.is;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import java.util.Base64;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import org.apache.ignite.internal.versioned.VersionedSerialization;
+import org.junit.jupiter.api.Test;
+
+class LogicalTopologySetSerializerTest {
+ private static final UUID NODE1_ID = new UUID(0x1234567890ABCDEFL,
0xFEDCBA0987654321L);
+ private static final UUID NODE2_ID = new UUID(0xFEDCBA0987654321L,
0x1234567890ABCDEFL);
+
+ private final LogicalTopologySetSerializer serializer = new
LogicalTopologySetSerializer();
+
+ @Test
+ void serializationAndDeserialization() {
+ Set<NodeWithAttributes> originalNodes = Set.of(
+ new NodeWithAttributes(
+ "node1",
+ new UUID(0x1234567890ABCDEFL, 0xFEDCBA0987654321L),
+ Map.of("a1", "v1"),
+ List.of("prof1")
+ ),
+ new NodeWithAttributes(
+ "node2",
+ new UUID(0xFEDCBA0987654321L, 0x1234567890ABCDEFL),
+ Map.of("a2", "v2"),
+ List.of("prof2")
+ )
+ );
+
+ byte[] bytes = VersionedSerialization.toBytes(originalNodes,
serializer);
+ Set<NodeWithAttributes> restoredNodes =
VersionedSerialization.fromBytes(bytes, serializer);
+
+ assertThat(restoredNodes, equalTo(originalNodes));
+ assertEquals(
+
originalNodes.stream().map(NodeWithAttributes::nodeId).collect(toSet()),
+
restoredNodes.stream().map(NodeWithAttributes::nodeId).collect(toSet())
+ );
+ }
+
+ @Test
+ void v1CanBeSerialized() {
+ byte[] bytes =
Base64.getDecoder().decode("Ae++QwMB775DAe++QwZub2RlMe/Nq5B4VjQSIUNlhwm63P4CA2ExA3YxAgZwcm9mMQHvvkMB775DBm5vZGUy"
+ + "IUNlhwm63P7vzauQeFY0EgIDYTIDdjICBnByb2Yy");
+ Set<NodeWithAttributes> restoredNodes =
VersionedSerialization.fromBytes(bytes, serializer);
+
+ assertThat(restoredNodes, hasSize(2));
+
+ NodeWithAttributes node1 = restoredNodes.stream()
+ .filter(n -> n.nodeId().equals(NODE1_ID))
+ .findAny().orElseThrow();
+ assertThat(node1.nodeId(), is(NODE1_ID));
+ assertThat(node1.nodeName(), is("node1"));
+ assertThat(node1.userAttributes(), equalTo(Map.of("a1", "v1")));
+ assertThat(node1.storageProfiles(), contains("prof1"));
+
+ NodeWithAttributes node2 = restoredNodes.stream()
+ .filter(n -> n.nodeId().equals(NODE2_ID))
+ .findAny().orElseThrow();
+ assertThat(node2.nodeId(), is(NODE2_ID));
+ assertThat(node2.nodeName(), is("node2"));
+ assertThat(node2.userAttributes(), equalTo(Map.of("a2", "v2")));
+ assertThat(node2.storageProfiles(), contains("prof2"));
+ }
+}
diff --git
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/NodeWithAttributesSerializerTest.java
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/NodeWithAttributesSerializerTest.java
new file mode 100644
index 0000000000..938550b313
--- /dev/null
+++
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/NodeWithAttributesSerializerTest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.distributionzones;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+
+import java.util.Base64;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.ignite.internal.versioned.VersionedSerialization;
+import org.junit.jupiter.api.Test;
+
+class NodeWithAttributesSerializerTest {
+ private final NodeWithAttributesSerializer serializer = new
NodeWithAttributesSerializer();
+
+ @Test
+ void serializationAndDeserialization() {
+ NodeWithAttributes originalNode = new NodeWithAttributes(
+ "test",
+ new UUID(0x1234567890ABCDEFL, 0xFEDCBA0987654321L),
+ Map.of("a1", "v1", "a2", "v2"),
+ List.of("prof1", "prof2")
+ );
+
+ byte[] bytes = VersionedSerialization.toBytes(originalNode,
serializer);
+ NodeWithAttributes restoredNode =
VersionedSerialization.fromBytes(bytes, serializer);
+
+ assertThat(restoredNode.nodeId(), is(new UUID(0x1234567890ABCDEFL,
0xFEDCBA0987654321L)));
+ assertThat(restoredNode.nodeName(), is("test"));
+ assertThat(restoredNode.userAttributes(), equalTo(Map.of("a1", "v1",
"a2", "v2")));
+ assertThat(restoredNode.storageProfiles(), contains("prof1", "prof2"));
+ }
+
+ @Test
+ void v1CanBeDeserialized() {
+ byte[] bytes =
Base64.getDecoder().decode("Ae++QwHvvkMFdGVzdO/Nq5B4VjQSIUNlhwm63P4DA2ExA3YxA2EyA3YyAwZwcm9mMQZwcm9mMg==");
+ NodeWithAttributes restoredNode =
VersionedSerialization.fromBytes(bytes, serializer);
+
+ assertThat(restoredNode.nodeId(), is(new UUID(0x1234567890ABCDEFL,
0xFEDCBA0987654321L)));
+ assertThat(restoredNode.nodeName(), is("test"));
+ assertThat(restoredNode.userAttributes(), equalTo(Map.of("a1", "v1",
"a2", "v2")));
+ assertThat(restoredNode.storageProfiles(), contains("prof1", "prof2"));
+ }
+}
diff --git
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/NodesAttributesSerializerTest.java
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/NodesAttributesSerializerTest.java
new file mode 100644
index 0000000000..a6effb0a02
--- /dev/null
+++
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/NodesAttributesSerializerTest.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.distributionzones;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.aMapWithSize;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.isA;
+
+import java.util.Base64;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.ignite.internal.versioned.VersionedSerialization;
+import org.junit.jupiter.api.Test;
+
+class NodesAttributesSerializerTest {
+ private static final UUID NODE1_ID = new UUID(0x1234567890ABCDEFL,
0xFEDCBA0987654321L);
+ private static final UUID NODE2_ID = new UUID(0xFEDCBA0987654321L,
0x1234567890ABCDEFL);
+
+ private static final String SERIALIZED_WITH_V1 =
"Ae++QwMB775DAe++QwZ0ZXN0Me/Nq5B4VjQSIUNlhwm63P4CA2ExA3YxAgZwcm9mMQHvvkMB775DBnRlc"
+ + "3QyIUNlhwm63P7vzauQeFY0EgIDYTIDdjICBnByb2Yy";
+
+ private final NodesAttributesSerializer serializer = new
NodesAttributesSerializer();
+
+ @Test
+ void serializationAndDeserialization() {
+ Map<UUID, NodeWithAttributes> originalMap = originalMap();
+
+ byte[] bytes = VersionedSerialization.toBytes(originalMap, serializer);
+ Map<UUID, NodeWithAttributes> restoredMap =
VersionedSerialization.fromBytes(bytes, serializer);
+
+ assertThat(restoredMap, equalTo(originalMap));
+ // Also make sure that IDs are restored correctly as they are ignored
by NodeWithAttributes#equals().
+ for (Map.Entry<UUID, NodeWithAttributes> entry :
originalMap.entrySet()) {
+ assertThat(entry.getValue().nodeId(), equalTo(entry.getKey()));
+ }
+ }
+
+ private static Map<UUID, NodeWithAttributes> originalMap() {
+ NodeWithAttributes node1 = new NodeWithAttributes("test1", NODE1_ID,
Map.of("a1", "v1"), List.of("prof1"));
+ NodeWithAttributes node2 = new NodeWithAttributes("test2", NODE2_ID,
Map.of("a2", "v2"), List.of("prof2"));
+
+ return new ConcurrentHashMap<>(Map.of(
+ node1.nodeId(), node1,
+ node2.nodeId(), node2
+ ));
+ }
+
+ @Test
+ void deserializesAsConcurrentHashMap() {
+ Map<UUID, NodeWithAttributes> originalMap = originalMap();
+
+ byte[] bytes = VersionedSerialization.toBytes(originalMap, serializer);
+ Map<UUID, NodeWithAttributes> restoredMap =
VersionedSerialization.fromBytes(bytes, serializer);
+
+ assertThat(restoredMap, isA(ConcurrentHashMap.class));
+ }
+
+ @Test
+ void v1CanBeDeserialized() {
+ byte[] bytes = Base64.getDecoder().decode(SERIALIZED_WITH_V1);
+ Map<UUID, NodeWithAttributes> restoredMap =
VersionedSerialization.fromBytes(bytes, serializer);
+
+ assertThat(restoredMap, aMapWithSize(2));
+
+ NodeWithAttributes node1 = restoredMap.get(NODE1_ID);
+ assertThat(node1.nodeId(), is(NODE1_ID));
+ assertThat(node1.nodeName(), is("test1"));
+ assertThat(node1.userAttributes(), equalTo(Map.of("a1", "v1")));
+ assertThat(node1.storageProfiles(), contains("prof1"));
+
+ NodeWithAttributes node2 = restoredMap.get(NODE2_ID);
+ assertThat(node2.nodeId(), is(NODE2_ID));
+ assertThat(node2.nodeName(), is("test2"));
+ assertThat(node2.userAttributes(), equalTo(Map.of("a2", "v2")));
+ assertThat(node2.storageProfiles(), contains("prof2"));
+ }
+}
diff --git
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/TopologyAugmentationMapSerializerTest.java
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/TopologyAugmentationMapSerializerTest.java
new file mode 100644
index 0000000000..6b20a6f5f0
--- /dev/null
+++
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/TopologyAugmentationMapSerializerTest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.distributionzones;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.aMapWithSize;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.is;
+
+import java.util.Base64;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentSkipListMap;
+import
org.apache.ignite.internal.distributionzones.DistributionZoneManager.Augmentation;
+import org.apache.ignite.internal.versioned.VersionedSerialization;
+import org.junit.jupiter.api.Test;
+
+class TopologyAugmentationMapSerializerTest {
+ private static final UUID NODE1_ID = new UUID(0x1234567890ABCDEFL,
0xFEDCBA0987654321L);
+ private static final UUID NODE2_ID = new UUID(0xFEDCBA0987654321L,
0x1234567890ABCDEFL);
+
+ private final TopologyAugmentationMapSerializer serializer = new
TopologyAugmentationMapSerializer();
+
+ @Test
+ void serializationAndDeserialization() {
+ Node node1 = new Node("node1", NODE1_ID);
+ Node node2 = new Node("node2", NODE2_ID);
+ ConcurrentSkipListMap<Long, Augmentation> originalMap = new
ConcurrentSkipListMap<>(Map.of(
+ 1000L, new Augmentation(Set.of(node1, node2), true),
+ 2000L, new Augmentation(Set.of(node2, node1), false)
+ ));
+
+ byte[] bytes = VersionedSerialization.toBytes(originalMap, serializer);
+ ConcurrentSkipListMap<Long, Augmentation> restoredMap =
VersionedSerialization.fromBytes(bytes, serializer);
+
+ assertThat(restoredMap, is(aMapWithSize(2)));
+
+ Augmentation augmentation1 = restoredMap.get(1000L);
+ assertThat(augmentation1.nodes(), containsInAnyOrder(node1, node2));
+ assertThat(augmentation1.addition(), is(true));
+
+ Augmentation augmentation2 = restoredMap.get(2000L);
+ assertThat(augmentation2.nodes(), containsInAnyOrder(node1, node2));
+ assertThat(augmentation2.addition(), is(false));
+ }
+
+ @Test
+ void v1CanBeDeserialized() {
+ byte[] bytes =
Base64.getDecoder().decode("Ae++QwPpBwHvvkMDAe++QwZub2RlMe/Nq5B4VjQSIUNlhwm63P4B775DBm5vZGUyIUNlhwm63P7vzauQeFY0"
+ +
"EgHRDwHvvkMDAe++QwZub2RlMiFDZYcJutz+782rkHhWNBIB775DBm5vZGUx782rkHhWNBIhQ2WHCbrc/gA=");
+ ConcurrentSkipListMap<Long, Augmentation> restoredMap =
VersionedSerialization.fromBytes(bytes, serializer);
+
+ Node node1 = new Node("node1", NODE1_ID);
+ Node node2 = new Node("node2", NODE2_ID);
+
+ assertThat(restoredMap, is(aMapWithSize(2)));
+
+ Augmentation augmentation1 = restoredMap.get(1000L);
+ assertThat(augmentation1.nodes(), containsInAnyOrder(node1, node2));
+ assertThat(augmentation1.addition(), is(true));
+
+ Augmentation augmentation2 = restoredMap.get(2000L);
+ assertThat(augmentation2.nodes(), containsInAnyOrder(node1, node2));
+ assertThat(augmentation2.addition(), is(false));
+ }
+}
diff --git
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/causalitydatanodes/DistributionZoneCausalityDataNodesTest.java
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/causalitydatanodes/DistributionZoneCausalityDataNodesTest.java
index b36870ca9f..ce55d8ebf6 100644
---
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/causalitydatanodes/DistributionZoneCausalityDataNodesTest.java
+++
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/causalitydatanodes/DistributionZoneCausalityDataNodesTest.java
@@ -31,6 +31,8 @@ import static
org.apache.ignite.internal.cluster.management.topology.LogicalTopo
import static
org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil.assertDataNodesFromManager;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil.assertValueInStorage;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.DISTRIBUTION_ZONE_DATA_NODES_VALUE_PREFIX;
+import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.deserializeDataNodesMap;
+import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.deserializeLogicalTopologySet;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneDataNodesKey;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesDataNodesPrefix;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesLogicalTopologyKey;
@@ -40,7 +42,6 @@ import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUt
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrowsWithCause;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
-import static org.apache.ignite.internal.util.ByteUtils.fromBytes;
import static org.apache.ignite.internal.util.ByteUtils.toByteArray;
import static
org.apache.ignite.internal.util.CompletableFutures.falseCompletedFuture;
import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
@@ -668,7 +669,7 @@ public class DistributionZoneCausalityDataNodesTest extends
BaseDistributionZone
assertValueInStorage(
metaStorageManager,
zoneDataNodesKey(zoneId),
- (v) ->
DistributionZonesUtil.dataNodes(fromBytes(v)).stream().map(Node::nodeName).collect(toSet()),
+ (v) ->
DistributionZonesUtil.dataNodes(deserializeDataNodesMap(v)).stream().map(Node::nodeName).collect(toSet()),
ONE_NODE_NAME,
TIMEOUT
);
@@ -722,7 +723,7 @@ public class DistributionZoneCausalityDataNodesTest extends
BaseDistributionZone
assertValueInStorage(
metaStorageManager,
zoneDataNodesKey(zoneId),
- (v) ->
DistributionZonesUtil.dataNodes(fromBytes(v)).stream().map(Node::nodeName).collect(toSet()),
+ (v) ->
DistributionZonesUtil.dataNodes(deserializeDataNodesMap(v)).stream().map(Node::nodeName).collect(toSet()),
TWO_NODES_NAMES,
TIMEOUT
);
@@ -1170,7 +1171,7 @@ public class DistributionZoneCausalityDataNodesTest
extends BaseDistributionZone
assertValueInStorage(
metaStorageManager,
zoneDataNodesKey(zoneId),
- (v) ->
DistributionZonesUtil.dataNodes(fromBytes(v)).stream().map(Node::nodeName).collect(toSet()),
+ (v) ->
DistributionZonesUtil.dataNodes(deserializeDataNodesMap(v)).stream().map(Node::nodeName).collect(toSet()),
TWO_NODES_NAMES,
TIMEOUT
);
@@ -1200,7 +1201,7 @@ public class DistributionZoneCausalityDataNodesTest
extends BaseDistributionZone
assertValueInStorage(
metaStorageManager,
zoneDataNodesKey(entry.getKey()),
- (v) ->
DistributionZonesUtil.dataNodes(fromBytes(v)).stream().map(Node::nodeName).collect(toSet()),
+ (v) ->
DistributionZonesUtil.dataNodes(deserializeDataNodesMap(v)).stream().map(Node::nodeName).collect(toSet()),
entry.getValue(),
TIMEOUT
);
@@ -1434,7 +1435,7 @@ public class DistributionZoneCausalityDataNodesTest
extends BaseDistributionZone
if (Arrays.equals(e.key(),
zonesLogicalTopologyVersionKey().bytes())) {
revision = e.revision();
} else if (Arrays.equals(e.key(),
zonesLogicalTopologyKey().bytes())) {
- newLogicalTopology = fromBytes(e.value());
+ newLogicalTopology =
deserializeLogicalTopologySet(e.value());
}
}
@@ -1481,7 +1482,7 @@ public class DistributionZoneCausalityDataNodesTest
extends BaseDistributionZone
byte[] dataNodesBytes = e.value();
if (dataNodesBytes != null) {
- newDataNodes =
DistributionZonesUtil.dataNodes(fromBytes(dataNodesBytes));
+ newDataNodes =
DistributionZonesUtil.dataNodes(deserializeDataNodesMap(dataNodesBytes));
} else {
newDataNodes = emptySet();
}
diff --git
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngineTest.java
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngineTest.java
index ec488923ed..a1653e7927 100644
---
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngineTest.java
+++
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngineTest.java
@@ -35,7 +35,6 @@ import static
org.apache.ignite.internal.partitiondistribution.PartitionDistribu
import static org.apache.ignite.internal.table.TableTestUtils.getTableIdStrict;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
-import static org.apache.ignite.internal.util.ByteUtils.toBytes;
import static org.apache.ignite.internal.util.IgniteUtils.closeAll;
import static org.apache.ignite.sql.ColumnType.STRING;
import static org.hamcrest.MatcherAssert.assertThat;
@@ -68,6 +67,7 @@ import org.apache.ignite.internal.catalog.CatalogManager;
import org.apache.ignite.internal.catalog.commands.ColumnParams;
import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
import
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
+import org.apache.ignite.internal.distributionzones.DataNodesMapSerializer;
import org.apache.ignite.internal.distributionzones.DistributionZoneManager;
import org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil;
import org.apache.ignite.internal.distributionzones.Node;
@@ -536,7 +536,7 @@ public class DistributionZoneRebalanceEngineTest extends
IgniteAbstractTest {
byte[] newLogicalTopology;
if (nodes != null) {
- newLogicalTopology = toBytes(toDataNodesMap(nodes.stream()
+ newLogicalTopology =
DataNodesMapSerializer.serialize(toDataNodesMap(nodes.stream()
.map(n -> new Node(n, findNodeIdByConsistentId(n)))
.collect(toSet())));
} else {
diff --git
a/modules/distribution-zones/src/testFixtures/java/org/apache/ignite/internal/distributionzones/DistributionZonesTestUtil.java
b/modules/distribution-zones/src/testFixtures/java/org/apache/ignite/internal/distributionzones/DistributionZonesTestUtil.java
index 5ae827b891..6aea6e3e59 100644
---
a/modules/distribution-zones/src/testFixtures/java/org/apache/ignite/internal/distributionzones/DistributionZonesTestUtil.java
+++
b/modules/distribution-zones/src/testFixtures/java/org/apache/ignite/internal/distributionzones/DistributionZonesTestUtil.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.distributionzones;
import static java.util.stream.Collectors.toSet;
import static
org.apache.ignite.internal.catalog.CatalogService.DEFAULT_STORAGE_PROFILE;
+import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.deserializeDataNodesMap;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.parseStorageProfiles;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneDataNodesKey;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneScaleDownChangeTriggerKey;
@@ -27,7 +28,6 @@ import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil
import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesLogicalTopologyVersionKey;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
-import static org.apache.ignite.internal.util.ByteUtils.fromBytes;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -215,7 +215,7 @@ public class DistributionZonesTestUtil {
assertValueInStorage(
keyValueStorage,
zoneDataNodesKey(zoneId).bytes(),
- value -> DistributionZonesUtil.dataNodes(fromBytes(value)),
+ value ->
DistributionZonesUtil.dataNodes(deserializeDataNodesMap(value)),
nodes,
2000
);
@@ -237,7 +237,7 @@ public class DistributionZonesTestUtil {
assertValueInStorage(
keyValueStorage,
zoneDataNodesKey(zoneId).bytes(),
- value -> DistributionZonesUtil.dataNodes(fromBytes(value)),
+ value ->
DistributionZonesUtil.dataNodes(deserializeDataNodesMap(value)),
nodes,
2000
);
@@ -305,7 +305,7 @@ public class DistributionZonesTestUtil {
assertValueInStorage(
keyValueStorage,
zonesLogicalTopologyKey().bytes(),
- ByteUtils::fromBytes,
+ DistributionZonesUtil::deserializeLogicalTopologySet,
nodes,
1000
);
@@ -331,7 +331,7 @@ public class DistributionZonesTestUtil {
assertValueInStorage(
metaStorageManager,
zonesLogicalTopologyKey(),
- ByteUtils::fromBytes,
+ DistributionZonesUtil::deserializeLogicalTopologySet,
nodes,
1000
);