This is an automated email from the ASF dual-hosted git repository.

mmarshall pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 15453964f6e [feat][proxy] PIP-250: Add proxyVersion to CommandConnect 
(#19618)
15453964f6e is described below

commit 15453964f6e6086b0c5e34736958cf749f16f5e1
Author: Michael Marshall <[email protected]>
AuthorDate: Tue Apr 11 13:19:32 2023 -0500

    [feat][proxy] PIP-250: Add proxyVersion to CommandConnect (#19618)
    
    PIP #19623
    Relates to https://github.com/apache/pulsar/pull/19540
    
    ### Motivation
    
    In order to get more information about connections, it is helpful for the 
proxy to supply its version to the broker.
    
    ### Modifications
    
    * Add `proxy_version` field to the `CommandConnect` protobuf message
    * Update proxy and broker to handle this new field
    
    ### Verifying this change
    
    New tests are added with this PR.
    
    ### Does this pull request potentially affect one of the following parts:
    
    - [x] The binary protocol
    
    This will be submitted as part of a PIP.
    
    ### Documentation
    
    - [x] `doc-not-needed`
---
 .../broker/authorization/AuthorizationService.java |  2 +-
 .../apache/pulsar/broker/service/ServerCnx.java    | 40 ++++++++++++++++++---
 .../apache/pulsar/broker/service/TransportCnx.java |  1 +
 .../pulsar/broker/service/ServerCnxTest.java       | 41 ++++++++++++++++++++++
 .../apache/pulsar/common/protocol/Commands.java    | 11 ++++++
 pulsar-common/src/main/proto/PulsarApi.proto       |  6 ++--
 .../pulsar/proxy/server/DirectProxyHandler.java    |  2 +-
 .../apache/pulsar/proxy/server/ProxyClientCnx.java |  3 +-
 .../pulsar/proxy/server/ProxyConnection.java       |  9 +++++
 9 files changed, 105 insertions(+), 10 deletions(-)

diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
index 706eadf0ec2..0c61219b57a 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
@@ -464,7 +464,7 @@ public class AuthorizationService {
         }
     }
 
-    private boolean isProxyRole(String role) {
+    public boolean isProxyRole(String role) {
         return role != null && conf.getProxyRoles().contains(role);
     }
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index bfd7c001e4c..01274ab4460 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -210,6 +210,7 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
     private int pendingSendRequest = 0;
     private final String replicatorPrefix;
     private String clientVersion = null;
+    private String proxyVersion = null;
     private int nonPersistentPendingMessages = 0;
     private final int maxNonPersistentPendingMessages;
     private String originalPrincipal = null;
@@ -320,7 +321,10 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
             NettyChannelUtil.writeAndFlushWithClosePromise(ctx, msg);
             return;
         }
-        log.info("New connection from {}", remoteAddress);
+        if (log.isDebugEnabled()) {
+            // Connection information is logged after a successful Connect 
command is processed.
+            log.debug("New connection from {}", remoteAddress);
+        }
         this.ctx = ctx;
         this.commandSender = new PulsarCommandSenderImpl(brokerInterceptor, 
this);
         this.service.getPulsarStats().recordConnectionCreate();
@@ -690,6 +694,15 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
                     NettyChannelUtil.writeAndFlushWithClosePromise(ctx, msg);
                     return;
                 }
+                if (proxyVersion != null && 
!service.getAuthorizationService().isProxyRole(authRole)) {
+                    // Only allow proxyVersion to be set when connecting with 
a proxy
+                    state = State.Failed;
+                    service.getPulsarStats().recordConnectionCreateFail();
+                    final ByteBuf msg = Commands.newError(-1, 
ServerError.AuthorizationError,
+                            "Must not set proxyVersion without connecting as a 
ProxyRole.");
+                    NettyChannelUtil.writeAndFlushWithClosePromise(ctx, msg);
+                    return;
+                }
             }
             maybeScheduleAuthenticationCredentialsRefresh();
         }
@@ -703,6 +716,18 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
         if (isNotBlank(clientVersion)) {
             this.clientVersion = clientVersion.intern();
         }
+        if (!service.isAuthenticationEnabled()) {
+            log.info("[{}] connected with clientVersion={}, 
clientProtocolVersion={}, proxyVersion={}", remoteAddress,
+                    clientVersion, clientProtoVersion, proxyVersion);
+        } else if (originalPrincipal != null) {
+            log.info("[{}] connected role={} and originalAuthRole={} using 
authMethod={}, clientVersion={}, "
+                            + "clientProtocolVersion={}, proxyVersion={}", 
remoteAddress, authRole, originalPrincipal,
+                    authMethod, clientVersion, clientProtoVersion, 
proxyVersion);
+        } else {
+            log.info("[{}] connected with role={} using authMethod={}, 
clientVersion={}, clientProtocolVersion={}, "
+                            + "proxyVersion={}", remoteAddress, authRole, 
authMethod, clientVersion, clientProtoVersion,
+                    proxyVersion);
+        }
         if (brokerInterceptor != null) {
             brokerInterceptor.onConnectionCreated(this);
         }
@@ -761,10 +786,6 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
                         authenticateOriginalData(clientProtocolVersion, 
clientVersion);
                     } else {
                         completeConnect(clientProtocolVersion, clientVersion);
-                        if (log.isDebugEnabled()) {
-                            log.debug("[{}] Client successfully authenticated 
with {} role {} and originalPrincipal {}",
-                                    remoteAddress, authMethod, this.authRole, 
originalPrincipal);
-                        }
                     }
                 } else {
                     // Refresh the auth data
@@ -948,6 +969,10 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
             features.copyFrom(connect.getFeatureFlags());
         }
 
+        if (connect.hasProxyVersion()) {
+            proxyVersion = connect.getProxyVersion();
+        }
+
         if (!service.isAuthenticationEnabled()) {
             completeConnect(clientProtocolVersion, clientVersion);
             return;
@@ -3266,6 +3291,11 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
         return clientVersion;
     }
 
+    @Override
+    public String getProxyVersion() {
+        return proxyVersion;
+    }
+
     @VisibleForTesting
     void setAutoReadDisabledRateLimiting(boolean isLimiting) {
         this.autoReadDisabledRateLimiting = isLimiting;
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TransportCnx.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TransportCnx.java
index f1aaca2b290..d267160652a 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TransportCnx.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TransportCnx.java
@@ -27,6 +27,7 @@ import 
org.apache.pulsar.broker.authentication.AuthenticationDataSource;
 public interface TransportCnx {
 
     String getClientVersion();
+    String getProxyVersion();
 
     SocketAddress clientAddress();
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
index 874d4389622..c3bab634a42 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
@@ -297,6 +297,21 @@ public class ServerCnxTest {
         channel.finish();
     }
 
+    @Test(timeOut = 30000)
+    public void testConnectCommandWithProxyVersion() throws Exception {
+        resetChannel();
+        assertTrue(channel.isActive());
+        assertEquals(serverCnx.getState(), State.Start);
+
+        ByteBuf clientCommand = Commands.newConnect("none", null, 1, null, 
null, null, null, null,
+                "my-pulsar-proxy");
+        channel.writeInbound(clientCommand);
+
+        assertEquals(serverCnx.getState(), State.Connected);
+        assertEquals(serverCnx.getProxyVersion(), "my-pulsar-proxy");
+        channel.finish();
+    }
+
     @DataProvider(name = "clientVersions")
     public Object[][] clientVersions() {
         return new Object[][]{
@@ -512,6 +527,32 @@ public class ServerCnxTest {
         channel.finish();
     }
 
+    @Test
+    public void testConnectWithNonProxyRoleAndProxyVersion() throws Exception {
+        AuthenticationService authenticationService = 
mock(AuthenticationService.class);
+        AuthenticationProvider authenticationProvider = new 
MockAuthenticationProvider();
+        String authMethodName = authenticationProvider.getAuthMethodName();
+
+        
when(brokerService.getAuthenticationService()).thenReturn(authenticationService);
+        
when(authenticationService.getAuthenticationProvider(authMethodName)).thenReturn(authenticationProvider);
+        svcConfig.setAuthenticationEnabled(true);
+        svcConfig.setAuthorizationEnabled(true);
+
+        resetChannel();
+        assertTrue(channel.isActive());
+        assertEquals(serverCnx.getState(), State.Start);
+
+        ByteBuf clientCommand = Commands.newConnect(authMethodName, 
AuthData.of("pass.pass".getBytes()),
+                1, null, null, null, null, null, "my-pulsar-proxy");
+        channel.writeInbound(clientCommand);
+        Object response = getResponse();
+        assertTrue(response instanceof CommandError);
+        assertEquals(((CommandError) response).getError(), 
ServerError.AuthorizationError);
+        assertEquals(serverCnx.getState(), State.Failed);
+        channel.finish();
+    }
+
+    @Test
     public void testAuthChallengePrincipalChangeFails() throws Exception {
         AuthenticationService authenticationService = 
mock(AuthenticationService.class);
         AuthenticationProvider authenticationProvider = new 
MockAlwaysExpiredAuthenticationProvider();
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java 
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
index 85c4d021fdf..cf0cd820a6d 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
@@ -234,11 +234,22 @@ public class Commands {
     public static ByteBuf newConnect(String authMethodName, AuthData authData, 
int protocolVersion, String libVersion,
                                      String targetBroker, String 
originalPrincipal, AuthData originalAuthData,
                                      String originalAuthMethod) {
+        return newConnect(authMethodName, authData, protocolVersion, 
libVersion, targetBroker, originalPrincipal,
+                originalAuthData, originalAuthMethod, null);
+    }
+
+    public static ByteBuf newConnect(String authMethodName, AuthData authData, 
int protocolVersion, String libVersion,
+                                     String targetBroker, String 
originalPrincipal, AuthData originalAuthData,
+                                     String originalAuthMethod, String 
proxyVersion) {
         BaseCommand cmd = localCmd(Type.CONNECT);
         CommandConnect connect = cmd.setConnect()
                 .setClientVersion(libVersion != null ? libVersion : "Pulsar 
Client")
                 .setAuthMethodName(authMethodName);
 
+        if (proxyVersion != null) {
+            connect.setProxyVersion(proxyVersion);
+        }
+
         if (targetBroker != null) {
             // When connecting through a proxy, we need to specify which 
broker do we want to be proxied through
             connect.setProxyToBrokerUrl(targetBroker);
diff --git a/pulsar-common/src/main/proto/PulsarApi.proto 
b/pulsar-common/src/main/proto/PulsarApi.proto
index d9c41eeec97..afe193eeb7e 100644
--- a/pulsar-common/src/main/proto/PulsarApi.proto
+++ b/pulsar-common/src/main/proto/PulsarApi.proto
@@ -268,7 +268,7 @@ enum ProtocolVersion {
 }
 
 message CommandConnect {
-    required string client_version = 1;
+    required string client_version = 1; // The version of the client. Proxy 
should forward client's client_version.
     optional AuthMethod auth_method = 2; // Deprecated. Use "auth_method_name" 
instead.
     optional string auth_method_name = 5;
     optional bytes auth_data = 3;
@@ -291,6 +291,8 @@ message CommandConnect {
 
     // Feature flags
     optional FeatureFlags feature_flags = 10;
+
+    optional string proxy_version = 11; // Version of the proxy. Should only 
be forwarded by a proxy.
 }
 
 message FeatureFlags {
@@ -308,7 +310,7 @@ message CommandConnected {
 }
 
 message CommandAuthResponse {
-    optional string client_version = 1;
+    optional string client_version = 1; // The version of the client. Proxy 
should forward client's client_version.
     optional AuthData response = 2;
     optional int32 protocol_version = 3 [default = 0];
 }
diff --git 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
index cb93f885a1d..d63b04b6734 100644
--- 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
+++ 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
@@ -327,7 +327,7 @@ public class DirectProxyHandler {
             ByteBuf command = Commands.newConnect(
                     authentication.getAuthMethodName(), authData, 
protocolVersion,
                     proxyConnection.clientVersion, null /* target broker */,
-                    originalPrincipal, clientAuthData, clientAuthMethod);
+                    originalPrincipal, clientAuthData, clientAuthMethod, 
PulsarVersion.getVersion());
             writeAndFlush(command);
             isTlsOutboundChannel = 
ProxyConnection.isTlsChannel(inboundChannel);
         }
diff --git 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java
index a1994fb5af4..00ec0b592e4 100644
--- 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java
+++ 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java
@@ -23,6 +23,7 @@ import io.netty.buffer.ByteBuf;
 import io.netty.channel.EventLoopGroup;
 import java.util.Arrays;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.PulsarVersion;
 import org.apache.pulsar.client.impl.ClientCnx;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
 import org.apache.pulsar.common.api.AuthData;
@@ -66,7 +67,7 @@ public class ProxyClientCnx extends ClientCnx {
         AuthData authData = 
authenticationDataProvider.authenticate(AuthData.INIT_AUTH_DATA);
         return Commands.newConnect(authentication.getAuthMethodName(), 
authData, protocolVersion,
                 proxyConnection.clientVersion, proxyToTargetBrokerAddress, 
clientAuthRole, clientAuthData,
-                clientAuthMethod);
+                clientAuthMethod, PulsarVersion.getVersion());
     }
 
     @Override
diff --git 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
index f03aa59619f..4d32c9dce2d 100644
--- 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
+++ 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
@@ -497,6 +497,15 @@ public class ProxyConnection extends PulsarHandler {
             return;
         }
 
+        if (connect.hasProxyVersion()) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("[{}] Client illegally provided proxyVersion.", 
remoteAddress);
+            }
+            state = State.Closing;
+            writeAndFlushAndClose(Commands.newError(-1, 
ServerError.NotAllowedError, "Must not provide proxyVersion"));
+            return;
+        }
+
         try {
             // init authn
             this.clientConf = createClientConfiguration();

Reply via email to