This is an automated email from the ASF dual-hosted git repository.

mmarshall pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 804969091ca [improve][broker] ServerCnx: go to Failed state when auth 
fails (#19312)
804969091ca is described below

commit 804969091ca0d38132f9938fd6ffc519699eaeb6
Author: Michael Marshall <[email protected]>
AuthorDate: Tue Jan 24 21:34:51 2023 -0600

    [improve][broker] ServerCnx: go to Failed state when auth fails (#19312)
    
    PIP: #12105
    
    ### Motivation
    
    When authentication fails in the `ServerCnx`, the state is left in `Start` 
if the primary `authData` fails authentication and in `Connecting` or 
`Connected` if the `originalAuthData` authentication fails. To prevent any kind 
of unexpected behavior, we should go to `Failed` state.
    
    Note that the tests verify the current behavior where a failed 
`originalAuthData` results first in a `Connected` command from the broker and 
then an `Error` command. I documented that I think this is sub optimal here 
https://github.com/apache/pulsar/issues/19311.
    
    ### Modifications
    
    * Update `ServerCnx` state to `Failed` when there is an authentication 
exception during `handleConnect` and during `handleAuthResponse`.
    * Update `handleAuthResponse` reply to `"Unable to authenticate"` instead 
of the `AuthenticationState` exception.
    
    ### Verifying this change
    
    A new test is added. The added test covers the change made in 
https://github.com/apache/pulsar/pull/19295 where we updated `ServerCnx` so 
that we call `AuthState#authenticate` instead of relying on the implementation 
detail that the initialization calls `authenticate`. That PR should have added 
a test.
    
    ### Does this pull request potentially affect one of the following parts:
    
    This is not a breaking change.
    
    ### Documentation
    
    - [x] `doc-not-needed`
    
    ### Matching PR in forked repository
    
    PR in forked repository: https://github.com/michaeljmarshall/pulsar/pull/18
---
 .../apache/pulsar/broker/service/ServerCnx.java    | 16 +++--
 .../auth/MockMultiStageAuthenticationProvider.java | 45 +++++++++++++
 .../auth/MockMultiStageAuthenticationState.java    | 76 +++++++++++++++++++++
 .../pulsar/broker/service/ServerCnxTest.java       | 78 +++++++++++++++++++++-
 .../broker/service/utils/ClientChannelHelper.java  |  6 ++
 5 files changed, 213 insertions(+), 8 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 935934f6bdf..2a83252c309 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -969,10 +969,10 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
             }
         } catch (Exception e) {
             service.getPulsarStats().recordConnectionCreateFail();
+            state = State.Failed;
             logAuthException(remoteAddress, "connect", getPrincipal(), 
Optional.empty(), e);
-            String msg = "Unable to authenticate";
-            writeAndFlush(Commands.newError(-1, 
ServerError.AuthenticationError, msg));
-            close();
+            ByteBuf msg = Commands.newError(-1, 
ServerError.AuthenticationError, "Unable to authenticate");
+            NettyChannelUtil.writeAndFlushWithClosePromise(ctx, msg);
         }
     }
 
@@ -994,15 +994,17 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
                     authResponse.hasClientVersion() ? 
authResponse.getClientVersion() : EMPTY);
         } catch (AuthenticationException e) {
             service.getPulsarStats().recordConnectionCreateFail();
+            state = State.Failed;
             log.warn("[{}] Authentication failed: {} ", remoteAddress, 
e.getMessage());
-            writeAndFlush(Commands.newError(-1, 
ServerError.AuthenticationError, e.getMessage()));
-            close();
+            ByteBuf msg = Commands.newError(-1, 
ServerError.AuthenticationError, "Unable to authenticate");
+            NettyChannelUtil.writeAndFlushWithClosePromise(ctx, msg);
         } catch (Exception e) {
             service.getPulsarStats().recordConnectionCreateFail();
+            state = State.Failed;
             String msg = "Unable to handleAuthResponse";
             log.warn("[{}] {} ", remoteAddress, msg, e);
-            writeAndFlush(Commands.newError(-1, ServerError.UnknownError, 
msg));
-            close();
+            ByteBuf command = Commands.newError(-1, ServerError.UnknownError, 
msg);
+            NettyChannelUtil.writeAndFlushWithClosePromise(ctx, command);
         }
     }
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockMultiStageAuthenticationProvider.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockMultiStageAuthenticationProvider.java
new file mode 100644
index 00000000000..c62ff537bb0
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockMultiStageAuthenticationProvider.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.auth;
+
+import javax.naming.AuthenticationException;
+import javax.net.ssl.SSLSession;
+import java.net.SocketAddress;
+import org.apache.pulsar.broker.authentication.AuthenticationState;
+import org.apache.pulsar.common.api.AuthData;
+
+/**
+ * Class that provides the same authentication semantics as the {@link 
MockAuthenticationProvider} except
+ * that this one initializes the {@link MockMultiStageAuthenticationState} 
class to support testing
+ * multistage authentication.
+ */
+public class MockMultiStageAuthenticationProvider extends 
MockAuthenticationProvider {
+
+    @Override
+    public String getAuthMethodName() {
+        return "multi-stage";
+    }
+
+    @Override
+    public AuthenticationState newAuthState(AuthData authData,
+                                            SocketAddress remoteAddress,
+                                            SSLSession sslSession) throws 
AuthenticationException {
+        return new MockMultiStageAuthenticationState(this);
+    }
+}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockMultiStageAuthenticationState.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockMultiStageAuthenticationState.java
new file mode 100644
index 00000000000..1ea56748867
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockMultiStageAuthenticationState.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.auth;
+
+import org.apache.pulsar.broker.authentication.AuthenticationDataCommand;
+import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
+import org.apache.pulsar.broker.authentication.AuthenticationState;
+import org.apache.pulsar.common.api.AuthData;
+
+import javax.naming.AuthenticationException;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+/**
+ * Performs multistage authentication by extending the paradigm created in 
{@link MockAuthenticationProvider}.
+ */
+public class MockMultiStageAuthenticationState implements AuthenticationState {
+
+    private final MockMultiStageAuthenticationProvider provider;
+    private String authRole = null;
+
+    MockMultiStageAuthenticationState(MockMultiStageAuthenticationProvider 
provider) {
+        this.provider = provider;
+    }
+
+    @Override
+    public String getAuthRole() throws AuthenticationException {
+        if (authRole == null) {
+            throw new AuthenticationException("Must authenticate first");
+        }
+        return null;
+    }
+
+    @Override
+    public AuthData authenticate(AuthData authData) throws 
AuthenticationException {
+        String data = new String(authData.getBytes(), UTF_8);
+        String[] parts = data.split("\\.");
+        if (parts.length == 2) {
+            if ("challenge".equals(parts[0])) {
+                return AuthData.of("challenged".getBytes());
+            } else {
+                AuthenticationDataCommand command = new 
AuthenticationDataCommand(data);
+                authRole = provider.authenticate(command);
+                // Auth successful, no more auth required
+                return null;
+            }
+        }
+        throw new AuthenticationException("Failed to authenticate");
+    }
+
+    @Override
+    public AuthenticationDataSource getAuthDataSource() {
+        return null;
+    }
+
+    @Override
+    public boolean isComplete() {
+        return authRole != null;
+    }
+}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
index 1a98822340f..665ac51446b 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
@@ -80,6 +80,8 @@ import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.TransactionMetadataStoreService;
+import org.apache.pulsar.broker.auth.MockAuthenticationProvider;
+import org.apache.pulsar.broker.auth.MockMultiStageAuthenticationProvider;
 import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
 import org.apache.pulsar.broker.authentication.AuthenticationProvider;
 import org.apache.pulsar.broker.authentication.AuthenticationService;
@@ -103,6 +105,7 @@ import org.apache.pulsar.common.api.proto.BaseCommand.Type;
 import org.apache.pulsar.common.api.proto.CommandAck.AckType;
 import org.apache.pulsar.common.api.proto.CommandAddPartitionToTxnResponse;
 import org.apache.pulsar.common.api.proto.CommandAddSubscriptionToTxnResponse;
+import org.apache.pulsar.common.api.proto.CommandAuthChallenge;
 import org.apache.pulsar.common.api.proto.CommandAuthResponse;
 import org.apache.pulsar.common.api.proto.CommandConnected;
 import org.apache.pulsar.common.api.proto.CommandEndTxnOnPartitionResponse;
@@ -450,11 +453,84 @@ public class ServerCnxTest {
         ByteBuf clientCommand = Commands.newConnect("none", "", null);
         channel.writeInbound(clientCommand);
 
-        assertEquals(serverCnx.getState(), State.Start);
+        assertEquals(serverCnx.getState(), State.Failed);
         assertTrue(getResponse() instanceof CommandError);
         channel.finish();
     }
 
+    @Test(timeOut = 30000)
+    public void testConnectCommandWithFailingOriginalAuthData() throws 
Exception {
+        AuthenticationService authenticationService = 
mock(AuthenticationService.class);
+        AuthenticationProvider authenticationProvider = new 
MockAuthenticationProvider();
+        String authMethodName = authenticationProvider.getAuthMethodName();
+
+        
when(brokerService.getAuthenticationService()).thenReturn(authenticationService);
+        
when(authenticationService.getAuthenticationProvider(authMethodName)).thenReturn(authenticationProvider);
+        svcConfig.setAuthenticationEnabled(true);
+        svcConfig.setAuthenticateOriginalAuthData(true);
+        svcConfig.setProxyRoles(Collections.singleton("proxy"));
+
+        resetChannel();
+        assertTrue(channel.isActive());
+        assertEquals(serverCnx.getState(), State.Start);
+
+        ByteBuf clientCommand = Commands.newConnect(authMethodName, 
"pass.proxy", 1,null,
+                null, "client", "fail", authMethodName);
+        channel.writeInbound(clientCommand);
+
+        // We currently expect two responses because the originalAuthData is 
verified after sending
+        // a successful response to the proxy. Because this is a synchronous 
operation, there is currently
+        // no risk. It would be better to fix this. See 
https://github.com/apache/pulsar/issues/19311.
+        Object response1 = getResponse();
+        assertTrue(response1 instanceof CommandConnected);
+        Object response2 = getResponse();
+        assertTrue(response2 instanceof CommandError);
+        assertEquals(((CommandError) response2).getMessage(), "Unable to 
authenticate");
+        assertEquals(serverCnx.getState(), State.Failed);
+        assertFalse(serverCnx.isActive());
+        channel.finish();
+    }
+
+    @Test(timeOut = 30000)
+    public void testAuthResponseWithFailingAuthData() throws Exception {
+        AuthenticationService authenticationService = 
mock(AuthenticationService.class);
+        AuthenticationProvider authenticationProvider = new 
MockMultiStageAuthenticationProvider();
+        String authMethodName = authenticationProvider.getAuthMethodName();
+
+        
when(brokerService.getAuthenticationService()).thenReturn(authenticationService);
+        
when(authenticationService.getAuthenticationProvider(authMethodName)).thenReturn(authenticationProvider);
+        svcConfig.setAuthenticationEnabled(true);
+
+        resetChannel();
+        assertTrue(channel.isActive());
+        assertEquals(serverCnx.getState(), State.Start);
+
+        // Trigger connect command to result in AuthChallenge
+        ByteBuf clientCommand = Commands.newConnect(authMethodName, 
"challenge.client", "1");
+        channel.writeInbound(clientCommand);
+
+        Object challenge1 = getResponse();
+        assertTrue(challenge1 instanceof CommandAuthChallenge);
+
+        // Trigger another AuthChallenge to verify that code path continues to 
challenge
+        ByteBuf authResponse1 = Commands.newAuthResponse(authMethodName, 
AuthData.of("challenge.client".getBytes()), 1, "1");
+        channel.writeInbound(authResponse1);
+
+        Object challenge2 = getResponse();
+        assertTrue(challenge2 instanceof CommandAuthChallenge);
+
+        // Trigger failure
+        ByteBuf authResponse2 = Commands.newAuthResponse(authMethodName, 
AuthData.of("fail.client".getBytes()), 1, "1");
+        channel.writeInbound(authResponse2);
+
+        Object response3 = getResponse();
+        assertTrue(response3 instanceof CommandError);
+        assertEquals(((CommandError) response3).getMessage(), "Unable to 
authenticate");
+        assertEquals(serverCnx.getState(), State.Failed);
+        assertFalse(serverCnx.isActive());
+        channel.finish();
+    }
+
     @Test(timeOut = 30000)
     public void testProducerCommand() throws Exception {
         resetChannel();
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/utils/ClientChannelHelper.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/utils/ClientChannelHelper.java
index 2dc56282a79..bf0dd3aa9c1 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/utils/ClientChannelHelper.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/utils/ClientChannelHelper.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.broker.service.utils;
 import java.util.Queue;
 import org.apache.pulsar.common.api.proto.CommandAddPartitionToTxnResponse;
 import org.apache.pulsar.common.api.proto.CommandAddSubscriptionToTxnResponse;
+import org.apache.pulsar.common.api.proto.CommandAuthChallenge;
 import org.apache.pulsar.common.api.proto.CommandEndTxnOnPartitionResponse;
 import org.apache.pulsar.common.api.proto.CommandEndTxnOnSubscriptionResponse;
 import org.apache.pulsar.common.api.proto.CommandEndTxnResponse;
@@ -83,6 +84,11 @@ public class ClientChannelHelper {
             queue.offer(new CommandConnected().copyFrom(connected));
         }
 
+        @Override
+        protected void handleAuthChallenge(CommandAuthChallenge challenge) {
+            queue.offer(new CommandAuthChallenge().copyFrom(challenge));
+        }
+
         @Override
         protected void handleSubscribe(CommandSubscribe subscribe) {
             queue.offer(new CommandSubscribe().copyFrom(subscribe));

Reply via email to