This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new d77c5de5d71 [improve] [log] Print source client addr when enabled 
haProxyProtocolEnabled (#22686)
d77c5de5d71 is described below

commit d77c5de5d713043237773dc057caa1920134bfe3
Author: fengyubiao <[email protected]>
AuthorDate: Sat May 11 01:27:52 2024 +0800

    [improve] [log] Print source client addr when enabled 
haProxyProtocolEnabled (#22686)
---
 .../org/apache/pulsar/broker/service/Consumer.java |  2 +-
 .../org/apache/pulsar/broker/service/Producer.java |  2 +-
 .../apache/pulsar/broker/service/ServerCnx.java    | 35 ++++++++++++++++++----
 .../broker/service/ServerCnxThrottleTracker.java   |  2 +-
 .../pulsar/broker/service/TopicListService.java    | 20 ++++++-------
 .../pulsar/common/protocol/PulsarDecoder.java      |  2 +-
 .../pulsar/common/protocol/PulsarHandler.java      | 31 ++++++++++++++-----
 7 files changed, 67 insertions(+), 27 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
index b1c3687b3a0..89a9bab497d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
@@ -934,7 +934,7 @@ public class Consumer {
     public String toString() {
         if (subscription != null && cnx != null) {
             return MoreObjects.toStringHelper(this).add("subscription", 
subscription).add("consumerId", consumerId)
-                    .add("consumerName", consumerName).add("address", 
this.cnx.clientAddress()).toString();
+                    .add("consumerName", consumerName).add("address", 
this.cnx.toString()).toString();
         } else {
             return MoreObjects.toStringHelper(this).add("consumerId", 
consumerId)
                     .add("consumerName", consumerName).toString();
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
index 9cfde67802b..c10e33818ed 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
@@ -666,7 +666,7 @@ public class Producer {
 
     @Override
     public String toString() {
-        return MoreObjects.toStringHelper(this).add("topic", 
topic).add("client", cnx.clientAddress())
+        return MoreObjects.toStringHelper(this).add("topic", 
topic).add("client", cnx.toString())
                 .add("producerName", producerName).add("producerId", 
producerId).toString();
     }
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 5ccdbfbe715..59411aec040 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -1201,7 +1201,7 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
                             remoteAddress, getPrincipal());
                 }
 
-                log.info("[{}] Subscribing on topic {} / {}. consumerId: {}", 
this.ctx().channel().toString(),
+                log.info("[{}] Subscribing on topic {} / {}. consumerId: {}", 
this.toString(),
                         topicName, subscriptionName, consumerId);
                 try {
                     Metadata.validateMetadata(metadata,
@@ -1921,7 +1921,7 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
             if (log.isDebugEnabled()) {
                 log.debug("Consumer future is not complete(not complete or 
error), but received command ack. so discard"
                                 + " this command. consumerId: {}, cnx: {}, 
messageIdCount: {}", ack.getConsumerId(),
-                        this.ctx().channel().toString(), 
ack.getMessageIdsCount());
+                        this.toString(), ack.getMessageIdsCount());
             }
         }
     }
@@ -2267,7 +2267,7 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
                 @Override
                 public String toString() {
                     return String.format("ServerCnx [%s] get largest batch 
index when possible",
-                            ServerCnx.this.ctx.channel());
+                            ServerCnx.this.toString());
                 }
             }, null);
 
@@ -3301,7 +3301,7 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
                 }
             } catch (Throwable t) {
                 log.warn("[{}] [{}] Failed to remove TCP no-delay property on 
client cnx {}", topic, producerName,
-                        ctx.channel());
+                        this.toString());
             }
         }
     }
@@ -3364,6 +3364,31 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
         return remoteAddress;
     }
 
+    /**
+     * Demo: [id: 0x2561bcd1, L:/10.0.136.103:6650 ! R:/240.240.0.5:58038] 
[SR:/240.240.0.5:58038].
+     * L: local Address.
+     * R: remote address.
+     * SR: source remote address. It is the source address when enabled 
"haProxyProtocolEnabled".
+     */
+    @Override
+    public String toString() {
+        ChannelHandlerContext ctx = ctx();
+        // ctx.channel(): 96.
+        // clientSourceAddress: 5 + 46(ipv6).
+        // state: 19.
+        // Len = 166.
+        StringBuilder buf = new StringBuilder(166);
+        if (ctx == null) {
+            buf.append("[ctx: null]");
+        } else {
+            buf.append(ctx.channel().toString());
+        }
+        String clientSourceAddr = clientSourceAddress();
+        buf.append(" [SR:").append(clientSourceAddr == null ? "-" : 
clientSourceAddr)
+                .append(", state:").append(state).append("]");
+        return buf.toString();
+    }
+
     @Override
     public BrokerService getBrokerService() {
         return service;
@@ -3510,7 +3535,7 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
                     ctx.executor().schedule(() -> {
                         if (finalConnectionCheckInProgress == 
connectionCheckInProgress
                                 && !finalConnectionCheckInProgress.isDone()) {
-                            log.warn("[{}] Connection check timed out. Closing 
connection.", remoteAddress);
+                            log.warn("[{}] Connection check timed out. Closing 
connection.", this.toString());
                             ctx.close();
                         }
                     }, connectionLivenessCheckTimeoutMillis, 
TimeUnit.MILLISECONDS);
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnxThrottleTracker.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnxThrottleTracker.java
index f223d6eee37..7e55397022d 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnxThrottleTracker.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnxThrottleTracker.java
@@ -87,7 +87,7 @@ final class ServerCnxThrottleTracker {
     private void changeAutoRead(boolean autoRead) {
         if (isChannelActive()) {
             if (log.isDebugEnabled()) {
-                log.debug("[{}] Setting auto read to {}", 
serverCnx.ctx().channel(), autoRead);
+                log.debug("[{}] Setting auto read to {}", 
serverCnx.toString(), autoRead);
             }
             // change the auto read flag on the channel
             serverCnx.ctx().channel().config().setAutoRead(autoRead);
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicListService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicListService.java
index aea5b9fc65b..b18286ee062 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicListService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicListService.java
@@ -131,7 +131,7 @@ public class TopicListService {
             } else {
                 msg += "Pattern longer than maximum: " + 
maxSubscriptionPatternLength;
             }
-            log.warn("[{}] {} on namespace {}", connection.getRemoteAddress(), 
msg, namespaceName);
+            log.warn("[{}] {} on namespace {}", connection.toString(), msg, 
namespaceName);
             connection.getCommandSender().sendErrorResponse(requestId, 
ServerError.NotAllowedError, msg);
             lookupSemaphore.release();
             return;
@@ -144,14 +144,14 @@ public class TopicListService {
                 TopicListWatcher watcher = existingWatcherFuture.getNow(null);
                 log.info("[{}] Watcher with the same id is already created:"
                                 + " watcherId={}, watcher={}",
-                        connection.getRemoteAddress(), watcherId, watcher);
+                        connection.toString(), watcherId, watcher);
                 watcherFuture = existingWatcherFuture;
             } else {
                 // There was an early request to create a watcher with the 
same watcherId. This can happen when
                 // client timeout is lower the broker timeouts. We need to 
wait until the previous watcher
                 // creation request either completes or fails.
                 log.warn("[{}] Watcher with id is already present on the 
connection,"
-                        + " consumerId={}", connection.getRemoteAddress(), 
watcherId);
+                        + " consumerId={}", connection.toString(), watcherId);
                 ServerError error;
                 if (!existingWatcherFuture.isDone()) {
                     error = ServerError.ServiceNotReady;
@@ -179,14 +179,14 @@ public class TopicListService {
                     if (log.isDebugEnabled()) {
                         log.debug(
                                 "[{}] Received WatchTopicList for namespace 
[//{}] by {}",
-                                connection.getRemoteAddress(), namespaceName, 
requestId);
+                                connection.toString(), namespaceName, 
requestId);
                     }
                     
connection.getCommandSender().sendWatchTopicListSuccess(requestId, watcherId, 
hash, topicList);
                     lookupSemaphore.release();
                 })
                 .exceptionally(ex -> {
                     log.warn("[{}] Error WatchTopicList for namespace [//{}] 
by {}",
-                            connection.getRemoteAddress(), namespaceName, 
requestId);
+                            connection.toString(), namespaceName, requestId);
                     connection.getCommandSender().sendErrorResponse(requestId,
                             BrokerServiceException.getClientErrorCode(
                                     new 
BrokerServiceException.ServerMetadataException(ex)), ex.getMessage());
@@ -213,7 +213,7 @@ public class TopicListService {
                     } else {
                         if (!watcherFuture.complete(watcher)) {
                             log.warn("[{}] Watcher future was already 
completed. Deregistering watcherId={}.",
-                                    connection.getRemoteAddress(), watcherId);
+                                    connection.toString(), watcherId);
                             
topicResources.deregisterPersistentTopicListener(watcher);
                         }
                     }
@@ -232,7 +232,7 @@ public class TopicListService {
         CompletableFuture<TopicListWatcher> watcherFuture = 
watchers.get(watcherId);
         if (watcherFuture == null) {
             log.info("[{}] TopicListWatcher was not registered on the 
connection: {}",
-                    watcherId, connection.getRemoteAddress());
+                    watcherId, connection.toString());
             return;
         }
 
@@ -242,14 +242,14 @@ public class TopicListService {
             // watcher future as failed and we can tell the client the close 
operation was successful. When the actual
             // create operation will complete, the new watcher will be 
discarded.
             log.info("[{}] Closed watcher before its creation was completed. 
watcherId={}",
-                    connection.getRemoteAddress(), watcherId);
+                    connection.toString(), watcherId);
             watchers.remove(watcherId);
             return;
         }
 
         if (watcherFuture.isCompletedExceptionally()) {
             log.info("[{}] Closed watcher that already failed to be created. 
watcherId={}",
-                    connection.getRemoteAddress(), watcherId);
+                    connection.toString(), watcherId);
             watchers.remove(watcherId);
             return;
         }
@@ -257,7 +257,7 @@ public class TopicListService {
         // Proceed with normal watcher close
         
topicResources.deregisterPersistentTopicListener(watcherFuture.getNow(null));
         watchers.remove(watcherId);
-        log.info("[{}] Closed watcher, watcherId={}", 
connection.getRemoteAddress(), watcherId);
+        log.info("[{}] Closed watcher, watcherId={}", connection.toString(), 
watcherId);
     }
 
     /**
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java
index c1c1ebe355b..c05b1d796df 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java
@@ -122,7 +122,7 @@ public abstract class PulsarDecoder extends 
ChannelInboundHandlerAdapter {
             cmd.parseFrom(buffer, cmdSize);
 
             if (log.isDebugEnabled()) {
-                log.debug("[{}] Received cmd {}", 
ctx.channel().remoteAddress(), cmd.getType());
+                log.debug("[{}] Received cmd {}", ctx.channel(), 
cmd.getType());
             }
             messageReceived();
 
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarHandler.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarHandler.java
index 51cd61afd63..d5c741be01e 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarHandler.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarHandler.java
@@ -67,7 +67,7 @@ public abstract class PulsarHandler extends PulsarDecoder {
         this.ctx = ctx;
 
         if (log.isDebugEnabled()) {
-            log.debug("[{}] Scheduling keep-alive task every {} s", 
ctx.channel(), keepAliveIntervalSeconds);
+            log.debug("[{}] Scheduling keep-alive task every {} s", 
this.toString(), keepAliveIntervalSeconds);
         }
         if (keepAliveIntervalSeconds > 0) {
             this.keepAliveTask = ctx.executor()
@@ -85,13 +85,13 @@ public abstract class PulsarHandler extends PulsarDecoder {
     protected final void handlePing(CommandPing ping) {
         // Immediately reply success to ping requests
         if (log.isDebugEnabled()) {
-            log.debug("[{}] Replying back to ping message", ctx.channel());
+            log.debug("[{}] Replying back to ping message", this.toString());
         }
         ctx.writeAndFlush(Commands.newPong())
                 .addListener(future -> {
                     if (!future.isSuccess()) {
                         log.warn("[{}] Forcing connection to close since 
cannot send a pong message.",
-                                ctx.channel(), future.cause());
+                                toString(), future.cause());
                         ctx.close();
                     }
                 });
@@ -107,24 +107,24 @@ public abstract class PulsarHandler extends PulsarDecoder 
{
         }
 
         if (!isHandshakeCompleted()) {
-            log.warn("[{}] Pulsar Handshake was not completed within timeout, 
closing connection", ctx.channel());
+            log.warn("[{}] Pulsar Handshake was not completed within timeout, 
closing connection", this.toString());
             ctx.close();
         } else if (waitingForPingResponse && 
ctx.channel().config().isAutoRead()) {
             // We were waiting for a response and another keep-alive just 
completed.
             // If auto-read was disabled, it means we stopped reading from the 
connection, so we might receive the Ping
             // response later and thus not enforce the strict timeout here.
-            log.warn("[{}] Forcing connection to close after keep-alive 
timeout", ctx.channel());
+            log.warn("[{}] Forcing connection to close after keep-alive 
timeout", this.toString());
             ctx.close();
         } else if (getRemoteEndpointProtocolVersion() >= 
ProtocolVersion.v1.getValue()) {
             // Send keep alive probe to peer only if it supports the ping/pong 
commands, added in v1
             if (log.isDebugEnabled()) {
-                log.debug("[{}] Sending ping message", ctx.channel());
+                log.debug("[{}] Sending ping message", this.toString());
             }
             waitingForPingResponse = true;
             sendPing();
         } else {
             if (log.isDebugEnabled()) {
-                log.debug("[{}] Peer doesn't support keep-alive", 
ctx.channel());
+                log.debug("[{}] Peer doesn't support keep-alive", 
this.toString());
             }
         }
     }
@@ -134,7 +134,7 @@ public abstract class PulsarHandler extends PulsarDecoder {
                 .addListener(future -> {
                     if (!future.isSuccess()) {
                         log.warn("[{}] Forcing connection to close since 
cannot send a ping message.",
-                                ctx.channel(), future.cause());
+                                this.toString(), future.cause());
                         ctx.close();
                     }
                 });
@@ -152,5 +152,20 @@ public abstract class PulsarHandler extends PulsarDecoder {
      */
     protected abstract boolean isHandshakeCompleted();
 
+    /**
+     * Demo: [id: 0x2561bcd1, L:/10.0.136.103:6650 ! R:/240.240.0.5:58038].
+     * L: local Address.
+     * R: remote address.
+     */
+    @Override
+    public String toString() {
+        ChannelHandlerContext ctx = this.ctx;
+        if (ctx == null) {
+            return "[ctx: null]";
+        } else {
+            return ctx.channel().toString();
+        }
+    }
+
     private static final Logger log = 
LoggerFactory.getLogger(PulsarHandler.class);
 }

Reply via email to