This is an automated email from the ASF dual-hosted git repository.
mmarshall pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.10 by this push:
new 6132b46efae [fix][broker] ServerCnx broken after recent cherry-picks
(#19521)
6132b46efae is described below
commit 6132b46efae60d87979966aca075b80ab7e2a87d
Author: Michael Marshall <[email protected]>
AuthorDate: Tue Feb 14 19:35:24 2023 -0600
[fix][broker] ServerCnx broken after recent cherry-picks (#19521)
I broke all release branches when I cherry-picked
2847dd19f6e8a546f4d45bf51eb2b72aae0869ce to them. This change takes some of the
underlying logic from #19409, without taking the async logic.
* Make changes to `ServerCnx` to make tests pass
Tests are currently failing, so passing tests will show that this solution
is correct.
- [x] `doc-not-needed`
(cherry picked from commit 8246da282ca38e891bdf8a4e9abc47f640b22384)
(cherry picked from commit 15e4198e19ebb2045777c696ac39f969b2a57f66)
---
.../apache/pulsar/broker/service/ServerCnx.java | 49 +++++++++++-----------
1 file changed, 24 insertions(+), 25 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index be2386bd369..899bdd49626 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -626,15 +626,6 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
// complete the connect and sent newConnected command
private void completeConnect(int clientProtoVersion, String clientVersion)
{
- if (service.isAuthenticationEnabled() &&
service.isAuthorizationEnabled()) {
- if (!isValidRoleAndOriginalPrincipal()) {
- state = State.Failed;
- service.getPulsarStats().recordConnectionCreateFail();
- final ByteBuf msg = Commands.newError(-1,
ServerError.AuthorizationError, "Invalid roles.");
-
ctx.writeAndFlush(msg).addListener(ChannelFutureListener.CLOSE);
- return;
- }
- }
ctx.writeAndFlush(Commands.newConnected(clientProtoVersion,
maxMessageSize));
state = State.Connected;
service.getPulsarStats().recordConnectionCreateSuccess();
@@ -651,7 +642,8 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
}
// According to auth result, send newConnected or newAuthChallenge command.
- private State doAuthentication(AuthData clientData,
+ private void doAuthentication(AuthData clientData,
+ boolean useOriginalAuthState,
int clientProtocolVersion,
String clientVersion) throws Exception {
@@ -659,8 +651,7 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
// in presence of a proxy and if the proxy is forwarding the
credentials).
// In this case, the re-validation needs to be done against the
original client
// credentials.
- boolean useOriginalAuthState = (originalAuthState != null);
- AuthenticationState authState = useOriginalAuthState ?
originalAuthState : this.authState;
+ AuthenticationState authState = useOriginalAuthState ?
originalAuthState : this.authState;
String authRole = useOriginalAuthState ? originalPrincipal :
this.authRole;
AuthData brokerData = authState.authenticate(clientData);
@@ -693,6 +684,15 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
if (state != State.Connected) {
// First time authentication is done
+ if (service.isAuthenticationEnabled() &&
service.isAuthorizationEnabled()) {
+ if (!isValidRoleAndOriginalPrincipal()) {
+ state = State.Failed;
+ service.getPulsarStats().recordConnectionCreateFail();
+ final ByteBuf msg = Commands.newError(-1,
ServerError.AuthorizationError, "Invalid roles.");
+
ctx.writeAndFlush(msg).addListener(ChannelFutureListener.CLOSE);
+ return;
+ }
+ }
completeConnect(clientProtocolVersion, clientVersion);
} else {
// If the connection was already ready, it means we're doing a
refresh
@@ -706,18 +706,16 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
}
}
}
+ } else {
- return State.Connected;
- }
-
- // auth not complete, continue auth with client side.
- ctx.writeAndFlush(Commands.newAuthChallenge(authMethod, brokerData,
clientProtocolVersion));
- if (log.isDebugEnabled()) {
- log.debug("[{}] Authentication in progress client by method {}.",
- remoteAddress, authMethod);
- log.debug("[{}] connect state change to : [{}]", remoteAddress,
State.Connecting.name());
+ // auth not complete, continue auth with client side.
+ ctx.writeAndFlush(Commands.newAuthChallenge(authMethod,
brokerData, clientProtocolVersion));
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Authentication in progress client by method
{}.",
+ remoteAddress, authMethod);
+ log.debug("[{}] connect state change to : [{}]",
remoteAddress, State.Connecting.name());
+ }
}
- return State.Connecting;
}
public void refreshAuthenticationCredentials() {
@@ -804,6 +802,8 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
return;
}
+ state = State.Connecting;
+
try {
byte[] authData = connect.hasAuthData() ? connect.getAuthData() :
emptyArray;
AuthData clientData = AuthData.of(authData);
@@ -851,8 +851,6 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
log.debug("[{}] Authenticate role : {}", remoteAddress, role);
}
- state = doAuthentication(clientData, clientProtocolVersion,
clientVersion);
-
// This will fail the check if:
// 1. client is coming through a proxy
// 2. we require to validate the original credentials
@@ -894,6 +892,7 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
remoteAddress, originalPrincipal);
}
}
+ doAuthentication(clientData, false, clientProtocolVersion,
clientVersion);
} catch (Exception e) {
service.getPulsarStats().recordConnectionCreateFail();
logAuthException(remoteAddress, "connect", getPrincipal(),
Optional.empty(), e);
@@ -917,7 +916,7 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
try {
AuthData clientData =
AuthData.of(authResponse.getResponse().getAuthData());
- doAuthentication(clientData, authResponse.getProtocolVersion(),
+ doAuthentication(clientData, originalAuthState != null,
authResponse.getProtocolVersion(),
authResponse.hasClientVersion() ?
authResponse.getClientVersion() : EMPTY);
} catch (AuthenticationException e) {
service.getPulsarStats().recordConnectionCreateFail();