This is an automated email from the ASF dual-hosted git repository.
zike pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.10 by this push:
new cac4915cc74 [branch-2.10][fix][proxy] Fix using wrong client version
in pulsar proxy (#19576)
cac4915cc74 is described below
commit cac4915cc74797100be50fc1353c987e1801d0eb
Author: Zike Yang <[email protected]>
AuthorDate: Tue Feb 21 19:48:16 2023 +0800
[branch-2.10][fix][proxy] Fix using wrong client version in pulsar proxy
(#19576)
### Motivation
Cherry-pick https://github.com/apache/pulsar/pull/19540 to branch-2.10
---
.../pulsar/proxy/server/DirectProxyHandler.java | 3 ++-
.../apache/pulsar/proxy/server/ProxyClientCnx.java | 3 +--
.../pulsar/proxy/server/ProxyConnection.java | 5 +++--
.../org/apache/pulsar/proxy/server/ProxyTest.java | 23 ++++++++++++++++++++++
4 files changed, 29 insertions(+), 5 deletions(-)
diff --git
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
index ca9e6cdb2ae..5de24142c59 100644
---
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
+++
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
@@ -314,7 +314,8 @@ public class DirectProxyHandler {
authenticationDataProvider =
authentication.getAuthData(remoteHostName);
AuthData authData =
authenticationDataProvider.authenticate(AuthData.INIT_AUTH_DATA);
ByteBuf command;
- command = Commands.newConnect(authentication.getAuthMethodName(),
authData, protocolVersion, "Pulsar proxy",
+ command = Commands.newConnect(authentication.getAuthMethodName(),
authData, protocolVersion,
+ proxyConnection.clientVersion,
null /* target broker */, originalPrincipal,
clientAuthData, clientAuthMethod);
outboundChannel.writeAndFlush(command)
.addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
diff --git
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java
index 283b835fff5..6bb867f1a9c 100644
---
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java
+++
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java
@@ -23,7 +23,6 @@ import io.netty.buffer.ByteBuf;
import io.netty.channel.EventLoopGroup;
import java.util.Arrays;
import lombok.extern.slf4j.Slf4j;
-import org.apache.pulsar.PulsarVersion;
import org.apache.pulsar.client.impl.ClientCnx;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.common.api.AuthData;
@@ -60,7 +59,7 @@ public class ProxyClientCnx extends ClientCnx {
authenticationDataProvider =
authentication.getAuthData(remoteHostName);
AuthData authData =
authenticationDataProvider.authenticate(AuthData.INIT_AUTH_DATA);
return Commands.newConnect(authentication.getAuthMethodName(),
authData, protocolVersion,
- PulsarVersion.getVersion(), proxyToTargetBrokerAddress,
clientAuthRole, clientAuthData,
+ proxyConnection.clientVersion, proxyToTargetBrokerAddress,
clientAuthRole, clientAuthData,
clientAuthMethod);
}
diff --git
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
index be5943e49c8..34ae6f37878 100644
---
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
+++
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
@@ -41,7 +41,6 @@ import java.util.function.Supplier;
import javax.naming.AuthenticationException;
import javax.net.ssl.SSLSession;
import lombok.Getter;
-import org.apache.pulsar.PulsarVersion;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.authentication.AuthenticationProvider;
@@ -92,6 +91,7 @@ public class ProxyConnection extends PulsarHandler {
String clientAuthRole;
AuthData clientAuthData;
String clientAuthMethod;
+ String clientVersion;
private String authMethod = "none";
AuthenticationProvider authenticationProvider;
@@ -408,6 +408,7 @@ public class ProxyConnection extends PulsarHandler {
this.hasProxyToBrokerUrl = connect.hasProxyToBrokerUrl();
this.protocolVersionToAdvertise =
getProtocolVersionToAdvertise(connect);
this.proxyToBrokerUrl = connect.hasProxyToBrokerUrl() ?
connect.getProxyToBrokerUrl() : "null";
+ this.clientVersion = connect.getClientVersion();
if (LOG.isDebugEnabled()) {
LOG.debug("Received CONNECT from {} proxyToBroker={}",
remoteAddress, proxyToBrokerUrl);
@@ -496,7 +497,7 @@ public class ProxyConnection extends PulsarHandler {
if (authResponse.hasClientVersion()) {
clientVersion = authResponse.getClientVersion();
} else {
- clientVersion = PulsarVersion.getVersion();
+ clientVersion = this.clientVersion;
}
int protocolVersion;
if (authResponse.hasProtocolVersion()) {
diff --git
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java
index 4fe51380777..a43fbc2deb6 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java
@@ -36,6 +36,7 @@ import lombok.EqualsAndHashCode;
import lombok.ToString;
import org.apache.avro.reflect.Nullable;
+import org.apache.pulsar.PulsarVersion;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.authentication.AuthenticationService;
import org.apache.pulsar.client.api.Consumer;
@@ -311,6 +312,28 @@ public class ProxyTest extends MockedPulsarServiceBaseTest
{
}
}
+ @Test
+ public void testGetClientVersion() throws Exception {
+ @Cleanup
+ PulsarClient client =
PulsarClient.builder().serviceUrl(proxyService.getServiceUrl())
+ .build();
+
+ String topic = "persistent://sample/test/local/testGetClientVersion";
+ String subName = "test-sub";
+
+ @Cleanup
+ Consumer<byte[]> consumer = client.newConsumer()
+ .topic(topic)
+ .subscriptionName(subName)
+ .subscribe();
+
+ consumer.receiveAsync();
+
+
+
Assert.assertEquals(admin.topics().getStats(topic).getSubscriptions().get(subName).getConsumers()
+ .get(0).getClientVersion(), PulsarVersion.getVersion());
+ }
+
private static PulsarClient
getClientActiveConsumerChangeNotSupported(ClientConfigurationData conf)
throws Exception {
ThreadFactory threadFactory = new
DefaultThreadFactory("pulsar-client-io", Thread.currentThread().isDaemon());