This is an automated email from the ASF dual-hosted git repository. ibessonov pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push: new de146ce3c2 IGNITE-19453 Move raft command serialization before the action request creation. (#2772) de146ce3c2 is described below commit de146ce3c2842ed7b6f6dea6ad9e6cc2064f4a26 Author: Ivan Bessonov <bessonov...@gmail.com> AuthorDate: Tue Nov 14 16:35:32 2023 +0300 IGNITE-19453 Move raft command serialization before the action request creation. (#2772) --- .../impl/ItMetaStorageServicePersistenceTest.java | 7 ++ .../server/raft/ItMetaStorageRaftGroupTest.java | 33 +++++-- .../server/raft/MetaStorageListener.java | 4 +- .../server/raft/MetaStorageWriteHandler.java | 6 +- .../MessageDeserializerGenerator.java | 6 +- .../serialization/MessageSerializerGenerator.java | 7 +- .../ignite/network/annotations/Transient.java} | 17 ++-- modules/network/README.md | 3 + .../stream/DirectByteBufferStreamImplV1.java | 3 +- .../internal/placementdriver/ActiveActorTest.java | 8 +- .../placementdriver/PlacementDriverManager.java | 3 +- .../apache/ignite/internal/raft/Marshaller.java | 53 +++++++++++ .../apache/ignite/internal/raft/RaftManager.java | 7 +- .../ignite/internal/raft/RaftServiceFactory.java | 4 +- .../internal/raft/service/BeforeApplyHandler.java | 3 +- .../raft/server/ItJraftCounterServerTest.java | 32 ++++--- .../ignite/raft/server/ItJraftHlcServerTest.java | 8 +- .../raft/server/ItSimpleCounterServerTest.java | 15 +++- .../ignite/raft/server/JraftAbstractTest.java | 5 +- .../java/org/apache/ignite/internal/raft/Loza.java | 34 +++++-- .../ignite/internal/raft/RaftGroupServiceImpl.java | 20 ++++- .../internal/raft/server/RaftGroupOptions.java | 2 +- .../ignite/internal/raft/server/RaftServer.java | 1 + .../internal/raft/server/impl/JraftServerImpl.java | 15 ++-- .../internal/raft/util/OptimizedMarshaller.java | 2 +- .../raft/util/ThreadLocalOptimizedMarshaller.java | 2 +- .../ignite/raft/jraft/option/NodeOptions.java | 4 +- .../ignite/raft/jraft/rpc/WriteActionRequest.java | 14 ++- .../jraft/rpc/impl/ActionRequestInterceptor.java | 11 +-- .../jraft/rpc/impl/ActionRequestProcessor.java | 100 +++++++++++++++------ .../impl/InterceptingActionRequestProcessor.java | 8 +- .../rpc/impl/NullActionRequestInterceptor.java | 3 +- .../ignite/raft/jraft/storage/io/MessageFile.java | 7 +- .../snapshot/local/LocalSnapshotMetaTable.java | 6 +- .../ignite/raft/jraft/util/JDKMarshaller.java | 6 +- .../ignite/internal/raft/RaftGroupServiceTest.java | 24 +++-- .../raft/jraft/rpc/AppendEntriesBenchmark.java | 10 +-- .../service/ItAbstractListenerSnapshotTest.java | 16 ++-- .../raft/client/TopologyAwareRaftGroupService.java | 7 +- .../TopologyAwareRaftGroupServiceFactory.java | 7 +- .../client/TopologyAwareRaftGroupServiceTest.java | 15 +++- .../internal/sql/engine/ItBuildIndexTest.java | 10 ++- modules/table/build.gradle | 1 + .../ignite/distributed/ItTablePersistenceTest.java | 7 ++ .../raftsnapshot/ItTableRaftSnapshotsTest.java | 19 +++- .../internal/table/distributed/TableManager.java | 6 +- .../distributed/command/CatalogVersionAware.java | 7 ++ .../distributed/command/PartitionCommand.java | 12 ++- .../table/distributed/raft/PartitionListener.java | 4 +- .../schema/CheckCatalogVersionOnActionRequest.java | 18 ++-- .../schema/CheckCatalogVersionOnAppendEntries.java | 13 ++- .../schema/PartitionCommandsMarshaller.java | 9 +- .../schema/PartitionCommandsMarshallerImpl.java | 20 ++--- .../table/distributed/TableManagerTest.java | 10 ++- .../CheckCatalogVersionOnActionRequestTest.java | 25 +++--- .../PartitionCommandsMarshallerImplTest.java | 2 +- .../apache/ignite/distributed/ItTxTestCluster.java | 13 +-- 57 files changed, 511 insertions(+), 203 deletions(-) diff --git a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageServicePersistenceTest.java b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageServicePersistenceTest.java index ea6930e0fa..bd4b87a4ab 100644 --- a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageServicePersistenceTest.java +++ b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageServicePersistenceTest.java @@ -32,11 +32,13 @@ import org.apache.ignite.internal.metastorage.server.KeyValueStorage; import org.apache.ignite.internal.metastorage.server.persistence.RocksDbKeyValueStorage; import org.apache.ignite.internal.metastorage.server.raft.MetaStorageListener; import org.apache.ignite.internal.metastorage.server.time.ClusterTimeImpl; +import org.apache.ignite.internal.raft.Marshaller; import org.apache.ignite.internal.raft.server.RaftServer; import org.apache.ignite.internal.raft.server.impl.JraftServerImpl; import org.apache.ignite.internal.raft.service.ItAbstractListenerSnapshotTest; import org.apache.ignite.internal.raft.service.RaftGroupListener; import org.apache.ignite.internal.raft.service.RaftGroupService; +import org.apache.ignite.internal.raft.util.ThreadLocalOptimizedMarshaller; import org.apache.ignite.internal.replicator.TestReplicationGroupId; import org.apache.ignite.internal.util.IgniteSpinBusyLock; import org.apache.ignite.internal.util.IgniteUtils; @@ -158,6 +160,11 @@ public class ItMetaStorageServicePersistenceTest extends ItAbstractListenerSnaps return new TestReplicationGroupId("metastorage"); } + @Override + protected Marshaller commandsMarshaller(ClusterService clusterService) { + return new ThreadLocalOptimizedMarshaller(clusterService.serializationRegistry()); + } + /** * Check meta storage entry. * diff --git a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/server/raft/ItMetaStorageRaftGroupTest.java b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/server/raft/ItMetaStorageRaftGroupTest.java index 3a84a2f05c..54e7d35bc0 100644 --- a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/server/raft/ItMetaStorageRaftGroupTest.java +++ b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/server/raft/ItMetaStorageRaftGroupTest.java @@ -62,6 +62,7 @@ import org.apache.ignite.internal.raft.configuration.RaftConfiguration; import org.apache.ignite.internal.raft.server.RaftServer; import org.apache.ignite.internal.raft.server.impl.JraftServerImpl; import org.apache.ignite.internal.raft.service.RaftGroupService; +import org.apache.ignite.internal.raft.util.ThreadLocalOptimizedMarshaller; import org.apache.ignite.internal.testframework.IgniteAbstractTest; import org.apache.ignite.internal.thread.NamedThreadFactory; import org.apache.ignite.internal.util.Cursor; @@ -347,17 +348,22 @@ public class ItMetaStorageRaftGroupTest extends IgniteAbstractTest { assertTrue(cluster.size() > 1); + var commandsMarshaller = new ThreadLocalOptimizedMarshaller(cluster.get(0).serializationRegistry()); + NodeOptions opt1 = new NodeOptions(); opt1.setReplicationStateListeners( List.of(new UserReplicatorStateListener(replicatorStartedCounter, replicatorStoppedCounter))); + opt1.setCommandsMarshaller(commandsMarshaller); NodeOptions opt2 = new NodeOptions(); opt2.setReplicationStateListeners( List.of(new UserReplicatorStateListener(replicatorStartedCounter, replicatorStoppedCounter))); + opt2.setCommandsMarshaller(commandsMarshaller); NodeOptions opt3 = new NodeOptions(); opt3.setReplicationStateListeners( List.of(new UserReplicatorStateListener(replicatorStartedCounter, replicatorStoppedCounter))); + opt3.setCommandsMarshaller(commandsMarshaller); metaStorageRaftSrv1 = new JraftServerImpl(cluster.get(0), workDir.resolve("node1"), opt1, new RaftGroupEventsClientListener()); @@ -386,35 +392,38 @@ public class ItMetaStorageRaftGroupTest extends IgniteAbstractTest { metaStorageRaftSrv3.startRaftNode(raftNodeId3, membersConfiguration, new MetaStorageListener(mockStorage, mock(ClusterTimeImpl.class)), defaults()); - metaStorageRaftGrpSvc1 = RaftGroupServiceImpl.start( + metaStorageRaftGrpSvc1 = waitForRaftGroupServiceSafely(RaftGroupServiceImpl.start( MetastorageGroupId.INSTANCE, cluster.get(0), FACTORY, raftConfiguration, membersConfiguration, true, - executor - ).get(); + executor, + commandsMarshaller + )); - metaStorageRaftGrpSvc2 = RaftGroupServiceImpl.start( + metaStorageRaftGrpSvc2 = waitForRaftGroupServiceSafely(RaftGroupServiceImpl.start( MetastorageGroupId.INSTANCE, cluster.get(1), FACTORY, raftConfiguration, membersConfiguration, true, - executor - ).get(); + executor, + commandsMarshaller + )); - metaStorageRaftGrpSvc3 = RaftGroupServiceImpl.start( + metaStorageRaftGrpSvc3 = waitForRaftGroupServiceSafely(RaftGroupServiceImpl.start( MetastorageGroupId.INSTANCE, cluster.get(2), FACTORY, raftConfiguration, membersConfiguration, true, - executor - ).get(); + executor, + commandsMarshaller + )); assertTrue(waitForCondition( () -> sameLeaders(metaStorageRaftGrpSvc1, metaStorageRaftGrpSvc2, metaStorageRaftGrpSvc3), 10_000), @@ -430,6 +439,12 @@ public class ItMetaStorageRaftGroupTest extends IgniteAbstractTest { return raftServersRaftGroups; } + private static RaftGroupService waitForRaftGroupServiceSafely(CompletableFuture<RaftGroupService> future) { + assertThat(future, willCompleteSuccessfully()); + + return future.join(); + } + /** * Checks if all raft groups have the same leader. * diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageListener.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageListener.java index d3b5043d97..43cf5624e0 100644 --- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageListener.java +++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageListener.java @@ -154,8 +154,8 @@ public class MetaStorageListener implements RaftGroupListener, BeforeApplyHandle } @Override - public void onBeforeApply(Command command) { - writeHandler.beforeApply(command); + public boolean onBeforeApply(Command command) { + return writeHandler.beforeApply(command); } @Override diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageWriteHandler.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageWriteHandler.java index 91feed7beb..3648d86827 100644 --- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageWriteHandler.java +++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageWriteHandler.java @@ -294,7 +294,7 @@ public class MetaStorageWriteHandler { } } - void beforeApply(Command command) { + boolean beforeApply(Command command) { if (command instanceof MetaStorageWriteCommand) { // Initiator sends us a timestamp to adjust to. // Alter command by setting safe time based on the adjusted clock. @@ -303,6 +303,10 @@ public class MetaStorageWriteHandler { clusterTime.adjust(writeCommand.initiatorTime()); writeCommand.safeTimeLong(clusterTime.nowLong()); + + return true; } + + return false; } } diff --git a/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/serialization/MessageDeserializerGenerator.java b/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/serialization/MessageDeserializerGenerator.java index e753dfb211..c9888afc2d 100644 --- a/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/serialization/MessageDeserializerGenerator.java +++ b/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/serialization/MessageDeserializerGenerator.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.network.processor.serialization; +import static java.util.stream.Collectors.toList; import static org.apache.ignite.internal.network.processor.messages.MessageImplGenerator.getByteArrayFieldName; import com.squareup.javapoet.ArrayTypeName; @@ -35,6 +36,7 @@ import javax.tools.Diagnostic; import org.apache.ignite.internal.network.processor.MessageClass; import org.apache.ignite.internal.network.processor.MessageGroupWrapper; import org.apache.ignite.network.annotations.Marshallable; +import org.apache.ignite.network.annotations.Transient; import org.apache.ignite.network.serialization.MessageDeserializer; import org.apache.ignite.network.serialization.MessageMappingException; import org.apache.ignite.network.serialization.MessageReader; @@ -118,7 +120,9 @@ public class MessageDeserializerGenerator { .addParameter(MessageReader.class, "reader") .addException(MessageMappingException.class); - List<ExecutableElement> getters = message.getters(); + List<ExecutableElement> getters = message.getters().stream() + .filter(e -> e.getAnnotation(Transient.class) == null) + .collect(toList()); method .beginControlFlow("if (!reader.beforeMessageRead())") diff --git a/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/serialization/MessageSerializerGenerator.java b/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/serialization/MessageSerializerGenerator.java index 36b90a4cef..9692ef9a10 100644 --- a/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/serialization/MessageSerializerGenerator.java +++ b/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/serialization/MessageSerializerGenerator.java @@ -17,6 +17,8 @@ package org.apache.ignite.internal.network.processor.serialization; +import static java.util.stream.Collectors.toList; + import com.squareup.javapoet.ClassName; import com.squareup.javapoet.CodeBlock; import com.squareup.javapoet.MethodSpec; @@ -30,6 +32,7 @@ import javax.tools.Diagnostic; import org.apache.ignite.internal.network.processor.MessageClass; import org.apache.ignite.internal.network.processor.MessageGroupWrapper; import org.apache.ignite.network.NetworkMessage; +import org.apache.ignite.network.annotations.Transient; import org.apache.ignite.network.serialization.MessageMappingException; import org.apache.ignite.network.serialization.MessageSerializer; import org.apache.ignite.network.serialization.MessageWriter; @@ -87,7 +90,9 @@ public class MessageSerializerGenerator { method.addStatement("$T message = ($T) msg", message.implClassName(), message.implClassName()).addCode("\n"); - List<ExecutableElement> getters = message.getters(); + List<ExecutableElement> getters = message.getters().stream() + .filter(e -> e.getAnnotation(Transient.class) == null) + .collect(toList()); method .beginControlFlow("if (!writer.isHeaderWritten())") diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/Marshaller.java b/modules/network-api/src/main/java/org/apache/ignite/network/annotations/Transient.java similarity index 64% rename from modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/Marshaller.java rename to modules/network-api/src/main/java/org/apache/ignite/network/annotations/Transient.java index cb4891e9cb..ecba91ed8d 100644 --- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/Marshaller.java +++ b/modules/network-api/src/main/java/org/apache/ignite/network/annotations/Transient.java @@ -14,14 +14,19 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.ignite.raft.jraft.util; -import java.nio.ByteBuffer; +package org.apache.ignite.network.annotations; -public interface Marshaller { - public static Marshaller DEFAULT = new JDKMarshaller(); +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; - byte[] marshall(Object o); +/** + * Annotation for methods of {@link Transferable} interface, that makes generated serializer/deserializer ignore the property. + */ +@Target(ElementType.METHOD) +@Retention(RetentionPolicy.SOURCE) +public @interface Transient { - <T> T unmarshall(ByteBuffer raw); } diff --git a/modules/network/README.md b/modules/network/README.md index 12645de7b4..d189153aaa 100644 --- a/modules/network/README.md +++ b/modules/network/README.md @@ -18,6 +18,9 @@ Supported types: + `java.util.Map<K, V>` where `K` and `V` can be any supported type + Arrays of all supported types +If one wants to have a property in the message, that shouldn't be serialized, `@Transient` annotation should be used. +Such properties will still be available in builder, like the rest. + ## Threading Every Ignite node has three network thread pool executors and thread naming formats: + Client worker - handles channel events on a client (`{consistentId}-client-X`) diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/direct/stream/DirectByteBufferStreamImplV1.java b/modules/network/src/main/java/org/apache/ignite/internal/network/direct/stream/DirectByteBufferStreamImplV1.java index fe3416e462..1014c85241 100644 --- a/modules/network/src/main/java/org/apache/ignite/internal/network/direct/stream/DirectByteBufferStreamImplV1.java +++ b/modules/network/src/main/java/org/apache/ignite/internal/network/direct/stream/DirectByteBufferStreamImplV1.java @@ -46,6 +46,7 @@ import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.RandomAccess; import java.util.Set; import java.util.UUID; @@ -186,7 +187,7 @@ public class DirectByteBufferStreamImplV1 implements DirectByteBufferStream { * @param serializationRegistry Serialization service. . */ public DirectByteBufferStreamImplV1(MessageSerializationRegistry serializationRegistry) { - this.serializationRegistry = serializationRegistry; + this.serializationRegistry = Objects.requireNonNull(serializationRegistry, "serializationRegistry"); } /** {@inheritDoc} */ diff --git a/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/ActiveActorTest.java b/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/ActiveActorTest.java index 66feec2163..210dce87a1 100644 --- a/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/ActiveActorTest.java +++ b/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/ActiveActorTest.java @@ -71,6 +71,7 @@ import org.apache.ignite.internal.raft.server.RaftGroupOptions; import org.apache.ignite.internal.raft.server.impl.JraftServerImpl; import org.apache.ignite.internal.raft.service.CommandClosure; import org.apache.ignite.internal.raft.service.RaftGroupListener; +import org.apache.ignite.internal.raft.util.ThreadLocalOptimizedMarshaller; import org.apache.ignite.internal.replicator.ReplicationGroupId; import org.apache.ignite.internal.testframework.IgniteAbstractTest; import org.apache.ignite.internal.thread.NamedThreadFactory; @@ -465,10 +466,12 @@ public class ActiveActorTest extends IgniteAbstractTest { var dataPath = workDir.resolve("raft_" + localPeer.consistentId()); + NodeOptions nodeOptions = new NodeOptions(); + nodeOptions.setCommandsMarshaller(new ThreadLocalOptimizedMarshaller(cluster.serializationRegistry())); var raftServer = new JraftServerImpl( cluster, dataPath, - new NodeOptions(), + nodeOptions, eventsClientListener ); raftServer.start(); @@ -527,7 +530,8 @@ public class ActiveActorTest extends IgniteAbstractTest { executor, new LogicalTopologyServiceTestImpl(localClusterService), eventsClientListener, - notifyOnSubscription + notifyOnSubscription, + new ThreadLocalOptimizedMarshaller(localClusterService.serializationRegistry()) ).join(); } diff --git a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriverManager.java b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriverManager.java index 51526218e0..1b2a82f707 100644 --- a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriverManager.java +++ b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriverManager.java @@ -150,7 +150,8 @@ public class PlacementDriverManager implements IgniteComponent { return raftManager.startRaftGroupService( replicationGroupId, PeersAndLearners.fromConsistentIds(placementDriverNodes), - topologyAwareRaftGroupServiceFactory + topologyAwareRaftGroupServiceFactory, + null // Use default commands marshaller. ).thenCompose(client -> client.subscribeLeader(this::onLeaderChange).thenApply(v -> client)); } catch (NodeStoppingException e) { return failedFuture(e); diff --git a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/Marshaller.java b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/Marshaller.java new file mode 100644 index 0000000000..a41a31268a --- /dev/null +++ b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/Marshaller.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.raft; + +import java.nio.ByteBuffer; + +/** + * Marshaller interface, for instances that convert objects to {@code byte[]} and back. + */ +public interface Marshaller { + /** + * Converts an object into a byte array. + * + * @param o Object to serialize. + * @return Byte buffer with a serialized object. + */ + byte[] marshall(Object o); + + /** + * Converts byte buffer back to an object. + * + * @param raw Byte buffer with a serialized object. + * @param <T> Generic type for avoiding explicit cast of the result instance. + * @return Deserialized object. + */ + <T> T unmarshall(ByteBuffer raw); + + /** + * Overloaded {@link #unmarshall(ByteBuffer)}, which can be used to avoid manual {@link ByteBuffer#wrap(byte[])} calls. + * + * @param bytes Array with a serialized object. + * @param <T> Generic type for avoiding explicit cast of the result instance. + * @return Deserialized object. + */ + default <T> T unmarshall(byte[] bytes) { + return unmarshall(ByteBuffer.wrap(bytes)); + } +} diff --git a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/RaftManager.java b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/RaftManager.java index e9d03e6408..a9cfff5c35 100644 --- a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/RaftManager.java +++ b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/RaftManager.java @@ -23,6 +23,8 @@ import org.apache.ignite.internal.manager.IgniteComponent; import org.apache.ignite.internal.raft.service.RaftGroupListener; import org.apache.ignite.internal.raft.service.RaftGroupService; import org.apache.ignite.internal.replicator.ReplicationGroupId; +import org.jetbrains.annotations.Nullable; +import org.jetbrains.annotations.TestOnly; /** * Raft manager. @@ -148,6 +150,7 @@ public interface RaftManager extends IgniteComponent { * @return Future that will be completed with an instance of a Raft group service. * @throws NodeStoppingException If node stopping intention was detected. */ + @TestOnly CompletableFuture<RaftGroupService> startRaftGroupService( ReplicationGroupId groupId, PeersAndLearners configuration @@ -159,12 +162,14 @@ public interface RaftManager extends IgniteComponent { * @param groupId Raft group ID. * @param configuration Peers and Learners of the Raft group. * @param factory Factory that should be used to create raft service. + * @param commandsMarshaller Marshaller that should be used to serialize commands. {@code null} if default marshaller should be used. * @return Future that will be completed with an instance of a Raft group service. * @throws NodeStoppingException If node stopping intention was detected. */ <T extends RaftGroupService> CompletableFuture<T> startRaftGroupService( ReplicationGroupId groupId, PeersAndLearners configuration, - RaftServiceFactory<T> factory + RaftServiceFactory<T> factory, + @Nullable Marshaller commandsMarshaller ) throws NodeStoppingException; } diff --git a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/RaftServiceFactory.java b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/RaftServiceFactory.java index ecea7e524d..c800a07cb9 100644 --- a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/RaftServiceFactory.java +++ b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/RaftServiceFactory.java @@ -34,12 +34,14 @@ public interface RaftServiceFactory<T extends RaftGroupService> { * @param peersAndLearners Peers configuration. * @param raftConfiguration Raft configuration. * @param raftClientExecutor Client executor. + * @param commandsMarshaller Marshaller that should be used to serialize commands. * @return Future that contains client when completes. */ CompletableFuture<T> startRaftGroupService( ReplicationGroupId groupId, PeersAndLearners peersAndLearners, RaftConfiguration raftConfiguration, - ScheduledExecutorService raftClientExecutor + ScheduledExecutorService raftClientExecutor, + Marshaller commandsMarshaller ); } diff --git a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/service/BeforeApplyHandler.java b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/service/BeforeApplyHandler.java index c8fd7ca1b4..ef78c7ffb5 100644 --- a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/service/BeforeApplyHandler.java +++ b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/service/BeforeApplyHandler.java @@ -38,6 +38,7 @@ public interface BeforeApplyHandler { * this is a place to do it. * * @param command The command. + * @return {@code true} if command has been changed, i.e. its fields have been modified. {@code false} otherwise. */ - void onBeforeApply(Command command); + boolean onBeforeApply(Command command); } diff --git a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ItJraftCounterServerTest.java b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ItJraftCounterServerTest.java index da083d2c31..9835fc8f6f 100644 --- a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ItJraftCounterServerTest.java +++ b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ItJraftCounterServerTest.java @@ -60,6 +60,7 @@ import org.apache.ignite.internal.raft.server.RaftServer; import org.apache.ignite.internal.raft.server.impl.JraftServerImpl; import org.apache.ignite.internal.raft.service.CommandClosure; import org.apache.ignite.internal.raft.service.RaftGroupService; +import org.apache.ignite.internal.raft.util.ThreadLocalOptimizedMarshaller; import org.apache.ignite.internal.replicator.ReplicationGroupId; import org.apache.ignite.internal.replicator.TestReplicationGroupId; import org.apache.ignite.internal.util.IgniteUtils; @@ -108,11 +109,13 @@ class ItJraftCounterServerTest extends JraftAbstractTest { Peer serverPeer = initialMembersConf.peer(localNodeName); + RaftGroupOptions groupOptions = groupOptions(raftServer); + raftServer.startRaftNode( - new RaftNodeId(COUNTER_GROUP_0, serverPeer), initialMembersConf, listenerFactory.get(), defaults() + new RaftNodeId(COUNTER_GROUP_0, serverPeer), initialMembersConf, listenerFactory.get(), groupOptions ); raftServer.startRaftNode( - new RaftNodeId(COUNTER_GROUP_1, serverPeer), initialMembersConf, listenerFactory.get(), defaults() + new RaftNodeId(COUNTER_GROUP_1, serverPeer), initialMembersConf, listenerFactory.get(), groupOptions ); }, opts -> {}); } @@ -131,7 +134,7 @@ class ItJraftCounterServerTest extends JraftAbstractTest { var nodeId = new RaftNodeId(new TestReplicationGroupId("test_raft_group"), initialMembersConf.peer(localNodeName)); - raftServer.startRaftNode(nodeId, initialMembersConf, listenerFactory.get(), defaults()); + raftServer.startRaftNode(nodeId, initialMembersConf, listenerFactory.get(), groupOptions(raftServer)); }, opts -> {}); Set<Thread> threads = getAllDisruptorCurrentThreads(); @@ -150,7 +153,7 @@ class ItJraftCounterServerTest extends JraftAbstractTest { for (int i = 0; i < 10; i++) { var nodeId = new RaftNodeId(new TestReplicationGroupId("test_raft_group_" + i), serverPeer); - srv.startRaftNode(nodeId, initialMembersConf, listenerFactory.get(), defaults()); + srv.startRaftNode(nodeId, initialMembersConf, listenerFactory.get(), groupOptions(srv)); } }); @@ -535,7 +538,8 @@ class ItJraftCounterServerTest extends JraftAbstractTest { Peer serverPeer = initialMembersConf.peer(localNodeName); - srv.startRaftNode(new RaftNodeId(groupId, serverPeer), initialMembersConf, listenerFactory.get(), defaults()); + RaftGroupOptions groupOptions = groupOptions(srv); + srv.startRaftNode(new RaftNodeId(groupId, serverPeer), initialMembersConf, listenerFactory.get(), groupOptions); } })); } @@ -592,7 +596,8 @@ class ItJraftCounterServerTest extends JraftAbstractTest { var listener = new UpdateCountRaftListener(counter, snapshotDataStorage); - RaftGroupOptions opts = defaults().snapshotStorageFactory(new SnapshotInMemoryStorageFactory(snapshotMetaStorage)); + RaftGroupOptions opts = groupOptions(raftServer) + .snapshotStorageFactory(new SnapshotInMemoryStorageFactory(snapshotMetaStorage)); raftServer.startRaftNode(new RaftNodeId(grpId, serverPeer), initialMembersConf, listener, opts); }, opts -> {}); @@ -654,7 +659,8 @@ class ItJraftCounterServerTest extends JraftAbstractTest { var listener = new UpdateCountRaftListener(counter, snapshotDataStorage); - RaftGroupOptions opts = defaults().snapshotStorageFactory(new SnapshotInMemoryStorageFactory(snapshotMetaStorage)); + RaftGroupOptions opts = groupOptions(raftServer) + .snapshotStorageFactory(new SnapshotInMemoryStorageFactory(snapshotMetaStorage)); raftServer.startRaftNode(new RaftNodeId(grpId, serverPeer), initialMembersConf, listener, opts); @@ -789,8 +795,8 @@ class ItJraftCounterServerTest extends JraftAbstractTest { Peer serverPeer = initialMembersConf.peer(localNodeName); - r.startRaftNode(new RaftNodeId(COUNTER_GROUP_0, serverPeer), initialMembersConf, listenerFactory.get(), defaults()); - r.startRaftNode(new RaftNodeId(COUNTER_GROUP_1, serverPeer), initialMembersConf, listenerFactory.get(), defaults()); + r.startRaftNode(new RaftNodeId(COUNTER_GROUP_0, serverPeer), initialMembersConf, listenerFactory.get(), groupOptions(r)); + r.startRaftNode(new RaftNodeId(COUNTER_GROUP_1, serverPeer), initialMembersConf, listenerFactory.get(), groupOptions(r)); }, opts -> {}); waitForCondition(() -> validateStateMachine(sum(20), svc2, COUNTER_GROUP_0), 5_000); @@ -808,8 +814,8 @@ class ItJraftCounterServerTest extends JraftAbstractTest { Peer serverPeer = initialMembersConf.peer(localNodeName); - r.startRaftNode(new RaftNodeId(COUNTER_GROUP_0, serverPeer), initialMembersConf, listenerFactory.get(), defaults()); - r.startRaftNode(new RaftNodeId(COUNTER_GROUP_1, serverPeer), initialMembersConf, listenerFactory.get(), defaults()); + r.startRaftNode(new RaftNodeId(COUNTER_GROUP_0, serverPeer), initialMembersConf, listenerFactory.get(), groupOptions(r)); + r.startRaftNode(new RaftNodeId(COUNTER_GROUP_1, serverPeer), initialMembersConf, listenerFactory.get(), groupOptions(r)); }, opts -> {}); waitForCondition(() -> validateStateMachine(sum(20), svc3, COUNTER_GROUP_0), 5_000); @@ -872,4 +878,8 @@ class ItJraftCounterServerTest extends JraftAbstractTest { clients.get(0).<Long>run(incrementAndGetCommand(1)).get(); assertEquals(index + 1, clients.get(0).readIndex().join()); } + + private static RaftGroupOptions groupOptions(RaftServer raftServer) { + return defaults().commandsMarshaller(new ThreadLocalOptimizedMarshaller(raftServer.clusterService().serializationRegistry())); + } } diff --git a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ItJraftHlcServerTest.java b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ItJraftHlcServerTest.java index f024ca4e52..2fcbeef9f4 100644 --- a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ItJraftHlcServerTest.java +++ b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ItJraftHlcServerTest.java @@ -24,6 +24,7 @@ import static org.apache.ignite.internal.raft.server.RaftGroupOptions.defaults; import static org.apache.ignite.internal.testframework.IgniteTestUtils.testNodeName; import static org.apache.ignite.raft.jraft.test.TestUtils.getLocalAddress; import static org.apache.ignite.raft.jraft.test.TestUtils.waitForTopology; +import static org.apache.ignite.utils.ClusterServiceTestUtils.defaultSerializationRegistry; import static org.junit.jupiter.api.Assertions.assertSame; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -39,6 +40,7 @@ import org.apache.ignite.internal.raft.PeersAndLearners; import org.apache.ignite.internal.raft.RaftNodeId; import org.apache.ignite.internal.raft.server.RaftServer; import org.apache.ignite.internal.raft.server.impl.JraftServerImpl; +import org.apache.ignite.internal.raft.util.ThreadLocalOptimizedMarshaller; import org.apache.ignite.internal.replicator.TestReplicationGroupId; import org.apache.ignite.network.ClusterService; import org.apache.ignite.network.NetworkAddress; @@ -157,6 +159,8 @@ class ItJraftHlcServerTest extends RaftServerAbstractTest { */ @Test public void testHlcOneInstancePerIgniteNode() { + ThreadLocalOptimizedMarshaller commandsMarshaller = new ThreadLocalOptimizedMarshaller(defaultSerializationRegistry()); + startServer(0, raftServer -> { String localNodeName = raftServer.clusterService().topologyService().localMember().name(); @@ -164,7 +168,7 @@ class ItJraftHlcServerTest extends RaftServerAbstractTest { var nodeId = new RaftNodeId(new TestReplicationGroupId("test_raft_group"), localNode); - raftServer.startRaftNode(nodeId, initialConf, listenerFactory.get(), defaults()); + raftServer.startRaftNode(nodeId, initialConf, listenerFactory.get(), defaults().commandsMarshaller(commandsMarshaller)); }, opts -> {}); servers.forEach(srv -> { @@ -175,7 +179,7 @@ class ItJraftHlcServerTest extends RaftServerAbstractTest { for (int i = 0; i < 5; i++) { var nodeId = new RaftNodeId(new TestReplicationGroupId("test_raft_group_" + i), localNode); - srv.startRaftNode(nodeId, initialConf, listenerFactory.get(), defaults()); + srv.startRaftNode(nodeId, initialConf, listenerFactory.get(), defaults().commandsMarshaller(commandsMarshaller)); } }); diff --git a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ItSimpleCounterServerTest.java b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ItSimpleCounterServerTest.java index 25770b4f4f..1520eb057b 100644 --- a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ItSimpleCounterServerTest.java +++ b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ItSimpleCounterServerTest.java @@ -39,9 +39,11 @@ import org.apache.ignite.internal.raft.PeersAndLearners; import org.apache.ignite.internal.raft.RaftGroupServiceImpl; import org.apache.ignite.internal.raft.RaftNodeId; import org.apache.ignite.internal.raft.configuration.RaftConfiguration; +import org.apache.ignite.internal.raft.server.RaftGroupOptions; import org.apache.ignite.internal.raft.server.RaftServer; import org.apache.ignite.internal.raft.server.impl.JraftServerImpl; import org.apache.ignite.internal.raft.service.RaftGroupService; +import org.apache.ignite.internal.raft.util.ThreadLocalOptimizedMarshaller; import org.apache.ignite.internal.replicator.TestReplicationGroupId; import org.apache.ignite.internal.thread.NamedThreadFactory; import org.apache.ignite.internal.util.IgniteUtils; @@ -115,11 +117,16 @@ class ItSimpleCounterServerTest extends RaftServerAbstractTest { Peer serverPeer = memberConfiguration.peer(serverNodeName); + // Short name for long lines later in code. + var cmdMarshaller = new ThreadLocalOptimizedMarshaller(service.serializationRegistry()); + + RaftGroupOptions grpOptions = defaults().commandsMarshaller(cmdMarshaller); + assertTrue( - server.startRaftNode(new RaftNodeId(COUNTER_GROUP_ID_0, serverPeer), memberConfiguration, new CounterListener(), defaults()) + server.startRaftNode(new RaftNodeId(COUNTER_GROUP_ID_0, serverPeer), memberConfiguration, new CounterListener(), grpOptions) ); assertTrue( - server.startRaftNode(new RaftNodeId(COUNTER_GROUP_ID_1, serverPeer), memberConfiguration, new CounterListener(), defaults()) + server.startRaftNode(new RaftNodeId(COUNTER_GROUP_ID_1, serverPeer), memberConfiguration, new CounterListener(), grpOptions) ); ClusterService clientNode1 = clusterService(PORT + 1, List.of(addr), true); @@ -127,13 +134,13 @@ class ItSimpleCounterServerTest extends RaftServerAbstractTest { executor = new ScheduledThreadPoolExecutor(20, new NamedThreadFactory(Loza.CLIENT_POOL_NAME, logger())); client1 = RaftGroupServiceImpl - .start(COUNTER_GROUP_ID_0, clientNode1, FACTORY, raftConfiguration, memberConfiguration, false, executor) + .start(COUNTER_GROUP_ID_0, clientNode1, FACTORY, raftConfiguration, memberConfiguration, false, executor, cmdMarshaller) .get(3, TimeUnit.SECONDS); ClusterService clientNode2 = clusterService(PORT + 2, List.of(addr), true); client2 = RaftGroupServiceImpl - .start(COUNTER_GROUP_ID_1, clientNode2, FACTORY, raftConfiguration, memberConfiguration, false, executor) + .start(COUNTER_GROUP_ID_1, clientNode2, FACTORY, raftConfiguration, memberConfiguration, false, executor, cmdMarshaller) .get(3, TimeUnit.SECONDS); assertTrue(waitForTopology(service, 3, 10_000)); diff --git a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/JraftAbstractTest.java b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/JraftAbstractTest.java index 21207db67f..52bb3d7ca5 100644 --- a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/JraftAbstractTest.java +++ b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/JraftAbstractTest.java @@ -43,6 +43,7 @@ import org.apache.ignite.internal.raft.configuration.RaftConfiguration; import org.apache.ignite.internal.raft.server.RaftServer; import org.apache.ignite.internal.raft.server.impl.JraftServerImpl; import org.apache.ignite.internal.raft.service.RaftGroupService; +import org.apache.ignite.internal.raft.util.ThreadLocalOptimizedMarshaller; import org.apache.ignite.internal.replicator.ReplicationGroupId; import org.apache.ignite.internal.thread.NamedThreadFactory; import org.apache.ignite.internal.util.IgniteUtils; @@ -212,8 +213,10 @@ public abstract class JraftAbstractTest extends RaftServerAbstractTest { ClusterService clientNode = clusterService(CLIENT_PORT + clients.size(), List.of(addr), true); + var commandsMarshaller = new ThreadLocalOptimizedMarshaller(clientNode.serializationRegistry()); + RaftGroupService client = RaftGroupServiceImpl - .start(groupId, clientNode, FACTORY, raftConfiguration, configuration, false, executor) + .start(groupId, clientNode, FACTORY, raftConfiguration, configuration, false, executor, commandsMarshaller) .get(3, TimeUnit.SECONDS); clients.add(client); diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java index a232853291..b6a80931c0 100644 --- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java +++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java @@ -17,6 +17,8 @@ package org.apache.ignite.internal.raft; +import static java.util.Objects.requireNonNullElse; + import java.nio.file.Path; import java.util.Set; import java.util.concurrent.CompletableFuture; @@ -38,6 +40,7 @@ import org.apache.ignite.internal.raft.server.RaftServer; import org.apache.ignite.internal.raft.server.impl.JraftServerImpl; import org.apache.ignite.internal.raft.service.RaftGroupListener; import org.apache.ignite.internal.raft.service.RaftGroupService; +import org.apache.ignite.internal.raft.util.ThreadLocalOptimizedMarshaller; import org.apache.ignite.internal.replicator.ReplicationGroupId; import org.apache.ignite.internal.thread.NamedThreadFactory; import org.apache.ignite.internal.util.IgniteSpinBusyLock; @@ -113,6 +116,7 @@ public class Loza implements RaftManager { NodeOptions options = new NodeOptions(); options.setClock(clock); + options.setCommandsMarshaller(new ThreadLocalOptimizedMarshaller(clusterNetSvc.serializationRegistry())); this.opts = options; @@ -300,7 +304,9 @@ public class Loza implements RaftManager { configuration, lsnr, eventsLsnr, - RaftGroupOptions.defaults().ownFsmCallerExecutorDisruptorConfig(disruptorConfiguration), + // Use default marshaller here, because this particular method is used in very specific circumstances. + RaftGroupOptions.defaults() + .ownFsmCallerExecutorDisruptorConfig(disruptorConfiguration), factory ); @@ -313,6 +319,7 @@ public class Loza implements RaftManager { } } + @TestOnly @Override public CompletableFuture<RaftGroupService> startRaftGroupService( ReplicationGroupId groupId, @@ -323,7 +330,8 @@ public class Loza implements RaftManager { } try { - return startRaftGroupServiceInternal(groupId, configuration); + // Use default command marshaller here. + return startRaftGroupServiceInternal(groupId, configuration, opts.getCommandsMarshaller()); } finally { busyLock.leaveBusy(); } @@ -333,14 +341,19 @@ public class Loza implements RaftManager { public <T extends RaftGroupService> CompletableFuture<T> startRaftGroupService( ReplicationGroupId groupId, PeersAndLearners configuration, - RaftServiceFactory<T> factory + RaftServiceFactory<T> factory, + @Nullable Marshaller commandsMarshaller ) throws NodeStoppingException { if (!busyLock.enterBusy()) { throw new NodeStoppingException(); } try { - return factory.startRaftGroupService(groupId, configuration, raftConfiguration, executor); + if (commandsMarshaller == null) { + commandsMarshaller = opts.getCommandsMarshaller(); + } + + return factory.startRaftGroupService(groupId, configuration, raftConfiguration, executor, commandsMarshaller); } finally { busyLock.leaveBusy(); } @@ -367,13 +380,17 @@ public class Loza implements RaftManager { )); } + Marshaller cmdMarshaller = requireNonNullElse(groupOptions.commandsMarshaller(), opts.getCommandsMarshaller()); + return raftServiceFactory == null - ? (CompletableFuture<T>) startRaftGroupServiceInternal(nodeId.groupId(), configuration) - : raftServiceFactory.startRaftGroupService(nodeId.groupId(), configuration, raftConfiguration, executor); + ? (CompletableFuture<T>) startRaftGroupServiceInternal(nodeId.groupId(), configuration, cmdMarshaller) + : raftServiceFactory.startRaftGroupService(nodeId.groupId(), configuration, raftConfiguration, executor, cmdMarshaller); } private CompletableFuture<RaftGroupService> startRaftGroupServiceInternal( - ReplicationGroupId grpId, PeersAndLearners membersConfiguration + ReplicationGroupId grpId, + PeersAndLearners membersConfiguration, + Marshaller commandsMarshaller ) { return RaftGroupServiceImpl.start( grpId, @@ -382,7 +399,8 @@ public class Loza implements RaftManager { raftConfiguration, membersConfiguration, true, - executor + executor, + commandsMarshaller ); } diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/RaftGroupServiceImpl.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/RaftGroupServiceImpl.java index 66ca55ccbf..f0f3ec1304 100644 --- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/RaftGroupServiceImpl.java +++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/RaftGroupServiceImpl.java @@ -111,6 +111,8 @@ public class RaftGroupServiceImpl implements RaftGroupService { /** Executor for scheduling retries of {@link RaftGroupServiceImpl#sendWithRetry} invocations. */ private final ScheduledExecutorService executor; + private final Marshaller commandsMarshaller; + /** Busy lock. */ private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock(); @@ -124,6 +126,7 @@ public class RaftGroupServiceImpl implements RaftGroupService { * @param membersConfiguration Raft members configuration. * @param leader Group leader. * @param executor Executor for retrying requests. + * @param commandsMarshaller Marshaller that should be used to serialize/deserialize commands. */ private RaftGroupServiceImpl( ReplicationGroupId groupId, @@ -132,7 +135,8 @@ public class RaftGroupServiceImpl implements RaftGroupService { RaftConfiguration configuration, PeersAndLearners membersConfiguration, @Nullable Peer leader, - ScheduledExecutorService executor + ScheduledExecutorService executor, + Marshaller commandsMarshaller ) { this.cluster = cluster; this.configuration = configuration; @@ -143,6 +147,7 @@ public class RaftGroupServiceImpl implements RaftGroupService { this.realGroupId = groupId; this.leader = leader; this.executor = executor; + this.commandsMarshaller = commandsMarshaller; } /** @@ -164,7 +169,8 @@ public class RaftGroupServiceImpl implements RaftGroupService { RaftConfiguration configuration, PeersAndLearners membersConfiguration, boolean getLeader, - ScheduledExecutorService executor + ScheduledExecutorService executor, + Marshaller commandsMarshaller ) { var service = new RaftGroupServiceImpl( groupId, @@ -173,7 +179,8 @@ public class RaftGroupServiceImpl implements RaftGroupService { configuration, membersConfiguration, null, - executor + executor, + commandsMarshaller ); if (!getLeader) { @@ -453,9 +460,14 @@ public class RaftGroupServiceImpl implements RaftGroupService { Function<Peer, ActionRequest> requestFactory; if (cmd instanceof WriteCommand) { + byte[] commandBytes = commandsMarshaller.marshall(cmd); + requestFactory = targetPeer -> factory.writeActionRequest() .groupId(groupId) - .command((WriteCommand) cmd) + .command(commandBytes) + // Having prepared deserialized command makes its handling more efficient in the state machine. + // This saves us from extra-deserialization on a local machine, which would take precious time to do. + .deserializedCommand((WriteCommand) cmd) .build(); } else { requestFactory = targetPeer -> factory.readActionRequest() diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/RaftGroupOptions.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/RaftGroupOptions.java index e849a99ffc..2825a7162b 100644 --- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/RaftGroupOptions.java +++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/RaftGroupOptions.java @@ -17,11 +17,11 @@ package org.apache.ignite.internal.raft.server; +import org.apache.ignite.internal.raft.Marshaller; import org.apache.ignite.internal.raft.RaftNodeDisruptorConfiguration; import org.apache.ignite.internal.raft.storage.LogStorageFactory; import org.apache.ignite.internal.raft.storage.RaftMetaStorageFactory; import org.apache.ignite.internal.raft.storage.SnapshotStorageFactory; -import org.apache.ignite.raft.jraft.util.Marshaller; import org.jetbrains.annotations.Nullable; /** diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/RaftServer.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/RaftServer.java index b757d2296e..d016dd9069 100644 --- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/RaftServer.java +++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/RaftServer.java @@ -40,6 +40,7 @@ public interface RaftServer extends IgniteComponent { /** * Returns cluster service. */ + @TestOnly ClusterService clusterService(); /** diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java index 8b2404eb59..ca9393947d 100644 --- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java +++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java @@ -17,7 +17,6 @@ package org.apache.ignite.internal.raft.server.impl; -import static java.util.Objects.requireNonNullElse; import static java.util.stream.Collectors.toList; import static java.util.stream.Collectors.toUnmodifiableList; @@ -43,6 +42,7 @@ import java.util.stream.IntStream; import org.apache.ignite.internal.lang.IgniteInternalException; import org.apache.ignite.internal.lang.IgniteStringFormatter; import org.apache.ignite.internal.lang.IgniteSystemProperties; +import org.apache.ignite.internal.raft.Marshaller; import org.apache.ignite.internal.raft.Peer; import org.apache.ignite.internal.raft.PeersAndLearners; import org.apache.ignite.internal.raft.RaftGroupEventsListener; @@ -58,7 +58,6 @@ import org.apache.ignite.internal.raft.storage.impl.DefaultLogStorageFactory; import org.apache.ignite.internal.raft.storage.impl.IgniteJraftServiceFactory; import org.apache.ignite.internal.raft.storage.impl.StripeAwareLogManager.Stripe; import org.apache.ignite.internal.raft.storage.logit.LogitLogStorageFactory; -import org.apache.ignite.internal.raft.util.ThreadLocalOptimizedMarshaller; import org.apache.ignite.internal.replicator.ReplicationGroupId; import org.apache.ignite.internal.thread.NamedThreadFactory; import org.apache.ignite.network.ClusterService; @@ -91,7 +90,6 @@ import org.apache.ignite.raft.jraft.storage.snapshot.SnapshotReader; import org.apache.ignite.raft.jraft.storage.snapshot.SnapshotWriter; import org.apache.ignite.raft.jraft.util.ExecutorServiceHelper; import org.apache.ignite.raft.jraft.util.ExponentialBackoffTimeoutStrategy; -import org.apache.ignite.raft.jraft.util.Marshaller; import org.apache.ignite.raft.jraft.util.Utils; import org.jetbrains.annotations.Nullable; import org.jetbrains.annotations.TestOnly; @@ -136,9 +134,6 @@ public class JraftServerImpl implements RaftServer { /** Request executor. */ private ExecutorService requestExecutor; - /** Marshaller for RAFT commands that is used if a marshaller is not specified in {@link RaftGroupOptions}. */ - private final Marshaller defaultCommandsMarshaller; - /** Raft service event interceptor. */ private final RaftServiceEventInterceptor serviceEventInterceptor; @@ -212,7 +207,6 @@ public class JraftServerImpl implements RaftServer { startGroupInProgressMonitors = Collections.unmodifiableList(monitors); - defaultCommandsMarshaller = new ThreadLocalOptimizedMarshaller(service.serializationRegistry()); serviceEventInterceptor = new RaftServiceEventInterceptor(); } @@ -474,10 +468,11 @@ public class JraftServerImpl implements RaftServer { nodeOptions.setSnapshotUri(serverDataPath.resolve("snapshot").toString()); - Marshaller commandsMarshaller = requireNonNullElse(groupOptions.commandsMarshaller(), defaultCommandsMarshaller); - nodeOptions.setCommandsMarshaller(commandsMarshaller); + if (groupOptions.commandsMarshaller() != null) { + nodeOptions.setCommandsMarshaller(groupOptions.commandsMarshaller()); + } - nodeOptions.setFsm(new DelegatingStateMachine(lsnr, commandsMarshaller)); + nodeOptions.setFsm(new DelegatingStateMachine(lsnr, nodeOptions.getCommandsMarshaller())); nodeOptions.setRaftGrpEvtsLsnr(new RaftGroupEventsListenerAdapter(nodeId.groupId(), serviceEventInterceptor, evLsnr)); diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/util/OptimizedMarshaller.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/util/OptimizedMarshaller.java index 28121efda3..cc4609830e 100644 --- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/util/OptimizedMarshaller.java +++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/util/OptimizedMarshaller.java @@ -24,11 +24,11 @@ import org.apache.ignite.internal.network.direct.DirectMessageReader; import org.apache.ignite.internal.network.direct.DirectMessageWriter; import org.apache.ignite.internal.network.direct.stream.DirectByteBufferStream; import org.apache.ignite.internal.network.direct.stream.DirectByteBufferStreamImplV1; +import org.apache.ignite.internal.raft.Marshaller; import org.apache.ignite.network.NetworkMessage; import org.apache.ignite.network.serialization.MessageReader; import org.apache.ignite.network.serialization.MessageSerializationRegistry; import org.apache.ignite.network.serialization.MessageWriter; -import org.apache.ignite.raft.jraft.util.Marshaller; /** * Marshaller implementation that uses a {@link DirectByteBufferStream} variant to serialize/deserialize data. diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/util/ThreadLocalOptimizedMarshaller.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/util/ThreadLocalOptimizedMarshaller.java index a25664c53d..e929125473 100644 --- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/util/ThreadLocalOptimizedMarshaller.java +++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/util/ThreadLocalOptimizedMarshaller.java @@ -18,8 +18,8 @@ package org.apache.ignite.internal.raft.util; import java.nio.ByteBuffer; +import org.apache.ignite.internal.raft.Marshaller; import org.apache.ignite.network.serialization.MessageSerializationRegistry; -import org.apache.ignite.raft.jraft.util.Marshaller; /** * Thread-safe variant of {@link OptimizedMarshaller}. diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/NodeOptions.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/NodeOptions.java index 05c424b473..785cd06189 100644 --- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/NodeOptions.java +++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/NodeOptions.java @@ -21,6 +21,7 @@ import java.util.concurrent.ExecutorService; import org.apache.ignite.internal.hlc.HybridClockImpl; import org.apache.ignite.internal.hlc.HybridClock; import org.apache.ignite.internal.raft.JraftGroupEventsListener; +import org.apache.ignite.internal.raft.Marshaller; import org.apache.ignite.internal.raft.storage.impl.StripeAwareLogManager.Stripe; import org.apache.ignite.raft.jraft.JRaftServiceFactory; import org.apache.ignite.raft.jraft.StateMachine; @@ -35,14 +36,12 @@ import org.apache.ignite.raft.jraft.disruptor.StripedDisruptor; import org.apache.ignite.raft.jraft.storage.SnapshotThrottle; import org.apache.ignite.raft.jraft.storage.impl.LogManagerImpl; import org.apache.ignite.raft.jraft.util.Copiable; -import org.apache.ignite.raft.jraft.util.Marshaller; import org.apache.ignite.raft.jraft.util.NoopTimeoutStrategy; import org.apache.ignite.raft.jraft.util.StringUtils; import org.apache.ignite.raft.jraft.util.TimeoutStrategy; import org.apache.ignite.raft.jraft.util.Utils; import org.apache.ignite.raft.jraft.util.concurrent.FixedThreadsExecutorGroup; import org.apache.ignite.raft.jraft.util.timer.Timer; -import org.jetbrains.annotations.Nullable; /** * Node options. @@ -705,7 +704,6 @@ public class NodeOptions extends RpcOptions implements Copiable<NodeOptions> { this.electionTimeoutStrategy = electionTimeoutStrategy; } - @Nullable public Marshaller getCommandsMarshaller() { return commandsMarshaller; } diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/WriteActionRequest.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/WriteActionRequest.java index 14ef3852e6..a2dc30eaa4 100644 --- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/WriteActionRequest.java +++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/WriteActionRequest.java @@ -19,7 +19,9 @@ package org.apache.ignite.raft.jraft.rpc; import org.apache.ignite.internal.raft.WriteCommand; import org.apache.ignite.network.annotations.Transferable; +import org.apache.ignite.network.annotations.Transient; import org.apache.ignite.raft.jraft.RaftMessageGroup.RpcActionMessageGroup; +import org.jetbrains.annotations.Nullable; /** * Submit a write action to a replication group. @@ -27,7 +29,15 @@ import org.apache.ignite.raft.jraft.RaftMessageGroup.RpcActionMessageGroup; @Transferable(RpcActionMessageGroup.WRITE_ACTION_REQUEST) public interface WriteActionRequest extends ActionRequest { /** - * Returns an action's command. + * @return Serialized action's command. Specific serialization format may differ from group to group. */ - WriteCommand command(); + byte[] command(); + + /** + * @return Original non-serialized command, if available. {@code null} if not. This field is used to avoid {@link #command()} + * deserialization in cases, where deserialized instance is already available. Typical situation for it is command's creation, where + * command is explicitly serialized into {@code byte[]} before building the message. + */ + @Transient + @Nullable WriteCommand deserializedCommand(); } diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/ActionRequestInterceptor.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/ActionRequestInterceptor.java index d00bccacd7..e7e7e7e640 100644 --- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/ActionRequestInterceptor.java +++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/ActionRequestInterceptor.java @@ -17,10 +17,10 @@ package org.apache.ignite.raft.jraft.rpc.impl; -import org.apache.ignite.raft.jraft.rpc.ActionRequest;import org.apache.ignite.raft.jraft.rpc.Message; -import org.apache.ignite.raft.jraft.rpc.RaftServerService; -import org.apache.ignite.raft.jraft.rpc.RpcContext;import org.apache.ignite.raft.jraft.rpc.RpcRequestClosure; -import org.apache.ignite.raft.jraft.rpc.RpcRequests.AppendEntriesRequest; +import org.apache.ignite.internal.raft.Marshaller; +import org.apache.ignite.raft.jraft.rpc.ActionRequest; +import org.apache.ignite.raft.jraft.rpc.Message; +import org.apache.ignite.raft.jraft.rpc.RpcContext; import org.jetbrains.annotations.Nullable; /** @@ -34,7 +34,8 @@ public interface ActionRequestInterceptor { * * @param rpcCtx RPC context. * @param request Request in question. + * @param commandsMarshaller Marshaller that can be used to deserialize command from the request, if necessary. * @return A message to return to the caller, or {@code null} if standard handling should be used. */ - @Nullable Message intercept(RpcContext rpcCtx, ActionRequest request); + @Nullable Message intercept(RpcContext rpcCtx, ActionRequest request, Marshaller commandsMarshaller); } diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/ActionRequestProcessor.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/ActionRequestProcessor.java index aa9bbcf185..358880a359 100644 --- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/ActionRequestProcessor.java +++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/ActionRequestProcessor.java @@ -25,12 +25,15 @@ import java.util.concurrent.Executor; import org.apache.ignite.internal.lang.SafeTimeReorderException; import org.apache.ignite.internal.logger.IgniteLogger; import org.apache.ignite.internal.logger.Loggers; +import org.apache.ignite.internal.raft.Marshaller; +import org.apache.ignite.internal.raft.WriteCommand; import org.apache.ignite.internal.raft.server.impl.JraftServerImpl; import org.apache.ignite.internal.raft.Command; import org.apache.ignite.internal.raft.ReadCommand; import org.apache.ignite.internal.raft.server.impl.JraftServerImpl.DelegatingStateMachine; import org.apache.ignite.internal.raft.service.BeforeApplyHandler; import org.apache.ignite.internal.raft.service.CommandClosure; +import org.apache.ignite.internal.raft.service.RaftGroupListener; import org.apache.ignite.raft.jraft.Closure; import org.apache.ignite.raft.jraft.Node; import org.apache.ignite.raft.jraft.RaftMessagesFactory; @@ -48,7 +51,6 @@ import org.apache.ignite.raft.jraft.rpc.RpcProcessor; import org.apache.ignite.raft.jraft.rpc.RpcRequests; import org.apache.ignite.raft.jraft.rpc.WriteActionRequest; import org.apache.ignite.raft.jraft.util.BytesUtil; -import org.apache.ignite.raft.jraft.util.Marshaller; /** * Process action request. @@ -73,7 +75,7 @@ public class ActionRequestProcessor implements RpcProcessor<ActionRequest> { /** {@inheritDoc} */ @Override - public void handleRequest(RpcContext rpcCtx, ActionRequest request) { + public final void handleRequest(RpcContext rpcCtx, ActionRequest request) { Node node = rpcCtx.getNodeManager().get(request.groupId(), new PeerId(rpcCtx.getLocalConsistentId())); if (node == null) { @@ -82,39 +84,86 @@ public class ActionRequestProcessor implements RpcProcessor<ActionRequest> { return; } - JraftServerImpl.DelegatingStateMachine fsm = (JraftServerImpl.DelegatingStateMachine) node.getOptions().getFsm(); + Marshaller commandsMarshaller = node.getOptions().getCommandsMarshaller(); + + assert commandsMarshaller != null : "Marshaller for group " + request.groupId() + " is not found."; + + handleRequestInternal(rpcCtx, node, request, commandsMarshaller); + } + + /** + * Internal part of the {@link #handleRequest(RpcContext, ActionRequest)}, that contains resolved RAFT node, as well as a commands + * marshaller instance. May be conveniently reused in subclasses. + */ + protected void handleRequestInternal(RpcContext rpcCtx, Node node, ActionRequest request, Marshaller commandsMarshaller) { + DelegatingStateMachine fsm = (DelegatingStateMachine) node.getOptions().getFsm(); + RaftGroupListener listener = fsm.getListener(); if (request instanceof WriteActionRequest) { + WriteActionRequest writeRequest = (WriteActionRequest)request; + + WriteCommand command = writeRequest.deserializedCommand(); + + if (command == null) { + command = commandsMarshaller.unmarshall(writeRequest.command()); + } + if (fsm.getListener() instanceof BeforeApplyHandler) { synchronized (groupIdSyncMonitor(request.groupId())) { try { - callOnBeforeApply(request, fsm); + writeRequest = patchCommandBeforeApply(writeRequest, (BeforeApplyHandler) listener, command, commandsMarshaller); } catch (SafeTimeReorderException e) { rpcCtx.sendResponse(factory.errorResponse().errorCode(RaftError.EREORDER.getNumber()).build()); return; } - applyWrite(node, (WriteActionRequest) request, rpcCtx); + applyWrite(node, writeRequest, command, rpcCtx); } } else { - applyWrite(node, (WriteActionRequest) request, rpcCtx); + applyWrite(node, writeRequest, command, rpcCtx); } } else { - if (fsm.getListener() instanceof BeforeApplyHandler) { - callOnBeforeApply(request, fsm); + ReadActionRequest readRequest = (ReadActionRequest) request; + + if (listener instanceof BeforeApplyHandler) { + ReadCommand command = readRequest.command(); + + readRequest = patchCommandBeforeApply(readRequest, (BeforeApplyHandler) listener, command, commandsMarshaller); } - applyRead(node, (ReadActionRequest) request, rpcCtx); + applyRead(node, readRequest, rpcCtx); } } - private static void callOnBeforeApply(ActionRequest request, DelegatingStateMachine fsm) { - Command command = request instanceof WriteActionRequest - ? ((WriteActionRequest) request).command() - : ((ReadActionRequest) request).command(); + /** + * This method calls {@link BeforeApplyHandler#onBeforeApply(Command)} and returns action request with a serialized version of the + * updated command, if it has been updated. Otherwise, the method returns the original {@code request} instance. The reason for such + * behavior is the fact that we use {@code byte[]} in action requests, thus modified command should be serialized twice. + */ + private <AR extends ActionRequest> AR patchCommandBeforeApply( + AR request, + BeforeApplyHandler beforeApplyHandler, + Command command, + Marshaller commandsMarshaller + ) throws SafeTimeReorderException { + if (!beforeApplyHandler.onBeforeApply(command)) { + return request; + } - ((BeforeApplyHandler) fsm.getListener()).onBeforeApply(command); + if (request instanceof WriteActionRequest) { + return (AR) factory.writeActionRequest() + .groupId(request.groupId()) + .command(commandsMarshaller.marshall(command)) + .deserializedCommand((WriteCommand)command) + .build(); + } else { + return (AR) factory.readActionRequest() + .groupId(request.groupId()) + .command((ReadCommand)command) + .readOnlySafe(((ReadActionRequest)request).readOnlySafe()) + .build(); + } } private Object groupIdSyncMonitor(String groupId) { @@ -124,17 +173,14 @@ public class ActionRequestProcessor implements RpcProcessor<ActionRequest> { } /** - * @param node The node. + * @param node The node. * @param request The request. - * @param rpcCtx The context. + * @param command The command. + * @param rpcCtx The context. */ - private void applyWrite(Node node, WriteActionRequest request, RpcContext rpcCtx) { - Marshaller commandsMarshaller = node.getOptions().getCommandsMarshaller(); - - assert commandsMarshaller != null; - - node.apply(new Task(ByteBuffer.wrap(commandsMarshaller.marshall(request.command())), - new CommandClosureImpl<>(request.command()) { + private void applyWrite(Node node, WriteActionRequest request, Command command, RpcContext rpcCtx) { + node.apply(new Task(ByteBuffer.wrap(request.command()), + new CommandClosureImpl<>(command) { @Override public void result(Serializable res) { if (res instanceof Throwable) { @@ -156,9 +202,9 @@ public class ActionRequestProcessor implements RpcProcessor<ActionRequest> { } /** - * @param node The node. + * @param node The node. * @param request The request. - * @param rpcCtx The context. + * @param rpcCtx The context. */ private void applyRead(Node node, ReadActionRequest request, RpcContext rpcCtx) { if (request.readOnlySafe()) { @@ -171,7 +217,7 @@ public class ActionRequestProcessor implements RpcProcessor<ActionRequest> { try { fsm.getListener().onRead(List.<CommandClosure<ReadCommand>>of(new CommandClosure<>() { @Override public ReadCommand command() { - return (ReadCommand)request.command(); + return request.command(); } @Override public void result(Serializable res) { @@ -201,7 +247,7 @@ public class ActionRequestProcessor implements RpcProcessor<ActionRequest> { try { fsm.getListener().onRead(List.<CommandClosure<ReadCommand>>of(new CommandClosure<>() { @Override public ReadCommand command() { - return (ReadCommand)request.command(); + return request.command(); } @Override public void result(Serializable res) { diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/InterceptingActionRequestProcessor.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/InterceptingActionRequestProcessor.java index dac65dc7be..18fd8901dd 100644 --- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/InterceptingActionRequestProcessor.java +++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/InterceptingActionRequestProcessor.java @@ -18,6 +18,8 @@ package org.apache.ignite.raft.jraft.rpc.impl; import java.util.concurrent.Executor; +import org.apache.ignite.internal.raft.Marshaller; +import org.apache.ignite.raft.jraft.Node; import org.apache.ignite.raft.jraft.RaftMessagesFactory; import org.apache.ignite.raft.jraft.rpc.ActionRequest; import org.apache.ignite.raft.jraft.rpc.Message; @@ -39,13 +41,13 @@ public class InterceptingActionRequestProcessor extends ActionRequestProcessor { } @Override - public void handleRequest(RpcContext rpcCtx, ActionRequest request) { - Message interceptionResult = interceptor.intercept(rpcCtx, request); + protected void handleRequestInternal(RpcContext rpcCtx, Node node, ActionRequest request, Marshaller commandsMarshaller) { + Message interceptionResult = interceptor.intercept(rpcCtx, request, commandsMarshaller); if (interceptionResult != null) { rpcCtx.sendResponse(interceptionResult); } else { - super.handleRequest(rpcCtx, request); + super.handleRequestInternal(rpcCtx, node, request, commandsMarshaller); } } } diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/NullActionRequestInterceptor.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/NullActionRequestInterceptor.java index 77c46ede23..686c425cae 100644 --- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/NullActionRequestInterceptor.java +++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/NullActionRequestInterceptor.java @@ -17,6 +17,7 @@ package org.apache.ignite.raft.jraft.rpc.impl; +import org.apache.ignite.internal.raft.Marshaller; import org.apache.ignite.raft.jraft.rpc.ActionRequest;import org.apache.ignite.raft.jraft.rpc.Message; import org.apache.ignite.raft.jraft.rpc.RpcContext; import org.jetbrains.annotations.Nullable; @@ -27,7 +28,7 @@ import org.jetbrains.annotations.Nullable; */ public class NullActionRequestInterceptor implements ActionRequestInterceptor { @Override - public @Nullable Message intercept(RpcContext rpcCtx, ActionRequest request) { + public @Nullable Message intercept(RpcContext rpcCtx, ActionRequest request, Marshaller commandsMarshaller) { return null; } } diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/io/MessageFile.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/io/MessageFile.java index 1242f4d608..590ce7f851 100644 --- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/io/MessageFile.java +++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/io/MessageFile.java @@ -23,10 +23,9 @@ import java.io.FileInputStream; import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; -import java.nio.ByteBuffer; import org.apache.ignite.raft.jraft.rpc.Message; import org.apache.ignite.raft.jraft.util.Bits; -import org.apache.ignite.raft.jraft.util.Marshaller; +import org.apache.ignite.raft.jraft.util.JDKMarshaller; import org.apache.ignite.raft.jraft.util.Utils; /** @@ -60,7 +59,7 @@ public class MessageFile { } final byte[] nameBytes = new byte[len]; readBytes(nameBytes, input); - return Marshaller.DEFAULT.unmarshall(ByteBuffer.wrap(nameBytes)); + return JDKMarshaller.INSTANCE.unmarshall(nameBytes); } } @@ -83,7 +82,7 @@ public class MessageFile { final File file = new File(this.path + ".tmp"); try (final FileOutputStream fOut = new FileOutputStream(file); final BufferedOutputStream output = new BufferedOutputStream(fOut)) { - byte[] bytes = Marshaller.DEFAULT.marshall(msg); + byte[] bytes = JDKMarshaller.INSTANCE.marshall(msg); final byte[] lenBytes = new byte[4]; Bits.putInt(lenBytes, 0, bytes.length); diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/snapshot/local/LocalSnapshotMetaTable.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/snapshot/local/LocalSnapshotMetaTable.java index c0444379e9..a9d6241361 100644 --- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/snapshot/local/LocalSnapshotMetaTable.java +++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/snapshot/local/LocalSnapshotMetaTable.java @@ -32,7 +32,7 @@ import org.apache.ignite.raft.jraft.entity.LocalStorageOutter.LocalSnapshotPbMet import org.apache.ignite.raft.jraft.entity.RaftOutter.SnapshotMeta; import org.apache.ignite.raft.jraft.option.RaftOptions; import org.apache.ignite.raft.jraft.storage.io.MessageFile; -import org.apache.ignite.raft.jraft.util.Marshaller; +import org.apache.ignite.raft.jraft.util.JDKMarshaller; /** * Table to keep local snapshot metadata infos. @@ -71,7 +71,7 @@ public class LocalSnapshotMetaTable { pbMetaBuilder.filesList(files); - return ByteBuffer.wrap(Marshaller.DEFAULT.marshall(pbMetaBuilder.build())); + return ByteBuffer.wrap(JDKMarshaller.INSTANCE.marshall(pbMetaBuilder.build())); } /** @@ -83,7 +83,7 @@ public class LocalSnapshotMetaTable { return false; } try { - final LocalSnapshotPbMeta pbMeta = Marshaller.DEFAULT.unmarshall(buf); + final LocalSnapshotPbMeta pbMeta = JDKMarshaller.INSTANCE.unmarshall(buf); if (pbMeta == null) { LOG.error("Fail to load meta from buffer."); return false; diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/JDKMarshaller.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/JDKMarshaller.java index 9ecf321ef4..f5b46909b1 100644 --- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/JDKMarshaller.java +++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/JDKMarshaller.java @@ -21,11 +21,15 @@ import java.io.ByteArrayOutputStream; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.nio.ByteBuffer; +import org.apache.ignite.internal.raft.Marshaller; /** - * + * {@link Marshaller} implementation, based on standard {@link ObjectInputStream} and {@link ObjectOutputStream}. */ public class JDKMarshaller implements Marshaller { + /** Pre-allocated {@link JDKMarshaller} instance. */ + public static final Marshaller INSTANCE = new JDKMarshaller(); + /** * {@inheritDoc} */ diff --git a/modules/raft/src/test/java/org/apache/ignite/internal/raft/RaftGroupServiceTest.java b/modules/raft/src/test/java/org/apache/ignite/internal/raft/RaftGroupServiceTest.java index f7a56f21ed..735b4a7d4f 100644 --- a/modules/raft/src/test/java/org/apache/ignite/internal/raft/RaftGroupServiceTest.java +++ b/modules/raft/src/test/java/org/apache/ignite/internal/raft/RaftGroupServiceTest.java @@ -56,6 +56,8 @@ import org.apache.ignite.internal.configuration.testframework.InjectConfiguratio import org.apache.ignite.internal.lang.IgniteInternalException; import org.apache.ignite.internal.raft.configuration.RaftConfiguration; import org.apache.ignite.internal.raft.service.RaftGroupService; +import org.apache.ignite.internal.raft.util.OptimizedMarshaller; +import org.apache.ignite.internal.raft.util.ThreadLocalOptimizedMarshaller; import org.apache.ignite.internal.replicator.TestReplicationGroupId; import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest; import org.apache.ignite.internal.thread.NamedThreadFactory; @@ -66,6 +68,7 @@ import org.apache.ignite.network.ClusterService; import org.apache.ignite.network.MessagingService; import org.apache.ignite.network.NetworkAddress; import org.apache.ignite.network.TopologyService; +import org.apache.ignite.network.serialization.MessageSerializationRegistry; import org.apache.ignite.raft.TestWriteCommand; import org.apache.ignite.raft.jraft.RaftMessagesFactory; import org.apache.ignite.raft.jraft.Status; @@ -88,6 +91,7 @@ import org.apache.ignite.raft.jraft.rpc.RpcRequests.ErrorResponse; import org.apache.ignite.raft.jraft.rpc.RpcRequests.ReadIndexRequest; import org.apache.ignite.raft.jraft.rpc.WriteActionRequest; import org.apache.ignite.raft.jraft.rpc.impl.RaftException; +import org.apache.ignite.utils.ClusterServiceTestUtils; import org.jetbrains.annotations.Nullable; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -143,6 +147,9 @@ public class RaftGroupServiceTest extends BaseIgniteAbstractTest { when(cluster.messagingService()).thenReturn(messagingService); when(cluster.topologyService()).thenReturn(topologyService); + MessageSerializationRegistry serializationRegistry = ClusterServiceTestUtils.defaultSerializationRegistry(); + when(cluster.serializationRegistry()).thenReturn(serializationRegistry); + when(topologyService.getByConsistentId(any())) .thenAnswer(invocation -> { String consistentId = invocation.getArgument(0); @@ -587,8 +594,11 @@ public class RaftGroupServiceTest extends BaseIgniteAbstractTest { private RaftGroupService startRaftGroupService(List<Peer> peers, boolean getLeader) { PeersAndLearners memberConfiguration = PeersAndLearners.fromPeers(peers, Set.of()); - CompletableFuture<RaftGroupService> service = - RaftGroupServiceImpl.start(TEST_GRP, cluster, FACTORY, raftConfiguration, memberConfiguration, getLeader, executor); + var commandsSerializer = new ThreadLocalOptimizedMarshaller(cluster.serializationRegistry()); + + CompletableFuture<RaftGroupService> service = RaftGroupServiceImpl.start( + TEST_GRP, cluster, FACTORY, raftConfiguration, memberConfiguration, getLeader, executor, commandsSerializer + ); assertThat(service, willCompleteSuccessfully()); @@ -613,12 +623,16 @@ public class RaftGroupServiceTest extends BaseIgniteAbstractTest { * @param peer Fail the request targeted to given peer. */ private void mockUserInput(boolean delay, @Nullable Peer peer) { + //noinspection Convert2Lambda when(messagingService.invoke( any(ClusterNode.class), - argThat(new ArgumentMatcher<ActionRequest>() { + // Must be an anonymous class, to deduce the message type from the generic superclass. + argThat(new ArgumentMatcher<WriteActionRequest>() { @Override - public boolean matches(ActionRequest arg) { - return arg instanceof WriteActionRequest && ((WriteActionRequest) arg).command() instanceof TestWriteCommand; + public boolean matches(WriteActionRequest arg) { + Object command = new OptimizedMarshaller(cluster.serializationRegistry()).unmarshall(arg.command()); + + return command instanceof TestWriteCommand; } }), anyLong() diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/AppendEntriesBenchmark.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/AppendEntriesBenchmark.java index 1c274f1f88..edd6145bb3 100644 --- a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/AppendEntriesBenchmark.java +++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/AppendEntriesBenchmark.java @@ -23,7 +23,7 @@ import org.apache.ignite.raft.jraft.RaftMessagesFactory; import org.apache.ignite.raft.jraft.util.AdaptiveBufAllocator; import org.apache.ignite.raft.jraft.util.ByteBufferCollector; import org.apache.ignite.raft.jraft.util.ByteString; -import org.apache.ignite.raft.jraft.util.Marshaller; +import org.apache.ignite.raft.jraft.util.JDKMarshaller; import org.apache.ignite.raft.jraft.util.RecyclableByteBufferList; import org.apache.ignite.raft.jraft.util.RecycleUtil; import org.openjdk.jmh.annotations.Benchmark; @@ -171,7 +171,7 @@ public class AppendEntriesBenchmark { final ByteBuffer buf = dataBuffer.getBuffer(); buf.flip(); rb.data(new ByteString(buf)); - return Marshaller.DEFAULT.marshall(rb.build()); + return JDKMarshaller.INSTANCE.marshall(rb.build()); } private byte[] sendEntries2() { @@ -188,7 +188,7 @@ public class AppendEntriesBenchmark { final ByteBuffer buf = dataBuffer.getBuffer(); buf.flip(); rb.data(new ByteString(buf)); - return Marshaller.DEFAULT.marshall(rb.build()); + return JDKMarshaller.INSTANCE.marshall(rb.build()); } finally { RecycleUtil.recycle(dataBuffer); @@ -211,7 +211,7 @@ public class AppendEntriesBenchmark { final int remaining = buf.remaining(); handleThreadLocal.get().record(remaining); rb.data(new ByteString(buf)); - return Marshaller.DEFAULT.marshall(rb.build()); + return JDKMarshaller.INSTANCE.marshall(rb.build()); } finally { RecycleUtil.recycle(dataBuffer); @@ -231,7 +231,7 @@ public class AppendEntriesBenchmark { dataBuffer.add(buf.slice()); } rb.data(RecyclableByteBufferList.concatenate(dataBuffer)); - return Marshaller.DEFAULT.marshall(rb.build()); + return JDKMarshaller.INSTANCE.marshall(rb.build()); } finally { RecycleUtil.recycle(dataBuffer); diff --git a/modules/raft/src/testFixtures/java/org/apache/ignite/internal/raft/service/ItAbstractListenerSnapshotTest.java b/modules/raft/src/testFixtures/java/org/apache/ignite/internal/raft/service/ItAbstractListenerSnapshotTest.java index ec6ce84fbb..bfdf5fb537 100644 --- a/modules/raft/src/testFixtures/java/org/apache/ignite/internal/raft/service/ItAbstractListenerSnapshotTest.java +++ b/modules/raft/src/testFixtures/java/org/apache/ignite/internal/raft/service/ItAbstractListenerSnapshotTest.java @@ -44,6 +44,7 @@ import org.apache.ignite.internal.configuration.testframework.InjectConfiguratio import org.apache.ignite.internal.logger.IgniteLogger; import org.apache.ignite.internal.logger.Loggers; import org.apache.ignite.internal.raft.Loza; +import org.apache.ignite.internal.raft.Marshaller; import org.apache.ignite.internal.raft.PeersAndLearners; import org.apache.ignite.internal.raft.RaftGroupServiceImpl; import org.apache.ignite.internal.raft.RaftNodeId; @@ -433,7 +434,7 @@ public abstract class ItAbstractListenerSnapshotTest<T extends RaftGroupListener new RaftNodeId(raftGroupId(), initialMemberConf.peer(service.topologyService().localMember().name())), initialMemberConf, createListener(service, listenerPersistencePath, idx), - defaults() + defaults().commandsMarshaller(commandsMarshaller(service)) ); return server; @@ -454,14 +455,7 @@ public abstract class ItAbstractListenerSnapshotTest<T extends RaftGroupListener return startClient(testInfo, raftGroupId(), new NetworkAddress(getLocalAddress(), PORT)); } - /** - * Returns a client service. - * - * @return The client service. - */ - protected ClusterService clientService() { - return cluster.get(initialMemberConf.peers().size()); - } + protected abstract Marshaller commandsMarshaller(ClusterService clusterService); /** * Starts a client with a specific address. @@ -471,8 +465,10 @@ public abstract class ItAbstractListenerSnapshotTest<T extends RaftGroupListener private RaftGroupService startClient(TestInfo testInfo, TestReplicationGroupId groupId, NetworkAddress addr) { ClusterService clientNode = clusterService(testInfo, CLIENT_PORT + clients.size(), addr); + Marshaller commandsMarshaller = commandsMarshaller(clientNode); + CompletableFuture<RaftGroupService> clientFuture = RaftGroupServiceImpl - .start(groupId, clientNode, FACTORY, raftConfiguration, initialMemberConf, true, executor); + .start(groupId, clientNode, FACTORY, raftConfiguration, initialMemberConf, true, executor, commandsMarshaller); assertThat(clientFuture, willCompleteSuccessfully()); diff --git a/modules/replicator/src/main/java/org/apache/ignite/internal/raft/client/TopologyAwareRaftGroupService.java b/modules/replicator/src/main/java/org/apache/ignite/internal/raft/client/TopologyAwareRaftGroupService.java index 96fc615916..993a4da6ab 100644 --- a/modules/replicator/src/main/java/org/apache/ignite/internal/raft/client/TopologyAwareRaftGroupService.java +++ b/modules/replicator/src/main/java/org/apache/ignite/internal/raft/client/TopologyAwareRaftGroupService.java @@ -35,6 +35,7 @@ import org.apache.ignite.internal.logger.IgniteLogger; import org.apache.ignite.internal.logger.Loggers; import org.apache.ignite.internal.raft.Command; import org.apache.ignite.internal.raft.LeaderElectionListener; +import org.apache.ignite.internal.raft.Marshaller; import org.apache.ignite.internal.raft.Peer; import org.apache.ignite.internal.raft.PeersAndLearners; import org.apache.ignite.internal.raft.RaftGroupServiceImpl; @@ -165,6 +166,7 @@ public class TopologyAwareRaftGroupService implements RaftGroupService { * @param logicalTopologyService Logical topology service. * @param notifyOnSubscription Whether to notify callback after subscription to pass the current leader and term into it, even * if the leader did not change in that moment (see {@link #subscribeLeader}). + * @param cmdMarshaller Marshaller that should be used to serialize/deserialize commands. * @return Future to create a raft client. */ public static CompletableFuture<TopologyAwareRaftGroupService> start( @@ -177,9 +179,10 @@ public class TopologyAwareRaftGroupService implements RaftGroupService { ScheduledExecutorService executor, LogicalTopologyService logicalTopologyService, RaftGroupEventsClientListener eventsClientListener, - boolean notifyOnSubscription + boolean notifyOnSubscription, + Marshaller cmdMarshaller ) { - return RaftGroupServiceImpl.start(groupId, cluster, factory, raftConfiguration, configuration, getLeader, executor) + return RaftGroupServiceImpl.start(groupId, cluster, factory, raftConfiguration, configuration, getLeader, executor, cmdMarshaller) .thenApply(raftGroupService -> new TopologyAwareRaftGroupService(cluster, factory, executor, raftConfiguration, raftGroupService, logicalTopologyService, eventsClientListener, notifyOnSubscription)); } diff --git a/modules/replicator/src/main/java/org/apache/ignite/internal/raft/client/TopologyAwareRaftGroupServiceFactory.java b/modules/replicator/src/main/java/org/apache/ignite/internal/raft/client/TopologyAwareRaftGroupServiceFactory.java index 959f21f8ea..304d018188 100644 --- a/modules/replicator/src/main/java/org/apache/ignite/internal/raft/client/TopologyAwareRaftGroupServiceFactory.java +++ b/modules/replicator/src/main/java/org/apache/ignite/internal/raft/client/TopologyAwareRaftGroupServiceFactory.java @@ -20,6 +20,7 @@ package org.apache.ignite.internal.raft.client; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ScheduledExecutorService; import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService; +import org.apache.ignite.internal.raft.Marshaller; import org.apache.ignite.internal.raft.PeersAndLearners; import org.apache.ignite.internal.raft.RaftServiceFactory; import org.apache.ignite.internal.raft.configuration.RaftConfiguration; @@ -66,7 +67,8 @@ public class TopologyAwareRaftGroupServiceFactory implements RaftServiceFactory< ReplicationGroupId groupId, PeersAndLearners peersAndLearners, RaftConfiguration raftConfiguration, - ScheduledExecutorService raftClientExecutor + ScheduledExecutorService raftClientExecutor, + Marshaller commandsMarshaller ) { return TopologyAwareRaftGroupService.start( groupId, @@ -78,7 +80,8 @@ public class TopologyAwareRaftGroupServiceFactory implements RaftServiceFactory< raftClientExecutor, logicalTopologyService, eventsClientListener, - true + true, + commandsMarshaller ); } } diff --git a/modules/replicator/src/test/java/org/apache/ignite/internal/raft/client/TopologyAwareRaftGroupServiceTest.java b/modules/replicator/src/test/java/org/apache/ignite/internal/raft/client/TopologyAwareRaftGroupServiceTest.java index 2fe2548f28..dedfe854c8 100644 --- a/modules/replicator/src/test/java/org/apache/ignite/internal/raft/client/TopologyAwareRaftGroupServiceTest.java +++ b/modules/replicator/src/test/java/org/apache/ignite/internal/raft/client/TopologyAwareRaftGroupServiceTest.java @@ -45,6 +45,7 @@ import org.apache.ignite.internal.raft.TestRaftGroupListener; import org.apache.ignite.internal.raft.configuration.RaftConfiguration; import org.apache.ignite.internal.raft.server.RaftGroupOptions; import org.apache.ignite.internal.raft.server.impl.JraftServerImpl; +import org.apache.ignite.internal.raft.util.ThreadLocalOptimizedMarshaller; import org.apache.ignite.internal.replicator.TestReplicationGroupId; import org.apache.ignite.internal.testframework.IgniteAbstractTest; import org.apache.ignite.internal.thread.NamedThreadFactory; @@ -348,10 +349,15 @@ public class TopologyAwareRaftGroupServiceTest extends IgniteAbstractTest { var dataPath = workDir.resolve("raft_" + localPeer.consistentId()); + var commandsMarshaller = new ThreadLocalOptimizedMarshaller(cluster.serializationRegistry()); + + NodeOptions nodeOptions = new NodeOptions(); + nodeOptions.setCommandsMarshaller(commandsMarshaller); + var raftServer = new JraftServerImpl( cluster, dataPath, - new NodeOptions(), + nodeOptions, eventsClientListener ); raftServer.start(); @@ -360,7 +366,7 @@ public class TopologyAwareRaftGroupServiceTest extends IgniteAbstractTest { new RaftNodeId(GROUP_ID, localPeer), peersAndLearners, new TestRaftGroupListener(), - RaftGroupOptions.defaults() + RaftGroupOptions.defaults().commandsMarshaller(commandsMarshaller) ); raftServers.put(addr, raftServer); @@ -398,6 +404,8 @@ public class TopologyAwareRaftGroupServiceTest extends IgniteAbstractTest { }); } + var commandsMarshaller = new ThreadLocalOptimizedMarshaller(localClusterService.serializationRegistry()); + return TopologyAwareRaftGroupService.start( GROUP_ID, localClusterService, @@ -408,7 +416,8 @@ public class TopologyAwareRaftGroupServiceTest extends IgniteAbstractTest { executor, new LogicalTopologyServiceTestImpl(localClusterService), eventsClientListener, - notifyOnSubscription + notifyOnSubscription, + commandsMarshaller ).join(); } diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItBuildIndexTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItBuildIndexTest.java index 21bfa15b83..e3366fc3eb 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItBuildIndexTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItBuildIndexTest.java @@ -47,7 +47,9 @@ import org.apache.ignite.internal.raft.service.RaftGroupService; import org.apache.ignite.internal.sql.BaseSqlIntegrationTest; import org.apache.ignite.internal.table.TableViewInternal; import org.apache.ignite.internal.table.distributed.command.BuildIndexCommand; +import org.apache.ignite.internal.table.distributed.schema.PartitionCommandsMarshallerImpl; import org.apache.ignite.network.NetworkMessage; +import org.apache.ignite.network.serialization.MessageSerializationRegistry; import org.apache.ignite.raft.jraft.rpc.WriteActionRequest; import org.jetbrains.annotations.Nullable; import org.junit.jupiter.api.AfterEach; @@ -236,13 +238,17 @@ public class ItBuildIndexTest extends BaseSqlIntegrationTest { * the command was sent. * @param dropBuildIndexCommand {@code True} to drop {@link BuildIndexCommand}. */ - private static BiPredicate<String, NetworkMessage> waitSendBuildIndexCommand( + private BiPredicate<String, NetworkMessage> waitSendBuildIndexCommand( CompletableFuture<Integer> sendBuildIndexCommandFuture, boolean dropBuildIndexCommand ) { + IgniteImpl node = CLUSTER.node(0); + MessageSerializationRegistry serializationRegistry = node.raftManager().service().serializationRegistry(); + var commandsMarshaller = new PartitionCommandsMarshallerImpl(serializationRegistry); + return (nodeConsistentId, networkMessage) -> { if (networkMessage instanceof WriteActionRequest) { - Command command = ((WriteActionRequest) networkMessage).command(); + Command command = commandsMarshaller.unmarshall(((WriteActionRequest) networkMessage).command()); if (command instanceof BuildIndexCommand) { sendBuildIndexCommandFuture.complete(((BuildIndexCommand) command).indexId()); diff --git a/modules/table/build.gradle b/modules/table/build.gradle index 4af4baa239..189e0952ec 100644 --- a/modules/table/build.gradle +++ b/modules/table/build.gradle @@ -67,6 +67,7 @@ dependencies { testImplementation(testFixtures(project(':ignite-storage-api'))) testImplementation(testFixtures(project(':ignite-metastorage'))) testImplementation(testFixtures(project(':ignite-marshaller-common'))) + testImplementation(testFixtures(project(':ignite-network'))) testImplementation(testFixtures(project(':ignite-placement-driver-api'))) testImplementation(testFixtures(project(':ignite-distribution-zones'))) testImplementation(testFixtures(project(':ignite-catalog'))) diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java index 3db0aa2e3d..c2f066a838 100644 --- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java +++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java @@ -49,6 +49,7 @@ import org.apache.ignite.internal.hlc.HybridClock; import org.apache.ignite.internal.hlc.HybridClockImpl; import org.apache.ignite.internal.hlc.HybridTimestamp; import org.apache.ignite.internal.placementdriver.TestPlacementDriver; +import org.apache.ignite.internal.raft.Marshaller; import org.apache.ignite.internal.raft.server.RaftServer; import org.apache.ignite.internal.raft.server.impl.JraftServerImpl; import org.apache.ignite.internal.raft.service.ItAbstractListenerSnapshotTest; @@ -91,6 +92,7 @@ import org.apache.ignite.internal.table.distributed.replication.request.ReadWrit import org.apache.ignite.internal.table.distributed.replication.request.SingleRowPkReplicaRequest; import org.apache.ignite.internal.table.distributed.replicator.PartitionReplicaListener; import org.apache.ignite.internal.table.distributed.replicator.action.RequestType; +import org.apache.ignite.internal.table.distributed.schema.ThreadLocalPartitionCommandsMarshaller; import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl; import org.apache.ignite.internal.table.impl.DummyInternalTableImpl; import org.apache.ignite.internal.testframework.WorkDirectoryExtension; @@ -523,6 +525,11 @@ public class ItTablePersistenceTest extends ItAbstractListenerSnapshotTest<Parti return new TestReplicationGroupId("partitions"); } + @Override + protected Marshaller commandsMarshaller(ClusterService clusterService) { + return new ThreadLocalPartitionCommandsMarshaller(clusterService.serializationRegistry()); + } + /** * Creates a {@link Row} with the supplied key and value. * diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java index c9ed4e4b64..ed6954c01c 100644 --- a/modules/table/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java +++ b/modules/table/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java @@ -49,6 +49,7 @@ import java.util.stream.IntStream; import org.apache.ignite.internal.Cluster; import org.apache.ignite.internal.IgniteIntegrationTest; import org.apache.ignite.internal.app.IgniteImpl; +import org.apache.ignite.internal.cluster.management.CmgGroupId; import org.apache.ignite.internal.logger.IgniteLogger; import org.apache.ignite.internal.logger.Loggers; import org.apache.ignite.internal.metastorage.server.raft.MetastorageGroupId; @@ -60,12 +61,14 @@ import org.apache.ignite.internal.storage.pagememory.PersistentPageMemoryStorage import org.apache.ignite.internal.storage.rocksdb.RocksDbStorageEngine; import org.apache.ignite.internal.table.distributed.raft.snapshot.incoming.IncomingSnapshotCopier; import org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotMetaResponse; +import org.apache.ignite.internal.table.distributed.schema.PartitionCommandsMarshallerImpl; import org.apache.ignite.internal.test.WatchListenerInhibitor; import org.apache.ignite.internal.testframework.IgniteTestUtils; import org.apache.ignite.internal.testframework.WorkDirectory; import org.apache.ignite.internal.testframework.log4j2.LogInspector; import org.apache.ignite.internal.testframework.log4j2.LogInspector.Handler; import org.apache.ignite.network.NetworkMessage; +import org.apache.ignite.network.serialization.MessageSerializationRegistry; import org.apache.ignite.raft.jraft.RaftGroupService; import org.apache.ignite.raft.jraft.RaftMessagesFactory; import org.apache.ignite.raft.jraft.Status; @@ -556,9 +559,12 @@ class ItTableRaftSnapshotsTest extends IgniteIntegrationTest { try { prepareClusterForInstallingSnapshotToNode2(DEFAULT_STORAGE_ENGINE, theCluster -> { + IgniteImpl node = theCluster.node(0); + MessageSerializationRegistry serializationRegistry = node.raftManager().service().serializationRegistry(); + BiPredicate<String, NetworkMessage> dropSafeTimeUntilSecondInstallSnapshotRequestIsProcessed = (recipientId, message) -> message instanceof WriteActionRequest - && ((WriteActionRequest) message).command() instanceof SafeTimeSyncCommand + && isSafeTimeSyncCommand((WriteActionRequest) message, serializationRegistry) && !snapshotInstallFailedDueToIdenticalRetry.get(); theCluster.node(0).dropMessages( @@ -576,6 +582,17 @@ class ItTableRaftSnapshotsTest extends IgniteIntegrationTest { } } + private static boolean isSafeTimeSyncCommand(WriteActionRequest request, MessageSerializationRegistry serializationRegistry) { + String groupId = request.groupId(); + + if (groupId.equals(MetastorageGroupId.INSTANCE.toString()) || groupId.equals(CmgGroupId.INSTANCE.toString())) { + return false; + } + + var commandsMarshaller = new PartitionCommandsMarshallerImpl(serializationRegistry); + return commandsMarshaller.unmarshall(request.command()) instanceof SafeTimeSyncCommand; + } + @Test void testChangeLeaderOnInstallSnapshotInMiddle() throws Exception { CompletableFuture<Void> sentSnapshotMetaResponseFormNode1Future = new CompletableFuture<>(); diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java index 4fb2bd45ca..19d2e1c3f9 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java @@ -107,6 +107,7 @@ import org.apache.ignite.internal.metastorage.dsl.Conditions; import org.apache.ignite.internal.metastorage.dsl.Operation; import org.apache.ignite.internal.placementdriver.PlacementDriver; import org.apache.ignite.internal.raft.Loza; +import org.apache.ignite.internal.raft.Marshaller; import org.apache.ignite.internal.raft.Peer; import org.apache.ignite.internal.raft.PeersAndLearners; import org.apache.ignite.internal.raft.RaftGroupEventsListener; @@ -175,7 +176,6 @@ import org.apache.ignite.network.ClusterNode; import org.apache.ignite.network.ClusterService; import org.apache.ignite.network.TopologyService; import org.apache.ignite.raft.jraft.storage.impl.VolatileRaftMetaStorage; -import org.apache.ignite.raft.jraft.util.Marshaller; import org.apache.ignite.table.Table; import org.jetbrains.annotations.Nullable; import org.jetbrains.annotations.TestOnly; @@ -756,7 +756,9 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { .thenComposeAsync(v -> inBusyLock(busyLock, () -> { try { //TODO IGNITE-19614 This procedure takes 10 seconds if there's no majority online. - return raftMgr.startRaftGroupService(replicaGrpId, newConfiguration, raftGroupServiceFactory); + return raftMgr.startRaftGroupService( + replicaGrpId, newConfiguration, raftGroupServiceFactory, raftCommandsMarshaller + ); } catch (NodeStoppingException ex) { return failedFuture(ex); } diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/CatalogVersionAware.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/CatalogVersionAware.java index f7f4490344..a115b59449 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/CatalogVersionAware.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/CatalogVersionAware.java @@ -17,6 +17,8 @@ package org.apache.ignite.internal.table.distributed.command; +import org.apache.ignite.internal.table.distributed.schema.PartitionCommandsMarshaller; + /** * A command that requires certain level of catalog version to be locally available just to be accepted on the node. */ @@ -25,4 +27,9 @@ public interface CatalogVersionAware { * Returns version that the Catalog must have locally for the node to be allowed to accept this command via replication. */ int requiredCatalogVersion(); + + /** + * Setter for {@link #requiredCatalogVersion()}. Called by the creator or the {@link PartitionCommandsMarshaller} while deserializing. + */ + void requiredCatalogVersion(int version); } diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/PartitionCommand.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/PartitionCommand.java index cb56b85d8f..a69d8fa731 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/PartitionCommand.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/PartitionCommand.java @@ -19,6 +19,8 @@ package org.apache.ignite.internal.table.distributed.command; import java.util.UUID; import org.apache.ignite.internal.replicator.command.SafeTimePropagatingCommand; +import org.apache.ignite.network.annotations.Transient; +import org.apache.ignite.network.annotations.WithSetter; /** * Partition transactional command. @@ -34,9 +36,13 @@ public interface PartitionCommand extends SafeTimePropagatingCommand, CatalogVer */ boolean full(); - /** - * Returns version that the Catalog must have locally for the node to be allowed to accept this command via replication. - */ @Override + @Transient + @WithSetter int requiredCatalogVersion(); + + @Override + default void requiredCatalogVersion(int version) { + // No-op. + } } diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java index 27b425273f..5c1710fa1e 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java @@ -477,7 +477,7 @@ public class PartitionListener implements RaftGroupListener, BeforeApplyHandler } @Override - public void onBeforeApply(Command command) { + public boolean onBeforeApply(Command command) { // This method is synchronized by replication group specific monitor, see ActionRequestProcessor#handleRequest. if (command instanceof SafeTimePropagatingCommand) { SafeTimePropagatingCommand cmd = (SafeTimePropagatingCommand) command; @@ -489,6 +489,8 @@ public class PartitionListener implements RaftGroupListener, BeforeApplyHandler throw new SafeTimeReorderException(); } } + + return false; } /** diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/CheckCatalogVersionOnActionRequest.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/CheckCatalogVersionOnActionRequest.java index 47b90121bb..499ae26edd 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/CheckCatalogVersionOnActionRequest.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/CheckCatalogVersionOnActionRequest.java @@ -19,12 +19,12 @@ package org.apache.ignite.internal.table.distributed.schema; import static org.apache.ignite.internal.table.distributed.schema.CatalogVersionSufficiency.isMetadataAvailableFor; +import java.nio.ByteBuffer; import org.apache.ignite.internal.catalog.CatalogService; import org.apache.ignite.internal.logger.IgniteLogger; import org.apache.ignite.internal.logger.Loggers; -import org.apache.ignite.internal.raft.Command; import org.apache.ignite.internal.raft.Loza; -import org.apache.ignite.internal.table.distributed.command.CatalogVersionAware; +import org.apache.ignite.internal.raft.Marshaller; import org.apache.ignite.raft.jraft.Node; import org.apache.ignite.raft.jraft.Status; import org.apache.ignite.raft.jraft.core.NodeImpl; @@ -53,7 +53,7 @@ public class CheckCatalogVersionOnActionRequest implements ActionRequestIntercep } @Override - public @Nullable Message intercept(RpcContext rpcCtx, ActionRequest request) { + public @Nullable Message intercept(RpcContext rpcCtx, ActionRequest request, Marshaller commandsMarshaller) { Node node = rpcCtx.getNodeManager().get(request.groupId(), new PeerId(rpcCtx.getLocalConsistentId())); if (node == null) { @@ -65,15 +65,21 @@ public class CheckCatalogVersionOnActionRequest implements ActionRequestIntercep return errorIfNotLeader; } + if (!(commandsMarshaller instanceof PartitionCommandsMarshaller)) { + return null; + } + if (!(request instanceof WriteActionRequest)) { return null; } - Command command = ((WriteActionRequest) request).command(); + byte[] command = ((WriteActionRequest) request).command(); + + var partitionCommandsMarshaller = (PartitionCommandsMarshaller) commandsMarshaller; - if (command instanceof CatalogVersionAware) { - int requiredCatalogVersion = ((CatalogVersionAware) command).requiredCatalogVersion(); + int requiredCatalogVersion = partitionCommandsMarshaller.readRequiredCatalogVersion(ByteBuffer.wrap(command)); + if (requiredCatalogVersion >= 0) { if (!isMetadataAvailableFor(requiredCatalogVersion, catalogService)) { // TODO: IGNITE-20298 - throttle logging. LOG.warn( diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/CheckCatalogVersionOnAppendEntries.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/CheckCatalogVersionOnAppendEntries.java index 53270535b6..519b833c8d 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/CheckCatalogVersionOnAppendEntries.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/CheckCatalogVersionOnAppendEntries.java @@ -18,11 +18,13 @@ package org.apache.ignite.internal.table.distributed.schema; import static org.apache.ignite.internal.table.distributed.schema.CatalogVersionSufficiency.isMetadataAvailableFor; +import static org.apache.ignite.internal.table.distributed.schema.PartitionCommandsMarshaller.NO_VERSION_REQUIRED; import java.nio.ByteBuffer; import org.apache.ignite.internal.catalog.CatalogService; import org.apache.ignite.internal.logger.IgniteLogger; import org.apache.ignite.internal.logger.Loggers; +import org.apache.ignite.internal.raft.Marshaller; import org.apache.ignite.raft.jraft.Node; import org.apache.ignite.raft.jraft.entity.EnumOutter.EntryType; import org.apache.ignite.raft.jraft.entity.RaftOutter; @@ -34,7 +36,6 @@ import org.apache.ignite.raft.jraft.rpc.RaftServerService; import org.apache.ignite.raft.jraft.rpc.RpcRequestClosure; import org.apache.ignite.raft.jraft.rpc.RpcRequests.AppendEntriesRequest; import org.apache.ignite.raft.jraft.rpc.impl.core.AppendEntriesRequestInterceptor; -import org.apache.ignite.raft.jraft.util.Marshaller; import org.jetbrains.annotations.Nullable; /** @@ -44,8 +45,6 @@ import org.jetbrains.annotations.Nullable; public class CheckCatalogVersionOnAppendEntries implements AppendEntriesRequestInterceptor { private static final IgniteLogger LOG = Loggers.forClass(CheckCatalogVersionOnAppendEntries.class); - private static final int NO_VERSION_REQUIREMENT = Integer.MIN_VALUE; - private final CatalogService catalogService; public CheckCatalogVersionOnAppendEntries(CatalogService catalogService) { @@ -66,7 +65,7 @@ public class CheckCatalogVersionOnAppendEntries implements AppendEntriesRequestI for (RaftOutter.EntryMeta entry : request.entriesList()) { int requiredCatalogVersion = readRequiredCatalogVersionForMeta(allData, entry, node.getOptions().getCommandsMarshaller()); - if (requiredCatalogVersion != NO_VERSION_REQUIREMENT && !isMetadataAvailableFor(requiredCatalogVersion, catalogService)) { + if (requiredCatalogVersion != NO_VERSION_REQUIRED && !isMetadataAvailableFor(requiredCatalogVersion, catalogService)) { // TODO: IGNITE-20298 - throttle logging. LOG.warn( "Metadata not yet available, rejecting AppendEntriesRequest with EBUSY [group={}, requiredLevel={}].", @@ -91,11 +90,11 @@ public class CheckCatalogVersionOnAppendEntries implements AppendEntriesRequestI private static int readRequiredCatalogVersionForMeta(ByteBuffer allData, final EntryMeta entry, Marshaller commandsMarshaller) { if (entry.type() != EntryType.ENTRY_TYPE_DATA) { - return NO_VERSION_REQUIREMENT; + return NO_VERSION_REQUIRED; } if (!(commandsMarshaller instanceof PartitionCommandsMarshaller)) { - return NO_VERSION_REQUIREMENT; + return NO_VERSION_REQUIRED; } PartitionCommandsMarshaller partitionCommandsMarshaller = (PartitionCommandsMarshaller) commandsMarshaller; @@ -105,6 +104,6 @@ public class CheckCatalogVersionOnAppendEntries implements AppendEntriesRequestI return partitionCommandsMarshaller.readRequiredCatalogVersion(allData); } - return NO_VERSION_REQUIREMENT; + return NO_VERSION_REQUIRED; } } diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/PartitionCommandsMarshaller.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/PartitionCommandsMarshaller.java index 19079a3528..7a00f2ac7a 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/PartitionCommandsMarshaller.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/PartitionCommandsMarshaller.java @@ -18,18 +18,23 @@ package org.apache.ignite.internal.table.distributed.schema; import java.nio.ByteBuffer; -import org.apache.ignite.raft.jraft.util.Marshaller; +import org.apache.ignite.internal.raft.Marshaller; /** * {@link Marshaller} that first writes some metadata about an object and then it writes the actual serialized * representation of the object. */ public interface PartitionCommandsMarshaller extends Marshaller { + /** + * Used instead of a required catalog version when there is no requirement. + */ + int NO_VERSION_REQUIRED = -1; + /** * Reads required catalog version from the provided buffer. * * @param raw Buffer to read from. - * @return Catalog version. + * @return Catalog version. {@value #NO_VERSION_REQUIRED} if version is not required for the given command. */ int readRequiredCatalogVersion(ByteBuffer raw); } diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/PartitionCommandsMarshallerImpl.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/PartitionCommandsMarshallerImpl.java index faeae373e0..2e3dd47947 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/PartitionCommandsMarshallerImpl.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/PartitionCommandsMarshallerImpl.java @@ -27,12 +27,6 @@ import org.apache.ignite.network.serialization.MessageSerializationRegistry; * Default {@link PartitionCommandsMarshaller} implementation. */ public class PartitionCommandsMarshallerImpl extends OptimizedMarshaller implements PartitionCommandsMarshaller { - /** - * Sent instead of a real required catalog version when there is no requirement. - * -1 so that it is encoded as 1 byte by {@link VarIntUtils}. - */ - private static final int NO_VERSION_REQUIREMENT = -1; - public PartitionCommandsMarshallerImpl(MessageSerializationRegistry serializationRegistry) { super(serializationRegistry); } @@ -41,7 +35,7 @@ public class PartitionCommandsMarshallerImpl extends OptimizedMarshaller impleme public byte[] marshall(Object o) { int requiredCatalogVersion = o instanceof CatalogVersionAware ? ((CatalogVersionAware) o).requiredCatalogVersion() - : NO_VERSION_REQUIREMENT; + : NO_VERSION_REQUIRED; stream.setBuffer(buffer); stream.writeInt(requiredCatalogVersion); @@ -51,13 +45,15 @@ public class PartitionCommandsMarshallerImpl extends OptimizedMarshaller impleme @Override public <T> T unmarshall(ByteBuffer raw) { - skipRequiredCatalogVersion(raw); + int requiredCatalogVersion = readRequiredCatalogVersion(raw); - return super.unmarshall(raw); - } + T res = super.unmarshall(raw); + + if (res instanceof CatalogVersionAware) { + ((CatalogVersionAware) res).requiredCatalogVersion(requiredCatalogVersion); + } - private void skipRequiredCatalogVersion(ByteBuffer raw) { - readRequiredCatalogVersion(raw); + return res; } /** diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java index 6f1cefff12..f6046b0163 100644 --- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java +++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java @@ -279,7 +279,8 @@ public class TableManagerTest extends IgniteAbstractTest { */ @Test public void testPreconfiguredTable() throws Exception { - when(rm.startRaftGroupService(any(), any(), any())).thenAnswer(mock -> completedFuture(mock(TopologyAwareRaftGroupService.class))); + when(rm.startRaftGroupService(any(), any(), any(), any())) + .thenAnswer(mock -> completedFuture(mock(TopologyAwareRaftGroupService.class))); TableManager tableManager = createTableManager(tblManagerFut); @@ -449,7 +450,7 @@ public class TableManagerTest extends IgniteAbstractTest { private IgniteBiTuple<TableViewInternal, TableManager> startTableManagerStopTest() throws Exception { TableViewInternal table = mockManagersAndCreateTable(DYNAMIC_TABLE_FOR_DROP_NAME, tblManagerFut); - verify(rm, times(PARTITIONS)).startRaftGroupService(any(), any(), any()); + verify(rm, times(PARTITIONS)).startRaftGroupService(any(), any(), any(), any()); TableManager tableManager = tblManagerFut.join(); @@ -527,7 +528,8 @@ public class TableManagerTest extends IgniteAbstractTest { * partition storage is emulated instead. */ private void testStoragesGetClearedInMiddleOfFailedRebalance(boolean isTxStorageUnderRebalance) throws NodeStoppingException { - when(rm.startRaftGroupService(any(), any(), any())).thenAnswer(mock -> completedFuture(mock(TopologyAwareRaftGroupService.class))); + when(rm.startRaftGroupService(any(), any(), any(), any())) + .thenAnswer(mock -> completedFuture(mock(TopologyAwareRaftGroupService.class))); when(rm.raftNodeReadyFuture(any())).thenReturn(completedFuture(1L)); createZone(1, 1); @@ -609,7 +611,7 @@ public class TableManagerTest extends IgniteAbstractTest { ) throws Exception { String consistentId = "node0"; - when(rm.startRaftGroupService(any(), any(), any())).thenAnswer(mock -> { + when(rm.startRaftGroupService(any(), any(), any(), any())).thenAnswer(mock -> { RaftGroupService raftGrpSrvcMock = mock(TopologyAwareRaftGroupService.class); when(raftGrpSrvcMock.leader()).thenReturn(new Peer(consistentId)); diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/schema/CheckCatalogVersionOnActionRequestTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/schema/CheckCatalogVersionOnActionRequestTest.java index aa7953c418..b6c3c8e073 100644 --- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/schema/CheckCatalogVersionOnActionRequestTest.java +++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/schema/CheckCatalogVersionOnActionRequestTest.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.table.distributed.schema; +import static org.apache.ignite.utils.ClusterServiceTestUtils.defaultSerializationRegistry; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; @@ -83,6 +84,8 @@ class CheckCatalogVersionOnActionRequestTest extends BaseIgniteAbstractTest { private final PeerId leaderId = new PeerId("leader"); + private PartitionCommandsMarshallerImpl commandsMarshaller; + @BeforeEach void initMocks() { when(rpcContext.getNodeManager()).thenReturn(nodeManager); @@ -91,16 +94,18 @@ class CheckCatalogVersionOnActionRequestTest extends BaseIgniteAbstractTest { lenient().when(node.getRaftOptions()).thenReturn(raftOptions); lenient().when(node.getNodeState()).thenReturn(State.STATE_LEADER); lenient().when(node.getLeaderId()).thenReturn(leaderId); + + commandsMarshaller = new PartitionCommandsMarshallerImpl(defaultSerializationRegistry()); } @Test void delegatesWhenCommandHasNoRequiredCatalogVersion() { ActionRequest request = raftMessagesFactory.writeActionRequest() .groupId("test") - .command(commandWithoutRequiredCatalogVersion()) + .command(commandsMarshaller.marshall(commandWithoutRequiredCatalogVersion())) .build(); - assertThat(interceptor.intercept(rpcContext, request), is(nullValue())); + assertThat(interceptor.intercept(rpcContext, request, commandsMarshaller), is(nullValue())); } private WriteCommand commandWithoutRequiredCatalogVersion() { @@ -113,10 +118,10 @@ class CheckCatalogVersionOnActionRequestTest extends BaseIgniteAbstractTest { ActionRequest request = raftMessagesFactory.writeActionRequest() .groupId("test") - .command(commandWithRequiredCatalogVersion(3)) + .command(commandsMarshaller.marshall(commandWithRequiredCatalogVersion(3))) .build(); - assertThat(interceptor.intercept(rpcContext, request), is(nullValue())); + assertThat(interceptor.intercept(rpcContext, request, commandsMarshaller), is(nullValue())); } private WriteCommand commandWithRequiredCatalogVersion(int requiredVersion) { @@ -135,10 +140,10 @@ class CheckCatalogVersionOnActionRequestTest extends BaseIgniteAbstractTest { ActionRequest request = raftMessagesFactory.writeActionRequest() .groupId("test") - .command(commandWithRequiredCatalogVersion(6)) + .command(commandsMarshaller.marshall(commandWithRequiredCatalogVersion(6))) .build(); - Message result = interceptor.intercept(rpcContext, request); + Message result = interceptor.intercept(rpcContext, request, commandsMarshaller); assertThat(result, is(notNullValue())); assertThat(result, instanceOf(ErrorResponse.class)); @@ -156,10 +161,10 @@ class CheckCatalogVersionOnActionRequestTest extends BaseIgniteAbstractTest { ActionRequest request = raftMessagesFactory.writeActionRequest() .groupId("test") - .command(commandWithRequiredCatalogVersion(6)) + .command(commandsMarshaller.marshall(commandWithRequiredCatalogVersion(6))) .build(); - Message result = interceptor.intercept(rpcContext, request); + Message result = interceptor.intercept(rpcContext, request, commandsMarshaller); assertThat(result, is(notNullValue())); assertThat(result, instanceOf(ErrorResponse.class)); @@ -182,10 +187,10 @@ class CheckCatalogVersionOnActionRequestTest extends BaseIgniteAbstractTest { ActionRequest request = raftMessagesFactory.writeActionRequest() .groupId("test") - .command(commandWithRequiredCatalogVersion(6)) + .command(commandsMarshaller.marshall(commandWithRequiredCatalogVersion(6))) .build(); - Message result = interceptor.intercept(rpcContext, request); + Message result = interceptor.intercept(rpcContext, request, commandsMarshaller); assertThat(result, is(notNullValue())); assertThat(result, instanceOf(ErrorResponse.class)); diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/schema/PartitionCommandsMarshallerImplTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/schema/PartitionCommandsMarshallerImplTest.java index 53ca61dcb9..5f31c1a1f9 100644 --- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/schema/PartitionCommandsMarshallerImplTest.java +++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/schema/PartitionCommandsMarshallerImplTest.java @@ -108,7 +108,7 @@ class PartitionCommandsMarshallerImplTest { byte[] serialized = partitionCommandsMarshaller.marshall(message); - FinishTxCommand unmarshalled = partitionCommandsMarshaller.unmarshall(ByteBuffer.wrap(serialized)); + FinishTxCommand unmarshalled = partitionCommandsMarshaller.unmarshall(serialized); assertThat(unmarshalled.requiredCatalogVersion(), is(42)); diff --git a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java index 84ef75e717..cb1f5b3334 100644 --- a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java +++ b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java @@ -21,7 +21,6 @@ import static java.util.concurrent.CompletableFuture.allOf; import static java.util.concurrent.CompletableFuture.completedFuture; import static java.util.stream.Collectors.toList; import static java.util.stream.Collectors.toSet; -import static org.apache.ignite.internal.catalog.commands.CatalogUtils.DEFAULT_PARTITION_COUNT; import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully; import static org.apache.ignite.internal.util.CollectionUtils.first; import static org.apache.ignite.utils.ClusterServiceTestUtils.findLocalAddresses; @@ -91,7 +90,6 @@ import org.apache.ignite.internal.schema.configuration.GcConfiguration; import org.apache.ignite.internal.storage.MvPartitionStorage; import org.apache.ignite.internal.storage.engine.MvTableStorage; import org.apache.ignite.internal.storage.impl.TestMvPartitionStorage; -import org.apache.ignite.internal.storage.impl.TestMvTableStorage; import org.apache.ignite.internal.storage.index.StorageHashIndexDescriptor; import org.apache.ignite.internal.storage.index.StorageHashIndexDescriptor.StorageHashIndexColumnDescriptor; import org.apache.ignite.internal.storage.index.impl.TestHashIndexStorage; @@ -112,6 +110,7 @@ import org.apache.ignite.internal.table.distributed.replicator.TransactionStateR import org.apache.ignite.internal.table.distributed.schema.AlwaysSyncedSchemaSyncService; import org.apache.ignite.internal.table.distributed.schema.ConstantSchemaVersions; import org.apache.ignite.internal.table.distributed.schema.SchemaSyncService; +import org.apache.ignite.internal.table.distributed.schema.ThreadLocalPartitionCommandsMarshaller; import org.apache.ignite.internal.table.distributed.schema.ValidationSchemasSource; import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl; import org.apache.ignite.internal.table.impl.DummyInternalTableImpl; @@ -441,6 +440,9 @@ public class ItTxTestCluster { int globalIndexId = 1; + ThreadLocalPartitionCommandsMarshaller commandsMarshaller = + new ThreadLocalPartitionCommandsMarshaller(cluster.get(0).serializationRegistry()); + for (int p = 0; p < assignments.size(); p++) { Set<String> partAssignments = assignments.get(p); @@ -449,7 +451,6 @@ public class ItTxTestCluster { for (String assignment : partAssignments) { int partId = p; - var mvTableStorage = new TestMvTableStorage(tableId, DEFAULT_PARTITION_COUNT); var mvPartStorage = new TestMvPartitionStorage(partId); var txStateStorage = txStateStorages.get(assignment); var transactionStateResolver = new TransactionStateResolver( @@ -575,7 +576,7 @@ public class ItTxTestCluster { if (startClient) { RaftGroupService service = RaftGroupServiceImpl - .start(grpId, client, FACTORY, raftConfig, membersConf, true, executor) + .start(grpId, client, FACTORY, raftConfig, membersConf, true, executor, commandsMarshaller) .get(5, TimeUnit.SECONDS); clients.put(p, service); @@ -584,7 +585,7 @@ public class ItTxTestCluster { ClusterService tmpSvc = cluster.get(0); RaftGroupService service = RaftGroupServiceImpl - .start(grpId, tmpSvc, FACTORY, raftConfig, membersConf, true, executor) + .start(grpId, tmpSvc, FACTORY, raftConfig, membersConf, true, executor, commandsMarshaller) .get(5, TimeUnit.SECONDS); Peer leader = service.leader(); @@ -597,7 +598,7 @@ public class ItTxTestCluster { .orElseThrow(); RaftGroupService leaderClusterSvc = RaftGroupServiceImpl - .start(grpId, leaderSrv, FACTORY, raftConfig, membersConf, true, executor) + .start(grpId, leaderSrv, FACTORY, raftConfig, membersConf, true, executor, commandsMarshaller) .get(5, TimeUnit.SECONDS); clients.put(p, leaderClusterSvc);