This is an automated email from the ASF dual-hosted git repository.
jmckenzie pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new eae7b9c Actively update auth caches in the background
eae7b9c is described below
commit eae7b9c3ade386f28c5f0c7ee015b0d0445388ac
Author: Blake Eggleston <[email protected]>
AuthorDate: Wed Sep 15 10:52:46 2021 -0400
Actively update auth caches in the background
Patch by Blake Eggleston; reviewed by Sam Tunnicliffe, Jason Brown, and
Caleb Rackliffe for CASSANDRA-16957
Co-authored-by: Blake Eggleston <[email protected]>
Co-authored-by: Josh McKenzie <[email protected]>
---
CHANGES.txt | 1 +
conf/cassandra.yaml | 33 +++++
src/java/org/apache/cassandra/auth/AuthCache.java | 157 +++++++++++++++++----
.../org/apache/cassandra/auth/AuthCacheMBean.java | 4 +
.../org/apache/cassandra/auth/CacheRefresher.java | 94 ++++++++++++
.../cassandra/auth/NetworkPermissionsCache.java | 2 +
.../cassandra/auth/PasswordAuthenticator.java | 39 ++++-
.../apache/cassandra/auth/PermissionsCache.java | 2 +
src/java/org/apache/cassandra/auth/RolesCache.java | 2 +
.../cassandra/auth/jmx/AuthorizationProxy.java | 2 +
src/java/org/apache/cassandra/config/Config.java | 3 +
.../cassandra/config/DatabaseDescriptor.java | 30 ++++
.../apache/cassandra/service/StorageService.java | 5 +
.../org/apache/cassandra/auth/AuthCacheTest.java | 2 +
.../apache/cassandra/auth/CacheRefresherTest.java | 88 ++++++++++++
15 files changed, 429 insertions(+), 35 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index f436917..1cfb1fe 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
4.1
+ * Actively update auth cache in the background (CASSANDRA-16957)
* Add unix time conversion functions (CASSANDRA-17029)
* JVMStabilityInspector.forceHeapSpaceOomMaybe should handle all non-heap
OOMs rather than only supporting direct only (CASSANDRA-17128)
* Forbid other Future implementations with checkstyle (CASSANDRA-17055)
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index 92a91c6..12221ca 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -170,6 +170,8 @@ network_authorizer: AllowAllNetworkAuthorizer
# after the period specified here, become eligible for (async) reload.
# Defaults to 2000, set to 0 to disable caching entirely.
# Will be disabled automatically for AllowAllAuthenticator.
+# For a long-running cache using roles_cache_active_update, consider
+# setting to something longer such as a daily validation: 86400000
roles_validity_in_ms: 2000
# Refresh interval for roles cache (if enabled).
@@ -177,13 +179,24 @@ roles_validity_in_ms: 2000
# access, an async reload is scheduled and the old value returned until it
# completes. If roles_validity_in_ms is non-zero, then this must be
# also.
+# This setting is also used to inform the interval of auto-updating if
+# using roles_cache_active_update.
# Defaults to the same value as roles_validity_in_ms.
+# For a long-running cache, consider setting this to 3600000 (1 hour) etc.
# roles_update_interval_in_ms: 2000
+# If true, cache contents are actively updated by a background task at the
+# interval set by roles_update_interval_in_ms. If false, cache entries
+# become eligible for refresh after their update interval. Upon next access,
+# an async reload is scheduled and the old value returned until it completes.
+# roles_cache_active_update: false
+
# Validity period for permissions cache (fetching permissions can be an
# expensive operation depending on the authorizer, CassandraAuthorizer is
# one example). Defaults to 2000, set to 0 to disable.
# Will be disabled automatically for AllowAllAuthorizer.
+# For a long-running cache using permissions_cache_active_update, consider
+# setting to something longer such as a daily validation: 86400000
permissions_validity_in_ms: 2000
# Refresh interval for permissions cache (if enabled).
@@ -191,9 +204,18 @@ permissions_validity_in_ms: 2000
# access, an async reload is scheduled and the old value returned until it
# completes. If permissions_validity_in_ms is non-zero, then this must be
# also.
+# This setting is also used to inform the interval of auto-updating if
+# using permissions_cache_active_update.
# Defaults to the same value as permissions_validity_in_ms.
+# For a longer-running permissions cache, consider setting to update hourly
(3600000)
# permissions_update_interval_in_ms: 2000
+# If true, cache contents are actively updated by a background task at the
+# interval set by permissions_update_interval_in_ms. If false, cache entries
+# become eligible for refresh after their update interval. Upon next access,
+# an async reload is scheduled and the old value returned until it completes.
+# permissions_cache_active_update: false
+
# Validity period for credentials cache. This cache is tightly coupled to
# the provided PasswordAuthenticator implementation of IAuthenticator. If
# another IAuthenticator implementation is configured, this cache will not
@@ -203,6 +225,8 @@ permissions_validity_in_ms: 2000
# underlying table, it may not bring a significant reduction in the
# latency of individual authentication attempts.
# Defaults to 2000, set to 0 to disable credentials caching.
+# For a long-running cache using credentials_cache_active_update, consider
+# setting to something longer such as a daily validation: 86400000
credentials_validity_in_ms: 2000
# Refresh interval for credentials cache (if enabled).
@@ -210,9 +234,18 @@ credentials_validity_in_ms: 2000
# access, an async reload is scheduled and the old value returned until it
# completes. If credentials_validity_in_ms is non-zero, then this must be
# also.
+# This setting is also used to inform the interval of auto-updating if
+# using credentials_cache_active_update.
# Defaults to the same value as credentials_validity_in_ms.
+# For a longer-running credentials cache, consider setting to update hourly
(3600000)
# credentials_update_interval_in_ms: 2000
+# If true, cache contents are actively updated by a background task at the
+# interval set by credentials_update_interval_in_ms. If false (default), cache
entries
+# become eligible for refresh after their update interval. Upon next access,
+# an async reload is scheduled and the old value returned until it completes.
+# credentials_cache_active_update: false
+
# The partitioner is responsible for distributing groups of rows (by
# partition key) across nodes in the cluster. The partitioner can NOT be
# changed without reloading all data. If you are adding nodes or upgrading,
diff --git a/src/java/org/apache/cassandra/auth/AuthCache.java
b/src/java/org/apache/cassandra/auth/AuthCache.java
index e69cd6c..a70afba 100644
--- a/src/java/org/apache/cassandra/auth/AuthCache.java
+++ b/src/java/org/apache/cassandra/auth/AuthCache.java
@@ -20,9 +20,12 @@ package org.apache.cassandra.auth;
import java.util.HashSet;
import java.util.Set;
+import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.function.BiPredicate;
import java.util.function.BooleanSupplier;
+import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.IntConsumer;
import java.util.function.IntSupplier;
@@ -32,7 +35,9 @@ import org.slf4j.LoggerFactory;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
+import com.github.benmanes.caffeine.cache.Policy;
import org.apache.cassandra.concurrent.ExecutorPlus;
+import org.apache.cassandra.concurrent.ScheduledExecutors;
import org.apache.cassandra.concurrent.Shutdownable;
import org.apache.cassandra.utils.ExecutorUtils;
import org.apache.cassandra.utils.MBeanWrapper;
@@ -46,6 +51,8 @@ public class AuthCache<K, V> implements AuthCacheMBean,
Shutdownable
public static final String MBEAN_NAME_BASE =
"org.apache.cassandra.auth:type=";
+ private volatile ScheduledFuture cacheRefresher = null;
+
// Keep a handle on created instances so their executors can be terminated
cleanly
private static final Set<Shutdownable> REGISTRY = new HashSet<>(4);
@@ -60,15 +67,23 @@ public class AuthCache<K, V> implements AuthCacheMBean,
Shutdownable
protected volatile LoadingCache<K, V> cache;
private ExecutorPlus cacheRefreshExecutor;
- private String name;
- private IntConsumer setValidityDelegate;
- private IntSupplier getValidityDelegate;
- private IntConsumer setUpdateIntervalDelegate;
- private IntSupplier getUpdateIntervalDelegate;
- private IntConsumer setMaxEntriesDelegate;
- private IntSupplier getMaxEntriesDelegate;
- private Function<K, V> loadFunction;
- private BooleanSupplier enableCache;
+ private final String name;
+ private final IntConsumer setValidityDelegate;
+ private final IntSupplier getValidityDelegate;
+ private final IntConsumer setUpdateIntervalDelegate;
+ private final IntSupplier getUpdateIntervalDelegate;
+ private final IntConsumer setMaxEntriesDelegate;
+ private final IntSupplier getMaxEntriesDelegate;
+ private final Consumer<Boolean> setActiveUpdate;
+ private final BooleanSupplier getActiveUpdate;
+ private final Function<K, V> loadFunction;
+ private final BooleanSupplier enableCache;
+
+ // Determines whether the presence of a specific value should trigger the
invalidation of
+ // the supplied key. Used by CredentialsCache & CacheRefresher to identify
when the
+ // credentials for a role couldn't be loaded without throwing an exception
or serving stale
+ // values until the natural expiry time.
+ private final BiPredicate<K, V> invalidateCondition;
/**
* @param name Used for MBean
@@ -78,6 +93,8 @@ public class AuthCache<K, V> implements AuthCacheMBean,
Shutdownable
* @param getUpdateIntervalDelegate Getter for update interval
* @param setMaxEntriesDelegate Used to set max # entries in cache. See
{@link com.github.benmanes.caffeine.cache.Policy.Eviction#setMaximum(long)}
* @param getMaxEntriesDelegate Getter for max entries.
+ * @param setActiveUpdate Method to process config to actively update the
auth cache prior to configured cache expiration
+ * @param getActiveUpdate Getter for active update
* @param loadFunction Function to load the cache. Called on {@link
#get(Object)}
* @param cacheEnabledDelegate Used to determine if cache is enabled.
*/
@@ -88,9 +105,53 @@ public class AuthCache<K, V> implements AuthCacheMBean,
Shutdownable
IntSupplier getUpdateIntervalDelegate,
IntConsumer setMaxEntriesDelegate,
IntSupplier getMaxEntriesDelegate,
+ Consumer<Boolean> setActiveUpdate,
+ BooleanSupplier getActiveUpdate,
Function<K, V> loadFunction,
BooleanSupplier cacheEnabledDelegate)
{
+ this(name,
+ setValidityDelegate,
+ getValidityDelegate,
+ setUpdateIntervalDelegate,
+ getUpdateIntervalDelegate,
+ setMaxEntriesDelegate,
+ getMaxEntriesDelegate,
+ setActiveUpdate,
+ getActiveUpdate,
+ loadFunction,
+ cacheEnabledDelegate,
+ (k, v) -> false);
+ }
+
+ /**
+ * @param name Used for MBean
+ * @param setValidityDelegate Used to set cache validity period. See
{@link Policy#expireAfterWrite()}
+ * @param getValidityDelegate Getter for validity period
+ * @param setUpdateIntervalDelegate Used to set cache update interval. See
{@link Policy#refreshAfterWrite()}
+ * @param getUpdateIntervalDelegate Getter for update interval
+ * @param setMaxEntriesDelegate Used to set max # entries in cache. See
{@link com.github.benmanes.caffeine.cache.Policy.Eviction#setMaximum(long)}
+ * @param getMaxEntriesDelegate Getter for max entries.
+ * @param setActiveUpdate Actively update the cache before expiry
+ * @param getActiveUpdate Getter for active update
+ * @param loadFunction Function to load the cache. Called on {@link
#get(Object)}
+ * @param cacheEnabledDelegate Used to determine if cache is enabled.
+ * @param invalidationCondition Used during active updates to determine if
a refreshed value indicates a missing
+ * entry in the underlying table. If
satisfied, the key will be invalidated.
+ */
+ protected AuthCache(String name,
+ IntConsumer setValidityDelegate,
+ IntSupplier getValidityDelegate,
+ IntConsumer setUpdateIntervalDelegate,
+ IntSupplier getUpdateIntervalDelegate,
+ IntConsumer setMaxEntriesDelegate,
+ IntSupplier getMaxEntriesDelegate,
+ Consumer<Boolean> setActiveUpdate,
+ BooleanSupplier getActiveUpdate,
+ Function<K, V> loadFunction,
+ BooleanSupplier cacheEnabledDelegate,
+ BiPredicate<K, V> invalidationCondition)
+ {
this.name = checkNotNull(name);
this.setValidityDelegate = checkNotNull(setValidityDelegate);
this.getValidityDelegate = checkNotNull(getValidityDelegate);
@@ -98,8 +159,11 @@ public class AuthCache<K, V> implements AuthCacheMBean,
Shutdownable
this.getUpdateIntervalDelegate =
checkNotNull(getUpdateIntervalDelegate);
this.setMaxEntriesDelegate = checkNotNull(setMaxEntriesDelegate);
this.getMaxEntriesDelegate = checkNotNull(getMaxEntriesDelegate);
+ this.setActiveUpdate = checkNotNull(setActiveUpdate);
+ this.getActiveUpdate = checkNotNull(getActiveUpdate);
this.loadFunction = checkNotNull(loadFunction);
this.enableCache = checkNotNull(cacheEnabledDelegate);
+ this.invalidateCondition = checkNotNull(invalidationCondition);
init();
}
@@ -143,7 +207,7 @@ public class AuthCache<K, V> implements AuthCacheMBean,
Shutdownable
/**
* Invalidate the entire cache.
*/
- public void invalidate()
+ public synchronized void invalidate()
{
cache = initCache(null);
}
@@ -162,7 +226,7 @@ public class AuthCache<K, V> implements AuthCacheMBean,
Shutdownable
* Time in milliseconds that a value in the cache will expire after.
* @param validityPeriod in milliseconds
*/
- public void setValidity(int validityPeriod)
+ public synchronized void setValidity(int validityPeriod)
{
if
(Boolean.getBoolean("cassandra.disable_auth_caches_remote_configuration"))
throw new UnsupportedOperationException("Remote configuration of
auth caches is disabled");
@@ -180,7 +244,7 @@ public class AuthCache<K, V> implements AuthCacheMBean,
Shutdownable
* Time in milliseconds after which an entry in the cache should be
refreshed (it's load function called again)
* @param updateInterval in milliseconds
*/
- public void setUpdateInterval(int updateInterval)
+ public synchronized void setUpdateInterval(int updateInterval)
{
if
(Boolean.getBoolean("cassandra.disable_auth_caches_remote_configuration"))
throw new UnsupportedOperationException("Remote configuration of
auth caches is disabled");
@@ -198,7 +262,7 @@ public class AuthCache<K, V> implements AuthCacheMBean,
Shutdownable
* Set maximum number of entries in the cache.
* @param maxEntries
*/
- public void setMaxEntries(int maxEntries)
+ public synchronized void setMaxEntries(int maxEntries)
{
if
(Boolean.getBoolean("cassandra.disable_auth_caches_remote_configuration"))
throw new UnsupportedOperationException("Remote configuration of
auth caches is disabled");
@@ -212,6 +276,20 @@ public class AuthCache<K, V> implements AuthCacheMBean,
Shutdownable
return getMaxEntriesDelegate.getAsInt();
}
+ public boolean getActiveUpdate()
+ {
+ return getActiveUpdate.getAsBoolean();
+ }
+
+ public synchronized void setActiveUpdate(boolean update)
+ {
+ if
(Boolean.getBoolean("cassandra.disable_auth_caches_remote_configuration"))
+ throw new UnsupportedOperationException("Remote configuration of
auth caches is disabled");
+
+ setActiveUpdate.accept(update);
+ cache = initCache(cache);
+ }
+
/**
* (Re-)initialise the underlying cache. Will update validity, max
entries, and update interval if
* any have changed. The underlying {@link LoadingCache} will be initiated
based on the provided {@code loadFunction}.
@@ -227,26 +305,45 @@ public class AuthCache<K, V> implements AuthCacheMBean,
Shutdownable
if (getValidity() <= 0)
return null;
- logger.info("(Re)initializing {} (validity period/update interval/max
entries) ({}/{}/{})",
- name, getValidity(), getUpdateInterval(), getMaxEntries());
+ boolean activeUpdate = getActiveUpdate();
+ logger.info("(Re)initializing {} (validity period/update interval/max
entries/active update) ({}/{}/{}/{})",
+ name, getValidity(), getUpdateInterval(), getMaxEntries(),
activeUpdate);
+ LoadingCache<K, V> updatedCache;
+
+ if (existing == null)
+ {
+ updatedCache =
Caffeine.newBuilder().refreshAfterWrite(activeUpdate ? getValidity() :
getUpdateInterval(), TimeUnit.MILLISECONDS)
+ .expireAfterWrite(getValidity(),
TimeUnit.MILLISECONDS)
+ .maximumSize(getMaxEntries())
+ .executor(cacheRefreshExecutor)
+ .build(loadFunction::apply);
+ }
+ else
+ {
+ updatedCache = cache;
+ // Always set as mandatory
+ cache.policy().refreshAfterWrite().ifPresent(policy ->
+ policy.setExpiresAfter(activeUpdate ? getValidity() :
getUpdateInterval(), TimeUnit.MILLISECONDS));
+ cache.policy().expireAfterWrite().ifPresent(policy ->
policy.setExpiresAfter(getValidity(), TimeUnit.MILLISECONDS));
+ cache.policy().eviction().ifPresent(policy ->
policy.setMaximum(getMaxEntries()));
+ }
- if (existing == null) {
- return Caffeine.newBuilder()
- .refreshAfterWrite(getUpdateInterval(),
TimeUnit.MILLISECONDS)
- .expireAfterWrite(getValidity(),
TimeUnit.MILLISECONDS)
- .maximumSize(getMaxEntries())
- .executor(cacheRefreshExecutor)
- .build(loadFunction::apply);
+ if (cacheRefresher != null)
+ {
+ cacheRefresher.cancel(false); // permit the two refreshers to race
until the old one dies, should be harmless.
+ cacheRefresher = null;
}
- // Always set as mandatory
- cache.policy().refreshAfterWrite().ifPresent(policy ->
- policy.setExpiresAfter(getUpdateInterval(),
TimeUnit.MILLISECONDS));
- cache.policy().expireAfterWrite().ifPresent(policy ->
- policy.setExpiresAfter(getValidity(), TimeUnit.MILLISECONDS));
- cache.policy().eviction().ifPresent(policy ->
- policy.setMaximum(getMaxEntries()));
- return cache;
+ if (activeUpdate)
+ {
+ cacheRefresher =
ScheduledExecutors.optionalTasks.scheduleAtFixedRate(CacheRefresher.create(name,
+
updatedCache,
+
invalidateCondition),
+
getUpdateInterval(),
+
getUpdateInterval(),
+
TimeUnit.MILLISECONDS);
+ }
+ return updatedCache;
}
@Override
diff --git a/src/java/org/apache/cassandra/auth/AuthCacheMBean.java
b/src/java/org/apache/cassandra/auth/AuthCacheMBean.java
index 43fb88e..3f6247a 100644
--- a/src/java/org/apache/cassandra/auth/AuthCacheMBean.java
+++ b/src/java/org/apache/cassandra/auth/AuthCacheMBean.java
@@ -33,4 +33,8 @@ public interface AuthCacheMBean
public void setMaxEntries(int maxEntries);
public int getMaxEntries();
+
+ public boolean getActiveUpdate();
+
+ public void setActiveUpdate(boolean update);
}
diff --git a/src/java/org/apache/cassandra/auth/CacheRefresher.java
b/src/java/org/apache/cassandra/auth/CacheRefresher.java
new file mode 100644
index 0000000..a199601
--- /dev/null
+++ b/src/java/org/apache/cassandra/auth/CacheRefresher.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.auth;
+
+import java.util.Set;
+import java.util.function.BiPredicate;
+import java.util.function.BooleanSupplier;
+
+import com.github.benmanes.caffeine.cache.LoadingCache;
+import com.google.common.annotations.VisibleForTesting;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.service.StorageService;
+
+public class CacheRefresher<K, V> implements Runnable
+{
+ private static final Logger logger =
LoggerFactory.getLogger(CacheRefresher.class);
+
+ private final String name;
+ private final LoadingCache<K, V> cache;
+ private final BiPredicate<K, V> invalidationCondition;
+ private final BooleanSupplier skipCondition;
+
+ private CacheRefresher(String name, LoadingCache<K, V> cache,
BiPredicate<K, V> invalidationCondition, BooleanSupplier skipCondition)
+ {
+ this.name = name;
+ this.cache = cache;
+ this.invalidationCondition = invalidationCondition;
+ this.skipCondition = skipCondition;
+ }
+
+ public void run()
+ {
+ if (skipCondition.getAsBoolean())
+ {
+ logger.debug("Skipping {} cache refresh", name);
+ return;
+ }
+
+ try
+ {
+ logger.debug("Refreshing {} cache", name);
+ Set<K> ks = cache.asMap().keySet();
+ for (K key : ks)
+ {
+ cache.refresh(key);
+ V value = cache.getIfPresent(key);
+ if (invalidationCondition.test(key, value))
+ {
+ logger.debug("Invalidating key");
+ cache.invalidate(key);
+ }
+ }
+ }
+ catch (Exception e)
+ {
+ logger.error("Unexpected exception refreshing {} cache", name, e);
+ }
+ }
+
+ @VisibleForTesting
+ public static <K, V> CacheRefresher<K, V> create(String name,
+ LoadingCache<K, V> cache,
+ BiPredicate<K, V>
invalidationCondition,
+ BooleanSupplier
skipCondition)
+ {
+ logger.info("Creating CacheRefresher for {}", name);
+ return new CacheRefresher<>(name, cache, invalidationCondition,
skipCondition);
+ }
+
+ public static <K, V> CacheRefresher<K, V> create(String name,
LoadingCache<K, V> cache, BiPredicate<K, V> invalidationCondition)
+ {
+ // By default we skip cache refreshes if the node has been decommed
+ return create(name, cache, invalidationCondition,
StorageService.instance::isDecommissioned);
+ }
+}
\ No newline at end of file
diff --git a/src/java/org/apache/cassandra/auth/NetworkPermissionsCache.java
b/src/java/org/apache/cassandra/auth/NetworkPermissionsCache.java
index 72817a9..b2e8707 100644
--- a/src/java/org/apache/cassandra/auth/NetworkPermissionsCache.java
+++ b/src/java/org/apache/cassandra/auth/NetworkPermissionsCache.java
@@ -32,6 +32,8 @@ public class NetworkPermissionsCache extends
AuthCache<RoleResource, DCPermissio
DatabaseDescriptor::getRolesUpdateInterval,
DatabaseDescriptor::setRolesCacheMaxEntries,
DatabaseDescriptor::getRolesCacheMaxEntries,
+ DatabaseDescriptor::setRolesCacheActiveUpdate,
+ DatabaseDescriptor::getRolesCacheActiveUpdate,
authorizer::authorize,
() ->
DatabaseDescriptor.getAuthenticator().requireAuthentication());
diff --git a/src/java/org/apache/cassandra/auth/PasswordAuthenticator.java
b/src/java/org/apache/cassandra/auth/PasswordAuthenticator.java
index 098ed9f..ac1dbb9 100644
--- a/src/java/org/apache/cassandra/auth/PasswordAuthenticator.java
+++ b/src/java/org/apache/cassandra/auth/PasswordAuthenticator.java
@@ -60,6 +60,9 @@ public class PasswordAuthenticator implements IAuthenticator
{
private static final Logger logger =
LoggerFactory.getLogger(PasswordAuthenticator.class);
+ /** We intentionally use an empty string sentinel to allow object equality
comparison */
+ private static final String NO_SUCH_CREDENTIAL = "";
+
// name of the hash column.
private static final String SALTED_HASH = "salted_hash";
@@ -95,6 +98,27 @@ public class PasswordAuthenticator implements IAuthenticator
private AuthenticatedUser authenticate(String username, String password)
throws AuthenticationException
{
String hash = cache.get(username);
+
+ // intentional use of object equality
+ if (hash == NO_SUCH_CREDENTIAL)
+ {
+ // The cache was unable to load credentials via
queryHashedPassword, probably because the supplied
+ // rolename doesn't exist. If caching is enabled we will have now
cached the sentinel value for that key
+ // so we should invalidate it otherwise the cache will continue to
serve that until it expires which
+ // will be a problem if the role is added in the meantime.
+ //
+ // We can't just throw the AuthenticationException directly from
queryHashedPassword for a similar reason:
+ // if an existing role is dropped and active updates are enabled
for the cache, the refresh in
+ // CacheRefresher::run will log and swallow the exception and keep
serving the stale credentials until they
+ // eventually expire.
+ //
+ // So whenever we encounter the sentinel value, here and also in
CacheRefresher (if active updates are
+ // enabled), we manually expunge the key from the cache. If
caching is not enabled, AuthCache::invalidate
+ // is a safe no-op.
+ cache.invalidateCredentials(username);
+ throw new AuthenticationException(String.format("Provided username
%s and/or password are incorrect", username));
+ }
+
if (!checkpw(password, hash))
throw new AuthenticationException(String.format("Provided username
%s and/or password are incorrect", username));
@@ -111,14 +135,15 @@ public class PasswordAuthenticator implements
IAuthenticator
ResultMessage.Rows rows = select(authenticateStatement, options);
// If either a non-existent role name was supplied, or no
credentials
- // were found for that role we don't want to cache the result so
we throw
- // an exception.
+ // were found for that role, we don't want to cache the result so
we
+ // return a sentinel value. On receiving the sentinel, the caller
can
+ // invalidate the cache and throw an appropriate exception.
if (rows.result.isEmpty())
- throw new AuthenticationException(String.format("Provided
username %s and/or password are incorrect", username));
+ return NO_SUCH_CREDENTIAL;
UntypedResultSet result = UntypedResultSet.create(rows.result);
if (!result.one().has(SALTED_HASH))
- throw new AuthenticationException(String.format("Provided
username %s and/or password are incorrect", username));
+ return NO_SUCH_CREDENTIAL;
return result.one().getString(SALTED_HASH);
}
@@ -257,8 +282,12 @@ public class PasswordAuthenticator implements
IAuthenticator
DatabaseDescriptor::getCredentialsUpdateInterval,
DatabaseDescriptor::setCredentialsCacheMaxEntries,
DatabaseDescriptor::getCredentialsCacheMaxEntries,
+ DatabaseDescriptor::setCredentialsCacheActiveUpdate,
+ DatabaseDescriptor::getCredentialsCacheActiveUpdate,
authenticator::queryHashedPassword,
- () -> true);
+ () -> true,
+ (k,v) -> NO_SUCH_CREDENTIAL == v); // use a known object as
a sentinel value. CacheRefresher will
+ // invalidate the key if
the sentinel is loaded during a refresh
}
public void invalidateCredentials(String roleName)
diff --git a/src/java/org/apache/cassandra/auth/PermissionsCache.java
b/src/java/org/apache/cassandra/auth/PermissionsCache.java
index a649c35..fd1fce8 100644
--- a/src/java/org/apache/cassandra/auth/PermissionsCache.java
+++ b/src/java/org/apache/cassandra/auth/PermissionsCache.java
@@ -34,6 +34,8 @@ public class PermissionsCache extends
AuthCache<Pair<AuthenticatedUser, IResourc
DatabaseDescriptor::getPermissionsUpdateInterval,
DatabaseDescriptor::setPermissionsCacheMaxEntries,
DatabaseDescriptor::getPermissionsCacheMaxEntries,
+ DatabaseDescriptor::setPermissionsCacheActiveUpdate,
+ DatabaseDescriptor::getPermissionsCacheActiveUpdate,
(p) -> authorizer.authorize(p.left, p.right),
() -> DatabaseDescriptor.getAuthorizer().requireAuthorization());
}
diff --git a/src/java/org/apache/cassandra/auth/RolesCache.java
b/src/java/org/apache/cassandra/auth/RolesCache.java
index 62fecfb..05a5759 100644
--- a/src/java/org/apache/cassandra/auth/RolesCache.java
+++ b/src/java/org/apache/cassandra/auth/RolesCache.java
@@ -34,6 +34,8 @@ public class RolesCache extends AuthCache<RoleResource,
Set<Role>> implements Ro
DatabaseDescriptor::getRolesUpdateInterval,
DatabaseDescriptor::setRolesCacheMaxEntries,
DatabaseDescriptor::getRolesCacheMaxEntries,
+ DatabaseDescriptor::setRolesCacheActiveUpdate,
+ DatabaseDescriptor::getRolesCacheActiveUpdate,
roleManager::getRoleDetails,
enableCache);
}
diff --git a/src/java/org/apache/cassandra/auth/jmx/AuthorizationProxy.java
b/src/java/org/apache/cassandra/auth/jmx/AuthorizationProxy.java
index 9179062..77bd5c0 100644
--- a/src/java/org/apache/cassandra/auth/jmx/AuthorizationProxy.java
+++ b/src/java/org/apache/cassandra/auth/jmx/AuthorizationProxy.java
@@ -489,6 +489,8 @@ public class AuthorizationProxy implements InvocationHandler
DatabaseDescriptor::getPermissionsUpdateInterval,
DatabaseDescriptor::setPermissionsCacheMaxEntries,
DatabaseDescriptor::getPermissionsCacheMaxEntries,
+ DatabaseDescriptor::setPermissionsCacheActiveUpdate,
+ DatabaseDescriptor::getPermissionsCacheActiveUpdate,
AuthorizationProxy::loadPermissions,
() -> true);
diff --git a/src/java/org/apache/cassandra/config/Config.java
b/src/java/org/apache/cassandra/config/Config.java
index 0dc4180..5a1d048 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -59,12 +59,15 @@ public class Config
public volatile int permissions_validity_in_ms = 2000;
public volatile int permissions_cache_max_entries = 1000;
public volatile int permissions_update_interval_in_ms = -1;
+ public volatile boolean permissions_cache_active_update = false;
public volatile int roles_validity_in_ms = 2000;
public volatile int roles_cache_max_entries = 1000;
public volatile int roles_update_interval_in_ms = -1;
+ public volatile boolean roles_cache_active_update = false;
public volatile int credentials_validity_in_ms = 2000;
public volatile int credentials_cache_max_entries = 1000;
public volatile int credentials_update_interval_in_ms = -1;
+ public volatile boolean credentials_cache_active_update = false;
/* Hashing strategy Random or OPHF */
public String partitioner;
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index d246fc7..3ef1603 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -1286,6 +1286,16 @@ public class DatabaseDescriptor
return conf.permissions_cache_max_entries = maxEntries;
}
+ public static boolean getPermissionsCacheActiveUpdate()
+ {
+ return conf.permissions_cache_active_update;
+ }
+
+ public static void setPermissionsCacheActiveUpdate(boolean update)
+ {
+ conf.permissions_cache_active_update = update;
+ }
+
public static int getRolesValidity()
{
return conf.roles_validity_in_ms;
@@ -1303,6 +1313,16 @@ public class DatabaseDescriptor
: conf.roles_update_interval_in_ms;
}
+ public static void setRolesCacheActiveUpdate(boolean update)
+ {
+ conf.roles_cache_active_update = update;
+ }
+
+ public static boolean getRolesCacheActiveUpdate()
+ {
+ return conf.roles_cache_active_update;
+ }
+
public static void setRolesUpdateInterval(int interval)
{
conf.roles_update_interval_in_ms = interval;
@@ -1350,6 +1370,16 @@ public class DatabaseDescriptor
return conf.credentials_cache_max_entries = maxEntries;
}
+ public static boolean getCredentialsCacheActiveUpdate()
+ {
+ return conf.credentials_cache_active_update;
+ }
+
+ public static void setCredentialsCacheActiveUpdate(boolean update)
+ {
+ conf.credentials_cache_active_update = update;
+ }
+
public static int getMaxValueSize()
{
return conf.max_value_size_in_mb * 1024 * 1024;
diff --git a/src/java/org/apache/cassandra/service/StorageService.java
b/src/java/org/apache/cassandra/service/StorageService.java
index 5015f75..c1ad9cb 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -4974,6 +4974,11 @@ public class StorageService extends
NotificationBroadcasterSupport implements IE
return operationMode == Mode.NORMAL;
}
+ public boolean isDecommissioned()
+ {
+ return operationMode == Mode.DECOMMISSIONED;
+ }
+
public String getDrainProgress()
{
return String.format("Drained %s/%s ColumnFamilies", remainingCFs,
totalCFs);
diff --git a/test/unit/org/apache/cassandra/auth/AuthCacheTest.java
b/test/unit/org/apache/cassandra/auth/AuthCacheTest.java
index da97225..15e6b1f 100644
--- a/test/unit/org/apache/cassandra/auth/AuthCacheTest.java
+++ b/test/unit/org/apache/cassandra/auth/AuthCacheTest.java
@@ -258,6 +258,8 @@ public class AuthCacheTest
() -> 1000,
(maxEntries) -> {},
() -> 10,
+ (updateActiveUpdate) -> {},
+ () -> false,
loadFunction,
cacheEnabledDelegate);
}
diff --git a/test/unit/org/apache/cassandra/auth/CacheRefresherTest.java
b/test/unit/org/apache/cassandra/auth/CacheRefresherTest.java
new file mode 100644
index 0000000..3340d82
--- /dev/null
+++ b/test/unit/org/apache/cassandra/auth/CacheRefresherTest.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.auth;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.BooleanSupplier;
+
+import com.google.common.util.concurrent.MoreExecutors;
+
+import com.github.benmanes.caffeine.cache.CacheLoader;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.LoadingCache;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class CacheRefresherTest
+{
+ @Test
+ public void refresh() throws Exception
+ {
+ Map<String, String> src = new HashMap<>();
+ CacheLoader<String, String> loader = src::get;
+
+ // Supply the directExecutor so the refresh() call executes within the
refresher task like AuthCache (rather than async)
+ LoadingCache<String, String> cache = Caffeine.newBuilder()
+
.executor(MoreExecutors.directExecutor())
+ .build(loader);
+
+ AtomicBoolean skipRefresh = new AtomicBoolean(false);
+ BooleanSupplier skipCondition = skipRefresh::get;
+
+ CacheRefresher<String, String> refresher =
CacheRefresher.create("test", cache, (k, v) -> v.equals("removed"),
skipCondition);
+ src.put("some", "thing");
+ Assert.assertEquals("thing", cache.get("some"));
+
+ // Cache should still have old value...
+ src.put("some", "one");
+ Assert.assertEquals("thing", cache.get("some"));
+
+ // ... but refresher should update it
+ refresher.run();
+ Assert.assertEquals("one", cache.get("some"));
+
+ // If we just remove the value from the src, the cache should still
contain it
+ src.remove("some");
+ Assert.assertEquals("one", cache.get("some"));
+
+ // If we insert the special sentinel value into the src, the refresher
will invalidate it from the cache.
+ // This time when it's removed from the underlying storage, it's not
returned from the cache
+ src.put("some", "removed");
+ refresher.run();
+ src.remove("some");
+
+ // Remove from src
+ Assert.assertNull(cache.get("some"));
+
+ // If the skip condition returns true, don't refresh
+ src.put("some", "one");
+ Assert.assertEquals("one", cache.get("some"));
+ skipRefresh.set(true);
+ src.put("some", "body");
+ refresher.run();
+ Assert.assertEquals("one", cache.get("some"));
+
+ // Change the skip condition back to false and refresh
+ skipRefresh.set(false);
+ refresher.run();
+ Assert.assertEquals("body", cache.get("some"));
+ }
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]