This is an automated email from the ASF dual-hosted git repository.

vavrtom pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-broker-j.git


The following commit(s) were added to refs/heads/main by this push:
     new f7a0667db4 QPID-8664: [Broker-J] Guava removal (4/10)
f7a0667db4 is described below

commit f7a0667db47d4b40e024414b2464e33b475f7d61
Author: dakirily <[email protected]>
AuthorDate: Thu Mar 27 09:47:07 2025 +0100

    QPID-8664: [Broker-J] Guava removal (4/10)
    
    This commit replaces guava ListenableFuture with CompletableFuture in 
broker TaskExecutor and adds Caffeine cache dependency
---
 .../dependency-verification/DEPENDENCIES_REFERENCE |  12 +
 broker-core/pom.xml                                |   5 +
 .../server/configuration/updater/TaskExecutor.java |   5 +-
 .../configuration/updater/TaskExecutorImpl.java    | 303 ++++++++-------------
 .../server/model/preferences/UserPreferences.java  |  17 +-
 .../model/preferences/UserPreferencesImpl.java     |  16 +-
 .../updater/CurrentThreadTaskExecutor.java         |   8 +-
 .../updater/TaskExecutorWithPrincipalTest.java     | 163 +++++++++++
 .../testmodels/singleton/PreferencesTest.java      |   6 +-
 .../servlet/rest/RestUserPreferenceHandler.java    |   7 +-
 pom.xml                                            |   6 +
 11 files changed, 334 insertions(+), 214 deletions(-)

diff --git 
a/apache-qpid-broker-j/src/main/assembly/dependency-verification/DEPENDENCIES_REFERENCE
 
b/apache-qpid-broker-j/src/main/assembly/dependency-verification/DEPENDENCIES_REFERENCE
index d6113bbd21..0d9dffeae3 100644
--- 
a/apache-qpid-broker-j/src/main/assembly/dependency-verification/DEPENDENCIES_REFERENCE
+++ 
b/apache-qpid-broker-j/src/main/assembly/dependency-verification/DEPENDENCIES_REFERENCE
@@ -26,6 +26,9 @@ Apache Qpid Broker-J Bundles
 
 From: 'an unknown organization'
 
+  - Caffeine cache (https://github.com/ben-manes/caffeine) 
com.github.ben-manes.caffeine:caffeine:jar:3.1.8
+    License: Apache License, Version 2.0  
(https://www.apache.org/licenses/LICENSE-2.0.txt)
+
   - Guava InternalFutureFailureAccess and InternalFutures 
(https://github.com/google/guava/failureaccess) 
com.google.guava:failureaccess:bundle:1.0.2
     License: The Apache Software License, Version 2.0  
(http://www.apache.org/licenses/LICENSE-2.0.txt)
 
@@ -59,6 +62,9 @@ From: 'an unknown organization'
   - Bouncy Castle ASN.1 Extension and Utility APIs 
(https://www.bouncycastle.org/download/bouncy-castle-java/) 
org.bouncycastle:bcutil-jdk18on:jar:1.80
     License: Bouncy Castle Licence  (https://www.bouncycastle.org/licence.html)
 
+  - Checker Qual (https://checkerframework.org/) 
org.checkerframework:checker-qual:jar:3.37.0
+    License: The MIT License  (http://opensource.org/licenses/MIT)
+
   - dgrid (https://www.webjars.org) org.webjars.bower:dgrid:jar:1.3.3
     License: BSD 3-Clause  (https://spdx.org/licenses/BSD 3-Clause#licenseText)
 
@@ -90,6 +96,12 @@ From: 'FasterXML' (http://fasterxml.com/)
     License: The Apache Software License, Version 2.0  
(https://www.apache.org/licenses/LICENSE-2.0.txt)
 
 
+From: 'Google LLC' (http://www.google.com)
+
+  - error-prone annotations (https://errorprone.info/error_prone_annotations) 
com.google.errorprone:error_prone_annotations:jar:2.21.1
+    License: Apache 2.0  (http://www.apache.org/licenses/LICENSE-2.0.txt)
+
+
 From: 'Mort Bay Consulting' (http://www.mortbay.com)
 
   - Jetty :: Jakarta Servlet API and Schemas for JPMS and OSGi 
(https://eclipse.org/jetty/jetty-jakarta-servlet-api) 
org.eclipse.jetty.toolchain:jetty-jakarta-servlet-api:jar:5.0.2
diff --git a/broker-core/pom.xml b/broker-core/pom.xml
index 81bc3c3432..6cfde71084 100644
--- a/broker-core/pom.xml
+++ b/broker-core/pom.xml
@@ -60,6 +60,11 @@
       <artifactId>guava</artifactId>
     </dependency>
 
+    <dependency>
+      <groupId>com.github.ben-manes.caffeine</groupId>
+      <artifactId>caffeine</artifactId>
+    </dependency>
+
     <dependency>
       <groupId>org.bouncycastle</groupId>
       <artifactId>bcprov-jdk18on</artifactId>
diff --git 
a/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutor.java
 
b/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutor.java
index c38c0334c4..37340a0197 100644
--- 
a/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutor.java
+++ 
b/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutor.java
@@ -22,10 +22,9 @@ package org.apache.qpid.server.configuration.updater;
 
 import java.security.Principal;
 import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 
-import com.google.common.util.concurrent.ListenableFuture;
-
 public interface TaskExecutor extends Executor
 {
     interface Factory
@@ -49,7 +48,7 @@ public interface TaskExecutor extends Executor
 
     <T, E extends Exception> T run(Task<T, E> task) throws 
CancellationException, E;
 
-    <T, E extends Exception> ListenableFuture<T> submit(Task<T, E> task) 
throws CancellationException, E;
+    <T, E extends Exception> CompletableFuture<T> submit(Task<T, E> task) 
throws CancellationException, E;
 
     Factory getFactory();
 }
diff --git 
a/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutorImpl.java
 
b/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutorImpl.java
index f2b83f201a..fb23e9d3aa 100644
--- 
a/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutorImpl.java
+++ 
b/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutorImpl.java
@@ -18,21 +18,23 @@
  * under the License.
  *
  */
+
 package org.apache.qpid.server.configuration.updater;
 
 import java.security.AccessController;
 import java.security.Principal;
 import java.security.PrivilegedAction;
+import java.time.Duration;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
-import java.util.concurrent.Callable;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -40,10 +42,8 @@ import java.util.concurrent.atomic.AtomicReference;
 
 import javax.security.auth.Subject;
 
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -54,20 +54,25 @@ public class TaskExecutorImpl implements TaskExecutor
 {
     private static final String TASK_EXECUTION_THREAD_NAME = "Broker-Config";
     private static final Logger LOGGER = 
LoggerFactory.getLogger(TaskExecutorImpl.class);
-    private final PrincipalAccessor _principalAccessor;
+    private static final Cache<Set<Principal>, Subject> SUBJECT_CACHE = 
Caffeine.newBuilder()
+            .expireAfterAccess(Duration.ofMinutes(5))
+            .maximumSize(1000)
+            .build();
 
-    private volatile Thread _taskThread;
+    private final PrincipalAccessor _principalAccessor;
     private final AtomicBoolean _running = new AtomicBoolean();
-    private volatile ListeningExecutorService _executor;
     private final ImmediateIfSameThreadExecutor _wrappedExecutor = new 
ImmediateIfSameThreadExecutor();
     private final String _name;
 
+    private volatile Thread _taskThread;
+    private volatile ExecutorService _executor;
+
     public TaskExecutorImpl()
     {
         this(TASK_EXECUTION_THREAD_NAME, null);
     }
 
-    public TaskExecutorImpl(final String name, PrincipalAccessor 
principalAccessor)
+    public TaskExecutorImpl(final String name, final PrincipalAccessor 
principalAccessor)
     {
         _name = name;
         _principalAccessor = principalAccessor;
@@ -85,22 +90,14 @@ public class TaskExecutorImpl implements TaskExecutor
         if (_running.compareAndSet(false, true))
         {
             LOGGER.debug("Starting task executor {}", _name);
-            final java.util.concurrent.BlockingQueue<Runnable> workQueue = new 
LinkedBlockingQueue<>();
-            final java.util.concurrent.ThreadFactory factory = r ->
+            final BlockingQueue<Runnable> workQueue = new 
LinkedBlockingQueue<>();
+            final ThreadFactory factory =
+                    
QpidByteBuffer.createQpidByteBufferTrackingThreadFactory(runnable ->
             {
-                _taskThread =
-                        new TaskThread(
-                                r,
-                                _name,
-                                TaskExecutorImpl.this);
+                _taskThread = new TaskThread(runnable, _name, 
TaskExecutorImpl.this);
                 return _taskThread;
-            };
-            _executor = MoreExecutors.listeningDecorator(new 
ThreadPoolExecutor(1,
-                                                                               
 1,
-                                                                               
 0L,
-                                                                               
 TimeUnit.MILLISECONDS,
-                                                                               
 workQueue,
-                                                                               
 QpidByteBuffer.createQpidByteBufferTrackingThreadFactory(factory)));
+            });
+            _executor = new ThreadPoolExecutor(1, 1, 0L, 
TimeUnit.MILLISECONDS, workQueue, factory);
             LOGGER.debug("Task executor is started");
         }
     }
@@ -108,25 +105,19 @@ public class TaskExecutorImpl implements TaskExecutor
     @Override
     public void stopImmediately()
     {
-        if (_running.compareAndSet(true,false))
+        if (_running.compareAndSet(true, false))
         {
-            ExecutorService executor = _executor;
+            final ExecutorService executor = _executor;
             if (executor != null)
             {
                 LOGGER.debug("Stopping task executor {} immediately", _name);
-                List<Runnable> cancelledTasks = executor.shutdownNow();
-                for (Runnable runnable : cancelledTasks)
-                {
-                    if (runnable instanceof RunnableFuture<?>)
-                    {
-                        ((RunnableFuture<?>) runnable).cancel(true);
-                    }
-                }
-
+                final List<Runnable> cancelledTasks = executor.shutdownNow();
+                cancelledTasks.forEach(runnable -> ((RunnableWrapper) 
runnable).cancel());
                 _executor = null;
                 _taskThread = null;
-                LOGGER.debug("Task executor was stopped immediately. Number of 
unfinished tasks: " + cancelledTasks.size());
+                LOGGER.debug("Task executor was stopped immediately. Number of 
unfinished tasks: {}", cancelledTasks.size());
             }
+            SUBJECT_CACHE.invalidateAll();
         }
     }
 
@@ -135,7 +126,7 @@ public class TaskExecutorImpl implements TaskExecutor
     {
         if (_running.compareAndSet(true, false))
         {
-            ExecutorService executor = _executor;
+            final ExecutorService executor = _executor;
             if (executor != null)
             {
                 LOGGER.debug("Stopping task executor {}", _name);
@@ -144,16 +135,17 @@ public class TaskExecutorImpl implements TaskExecutor
                 _taskThread = null;
                 LOGGER.debug("Task executor is stopped");
             }
+            SUBJECT_CACHE.invalidateAll();
         }
     }
 
     @Override
-    public <T, E extends Exception> ListenableFuture<T> submit(Task<T, E> 
userTask) throws E
+    public <T, E extends Exception> CompletableFuture<T> submit(final Task<T, 
E> userTask) throws E
     {
-        return submitWrappedTask(new TaskLoggingWrapper<>(userTask));
+        return submitWrappedTask(userTask);
     }
 
-    private <T, E extends Exception> ListenableFuture<T> 
submitWrappedTask(TaskLoggingWrapper<T, E> task) throws E
+    private <T, E extends Exception> CompletableFuture<T> 
submitWrappedTask(final Task<T, E> task) throws E
     {
         checkState(task);
         if (isTaskExecutorThread())
@@ -162,8 +154,8 @@ public class TaskExecutorImpl implements TaskExecutor
             {
                 LOGGER.trace("Running {} immediately", task);
             }
-            T result = task.execute();
-            return Futures.immediateFuture(result);
+            final T result = task.execute();
+            return CompletableFuture.completedFuture(result);
         }
         else
         {
@@ -172,22 +164,26 @@ public class TaskExecutorImpl implements TaskExecutor
                 LOGGER.trace("Submitting {} to executor {}", task, _name);
             }
 
-            return _executor.submit(new CallableWrapper<>(task));
+            final CompletableFuture<T> future = new CompletableFuture<>();
+            _executor.execute(new RunnableWrapper<>(task, future));
+            return future;
         }
     }
 
     @Override
     public void execute(final Runnable command)
     {
-        LOGGER.trace("Running runnable {} through executor interface", 
command);
+        if (LOGGER.isTraceEnabled())
+        {
+            LOGGER.trace("Running runnable {} through executor interface", 
command);
+        }
         _wrappedExecutor.execute(command);
     }
 
     @Override
-    public <T, E extends Exception> T run(Task<T, E> userTask) throws 
CancellationException, E
+    public <T, E extends Exception> T run(final Task<T, E> userTask) throws 
CancellationException, E
     {
-        TaskLoggingWrapper<T, E> task = new TaskLoggingWrapper<>(userTask);
-        return FutureHelper.<T, E>await(submitWrappedTask(task));
+        return FutureHelper.<T, E>await(submitWrappedTask(userTask));
     }
 
     private boolean isTaskExecutorThread()
@@ -195,7 +191,7 @@ public class TaskExecutorImpl implements TaskExecutor
         return Thread.currentThread() == _taskThread;
     }
 
-    private void checkState(Task<?, ?> task)
+    private void checkState(final Task<?, ?> task)
     {
         if (!_running.get())
         {
@@ -204,213 +200,154 @@ public class TaskExecutorImpl implements TaskExecutor
         }
     }
 
-    private Subject getContextSubject()
+    private Subject getCachedSubject()
     {
-        Subject contextSubject = 
Subject.getSubject(AccessController.getContext());
-        if (contextSubject != null && _principalAccessor != null)
+        final Subject contextSubject = 
Subject.getSubject(AccessController.getContext());
+
+        if (contextSubject == null)
         {
-            Principal additionalPrincipal = _principalAccessor.getPrincipal();
-            Set<Principal> principals = contextSubject.getPrincipals();
-            if (additionalPrincipal != null && 
!principals.contains(additionalPrincipal))
-            {
-                Set<Principal> extendedPrincipals = new HashSet<>(principals);
-                extendedPrincipals.add(additionalPrincipal);
-                contextSubject = new Subject(contextSubject.isReadOnly(),
-                        extendedPrincipals,
-                        contextSubject.getPublicCredentials(),
-                        contextSubject.getPrivateCredentials());
-            }
+            return null;
         }
-        return contextSubject;
-    }
 
-    private static class TaskLoggingWrapper<T, E extends Exception> implements 
Task<T, E>
-    {
-        private final Task<T,E> _task;
-
-        public TaskLoggingWrapper(Task<T, E> task)
+        if (_principalAccessor == null || 
contextSubject.getPrincipals().contains(_principalAccessor.getPrincipal()))
         {
-            _task = task;
+            return contextSubject;
         }
 
-        @Override
-        public T execute() throws E
-        {
-            if (LOGGER.isDebugEnabled())
-            {
-                LOGGER.debug("Performing {}", this);
-            }
+        final Set<Principal> principals = new 
HashSet<>(contextSubject.getPrincipals());
+        principals.add(_principalAccessor.getPrincipal());
 
-            boolean success = false;
-            T result = null;
-            try
-            {
-                result = _task.execute();
-                success = true;
-            }
-            finally
-            {
-                if (LOGGER.isDebugEnabled())
-                {
-                    if (success)
-                    {
-                        LOGGER.debug("{} performed successfully with result: 
{}", this, result);
-                    } else
-                    {
-                        LOGGER.debug("{} failed to perform successfully", 
this);
-                    }
-                }
-            }
-            return result;
-        }
+        return SUBJECT_CACHE.get(principals, key -> 
createSubjectWithPrincipals(key, contextSubject));
+    }
 
-        @Override
-        public String getObject()
-        {
-            return _task.getObject();
-        }
+    Subject createSubjectWithPrincipals(final Set<Principal> principals, 
Subject subject)
+    {
+        return new Subject(subject.isReadOnly(), principals, 
subject.getPublicCredentials(), subject.getPrivateCredentials());
+    }
 
-        @Override
-        public String getAction()
-        {
-            return _task.getAction();
-        }
+    private class ImmediateWrapper<T, E extends Exception> extends 
RunnableWrapper<T, E>
+    {
+        final Runnable _runnable;
+        final Subject _subject;
 
-        @Override
-        public String getArguments()
+        boolean _cancelled;
+
+        ImmediateWrapper(final Runnable runnable, final Subject subject)
         {
-            return _task.getArguments();
+            super(null, null);
+            _runnable = runnable;
+            _subject = subject;
         }
 
         @Override
-        public String toString()
+        public void run()
         {
-            String arguments =  getArguments();
-            if (arguments == null)
+            if (_cancelled)
             {
-                return String.format("Task['%s' on '%s']", getAction(), 
getObject());
+                return;
             }
-            return String.format("Task['%s' on '%s' with arguments '%s']", 
getAction(), getObject(), arguments);
+            Subject.doAs(_subject, (PrivilegedAction<Void>) () ->
+            {
+                _runnable.run();
+                return null;
+            });
+        }
+
+        void cancel()
+        {
+            _cancelled = true;
         }
     }
 
-    private class CallableWrapper<T, E extends Exception> implements 
Callable<T>
+    private class RunnableWrapper<T, E extends Exception> implements Runnable
     {
         private final Task<T, E> _userTask;
+        private final CompletableFuture<T> _future;
         private final Subject _contextSubject;
         private final AtomicReference<Throwable> _throwable;
 
-        public CallableWrapper(Task<T, E> userWork)
+        public RunnableWrapper(final Task<T, E> userWork, final 
CompletableFuture<T> future)
         {
             _userTask = userWork;
-            _contextSubject = getContextSubject();
+            _future = future;
+            _contextSubject = getCachedSubject();
             _throwable = new AtomicReference<>();
         }
 
-        @Override
-        public T call() throws Exception
+        public void run()
         {
-            T result =  Subject.doAs(_contextSubject, (PrivilegedAction<T>) () 
->
+            if (_future.isCancelled() || _future.isCompletedExceptionally())
+            {
+                return;
+            }
+            if (LOGGER.isDebugEnabled())
+            {
+                LOGGER.debug("Performing {}", _userTask);
+            }
+            final T result = Subject.doAs(_contextSubject, 
(PrivilegedAction<T>) () ->
             {
                 try
                 {
                     return _userTask.execute();
                 }
-                catch(Throwable t)
+                catch (Throwable t)
                 {
                     _throwable.set(t);
+                    _future.obtrudeException(t);
                 }
                 return null;
             });
-            Throwable t = _throwable.get();
-            if (t != null)
+
+            final Throwable throwable = _throwable.get();
+            if (throwable != null)
             {
-                if (t instanceof RuntimeException)
+                if (LOGGER.isDebugEnabled())
+                {
+                    LOGGER.debug("{} failed to perform successfully", 
_userTask);
+                }
+                if (throwable instanceof RuntimeException)
                 {
-                    throw (RuntimeException) t;
+                    throw (RuntimeException) throwable;
                 }
-                else if (t instanceof Error)
+                else if (throwable instanceof Error)
                 {
-                    throw (Error) t;
+                    throw (Error) throwable;
                 }
                 else
                 {
-                    throw (Exception) t;
+                    throw new RuntimeException(throwable);
                 }
             }
-            return result;
-        }
-    }
 
-    private static class ImmediateFuture<T> implements Future<T>
-    {
-        private final T _result;
-
-        public ImmediateFuture(T result)
-        {
-            super();
-            _result = result;
+            LOGGER.debug("{} performed successfully with result: {}", 
_userTask, result);
+            _future.complete(result);
         }
 
-        @Override
-        public boolean cancel(boolean mayInterruptIfRunning)
-        {
-            return false;
-        }
-
-        @Override
-        public boolean isCancelled()
+        void cancel()
         {
-            return false;
-        }
-
-        @Override
-        public boolean isDone()
-        {
-            return true;
-        }
-
-        @Override
-        public T get()
-        {
-            return _result;
-        }
-
-        @Override
-        public T get(long timeout, TimeUnit unit)
-        {
-            return get();
+            _future.completeExceptionally(new CancellationException("Task was 
cancelled"));
         }
     }
 
     private class ImmediateIfSameThreadExecutor implements Executor
     {
-
         @Override
         public void execute(final Runnable command)
         {
-            if(isTaskExecutorThread()
-               || (_executor == null && (Thread.currentThread() instanceof 
TaskThread
-                   && ((TaskThread)Thread.currentThread()).getTaskExecutor() 
== TaskExecutorImpl.this)))
+            if (isTaskExecutorThread() || (_executor == null && 
(Thread.currentThread() instanceof TaskThread &&
+                    ((TaskThread)Thread.currentThread()).getTaskExecutor() == 
TaskExecutorImpl.this)))
             {
                 command.run();
             }
             else
             {
-                final Subject subject = getContextSubject();
-                _executor.execute(() -> Subject.doAs(subject, 
(PrivilegedAction<Void>) () ->
-                {
-                    command.run();
-                    return null;
-                }));
+                _executor.execute(new ImmediateWrapper<>(command, 
getCachedSubject()));
             }
-
         }
     }
 
     private static class TaskThread extends Thread
     {
-
         private final TaskExecutorImpl _taskExecutor;
 
         public TaskThread(final Runnable r, final String name, final 
TaskExecutorImpl taskExecutor)
@@ -437,7 +374,7 @@ public class TaskExecutorImpl implements TaskExecutor
             }
 
             @Override
-            public TaskExecutor newInstance(final String name, 
PrincipalAccessor principalAccessor)
+            public TaskExecutor newInstance(final String name, final 
PrincipalAccessor principalAccessor)
             {
                 return new TaskExecutorImpl(name, principalAccessor);
             }
diff --git 
a/broker-core/src/main/java/org/apache/qpid/server/model/preferences/UserPreferences.java
 
b/broker-core/src/main/java/org/apache/qpid/server/model/preferences/UserPreferences.java
index 5fc991d4c3..c79c5ea7b3 100644
--- 
a/broker-core/src/main/java/org/apache/qpid/server/model/preferences/UserPreferences.java
+++ 
b/broker-core/src/main/java/org/apache/qpid/server/model/preferences/UserPreferences.java
@@ -23,22 +23,21 @@ package org.apache.qpid.server.model.preferences;
 import java.util.Collection;
 import java.util.Set;
 import java.util.UUID;
-
-import com.google.common.util.concurrent.ListenableFuture;
+import java.util.concurrent.CompletableFuture;
 
 public interface UserPreferences
 {
-    ListenableFuture<Void> updateOrAppend(Collection<Preference> preferences);
+    CompletableFuture<Void> updateOrAppend(Collection<Preference> preferences);
 
-    ListenableFuture<Set<Preference>> getPreferences();
+    CompletableFuture<Set<Preference>> getPreferences();
 
-    ListenableFuture<Void> replace(Collection<Preference> preferences);
+    CompletableFuture<Void> replace(Collection<Preference> preferences);
 
-    ListenableFuture<Void> replaceByType(String type, Collection<Preference> 
preferences);
+    CompletableFuture<Void> replaceByType(String type, Collection<Preference> 
preferences);
 
-    ListenableFuture<Void> replaceByTypeAndName(String type, String name, 
Preference preference);
+    CompletableFuture<Void> replaceByTypeAndName(String type, String name, 
Preference preference);
 
-    ListenableFuture<Set<Preference>> getVisiblePreferences();
+    CompletableFuture<Set<Preference>> getVisiblePreferences();
 
-    ListenableFuture<Void> delete(String type, String name, UUID id);
+    CompletableFuture<Void> delete(String type, String name, UUID id);
 }
diff --git 
a/broker-core/src/main/java/org/apache/qpid/server/model/preferences/UserPreferencesImpl.java
 
b/broker-core/src/main/java/org/apache/qpid/server/model/preferences/UserPreferencesImpl.java
index f24f8606c1..6f3e387e1f 100644
--- 
a/broker-core/src/main/java/org/apache/qpid/server/model/preferences/UserPreferencesImpl.java
+++ 
b/broker-core/src/main/java/org/apache/qpid/server/model/preferences/UserPreferencesImpl.java
@@ -41,11 +41,11 @@ import java.util.Objects;
 import java.util.Set;
 import java.util.TreeSet;
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
 
 import javax.security.auth.Subject;
 
 import com.google.common.collect.Ordering;
-import com.google.common.util.concurrent.ListenableFuture;
 
 import org.apache.qpid.server.configuration.updater.Task;
 import org.apache.qpid.server.configuration.updater.TaskExecutor;
@@ -107,7 +107,7 @@ public class UserPreferencesImpl implements UserPreferences
     }
 
     @Override
-    public ListenableFuture<Void> updateOrAppend(final Collection<Preference> 
preferences)
+    public CompletableFuture<Void> updateOrAppend(final Collection<Preference> 
preferences)
     {
         return _executor.submit(new PreferencesTask<>("updateOrAppend", 
preferences)
         {
@@ -145,7 +145,7 @@ public class UserPreferencesImpl implements UserPreferences
     }
 
     @Override
-    public ListenableFuture<Set<Preference>> getPreferences()
+    public CompletableFuture<Set<Preference>> getPreferences()
     {
         return _executor.submit(new PreferencesTask<>("getPreferences")
         {
@@ -173,7 +173,7 @@ public class UserPreferencesImpl implements UserPreferences
     }
 
     @Override
-    public ListenableFuture<Void> replace(final Collection<Preference> 
preferences)
+    public CompletableFuture<Void> replace(final Collection<Preference> 
preferences)
     {
         return _executor.submit(new PreferencesTask<>("replace", preferences)
         {
@@ -187,7 +187,7 @@ public class UserPreferencesImpl implements UserPreferences
     }
 
     @Override
-    public ListenableFuture<Void> replaceByType(final String type, final 
Collection<Preference> preferences)
+    public CompletableFuture<Void> replaceByType(final String type, final 
Collection<Preference> preferences)
     {
         return _executor.submit(new PreferencesTask<>("replaceByType", type, 
preferences)
         {
@@ -237,7 +237,7 @@ public class UserPreferencesImpl implements UserPreferences
     }
 
     @Override
-    public ListenableFuture<Void> replaceByTypeAndName(final String type,
+    public CompletableFuture<Void> replaceByTypeAndName(final String type,
                                                        final String name,
                                                        final Preference 
newPreference)
     {
@@ -295,7 +295,7 @@ public class UserPreferencesImpl implements UserPreferences
     }
 
     @Override
-    public ListenableFuture<Void> delete(final String type, final String name, 
final UUID id)
+    public CompletableFuture<Void> delete(final String type, final String 
name, final UUID id)
     {
         return _executor.submit(new PreferencesTask<>("delete", type, name, id)
         {
@@ -345,7 +345,7 @@ public class UserPreferencesImpl implements UserPreferences
     }
 
     @Override
-    public ListenableFuture<Set<Preference>> getVisiblePreferences()
+    public CompletableFuture<Set<Preference>> getVisiblePreferences()
     {
         return _executor.submit(new PreferencesTask<>("getVisiblePreferences")
         {
diff --git 
a/broker-core/src/test/java/org/apache/qpid/server/configuration/updater/CurrentThreadTaskExecutor.java
 
b/broker-core/src/test/java/org/apache/qpid/server/configuration/updater/CurrentThreadTaskExecutor.java
index 937f10cd1a..a7dd6d94a8 100644
--- 
a/broker-core/src/test/java/org/apache/qpid/server/configuration/updater/CurrentThreadTaskExecutor.java
+++ 
b/broker-core/src/test/java/org/apache/qpid/server/configuration/updater/CurrentThreadTaskExecutor.java
@@ -21,11 +21,9 @@
 package org.apache.qpid.server.configuration.updater;
 
 import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicReference;
 
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-
 public class CurrentThreadTaskExecutor implements TaskExecutor
 {
     private final AtomicReference<Thread> _thread = new AtomicReference<>();
@@ -77,11 +75,11 @@ public class CurrentThreadTaskExecutor implements 
TaskExecutor
     }
 
     @Override
-    public <T, E extends Exception> ListenableFuture<T> submit(Task<T, E> 
task) throws CancellationException, E
+    public <T, E extends Exception> CompletableFuture<T> submit(Task<T, E> 
task) throws CancellationException, E
     {
         checkThread();
         final T result = task.execute();
-        return Futures.immediateFuture(result);
+        return CompletableFuture.completedFuture(result);
     }
 
     public static TaskExecutor newStartedInstance()
diff --git 
a/broker-core/src/test/java/org/apache/qpid/server/configuration/updater/TaskExecutorWithPrincipalTest.java
 
b/broker-core/src/test/java/org/apache/qpid/server/configuration/updater/TaskExecutorWithPrincipalTest.java
new file mode 100644
index 0000000000..036942fc5b
--- /dev/null
+++ 
b/broker-core/src/test/java/org/apache/qpid/server/configuration/updater/TaskExecutorWithPrincipalTest.java
@@ -0,0 +1,163 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.server.configuration.updater;
+
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+
+import javax.security.auth.Subject;
+
+import org.junit.jupiter.api.AfterEach;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import org.apache.qpid.server.BrokerPrincipal;
+import org.apache.qpid.server.model.AuthenticationProvider;
+import org.apache.qpid.server.model.Broker;
+import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.server.security.auth.UsernamePrincipal;
+import org.apache.qpid.server.virtualhost.VirtualHostPrincipal;
+import org.apache.qpid.test.utils.UnitTestBase;
+
+class TaskExecutorWithPrincipalTest extends UnitTestBase
+{
+    final VirtualHost _virtualHost = mock(VirtualHost.class);
+    final VirtualHostPrincipal _virtualHostPrincipal = new 
VirtualHostPrincipal(_virtualHost);
+
+    private TaskExecutorImpl _executor;
+
+    @BeforeEach
+    void setUp()
+    {
+        _executor = new TaskExecutorImpl("Broker-Config", () -> 
_virtualHostPrincipal);
+        _executor.start();
+    }
+
+    @AfterEach
+    void tearDown()
+    {
+        _executor.stopImmediately();
+    }
+
+    @Test
+    void emptySubject()
+    {
+        final Subject subject = new Subject();
+        final AtomicReference<Subject> taskSubject = new AtomicReference<>();
+
+        runTask(subject, taskSubject, _executor);
+
+        assertEquals(Set.of(_virtualHostPrincipal), 
taskSubject.get().getPrincipals(), "Unexpected security manager principal");
+    }
+
+    @Test
+    void subjectWithBrokerPrincipal()
+    {
+        final Broker<?> broker = mock(Broker.class);
+        final BrokerPrincipal brokerPrincipal = new BrokerPrincipal(broker);
+        final Subject subject = new Subject(true, Set.of(brokerPrincipal), 
Set.of(), Set.of());
+        final AtomicReference<Subject> taskSubject = new AtomicReference<>();
+
+        runTask(subject, taskSubject, _executor);
+
+        assertEquals(2, taskSubject.get().getPrincipals().size(), "Unexpected 
principals count");
+        
assertTrue(taskSubject.get().getPrincipals().contains(brokerPrincipal), 
"Expected to have broker principal");
+        
assertTrue(taskSubject.get().getPrincipals().contains(_virtualHostPrincipal), 
"Expected to have virtualhost principal");
+    }
+
+    @Test
+    void cachedSubjects()
+    {
+        final TaskExecutorImpl spy1 = spy(_executor);
+        final TaskExecutorImpl spy2 = spy(_executor);
+        final AuthenticationProvider authProvider = 
mock(AuthenticationProvider.class);
+        when(authProvider.getName()).thenReturn("authProvider");
+        when(authProvider.getType()).thenReturn("mock");
+        final UsernamePrincipal usernamePrincipal1 = new 
UsernamePrincipal("user1", authProvider);
+        final UsernamePrincipal usernamePrincipal2 = new 
UsernamePrincipal("user2", authProvider);
+        final Subject subject1 = new Subject(true, Set.of(usernamePrincipal1), 
Set.of(), Set.of());
+        final Subject subject2 = new Subject(true, Set.of(usernamePrincipal2), 
Set.of(), Set.of());
+        final AtomicReference<Subject> taskSubject = new AtomicReference<>();
+
+        // subject1 should be created
+        runTask(subject1, taskSubject, spy1);
+        verify(spy1, times(1)).createSubjectWithPrincipals(any(Set.class), 
any(Subject.class));
+
+        // repeated call should retrieve subject1 from cache
+        runTask(subject1, taskSubject, spy2);
+        verify(spy2, never()).createSubjectWithPrincipals(any(Set.class), 
any(Subject.class));
+
+        // subject2 should be created
+        runTask(subject2, taskSubject, spy1);
+        verify(spy1, times(2)).createSubjectWithPrincipals(any(Set.class), 
any(Subject.class));
+
+        // repeated call should retrieve subject2 from cache
+        runTask(subject2, taskSubject, spy2);
+        verify(spy2, never()).createSubjectWithPrincipals(any(Set.class), 
any(Subject.class));
+    }
+
+    private void runTask(final Subject subject, final AtomicReference<Subject> 
taskSubject, final TaskExecutorImpl executor)
+    {
+        Subject.doAs(subject, (PrivilegedAction<Object>) () ->
+        {
+            executor.run(new Task<Void, RuntimeException>()
+            {
+                @Override
+                public Void execute()
+                {
+                    
taskSubject.set(Subject.getSubject(AccessController.getContext()));
+                    return null;
+                }
+
+                @Override
+                public String getObject()
+                {
+                    return getTestName();
+                }
+
+                @Override
+                public String getAction()
+                {
+                    return "test";
+                }
+
+                @Override
+                public String getArguments()
+                {
+                    return null;
+                }
+            });
+            return null;
+        });
+    }
+}
diff --git 
a/broker-core/src/test/java/org/apache/qpid/server/model/testmodels/singleton/PreferencesTest.java
 
b/broker-core/src/test/java/org/apache/qpid/server/model/testmodels/singleton/PreferencesTest.java
index fb5733186e..b8d3111664 100644
--- 
a/broker-core/src/test/java/org/apache/qpid/server/model/testmodels/singleton/PreferencesTest.java
+++ 
b/broker-core/src/test/java/org/apache/qpid/server/model/testmodels/singleton/PreferencesTest.java
@@ -36,13 +36,12 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
 import javax.security.auth.Subject;
 
-import com.google.common.util.concurrent.ListenableFuture;
-
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -319,7 +318,8 @@ public class PreferencesTest extends UnitTestBase
         Subject user2Subject = 
TestPrincipalUtils.createTestSubject(TEST_USERNAME2, testGroupName);
         Subject.doAs(user2Subject, (PrivilegedAction<Void>) () ->
         {
-            final ListenableFuture<Set<Preference>> visiblePreferencesFuture = 
_testObject.getUserPreferences().getVisiblePreferences();
+            final CompletableFuture<Set<Preference>>
+                    visiblePreferencesFuture = 
_testObject.getUserPreferences().getVisiblePreferences();
             final Set<Preference> visiblePreferences = 
awaitPreferenceFuture(visiblePreferencesFuture);
 
             assertEquals(1, (long) visiblePreferences
diff --git 
a/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/RestUserPreferenceHandler.java
 
b/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/RestUserPreferenceHandler.java
index 0295a5e726..6278b5d135 100644
--- 
a/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/RestUserPreferenceHandler.java
+++ 
b/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/RestUserPreferenceHandler.java
@@ -27,6 +27,7 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 
 import com.google.common.base.Joiner;
@@ -191,7 +192,7 @@ public class RestUserPreferenceHandler
         final Map<String, List<String>> queryParameters = 
requestInfo.getQueryParameters();
         UUID id = getIdFromQueryParameters(queryParameters);
 
-        final ListenableFuture<Set<Preference>> allPreferencesFuture;
+        final CompletableFuture<Set<Preference>> allPreferencesFuture;
         if (requestInfo.getType() == RequestType.USER_PREFERENCES)
         {
             allPreferencesFuture = userPreferences.getPreferences();
@@ -294,9 +295,9 @@ public class RestUserPreferenceHandler
         }
     }
 
-    private <T> T awaitFuture(final ListenableFuture<T> future)
+    private <T> T awaitFuture(final CompletableFuture<T> future)
     {
-        return FutureHelper.<T, RuntimeException>await(future, 
_preferenceOperationTimeout, TimeUnit.MILLISECONDS);
+        return FutureHelper.await(future, _preferenceOperationTimeout, 
TimeUnit.MILLISECONDS);
     }
 
     private List<Preference> validateAndConvert(final ConfiguredObject<?> 
target, final Map<String, Object> providedObjectMap)
diff --git a/pom.xml b/pom.xml
index 954ae32077..7cec99f427 100644
--- a/pom.xml
+++ b/pom.xml
@@ -107,6 +107,7 @@
     <logback-version>1.5.16</logback-version>
     <logback-db-version>1.2.11.1</logback-db-version>
     <guava-version>33.4.0-jre</guava-version>
+    <caffeine-version>3.1.8</caffeine-version>
     <fasterxml-jackson-version>2.18.2</fasterxml-jackson-version>
     
<fasterxml-jackson-databind-version>2.18.2</fasterxml-jackson-databind-version>
     <slf4j-version>2.0.16</slf4j-version>
@@ -577,6 +578,11 @@
           </exclusion>
         </exclusions>
       </dependency>
+      <dependency>
+        <groupId>com.github.ben-manes.caffeine</groupId>
+        <artifactId>caffeine</artifactId>
+        <version>${caffeine-version}</version>
+      </dependency>
       <dependency>
         <groupId>com.google.guava</groupId>
         <artifactId>guava</artifactId>


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to