This is an automated email from the ASF dual-hosted git repository.
rpuch pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new fb97554d0c IGNITE-23494 Do not use ByteUtils#toBytes to serialize assignments (#4596)
fb97554d0c is described below
commit fb97554d0c5148c7faff9c11901a65718a3a389e
Author: Roman Puchkovskiy <[email protected]>
AuthorDate: Tue Oct 22 16:55:26 2024 +0400
IGNITE-23494 Do not use ByteUtils#toBytes to serialize assignments (#4596)
---
modules/partition-distribution/build.gradle | 2 +
.../internal/partitiondistribution/Assignment.java | 5 +-
.../partitiondistribution/Assignments.java | 12 +--
.../AssignmentsSerializer.java | 77 +++++++++++++++++++
.../AssignmentsSerializerTest.java | 89 ++++++++++++++++++++++
.../RendezvousDistributionFunctionTest.java | 37 ---------
.../ItDisasterRecoveryReconfigurationTest.java | 3 +-
7 files changed, 174 insertions(+), 51 deletions(-)
diff --git a/modules/partition-distribution/build.gradle
b/modules/partition-distribution/build.gradle
index 50788d0cdb..092855855c 100644
--- a/modules/partition-distribution/build.gradle
+++ b/modules/partition-distribution/build.gradle
@@ -25,4 +25,6 @@ dependencies {
implementation project(':ignite-api')
implementation project(':ignite-core')
implementation libs.jetbrains.annotations
+
+ testImplementation libs.hamcrest.core
}
diff --git
a/modules/partition-distribution/src/main/java/org/apache/ignite/internal/partitiondistribution/Assignment.java
b/modules/partition-distribution/src/main/java/org/apache/ignite/internal/partitiondistribution/Assignment.java
index ef82cccf82..09e8c455ac 100644
---
a/modules/partition-distribution/src/main/java/org/apache/ignite/internal/partitiondistribution/Assignment.java
+++
b/modules/partition-distribution/src/main/java/org/apache/ignite/internal/partitiondistribution/Assignment.java
@@ -17,7 +17,6 @@
package org.apache.ignite.internal.partitiondistribution;
-import java.io.Serializable;
import org.apache.ignite.internal.tostring.S;
/**
@@ -27,9 +26,7 @@ import org.apache.ignite.internal.tostring.S;
* the asynchronous members (a.k.a. "learners") of the same group. Peers get synchronously updated during write operations, while learners
* are eventually consistent and received updates some time in the future.
*/
-public class Assignment implements Serializable {
- private static final long serialVersionUID = -8892379245627437834L;
-
+public class Assignment {
private final String consistentId;
private final boolean isPeer;
diff --git
a/modules/partition-distribution/src/main/java/org/apache/ignite/internal/partitiondistribution/Assignments.java
b/modules/partition-distribution/src/main/java/org/apache/ignite/internal/partitiondistribution/Assignments.java
index e60ed4cff4..4999daed50 100644
---
a/modules/partition-distribution/src/main/java/org/apache/ignite/internal/partitiondistribution/Assignments.java
+++
b/modules/partition-distribution/src/main/java/org/apache/ignite/internal/partitiondistribution/Assignments.java
@@ -19,7 +19,6 @@ package org.apache.ignite.internal.partitiondistribution;
import static java.util.Collections.unmodifiableSet;
-import java.io.Serializable;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
@@ -29,17 +28,14 @@ import java.util.Set;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.tostring.IgniteToStringInclude;
import org.apache.ignite.internal.tostring.S;
-import org.apache.ignite.internal.util.ByteUtils;
+import org.apache.ignite.internal.versioned.VersionedSerialization;
import org.jetbrains.annotations.Contract;
import org.jetbrains.annotations.Nullable;
/**
* Class that encapsulates a set of nodes and its metadata.
*/
-public class Assignments implements Serializable {
- /** Serial version UID. */
- private static final long serialVersionUID = -59553172012153869L;
-
+public class Assignments {
/** Empty assignments. */
public static final Assignments EMPTY =
new Assignments(Collections.emptySet(), false,
HybridTimestamp.NULL_HYBRID_TIMESTAMP);
@@ -140,7 +136,7 @@ public class Assignments implements Serializable {
* Serializes the instance into an array of bytes.
*/
public byte[] toBytes() {
- return ByteUtils.toBytes(this);
+ return VersionedSerialization.toBytes(this,
AssignmentsSerializer.INSTANCE);
}
/**
@@ -158,7 +154,7 @@ public class Assignments implements Serializable {
@Nullable
@Contract("null -> null; !null -> !null")
public static Assignments fromBytes(byte @Nullable [] bytes) {
- return bytes == null ? null : ByteUtils.fromBytes(bytes);
+ return bytes == null ? null : VersionedSerialization.fromBytes(bytes,
AssignmentsSerializer.INSTANCE);
}
@Override
diff --git
a/modules/partition-distribution/src/main/java/org/apache/ignite/internal/partitiondistribution/AssignmentsSerializer.java
b/modules/partition-distribution/src/main/java/org/apache/ignite/internal/partitiondistribution/AssignmentsSerializer.java
new file mode 100644
index 0000000000..c3d2620787
--- /dev/null
+++
b/modules/partition-distribution/src/main/java/org/apache/ignite/internal/partitiondistribution/AssignmentsSerializer.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.partitiondistribution;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.ignite.internal.util.io.IgniteDataInput;
+import org.apache.ignite.internal.util.io.IgniteDataOutput;
+import org.apache.ignite.internal.versioned.VersionedSerializer;
+
+/**
+ * {@link VersionedSerializer} for {@link Assignments} instances.
+ */
+public class AssignmentsSerializer extends VersionedSerializer<Assignments> {
+ /** Serializer instance. */
+ public static final AssignmentsSerializer INSTANCE = new
AssignmentsSerializer();
+
+ @Override
+ protected void writeExternalData(Assignments assignments, IgniteDataOutput
out) throws IOException {
+ out.writeVarInt(assignments.nodes().size());
+ for (Assignment assignment : assignments.nodes()) {
+ writeAssignment(assignment, out);
+ }
+
+ out.writeBoolean(assignments.force());
+ // Writing long and not varlong as the latter will take 9 bytes for timestamps.
+ out.writeLong(assignments.timestamp());
+ }
+
+ private static void writeAssignment(Assignment assignment,
IgniteDataOutput out) throws IOException {
+ out.writeUTF(assignment.consistentId());
+ out.writeBoolean(assignment.isPeer());
+ }
+
+ @Override
+ protected Assignments readExternalData(byte protoVer, IgniteDataInput in)
throws IOException {
+ Set<Assignment> nodes = readNodes(in);
+ boolean force = in.readBoolean();
+ long timestamp = in.readLong();
+
+ return force ? Assignments.forced(nodes, timestamp) :
Assignments.of(nodes, timestamp);
+ }
+
+ private static Set<Assignment> readNodes(IgniteDataInput in) throws
IOException {
+ int length = in.readVarIntAsInt();
+
+ Set<Assignment> nodes = new HashSet<>();
+ for (int i = 0; i < length; i++) {
+ nodes.add(readAssignment(in));
+ }
+
+ return nodes;
+ }
+
+ private static Assignment readAssignment(IgniteDataInput in) throws
IOException {
+ String consistentId = in.readUTF();
+ boolean isPeer = in.readBoolean();
+
+ return isPeer ? Assignment.forPeer(consistentId) :
Assignment.forLearner(consistentId);
+ }
+}
diff --git
a/modules/partition-distribution/src/test/java/org/apache/ignite/internal/partitiondistribution/AssignmentsSerializerTest.java
b/modules/partition-distribution/src/test/java/org/apache/ignite/internal/partitiondistribution/AssignmentsSerializerTest.java
new file mode 100644
index 0000000000..50692a61f9
--- /dev/null
+++
b/modules/partition-distribution/src/test/java/org/apache/ignite/internal/partitiondistribution/AssignmentsSerializerTest.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.partitiondistribution;
+
+import static java.util.Comparator.comparing;
+import static java.util.stream.Collectors.toList;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.is;
+
+import java.util.Base64;
+import java.util.List;
+import java.util.Set;
+import org.apache.ignite.internal.versioned.VersionedSerialization;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+class AssignmentsSerializerTest {
+ private static final String NOT_FORCED_ASSIGNMENTS_SERIALIZED_WITH_V1 =
"Ae++QwMCYQECYgAA6AMAAAAAAAA=";
+ private static final String FORCED_ASSIGNMENTS_SERIALIZED_WITH_V1 =
"Ae++QwMCYQECYgAB6AMAAAAAAAA=";
+
+ private final AssignmentsSerializer serializer = new
AssignmentsSerializer();
+
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ void serializationAndDeserialization(boolean force) {
+ Set<Assignment> nodes = Set.of(Assignment.forPeer("abc"),
Assignment.forLearner("def"));
+ Assignments originalAssignments = force ? Assignments.forced(nodes,
1000L) : Assignments.of(nodes, 1000L);
+
+ byte[] bytes = VersionedSerialization.toBytes(originalAssignments,
serializer);
+ Assignments restoredAssignments =
VersionedSerialization.fromBytes(bytes, serializer);
+
+ assertThat(restoredAssignments, equalTo(originalAssignments));
+ }
+
+ @Test
+ void v1NotForcedCanBeDeserialized() {
+ byte[] bytes =
Base64.getDecoder().decode(NOT_FORCED_ASSIGNMENTS_SERIALIZED_WITH_V1);
+ Assignments restoredAssignments =
VersionedSerialization.fromBytes(bytes, serializer);
+
+ assertNodesFromV1(restoredAssignments);
+
+ assertThat(restoredAssignments.force(), is(false));
+ assertThat(restoredAssignments.timestamp(), is(1000L));
+ }
+
+ @Test
+ void v1ForcedCanBeDeserialized() {
+ byte[] bytes =
Base64.getDecoder().decode(FORCED_ASSIGNMENTS_SERIALIZED_WITH_V1);
+ Assignments restoredAssignments =
VersionedSerialization.fromBytes(bytes, serializer);
+
+ assertNodesFromV1(restoredAssignments);
+
+ assertThat(restoredAssignments.force(), is(true));
+ assertThat(restoredAssignments.timestamp(), is(1000L));
+ }
+
+ private static void assertNodesFromV1(Assignments restoredAssignments) {
+ assertThat(restoredAssignments.nodes(), hasSize(2));
+ List<Assignment> orderedNodes = restoredAssignments.nodes().stream()
+ .sorted(comparing(Assignment::consistentId))
+ .collect(toList());
+
+ Assignment assignment1 = orderedNodes.get(0);
+ assertThat(assignment1.consistentId(), is("a"));
+ assertThat(assignment1.isPeer(), is(true));
+
+ Assignment assignment2 = orderedNodes.get(1);
+ assertThat(assignment2.consistentId(), is("b"));
+ assertThat(assignment2.isPeer(), is(false));
+ }
+}
diff --git
a/modules/partition-distribution/src/test/java/org/apache/ignite/internal/partitiondistribution/RendezvousDistributionFunctionTest.java
b/modules/partition-distribution/src/test/java/org/apache/ignite/internal/partitiondistribution/RendezvousDistributionFunctionTest.java
index 6d9051dae1..447c0dd0f7 100644
---
a/modules/partition-distribution/src/test/java/org/apache/ignite/internal/partitiondistribution/RendezvousDistributionFunctionTest.java
+++
b/modules/partition-distribution/src/test/java/org/apache/ignite/internal/partitiondistribution/RendezvousDistributionFunctionTest.java
@@ -18,7 +18,6 @@
package org.apache.ignite.internal.partitiondistribution;
import static java.util.Objects.nonNull;
-import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -30,19 +29,12 @@ import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
-import org.apache.ignite.internal.logger.IgniteLogger;
-import org.apache.ignite.internal.logger.Loggers;
-import org.apache.ignite.internal.util.ByteUtils;
-import org.apache.ignite.network.ClusterNode;
import org.junit.jupiter.api.Test;
/**
* Test for Rendezvous distribution function.
*/
public class RendezvousDistributionFunctionTest {
- /** The logger. */
- private static final IgniteLogger LOG =
Loggers.forClass(RendezvousDistributionFunctionTest.class);
-
/** Distribution deviation ratio. */
public static final double DISTRIBUTION_DEVIATION_RATIO = 0.2;
@@ -106,35 +98,6 @@ public class RendezvousDistributionFunctionTest {
.collect(Collectors.toUnmodifiableList());
}
- @Test
- public void serializeAssignment() {
- int nodeCount = 50;
-
- int parts = 10_000;
-
- int replicas = 4;
-
- List<String> nodes = prepareNetworkTopology(nodeCount);
-
- assertTrue(parts > nodeCount, "Partitions should be more than nodes");
-
- List<List<String>> assignment =
RendezvousDistributionFunction.assignPartitions(
- nodes,
- parts,
- replicas,
- false,
- null
- );
-
- byte[] assignmentBytes = ByteUtils.toBytes(assignment);
-
- LOG.info("Assignment is serialized successfully [bytes={}]",
assignmentBytes.length);
-
- List<List<ClusterNode>> deserializedAssignment =
(List<List<ClusterNode>>) ByteUtils.fromBytes(assignmentBytes);
-
- assertEquals(assignment, deserializedAssignment);
- }
-
/**
* Returns sorted and compacted string representation of given {@code col}. Two nearby numbers with difference at most 1 are compacted
* to one continuous segment. E.g. collection of [1, 2, 3, 5, 6, 7, 10] will be compacted to [1-3, 5-7, 10].
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryReconfigurationTest.java
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryReconfigurationTest.java
index f73f488c24..b3d8424877 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryReconfigurationTest.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryReconfigurationTest.java
@@ -89,7 +89,6 @@ import
org.apache.ignite.internal.table.distributed.TableManager;
import
org.apache.ignite.internal.table.distributed.disaster.GlobalPartitionState;
import
org.apache.ignite.internal.table.distributed.disaster.GlobalPartitionStateEnum;
import
org.apache.ignite.internal.table.distributed.disaster.LocalPartitionStateByNode;
-import org.apache.ignite.internal.util.ByteUtils;
import org.apache.ignite.lang.IgniteException;
import org.apache.ignite.raft.jraft.RaftGroupService;
import org.apache.ignite.raft.jraft.Status;
@@ -455,7 +454,7 @@ public class ItDisasterRecoveryReconfigurationTest extends
ClusterPerTestIntegra
ByteArray opKey = new
ByteArray(toByteArray(operation.key()));
if (operation.type() == OperationType.PUT &&
opKey.equals(stablePartAssignmentsKey)) {
- return
blockedAssignments.equals(ByteUtils.fromBytes(toByteArray(operation.value())));
+ return
blockedAssignments.equals(Assignments.fromBytes(toByteArray(operation.value())));
}
}
}