QPID-7830: [Broker-J] [AMQP 0-8..0-91] Move caching responsibility to virtualhost
(cherry picked from commit ddc519a551061c682877784068e755677e2c6313) Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/d79537d2 Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/d79537d2 Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/d79537d2 Branch: refs/heads/7.0.x Commit: d79537d2951dc6eabc7fabe6fa0760cb0f07c11d Parents: 995d535 Author: Keith Wall <kw...@apache.org> Authored: Fri Apr 27 12:54:01 2018 +0100 Committer: Alex Rudyy <oru...@apache.org> Committed: Fri May 11 16:10:04 2018 +0100 ---------------------------------------------------------------------- .../server/protocol/v0_8/AMQShortString.java | 51 +++++++-- .../qpid/server/security/QpidPrincipal.java | 38 +++++++ .../security/auth/AuthenticatedPrincipal.java | 24 +---- .../server/virtualhost/AbstractVirtualHost.java | 19 ++++ .../qpid/server/virtualhost/CacheProvider.java | 28 +++++ .../qpid/server/virtualhost/NullCache.java | 107 +++++++++++++++++++ .../virtualhost/QueueManagingVirtualHost.java | 15 ++- .../virtualhost/VirtualHostPrincipal.java | 5 + .../protocol/v0_8/AMQShortStringTest.java | 33 ++++-- 9 files changed, 274 insertions(+), 46 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/d79537d2/broker-core/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQShortString.java ---------------------------------------------------------------------- diff --git a/broker-core/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQShortString.java b/broker-core/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQShortString.java index 0ee98c3..6d17f8e 100644 --- a/broker-core/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQShortString.java +++ b/broker-core/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQShortString.java @@ -23,15 +23,20 @@ package org.apache.qpid.server.protocol.v0_8; 
import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; +import java.security.AccessController; import java.util.Arrays; -import java.util.concurrent.TimeUnit; + +import javax.security.auth.Subject; import com.google.common.cache.Cache; -import com.google.common.cache.CacheBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.qpid.server.bytebuffer.QpidByteBuffer; +import org.apache.qpid.server.security.QpidPrincipal; +import org.apache.qpid.server.virtualhost.CacheProvider; +import org.apache.qpid.server.virtualhost.NullCache; +import org.apache.qpid.server.virtualhost.VirtualHostPrincipal; /** * A short string is a representation of an AMQ Short String @@ -47,14 +52,11 @@ public final class AMQShortString implements Comparable<AMQShortString> public static final int MAX_LENGTH = 255; private static final Logger LOGGER = LoggerFactory.getLogger(AMQShortString.class); + private static final NullCache<ByteBuffer, AMQShortString> NULL_CACHE = new NullCache<>(); // Unfortunately CacheBuilder does not yet support keyEquivalence, so we have to wrap the keys in ByteBuffers // rather than using the byte arrays as keys. - private static ThreadLocal<Cache<ByteBuffer, AMQShortString>> CACHE = - ThreadLocal.withInitial(() -> CacheBuilder.newBuilder() - .maximumSize(100) - .expireAfterAccess(300, TimeUnit.SECONDS) - .build()); + private static ThreadLocal<Cache<ByteBuffer, AMQShortString>> CACHE = new ThreadLocal<>(); private final byte[] _data; private int _hashCode; @@ -100,7 +102,7 @@ public final class AMQShortString implements Comparable<AMQShortString> byte[] data = new byte[length]; buffer.get(data); - final AMQShortString cached = CACHE.get().getIfPresent(ByteBuffer.wrap(data)); + final AMQShortString cached = getShortStringCache().getIfPresent(ByteBuffer.wrap(data)); return cached != null ? 
cached : new AMQShortString(data); } } @@ -112,7 +114,7 @@ public final class AMQShortString implements Comparable<AMQShortString> throw new NullPointerException("Cannot create AMQShortString with null data[]"); } - final AMQShortString cached = CACHE.get().getIfPresent(ByteBuffer.wrap(data)); + final AMQShortString cached = getShortStringCache().getIfPresent(ByteBuffer.wrap(data)); return cached != null ? cached : new AMQShortString(data); } @@ -120,7 +122,7 @@ public final class AMQShortString implements Comparable<AMQShortString> { final byte[] data = EncodingUtils.asUTF8Bytes(string); - final AMQShortString cached = CACHE.get().getIfPresent(ByteBuffer.wrap(data)); + final AMQShortString cached = getShortStringCache().getIfPresent(ByteBuffer.wrap(data)); if (cached != null) { return cached; @@ -301,7 +303,7 @@ public final class AMQShortString implements Comparable<AMQShortString> public void intern() { - CACHE.get().put(ByteBuffer.wrap(_data), this); + getShortStringCache().put(ByteBuffer.wrap(_data), this); } public static AMQShortString validValueOf(Object obj) @@ -361,4 +363,31 @@ public final class AMQShortString implements Comparable<AMQShortString> return amqShortString == null ? 
null : amqShortString.toString(); } + private static Cache<ByteBuffer, AMQShortString> getShortStringCache() + { + Cache<ByteBuffer, AMQShortString> cache = CACHE.get(); + if (cache == null) + { + cache = NULL_CACHE; + Subject subject = Subject.getSubject(AccessController.getContext()); + if (subject != null) + { + VirtualHostPrincipal principal = QpidPrincipal.getSingletonPrincipal(subject, true, VirtualHostPrincipal.class); + + if (principal != null && principal.getVirtualHost() instanceof CacheProvider) + { + CacheProvider cacheProvider = (CacheProvider) principal.getVirtualHost(); + cache = cacheProvider.getNamedCache("amqShortStringCache"); + } + } + CACHE.set(cache); + } + return cache; + } + + /** Unit testing only */ + static void setCache(final Cache<ByteBuffer, AMQShortString> cache) + { + CACHE.set(cache); + } } http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/d79537d2/broker-core/src/main/java/org/apache/qpid/server/security/QpidPrincipal.java ---------------------------------------------------------------------- diff --git a/broker-core/src/main/java/org/apache/qpid/server/security/QpidPrincipal.java b/broker-core/src/main/java/org/apache/qpid/server/security/QpidPrincipal.java index 46c717f..382fea8 100644 --- a/broker-core/src/main/java/org/apache/qpid/server/security/QpidPrincipal.java +++ b/broker-core/src/main/java/org/apache/qpid/server/security/QpidPrincipal.java @@ -21,11 +21,49 @@ package org.apache.qpid.server.security; import java.io.Serializable; import java.security.Principal; +import java.util.Set; + +import javax.security.auth.Subject; import org.apache.qpid.server.model.ConfiguredObject; public interface QpidPrincipal extends Principal, Serializable { + static <P extends Principal> P getSingletonPrincipal(final Subject authSubject, + final boolean isPrincipalOptional, + final Class<P> principalClazz) + { + if (authSubject == null) + { + throw new IllegalArgumentException("No authenticated subject."); + } + + final Set<P> 
principals = authSubject.getPrincipals(principalClazz); + int numberOfAuthenticatedPrincipals = principals.size(); + + if(numberOfAuthenticatedPrincipals == 0 && isPrincipalOptional) + { + return null; + } + else + { + if (numberOfAuthenticatedPrincipals != 1) + { + throw new IllegalArgumentException( + String.format( + "Can't find single %s in the authenticated subject. There were %d " + + "%s principals out of a total number of principals of: %s", + principalClazz.getSimpleName(), + principalClazz.getSimpleName(), + numberOfAuthenticatedPrincipals, + authSubject.getPrincipals())); + } + return principals.iterator().next(); + } + } + ConfiguredObject<?> getOrigin(); + + } http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/d79537d2/broker-core/src/main/java/org/apache/qpid/server/security/auth/AuthenticatedPrincipal.java ---------------------------------------------------------------------- diff --git a/broker-core/src/main/java/org/apache/qpid/server/security/auth/AuthenticatedPrincipal.java b/broker-core/src/main/java/org/apache/qpid/server/security/auth/AuthenticatedPrincipal.java index 3e4d688..3ead31c 100644 --- a/broker-core/src/main/java/org/apache/qpid/server/security/auth/AuthenticatedPrincipal.java +++ b/broker-core/src/main/java/org/apache/qpid/server/security/auth/AuthenticatedPrincipal.java @@ -126,29 +126,7 @@ public final class AuthenticatedPrincipal implements QpidPrincipal private static AuthenticatedPrincipal getAuthenticatedPrincipalFromSubject(final Subject authSubject, boolean isPrincipalOptional) { - if (authSubject == null) - { - throw new IllegalArgumentException("No authenticated subject."); - } - - final Set<AuthenticatedPrincipal> principals = authSubject.getPrincipals(AuthenticatedPrincipal.class); - int numberOfAuthenticatedPrincipals = principals.size(); - - if(numberOfAuthenticatedPrincipals == 0 && isPrincipalOptional) - { - return null; - } - else - { - if (numberOfAuthenticatedPrincipals != 1) - { - throw new 
IllegalArgumentException( - "Can't find single AuthenticatedPrincipal in authenticated subject. There were " - + numberOfAuthenticatedPrincipals - + " authenticated principals out of a total number of principals of: " + authSubject.getPrincipals()); - } - return principals.iterator().next(); - } + return QpidPrincipal.getSingletonPrincipal(authSubject, isPrincipalOptional, AuthenticatedPrincipal.class); } @Override http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/d79537d2/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java ---------------------------------------------------------------------- diff --git a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java index 7fab645..f097644 100644 --- a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java +++ b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java @@ -53,6 +53,7 @@ import java.util.Set; import java.util.UUID; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Future; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ScheduledFuture; @@ -65,6 +66,8 @@ import java.util.regex.PatternSyntaxException; import javax.security.auth.Subject; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.JdkFutureAdapters; @@ -156,6 +159,7 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte private final AtomicBoolean _acceptsConnections = new AtomicBoolean(false); private volatile TaskExecutor _preferenceTaskExecutor; private volatile boolean 
_deleteRequested; + private final ConcurrentMap<String, Cache> _caches = new ConcurrentHashMap<>(); private enum BlockingType { STORE, FILESYSTEM }; @@ -3010,6 +3014,21 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte })); } + @Override + public <K, V> Cache<K, V> getNamedCache(final String cacheName) + { + final String maxSizeContextVarName = String.format(NAMED_CACHE_MAXIMUM_SIZE_FORMAT, cacheName); + final String expirationContextVarName = String.format(NAMED_CACHE_EXPIRATION_FORMAT, cacheName); + Set<String> contextKeys = getContextKeys(false); + int maxSize = contextKeys.contains(maxSizeContextVarName) ? getContextValue(Integer.class, maxSizeContextVarName) : getContextValue(Integer.class, NAMED_CACHE_MAXIMUM_SIZE); + long expiration = contextKeys.contains(expirationContextVarName) ? getContextValue(Long.class, expirationContextVarName) : getContextValue(Long.class, NAMED_CACHE_EXPIRATION); + + return _caches.computeIfAbsent(cacheName, (k) -> CacheBuilder.<K, V>newBuilder() + .maximumSize(maxSize) + .expireAfterAccess(expiration, TimeUnit.MILLISECONDS) + .build()); + } + private boolean hasDifferentBindings(final Exchange<?> exchange, final Queue queue, final Map<String, Map<String,Object>> bindings) http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/d79537d2/broker-core/src/main/java/org/apache/qpid/server/virtualhost/CacheProvider.java ---------------------------------------------------------------------- diff --git a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/CacheProvider.java b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/CacheProvider.java new file mode 100644 index 0000000..64a6233 --- /dev/null +++ b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/CacheProvider.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.server.virtualhost; + +import com.google.common.cache.Cache; + +public interface CacheProvider +{ + <K, V> Cache<K, V> getNamedCache(String cacheName); +} http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/d79537d2/broker-core/src/main/java/org/apache/qpid/server/virtualhost/NullCache.java ---------------------------------------------------------------------- diff --git a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/NullCache.java b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/NullCache.java new file mode 100644 index 0000000..3d2ef6f --- /dev/null +++ b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/NullCache.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.server.virtualhost; + +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutionException; + +import com.google.common.cache.Cache; +import com.google.common.cache.CacheStats; +import com.google.common.collect.ImmutableMap; + +public class NullCache<K, V> implements Cache<K, V> +{ + @Override + public V getIfPresent(final Object key) + { + return null; + } + + @Override + public V get(final K key, final Callable<? extends V> loader) throws ExecutionException + { + try + { + return loader.call(); + } + catch (Exception e) + { + throw new ExecutionException(e); + } + } + + @Override + public ImmutableMap<K, V> getAllPresent(final Iterable<?> keys) + { + return ImmutableMap.of(); + } + + @Override + public void put(final K key, final V value) + { + } + + @Override + public void putAll(final Map<? extends K, ? 
extends V> m) + { + } + + @Override + public void invalidate(final Object key) + { + } + + @Override + public void invalidateAll(final Iterable<?> keys) + { + } + + @Override + public void invalidateAll() + { + } + + @Override + public long size() + { + return 0; + } + + @Override + public CacheStats stats() + { + throw new UnsupportedOperationException(); + } + + @Override + public ConcurrentMap<K, V> asMap() + { + return new ConcurrentHashMap<>(); + } + + @Override + public void cleanUp() + { + } +} http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/d79537d2/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueManagingVirtualHost.java ---------------------------------------------------------------------- diff --git a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueManagingVirtualHost.java b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueManagingVirtualHost.java index b30373a..9388304 100644 --- a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueManagingVirtualHost.java +++ b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueManagingVirtualHost.java @@ -57,7 +57,8 @@ public interface QueueManagingVirtualHost<X extends QueueManagingVirtualHost<X>> EventListener, StatisticsGatherer, UserPreferencesCreator, - EventLoggerProvider + EventLoggerProvider, + CacheProvider { String HOUSEKEEPING_CHECK_PERIOD = "housekeepingCheckPeriod"; String STORE_TRANSACTION_IDLE_TIMEOUT_CLOSE = "storeTransactionIdleTimeoutClose"; @@ -163,6 +164,18 @@ public interface QueueManagingVirtualHost<X extends QueueManagingVirtualHost<X>> @ManagedContextDefault( name = VIRTUALHOST_CONNECTION_THREAD_POOL_NUMBER_OF_SELECTORS) long DEFAULT_VIRTUALHOST_CONNECTION_THREAD_POOL_NUMBER_OF_SELECTORS = Math.max(DEFAULT_VIRTUALHOST_CONNECTION_THREAD_POOL_SIZE/8, 1); + String NAMED_CACHE_MAXIMUM_SIZE = "virtualhost.namedCache.maximumSize"; + @SuppressWarnings("unused") + @ManagedContextDefault(name = 
NAMED_CACHE_MAXIMUM_SIZE, description = "Maximum number of entries within the named cached") + int DEFAULT_NAMED_CACHE_SIZE = 100; + String NAMED_CACHE_MAXIMUM_SIZE_FORMAT = "virtualhost.namedCache.%s.maximumSize"; + + String NAMED_CACHE_EXPIRATION = "virtualhost.namedCache.expiration"; + @SuppressWarnings("unused") + @ManagedContextDefault(name = NAMED_CACHE_EXPIRATION, description = "Expiration time (in millis) applied to cached values within the named cache") + long DEFAULT_NAMED_CACHE_EXPIRATION = 300 * 1000; + String NAMED_CACHE_EXPIRATION_FORMAT = "virtualhost.namedCache.%s.expiration"; + @ManagedAttribute( defaultValue = "${" + QueueManagingVirtualHost.VIRTUALHOST_CONNECTION_THREAD_POOL_NUMBER_OF_SELECTORS + "}") int getNumberOfSelectors(); http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/d79537d2/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostPrincipal.java ---------------------------------------------------------------------- diff --git a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostPrincipal.java b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostPrincipal.java index b3d7374..14fd8cc 100644 --- a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostPrincipal.java +++ b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostPrincipal.java @@ -44,6 +44,11 @@ public class VirtualHostPrincipal implements Principal, Serializable return _name; } + public VirtualHost<?> getVirtualHost() + { + return _virtualHost; + } + @Override public boolean equals(Object o) { http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/d79537d2/broker-core/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQShortStringTest.java ---------------------------------------------------------------------- diff --git a/broker-core/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQShortStringTest.java 
b/broker-core/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQShortStringTest.java index 8ff7c96..7d29b5b 100644 --- a/broker-core/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQShortStringTest.java +++ b/broker-core/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQShortStringTest.java @@ -20,10 +20,12 @@ package org.apache.qpid.server.protocol.v0_8; -import org.apache.qpid.test.utils.QpidTestCase; - import java.nio.charset.StandardCharsets; +import com.google.common.cache.CacheBuilder; + +import org.apache.qpid.test.utils.QpidTestCase; + public class AMQShortStringTest extends QpidTestCase { @@ -132,15 +134,24 @@ public class AMQShortStringTest extends QpidTestCase public void testInterning() { - AMQShortString str1 = AMQShortString.createAMQShortString("hello"); - str1.intern(); - AMQShortString str2 = AMQShortString.createAMQShortString("hello"); - AMQShortString str3 = AMQShortString.createAMQShortString("hello".getBytes(StandardCharsets.UTF_8)); - - assertEquals(str1, str2); - assertEquals(str1, str3); - assertSame(str1, str2); - assertSame(str1, str3); + AMQShortString.setCache(CacheBuilder.newBuilder().maximumSize(1).build()); + + try + { + AMQShortString str1 = AMQShortString.createAMQShortString("hello"); + str1.intern(); + AMQShortString str2 = AMQShortString.createAMQShortString("hello"); + AMQShortString str3 = AMQShortString.createAMQShortString("hello".getBytes(StandardCharsets.UTF_8)); + + assertEquals(str1, str2); + assertEquals(str1, str3); + assertSame(str1, str2); + assertSame(str1, str3); + } + finally + { + AMQShortString.setCache(null); + } } /** --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org