This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 0cc266d54d6 [improve][broker] Support showing client ip address in client stats while using reverse proxy (#23974)
0cc266d54d6 is described below

commit 0cc266d54d69205c78dc1bcf03f0b608c20fb62b
Author: Rajan Dhabalia <rdhaba...@apache.org>
AuthorDate: Tue Mar 4 22:37:11 2025 -0800

    [improve][broker] Support showing client ip address in client stats while using reverse proxy (#23974)
---
 .../java/org/apache/pulsar/broker/service/Consumer.java    |  5 ++++-
 .../java/org/apache/pulsar/broker/service/Producer.java    |  5 ++++-
 .../test/java/org/apache/pulsar/client/api/TlsSniTest.java | 14 ++++++++++++--
 .../main/java/org/apache/pulsar/client/impl/ClientCnx.java |  9 ++++++++-
 .../java/org/apache/pulsar/client/impl/ConsumerImpl.java   | 14 ++++++++++++--
 .../java/org/apache/pulsar/client/impl/ProducerImpl.java   | 14 ++++++++++++--
 .../java/org/apache/pulsar/common/naming/Metadata.java     |  1 +
 .../proxy/server/ProxyEnableHAProxyProtocolTest.java       |  4 ++--
 8 files changed, 55 insertions(+), 11 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
index 61f9d5c86b3..a59b755144e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
@@ -48,6 +48,7 @@ import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.PositionFactory;
 import org.apache.bookkeeper.mledger.impl.AckSetStateUtil;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.mutable.MutableInt;
 import org.apache.commons.lang3.tuple.MutablePair;
 import org.apache.commons.lang3.tuple.Pair;
@@ -64,6 +65,7 @@ import 
org.apache.pulsar.common.api.proto.CommandTopicMigrated.ResourceType;
 import org.apache.pulsar.common.api.proto.KeyLongValue;
 import org.apache.pulsar.common.api.proto.KeySharedMeta;
 import org.apache.pulsar.common.api.proto.MessageIdData;
+import org.apache.pulsar.common.naming.Metadata;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.ClusterPolicies.ClusterUrl;
 import org.apache.pulsar.common.policies.data.TopicOperation;
@@ -226,7 +228,8 @@ public class Consumer {
         this.metadata = metadata != null ? metadata : Collections.emptyMap();
 
         stats = new ConsumerStatsImpl();
-        stats.setAddress(cnx.clientSourceAddressAndPort());
+        String address = metadata != null ? metadata.get(Metadata.CLIENT_IP) : 
null;
+        stats.setAddress(StringUtils.isNotBlank(address) ? address : 
cnx.clientSourceAddressAndPort());
         stats.consumerName = consumerName;
         stats.appId = appId;
         stats.setConnectedSince(DateFormatter.format(connectedSince));
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
index 0784f74591e..a494627aa4d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
@@ -42,6 +42,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 import org.apache.bookkeeper.mledger.Position;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.intercept.BrokerInterceptor;
 import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
@@ -55,6 +56,7 @@ import 
org.apache.pulsar.common.api.proto.CommandTopicMigrated.ResourceType;
 import org.apache.pulsar.common.api.proto.MessageMetadata;
 import org.apache.pulsar.common.api.proto.ProducerAccessMode;
 import org.apache.pulsar.common.api.proto.ServerError;
+import org.apache.pulsar.common.naming.Metadata;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.ClusterPolicies.ClusterUrl;
 import org.apache.pulsar.common.policies.data.TopicOperation;
@@ -131,7 +133,8 @@ public class Producer {
         this.metadata = metadata != null ? metadata : Collections.emptyMap();
 
         this.stats = isNonPersistentTopic ? new 
NonPersistentPublisherStatsImpl() : new PublisherStatsImpl();
-        stats.setAddress(cnx.clientSourceAddressAndPort());
+        String address = metadata != null ? metadata.get(Metadata.CLIENT_IP) : 
null;
+        stats.setAddress(StringUtils.isNotBlank(address) ? address : 
cnx.clientSourceAddressAndPort());
         stats.setConnectedSince(DateFormatter.now());
         stats.setClientVersion(cnx.getClientVersion());
         stats.setProducerName(producerName);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TlsSniTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TlsSniTest.java
index 173fa8acb0f..56fd26a0326 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TlsSniTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TlsSniTest.java
@@ -18,16 +18,19 @@
  */
 package org.apache.pulsar.client.api;
 
+import static org.testng.Assert.assertNotNull;
 import java.net.InetAddress;
 import java.net.URI;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
-
 import org.apache.pulsar.client.impl.auth.AuthenticationTls;
 import org.testng.annotations.Test;
-
 import lombok.Cleanup;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.broker.service.Consumer;
+import org.apache.pulsar.broker.service.Producer;
+import org.apache.pulsar.common.naming.Metadata;
 
 @Test(groups = "broker-api")
 public class TlsSniTest extends TlsProducerConsumerBase {
@@ -51,6 +54,7 @@ public class TlsSniTest extends TlsProducerConsumerBase {
 
         ClientBuilder clientBuilder = 
PulsarClient.builder().serviceUrl(brokerServiceIpAddressUrl)
                 
.tlsTrustCertsFilePath(CA_CERT_FILE_PATH).allowTlsInsecureConnection(false)
+                .proxyServiceUrl(brokerServiceIpAddressUrl, ProxyProtocol.SNI)
                 .enableTlsHostnameVerification(false)
                 .operationTimeout(1000, TimeUnit.MILLISECONDS);
         Map<String, String> authParams = new HashMap<>();
@@ -62,6 +66,12 @@ public class TlsSniTest extends TlsProducerConsumerBase {
         PulsarClient pulsarClient = clientBuilder.build();
         // should be able to create producer successfully
         pulsarClient.newProducer().topic(topicName).create();
+        
pulsarClient.newConsumer().topic(topicName).subscriptionName("test").subscribe();
+        PersistentTopic topic = (PersistentTopic) 
pulsar.getBrokerService().getTopic(topicName, false).get().get();
+        Producer producer = topic.getProducers().values().iterator().next();
+        assertNotNull(producer.getMetadata().get(Metadata.CLIENT_IP));
+        Consumer consumer = 
topic.getSubscription("test").getDispatcher().getConsumers().iterator().next();
+        assertNotNull(consumer.getMetadata().get(Metadata.CLIENT_IP));
     }
 }
 
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
index e8b691b2eea..1659b611096 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
@@ -119,6 +119,7 @@ import org.slf4j.LoggerFactory;
 @SuppressWarnings("unchecked")
 public class ClientCnx extends PulsarHandler {
 
+    private static final Logger log = LoggerFactory.getLogger(ClientCnx.class);
     protected final Authentication authentication;
     protected State state;
 
@@ -1438,7 +1439,13 @@ public class ClientCnx extends PulsarHandler {
         }
     }
 
-    private static final Logger log = LoggerFactory.getLogger(ClientCnx.class);
+    public boolean isProxy() {
+        return proxyToTargetBrokerAddress != null;
+    }
+
+    public SocketAddress getLocalAddress() {
+        return this.localAddress;
+    }
 
     /**
      * Check client connection is now free. This method will not change the 
state to idle.
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 6f2ad9152d3..4691c402b2f 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -121,6 +121,7 @@ import org.apache.pulsar.common.api.proto.ProtocolVersion;
 import org.apache.pulsar.common.api.proto.SingleMessageMetadata;
 import org.apache.pulsar.common.compression.CompressionCodec;
 import org.apache.pulsar.common.compression.CompressionCodecProvider;
+import org.apache.pulsar.common.naming.Metadata;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.schema.SchemaInfo;
@@ -187,7 +188,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
 
     private final MessageCrypto msgCrypto;
 
-    private final Map<String, String> metadata;
+    private Map<String, String> metadata;
 
     private final boolean readCompacted;
     private final boolean resetIncludeHead;
@@ -361,7 +362,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
         if (conf.getProperties().isEmpty()) {
             metadata = Collections.emptyMap();
         } else {
-            metadata = Collections.unmodifiableMap(new 
HashMap<>(conf.getProperties()));
+            metadata = new HashMap<>(conf.getProperties());
         }
 
         this.connectionHandler = new ConnectionHandler(this,
@@ -910,6 +911,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
         // synchronized this, because redeliverUnAckMessage eliminate the 
epoch inconsistency between them
         final CompletableFuture<Void> future = new CompletableFuture<>();
         synchronized (this) {
+            updateProxyMetadataIfNeeded(cnx);
             ByteBuf request = Commands.newSubscribe(topic, subscription, 
consumerId, requestId, getSubType(),
                     priorityLevel, consumerName, isDurable, 
startMessageIdData, metadata, readCompacted,
                     conf.getReplicateSubscriptionState(),
@@ -3134,6 +3136,14 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
         return cmd;
     }
 
+    private void updateProxyMetadataIfNeeded(ClientCnx cnx) {
+        boolean isProxy = cnx.isProxy() || 
client.getConfiguration().getProxyServiceUrl() != null;
+        if (isProxy && cnx.getLocalAddress() != null) {
+            metadata = metadata.isEmpty() ? new HashMap<>() : metadata;
+            metadata.put(Metadata.CLIENT_IP, cnx.getLocalAddress().toString());
+        }
+    }
+
     private CompletableFuture<Void> 
doTransactionAcknowledgeForResponse(List<MessageId> messageIds, AckType ackType,
                                                                         
Map<String, Long> properties, TxnID txnID) {
         long requestId = client.newRequestId();
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index 039468386ed..0b1f8edf107 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -92,6 +92,7 @@ import org.apache.pulsar.common.api.proto.MessageMetadata;
 import org.apache.pulsar.common.api.proto.ProtocolVersion;
 import org.apache.pulsar.common.compression.CompressionCodec;
 import org.apache.pulsar.common.compression.CompressionCodecProvider;
+import org.apache.pulsar.common.naming.Metadata;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.protocol.ByteBufPair;
 import org.apache.pulsar.common.protocol.Commands;
@@ -156,7 +157,7 @@ public class ProducerImpl<T> extends ProducerBase<T> 
implements TimerTask, Conne
 
     private ScheduledFuture<?> keyGeneratorTask = null;
 
-    private final Map<String, String> metadata;
+    private Map<String, String> metadata;
 
     private Optional<byte[]> schemaVersion = Optional.empty();
 
@@ -280,7 +281,7 @@ public class ProducerImpl<T> extends ProducerBase<T> 
implements TimerTask, Conne
         if (conf.getProperties().isEmpty()) {
             metadata = Collections.emptyMap();
         } else {
-            metadata = Collections.unmodifiableMap(new 
HashMap<>(conf.getProperties()));
+            metadata = new HashMap<>(conf.getProperties());
         }
 
         InstrumentProvider ip = client.instrumentProvider();
@@ -1856,6 +1857,7 @@ public class ProducerImpl<T> extends ProducerBase<T> 
implements TimerTask, Conne
         }
 
         final CompletableFuture<Void> future = new CompletableFuture<>();
+        updateProxyMetadataIfNeeded(cnx);
         cnx.sendRequestWithId(
                 Commands.newProducer(topic, producerId, requestId, 
producerName, conf.isEncryptionEnabled(), metadata,
                         schemaInfo, epoch, userProvidedProducerName,
@@ -2028,6 +2030,14 @@ public class ProducerImpl<T> extends ProducerBase<T> 
implements TimerTask, Conne
         }
     }
 
+    private void updateProxyMetadataIfNeeded(ClientCnx cnx) {
+        boolean isProxy = cnx.isProxy() || 
client.getConfiguration().getProxyServiceUrl() != null;
+        if (isProxy && cnx.getLocalAddress() != null) {
+            metadata = metadata.isEmpty() ? new HashMap<>() : metadata;
+            metadata.put(Metadata.CLIENT_IP, cnx.getLocalAddress().toString());
+        }
+    }
+
     private void closeProducerTasks() {
         Timeout timeout = sendTimeout;
         if (timeout != null) {
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/Metadata.java b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/Metadata.java
index eba492cf6bf..635238ef919 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/Metadata.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/Metadata.java
@@ -25,6 +25,7 @@ import java.util.Map;
  */
 public class Metadata {
 
+    public static final String CLIENT_IP = "X-Pulsar-Client-IP";
     private Metadata() {}
 
     public static void validateMetadata(Map<String, String> metadata,
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyEnableHAProxyProtocolTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyEnableHAProxyProtocolTest.java
index 40aa8f50405..f24a80fc2a2 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyEnableHAProxyProtocolTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyEnableHAProxyProtocolTest.java
@@ -122,11 +122,11 @@ public class ProxyEnableHAProxyProtocolTest extends 
MockedPulsarServiceBaseTest
         SubscriptionStats subscriptionStats = 
topicStats.getSubscriptions().get(subName);
         Assert.assertEquals(subscriptionStats.getConsumers().size(), 1);
         
Assert.assertEquals(subscriptionStats.getConsumers().get(0).getAddress(),
-                ((ConsumerImpl) 
consumer).getClientCnx().ctx().channel().localAddress().toString().replaceFirst("/",
 ""));
+                ((ConsumerImpl) 
consumer).getClientCnx().ctx().channel().localAddress().toString());
 
         topicStats = admin.topics().getStats(topicName);
         Assert.assertEquals(topicStats.getPublishers().size(), 1);
         Assert.assertEquals(topicStats.getPublishers().get(0).getAddress(),
-                ((ProducerImpl) 
producer).getClientCnx().ctx().channel().localAddress().toString().replaceFirst("/",
 ""));
+                ((ProducerImpl) 
producer).getClientCnx().ctx().channel().localAddress().toString());
     }
 }

Reply via email to