This is an automated email from the ASF dual-hosted git repository.
mmarshall pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 15453964f6e [feat][proxy] PIP-250: Add proxyVersion to CommandConnect
(#19618)
15453964f6e is described below
commit 15453964f6e6086b0c5e34736958cf749f16f5e1
Author: Michael Marshall <[email protected]>
AuthorDate: Tue Apr 11 13:19:32 2023 -0500
[feat][proxy] PIP-250: Add proxyVersion to CommandConnect (#19618)
PIP #19623
Relates to https://github.com/apache/pulsar/pull/19540
### Motivation
In order to get more information about connections, it is helpful for the
proxy to supply its version to the broker.
### Modifications
* Add `proxy_version` field to the `CommandConnect` protobuf message
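* Update proxy and broker to handle this new field: the proxy sends its own
  version on every connection it opens to a broker and rejects client connections
  that set `proxy_version` themselves; the broker stores the value, includes it in
  the connection log line, and rejects it if the authenticated role is not one of
  the configured `proxyRoles`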
### Verifying this change
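New tests are added to `ServerCnxTest` with this PR
(`testConnectCommandWithProxyVersion` and `testConnectWithNonProxyRoleAndProxyVersion`).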
### Does this pull request potentially affect one of the following parts:
- [x] The binary protocol
This is submitted as part of PIP-250 (PIP #19623).
### Documentation
- [x] `doc-not-needed`
---
.../broker/authorization/AuthorizationService.java | 2 +-
.../apache/pulsar/broker/service/ServerCnx.java | 40 ++++++++++++++++++---
.../apache/pulsar/broker/service/TransportCnx.java | 1 +
.../pulsar/broker/service/ServerCnxTest.java | 41 ++++++++++++++++++++++
.../apache/pulsar/common/protocol/Commands.java | 11 ++++++
pulsar-common/src/main/proto/PulsarApi.proto | 6 ++--
.../pulsar/proxy/server/DirectProxyHandler.java | 2 +-
.../apache/pulsar/proxy/server/ProxyClientCnx.java | 3 +-
.../pulsar/proxy/server/ProxyConnection.java | 9 +++++
9 files changed, 105 insertions(+), 10 deletions(-)
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
index 706eadf0ec2..0c61219b57a 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
@@ -464,7 +464,7 @@ public class AuthorizationService {
}
}
- private boolean isProxyRole(String role) {
+ public boolean isProxyRole(String role) {
return role != null && conf.getProxyRoles().contains(role);
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index bfd7c001e4c..01274ab4460 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -210,6 +210,7 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
private int pendingSendRequest = 0;
private final String replicatorPrefix;
private String clientVersion = null;
+ private String proxyVersion = null;
private int nonPersistentPendingMessages = 0;
private final int maxNonPersistentPendingMessages;
private String originalPrincipal = null;
@@ -320,7 +321,10 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
NettyChannelUtil.writeAndFlushWithClosePromise(ctx, msg);
return;
}
- log.info("New connection from {}", remoteAddress);
+ if (log.isDebugEnabled()) {
+ // Connection information is logged after a successful Connect
command is processed.
+ log.debug("New connection from {}", remoteAddress);
+ }
this.ctx = ctx;
this.commandSender = new PulsarCommandSenderImpl(brokerInterceptor,
this);
this.service.getPulsarStats().recordConnectionCreate();
@@ -690,6 +694,15 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
NettyChannelUtil.writeAndFlushWithClosePromise(ctx, msg);
return;
}
+ if (proxyVersion != null &&
!service.getAuthorizationService().isProxyRole(authRole)) {
+ // Only allow proxyVersion to be set when connecting with
a proxy
+ state = State.Failed;
+ service.getPulsarStats().recordConnectionCreateFail();
+ final ByteBuf msg = Commands.newError(-1,
ServerError.AuthorizationError,
+ "Must not set proxyVersion without connecting as a
ProxyRole.");
+ NettyChannelUtil.writeAndFlushWithClosePromise(ctx, msg);
+ return;
+ }
}
maybeScheduleAuthenticationCredentialsRefresh();
}
@@ -703,6 +716,18 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
if (isNotBlank(clientVersion)) {
this.clientVersion = clientVersion.intern();
}
+ if (!service.isAuthenticationEnabled()) {
+ log.info("[{}] connected with clientVersion={},
clientProtocolVersion={}, proxyVersion={}", remoteAddress,
+ clientVersion, clientProtoVersion, proxyVersion);
+ } else if (originalPrincipal != null) {
+ log.info("[{}] connected role={} and originalAuthRole={} using
authMethod={}, clientVersion={}, "
+ + "clientProtocolVersion={}, proxyVersion={}",
remoteAddress, authRole, originalPrincipal,
+ authMethod, clientVersion, clientProtoVersion,
proxyVersion);
+ } else {
+ log.info("[{}] connected with role={} using authMethod={},
clientVersion={}, clientProtocolVersion={}, "
+ + "proxyVersion={}", remoteAddress, authRole,
authMethod, clientVersion, clientProtoVersion,
+ proxyVersion);
+ }
if (brokerInterceptor != null) {
brokerInterceptor.onConnectionCreated(this);
}
@@ -761,10 +786,6 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
authenticateOriginalData(clientProtocolVersion,
clientVersion);
} else {
completeConnect(clientProtocolVersion, clientVersion);
- if (log.isDebugEnabled()) {
- log.debug("[{}] Client successfully authenticated
with {} role {} and originalPrincipal {}",
- remoteAddress, authMethod, this.authRole,
originalPrincipal);
- }
}
} else {
// Refresh the auth data
@@ -948,6 +969,10 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
features.copyFrom(connect.getFeatureFlags());
}
+ if (connect.hasProxyVersion()) {
+ proxyVersion = connect.getProxyVersion();
+ }
+
if (!service.isAuthenticationEnabled()) {
completeConnect(clientProtocolVersion, clientVersion);
return;
@@ -3266,6 +3291,11 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
return clientVersion;
}
+ @Override
+ public String getProxyVersion() {
+ return proxyVersion;
+ }
+
@VisibleForTesting
void setAutoReadDisabledRateLimiting(boolean isLimiting) {
this.autoReadDisabledRateLimiting = isLimiting;
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TransportCnx.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TransportCnx.java
index f1aaca2b290..d267160652a 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TransportCnx.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TransportCnx.java
@@ -27,6 +27,7 @@ import
org.apache.pulsar.broker.authentication.AuthenticationDataSource;
public interface TransportCnx {
String getClientVersion();
+ String getProxyVersion();
SocketAddress clientAddress();
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
index 874d4389622..c3bab634a42 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
@@ -297,6 +297,21 @@ public class ServerCnxTest {
channel.finish();
}
+ @Test(timeOut = 30000)
+ public void testConnectCommandWithProxyVersion() throws Exception {
+ resetChannel();
+ assertTrue(channel.isActive());
+ assertEquals(serverCnx.getState(), State.Start);
+
+ ByteBuf clientCommand = Commands.newConnect("none", null, 1, null,
null, null, null, null,
+ "my-pulsar-proxy");
+ channel.writeInbound(clientCommand);
+
+ assertEquals(serverCnx.getState(), State.Connected);
+ assertEquals(serverCnx.getProxyVersion(), "my-pulsar-proxy");
+ channel.finish();
+ }
+
@DataProvider(name = "clientVersions")
public Object[][] clientVersions() {
return new Object[][]{
@@ -512,6 +527,32 @@ public class ServerCnxTest {
channel.finish();
}
+ @Test
+ public void testConnectWithNonProxyRoleAndProxyVersion() throws Exception {
+ AuthenticationService authenticationService =
mock(AuthenticationService.class);
+ AuthenticationProvider authenticationProvider = new
MockAuthenticationProvider();
+ String authMethodName = authenticationProvider.getAuthMethodName();
+
+
when(brokerService.getAuthenticationService()).thenReturn(authenticationService);
+
when(authenticationService.getAuthenticationProvider(authMethodName)).thenReturn(authenticationProvider);
+ svcConfig.setAuthenticationEnabled(true);
+ svcConfig.setAuthorizationEnabled(true);
+
+ resetChannel();
+ assertTrue(channel.isActive());
+ assertEquals(serverCnx.getState(), State.Start);
+
+ ByteBuf clientCommand = Commands.newConnect(authMethodName,
AuthData.of("pass.pass".getBytes()),
+ 1, null, null, null, null, null, "my-pulsar-proxy");
+ channel.writeInbound(clientCommand);
+ Object response = getResponse();
+ assertTrue(response instanceof CommandError);
+ assertEquals(((CommandError) response).getError(),
ServerError.AuthorizationError);
+ assertEquals(serverCnx.getState(), State.Failed);
+ channel.finish();
+ }
+
+ @Test
public void testAuthChallengePrincipalChangeFails() throws Exception {
AuthenticationService authenticationService =
mock(AuthenticationService.class);
AuthenticationProvider authenticationProvider = new
MockAlwaysExpiredAuthenticationProvider();
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
index 85c4d021fdf..cf0cd820a6d 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
@@ -234,11 +234,22 @@ public class Commands {
public static ByteBuf newConnect(String authMethodName, AuthData authData,
int protocolVersion, String libVersion,
String targetBroker, String
originalPrincipal, AuthData originalAuthData,
String originalAuthMethod) {
+ return newConnect(authMethodName, authData, protocolVersion,
libVersion, targetBroker, originalPrincipal,
+ originalAuthData, originalAuthMethod, null);
+ }
+
+ public static ByteBuf newConnect(String authMethodName, AuthData authData,
int protocolVersion, String libVersion,
+ String targetBroker, String
originalPrincipal, AuthData originalAuthData,
+ String originalAuthMethod, String
proxyVersion) {
BaseCommand cmd = localCmd(Type.CONNECT);
CommandConnect connect = cmd.setConnect()
.setClientVersion(libVersion != null ? libVersion : "Pulsar
Client")
.setAuthMethodName(authMethodName);
+ if (proxyVersion != null) {
+ connect.setProxyVersion(proxyVersion);
+ }
+
if (targetBroker != null) {
// When connecting through a proxy, we need to specify which
broker do we want to be proxied through
connect.setProxyToBrokerUrl(targetBroker);
diff --git a/pulsar-common/src/main/proto/PulsarApi.proto
b/pulsar-common/src/main/proto/PulsarApi.proto
index d9c41eeec97..afe193eeb7e 100644
--- a/pulsar-common/src/main/proto/PulsarApi.proto
+++ b/pulsar-common/src/main/proto/PulsarApi.proto
@@ -268,7 +268,7 @@ enum ProtocolVersion {
}
message CommandConnect {
- required string client_version = 1;
+ required string client_version = 1; // The version of the client. Proxy
should forward client's client_version.
optional AuthMethod auth_method = 2; // Deprecated. Use "auth_method_name"
instead.
optional string auth_method_name = 5;
optional bytes auth_data = 3;
@@ -291,6 +291,8 @@ message CommandConnect {
// Feature flags
optional FeatureFlags feature_flags = 10;
+
+ optional string proxy_version = 11; // Version of the proxy. Should only
be forwarded by a proxy.
}
message FeatureFlags {
@@ -308,7 +310,7 @@ message CommandConnected {
}
message CommandAuthResponse {
- optional string client_version = 1;
+ optional string client_version = 1; // The version of the client. Proxy
should forward client's client_version.
optional AuthData response = 2;
optional int32 protocol_version = 3 [default = 0];
}
diff --git
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
index cb93f885a1d..d63b04b6734 100644
---
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
+++
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
@@ -327,7 +327,7 @@ public class DirectProxyHandler {
ByteBuf command = Commands.newConnect(
authentication.getAuthMethodName(), authData,
protocolVersion,
proxyConnection.clientVersion, null /* target broker */,
- originalPrincipal, clientAuthData, clientAuthMethod);
+ originalPrincipal, clientAuthData, clientAuthMethod,
PulsarVersion.getVersion());
writeAndFlush(command);
isTlsOutboundChannel =
ProxyConnection.isTlsChannel(inboundChannel);
}
diff --git
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java
index a1994fb5af4..00ec0b592e4 100644
---
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java
+++
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java
@@ -23,6 +23,7 @@ import io.netty.buffer.ByteBuf;
import io.netty.channel.EventLoopGroup;
import java.util.Arrays;
import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.PulsarVersion;
import org.apache.pulsar.client.impl.ClientCnx;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.common.api.AuthData;
@@ -66,7 +67,7 @@ public class ProxyClientCnx extends ClientCnx {
AuthData authData =
authenticationDataProvider.authenticate(AuthData.INIT_AUTH_DATA);
return Commands.newConnect(authentication.getAuthMethodName(),
authData, protocolVersion,
proxyConnection.clientVersion, proxyToTargetBrokerAddress,
clientAuthRole, clientAuthData,
- clientAuthMethod);
+ clientAuthMethod, PulsarVersion.getVersion());
}
@Override
diff --git
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
index f03aa59619f..4d32c9dce2d 100644
---
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
+++
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
@@ -497,6 +497,15 @@ public class ProxyConnection extends PulsarHandler {
return;
}
+ if (connect.hasProxyVersion()) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("[{}] Client illegally provided proxyVersion.",
remoteAddress);
+ }
+ state = State.Closing;
+ writeAndFlushAndClose(Commands.newError(-1,
ServerError.NotAllowedError, "Must not provide proxyVersion"));
+ return;
+ }
+
try {
// init authn
this.clientConf = createClientConfiguration();