This is an automated email from the ASF dual-hosted git repository. ascherbakov pushed a commit to branch ignite-13885 in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/ignite-13885 by this push: new e343b42 IGNITE-13885 Message recording. e343b42 is described below commit e343b42b97f28fb9fa624fea966c3b3f089b8c2f Author: Alexey Scherbakov <alexey.scherbak...@gmail.com> AuthorDate: Mon Feb 8 19:47:20 2021 +0300 IGNITE-13885 Message recording. --- .../java/com/alipay/sofa/jraft/rpc/RpcClient.java | 1 + .../sofa/jraft/rpc/impl/LocalConnection.java | 66 ++++++++- .../alipay/sofa/jraft/rpc/impl/LocalRpcClient.java | 45 ++++-- .../alipay/sofa/jraft/rpc/impl/LocalRpcServer.java | 5 +- .../java/com/alipay/sofa/jraft/util/Utils.java | 8 + .../java/com/alipay/sofa/jraft/core/NodeTest.java | 36 +++++ .../com/alipay/sofa/jraft/rpc/LocalRpcTest.java | 161 ++++++++++++++++++++- 7 files changed, 299 insertions(+), 23 deletions(-) diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/RpcClient.java b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/RpcClient.java index f6ae218..33b0ccf 100644 --- a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/RpcClient.java +++ b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/RpcClient.java @@ -41,6 +41,7 @@ public interface RpcClient extends Lifecycle<RpcOptions> { * @param endpoint target address * @param createIfAbsent create a new one if there is no connection * @return true if there is a connection and the connection is active and writable. + * TODO asch it probably should return com.alipay.sofa.jraft.rpc.Connection. */ boolean checkConnection(final Endpoint endpoint, final boolean createIfAbsent); diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/impl/LocalConnection.java b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/impl/LocalConnection.java index 0607196..d1ea408 100644 --- a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/impl/LocalConnection.java +++ b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/impl/LocalConnection.java @@ -1,22 +1,72 @@ package com.alipay.sofa.jraft.rpc.impl; import com.alipay.sofa.jraft.rpc.Connection; +import com.alipay.sofa.jraft.rpc.Message; import com.alipay.sofa.jraft.util.Endpoint; import java.util.Collection; import java.util.Map; +import java.util.Queue; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.function.Predicate; public class LocalConnection implements Connection { + private static boolean RECORD_ALL_MESSAGES = false; + private Map<String, Object> attrs = new ConcurrentHashMap<>(); - final LocalRpcClient client; - final Endpoint srv; + public final LocalRpcClient client; + public final LocalRpcServer srv; + + private volatile Predicate<Message> recordPred; + private volatile Predicate<Message> blockPred; + + private LinkedBlockingQueue<Object[]> blockedMsgs = new LinkedBlockingQueue<>(); + private LinkedBlockingQueue<Message> recordedMsgs = new LinkedBlockingQueue<>(); - public LocalConnection(LocalRpcClient client, Endpoint srv) { + public LocalConnection(LocalRpcClient client, LocalRpcServer srv) { this.client = client; this.srv = srv; } + public void recordMessages(Predicate<Message> pred) { + this.recordPred = pred; + } + + public void blockMessages(Predicate<Message> pred) { + this.blockPred = pred; + } + + private void send(Message request, Future fut) { + Object[] tuple = {client, request, fut}; + assert srv.incoming.offer(tuple); // Should never fail because server uses unbounded queue. + } + + public void onBeforeRequestSend(Message request, Future fut) { + if (RECORD_ALL_MESSAGES || recordPred != null && recordPred.test(request)) + recordedMsgs.add(request); + + if (blockPred != null && blockPred.test(request)) { + blockedMsgs.add(new Object[]{request, fut}); + + return; + } + + send(request, fut); + } + + public void sendBlocked() { + blockedMsgs.drainTo(srv.incoming); + } + + public void onAfterResponseSend(Message msg, Throwable err) { + assert err == null : err; + + if (RECORD_ALL_MESSAGES || recordPred != null && recordPred.test(msg)) + recordedMsgs.add(msg); + } + @Override public Object getAttribute(String key) { return attrs.get(key); } @@ -30,6 +80,14 @@ public class LocalConnection implements Connection { } @Override public void close() { - LocalRpcServer.closeConnection(client, srv); + LocalRpcServer.closeConnection(client, srv.local); + } + + public Queue<Message> recordedMessages() { + return recordedMsgs; + } + + @Override public String toString() { + return client.toString() + " -> " + srv.local.toString(); } } diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/impl/LocalRpcClient.java b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/impl/LocalRpcClient.java index 474e400..d00a5f7 100644 --- a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/impl/LocalRpcClient.java +++ b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/impl/LocalRpcClient.java @@ -21,18 +21,19 @@ import com.alipay.sofa.jraft.entity.PeerId; import com.alipay.sofa.jraft.error.InvokeTimeoutException; import com.alipay.sofa.jraft.error.RemotingException; import com.alipay.sofa.jraft.option.RpcOptions; -import com.alipay.sofa.jraft.rpc.Connection; import com.alipay.sofa.jraft.rpc.InvokeCallback; import com.alipay.sofa.jraft.rpc.InvokeContext; +import com.alipay.sofa.jraft.rpc.Message; import com.alipay.sofa.jraft.rpc.RpcClient; import com.alipay.sofa.jraft.rpc.RpcUtils; import com.alipay.sofa.jraft.util.Endpoint; -import java.util.Collection; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import java.util.function.BiConsumer; +import java.util.function.Consumer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Local rpc client impl. @@ -40,7 +41,11 @@ import java.util.function.BiConsumer; * @author ascherbakov. */ public class LocalRpcClient implements RpcClient { - private volatile ReplicatorGroup replicatorGroup = null; + private static final Logger LOG = LoggerFactory.getLogger(LocalRpcClient.class); + + public volatile ReplicatorGroup replicatorGroup = null; + + public static Consumer<LocalConnection> onCreated = null; @Override public boolean checkConnection(Endpoint endpoint) { return LocalRpcServer.connect(this, endpoint, false, null); @@ -61,12 +66,15 @@ public class LocalRpcClient implements RpcClient { private void onCreated(LocalConnection conn) { if (replicatorGroup != null) { final PeerId peer = new PeerId(); - if (peer.parse(conn.srv.toString())) { + if (peer.parse(conn.srv.local.toString())) { RpcUtils.runInThread(() -> replicatorGroup.checkReplicator(peer, true)); // Avoid deadlock. } else - System.out.println("Fail to parse peer: {}" + peer); // TODO asch + LOG.warn("Failed to parse peer: {}", peer); // TODO asch how to handle ? } + + if (onCreated != null) + onCreated.accept(conn); } @Override public Object invokeSync(Endpoint endpoint, Object request, InvokeContext ctx, long timeoutMs) throws InterruptedException, RemotingException { @@ -77,13 +85,16 @@ public class LocalRpcClient implements RpcClient { if (srv == null) throw new RemotingException("Server is dead " + endpoint); - CompletableFuture fut = new CompletableFuture(); + LocalConnection locConn = srv.conns.get(this); + if (locConn == null) + throw new RemotingException("Server is dead " + endpoint); - Object[] tuple = {this, request, fut}; - assert srv.incoming.offer(tuple); // Should never fail because server uses unbounded queue. + CompletableFuture<Object> fut = new CompletableFuture(); + + locConn.onBeforeRequestSend((Message) request, fut); try { - return fut.get(timeoutMs, TimeUnit.MILLISECONDS); + return fut.whenComplete((res, err) -> locConn.onAfterResponseSend((Message) res, err)).get(timeoutMs, TimeUnit.MILLISECONDS); } catch (ExecutionException e) { throw new RemotingException(e); } catch (TimeoutException e) { @@ -99,18 +110,22 @@ public class LocalRpcClient implements RpcClient { if (srv == null) throw new RemotingException("Server is dead " + endpoint); - CompletableFuture fut = new CompletableFuture(); + LocalConnection locConn = srv.conns.get(this); + if (locConn == null) + throw new RemotingException("Server is dead " + endpoint); + + CompletableFuture<Object> fut = new CompletableFuture<>(); - Object[] tuple = {this, request, fut}; - assert srv.incoming.offer(tuple); + locConn.onBeforeRequestSend((Message) request, fut); - fut.whenComplete((BiConsumer<Object, Throwable>) (res, err) -> { + fut.whenComplete((res, err) -> { + locConn.onAfterResponseSend((Message) res, err); RpcUtils.runInThread(() -> callback.complete(res, err)); // Avoid deadlocks if a closure has completed in the same thread. }).orTimeout(timeoutMs, TimeUnit.MILLISECONDS); } @Override public boolean init(RpcOptions opts) { - return false; + return true; } @Override public void shutdown() { diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/impl/LocalRpcServer.java b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/impl/LocalRpcServer.java index a24427d..db9648a 100644 --- a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/impl/LocalRpcServer.java +++ b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/impl/LocalRpcServer.java @@ -65,6 +65,7 @@ public class LocalRpcServer implements RpcServer { BlockingQueue<Object[]> incoming = new LinkedBlockingDeque<>(); // TODO asch OOM is possible, handle that. + // TODO FIXME asch Or better use com.alipay.sofa.jraft.rpc.RpcUtils.RPC_CLOSURE_EXECUTOR ? private ExecutorService defaultExecutor; public LocalRpcServer(Endpoint local) { @@ -75,7 +76,7 @@ public class LocalRpcServer implements RpcServer { LocalRpcServer locSrv = servers.get(srv); if (locSrv == null) - return false; // Server is dead. + return false; // Server is not ready. LocalConnection conn = locSrv.conns.get(client); @@ -83,7 +84,7 @@ public class LocalRpcServer implements RpcServer { if (!createIfAbsent) return false; - conn = new LocalConnection(client, srv); + conn = new LocalConnection(client, locSrv); LocalConnection oldConn = locSrv.conns.putIfAbsent(client, conn); diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/util/Utils.java b/modules/raft/src/main/java/com/alipay/sofa/jraft/util/Utils.java index 586ec0a..4566076 100644 --- a/modules/raft/src/main/java/com/alipay/sofa/jraft/util/Utils.java +++ b/modules/raft/src/main/java/com/alipay/sofa/jraft/util/Utils.java @@ -30,6 +30,7 @@ import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.StandardCopyOption; import java.nio.file.StandardOpenOption; +import java.util.concurrent.Callable; import java.util.concurrent.Future; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadPoolExecutor; @@ -161,6 +162,13 @@ public final class Utils { } /** + * Run a callable in thread pool,returns the future object. + */ + public static <V> Future<V> runInThread(final Callable<V> runnable) { + return CLOSURE_EXECUTOR.submit(runnable); + } + + /** * Run closure with status in thread pool. */ @SuppressWarnings("Convert2Lambda") diff --git a/modules/raft/src/test/java/com/alipay/sofa/jraft/core/NodeTest.java b/modules/raft/src/test/java/com/alipay/sofa/jraft/core/NodeTest.java index 2aa3a84..dacbd8c 100644 --- a/modules/raft/src/test/java/com/alipay/sofa/jraft/core/NodeTest.java +++ b/modules/raft/src/test/java/com/alipay/sofa/jraft/core/NodeTest.java @@ -16,6 +16,10 @@ */ package com.alipay.sofa.jraft.core; +import com.alipay.sofa.jraft.rpc.impl.LocalConnection; +import com.alipay.sofa.jraft.rpc.impl.LocalRpcClient; +import com.alipay.sofa.jraft.rpc.impl.core.DefaultRaftClientService; +import com.alipay.sofa.jraft.util.concurrent.ConcurrentHashSet; import java.io.File; import java.nio.ByteBuffer; import java.util.ArrayList; @@ -33,6 +37,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; import org.junit.After; import org.junit.AfterClass; import org.junit.Assert; @@ -3389,6 +3394,37 @@ public class NodeTest { } } + @Test + public void testClusterWithMessageRecording() throws Exception { + Set<LocalConnection> conns = new ConcurrentHashSet<>(); + + LocalRpcClient.onCreated = conn -> { + conns.add(conn); + conn.recordMessages(msg -> true); + }; + + final List<PeerId> peers = TestUtils.generatePeers(3); + final TestCluster cluster = new TestCluster("unittest", this.dataPath, peers); + + for (final PeerId peer : peers) { + assertTrue(cluster.start(peer.getEndpoint())); + } + + cluster.waitLeader(); + + Thread.sleep(2_000); + + NodeImpl leader = (NodeImpl) cluster.getLeader(); + DefaultRaftClientService rpcService = (DefaultRaftClientService) leader.getRpcService(); + LocalRpcClient localRpcClient = (LocalRpcClient) rpcService.getRpcClient(); + + List<LocalConnection> leaderConns = conns.stream().filter(loc -> loc.client == localRpcClient).collect(Collectors.toList()); + + assertEquals(2, leaderConns.size()); + + cluster.stopAll(); + } + private NodeOptions createNodeOptionsWithSharedTimer() { final NodeOptions options = new NodeOptions(); options.setSharedElectionTimer(true); diff --git a/modules/raft/src/test/java/com/alipay/sofa/jraft/rpc/LocalRpcTest.java b/modules/raft/src/test/java/com/alipay/sofa/jraft/rpc/LocalRpcTest.java index f34ebed..f748e92 100644 --- a/modules/raft/src/test/java/com/alipay/sofa/jraft/rpc/LocalRpcTest.java +++ b/modules/raft/src/test/java/com/alipay/sofa/jraft/rpc/LocalRpcTest.java @@ -2,20 +2,25 @@ package com.alipay.sofa.jraft.rpc; import com.alipay.sofa.jraft.entity.PeerId; import com.alipay.sofa.jraft.error.RemotingException; +import com.alipay.sofa.jraft.rpc.impl.LocalConnection; import com.alipay.sofa.jraft.rpc.impl.LocalRpcClient; import com.alipay.sofa.jraft.rpc.impl.LocalRpcServer; import com.alipay.sofa.jraft.util.Endpoint; +import java.util.Queue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import org.junit.After; import org.junit.Before; +import org.junit.Ignore; import org.junit.Test; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; /** * TODO add test for localconn.close, timeouts. @@ -120,9 +125,155 @@ public class LocalRpcTest { assertFalse(client2.checkConnection(endpoint)); } + @Test + public void testRecordedSync() throws RemotingException, InterruptedException { + AtomicReference<LocalConnection> connRef = new AtomicReference<>(); + + LocalRpcClient.onCreated = conn -> { + connRef.set(conn); + conn.recordMessages(msg -> msg instanceof Request1 || msg instanceof Response1); + }; + + RpcClient client1 = new LocalRpcClient(); + + assertTrue(client1.checkConnection(endpoint, true)); + + Response1 resp1 = (Response1) client1.invokeSync(endpoint, new Request1(), 500); + Response2 resp2 = (Response2) client1.invokeSync(endpoint, new Request2(), 500); + + Queue<Message> recorded = connRef.get().recordedMessages(); + + assertEquals(2, recorded.size()); + assertTrue(recorded.poll() instanceof Request1); + assertTrue(recorded.poll() instanceof Response1); + } + + @Test + public void testRecordedSyncTimeout() throws RemotingException, InterruptedException { + AtomicReference<LocalConnection> connRef = new AtomicReference<>(); + + LocalRpcClient.onCreated = conn -> { + connRef.set(conn); + conn.recordMessages(msg -> msg instanceof Request1 || msg instanceof Response1); + }; + + RpcClient client1 = new LocalRpcClient(); + + assertTrue(client1.checkConnection(endpoint, true)); + + try { + Request1 request = new Request1(); + request.val = 10_000; + Response1 resp1 = (Response1) client1.invokeSync(endpoint, request, 500); + + fail(); + } catch (Exception e) { + // Expected. + } + + Queue<Message> recorded = connRef.get().recordedMessages(); + + assertEquals(1, recorded.size()); + assertTrue(recorded.poll() instanceof Request1); + } + + @Test + public void testRecordedAsync() throws RemotingException, InterruptedException { + AtomicReference<LocalConnection> connRef = new AtomicReference<>(); + + LocalRpcClient.onCreated = conn -> { + connRef.set(conn); + conn.recordMessages(msg -> msg instanceof Request1 || msg instanceof Response1); + }; + + RpcClient client1 = new LocalRpcClient(); + + assertTrue(client1.checkConnection(endpoint, true)); + + CountDownLatch l = new CountDownLatch(2); + + client1.invokeAsync(endpoint, new Request1(), null, (result, err) -> l.countDown(), 500); + client1.invokeAsync(endpoint, new Request2(), null, (result, err) -> l.countDown(), 500); + + l.await(); + + Queue<Message> recorded = connRef.get().recordedMessages(); + + assertEquals(2, recorded.size()); + assertTrue(recorded.poll() instanceof Request1); + assertTrue(recorded.poll() instanceof Response1); + } + + @Test + @Ignore + public void testRecordedAsyncTimeout() throws RemotingException, InterruptedException { + AtomicReference<LocalConnection> connRef = new AtomicReference<>(); + + LocalRpcClient.onCreated = conn -> { + connRef.set(conn); + conn.recordMessages(msg -> msg instanceof Request1 || msg instanceof Response1); + }; + + RpcClient client1 = new LocalRpcClient(); + + assertTrue(client1.checkConnection(endpoint, true)); + + try { + Request1 request = new Request1(); + request.val = 10_000; + CountDownLatch l = new CountDownLatch(1); + + // TODO asch invokeasync with timeout not working. + client1.invokeAsync(endpoint, new Request1(), null, (result, err) -> l.countDown(), 500); + + l.await(); + + fail(); + } catch (Exception e) { + // Expected. + } + + Queue<Message> recorded = connRef.get().recordedMessages(); + + assertEquals(1, recorded.size()); + assertTrue(recorded.poll() instanceof Request1); + } + + @Test + public void testBlockedSync() throws RemotingException, InterruptedException { +// RpcClient client1 = new LocalRpcClient(); +// +// assertTrue(client1.checkConnection(endpoint, true)); +// +// LocalConnection conn = LocalRpcServer.servers.get(endpoint).conns.get(client1); +// +// assertNotNull(conn); +// +// conn.recordMessages(msg -> msg instanceof Request1); +// +// Response2 resp2 = (Response2) client1.invokeSync(endpoint, new Request2(), 500); +// +// assertEquals(1, resp2.val); +// +// Future<Response1> resp = Utils.runInThread(() -> (Response1) client1.invokeSync(endpoint, new Request1(), 30_000)); +// +// Thread.sleep(3_000); +// +// assertFalse(resp.isDone()); + } + private static class Request1RpcProcessor implements RpcProcessor<Request1> { @Override public void handleRequest(RpcContext rpcCtx, Request1 request) { - rpcCtx.sendResponse(new Response1()); + if (request.val == 10_000) + try { + Thread.sleep(1000); + } catch (InterruptedException ignored) { + // No-op. + } + + Response1 resp1 = new Response1(); + resp1.val = request.val + 1; + rpcCtx.sendResponse(resp1); } @Override public String interest() { @@ -132,7 +283,9 @@ public class LocalRpcTest { private static class Request2RpcProcessor implements RpcProcessor<Request2> { @Override public void handleRequest(RpcContext rpcCtx, Request2 request) { - rpcCtx.sendResponse(new Response2()); + Response2 resp2 = new Response2(); + resp2.val = request.val + 1; + rpcCtx.sendResponse(resp2); } @Override public String interest() { @@ -141,14 +294,18 @@ public class LocalRpcTest { } private static class Request1 implements Message { + int val; } private static class Request2 implements Message { + int val; } private static class Response1 implements Message { + int val; } private static class Response2 implements Message { + int val; } }