This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new a5651f6e9 RATIS-2418. NettyClient LoggingHandler prints too many messages. (#1359)
a5651f6e9 is described below
commit a5651f6e9a95f5b5ac44ff01363b7de1c8b4c026
Author: slfan1989 <[email protected]>
AuthorDate: Fri Feb 27 02:20:21 2026 +0800
RATIS-2418. NettyClient LoggingHandler prints too many messages. (#1359)
---
.../java/org/apache/ratis/netty/NettyRpcProxy.java | 43 ++++++++++++++++------
.../test/java/org/apache/ratis/RaftBasicTests.java | 29 ++++++++-------
.../org/apache/ratis/netty/TestNettyRpcProxy.java | 13 ++++++-
.../apache/ratis/netty/TestRaftAsyncWithNetty.java | 8 ++++
4 files changed, 67 insertions(+), 26 deletions(-)
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyRpcProxy.java b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyRpcProxy.java
index cfcabc274..e72d6c677 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyRpcProxy.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyRpcProxy.java
@@ -45,7 +45,6 @@ import org.slf4j.LoggerFactory;
import java.io.Closeable;
import java.io.IOException;
import java.util.Map;
-import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
@@ -156,7 +155,15 @@ public class NettyRpcProxy implements Closeable {
@Override
protected void channelRead0(ChannelHandlerContext ctx,
RaftNettyServerReplyProto proto) {
- final CompletableFuture<RaftNettyServerReplyProto> future =
getReplyFuture(getCallId(proto), null);
+ final long callId = getCallId(proto);
+ final CompletableFuture<RaftNettyServerReplyProto> future =
getReplyFuture(callId, null, "reply");
+ if (future == null) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Ignoring reply for callId={} from {} (no outstanding
request, outstanding={})",
+ callId, peer, replies.size());
+ }
+ return;
+ }
if (proto.getRaftNettyServerReplyCase() == EXCEPTIONREPLY) {
final Object ioe =
ProtoUtils.toObject(proto.getExceptionReply().getException());
future.completeExceptionally((IOException)ioe);
@@ -183,7 +190,8 @@ public class NettyRpcProxy implements Closeable {
protected void initChannel(SocketChannel ch) {
final ChannelPipeline p = ch.pipeline();
- p.addLast(new LoggingHandler(LogLevel.WARN));
+ // LoggingHandler emits all events at the chosen level; use DEBUG to
reduce noise by default.
+ p.addLast(new LoggingHandler(LogLevel.DEBUG));
p.addLast(new ProtobufVarint32FrameDecoder());
p.addLast(new
ProtobufDecoder(RaftNettyServerReplyProto.getDefaultInstance()));
p.addLast(new ProtobufVarint32LengthFieldPrepender());
@@ -197,9 +205,12 @@ public class NettyRpcProxy implements Closeable {
}
private CompletableFuture<RaftNettyServerReplyProto> getReplyFuture(long
callId,
- CompletableFuture<RaftNettyServerReplyProto> expected) {
+ CompletableFuture<RaftNettyServerReplyProto> expected, String reason) {
final CompletableFuture<RaftNettyServerReplyProto> removed =
replies.remove(callId);
- Objects.requireNonNull(removed, () -> "Request #" + callId + " not
found");
+ if (removed == null && LOG.isDebugEnabled()) {
+ LOG.debug("Request {} not found for callId={} from {} (reason={},
outstanding={})",
+ expected == null ? "future" : "reply", callId, peer, reason,
replies.size());
+ }
if (expected != null) {
Preconditions.assertSame(expected, removed, "removed");
}
@@ -207,23 +218,30 @@ public class NettyRpcProxy implements Closeable {
}
synchronized CompletableFuture<RaftNettyServerReplyProto>
offer(RaftNettyServerRequestProto request) {
+ final CompletableFuture<RaftNettyServerReplyProto> reply = new
CompletableFuture<>();
+ final long callId = getRequest(request).getCallId();
+ final CompletableFuture<RaftNettyServerReplyProto> previous =
replies.put(callId, reply);
+ Preconditions.assertNull(previous, "previous");
+
final ChannelFuture future;
try {
future = client.writeAndFlush(request);
} catch (AlreadyClosedException e) {
+ replies.remove(callId, reply);
return JavaUtils.completeExceptionally(e);
}
- final CompletableFuture<RaftNettyServerReplyProto> reply = new
CompletableFuture<>();
- final long callId = getRequest(request).getCallId();
- final CompletableFuture<RaftNettyServerReplyProto> previous =
replies.put(callId, reply);
- Preconditions.assertNull(previous, "previous");
-
future.addListener(cf -> {
if (!cf.isSuccess()) {
// Remove from queue on async write failure to prevent reply
mismatch.
// Only complete exceptionally if removal succeeds (not already
polled).
- getReplyFuture(callId, reply).completeExceptionally(cf.cause());
+ final CompletableFuture<RaftNettyServerReplyProto> removed =
+ getReplyFuture(callId, reply, "write-failure");
+ if (removed != null) {
+ removed.completeExceptionally(cf.cause());
+ } else if (LOG.isDebugEnabled()) {
+ LOG.debug("Write failed for callId={} to {} after request
removed", callId, peer, cf.cause());
+ }
client.close();
}
});
@@ -240,6 +258,9 @@ public class NettyRpcProxy implements Closeable {
if (!replies.isEmpty()) {
LOG.warn("Still have {} requests outstanding from {} connection: {}",
replies.size(), peer, cause.toString());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Outstanding request ids from {}: {}", peer,
replies.keySet());
+ }
replies.values().forEach(f -> f.completeExceptionally(cause));
replies.clear();
}
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java b/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
index f1319cde7..c71b57e82 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
@@ -318,10 +318,10 @@ public abstract class RaftBasicTests<CLUSTER extends
MiniRaftCluster>
}
} catch(Exception t) {
if (exceptionInClientThread.compareAndSet(null, t)) {
- log.error(this + " failed", t);
+ log.error("{} failed", this, t);
} else {
exceptionInClientThread.get().addSuppressed(t);
- log.error(this + " failed again!", t);
+ log.error("{} failed again!", this, t);
}
} finally {
isRunning.set(false);
@@ -347,8 +347,8 @@ public abstract class RaftBasicTests<CLUSTER extends
MiniRaftCluster>
static void testWithLoad(final int numClients, final int numMessages,
boolean useAsync, MiniRaftCluster cluster, Logger log) throws Exception {
- log.info("Running testWithLoad: numClients=" + numClients
- + ", numMessages=" + numMessages + ", async=" + useAsync);
+ log.info("Running testWithLoad: numClients={}, numMessages={}, async={}",
+ numClients, numMessages, useAsync);
waitForLeader(cluster);
@@ -364,24 +364,25 @@ public abstract class RaftBasicTests<CLUSTER extends
MiniRaftCluster>
@Override
public void run() {
- log.info(cluster.printServers());
- log.info(BlockRequestHandlingInjection.getInstance().toString());
- log.info(cluster.toString());
- clients.forEach(c -> log.info(" " + c));
- JavaUtils.dumpAllThreads(s -> log.info(s));
-
final int last = lastStep.get();
if (last != previousLastStep.get()) {
previousLastStep.set(last);
} else {
+ // Only dump cluster/client state when no progress is detected to
reduce log noise.
+ log.info(cluster.printServers());
+ log.info(BlockRequestHandlingInjection.getInstance().toString());
+ log.info(cluster.toString());
+ clients.forEach(c -> log.info(" {}", c));
+ JavaUtils.dumpAllThreads(s -> log.info(s));
+
final RaftServer.Division leader = cluster.getLeader();
- log.info("NO PROGRESS at " + last + ", try to restart leader=" +
leader);
+ log.info("NO PROGRESS at {}, try to restart leader={}", last,
leader);
if (leader != null) {
try {
cluster.restartServer(leader.getId(), false);
- log.info("Restarted leader=" + leader);
+ log.info("Restarted leader={}", leader);
} catch (IOException e) {
- log.error("Failed to restart leader=" + leader);
+ log.error("Failed to restart leader={}", leader);
}
}
}
@@ -415,7 +416,7 @@ public abstract class RaftBasicTests<CLUSTER extends
MiniRaftCluster>
log.error("Failed to change leader ", e);
}
}
- log.info("Leader change count=" + count);
+ log.info("Leader change count={}", count);
timer.cancel();
for(Client4TestWithLoad c : clients) {
diff --git
a/ratis-test/src/test/java/org/apache/ratis/netty/TestNettyRpcProxy.java b/ratis-test/src/test/java/org/apache/ratis/netty/TestNettyRpcProxy.java
index 780c249b6..75d471061 100644
--- a/ratis-test/src/test/java/org/apache/ratis/netty/TestNettyRpcProxy.java
+++ b/ratis-test/src/test/java/org/apache/ratis/netty/TestNettyRpcProxy.java
@@ -21,6 +21,8 @@ import org.apache.ratis.BaseTest;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.exceptions.AlreadyClosedException;
+import org.apache.ratis.proto.RaftProtos.GroupListRequestProto;
+import org.apache.ratis.proto.RaftProtos.RaftRpcRequestProto;
import org.apache.ratis.proto.netty.NettyProtos.RaftNettyServerReplyProto;
import org.apache.ratis.proto.netty.NettyProtos.RaftNettyServerRequestProto;
import org.apache.ratis.thirdparty.io.netty.bootstrap.ServerBootstrap;
@@ -78,8 +80,17 @@ public class TestNettyRpcProxy extends BaseTest {
// Close to force AlreadyClosedException on write and trigger rollback
logic.
proxy.close();
+ final RaftRpcRequestProto rpcRequest = RaftRpcRequestProto.newBuilder()
+ .setCallId(1)
+ .build();
+ final GroupListRequestProto groupListRequest =
GroupListRequestProto.newBuilder()
+ .setRpcRequest(rpcRequest)
+ .build();
+ final RaftNettyServerRequestProto request =
RaftNettyServerRequestProto.newBuilder()
+ .setGroupListRequest(groupListRequest)
+ .build();
final CompletableFuture<RaftNettyServerReplyProto> reply =
- proxy.sendAsync(RaftNettyServerRequestProto.getDefaultInstance());
+ proxy.sendAsync(request);
// Ensure the future completes exceptionally with AlreadyClosedException.
final Throwable thrown = assertThrows(CompletionException.class,
reply::join);
diff --git
a/ratis-test/src/test/java/org/apache/ratis/netty/TestRaftAsyncWithNetty.java b/ratis-test/src/test/java/org/apache/ratis/netty/TestRaftAsyncWithNetty.java
index d2f23e143..c3e19a26e 100644
--- a/ratis-test/src/test/java/org/apache/ratis/netty/TestRaftAsyncWithNetty.java
+++ b/ratis-test/src/test/java/org/apache/ratis/netty/TestRaftAsyncWithNetty.java
@@ -18,13 +18,21 @@
package org.apache.ratis.netty;
import org.apache.ratis.RaftAsyncTests;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.util.Slf4jUtils;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
+import org.slf4j.event.Level;
@Timeout(100)
public class TestRaftAsyncWithNetty
extends RaftAsyncTests<MiniRaftClusterWithNetty>
implements MiniRaftClusterWithNetty.FactoryGet {
+ {
+ Slf4jUtils.setLogLevel(RaftServer.Division.LOG, Level.INFO);
+ Slf4jUtils.setLogLevel(RaftClient.LOG, Level.INFO);
+ }
@Override
@Test