This is an automated email from the ASF dual-hosted git repository.
mmarshall pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.8 by this push:
new 1d7b8f1b320 [fix][broker] ServerCnx broken after recent cherry-picks
(#19521)
1d7b8f1b320 is described below
commit 1d7b8f1b3203f37d9c4c10e68f70f5176726cc4e
Author: Michael Marshall <[email protected]>
AuthorDate: Tue Feb 14 19:35:24 2023 -0600
[fix][broker] ServerCnx broken after recent cherry-picks (#19521)
I broke all release branches when I cherry picked
2847dd19f6e8a546f4d45bf51eb2b72aae0869ce to them. This change takes some of the
underlying logic from #19409, without taking the async logic.
* Make changes to `ServerCnx` to make tests pass
Tests are currently failing, so passing tests will show that this solution
is correct.
- [x] `doc-not-needed`
(cherry picked from commit 8246da282ca38e891bdf8a4e9abc47f640b22384)
(cherry picked from commit 15e4198e19ebb2045777c696ac39f969b2a57f66)
(cherry picked from commit 6132b46efae60d87979966aca075b80ab7e2a87d)
---
.../apache/pulsar/broker/service/ServerCnx.java | 50 +++++++++++-----------
1 file changed, 25 insertions(+), 25 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 9a55a728717..d44febb002e 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -604,15 +604,6 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
// complete the connect and sent newConnected command
private void completeConnect(int clientProtoVersion, String clientVersion)
{
- if (service.isAuthenticationEnabled() &&
service.isAuthorizationEnabled()) {
- if (!isValidRoleAndOriginalPrincipal()) {
- state = State.Failed;
- service.getPulsarStats().recordConnectionCreateFail();
- final ByteBuf msg = Commands.newError(-1,
ServerError.AuthorizationError, "Invalid roles.");
-
ctx.writeAndFlush(msg).addListener(ChannelFutureListener.CLOSE);
- return;
- }
- }
ctx.writeAndFlush(Commands.newConnected(clientProtoVersion,
maxMessageSize));
state = State.Connected;
service.getPulsarStats().recordConnectionCreateSuccess();
@@ -626,7 +617,8 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
}
// According to auth result, send newConnected or newAuthChallenge command.
- private State doAuthentication(AuthData clientData,
+ private void doAuthentication(AuthData clientData,
+ boolean useOriginalAuthState,
int clientProtocolVersion,
String clientVersion) throws Exception {
@@ -634,8 +626,7 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
// in presence of a proxy and if the proxy is forwarding the
credentials).
// In this case, the re-validation needs to be done against the
original client
// credentials.
- boolean useOriginalAuthState = (originalAuthState != null);
- AuthenticationState authState = useOriginalAuthState ?
originalAuthState : this.authState;
+ AuthenticationState authState = useOriginalAuthState ?
originalAuthState : this.authState;
String authRole = useOriginalAuthState ? originalPrincipal :
this.authRole;
AuthData brokerData = authState.authenticate(clientData);
@@ -668,6 +659,15 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
if (state != State.Connected) {
// First time authentication is done
+ if (service.isAuthenticationEnabled() &&
service.isAuthorizationEnabled()) {
+ if (!isValidRoleAndOriginalPrincipal()) {
+ state = State.Failed;
+ service.getPulsarStats().recordConnectionCreateFail();
+ final ByteBuf msg = Commands.newError(-1,
ServerError.AuthorizationError, "Invalid roles.");
+
ctx.writeAndFlush(msg).addListener(ChannelFutureListener.CLOSE);
+ return;
+ }
+ }
completeConnect(clientProtocolVersion, clientVersion);
} else {
// If the connection was already ready, it means we're doing a
refresh
@@ -681,18 +681,16 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
}
}
}
+ } else {
- return State.Connected;
- }
-
- // auth not complete, continue auth with client side.
- ctx.writeAndFlush(Commands.newAuthChallenge(authMethod, brokerData,
clientProtocolVersion));
- if (log.isDebugEnabled()) {
- log.debug("[{}] Authentication in progress client by method {}.",
- remoteAddress, authMethod);
- log.debug("[{}] connect state change to : [{}]", remoteAddress,
State.Connecting.name());
+ // auth not complete, continue auth with client side.
+ ctx.writeAndFlush(Commands.newAuthChallenge(authMethod,
brokerData, clientProtocolVersion));
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Authentication in progress client by method
{}.",
+ remoteAddress, authMethod);
+ log.debug("[{}] connect state change to : [{}]",
remoteAddress, State.Connecting.name());
+ }
}
- return State.Connecting;
}
public void refreshAuthenticationCredentials() {
@@ -779,6 +777,8 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
return;
}
+ state = State.Connecting;
+
try {
byte[] authData = connect.hasAuthData() ? connect.getAuthData() :
emptyArray;
AuthData clientData = AuthData.of(authData);
@@ -822,8 +822,6 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
authState != null ? authState.getAuthRole() : null);
}
- state = doAuthentication(clientData, clientProtocolVersion,
clientVersion);
-
// This will fail the check if:
// 1. client is coming through a proxy
// 2. we require to validate the original credentials
@@ -864,6 +862,7 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
remoteAddress, originalPrincipal);
}
}
+ doAuthentication(clientData, false, clientProtocolVersion,
clientVersion);
} catch (Exception e) {
service.getPulsarStats().recordConnectionCreateFail();
logAuthException(remoteAddress, "connect", getPrincipal(),
Optional.empty(), e);
@@ -887,7 +886,8 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
try {
AuthData clientData =
AuthData.of(authResponse.getResponse().getAuthData());
- doAuthentication(clientData, authResponse.getProtocolVersion(),
authResponse.getClientVersion());
+ doAuthentication(clientData, originalAuthState != null,
authResponse.getProtocolVersion(),
+ authResponse.getClientVersion());
} catch (AuthenticationException e) {
service.getPulsarStats().recordConnectionCreateFail();
log.warn("[{}] Authentication failed: {} ", remoteAddress,
e.getMessage());