This is an automated email from the ASF dual-hosted git repository.

zike pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new f1765be36f0 [fix][proxy] Fix using wrong client version in pulsar 
proxy (#19540)
f1765be36f0 is described below

commit f1765be36f050bb9771a2aa7f4404d1d4824e1bf
Author: Zike Yang <z...@apache.org>
AuthorDate: Fri Feb 17 18:47:57 2023 +0800

    [fix][proxy] Fix using wrong client version in pulsar proxy (#19540)
    
    ### Motivations
    
    Currently, if we connect the client to the proxy, the `clientVersion` won't 
be sent to the broker, so we can't get the client version using PulsarAdmin.
    
    For example, there is no `clientVersion` field shown in the output of topic 
stats:
    ```
    "publishers" : [ {
        "accessMode" : "Shared",
        "msgRateIn" : 0.0,
        "msgThroughputIn" : 0.0,
        "averageMsgSize" : 0.0,
        "chunkedMessageRate" : 0.0,
        "producerId" : 0,
        "metadata" : { },
        "address" : "/127.0.0.1:65385",
        "producerName" : "AlvaroProducer",
        "connectedSince" : "2023-02-16T11:34:30.384548+08:00"
      } ],
    ```
    
    It works fine when directly connecting to the broker.
    
    The root cause is that the Pulsar proxy doesn't pass the clientVersion from 
the client to the broker. It sets it to `Pulsar proxy` instead, and the broker 
then ignores that value because of the check here:
    
https://github.com/apache/pulsar/blob/master/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java#L693-L695
    
    ### Modifications
    
    * Use the correct clientVersion from the client
    
    Signed-off-by: Zike Yang <z...@apache.org>
---
 .../pulsar/proxy/server/DirectProxyHandler.java    |  2 +-
 .../apache/pulsar/proxy/server/ProxyClientCnx.java |  3 +--
 .../pulsar/proxy/server/ProxyConnection.java       |  5 +++--
 .../org/apache/pulsar/proxy/server/ProxyTest.java  | 23 ++++++++++++++++++++++
 4 files changed, 28 insertions(+), 5 deletions(-)

diff --git 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
index 4b5fef3a994..1e9fd676573 100644
--- 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
+++ 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
@@ -325,7 +325,7 @@ public class DirectProxyHandler {
             AuthData authData = 
authenticationDataProvider.authenticate(AuthData.INIT_AUTH_DATA);
             ByteBuf command = Commands.newConnect(
                     authentication.getAuthMethodName(), authData, 
protocolVersion,
-                    "Pulsar proxy", null /* target broker */,
+                    proxyConnection.clientVersion, null /* target broker */,
                     originalPrincipal, clientAuthData, clientAuthMethod);
             writeAndFlush(command);
             isTlsOutboundChannel = 
ProxyConnection.isTlsChannel(inboundChannel);
diff --git 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java
index 6985e1f96e0..a1994fb5af4 100644
--- 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java
+++ 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java
@@ -23,7 +23,6 @@ import io.netty.buffer.ByteBuf;
 import io.netty.channel.EventLoopGroup;
 import java.util.Arrays;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.pulsar.PulsarVersion;
 import org.apache.pulsar.client.impl.ClientCnx;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
 import org.apache.pulsar.common.api.AuthData;
@@ -66,7 +65,7 @@ public class ProxyClientCnx extends ClientCnx {
         authenticationDataProvider = 
authentication.getAuthData(remoteHostName);
         AuthData authData = 
authenticationDataProvider.authenticate(AuthData.INIT_AUTH_DATA);
         return Commands.newConnect(authentication.getAuthMethodName(), 
authData, protocolVersion,
-                PulsarVersion.getVersion(), proxyToTargetBrokerAddress, 
clientAuthRole, clientAuthData,
+                proxyConnection.clientVersion, proxyToTargetBrokerAddress, 
clientAuthRole, clientAuthData,
                 clientAuthMethod);
     }
 
diff --git 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
index 5ee79f4ad23..5a53f6ec014 100644
--- 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
+++ 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
@@ -45,7 +45,6 @@ import java.util.function.Supplier;
 import javax.naming.AuthenticationException;
 import javax.net.ssl.SSLSession;
 import lombok.Getter;
-import org.apache.pulsar.PulsarVersion;
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.authentication.AuthenticationProvider;
 import org.apache.pulsar.broker.authentication.AuthenticationState;
@@ -99,6 +98,7 @@ public class ProxyConnection extends PulsarHandler {
     String clientAuthRole;
     AuthData clientAuthData;
     String clientAuthMethod;
+    String clientVersion;
 
     private String authMethod = "none";
     AuthenticationProvider authenticationProvider;
@@ -475,6 +475,7 @@ public class ProxyConnection extends PulsarHandler {
         this.hasProxyToBrokerUrl = connect.hasProxyToBrokerUrl();
         this.protocolVersionToAdvertise = 
getProtocolVersionToAdvertise(connect);
         this.proxyToBrokerUrl = connect.hasProxyToBrokerUrl() ? 
connect.getProxyToBrokerUrl() : "null";
+        this.clientVersion = connect.getClientVersion();
 
         if (LOG.isDebugEnabled()) {
             LOG.debug("Received CONNECT from {} proxyToBroker={}", 
remoteAddress, proxyToBrokerUrl);
@@ -568,7 +569,7 @@ public class ProxyConnection extends PulsarHandler {
                     if (authResponse.hasClientVersion()) {
                         clientVersion = authResponse.getClientVersion();
                     } else {
-                        clientVersion = PulsarVersion.getVersion();
+                        clientVersion = this.clientVersion;
                     }
                     int protocolVersion;
                     if (authResponse.hasProtocolVersion()) {
diff --git 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java
index 6c9a834bb04..af128ce036f 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java
@@ -35,6 +35,7 @@ import lombok.Data;
 import lombok.EqualsAndHashCode;
 import lombok.ToString;
 import org.apache.avro.reflect.Nullable;
+import org.apache.pulsar.PulsarVersion;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.broker.authentication.AuthenticationService;
 import org.apache.pulsar.client.api.Consumer;
@@ -311,6 +312,28 @@ public class ProxyTest extends MockedPulsarServiceBaseTest 
{
         }
     }
 
+    @Test
+    public void testGetClientVersion() throws Exception {
+        @Cleanup
+        PulsarClient client = 
PulsarClient.builder().serviceUrl(proxyService.getServiceUrl())
+                .build();
+
+        String topic = "persistent://sample/test/local/testGetClientVersion";
+        String subName = "test-sub";
+
+        @Cleanup
+        Consumer<byte[]> consumer = client.newConsumer()
+                .topic(topic)
+                .subscriptionName(subName)
+                .subscribe();
+
+        consumer.receiveAsync();
+
+
+        
Assert.assertEquals(admin.topics().getStats(topic).getSubscriptions().get(subName).getConsumers()
+                .get(0).getClientVersion(), PulsarVersion.getVersion());
+    }
+
     private static PulsarClient 
getClientActiveConsumerChangeNotSupported(ClientConfigurationData conf)
             throws Exception {
         ThreadFactory threadFactory = new 
DefaultThreadFactory("pulsar-client-io", Thread.currentThread().isDaemon());

Reply via email to