This is an automated email from the ASF dual-hosted git repository. ascherbakov pushed a commit to branch ignite-13885 in repository https://gitbox.apache.org/repos/asf/ignite-3.git
commit a529ad1eaebdbd0d999b3102d20f10d931afc8af Author: Alexey Scherbakov <[email protected]> AuthorDate: Thu Dec 31 11:40:20 2020 +0300 IGNITE-13885 partially working. --- .../sofa/jraft/entity/LocalStorageOutter.java | 3 +- .../com/alipay/sofa/jraft/entity/RaftOutter.java | 3 +- .../com/alipay/sofa/jraft/rpc/CliRequests.java | 2 +- .../java/com/alipay/sofa/jraft/rpc/Connection.java | 1 - .../alipay/sofa/jraft/rpc/HasErrorResponse.java | 2 +- .../sofa/jraft/rpc/MessageBuilderFactory.java | 28 +++- .../com/alipay/sofa/jraft/rpc/RaftRpcFactory.java | 9 -- .../com/alipay/sofa/jraft/rpc/RpcRequests.java | 26 ++-- .../sofa/jraft/rpc/impl/AbstractClientService.java | 11 +- .../sofa/jraft/rpc/impl/LocalConnection.java | 35 +++++ .../sofa/jraft/rpc/impl/LocalRaftRpcFactory.java | 35 ++++- .../alipay/sofa/jraft/rpc/impl/LocalRpcClient.java | 72 ++++++++- .../alipay/sofa/jraft/rpc/impl/LocalRpcServer.java | 165 ++++++++++++++++++++- .../rpc/message/AppendEntriesRequestImpl.java | 130 ++++++++++++++++ .../rpc/message/AppendEntriesResponseImpl.java | 47 ++++++ .../rpc/message/DefaultMessageBuilderFactory.java | 53 ++++++- .../sofa/jraft/rpc/message/EntryMetaImpl.java | 133 +++++++++++++++++ .../sofa/jraft/rpc/message/ErrorResponseImpl.java | 32 ++++ .../sofa/jraft/rpc/message/PingRequestImpl.java | 21 +++ .../sofa/jraft/rpc/message/PreVoteRequestImpl.java | 87 +++++++++++ .../jraft/rpc/message/ReadIndexRequestImpl.java | 76 ++++++++++ .../jraft/rpc/message/ReadIndexResponseImpl.java | 36 +++++ .../jraft/rpc/message/RequestVoteResponseImpl.java | 36 +++++ .../alipay/sofa/jraft/rpc/message/StableMeta.java | 32 ++++ .../jraft/rpc/message/TimeoutNowRequestImpl.java | 54 +++++++ .../jraft/rpc/message/TimeoutNowResponseImpl.java | 36 +++++ .../com/alipay/sofa/jraft/util/JDKMarshaller.java | 40 +++-- .../com/alipay/sofa/jraft/util/Marshaller.java | 4 +- .../java/com/alipay/sofa/jraft/util/Utils.java | 2 +- .../com.alipay.sofa.jraft.rpc.RaftRpcFactory | 2 +- .../com/alipay/sofa/jraft/rpc/LocalRpcTest.java | 154 +++++++++++++++++++ .../jraft/storage/impl/LocalLogStorageTest.java | 5 - .../sofa/jraft/storage/io/LocalFileReaderTest.java | 1 - modules/raft/src/test/resources/log4j2.xml | 25 ++++ 34 files changed, 1317 insertions(+), 81 deletions(-) diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/entity/LocalStorageOutter.java b/modules/raft/src/main/java/com/alipay/sofa/jraft/entity/LocalStorageOutter.java index 81e54b4..d483818 100644 --- a/modules/raft/src/main/java/com/alipay/sofa/jraft/entity/LocalStorageOutter.java +++ b/modules/raft/src/main/java/com/alipay/sofa/jraft/entity/LocalStorageOutter.java @@ -20,6 +20,7 @@ package com.alipay.sofa.jraft.entity; import com.alipay.sofa.jraft.rpc.Message; +import com.alipay.sofa.jraft.rpc.MessageBuilderFactory; import com.alipay.sofa.jraft.storage.RaftMetaStorage; import com.alipay.sofa.jraft.util.DisruptorBuilder; import java.nio.ByteBuffer; @@ -45,7 +46,7 @@ public final class LocalStorageOutter { public interface StablePBMeta extends Message { static Builder newBuilder() { - return null; + return MessageBuilderFactory.DEFAULT.createStableMeta(); } long getTerm(); diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/entity/RaftOutter.java b/modules/raft/src/main/java/com/alipay/sofa/jraft/entity/RaftOutter.java index 83d40ea..b8b78a3 100644 --- a/modules/raft/src/main/java/com/alipay/sofa/jraft/entity/RaftOutter.java +++ b/modules/raft/src/main/java/com/alipay/sofa/jraft/entity/RaftOutter.java @@ -19,12 +19,13 @@ package com.alipay.sofa.jraft.entity; +import com.alipay.sofa.jraft.rpc.MessageBuilderFactory; import com.alipay.sofa.jraft.rpc.RpcRequests; public final class RaftOutter { public interface EntryMeta { static Builder newBuilder() { - return null; + return MessageBuilderFactory.DEFAULT.createEntryMeta(); } long getTerm(); diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/CliRequests.java b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/CliRequests.java index 4c8776d..38732c0 100644 --- a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/CliRequests.java +++ b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/CliRequests.java @@ -38,7 +38,7 @@ public final class CliRequests { } public static Builder newBuilder() { - return MessageBuilderFactory.DEFAULT.createAddPeer(); + return MessageBuilderFactory.DEFAULT.createAddPeerRequest(); } } diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/Connection.java b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/Connection.java index 9bae02a..256bf6f 100644 --- a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/Connection.java +++ b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/Connection.java @@ -22,7 +22,6 @@ package com.alipay.sofa.jraft.rpc; * @author jiachun.fjc */ public interface Connection { - /** * Get the attribute that bound to the connection. * diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/HasErrorResponse.java b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/HasErrorResponse.java index 917b05f..59c8d5c 100644 --- a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/HasErrorResponse.java +++ b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/HasErrorResponse.java @@ -1,5 +1,5 @@ package com.alipay.sofa.jraft.rpc; public interface HasErrorResponse extends Message { - RpcRequests.ErrorResponse getErrorResponse(); + RpcRequests.ErrorResponse getErrorResponse(); // TODO asch can be removed. } diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/MessageBuilderFactory.java b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/MessageBuilderFactory.java index 49aed96..a286c6c 100644 --- a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/MessageBuilderFactory.java +++ b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/MessageBuilderFactory.java @@ -1,13 +1,39 @@ package com.alipay.sofa.jraft.rpc; import com.alipay.sofa.jraft.entity.LocalFileMetaOutter; +import com.alipay.sofa.jraft.entity.LocalStorageOutter; +import com.alipay.sofa.jraft.entity.RaftOutter; import com.alipay.sofa.jraft.rpc.message.DefaultMessageBuilderFactory; // TODO asch use JRaftServiceLoader ? public interface MessageBuilderFactory { public static MessageBuilderFactory DEFAULT = new DefaultMessageBuilderFactory(); - CliRequests.AddPeerRequest.Builder createAddPeer(); + CliRequests.AddPeerRequest.Builder createAddPeerRequest(); LocalFileMetaOutter.LocalFileMeta.Builder createLocalFileMeta(); + + RpcRequests.PingRequest.Builder createPingRequest(); + + RpcRequests.RequestVoteRequest.Builder createVoteRequest(); + + RpcRequests.RequestVoteResponse.Builder createVoteResponse(); + + RpcRequests.ErrorResponse.Builder createErrorResponse(); + + LocalStorageOutter.StablePBMeta.Builder createStableMeta(); + + RpcRequests.AppendEntriesRequest.Builder createAppendEntriesRequest(); + + RpcRequests.AppendEntriesResponse.Builder createAppendEntriesResponse(); + + RaftOutter.EntryMeta.Builder createEntryMeta(); + + RpcRequests.TimeoutNowRequest.Builder createTimeoutNowRequest(); + + RpcRequests.TimeoutNowResponse.Builder createTimeoutNowResponse(); + + RpcRequests.ReadIndexRequest.Builder createReadIndexRequest(); + + RpcRequests.ReadIndexResponse.Builder createReadIndexResponse(); } diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/RaftRpcFactory.java b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/RaftRpcFactory.java index acc19c5..2989f25 100644 --- a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/RaftRpcFactory.java +++ b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/RaftRpcFactory.java @@ -29,15 +29,6 @@ public interface RaftRpcFactory { RpcResponseFactory DEFAULT = new RpcResponseFactory() {}; /** - * Register serializer with class name. - * - * @param className class name - * @param args extended parameters, different implementers may need different parameters, - * the order of parameters need a convention - */ - void registerProtobufSerializer(final String className, final Object... args); - - /** * Creates a raft RPC client. * * @return a new rpc client instance diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/RpcRequests.java b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/RpcRequests.java index 74568b7..eb6ac4f 100644 --- a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/RpcRequests.java +++ b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/RpcRequests.java @@ -19,14 +19,8 @@ package com.alipay.sofa.jraft.rpc; -import com.alipay.sofa.jraft.RaftGroupService; -import com.alipay.sofa.jraft.entity.LeaderChangeContext; import com.alipay.sofa.jraft.entity.RaftOutter; -import com.alipay.sofa.jraft.option.BootstrapOptions; -import com.alipay.sofa.jraft.option.ReplicatorOptions; import com.alipay.sofa.jraft.util.ByteString; -import com.alipay.sofa.jraft.util.DisruptorBuilder; -import java.io.ByteArrayOutputStream; public final class RpcRequests { private RpcRequests() { @@ -45,7 +39,7 @@ public final class RpcRequests { } public static Builder newBuilder() { - return null; + return MessageBuilderFactory.DEFAULT.createPingRequest(); } } @@ -73,7 +67,7 @@ public final class RpcRequests { } public static Builder newBuilder() { - return null; + return MessageBuilderFactory.DEFAULT.createErrorResponse(); } } @@ -140,7 +134,7 @@ public final class RpcRequests { public interface TimeoutNowRequest extends Message { static Builder newBuilder() { - return null; + return MessageBuilderFactory.DEFAULT.createTimeoutNowRequest(); } java.lang.String getGroupId(); @@ -166,7 +160,7 @@ public final class RpcRequests { public interface TimeoutNowResponse extends HasErrorResponse { static Builder newBuilder() { - return null; + return MessageBuilderFactory.DEFAULT.createTimeoutNowResponse(); } static Message getDefaultInstance() { @@ -231,7 +225,7 @@ public final class RpcRequests { } static Builder newBuilder() { - return null; + return MessageBuilderFactory.DEFAULT.createVoteRequest(); } } @@ -241,7 +235,7 @@ public final class RpcRequests { } static Builder newBuilder() { - return null; + return MessageBuilderFactory.DEFAULT.createVoteResponse(); } /** @@ -292,7 +286,7 @@ public final class RpcRequests { public interface AppendEntriesRequest extends Message { static Builder newBuilder() { - return null; + return MessageBuilderFactory.DEFAULT.createAppendEntriesRequest(); } /** @@ -355,7 +349,7 @@ public final class RpcRequests { } static Builder newBuilder() { - return null; + return MessageBuilderFactory.DEFAULT.createAppendEntriesResponse(); } long getTerm(); @@ -445,7 +439,7 @@ public final class RpcRequests { public interface ReadIndexRequest extends Message { static Builder newBuilder() { - return null; + return MessageBuilderFactory.DEFAULT.createReadIndexRequest(); } java.lang.String getGroupId(); @@ -477,7 +471,7 @@ public final class RpcRequests { public interface ReadIndexResponse extends HasErrorResponse { static Builder newBuilder() { - return null; + return MessageBuilderFactory.DEFAULT.createReadIndexResponse(); } static Message getDefaultInstance() { diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/impl/AbstractClientService.java b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/impl/AbstractClientService.java index 80e7d1e..5701370 100644 --- a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/impl/AbstractClientService.java +++ b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/impl/AbstractClientService.java @@ -219,10 +219,15 @@ public abstract class AbstractClientService implements ClientService { if (result instanceof ErrorResponse) { status = handleErrorResponse((ErrorResponse) result); msg = (Message) result; - } else if (result instanceof HasErrorResponse) { + } else if (result instanceof HasErrorResponse) { // TODO asch we don't need this. final ErrorResponse eResp = ((HasErrorResponse) result).getErrorResponse(); - status = handleErrorResponse(eResp); - msg = (Message) eResp; + if (eResp != null) { + status = handleErrorResponse(eResp); + msg = eResp; + } + else { + msg = (T) result; + } } else { msg = (T) result; } diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/impl/LocalConnection.java b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/impl/LocalConnection.java new file mode 100644 index 0000000..0607196 --- /dev/null +++ b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/impl/LocalConnection.java @@ -0,0 +1,35 @@ +package com.alipay.sofa.jraft.rpc.impl; + +import com.alipay.sofa.jraft.rpc.Connection; +import com.alipay.sofa.jraft.util.Endpoint; +import java.util.Collection; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +public class LocalConnection implements Connection { + private Map<String, Object> attrs = new ConcurrentHashMap<>(); + + final LocalRpcClient client; + final Endpoint srv; + + public LocalConnection(LocalRpcClient client, Endpoint srv) { + this.client = client; + this.srv = srv; + } + + @Override public Object getAttribute(String key) { + return attrs.get(key); + } + + @Override public void setAttribute(String key, Object value) { + attrs.put(key, value); + } + + @Override public Object setAttributeIfAbsent(String key, Object value) { + return attrs.putIfAbsent(key, value); + } + + @Override public void close() { + LocalRpcServer.closeConnection(client, srv); + } +} diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/impl/LocalRaftRpcFactory.java b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/impl/LocalRaftRpcFactory.java index acaa136..ac53089 100644 --- a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/impl/LocalRaftRpcFactory.java +++ b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/impl/LocalRaftRpcFactory.java @@ -16,6 +16,7 @@ */ package com.alipay.sofa.jraft.rpc.impl; +import com.alipay.sofa.jraft.option.RpcOptions; import com.alipay.sofa.jraft.rpc.RaftRpcFactory; import com.alipay.sofa.jraft.rpc.RpcClient; import com.alipay.sofa.jraft.rpc.RpcServer; @@ -31,16 +32,40 @@ import org.slf4j.LoggerFactory; @SPI public class LocalRaftRpcFactory implements RaftRpcFactory { private static final Logger LOG = LoggerFactory.getLogger(LocalRaftRpcFactory.class); + @Override public RpcClient createRpcClient(ConfigHelper<RpcClient> helper) { + LocalRpcClient rpcClient = new LocalRpcClient(); - @Override public void registerProtobufSerializer(String className, Object... args) { + if (helper != null) + helper.config(rpcClient); + return rpcClient; } - @Override public RpcClient createRpcClient(ConfigHelper<RpcClient> helper) { - return null; + @Override public RpcServer createRpcServer(Endpoint endpoint, ConfigHelper<RpcServer> helper) { + LocalRpcServer srv = new LocalRpcServer(endpoint); + + if (helper != null) + helper.config(srv); + + return srv; } - @Override public RpcServer createRpcServer(Endpoint endpoint, ConfigHelper<RpcServer> helper) { - return null; + @Override public ConfigHelper<RpcServer> defaultJRaftServerConfigHelper(RpcOptions opts) { + return new ConfigHelper<RpcServer>() { + @Override public void config(RpcServer instance) { + LocalRpcServer srv = (LocalRpcServer) instance; + // TODO asch. + } + }; + } + + @Override + public ConfigHelper<RpcClient> defaultJRaftClientConfigHelper(final RpcOptions opts) { + return new ConfigHelper<RpcClient>() { + @Override public void config(RpcClient instance) { + LocalRpcClient rpcClient = (LocalRpcClient) instance; + // TODO asch. + } + }; } } diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/impl/LocalRpcClient.java b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/impl/LocalRpcClient.java index a6c063b..cbff3d6 100644 --- a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/impl/LocalRpcClient.java +++ b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/impl/LocalRpcClient.java @@ -17,41 +17,95 @@ package com.alipay.sofa.jraft.rpc.impl; import com.alipay.sofa.jraft.ReplicatorGroup; +import com.alipay.sofa.jraft.entity.PeerId; +import com.alipay.sofa.jraft.error.InvokeTimeoutException; import com.alipay.sofa.jraft.error.RemotingException; import com.alipay.sofa.jraft.option.RpcOptions; +import com.alipay.sofa.jraft.rpc.Connection; import com.alipay.sofa.jraft.rpc.InvokeCallback; import com.alipay.sofa.jraft.rpc.InvokeContext; import com.alipay.sofa.jraft.rpc.RpcClient; import com.alipay.sofa.jraft.util.Endpoint; +import java.util.Collection; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.function.BiConsumer; /** - * Bolt rpc client impl. + * Local rpc client impl. * - * @author jiachun.fjc + * @author ascherbakov. */ public class LocalRpcClient implements RpcClient { + private volatile ReplicatorGroup replicatorGroup = null; + @Override public boolean checkConnection(Endpoint endpoint) { - return false; + return LocalRpcServer.connect(this, endpoint, false, null); } @Override public boolean checkConnection(Endpoint endpoint, boolean createIfAbsent) { - return false; + return LocalRpcServer.connect(this, endpoint, createIfAbsent, this::onCreated); } @Override public void closeConnection(Endpoint endpoint) { - + LocalRpcServer.closeConnection(this, endpoint); } @Override public void registerConnectEventListener(ReplicatorGroup replicatorGroup) { + this.replicatorGroup = replicatorGroup; + } + private void onCreated(LocalConnection conn) { + if (replicatorGroup != null) { + final PeerId peer = new PeerId(); + if (peer.parse(conn.srv.toString())) { + replicatorGroup.checkReplicator(peer, true); + } + else + System.out.println("Fail to parse peer: {}" + peer); // TODO asch + } } @Override public Object invokeSync(Endpoint endpoint, Object request, InvokeContext ctx, long timeoutMs) throws InterruptedException, RemotingException { - return null; + if (!checkConnection(endpoint, true)) + throw new RemotingException("Server is dead " + endpoint); + + LocalRpcServer srv = LocalRpcServer.servers.get(endpoint); + if (srv == null) + throw new RemotingException("Server is dead " + endpoint); + + CompletableFuture fut = new CompletableFuture(); + + Object[] tuple = {this, request, fut}; + assert srv.incoming.offer(tuple); // Should never fail because server uses unbounded queue. + + try { + return fut.get(timeoutMs, TimeUnit.MILLISECONDS); + } catch (ExecutionException e) { + throw new RemotingException(e); + } catch (TimeoutException e) { + throw new InvokeTimeoutException(e); + } } @Override public void invokeAsync(Endpoint endpoint, Object request, InvokeContext ctx, InvokeCallback callback, long timeoutMs) throws InterruptedException, RemotingException { + if (!checkConnection(endpoint, true)) + throw new RemotingException("Server is dead " + endpoint); + + LocalRpcServer srv = LocalRpcServer.servers.get(endpoint); + if (srv == null) + throw new RemotingException("Server is dead " + endpoint); + + CompletableFuture fut = new CompletableFuture(); + + Object[] tuple = {this, request, fut}; + assert srv.incoming.offer(tuple); + fut.whenComplete((BiConsumer<Object, Throwable>) (res, err) -> { + callback.complete(res, err); + }).orTimeout(timeoutMs, TimeUnit.MILLISECONDS); } @Override public boolean init(RpcOptions opts) { @@ -59,6 +113,10 @@ public class LocalRpcClient implements RpcClient { } @Override public void shutdown() { - + // Close all connection from this peer. + for (LocalRpcServer value : LocalRpcServer.servers.values()) + LocalRpcServer.closeConnection(this, value.local); } + + } diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/impl/LocalRpcServer.java b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/impl/LocalRpcServer.java index 4423a42..71ae19b 100644 --- a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/impl/LocalRpcServer.java +++ b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/impl/LocalRpcServer.java @@ -16,32 +16,181 @@ */ package com.alipay.sofa.jraft.rpc.impl; +import com.alipay.sofa.jraft.rpc.Connection; +import com.alipay.sofa.jraft.rpc.Message; +import com.alipay.sofa.jraft.rpc.RpcContext; import com.alipay.sofa.jraft.rpc.RpcProcessor; import com.alipay.sofa.jraft.rpc.RpcServer; +import com.alipay.sofa.jraft.util.Endpoint; +import java.util.List; +import java.util.Map; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.function.Consumer; /** - * Bolt RPC server impl. + * Local RPC server impl. * - * @author jiachun.fjc + * @author ascherbakov. */ public class LocalRpcServer implements RpcServer { - @Override public void registerConnectionClosedEventListener(ConnectionClosedEventListener listener) { + /** Running servers. */ + public static ConcurrentMap<Endpoint, LocalRpcServer> servers = new ConcurrentHashMap<>(); + + Endpoint local; + + /** Remote connections to this server. */ + public ConcurrentMap<LocalRpcClient, LocalConnection> conns = new ConcurrentHashMap<>(); + + private Map<String, RpcProcessor> processors = new ConcurrentHashMap<>(); + + private volatile boolean started = false; + + private Thread worker; + private List<ConnectionClosedEventListener> listeners = new CopyOnWriteArrayList<>(); + + BlockingQueue<Object[]> incoming = new LinkedBlockingDeque<>(); // TODO asch use some kind of MPSC queue. + + public LocalRpcServer(Endpoint local) { + this.local = local; } - @Override public void registerProcessor(RpcProcessor<?> processor) { + static synchronized boolean connect(LocalRpcClient client, Endpoint srv, boolean createIfAbsent, Consumer<LocalConnection> onCreated) { + LocalRpcServer locSrv = servers.get(srv); + + if (locSrv == null) + return false; // Server is dead. + + LocalConnection conn = locSrv.conns.get(client); + + if (conn == null) { + if (!createIfAbsent) + return false; + + conn = new LocalConnection(client, srv); + + locSrv.conns.put(client, conn); + + onCreated.accept(conn); + } + + return true; + } + + static synchronized void closeConnection(LocalRpcClient client, Endpoint srv) { + LocalRpcServer locSrv = servers.get(srv); + + if (locSrv == null) + return; + + LocalConnection conn = locSrv.conns.remove(client); + + if (conn == null) + return; + + locSrv.listeners.forEach(l -> l.onClosed(client.toString(), conn)); + } + + @Override public void registerConnectionClosedEventListener(ConnectionClosedEventListener listener) { + if (!listeners.contains(listener)) + listeners.add(listener); + } + @Override public void registerProcessor(RpcProcessor<?> processor) { + processors.put(processor.interest(), processor); } @Override public int boundPort() { - return 0; + return local.getPort(); } - @Override public boolean init(Void opts) { - return false; + @Override public synchronized boolean init(Void opts) { + if (started) + return false; + + worker = new Thread(new Runnable() { + @Override public void run() { + while(started) { + try { + Object[] tuple = incoming.take(); + LocalRpcClient sender = (LocalRpcClient) tuple[0]; + + // Connection is not established, ignore message. + LocalConnection conn = conns.get(sender); + if (conn == null) + continue; + + Message msg = (Message) tuple[1]; + CompletableFuture<Object> fut = (CompletableFuture) tuple[2]; + + Class<? extends Message> cls = msg.getClass(); + RpcProcessor prc = processors.get(cls.getName()); + + // TODO asch cache it. + if (prc == null) { + for (Class<?> iface : cls.getInterfaces()) { + prc = processors.get(iface.getName()); + + if (prc != null) + break; + } + } + + if (prc == null) + System.out.println(); + + prc.handleRequest(new RpcContext() { + @Override public void sendResponse(Object responseObj) { + fut.complete(responseObj); + } + + @Override public Connection getConnection() { + return conn; + } + + @Override public String getRemoteAddress() { + return sender.toString(); + } + }, msg); + + } catch (InterruptedException e) { + return; + } + } + } + }); + + worker.setName("LocalRPCServer-Thread: " + local.toString()); + worker.start(); + + servers.put(local, this); + + started = true; + + return true; } - @Override public void shutdown() { + @Override public synchronized void shutdown() { + if (!started) + return; + + started = false; + worker.interrupt(); + try { + worker.join(); + } catch (InterruptedException e) { + throw new RuntimeException("Interrupted while waiting for RPC server to stop " + local); + } + + // Close all connections to this server. + for (LocalRpcClient client : conns.keySet()) + closeConnection(client, local); + servers.remove(local); } } diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/message/AppendEntriesRequestImpl.java b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/message/AppendEntriesRequestImpl.java new file mode 100644 index 0000000..416cc30 --- /dev/null +++ b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/message/AppendEntriesRequestImpl.java @@ -0,0 +1,130 @@ +package com.alipay.sofa.jraft.rpc.message; + +import com.alipay.sofa.jraft.entity.RaftOutter; +import com.alipay.sofa.jraft.rpc.RpcRequests; +import com.alipay.sofa.jraft.util.ByteString; +import com.alipay.sofa.jraft.util.Marshaller; +import java.util.ArrayList; +import java.util.List; + +class AppendEntriesRequestImpl implements RpcRequests.AppendEntriesRequest, RpcRequests.AppendEntriesRequest.Builder { + private String groupId; + private String serverId; + private String peerId; + private long term; + private long prevLogTerm; + private long prevLogIndex; + private List<RaftOutter.EntryMeta> entiesList = new ArrayList<>(); + private long committedIndex; + private ByteString data = ByteString.EMPTY; + + @Override public String getGroupId() { + return groupId; + } + + @Override public String getServerId() { + return serverId; + } + + @Override public String getPeerId() { + return peerId; + } + + @Override public long getTerm() { + return term; + } + + @Override public long getPrevLogTerm() { + return prevLogTerm; + } + + @Override public long getPrevLogIndex() { + return prevLogIndex; + } + + @Override public List<RaftOutter.EntryMeta> getEntriesList() { + return entiesList; + } + + @Override public RaftOutter.EntryMeta getEntries(int index) { + return entiesList.get(index); + } + + @Override public int getEntriesCount() { + return entiesList.size(); + } + + @Override public long getCommittedIndex() { + return committedIndex; + } + + @Override public ByteString getData() { + return data; + } + + @Override public boolean hasData() { + return data != ByteString.EMPTY; + } + + @Override public byte[] toByteArray() { + return Marshaller.DEFAULT.marshall(this); + } + + @Override public RpcRequests.AppendEntriesRequest build() { + return this; + } + + @Override public Builder setData(ByteString data) { + this.data = data; + + return this; + } + + @Override public Builder setTerm(long term) { + this.term = term; + + return this; + } + + @Override public Builder setGroupId(String groupId) { + this.groupId = groupId; + + return this; + } + + @Override public Builder setServerId(String serverId) { + this.serverId = serverId; + + return this; + } + + @Override public Builder setPeerId(String peerId) { + this.peerId = peerId; + + return this; + } + + @Override public Builder setPrevLogIndex(long prevLogIndex) { + this.prevLogIndex = prevLogIndex; + + return this; + } + + @Override public Builder setPrevLogTerm(long prevLogTerm) { + this.prevLogTerm = prevLogTerm; + + return this; + } + + @Override public Builder setCommittedIndex(long lastCommittedIndex) { + this.committedIndex = lastCommittedIndex; + + return this; + } + + @Override public Builder addEntries(RaftOutter.EntryMeta entryMeta) { + entiesList.add(entryMeta); + + return this; + } +} diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/message/AppendEntriesResponseImpl.java b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/message/AppendEntriesResponseImpl.java new file mode 100644 index 0000000..9187ab6 --- /dev/null +++ b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/message/AppendEntriesResponseImpl.java @@ -0,0 +1,47 @@ +package com.alipay.sofa.jraft.rpc.message; + +import com.alipay.sofa.jraft.rpc.RpcRequests; + +public class AppendEntriesResponseImpl implements RpcRequests.AppendEntriesResponse, RpcRequests.AppendEntriesResponse.Builder { + private long term; + private boolean success; + private long lastLogIndex; + + @Override public long getTerm() { + return term; + } + + @Override public boolean getSuccess() { + return success; + } + + @Override public long getLastLogIndex() { + return lastLogIndex; + } + + @Override public RpcRequests.ErrorResponse getErrorResponse() { + return null; + } + + @Override public RpcRequests.AppendEntriesResponse build() { + return this; + } + + @Override public Builder setSuccess(boolean success) { + this.success = success; + + return this; + } + + @Override public Builder setTerm(long currTerm) { + this.term = currTerm; + + return this; + } + + @Override public Builder setLastLogIndex(long lastLogIndex) { + this.lastLogIndex = lastLogIndex; + + return this; + } +} diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/message/DefaultMessageBuilderFactory.java b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/message/DefaultMessageBuilderFactory.java index 14a3fb4..9b8b54d 100644 --- a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/message/DefaultMessageBuilderFactory.java +++ b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/message/DefaultMessageBuilderFactory.java @@ -1,15 +1,66 @@ package com.alipay.sofa.jraft.rpc.message; import com.alipay.sofa.jraft.entity.LocalFileMetaOutter; +import com.alipay.sofa.jraft.entity.LocalStorageOutter; +import com.alipay.sofa.jraft.entity.RaftOutter; import com.alipay.sofa.jraft.rpc.CliRequests; import com.alipay.sofa.jraft.rpc.MessageBuilderFactory; +import com.alipay.sofa.jraft.rpc.RpcRequests; public class DefaultMessageBuilderFactory implements MessageBuilderFactory { - @Override public CliRequests.AddPeerRequest.Builder createAddPeer() { + @Override public CliRequests.AddPeerRequest.Builder createAddPeerRequest() { return new AddPeerRequestImpl(); } @Override public LocalFileMetaOutter.LocalFileMeta.Builder createLocalFileMeta() { return new LocalFileMetaImpl(); } + + @Override public RpcRequests.PingRequest.Builder createPingRequest() { + return new PingRequestImpl(); + } + + @Override public RpcRequests.RequestVoteRequest.Builder createVoteRequest() { + return new PreVoteRequestImpl(); + } + + @Override public RpcRequests.RequestVoteResponse.Builder createVoteResponse() { + return new RequestVoteResponseImpl(); + } + + @Override public RpcRequests.ErrorResponse.Builder createErrorResponse() { + return new ErrorResponseImpl(); + } + + @Override public LocalStorageOutter.StablePBMeta.Builder createStableMeta() { + return new StableMeta(); + } + + @Override public RpcRequests.AppendEntriesRequest.Builder createAppendEntriesRequest() { + return new AppendEntriesRequestImpl(); + } + + @Override public RpcRequests.AppendEntriesResponse.Builder createAppendEntriesResponse() { + return new AppendEntriesResponseImpl(); + } + + @Override public RaftOutter.EntryMeta.Builder createEntryMeta() { + return new EntryMetaImpl(); + } + + @Override public RpcRequests.TimeoutNowRequest.Builder createTimeoutNowRequest() { + return new TimeoutNowRequestImpl(); + } + + @Override public RpcRequests.TimeoutNowResponse.Builder createTimeoutNowResponse() { + return new TimeoutNowResponseImpl(); + } + + @Override public RpcRequests.ReadIndexRequest.Builder createReadIndexRequest() { + return new ReadIndexRequestImpl(); + } + + @Override public RpcRequests.ReadIndexResponse.Builder createReadIndexResponse() { + return new ReadIndexResponseImpl(); + } } diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/message/EntryMetaImpl.java b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/message/EntryMetaImpl.java new file mode 100644 index 0000000..5356d76 --- /dev/null +++ b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/message/EntryMetaImpl.java @@ -0,0 +1,133 @@ +package com.alipay.sofa.jraft.rpc.message; + +import com.alipay.sofa.jraft.entity.EnumOutter; +import com.alipay.sofa.jraft.entity.RaftOutter; +import java.util.ArrayList; +import java.util.List; + +class EntryMetaImpl implements RaftOutter.EntryMeta, RaftOutter.EntryMeta.Builder { + private long term; + private EnumOutter.EntryType type; + private List<String> peersList = new ArrayList<>(); + private long dataLen; + private List<String> oldPeersList = new ArrayList<>(); + private long checksum; + private List<String> learnersList = new ArrayList<>(); + private List<String> oldLearnersList = new ArrayList<>(); + + @Override public long getTerm() { + return term; + } + + @Override public EnumOutter.EntryType getType() { + return type; + } + + @Override public List<String> getPeersList() { + return peersList; + } + + @Override public int getPeersCount() { + return peersList.size(); + } + + @Override public String getPeers(int index) { + return peersList.get(index); + } + + @Override public long getDataLen() { + return dataLen; + } + + @Override public List<String> getOldPeersList() { + return oldPeersList; + } + + @Override public int getOldPeersCount() { + return oldPeersList.size(); + } + + @Override public String getOldPeers(int index) { + return oldPeersList.get(index); + } + + @Override public long getChecksum() { + return checksum; + } + + @Override public List<String> getLearnersList() { + return learnersList; + } + + @Override public int getLearnersCount() { + return learnersList.size(); + } + + @Override public String getLearners(int index) { + return learnersList.get(index); + } + + @Override public List<String> getOldLearnersList() { + return oldLearnersList; + } + + @Override public int getOldLearnersCount() { + return oldLearnersList.size(); + } + + @Override public String getOldLearners(int index) { + return oldLearnersList.get(index); + } + + @Override public RaftOutter.EntryMeta build() { + return this; + } + + @Override public Builder setTerm(long term) { + this.term = term; + + return this; + } + + @Override public Builder setChecksum(long checksum) { + this.checksum = checksum; + + return this; + } + + @Override public Builder setType(EnumOutter.EntryType type) { + this.type = type; + + return this; + } + + @Override public Builder setDataLen(int remaining) { + this.dataLen = remaining; + + return this; + } + + @Override public Builder addPeers(String peerId) { + peersList.add(peerId); + + return this; + } + + @Override public Builder addOldPeers(String oldPeerId) { + oldPeersList.add(oldPeerId); + + return this; + } + + @Override public Builder addLearners(String learnerId) { + learnersList.add(learnerId); + + return this; + } + + @Override public Builder addOldLearners(String oldLearnerId) { + oldLearnersList.add(oldLearnerId); + + return this; + } +} diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/message/ErrorResponseImpl.java b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/message/ErrorResponseImpl.java new file mode 100644 index 0000000..58447d0 --- /dev/null +++ b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/message/ErrorResponseImpl.java @@ -0,0 +1,32 @@ +package com.alipay.sofa.jraft.rpc.message; + +import com.alipay.sofa.jraft.rpc.RpcRequests; + +public class ErrorResponseImpl implements RpcRequests.ErrorResponse, RpcRequests.ErrorResponse.Builder { + private int errorCode; + private String errorMsg; + + @Override public int getErrorCode() { + return errorCode; + } + + @Override public Builder setErrorCode(int errorCode) { + this.errorCode = errorCode; + + return this; + } + + @Override public String getErrorMsg() { + return errorMsg; + } + + @Override public Builder setErrorMsg(String errorMsg) { + this.errorMsg = errorMsg; + + return this; + } + + @Override public RpcRequests.ErrorResponse build() { + return this; + } +} diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/message/PingRequestImpl.java b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/message/PingRequestImpl.java new file mode 100644 index 0000000..480f9a4 --- /dev/null +++ b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/message/PingRequestImpl.java @@ -0,0 +1,21 @@ +package com.alipay.sofa.jraft.rpc.message; + +import com.alipay.sofa.jraft.rpc.RpcRequests; + +class PingRequestImpl implements RpcRequests.PingRequest , RpcRequests.PingRequest .Builder { + private long sendTimestamp; + + @Override public long getSendTimestamp() { + return sendTimestamp; + } + + @Override public Builder setSendTimestamp(long timestamp) { + this.sendTimestamp = timestamp; + + return this; + } + + @Override public RpcRequests.PingRequest build() { + return this; + } +} diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/message/PreVoteRequestImpl.java b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/message/PreVoteRequestImpl.java new file mode 100644 index 0000000..d5303c3 --- /dev/null +++ b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/message/PreVoteRequestImpl.java @@ -0,0 +1,87 @@ +package com.alipay.sofa.jraft.rpc.message; + +import com.alipay.sofa.jraft.rpc.RpcRequests; + +class PreVoteRequestImpl implements RpcRequests.RequestVoteRequest, RpcRequests.RequestVoteRequest.Builder { + private String groupId; + private String serverId; + private String peerId; + private long term; + private long lastLogTerm; + private long lastLogIndex; + private boolean preVote; + + @Override public String getGroupId() { + return groupId; + } + + @Override public Builder setGroupId(String groupId) { + this.groupId = groupId; + + return this; + } + + @Override public String getServerId() { + return serverId; + } + + @Override public Builder setServerId(String serverId) { + this.serverId = serverId; + + return this; + } + + @Override public String getPeerId() { + return peerId; + } + + @Override public Builder setPeerId(String peerId) { + this.peerId = peerId; + + return this; + } + + @Override public long getTerm() { + return term; + } + + @Override public Builder setTerm(long term) { + this.term = term; + + return this; + } + + @Override public long getLastLogTerm() { + return lastLogTerm; + } + + @Override public Builder setLastLogTerm(long lastLogTerm) { + this.lastLogTerm = lastLogTerm; + + return this; + } + + @Override public long getLastLogIndex() { + return lastLogIndex; + } + + @Override public Builder setLastLogIndex(long lastLogIndex) { + this.lastLogIndex = lastLogIndex; + + return this; + } + + public boolean getPreVote() { + return preVote; + } + + @Override public Builder setPreVote(boolean preVote) { + this.preVote = preVote; + + return this; + } + + @Override public RpcRequests.RequestVoteRequest build() { + return this; + } +} diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/message/ReadIndexRequestImpl.java b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/message/ReadIndexRequestImpl.java new file mode 100644 index 0000000..dd1868e --- /dev/null +++ b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/message/ReadIndexRequestImpl.java @@ -0,0 +1,76 @@ +package com.alipay.sofa.jraft.rpc.message; + +import com.alipay.sofa.jraft.rpc.RpcRequests; +import com.alipay.sofa.jraft.util.ByteString; +import java.util.ArrayList; +import java.util.List; + +class ReadIndexRequestImpl implements RpcRequests.ReadIndexRequest, RpcRequests.ReadIndexRequest.Builder { + private String groupId; + private String serverId; + private List<ByteString> entriesList = new ArrayList<>(); + private String peerId; + + @Override public String getGroupId() { + return groupId; + } + + @Override public String getServerId() { + return serverId; + } + + @Override public List<ByteString> getEntriesList() { + return entriesList; + } + + @Override public int getEntriesCount() { + return entriesList.size(); + } + + @Override public ByteString getEntries(int index) { + return entriesList.get(index); + } + + @Override public String getPeerId() { + return peerId; + } + + @Override public RpcRequests.ReadIndexRequest build() { + return this; + } + + @Override public Builder mergeFrom(RpcRequests.ReadIndexRequest request) { + setGroupId(request.getGroupId()); + setServerId(request.getServerId()); + setPeerId(request.getPeerId()); + for (ByteString data : request.getEntriesList()) { + addEntries(data); + } + + return this; + } + + @Override public Builder setPeerId(String peerId) { + this.peerId = peerId; + + return this; + } + + @Override public Builder setGroupId(String groupId) { + this.groupId = groupId; + + return this; + } + + @Override public Builder setServerId(String serverId) { + this.serverId = serverId; + + return this; + } + + @Override public Builder addEntries(ByteString data) { + entriesList.add(data); + + return this; + } +} diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/message/ReadIndexResponseImpl.java b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/message/ReadIndexResponseImpl.java new file mode 100644 index 0000000..6cc658f --- /dev/null +++ b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/message/ReadIndexResponseImpl.java @@ -0,0 +1,36 @@ +package com.alipay.sofa.jraft.rpc.message; + +import com.alipay.sofa.jraft.rpc.RpcRequests; + +class ReadIndexResponseImpl implements RpcRequests.ReadIndexResponse, RpcRequests.ReadIndexResponse.Builder { + private long index; + private boolean success; + + @Override public long getIndex() { + return index; + } + + @Override public boolean getSuccess() { + return success; + } + + @Override public RpcRequests.ErrorResponse getErrorResponse() { + return null; + } + + @Override public RpcRequests.ReadIndexResponse build() { + return this; + } + + @Override public Builder setSuccess(boolean success) { + this.success = success; + + return this; + } + + @Override public Builder setIndex(long lastCommittedIndex) { + this.index = index; + + return this; + } +} diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/message/RequestVoteResponseImpl.java b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/message/RequestVoteResponseImpl.java new file mode 100644 index 0000000..e628940 --- /dev/null +++ b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/message/RequestVoteResponseImpl.java @@ -0,0 +1,36 @@ +package com.alipay.sofa.jraft.rpc.message; + +import com.alipay.sofa.jraft.rpc.RpcRequests; + +public class RequestVoteResponseImpl implements RpcRequests.RequestVoteResponse, RpcRequests.RequestVoteResponse.Builder { + private long term; + private boolean granted; + + @Override public long getTerm() { + return term; + } + + @Override public boolean getGranted() { + return granted; + } + + @Override public RpcRequests.ErrorResponse getErrorResponse() { + return null; + } + + @Override public RpcRequests.RequestVoteResponse build() { + return this; + } + + @Override public Builder setTerm(long currTerm) { + this.term = currTerm; + + return this; + } + + @Override public Builder setGranted(boolean granted) { + this.granted = granted; + + return this; + } +} diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/message/StableMeta.java b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/message/StableMeta.java new file mode 100644 index 0000000..e7790bb --- /dev/null +++ b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/message/StableMeta.java @@ -0,0 +1,32 @@ +package com.alipay.sofa.jraft.rpc.message; + +import com.alipay.sofa.jraft.entity.LocalStorageOutter; + +class StableMeta implements LocalStorageOutter.StablePBMeta, LocalStorageOutter.StablePBMeta.Builder { + private long term; + private String votedFor; + + @Override public long getTerm() { + return term; + } + + @Override public String getVotedfor() { + return votedFor; + } + + @Override public Builder setTerm(long term) { + this.term = term; + + return this; + } + + @Override public Builder setVotedfor(String votedFor) { + this.votedFor = votedFor; + + return this; + } + + @Override public LocalStorageOutter.StablePBMeta build() { + return this; + } +} diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/message/TimeoutNowRequestImpl.java b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/message/TimeoutNowRequestImpl.java new file mode 100644 index 0000000..af0be57 --- /dev/null +++ b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/message/TimeoutNowRequestImpl.java @@ -0,0 +1,54 @@ +package com.alipay.sofa.jraft.rpc.message; + +import com.alipay.sofa.jraft.rpc.RpcRequests; + +class TimeoutNowRequestImpl implements RpcRequests.TimeoutNowRequest, RpcRequests.TimeoutNowRequest.Builder { + private String groupId; + private String serverId; + private String peerId; + private long term; + + @Override public String getGroupId() { + return groupId; + } + + @Override public String getServerId() { + return serverId; + } + + @Override public String getPeerId() { + return peerId; + } + + @Override public long getTerm() { + return term; + } + + @Override public RpcRequests.TimeoutNowRequest build() { + return this; + } + + @Override public Builder setTerm(long term) { + this.term = term; + + return this; + } + + @Override public Builder setGroupId(String groupId) { + this.groupId = groupId; + + return this; + } + + @Override public Builder setServerId(String serverId) { + this.serverId = serverId; + + return this; + } + + @Override public Builder setPeerId(String peerId) { + this.peerId = peerId; + + return this; + } +} diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/message/TimeoutNowResponseImpl.java b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/message/TimeoutNowResponseImpl.java new file mode 100644 index 0000000..da441ff --- /dev/null +++ b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/message/TimeoutNowResponseImpl.java @@ -0,0 +1,36 @@ +package com.alipay.sofa.jraft.rpc.message; + +import com.alipay.sofa.jraft.rpc.RpcRequests; + +class TimeoutNowResponseImpl implements RpcRequests.TimeoutNowResponse, RpcRequests.TimeoutNowResponse.Builder { + private long term; + private boolean success; + + @Override public long getTerm() { + return term; + } + + @Override public boolean getSuccess() { + return success; + } + + @Override public RpcRequests.ErrorResponse getErrorResponse() { + return null; + } + + @Override public RpcRequests.TimeoutNowResponse build() { + return this; + } + + @Override public Builder setTerm(long currTerm) { + this.term = term; + + return this; + } + + @Override public Builder setSuccess(boolean success) { + this.success = success; + + return this; + } +} diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/util/JDKMarshaller.java b/modules/raft/src/main/java/com/alipay/sofa/jraft/util/JDKMarshaller.java index cf9bdce..25c06d7 100644 --- a/modules/raft/src/main/java/com/alipay/sofa/jraft/util/JDKMarshaller.java +++ b/modules/raft/src/main/java/com/alipay/sofa/jraft/util/JDKMarshaller.java @@ -6,26 +6,34 @@ import java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; -/** */ +/** + * + */ public class JDKMarshaller implements Marshaller { - /** {@inheritDoc} */ - @Override public byte[] marshall(Object o) throws IOException { - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - ObjectOutputStream oos = new ObjectOutputStream(baos); - oos.writeObject(o); - oos.close(); - - return baos.toByteArray(); + /** + * {@inheritDoc} + */ + @Override public byte[] marshall(Object o) { + try { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + ObjectOutputStream oos = new ObjectOutputStream(baos); + oos.writeObject(o); + oos.close(); + return baos.toByteArray(); + } catch (Exception e) { + throw new Error(e); + } } - /** {@inheritDoc} */ - @Override public Object unmarshall(byte[] raw) throws IOException{ - ByteArrayInputStream bais = new ByteArrayInputStream(raw); - ObjectInputStream oos = new ObjectInputStream(bais); - + /** + * {@inheritDoc} + */ + @Override public <T> T unmarshall(byte[] raw) { try { - return oos.readObject(); - } catch (ClassNotFoundException e) { + ByteArrayInputStream bais = new ByteArrayInputStream(raw); + ObjectInputStream oos = new ObjectInputStream(bais); + return (T) oos.readObject(); + } catch (Exception e) { throw new Error(e); } } diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/util/Marshaller.java b/modules/raft/src/main/java/com/alipay/sofa/jraft/util/Marshaller.java index 6f28493..54b07ed 100644 --- a/modules/raft/src/main/java/com/alipay/sofa/jraft/util/Marshaller.java +++ b/modules/raft/src/main/java/com/alipay/sofa/jraft/util/Marshaller.java @@ -5,7 +5,7 @@ import java.io.IOException; public interface Marshaller { public static Marshaller DEFAULT = new JDKMarshaller(); - byte[] marshall(Object o) throws IOException; + byte[] marshall(Object o); - <T> T unmarshall(byte[] raw) throws IOException; + <T> T unmarshall(byte[] raw); } diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/util/Utils.java b/modules/raft/src/main/java/com/alipay/sofa/jraft/util/Utils.java index 54c18e9..d650a10 100644 --- a/modules/raft/src/main/java/com/alipay/sofa/jraft/util/Utils.java +++ b/modules/raft/src/main/java/com/alipay/sofa/jraft/util/Utils.java @@ -381,7 +381,7 @@ public final class Utils { final boolean isDir = file.isDirectory(); // can't fsync on windowns. if (isDir && Platform.isWindows()) { - LOG.warn("Unable to fsync directory {} on windows.", file); + // LOG.warn("Unable to fsync directory {} on windows.", file); return; } try (final FileChannel fc = FileChannel.open(file.toPath(), isDir ? StandardOpenOption.READ diff --git a/modules/raft/src/main/resources/META-INF/services/com.alipay.sofa.jraft.rpc.RaftRpcFactory b/modules/raft/src/main/resources/META-INF/services/com.alipay.sofa.jraft.rpc.RaftRpcFactory index 8416bc1..54429b7 100644 --- a/modules/raft/src/main/resources/META-INF/services/com.alipay.sofa.jraft.rpc.RaftRpcFactory +++ b/modules/raft/src/main/resources/META-INF/services/com.alipay.sofa.jraft.rpc.RaftRpcFactory @@ -1 +1 @@ -com.alipay.sofa.jraft.rpc.impl.BoltRaftRpcFactory \ No newline at end of file +com.alipay.sofa.jraft.rpc.impl.LocalRaftRpcFactory \ No newline at end of file diff --git a/modules/raft/src/test/java/com/alipay/sofa/jraft/rpc/LocalRpcTest.java b/modules/raft/src/test/java/com/alipay/sofa/jraft/rpc/LocalRpcTest.java new file mode 100644 index 0000000..f34ebed --- /dev/null +++ b/modules/raft/src/test/java/com/alipay/sofa/jraft/rpc/LocalRpcTest.java @@ -0,0 +1,154 @@ +package com.alipay.sofa.jraft.rpc; + +import com.alipay.sofa.jraft.entity.PeerId; +import com.alipay.sofa.jraft.error.RemotingException; +import com.alipay.sofa.jraft.rpc.impl.LocalRpcClient; +import com.alipay.sofa.jraft.rpc.impl.LocalRpcServer; +import com.alipay.sofa.jraft.util.Endpoint; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +/** + * TODO add test for localconn.close, timeouts. + */ +public class LocalRpcTest { + private Endpoint endpoint; + private LocalRpcServer server; + + @Before + public void setup() { + endpoint = PeerId.parsePeer("localhost:1000").getEndpoint(); + server = new LocalRpcServer(endpoint); + server.registerProcessor(new Request1RpcProcessor()); + server.registerProcessor(new Request2RpcProcessor()); + server.init(null); + } + + @After + public void teardown() { + server.shutdown(); + + assertNull(LocalRpcServer.servers.get(endpoint)); + } + + @Test + public void testStartStopServer() { + assertNotNull(LocalRpcServer.servers.get(endpoint)); + } + + @Test + public void testConnection() { + LocalRpcClient client = new LocalRpcClient(); + + assertFalse(client.checkConnection(endpoint)); + + assertTrue(client.checkConnection(endpoint, true)); + } + + @Test + public void testSyncProcessing() throws RemotingException, InterruptedException { + RpcClient client = new LocalRpcClient(); + Response1 resp1 = (Response1) client.invokeSync(endpoint, new Request1(), new InvokeContext(), 5000); + assertNotNull(resp1); + + Response2 resp2 = (Response2) client.invokeSync(endpoint, new Request2(), new InvokeContext(), 5000); + assertNotNull(resp2); + } + + @Test + public void testAsyncProcessing() throws RemotingException, InterruptedException { + RpcClient client = new LocalRpcClient(); + + CountDownLatch l1 = new CountDownLatch(1); + AtomicReference<Response1> resp1 = new AtomicReference<>(); + client.invokeAsync(endpoint, new Request1(), new InvokeContext(), (result, err) -> { + resp1.set((Response1) result); + l1.countDown(); + }, 5000); + l1.await(5_000, TimeUnit.MILLISECONDS); + assertNotNull(resp1); + + CountDownLatch l2 = new CountDownLatch(1); + AtomicReference<Response2> resp2 = new AtomicReference<>(); + client.invokeAsync(endpoint, new Request2(), new InvokeContext(), (result, err) -> { + resp2.set((Response2) result); + l2.countDown(); + }, 5000); + l2.await(5_000, TimeUnit.MILLISECONDS); + assertNotNull(resp2); + } + + @Test + public void testDisconnect1() { + RpcClient client1 = new LocalRpcClient(); + RpcClient client2 = new LocalRpcClient(); + + assertTrue(client1.checkConnection(endpoint, true)); + assertTrue(client2.checkConnection(endpoint, true)); + + client1.shutdown(); + + assertFalse(client1.checkConnection(endpoint)); + assertTrue(client2.checkConnection(endpoint)); + + client2.shutdown(); + + assertFalse(client1.checkConnection(endpoint)); + assertFalse(client2.checkConnection(endpoint)); + } + + @Test + public void testDisconnect2() { + RpcClient client1 = new LocalRpcClient(); + RpcClient client2 = new LocalRpcClient(); + + assertTrue(client1.checkConnection(endpoint, true)); + assertTrue(client2.checkConnection(endpoint, true)); + + server.shutdown(); + + assertFalse(client1.checkConnection(endpoint)); + assertFalse(client2.checkConnection(endpoint)); + } + + private static class Request1RpcProcessor implements RpcProcessor<Request1> { + @Override public void handleRequest(RpcContext rpcCtx, Request1 request) { + rpcCtx.sendResponse(new Response1()); + } + + @Override public String interest() { + return Request1.class.getName(); + } + } + + private static class Request2RpcProcessor implements RpcProcessor<Request2> { + @Override public void handleRequest(RpcContext rpcCtx, Request2 request) { + rpcCtx.sendResponse(new Response2()); + } + + @Override public String interest() { + return Request2.class.getName(); + } + } + + private static class Request1 implements Message { + } + + private static class Request2 implements Message { + } + + private static class Response1 implements Message { + } + + private static class Response2 implements Message { + } +} diff --git a/modules/raft/src/test/java/com/alipay/sofa/jraft/storage/impl/LocalLogStorageTest.java b/modules/raft/src/test/java/com/alipay/sofa/jraft/storage/impl/LocalLogStorageTest.java index 72b2638..e890721 100644 --- a/modules/raft/src/test/java/com/alipay/sofa/jraft/storage/impl/LocalLogStorageTest.java +++ b/modules/raft/src/test/java/com/alipay/sofa/jraft/storage/impl/LocalLogStorageTest.java @@ -26,9 +26,4 @@ public class LocalLogStorageTest extends BaseLogStorageTest { protected LogStorage newLogStorage() { return new LocalLogStorage(this.path, new RaftOptions()); } - - @Test - @Override public void testEmptyState() { - super.testEmptyState(); - } } diff --git a/modules/raft/src/test/java/com/alipay/sofa/jraft/storage/io/LocalFileReaderTest.java b/modules/raft/src/test/java/com/alipay/sofa/jraft/storage/io/LocalFileReaderTest.java index d4bcc66..2639ea8 100644 --- a/modules/raft/src/test/java/com/alipay/sofa/jraft/storage/io/LocalFileReaderTest.java +++ b/modules/raft/src/test/java/com/alipay/sofa/jraft/storage/io/LocalFileReaderTest.java @@ -19,7 +19,6 @@ package com.alipay.sofa.jraft.storage.io; import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; -import com.alipay.sofa.jraft.util.Utils; import java.io.File; import java.io.FileNotFoundException; import java.nio.ByteBuffer; diff --git a/modules/raft/src/test/resources/log4j2.xml b/modules/raft/src/test/resources/log4j2.xml new file mode 100644 index 0000000..90d27d7 --- /dev/null +++ b/modules/raft/src/test/resources/log4j2.xml @@ -0,0 +1,25 @@ +<?xml version="1.0" encoding="UTF-8"?> +<Configuration status="WARN"> + + <Appenders> + <Console name="Console" target="SYSTEM_OUT"> + <PatternLayout pattern="%d{YYYY-MM-dd HH:mm:ss} [%t] %-5p %c{1}:%L - %msg%n"/> + </Console> + + <!--<RollingFile name="RollingFile" filename="log/jraft-example.log"--> + <!--filepattern="log/%d{YYYYMMddHHmmss}-jraft-example.log">--> + <!--<PatternLayout pattern="%d{YYYY-MM-dd HH:mm:ss} [%t] %-5p %c{1}:%L - %msg%n"/>--> + <!--<Policies>--> + <!--<SizeBasedTriggeringPolicy size="100 MB"/>--> + <!--</Policies>--> + <!--<DefaultRolloverStrategy max="20"/>--> + <!--</RollingFile>--> + + </Appenders> + <Loggers> + <Root level="info"> + <AppenderRef ref="Console"/> + <AppenderRef ref="RollingFile"/> + </Root> + </Loggers> +</Configuration> \ No newline at end of file
