This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 153e4d4cc3b [fix][broker] Make authentication refresh threadsafe  
(#19506)
153e4d4cc3b is described below

commit 153e4d4cc3b56aaee224b0a68e0186c08125c975
Author: Michael Marshall <[email protected]>
AuthorDate: Tue Feb 14 03:09:55 2023 -0600

    [fix][broker] Make authentication refresh threadsafe  (#19506)
    
    Co-authored-by: Lari Hotari <[email protected]>
---
 .../broker/service/PulsarChannelInitializer.java   |  30 ------
 .../apache/pulsar/broker/service/ServerCnx.java    | 108 +++++++++++++--------
 .../broker/service/PersistentTopicE2ETest.java     |   2 -
 .../pulsar/broker/service/ServerCnxTest.java       |  14 ++-
 4 files changed, 75 insertions(+), 79 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java
index a96625af468..5308b3c981e 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java
@@ -18,9 +18,6 @@
  */
 package org.apache.pulsar.broker.service;
 
-import static org.apache.bookkeeper.util.SafeRunnable.safeRun;
-import com.github.benmanes.caffeine.cache.Cache;
-import com.github.benmanes.caffeine.cache.Caffeine;
 import com.google.common.annotations.VisibleForTesting;
 import io.netty.channel.ChannelInitializer;
 import io.netty.channel.socket.SocketChannel;
@@ -30,8 +27,6 @@ import io.netty.handler.flush.FlushConsolidationHandler;
 import io.netty.handler.ssl.SslContext;
 import io.netty.handler.ssl.SslHandler;
 import io.netty.handler.ssl.SslProvider;
-import java.net.SocketAddress;
-import java.util.concurrent.TimeUnit;
 import lombok.Builder;
 import lombok.Data;
 import lombok.extern.slf4j.Slf4j;
@@ -57,15 +52,6 @@ public class PulsarChannelInitializer extends 
ChannelInitializer<SocketChannel>
     private final ServiceConfiguration brokerConf;
     private NettySSLContextAutoRefreshBuilder 
nettySSLContextAutoRefreshBuilder;
 
-    // This cache is used to maintain a list of active connections to iterate 
over them
-    // We keep weak references to have the cache to be auto cleaned up when 
the connections
-    // objects are GCed.
-    @VisibleForTesting
-    protected final Cache<SocketAddress, ServerCnx> connections = 
Caffeine.newBuilder()
-            .weakKeys()
-            .weakValues()
-            .build();
-
     /**
      * @param pulsar
      *              An instance of {@link PulsarService}
@@ -114,10 +100,6 @@ public class PulsarChannelInitializer extends 
ChannelInitializer<SocketChannel>
             this.sslCtxRefresher = null;
         }
         this.brokerConf = pulsar.getConfiguration();
-
-        
pulsar.getExecutor().scheduleAtFixedRate(safeRun(this::refreshAuthenticationCredentials),
-                pulsar.getConfig().getAuthenticationRefreshCheckSeconds(),
-                pulsar.getConfig().getAuthenticationRefreshCheckSeconds(), 
TimeUnit.SECONDS);
     }
 
     @Override
@@ -148,18 +130,6 @@ public class PulsarChannelInitializer extends 
ChannelInitializer<SocketChannel>
         ch.pipeline().addLast("flowController", new FlowControlHandler());
         ServerCnx cnx = newServerCnx(pulsar, listenerName);
         ch.pipeline().addLast("handler", cnx);
-
-        connections.put(ch.remoteAddress(), cnx);
-    }
-
-    private void refreshAuthenticationCredentials() {
-        connections.asMap().values().forEach(cnx -> {
-            try {
-                cnx.refreshAuthenticationCredentials();
-            } catch (Throwable t) {
-                log.warn("[{}] Failed to refresh auth credentials", 
cnx.clientAddress());
-            }
-        });
     }
 
     @VisibleForTesting
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 4c81b46601e..71b31eeeeda 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -39,6 +39,7 @@ import io.netty.handler.codec.haproxy.HAProxyMessage;
 import io.netty.handler.ssl.SslHandler;
 import io.netty.util.concurrent.FastThreadLocal;
 import io.netty.util.concurrent.Promise;
+import io.netty.util.concurrent.ScheduledFuture;
 import io.prometheus.client.Gauge;
 import java.io.IOException;
 import java.net.InetSocketAddress;
@@ -199,6 +200,7 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
     // Keep temporarily in order to verify after verifying proxy's authData
     private AuthData originalAuthDataCopy;
     private boolean pendingAuthChallengeResponse = false;
+    private ScheduledFuture<?> authRefreshTask;
 
     // Max number of pending requests per connections. If multiple producers 
are sharing the same connection the flow
     // control done by a single producer might not be enough to prevent write 
spikes on the broker.
@@ -332,6 +334,9 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
         }
 
         cnxsPerThread.get().remove(this);
+        if (authRefreshTask != null) {
+            authRefreshTask.cancel(false);
+        }
 
         // Connection is gone, close the producers immediately
         producers.forEach((__, producerFuture) -> {
@@ -665,15 +670,18 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
 
     // complete the connect and sent newConnected command
     private void completeConnect(int clientProtoVersion, String clientVersion) 
{
-        if (service.isAuthenticationEnabled() && 
service.isAuthorizationEnabled()) {
-            if (!service.getAuthorizationService()
+        if (service.isAuthenticationEnabled()) {
+            if (service.isAuthorizationEnabled()) {
+                if (!service.getAuthorizationService()
                     .isValidOriginalPrincipal(authRole, originalPrincipal, 
remoteAddress)) {
-                state = State.Failed;
-                service.getPulsarStats().recordConnectionCreateFail();
-                final ByteBuf msg = Commands.newError(-1, 
ServerError.AuthorizationError, "Invalid roles.");
-                NettyChannelUtil.writeAndFlushWithClosePromise(ctx, msg);
-                return;
+                    state = State.Failed;
+                    service.getPulsarStats().recordConnectionCreateFail();
+                    final ByteBuf msg = Commands.newError(-1, 
ServerError.AuthorizationError, "Invalid roles.");
+                    NettyChannelUtil.writeAndFlushWithClosePromise(ctx, msg);
+                    return;
+                }
             }
+            maybeScheduleAuthenticationCredentialsRefresh();
         }
         writeAndFlush(Commands.newConnected(clientProtoVersion, 
maxMessageSize, enableSubscriptionPatternEvaluation));
         state = State.Connected;
@@ -772,7 +780,7 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
                     log.debug("[{}] Authentication in progress client by 
method {}.", remoteAddress, authMethod);
                 }
             }
-        } catch (Exception e) {
+        } catch (Exception | AssertionError e) {
             authenticationFailed(e);
         }
     }
@@ -799,7 +807,7 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
                                         remoteAddress, originalPrincipal);
                             }
                             completeConnect(clientProtoVersion, clientVersion);
-                        } catch (Exception e) {
+                        } catch (Exception | AssertionError e) {
                             authenticationFailed(e);
                         }
                     }
@@ -821,61 +829,75 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
         NettyChannelUtil.writeAndFlushWithClosePromise(ctx, msg);
     }
 
-    public void refreshAuthenticationCredentials() {
-        AuthenticationState authState = this.originalAuthState != null ? 
originalAuthState : this.authState;
-
+    /**
+     * Method to initialize the {@link #authRefreshTask} task.
+     */
+    private void maybeScheduleAuthenticationCredentialsRefresh() {
+        assert ctx.executor().inEventLoop();
+        assert authRefreshTask == null;
         if (authState == null) {
             // Authentication is disabled or there's no local state to refresh
             return;
-        } else if (getState() != State.Connected || !isActive) {
-            // Connection is either still being established or already closed.
+        }
+        authRefreshTask = 
ctx.executor().scheduleAtFixedRate(this::refreshAuthenticationCredentials,
+                
service.getPulsar().getConfig().getAuthenticationRefreshCheckSeconds(),
+                
service.getPulsar().getConfig().getAuthenticationRefreshCheckSeconds(),
+                TimeUnit.SECONDS);
+    }
+
+    private void refreshAuthenticationCredentials() {
+        assert ctx.executor().inEventLoop();
+        AuthenticationState authState = this.originalAuthState != null ? 
originalAuthState : this.authState;
+        if (getState() == State.Failed) {
+            // Happens when an exception is thrown that causes this connection 
to close.
             return;
         } else if (!authState.isExpired()) {
             // Credentials are still valid. Nothing to do at this point
             return;
         } else if (originalPrincipal != null && originalAuthState == null) {
+            // This case is only checked when the authState is expired because 
we've reached a point where
+            // authentication needs to be refreshed, but the protocol does not 
support it unless the proxy forwards
+            // the originalAuthData.
             log.info(
                     "[{}] Cannot revalidate user credential when using proxy 
and"
                             + " not forwarding the credentials. Closing 
connection",
                     remoteAddress);
+            ctx.close();
             return;
         }
 
-        ctx.executor().execute(SafeRun.safeRun(() -> {
-            log.info("[{}] Refreshing authentication credentials for 
originalPrincipal {} and authRole {}",
-                    remoteAddress, originalPrincipal, this.authRole);
-
-            if (!supportsAuthenticationRefresh()) {
-                log.warn("[{}] Closing connection because client doesn't 
support auth credentials refresh",
-                        remoteAddress);
-                ctx.close();
-                return;
-            }
+        if (!supportsAuthenticationRefresh()) {
+            log.warn("[{}] Closing connection because client doesn't support 
auth credentials refresh",
+                    remoteAddress);
+            ctx.close();
+            return;
+        }
 
-            if (pendingAuthChallengeResponse) {
-                log.warn("[{}] Closing connection after timeout on refreshing 
auth credentials",
-                        remoteAddress);
-                ctx.close();
-                return;
-            }
+        if (pendingAuthChallengeResponse) {
+            log.warn("[{}] Closing connection after timeout on refreshing auth 
credentials",
+                    remoteAddress);
+            ctx.close();
+            return;
+        }
 
-            try {
-                AuthData brokerData = authState.refreshAuthentication();
+        log.info("[{}] Refreshing authentication credentials for 
originalPrincipal {} and authRole {}",
+                remoteAddress, originalPrincipal, this.authRole);
+        try {
+            AuthData brokerData = authState.refreshAuthentication();
 
-                writeAndFlush(Commands.newAuthChallenge(authMethod, brokerData,
-                        getRemoteEndpointProtocolVersion()));
-                if (log.isDebugEnabled()) {
-                    log.debug("[{}] Sent auth challenge to client to refresh 
credentials with method: {}.",
+            writeAndFlush(Commands.newAuthChallenge(authMethod, brokerData,
+                    getRemoteEndpointProtocolVersion()));
+            if (log.isDebugEnabled()) {
+                log.debug("[{}] Sent auth challenge to client to refresh 
credentials with method: {}.",
                         remoteAddress, authMethod);
-                }
+            }
 
-                pendingAuthChallengeResponse = true;
+            pendingAuthChallengeResponse = true;
 
-            } catch (AuthenticationException e) {
-                log.warn("[{}] Failed to refresh authentication: {}", 
remoteAddress, e);
-                ctx.close();
-            }
-        }));
+        } catch (AuthenticationException e) {
+            log.warn("[{}] Failed to refresh authentication: {}", 
remoteAddress, e);
+            ctx.close();
+        }
     }
 
     private static final byte[] emptyArray = new byte[0];
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
index 7506053b28d..63f80911ae6 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
@@ -1948,8 +1948,6 @@ public class PersistentTopicE2ETest extends 
BrokerTestBase {
             ch.pipeline().remove("handler");
             PersistentTopicE2ETest.ServerCnxForTest serverCnxForTest = new 
PersistentTopicE2ETest.ServerCnxForTest(this.pulsar, 
this.opts.getListenerName());
             ch.pipeline().addAfter("flowController", "testHandler", 
serverCnxForTest);
-            //override parent
-            connections.put(ch.remoteAddress(), serverCnxForTest);
         }
 
     }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
index afa1cd4e252..7e7c2110533 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
@@ -487,10 +487,13 @@ public class ServerCnxTest {
         
when(brokerService.getAuthenticationService()).thenReturn(authenticationService);
         
when(authenticationService.getAuthenticationProvider(authMethodName)).thenReturn(authenticationProvider);
         svcConfig.setAuthenticationEnabled(true);
+        svcConfig.setAuthenticationRefreshCheckSeconds(30);
 
         resetChannel();
         assertTrue(channel.isActive());
         assertEquals(serverCnx.getState(), State.Start);
+        // Don't want the keep alive task affecting which messages are handled
+        serverCnx.cancelKeepAliveTask();
 
         ByteBuf clientCommand = Commands.newConnect(authMethodName, 
"pass.client", "");
         channel.writeInbound(clientCommand);
@@ -503,7 +506,7 @@ public class ServerCnxTest {
 
         // Trigger the ServerCnx to check if authentication is expired (it is 
because of our special implementation)
         // and then force channel to run the task
-        serverCnx.refreshAuthenticationCredentials();
+        channel.advanceTimeBy(30, TimeUnit.SECONDS);
         channel.runPendingTasks();
         Object responseAuthChallenge1 = getResponse();
         assertTrue(responseAuthChallenge1 instanceof CommandAuthChallenge);
@@ -513,7 +516,7 @@ public class ServerCnxTest {
         channel.writeInbound(authResponse1);
 
         // Trigger the ServerCnx to check if authentication is expired again
-        serverCnx.refreshAuthenticationCredentials();
+        channel.advanceTimeBy(30, TimeUnit.SECONDS);
         assertTrue(channel.hasPendingTasks(), "This test assumes there are 
pending tasks to run.");
         channel.runPendingTasks();
         Object responseAuthChallenge2 = getResponse();
@@ -539,10 +542,13 @@ public class ServerCnxTest {
         svcConfig.setAuthenticationEnabled(true);
         svcConfig.setAuthenticateOriginalAuthData(true);
         svcConfig.setProxyRoles(Collections.singleton("pass.proxy"));
+        svcConfig.setAuthenticationRefreshCheckSeconds(30);
 
         resetChannel();
         assertTrue(channel.isActive());
         assertEquals(serverCnx.getState(), State.Start);
+        // Don't want the keep alive task affecting which messages are handled
+        serverCnx.cancelKeepAliveTask();
 
         ByteBuf clientCommand = Commands.newConnect(authMethodName, 
"pass.proxy", 1, null,
                 null, "pass.client", "pass.client", authMethodName);
@@ -559,7 +565,7 @@ public class ServerCnxTest {
 
         // Trigger the ServerCnx to check if authentication is expired (it is 
because of our special implementation)
         // and then force channel to run the task
-        serverCnx.refreshAuthenticationCredentials();
+        channel.advanceTimeBy(30, TimeUnit.SECONDS);
         assertTrue(channel.hasPendingTasks(), "This test assumes there are 
pending tasks to run.");
         channel.runPendingTasks();
         Object responseAuthChallenge1 = getResponse();
@@ -570,7 +576,7 @@ public class ServerCnxTest {
         channel.writeInbound(authResponse1);
 
         // Trigger the ServerCnx to check if authentication is expired again
-        serverCnx.refreshAuthenticationCredentials();
+        channel.advanceTimeBy(30, TimeUnit.SECONDS);
         channel.runPendingTasks();
         Object responseAuthChallenge2 = getResponse();
         assertTrue(responseAuthChallenge2 instanceof CommandAuthChallenge);

Reply via email to