This is an automated email from the ASF dual-hosted git repository. yubiao pushed a commit to branch branch-2.11 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 4a0d58f2a83f6c190286d40094f60d8b23dc5514 Author: fengyubiao <[email protected]> AuthorDate: Sat May 11 01:36:30 2024 +0800 Revert "[improve] [log] Print source client addr when enabled haProxyProtocolEnabled (#22686)" This reverts commit 42f9774c4f77b77488f5e4922823fe69338d20ff. --- .../org/apache/pulsar/broker/service/Consumer.java | 2 +- .../org/apache/pulsar/broker/service/Producer.java | 2 +- .../apache/pulsar/broker/service/ServerCnx.java | 8 ++--- .../pulsar/broker/service/TopicListService.java | 18 +++++----- .../pulsar/common/protocol/PulsarDecoder.java | 2 +- .../pulsar/common/protocol/PulsarHandler.java | 39 ++++------------------ 6 files changed, 23 insertions(+), 48 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java index ecb21c57326..c61d4e47165 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java @@ -873,7 +873,7 @@ public class Consumer { public String toString() { if (subscription != null && cnx != null) { return MoreObjects.toStringHelper(this).add("subscription", subscription).add("consumerId", consumerId) - .add("consumerName", consumerName).add("address", this.cnx.toString()).toString(); + .add("consumerName", consumerName).add("address", this.cnx.clientAddress()).toString(); } else { return MoreObjects.toStringHelper(this).add("consumerId", consumerId) .add("consumerName", consumerName).toString(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java index edd83635d37..6ad07a70a37 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java @@ -593,7 +593,7 @@ public class Producer { @Override public String toString() { - return MoreObjects.toStringHelper(this).add("topic", topic).add("client", cnx.toString()) + return MoreObjects.toStringHelper(this).add("topic", topic).add("client", cnx.clientAddress()) .add("producerName", producerName).add("producerId", producerId).toString(); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index 473dbecd0c7..55cb71cb256 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -1069,7 +1069,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx { remoteAddress, getPrincipal()); } - log.info("[{}] Subscribing on topic {} / {}. consumerId: {}", this.toString(), + log.info("[{}] Subscribing on topic {} / {}. consumerId: {}", this.ctx().channel().toString(), topicName, subscriptionName, consumerId); try { Metadata.validateMetadata(metadata, @@ -1680,7 +1680,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx { if (log.isDebugEnabled()) { log.debug("Consumer future is not complete(not complete or error), but received command ack. so discard" + " this command. consumerId: {}, cnx: {}, messageIdCount: {}", ack.getConsumerId(), - this.toString(), ack.getMessageIdsCount()); + this.ctx().channel().toString(), ack.getMessageIdsCount()); } } } @@ -3074,7 +3074,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx { } } catch (Throwable t) { log.warn("[{}] [{}] Failed to remove TCP no-delay property on client cnx {}", topic, producerName, - this.toString()); + ctx.channel()); } } } @@ -3270,7 +3270,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx { ctx.executor().schedule(() -> { if (finalConnectionCheckInProgress == connectionCheckInProgress && !finalConnectionCheckInProgress.isDone()) { - log.warn("[{}] Connection check timed out. Closing connection.", this.toString()); + log.warn("[{}] Connection check timed out. Closing connection.", remoteAddress); ctx.close(); } }, connectionLivenessCheckTimeoutMillis, TimeUnit.MILLISECONDS); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicListService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicListService.java index 92179731683..9e13ad58039 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicListService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicListService.java @@ -131,7 +131,7 @@ public class TopicListService { } else { msg += "Pattern longer than maximum: " + maxSubscriptionPatternLength; } - log.warn("[{}] {} on namespace {}", connection.toString(), msg, namespaceName); + log.warn("[{}] {} on namespace {}", connection.getRemoteAddress(), msg, namespaceName); connection.getCommandSender().sendErrorResponse(requestId, ServerError.NotAllowedError, msg); lookupSemaphore.release(); return; @@ -144,14 +144,14 @@ public class TopicListService { TopicListWatcher watcher = existingWatcherFuture.getNow(null); log.info("[{}] Watcher with the same id is already created:" + " watcherId={}, watcher={}", - connection.toString(), watcherId, watcher); + connection.getRemoteAddress(), watcherId, watcher); watcherFuture = existingWatcherFuture; } else { // There was an early request to create a watcher with the same watcherId. This can happen when // client timeout is lower the broker timeouts. We need to wait until the previous watcher // creation request either completes or fails. log.warn("[{}] Watcher with id is already present on the connection," - + " consumerId={}", connection.toString(), watcherId); + + " consumerId={}", connection.getRemoteAddress(), watcherId); ServerError error; if (!existingWatcherFuture.isDone()) { error = ServerError.ServiceNotReady; @@ -179,14 +179,14 @@ public class TopicListService { if (log.isDebugEnabled()) { log.debug( "[{}] Received WatchTopicList for namespace [//{}] by {}", - connection.toString(), namespaceName, requestId); + connection.getRemoteAddress(), namespaceName, requestId); } connection.getCommandSender().sendWatchTopicListSuccess(requestId, watcherId, hash, topicList); lookupSemaphore.release(); }) .exceptionally(ex -> { log.warn("[{}] Error WatchTopicList for namespace [//{}] by {}", - connection.toString(), namespaceName, requestId); + connection.getRemoteAddress(), namespaceName, requestId); connection.getCommandSender().sendErrorResponse(requestId, BrokerServiceException.getClientErrorCode( new BrokerServiceException.ServerMetadataException(ex)), ex.getMessage()); @@ -228,7 +228,7 @@ public class TopicListService { CompletableFuture<TopicListWatcher> watcherFuture = watchers.get(watcherId); if (watcherFuture == null) { log.info("[{}] TopicListWatcher was not registered on the connection: {}", - watcherId, connection.toString()); + watcherId, connection.getRemoteAddress()); return; } @@ -238,14 +238,14 @@ public class TopicListService { // watcher future as failed and we can tell the client the close operation was successful. When the actual // create operation will complete, the new watcher will be discarded. log.info("[{}] Closed watcher before its creation was completed. watcherId={}", - connection.toString(), watcherId); + connection.getRemoteAddress(), watcherId); watchers.remove(watcherId); return; } if (watcherFuture.isCompletedExceptionally()) { log.info("[{}] Closed watcher that already failed to be created. watcherId={}", - connection.toString(), watcherId); + connection.getRemoteAddress(), watcherId); watchers.remove(watcherId); return; } @@ -253,7 +253,7 @@ public class TopicListService { // Proceed with normal watcher close topicResources.deregisterPersistentTopicListener(watcherFuture.getNow(null)); watchers.remove(watcherId); - log.info("[{}] Closed watcher, watcherId={}", connection.toString(), watcherId); + log.info("[{}] Closed watcher, watcherId={}", connection.getRemoteAddress(), watcherId); } /** diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java index 528c6ff65f1..d0b26103461 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java @@ -113,7 +113,7 @@ public abstract class PulsarDecoder extends ChannelInboundHandlerAdapter { cmd.parseFrom(buffer, cmdSize); if (log.isDebugEnabled()) { - log.debug("[{}] Received cmd {}", ctx.channel(), cmd.getType()); + log.debug("[{}] Received cmd {}", ctx.channel().remoteAddress(), cmd.getType()); } messageReceived(); diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarHandler.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarHandler.java index a84a67ba08f..a507554eaaf 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarHandler.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarHandler.java @@ -64,7 +64,7 @@ public abstract class PulsarHandler extends PulsarDecoder { this.ctx = ctx; if (log.isDebugEnabled()) { - log.debug("[{}] Scheduling keep-alive task every {} s", this.toString(), keepAliveIntervalSeconds); + log.debug("[{}] Scheduling keep-alive task every {} s", ctx.channel(), keepAliveIntervalSeconds); } if (keepAliveIntervalSeconds > 0) { this.keepAliveTask = ctx.executor() @@ -82,13 +82,13 @@ public abstract class PulsarHandler extends PulsarDecoder { protected final void handlePing(CommandPing ping) { // Immediately reply success to ping requests if (log.isDebugEnabled()) { - log.debug("[{}] Replying back to ping message", this.toString()); + log.debug("[{}] Replying back to ping message", ctx.channel()); } ctx.writeAndFlush(Commands.newPong()) .addListener(future -> { if (!future.isSuccess()) { log.warn("[{}] Forcing connection to close since cannot send a pong message.", - toString(), future.cause()); + ctx.channel(), future.cause()); ctx.close(); } }); @@ -104,31 +104,30 @@ public abstract class PulsarHandler extends PulsarDecoder { } if (!isHandshakeCompleted()) { - log.warn("[{}] Pulsar Handshake was not completed within timeout, closing connection", this.toString()); + log.warn("[{}] Pulsar Handshake was not completed within timeout, closing connection", ctx.channel()); ctx.close(); } else if (waitingForPingResponse && ctx.channel().config().isAutoRead()) { // We were waiting for a response and another keep-alive just completed. // If auto-read was disabled, it means we stopped reading from the connection, so we might receive the Ping // response later and thus not enforce the strict timeout here. - log.warn("[{}] Forcing connection to close after keep-alive timeout", this.toString()); + log.warn("[{}] Forcing connection to close after keep-alive timeout", ctx.channel()); ctx.close(); } else if (getRemoteEndpointProtocolVersion() >= ProtocolVersion.v1.getValue()) { // Send keep alive probe to peer only if it supports the ping/pong commands, added in v1 if (log.isDebugEnabled()) { - log.debug("[{}] Sending ping message", this.toString()); + log.debug("[{}] Sending ping message", ctx.channel()); } waitingForPingResponse = true; sendPing(); } else { if (log.isDebugEnabled()) { - log.debug("[{}] Peer doesn't support keep-alive", this.toString()); + log.debug("[{}] Peer doesn't support keep-alive", ctx.channel()); } } } protected ChannelFuture sendPing() { return ctx.writeAndFlush(Commands.newPing()) -<<<<<<< HEAD .addListener(future -> { if (!future.isSuccess()) { log.warn("[{}] Forcing connection to close since cannot send a ping message.", @@ -136,15 +135,6 @@ public abstract class PulsarHandler extends PulsarDecoder { ctx.close(); } }); -======= - .addListener(future -> { - if (!future.isSuccess()) { - log.warn("[{}] Forcing connection to close since cannot send a ping message.", - this.toString(), future.cause()); - ctx.close(); - } - }); ->>>>>>> d77c5de5d7 ([improve] [log] Print source client addr when enabled haProxyProtocolEnabled (#22686)) } public void cancelKeepAliveTask() { @@ -159,20 +149,5 @@ public abstract class PulsarHandler extends PulsarDecoder { */ protected abstract boolean isHandshakeCompleted(); - /** - * Demo: [id: 0x2561bcd1, L:/10.0.136.103:6650 ! R:/240.240.0.5:58038]. - * L: local Address. - * R: remote address. - */ - @Override - public String toString() { - ChannelHandlerContext ctx = this.ctx; - if (ctx == null) { - return "[ctx: null]"; - } else { - return ctx.channel().toString(); - } - } - private static final Logger log = LoggerFactory.getLogger(PulsarHandler.class); }
