Repository: incubator-ranger
Updated Branches:
  refs/heads/master fec84603f -> 5b9c094ff


http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/5b9c094f/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasNotificationMapper.java
----------------------------------------------------------------------
diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasNotificationMapper.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasNotificationMapper.java
index 666c2c8..803a8a9 100644
--- a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasNotificationMapper.java
+++ b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasNotificationMapper.java
@@ -23,76 +23,57 @@ import org.apache.atlas.AtlasException;
 import org.apache.atlas.notification.entity.EntityNotification;
 import org.apache.atlas.typesystem.IReferenceableInstance;
 import org.apache.atlas.typesystem.IStruct;
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.collections.MapUtils;
-import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.ranger.plugin.model.RangerPolicy;
 import org.apache.ranger.plugin.model.RangerServiceResource;
 import org.apache.ranger.plugin.model.RangerTag;
 import org.apache.ranger.plugin.model.RangerTagDef;
 import org.apache.ranger.plugin.util.ServiceTags;
-import org.apache.ranger.tagsync.process.TagSyncConfig;
 
 import java.util.*;
 
 public class AtlasNotificationMapper {
        private static final Log LOG = 
LogFactory.getLog(AtlasNotificationMapper.class);
 
-       public static final String ENTITY_TYPE_HIVE_DB = "hive_db";
-       public static final String ENTITY_TYPE_HIVE_TABLE = "hive_table";
-       public static final String ENTITY_TYPE_HIVE_COLUMN = "hive_column";
-
-       public static final String RANGER_TYPE_HIVE_DB = "database";
-       public static final String RANGER_TYPE_HIVE_TABLE = "table";
-       public static final String RANGER_TYPE_HIVE_COLUMN = "column";
-
-       public static final String ENTITY_ATTRIBUTE_QUALIFIED_NAME = 
"qualifiedName";
-       public static final String 
ENTITY_ATTRIBUTE_QUALIFIED_NAME_FOR_HIVE_TABLE = "name";
-       public static final String QUALIFIED_NAME_FORMAT_DELIMITER_STRING = 
"\\.";
-       public static final String 
QUALIFIED_NAME_FORMAT_CLUSTER_DELIMITER_STRING = "@";
-
-       private static Properties properties = null;
-
-       public static ServiceTags processEntityNotification(EntityNotification 
entityNotification, Properties props) {
+       public static ServiceTags processEntityNotification(EntityNotification 
entityNotification) {
 
                ServiceTags ret = null;
-               properties = props;
 
-               try {
-                       IReferenceableInstance entity = 
entityNotification.getEntity();
-
-                       if (isEntityMappable(entity)) {
-                               ret = createServiceTags(entityNotification);
-                       } else {
-                               if(LOG.isDebugEnabled()) {
-                                       LOG.debug("Ranger not interested in 
Entity Notification for entity-type " + 
entityNotification.getEntity().getTypeName());
+               if (isNotificationHandled(entityNotification)) {
+                       try {
+                               IReferenceableInstance entity = 
entityNotification.getEntity();
+
+                               if 
(AtlasResourceMapperUtil.isEntityTypeHandled(entity.getTypeName())) {
+                                       AtlasEntityWithTraits entityWithTraits 
= new AtlasEntityWithTraits(entityNotification.getEntity(), 
entityNotification.getAllTraits());
+                                       ret = 
buildServiceTags(entityWithTraits, 1L, 1L, null);
+                               } else {
+                                       if (LOG.isDebugEnabled()) {
+                                               LOG.debug("Ranger not 
interested in Entity Notification for entity-type " + 
entityNotification.getEntity().getTypeName());
+                                       }
                                }
+                       } catch (Exception exception) {
+                               LOG.error("createServiceTags() failed!! ", 
exception);
                        }
-               } catch (Exception exception) {
-                       LOG.error("createServiceTags() failed!! ", exception);
                }
                return ret;
        }
 
-       static private boolean isEntityMappable(IReferenceableInstance entity) {
-               boolean ret = false;
-
-               String entityTypeName = entity.getTypeName();
+       public static Map<String, ServiceTags> 
processEntitiesWithTraits(List<AtlasEntityWithTraits> atlasEntities) {
+               Map<String, ServiceTags> ret = null;
 
-               if (StringUtils.isNotBlank(entityTypeName)) {
-                       if (StringUtils.equals(entityTypeName, 
ENTITY_TYPE_HIVE_DB) ||
-                                       StringUtils.equals(entityTypeName, 
ENTITY_TYPE_HIVE_TABLE) ||
-                                       StringUtils.equals(entityTypeName, 
ENTITY_TYPE_HIVE_COLUMN)) {
-                               ret = true;
-                       }
+               try {
+                       ret = buildServiceTags(atlasEntities);
+               } catch (Exception exception) {
+                       LOG.error("Failed to build serviceTags", exception);
                }
+
                return ret;
        }
 
-       static private ServiceTags createServiceTags(EntityNotification 
entityNotification) throws Exception {
-
-               ServiceTags ret = null;
+       static private boolean isNotificationHandled(EntityNotification 
entityNotification) {
+               boolean ret = false;
 
                EntityNotification.OperationType opType = 
entityNotification.getOperationType();
 
@@ -101,17 +82,10 @@ public class AtlasNotificationMapper {
                                LOG.debug("ENTITY_CREATE notification is not 
handled, as Ranger will get necessary information from any subsequent 
TRAIT_ADDED notification");
                                break;
                        }
-                       case ENTITY_UPDATE: {
-                               ret = getServiceTags(entityNotification);
-                               if (MapUtils.isEmpty(ret.getTags())) {
-                                       LOG.debug("No traits associated with 
this entity update notification. Ignoring it altogether");
-                                       ret = null;
-                               }
-                               break;
-                       }
+                       case ENTITY_UPDATE:
                        case TRAIT_ADD:
                        case TRAIT_DELETE: {
-                               ret = getServiceTags(entityNotification);
+                               ret = true;
                                break;
                        }
                        default:
@@ -121,130 +95,116 @@ public class AtlasNotificationMapper {
                return ret;
        }
 
-       static private ServiceTags getServiceTags(EntityNotification 
entityNotification) throws Exception {
-               ServiceTags ret = null;
+       static private Map<String, ServiceTags> 
buildServiceTags(List<AtlasEntityWithTraits> entitiesWithTraits) throws 
Exception {
 
-               IReferenceableInstance entity = entityNotification.getEntity();
+               Map<String, ServiceTags> ret = new HashMap<String, 
ServiceTags>();
 
-               List<RangerServiceResource> serviceResources = new 
ArrayList<RangerServiceResource>();
+               long serviceResourceIndex = 1L;
+               long tagIndex = 1L;
 
-               RangerServiceResource serviceResource = 
getServiceResource(entity);
-               serviceResources.add(serviceResource);
+               for (AtlasEntityWithTraits element : entitiesWithTraits) {
 
-               Map<Long, RangerTag> tags = getTags(entityNotification);
+                       ServiceTags serviceTags = buildServiceTags(element, 
serviceResourceIndex, tagIndex, ret);
 
-               Map<Long, RangerTagDef> tagDefs = getTagDefs(tags);
+                       serviceResourceIndex++;
 
-               Map<Long, List<Long>> resourceIdToTagIds = null;
+                       tagIndex += CollectionUtils.size(serviceTags.getTags());
 
-               resourceIdToTagIds = new HashMap<Long, List<Long>>();
-               List<Long> tagList = new ArrayList<Long>();
+               }
 
+               // Remove duplicate tag definitions
+               for (Map.Entry<String, ServiceTags> serviceTagsMapEntry : 
ret.entrySet()){
 
-               if (MapUtils.isNotEmpty(tags)) {
-                       resourceIdToTagIds = new HashMap<Long, List<Long>>();
+                       Map<Long, RangerTagDef> allTagDefs = 
serviceTagsMapEntry.getValue().getTagDefinitions();
 
-                       for (Map.Entry<Long, RangerTag> entry : 
tags.entrySet()) {
-                               tagList.add(entry.getKey());
+                       Map<String, String> tagTypeIndex = new HashMap<String, 
String>();
+                       Map<Long, RangerTagDef> uniqueTagDefs = new 
HashMap<Long, RangerTagDef>();
+
+                       for (Map.Entry<Long, RangerTagDef> entry : 
allTagDefs.entrySet()) {
+                               String tagTypeName = entry.getValue().getName();
+
+                               if (tagTypeIndex.get(tagTypeName) == null) {
+                                       tagTypeIndex.put(tagTypeName, 
tagTypeName);
+                                       uniqueTagDefs.put(entry.getKey(), 
entry.getValue());
+                               }
                        }
+                       
serviceTagsMapEntry.getValue().setTagDefinitions(uniqueTagDefs);
                }
 
-               resourceIdToTagIds.put(1L, tagList);
+               return ret;
+       }
+
+       static private ServiceTags buildServiceTags(AtlasEntityWithTraits 
entityWithTraits, long index, long tagIndex, Map<String, ServiceTags> 
serviceTagsMap) throws Exception {
 
+               ServiceTags ret = null;
 
-               ret = new ServiceTags();
+               IReferenceableInstance entity = entityWithTraits.getEntity();
 
-               ret.setOp(ServiceTags.OP_ADD_OR_UPDATE);
-               ret.setTagModel(ServiceTags.TAGMODEL_RESOURCE_PRIVATE);
-               ret.setServiceName(serviceResource.getServiceName());
-               ret.setServiceResources(serviceResources);
-               ret.setTagDefinitions(tagDefs);
-               ret.setTags(tags);
-               ret.setResourceToTagIds(resourceIdToTagIds);
+               RangerServiceResource serviceResource = 
AtlasResourceMapperUtil.getRangerServiceResource(entity);
 
-               return ret;
-       }
+               if (serviceResource != null) {
 
+                       serviceResource.setId(index);
 
-       static private RangerServiceResource 
getServiceResource(IReferenceableInstance entity) throws Exception {
+                       String serviceName = serviceResource.getServiceName();
 
-               RangerServiceResource ret = null;
+                       Map<Long, RangerTag> tags = getTags(entityWithTraits, 
tagIndex);
 
-               Map<String, RangerPolicy.RangerPolicyResource> elements = null;
-               String serviceName = null;
+                       Map<Long, RangerTagDef> tagDefs = getTagDefs(tags);
 
+                       Map<Long, List<Long>> resourceIdToTagIds = null;
 
-               elements = new HashMap<String, 
RangerPolicy.RangerPolicyResource>();
+                       resourceIdToTagIds = new HashMap<Long, List<Long>>();
+                       List<Long> tagList = new ArrayList<Long>();
+
+                       if (MapUtils.isNotEmpty(tags)) {
+                               resourceIdToTagIds = new HashMap<Long, 
List<Long>>();
 
-               List<String> components = getQualifiedNameComponents(entity);
-               // components should contain qualifiedName, clusterName, 
dbName, tableName, columnName in that order
+                               for (Map.Entry<Long, RangerTag> entry : 
tags.entrySet()) {
+                                       tagList.add(entry.getKey());
+                               }
+                       }
 
-               String entityTypeName = entity.getTypeName();
+                       resourceIdToTagIds.put(index, tagList);
 
-               String qualifiedName = components.get(0);
+                       ret = createOrGetServiceTags(serviceTagsMap, 
serviceName);
 
-               String clusterName, dbName, tableName, columnName;
+                       ret.getServiceResources().add(serviceResource);
+                       ret.getTagDefinitions().putAll(tagDefs);
+                       ret.getTags().putAll(tags);
+                       ret.getResourceToTagIds().putAll(resourceIdToTagIds);
 
-               if (components.size() > 1) {
-                       clusterName = components.get(1);
-                       serviceName = getServiceName(clusterName, 
entityTypeName);
+               } else {
+                       LOG.error("AtlasResourceMapper not found for 
entity-type:" + entity.getTypeName());
                }
 
-               if (StringUtils.equals(entityTypeName, ENTITY_TYPE_HIVE_DB)) {
-                       if (components.size() > 2) {
-                               dbName = components.get(2);
-                               RangerPolicy.RangerPolicyResource 
dbPolicyResource = new RangerPolicy.RangerPolicyResource(dbName);
-                               elements.put(RANGER_TYPE_HIVE_DB, 
dbPolicyResource);
+               return ret;
+       }
+
+       static private ServiceTags createOrGetServiceTags(Map<String, 
ServiceTags> serviceTagsMap, String serviceName) {
+               ServiceTags ret = serviceTagsMap == null ? null : 
serviceTagsMap.get(serviceName);
 
-                       } else {
-                               LOG.error("invalid qualifiedName for HIVE_DB, 
qualifiedName=" + qualifiedName);
-                       }
-               } else if (StringUtils.equals(entityTypeName, 
ENTITY_TYPE_HIVE_TABLE)) {
-                       if (components.size() > 3) {
-                               dbName = components.get(2);
-                               tableName = components.get(3);
-                               RangerPolicy.RangerPolicyResource 
dbPolicyResource = new RangerPolicy.RangerPolicyResource(dbName);
-                               elements.put(RANGER_TYPE_HIVE_DB, 
dbPolicyResource);
-                               RangerPolicy.RangerPolicyResource 
tablePolicyResource = new RangerPolicy.RangerPolicyResource(tableName);
-                               elements.put(RANGER_TYPE_HIVE_TABLE, 
tablePolicyResource);
-                       } else {
-                               LOG.error("invalid qualifiedName for 
HIVE_TABLE, qualifiedName=" + qualifiedName);
-                       }
-               } else if (StringUtils.equals(entityTypeName, 
ENTITY_TYPE_HIVE_COLUMN)) {
-                       if (components.size() > 4) {
-                               dbName = components.get(2);
-                               tableName = components.get(3);
-                               columnName = components.get(4);
-                               RangerPolicy.RangerPolicyResource 
dbPolicyResource = new RangerPolicy.RangerPolicyResource(dbName);
-                               elements.put(RANGER_TYPE_HIVE_DB, 
dbPolicyResource);
-                               RangerPolicy.RangerPolicyResource 
tablePolicyResource = new RangerPolicy.RangerPolicyResource(tableName);
-                               elements.put(RANGER_TYPE_HIVE_TABLE, 
tablePolicyResource);
-                               RangerPolicy.RangerPolicyResource 
columnPolicyResource = new RangerPolicy.RangerPolicyResource(columnName);
-                               elements.put(RANGER_TYPE_HIVE_COLUMN, 
columnPolicyResource);
-                       } else {
-                               LOG.error("invalid qualifiedName for 
HIVE_COLUMN, qualifiedName=" + qualifiedName);
+               if (ret == null) {
+                       ret = new ServiceTags();
+
+                       if (serviceTagsMap != null) {
+                               serviceTagsMap.put(serviceName, ret);
                        }
 
+                       ret.setOp(ServiceTags.OP_ADD_OR_UPDATE);
+                       ret.setTagModel(ServiceTags.TAGMODEL_RESOURCE_PRIVATE);
+                       ret.setServiceName(serviceName);
                }
 
-
-               ret = new RangerServiceResource();
-               ret.setGuid(entity.getId()._getId());
-               ret.setId(1L);
-               ret.setServiceName(serviceName);
-               ret.setResourceElements(elements);
-
                return ret;
        }
 
-       static private Map<Long, RangerTag> getTags(EntityNotification 
entityNotification) {
+       static private Map<Long, RangerTag> getTags(AtlasEntityWithTraits 
entityWithTraits, long index) {
                Map<Long, RangerTag> ret = null;
 
                ret = new HashMap<Long, RangerTag>();
 
-               long index = 1;
-
-               List<IStruct> traits = entityNotification.getAllTraits();
+               List<IStruct> traits = entityWithTraits.getAllTraits();
 
                for (IStruct trait : traits) {
 
@@ -288,113 +248,18 @@ public class AtlasNotificationMapper {
 
                if (MapUtils.isNotEmpty(tags)) {
                        ret = new HashMap<Long, RangerTagDef>();
+
                        for (Map.Entry<Long, RangerTag> entry : 
tags.entrySet()) {
+
                                RangerTagDef tagDef = new RangerTagDef();
                                tagDef.setName(entry.getValue().getType());
                                tagDef.setId(entry.getKey());
                                ret.put(entry.getKey(), tagDef);
-                       }
-               }
-
-               return ret;
-       }
-
-       static private String getQualifiedNameAttributeName(String 
entityTypeName) {
-               String ret = StringUtils.equals(entityTypeName, 
ENTITY_TYPE_HIVE_TABLE) ?
-                               ENTITY_ATTRIBUTE_QUALIFIED_NAME_FOR_HIVE_TABLE 
: ENTITY_ATTRIBUTE_QUALIFIED_NAME;
-
-               return ret;
-       }
-
-       static private List<String> 
getQualifiedNameComponents(IReferenceableInstance entity) throws Exception {
 
-               List<String> ret = null;
-
-               String qualifiedNameAttributeName = 
getQualifiedNameAttributeName(entity.getTypeName());
-
-               String qualifiedName = getEntityAttribute(entity, 
qualifiedNameAttributeName, String.class);
-
-               ret = getQualifiedNameComponents(entity.getTypeName(), 
qualifiedName);
-
-               if (LOG.isDebugEnabled()) {
-                       LOG.debug("----- Entity-Id:" + entity.getId()._getId());
-                       LOG.debug("----- Entity-Type-Name:" + 
entity.getTypeName());
-                       LOG.debug("-----        Entity-Components -----");
-                       int i = 0;
-                       for (String value : ret) {
-                               LOG.debug("-----                Index:" + i++ + 
"       Value:" + value);
                        }
                }
-               return ret;
-       }
-
-       static public List<String> getQualifiedNameComponents(String 
entityTypeName, String qualifiedName) throws Exception {
-
-               List<String> ret = null;
-
-               String qualifiedNameAttributeName = 
getQualifiedNameAttributeName(entityTypeName);
-
-               if (StringUtils.isBlank(qualifiedName)) {
-                       throw new Exception("Could not get a valid value for " 
+ qualifiedNameAttributeName + " attribute from entity notification.");
-               }
-
-               if (LOG.isDebugEnabled()) {
-                       LOG.debug("Received .... " + qualifiedNameAttributeName 
+ "=" + qualifiedName + " for entity type " + entityTypeName);
-               }
-
-               String components[] = 
qualifiedName.split(QUALIFIED_NAME_FORMAT_CLUSTER_DELIMITER_STRING);
-
-               if (components == null || components.length != 2) {
-                       throw new Exception("Qualified Name does not contain 
cluster-name, qualifiedName=" + qualifiedName);
-               }
-
-               if (LOG.isDebugEnabled()) {
-                       LOG.debug("name-hierarchy=" + components[0] + ", 
cluster-name=" + components[1]);
-               }
-
-               String nameHierarchy[] = 
components[0].split(QUALIFIED_NAME_FORMAT_DELIMITER_STRING);
-
-               int hierarchyLevels = nameHierarchy.length;
-
-               ret = new ArrayList<String>();
-
-               ret.add(qualifiedName);
-               ret.add(components[1]);
-
-               for (int i = 0; i < hierarchyLevels; i++) {
-                       ret.add(nameHierarchy[i]);
-               }
 
                return ret;
        }
 
-       static private String getServiceName(String clusterName, String 
entityTypeName) {
-               // Parse entityTypeName to get the Apache-component Name
-               // Assumption: entityTypeName is 
<componentName>_<component_specific_type_name>
-               // such as hive_table, hadoop_path, hbase_queue, etc.
-               String apacheComponents[] = entityTypeName.split("_");
-               String apacheComponent = null;
-               if (apacheComponents.length > 0) {
-                       apacheComponent = apacheComponents[0].toLowerCase();
-               }
-
-               return TagSyncConfig.getServiceName(apacheComponent, 
clusterName, properties);
-       }
-
-       static private <T> T getEntityAttribute(IReferenceableInstance entity, 
String name, Class<T> type) {
-               T ret = null;
-
-               try {
-                       Map<String, Object> valueMap = entity.getValuesMap();
-                       ret = getAttribute(valueMap, name, type);
-               } catch (AtlasException exception) {
-                       LOG.error("Cannot get map of values for entity: " + 
entity.getId()._getId(), exception);
-               }
-
-               return ret;
-       }
-       static private <T> T getAttribute(Map<String, Object> map, String name, 
Class<T> type) {
-               return type.cast(map.get(name));
-       }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/5b9c094f/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/TagAtlasSource.java
----------------------------------------------------------------------
diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/TagAtlasSource.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/TagAtlasSource.java
deleted file mode 100644
index 42ba7c7..0000000
--- a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/TagAtlasSource.java
+++ /dev/null
@@ -1,241 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.ranger.tagsync.source.atlas;
-
-import com.google.gson.Gson;
-
-import com.google.inject.Guice;
-import com.google.inject.Injector;
-import com.google.inject.Provider;
-
-import org.apache.atlas.AtlasException;
-import org.apache.commons.collections.MapUtils;
-import org.apache.commons.lang.StringUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import org.apache.atlas.notification.NotificationConsumer;
-import org.apache.atlas.notification.NotificationInterface;
-import org.apache.atlas.notification.NotificationModule;
-import org.apache.atlas.notification.entity.EntityNotification;
-import org.apache.atlas.typesystem.IReferenceableInstance;
-import org.apache.atlas.typesystem.IStruct;
-
-import org.apache.ranger.tagsync.model.TagSink;
-import org.apache.ranger.tagsync.model.TagSource;
-import org.apache.ranger.plugin.util.ServiceTags;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.*;
-
-public class TagAtlasSource implements TagSource {
-       private static final Log LOG = LogFactory.getLog(TagAtlasSource.class);
-
-       public static final String TAGSYNC_ATLAS_PROPERTIES_FILE_NAME = 
"application.properties";
-
-       public static final String TAGSYNC_ATLAS_KAFKA_ENDPOINTS = 
"atlas.kafka.bootstrap.servers";
-       public static final String TAGSYNC_ATLAS_ZOOKEEPER_ENDPOINT = 
"atlas.kafka.zookeeper.connect";
-       public static final String TAGSYNC_ATLAS_CONSUMER_GROUP = 
"atlas.kafka.entities.group.id";
-
-       private TagSink tagSink;
-       private Properties properties;
-       private ConsumerRunnable consumerTask;
-
-       @Override
-       public boolean initialize(Properties properties) {
-               if (LOG.isDebugEnabled()) {
-                       LOG.debug("==> TagAtlasSource.initialize()");
-               }
-
-               boolean ret = true;
-
-               if (properties == null || MapUtils.isEmpty(properties)) {
-                       LOG.error("No properties specified for TagFileSource 
initialization");
-                       this.properties = new Properties();
-               } else {
-                       this.properties = properties;
-               }
-
-               Properties atlasProperties = new Properties();
-
-               InputStream inputStream = 
getClass().getClassLoader().getResourceAsStream(TAGSYNC_ATLAS_PROPERTIES_FILE_NAME);
-
-               if (inputStream != null) {
-                       try {
-                               atlasProperties.load(inputStream);
-                       } catch (Exception exception) {
-                               ret = false;
-                               LOG.error("Cannot load Atlas application 
properties file, file-name:" + TAGSYNC_ATLAS_PROPERTIES_FILE_NAME, exception);
-                       } finally {
-                               try {
-                                       inputStream.close();
-                               } catch (IOException ioException) {
-                                       LOG.error("Cannot close Atlas 
application properties file, file-name:\" + 
TAGSYNC_ATLAS_PROPERTIES_FILE_NAME", ioException);
-                               }
-                       }
-               } else {
-                       ret = false;
-                       LOG.error("Cannot find Atlas application properties 
file");
-               }
-
-               if (ret) {
-                       if 
(StringUtils.isBlank(atlasProperties.getProperty(TAGSYNC_ATLAS_KAFKA_ENDPOINTS)))
 {
-                               ret = false;
-                               LOG.error("Value of property '" + 
TAGSYNC_ATLAS_KAFKA_ENDPOINTS + "' is not specified!");
-                       }
-                       if 
(StringUtils.isBlank(atlasProperties.getProperty(TAGSYNC_ATLAS_ZOOKEEPER_ENDPOINT)))
 {
-                               ret = false;
-                               LOG.error("Value of property '" + 
TAGSYNC_ATLAS_ZOOKEEPER_ENDPOINT + "' is not specified!");
-                       }
-                       if 
(StringUtils.isBlank(atlasProperties.getProperty(TAGSYNC_ATLAS_CONSUMER_GROUP)))
 {
-                               ret = false;
-                               LOG.error("Value of property '" + 
TAGSYNC_ATLAS_CONSUMER_GROUP + "' is not specified!");
-                       }
-               }
-
-               if (ret) {
-                       NotificationModule notificationModule = new 
NotificationModule();
-
-                       Injector injector = 
Guice.createInjector(notificationModule);
-
-                       Provider<NotificationInterface> consumerProvider = 
injector.getProvider(NotificationInterface.class);
-                       NotificationInterface notification = 
consumerProvider.get();
-                       List<NotificationConsumer<EntityNotification>> 
iterators = 
notification.createConsumers(NotificationInterface.NotificationType.ENTITIES, 
1);
-
-                       consumerTask = new ConsumerRunnable(iterators.get(0));
-               }
-
-               if (LOG.isDebugEnabled()) {
-                       LOG.debug("<== TagAtlasSource.initialize(), result=" + 
ret);
-               }
-               return ret;
-       }
-
-       @Override
-       public void setTagSink(TagSink sink) {
-               if (sink == null) {
-                       LOG.error("Sink is null!!!");
-               } else {
-                       this.tagSink = sink;
-               }
-       }
-
-       @Override
-       public Thread start() {
-               Thread consumerThread = null;
-               if (consumerTask == null) {
-                       LOG.error("No consumerTask!!!");
-               } else {
-                       consumerThread = new Thread(consumerTask);
-                       consumerThread.setDaemon(true);
-                       consumerThread.start();
-               }
-               return consumerThread;
-       }
-
-       @Override
-       public void updateSink() throws Exception {
-       }
-
-       @Override
-       public  boolean isChanged() {
-               return true;
-       }
-
-       // ----- inner class : ConsumerRunnable 
------------------------------------
-
-       private class ConsumerRunnable implements Runnable {
-
-               private final NotificationConsumer<EntityNotification> 
consumerIterator;
-
-               private 
ConsumerRunnable(NotificationConsumer<EntityNotification> consumerIterator) {
-                       this.consumerIterator = consumerIterator;
-               }
-
-               // ----- Runnable 
--------------------------------------------------------
-
-               @Override
-               public void run() {
-                       while (consumerIterator.hasNext()) {
-                               try {
-                                       EntityNotification notification = 
consumerIterator.next();
-                                       if (notification != null) {
-                                               printNotification(notification);
-                                               ServiceTags serviceTags = 
AtlasNotificationMapper.processEntityNotification(notification, properties);
-                                               if (serviceTags == null) {
-                                                       
if(LOG.isDebugEnabled()) {
-                                                               LOG.debug("Did 
not create ServiceTags structure for notification type:" + 
notification.getOperationType());
-                                                       }
-                                               } else {
-                                                       if 
(LOG.isDebugEnabled()) {
-                                                               String 
serviceTagsJSON = new Gson().toJson(serviceTags);
-                                                               
LOG.debug("Atlas notification mapped to serviceTags=" + serviceTagsJSON);
-                                                       }
-
-                                                       try {
-                                                               
tagSink.uploadServiceTags(serviceTags);
-                                                       } catch (Exception 
exception) {
-                                                               
LOG.error("uploadServiceTags() failed..", exception);
-                                                       }
-                                               }
-                                       }
-                               } catch(Exception e){
-                                       LOG.error("Exception encountered when 
processing notification:", e);
-                               }
-                       }
-               }
-
-               public void printNotification(EntityNotification notification) {
-                       IReferenceableInstance entity = 
notification.getEntity();
-                       if (LOG.isDebugEnabled()) {
-                               try {
-                                       LOG.debug("Notification-Type: " + 
notification.getOperationType());
-                                       LOG.debug("Entity-Id: " + 
entity.getId()._getId());
-                                       LOG.debug("Entity-Type: " + 
entity.getTypeName());
-
-                                       LOG.debug("----------- Entity Values 
----------");
-
-
-                                       for (Map.Entry<String, Object> entry : 
entity.getValuesMap().entrySet()) {
-                                               LOG.debug("             Name:" 
+ entry.getKey());
-                                               Object value = entry.getValue();
-                                               LOG.debug("             Value:" 
+ value);
-                                       }
-
-                                       LOG.debug("----------- Entity Traits 
----------");
-
-                                       List<IStruct> traits = 
notification.getAllTraits();
-
-                                       for (IStruct trait : traits) {
-                                               LOG.debug("                     
Trait-Type-Name:" + trait.getTypeName());
-                                               Map<String, Object> traitValues 
= trait.getValuesMap();
-                                               for (Map.Entry<String, Object> 
valueEntry : traitValues.entrySet()) {
-                                                       LOG.debug("             
                Trait-Value-Name:" + valueEntry.getKey());
-                                                       LOG.debug("             
                Trait-Value:" + valueEntry.getValue());
-                                               }
-                                       }
-                               } catch (AtlasException exception) {
-                                       LOG.error("Cannot print notification - 
", exception);
-                               }
-                       }
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/5b9c094f/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlasrest/AtlasRESTTagSource.java
----------------------------------------------------------------------
diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlasrest/AtlasRESTTagSource.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlasrest/AtlasRESTTagSource.java
new file mode 100644
index 0000000..c8ed948
--- /dev/null
+++ b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlasrest/AtlasRESTTagSource.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ranger.tagsync.source.atlasrest;
+
+import com.google.gson.Gson;
+
+import com.google.gson.GsonBuilder;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.collections.MapUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.ranger.tagsync.model.AbstractTagSource;
+import org.apache.ranger.plugin.util.ServiceTags;
+import org.apache.ranger.tagsync.process.TagSyncConfig;
+import org.apache.ranger.tagsync.source.atlas.AtlasEntityWithTraits;
+import org.apache.ranger.tagsync.source.atlas.AtlasNotificationMapper;
+import org.apache.ranger.tagsync.source.atlas.AtlasResourceMapperUtil;
+
+import java.util.*;
+
+public class AtlasRESTTagSource extends AbstractTagSource implements Runnable {
+       private static final Log LOG = 
LogFactory.getLog(AtlasRESTTagSource.class);
+
+       private String atlasEndpoint;
+       private long sleepTimeBetweenCycleInMillis;
+
+       /**
+        * Prepares this source: initializes the Atlas resource mappers through
+        * AtlasResourceMapperUtil and reads the Atlas endpoint and the download
+        * interval from the supplied properties.
+        *
+        * @param properties tag-sync configuration
+        * @return true only when the resource mappers were initialized and a
+        *         non-empty Atlas endpoint is configured
+        */
+       @Override
+       public boolean initialize(Properties properties) {
+               if (LOG.isDebugEnabled()) {
+                       LOG.debug("==> AtlasRESTTagSource.initialize()");
+               }
+
+               final boolean mappersInitialized = AtlasResourceMapperUtil.initializeAtlasResourceMappers(properties);
+
+               atlasEndpoint = TagSyncConfig.getAtlasEndpoint(properties);
+               sleepTimeBetweenCycleInMillis = TagSyncConfig.getTagSourceAtlasDownloadIntervalInMillis(properties);
+
+               final boolean endpointMissing = StringUtils.isEmpty(atlasEndpoint);
+
+               if (endpointMissing) {
+                       LOG.info("No AtlasEndpoint specified, Initial download of Atlas-entities cannot be done.");
+               }
+
+               // A missing endpoint overrides whatever the mapper initialization reported.
+               final boolean initialized = mappersInitialized && !endpointMissing;
+
+               if (LOG.isDebugEnabled()) {
+                       LOG.debug("<== AtlasRESTTagSource.initialize(), result=" + initialized);
+               }
+
+               return initialized;
+       }
+
+       /**
+        * Launches this source's run() loop on a new daemon thread.
+        *
+        * @return always true; the thread has been created and started when this returns
+        */
+       @Override
+       public boolean start() {
+               final Thread poller = new Thread(this);
+
+               poller.setDaemon(true);
+               poller.start();
+
+               // A reference obtained from "new" is never null, so the result is constant.
+               return true;
+       }
+
+       /**
+        * Polling loop: calls synchUp() and then sleeps for the configured download
+        * interval, repeating until shutdown is requested or the thread is interrupted.
+        *
+        * A failure inside one synchUp() cycle is logged and the loop continues, so a
+        * single bad download does not silently terminate the polling thread.
+        */
+       @Override
+       public void run() {
+
+               if (LOG.isDebugEnabled()) {
+                       LOG.debug("==> AtlasRESTTagSource.run()");
+               }
+
+               while (!shutdown) {
+
+                       try {
+                               synchUp();
+                       } catch (Exception exception) {
+                               // Keep polling; the next cycle gets a fresh chance to succeed.
+                               LOG.error("tag-sync thread got an error", exception);
+                       }
+
+                       if (LOG.isDebugEnabled()) {
+                               LOG.debug("Sleeping for [" + sleepTimeBetweenCycleInMillis + "] milliSeconds");
+                       }
+
+                       try {
+                               Thread.sleep(sleepTimeBetweenCycleInMillis);
+                       } catch (InterruptedException exception) {
+                               LOG.error("Interrupted while waiting for [" + sleepTimeBetweenCycleInMillis + "] milliseconds before the next download of Atlas entities", exception);
+                               // Restore the flag for whoever owns the thread, and stop: looping on with
+                               // the flag set would make every following sleep() fail immediately.
+                               Thread.currentThread().interrupt();
+                               break;
+                       }
+               }
+
+               LOG.info("Shutting down the Tag-Atlasrest-source thread");
+
+               if (LOG.isDebugEnabled()) {
+                       LOG.debug("<== AtlasRESTTagSource.run()");
+               }
+       }
+
+       /**
+        * Reports whether this source has new data to synchronize.
+        *
+        * @return true unconditionally in this implementation
+        */
+       @Override
+       public boolean isChanged() {
+               return true;
+       }
+
+       /**
+        * Runs one synchronization cycle: fetches the entities that carry traits from
+        * Atlas, maps them to per-service ServiceTags through AtlasNotificationMapper
+        * and hands every ServiceTags object to updateSink().
+        *
+        * Nothing is passed to updateSink() when Atlas returns no entities or the
+        * mapping produces no service tags.
+        */
+       @Override
+       public void synchUp() {
+
+               AtlasRESTUtil atlasRESTUtil = new AtlasRESTUtil(atlasEndpoint);
+
+               List<AtlasEntityWithTraits> atlasEntitiesWithTraits = atlasRESTUtil.getEntitiesWithTraits();
+
+               if (CollectionUtils.isEmpty(atlasEntitiesWithTraits)) {
+                       return;
+               }
+
+               final boolean debugEnabled = LOG.isDebugEnabled();
+
+               if (debugEnabled) {
+                       for (AtlasEntityWithTraits element : atlasEntitiesWithTraits) {
+                               LOG.debug(element);
+                       }
+               }
+
+               Map<String, ServiceTags> serviceTagsMap = AtlasNotificationMapper.processEntitiesWithTraits(atlasEntitiesWithTraits);
+
+               if (MapUtils.isEmpty(serviceTagsMap)) {
+                       return;
+               }
+
+               // The pretty-printer is only needed for debug output; build it once per
+               // cycle instead of once per map entry.
+               final Gson debugGson = debugEnabled
+                               ? new GsonBuilder().setDateFormat("yyyyMMdd-HH:mm:ss.SSS-Z").setPrettyPrinting().create()
+                               : null;
+
+               for (ServiceTags serviceTags : serviceTagsMap.values()) {
+                       if (debugGson != null) {
+                               LOG.debug("serviceTags=" + debugGson.toJson(serviceTags));
+                       }
+                       updateSink(serviceTags);
+               }
+       }
+
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/5b9c094f/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlasrest/AtlasRESTUtil.java
----------------------------------------------------------------------
diff --git 
a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlasrest/AtlasRESTUtil.java
 
b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlasrest/AtlasRESTUtil.java
new file mode 100644
index 0000000..7f4676a
--- /dev/null
+++ 
b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlasrest/AtlasRESTUtil.java
@@ -0,0 +1,264 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ranger.tagsync.source.atlasrest;
+
+import com.google.gson.Gson;
+import com.sun.jersey.api.client.ClientResponse;
+import com.sun.jersey.api.client.WebResource;
+import org.apache.atlas.typesystem.IReferenceableInstance;
+import org.apache.atlas.typesystem.IStruct;
+import org.apache.atlas.typesystem.Struct;
+import org.apache.atlas.typesystem.json.InstanceSerialization;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.collections.MapUtils;
+import org.apache.log4j.Logger;
+import org.apache.ranger.admin.client.datatype.RESTResponse;
+import org.apache.ranger.plugin.util.RangerRESTClient;
+import org.apache.ranger.tagsync.source.atlas.AtlasEntityWithTraits;
+import org.apache.ranger.tagsync.source.atlas.AtlasResourceMapperUtil;
+
+import java.util.*;
+
+@SuppressWarnings("unchecked")
+public class AtlasRESTUtil {
+       private static final Logger LOG = Logger.getLogger(AtlasRESTUtil.class);
+
+       private static final String REST_MIME_TYPE_JSON = "application/json" ;
+       private static final String API_ATLAS_TYPES    = "api/atlas/types";
+       private static final String API_ATLAS_ENTITIES = 
"api/atlas/entities?type=";
+       private static final String API_ATLAS_ENTITY   = "api/atlas/entities/";
+       private static final String API_ATLAS_TYPE     = "api/atlas/types/";
+
+       private static final String RESULTS_ATTRIBUTE               = "results";
+       private static final String DEFINITION_ATTRIBUTE            = 
"definition";
+       private static final String VALUES_ATTRIBUTE                = "values";
+       private static final String TRAITS_ATTRIBUTE                = "traits";
+       private static final String TYPE_NAME_ATTRIBUTE             = 
"typeName";
+       private static final String TRAIT_TYPES_ATTRIBUTE           = 
"traitTypes";
+       private static final String SUPER_TYPES_ATTRIBUTE           = 
"superTypes";
+       private static final String ATTRIBUTE_DEFINITIONS_ATTRIBUTE = 
"attributeDefinitions";
+       private static final String NAME_ATTRIBUTE                  = "name";
+
+       private final Gson gson = new Gson();
+
+       private RangerRESTClient atlasRESTClient;
+
+       /**
+        * Creates a helper bound to the given Atlas endpoint. A trailing "/" is
+        * appended to the endpoint when it does not already end with one.
+        *
+        * @param atlasEndpoint base URL of the Atlas server; must not be null
+        */
+       public AtlasRESTUtil(String atlasEndpoint) {
+               if (LOG.isDebugEnabled()) {
+                       LOG.debug("==> AtlasRESTUtil(" + atlasEndpoint + ")");
+               }
+
+               final String baseUrl = atlasEndpoint.endsWith("/") ? atlasEndpoint : atlasEndpoint + "/";
+
+               // This uses RangerRESTClient to invoke REST APIs on Atlas. It will work only if scheme of URL is http
+               final RangerRESTClient client = new RangerRESTClient();
+               client.setUrl(baseUrl);
+               atlasRESTClient = client;
+
+               if (LOG.isDebugEnabled()) {
+                       LOG.debug("<== AtlasRESTUtil(" + baseUrl + ")");
+               }
+       }
+
+       /**
+        * Downloads from Atlas every entity whose type is handled by
+        * AtlasResourceMapperUtil and that has at least one trait attached.
+        *
+        * The type list is fetched first, then the entity GUIDs per handled type,
+        * then each entity definition. For every trait found on an entity, the trait
+        * itself and the structs built for its super-types are collected.
+        *
+        * @return the entities paired with their traits; empty (never null) when
+        *         nothing qualifies
+        */
+       public List<AtlasEntityWithTraits> getEntitiesWithTraits() {
+               if (LOG.isDebugEnabled()) {
+                       LOG.debug("==> getEntitiesWithTraits()");
+               }
+
+               List<AtlasEntityWithTraits> ret = new ArrayList<AtlasEntityWithTraits>();
+
+               Map<String, Object> typesResponse = atlasAPI(API_ATLAS_TYPES);
+
+               List<String> types = getAttribute(typesResponse, RESULTS_ATTRIBUTE, List.class);
+
+               if (CollectionUtils.isNotEmpty(types)) {
+                       for (String type : types) {
+                               if (AtlasResourceMapperUtil.isEntityTypeHandled(type)) {
+                                       addEntitiesOfType(type, ret);
+                               } else if (LOG.isDebugEnabled()) {
+                                       LOG.debug("Not fetching Atlas entities of type:" + type);
+                               }
+                       }
+               }
+
+               // Logged on every path, including the one where Atlas returned no types.
+               if (LOG.isDebugEnabled()) {
+                       LOG.debug("<== getEntitiesWithTraits()");
+               }
+
+               return ret;
+       }
+
+       /** Fetches all entities of one Atlas type and appends those that have traits to ret. */
+       private void addEntitiesOfType(String type, List<AtlasEntityWithTraits> ret) {
+               Map<String, Object> entitiesResponse = atlasAPI(API_ATLAS_ENTITIES + type);
+
+               List<String> guids = getAttribute(entitiesResponse, RESULTS_ATTRIBUTE, List.class);
+
+               if (CollectionUtils.isEmpty(guids)) {
+                       if (LOG.isDebugEnabled()) {
+                               LOG.debug("No Atlas entities for type:" + type);
+                       }
+                       return;
+               }
+
+               for (String guid : guids) {
+                       Map<String, Object> entityResponse = atlasAPI(API_ATLAS_ENTITY + guid);
+
+                       if (MapUtils.isEmpty(entityResponse) || !entityResponse.containsKey(DEFINITION_ATTRIBUTE)) {
+                               continue;
+                       }
+
+                       Map<String, Object> definition = getAttribute(entityResponse, DEFINITION_ATTRIBUTE, Map.class);
+                       Map<String, Object> traitsAttribute = getAttribute(definition, TRAITS_ATTRIBUTE, Map.class);
+
+                       if (MapUtils.isEmpty(traitsAttribute)) {
+                               continue;
+                       }
+
+                       List<IStruct> allTraits = collectTraits(traitsAttribute);
+
+                       IReferenceableInstance entity = InstanceSerialization.fromJsonReferenceable(gson.toJson(definition), true);
+
+                       if (entity != null) {
+                               ret.add(new AtlasEntityWithTraits(entity, allTraits));
+                       } else if (LOG.isInfoEnabled()) {
+                               LOG.info("Could not create Atlas entity from its definition, type=" + type + ", guid=" + guid);
+                       }
+               }
+       }
+
+       /** Builds a struct for each trait in the entity's "traits" map, each followed by its super-type structs. */
+       private List<IStruct> collectTraits(Map<String, Object> traitsAttribute) {
+               List<IStruct> allTraits = new LinkedList<>();
+
+               for (Object traitObject : traitsAttribute.values()) {
+                       Map<String, Object> trait = (Map<String, Object>) traitObject;
+
+                       Map<String, Object> traitValues = getAttribute(trait, VALUES_ATTRIBUTE, Map.class);
+                       String traitTypeName = getAttribute(trait, TYPE_NAME_ATTRIBUTE, String.class);
+
+                       List<IStruct> superTypes = getTraitSuperTypes(getTraitType(traitTypeName), traitValues);
+
+                       allTraits.add(new Struct(traitTypeName, traitValues));
+                       allTraits.addAll(superTypes);
+               }
+
+               return allTraits;
+       }
+
+       /**
+        * Fetches the type definition of a trait from Atlas.
+        *
+        * @param traitName name of the trait type to look up
+        * @return the first element of the definition's "traitTypes" list, or null
+        *         when the request failed or the response lacks that information
+        */
+       private Map<String, Object> getTraitType(String traitName) {
+               if (LOG.isDebugEnabled()) {
+                       LOG.debug("==> getTraitType(" + traitName + ")");
+               }
+               Map<String, Object> ret = null;
+
+               Map<String, Object> typeResponse = atlasAPI(API_ATLAS_TYPE + traitName);
+
+               // The response can be null or empty after a failed call; MapUtils.isNotEmpty() is null-safe.
+               if (MapUtils.isNotEmpty(typeResponse) && typeResponse.containsKey(DEFINITION_ATTRIBUTE)) {
+
+                       Map<String, Object> definition = getAttribute(typeResponse, DEFINITION_ATTRIBUTE, Map.class);
+
+                       List traitTypes = getAttribute(definition, TRAIT_TYPES_ATTRIBUTE, List.class);
+
+                       // A definition without a "traitTypes" list yields null here, not an empty list.
+                       if (CollectionUtils.isNotEmpty(traitTypes)) {
+                               ret = (Map<String, Object>) traitTypes.get(0);
+                       }
+               }
+
+               if (LOG.isDebugEnabled()) {
+                       LOG.debug("<== getTraitType(" + traitName + ")");
+               }
+               return ret;
+       }
+
+       /**
+        * Builds one struct per super-type of the given trait type, recursively.
+        *
+        * Each super-type struct carries only those entries of values whose names
+        * appear in that super-type's attribute definitions.
+        *
+        * @param traitType type definition as returned by getTraitType(); may be null
+        * @param values    attribute values of the trait instance; may be null
+        * @return the super-type structs, each followed by its own super-types;
+        *         empty (never null) when there are none
+        */
+       private List<IStruct> getTraitSuperTypes(Map<String, Object> traitType, Map<String, Object> values) {
+
+               if (LOG.isDebugEnabled()) {
+                       LOG.debug("==> getTraitSuperTypes()");
+               }
+               List<IStruct> ret = new LinkedList<>();
+
+               List<String> superTypeNames = getAttribute(traitType, SUPER_TYPES_ATTRIBUTE, List.class);
+
+               // getAttribute() yields null for a null/empty type or a missing "superTypes" entry.
+               if (CollectionUtils.isNotEmpty(superTypeNames)) {
+
+                       for (String superTypeName : superTypeNames) {
+
+                               Map<String, Object> superTraitType = getTraitType(superTypeName);
+
+                               if (superTraitType == null) {
+                                       continue;
+                               }
+
+                               List<Map<String, Object>> attributeDefinitions = (List) superTraitType.get(ATTRIBUTE_DEFINITIONS_ATTRIBUTE);
+
+                               Map<String, Object> superTypeValues = new HashMap<>();
+
+                               if (attributeDefinitions != null && values != null) {
+                                       for (Map<String, Object> attributeDefinition : attributeDefinitions) {
+
+                                               Object attributeName = attributeDefinition.get(NAME_ATTRIBUTE);
+
+                                               if (attributeName != null && values.containsKey(attributeName.toString())) {
+                                                       superTypeValues.put(attributeName.toString(), values.get(attributeName.toString()));
+                                               }
+                                       }
+                               }
+
+                               ret.add(new Struct(superTypeName, superTypeValues));
+                               // Reuse the definition fetched above instead of asking Atlas for it a second time.
+                               ret.addAll(getTraitSuperTypes(superTraitType, values));
+                       }
+               }
+               if (LOG.isDebugEnabled()) {
+                       LOG.debug("<== getTraitSuperTypes()");
+               }
+               return ret;
+       }
+
+       /**
+        * Issues a JSON GET request against the Atlas server and returns the parsed
+        * response body.
+        *
+        * @param endpoint path (relative to the configured Atlas URL) to request
+        * @return the response as a map; an empty map (never null) when the status
+        *         is not 200, the body is absent, or the call throws
+        */
+       private Map<String, Object> atlasAPI(String endpoint) {
+
+               if (LOG.isDebugEnabled()) {
+                       LOG.debug("==> atlasAPI(" + endpoint + ")");
+               }
+               Map<String, Object> ret = new HashMap<String, Object>();
+
+               try {
+                       WebResource webResource = atlasRESTClient.getResource(endpoint);
+                       ClientResponse response = webResource.accept(REST_MIME_TYPE_JSON).type(REST_MIME_TYPE_JSON).get(ClientResponse.class);
+
+                       if (response != null && response.getStatus() == 200) {
+                               Map<String, Object> entity = response.getEntity(ret.getClass());
+
+                               if (entity != null) {
+                                       ret = entity;
+                               }
+                       } else {
+                               RESTResponse resp = RESTResponse.fromClientResponse(response);
+                               LOG.error("Error getting atlas data request=" + webResource.toString()
+                                               + ", response=" + resp.toString());
+                       }
+               } catch (Exception exception) {
+                       // Leave ret as the empty map so every failure mode looks the same to
+                       // callers, which dereference the result.
+                       LOG.error("Exception when fetching Atlas objects.", exception);
+               }
+
+               if (LOG.isDebugEnabled()) {
+                       LOG.debug("<== atlasAPI(" + endpoint + ")");
+               }
+               return ret;
+       }
+
+       /**
+        * Looks up an entry of a response map and casts it to the requested type.
+        *
+        * @return the cast value, or null when the map is null/empty or has no such entry
+        * @throws ClassCastException when the stored value is not of the requested type
+        */
+       private <T> T getAttribute(Map<String, Object> map, String name, Class<T> type) {
+               if (MapUtils.isEmpty(map)) {
+                       return null;
+               }
+
+               return type.cast(map.get(name));
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/5b9c094f/tagsync/src/main/java/org/apache/ranger/tagsync/source/file/FileTagSource.java
----------------------------------------------------------------------
diff --git 
a/tagsync/src/main/java/org/apache/ranger/tagsync/source/file/FileTagSource.java
 
b/tagsync/src/main/java/org/apache/ranger/tagsync/source/file/FileTagSource.java
new file mode 100644
index 0000000..43eb3b5
--- /dev/null
+++ 
b/tagsync/src/main/java/org/apache/ranger/tagsync/source/file/FileTagSource.java
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ranger.tagsync.source.file;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import org.apache.commons.collections.MapUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.ranger.tagsync.model.AbstractTagSource;
+import org.apache.ranger.plugin.util.ServiceTags;
+import org.apache.ranger.tagsync.process.TagSyncConfig;
+
+import java.io.*;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.nio.charset.Charset;
+import java.util.Date;
+import java.util.Properties;
+
+public class FileTagSource extends AbstractTagSource implements Runnable {
+       private static final Log LOG = LogFactory.getLog(FileTagSource.class);
+
+       private String serviceTagsFileName;
+       private URL serviceTagsFileURL;
+       private long lastModifiedTimeInMillis = -1L;
+
+       private Gson gsonBuilder;
+       private Properties properties;
+       private long fileModTimeCheckIntervalInMs;
+
+       /**
+        * Configures this source from the given properties and resolves the
+        * service-tags file to a URL.
+        *
+        * The configured name is first tried as a readable file on the file system;
+        * failing that, it is looked up as a resource through this class and then
+        * through the system class loader. The resolved location is stored in
+        * serviceTagsFileURL.
+        *
+        * @param props tag-sync configuration; when null or empty an error is logged
+        *              and an empty Properties object is used instead
+        * @return false only when no file name is configured; a file that cannot be
+        *         located or opened is logged but does not change the result
+        */
+       @Override
+       public boolean initialize(Properties props) {
+
+               if (LOG.isDebugEnabled()) {
+                       LOG.debug("==> FileTagSource.initialize()");
+               }
+
+               if (props == null || MapUtils.isEmpty(props)) {
+                       LOG.error("No properties specified for FileTagSource 
initialization");
+                       this.properties = new Properties();
+               } else {
+                       this.properties = props;
+               }
+
+               gsonBuilder = new 
GsonBuilder().setDateFormat("yyyyMMdd-HH:mm:ss.SSS-Z").setPrettyPrinting().create();
+
+               boolean ret = true;
+
+               serviceTagsFileName = 
TagSyncConfig.getTagSourceFileName(properties);
+
+               if (StringUtils.isBlank(serviceTagsFileName)) {
+                       ret = false;
+                       LOG.error("value of property 
'ranger.tagsync.source.impl.class' is file and no value specified for property 
'ranger.tagsync.filesource.filename'!");
+               }
+
+               if (ret) {
+
+                       fileModTimeCheckIntervalInMs = 
TagSyncConfig.getTagSourceFileModTimeCheckIntervalInMillis(properties);
+
+                       if (LOG.isDebugEnabled()) {
+                               LOG.debug("Provided serviceTagsFileName=" + 
serviceTagsFileName);
+                               
LOG.debug("'ranger.tagsync.filesource.modtime.check.interval':" + 
fileModTimeCheckIntervalInMs + "ms");
+                       }
+
+                       // Opened only to verify that the location is readable; closed again below.
+                       InputStream serviceTagsFileStream = null;
+
+                       File f = new File(serviceTagsFileName);
+
+                       // First choice: the name denotes a readable regular file.
+                       if (f.exists() && f.isFile() && f.canRead()) {
+                               try {
+                                       serviceTagsFileStream = new 
FileInputStream(f);
+                                       serviceTagsFileURL = f.toURI().toURL();
+                               } catch (FileNotFoundException exception) {
+                                       LOG.error("Error processing input 
file:" + serviceTagsFileName + " or no privilege for reading file " + 
serviceTagsFileName, exception);
+                               } catch (MalformedURLException 
malformedException) {
+                                       LOG.error("Error processing input 
file:" + serviceTagsFileName + " cannot be converted to URL " + 
serviceTagsFileName, malformedException);
+                               }
+                       } else {
+
+                               // Second choice: a resource visible to this class, with and without a leading "/".
+                               URL fileURL = 
getClass().getResource(serviceTagsFileName);
+                               if (fileURL == null) {
+                                       if 
(!serviceTagsFileName.startsWith("/")) {
+                                               fileURL = 
getClass().getResource("/" + serviceTagsFileName);
+                                       }
+                               }
+
+                               // Last choice: the same two lookups through the system class loader.
+                               if (fileURL == null) {
+                                       fileURL = 
ClassLoader.getSystemClassLoader().getResource(serviceTagsFileName);
+                                       if (fileURL == null) {
+                                               if 
(!serviceTagsFileName.startsWith("/")) {
+                                                       fileURL = 
ClassLoader.getSystemClassLoader().getResource("/" + serviceTagsFileName);
+                                               }
+                                       }
+                               }
+
+                               if (fileURL != null) {
+
+                                       try {
+                                               serviceTagsFileStream = 
fileURL.openStream();
+                                               serviceTagsFileURL = fileURL;
+                                       } catch (Exception exception) {
+                                               LOG.error(serviceTagsFileName + 
" is not a file", exception);
+                                       }
+                               } else {
+                                       LOG.warn("Error processing input file: 
URL not found for " + serviceTagsFileName + " or no privilege for reading file 
" + serviceTagsFileName);
+                               }
+                       }
+
+                       // The probe stream is not read here; the file is reopened from serviceTagsFileURL when needed.
+                       if (serviceTagsFileStream != null) {
+                               try {
+                                       serviceTagsFileStream.close();
+                               } catch (Exception e) {
+                                       // Ignore
+                               }
+                       }
+               }
+
+               if (LOG.isDebugEnabled()) {
+                       LOG.debug("<== FileTagSource.initialize(): 
sourceFileName=" + serviceTagsFileName + ", result=" + ret);
+               }
+
+               return ret;
+       }
+
+       /**
+        * Launches this source's run() loop on a new daemon thread.
+        *
+        * @return always true; the thread has been created and started when this returns
+        */
+       @Override
+       public boolean start() {
+               final Thread monitor = new Thread(this);
+
+               monitor.setDaemon(true);
+               monitor.start();
+
+               // A reference obtained from "new" is never null, so the result is constant.
+               return true;
+       }
+
+       /**
+        * Monitoring loop: calls synchUp() and then sleeps for the configured
+        * modification-time check interval, repeating until shutdown is requested or
+        * the thread is interrupted.
+        *
+        * The sleep happens after every cycle, including one in which synchUp()
+        * failed, so a persistent error cannot turn the loop into a busy spin.
+        */
+       @Override
+       public void run() {
+               if (LOG.isDebugEnabled()) {
+                       LOG.debug("==> FileTagSource.run()");
+               }
+
+               while (!shutdown) {
+
+                       try {
+                               synchUp();
+                       }
+                       catch (Throwable t) {
+                               // Keep monitoring; the next cycle gets a fresh chance to succeed.
+                               LOG.error("tag-sync thread got an error", t);
+                       }
+
+                       if (LOG.isDebugEnabled()) {
+                               LOG.debug("Sleeping for [" + fileModTimeCheckIntervalInMs + "] milliSeconds");
+                       }
+
+                       try {
+                               Thread.sleep(fileModTimeCheckIntervalInMs);
+                       }
+                       catch (InterruptedException e) {
+                               LOG.error("Failed to wait for [" + fileModTimeCheckIntervalInMs + "] milliseconds before checking for update to tagFileSource", e);
+                               // Restore the flag for whoever owns the thread, and stop: looping on with
+                               // the flag set would make every following sleep() fail immediately.
+                               Thread.currentThread().interrupt();
+                               break;
+                       }
+               }
+
+               LOG.info("Shutting down the Tag-file-source thread");
+
+               if (LOG.isDebugEnabled()) {
+                       LOG.debug("<== FileTagSource.run()");
+               }
+       }
+
+       /**
+        * Checks whether the tags file was modified since the last call that
+        * reported a change, and records the new modification time when it was.
+        *
+        * @return true when the file's modification time is later than the one
+        *         remembered from the previous change
+        */
+       @Override
+       public  boolean isChanged() {
+
+               if (LOG.isDebugEnabled()) {
+                       LOG.debug("==> FileTagSource.isChanged()");
+               }
+               boolean ret = false;
+
+               long modificationTime = getModificationTime();
+
+               if (modificationTime > lastModifiedTimeInMillis) {
+                       if (LOG.isDebugEnabled()) {
+                               Date modifiedDate = new Date(modificationTime);
+                               Date lastModifiedDate = new Date(lastModifiedTimeInMillis);
+                               // Separator added: the two dates used to run together in the output.
+                               LOG.debug("File modified at " + modifiedDate + ", last-modified at " + lastModifiedDate);
+                       }
+                       lastModifiedTimeInMillis = modificationTime;
+                       ret = true;
+               }
+
+               if (LOG.isDebugEnabled()) {
+                       LOG.debug("<== FileTagSource.isChanged(): result=" + ret);
+               }
+               return ret;
+       }
+
+       /**
+        * Runs one synchronization cycle: when isChanged() reports a change, the
+        * tags file is read and its content handed to updateSink(); otherwise
+        * nothing is done.
+        */
+       @Override
+       public void synchUp() {
+               if (!isChanged()) {
+                       if (LOG.isDebugEnabled()) {
+                               LOG.debug("FileTagSource: no change found for synchronization.");
+                       }
+                       return;
+               }
+
+               if (LOG.isDebugEnabled()) {
+                       LOG.debug("Begin: update tags from source==>sink");
+               }
+
+               updateSink(readFromFile());
+
+               if (LOG.isDebugEnabled()) {
+                       LOG.debug("End: update tags from source==>sink");
+               }
+       }
+       /**
+        * Reads the tags file from serviceTagsFileURL and parses it as UTF-8 JSON.
+        *
+        * @return the parsed ServiceTags, or null when no URL is available or the
+        *         file cannot be read
+        */
+       private ServiceTags readFromFile() {
+
+               if (LOG.isDebugEnabled()) {
+                       LOG.debug("==> FileTagSource.readFromFile(): sourceFileName=" + serviceTagsFileName);
+               }
+
+               ServiceTags parsed = null;
+
+               if (serviceTagsFileURL == null) {
+                       LOG.error("Error reading file: " + serviceTagsFileName);
+               } else {
+                       // Both resources are closed automatically, the reader first.
+                       try (
+                                       InputStream in = serviceTagsFileURL.openStream();
+                                       Reader utf8Reader = new InputStreamReader(in, Charset.forName("UTF-8"))
+                       ) {
+                               parsed = gsonBuilder.fromJson(utf8Reader, ServiceTags.class);
+                       } catch (IOException e) {
+                               LOG.warn("Error processing input file: or no privilege for reading file " + serviceTagsFileName, e);
+                       }
+               }
+
+               if (LOG.isDebugEnabled()) {
+                       LOG.debug("<== FileTagSource.readFromFile(): sourceFileName=" + serviceTagsFileName);
+               }
+
+               return parsed;
+       }
+
+       /**
+        * Returns the last-modified time of the tags file on the file system.
+        *
+        * @return the modification time in milliseconds, or 0 when the configured
+        *         name is not an existing, readable regular file
+        */
+       private long getModificationTime() {
+               if (LOG.isDebugEnabled()) {
+                       LOG.debug("==> FileTagSource.getLastModificationTime(): sourceFileName=" + serviceTagsFileName);
+               }
+
+               final File candidate = new File(serviceTagsFileName);
+               final boolean readableFile = candidate.exists() && candidate.isFile() && candidate.canRead();
+               final long ret = readableFile ? candidate.lastModified() : 0L;
+
+               if (LOG.isDebugEnabled()) {
+                       LOG.debug("<== FileTagSource.lastModificationTime(): serviceTagsFileName=" + serviceTagsFileName + " result=" + new Date(ret));
+               }
+
+               return ret;
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/5b9c094f/tagsync/src/main/java/org/apache/ranger/tagsync/source/file/TagFileSource.java
----------------------------------------------------------------------
diff --git 
a/tagsync/src/main/java/org/apache/ranger/tagsync/source/file/TagFileSource.java
 
b/tagsync/src/main/java/org/apache/ranger/tagsync/source/file/TagFileSource.java
deleted file mode 100644
index 92f24b2..0000000
--- 
a/tagsync/src/main/java/org/apache/ranger/tagsync/source/file/TagFileSource.java
+++ /dev/null
@@ -1,281 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.ranger.tagsync.source.file;
-
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
-import org.apache.commons.collections.MapUtils;
-import org.apache.commons.lang.StringUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.ranger.tagsync.model.TagSink;
-import org.apache.ranger.tagsync.model.TagSource;
-import org.apache.ranger.plugin.util.ServiceTags;
-import org.apache.ranger.tagsync.process.TagSyncConfig;
-
-import java.io.*;
-import java.util.Date;
-import java.util.Properties;
-
-public class TagFileSource implements TagSource, Runnable {
-       private static final Log LOG = LogFactory.getLog(TagFileSource.class);
-
-       private String sourceFileName;
-       private long lastModifiedTimeInMillis = 0L;
-
-       private Gson gson;
-       private TagSink tagSink;
-       private Properties properties;
-
-       @Override
-       public boolean initialize(Properties props) {
-
-               if (LOG.isDebugEnabled()) {
-                       LOG.debug("==> TagFileSource.initialize()");
-               }
-
-               if (props == null || MapUtils.isEmpty(props)) {
-                       LOG.error("No properties specified for TagFileSource 
initialization");
-                       this.properties = new Properties();
-               } else {
-                       this.properties = props;
-               }
-
-               boolean ret = true;
-
-               if 
(StringUtils.isBlank(TagSyncConfig.getTagSourceFileName(properties))) {
-                       ret = false;
-                       LOG.error("value of property 
'ranger.tagsync.source.impl.class' is file and no value specified for property 
'ranger.tagsync.filesource.filename'!");
-               }
-
-               if (ret) {
-
-                       long fileModTimeCheckIntervalInMs = 
TagSyncConfig.getTagSourceFileModTimeCheckIntervalInMillis(properties);
-
-                       if (fileModTimeCheckIntervalInMs <= 0L) {
-                               
LOG.info("'ranger.tagsync.filesource.modtime.check.interval' is zero or 
negative! 'ranger.tagsync.filesource.modtime.check.interval'=" + 
fileModTimeCheckIntervalInMs + "ms");
-                               LOG.info("Setting 
'ranger.tagsync.filesource.modtime.check.interval' to 60 seconds");
-                               fileModTimeCheckIntervalInMs = 60*1000;
-                       } else {
-                               if (LOG.isDebugEnabled()) {
-                                       
LOG.debug("'ranger.tagsync.filesource.modtime.check.interval':" + 
fileModTimeCheckIntervalInMs + "ms");
-                               }
-                       }
-                       sourceFileName = 
TagSyncConfig.getTagSourceFileName(properties);
-
-                       if (LOG.isDebugEnabled()) {
-                               LOG.debug("Provided sourceFileName=" + 
sourceFileName);
-                       }
-
-                       String realFileName = 
TagSyncConfig.getResourceFileName(sourceFileName);
-                       if (realFileName != null) {
-                               if (LOG.isDebugEnabled()) {
-                                       LOG.debug("Real sourceFileName=" + 
realFileName);
-                               }
-                               sourceFileName = realFileName;
-                       } else {
-                               LOG.error(sourceFileName + " is not a file or 
is not readable");
-                               ret = false;
-                       }
-               }
-
-               if (ret) {
-                       try {
-                               gson = new 
GsonBuilder().setDateFormat("yyyyMMdd-HH:mm:ss.SSS-Z").setPrettyPrinting().create();
-                       } catch (Throwable excp) {
-                               LOG.fatal("failed to create GsonBuilder 
object", excp);
-                               ret = false;
-                       }
-               }
-               if (LOG.isDebugEnabled()) {
-                       LOG.debug("<== TagFileSource.initialize(): 
sourceFileName=" + sourceFileName + ", result=" + ret);
-               }
-
-               return ret;
-       }
-
-       @Override
-       public void setTagSink(TagSink sink) {
-               if (sink == null) {
-                       LOG.error("Sink is null!!!");
-               } else {
-                       this.tagSink = sink;
-               }
-       }
-
-       @Override
-       public Thread start() {
-
-               Thread fileMonitoringThread = null;
-
-               fileMonitoringThread = new Thread(this);
-               fileMonitoringThread.setDaemon(true);
-               fileMonitoringThread.start();
-
-               return fileMonitoringThread;
-       }
-
-       @Override
-       public void run() {
-               if (LOG.isDebugEnabled()) {
-                       LOG.debug("==> TagFileSource.run()");
-               }
-               long sleepTimeBetweenCycleInMillis = 
TagSyncConfig.getTagSourceFileModTimeCheckIntervalInMillis(properties);
-               boolean shutdownFlag = false;
-
-               while (!shutdownFlag) {
-
-                       try {
-                               if (isChanged()) {
-                                       LOG.info("Begin: update tags from 
source==>sink");
-                                       if 
(TagSyncConfig.isTagSyncEnabled(properties)) {
-                                               updateSink();
-                                               LOG.info("End: update tags from 
source==>sink");
-                                       } else {
-                                               LOG.info("Tag-sync is not 
enabled.");
-                                       }
-                               } else {
-                                       LOG.debug("TagFileSource: no change 
found for synchronization.");
-                               }
-
-                               LOG.debug("Sleeping for [" + 
sleepTimeBetweenCycleInMillis + "] milliSeconds");
-
-                               Thread.sleep(sleepTimeBetweenCycleInMillis);
-                       }
-                       catch (InterruptedException e) {
-                               LOG.error("Failed to wait for [" + 
sleepTimeBetweenCycleInMillis + "] milliseconds before checking for update to 
tagFileSource", e);
-                               shutdownFlag = true;
-                       }
-                       catch (Throwable t) {
-                               LOG.error("tag-sync thread got an error", t);
-                       }
-               }
-
-               LOG.info("Shutting down the Tag-file-source thread");
-
-               if (LOG.isDebugEnabled()) {
-                       LOG.debug("<== TagFileSource.run()");
-               }
-       }
-
-       @Override
-       public void updateSink() throws Exception {
-
-               if (LOG.isDebugEnabled()) {
-                       LOG.debug("==> TagFileSource.updateSink()");
-               }
-               ServiceTags serviceTags = readFromFile();
-
-               if (serviceTags != null) {
-                       tagSink.uploadServiceTags(serviceTags);
-               } else {
-                       LOG.error("Could not read ServiceTags from file");
-               }
-               if (LOG.isDebugEnabled()) {
-                       LOG.debug("<== TagFileSource.updateSink()");
-               }
-       }
-
-       @Override
-       public  boolean isChanged() {
-
-               if (LOG.isDebugEnabled()) {
-                       LOG.debug("==> TagFileSource.isChanged()");
-               }
-               boolean ret = false;
-
-               long modificationTime = getModificationTime();
-
-               if (modificationTime > lastModifiedTimeInMillis) {
-                       if (LOG.isDebugEnabled()) {
-                               Date modifiedDate = new Date(modificationTime);
-                               Date lastModifiedDate = new 
Date(lastModifiedTimeInMillis);
-                               LOG.debug("File modified at " + modifiedDate + 
"last-modified at " + lastModifiedDate);
-                       }
-                       lastModifiedTimeInMillis = modificationTime;
-                       ret = true;
-               }
-
-               if (LOG.isDebugEnabled()) {
-                       LOG.debug("<== TagFileSource.isChanged(): result=" + 
ret);
-               }
-               return ret;
-       }
-
-       private ServiceTags readFromFile() {
-
-               if (LOG.isDebugEnabled()) {
-                       LOG.debug("==> TagFileSource.readFromFile(): 
sourceFileName=" + sourceFileName);
-               }
-
-               ServiceTags ret = null;
-
-               Reader reader = null;
-               try {
-
-                       reader = new 
InputStreamReader(TagSyncConfig.getFileInputStream(sourceFileName));
-
-                       ret = gson.fromJson(reader, ServiceTags.class);
-
-               }
-               catch (FileNotFoundException exception) {
-                       LOG.warn("Tag-source file does not exist or not readble 
'" + sourceFileName + "'");
-               }
-               catch (Exception excp) {
-                       LOG.error("failed to load service-tags from Tag-source 
file " + sourceFileName, excp);
-               }
-               finally {
-                       if (reader != null) {
-                               try {
-                                       reader.close();
-                               } catch (Exception excp) {
-                                       LOG.error("error while closing opened 
Tag-source file " + sourceFileName, excp);
-                               }
-                       }
-               }
-
-               if (LOG.isDebugEnabled()) {
-                       LOG.debug("<== TagFileSource.readFromFile(): 
sourceFileName=" + sourceFileName);
-               }
-
-               return ret;
-       }
-
-       private long getModificationTime() {
-               if (LOG.isDebugEnabled()) {
-                       LOG.debug("==> TagFileSource.getLastModificationTime(): 
sourceFileName=" + sourceFileName);
-               }
-               long ret = 0L;
-
-               File sourceFile = new File(sourceFileName);
-
-               if (sourceFile.exists() && sourceFile.isFile() && 
sourceFile.canRead()) {
-                       ret = sourceFile.lastModified();
-               } else {
-                       ret = new Date().getTime();
-               }
-               if (LOG.isDebugEnabled()) {
-                       LOG.debug("<== TagFileSource.lastModificationTime(): 
sourceFileName=" + sourceFileName + " result=" + new Date(ret));
-               }
-
-               return ret;
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/5b9c094f/tagsync/src/main/resources/etc/ranger/data/tags.json
----------------------------------------------------------------------
diff --git a/tagsync/src/main/resources/etc/ranger/data/tags.json 
b/tagsync/src/main/resources/etc/ranger/data/tags.json
index 274cf69..b4cd736 100644
--- a/tagsync/src/main/resources/etc/ranger/data/tags.json
+++ b/tagsync/src/main/resources/etc/ranger/data/tags.json
@@ -1,9 +1,7 @@
 {
     "op":"add_or_update",
-    "tagModel":"shared",
+    "tagModel": "resource_private",
     "serviceName": "cl1_hive",
-    "tagVersion": 24,
-    "tagUpdateTime": "20150901-20:03:17.000-+0000",
     "tagDefinitions": {
       "1": {
         "name": "EXPIRES_ON",
@@ -15,12 +13,7 @@
           }
         ],
         "id": 1,
-        "guid": "1441137512654_323_77",
-        "isEnabled": true,
-        "createdBy": "Admin",
-        "updatedBy": "Admin",
-        "createTime": "20150901-19:58:33.000-+0000",
-        "updateTime": "20150901-19:58:33.000-+0000"
+        "guid": "tagdefinition-1-guid"
       }
     },
     "tags": {
@@ -30,12 +23,7 @@
           "expiry_date": "2014/12/31"
         },
         "id": 1,
-        "guid": "1441137512698_844_80",
-        "isEnabled": true,
-        "createdBy": "Admin",
-        "updatedBy": "Admin",
-        "createTime": "20150901-19:58:33.000-+0000",
-        "updateTime": "20150901-20:03:17.000-+0000"
+        "guid": "tag-1-guid"
       }
     },
     "serviceResources": [
@@ -57,14 +45,8 @@
             "isRecursive": false
           }
         },
-        "resourceSignature": "c1114679b35a65d28e0dca4fdffc27d6",
         "id": 1,
-        "guid": "1441137512756_887_83",
-        "isEnabled": true,
-        "createdBy": "Admin",
-        "updatedBy": "Admin",
-        "createTime": "20150901-19:58:33.000-+0000",
-        "updateTime": "20150901-19:58:33.000-+0000"
+        "guid": "serviceresource-1-guid"
       }
     ],
     "resourceToTagIds": {

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/5b9c094f/tagsync/src/main/resources/ranger-tagsync-default.xml
----------------------------------------------------------------------
diff --git a/tagsync/src/main/resources/ranger-tagsync-default.xml 
b/tagsync/src/main/resources/ranger-tagsync-default.xml
index b917d29..3784df8 100644
--- a/tagsync/src/main/resources/ranger-tagsync-default.xml
+++ b/tagsync/src/main/resources/ranger-tagsync-default.xml
@@ -55,4 +55,9 @@
                <value>tagadmin</value>
                <description></description>
        </property>
+       <property>
+               <name>ranger.tagsync.atlasrestsource.download.interval</name>
+               <value>600000</value>
+               <description></description>
+       </property>
 </configuration>

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/5b9c094f/tagsync/src/test/java/org/apache/ranger/tagsync/process/TestTagSynchronizer.java
----------------------------------------------------------------------
diff --git 
a/tagsync/src/test/java/org/apache/ranger/tagsync/process/TestTagSynchronizer.java
 
b/tagsync/src/test/java/org/apache/ranger/tagsync/process/TestTagSynchronizer.java
index 59d521c..43c22d0 100644
--- 
a/tagsync/src/test/java/org/apache/ranger/tagsync/process/TestTagSynchronizer.java
+++ 
b/tagsync/src/test/java/org/apache/ranger/tagsync/process/TestTagSynchronizer.java
@@ -20,12 +20,7 @@
 package org.apache.ranger.tagsync.process;
 
 
-import org.apache.commons.lang.StringUtils;
-import org.apache.ranger.tagsync.model.TagSource;
-import org.apache.ranger.tagsync.process.TagSyncConfig;
-import org.apache.ranger.tagsync.process.TagSynchronizer;
-import org.apache.ranger.tagsync.source.atlas.AtlasNotificationMapper;
-import org.apache.ranger.tagsync.source.atlas.TagAtlasSource;
+import org.apache.ranger.tagsync.source.atlas.AtlasHiveResourceMapper;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -34,8 +29,6 @@ import java.io.*;
 import java.util.List;
 import java.util.Properties;
 
-import static org.junit.Assert.*;
-
 
 public class TestTagSynchronizer {
 
@@ -79,7 +72,7 @@ public class TestTagSynchronizer {
        @Test
        public void testTagDownload() {
 
-               boolean initDone = false;
+               boolean initDone = true;
 
                /* For tagSynchronizer.initialize() to succeed, edit 
ranger-tagsync-site.xml file to contain correct
                values of the following properties:
@@ -97,10 +90,12 @@ public class TestTagSynchronizer {
                */
 
 
-               //initDone = tagSynchronizer.initialize();
+//             initDone = tagSynchronizer.initialize(null);
 
                System.out.println("TagSynchronizer initialization result=" + 
initDone);
 
+               assert(initDone);
+
                System.out.println("Exiting testTagDownload()");
        }
 
@@ -108,14 +103,15 @@ public class TestTagSynchronizer {
        public void testQualifiedNames() {
 
                List<String> components;
+               AtlasHiveResourceMapper hiveResourceBuilder = new 
AtlasHiveResourceMapper();
                try {
-                       components = 
AtlasNotificationMapper.getQualifiedNameComponents("hive_db", 
"database@cluster");
+                       components = 
hiveResourceBuilder.getQualifiedNameComponents("hive_db", "database@cluster");
                        printComponents(components);
 
-                       components = 
AtlasNotificationMapper.getQualifiedNameComponents("hive_table", 
"database.table@cluster");
+                       components = 
hiveResourceBuilder.getQualifiedNameComponents("hive_table", 
"database.table@cluster");
                        printComponents(components);
 
-                       components = 
AtlasNotificationMapper.getQualifiedNameComponents("hive_column", 
"database.table.column@cluster");
+                       components = 
hiveResourceBuilder.getQualifiedNameComponents("hive_column", 
"database.table.column@cluster");
                        printComponents(components);
 
                        assert(true);

Reply via email to