jerrypeng commented on a change in pull request #10775:
URL: https://github.com/apache/pulsar/pull/10775#discussion_r644361349



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
##########
@@ -330,48 +331,54 @@ protected void handleAuthChallenge(CommandAuthChallenge authChallenge) {
         checkArgument(authChallenge.hasChallenge());
         checkArgument(authChallenge.getChallenge().hasAuthData());
 
-        if (Arrays.equals(AuthData.REFRESH_AUTH_DATA_BYTES, authChallenge.getChallenge().getAuthData())) {
-            try {
-                authenticationDataProvider = authentication.getAuthData(remoteHostName);
-            } catch (PulsarClientException e) {
-                log.error("{} Error when refreshing authentication data provider: {}", ctx.channel(), e);
-                connectionFuture.completeExceptionally(e);
-                return;
+        // Run this the code to respond to the authentication challenge in another thread so we are not potentially
+        // blocking a pulsar-client-io thread.  A portion of the code is user supplied, such as getting a token, which is another
+        // reason to run in a separate thread so user code cannot not block pulsar-client-io thread. Potential blocking code
+        // will also render send timeouts to be not respected since it uses pulsar-client-io thread as well.
+        CompletableFuture.runAsync(() -> {
+            if (Arrays.equals(AuthData.REFRESH_AUTH_DATA_BYTES, authChallenge.getChallenge().getAuthData())) {
+                try {
+                    authenticationDataProvider = authentication.getAuthData(remoteHostName);
+                } catch (PulsarClientException e) {
+                    log.error("{} Error when refreshing authentication data provider: {}", ctx.channel(), e);
+                    connectionFuture.completeExceptionally(e);
+                    return;
+                }
             }
-        }
-
-        // mutual authn. If auth not complete, continue auth; if auth complete, complete connectionFuture.
-        try {
-            AuthData authData = authenticationDataProvider
-                .authenticate(AuthData.of(authChallenge.getChallenge().getAuthData()));
 
-            checkState(!authData.isComplete());
+            // mutual authn. If auth not complete, continue auth; if auth complete, complete connectionFuture.
+            try {
+                AuthData authData = authenticationDataProvider
+                        .authenticate(AuthData.of(authChallenge.getChallenge().getAuthData()));
 
-            ByteBuf request = Commands.newAuthResponse(authentication.getAuthMethodName(),
-                authData,
-                this.protocolVersion,
-                PulsarVersion.getVersion());
+                checkState(!authData.isComplete());
 
-            if (log.isDebugEnabled()) {
-                log.debug("{} Mutual auth {}", ctx.channel(), authentication.getAuthMethodName());
-            }
+                ByteBuf request = Commands.newAuthResponse(authentication.getAuthMethodName(),
+                        authData,
+                        protocolVersion,
+                        PulsarVersion.getVersion());
 
-            ctx.writeAndFlush(request).addListener(writeFuture -> {
-                if (!writeFuture.isSuccess()) {
-                    log.warn("{} Failed to send request for mutual auth to broker: {}", ctx.channel(),
-                        writeFuture.cause().getMessage());
-                    connectionFuture.completeExceptionally(writeFuture.cause());
+                if (log.isDebugEnabled()) {
+                    log.debug("{} Mutual auth {}", ctx.channel(), authentication.getAuthMethodName());
                 }
-            });
 
-            if (state == State.SentConnectFrame) {
-                state = State.Connecting;
+                ctx.writeAndFlush(request).addListener(writeFuture -> {
+                    if (!writeFuture.isSuccess()) {
+                        log.warn("{} Failed to send request for mutual auth to broker: {}", ctx.channel(),
+                                writeFuture.cause().getMessage());
+                        connectionFuture.completeExceptionally(writeFuture.cause());
+                    }
+                });
+
+                if (state == State.SentConnectFrame) {
+                    state = State.Connecting;

Review comment:
       There is, but it should be fine. Currently, the handler code can already
be executed concurrently when the number of pulsar-client-io threads is set to
more than 1.
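
For illustration only, here is a minimal, self-contained sketch of the pattern under discussion: the check-then-set on `state` runs on a separate thread via `CompletableFuture`, so two handlers may reach it at the same time. The class `AuthChallengeSketch`, its methods, and the use of `AtomicReference` are assumptions made for this example; they are not taken from `ClientCnx` or from this PR, which keeps a plain field. The sketch only shows one way such a transition could be made safe if the concurrency ever became a problem.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for a connection object; not the real ClientCnx.
public class AuthChallengeSketch {
    enum State { SentConnectFrame, Connecting }

    // Assumption: an AtomicReference replaces the plain `state` field so the
    // check-then-set below is a single atomic step.
    private final AtomicReference<State> state =
            new AtomicReference<>(State.SentConnectFrame);

    // Runs the state transition off the calling thread, mirroring the idea of
    // not blocking an I/O thread. Returns true only for the caller whose
    // compareAndSet actually performed the transition.
    CompletableFuture<Boolean> handleChallengeAsync() {
        return CompletableFuture.supplyAsync(() ->
                state.compareAndSet(State.SentConnectFrame, State.Connecting));
    }

    State currentState() {
        return state.get();
    }

    public static void main(String[] args) {
        AuthChallengeSketch cnx = new AuthChallengeSketch();
        // Two challenges handled concurrently, as could happen with more
        // than one I/O thread.
        CompletableFuture<Boolean> first = cnx.handleChallengeAsync();
        CompletableFuture<Boolean> second = cnx.handleChallengeAsync();
        int transitions = (first.join() ? 1 : 0) + (second.join() ? 1 : 0);
        System.out.println("transitions performed: " + transitions);
        System.out.println("final state: " + cnx.currentState());
    }
}
```

In this sketch only one of the two concurrent handlers performs the transition; with a plain field both could observe `SentConnectFrame` and both would write `Connecting`, which is harmless here because the written value is the same.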



