This is an automated email from the ASF dual-hosted git repository. sk0x50 pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push: new 16ec705 IGNITE-16110 Added current term to GetLeaderResponse. Fixes #515 16ec705 is described below commit 16ec7051af62b998f8630ea02d0fb895018df0d7 Author: Mirza Aliev <alievmi...@gmail.com> AuthorDate: Tue Dec 21 12:35:29 2021 +0300 IGNITE-16110 Added current term to GetLeaderResponse. Fixes #515 Signed-off-by: Slava Koptilin <slava.kopti...@gmail.com> --- .../java/org/apache/ignite/raft/jraft/Node.java | 7 +++++ .../apache/ignite/raft/jraft/core/NodeImpl.java | 10 ++++---- .../apache/ignite/raft/jraft/rpc/CliRequests.java | 2 ++ .../rpc/impl/cli/GetLeaderRequestProcessor.java | 1 + .../raft/jraft/core/RaftGroupServiceTest.java | 30 +++++++++++++++++++++- 5 files changed, 44 insertions(+), 6 deletions(-) diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/Node.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/Node.java index 102e8fe..1ab3fd4 100644 --- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/Node.java +++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/Node.java @@ -298,4 +298,11 @@ public interface Node extends Lifecycle<NodeOptions>, Describer { * @return node's target election priority value. */ int getNodeTargetPriority(); + + /** + * Get the node's current term. + * + * @return node's current term. + */ + long getCurrentTerm(); } diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java index 8957c46..f4a169d 100644 --- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java +++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java @@ -1196,7 +1196,7 @@ public class NodeImpl implements Node, RaftServerService { if (peer.equals(this.serverId)) { continue; } - + rpcClientService.connectAsync(peer.getEndpoint()).thenAccept(ok -> { if (!ok) { LOG.warn("Node {} failed to init channel, address={}.", getNodeId(), peer.getEndpoint()); @@ -2488,7 +2488,7 @@ public class NodeImpl implements Node, RaftServerService { return this.raftOptions; } - @OnlyForTest + @Override public long getCurrentTerm() { this.readLock.lock(); try { @@ -2753,7 +2753,7 @@ public class NodeImpl implements Node, RaftServerService { if (peer.equals(this.serverId)) { continue; } - + rpcClientService.connectAsync(peer.getEndpoint()).thenAccept(ok -> { if (!ok) { LOG.warn("Node {} failed to init channel, address={}.", getNodeId(), peer.getEndpoint()); @@ -3400,10 +3400,10 @@ public class NodeImpl implements Node, RaftServerService { // Parallelize response and election done.sendResponse(resp); doUnlock = false; - + LOG.info("Node {} received TimeoutNowRequest from {}, term={} and starts voting.", getNodeId(), request.serverId(), savedTerm); - + electSelf(); } finally { diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/CliRequests.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/CliRequests.java index a5f2b8d..daff213 100644 --- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/CliRequests.java +++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/CliRequests.java @@ -109,6 +109,8 @@ public final class CliRequests { @Transferable(value = RaftMessageGroup.RpcClientMessageGroup.GET_LEADER_RESPONSE, autoSerializable = false) public interface GetLeaderResponse extends Message { String leaderId(); + + long currentTerm(); } @Transferable(value = RaftMessageGroup.RpcClientMessageGroup.GET_PEERS_REQUEST, autoSerializable = false) diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/cli/GetLeaderRequestProcessor.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/cli/GetLeaderRequestProcessor.java index f8be228..9d7c6e6 100644 --- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/cli/GetLeaderRequestProcessor.java +++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/cli/GetLeaderRequestProcessor.java @@ -87,6 +87,7 @@ public class GetLeaderRequestProcessor extends BaseCliRequestProcessor<GetLeader if (leader != null && !leader.isEmpty()) { return msgFactory().getLeaderResponse() .leaderId(leader.toString()) + .currentTerm(node.getCurrentTerm()) .build(); } } diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/RaftGroupServiceTest.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/RaftGroupServiceTest.java index 81bfbf3..b139916 100644 --- a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/RaftGroupServiceTest.java +++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/RaftGroupServiceTest.java @@ -47,6 +47,8 @@ import org.apache.ignite.raft.jraft.entity.PeerId; import org.apache.ignite.raft.jraft.error.RaftError; import org.apache.ignite.raft.jraft.rpc.ActionRequest; import org.apache.ignite.raft.jraft.rpc.CliRequests; +import org.apache.ignite.raft.jraft.rpc.CliRequests.GetLeaderRequest; +import org.apache.ignite.raft.jraft.rpc.CliRequests.GetLeaderResponse; import org.apache.ignite.raft.jraft.rpc.RaftRpcFactory; import org.apache.ignite.raft.jraft.rpc.impl.RaftException; import org.apache.ignite.raft.jraft.rpc.impl.RaftGroupServiceImpl; @@ -100,6 +102,9 @@ public class RaftGroupServiceTest { /** Retry delay. */ private static final int DELAY = 200; + /** Current term */ + private static final int CURRENT_TERM = 1; + /** Mock cluster. */ @Mock private ClusterService cluster; @@ -747,6 +752,29 @@ public class RaftGroupServiceTest { assertEquals(NODES.subList(1, 2), service.learners()); } + /** */ + @Test + public void testGetLeaderRequest() throws Exception { + String groupId = "test"; + + mockLeaderRequest(false); + + RaftGroupService service = + RaftGroupServiceImpl.start(groupId, cluster, FACTORY, TIMEOUT, NODES, false, DELAY, executor).get(3, TimeUnit.SECONDS); + + assertNull(service.leader()); + + service.refreshLeader().get(); + + GetLeaderRequest req = FACTORY.getLeaderRequest().groupId(groupId).build(); + + GetLeaderResponse fut = (GetLeaderResponse) messagingService.invoke(leader.address(), req, TIMEOUT).get(); + + assertEquals(fut.leaderId(), PeerId.fromPeer(leader).toString()); + + assertEquals(fut.currentTerm(), CURRENT_TERM); + } + /** * @param delay {@code True} to create a delay before response. * @param peer Fail the request targeted to given peer. @@ -816,7 +844,7 @@ public class RaftGroupServiceTest { Object resp = leader0 == null ? FACTORY.errorResponse().errorCode(RaftError.EPERM.getNumber()).build() : - FACTORY.getLeaderResponse().leaderId(leader0.toString()).build(); + FACTORY.getLeaderResponse().leaderId(leader0.toString()).currentTerm(CURRENT_TERM).build(); return completedFuture(resp); });