This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new a5651f6e9 RATIS-2418. NettyClient LoggingHandler prints too many messages. (#1359)
a5651f6e9 is described below

commit a5651f6e9a95f5b5ac44ff01363b7de1c8b4c026
Author: slfan1989 <[email protected]>
AuthorDate: Fri Feb 27 02:20:21 2026 +0800

    RATIS-2418. NettyClient LoggingHandler prints too many messages. (#1359)
---
 .../java/org/apache/ratis/netty/NettyRpcProxy.java | 43 ++++++++++++++++------
 .../test/java/org/apache/ratis/RaftBasicTests.java | 29 ++++++++-------
 .../org/apache/ratis/netty/TestNettyRpcProxy.java  | 13 ++++++-
 .../apache/ratis/netty/TestRaftAsyncWithNetty.java |  8 ++++
 4 files changed, 67 insertions(+), 26 deletions(-)

diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyRpcProxy.java b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyRpcProxy.java
index cfcabc274..e72d6c677 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyRpcProxy.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyRpcProxy.java
@@ -45,7 +45,6 @@ import org.slf4j.LoggerFactory;
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.Map;
-import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
@@ -156,7 +155,15 @@ public class NettyRpcProxy implements Closeable {
         @Override
         protected void channelRead0(ChannelHandlerContext ctx,
                                     RaftNettyServerReplyProto proto) {
-          final CompletableFuture<RaftNettyServerReplyProto> future = getReplyFuture(getCallId(proto), null);
+          final long callId = getCallId(proto);
+          final CompletableFuture<RaftNettyServerReplyProto> future = getReplyFuture(callId, null, "reply");
+          if (future == null) {
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Ignoring reply for callId={} from {} (no outstanding request, outstanding={})",
+                  callId, peer, replies.size());
+            }
+            return;
+          }
           if (proto.getRaftNettyServerReplyCase() == EXCEPTIONREPLY) {
             final Object ioe = ProtoUtils.toObject(proto.getExceptionReply().getException());
             future.completeExceptionally((IOException)ioe);
@@ -183,7 +190,8 @@ public class NettyRpcProxy implements Closeable {
         protected void initChannel(SocketChannel ch) {
           final ChannelPipeline p = ch.pipeline();
 
-          p.addLast(new LoggingHandler(LogLevel.WARN));
+          // LoggingHandler emits all events at the chosen level; use DEBUG to reduce noise by default.
+          p.addLast(new LoggingHandler(LogLevel.DEBUG));
           p.addLast(new ProtobufVarint32FrameDecoder());
           p.addLast(new ProtobufDecoder(RaftNettyServerReplyProto.getDefaultInstance()));
           p.addLast(new ProtobufVarint32LengthFieldPrepender());
@@ -197,9 +205,12 @@ public class NettyRpcProxy implements Closeable {
     }
 
     private CompletableFuture<RaftNettyServerReplyProto> getReplyFuture(long callId,
-        CompletableFuture<RaftNettyServerReplyProto> expected) {
+        CompletableFuture<RaftNettyServerReplyProto> expected, String reason) {
       final CompletableFuture<RaftNettyServerReplyProto> removed = replies.remove(callId);
-      Objects.requireNonNull(removed, () -> "Request #" + callId + " not found");
+      if (removed == null && LOG.isDebugEnabled()) {
+        LOG.debug("Request {} not found for callId={} from {} (reason={}, outstanding={})",
+            expected == null ? "future" : "reply", callId, peer, reason, replies.size());
+      }
       if (expected != null) {
         Preconditions.assertSame(expected, removed, "removed");
       }
@@ -207,23 +218,30 @@ public class NettyRpcProxy implements Closeable {
     }
 
     synchronized CompletableFuture<RaftNettyServerReplyProto> offer(RaftNettyServerRequestProto request) {
+      final CompletableFuture<RaftNettyServerReplyProto> reply = new CompletableFuture<>();
+      final long callId = getRequest(request).getCallId();
+      final CompletableFuture<RaftNettyServerReplyProto> previous = replies.put(callId, reply);
+      Preconditions.assertNull(previous, "previous");
+
       final ChannelFuture future;
       try {
         future = client.writeAndFlush(request);
       } catch (AlreadyClosedException e) {
+        replies.remove(callId, reply);
         return JavaUtils.completeExceptionally(e);
       }
 
-      final CompletableFuture<RaftNettyServerReplyProto> reply = new CompletableFuture<>();
-      final long callId = getRequest(request).getCallId();
-      final CompletableFuture<RaftNettyServerReplyProto> previous = replies.put(callId, reply);
-      Preconditions.assertNull(previous, "previous");
-
       future.addListener(cf -> {
         if (!cf.isSuccess()) {
           // Remove from queue on async write failure to prevent reply mismatch.
           // Only complete exceptionally if removal succeeds (not already polled).
-          getReplyFuture(callId, reply).completeExceptionally(cf.cause());
+          final CompletableFuture<RaftNettyServerReplyProto> removed =
+              getReplyFuture(callId, reply, "write-failure");
+          if (removed != null) {
+            removed.completeExceptionally(cf.cause());
+          } else if (LOG.isDebugEnabled()) {
+            LOG.debug("Write failed for callId={} to {} after request removed", callId, peer, cf.cause());
+          }
           client.close();
         }
       });
@@ -240,6 +258,9 @@ public class NettyRpcProxy implements Closeable {
       if (!replies.isEmpty()) {
         LOG.warn("Still have {} requests outstanding from {} connection: {}",
             replies.size(), peer, cause.toString());
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Outstanding request ids from {}: {}", peer, replies.keySet());
+        }
         replies.values().forEach(f -> f.completeExceptionally(cause));
         replies.clear();
       }
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java b/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
index f1319cde7..c71b57e82 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
@@ -318,10 +318,10 @@ public abstract class RaftBasicTests<CLUSTER extends MiniRaftCluster>
         }
       } catch(Exception t) {
         if (exceptionInClientThread.compareAndSet(null, t)) {
-          log.error(this + " failed", t);
+          log.error("{} failed", this, t);
         } else {
           exceptionInClientThread.get().addSuppressed(t);
-          log.error(this + " failed again!", t);
+          log.error("{} failed again!", this, t);
         }
       } finally {
         isRunning.set(false);
@@ -347,8 +347,8 @@ public abstract class RaftBasicTests<CLUSTER extends MiniRaftCluster>
 
   static void testWithLoad(final int numClients, final int numMessages,
       boolean useAsync, MiniRaftCluster cluster, Logger log) throws Exception {
-    log.info("Running testWithLoad: numClients=" + numClients
-        + ", numMessages=" + numMessages + ", async=" + useAsync);
+    log.info("Running testWithLoad: numClients={}, numMessages={}, async={}",
+        numClients, numMessages, useAsync);
 
     waitForLeader(cluster);
 
@@ -364,24 +364,25 @@ public abstract class RaftBasicTests<CLUSTER extends MiniRaftCluster>
 
       @Override
       public void run() {
-        log.info(cluster.printServers());
-        log.info(BlockRequestHandlingInjection.getInstance().toString());
-        log.info(cluster.toString());
-        clients.forEach(c -> log.info("  " + c));
-        JavaUtils.dumpAllThreads(s -> log.info(s));
-
         final int last = lastStep.get();
         if (last != previousLastStep.get()) {
           previousLastStep.set(last);
         } else {
+          // Only dump cluster/client state when no progress is detected to reduce log noise.
+          log.info(cluster.printServers());
+          log.info(BlockRequestHandlingInjection.getInstance().toString());
+          log.info(cluster.toString());
+          clients.forEach(c -> log.info("  {}", c));
+          JavaUtils.dumpAllThreads(s -> log.info(s));
+
           final RaftServer.Division leader = cluster.getLeader();
-          log.info("NO PROGRESS at " + last + ", try to restart leader=" + leader);
+          log.info("NO PROGRESS at {}, try to restart leader={}", last, leader);
           if (leader != null) {
             try {
               cluster.restartServer(leader.getId(), false);
-              log.info("Restarted leader=" + leader);
+              log.info("Restarted leader={}", leader);
             } catch (IOException e) {
-              log.error("Failed to restart leader=" + leader);
+              log.error("Failed to restart leader={}", leader);
             }
           }
         }
@@ -415,7 +416,7 @@ public abstract class RaftBasicTests<CLUSTER extends MiniRaftCluster>
         log.error("Failed to change leader ", e);
       }
     }
-    log.info("Leader change count=" + count);
+    log.info("Leader change count={}", count);
     timer.cancel();
 
     for(Client4TestWithLoad c : clients) {
diff --git a/ratis-test/src/test/java/org/apache/ratis/netty/TestNettyRpcProxy.java b/ratis-test/src/test/java/org/apache/ratis/netty/TestNettyRpcProxy.java
index 780c249b6..75d471061 100644
--- a/ratis-test/src/test/java/org/apache/ratis/netty/TestNettyRpcProxy.java
+++ b/ratis-test/src/test/java/org/apache/ratis/netty/TestNettyRpcProxy.java
@@ -21,6 +21,8 @@ import org.apache.ratis.BaseTest;
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.protocol.exceptions.AlreadyClosedException;
+import org.apache.ratis.proto.RaftProtos.GroupListRequestProto;
+import org.apache.ratis.proto.RaftProtos.RaftRpcRequestProto;
 import org.apache.ratis.proto.netty.NettyProtos.RaftNettyServerReplyProto;
 import org.apache.ratis.proto.netty.NettyProtos.RaftNettyServerRequestProto;
 import org.apache.ratis.thirdparty.io.netty.bootstrap.ServerBootstrap;
@@ -78,8 +80,17 @@ public class TestNettyRpcProxy extends BaseTest {
 
       // Close to force AlreadyClosedException on write and trigger rollback logic.
       proxy.close();
+      final RaftRpcRequestProto rpcRequest = RaftRpcRequestProto.newBuilder()
+          .setCallId(1)
+          .build();
+      final GroupListRequestProto groupListRequest = GroupListRequestProto.newBuilder()
+          .setRpcRequest(rpcRequest)
+          .build();
+      final RaftNettyServerRequestProto request = RaftNettyServerRequestProto.newBuilder()
+          .setGroupListRequest(groupListRequest)
+          .build();
       final CompletableFuture<RaftNettyServerReplyProto> reply =
-          proxy.sendAsync(RaftNettyServerRequestProto.getDefaultInstance());
+          proxy.sendAsync(request);
 
       // Ensure the future completes exceptionally with AlreadyClosedException.
       final Throwable thrown = assertThrows(CompletionException.class, reply::join);
diff --git a/ratis-test/src/test/java/org/apache/ratis/netty/TestRaftAsyncWithNetty.java b/ratis-test/src/test/java/org/apache/ratis/netty/TestRaftAsyncWithNetty.java
index d2f23e143..c3e19a26e 100644
--- a/ratis-test/src/test/java/org/apache/ratis/netty/TestRaftAsyncWithNetty.java
+++ b/ratis-test/src/test/java/org/apache/ratis/netty/TestRaftAsyncWithNetty.java
@@ -18,13 +18,21 @@
 package org.apache.ratis.netty;
 
 import org.apache.ratis.RaftAsyncTests;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.util.Slf4jUtils;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
+import org.slf4j.event.Level;
 
 @Timeout(100)
 public class TestRaftAsyncWithNetty
     extends RaftAsyncTests<MiniRaftClusterWithNetty>
     implements MiniRaftClusterWithNetty.FactoryGet {
+  {
+    Slf4jUtils.setLogLevel(RaftServer.Division.LOG, Level.INFO);
+    Slf4jUtils.setLogLevel(RaftClient.LOG, Level.INFO);
+  }
 
   @Override
   @Test

Reply via email to