This is an automated email from the ASF dual-hosted git repository.

zike pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.10 by this push:
     new cac4915cc74 [branch-2.10][fix][proxy] Fix using wrong client version in pulsar proxy (#19576)
cac4915cc74 is described below

commit cac4915cc74797100be50fc1353c987e1801d0eb
Author: Zike Yang <[email protected]>
AuthorDate: Tue Feb 21 19:48:16 2023 +0800

    [branch-2.10][fix][proxy] Fix using wrong client version in pulsar proxy (#19576)
    
    ### Motivation
    
    Cherry-pick https://github.com/apache/pulsar/pull/19540 to branch-2.10.
---
 .../pulsar/proxy/server/DirectProxyHandler.java    |  3 ++-
 .../apache/pulsar/proxy/server/ProxyClientCnx.java |  3 +--
 .../pulsar/proxy/server/ProxyConnection.java       |  5 +++--
 .../org/apache/pulsar/proxy/server/ProxyTest.java  | 23 ++++++++++++++++++++++
 4 files changed, 29 insertions(+), 5 deletions(-)

diff --git 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
index ca9e6cdb2ae..5de24142c59 100644
--- 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
+++ 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
@@ -314,7 +314,8 @@ public class DirectProxyHandler {
             authenticationDataProvider = 
authentication.getAuthData(remoteHostName);
             AuthData authData = 
authenticationDataProvider.authenticate(AuthData.INIT_AUTH_DATA);
             ByteBuf command;
-            command = Commands.newConnect(authentication.getAuthMethodName(), 
authData, protocolVersion, "Pulsar proxy",
+            command = Commands.newConnect(authentication.getAuthMethodName(), 
authData, protocolVersion,
+                    proxyConnection.clientVersion,
                     null /* target broker */, originalPrincipal, 
clientAuthData, clientAuthMethod);
             outboundChannel.writeAndFlush(command)
                     
.addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
diff --git 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java
index 283b835fff5..6bb867f1a9c 100644
--- 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java
+++ 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java
@@ -23,7 +23,6 @@ import io.netty.buffer.ByteBuf;
 import io.netty.channel.EventLoopGroup;
 import java.util.Arrays;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.pulsar.PulsarVersion;
 import org.apache.pulsar.client.impl.ClientCnx;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
 import org.apache.pulsar.common.api.AuthData;
@@ -60,7 +59,7 @@ public class ProxyClientCnx extends ClientCnx {
         authenticationDataProvider = 
authentication.getAuthData(remoteHostName);
         AuthData authData = 
authenticationDataProvider.authenticate(AuthData.INIT_AUTH_DATA);
         return Commands.newConnect(authentication.getAuthMethodName(), 
authData, protocolVersion,
-                PulsarVersion.getVersion(), proxyToTargetBrokerAddress, 
clientAuthRole, clientAuthData,
+                proxyConnection.clientVersion, proxyToTargetBrokerAddress, 
clientAuthRole, clientAuthData,
                 clientAuthMethod);
     }
 
diff --git 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
index be5943e49c8..34ae6f37878 100644
--- 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
+++ 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
@@ -41,7 +41,6 @@ import java.util.function.Supplier;
 import javax.naming.AuthenticationException;
 import javax.net.ssl.SSLSession;
 import lombok.Getter;
-import org.apache.pulsar.PulsarVersion;
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
 import org.apache.pulsar.broker.authentication.AuthenticationProvider;
@@ -92,6 +91,7 @@ public class ProxyConnection extends PulsarHandler {
     String clientAuthRole;
     AuthData clientAuthData;
     String clientAuthMethod;
+    String clientVersion;
 
     private String authMethod = "none";
     AuthenticationProvider authenticationProvider;
@@ -408,6 +408,7 @@ public class ProxyConnection extends PulsarHandler {
         this.hasProxyToBrokerUrl = connect.hasProxyToBrokerUrl();
         this.protocolVersionToAdvertise = 
getProtocolVersionToAdvertise(connect);
         this.proxyToBrokerUrl = connect.hasProxyToBrokerUrl() ? 
connect.getProxyToBrokerUrl() : "null";
+        this.clientVersion = connect.getClientVersion();
 
         if (LOG.isDebugEnabled()) {
             LOG.debug("Received CONNECT from {} proxyToBroker={}", 
remoteAddress, proxyToBrokerUrl);
@@ -496,7 +497,7 @@ public class ProxyConnection extends PulsarHandler {
                     if (authResponse.hasClientVersion()) {
                         clientVersion = authResponse.getClientVersion();
                     } else {
-                        clientVersion = PulsarVersion.getVersion();
+                        clientVersion = this.clientVersion;
                     }
                     int protocolVersion;
                     if (authResponse.hasProtocolVersion()) {
diff --git 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java
index 4fe51380777..a43fbc2deb6 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java
@@ -36,6 +36,7 @@ import lombok.EqualsAndHashCode;
 import lombok.ToString;
 
 import org.apache.avro.reflect.Nullable;
+import org.apache.pulsar.PulsarVersion;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.broker.authentication.AuthenticationService;
 import org.apache.pulsar.client.api.Consumer;
@@ -311,6 +312,28 @@ public class ProxyTest extends MockedPulsarServiceBaseTest 
{
         }
     }
 
+    @Test
+    public void testGetClientVersion() throws Exception {
+        @Cleanup
+        PulsarClient client = 
PulsarClient.builder().serviceUrl(proxyService.getServiceUrl())
+                .build();
+
+        String topic = "persistent://sample/test/local/testGetClientVersion";
+        String subName = "test-sub";
+
+        @Cleanup
+        Consumer<byte[]> consumer = client.newConsumer()
+                .topic(topic)
+                .subscriptionName(subName)
+                .subscribe();
+
+        consumer.receiveAsync();
+
+
+        
Assert.assertEquals(admin.topics().getStats(topic).getSubscriptions().get(subName).getConsumers()
+                .get(0).getClientVersion(), PulsarVersion.getVersion());
+    }
+
     private static PulsarClient 
getClientActiveConsumerChangeNotSupported(ClientConfigurationData conf)
             throws Exception {
         ThreadFactory threadFactory = new 
DefaultThreadFactory("pulsar-client-io", Thread.currentThread().isDaemon());

Reply via email to