lin-zhao commented on code in PR #17831:
URL: https://github.com/apache/pulsar/pull/17831#discussion_r980609442
##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java:
##########
@@ -253,8 +254,25 @@ public void channelActive(ChannelHandlerContext ctx)
throws Exception {
} else {
log.info("{} Connected through proxy to target broker at {}",
ctx.channel(), proxyToTargetBrokerAddress);
}
- // Send CONNECT command
- ctx.writeAndFlush(newConnectCommand())
+ completeActive();
+ }
+
+ protected void completeActive() throws Exception {
+ sendConnectCommand(null, null, null);
+ }
+
+ protected final void sendConnectCommand(String originalPrincipal, AuthData
originalAuthData,
+ String originalAuthMethod) throws
Exception {
+ // mutual authentication is to auth between `remoteHostName` and this
client for this channel.
+ // each channel will have a mutual client/server pair, mutual client
evaluateChallenge with init data,
+ // and return authData to server.
+ authenticationDataProvider =
authentication.getAuthData(remoteHostName);
Review Comment:
Nitpick:
```suggestion
this.authenticationDataProvider =
authentication.getAuthData(remoteHostName);
```
##########
pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java:
##########
@@ -18,46 +18,61 @@
*/
package org.apache.pulsar.proxy.server;
-import io.netty.buffer.ByteBuf;
import io.netty.channel.EventLoopGroup;
-import org.apache.pulsar.PulsarVersion;
+import java.util.Arrays;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+import javax.naming.AuthenticationException;
+import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.impl.ClientCnx;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.common.api.AuthData;
-import org.apache.pulsar.common.protocol.Commands;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.pulsar.common.api.proto.CommandAuthChallenge;
+@Slf4j
public class ProxyClientCnx extends ClientCnx {
-
- String clientAuthRole;
- AuthData clientAuthData;
- String clientAuthMethod;
- int protocolVersion;
+ private final boolean forwardClientAuthData;
+ private final String clientAuthMethod;
+ private final String clientAuthRole;
+ private final Function<Boolean, CompletableFuture<AuthData>>
clientAuthDataSupplier;
public ProxyClientCnx(ClientConfigurationData conf, EventLoopGroup
eventLoopGroup, String clientAuthRole,
- AuthData clientAuthData, String clientAuthMethod,
int protocolVersion) {
- super(conf, eventLoopGroup);
+ Function<Boolean, CompletableFuture<AuthData>>
clientAuthDataSupplier,
+ String clientAuthMethod,
+ int protocolVersion, boolean forwardClientAuthData) {
+ super(conf, eventLoopGroup, protocolVersion);
this.clientAuthRole = clientAuthRole;
- this.clientAuthData = clientAuthData;
+ this.clientAuthDataSupplier = clientAuthDataSupplier;
this.clientAuthMethod = clientAuthMethod;
- this.protocolVersion = protocolVersion;
+ this.forwardClientAuthData = forwardClientAuthData;
}
@Override
- protected ByteBuf newConnectCommand() throws Exception {
- if (log.isDebugEnabled()) {
- log.debug("New Connection opened via ProxyClientCnx with params
clientAuthRole = {},"
- + " clientAuthData = {}, clientAuthMethod = {}",
- clientAuthRole, clientAuthData, clientAuthMethod);
+ protected void completeActive() {
+ clientAuthDataSupplier.apply(false).thenAccept(clientAuthData -> {
+ try {
+ sendConnectCommand(clientAuthRole, clientAuthData,
clientAuthMethod);
+ } catch (Exception e) {
+ log.error("{} Error during handshake", ctx.channel(), e);
+ close(e);
+ }
+ });
+ }
+
+ @Override
+ protected void prepareMutualAuth(CommandAuthChallenge authChallenge)
throws AuthenticationException {
+ boolean isRefresh = Arrays.equals(AuthData.REFRESH_AUTH_DATA_BYTES,
authChallenge.getChallenge().getAuthData());
+ if (!forwardClientAuthData || !isRefresh) {
+ super.prepareMutualAuth(authChallenge);
+ return;
}
- authenticationDataProvider =
authentication.getAuthData(remoteHostName);
- AuthData authData =
authenticationDataProvider.authenticate(AuthData.INIT_AUTH_DATA);
- return Commands.newConnect(authentication.getAuthMethodName(),
authData, this.protocolVersion,
- PulsarVersion.getVersion(), proxyToTargetBrokerAddress,
clientAuthRole, clientAuthData,
- clientAuthMethod);
+ clientAuthDataSupplier.apply(true).thenAccept(originalClientAuthData
-> {
+ sendMutualAuthCommand(clientAuthMethod, originalClientAuthData);
+ }).exceptionally(e -> {
+ log.error("{} Error mutual verify", ctx.channel(), e);
Review Comment:
What's the reason for swallowing every exception here instead of propagating
it to the caller? This method declares `AuthenticationException`. Is it
supposed to be thrown by this line? If so, you probably don't want to catch
and ignore all exceptions here.
##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java:
##########
@@ -1243,6 +1254,13 @@ public void close() {
}
}
+ public void close(Throwable e) {
Review Comment:
Is there a particular reason to make this public? `close` as a method name
is very generic.
How about
```suggestion
private void closeWithException(Throwable e) {
```
##########
pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java:
##########
@@ -18,46 +18,61 @@
*/
package org.apache.pulsar.proxy.server;
-import io.netty.buffer.ByteBuf;
import io.netty.channel.EventLoopGroup;
-import org.apache.pulsar.PulsarVersion;
+import java.util.Arrays;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+import javax.naming.AuthenticationException;
+import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.impl.ClientCnx;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.common.api.AuthData;
-import org.apache.pulsar.common.protocol.Commands;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.pulsar.common.api.proto.CommandAuthChallenge;
+@Slf4j
public class ProxyClientCnx extends ClientCnx {
-
- String clientAuthRole;
- AuthData clientAuthData;
- String clientAuthMethod;
- int protocolVersion;
+ private final boolean forwardClientAuthData;
+ private final String clientAuthMethod;
+ private final String clientAuthRole;
+ private final Function<Boolean, CompletableFuture<AuthData>>
clientAuthDataSupplier;
public ProxyClientCnx(ClientConfigurationData conf, EventLoopGroup
eventLoopGroup, String clientAuthRole,
Review Comment:
This is a backward-incompatible change. Would you consider adding a new
overloaded constructor instead of changing the existing one? Otherwise,
there's an argument that this needs to be a major version bump.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]