Repository: incubator-ranger Updated Branches: refs/heads/master 903e0cdaf -> 7d95b475b
RANGER-1079: Retry initialization of failed tag sources Signed-off-by: Madhan Neethiraj <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/incubator-ranger/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ranger/commit/7d95b475 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ranger/tree/7d95b475 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ranger/diff/7d95b475 Branch: refs/heads/master Commit: 7d95b475b55fdfe8f35e6d4cd574cad923f87a16 Parents: 903e0cd Author: Abhay Kulkarni <[email protected]> Authored: Sun Jul 3 01:00:38 2016 -0700 Committer: Madhan Neethiraj <[email protected]> Committed: Thu Jul 7 22:08:20 2016 -0700 ---------------------------------------------------------------------- .../ranger/tagsync/model/AbstractTagSource.java | 16 +++++ .../apache/ranger/tagsync/model/TagSource.java | 4 ++ .../ranger/tagsync/process/TagSyncConfig.java | 16 +++++ .../ranger/tagsync/process/TagSynchronizer.java | 76 ++++++++++++++------ 4 files changed, 90 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/7d95b475/tagsync/src/main/java/org/apache/ranger/tagsync/model/AbstractTagSource.java ---------------------------------------------------------------------- diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/model/AbstractTagSource.java b/tagsync/src/main/java/org/apache/ranger/tagsync/model/AbstractTagSource.java index d46170a..da4c5cb 100644 --- a/tagsync/src/main/java/org/apache/ranger/tagsync/model/AbstractTagSource.java +++ b/tagsync/src/main/java/org/apache/ranger/tagsync/model/AbstractTagSource.java @@ -27,6 +27,7 @@ import org.apache.ranger.plugin.util.ServiceTags; public abstract class AbstractTagSource implements TagSource { private static final Log LOG = LogFactory.getLog(AbstractTagSource.class); private TagSink tagSink; + private String name; @Override public void setTagSink(TagSink sink) { @@ -37,6 +38,21 @@ public abstract class AbstractTagSource implements TagSource { } } + @Override + public void setName(String name) { + this.name = name; + } + + @Override + public String getName() { + return name; + } + + @Override + public String toString( ) { + return this.name; + } + protected void updateSink(final ServiceTags toUpload) { if (toUpload == null) { if (LOG.isDebugEnabled()) { http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/7d95b475/tagsync/src/main/java/org/apache/ranger/tagsync/model/TagSource.java ---------------------------------------------------------------------- diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/model/TagSource.java b/tagsync/src/main/java/org/apache/ranger/tagsync/model/TagSource.java index 5ef6c57..484add3 100644 --- a/tagsync/src/main/java/org/apache/ranger/tagsync/model/TagSource.java +++ b/tagsync/src/main/java/org/apache/ranger/tagsync/model/TagSource.java @@ -28,6 +28,10 @@ public interface TagSource { void setTagSink(TagSink sink); + void setName(String name); + + String getName(); + boolean start(); void stop(); http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/7d95b475/tagsync/src/main/java/org/apache/ranger/tagsync/process/TagSyncConfig.java ---------------------------------------------------------------------- diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/process/TagSyncConfig.java b/tagsync/src/main/java/org/apache/ranger/tagsync/process/TagSyncConfig.java index c52e0d2..d98379a 100644 --- a/tagsync/src/main/java/org/apache/ranger/tagsync/process/TagSyncConfig.java +++ b/tagsync/src/main/java/org/apache/ranger/tagsync/process/TagSyncConfig.java @@ -81,6 +81,8 @@ public class TagSyncConfig extends Configuration { private static final String TAGSYNC_TAGADMIN_KEYSTORE_PROP = "ranger.tagsync.keystore.filename"; private static final String TAGSYNC_ATLASREST_KEYSTORE_PROP = "ranger.tagsync.source.atlasrest.keystore.filename"; + private static final String TAGSYNC_SOURCE_RETRY_INITIALIZATION_INTERVAL_PROP = "ranger.tagsync.source.retry.initialization.interval.millis"; + private static final String DEFAULT_TAGADMIN_USERNAME = "rangertagsync"; private static final String DEFAULT_TAGADMIN_PASSWORD = "rangertagsync"; private static final String DEFAULT_ATLASREST_USERNAME = "admin"; @@ -89,6 +91,7 @@ public class TagSyncConfig extends Configuration { private static final int DEFAULT_TAGSYNC_TAGADMIN_CONNECTION_CHECK_INTERVAL = 15000; private static final long DEFAULT_TAGSYNC_ATLASREST_SOURCE_DOWNLOAD_INTERVAL = 900000; private static final long DEFAULT_TAGSYNC_FILESOURCE_MOD_TIME_CHECK_INTERVAL = 60000; + private static final long DEFAULT_TAGSYNC_SOURCE_RETRY_INITIALIZATION_INTERVAL = 10000; private static final String AUTH_TYPE = "hadoop.security.authentication"; private static final String NAME_RULES = "hadoop.security.auth_to_local"; @@ -383,6 +386,19 @@ public class TagSyncConfig extends Configuration { return ret; } + static public long getTagSourceRetryInitializationInterval(Properties prop) { + long ret = DEFAULT_TAGSYNC_SOURCE_RETRY_INITIALIZATION_INTERVAL; + String val = prop.getProperty(TAGSYNC_SOURCE_RETRY_INITIALIZATION_INTERVAL_PROP); + if (StringUtils.isNotBlank(val)) { + try { + ret = Long.valueOf(val); + } catch (NumberFormatException exception) { + // Ignore + } + } + return ret; + } + private TagSyncConfig() { super(false); init() ; http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/7d95b475/tagsync/src/main/java/org/apache/ranger/tagsync/process/TagSynchronizer.java ---------------------------------------------------------------------- diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/process/TagSynchronizer.java b/tagsync/src/main/java/org/apache/ranger/tagsync/process/TagSynchronizer.java index b1b17cd..349f212 100644 --- a/tagsync/src/main/java/org/apache/ranger/tagsync/process/TagSynchronizer.java +++ b/tagsync/src/main/java/org/apache/ranger/tagsync/process/TagSynchronizer.java @@ -36,13 +36,14 @@ public class TagSynchronizer { private static final Logger LOG = Logger.getLogger(TagSynchronizer.class); - private TagSink tagSink = null; - private List<TagSource> tagSources; - private Properties properties = null; - private static final String TAGSYNC_SOURCE_BASE = "ranger.tagsync.source."; private static final String PROP_CLASS_NAME = "class"; + private TagSink tagSink = null; + private List<TagSource> tagSources = new ArrayList<TagSource>(); + private List<TagSource> failedTagSources = new ArrayList<TagSource>(); + private Properties properties = null; + private final Object shutdownNotifier = new Object(); private volatile boolean isShutdownInProgress = false; @@ -103,12 +104,7 @@ public class TagSynchronizer { tagSink = initializeTagSink(properties); if (tagSink != null) { - - tagSources = initializeTagSources(properties); - - for (TagSource tagSource : tagSources) { - tagSource.setTagSink(tagSink); - } + initializeTagSources(); ret = true; } @@ -134,9 +130,14 @@ public class TagSynchronizer { } if (threadsStarted) { + long tagSourceRetryInitializationInterval = TagSyncConfig.getTagSourceRetryInitializationInterval(properties); + synchronized(shutdownNotifier) { while(! isShutdownInProgress) { - shutdownNotifier.wait(); + shutdownNotifier.wait(tagSourceRetryInitializationInterval); + if (CollectionUtils.isNotEmpty(failedTagSources)) { + reInitializeFailedTagSources(); + } } } } @@ -213,13 +214,11 @@ public class TagSynchronizer { return ret; } - static public List<TagSource> initializeTagSources(Properties properties) { + private void initializeTagSources() { if (LOG.isDebugEnabled()) { LOG.debug("==> TagSynchronizer.initializeTagSources()"); } - List<TagSource> ret = new ArrayList<TagSource>(); - List<String> tagSourceNameList = new ArrayList<String>(); for (Object propNameObj : properties.keySet()) { @@ -253,29 +252,60 @@ public class TagSynchronizer { try { if (!tagSource.initialize(properties)) { LOG.error("Failed to initialize TAG source " + tagSourceName); - ret.clear(); - break; + failedTagSources.add(tagSource); } else { - ret.add(tagSource); + tagSource.setTagSink(tagSink); + tagSources.add(tagSource); initializedTagSourceNameList.add(tagSourceName); } } catch(Exception exception) { LOG.error("tag-source:" + tagSourceName + " initialization failed with ", exception); - ret.clear(); - break; + failedTagSources.add(tagSource); } } } - if (CollectionUtils.isEmpty(initializedTagSourceNameList)) { + if (CollectionUtils.isEmpty(tagSources)) { LOG.warn("TagSync is not configured for any tag-sources. No tags will be received by TagSync."); LOG.warn("Please recheck configuration properties and tagsync environment to ensure that this is correct."); } if (LOG.isDebugEnabled()) { - LOG.debug("<== TagSynchronizer.initializeTagSources(" + initializedTagSourceNameList + ")"); + LOG.debug("<== TagSynchronizer.initializeTagSources(initilaizedTagSources=" + initializedTagSourceNameList + + ", failedTagSources=" + failedTagSources + ")"); + } + } + + private void reInitializeFailedTagSources() { + if (LOG.isDebugEnabled()) { + LOG.debug("==> TagSynchronizer.reInitializeFailedTagSources()"); + } + + for (int index = 0; index < failedTagSources.size(); index++) { + TagSource tagSource = failedTagSources.get(index); + try { + if (tagSource.initialize(properties)) { + failedTagSources.remove(index); + --index; + tagSources.add(tagSource); + tagSource.setTagSink(tagSink); + if (tagSource.start()) { + tagSources.add(tagSource); + } else { + LOG.error("Failed to start tagSource: " + tagSource); + } + } else { + LOG.error("Failed to initialize TAG source " + tagSource); + } + } catch (Exception exception) { + LOG.error("tag-source:" + tagSource + " initialization failed with ", exception); + } + + } + + if (LOG.isDebugEnabled()) { + LOG.debug("<== TagSynchronizer.reInitializeFailedTagSources()"); } - return ret; } static private TagSource getTagSourceFromConfig(Properties props, @@ -304,6 +334,7 @@ public class TagSynchronizer { if (LOG.isDebugEnabled()) { LOG.debug("Created instance of " + className); } + tagSource.setName(tagSourceName); } catch (Exception e) { LOG.fatal("Can't instantiate tagSource class for tagSourceName=" + tagSourceName + ", className=" + className @@ -337,4 +368,5 @@ public class TagSynchronizer { } return list; } + }
