This is an automated email from the ASF dual-hosted git repository. yubiao pushed a commit to branch branch-2.11 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit fbe87b6d1665f0f2a9be6842b07383cd5aad2fd2 Author: fengyubiao <[email protected]> AuthorDate: Sat May 11 01:38:39 2024 +0800 [improve] [log] Print source client addr when enabled haProxyProtocolEnabled (#22686) (cherry picked from commit d77c5de5d713043237773dc057caa1920134bfe3) --- .../org/apache/pulsar/broker/service/Consumer.java | 2 +- .../org/apache/pulsar/broker/service/Producer.java | 2 +- .../apache/pulsar/broker/service/ServerCnx.java | 8 +++--- .../pulsar/broker/service/TopicListService.java | 18 ++++++------- .../pulsar/common/protocol/PulsarDecoder.java | 2 +- .../pulsar/common/protocol/PulsarHandler.java | 31 ++++++++++++++++------ 6 files changed, 39 insertions(+), 24 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java index c61d4e47165..ecb21c57326 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java @@ -873,7 +873,7 @@ public class Consumer { public String toString() { if (subscription != null && cnx != null) { return MoreObjects.toStringHelper(this).add("subscription", subscription).add("consumerId", consumerId) - .add("consumerName", consumerName).add("address", this.cnx.clientAddress()).toString(); + .add("consumerName", consumerName).add("address", this.cnx.toString()).toString(); } else { return MoreObjects.toStringHelper(this).add("consumerId", consumerId) .add("consumerName", consumerName).toString(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java index 6ad07a70a37..edd83635d37 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java @@ -593,7 +593,7 @@ public class Producer { @Override public String toString() { - return MoreObjects.toStringHelper(this).add("topic", topic).add("client", cnx.clientAddress()) + return MoreObjects.toStringHelper(this).add("topic", topic).add("client", cnx.toString()) .add("producerName", producerName).add("producerId", producerId).toString(); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index 55cb71cb256..473dbecd0c7 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -1069,7 +1069,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx { remoteAddress, getPrincipal()); } - log.info("[{}] Subscribing on topic {} / {}. consumerId: {}", this.ctx().channel().toString(), + log.info("[{}] Subscribing on topic {} / {}. consumerId: {}", this.toString(), topicName, subscriptionName, consumerId); try { Metadata.validateMetadata(metadata, @@ -1680,7 +1680,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx { if (log.isDebugEnabled()) { log.debug("Consumer future is not complete(not complete or error), but received command ack. so discard" + " this command. consumerId: {}, cnx: {}, messageIdCount: {}", ack.getConsumerId(), - this.ctx().channel().toString(), ack.getMessageIdsCount()); + this.toString(), ack.getMessageIdsCount()); } } } @@ -3074,7 +3074,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx { } } catch (Throwable t) { log.warn("[{}] [{}] Failed to remove TCP no-delay property on client cnx {}", topic, producerName, - ctx.channel()); + this.toString()); } } } @@ -3270,7 +3270,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx { ctx.executor().schedule(() -> { if (finalConnectionCheckInProgress == connectionCheckInProgress && !finalConnectionCheckInProgress.isDone()) { - log.warn("[{}] Connection check timed out. Closing connection.", remoteAddress); + log.warn("[{}] Connection check timed out. Closing connection.", this.toString()); ctx.close(); } }, connectionLivenessCheckTimeoutMillis, TimeUnit.MILLISECONDS); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicListService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicListService.java index 9e13ad58039..92179731683 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicListService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicListService.java @@ -131,7 +131,7 @@ public class TopicListService { } else { msg += "Pattern longer than maximum: " + maxSubscriptionPatternLength; } - log.warn("[{}] {} on namespace {}", connection.getRemoteAddress(), msg, namespaceName); + log.warn("[{}] {} on namespace {}", connection.toString(), msg, namespaceName); connection.getCommandSender().sendErrorResponse(requestId, ServerError.NotAllowedError, msg); lookupSemaphore.release(); return; @@ -144,14 +144,14 @@ public class TopicListService { TopicListWatcher watcher = existingWatcherFuture.getNow(null); log.info("[{}] Watcher with the same id is already created:" + " watcherId={}, watcher={}", - connection.getRemoteAddress(), watcherId, watcher); + connection.toString(), watcherId, watcher); watcherFuture = existingWatcherFuture; } else { // There was an early request to create a watcher with the same watcherId. This can happen when // client timeout is lower the broker timeouts. We need to wait until the previous watcher // creation request either completes or fails. log.warn("[{}] Watcher with id is already present on the connection," - + " consumerId={}", connection.getRemoteAddress(), watcherId); + + " consumerId={}", connection.toString(), watcherId); ServerError error; if (!existingWatcherFuture.isDone()) { error = ServerError.ServiceNotReady; @@ -179,14 +179,14 @@ public class TopicListService { if (log.isDebugEnabled()) { log.debug( "[{}] Received WatchTopicList for namespace [//{}] by {}", - connection.getRemoteAddress(), namespaceName, requestId); + connection.toString(), namespaceName, requestId); } connection.getCommandSender().sendWatchTopicListSuccess(requestId, watcherId, hash, topicList); lookupSemaphore.release(); }) .exceptionally(ex -> { log.warn("[{}] Error WatchTopicList for namespace [//{}] by {}", - connection.getRemoteAddress(), namespaceName, requestId); + connection.toString(), namespaceName, requestId); connection.getCommandSender().sendErrorResponse(requestId, BrokerServiceException.getClientErrorCode( new BrokerServiceException.ServerMetadataException(ex)), ex.getMessage()); @@ -228,7 +228,7 @@ public class TopicListService { CompletableFuture<TopicListWatcher> watcherFuture = watchers.get(watcherId); if (watcherFuture == null) { log.info("[{}] TopicListWatcher was not registered on the connection: {}", - watcherId, connection.getRemoteAddress()); + watcherId, connection.toString()); return; } @@ -238,14 +238,14 @@ public class TopicListService { // watcher future as failed and we can tell the client the close operation was successful. When the actual // create operation will complete, the new watcher will be discarded. log.info("[{}] Closed watcher before its creation was completed. watcherId={}", - connection.getRemoteAddress(), watcherId); + connection.toString(), watcherId); watchers.remove(watcherId); return; } if (watcherFuture.isCompletedExceptionally()) { log.info("[{}] Closed watcher that already failed to be created. watcherId={}", - connection.getRemoteAddress(), watcherId); + connection.toString(), watcherId); watchers.remove(watcherId); return; } @@ -253,7 +253,7 @@ public class TopicListService { // Proceed with normal watcher close topicResources.deregisterPersistentTopicListener(watcherFuture.getNow(null)); watchers.remove(watcherId); - log.info("[{}] Closed watcher, watcherId={}", connection.getRemoteAddress(), watcherId); + log.info("[{}] Closed watcher, watcherId={}", connection.toString(), watcherId); } /** diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java index d0b26103461..528c6ff65f1 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java @@ -113,7 +113,7 @@ public abstract class PulsarDecoder extends ChannelInboundHandlerAdapter { cmd.parseFrom(buffer, cmdSize); if (log.isDebugEnabled()) { - log.debug("[{}] Received cmd {}", ctx.channel().remoteAddress(), cmd.getType()); + log.debug("[{}] Received cmd {}", ctx.channel(), cmd.getType()); } messageReceived(); diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarHandler.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarHandler.java index a507554eaaf..ad9ae33870e 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarHandler.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarHandler.java @@ -64,7 +64,7 @@ public abstract class PulsarHandler extends PulsarDecoder { this.ctx = ctx; if (log.isDebugEnabled()) { - log.debug("[{}] Scheduling keep-alive task every {} s", ctx.channel(), keepAliveIntervalSeconds); + log.debug("[{}] Scheduling keep-alive task every {} s", this.toString(), keepAliveIntervalSeconds); } if (keepAliveIntervalSeconds > 0) { this.keepAliveTask = ctx.executor() @@ -82,13 +82,13 @@ public abstract class PulsarHandler extends PulsarDecoder { protected final void handlePing(CommandPing ping) { // Immediately reply success to ping requests if (log.isDebugEnabled()) { - log.debug("[{}] Replying back to ping message", ctx.channel()); + log.debug("[{}] Replying back to ping message", this.toString()); } ctx.writeAndFlush(Commands.newPong()) .addListener(future -> { if (!future.isSuccess()) { log.warn("[{}] Forcing connection to close since cannot send a pong message.", - ctx.channel(), future.cause()); + toString(), future.cause()); ctx.close(); } }); @@ -104,24 +104,24 @@ public abstract class PulsarHandler extends PulsarDecoder { } if (!isHandshakeCompleted()) { - log.warn("[{}] Pulsar Handshake was not completed within timeout, closing connection", ctx.channel()); + log.warn("[{}] Pulsar Handshake was not completed within timeout, closing connection", this.toString()); ctx.close(); } else if (waitingForPingResponse && ctx.channel().config().isAutoRead()) { // We were waiting for a response and another keep-alive just completed. // If auto-read was disabled, it means we stopped reading from the connection, so we might receive the Ping // response later and thus not enforce the strict timeout here. - log.warn("[{}] Forcing connection to close after keep-alive timeout", ctx.channel()); + log.warn("[{}] Forcing connection to close after keep-alive timeout", this.toString()); ctx.close(); } else if (getRemoteEndpointProtocolVersion() >= ProtocolVersion.v1.getValue()) { // Send keep alive probe to peer only if it supports the ping/pong commands, added in v1 if (log.isDebugEnabled()) { - log.debug("[{}] Sending ping message", ctx.channel()); + log.debug("[{}] Sending ping message", this.toString()); } waitingForPingResponse = true; sendPing(); } else { if (log.isDebugEnabled()) { - log.debug("[{}] Peer doesn't support keep-alive", ctx.channel()); + log.debug("[{}] Peer doesn't support keep-alive", this.toString()); } } } @@ -131,7 +131,7 @@ public abstract class PulsarHandler extends PulsarDecoder { .addListener(future -> { if (!future.isSuccess()) { log.warn("[{}] Forcing connection to close since cannot send a ping message.", - ctx.channel(), future.cause()); + this.toString(), future.cause()); ctx.close(); } }); @@ -149,5 +149,20 @@ public abstract class PulsarHandler extends PulsarDecoder { */ protected abstract boolean isHandshakeCompleted(); + /** + * Demo: [id: 0x2561bcd1, L:/10.0.136.103:6650 ! R:/240.240.0.5:58038]. + * L: local Address. + * R: remote address. + */ + @Override + public String toString() { + ChannelHandlerContext ctx = this.ctx; + if (ctx == null) { + return "[ctx: null]"; + } else { + return ctx.channel().toString(); + } + } + private static final Logger log = LoggerFactory.getLogger(PulsarHandler.class); }
