RANGER-265 Policy manager should timeout if a service is not responding to lookup requests in time.
Signed-off-by: Madhan Neethiraj <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/incubator-ranger/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ranger/commit/7381bc4e Tree: http://git-wip-us.apache.org/repos/asf/incubator-ranger/tree/7381bc4e Diff: http://git-wip-us.apache.org/repos/asf/incubator-ranger/diff/7381bc4e Branch: refs/heads/tag-policy Commit: 7381bc4e971bf4383669584415143dccb6b0a918 Parents: 9a06c64 Author: Alok Lal <[email protected]> Authored: Fri May 15 10:00:01 2015 -0700 Committer: Madhan Neethiraj <[email protected]> Committed: Tue May 19 15:08:23 2015 -0700 ---------------------------------------------------------------------- .../plugin/service/ResourceLookupContext.java | 7 +- security-admin/pom.xml | 1 - .../org/apache/ranger/biz/ServiceDBStore.java | 2 +- .../java/org/apache/ranger/biz/ServiceMgr.java | 178 +++++++++++++++-- .../org/apache/ranger/common/RangerFactory.java | 33 ++++ .../org/apache/ranger/common/TimedExecutor.java | 160 ++++++++++++++++ .../common/TimedExecutorConfigurator.java | 93 +++++++++ .../apache/ranger/service/RangerFactory.java | 33 ---- .../conf.dist/ranger-admin-default-site.xml | 27 +++ .../apache/ranger/common/TestTimedExecutor.java | 190 +++++++++++++++++++ 10 files changed, 669 insertions(+), 55 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/7381bc4e/agents-common/src/main/java/org/apache/ranger/plugin/service/ResourceLookupContext.java ---------------------------------------------------------------------- diff --git a/agents-common/src/main/java/org/apache/ranger/plugin/service/ResourceLookupContext.java b/agents-common/src/main/java/org/apache/ranger/plugin/service/ResourceLookupContext.java index 913f824..a8b8ac0 100644 --- a/agents-common/src/main/java/org/apache/ranger/plugin/service/ResourceLookupContext.java +++ 
b/agents-common/src/main/java/org/apache/ranger/plugin/service/ResourceLookupContext.java @@ -27,8 +27,8 @@ import javax.xml.bind.annotation.XmlAccessorType; import javax.xml.bind.annotation.XmlRootElement; import org.codehaus.jackson.annotate.JsonAutoDetect; -import org.codehaus.jackson.annotate.JsonIgnoreProperties; import org.codehaus.jackson.annotate.JsonAutoDetect.Visibility; +import org.codehaus.jackson.annotate.JsonIgnoreProperties; import org.codehaus.jackson.map.annotate.JsonSerialize; @JsonAutoDetect(getterVisibility=Visibility.NONE, setterVisibility=Visibility.NONE, fieldVisibility=Visibility.ANY) @@ -82,4 +82,9 @@ public class ResourceLookupContext { public void setResources(Map<String, List<String>> resources) { this.resources = resources; } + + @Override + public String toString() { + return String.format("ResourceLookupContext={resourceName=%s,userInput=%s,resources=%s}", resourceName, userInput, resources); + } } http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/7381bc4e/security-admin/pom.xml ---------------------------------------------------------------------- diff --git a/security-admin/pom.xml b/security-admin/pom.xml index 9783d1f..3c26837 100644 --- a/security-admin/pom.xml +++ b/security-admin/pom.xml @@ -251,7 +251,6 @@ <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> - <version>4.11</version> <scope>test</scope> </dependency> <dependency> http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/7381bc4e/security-admin/src/main/java/org/apache/ranger/biz/ServiceDBStore.java ---------------------------------------------------------------------- diff --git a/security-admin/src/main/java/org/apache/ranger/biz/ServiceDBStore.java b/security-admin/src/main/java/org/apache/ranger/biz/ServiceDBStore.java index 91fafa4..62670c0 100644 --- a/security-admin/src/main/java/org/apache/ranger/biz/ServiceDBStore.java +++ b/security-admin/src/main/java/org/apache/ranger/biz/ServiceDBStore.java @@ -40,6 +40,7 @@ import 
org.apache.ranger.common.PasswordUtils; import org.apache.ranger.common.RESTErrorUtil; import org.apache.ranger.common.RangerCommonEnums; import org.apache.ranger.common.RangerConstants; +import org.apache.ranger.common.RangerFactory; import org.apache.ranger.common.StringUtil; import org.apache.ranger.common.UserSessionBase; import org.apache.ranger.db.RangerDaoManager; @@ -106,7 +107,6 @@ import org.apache.ranger.plugin.util.SearchFilter; import org.apache.ranger.plugin.util.ServicePolicies; import org.apache.ranger.service.RangerAuditFields; import org.apache.ranger.service.RangerDataHistService; -import org.apache.ranger.service.RangerFactory; import org.apache.ranger.service.RangerPolicyService; import org.apache.ranger.service.RangerPolicyWithAssignedIdService; import org.apache.ranger.service.RangerServiceDefService; http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/7381bc4e/security-admin/src/main/java/org/apache/ranger/biz/ServiceMgr.java ---------------------------------------------------------------------- diff --git a/security-admin/src/main/java/org/apache/ranger/biz/ServiceMgr.java b/security-admin/src/main/java/org/apache/ranger/biz/ServiceMgr.java index 8498fbf..576090f 100644 --- a/security-admin/src/main/java/org/apache/ranger/biz/ServiceMgr.java +++ b/security-admin/src/main/java/org/apache/ranger/biz/ServiceMgr.java @@ -23,13 +23,18 @@ import java.io.File; import java.net.URL; import java.net.URLClassLoader; import java.util.ArrayList; +import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.ranger.common.PropertiesUtil; +import org.apache.ranger.common.TimedExecutor; import org.apache.ranger.plugin.client.HadoopException; import org.apache.ranger.plugin.model.RangerService; 
import org.apache.ranger.plugin.model.RangerServiceDef; @@ -54,6 +59,9 @@ public class ServiceMgr { @Autowired ServiceDBStore svcDBStore; + @Autowired + TimedExecutor timedExecutor; + public List<String> lookupResource(String serviceName, ResourceLookupContext context, ServiceStore svcStore) throws Exception { List<String> ret = null; @@ -69,18 +77,9 @@ public class ServiceMgr { } if(svc != null) { - ClassLoader clsLoader = Thread.currentThread().getContextClassLoader(); - - try { - Thread.currentThread().setContextClassLoader(svc.getClass().getClassLoader()); - - ret = svc.lookupResource(context); - } catch (Exception e) { - LOG.error("==> ServiceMgr.lookupResource Error:" + e); - throw e; - } finally { - Thread.currentThread().setContextClassLoader(clsLoader); - } + LookupCallable callable = new LookupCallable(svc, context); + long time = getTimeoutValueForLookupInMilliSeconds(svc); + ret = timedExecutor.timedTask(callable, time, TimeUnit.MILLISECONDS); } if(LOG.isDebugEnabled()) { @@ -103,12 +102,11 @@ public class ServiceMgr { } if(svc != null) { - ClassLoader clsLoader = Thread.currentThread().getContextClassLoader(); - try { - Thread.currentThread().setContextClassLoader(svc.getClass().getClassLoader()); - - HashMap<String, Object> responseData = svc.validateConfig(); + // Timeout value use during validate config is 10 times that used during lookup + long time = getTimeoutValueForValidateConfigInMilliSeconds(svc); + ValidateCallable callable = new ValidateCallable(svc); + HashMap<String, Object> responseData = timedExecutor.timedTask(callable, time, TimeUnit.MILLISECONDS); ret = generateResponseForTestConn(responseData, ""); } catch (Exception e) { @@ -120,8 +118,6 @@ public class ServiceMgr { } ret = generateResponseForTestConn(respData, msg); LOG.error("==> ServiceMgr.validateConfig Error:" + e); - } finally { - Thread.currentThread().setContextClassLoader(clsLoader); } } @@ -344,5 +340,149 @@ public class ServiceMgr { vXResponse.setStatusCode(statusCode); 
return vXResponse; } + + static final long _DefaultTimeoutValue_Lookp = 1000; // 1 s + static final long _DefaultTimeoutValue_ValidateConfig = 10000; // 10 s + + long getTimeoutValueForLookupInMilliSeconds(RangerBaseService svc) { + return getTimeoutValueInMilliSeconds("resource.lookup", svc, _DefaultTimeoutValue_Lookp); + } + + long getTimeoutValueForValidateConfigInMilliSeconds(RangerBaseService svc) { + return getTimeoutValueInMilliSeconds("validate.config", svc, _DefaultTimeoutValue_ValidateConfig); + } + + long getTimeoutValueInMilliSeconds(final String type, RangerBaseService svc, long defaultValue) { + if (LOG.isDebugEnabled()) { + LOG.debug(String.format("==> ServiceMgr.getTimeoutValueInMilliSeconds (%s, %s)", type, svc)); + } + String propertyName = type + ".timeout.value.in.ms"; // type == "lookup" || type == "validate-config" + + Long result = null; + Map<String, String> config = svc.getConfigs(); + if (config != null && config.containsKey(propertyName)) { + result = parseLong(config.get(propertyName)); + } + if (result != null) { + LOG.debug("Found override in service config!"); + } else { + String[] keys = new String[] { + "ranger.service." + svc.getServiceName() + "." + propertyName, + "ranger.servicetype." + svc.getServiceType() + "." + propertyName, + "ranger." + propertyName + }; + for (String key : keys) { + String value = PropertiesUtil.getProperty(key); + if (value != null) { + result = parseLong(value); + if (result != null) { + if (LOG.isDebugEnabled()) { + LOG.debug("Using the value[" + value + "] found in property[" + key + "]"); + } + break; + } + } + } + } + if (result == null) { + if (LOG.isDebugEnabled()) { + LOG.debug("No overrides found in service config of properties file. 
Using supplied default of[" + defaultValue + "]!"); + } + result = defaultValue; + } + + if (LOG.isDebugEnabled()) { + LOG.debug(String.format("<== ServiceMgr.getTimeoutValueInMilliSeconds (%s, %s): %s", type, svc, result)); + } + return result; + } + + Long parseLong(String str) { + try { + return Long.valueOf(str); + } catch (NumberFormatException e) { + if (LOG.isDebugEnabled()) { + LOG.debug("ServiceMgr.parseLong: could not parse [" + str + "] as Long! Returning null"); + } + return null; + } + } + + abstract static class TimedCallable<T> implements Callable<T> { + + final RangerBaseService svc; + final Date creation; // NOTE: This would be different from when the callable was actually offered to the executor + + public TimedCallable(RangerBaseService svc) { + this.svc = svc; + this.creation = new Date(); + } + + @Override + public T call() throws Exception { + Date start = null; + if (LOG.isDebugEnabled()) { + start = new Date(); + LOG.debug("==> TimedCallable: " + toString()); + } + + ClassLoader clsLoader = Thread.currentThread().getContextClassLoader(); + try { + Thread.currentThread().setContextClassLoader(svc.getClass().getClassLoader()); + return actualCall(); + } catch (Exception e) { + LOG.error("TimedCallable.call: Error:" + e); + throw e; + } finally { + Thread.currentThread().setContextClassLoader(clsLoader); + if (LOG.isDebugEnabled()) { + Date finish = new Date(); + long waitTime = start.getTime() - creation.getTime(); + long executionTime = finish.getTime() - start.getTime(); + LOG.debug(String.format("<== TimedCallable: %s: wait time[%d ms], execution time [%d ms]", toString(), waitTime, executionTime)); + } + } + } + + abstract T actualCall() throws Exception; + } + + static class LookupCallable extends TimedCallable<List<String>> { + + final ResourceLookupContext context; + + public LookupCallable(final RangerBaseService svc, final ResourceLookupContext context) { + super(svc); + this.context = context; + } + + @Override + public String 
toString() { + return String.format("lookup resource[%s] for service[%s], ", context.toString(), svc.getServiceName()); + } + + @Override + public List<String> actualCall() throws Exception { + List<String> ret = svc.lookupResource(context); + return ret; + } + } + + static class ValidateCallable extends TimedCallable<HashMap<String, Object>> { + + public ValidateCallable(RangerBaseService svc) { + super(svc); + } + + @Override + public String toString() { + return String.format("validate config for service[%s]", svc.getServiceName()); + } + + @Override + public HashMap<String, Object> actualCall() throws Exception { + return svc.validateConfig(); + } + } } http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/7381bc4e/security-admin/src/main/java/org/apache/ranger/common/RangerFactory.java ---------------------------------------------------------------------- diff --git a/security-admin/src/main/java/org/apache/ranger/common/RangerFactory.java b/security-admin/src/main/java/org/apache/ranger/common/RangerFactory.java new file mode 100644 index 0000000..29d972e --- /dev/null +++ b/security-admin/src/main/java/org/apache/ranger/common/RangerFactory.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.ranger.common; + +import org.apache.ranger.plugin.model.RangerPolicy; +import org.apache.ranger.plugin.model.RangerPolicyResourceSignature; +import org.springframework.context.annotation.Scope; +import org.springframework.stereotype.Service; + +@Service +@Scope("singleton") +public class RangerFactory { + public RangerPolicyResourceSignature createPolicyResourceSignature(RangerPolicy policy) { + return new RangerPolicyResourceSignature(policy); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/7381bc4e/security-admin/src/main/java/org/apache/ranger/common/TimedExecutor.java ---------------------------------------------------------------------- diff --git a/security-admin/src/main/java/org/apache/ranger/common/TimedExecutor.java b/security-admin/src/main/java/org/apache/ranger/common/TimedExecutor.java new file mode 100644 index 0000000..643d882 --- /dev/null +++ b/security-admin/src/main/java/org/apache/ranger/common/TimedExecutor.java @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.ranger.common; + +import java.lang.Thread.UncaughtExceptionHandler; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Callable; +import java.util.concurrent.CancellationException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import javax.annotation.PostConstruct; + +import org.apache.log4j.Logger; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Scope; +import org.springframework.stereotype.Service; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; + +@Service +@Scope("singleton") +public class TimedExecutor { + + static final private Logger LOG = Logger.getLogger(TimedExecutor.class); + + @Autowired + TimedExecutorConfigurator _configurator; + + ExecutorService _executorService; + + public TimedExecutor() { + } + + @PostConstruct + void initialize() { + initialize(_configurator); + } + + // Not designed for public access - only for testability + void initialize(TimedExecutorConfigurator configurator) { + final ThreadFactory _ThreadFactory = new ThreadFactoryBuilder() + .setDaemon(true) + .setNameFormat("timed-executor-pool-%d") + .setUncaughtExceptionHandler(new LocalUncaughtExceptionHandler()) + .build(); + + final BlockingQueue<Runnable> blockingQueue = new ArrayBlockingQueue<>(configurator.getBlockingQueueSize()); + + _executorService = new LocalThreadPoolExecutor(configurator.getCoreThreadPoolSize(), configurator.getMaxThreadPoolSize(), + configurator.getKeepAliveTime(), configurator.getKeepAliveTimeUnit(), + blockingQueue, _ThreadFactory); + } + + public <T> T 
timedTask(Callable<T> callable, long time, TimeUnit unit) throws Exception{ + try { + Future<T> future = _executorService.submit(callable); + if (LOG.isDebugEnabled()) { + if (future.isCancelled()) { + LOG.debug("Got back a future that was cancelled already for callable[" + callable + "]!"); + } + } + try { + T result = future.get(time, unit); + return result; + } catch (CancellationException | ExecutionException | InterruptedException e) { + if (LOG.isDebugEnabled()) { + LOG.debug(String.format("TimedExecutor: Caught exception[%s] for callable[%s]: detail[%s]. Re-throwing...", e.getClass().getName(), callable, e.getMessage())); + } + throw e; + } catch (TimeoutException e) { + if (LOG.isDebugEnabled()) { + LOG.debug(String.format("TimedExecutor: Timed out waiting for callable[%s] to finish. Cancelling the task.", callable)); + } + boolean interruptRunningTask = true; + future.cancel(interruptRunningTask); + LOG.debug("TimedExecutor: Re-throwing timeout exception to caller"); + throw e; + } + } catch (RejectedExecutionException e) { + if (LOG.isDebugEnabled()) { + LOG.debug("Executor rejected callable[" + callable + "], due to resource exhaustion. Rethrowing exception..."); + } + throw e; + } + } + + /** + * Not designed for public access. Non-private only for testability. Expected to be called by tests to do proper cleanup. 
+ */ + void shutdown() { + _executorService.shutdownNow(); + } + + static class LocalUncaughtExceptionHandler implements UncaughtExceptionHandler { + + @Override + public void uncaughtException(Thread t, Throwable e) { + String message = String.format("TimedExecutor: Uncaught exception hanlder received exception[%s] in thread[%s]", t.getClass().getName(), t.getName()); + LOG.warn(message, e); + } + } + + static class LocalThreadPoolExecutor extends ThreadPoolExecutor { + + private ThreadLocal<Long> startNanoTime = new ThreadLocal<Long>(); + + public LocalThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) { + super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory); + } + + @Override + protected void beforeExecute(Thread t, Runnable r) { + if (LOG.isDebugEnabled()) { + LOG.debug("TimedExecutor: Starting execution of a task."); + startNanoTime.set(System.nanoTime()); + } + super.beforeExecute(t, r); + } + + @Override + protected void afterExecute(Runnable r, Throwable t) { + super.afterExecute(r, t); + if (LOG.isDebugEnabled()) { + long duration = System.nanoTime() - startNanoTime.get(); + LOG.debug("TimedExecutor: Done execution of task. 
Duration[" + duration/1000000 + " ms]."); + } + } + + @Override + protected void terminated() { + super.terminated(); + LOG.info("TimedExecutor: thread pool has terminated"); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/7381bc4e/security-admin/src/main/java/org/apache/ranger/common/TimedExecutorConfigurator.java ---------------------------------------------------------------------- diff --git a/security-admin/src/main/java/org/apache/ranger/common/TimedExecutorConfigurator.java b/security-admin/src/main/java/org/apache/ranger/common/TimedExecutorConfigurator.java new file mode 100644 index 0000000..1b43abe --- /dev/null +++ b/security-admin/src/main/java/org/apache/ranger/common/TimedExecutorConfigurator.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.ranger.common; + +import java.util.concurrent.TimeUnit; + +import javax.annotation.PostConstruct; + +import org.springframework.context.annotation.Scope; +import org.springframework.stereotype.Service; + +@Service +@Scope("singleton") +public class TimedExecutorConfigurator { + + // these two are important and hence are user configurable. 
+ static final String Property_MaxThreadPoolSize = "ranger.timed.executor.max.threadpool.size"; + static final String Property_QueueSize = "ranger.timed.executor.queue.size"; + // We need these default-defaults since default-site.xml file isn't inside the jar, i.e. file itself may be missing or values in it might be messed up! :( + static final int _DefaultMaxThreadPoolSize = 10; + static final private int _DefaultBlockingQueueSize = 100; + + + private int _maxThreadPoolSize; + private int _blockingQueueSize; + // The following are hard-coded for now and can be exposed if there is a pressing need. + private int _coreThreadPoolSize = 1; + private long _keepAliveTime = 10; + private TimeUnit _keepAliveTimeUnit = TimeUnit.SECONDS; + + public TimedExecutorConfigurator() { + } + + // Infrequently used class (once per lifetime of policy manager) hence, values read from property file aren't cached. + @PostConstruct + void initialize() { + Integer value = PropertiesUtil.getIntProperty(Property_MaxThreadPoolSize); + if (value == null) { + _maxThreadPoolSize = _DefaultMaxThreadPoolSize; + } else { + _maxThreadPoolSize = value; + } + + value = PropertiesUtil.getIntProperty(Property_QueueSize); + if (value == null) { + _blockingQueueSize = _DefaultBlockingQueueSize; + } else { + _blockingQueueSize = value; + } + } + /** + * Provided mostly for testability.
+ * @param maxThreadPoolSize + * @param blockingQueueSize + */ + public TimedExecutorConfigurator(int maxThreadPoolSize, int blockingQueueSize) { + _maxThreadPoolSize = maxThreadPoolSize; + _blockingQueueSize = blockingQueueSize; + } + + public int getCoreThreadPoolSize() { + return _coreThreadPoolSize; + } + public int getMaxThreadPoolSize() { + return _maxThreadPoolSize; + } + public long getKeepAliveTime() { + return _keepAliveTime; + } + public TimeUnit getKeepAliveTimeUnit() { + return _keepAliveTimeUnit; + } + public int getBlockingQueueSize() { + return _blockingQueueSize; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/7381bc4e/security-admin/src/main/java/org/apache/ranger/service/RangerFactory.java ---------------------------------------------------------------------- diff --git a/security-admin/src/main/java/org/apache/ranger/service/RangerFactory.java b/security-admin/src/main/java/org/apache/ranger/service/RangerFactory.java deleted file mode 100644 index 7834262..0000000 --- a/security-admin/src/main/java/org/apache/ranger/service/RangerFactory.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.ranger.service; - -import org.apache.ranger.plugin.model.RangerPolicy; -import org.apache.ranger.plugin.model.RangerPolicyResourceSignature; -import org.springframework.context.annotation.Scope; -import org.springframework.stereotype.Service; - -@Service -@Scope("singleton") -public class RangerFactory { - public RangerPolicyResourceSignature createPolicyResourceSignature(RangerPolicy policy) { - return new RangerPolicyResourceSignature(policy); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/7381bc4e/security-admin/src/main/resources/conf.dist/ranger-admin-default-site.xml ---------------------------------------------------------------------- diff --git a/security-admin/src/main/resources/conf.dist/ranger-admin-default-site.xml b/security-admin/src/main/resources/conf.dist/ranger-admin-default-site.xml index 571d2a1..0783f69 100644 --- a/security-admin/src/main/resources/conf.dist/ranger-admin-default-site.xml +++ b/security-admin/src/main/resources/conf.dist/ranger-admin-default-site.xml @@ -395,14 +395,41 @@ <value>ranger.auditdb.password</value> <description></description> </property> + <property> <name>ranger.ldap.binddn.credential.alias</name> <value>ranger.ldap.binddn.password</value> <description></description> </property> + <property> <name>ranger.ldap.ad.binddn.credential.alias</name> <value>ranger.ad.binddn.password</value> <description></description> </property> + + <property> + <name>ranger.resource.lookup.timeout.value.in.ms</name> + <value>1000</value> + <description></description> + </property> + + <property> + <name>ranger.validate.config.timeout.value.in.ms</name> + <value>10000</value> + <description></description> + </property> + + <property> + <name>ranger.timed.executor.max.threadpool.size</name> + <value>10</value> + <description></description> + </property> + + <property> + <name>ranger.timed.executor.queue.size</name> + <value>100</value> + <description></description> + </property> + 
</configuration> http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/7381bc4e/security-admin/src/test/java/org/apache/ranger/common/TestTimedExecutor.java ---------------------------------------------------------------------- diff --git a/security-admin/src/test/java/org/apache/ranger/common/TestTimedExecutor.java b/security-admin/src/test/java/org/apache/ranger/common/TestTimedExecutor.java new file mode 100644 index 0000000..39d8ecf --- /dev/null +++ b/security-admin/src/test/java/org/apache/ranger/common/TestTimedExecutor.java @@ -0,0 +1,190 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.ranger.common; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.junit.Before; +import org.junit.Test; + +public class TestTimedExecutor { + + private static final Log LOG = LogFactory.getLog(TestTimedExecutor.class); + + @Before + public void before() { + + } + + @Test + public void test() throws InterruptedException { + /* + * Create a pool with 2 threads and queue size of 3 such that 6th item should get rejected right away due to capacity. + */ + int poolSize = 2; + int queueSize = 3; + _configurator = new TimedExecutorConfigurator(poolSize, queueSize); + // Just to avoid thread shutting down and restarting set keep alive to high value. + _executor.initialize(_configurator); + + // now create 2 callables that would keep waiting unless we ask them to proceed + // create an executor which would simulate simultaneous threads calling into executor to perform lookups + ExecutorService executorService = Executors.newCachedThreadPool(); + List<Future<Integer>> futures = new ArrayList<Future<Integer>>(); + /* + * We would have 2 permits for 10 callables, such that + * - 2 should succeed + * - 5 should timeout (2 in pool + 3 in queue) + * - 3 should get rejected.
+ */ + Semaphore semaphore = new Semaphore(2); + /* + * We need a latch to keep track of when the processing is done so we can check the results of the test + */ + CountDownLatch latch = new CountDownLatch(10); + // Callables will record exception in this map + final ConcurrentMap<String, AtomicInteger> results = new ConcurrentHashMap<String, AtomicInteger>(); + for (int i = 0; i < 10; i++) { + LookupTask lookupTask = new LookupTask(i, semaphore); + TimedTask timedTask = new TimedTask(_executor, lookupTask, 1, TimeUnit.SECONDS, results, latch); + Future<Integer> aFuture = executorService.submit(timedTask); + futures.add(aFuture); + } + // Let's wait for the threads to finish + LOG.debug("Starting to wait for threadpool to finish"); + latch.await(); + /* + * depending on how threads get scheduled the count in results would vary, except we know for sure that. + * - 2 must succeed since we have exactly 2 permits available. + * - sum of timed out and rejected must be equal to 8. + * - at least 3 and no more than 5 tasks must get rejected.
+ * - at least 3 and no more than 5 tasks must get timed out + */ + int successCount = results.get("success").get(); + int timeoutCount = results.get("java.util.concurrent.TimeoutException").get(); + int rejectedCount = results.get("java.util.concurrent.RejectedExecutionException").get(); + assertEquals("success count", 2, successCount); + assertTrue("timeout[" + timeoutCount + "]: 3 <= count(timeout) <= 5", timeoutCount >= 3 && timeoutCount <= 5); + assertTrue("rejected[" + rejectedCount + "]: 3 <= count(timeout) <= 5", rejectedCount >= 3 && rejectedCount <= 5); + assertEquals("total should equal 10", 10, successCount + timeoutCount + rejectedCount); + _executor.shutdown(); + } + + static final String format = "%15s id: %2d"; + + static class LookupTask implements Callable<Integer> { + + final int _id; + final private Semaphore _semaphore; + + public LookupTask(int id, Semaphore latch) { + _id = id; + _semaphore = latch; + } + + int getId() { + return _id; + } + + @Override + public Integer call() throws Exception { + LOG.debug(String.format(format, "Starting", _id)); + _semaphore.acquire(); + LOG.debug(String.format(format, "Acquired", _id)); + LOG.debug(String.format(format, "Ended", _id)); + return _id; + } + + } + + static class TimedTask implements Callable<Integer> { + + final LookupTask _callable; + final TimedExecutor _executor; + final ConcurrentMap<String, AtomicInteger> _results; + final long _timeout; + final TimeUnit _unit; + final CountDownLatch _latch; + + public TimedTask(TimedExecutor executor, LookupTask callable, int timout, TimeUnit unit, ConcurrentMap<String, AtomicInteger> results, CountDownLatch latch) { + _callable = callable; + _executor = executor; + _results = results; + _timeout = timout; + _unit = unit; + _latch = latch; + } + + @Override + public Integer call() throws Exception { + int id = _callable.getId(); + LOG.debug(String.format(format, "Submitting", id)); + try { + Integer result = _executor.timedTask(_callable, _timeout, 
_unit); + LOG.debug(String.format(format, "Finished", id)); + recordResult(_results, "success"); + return result; + } catch (Exception e) { + LOG.debug(String.format(format, "Exception", id)); + recordResult(_results, e); + // re-throw caught exception + throw e; + } finally { + _latch.countDown(); + } + } + + } + + static void recordResult(ConcurrentMap<String, AtomicInteger> results, String key) { + if (results.containsKey(key)) { + results.get(key).incrementAndGet(); + } else { + AtomicInteger previous = results.putIfAbsent(key, new AtomicInteger(1)); + if (previous != null) { // a value was already associated with the key + previous.incrementAndGet(); + } + } + } + + static void recordResult(ConcurrentMap<String, AtomicInteger> results, Exception e) { + String exceptionName = e.getClass().getCanonicalName(); + recordResult(results, exceptionName); + } + + private TimedExecutorConfigurator _configurator; + private TimedExecutor _executor = new TimedExecutor(); +}
