This is an automated email from the ASF dual-hosted git repository. zike pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new f1765be36f0 [fix][proxy] Fix using wrong client version in pulsar proxy (#19540) f1765be36f0 is described below commit f1765be36f050bb9771a2aa7f4404d1d4824e1bf Author: Zike Yang <z...@apache.org> AuthorDate: Fri Feb 17 18:47:57 2023 +0800 [fix][proxy] Fix using wrong client version in pulsar proxy (#19540) ### Motivations Currently, if we connect the client to the proxy, the `clientVersion` won't be send to the broker and we can't get the client version using the PulsarAdmin. For example, there is no `clientVersion` field shown in the output of topic stats: ``` "publishers" : [ { "accessMode" : "Shared", "msgRateIn" : 0.0, "msgThroughputIn" : 0.0, "averageMsgSize" : 0.0, "chunkedMessageRate" : 0.0, "producerId" : 0, "metadata" : { }, "address" : "/127.0.0.1:65385", "producerName" : "AlvaroProducer", "connectedSince" : "2023-02-16T11:34:30.384548+08:00" } ], ``` It works fine when directly connecting to the broker. The root cause is that the pulsar proxy doesn't pass the clientVersion from the client to the broker. It set it to `Pulsar proxy`. And thus it will be ignored due to here : https://github.com/apache/pulsar/blob/master/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java#L693-L695 ### Modifications * Use the correct clientVersion from the client Signed-off-by: Zike Yang <z...@apache.org> --- .../pulsar/proxy/server/DirectProxyHandler.java | 2 +- .../apache/pulsar/proxy/server/ProxyClientCnx.java | 3 +-- .../pulsar/proxy/server/ProxyConnection.java | 5 +++-- .../org/apache/pulsar/proxy/server/ProxyTest.java | 23 ++++++++++++++++++++++ 4 files changed, 28 insertions(+), 5 deletions(-) diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java index 4b5fef3a994..1e9fd676573 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java @@ -325,7 +325,7 @@ public class DirectProxyHandler { AuthData authData = authenticationDataProvider.authenticate(AuthData.INIT_AUTH_DATA); ByteBuf command = Commands.newConnect( authentication.getAuthMethodName(), authData, protocolVersion, - "Pulsar proxy", null /* target broker */, + proxyConnection.clientVersion, null /* target broker */, originalPrincipal, clientAuthData, clientAuthMethod); writeAndFlush(command); isTlsOutboundChannel = ProxyConnection.isTlsChannel(inboundChannel); diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java index 6985e1f96e0..a1994fb5af4 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java @@ -23,7 +23,6 @@ import io.netty.buffer.ByteBuf; import io.netty.channel.EventLoopGroup; import java.util.Arrays; import lombok.extern.slf4j.Slf4j; -import org.apache.pulsar.PulsarVersion; import org.apache.pulsar.client.impl.ClientCnx; import org.apache.pulsar.client.impl.conf.ClientConfigurationData; import org.apache.pulsar.common.api.AuthData; @@ -66,7 +65,7 @@ public class ProxyClientCnx extends ClientCnx { authenticationDataProvider = authentication.getAuthData(remoteHostName); AuthData authData = authenticationDataProvider.authenticate(AuthData.INIT_AUTH_DATA); return Commands.newConnect(authentication.getAuthMethodName(), authData, protocolVersion, - PulsarVersion.getVersion(), proxyToTargetBrokerAddress, clientAuthRole, clientAuthData, + proxyConnection.clientVersion, proxyToTargetBrokerAddress, clientAuthRole, clientAuthData, clientAuthMethod); } diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java index 5ee79f4ad23..5a53f6ec014 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java @@ -45,7 +45,6 @@ import java.util.function.Supplier; import javax.naming.AuthenticationException; import javax.net.ssl.SSLSession; import lombok.Getter; -import org.apache.pulsar.PulsarVersion; import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.authentication.AuthenticationProvider; import org.apache.pulsar.broker.authentication.AuthenticationState; @@ -99,6 +98,7 @@ public class ProxyConnection extends PulsarHandler { String clientAuthRole; AuthData clientAuthData; String clientAuthMethod; + String clientVersion; private String authMethod = "none"; AuthenticationProvider authenticationProvider; @@ -475,6 +475,7 @@ public class ProxyConnection extends PulsarHandler { this.hasProxyToBrokerUrl = connect.hasProxyToBrokerUrl(); this.protocolVersionToAdvertise = getProtocolVersionToAdvertise(connect); this.proxyToBrokerUrl = connect.hasProxyToBrokerUrl() ? connect.getProxyToBrokerUrl() : "null"; + this.clientVersion = connect.getClientVersion(); if (LOG.isDebugEnabled()) { LOG.debug("Received CONNECT from {} proxyToBroker={}", remoteAddress, proxyToBrokerUrl); @@ -568,7 +569,7 @@ public class ProxyConnection extends PulsarHandler { if (authResponse.hasClientVersion()) { clientVersion = authResponse.getClientVersion(); } else { - clientVersion = PulsarVersion.getVersion(); + clientVersion = this.clientVersion; } int protocolVersion; if (authResponse.hasProtocolVersion()) { diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java index 6c9a834bb04..af128ce036f 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java @@ -35,6 +35,7 @@ import lombok.Data; import lombok.EqualsAndHashCode; import lombok.ToString; import org.apache.avro.reflect.Nullable; +import org.apache.pulsar.PulsarVersion; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.broker.authentication.AuthenticationService; import org.apache.pulsar.client.api.Consumer; @@ -311,6 +312,28 @@ public class ProxyTest extends MockedPulsarServiceBaseTest { } } + @Test + public void testGetClientVersion() throws Exception { + @Cleanup + PulsarClient client = PulsarClient.builder().serviceUrl(proxyService.getServiceUrl()) + .build(); + + String topic = "persistent://sample/test/local/testGetClientVersion"; + String subName = "test-sub"; + + @Cleanup + Consumer<byte[]> consumer = client.newConsumer() + .topic(topic) + .subscriptionName(subName) + .subscribe(); + + consumer.receiveAsync(); + + + Assert.assertEquals(admin.topics().getStats(topic).getSubscriptions().get(subName).getConsumers() + .get(0).getClientVersion(), PulsarVersion.getVersion()); + } + private static PulsarClient getClientActiveConsumerChangeNotSupported(ClientConfigurationData conf) throws Exception { ThreadFactory threadFactory = new DefaultThreadFactory("pulsar-client-io", Thread.currentThread().isDaemon());