This is an automated email from the ASF dual-hosted git repository.
yubiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new d77c5de5d71 [improve] [log] Print source client addr when enabled haProxyProtocolEnabled (#22686)
d77c5de5d71 is described below
commit d77c5de5d713043237773dc057caa1920134bfe3
Author: fengyubiao <[email protected]>
AuthorDate: Sat May 11 01:27:52 2024 +0800
[improve] [log] Print source client addr when enabled haProxyProtocolEnabled (#22686)
---
.../org/apache/pulsar/broker/service/Consumer.java | 2 +-
.../org/apache/pulsar/broker/service/Producer.java | 2 +-
.../apache/pulsar/broker/service/ServerCnx.java | 35 ++++++++++++++++++----
.../broker/service/ServerCnxThrottleTracker.java | 2 +-
.../pulsar/broker/service/TopicListService.java | 20 ++++++-------
.../pulsar/common/protocol/PulsarDecoder.java | 2 +-
.../pulsar/common/protocol/PulsarHandler.java | 31 ++++++++++++++-----
7 files changed, 67 insertions(+), 27 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
index b1c3687b3a0..89a9bab497d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
@@ -934,7 +934,7 @@ public class Consumer {
public String toString() {
if (subscription != null && cnx != null) {
return MoreObjects.toStringHelper(this).add("subscription",
subscription).add("consumerId", consumerId)
- .add("consumerName", consumerName).add("address",
this.cnx.clientAddress()).toString();
+ .add("consumerName", consumerName).add("address",
this.cnx.toString()).toString();
} else {
return MoreObjects.toStringHelper(this).add("consumerId",
consumerId)
.add("consumerName", consumerName).toString();
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
index 9cfde67802b..c10e33818ed 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
@@ -666,7 +666,7 @@ public class Producer {
@Override
public String toString() {
- return MoreObjects.toStringHelper(this).add("topic",
topic).add("client", cnx.clientAddress())
+ return MoreObjects.toStringHelper(this).add("topic",
topic).add("client", cnx.toString())
.add("producerName", producerName).add("producerId",
producerId).toString();
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 5ccdbfbe715..59411aec040 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -1201,7 +1201,7 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
remoteAddress, getPrincipal());
}
- log.info("[{}] Subscribing on topic {} / {}. consumerId: {}",
this.ctx().channel().toString(),
+ log.info("[{}] Subscribing on topic {} / {}. consumerId: {}",
this.toString(),
topicName, subscriptionName, consumerId);
try {
Metadata.validateMetadata(metadata,
@@ -1921,7 +1921,7 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
if (log.isDebugEnabled()) {
log.debug("Consumer future is not complete(not complete or
error), but received command ack. so discard"
+ " this command. consumerId: {}, cnx: {},
messageIdCount: {}", ack.getConsumerId(),
- this.ctx().channel().toString(),
ack.getMessageIdsCount());
+ this.toString(), ack.getMessageIdsCount());
}
}
}
@@ -2267,7 +2267,7 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
@Override
public String toString() {
return String.format("ServerCnx [%s] get largest batch
index when possible",
- ServerCnx.this.ctx.channel());
+ ServerCnx.this.toString());
}
}, null);
@@ -3301,7 +3301,7 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
}
} catch (Throwable t) {
log.warn("[{}] [{}] Failed to remove TCP no-delay property on
client cnx {}", topic, producerName,
- ctx.channel());
+ this.toString());
}
}
}
@@ -3364,6 +3364,31 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
return remoteAddress;
}
+ /**
+ * Demo: [id: 0x2561bcd1, L:/10.0.136.103:6650 ! R:/240.240.0.5:58038]
[SR:/240.240.0.5:58038].
+ * L: local Address.
+ * R: remote address.
+ * SR: source remote address. It is the source address when enabled
"haProxyProtocolEnabled".
+ */
+ @Override
+ public String toString() {
+ ChannelHandlerContext ctx = ctx();
+ // ctx.channel(): 96.
+ // clientSourceAddress: 5 + 46(ipv6).
+ // state: 19.
+ // Len = 166.
+ StringBuilder buf = new StringBuilder(166);
+ if (ctx == null) {
+ buf.append("[ctx: null]");
+ } else {
+ buf.append(ctx.channel().toString());
+ }
+ String clientSourceAddr = clientSourceAddress();
+ buf.append(" [SR:").append(clientSourceAddr == null ? "-" :
clientSourceAddr)
+ .append(", state:").append(state).append("]");
+ return buf.toString();
+ }
+
@Override
public BrokerService getBrokerService() {
return service;
@@ -3510,7 +3535,7 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
ctx.executor().schedule(() -> {
if (finalConnectionCheckInProgress ==
connectionCheckInProgress
&& !finalConnectionCheckInProgress.isDone()) {
- log.warn("[{}] Connection check timed out. Closing
connection.", remoteAddress);
+ log.warn("[{}] Connection check timed out. Closing
connection.", this.toString());
ctx.close();
}
}, connectionLivenessCheckTimeoutMillis,
TimeUnit.MILLISECONDS);
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnxThrottleTracker.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnxThrottleTracker.java
index f223d6eee37..7e55397022d 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnxThrottleTracker.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnxThrottleTracker.java
@@ -87,7 +87,7 @@ final class ServerCnxThrottleTracker {
private void changeAutoRead(boolean autoRead) {
if (isChannelActive()) {
if (log.isDebugEnabled()) {
- log.debug("[{}] Setting auto read to {}",
serverCnx.ctx().channel(), autoRead);
+ log.debug("[{}] Setting auto read to {}",
serverCnx.toString(), autoRead);
}
// change the auto read flag on the channel
serverCnx.ctx().channel().config().setAutoRead(autoRead);
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicListService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicListService.java
index aea5b9fc65b..b18286ee062 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicListService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicListService.java
@@ -131,7 +131,7 @@ public class TopicListService {
} else {
msg += "Pattern longer than maximum: " +
maxSubscriptionPatternLength;
}
- log.warn("[{}] {} on namespace {}", connection.getRemoteAddress(),
msg, namespaceName);
+ log.warn("[{}] {} on namespace {}", connection.toString(), msg,
namespaceName);
connection.getCommandSender().sendErrorResponse(requestId,
ServerError.NotAllowedError, msg);
lookupSemaphore.release();
return;
@@ -144,14 +144,14 @@ public class TopicListService {
TopicListWatcher watcher = existingWatcherFuture.getNow(null);
log.info("[{}] Watcher with the same id is already created:"
+ " watcherId={}, watcher={}",
- connection.getRemoteAddress(), watcherId, watcher);
+ connection.toString(), watcherId, watcher);
watcherFuture = existingWatcherFuture;
} else {
// There was an early request to create a watcher with the
same watcherId. This can happen when
// client timeout is lower the broker timeouts. We need to
wait until the previous watcher
// creation request either completes or fails.
log.warn("[{}] Watcher with id is already present on the
connection,"
- + " consumerId={}", connection.getRemoteAddress(),
watcherId);
+ + " consumerId={}", connection.toString(), watcherId);
ServerError error;
if (!existingWatcherFuture.isDone()) {
error = ServerError.ServiceNotReady;
@@ -179,14 +179,14 @@ public class TopicListService {
if (log.isDebugEnabled()) {
log.debug(
"[{}] Received WatchTopicList for namespace
[//{}] by {}",
- connection.getRemoteAddress(), namespaceName,
requestId);
+ connection.toString(), namespaceName,
requestId);
}
connection.getCommandSender().sendWatchTopicListSuccess(requestId, watcherId,
hash, topicList);
lookupSemaphore.release();
})
.exceptionally(ex -> {
log.warn("[{}] Error WatchTopicList for namespace [//{}]
by {}",
- connection.getRemoteAddress(), namespaceName,
requestId);
+ connection.toString(), namespaceName, requestId);
connection.getCommandSender().sendErrorResponse(requestId,
BrokerServiceException.getClientErrorCode(
new
BrokerServiceException.ServerMetadataException(ex)), ex.getMessage());
@@ -213,7 +213,7 @@ public class TopicListService {
} else {
if (!watcherFuture.complete(watcher)) {
log.warn("[{}] Watcher future was already
completed. Deregistering watcherId={}.",
- connection.getRemoteAddress(), watcherId);
+ connection.toString(), watcherId);
topicResources.deregisterPersistentTopicListener(watcher);
}
}
@@ -232,7 +232,7 @@ public class TopicListService {
CompletableFuture<TopicListWatcher> watcherFuture =
watchers.get(watcherId);
if (watcherFuture == null) {
log.info("[{}] TopicListWatcher was not registered on the
connection: {}",
- watcherId, connection.getRemoteAddress());
+ watcherId, connection.toString());
return;
}
@@ -242,14 +242,14 @@ public class TopicListService {
// watcher future as failed and we can tell the client the close
operation was successful. When the actual
// create operation will complete, the new watcher will be
discarded.
log.info("[{}] Closed watcher before its creation was completed.
watcherId={}",
- connection.getRemoteAddress(), watcherId);
+ connection.toString(), watcherId);
watchers.remove(watcherId);
return;
}
if (watcherFuture.isCompletedExceptionally()) {
log.info("[{}] Closed watcher that already failed to be created.
watcherId={}",
- connection.getRemoteAddress(), watcherId);
+ connection.toString(), watcherId);
watchers.remove(watcherId);
return;
}
@@ -257,7 +257,7 @@ public class TopicListService {
// Proceed with normal watcher close
topicResources.deregisterPersistentTopicListener(watcherFuture.getNow(null));
watchers.remove(watcherId);
- log.info("[{}] Closed watcher, watcherId={}",
connection.getRemoteAddress(), watcherId);
+ log.info("[{}] Closed watcher, watcherId={}", connection.toString(),
watcherId);
}
/**
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java
index c1c1ebe355b..c05b1d796df 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java
@@ -122,7 +122,7 @@ public abstract class PulsarDecoder extends
ChannelInboundHandlerAdapter {
cmd.parseFrom(buffer, cmdSize);
if (log.isDebugEnabled()) {
- log.debug("[{}] Received cmd {}",
ctx.channel().remoteAddress(), cmd.getType());
+ log.debug("[{}] Received cmd {}", ctx.channel(),
cmd.getType());
}
messageReceived();
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarHandler.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarHandler.java
index 51cd61afd63..d5c741be01e 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarHandler.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarHandler.java
@@ -67,7 +67,7 @@ public abstract class PulsarHandler extends PulsarDecoder {
this.ctx = ctx;
if (log.isDebugEnabled()) {
- log.debug("[{}] Scheduling keep-alive task every {} s",
ctx.channel(), keepAliveIntervalSeconds);
+ log.debug("[{}] Scheduling keep-alive task every {} s",
this.toString(), keepAliveIntervalSeconds);
}
if (keepAliveIntervalSeconds > 0) {
this.keepAliveTask = ctx.executor()
@@ -85,13 +85,13 @@ public abstract class PulsarHandler extends PulsarDecoder {
protected final void handlePing(CommandPing ping) {
// Immediately reply success to ping requests
if (log.isDebugEnabled()) {
- log.debug("[{}] Replying back to ping message", ctx.channel());
+ log.debug("[{}] Replying back to ping message", this.toString());
}
ctx.writeAndFlush(Commands.newPong())
.addListener(future -> {
if (!future.isSuccess()) {
log.warn("[{}] Forcing connection to close since
cannot send a pong message.",
- ctx.channel(), future.cause());
+ toString(), future.cause());
ctx.close();
}
});
@@ -107,24 +107,24 @@ public abstract class PulsarHandler extends PulsarDecoder
{
}
if (!isHandshakeCompleted()) {
- log.warn("[{}] Pulsar Handshake was not completed within timeout,
closing connection", ctx.channel());
+ log.warn("[{}] Pulsar Handshake was not completed within timeout,
closing connection", this.toString());
ctx.close();
} else if (waitingForPingResponse &&
ctx.channel().config().isAutoRead()) {
// We were waiting for a response and another keep-alive just
completed.
// If auto-read was disabled, it means we stopped reading from the
connection, so we might receive the Ping
// response later and thus not enforce the strict timeout here.
- log.warn("[{}] Forcing connection to close after keep-alive
timeout", ctx.channel());
+ log.warn("[{}] Forcing connection to close after keep-alive
timeout", this.toString());
ctx.close();
} else if (getRemoteEndpointProtocolVersion() >=
ProtocolVersion.v1.getValue()) {
// Send keep alive probe to peer only if it supports the ping/pong
commands, added in v1
if (log.isDebugEnabled()) {
- log.debug("[{}] Sending ping message", ctx.channel());
+ log.debug("[{}] Sending ping message", this.toString());
}
waitingForPingResponse = true;
sendPing();
} else {
if (log.isDebugEnabled()) {
- log.debug("[{}] Peer doesn't support keep-alive",
ctx.channel());
+ log.debug("[{}] Peer doesn't support keep-alive",
this.toString());
}
}
}
@@ -134,7 +134,7 @@ public abstract class PulsarHandler extends PulsarDecoder {
.addListener(future -> {
if (!future.isSuccess()) {
log.warn("[{}] Forcing connection to close since
cannot send a ping message.",
- ctx.channel(), future.cause());
+ this.toString(), future.cause());
ctx.close();
}
});
@@ -152,5 +152,20 @@ public abstract class PulsarHandler extends PulsarDecoder {
*/
protected abstract boolean isHandshakeCompleted();
+ /**
+ * Demo: [id: 0x2561bcd1, L:/10.0.136.103:6650 ! R:/240.240.0.5:58038].
+ * L: local Address.
+ * R: remote address.
+ */
+ @Override
+ public String toString() {
+ ChannelHandlerContext ctx = this.ctx;
+ if (ctx == null) {
+ return "[ctx: null]";
+ } else {
+ return ctx.channel().toString();
+ }
+ }
+
private static final Logger log =
LoggerFactory.getLogger(PulsarHandler.class);
}