This is an automated email from the ASF dual-hosted git repository.

rpuch pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 7ff9534a53 IGNITE-23484 Do not use ByteUtils#toBytes in zones 
management (#4594)
7ff9534a53 is described below

commit 7ff9534a53c17430272788a87605ba55ad83b11e
Author: Roman Puchkovskiy <[email protected]>
AuthorDate: Mon Oct 28 21:35:53 2024 +0400

    IGNITE-23484 Do not use ByteUtils#toBytes in zones management (#4594)
---
 .../ItDistributionZonesFiltersTest.java            |  9 +-
 ...niteDistributionZoneManagerNodeRestartTest.java | 36 +++++---
 .../distributionzones/AugmentationSerializer.java  | 66 +++++++++++++++
 .../distributionzones/DataNodesMapSerializer.java  | 79 ++++++++++++++++++
 .../distributionzones/DistributionZoneManager.java | 57 ++++++++-----
 .../distributionzones/DistributionZonesUtil.java   | 23 +++--
 .../LogicalTopologySetSerializer.java              | 75 +++++++++++++++++
 .../ignite/internal/distributionzones/Node.java    |  5 +-
 .../internal/distributionzones/NodeSerializer.java | 46 ++++++++++
 .../distributionzones/NodeWithAttributes.java      |  5 +-
 .../NodeWithAttributesSerializer.java              | 85 +++++++++++++++++++
 .../NodesAttributesSerializer.java                 | 88 ++++++++++++++++++++
 .../TopologyAugmentationMapSerializer.java         | 76 +++++++++++++++++
 .../CausalityDataNodesEngine.java                  | 13 +--
 .../DataNodesMapSerializerTest.java                | 57 +++++++++++++
 .../LogicalTopologySetSerializerTest.java          | 93 +++++++++++++++++++++
 .../NodeWithAttributesSerializerTest.java          | 63 ++++++++++++++
 .../NodesAttributesSerializerTest.java             | 97 ++++++++++++++++++++++
 .../TopologyAugmentationMapSerializerTest.java     | 82 ++++++++++++++++++
 .../DistributionZoneCausalityDataNodesTest.java    | 15 ++--
 .../DistributionZoneRebalanceEngineTest.java       |  4 +-
 .../DistributionZonesTestUtil.java                 | 10 +--
 22 files changed, 1015 insertions(+), 69 deletions(-)

diff --git 
a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/distributionzones/ItDistributionZonesFiltersTest.java
 
b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/distributionzones/ItDistributionZonesFiltersTest.java
index 149d3529bc..6db9a426b9 100644
--- 
a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/distributionzones/ItDistributionZonesFiltersTest.java
+++ 
b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/distributionzones/ItDistributionZonesFiltersTest.java
@@ -24,16 +24,15 @@ import static 
org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl;
 import static 
org.apache.ignite.internal.catalog.commands.CatalogUtils.DEFAULT_FILTER;
 import static 
org.apache.ignite.internal.catalog.commands.CatalogUtils.IMMEDIATE_TIMER_VALUE;
 import static 
org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil.assertValueInStorage;
+import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.deserializeDataNodesMap;
 import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneDataNodesKey;
 import static 
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.pendingPartAssignmentsKey;
 import static 
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.stablePartAssignmentsKey;
 import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
-import static org.apache.ignite.internal.util.ByteUtils.fromBytes;
 import static org.apache.ignite.internal.util.ByteUtils.toBytes;
 import static 
org.apache.ignite.internal.util.CompletableFutures.falseCompletedFuture;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
-import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
@@ -156,7 +155,7 @@ public class ItDistributionZonesFiltersTest extends 
ClusterPerTestIntegrationTes
         assertValueInStorage(
                 metaStorageManager,
                 zoneDataNodesKey(zoneId),
-                (v) -> ((Map<Node, Integer>) fromBytes(v)).size(),
+                (v) -> deserializeDataNodesMap(v).size(),
                 4,
                 TIMEOUT_MILLIS
         );
@@ -414,7 +413,7 @@ public class ItDistributionZonesFiltersTest extends 
ClusterPerTestIntegrationTes
         assertValueInStorage(
                 metaStorageManager,
                 zoneDataNodesKey(zoneId),
-                (v) -> ((Map<Node, Integer>) fromBytes(v)).size(),
+                (v) -> deserializeDataNodesMap(v).size(),
                 expectedDataNodesSize,
                 TIMEOUT_MILLIS
         );
@@ -431,7 +430,7 @@ public class ItDistributionZonesFiltersTest extends 
ClusterPerTestIntegrationTes
         assertValueInStorage(
                 metaStorageManager,
                 zoneDataNodesKey(zoneId),
-                (v) -> ((Map<Node, Integer>) fromBytes(v)).size(),
+                (v) -> deserializeDataNodesMap(v).size(),
                 expectedDataNodesSize,
                 TIMEOUT_MILLIS
         );
diff --git 
a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/distributionzones/ItIgniteDistributionZoneManagerNodeRestartTest.java
 
b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/distributionzones/ItIgniteDistributionZoneManagerNodeRestartTest.java
index bf2806ed41..c3e9ec3ace 100644
--- 
a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/distributionzones/ItIgniteDistributionZoneManagerNodeRestartTest.java
+++ 
b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/distributionzones/ItIgniteDistributionZoneManagerNodeRestartTest.java
@@ -31,6 +31,7 @@ import static 
org.apache.ignite.internal.distributionzones.DistributionZonesTest
 import static 
org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil.createZone;
 import static 
org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil.getDefaultZone;
 import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.dataNodes;
+import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.deserializeDataNodesMap;
 import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.toDataNodesMap;
 import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneDataNodesKey;
 import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneScaleDownChangeTriggerKeyPrefix;
@@ -43,9 +44,7 @@ import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.testNodeN
 import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
-import static org.apache.ignite.internal.util.ByteUtils.fromBytes;
 import static org.apache.ignite.internal.util.ByteUtils.toByteArray;
-import static org.apache.ignite.internal.util.ByteUtils.toBytes;
 import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
 import static org.apache.ignite.internal.util.IgniteUtils.startsWith;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -127,7 +126,6 @@ import 
org.apache.ignite.internal.network.scalecube.TestScaleCubeClusterServiceF
 import 
org.apache.ignite.internal.security.authentication.validator.AuthenticationProvidersValidatorImpl;
 import org.apache.ignite.internal.testframework.TestIgnitionManager;
 import org.apache.ignite.internal.thread.NamedThreadFactory;
-import org.apache.ignite.internal.util.ByteUtils;
 import org.apache.ignite.internal.vault.VaultManager;
 import org.apache.ignite.internal.worker.fixtures.NoOpCriticalWorkerRegistry;
 import org.apache.ignite.network.NetworkAddress;
@@ -480,7 +478,7 @@ public class ItIgniteDistributionZoneManagerNodeRestartTest 
extends BaseIgniteRe
 
         assertTrue(waitForCondition(() -> 
logicalTopology.equals(finalDistributionZoneManager.logicalTopology()), 
TIMEOUT_MILLIS));
 
-        assertValueInStorage(metastore, zonesLastHandledTopology(), 
ByteUtils::fromBytes, logicalTopology, TIMEOUT_MILLIS);
+        assertValueInStorage(metastore, zonesLastHandledTopology(), 
this::deserializeLogicalTopologySet, logicalTopology, TIMEOUT_MILLIS);
 
         int zoneId = getDefaultZoneId(node);
 
@@ -506,7 +504,7 @@ public class ItIgniteDistributionZoneManagerNodeRestartTest 
extends BaseIgniteRe
 
         assertTrue(waitForCondition(() -> 
newLogicalTopology.equals(finalDistributionZoneManager.logicalTopology()), 
TIMEOUT_MILLIS));
 
-        assertValueInStorage(metastore, zonesLastHandledTopology(), 
ByteUtils::fromBytes, logicalTopology, TIMEOUT_MILLIS);
+        assertValueInStorage(metastore, zonesLastHandledTopology(), 
this::deserializeLogicalTopologySet, logicalTopology, TIMEOUT_MILLIS);
 
         node.stop();
 
@@ -520,7 +518,13 @@ public class 
ItIgniteDistributionZoneManagerNodeRestartTest extends BaseIgniteRe
 
         assertEquals(newLogicalTopology, 
distributionZoneManager.logicalTopology());
 
-        assertValueInStorage(metastore, zonesLastHandledTopology(), 
ByteUtils::fromBytes, newLogicalTopology, TIMEOUT_MILLIS);
+        assertValueInStorage(
+                metastore,
+                zonesLastHandledTopology(),
+                this::deserializeLogicalTopologySet,
+                newLogicalTopology,
+                TIMEOUT_MILLIS
+        );
 
         assertDataNodesFromManager(
                 distributionZoneManager,
@@ -572,7 +576,13 @@ public class 
ItIgniteDistributionZoneManagerNodeRestartTest extends BaseIgniteRe
 
         assertEquals(newLogicalTopology, 
distributionZoneManager.logicalTopology());
 
-        assertValueInStorage(metastore, zonesLastHandledTopology(), 
ByteUtils::fromBytes, newLogicalTopology, TIMEOUT_MILLIS);
+        assertValueInStorage(
+                metastore,
+                zonesLastHandledTopology(),
+                this::deserializeLogicalTopologySet,
+                newLogicalTopology,
+                TIMEOUT_MILLIS
+        );
 
         int zoneId = getDefaultZoneId(node);
 
@@ -609,7 +619,7 @@ public class ItIgniteDistributionZoneManagerNodeRestartTest 
extends BaseIgniteRe
         assertValueInStorage(
                 metastore,
                 zoneDataNodesKey(zoneId),
-                (v) -> 
dataNodes(fromBytes(v)).stream().map(Node::nodeName).collect(toSet()),
+                (v) -> 
dataNodes(deserializeDataNodesMap(v)).stream().map(Node::nodeName).collect(toSet()),
                 Set.of(A.name(), B.name(), C.name()),
                 TIMEOUT_MILLIS
         );
@@ -625,7 +635,7 @@ public class ItIgniteDistributionZoneManagerNodeRestartTest 
extends BaseIgniteRe
             ByteArray dataNodeKeyForZone = new ByteArray(dataNodeKey[0]);
 
             // Here we remove data nodes value for newly created zone, so it 
is tombstone
-            metastore.put(dataNodeKeyForZone, 
toBytes(toDataNodesMap(emptySet()))).get();
+            metastore.put(dataNodeKeyForZone, 
DataNodesMapSerializer.serialize(toDataNodesMap(emptySet()))).get();
 
             metastore.remove(dataNodeKeyForZone).get();
 
@@ -650,6 +660,10 @@ public class 
ItIgniteDistributionZoneManagerNodeRestartTest extends BaseIgniteRe
         assertThat(metastore.get(new 
ByteArray(dataNodeKey[0])).thenApply(Entry::tombstone), willBe(true));
     }
 
+    private Set<NodeWithAttributes> deserializeLogicalTopologySet(byte[] 
bytes) {
+        return DistributionZonesUtil.deserializeLogicalTopologySet(bytes);
+    }
+
     @ParameterizedTest(name = "defaultZone={0}")
     @ValueSource(booleans = {true, false})
     public void testLocalDataNodesAreRestoredAfterRestart(boolean defaultZone) 
throws Exception {
@@ -751,7 +765,7 @@ public class ItIgniteDistributionZoneManagerNodeRestartTest 
extends BaseIgniteRe
         assertValueInStorage(
                 metastore,
                 zoneDataNodesKey(zoneId),
-                (v) -> 
dataNodes(fromBytes(v)).stream().map(Node::nodeName).collect(toSet()),
+                (v) -> 
dataNodes(deserializeDataNodesMap(v)).stream().map(Node::nodeName).collect(toSet()),
                 Set.of(A.name(), B.name(), C.name()),
                 TIMEOUT_MILLIS
         );
@@ -812,7 +826,7 @@ public class ItIgniteDistributionZoneManagerNodeRestartTest 
extends BaseIgniteRe
         assertValueInStorage(
                 metastore,
                 zoneDataNodesKey(zoneId),
-                (v) -> 
dataNodes(fromBytes(v)).stream().map(Node::nodeName).collect(toSet()),
+                (v) -> 
dataNodes(deserializeDataNodesMap(v)).stream().map(Node::nodeName).collect(toSet()),
                 Set.of(A.name(), C.name()),
                 TIMEOUT_MILLIS
         );
diff --git 
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/AugmentationSerializer.java
 
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/AugmentationSerializer.java
new file mode 100644
index 0000000000..057802f2e3
--- /dev/null
+++ 
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/AugmentationSerializer.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.distributionzones;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+import 
org.apache.ignite.internal.distributionzones.DistributionZoneManager.Augmentation;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.internal.util.io.IgniteDataInput;
+import org.apache.ignite.internal.util.io.IgniteDataOutput;
+import org.apache.ignite.internal.versioned.VersionedSerializer;
+
+/**
+ * {@link VersionedSerializer} for {@link Augmentation} instances.
+ */
+public class AugmentationSerializer extends VersionedSerializer<Augmentation> {
+    /** Serializer instance. */
+    public static final AugmentationSerializer INSTANCE = new 
AugmentationSerializer();
+
+    private final NodeSerializer nodeSerializer = NodeSerializer.INSTANCE;
+
+    @Override
+    protected void writeExternalData(Augmentation augmentation, 
IgniteDataOutput out) throws IOException {
+        out.writeVarInt(augmentation.nodes().size());
+        for (Node node : augmentation.nodes()) {
+            nodeSerializer.writeExternal(node, out);
+        }
+
+        out.writeBoolean(augmentation.addition());
+    }
+
+    @Override
+    protected Augmentation readExternalData(byte protoVer, IgniteDataInput in) 
throws IOException {
+        Set<Node> nodes = readNodes(in);
+        boolean addition = in.readBoolean();
+
+        return new Augmentation(nodes, addition);
+    }
+
+    private Set<Node> readNodes(IgniteDataInput in) throws IOException {
+        int length = in.readVarIntAsInt();
+
+        Set<Node> nodes = new HashSet<>(IgniteUtils.capacity(length));
+        for (int i = 0; i < length; i++) {
+            nodes.add(nodeSerializer.readExternal(in));
+        }
+
+        return nodes;
+    }
+}
diff --git 
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DataNodesMapSerializer.java
 
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DataNodesMapSerializer.java
new file mode 100644
index 0000000000..eace2f8b9f
--- /dev/null
+++ 
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DataNodesMapSerializer.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.distributionzones;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.internal.util.io.IgniteDataInput;
+import org.apache.ignite.internal.util.io.IgniteDataOutput;
+import org.apache.ignite.internal.versioned.VersionedSerialization;
+import org.apache.ignite.internal.versioned.VersionedSerializer;
+
+/**
+ * {@link VersionedSerializer} for data nodes' maps.
+ */
+public class DataNodesMapSerializer extends VersionedSerializer<Map<Node, 
Integer>> {
+    /** Serializer instance. */
+    public static final DataNodesMapSerializer INSTANCE = new 
DataNodesMapSerializer();
+
+    private final NodeSerializer nodeSerializer = NodeSerializer.INSTANCE;
+
+    @Override
+    protected void writeExternalData(Map<Node, Integer> map, IgniteDataOutput 
out) throws IOException {
+        out.writeVarInt(map.size());
+        for (Map.Entry<Node, Integer> entry : map.entrySet()) {
+            nodeSerializer.writeExternal(entry.getKey(), out);
+            out.writeVarInt(entry.getValue());
+        }
+    }
+
+    @Override
+    protected Map<Node, Integer> readExternalData(byte protoVer, 
IgniteDataInput in) throws IOException {
+        int size = in.readVarIntAsInt();
+
+        Map<Node, Integer> map = new HashMap<>(IgniteUtils.capacity(size));
+        for (int i = 0; i < size; i++) {
+            Node node = nodeSerializer.readExternal(in);
+            int count = in.readVarIntAsInt();
+
+            map.put(node, count);
+        }
+
+        return map;
+    }
+
+    /**
+     * Serializes a map to bytes.
+     *
+     * @param map Map to serialize.
+     */
+    public static byte[] serialize(Map<Node, Integer> map) {
+        return VersionedSerialization.toBytes(map, INSTANCE);
+    }
+
+    /**
+     * Deserializes a map from bytes.
+     *
+     * @param bytes Bytes.
+     */
+    public static Map<Node, Integer> deserialize(byte[] bytes) {
+        return VersionedSerialization.fromBytes(bytes, INSTANCE);
+    }
+}
diff --git 
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
 
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
index 5911f02400..0f0803d8fa 100644
--- 
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
+++ 
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
@@ -35,6 +35,7 @@ import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil
 import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.conditionForZoneRemoval;
 import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.createZoneManagerExecutor;
 import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.deleteDataNodesAndTriggerKeys;
+import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.deserializeLogicalTopologySet;
 import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.extractChangeTriggerRevision;
 import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.extractDataNodes;
 import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.toDataNodesMap;
@@ -61,9 +62,7 @@ import static 
org.apache.ignite.internal.metastorage.dsl.Operations.ops;
 import static org.apache.ignite.internal.metastorage.dsl.Operations.put;
 import static org.apache.ignite.internal.metastorage.dsl.Statements.iif;
 import static 
org.apache.ignite.internal.util.ByteUtils.bytesToLongKeepingOrder;
-import static org.apache.ignite.internal.util.ByteUtils.fromBytes;
 import static 
org.apache.ignite.internal.util.ByteUtils.longToBytesKeepingOrder;
-import static org.apache.ignite.internal.util.ByteUtils.toBytes;
 import static org.apache.ignite.internal.util.ByteUtils.uuidToBytes;
 import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
 import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
@@ -71,7 +70,6 @@ import static 
org.apache.ignite.internal.util.IgniteUtils.inBusyLockAsync;
 import static 
org.apache.ignite.internal.util.IgniteUtils.shutdownAndAwaitTermination;
 import static org.apache.ignite.lang.ErrorGroups.Common.NODE_STOPPING_ERR;
 
-import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -434,7 +432,7 @@ public class DistributionZoneManager implements 
IgniteComponent {
                 // This case means that there won't any logical topology 
updates before restart.
                 topologyAugmentationMap = new ConcurrentSkipListMap<>();
             } else {
-                topologyAugmentationMap = 
fromBytes(topologyAugmentationMapLocalMetaStorage.value());
+                topologyAugmentationMap = 
TopologyAugmentationMapSerializer.deserialize(topologyAugmentationMapLocalMetaStorage.value());
             }
 
             ZoneState zoneState = new ZoneState(executor, 
topologyAugmentationMap);
@@ -528,7 +526,11 @@ public class DistributionZoneManager implements 
IgniteComponent {
             // Update data nodes for a zone only if the corresponding data 
nodes keys weren't initialised in ms yet.
             CompoundCondition triggerKeyCondition = 
conditionForZoneCreation(zoneId);
 
-            Update dataNodesAndTriggerKeyUpd = 
updateDataNodesAndTriggerKeys(zoneId, revision, 
toBytes(toDataNodesMap(dataNodes)));
+            Update dataNodesAndTriggerKeyUpd = updateDataNodesAndTriggerKeys(
+                    zoneId,
+                    revision,
+                    DataNodesMapSerializer.serialize(toDataNodesMap(dataNodes))
+            );
 
             Iif iif = iif(triggerKeyCondition, dataNodesAndTriggerKeyUpd, 
ops().yield(false));
 
@@ -677,16 +679,17 @@ public class DistributionZoneManager implements 
IgniteComponent {
             // that one value is not null, but other is null.
             assert nodeAttributesEntry.value() != null;
 
-            logicalTopology = fromBytes(lastHandledTopologyEntry.value());
+            logicalTopology = 
deserializeLogicalTopologySet(lastHandledTopologyEntry.value());
 
-            nodesAttributes = fromBytes(nodeAttributesEntry.value());
+            nodesAttributes = 
DistributionZonesUtil.deserializeNodesAttributes(nodeAttributesEntry.value());
         }
 
-        assert lastHandledTopologyEntry.value() == null || 
logicalTopology.equals(fromBytes(lastHandledTopologyEntry.value()))
+        assert lastHandledTopologyEntry.value() == null
+                || 
logicalTopology.equals(deserializeLogicalTopologySet(lastHandledTopologyEntry.value()))
                 : "Initial value of logical topology was changed after 
initialization from the Meta Storage manager.";
 
         assert nodeAttributesEntry.value() == null
-                || 
nodesAttributes.equals(fromBytes(nodeAttributesEntry.value()))
+                || 
nodesAttributes.equals(DistributionZonesUtil.deserializeNodesAttributes(nodeAttributesEntry.value()))
                 : "Initial value of nodes' attributes was changed after 
initialization from the Meta Storage manager.";
     }
 
@@ -723,7 +726,7 @@ public class DistributionZoneManager implements 
IgniteComponent {
                         } else if (Arrays.equals(e.key(), 
zonesLogicalTopologyKey().bytes())) {
                             newLogicalTopologyBytes = e.value();
 
-                            newLogicalTopology = 
fromBytes(newLogicalTopologyBytes);
+                            newLogicalTopology = 
deserializeLogicalTopologySet(newLogicalTopologyBytes);
                         }
                     }
 
@@ -831,18 +834,24 @@ public class DistributionZoneManager implements 
IgniteComponent {
     ) {
         Operation[] puts = new Operation[3 + zoneIds.size()];
 
-        puts[0] = put(zonesNodesAttributes(), toBytes(nodesAttributes()));
+        puts[0] = put(zonesNodesAttributes(), 
NodesAttributesSerializer.serialize(nodesAttributes()));
 
         puts[1] = put(zonesRecoverableStateRevision(), 
longToBytesKeepingOrder(revision));
 
-        puts[2] = put(zonesLastHandledTopology(), toBytes(newLogicalTopology));
+        puts[2] = put(
+                zonesLastHandledTopology(),
+                LogicalTopologySetSerializer.serialize(newLogicalTopology)
+        );
 
         int i = 3;
 
         // TODO: https://issues.apache.org/jira/browse/IGNITE-19491 Properly 
utilise topology augmentation map. Also this map
         // TODO: can be saved only once for all zones.
         for (Integer zoneId : zoneIds) {
-            puts[i++] = put(zoneTopologyAugmentation(zoneId), 
toBytes(zonesState.get(zoneId).topologyAugmentationMap()));
+            puts[i++] = put(
+                    zoneTopologyAugmentation(zoneId),
+                    
TopologyAugmentationMapSerializer.serialize(zonesState.get(zoneId).topologyAugmentationMap())
+            );
         }
 
         Iif iif = iif(
@@ -982,7 +991,11 @@ public class DistributionZoneManager implements 
IgniteComponent {
                 // Remove redundant nodes that are not presented in the data 
nodes.
                 newDataNodes.entrySet().removeIf(e -> e.getValue() == 0);
 
-                Update dataNodesAndTriggerKeyUpd = 
updateDataNodesAndScaleUpTriggerKey(zoneId, revision, toBytes(newDataNodes));
+                Update dataNodesAndTriggerKeyUpd = 
updateDataNodesAndScaleUpTriggerKey(
+                        zoneId,
+                        revision,
+                        DataNodesMapSerializer.serialize(newDataNodes)
+                );
 
                 Iif iif = iif(
                         
triggerScaleUpScaleDownKeysCondition(scaleUpTriggerRevision, 
scaleDownTriggerRevision, zoneId),
@@ -1088,7 +1101,11 @@ public class DistributionZoneManager implements 
IgniteComponent {
                 // Remove redundant nodes that are not presented in the data 
nodes.
                 newDataNodes.entrySet().removeIf(e -> e.getValue() == 0);
 
-                Update dataNodesAndTriggerKeyUpd = 
updateDataNodesAndScaleDownTriggerKey(zoneId, revision, toBytes(newDataNodes));
+                Update dataNodesAndTriggerKeyUpd = 
updateDataNodesAndScaleDownTriggerKey(
+                        zoneId,
+                        revision,
+                        DataNodesMapSerializer.serialize(newDataNodes)
+                );
 
                 Iif iif = iif(
                         
triggerScaleUpScaleDownKeysCondition(scaleUpTriggerRevision, 
scaleDownTriggerRevision, zoneId),
@@ -1365,16 +1382,14 @@ public class DistributionZoneManager implements 
IgniteComponent {
      * Class stores the info about nodes that should be added or removed from 
the data nodes of a zone.
      * With flag {@code addition} we can track whether {@code nodeNames} 
should be added or removed.
      */
-    public static class Augmentation implements Serializable {
-        private static final long serialVersionUID = -7957428671075739621L;
-
-        /** Names of the node. */
+    public static class Augmentation {
+        /** Nodes. */
         private final Set<Node> nodes;
 
         /** Flag that indicates whether {@code nodeNames} should be added or 
removed. */
         private final boolean addition;
 
-        Augmentation(Set<Node> nodes, boolean addition) {
+        public Augmentation(Set<Node> nodes, boolean addition) {
             this.nodes = unmodifiableSet(nodes);
             this.addition = addition;
         }
@@ -1441,7 +1456,7 @@ public class DistributionZoneManager implements 
IgniteComponent {
         Entry topologyEntry = 
metaStorageManager.getLocally(zonesLogicalTopologyKey(), recoveryRevision);
 
         if (topologyEntry.value() != null) {
-            Set<NodeWithAttributes> newLogicalTopology = 
fromBytes(topologyEntry.value());
+            Set<NodeWithAttributes> newLogicalTopology = 
deserializeLogicalTopologySet(topologyEntry.value());
 
             long topologyRevision = topologyEntry.revision();
 
diff --git 
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZonesUtil.java
 
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZonesUtil.java
index 176679ec8c..e8fed7ed5b 100644
--- 
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZonesUtil.java
+++ 
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZonesUtil.java
@@ -32,7 +32,6 @@ import static 
org.apache.ignite.internal.metastorage.dsl.Operations.ops;
 import static org.apache.ignite.internal.metastorage.dsl.Operations.put;
 import static org.apache.ignite.internal.metastorage.dsl.Operations.remove;
 import static 
org.apache.ignite.internal.util.ByteUtils.bytesToLongKeepingOrder;
-import static org.apache.ignite.internal.util.ByteUtils.fromBytes;
 import static 
org.apache.ignite.internal.util.ByteUtils.longToBytesKeepingOrder;
 import static org.apache.ignite.internal.util.ByteUtils.uuidToBytes;
 
@@ -63,7 +62,6 @@ import 
org.apache.ignite.internal.metastorage.dsl.SimpleCondition;
 import org.apache.ignite.internal.metastorage.dsl.Update;
 import org.apache.ignite.internal.thread.NamedThreadFactory;
 import org.apache.ignite.internal.thread.StripedScheduledThreadPoolExecutor;
-import org.apache.ignite.internal.util.ByteUtils;
 import org.jetbrains.annotations.Nullable;
 import org.jetbrains.annotations.TestOnly;
 
@@ -428,7 +426,10 @@ public class DistributionZonesUtil {
         List<Operation> operations = new ArrayList<>();
 
         operations.add(put(zonesLogicalTopologyVersionKey(), 
longToBytesKeepingOrder(logicalTopology.version())));
-        operations.add(put(zonesLogicalTopologyKey(), 
ByteUtils.toBytes(topologyFromCmg)));
+        operations.add(put(
+                zonesLogicalTopologyKey(),
+                LogicalTopologySetSerializer.serialize(topologyFromCmg)
+        ));
         if (updateClusterId) {
             operations.add(put(zonesLogicalTopologyClusterIdKey(), 
uuidToBytes(logicalTopology.clusterId())));
         }
@@ -466,7 +467,19 @@ public class DistributionZonesUtil {
 
     @Nullable
     public static Set<Node> parseDataNodes(byte[] dataNodesBytes) {
-        return dataNodesBytes == null ? null : 
dataNodes(fromBytes(dataNodesBytes));
+        return dataNodesBytes == null ? null : 
dataNodes(deserializeDataNodesMap(dataNodesBytes));
+    }
+
+    public static Map<Node, Integer> deserializeDataNodesMap(byte[] bytes) {
+        return DataNodesMapSerializer.deserialize(bytes);
+    }
+
+    public static Set<NodeWithAttributes> deserializeLogicalTopologySet(byte[] 
bytes) {
+        return LogicalTopologySetSerializer.deserialize(bytes);
+    }
+
+    public static Map<UUID, NodeWithAttributes> 
deserializeNodesAttributes(byte[] bytes) {
+        return NodesAttributesSerializer.deserialize(bytes);
     }
 
     /**
@@ -477,7 +490,7 @@ public class DistributionZonesUtil {
      */
     static Map<Node, Integer> extractDataNodes(Entry dataNodesEntry) {
         if (!dataNodesEntry.empty()) {
-            return fromBytes(dataNodesEntry.value());
+            return deserializeDataNodesMap(dataNodesEntry.value());
         } else {
             return emptyMap();
         }
diff --git 
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/LogicalTopologySetSerializer.java
 
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/LogicalTopologySetSerializer.java
new file mode 100644
index 0000000000..93112962bf
--- /dev/null
+++ 
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/LogicalTopologySetSerializer.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.distributionzones;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.internal.util.io.IgniteDataInput;
+import org.apache.ignite.internal.util.io.IgniteDataOutput;
+import org.apache.ignite.internal.versioned.VersionedSerialization;
+import org.apache.ignite.internal.versioned.VersionedSerializer;
+
+/**
+ * {@link VersionedSerializer} for logical topology sets (represented with 
{@code Set<NodeWithAttributes} instances).
+ */
+public class LogicalTopologySetSerializer extends 
VersionedSerializer<Set<NodeWithAttributes>> {
+    /** Serializer instance. */
+    public static final LogicalTopologySetSerializer INSTANCE = new 
LogicalTopologySetSerializer();
+
+    private final NodeWithAttributesSerializer nodeWithAttributesSerializer = 
NodeWithAttributesSerializer.INSTANCE;
+
+    @Override
+    protected void writeExternalData(Set<NodeWithAttributes> topology, 
IgniteDataOutput out) throws IOException {
+        out.writeVarInt(topology.size());
+        for (NodeWithAttributes node : topology) {
+            nodeWithAttributesSerializer.writeExternal(node, out);
+        }
+    }
+
+    @Override
+    protected Set<NodeWithAttributes> readExternalData(byte protoVer, 
IgniteDataInput in) throws IOException {
+        int length = in.readVarIntAsInt();
+
+        Set<NodeWithAttributes> topology = new 
HashSet<>(IgniteUtils.capacity(length));
+        for (int i = 0; i < length; i++) {
+            topology.add(nodeWithAttributesSerializer.readExternal(in));
+        }
+
+        return topology;
+    }
+
+    /**
+     * Serializes a set to bytes.
+     *
+     * @param set Set to serialize.
+     */
+    public static byte[] serialize(Set<NodeWithAttributes> set) {
+        return VersionedSerialization.toBytes(set, INSTANCE);
+    }
+
+    /**
+     * Deserializes a set from bytes.
+     *
+     * @param bytes Bytes.
+     */
+    public static Set<NodeWithAttributes> deserialize(byte[] bytes) {
+        return VersionedSerialization.fromBytes(bytes, INSTANCE);
+    }
+}
diff --git 
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/Node.java
 
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/Node.java
index a05322c496..e9d69ee320 100644
--- 
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/Node.java
+++ 
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/Node.java
@@ -17,7 +17,6 @@
 
 package org.apache.ignite.internal.distributionzones;
 
-import java.io.Serializable;
 import java.util.UUID;
 
 /**
@@ -26,9 +25,7 @@ import java.util.UUID;
  * {@code nodeId} that is unique identifier of a node, that changes after a 
restart.
  * {@code nodeId} is needed to get node's attributes from the local state of 
the distribution zone manager.
  */
-public class Node implements Serializable {
-    private static final long serialVersionUID = 875461392587175703L;
-
+public class Node {
     private final String nodeName;
 
     private final UUID nodeId;
diff --git 
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/NodeSerializer.java
 
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/NodeSerializer.java
new file mode 100644
index 0000000000..772d055a46
--- /dev/null
+++ 
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/NodeSerializer.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.distributionzones;
+
+import java.io.IOException;
+import java.util.UUID;
+import org.apache.ignite.internal.util.io.IgniteDataInput;
+import org.apache.ignite.internal.util.io.IgniteDataOutput;
+import org.apache.ignite.internal.versioned.VersionedSerializer;
+
+/**
+ * {@link VersionedSerializer} for {@link Node} instances.
+ */
+public class NodeSerializer extends VersionedSerializer<Node> {
+    /** Serializer instance. */
+    public static final NodeSerializer INSTANCE = new NodeSerializer();
+
+    @Override
+    protected void writeExternalData(Node node, IgniteDataOutput out) throws 
IOException {
+        out.writeUTF(node.nodeName());
+        out.writeUuid(node.nodeId());
+    }
+
+    @Override
+    protected Node readExternalData(byte protoVer, IgniteDataInput in) throws 
IOException {
+        String nodeName = in.readUTF();
+        UUID nodeId = in.readUuid();
+
+        return new Node(nodeName, nodeId);
+    }
+}
diff --git 
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/NodeWithAttributes.java
 
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/NodeWithAttributes.java
index 1758ebc99c..23e616f553 100644
--- 
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/NodeWithAttributes.java
+++ 
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/NodeWithAttributes.java
@@ -17,7 +17,6 @@
 
 package org.apache.ignite.internal.distributionzones;
 
-import java.io.Serializable;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
@@ -28,9 +27,7 @@ import org.jetbrains.annotations.Nullable;
  * Structure that represents node with the attributes and which we store in 
Meta Storage when we store logical topology.
  * Light-weighted version of the {@link LogicalNode}.
  */
-public class NodeWithAttributes implements Serializable {
-    private static final long serialVersionUID = -7778967985161743937L;
-
+public class NodeWithAttributes {
     private final Node node;
 
     private final Map<String, String> userAttributes;
diff --git 
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/NodeWithAttributesSerializer.java
 
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/NodeWithAttributesSerializer.java
new file mode 100644
index 0000000000..e049b7389b
--- /dev/null
+++ 
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/NodeWithAttributesSerializer.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.distributionzones;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.internal.util.io.IgniteDataInput;
+import org.apache.ignite.internal.util.io.IgniteDataOutput;
+import org.apache.ignite.internal.versioned.VersionedSerializer;
+
+/**
+ * {@link VersionedSerializer} for {@link NodeWithAttributes} instances.
+ */
+public class NodeWithAttributesSerializer extends 
VersionedSerializer<NodeWithAttributes> {
+    /** Serializer instance. */
+    public static final NodeWithAttributesSerializer INSTANCE = new 
NodeWithAttributesSerializer();
+
+    private final NodeSerializer nodeSerializer = NodeSerializer.INSTANCE;
+
+    @Override
+    protected void writeExternalData(NodeWithAttributes node, IgniteDataOutput 
out) throws IOException {
+        nodeSerializer.writeExternal(node.node(), out);
+
+        out.writeVarInt(node.userAttributes().size());
+        for (Map.Entry<String, String> attrEntry : 
node.userAttributes().entrySet()) {
+            out.writeUTF(attrEntry.getKey());
+            out.writeUTF(attrEntry.getValue());
+        }
+
+        out.writeVarInt(node.storageProfiles().size());
+        for (String profile : node.storageProfiles()) {
+            out.writeUTF(profile);
+        }
+    }
+
+    @Override
+    protected NodeWithAttributes readExternalData(byte protoVer, 
IgniteDataInput in) throws IOException {
+        Node node = nodeSerializer.readExternal(in);
+        Map<String, String> userAttributes = readUserAttributes(in);
+        List<String> storageProfiles = readStorageProfiles(in);
+
+        return new NodeWithAttributes(node.nodeName(), node.nodeId(), 
userAttributes, storageProfiles);
+    }
+
+    private static Map<String, String> readUserAttributes(IgniteDataInput in) 
throws IOException {
+        int length = in.readVarIntAsInt();
+
+        Map<String, String> attrs = new 
HashMap<>(IgniteUtils.capacity(length));
+        for (int i = 0; i < length; i++) {
+            attrs.put(in.readUTF(), in.readUTF());
+        }
+
+        return attrs;
+    }
+
+    private static List<String> readStorageProfiles(IgniteDataInput in) throws 
IOException {
+        int length = in.readVarIntAsInt();
+
+        List<String> storageProfiles = new ArrayList<>(length);
+        for (int i = 0; i < length; i++) {
+            storageProfiles.add(in.readUTF());
+        }
+
+        return storageProfiles;
+    }
+}
diff --git 
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/NodesAttributesSerializer.java
 
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/NodesAttributesSerializer.java
new file mode 100644
index 0000000000..f3984daf34
--- /dev/null
+++ 
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/NodesAttributesSerializer.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.distributionzones;
+
+import static java.util.function.Function.identity;
+import static java.util.stream.Collectors.toMap;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.ignite.internal.util.io.IgniteDataInput;
+import org.apache.ignite.internal.util.io.IgniteDataOutput;
+import org.apache.ignite.internal.versioned.VersionedSerialization;
+import org.apache.ignite.internal.versioned.VersionedSerializer;
+
+/**
+ * {@link VersionedSerializer} for nodes attributes (represented with {@code 
Map<UUID, NodeWithAttributes>}).
+ *
+ * <p>The map is deserialized as a {@link ConcurrentHashMap}.
+ */
+public class NodesAttributesSerializer extends VersionedSerializer<Map<UUID, 
NodeWithAttributes>> {
+    /** Serializer instance. */
+    public static final NodesAttributesSerializer INSTANCE = new 
NodesAttributesSerializer();
+
+    private final NodeWithAttributesSerializer nodeWithAttributesSerializer = 
NodeWithAttributesSerializer.INSTANCE;
+
+    @Override
+    protected void writeExternalData(Map<UUID, NodeWithAttributes> map, 
IgniteDataOutput out) throws IOException {
+        out.writeVarInt(map.size());
+        for (Map.Entry<UUID, NodeWithAttributes> entry : map.entrySet()) {
+            NodeWithAttributes value = entry.getValue();
+
+            assert entry.getKey().equals(value.nodeId()) : map;
+
+            nodeWithAttributesSerializer.writeExternal(value, out);
+        }
+    }
+
+    @Override
+    protected Map<UUID, NodeWithAttributes> readExternalData(byte protoVer, 
IgniteDataInput in) throws IOException {
+        int length = in.readVarIntAsInt();
+
+        List<NodeWithAttributes> nodes = new ArrayList<>(length);
+        for (int i = 0; i < length; i++) {
+            nodes.add(nodeWithAttributesSerializer.readExternal(in));
+        }
+
+        return nodes.stream().collect(
+                toMap(NodeWithAttributes::nodeId, identity(), (x, y) -> y, 
ConcurrentHashMap::new)
+        );
+    }
+
+    /**
+     * Serializes a map to bytes.
+     *
+     * @param map Map to serialize.
+     */
+    public static byte[] serialize(Map<UUID, NodeWithAttributes> map) {
+        return VersionedSerialization.toBytes(map, INSTANCE);
+    }
+
+    /**
+     * Deserializes a map from bytes.
+     *
+     * @param bytes Bytes.
+     */
+    public static Map<UUID, NodeWithAttributes> deserialize(byte[] bytes) {
+        return VersionedSerialization.fromBytes(bytes, INSTANCE);
+    }
+}
diff --git 
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/TopologyAugmentationMapSerializer.java
 
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/TopologyAugmentationMapSerializer.java
new file mode 100644
index 0000000000..c403137227
--- /dev/null
+++ 
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/TopologyAugmentationMapSerializer.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.distributionzones;
+
+import java.io.IOException;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentSkipListMap;
+import 
org.apache.ignite.internal.distributionzones.DistributionZoneManager.Augmentation;
+import org.apache.ignite.internal.util.io.IgniteDataInput;
+import org.apache.ignite.internal.util.io.IgniteDataOutput;
+import org.apache.ignite.internal.versioned.VersionedSerialization;
+import org.apache.ignite.internal.versioned.VersionedSerializer;
+
+/**
+ * {@link VersionedSerializer} for topology augmentation maps represented with 
{@code ConcurrentSkipListMap<Long, Augmentation>}.
+ */
+public class TopologyAugmentationMapSerializer extends 
VersionedSerializer<ConcurrentSkipListMap<Long, Augmentation>> {
+    /** Serializer instance. */
+    public static final TopologyAugmentationMapSerializer INSTANCE = new 
TopologyAugmentationMapSerializer();
+
+    private final AugmentationSerializer augmentationSerializer = 
AugmentationSerializer.INSTANCE;
+
+    @Override
+    protected void writeExternalData(ConcurrentSkipListMap<Long, Augmentation> 
map, IgniteDataOutput out) throws IOException {
+        out.writeVarInt(map.size());
+        for (Entry<Long, Augmentation> entry : map.entrySet()) {
+            out.writeVarInt(entry.getKey());
+            augmentationSerializer.writeExternal(entry.getValue(), out);
+        }
+    }
+
+    @Override
+    protected ConcurrentSkipListMap<Long, Augmentation> readExternalData(byte 
protoVer, IgniteDataInput in) throws IOException {
+        int length = in.readVarIntAsInt();
+
+        ConcurrentSkipListMap<Long, Augmentation> map = new 
ConcurrentSkipListMap<>();
+        for (int i = 0; i < length; i++) {
+            map.put(in.readVarInt(), augmentationSerializer.readExternal(in));
+        }
+
+        return map;
+    }
+
+    /**
+     * Serializes a map to bytes.
+     *
+     * @param map Map to serialize.
+     */
+    public static byte[] serialize(ConcurrentSkipListMap<Long, Augmentation> 
map) {
+        return VersionedSerialization.toBytes(map, INSTANCE);
+    }
+
+    /**
+     * Deserializes a map from bytes.
+     *
+     * @param bytes Bytes.
+     */
+    public static ConcurrentSkipListMap<Long, Augmentation> deserialize(byte[] 
bytes) {
+        return VersionedSerialization.fromBytes(bytes, INSTANCE);
+    }
+}
diff --git 
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/causalitydatanodes/CausalityDataNodesEngine.java
 
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/causalitydatanodes/CausalityDataNodesEngine.java
index 838224f922..5fb23cbb0b 100644
--- 
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/causalitydatanodes/CausalityDataNodesEngine.java
+++ 
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/causalitydatanodes/CausalityDataNodesEngine.java
@@ -21,13 +21,13 @@ import static java.lang.Math.max;
 import static java.util.Collections.emptySet;
 import static java.util.stream.Collectors.toSet;
 import static 
org.apache.ignite.internal.catalog.commands.CatalogUtils.IMMEDIATE_TIMER_VALUE;
+import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.deserializeLogicalTopologySet;
 import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.filterDataNodes;
 import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneDataNodesKey;
 import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneScaleDownChangeTriggerKey;
 import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneScaleUpChangeTriggerKey;
 import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesLogicalTopologyKey;
 import static 
org.apache.ignite.internal.util.ByteUtils.bytesToLongKeepingOrder;
-import static org.apache.ignite.internal.util.ByteUtils.fromBytes;
 import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
 import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
 
@@ -46,6 +46,7 @@ import 
org.apache.ignite.internal.catalog.events.CatalogEventParameters;
 import org.apache.ignite.internal.causality.IncrementalVersionedValue;
 import org.apache.ignite.internal.causality.OutdatedTokenException;
 import org.apache.ignite.internal.causality.VersionedValue;
+import org.apache.ignite.internal.distributionzones.DataNodesMapSerializer;
 import org.apache.ignite.internal.distributionzones.DistributionZoneManager;
 import 
org.apache.ignite.internal.distributionzones.DistributionZoneManager.Augmentation;
 import 
org.apache.ignite.internal.distributionzones.DistributionZoneManager.ZoneState;
@@ -188,7 +189,7 @@ public class CausalityDataNodesEngine {
                     return emptySet();
                 }
 
-                Set<NodeWithAttributes> logicalTopology = 
fromBytes(topologyEntry.value());
+                Set<NodeWithAttributes> logicalTopology = 
deserializeLogicalTopologySet(topologyEntry.value());
 
                 Set<Node> logicalTopologyNodes = 
logicalTopology.stream().map(n -> n.node()).collect(toSet());
 
@@ -227,7 +228,9 @@ public class CausalityDataNodesEngine {
                 return emptySet();
             }
 
-            Set<Node> baseDataNodes = 
DistributionZonesUtil.dataNodes(fromBytes(dataNodesEntry.value()));
+            Set<Node> baseDataNodes = DistributionZonesUtil.dataNodes(
+                    DataNodesMapSerializer.deserialize(dataNodesEntry.value())
+            );
             long scaleUpTriggerRevision = 
bytesToLongKeepingOrder(scaleUpChangeTriggerKey.value());
             long scaleDownTriggerRevision = 
bytesToLongKeepingOrder(scaleDownChangeTriggerKey.value());
 
@@ -410,7 +413,7 @@ public class CausalityDataNodesEngine {
         if (!topologyEntry.empty()) {
             byte[] newerLogicalTopologyBytes = topologyEntry.value();
 
-            newerLogicalTopology = fromBytes(newerLogicalTopologyBytes);
+            newerLogicalTopology = 
deserializeLogicalTopologySet(newerLogicalTopologyBytes);
 
             newerTopologyRevision = topologyEntry.revision();
 
@@ -426,7 +429,7 @@ public class CausalityDataNodesEngine {
                 } else {
                     byte[] olderLogicalTopologyBytes = topologyEntry.value();
 
-                    olderLogicalTopology = 
fromBytes(olderLogicalTopologyBytes);
+                    olderLogicalTopology = 
deserializeLogicalTopologySet(olderLogicalTopologyBytes);
                 }
 
                 CatalogZoneDescriptor zoneDescriptor = 
catalogManager.catalog(catalogVersion).zone(zoneId);
diff --git 
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DataNodesMapSerializerTest.java
 
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DataNodesMapSerializerTest.java
new file mode 100644
index 0000000000..5edd1c1754
--- /dev/null
+++ 
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DataNodesMapSerializerTest.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.distributionzones;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.aMapWithSize;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasEntry;
+import static org.hamcrest.Matchers.is;
+
+import java.util.Base64;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.ignite.internal.versioned.VersionedSerialization;
+import org.junit.jupiter.api.Test;
+
+class DataNodesMapSerializerTest {
+    private final DataNodesMapSerializer serializer = new 
DataNodesMapSerializer();
+
+    @Test
+    void serializationAndDeserialization() {
+        Map<Node, Integer> originalMap = Map.of(
+                new Node("a", new UUID(0x1234567890ABCDEFL, 
0xFEDCBA0987654321L)), 1000,
+                new Node("b", new UUID(0xFEDCBA0987654321L, 
0x1234567890ABCDEFL)), 2000
+        );
+
+        byte[] bytes = VersionedSerialization.toBytes(originalMap, serializer);
+        Map<Node, Integer> restoredMap = 
VersionedSerialization.fromBytes(bytes, serializer);
+
+        assertThat(restoredMap, equalTo(originalMap));
+    }
+
+    @Test
+    void v1CanBeDeserialized() {
+        byte[] bytes = 
Base64.getDecoder().decode("Ae++QwMB775DAmHvzauQeFY0EiFDZYcJutz+6QcB775DAmIhQ2WHCbrc/u/Nq5B4VjQS0Q8=");
+        Map<Node, Integer> restoredMap = 
VersionedSerialization.fromBytes(bytes, serializer);
+
+        assertThat(restoredMap, is(aMapWithSize(2)));
+        assertThat(restoredMap, hasEntry(new Node("a", new 
UUID(0x1234567890ABCDEFL, 0xFEDCBA0987654321L)), 1000));
+        assertThat(restoredMap, hasEntry(new Node("b", new 
UUID(0xFEDCBA0987654321L, 0x1234567890ABCDEFL)), 2000));
+    }
+}
diff --git 
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/LogicalTopologySetSerializerTest.java
 
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/LogicalTopologySetSerializerTest.java
new file mode 100644
index 0000000000..469dff5ef5
--- /dev/null
+++ 
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/LogicalTopologySetSerializerTest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.distributionzones;
+
+import static java.util.stream.Collectors.toSet;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.is;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import java.util.Base64;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import org.apache.ignite.internal.versioned.VersionedSerialization;
+import org.junit.jupiter.api.Test;
+
+class LogicalTopologySetSerializerTest {
+    private static final UUID NODE1_ID = new UUID(0x1234567890ABCDEFL, 
0xFEDCBA0987654321L);
+    private static final UUID NODE2_ID = new UUID(0xFEDCBA0987654321L, 
0x1234567890ABCDEFL);
+
+    private final LogicalTopologySetSerializer serializer = new 
LogicalTopologySetSerializer();
+
+    @Test
+    void serializationAndDeserialization() {
+        Set<NodeWithAttributes> originalNodes = Set.of(
+                new NodeWithAttributes(
+                    "node1",
+                    new UUID(0x1234567890ABCDEFL, 0xFEDCBA0987654321L),
+                    Map.of("a1", "v1"),
+                    List.of("prof1")
+                ),
+                new NodeWithAttributes(
+                    "node2",
+                    new UUID(0xFEDCBA0987654321L, 0x1234567890ABCDEFL),
+                    Map.of("a2", "v2"),
+                    List.of("prof2")
+                )
+        );
+
+        byte[] bytes = VersionedSerialization.toBytes(originalNodes, 
serializer);
+        Set<NodeWithAttributes> restoredNodes = 
VersionedSerialization.fromBytes(bytes, serializer);
+
+        assertThat(restoredNodes, equalTo(originalNodes));
+        assertEquals(
+                
originalNodes.stream().map(NodeWithAttributes::nodeId).collect(toSet()),
+                
restoredNodes.stream().map(NodeWithAttributes::nodeId).collect(toSet())
+        );
+    }
+
+    @Test
+    void v1CanBeSerialized() {
+        byte[] bytes = 
Base64.getDecoder().decode("Ae++QwMB775DAe++QwZub2RlMe/Nq5B4VjQSIUNlhwm63P4CA2ExA3YxAgZwcm9mMQHvvkMB775DBm5vZGUy"
+                + "IUNlhwm63P7vzauQeFY0EgIDYTIDdjICBnByb2Yy");
+        Set<NodeWithAttributes> restoredNodes = 
VersionedSerialization.fromBytes(bytes, serializer);
+
+        assertThat(restoredNodes, hasSize(2));
+
+        NodeWithAttributes node1 = restoredNodes.stream()
+                .filter(n -> n.nodeId().equals(NODE1_ID))
+                .findAny().orElseThrow();
+        assertThat(node1.nodeId(), is(NODE1_ID));
+        assertThat(node1.nodeName(), is("node1"));
+        assertThat(node1.userAttributes(), equalTo(Map.of("a1", "v1")));
+        assertThat(node1.storageProfiles(), contains("prof1"));
+
+        NodeWithAttributes node2 = restoredNodes.stream()
+                .filter(n -> n.nodeId().equals(NODE2_ID))
+                .findAny().orElseThrow();
+        assertThat(node2.nodeId(), is(NODE2_ID));
+        assertThat(node2.nodeName(), is("node2"));
+        assertThat(node2.userAttributes(), equalTo(Map.of("a2", "v2")));
+        assertThat(node2.storageProfiles(), contains("prof2"));
+    }
+}
diff --git 
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/NodeWithAttributesSerializerTest.java
 
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/NodeWithAttributesSerializerTest.java
new file mode 100644
index 0000000000..938550b313
--- /dev/null
+++ 
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/NodeWithAttributesSerializerTest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.distributionzones;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+
+import java.util.Base64;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.ignite.internal.versioned.VersionedSerialization;
+import org.junit.jupiter.api.Test;
+
+class NodeWithAttributesSerializerTest {
+    private final NodeWithAttributesSerializer serializer = new 
NodeWithAttributesSerializer();
+
+    @Test
+    void serializationAndDeserialization() {
+        NodeWithAttributes originalNode = new NodeWithAttributes(
+                "test",
+                new UUID(0x1234567890ABCDEFL, 0xFEDCBA0987654321L),
+                Map.of("a1", "v1", "a2", "v2"),
+                List.of("prof1", "prof2")
+        );
+
+        byte[] bytes = VersionedSerialization.toBytes(originalNode, 
serializer);
+        NodeWithAttributes restoredNode = 
VersionedSerialization.fromBytes(bytes, serializer);
+
+        assertThat(restoredNode.nodeId(), is(new UUID(0x1234567890ABCDEFL, 
0xFEDCBA0987654321L)));
+        assertThat(restoredNode.nodeName(), is("test"));
+        assertThat(restoredNode.userAttributes(), equalTo(Map.of("a1", "v1", 
"a2", "v2")));
+        assertThat(restoredNode.storageProfiles(), contains("prof1", "prof2"));
+    }
+
+    @Test
+    void v1CanBeDeserialized() {
+        byte[] bytes = 
Base64.getDecoder().decode("Ae++QwHvvkMFdGVzdO/Nq5B4VjQSIUNlhwm63P4DA2ExA3YxA2EyA3YyAwZwcm9mMQZwcm9mMg==");
+        NodeWithAttributes restoredNode = 
VersionedSerialization.fromBytes(bytes, serializer);
+
+        assertThat(restoredNode.nodeId(), is(new UUID(0x1234567890ABCDEFL, 
0xFEDCBA0987654321L)));
+        assertThat(restoredNode.nodeName(), is("test"));
+        assertThat(restoredNode.userAttributes(), equalTo(Map.of("a1", "v1", 
"a2", "v2")));
+        assertThat(restoredNode.storageProfiles(), contains("prof1", "prof2"));
+    }
+}
diff --git 
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/NodesAttributesSerializerTest.java
 
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/NodesAttributesSerializerTest.java
new file mode 100644
index 0000000000..a6effb0a02
--- /dev/null
+++ 
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/NodesAttributesSerializerTest.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.distributionzones;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.aMapWithSize;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.isA;
+
+import java.util.Base64;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.ignite.internal.versioned.VersionedSerialization;
+import org.junit.jupiter.api.Test;
+
+class NodesAttributesSerializerTest {
+    private static final UUID NODE1_ID = new UUID(0x1234567890ABCDEFL, 
0xFEDCBA0987654321L);
+    private static final UUID NODE2_ID = new UUID(0xFEDCBA0987654321L, 
0x1234567890ABCDEFL);
+
+    private static final String SERIALIZED_WITH_V1 = 
"Ae++QwMB775DAe++QwZ0ZXN0Me/Nq5B4VjQSIUNlhwm63P4CA2ExA3YxAgZwcm9mMQHvvkMB775DBnRlc"
+            + "3QyIUNlhwm63P7vzauQeFY0EgIDYTIDdjICBnByb2Yy";
+
+    private final NodesAttributesSerializer serializer = new 
NodesAttributesSerializer();
+
+    @Test
+    void serializationAndDeserialization() {
+        Map<UUID, NodeWithAttributes> originalMap = originalMap();
+
+        byte[] bytes = VersionedSerialization.toBytes(originalMap, serializer);
+        Map<UUID, NodeWithAttributes> restoredMap = 
VersionedSerialization.fromBytes(bytes, serializer);
+
+        assertThat(restoredMap, equalTo(originalMap));
+        // Also make sure that IDs are restored correctly as they are ignored 
by NodeWithAttributes#equals().
+        for (Map.Entry<UUID, NodeWithAttributes> entry : 
originalMap.entrySet()) {
+            assertThat(entry.getValue().nodeId(), equalTo(entry.getKey()));
+        }
+    }
+
+    private static Map<UUID, NodeWithAttributes> originalMap() {
+        NodeWithAttributes node1 = new NodeWithAttributes("test1", NODE1_ID, 
Map.of("a1", "v1"), List.of("prof1"));
+        NodeWithAttributes node2 = new NodeWithAttributes("test2", NODE2_ID, 
Map.of("a2", "v2"), List.of("prof2"));
+
+        return new ConcurrentHashMap<>(Map.of(
+                node1.nodeId(), node1,
+                node2.nodeId(), node2
+        ));
+    }
+
+    @Test
+    void deserializesAsConcurrentHashMap() {
+        Map<UUID, NodeWithAttributes> originalMap = originalMap();
+
+        byte[] bytes = VersionedSerialization.toBytes(originalMap, serializer);
+        Map<UUID, NodeWithAttributes> restoredMap = 
VersionedSerialization.fromBytes(bytes, serializer);
+
+        assertThat(restoredMap, isA(ConcurrentHashMap.class));
+    }
+
+    @Test
+    void v1CanBeDeserialized() {
+        byte[] bytes = Base64.getDecoder().decode(SERIALIZED_WITH_V1);
+        Map<UUID, NodeWithAttributes> restoredMap = 
VersionedSerialization.fromBytes(bytes, serializer);
+
+        assertThat(restoredMap, aMapWithSize(2));
+
+        NodeWithAttributes node1 = restoredMap.get(NODE1_ID);
+        assertThat(node1.nodeId(), is(NODE1_ID));
+        assertThat(node1.nodeName(), is("test1"));
+        assertThat(node1.userAttributes(), equalTo(Map.of("a1", "v1")));
+        assertThat(node1.storageProfiles(), contains("prof1"));
+
+        NodeWithAttributes node2 = restoredMap.get(NODE2_ID);
+        assertThat(node2.nodeId(), is(NODE2_ID));
+        assertThat(node2.nodeName(), is("test2"));
+        assertThat(node2.userAttributes(), equalTo(Map.of("a2", "v2")));
+        assertThat(node2.storageProfiles(), contains("prof2"));
+    }
+}
diff --git 
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/TopologyAugmentationMapSerializerTest.java
 
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/TopologyAugmentationMapSerializerTest.java
new file mode 100644
index 0000000000..6b20a6f5f0
--- /dev/null
+++ 
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/TopologyAugmentationMapSerializerTest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.distributionzones;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.aMapWithSize;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.is;
+
+import java.util.Base64;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentSkipListMap;
+import 
org.apache.ignite.internal.distributionzones.DistributionZoneManager.Augmentation;
+import org.apache.ignite.internal.versioned.VersionedSerialization;
+import org.junit.jupiter.api.Test;
+
+class TopologyAugmentationMapSerializerTest {
+    private static final UUID NODE1_ID = new UUID(0x1234567890ABCDEFL, 
0xFEDCBA0987654321L);
+    private static final UUID NODE2_ID = new UUID(0xFEDCBA0987654321L, 
0x1234567890ABCDEFL);
+
+    private final TopologyAugmentationMapSerializer serializer = new 
TopologyAugmentationMapSerializer();
+
+    @Test
+    void serializationAndDeserialization() {
+        Node node1 = new Node("node1", NODE1_ID);
+        Node node2 = new Node("node2", NODE2_ID);
+        ConcurrentSkipListMap<Long, Augmentation> originalMap = new 
ConcurrentSkipListMap<>(Map.of(
+                1000L, new Augmentation(Set.of(node1, node2), true),
+                2000L, new Augmentation(Set.of(node2, node1), false)
+        ));
+
+        byte[] bytes = VersionedSerialization.toBytes(originalMap, serializer);
+        ConcurrentSkipListMap<Long, Augmentation> restoredMap = 
VersionedSerialization.fromBytes(bytes, serializer);
+
+        assertThat(restoredMap, is(aMapWithSize(2)));
+
+        Augmentation augmentation1 = restoredMap.get(1000L);
+        assertThat(augmentation1.nodes(), containsInAnyOrder(node1, node2));
+        assertThat(augmentation1.addition(), is(true));
+
+        Augmentation augmentation2 = restoredMap.get(2000L);
+        assertThat(augmentation2.nodes(), containsInAnyOrder(node1, node2));
+        assertThat(augmentation2.addition(), is(false));
+    }
+
+    @Test
+    void v1CanBeDeserialized() {
+        byte[] bytes = 
Base64.getDecoder().decode("Ae++QwPpBwHvvkMDAe++QwZub2RlMe/Nq5B4VjQSIUNlhwm63P4B775DBm5vZGUyIUNlhwm63P7vzauQeFY0"
+                + 
"EgHRDwHvvkMDAe++QwZub2RlMiFDZYcJutz+782rkHhWNBIB775DBm5vZGUx782rkHhWNBIhQ2WHCbrc/gA=");
+        ConcurrentSkipListMap<Long, Augmentation> restoredMap = 
VersionedSerialization.fromBytes(bytes, serializer);
+
+        Node node1 = new Node("node1", NODE1_ID);
+        Node node2 = new Node("node2", NODE2_ID);
+
+        assertThat(restoredMap, is(aMapWithSize(2)));
+
+        Augmentation augmentation1 = restoredMap.get(1000L);
+        assertThat(augmentation1.nodes(), containsInAnyOrder(node1, node2));
+        assertThat(augmentation1.addition(), is(true));
+
+        Augmentation augmentation2 = restoredMap.get(2000L);
+        assertThat(augmentation2.nodes(), containsInAnyOrder(node1, node2));
+        assertThat(augmentation2.addition(), is(false));
+    }
+}
diff --git 
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/causalitydatanodes/DistributionZoneCausalityDataNodesTest.java
 
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/causalitydatanodes/DistributionZoneCausalityDataNodesTest.java
index b36870ca9f..ce55d8ebf6 100644
--- 
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/causalitydatanodes/DistributionZoneCausalityDataNodesTest.java
+++ 
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/causalitydatanodes/DistributionZoneCausalityDataNodesTest.java
@@ -31,6 +31,8 @@ import static 
org.apache.ignite.internal.cluster.management.topology.LogicalTopo
 import static 
org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil.assertDataNodesFromManager;
 import static 
org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil.assertValueInStorage;
 import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.DISTRIBUTION_ZONE_DATA_NODES_VALUE_PREFIX;
+import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.deserializeDataNodesMap;
+import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.deserializeLogicalTopologySet;
 import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneDataNodesKey;
 import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesDataNodesPrefix;
 import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesLogicalTopologyKey;
@@ -40,7 +42,6 @@ import static 
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUt
 import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrowsWithCause;
 import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
-import static org.apache.ignite.internal.util.ByteUtils.fromBytes;
 import static org.apache.ignite.internal.util.ByteUtils.toByteArray;
 import static 
org.apache.ignite.internal.util.CompletableFutures.falseCompletedFuture;
 import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
@@ -668,7 +669,7 @@ public class DistributionZoneCausalityDataNodesTest extends 
BaseDistributionZone
         assertValueInStorage(
                 metaStorageManager,
                 zoneDataNodesKey(zoneId),
-                (v) -> 
DistributionZonesUtil.dataNodes(fromBytes(v)).stream().map(Node::nodeName).collect(toSet()),
+                (v) -> 
DistributionZonesUtil.dataNodes(deserializeDataNodesMap(v)).stream().map(Node::nodeName).collect(toSet()),
                 ONE_NODE_NAME,
                 TIMEOUT
         );
@@ -722,7 +723,7 @@ public class DistributionZoneCausalityDataNodesTest extends 
BaseDistributionZone
         assertValueInStorage(
                 metaStorageManager,
                 zoneDataNodesKey(zoneId),
-                (v) -> 
DistributionZonesUtil.dataNodes(fromBytes(v)).stream().map(Node::nodeName).collect(toSet()),
+                (v) -> 
DistributionZonesUtil.dataNodes(deserializeDataNodesMap(v)).stream().map(Node::nodeName).collect(toSet()),
                 TWO_NODES_NAMES,
                 TIMEOUT
         );
@@ -1170,7 +1171,7 @@ public class DistributionZoneCausalityDataNodesTest 
extends BaseDistributionZone
             assertValueInStorage(
                     metaStorageManager,
                     zoneDataNodesKey(zoneId),
-                    (v) -> 
DistributionZonesUtil.dataNodes(fromBytes(v)).stream().map(Node::nodeName).collect(toSet()),
+                    (v) -> 
DistributionZonesUtil.dataNodes(deserializeDataNodesMap(v)).stream().map(Node::nodeName).collect(toSet()),
                     TWO_NODES_NAMES,
                     TIMEOUT
             );
@@ -1200,7 +1201,7 @@ public class DistributionZoneCausalityDataNodesTest 
extends BaseDistributionZone
             assertValueInStorage(
                     metaStorageManager,
                     zoneDataNodesKey(entry.getKey()),
-                    (v) -> 
DistributionZonesUtil.dataNodes(fromBytes(v)).stream().map(Node::nodeName).collect(toSet()),
+                    (v) -> 
DistributionZonesUtil.dataNodes(deserializeDataNodesMap(v)).stream().map(Node::nodeName).collect(toSet()),
                     entry.getValue(),
                     TIMEOUT
             );
@@ -1434,7 +1435,7 @@ public class DistributionZoneCausalityDataNodesTest 
extends BaseDistributionZone
                     if (Arrays.equals(e.key(), 
zonesLogicalTopologyVersionKey().bytes())) {
                         revision = e.revision();
                     } else if (Arrays.equals(e.key(), 
zonesLogicalTopologyKey().bytes())) {
-                        newLogicalTopology = fromBytes(e.value());
+                        newLogicalTopology = 
deserializeLogicalTopologySet(e.value());
                     }
                 }
 
@@ -1481,7 +1482,7 @@ public class DistributionZoneCausalityDataNodesTest 
extends BaseDistributionZone
                         byte[] dataNodesBytes = e.value();
 
                         if (dataNodesBytes != null) {
-                            newDataNodes = 
DistributionZonesUtil.dataNodes(fromBytes(dataNodesBytes));
+                            newDataNodes = 
DistributionZonesUtil.dataNodes(deserializeDataNodesMap(dataNodesBytes));
                         } else {
                             newDataNodes = emptySet();
                         }
diff --git 
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngineTest.java
 
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngineTest.java
index ec488923ed..a1653e7927 100644
--- 
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngineTest.java
+++ 
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngineTest.java
@@ -35,7 +35,6 @@ import static 
org.apache.ignite.internal.partitiondistribution.PartitionDistribu
 import static org.apache.ignite.internal.table.TableTestUtils.getTableIdStrict;
 import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
-import static org.apache.ignite.internal.util.ByteUtils.toBytes;
 import static org.apache.ignite.internal.util.IgniteUtils.closeAll;
 import static org.apache.ignite.sql.ColumnType.STRING;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -68,6 +67,7 @@ import org.apache.ignite.internal.catalog.CatalogManager;
 import org.apache.ignite.internal.catalog.commands.ColumnParams;
 import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
 import 
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
+import org.apache.ignite.internal.distributionzones.DataNodesMapSerializer;
 import org.apache.ignite.internal.distributionzones.DistributionZoneManager;
 import org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil;
 import org.apache.ignite.internal.distributionzones.Node;
@@ -536,7 +536,7 @@ public class DistributionZoneRebalanceEngineTest extends 
IgniteAbstractTest {
         byte[] newLogicalTopology;
 
         if (nodes != null) {
-            newLogicalTopology = toBytes(toDataNodesMap(nodes.stream()
+            newLogicalTopology = 
DataNodesMapSerializer.serialize(toDataNodesMap(nodes.stream()
                     .map(n -> new Node(n, findNodeIdByConsistentId(n)))
                     .collect(toSet())));
         } else {
diff --git 
a/modules/distribution-zones/src/testFixtures/java/org/apache/ignite/internal/distributionzones/DistributionZonesTestUtil.java
 
b/modules/distribution-zones/src/testFixtures/java/org/apache/ignite/internal/distributionzones/DistributionZonesTestUtil.java
index 5ae827b891..6aea6e3e59 100644
--- 
a/modules/distribution-zones/src/testFixtures/java/org/apache/ignite/internal/distributionzones/DistributionZonesTestUtil.java
+++ 
b/modules/distribution-zones/src/testFixtures/java/org/apache/ignite/internal/distributionzones/DistributionZonesTestUtil.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.distributionzones;
 
 import static java.util.stream.Collectors.toSet;
 import static 
org.apache.ignite.internal.catalog.CatalogService.DEFAULT_STORAGE_PROFILE;
+import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.deserializeDataNodesMap;
 import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.parseStorageProfiles;
 import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneDataNodesKey;
 import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneScaleDownChangeTriggerKey;
@@ -27,7 +28,6 @@ import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil
 import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesLogicalTopologyVersionKey;
 import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
-import static org.apache.ignite.internal.util.ByteUtils.fromBytes;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.is;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -215,7 +215,7 @@ public class DistributionZonesTestUtil {
         assertValueInStorage(
                 keyValueStorage,
                 zoneDataNodesKey(zoneId).bytes(),
-                value -> DistributionZonesUtil.dataNodes(fromBytes(value)),
+                value -> 
DistributionZonesUtil.dataNodes(deserializeDataNodesMap(value)),
                 nodes,
                 2000
         );
@@ -237,7 +237,7 @@ public class DistributionZonesTestUtil {
         assertValueInStorage(
                 keyValueStorage,
                 zoneDataNodesKey(zoneId).bytes(),
-                value -> DistributionZonesUtil.dataNodes(fromBytes(value)),
+                value -> 
DistributionZonesUtil.dataNodes(deserializeDataNodesMap(value)),
                 nodes,
                 2000
         );
@@ -305,7 +305,7 @@ public class DistributionZonesTestUtil {
         assertValueInStorage(
                 keyValueStorage,
                 zonesLogicalTopologyKey().bytes(),
-                ByteUtils::fromBytes,
+                DistributionZonesUtil::deserializeLogicalTopologySet,
                 nodes,
                 1000
         );
@@ -331,7 +331,7 @@ public class DistributionZonesTestUtil {
         assertValueInStorage(
                 metaStorageManager,
                 zonesLogicalTopologyKey(),
-                ByteUtils::fromBytes,
+                DistributionZonesUtil::deserializeLogicalTopologySet,
                 nodes,
                 1000
         );

Reply via email to