This is an automated email from the ASF dual-hosted git repository.
rpuch pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 9e21f50be0 IGNITE-23496 Optimize lease batch serialization (#4608)
9e21f50be0 is described below
commit 9e21f50be02f78afcf5755a43f3c2baf7f006c98
Author: Roman Puchkovskiy <[email protected]>
AuthorDate: Fri Oct 25 18:19:17 2024 +0400
IGNITE-23496 Optimize lease batch serialization (#4608)
---
.../ClusterStatePersistentSerializer.java | 38 --
...plicationGroupId.java => PartitionGroupId.java} | 11 +-
.../internal/replicator/ReplicationGroupId.java | 4 +-
.../internal/replicator/TablePartitionId.java | 9 +-
.../internal/versioned/VersionedSerializer.java | 40 ++
.../internal/placementdriver/ReplicaMeta.java | 3 +-
.../internal/placementdriver/leases/Lease.java | 111 -----
.../placementdriver/leases/LeaseBatch.java | 10 +-
.../leases/LeaseBatchSerializer.java | 534 +++++++++++++++++++++
.../placementdriver/leases/NodesDictionary.java | 161 +++++++
.../leases/LeaseBatchSerializerTest.java | 258 ++++++++++
.../leases/LeaseSerializationTest.java | 32 +-
.../leases/NodesDictionaryTest.java | 169 +++++++
.../ResetClusterMessagePersistentSerializer.java | 31 --
.../ManualGroupRestartRequestSerializer.java | 20 +-
15 files changed, 1188 insertions(+), 243 deletions(-)
diff --git
a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterStatePersistentSerializer.java
b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterStatePersistentSerializer.java
index 7bd3240751..6806207e47 100644
---
a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterStatePersistentSerializer.java
+++
b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterStatePersistentSerializer.java
@@ -17,13 +17,9 @@
package org.apache.ignite.internal.cluster.management;
-import static java.nio.charset.StandardCharsets.UTF_8;
-
import java.io.IOException;
import java.util.ArrayList;
-import java.util.HashSet;
import java.util.List;
-import java.util.Set;
import java.util.UUID;
import
org.apache.ignite.internal.cluster.management.network.messages.CmgMessagesFactory;
import org.apache.ignite.internal.util.io.IgniteDataInput;
@@ -58,20 +54,6 @@ public class ClusterStatePersistentSerializer extends
VersionedSerializer<Cluste
}
}
- private static void writeStringSet(Set<String> strings, IgniteDataOutput
out) throws IOException {
- out.writeVarInt(strings.size());
- for (String str : strings) {
- out.writeUTF(str);
- }
- }
-
- private static void writeNullableString(@Nullable String str,
IgniteDataOutput out) throws IOException {
- out.writeVarInt(str == null ? -1 : str.length());
- if (str != null) {
- out.writeByteArray(str.getBytes(UTF_8));
- }
- }
-
@Override
protected ClusterState readExternalData(byte protoVer, IgniteDataInput in)
throws IOException {
return CMG_MSGS_FACTORY.clusterState()
@@ -84,26 +66,6 @@ public class ClusterStatePersistentSerializer extends
VersionedSerializer<Cluste
.build();
}
- private static Set<String> readStringSet(IgniteDataInput in) throws
IOException {
- int size = in.readVarIntAsInt();
-
- Set<String> result = new HashSet<>(size);
- for (int i = 0; i < size; i++) {
- result.add(in.readUTF());
- }
-
- return result;
- }
-
- private static @Nullable String readNullableString(IgniteDataInput in)
throws IOException {
- int lengthOrMinusOne = in.readVarIntAsInt();
- if (lengthOrMinusOne == -1) {
- return null;
- }
-
- return new String(in.readByteArray(lengthOrMinusOne), UTF_8);
- }
-
private static @Nullable List<UUID> readFormerClusterIds(IgniteDataInput
in) throws IOException {
int length = in.readVarIntAsInt();
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/replicator/ReplicationGroupId.java
b/modules/core/src/main/java/org/apache/ignite/internal/replicator/PartitionGroupId.java
similarity index 75%
copy from
modules/core/src/main/java/org/apache/ignite/internal/replicator/ReplicationGroupId.java
copy to
modules/core/src/main/java/org/apache/ignite/internal/replicator/PartitionGroupId.java
index cc3fb4dcb5..666bd065ca 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/replicator/ReplicationGroupId.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/replicator/PartitionGroupId.java
@@ -17,10 +17,13 @@
package org.apache.ignite.internal.replicator;
-import java.io.Serializable;
-
/**
- * The interface represents a replication group identifier.
+ * A {@link ReplicationGroupId} which corresponds to partition of a
partitioned object.
*/
-public interface ReplicationGroupId extends Serializable {
+public interface PartitionGroupId extends ReplicationGroupId {
+ /** Returns ID of the partitioned object. */
+ int objectId();
+
+ /** Returns partition ID. */
+ int partitionId();
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/replicator/ReplicationGroupId.java
b/modules/core/src/main/java/org/apache/ignite/internal/replicator/ReplicationGroupId.java
index cc3fb4dcb5..8348d22995 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/replicator/ReplicationGroupId.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/replicator/ReplicationGroupId.java
@@ -17,10 +17,8 @@
package org.apache.ignite.internal.replicator;
-import java.io.Serializable;
-
/**
* The interface represents a replication group identifier.
*/
-public interface ReplicationGroupId extends Serializable {
+public interface ReplicationGroupId {
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/replicator/TablePartitionId.java
b/modules/core/src/main/java/org/apache/ignite/internal/replicator/TablePartitionId.java
index fba5ff7768..26a2518cc3 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/replicator/TablePartitionId.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/replicator/TablePartitionId.java
@@ -18,10 +18,11 @@
package org.apache.ignite.internal.replicator;
// TODO: https://issues.apache.org/jira/browse/IGNITE-19170 Should be
refactored to ZonePartitionId.
+
/**
* The class is used to identify a table replication group.
*/
-public class TablePartitionId implements ReplicationGroupId {
+public class TablePartitionId implements PartitionGroupId {
/** Table id. */
private final int tableId;
@@ -52,11 +53,17 @@ public class TablePartitionId implements ReplicationGroupId
{
return new TablePartitionId(Integer.parseInt(parts[0]),
Integer.parseInt(parts[1]));
}
+ @Override
+ public int objectId() {
+ return tableId;
+ }
+
/**
* Get the partition id.
*
* @return Partition id.
*/
+ @Override
public int partitionId() {
return partId;
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/versioned/VersionedSerializer.java
b/modules/core/src/main/java/org/apache/ignite/internal/versioned/VersionedSerializer.java
index a2d78c5d5d..5cf2ecf977 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/versioned/VersionedSerializer.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/versioned/VersionedSerializer.java
@@ -17,9 +17,15 @@
package org.apache.ignite.internal.versioned;
+import static java.nio.charset.StandardCharsets.UTF_8;
+
import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.util.io.IgniteDataInput;
import org.apache.ignite.internal.util.io.IgniteDataOutput;
+import org.jetbrains.annotations.Nullable;
/**
* Serializes and deserializes objects in a versioned way: that is, includes
version to make it possible to deserialize objects serialized
@@ -95,4 +101,38 @@ public abstract class VersionedSerializer<T> {
return readExternalData(ver, in);
}
+
+ protected static void writeNullableString(@Nullable String str,
IgniteDataOutput out) throws IOException {
+ out.writeVarInt(str == null ? -1 : str.length());
+ if (str != null) {
+ out.writeByteArray(str.getBytes(UTF_8));
+ }
+ }
+
+ protected static @Nullable String readNullableString(IgniteDataInput in)
throws IOException {
+ int lengthOrMinusOne = in.readVarIntAsInt();
+ if (lengthOrMinusOne == -1) {
+ return null;
+ }
+
+ return new String(in.readByteArray(lengthOrMinusOne), UTF_8);
+ }
+
+ protected static void writeStringSet(Set<String> strings, IgniteDataOutput
out) throws IOException {
+ out.writeVarInt(strings.size());
+ for (String str : strings) {
+ out.writeUTF(str);
+ }
+ }
+
+ protected static Set<String> readStringSet(IgniteDataInput in) throws
IOException {
+ int size = in.readVarIntAsInt();
+
+ Set<String> result = new HashSet<>(IgniteUtils.capacity(size));
+ for (int i = 0; i < size; i++) {
+ result.add(in.readUTF());
+ }
+
+ return result;
+ }
}
diff --git
a/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/ReplicaMeta.java
b/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/ReplicaMeta.java
index 288da9af9f..76f9e4f557 100644
---
a/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/ReplicaMeta.java
+++
b/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/ReplicaMeta.java
@@ -17,13 +17,12 @@
package org.apache.ignite.internal.placementdriver;
-import java.io.Serializable;
import java.util.UUID;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.jetbrains.annotations.Nullable;
/** Replica lease meta. */
-public interface ReplicaMeta extends Serializable {
+public interface ReplicaMeta {
/** Gets a leaseholder node consistent ID (assigned to a node once),
{@code null} if nothing holds the lease. */
@Nullable String getLeaseholder();
diff --git
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/Lease.java
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/Lease.java
index e22d8a435a..bad5b2f269 100644
---
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/Lease.java
+++
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/Lease.java
@@ -17,23 +17,15 @@
package org.apache.ignite.internal.placementdriver.leases;
-import static java.nio.ByteOrder.LITTLE_ENDIAN;
-import static
org.apache.ignite.internal.hlc.HybridTimestamp.HYBRID_TIMESTAMP_SIZE;
import static org.apache.ignite.internal.hlc.HybridTimestamp.MIN_VALUE;
import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestamp;
-import static org.apache.ignite.internal.util.ArrayUtils.BYTE_EMPTY_ARRAY;
-import static org.apache.ignite.internal.util.ByteUtils.stringFromBytes;
-import static org.apache.ignite.internal.util.ByteUtils.stringToBytes;
-import static org.apache.ignite.internal.util.ByteUtils.toBytes;
-import java.nio.ByteBuffer;
import java.util.Objects;
import java.util.UUID;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.placementdriver.ReplicaMeta;
import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.internal.tostring.S;
-import org.apache.ignite.internal.util.ByteUtils;
import org.jetbrains.annotations.Nullable;
/**
@@ -41,8 +33,6 @@ import org.jetbrains.annotations.Nullable;
* The real lease is stored in Meta storage.
*/
public class Lease implements ReplicaMeta {
- private static final long serialVersionUID = 394641185393949608L;
-
/** Node consistent ID (assigned to a node once), {@code null} if nothing
holds the lease. */
private final @Nullable String leaseholder;
@@ -199,63 +189,6 @@ public class Lease implements ReplicaMeta {
return proposedCandidate;
}
- /**
- * Encodes this lease into sequence of bytes.
- *
- * @return Lease representation in a byte array.
- */
- public byte[] bytes() {
- byte[] leaseholderBytes = stringToBytes(leaseholder);
- byte[] leaseholderIdBytes = leaseholderId == null ? null :
stringToBytes(leaseholderId.toString());
- byte[] proposedCandidateBytes = stringToBytes(proposedCandidate);
- byte[] groupIdBytes = toBytes(replicationGroupId);
-
- int bufSize = 2 // accepted + prolongable
- + HYBRID_TIMESTAMP_SIZE * 2 // startTime + expirationTime
- + bytesSizeForWrite(leaseholderBytes) +
bytesSizeForWrite(leaseholderIdBytes) +
bytesSizeForWrite(proposedCandidateBytes)
- + bytesSizeForWrite(groupIdBytes);
-
- ByteBuffer buf = ByteBuffer.allocate(bufSize).order(LITTLE_ENDIAN);
-
- putBoolean(buf, accepted);
- putBoolean(buf, prolongable);
-
- putHybridTimestamp(buf, startTime);
- putHybridTimestamp(buf, expirationTime);
-
- putBytes(buf, leaseholderBytes);
- putBytes(buf, leaseholderIdBytes);
- putBytes(buf, proposedCandidateBytes);
- putBytes(buf, groupIdBytes);
-
- return buf.array();
- }
-
- /**
- * Decodes a lease from the sequence of bytes.
- *
- * @param buf Byte buffer containing lease representation. Requires to be
in little-endian.
- * @return Decoded lease.
- */
- public static Lease fromBytes(ByteBuffer buf) {
- assert buf.order() == LITTLE_ENDIAN;
-
- boolean accepted = getBoolean(buf);
- boolean prolongable = getBoolean(buf);
-
- HybridTimestamp startTime = getHybridTimestamp(buf);
- HybridTimestamp expirationTime = getHybridTimestamp(buf);
-
- String leaseholder = stringFromBytes(getBytes(buf));
- String leaseholderIdString = stringFromBytes(getBytes(buf));
- UUID leaseholderId = leaseholderIdString == null ? null :
UUID.fromString(leaseholderIdString);
- String proposedCandidate = stringFromBytes(getBytes(buf));
-
- ReplicationGroupId groupId = ByteUtils.fromBytes(getBytes(buf));
-
- return new Lease(leaseholder, leaseholderId, startTime,
expirationTime, prolongable, accepted, proposedCandidate, groupId);
- }
-
/**
* Returns a lease that no one holds and is always expired.
*
@@ -289,48 +222,4 @@ public class Lease implements ReplicaMeta {
public int hashCode() {
return Objects.hash(leaseholder, leaseholderId, accepted, startTime,
expirationTime, prolongable, replicationGroupId);
}
-
- private static int bytesSizeForWrite(byte @Nullable [] bytes) {
- return Integer.BYTES + (bytes == null ? 0 : bytes.length);
- }
-
- private static void putBoolean(ByteBuffer buffer, boolean b) {
- buffer.put((byte) (b ? 1 : 0));
- }
-
- private static boolean getBoolean(ByteBuffer buffer) {
- return buffer.get() == 1;
- }
-
- private static void putHybridTimestamp(ByteBuffer buffer, HybridTimestamp
hybridTimestamp) {
- buffer.putLong(hybridTimestamp.longValue());
- }
-
- private static HybridTimestamp getHybridTimestamp(ByteBuffer buffer) {
- return hybridTimestamp(buffer.getLong());
- }
-
- private static void putBytes(ByteBuffer buffer, byte @Nullable [] bytes) {
- buffer.putInt(bytes == null ? -1 : bytes.length);
-
- if (bytes != null) {
- buffer.put(bytes);
- }
- }
-
- private static byte @Nullable [] getBytes(ByteBuffer buffer) {
- int bytesLen = buffer.getInt();
-
- if (bytesLen < 0) {
- return null;
- } else if (bytesLen == 0) {
- return BYTE_EMPTY_ARRAY;
- }
-
- byte[] bytes = new byte[bytesLen];
-
- buffer.get(bytes);
-
- return bytes;
- }
}
diff --git
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseBatch.java
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseBatch.java
index 2ef68105e5..a6900a0ab1 100644
---
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseBatch.java
+++
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseBatch.java
@@ -17,11 +17,10 @@
package org.apache.ignite.internal.placementdriver.leases;
-import static org.apache.ignite.internal.util.IgniteUtils.bytesToList;
-import static org.apache.ignite.internal.util.IgniteUtils.collectionToBytes;
-
import java.nio.ByteBuffer;
import java.util.Collection;
+import org.apache.ignite.internal.util.ByteUtils;
+import org.apache.ignite.internal.versioned.VersionedSerialization;
/**
* Representation of leases batch.
@@ -38,10 +37,11 @@ public class LeaseBatch {
}
public byte[] bytes() {
- return collectionToBytes(leases, Lease::bytes);
+ return VersionedSerialization.toBytes(this,
LeaseBatchSerializer.INSTANCE);
}
public static LeaseBatch fromBytes(ByteBuffer bytes) {
- return new LeaseBatch(bytesToList(bytes, Lease::fromBytes));
+ byte[] byteArray = ByteUtils.toByteArray(bytes);
+ return VersionedSerialization.fromBytes(byteArray,
LeaseBatchSerializer.INSTANCE);
}
}
diff --git
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseBatchSerializer.java
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseBatchSerializer.java
new file mode 100644
index 0000000000..b0e9b76680
--- /dev/null
+++
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseBatchSerializer.java
@@ -0,0 +1,534 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.placementdriver.leases;
+
+import static java.util.Comparator.comparing;
+import static java.util.stream.Collectors.groupingBy;
+import static java.util.stream.Collectors.toList;
+
+import it.unimi.dsi.fastutil.objects.Object2IntMap;
+import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.TreeMap;
+import java.util.UUID;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.replicator.PartitionGroupId;
+import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.replicator.ZonePartitionId;
+import org.apache.ignite.internal.util.io.IgniteDataInput;
+import org.apache.ignite.internal.util.io.IgniteDataOutput;
+import org.apache.ignite.internal.versioned.VersionedSerializer;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * {@link VersionedSerializer} for {@link LeaseBatch} instances.
+ *
+ * <p>Format grammar:
+ * <pre>{@code
+ * <BATCH> ::= <HEADER> <TABLE_LEASES_SECTION>
+ *
+ * <HEADER> ::=
+ * <MIN_EXPIRATION_TIME_PHYSICAL_PART> (varint) // Self-explanatory
+ * <COMMON_EXPIRATION_TIME_PHYSICAL_PART_DELTA> (varint) // Delta
between physical part of common expiration time and min exp. time
+ * <COMMON_EXPIRATION_TIME_LOGICAL_PART> (varint) // Logical part of
most common expiration time
+ * <NODE_DICTIONARY>
+ *
+ * <NODE_DICTIONARY> ::=
+ * <NAME_COUNT> (varint)
+ * { <NODE_NAME> (UTF) } (nameCount times)
+ * <NODE_COUNT> (varint)
+ * {
+ * <NODE_ID> (UUID)
+ * <NODE_NAME_INDEX> (varint) // Index in the name table (defined
above)
+ * } (nodeCount times)
+ *
+ * <TABLE_LEASES_SECTION> ::=
+ * <TABLE_COUNT> (varint)
+ * { <OBJECT_LEASES> } // tableCount of OBJECT_LEASES elements
+ *
+ * <OBJECT_LEASES> ::=
+ * <OBJECT_ID_DELTA> (varint) // For first object in section, it's full
object ID; for subsequent ones, it's object ID minus
+ * // previous object ID
+ * <LEASE_COUNT> (varint) // Number of leases (including holes) for
current object
+ * { <LEASE> } // leaseCount times
+ *
+ * <LEASE> ::=
+ * <FLAGS> (byte)
+ * [ // Only present if DUMMY_LEASE flag is 0
+ * <HOLDER_AND_PROPOSED_CANDIDATE>
+ * [ <EXPIRATION_TIME> ] // Only written if
HAS_UNCOMMON_EXPIRATION_TIME flag is 1
+ * <START_TIME>
+ * ]
+ *
+ * <HOLDER_AND_PROPOSED_CANDIDATE> ::=
+ * <HOLDER_AND_PROPOSED_CANDIDATE_COMPACTLY> (byte) // Only if number
of node names in the dictionary is 8 or less; lowest 3 bits
+ * // encode holder
index (in the nodes table), and next 3 bits are for proposed
+ * // candidate index
(in the names table)
+ * | <HOLDER_INDEX> (varint) [ <PROPOSED_CANDIDATE> (varint) ] //
Proposed candidate is only written if HAS_PROPOSED_CANDIDATE
+ * // flag
is 1
+ * <EXPIRATION_TIME> ::=
+ * <EXPIRATION_TIME_PHYSICAL_PART_DELTA> (varint) // Relative to
minExpirationTimePhysicalPart
+ * [ <EXPIRATION_TIME_LOGICAL_PART> ] (varint) // Only written if
HAS_EXPIRATION_LOGICAL_PART flag is 1
+ *
+ * <START_TIME> ::=
+ * <PERIOD> // Difference between physical parts of expirationTime and
startTime
+ * <START_TIME_LOGICAL_PART> (varint)
+ * }</pre>
+ *
+ * <p>The following optimizations are applied to minimize the amount of bytes
a batch is serialized to:</p>
+ * <ul>
+ * <li>Java Serialization is not used to serialize components (it's pretty
verbose)</li>
+ * <li>Varints are used extensively</li>
+ * <li>A dictionary of all nodes mentioned as lease holders and proposed
candidates is collected and written in the header once
+ * per batch. This is beneficial as we usually have a lot more leases than
the number of nodes in the cluster, so we can just represent nodes
+ * and their names as indices in the dictionary (this is especially
effective given we use varints as indices are usually very small).
+ * </li>
+ * <li>Leases are grouped per table/zone ID (aka object ID), so an object
ID is only written once per table</li>
+ * </ul>
+ */
+public class LeaseBatchSerializer extends VersionedSerializer<LeaseBatch> {
+ /** Serializer instance. */
+ public static final LeaseBatchSerializer INSTANCE = new
LeaseBatchSerializer();
+
+ /** Contains {@link Lease#isAccepted()}. */
+ @SuppressWarnings("PointlessBitwiseExpression")
+ private static final int ACCEPTED_MASK = 1 << 0;
+
+ /** Contains {@link Lease#isProlongable()}. */
+ private static final int PROLONGABLE_MASK = 1 << 1;
+
+ /** Whether the lease has a non-null {@link Lease#proposedCandidate()}. */
+ private static final int HAS_PROPOSED_CANDIDATE_MASK = 1 << 2;
+
+ /** Whether expiration time differs from the most common expiration time
in the batch. */
+ private static final int HAS_UNCOMMON_EXPIRATION_TIME_MASK = 1 << 3;
+
+ /** Whether expiration timestamp logical part is not zero (this is
uncommon). */
+ private static final int HAS_EXPIRATION_LOGICAL_PART_MASK = 1 << 4;
+
+ /** Whether this is not a real lease, but a hole in partitionId sequence.
Having this flag allows us to omit partitionId. */
+ private static final int DUMMY_LEASE_MASK = 1 << 5;
+
+ // When there are no more than 8 nodes in the cluster, node name index and node
index are guaranteed to fit in 7 bits we have in a varint
+ // byte, which allows us to enable 'compact mode' to save 1 byte per lease.
+
+ /** Number of bits to fit name index/node index in to enable compact mode.
*/
+ private static final int BIT_WIDTH_TO_FIT_IN_HALF_BYTE = 3;
+
+ /** Max size of cluster which allows compact mode. */
+ private static final int MAX_NODES_FOR_COMPACT_MODE = 1 <<
BIT_WIDTH_TO_FIT_IN_HALF_BYTE;
+
+ /** Mask to extract lease holder index from compact representation. */
+ private static final int COMPACT_HOLDER_INDEX_MASK = (1 <<
BIT_WIDTH_TO_FIT_IN_HALF_BYTE) - 1;
+
+ @Override
+ protected void writeExternalData(LeaseBatch batch, IgniteDataOutput out)
throws IOException {
+ long minExpirationTimePhysical = minExpirationTimePhysicalPart(batch);
+ HybridTimestamp commonExpirationTime =
mostFrequentExpirationTime(batch);
+
+ out.writeVarInt(minExpirationTimePhysical);
+ out.writeVarInt(commonExpirationTime.getPhysical() -
minExpirationTimePhysical);
+ out.writeVarInt(commonExpirationTime.getLogical());
+
+ NodesDictionary nodesDictionary = buildNodesDictionary(batch);
+ nodesDictionary.writeTo(out);
+
+ List<Lease> tableLeases = batch.leases().stream()
+ .filter(lease -> lease.replicationGroupId() instanceof
TablePartitionId)
+ .collect(toList());
+ List<Lease> zoneLeases = batch.leases().stream()
+ .filter(lease -> lease.replicationGroupId() instanceof
ZonePartitionId)
+ .collect(toList());
+ assert tableLeases.size() + zoneLeases.size() == batch.leases().size()
: "There are " + batch.leases().size()
+ + " leases in total, "
+ + tableLeases.size() + " of them are table leases, " +
zoneLeases.size() + " are zone leases, but "
+ + (batch.leases().size() - tableLeases.size() -
zoneLeases.size()) + " are neither";
+
+ writePartitionedGroupLeases(tableLeases, minExpirationTimePhysical,
commonExpirationTime, nodesDictionary, out);
+
+ assert zoneLeases.isEmpty() : "There are zone leases which are not
supported yet";
+ }
+
+ private static long minExpirationTimePhysicalPart(LeaseBatch batch) {
+ long min = HybridTimestamp.MAX_VALUE.getPhysical();
+
+ for (Lease lease : batch.leases()) {
+ min = Math.min(min, lease.getExpirationTime().getPhysical());
+ }
+
+ return min;
+ }
+
+ private static HybridTimestamp mostFrequentExpirationTime(LeaseBatch
batch) {
+ if (batch.leases().isEmpty()) {
+ return HybridTimestamp.MIN_VALUE;
+ }
+
+ Object2IntMap<HybridTimestamp> counts = new Object2IntOpenHashMap<>();
+
+ for (Lease lease : batch.leases()) {
+ counts.mergeInt(lease.getExpirationTime(), 1, Integer::sum);
+ }
+
+ HybridTimestamp commonExpirationTime = HybridTimestamp.MIN_VALUE;
+ int maxCount = -1;
+ for (Object2IntMap.Entry<HybridTimestamp> entry :
counts.object2IntEntrySet()) {
+ if (entry.getIntValue() > maxCount) {
+ commonExpirationTime = entry.getKey();
+ maxCount = entry.getIntValue();
+ }
+ }
+
+ return commonExpirationTime;
+ }
+
+ private static NodesDictionary buildNodesDictionary(LeaseBatch batch) {
+ NodesDictionary nodesDictionary = new NodesDictionary();
+
+ for (Lease lease : batch.leases()) {
+ if (lease.getLeaseholderId() != null) {
+ assert lease.getLeaseholder() != null : lease;
+ nodesDictionary.putNode(lease.getLeaseholderId(),
lease.getLeaseholder());
+ }
+ if (lease.proposedCandidate() != null) {
+ //noinspection DataFlowIssue
+ nodesDictionary.putName(lease.proposedCandidate());
+ }
+ }
+
+ return nodesDictionary;
+ }
+
+ private static void writePartitionedGroupLeases(
+ List<Lease> leases,
+ long minExpirationTimePhysical,
+ HybridTimestamp commonExpirationTime,
+ NodesDictionary nodesDictionary,
+ IgniteDataOutput out
+ ) throws IOException {
+ Map<Integer, List<Lease>> leasesByObjectId = leases.stream()
+ .collect(
+ groupingBy(
+ lease ->
partitionedGroupIdFrom(lease).objectId(),
+ TreeMap::new,
+ toList()
+ )
+ );
+
+ out.writeVarInt(leasesByObjectId.size());
+
+ int objectIdBase = 0;
+ for (Entry<Integer, List<Lease>> entry : leasesByObjectId.entrySet()) {
+ int objectId = entry.getKey();
+ List<Lease> objectLeases = entry.getValue();
+
+ objectIdBase = writeLeasesForObject(
+ objectId,
+ objectLeases,
+ minExpirationTimePhysical,
+ commonExpirationTime,
+ nodesDictionary,
+ out,
+ objectIdBase
+ );
+ }
+ }
+
+ private static PartitionGroupId partitionedGroupIdFrom(Lease lease) {
+ return (PartitionGroupId) lease.replicationGroupId();
+ }
+
+ private static int writeLeasesForObject(
+ int objectId,
+ List<Lease> objectLeases,
+ long minExpirationTimePhysical,
+ HybridTimestamp commonExpirationTime,
+ NodesDictionary nodesDictionary,
+ IgniteDataOutput out,
+ int objectIdBase
+ ) throws IOException {
+
objectLeases.sort(comparing(LeaseBatchSerializer::partitionedGroupIdFrom,
comparing(PartitionGroupId::partitionId)));
+
+ out.writeVarInt(objectId - objectIdBase);
+
+ int partitionCount =
partitionedGroupIdFrom(objectLeases.get(objectLeases.size() - 1)).partitionId()
+ 1;
+ out.writeVarInt(partitionCount);
+
+ int partitionId = 0;
+ for (Lease lease : objectLeases) {
+ partitionId = writeLease(lease, partitionId,
minExpirationTimePhysical, commonExpirationTime, nodesDictionary, out);
+ }
+
+ return objectId;
+ }
+
+ private static int writeLease(
+ Lease lease,
+ int partitionId,
+ long minExpirationTimePhysical,
+ HybridTimestamp commonExpirationTime,
+ NodesDictionary nodesDictionary,
+ IgniteDataOutput out
+ ) throws IOException {
+ PartitionGroupId groupId = partitionedGroupIdFrom(lease);
+
+ while (partitionId < groupId.partitionId()) {
+ // It's a hole in partitionId sequence, let's write a 'dummy
lease'.
+ out.write(DUMMY_LEASE_MASK);
+ partitionId++;
+ }
+
+ assert partitionId == groupId.partitionId() : "Duplicate partitionId
in " + lease;
+
+ assert lease.getLeaseholder() != null && lease.getLeaseholderId() !=
null : lease + " doesn't have a leaseholder";
+ assert lease.getStartTime() != HybridTimestamp.MIN_VALUE : lease + "
has illegal start time";
+ assert lease.getExpirationTime() != HybridTimestamp.MIN_VALUE : lease
+ " has illegal expiration time";
+
+ UUID leaseHolderId = lease.getLeaseholderId();
+ String proposedCandidate = lease.proposedCandidate();
+ boolean hasProposedCandidate = proposedCandidate != null;
+
+ boolean hasUncommonExpirationTime =
!Objects.equals(lease.getExpirationTime(), commonExpirationTime);
+ boolean hasExpirationLogicalPart =
lease.getExpirationTime().getLogical() != 0;
+
+ out.write(flags(
+ lease.isAccepted(),
+ lease.isProlongable(),
+ hasProposedCandidate,
+ hasUncommonExpirationTime,
+ hasExpirationLogicalPart
+ ));
+
+ if (holderIdAndProposedCandidateFitIn1Byte(nodesDictionary)) {
+ int nodesInfo = packNodesInfo(
+ nodesDictionary.getNodeIndex(leaseHolderId),
+ hasProposedCandidate ?
nodesDictionary.getNameIndex(proposedCandidate) : 0
+ );
+ out.writeVarInt(nodesInfo);
+ } else {
+ out.writeVarInt(nodesDictionary.getNodeIndex(leaseHolderId));
+ if (hasProposedCandidate) {
+
out.writeVarInt(nodesDictionary.getNameIndex(proposedCandidate));
+ }
+ }
+
+ if (hasUncommonExpirationTime) {
+ out.writeVarInt(lease.getExpirationTime().getPhysical() -
minExpirationTimePhysical);
+ if (hasExpirationLogicalPart) {
+ out.writeVarInt(lease.getExpirationTime().getLogical());
+ }
+ }
+
+ long periodIn = lease.getExpirationTime().getPhysical() -
lease.getStartTime().getPhysical();
+ out.writeVarInt(periodIn);
+ out.writeVarInt(lease.getStartTime().getLogical());
+
+ return partitionId + 1;
+ }
+
+ private static int packNodesInfo(int holderNodeIndex, int
proposedCandidateNameIndex) {
+ assert holderNodeIndex < MAX_NODES_FOR_COMPACT_MODE : holderNodeIndex;
+ assert proposedCandidateNameIndex < MAX_NODES_FOR_COMPACT_MODE :
proposedCandidateNameIndex;
+
+ return holderNodeIndex | (proposedCandidateNameIndex <<
BIT_WIDTH_TO_FIT_IN_HALF_BYTE);
+ }
+
+ private static boolean
holderIdAndProposedCandidateFitIn1Byte(NodesDictionary dictionary) {
+ // Up to 8 names means that for name index it's enough to have 3 bits,
same for node index, so, in sum, they
+ // require up to 6 bits, and we have 7 bits in a varint byte.
+ return dictionary.nameCount() <= MAX_NODES_FOR_COMPACT_MODE;
+ }
+
+ private static int flags(
+ boolean accepted,
+ boolean prolongable,
+ boolean hasProposedCandidate,
+ boolean hasUncommonExpirationTime,
+ boolean hasExpirationLogicalPart
+ ) {
+ return (accepted ? ACCEPTED_MASK : 0)
+ | (prolongable ? PROLONGABLE_MASK : 0)
+ | (hasProposedCandidate ? HAS_PROPOSED_CANDIDATE_MASK : 0)
+ | (hasUncommonExpirationTime ?
HAS_UNCOMMON_EXPIRATION_TIME_MASK : 0)
+ | (hasExpirationLogicalPart ? HAS_EXPIRATION_LOGICAL_PART_MASK
: 0);
+ }
+
+ @Override
+ protected LeaseBatch readExternalData(byte protoVer, IgniteDataInput in)
throws IOException {
+ long minExpirationTimePhysical = in.readVarInt();
+ HybridTimestamp commonExpirationTime = new
HybridTimestamp(minExpirationTimePhysical + in.readVarInt(),
in.readVarIntAsInt());
+ NodesDictionary nodesDictionary = NodesDictionary.readFrom(in);
+
+ List<Lease> leases = new ArrayList<>();
+
+ readPartitionedGroupLeases(
+ minExpirationTimePhysical,
+ commonExpirationTime,
+ nodesDictionary,
+ leases,
+ in,
+ TablePartitionId::new
+ );
+
+ return new LeaseBatch(leases);
+ }
+
+ private static void readPartitionedGroupLeases(
+ long minExpirationTimePhysical,
+ HybridTimestamp commonExpirationTime,
+ NodesDictionary nodesDictionary,
+ List<Lease> leases,
+ IgniteDataInput in,
+ GroupIdFactory groupIdFactory
+ ) throws IOException {
+ int objectCount = in.readVarIntAsInt();
+
+ int objectIdBase = 0;
+ for (int i = 0; i < objectCount; i++) {
+ objectIdBase = readLeasesForObject(
+ minExpirationTimePhysical,
+ commonExpirationTime,
+ nodesDictionary,
+ leases,
+ in,
+ groupIdFactory,
+ objectIdBase
+ );
+ }
+ }
+
+ private static int readLeasesForObject(
+ long minExpirationTimePhysical,
+ HybridTimestamp commonExpirationTime,
+ NodesDictionary nodesDictionary,
+ List<Lease> leases,
+ IgniteDataInput in,
+ GroupIdFactory groupIdFactory,
+ int objectIdBase
+ ) throws IOException {
+ int objectId = objectIdBase + in.readVarIntAsInt();
+
+ int partitionCount = in.readVarIntAsInt();
+ for (int partitionId = 0; partitionId < partitionCount; partitionId++)
{
+ Lease lease = readLeaseForPartition(
+ partitionId,
+ objectId,
+ minExpirationTimePhysical,
+ commonExpirationTime,
+ nodesDictionary,
+ in,
+ groupIdFactory
+ );
+ if (lease != null) {
+ leases.add(lease);
+ }
+ }
+
+ return objectId;
+ }
+
+ private static @Nullable Lease readLeaseForPartition(
+ int partitionId,
+ int objectId,
+ long minExpirationTimePhysical,
+ HybridTimestamp commonExpirationTime,
+ NodesDictionary nodesDictionary,
+ IgniteDataInput in,
+ GroupIdFactory groupIdFactory
+ ) throws IOException {
+ int flags = in.read();
+ if (flagSet(flags, DUMMY_LEASE_MASK)) {
+ // This represents a hole, just skip it.
+ return null;
+ }
+
+ boolean hasProposedCandidate = flagSet(flags,
HAS_PROPOSED_CANDIDATE_MASK);
+
+ int holderNodeIndex;
+ int proposedCandidateNodeIndex = -1;
+ if (holderIdAndProposedCandidateFitIn1Byte(nodesDictionary)) {
+ int nodesInfo = in.readVarIntAsInt();
+
+ holderNodeIndex = unpackHolderNodeIndex(nodesInfo);
+
+ if (hasProposedCandidate) {
+ proposedCandidateNodeIndex =
unpackProposedCandidateNameIndex(nodesInfo);
+ }
+ } else {
+ holderNodeIndex = in.readVarIntAsInt();
+
+ if (hasProposedCandidate) {
+ proposedCandidateNodeIndex = in.readVarIntAsInt();
+ }
+ }
+
+ UUID leaseHolderId = nodesDictionary.getNodeId(holderNodeIndex);
+ String leaseHolder = nodesDictionary.getNodeName(holderNodeIndex);
+ String proposedCandidate = null;
+ if (hasProposedCandidate) {
+ proposedCandidate =
nodesDictionary.getName(proposedCandidateNodeIndex);
+ }
+
+ HybridTimestamp expirationTime;
+ if (flagSet(flags, HAS_UNCOMMON_EXPIRATION_TIME_MASK)) {
+ long expirationPhysical = minExpirationTimePhysical +
in.readVarInt();
+ int expirationLogical = flagSet(flags,
HAS_EXPIRATION_LOGICAL_PART_MASK) ? in.readVarIntAsInt() : 0;
+ expirationTime = new HybridTimestamp(expirationPhysical,
expirationLogical);
+ } else {
+ expirationTime = commonExpirationTime;
+ }
+
+ long period = in.readVarInt();
+ int startLogical = in.readVarIntAsInt();
+
+ HybridTimestamp startTime = new
HybridTimestamp(expirationTime.getPhysical() - period, startLogical);
+
+ return new Lease(
+ leaseHolder,
+ leaseHolderId,
+ startTime,
+ expirationTime,
+ flagSet(flags, PROLONGABLE_MASK),
+ flagSet(flags, ACCEPTED_MASK),
+ proposedCandidate,
+ groupIdFactory.create(objectId, partitionId)
+ );
+ }
+
+ private static int unpackHolderNodeIndex(int nodesInfo) {
+ return nodesInfo & COMPACT_HOLDER_INDEX_MASK;
+ }
+
+ private static int unpackProposedCandidateNameIndex(int nodesInfo) {
+ return nodesInfo >> BIT_WIDTH_TO_FIT_IN_HALF_BYTE;
+ }
+
+ private static boolean flagSet(int flags, int mask) {
+ return (flags & mask) != 0;
+ }
+
    /** Factory creating a {@link PartitionGroupId} from an object ID and a partition ID. */
    @FunctionalInterface
    private interface GroupIdFactory {
        PartitionGroupId create(int objectId, int partitionId);
    }
+}
diff --git
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/NodesDictionary.java
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/NodesDictionary.java
new file mode 100644
index 0000000000..967a0a4fa2
--- /dev/null
+++
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/NodesDictionary.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.placementdriver.leases;
+
+import it.unimi.dsi.fastutil.objects.Object2IntMap;
+import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import org.apache.ignite.internal.util.io.IgniteDataInput;
+import org.apache.ignite.internal.util.io.IgniteDataOutput;
+
+/**
+ * Represents a dictionary of nodes mentioned in a {@link LeaseBatch}.
+ *
+ * <p>Has two parts: table of node names and table of nodes (these are node ID
+ node name index pairs).
+ */
final class NodesDictionary {
    /** Name table: name index -> name. */
    private final List<String> nameIndexToName = new ArrayList<>();
    /** Reverse of {@link #nameIndexToName}: name -> name index ({@code -1} if absent). */
    private final Object2IntMap<String> nameToNameIndex = new Object2IntOpenHashMap<>();

    /** Node table: node index -> node ID. */
    private final List<UUID> nodeIndexToId = new ArrayList<>();
    /** Node ID -> node index ({@code -1} if absent). */
    private final Object2IntMap<UUID> idToNodeIndex = new Object2IntOpenHashMap<>();
    /** Node ID -> index of the node's name in the name table ({@code -1} if absent). */
    private final Object2IntMap<UUID> idToNameIndex = new Object2IntOpenHashMap<>();

    NodesDictionary() {
        // -1 is the 'not found' marker returned by all lookups.
        nameToNameIndex.defaultReturnValue(-1);
        idToNodeIndex.defaultReturnValue(-1);
        idToNameIndex.defaultReturnValue(-1);
    }

    /**
     * Adds a node (ID + name pair) to the dictionary and returns its node index; if the node is already
     * present, the existing index is returned. The name is also added to the name table if needed.
     */
    int putNode(UUID id, String name) {
        if (idToNodeIndex.containsKey(id)) {
            return idToNodeIndex.getInt(id);
        } else {
            int nameIndex = putName(name);

            int nodeIndex = idToNodeIndex.size();
            nodeIndexToId.add(id);
            idToNodeIndex.put(id, nodeIndex);
            idToNameIndex.put(id, nameIndex);

            return nodeIndex;
        }
    }

    /**
     * Adds a name to the name table and returns its index; if the name is already present, the existing
     * index is returned.
     */
    int putName(String name) {
        int nameIndex;
        if (nameToNameIndex.containsKey(name)) {
            nameIndex = nameToNameIndex.getInt(name);
        } else {
            nameIndex = nameIndexToName.size();
            nameIndexToName.add(name);
            nameToNameIndex.put(name, nameIndex);
        }

        return nameIndex;
    }

    /** Returns the name at the given name index. */
    String getName(int nameIndex) {
        return nameIndexToName.get(nameIndex);
    }

    /** Returns the node ID at the given node index. */
    UUID getNodeId(int nodeIndex) {
        return nodeIndexToId.get(nodeIndex);
    }

    /** Returns the name of the node at the given node index. */
    String getNodeName(int nodeIndex) {
        UUID id = getNodeId(nodeIndex);
        int nameIndex = idToNameIndex.getInt(id);
        return getName(nameIndex);
    }

    /** Returns the index of the given name, or {@code -1} if it is not in the dictionary. */
    int getNameIndex(String name) {
        return nameToNameIndex.getInt(name);
    }

    /** Returns the node index for the given node ID, or {@code -1} if it is not in the dictionary. */
    int getNodeIndex(UUID leaseHolderId) {
        return idToNodeIndex.getInt(leaseHolderId);
    }

    /** Writes this dictionary to the given output: the name table first, then the node table. */
    void writeTo(IgniteDataOutput out) throws IOException {
        out.writeVarInt(nameIndexToName.size());
        for (String name : nameIndexToName) {
            out.writeUTF(name);
        }

        out.writeVarInt(nodeIndexToId.size());
        for (UUID id : nodeIndexToId) {
            out.writeUuid(id);
            out.writeVarInt(idToNameIndex.getInt(id));
        }
    }

    /** Reads a dictionary previously written with {@link #writeTo(IgniteDataOutput)}. */
    static NodesDictionary readFrom(IgniteDataInput in) throws IOException {
        NodesDictionary dict = new NodesDictionary();

        int namesCount = in.readVarIntAsInt();
        for (int i = 0; i < namesCount; i++) {
            dict.putName(in.readUTF());
        }

        int nodesCount = in.readVarIntAsInt();
        for (int i = 0; i < nodesCount; i++) {
            UUID id = in.readUuid();
            int nameIndex = in.readVarIntAsInt();

            String name = dict.nameIndexToName.get(nameIndex);
            dict.putNode(id, name);
        }

        return dict;
    }

    /** Returns the number of names in the name table. */
    int nameCount() {
        return nameIndexToName.size();
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }

        NodesDictionary that = (NodesDictionary) o;
        return nameIndexToName.equals(that.nameIndexToName)
                && nameToNameIndex.equals(that.nameToNameIndex)
                && nodeIndexToId.equals(that.nodeIndexToId)
                && idToNodeIndex.equals(that.idToNodeIndex)
                && idToNameIndex.equals(that.idToNameIndex);
    }

    @Override
    public int hashCode() {
        int result = nameIndexToName.hashCode();
        result = 31 * result + nameToNameIndex.hashCode();
        result = 31 * result + nodeIndexToId.hashCode();
        result = 31 * result + idToNodeIndex.hashCode();
        result = 31 * result + idToNameIndex.hashCode();
        return result;
    }
}
diff --git
a/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/leases/LeaseBatchSerializerTest.java
b/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/leases/LeaseBatchSerializerTest.java
new file mode 100644
index 0000000000..9081022792
--- /dev/null
+++
b/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/leases/LeaseBatchSerializerTest.java
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.placementdriver.leases;
+
+import static java.util.Collections.emptyList;
+import static java.util.UUID.randomUUID;
+import static java.util.stream.Collectors.toList;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.nullValue;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import java.time.LocalDateTime;
+import java.time.Month;
+import java.time.ZoneOffset;
+import java.util.Base64;
+import java.util.Iterator;
+import java.util.List;
+import java.util.UUID;
+import java.util.stream.IntStream;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.versioned.VersionedSerialization;
+import org.junit.jupiter.api.Test;
+
/** Tests for {@code LeaseBatchSerializer}. */
class LeaseBatchSerializerTest {
    private static final UUID NODE1_ID = new UUID(0x1234567890ABCDEFL, 0xFEDCBA0987654321L);
    private static final UUID NODE2_ID = new UUID(0xFEDCBA0987654321L, 0x1234567890ABCDEFL);

    private static final long STANDARD_LEASE_DURATION_MS = 5000;

    // Canned v1 payload for the backward-compatibility test; see v1LeaseBatchAsBase64() below,
    // which presumably was used to produce it.
    private static final String SERIALIZED_WITH_V1 = "Ae++Q4mPyJLMMQEBAwZub2RlMQZub2RlMgPvzauQeFY0EiFDZYcJutz+ASFDZYcJutz+782rkHhWNBICA"
            + "gIDBwmJJwEIAmWJJwE=";

    private final LeaseBatchSerializer serializer = new LeaseBatchSerializer();

    @Test
    void emptyBatch() {
        LeaseBatch originalBatch = new LeaseBatch(emptyList());

        verifySerializationAndDeserializationGivesSameResult(originalBatch);
    }

    // Round-trips the batch and checks leases (and, separately, proposed candidates) survive intact.
    private void verifySerializationAndDeserializationGivesSameResult(LeaseBatch originalBatch) {
        byte[] bytes = VersionedSerialization.toBytes(originalBatch, serializer);
        LeaseBatch restoredBatch = VersionedSerialization.fromBytes(bytes, serializer);

        assertThat(restoredBatch.leases(), equalTo(originalBatch.leases()));
        assertEquals(
                originalBatch.leases().stream().map(Lease::proposedCandidate).collect(toList()),
                restoredBatch.leases().stream().map(Lease::proposedCandidate).collect(toList())
        );
    }

    @Test
    void batchWithTablePartitionsOnly() {
        HybridTimestamp baseTs = baseTs();

        List<Lease> originalLeases = List.of(
                new Lease("node1", NODE1_ID, baseTs, expiration(baseTs), true, true, "node2", new TablePartitionId(1, 0)),
                new Lease(
                        "node2",
                        NODE2_ID,
                        baseTs.addPhysicalTime(100),
                        expiration(baseTs.addPhysicalTime(100)),
                        false,
                        false,
                        null,
                        new TablePartitionId(1, 1)
                ),
                new Lease("node1", NODE1_ID, baseTs, expiration(baseTs), true, true, "node2", new TablePartitionId(2, 0)),
                new Lease(
                        "node2",
                        NODE2_ID,
                        baseTs.addPhysicalTime(100),
                        expiration(baseTs.addPhysicalTime(100)),
                        false,
                        false,
                        null,
                        new TablePartitionId(2, 1)
                )
        );

        LeaseBatch originalBatch = new LeaseBatch(originalLeases);

        verifySerializationAndDeserializationGivesSameResult(originalBatch);
    }

    // A fixed timestamp (2024-01-01T00:00Z) so that serialized forms are reproducible.
    private static HybridTimestamp baseTs() {
        long physicalBase = LocalDateTime.of(2024, Month.JANUARY, 1, 0, 0)
                .atOffset(ZoneOffset.UTC)
                .toInstant()
                .toEpochMilli();
        return new HybridTimestamp(physicalBase, 0);
    }

    private static HybridTimestamp expiration(HybridTimestamp startTs) {
        return expiration(startTs, STANDARD_LEASE_DURATION_MS);
    }

    private static HybridTimestamp expiration(HybridTimestamp startTs, long interval) {
        return new HybridTimestamp(startTs.getPhysical() + interval, 0);
    }

    @Test
    void batchWithTablePartitionsOnlyWithNulls() {
        HybridTimestamp baseTs = baseTs();

        List<Lease> originalLeases = List.of(
                new Lease("node1", NODE1_ID, baseTs, expiration(baseTs), true, true, null, new TablePartitionId(1, 0))
        );
        LeaseBatch originalBatch = new LeaseBatch(originalLeases);

        verifySerializationAndDeserializationGivesSameResult(originalBatch);
    }

    @Test
    void batchWithTablePartitionsOnlyWithUncommonPeriod() {
        HybridTimestamp baseTs = baseTs();

        List<Lease> originalLeases = List.of(
                new Lease("node1", NODE1_ID, baseTs, expiration(baseTs), true, true, null, new TablePartitionId(1, 0)),
                new Lease("node1", NODE1_ID, baseTs, expiration(baseTs), true, true, null, new TablePartitionId(1, 1)),
                // This lease has an uncommon period (1000 instead of 5000).
                new Lease("node1", NODE1_ID, baseTs, expiration(baseTs, 1000), true, true, null, new TablePartitionId(1, 2))
        );
        LeaseBatch originalBatch = new LeaseBatch(originalLeases);

        verifySerializationAndDeserializationGivesSameResult(originalBatch);
    }

    @Test
    void batchWithExpirationTimeWithLogicalPart() {
        HybridTimestamp baseTs = baseTs();

        // tick() gives the expiration a non-zero logical part.
        List<Lease> originalLeases = List.of(
                new Lease("node1", NODE1_ID, baseTs, expiration(baseTs).tick(), true, true, "node2", new TablePartitionId(1, 0))
        );

        LeaseBatch originalBatch = new LeaseBatch(originalLeases);

        verifySerializationAndDeserializationGivesSameResult(originalBatch);
    }

    @Test
    void batchWithExactly8NodeNames() {
        // 8 names is the boundary of the compact (packed-into-one-byte) node info mode.
        List<Lease> originalLeases = IntStream.range(0, 8)
                .mapToObj(n -> {
                    String nodeName = "node" + n;
                    return tableLease(nodeName, randomUUID(), nodeName, n);
                })
                .collect(toList());
        LeaseBatch originalBatch = new LeaseBatch(originalLeases);

        verifySerializationAndDeserializationGivesSameResult(originalBatch);
    }

    @Test
    void batchWithMoreThan8NodeNames() {
        List<Lease> originalLeases = IntStream.range(0, 9)
                .mapToObj(n -> tableLease("node" + n, randomUUID(), "candidate" + n, n))
                .collect(toList());
        LeaseBatch originalBatch = new LeaseBatch(originalLeases);

        verifySerializationAndDeserializationGivesSameResult(originalBatch);
    }

    private static Lease tableLease(String holderName, UUID holderId, String proposedCandidate, int partitionId) {
        TablePartitionId groupId = new TablePartitionId(1, partitionId);
        return new Lease(holderName, holderId, baseTs(), expiration(baseTs()), true, true, proposedCandidate, groupId);
    }

    @Test
    void batchWithHoleInTable() {
        // Partitions 0 and 2 only: partition 1 is a hole in the sequence.
        List<Lease> originalLeases = IntStream.of(0, 2)
                .mapToObj(n -> {
                    String nodeName = "node" + n;
                    return tableLease(nodeName, randomUUID(), nodeName, n);
                })
                .collect(toList());
        LeaseBatch originalBatch = new LeaseBatch(originalLeases);

        verifySerializationAndDeserializationGivesSameResult(originalBatch);
    }

    @Test
    void v1CanBeDeserialized() {
        byte[] bytes = Base64.getDecoder().decode(SERIALIZED_WITH_V1);
        LeaseBatch restoredBatch = VersionedSerialization.fromBytes(bytes, serializer);

        assertThat(restoredBatch.leases(), hasSize(2));
        Iterator<Lease> iterator = restoredBatch.leases().iterator();

        Lease lease1 = iterator.next();
        assertThat(
                lease1,
                equalTo(new Lease("node1", NODE1_ID, baseTs(), expiration(baseTs()), true, true, "node2", new TablePartitionId(1, 0)))
        );
        assertThat(lease1.proposedCandidate(), is("node2"));

        Lease lease2 = iterator.next();
        assertThat(
                lease2,
                equalTo(new Lease(
                        "node2",
                        NODE2_ID,
                        baseTs().addPhysicalTime(100),
                        expiration(baseTs().addPhysicalTime(100)),
                        false,
                        false,
                        null,
                        new TablePartitionId(1, 1)
                ))
        );
        assertThat(lease2.proposedCandidate(), is(nullValue()));
    }

    // Kept (though unused) so that SERIALIZED_WITH_V1 can be regenerated if needed.
    @SuppressWarnings("unused")
    private String v1LeaseBatchAsBase64() {
        HybridTimestamp baseTs = baseTs();

        List<Lease> originalLeases = List.of(
                new Lease("node1", NODE1_ID, baseTs, expiration(baseTs), true, true, "node2", new TablePartitionId(1, 0)),
                new Lease(
                        "node2",
                        NODE2_ID,
                        baseTs.addPhysicalTime(100),
                        expiration(baseTs.addPhysicalTime(100)),
                        false,
                        false,
                        null,
                        new TablePartitionId(1, 1)
                )
        );

        LeaseBatch originalBatch = new LeaseBatch(originalLeases);

        byte[] originalBytes = VersionedSerialization.toBytes(originalBatch, serializer);
        return Base64.getEncoder().encodeToString(originalBytes);
    }
}
diff --git
a/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/leases/LeaseSerializationTest.java
b/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/leases/LeaseSerializationTest.java
index b856f945ae..a9cc35fc09 100644
---
a/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/leases/LeaseSerializationTest.java
+++
b/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/leases/LeaseSerializationTest.java
@@ -31,32 +31,10 @@ import org.junit.jupiter.api.Test;
/** Tests for lease encoding and decoding from byte arrays. */
public class LeaseSerializationTest {
- @Test
- public void testLeaseSerialization() {
- long now = System.currentTimeMillis();
- ReplicationGroupId groupId = new TablePartitionId(1, 1);
-
- checksSerialization(Lease.emptyLease(groupId));
-
- checksSerialization(newLease("node1", timestamp(now, 1), timestamp(now
+ 1_000_000, 100), true, true, null, groupId));
-
- checksSerialization(newLease("node1", timestamp(now, 1), timestamp(now
+ 1_000_000, 100), false, false, "node2", groupId));
-
- checksSerialization(newLease("node1", timestamp(now, 1), timestamp(now
+ 1_000_000, 100), false, true, "node2", groupId));
-
- checksSerialization(newLease("node1", timestamp(now, 1), timestamp(now
+ 1_000_000, 100), true, false, null, groupId));
-
- checksSerialization(newLease(null, timestamp(1, 1), timestamp(2 +
1_000_000, 100), true, true, null, groupId));
-
- checksSerialization(newLease("node" + new String(new byte[1000]),
timestamp(1, 1), timestamp(2, 100), false, false, null, groupId));
- }
-
@Test
public void testLeaseBatchSerialization() {
var leases = new ArrayList<Lease>();
- ReplicationGroupId groupId = new TablePartitionId(1, 1);
-
for (int i = 0; i < 25; i++) {
leases.add(newLease(
"node" + i,
@@ -65,7 +43,7 @@ public class LeaseSerializationTest {
i % 2 == 0,
i % 2 == 1,
i % 2 == 0 ? null : "node" + i,
- groupId
+ new TablePartitionId(1, i)
));
}
@@ -74,12 +52,8 @@ public class LeaseSerializationTest {
assertEquals(leases,
LeaseBatch.fromBytes(wrap(leaseBatchBytes)).leases());
}
- private static void checksSerialization(Lease lease) {
- assertEquals(lease, Lease.fromBytes(wrap(lease.bytes())));
- }
-
private static Lease newLease(
- @Nullable String leaseholder,
+ String leaseholder,
HybridTimestamp startTime,
HybridTimestamp expirationTime,
boolean prolong,
@@ -89,7 +63,7 @@ public class LeaseSerializationTest {
) {
return new Lease(
leaseholder,
- leaseholder == null ? null : randomUUID(),
+ randomUUID(),
startTime,
expirationTime,
prolong,
diff --git
a/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/leases/NodesDictionaryTest.java
b/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/leases/NodesDictionaryTest.java
new file mode 100644
index 0000000000..28c39e1a0b
--- /dev/null
+++
b/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/leases/NodesDictionaryTest.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.placementdriver.leases;
+
+import static java.util.UUID.randomUUID;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+
+import java.util.UUID;
+import org.apache.ignite.internal.util.io.IgniteDataInput;
+import org.apache.ignite.internal.util.io.IgniteDataOutput;
+import org.apache.ignite.internal.util.io.IgniteUnsafeDataInput;
+import org.apache.ignite.internal.util.io.IgniteUnsafeDataOutput;
+import org.junit.jupiter.api.Test;
+
/** Tests for {@link NodesDictionary}. */
class NodesDictionaryTest {
    private final NodesDictionary dictionary = new NodesDictionary();

    @Test
    void putsNames() {
        dictionary.putName("a");
        dictionary.putName("b");

        assertThat(dictionary.getName(0), is("a"));
        assertThat(dictionary.getName(1), is("b"));
    }

    @Test
    void puttingNewNameReturnsNewIndex() {
        assertThat(dictionary.putName("a"), is(0));
        assertThat(dictionary.putName("b"), is(1));
    }

    @Test
    void puttingExistingNameReturnsSameIndex() {
        dictionary.putName("a");

        assertThat(dictionary.putName("a"), is(0));
    }

    @Test
    void putsNodes() {
        UUID id1 = randomUUID();
        UUID id2 = randomUUID();
        dictionary.putNode(id1, "a");
        dictionary.putNode(id2, "b");

        assertThat(dictionary.getName(0), is("a"));
        assertThat(dictionary.getName(1), is("b"));
        assertThat(dictionary.getNodeId(0), is(id1));
        assertThat(dictionary.getNodeId(1), is(id2));
        assertThat(dictionary.getNodeName(0), is("a"));
        assertThat(dictionary.getNodeName(1), is("b"));
    }

    @Test
    void puttingNewNodeReturnsNewIndex() {
        assertThat(dictionary.putNode(randomUUID(), "a"), is(0));
        assertThat(dictionary.putNode(randomUUID(), "b"), is(1));
    }

    @Test
    void puttingExistingNodeReturnsSameIndex() {
        UUID id = randomUUID();
        dictionary.putNode(id, "a");

        assertThat(dictionary.putNode(id, "a"), is(0));
    }

    @Test
    void supportsNodesWithSameName() {
        // Two distinct node IDs may share one dictionary name entry.
        UUID id1 = randomUUID();
        UUID id2 = randomUUID();
        dictionary.putNode(id1, "a");
        dictionary.putNode(id2, "a");

        assertThat(dictionary.getName(0), is("a"));
        assertThat(dictionary.getNodeId(0), is(id1));
        assertThat(dictionary.getNodeId(1), is(id2));
        assertThat(dictionary.getNodeName(0), is("a"));
        assertThat(dictionary.getNodeName(1), is("a"));
    }

    @Test
    void putNodeAddsName() {
        dictionary.putNode(randomUUID(), "a");

        assertThat(dictionary.putName("a"), is(0));
    }

    @Test
    void namesAndNodesHaveIndependentIndexes() {
        assertThat(dictionary.putName("a"), is(0));
        assertThat(dictionary.putNode(randomUUID(), "b"), is(0));
    }

    @Test
    void nameCountIsZeroInitially() {
        assertThat(dictionary.nameCount(), is(0));
    }

    @Test
    void nameCountIsIncreasedWhenPuttingNewName() {
        dictionary.putName("a");
        assertThat(dictionary.nameCount(), is(1));

        dictionary.putName("b");
        assertThat(dictionary.nameCount(), is(2));
    }

    @Test
    void nameCountIsNotIncreasedWhenPuttingExistingName() {
        dictionary.putName("a");
        dictionary.putName("a");

        assertThat(dictionary.nameCount(), is(1));
    }

    @Test
    void nameCountIsIncreasedWhenPuttingNewNode() {
        dictionary.putNode(randomUUID(), "a");
        assertThat(dictionary.nameCount(), is(1));

        dictionary.putNode(randomUUID(), "b");
        assertThat(dictionary.nameCount(), is(2));
    }

    @Test
    void nameCountIsNotIncreasedWhenPuttingExistingNode() {
        UUID id = randomUUID();
        dictionary.putNode(id, "a");
        dictionary.putNode(id, "a");

        assertThat(dictionary.nameCount(), is(1));
    }

    @Test
    void serializationAndDeserialization() throws Exception {
        // Includes a name not referenced by any node and two nodes sharing a name.
        dictionary.putName("stray-name");
        dictionary.putNode(randomUUID(), "node1");
        dictionary.putNode(randomUUID(), "node2");
        dictionary.putNode(randomUUID(), "node2");

        IgniteDataOutput out = new IgniteUnsafeDataOutput(100);
        dictionary.writeTo(out);
        byte[] bytes = out.array();

        IgniteDataInput in = new IgniteUnsafeDataInput(bytes);
        NodesDictionary restoredDict = NodesDictionary.readFrom(in);

        assertThat(restoredDict, equalTo(dictionary));
    }
}
diff --git
a/modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/ResetClusterMessagePersistentSerializer.java
b/modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/ResetClusterMessagePersistentSerializer.java
index bc740063f1..8d159491fc 100644
---
a/modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/ResetClusterMessagePersistentSerializer.java
+++
b/modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/ResetClusterMessagePersistentSerializer.java
@@ -17,8 +17,6 @@
package org.apache.ignite.internal.disaster.system;
-import static java.nio.charset.StandardCharsets.UTF_8;
-
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
@@ -67,20 +65,6 @@ public class ResetClusterMessagePersistentSerializer extends
VersionedSerializer
}
}
- private static void writeStringSet(Set<String> strings, IgniteDataOutput
out) throws IOException {
- out.writeVarInt(strings.size());
- for (String str : strings) {
- out.writeUTF(str);
- }
- }
-
- private static void writeNullableString(@Nullable String str,
IgniteDataOutput out) throws IOException {
- out.writeVarInt(str == null ? -1 : str.length());
- if (str != null) {
- out.writeByteArray(str.getBytes(UTF_8));
- }
- }
-
@Override
protected ResetClusterMessage readExternalData(byte protoVer,
IgniteDataInput in) throws IOException {
Set<String> newCmgNodes = readStringSet(in);
@@ -106,12 +90,6 @@ public class ResetClusterMessagePersistentSerializer
extends VersionedSerializer
.build();
}
- private static Set<String> readStringSet(IgniteDataInput in) throws
IOException {
- int size = in.readVarIntAsInt();
-
- return readStringSet(size, in);
- }
-
private static Set<String> readStringSet(int size, IgniteDataInput in)
throws IOException {
Set<String> result = new HashSet<>(size);
for (int i = 0; i < size; i++) {
@@ -121,15 +99,6 @@ public class ResetClusterMessagePersistentSerializer
extends VersionedSerializer
return result;
}
- private static @Nullable String readNullableString(IgniteDataInput in)
throws IOException {
- int lengthOrMinusOne = in.readVarIntAsInt();
- if (lengthOrMinusOne == -1) {
- return null;
- }
-
- return new String(in.readByteArray(lengthOrMinusOne), UTF_8);
- }
-
private static List<UUID> readFormerClusterIds(IgniteDataInput in) throws
IOException {
int length = in.readVarIntAsInt();
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/ManualGroupRestartRequestSerializer.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/ManualGroupRestartRequestSerializer.java
index ec53a418b0..e8a4348828 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/ManualGroupRestartRequestSerializer.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/ManualGroupRestartRequestSerializer.java
@@ -21,10 +21,8 @@ import static
org.apache.ignite.internal.table.distributed.disaster.DisasterReco
import static
org.apache.ignite.internal.table.distributed.disaster.DisasterRecoveryRequestsSerialization.writeVarIntSet;
import java.io.IOException;
-import java.util.HashSet;
import java.util.Set;
import java.util.UUID;
-import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.util.io.IgniteDataInput;
import org.apache.ignite.internal.util.io.IgniteDataOutput;
import org.apache.ignite.internal.versioned.VersionedSerializer;
@@ -42,12 +40,7 @@ class ManualGroupRestartRequestSerializer extends
VersionedSerializer<ManualGrou
out.writeVarInt(request.zoneId());
out.writeVarInt(request.tableId());
writeVarIntSet(request.partitionIds(), out);
-
- out.writeVarInt(request.nodeNames().size());
- for (String nodeName : request.nodeNames()) {
- out.writeUTF(nodeName);
- }
-
+ writeStringSet(request.nodeNames(), out);
// Writing long and not a varlong as the latter requires 9 bytes for
hybrid timestamps.
out.writeLong(request.assignmentsTimestamp());
}
@@ -63,15 +56,4 @@ class ManualGroupRestartRequestSerializer extends
VersionedSerializer<ManualGrou
return new ManualGroupRestartRequest(operationId, zoneId, tableId,
partitionIds, nodeNames, assignmentsTimestamp);
}
-
- private static Set<String> readStringSet(IgniteDataInput in) throws
IOException {
- int size = in.readVarIntAsInt();
-
- Set<String> result = new HashSet<>(IgniteUtils.capacity(size));
- for (int i = 0; i < size; i++) {
- result.add(in.readUTF());
- }
-
- return result;
- }
}