This is an automated email from the ASF dual-hosted git repository.

rpuch pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 5800d5d47d IGNITE-23469 Do not use ByteUtils#toBytes to persist system 
disaster recovery messages (#4578)
5800d5d47d is described below

commit 5800d5d47dc004422bedcb8f92fd9a5d44b2ac62
Author: Roman Puchkovskiy <[email protected]>
AuthorDate: Fri Oct 18 17:01:12 2024 +0400

    IGNITE-23469 Do not use ByteUtils#toBytes to persist system disaster 
recovery messages (#4578)
---
 .../raft/MetaStorageSnapshotStorageFactory.java    |   6 +-
 .../server/raft/MetaStorageListener.java           |   6 +-
 .../internal/raft/RaftGroupConfiguration.java      |   2 +-
 .../raft/RaftGroupConfigurationConverter.java      |   9 +-
 .../raft/RaftGroupConfigurationSerializer.java     |  91 ++++++++++++
 .../raft/RaftGroupConfigurationSerializerTest.java |  73 ++++++++++
 .../system/message/ResetClusterMessage.java        |  10 +-
 .../ResetClusterMessagePersistentSerializer.java   | 160 +++++++++++++++++++++
 .../system/SystemDisasterRecoveryStorage.java      |  13 +-
 ...esetClusterMessagePersistentSerializerTest.java | 103 +++++++++++++
 .../SystemDisasterRecoveryManagerImplTest.java     |   6 +-
 .../raft/snapshot/PartitionAccessImpl.java         |   2 +-
 .../SnapshotAwarePartitionDataStorage.java         |   2 +-
 .../raft/PartitionCommandListenerTest.java         |   1 +
 .../raft/RaftGroupConfigurationConverterTest.java  |   1 +
 .../incoming/IncomingSnapshotCopierTest.java       |   2 +-
 .../SnapshotAwarePartitionDataStorageTest.java     |   2 +-
 .../distributed/TestPartitionDataStorage.java      |   2 +-
 18 files changed, 462 insertions(+), 29 deletions(-)

diff --git 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/raft/MetaStorageSnapshotStorageFactory.java
 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/raft/MetaStorageSnapshotStorageFactory.java
index 9db4cc5016..8041ca30dc 100644
--- 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/raft/MetaStorageSnapshotStorageFactory.java
+++ 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/raft/MetaStorageSnapshotStorageFactory.java
@@ -21,8 +21,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.ignite.internal.metastorage.server.KeyValueStorage;
 import org.apache.ignite.internal.raft.IndexWithTerm;
 import org.apache.ignite.internal.raft.RaftGroupConfiguration;
+import org.apache.ignite.internal.raft.RaftGroupConfigurationConverter;
 import org.apache.ignite.internal.raft.storage.SnapshotStorageFactory;
-import org.apache.ignite.internal.util.ByteUtils;
 import org.apache.ignite.raft.jraft.RaftMessagesFactory;
 import org.apache.ignite.raft.jraft.entity.RaftOutter;
 import org.apache.ignite.raft.jraft.entity.RaftOutter.SnapshotMeta;
@@ -41,6 +41,8 @@ public class MetaStorageSnapshotStorageFactory implements 
SnapshotStorageFactory
     /** Snapshot meta, constructed from the storage data and raft group 
configuration at startup. {@code null} if the storage is empty. */
     private final @Nullable RaftOutter.SnapshotMeta startupSnapshotMeta;
 
+    private final RaftGroupConfigurationConverter configurationConverter = new 
RaftGroupConfigurationConverter();
+
     /**
      * Constructor. We will try to read a snapshot meta here.
      *
@@ -62,7 +64,7 @@ public class MetaStorageSnapshotStorageFactory implements 
SnapshotStorageFactory
         byte[] configBytes = storage.getConfiguration();
         assert configBytes != null;
 
-        RaftGroupConfiguration configuration = 
ByteUtils.fromBytes(configBytes);
+        RaftGroupConfiguration configuration = 
configurationConverter.fromBytes(configBytes);
         assert configuration != null;
 
         return new RaftMessagesFactory().snapshotMeta()
diff --git 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageListener.java
 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageListener.java
index 40d860f1b8..cbc49a8581 100644
--- 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageListener.java
+++ 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageListener.java
@@ -40,13 +40,13 @@ import 
org.apache.ignite.internal.metastorage.server.KeyValueStorage;
 import org.apache.ignite.internal.metastorage.server.time.ClusterTimeImpl;
 import org.apache.ignite.internal.raft.Command;
 import org.apache.ignite.internal.raft.RaftGroupConfiguration;
+import org.apache.ignite.internal.raft.RaftGroupConfigurationConverter;
 import org.apache.ignite.internal.raft.ReadCommand;
 import org.apache.ignite.internal.raft.WriteCommand;
 import org.apache.ignite.internal.raft.service.BeforeApplyHandler;
 import org.apache.ignite.internal.raft.service.CommandClosure;
 import org.apache.ignite.internal.raft.service.CommittedConfiguration;
 import org.apache.ignite.internal.raft.service.RaftGroupListener;
-import org.apache.ignite.internal.util.ByteUtils;
 import org.apache.ignite.internal.util.Cursor;
 import org.apache.ignite.internal.util.IgniteSpinBusyLock;
 import org.jetbrains.annotations.Nullable;
@@ -66,6 +66,8 @@ public class MetaStorageListener implements 
RaftGroupListener, BeforeApplyHandle
 
     private final Consumer<CommittedConfiguration> onConfigurationCommitted;
 
+    private final RaftGroupConfigurationConverter configurationConverter = new 
RaftGroupConfigurationConverter();
+
     /**
      * Constructor.
      *
@@ -211,7 +213,7 @@ public class MetaStorageListener implements 
RaftGroupListener, BeforeApplyHandle
     public void onConfigurationCommitted(CommittedConfiguration config) {
         RaftGroupConfiguration configuration = 
RaftGroupConfiguration.fromCommittedConfiguration(config);
 
-        storage.saveConfiguration(ByteUtils.toBytes(configuration), 
config.index(), config.term());
+        
storage.saveConfiguration(configurationConverter.toBytes(configuration), 
config.index(), config.term());
 
         onConfigurationCommitted.accept(config);
     }
diff --git 
a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/RaftGroupConfiguration.java
 
b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/RaftGroupConfiguration.java
index 486a1682c2..c53f7f7bf1 100644
--- 
a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/RaftGroupConfiguration.java
+++ 
b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/RaftGroupConfiguration.java
@@ -27,7 +27,7 @@ import org.apache.ignite.internal.tostring.S;
 import org.jetbrains.annotations.Nullable;
 
 /**
- * A POJO for a RAFT group configuration, could be used to by other modules. 
Not used by a RAFT module itself.
+ * A POJO for a RAFT group configuration, could be used by other modules. Not 
used by the RAFT module itself.
  */
 public class RaftGroupConfiguration implements Serializable {
     private static final long serialVersionUID = 0;
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/RaftGroupConfigurationConverter.java
 
b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/RaftGroupConfigurationConverter.java
similarity index 83%
rename from 
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/RaftGroupConfigurationConverter.java
rename to 
modules/raft-api/src/main/java/org/apache/ignite/internal/raft/RaftGroupConfigurationConverter.java
index e0a00728d8..1dd077ef3a 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/RaftGroupConfigurationConverter.java
+++ 
b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/RaftGroupConfigurationConverter.java
@@ -15,10 +15,9 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.table.distributed.raft;
+package org.apache.ignite.internal.raft;
 
-import org.apache.ignite.internal.raft.RaftGroupConfiguration;
-import org.apache.ignite.internal.util.ByteUtils;
+import org.apache.ignite.internal.versioned.VersionedSerialization;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -37,7 +36,7 @@ public class RaftGroupConfigurationConverter {
             return null;
         }
 
-        return ByteUtils.fromBytes(bytes);
+        return VersionedSerialization.fromBytes(bytes, 
RaftGroupConfigurationSerializer.INSTANCE);
     }
 
     /**
@@ -47,6 +46,6 @@ public class RaftGroupConfigurationConverter {
      * @return Byte representation.
      */
     public byte[] toBytes(RaftGroupConfiguration configuration) {
-        return ByteUtils.toBytes(configuration);
+        return VersionedSerialization.toBytes(configuration, 
RaftGroupConfigurationSerializer.INSTANCE);
     }
 }
diff --git 
a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/RaftGroupConfigurationSerializer.java
 
b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/RaftGroupConfigurationSerializer.java
new file mode 100644
index 0000000000..dea2ba21a5
--- /dev/null
+++ 
b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/RaftGroupConfigurationSerializer.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.raft;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.ignite.internal.util.io.IgniteDataInput;
+import org.apache.ignite.internal.util.io.IgniteDataOutput;
+import org.apache.ignite.internal.versioned.VersionedSerializer;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * {@link VersionedSerializer} for {@link RaftGroupConfiguration} instances.
+ */
+public class RaftGroupConfigurationSerializer extends 
VersionedSerializer<RaftGroupConfiguration> {
+    /** Serializer instance. */
+    public static final RaftGroupConfigurationSerializer INSTANCE = new 
RaftGroupConfigurationSerializer();
+
+    @Override
+    protected void writeExternalData(RaftGroupConfiguration config, 
IgniteDataOutput out) throws IOException {
+        writeStringList(config.peers(), out);
+        writeStringList(config.learners(), out);
+        writeNullableStringList(config.oldPeers(), out);
+        writeNullableStringList(config.oldLearners(), out);
+    }
+
+    private static void writeStringList(List<String> strings, IgniteDataOutput 
out) throws IOException {
+        out.writeVarInt(strings.size());
+        for (String str : strings) {
+            out.writeUTF(str);
+        }
+    }
+
+    private static void writeNullableStringList(@Nullable List<String> 
strings, IgniteDataOutput out) throws IOException {
+        if (strings == null) {
+            out.writeVarInt(-1);
+        } else {
+            writeStringList(strings, out);
+        }
+    }
+
+    @Override
+    protected RaftGroupConfiguration readExternalData(byte protoVer, 
IgniteDataInput in) throws IOException {
+        List<String> peers = readStringList(in);
+        List<String> learners = readStringList(in);
+        List<String> oldPeers = readNullableStringList(in);
+        List<String> oldLearners = readNullableStringList(in);
+
+        return new RaftGroupConfiguration(peers, learners, oldPeers, 
oldLearners);
+    }
+
+    private static List<String> readStringList(IgniteDataInput in) throws 
IOException {
+        int length = in.readVarIntAsInt();
+        return readStringList(length, in);
+    }
+
+    private static List<String> readStringList(int length, IgniteDataInput in) 
throws IOException {
+        assert length >= 0 : "Invalid length: " + length;
+
+        var list = new ArrayList<String>();
+        for (int i = 0; i < length; i++) {
+            list.add(in.readUTF());
+        }
+        return list;
+    }
+
+    private static @Nullable List<String> 
readNullableStringList(IgniteDataInput in) throws IOException {
+        int lengthOrMinusOne = in.readVarIntAsInt();
+        if (lengthOrMinusOne == -1) {
+            return null;
+        }
+
+        return readStringList(lengthOrMinusOne, in);
+    }
+}
diff --git 
a/modules/raft-api/src/test/java/org/apache/ignite/internal/raft/RaftGroupConfigurationSerializerTest.java
 
b/modules/raft-api/src/test/java/org/apache/ignite/internal/raft/RaftGroupConfigurationSerializerTest.java
new file mode 100644
index 0000000000..e22ce33767
--- /dev/null
+++ 
b/modules/raft-api/src/test/java/org/apache/ignite/internal/raft/RaftGroupConfigurationSerializerTest.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.raft;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+
+import java.util.Base64;
+import java.util.List;
+import org.apache.ignite.internal.versioned.VersionedSerialization;
+import org.junit.jupiter.api.Test;
+
+class RaftGroupConfigurationSerializerTest {
+    private final RaftGroupConfigurationSerializer serializer = new 
RaftGroupConfigurationSerializer();
+
+    @Test
+    void serializationAndDeserializationWithoutNulls() {
+        RaftGroupConfiguration originalConfig = new RaftGroupConfiguration(
+                List.of("peer1", "peer2"),
+                List.of("learner1", "learner2"),
+                List.of("old-peer1", "old-peer2"),
+                List.of("old-learner1", "old-learner2")
+        );
+
+        byte[] bytes = VersionedSerialization.toBytes(originalConfig, 
serializer);
+        RaftGroupConfiguration restoredConfig = 
VersionedSerialization.fromBytes(bytes, serializer);
+
+        assertThat(restoredConfig, equalTo(originalConfig));
+    }
+
+    @Test
+    void serializationAndDeserializationWithNulls() {
+        RaftGroupConfiguration originalConfig = new RaftGroupConfiguration(
+                List.of("peer1", "peer2"),
+                List.of("learner1", "learner2"),
+                null,
+                null
+        );
+
+        byte[] bytes = VersionedSerialization.toBytes(originalConfig, 
serializer);
+        RaftGroupConfiguration restoredConfig = 
VersionedSerialization.fromBytes(bytes, serializer);
+
+        assertThat(restoredConfig, equalTo(originalConfig));
+    }
+
+    @Test
+    void v1CanBeDeserialized() {
+        byte[] bytes = 
Base64.getDecoder().decode("Ae++QwMGcGVlcjEGcGVlcjIDCWxlYXJuZXIxCWxlYXJuZXIyAwpvbGQtcGVlcjEKb2xkLXBlZXIyAw1vbGQ"
+                + "tbGVhcm5lcjENb2xkLWxlYXJuZXIy");
+        RaftGroupConfiguration restoredConfig = 
VersionedSerialization.fromBytes(bytes, serializer);
+
+        assertThat(restoredConfig.peers(), is(List.of("peer1", "peer2")));
+        assertThat(restoredConfig.learners(), is(List.of("learner1", 
"learner2")));
+        assertThat(restoredConfig.oldPeers(), is(List.of("old-peer1", 
"old-peer2")));
+        assertThat(restoredConfig.oldLearners(), is(List.of("old-learner1", 
"old-learner2")));
+    }
+}
diff --git 
a/modules/system-disaster-recovery-api/src/main/java/org/apache/ignite/internal/disaster/system/message/ResetClusterMessage.java
 
b/modules/system-disaster-recovery-api/src/main/java/org/apache/ignite/internal/disaster/system/message/ResetClusterMessage.java
index a7214ed551..41e829c9f6 100644
--- 
a/modules/system-disaster-recovery-api/src/main/java/org/apache/ignite/internal/disaster/system/message/ResetClusterMessage.java
+++ 
b/modules/system-disaster-recovery-api/src/main/java/org/apache/ignite/internal/disaster/system/message/ResetClusterMessage.java
@@ -50,17 +50,17 @@ public interface ResetClusterMessage extends 
NetworkMessage, Serializable {
      */
     UUID clusterId();
 
-    /**
-     * IDs that the cluster had before (including the current incarnation by 
which this message is sent).
-     */
-    List<UUID> formerClusterIds();
-
     /**
      * Initial cluster configuration ({@code null} if no initial configuration 
was passed on init).
      */
     @Nullable
     String initialClusterConfiguration();
 
+    /**
+     * IDs that the cluster had before (including the current incarnation by 
which this message is sent).
+     */
+    List<UUID> formerClusterIds();
+
     /**
      * Number of nodes in the Raft voting member set for Metastorage. Only 
non-null if Metastorage is to be repaired.
      */
diff --git 
a/modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/ResetClusterMessagePersistentSerializer.java
 
b/modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/ResetClusterMessagePersistentSerializer.java
new file mode 100644
index 0000000000..bc740063f1
--- /dev/null
+++ 
b/modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/ResetClusterMessagePersistentSerializer.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.disaster.system;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import org.apache.ignite.internal.disaster.system.message.ResetClusterMessage;
+import 
org.apache.ignite.internal.disaster.system.message.SystemDisasterRecoveryMessagesFactory;
+import org.apache.ignite.internal.util.io.IgniteDataInput;
+import org.apache.ignite.internal.util.io.IgniteDataOutput;
+import org.apache.ignite.internal.versioned.VersionedSerializer;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * {@link VersionedSerializer} for {@link ResetClusterMessage} instances.
+ */
+public class ResetClusterMessagePersistentSerializer extends 
VersionedSerializer<ResetClusterMessage> {
+    private static final SystemDisasterRecoveryMessagesFactory 
MESSAGES_FACTORY = new SystemDisasterRecoveryMessagesFactory();
+
+    /** Serializer instance. */
+    public static final ResetClusterMessagePersistentSerializer INSTANCE = new 
ResetClusterMessagePersistentSerializer();
+
+    @Override
+    protected void writeExternalData(ResetClusterMessage message, 
IgniteDataOutput out) throws IOException {
+        writeStringSet(message.newCmgNodes(), out);
+        writeStringSet(message.currentMetaStorageNodes(), out);
+        out.writeUTF(message.clusterName());
+        out.writeUuid(message.clusterId());
+        writeNullableString(message.initialClusterConfiguration(), out);
+
+        out.writeVarInt(message.formerClusterIds().size());
+        for (UUID id : message.formerClusterIds()) {
+            out.writeUuid(id);
+        }
+
+        Integer metastorageReplicationFactor = 
message.metastorageReplicationFactor();
+        out.writeVarInt(metastorageReplicationFactor == null ? -1 : 
metastorageReplicationFactor);
+
+        writeNullableString(message.conductor(), out);
+
+        Set<String> participatingNodes = message.participatingNodes();
+        if (participatingNodes == null) {
+            out.writeVarInt(-1);
+        } else {
+            writeStringSet(participatingNodes, out);
+        }
+    }
+
+    private static void writeStringSet(Set<String> strings, IgniteDataOutput 
out) throws IOException {
+        out.writeVarInt(strings.size());
+        for (String str : strings) {
+            out.writeUTF(str);
+        }
+    }
+
+    private static void writeNullableString(@Nullable String str, 
IgniteDataOutput out) throws IOException {
+        out.writeVarInt(str == null ? -1 : str.length());
+        if (str != null) {
+            out.writeByteArray(str.getBytes(UTF_8));
+        }
+    }
+
+    @Override
+    protected ResetClusterMessage readExternalData(byte protoVer, 
IgniteDataInput in) throws IOException {
+        Set<String> newCmgNodes = readStringSet(in);
+        Set<String> currentMetaStorageNodes = readStringSet(in);
+        String clusterName = in.readUTF();
+        UUID clusterId = in.readUuid();
+        String initialClusterConfiguration = readNullableString(in);
+        List<UUID> formerClusterIds = readFormerClusterIds(in);
+        Integer metastorageReplicationFactor = readNullableInteger(in);
+        String conductor = readNullableString(in);
+        Set<String> participatingNodes = readNullableStringSet(in);
+
+        return MESSAGES_FACTORY.resetClusterMessage()
+                .newCmgNodes(newCmgNodes)
+                .currentMetaStorageNodes(currentMetaStorageNodes)
+                .clusterName(clusterName)
+                .clusterId(clusterId)
+                .initialClusterConfiguration(initialClusterConfiguration)
+                .formerClusterIds(formerClusterIds)
+                .metastorageReplicationFactor(metastorageReplicationFactor)
+                .conductor(conductor)
+                .participatingNodes(participatingNodes)
+                .build();
+    }
+
+    private static Set<String> readStringSet(IgniteDataInput in) throws 
IOException {
+        int size = in.readVarIntAsInt();
+
+        return readStringSet(size, in);
+    }
+
+    private static Set<String> readStringSet(int size, IgniteDataInput in) 
throws IOException {
+        Set<String> result = new HashSet<>(size);
+        for (int i = 0; i < size; i++) {
+            result.add(in.readUTF());
+        }
+
+        return result;
+    }
+
+    private static @Nullable String readNullableString(IgniteDataInput in) 
throws IOException {
+        int lengthOrMinusOne = in.readVarIntAsInt();
+        if (lengthOrMinusOne == -1) {
+            return null;
+        }
+
+        return new String(in.readByteArray(lengthOrMinusOne), UTF_8);
+    }
+
+    private static List<UUID> readFormerClusterIds(IgniteDataInput in) throws 
IOException {
+        int length = in.readVarIntAsInt();
+
+        assert length >= 0 : length;
+
+        List<UUID> result = new ArrayList<>(length);
+        for (int i = 0; i < length; i++) {
+            result.add(in.readUuid());
+        }
+
+        return result;
+    }
+
+    private static @Nullable Integer readNullableInteger(IgniteDataInput in) 
throws IOException {
+        int val = in.readVarIntAsInt();
+        return val == -1 ? null : val;
+    }
+
+    private static @Nullable Set<String> readNullableStringSet(IgniteDataInput 
in) throws IOException {
+        int lengthOrMinusOne = in.readVarIntAsInt();
+
+        if (lengthOrMinusOne == -1) {
+            return null;
+        }
+
+        return readStringSet(lengthOrMinusOne, in);
+    }
+}
diff --git 
a/modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/SystemDisasterRecoveryStorage.java
 
b/modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/SystemDisasterRecoveryStorage.java
index 066e8eb360..6aa905b4ee 100644
--- 
a/modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/SystemDisasterRecoveryStorage.java
+++ 
b/modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/SystemDisasterRecoveryStorage.java
@@ -53,7 +53,8 @@ public class SystemDisasterRecoveryStorage implements 
ClusterResetStorage {
 
     @Override
     public @Nullable ResetClusterMessage readResetClusterMessage() {
-        return readFromVault(RESET_CLUSTER_MESSAGE_VAULT_KEY);
+        VaultEntry entry = vault.get(RESET_CLUSTER_MESSAGE_VAULT_KEY);
+        return entry != null ? VersionedSerialization.fromBytes(entry.value(), 
ResetClusterMessagePersistentSerializer.INSTANCE) : null;
     }
 
     @Override
@@ -78,11 +79,6 @@ public class SystemDisasterRecoveryStorage implements 
ClusterResetStorage {
         return entry != null ? VersionedSerialization.fromBytes(entry.value(), 
ClusterStatePersistentSerializer.INSTANCE) : null;
     }
 
-    private <T> @Nullable T readFromVault(ByteArray key) {
-        VaultEntry entry = vault.get(key);
-        return entry != null ? ByteUtils.fromBytes(entry.value()) : null;
-    }
-
     void saveClusterState(ClusterState clusterState) {
         vault.put(CLUSTER_STATE_VAULT_KEY, 
VersionedSerialization.toBytes(clusterState, 
ClusterStatePersistentSerializer.INSTANCE));
     }
@@ -97,7 +93,10 @@ public class SystemDisasterRecoveryStorage implements 
ClusterResetStorage {
     }
 
     void saveResetClusterMessage(ResetClusterMessage message) {
-        vault.put(RESET_CLUSTER_MESSAGE_VAULT_KEY, ByteUtils.toBytes(message));
+        vault.put(
+                RESET_CLUSTER_MESSAGE_VAULT_KEY,
+                VersionedSerialization.toBytes(message, 
ResetClusterMessagePersistentSerializer.INSTANCE)
+        );
     }
 
     @Override
diff --git 
a/modules/system-disaster-recovery/src/test/java/org/apache/ignite/internal/disaster/system/ResetClusterMessagePersistentSerializerTest.java
 
b/modules/system-disaster-recovery/src/test/java/org/apache/ignite/internal/disaster/system/ResetClusterMessagePersistentSerializerTest.java
new file mode 100644
index 0000000000..4aa470cd9c
--- /dev/null
+++ 
b/modules/system-disaster-recovery/src/test/java/org/apache/ignite/internal/disaster/system/ResetClusterMessagePersistentSerializerTest.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.disaster.system;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+
+import java.util.Base64;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import org.apache.ignite.internal.disaster.system.message.ResetClusterMessage;
+import 
org.apache.ignite.internal.disaster.system.message.SystemDisasterRecoveryMessagesFactory;
+import org.apache.ignite.internal.versioned.VersionedSerialization;
+import org.junit.jupiter.api.Test;
+
+class ResetClusterMessagePersistentSerializerTest {
+    private static final SystemDisasterRecoveryMessagesFactory 
MESSAGES_FACTORY = new SystemDisasterRecoveryMessagesFactory();
+
+    private final ResetClusterMessagePersistentSerializer serializer = new 
ResetClusterMessagePersistentSerializer();
+
+    @Test
+    void serializationAndDeserializationWithoutNulls() {
+        ResetClusterMessage originalMessage = 
MESSAGES_FACTORY.resetClusterMessage()
+                .newCmgNodes(Set.of("a", "b"))
+                .currentMetaStorageNodes(Set.of("c", "d"))
+                .clusterName("cluster")
+                .clusterId(new UUID(0x1234567890ABCDEFL, 0xFEDCBA0987654321L))
+                .initialClusterConfiguration("config")
+                .formerClusterIds(List.of(
+                        new UUID(0xFEDCBA0987654321L, 0x1234567890ABCDEFL),
+                        new UUID(0x1234567890ABCDEFL, 0xFEDCBA0987654321L)
+                ))
+                .metastorageReplicationFactor(3)
+                .conductor("a")
+                .participatingNodes(Set.of("a", "b", "c"))
+                .build();
+
+        byte[] bytes = VersionedSerialization.toBytes(originalMessage, 
serializer);
+        ResetClusterMessage restoredMessage = 
VersionedSerialization.fromBytes(bytes, serializer);
+
+        assertThat(restoredMessage, equalTo(originalMessage));
+    }
+
+    @Test
+    void serializationAndDeserializationWithNulls() {
+        ResetClusterMessage originalMessage = 
MESSAGES_FACTORY.resetClusterMessage()
+                .newCmgNodes(Set.of("a", "b"))
+                .currentMetaStorageNodes(Set.of("c", "d"))
+                .clusterName("cluster")
+                .clusterId(new UUID(0x1234567890ABCDEFL, 0xFEDCBA0987654321L))
+                .initialClusterConfiguration(null)
+                .formerClusterIds(List.of(
+                        new UUID(0xFEDCBA0987654321L, 0x1234567890ABCDEFL),
+                        new UUID(0x1234567890ABCDEFL, 0xFEDCBA0987654321L)
+                ))
+                .metastorageReplicationFactor(null)
+                .conductor(null)
+                .participatingNodes(null)
+                .build();
+
+        byte[] bytes = VersionedSerialization.toBytes(originalMessage, 
serializer);
+        ResetClusterMessage restoredMessage = 
VersionedSerialization.fromBytes(bytes, serializer);
+
+        assertThat(restoredMessage, equalTo(originalMessage));
+    }
+
+    @Test
+    void v1CanBeDeserialized() {
+        byte[] bytes = 
Base64.getDecoder().decode("Ae++QwMCYQJiAwJjAmQIY2x1c3Rlcu/Nq5B4VjQSIUNlhwm63P4HY29uZmlnAyFDZYcJutz+782rkHhWNBL"
+                + "vzauQeFY0EiFDZYcJutz+BAJhBAJhAmMCYg==");
+        ResetClusterMessage restoredMessage = 
VersionedSerialization.fromBytes(bytes, serializer);
+
+        assertThat(restoredMessage.newCmgNodes(), equalTo(Set.of("a", "b")));
+        assertThat(restoredMessage.currentMetaStorageNodes(), 
equalTo(Set.of("c", "d")));
+        assertThat(restoredMessage.clusterName(), equalTo("cluster"));
+        assertThat(restoredMessage.clusterId(), equalTo(new 
UUID(0x1234567890ABCDEFL, 0xFEDCBA0987654321L)));
+        assertThat(restoredMessage.initialClusterConfiguration(), 
is("config"));
+        assertThat(restoredMessage.formerClusterIds(), equalTo(List.of(
+                new UUID(0xFEDCBA0987654321L, 0x1234567890ABCDEFL),
+                new UUID(0x1234567890ABCDEFL, 0xFEDCBA0987654321L)
+        )));
+        assertThat(restoredMessage.metastorageReplicationFactor(), equalTo(3));
+        assertThat(restoredMessage.conductor(), equalTo("a"));
+        assertThat(restoredMessage.participatingNodes(), equalTo(Set.of("a", 
"b", "c")));
+    }
+}
diff --git 
a/modules/system-disaster-recovery/src/test/java/org/apache/ignite/internal/disaster/system/SystemDisasterRecoveryManagerImplTest.java
 
b/modules/system-disaster-recovery/src/test/java/org/apache/ignite/internal/disaster/system/SystemDisasterRecoveryManagerImplTest.java
index d3031a3706..9f0abc84cf 100644
--- 
a/modules/system-disaster-recovery/src/test/java/org/apache/ignite/internal/disaster/system/SystemDisasterRecoveryManagerImplTest.java
+++ 
b/modules/system-disaster-recovery/src/test/java/org/apache/ignite/internal/disaster/system/SystemDisasterRecoveryManagerImplTest.java
@@ -27,7 +27,6 @@ import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCo
 import static 
org.apache.ignite.internal.testframework.asserts.CompletableFutureAssert.assertWillThrow;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
 import static org.apache.ignite.internal.util.ArrayUtils.BYTE_EMPTY_ARRAY;
-import static org.apache.ignite.internal.util.ByteUtils.fromBytes;
 import static org.apache.ignite.internal.util.ByteUtils.uuidToBytes;
 import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -461,7 +460,10 @@ class SystemDisasterRecoveryManagerImplTest extends 
BaseIgniteAbstractTest {
         VaultEntry entry = vaultManager.get(RESET_CLUSTER_MESSAGE_VAULT_KEY);
         assertThat(entry, is(notNullValue()));
 
-        ResetClusterMessage savedMessage = fromBytes(entry.value());
+        ResetClusterMessage savedMessage = VersionedSerialization.fromBytes(
+                entry.value(),
+                ResetClusterMessagePersistentSerializer.INSTANCE
+        );
 
         assertThatResetClusterMessageContentIsAsExpected(savedMessage, 
mgRepair);
     }
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionAccessImpl.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionAccessImpl.java
index 748be19ccf..8f179c6c2f 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionAccessImpl.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionAccessImpl.java
@@ -31,6 +31,7 @@ import org.apache.ignite.internal.lang.IgniteBiTuple;
 import org.apache.ignite.internal.lang.IgniteStringFormatter;
 import org.apache.ignite.internal.lowwatermark.LowWatermark;
 import org.apache.ignite.internal.raft.RaftGroupConfiguration;
+import org.apache.ignite.internal.raft.RaftGroupConfigurationConverter;
 import org.apache.ignite.internal.replicator.TablePartitionId;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.schema.BinaryRowUpgrader;
@@ -42,7 +43,6 @@ import 
org.apache.ignite.internal.storage.engine.MvTableStorage;
 import org.apache.ignite.internal.table.distributed.gc.GcUpdateHandler;
 import org.apache.ignite.internal.table.distributed.gc.MvGc;
 import org.apache.ignite.internal.table.distributed.index.IndexUpdateHandler;
-import 
org.apache.ignite.internal.table.distributed.raft.RaftGroupConfigurationConverter;
 import org.apache.ignite.internal.tx.TxMeta;
 import org.apache.ignite.internal.tx.storage.state.TxStateStorage;
 import org.apache.ignite.internal.tx.storage.state.TxStateTableStorage;
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/SnapshotAwarePartitionDataStorage.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/SnapshotAwarePartitionDataStorage.java
index 2d4fb9fb7d..399b8a42fd 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/SnapshotAwarePartitionDataStorage.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/SnapshotAwarePartitionDataStorage.java
@@ -21,6 +21,7 @@ import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.raft.RaftGroupConfiguration;
+import org.apache.ignite.internal.raft.RaftGroupConfigurationConverter;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.storage.MvPartitionStorage;
 import org.apache.ignite.internal.storage.MvPartitionStorage.WriteClosure;
@@ -31,7 +32,6 @@ import org.apache.ignite.internal.storage.StorageException;
 import org.apache.ignite.internal.storage.TxIdMismatchException;
 import org.apache.ignite.internal.storage.gc.GcEntry;
 import org.apache.ignite.internal.table.distributed.raft.PartitionDataStorage;
-import 
org.apache.ignite.internal.table.distributed.raft.RaftGroupConfigurationConverter;
 import org.apache.ignite.internal.table.distributed.raft.snapshot.PartitionKey;
 import org.apache.ignite.internal.util.Cursor;
 import org.jetbrains.annotations.Nullable;
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
index 39d7c65dd3..21df00e337 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
@@ -86,6 +86,7 @@ import 
org.apache.ignite.internal.partition.replicator.network.command.WriteInte
 import 
org.apache.ignite.internal.partition.replicator.network.replication.BinaryRowMessage;
 import org.apache.ignite.internal.raft.Command;
 import org.apache.ignite.internal.raft.RaftGroupConfiguration;
+import org.apache.ignite.internal.raft.RaftGroupConfigurationConverter;
 import org.apache.ignite.internal.raft.WriteCommand;
 import org.apache.ignite.internal.raft.service.CommandClosure;
 import org.apache.ignite.internal.raft.service.CommittedConfiguration;
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/RaftGroupConfigurationConverterTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/RaftGroupConfigurationConverterTest.java
index 8bc3c4e5ec..50f3d12d1d 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/RaftGroupConfigurationConverterTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/RaftGroupConfigurationConverterTest.java
@@ -23,6 +23,7 @@ import static org.hamcrest.Matchers.nullValue;
 
 import java.util.List;
 import org.apache.ignite.internal.raft.RaftGroupConfiguration;
+import org.apache.ignite.internal.raft.RaftGroupConfigurationConverter;
 import org.junit.jupiter.api.Test;
 
 class RaftGroupConfigurationConverterTest {
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopierTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopierTest.java
index 041a49e8ee..5e8d5e5dda 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopierTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopierTest.java
@@ -78,6 +78,7 @@ import 
org.apache.ignite.internal.partition.replicator.network.raft.SnapshotMvDa
 import 
org.apache.ignite.internal.partition.replicator.network.raft.SnapshotTxDataRequest;
 import 
org.apache.ignite.internal.partition.replicator.network.replication.BinaryRowMessage;
 import org.apache.ignite.internal.raft.RaftGroupConfiguration;
+import org.apache.ignite.internal.raft.RaftGroupConfigurationConverter;
 import org.apache.ignite.internal.replicator.TablePartitionId;
 import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
 import org.apache.ignite.internal.schema.BinaryRow;
@@ -94,7 +95,6 @@ import 
org.apache.ignite.internal.storage.impl.TestMvTableStorage;
 import org.apache.ignite.internal.table.distributed.gc.GcUpdateHandler;
 import org.apache.ignite.internal.table.distributed.gc.MvGc;
 import org.apache.ignite.internal.table.distributed.index.IndexUpdateHandler;
-import 
org.apache.ignite.internal.table.distributed.raft.RaftGroupConfigurationConverter;
 import 
org.apache.ignite.internal.table.distributed.raft.snapshot.FullStateTransferIndexChooser;
 import 
org.apache.ignite.internal.table.distributed.raft.snapshot.PartitionAccessImpl;
 import org.apache.ignite.internal.table.distributed.raft.snapshot.PartitionKey;
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/SnapshotAwarePartitionDataStorageTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/SnapshotAwarePartitionDataStorageTest.java
index 1f69baf4b3..e2ca24a32b 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/SnapshotAwarePartitionDataStorageTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/SnapshotAwarePartitionDataStorageTest.java
@@ -37,10 +37,10 @@ import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.internal.hlc.HybridClockImpl;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.raft.RaftGroupConfiguration;
+import org.apache.ignite.internal.raft.RaftGroupConfigurationConverter;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.storage.MvPartitionStorage;
 import org.apache.ignite.internal.storage.RowId;
-import 
org.apache.ignite.internal.table.distributed.raft.RaftGroupConfigurationConverter;
 import org.apache.ignite.internal.table.distributed.raft.snapshot.PartitionKey;
 import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
 import org.junit.jupiter.api.BeforeEach;
diff --git 
a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/TestPartitionDataStorage.java
 
b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/TestPartitionDataStorage.java
index 2c46830285..92a06663c9 100644
--- 
a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/TestPartitionDataStorage.java
+++ 
b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/TestPartitionDataStorage.java
@@ -21,6 +21,7 @@ import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.raft.RaftGroupConfiguration;
+import org.apache.ignite.internal.raft.RaftGroupConfigurationConverter;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.storage.MvPartitionStorage;
 import org.apache.ignite.internal.storage.MvPartitionStorage.WriteClosure;
@@ -31,7 +32,6 @@ import org.apache.ignite.internal.storage.StorageException;
 import org.apache.ignite.internal.storage.TxIdMismatchException;
 import org.apache.ignite.internal.storage.gc.GcEntry;
 import org.apache.ignite.internal.table.distributed.raft.PartitionDataStorage;
-import 
org.apache.ignite.internal.table.distributed.raft.RaftGroupConfigurationConverter;
 import org.apache.ignite.internal.util.Cursor;
 import org.jetbrains.annotations.Nullable;
 


Reply via email to