This is an automated email from the ASF dual-hosted git repository.

rpuch pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new fb97554d0c IGNITE-23494 Do not use ByteUtils#toBytes to serialize assignments (#4596)
fb97554d0c is described below

commit fb97554d0c5148c7faff9c11901a65718a3a389e
Author: Roman Puchkovskiy <[email protected]>
AuthorDate: Tue Oct 22 16:55:26 2024 +0400

    IGNITE-23494 Do not use ByteUtils#toBytes to serialize assignments (#4596)
---
 modules/partition-distribution/build.gradle        |  2 +
 .../internal/partitiondistribution/Assignment.java |  5 +-
 .../partitiondistribution/Assignments.java         | 12 +--
 .../AssignmentsSerializer.java                     | 77 +++++++++++++++++++
 .../AssignmentsSerializerTest.java                 | 89 ++++++++++++++++++++++
 .../RendezvousDistributionFunctionTest.java        | 37 ---------
 .../ItDisasterRecoveryReconfigurationTest.java     |  3 +-
 7 files changed, 174 insertions(+), 51 deletions(-)

diff --git a/modules/partition-distribution/build.gradle b/modules/partition-distribution/build.gradle
index 50788d0cdb..092855855c 100644
--- a/modules/partition-distribution/build.gradle
+++ b/modules/partition-distribution/build.gradle
@@ -25,4 +25,6 @@ dependencies {
     implementation project(':ignite-api')
     implementation project(':ignite-core')
     implementation libs.jetbrains.annotations
+
+    testImplementation libs.hamcrest.core
 }
diff --git a/modules/partition-distribution/src/main/java/org/apache/ignite/internal/partitiondistribution/Assignment.java b/modules/partition-distribution/src/main/java/org/apache/ignite/internal/partitiondistribution/Assignment.java
index ef82cccf82..09e8c455ac 100644
--- a/modules/partition-distribution/src/main/java/org/apache/ignite/internal/partitiondistribution/Assignment.java
+++ b/modules/partition-distribution/src/main/java/org/apache/ignite/internal/partitiondistribution/Assignment.java
@@ -17,7 +17,6 @@
 
 package org.apache.ignite.internal.partitiondistribution;
 
-import java.io.Serializable;
 import org.apache.ignite.internal.tostring.S;
 
 /**
@@ -27,9 +26,7 @@ import org.apache.ignite.internal.tostring.S;
  * the asynchronous members (a.k.a. "learners") of the same group. Peers get synchronously updated during write operations, while learners
  * are eventually consistent and received updates some time in the future.
  */
-public class Assignment implements Serializable {
-    private static final long serialVersionUID = -8892379245627437834L;
-
+public class Assignment {
     private final String consistentId;
 
     private final boolean isPeer;
diff --git a/modules/partition-distribution/src/main/java/org/apache/ignite/internal/partitiondistribution/Assignments.java b/modules/partition-distribution/src/main/java/org/apache/ignite/internal/partitiondistribution/Assignments.java
index e60ed4cff4..4999daed50 100644
--- a/modules/partition-distribution/src/main/java/org/apache/ignite/internal/partitiondistribution/Assignments.java
+++ b/modules/partition-distribution/src/main/java/org/apache/ignite/internal/partitiondistribution/Assignments.java
@@ -19,7 +19,6 @@ package org.apache.ignite.internal.partitiondistribution;
 
 import static java.util.Collections.unmodifiableSet;
 
-import java.io.Serializable;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
@@ -29,17 +28,14 @@ import java.util.Set;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.tostring.IgniteToStringInclude;
 import org.apache.ignite.internal.tostring.S;
-import org.apache.ignite.internal.util.ByteUtils;
+import org.apache.ignite.internal.versioned.VersionedSerialization;
 import org.jetbrains.annotations.Contract;
 import org.jetbrains.annotations.Nullable;
 
 /**
  * Class that encapsulates a set of nodes and its metadata.
  */
-public class Assignments implements Serializable {
-    /** Serial version UID. */
-    private static final long serialVersionUID = -59553172012153869L;
-
+public class Assignments {
     /** Empty assignments. */
     public static final Assignments EMPTY =
             new Assignments(Collections.emptySet(), false, HybridTimestamp.NULL_HYBRID_TIMESTAMP);
@@ -140,7 +136,7 @@ public class Assignments implements Serializable {
      * Serializes the instance into an array of bytes.
      */
     public byte[] toBytes() {
-        return ByteUtils.toBytes(this);
+        return VersionedSerialization.toBytes(this, AssignmentsSerializer.INSTANCE);
     }
 
     /**
@@ -158,7 +154,7 @@ public class Assignments implements Serializable {
     @Nullable
     @Contract("null -> null; !null -> !null")
     public static Assignments fromBytes(byte @Nullable [] bytes) {
-        return bytes == null ? null : ByteUtils.fromBytes(bytes);
+        return bytes == null ? null : VersionedSerialization.fromBytes(bytes, AssignmentsSerializer.INSTANCE);
     }
 
     @Override
diff --git a/modules/partition-distribution/src/main/java/org/apache/ignite/internal/partitiondistribution/AssignmentsSerializer.java b/modules/partition-distribution/src/main/java/org/apache/ignite/internal/partitiondistribution/AssignmentsSerializer.java
new file mode 100644
index 0000000000..c3d2620787
--- /dev/null
+++ b/modules/partition-distribution/src/main/java/org/apache/ignite/internal/partitiondistribution/AssignmentsSerializer.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.partitiondistribution;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.ignite.internal.util.io.IgniteDataInput;
+import org.apache.ignite.internal.util.io.IgniteDataOutput;
+import org.apache.ignite.internal.versioned.VersionedSerializer;
+
+/**
+ * {@link VersionedSerializer} for {@link Assignments} instances.
+ */
+public class AssignmentsSerializer extends VersionedSerializer<Assignments> {
+    /** Serializer instance. */
+    public static final AssignmentsSerializer INSTANCE = new AssignmentsSerializer();
+
+    @Override
+    protected void writeExternalData(Assignments assignments, IgniteDataOutput out) throws IOException {
+        out.writeVarInt(assignments.nodes().size());
+        for (Assignment assignment : assignments.nodes()) {
+            writeAssignment(assignment, out);
+        }
+
+        out.writeBoolean(assignments.force());
+        // Writing long and not varlong as the latter will take 9 bytes for timestamps.
+        out.writeLong(assignments.timestamp());
+    }
+
+    private static void writeAssignment(Assignment assignment, IgniteDataOutput out) throws IOException {
+        out.writeUTF(assignment.consistentId());
+        out.writeBoolean(assignment.isPeer());
+    }
+
+    @Override
+    protected Assignments readExternalData(byte protoVer, IgniteDataInput in) throws IOException {
+        Set<Assignment> nodes = readNodes(in);
+        boolean force = in.readBoolean();
+        long timestamp = in.readLong();
+
+        return force ? Assignments.forced(nodes, timestamp) : Assignments.of(nodes, timestamp);
+    }
+
+    private static Set<Assignment> readNodes(IgniteDataInput in) throws IOException {
+        int length = in.readVarIntAsInt();
+
+        Set<Assignment> nodes = new HashSet<>();
+        for (int i = 0; i < length; i++) {
+            nodes.add(readAssignment(in));
+        }
+
+        return nodes;
+    }
+
+    private static Assignment readAssignment(IgniteDataInput in) throws IOException {
+        String consistentId = in.readUTF();
+        boolean isPeer = in.readBoolean();
+
+        return isPeer ? Assignment.forPeer(consistentId) : Assignment.forLearner(consistentId);
+    }
+}
diff --git a/modules/partition-distribution/src/test/java/org/apache/ignite/internal/partitiondistribution/AssignmentsSerializerTest.java b/modules/partition-distribution/src/test/java/org/apache/ignite/internal/partitiondistribution/AssignmentsSerializerTest.java
new file mode 100644
index 0000000000..50692a61f9
--- /dev/null
+++ b/modules/partition-distribution/src/test/java/org/apache/ignite/internal/partitiondistribution/AssignmentsSerializerTest.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.partitiondistribution;
+
+import static java.util.Comparator.comparing;
+import static java.util.stream.Collectors.toList;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.is;
+
+import java.util.Base64;
+import java.util.List;
+import java.util.Set;
+import org.apache.ignite.internal.versioned.VersionedSerialization;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+class AssignmentsSerializerTest {
+    private static final String NOT_FORCED_ASSIGNMENTS_SERIALIZED_WITH_V1 = "Ae++QwMCYQECYgAA6AMAAAAAAAA=";
+    private static final String FORCED_ASSIGNMENTS_SERIALIZED_WITH_V1 = "Ae++QwMCYQECYgAB6AMAAAAAAAA=";
+
+    private final AssignmentsSerializer serializer = new AssignmentsSerializer();
+
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    void serializationAndDeserialization(boolean force) {
+        Set<Assignment> nodes = Set.of(Assignment.forPeer("abc"), Assignment.forLearner("def"));
+        Assignments originalAssignments = force ? Assignments.forced(nodes, 1000L) : Assignments.of(nodes, 1000L);
+
+        byte[] bytes = VersionedSerialization.toBytes(originalAssignments, serializer);
+        Assignments restoredAssignments = VersionedSerialization.fromBytes(bytes, serializer);
+
+        assertThat(restoredAssignments, equalTo(originalAssignments));
+    }
+
+    @Test
+    void v1NotForcedCanBeDeserialized() {
+        byte[] bytes = Base64.getDecoder().decode(NOT_FORCED_ASSIGNMENTS_SERIALIZED_WITH_V1);
+        Assignments restoredAssignments = VersionedSerialization.fromBytes(bytes, serializer);
+
+        assertNodesFromV1(restoredAssignments);
+
+        assertThat(restoredAssignments.force(), is(false));
+        assertThat(restoredAssignments.timestamp(), is(1000L));
+    }
+
+    @Test
+    void v1ForcedCanBeDeserialized() {
+        byte[] bytes = Base64.getDecoder().decode(FORCED_ASSIGNMENTS_SERIALIZED_WITH_V1);
+        Assignments restoredAssignments = VersionedSerialization.fromBytes(bytes, serializer);
+
+        assertNodesFromV1(restoredAssignments);
+
+        assertThat(restoredAssignments.force(), is(true));
+        assertThat(restoredAssignments.timestamp(), is(1000L));
+    }
+
+    private static void assertNodesFromV1(Assignments restoredAssignments) {
+        assertThat(restoredAssignments.nodes(), hasSize(2));
+        List<Assignment> orderedNodes = restoredAssignments.nodes().stream()
+                .sorted(comparing(Assignment::consistentId))
+                .collect(toList());
+
+        Assignment assignment1 = orderedNodes.get(0);
+        assertThat(assignment1.consistentId(), is("a"));
+        assertThat(assignment1.isPeer(), is(true));
+
+        Assignment assignment2 = orderedNodes.get(1);
+        assertThat(assignment2.consistentId(), is("b"));
+        assertThat(assignment2.isPeer(), is(false));
+    }
+}
diff --git a/modules/partition-distribution/src/test/java/org/apache/ignite/internal/partitiondistribution/RendezvousDistributionFunctionTest.java b/modules/partition-distribution/src/test/java/org/apache/ignite/internal/partitiondistribution/RendezvousDistributionFunctionTest.java
index 6d9051dae1..447c0dd0f7 100644
--- a/modules/partition-distribution/src/test/java/org/apache/ignite/internal/partitiondistribution/RendezvousDistributionFunctionTest.java
+++ b/modules/partition-distribution/src/test/java/org/apache/ignite/internal/partitiondistribution/RendezvousDistributionFunctionTest.java
@@ -18,7 +18,6 @@
 package org.apache.ignite.internal.partitiondistribution;
 
 import static java.util.Objects.nonNull;
-import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
@@ -30,19 +29,12 @@ import java.util.List;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
-import org.apache.ignite.internal.logger.IgniteLogger;
-import org.apache.ignite.internal.logger.Loggers;
-import org.apache.ignite.internal.util.ByteUtils;
-import org.apache.ignite.network.ClusterNode;
 import org.junit.jupiter.api.Test;
 
 /**
  * Test for Rendezvous distribution function.
  */
 public class RendezvousDistributionFunctionTest {
-    /** The logger. */
-    private static final IgniteLogger LOG = Loggers.forClass(RendezvousDistributionFunctionTest.class);
-
     /** Distribution deviation ratio. */
     public static final double DISTRIBUTION_DEVIATION_RATIO = 0.2;
 
@@ -106,35 +98,6 @@ public class RendezvousDistributionFunctionTest {
                 .collect(Collectors.toUnmodifiableList());
     }
 
-    @Test
-    public void serializeAssignment() {
-        int nodeCount = 50;
-
-        int parts = 10_000;
-
-        int replicas = 4;
-
-        List<String> nodes = prepareNetworkTopology(nodeCount);
-
-        assertTrue(parts > nodeCount, "Partitions should be more than nodes");
-
-        List<List<String>> assignment = RendezvousDistributionFunction.assignPartitions(
-                nodes,
-                parts,
-                replicas,
-                false,
-                null
-        );
-
-        byte[] assignmentBytes = ByteUtils.toBytes(assignment);
-
-        LOG.info("Assignment is serialized successfully [bytes={}]", assignmentBytes.length);
-
-        List<List<ClusterNode>> deserializedAssignment = (List<List<ClusterNode>>) ByteUtils.fromBytes(assignmentBytes);
-
-        assertEquals(assignment, deserializedAssignment);
-    }
-
     /**
      * Returns sorted and compacted string representation of given {@code col}. Two nearby numbers with difference at most 1 are compacted
      * to one continuous segment. E.g. collection of [1, 2, 3, 5, 6, 7, 10] will be compacted to [1-3, 5-7, 10].
diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryReconfigurationTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryReconfigurationTest.java
index f73f488c24..b3d8424877 100644
--- a/modules/table/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryReconfigurationTest.java
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryReconfigurationTest.java
@@ -89,7 +89,6 @@ import org.apache.ignite.internal.table.distributed.TableManager;
 import org.apache.ignite.internal.table.distributed.disaster.GlobalPartitionState;
 import org.apache.ignite.internal.table.distributed.disaster.GlobalPartitionStateEnum;
 import org.apache.ignite.internal.table.distributed.disaster.LocalPartitionStateByNode;
-import org.apache.ignite.internal.util.ByteUtils;
 import org.apache.ignite.lang.IgniteException;
 import org.apache.ignite.raft.jraft.RaftGroupService;
 import org.apache.ignite.raft.jraft.Status;
@@ -455,7 +454,7 @@ public class ItDisasterRecoveryReconfigurationTest extends ClusterPerTestIntegra
                         ByteArray opKey = new ByteArray(toByteArray(operation.key()));
 
                         if (operation.type() == OperationType.PUT && opKey.equals(stablePartAssignmentsKey)) {
-                            return blockedAssignments.equals(ByteUtils.fromBytes(toByteArray(operation.value())));
+                            return blockedAssignments.equals(Assignments.fromBytes(toByteArray(operation.value())));
                         }
                     }
                 }

Reply via email to