jerrypeng commented on a change in pull request #10775:
URL: https://github.com/apache/pulsar/pull/10775#discussion_r644361349
##########
File path:
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
##########
@@ -330,48 +331,54 @@ protected void handleAuthChallenge(CommandAuthChallenge
authChallenge) {
checkArgument(authChallenge.hasChallenge());
checkArgument(authChallenge.getChallenge().hasAuthData());
- if (Arrays.equals(AuthData.REFRESH_AUTH_DATA_BYTES,
authChallenge.getChallenge().getAuthData())) {
- try {
- authenticationDataProvider =
authentication.getAuthData(remoteHostName);
- } catch (PulsarClientException e) {
- log.error("{} Error when refreshing authentication data
provider: {}", ctx.channel(), e);
- connectionFuture.completeExceptionally(e);
- return;
+ // Run this the code to respond to the authentication challenge in
another thread so we are not potentially
+ // blocking a pulsar-client-io thread. A portion of the code is user
supplied, such as getting a token, which is another
+ // reason to run in a separate thread so user code cannot not block
pulsar-client-io thread. Potential blocking code
+ // will also render send timeouts to be not respected since it uses
pulsar-client-io thread as well.
+ CompletableFuture.runAsync(() -> {
+ if (Arrays.equals(AuthData.REFRESH_AUTH_DATA_BYTES,
authChallenge.getChallenge().getAuthData())) {
+ try {
+ authenticationDataProvider =
authentication.getAuthData(remoteHostName);
+ } catch (PulsarClientException e) {
+ log.error("{} Error when refreshing authentication data
provider: {}", ctx.channel(), e);
+ connectionFuture.completeExceptionally(e);
+ return;
+ }
}
- }
-
- // mutual authn. If auth not complete, continue auth; if auth
complete, complete connectionFuture.
- try {
- AuthData authData = authenticationDataProvider
-
.authenticate(AuthData.of(authChallenge.getChallenge().getAuthData()));
- checkState(!authData.isComplete());
+ // mutual authn. If auth not complete, continue auth; if auth
complete, complete connectionFuture.
+ try {
+ AuthData authData = authenticationDataProvider
+
.authenticate(AuthData.of(authChallenge.getChallenge().getAuthData()));
- ByteBuf request =
Commands.newAuthResponse(authentication.getAuthMethodName(),
- authData,
- this.protocolVersion,
- PulsarVersion.getVersion());
+ checkState(!authData.isComplete());
- if (log.isDebugEnabled()) {
- log.debug("{} Mutual auth {}", ctx.channel(),
authentication.getAuthMethodName());
- }
+ ByteBuf request =
Commands.newAuthResponse(authentication.getAuthMethodName(),
+ authData,
+ protocolVersion,
+ PulsarVersion.getVersion());
- ctx.writeAndFlush(request).addListener(writeFuture -> {
- if (!writeFuture.isSuccess()) {
- log.warn("{} Failed to send request for mutual auth to
broker: {}", ctx.channel(),
- writeFuture.cause().getMessage());
-
connectionFuture.completeExceptionally(writeFuture.cause());
+ if (log.isDebugEnabled()) {
+ log.debug("{} Mutual auth {}", ctx.channel(),
authentication.getAuthMethodName());
}
- });
- if (state == State.SentConnectFrame) {
- state = State.Connecting;
+ ctx.writeAndFlush(request).addListener(writeFuture -> {
+ if (!writeFuture.isSuccess()) {
+ log.warn("{} Failed to send request for mutual auth to
broker: {}", ctx.channel(),
+ writeFuture.cause().getMessage());
+
connectionFuture.completeExceptionally(writeFuture.cause());
+ }
+ });
+
+ if (state == State.SentConnectFrame) {
+ state = State.Connecting;
Review comment:
There is, but it should be fine. Currently, the handler code can already be
executed concurrently when the number of pulsar-client-io threads is set to
more than 1.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]