This is an automated email from the ASF dual-hosted git repository.

rpuch pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 9e21f50be0 IGNITE-23496 Optimize lease batch serialization (#4608)
9e21f50be0 is described below

commit 9e21f50be02f78afcf5755a43f3c2baf7f006c98
Author: Roman Puchkovskiy <[email protected]>
AuthorDate: Fri Oct 25 18:19:17 2024 +0400

    IGNITE-23496 Optimize lease batch serialization (#4608)
---
 .../ClusterStatePersistentSerializer.java          |  38 --
 ...plicationGroupId.java => PartitionGroupId.java} |  11 +-
 .../internal/replicator/ReplicationGroupId.java    |   4 +-
 .../internal/replicator/TablePartitionId.java      |   9 +-
 .../internal/versioned/VersionedSerializer.java    |  40 ++
 .../internal/placementdriver/ReplicaMeta.java      |   3 +-
 .../internal/placementdriver/leases/Lease.java     | 111 -----
 .../placementdriver/leases/LeaseBatch.java         |  10 +-
 .../leases/LeaseBatchSerializer.java               | 534 +++++++++++++++++++++
 .../placementdriver/leases/NodesDictionary.java    | 161 +++++++
 .../leases/LeaseBatchSerializerTest.java           | 258 ++++++++++
 .../leases/LeaseSerializationTest.java             |  32 +-
 .../leases/NodesDictionaryTest.java                | 169 +++++++
 .../ResetClusterMessagePersistentSerializer.java   |  31 --
 .../ManualGroupRestartRequestSerializer.java       |  20 +-
 15 files changed, 1188 insertions(+), 243 deletions(-)

diff --git 
a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterStatePersistentSerializer.java
 
b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterStatePersistentSerializer.java
index 7bd3240751..6806207e47 100644
--- 
a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterStatePersistentSerializer.java
+++ 
b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterStatePersistentSerializer.java
@@ -17,13 +17,9 @@
 
 package org.apache.ignite.internal.cluster.management;
 
-import static java.nio.charset.StandardCharsets.UTF_8;
-
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.HashSet;
 import java.util.List;
-import java.util.Set;
 import java.util.UUID;
 import 
org.apache.ignite.internal.cluster.management.network.messages.CmgMessagesFactory;
 import org.apache.ignite.internal.util.io.IgniteDataInput;
@@ -58,20 +54,6 @@ public class ClusterStatePersistentSerializer extends 
VersionedSerializer<Cluste
         }
     }
 
-    private static void writeStringSet(Set<String> strings, IgniteDataOutput 
out) throws IOException {
-        out.writeVarInt(strings.size());
-        for (String str : strings) {
-            out.writeUTF(str);
-        }
-    }
-
-    private static void writeNullableString(@Nullable String str, 
IgniteDataOutput out) throws IOException {
-        out.writeVarInt(str == null ? -1 : str.length());
-        if (str != null) {
-            out.writeByteArray(str.getBytes(UTF_8));
-        }
-    }
-
     @Override
     protected ClusterState readExternalData(byte protoVer, IgniteDataInput in) 
throws IOException {
         return CMG_MSGS_FACTORY.clusterState()
@@ -84,26 +66,6 @@ public class ClusterStatePersistentSerializer extends 
VersionedSerializer<Cluste
                 .build();
     }
 
-    private static Set<String> readStringSet(IgniteDataInput in) throws 
IOException {
-        int size = in.readVarIntAsInt();
-
-        Set<String> result = new HashSet<>(size);
-        for (int i = 0; i < size; i++) {
-            result.add(in.readUTF());
-        }
-
-        return result;
-    }
-
-    private static @Nullable String readNullableString(IgniteDataInput in) 
throws IOException {
-        int lengthOrMinusOne = in.readVarIntAsInt();
-        if (lengthOrMinusOne == -1) {
-            return null;
-        }
-
-        return new String(in.readByteArray(lengthOrMinusOne), UTF_8);
-    }
-
     private static @Nullable List<UUID> readFormerClusterIds(IgniteDataInput 
in) throws IOException {
         int length = in.readVarIntAsInt();
 
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/replicator/ReplicationGroupId.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/replicator/PartitionGroupId.java
similarity index 75%
copy from 
modules/core/src/main/java/org/apache/ignite/internal/replicator/ReplicationGroupId.java
copy to 
modules/core/src/main/java/org/apache/ignite/internal/replicator/PartitionGroupId.java
index cc3fb4dcb5..666bd065ca 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/replicator/ReplicationGroupId.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/replicator/PartitionGroupId.java
@@ -17,10 +17,13 @@
 
 package org.apache.ignite.internal.replicator;
 
-import java.io.Serializable;
-
 /**
- * The interface represents a replication group identifier.
+ * A {@link ReplicationGroupId} which corresponds to partition of a 
partitioned object.
  */
-public interface ReplicationGroupId extends Serializable {
+public interface PartitionGroupId extends ReplicationGroupId {
+    /** Returns ID of the partitioned object. */
+    int objectId();
+
+    /** Returns partition ID. */
+    int partitionId();
 }
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/replicator/ReplicationGroupId.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/replicator/ReplicationGroupId.java
index cc3fb4dcb5..8348d22995 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/replicator/ReplicationGroupId.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/replicator/ReplicationGroupId.java
@@ -17,10 +17,8 @@
 
 package org.apache.ignite.internal.replicator;
 
-import java.io.Serializable;
-
 /**
  * The interface represents a replication group identifier.
  */
-public interface ReplicationGroupId extends Serializable {
+public interface ReplicationGroupId {
 }
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/replicator/TablePartitionId.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/replicator/TablePartitionId.java
index fba5ff7768..26a2518cc3 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/replicator/TablePartitionId.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/replicator/TablePartitionId.java
@@ -18,10 +18,11 @@
 package org.apache.ignite.internal.replicator;
 
 // TODO: https://issues.apache.org/jira/browse/IGNITE-19170 Should be 
refactored to ZonePartitionId.
+
 /**
  * The class is used to identify a table replication group.
  */
-public class TablePartitionId implements ReplicationGroupId {
+public class TablePartitionId implements PartitionGroupId {
 
     /** Table id. */
     private final int tableId;
@@ -52,11 +53,17 @@ public class TablePartitionId implements ReplicationGroupId 
{
         return new TablePartitionId(Integer.parseInt(parts[0]), 
Integer.parseInt(parts[1]));
     }
 
+    @Override
+    public int objectId() {
+        return tableId;
+    }
+
     /**
      * Get the partition id.
      *
      * @return Partition id.
      */
+    @Override
     public int partitionId() {
         return partId;
     }
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/versioned/VersionedSerializer.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/versioned/VersionedSerializer.java
index a2d78c5d5d..5cf2ecf977 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/versioned/VersionedSerializer.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/versioned/VersionedSerializer.java
@@ -17,9 +17,15 @@
 
 package org.apache.ignite.internal.versioned;
 
+import static java.nio.charset.StandardCharsets.UTF_8;
+
 import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.internal.util.io.IgniteDataInput;
 import org.apache.ignite.internal.util.io.IgniteDataOutput;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * Serializes and deserializes objects in a versioned way: that is, includes 
version to make it possible to deserialize objects serialized
@@ -95,4 +101,38 @@ public abstract class VersionedSerializer<T> {
 
         return readExternalData(ver, in);
     }
+
+    protected static void writeNullableString(@Nullable String str, 
IgniteDataOutput out) throws IOException {
+        out.writeVarInt(str == null ? -1 : str.length());
+        if (str != null) {
+            out.writeByteArray(str.getBytes(UTF_8));
+        }
+    }
+
+    protected static @Nullable String readNullableString(IgniteDataInput in) 
throws IOException {
+        int lengthOrMinusOne = in.readVarIntAsInt();
+        if (lengthOrMinusOne == -1) {
+            return null;
+        }
+
+        return new String(in.readByteArray(lengthOrMinusOne), UTF_8);
+    }
+
+    protected static void writeStringSet(Set<String> strings, IgniteDataOutput 
out) throws IOException {
+        out.writeVarInt(strings.size());
+        for (String str : strings) {
+            out.writeUTF(str);
+        }
+    }
+
+    protected static Set<String> readStringSet(IgniteDataInput in) throws 
IOException {
+        int size = in.readVarIntAsInt();
+
+        Set<String> result = new HashSet<>(IgniteUtils.capacity(size));
+        for (int i = 0; i < size; i++) {
+            result.add(in.readUTF());
+        }
+
+        return result;
+    }
 }
diff --git 
a/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/ReplicaMeta.java
 
b/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/ReplicaMeta.java
index 288da9af9f..76f9e4f557 100644
--- 
a/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/ReplicaMeta.java
+++ 
b/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/ReplicaMeta.java
@@ -17,13 +17,12 @@
 
 package org.apache.ignite.internal.placementdriver;
 
-import java.io.Serializable;
 import java.util.UUID;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.jetbrains.annotations.Nullable;
 
 /** Replica lease meta. */
-public interface ReplicaMeta extends Serializable {
+public interface ReplicaMeta {
     /** Gets a leaseholder node consistent ID (assigned to a node once), 
{@code null} if nothing holds the lease. */
     @Nullable String getLeaseholder();
 
diff --git 
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/Lease.java
 
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/Lease.java
index e22d8a435a..bad5b2f269 100644
--- 
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/Lease.java
+++ 
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/Lease.java
@@ -17,23 +17,15 @@
 
 package org.apache.ignite.internal.placementdriver.leases;
 
-import static java.nio.ByteOrder.LITTLE_ENDIAN;
-import static 
org.apache.ignite.internal.hlc.HybridTimestamp.HYBRID_TIMESTAMP_SIZE;
 import static org.apache.ignite.internal.hlc.HybridTimestamp.MIN_VALUE;
 import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestamp;
-import static org.apache.ignite.internal.util.ArrayUtils.BYTE_EMPTY_ARRAY;
-import static org.apache.ignite.internal.util.ByteUtils.stringFromBytes;
-import static org.apache.ignite.internal.util.ByteUtils.stringToBytes;
-import static org.apache.ignite.internal.util.ByteUtils.toBytes;
 
-import java.nio.ByteBuffer;
 import java.util.Objects;
 import java.util.UUID;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.placementdriver.ReplicaMeta;
 import org.apache.ignite.internal.replicator.ReplicationGroupId;
 import org.apache.ignite.internal.tostring.S;
-import org.apache.ignite.internal.util.ByteUtils;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -41,8 +33,6 @@ import org.jetbrains.annotations.Nullable;
  * The real lease is stored in Meta storage.
  */
 public class Lease implements ReplicaMeta {
-    private static final long serialVersionUID = 394641185393949608L;
-
     /** Node consistent ID (assigned to a node once), {@code null} if nothing 
holds the lease. */
     private final @Nullable String leaseholder;
 
@@ -199,63 +189,6 @@ public class Lease implements ReplicaMeta {
         return proposedCandidate;
     }
 
-    /**
-     * Encodes this lease into sequence of bytes.
-     *
-     * @return Lease representation in a byte array.
-     */
-    public byte[] bytes() {
-        byte[] leaseholderBytes = stringToBytes(leaseholder);
-        byte[] leaseholderIdBytes = leaseholderId == null ? null : 
stringToBytes(leaseholderId.toString());
-        byte[] proposedCandidateBytes = stringToBytes(proposedCandidate);
-        byte[] groupIdBytes = toBytes(replicationGroupId);
-
-        int bufSize = 2 // accepted + prolongable
-                + HYBRID_TIMESTAMP_SIZE * 2 // startTime + expirationTime
-                + bytesSizeForWrite(leaseholderBytes) + 
bytesSizeForWrite(leaseholderIdBytes) + 
bytesSizeForWrite(proposedCandidateBytes)
-                + bytesSizeForWrite(groupIdBytes);
-
-        ByteBuffer buf = ByteBuffer.allocate(bufSize).order(LITTLE_ENDIAN);
-
-        putBoolean(buf, accepted);
-        putBoolean(buf, prolongable);
-
-        putHybridTimestamp(buf, startTime);
-        putHybridTimestamp(buf, expirationTime);
-
-        putBytes(buf, leaseholderBytes);
-        putBytes(buf, leaseholderIdBytes);
-        putBytes(buf, proposedCandidateBytes);
-        putBytes(buf, groupIdBytes);
-
-        return buf.array();
-    }
-
-    /**
-     * Decodes a lease from the sequence of bytes.
-     *
-     * @param buf Byte buffer containing lease representation. Requires to be 
in little-endian.
-     * @return Decoded lease.
-     */
-    public static Lease fromBytes(ByteBuffer buf) {
-        assert buf.order() == LITTLE_ENDIAN;
-
-        boolean accepted = getBoolean(buf);
-        boolean prolongable = getBoolean(buf);
-
-        HybridTimestamp startTime = getHybridTimestamp(buf);
-        HybridTimestamp expirationTime = getHybridTimestamp(buf);
-
-        String leaseholder = stringFromBytes(getBytes(buf));
-        String leaseholderIdString = stringFromBytes(getBytes(buf));
-        UUID leaseholderId = leaseholderIdString == null ? null : 
UUID.fromString(leaseholderIdString);
-        String proposedCandidate = stringFromBytes(getBytes(buf));
-
-        ReplicationGroupId groupId = ByteUtils.fromBytes(getBytes(buf));
-
-        return new Lease(leaseholder, leaseholderId, startTime, 
expirationTime, prolongable, accepted, proposedCandidate, groupId);
-    }
-
     /**
      * Returns a lease that no one holds and is always expired.
      *
@@ -289,48 +222,4 @@ public class Lease implements ReplicaMeta {
     public int hashCode() {
         return Objects.hash(leaseholder, leaseholderId, accepted, startTime, 
expirationTime, prolongable, replicationGroupId);
     }
-
-    private static int bytesSizeForWrite(byte @Nullable [] bytes) {
-        return Integer.BYTES + (bytes == null ? 0 : bytes.length);
-    }
-
-    private static void putBoolean(ByteBuffer buffer, boolean b) {
-        buffer.put((byte) (b ? 1 : 0));
-    }
-
-    private static boolean getBoolean(ByteBuffer buffer) {
-        return buffer.get() == 1;
-    }
-
-    private static void putHybridTimestamp(ByteBuffer buffer, HybridTimestamp 
hybridTimestamp) {
-        buffer.putLong(hybridTimestamp.longValue());
-    }
-
-    private static HybridTimestamp getHybridTimestamp(ByteBuffer buffer) {
-        return hybridTimestamp(buffer.getLong());
-    }
-
-    private static void putBytes(ByteBuffer buffer, byte @Nullable [] bytes) {
-        buffer.putInt(bytes == null ? -1 : bytes.length);
-
-        if (bytes != null) {
-            buffer.put(bytes);
-        }
-    }
-
-    private static byte @Nullable [] getBytes(ByteBuffer buffer) {
-        int bytesLen = buffer.getInt();
-
-        if (bytesLen < 0) {
-            return null;
-        } else if (bytesLen == 0) {
-            return BYTE_EMPTY_ARRAY;
-        }
-
-        byte[] bytes = new byte[bytesLen];
-
-        buffer.get(bytes);
-
-        return bytes;
-    }
 }
diff --git 
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseBatch.java
 
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseBatch.java
index 2ef68105e5..a6900a0ab1 100644
--- 
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseBatch.java
+++ 
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseBatch.java
@@ -17,11 +17,10 @@
 
 package org.apache.ignite.internal.placementdriver.leases;
 
-import static org.apache.ignite.internal.util.IgniteUtils.bytesToList;
-import static org.apache.ignite.internal.util.IgniteUtils.collectionToBytes;
-
 import java.nio.ByteBuffer;
 import java.util.Collection;
+import org.apache.ignite.internal.util.ByteUtils;
+import org.apache.ignite.internal.versioned.VersionedSerialization;
 
 /**
  * Representation of leases batch.
@@ -38,10 +37,11 @@ public class LeaseBatch {
     }
 
     public byte[] bytes() {
-        return collectionToBytes(leases, Lease::bytes);
+        return VersionedSerialization.toBytes(this, 
LeaseBatchSerializer.INSTANCE);
     }
 
     public static LeaseBatch fromBytes(ByteBuffer bytes) {
-        return new LeaseBatch(bytesToList(bytes, Lease::fromBytes));
+        byte[] byteArray = ByteUtils.toByteArray(bytes);
+        return VersionedSerialization.fromBytes(byteArray, 
LeaseBatchSerializer.INSTANCE);
     }
 }
diff --git 
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseBatchSerializer.java
 
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseBatchSerializer.java
new file mode 100644
index 0000000000..b0e9b76680
--- /dev/null
+++ 
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseBatchSerializer.java
@@ -0,0 +1,534 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.placementdriver.leases;
+
+import static java.util.Comparator.comparing;
+import static java.util.stream.Collectors.groupingBy;
+import static java.util.stream.Collectors.toList;
+
+import it.unimi.dsi.fastutil.objects.Object2IntMap;
+import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.TreeMap;
+import java.util.UUID;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.replicator.PartitionGroupId;
+import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.replicator.ZonePartitionId;
+import org.apache.ignite.internal.util.io.IgniteDataInput;
+import org.apache.ignite.internal.util.io.IgniteDataOutput;
+import org.apache.ignite.internal.versioned.VersionedSerializer;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * {@link VersionedSerializer} for {@link LeaseBatch} instances.
+ *
+ * <p>Format grammar:
+ * <pre>{@code
+ *     <BATCH> ::= <HEADER> <TABLE_LEASES_SECTION>
+ *
+ *     <HEADER> ::=
+ *       <MIN_EXPIRATION_TIME_PHYSICAL_PART> (varint) // Self-explanatory
+ *       <COMMON_EXPIRATION_TIME_PHYSICAL_PART_DELTA> (varint) // Delta 
between physical part of common expiration time and min exp. time
+ *       <COMMON_EXPIRATION_TIME_LOGICAL_PART> (varint) // Logical part of 
most common expiration time
+ *       <NODE_DICTIONARY>
+ *
+ *     <NODE_DICTIONARY> ::=
+ *       <NAME_COUNT> (varint)
+ *       { <NODE_NAME> (UTF) } (nameCount times)
+ *       <NODE_COUNT> (varint)
+ *       {
+ *         <NODE_ID> (UUID)
+ *         <NODE_NAME_INDEX> (varint) // Index in the name table (defined 
above)
+ *       } (nodeCount times)
+ *
+ *     <TABLE_LEASES_SECTION> ::=
+ *       <TABLE_COUNT> (varint)
+ *       { <OBJECT_LEASES> } // tableCount of OBJECT_LEASES elements
+ *
+ *     <OBJECT_LEASES> ::=
+ *       <OBJECT_ID_DELTA> (varint) // For first object in section, it's full 
object ID; for subsequent ones, it's object ID minus
+ *                                  // previous object ID
+ *       <LEASE_COUNT> (varint) // Number of leases (including holes) for 
current object
+ *       { <LEASE> } // leaseCount times
+ *
+ *     <LEASE> ::=
+ *       <FLAGS> (byte)
+ *       [ // Only present if DUMMY_LEASE flag is 0
+ *         <HOLDER_AND_PROPOSED_CANDIDATE>
+ *         [ <EXPIRATION_TIME> ] // Only written if 
HAS_UNCOMMON_EXPIRATION_TIME flag is 1
+ *         <START_TIME>
+ *       ]
+ *
+ *       <HOLDER_AND_PROPOSED_CANDIDATE> ::=
+ *         <HOLDER_AND_PROPOSED_CANDIDATE_COMPACTLY> (byte) // Only if number 
of node names in the dictionary is 8 or less; lowest 3 bits
+ *                                                          // encode holder 
index (in the nodes table), and next 3 bits are for proposed
+ *                                                          // candidate index 
(in the names table)
+ *         | <HOLDER_INDEX> (varint) [ <PROPOSED_CANDIDATE> (varint) ] // 
Proposed candidate is only written if HAS_PROPOSED_CANDIDATE
+ *                                                                     // flag 
is 1
+ *       <EXPIRATION_TIME> ::=
+ *         <EXPIRATION_TIME_PHYSICAL_PART_DELTA> (varint) // Relative to 
minExpirationTimePhysicalPart
+ *         [ EXPIRATION_TIME_LOGICAL_PART ] (varint) // Only written if 
HAS_EXPIRATION_TIME_LOGICAL flag is 1
+ *
+ *       <START_TIME> ::=
+ *         <PERIOD> // Difference between physical parts of expirationTime and 
startTime
+ *         <START_TIME_LOGICAL_PART> (varint)
+ * }</pre>
+ *
+ * <p>The following optimizations are applied to minimize the amount of bytes 
a batch is serialized to:</p>
+ * <ul>
+ *     <li>Java Serialization is not used to serialize components (it's pretty 
verbose)</li>
+ *     <li>Varints are used extensively</li>
+ *     <li>A dictionary of all nodes mentioned as lease holders and proposed 
candidates is collected and written in the header once
+ *     per batch. This is beneficial as we usually have a lot more leases than 
number of nodes in cluster, so we can just represent nodes
+ *     and their names as indices in the dictionary (this is especially 
effective given we use varints as indices are usually very small).
+ *     </li>
+ *     <li>Leases are grouped per table/zone ID (aka object ID), so an object 
ID is only written once per table </li>
+ * </ul>
+ */
+public class LeaseBatchSerializer extends VersionedSerializer<LeaseBatch> {
+    /** Serializer instance. */
+    public static final LeaseBatchSerializer INSTANCE = new 
LeaseBatchSerializer();
+
+    /** Contains {@link Lease#isAccepted()}. */
+    @SuppressWarnings("PointlessBitwiseExpression")
+    private static final int ACCEPTED_MASK = 1 << 0;
+
+    /** Contains {@link Lease#isProlongable()}. */
+    private static final int PROLONGABLE_MASK = 1 << 1;
+
+    /** Whether the lease has a non-null {@link Lease#proposedCandidate()}. */
+    private static final int HAS_PROPOSED_CANDIDATE_MASK = 1 << 2;
+
+    /** Whether expiration time differs from the most common expiration time 
in the batch. */
+    private static final int HAS_UNCOMMON_EXPIRATION_TIME_MASK = 1 << 3;
+
+    /** Whether expiration timestamp logical part is not zero (this is 
uncommon). */
+    private static final int HAS_EXPIRATION_LOGICAL_PART_MASK = 1 << 4;
+
+    /** Whether this is not a real lease, but a hole in partitionId sequence. 
Having this flag allows us to omit partitionId. */
+    private static final int DUMMY_LEASE_MASK = 1 << 5;
+
+    // When there are no more than 8 nodes in the cluster, node name index and node 
index are guaranteed to fit in 7 bits we have in a varint
+    // byte, which allows us to enable 'compact mode' to save 1 byte per lease.
+
+    /** Number of bits to fit name index/node index in to enable compact mode. 
*/
+    private static final int BIT_WIDTH_TO_FIT_IN_HALF_BYTE = 3;
+
+    /** Max size of cluster which allows compact mode. */
+    private static final int MAX_NODES_FOR_COMPACT_MODE = 1 << 
BIT_WIDTH_TO_FIT_IN_HALF_BYTE;
+
+    /** Mask to extract lease holder index from compact representation. */
+    private static final int COMPACT_HOLDER_INDEX_MASK = (1 << 
BIT_WIDTH_TO_FIT_IN_HALF_BYTE) - 1;
+
+    @Override
+    protected void writeExternalData(LeaseBatch batch, IgniteDataOutput out) 
throws IOException {
+        long minExpirationTimePhysical = minExpirationTimePhysicalPart(batch);
+        HybridTimestamp commonExpirationTime = 
mostFrequentExpirationTime(batch);
+
+        out.writeVarInt(minExpirationTimePhysical);
+        out.writeVarInt(commonExpirationTime.getPhysical() - 
minExpirationTimePhysical);
+        out.writeVarInt(commonExpirationTime.getLogical());
+
+        NodesDictionary nodesDictionary = buildNodesDictionary(batch);
+        nodesDictionary.writeTo(out);
+
+        List<Lease> tableLeases = batch.leases().stream()
+                .filter(lease -> lease.replicationGroupId() instanceof 
TablePartitionId)
+                .collect(toList());
+        List<Lease> zoneLeases = batch.leases().stream()
+                .filter(lease -> lease.replicationGroupId() instanceof 
ZonePartitionId)
+                .collect(toList());
+        assert tableLeases.size() + zoneLeases.size() == batch.leases().size() 
: "There are " + batch.leases().size()
+                + " leases in total, "
+                + tableLeases.size() + " of them are table leases, " + 
zoneLeases.size() + " are zone leases, but "
+                + (batch.leases().size() - tableLeases.size() - 
zoneLeases.size()) + " are neither";
+
+        writePartitionedGroupLeases(tableLeases, minExpirationTimePhysical, 
commonExpirationTime, nodesDictionary, out);
+
+        assert zoneLeases.isEmpty() : "There are zone leases which are not 
supported yet";
+    }
+
+    private static long minExpirationTimePhysicalPart(LeaseBatch batch) {
+        long min = HybridTimestamp.MAX_VALUE.getPhysical();
+
+        for (Lease lease : batch.leases()) {
+            min = Math.min(min, lease.getExpirationTime().getPhysical());
+        }
+
+        return min;
+    }
+
+    private static HybridTimestamp mostFrequentExpirationTime(LeaseBatch 
batch) {
+        if (batch.leases().isEmpty()) {
+            return HybridTimestamp.MIN_VALUE;
+        }
+
+        Object2IntMap<HybridTimestamp> counts = new Object2IntOpenHashMap<>();
+
+        for (Lease lease : batch.leases()) {
+            counts.mergeInt(lease.getExpirationTime(), 1, Integer::sum);
+        }
+
+        HybridTimestamp commonExpirationTime = HybridTimestamp.MIN_VALUE;
+        int maxCount = -1;
+        for (Object2IntMap.Entry<HybridTimestamp> entry : 
counts.object2IntEntrySet()) {
+            if (entry.getIntValue() > maxCount) {
+                commonExpirationTime = entry.getKey();
+                maxCount = entry.getIntValue();
+            }
+        }
+
+        return commonExpirationTime;
+    }
+
+    private static NodesDictionary buildNodesDictionary(LeaseBatch batch) {
+        NodesDictionary nodesDictionary = new NodesDictionary();
+
+        for (Lease lease : batch.leases()) {
+            if (lease.getLeaseholderId() != null) {
+                assert lease.getLeaseholder() != null : lease;
+                nodesDictionary.putNode(lease.getLeaseholderId(), 
lease.getLeaseholder());
+            }
+            if (lease.proposedCandidate() != null) {
+                //noinspection DataFlowIssue
+                nodesDictionary.putName(lease.proposedCandidate());
+            }
+        }
+
+        return nodesDictionary;
+    }
+
+    private static void writePartitionedGroupLeases(
+            List<Lease> leases,
+            long minExpirationTimePhysical,
+            HybridTimestamp commonExpirationTime,
+            NodesDictionary nodesDictionary,
+            IgniteDataOutput out
+    ) throws IOException {
+        Map<Integer, List<Lease>> leasesByObjectId = leases.stream()
+                .collect(
+                        groupingBy(
+                                lease -> 
partitionedGroupIdFrom(lease).objectId(),
+                                TreeMap::new,
+                                toList()
+                        )
+                );
+
+        out.writeVarInt(leasesByObjectId.size());
+
+        int objectIdBase = 0;
+        for (Entry<Integer, List<Lease>> entry : leasesByObjectId.entrySet()) {
+            int objectId = entry.getKey();
+            List<Lease> objectLeases = entry.getValue();
+
+            objectIdBase = writeLeasesForObject(
+                    objectId,
+                    objectLeases,
+                    minExpirationTimePhysical,
+                    commonExpirationTime,
+                    nodesDictionary,
+                    out,
+                    objectIdBase
+            );
+        }
+    }
+
+    private static PartitionGroupId partitionedGroupIdFrom(Lease lease) {
+        return (PartitionGroupId) lease.replicationGroupId();
+    }
+
+    private static int writeLeasesForObject(
+            int objectId,
+            List<Lease> objectLeases,
+            long minExpirationTimePhysical,
+            HybridTimestamp commonExpirationTime,
+            NodesDictionary nodesDictionary,
+            IgniteDataOutput out,
+            int objectIdBase
+    ) throws IOException {
+        
objectLeases.sort(comparing(LeaseBatchSerializer::partitionedGroupIdFrom, 
comparing(PartitionGroupId::partitionId)));
+
+        out.writeVarInt(objectId - objectIdBase);
+
+        int partitionCount = 
partitionedGroupIdFrom(objectLeases.get(objectLeases.size() - 1)).partitionId() 
+ 1;
+        out.writeVarInt(partitionCount);
+
+        int partitionId = 0;
+        for (Lease lease : objectLeases) {
+            partitionId = writeLease(lease, partitionId, 
minExpirationTimePhysical, commonExpirationTime, nodesDictionary, out);
+        }
+
+        return objectId;
+    }
+
    /**
     * Writes one lease, preceded by 'dummy lease' markers for any holes in the partition ID sequence.
     *
     * @param lease lease to write.
     * @param partitionId next expected partition ID in the sequence.
     * @param minExpirationTimePhysical batch-wide minimum physical expiration time (delta base).
     * @param commonExpirationTime batch-wide most common expiration time (matching leases skip writing theirs).
     * @param nodesDictionary dictionary mapping node IDs/names to indexes.
     * @param out output to write to.
     * @return partition ID following the one just written.
     * @throws IOException if an I/O error occurs.
     */
    private static int writeLease(
            Lease lease,
            int partitionId,
            long minExpirationTimePhysical,
            HybridTimestamp commonExpirationTime,
            NodesDictionary nodesDictionary,
            IgniteDataOutput out
    ) throws IOException {
        PartitionGroupId groupId = partitionedGroupIdFrom(lease);

        while (partitionId < groupId.partitionId()) {
            // It's a hole in partitionId sequence, let's write a 'dummy lease'.
            out.write(DUMMY_LEASE_MASK);
            partitionId++;
        }

        assert partitionId == groupId.partitionId() : "Duplicate partitionId in " + lease;

        assert lease.getLeaseholder() != null && lease.getLeaseholderId() != null : lease + " doesn't have a leaseholder";
        // NOTE(review): reference comparison with MIN_VALUE — assumes MIN_VALUE is a canonical instance;
        // an equal-but-distinct timestamp would pass the assert. Confirm this is intended.
        assert lease.getStartTime() != HybridTimestamp.MIN_VALUE : lease + " has illegal start time";
        assert lease.getExpirationTime() != HybridTimestamp.MIN_VALUE : lease + " has illegal expiration time";

        UUID leaseHolderId = lease.getLeaseholderId();
        String proposedCandidate = lease.proposedCandidate();
        boolean hasProposedCandidate = proposedCandidate != null;

        // The expiration time is written explicitly only when it differs from the batch-wide common one.
        boolean hasUncommonExpirationTime = !Objects.equals(lease.getExpirationTime(), commonExpirationTime);
        boolean hasExpirationLogicalPart = lease.getExpirationTime().getLogical() != 0;

        out.write(flags(
                lease.isAccepted(),
                lease.isProlongable(),
                hasProposedCandidate,
                hasUncommonExpirationTime,
                hasExpirationLogicalPart
        ));

        // Compact mode: holder node index and candidate name index share a single varint byte when the dictionary is small.
        if (holderIdAndProposedCandidateFitIn1Byte(nodesDictionary)) {
            int nodesInfo = packNodesInfo(
                    nodesDictionary.getNodeIndex(leaseHolderId),
                    hasProposedCandidate ? nodesDictionary.getNameIndex(proposedCandidate) : 0
            );
            out.writeVarInt(nodesInfo);
        } else {
            out.writeVarInt(nodesDictionary.getNodeIndex(leaseHolderId));
            if (hasProposedCandidate) {
                out.writeVarInt(nodesDictionary.getNameIndex(proposedCandidate));
            }
        }

        if (hasUncommonExpirationTime) {
            // Physical part is delta-encoded against the batch minimum; logical part only when non-zero.
            out.writeVarInt(lease.getExpirationTime().getPhysical() - minExpirationTimePhysical);
            if (hasExpirationLogicalPart) {
                out.writeVarInt(lease.getExpirationTime().getLogical());
            }
        }

        // Start time is reconstructed on read as (expiration physical - period, start logical).
        long periodIn = lease.getExpirationTime().getPhysical() - lease.getStartTime().getPhysical();
        out.writeVarInt(periodIn);
        out.writeVarInt(lease.getStartTime().getLogical());

        return partitionId + 1;
    }
+
+    private static int packNodesInfo(int holderNodeIndex, int 
proposedCandidateNameIndex) {
+        assert holderNodeIndex < MAX_NODES_FOR_COMPACT_MODE : holderNodeIndex;
+        assert proposedCandidateNameIndex < MAX_NODES_FOR_COMPACT_MODE : 
proposedCandidateNameIndex;
+
+        return holderNodeIndex | (proposedCandidateNameIndex << 
BIT_WIDTH_TO_FIT_IN_HALF_BYTE);
+    }
+
    /**
     * Returns whether compact encoding (holder node index and candidate name index packed into one varint byte)
     * can be used for this dictionary.
     *
     * <p>Up to 8 names means that for name index it's enough to have 3 bits, same for node index, so, in sum, they
     * require up to 6 bits, and we have 7 bits in a varint byte.
     *
     * <p>NOTE(review): the gate checks the <em>name</em> count, while {@link #packNodesInfo} also packs a
     * <em>node</em> index; node count may exceed name count when several node IDs share a name, in which case
     * only the assert in {@code packNodesInfo} catches it — confirm this is the intended guard.
     */
    private static boolean holderIdAndProposedCandidateFitIn1Byte(NodesDictionary dictionary) {
        return dictionary.nameCount() <= MAX_NODES_FOR_COMPACT_MODE;
    }
+
+    private static int flags(
+            boolean accepted,
+            boolean prolongable,
+            boolean hasProposedCandidate,
+            boolean hasUncommonExpirationTime,
+            boolean hasExpirationLogicalPart
+    ) {
+        return (accepted ? ACCEPTED_MASK : 0)
+                | (prolongable ? PROLONGABLE_MASK : 0)
+                | (hasProposedCandidate ? HAS_PROPOSED_CANDIDATE_MASK : 0)
+                | (hasUncommonExpirationTime ? 
HAS_UNCOMMON_EXPIRATION_TIME_MASK : 0)
+                | (hasExpirationLogicalPart ? HAS_EXPIRATION_LOGICAL_PART_MASK 
: 0);
+    }
+
    /**
     * Reads a {@link LeaseBatch} in the format produced by this serializer.
     *
     * @param protoVer protocol version the data was written with.
     * @param in input to read from.
     * @return restored batch.
     * @throws IOException if an I/O error occurs.
     */
    @Override
    protected LeaseBatch readExternalData(byte protoVer, IgniteDataInput in) throws IOException {
        // Header: minimum physical expiration time, then the common expiration time delta-encoded against it.
        long minExpirationTimePhysical = in.readVarInt();
        HybridTimestamp commonExpirationTime = new HybridTimestamp(minExpirationTimePhysical + in.readVarInt(), in.readVarIntAsInt());
        NodesDictionary nodesDictionary = NodesDictionary.readFrom(in);

        List<Lease> leases = new ArrayList<>();

        readPartitionedGroupLeases(
                minExpirationTimePhysical,
                commonExpirationTime,
                nodesDictionary,
                leases,
                in,
                TablePartitionId::new
        );

        return new LeaseBatch(leases);
    }
+
    /**
     * Reads leases of all objects (tables/zones), appending them to {@code leases}.
     *
     * <p>Object IDs are delta-encoded, so each object read supplies the base for the next one.
     *
     * @param minExpirationTimePhysical batch-wide minimum physical expiration time (delta base).
     * @param commonExpirationTime batch-wide most common expiration time.
     * @param nodesDictionary dictionary for resolving node indexes back to IDs/names.
     * @param leases output collection to append restored leases to.
     * @param in input to read from.
     * @param groupIdFactory factory creating replication group IDs from (objectId, partitionId).
     * @throws IOException if an I/O error occurs.
     */
    private static void readPartitionedGroupLeases(
            long minExpirationTimePhysical,
            HybridTimestamp commonExpirationTime,
            NodesDictionary nodesDictionary,
            List<Lease> leases,
            IgniteDataInput in,
            GroupIdFactory groupIdFactory
    ) throws IOException {
        int objectCount = in.readVarIntAsInt();

        int objectIdBase = 0;
        for (int i = 0; i < objectCount; i++) {
            objectIdBase = readLeasesForObject(
                    minExpirationTimePhysical,
                    commonExpirationTime,
                    nodesDictionary,
                    leases,
                    in,
                    groupIdFactory,
                    objectIdBase
            );
        }
    }
+
    /**
     * Reads all leases of one object (table/zone); 'dummy lease' markers (written for holes) yield no lease.
     *
     * @param objectIdBase previously read object ID (base for delta decoding).
     * @return this object's ID, to be used as the delta base for the next object.
     * @throws IOException if an I/O error occurs.
     */
    private static int readLeasesForObject(
            long minExpirationTimePhysical,
            HybridTimestamp commonExpirationTime,
            NodesDictionary nodesDictionary,
            List<Lease> leases,
            IgniteDataInput in,
            GroupIdFactory groupIdFactory,
            int objectIdBase
    ) throws IOException {
        int objectId = objectIdBase + in.readVarIntAsInt();

        int partitionCount = in.readVarIntAsInt();
        for (int partitionId = 0; partitionId < partitionCount; partitionId++) {
            Lease lease = readLeaseForPartition(
                    partitionId,
                    objectId,
                    minExpirationTimePhysical,
                    commonExpirationTime,
                    nodesDictionary,
                    in,
                    groupIdFactory
            );
            if (lease != null) {
                leases.add(lease);
            }
        }

        return objectId;
    }
+
    /**
     * Reads a single lease for the given partition, mirroring the layout written by {@code writeLease}.
     *
     * @return restored lease, or {@code null} if the slot contained a 'dummy lease' marker (a hole).
     * @throws IOException if an I/O error occurs.
     */
    private static @Nullable Lease readLeaseForPartition(
            int partitionId,
            int objectId,
            long minExpirationTimePhysical,
            HybridTimestamp commonExpirationTime,
            NodesDictionary nodesDictionary,
            IgniteDataInput in,
            GroupIdFactory groupIdFactory
    ) throws IOException {
        int flags = in.read();
        if (flagSet(flags, DUMMY_LEASE_MASK)) {
            // This represents a hole, just skip it.
            return null;
        }

        boolean hasProposedCandidate = flagSet(flags, HAS_PROPOSED_CANDIDATE_MASK);

        // Node references are either packed into one varint (compact mode) or written as separate varints.
        int holderNodeIndex;
        int proposedCandidateNodeIndex = -1;
        if (holderIdAndProposedCandidateFitIn1Byte(nodesDictionary)) {
            int nodesInfo = in.readVarIntAsInt();

            holderNodeIndex = unpackHolderNodeIndex(nodesInfo);

            if (hasProposedCandidate) {
                proposedCandidateNodeIndex = unpackProposedCandidateNameIndex(nodesInfo);
            }
        } else {
            holderNodeIndex = in.readVarIntAsInt();

            if (hasProposedCandidate) {
                proposedCandidateNodeIndex = in.readVarIntAsInt();
            }
        }

        UUID leaseHolderId = nodesDictionary.getNodeId(holderNodeIndex);
        String leaseHolder = nodesDictionary.getNodeName(holderNodeIndex);
        String proposedCandidate = null;
        if (hasProposedCandidate) {
            proposedCandidate = nodesDictionary.getName(proposedCandidateNodeIndex);
        }

        // Expiration time: either the batch-wide common one, or delta-decoded from the stream.
        HybridTimestamp expirationTime;
        if (flagSet(flags, HAS_UNCOMMON_EXPIRATION_TIME_MASK)) {
            long expirationPhysical = minExpirationTimePhysical + in.readVarInt();
            int expirationLogical = flagSet(flags, HAS_EXPIRATION_LOGICAL_PART_MASK) ? in.readVarIntAsInt() : 0;
            expirationTime = new HybridTimestamp(expirationPhysical, expirationLogical);
        } else {
            expirationTime = commonExpirationTime;
        }

        // Start time is not stored directly: it is derived from the expiration time and the lease period.
        long period = in.readVarInt();
        int startLogical = in.readVarIntAsInt();

        HybridTimestamp startTime = new HybridTimestamp(expirationTime.getPhysical() - period, startLogical);

        return new Lease(
                leaseHolder,
                leaseHolderId,
                startTime,
                expirationTime,
                flagSet(flags, PROLONGABLE_MASK),
                flagSet(flags, ACCEPTED_MASK),
                proposedCandidate,
                groupIdFactory.create(objectId, partitionId)
        );
    }
+
    /** Extracts the leaseholder node index (low bits) from a compact-mode packed value. */
    private static int unpackHolderNodeIndex(int nodesInfo) {
        return nodesInfo & COMPACT_HOLDER_INDEX_MASK;
    }
+
    /** Extracts the proposed candidate name index (high bits) from a compact-mode packed value. */
    private static int unpackProposedCandidateNameIndex(int nodesInfo) {
        return nodesInfo >> BIT_WIDTH_TO_FIT_IN_HALF_BYTE;
    }
+
+    private static boolean flagSet(int flags, int mask) {
+        return (flags & mask) != 0;
+    }
+
    /** Factory creating a {@link PartitionGroupId} from an object (table/zone) ID and a partition ID. */
    @FunctionalInterface
    private interface GroupIdFactory {
        PartitionGroupId create(int objectId, int partitionId);
    }
+}
diff --git 
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/NodesDictionary.java
 
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/NodesDictionary.java
new file mode 100644
index 0000000000..967a0a4fa2
--- /dev/null
+++ 
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/NodesDictionary.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.placementdriver.leases;
+
+import it.unimi.dsi.fastutil.objects.Object2IntMap;
+import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import org.apache.ignite.internal.util.io.IgniteDataInput;
+import org.apache.ignite.internal.util.io.IgniteDataOutput;
+
+/**
+ * Represents a dictionary of nodes mentioned in a {@link LeaseBatch}.
+ *
+ * <p>Has two parts: table of node names and table of nodes (these are node ID 
+ node name index pairs).
+ */
final class NodesDictionary {
    // Name table: name index -> name, plus the reverse lookup.
    private final List<String> nameIndexToName = new ArrayList<>();
    private final Object2IntMap<String> nameToNameIndex = new Object2IntOpenHashMap<>();

    // Node table: node index -> node ID, reverse lookup, and node ID -> name index.
    private final List<UUID> nodeIndexToId = new ArrayList<>();
    private final Object2IntMap<UUID> idToNodeIndex = new Object2IntOpenHashMap<>();
    private final Object2IntMap<UUID> idToNameIndex = new Object2IntOpenHashMap<>();

    NodesDictionary() {
        // -1 makes lookups of absent keys distinguishable from valid indexes (which start at 0).
        nameToNameIndex.defaultReturnValue(-1);
        idToNodeIndex.defaultReturnValue(-1);
        idToNameIndex.defaultReturnValue(-1);
    }

    /**
     * Adds a node (an ID/name pair) to the dictionary, interning the name in the name table as well.
     * Idempotent with respect to the node ID.
     *
     * @param id node ID.
     * @param name node name.
     * @return index of the node in the node table.
     */
    int putNode(UUID id, String name) {
        if (idToNodeIndex.containsKey(id)) {
            return idToNodeIndex.getInt(id);
        } else {
            int nameIndex = putName(name);

            int nodeIndex = idToNodeIndex.size();
            nodeIndexToId.add(id);
            idToNodeIndex.put(id, nodeIndex);
            idToNameIndex.put(id, nameIndex);

            return nodeIndex;
        }
    }

    /**
     * Adds a name to the name table if not already present.
     *
     * @param name name to intern.
     * @return index of the name in the name table.
     */
    int putName(String name) {
        int nameIndex;
        if (nameToNameIndex.containsKey(name)) {
            nameIndex = nameToNameIndex.getInt(name);
        } else {
            nameIndex = nameIndexToName.size();
            nameIndexToName.add(name);
            nameToNameIndex.put(name, nameIndex);
        }

        return nameIndex;
    }

    /** Returns the name stored at the given name index. */
    String getName(int nameIndex) {
        return nameIndexToName.get(nameIndex);
    }

    /** Returns the node ID stored at the given node index. */
    UUID getNodeId(int nodeIndex) {
        return nodeIndexToId.get(nodeIndex);
    }

    /** Returns the name of the node stored at the given node index. */
    String getNodeName(int nodeIndex) {
        UUID id = getNodeId(nodeIndex);
        int nameIndex = idToNameIndex.getInt(id);
        return getName(nameIndex);
    }

    /** Returns the index of the given name, or -1 if it is not in the dictionary. */
    int getNameIndex(String name) {
        return nameToNameIndex.getInt(name);
    }

    /** Returns the node index for the given node ID, or -1 if it is not in the dictionary. */
    int getNodeIndex(UUID leaseHolderId) {
        return idToNodeIndex.getInt(leaseHolderId);
    }

    /**
     * Writes this dictionary: the name table first, then the node table (each node as its ID plus its name index).
     *
     * @param out output to write to.
     * @throws IOException if an I/O error occurs.
     */
    void writeTo(IgniteDataOutput out) throws IOException {
        out.writeVarInt(nameIndexToName.size());
        for (String name : nameIndexToName) {
            out.writeUTF(name);
        }

        out.writeVarInt(nodeIndexToId.size());
        for (UUID id : nodeIndexToId) {
            out.writeUuid(id);
            out.writeVarInt(idToNameIndex.getInt(id));
        }
    }

    /**
     * Reads a dictionary written by {@link #writeTo}. Insertion order is preserved, so indexes match the writer's.
     *
     * @param in input to read from.
     * @return restored dictionary.
     * @throws IOException if an I/O error occurs.
     */
    static NodesDictionary readFrom(IgniteDataInput in) throws IOException {
        NodesDictionary dict = new NodesDictionary();

        int namesCount = in.readVarIntAsInt();
        for (int i = 0; i < namesCount; i++) {
            dict.putName(in.readUTF());
        }

        int nodesCount = in.readVarIntAsInt();
        for (int i = 0; i < nodesCount; i++) {
            UUID id = in.readUuid();
            int nameIndex = in.readVarIntAsInt();

            String name = dict.nameIndexToName.get(nameIndex);
            dict.putNode(id, name);
        }

        return dict;
    }

    /** Returns the number of distinct names in the dictionary (used to decide on compact encoding). */
    int nameCount() {
        return nameIndexToName.size();
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }

        NodesDictionary that = (NodesDictionary) o;
        return nameIndexToName.equals(that.nameIndexToName)
                && nameToNameIndex.equals(that.nameToNameIndex)
                && nodeIndexToId.equals(that.nodeIndexToId)
                && idToNodeIndex.equals(that.idToNodeIndex)
                && idToNameIndex.equals(that.idToNameIndex);
    }

    @Override
    public int hashCode() {
        int result = nameIndexToName.hashCode();
        result = 31 * result + nameToNameIndex.hashCode();
        result = 31 * result + nodeIndexToId.hashCode();
        result = 31 * result + idToNodeIndex.hashCode();
        result = 31 * result + idToNameIndex.hashCode();
        return result;
    }
}
diff --git 
a/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/leases/LeaseBatchSerializerTest.java
 
b/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/leases/LeaseBatchSerializerTest.java
new file mode 100644
index 0000000000..9081022792
--- /dev/null
+++ 
b/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/leases/LeaseBatchSerializerTest.java
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.placementdriver.leases;
+
+import static java.util.Collections.emptyList;
+import static java.util.UUID.randomUUID;
+import static java.util.stream.Collectors.toList;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.nullValue;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import java.time.LocalDateTime;
+import java.time.Month;
+import java.time.ZoneOffset;
+import java.util.Base64;
+import java.util.Iterator;
+import java.util.List;
+import java.util.UUID;
+import java.util.stream.IntStream;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.versioned.VersionedSerialization;
+import org.junit.jupiter.api.Test;
+
class LeaseBatchSerializerTest {
    private static final UUID NODE1_ID = new UUID(0x1234567890ABCDEFL, 0xFEDCBA0987654321L);
    private static final UUID NODE2_ID = new UUID(0xFEDCBA0987654321L, 0x1234567890ABCDEFL);

    private static final long STANDARD_LEASE_DURATION_MS = 5000;

    // Base64 of a batch serialized with protocol version 1 (produced by v1LeaseBatchAsBase64()); pins backward compatibility.
    private static final String SERIALIZED_WITH_V1 = "Ae++Q4mPyJLMMQEBAwZub2RlMQZub2RlMgPvzauQeFY0EiFDZYcJutz+ASFDZYcJutz+782rkHhWNBICA"
            + "gIDBwmJJwEIAmWJJwE=";

    private final LeaseBatchSerializer serializer = new LeaseBatchSerializer();

    @Test
    void emptyBatch() {
        LeaseBatch originalBatch = new LeaseBatch(emptyList());

        verifySerializationAndDeserializationGivesSameResult(originalBatch);
    }

    // Round-trips the batch and checks full equality plus proposedCandidate (which may not be part of Lease#equals).
    private void verifySerializationAndDeserializationGivesSameResult(LeaseBatch originalBatch) {
        byte[] bytes = VersionedSerialization.toBytes(originalBatch, serializer);
        LeaseBatch restoredBatch = VersionedSerialization.fromBytes(bytes, serializer);

        assertThat(restoredBatch.leases(), equalTo(originalBatch.leases()));
        assertEquals(
                originalBatch.leases().stream().map(Lease::proposedCandidate).collect(toList()),
                restoredBatch.leases().stream().map(Lease::proposedCandidate).collect(toList())
        );
    }

    @Test
    void batchWithTablePartitionsOnly() {
        HybridTimestamp baseTs = baseTs();

        // Two tables with two partitions each; leases differ in holder, times, flags and candidate presence.
        List<Lease> originalLeases = List.of(
                new Lease("node1", NODE1_ID, baseTs, expiration(baseTs), true, true, "node2", new TablePartitionId(1, 0)),
                new Lease(
                        "node2",
                        NODE2_ID,
                        baseTs.addPhysicalTime(100),
                        expiration(baseTs.addPhysicalTime(100)),
                        false,
                        false,
                        null,
                        new TablePartitionId(1, 1)
                ),
                new Lease("node1", NODE1_ID, baseTs, expiration(baseTs), true, true, "node2", new TablePartitionId(2, 0)),
                new Lease(
                        "node2",
                        NODE2_ID,
                        baseTs.addPhysicalTime(100),
                        expiration(baseTs.addPhysicalTime(100)),
                        false,
                        false,
                        null,
                        new TablePartitionId(2, 1)
                )
        );

        LeaseBatch originalBatch = new LeaseBatch(originalLeases);

        verifySerializationAndDeserializationGivesSameResult(originalBatch);
    }

    // Fixed base timestamp (2024-01-01T00:00Z) so serialized fixtures stay stable.
    private static HybridTimestamp baseTs() {
        long physicalBase = LocalDateTime.of(2024, Month.JANUARY, 1, 0, 0)
                .atOffset(ZoneOffset.UTC)
                .toInstant()
                .toEpochMilli();
        return new HybridTimestamp(physicalBase, 0);
    }

    private static HybridTimestamp expiration(HybridTimestamp startTs) {
        return expiration(startTs, STANDARD_LEASE_DURATION_MS);
    }

    private static HybridTimestamp expiration(HybridTimestamp startTs, long interval) {
        return new HybridTimestamp(startTs.getPhysical() + interval, 0);
    }

    @Test
    void batchWithTablePartitionsOnlyWithNulls() {
        HybridTimestamp baseTs = baseTs();

        // A lease without a proposed candidate exercises the 'no candidate' flag path.
        List<Lease> originalLeases = List.of(
                new Lease("node1", NODE1_ID, baseTs, expiration(baseTs), true, true, null, new TablePartitionId(1, 0))
        );
        LeaseBatch originalBatch = new LeaseBatch(originalLeases);

        verifySerializationAndDeserializationGivesSameResult(originalBatch);
    }

    @Test
    void batchWithTablePartitionsOnlyWithUncommonPeriod() {
        HybridTimestamp baseTs = baseTs();

        List<Lease> originalLeases = List.of(
                new Lease("node1", NODE1_ID, baseTs, expiration(baseTs), true, true, null, new TablePartitionId(1, 0)),
                new Lease("node1", NODE1_ID, baseTs, expiration(baseTs), true, true, null, new TablePartitionId(1, 1)),
                // This lease has an uncommon period (1000 instead of 5000).
                new Lease("node1", NODE1_ID, baseTs, expiration(baseTs, 1000), true, true, null, new TablePartitionId(1, 2))
        );
        LeaseBatch originalBatch = new LeaseBatch(originalLeases);

        verifySerializationAndDeserializationGivesSameResult(originalBatch);
    }

    @Test
    void batchWithExpirationTimeWithLogicalPart() {
        HybridTimestamp baseTs = baseTs();

        // tick() makes the logical part non-zero, exercising the HAS_EXPIRATION_LOGICAL_PART path.
        List<Lease> originalLeases = List.of(
                new Lease("node1", NODE1_ID, baseTs, expiration(baseTs).tick(), true, true, "node2", new TablePartitionId(1, 0))
        );

        LeaseBatch originalBatch = new LeaseBatch(originalLeases);

        verifySerializationAndDeserializationGivesSameResult(originalBatch);
    }

    @Test
    void batchWithExactly8NodeNames() {
        // 8 distinct names is the boundary at which compact (packed) node encoding is still allowed.
        List<Lease> originalLeases = IntStream.range(0, 8)
                .mapToObj(n -> {
                    String nodeName = "node" + n;
                    return tableLease(nodeName, randomUUID(), nodeName, n);
                })
                .collect(toList());
        LeaseBatch originalBatch = new LeaseBatch(originalLeases);

        verifySerializationAndDeserializationGivesSameResult(originalBatch);
    }

    @Test
    void batchWithMoreThan8NodeNames() {
        // Distinct candidate names push the dictionary past the compact-mode limit, exercising the non-packed path.
        List<Lease> originalLeases = IntStream.range(0, 9)
                .mapToObj(n -> tableLease("node" + n, randomUUID(), "candidate" + n, n))
                .collect(toList());
        LeaseBatch originalBatch = new LeaseBatch(originalLeases);

        verifySerializationAndDeserializationGivesSameResult(originalBatch);
    }

    private static Lease tableLease(String holderName, UUID holderId, String proposedCandidate, int partitionId) {
        TablePartitionId groupId = new TablePartitionId(1, partitionId);
        return new Lease(holderName, holderId, baseTs(), expiration(baseTs()), true, true, proposedCandidate, groupId);
    }

    @Test
    void batchWithHoleInTable() {
        // Partitions 0 and 2 only: partition 1 is a hole that must round-trip via a 'dummy lease' marker.
        List<Lease> originalLeases = IntStream.of(0, 2)
                .mapToObj(n -> {
                    String nodeName = "node" + n;
                    return tableLease(nodeName, randomUUID(), nodeName, n);
                })
                .collect(toList());
        LeaseBatch originalBatch = new LeaseBatch(originalLeases);

        verifySerializationAndDeserializationGivesSameResult(originalBatch);
    }

    @Test
    void v1CanBeDeserialized() {
        byte[] bytes = Base64.getDecoder().decode(SERIALIZED_WITH_V1);
        LeaseBatch restoredBatch = VersionedSerialization.fromBytes(bytes, serializer);

        assertThat(restoredBatch.leases(), hasSize(2));
        Iterator<Lease> iterator = restoredBatch.leases().iterator();

        Lease lease1 = iterator.next();
        assertThat(
                lease1,
                equalTo(new Lease("node1", NODE1_ID, baseTs(), expiration(baseTs()), true, true, "node2", new TablePartitionId(1, 0)))
        );
        assertThat(lease1.proposedCandidate(), is("node2"));

        Lease lease2 = iterator.next();
        assertThat(
                lease2,
                equalTo(new Lease(
                        "node2",
                        NODE2_ID,
                        baseTs().addPhysicalTime(100),
                        expiration(baseTs().addPhysicalTime(100)),
                        false,
                        false,
                        null,
                        new TablePartitionId(1, 1)
                ))
        );
        assertThat(lease2.proposedCandidate(), is(nullValue()));
    }

    // Kept (unused) as the generator of SERIALIZED_WITH_V1; rerun it manually if the fixture ever needs regeneration.
    @SuppressWarnings("unused")
    private String v1LeaseBatchAsBase64() {
        HybridTimestamp baseTs = baseTs();

        List<Lease> originalLeases = List.of(
                new Lease("node1", NODE1_ID, baseTs, expiration(baseTs), true, true, "node2", new TablePartitionId(1, 0)),
                new Lease(
                        "node2",
                        NODE2_ID,
                        baseTs.addPhysicalTime(100),
                        expiration(baseTs.addPhysicalTime(100)),
                        false,
                        false,
                        null,
                        new TablePartitionId(1, 1)
                )
        );

        LeaseBatch originalBatch = new LeaseBatch(originalLeases);

        byte[] originalBytes = VersionedSerialization.toBytes(originalBatch, serializer);
        return Base64.getEncoder().encodeToString(originalBytes);
    }
}
diff --git 
a/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/leases/LeaseSerializationTest.java
 
b/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/leases/LeaseSerializationTest.java
index b856f945ae..a9cc35fc09 100644
--- 
a/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/leases/LeaseSerializationTest.java
+++ 
b/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/leases/LeaseSerializationTest.java
@@ -31,32 +31,10 @@ import org.junit.jupiter.api.Test;
 
 /** Tests for lease encoding and decoding from byte arrays. */
 public class LeaseSerializationTest {
-    @Test
-    public void testLeaseSerialization() {
-        long now = System.currentTimeMillis();
-        ReplicationGroupId groupId = new TablePartitionId(1, 1);
-
-        checksSerialization(Lease.emptyLease(groupId));
-
-        checksSerialization(newLease("node1", timestamp(now, 1), timestamp(now 
+ 1_000_000, 100), true, true, null, groupId));
-
-        checksSerialization(newLease("node1", timestamp(now, 1), timestamp(now 
+ 1_000_000, 100), false, false, "node2", groupId));
-
-        checksSerialization(newLease("node1", timestamp(now, 1), timestamp(now 
+ 1_000_000, 100), false, true, "node2", groupId));
-
-        checksSerialization(newLease("node1", timestamp(now, 1), timestamp(now 
+ 1_000_000, 100), true, false, null, groupId));
-
-        checksSerialization(newLease(null, timestamp(1, 1), timestamp(2 + 
1_000_000, 100), true, true, null, groupId));
-
-        checksSerialization(newLease("node" + new String(new byte[1000]), 
timestamp(1, 1), timestamp(2, 100), false, false, null, groupId));
-    }
-
     @Test
     public void testLeaseBatchSerialization() {
         var leases = new ArrayList<Lease>();
 
-        ReplicationGroupId groupId = new TablePartitionId(1, 1);
-
         for (int i = 0; i < 25; i++) {
             leases.add(newLease(
                     "node" + i,
@@ -65,7 +43,7 @@ public class LeaseSerializationTest {
                     i % 2 == 0,
                     i % 2 == 1,
                     i % 2 == 0 ? null : "node" + i,
-                    groupId
+                    new TablePartitionId(1, i)
             ));
         }
 
@@ -74,12 +52,8 @@ public class LeaseSerializationTest {
         assertEquals(leases, 
LeaseBatch.fromBytes(wrap(leaseBatchBytes)).leases());
     }
 
-    private static void checksSerialization(Lease lease) {
-        assertEquals(lease, Lease.fromBytes(wrap(lease.bytes())));
-    }
-
     private static Lease newLease(
-            @Nullable String leaseholder,
+            String leaseholder,
             HybridTimestamp startTime,
             HybridTimestamp expirationTime,
             boolean prolong,
@@ -89,7 +63,7 @@ public class LeaseSerializationTest {
     ) {
         return new Lease(
                 leaseholder,
-                leaseholder == null ? null : randomUUID(),
+                randomUUID(),
                 startTime,
                 expirationTime,
                 prolong,
diff --git 
a/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/leases/NodesDictionaryTest.java
 
b/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/leases/NodesDictionaryTest.java
new file mode 100644
index 0000000000..28c39e1a0b
--- /dev/null
+++ 
b/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/leases/NodesDictionaryTest.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.placementdriver.leases;
+
+import static java.util.UUID.randomUUID;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+
+import java.util.UUID;
+import org.apache.ignite.internal.util.io.IgniteDataInput;
+import org.apache.ignite.internal.util.io.IgniteDataOutput;
+import org.apache.ignite.internal.util.io.IgniteUnsafeDataInput;
+import org.apache.ignite.internal.util.io.IgniteUnsafeDataOutput;
+import org.junit.jupiter.api.Test;
+
+class NodesDictionaryTest {
+    private final NodesDictionary dictionary = new NodesDictionary();
+
+    @Test
+    void putsNames() {
+        dictionary.putName("a");
+        dictionary.putName("b");
+
+        assertThat(dictionary.getName(0), is("a"));
+        assertThat(dictionary.getName(1), is("b"));
+    }
+
+    @Test
+    void puttingNewNameReturnsNewIndex() {
+        assertThat(dictionary.putName("a"), is(0));
+        assertThat(dictionary.putName("b"), is(1));
+    }
+
+    @Test
+    void puttingExistingNameReturnsSameIndex() {
+        dictionary.putName("a");
+
+        assertThat(dictionary.putName("a"), is(0));
+    }
+
+    @Test
+    void putsNodes() {
+        UUID id1 = randomUUID();
+        UUID id2 = randomUUID();
+        dictionary.putNode(id1, "a");
+        dictionary.putNode(id2, "b");
+
+        assertThat(dictionary.getName(0), is("a"));
+        assertThat(dictionary.getName(1), is("b"));
+        assertThat(dictionary.getNodeId(0), is(id1));
+        assertThat(dictionary.getNodeId(1), is(id2));
+        assertThat(dictionary.getNodeName(0), is("a"));
+        assertThat(dictionary.getNodeName(1), is("b"));
+    }
+
+    @Test
+    void puttingNewNodeReturnsNewIndex() {
+        assertThat(dictionary.putNode(randomUUID(), "a"), is(0));
+        assertThat(dictionary.putNode(randomUUID(), "b"), is(1));
+    }
+
+    @Test
+    void puttingExistingNodeReturnsSameIndex() {
+        UUID id = randomUUID();
+        dictionary.putNode(id, "a");
+
+        assertThat(dictionary.putNode(id, "a"), is(0));
+    }
+
+    @Test
+    void supportsNodesWithSameName() {
+        UUID id1 = randomUUID();
+        UUID id2 = randomUUID();
+        dictionary.putNode(id1, "a");
+        dictionary.putNode(id2, "a");
+
+        assertThat(dictionary.getName(0), is("a"));
+        assertThat(dictionary.getNodeId(0), is(id1));
+        assertThat(dictionary.getNodeId(1), is(id2));
+        assertThat(dictionary.getNodeName(0), is("a"));
+        assertThat(dictionary.getNodeName(1), is("a"));
+    }
+
+    @Test
+    void putNodeAddsName() {
+        dictionary.putNode(randomUUID(), "a");
+
+        assertThat(dictionary.putName("a"), is(0));
+    }
+
+    @Test
+    void namesAndNodesHaveIndependentIndexes() {
+        assertThat(dictionary.putName("a"), is(0));
+        assertThat(dictionary.putNode(randomUUID(), "b"), is(0));
+    }
+
+    @Test
+    void nameCountIsZeroInitially() {
+        assertThat(dictionary.nameCount(), is(0));
+    }
+
+    @Test
+    void nameCountIsIncreasedWhenPuttingNewName() {
+        dictionary.putName("a");
+        assertThat(dictionary.nameCount(), is(1));
+
+        dictionary.putName("b");
+        assertThat(dictionary.nameCount(), is(2));
+    }
+
+    @Test
+    void nameCountIsNotIncreasedWhenPuttingExistingName() {
+        dictionary.putName("a");
+        dictionary.putName("a");
+
+        assertThat(dictionary.nameCount(), is(1));
+    }
+
+    @Test
+    void nameCountIsIncreasedWhenPuttingNewNode() {
+        dictionary.putNode(randomUUID(), "a");
+        assertThat(dictionary.nameCount(), is(1));
+
+        dictionary.putNode(randomUUID(), "b");
+        assertThat(dictionary.nameCount(), is(2));
+    }
+
+    @Test
+    void nameCountIsNotIncreasedWhenPuttingExistingNode() {
+        UUID id = randomUUID();
+        dictionary.putNode(id, "a");
+        dictionary.putNode(id, "a");
+
+        assertThat(dictionary.nameCount(), is(1));
+    }
+
+    @Test
+    void serializationAndDeserialization() throws Exception {
+        dictionary.putName("stray-name");
+        dictionary.putNode(randomUUID(), "node1");
+        dictionary.putNode(randomUUID(), "node2");
+        dictionary.putNode(randomUUID(), "node2");
+
+        IgniteDataOutput out = new IgniteUnsafeDataOutput(100);
+        dictionary.writeTo(out);
+        byte[] bytes = out.array();
+
+        IgniteDataInput in = new IgniteUnsafeDataInput(bytes);
+        NodesDictionary restoredDict = NodesDictionary.readFrom(in);
+
+        assertThat(restoredDict, equalTo(dictionary));
+    }
+}
diff --git a/modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/ResetClusterMessagePersistentSerializer.java b/modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/ResetClusterMessagePersistentSerializer.java
index bc740063f1..8d159491fc 100644
--- a/modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/ResetClusterMessagePersistentSerializer.java
+++ b/modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/ResetClusterMessagePersistentSerializer.java
@@ -17,8 +17,6 @@
 
 package org.apache.ignite.internal.disaster.system;
 
-import static java.nio.charset.StandardCharsets.UTF_8;
-
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashSet;
@@ -67,20 +65,6 @@ public class ResetClusterMessagePersistentSerializer extends VersionedSerializer
         }
     }
 
-    private static void writeStringSet(Set<String> strings, IgniteDataOutput out) throws IOException {
-        out.writeVarInt(strings.size());
-        for (String str : strings) {
-            out.writeUTF(str);
-        }
-    }
-
-    private static void writeNullableString(@Nullable String str, IgniteDataOutput out) throws IOException {
-        out.writeVarInt(str == null ? -1 : str.length());
-        if (str != null) {
-            out.writeByteArray(str.getBytes(UTF_8));
-        }
-    }
-
     @Override
     protected ResetClusterMessage readExternalData(byte protoVer, IgniteDataInput in) throws IOException {
         Set<String> newCmgNodes = readStringSet(in);
@@ -106,12 +90,6 @@ public class ResetClusterMessagePersistentSerializer extends VersionedSerializer
                 .build();
     }
 
-    private static Set<String> readStringSet(IgniteDataInput in) throws IOException {
-        int size = in.readVarIntAsInt();
-
-        return readStringSet(size, in);
-    }
-
     private static Set<String> readStringSet(int size, IgniteDataInput in) throws IOException {
         Set<String> result = new HashSet<>(size);
         for (int i = 0; i < size; i++) {
@@ -121,15 +99,6 @@ public class ResetClusterMessagePersistentSerializer extends VersionedSerializer
         return result;
     }
 
-    private static @Nullable String readNullableString(IgniteDataInput in) throws IOException {
-        int lengthOrMinusOne = in.readVarIntAsInt();
-        if (lengthOrMinusOne == -1) {
-            return null;
-        }
-
-        return new String(in.readByteArray(lengthOrMinusOne), UTF_8);
-    }
-
     private static List<UUID> readFormerClusterIds(IgniteDataInput in) throws IOException {
         int length = in.readVarIntAsInt();
 
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/ManualGroupRestartRequestSerializer.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/ManualGroupRestartRequestSerializer.java
index ec53a418b0..e8a4348828 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/ManualGroupRestartRequestSerializer.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/ManualGroupRestartRequestSerializer.java
@@ -21,10 +21,8 @@ import static org.apache.ignite.internal.table.distributed.disaster.DisasterReco
 import static org.apache.ignite.internal.table.distributed.disaster.DisasterRecoveryRequestsSerialization.writeVarIntSet;
 
 import java.io.IOException;
-import java.util.HashSet;
 import java.util.Set;
 import java.util.UUID;
-import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.internal.util.io.IgniteDataInput;
 import org.apache.ignite.internal.util.io.IgniteDataOutput;
 import org.apache.ignite.internal.versioned.VersionedSerializer;
@@ -42,12 +40,7 @@ class ManualGroupRestartRequestSerializer extends VersionedSerializer<ManualGrou
         out.writeVarInt(request.zoneId());
         out.writeVarInt(request.tableId());
         writeVarIntSet(request.partitionIds(), out);
-
-        out.writeVarInt(request.nodeNames().size());
-        for (String nodeName : request.nodeNames()) {
-            out.writeUTF(nodeName);
-        }
-
+        writeStringSet(request.nodeNames(), out);
         // Writing long and not a varlong as the latter requires 9 bytes for hybrid timestamps.
         out.writeLong(request.assignmentsTimestamp());
     }
@@ -63,15 +56,4 @@ class ManualGroupRestartRequestSerializer extends VersionedSerializer<ManualGrou
 
         return new ManualGroupRestartRequest(operationId, zoneId, tableId, partitionIds, nodeNames, assignmentsTimestamp);
     }
-
-    private static Set<String> readStringSet(IgniteDataInput in) throws IOException {
-        int size = in.readVarIntAsInt();
-
-        Set<String> result = new HashSet<>(IgniteUtils.capacity(size));
-        for (int i = 0; i < size; i++) {
-            result.add(in.readUTF());
-        }
-
-        return result;
-    }
 }

Reply via email to