eolivelli commented on code in PR #19292:
URL: https://github.com/apache/pulsar/pull/19292#discussion_r1086341817
##########
pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java:
##########
@@ -423,31 +421,65 @@ public void brokerConnected(DirectProxyHandler
directProxyHandler, CommandConnec
// According to auth result, send newConnected or newAuthChallenge command.
private void doAuthentication(AuthData clientData)
throws Exception {
- AuthData brokerData = authState.authenticate(clientData);
- // authentication has completed, will send newConnected command.
- if (authState.isComplete()) {
- clientAuthRole = authState.getAuthRole();
- if (LOG.isDebugEnabled()) {
- LOG.debug("[{}] Client successfully authenticated with {} role
{}",
- remoteAddress, authMethod, clientAuthRole);
+ CompletableFuture<AuthData> authChallengeFuture =
authState.authenticateAsync(clientData);
+ if (authChallengeFuture.isDone()) {
+ if (!authChallengeFuture.isCompletedExceptionally()) {
+ authChallengeSuccessCallback(authChallengeFuture.get());
+ } else {
+ try {
Review Comment:
what about
```
authChallengeFuture.exceptionally(e -> {
authenticationFailedCallback(unwrapCompletionException(e);)
});
```
and probably you could consolidate the two branches in one single
"whenComplete" block
```
authChallengeFuture.whenComplete( (result, error) -> {
if (error != null) {
authenticationFailedCallback(unwrapCompletionException(error));
} else {
authChallengeSuccessCallback(result);
}
});
```
as you did below
##########
pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java:
##########
@@ -423,31 +421,65 @@ public void brokerConnected(DirectProxyHandler
directProxyHandler, CommandConnec
// According to auth result, send newConnected or newAuthChallenge command.
private void doAuthentication(AuthData clientData)
throws Exception {
- AuthData brokerData = authState.authenticate(clientData);
- // authentication has completed, will send newConnected command.
- if (authState.isComplete()) {
- clientAuthRole = authState.getAuthRole();
- if (LOG.isDebugEnabled()) {
- LOG.debug("[{}] Client successfully authenticated with {} role
{}",
- remoteAddress, authMethod, clientAuthRole);
+ CompletableFuture<AuthData> authChallengeFuture =
authState.authenticateAsync(clientData);
+ if (authChallengeFuture.isDone()) {
+ if (!authChallengeFuture.isCompletedExceptionally()) {
+ authChallengeSuccessCallback(authChallengeFuture.get());
+ } else {
+ try {
+ authChallengeFuture.get();
+ } catch (ExecutionException e) {
+ authenticationFailedCallback(e.getCause());
+ }
}
+ } else {
+ state = State.Connecting;
+ authChallengeFuture.whenCompleteAsync((authChallenge, throwable)
-> {
+ if (throwable == null) {
+ authChallengeSuccessCallback(authChallenge);
+ } else {
+ authenticationFailedCallback(throwable);
+ }
+ }, ctx.executor());
+ }
+ }
+
+ protected void authenticationFailedCallback(Throwable t) {
+ LOG.warn("[{}] Unable to authenticate: ", remoteAddress, t);
+ final ByteBuf msg = Commands.newError(-1,
ServerError.AuthenticationError, "Failed to authenticate");
+ writeAndFlushAndClose(msg);
+ }
+
+ // Always run in this class's event loop.
+ protected void authChallengeSuccessCallback(AuthData authChallenge) {
+ try {
+ // authentication has completed, will send newConnected command.
+ if (authChallenge == null) {
+ clientAuthRole = authState.getAuthRole();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("[{}] Client successfully authenticated with {}
role {}",
+ remoteAddress, authMethod, clientAuthRole);
+ }
- // First connection
- if (this.connectionPool == null || state == State.Connecting) {
- // authentication has completed, will send newConnected
command.
- completeConnect(clientData);
+ // First connection
+ if (this.connectionPool == null || state == State.Connecting) {
+ // authentication has completed, will send newConnected
command.
+ completeConnect();
+ }
+ return;
}
- return;
- }
- // auth not complete, continue auth with client side.
- final ByteBuf msg = Commands.newAuthChallenge(authMethod, brokerData,
protocolVersionToAdvertise);
- writeAndFlush(msg);
- if (LOG.isDebugEnabled()) {
- LOG.debug("[{}] Authentication in progress client by method {}.",
- remoteAddress, authMethod);
+ // auth not complete, continue auth with client side.
+ final ByteBuf msg = Commands.newAuthChallenge(authMethod,
authChallenge, protocolVersionToAdvertise);
+ writeAndFlush(msg);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("[{}] Authentication in progress client by method
{}.",
+ remoteAddress, authMethod);
+ }
+ state = State.Connecting;
+ } catch (Exception e) {
Review Comment:
why do we catch blindly "Exception" here ?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]