This is an automated email from the ASF dual-hosted git repository.
vavrtom pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-broker-j.git
The following commit(s) were added to refs/heads/main by this push:
new f7a0667db4 QPID-8664: [Broker-J] Guava removal (4/10)
f7a0667db4 is described below
commit f7a0667db47d4b40e024414b2464e33b475f7d61
Author: dakirily <[email protected]>
AuthorDate: Thu Mar 27 09:47:07 2025 +0100
QPID-8664: [Broker-J] Guava removal (4/10)
This commit replaces guava ListenableFuture with CompletableFuture in
broker TaskExecutor and adds Caffeine cache dependency
---
.../dependency-verification/DEPENDENCIES_REFERENCE | 12 +
broker-core/pom.xml | 5 +
.../server/configuration/updater/TaskExecutor.java | 5 +-
.../configuration/updater/TaskExecutorImpl.java | 303 ++++++++-------------
.../server/model/preferences/UserPreferences.java | 17 +-
.../model/preferences/UserPreferencesImpl.java | 16 +-
.../updater/CurrentThreadTaskExecutor.java | 8 +-
.../updater/TaskExecutorWithPrincipalTest.java | 163 +++++++++++
.../testmodels/singleton/PreferencesTest.java | 6 +-
.../servlet/rest/RestUserPreferenceHandler.java | 7 +-
pom.xml | 6 +
11 files changed, 334 insertions(+), 214 deletions(-)
diff --git
a/apache-qpid-broker-j/src/main/assembly/dependency-verification/DEPENDENCIES_REFERENCE
b/apache-qpid-broker-j/src/main/assembly/dependency-verification/DEPENDENCIES_REFERENCE
index d6113bbd21..0d9dffeae3 100644
---
a/apache-qpid-broker-j/src/main/assembly/dependency-verification/DEPENDENCIES_REFERENCE
+++
b/apache-qpid-broker-j/src/main/assembly/dependency-verification/DEPENDENCIES_REFERENCE
@@ -26,6 +26,9 @@ Apache Qpid Broker-J Bundles
From: 'an unknown organization'
+ - Caffeine cache (https://github.com/ben-manes/caffeine)
com.github.ben-manes.caffeine:caffeine:jar:3.1.8
+ License: Apache License, Version 2.0
(https://www.apache.org/licenses/LICENSE-2.0.txt)
+
- Guava InternalFutureFailureAccess and InternalFutures
(https://github.com/google/guava/failureaccess)
com.google.guava:failureaccess:bundle:1.0.2
License: The Apache Software License, Version 2.0
(http://www.apache.org/licenses/LICENSE-2.0.txt)
@@ -59,6 +62,9 @@ From: 'an unknown organization'
- Bouncy Castle ASN.1 Extension and Utility APIs
(https://www.bouncycastle.org/download/bouncy-castle-java/)
org.bouncycastle:bcutil-jdk18on:jar:1.80
License: Bouncy Castle Licence (https://www.bouncycastle.org/licence.html)
+ - Checker Qual (https://checkerframework.org/)
org.checkerframework:checker-qual:jar:3.37.0
+ License: The MIT License (http://opensource.org/licenses/MIT)
+
- dgrid (https://www.webjars.org) org.webjars.bower:dgrid:jar:1.3.3
License: BSD 3-Clause (https://spdx.org/licenses/BSD 3-Clause#licenseText)
@@ -90,6 +96,12 @@ From: 'FasterXML' (http://fasterxml.com/)
License: The Apache Software License, Version 2.0
(https://www.apache.org/licenses/LICENSE-2.0.txt)
+From: 'Google LLC' (http://www.google.com)
+
+ - error-prone annotations (https://errorprone.info/error_prone_annotations)
com.google.errorprone:error_prone_annotations:jar:2.21.1
+ License: Apache 2.0 (http://www.apache.org/licenses/LICENSE-2.0.txt)
+
+
From: 'Mort Bay Consulting' (http://www.mortbay.com)
- Jetty :: Jakarta Servlet API and Schemas for JPMS and OSGi
(https://eclipse.org/jetty/jetty-jakarta-servlet-api)
org.eclipse.jetty.toolchain:jetty-jakarta-servlet-api:jar:5.0.2
diff --git a/broker-core/pom.xml b/broker-core/pom.xml
index 81bc3c3432..6cfde71084 100644
--- a/broker-core/pom.xml
+++ b/broker-core/pom.xml
@@ -60,6 +60,11 @@
<artifactId>guava</artifactId>
</dependency>
+ <dependency>
+ <groupId>com.github.ben-manes.caffeine</groupId>
+ <artifactId>caffeine</artifactId>
+ </dependency>
+
<dependency>
<groupId>org.bouncycastle</groupId>
<artifactId>bcprov-jdk18on</artifactId>
diff --git
a/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutor.java
b/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutor.java
index c38c0334c4..37340a0197 100644
---
a/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutor.java
+++
b/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutor.java
@@ -22,10 +22,9 @@ package org.apache.qpid.server.configuration.updater;
import java.security.Principal;
import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
-import com.google.common.util.concurrent.ListenableFuture;
-
public interface TaskExecutor extends Executor
{
interface Factory
@@ -49,7 +48,7 @@ public interface TaskExecutor extends Executor
<T, E extends Exception> T run(Task<T, E> task) throws
CancellationException, E;
- <T, E extends Exception> ListenableFuture<T> submit(Task<T, E> task)
throws CancellationException, E;
+ <T, E extends Exception> CompletableFuture<T> submit(Task<T, E> task)
throws CancellationException, E;
Factory getFactory();
}
diff --git
a/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutorImpl.java
b/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutorImpl.java
index f2b83f201a..fb23e9d3aa 100644
---
a/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutorImpl.java
+++
b/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutorImpl.java
@@ -18,21 +18,23 @@
* under the License.
*
*/
+
package org.apache.qpid.server.configuration.updater;
import java.security.AccessController;
import java.security.Principal;
import java.security.PrivilegedAction;
+import java.time.Duration;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
-import java.util.concurrent.Callable;
+import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -40,10 +42,8 @@ import java.util.concurrent.atomic.AtomicReference;
import javax.security.auth.Subject;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -54,20 +54,25 @@ public class TaskExecutorImpl implements TaskExecutor
{
private static final String TASK_EXECUTION_THREAD_NAME = "Broker-Config";
private static final Logger LOGGER =
LoggerFactory.getLogger(TaskExecutorImpl.class);
- private final PrincipalAccessor _principalAccessor;
+ private static final Cache<Set<Principal>, Subject> SUBJECT_CACHE =
Caffeine.newBuilder()
+ .expireAfterAccess(Duration.ofMinutes(5))
+ .maximumSize(1000)
+ .build();
- private volatile Thread _taskThread;
+ private final PrincipalAccessor _principalAccessor;
private final AtomicBoolean _running = new AtomicBoolean();
- private volatile ListeningExecutorService _executor;
private final ImmediateIfSameThreadExecutor _wrappedExecutor = new
ImmediateIfSameThreadExecutor();
private final String _name;
+ private volatile Thread _taskThread;
+ private volatile ExecutorService _executor;
+
public TaskExecutorImpl()
{
this(TASK_EXECUTION_THREAD_NAME, null);
}
- public TaskExecutorImpl(final String name, PrincipalAccessor
principalAccessor)
+ public TaskExecutorImpl(final String name, final PrincipalAccessor
principalAccessor)
{
_name = name;
_principalAccessor = principalAccessor;
@@ -85,22 +90,14 @@ public class TaskExecutorImpl implements TaskExecutor
if (_running.compareAndSet(false, true))
{
LOGGER.debug("Starting task executor {}", _name);
- final java.util.concurrent.BlockingQueue<Runnable> workQueue = new
LinkedBlockingQueue<>();
- final java.util.concurrent.ThreadFactory factory = r ->
+ final BlockingQueue<Runnable> workQueue = new
LinkedBlockingQueue<>();
+ final ThreadFactory factory =
+
QpidByteBuffer.createQpidByteBufferTrackingThreadFactory(runnable ->
{
- _taskThread =
- new TaskThread(
- r,
- _name,
- TaskExecutorImpl.this);
+ _taskThread = new TaskThread(runnable, _name,
TaskExecutorImpl.this);
return _taskThread;
- };
- _executor = MoreExecutors.listeningDecorator(new
ThreadPoolExecutor(1,
-
1,
-
0L,
-
TimeUnit.MILLISECONDS,
-
workQueue,
-
QpidByteBuffer.createQpidByteBufferTrackingThreadFactory(factory)));
+ });
+ _executor = new ThreadPoolExecutor(1, 1, 0L,
TimeUnit.MILLISECONDS, workQueue, factory);
LOGGER.debug("Task executor is started");
}
}
@@ -108,25 +105,19 @@ public class TaskExecutorImpl implements TaskExecutor
@Override
public void stopImmediately()
{
- if (_running.compareAndSet(true,false))
+ if (_running.compareAndSet(true, false))
{
- ExecutorService executor = _executor;
+ final ExecutorService executor = _executor;
if (executor != null)
{
LOGGER.debug("Stopping task executor {} immediately", _name);
- List<Runnable> cancelledTasks = executor.shutdownNow();
- for (Runnable runnable : cancelledTasks)
- {
- if (runnable instanceof RunnableFuture<?>)
- {
- ((RunnableFuture<?>) runnable).cancel(true);
- }
- }
-
+ final List<Runnable> cancelledTasks = executor.shutdownNow();
+ cancelledTasks.forEach(runnable -> ((RunnableWrapper)
runnable).cancel());
_executor = null;
_taskThread = null;
- LOGGER.debug("Task executor was stopped immediately. Number of
unfinished tasks: " + cancelledTasks.size());
+ LOGGER.debug("Task executor was stopped immediately. Number of
unfinished tasks: {}", cancelledTasks.size());
}
+ SUBJECT_CACHE.invalidateAll();
}
}
@@ -135,7 +126,7 @@ public class TaskExecutorImpl implements TaskExecutor
{
if (_running.compareAndSet(true, false))
{
- ExecutorService executor = _executor;
+ final ExecutorService executor = _executor;
if (executor != null)
{
LOGGER.debug("Stopping task executor {}", _name);
@@ -144,16 +135,17 @@ public class TaskExecutorImpl implements TaskExecutor
_taskThread = null;
LOGGER.debug("Task executor is stopped");
}
+ SUBJECT_CACHE.invalidateAll();
}
}
@Override
- public <T, E extends Exception> ListenableFuture<T> submit(Task<T, E>
userTask) throws E
+ public <T, E extends Exception> CompletableFuture<T> submit(final Task<T,
E> userTask) throws E
{
- return submitWrappedTask(new TaskLoggingWrapper<>(userTask));
+ return submitWrappedTask(userTask);
}
- private <T, E extends Exception> ListenableFuture<T>
submitWrappedTask(TaskLoggingWrapper<T, E> task) throws E
+ private <T, E extends Exception> CompletableFuture<T>
submitWrappedTask(final Task<T, E> task) throws E
{
checkState(task);
if (isTaskExecutorThread())
@@ -162,8 +154,8 @@ public class TaskExecutorImpl implements TaskExecutor
{
LOGGER.trace("Running {} immediately", task);
}
- T result = task.execute();
- return Futures.immediateFuture(result);
+ final T result = task.execute();
+ return CompletableFuture.completedFuture(result);
}
else
{
@@ -172,22 +164,26 @@ public class TaskExecutorImpl implements TaskExecutor
LOGGER.trace("Submitting {} to executor {}", task, _name);
}
- return _executor.submit(new CallableWrapper<>(task));
+ final CompletableFuture<T> future = new CompletableFuture<>();
+ _executor.execute(new RunnableWrapper<>(task, future));
+ return future;
}
}
@Override
public void execute(final Runnable command)
{
- LOGGER.trace("Running runnable {} through executor interface",
command);
+ if (LOGGER.isTraceEnabled())
+ {
+ LOGGER.trace("Running runnable {} through executor interface",
command);
+ }
_wrappedExecutor.execute(command);
}
@Override
- public <T, E extends Exception> T run(Task<T, E> userTask) throws
CancellationException, E
+ public <T, E extends Exception> T run(final Task<T, E> userTask) throws
CancellationException, E
{
- TaskLoggingWrapper<T, E> task = new TaskLoggingWrapper<>(userTask);
- return FutureHelper.<T, E>await(submitWrappedTask(task));
+ return FutureHelper.<T, E>await(submitWrappedTask(userTask));
}
private boolean isTaskExecutorThread()
@@ -195,7 +191,7 @@ public class TaskExecutorImpl implements TaskExecutor
return Thread.currentThread() == _taskThread;
}
- private void checkState(Task<?, ?> task)
+ private void checkState(final Task<?, ?> task)
{
if (!_running.get())
{
@@ -204,213 +200,154 @@ public class TaskExecutorImpl implements TaskExecutor
}
}
- private Subject getContextSubject()
+ private Subject getCachedSubject()
{
- Subject contextSubject =
Subject.getSubject(AccessController.getContext());
- if (contextSubject != null && _principalAccessor != null)
+ final Subject contextSubject =
Subject.getSubject(AccessController.getContext());
+
+ if (contextSubject == null)
{
- Principal additionalPrincipal = _principalAccessor.getPrincipal();
- Set<Principal> principals = contextSubject.getPrincipals();
- if (additionalPrincipal != null &&
!principals.contains(additionalPrincipal))
- {
- Set<Principal> extendedPrincipals = new HashSet<>(principals);
- extendedPrincipals.add(additionalPrincipal);
- contextSubject = new Subject(contextSubject.isReadOnly(),
- extendedPrincipals,
- contextSubject.getPublicCredentials(),
- contextSubject.getPrivateCredentials());
- }
+ return null;
}
- return contextSubject;
- }
- private static class TaskLoggingWrapper<T, E extends Exception> implements
Task<T, E>
- {
- private final Task<T,E> _task;
-
- public TaskLoggingWrapper(Task<T, E> task)
+ if (_principalAccessor == null ||
contextSubject.getPrincipals().contains(_principalAccessor.getPrincipal()))
{
- _task = task;
+ return contextSubject;
}
- @Override
- public T execute() throws E
- {
- if (LOGGER.isDebugEnabled())
- {
- LOGGER.debug("Performing {}", this);
- }
+ final Set<Principal> principals = new
HashSet<>(contextSubject.getPrincipals());
+ principals.add(_principalAccessor.getPrincipal());
- boolean success = false;
- T result = null;
- try
- {
- result = _task.execute();
- success = true;
- }
- finally
- {
- if (LOGGER.isDebugEnabled())
- {
- if (success)
- {
- LOGGER.debug("{} performed successfully with result:
{}", this, result);
- } else
- {
- LOGGER.debug("{} failed to perform successfully",
this);
- }
- }
- }
- return result;
- }
+ return SUBJECT_CACHE.get(principals, key ->
createSubjectWithPrincipals(key, contextSubject));
+ }
- @Override
- public String getObject()
- {
- return _task.getObject();
- }
+ Subject createSubjectWithPrincipals(final Set<Principal> principals,
Subject subject)
+ {
+ return new Subject(subject.isReadOnly(), principals,
subject.getPublicCredentials(), subject.getPrivateCredentials());
+ }
- @Override
- public String getAction()
- {
- return _task.getAction();
- }
+ private class ImmediateWrapper<T, E extends Exception> extends
RunnableWrapper<T, E>
+ {
+ final Runnable _runnable;
+ final Subject _subject;
- @Override
- public String getArguments()
+ boolean _cancelled;
+
+ ImmediateWrapper(final Runnable runnable, final Subject subject)
{
- return _task.getArguments();
+ super(null, null);
+ _runnable = runnable;
+ _subject = subject;
}
@Override
- public String toString()
+ public void run()
{
- String arguments = getArguments();
- if (arguments == null)
+ if (_cancelled)
{
- return String.format("Task['%s' on '%s']", getAction(),
getObject());
+ return;
}
- return String.format("Task['%s' on '%s' with arguments '%s']",
getAction(), getObject(), arguments);
+ Subject.doAs(_subject, (PrivilegedAction<Void>) () ->
+ {
+ _runnable.run();
+ return null;
+ });
+ }
+
+ void cancel()
+ {
+ _cancelled = true;
}
}
- private class CallableWrapper<T, E extends Exception> implements
Callable<T>
+ private class RunnableWrapper<T, E extends Exception> implements Runnable
{
private final Task<T, E> _userTask;
+ private final CompletableFuture<T> _future;
private final Subject _contextSubject;
private final AtomicReference<Throwable> _throwable;
- public CallableWrapper(Task<T, E> userWork)
+ public RunnableWrapper(final Task<T, E> userWork, final
CompletableFuture<T> future)
{
_userTask = userWork;
- _contextSubject = getContextSubject();
+ _future = future;
+ _contextSubject = getCachedSubject();
_throwable = new AtomicReference<>();
}
- @Override
- public T call() throws Exception
+ public void run()
{
- T result = Subject.doAs(_contextSubject, (PrivilegedAction<T>) ()
->
+ if (_future.isCancelled() || _future.isCompletedExceptionally())
+ {
+ return;
+ }
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("Performing {}", _userTask);
+ }
+ final T result = Subject.doAs(_contextSubject,
(PrivilegedAction<T>) () ->
{
try
{
return _userTask.execute();
}
- catch(Throwable t)
+ catch (Throwable t)
{
_throwable.set(t);
+ _future.obtrudeException(t);
}
return null;
});
- Throwable t = _throwable.get();
- if (t != null)
+
+ final Throwable throwable = _throwable.get();
+ if (throwable != null)
{
- if (t instanceof RuntimeException)
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("{} failed to perform successfully",
_userTask);
+ }
+ if (throwable instanceof RuntimeException)
{
- throw (RuntimeException) t;
+ throw (RuntimeException) throwable;
}
- else if (t instanceof Error)
+ else if (throwable instanceof Error)
{
- throw (Error) t;
+ throw (Error) throwable;
}
else
{
- throw (Exception) t;
+ throw new RuntimeException(throwable);
}
}
- return result;
- }
- }
- private static class ImmediateFuture<T> implements Future<T>
- {
- private final T _result;
-
- public ImmediateFuture(T result)
- {
- super();
- _result = result;
+ LOGGER.debug("{} performed successfully with result: {}",
_userTask, result);
+ _future.complete(result);
}
- @Override
- public boolean cancel(boolean mayInterruptIfRunning)
- {
- return false;
- }
-
- @Override
- public boolean isCancelled()
+ void cancel()
{
- return false;
- }
-
- @Override
- public boolean isDone()
- {
- return true;
- }
-
- @Override
- public T get()
- {
- return _result;
- }
-
- @Override
- public T get(long timeout, TimeUnit unit)
- {
- return get();
+ _future.completeExceptionally(new CancellationException("Task was
cancelled"));
}
}
private class ImmediateIfSameThreadExecutor implements Executor
{
-
@Override
public void execute(final Runnable command)
{
- if(isTaskExecutorThread()
- || (_executor == null && (Thread.currentThread() instanceof
TaskThread
- && ((TaskThread)Thread.currentThread()).getTaskExecutor()
== TaskExecutorImpl.this)))
+ if (isTaskExecutorThread() || (_executor == null &&
(Thread.currentThread() instanceof TaskThread &&
+ ((TaskThread)Thread.currentThread()).getTaskExecutor() ==
TaskExecutorImpl.this)))
{
command.run();
}
else
{
- final Subject subject = getContextSubject();
- _executor.execute(() -> Subject.doAs(subject,
(PrivilegedAction<Void>) () ->
- {
- command.run();
- return null;
- }));
+ _executor.execute(new ImmediateWrapper<>(command,
getCachedSubject()));
}
-
}
}
private static class TaskThread extends Thread
{
-
private final TaskExecutorImpl _taskExecutor;
public TaskThread(final Runnable r, final String name, final
TaskExecutorImpl taskExecutor)
@@ -437,7 +374,7 @@ public class TaskExecutorImpl implements TaskExecutor
}
@Override
- public TaskExecutor newInstance(final String name,
PrincipalAccessor principalAccessor)
+ public TaskExecutor newInstance(final String name, final
PrincipalAccessor principalAccessor)
{
return new TaskExecutorImpl(name, principalAccessor);
}
diff --git
a/broker-core/src/main/java/org/apache/qpid/server/model/preferences/UserPreferences.java
b/broker-core/src/main/java/org/apache/qpid/server/model/preferences/UserPreferences.java
index 5fc991d4c3..c79c5ea7b3 100644
---
a/broker-core/src/main/java/org/apache/qpid/server/model/preferences/UserPreferences.java
+++
b/broker-core/src/main/java/org/apache/qpid/server/model/preferences/UserPreferences.java
@@ -23,22 +23,21 @@ package org.apache.qpid.server.model.preferences;
import java.util.Collection;
import java.util.Set;
import java.util.UUID;
-
-import com.google.common.util.concurrent.ListenableFuture;
+import java.util.concurrent.CompletableFuture;
public interface UserPreferences
{
- ListenableFuture<Void> updateOrAppend(Collection<Preference> preferences);
+ CompletableFuture<Void> updateOrAppend(Collection<Preference> preferences);
- ListenableFuture<Set<Preference>> getPreferences();
+ CompletableFuture<Set<Preference>> getPreferences();
- ListenableFuture<Void> replace(Collection<Preference> preferences);
+ CompletableFuture<Void> replace(Collection<Preference> preferences);
- ListenableFuture<Void> replaceByType(String type, Collection<Preference>
preferences);
+ CompletableFuture<Void> replaceByType(String type, Collection<Preference>
preferences);
- ListenableFuture<Void> replaceByTypeAndName(String type, String name,
Preference preference);
+ CompletableFuture<Void> replaceByTypeAndName(String type, String name,
Preference preference);
- ListenableFuture<Set<Preference>> getVisiblePreferences();
+ CompletableFuture<Set<Preference>> getVisiblePreferences();
- ListenableFuture<Void> delete(String type, String name, UUID id);
+ CompletableFuture<Void> delete(String type, String name, UUID id);
}
diff --git
a/broker-core/src/main/java/org/apache/qpid/server/model/preferences/UserPreferencesImpl.java
b/broker-core/src/main/java/org/apache/qpid/server/model/preferences/UserPreferencesImpl.java
index f24f8606c1..6f3e387e1f 100644
---
a/broker-core/src/main/java/org/apache/qpid/server/model/preferences/UserPreferencesImpl.java
+++
b/broker-core/src/main/java/org/apache/qpid/server/model/preferences/UserPreferencesImpl.java
@@ -41,11 +41,11 @@ import java.util.Objects;
import java.util.Set;
import java.util.TreeSet;
import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
import javax.security.auth.Subject;
import com.google.common.collect.Ordering;
-import com.google.common.util.concurrent.ListenableFuture;
import org.apache.qpid.server.configuration.updater.Task;
import org.apache.qpid.server.configuration.updater.TaskExecutor;
@@ -107,7 +107,7 @@ public class UserPreferencesImpl implements UserPreferences
}
@Override
- public ListenableFuture<Void> updateOrAppend(final Collection<Preference>
preferences)
+ public CompletableFuture<Void> updateOrAppend(final Collection<Preference>
preferences)
{
return _executor.submit(new PreferencesTask<>("updateOrAppend",
preferences)
{
@@ -145,7 +145,7 @@ public class UserPreferencesImpl implements UserPreferences
}
@Override
- public ListenableFuture<Set<Preference>> getPreferences()
+ public CompletableFuture<Set<Preference>> getPreferences()
{
return _executor.submit(new PreferencesTask<>("getPreferences")
{
@@ -173,7 +173,7 @@ public class UserPreferencesImpl implements UserPreferences
}
@Override
- public ListenableFuture<Void> replace(final Collection<Preference>
preferences)
+ public CompletableFuture<Void> replace(final Collection<Preference>
preferences)
{
return _executor.submit(new PreferencesTask<>("replace", preferences)
{
@@ -187,7 +187,7 @@ public class UserPreferencesImpl implements UserPreferences
}
@Override
- public ListenableFuture<Void> replaceByType(final String type, final
Collection<Preference> preferences)
+ public CompletableFuture<Void> replaceByType(final String type, final
Collection<Preference> preferences)
{
return _executor.submit(new PreferencesTask<>("replaceByType", type,
preferences)
{
@@ -237,7 +237,7 @@ public class UserPreferencesImpl implements UserPreferences
}
@Override
- public ListenableFuture<Void> replaceByTypeAndName(final String type,
+ public CompletableFuture<Void> replaceByTypeAndName(final String type,
final String name,
final Preference
newPreference)
{
@@ -295,7 +295,7 @@ public class UserPreferencesImpl implements UserPreferences
}
@Override
- public ListenableFuture<Void> delete(final String type, final String name,
final UUID id)
+ public CompletableFuture<Void> delete(final String type, final String
name, final UUID id)
{
return _executor.submit(new PreferencesTask<>("delete", type, name, id)
{
@@ -345,7 +345,7 @@ public class UserPreferencesImpl implements UserPreferences
}
@Override
- public ListenableFuture<Set<Preference>> getVisiblePreferences()
+ public CompletableFuture<Set<Preference>> getVisiblePreferences()
{
return _executor.submit(new PreferencesTask<>("getVisiblePreferences")
{
diff --git
a/broker-core/src/test/java/org/apache/qpid/server/configuration/updater/CurrentThreadTaskExecutor.java
b/broker-core/src/test/java/org/apache/qpid/server/configuration/updater/CurrentThreadTaskExecutor.java
index 937f10cd1a..a7dd6d94a8 100644
---
a/broker-core/src/test/java/org/apache/qpid/server/configuration/updater/CurrentThreadTaskExecutor.java
+++
b/broker-core/src/test/java/org/apache/qpid/server/configuration/updater/CurrentThreadTaskExecutor.java
@@ -21,11 +21,9 @@
package org.apache.qpid.server.configuration.updater;
import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-
public class CurrentThreadTaskExecutor implements TaskExecutor
{
private final AtomicReference<Thread> _thread = new AtomicReference<>();
@@ -77,11 +75,11 @@ public class CurrentThreadTaskExecutor implements
TaskExecutor
}
@Override
- public <T, E extends Exception> ListenableFuture<T> submit(Task<T, E>
task) throws CancellationException, E
+ public <T, E extends Exception> CompletableFuture<T> submit(Task<T, E>
task) throws CancellationException, E
{
checkThread();
final T result = task.execute();
- return Futures.immediateFuture(result);
+ return CompletableFuture.completedFuture(result);
}
public static TaskExecutor newStartedInstance()
diff --git
a/broker-core/src/test/java/org/apache/qpid/server/configuration/updater/TaskExecutorWithPrincipalTest.java
b/broker-core/src/test/java/org/apache/qpid/server/configuration/updater/TaskExecutorWithPrincipalTest.java
new file mode 100644
index 0000000000..036942fc5b
--- /dev/null
+++
b/broker-core/src/test/java/org/apache/qpid/server/configuration/updater/TaskExecutorWithPrincipalTest.java
@@ -0,0 +1,163 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.server.configuration.updater;
+
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+
+import javax.security.auth.Subject;
+
+import org.junit.jupiter.api.AfterEach;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import org.apache.qpid.server.BrokerPrincipal;
+import org.apache.qpid.server.model.AuthenticationProvider;
+import org.apache.qpid.server.model.Broker;
+import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.server.security.auth.UsernamePrincipal;
+import org.apache.qpid.server.virtualhost.VirtualHostPrincipal;
+import org.apache.qpid.test.utils.UnitTestBase;
+
+class TaskExecutorWithPrincipalTest extends UnitTestBase
+{
+ final VirtualHost _virtualHost = mock(VirtualHost.class);
+ final VirtualHostPrincipal _virtualHostPrincipal = new
VirtualHostPrincipal(_virtualHost);
+
+ private TaskExecutorImpl _executor;
+
+ @BeforeEach
+ void setUp()
+ {
+ _executor = new TaskExecutorImpl("Broker-Config", () ->
_virtualHostPrincipal);
+ _executor.start();
+ }
+
+ @AfterEach
+ void tearDown()
+ {
+ _executor.stopImmediately();
+ }
+
+ @Test
+ void emptySubject()
+ {
+ final Subject subject = new Subject();
+ final AtomicReference<Subject> taskSubject = new AtomicReference<>();
+
+ runTask(subject, taskSubject, _executor);
+
+ assertEquals(Set.of(_virtualHostPrincipal),
taskSubject.get().getPrincipals(), "Unexpected security manager principal");
+ }
+
+ @Test
+ void subjectWithBrokerPrincipal()
+ {
+ final Broker<?> broker = mock(Broker.class);
+ final BrokerPrincipal brokerPrincipal = new BrokerPrincipal(broker);
+ final Subject subject = new Subject(true, Set.of(brokerPrincipal),
Set.of(), Set.of());
+ final AtomicReference<Subject> taskSubject = new AtomicReference<>();
+
+ runTask(subject, taskSubject, _executor);
+
+ assertEquals(2, taskSubject.get().getPrincipals().size(), "Unexpected
principals count");
+
assertTrue(taskSubject.get().getPrincipals().contains(brokerPrincipal),
"Expected to have broker principal");
+
assertTrue(taskSubject.get().getPrincipals().contains(_virtualHostPrincipal),
"Expected to have virtualhost principal");
+ }
+
+ @Test
+ void cachedSubjects()
+ {
+ final TaskExecutorImpl spy1 = spy(_executor);
+ final TaskExecutorImpl spy2 = spy(_executor);
+ final AuthenticationProvider authProvider =
mock(AuthenticationProvider.class);
+ when(authProvider.getName()).thenReturn("authProvider");
+ when(authProvider.getType()).thenReturn("mock");
+ final UsernamePrincipal usernamePrincipal1 = new
UsernamePrincipal("user1", authProvider);
+ final UsernamePrincipal usernamePrincipal2 = new
UsernamePrincipal("user2", authProvider);
+ final Subject subject1 = new Subject(true, Set.of(usernamePrincipal1),
Set.of(), Set.of());
+ final Subject subject2 = new Subject(true, Set.of(usernamePrincipal2),
Set.of(), Set.of());
+ final AtomicReference<Subject> taskSubject = new AtomicReference<>();
+
+ // subject1 should be created
+ runTask(subject1, taskSubject, spy1);
+ verify(spy1, times(1)).createSubjectWithPrincipals(any(Set.class),
any(Subject.class));
+
+ // repeated call should retrieve subject1 from cache
+ runTask(subject1, taskSubject, spy2);
+ verify(spy2, never()).createSubjectWithPrincipals(any(Set.class),
any(Subject.class));
+
+ // subject2 should be created
+ runTask(subject2, taskSubject, spy1);
+ verify(spy1, times(2)).createSubjectWithPrincipals(any(Set.class),
any(Subject.class));
+
+ // repeated call should retrieve subject2 from cache
+ runTask(subject2, taskSubject, spy2);
+ verify(spy2, never()).createSubjectWithPrincipals(any(Set.class),
any(Subject.class));
+ }
+
+ private void runTask(final Subject subject, final AtomicReference<Subject>
taskSubject, final TaskExecutorImpl executor)
+ {
+ Subject.doAs(subject, (PrivilegedAction<Object>) () ->
+ {
+ executor.run(new Task<Void, RuntimeException>()
+ {
+ @Override
+ public Void execute()
+ {
+
taskSubject.set(Subject.getSubject(AccessController.getContext()));
+ return null;
+ }
+
+ @Override
+ public String getObject()
+ {
+ return getTestName();
+ }
+
+ @Override
+ public String getAction()
+ {
+ return "test";
+ }
+
+ @Override
+ public String getArguments()
+ {
+ return null;
+ }
+ });
+ return null;
+ });
+ }
+}
diff --git
a/broker-core/src/test/java/org/apache/qpid/server/model/testmodels/singleton/PreferencesTest.java
b/broker-core/src/test/java/org/apache/qpid/server/model/testmodels/singleton/PreferencesTest.java
index fb5733186e..b8d3111664 100644
---
a/broker-core/src/test/java/org/apache/qpid/server/model/testmodels/singleton/PreferencesTest.java
+++
b/broker-core/src/test/java/org/apache/qpid/server/model/testmodels/singleton/PreferencesTest.java
@@ -36,13 +36,12 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.security.auth.Subject;
-import com.google.common.util.concurrent.ListenableFuture;
-
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -319,7 +318,8 @@ public class PreferencesTest extends UnitTestBase
Subject user2Subject =
TestPrincipalUtils.createTestSubject(TEST_USERNAME2, testGroupName);
Subject.doAs(user2Subject, (PrivilegedAction<Void>) () ->
{
- final ListenableFuture<Set<Preference>> visiblePreferencesFuture =
_testObject.getUserPreferences().getVisiblePreferences();
+ final CompletableFuture<Set<Preference>>
+ visiblePreferencesFuture =
_testObject.getUserPreferences().getVisiblePreferences();
final Set<Preference> visiblePreferences =
awaitPreferenceFuture(visiblePreferencesFuture);
assertEquals(1, (long) visiblePreferences
diff --git
a/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/RestUserPreferenceHandler.java
b/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/RestUserPreferenceHandler.java
index 0295a5e726..6278b5d135 100644
---
a/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/RestUserPreferenceHandler.java
+++
b/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/RestUserPreferenceHandler.java
@@ -27,6 +27,7 @@ import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import com.google.common.base.Joiner;
@@ -191,7 +192,7 @@ public class RestUserPreferenceHandler
final Map<String, List<String>> queryParameters =
requestInfo.getQueryParameters();
UUID id = getIdFromQueryParameters(queryParameters);
- final ListenableFuture<Set<Preference>> allPreferencesFuture;
+ final CompletableFuture<Set<Preference>> allPreferencesFuture;
if (requestInfo.getType() == RequestType.USER_PREFERENCES)
{
allPreferencesFuture = userPreferences.getPreferences();
@@ -294,9 +295,9 @@ public class RestUserPreferenceHandler
}
}
- private <T> T awaitFuture(final ListenableFuture<T> future)
+ private <T> T awaitFuture(final CompletableFuture<T> future)
{
- return FutureHelper.<T, RuntimeException>await(future,
_preferenceOperationTimeout, TimeUnit.MILLISECONDS);
+ return FutureHelper.await(future, _preferenceOperationTimeout,
TimeUnit.MILLISECONDS);
}
private List<Preference> validateAndConvert(final ConfiguredObject<?>
target, final Map<String, Object> providedObjectMap)
diff --git a/pom.xml b/pom.xml
index 954ae32077..7cec99f427 100644
--- a/pom.xml
+++ b/pom.xml
@@ -107,6 +107,7 @@
<logback-version>1.5.16</logback-version>
<logback-db-version>1.2.11.1</logback-db-version>
<guava-version>33.4.0-jre</guava-version>
+ <caffeine-version>3.1.8</caffeine-version>
<fasterxml-jackson-version>2.18.2</fasterxml-jackson-version>
<fasterxml-jackson-databind-version>2.18.2</fasterxml-jackson-databind-version>
<slf4j-version>2.0.16</slf4j-version>
@@ -577,6 +578,11 @@
</exclusion>
</exclusions>
</dependency>
+ <dependency>
+ <groupId>com.github.ben-manes.caffeine</groupId>
+ <artifactId>caffeine</artifactId>
+ <version>${caffeine-version}</version>
+ </dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]