This is an automated email from the ASF dual-hosted git repository.

abhay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ranger.git


The following commit(s) were added to refs/heads/master by this push:
     new 9c668fb  RANGER-2349: Provide an API to download policies and tags
9c668fb is described below

commit 9c668fbdab6a4f327faaf9fc090200c5902e34ed
Author: Abhay Kulkarni <>
AuthorDate: Thu Mar 7 12:55:00 2019 -0800

    RANGER-2349: Provide an API to download policies and tags
---
 .../plugin/contextenricher/RangerTagEnricher.java  | 59 +++++++++-----
 .../ranger/plugin/service/RangerAuthContext.java   |  6 +-
 .../ranger/plugin/service/RangerBasePlugin.java    | 93 ++++++++++++++++++++--
 .../apache/ranger/plugin/util/DownloadTrigger.java | 36 +++++++++
 .../apache/ranger/plugin/util/DownloaderTask.java  | 47 +++++++++++
 .../apache/ranger/plugin/util/PolicyRefresher.java | 27 ++-----
 6 files changed, 221 insertions(+), 47 deletions(-)

diff --git a/agents-common/src/main/java/org/apache/ranger/plugin/contextenricher/RangerTagEnricher.java b/agents-common/src/main/java/org/apache/ranger/plugin/contextenricher/RangerTagEnricher.java
index 028efe8..fbf0360 100644
--- a/agents-common/src/main/java/org/apache/ranger/plugin/contextenricher/RangerTagEnricher.java
+++ b/agents-common/src/main/java/org/apache/ranger/plugin/contextenricher/RangerTagEnricher.java
@@ -36,6 +36,8 @@ import org.apache.ranger.plugin.policyengine.RangerAccessRequest;
 import org.apache.ranger.plugin.policyengine.RangerAccessResource;
 import 
org.apache.ranger.plugin.policyresourcematcher.RangerDefaultPolicyResourceMatcher;
 import 
org.apache.ranger.plugin.policyresourcematcher.RangerPolicyResourceMatcher;
+import org.apache.ranger.plugin.util.DownloadTrigger;
+import org.apache.ranger.plugin.util.DownloaderTask;
 import org.apache.ranger.plugin.service.RangerAuthContext;
 import org.apache.ranger.plugin.service.RangerBasePlugin;
 import org.apache.ranger.plugin.util.RangerAccessRequestUtil;
@@ -57,6 +59,9 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.Timer;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
 
 public class RangerTagEnricher extends RangerAbstractContextEnricher {
        private static final Log LOG = 
LogFactory.getLog(RangerTagEnricher.class);
@@ -75,6 +80,9 @@ public class RangerTagEnricher extends RangerAbstractContextEnricher {
        private EnrichedServiceTags                enrichedServiceTags;
        private boolean                            
disableCacheIfServiceNotFound = true;
 
+       private final BlockingQueue<DownloadTrigger> tagDownloadQueue = new 
LinkedBlockingQueue<>();
+       private Timer                              tagDownloadTimer;
+
        @Override
        public void init() {
                if (LOG.isDebugEnabled()) {
@@ -121,7 +129,7 @@ public class RangerTagEnricher extends RangerAbstractContextEnricher {
                                tagRetriever.setAppId(appId);
                                
tagRetriever.init(enricherDef.getEnricherOptions());
 
-                               tagRefresher = new 
RangerTagRefresher(tagRetriever, this, -1L, cacheFile, pollingIntervalMs);
+                               tagRefresher = new 
RangerTagRefresher(tagRetriever, this, -1L, tagDownloadQueue, cacheFile);
 
                                try {
                                        tagRefresher.populateTags();
@@ -130,6 +138,19 @@ public class RangerTagEnricher extends RangerAbstractContextEnricher {
                                }
                                tagRefresher.setDaemon(true);
                                tagRefresher.startRefresher();
+
+                               tagDownloadTimer = new 
Timer("policyDownloadTimer", true);
+
+                               try {
+                                       tagDownloadTimer.schedule(new 
DownloaderTask(tagDownloadQueue), pollingIntervalMs, pollingIntervalMs);
+                                       if (LOG.isDebugEnabled()) {
+                                               LOG.debug("Scheduled 
tagDownloadRefresher to download tags every " + pollingIntervalMs + " 
milliseconds");
+                                       }
+                               } catch (IllegalStateException exception) {
+                                       LOG.error("Error scheduling 
tagDownloadTimer:", exception);
+                                       LOG.error("*** Tags will NOT be 
downloaded every " + pollingIntervalMs + " milliseconds ***");
+                                       tagDownloadTimer = null;
+                               }
                        }
                } else {
                        LOG.error("No value specified for " + 
TAG_RETRIEVER_CLASSNAME_OPTION + " in the RangerTagEnricher options");
@@ -178,7 +199,6 @@ public class RangerTagEnricher extends RangerAbstractContextEnricher {
                        LOG.debug("<== RangerTagEnricher.enrich(" + request + 
") with dataStore:[" + dataStore + "]): tags count=" + (matchedTags == null ? 0 
: matchedTags.size()));
                }
        }
-
        /*
         * This class implements a cache of result of look-up of keyset of 
policy-resources for each of the collections of hierarchies
         * for policy types: access, datamask and rowfilter. If a keyset is 
examined for validity in a hierarchy of a policy-type,
@@ -321,6 +341,11 @@ public class RangerTagEnricher extends RangerAbstractContextEnricher {
 
                super.preCleanup();
 
+               if (tagDownloadTimer != null) {
+                       tagDownloadTimer.cancel();
+                       tagDownloadTimer = null;
+               }
+
                if (tagRefresher != null) {
                        tagRefresher.cleanup();
                        tagRefresher = null;
@@ -332,6 +357,12 @@ public class RangerTagEnricher extends RangerAbstractContextEnricher {
                return ret;
        }
 
+       public void syncTagsWithAdmin(final DownloadTrigger token) throws 
InterruptedException {
+               tagDownloadQueue.put(token);
+               token.waitForCompletion();
+       }
+
+
        private Set<RangerTagForEval> findMatchingTags(final 
RangerAccessRequest request, EnrichedServiceTags dataStore) {
                if (LOG.isDebugEnabled()) {
                        LOG.debug("==> RangerTagEnricher.findMatchingTags(" + 
request + ")");
@@ -530,24 +561,19 @@ public class RangerTagEnricher extends RangerAbstractContextEnricher {
                private final RangerTagRetriever tagRetriever;
                private final RangerTagEnricher tagEnricher;
                private long lastKnownVersion = -1L;
+               private final BlockingQueue<DownloadTrigger> tagDownloadQueue;
                private long lastActivationTimeInMillis;
 
-               private final long pollingIntervalMs;
                private final String cacheFile;
                private boolean hasProvidedTagsToReceiver;
                private Gson gson;
 
-
-               final long getPollingIntervalMs() {
-                       return pollingIntervalMs;
-               }
-
-               RangerTagRefresher(RangerTagRetriever tagRetriever, 
RangerTagEnricher tagEnricher, long lastKnownVersion, String cacheFile, long 
pollingIntervalMs) {
+               RangerTagRefresher(RangerTagRetriever tagRetriever, 
RangerTagEnricher tagEnricher, long lastKnownVersion, 
BlockingQueue<DownloadTrigger> tagDownloadQueue, String cacheFile) {
                        this.tagRetriever = tagRetriever;
                        this.tagEnricher = tagEnricher;
                        this.lastKnownVersion = lastKnownVersion;
+                       this.tagDownloadQueue = tagDownloadQueue;
                        this.cacheFile = cacheFile;
-                       this.pollingIntervalMs = pollingIntervalMs;
                        try {
                                gson = new 
GsonBuilder().setDateFormat("yyyyMMdd-HH:mm:ss.SSS-Z").create();
                        } catch(Throwable excp) {
@@ -567,30 +593,25 @@ public class RangerTagEnricher extends RangerAbstractContextEnricher {
                public void run() {
 
                        if (LOG.isDebugEnabled()) {
-                               LOG.debug("==> 
RangerTagRefresher(pollingIntervalMs=" + pollingIntervalMs + ").run()");
+                               LOG.debug("==> RangerTagRefresher().run()");
                        }
 
                        while (true) {
 
                                try {
-
-                                       // Sleep first and then fetch tags
-                                       if (pollingIntervalMs > 0) {
-                                               Thread.sleep(pollingIntervalMs);
-                                       } else {
-                                               break;
-                                       }
                                        RangerPerfTracer perf = null;
 
                                        
if(RangerPerfTracer.isPerfTraceEnabled(PERF_CONTEXTENRICHER_INIT_LOG)) {
                                                perf = 
RangerPerfTracer.getPerfTracer(PERF_CONTEXTENRICHER_INIT_LOG, 
"RangerTagRefresher.populateTags(serviceName=" + tagRetriever.getServiceName() 
+ ",lastKnownVersion=" + lastKnownVersion + ")");
                                        }
+                                       DownloadTrigger trigger = 
tagDownloadQueue.take();
                                        populateTags();
+                                       trigger.signalCompletion();
 
                                        RangerPerfTracer.log(perf);
 
                                } catch (InterruptedException excp) {
-                                       
LOG.debug("RangerTagRefresher(pollingIntervalMs=" + pollingIntervalMs + 
").run() : interrupted! Exiting thread", excp);
+                                       LOG.debug("RangerTagRefresher().run() : 
interrupted! Exiting thread", excp);
                                        break;
                                }
                        }
diff --git a/agents-common/src/main/java/org/apache/ranger/plugin/service/RangerAuthContext.java b/agents-common/src/main/java/org/apache/ranger/plugin/service/RangerAuthContext.java
index 8b00144..b2cccef 100644
--- a/agents-common/src/main/java/org/apache/ranger/plugin/service/RangerAuthContext.java
+++ b/agents-common/src/main/java/org/apache/ranger/plugin/service/RangerAuthContext.java
@@ -38,10 +38,10 @@ import org.apache.ranger.plugin.util.RangerAccessRequestUtil;
 import org.apache.ranger.plugin.util.ServicePolicies;
 
 import java.util.Collection;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 
 public class RangerAuthContext implements RangerPolicyEngine {
     private RangerPolicyEngine policyEngine;
@@ -61,7 +61,7 @@ public class RangerAuthContext implements RangerPolicyEngine {
                    this.policyEngine = other.getPolicyEngine();
                    Map<RangerContextEnricher, Object> localReference = 
other.requestContextEnrichers;
                    if (MapUtils.isNotEmpty(localReference)) {
-                           this.requestContextEnrichers = new 
HashMap<>(localReference);
+                           this.requestContextEnrichers = new 
ConcurrentHashMap<>(localReference);
                    }
            }
     }
@@ -77,7 +77,7 @@ public class RangerAuthContext implements RangerPolicyEngine {
 
     public void addOrReplaceRequestContextEnricher(RangerContextEnricher 
enricher, Object database) {
         if (requestContextEnrichers == null) {
-            requestContextEnrichers = new HashMap<>();
+            requestContextEnrichers = new ConcurrentHashMap<>();
         }
 
         requestContextEnrichers.put(enricher, database);
diff --git a/agents-common/src/main/java/org/apache/ranger/plugin/service/RangerBasePlugin.java b/agents-common/src/main/java/org/apache/ranger/plugin/service/RangerBasePlugin.java
index 96ca317..e52d4de 100644
--- a/agents-common/src/main/java/org/apache/ranger/plugin/service/RangerBasePlugin.java
+++ b/agents-common/src/main/java/org/apache/ranger/plugin/service/RangerBasePlugin.java
@@ -25,11 +25,14 @@ import java.util.Collection;
 import java.util.Hashtable;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.Timer;
 import java.util.TimerTask;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
-
+import java.util.concurrent.LinkedBlockingQueue;
 import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.collections.MapUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -40,6 +43,8 @@ import org.apache.ranger.audit.provider.AuditProviderFactory;
 import org.apache.ranger.audit.provider.StandAloneAuditProviderFactory;
 import org.apache.ranger.authorization.hadoop.config.RangerConfiguration;
 import org.apache.ranger.authorization.utils.StringUtil;
+import org.apache.ranger.plugin.contextenricher.RangerContextEnricher;
+import org.apache.ranger.plugin.contextenricher.RangerTagEnricher;
 import org.apache.ranger.plugin.model.RangerPolicy;
 import org.apache.ranger.plugin.model.RangerServiceDef;
 import org.apache.ranger.plugin.policyengine.RangerAccessRequest;
@@ -52,6 +57,8 @@ import org.apache.ranger.plugin.policyengine.RangerPolicyEngineImpl;
 import org.apache.ranger.plugin.policyengine.RangerPolicyEngineOptions;
 import org.apache.ranger.plugin.policyengine.RangerResourceAccessInfo;
 import org.apache.ranger.plugin.store.EmbeddedServiceDefsUtil;
+import org.apache.ranger.plugin.util.DownloadTrigger;
+import org.apache.ranger.plugin.util.DownloaderTask;
 import org.apache.ranger.plugin.util.GrantRevokeRequest;
 import org.apache.ranger.plugin.util.PolicyRefresher;
 import org.apache.ranger.plugin.util.ServicePolicies;
@@ -76,13 +83,17 @@ public class RangerBasePlugin {
        private RangerAccessResultProcessor resultProcessor;
        private boolean                   useForwardedIPAddress;
        private String[]                  trustedProxyAddresses;
+       private Timer                     policyDownloadTimer;
        private Timer                     policyEngineRefreshTimer;
        private RangerAuthContextListener authContextListener;
        private AuditProviderFactory      auditProviderFactory;
 
+       private final BlockingQueue<DownloadTrigger> policyDownloadQueue = new 
LinkedBlockingQueue<>();
+       private final DownloadTrigger                accessTrigger       = new 
DownloadTrigger();
+
 
        Map<String, LogHistory> logHistoryList = new Hashtable<String, 
RangerBasePlugin.LogHistory>();
-       int logInterval = 30000; // 30 seconds
+       int                     logInterval    = 30000; // 30 seconds
 
        public static Map<String, RangerBasePlugin> getServicePluginMap() {
                return servicePluginMap;
@@ -183,7 +194,6 @@ public class RangerBasePlugin {
                String cacheDir          = configuration.get(propertyPrefix + 
".policy.cache.dir");
                serviceName = configuration.get(propertyPrefix + 
".service.name");
                clusterName = 
RangerConfiguration.getInstance().get(propertyPrefix + ".ambari.cluster.name", 
"");
-
                useForwardedIPAddress = configuration.getBoolean(propertyPrefix 
+ ".use.x-forwarded-for.ipaddress", false);
                String trustedProxyAddressString = 
configuration.get(propertyPrefix + ".trusted.proxy.ipaddresses");
                trustedProxyAddresses = 
StringUtils.split(trustedProxyAddressString, 
RANGER_TRUSTED_PROXY_IPADDRESSES_SEPARATOR_CHAR);
@@ -220,10 +230,23 @@ public class RangerBasePlugin {
 
                RangerAdminClient admin = createAdminClient(serviceName, appId, 
propertyPrefix);
 
-               refresher = new PolicyRefresher(this, serviceType, appId, 
serviceName, admin, pollingIntervalMs, cacheDir);
+               refresher = new PolicyRefresher(this, serviceType, appId, 
serviceName, admin, policyDownloadQueue, cacheDir);
                refresher.setDaemon(true);
                refresher.startRefresher();
 
+               policyDownloadTimer = new Timer("policyDownloadTimer", true);
+
+               try {
+                       policyDownloadTimer.schedule(new 
DownloaderTask(policyDownloadQueue), pollingIntervalMs, pollingIntervalMs);
+                       if (LOG.isDebugEnabled()) {
+                               LOG.debug("Scheduled policyDownloadRefresher to 
download policies every " + pollingIntervalMs + " milliseconds");
+                       }
+               } catch (IllegalStateException exception) {
+                       LOG.error("Error scheduling policyDownloadTimer:", 
exception);
+                       LOG.error("*** Policies will NOT be downloaded every " 
+ pollingIntervalMs + " milliseconds ***");
+                       policyDownloadTimer = null;
+               }
+
                long policyReorderIntervalMs = 
configuration.getLong(propertyPrefix + ".policy.policyReorderInterval", 60 * 
1000);
                if (policyReorderIntervalMs >= 0 && policyReorderIntervalMs < 
15 * 1000) {
                        policyReorderIntervalMs = 15 * 1000;
@@ -365,17 +388,24 @@ public class RangerBasePlugin {
 
                Timer policyEngineRefreshTimer = this.policyEngineRefreshTimer;
 
+               Timer policyDownloadTimer = this.policyDownloadTimer;
+
                String serviceName = this.serviceName;
 
                this.serviceName  = null;
                this.policyEngine = null;
                this.refresher    = null;
                this.policyEngineRefreshTimer = null;
+               this.policyDownloadTimer = null;
 
                if (refresher != null) {
                        refresher.stopRefresher();
                }
 
+               if (policyDownloadTimer != null) {
+                       policyDownloadTimer.cancel();
+               }
+
                if (policyEngineRefreshTimer != null) {
                        policyEngineRefreshTimer.cancel();
                }
@@ -528,6 +558,7 @@ public class RangerBasePlugin {
        public void registerAuthContextEventListener(RangerAuthContextListener 
authContextListener) {
                this.authContextListener = authContextListener;
        }
+
        public void 
unregisterAuthContextEventListener(RangerAuthContextListener 
authContextListener) {
                this.authContextListener = null;
        }
@@ -572,6 +603,31 @@ public class RangerBasePlugin {
                return ret;
        }
 
+       public void refreshPoliciesAndTags() {
+               if (LOG.isDebugEnabled()) {
+                       LOG.debug("==> refreshPoliciesAndTags()");
+               }
+               try {
+                       // Synch-up policies
+                       long oldPolicyVersion = 
this.policyEngine.getPolicyVersion();
+                       syncPoliciesWithAdmin(accessTrigger);
+                       long newPolicyVersion = 
this.policyEngine.getPolicyVersion();
+
+                       if (oldPolicyVersion == newPolicyVersion) {
+                               // Synch-up tags
+                               RangerTagEnricher tagEnricher = 
getTagEnricher();
+                               if (tagEnricher != null) {
+                                       
tagEnricher.syncTagsWithAdmin(accessTrigger);
+                               }
+                       }
+               } catch (InterruptedException exception) {
+                       LOG.error("Failed to update policy-engine, continuing 
to use old policy-engine and/or tags", exception);
+               }
+               if (LOG.isDebugEnabled()) {
+                       LOG.info("<== refreshPoliciesAndTags()");
+               }
+       }
+
        private void auditGrantRevoke(GrantRevokeRequest request, String 
action, boolean isSuccess, RangerAccessResultProcessor resultProcessor) {
                if(request != null && resultProcessor != null) {
                        RangerAccessRequestImpl accessRequest = new 
RangerAccessRequestImpl();
@@ -602,7 +658,7 @@ public class RangerBasePlugin {
                }
        }
 
-       public RangerServiceDef getDefaultServiceDef() {
+       private RangerServiceDef getDefaultServiceDef() {
                RangerServiceDef ret = null;
 
                if (StringUtils.isNotBlank(serviceType)) {
@@ -652,7 +708,7 @@ public class RangerBasePlugin {
                return false;
        }
 
-       static class LogHistory {
+       static private final class LogHistory {
                long lastLogTime;
                int counter;
        }
@@ -672,4 +728,29 @@ public class RangerBasePlugin {
                        }
                }
        }
+
+       private void syncPoliciesWithAdmin(final DownloadTrigger token) throws 
InterruptedException{
+               policyDownloadQueue.put(token);
+               token.waitForCompletion();
+       }
+
+       private RangerTagEnricher getTagEnricher() {
+               RangerTagEnricher ret = null;
+               RangerAuthContext authContext = getCurrentRangerAuthContext();
+               if (authContext != null) {
+                       Map<RangerContextEnricher, Object> contextEnricherMap = 
authContext.getRequestContextEnrichers();
+                       if (MapUtils.isNotEmpty(contextEnricherMap)) {
+                               Set<RangerContextEnricher> contextEnrichers = 
contextEnricherMap.keySet();
+
+                               for (RangerContextEnricher enricher : 
contextEnrichers) {
+                                       if (enricher instanceof 
RangerTagEnricher) {
+                                               ret = (RangerTagEnricher) 
enricher;
+                                               break;
+                                       }
+                               }
+                       }
+               }
+               return ret;
+       }
+
 }
diff --git a/agents-common/src/main/java/org/apache/ranger/plugin/util/DownloadTrigger.java b/agents-common/src/main/java/org/apache/ranger/plugin/util/DownloadTrigger.java
new file mode 100644
index 0000000..f5754f0
--- /dev/null
+++ b/agents-common/src/main/java/org/apache/ranger/plugin/util/DownloadTrigger.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ranger.plugin.util;
+
+public final class DownloadTrigger {
+    private boolean isNotified = false;
+
+    public synchronized void waitForCompletion() throws InterruptedException {
+        while (!isNotified) {
+            wait();
+        }
+        isNotified = false;
+    }
+
+    public synchronized void signalCompletion() {
+        isNotified = true;
+        notifyAll();
+    }
+}
\ No newline at end of file
diff --git a/agents-common/src/main/java/org/apache/ranger/plugin/util/DownloaderTask.java b/agents-common/src/main/java/org/apache/ranger/plugin/util/DownloaderTask.java
new file mode 100644
index 0000000..1345f6f
--- /dev/null
+++ b/agents-common/src/main/java/org/apache/ranger/plugin/util/DownloaderTask.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ranger.plugin.util;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.util.TimerTask;
+import java.util.concurrent.BlockingQueue;
+
+public final class  DownloaderTask extends TimerTask {
+    private static final Log LOG = LogFactory.getLog(DownloaderTask.class);
+
+    private final DownloadTrigger timerTrigger = new DownloadTrigger();
+    private final BlockingQueue<DownloadTrigger> queue;
+
+    public DownloaderTask(BlockingQueue<DownloadTrigger> queue) {
+        this.queue = queue;
+    }
+
+    @Override
+    public void run() {
+        try {
+            queue.put(timerTrigger);
+            timerTrigger.waitForCompletion();
+        } catch (InterruptedException excp) {
+            LOG.error("Caught exception. Exiting thread");
+        }
+    }
+}
\ No newline at end of file
diff --git a/agents-common/src/main/java/org/apache/ranger/plugin/util/PolicyRefresher.java b/agents-common/src/main/java/org/apache/ranger/plugin/util/PolicyRefresher.java
index e85612a..0e52c31 100644
--- a/agents-common/src/main/java/org/apache/ranger/plugin/util/PolicyRefresher.java
+++ b/agents-common/src/main/java/org/apache/ranger/plugin/util/PolicyRefresher.java
@@ -24,6 +24,7 @@ import java.io.FileReader;
 import java.io.FileWriter;
 import java.io.Reader;
 import java.io.Writer;
+import java.util.concurrent.BlockingQueue;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
@@ -45,18 +46,19 @@ public class PolicyRefresher extends Thread {
        private final String            serviceType;
        private final String            serviceName;
        private final RangerAdminClient rangerAdmin;
+       private final BlockingQueue<DownloadTrigger> policyDownloadQueue;
+
        private final String            cacheFileName;
        private final String            cacheDir;
        private final Gson              gson;
        private final boolean           disableCacheIfServiceNotFound;
 
-       private long    pollingIntervalMs   = 30 * 1000;
        private long    lastKnownVersion    = -1L;
        private long    lastActivationTimeInMillis;
        private boolean policiesSetInPlugin;
        private boolean serviceDefSetInPlugin;
 
-       public PolicyRefresher(RangerBasePlugin plugIn, String serviceType, 
String appId, String serviceName, RangerAdminClient rangerAdmin, long 
pollingIntervalMs, String cacheDir) {
+       public PolicyRefresher(RangerBasePlugin plugIn, String serviceType, 
String appId, String serviceName, RangerAdminClient rangerAdmin, 
BlockingQueue<DownloadTrigger> policyDownloadQueue, String cacheDir) {
                if(LOG.isDebugEnabled()) {
                        LOG.debug("==> PolicyRefresher(serviceName=" + 
serviceName + ").PolicyRefresher()");
                }
@@ -65,7 +67,7 @@ public class PolicyRefresher extends Thread {
                this.serviceType       = serviceType;
                this.serviceName       = serviceName;
                this.rangerAdmin       = rangerAdmin;
-               this.pollingIntervalMs = pollingIntervalMs;
+               this.policyDownloadQueue = policyDownloadQueue;
 
                if(StringUtils.isEmpty(appId)) {
                        appId = serviceType;
@@ -122,20 +124,6 @@ public class PolicyRefresher extends Thread {
                return rangerAdmin;
        }
 
-       /**
-        * @return the pollingIntervalMilliSeconds
-        */
-       public long getPollingIntervalMs() {
-               return pollingIntervalMs;
-       }
-
-       /**
-        * @param pollingIntervalMilliSeconds the pollingIntervalMilliSeconds 
to set
-        */
-       public void setPollingIntervalMilliSeconds(long 
pollingIntervalMilliSeconds) {
-               this.pollingIntervalMs = pollingIntervalMilliSeconds;
-       }
-
        public long getLastActivationTimeInMillis() {
                return lastActivationTimeInMillis;
        }
@@ -168,9 +156,10 @@ public class PolicyRefresher extends Thread {
                }
 
                while(true) {
-                       loadPolicy();
                        try {
-                               Thread.sleep(pollingIntervalMs);
+                               DownloadTrigger trigger = 
policyDownloadQueue.take();
+                               loadPolicy();
+                               trigger.signalCompletion();
                        } catch(InterruptedException excp) {
                                LOG.info("PolicyRefresher(serviceName=" + 
serviceName + ").run(): interrupted! Exiting thread", excp);
                                break;

Reply via email to