RANGER-807: TagSync should support periodic full sync with Apache Atlas

Signed-off-by: Madhan Neethiraj <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/incubator-ranger/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ranger/commit/5b9c094f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ranger/tree/5b9c094f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ranger/diff/5b9c094f

Branch: refs/heads/master
Commit: 5b9c094ff501ec591f6334bcea10ce37a2163711
Parents: fec8460
Author: Abhay Kulkarni <[email protected]>
Authored: Tue Dec 15 19:08:02 2015 -0800
Committer: Madhan Neethiraj <[email protected]>
Committed: Sat Jan 9 23:59:10 2016 -0800

----------------------------------------------------------------------
 .../ranger/plugin/util/RangerRESTClient.java    |   3 +-
 .../apache/ranger/plugin/util/ServiceTags.java  |  27 +-
 .../conf/templates/installprop2xml.properties   |   6 +-
 .../conf/templates/ranger-tagsync-template.xml  |  13 +-
 tagsync/scripts/install.properties              |  12 +-
 tagsync/scripts/ranger-tagsync-services.sh      |   6 +-
 tagsync/scripts/ranger-tagsync-sync.sh          |  66 ++++
 tagsync/scripts/setup.py                        |  19 +-
 .../ranger/tagsync/model/AbstractTagSource.java |  65 ++++
 .../apache/ranger/tagsync/model/TagSource.java  |   6 +-
 .../ranger/tagsync/process/TagSyncConfig.java   |  68 ++--
 .../ranger/tagsync/process/TagSynchronizer.java | 193 +++++++----
 .../tagsync/sink/tagadmin/TagAdminRESTSink.java | 162 +++++++++
 .../tagsync/sink/tagadmin/TagRESTSink.java      | 160 ---------
 .../source/Atlas/AtlasHiveResourceMapper.java   | 203 ++++++++++++
 .../source/Atlas/AtlasResourceMapper.java       |  74 +++++
 .../source/Atlas/AtlasResourceMapperUtil.java   | 124 +++++++
 .../tagsync/source/Atlas/AtlasTagSource.java    | 197 +++++++++++
 .../source/atlas/AtlasEntityWithTraits.java     |  96 ++++++
 .../source/atlas/AtlasNotificationMapper.java   | 327 ++++++-------------
 .../tagsync/source/atlas/TagAtlasSource.java    | 241 --------------
 .../source/atlasrest/AtlasRESTTagSource.java    | 143 ++++++++
 .../tagsync/source/atlasrest/AtlasRESTUtil.java | 264 +++++++++++++++
 .../tagsync/source/file/FileTagSource.java      | 278 ++++++++++++++++
 .../tagsync/source/file/TagFileSource.java      | 281 ----------------
 .../main/resources/etc/ranger/data/tags.json    |  26 +-
 .../main/resources/ranger-tagsync-default.xml   |   5 +
 .../tagsync/process/TestTagSynchronizer.java    |  22 +-
 28 files changed, 2020 insertions(+), 1067 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/5b9c094f/agents-common/src/main/java/org/apache/ranger/plugin/util/RangerRESTClient.java
----------------------------------------------------------------------
diff --git 
a/agents-common/src/main/java/org/apache/ranger/plugin/util/RangerRESTClient.java
 
b/agents-common/src/main/java/org/apache/ranger/plugin/util/RangerRESTClient.java
index c311670..7cfd040 100644
--- 
a/agents-common/src/main/java/org/apache/ranger/plugin/util/RangerRESTClient.java
+++ 
b/agents-common/src/main/java/org/apache/ranger/plugin/util/RangerRESTClient.java
@@ -212,8 +212,7 @@ public class RangerRESTClient {
                        client = Client.create(config);
                }
 
-               // TODO: for testing only
-               if(!StringUtils.isEmpty(mUsername) || 
!StringUtils.isEmpty(mPassword)) {
+               if(!StringUtils.isEmpty(mUsername) && 
!StringUtils.isEmpty(mPassword)) {
                        client.addFilter(new HTTPBasicAuthFilter(mUsername, 
mPassword)); 
                }
 

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/5b9c094f/agents-common/src/main/java/org/apache/ranger/plugin/util/ServiceTags.java
----------------------------------------------------------------------
diff --git 
a/agents-common/src/main/java/org/apache/ranger/plugin/util/ServiceTags.java 
b/agents-common/src/main/java/org/apache/ranger/plugin/util/ServiceTags.java
index 7800e9a..c3fb4cf 100644
--- a/agents-common/src/main/java/org/apache/ranger/plugin/util/ServiceTags.java
+++ b/agents-common/src/main/java/org/apache/ranger/plugin/util/ServiceTags.java
@@ -23,6 +23,8 @@ package org.apache.ranger.plugin.util;
 import java.util.Date;
 import java.util.List;
 import java.util.Map;
+import java.util.HashMap;
+import java.util.ArrayList;
 
 import javax.xml.bind.annotation.XmlAccessType;
 import javax.xml.bind.annotation.XmlAccessorType;
@@ -51,7 +53,6 @@ public class ServiceTags implements java.io.Serializable {
        public static final String TAGMODEL_SHARED        = "shared";
        public static final String TAGMODEL_RESOURCE_PRIVATE        = 
"resource_private";
 
-
        private String                      op = OP_ADD_OR_UPDATE;
        private String                      tagModel = TAGMODEL_SHARED;
        private String                      serviceName;
@@ -62,6 +63,22 @@ public class ServiceTags implements java.io.Serializable {
        private List<RangerServiceResource> serviceResources;
        private Map<Long, List<Long>>       resourceToTagIds;
 
+       public ServiceTags() {
+               this(OP_ADD_OR_UPDATE, TAGMODEL_SHARED, null, 0L, null, null, 
null, null, null);
+       }
+
+       public ServiceTags(String op, String tagModel, String serviceName, Long 
tagVersion, Date tagUpdateTime, Map<Long, RangerTagDef> tagDefinitions,
+                                          Map<Long, RangerTag> tags, 
List<RangerServiceResource> serviceResources, Map<Long, List<Long>> 
resourceToTagIds) {
+               setOp(op);
+               setTagModel(tagModel);
+               setServiceName(serviceName);
+               setTagVersion(tagVersion);
+               setTagUpdateTime(tagUpdateTime);
+               setTagDefinitions(tagDefinitions);
+               setTags(tags);
+               setServiceResources(serviceResources);
+               setResourceToTagIds(resourceToTagIds);
+       }
        /**
         * @return the op
         */
@@ -138,7 +155,7 @@ public class ServiceTags implements java.io.Serializable {
        }
 
        public void setTagDefinitions(Map<Long, RangerTagDef> tagDefinitions) {
-               this.tagDefinitions = tagDefinitions;
+               this.tagDefinitions = tagDefinitions == null ? new 
HashMap<Long, RangerTagDef>() : tagDefinitions;
        }
 
        public Map<Long, RangerTag> getTags() {
@@ -146,7 +163,7 @@ public class ServiceTags implements java.io.Serializable {
        }
 
        public void setTags(Map<Long, RangerTag> tags) {
-               this.tags = tags;
+               this.tags = tags == null ? new HashMap<Long, RangerTag>() : 
tags;
        }
 
        public List<RangerServiceResource> getServiceResources() {
@@ -154,7 +171,7 @@ public class ServiceTags implements java.io.Serializable {
        }
 
        public void setServiceResources(List<RangerServiceResource> 
serviceResources) {
-               this.serviceResources = serviceResources;
+               this.serviceResources = serviceResources == null ? new 
ArrayList<RangerServiceResource>() : serviceResources;
        }
 
        public Map<Long, List<Long>> getResourceToTagIds() {
@@ -162,7 +179,7 @@ public class ServiceTags implements java.io.Serializable {
        }
 
        public void setResourceToTagIds(Map<Long, List<Long>> resourceToTagIds) 
{
-               this.resourceToTagIds = resourceToTagIds;
+               this.resourceToTagIds = resourceToTagIds == null ? new 
HashMap<Long, List<Long>>() : resourceToTagIds;
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/5b9c094f/tagsync/conf/templates/installprop2xml.properties
----------------------------------------------------------------------
diff --git a/tagsync/conf/templates/installprop2xml.properties 
b/tagsync/conf/templates/installprop2xml.properties
index 101a1ba..a6840b0 100644
--- a/tagsync/conf/templates/installprop2xml.properties
+++ b/tagsync/conf/templates/installprop2xml.properties
@@ -29,13 +29,15 @@ TAGADMIN_SSL_CONFIG_FILENAME = 
ranger.tagsync.tagadmin.rest.ssl.config.file
 
 TAGSYNC_KEYSTORE_FILENAME = ranger.tagsync.tagadmin.keystore
 
-
+TAGSYNC_FILESOURCE_FILENAME = ranger.tagsync.filesource.filename
 TAG_SOURCE_FILE_MOD_TIME_CHECK_INTERVAL = 
ranger.tagsync.filesource.modtime.check.interval
 
-TAGSYNC_FILESOURCE_FILENAME = ranger.tagsync.filesource.filename
+TAG_SOURCE_ATLAS_REST_URL = ranger.tagsync.atlasrestsource.endpoint
+TAG_SOURCE_ATLAS_REST_DOWNLOAD_INTERVAL = 
ranger.tagsync.atlasrestsource.download.interval
 
 TAGSYNC_ATLAS_KAFKA_ENDPOINTS = atlas.kafka.bootstrap.servers
 TAGSYNC_ATLAS_ZOOKEEPER_ENDPOINT = atlas.kafka.zookeeper.connect
 TAGSYNC_ATLAS_CONSUMER_GROUP = atlas.kafka.entities.group.id
 
 TAGSYNC_ATLAS_TO_RANGER_SERVICE_MAPPING = 
ranger.tagsync.atlas.to.service.mapping
+TAGSYNC_SOURCE_ATLAS_CUSTOM_RESOURCE_MAPPERS = 
ranger.tagsync.source.atlas.custom.resource.mappers
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/5b9c094f/tagsync/conf/templates/ranger-tagsync-template.xml
----------------------------------------------------------------------
diff --git a/tagsync/conf/templates/ranger-tagsync-template.xml 
b/tagsync/conf/templates/ranger-tagsync-template.xml
index 9a88681..bad71bd 100644
--- a/tagsync/conf/templates/ranger-tagsync-template.xml
+++ b/tagsync/conf/templates/ranger-tagsync-template.xml
@@ -32,6 +32,10 @@
                <value></value>
        </property>
        <property>
+               <name>ranger.tagsync.atlasrestsource.download.interval</name>
+               <value></value>
+       </property>
+       <property>
                <name>ranger.tagsync.tagadmin.rest.ssl.config.file</name>
                <value></value>
        </property>
@@ -59,4 +63,11 @@
        <name>ranger.tagsync.atlas.to.service.mapping</name>
                <value></value>
        </property>
-</configuration>
+       <property>
+               <name>ranger.tagsync.atlasrestsource.endpoint</name>
+               <value></value>
+       </property>
+       <property>
+               <name>ranger.tagsync.source.atlas.custom.resource.mappers</name>
+               <value></value>
+       </property></configuration>

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/5b9c094f/tagsync/scripts/install.properties
----------------------------------------------------------------------
diff --git a/tagsync/scripts/install.properties 
b/tagsync/scripts/install.properties
index 95e87e5..b6665d1 100644
--- a/tagsync/scripts/install.properties
+++ b/tagsync/scripts/install.properties
@@ -37,9 +37,13 @@ TAGADMIN_ENDPOINT = http://localhost:6080
 # SSL config file name for TagAdmin
 TAGADMIN_SSL_CONFIG_FILENAME =
 
-# Source for tags (either 'atlas' or 'file')
+# Source for tags (either 'atlas' or 'file' or 'atlasrest')
 
 TAG_SOURCE = atlas
+TAG_SOURCE_ATLAS_REST_URL = http://localhost:21000
+
+# Interval for checking the source for any changes in case of TAG_SOURCE = 
atlasrest
+TAG_SOURCE_ATLAS_REST_DOWNLOAD_INTERVAL = 900000
 
 # File name to be used for reading tags information if TAG_SOURCE = file
 
@@ -73,3 +77,9 @@ TAGSYNC_ATLAS_CONSUMER_GROUP = ranger_entities_consumer
 #
 
 TAGSYNC_ATLAS_TO_RANGER_SERVICE_MAPPING=
+
+# A comma separated list of custom mapper class names which convert Atlas 
entities to
+# RangerServiceResource structures are specified here. If there are no custom 
mappers,
+# then it can be left blank
+
+TAGSYNC_SOURCE_ATLAS_CUSTOM_RESOURCE_MAPPERS=
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/5b9c094f/tagsync/scripts/ranger-tagsync-services.sh
----------------------------------------------------------------------
diff --git a/tagsync/scripts/ranger-tagsync-services.sh 
b/tagsync/scripts/ranger-tagsync-services.sh
index ca82ead..058ebf3 100755
--- a/tagsync/scripts/ranger-tagsync-services.sh
+++ b/tagsync/scripts/ranger-tagsync-services.sh
@@ -16,8 +16,8 @@
 # limitations under the License.
 
 if [[ -z $1 ]]; then
-        echo "Invalid argument [$1];"
-        echo "Usage: Only start | stop | restart | version, are supported."
+        echo "No argument provided.."
+        echo "Usage: $0 {start | stop | restart | version}"
         exit;
 fi
 action=$1
@@ -119,7 +119,7 @@ elif [ "${action}" == "VERSION" ]; then
        exit
 else 
        echo "Invalid argument [$1];"
-    echo "Usage: Only start | stop | restart | version, are supported."
+       echo "Usage: $0 {start | stop | restart | version}"
     exit;
 fi
 

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/5b9c094f/tagsync/scripts/ranger-tagsync-sync.sh
----------------------------------------------------------------------
diff --git a/tagsync/scripts/ranger-tagsync-sync.sh 
b/tagsync/scripts/ranger-tagsync-sync.sh
new file mode 100755
index 0000000..f8990e3
--- /dev/null
+++ b/tagsync/scripts/ranger-tagsync-sync.sh
@@ -0,0 +1,66 @@
+#!/bin/bash
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+if [[ -z $1 ]]; then
+        echo "No argument provided.."
+        echo "Usage: $0 {FILE | ATLAS_DOWNLOAD}"
+        exit -1;
+fi
+action=$1
+action=`echo $action | tr '[:lower:]' '[:upper:]'`
+realScriptPath=`readlink -f $0`
+realScriptDir=`dirname $realScriptPath`
+cd $realScriptDir
+cdir=`pwd`
+
+if [ "${action}" == "FILE" ]; then
+       action=file
+elif [ "${action}" == "ATLAS_DOWNLOAD" ]; then
+       action=atlasrest
+else
+        echo "Invalid argument [$action];"
+        echo "Usage: $0 {FILE | ATLAS_DOWNLOAD}"
+        exit -1;
+fi
+
+if [ -f ${cdir}/conf/java_home.sh ]; then
+       . ${cdir}/conf/java_home.sh
+fi
+
+for custom_env_script in `find ${cdir}/conf.dist/ -name 
"ranger-tagsync-env*"`; do
+               if [ -f $custom_env_script ]; then
+                       . $custom_env_script
+        fi
+done
+
+if [ "$JAVA_HOME" != "" ]; then
+               export PATH=$JAVA_HOME/bin:$PATH
+fi
+
+logdir=/var/log/ranger/tagsync-$action
+
+if [ ! -d $logdir ]; then
+       mkdir -p $logdir
+       chmod 777 $logdir
+fi
+
+cp="${cdir}/conf:${cdir}/dist/*:${cdir}/lib/*"
+
+cd ${cdir}
+umask 0077
+java -Dproc_rangertagsync-$action ${JAVA_OPTS} -Dlogdir="${logdir}" -cp 
"${cp}" org.apache.ranger.tagsync.process.TagSynchronizer  $action > 
${logdir}/tagsync.out 2>&1
+

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/5b9c094f/tagsync/scripts/setup.py
----------------------------------------------------------------------
diff --git a/tagsync/scripts/setup.py b/tagsync/scripts/setup.py
index 1c8552b..c100f73 100755
--- a/tagsync/scripts/setup.py
+++ b/tagsync/scripts/setup.py
@@ -77,8 +77,8 @@ TAGSYNC_INSTALL_PROP_PREFIX_FOR_ATLAS_RANGER_MAPPING = 
'ranger.tagsync.atlas.'
 TAGSYNC_ATLAS_CLUSTER_IDENTIFIER = '.instance.'
 TAGSYNC_INSTALL_PROP_SUFFIX_FOR_ATLAS_RANGER_MAPPING = '.ranger.service'
 TAG_SOURCE_ATLAS = 'atlas'
+TAG_SOURCE_ATLASREST = 'atlasrest'
 TAG_SOURCE_FILE = 'file'
-TAG_SOURCE_LIST = [ TAG_SOURCE_ATLAS, TAG_SOURCE_FILE ]
 
 def archiveFile(originalFileName):
     archiveDir = dirname(originalFileName)
@@ -199,20 +199,11 @@ def convertInstallPropsToXML(props):
                else:
                        print "Direct Key not found:%s" % (k)
 
-       ret['ranger.tagsync.sink.impl.class'] = 
'org.apache.ranger.sink.policymgr.TagRESTSink'
+       ret['ranger.tagsync.sink.impl.class'] = 
'org.apache.ranger.sink.policymgr.TagAdminRESTSink'
+
        if (TAG_SOURCE_KEY in ret):
-               tagSource = ret[TAG_SOURCE_KEY]
-               if (tagSource == TAG_SOURCE_ATLAS):
-                       ret['ranger.tagsync.source.impl.class'] = 'atlas'
-               elif (tagSource == TAG_SOURCE_FILE):
-                       ret['ranger.tagsync.source.impl.class'] = 'file'
-               else:
-                       print "ERROR: Invalid value (%s) defined for %s in 
install.properties. Only valid values are %s" % (tagSource, 
TAG_SOURCE_ATLAS,TAG_SOURCE_FILE)
-                       sys.exit(1)
-               del ret['TAG_SOURCE']
-       else:
-               print "ERROR: No value defined for TAG_SOURCE in 
install.properties. valid values are %s" % (TAG_SOURCE_ATLAS, TAG_SOURCE_FILE)
-               sys.exit(1)
+               ret['ranger.tagsync.source.impl.class'] = ret[TAG_SOURCE_KEY]
+               del ret[TAG_SOURCE_KEY]
 
        atlasOutFile.close()
 

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/5b9c094f/tagsync/src/main/java/org/apache/ranger/tagsync/model/AbstractTagSource.java
----------------------------------------------------------------------
diff --git 
a/tagsync/src/main/java/org/apache/ranger/tagsync/model/AbstractTagSource.java 
b/tagsync/src/main/java/org/apache/ranger/tagsync/model/AbstractTagSource.java
new file mode 100644
index 0000000..d6baeb2
--- /dev/null
+++ 
b/tagsync/src/main/java/org/apache/ranger/tagsync/model/AbstractTagSource.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ranger.tagsync.model;
+
+import com.google.gson.Gson;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.ranger.plugin.util.ServiceTags;
+
+public abstract  class AbstractTagSource implements TagSource {
+       private static final Log LOG = 
LogFactory.getLog(AbstractTagSource.class);
+       private TagSink tagSink;
+       protected boolean shutdown = false;
+
+       @Override
+       public void setTagSink(TagSink sink) {
+               if (sink == null) {
+                       LOG.error("Sink is null!!!");
+               } else {
+                       this.tagSink = sink;
+               }
+       }
+       @Override
+       public void synchUp() {}
+
+       public void updateSink(final ServiceTags serviceTags) {
+               if (serviceTags == null) {
+                       if (LOG.isDebugEnabled()) {
+                               LOG.debug("No ServiceTags to upload");
+                       }
+               } else {
+                       if (LOG.isDebugEnabled()) {
+                               String serviceTagsJSON = new 
Gson().toJson(serviceTags);
+                               LOG.debug("Uploading serviceTags=" + 
serviceTagsJSON);
+                       }
+
+                       try {
+                               tagSink.uploadServiceTags(serviceTags);
+                       } catch (Exception exception) {
+                               LOG.error("uploadServiceTags() failed..", 
exception);
+                       }
+               }
+       }
+
+       public void stop() {
+               shutdown = true;
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/5b9c094f/tagsync/src/main/java/org/apache/ranger/tagsync/model/TagSource.java
----------------------------------------------------------------------
diff --git 
a/tagsync/src/main/java/org/apache/ranger/tagsync/model/TagSource.java 
b/tagsync/src/main/java/org/apache/ranger/tagsync/model/TagSource.java
index 2df8036..7d19562 100644
--- a/tagsync/src/main/java/org/apache/ranger/tagsync/model/TagSource.java
+++ b/tagsync/src/main/java/org/apache/ranger/tagsync/model/TagSource.java
@@ -27,9 +27,11 @@ public interface TagSource {
 
        void setTagSink(TagSink sink);
 
-       void updateSink() throws Exception;
+       void synchUp();
 
-       Thread start();
+       boolean start();
+
+       void stop();
 
        boolean isChanged();
 

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/5b9c094f/tagsync/src/main/java/org/apache/ranger/tagsync/process/TagSyncConfig.java
----------------------------------------------------------------------
diff --git 
a/tagsync/src/main/java/org/apache/ranger/tagsync/process/TagSyncConfig.java 
b/tagsync/src/main/java/org/apache/ranger/tagsync/process/TagSyncConfig.java
index e1b5130..0dfea25 100644
--- a/tagsync/src/main/java/org/apache/ranger/tagsync/process/TagSyncConfig.java
+++ b/tagsync/src/main/java/org/apache/ranger/tagsync/process/TagSyncConfig.java
@@ -47,22 +47,29 @@ public class TagSyncConfig extends Configuration {
 
        private static final String 
TAGSYNC_FILESOURCE_MOD_TIME_CHECK_INTERVAL_PROP = 
"ranger.tagsync.filesource.modtime.check.interval";
 
+       private static final String 
TAGSYNC_ATLAS_REST_SOURCE_DOWNLOAD_INTERVAL_PROP = 
"ranger.tagsync.atlasrestsource.download.interval";
+
        private static final String TAGSYNC_SOURCE_CLASS_PROP = 
"ranger.tagsync.source.impl.class";
 
        private static final String TAGSYNC_SINK_CLASS_PROP = 
"ranger.tagsync.sink.impl.class";
 
-       private static final String TAGSYNC_ATLASSOURCE_ENDPOINT_PROP = 
"ranger.tagsync.atlassource.endpoint";
+       private static final String TAGSYNC_ATLASSOURCE_ENDPOINT_PROP = 
"ranger.tagsync.atlasrestsource.endpoint";
 
-       private static final String TAGSYNC_SERVICENAME_MAPPER_PROP_PREFIX = 
"ranger.tagsync.atlas.";
+       public static final String TAGSYNC_SERVICENAME_MAPPER_PROP_PREFIX = 
"ranger.tagsync.atlas.";
 
-       private static final String TAGSYNC_SERVICENAME_MAPPER_PROP_SUFFIX = 
".ranger.service";
+       public static final String TAGSYNC_SERVICENAME_MAPPER_PROP_SUFFIX = 
".ranger.service";
 
-       private static final String 
TAGSYNC_DEFAULT_CLUSTERNAME_AND_COMPONENTNAME_SEPARATOR = "_";
+       public static final String 
TAGSYNC_DEFAULT_CLUSTERNAME_AND_COMPONENTNAME_SEPARATOR = "_";
 
        private static final String TAGSYNC_TAGADMIN_KEYSTORE_PROP = 
"ranger.tagsync.tagadmin.keystore";
        private static final String TAGSYNC_TAGADMIN_ALIAS_PROP = 
"ranger.tagsync.tagadmin.alias";
        private static final String TAGSYNC_TAGADMIN_PASSWORD_PROP = 
"ranger.tagsync.tagadmin.password";
        private static final String DEFAULT_TAGADMIN_USERNAME = "rangertagsync";
+       private static final String 
TAGSYNC_SOURCE_ATLAS_CUSTOM_RESOURCE_MAPPERS_PROP = 
"ranger.tagsync.source.atlas.custom.resource.mappers";
+
+       private static final long 
DEFAULT_TAGSYNC_FILESOURCE_MOD_TIME_CHECK_INTERVAL = 60000;
+
+       private static final long DEFAULT_TAGSYNC_REST_SOURCE_DOWNLOAD_INTERVAL 
= 900000;
 
        public static TagSyncConfig getInstance() {
                TagSyncConfig newConfig = new TagSyncConfig();
@@ -158,7 +165,7 @@ public class TagSyncConfig extends Configuration {
 
        static public boolean isTagSyncEnabled(Properties prop) {
                String val = prop.getProperty(TAGSYNC_ENABLED_PROP);
-               return !(val != null && val.trim().equalsIgnoreCase("falae"));
+               return !(val != null && val.trim().equalsIgnoreCase("false"));
        }
 
        static public String getTagSyncLogdir(Properties prop) {
@@ -168,17 +175,39 @@ public class TagSyncConfig extends Configuration {
 
        static public long 
getTagSourceFileModTimeCheckIntervalInMillis(Properties prop) {
                String val = 
prop.getProperty(TAGSYNC_FILESOURCE_MOD_TIME_CHECK_INTERVAL_PROP);
-               return Long.valueOf(val);
+               long ret = DEFAULT_TAGSYNC_FILESOURCE_MOD_TIME_CHECK_INTERVAL;
+               if (StringUtils.isNotBlank(val)) {
+                       try {
+                               ret = Long.valueOf(val);
+                       } catch (NumberFormatException exception) {
+                               // Ignore
+                       }
+               }
+               return ret;
+       }
+
+       static public long getTagSourceAtlasDownloadIntervalInMillis(Properties 
prop) {
+               String val = 
prop.getProperty(TAGSYNC_ATLAS_REST_SOURCE_DOWNLOAD_INTERVAL_PROP);
+               long ret = DEFAULT_TAGSYNC_REST_SOURCE_DOWNLOAD_INTERVAL;
+               if (StringUtils.isNotBlank(val)) {
+                       try {
+                               ret = Long.valueOf(val);
+                       } catch (NumberFormatException exception) {
+                               // Ignore
+                       }
+               }
+               return ret;
        }
 
-       static public String getTagSourceClassName(Properties prop) {
-               String val = prop.getProperty(TAGSYNC_SOURCE_CLASS_PROP);
-               if (StringUtils.equalsIgnoreCase(val, "atlas")) {
-                       return 
"org.apache.ranger.tagsync.source.atlas.TagAtlasSource";
-               } else if (StringUtils.equalsIgnoreCase(val, "file")) {
-                       return 
"org.apache.ranger.tagsync.source.file.TagFileSource";
+       static public String getTagSourceClassName(String sourceName) {
+               if (StringUtils.equalsIgnoreCase(sourceName, "atlas")) {
+                       return 
"org.apache.ranger.tagsync.source.atlas.AtlasTagSource";
+               } else if (StringUtils.equalsIgnoreCase(sourceName, "file")) {
+                       return 
"org.apache.ranger.tagsync.source.file.FileTagSource";
+               } else if (StringUtils.equalsIgnoreCase(sourceName, 
"atlasrest")) {
+                       return 
"org.apache.ranger.tagsync.source.atlasrest.AtlasRESTTagSource";
                } else
-                       return val;
+                       return sourceName;
        }
 
        static public String getTagSource(Properties prop) {
@@ -188,7 +217,7 @@ public class TagSyncConfig extends Configuration {
        static public String getTagSinkClassName(Properties prop) {
                String val = prop.getProperty(TAGSYNC_SINK_CLASS_PROP);
                if (StringUtils.equalsIgnoreCase(val, "tagadmin")) {
-                       return 
"org.apache.ranger.tagsync.sink.tagadmin.TagRESTSink";
+                       return 
"org.apache.ranger.tagsync.sink.tagadmin.TagAdminRESTSink";
                } else
                        return val;
        }
@@ -250,15 +279,8 @@ public class TagSyncConfig extends Configuration {
                return "";
        }
 
-       static public String getServiceName(String componentName, String 
instanceName, Properties prop) {
-               String propName = TAGSYNC_SERVICENAME_MAPPER_PROP_PREFIX + 
componentName
-                               + ".instance." + instanceName
-                               + TAGSYNC_SERVICENAME_MAPPER_PROP_SUFFIX;
-               String val = prop.getProperty(propName);
-               if (StringUtils.isBlank(val)) {
-                       val = instanceName + 
TAGSYNC_DEFAULT_CLUSTERNAME_AND_COMPONENTNAME_SEPARATOR + componentName;
-               }
-               return val;
+       static public String getCustomAtlasResourceMappers(Properties prop) {
+               return 
prop.getProperty(TAGSYNC_SOURCE_ATLAS_CUSTOM_RESOURCE_MAPPERS_PROP);
        }
 
        private TagSyncConfig() {

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/5b9c094f/tagsync/src/main/java/org/apache/ranger/tagsync/process/TagSynchronizer.java
----------------------------------------------------------------------
diff --git 
a/tagsync/src/main/java/org/apache/ranger/tagsync/process/TagSynchronizer.java 
b/tagsync/src/main/java/org/apache/ranger/tagsync/process/TagSynchronizer.java
index 7bae973..c1a1c39 100644
--- 
a/tagsync/src/main/java/org/apache/ranger/tagsync/process/TagSynchronizer.java
+++ 
b/tagsync/src/main/java/org/apache/ranger/tagsync/process/TagSynchronizer.java
@@ -19,49 +19,50 @@
 
 package org.apache.ranger.tagsync.process;
 
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.collections.MapUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.log4j.Logger;
 import org.apache.ranger.tagsync.model.TagSink;
 import org.apache.ranger.tagsync.model.TagSource;
 
-import java.util.Map;
-import java.util.Properties;
+import java.util.*;
 
 public class TagSynchronizer {
 
        private static final Logger LOG = 
Logger.getLogger(TagSynchronizer.class);
 
        private boolean shutdownFlag = false;
-       private TagSource tagSource = null;
+       private List<TagSource> tagSources;
        private Properties properties = null;
 
        public static void main(String[] args) {
 
-               boolean tagSynchronizerInitialized = false;
                TagSynchronizer tagSynchronizer = new TagSynchronizer();
 
-               while (!tagSynchronizerInitialized) {
+               TagSyncConfig config = TagSyncConfig.getInstance();
+               Properties props = config.getProperties();
 
-                       TagSyncConfig config = TagSyncConfig.getInstance();
-                       Properties props = config.getProperties();
+               tagSynchronizer.setProperties(props);
 
-                       tagSynchronizer.setProperties(props);
-                       tagSynchronizerInitialized = 
tagSynchronizer.initialize();
+               String tagSourceName = null;
+               if (args.length > 0) {
+                       tagSourceName = args[0];
+                       LOG.info("TagSource is set to " + args[0]);
+               }
 
-                       if (!tagSynchronizerInitialized) {
-                               LOG.error("TagSynchronizer failed to initialize 
correctly");
+               boolean tagSynchronizerInitialized = 
tagSynchronizer.initialize(tagSourceName);
 
-                               try {
-                                       LOG.error("Sleeping for [60] seconds 
before attempting to re-read configuration XML files");
-                                       Thread.sleep(60 * 1000);
-                               } catch (InterruptedException e) {
-                                       LOG.error("Failed to wait for [60] 
seconds", e);
-                               }
+               if (tagSynchronizerInitialized) {
+                       if (StringUtils.isNotBlank(tagSourceName)) {
+                               tagSynchronizer.getFirstTagSource().synchUp();
+                       } else {
+                               tagSynchronizer.run();
                        }
+               } else {
+                       LOG.error("TagSynchronizer failed to initialize 
correctly, exiting..");
+                       System.exit(-1);
                }
-
-               tagSynchronizer.run();
        }
 
        public TagSynchronizer() {
@@ -80,64 +81,124 @@ public class TagSynchronizer {
                }
        }
 
-       public boolean initialize() {
+       public boolean initialize(String source) {
 
                if (LOG.isDebugEnabled()) {
-                       LOG.debug("==> TagSynchronizer.initialize()");
+                       LOG.debug("==> TagSynchronizer.initialize(" + source + 
")");
                }
 
                printConfigurationProperties();
 
-               boolean ret = true;
+               boolean ret = false;
+
+               String tagSourceNames = StringUtils.isNotBlank(source) ? source 
:
+                               TagSyncConfig.getTagSource(properties);
+
+               if (StringUtils.isNotBlank(tagSourceNames)) {
+
+                       LOG.info("Initializing TAG source and sink");
 
-               String tagSourceName = TagSyncConfig.getTagSource(properties);
+                       TagSink tagSink = initializeTagSink();
 
-               if (StringUtils.isBlank(tagSourceName) ||
-                               (!StringUtils.equalsIgnoreCase(tagSourceName, 
"file") && !StringUtils.equalsIgnoreCase(tagSourceName, "atlas"))) {
-                       ret = false;
-                       LOG.error("'ranger.tagsync.source.impl.class' value is 
invalid!, 'ranger.tagsync.source.impl.class'=" + tagSourceName + ". Supported 
'ranger.tagsync.source.impl.class' values are : file, atlas");
+                       if (tagSink != null) {
+
+                               tagSources = 
initializeTagSources(tagSourceNames);
+
+                               if (CollectionUtils.isNotEmpty(tagSources)) {
+                                       for (TagSource tagSource : tagSources) {
+                                               tagSource.setTagSink(tagSink);
+                                       }
+                                       ret = true;
+                               }
+                       }
+               } else {
+                       LOG.error("'ranger.tagsync.source.impl.class' value is 
not specified or is empty!");
                }
 
-               if (ret) {
+               if (LOG.isDebugEnabled()) {
+                       LOG.debug("<== TagSynchronizer.initialize(" + 
tagSourceNames + ") : " + ret);
+               }
+
+               return ret;
+       }
+
+       public TagSink initializeTagSink() {
+               if (LOG.isDebugEnabled()) {
+                       LOG.debug("==> TagSynchronizer.initializeTagSink()");
+               }
 
-                       try {
-                               LOG.info("Initializing TAG source and sink");
-                               // Initialize tagSink and tagSource
-                               String tagSourceClassName = 
TagSyncConfig.getTagSourceClassName(properties);
-                               String tagSinkClassName = 
TagSyncConfig.getTagSinkClassName(properties);
+               TagSink ret = null;
 
+               try {
+                       String tagSinkClassName = 
TagSyncConfig.getTagSinkClassName(properties);
+
+                       if (LOG.isDebugEnabled()) {
+                               LOG.debug("tagSinkClassName=" + 
tagSinkClassName);
+                       }
+                       @SuppressWarnings("unchecked")
+                       Class<TagSink> tagSinkClass = (Class<TagSink>) 
Class.forName(tagSinkClassName);
+
+                       ret = tagSinkClass.newInstance();
+
+                       if (!ret.initialize(properties)) {
+                               LOG.error("Failed to initialize TAG sink " + 
tagSinkClassName);
+
+                               ret = null;
+                       }
+               } catch (Throwable t) {
+                       LOG.error("Failed to initialize TAG sink. Error 
details: ", t);
+                       ret = null;
+               }
+
+               if (LOG.isDebugEnabled()) {
+                       LOG.debug("<== TagSynchronizer.initializeTagSink()");
+               }
+               return ret;
+       }
+
+       public List<TagSource> initializeTagSources(String tagSourceNames) {
+               if (LOG.isDebugEnabled()) {
+                       LOG.debug("==> TagSynchronizer.initializeTagSources(" + 
tagSourceNames + ")");
+               }
+
+               List<TagSource> ret = new ArrayList<TagSource>();
+
+               String[] tagSourcesArray = tagSourceNames.split(",");
+
+               List<String> tagSourceList = Arrays.asList(tagSourcesArray);
+
+               try {
+                       for (String tagSourceName : tagSourceList) {
+
+                               String tagSourceClassName = 
TagSyncConfig.getTagSourceClassName(tagSourceName.trim());
                                if (LOG.isDebugEnabled()) {
-                                       LOG.debug("tagSourceClassName=" + 
tagSourceClassName + ", tagSinkClassName=" + tagSinkClassName);
+                                       LOG.debug("tagSourceClassName=" + 
tagSourceClassName);
                                }
 
                                @SuppressWarnings("unchecked")
                                Class<TagSource> tagSourceClass = 
(Class<TagSource>) Class.forName(tagSourceClassName);
-
-                               @SuppressWarnings("unchecked")
-                               Class<TagSink> tagSinkClass = (Class<TagSink>) 
Class.forName(tagSinkClassName);
-
-                               TagSink tagSink = tagSinkClass.newInstance();
-                               tagSource = tagSourceClass.newInstance();
+                               TagSource tagSource = 
tagSourceClass.newInstance();
 
                                if (LOG.isDebugEnabled()) {
-                                       LOG.debug("Created instance of " + 
tagSourceClassName + ", " + tagSinkClassName);
+                                       LOG.debug("Created instance of " + 
tagSourceClassName);
                                }
 
-                               ret = tagSink.initialize(properties) && 
tagSource.initialize(properties);
+                               if (!tagSource.initialize(properties)) {
+                                       LOG.error("Failed to initialize TAG 
source " + tagSourceClassName);
 
-                               if (ret) {
-                                       tagSource.setTagSink(tagSink);
+                                       ret.clear();
+                                       break;
                                }
-
-                               LOG.info("Done initializing TAG source and 
sink");
-                       } catch (Throwable t) {
-                               LOG.error("Failed to initialize TAG 
source/sink. Error details: ", t);
-                               ret = false;
+                               ret.add(tagSource);
                        }
+
+               } catch (Throwable t) {
+                       LOG.error("Failed to initialize TAG sources. Error 
details: ", t);
+                       ret.clear();
                }
 
                if (LOG.isDebugEnabled()) {
-                       LOG.debug("<== TagSynchronizer.initialize(), result=" + 
ret);
+                       LOG.debug("<== TagSynchronizer.initializeTagSources(" + 
tagSourceNames + ")");
                }
 
                return ret;
@@ -150,35 +211,31 @@ public class TagSynchronizer {
 
                long shutdownCheckIntervalInMs = 60*1000;
 
-               Thread tagSourceThread = null;
+               boolean tagSourcesStarted = true;
 
                try {
-                       tagSourceThread = tagSource.start();
+                       for (TagSource tagSource : tagSources) {
+                               tagSourcesStarted = tagSourcesStarted && 
tagSource.start();
+                       }
 
-                       if (tagSourceThread != null) {
+                       if (tagSourcesStarted) {
                                while (!shutdownFlag) {
                                        try {
                                                LOG.debug("Sleeping for [" + 
shutdownCheckIntervalInMs + "] milliSeconds");
                                                
Thread.sleep(shutdownCheckIntervalInMs);
                                        } catch (InterruptedException e) {
-                                               LOG.error("Failed to wait for 
[" + shutdownCheckIntervalInMs + "] milliseconds before attempting to 
synchronize tag information", e);
+                                               LOG.error("Failed to wait for 
[" + shutdownCheckIntervalInMs + "] milliseconds before attempting to 
synchronize tag information ", e);
+                                               break;
                                        }
                                }
                        }
                } catch (Throwable t) {
-                       LOG.error("tag-sync thread got an error", t);
+                       LOG.error("tag-sync main thread got an error", t);
                } finally {
-                       if (tagSourceThread != null) {
-                               LOG.error("Shutting down the tag-sync thread");
-                               LOG.info("Interrupting tagSourceThread...");
-                               tagSourceThread.interrupt();
-                               try {
-                                       tagSourceThread.join();
-                               } catch (InterruptedException 
interruptedException) {
-                                       LOG.error("tagSourceThread.join() was 
interrupted");
-                               }
-                       } else {
-                               LOG.error("Could not start tagSource monitoring 
thread");
+                       LOG.info("Stopping all tagSources");
+
+                       for (TagSource tagSource : tagSources) {
+                               tagSource.stop();
                        }
                }
 
@@ -192,6 +249,10 @@ public class TagSynchronizer {
                this.shutdownFlag = true;
        }
 
+       public TagSource getFirstTagSource() {
+               return tagSources.get(0);
+       }
+
        public void printConfigurationProperties() {
                LOG.info("--------------------------------");
                LOG.info("");

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/5b9c094f/tagsync/src/main/java/org/apache/ranger/tagsync/sink/tagadmin/TagAdminRESTSink.java
----------------------------------------------------------------------
diff --git 
a/tagsync/src/main/java/org/apache/ranger/tagsync/sink/tagadmin/TagAdminRESTSink.java
 
b/tagsync/src/main/java/org/apache/ranger/tagsync/sink/tagadmin/TagAdminRESTSink.java
new file mode 100644
index 0000000..1541034
--- /dev/null
+++ 
b/tagsync/src/main/java/org/apache/ranger/tagsync/sink/tagadmin/TagAdminRESTSink.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ranger.tagsync.sink.tagadmin;
+
+import com.sun.jersey.api.client.ClientResponse;
+import com.sun.jersey.api.client.WebResource;
+
+import org.apache.commons.collections.MapUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.ranger.admin.client.datatype.RESTResponse;
+import org.apache.ranger.tagsync.model.TagSink;
+import org.apache.ranger.plugin.util.RangerRESTClient;
+import org.apache.ranger.plugin.util.SearchFilter;
+import org.apache.ranger.plugin.util.ServiceTags;
+import org.apache.ranger.tagsync.process.TagSyncConfig;
+
+import java.util.Map;
+import java.util.Properties;
+
+public class TagAdminRESTSink implements TagSink {
+       private static final Log LOG = 
LogFactory.getLog(TagAdminRESTSink.class);
+
+       private static final String REST_PREFIX = "/service";
+       private static final String MODULE_PREFIX = "/tags";
+
+       private static final String REST_MIME_TYPE_JSON = "application/json" ;
+
+       private static final String REST_URL_IMPORT_SERVICETAGS_RESOURCE = 
REST_PREFIX + MODULE_PREFIX + "/importservicetags/";
+
+       private RangerRESTClient tagRESTClient = null;
+
+       @Override
+       public boolean initialize(Properties properties) {
+               if(LOG.isDebugEnabled()) {
+                       LOG.debug("==> TagAdminRESTSink.initialize()");
+               }
+
+               boolean ret = true;
+
+               String restUrl       = 
TagSyncConfig.getTagAdminRESTUrl(properties);
+               String sslConfigFile = 
TagSyncConfig.getTagAdminRESTSslConfigFile(properties);
+               String userName = TagSyncConfig.getTagAdminUserName(properties);
+               String password = TagSyncConfig.getTagAdminPassword(properties);
+
+               if (LOG.isDebugEnabled()) {
+                       LOG.debug("restUrl=" + restUrl);
+                       LOG.debug("sslConfigFile=" + sslConfigFile);
+                       LOG.debug("userName=" + userName);
+               }
+
+               if (StringUtils.isBlank(restUrl)) {
+                       ret = false;
+                       LOG.error("No value specified for property 
'ranger.tagsync.tagadmin.rest.url'!");
+               } else {
+                       if (LOG.isDebugEnabled()) {
+                               LOG.debug("ranger.tagsync.tagadmin.rest.url:" + 
restUrl);
+                       }
+               }
+
+               if (ret) {
+                       tagRESTClient = new RangerRESTClient(restUrl, 
sslConfigFile);
+                       tagRESTClient.setBasicAuthInfo(userName, password);
+
+                       ret = testConnection();
+               }
+
+               if (!ret) {
+                       LOG.error("Cannot connect to Tag Admin. Please recheck 
configuration properties and/or check if Tag Admin is running and responsive");
+               }
+
+               if(LOG.isDebugEnabled()) {
+                       LOG.debug("<== TagAdminRESTSink.initialize(), result=" 
+ ret);
+               }
+
+               return ret;
+       }
+
+       public boolean testConnection() {
+               if (LOG.isDebugEnabled()) {
+                       LOG.debug("==> TagAdminRESTSink.testConnection()");
+               }
+
+               boolean ret = true;
+
+               try {
+                       // build a dummy serviceTags structure and upload it to 
test connectivity
+                       ServiceTags serviceTags = new ServiceTags();
+                       serviceTags.setOp(ServiceTags.OP_ADD_OR_UPDATE);
+                       uploadServiceTags(serviceTags);
+               } catch (Exception exception) {
+                       LOG.error("test-upload of serviceTags failed.", 
exception);
+                       ret = false;
+               }
+
+               if (LOG.isDebugEnabled()) {
+                       LOG.debug("<== TagAdminRESTSink.testConnection(), 
result=" + ret);
+               }
+               return ret;
+       }
+
+       @Override
+       synchronized public void uploadServiceTags(ServiceTags serviceTags) 
throws Exception {
+               if(LOG.isDebugEnabled()) {
+                       LOG.debug("==> uploadServiceTags()");
+               }
+               WebResource webResource = 
createWebResource(REST_URL_IMPORT_SERVICETAGS_RESOURCE);
+
+               ClientResponse response    = 
webResource.accept(REST_MIME_TYPE_JSON).type(REST_MIME_TYPE_JSON).put(ClientResponse.class,
 tagRESTClient.toJson(serviceTags));
+
+               if(response == null || response.getStatus() != 204) {
+                       LOG.error("RangerAdmin REST call returned with 
response={" + response + "}");
+
+                       RESTResponse resp = 
RESTResponse.fromClientResponse(response);
+
+                       LOG.error("Upload of service-tags failed with message " 
+ resp.getMessage());
+
+                       throw new Exception("Upload of service-tags failed with 
response: " + response);
+               }
+
+               if(LOG.isDebugEnabled()) {
+                       LOG.debug("<== uploadServiceTags()");
+               }
+       }
+
+       private WebResource createWebResource(String url) {
+               return createWebResource(url, null);
+       }
+
+       private WebResource createWebResource(String url, SearchFilter filter) {
+               WebResource ret = tagRESTClient.getResource(url);
+
+               if(filter != null && !MapUtils.isEmpty(filter.getParams())) {
+                       for(Map.Entry<String, String> e : 
filter.getParams().entrySet()) {
+                               String name  = e.getKey();
+                               String value = e.getValue();
+
+                               ret.queryParam(name, value);
+                       }
+               }
+
+               return ret;
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/5b9c094f/tagsync/src/main/java/org/apache/ranger/tagsync/sink/tagadmin/TagRESTSink.java
----------------------------------------------------------------------
diff --git 
a/tagsync/src/main/java/org/apache/ranger/tagsync/sink/tagadmin/TagRESTSink.java
 
b/tagsync/src/main/java/org/apache/ranger/tagsync/sink/tagadmin/TagRESTSink.java
deleted file mode 100644
index 41085d0..0000000
--- 
a/tagsync/src/main/java/org/apache/ranger/tagsync/sink/tagadmin/TagRESTSink.java
+++ /dev/null
@@ -1,160 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.ranger.tagsync.sink.tagadmin;
-
-import com.sun.jersey.api.client.ClientResponse;
-import com.sun.jersey.api.client.WebResource;
-
-import org.apache.commons.collections.MapUtils;
-import org.apache.commons.lang.StringUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.ranger.admin.client.datatype.RESTResponse;
-import org.apache.ranger.tagsync.model.TagSink;
-import org.apache.ranger.plugin.util.RangerRESTClient;
-import org.apache.ranger.plugin.util.SearchFilter;
-import org.apache.ranger.plugin.util.ServiceTags;
-import org.apache.ranger.tagsync.process.TagSyncConfig;
-
-import java.util.Map;
-import java.util.Properties;
-
-public class TagRESTSink implements TagSink {
-       private static final Log LOG = LogFactory.getLog(TagRESTSink.class);
-
-       private static final String REST_PREFIX = "/service";
-       private static final String MODULE_PREFIX = "/tags";
-
-       private static final String REST_MIME_TYPE_JSON = "application/json" ;
-
-       private static final String REST_URL_IMPORT_SERVICETAGS_RESOURCE = 
REST_PREFIX + MODULE_PREFIX + "/importservicetags/";
-
-       private RangerRESTClient tagRESTClient = null;
-
-       @Override
-       public boolean initialize(Properties properties) {
-               if(LOG.isDebugEnabled()) {
-                       LOG.debug("==> TagRESTSink.initialize()");
-               }
-
-               boolean ret = true;
-
-               String restUrl       = 
TagSyncConfig.getTagAdminRESTUrl(properties);
-               String sslConfigFile = 
TagSyncConfig.getTagAdminRESTSslConfigFile(properties);
-               String userName = TagSyncConfig.getTagAdminUserName(properties);
-               String password = TagSyncConfig.getTagAdminPassword(properties);
-
-               if (LOG.isDebugEnabled()) {
-                       LOG.debug("restUrl=" + restUrl);
-                       LOG.debug("sslConfigFile=" + sslConfigFile);
-                       LOG.debug("userName=" + userName);
-               }
-
-               if (StringUtils.isBlank(restUrl)) {
-                       ret = false;
-                       LOG.error("No value specified for property 
'ranger.tagsync.tagadmin.rest.url'!");
-               } else {
-                       if (LOG.isDebugEnabled()) {
-                               LOG.debug("ranger.tagsync.tagadmin.rest.url:" + 
restUrl);
-                       }
-               }
-
-               if (ret) {
-                       tagRESTClient = new RangerRESTClient(restUrl, 
sslConfigFile);
-                       tagRESTClient.setBasicAuthInfo(userName, password);
-
-                       ret = testConnection();
-               }
-
-               if (!ret) {
-                       LOG.error("Cannot connect to Tag Admin. Please recheck 
configuration properties and/or check if Tag Admin is running and responsive");
-               }
-
-               if(LOG.isDebugEnabled()) {
-                       LOG.debug("<== TagRESTSink.initialize(), result=" + 
ret);
-               }
-
-               return ret;
-       }
-
-       public boolean testConnection() {
-               if (LOG.isDebugEnabled()) {
-                       LOG.debug("==> TagRESTSink.testConnection()");
-               }
-
-               boolean ret = true;
-
-               try {
-                       // build a dummy serviceTags structure and upload it to 
test connectivity
-                       ServiceTags serviceTags = new ServiceTags();
-                       serviceTags.setOp(ServiceTags.OP_ADD_OR_UPDATE);
-                       uploadServiceTags(serviceTags);
-               } catch (Exception exception) {
-                       LOG.error("test-upload of serviceTags failed.", 
exception);
-                       ret = false;
-               }
-
-               if (LOG.isDebugEnabled()) {
-                       LOG.debug("<== TagRESTSink.testConnection(), result=" + 
ret);
-               }
-               return ret;
-       }
-
-       @Override
-       public void uploadServiceTags(ServiceTags serviceTags) throws Exception 
{
-               if(LOG.isDebugEnabled()) {
-                       LOG.debug("==> uploadServiceTags()");
-               }
-               WebResource webResource = 
createWebResource(REST_URL_IMPORT_SERVICETAGS_RESOURCE);
-
-               ClientResponse response    = 
webResource.accept(REST_MIME_TYPE_JSON).type(REST_MIME_TYPE_JSON).put(ClientResponse.class,
 tagRESTClient.toJson(serviceTags));
-
-               if(response == null || response.getStatus() != 204) {
-                       LOG.error("RangerAdmin REST call returned with 
response={" + response + "}");
-
-                       RESTResponse resp = 
RESTResponse.fromClientResponse(response);
-
-                       LOG.error("Upload of service-tags failed with message " 
+ resp.getMessage());
-               }
-
-               if(LOG.isDebugEnabled()) {
-                       LOG.debug("<== uploadServiceTags()");
-               }
-       }
-
-       private WebResource createWebResource(String url) {
-               return createWebResource(url, null);
-       }
-
-       private WebResource createWebResource(String url, SearchFilter filter) {
-               WebResource ret = tagRESTClient.getResource(url);
-
-               if(filter != null && !MapUtils.isEmpty(filter.getParams())) {
-                       for(Map.Entry<String, String> e : 
filter.getParams().entrySet()) {
-                               String name  = e.getKey();
-                               String value = e.getValue();
-
-                               ret.queryParam(name, value);
-                       }
-               }
-
-               return ret;
-       }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/5b9c094f/tagsync/src/main/java/org/apache/ranger/tagsync/source/Atlas/AtlasHiveResourceMapper.java
----------------------------------------------------------------------
diff --git 
a/tagsync/src/main/java/org/apache/ranger/tagsync/source/Atlas/AtlasHiveResourceMapper.java
 
b/tagsync/src/main/java/org/apache/ranger/tagsync/source/Atlas/AtlasHiveResourceMapper.java
new file mode 100644
index 0000000..a17d611
--- /dev/null
+++ 
b/tagsync/src/main/java/org/apache/ranger/tagsync/source/Atlas/AtlasHiveResourceMapper.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ranger.tagsync.source.atlas;
+
+import org.apache.atlas.typesystem.IReferenceableInstance;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.ranger.plugin.model.RangerPolicy;
+import org.apache.ranger.plugin.model.RangerServiceResource;
+import org.apache.ranger.tagsync.process.TagSyncConfig;
+
+import java.util.Map;
+import java.util.HashMap;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Arrays;
+
+public class AtlasHiveResourceMapper extends AtlasResourceMapper {
+       private static final Log LOG = 
LogFactory.getLog(AtlasHiveResourceMapper.class);
+
+       public static final String COMPONENT_NAME = "hive";
+
+       public static final String ENTITY_TYPE_HIVE_DB = "hive_db";
+       public static final String ENTITY_TYPE_HIVE_TABLE = "hive_table";
+       public static final String ENTITY_TYPE_HIVE_COLUMN = "hive_column";
+
+       public static final String RANGER_TYPE_HIVE_DB = "database";
+       public static final String RANGER_TYPE_HIVE_TABLE = "table";
+       public static final String RANGER_TYPE_HIVE_COLUMN = "column";
+
+       public static final String 
ENTITY_ATTRIBUTE_QUALIFIED_NAME_FOR_HIVE_TABLE = "name";
+
+       static protected final String ENTITY_ATTRIBUTE_QUALIFIED_NAME = 
"qualifiedName";
+
+       private static String clusterDelimiter = "@";
+
+       private static String qualifiedNameDelimiter = "\\.";
+
+       public static final String[] supportedEntityTypes = { 
ENTITY_TYPE_HIVE_DB, ENTITY_TYPE_HIVE_TABLE, ENTITY_TYPE_HIVE_COLUMN };
+
+       public AtlasHiveResourceMapper() {
+               super();
+       }
+
+       @Override
+       public List<String> getSupportedEntityTypes() {
+               return Arrays.asList(supportedEntityTypes);
+       }
+
+       @Override
+       public RangerServiceResource buildResource(final IReferenceableInstance 
entity) throws Exception {
+
+               Map<String, RangerPolicy.RangerPolicyResource> elements = new 
HashMap<String, RangerPolicy.RangerPolicyResource>();
+
+               String serviceName = null;
+
+               List<String> components = getQualifiedNameComponents(entity);
+               // components should contain qualifiedName, clusterName, 
dbName, tableName, columnName in that order
+
+               String entityTypeName = entity.getTypeName();
+
+               String qualifiedName = components.get(0);
+
+               String clusterName, dbName, tableName, columnName;
+
+               if (components.size() > 1) {
+                       clusterName = components.get(1);
+                       serviceName = getRangerServiceName(clusterName);
+               }
+
+               if (StringUtils.equals(entityTypeName, ENTITY_TYPE_HIVE_DB)) {
+                       if (components.size() > 2) {
+                               dbName = components.get(2);
+                               RangerPolicy.RangerPolicyResource 
dbPolicyResource = new RangerPolicy.RangerPolicyResource(dbName);
+                               elements.put(RANGER_TYPE_HIVE_DB, 
dbPolicyResource);
+
+                       } else {
+                               LOG.error("invalid qualifiedName for HIVE_DB, 
qualifiedName=" + qualifiedName);
+                       }
+               } else if (StringUtils.equals(entityTypeName, 
ENTITY_TYPE_HIVE_TABLE)) {
+                       if (components.size() > 3) {
+                               dbName = components.get(2);
+                               tableName = components.get(3);
+                               RangerPolicy.RangerPolicyResource 
dbPolicyResource = new RangerPolicy.RangerPolicyResource(dbName);
+                               elements.put(RANGER_TYPE_HIVE_DB, 
dbPolicyResource);
+                               RangerPolicy.RangerPolicyResource 
tablePolicyResource = new RangerPolicy.RangerPolicyResource(tableName);
+                               elements.put(RANGER_TYPE_HIVE_TABLE, 
tablePolicyResource);
+                       } else {
+                               LOG.error("invalid qualifiedName for 
HIVE_TABLE, qualifiedName=" + qualifiedName);
+                       }
+               } else if (StringUtils.equals(entityTypeName, 
ENTITY_TYPE_HIVE_COLUMN)) {
+                       if (components.size() > 4) {
+                               dbName = components.get(2);
+                               tableName = components.get(3);
+                               columnName = components.get(4);
+                               RangerPolicy.RangerPolicyResource 
dbPolicyResource = new RangerPolicy.RangerPolicyResource(dbName);
+                               elements.put(RANGER_TYPE_HIVE_DB, 
dbPolicyResource);
+                               RangerPolicy.RangerPolicyResource 
tablePolicyResource = new RangerPolicy.RangerPolicyResource(tableName);
+                               elements.put(RANGER_TYPE_HIVE_TABLE, 
tablePolicyResource);
+                               RangerPolicy.RangerPolicyResource 
columnPolicyResource = new RangerPolicy.RangerPolicyResource(columnName);
+                               elements.put(RANGER_TYPE_HIVE_COLUMN, 
columnPolicyResource);
+                       } else {
+                               LOG.error("invalid qualifiedName for 
HIVE_COLUMN, qualifiedName=" + qualifiedName);
+                       }
+
+               }
+
+               RangerServiceResource ret = new RangerServiceResource();
+
+               ret.setGuid(entity.getId()._getId());
+               ret.setServiceName(serviceName);
+               ret.setResourceElements(elements);
+
+               return ret;
+       }
+
+       public String getRangerServiceName(String clusterName) {
+               String ret = getRangerServiceName(COMPONENT_NAME, clusterName);
+
+               if (StringUtils.isBlank(ret)) {
+                       ret = clusterName + 
TagSyncConfig.TAGSYNC_DEFAULT_CLUSTERNAME_AND_COMPONENTNAME_SEPARATOR + 
COMPONENT_NAME;
+               }
+               return ret;
+       }
+
+       public final List<String> 
getQualifiedNameComponents(IReferenceableInstance entity) throws Exception {
+
+               String qualifiedNameAttributeName = 
getQualifiedNameAttributeName(entity.getTypeName());
+
+               String qualifiedName = getEntityAttribute(entity, 
qualifiedNameAttributeName, String.class);
+
+               List<String> ret = 
getQualifiedNameComponents(entity.getTypeName(), qualifiedName);
+
+               if (LOG.isDebugEnabled()) {
+                       LOG.debug("----- Entity-Id:" + entity.getId()._getId());
+                       LOG.debug("----- Entity-Type-Name:" + 
entity.getTypeName());
+                       LOG.debug("-----        Entity-Components -----");
+                       int i = 0;
+                       for (String value : ret) {
+                               LOG.debug("-----                Index:" + i++ + 
"       Value:" + value);
+                       }
+               }
+               return ret;
+       }
+
+       public final List<String> getQualifiedNameComponents(String 
entityTypeName, String qualifiedName) throws Exception {
+
+               String qualifiedNameAttributeName = 
getQualifiedNameAttributeName(entityTypeName);
+
+               if (StringUtils.isBlank(qualifiedName)) {
+                       throw new Exception("Could not get a valid value for " 
+ qualifiedNameAttributeName + " attribute from entity.");
+               }
+
+               if (LOG.isDebugEnabled()) {
+                       LOG.debug("Received .... " + qualifiedNameAttributeName 
+ "=" + qualifiedName + " for entity type " + entityTypeName);
+               }
+
+               String components[] = qualifiedName.split(clusterDelimiter);
+
+               if (components.length != 2) {
+                       throw new Exception("Qualified Name does not contain 
cluster-name, qualifiedName=" + qualifiedName);
+               }
+
+               if (LOG.isDebugEnabled()) {
+                       LOG.debug("name-hierarchy=" + components[0] + ", 
cluster-name=" + components[1]);
+               }
+
+               String nameHierarchy[] = 
components[0].split(qualifiedNameDelimiter);
+
+               List<String> ret = new ArrayList<String>();
+
+               ret.add(qualifiedName);
+               ret.add(components[1]);
+
+               ret.addAll(Arrays.asList(nameHierarchy));
+
+               return ret;
+       }
+
+       public String getQualifiedNameAttributeName(String entityTypeName) {
+               return  StringUtils.equals(entityTypeName, 
ENTITY_TYPE_HIVE_TABLE) ?
+                               ENTITY_ATTRIBUTE_QUALIFIED_NAME_FOR_HIVE_TABLE 
: ENTITY_ATTRIBUTE_QUALIFIED_NAME;
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/5b9c094f/tagsync/src/main/java/org/apache/ranger/tagsync/source/Atlas/AtlasResourceMapper.java
----------------------------------------------------------------------
diff --git 
a/tagsync/src/main/java/org/apache/ranger/tagsync/source/Atlas/AtlasResourceMapper.java
 
b/tagsync/src/main/java/org/apache/ranger/tagsync/source/Atlas/AtlasResourceMapper.java
new file mode 100644
index 0000000..fd94928
--- /dev/null
+++ 
b/tagsync/src/main/java/org/apache/ranger/tagsync/source/Atlas/AtlasResourceMapper.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ranger.tagsync.source.atlas;
+
+import org.apache.atlas.AtlasException;
+import org.apache.atlas.typesystem.IReferenceableInstance;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.ranger.plugin.model.RangerServiceResource;
+import org.apache.ranger.tagsync.process.TagSyncConfig;
+
+import java.util.Properties;
+import java.util.List;
+import java.util.Map;
+
+public abstract class AtlasResourceMapper {
+       private static final Log LOG = 
LogFactory.getLog(AtlasResourceMapper.class);
+
+       protected Properties properties;
+
+       public AtlasResourceMapper() {
+       }
+
+       public void initialize(Properties properties) {
+               this.properties = properties;
+       }
+
+       abstract public List<String> getSupportedEntityTypes();
+
+       abstract public RangerServiceResource buildResource(final 
IReferenceableInstance entity) throws Exception;
+
+
+       protected String getRangerServiceName(String componentName, String 
atlasInstanceName) {
+               String propName = 
TagSyncConfig.TAGSYNC_SERVICENAME_MAPPER_PROP_PREFIX + componentName
+                               + ".instance." + atlasInstanceName
+                               + 
TagSyncConfig.TAGSYNC_SERVICENAME_MAPPER_PROP_SUFFIX;
+
+               return properties.getProperty(propName);
+       }
+
+       static protected <T> T getEntityAttribute(IReferenceableInstance 
entity, String name, Class<T> type) {
+               T ret = null;
+
+               try {
+                       Map<String, Object> valueMap = entity.getValuesMap();
+                       ret = getAttribute(valueMap, name, type);
+               } catch (AtlasException exception) {
+                       LOG.error("Cannot get map of values for entity: " + 
entity.getId()._getId(), exception);
+               }
+
+               return ret;
+       }
+
+       static protected <T> T getAttribute(Map<String, Object> map, String 
name, Class<T> type) {
+               return type.cast(map.get(name));
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/5b9c094f/tagsync/src/main/java/org/apache/ranger/tagsync/source/Atlas/AtlasResourceMapperUtil.java
----------------------------------------------------------------------
diff --git 
a/tagsync/src/main/java/org/apache/ranger/tagsync/source/Atlas/AtlasResourceMapperUtil.java
 
b/tagsync/src/main/java/org/apache/ranger/tagsync/source/Atlas/AtlasResourceMapperUtil.java
new file mode 100644
index 0000000..f05d814
--- /dev/null
+++ 
b/tagsync/src/main/java/org/apache/ranger/tagsync/source/Atlas/AtlasResourceMapperUtil.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for th
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ranger.tagsync.source.atlas;
+
+import org.apache.atlas.typesystem.IReferenceableInstance;
+import org.apache.commons.lang.StringUtils;
+import org.apache.ranger.plugin.model.RangerServiceResource;
+
+import java.util.*;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.ranger.tagsync.process.TagSyncConfig;
+
+public class AtlasResourceMapperUtil {
+       private static final Log LOG = 
LogFactory.getLog(AtlasResourceMapperUtil.class);
+
+       private static Map<String, AtlasResourceMapper> atlasResourceMappers = 
new HashMap<String, AtlasResourceMapper>();
+
+       public static boolean isEntityTypeHandled(String entityTypeName) {
+               if (LOG.isDebugEnabled()) {
+                       LOG.debug("==> isEntityTypeHandled(entityTypeName=" + 
entityTypeName + ")");
+               }
+
+               AtlasResourceMapper mapper = 
atlasResourceMappers.get(entityTypeName);
+
+               boolean ret = mapper != null;
+
+               if (LOG.isDebugEnabled()) {
+                       LOG.debug("<== isEntityTypeHandled(entityTypeName=" + 
entityTypeName + ") : " + ret);
+               }
+
+               return ret;
+       }
+
+       public static RangerServiceResource 
getRangerServiceResource(IReferenceableInstance atlasEntity) {
+               if (LOG.isDebugEnabled()) {
+                       LOG.debug("==> getRangerServiceResource(" + 
atlasEntity.getId()._getId() +")");
+               }
+
+               RangerServiceResource resource = null;
+
+               AtlasResourceMapper mapper = 
atlasResourceMappers.get(atlasEntity.getTypeName());
+
+               if (mapper != null) {
+                       try {
+                               resource = mapper.buildResource(atlasEntity);
+                       } catch (Exception exception) {
+                               LOG.error("Could not get serviceResource for 
atlas entity:" + atlasEntity.getId()._getId() + ": ", exception);
+                       }
+               }
+
+               if (LOG.isDebugEnabled()) {
+                       LOG.debug("<== getRangerServiceResource(" + 
atlasEntity.getId()._getId() +"): resource=" + resource);
+               }
+
+               return resource;
+       }
+
+       static public boolean initializeAtlasResourceMappers(Properties 
properties) {
+               final String MAPPER_NAME_DELIMIER = ",";
+
+               String customMapperNames = 
TagSyncConfig.getCustomAtlasResourceMappers(properties);
+
+               if (LOG.isDebugEnabled()) {
+                       LOG.debug("==> 
initializeAtlasResourceMappers.initializeAtlasResourceMappers(" + 
customMapperNames + ")");
+               }
+               boolean ret = true;
+
+               String allResourceMappers = 
"org.apache.ranger.tagsync.source.atlas.AtlasHiveResourceMapper";
+
+               if (StringUtils.isNotBlank(customMapperNames)) {
+                       allResourceMappers = allResourceMappers + 
MAPPER_NAME_DELIMIER + customMapperNames;
+               }
+
+               String[] mapperNamesArray = 
allResourceMappers.split(MAPPER_NAME_DELIMIER);
+
+               List<String> mapperNames = Arrays.asList(mapperNamesArray);
+
+               for (String mapperName : mapperNames) {
+                       mapperName = mapperName.trim();
+                       try {
+                               Class clazz = Class.forName(mapperName);
+                               AtlasResourceMapper resourceMapper = 
(AtlasResourceMapper) clazz.newInstance();
+
+                               resourceMapper.initialize(properties);
+
+                               for (String entityTypeName : 
resourceMapper.getSupportedEntityTypes()) {
+                                       add(entityTypeName, resourceMapper);
+                               }
+
+                       } catch (Exception exception) {
+                               LOG.error("Failed to create 
AtlasResourceMapper:" + mapperName + ": ", exception);
+                               ret = false;
+                       }
+               }
+
+               if (LOG.isDebugEnabled()) {
+                       LOG.debug("<== 
initializeAtlasResourceMappers.initializeAtlasResourceMappers(" + 
allResourceMappers + "): " + ret);
+               }
+               return ret;
+       }
+
+       private static void add(String entityType, AtlasResourceMapper mapper) {
+               atlasResourceMappers.put(entityType, mapper);
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/5b9c094f/tagsync/src/main/java/org/apache/ranger/tagsync/source/Atlas/AtlasTagSource.java
----------------------------------------------------------------------
diff --git 
a/tagsync/src/main/java/org/apache/ranger/tagsync/source/Atlas/AtlasTagSource.java
 
b/tagsync/src/main/java/org/apache/ranger/tagsync/source/Atlas/AtlasTagSource.java
new file mode 100644
index 0000000..7694b37
--- /dev/null
+++ 
b/tagsync/src/main/java/org/apache/ranger/tagsync/source/Atlas/AtlasTagSource.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ranger.tagsync.source.atlas;
+
+
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import com.google.inject.Provider;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.atlas.notification.NotificationConsumer;
+import org.apache.atlas.notification.NotificationInterface;
+import org.apache.atlas.notification.NotificationModule;
+import org.apache.atlas.notification.entity.EntityNotification;
+
+import org.apache.ranger.tagsync.model.AbstractTagSource;
+import org.apache.ranger.plugin.util.ServiceTags;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.*;
+
+public class AtlasTagSource extends AbstractTagSource {
+       private static final Log LOG = LogFactory.getLog(AtlasTagSource.class);
+
+       public static final String TAGSYNC_ATLAS_PROPERTIES_FILE_NAME = 
"application.properties";
+
+       public static final String TAGSYNC_ATLAS_KAFKA_ENDPOINTS = 
"atlas.kafka.bootstrap.servers";
+       public static final String TAGSYNC_ATLAS_ZOOKEEPER_ENDPOINT = 
"atlas.kafka.zookeeper.connect";
+       public static final String TAGSYNC_ATLAS_CONSUMER_GROUP = 
"atlas.kafka.entities.group.id";
+
+       private ConsumerRunnable consumerTask;
+
+       @Override
+       public boolean initialize(Properties properties) {
+               if (LOG.isDebugEnabled()) {
+                       LOG.debug("==> AtlasTagSource.initialize()");
+               }
+
+               Properties atlasProperties = new Properties();
+
+               boolean ret = 
AtlasResourceMapperUtil.initializeAtlasResourceMappers(properties);
+
+               if (ret) {
+
+                       InputStream inputStream = 
getClass().getClassLoader().getResourceAsStream(TAGSYNC_ATLAS_PROPERTIES_FILE_NAME);
+
+                       if (inputStream != null) {
+                               try {
+                                       atlasProperties.load(inputStream);
+                               } catch (Exception exception) {
+                                       ret = false;
+                                       LOG.error("Cannot load Atlas 
application properties file, file-name:" + TAGSYNC_ATLAS_PROPERTIES_FILE_NAME, 
exception);
+                               } finally {
+                                       try {
+                                               inputStream.close();
+                                       } catch (IOException ioException) {
+                                               LOG.error("Cannot close Atlas 
application properties file, file-name:\" + 
TAGSYNC_ATLAS_PROPERTIES_FILE_NAME", ioException);
+                                       }
+                               }
+                       } else {
+                               ret = false;
+                               LOG.error("Cannot find Atlas application 
properties file");
+                       }
+               }
+
+               if (ret) {
+                       if 
(StringUtils.isBlank(atlasProperties.getProperty(TAGSYNC_ATLAS_KAFKA_ENDPOINTS)))
 {
+                               ret = false;
+                               LOG.error("Value of property '" + 
TAGSYNC_ATLAS_KAFKA_ENDPOINTS + "' is not specified!");
+                       }
+                       if 
(StringUtils.isBlank(atlasProperties.getProperty(TAGSYNC_ATLAS_ZOOKEEPER_ENDPOINT)))
 {
+                               ret = false;
+                               LOG.error("Value of property '" + 
TAGSYNC_ATLAS_ZOOKEEPER_ENDPOINT + "' is not specified!");
+                       }
+                       if 
(StringUtils.isBlank(atlasProperties.getProperty(TAGSYNC_ATLAS_CONSUMER_GROUP)))
 {
+                               ret = false;
+                               LOG.error("Value of property '" + 
TAGSYNC_ATLAS_CONSUMER_GROUP + "' is not specified!");
+                       }
+               }
+
+               if (ret) {
+                       NotificationModule notificationModule = new 
NotificationModule();
+
+                       Injector injector = 
Guice.createInjector(notificationModule);
+
+                       Provider<NotificationInterface> consumerProvider = 
injector.getProvider(NotificationInterface.class);
+                       NotificationInterface notification = 
consumerProvider.get();
+                       List<NotificationConsumer<EntityNotification>> 
iterators = 
notification.createConsumers(NotificationInterface.NotificationType.ENTITIES, 
1);
+
+                       consumerTask = new ConsumerRunnable(iterators.get(0));
+
+               }
+
+               if (LOG.isDebugEnabled()) {
+                       LOG.debug("<== AtlasTagSource.initialize(), result=" + 
ret);
+               }
+               return ret;
+       }
+
+       @Override
+       public boolean start() {
+               if (LOG.isDebugEnabled()) {
+                       LOG.debug("==> AtlasTagSource.start()");
+               }
+               Thread consumerThread = null;
+               if (consumerTask == null) {
+                       LOG.error("No consumerTask!!!");
+               } else {
+                       consumerThread = new Thread(consumerTask);
+                       consumerThread.setDaemon(true);
+                       consumerThread.start();
+               }
+               if (LOG.isDebugEnabled()) {
+                       LOG.debug("<== AtlasTagSource.start()");
+               }
+               return consumerThread != null;
+       }
+
+       @Override
+       public boolean isChanged() {
+               return true;
+       }
+
+       private static void printEntityNotification(EntityNotification 
notification) {
+               if (LOG.isDebugEnabled()) {
+                       LOG.debug("Notification-Type: " + 
notification.getOperationType());
+                       AtlasEntityWithTraits entityWithTraits = new 
AtlasEntityWithTraits(notification.getEntity(), notification.getAllTraits());
+                       LOG.debug(entityWithTraits);
+               }
+       }
+
+       private class ConsumerRunnable implements Runnable {
+
+               private final NotificationConsumer<EntityNotification> consumer;
+
+               private 
ConsumerRunnable(NotificationConsumer<EntityNotification> consumer) {
+                       this.consumer = consumer;
+               }
+
+               private boolean hasNext() {
+                       boolean ret = false;
+                       try {
+                               ret = consumer.hasNext();
+                       } catch (Exception exception) {
+                               LOG.error("EntityNotification consumer threw 
exception, IGNORING...:", exception);
+                       }
+                       return ret;
+               }
+
+               @Override
+               public void run() {
+                       if (LOG.isDebugEnabled()) {
+                               LOG.debug("==> ConsumerRunnable.run()");
+                       }
+                       while (!shutdown) {
+                               if (hasNext()) {
+                                       EntityNotification notification = 
consumer.next();
+                                       if (notification != null) {
+                                               
printEntityNotification(notification);
+
+                                               ServiceTags serviceTags = 
AtlasNotificationMapper.processEntityNotification(notification);
+                                               if (serviceTags == null) {
+                                                       if 
(LOG.isDebugEnabled()) {
+                                                               LOG.debug("Did 
not create ServiceTags structure for notification type:" + 
notification.getOperationType());
+                                                       }
+                                               } else {
+                                                       updateSink(serviceTags);
+                                               }
+                                       }
+                               }
+                       }
+                       LOG.info("Shutting down the Tag-Atlas-source thread");
+               }
+       }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/5b9c094f/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasEntityWithTraits.java
----------------------------------------------------------------------
diff --git 
a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasEntityWithTraits.java
 
b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasEntityWithTraits.java
new file mode 100644
index 0000000..e6a4b36
--- /dev/null
+++ 
b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasEntityWithTraits.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ranger.tagsync.source.atlas;
+
+import org.apache.atlas.AtlasException;
+import org.apache.atlas.typesystem.IReferenceableInstance;
+import org.apache.atlas.typesystem.IStruct;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+public class AtlasEntityWithTraits {
+
+       private final IReferenceableInstance entity;
+       private final List<IStruct> traits;
+
+       public AtlasEntityWithTraits(IReferenceableInstance entity, 
List<IStruct> traits) {
+               this.entity = entity;
+               this.traits = traits;
+       }
+
+       public IReferenceableInstance getEntity() {
+               return entity;
+       }
+
+       public List<IStruct> getAllTraits() {
+               return traits == null ? new LinkedList<IStruct>() : traits;
+       }
+
+       @Override
+       public String toString( ) {
+               StringBuilder sb = new StringBuilder();
+
+               toString(sb);
+
+               return sb.toString();
+       }
+
+       public void toString(StringBuilder sb) {
+
+               sb.append("AtlasEntityWithTraits={ ");
+
+               sb.append("Entity-Id: " + entity.getId()._getId()).append(", ")
+                               .append("Entity-Type: " + 
entity.getTypeName()).append(", ");
+
+               sb.append("Entity-Values={ ");
+               try {
+                       for (Map.Entry<String, Object> entry : 
entity.getValuesMap().entrySet()) {
+                               sb.append("{").append(entry.getKey()).append(", 
").append(entry.getValue()).append("}, ");
+                       }
+               } catch (AtlasException exception) {
+                               // Ignore
+               }
+               sb.append(" }");
+
+               sb.append(", Entity-Traits={ ");
+               for (IStruct trait : traits) {
+                       try {
+                               
sb.append("{traitType=").append(trait.getTypeName()).append(", ");
+                               Map<String, Object> traitValues = 
trait.getValuesMap();
+                               sb.append("{");
+                               for (Map.Entry<String, Object> valueEntry : 
traitValues.entrySet()) {
+                                       
sb.append("{").append(valueEntry.getKey()).append(", 
").append(valueEntry.getValue()).append("}");
+                               }
+                               sb.append("}");
+
+                               sb.append(" }");
+                       } catch (AtlasException exception) {
+                               // Ignore
+                       }
+               }
+               sb.append(" }");
+
+               sb.append(" }");
+
+       }
+
+}

Reply via email to