This is an automated email from the ASF dual-hosted git repository.
ibessonov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 75e7464477 IGNITE-17871 JDKMarshaller replaced with
OptimizedMarshaller for RAFT commands. (#1411)
75e7464477 is described below
commit 75e7464477a26b4cd34af9e040c44687018689cf
Author: Ivan Bessonov <[email protected]>
AuthorDate: Mon Dec 12 12:46:30 2022 +0300
IGNITE-17871 JDKMarshaller replaced with OptimizedMarshaller for RAFT
commands. (#1411)
---
.../raft/commands/InitCmgStateCommand.java | 3 +-
.../management/raft/commands/JoinReadyCommand.java | 3 +-
.../raft/commands/JoinRequestCommand.java | 3 +-
.../raft/commands/NodesLeaveCommand.java | 3 +-
.../raft/commands/ReadLogicalTopologyCommand.java | 3 +-
.../management/raft/commands/ReadStateCommand.java | 3 +-
.../metastorage/common/command/GetAllCommand.java | 3 +-
.../common/command/GetAndPutAllCommand.java | 3 +-
.../common/command/GetAndPutCommand.java | 3 +-
.../common/command/GetAndRemoveAllCommand.java | 3 +-
.../common/command/GetAndRemoveCommand.java | 3 +-
.../metastorage/common/command/GetCommand.java | 3 +-
.../metastorage/common/command/InvokeCommand.java | 3 +-
.../common/command/MultiInvokeCommand.java | 3 +-
.../metastorage/common/command/PutAllCommand.java | 3 +-
.../metastorage/common/command/PutCommand.java | 3 +-
.../metastorage/common/command/RangeCommand.java | 3 +-
.../common/command/RemoveAllCommand.java | 3 +-
.../metastorage/common/command/RemoveCommand.java | 3 +-
.../common/command/WatchExactKeysCommand.java | 3 +-
.../common/command/WatchRangeKeysCommand.java | 3 +-
.../common/command/cursor/CursorCloseCommand.java | 3 +-
.../command/cursor/CursorHasNextCommand.java | 3 +-
.../common/command/cursor/CursorNextCommand.java | 3 +-
.../common/command/cursor/CursorsCloseCommand.java | 3 +-
.../org/apache/ignite/internal/raft/Command.java | 4 +-
.../raft/server/ItJraftCounterServerTest.java | 38 +++++----
.../apache/ignite/raft/server/ItSafeTimeTest.java | 4 +-
.../raft/server/ItSimpleCounterServerTest.java | 20 ++---
.../raft/server/snasphot/TestWriteCommand.java | 26 ------
.../server/snasphot/UpdateCountRaftListener.java | 1 +
.../internal/raft/server/impl/JraftServerImpl.java | 18 ++++-
.../jraft/rpc/impl/ActionRequestProcessor.java | 10 ++-
.../raft/jraft/rpc/impl/IgniteRpcServer.java | 4 +-
.../ignite/internal/raft/RaftGroupServiceTest.java | 21 +++--
.../org/apache/ignite/raft/TestWriteCommand.java | 8 +-
.../ignite/raft/messages/TestMessageGroup.java | 7 ++
.../raft/server/counter/GetValueCommand.java | 10 ++-
.../server/counter/IncrementAndGetCommand.java | 27 +++----
.../replicator/command/SafeTimeSyncCommand.java | 5 +-
.../replicator/message/ReplicaMessageGroup.java | 22 +++---
.../ignite/internal/table/ItColocationTest.java | 14 ++--
.../table/distributed/TableMessageGroup.java | 9 ++-
.../distributed/command/PartitionCommand.java | 3 +-
.../distributed/command/UpdateAllCommand.java | 6 +-
.../table/distributed/command/UpdateCommand.java | 6 +-
.../table/distributed/raft/PartitionListener.java | 10 +--
.../replicator/PartitionReplicaListener.java | 26 +++---
.../PartitionRaftCommandsSerializationTest.java | 92 +++++++++++++---------
.../raft/PartitionCommandListenerTest.java | 31 +++++---
.../incoming/IncomingSnapshotCopierTest.java | 2 +-
51 files changed, 254 insertions(+), 245 deletions(-)
diff --git
a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/commands/InitCmgStateCommand.java
b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/commands/InitCmgStateCommand.java
index 3840140b87..e0c7d86a11 100644
---
a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/commands/InitCmgStateCommand.java
+++
b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/commands/InitCmgStateCommand.java
@@ -20,7 +20,6 @@ package
org.apache.ignite.internal.cluster.management.raft.commands;
import org.apache.ignite.internal.cluster.management.ClusterState;
import
org.apache.ignite.internal.cluster.management.network.messages.CmgMessageGroup;
import org.apache.ignite.internal.raft.WriteCommand;
-import org.apache.ignite.network.NetworkMessage;
import org.apache.ignite.network.annotations.Transferable;
/**
@@ -28,7 +27,7 @@ import org.apache.ignite.network.annotations.Transferable;
* existing state.
*/
@Transferable(CmgMessageGroup.Commands.INIT_CMG_STATE)
-public interface InitCmgStateCommand extends WriteCommand, NetworkMessage {
+public interface InitCmgStateCommand extends WriteCommand {
/**
* Returns the node that wants to initialize the CMG state.
*/
diff --git
a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/commands/JoinReadyCommand.java
b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/commands/JoinReadyCommand.java
index 68acd07717..3de5185f76 100644
---
a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/commands/JoinReadyCommand.java
+++
b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/commands/JoinReadyCommand.java
@@ -19,14 +19,13 @@ package
org.apache.ignite.internal.cluster.management.raft.commands;
import
org.apache.ignite.internal.cluster.management.network.messages.CmgMessageGroup;
import org.apache.ignite.internal.raft.WriteCommand;
-import org.apache.ignite.network.NetworkMessage;
import org.apache.ignite.network.annotations.Transferable;
/**
* Command sent by a node when it's ready to join the cluster and enter the
logical topology.
*/
@Transferable(CmgMessageGroup.Commands.JOIN_READY)
-public interface JoinReadyCommand extends WriteCommand, NetworkMessage {
+public interface JoinReadyCommand extends WriteCommand {
/**
* Returns the node that wants to enter the logical topology.
*/
diff --git
a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/commands/JoinRequestCommand.java
b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/commands/JoinRequestCommand.java
index a2a8be1a19..7b6731014e 100644
---
a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/commands/JoinRequestCommand.java
+++
b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/commands/JoinRequestCommand.java
@@ -21,14 +21,13 @@ import
org.apache.ignite.internal.cluster.management.ClusterTag;
import
org.apache.ignite.internal.cluster.management.network.messages.CmgMessageGroup;
import org.apache.ignite.internal.properties.IgniteProductVersion;
import org.apache.ignite.internal.raft.WriteCommand;
-import org.apache.ignite.network.NetworkMessage;
import org.apache.ignite.network.annotations.Transferable;
/**
* Command sent by a node that intends to join a cluster. This command will
trigger node validation.
*/
@Transferable(CmgMessageGroup.Commands.JOIN_REQUEST)
-public interface JoinRequestCommand extends WriteCommand, NetworkMessage {
+public interface JoinRequestCommand extends WriteCommand {
/**
* Returns the node that wants to join a cluster.
*/
diff --git
a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/commands/NodesLeaveCommand.java
b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/commands/NodesLeaveCommand.java
index e84f72bf6e..a5ebdb06d1 100644
---
a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/commands/NodesLeaveCommand.java
+++
b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/commands/NodesLeaveCommand.java
@@ -20,14 +20,13 @@ package
org.apache.ignite.internal.cluster.management.raft.commands;
import java.util.Set;
import
org.apache.ignite.internal.cluster.management.network.messages.CmgMessageGroup;
import org.apache.ignite.internal.raft.WriteCommand;
-import org.apache.ignite.network.NetworkMessage;
import org.apache.ignite.network.annotations.Transferable;
/**
* Command that gets executed when nodes need to be removed from the logical
topology.
*/
@Transferable(CmgMessageGroup.Commands.NODES_LEAVE)
-public interface NodesLeaveCommand extends WriteCommand, NetworkMessage {
+public interface NodesLeaveCommand extends WriteCommand {
/**
* Returns the nodes that need to be removed from the logical topology.
*/
diff --git
a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/commands/ReadLogicalTopologyCommand.java
b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/commands/ReadLogicalTopologyCommand.java
index 9d6851300f..161fb70d43 100644
---
a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/commands/ReadLogicalTopologyCommand.java
+++
b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/commands/ReadLogicalTopologyCommand.java
@@ -19,12 +19,11 @@ package
org.apache.ignite.internal.cluster.management.raft.commands;
import
org.apache.ignite.internal.cluster.management.network.messages.CmgMessageGroup;
import org.apache.ignite.internal.raft.ReadCommand;
-import org.apache.ignite.network.NetworkMessage;
import org.apache.ignite.network.annotations.Transferable;
/**
* Command for retrieving the current logical topology.
*/
@Transferable(CmgMessageGroup.Commands.READ_LOGICAL_TOPOLOGY)
-public interface ReadLogicalTopologyCommand extends ReadCommand,
NetworkMessage {
+public interface ReadLogicalTopologyCommand extends ReadCommand {
}
diff --git
a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/commands/ReadStateCommand.java
b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/commands/ReadStateCommand.java
index d4f250c5c2..5a02e92b8d 100644
---
a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/commands/ReadStateCommand.java
+++
b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/commands/ReadStateCommand.java
@@ -20,12 +20,11 @@ package
org.apache.ignite.internal.cluster.management.raft.commands;
import org.apache.ignite.internal.cluster.management.ClusterState;
import
org.apache.ignite.internal.cluster.management.network.messages.CmgMessageGroup;
import org.apache.ignite.internal.raft.ReadCommand;
-import org.apache.ignite.network.NetworkMessage;
import org.apache.ignite.network.annotations.Transferable;
/**
* Command for retrieving the current {@link ClusterState}.
*/
@Transferable(CmgMessageGroup.Commands.READ_STATE)
-public interface ReadStateCommand extends ReadCommand, NetworkMessage {
+public interface ReadStateCommand extends ReadCommand {
}
diff --git
a/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/GetAllCommand.java
b/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/GetAllCommand.java
index d4351d4d1e..90eef8256c 100644
---
a/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/GetAllCommand.java
+++
b/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/GetAllCommand.java
@@ -22,14 +22,13 @@ import java.util.List;
import java.util.Set;
import org.apache.ignite.internal.raft.ReadCommand;
import org.apache.ignite.lang.ByteArray;
-import org.apache.ignite.network.NetworkMessage;
import org.apache.ignite.network.annotations.Transferable;
/**
* Get all command for MetaStorageCommandListener that retrieves entries for
given keys and the revision upper bound, if latter is present.
*/
@Transferable(MetastorageCommandsMessageGroup.GET_ALL)
-public interface GetAllCommand extends ReadCommand, NetworkMessage {
+public interface GetAllCommand extends ReadCommand {
/**
* Returns the list of keys.
*/
diff --git
a/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/GetAndPutAllCommand.java
b/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/GetAndPutAllCommand.java
index 71960b841a..07cd8ebabf 100644
---
a/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/GetAndPutAllCommand.java
+++
b/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/GetAndPutAllCommand.java
@@ -22,7 +22,6 @@ import java.util.List;
import java.util.Map;
import org.apache.ignite.internal.raft.WriteCommand;
import org.apache.ignite.lang.ByteArray;
-import org.apache.ignite.network.NetworkMessage;
import org.apache.ignite.network.annotations.Transferable;
/**
@@ -30,7 +29,7 @@ import org.apache.ignite.network.annotations.Transferable;
* previous entries for given keys.
*/
@Transferable(MetastorageCommandsMessageGroup.GET_AND_PUT_ALL)
-public interface GetAndPutAllCommand extends WriteCommand, NetworkMessage {
+public interface GetAndPutAllCommand extends WriteCommand {
/**
* Returns keys.
*/
diff --git
a/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/GetAndPutCommand.java
b/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/GetAndPutCommand.java
index 612d833262..f246785763 100644
---
a/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/GetAndPutCommand.java
+++
b/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/GetAndPutCommand.java
@@ -18,7 +18,6 @@
package org.apache.ignite.internal.metastorage.common.command;
import org.apache.ignite.internal.raft.WriteCommand;
-import org.apache.ignite.network.NetworkMessage;
import org.apache.ignite.network.annotations.Transferable;
/**
@@ -26,7 +25,7 @@ import org.apache.ignite.network.annotations.Transferable;
* a previous entry for the given key.
*/
@Transferable(MetastorageCommandsMessageGroup.GET_AND_PUT)
-public interface GetAndPutCommand extends WriteCommand, NetworkMessage {
+public interface GetAndPutCommand extends WriteCommand {
/**
* Returns the key. Couldn't be {@code null}.
*/
diff --git
a/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/GetAndRemoveAllCommand.java
b/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/GetAndRemoveAllCommand.java
index 11f65fe74b..d331779748 100644
---
a/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/GetAndRemoveAllCommand.java
+++
b/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/GetAndRemoveAllCommand.java
@@ -22,14 +22,13 @@ import java.util.List;
import java.util.Set;
import org.apache.ignite.internal.raft.WriteCommand;
import org.apache.ignite.lang.ByteArray;
-import org.apache.ignite.network.NetworkMessage;
import org.apache.ignite.network.annotations.Transferable;
/**
* Get and remove all command for MetaStorageCommandListener that removes
entries for given keys and retrieves previous entries.
*/
@Transferable(MetastorageCommandsMessageGroup.GET_AND_REMOVE_ALL)
-public interface GetAndRemoveAllCommand extends NetworkMessage, WriteCommand {
+public interface GetAndRemoveAllCommand extends WriteCommand {
/**
* Returns the keys collection. Couldn't be {@code null}.
*/
diff --git
a/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/GetAndRemoveCommand.java
b/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/GetAndRemoveCommand.java
index 02124ced93..44a2ca4f02 100644
---
a/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/GetAndRemoveCommand.java
+++
b/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/GetAndRemoveCommand.java
@@ -18,7 +18,6 @@
package org.apache.ignite.internal.metastorage.common.command;
import org.apache.ignite.internal.raft.WriteCommand;
-import org.apache.ignite.network.NetworkMessage;
import org.apache.ignite.network.annotations.Transferable;
/**
@@ -26,7 +25,7 @@ import org.apache.ignite.network.annotations.Transferable;
* given key.
*/
@Transferable(MetastorageCommandsMessageGroup.GET_AND_REMOVE)
-public interface GetAndRemoveCommand extends WriteCommand, NetworkMessage {
+public interface GetAndRemoveCommand extends WriteCommand {
/**
* Returns the key. Couldn't be {@code null}.
*/
diff --git
a/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/GetCommand.java
b/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/GetCommand.java
index 8b7113ecb3..74fc5ffa05 100644
---
a/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/GetCommand.java
+++
b/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/GetCommand.java
@@ -18,14 +18,13 @@
package org.apache.ignite.internal.metastorage.common.command;
import org.apache.ignite.internal.raft.ReadCommand;
-import org.apache.ignite.network.NetworkMessage;
import org.apache.ignite.network.annotations.Transferable;
/**
* Get command for MetaStorageCommandListener that retrieves an entry for the
given key and the revision upper bound, if latter is present.
*/
@Transferable(MetastorageCommandsMessageGroup.GET)
-public interface GetCommand extends ReadCommand, NetworkMessage {
+public interface GetCommand extends ReadCommand {
/**
* Returns key. Couldn't be {@code null}.
*/
diff --git
a/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/InvokeCommand.java
b/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/InvokeCommand.java
index 06dc51b4af..ed0ee32186 100644
---
a/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/InvokeCommand.java
+++
b/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/InvokeCommand.java
@@ -20,14 +20,13 @@ package
org.apache.ignite.internal.metastorage.common.command;
import java.util.List;
import org.apache.ignite.internal.metastorage.common.OperationInfo;
import org.apache.ignite.internal.raft.WriteCommand;
-import org.apache.ignite.network.NetworkMessage;
import org.apache.ignite.network.annotations.Transferable;
/**
* Represents invoke command for meta storage.
*/
@Transferable(MetastorageCommandsMessageGroup.INVOKE)
-public interface InvokeCommand extends NetworkMessage, WriteCommand {
+public interface InvokeCommand extends WriteCommand {
/**
* Returns condition.
*
diff --git
a/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/MultiInvokeCommand.java
b/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/MultiInvokeCommand.java
index 6376694688..7e62fdf2db 100644
---
a/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/MultiInvokeCommand.java
+++
b/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/MultiInvokeCommand.java
@@ -18,14 +18,13 @@
package org.apache.ignite.internal.metastorage.common.command;
import org.apache.ignite.internal.raft.WriteCommand;
-import org.apache.ignite.network.NetworkMessage;
import org.apache.ignite.network.annotations.Transferable;
/**
* Represents invoke command with nested conditions and execution branches.
*/
@Transferable(MetastorageCommandsMessageGroup.MULTI_INVOKE)
-public interface MultiInvokeCommand extends NetworkMessage, WriteCommand {
+public interface MultiInvokeCommand extends WriteCommand {
/**
* Returns if statement.
*
diff --git
a/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/PutAllCommand.java
b/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/PutAllCommand.java
index 6639ffbb77..1604b85167 100644
---
a/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/PutAllCommand.java
+++
b/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/PutAllCommand.java
@@ -22,14 +22,13 @@ import java.util.List;
import java.util.Map;
import org.apache.ignite.internal.raft.WriteCommand;
import org.apache.ignite.lang.ByteArray;
-import org.apache.ignite.network.NetworkMessage;
import org.apache.ignite.network.annotations.Transferable;
/**
* Put all command for MetaStorageCommandListener that inserts or updates
entries with given keys and given values.
*/
@Transferable(MetastorageCommandsMessageGroup.PUT_ALL)
-public interface PutAllCommand extends WriteCommand, NetworkMessage {
+public interface PutAllCommand extends WriteCommand {
/**
* Returns entries keys.
*/
diff --git
a/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/PutCommand.java
b/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/PutCommand.java
index 3fb838a214..a349844d9d 100644
---
a/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/PutCommand.java
+++
b/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/PutCommand.java
@@ -18,7 +18,6 @@
package org.apache.ignite.internal.metastorage.common.command;
import org.apache.ignite.internal.raft.WriteCommand;
-import org.apache.ignite.network.NetworkMessage;
import org.apache.ignite.network.annotations.Transferable;
/**
@@ -26,7 +25,7 @@ import org.apache.ignite.network.annotations.Transferable;
* previous entry for the given key.
*/
@Transferable(MetastorageCommandsMessageGroup.PUT)
-public interface PutCommand extends WriteCommand, NetworkMessage {
+public interface PutCommand extends WriteCommand {
/**
* Returns the key. Couldn't be {@code null}.
*/
diff --git
a/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/RangeCommand.java
b/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/RangeCommand.java
index 8993e620a4..a750163982 100644
---
a/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/RangeCommand.java
+++
b/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/RangeCommand.java
@@ -19,7 +19,6 @@ package org.apache.ignite.internal.metastorage.common.command;
import org.apache.ignite.internal.raft.WriteCommand;
import org.apache.ignite.lang.IgniteUuid;
-import org.apache.ignite.network.NetworkMessage;
import org.apache.ignite.network.annotations.Transferable;
/**
@@ -27,7 +26,7 @@ import org.apache.ignite.network.annotations.Transferable;
* filtered out by upper bound of given revision number.
*/
@Transferable(MetastorageCommandsMessageGroup.RANGE)
-public interface RangeCommand extends WriteCommand, NetworkMessage {
+public interface RangeCommand extends WriteCommand {
/** Default value for {@link #batchSize}. */
int DEFAULT_BATCH_SIZE = 100;
diff --git
a/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/RemoveAllCommand.java
b/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/RemoveAllCommand.java
index 15ef2fae21..df6f5c0027 100644
---
a/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/RemoveAllCommand.java
+++
b/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/RemoveAllCommand.java
@@ -22,14 +22,13 @@ import java.util.List;
import java.util.Set;
import org.apache.ignite.internal.raft.WriteCommand;
import org.apache.ignite.lang.ByteArray;
-import org.apache.ignite.network.NetworkMessage;
import org.apache.ignite.network.annotations.Transferable;
/**
* Remove all command for MetaStorageCommandListener that removes entries for
given keys.
*/
@Transferable(MetastorageCommandsMessageGroup.REMOVE_ALL)
-public interface RemoveAllCommand extends WriteCommand, NetworkMessage {
+public interface RemoveAllCommand extends WriteCommand {
/**
* Returns the keys list. Couldn't be {@code null}.
*/
diff --git
a/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/RemoveCommand.java
b/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/RemoveCommand.java
index 0ec5f56a33..0697aa6929 100644
---
a/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/RemoveCommand.java
+++
b/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/RemoveCommand.java
@@ -18,14 +18,13 @@
package org.apache.ignite.internal.metastorage.common.command;
import org.apache.ignite.internal.raft.WriteCommand;
-import org.apache.ignite.network.NetworkMessage;
import org.apache.ignite.network.annotations.Transferable;
/**
* Remove command for MetaStorageCommandListener that removes an entry for the
given key.
*/
@Transferable(MetastorageCommandsMessageGroup.REMOVE)
-public interface RemoveCommand extends WriteCommand, NetworkMessage {
+public interface RemoveCommand extends WriteCommand {
/**
* Returns the key. Couldn't be {@code null}.
*/
diff --git
a/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/WatchExactKeysCommand.java
b/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/WatchExactKeysCommand.java
index 1cf5fc6290..cfca1fb686 100644
---
a/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/WatchExactKeysCommand.java
+++
b/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/WatchExactKeysCommand.java
@@ -23,14 +23,13 @@ import java.util.Set;
import org.apache.ignite.internal.raft.WriteCommand;
import org.apache.ignite.lang.ByteArray;
import org.apache.ignite.lang.IgniteUuid;
-import org.apache.ignite.network.NetworkMessage;
import org.apache.ignite.network.annotations.Transferable;
/**
* Watch command for MetaStorageCommandListener that subscribes on meta
storage updates matching the parameters.
*/
@Transferable(MetastorageCommandsMessageGroup.WATCH_EXACT_KEYS)
-public interface WatchExactKeysCommand extends NetworkMessage, WriteCommand {
+public interface WatchExactKeysCommand extends WriteCommand {
/**
* Returns the keys list. Couldn't be {@code null}.
*/
diff --git
a/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/WatchRangeKeysCommand.java
b/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/WatchRangeKeysCommand.java
index 013745e066..269ba13d41 100644
---
a/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/WatchRangeKeysCommand.java
+++
b/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/WatchRangeKeysCommand.java
@@ -19,7 +19,6 @@ package org.apache.ignite.internal.metastorage.common.command;
import org.apache.ignite.internal.raft.WriteCommand;
import org.apache.ignite.lang.IgniteUuid;
-import org.apache.ignite.network.NetworkMessage;
import org.apache.ignite.network.annotations.Transferable;
import org.jetbrains.annotations.Nullable;
@@ -27,7 +26,7 @@ import org.jetbrains.annotations.Nullable;
* Watch command for MetaStorageCommandListener that subscribes on meta
storage updates matching the parameters.
*/
@Transferable(MetastorageCommandsMessageGroup.WATCH_RANGE_KEYS)
-public interface WatchRangeKeysCommand extends NetworkMessage, WriteCommand {
+public interface WatchRangeKeysCommand extends WriteCommand {
/**
* Returns start key of range (inclusive). Couldn't be {@code null}.
*/
diff --git
a/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/cursor/CursorCloseCommand.java
b/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/cursor/CursorCloseCommand.java
index edde63db1e..17adffeae7 100644
---
a/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/cursor/CursorCloseCommand.java
+++
b/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/cursor/CursorCloseCommand.java
@@ -20,14 +20,13 @@ package
org.apache.ignite.internal.metastorage.common.command.cursor;
import
org.apache.ignite.internal.metastorage.common.command.MetastorageCommandsMessageGroup;
import org.apache.ignite.internal.raft.WriteCommand;
import org.apache.ignite.lang.IgniteUuid;
-import org.apache.ignite.network.NetworkMessage;
import org.apache.ignite.network.annotations.Transferable;
/**
* Cursor close command for MetaStorageCommandListener that closes cursor with
given id.
*/
@Transferable(MetastorageCommandsMessageGroup.CURSOR_CLOSE)
-public interface CursorCloseCommand extends NetworkMessage, WriteCommand {
+public interface CursorCloseCommand extends WriteCommand {
/**
* Returns cursor id.
*/
diff --git
a/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/cursor/CursorHasNextCommand.java
b/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/cursor/CursorHasNextCommand.java
index be1ec84098..7304792b6e 100644
---
a/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/cursor/CursorHasNextCommand.java
+++
b/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/cursor/CursorHasNextCommand.java
@@ -20,14 +20,13 @@ package
org.apache.ignite.internal.metastorage.common.command.cursor;
import
org.apache.ignite.internal.metastorage.common.command.MetastorageCommandsMessageGroup;
import org.apache.ignite.internal.raft.ReadCommand;
import org.apache.ignite.lang.IgniteUuid;
-import org.apache.ignite.network.NetworkMessage;
import org.apache.ignite.network.annotations.Transferable;
/**
* Cursor {@code hasNext} command for MetaStorageCommandListener that checks
whether next element is available.
*/
@Transferable(MetastorageCommandsMessageGroup.CURSOR_HAS_NEXT)
-public interface CursorHasNextCommand extends NetworkMessage, ReadCommand {
+public interface CursorHasNextCommand extends ReadCommand {
/**
* Returns cursor id.
*/
diff --git
a/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/cursor/CursorNextCommand.java
b/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/cursor/CursorNextCommand.java
index a5c2049937..7fb92db4f0 100644
---
a/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/cursor/CursorNextCommand.java
+++
b/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/cursor/CursorNextCommand.java
@@ -20,14 +20,13 @@ package
org.apache.ignite.internal.metastorage.common.command.cursor;
import
org.apache.ignite.internal.metastorage.common.command.MetastorageCommandsMessageGroup;
import org.apache.ignite.internal.raft.WriteCommand;
import org.apache.ignite.lang.IgniteUuid;
-import org.apache.ignite.network.NetworkMessage;
import org.apache.ignite.network.annotations.Transferable;
/**
* Cursor {@code next} command for MetaStorageCommandListener that returns
next element and moves cursor.
*/
@Transferable(MetastorageCommandsMessageGroup.CURSOR_NEXT)
-public interface CursorNextCommand extends NetworkMessage, WriteCommand {
+public interface CursorNextCommand extends WriteCommand {
/**
* Returns cursor id.
*/
diff --git
a/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/cursor/CursorsCloseCommand.java
b/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/cursor/CursorsCloseCommand.java
index 2abe0d8671..5c7132df6e 100644
---
a/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/cursor/CursorsCloseCommand.java
+++
b/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/cursor/CursorsCloseCommand.java
@@ -19,7 +19,6 @@ package
org.apache.ignite.internal.metastorage.common.command.cursor;
import
org.apache.ignite.internal.metastorage.common.command.MetastorageCommandsMessageGroup;
import org.apache.ignite.internal.raft.WriteCommand;
-import org.apache.ignite.network.NetworkMessage;
import org.apache.ignite.network.annotations.Transferable;
/**
@@ -27,7 +26,7 @@ import org.apache.ignite.network.annotations.Transferable;
* topology.
*/
@Transferable(MetastorageCommandsMessageGroup.CURSORS_CLOSE)
-public interface CursorsCloseCommand extends NetworkMessage, WriteCommand {
+public interface CursorsCloseCommand extends WriteCommand {
/**
* Returns cursor id.
*/
diff --git
a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/Command.java
b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/Command.java
index dd0de21e48..570b32732f 100644
---
a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/Command.java
+++
b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/Command.java
@@ -17,10 +17,10 @@
package org.apache.ignite.internal.raft;
-import java.io.Serializable;
+import org.apache.ignite.network.NetworkMessage;
/**
* A marker interface for replication group command.
*/
-public interface Command extends Serializable {
+public interface Command extends NetworkMessage {
}
diff --git
a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ItJraftCounterServerTest.java
b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ItJraftCounterServerTest.java
index ac356b9cd2..6a1b912558 100644
---
a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ItJraftCounterServerTest.java
+++
b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ItJraftCounterServerTest.java
@@ -25,6 +25,8 @@ import static
org.apache.ignite.raft.jraft.core.State.STATE_ERROR;
import static org.apache.ignite.raft.jraft.core.State.STATE_LEADER;
import static org.apache.ignite.raft.jraft.test.TestUtils.waitForCondition;
import static org.apache.ignite.raft.jraft.test.TestUtils.waitForTopology;
+import static
org.apache.ignite.raft.server.counter.GetValueCommand.getValueCommand;
+import static
org.apache.ignite.raft.server.counter.IncrementAndGetCommand.incrementAndGetCommand;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
@@ -63,11 +65,11 @@ import org.apache.ignite.raft.jraft.option.NodeOptions;
import org.apache.ignite.raft.jraft.rpc.impl.RaftException;
import org.apache.ignite.raft.jraft.test.TestUtils;
import org.apache.ignite.raft.jraft.util.ExecutorServiceHelper;
+import org.apache.ignite.raft.messages.TestRaftMessagesFactory;
import org.apache.ignite.raft.server.counter.CounterListener;
import org.apache.ignite.raft.server.counter.GetValueCommand;
import org.apache.ignite.raft.server.counter.IncrementAndGetCommand;
import org.apache.ignite.raft.server.snasphot.SnapshotInMemoryStorageFactory;
-import org.apache.ignite.raft.server.snasphot.TestWriteCommand;
import org.apache.ignite.raft.server.snasphot.UpdateCountRaftListener;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.Test;
@@ -199,15 +201,15 @@ class ItJraftCounterServerTest extends JraftAbstractTest {
assertNotNull(client1.leader());
assertNotNull(client2.leader());
- assertEquals(2, client1.<Long>run(new
IncrementAndGetCommand(2)).get());
- assertEquals(2, client1.<Long>run(new GetValueCommand()).get());
- assertEquals(3, client1.<Long>run(new
IncrementAndGetCommand(1)).get());
- assertEquals(3, client1.<Long>run(new GetValueCommand()).get());
+ assertEquals(2, client1.<Long>run(incrementAndGetCommand(2)).get());
+ assertEquals(2, client1.<Long>run(getValueCommand()).get());
+ assertEquals(3, client1.<Long>run(incrementAndGetCommand(1)).get());
+ assertEquals(3, client1.<Long>run(getValueCommand()).get());
- assertEquals(4, client2.<Long>run(new
IncrementAndGetCommand(4)).get());
- assertEquals(4, client2.<Long>run(new GetValueCommand()).get());
- assertEquals(7, client2.<Long>run(new
IncrementAndGetCommand(3)).get());
- assertEquals(7, client2.<Long>run(new GetValueCommand()).get());
+ assertEquals(4, client2.<Long>run(incrementAndGetCommand(4)).get());
+ assertEquals(4, client2.<Long>run(getValueCommand()).get());
+ assertEquals(7, client2.<Long>run(incrementAndGetCommand(3)).get());
+ assertEquals(7, client2.<Long>run(getValueCommand()).get());
}
@Test
@@ -369,7 +371,7 @@ class ItJraftCounterServerTest extends JraftAbstractTest {
assertEquals(sum(9), val3);
try {
- client1.<Long>run(new IncrementAndGetCommand(10)).get();
+ client1.<Long>run(incrementAndGetCommand(10)).get();
fail();
} catch (Exception e) {
@@ -384,7 +386,7 @@ class ItJraftCounterServerTest extends JraftAbstractTest {
// Client can't switch to new leader, because only one peer in the list.
try {
- client1.<Long>run(new IncrementAndGetCommand(11)).get();
+ client1.<Long>run(incrementAndGetCommand(11)).get();
} catch (Exception e) {
boolean isValid = e.getCause() instanceof TimeoutException;
@@ -437,7 +439,7 @@ class ItJraftCounterServerTest extends JraftAbstractTest {
assertNotNull(leader);
try {
- client1.<Long>run(new IncrementAndGetCommand(3)).get();
+ client1.<Long>run(incrementAndGetCommand(3)).get();
fail();
} catch (Exception e) {
@@ -450,7 +452,7 @@ class ItJraftCounterServerTest extends JraftAbstractTest {
}
try {
- client1.<Long>run(new GetValueCommand()).get();
+ client1.<Long>run(getValueCommand()).get();
fail();
} catch (Exception e) {
@@ -573,19 +575,21 @@ class ItJraftCounterServerTest extends JraftAbstractTest {
raftClient.refreshMembers(true).get();
var peers = raftClient.peers();
- raftClient.run(new TestWriteCommand());
+ var testWriteCommandBuilder = new
TestRaftMessagesFactory().testWriteCommand();
+
+ raftClient.run(testWriteCommandBuilder.build());
assertTrue(TestUtils.waitForCondition(() -> counters.get(0).get() ==
1, 10_000));
raftClient.snapshot(peers.get(0)).get();
- raftClient.run(new TestWriteCommand());
+ raftClient.run(testWriteCommandBuilder.build());
assertTrue(TestUtils.waitForCondition(() -> counters.get(1).get() ==
2, 10_000));
raftClient.snapshot(peers.get(1)).get();
- raftClient.run(new TestWriteCommand());
+ raftClient.run(testWriteCommandBuilder.build());
for (AtomicInteger counter : counters.values()) {
assertTrue(TestUtils.waitForCondition(() -> counter.get() == 3,
10_000));
@@ -767,7 +771,7 @@ class ItJraftCounterServerTest extends JraftAbstractTest {
long val = 0;
for (int i = start; i <= stop; i++) {
- val = client.<Long>run(new IncrementAndGetCommand(i)).get();
+ val = client.<Long>run(incrementAndGetCommand(i)).get();
logger().info("Val={}, i={}", val, i);
}
diff --git
a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ItSafeTimeTest.java
b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ItSafeTimeTest.java
index c98c6b5e76..fad2f441e5 100644
---
a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ItSafeTimeTest.java
+++
b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ItSafeTimeTest.java
@@ -19,6 +19,7 @@ package org.apache.ignite.raft.server;
import static org.apache.ignite.internal.raft.server.RaftGroupOptions.defaults;
import static org.apache.ignite.raft.jraft.test.TestUtils.waitForCondition;
+import static
org.apache.ignite.raft.server.counter.IncrementAndGetCommand.incrementAndGetCommand;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.util.ArrayList;
@@ -33,7 +34,6 @@ import
org.apache.ignite.internal.raft.service.RaftGroupService;
import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.internal.util.PendingComparableValuesTracker;
import org.apache.ignite.raft.server.counter.CounterListener;
-import org.apache.ignite.raft.server.counter.IncrementAndGetCommand;
import org.junit.jupiter.api.Test;
/**
@@ -102,7 +102,7 @@ public class ItSafeTimeTest extends JraftAbstractTest {
clocks.get(leaderIndex).update(new HybridTimestamp(leaderPhysicalTime,
0));
- client1.run(new IncrementAndGetCommand(1)).get();
+ client1.run(incrementAndGetCommand(1)).get();
waitForCondition(() -> {
for (int i = 0; i < NODES; i++) {
diff --git
a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ItSimpleCounterServerTest.java
b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ItSimpleCounterServerTest.java
index f2b3967160..e8ac6b4a1d 100644
---
a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ItSimpleCounterServerTest.java
+++
b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ItSimpleCounterServerTest.java
@@ -19,6 +19,8 @@ package org.apache.ignite.raft.server;
import static org.apache.ignite.internal.raft.server.RaftGroupOptions.defaults;
import static org.apache.ignite.raft.jraft.test.TestUtils.waitForTopology;
+import static
org.apache.ignite.raft.server.counter.GetValueCommand.getValueCommand;
+import static
org.apache.ignite.raft.server.counter.IncrementAndGetCommand.incrementAndGetCommand;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
@@ -40,8 +42,6 @@ import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.ClusterService;
import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.raft.server.counter.CounterListener;
-import org.apache.ignite.raft.server.counter.GetValueCommand;
-import org.apache.ignite.raft.server.counter.IncrementAndGetCommand;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -162,14 +162,14 @@ class ItSimpleCounterServerTest extends
RaftServerAbstractTest {
assertNotNull(client1.leader());
assertNotNull(client2.leader());
- assertEquals(2, client1.<Long>run(new
IncrementAndGetCommand(2)).get());
- assertEquals(2, client1.<Long>run(new GetValueCommand()).get());
- assertEquals(3, client1.<Long>run(new
IncrementAndGetCommand(1)).get());
- assertEquals(3, client1.<Long>run(new GetValueCommand()).get());
+ assertEquals(2, client1.<Long>run(incrementAndGetCommand(2)).get());
+ assertEquals(2, client1.<Long>run(getValueCommand()).get());
+ assertEquals(3, client1.<Long>run(incrementAndGetCommand(1)).get());
+ assertEquals(3, client1.<Long>run(getValueCommand()).get());
- assertEquals(4, client2.<Long>run(new
IncrementAndGetCommand(4)).get());
- assertEquals(4, client2.<Long>run(new GetValueCommand()).get());
- assertEquals(7, client2.<Long>run(new
IncrementAndGetCommand(3)).get());
- assertEquals(7, client2.<Long>run(new GetValueCommand()).get());
+ assertEquals(4, client2.<Long>run(incrementAndGetCommand(4)).get());
+ assertEquals(4, client2.<Long>run(getValueCommand()).get());
+ assertEquals(7, client2.<Long>run(incrementAndGetCommand(3)).get());
+ assertEquals(7, client2.<Long>run(getValueCommand()).get());
}
}
diff --git
a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/snasphot/TestWriteCommand.java
b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/snasphot/TestWriteCommand.java
deleted file mode 100644
index b154b64cb3..0000000000
---
a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/snasphot/TestWriteCommand.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.raft.server.snasphot;
-
-import org.apache.ignite.internal.raft.WriteCommand;
-
-/**
- * Test write command.
- */
-public class TestWriteCommand implements WriteCommand {
-}
diff --git
a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/snasphot/UpdateCountRaftListener.java
b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/snasphot/UpdateCountRaftListener.java
index ceec8b5fa0..d81b6d9575 100644
---
a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/snasphot/UpdateCountRaftListener.java
+++
b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/snasphot/UpdateCountRaftListener.java
@@ -29,6 +29,7 @@ import org.apache.ignite.internal.raft.ReadCommand;
import org.apache.ignite.internal.raft.WriteCommand;
import org.apache.ignite.internal.raft.service.CommandClosure;
import org.apache.ignite.internal.raft.service.RaftGroupListener;
+import org.apache.ignite.raft.TestWriteCommand;
/**
* The RAFT state machine counts applied write commands and stores the result
into {@link java.util.concurrent.atomic.AtomicLong} that is
diff --git
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java
index 6df4e505a4..6e49bae9bc 100644
---
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java
+++
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java
@@ -47,6 +47,7 @@ import
org.apache.ignite.internal.raft.service.RaftGroupListener;
import org.apache.ignite.internal.raft.storage.LogStorageFactory;
import org.apache.ignite.internal.raft.storage.impl.DefaultLogStorageFactory;
import org.apache.ignite.internal.raft.storage.impl.IgniteJraftServiceFactory;
+import org.apache.ignite.internal.raft.util.ThreadLocalOptimizedMarshaller;
import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.internal.thread.NamedThreadFactory;
import org.apache.ignite.lang.IgniteInternalException;
@@ -76,7 +77,7 @@ import
org.apache.ignite.raft.jraft.storage.snapshot.SnapshotReader;
import org.apache.ignite.raft.jraft.storage.snapshot.SnapshotWriter;
import org.apache.ignite.raft.jraft.util.ExecutorServiceHelper;
import org.apache.ignite.raft.jraft.util.ExponentialBackoffTimeoutStrategy;
-import org.apache.ignite.raft.jraft.util.JDKMarshaller;
+import org.apache.ignite.raft.jraft.util.Marshaller;
import org.apache.ignite.raft.jraft.util.Utils;
import org.jetbrains.annotations.Nullable;
import org.jetbrains.annotations.TestOnly;
@@ -113,6 +114,9 @@ public class JraftServerImpl implements RaftServer {
/** Request executor. */
private ExecutorService requestExecutor;
+ /** Marshaller for RAFT commands. */
+ private final Marshaller commandsMarshaller;
+
/** The number of parallel raft groups starts. */
private static final int SIMULTANEOUS_GROUP_START_PARALLELISM =
Math.min(Utils.cpus() * 3, 25);
@@ -168,6 +172,8 @@ public class JraftServerImpl implements RaftServer {
}
startGroupInProgressMonitors = Collections.unmodifiableList(monitors);
+
+ commandsMarshaller = new
ThreadLocalOptimizedMarshaller(service.localConfiguration().getSerializationRegistry());
}
/** {@inheritDoc} */
@@ -387,7 +393,7 @@ public class JraftServerImpl implements RaftServer {
nodeOptions.setSnapshotUri(serverDataPath.resolve("snapshot").toString());
- nodeOptions.setFsm(new DelegatingStateMachine(lsnr));
+ nodeOptions.setFsm(new DelegatingStateMachine(lsnr,
commandsMarshaller));
nodeOptions.setRaftGrpEvtsLsnr(new
RaftGroupEventsListenerAdapter(evLsnr));
@@ -519,13 +525,17 @@ public class JraftServerImpl implements RaftServer {
public static class DelegatingStateMachine extends StateMachineAdapter {
private final RaftGroupListener listener;
+ private final Marshaller marshaller;
+
/**
* Constructor.
*
* @param listener The listener.
+ * @param marshaller Marshaller.
*/
- DelegatingStateMachine(RaftGroupListener listener) {
+ DelegatingStateMachine(RaftGroupListener listener, Marshaller
marshaller) {
this.listener = listener;
+ this.marshaller = marshaller;
}
public RaftGroupListener getListener() {
@@ -547,7 +557,7 @@ public class JraftServerImpl implements RaftServer {
@Nullable CommandClosure<WriteCommand> done =
(CommandClosure<WriteCommand>) iter.done();
ByteBuffer data = iter.getData();
- WriteCommand command = done == null ?
JDKMarshaller.DEFAULT.unmarshall(data.array()) : done.command();
+ WriteCommand command = done == null ?
marshaller.unmarshall(data.array()) : done.command();
long commandIndex = iter.getIndex();
long commandTerm = iter.getTerm();
diff --git
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/ActionRequestProcessor.java
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/ActionRequestProcessor.java
index 0d0a29ec5c..b966f6e275 100644
---
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/ActionRequestProcessor.java
+++
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/ActionRequestProcessor.java
@@ -42,7 +42,7 @@ import org.apache.ignite.raft.jraft.rpc.RpcContext;
import org.apache.ignite.raft.jraft.rpc.RpcProcessor;
import org.apache.ignite.raft.jraft.rpc.RpcRequests;
import org.apache.ignite.raft.jraft.util.BytesUtil;
-import org.apache.ignite.raft.jraft.util.JDKMarshaller;
+import org.apache.ignite.raft.jraft.util.Marshaller;
/**
* Process action request.
@@ -54,9 +54,12 @@ public class ActionRequestProcessor implements
RpcProcessor<ActionRequest> {
private final RaftMessagesFactory factory;
- public ActionRequestProcessor(Executor executor, RaftMessagesFactory
factory) {
+ private final Marshaller commandsMarshaller;
+
+ public ActionRequestProcessor(Executor executor, RaftMessagesFactory
factory, Marshaller commandsMarshaller) {
this.executor = executor;
this.factory = factory;
+ this.commandsMarshaller = commandsMarshaller;
}
/** {@inheritDoc} */
@@ -83,8 +86,7 @@ public class ActionRequestProcessor implements
RpcProcessor<ActionRequest> {
* @param rpcCtx The context.
*/
private void applyWrite(Node node, ActionRequest request, RpcContext
rpcCtx) {
- // TODO asch get rid of JDK marshaller IGNITE-14832
- node.apply(new
Task(ByteBuffer.wrap(JDKMarshaller.DEFAULT.marshall(request.command())),
+ node.apply(new
Task(ByteBuffer.wrap(commandsMarshaller.marshall(request.command())),
new CommandClosureImpl<>(request.command()) {
@Override
public void result(Serializable res) {
diff --git
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/IgniteRpcServer.java
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/IgniteRpcServer.java
index 3326f533c5..64bf8eb84c 100644
---
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/IgniteRpcServer.java
+++
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/IgniteRpcServer.java
@@ -24,6 +24,7 @@ import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.raft.util.ThreadLocalOptimizedMarshaller;
import org.apache.ignite.internal.tostring.S;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.ClusterService;
@@ -114,7 +115,8 @@ public class IgniteRpcServer implements RpcServer<Void> {
registerProcessor(new RemoveLearnersRequestProcessor(rpcExecutor,
raftMessagesFactory));
registerProcessor(new ResetLearnersRequestProcessor(rpcExecutor,
raftMessagesFactory));
// common client integration
- registerProcessor(new ActionRequestProcessor(rpcExecutor,
raftMessagesFactory));
+ var commandsMarshaller = new
ThreadLocalOptimizedMarshaller(service.localConfiguration().getSerializationRegistry());
+ registerProcessor(new ActionRequestProcessor(rpcExecutor,
raftMessagesFactory, commandsMarshaller));
var messageHandler = new RpcMessageHandler();
diff --git
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/RaftGroupServiceTest.java
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/RaftGroupServiceTest.java
index d89c7072cb..4862af2381 100644
---
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/RaftGroupServiceTest.java
+++
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/RaftGroupServiceTest.java
@@ -23,6 +23,7 @@ import static java.util.stream.Collectors.toUnmodifiableList;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrow;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.apache.ignite.raft.TestWriteCommand.testWriteCommand;
import static org.apache.ignite.raft.jraft.test.TestUtils.peersToIds;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
@@ -58,6 +59,7 @@ import org.apache.ignite.network.ClusterService;
import org.apache.ignite.network.MessagingService;
import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.network.TopologyService;
+import org.apache.ignite.raft.TestWriteCommand;
import org.apache.ignite.raft.jraft.RaftMessagesFactory;
import org.apache.ignite.raft.jraft.Status;
import org.apache.ignite.raft.jraft.entity.PeerId;
@@ -204,7 +206,7 @@ public class RaftGroupServiceTest extends
BaseIgniteAbstractTest {
assertThat(service.refreshLeader(), willCompleteSuccessfully());
- assertThat(service.run(new TestCommand()),
willBe(instanceOf(TestResponse.class)));
+ assertThat(service.run(testWriteCommand()),
willBe(instanceOf(TestResponse.class)));
}
@Test
@@ -216,7 +218,7 @@ public class RaftGroupServiceTest extends
BaseIgniteAbstractTest {
assertNull(service.leader());
- assertThat(service.run(new TestCommand()),
willBe(instanceOf(TestResponse.class)));
+ assertThat(service.run(testWriteCommand()),
willBe(instanceOf(TestResponse.class)));
assertEquals(leader, service.leader());
}
@@ -228,7 +230,7 @@ public class RaftGroupServiceTest extends
BaseIgniteAbstractTest {
RaftGroupService service = startRaftGroupService(NODES, false);
- assertThat(service.run(new TestCommand()),
willThrow(TimeoutException.class, 500, TimeUnit.MILLISECONDS));
+ assertThat(service.run(testWriteCommand()),
willThrow(TimeoutException.class, 500, TimeUnit.MILLISECONDS));
}
@Test
@@ -246,7 +248,7 @@ public class RaftGroupServiceTest extends
BaseIgniteAbstractTest {
assertEquals(leader, service.leader());
- assertThat(service.run(new TestCommand()),
willThrow(TimeoutException.class));
+ assertThat(service.run(testWriteCommand()),
willThrow(TimeoutException.class));
}
@Test
@@ -266,7 +268,7 @@ public class RaftGroupServiceTest extends
BaseIgniteAbstractTest {
executor.schedule((Runnable) () -> this.leader = NODES.get(0), 500,
TimeUnit.MILLISECONDS);
- assertThat(service.run(new TestCommand()),
willBe(instanceOf(TestResponse.class)));
+ assertThat(service.run(testWriteCommand()),
willBe(instanceOf(TestResponse.class)));
assertEquals(NODES.get(0), service.leader());
}
@@ -295,7 +297,7 @@ public class RaftGroupServiceTest extends
BaseIgniteAbstractTest {
500, TimeUnit.MILLISECONDS
);
- assertThat(service.run(new TestCommand()),
willBe(instanceOf(TestResponse.class)));
+ assertThat(service.run(testWriteCommand()),
willBe(instanceOf(TestResponse.class)));
assertEquals(NODES.get(1), service.leader());
}
@@ -319,7 +321,7 @@ public class RaftGroupServiceTest extends
BaseIgniteAbstractTest {
assertNotEquals(leader, newLeader);
// Runs the command on an old leader. It should respond with leader
changed error, when transparently retry.
- assertThat(service.run(new TestCommand()),
willBe(instanceOf(TestResponse.class)));
+ assertThat(service.run(testWriteCommand()),
willBe(instanceOf(TestResponse.class)));
assertEquals(newLeader, service.leader());
}
@@ -607,7 +609,7 @@ public class RaftGroupServiceTest extends
BaseIgniteAbstractTest {
argThat(new ArgumentMatcher<ActionRequest>() {
@Override
public boolean matches(ActionRequest arg) {
- return arg.command() instanceof TestCommand;
+ return arg.command() instanceof TestWriteCommand;
}
}),
anyLong()
@@ -701,9 +703,6 @@ public class RaftGroupServiceTest extends
BaseIgniteAbstractTest {
}
- private static class TestCommand implements WriteCommand {
- }
-
private static class TestResponse {
}
diff --git
a/modules/raft/src/testFixtures/java/org/apache/ignite/raft/TestWriteCommand.java
b/modules/raft/src/testFixtures/java/org/apache/ignite/raft/TestWriteCommand.java
index 92a364da1a..502f86bbf5 100644
---
a/modules/raft/src/testFixtures/java/org/apache/ignite/raft/TestWriteCommand.java
+++
b/modules/raft/src/testFixtures/java/org/apache/ignite/raft/TestWriteCommand.java
@@ -18,13 +18,17 @@
package org.apache.ignite.raft;
import org.apache.ignite.internal.raft.WriteCommand;
-import org.apache.ignite.network.NetworkMessage;
import org.apache.ignite.network.annotations.Transferable;
+import org.apache.ignite.raft.messages.TestRaftMessagesFactory;
/**
* Test WriteCommand.
*/
@Transferable(10)
-public interface TestWriteCommand extends NetworkMessage, WriteCommand {
+public interface TestWriteCommand extends WriteCommand {
String value();
+
+ static TestWriteCommand testWriteCommand() {
+ return new TestRaftMessagesFactory().testWriteCommand().build();
+ }
}
diff --git
a/modules/raft/src/testFixtures/java/org/apache/ignite/raft/messages/TestMessageGroup.java
b/modules/raft/src/testFixtures/java/org/apache/ignite/raft/messages/TestMessageGroup.java
index 6dbca8cdef..c8530ab39f 100644
---
a/modules/raft/src/testFixtures/java/org/apache/ignite/raft/messages/TestMessageGroup.java
+++
b/modules/raft/src/testFixtures/java/org/apache/ignite/raft/messages/TestMessageGroup.java
@@ -18,10 +18,17 @@
package org.apache.ignite.raft.messages;
import org.apache.ignite.network.annotations.MessageGroup;
+import org.apache.ignite.raft.server.counter.GetValueCommand;
+import org.apache.ignite.raft.server.counter.IncrementAndGetCommand;
/**
* Message group for tests.
*/
@MessageGroup(groupType = 4, groupName = "TestRaftMessages")
public interface TestMessageGroup {
+ /** Message type for {@link GetValueCommand}. */
+ short GET_VALUE_COMMAND = 1000;
+
+ /** Message type for {@link IncrementAndGetCommand}. */
+ short INCREMENT_AND_GET_COMMAND = 1001;
}
diff --git
a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/counter/GetValueCommand.java
b/modules/raft/src/testFixtures/java/org/apache/ignite/raft/server/counter/GetValueCommand.java
similarity index 64%
rename from
modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/counter/GetValueCommand.java
rename to
modules/raft/src/testFixtures/java/org/apache/ignite/raft/server/counter/GetValueCommand.java
index 028b5bf46d..5d319f096d 100644
---
a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/counter/GetValueCommand.java
+++
b/modules/raft/src/testFixtures/java/org/apache/ignite/raft/server/counter/GetValueCommand.java
@@ -18,9 +18,17 @@
package org.apache.ignite.raft.server.counter;
import org.apache.ignite.internal.raft.ReadCommand;
+import org.apache.ignite.network.annotations.Transferable;
+import org.apache.ignite.raft.messages.TestMessageGroup;
+import org.apache.ignite.raft.messages.TestRaftMessagesFactory;
/**
* Get a value command.
*/
-public class GetValueCommand implements ReadCommand {
+//TODO IGNITE-18357 Move to integration test directory when Maven build is not supported anymore.
+@Transferable(TestMessageGroup.GET_VALUE_COMMAND)
+public interface GetValueCommand extends ReadCommand {
+ static GetValueCommand getValueCommand() {
+ return new TestRaftMessagesFactory().getValueCommand().build();
+ }
}
diff --git
a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/counter/IncrementAndGetCommand.java
b/modules/raft/src/testFixtures/java/org/apache/ignite/raft/server/counter/IncrementAndGetCommand.java
similarity index 63%
rename from
modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/counter/IncrementAndGetCommand.java
rename to
modules/raft/src/testFixtures/java/org/apache/ignite/raft/server/counter/IncrementAndGetCommand.java
index ebc6bb9434..cdc6c46fba 100644
---
a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/counter/IncrementAndGetCommand.java
+++
b/modules/raft/src/testFixtures/java/org/apache/ignite/raft/server/counter/IncrementAndGetCommand.java
@@ -18,29 +18,22 @@
package org.apache.ignite.raft.server.counter;
import org.apache.ignite.internal.raft.WriteCommand;
+import org.apache.ignite.network.annotations.Transferable;
+import org.apache.ignite.raft.messages.TestMessageGroup;
+import org.apache.ignite.raft.messages.TestRaftMessagesFactory;
/**
* Increment and get command.
*/
-public class IncrementAndGetCommand implements WriteCommand {
- /**
- * The delta.
- */
- private final long delta;
-
- /**
- * Constructor.
- *
- * @param delta The delta.
- */
- public IncrementAndGetCommand(long delta) {
- this.delta = delta;
- }
-
+//TODO IGNITE-18357 Move to integration test directory when Maven build is not supported anymore.
+@Transferable(TestMessageGroup.INCREMENT_AND_GET_COMMAND)
+public interface IncrementAndGetCommand extends WriteCommand {
/**
* Returns the delta.
*/
- public long delta() {
- return delta;
+ long delta();
+
+ static IncrementAndGetCommand incrementAndGetCommand(long delta) {
+ return new
TestRaftMessagesFactory().incrementAndGetCommand().delta(delta).build();
}
}
diff --git
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/command/SafeTimeSyncCommand.java
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/command/SafeTimeSyncCommand.java
index dc9df4e5a4..86549c26e7 100644
---
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/command/SafeTimeSyncCommand.java
+++
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/command/SafeTimeSyncCommand.java
@@ -18,9 +18,12 @@
package org.apache.ignite.internal.replicator.command;
import org.apache.ignite.internal.raft.WriteCommand;
+import org.apache.ignite.internal.replicator.message.ReplicaMessageGroup;
+import org.apache.ignite.network.annotations.Transferable;
/**
* Write command to synchronize safe time periodically.
*/
-public class SafeTimeSyncCommand implements WriteCommand {
+@Transferable(ReplicaMessageGroup.SAFE_TIME_SYNC_COMMAND)
+public interface SafeTimeSyncCommand extends WriteCommand {
}
diff --git
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/message/ReplicaMessageGroup.java
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/message/ReplicaMessageGroup.java
index 62e4aa1ddb..d01089d5f1 100644
---
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/message/ReplicaMessageGroup.java
+++
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/message/ReplicaMessageGroup.java
@@ -17,34 +17,38 @@
package org.apache.ignite.internal.replicator.message;
+import org.apache.ignite.internal.replicator.command.SafeTimeSyncCommand;
import org.apache.ignite.network.annotations.MessageGroup;
/**
* Message group for the replication process.
*/
@MessageGroup(groupType = 8, groupName = "ReplicaMessages")
-public class ReplicaMessageGroup {
+public interface ReplicaMessageGroup {
/** Message type for {@link ErrorReplicaResponse}. */
- public static final short ERROR_REPLICA_RESPONSE = 1;
+ short ERROR_REPLICA_RESPONSE = 1;
/** Message type for {@link ReplicaResponse}. */
- public static final short REPLICA_RESPONSE = 2;
+ short REPLICA_RESPONSE = 2;
/** Message type for {@link TimestampAware}. */
- public static final short TIMESTAMP_AWARE = 3;
+ short TIMESTAMP_AWARE = 3;
/** Message type for {@link ErrorTimestampAwareReplicaResponse}. */
- public static final short ERROR_TIMESTAMP_AWARE_REPLICA_RESPONSE = 4;
+ short ERROR_TIMESTAMP_AWARE_REPLICA_RESPONSE = 4;
/** Message type for {@link TimestampAwareReplicaResponse}. */
- public static final short TIMESTAMP_AWARE_REPLICA_RESPONSE = 5;
+ short TIMESTAMP_AWARE_REPLICA_RESPONSE = 5;
/** Message type for {@link ReplicaSafeTimeSyncRequest}. */
- public static final short SAFE_TIME_SYNC_REQUEST = 6;
+ short SAFE_TIME_SYNC_REQUEST = 6;
/** Message type for {@link AwaitReplicaRequest}. */
- public static final short AWAIT_REPLICA_REQUEST = 7;
+ short AWAIT_REPLICA_REQUEST = 7;
/** Message type for {@link AwaitReplicaResponse}. */
- public static final short AWAIT_REPLICA_RESPONSE = 8;
+ short AWAIT_REPLICA_RESPONSE = 8;
+
+ /** Message type for {@link SafeTimeSyncCommand}. */
+ short SAFE_TIME_SYNC_COMMAND = 40;
}
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
index 4da5b96154..629018f065 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
@@ -35,6 +35,7 @@ import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
import java.math.BigDecimal;
import java.math.BigInteger;
+import java.nio.ByteBuffer;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
@@ -60,6 +61,7 @@ import
org.apache.ignite.internal.raft.service.RaftGroupService;
import org.apache.ignite.internal.replicator.ReplicaService;
import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.internal.replicator.message.ReplicaRequest;
+import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.BinaryRowEx;
import org.apache.ignite.internal.schema.ByteBufferRow;
import org.apache.ignite.internal.schema.Column;
@@ -92,7 +94,6 @@ import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.ClusterService;
import org.apache.ignite.network.NetworkAddress;
-import org.apache.ignite.raft.jraft.util.ByteString;
import org.apache.ignite.table.Tuple;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.params.ParameterizedTest;
@@ -190,10 +191,9 @@ public class ItColocationTest {
RaftGroupService r = groupRafts.get(request.groupId());
if (request instanceof ReadWriteMultiRowReplicaRequest) {
- Map<UUID, ByteString> rows =
((ReadWriteMultiRowReplicaRequest) request).binaryRows()
+ Map<UUID, ByteBuffer> rows =
((ReadWriteMultiRowReplicaRequest) request).binaryRows()
.stream()
- .collect(toMap(
- row -> Timestamp.nextVersion().toUuid(), row
-> new ByteString(row.byteBuffer())));
+ .collect(toMap(row ->
Timestamp.nextVersion().toUuid(), BinaryRow::byteBuffer));
return r.run(MSG_FACTORY.updateAllCommand()
.tablePartitionId(MSG_FACTORY.tablePartitionIdMessage()
@@ -215,7 +215,7 @@ public class ItColocationTest {
.build()
)
.rowUuid(Timestamp.nextVersion().toUuid())
- .rowBuffer(new
ByteString(((ReadWriteSingleRowReplicaRequest)
request).binaryRow().byteBuffer()))
+ .rowBuffer(((ReadWriteSingleRowReplicaRequest)
request).binaryRow().byteBuffer())
.txId(UUID.randomUUID())
.build());
}
@@ -388,8 +388,8 @@ public class ItColocationTest {
UpdateAllCommand cmd = (UpdateAllCommand)
CollectionUtils.first(set);
assertEquals(partsMap.get(p), cmd.rowsToUpdate().size(), () ->
"part=" + p + ", set=" + set);
- cmd.rowsToUpdate().values().forEach(byteStr -> {
- Row r = new Row(schema, new
ByteBufferRow(byteStr.toByteArray()));
+ cmd.rowsToUpdate().values().forEach(byteBuffer -> {
+ Row r = new Row(schema, new ByteBufferRow(byteBuffer));
assertEquals(INT_TABLE.partition(r), p);
});
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableMessageGroup.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableMessageGroup.java
index 2a4f6f96c8..7955b34108 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableMessageGroup.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableMessageGroup.java
@@ -17,6 +17,8 @@
package org.apache.ignite.internal.table.distributed;
+import static
org.apache.ignite.internal.table.distributed.TableMessageGroup.GROUP_TYPE;
+
import org.apache.ignite.internal.table.distributed.command.FinishTxCommand;
import
org.apache.ignite.internal.table.distributed.command.HybridTimestampMessage;
import
org.apache.ignite.internal.table.distributed.command.TablePartitionIdMessage;
@@ -45,8 +47,11 @@ import org.apache.ignite.network.annotations.MessageGroup;
/**
* Message group for the table module.
*/
-@MessageGroup(groupType = 9, groupName = "TableMessages")
+@MessageGroup(groupType = GROUP_TYPE, groupName = "TableMessages")
public interface TableMessageGroup {
+ /** Table message group type. */
+ short GROUP_TYPE = 9;
+
/**
* Message type for {@link ReadWriteSingleRowReplicaRequest}.
*/
@@ -135,7 +140,7 @@ public interface TableMessageGroup {
/**
* Message types for Table module RAFT commands.
*/
- public interface Commands {
+ interface Commands {
/** Message type for {@link FinishTxCommand}. */
short FINISH_TX = 40;
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/PartitionCommand.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/PartitionCommand.java
index 40ea915476..33c7547567 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/PartitionCommand.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/PartitionCommand.java
@@ -19,12 +19,11 @@ package
org.apache.ignite.internal.table.distributed.command;
import java.util.UUID;
import org.apache.ignite.internal.raft.WriteCommand;
-import org.apache.ignite.network.NetworkMessage;
/**
* Partition transactional command.
*/
-public interface PartitionCommand extends WriteCommand, NetworkMessage {
+public interface PartitionCommand extends WriteCommand {
/**
* Returns a transaction id.
*/
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/UpdateAllCommand.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/UpdateAllCommand.java
index 46e112f05c..9654deff74 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/UpdateAllCommand.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/UpdateAllCommand.java
@@ -17,12 +17,11 @@
package org.apache.ignite.internal.table.distributed.command;
+import java.nio.ByteBuffer;
import java.util.Map;
import java.util.UUID;
import org.apache.ignite.internal.table.distributed.TableMessageGroup;
-import org.apache.ignite.network.annotations.Marshallable;
import org.apache.ignite.network.annotations.Transferable;
-import org.apache.ignite.raft.jraft.util.ByteString;
/**
* State machine command for updating a batch of entries.
@@ -31,6 +30,5 @@ import org.apache.ignite.raft.jraft.util.ByteString;
public interface UpdateAllCommand extends PartitionCommand {
TablePartitionIdMessage tablePartitionId();
- @Marshallable
- Map<UUID, ByteString> rowsToUpdate();
+ Map<UUID, ByteBuffer> rowsToUpdate();
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/UpdateCommand.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/UpdateCommand.java
index 468a5c8f4d..ab9746eaac 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/UpdateCommand.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/UpdateCommand.java
@@ -17,11 +17,10 @@
package org.apache.ignite.internal.table.distributed.command;
+import java.nio.ByteBuffer;
import java.util.UUID;
import org.apache.ignite.internal.table.distributed.TableMessageGroup;
-import org.apache.ignite.network.annotations.Marshallable;
import org.apache.ignite.network.annotations.Transferable;
-import org.apache.ignite.raft.jraft.util.ByteString;
/**
* State machine command to update a row specified by a row id.
@@ -32,6 +31,5 @@ public interface UpdateCommand extends PartitionCommand {
UUID rowUuid();
- @Marshallable
- ByteString rowBuffer();
+ ByteBuffer rowBuffer();
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
index 1439cd9424..47d872a56e 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
@@ -23,6 +23,7 @@ import static
org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty;
import static
org.apache.ignite.lang.ErrorGroups.Transactions.TX_UNEXPECTED_STATE_ERR;
import static org.apache.ignite.lang.IgniteStringFormatter.format;
+import java.nio.ByteBuffer;
import java.nio.file.Path;
import java.util.Collections;
import java.util.HashMap;
@@ -60,7 +61,6 @@ import org.apache.ignite.internal.tx.TxMeta;
import org.apache.ignite.internal.tx.TxState;
import org.apache.ignite.internal.tx.storage.state.TxStateStorage;
import org.apache.ignite.lang.IgniteInternalException;
-import org.apache.ignite.raft.jraft.util.ByteString;
import org.jetbrains.annotations.Nullable;
import org.jetbrains.annotations.TestOnly;
@@ -184,7 +184,7 @@ public class PartitionListener implements RaftGroupListener
{
}
storage.runConsistently(() -> {
- BinaryRow row = cmd.rowBuffer() != null ? new
ByteBufferRow(cmd.rowBuffer().toByteArray()) : null;
+ BinaryRow row = cmd.rowBuffer() != null ? new
ByteBufferRow(cmd.rowBuffer()) : null;
UUID rowUuid = cmd.rowUuid();
RowId rowId = new RowId(partitionId, rowUuid);
UUID txId = cmd.txId();
@@ -218,14 +218,14 @@ public class PartitionListener implements
RaftGroupListener {
storage.runConsistently(() -> {
UUID txId = cmd.txId();
- Map<UUID, ByteString> rowsToUpdate = cmd.rowsToUpdate();
+ Map<UUID, ByteBuffer> rowsToUpdate = cmd.rowsToUpdate();
UUID commitTblId = cmd.tablePartitionId().tableId();
int commitPartId = cmd.tablePartitionId().partitionId();
if (!nullOrEmpty(rowsToUpdate)) {
- for (Map.Entry<UUID, ByteString> entry :
rowsToUpdate.entrySet()) {
+ for (Map.Entry<UUID, ByteBuffer> entry :
rowsToUpdate.entrySet()) {
RowId rowId = new RowId(partitionId, entry.getKey());
- BinaryRow row = entry.getValue() != null ? new
ByteBufferRow(entry.getValue().toByteArray()) : null;
+ BinaryRow row = entry.getValue() != null ? new
ByteBufferRow(entry.getValue()) : null;
storage.addWrite(rowId, row, txId, commitTblId,
commitPartId);
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index 480bc8d36a..00a836e667 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -52,12 +52,12 @@ import org.apache.ignite.internal.raft.Command;
import org.apache.ignite.internal.raft.Peer;
import org.apache.ignite.internal.raft.service.RaftGroupService;
import org.apache.ignite.internal.replicator.ReplicationGroupId;
-import org.apache.ignite.internal.replicator.command.SafeTimeSyncCommand;
import
org.apache.ignite.internal.replicator.exception.PrimaryReplicaMissException;
import org.apache.ignite.internal.replicator.exception.ReplicationException;
import
org.apache.ignite.internal.replicator.exception.ReplicationTimeoutException;
import
org.apache.ignite.internal.replicator.exception.UnsupportedReplicaRequestException;
import org.apache.ignite.internal.replicator.listener.ReplicaListener;
+import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
import org.apache.ignite.internal.replicator.message.ReplicaRequest;
import
org.apache.ignite.internal.replicator.message.ReplicaSafeTimeSyncRequest;
import org.apache.ignite.internal.schema.BinaryRow;
@@ -111,7 +111,6 @@ import org.apache.ignite.lang.ErrorGroups.Replicator;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteInternalException;
import org.apache.ignite.lang.IgniteUuid;
-import org.apache.ignite.raft.jraft.util.ByteString;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
@@ -120,6 +119,9 @@ public class PartitionReplicaListener implements
ReplicaListener {
/** Factory to create RAFT command messages. */
private final TableMessagesFactory msgFactory = new TableMessagesFactory();
+ /** Factory for creating replica command messages. */
+ private final ReplicaMessagesFactory replicaMessagesFactory = new
ReplicaMessagesFactory();
+
/** Tx messages factory. */
private static final TxMessagesFactory FACTORY = new TxMessagesFactory();
@@ -505,7 +507,7 @@ public class PartitionReplicaListener implements
ReplicaListener {
* @return Future.
*/
private CompletionStage<Void>
processReplicaSafeTimeSyncRequest(ReplicaSafeTimeSyncRequest request) {
- return raftClient.run(new SafeTimeSyncCommand());
+ return
raftClient.run(replicaMessagesFactory.safeTimeSyncCommand().build());
}
/**
@@ -1159,7 +1161,7 @@ public class PartitionReplicaListener implements
ReplicaListener {
}
return allOf(rowIdLockFuts).thenCompose(ignore -> {
- Map<UUID, ByteString> rowIdsToDelete = new HashMap<>();
+ Map<UUID, ByteBuffer> rowIdsToDelete = new HashMap<>();
Collection<BinaryRow> result = new ArrayList<>();
int futNum = 0;
@@ -1198,7 +1200,7 @@ public class PartitionReplicaListener implements
ReplicaListener {
}
return allOf(deleteExactLockFuts).thenCompose(ignore -> {
- Map<UUID, ByteString> rowIdsToDelete = new HashMap<>();
+ Map<UUID, ByteBuffer> rowIdsToDelete = new HashMap<>();
Collection<BinaryRow> result = new ArrayList<>();
int futNum = 0;
@@ -1263,10 +1265,10 @@ public class PartitionReplicaListener implements
ReplicaListener {
insertLockFuts[idx++] =
takeLocksForInsert(entry.getValue(), entry.getKey(), txId);
}
- Map<UUID, ByteString> convertedMap =
rowsToInsert.entrySet().stream().collect(
+ Map<UUID, ByteBuffer> convertedMap =
rowsToInsert.entrySet().stream().collect(
Collectors.toMap(
e -> e.getKey().uuid(),
- e -> new
ByteString(e.getValue().byteBuffer())));
+ e -> e.getValue().byteBuffer()));
return allOf(insertLockFuts)
.thenCompose(ignored ->
applyCmdWithExceptionHandling(
@@ -1300,14 +1302,14 @@ public class PartitionReplicaListener implements
ReplicaListener {
}
return allOf(rowIdFuts).thenCompose(ignore -> {
- Map<UUID, ByteString> rowsToUpdate = new HashMap<>();
+ Map<UUID, ByteBuffer> rowsToUpdate = new HashMap<>();
int futNum = 0;
for (BinaryRow row : request.binaryRows()) {
RowId lockedRow = rowIdFuts[futNum++].join().get1();
- rowsToUpdate.put(lockedRow.uuid(), new
ByteString(row.byteBuffer()));
+ rowsToUpdate.put(lockedRow.uuid(), row.byteBuffer());
}
if (rowsToUpdate.isEmpty()) {
@@ -1911,7 +1913,7 @@ public class PartitionReplicaListener implements
ReplicaListener {
.txId(txId);
if (rowBuf != null) {
- bldr.rowBuffer(new ByteString(rowBuf));
+ bldr.rowBuffer(rowBuf);
}
return bldr.build();
@@ -1921,11 +1923,11 @@ public class PartitionReplicaListener implements
ReplicaListener {
* Method to construct {@link UpdateAllCommand} object.
*
* @param tablePartId {@link TablePartitionId} object to construct {@link UpdateCommand} object with.
- * @param rowsToUpdate All {@link BinaryRow}s represented as {@link ByteString}s to be updated.
+ * @param rowsToUpdate All {@link BinaryRow}s represented as {@link ByteBuffer}s to be updated.
* @param txId Transaction ID.
* @return Constructed {@link UpdateAllCommand} object.
*/
- private UpdateAllCommand updateAllCommand(TablePartitionId tablePartId,
Map<UUID, ByteString> rowsToUpdate, UUID txId) {
+ private UpdateAllCommand updateAllCommand(TablePartitionId tablePartId,
Map<UUID, ByteBuffer> rowsToUpdate, UUID txId) {
return msgFactory.updateAllCommand()
.tablePartitionId(tablePartitionId(tablePartId))
.rowsToUpdate(rowsToUpdate)
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/command/PartitionRaftCommandsSerializationTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/command/PartitionRaftCommandsSerializationTest.java
index eb6f5cb9b2..7d885903f9 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/command/PartitionRaftCommandsSerializationTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/command/PartitionRaftCommandsSerializationTest.java
@@ -17,16 +17,12 @@
package org.apache.ignite.internal.table.distributed.command;
-import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Locale;
@@ -36,17 +32,18 @@ import java.util.UUID;
import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.raft.Command;
import org.apache.ignite.internal.schema.Column;
import org.apache.ignite.internal.schema.NativeTypes;
import org.apache.ignite.internal.schema.SchemaDescriptor;
import org.apache.ignite.internal.schema.marshaller.KvMarshaller;
import
org.apache.ignite.internal.schema.marshaller.reflection.ReflectionMarshallerFactory;
+import org.apache.ignite.internal.table.distributed.TableMessageGroup;
import org.apache.ignite.internal.table.distributed.TableMessagesFactory;
import org.apache.ignite.internal.testframework.IgniteAbstractTest;
import org.apache.ignite.internal.tostring.IgniteToStringInclude;
import org.apache.ignite.internal.tostring.S;
import org.apache.ignite.internal.tx.Timestamp;
-import org.apache.ignite.raft.jraft.util.ByteString;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
@@ -84,7 +81,7 @@ public class PartitionRaftCommandsSerializationTest extends
IgniteAbstractTest {
.build()
)
.rowUuid(Timestamp.nextVersion().toUuid())
- .rowBuffer(byteStrFromBinaryRow(1))
+ .rowBuffer(byteBufferFromBinaryRow(1))
.txId(UUID.randomUUID())
.build();
@@ -92,7 +89,7 @@ public class PartitionRaftCommandsSerializationTest extends
IgniteAbstractTest {
assertEquals(cmd.txId(), readCmd.txId());
assertEquals(cmd.rowUuid(), readCmd.rowUuid());
- assertArrayEquals(cmd.rowBuffer().toByteArray(),
readCmd.rowBuffer().toByteArray());
+ assertEquals(cmd.rowBuffer(), readCmd.rowBuffer());
}
@Test
@@ -116,10 +113,10 @@ public class PartitionRaftCommandsSerializationTest
extends IgniteAbstractTest {
@Test
public void testUpdateAllCommand() throws Exception {
- HashMap<UUID, ByteString> rowsToUpdate = new HashMap();
+ Map<UUID, ByteBuffer> rowsToUpdate = new HashMap<>();
for (int i = 0; i < 10; i++) {
- rowsToUpdate.put(Timestamp.nextVersion().toUuid(),
byteStrFromBinaryRow(i));
+ rowsToUpdate.put(Timestamp.nextVersion().toUuid(),
byteBufferFromBinaryRow(i));
}
var cmd = msgFactory.updateAllCommand()
@@ -136,19 +133,19 @@ public class PartitionRaftCommandsSerializationTest
extends IgniteAbstractTest {
assertEquals(cmd.txId(), readCmd.txId());
- for (Map.Entry<UUID, ByteString> entry :
cmd.rowsToUpdate().entrySet()) {
+ for (Map.Entry<UUID, ByteBuffer> entry :
cmd.rowsToUpdate().entrySet()) {
assertTrue(readCmd.rowsToUpdate().containsKey(entry.getKey()));
var readVal = readCmd.rowsToUpdate().get(entry.getKey());
var val = entry.getValue();
- assertArrayEquals(val.toByteArray(), readVal.toByteArray());
+ assertEquals(val, readVal);
}
}
@Test
public void testRemoveAllCommand() throws Exception {
- Map<UUID, ByteString> rowsToRemove = new HashMap<>();
+ Map<UUID, ByteBuffer> rowsToRemove = new HashMap<>();
for (int i = 0; i < 10; i++) {
rowsToRemove.put(Timestamp.nextVersion().toUuid(), null);
@@ -225,33 +222,52 @@ public class PartitionRaftCommandsSerializationTest
extends IgniteAbstractTest {
.build();
}
- private <T> T copyCommand(T cmd) throws Exception {
- return cmdFromBytes(cmdToBytes(cmd));
- }
-
- private <T> T cmdFromBytes(byte[] bytes) throws IOException,
ClassNotFoundException {
- try (ByteArrayInputStream bais = new ByteArrayInputStream(bytes)) {
- try (ObjectInputStream ois = new ObjectInputStream(bais)) {
- return (T) ois.readObject();
- }
- }
- }
-
- private <T> byte[] cmdToBytes(T cmd) throws IOException {
- try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
- try (ObjectOutputStream oos = new ObjectOutputStream(baos)) {
- oos.writeObject(cmd);
- }
-
- baos.flush();
-
- return baos.toByteArray();
+ private <T extends Command> T copyCommand(T cmd) {
+ assertEquals(TableMessageGroup.GROUP_TYPE, cmd.groupType());
+
+ if (cmd instanceof FinishTxCommand) {
+ FinishTxCommand finishTxCommand = (FinishTxCommand) cmd;
+
+ return (T) msgFactory.finishTxCommand()
+ .txId(finishTxCommand.txId())
+ .commit(finishTxCommand.commit())
+ .tablePartitionIds(finishTxCommand.tablePartitionIds())
+ .commitTimestamp(finishTxCommand.commitTimestamp())
+ .build();
+ } else if (cmd instanceof TxCleanupCommand) {
+ TxCleanupCommand txCleanupCommand = (TxCleanupCommand) cmd;
+
+ return (T) msgFactory.txCleanupCommand()
+ .txId(txCleanupCommand.txId())
+ .commit(txCleanupCommand.commit())
+ .commitTimestamp(txCleanupCommand.commitTimestamp())
+ .build();
+ } else if (cmd instanceof UpdateCommand) {
+ UpdateCommand updateCommand = (UpdateCommand) cmd;
+
+ return (T) msgFactory.updateCommand()
+ .txId(updateCommand.txId())
+ .rowUuid(updateCommand.rowUuid())
+ .tablePartitionId(updateCommand.tablePartitionId())
+ .rowBuffer(updateCommand.rowBuffer())
+ .build();
+ } else if (cmd instanceof UpdateAllCommand) {
+ UpdateAllCommand updateCommand = (UpdateAllCommand) cmd;
+
+ return (T) msgFactory.updateAllCommand()
+ .txId(updateCommand.txId())
+ .rowsToUpdate(updateCommand.rowsToUpdate())
+ .tablePartitionId(updateCommand.tablePartitionId())
+ .build();
+ } else {
+ fail(cmd.toString());
+
+ return null;
}
}
- private static ByteString byteStrFromBinaryRow(int id) throws Exception {
- return new ByteString(kvMarshaller.marshal(new TestKey(id,
String.valueOf(id)), new TestValue(id, String.valueOf(id)))
- .byteBuffer());
+ private static ByteBuffer byteBufferFromBinaryRow(int id) throws Exception
{
+ return kvMarshaller.marshal(new TestKey(id, String.valueOf(id)), new
TestValue(id, String.valueOf(id))).byteBuffer();
}
/**
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
index 6097adac4c..d688b11635 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
@@ -37,6 +37,7 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.io.Serializable;
+import java.nio.ByteBuffer;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.HashMap;
@@ -58,6 +59,8 @@ import org.apache.ignite.internal.raft.service.CommandClosure;
import org.apache.ignite.internal.raft.service.CommittedConfiguration;
import org.apache.ignite.internal.replicator.ReplicaService;
import org.apache.ignite.internal.replicator.command.SafeTimeSyncCommand;
+import
org.apache.ignite.internal.replicator.command.SafeTimeSyncCommandBuilder;
+import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.BinaryTuple;
import org.apache.ignite.internal.schema.BinaryTupleSchema;
@@ -92,7 +95,6 @@ import
org.apache.ignite.internal.tx.storage.state.test.TestTxStateStorage;
import org.apache.ignite.internal.util.Cursor;
import org.apache.ignite.network.ClusterService;
import org.apache.ignite.network.NetworkAddress;
-import org.apache.ignite.raft.jraft.util.ByteString;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
@@ -336,8 +338,10 @@ public class PartitionCommandListenerTest {
@Test
void updatesLastAppliedForSafeTimeSyncCommands() {
+ SafeTimeSyncCommand safeTimeSyncCommand = new
ReplicaMessagesFactory().safeTimeSyncCommand().build();
+
commandListener.onWrite(List.of(
- writeCommandCommandClosure(3, 2, new SafeTimeSyncCommand(),
commandClosureResultCaptor)
+ writeCommandCommandClosure(3, 2, safeTimeSyncCommand,
commandClosureResultCaptor)
).iterator());
verify(mvPartitionStorage).lastApplied(3, 2);
@@ -345,8 +349,10 @@ public class PartitionCommandListenerTest {
@Test
void locksOnCommandApplication() {
+ SafeTimeSyncCommandBuilder safeTimeSyncCommand = new
ReplicaMessagesFactory().safeTimeSyncCommand();
+
commandListener.onWrite(List.of(
- writeCommandCommandClosure(3, 2, new SafeTimeSyncCommand(),
commandClosureResultCaptor)
+ writeCommandCommandClosure(3, 2, safeTimeSyncCommand.build(),
commandClosureResultCaptor)
).iterator());
InOrder inOrder = inOrder(partitionDataStorage);
@@ -490,14 +496,14 @@ public class PartitionCommandListenerTest {
* Inserts all rows.
*/
private void insertAll() {
- HashMap<UUID, ByteString> rows = new HashMap<>(KEY_COUNT);
+ Map<UUID, ByteBuffer> rows = new HashMap<>(KEY_COUNT);
UUID txId = Timestamp.nextVersion().toUuid();
var commitPartId = new TablePartitionId(txId, PARTITION_ID);
for (int i = 0; i < KEY_COUNT; i++) {
Row row = getTestRow(i, i);
- rows.put(Timestamp.nextVersion().toUuid(), new
ByteString(row.byteBuffer()));
+ rows.put(Timestamp.nextVersion().toUuid(), row.byteBuffer());
}
HybridTimestamp commitTimestamp = CLOCK.now();
@@ -529,12 +535,12 @@ public class PartitionCommandListenerTest {
private void updateAll(Function<Integer, Integer> keyValueMapper) {
UUID txId = Timestamp.nextVersion().toUuid();
var commitPartId = new TablePartitionId(txId, PARTITION_ID);
- HashMap<UUID, ByteString> rows = new HashMap<>(KEY_COUNT);
+ Map<UUID, ByteBuffer> rows = new HashMap<>(KEY_COUNT);
for (int i = 0; i < KEY_COUNT; i++) {
Row row = getTestRow(i, keyValueMapper.apply(i));
- rows.put(readRow(row).uuid(), new ByteString(row.byteBuffer()));
+ rows.put(readRow(row).uuid(), row.byteBuffer());
}
HybridTimestamp commitTimestamp = CLOCK.now();
@@ -564,7 +570,7 @@ public class PartitionCommandListenerTest {
private void deleteAll() {
UUID txId = Timestamp.nextVersion().toUuid();
var commitPartId = new TablePartitionId(txId, PARTITION_ID);
- Map<UUID, ByteString> keyRows = new HashMap<>(KEY_COUNT);
+ Map<UUID, ByteBuffer> keyRows = new HashMap<>(KEY_COUNT);
for (int i = 0; i < KEY_COUNT; i++) {
Row row = getTestRow(i, i);
@@ -617,8 +623,8 @@ public class PartitionCommandListenerTest {
.tablePartitionId(msgFactory.tablePartitionIdMessage()
.tableId(txId)
.partitionId(PARTITION_ID).build())
- .rowUuid(new UUID(rowId.mostSignificantBits(),
rowId.leastSignificantBits()))
- .rowBuffer(new ByteString(row.byteBuffer()))
+ .rowUuid(rowId.uuid())
+ .rowBuffer(row.byteBuffer())
.txId(txId)
.build());
@@ -663,7 +669,7 @@ public class PartitionCommandListenerTest {
.tablePartitionId(msgFactory.tablePartitionIdMessage()
.tableId(txId)
.partitionId(PARTITION_ID).build())
- .rowUuid(new UUID(rowId.mostSignificantBits(),
rowId.leastSignificantBits()))
+ .rowUuid(rowId.uuid())
.txId(txId)
.build());
@@ -729,7 +735,6 @@ public class PartitionCommandListenerTest {
commandListener.onWrite(iterator((i, clo) -> {
UUID txId = Timestamp.nextVersion().toUuid();
Row row = getTestRow(i, i);
- var commitPartId = new TablePartitionId(txId, PARTITION_ID);
txIds.add(txId);
when(clo.index()).thenReturn(raftIndex.incrementAndGet());
@@ -740,7 +745,7 @@ public class PartitionCommandListenerTest {
.tableId(txId)
.partitionId(PARTITION_ID).build())
.rowUuid(Timestamp.nextVersion().toUuid())
- .rowBuffer(new ByteString(row.byteBuffer()))
+ .rowBuffer(row.byteBuffer())
.txId(txId)
.build());
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopierTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopierTest.java
index 17475d1db2..4ea1136f24 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopierTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopierTest.java
@@ -355,7 +355,7 @@ public class IncomingSnapshotCopierTest {
responseEntries.add(
TABLE_MSG_FACTORY.responseEntry()
- .rowId(new UUID(rowId.mostSignificantBits(),
rowId.leastSignificantBits()))
+ .rowId(rowId.uuid())
.rowVersions(rowVersions)
.timestamps(timestamps)
.txId(txId)