kaushik-develop commented on a change in pull request #10775:
URL: https://github.com/apache/pulsar/pull/10775#discussion_r643549813
##########
File path:
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
##########
@@ -330,48 +331,54 @@ protected void handleAuthChallenge(CommandAuthChallenge
authChallenge) {
checkArgument(authChallenge.hasChallenge());
checkArgument(authChallenge.getChallenge().hasAuthData());
- if (Arrays.equals(AuthData.REFRESH_AUTH_DATA_BYTES,
authChallenge.getChallenge().getAuthData())) {
- try {
- authenticationDataProvider =
authentication.getAuthData(remoteHostName);
- } catch (PulsarClientException e) {
- log.error("{} Error when refreshing authentication data
provider: {}", ctx.channel(), e);
- connectionFuture.completeExceptionally(e);
- return;
+ // Run the code that responds to the authentication challenge in
another thread so we are not potentially
+ // blocking a pulsar-client-io thread. A portion of the code is user
supplied, such as getting a token, which is another
+ // reason to run in a separate thread so user code cannot block a
pulsar-client-io thread. Potentially blocking code
+ // would also cause send timeouts not to be respected, since they use
the pulsar-client-io thread as well.
+ CompletableFuture.runAsync(() -> {
+ if (Arrays.equals(AuthData.REFRESH_AUTH_DATA_BYTES,
authChallenge.getChallenge().getAuthData())) {
+ try {
+ authenticationDataProvider =
authentication.getAuthData(remoteHostName);
+ } catch (PulsarClientException e) {
+ log.error("{} Error when refreshing authentication data
provider: {}", ctx.channel(), e);
+ connectionFuture.completeExceptionally(e);
+ return;
+ }
}
- }
-
- // mutual authn. If auth not complete, continue auth; if auth
complete, complete connectionFuture.
- try {
- AuthData authData = authenticationDataProvider
-
.authenticate(AuthData.of(authChallenge.getChallenge().getAuthData()));
- checkState(!authData.isComplete());
+ // mutual authn. If auth not complete, continue auth; if auth
complete, complete connectionFuture.
+ try {
+ AuthData authData = authenticationDataProvider
+
.authenticate(AuthData.of(authChallenge.getChallenge().getAuthData()));
- ByteBuf request =
Commands.newAuthResponse(authentication.getAuthMethodName(),
- authData,
- this.protocolVersion,
- PulsarVersion.getVersion());
+ checkState(!authData.isComplete());
- if (log.isDebugEnabled()) {
- log.debug("{} Mutual auth {}", ctx.channel(),
authentication.getAuthMethodName());
- }
+ ByteBuf request =
Commands.newAuthResponse(authentication.getAuthMethodName(),
+ authData,
+ protocolVersion,
+ PulsarVersion.getVersion());
- ctx.writeAndFlush(request).addListener(writeFuture -> {
- if (!writeFuture.isSuccess()) {
- log.warn("{} Failed to send request for mutual auth to
broker: {}", ctx.channel(),
- writeFuture.cause().getMessage());
-
connectionFuture.completeExceptionally(writeFuture.cause());
+ if (log.isDebugEnabled()) {
+ log.debug("{} Mutual auth {}", ctx.channel(),
authentication.getAuthMethodName());
}
- });
- if (state == State.SentConnectFrame) {
- state = State.Connecting;
+ ctx.writeAndFlush(request).addListener(writeFuture -> {
+ if (!writeFuture.isSuccess()) {
+ log.warn("{} Failed to send request for mutual auth to
broker: {}", ctx.channel(),
+ writeFuture.cause().getMessage());
+
connectionFuture.completeExceptionally(writeFuture.cause());
+ }
+ });
+
+ if (state == State.SentConnectFrame) {
+ state = State.Connecting;
Review comment:
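Question: now that this runs asynchronously, off the io thread, is there
any (added?) possibility of multiple concurrent calls updating `state`?
The check `state == State.SentConnectFrame` followed by the assignment is
not atomic once it can execute on more than one thread.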
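
To make the concern concrete, here is a minimal sketch of one way the transition could be guarded, assuming the field were held in an `AtomicReference`. The class `StateTransitionSketch`, its nested `State` enum, and the method `markConnecting` are hypothetical stand-ins for illustration only; they are not the actual `ClientCnx` code, and I have not checked how `state` is declared there.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

public class StateTransitionSketch {
    // Hypothetical stand-in for the connection state; only the two values
    // that appear in the diff are modeled here.
    public enum State { SentConnectFrame, Connecting }

    private final AtomicReference<State> state =
            new AtomicReference<>(State.SentConnectFrame);

    // Atomically moves SentConnectFrame -> Connecting.
    // Returns true only for the single caller that performs the transition.
    public boolean markConnecting() {
        return state.compareAndSet(State.SentConnectFrame, State.Connecting);
    }

    public State current() {
        return state.get();
    }

    public static void main(String[] args) {
        StateTransitionSketch cnx = new StateTransitionSketch();

        // Two tasks race off the caller's thread, as runAsync would do
        // for two auth challenges handled concurrently.
        CompletableFuture<Boolean> a = CompletableFuture.supplyAsync(cnx::markConnecting);
        CompletableFuture<Boolean> b = CompletableFuture.supplyAsync(cnx::markConnecting);

        int winners = (a.join() ? 1 : 0) + (b.join() ? 1 : 0);
        System.out.println("winners: " + winners + ", state: " + cnx.current());
    }
}
```

An alternative that keeps the field as-is might be to run only the potentially blocking `getAuthData` / `authenticate` calls on the other thread and hand the `state` update back to the channel's event loop (for example via `ctx.executor().execute(...)`), so that `state` is still only touched from the io thread.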