kaushik-develop commented on a change in pull request #10775:
URL: https://github.com/apache/pulsar/pull/10775#discussion_r643549813



##########
File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
##########
@@ -330,48 +331,54 @@ protected void handleAuthChallenge(CommandAuthChallenge 
authChallenge) {
         checkArgument(authChallenge.hasChallenge());
         checkArgument(authChallenge.getChallenge().hasAuthData());
 
-        if (Arrays.equals(AuthData.REFRESH_AUTH_DATA_BYTES, 
authChallenge.getChallenge().getAuthData())) {
-            try {
-                authenticationDataProvider = 
authentication.getAuthData(remoteHostName);
-            } catch (PulsarClientException e) {
-                log.error("{} Error when refreshing authentication data 
provider: {}", ctx.channel(), e);
-                connectionFuture.completeExceptionally(e);
-                return;
+        // Run this the code to respond to the authentication challenge in 
another thread so we are not potentially
+        // blocking a pulsar-client-io thread.  A portion of the code is user 
supplied, such as getting a token, which is another
+        // reason to run in a separate thread so user code cannot not block 
pulsar-client-io thread. Potential blocking code
+        // will also render send timeouts to be not respected since it uses 
pulsar-client-io thread as well.
+        CompletableFuture.runAsync(() -> {
+            if (Arrays.equals(AuthData.REFRESH_AUTH_DATA_BYTES, 
authChallenge.getChallenge().getAuthData())) {
+                try {
+                    authenticationDataProvider = 
authentication.getAuthData(remoteHostName);
+                } catch (PulsarClientException e) {
+                    log.error("{} Error when refreshing authentication data 
provider: {}", ctx.channel(), e);
+                    connectionFuture.completeExceptionally(e);
+                    return;
+                }
             }
-        }
-
-        // mutual authn. If auth not complete, continue auth; if auth 
complete, complete connectionFuture.
-        try {
-            AuthData authData = authenticationDataProvider
-                
.authenticate(AuthData.of(authChallenge.getChallenge().getAuthData()));
 
-            checkState(!authData.isComplete());
+            // mutual authn. If auth not complete, continue auth; if auth 
complete, complete connectionFuture.
+            try {
+                AuthData authData = authenticationDataProvider
+                        
.authenticate(AuthData.of(authChallenge.getChallenge().getAuthData()));
 
-            ByteBuf request = 
Commands.newAuthResponse(authentication.getAuthMethodName(),
-                authData,
-                this.protocolVersion,
-                PulsarVersion.getVersion());
+                checkState(!authData.isComplete());
 
-            if (log.isDebugEnabled()) {
-                log.debug("{} Mutual auth {}", ctx.channel(), 
authentication.getAuthMethodName());
-            }
+                ByteBuf request = 
Commands.newAuthResponse(authentication.getAuthMethodName(),
+                        authData,
+                        protocolVersion,
+                        PulsarVersion.getVersion());
 
-            ctx.writeAndFlush(request).addListener(writeFuture -> {
-                if (!writeFuture.isSuccess()) {
-                    log.warn("{} Failed to send request for mutual auth to 
broker: {}", ctx.channel(),
-                        writeFuture.cause().getMessage());
-                    
connectionFuture.completeExceptionally(writeFuture.cause());
+                if (log.isDebugEnabled()) {
+                    log.debug("{} Mutual auth {}", ctx.channel(), 
authentication.getAuthMethodName());
                 }
-            });
 
-            if (state == State.SentConnectFrame) {
-                state = State.Connecting;
+                ctx.writeAndFlush(request).addListener(writeFuture -> {
+                    if (!writeFuture.isSuccess()) {
+                        log.warn("{} Failed to send request for mutual auth to 
broker: {}", ctx.channel(),
+                                writeFuture.cause().getMessage());
+                        
connectionFuture.completeExceptionally(writeFuture.cause());
+                    }
+                });
+
+                if (state == State.SentConnectFrame) {
+                    state = State.Connecting;

Review comment:
       Qs: is there any (added?) possibility of multiple concurrent calls, 
updating `state`, now that this will run asynchronously off the io thread?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to