Repository: incubator-ranger
Updated Branches:
  refs/heads/master 68d01056c -> 154c49041


RANGER-265 Policy manager should timeout if a service is not responding to 
lookup requests in time.

Signed-off-by: Madhan Neethiraj <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/incubator-ranger/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ranger/commit/154c4904
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ranger/tree/154c4904
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ranger/diff/154c4904

Branch: refs/heads/master
Commit: 154c49041863f040ce99c5d45fa5e996968ced96
Parents: 68d0105
Author: Alok Lal <[email protected]>
Authored: Fri May 15 10:00:01 2015 -0700
Committer: Madhan Neethiraj <[email protected]>
Committed: Tue May 19 11:57:28 2015 -0700

----------------------------------------------------------------------
 .../plugin/service/ResourceLookupContext.java   |   7 +-
 security-admin/pom.xml                          |   1 -
 .../org/apache/ranger/biz/ServiceDBStore.java   |   2 +-
 .../java/org/apache/ranger/biz/ServiceMgr.java  | 178 +++++++++++++++--
 .../org/apache/ranger/common/RangerFactory.java |  33 ++++
 .../org/apache/ranger/common/TimedExecutor.java | 160 ++++++++++++++++
 .../common/TimedExecutorConfigurator.java       |  93 +++++++++
 .../apache/ranger/service/RangerFactory.java    |  33 ----
 .../conf.dist/ranger-admin-default-site.xml     |  27 +++
 .../apache/ranger/common/TestTimedExecutor.java | 190 +++++++++++++++++++
 10 files changed, 669 insertions(+), 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/154c4904/agents-common/src/main/java/org/apache/ranger/plugin/service/ResourceLookupContext.java
----------------------------------------------------------------------
diff --git 
a/agents-common/src/main/java/org/apache/ranger/plugin/service/ResourceLookupContext.java
 
b/agents-common/src/main/java/org/apache/ranger/plugin/service/ResourceLookupContext.java
index 913f824..a8b8ac0 100644
--- 
a/agents-common/src/main/java/org/apache/ranger/plugin/service/ResourceLookupContext.java
+++ 
b/agents-common/src/main/java/org/apache/ranger/plugin/service/ResourceLookupContext.java
@@ -27,8 +27,8 @@ import javax.xml.bind.annotation.XmlAccessorType;
 import javax.xml.bind.annotation.XmlRootElement;
 
 import org.codehaus.jackson.annotate.JsonAutoDetect;
-import org.codehaus.jackson.annotate.JsonIgnoreProperties;
 import org.codehaus.jackson.annotate.JsonAutoDetect.Visibility;
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
 import org.codehaus.jackson.map.annotate.JsonSerialize;
 
 @JsonAutoDetect(getterVisibility=Visibility.NONE, 
setterVisibility=Visibility.NONE, fieldVisibility=Visibility.ANY)
@@ -82,4 +82,9 @@ public class ResourceLookupContext {
        public void setResources(Map<String, List<String>> resources) {
                this.resources = resources;
        }
+       
+       @Override
+       public String toString() {
+               return 
String.format("ResourceLookupContext={resourceName=%s,userInput=%s,resources=%s}",
 resourceName, userInput, resources);
+       }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/154c4904/security-admin/pom.xml
----------------------------------------------------------------------
diff --git a/security-admin/pom.xml b/security-admin/pom.xml
index 9783d1f..3c26837 100644
--- a/security-admin/pom.xml
+++ b/security-admin/pom.xml
@@ -251,7 +251,6 @@
                <dependency>
                    <groupId>junit</groupId>
                    <artifactId>junit</artifactId>
-                   <version>4.11</version>
                    <scope>test</scope>
                </dependency>
                <dependency>

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/154c4904/security-admin/src/main/java/org/apache/ranger/biz/ServiceDBStore.java
----------------------------------------------------------------------
diff --git 
a/security-admin/src/main/java/org/apache/ranger/biz/ServiceDBStore.java 
b/security-admin/src/main/java/org/apache/ranger/biz/ServiceDBStore.java
index 009cbf8..2c9ceff 100644
--- a/security-admin/src/main/java/org/apache/ranger/biz/ServiceDBStore.java
+++ b/security-admin/src/main/java/org/apache/ranger/biz/ServiceDBStore.java
@@ -40,6 +40,7 @@ import org.apache.ranger.common.PasswordUtils;
 import org.apache.ranger.common.RESTErrorUtil;
 import org.apache.ranger.common.RangerCommonEnums;
 import org.apache.ranger.common.RangerConstants;
+import org.apache.ranger.common.RangerFactory;
 import org.apache.ranger.common.StringUtil;
 import org.apache.ranger.common.UserSessionBase;
 import org.apache.ranger.db.RangerDaoManager;
@@ -106,7 +107,6 @@ import org.apache.ranger.plugin.util.SearchFilter;
 import org.apache.ranger.plugin.util.ServicePolicies;
 import org.apache.ranger.service.RangerAuditFields;
 import org.apache.ranger.service.RangerDataHistService;
-import org.apache.ranger.service.RangerFactory;
 import org.apache.ranger.service.RangerPolicyService;
 import org.apache.ranger.service.RangerPolicyWithAssignedIdService;
 import org.apache.ranger.service.RangerServiceDefService;

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/154c4904/security-admin/src/main/java/org/apache/ranger/biz/ServiceMgr.java
----------------------------------------------------------------------
diff --git a/security-admin/src/main/java/org/apache/ranger/biz/ServiceMgr.java 
b/security-admin/src/main/java/org/apache/ranger/biz/ServiceMgr.java
index 8498fbf..576090f 100644
--- a/security-admin/src/main/java/org/apache/ranger/biz/ServiceMgr.java
+++ b/security-admin/src/main/java/org/apache/ranger/biz/ServiceMgr.java
@@ -23,13 +23,18 @@ import java.io.File;
 import java.net.URL;
 import java.net.URLClassLoader;
 import java.util.ArrayList;
+import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.ranger.common.PropertiesUtil;
+import org.apache.ranger.common.TimedExecutor;
 import org.apache.ranger.plugin.client.HadoopException;
 import org.apache.ranger.plugin.model.RangerService;
 import org.apache.ranger.plugin.model.RangerServiceDef;
@@ -54,6 +59,9 @@ public class ServiceMgr {
        @Autowired
        ServiceDBStore svcDBStore;
        
+       @Autowired
+       TimedExecutor timedExecutor;
+
        public List<String> lookupResource(String serviceName, 
ResourceLookupContext context, ServiceStore svcStore) throws Exception {
                List<String>      ret = null;
                
@@ -69,18 +77,9 @@ public class ServiceMgr {
                }
 
                if(svc != null) {
-                       ClassLoader clsLoader = 
Thread.currentThread().getContextClassLoader();
-
-                       try {
-                               
Thread.currentThread().setContextClassLoader(svc.getClass().getClassLoader());
-
-                               ret = svc.lookupResource(context);
-                       } catch (Exception e) {
-                               LOG.error("==> ServiceMgr.lookupResource 
Error:" + e);
-                               throw e;
-                       } finally {
-                               
Thread.currentThread().setContextClassLoader(clsLoader);
-                       }
+                       LookupCallable callable = new LookupCallable(svc, 
context);
+                       long time = getTimeoutValueForLookupInMilliSeconds(svc);
+                       ret = timedExecutor.timedTask(callable, time, 
TimeUnit.MILLISECONDS);
                }
 
                if(LOG.isDebugEnabled()) {
@@ -103,12 +102,11 @@ public class ServiceMgr {
                }
 
                if(svc != null) {
-                       ClassLoader clsLoader = 
Thread.currentThread().getContextClassLoader();
-
                        try {
-                               
Thread.currentThread().setContextClassLoader(svc.getClass().getClassLoader());
-
-                               HashMap<String, Object> responseData = 
svc.validateConfig();
+                               // Timeout value use during validate config is 
10 times that used during lookup
+                               long time = 
getTimeoutValueForValidateConfigInMilliSeconds(svc);
+                               ValidateCallable callable = new 
ValidateCallable(svc);
+                               HashMap<String, Object> responseData = 
timedExecutor.timedTask(callable, time, TimeUnit.MILLISECONDS);
 
                                ret = generateResponseForTestConn(responseData, 
"");
                        } catch (Exception e) {
@@ -120,8 +118,6 @@ public class ServiceMgr {
                                }
                                ret = generateResponseForTestConn(respData, 
msg);
                                LOG.error("==> ServiceMgr.validateConfig 
Error:" + e);
-                       } finally {
-                               
Thread.currentThread().setContextClassLoader(clsLoader);
                        }
                }
 
@@ -344,5 +340,149 @@ public class ServiceMgr {
                vXResponse.setStatusCode(statusCode);
                return vXResponse;
        }
+       
+       static final long _DefaultTimeoutValue_Lookp = 1000; // 1 s
+       static final long _DefaultTimeoutValue_ValidateConfig = 10000; // 10 s
+
+       long getTimeoutValueForLookupInMilliSeconds(RangerBaseService svc) {
+               return getTimeoutValueInMilliSeconds("resource.lookup", svc, 
_DefaultTimeoutValue_Lookp);
+       }
+       
+       long getTimeoutValueForValidateConfigInMilliSeconds(RangerBaseService 
svc) {
+               return getTimeoutValueInMilliSeconds("validate.config", svc, 
_DefaultTimeoutValue_ValidateConfig);
+       }
+       
+       long getTimeoutValueInMilliSeconds(final String type, RangerBaseService 
svc, long defaultValue) {
+               if (LOG.isDebugEnabled()) {
+                       LOG.debug(String.format("==> 
ServiceMgr.getTimeoutValueInMilliSeconds (%s, %s)", type, svc));
+               }
+               String propertyName = type + ".timeout.value.in.ms"; // type == 
"lookup" || type == "validate-config"
+
+               Long result = null;
+               Map<String, String> config = svc.getConfigs();
+               if (config != null && config.containsKey(propertyName)) {
+                       result = parseLong(config.get(propertyName));
+               }
+               if (result != null) {
+                       LOG.debug("Found override in service config!");
+               } else {
+                       String[] keys = new String[] {
+                                       "ranger.service." + 
svc.getServiceName() + "." + propertyName,
+                                       "ranger.servicetype." + 
svc.getServiceType() + "." + propertyName,
+                                       "ranger." + propertyName
+                       };
+                       for (String key : keys) {
+                               String value = PropertiesUtil.getProperty(key);
+                               if (value != null) {
+                                       result = parseLong(value);
+                                       if (result != null) {
+                                               if (LOG.isDebugEnabled()) {
+                                                       LOG.debug("Using the 
value[" + value + "] found in property[" + key + "]");
+                                               }
+                                               break;
+                                       }
+                               }
+                       }
+               }
+               if (result == null) {
+                       if (LOG.isDebugEnabled()) {
+                               LOG.debug("No overrides found in service config 
of properties file.  Using supplied default of[" + defaultValue + "]!");
+                       }
+                       result = defaultValue;
+               }
+
+               if (LOG.isDebugEnabled()) {
+                       LOG.debug(String.format("<== 
ServiceMgr.getTimeoutValueInMilliSeconds (%s, %s): %s", type, svc, result));
+               }
+               return result;
+       }
+       
+       Long parseLong(String str) {
+               try {
+                       return Long.valueOf(str);
+               } catch (NumberFormatException e) {
+                       if (LOG.isDebugEnabled()) {
+                               LOG.debug("ServiceMgr.parseLong: could not 
parse [" + str + "] as Long! Returning null");
+                       }
+                       return null;
+               }
+       }
+       
+       abstract static class TimedCallable<T> implements Callable<T> {
+
+               final RangerBaseService svc;
+               final Date creation; // NOTE: This would be different from when 
the callable was actually offered to the executor
+
+               public TimedCallable(RangerBaseService svc) {
+                       this.svc = svc;
+                       this.creation = new Date();
+               }
+
+               @Override
+               public T call() throws Exception {
+                       Date start = null;
+                       if (LOG.isDebugEnabled()) {
+                               start = new Date();
+                               LOG.debug("==> TimedCallable: " + toString());
+                       }
+
+                       ClassLoader clsLoader = 
Thread.currentThread().getContextClassLoader();
+                       try {
+                               
Thread.currentThread().setContextClassLoader(svc.getClass().getClassLoader());
+                               return actualCall();
+                       } catch (Exception e) {
+                               LOG.error("TimedCallable.call: Error:" + e);
+                               throw e;
+                       } finally {
+                               
Thread.currentThread().setContextClassLoader(clsLoader);
+                               if (LOG.isDebugEnabled()) {
+                                       Date finish = new Date();
+                                       long waitTime = start.getTime() - 
creation.getTime();
+                                       long executionTime = finish.getTime() - 
start.getTime();
+                                       LOG.debug(String.format("<== 
TimedCallable: %s: wait time[%d ms], execution time [%d ms]", toString(), 
waitTime, executionTime));
+                               }
+                       }
+               }
+
+               abstract T actualCall() throws Exception;
+       }
+
+       static class LookupCallable extends TimedCallable<List<String>> {
+
+               final ResourceLookupContext context;
+
+               public LookupCallable(final RangerBaseService svc, final 
ResourceLookupContext context) {
+                       super(svc);
+                       this.context = context;
+               }
+
+               @Override
+               public String toString() {
+                       return String.format("lookup resource[%s] for 
service[%s], ", context.toString(), svc.getServiceName());
+               }
+
+               @Override
+               public List<String> actualCall() throws Exception {
+                       List<String> ret = svc.lookupResource(context);
+                       return ret;
+               }
+       }
+
+       static class ValidateCallable extends TimedCallable<HashMap<String, 
Object>> {
+
+               public ValidateCallable(RangerBaseService svc) {
+                       super(svc);
+               }
+
+               @Override
+               public String toString() {
+                       return String.format("validate config for service[%s]", 
svc.getServiceName());
+               }
+
+               @Override
+               public HashMap<String, Object> actualCall() throws Exception {
+                       return svc.validateConfig();
+               }
+       }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/154c4904/security-admin/src/main/java/org/apache/ranger/common/RangerFactory.java
----------------------------------------------------------------------
diff --git 
a/security-admin/src/main/java/org/apache/ranger/common/RangerFactory.java 
b/security-admin/src/main/java/org/apache/ranger/common/RangerFactory.java
new file mode 100644
index 0000000..29d972e
--- /dev/null
+++ b/security-admin/src/main/java/org/apache/ranger/common/RangerFactory.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ranger.common;
+
+import org.apache.ranger.plugin.model.RangerPolicy;
+import org.apache.ranger.plugin.model.RangerPolicyResourceSignature;
+import org.springframework.context.annotation.Scope;
+import org.springframework.stereotype.Service;
+
+@Service
+@Scope("singleton")
+public class RangerFactory {
+       public RangerPolicyResourceSignature 
createPolicyResourceSignature(RangerPolicy policy) {
+               return new RangerPolicyResourceSignature(policy);
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/154c4904/security-admin/src/main/java/org/apache/ranger/common/TimedExecutor.java
----------------------------------------------------------------------
diff --git 
a/security-admin/src/main/java/org/apache/ranger/common/TimedExecutor.java 
b/security-admin/src/main/java/org/apache/ranger/common/TimedExecutor.java
new file mode 100644
index 0000000..643d882
--- /dev/null
+++ b/security-admin/src/main/java/org/apache/ranger/common/TimedExecutor.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ranger.common;
+
+import java.lang.Thread.UncaughtExceptionHandler;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import javax.annotation.PostConstruct;
+
+import org.apache.log4j.Logger;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Scope;
+import org.springframework.stereotype.Service;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+@Service
+@Scope("singleton")
+public class TimedExecutor {
+
+       static final private Logger LOG = Logger.getLogger(TimedExecutor.class);
+
+       @Autowired
+       TimedExecutorConfigurator _configurator;
+       
+       ExecutorService _executorService;
+       
+       public TimedExecutor() {
+       }
+       
+       @PostConstruct
+       void initialize() {
+               initialize(_configurator);
+       }
+               
+       // Not designed for public access - only for testability
+       void initialize(TimedExecutorConfigurator configurator) {
+               final ThreadFactory _ThreadFactory = new ThreadFactoryBuilder()
+                                                                               
.setDaemon(true)
+                                                                               
.setNameFormat("timed-executor-pool-%d")
+                                                                               
.setUncaughtExceptionHandler(new LocalUncaughtExceptionHandler())
+                                                                               
.build();
+
+               final BlockingQueue<Runnable> blockingQueue = new 
ArrayBlockingQueue<>(configurator.getBlockingQueueSize());
+
+               _executorService = new 
LocalThreadPoolExecutor(configurator.getCoreThreadPoolSize(), 
configurator.getMaxThreadPoolSize(),
+                                                                               
                                configurator.getKeepAliveTime(), 
configurator.getKeepAliveTimeUnit(), 
+                                                                               
                                blockingQueue, _ThreadFactory);
+       }
+       
+       public <T> T timedTask(Callable<T> callable, long time, TimeUnit unit) 
throws Exception{
+               try {
+               Future<T> future = _executorService.submit(callable);
+                       if (LOG.isDebugEnabled()) {
+                               if (future.isCancelled()) {
+                                       LOG.debug("Got back a future that was 
cancelled already for callable[" + callable + "]!");
+                               }
+                       }
+                       try {
+                               T result = future.get(time, unit);
+                               return result;
+                       } catch (CancellationException | ExecutionException | 
InterruptedException e) {
+                               if (LOG.isDebugEnabled()) {
+                                       LOG.debug(String.format("TimedExecutor: 
Caught exception[%s] for callable[%s]: detail[%s].  Re-throwing...", 
e.getClass().getName(), callable, e.getMessage()));
+                               }
+                               throw e;
+                       } catch (TimeoutException e) {
+                               if (LOG.isDebugEnabled()) {
+                                       LOG.debug(String.format("TimedExecutor: 
Timed out waiting for callable[%s] to finish.  Cancelling the task.", 
callable));
+                               }
+                               boolean interruptRunningTask = true;
+                               future.cancel(interruptRunningTask);
+                               LOG.debug("TimedExecutor: Re-throwing timeout 
exception to caller");
+                               throw e;
+                       }
+               } catch (RejectedExecutionException e) {
+                       if (LOG.isDebugEnabled()) {
+                               LOG.debug("Executor rejected callable[" + 
callable + "], due to resource exhaustion.  Rethrowing exception...");
+                       }
+                       throw e;
+               }
+       }
+       
+       /**
+        * Not designed for public access.  Non-private only for testability.  
Expected to be called by tests to do proper cleanup.
+        */
+       void shutdown() {
+               _executorService.shutdownNow();
+       }
+       
+       static class LocalUncaughtExceptionHandler implements 
UncaughtExceptionHandler {
+
+               @Override
+               public void uncaughtException(Thread t, Throwable e) {
+                       String message = String.format("TimedExecutor: Uncaught 
exception hanlder received exception[%s] in thread[%s]", 
t.getClass().getName(), t.getName());
+                       LOG.warn(message, e);
+               }
+       }
+       
+       static class LocalThreadPoolExecutor extends ThreadPoolExecutor {
+
+               private ThreadLocal<Long> startNanoTime = new 
ThreadLocal<Long>();
+               
+               public LocalThreadPoolExecutor(int corePoolSize, int 
maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> 
workQueue, ThreadFactory threadFactory) {
+                       super(corePoolSize, maximumPoolSize, keepAliveTime, 
unit, workQueue, threadFactory);
+               }
+               
+               @Override
+               protected void beforeExecute(Thread t, Runnable r) {
+                       if (LOG.isDebugEnabled()) {
+                               LOG.debug("TimedExecutor: Starting execution of 
a task.");
+                               startNanoTime.set(System.nanoTime());
+                       }
+                       super.beforeExecute(t, r);
+               }
+               
+               @Override
+               protected void afterExecute(Runnable r, Throwable t) {
+                       super.afterExecute(r, t);
+                       if (LOG.isDebugEnabled()) {
+                               long duration = System.nanoTime() - 
startNanoTime.get();
+                               LOG.debug("TimedExecutor: Done execution of 
task. Duration[" + duration/1000000 + " ms].");
+                       }
+               }
+               
+               @Override
+               protected void terminated() {
+                       super.terminated();
+                       LOG.info("TimedExecutor: thread pool has terminated");
+               }
+       }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/154c4904/security-admin/src/main/java/org/apache/ranger/common/TimedExecutorConfigurator.java
----------------------------------------------------------------------
diff --git 
a/security-admin/src/main/java/org/apache/ranger/common/TimedExecutorConfigurator.java
 
b/security-admin/src/main/java/org/apache/ranger/common/TimedExecutorConfigurator.java
new file mode 100644
index 0000000..1b43abe
--- /dev/null
+++ 
b/security-admin/src/main/java/org/apache/ranger/common/TimedExecutorConfigurator.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ranger.common;
+
+import java.util.concurrent.TimeUnit;
+
+import javax.annotation.PostConstruct;
+
+import org.springframework.context.annotation.Scope;
+import org.springframework.stereotype.Service;
+
+@Service
+@Scope("singleton")
+public class TimedExecutorConfigurator {
+
+       // these two are important and hence are user configurable.
+       static final String Property_MaxThreadPoolSize = 
"ranger.timed.executor.max.threadpool.size";
+       static final String Property_QueueSize = 
"ranger.timed.executor.queue.size";
+       // We need these default-defaults since default-site.xml file isn't 
inside the jar, i.e. file itself may be missing or values in it might be messed 
up! :(
+       static final int _DefaultMaxThreadPoolSize = 10;
+       static final private int _DefaultBlockingQueueSize = 100;
+
+
+       private int _maxThreadPoolSize;
+       private int _blockingQueueSize;
+       // The following are hard-coded for now and can be exposed if there is 
a pressing need.
+       private int _coreThreadPoolSize = 1;
+       private long _keepAliveTime = 10;
+       private TimeUnit _keepAliveTimeUnit = TimeUnit.SECONDS;
+       
+       public TimedExecutorConfigurator() {
+       }
+
+       // Infrequently used class (once per lifetime of policy manager) hence, 
values read from property file aren't cached.
+       @PostConstruct
+       void initialize() {
+               Integer value = 
PropertiesUtil.getIntProperty(Property_MaxThreadPoolSize);
+               if (value == null) {
+                       _maxThreadPoolSize = _DefaultMaxThreadPoolSize;
+               } else {
+                       _maxThreadPoolSize = value;
+               }
+
+               value = PropertiesUtil.getIntProperty(Property_QueueSize);
+               if (value == null) {
+                       _blockingQueueSize = _DefaultBlockingQueueSize;
+               } else {
+                       _blockingQueueSize = value;
+               }
+       }
+       /**
+        * Provided mostly only testability.
+        * @param maxThreadPoolSize
+        * @param blockingQueueSize
+        */
+       public TimedExecutorConfigurator(int maxThreadPoolSize, int 
blockingQueueSize) {
+               _maxThreadPoolSize = maxThreadPoolSize;
+               _blockingQueueSize = blockingQueueSize;
+       }
+       
+       public int getCoreThreadPoolSize() {
+               return _coreThreadPoolSize;
+       }
+       public int getMaxThreadPoolSize() {
+               return _maxThreadPoolSize;
+       }
+       public long getKeepAliveTime() {
+               return _keepAliveTime;
+       }
+       public TimeUnit getKeepAliveTimeUnit() {
+               return _keepAliveTimeUnit;
+       }
+       public int getBlockingQueueSize() {
+               return _blockingQueueSize;
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/154c4904/security-admin/src/main/java/org/apache/ranger/service/RangerFactory.java
----------------------------------------------------------------------
diff --git 
a/security-admin/src/main/java/org/apache/ranger/service/RangerFactory.java 
b/security-admin/src/main/java/org/apache/ranger/service/RangerFactory.java
deleted file mode 100644
index 7834262..0000000
--- a/security-admin/src/main/java/org/apache/ranger/service/RangerFactory.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.ranger.service;
-
-import org.apache.ranger.plugin.model.RangerPolicy;
-import org.apache.ranger.plugin.model.RangerPolicyResourceSignature;
-import org.springframework.context.annotation.Scope;
-import org.springframework.stereotype.Service;
-
-@Service
-@Scope("singleton")
-public class RangerFactory {
-       public RangerPolicyResourceSignature 
createPolicyResourceSignature(RangerPolicy policy) {
-               return new RangerPolicyResourceSignature(policy);
-       }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/154c4904/security-admin/src/main/resources/conf.dist/ranger-admin-default-site.xml
----------------------------------------------------------------------
diff --git 
a/security-admin/src/main/resources/conf.dist/ranger-admin-default-site.xml 
b/security-admin/src/main/resources/conf.dist/ranger-admin-default-site.xml
index 571d2a1..0783f69 100644
--- a/security-admin/src/main/resources/conf.dist/ranger-admin-default-site.xml
+++ b/security-admin/src/main/resources/conf.dist/ranger-admin-default-site.xml
@@ -395,14 +395,41 @@
                <value>ranger.auditdb.password</value>
                <description></description>
        </property>
+
        <property>
                <name>ranger.ldap.binddn.credential.alias</name>
                <value>ranger.ldap.binddn.password</value>
                <description></description>
        </property>
+
        <property>
                <name>ranger.ldap.ad.binddn.credential.alias</name>
                <value>ranger.ad.binddn.password</value>
                <description></description>
        </property>
+
+       <property>
+               <name>ranger.resource.lookup.timeout.value.in.ms</name>
+               <value>1000</value>
+               <description></description>
+       </property>
+
+       <property>
+               <name>ranger.validate.config.timeout.value.in.ms</name>
+               <value>10000</value>
+               <description></description>
+       </property>
+
+       <property>
+               <name>ranger.timed.executor.max.threadpool.size</name>
+               <value>10</value>
+               <description></description>
+       </property>
+
+       <property>
+               <name>ranger.timed.executor.queue.size</name>
+               <value>100</value>
+               <description></description>
+       </property>
+
 </configuration>

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/154c4904/security-admin/src/test/java/org/apache/ranger/common/TestTimedExecutor.java
----------------------------------------------------------------------
diff --git 
a/security-admin/src/test/java/org/apache/ranger/common/TestTimedExecutor.java 
b/security-admin/src/test/java/org/apache/ranger/common/TestTimedExecutor.java
new file mode 100644
index 0000000..39d8ecf
--- /dev/null
+++ 
b/security-admin/src/test/java/org/apache/ranger/common/TestTimedExecutor.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ranger.common;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestTimedExecutor {
+
+       private static final Log LOG = 
LogFactory.getLog(TestTimedExecutor.class);
+
+       @Before
+       public void before() {
+               
+       }
+       
+       @Test
+       public void test() throws InterruptedException {
+               /*
+                * Create a pool with 2 threads and queue size of 3 such that 
6th item should get rejected right away due to capacity.
+                */
+               int poolSize = 2;
+               int queueSize = 3;
+               _configurator = new TimedExecutorConfigurator(poolSize, 
queueSize);
+               // Just toa void thread shutting down and restarting set keep 
alive to high value.
+               _executor.initialize(_configurator);
+               
+               // now create 2 callalbles that would keep waiting unless we 
ask them to proceed
+               // create an executor which would simulate simultaneous threads 
calling into executor to perform lookups
+               ExecutorService executorService = 
Executors.newCachedThreadPool();
+               List<Future<Integer>> futures = new 
ArrayList<Future<Integer>>();
+               /*
+                * We would have 2 permits for 10 callables, such that
+                * - 2 should succeed
+                * - 5 should timeout (2 in pool + 3 in queue)
+                * - 3 should get rejected.
+                */
+               Semaphore semaphore = new Semaphore(2);
+               /*
+                * We need a latch to keep track of when the processing is done 
so we can check the results of teh test
+                */
+               CountDownLatch latch = new CountDownLatch(10);
+               // Callables will record exception in this map
+               final ConcurrentMap<String, AtomicInteger> results = new 
ConcurrentHashMap<String, AtomicInteger>(); 
+               for (int i = 0; i < 10; i++) {
+                       LookupTask lookupTask = new LookupTask(i, semaphore);
+                       TimedTask timedTask = new TimedTask(_executor, 
lookupTask, 1, TimeUnit.SECONDS, results, latch);
+                       Future<Integer> aFuture = 
executorService.submit(timedTask);
+                       futures.add(aFuture);
+               }
+               // Let's wait for the threads to finish
+               LOG.debug("Starting to wait for threadpool to finish");
+               latch.await();
+               /*
+                * depending on how threads get scheduled the count in results 
would vary, except we know for sure that.
+                * - 2 must succeed since we have exactly 2 permits available.
+                * - sum of timed out and rejected must be equal to 8.
+                * - at least 3 and no more than 5 tasks must get rejected.
+                * - at least 3 and no more than 5 tasks must get timed out
+                */
+               int successCount = results.get("success").get();
+               int timeoutCount = 
results.get("java.util.concurrent.TimeoutException").get();
+               int rejectedCount = 
results.get("java.util.concurrent.RejectedExecutionException").get();
+               assertEquals("success count", 2, successCount);
+               assertTrue("timeout[" + timeoutCount + "]: 3 <= count(timeout) 
<= 5", timeoutCount >= 3 && timeoutCount <= 5);
+               assertTrue("rejected[" + rejectedCount + "]: 3 <= 
count(timeout) <= 5", rejectedCount >= 3 && rejectedCount <= 5);
+               assertEquals("total should equal 10", 10, successCount + 
timeoutCount + rejectedCount);
+               _executor.shutdown();
+       }
+
+       static final String format = "%15s id: %2d";
+       
+       static class LookupTask implements Callable<Integer> {
+
+               final int _id;
+               final private Semaphore _semaphore;
+               
+               public LookupTask(int id, Semaphore latch) {
+                       _id = id;
+                       _semaphore = latch;
+               }
+               
+               int getId() {
+                       return _id;
+               }
+               
+               @Override
+               public Integer call() throws Exception {
+                       LOG.debug(String.format(format, "Starting", _id));
+                       _semaphore.acquire();
+                       LOG.debug(String.format(format, "Acquired", _id));
+                       LOG.debug(String.format(format, "Ended", _id));
+                       return _id;
+               }
+               
+       }
+
+       static class TimedTask implements Callable<Integer> {
+
+               final LookupTask _callable;
+               final TimedExecutor _executor;
+               final ConcurrentMap<String, AtomicInteger> _results;
+               final long _timeout;
+               final TimeUnit _unit;
+               final CountDownLatch _latch;
+               
+               public TimedTask(TimedExecutor executor, LookupTask callable, 
int timout, TimeUnit unit, ConcurrentMap<String, AtomicInteger> results, 
CountDownLatch latch) {
+                       _callable = callable;
+                       _executor = executor;
+                       _results = results;
+                       _timeout = timout;
+                       _unit = unit;
+                       _latch = latch;
+               }
+               
+               @Override
+               public Integer call() throws Exception {
+                       int id = _callable.getId();
+                       LOG.debug(String.format(format, "Submitting", id));
+                       try {
+                               Integer result = _executor.timedTask(_callable, 
_timeout, _unit);
+                               LOG.debug(String.format(format, "Finished", 
id));
+                               recordResult(_results, "success");
+                               return result;
+                       } catch (Exception e) {
+                               LOG.debug(String.format(format, "Exception", 
id));
+                               recordResult(_results, e);
+                               // re-throw caught exception
+                               throw e;
+                       } finally {
+                               _latch.countDown();
+                       }
+               }
+               
+       }
+       
+       static void recordResult(ConcurrentMap<String, AtomicInteger> results, 
String key) {
+               if (results.containsKey(key)) {
+                       results.get(key).incrementAndGet();
+               } else {
+                       AtomicInteger previous = results.putIfAbsent(key, new 
AtomicInteger(1));
+                       if (previous != null) {  // a value was already 
associated with the key
+                               previous.incrementAndGet();
+                       }
+               }
+       }
+
+       static void recordResult(ConcurrentMap<String, AtomicInteger> results, 
Exception e) {
+               String exceptionName = e.getClass().getCanonicalName();
+               recordResult(results, exceptionName);
+       }
+       
+       private TimedExecutorConfigurator _configurator;
+       private TimedExecutor _executor = new TimedExecutor(); 
+}

Reply via email to