codelipenghui commented on code in PR #17831:
URL: https://github.com/apache/pulsar/pull/17831#discussion_r983029975
##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java:
##########
@@ -453,5 +453,8 @@ public void doMarkAndReleaseUselessConnections(){
// Do release idle connections.
releaseIdleConnectionTaskList.forEach(Runnable::run);
}
-}
+ public ConcurrentMap<InetSocketAddress, ConcurrentMap<Integer, CompletableFuture<ClientCnx>>> getPool() {
+ return pool;
+ }
Review Comment:
We should return a new or immutable structure here. Otherwise, the caller
can modify the map, which will introduce unpredictable behavior.
My suggestion is to rename this to `getConnections()` and return only a
List.
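
A minimal sketch of this suggestion, not the PR's code. `PoolSketch` and its `add` helper are hypothetical, and `String` stands in for both `InetSocketAddress` and `ClientCnx` so the example compiles on its own. The getter copies the futures into a list and wraps it as unmodifiable, so callers can iterate over the connections but cannot add to or remove from the pool.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical stand-in for ConnectionPool; String replaces
// InetSocketAddress and ClientCnx to keep the sketch self-contained.
class PoolSketch {
    private final ConcurrentMap<String, ConcurrentMap<Integer, CompletableFuture<String>>> pool =
            new ConcurrentHashMap<>();

    // Hypothetical helper used only to populate the sketch.
    void add(String address, int key, CompletableFuture<String> cnx) {
        pool.computeIfAbsent(address, a -> new ConcurrentHashMap<>()).put(key, cnx);
    }

    // Returns an unmodifiable snapshot of the pooled futures.
    // The internal nested map is never exposed to the caller.
    List<CompletableFuture<String>> getConnections() {
        List<CompletableFuture<String>> snapshot = new ArrayList<>();
        pool.values().forEach(inner -> snapshot.addAll(inner.values()));
        return Collections.unmodifiableList(snapshot);
    }
}
```

With this shape, the proxy side could iterate over `getConnections()` directly instead of walking `getPool().values()` twice.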
##########
pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java:
##########
@@ -535,6 +542,42 @@ protected void handleAuthResponse(CommandAuthResponse authResponse) {
try {
AuthData clientData = AuthData.of(authResponse.getResponse().getAuthData());
doAuthentication(clientData);
+ if (connectionPool != null && state == State.ProxyLookupRequests) {
+ if (service.getConfiguration().isForwardAuthorizationCredentials()) {
+ connectionPool.getPool().values().forEach(n -> n.values().forEach(cnxFuture -> {
+ String clientVersion;
+ if (authResponse.hasClientVersion()) {
+ clientVersion = authResponse.getClientVersion();
+ } else {
+ clientVersion = PulsarVersion.getVersion();
+ }
+ int protocolVersion;
+ if (authResponse.hasProtocolVersion()) {
+ protocolVersion = authResponse.getProtocolVersion();
+ } else {
+ protocolVersion = Commands.getCurrentProtocolVersion();
+ }
+
+ ByteBuf cmd =
+ Commands.newAuthResponse(clientAuthMethod, clientData, protocolVersion, clientVersion);
+ cnxFuture.thenAccept(clientCnx -> clientCnx.ctx().writeAndFlush(cmd)
+ .addListener(writeFuture -> {
+ if (writeFuture.isSuccess()) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ "{} authentication is refreshed successfully by {}, auth method: {} ",
+ clientCnx.ctx().channel(), ctx.channel(), clientAuthMethod);
+ }
+ } else {
+ LOG.error("{} Failed to refresh request for mutual auth to client {}",
+ clientCnx.ctx().channel(),
+ writeFuture.cause());
+ ctx.channel().pipeline().fireExceptionCaught(writeFuture.cause());
Review Comment:
Should this be `toServerCnx` rather than `ctx`? The `ctx` is the
client-to-proxy connection. A failure here means we were unable to write
data to the broker through `toServerCnx`, right?
##########
pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java:
##########
@@ -535,6 +542,42 @@ protected void handleAuthResponse(CommandAuthResponse authResponse) {
try {
AuthData clientData = AuthData.of(authResponse.getResponse().getAuthData());
doAuthentication(clientData);
+ if (connectionPool != null && state == State.ProxyLookupRequests) {
+ if (service.getConfiguration().isForwardAuthorizationCredentials()) {
+ connectionPool.getPool().values().forEach(n -> n.values().forEach(cnxFuture -> {
+ String clientVersion;
+ if (authResponse.hasClientVersion()) {
+ clientVersion = authResponse.getClientVersion();
+ } else {
+ clientVersion = PulsarVersion.getVersion();
+ }
+ int protocolVersion;
+ if (authResponse.hasProtocolVersion()) {
+ protocolVersion = authResponse.getProtocolVersion();
+ } else {
+ protocolVersion = Commands.getCurrentProtocolVersion();
+ }
+
+ ByteBuf cmd =
+ Commands.newAuthResponse(clientAuthMethod, clientData, protocolVersion, clientVersion);
+ cnxFuture.thenAccept(clientCnx -> clientCnx.ctx().writeAndFlush(cmd)
Review Comment:
The name is also confusing; should it be `toBrokerCnx`?
##########
pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java:
##########
@@ -54,10 +66,33 @@ protected ByteBuf newConnectCommand() throws Exception {
authenticationDataProvider = authentication.getAuthData(remoteHostName);
AuthData authData = authenticationDataProvider.authenticate(AuthData.INIT_AUTH_DATA);
- return Commands.newConnect(authentication.getAuthMethodName(), authData, this.protocolVersion,
- PulsarVersion.getVersion(), proxyToTargetBrokerAddress, clientAuthRole, clientAuthData,
- clientAuthMethod);
+ return Commands.newConnect(authentication.getAuthMethodName(), authData, protocolVersion,
+ PulsarVersion.getVersion(), proxyToTargetBrokerAddress, clientAuthRole, clientAuthData,
+ clientAuthMethod);
}
- private static final Logger log = LoggerFactory.getLogger(ProxyClientCnx.class);
+ @Override
+ protected void handleAuthChallenge(CommandAuthChallenge authChallenge) {
+ checkArgument(authChallenge.hasChallenge());
+ checkArgument(authChallenge.getChallenge().hasAuthData());
+
+ boolean isRefresh = Arrays.equals(AuthData.REFRESH_AUTH_DATA_BYTES, authChallenge.getChallenge().getAuthData());
+ if (!forwardClientAuthData || !isRefresh || refreshClientAuthDataNotifier == null) {
+ super.handleAuthChallenge(authChallenge);
+ return;
+ }
+
+ try {
+ if (log.isDebugEnabled()) {
+ log.debug("{} Request to refresh the original client authentication data", ctx.channel());
+ }
+ refreshClientAuthDataNotifier.run();
Review Comment:
It looks like we could just pass `ctx` and `protocolVersion` from
`ProxyConnection` to `ProxyClientCnx`, so we would not need a callback
mechanism to complete the auth challenge handling.
##########
pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java:
##########
@@ -535,6 +542,42 @@ protected void handleAuthResponse(CommandAuthResponse authResponse) {
try {
AuthData clientData = AuthData.of(authResponse.getResponse().getAuthData());
doAuthentication(clientData);
+ if (connectionPool != null && state == State.ProxyLookupRequests) {
+ if (service.getConfiguration().isForwardAuthorizationCredentials()) {
+ connectionPool.getPool().values().forEach(n -> n.values().forEach(cnxFuture -> {
+ String clientVersion;
+ if (authResponse.hasClientVersion()) {
+ clientVersion = authResponse.getClientVersion();
+ } else {
+ clientVersion = PulsarVersion.getVersion();
+ }
+ int protocolVersion;
+ if (authResponse.hasProtocolVersion()) {
+ protocolVersion = authResponse.getProtocolVersion();
+ } else {
+ protocolVersion = Commands.getCurrentProtocolVersion();
+ }
+
+ ByteBuf cmd =
+ Commands.newAuthResponse(clientAuthMethod, clientData, protocolVersion, clientVersion);
+ cnxFuture.thenAccept(clientCnx -> clientCnx.ctx().writeAndFlush(cmd)
Review Comment:
I think we need to log something if the `cnxFuture` completes exceptionally.
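
A rough sketch of one way to do that, under stated assumptions: `FutureFailureLogging`, `refresh`, and `LOG_LINES` are hypothetical names, a list stands in for the SLF4J `LOG`, and `String` stands in for `ClientCnx`. It uses the standard `CompletableFuture.whenComplete` so that both the success and the failure path are observed, rather than `thenAccept`, which runs only on success.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch; a plain list replaces the real logger so the
// example runs without any logging dependency.
class FutureFailureLogging {
    static final List<String> LOG_LINES = new ArrayList<>();

    // String stands in for ClientCnx.
    static void refresh(CompletableFuture<String> cnxFuture) {
        cnxFuture.whenComplete((cnx, ex) -> {
            if (ex != null) {
                // The connection future failed, so there is nothing to write to.
                LOG_LINES.add("Failed to get connection for auth refresh: " + ex.getMessage());
                return;
            }
            // In the PR this is where the auth response would be written.
            LOG_LINES.add("Refreshing auth on " + cnx);
        });
    }
}
```

In the actual change, the failure branch would presumably be a `LOG.error(...)` or `LOG.warn(...)` call that includes the exception.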
##########
pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java:
##########
@@ -423,16 +425,22 @@ public void brokerConnected(DirectProxyHandler directProxyHandler, CommandConnec
}
// According to auth result, send newConnected or newAuthChallenge command.
- private void doAuthentication(AuthData clientData) throws Exception {
+ private void doAuthentication(AuthData clientData)
+ throws Exception {
AuthData brokerData = authState.authenticate(clientData);
// authentication has completed, will send newConnected command.
if (authState.isComplete()) {
clientAuthRole = authState.getAuthRole();
if (LOG.isDebugEnabled()) {
LOG.debug("[{}] Client successfully authenticated with {} role {}",
- remoteAddress, authMethod, clientAuthRole);
+ remoteAddress, authMethod, clientAuthRole);
+ }
+
+ // First connection
+ if (this.connectionPool == null || state == State.Connecting) {
Review Comment:
Why do we need to check `this.connectionPool == null`?
Is it to avoid calling `completeConnect(clientData);` multiple times?