nodece commented on code in PR #17831:
URL: https://github.com/apache/pulsar/pull/17831#discussion_r980827317


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java:
##########
@@ -361,51 +369,54 @@ protected void handleConnected(CommandConnected connected) {
         state = State.Ready;
     }
 
-    @Override
-    protected void handleAuthChallenge(CommandAuthChallenge authChallenge) {
-        checkArgument(authChallenge.hasChallenge());
-        checkArgument(authChallenge.getChallenge().hasAuthData());
+    protected final void sendMutualAuthCommand(String authMethod, AuthData authData) {
+        if (log.isDebugEnabled()) {
+            log.debug("{} Mutual auth {}", ctx.channel(), authMethod);
+        }
 
+        ByteBuf request = Commands.newAuthResponse(authMethod,
+                authData,
+                this.protocolVersion,
+                PulsarVersion.getVersion());
+
+        ctx.writeAndFlush(request).addListener(writeFuture -> {
+            if (!writeFuture.isSuccess()) {
+                log.warn("{} Failed to send request for mutual auth to broker: {}", ctx.channel(),
+                        writeFuture.cause().getMessage());
+                close(writeFuture.cause());
+            }
+        });
+    }
+
+    protected void prepareMutualAuth(CommandAuthChallenge authChallenge) throws AuthenticationException {
         if (Arrays.equals(AuthData.REFRESH_AUTH_DATA_BYTES, authChallenge.getChallenge().getAuthData())) {
             try {
                 authenticationDataProvider = authentication.getAuthData(remoteHostName);
             } catch (PulsarClientException e) {
                 log.error("{} Error when refreshing authentication data provider: {}", ctx.channel(), e);
-                connectionFuture.completeExceptionally(e);
+                close(e);
                 return;
             }
         }
-
         // mutual authn. If auth not complete, continue auth; if auth complete, complete connectionFuture.
-        try {
-            AuthData authData = authenticationDataProvider
-                .authenticate(AuthData.of(authChallenge.getChallenge().getAuthData()));
-
-            checkState(!authData.isComplete());
-
-            ByteBuf request = Commands.newAuthResponse(authentication.getAuthMethodName(),
-                authData,
-                this.protocolVersion,
-                PulsarVersion.getVersion());
-
-            if (log.isDebugEnabled()) {
-                log.debug("{} Mutual auth {}", ctx.channel(), authentication.getAuthMethodName());
-            }
-
-            ctx.writeAndFlush(request).addListener(writeFuture -> {
-                if (!writeFuture.isSuccess()) {
-                    log.warn("{} Failed to send request for mutual auth to broker: {}", ctx.channel(),
-                        writeFuture.cause().getMessage());
-                    connectionFuture.completeExceptionally(writeFuture.cause());
-                }
-            });
+        AuthData authData =
+                authenticationDataProvider.authenticate(AuthData.of(authChallenge.getChallenge().getAuthData()));
+        checkState(!authData.isComplete());
+        sendMutualAuthCommand(authentication.getAuthMethodName(), authData);
+    }
 
+    @Override
+    protected void handleAuthChallenge(CommandAuthChallenge authChallenge) {
+        checkArgument(authChallenge.hasChallenge());
+        checkArgument(authChallenge.getChallenge().hasAuthData());
+        try {
+            prepareMutualAuth(authChallenge);

Review Comment:
   I'll refactor this.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to