This is an automated email from the ASF dual-hosted git repository. madhan pushed a commit to branch RANGER-3923 in repository https://gitbox.apache.org/repos/asf/ranger.git
commit 51ecff4e10ecfb1a958470ea4831de58aa7bb643 Author: Madhan Neethiraj <[email protected]> AuthorDate: Sat Sep 16 11:28:51 2023 -0700 RANGER-4302: caching of ServiceGdsInfo in Ranger admin using RangerCache implementation --- .../ranger/plugin/util/AutoClosableLock.java | 29 ++ .../org/apache/ranger/plugin/util/RangerCache.java | 370 ++++++++++++++++ .../apache/ranger/plugin/util/RangerCacheTest.java | 487 +++++++++++++++++++++ .../java/org/apache/ranger/biz/GdsDBStore.java | 145 +----- .../apache/ranger/common/ServiceGdsInfoCache.java | 290 ++++++++++++ .../ranger/util/RangerCacheDBValueLoader.java | 62 +++ 6 files changed, 1244 insertions(+), 139 deletions(-) diff --git a/agents-common/src/main/java/org/apache/ranger/plugin/util/AutoClosableLock.java b/agents-common/src/main/java/org/apache/ranger/plugin/util/AutoClosableLock.java index 270096a32..5082bc8c7 100644 --- a/agents-common/src/main/java/org/apache/ranger/plugin/util/AutoClosableLock.java +++ b/agents-common/src/main/java/org/apache/ranger/plugin/util/AutoClosableLock.java @@ -19,6 +19,7 @@ package org.apache.ranger.plugin.util; +import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; @@ -36,6 +37,34 @@ public class AutoClosableLock implements AutoCloseable { lock.unlock(); } + public static class AutoClosableTryLock implements AutoCloseable { + private final Lock lock; + private final boolean isLocked; + + public AutoClosableTryLock(Lock lock, long timeout, TimeUnit timeUnit) { + this.lock = lock; + + boolean isLocked = false; + + try { + isLocked = this.lock.tryLock(timeout, timeUnit); + } catch (InterruptedException excp) { + // ignored + } + + this.isLocked = isLocked; + } + + public boolean isLocked() { return isLocked; } + + @Override + public void close() { + if (isLocked) { + lock.unlock(); + } + } + } + public static class AutoClosableReadLock implements AutoCloseable { private final ReadWriteLock lock; diff --git a/agents-common/src/main/java/org/apache/ranger/plugin/util/RangerCache.java b/agents-common/src/main/java/org/apache/ranger/plugin/util/RangerCache.java new file mode 100644 index 000000000..da8725b2b --- /dev/null +++ b/agents-common/src/main/java/org/apache/ranger/plugin/util/RangerCache.java @@ -0,0 +1,370 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ranger.plugin.util; + +import com.sun.istack.NotNull; +import org.apache.ranger.plugin.util.AutoClosableLock.AutoClosableTryLock; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.ReentrantLock; + + +public class RangerCache<K, V> { + private static final Logger LOG = LoggerFactory.getLogger(RangerCache.class); + + public enum RefreshMode { ON_ACCESS, ON_SCHEDULE } // when to refresh the value: when a value is accessed? or on a scheduled interval? + + private static final AtomicInteger CACHE_NUMBER = new AtomicInteger(1); + private static final String CACHE_LOADER_THREAD_PREFIX = "ranger-cache-"; + private static final int DEFAULT_LOADER_THREADS_COUNT = 10; + private static final RefreshMode DEFAULT_REFRESH_MODE = RefreshMode.ON_ACCESS; + private static final int DEFAULT_VALUE_VALIDITY_PERIOD_MS = 30 * 1000; + private static final int DEFAULT_VALUE_INIT_TIMEOUT_MS = -1; // infinite timeout + private static final int DEFAULT_VALUE_REFRESH_TIMEOUT_MS = 10; + + private final String name; + private final Map<K, CachedValue> cache; + private ValueLoader<K, V> loader; // loader implementation that fetches the value for a given key from the source + private final int loaderThreadsCount; // number of threads to use for loading values into cache + private final RefreshMode refreshMode; // when to refresh a cached value: when a value is accessed? or on a scheduled interval? + private final long valueValidityPeriodMs; // minimum interval before a cached value is refreshed + private final long valueInitLoadTimeoutMs; // max time a caller would wait if cache doesn't have the value + private final long valueRefreshLoadTimeoutMs; // max time a caller would wait if cache already has the value, but needs refresh + private final ExecutorService loaderThreadPool; + + protected RangerCache(String name, ValueLoader<K, V> loader) { + this(name, loader, DEFAULT_LOADER_THREADS_COUNT, DEFAULT_REFRESH_MODE, DEFAULT_VALUE_VALIDITY_PERIOD_MS, DEFAULT_VALUE_INIT_TIMEOUT_MS, DEFAULT_VALUE_REFRESH_TIMEOUT_MS); + } + + protected RangerCache(String name, ValueLoader<K, V> loader, int loaderThreadsCount, RefreshMode refreshMode, long valueValidityPeriodMs, long valueInitLoadTimeoutMs, long valueRefreshLoadTimeoutMs) { + this.name = name; + this.cache = new ConcurrentHashMap<>(); + this.loader = loader; + this.loaderThreadsCount = loaderThreadsCount; + this.refreshMode = refreshMode; + this.valueValidityPeriodMs = valueValidityPeriodMs; + this.valueInitLoadTimeoutMs = valueInitLoadTimeoutMs; + this.valueRefreshLoadTimeoutMs = valueRefreshLoadTimeoutMs; + + if (this.refreshMode == RefreshMode.ON_SCHEDULE) { + this.loaderThreadPool = Executors.newScheduledThreadPool(loaderThreadsCount, createThreadFactory()); + } else { + this.loaderThreadPool = Executors.newFixedThreadPool(loaderThreadsCount, createThreadFactory()); + } + + LOG.info("Created RangerCache(name={}): loaderThreadsCount={}, refreshMode={}, valueValidityPeriodMs={}, valueInitLoadTimeoutMs={}, valueRefreshLoadTimeoutMs={}", name, loaderThreadsCount, refreshMode, valueValidityPeriodMs, valueInitLoadTimeoutMs, valueRefreshLoadTimeoutMs); + } + + protected void setLoader(ValueLoader<K, V> loader) { this.loader = loader; } + + public String getName() { return name; } + + public ValueLoader<K, V> getLoader() { return loader; } + + public int getLoaderThreadsCount() { return loaderThreadsCount; } + + public RefreshMode getRefreshMode() { return refreshMode; } + + public long getValueValidityPeriodMs() { return valueValidityPeriodMs; } + + public long getValueInitLoadTimeoutMs() { return valueInitLoadTimeoutMs; } + + public long getValueRefreshLoadTimeoutMs() { return valueRefreshLoadTimeoutMs; } + + public V get(K key) { + final long startTime = System.currentTimeMillis(); + final CachedValue value = cache.computeIfAbsent(key, f -> new CachedValue(key)); + final long timeoutMs = value.isInitialized() ? valueRefreshLoadTimeoutMs : valueInitLoadTimeoutMs; + final V ret; + + if (timeoutMs >= 0) { + final long timeTaken = System.currentTimeMillis() - startTime; + + if (timeoutMs <= timeTaken) { + ret = value.getCurrentValue(); + + if (LOG.isDebugEnabled()) { + LOG.debug("key={}: cache-lookup={}ms took longer than timeout={}ms. Using current value {}", key, timeTaken, timeoutMs, ret); + } + } else { + ret = value.getValue(timeoutMs - timeTaken); + } + } else { + ret = value.getValue(); + } + + return ret; + } + + public Set<K> getKeys() { + return new HashSet<>(cache.keySet()); + } + + public void addIfAbsent(K key) { + cache.computeIfAbsent(key, f -> new CachedValue(key)); + } + + public V remove(K key) { + CachedValue value = cache.remove(key); + final V ret; + + if (value != null) { + value.isRemoved = true; // so that the refresher thread doesn't schedule next refresh + + ret = value.getCurrentValue(); + } else { + ret = null; + } + + return ret; + } + + public boolean isLoaded(K key) { + CachedValue entry = cache.get(key); + RefreshableValue<V> value = entry != null ? entry.value : null; + + return value != null; + } + + public static class RefreshableValue<V> { + private final V value; + private long nextRefreshTimeMs = -1; + + public RefreshableValue(V value) { + this.value = value; + } + + public V getValue() { return value; } + + public boolean needsRefresh() { + return nextRefreshTimeMs == -1 || System.currentTimeMillis() > nextRefreshTimeMs; + } + + private void setNextRefreshTimeMs(long nextRefreshTimeMs) {this.nextRefreshTimeMs = nextRefreshTimeMs; } + } + + public static abstract class ValueLoader<K, V> { + public abstract RefreshableValue<V> load(K key, RefreshableValue<V> currentValue) throws Exception; + } + + private class CachedValue { + private final ReentrantLock lock = new ReentrantLock(); + private final K key; + private volatile boolean isRemoved = false; + private volatile RefreshableValue<V> value = null; + private volatile Future<?> refresher = null; + + private CachedValue(K key) { + if (LOG.isDebugEnabled()) { + LOG.debug("CachedValue({})", key); + } + + this.key = key; + } + + public K getKey() { return key; } + + public V getValue() { + refreshIfNeeded(); + + return getCurrentValue(); + } + + public V getValue(long timeoutMs) { + if (timeoutMs < 0) { + refreshIfNeeded(); + } else { + refreshIfNeeded(timeoutMs); + } + + return getCurrentValue(); + } + + public V getCurrentValue() { + RefreshableValue<V> value = this.value; + + return value != null ? value.getValue() : null; + } + + public boolean needsRefresh() { + return !isInitialized() || (refreshMode == RefreshMode.ON_ACCESS && value.needsRefresh()); + } + + public boolean isInitialized() { + RefreshableValue<V> value = this.value; + + return value != null; + } + + private void refreshIfNeeded() { + if (needsRefresh()) { + try (AutoClosableLock ignored = new AutoClosableLock(lock)) { + if (needsRefresh()) { + Future<?> future = this.refresher; + + if (future == null) { // refresh from current thread + if (LOG.isDebugEnabled()) { + LOG.debug("refreshIfNeeded(key={}): using caller thread", key); + } + + refreshValue(); + } else { // wait for the refresher to complete + try { + future.get(); + + this.refresher = null; + } catch (InterruptedException | ExecutionException excp) { + LOG.warn("refreshIfNeeded(key={}) failed", key, excp); + } + } + } + } + } + } + + private void refreshIfNeeded(long timeoutMs) { + if (needsRefresh()) { + long startTime = System.currentTimeMillis(); + + try (AutoClosableTryLock tryLock = new AutoClosableTryLock(lock, timeoutMs, TimeUnit.MILLISECONDS)) { + if (tryLock.isLocked()) { + if (needsRefresh()) { + Future<?> future = this.refresher; + + if (future == null) { + future = this.refresher = loaderThreadPool.submit(this::refreshValue); + + if (LOG.isDebugEnabled()) { + LOG.debug("refresher scheduled for key {}", key); + } + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("refresher already exists for key {}", key); + } + } + + long timeLeftMs = timeoutMs - (System.currentTimeMillis() - startTime); + + if (timeLeftMs > 0) { + try { + future.get(timeLeftMs, TimeUnit.MILLISECONDS); + + this.refresher = null; + } catch (TimeoutException | InterruptedException | ExecutionException excp) { + if (LOG.isDebugEnabled()) { + LOG.debug("refreshIfNeeded(key={}, timeoutMs={}) failed", key, timeoutMs, excp); + } + } + } + } + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("refreshIfNeeded(key={}, timeoutMs={}) couldn't obtain lock", key, timeoutMs); + } + } + } + } + } + + private Boolean refreshValue() { + long startTime = System.currentTimeMillis(); + boolean isSuccess = false; + RefreshableValue<V> newValue = null; + + try { + ValueLoader<K, V> loader = RangerCache.this.loader; + + if (loader != null) { + newValue = loader.load(key, value); + isSuccess = true; + } + } catch (KeyNotFoundException excp) { + LOG.debug("refreshValue(key={}) failed with KeyNotFoundException. Removing it", key, excp); + + remove(key); // remove the key from cache (so that next get() will try to load it again + } catch (Exception excp) { + LOG.warn("refreshValue(key={}) failed", key, excp); + + // retain the old value, update the loadTime + newValue = value; + } finally { + if (LOG.isDebugEnabled()) { + LOG.debug("refresher {} for key {}, timeTaken={}", (isSuccess ? "completed" : "failed"), key, (System.currentTimeMillis() - startTime)); + } + + setValue(newValue); + + if (refreshMode == RefreshMode.ON_SCHEDULE) { + if (!isRemoved) { + ScheduledExecutorService scheduledExecutor = ((ScheduledExecutorService) loaderThreadPool); + + scheduledExecutor.schedule(this::refreshValue, valueValidityPeriodMs, TimeUnit.MILLISECONDS); + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("key {} was removed. Not scheduling next refresh ", key); + } + } + } + } + + return Boolean.TRUE; + } + + private void setValue(RefreshableValue<V> value) { + if (value != null) { + this.value = value; + + this.value.setNextRefreshTimeMs(System.currentTimeMillis() + valueValidityPeriodMs); + } + } + } + + private ThreadFactory createThreadFactory() { + return new ThreadFactory() { + private final String namePrefix = CACHE_LOADER_THREAD_PREFIX + CACHE_NUMBER.getAndIncrement() + "-" + name; + private final AtomicInteger number = new AtomicInteger(1); + + @Override + public Thread newThread(@NotNull Runnable r) { + Thread t = new Thread(r, namePrefix + number.getAndIncrement()); + + if (!t.isDaemon()) { + t.setDaemon(true); + } + + if (t.getPriority() != Thread.NORM_PRIORITY) { + t.setPriority(Thread.NORM_PRIORITY); + } + + return t; + } + }; + } + + public static class KeyNotFoundException extends Exception { + public KeyNotFoundException(String msg) { + super(msg); + } + } +} diff --git a/agents-common/src/test/java/org/apache/ranger/plugin/util/RangerCacheTest.java b/agents-common/src/test/java/org/apache/ranger/plugin/util/RangerCacheTest.java new file mode 100644 index 000000000..8b894962e --- /dev/null +++ b/agents-common/src/test/java/org/apache/ranger/plugin/util/RangerCacheTest.java @@ -0,0 +1,487 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ranger.plugin.util; + +import org.apache.ranger.plugin.util.RangerCache.RefreshMode; +import org.apache.ranger.plugin.util.RangerCache.RefreshableValue; +import org.junit.Test; + +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicLong; + +import static org.junit.Assert.*; + +public class RangerCacheTest { + private static final int CACHE_THREAD_COUNT = 25; + private static final int VALUE_VALIDITY_DURATION_MS = 3 * 1000; + private static final int VALUE_REFRESH_TIMEOUT_MS = 10; + private static final int VALUE_INIT_TIMEOUT_MS = -1; // no timeout for init + private static final int VALUE_LOAD_TIME_TYPICAL_MAX_MS = 8; + private static final int VALUE_LOAD_TIME_FAIL_MAX_MS = 100; + private static final int VALUE_LOAD_TIME_LONG_MIN_MS = VALUE_VALIDITY_DURATION_MS / 2; + private static final int VALUE_LOAD_TIME_VERY_LONG_MIN_MS = VALUE_VALIDITY_DURATION_MS; + + private static final int USER_COUNT = 100; + private static final int CACHE_CLIENT_THREAD_COUNT = 20; + private static final int CACHE_LOOKUP_COUNT = 200; + private static final int CACHE_LOOKUP_INTERVAL_MS = 5; + + private static final boolean IS_DEBUG_ENABLED = false; + + /* + * Test cases: + * 1. successful initial load and refresh + * 2. failure in initial load + * 3. failure in refresh + * 4. long initial load - just above half the value validity period + * 5. long refresh - just above half the value validity period + * 6. very long initial load - above the value validity period + * 7. very long refresh - above the value validity period + */ + private static final String USERNAME_PREFIX_TYPICAL_LOAD = "typical_"; + private static final String USERNAME_PREFIX_FAILED_FIRST_INIT = "failedFirstInit_"; + private static final String USERNAME_PREFIX_FAILED_INIT = "failedInit_"; + private static final String USERNAME_PREFIX_FAILED_REFRESH = "failedRefresh_"; + private static final String USERNAME_PREFIX_REMOVED = "removed_"; + private static final String USERNAME_PREFIX_LONG_INIT = "longInit_"; + private static final String USERNAME_PREFIX_LONG_REFRESH = "longRefresh_"; + private static final String USERNAME_PREFIX_VERY_LONG_INIT = "veryLongInit_"; + private static final String USERNAME_PREFIX_VERY_LONG_REFRESH = "veryLongRefresh_"; + + private final Random random = new Random(); + + + @Test + public void testOnAccessRefreshCacheMultiThreadedGet() throws Throwable { + UserGroupCache cache = createCache(RefreshMode.ON_ACCESS); + + runMultiThreadedGet("testOnAccessRefreshCacheMultiThreadedGet", cache); + } + + @Test + public void testOnScheduleRefreshCacheMultiThreadedGet() throws Throwable { + UserGroupCache cache = createCache(RefreshMode.ON_SCHEDULE); + + runMultiThreadedGet("testOnScheduleRefreshCacheMultiThreadedGet", cache); + } + + @Test + public void testOnScheduleRefreshCacheRemoveKey() throws Exception { + UserGroupCache cache = createCache(RefreshMode.ON_SCHEDULE); + ThreadPoolExecutor executor = new ThreadPoolExecutor(CACHE_CLIENT_THREAD_COUNT, CACHE_CLIENT_THREAD_COUNT, 0, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>()); + List<Future<?>> futures = new ArrayList<>(); + + long startTimeMs = System.currentTimeMillis(); + + // submit tasks to access cache from multiple threads + for (String user : cache.stats.keySet()) { + Future<?> future = executor.submit(new GetGroupsForUserFromCache(cache, user, 0)); + + futures.add(future); + } + + log(String.format("waiting for %s submitted tasks to complete", futures.size())); + for (Future<?> future : futures) { + future.get(); + } + + log(String.format("all submitted tasks completed: timeTaken=%sms", (System.currentTimeMillis() - startTimeMs))); + + executor.shutdown(); + + for (String user : cache.stats.keySet()) { + cache.remove(user); + } + + assertEquals("cache should have no users", 0, cache.getKeys().size()); + + log(String.format("all entries in the cache are now removed: timeTaken=%sms", (System.currentTimeMillis() - startTimeMs))); + } + + private void runMultiThreadedGet(String testName, UserGroupCache cache) throws Throwable { + ThreadPoolExecutor executor = new ThreadPoolExecutor(CACHE_CLIENT_THREAD_COUNT, CACHE_CLIENT_THREAD_COUNT, 0, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>()); + List<Future<?>> futures = new ArrayList<>(); + + long startTimeMs = System.currentTimeMillis(); + + for (int i = 0; i < CACHE_LOOKUP_COUNT; i++) { + for (String user : cache.stats.keySet()) { + Future<?> future = executor.submit(new GetGroupsForUserFromCache(cache, user, i)); + + futures.add(future); + } + } + + log(String.format("waiting for %s submitted tasks to complete", futures.size())); + for (Future<?> future : futures) { + future.get(); + } + + executor.shutdown(); + + printStats(testName, System.currentTimeMillis() - startTimeMs, cache); + } + + private UserGroupCache createCache(RefreshMode refreshMode) { + UserGroupCache ret = new UserGroupCache("ug", CACHE_THREAD_COUNT, refreshMode, VALUE_VALIDITY_DURATION_MS, VALUE_INIT_TIMEOUT_MS, VALUE_REFRESH_TIMEOUT_MS); + + // initialize cache with users + for (int i = 0; i < USER_COUNT; i++) { + ret.addUserStats(getUserName(i)); + } + + // prime the cache with empty entry for each user, to avoid the test from making excessive concurrent calls to enter into cache + for (String user : ret.stats.keySet()) { + ret.addIfAbsent(user); + } + + return ret; + } + + private String getUserName(int index) { + int percent = (index % USER_COUNT) * 100 / USER_COUNT; + final String ret; + + if (percent < 88) { + ret = USERNAME_PREFIX_TYPICAL_LOAD; + } else if (percent < 89) { + ret = USERNAME_PREFIX_FAILED_FIRST_INIT; + } else if (percent < 90) { + ret = USERNAME_PREFIX_FAILED_INIT; + } else if (percent < 91) { + ret = USERNAME_PREFIX_FAILED_REFRESH; + } else if (percent < 92) { + ret = USERNAME_PREFIX_REMOVED; + } else if (percent < 94) { + ret = USERNAME_PREFIX_LONG_INIT; + } else if (percent < 96) { + ret = USERNAME_PREFIX_LONG_REFRESH; + } else if (percent < 98) { + ret = USERNAME_PREFIX_VERY_LONG_INIT; + } else { + ret = USERNAME_PREFIX_VERY_LONG_REFRESH; + } + + return String.format("%s%04d", ret, index); + } + + private void log(String msg) { + System.out.println(new Date() + " [" + Thread.currentThread().getName() + "] " + msg); + } + + private void testLoadWait(UserGroupCache.UserStats userStats, RefreshableValue<List<String>> currVal) throws Exception { + boolean fail = false; + long sleepTimeMs; + String userName = userStats.userName; + + if (currVal == null) { // initial load + if (userName.startsWith(USERNAME_PREFIX_LONG_INIT)) { + sleepTimeMs = VALUE_LOAD_TIME_LONG_MIN_MS + random.nextInt(VALUE_LOAD_TIME_TYPICAL_MAX_MS); + } else if (userName.startsWith(USERNAME_PREFIX_VERY_LONG_INIT)) { + sleepTimeMs = VALUE_LOAD_TIME_VERY_LONG_MIN_MS + random.nextInt(VALUE_LOAD_TIME_TYPICAL_MAX_MS); + } else if (userName.startsWith(USERNAME_PREFIX_FAILED_FIRST_INIT)) { + sleepTimeMs = random.nextInt(VALUE_LOAD_TIME_FAIL_MAX_MS); + + fail = userStats.load.count.get() == 0; + } else if (userName.startsWith(USERNAME_PREFIX_FAILED_INIT)) { + sleepTimeMs = random.nextInt(VALUE_LOAD_TIME_FAIL_MAX_MS); + + fail = true; + } else { + sleepTimeMs = random.nextInt(VALUE_LOAD_TIME_TYPICAL_MAX_MS); + } + } else { // refresh + if (userName.startsWith(USERNAME_PREFIX_LONG_REFRESH)) { + sleepTimeMs = VALUE_LOAD_TIME_LONG_MIN_MS + random.nextInt(VALUE_LOAD_TIME_TYPICAL_MAX_MS); + } else if (userName.startsWith(USERNAME_PREFIX_VERY_LONG_REFRESH)) { + sleepTimeMs = VALUE_LOAD_TIME_VERY_LONG_MIN_MS + random.nextInt(VALUE_LOAD_TIME_TYPICAL_MAX_MS); + } else if (userName.startsWith(USERNAME_PREFIX_FAILED_REFRESH)) { + sleepTimeMs = random.nextInt(VALUE_LOAD_TIME_FAIL_MAX_MS); + + fail = true; + } else if (userName.startsWith(USERNAME_PREFIX_REMOVED)) { + sleepTimeMs = random.nextInt(VALUE_LOAD_TIME_TYPICAL_MAX_MS); + + fail = true; + } else { + sleepTimeMs = random.nextInt(VALUE_LOAD_TIME_TYPICAL_MAX_MS); + } + } + + sleep(sleepTimeMs); + + if (fail) { + if (userName.startsWith(USERNAME_PREFIX_REMOVED)) { + throw new RangerCache.KeyNotFoundException(userName + ": user not found"); + } else { + throw new Exception("failed to retrieve value"); + } + } + } + + private void sleep(long timeoutMs) { + try { + Thread.sleep(timeoutMs); + } catch (InterruptedException excp) { + // ignore + } + } + + /* Sample output: + * + testOnAccessRefreshCacheMultiThreadedGet(): timeTaken=7489ms + cache: loaderThreads=25, refreshMode=ON_ACCESS, valueValidityMs=3000, valueInitTimeoutMs=-1, valueRefreshTimeoutMs=10 + test: cacheKeyCount=100, cacheClientThreads=20, lookupCount=200, lookupIntervalMs=5 + userPrefix=failedFirstInit_ userCount=1 loadCount=4 getCount=200 avgLoadTime=19.750 avgGetTime=0.520 + userPrefix=failedInit_ userCount=1 loadCount=99 getCount=99 avgLoadTime=57.283 avgGetTime=57.566 + userPrefix=failedRefresh_ userCount=1 loadCount=3 getCount=200 avgLoadTime=21.667 avgGetTime=0.165 + userPrefix=longInit_ userCount=2 loadCount=4 getCount=322 avgLoadTime=756.000 avgGetTime=9.416 + userPrefix=longRefresh_ userCount=2 loadCount=4 getCount=400 avgLoadTime=756.500 avgGetTime=2.885 + userPrefix=removed_ userCount=1 loadCount=5 getCount=200 avgLoadTime=5.400 avgGetTime=0.205 + userPrefix=typical_ userCount=88 loadCount=264 getCount=17600 avgLoadTime=4.405 avgGetTime=0.147 + userPrefix=veryLongInit_ userCount=2 loadCount=4 getCount=236 avgLoadTime=1507.250 avgGetTime=25.691 + userPrefix=veryLongRefresh_ userCount=2 loadCount=4 getCount=400 avgLoadTime=1506.500 avgGetTime=5.260 + + ****** Detailed stats for each user ****** + failedFirstInit_0088: lastValue([group-1, group-2, group-3]), load(count: 4, totalTime: 79, minTime: 0, maxTime: 65, avgTime: 19.750), get(count: 200, totalTime: 104, minTime: 0, maxTime: 65, avgTime: 0.520) + failedInit_0089: lastValue(null), load(count: 99, totalTime: 5671, minTime: 0, maxTime: 110, avgTime: 57.283), get(count: 99, totalTime: 5699, minTime: 0, maxTime: 110, avgTime: 57.566) + failedRefresh_0090: lastValue([group-1]), load(count: 3, totalTime: 65, minTime: 6, maxTime: 33, avgTime: 21.667), get(count: 200, totalTime: 33, minTime: 0, maxTime: 12, avgTime: 0.165) + longInit_0092: lastValue([group-1, group-2]), load(count: 2, totalTime: 1513, minTime: 3, maxTime: 1510, avgTime: 756.500), get(count: 161, totalTime: 1513, minTime: 0, maxTime: 1510, avgTime: 9.398) + longInit_0093: lastValue([group-1, group-2]), load(count: 2, totalTime: 1511, minTime: 3, maxTime: 1508, avgTime: 755.500), get(count: 161, totalTime: 1519, minTime: 0, maxTime: 1508, avgTime: 9.435) + longRefresh_0094: lastValue([group-1, group-2]), load(count: 2, totalTime: 1513, minTime: 3, maxTime: 1510, avgTime: 756.500), get(count: 200, totalTime: 585, minTime: 0, maxTime: 39, avgTime: 2.925) + longRefresh_0095: lastValue([group-1, group-2]), load(count: 2, totalTime: 1513, minTime: 4, maxTime: 1509, avgTime: 756.500), get(count: 200, totalTime: 569, minTime: 0, maxTime: 38, avgTime: 2.845) + removed_0091: lastValue([group-1]), load(count: 5, totalTime: 27, minTime: 3, maxTime: 8, avgTime: 5.400), get(count: 200, totalTime: 41, minTime: 0, maxTime: 9, avgTime: 0.205) + typical_0000: lastValue([group-1, group-2, group-3]), load(count: 3, totalTime: 17, minTime: 2, maxTime: 8, avgTime: 5.667), get(count: 200, totalTime: 19, minTime: 0, maxTime: 10, avgTime: 0.095) + ... + typical_0087: lastValue([group-1, group-2, group-3]), load(count: 3, totalTime: 10, minTime: 1, maxTime: 6, avgTime: 3.333), get(count: 200, totalTime: 13, minTime: 0, maxTime: 7, avgTime: 0.065) + veryLongInit_0096: lastValue([group-1, group-2]), load(count: 2, totalTime: 3011, minTime: 5, maxTime: 3006, avgTime: 1505.500), get(count: 118, totalTime: 3032, minTime: 0, maxTime: 3015, avgTime: 25.695) + veryLongInit_0097: lastValue([group-1, group-2]), load(count: 2, totalTime: 3018, minTime: 13, maxTime: 3005, avgTime: 1509.000), get(count: 118, totalTime: 3031, minTime: 0, maxTime: 3014, avgTime: 25.686) + veryLongRefresh_0098: lastValue([group-1, group-2]), load(count: 2, totalTime: 3012, minTime: 6, maxTime: 3006, avgTime: 1506.000), get(count: 200, totalTime: 1050, minTime: 0, maxTime: 21, avgTime: 5.250) + veryLongRefresh_0099: lastValue([group-1, group-2]), load(count: 2, totalTime: 3014, minTime: 0, maxTime: 3014, avgTime: 1507.000), get(count: 200, totalTime: 1054, minTime: 0, maxTime: 19, avgTime: 5.270) + * + */ + private void printStats(String testName, long timeTakenMs, UserGroupCache cache) { + log(String.format("%s(): timeTaken=%sms", testName, timeTakenMs)); + log(String.format(" cache: loaderThreads=%s, refreshMode=%s, valueValidityMs=%s, valueInitTimeoutMs=%s, valueRefreshTimeoutMs=%s", cache.getLoaderThreadsCount(), cache.getRefreshMode(), cache.getValueValidityPeriodMs(), cache.getValueInitLoadTimeoutMs(), cache.getValueRefreshLoadTimeoutMs())); + log(String.format(" test: cacheKeyCount=%s, cacheClientThreads=%s, lookupCount=%s, lookupIntervalMs=%s", cache.stats.size(), CACHE_CLIENT_THREAD_COUNT, CACHE_LOOKUP_COUNT, CACHE_LOOKUP_INTERVAL_MS)); + + printStats(cache.stats, USERNAME_PREFIX_FAILED_FIRST_INIT); + printStats(cache.stats, USERNAME_PREFIX_FAILED_INIT); + printStats(cache.stats, USERNAME_PREFIX_FAILED_REFRESH); + printStats(cache.stats, USERNAME_PREFIX_LONG_INIT); + printStats(cache.stats, USERNAME_PREFIX_LONG_REFRESH); + printStats(cache.stats, USERNAME_PREFIX_REMOVED); + printStats(cache.stats, USERNAME_PREFIX_TYPICAL_LOAD); + printStats(cache.stats, USERNAME_PREFIX_VERY_LONG_INIT); + printStats(cache.stats, USERNAME_PREFIX_VERY_LONG_REFRESH); + + log(""); + log("****** Detailed stats for each user ******"); + + // print stats for all users, in a predictable order + List<String> userNames = new ArrayList<>(cache.stats.keySet()); + + Collections.sort(userNames); + + for (String userName : userNames) { + log(String.format("%s", cache.stats.get(userName))); + } + } + + private void printStats(Map<String, UserGroupCache.UserStats> stats, String userNamePrefix) { + long userCount = 0, loadCount = 0, getCount = 0, totalLoadTime = 0, totalGetTime = 0; + + for (Map.Entry<String, UserGroupCache.UserStats> entry : stats.entrySet()) { + String userName = entry.getKey(); + + if (!userName.startsWith(userNamePrefix)) { + continue; + } + + UserGroupCache.UserStats userStats = entry.getValue(); + + userCount++; + loadCount += userStats.load.count.get(); + getCount += userStats.get.count.get(); + totalLoadTime += userStats.load.totalTime.get(); + totalGetTime += userStats.get.totalTime.get(); + } + + log(String.format(" userPrefix=%-16s userCount=%-4s loadCount=%-5s getCount=%-7s avgLoadTime=%-9.3f avgGetTime=%-6.3f", userNamePrefix, userCount, loadCount, getCount, (totalLoadTime / (float)loadCount), (totalGetTime / (float)getCount))); + } + + // multiple instances of this class are used by the test to simulate simultaneous access to cache to obtain groups for users + private class GetGroupsForUserFromCache implements Runnable { + private final UserGroupCache cache; + private final String userName; + private final int lookupCount; + + public GetGroupsForUserFromCache(UserGroupCache cache, String userName, int lookupCount) { + this.cache = cache; + this.userName = userName; + this.lookupCount = lookupCount; + } + + @Override + public void run() { + UserGroupCache.UserStats userStats = cache.getUserStats(userName); + + // test threads can be blocked by values that take a long time to initialize + // avoid this by restricting such values to have only one pending call until they are initialized + if (!cache.isLoaded(userName) && userStats.inProgressCount.get() > 0) { + if (userName.startsWith(USERNAME_PREFIX_FAILED_INIT) || userName.startsWith(USERNAME_PREFIX_LONG_INIT) || userName.startsWith(USERNAME_PREFIX_VERY_LONG_INIT)) { + if (IS_DEBUG_ENABLED) { + log(String.format("[%s] [lookupCount=%s] get(%s): aborted, as initial loading is already in progress for this user", Thread.currentThread().getName(), lookupCount, userName)); + } + + return; + } + } + + userStats.inProgressCount.getAndIncrement(); + + long startTime = System.currentTimeMillis(); + List<String> userGroups = cache.get(userName); + long timeTaken = System.currentTimeMillis() - startTime; + + userStats.inProgressCount.getAndDecrement(); + + userStats.get.record(timeTaken); + + if (userName.startsWith(USERNAME_PREFIX_FAILED_INIT)) { + assertNull("userGroups should be null for user=" + userName + ", lookupCount=" + lookupCount, userGroups); + } else if (userName.startsWith(USERNAME_PREFIX_FAILED_FIRST_INIT)) { + if (lookupCount == 0) { + assertNull("userGroups should be null after first lookup for user=" + userName + ", lookupCount=" + lookupCount, userGroups); + } else { + assertNotNull("userGroups should be null only after first lookup for user=" + userName + ", lookupCount=" + lookupCount, userGroups); + } + } else { + assertNotNull("userGroups should not be null for user=" + userName + ", lookupCount=" + lookupCount, userGroups); + } + + userStats.lastValue = userGroups; + + if (IS_DEBUG_ENABLED) { + log(String.format("[%s] [lookupCount=%s] get(%s): timeTaken=%s, userGroups=%s", Thread.currentThread().getName(), lookupCount, userName, timeTaken, userGroups)); + } + + sleep(CACHE_LOOKUP_INTERVAL_MS); + } + } + + private class UserGroupCache extends RangerCache<String, List<String>> { + private final Map<String, UserStats> stats = new HashMap<>(); + + public UserGroupCache(String name, int loaderThreadsCount, RefreshMode refreshMode, long valueValidityPeriodMs, long valueInitLoadTimeoutMs, long valueRefreshLoadTimeoutMs) { + super(name, null, loaderThreadsCount, refreshMode, valueValidityPeriodMs, valueInitLoadTimeoutMs, valueRefreshLoadTimeoutMs); + + setLoader(new UserGroupLoader()); + } + + public void addUserStats(String userName) { + stats.put(userName, new UserStats(userName)); + } + + public UserStats getUserStats(String userName) { + return stats.get(userName); + } + + // + // this class implements value-loader interface used by the cache to populate and refresh the cache + // load() method simulates loading of groups for the given user + // + private class UserGroupLoader extends ValueLoader<String, List<String>> { + public UserGroupLoader() { + } + + @Override + public RefreshableValue<List<String>> load(String userName, RefreshableValue<List<String>> currVal) throws Exception { + long startTimeMs = System.currentTimeMillis(); + + UserStats userStats = stats.get(userName); + + try { + testLoadWait(userStats, currVal); // simulate various load conditions, depending on the userName + + // simply append 'group-#' to current value, where # is the number of groups including this one + final List<String> value = currVal != null && currVal.getValue() != null ? new ArrayList<>(currVal.getValue()) : new ArrayList<>(); + + value.add("group-" + (value.size() + 1)); + + return new RefreshableValue<>(value); + } finally { + userStats.load.record(System.currentTimeMillis() - startTimeMs); + } + } + } + + private class UserStats { + final String userName; + final TimedCounter get = new TimedCounter(); + final TimedCounter load = new TimedCounter(); + final AtomicLong inProgressCount = new AtomicLong(); + List<String> lastValue; + + public UserStats(String userName) { + this.userName = userName; + } + + @Override + public String toString() { + return userName + ": lastValue(" + lastValue + "), load(" + load + "), get(" + get + ")"; + } + } + } + + private static class TimedCounter { + final AtomicLong count = new AtomicLong(); + final AtomicLong totalTime = new AtomicLong(); + final AtomicLong minTime = new AtomicLong(Long.MAX_VALUE); + final AtomicLong maxTime = new AtomicLong(); + + public void record(long timeTaken) { + count.getAndIncrement(); + totalTime.addAndGet(timeTaken); + + long minTimeTaken = minTime.get(); + long maxTimeTaken = maxTime.get(); + + if (timeTaken < minTimeTaken) { + minTime.compareAndSet(minTimeTaken, timeTaken); + } + + if (timeTaken > maxTimeTaken) { + maxTime.compareAndSet(maxTimeTaken, timeTaken); + } + } + + @Override + public String toString() { + return "count: " + count.get() + ", totalTime: " + totalTime.get() + ", minTime: " + minTime.get() + ", maxTime: " + maxTime.get() + ", avgTime: " + (getAvgTimeMs()); + } + + private String getAvgTimeMs() { + long totalTime = this.totalTime.get(); + long count = this.count.get(); + + return String.format("%.3f", (count != 0 ? (totalTime / (double)count) : -1)); + } + } +} diff --git a/security-admin/src/main/java/org/apache/ranger/biz/GdsDBStore.java b/security-admin/src/main/java/org/apache/ranger/biz/GdsDBStore.java index 589fcdd68..30f2810c8 100755 --- a/security-admin/src/main/java/org/apache/ranger/biz/GdsDBStore.java +++ b/security-admin/src/main/java/org/apache/ranger/biz/GdsDBStore.java @@ -36,7 +36,6 @@ import org.apache.ranger.plugin.store.AbstractGdsStore; import org.apache.ranger.plugin.store.PList; import org.apache.ranger.plugin.store.ServiceStore; import org.apache.ranger.plugin.util.*; -import org.apache.ranger.plugin.util.ServiceGdsInfo.*; import org.apache.ranger.service.*; import org.apache.ranger.validation.RangerGdsValidator; import org.apache.ranger.view.RangerGdsVList.*; @@ -46,7 +45,6 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; -import javax.servlet.http.HttpServletResponse; import java.util.*; @@ -107,6 +105,9 @@ public class GdsDBStore extends AbstractGdsStore { @Autowired RESTErrorUtil restErrorUtil; + @Autowired + ServiceGdsInfoCache serviceGdsInfoCache; + @PostConstruct public void initStore() { if (LOG.isDebugEnabled()) { @@ -1186,28 +1187,9 @@ public class GdsDBStore extends AbstractGdsStore { public ServiceGdsInfo getGdsInfoIfUpdated(String serviceName, Long lastKnownVersion) throws Exception { LOG.debug("==> GdsDBStore.getGdsInfoIfUpdated({}, {})", serviceName , lastKnownVersion); - Long serviceId = daoMgr.getXXService().findIdByName(serviceName); - - if (serviceId == null) { - LOG.error("Requested Service not found. serviceName={}", serviceName); - - throw restErrorUtil.createRESTException(HttpServletResponse.SC_NOT_FOUND, RangerServiceNotFoundException.buildExceptionMsg(serviceName), false); - } - - ServiceGdsInfo ret = null; - XXServiceVersionInfo serviceVersionInfo = daoMgr.getXXServiceVersionInfo().findByServiceId(serviceId); - Long currentGdsVersion = serviceVersionInfo != null ? serviceVersionInfo.getGdsVersion() : null; - - if (currentGdsVersion == null || !currentGdsVersion.equals(lastKnownVersion)) { - ret = retrieveServiceGdsInfo(serviceId, serviceName); - - Date lastUpdateTime = serviceVersionInfo != null ? serviceVersionInfo.getGdsUpdateTime() : null; - - ret.setGdsLastUpdateTime(lastUpdateTime != null ? lastUpdateTime.getTime() : null); - ret.setGdsVersion(currentGdsVersion); - } else { - LOG.debug("No change in gdsVersionInfo: serviceName={}, lastKnownVersion={}", serviceName, lastKnownVersion); - } + ServiceGdsInfo latest = serviceGdsInfoCache.get(serviceName); + Long latestVersion = latest != null ? latest.getGdsVersion() : null; + ServiceGdsInfo ret = (lastKnownVersion == null || lastKnownVersion == -1 || !lastKnownVersion.equals(latestVersion)) ? latest : null; LOG.debug("<== GdsDBStore.getGdsInfoIfUpdated({}, {}): ret={}", serviceName, lastKnownVersion, ret); @@ -1915,119 +1897,4 @@ public class GdsDBStore extends AbstractGdsStore { } } } - - private ServiceGdsInfo retrieveServiceGdsInfo(Long serviceId, String serviceName) throws Exception { - ServiceGdsInfo ret = new ServiceGdsInfo(); - - ret.setServiceName(serviceName); - ret.setGdsServiceDef(svcStore.getServiceDefByName(EMBEDDED_SERVICEDEF_GDS_NAME)); - - SearchFilter filter = new SearchFilter(SearchFilter.SERVICE_ID, serviceId.toString()); - - populateDatasets(ret, filter); - populateProjects(ret, filter); - populateDataShares(ret, filter); - populateSharedResources(ret, filter); - populateDataSharesInDataset(ret, filter); - populateDatasetsInProject(ret, filter); - - return ret; - } - - private void populateDatasets(ServiceGdsInfo gdsInfo, SearchFilter filter) { - for (RangerDataset dataset : datasetService.searchDatasets(filter).getList()) { - DatasetInfo dsInfo = new DatasetInfo(); - - dsInfo.setId(dataset.getId()); - dsInfo.setName(dataset.getName()); - dsInfo.setPolicies(getPolicies(daoMgr.getXXGdsDatasetPolicyMap().getDatasetPolicyIds(dataset.getId()))); - - gdsInfo.addDataset(dsInfo); - } - } - - private void populateProjects(ServiceGdsInfo gdsInfo, SearchFilter filter) { - for (RangerProject project : projectService.searchProjects(filter).getList()) { - ProjectInfo projInfo = new ProjectInfo(); - - projInfo.setId(project.getId()); - projInfo.setName(project.getName()); - projInfo.setPolicies(getPolicies(daoMgr.getXXGdsProjectPolicyMap().getProjectPolicyIds(project.getId()))); - - gdsInfo.addProject(projInfo); - } - } - - private void populateDataShares(ServiceGdsInfo gdsInfo, SearchFilter filter) { - RangerDataShareList dataShares = dataShareService.searchDataShares(filter); - - for (RangerDataShare dataShare : dataShares.getList()) { - DataShareInfo dshInfo = new DataShareInfo(); - - dshInfo.setId(dataShare.getId()); - dshInfo.setName(dataShare.getName()); - dshInfo.setZoneName(dataShare.getZone()); - dshInfo.setConditionExpr(dataShare.getConditionExpr()); - dshInfo.setDefaultAccessTypes(dataShare.getDefaultAccessTypes()); - dshInfo.setDefaultTagMasks(dataShare.getDefaultTagMasks()); - - gdsInfo.addDataShare(dshInfo); - } - } - - private void populateSharedResources(ServiceGdsInfo gdsInfo, SearchFilter filter) { - for (RangerSharedResource resource : sharedResourceService.searchSharedResources(filter).getList()) { - SharedResourceInfo resourceInfo = new SharedResourceInfo(); - - resourceInfo.setId(resource.getId()); - resourceInfo.setName(resource.getName()); - resourceInfo.setDataShareId(resource.getDataShareId()); - resourceInfo.setResource(resource.getResource()); - resourceInfo.setSubResource(resource.getSubResource()); - resourceInfo.setSubResourceType(resource.getSubResourceType()); - resourceInfo.setConditionExpr(resource.getConditionExpr()); - resourceInfo.setAccessTypes(resource.getAccessTypes()); - resourceInfo.setRowFilter(resource.getRowFilter()); - resourceInfo.setSubResourceMasks(resource.getSubResourceMasks()); - resourceInfo.setProfiles(resource.getProfiles()); - - gdsInfo.addResource(resourceInfo); - } - } - - private void populateDataSharesInDataset(ServiceGdsInfo gdsInfo, SearchFilter filter) { - for (RangerDataShareInDataset dshInDs : dataShareInDatasetService.searchDataShareInDatasets(filter).getList()) { - if (dshInDs.getStatus() != GdsShareStatus.ACTIVE) { - continue; - } - - DataShareInDatasetInfo dshInDsInfo = new DataShareInDatasetInfo(); - - dshInDsInfo.setDatasetId(dshInDs.getDatasetId()); - dshInDsInfo.setDataShareId(dshInDs.getDataShareId()); - dshInDsInfo.setStatus(dshInDs.getStatus()); - dshInDsInfo.setValiditySchedule(dshInDs.getValiditySchedule()); - dshInDsInfo.setProfiles(dshInDs.getProfiles()); - - gdsInfo.addDataShareInDataset(dshInDsInfo); - } - } - - private void populateDatasetsInProject(ServiceGdsInfo gdsInfo, SearchFilter filter) { - for (RangerDatasetInProject dip : datasetInProjectService.searchDatasetInProjects(filter).getList()) { - if (dip.getStatus() != GdsShareStatus.ACTIVE) { - continue; - } - - DatasetInProjectInfo dipInfo = new DatasetInProjectInfo(); - - dipInfo.setDatasetId(dip.getDatasetId()); - dipInfo.setProjectId(dip.getProjectId()); - dipInfo.setStatus(dip.getStatus()); - dipInfo.setValiditySchedule(dip.getValiditySchedule()); - dipInfo.setProfiles(dip.getProfiles()); - - gdsInfo.addDatasetInProjectInfo(dipInfo); - } - } } diff --git a/security-admin/src/main/java/org/apache/ranger/common/ServiceGdsInfoCache.java b/security-admin/src/main/java/org/apache/ranger/common/ServiceGdsInfoCache.java new file mode 100644 index 000000000..b27a32265 --- /dev/null +++ b/security-admin/src/main/java/org/apache/ranger/common/ServiceGdsInfoCache.java @@ -0,0 +1,290 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.ranger.common; + +import org.apache.commons.collections.CollectionUtils; +import org.apache.ranger.authorization.hadoop.config.RangerAdminConfig; +import org.apache.ranger.db.RangerDaoManager; +import org.apache.ranger.entity.XXServiceVersionInfo; +import org.apache.ranger.plugin.model.RangerGds; +import org.apache.ranger.plugin.model.RangerPolicy; +import org.apache.ranger.plugin.store.ServiceStore; +import org.apache.ranger.plugin.util.RangerCache; +import org.apache.ranger.plugin.util.SearchFilter; +import org.apache.ranger.plugin.util.ServiceGdsInfo; +import org.apache.ranger.service.*; +import org.apache.ranger.util.RangerCacheDBValueLoader; +import org.apache.ranger.view.RangerGdsVList; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.stereotype.Component; +import org.springframework.transaction.PlatformTransactionManager; + +import javax.annotation.PostConstruct; +import java.util.ArrayList; +import java.util.Date; +import java.util.List; + +import static org.apache.ranger.plugin.store.EmbeddedServiceDefsUtil.EMBEDDED_SERVICEDEF_GDS_NAME; + +@Component +public class ServiceGdsInfoCache extends RangerCache<String, ServiceGdsInfo> { + private static final Logger LOG = LoggerFactory.getLogger(ServiceGdsInfoCache.class); + + public static final String PROP_LOADER_THREAD_POOL_SIZE = "ranger.admin.cache.gds-info.loader.threadpool.size"; + public static final String PROP_VALUE_INIT_TIMEOUT_MS = "ranger.admin.cache.gds-info.value.init.timeout.ms"; + public static final String PROP_VALUE_REFRESH_TIMEOUT_MS = "ranger.admin.cache.gds-info.value.refresh.timeout.ms"; + + private static final int DEFAULT_LOADER_THREAD_POOL_SIZE = 1; + private static final int DEFAULT_VALUE_INIT_LOAD_TIMEOUT_MS = -1; + private static final int DEFAULT_VALUE_REFRESH_LOAD_TIMEOUT_MS = 10 * 1000; // 10 seconds + + @Autowired + RangerDaoManager daoMgr; + + @Autowired + RangerGdsDataShareService dataShareService; + + @Autowired + RangerGdsSharedResourceService sharedResourceService; + + @Autowired + RangerGdsDatasetService datasetService; + + @Autowired + RangerGdsDataShareInDatasetService dataShareInDatasetService; + + @Autowired + RangerGdsProjectService projectService; + + @Autowired + RangerGdsDatasetInProjectService datasetInProjectService; + + @Autowired + ServiceStore svcStore; + + @Autowired + @Qualifier(value = "transactionManager") + PlatformTransactionManager txManager; + + public ServiceGdsInfoCache() { + super("ServiceGdsInfoCache", + null, // loader will be set in init(), so that txManager is initialized + getLoaderThreadPoolSize(), + RefreshMode.ON_ACCESS, + 0, // every access should look to refresh + getValueInitLoadTimeout(), + getValueRefreshLoadTimeout()); + } + + @PostConstruct + public void init() { + setLoader(new ServiceGdsInfoLoader(txManager)); + } + + + private static int getLoaderThreadPoolSize() { + return RangerAdminConfig.getInstance().getInt(PROP_LOADER_THREAD_POOL_SIZE, DEFAULT_LOADER_THREAD_POOL_SIZE); + } + + private static long getValueInitLoadTimeout() { + return RangerAdminConfig.getInstance().getLong(PROP_VALUE_INIT_TIMEOUT_MS, DEFAULT_VALUE_INIT_LOAD_TIMEOUT_MS); + } + + private static long getValueRefreshLoadTimeout() { + return RangerAdminConfig.getInstance().getLong(PROP_VALUE_REFRESH_TIMEOUT_MS, DEFAULT_VALUE_REFRESH_LOAD_TIMEOUT_MS); + } + + private class ServiceGdsInfoLoader extends RangerCacheDBValueLoader<String, ServiceGdsInfo> { + public ServiceGdsInfoLoader(PlatformTransactionManager txManager) { + super(txManager); + } + + @Override + protected RefreshableValue<ServiceGdsInfo> dbLoad(String serviceName, RefreshableValue<ServiceGdsInfo> currentValue) throws Exception { + XXServiceVersionInfo serviceVersionInfo = daoMgr.getXXServiceVersionInfo().findByServiceName(serviceName); + + if (serviceVersionInfo == null) { + LOG.error("Requested Service not found. serviceName={}", serviceName); + + throw new KeyNotFoundException(serviceName + ": service not found"); + } + + ServiceGdsInfo lastKnownGdsInfo = currentValue != null ? currentValue.getValue() : null; + Long lastKnownVersion = lastKnownGdsInfo != null ? lastKnownGdsInfo.getGdsVersion() : null; + Long latestVersion = serviceVersionInfo.getGdsVersion(); + + RefreshableValue<ServiceGdsInfo> ret = null; + + if (lastKnownVersion == null || !lastKnownVersion.equals(latestVersion)) { + ServiceGdsInfo latestGdsInfo = retrieveServiceGdsInfo(serviceVersionInfo.getServiceId(), serviceName); + Date latestUpdateTime = serviceVersionInfo.getGdsUpdateTime(); + + latestGdsInfo.setGdsLastUpdateTime(latestUpdateTime != null ? latestUpdateTime.getTime() : null); + latestGdsInfo.setGdsVersion(latestVersion); + + LOG.info("Refreshed gdsVersionInfo: serviceName={}, lastKnownVersion={}, latestVersion={}", serviceName, lastKnownVersion, latestVersion); + + ret = new RefreshableValue<>(latestGdsInfo); + } else { + LOG.debug("No change in gdsVersionInfo: serviceName={}, lastKnownVersion={}, latestVersion={}", serviceName, lastKnownVersion, latestVersion); + } + + return ret; + } + + private ServiceGdsInfo retrieveServiceGdsInfo(Long serviceId, String serviceName) throws Exception { + ServiceGdsInfo ret = new ServiceGdsInfo(); + + ret.setServiceName(serviceName); + ret.setGdsServiceDef(svcStore.getServiceDefByName(EMBEDDED_SERVICEDEF_GDS_NAME)); + + SearchFilter filter = new SearchFilter(SearchFilter.SERVICE_ID, serviceId.toString()); + + populateDatasets(ret, filter); + populateProjects(ret, filter); + populateDataShares(ret, filter); + populateSharedResources(ret, filter); + populateDataSharesInDataset(ret, filter); + populateDatasetsInProject(ret, filter); + + return ret; + } + + private void populateDatasets(ServiceGdsInfo gdsInfo, SearchFilter filter) { + for (RangerGds.RangerDataset dataset : datasetService.searchDatasets(filter).getList()) { + ServiceGdsInfo.DatasetInfo dsInfo = new ServiceGdsInfo.DatasetInfo(); + + dsInfo.setId(dataset.getId()); + dsInfo.setName(dataset.getName()); + dsInfo.setPolicies(getPolicies(daoMgr.getXXGdsDatasetPolicyMap().getDatasetPolicyIds(dataset.getId()))); + + gdsInfo.addDataset(dsInfo); + } + } + + private void populateProjects(ServiceGdsInfo gdsInfo, SearchFilter filter) { + for (RangerGds.RangerProject project : projectService.searchProjects(filter).getList()) { + ServiceGdsInfo.ProjectInfo projInfo = new ServiceGdsInfo.ProjectInfo(); + + projInfo.setId(project.getId()); + projInfo.setName(project.getName()); + projInfo.setPolicies(getPolicies(daoMgr.getXXGdsProjectPolicyMap().getProjectPolicyIds(project.getId()))); + + gdsInfo.addProject(projInfo); + } + } + + private void populateDataShares(ServiceGdsInfo gdsInfo, SearchFilter filter) { + RangerGdsVList.RangerDataShareList dataShares = dataShareService.searchDataShares(filter); + + for (RangerGds.RangerDataShare dataShare : dataShares.getList()) { + ServiceGdsInfo.DataShareInfo dshInfo = new ServiceGdsInfo.DataShareInfo(); + + dshInfo.setId(dataShare.getId()); + dshInfo.setName(dataShare.getName()); + dshInfo.setZoneName(dataShare.getZone()); + dshInfo.setConditionExpr(dataShare.getConditionExpr()); + dshInfo.setDefaultAccessTypes(dataShare.getDefaultAccessTypes()); + dshInfo.setDefaultTagMasks(dataShare.getDefaultTagMasks()); + + gdsInfo.addDataShare(dshInfo); + } + } + + private void populateSharedResources(ServiceGdsInfo gdsInfo, SearchFilter filter) { + for (RangerGds.RangerSharedResource resource : sharedResourceService.searchSharedResources(filter).getList()) { + ServiceGdsInfo.SharedResourceInfo resourceInfo = new ServiceGdsInfo.SharedResourceInfo(); + + resourceInfo.setId(resource.getId()); + resourceInfo.setName(resource.getName()); + resourceInfo.setDataShareId(resource.getDataShareId()); + resourceInfo.setResource(resource.getResource()); + resourceInfo.setSubResource(resource.getSubResource()); + resourceInfo.setSubResourceType(resource.getSubResourceType()); + resourceInfo.setConditionExpr(resource.getConditionExpr()); + resourceInfo.setAccessTypes(resource.getAccessTypes()); + resourceInfo.setRowFilter(resource.getRowFilter()); + resourceInfo.setSubResourceMasks(resource.getSubResourceMasks()); + resourceInfo.setProfiles(resource.getProfiles()); + + gdsInfo.addResource(resourceInfo); + } + } + + private void populateDataSharesInDataset(ServiceGdsInfo gdsInfo, SearchFilter filter) { + for (RangerGds.RangerDataShareInDataset dshInDs : dataShareInDatasetService.searchDataShareInDatasets(filter).getList()) { + if (dshInDs.getStatus() != RangerGds.GdsShareStatus.ACTIVE) { + continue; + } + + ServiceGdsInfo.DataShareInDatasetInfo dshInDsInfo = new ServiceGdsInfo.DataShareInDatasetInfo(); + + dshInDsInfo.setDatasetId(dshInDs.getDatasetId()); + dshInDsInfo.setDataShareId(dshInDs.getDataShareId()); + dshInDsInfo.setStatus(dshInDs.getStatus()); + dshInDsInfo.setValiditySchedule(dshInDs.getValiditySchedule()); + dshInDsInfo.setProfiles(dshInDs.getProfiles()); + + gdsInfo.addDataShareInDataset(dshInDsInfo); + } + } + + private void populateDatasetsInProject(ServiceGdsInfo gdsInfo, SearchFilter filter) { + for (RangerGds.RangerDatasetInProject dip : datasetInProjectService.searchDatasetInProjects(filter).getList()) { + if (dip.getStatus() != RangerGds.GdsShareStatus.ACTIVE) { + continue; + } + + ServiceGdsInfo.DatasetInProjectInfo dipInfo = new ServiceGdsInfo.DatasetInProjectInfo(); + + dipInfo.setDatasetId(dip.getDatasetId()); + dipInfo.setProjectId(dip.getProjectId()); + dipInfo.setStatus(dip.getStatus()); + dipInfo.setValiditySchedule(dip.getValiditySchedule()); + dipInfo.setProfiles(dip.getProfiles()); + + gdsInfo.addDatasetInProjectInfo(dipInfo); + } + } + + private List<RangerPolicy> getPolicies(List<Long> policyIds) { + List<RangerPolicy> ret = new ArrayList<>(); + + if (CollectionUtils.isNotEmpty(policyIds)) { + for (Long policyId : policyIds) { + try { + RangerPolicy policy = svcStore.getPolicy(policyId); + + if (policy != null) { + ret.add(policy); + } + } catch (Exception excp) { + LOG.error("getPolicies(): failed to get policy with id=" + policyId, excp); + } + } + } + + return ret; + } + } +} diff --git a/security-admin/src/main/java/org/apache/ranger/util/RangerCacheDBValueLoader.java b/security-admin/src/main/java/org/apache/ranger/util/RangerCacheDBValueLoader.java new file mode 100644 index 000000000..7b5da29c3 --- /dev/null +++ b/security-admin/src/main/java/org/apache/ranger/util/RangerCacheDBValueLoader.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ranger.util; + +import org.apache.ranger.plugin.util.RangerCache; +import org.apache.ranger.plugin.util.RangerCache.RefreshableValue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.transaction.PlatformTransactionManager; +import org.springframework.transaction.support.TransactionTemplate; + +public abstract class RangerCacheDBValueLoader<K, V> extends RangerCache.ValueLoader<K, V> { + private static final Logger LOG = LoggerFactory.getLogger(RangerCacheDBValueLoader.class); + + private final TransactionTemplate txTemplate; + + public RangerCacheDBValueLoader(PlatformTransactionManager txManager) { + this.txTemplate = new TransactionTemplate(txManager); + + txTemplate.setReadOnly(true); + } + + @Override + public RefreshableValue<V> load(K key, RefreshableValue<V> currentValue) throws Exception { + Exception[] ex = new Exception[1]; + + RefreshableValue<V> ret = txTemplate.execute(status -> { + try { + return dbLoad(key, currentValue); + } catch (Exception excp) { + LOG.error("RangerDBLoaderCache.load(): failed to load for key={}", key, excp); + + ex[0] = excp; + } + + return null; + }); + + if (ex[0] != null) { + throw ex[0]; + } + + return ret; + } + + protected abstract RefreshableValue<V> dbLoad(K key, RefreshableValue<V> currentValue) throws Exception; +}
