This is an automated email from the ASF dual-hosted git repository.
rdhabalia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 0cc266d54d6 [improve][broker] Support showing client ip address in client stats while using reverse proxy (#23974) 0cc266d54d6 is described below commit 0cc266d54d69205c78dc1bcf03f0b608c20fb62b Author: Rajan Dhabalia <rdhaba...@apache.org> AuthorDate: Tue Mar 4 22:37:11 2025 -0800 [improve][broker] Support showing client ip address in client stats while using reverse proxy (#23974) --- .../java/org/apache/pulsar/broker/service/Consumer.java | 5 ++++- .../java/org/apache/pulsar/broker/service/Producer.java | 5 ++++- .../test/java/org/apache/pulsar/client/api/TlsSniTest.java | 14 ++++++++++++-- .../main/java/org/apache/pulsar/client/impl/ClientCnx.java | 9 ++++++++- .../java/org/apache/pulsar/client/impl/ConsumerImpl.java | 14 ++++++++++++-- .../java/org/apache/pulsar/client/impl/ProducerImpl.java | 14 ++++++++++++-- .../java/org/apache/pulsar/common/naming/Metadata.java | 1 + .../proxy/server/ProxyEnableHAProxyProtocolTest.java | 4 ++-- 8 files changed, 55 insertions(+), 11 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java index 61f9d5c86b3..a59b755144e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java @@ -48,6 +48,7 @@ import org.apache.bookkeeper.mledger.Entry; import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.PositionFactory; import org.apache.bookkeeper.mledger.impl.AckSetStateUtil; +import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.mutable.MutableInt; import org.apache.commons.lang3.tuple.MutablePair; import org.apache.commons.lang3.tuple.Pair; @@ -64,6 +65,7 @@ import org.apache.pulsar.common.api.proto.CommandTopicMigrated.ResourceType; import 
org.apache.pulsar.common.api.proto.KeyLongValue; import org.apache.pulsar.common.api.proto.KeySharedMeta; import org.apache.pulsar.common.api.proto.MessageIdData; +import org.apache.pulsar.common.naming.Metadata; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.ClusterPolicies.ClusterUrl; import org.apache.pulsar.common.policies.data.TopicOperation; @@ -226,7 +228,8 @@ public class Consumer { this.metadata = metadata != null ? metadata : Collections.emptyMap(); stats = new ConsumerStatsImpl(); - stats.setAddress(cnx.clientSourceAddressAndPort()); + String address = metadata != null ? metadata.get(Metadata.CLIENT_IP) : null; + stats.setAddress(StringUtils.isNotBlank(address) ? address : cnx.clientSourceAddressAndPort()); stats.consumerName = consumerName; stats.appId = appId; stats.setConnectedSince(DateFormatter.format(connectedSince)); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java index 0784f74591e..a494627aa4d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java @@ -42,6 +42,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLongFieldUpdater; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import org.apache.bookkeeper.mledger.Position; +import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.intercept.BrokerInterceptor; import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData; @@ -55,6 +56,7 @@ import org.apache.pulsar.common.api.proto.CommandTopicMigrated.ResourceType; import org.apache.pulsar.common.api.proto.MessageMetadata; import org.apache.pulsar.common.api.proto.ProducerAccessMode; import 
org.apache.pulsar.common.api.proto.ServerError; +import org.apache.pulsar.common.naming.Metadata; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.ClusterPolicies.ClusterUrl; import org.apache.pulsar.common.policies.data.TopicOperation; @@ -131,7 +133,8 @@ public class Producer { this.metadata = metadata != null ? metadata : Collections.emptyMap(); this.stats = isNonPersistentTopic ? new NonPersistentPublisherStatsImpl() : new PublisherStatsImpl(); - stats.setAddress(cnx.clientSourceAddressAndPort()); + String address = metadata != null ? metadata.get(Metadata.CLIENT_IP) : null; + stats.setAddress(StringUtils.isNotBlank(address) ? address : cnx.clientSourceAddressAndPort()); stats.setConnectedSince(DateFormatter.now()); stats.setClientVersion(cnx.getClientVersion()); stats.setProducerName(producerName); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TlsSniTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TlsSniTest.java index 173fa8acb0f..56fd26a0326 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TlsSniTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TlsSniTest.java @@ -18,16 +18,19 @@ */ package org.apache.pulsar.client.api; +import static org.testng.Assert.assertNotNull; import java.net.InetAddress; import java.net.URI; import java.util.HashMap; import java.util.Map; import java.util.concurrent.TimeUnit; - import org.apache.pulsar.client.impl.auth.AuthenticationTls; import org.testng.annotations.Test; - import lombok.Cleanup; +import org.apache.pulsar.broker.service.persistent.PersistentTopic; +import org.apache.pulsar.broker.service.Consumer; +import org.apache.pulsar.broker.service.Producer; +import org.apache.pulsar.common.naming.Metadata; @Test(groups = "broker-api") public class TlsSniTest extends TlsProducerConsumerBase { @@ -51,6 +54,7 @@ public class TlsSniTest extends TlsProducerConsumerBase { ClientBuilder clientBuilder 
= PulsarClient.builder().serviceUrl(brokerServiceIpAddressUrl) .tlsTrustCertsFilePath(CA_CERT_FILE_PATH).allowTlsInsecureConnection(false) + .proxyServiceUrl(brokerServiceIpAddressUrl, ProxyProtocol.SNI) .enableTlsHostnameVerification(false) .operationTimeout(1000, TimeUnit.MILLISECONDS); Map<String, String> authParams = new HashMap<>(); @@ -62,6 +66,12 @@ public class TlsSniTest extends TlsProducerConsumerBase { PulsarClient pulsarClient = clientBuilder.build(); // should be able to create producer successfully pulsarClient.newProducer().topic(topicName).create(); + pulsarClient.newConsumer().topic(topicName).subscriptionName("test").subscribe(); + PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getTopic(topicName, false).get().get(); + Producer producer = topic.getProducers().values().iterator().next(); + assertNotNull(producer.getMetadata().get(Metadata.CLIENT_IP)); + Consumer consumer = topic.getSubscription("test").getDispatcher().getConsumers().iterator().next(); + assertNotNull(consumer.getMetadata().get(Metadata.CLIENT_IP)); } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java index e8b691b2eea..1659b611096 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java @@ -119,6 +119,7 @@ import org.slf4j.LoggerFactory; @SuppressWarnings("unchecked") public class ClientCnx extends PulsarHandler { + private static final Logger log = LoggerFactory.getLogger(ClientCnx.class); protected final Authentication authentication; protected State state; @@ -1438,7 +1439,13 @@ public class ClientCnx extends PulsarHandler { } } - private static final Logger log = LoggerFactory.getLogger(ClientCnx.class); + public boolean isProxy() { + return proxyToTargetBrokerAddress != null; + } + + public SocketAddress getLocalAddress() { + return 
this.localAddress; + } /** * Check client connection is now free. This method will not change the state to idle. diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index 6f2ad9152d3..4691c402b2f 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -121,6 +121,7 @@ import org.apache.pulsar.common.api.proto.ProtocolVersion; import org.apache.pulsar.common.api.proto.SingleMessageMetadata; import org.apache.pulsar.common.compression.CompressionCodec; import org.apache.pulsar.common.compression.CompressionCodecProvider; +import org.apache.pulsar.common.naming.Metadata; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.common.schema.SchemaInfo; @@ -187,7 +188,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle private final MessageCrypto msgCrypto; - private final Map<String, String> metadata; + private Map<String, String> metadata; private final boolean readCompacted; private final boolean resetIncludeHead; @@ -361,7 +362,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle if (conf.getProperties().isEmpty()) { metadata = Collections.emptyMap(); } else { - metadata = Collections.unmodifiableMap(new HashMap<>(conf.getProperties())); + metadata = new HashMap<>(conf.getProperties()); } this.connectionHandler = new ConnectionHandler(this, @@ -910,6 +911,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle // synchronized this, because redeliverUnAckMessage eliminate the epoch inconsistency between them final CompletableFuture<Void> future = new CompletableFuture<>(); synchronized (this) { + updateProxyMetadataIfNeeded(cnx); ByteBuf request = 
Commands.newSubscribe(topic, subscription, consumerId, requestId, getSubType(), priorityLevel, consumerName, isDurable, startMessageIdData, metadata, readCompacted, conf.getReplicateSubscriptionState(), @@ -3134,6 +3136,14 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle return cmd; } + private void updateProxyMetadataIfNeeded(ClientCnx cnx) { + boolean isProxy = cnx.isProxy() || client.getConfiguration().getProxyServiceUrl() != null; + if (isProxy && cnx.getLocalAddress() != null) { + metadata = metadata.isEmpty() ? new HashMap<>() : metadata; + metadata.put(Metadata.CLIENT_IP, cnx.getLocalAddress().toString()); + } + } + private CompletableFuture<Void> doTransactionAcknowledgeForResponse(List<MessageId> messageIds, AckType ackType, Map<String, Long> properties, TxnID txnID) { long requestId = client.newRequestId(); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java index 039468386ed..0b1f8edf107 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java @@ -92,6 +92,7 @@ import org.apache.pulsar.common.api.proto.MessageMetadata; import org.apache.pulsar.common.api.proto.ProtocolVersion; import org.apache.pulsar.common.compression.CompressionCodec; import org.apache.pulsar.common.compression.CompressionCodecProvider; +import org.apache.pulsar.common.naming.Metadata; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.protocol.ByteBufPair; import org.apache.pulsar.common.protocol.Commands; @@ -156,7 +157,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne private ScheduledFuture<?> keyGeneratorTask = null; - private final Map<String, String> metadata; + private Map<String, String> metadata; private Optional<byte[]> schemaVersion = 
Optional.empty(); @@ -280,7 +281,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne if (conf.getProperties().isEmpty()) { metadata = Collections.emptyMap(); } else { - metadata = Collections.unmodifiableMap(new HashMap<>(conf.getProperties())); + metadata = new HashMap<>(conf.getProperties()); } InstrumentProvider ip = client.instrumentProvider(); @@ -1856,6 +1857,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne } final CompletableFuture<Void> future = new CompletableFuture<>(); + updateProxyMetadataIfNeeded(cnx); cnx.sendRequestWithId( Commands.newProducer(topic, producerId, requestId, producerName, conf.isEncryptionEnabled(), metadata, schemaInfo, epoch, userProvidedProducerName, @@ -2028,6 +2030,14 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne } } + private void updateProxyMetadataIfNeeded(ClientCnx cnx) { + boolean isProxy = cnx.isProxy() || client.getConfiguration().getProxyServiceUrl() != null; + if (isProxy && cnx.getLocalAddress() != null) { + metadata = metadata.isEmpty() ? 
new HashMap<>() : metadata; + metadata.put(Metadata.CLIENT_IP, cnx.getLocalAddress().toString()); + } + } + private void closeProducerTasks() { Timeout timeout = sendTimeout; if (timeout != null) { diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/Metadata.java b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/Metadata.java index eba492cf6bf..635238ef919 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/Metadata.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/Metadata.java @@ -25,6 +25,7 @@ import java.util.Map; */ public class Metadata { + public static final String CLIENT_IP = "X-Pulsar-Client-IP"; private Metadata() {} public static void validateMetadata(Map<String, String> metadata, diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyEnableHAProxyProtocolTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyEnableHAProxyProtocolTest.java index 40aa8f50405..f24a80fc2a2 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyEnableHAProxyProtocolTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyEnableHAProxyProtocolTest.java @@ -122,11 +122,11 @@ public class ProxyEnableHAProxyProtocolTest extends MockedPulsarServiceBaseTest SubscriptionStats subscriptionStats = topicStats.getSubscriptions().get(subName); Assert.assertEquals(subscriptionStats.getConsumers().size(), 1); Assert.assertEquals(subscriptionStats.getConsumers().get(0).getAddress(), - ((ConsumerImpl) consumer).getClientCnx().ctx().channel().localAddress().toString().replaceFirst("/", "")); + ((ConsumerImpl) consumer).getClientCnx().ctx().channel().localAddress().toString()); topicStats = admin.topics().getStats(topicName); Assert.assertEquals(topicStats.getPublishers().size(), 1); Assert.assertEquals(topicStats.getPublishers().get(0).getAddress(), - ((ProducerImpl) 
producer).getClientCnx().ctx().channel().localAddress().toString().replaceFirst("/", "")); + ((ProducerImpl) producer).getClientCnx().ctx().channel().localAddress().toString()); } }