This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 1b4552729 RATIS-1849. Remove unused getRaftClient (#886)
1b4552729 is described below
commit 1b45527295118254fb5ec81bb295d73ddda81a95
Author: tison <[email protected]>
AuthorDate: Thu Jun 15 15:44:43 2023 +0800
RATIS-1849. Remove unused getRaftClient (#886)
---
.../java/org/apache/ratis/server/RaftServer.java | 29 ++---
.../apache/ratis/server/impl/RaftServerImpl.java | 124 +++++++++++++--------
.../datastream/DataStreamAsyncClusterTests.java | 4 -
.../datastream/TestNettyDataStreamWithMock.java | 42 ++-----
4 files changed, 102 insertions(+), 97 deletions(-)
diff --git a/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServer.java b/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServer.java
index 1c99e88d8..84e3a1ed3 100644
--- a/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServer.java
+++ b/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServer.java
@@ -17,12 +17,26 @@
*/
package org.apache.ratis.server;
-import org.apache.ratis.client.RaftClient;
+import java.io.Closeable;
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.Collection;
+import java.util.Objects;
+import java.util.Optional;
import org.apache.ratis.conf.Parameters;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.proto.RaftProtos.CommitInfoProto;
import org.apache.ratis.proto.RaftProtos.RaftPeerRole;
-import org.apache.ratis.protocol.*;
+import org.apache.ratis.protocol.AdminAsynchronousProtocol;
+import org.apache.ratis.protocol.AdminProtocol;
+import org.apache.ratis.protocol.RaftClientAsynchronousProtocol;
+import org.apache.ratis.protocol.RaftClientProtocol;
+import org.apache.ratis.protocol.RaftGroup;
+import org.apache.ratis.protocol.RaftGroupId;
+import org.apache.ratis.protocol.RaftGroupMemberId;
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.rpc.RpcType;
import org.apache.ratis.server.metrics.RaftServerMetrics;
import org.apache.ratis.server.protocol.RaftServerAsynchronousProtocol;
@@ -37,14 +51,6 @@ import org.apache.ratis.util.ReflectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.Closeable;
-import java.io.IOException;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.util.Collection;
-import java.util.Objects;
-import java.util.Optional;
-
/** Raft server interface */
public interface RaftServer extends Closeable, RpcType.Get,
RaftServerProtocol, RaftServerAsynchronousProtocol,
@@ -111,9 +117,6 @@ public interface RaftServer extends Closeable, RpcType.Get,
/** @return the data stream map of this division. */
DataStreamMap getDataStreamMap();
- /** @return the internal {@link RaftClient} of this division. */
- RaftClient getRaftClient();
-
/** @return the {@link ThreadGroup} the threads of this Division belong to. */
ThreadGroup getThreadGroup();
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 90641b73e..e429518e6 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -17,24 +17,85 @@
*/
package org.apache.ratis.server.impl;
-import org.apache.ratis.client.RaftClient;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.NoSuchFileException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import javax.management.ObjectName;
import org.apache.ratis.client.impl.ClientProtoUtils;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.metrics.Timekeeper;
-import org.apache.ratis.proto.RaftProtos.*;
+import org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto;
+import org.apache.ratis.proto.RaftProtos.AppendEntriesRequestProto;
+import org.apache.ratis.proto.RaftProtos.CandidateInfoProto;
+import org.apache.ratis.proto.RaftProtos.CommitInfoProto;
+import org.apache.ratis.proto.RaftProtos.FollowerInfoProto;
+import org.apache.ratis.proto.RaftProtos.InstallSnapshotReplyProto;
+import org.apache.ratis.proto.RaftProtos.InstallSnapshotRequestProto;
+import org.apache.ratis.proto.RaftProtos.InstallSnapshotResult;
+import org.apache.ratis.proto.RaftProtos.LeaderInfoProto;
+import org.apache.ratis.proto.RaftProtos.LogEntryProto;
+import org.apache.ratis.proto.RaftProtos.RaftClientRequestProto;
import org.apache.ratis.proto.RaftProtos.RaftClientRequestProto.TypeCase;
-import org.apache.ratis.protocol.*;
-import org.apache.ratis.protocol.exceptions.ReadException;
-import org.apache.ratis.protocol.exceptions.ReadIndexException;
-import org.apache.ratis.protocol.exceptions.SetConfigurationException;
+import org.apache.ratis.proto.RaftProtos.RaftConfigurationProto;
+import org.apache.ratis.proto.RaftProtos.RaftPeerRole;
+import org.apache.ratis.proto.RaftProtos.RaftRpcRequestProto;
+import org.apache.ratis.proto.RaftProtos.ReadIndexReplyProto;
+import org.apache.ratis.proto.RaftProtos.ReadIndexRequestProto;
+import org.apache.ratis.proto.RaftProtos.RequestVoteReplyProto;
+import org.apache.ratis.proto.RaftProtos.RequestVoteRequestProto;
+import org.apache.ratis.proto.RaftProtos.RoleInfoProto;
+import org.apache.ratis.proto.RaftProtos.ServerRpcProto;
+import org.apache.ratis.proto.RaftProtos.StartLeaderElectionReplyProto;
+import org.apache.ratis.proto.RaftProtos.StartLeaderElectionRequestProto;
+import org.apache.ratis.protocol.ClientInvocationId;
+import org.apache.ratis.protocol.GroupInfoReply;
+import org.apache.ratis.protocol.GroupInfoRequest;
+import org.apache.ratis.protocol.LeaderElectionManagementRequest;
+import org.apache.ratis.protocol.Message;
+import org.apache.ratis.protocol.RaftClientAsynchronousProtocol;
+import org.apache.ratis.protocol.RaftClientProtocol;
+import org.apache.ratis.protocol.RaftClientReply;
+import org.apache.ratis.protocol.RaftClientRequest;
+import org.apache.ratis.protocol.RaftGroup;
+import org.apache.ratis.protocol.RaftGroupId;
+import org.apache.ratis.protocol.RaftGroupMemberId;
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.protocol.SetConfigurationRequest;
+import org.apache.ratis.protocol.SnapshotManagementRequest;
+import org.apache.ratis.protocol.TransferLeadershipRequest;
import org.apache.ratis.protocol.exceptions.GroupMismatchException;
import org.apache.ratis.protocol.exceptions.LeaderNotReadyException;
import org.apache.ratis.protocol.exceptions.LeaderSteppingDownException;
import org.apache.ratis.protocol.exceptions.NotLeaderException;
import org.apache.ratis.protocol.exceptions.RaftException;
+import org.apache.ratis.protocol.exceptions.ReadException;
+import org.apache.ratis.protocol.exceptions.ReadIndexException;
import org.apache.ratis.protocol.exceptions.ReconfigurationInProgressException;
import org.apache.ratis.protocol.exceptions.ResourceUnavailableException;
import org.apache.ratis.protocol.exceptions.ServerNotReadyException;
+import org.apache.ratis.protocol.exceptions.SetConfigurationException;
import org.apache.ratis.protocol.exceptions.StaleReadException;
import org.apache.ratis.protocol.exceptions.StateMachineException;
import org.apache.ratis.protocol.exceptions.TransferLeadershipException;
@@ -65,26 +126,19 @@ import org.apache.ratis.statemachine.SnapshotInfo;
import org.apache.ratis.statemachine.StateMachine;
import org.apache.ratis.statemachine.TransactionContext;
import
org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
-import org.apache.ratis.util.*;
+import org.apache.ratis.util.CodeInjectionForTesting;
+import org.apache.ratis.util.CollectionUtils;
+import org.apache.ratis.util.ConcurrentUtils;
+import org.apache.ratis.util.FileUtils;
+import org.apache.ratis.util.IOUtils;
+import org.apache.ratis.util.JavaUtils;
+import org.apache.ratis.util.JmxRegister;
+import org.apache.ratis.util.LifeCycle;
+import org.apache.ratis.util.Preconditions;
+import org.apache.ratis.util.ProtoUtils;
+import org.apache.ratis.util.TimeDuration;
+import org.apache.ratis.util.Timestamp;
import org.apache.ratis.util.function.CheckedSupplier;
-
-import javax.management.ObjectName;
-import java.io.File;
-import java.io.IOException;
-import java.nio.file.NoSuchFileException;
-import java.util.*;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionException;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ThreadLocalRandom;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.function.Function;
-import java.util.function.Supplier;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-
import static
org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto.AppendResult.INCONSISTENCY;
import static
org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto.AppendResult.NOT_LEADER;
import static
org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto.AppendResult.SUCCESS;
@@ -166,8 +220,6 @@ class RaftServerImpl implements RaftServer.Division,
private final DataStreamMap dataStreamMap;
private final RaftServerConfigKeys.Read.Option readOption;
- private final MemoizedSupplier<RaftClient> raftClient;
-
private final RetryCacheImpl retryCache;
private final CommitInfoCache commitInfoCache = new CommitInfoCache();
@@ -220,11 +272,6 @@ class RaftServerImpl implements RaftServer.Division,
this.startComplete = new AtomicBoolean(false);
this.threadGroup = new ThreadGroup(proxy.getThreadGroup(),
getMemberId().toString());
- this.raftClient = JavaUtils.memoize(() -> RaftClient.newBuilder()
- .setRaftGroup(group)
- .setProperties(getRaftServer().getProperties())
- .build());
-
this.transferLeadership = new TransferLeadership(this, properties);
this.snapshotRequestHandler = new SnapshotManagementRequestHandler(this);
this.snapshotInstallationHandler = new SnapshotInstallationHandler(this,
properties);
@@ -302,11 +349,6 @@ class RaftServerImpl implements RaftServer.Division,
return dataStreamMap;
}
- @Override
- public RaftClient getRaftClient() {
- return raftClient.get();
- }
-
@Override
public RetryCacheImpl getRetryCache() {
return retryCache;
@@ -496,14 +538,6 @@ class RaftServerImpl implements RaftServer.Division,
} catch (Exception ignored) {
LOG.warn("{}: Failed to unregister metric", getMemberId(), ignored);
}
- try {
- if (raftClient.isInitialized()) {
- raftClient.get().close();
- }
- } catch (Exception ignored) {
- LOG.warn("{}: Failed to close raft client", getMemberId(), ignored);
- }
-
try {
ConcurrentUtils.shutdownAndWait(clientExecutor);
} catch (Exception ignored) {
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamAsyncClusterTests.java b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamAsyncClusterTests.java
index c18f7dea6..86b03a2da 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamAsyncClusterTests.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamAsyncClusterTests.java
@@ -139,10 +139,6 @@ public abstract class DataStreamAsyncClusterTests<CLUSTER extends MiniRaftCluste
.orElseThrow(IllegalStateException::new);
}
- ClientId getPrimaryClientId(CLUSTER cluster, RaftPeer primary) {
- return cluster.getDivision(primary.getId()).getRaftClient().getId();
- }
-
long runTestDataStream(CLUSTER cluster, int numStreams, int bufferSize, int
bufferNum, boolean stepDownLeader) {
final Iterable<RaftServer> servers =
CollectionUtils.as(cluster.getServers(), s -> s);
final RaftPeerId leader;
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/TestNettyDataStreamWithMock.java b/ratis-test/src/test/java/org/apache/ratis/datastream/TestNettyDataStreamWithMock.java
index 324acfa7c..64fa59e40 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/TestNettyDataStreamWithMock.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/TestNettyDataStreamWithMock.java
@@ -18,15 +18,16 @@
package org.apache.ratis.datastream;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
import org.apache.ratis.RaftConfigKeys;
-import org.apache.ratis.client.AsyncRpcApi;
-import org.apache.ratis.client.RaftClient;
import org.apache.ratis.conf.RaftProperties;
import
org.apache.ratis.datastream.DataStreamTestUtils.MultiDataStreamStateMachine;
import org.apache.ratis.netty.NettyConfigKeys;
import org.apache.ratis.protocol.ClientId;
-import org.apache.ratis.protocol.RaftClientReply;
-import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
@@ -40,15 +41,6 @@ import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.mockito.Mockito;
-import org.mockito.stubbing.Answer;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.CompletableFuture;
-import java.util.stream.Collectors;
-
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -70,10 +62,9 @@ public class TestNettyDataStreamWithMock extends DataStreamBaseTest {
RaftConfigKeys.DataStream.setType(properties,
SupportedDataStreamType.NETTY);
}
- RaftServer.Division mockDivision(RaftServer server, RaftClient client) {
+ RaftServer.Division mockDivision(RaftServer server) {
final RaftServer.Division division = mock(RaftServer.Division.class);
when(division.getRaftServer()).thenReturn(server);
- when(division.getRaftClient()).thenReturn(client);
when(division.getRaftConf()).thenAnswer(i -> getRaftConf());
final MultiDataStreamStateMachine stateMachine = new
MultiDataStreamStateMachine();
@@ -107,27 +98,8 @@ public class TestNettyDataStreamWithMock extends DataStreamBaseTest {
when(raftServer.getId()).thenReturn(peerId);
when(raftServer.getPeer()).thenReturn(RaftPeer.newBuilder().setId(peerId).build());
if (getStateMachineException == null) {
- RaftClient client = Mockito.mock(RaftClient.class);
- when(client.getId()).thenReturn(clientId);
- AsyncRpcApi asyncRpcApi = Mockito.mock(AsyncRpcApi.class);
- when(client.async()).thenReturn(asyncRpcApi);
-
- final RaftServer.Division myDivision = mockDivision(raftServer,
client);
+ final RaftServer.Division myDivision = mockDivision(raftServer);
when(raftServer.getDivision(Mockito.any(RaftGroupId.class))).thenReturn(myDivision);
-
- if (submitException != null) {
-
when(asyncRpcApi.sendForward(Mockito.any(RaftClientRequest.class))).thenThrow(submitException);
- } else if (i == 0) {
- // primary
- when(asyncRpcApi.sendForward(Mockito.any(RaftClientRequest.class)))
- .thenAnswer((Answer<CompletableFuture<RaftClientReply>>)
invocation -> {
- final RaftClientRequest r = (RaftClientRequest)
invocation.getArguments()[0];
- final RaftClientReply.Builder b =
RaftClientReply.newBuilder().setRequest(r);
- final RaftClientReply reply = leaderException != null?
b.setException(leaderException).build()
- : b.setSuccess().setMessage(() ->
DataStreamTestUtils.MOCK).build();
- return CompletableFuture.completedFuture(reply);
- });
- }
} else {
when(raftServer.getDivision(Mockito.any(RaftGroupId.class))).thenThrow(getStateMachineException);
}