This is an automated email from the ASF dual-hosted git repository.
rpuch pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 5800d5d47d IGNITE-23469 Do not use ByteUtils#toBytes to persist system
disaster recovery messages (#4578)
5800d5d47d is described below
commit 5800d5d47dc004422bedcb8f92fd9a5d44b2ac62
Author: Roman Puchkovskiy <[email protected]>
AuthorDate: Fri Oct 18 17:01:12 2024 +0400
IGNITE-23469 Do not use ByteUtils#toBytes to persist system disaster
recovery messages (#4578)
---
.../raft/MetaStorageSnapshotStorageFactory.java | 6 +-
.../server/raft/MetaStorageListener.java | 6 +-
.../internal/raft/RaftGroupConfiguration.java | 2 +-
.../raft/RaftGroupConfigurationConverter.java | 9 +-
.../raft/RaftGroupConfigurationSerializer.java | 91 ++++++++++++
.../raft/RaftGroupConfigurationSerializerTest.java | 73 ++++++++++
.../system/message/ResetClusterMessage.java | 10 +-
.../ResetClusterMessagePersistentSerializer.java | 160 +++++++++++++++++++++
.../system/SystemDisasterRecoveryStorage.java | 13 +-
...esetClusterMessagePersistentSerializerTest.java | 103 +++++++++++++
.../SystemDisasterRecoveryManagerImplTest.java | 6 +-
.../raft/snapshot/PartitionAccessImpl.java | 2 +-
.../SnapshotAwarePartitionDataStorage.java | 2 +-
.../raft/PartitionCommandListenerTest.java | 1 +
.../raft/RaftGroupConfigurationConverterTest.java | 1 +
.../incoming/IncomingSnapshotCopierTest.java | 2 +-
.../SnapshotAwarePartitionDataStorageTest.java | 2 +-
.../distributed/TestPartitionDataStorage.java | 2 +-
18 files changed, 462 insertions(+), 29 deletions(-)
diff --git
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/raft/MetaStorageSnapshotStorageFactory.java
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/raft/MetaStorageSnapshotStorageFactory.java
index 9db4cc5016..8041ca30dc 100644
---
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/raft/MetaStorageSnapshotStorageFactory.java
+++
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/raft/MetaStorageSnapshotStorageFactory.java
@@ -21,8 +21,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.ignite.internal.metastorage.server.KeyValueStorage;
import org.apache.ignite.internal.raft.IndexWithTerm;
import org.apache.ignite.internal.raft.RaftGroupConfiguration;
+import org.apache.ignite.internal.raft.RaftGroupConfigurationConverter;
import org.apache.ignite.internal.raft.storage.SnapshotStorageFactory;
-import org.apache.ignite.internal.util.ByteUtils;
import org.apache.ignite.raft.jraft.RaftMessagesFactory;
import org.apache.ignite.raft.jraft.entity.RaftOutter;
import org.apache.ignite.raft.jraft.entity.RaftOutter.SnapshotMeta;
@@ -41,6 +41,8 @@ public class MetaStorageSnapshotStorageFactory implements
SnapshotStorageFactory
/** Snapshot meta, constructed from the storage data and raft group
configuration at startup. {@code null} if the storage is empty. */
private final @Nullable RaftOutter.SnapshotMeta startupSnapshotMeta;
+ private final RaftGroupConfigurationConverter configurationConverter = new
RaftGroupConfigurationConverter();
+
/**
* Constructor. We will try to read a snapshot meta here.
*
@@ -62,7 +64,7 @@ public class MetaStorageSnapshotStorageFactory implements
SnapshotStorageFactory
byte[] configBytes = storage.getConfiguration();
assert configBytes != null;
- RaftGroupConfiguration configuration =
ByteUtils.fromBytes(configBytes);
+ RaftGroupConfiguration configuration =
configurationConverter.fromBytes(configBytes);
assert configuration != null;
return new RaftMessagesFactory().snapshotMeta()
diff --git
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageListener.java
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageListener.java
index 40d860f1b8..cbc49a8581 100644
---
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageListener.java
+++
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageListener.java
@@ -40,13 +40,13 @@ import
org.apache.ignite.internal.metastorage.server.KeyValueStorage;
import org.apache.ignite.internal.metastorage.server.time.ClusterTimeImpl;
import org.apache.ignite.internal.raft.Command;
import org.apache.ignite.internal.raft.RaftGroupConfiguration;
+import org.apache.ignite.internal.raft.RaftGroupConfigurationConverter;
import org.apache.ignite.internal.raft.ReadCommand;
import org.apache.ignite.internal.raft.WriteCommand;
import org.apache.ignite.internal.raft.service.BeforeApplyHandler;
import org.apache.ignite.internal.raft.service.CommandClosure;
import org.apache.ignite.internal.raft.service.CommittedConfiguration;
import org.apache.ignite.internal.raft.service.RaftGroupListener;
-import org.apache.ignite.internal.util.ByteUtils;
import org.apache.ignite.internal.util.Cursor;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
import org.jetbrains.annotations.Nullable;
@@ -66,6 +66,8 @@ public class MetaStorageListener implements
RaftGroupListener, BeforeApplyHandle
private final Consumer<CommittedConfiguration> onConfigurationCommitted;
+ private final RaftGroupConfigurationConverter configurationConverter = new
RaftGroupConfigurationConverter();
+
/**
* Constructor.
*
@@ -211,7 +213,7 @@ public class MetaStorageListener implements
RaftGroupListener, BeforeApplyHandle
public void onConfigurationCommitted(CommittedConfiguration config) {
RaftGroupConfiguration configuration =
RaftGroupConfiguration.fromCommittedConfiguration(config);
- storage.saveConfiguration(ByteUtils.toBytes(configuration),
config.index(), config.term());
+
storage.saveConfiguration(configurationConverter.toBytes(configuration),
config.index(), config.term());
onConfigurationCommitted.accept(config);
}
diff --git
a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/RaftGroupConfiguration.java
b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/RaftGroupConfiguration.java
index 486a1682c2..c53f7f7bf1 100644
---
a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/RaftGroupConfiguration.java
+++
b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/RaftGroupConfiguration.java
@@ -27,7 +27,7 @@ import org.apache.ignite.internal.tostring.S;
import org.jetbrains.annotations.Nullable;
/**
- * A POJO for a RAFT group configuration, could be used to by other modules.
Not used by a RAFT module itself.
+ * A POJO for a RAFT group configuration, could be used by other modules. Not
used by the RAFT module itself.
*/
public class RaftGroupConfiguration implements Serializable {
private static final long serialVersionUID = 0;
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/RaftGroupConfigurationConverter.java
b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/RaftGroupConfigurationConverter.java
similarity index 83%
rename from
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/RaftGroupConfigurationConverter.java
rename to
modules/raft-api/src/main/java/org/apache/ignite/internal/raft/RaftGroupConfigurationConverter.java
index e0a00728d8..1dd077ef3a 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/RaftGroupConfigurationConverter.java
+++
b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/RaftGroupConfigurationConverter.java
@@ -15,10 +15,9 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.table.distributed.raft;
+package org.apache.ignite.internal.raft;
-import org.apache.ignite.internal.raft.RaftGroupConfiguration;
-import org.apache.ignite.internal.util.ByteUtils;
+import org.apache.ignite.internal.versioned.VersionedSerialization;
import org.jetbrains.annotations.Nullable;
/**
@@ -37,7 +36,7 @@ public class RaftGroupConfigurationConverter {
return null;
}
- return ByteUtils.fromBytes(bytes);
+ return VersionedSerialization.fromBytes(bytes,
RaftGroupConfigurationSerializer.INSTANCE);
}
/**
@@ -47,6 +46,6 @@ public class RaftGroupConfigurationConverter {
* @return Byte representation.
*/
public byte[] toBytes(RaftGroupConfiguration configuration) {
- return ByteUtils.toBytes(configuration);
+ return VersionedSerialization.toBytes(configuration,
RaftGroupConfigurationSerializer.INSTANCE);
}
}
diff --git
a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/RaftGroupConfigurationSerializer.java
b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/RaftGroupConfigurationSerializer.java
new file mode 100644
index 0000000000..dea2ba21a5
--- /dev/null
+++
b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/RaftGroupConfigurationSerializer.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.raft;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.ignite.internal.util.io.IgniteDataInput;
+import org.apache.ignite.internal.util.io.IgniteDataOutput;
+import org.apache.ignite.internal.versioned.VersionedSerializer;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Versioned binary (de)serializer for {@link RaftGroupConfiguration}.
+ *
+ * <p>The payload consists of four string lists written in this order: peers, learners, old peers, old learners. Each list is
+ * a var-int element count followed by that many strings. For the two "old" lists, which may be {@code null}, a count of
+ * {@code -1} is written instead of a list to represent {@code null}.
+ */
+public class RaftGroupConfigurationSerializer extends VersionedSerializer<RaftGroupConfiguration> {
+    /** Serializer instance. */
+    public static final RaftGroupConfigurationSerializer INSTANCE = new RaftGroupConfigurationSerializer();
+
+    /**
+     * Writes the four peer/learner lists of the given configuration.
+     *
+     * @param config Configuration to write.
+     * @param out Output to write to.
+     * @throws IOException If writing to the output fails.
+     */
+    @Override
+    protected void writeExternalData(RaftGroupConfiguration config, IgniteDataOutput out) throws IOException {
+        writeStringList(config.peers(), out);
+        writeStringList(config.learners(), out);
+        writeNullableStringList(config.oldPeers(), out);
+        writeNullableStringList(config.oldLearners(), out);
+    }
+
+    /** Writes the element count and then every element of a non-null list. */
+    private static void writeStringList(List<String> strings, IgniteDataOutput out) throws IOException {
+        out.writeVarInt(strings.size());
+
+        for (String element : strings) {
+            out.writeUTF(element);
+        }
+    }
+
+    /** Same as {@link #writeStringList}, but a {@code null} list is encoded as a count of {@code -1}. */
+    private static void writeNullableStringList(@Nullable List<String> strings, IgniteDataOutput out) throws IOException {
+        if (strings != null) {
+            writeStringList(strings, out);
+
+            return;
+        }
+
+        out.writeVarInt(-1);
+    }
+
+    /**
+     * Reads the four lists back in the order they were written and builds a configuration from them.
+     *
+     * @param protoVer Protocol version of the data being read (not consulted by this implementation).
+     * @param in Input to read from.
+     * @return Restored configuration.
+     * @throws IOException If reading from the input fails.
+     */
+    @Override
+    protected RaftGroupConfiguration readExternalData(byte protoVer, IgniteDataInput in) throws IOException {
+        List<String> peers = readStringList(in);
+        List<String> learners = readStringList(in);
+        List<String> oldPeers = readNullableStringList(in);
+        List<String> oldLearners = readNullableStringList(in);
+
+        return new RaftGroupConfiguration(peers, learners, oldPeers, oldLearners);
+    }
+
+    /** Reads a count-prefixed list that is never {@code null}. */
+    private static List<String> readStringList(IgniteDataInput in) throws IOException {
+        return readStringList(in.readVarIntAsInt(), in);
+    }
+
+    /** Reads {@code length} strings; the count itself must already have been consumed by the caller. */
+    private static List<String> readStringList(int length, IgniteDataInput in) throws IOException {
+        assert length >= 0 : "Invalid length: " + length;
+
+        List<String> elements = new ArrayList<>();
+
+        int remaining = length;
+        while (remaining-- > 0) {
+            elements.add(in.readUTF());
+        }
+
+        return elements;
+    }
+
+    /** Reads a count-prefixed list, mapping a count of {@code -1} to {@code null}. */
+    private static @Nullable List<String> readNullableStringList(IgniteDataInput in) throws IOException {
+        int lengthOrMinusOne = in.readVarIntAsInt();
+
+        return lengthOrMinusOne == -1 ? null : readStringList(lengthOrMinusOne, in);
+    }
+}
diff --git
a/modules/raft-api/src/test/java/org/apache/ignite/internal/raft/RaftGroupConfigurationSerializerTest.java
b/modules/raft-api/src/test/java/org/apache/ignite/internal/raft/RaftGroupConfigurationSerializerTest.java
new file mode 100644
index 0000000000..e22ce33767
--- /dev/null
+++
b/modules/raft-api/src/test/java/org/apache/ignite/internal/raft/RaftGroupConfigurationSerializerTest.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.raft;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+
+import java.util.Base64;
+import java.util.List;
+import org.apache.ignite.internal.versioned.VersionedSerialization;
+import org.junit.jupiter.api.Test;
+
+/** Tests for {@link RaftGroupConfigurationSerializer}. */
+class RaftGroupConfigurationSerializerTest {
+    private final RaftGroupConfigurationSerializer serializer = new RaftGroupConfigurationSerializer();
+
+    /** A configuration with all four lists present must survive a serialize/deserialize round trip. */
+    @Test
+    void serializationAndDeserializationWithoutNulls() {
+        assertSurvivesRoundTrip(new RaftGroupConfiguration(
+                List.of("peer1", "peer2"),
+                List.of("learner1", "learner2"),
+                List.of("old-peer1", "old-peer2"),
+                List.of("old-learner1", "old-learner2")
+        ));
+    }
+
+    /** A configuration whose old peers and old learners are {@code null} must survive a round trip. */
+    @Test
+    void serializationAndDeserializationWithNulls() {
+        assertSurvivesRoundTrip(new RaftGroupConfiguration(
+                List.of("peer1", "peer2"),
+                List.of("learner1", "learner2"),
+                null,
+                null
+        ));
+    }
+
+    /** Bytes captured from format version 1 must stay readable; this guards against accidental format changes. */
+    @Test
+    void v1CanBeDeserialized() {
+        String base64 = "Ae++QwMGcGVlcjEGcGVlcjIDCWxlYXJuZXIxCWxlYXJuZXIyAwpvbGQtcGVlcjEKb2xkLXBlZXIyAw1vbGQ"
+                + "tbGVhcm5lcjENb2xkLWxlYXJuZXIy";
+
+        RaftGroupConfiguration restoredConfig = VersionedSerialization.fromBytes(Base64.getDecoder().decode(base64), serializer);
+
+        assertThat(restoredConfig.peers(), is(List.of("peer1", "peer2")));
+        assertThat(restoredConfig.learners(), is(List.of("learner1", "learner2")));
+        assertThat(restoredConfig.oldPeers(), is(List.of("old-peer1", "old-peer2")));
+        assertThat(restoredConfig.oldLearners(), is(List.of("old-learner1", "old-learner2")));
+    }
+
+    /** Serializes the configuration, reads it back and checks that the result equals the input. */
+    private void assertSurvivesRoundTrip(RaftGroupConfiguration config) {
+        byte[] serialized = VersionedSerialization.toBytes(config, serializer);
+        RaftGroupConfiguration deserialized = VersionedSerialization.fromBytes(serialized, serializer);
+
+        assertThat(deserialized, equalTo(config));
+    }
+}
diff --git
a/modules/system-disaster-recovery-api/src/main/java/org/apache/ignite/internal/disaster/system/message/ResetClusterMessage.java
b/modules/system-disaster-recovery-api/src/main/java/org/apache/ignite/internal/disaster/system/message/ResetClusterMessage.java
index a7214ed551..41e829c9f6 100644
---
a/modules/system-disaster-recovery-api/src/main/java/org/apache/ignite/internal/disaster/system/message/ResetClusterMessage.java
+++
b/modules/system-disaster-recovery-api/src/main/java/org/apache/ignite/internal/disaster/system/message/ResetClusterMessage.java
@@ -50,17 +50,17 @@ public interface ResetClusterMessage extends
NetworkMessage, Serializable {
*/
UUID clusterId();
- /**
- * IDs that the cluster had before (including the current incarnation by
which this message is sent).
- */
- List<UUID> formerClusterIds();
-
/**
* Initial cluster configuration ({@code null} if no initial configuration
was passed on init).
*/
@Nullable
String initialClusterConfiguration();
+ /**
+ * IDs that the cluster had before (including the current incarnation by
which this message is sent).
+ */
+ List<UUID> formerClusterIds();
+
/**
* Number of nodes in the Raft voting member set for Metastorage. Only
non-null if Metastorage is to be repaired.
*/
diff --git
a/modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/ResetClusterMessagePersistentSerializer.java
b/modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/ResetClusterMessagePersistentSerializer.java
new file mode 100644
index 0000000000..bc740063f1
--- /dev/null
+++
b/modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/ResetClusterMessagePersistentSerializer.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.disaster.system;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import org.apache.ignite.internal.disaster.system.message.ResetClusterMessage;
+import
org.apache.ignite.internal.disaster.system.message.SystemDisasterRecoveryMessagesFactory;
+import org.apache.ignite.internal.util.io.IgniteDataInput;
+import org.apache.ignite.internal.util.io.IgniteDataOutput;
+import org.apache.ignite.internal.versioned.VersionedSerializer;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * {@link VersionedSerializer} for {@link ResetClusterMessage} instances.
+ */
+public class ResetClusterMessagePersistentSerializer extends
VersionedSerializer<ResetClusterMessage> {
+ private static final SystemDisasterRecoveryMessagesFactory
MESSAGES_FACTORY = new SystemDisasterRecoveryMessagesFactory();
+
+ /** Serializer instance. */
+ public static final ResetClusterMessagePersistentSerializer INSTANCE = new
ResetClusterMessagePersistentSerializer();
+
+ @Override
+ protected void writeExternalData(ResetClusterMessage message,
IgniteDataOutput out) throws IOException {
+ writeStringSet(message.newCmgNodes(), out);
+ writeStringSet(message.currentMetaStorageNodes(), out);
+ out.writeUTF(message.clusterName());
+ out.writeUuid(message.clusterId());
+ writeNullableString(message.initialClusterConfiguration(), out);
+
+ out.writeVarInt(message.formerClusterIds().size());
+ for (UUID id : message.formerClusterIds()) {
+ out.writeUuid(id);
+ }
+
+ Integer metastorageReplicationFactor =
message.metastorageReplicationFactor();
+ out.writeVarInt(metastorageReplicationFactor == null ? -1 :
metastorageReplicationFactor);
+
+ writeNullableString(message.conductor(), out);
+
+ Set<String> participatingNodes = message.participatingNodes();
+ if (participatingNodes == null) {
+ out.writeVarInt(-1);
+ } else {
+ writeStringSet(participatingNodes, out);
+ }
+ }
+
+ private static void writeStringSet(Set<String> strings, IgniteDataOutput
out) throws IOException {
+ out.writeVarInt(strings.size());
+ for (String str : strings) {
+ out.writeUTF(str);
+ }
+ }
+
+ private static void writeNullableString(@Nullable String str,
IgniteDataOutput out) throws IOException {
+ out.writeVarInt(str == null ? -1 : str.length());
+ if (str != null) {
+ out.writeByteArray(str.getBytes(UTF_8));
+ }
+ }
+
+ @Override
+ protected ResetClusterMessage readExternalData(byte protoVer,
IgniteDataInput in) throws IOException {
+ Set<String> newCmgNodes = readStringSet(in);
+ Set<String> currentMetaStorageNodes = readStringSet(in);
+ String clusterName = in.readUTF();
+ UUID clusterId = in.readUuid();
+ String initialClusterConfiguration = readNullableString(in);
+ List<UUID> formerClusterIds = readFormerClusterIds(in);
+ Integer metastorageReplicationFactor = readNullableInteger(in);
+ String conductor = readNullableString(in);
+ Set<String> participatingNodes = readNullableStringSet(in);
+
+ return MESSAGES_FACTORY.resetClusterMessage()
+ .newCmgNodes(newCmgNodes)
+ .currentMetaStorageNodes(currentMetaStorageNodes)
+ .clusterName(clusterName)
+ .clusterId(clusterId)
+ .initialClusterConfiguration(initialClusterConfiguration)
+ .formerClusterIds(formerClusterIds)
+ .metastorageReplicationFactor(metastorageReplicationFactor)
+ .conductor(conductor)
+ .participatingNodes(participatingNodes)
+ .build();
+ }
+
+ private static Set<String> readStringSet(IgniteDataInput in) throws
IOException {
+ int size = in.readVarIntAsInt();
+
+ return readStringSet(size, in);
+ }
+
+ private static Set<String> readStringSet(int size, IgniteDataInput in)
throws IOException {
+ Set<String> result = new HashSet<>(size);
+ for (int i = 0; i < size; i++) {
+ result.add(in.readUTF());
+ }
+
+ return result;
+ }
+
+ private static @Nullable String readNullableString(IgniteDataInput in)
throws IOException {
+ int lengthOrMinusOne = in.readVarIntAsInt();
+ if (lengthOrMinusOne == -1) {
+ return null;
+ }
+
+ return new String(in.readByteArray(lengthOrMinusOne), UTF_8);
+ }
+
+ private static List<UUID> readFormerClusterIds(IgniteDataInput in) throws
IOException {
+ int length = in.readVarIntAsInt();
+
+ assert length >= 0 : length;
+
+ List<UUID> result = new ArrayList<>(length);
+ for (int i = 0; i < length; i++) {
+ result.add(in.readUuid());
+ }
+
+ return result;
+ }
+
+ private static @Nullable Integer readNullableInteger(IgniteDataInput in)
throws IOException {
+ int val = in.readVarIntAsInt();
+ return val == -1 ? null : val;
+ }
+
+ private static @Nullable Set<String> readNullableStringSet(IgniteDataInput
in) throws IOException {
+ int lengthOrMinusOne = in.readVarIntAsInt();
+
+ if (lengthOrMinusOne == -1) {
+ return null;
+ }
+
+ return readStringSet(lengthOrMinusOne, in);
+ }
+}
diff --git
a/modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/SystemDisasterRecoveryStorage.java
b/modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/SystemDisasterRecoveryStorage.java
index 066e8eb360..6aa905b4ee 100644
---
a/modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/SystemDisasterRecoveryStorage.java
+++
b/modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/SystemDisasterRecoveryStorage.java
@@ -53,7 +53,8 @@ public class SystemDisasterRecoveryStorage implements
ClusterResetStorage {
@Override
public @Nullable ResetClusterMessage readResetClusterMessage() {
- return readFromVault(RESET_CLUSTER_MESSAGE_VAULT_KEY);
+ VaultEntry entry = vault.get(RESET_CLUSTER_MESSAGE_VAULT_KEY);
+ return entry != null ? VersionedSerialization.fromBytes(entry.value(),
ResetClusterMessagePersistentSerializer.INSTANCE) : null;
}
@Override
@@ -78,11 +79,6 @@ public class SystemDisasterRecoveryStorage implements
ClusterResetStorage {
return entry != null ? VersionedSerialization.fromBytes(entry.value(),
ClusterStatePersistentSerializer.INSTANCE) : null;
}
- private <T> @Nullable T readFromVault(ByteArray key) {
- VaultEntry entry = vault.get(key);
- return entry != null ? ByteUtils.fromBytes(entry.value()) : null;
- }
-
void saveClusterState(ClusterState clusterState) {
vault.put(CLUSTER_STATE_VAULT_KEY,
VersionedSerialization.toBytes(clusterState,
ClusterStatePersistentSerializer.INSTANCE));
}
@@ -97,7 +93,10 @@ public class SystemDisasterRecoveryStorage implements
ClusterResetStorage {
}
void saveResetClusterMessage(ResetClusterMessage message) {
- vault.put(RESET_CLUSTER_MESSAGE_VAULT_KEY, ByteUtils.toBytes(message));
+ vault.put(
+ RESET_CLUSTER_MESSAGE_VAULT_KEY,
+ VersionedSerialization.toBytes(message,
ResetClusterMessagePersistentSerializer.INSTANCE)
+ );
}
@Override
diff --git
a/modules/system-disaster-recovery/src/test/java/org/apache/ignite/internal/disaster/system/ResetClusterMessagePersistentSerializerTest.java
b/modules/system-disaster-recovery/src/test/java/org/apache/ignite/internal/disaster/system/ResetClusterMessagePersistentSerializerTest.java
new file mode 100644
index 0000000000..4aa470cd9c
--- /dev/null
+++
b/modules/system-disaster-recovery/src/test/java/org/apache/ignite/internal/disaster/system/ResetClusterMessagePersistentSerializerTest.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.disaster.system;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+
+import java.util.Base64;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import org.apache.ignite.internal.disaster.system.message.ResetClusterMessage;
+import
org.apache.ignite.internal.disaster.system.message.SystemDisasterRecoveryMessagesFactory;
+import org.apache.ignite.internal.versioned.VersionedSerialization;
+import org.junit.jupiter.api.Test;
+
+class ResetClusterMessagePersistentSerializerTest {
+ private static final SystemDisasterRecoveryMessagesFactory
MESSAGES_FACTORY = new SystemDisasterRecoveryMessagesFactory();
+
+ private final ResetClusterMessagePersistentSerializer serializer = new
ResetClusterMessagePersistentSerializer();
+
+ @Test
+ void serializationAndDeserializationWithoutNulls() {
+ ResetClusterMessage originalMessage =
MESSAGES_FACTORY.resetClusterMessage()
+ .newCmgNodes(Set.of("a", "b"))
+ .currentMetaStorageNodes(Set.of("c", "d"))
+ .clusterName("cluster")
+ .clusterId(new UUID(0x1234567890ABCDEFL, 0xFEDCBA0987654321L))
+ .initialClusterConfiguration("config")
+ .formerClusterIds(List.of(
+ new UUID(0xFEDCBA0987654321L, 0x1234567890ABCDEFL),
+ new UUID(0x1234567890ABCDEFL, 0xFEDCBA0987654321L)
+ ))
+ .metastorageReplicationFactor(3)
+ .conductor("a")
+ .participatingNodes(Set.of("a", "b", "c"))
+ .build();
+
+ byte[] bytes = VersionedSerialization.toBytes(originalMessage,
serializer);
+ ResetClusterMessage restoredMessage =
VersionedSerialization.fromBytes(bytes, serializer);
+
+ assertThat(restoredMessage, equalTo(originalMessage));
+ }
+
+ @Test
+ void serializationAndDeserializationWithNulls() {
+ ResetClusterMessage originalMessage =
MESSAGES_FACTORY.resetClusterMessage()
+ .newCmgNodes(Set.of("a", "b"))
+ .currentMetaStorageNodes(Set.of("c", "d"))
+ .clusterName("cluster")
+ .clusterId(new UUID(0x1234567890ABCDEFL, 0xFEDCBA0987654321L))
+ .initialClusterConfiguration(null)
+ .formerClusterIds(List.of(
+ new UUID(0xFEDCBA0987654321L, 0x1234567890ABCDEFL),
+ new UUID(0x1234567890ABCDEFL, 0xFEDCBA0987654321L)
+ ))
+ .metastorageReplicationFactor(null)
+ .conductor(null)
+ .participatingNodes(null)
+ .build();
+
+ byte[] bytes = VersionedSerialization.toBytes(originalMessage,
serializer);
+ ResetClusterMessage restoredMessage =
VersionedSerialization.fromBytes(bytes, serializer);
+
+ assertThat(restoredMessage, equalTo(originalMessage));
+ }
+
+ @Test
+ void v1CanBeDeserialized() {
+ byte[] bytes =
Base64.getDecoder().decode("Ae++QwMCYQJiAwJjAmQIY2x1c3Rlcu/Nq5B4VjQSIUNlhwm63P4HY29uZmlnAyFDZYcJutz+782rkHhWNBL"
+ + "vzauQeFY0EiFDZYcJutz+BAJhBAJhAmMCYg==");
+ ResetClusterMessage restoredMessage =
VersionedSerialization.fromBytes(bytes, serializer);
+
+ assertThat(restoredMessage.newCmgNodes(), equalTo(Set.of("a", "b")));
+ assertThat(restoredMessage.currentMetaStorageNodes(),
equalTo(Set.of("c", "d")));
+ assertThat(restoredMessage.clusterName(), equalTo("cluster"));
+ assertThat(restoredMessage.clusterId(), equalTo(new
UUID(0x1234567890ABCDEFL, 0xFEDCBA0987654321L)));
+ assertThat(restoredMessage.initialClusterConfiguration(),
is("config"));
+ assertThat(restoredMessage.formerClusterIds(), equalTo(List.of(
+ new UUID(0xFEDCBA0987654321L, 0x1234567890ABCDEFL),
+ new UUID(0x1234567890ABCDEFL, 0xFEDCBA0987654321L)
+ )));
+ assertThat(restoredMessage.metastorageReplicationFactor(), equalTo(3));
+ assertThat(restoredMessage.conductor(), equalTo("a"));
+ assertThat(restoredMessage.participatingNodes(), equalTo(Set.of("a",
"b", "c")));
+ }
+}
diff --git
a/modules/system-disaster-recovery/src/test/java/org/apache/ignite/internal/disaster/system/SystemDisasterRecoveryManagerImplTest.java
b/modules/system-disaster-recovery/src/test/java/org/apache/ignite/internal/disaster/system/SystemDisasterRecoveryManagerImplTest.java
index d3031a3706..9f0abc84cf 100644
---
a/modules/system-disaster-recovery/src/test/java/org/apache/ignite/internal/disaster/system/SystemDisasterRecoveryManagerImplTest.java
+++
b/modules/system-disaster-recovery/src/test/java/org/apache/ignite/internal/disaster/system/SystemDisasterRecoveryManagerImplTest.java
@@ -27,7 +27,6 @@ import static
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCo
import static
org.apache.ignite.internal.testframework.asserts.CompletableFutureAssert.assertWillThrow;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static org.apache.ignite.internal.util.ArrayUtils.BYTE_EMPTY_ARRAY;
-import static org.apache.ignite.internal.util.ByteUtils.fromBytes;
import static org.apache.ignite.internal.util.ByteUtils.uuidToBytes;
import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
import static org.hamcrest.MatcherAssert.assertThat;
@@ -461,7 +460,10 @@ class SystemDisasterRecoveryManagerImplTest extends
BaseIgniteAbstractTest {
VaultEntry entry = vaultManager.get(RESET_CLUSTER_MESSAGE_VAULT_KEY);
assertThat(entry, is(notNullValue()));
- ResetClusterMessage savedMessage = fromBytes(entry.value());
+ ResetClusterMessage savedMessage = VersionedSerialization.fromBytes(
+ entry.value(),
+ ResetClusterMessagePersistentSerializer.INSTANCE
+ );
assertThatResetClusterMessageContentIsAsExpected(savedMessage,
mgRepair);
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionAccessImpl.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionAccessImpl.java
index 748be19ccf..8f179c6c2f 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionAccessImpl.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionAccessImpl.java
@@ -31,6 +31,7 @@ import org.apache.ignite.internal.lang.IgniteBiTuple;
import org.apache.ignite.internal.lang.IgniteStringFormatter;
import org.apache.ignite.internal.lowwatermark.LowWatermark;
import org.apache.ignite.internal.raft.RaftGroupConfiguration;
+import org.apache.ignite.internal.raft.RaftGroupConfigurationConverter;
import org.apache.ignite.internal.replicator.TablePartitionId;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.BinaryRowUpgrader;
@@ -42,7 +43,6 @@ import org.apache.ignite.internal.storage.engine.MvTableStorage;
import org.apache.ignite.internal.table.distributed.gc.GcUpdateHandler;
import org.apache.ignite.internal.table.distributed.gc.MvGc;
import org.apache.ignite.internal.table.distributed.index.IndexUpdateHandler;
-import org.apache.ignite.internal.table.distributed.raft.RaftGroupConfigurationConverter;
import org.apache.ignite.internal.tx.TxMeta;
import org.apache.ignite.internal.tx.storage.state.TxStateStorage;
import org.apache.ignite.internal.tx.storage.state.TxStateTableStorage;
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/SnapshotAwarePartitionDataStorage.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/SnapshotAwarePartitionDataStorage.java
index 2d4fb9fb7d..399b8a42fd 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/SnapshotAwarePartitionDataStorage.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/SnapshotAwarePartitionDataStorage.java
@@ -21,6 +21,7 @@ import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.raft.RaftGroupConfiguration;
+import org.apache.ignite.internal.raft.RaftGroupConfigurationConverter;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.storage.MvPartitionStorage;
import org.apache.ignite.internal.storage.MvPartitionStorage.WriteClosure;
@@ -31,7 +32,6 @@ import org.apache.ignite.internal.storage.StorageException;
import org.apache.ignite.internal.storage.TxIdMismatchException;
import org.apache.ignite.internal.storage.gc.GcEntry;
import org.apache.ignite.internal.table.distributed.raft.PartitionDataStorage;
-import org.apache.ignite.internal.table.distributed.raft.RaftGroupConfigurationConverter;
import org.apache.ignite.internal.table.distributed.raft.snapshot.PartitionKey;
import org.apache.ignite.internal.util.Cursor;
import org.jetbrains.annotations.Nullable;
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
index 39d7c65dd3..21df00e337 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
@@ -86,6 +86,7 @@ import org.apache.ignite.internal.partition.replicator.network.command.WriteInte
import org.apache.ignite.internal.partition.replicator.network.replication.BinaryRowMessage;
import org.apache.ignite.internal.raft.Command;
import org.apache.ignite.internal.raft.RaftGroupConfiguration;
+import org.apache.ignite.internal.raft.RaftGroupConfigurationConverter;
import org.apache.ignite.internal.raft.WriteCommand;
import org.apache.ignite.internal.raft.service.CommandClosure;
import org.apache.ignite.internal.raft.service.CommittedConfiguration;
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/RaftGroupConfigurationConverterTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/RaftGroupConfigurationConverterTest.java
index 8bc3c4e5ec..50f3d12d1d 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/RaftGroupConfigurationConverterTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/RaftGroupConfigurationConverterTest.java
@@ -23,6 +23,7 @@ import static org.hamcrest.Matchers.nullValue;
import java.util.List;
import org.apache.ignite.internal.raft.RaftGroupConfiguration;
+import org.apache.ignite.internal.raft.RaftGroupConfigurationConverter;
import org.junit.jupiter.api.Test;
class RaftGroupConfigurationConverterTest {
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopierTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopierTest.java
index 041a49e8ee..5e8d5e5dda 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopierTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopierTest.java
@@ -78,6 +78,7 @@ import org.apache.ignite.internal.partition.replicator.network.raft.SnapshotMvDa
import org.apache.ignite.internal.partition.replicator.network.raft.SnapshotTxDataRequest;
import org.apache.ignite.internal.partition.replicator.network.replication.BinaryRowMessage;
import org.apache.ignite.internal.raft.RaftGroupConfiguration;
+import org.apache.ignite.internal.raft.RaftGroupConfigurationConverter;
import org.apache.ignite.internal.replicator.TablePartitionId;
import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
import org.apache.ignite.internal.schema.BinaryRow;
@@ -94,7 +95,6 @@ import org.apache.ignite.internal.storage.impl.TestMvTableStorage;
import org.apache.ignite.internal.table.distributed.gc.GcUpdateHandler;
import org.apache.ignite.internal.table.distributed.gc.MvGc;
import org.apache.ignite.internal.table.distributed.index.IndexUpdateHandler;
-import org.apache.ignite.internal.table.distributed.raft.RaftGroupConfigurationConverter;
import org.apache.ignite.internal.table.distributed.raft.snapshot.FullStateTransferIndexChooser;
import org.apache.ignite.internal.table.distributed.raft.snapshot.PartitionAccessImpl;
import org.apache.ignite.internal.table.distributed.raft.snapshot.PartitionKey;
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/SnapshotAwarePartitionDataStorageTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/SnapshotAwarePartitionDataStorageTest.java
index 1f69baf4b3..e2ca24a32b 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/SnapshotAwarePartitionDataStorageTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/SnapshotAwarePartitionDataStorageTest.java
@@ -37,10 +37,10 @@ import java.util.concurrent.CompletableFuture;
import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.raft.RaftGroupConfiguration;
+import org.apache.ignite.internal.raft.RaftGroupConfigurationConverter;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.storage.MvPartitionStorage;
import org.apache.ignite.internal.storage.RowId;
-import org.apache.ignite.internal.table.distributed.raft.RaftGroupConfigurationConverter;
import org.apache.ignite.internal.table.distributed.raft.snapshot.PartitionKey;
import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
import org.junit.jupiter.api.BeforeEach;
diff --git a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/TestPartitionDataStorage.java b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/TestPartitionDataStorage.java
index 2c46830285..92a06663c9 100644
--- a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/TestPartitionDataStorage.java
+++ b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/TestPartitionDataStorage.java
@@ -21,6 +21,7 @@ import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.raft.RaftGroupConfiguration;
+import org.apache.ignite.internal.raft.RaftGroupConfigurationConverter;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.storage.MvPartitionStorage;
import org.apache.ignite.internal.storage.MvPartitionStorage.WriteClosure;
@@ -31,7 +32,6 @@ import org.apache.ignite.internal.storage.StorageException;
import org.apache.ignite.internal.storage.TxIdMismatchException;
import org.apache.ignite.internal.storage.gc.GcEntry;
import org.apache.ignite.internal.table.distributed.raft.PartitionDataStorage;
-import org.apache.ignite.internal.table.distributed.raft.RaftGroupConfigurationConverter;
import org.apache.ignite.internal.util.Cursor;
import org.jetbrains.annotations.Nullable;