This is an automated email from the ASF dual-hosted git repository.
mmarshall pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 804969091ca [improve][broker] ServerCnx: go to Failed state when auth
fails (#19312)
804969091ca is described below
commit 804969091ca0d38132f9938fd6ffc519699eaeb6
Author: Michael Marshall <[email protected]>
AuthorDate: Tue Jan 24 21:34:51 2023 -0600
[improve][broker] ServerCnx: go to Failed state when auth fails (#19312)
PIP: #12105
### Motivation
When authentication fails in the `ServerCnx`, the state is left in `Start`
if the primary `authData` fails authentication and in `Connecting` or
`Connected` if the `originalAuthData` authentication fails. To prevent any kind
of unexpected behavior, we should go to `Failed` state.
Note that the tests verify the current behavior where a failed
`originalAuthData` results first in a `Connected` command from the broker and
then an `Error` command. I documented why I think this is suboptimal in
https://github.com/apache/pulsar/issues/19311.
### Modifications
* Update `ServerCnx` state to `Failed` when there is an authentication
exception during `handleConnect` and during `handleAuthResponse`.
* Update `handleAuthResponse` reply to `"Unable to authenticate"` instead
of the message from the `AuthenticationState` exception.
### Verifying this change
A new test is added. The added test covers the change made in
https://github.com/apache/pulsar/pull/19295 where we updated `ServerCnx` so
that we call `AuthState#authenticate` instead of relying on the implementation
detail that the initialization calls `authenticate`. That PR should have added
a test.
### Does this pull request potentially affect one of the following parts:
This is not a breaking change.
### Documentation
- [x] `doc-not-needed`
### Matching PR in forked repository
PR in forked repository: https://github.com/michaeljmarshall/pulsar/pull/18
---
.../apache/pulsar/broker/service/ServerCnx.java | 16 +++--
.../auth/MockMultiStageAuthenticationProvider.java | 45 +++++++++++++
.../auth/MockMultiStageAuthenticationState.java | 76 +++++++++++++++++++++
.../pulsar/broker/service/ServerCnxTest.java | 78 +++++++++++++++++++++-
.../broker/service/utils/ClientChannelHelper.java | 6 ++
5 files changed, 213 insertions(+), 8 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 935934f6bdf..2a83252c309 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -969,10 +969,10 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
}
} catch (Exception e) {
service.getPulsarStats().recordConnectionCreateFail();
+ state = State.Failed;
logAuthException(remoteAddress, "connect", getPrincipal(),
Optional.empty(), e);
- String msg = "Unable to authenticate";
- writeAndFlush(Commands.newError(-1,
ServerError.AuthenticationError, msg));
- close();
+ ByteBuf msg = Commands.newError(-1,
ServerError.AuthenticationError, "Unable to authenticate");
+ NettyChannelUtil.writeAndFlushWithClosePromise(ctx, msg);
}
}
@@ -994,15 +994,17 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
authResponse.hasClientVersion() ?
authResponse.getClientVersion() : EMPTY);
} catch (AuthenticationException e) {
service.getPulsarStats().recordConnectionCreateFail();
+ state = State.Failed;
log.warn("[{}] Authentication failed: {} ", remoteAddress,
e.getMessage());
- writeAndFlush(Commands.newError(-1,
ServerError.AuthenticationError, e.getMessage()));
- close();
+ ByteBuf msg = Commands.newError(-1,
ServerError.AuthenticationError, "Unable to authenticate");
+ NettyChannelUtil.writeAndFlushWithClosePromise(ctx, msg);
} catch (Exception e) {
service.getPulsarStats().recordConnectionCreateFail();
+ state = State.Failed;
String msg = "Unable to handleAuthResponse";
log.warn("[{}] {} ", remoteAddress, msg, e);
- writeAndFlush(Commands.newError(-1, ServerError.UnknownError,
msg));
- close();
+ ByteBuf command = Commands.newError(-1, ServerError.UnknownError,
msg);
+ NettyChannelUtil.writeAndFlushWithClosePromise(ctx, command);
}
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockMultiStageAuthenticationProvider.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockMultiStageAuthenticationProvider.java
new file mode 100644
index 00000000000..c62ff537bb0
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockMultiStageAuthenticationProvider.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.auth;
+
+import javax.naming.AuthenticationException;
+import javax.net.ssl.SSLSession;
+import java.net.SocketAddress;
+import org.apache.pulsar.broker.authentication.AuthenticationState;
+import org.apache.pulsar.common.api.AuthData;
+
+/**
+ * Class that provides the same authentication semantics as the {@link
MockAuthenticationProvider} except
+ * that this one initializes the {@link MockMultiStageAuthenticationState}
class to support testing
+ * multistage authentication.
+ */
+public class MockMultiStageAuthenticationProvider extends
MockAuthenticationProvider {
+
+ @Override
+ public String getAuthMethodName() {
+ return "multi-stage";
+ }
+
+ @Override
+ public AuthenticationState newAuthState(AuthData authData,
+ SocketAddress remoteAddress,
+ SSLSession sslSession) throws
AuthenticationException {
+ return new MockMultiStageAuthenticationState(this);
+ }
+}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockMultiStageAuthenticationState.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockMultiStageAuthenticationState.java
new file mode 100644
index 00000000000..1ea56748867
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockMultiStageAuthenticationState.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.auth;
+
+import org.apache.pulsar.broker.authentication.AuthenticationDataCommand;
+import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
+import org.apache.pulsar.broker.authentication.AuthenticationState;
+import org.apache.pulsar.common.api.AuthData;
+
+import javax.naming.AuthenticationException;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+/**
+ * Performs multistage authentication by extending the paradigm created in
{@link MockAuthenticationProvider}.
+ */
+public class MockMultiStageAuthenticationState implements AuthenticationState {
+
+ private final MockMultiStageAuthenticationProvider provider;
+ private String authRole = null;
+
+ MockMultiStageAuthenticationState(MockMultiStageAuthenticationProvider
provider) {
+ this.provider = provider;
+ }
+
+ @Override
+ public String getAuthRole() throws AuthenticationException {
+ if (authRole == null) { // authRole is only assigned by a successful authenticate() call
+ throw new AuthenticationException("Must authenticate first");
+ }
+ return authRole; // return the authenticated role rather than null
+ }
+
+ @Override
+ public AuthData authenticate(AuthData authData) throws
AuthenticationException {
+ String data = new String(authData.getBytes(), UTF_8);
+ String[] parts = data.split("\\.");
+ if (parts.length == 2) {
+ if ("challenge".equals(parts[0])) {
+ return AuthData.of("challenged".getBytes());
+ } else {
+ AuthenticationDataCommand command = new
AuthenticationDataCommand(data);
+ authRole = provider.authenticate(command);
+ // Auth successful, no more auth required
+ return null;
+ }
+ }
+ throw new AuthenticationException("Failed to authenticate");
+ }
+
+ @Override
+ public AuthenticationDataSource getAuthDataSource() {
+ return null;
+ }
+
+ @Override
+ public boolean isComplete() {
+ return authRole != null;
+ }
+}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
index 1a98822340f..665ac51446b 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
@@ -80,6 +80,8 @@ import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.TransactionMetadataStoreService;
+import org.apache.pulsar.broker.auth.MockAuthenticationProvider;
+import org.apache.pulsar.broker.auth.MockMultiStageAuthenticationProvider;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.authentication.AuthenticationProvider;
import org.apache.pulsar.broker.authentication.AuthenticationService;
@@ -103,6 +105,7 @@ import org.apache.pulsar.common.api.proto.BaseCommand.Type;
import org.apache.pulsar.common.api.proto.CommandAck.AckType;
import org.apache.pulsar.common.api.proto.CommandAddPartitionToTxnResponse;
import org.apache.pulsar.common.api.proto.CommandAddSubscriptionToTxnResponse;
+import org.apache.pulsar.common.api.proto.CommandAuthChallenge;
import org.apache.pulsar.common.api.proto.CommandAuthResponse;
import org.apache.pulsar.common.api.proto.CommandConnected;
import org.apache.pulsar.common.api.proto.CommandEndTxnOnPartitionResponse;
@@ -450,11 +453,84 @@ public class ServerCnxTest {
ByteBuf clientCommand = Commands.newConnect("none", "", null);
channel.writeInbound(clientCommand);
- assertEquals(serverCnx.getState(), State.Start);
+ assertEquals(serverCnx.getState(), State.Failed);
assertTrue(getResponse() instanceof CommandError);
channel.finish();
}
+ @Test(timeOut = 30000)
+ public void testConnectCommandWithFailingOriginalAuthData() throws
Exception {
+ AuthenticationService authenticationService =
mock(AuthenticationService.class);
+ AuthenticationProvider authenticationProvider = new
MockAuthenticationProvider();
+ String authMethodName = authenticationProvider.getAuthMethodName();
+
+
when(brokerService.getAuthenticationService()).thenReturn(authenticationService);
+
when(authenticationService.getAuthenticationProvider(authMethodName)).thenReturn(authenticationProvider);
+ svcConfig.setAuthenticationEnabled(true);
+ svcConfig.setAuthenticateOriginalAuthData(true);
+ svcConfig.setProxyRoles(Collections.singleton("proxy"));
+
+ resetChannel();
+ assertTrue(channel.isActive());
+ assertEquals(serverCnx.getState(), State.Start);
+
+ ByteBuf clientCommand = Commands.newConnect(authMethodName,
"pass.proxy", 1,null,
+ null, "client", "fail", authMethodName);
+ channel.writeInbound(clientCommand);
+
+ // We currently expect two responses because the originalAuthData is
verified after sending
+ // a successful response to the proxy. Because this is a synchronous
operation, there is currently
+ // no risk. It would be better to fix this. See
https://github.com/apache/pulsar/issues/19311.
+ Object response1 = getResponse();
+ assertTrue(response1 instanceof CommandConnected);
+ Object response2 = getResponse();
+ assertTrue(response2 instanceof CommandError);
+ assertEquals(((CommandError) response2).getMessage(), "Unable to
authenticate");
+ assertEquals(serverCnx.getState(), State.Failed);
+ assertFalse(serverCnx.isActive());
+ channel.finish();
+ }
+
+ @Test(timeOut = 30000)
+ public void testAuthResponseWithFailingAuthData() throws Exception {
+ AuthenticationService authenticationService =
mock(AuthenticationService.class);
+ AuthenticationProvider authenticationProvider = new
MockMultiStageAuthenticationProvider();
+ String authMethodName = authenticationProvider.getAuthMethodName();
+
+
when(brokerService.getAuthenticationService()).thenReturn(authenticationService);
+
when(authenticationService.getAuthenticationProvider(authMethodName)).thenReturn(authenticationProvider);
+ svcConfig.setAuthenticationEnabled(true);
+
+ resetChannel();
+ assertTrue(channel.isActive());
+ assertEquals(serverCnx.getState(), State.Start);
+
+ // Trigger connect command to result in AuthChallenge
+ ByteBuf clientCommand = Commands.newConnect(authMethodName,
"challenge.client", "1");
+ channel.writeInbound(clientCommand);
+
+ Object challenge1 = getResponse();
+ assertTrue(challenge1 instanceof CommandAuthChallenge);
+
+ // Trigger another AuthChallenge to verify that code path continues to
challenge
+ ByteBuf authResponse1 = Commands.newAuthResponse(authMethodName,
AuthData.of("challenge.client".getBytes()), 1, "1");
+ channel.writeInbound(authResponse1);
+
+ Object challenge2 = getResponse();
+ assertTrue(challenge2 instanceof CommandAuthChallenge);
+
+ // Trigger failure
+ ByteBuf authResponse2 = Commands.newAuthResponse(authMethodName,
AuthData.of("fail.client".getBytes()), 1, "1");
+ channel.writeInbound(authResponse2);
+
+ Object response3 = getResponse();
+ assertTrue(response3 instanceof CommandError);
+ assertEquals(((CommandError) response3).getMessage(), "Unable to
authenticate");
+ assertEquals(serverCnx.getState(), State.Failed);
+ assertFalse(serverCnx.isActive());
+ channel.finish();
+ }
+
@Test(timeOut = 30000)
public void testProducerCommand() throws Exception {
resetChannel();
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/utils/ClientChannelHelper.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/utils/ClientChannelHelper.java
index 2dc56282a79..bf0dd3aa9c1 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/utils/ClientChannelHelper.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/utils/ClientChannelHelper.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.broker.service.utils;
import java.util.Queue;
import org.apache.pulsar.common.api.proto.CommandAddPartitionToTxnResponse;
import org.apache.pulsar.common.api.proto.CommandAddSubscriptionToTxnResponse;
+import org.apache.pulsar.common.api.proto.CommandAuthChallenge;
import org.apache.pulsar.common.api.proto.CommandEndTxnOnPartitionResponse;
import org.apache.pulsar.common.api.proto.CommandEndTxnOnSubscriptionResponse;
import org.apache.pulsar.common.api.proto.CommandEndTxnResponse;
@@ -83,6 +84,11 @@ public class ClientChannelHelper {
queue.offer(new CommandConnected().copyFrom(connected));
}
+ @Override
+ protected void handleAuthChallenge(CommandAuthChallenge challenge) {
+ queue.offer(new CommandAuthChallenge().copyFrom(challenge));
+ }
+
@Override
protected void handleSubscribe(CommandSubscribe subscribe) {
queue.offer(new CommandSubscribe().copyFrom(subscribe));