RANGER-660: ranger-tagsync configuration validation

Signed-off-by: Madhan Neethiraj <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/incubator-ranger/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ranger/commit/757d1eb2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ranger/tree/757d1eb2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ranger/diff/757d1eb2

Branch: refs/heads/master
Commit: 757d1eb211922be7f09cfb773d839f23ef98f147
Parents: 22859f5
Author: Abhay Kulkarni <[email protected]>
Authored: Wed Oct 21 14:54:39 2015 -0700
Committer: Madhan Neethiraj <[email protected]>
Committed: Tue Oct 27 17:03:51 2015 -0700

----------------------------------------------------------------------
 .../conf/templates/installprop2xml.properties   |   2 +-
 .../conf/templates/ranger-tagsync-template.xml  |   2 +-
 tagsync/scripts/install.properties              |   3 +-
 tagsync/scripts/ranger-tagsync-services.sh      |   6 +
 tagsync/scripts/setup.py                        |   1 -
 .../apache/ranger/tagsync/model/TagSink.java    |   3 +-
 .../ranger/tagsync/process/TagSyncConfig.java   |  23 +-
 .../ranger/tagsync/process/TagSynchronizer.java | 207 +++++----
 .../tagsync/sink/tagadmin/TagRESTSink.java      | 412 ++---------------
 .../tagsync/source/atlas/AtlasUtility.java      | 404 +++++++++++++++++
 .../tagsync/source/atlas/TagAtlasSource.java    | 442 ++-----------------
 .../tagsync/source/file/TagFileSource.java      |  21 +-
 .../main/resources/ranger-tagsync-default.xml   |  18 +-
 .../src/main/resources/ranger-tagsync-site.xml  |  57 +++
 .../tagsync/process/TestTagSynchronizer.java    |  29 +-
 15 files changed, 693 insertions(+), 937 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/757d1eb2/tagsync/conf/templates/installprop2xml.properties
----------------------------------------------------------------------
diff --git a/tagsync/conf/templates/installprop2xml.properties 
b/tagsync/conf/templates/installprop2xml.properties
index 94618fc..5b63835 100644
--- a/tagsync/conf/templates/installprop2xml.properties
+++ b/tagsync/conf/templates/installprop2xml.properties
@@ -30,7 +30,7 @@ TAGADMIN_SSL_CONFIG_FILENAME = 
ranger.tagsync.tagadmin.rest.ssl.config.file
 TAGSYNC_KEYSTORE_FILENAME = ranger.tagsync.tagadmin.keystore
 
 
-SYNC_INTERVAL = ranger.tagsync.sleeptimeinmillisbetweensynccycle
+TAG_SOURCE_FILE_MOD_TIME_CHECK_INTERVAL = 
ranger.tagsync.filesource.modtime.check.interval
 
 TAGSYNC_FILESOURCE_FILENAME = ranger.tagsync.filesource.filename
 

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/757d1eb2/tagsync/conf/templates/ranger-tagsync-template.xml
----------------------------------------------------------------------
diff --git a/tagsync/conf/templates/ranger-tagsync-template.xml 
b/tagsync/conf/templates/ranger-tagsync-template.xml
index ebee22d..9a88681 100644
--- a/tagsync/conf/templates/ranger-tagsync-template.xml
+++ b/tagsync/conf/templates/ranger-tagsync-template.xml
@@ -28,7 +28,7 @@
                <value></value>
        </property>
        <property>
-               <name>ranger.tagsync.sleeptimeinmillisbetweensynccycle</name>
+               <name>ranger.tagsync.filesource.modtime.check.interval</name>
                <value></value>
        </property>
        <property>

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/757d1eb2/tagsync/scripts/install.properties
----------------------------------------------------------------------
diff --git a/tagsync/scripts/install.properties 
b/tagsync/scripts/install.properties
index 6b36846..f7de6e3 100644
--- a/tagsync/scripts/install.properties
+++ b/tagsync/scripts/install.properties
@@ -47,8 +47,7 @@ TAGSYNC_FILESOURCE_FILENAME = /etc/ranger/data/tags.json
 
 
 # Interval for checking the source for any changes in case of TAG_SOURCE = file
-# Also used for periodicity of checking if the process needs to be shut down
-SYNC_INTERVAL = 60000
+TAG_SOURCE_FILE_MOD_TIME_CHECK_INTERVAL = 60000
 
 # Endpoint specifications needed by Atlas
 

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/757d1eb2/tagsync/scripts/ranger-tagsync-services.sh
----------------------------------------------------------------------
diff --git a/tagsync/scripts/ranger-tagsync-services.sh 
b/tagsync/scripts/ranger-tagsync-services.sh
index e818d0d..ca82ead 100755
--- a/tagsync/scripts/ranger-tagsync-services.sh
+++ b/tagsync/scripts/ranger-tagsync-services.sh
@@ -82,10 +82,16 @@ if [ "${action}" == "START" ]; then
 
        if [ "${pid}" != "" ]
        then
+               if [ -z "`ps axf | grep ${pid} | grep -v grep`" ]; then
+                       rm -f ${pidf}
+                       echo "Ranger Tagsync Service failed to start. Please 
refer to log files under ${logdir} for further details."
+               else
                echo "Ranger Tagsync Service has started successfully."
+        fi
        else
                echo "Ranger Tagsync Service failed to start. Please refer to 
log files under ${logdir} for further details."
        fi
+
        exit;
 
 elif [ "${action}" == "STOP" ]; then

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/757d1eb2/tagsync/scripts/setup.py
----------------------------------------------------------------------
diff --git a/tagsync/scripts/setup.py b/tagsync/scripts/setup.py
index 2721186..e4b2433 100755
--- a/tagsync/scripts/setup.py
+++ b/tagsync/scripts/setup.py
@@ -69,7 +69,6 @@ rootOwnerId = 0
 initPrefixList = ['S99', 'K00']
 
 TAG_SOURCE_KEY  = 'TAG_SOURCE'
-SYNC_INTERVAL_NEW_KEY = 'ranger.tagsync.sleeptimeinmillisbetweensynccycle'
 TAGSYNC_ATLAS_KAFKA_ENDPOINTS_KEY = 'TAGSYNC_ATLAS_KAFKA_ENDPOINTS'
 TAGSYNC_ATLAS_ZOOKEEPER_ENDPOINT_KEY = 'TAGSYNC_ATLAS_ZOOKEEPER_ENDPOINT'
 TAGSYNC_ATLAS_CONSUMER_GROUP_KEY = 'TAGSYNC_ATLAS_CONSUMER_GROUP'

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/757d1eb2/tagsync/src/main/java/org/apache/ranger/tagsync/model/TagSink.java
----------------------------------------------------------------------
diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/model/TagSink.java 
b/tagsync/src/main/java/org/apache/ranger/tagsync/model/TagSink.java
index 6565159..9eb5319 100644
--- a/tagsync/src/main/java/org/apache/ranger/tagsync/model/TagSink.java
+++ b/tagsync/src/main/java/org/apache/ranger/tagsync/model/TagSink.java
@@ -19,13 +19,12 @@
 
 package org.apache.ranger.tagsync.model;
 
-import org.apache.ranger.plugin.store.TagStore;
 import org.apache.ranger.plugin.util.ServiceTags;
 
 import java.util.Properties;
 
 
-public interface TagSink extends TagStore {
+public interface TagSink {
        boolean initialize(Properties properties);
        void uploadServiceTags(ServiceTags serviceTags) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/757d1eb2/tagsync/src/main/java/org/apache/ranger/tagsync/process/TagSyncConfig.java
----------------------------------------------------------------------
diff --git 
a/tagsync/src/main/java/org/apache/ranger/tagsync/process/TagSyncConfig.java 
b/tagsync/src/main/java/org/apache/ranger/tagsync/process/TagSyncConfig.java
index bfd1b8b..e1b5130 100644
--- a/tagsync/src/main/java/org/apache/ranger/tagsync/process/TagSyncConfig.java
+++ b/tagsync/src/main/java/org/apache/ranger/tagsync/process/TagSyncConfig.java
@@ -45,7 +45,7 @@ public class TagSyncConfig extends Configuration {
 
        private static final String TAGSYNC_FILESOURCE_FILENAME_PROP = 
"ranger.tagsync.filesource.filename";
 
-       private static final String 
TAGSYNC_SLEEP_TIME_IN_MILLIS_BETWEEN_CYCLE_PROP = 
"ranger.tagsync.sleeptimeinmillisbetweensynccycle";
+       private static final String 
TAGSYNC_FILESOURCE_MOD_TIME_CHECK_INTERVAL_PROP = 
"ranger.tagsync.filesource.modtime.check.interval";
 
        private static final String TAGSYNC_SOURCE_CLASS_PROP = 
"ranger.tagsync.source.impl.class";
 
@@ -64,20 +64,7 @@ public class TagSyncConfig extends Configuration {
        private static final String TAGSYNC_TAGADMIN_PASSWORD_PROP = 
"ranger.tagsync.tagadmin.password";
        private static final String DEFAULT_TAGADMIN_USERNAME = "rangertagsync";
 
-       private static volatile TagSyncConfig instance = null;
-
        public static TagSyncConfig getInstance() {
-       /*
-               TagSyncConfig ret = instance;
-               if (ret == null) {
-                       synchronized(TagSyncConfig.class) {
-                               if (ret == null) {
-                                       ret = instance = new TagSyncConfig();
-                                       LOG.debug("TagSyncConfig = {" + ret + 
"}");
-                               }
-                       }
-               }
-       */
                TagSyncConfig newConfig = new TagSyncConfig();
                return newConfig;
        }
@@ -179,8 +166,8 @@ public class TagSyncConfig extends Configuration {
                return val;
        }
 
-       static public long getSleepTimeInMillisBetweenCycle(Properties prop) {
-               String val = 
prop.getProperty(TAGSYNC_SLEEP_TIME_IN_MILLIS_BETWEEN_CYCLE_PROP);
+       static public long 
getTagSourceFileModTimeCheckIntervalInMillis(Properties prop) {
+               String val = 
prop.getProperty(TAGSYNC_FILESOURCE_MOD_TIME_CHECK_INTERVAL_PROP);
                return Long.valueOf(val);
        }
 
@@ -194,6 +181,10 @@ public class TagSyncConfig extends Configuration {
                        return val;
        }
 
+       static public String getTagSource(Properties prop) {
+               return prop.getProperty(TAGSYNC_SOURCE_CLASS_PROP);
+       }
+
        static public String getTagSinkClassName(Properties prop) {
                String val = prop.getProperty(TAGSYNC_SINK_CLASS_PROP);
                if (StringUtils.equalsIgnoreCase(val, "tagadmin")) {

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/757d1eb2/tagsync/src/main/java/org/apache/ranger/tagsync/process/TagSynchronizer.java
----------------------------------------------------------------------
diff --git 
a/tagsync/src/main/java/org/apache/ranger/tagsync/process/TagSynchronizer.java 
b/tagsync/src/main/java/org/apache/ranger/tagsync/process/TagSynchronizer.java
index 0235581..7bae973 100644
--- 
a/tagsync/src/main/java/org/apache/ranger/tagsync/process/TagSynchronizer.java
+++ 
b/tagsync/src/main/java/org/apache/ranger/tagsync/process/TagSynchronizer.java
@@ -20,6 +20,7 @@
 package org.apache.ranger.tagsync.process;
 
 import org.apache.commons.collections.MapUtils;
+import org.apache.commons.lang.StringUtils;
 import org.apache.log4j.Logger;
 import org.apache.ranger.tagsync.model.TagSink;
 import org.apache.ranger.tagsync.model.TagSource;
@@ -27,165 +28,163 @@ import org.apache.ranger.tagsync.model.TagSource;
 import java.util.Map;
 import java.util.Properties;
 
-public class TagSynchronizer implements Runnable {
+public class TagSynchronizer {
 
        private static final Logger LOG = 
Logger.getLogger(TagSynchronizer.class);
 
-       private final static int MAX_INIT_RETRIES = 5;
-
        private boolean shutdownFlag = false;
-       private TagSink tagSink = null;
        private TagSource tagSource = null;
        private Properties properties = null;
 
-
        public static void main(String[] args) {
 
-               TagSyncConfig config = TagSyncConfig.getInstance();
-               Properties props = config.getProperties();
+               boolean tagSynchronizerInitialized = false;
+               TagSynchronizer tagSynchronizer = new TagSynchronizer();
+
+               while (!tagSynchronizerInitialized) {
+
+                       TagSyncConfig config = TagSyncConfig.getInstance();
+                       Properties props = config.getProperties();
+
+                       tagSynchronizer.setProperties(props);
+                       tagSynchronizerInitialized = 
tagSynchronizer.initialize();
 
-               TagSynchronizer tagSynchronizer = new TagSynchronizer(props);
+                       if (!tagSynchronizerInitialized) {
+                               LOG.error("TagSynchronizer failed to initialize 
correctly");
+
+                               try {
+                                       LOG.error("Sleeping for [60] seconds 
before attempting to re-read configuration XML files");
+                                       Thread.sleep(60 * 1000);
+                               } catch (InterruptedException e) {
+                                       LOG.error("Failed to wait for [60] 
seconds", e);
+                               }
+                       }
+               }
 
                tagSynchronizer.run();
        }
 
+       public TagSynchronizer() {
+               setProperties(null);
+       }
+
        public TagSynchronizer(Properties properties) {
+               setProperties(properties);
+       }
+
+       public void setProperties(Properties properties) {
                if (properties == null || MapUtils.isEmpty(properties)) {
-                       LOG.error("TagSynchronizer initialized with null 
properties!");
                        this.properties = new Properties();
                } else {
                        this.properties = properties;
                }
        }
 
-       public TagSink getTagSink() {
-               return tagSink;
-       }
-
-       public TagSource getTagSource() {
-               return tagSource;
-       }
+       public boolean initialize() {
 
-       @Override
-       public void run() {
                if (LOG.isDebugEnabled()) {
-                       LOG.debug("==> TagSynchronizer.run()");
+                       LOG.debug("==> TagSynchronizer.initialize()");
                }
-               try {
-                       long sleepTimeBetweenCycleInMillis = 
TagSyncConfig.getSleepTimeInMillisBetweenCycle(properties);
-
-                       boolean initDone = initLoop();
 
-                       if (initDone) {
+               printConfigurationProperties();
 
-                               Thread tagSourceThread = tagSource.start();
+               boolean ret = true;
 
-                               if (tagSourceThread != null) {
-                                       while (!shutdownFlag) {
-                                               try {
-                                                       LOG.debug("Sleeping for 
[" + sleepTimeBetweenCycleInMillis + "] milliSeconds");
-                                                       
Thread.sleep(sleepTimeBetweenCycleInMillis);
-                                               } catch (InterruptedException 
e) {
-                                                       LOG.error("Failed to 
wait for [" + sleepTimeBetweenCycleInMillis + "] milliseconds before attempting 
to synchronize tag information", e);
-                                               }
-                                       }
-                                       if (shutdownFlag) {
-                                               LOG.info("Interrupting 
tagSourceThread...");
-                                               tagSourceThread.interrupt();
-                                               try {
-                                                       tagSourceThread.join();
-                                               } catch (InterruptedException 
interruptedException) {
-                                                       
LOG.error("tagSourceThread.join() was interrupted");
-                                               }
-                                       }
-                               } else {
-                                       LOG.error("Could not start tagSource 
monitoring thread");
-                               }
-                       } else {
-                               LOG.error("Failed to initialize TagSynchonizer 
after " + MAX_INIT_RETRIES + " retries. Exiting thread");
-                       }
+               String tagSourceName = TagSyncConfig.getTagSource(properties);
 
-               } catch (Throwable t) {
-                       LOG.error("tag-sync thread got an error", t);
-               } finally {
-                       LOG.error("Shutting down the tag-sync thread");
+               if (StringUtils.isBlank(tagSourceName) ||
+                               (!StringUtils.equalsIgnoreCase(tagSourceName, 
"file") && !StringUtils.equalsIgnoreCase(tagSourceName, "atlas"))) {
+                       ret = false;
+                       LOG.error("'ranger.tagsync.source.impl.class' value is 
invalid!, 'ranger.tagsync.source.impl.class'=" + tagSourceName + ". Supported 
'ranger.tagsync.source.impl.class' values are : file, atlas");
                }
 
-               if (LOG.isDebugEnabled()) {
-                       LOG.debug("<== TagSynchronizer.run()");
-               }
-       }
+               if (ret) {
 
-       public boolean initLoop() {
-               if (LOG.isDebugEnabled()) {
-                       LOG.debug("==> TagSynchronizer.initLoop()");
-               }
-               boolean ret = false;
+                       try {
+                               LOG.info("Initializing TAG source and sink");
+                               // Initialize tagSink and tagSource
+                               String tagSourceClassName = 
TagSyncConfig.getTagSourceClassName(properties);
+                               String tagSinkClassName = 
TagSyncConfig.getTagSinkClassName(properties);
 
-               long sleepTimeBetweenCycleInMillis = 
TagSyncConfig.getSleepTimeInMillisBetweenCycle(properties);
+                               if (LOG.isDebugEnabled()) {
+                                       LOG.debug("tagSourceClassName=" + 
tagSourceClassName + ", tagSinkClassName=" + tagSinkClassName);
+                               }
 
-               for (int initRetries = 0; initRetries < MAX_INIT_RETRIES && 
!ret; initRetries++) {
+                               @SuppressWarnings("unchecked")
+                               Class<TagSource> tagSourceClass = 
(Class<TagSource>) Class.forName(tagSourceClassName);
 
-                       printConfigurationProperties();
+                               @SuppressWarnings("unchecked")
+                               Class<TagSink> tagSinkClass = (Class<TagSink>) 
Class.forName(tagSinkClassName);
 
-                       ret = init();
+                               TagSink tagSink = tagSinkClass.newInstance();
+                               tagSource = tagSourceClass.newInstance();
 
-                       if (!ret) {
-                               LOG.error("Failed to initialize TAG 
source/sink. Will retry after " + sleepTimeBetweenCycleInMillis + " 
milliseconds.");
-                               try {
-                                       LOG.debug("Sleeping for [" + 
sleepTimeBetweenCycleInMillis + "] milliSeconds");
-                                       
Thread.sleep(sleepTimeBetweenCycleInMillis);
-                                       properties = 
TagSyncConfig.getInstance().getProperties();
-                               } catch (Exception e) {
-                                       LOG.error("Failed to wait for [" + 
sleepTimeBetweenCycleInMillis + "] milliseconds before attempting to initialize 
tag source/sink", e);
+                               if (LOG.isDebugEnabled()) {
+                                       LOG.debug("Created instance of " + 
tagSourceClassName + ", " + tagSinkClassName);
                                }
+
+                               ret = tagSink.initialize(properties) && 
tagSource.initialize(properties);
+
+                               if (ret) {
+                                       tagSource.setTagSink(tagSink);
+                               }
+
+                               LOG.info("Done initializing TAG source and 
sink");
+                       } catch (Throwable t) {
+                               LOG.error("Failed to initialize TAG 
source/sink. Error details: ", t);
+                               ret = false;
                        }
                }
+
                if (LOG.isDebugEnabled()) {
-                       LOG.debug("<== TagSynchronizer.initLoop()");
+                       LOG.debug("<== TagSynchronizer.initialize(), result=" + 
ret);
                }
+
                return ret;
        }
 
-       public boolean init() {
-
+       public void run() {
                if (LOG.isDebugEnabled()) {
-                       LOG.debug("==> TagSynchronizer.init()");
+                       LOG.debug("==> TagSynchronizer.run()");
                }
-               boolean ret = false;
-               try {
-                       LOG.info("Initializing TAG source and sink");
-                       // Initialize tagSink and tagSource
-                       String tagSourceClassName = 
TagSyncConfig.getTagSourceClassName(properties);
-                       String tagSinkClassName = 
TagSyncConfig.getTagSinkClassName(properties);
-
-                       if (LOG.isDebugEnabled()) {
-                               LOG.debug("tagSourceClassName=" + 
tagSourceClassName + ", tagSinkClassName=" + tagSinkClassName);
-                       }
 
-                       Class<TagSource> tagSourceClass = (Class<TagSource>) 
Class.forName(tagSourceClassName);
-                       Class<TagSink> tagSinkClass = (Class<TagSink>) 
Class.forName(tagSinkClassName);
+               long shutdownCheckIntervalInMs = 60*1000;
 
-                       tagSink = tagSinkClass.newInstance();
-                       tagSource = tagSourceClass.newInstance();
+               Thread tagSourceThread = null;
 
-                       if (LOG.isDebugEnabled()) {
-                               LOG.debug("Created instance of " + 
tagSourceClassName + ", " + tagSinkClassName);
+               try {
+                       tagSourceThread = tagSource.start();
+
+                       if (tagSourceThread != null) {
+                               while (!shutdownFlag) {
+                                       try {
+                                               LOG.debug("Sleeping for [" + 
shutdownCheckIntervalInMs + "] milliSeconds");
+                                               
Thread.sleep(shutdownCheckIntervalInMs);
+                                       } catch (InterruptedException e) {
+                                               LOG.error("Failed to wait for 
[" + shutdownCheckIntervalInMs + "] milliseconds before attempting to 
synchronize tag information", e);
+                                       }
+                               }
                        }
-
-                       ret = tagSink.initialize(properties) && 
tagSource.initialize(properties);
-
-                       tagSource.setTagSink(tagSink);
-
-                       LOG.info("Done initializing TAG source and sink");
                } catch (Throwable t) {
-                       LOG.error("Failed to initialize TAG source/sink. Error 
details: ", t);
+                       LOG.error("tag-sync thread got an error", t);
+               } finally {
+                       if (tagSourceThread != null) {
+                               LOG.error("Shutting down the tag-sync thread");
+                               LOG.info("Interrupting tagSourceThread...");
+                               tagSourceThread.interrupt();
+                               try {
+                                       tagSourceThread.join();
+                               } catch (InterruptedException 
interruptedException) {
+                                       LOG.error("tagSourceThread.join() was 
interrupted");
+                               }
+                       } else {
+                               LOG.error("Could not start tagSource monitoring 
thread");
+                       }
                }
+
                if (LOG.isDebugEnabled()) {
-                       LOG.debug("<== TagSynchronizer.init(), result=" + ret);
+                       LOG.debug("<== TagSynchronizer.run()");
                }
-
-               return ret;
        }
 
        public void shutdown(String reason) {

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/757d1eb2/tagsync/src/main/java/org/apache/ranger/tagsync/sink/tagadmin/TagRESTSink.java
----------------------------------------------------------------------
diff --git 
a/tagsync/src/main/java/org/apache/ranger/tagsync/sink/tagadmin/TagRESTSink.java
 
b/tagsync/src/main/java/org/apache/ranger/tagsync/sink/tagadmin/TagRESTSink.java
index 76bb62d..41085d0 100644
--- 
a/tagsync/src/main/java/org/apache/ranger/tagsync/sink/tagadmin/TagRESTSink.java
+++ 
b/tagsync/src/main/java/org/apache/ranger/tagsync/sink/tagadmin/TagRESTSink.java
@@ -28,15 +28,11 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.ranger.admin.client.datatype.RESTResponse;
 import org.apache.ranger.tagsync.model.TagSink;
-import org.apache.ranger.plugin.model.*;
-import org.apache.ranger.plugin.store.PList;
-import org.apache.ranger.plugin.store.ServiceStore;
 import org.apache.ranger.plugin.util.RangerRESTClient;
 import org.apache.ranger.plugin.util.SearchFilter;
 import org.apache.ranger.plugin.util.ServiceTags;
 import org.apache.ranger.tagsync.process.TagSyncConfig;
 
-import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 
@@ -47,27 +43,18 @@ public class TagRESTSink implements TagSink {
        private static final String MODULE_PREFIX = "/tags";
 
        private static final String REST_MIME_TYPE_JSON = "application/json" ;
-       private static final String REST_URL_TAGDEFS_RESOURCE = REST_PREFIX + 
MODULE_PREFIX + "/tagdefs/" ;
-       private static final String REST_URL_TAGDEF_RESOURCE = REST_PREFIX + 
MODULE_PREFIX + "/tagdef/" ;
-       private static final String REST_URL_SERVICERESOURCES_RESOURCE = 
REST_PREFIX + MODULE_PREFIX + "resources/" ;
-       private static final String REST_URL_SERVICERESOURCE_RESOURCE = 
REST_PREFIX + MODULE_PREFIX + "resource/" ;
-       private static final String REST_URL_TAGS_RESOURCE = REST_PREFIX + 
MODULE_PREFIX + "/tags/" ;
-       private static final String REST_URL_TAG_RESOURCE = REST_PREFIX + 
MODULE_PREFIX + "/tag/" ;
-       private static final String REST_URL_TAGRESOURCEMAP_IDS_RESOURCE = 
REST_PREFIX + MODULE_PREFIX + "/tagresourcemapids/";
+
        private static final String REST_URL_IMPORT_SERVICETAGS_RESOURCE = 
REST_PREFIX + MODULE_PREFIX + "/importservicetags/";
 
        private RangerRESTClient tagRESTClient = null;
 
        @Override
-       public void init() {}
-
-       @Override
        public boolean initialize(Properties properties) {
                if(LOG.isDebugEnabled()) {
                        LOG.debug("==> TagRESTSink.initialize()");
                }
 
-               boolean ret = false;
+               boolean ret = true;
 
                String restUrl       = 
TagSyncConfig.getTagAdminRESTUrl(properties);
                String sslConfigFile = 
TagSyncConfig.getTagAdminRESTSslConfigFile(properties);
@@ -78,340 +65,59 @@ public class TagRESTSink implements TagSink {
                        LOG.debug("restUrl=" + restUrl);
                        LOG.debug("sslConfigFile=" + sslConfigFile);
                        LOG.debug("userName=" + userName);
-                       LOG.debug("password=" + password);
-               }
-               tagRESTClient = new RangerRESTClient(restUrl, sslConfigFile);
-               if (tagRESTClient != null) {
-                       tagRESTClient.setBasicAuthInfo(userName, password);
-                       ret = true;
-               } else {
-                       LOG.error("Could not create RangerRESTClient");
-               }
-
-               if(LOG.isDebugEnabled()) {
-                       LOG.debug("<== TagRESTSink.initialize(), result=" + 
ret);
-               }
-               return ret;
-       }
-
-       @Override
-       public void setServiceStore(ServiceStore svcStore) {
-
-       }
-
-       @Override
-       public RangerTagDef createTagDef(RangerTagDef tagDef) throws Exception {
-               if(LOG.isDebugEnabled()) {
-                       LOG.debug("==> createTagDef(" + tagDef + ")");
-               }
-
-               RangerTagDef ret = null;
-
-               WebResource webResource = 
createWebResource(REST_URL_TAGDEFS_RESOURCE);
-               ClientResponse response    = 
webResource.accept(REST_MIME_TYPE_JSON).type(REST_MIME_TYPE_JSON).post(ClientResponse.class,
 tagRESTClient.toJson(tagDef));
-
-               if(response != null && response.getStatus() == 200) {
-                       ret = response.getEntity(RangerTagDef.class);
-               } else {
-                       LOG.error("RangerAdmin REST call returned with 
response={" + response +"}");
-                       RESTResponse resp = 
RESTResponse.fromClientResponse(response);
-
-                       throw new Exception(resp.getMessage());
-               }
-
-               if(LOG.isDebugEnabled()) {
-                       LOG.debug("<== createTagDef(" + tagDef + "): " + ret);
-               }
-
-               return ret;
-       }
-
-       @Override
-       public RangerTagDef updateTagDef(RangerTagDef TagDef) throws Exception {
-               throw new Exception("Not implemented");
-       }
-
-       @Override
-       public void deleteTagDefByName(String name) throws Exception {
-               throw new Exception("Not implemented");
-       }
-
-       @Override
-       public void deleteTagDef(Long id) throws Exception {
-               if(LOG.isDebugEnabled()) {
-                       LOG.debug("==> deleteTagDef(" + id  + ")");
-               }
-               WebResource webResource = 
createWebResource(REST_URL_TAGDEF_RESOURCE + Long.toString(id));
-
-               ClientResponse response    = 
webResource.accept(REST_MIME_TYPE_JSON).type(REST_MIME_TYPE_JSON).delete(ClientResponse.class);
-
-               if(response != null && response.getStatus() == 204) {
-               } else {
-                       LOG.error("RangerAdmin REST call returned with 
response={" + response + "}");
-
-                       RESTResponse resp = 
RESTResponse.fromClientResponse(response);
-
-                       throw new Exception(resp.getMessage());
-               }
-
-               if(LOG.isDebugEnabled()) {
-                       LOG.debug("<== deleteTagDef(" + id + ")");
-               }
-       }
-
-       @Override
-       public RangerTagDef getTagDef(Long id) throws Exception {
-               throw new Exception("Not implemented");
-
-       }
-
-       @Override
-       public RangerTagDef getTagDefByGuid(String guid) throws Exception {
-               throw new Exception("Not implemented");
-       }
-
-       @Override
-       public RangerTagDef getTagDefByName(String name) throws Exception {
-               throw new Exception("Not implemented");
-       }
-
-       @Override
-       public List<RangerTagDef> getTagDefs(SearchFilter filter) throws 
Exception {
-               throw new Exception("Not implemented");
-       }
-
-       @Override
-       public PList<RangerTagDef> getPaginatedTagDefs(SearchFilter filter) 
throws Exception {
-               throw new Exception("Not implemented");
-       }
-
-       @Override
-       public List<String> getTagTypes() throws Exception {
-               // TODO Auto-generated method stub
-               return null;
-       }
-
-
-       @Override
-       public RangerTag createTag(RangerTag tag) throws Exception {
-               if(LOG.isDebugEnabled()) {
-                       LOG.debug("==> createTag(" + tag + ")");
-               }
-
-               RangerTag ret = null;
-
-               WebResource webResource = 
createWebResource(REST_URL_TAGS_RESOURCE);
-               ClientResponse response    = 
webResource.accept(REST_MIME_TYPE_JSON).type(REST_MIME_TYPE_JSON).post(ClientResponse.class,
 tagRESTClient.toJson(tag));
-
-               if(response != null && response.getStatus() == 200) {
-                       ret = response.getEntity(RangerTag.class);
-               } else {
-                       LOG.error("RangerAdmin REST call returned with 
response={" + response +"}");
-                       RESTResponse resp = 
RESTResponse.fromClientResponse(response);
-
-                       throw new Exception(resp.getMessage());
-               }
-
-               if(LOG.isDebugEnabled()) {
-                       LOG.debug("<== createTag(" + tag + "): " + ret);
                }
 
-               return ret;
-       }
-
-       @Override
-       public RangerTag updateTag(RangerTag tag) throws Exception {
-               throw new Exception("Not implemented");
-       }
-
-       @Override
-       public void deleteTag(Long id) throws Exception {
-               if(LOG.isDebugEnabled()) {
-                       LOG.debug("==> deleteTag(" + id  + ")");
-               }
-               WebResource webResource = 
createWebResource(REST_URL_TAG_RESOURCE + Long.toString(id));
-
-               ClientResponse response    = 
webResource.accept(REST_MIME_TYPE_JSON).type(REST_MIME_TYPE_JSON).delete(ClientResponse.class);
-
-               if(response != null && response.getStatus() == 204) {
+               if (StringUtils.isBlank(restUrl)) {
+                       ret = false;
+                       LOG.error("No value specified for property 
'ranger.tagsync.tagadmin.rest.url'!");
                } else {
-                       LOG.error("RangerAdmin REST call returned with 
response={" + response + "}");
-
-                       RESTResponse resp = 
RESTResponse.fromClientResponse(response);
-
-                       throw new Exception(resp.getMessage());
-               }
-
-               if(LOG.isDebugEnabled()) {
-                       LOG.debug("<== deleteTag(" + id + ")");
+                       if (LOG.isDebugEnabled()) {
+                               LOG.debug("ranger.tagsync.tagadmin.rest.url:" + 
restUrl);
+                       }
                }
-       }
-
-       @Override
-       public RangerTag getTag(Long id) throws Exception {
-               throw new Exception("Not implemented");
-       }
-
-       @Override
-       public RangerTag getTagByGuid(String guid) throws Exception {
-               throw new Exception("Not implemented");
-       }
-
-       @Override
-       public List<RangerTag> getTagsByType(String name) throws Exception {
-               throw new Exception("Not implemented");
-       }
-
-       @Override
-       public List<Long> getTagIdsForResourceId(Long resourceId) throws 
Exception {
-               throw new Exception("Not implemented");
-       }
-
-       @Override
-       public List<RangerTag> getTagsForResourceId(Long resourceId) throws 
Exception {
-               throw new Exception("Not implemented");
-       }
-
-       @Override
-       public List<RangerTag> getTagsForResourceGuid(String resourceGuid) 
throws Exception {
-               throw new Exception("Not implemented");
-       }
-
-       @Override
-       public List<RangerTag> getTags(SearchFilter filter) throws Exception {
-               throw new Exception("Not implemented");
-       }
-
-       @Override
-       public PList<RangerTag> getPaginatedTags(SearchFilter filter) throws 
Exception {
-               throw new Exception("Not implemented");
-       }
 
+               if (ret) {
+                       tagRESTClient = new RangerRESTClient(restUrl, 
sslConfigFile);
+                       tagRESTClient.setBasicAuthInfo(userName, password);
 
-       @Override
-       public RangerServiceResource 
createServiceResource(RangerServiceResource resource) throws Exception {
-               if(LOG.isDebugEnabled()) {
-                       LOG.debug("==> createServiceResource(" + resource + 
")");
+                       ret = testConnection();
                }
 
-               RangerServiceResource ret = null;
-
-               WebResource webResource = 
createWebResource(REST_URL_SERVICERESOURCES_RESOURCE);
-               ClientResponse response    = 
webResource.accept(REST_MIME_TYPE_JSON).type(REST_MIME_TYPE_JSON).post(ClientResponse.class,
 tagRESTClient.toJson(resource));
-
-               if(response != null && response.getStatus() == 200) {
-                       ret = response.getEntity(RangerServiceResource.class);
-               } else {
-                       LOG.error("RangerAdmin REST call returned with 
response={" + response +"}");
-
-                       RESTResponse resp = 
RESTResponse.fromClientResponse(response);
-
-                       throw new Exception(resp.getMessage());
+               if (!ret) {
+                       LOG.error("Cannot connect to Tag Admin. Please recheck 
configuration properties and/or check if Tag Admin is running and responsive");
                }
 
                if(LOG.isDebugEnabled()) {
-                       LOG.debug("<== createServiceResource(" + resource + "): 
" + ret);
+                       LOG.debug("<== TagRESTSink.initialize(), result=" + 
ret);
                }
 
                return ret;
        }
 
-       @Override
-       public RangerServiceResource 
updateServiceResource(RangerServiceResource resource) throws Exception {
-               throw new Exception("Not implemented");
-       }
-
-       @Override
-       public void deleteServiceResource(Long id) throws Exception {
-               if(LOG.isDebugEnabled()) {
-                       LOG.debug("==> deleteServiceResource(" + id  + ")");
-               }
-               WebResource webResource = 
createWebResource(REST_URL_SERVICERESOURCE_RESOURCE + Long.toString(id));
-
-               ClientResponse response    = 
webResource.accept(REST_MIME_TYPE_JSON).type(REST_MIME_TYPE_JSON).delete(ClientResponse.class);
-
-               if(response != null && response.getStatus() == 204) {
-               } else {
-                       LOG.error("RangerAdmin REST call returned with 
response={" + response + "}");
-
-                       RESTResponse resp = 
RESTResponse.fromClientResponse(response);
-
-                       throw new Exception(resp.getMessage());
-               }
-
-               if(LOG.isDebugEnabled()) {
-                       LOG.debug("<== deleteServiceResource(" + id + ")");
-               }
-       }
-
-       @Override
-       public RangerServiceResource getServiceResource(Long id) throws 
Exception {
-               throw new Exception("Not implemented");
-       }
-
-       @Override
-       public RangerServiceResource getServiceResourceByGuid(String guid) 
throws Exception {
-               throw new Exception("Not implemented");
-       }
-
-       @Override
-       public List<RangerServiceResource> getServiceResourcesByService(String 
serviceName) throws Exception {
-               throw new Exception("Not implemented");
-       }
-
-       @Override
-       public RangerServiceResource 
getServiceResourceByResourceSignature(String resourceSignature) throws 
Exception {
-               throw new Exception("Not implemented");
-       }
-
-       @Override
-       public List<RangerServiceResource> getServiceResources(SearchFilter 
filter) throws Exception {
-               throw new Exception("Not implemented");
-       }
-
-       @Override
-       public PList<RangerServiceResource> 
getPaginatedServiceResources(SearchFilter filter) throws Exception {
-               throw new Exception("Not implemented");
-       }
-
-
-       @Override
-       public RangerTagResourceMap createTagResourceMap(RangerTagResourceMap 
tagResourceMap) throws Exception {
-               if(LOG.isDebugEnabled()) {
-                       LOG.debug("==> createTagResourceMap(" + tagResourceMap 
+ ")");
+       public boolean testConnection() {
+               if (LOG.isDebugEnabled()) {
+                       LOG.debug("==> TagRESTSink.testConnection()");
                }
 
-               RangerTagResourceMap ret = null;
-
-               WebResource webResource = 
createWebResource(REST_URL_TAGRESOURCEMAP_IDS_RESOURCE)
-                               .queryParam("tag-id", 
Long.toString(tagResourceMap.getTagId()))
-                               .queryParam("resource-id", 
Long.toString(tagResourceMap.getResourceId()));
-
-               ClientResponse response    = 
webResource.accept(REST_MIME_TYPE_JSON).type(REST_MIME_TYPE_JSON).post(ClientResponse.class);
+               boolean ret = true;
 
-               if(response != null && response.getStatus() == 200) {
-                       ret = response.getEntity(RangerTagResourceMap.class);
-               } else {
-                       LOG.error("RangerAdmin REST call returned with 
response={" + response +"}");
-
-                       RESTResponse resp = 
RESTResponse.fromClientResponse(response);
-
-                       throw new Exception(resp.getMessage());
+               try {
+                       // build a dummy serviceTags structure and upload it to 
test connectivity
+                       ServiceTags serviceTags = new ServiceTags();
+                       serviceTags.setOp(ServiceTags.OP_ADD_OR_UPDATE);
+                       uploadServiceTags(serviceTags);
+               } catch (Exception exception) {
+                       LOG.error("test-upload of serviceTags failed.", 
exception);
+                       ret = false;
                }
 
-               if(LOG.isDebugEnabled()) {
-                       LOG.debug("<== createTagResourceMap(" + tagResourceMap 
+ "): " + ret);
+               if (LOG.isDebugEnabled()) {
+                       LOG.debug("<== TagRESTSink.testConnection(), result=" + 
ret);
                }
-
                return ret;
        }
 
        @Override
-       public void deleteTagResourceMap(Long id) throws Exception {
-               throw new Exception("Not implemented");
-       }
-
-       @Override
        public void uploadServiceTags(ServiceTags serviceTags) throws Exception 
{
                if(LOG.isDebugEnabled()) {
                        LOG.debug("==> uploadServiceTags()");
@@ -420,8 +126,7 @@ public class TagRESTSink implements TagSink {
 
                ClientResponse response    = 
webResource.accept(REST_MIME_TYPE_JSON).type(REST_MIME_TYPE_JSON).put(ClientResponse.class,
 tagRESTClient.toJson(serviceTags));
 
-               if(response != null && response.getStatus() == 204) {
-               } else {
+               if(response == null || response.getStatus() != 204) {
                        LOG.error("RangerAdmin REST call returned with 
response={" + response + "}");
 
                        RESTResponse resp = 
RESTResponse.fromClientResponse(response);
@@ -434,63 +139,6 @@ public class TagRESTSink implements TagSink {
                }
        }
 
-       @Override
-       public RangerTagResourceMap getTagResourceMap(Long id) throws Exception 
{
-               throw new Exception("Not implemented");
-       }
-
-       @Override
-       public RangerTagResourceMap getTagResourceMapByGuid(String guid) throws 
Exception {
-               throw new Exception("Not implemented");
-       }
-
-       @Override
-       public List<RangerTagResourceMap> getTagResourceMapsForTagId(Long 
tagId) throws Exception {
-               throw new Exception("Not implemented");
-       }
-
-       @Override
-       public List<RangerTagResourceMap> getTagResourceMapsForTagGuid(String 
tagGuid) throws Exception {
-               throw new Exception("Not implemented");
-       }
-
-       @Override
-       public List<RangerTagResourceMap> getTagResourceMapsForResourceId(Long 
resourceId) throws Exception {
-               throw new Exception("Not implemented");
-       }
-
-       @Override
-       public List<RangerTagResourceMap> 
getTagResourceMapsForResourceGuid(String resourceGuid) throws Exception {
-               throw new Exception("Not implemented");
-       }
-
-       @Override
-       public RangerTagResourceMap getTagResourceMapForTagAndResourceId(Long 
tagId, Long resourceId) throws Exception {
-               throw new Exception("Not implemented");
-       }
-
-
-       @Override
-       public RangerTagResourceMap 
getTagResourceMapForTagAndResourceGuid(String tagGuid, String resourceGuid) 
throws Exception {
-               throw new Exception("Not implemented");
-       }
-
-       @Override
-       public List<RangerTagResourceMap> getTagResourceMaps(SearchFilter 
filter) throws Exception {
-               throw new Exception("Not implemented");
-       }
-
-       @Override
-       public PList<RangerTagResourceMap> 
getPaginatedTagResourceMaps(SearchFilter filter) throws Exception {
-               throw new Exception("Not implemented");
-       }
-
-
-       @Override
-       public ServiceTags getServiceTagsIfUpdated(String serviceName, Long 
lastKnownVersion) throws Exception {
-               throw new Exception("Not implemented");
-       }
-
        private WebResource createWebResource(String url) {
                return createWebResource(url, null);
        }

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/757d1eb2/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasUtility.java
----------------------------------------------------------------------
diff --git 
a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasUtility.java
 
b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasUtility.java
new file mode 100644
index 0000000..2548c36
--- /dev/null
+++ 
b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasUtility.java
@@ -0,0 +1,404 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ranger.tagsync.source.atlas;
+
+import com.google.gson.Gson;
+import com.google.gson.reflect.TypeToken;
+
+import com.sun.jersey.api.client.ClientResponse;
+import com.sun.jersey.api.client.WebResource;
+import org.apache.atlas.typesystem.EntityImpl;
+import org.apache.atlas.typesystem.IdImpl;
+import org.apache.atlas.typesystem.TraitImpl;
+import org.apache.commons.collections.MapUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.atlas.typesystem.api.Entity;
+import org.apache.atlas.typesystem.api.Trait;
+import org.apache.ranger.admin.client.datatype.RESTResponse;
+import org.apache.ranger.plugin.util.RangerRESTClient;
+import org.apache.ranger.plugin.util.RangerRESTUtils;
+import org.apache.ranger.tagsync.process.TagSyncConfig;
+
+import java.io.IOException;
+import java.lang.reflect.Type;
+import java.util.*;
+
+
+// class AtlasUtil
+
+@SuppressWarnings("unchecked")
+public class AtlasUtility {
+
+       private static final Log LOG = LogFactory.getLog(AtlasUtility.class);
+
+       // Atlas APIs
+
+       public static final String API_ATLAS_TYPES = "api/atlas/types";
+       public static final String API_ATLAS_ENTITIES = 
"api/atlas/entities?type=";
+       public static final String API_ATLAS_ENTITY = "api/atlas/entities/";
+       public static final String API_ATLAS_TYPE = "api/atlas/types/";
+
+       public static final String RESULTS_ATTRIBUTE = "results";
+       public static final String DEFINITION_ATTRIBUTE = "definition";
+       public static final String VALUES_ATTRIBUTE = "values";
+       public static final String TRAITS_ATTRIBUTE = "traits";
+       public static final String TYPE_NAME_ATTRIBUTE = "typeName";
+       public static final String TRAIT_TYPES_ATTRIBUTE = "traitTypes";
+       public static final String SUPER_TYPES_ATTRIBUTE = "superTypes";
+       public static final String ATTRIBUTE_DEFINITIONS_ATTRIBUTE = 
"attributeDefinitions";
+       public static final String NAME_ATTRIBUTE = "name";
+
+       private Type mapType = new TypeToken<Map<String, Object>>() {
+       }.getType();
+
+       private RangerRESTClient restClient;
+       private Map<String, Entity> entities = new LinkedHashMap<>();
+
+
+       // ----- Constructor 
------------------------------------------------------
+
+       public AtlasUtility(Properties properties) {
+
+               String url = TagSyncConfig.getAtlasEndpoint(properties);
+               String sslConfigFileName = 
TagSyncConfig.getAtlasSslConfigFileName(properties);
+
+               if (LOG.isDebugEnabled()) {
+                       LOG.debug("Initializing RangerRestClient with (url=" + 
url + ", sslConfigFileName" + sslConfigFileName + ")");
+               }
+
+               restClient = new RangerRESTClient(url, sslConfigFileName);
+
+               if (LOG.isDebugEnabled()) {
+                       LOG.debug("Initialized RangerRestClient with (url=" + 
url + ", sslConfigFileName=" + sslConfigFileName + ")");
+               }
+       }
+
+       // update the set of entities with current from Atlas
+       public void refreshAllEntities() {
+               if (LOG.isDebugEnabled()) {
+                       LOG.debug("==> TagAtlasSource.refreshAllEntities()");
+               }
+
+               try {
+                       entities.clear();
+                       entities.putAll(getAllEntities());
+               } catch (IOException e) {
+                       LOG.error("getAllEntities() failed", e);
+               }
+               if (LOG.isDebugEnabled()) {
+                       LOG.debug("<== TagAtlasSource.refreshAllEntities()");
+               }
+       }
+
+       // ----- AtlasUtility 
------------------------------------------------------
+
+       public Map<String, Entity> getAllEntities() throws IOException {
+               if (LOG.isDebugEnabled()) {
+                       LOG.debug("==> TagAtlasSource.getAllEntities()");
+               }
+               Map<String, Entity> entities = new LinkedHashMap<>();
+
+               Map<String, Object> typesResponse = atlasAPI(API_ATLAS_TYPES);
+
+               List<String> types = getAttribute(typesResponse, 
RESULTS_ATTRIBUTE, List.class);
+
+               for (String type : types) {
+
+                       Map<String, Object> entitiesResponse = 
atlasAPI(API_ATLAS_ENTITIES + type);
+
+                       List<String> guids = getAttribute(entitiesResponse, 
RESULTS_ATTRIBUTE, List.class);
+
+                       for (String guid : guids) {
+
+                               if (StringUtils.isNotBlank(guid)) {
+
+                                       Map<Trait, Map<String, ? extends 
Trait>> traitSuperTypes = new HashMap<>();
+
+                                       Map<String, Object> entityResponse = 
atlasAPI(API_ATLAS_ENTITY + guid);
+
+                                       if 
(entityResponse.containsKey(DEFINITION_ATTRIBUTE)) {
+                                               String definitionJSON = 
getAttribute(entityResponse, DEFINITION_ATTRIBUTE, String.class);
+
+                                               LOG.info("{");
+                                               LOG.info("      \"entity-id\":" 
+ guid + ",");
+                                               LOG.info("      
\"entity-definition\":" + definitionJSON);
+                                               LOG.info("}");
+
+                                               Map<String, Object> definition 
= new Gson().fromJson(definitionJSON, mapType);
+
+                                               Map<String, Object> values = 
getAttribute(definition, VALUES_ATTRIBUTE, Map.class);
+                                               Map<String, Object> traits = 
getAttribute(definition, TRAITS_ATTRIBUTE, Map.class);
+                                               String typeName = 
getAttribute(definition, TYPE_NAME_ATTRIBUTE, String.class);
+
+                                               LOG.info("Received 
entity(typeName=" + typeName + ", id=" + guid + ")");
+
+
+                                               Map<String, TraitImpl> traitMap 
= new HashMap<>();
+
+                                               if 
(MapUtils.isNotEmpty(traits)) {
+
+                                                       LOG.info("Traits for 
entity(typeName=" + typeName + ", id=" + guid + ") ------ ");
+
+                                                       for (Map.Entry<String, 
Object> entry : traits.entrySet()) {
+
+                                                               Map<String, 
Object> trait = (Map<String, Object>) entry.getValue();
+
+                                                               Map<String, 
Object> traitValues = getAttribute(trait, VALUES_ATTRIBUTE, Map.class);
+                                                               String 
traitTypeName = getAttribute(trait, TYPE_NAME_ATTRIBUTE, String.class);
+
+                                                               Map<String, 
TraitImpl> superTypes = getTraitSuperTypes(getTraitType(traitTypeName), 
traitValues);
+
+                                                               TraitImpl 
trait1 = new TraitImpl(traitTypeName, traitValues, superTypes);
+
+                                                               
traitSuperTypes.put(trait1, superTypes);
+
+                                                               
traitMap.put(entry.getKey(), trait1);
+
+
+                                                               LOG.info("      
                Trait(typeName=" + traitTypeName + ")");
+
+                                                       }
+                                               } else {
+                                                       LOG.info("No traits for 
entity(typeName=" + typeName + ", id=" + guid + ")");
+                                               }
+                                               EntityImpl entity = new 
EntityImpl(new IdImpl(guid, 0), typeName, values, traitMap);
+
+                                               showEntity(entity);
+
+                                               entities.put(guid, entity);
+
+                                       }
+                               }
+                       }
+               }
+               if (LOG.isDebugEnabled()) {
+                       LOG.debug("==> TagAtlasSource.getAllEntities()");
+               }
+               return entities;
+       }
+
+
+       // ----- helper methods 
----------------------------------------------------
+
+       private Map<String, Object> getTraitType(String traitName)
+                       throws IOException {
+
+               Map<String, Object> typeResponse = atlasAPI(API_ATLAS_TYPE + 
traitName);
+
+               if (typeResponse.containsKey(DEFINITION_ATTRIBUTE)) {
+                       String definitionJSON = getAttribute(typeResponse, 
DEFINITION_ATTRIBUTE, String.class);
+
+                       Map<String, Object> definition = new 
Gson().fromJson(definitionJSON, mapType);
+
+                       List traitTypes = getAttribute(definition, 
TRAIT_TYPES_ATTRIBUTE, List.class);
+
+                       if (traitTypes.size() > 0) {
+                               return (Map<String, Object>) traitTypes.get(0);
+                       }
+               }
+               return null;
+       }
+
+       private Map<String, TraitImpl> getTraitSuperTypes(Map<String, Object> 
traitType, Map<String, Object> values)
+                       throws IOException {
+
+               Map<String, TraitImpl> superTypes = new HashMap<>();
+
+               if (traitType != null) {
+
+                       List<String> superTypeNames = getAttribute(traitType, 
SUPER_TYPES_ATTRIBUTE, List.class);
+
+                       for (String superTypeName : superTypeNames) {
+
+                               Map<String, Object> superTraitType = 
getTraitType(superTypeName);
+
+                               if (superTraitType != null) {
+                                       List<Map<String, Object>> 
attributeDefinitions = (List) 
superTraitType.get(ATTRIBUTE_DEFINITIONS_ATTRIBUTE);
+
+                                       Map<String, Object> superTypeValues = 
new HashMap<>();
+                                       for (Map<String, Object> 
attributeDefinition : attributeDefinitions) {
+
+                                               String attributeName = 
attributeDefinition.get(NAME_ATTRIBUTE).toString();
+                                               if 
(values.containsKey(attributeName)) {
+                                                       
superTypeValues.put(attributeName, values.get(attributeName));
+                                               }
+                                       }
+
+                                       superTypes.put(superTypeName,
+                                                       //new 
TraitImpl(getTraitSuperTypes(superTraitType, superTypeValues), superTypeValues, 
superTypeName));
+                                                       new 
TraitImpl(superTypeName, superTypeValues, getTraitSuperTypes(superTraitType, 
superTypeValues)));
+                               }
+                       }
+               }
+               return superTypes;
+       }
+
+
+       /*
+               private Map<String, Object> atlasAPI(String endpoint) throws 
IOException {
+                       InputStream in = streamProvider.readFrom(atlasEndpoint 
+ endpoint, "GET", (String) null, Collections.<String, String>emptyMap());
+                       return new Gson().fromJson(IOUtils.toString(in, 
"UTF-8"), mapType);
+               }
+               */
+
+
+       private Map<String, Object> atlasAPI(String endpoint) {
+               if (LOG.isDebugEnabled()) {
+                       LOG.debug("==> TagAtlasSource.atlasAPI(" + endpoint + 
")");
+               }
+               // Create a REST client and perform a get on it
+               Map<String, Object> ret = new HashMap<String, Object>();
+
+               WebResource webResource = restClient.getResource(endpoint);
+
+               ClientResponse response = 
webResource.accept(RangerRESTUtils.REST_MIME_TYPE_JSON).get(ClientResponse.class);
+
+               if (response != null && response.getStatus() == 200) {
+                       ret = response.getEntity(ret.getClass());
+               } else {
+                       LOG.error("Atlas REST call returned with response={" + 
response + "}");
+
+                       RESTResponse resp = 
RESTResponse.fromClientResponse(response);
+                       LOG.error("Error getting Atlas Entity. request=" + 
webResource.toString()
+                                       + ", response=" + resp.toString());
+               }
+               if (LOG.isDebugEnabled()) {
+                       LOG.debug("<== TagAtlasSource.atlasAPI(" + endpoint + 
")");
+               }
+               return ret;
+       }
+
+       private <T> T getAttribute(Map<String, Object> map, String name, 
Class<T> type) {
+               return type.cast(map.get(name));
+       }
+
+       public void showEntity(Entity entity) {
+
+               LOG.debug("Entity-id    :" + entity.getId());
+
+               LOG.debug("Type:                " + entity.getTypeName());
+
+               LOG.debug("----- Values -----");
+
+               for (Map.Entry<String, Object> entry : 
entity.getValues().entrySet()) {
+                       LOG.debug("             Name:   " + entry.getKey() + 
"");
+                       Object value = entry.getValue();
+                       LOG.debug("             Value:  " + getValue(value, 
entities.keySet()));
+               }
+
+               LOG.debug("----- Traits -----");
+
+               for (String traitName : entity.getTraits().keySet()) {
+                       LOG.debug("             Name:" + entity.getId() + ", 
trait=" + traitName + ">" + traitName);
+               }
+
+       }
+
+       public void showTrait(Entity entity, String traitId) {
+
+               String[] traitNames = traitId.split(",");
+
+               Trait trait = entity.getTraits().get(traitNames[0]);
+
+               for (int i = 1; i < traitNames.length; ++i) {
+                       trait = trait.getSuperTypes().get(traitNames[i]);
+               }
+
+               String typeName = trait.getTypeName();
+
+               LOG.debug("Trait " + typeName + " for Entity id=" + 
entity.getId());
+
+               LOG.debug("Type: " + typeName);
+
+               LOG.debug("----- Values ------");
+
+               for (Map.Entry<String, Object> entry : 
trait.getValues().entrySet()) {
+                       LOG.debug("Name:" + entry.getKey());
+                       Object value = entry.getValue();
+                       LOG.debug("Value:" + getValue(value, 
entities.keySet()));
+               }
+
+               LOG.debug("Super Traits");
+
+
+               for (String traitName : trait.getSuperTypes().keySet()) {
+                       LOG.debug("Name=" + entity.getId() + "&trait=" + 
traitId + "," + traitName + ">" + traitName);
+               }
+       }
+
+       // resolve the given value if necessary
+       private String getValue(Object value, Set<String> ids) {
+               if (value == null) {
+                       return "";
+               }
+               String idString = getIdValue(value, ids);
+               if (idString != null) {
+                       return idString;
+               }
+
+               idString = getIdListValue(value, ids);
+               if (idString != null) {
+                       return idString;
+               }
+
+               return value.toString();
+       }
+
+       // get an id from the given value; return null if the value is not an 
id type
+       private String getIdValue(Object value, Set<String> ids) {
+               if (value instanceof Map) {
+                       Map map = (Map) value;
+                       if (map.size() == 3 && map.containsKey("id")) {
+                               String id = map.get("id").toString();
+                               if (ids.contains(id)) {
+                                       return id;
+                               }
+                       }
+               }
+               return null;
+       }
+
+       // get an id list from the given value; return null if the value is not 
an id list type
+       private String getIdListValue(Object value, Set<String> ids) {
+               if (value instanceof List) {
+                       List list = (List) value;
+                       if (list.size() > 0) {
+                               StringBuilder sb = new StringBuilder();
+                               for (Object o : list) {
+                                       String idString = getIdValue(o, ids);
+                                       if (idString == null) {
+                                               return value.toString();
+                                       }
+                                       if (sb.length() > 0) {
+                                               sb.append(", ");
+                                       }
+                                       sb.append(idString);
+                               }
+                               return sb.toString();
+                       }
+               }
+               return null;
+       }
+}
+
+
+

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/757d1eb2/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/TagAtlasSource.java
----------------------------------------------------------------------
diff --git 
a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/TagAtlasSource.java
 
b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/TagAtlasSource.java
index 1c2a25b..e5c91bd 100644
--- 
a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/TagAtlasSource.java
+++ 
b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/TagAtlasSource.java
@@ -20,16 +20,10 @@
 package org.apache.ranger.tagsync.source.atlas;
 
 import com.google.gson.Gson;
-import com.google.gson.reflect.TypeToken;
 
 import com.google.inject.Guice;
 import com.google.inject.Injector;
 
-import com.sun.jersey.api.client.ClientResponse;
-import com.sun.jersey.api.client.WebResource;
-import org.apache.atlas.typesystem.EntityImpl;
-import org.apache.atlas.typesystem.IdImpl;
-import org.apache.atlas.typesystem.TraitImpl;
 import org.apache.commons.collections.MapUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
@@ -40,25 +34,23 @@ import 
org.apache.atlas.notification.entity.EntityNotificationConsumer;
 import org.apache.atlas.notification.entity.EntityNotificationConsumerProvider;
 import org.apache.atlas.typesystem.api.Entity;
 import org.apache.atlas.typesystem.api.Trait;
-import org.apache.ranger.admin.client.datatype.RESTResponse;
 import org.apache.ranger.tagsync.model.TagSink;
 import org.apache.ranger.tagsync.model.TagSource;
-import org.apache.ranger.plugin.util.RangerRESTClient;
-import org.apache.ranger.plugin.util.RangerRESTUtils;
 import org.apache.ranger.plugin.util.ServiceTags;
-import org.apache.ranger.tagsync.process.TagSyncConfig;
 
 import java.io.IOException;
-import java.lang.reflect.Type;
+import java.io.InputStream;
 import java.util.*;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 
 public class TagAtlasSource implements TagSource {
        private static final Log LOG = LogFactory.getLog(TagAtlasSource.class);
 
+       public static final String TAGSYNC_ATLAS_PROPERTIES_FILE_NAME = 
"application.properties";
+
+       public static final String TAGSYNC_ATLAS_KAFKA_ENDPOINTS = 
"atlas.notification.kafka.bootstrap.servers";
+       public static final String TAGSYNC_ATLAS_ZOOKEEPER_ENDPOINT = 
"atlas.notification.kafka.zookeeper.connect";
+       public static final String TAGSYNC_ATLAS_CONSUMER_GROUP = 
"atlas.notification.kafka.group.id";
 
-       private final Map<String, Entity> entities = new LinkedHashMap<>();
        private TagSink tagSink;
        private Properties properties;
        private ConsumerRunnable consumerTask;
@@ -78,19 +70,46 @@ public class TagAtlasSource implements TagSource {
                        this.properties = properties;
                }
 
+               Properties atlasProperties = new Properties();
 
-               NotificationModule notificationModule = new 
NotificationModule();
+               InputStream inputStream = 
getClass().getClassLoader().getResourceAsStream(TAGSYNC_ATLAS_PROPERTIES_FILE_NAME);
 
-               Injector injector = Guice.createInjector(notificationModule);
+               if (inputStream != null) {
+                       try {
+                               atlasProperties.load(inputStream);
+                       } catch (IOException ioException) {
+                               ret = false;
+                               LOG.error("Cannot load Atlas application 
properties file, file-name:" + TAGSYNC_ATLAS_PROPERTIES_FILE_NAME);
+                       }
+               } else {
+                       ret = false;
+                       LOG.error("Cannot find Atlas application properties 
file");
+               }
 
-               EntityNotificationConsumerProvider consumerProvider = 
injector.getInstance(EntityNotificationConsumerProvider.class);
+               if (ret) {
+                       if 
(StringUtils.isBlank(atlasProperties.getProperty(TAGSYNC_ATLAS_KAFKA_ENDPOINTS)))
 {
+                               ret = false;
+                               LOG.error("Value of property '" + 
TAGSYNC_ATLAS_KAFKA_ENDPOINTS + "' is not specified!");
+                       }
+                       if 
(StringUtils.isBlank(atlasProperties.getProperty(TAGSYNC_ATLAS_ZOOKEEPER_ENDPOINT)))
 {
+                               ret = false;
+                               LOG.error("Value of property '" + 
TAGSYNC_ATLAS_ZOOKEEPER_ENDPOINT + "' is not specified!");
+                       }
+                       if 
(StringUtils.isBlank(atlasProperties.getProperty(TAGSYNC_ATLAS_CONSUMER_GROUP)))
 {
+                               ret = false;
+                               LOG.error("Value of property '" + 
TAGSYNC_ATLAS_CONSUMER_GROUP + "' is not specified!");
+                       }
+               }
 
-               consumerTask = new ConsumerRunnable(consumerProvider.get());
+               if (ret) {
+                       NotificationModule notificationModule = new 
NotificationModule();
 
-               //ExecutorService executorService = 
Executors.newFixedThreadPool(1);
+                       Injector injector = 
Guice.createInjector(notificationModule);
 
-               //executorService.submit(new 
ConsumerRunnable(consumerProvider.get()));
+                       EntityNotificationConsumerProvider consumerProvider = 
injector.getInstance(EntityNotificationConsumerProvider.class);
 
+                       consumerTask = new 
ConsumerRunnable(consumerProvider.get());
+               }
 
                if (LOG.isDebugEnabled()) {
                        LOG.debug("<== TagAtlasSource.initialize(), result=" + 
ret);
@@ -202,390 +221,7 @@ public class TagAtlasSource implements TagSource {
                                                LOG.debug("                     
        Trait-Value:" + valueEntry.getValue());
                                        }
                                }
-
-                       }
-               }
-
-       }
-
-       public void printAllEntities() {
-               try {
-                       new AtlasUtility().getAllEntities();
-               }
-               catch(java.io.IOException ioException) {
-                       LOG.error("Caught IOException while retrieving Atlas 
Entities:", ioException);
-               }
-       }
-
-       // update the set of entities with current from Atlas
-       public void refreshAllEntities() {
-               if (LOG.isDebugEnabled()) {
-                       LOG.debug("==> TagAtlasSource.refreshAllEntities()");
-               }
-               AtlasUtility atlasUtility = new AtlasUtility();
-
-               try {
-                       entities.putAll(atlasUtility.getAllEntities());
-               } catch (IOException e) {
-                       LOG.error("getAllEntities() failed", e);
-               }
-               if (LOG.isDebugEnabled()) {
-                       LOG.debug("<== TagAtlasSource.refreshAllEntities()");
-               }
-       }
-
-       // Inner class AtlasUtil
-
-       /**
-        * Atlas utility.
-        */
-       @SuppressWarnings("unchecked")
-       private class AtlasUtility {
-
-               /**
-                * Atlas APIs
-                */
-               public static final String API_ATLAS_TYPES    = 
"api/atlas/types";
-               public static final String API_ATLAS_ENTITIES = 
"api/atlas/entities?type=";
-               public static final String API_ATLAS_ENTITY   = 
"api/atlas/entities/";
-               public static final String API_ATLAS_TYPE     = 
"api/atlas/types/";
-
-               /**
-                * API Response Attributes
-                */
-               public static final String RESULTS_ATTRIBUTE               = 
"results";
-               public static final String DEFINITION_ATTRIBUTE            = 
"definition";
-               public static final String VALUES_ATTRIBUTE                = 
"values";
-               public static final String TRAITS_ATTRIBUTE                = 
"traits";
-               public static final String TYPE_NAME_ATTRIBUTE             = 
"typeName";
-               public static final String TRAIT_TYPES_ATTRIBUTE           = 
"traitTypes";
-               public static final String SUPER_TYPES_ATTRIBUTE           = 
"superTypes";
-               public static final String ATTRIBUTE_DEFINITIONS_ATTRIBUTE = 
"attributeDefinitions";
-               public static final String NAME_ATTRIBUTE                  = 
"name";
-
-               private Type mapType = new TypeToken<Map<String, 
Object>>(){}.getType();
-
-               private RangerRESTClient restClient;
-
-
-               // ----- Constructors 
------------------------------------------------------
-
-               /**
-                * Construct an AtlasUtility
-                *
-                */
-               public AtlasUtility() {
-
-                       String url               = 
TagSyncConfig.getAtlasEndpoint(properties);
-                       String sslConfigFileName = 
TagSyncConfig.getAtlasSslConfigFileName(properties);
-
-
-                       if(LOG.isDebugEnabled()) {
-                               LOG.debug("Initializing RangerRestClient with 
(url=" + url + ", sslConfigFileName" + sslConfigFileName + ")");
-                       }
-
-                       restClient = new RangerRESTClient(url, 
sslConfigFileName);
-
-                       if(LOG.isDebugEnabled()) {
-                               LOG.debug("Initialized RangerRestClient with 
(url=" + url + ", sslConfigFileName=" + sslConfigFileName + ")");
-                       }
-               }
-
-
-               // ----- AtlasUtility 
------------------------------------------------------
-
-               /**
-                * Get all of the entities defined in Atlas.
-                *
-                * @return  a mapping of GUIDs to Atlas entities
-                *
-                * @throws IOException if there is an error communicating with 
Atlas
-                */
-               public Map<String, Entity> getAllEntities() throws IOException {
-                       if (LOG.isDebugEnabled()) {
-                               LOG.debug("==> 
TagAtlasSource.getAllEntities()");
-                       }
-                       Map<String, Entity> entities = new LinkedHashMap<>();
-
-                       Map<String, Object> typesResponse = 
atlasAPI(API_ATLAS_TYPES);
-
-                       List<String> types = getAttribute(typesResponse, 
RESULTS_ATTRIBUTE, List.class);
-
-                       for (String type : types) {
-
-                               Map<String, Object> entitiesResponse = 
atlasAPI(API_ATLAS_ENTITIES + type);
-
-                               List<String> guids = 
getAttribute(entitiesResponse, RESULTS_ATTRIBUTE, List.class);
-
-                               for (String guid : guids) {
-
-                                       if (StringUtils.isNotBlank(guid)) {
-
-                                               Map<Trait, Map<String, ? 
extends Trait>> traitSuperTypes = new HashMap<>();
-
-                                               Map<String, Object> 
entityResponse = atlasAPI(API_ATLAS_ENTITY + guid);
-
-                                               if 
(entityResponse.containsKey(DEFINITION_ATTRIBUTE)) {
-                                                       String definitionJSON = 
getAttribute(entityResponse, DEFINITION_ATTRIBUTE, String.class);
-
-                                                       LOG.info("{");
-                                                       LOG.info("      
\"entity-id\":" + guid + ",");
-                                                       LOG.info("      
\"entity-definition\":" + definitionJSON);
-                                                       LOG.info("}");
-
-                                                       Map<String, Object> 
definition = new Gson().fromJson(definitionJSON, mapType);
-
-                                                       Map<String, Object> 
values = getAttribute(definition, VALUES_ATTRIBUTE, Map.class);
-                                                       Map<String, Object> 
traits = getAttribute(definition, TRAITS_ATTRIBUTE, Map.class);
-                                                       String typeName = 
getAttribute(definition, TYPE_NAME_ATTRIBUTE, String.class);
-
-                                                       LOG.info("Received 
entity(typeName=" + typeName + ", id=" + guid + ")");
-
-
-                                                       Map<String, TraitImpl> 
traitMap = new HashMap<>();
-
-                                                       if 
(MapUtils.isNotEmpty(traits)) {
-
-                                                               
LOG.info("Traits for entity(typeName=" + typeName + ", id=" + guid + ") ------ 
");
-
-                                                               for 
(Map.Entry<String, Object> entry : traits.entrySet()) {
-
-                                                                       
Map<String, Object> trait = (Map<String, Object>) entry.getValue();
-
-                                                                       
Map<String, Object> traitValues = getAttribute(trait, VALUES_ATTRIBUTE, 
Map.class);
-                                                                       String 
traitTypeName = getAttribute(trait, TYPE_NAME_ATTRIBUTE, String.class);
-
-                                                                       
Map<String, TraitImpl> superTypes = 
getTraitSuperTypes(getTraitType(traitTypeName), traitValues);
-
-                                                                       
TraitImpl trait1 = new TraitImpl(traitTypeName, traitValues, superTypes);
-
-                                                                       
traitSuperTypes.put(trait1, superTypes);
-
-                                                                       
traitMap.put(entry.getKey(), trait1);
-
-
-                                                                       
LOG.info("                      Trait(typeName=" + traitTypeName + ")");
-
-                                                               }
-                                                       } else {
-                                                               LOG.info("No 
traits for entity(typeName=" + typeName + ", id=" + guid + ")");
-                                                       }
-                                                       EntityImpl entity = new 
EntityImpl(new IdImpl(guid, 0), typeName, values, traitMap);
-
-                                                       showEntity(entity);
-
-                                                       entities.put(guid, 
entity);
-
-                                               }
-                                       }
-                               }
-                       }
-                       if (LOG.isDebugEnabled()) {
-                               LOG.debug("==> 
TagAtlasSource.getAllEntities()");
-                       }
-                       return entities;
-               }
-
-
-               // ----- helper methods 
----------------------------------------------------
-
-               private Map<String, Object> getTraitType(String traitName)
-                               throws IOException {
-
-                       Map<String, Object> typeResponse = 
atlasAPI(API_ATLAS_TYPE + traitName);
-
-                       if (typeResponse.containsKey(DEFINITION_ATTRIBUTE)) {
-                               String definitionJSON = 
getAttribute(typeResponse, DEFINITION_ATTRIBUTE, String.class);
-
-                               Map<String, Object> definition = new 
Gson().fromJson(definitionJSON, mapType);
-
-                               List traitTypes = getAttribute(definition, 
TRAIT_TYPES_ATTRIBUTE, List.class);
-
-                               if (traitTypes.size() > 0) {
-                                       return (Map<String, Object>) 
traitTypes.get(0);
-                               }
-                       }
-                       return null;
-               }
-
-               private Map<String, TraitImpl> getTraitSuperTypes(Map<String, 
Object> traitType, Map<String, Object> values)
-                               throws IOException {
-
-                       Map<String, TraitImpl> superTypes = new HashMap<>();
-
-                       if (traitType != null) {
-
-                               List<String> superTypeNames = 
getAttribute(traitType, SUPER_TYPES_ATTRIBUTE, List.class);
-
-                               for (String superTypeName : superTypeNames) {
-
-                                       Map<String, Object> superTraitType = 
getTraitType(superTypeName);
-
-                                       if (superTraitType != null) {
-                                               List<Map<String, Object>> 
attributeDefinitions = (List) 
superTraitType.get(ATTRIBUTE_DEFINITIONS_ATTRIBUTE);
-
-                                               Map<String, Object> 
superTypeValues = new HashMap<>();
-                                               for (Map<String, Object> 
attributeDefinition : attributeDefinitions) {
-
-                                                       String attributeName = 
attributeDefinition.get(NAME_ATTRIBUTE).toString();
-                                                       if 
(values.containsKey(attributeName)) {
-                                                               
superTypeValues.put(attributeName, values.get(attributeName));
-                                                       }
-                                               }
-
-                                               superTypes.put(superTypeName,
-                                                               //new 
TraitImpl(getTraitSuperTypes(superTraitType, superTypeValues), superTypeValues, 
superTypeName));
-                                                               new 
TraitImpl(superTypeName, superTypeValues, getTraitSuperTypes(superTraitType, 
superTypeValues)));
-                                       }
-                               }
-                       }
-                       return superTypes;
-               }
-
-               /*
-               private Map<String, Object> atlasAPI(String endpoint) throws 
IOException {
-                       InputStream in  = streamProvider.readFrom(atlasEndpoint 
+ endpoint, "GET", (String) null, Collections.<String, String>emptyMap());
-                       return new Gson().fromJson(IOUtils.toString(in, 
"UTF-8"), mapType);
-               }
-               */
-
-               private Map<String, Object> atlasAPI(String endpoint)  {
-                       if (LOG.isDebugEnabled()) {
-                               LOG.debug("==> TagAtlasSource.atlasAPI(" + 
endpoint +")");
-                       }
-                       // Create a REST client and perform a get on it
-                       Map<String, Object> ret = new HashMap<String, Object>();
-
-                       WebResource webResource = 
restClient.getResource(endpoint);
-
-                       ClientResponse response = 
webResource.accept(RangerRESTUtils.REST_MIME_TYPE_JSON).get(ClientResponse.class);
-
-                       if(response != null && response.getStatus() == 200) {
-                               ret = response.getEntity(ret.getClass());
-                       } else {
-                               LOG.error("Atlas REST call returned with 
response={" + response +"}");
-
-                               RESTResponse resp = 
RESTResponse.fromClientResponse(response);
-                               LOG.error("Error getting Atlas Entity. 
request=" + webResource.toString()
-                                               + ", response=" + 
resp.toString());
                        }
-                       if (LOG.isDebugEnabled()) {
-                               LOG.debug("<== TagAtlasSource.atlasAPI(" + 
endpoint + ")");
-                       }
-                       return ret;
                }
-
-               private <T> T getAttribute(Map<String, Object> map, String 
name, Class<T> type) {
-                       return type.cast(map.get(name));
-               }
-
-
-
-               public void showEntity(Entity entity) {
-
-                       LOG.debug("Entity-id    :" + entity.getId());
-
-                       LOG.debug("Type:                " + 
entity.getTypeName());
-
-                       LOG.debug("----- Values -----");
-
-                       for (Map.Entry<String, Object> entry : 
entity.getValues().entrySet()) {
-                               LOG.debug("             Name:   " + 
entry.getKey() + "");
-                               Object value = entry.getValue();
-                               LOG.debug("             Value:  " + 
getValue(value, entities.keySet()));
-                       }
-
-                       LOG.debug("----- Traits -----");
-
-                       for (String traitName : entity.getTraits().keySet()) {
-                               LOG.debug("             Name:" + entity.getId() 
+ ", trait=" + traitName + ">" + traitName);
-                       }
-
-               }
-
-               public void showTrait(Entity entity, String traitId) {
-
-                       String[] traitNames = traitId.split(",");
-
-                       Trait trait = entity.getTraits().get(traitNames[0]);
-
-                       for (int i = 1; i < traitNames.length; ++i ) {
-                               trait = 
trait.getSuperTypes().get(traitNames[i]);
-                       }
-
-                       String typeName = trait.getTypeName();
-
-                       LOG.debug("Trait " + typeName + " for Entity id=" + 
entity.getId());
-
-                       LOG.debug("Type: " + typeName);
-
-                       LOG.debug("----- Values ------");
-
-                       for (Map.Entry<String, Object> entry : 
trait.getValues().entrySet()) {
-                               LOG.debug("Name:" + entry.getKey());
-                               Object value = entry.getValue();
-                               LOG.debug("Value:" + getValue(value, 
entities.keySet()));
-                       }
-
-                       LOG.debug("Super Traits");
-
-
-                       for (String traitName : trait.getSuperTypes().keySet()) 
{
-                               LOG.debug("Name=" + entity.getId() + "&trait=" 
+ traitId + "," + traitName + ">" + traitName);
-                       }
-               }
-
-               // resolve the given value if necessary
-               private String getValue(Object value, Set<String> ids) {
-                       if (value == null) {
-                               return "";
-                       }
-                       String idString = getIdValue(value, ids);
-                       if (idString != null) {
-                               return idString;
-                       }
-
-                       idString = getIdListValue(value, ids);
-                       if (idString != null) {
-                               return idString;
-                       }
-
-                       return value.toString();
-               }
-               // get an id from the given value; return null if the value is 
not an id type
-               private String getIdValue(Object value, Set<String> ids) {
-                       if (value instanceof Map) {
-                               Map map = (Map) value;
-                               if (map.size() == 3 && map.containsKey("id")){
-                                       String id = map.get("id").toString();
-                                       if (ids.contains(id)) {
-                                               return id;
-                                       }
-                               }
-                       }
-                       return null;
-               }
-               // get an id list from the given value; return null if the 
value is not an id list type
-               private String getIdListValue(Object value, Set<String> ids) {
-                       if (value instanceof List) {
-                               List list = (List) value;
-                               if (list.size() > 0) {
-                                       StringBuilder sb = new StringBuilder();
-                                       for (Object o : list) {
-                                               String idString = getIdValue(o, 
ids);
-                                               if (idString == null) {
-                                                       return value.toString();
-                                               }
-                                               if (sb.length() > 0) {
-                                                       sb.append(", ");
-                                               }
-                                               sb.append(idString);
-                                       }
-                                       return sb.toString();
-                               }
-                       }
-                       return null;
-               }
-
        }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/757d1eb2/tagsync/src/main/java/org/apache/ranger/tagsync/source/file/TagFileSource.java
----------------------------------------------------------------------
diff --git 
a/tagsync/src/main/java/org/apache/ranger/tagsync/source/file/TagFileSource.java
 
b/tagsync/src/main/java/org/apache/ranger/tagsync/source/file/TagFileSource.java
index 925a712..03a3980 100644
--- 
a/tagsync/src/main/java/org/apache/ranger/tagsync/source/file/TagFileSource.java
+++ 
b/tagsync/src/main/java/org/apache/ranger/tagsync/source/file/TagFileSource.java
@@ -22,6 +22,7 @@ package org.apache.ranger.tagsync.source.file;
 import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
 import org.apache.commons.collections.MapUtils;
+import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.ranger.tagsync.model.TagSink;
@@ -59,8 +60,24 @@ public class TagFileSource implements TagSource, Runnable {
 
                boolean ret = true;
 
+               if 
(StringUtils.isBlank(TagSyncConfig.getTagSourceFileName(properties))) {
+                       ret = false;
+                       LOG.error("value of property 
'ranger.tagsync.source.impl.class' is file and no value specified for property 
'ranger.tagsync.filesource.filename'!");
+               }
+
                if (ret) {
 
+                       long fileModTimeCheckIntervalInMs = 
TagSyncConfig.getTagSourceFileModTimeCheckIntervalInMillis(properties);
+
+                       if (fileModTimeCheckIntervalInMs <= 0L) {
+                               
LOG.info("'ranger.tagsync.filesource.modtime.check.interval' is zero or 
negative! 'ranger.tagsync.filesource.modtime.check.interval'=" + 
fileModTimeCheckIntervalInMs + "ms");
+                               LOG.info("Setting 
'ranger.tagsync.filesource.modtime.check.interval' to 60 seconds");
+                               fileModTimeCheckIntervalInMs = 60*1000;
+                       } else {
+                               if (LOG.isDebugEnabled()) {
+                                       
LOG.debug("'ranger.tagsync.filesource.modtime.check.interval':" + 
fileModTimeCheckIntervalInMs + "ms");
+                               }
+                       }
                        sourceFileName = 
TagSyncConfig.getTagSourceFileName(properties);
 
                        if (LOG.isDebugEnabled()) {
@@ -120,7 +137,7 @@ public class TagFileSource implements TagSource, Runnable {
                if (LOG.isDebugEnabled()) {
                        LOG.debug("==> TagFileSource.run()");
                }
-               long sleepTimeBetweenCycleInMillis = 
TagSyncConfig.getSleepTimeInMillisBetweenCycle(properties);
+               long sleepTimeBetweenCycleInMillis = 
TagSyncConfig.getTagSourceFileModTimeCheckIntervalInMillis(properties);
                boolean shutdownFlag = false;
 
                while (!shutdownFlag) {
@@ -143,7 +160,7 @@ public class TagFileSource implements TagSource, Runnable {
                                Thread.sleep(sleepTimeBetweenCycleInMillis);
                        }
                        catch (InterruptedException e) {
-                               LOG.error("Failed to wait for [" + 
sleepTimeBetweenCycleInMillis + "] milliseconds before attempting to 
synchronize tag information", e);
+                               LOG.error("Failed to wait for [" + 
sleepTimeBetweenCycleInMillis + "] milliseconds before checking for update to 
tagFileSource", e);
                                shutdownFlag = true;
                        }
                        catch (Throwable t) {

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/757d1eb2/tagsync/src/main/resources/ranger-tagsync-default.xml
----------------------------------------------------------------------
diff --git a/tagsync/src/main/resources/ranger-tagsync-default.xml 
b/tagsync/src/main/resources/ranger-tagsync-default.xml
index b9e4512..8649d06 100644
--- a/tagsync/src/main/resources/ranger-tagsync-default.xml
+++ b/tagsync/src/main/resources/ranger-tagsync-default.xml
@@ -27,7 +27,7 @@
        </property>
        <property>
                <name>ranger.tagsync.tagadmin.rest.url</name>
-               <value>http://localhost:6080</value>
+               <value></value>
                <description></description>
        </property>
        <property>
@@ -36,18 +36,18 @@
                <description></description>
        </property>
        <property>
-               <name>ranger.tagsync.sleeptimeinmillisbetweensynccycle</name>
+               <name>ranger.tagsync.filesource.modtime.check.interval</name>
                <value>60000</value>
                <description></description>
        </property>
        <property>
                <name>ranger.tagsync.filesource.filename</name>
-               <value>/etc/ranger/data/tags.json</value>
+               <value></value>
                <description></description>
        </property>
        <property>
                <name>ranger.tagsync.source.impl.class</name>
-               <value>file</value>
+               <value></value>
                <description></description>
        </property>
        <property>
@@ -56,13 +56,7 @@
                <description></description>
        </property>
        <property>
-               
<name>ranger.tagsync.atlas.hive.instance.c1.ranger.service</name>
-               <value>cl1_hive</value>
-               <description></description>
-       </property>
-       <property>
-               <name>ranger.tagsync.atlassource.endpoint</name>
-               <value>http://localhost:21000/</value>
-               <description></description>
+               <name>ranger.tagsync.atlas.to.service.mapping</name>
+               <value>c1,hive,cl1_hive</value>
        </property>
 </configuration>

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/757d1eb2/tagsync/src/main/resources/ranger-tagsync-site.xml
----------------------------------------------------------------------
diff --git a/tagsync/src/main/resources/ranger-tagsync-site.xml 
b/tagsync/src/main/resources/ranger-tagsync-site.xml
new file mode 100644
index 0000000..19532e9
--- /dev/null
+++ b/tagsync/src/main/resources/ranger-tagsync-site.xml
@@ -0,0 +1,57 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+<!--
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License. See accompanying LICENSE file.
+-->
+
+<!-- Put site-specific property overrides in this file. -->
+
+<configuration>
+       <property>
+               <name>ranger.tagsync.enabled</name>
+               <value>true</value>
+       </property>
+       <property>
+               <name>ranger.tagsync.logdir</name>
+               <value>log</value>
+       </property>
+       <property>
+               <name>ranger.tagsync.tagadmin.rest.url</name>
+               <value>localhost:6080</value>
+               <description></description>
+       </property>
+       <property>
+               <name>ranger.tagsync.tagadmin.rest.ssl.config.file</name>
+               <value></value>
+               <description></description>
+       </property>
+       <property>
+               <name>ranger.tagsync.filesource.modtime.check.interval</name>
+               <value>60000</value>
+               <description></description>
+       </property>
+       <property>
+               <name>ranger.tagsync.filesource.filename</name>
+               <value>/etc/ranger/data/tags.json</value>
+               <description></description>
+       </property>
+       <property>
+               <name>ranger.tagsync.source.impl.class</name>
+               <value>File</value>
+               <description></description>
+       </property>
+       <property>
+               <name>ranger.tagsync.atlas.to.service.mapping</name>
+               <value>c1,hive,cl1_hive</value>
+       </property>
+</configuration>

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/757d1eb2/tagsync/src/test/java/org/apache/ranger/tagsync/process/TestTagSynchronizer.java
----------------------------------------------------------------------
diff --git 
a/tagsync/src/test/java/org/apache/ranger/tagsync/process/TestTagSynchronizer.java
 
b/tagsync/src/test/java/org/apache/ranger/tagsync/process/TestTagSynchronizer.java
index 10be4e6..9d603d4 100644
--- 
a/tagsync/src/test/java/org/apache/ranger/tagsync/process/TestTagSynchronizer.java
+++ 
b/tagsync/src/test/java/org/apache/ranger/tagsync/process/TestTagSynchronizer.java
@@ -75,20 +75,27 @@ public class TestTagSynchronizer {
        @Test
        public void testTagDownload() {
 
-               boolean initDone = tagSynchronizer.initLoop();
+               boolean initDone = false;
+
+               /* For tagSynchronizer.initialize() to succeed, edit 
ranger-tagsync-site.xml file to contain correct
+               values of the following properties:
+                       ranger.tagsync.tagadmin.rest.url, 
ranger.tagsync.tagadmin.password
+
+               For example:
+                       <property>
+                               <name>ranger.tagsync.tagadmin.rest.url</name>
+                               <value>http://tagsync-test:6080</value>
+                       </property>
+                       <property>
+                               <name>ranger.tagsync.tagadmin.password</name>
+                               <value>rangertagsync</value>
+                       </property>
+               */
 
-               System.out.println("TagSynchronizer initialization result=" + 
initDone);
 
-               /*
-               TagSource tagSource = tagSynchronizer.getTagSource();
+               //initDone = tagSynchronizer.initialize();
 
-               try {
-                       TagAtlasSource tagAtlasSource = (TagAtlasSource) 
tagSource;
-                       //tagAtlasSource.printAllEntities();
-               } catch (ClassCastException exception) {
-                       System.err.println("TagSource is not of 
TagAtlasSource");
-               }
-               */
+               System.out.println("TagSynchronizer initialization result=" + 
initDone);
 
                System.out.println("Exiting testTagDownload()");
        }

Reply via email to