Repository: incubator-ranger
Updated Branches:
  refs/heads/master 903e0cdaf -> 7d95b475b


RANGER-1079: Retry initialization of failed tag sources

Signed-off-by: Madhan Neethiraj <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/incubator-ranger/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ranger/commit/7d95b475
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ranger/tree/7d95b475
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ranger/diff/7d95b475

Branch: refs/heads/master
Commit: 7d95b475b55fdfe8f35e6d4cd574cad923f87a16
Parents: 903e0cd
Author: Abhay Kulkarni <[email protected]>
Authored: Sun Jul 3 01:00:38 2016 -0700
Committer: Madhan Neethiraj <[email protected]>
Committed: Thu Jul 7 22:08:20 2016 -0700

----------------------------------------------------------------------
 .../ranger/tagsync/model/AbstractTagSource.java | 16 +++++
 .../apache/ranger/tagsync/model/TagSource.java  |  4 ++
 .../ranger/tagsync/process/TagSyncConfig.java   | 16 +++++
 .../ranger/tagsync/process/TagSynchronizer.java | 76 ++++++++++++++------
 4 files changed, 90 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/7d95b475/tagsync/src/main/java/org/apache/ranger/tagsync/model/AbstractTagSource.java
----------------------------------------------------------------------
diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/model/AbstractTagSource.java b/tagsync/src/main/java/org/apache/ranger/tagsync/model/AbstractTagSource.java
index d46170a..da4c5cb 100644
--- a/tagsync/src/main/java/org/apache/ranger/tagsync/model/AbstractTagSource.java
+++ b/tagsync/src/main/java/org/apache/ranger/tagsync/model/AbstractTagSource.java
@@ -27,6 +27,7 @@ import org.apache.ranger.plugin.util.ServiceTags;
 public abstract  class AbstractTagSource implements TagSource {
        private static final Log LOG = 
LogFactory.getLog(AbstractTagSource.class);
        private TagSink tagSink;
+       private String name;
 
        @Override
        public void setTagSink(TagSink sink) {
@@ -37,6 +38,21 @@ public abstract  class AbstractTagSource implements 
TagSource {
                }
        }
 
+       /**
+        * Sets the name this tag source is identified by.
+        *
+        * @param name tag-source name; stored as-is, without validation
+        */
+       @Override
+       public void setName(String name) {
+               this.name = name;
+       }
+
+       /**
+        * Returns the name last passed to {@link #setName(String)}.
+        *
+        * @return the tag-source name, or {@code null} if none has been set
+        */
+       @Override
+       public String getName() {
+               return name;
+       }
+
+       /**
+        * Returns a printable identifier for this tag source.
+        *
+        * @return the configured name, or the simple class name when no name
+        *         has been set; never {@code null}
+        */
+       @Override
+       public String toString() {
+               // toString() must never return null: direct callers would hit a NullPointerException
+               return name != null ? name : getClass().getSimpleName();
+       }
+
        protected void updateSink(final ServiceTags toUpload) {
                if (toUpload == null) {
                        if (LOG.isDebugEnabled()) {

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/7d95b475/tagsync/src/main/java/org/apache/ranger/tagsync/model/TagSource.java
----------------------------------------------------------------------
diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/model/TagSource.java b/tagsync/src/main/java/org/apache/ranger/tagsync/model/TagSource.java
index 5ef6c57..484add3 100644
--- a/tagsync/src/main/java/org/apache/ranger/tagsync/model/TagSource.java
+++ b/tagsync/src/main/java/org/apache/ranger/tagsync/model/TagSource.java
@@ -28,6 +28,10 @@ public interface TagSource {
 
        void setTagSink(TagSink sink);
 
+       void setName(String name);
+
+       String getName();
+
        boolean start();
 
        void stop();

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/7d95b475/tagsync/src/main/java/org/apache/ranger/tagsync/process/TagSyncConfig.java
----------------------------------------------------------------------
diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/process/TagSyncConfig.java b/tagsync/src/main/java/org/apache/ranger/tagsync/process/TagSyncConfig.java
index c52e0d2..d98379a 100644
--- a/tagsync/src/main/java/org/apache/ranger/tagsync/process/TagSyncConfig.java
+++ b/tagsync/src/main/java/org/apache/ranger/tagsync/process/TagSyncConfig.java
@@ -81,6 +81,8 @@ public class TagSyncConfig extends Configuration {
        private static final String TAGSYNC_TAGADMIN_KEYSTORE_PROP = 
"ranger.tagsync.keystore.filename";
        private static final String TAGSYNC_ATLASREST_KEYSTORE_PROP = 
"ranger.tagsync.source.atlasrest.keystore.filename";
 
+       private static final String 
TAGSYNC_SOURCE_RETRY_INITIALIZATION_INTERVAL_PROP = 
"ranger.tagsync.source.retry.initialization.interval.millis";
+
        private static final String DEFAULT_TAGADMIN_USERNAME = "rangertagsync";
        private static final String DEFAULT_TAGADMIN_PASSWORD = "rangertagsync";
        private static final String DEFAULT_ATLASREST_USERNAME = "admin";
@@ -89,6 +91,7 @@ public class TagSyncConfig extends Configuration {
        private static final int 
DEFAULT_TAGSYNC_TAGADMIN_CONNECTION_CHECK_INTERVAL = 15000;
        private static final long 
DEFAULT_TAGSYNC_ATLASREST_SOURCE_DOWNLOAD_INTERVAL = 900000;
        private static final long 
DEFAULT_TAGSYNC_FILESOURCE_MOD_TIME_CHECK_INTERVAL = 60000;
+       private static final long 
DEFAULT_TAGSYNC_SOURCE_RETRY_INITIALIZATION_INTERVAL = 10000;
 
        private static final String AUTH_TYPE = 
"hadoop.security.authentication";
        private static final String NAME_RULES = 
"hadoop.security.auth_to_local";
@@ -383,6 +386,19 @@ public class TagSyncConfig extends Configuration {
                return ret;
        }
 
+       /**
+        * Reads the interval, in milliseconds, between attempts to re-initialize
+        * tag sources whose initialization failed.
+        *
+        * The property value is trimmed before parsing. A missing, blank,
+        * malformed or negative value yields the default interval; negative values
+        * are rejected because the result is used as an Object.wait(long) timeout,
+        * which throws IllegalArgumentException for negative arguments. A value of
+        * 0 is passed through unchanged; with Object.wait(long) that means waiting
+        * until notified, i.e. no periodic retry.
+        *
+        * @param prop tagsync configuration properties; must not be null
+        * @return the retry interval in milliseconds, never negative
+        */
+       static public long getTagSourceRetryInitializationInterval(Properties prop) {
+               long ret = DEFAULT_TAGSYNC_SOURCE_RETRY_INITIALIZATION_INTERVAL;
+               String val = prop.getProperty(TAGSYNC_SOURCE_RETRY_INITIALIZATION_INTERVAL_PROP);
+               if (StringUtils.isNotBlank(val)) {
+                       try {
+                               // Trim first: isNotBlank() accepts " 5000 " but the parser does not
+                               long parsed = Long.parseLong(val.trim());
+                               if (parsed >= 0) {
+                                       ret = parsed;
+                               }
+                       } catch (NumberFormatException ignored) {
+                               // Malformed value: deliberately fall back to the default interval
+                       }
+               }
+               return ret;
+       }
+
        private TagSyncConfig() {
                super(false);
                init() ;

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/7d95b475/tagsync/src/main/java/org/apache/ranger/tagsync/process/TagSynchronizer.java
----------------------------------------------------------------------
diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/process/TagSynchronizer.java b/tagsync/src/main/java/org/apache/ranger/tagsync/process/TagSynchronizer.java
index b1b17cd..349f212 100644
--- a/tagsync/src/main/java/org/apache/ranger/tagsync/process/TagSynchronizer.java
+++ b/tagsync/src/main/java/org/apache/ranger/tagsync/process/TagSynchronizer.java
@@ -36,13 +36,14 @@ public class TagSynchronizer {
 
        private static final Logger LOG = 
Logger.getLogger(TagSynchronizer.class);
 
-       private TagSink tagSink = null;
-       private List<TagSource> tagSources;
-       private Properties properties = null;
-
        private static final String TAGSYNC_SOURCE_BASE = 
"ranger.tagsync.source.";
        private static final String PROP_CLASS_NAME = "class";
 
+       private TagSink tagSink = null;
+       private List<TagSource> tagSources = new ArrayList<TagSource>();
+       private List<TagSource> failedTagSources = new ArrayList<TagSource>();
+       private Properties properties = null;
+
        private final Object shutdownNotifier = new Object();
        private volatile boolean isShutdownInProgress = false;
 
@@ -103,12 +104,7 @@ public class TagSynchronizer {
                tagSink = initializeTagSink(properties);
 
                if (tagSink != null) {
-
-                       tagSources = initializeTagSources(properties);
-
-                       for (TagSource tagSource : tagSources) {
-                               tagSource.setTagSink(tagSink);
-                       }
+                       initializeTagSources();
                        ret = true;
                }
 
@@ -134,9 +130,14 @@ public class TagSynchronizer {
                        }
 
                        if (threadsStarted) {
+                               long tagSourceRetryInitializationInterval = 
TagSyncConfig.getTagSourceRetryInitializationInterval(properties);
+
                                synchronized(shutdownNotifier) {
                                        while(! isShutdownInProgress) {
-                                               shutdownNotifier.wait();
+                                               
shutdownNotifier.wait(tagSourceRetryInitializationInterval);
+                                               if 
(CollectionUtils.isNotEmpty(failedTagSources)) {
+                                                       
reInitializeFailedTagSources();
+                                               }
                                        }
                                }
                        }
@@ -213,13 +214,11 @@ public class TagSynchronizer {
                return ret;
        }
 
-       static public List<TagSource> initializeTagSources(Properties 
properties) {
+       private void initializeTagSources() {
                if (LOG.isDebugEnabled()) {
                        LOG.debug("==> TagSynchronizer.initializeTagSources()");
                }
 
-               List<TagSource> ret = new ArrayList<TagSource>();
-
                List<String> tagSourceNameList = new ArrayList<String>();
 
                for (Object propNameObj : properties.keySet()) {
@@ -253,29 +252,60 @@ public class TagSynchronizer {
                                try {
                                        if (!tagSource.initialize(properties)) {
                                                LOG.error("Failed to initialize 
TAG source " + tagSourceName);
-                                               ret.clear();
-                                               break;
+                                               failedTagSources.add(tagSource);
                                        } else {
-                                               ret.add(tagSource);
+                                               tagSource.setTagSink(tagSink);
+                                               tagSources.add(tagSource);
                                                
initializedTagSourceNameList.add(tagSourceName);
                                        }
                                } catch(Exception exception) {
                                        LOG.error("tag-source:" + tagSourceName 
+ " initialization failed with ", exception);
-                                       ret.clear();
-                                       break;
+                                       failedTagSources.add(tagSource);
                                }
                        }
                }
 
-               if (CollectionUtils.isEmpty(initializedTagSourceNameList)) {
+               if (CollectionUtils.isEmpty(tagSources)) {
                        LOG.warn("TagSync is not configured for any 
tag-sources. No tags will be received by TagSync.");
                        LOG.warn("Please recheck configuration properties and 
tagsync environment to ensure that this is correct.");
                }
 
                if (LOG.isDebugEnabled()) {
-                       LOG.debug("<== TagSynchronizer.initializeTagSources(" + 
initializedTagSourceNameList + ")");
+                       LOG.debug("<== 
TagSynchronizer.initializeTagSources(initilaizedTagSources=" + 
initializedTagSourceNameList
+                                       + ", failedTagSources=" + 
failedTagSources + ")");
+               }
+       }
+
+       /**
+        * Retries initialization of every tag source currently held in
+        * failedTagSources.
+        *
+        * A source whose initialize() now succeeds is removed from
+        * failedTagSources, added to tagSources exactly once, wired to the tag
+        * sink and started. A failed start() is logged; the source is not retried
+        * again. Sources that still fail to initialize, or throw, stay in
+        * failedTagSources for the next attempt.
+        */
+       private void reInitializeFailedTagSources() {
+               if (LOG.isDebugEnabled()) {
+                       LOG.debug("==> TagSynchronizer.reInitializeFailedTagSources()");
+               }
+
+               for (int index = 0; index < failedTagSources.size(); index++) {
+                       TagSource tagSource = failedTagSources.get(index);
+                       try {
+                               if (tagSource.initialize(properties)) {
+                                       // Removal shifts later entries left; step back so the next one is not skipped
+                                       failedTagSources.remove(index);
+                                       --index;
+                                       // Register once only; a second add after start() would duplicate the entry
+                                       tagSources.add(tagSource);
+                                       tagSource.setTagSink(tagSink);
+                                       if (!tagSource.start()) {
+                                               LOG.error("Failed to start tagSource: " + tagSource);
+                                       }
+                               } else {
+                                       LOG.error("Failed to initialize TAG source " + tagSource);
+                               }
+                       } catch (Exception exception) {
+                               LOG.error("tag-source:" + tagSource + " initialization failed with ", exception);
+                       }
+               }
+
+               if (LOG.isDebugEnabled()) {
+                       LOG.debug("<== TagSynchronizer.reInitializeFailedTagSources()");
                }
-               return ret;
        }
 
        static private TagSource getTagSourceFromConfig(Properties props,
@@ -304,6 +334,7 @@ public class TagSynchronizer {
                                if (LOG.isDebugEnabled()) {
                                        LOG.debug("Created instance of " + 
className);
                                }
+                               tagSource.setName(tagSourceName);
                        } catch (Exception e) {
                                LOG.fatal("Can't instantiate tagSource class 
for tagSourceName="
                                                + tagSourceName + ", 
className=" + className
@@ -337,4 +368,5 @@ public class TagSynchronizer {
                }
                return list;
        }
+
 }

Reply via email to