This is an automated email from the ASF dual-hosted git repository.
rpuch pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 7e4873dbdb IGNITE-23470 Do not use ByteUtils#toBytes to persist disaster recovery requests (#4582)
7e4873dbdb is described below
commit 7e4873dbdb3ac62f44ae0df09cf08794150266fe
Author: Roman Puchkovskiy <[email protected]>
AuthorDate: Thu Oct 17 17:56:23 2024 +0400
IGNITE-23470 Do not use ByteUtils#toBytes to persist disaster recovery requests (#4582)
---
.../disaster/DisasterRecoveryManager.java | 9 +-
.../disaster/DisasterRecoveryRequest.java | 3 +-
.../DisasterRecoveryRequestSerializer.java | 96 +++++++++++++++++++++
.../DisasterRecoveryRequestsSerialization.java | 46 ++++++++++
.../disaster/ManualGroupRestartRequest.java | 6 +-
.../ManualGroupRestartRequestSerializer.java | 77 +++++++++++++++++
.../disaster/ManualGroupUpdateRequest.java | 3 -
.../ManualGroupUpdateRequestSerializer.java | 56 ++++++++++++
.../DisasterRecoveryRequestSerializerTest.java | 99 ++++++++++++++++++++++
9 files changed, 384 insertions(+), 11 deletions(-)
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java
index a637a89c14..0aa9288b64 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java
@@ -60,6 +60,7 @@ import
org.apache.ignite.internal.catalog.events.DropTableEventParameters;
import org.apache.ignite.internal.distributionzones.DistributionZoneManager;
import org.apache.ignite.internal.distributionzones.NodeWithAttributes;
import
org.apache.ignite.internal.distributionzones.exception.DistributionZoneNotFoundException;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.lang.ByteArray;
import org.apache.ignite.internal.lang.NodeStoppingException;
import org.apache.ignite.internal.logger.IgniteLogger;
@@ -91,8 +92,8 @@ import
org.apache.ignite.internal.table.distributed.disaster.exceptions.Disaster
import
org.apache.ignite.internal.table.distributed.disaster.exceptions.IllegalPartitionIdException;
import
org.apache.ignite.internal.table.distributed.disaster.exceptions.NodesNotFoundException;
import
org.apache.ignite.internal.table.distributed.disaster.exceptions.ZonesNotFoundException;
-import org.apache.ignite.internal.util.ByteUtils;
import org.apache.ignite.internal.util.CollectionUtils;
+import org.apache.ignite.internal.versioned.VersionedSerialization;
import org.apache.ignite.lang.TableNotFoundException;
import org.apache.ignite.network.ClusterNode;
import org.jetbrains.annotations.Nullable;
@@ -489,14 +490,14 @@ public class DisasterRecoveryManager implements
IgniteComponent, SystemViewProvi
ongoingOperationsById.put(operationId, operationFuture);
- metaStorageManager.put(RECOVERY_TRIGGER_KEY, ByteUtils.toBytes(request));
+ metaStorageManager.put(RECOVERY_TRIGGER_KEY, VersionedSerialization.toBytes(request, DisasterRecoveryRequestSerializer.INSTANCE));
return operationFuture;
}
/**
* Handler for {@link #RECOVERY_TRIGGER_KEY} update event. Deserializes the request and delegates the execution to
- * {@link DisasterRecoveryRequest#handle(DisasterRecoveryManager, long)}.
+ * {@link DisasterRecoveryRequest#handle(DisasterRecoveryManager, long, HybridTimestamp)}.
*/
private void handleTriggerKeyUpdate(WatchEvent watchEvent) {
Entry newEntry = watchEvent.entryEvent().newEntry();
@@ -506,7 +507,7 @@ public class DisasterRecoveryManager implements
IgniteComponent, SystemViewProvi
DisasterRecoveryRequest request;
try {
- request = ByteUtils.fromBytes(requestBytes);
+ request = VersionedSerialization.fromBytes(requestBytes, DisasterRecoveryRequestSerializer.INSTANCE);
} catch (Exception e) {
LOG.warn("Unable to deserialize disaster recovery request.", e);
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryRequest.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryRequest.java
index 592a7f39f7..f2ac3c5540 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryRequest.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryRequest.java
@@ -17,7 +17,6 @@
package org.apache.ignite.internal.table.distributed.disaster;
-import java.io.Serializable;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.internal.hlc.HybridTimestamp;
@@ -25,7 +24,7 @@ import org.apache.ignite.internal.hlc.HybridTimestamp;
/**
* General interface for disaster recovery requests.
*/
-interface DisasterRecoveryRequest extends Serializable {
+interface DisasterRecoveryRequest {
/**
* Returns an ID of the operation, associated with request.
*/
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryRequestSerializer.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryRequestSerializer.java
new file mode 100644
index 0000000000..4a7c96c74e
--- /dev/null
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryRequestSerializer.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.disaster;
+
+import static java.util.function.Function.identity;
+import static java.util.stream.Collectors.toUnmodifiableMap;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Map;
+import org.apache.ignite.internal.util.io.IgniteDataInput;
+import org.apache.ignite.internal.util.io.IgniteDataOutput;
+import org.apache.ignite.internal.versioned.VersionedSerializer;
+
+/**
+ * {@link VersionedSerializer} for {@link DisasterRecoveryRequest} instances.
+ */
+class DisasterRecoveryRequestSerializer extends
VersionedSerializer<DisasterRecoveryRequest> {
+ static final DisasterRecoveryRequestSerializer INSTANCE = new
DisasterRecoveryRequestSerializer();
+
+ @Override
+ protected void writeExternalData(DisasterRecoveryRequest request,
IgniteDataOutput out) throws IOException {
+ Operation operation = Operation.findByRequest(request);
+
+ out.writeVarInt(operation.code);
+ operation.serialize(request, out);
+ }
+
+ @Override
+ protected DisasterRecoveryRequest readExternalData(byte protoVer,
IgniteDataInput in) throws IOException {
+ int operationCode = in.readVarIntAsInt();
+ Operation operation = Operation.findByCode(operationCode);
+
+ return operation.deserialize(in);
+ }
+
+ private enum Operation {
+ MANUAL_GROUP_UPDATE(0, ManualGroupUpdateRequestSerializer.INSTANCE),
+ MANUAL_GROUP_RESTART(1, ManualGroupRestartRequestSerializer.INSTANCE);
+
+ private static final Map<Integer, Operation> valuesByCode =
Arrays.stream(values())
+ .collect(toUnmodifiableMap(op -> op.code, identity()));
+
+ private final int code;
+ private final VersionedSerializer<DisasterRecoveryRequest> serializer;
+
+ Operation(int code, VersionedSerializer<? extends
DisasterRecoveryRequest> serializer) {
+ this.code = code;
+ this.serializer = (VersionedSerializer<DisasterRecoveryRequest>)
serializer;
+ }
+
+ static Operation findByCode(int code) {
+ Operation operation = valuesByCode.get(code);
+
+ if (operation == null) {
+ throw new IllegalArgumentException("Unknown operation code: " + code);
+ }
+
+ return operation;
+ }
+
+ static Operation findByRequest(DisasterRecoveryRequest request) {
+ if (request instanceof ManualGroupUpdateRequest) {
+ return MANUAL_GROUP_UPDATE;
+ }
+ if (request instanceof ManualGroupRestartRequest) {
+ return MANUAL_GROUP_RESTART;
+ }
+
+ throw new IllegalArgumentException("Unknown request type: " + request);
+ }
+
+ void serialize(DisasterRecoveryRequest request, IgniteDataOutput out)
throws IOException {
+ serializer.writeExternal(request, out);
+ }
+
+ <T extends DisasterRecoveryRequest> T deserialize(IgniteDataInput in)
throws IOException {
+ return (T) serializer.readExternal(in);
+ }
+ }
+}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryRequestsSerialization.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryRequestsSerialization.java
new file mode 100644
index 0000000000..4e22db26e6
--- /dev/null
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryRequestsSerialization.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.disaster;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.internal.util.io.IgniteDataInput;
+import org.apache.ignite.internal.util.io.IgniteDataOutput;
+
+class DisasterRecoveryRequestsSerialization {
+ static void writeVarIntSet(Set<Integer> partitionIds, IgniteDataOutput
out) throws IOException {
+ out.writeVarInt(partitionIds.size());
+
+ for (int partitionId : partitionIds) {
+ out.writeVarInt(partitionId);
+ }
+ }
+
+ static Set<Integer> readVarIntSet(IgniteDataInput in) throws IOException {
+ int length = in.readVarIntAsInt();
+
+ Set<Integer> set = new HashSet<>(IgniteUtils.capacity(length));
+ for (int i = 0; i < length; i++) {
+ set.add(in.readVarIntAsInt());
+ }
+
+ return set;
+ }
+}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/ManualGroupRestartRequest.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/ManualGroupRestartRequest.java
index eba6226b98..1ccee4d832 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/ManualGroupRestartRequest.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/ManualGroupRestartRequest.java
@@ -30,8 +30,6 @@ import org.apache.ignite.internal.replicator.TablePartitionId;
import org.apache.ignite.internal.tostring.S;
class ManualGroupRestartRequest implements DisasterRecoveryRequest {
- private static final long serialVersionUID = 0L;
-
private final UUID operationId;
private final int zoneId;
@@ -87,6 +85,10 @@ class ManualGroupRestartRequest implements
DisasterRecoveryRequest {
return nodeNames;
}
+ long assignmentsTimestamp() {
+ return assignmentsTimestamp;
+ }
+
@Override
public CompletableFuture<Void> handle(DisasterRecoveryManager
disasterRecoveryManager, long revision, HybridTimestamp timestamp) {
if (!nodeNames.isEmpty() &&
!nodeNames.contains(disasterRecoveryManager.localNode().name())) {
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/ManualGroupRestartRequestSerializer.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/ManualGroupRestartRequestSerializer.java
new file mode 100644
index 0000000000..ec53a418b0
--- /dev/null
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/ManualGroupRestartRequestSerializer.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.disaster;
+
+import static
org.apache.ignite.internal.table.distributed.disaster.DisasterRecoveryRequestsSerialization.readVarIntSet;
+import static
org.apache.ignite.internal.table.distributed.disaster.DisasterRecoveryRequestsSerialization.writeVarIntSet;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.UUID;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.internal.util.io.IgniteDataInput;
+import org.apache.ignite.internal.util.io.IgniteDataOutput;
+import org.apache.ignite.internal.versioned.VersionedSerializer;
+
+/**
+ * {@link VersionedSerializer} for {@link ManualGroupRestartRequest} instances.
+ */
+class ManualGroupRestartRequestSerializer extends
VersionedSerializer<ManualGroupRestartRequest> {
+ /** Serializer instance. */
+ static final ManualGroupRestartRequestSerializer INSTANCE = new
ManualGroupRestartRequestSerializer();
+
+ @Override
+ protected void writeExternalData(ManualGroupRestartRequest request,
IgniteDataOutput out) throws IOException {
+ out.writeUuid(request.operationId());
+ out.writeVarInt(request.zoneId());
+ out.writeVarInt(request.tableId());
+ writeVarIntSet(request.partitionIds(), out);
+
+ out.writeVarInt(request.nodeNames().size());
+ for (String nodeName : request.nodeNames()) {
+ out.writeUTF(nodeName);
+ }
+
+ // Writing long and not a varlong as the latter requires 9 bytes for hybrid timestamps.
+ out.writeLong(request.assignmentsTimestamp());
+ }
+
+ @Override
+ protected ManualGroupRestartRequest readExternalData(byte protoVer,
IgniteDataInput in) throws IOException {
+ UUID operationId = in.readUuid();
+ int zoneId = in.readVarIntAsInt();
+ int tableId = in.readVarIntAsInt();
+ Set<Integer> partitionIds = readVarIntSet(in);
+ Set<String> nodeNames = readStringSet(in);
+ long assignmentsTimestamp = in.readLong();
+
+ return new ManualGroupRestartRequest(operationId, zoneId, tableId,
partitionIds, nodeNames, assignmentsTimestamp);
+ }
+
+ private static Set<String> readStringSet(IgniteDataInput in) throws
IOException {
+ int size = in.readVarIntAsInt();
+
+ Set<String> result = new HashSet<>(IgniteUtils.capacity(size));
+ for (int i = 0; i < size; i++) {
+ result.add(in.readUTF());
+ }
+
+ return result;
+ }
+}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/ManualGroupUpdateRequest.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/ManualGroupUpdateRequest.java
index 7d0aeaf81b..fb05b50d16 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/ManualGroupUpdateRequest.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/ManualGroupUpdateRequest.java
@@ -74,9 +74,6 @@ import org.apache.ignite.internal.util.CollectionUtils;
import org.jetbrains.annotations.Nullable;
class ManualGroupUpdateRequest implements DisasterRecoveryRequest {
- /** Serial version UID. */
- private static final long serialVersionUID = 0L;
-
private final UUID operationId;
/**
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/ManualGroupUpdateRequestSerializer.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/ManualGroupUpdateRequestSerializer.java
new file mode 100644
index 0000000000..ee34659b09
--- /dev/null
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/ManualGroupUpdateRequestSerializer.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.disaster;
+
+import static
org.apache.ignite.internal.table.distributed.disaster.DisasterRecoveryRequestsSerialization.readVarIntSet;
+import static
org.apache.ignite.internal.table.distributed.disaster.DisasterRecoveryRequestsSerialization.writeVarIntSet;
+
+import java.io.IOException;
+import java.util.Set;
+import java.util.UUID;
+import org.apache.ignite.internal.util.io.IgniteDataInput;
+import org.apache.ignite.internal.util.io.IgniteDataOutput;
+import org.apache.ignite.internal.versioned.VersionedSerializer;
+
+/**
+ * {@link VersionedSerializer} for {@link ManualGroupUpdateRequest} instances.
+ */
+class ManualGroupUpdateRequestSerializer extends
VersionedSerializer<ManualGroupUpdateRequest> {
+ /** Serializer instance. */
+ static final ManualGroupUpdateRequestSerializer INSTANCE = new
ManualGroupUpdateRequestSerializer();
+
+ @Override
+ protected void writeExternalData(ManualGroupUpdateRequest request,
IgniteDataOutput out) throws IOException {
+ out.writeUuid(request.operationId());
+ out.writeVarInt(request.catalogVersion());
+ out.writeVarInt(request.zoneId());
+ out.writeVarInt(request.tableId());
+ writeVarIntSet(request.partitionIds(), out);
+ }
+
+ @Override
+ protected ManualGroupUpdateRequest readExternalData(byte protoVer,
IgniteDataInput in) throws IOException {
+ UUID operationId = in.readUuid();
+ int catalogVersion = in.readVarIntAsInt();
+ int zoneId = in.readVarIntAsInt();
+ int tableId = in.readVarIntAsInt();
+ Set<Integer> partitionIds = readVarIntSet(in);
+
+ return new ManualGroupUpdateRequest(operationId, catalogVersion,
zoneId, tableId, partitionIds);
+ }
+}
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryRequestSerializerTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryRequestSerializerTest.java
new file mode 100644
index 0000000000..432b6f3800
--- /dev/null
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryRequestSerializerTest.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.disaster;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+
+import java.util.Base64;
+import java.util.Set;
+import java.util.UUID;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.versioned.VersionedSerialization;
+import org.junit.jupiter.api.Test;
+
+class DisasterRecoveryRequestSerializerTest {
+ private final DisasterRecoveryRequestSerializer serializer = new
DisasterRecoveryRequestSerializer();
+
+ @Test
+ void serializationAndDeserializationOfManualGroupUpdateRequest() {
+ var originalRequest = new ManualGroupUpdateRequest(
+ new UUID(0x1234567890ABCDEFL, 0xFEDCBA0987654321L),
+ 1000,
+ 2000,
+ 3000,
+ Set.of(11, 21, 31)
+ );
+
+ byte[] bytes = VersionedSerialization.toBytes(originalRequest,
serializer);
+ ManualGroupUpdateRequest restoredRequest = (ManualGroupUpdateRequest)
VersionedSerialization.fromBytes(bytes, serializer);
+
+ assertThat(restoredRequest.operationId(), is(new
UUID(0x1234567890ABCDEFL, 0xFEDCBA0987654321L)));
+ assertThat(restoredRequest.catalogVersion(), is(1000));
+ assertThat(restoredRequest.zoneId(), is(2000));
+ assertThat(restoredRequest.tableId(), is(3000));
+ assertThat(restoredRequest.partitionIds(), is(Set.of(11, 21, 31)));
+ }
+
+ @Test
+ void v1OfManualGroupUpdateRequestCanBeDeserialized() {
+ byte[] bytes =
Base64.getDecoder().decode("Ae++QwEB775D782rkHhWNBIhQ2WHCbrc/ukH0Q+5FwQWDCA=");
+ ManualGroupUpdateRequest restoredRequest = (ManualGroupUpdateRequest)
VersionedSerialization.fromBytes(bytes, serializer);
+
+ assertThat(restoredRequest.operationId(), is(new
UUID(0x1234567890ABCDEFL, 0xFEDCBA0987654321L)));
+ assertThat(restoredRequest.catalogVersion(), is(1000));
+ assertThat(restoredRequest.zoneId(), is(2000));
+ assertThat(restoredRequest.tableId(), is(3000));
+ assertThat(restoredRequest.partitionIds(), is(Set.of(11, 21, 31)));
+ }
+
+ @Test
+ void serializationAndDeserializationOfManualGroupRestartRequest() {
+ var originalRequest = new ManualGroupRestartRequest(
+ new UUID(0x1234567890ABCDEFL, 0xFEDCBA0987654321L),
+ 2000,
+ 3000,
+ Set.of(11, 21, 31),
+ Set.of("a", "b"),
+ HybridTimestamp.MAX_VALUE.longValue()
+ );
+
+ byte[] bytes = VersionedSerialization.toBytes(originalRequest,
serializer);
+ ManualGroupRestartRequest restoredRequest =
(ManualGroupRestartRequest) VersionedSerialization.fromBytes(bytes, serializer);
+
+ assertThat(restoredRequest.operationId(), is(new
UUID(0x1234567890ABCDEFL, 0xFEDCBA0987654321L)));
+ assertThat(restoredRequest.zoneId(), is(2000));
+ assertThat(restoredRequest.tableId(), is(3000));
+ assertThat(restoredRequest.partitionIds(), is(Set.of(11, 21, 31)));
+ assertThat(restoredRequest.nodeNames(), is(Set.of("a", "b")));
+ assertThat(restoredRequest.assignmentsTimestamp(),
is(HybridTimestamp.MAX_VALUE.longValue()));
+ }
+
+ @Test
+ void v1OfManualGroupRestartRequestCanBeDeserialized() {
+ byte[] bytes =
Base64.getDecoder().decode("Ae++QwIB775D782rkHhWNBIhQ2WHCbrc/tEPuRcEDCAWAwJiAmH/////////fw==");
+ ManualGroupRestartRequest restoredRequest =
(ManualGroupRestartRequest) VersionedSerialization.fromBytes(bytes, serializer);
+
+ assertThat(restoredRequest.operationId(), is(new
UUID(0x1234567890ABCDEFL, 0xFEDCBA0987654321L)));
+ assertThat(restoredRequest.zoneId(), is(2000));
+ assertThat(restoredRequest.tableId(), is(3000));
+ assertThat(restoredRequest.partitionIds(), is(Set.of(11, 21, 31)));
+ assertThat(restoredRequest.nodeNames(), is(Set.of("a", "b")));
+ assertThat(restoredRequest.assignmentsTimestamp(),
is(HybridTimestamp.MAX_VALUE.longValue()));
+ }
+}