http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/44ddd597/tagsync/src/main/java/org/apache/ranger/source/atlas/TagAtlasSource.java
----------------------------------------------------------------------
diff --git 
a/tagsync/src/main/java/org/apache/ranger/source/atlas/TagAtlasSource.java 
b/tagsync/src/main/java/org/apache/ranger/source/atlas/TagAtlasSource.java
deleted file mode 100644
index d9142c7..0000000
--- a/tagsync/src/main/java/org/apache/ranger/source/atlas/TagAtlasSource.java
+++ /dev/null
@@ -1,588 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.ranger.source.atlas;
-
-import com.google.gson.Gson;
-import com.google.gson.reflect.TypeToken;
-
-import com.google.inject.Guice;
-import com.google.inject.Injector;
-
-import com.sun.jersey.api.client.ClientResponse;
-import com.sun.jersey.api.client.WebResource;
-import org.apache.atlas.typesystem.EntityImpl;
-import org.apache.atlas.typesystem.IdImpl;
-import org.apache.atlas.typesystem.TraitImpl;
-import org.apache.commons.collections.MapUtils;
-import org.apache.commons.lang.StringUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.atlas.notification.NotificationModule;
-import org.apache.atlas.notification.entity.EntityNotification;
-import org.apache.atlas.notification.entity.EntityNotificationConsumer;
-import org.apache.atlas.notification.entity.EntityNotificationConsumerProvider;
-import org.apache.atlas.typesystem.api.Entity;
-import org.apache.atlas.typesystem.api.Trait;
-import org.apache.ranger.admin.client.datatype.RESTResponse;
-import org.apache.ranger.model.TagSink;
-import org.apache.ranger.model.TagSource;
-import org.apache.ranger.plugin.util.RangerRESTClient;
-import org.apache.ranger.plugin.util.RangerRESTUtils;
-import org.apache.ranger.plugin.util.ServiceTags;
-import org.apache.ranger.process.TagSyncConfig;
-
-import java.io.IOException;
-import java.lang.reflect.Type;
-import java.util.*;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
-public class TagAtlasSource implements TagSource {
-       private static final Log LOG = LogFactory.getLog(TagAtlasSource.class);
-
-
-       private final Map<String, Entity> entities = new LinkedHashMap<>();
-       private TagSink tagSink;
-       private Properties properties;
-       private ConsumerRunnable consumerTask;
-
-       @Override
-       public boolean initialize(Properties properties) {
-               if (LOG.isDebugEnabled()) {
-                       LOG.debug("==> TagAtlasSource.initialize()");
-               }
-
-               boolean ret = true;
-
-               if (properties == null || MapUtils.isEmpty(properties)) {
-                       LOG.error("No properties specified for TagFileSource 
initialization");
-                       this.properties = new Properties();
-               } else {
-                       this.properties = properties;
-               }
-
-
-               NotificationModule notificationModule = new 
NotificationModule();
-
-               Injector injector = Guice.createInjector(notificationModule);
-
-               EntityNotificationConsumerProvider consumerProvider = 
injector.getInstance(EntityNotificationConsumerProvider.class);
-
-               consumerTask = new ConsumerRunnable(consumerProvider.get());
-
-               //ExecutorService executorService = 
Executors.newFixedThreadPool(1);
-
-               //executorService.submit(new 
ConsumerRunnable(consumerProvider.get()));
-
-
-               if (LOG.isDebugEnabled()) {
-                       LOG.debug("<== TagAtlasSource.initialize(), result=" + 
ret);
-               }
-               return ret;
-       }
-
-       @Override
-       public void setTagSink(TagSink sink) {
-               if (sink == null) {
-                       LOG.error("Sink is null!!!");
-               } else {
-                       this.tagSink = sink;
-               }
-       }
-
-       @Override
-       public Thread start() {
-               Thread consumerThread = null;
-               if (consumerTask == null) {
-                       LOG.error("No consumerTask!!!");
-               } else {
-                       consumerThread = new Thread(consumerTask);
-                       consumerThread.setDaemon(true);
-                       consumerThread.start();
-               }
-               return consumerThread;
-       }
-
-       @Override
-       public void updateSink() throws Exception {
-       }
-
-       @Override
-       public  boolean isChanged() {
-               return true;
-       }
-
-       // ----- inner class : ConsumerRunnable 
------------------------------------
-
-       private class ConsumerRunnable implements Runnable {
-
-               private final EntityNotificationConsumer consumer;
-
-               private ConsumerRunnable(EntityNotificationConsumer consumer) {
-                       this.consumer = consumer;
-               }
-
-
-               // ----- Runnable 
--------------------------------------------------------
-
-               @Override
-               public void run() {
-                       while (consumer.hasNext()) {
-                               try {
-                                       EntityNotification notification = 
consumer.next();
-                                       Entity entity = 
notification.getEntity();
-                                       printNotification(notification);
-                                       ServiceTags serviceTags = 
AtlasNotificationMapper.processEntityNotification(notification, properties);
-                                       if (serviceTags == null) {
-                                               LOG.error("Failed to map Atlas 
notification to ServiceTags structure");
-                                       } else {
-                                               if (LOG.isDebugEnabled()) {
-                                                       String serviceTagsJSON 
= new Gson().toJson(serviceTags);
-                                                       LOG.debug("Atlas 
notification mapped to serviceTags=" + serviceTagsJSON);
-                                               }
-
-                                               try {
-                                                       
tagSink.uploadServiceTags(serviceTags);
-                                               } catch (Exception exception) {
-                                                       
LOG.error("uploadServiceTags() failed..", exception);
-                                               }
-                                       }
-                               } catch(Exception e){
-                                       LOG.error("Exception encountered when 
processing notification:", e);
-                               }
-                       }
-               }
-
-               public void printNotification(EntityNotification notification) {
-                       Entity entity = notification.getEntity();
-                       if (LOG.isDebugEnabled()) {
-                               LOG.debug("Notification-Type: " + 
notification.getOperationType().name());
-                               LOG.debug("Entity-Id: " + 
entity.getId().getGuid());
-                               LOG.debug("Entity-Type: " + 
entity.getTypeName());
-
-                               LOG.debug("----------- Entity Values 
----------");
-
-
-                               for (Map.Entry<String, Object> entry : 
entity.getValues().entrySet()) {
-                                       LOG.debug("             Name:" + 
entry.getKey());
-                                       Object value = entry.getValue();
-                                       LOG.debug("             Value:" + 
value);
-                               }
-
-                               LOG.debug("----------- Entity Traits 
----------");
-
-
-                               for (Map.Entry<String, ? extends Trait> entry : 
entity.getTraits().entrySet()) {
-                                       LOG.debug("                     
Trait-Name:" + entry.getKey());
-                                       Trait trait = entry.getValue();
-                                       LOG.debug("                     
Trait-Type:" + trait.getTypeName());
-                                       Map<String, Object> traitValues = 
trait.getValues();
-                                       for (Map.Entry<String, Object> 
valueEntry : traitValues.entrySet()) {
-                                               LOG.debug("                     
        Trait-Value-Name:" + valueEntry.getKey());
-                                               LOG.debug("                     
        Trait-Value:" + valueEntry.getValue());
-                                       }
-                               }
-
-                       }
-               }
-
-       }
-
-       public void printAllEntities() {
-               try {
-                       new AtlasUtility().getAllEntities();
-               }
-               catch(java.io.IOException ioException) {
-                       LOG.error("Caught IOException while retrieving Atlas 
Entities:", ioException);
-               }
-       }
-
-       // update the set of entities with current from Atlas
-       public void refreshAllEntities() {
-               if (LOG.isDebugEnabled()) {
-                       LOG.debug("==> TagAtlasSource.refreshAllEntities()");
-               }
-               AtlasUtility atlasUtility = new AtlasUtility();
-
-               try {
-                       entities.putAll(atlasUtility.getAllEntities());
-               } catch (IOException e) {
-                       LOG.error("getAllEntities() failed", e);
-               }
-               if (LOG.isDebugEnabled()) {
-                       LOG.debug("<== TagAtlasSource.refreshAllEntities()");
-               }
-       }
-
-       // Inner class AtlasUtil
-
-       /**
-        * Atlas utility.
-        */
-       @SuppressWarnings("unchecked")
-       private class AtlasUtility {
-
-               /**
-                * Atlas APIs
-                */
-               public static final String API_ATLAS_TYPES    = 
"api/atlas/types";
-               public static final String API_ATLAS_ENTITIES = 
"api/atlas/entities?type=";
-               public static final String API_ATLAS_ENTITY   = 
"api/atlas/entities/";
-               public static final String API_ATLAS_TYPE     = 
"api/atlas/types/";
-
-               /**
-                * API Response Attributes
-                */
-               public static final String RESULTS_ATTRIBUTE               = 
"results";
-               public static final String DEFINITION_ATTRIBUTE            = 
"definition";
-               public static final String VALUES_ATTRIBUTE                = 
"values";
-               public static final String TRAITS_ATTRIBUTE                = 
"traits";
-               public static final String TYPE_NAME_ATTRIBUTE             = 
"typeName";
-               public static final String TRAIT_TYPES_ATTRIBUTE           = 
"traitTypes";
-               public static final String SUPER_TYPES_ATTRIBUTE           = 
"superTypes";
-               public static final String ATTRIBUTE_DEFINITIONS_ATTRIBUTE = 
"attributeDefinitions";
-               public static final String NAME_ATTRIBUTE                  = 
"name";
-
-               private Type mapType = new TypeToken<Map<String, 
Object>>(){}.getType();
-
-               private RangerRESTClient restClient;
-
-
-               // ----- Constructors 
------------------------------------------------------
-
-               /**
-                * Construct an AtlasUtility
-                *
-                */
-               public AtlasUtility() {
-
-                       String url               = 
TagSyncConfig.getAtlasEndpoint(properties);
-                       String sslConfigFileName = 
TagSyncConfig.getAtlasSslConfigFileName(properties);
-
-
-                       if(LOG.isDebugEnabled()) {
-                               LOG.debug("Initializing RangerRestClient with 
(url=" + url + ", sslConfigFileName" + sslConfigFileName + ")");
-                       }
-
-                       restClient = new RangerRESTClient(url, 
sslConfigFileName);
-
-                       if(LOG.isDebugEnabled()) {
-                               LOG.debug("Initialized RangerRestClient with 
(url=" + url + ", sslConfigFileName=" + sslConfigFileName + ")");
-                       }
-               }
-
-
-               // ----- AtlasUtility 
------------------------------------------------------
-
-               /**
-                * Get all of the entities defined in Atlas.
-                *
-                * @return  a mapping of GUIDs to Atlas entities
-                *
-                * @throws IOException if there is an error communicating with 
Atlas
-                */
-               public Map<String, Entity> getAllEntities() throws IOException {
-                       if (LOG.isDebugEnabled()) {
-                               LOG.debug("==> 
TagAtlasSource.getAllEntities()");
-                       }
-                       Map<String, Entity> entities = new LinkedHashMap<>();
-
-                       Map<String, Object> typesResponse = 
atlasAPI(API_ATLAS_TYPES);
-
-                       List<String> types = getAttribute(typesResponse, 
RESULTS_ATTRIBUTE, List.class);
-
-                       for (String type : types) {
-
-                               Map<String, Object> entitiesResponse = 
atlasAPI(API_ATLAS_ENTITIES + type);
-
-                               List<String> guids = 
getAttribute(entitiesResponse, RESULTS_ATTRIBUTE, List.class);
-
-                               for (String guid : guids) {
-
-                                       if (StringUtils.isNotBlank(guid)) {
-
-                                               Map<Trait, Map<String, ? 
extends Trait>> traitSuperTypes = new HashMap<>();
-
-                                               Map<String, Object> 
entityResponse = atlasAPI(API_ATLAS_ENTITY + guid);
-
-                                               if 
(entityResponse.containsKey(DEFINITION_ATTRIBUTE)) {
-                                                       String definitionJSON = 
getAttribute(entityResponse, DEFINITION_ATTRIBUTE, String.class);
-
-                                                       LOG.info("{");
-                                                       LOG.info("      
\"entity-id\":" + guid + ",");
-                                                       LOG.info("      
\"entity-definition\":" + definitionJSON);
-                                                       LOG.info("}");
-
-                                                       Map<String, Object> 
definition = new Gson().fromJson(definitionJSON, mapType);
-
-                                                       Map<String, Object> 
values = getAttribute(definition, VALUES_ATTRIBUTE, Map.class);
-                                                       Map<String, Object> 
traits = getAttribute(definition, TRAITS_ATTRIBUTE, Map.class);
-                                                       String typeName = 
getAttribute(definition, TYPE_NAME_ATTRIBUTE, String.class);
-
-                                                       LOG.info("Received 
entity(typeName=" + typeName + ", id=" + guid + ")");
-
-
-                                                       Map<String, TraitImpl> 
traitMap = new HashMap<>();
-
-                                                       if 
(MapUtils.isNotEmpty(traits)) {
-
-                                                               
LOG.info("Traits for entity(typeName=" + typeName + ", id=" + guid + ") ------ 
");
-
-                                                               for 
(Map.Entry<String, Object> entry : traits.entrySet()) {
-
-                                                                       
Map<String, Object> trait = (Map<String, Object>) entry.getValue();
-
-                                                                       
Map<String, Object> traitValues = getAttribute(trait, VALUES_ATTRIBUTE, 
Map.class);
-                                                                       String 
traitTypeName = getAttribute(trait, TYPE_NAME_ATTRIBUTE, String.class);
-
-                                                                       
Map<String, TraitImpl> superTypes = 
getTraitSuperTypes(getTraitType(traitTypeName), traitValues);
-
-                                                                       
TraitImpl trait1 = new TraitImpl(traitTypeName, traitValues, superTypes);
-
-                                                                       
traitSuperTypes.put(trait1, superTypes);
-
-                                                                       
traitMap.put(entry.getKey(), trait1);
-
-
-                                                                       
LOG.info("                      Trait(typeName=" + traitTypeName + ")");
-
-                                                               }
-                                                       } else {
-                                                               LOG.info("No 
traits for entity(typeName=" + typeName + ", id=" + guid + ")");
-                                                       }
-                                                       EntityImpl entity = new 
EntityImpl(new IdImpl(guid, 0), typeName, values, traitMap);
-
-                                                       showEntity(entity);
-
-                                                       entities.put(guid, 
entity);
-
-                                               }
-                                       }
-                               }
-                       }
-                       if (LOG.isDebugEnabled()) {
-                               LOG.debug("==> 
TagAtlasSource.getAllEntities()");
-                       }
-                       return entities;
-               }
-
-
-               // ----- helper methods 
----------------------------------------------------
-
-               private Map<String, Object> getTraitType(String traitName)
-                               throws IOException {
-
-                       Map<String, Object> typeResponse = 
atlasAPI(API_ATLAS_TYPE + traitName);
-
-                       if (typeResponse.containsKey(DEFINITION_ATTRIBUTE)) {
-                               String definitionJSON = 
getAttribute(typeResponse, DEFINITION_ATTRIBUTE, String.class);
-
-                               Map<String, Object> definition = new 
Gson().fromJson(definitionJSON, mapType);
-
-                               List traitTypes = getAttribute(definition, 
TRAIT_TYPES_ATTRIBUTE, List.class);
-
-                               if (traitTypes.size() > 0) {
-                                       return (Map<String, Object>) 
traitTypes.get(0);
-                               }
-                       }
-                       return null;
-               }
-
-               private Map<String, TraitImpl> getTraitSuperTypes(Map<String, 
Object> traitType, Map<String, Object> values)
-                               throws IOException {
-
-                       Map<String, TraitImpl> superTypes = new HashMap<>();
-
-                       if (traitType != null) {
-
-                               List<String> superTypeNames = 
getAttribute(traitType, SUPER_TYPES_ATTRIBUTE, List.class);
-
-                               for (String superTypeName : superTypeNames) {
-
-                                       Map<String, Object> superTraitType = 
getTraitType(superTypeName);
-
-                                       if (superTraitType != null) {
-                                               List<Map<String, Object>> 
attributeDefinitions = (List) 
superTraitType.get(ATTRIBUTE_DEFINITIONS_ATTRIBUTE);
-
-                                               Map<String, Object> 
superTypeValues = new HashMap<>();
-                                               for (Map<String, Object> 
attributeDefinition : attributeDefinitions) {
-
-                                                       String attributeName = 
attributeDefinition.get(NAME_ATTRIBUTE).toString();
-                                                       if 
(values.containsKey(attributeName)) {
-                                                               
superTypeValues.put(attributeName, values.get(attributeName));
-                                                       }
-                                               }
-
-                                               superTypes.put(superTypeName,
-                                                               //new 
TraitImpl(getTraitSuperTypes(superTraitType, superTypeValues), superTypeValues, 
superTypeName));
-                                                               new 
TraitImpl(superTypeName, superTypeValues, getTraitSuperTypes(superTraitType, 
superTypeValues)));
-                                       }
-                               }
-                       }
-                       return superTypes;
-               }
-
-               /*
-               private Map<String, Object> atlasAPI(String endpoint) throws 
IOException {
-                       InputStream in  = streamProvider.readFrom(atlasEndpoint 
+ endpoint, "GET", (String) null, Collections.<String, String>emptyMap());
-                       return new Gson().fromJson(IOUtils.toString(in, 
"UTF-8"), mapType);
-               }
-               */
-
-               private Map<String, Object> atlasAPI(String endpoint)  {
-                       if (LOG.isDebugEnabled()) {
-                               LOG.debug("==> TagAtlasSource.atlasAPI(" + 
endpoint +")");
-                       }
-                       // Create a REST client and perform a get on it
-                       Map<String, Object> ret = new HashMap<String, Object>();
-
-                       WebResource webResource = 
restClient.getResource(endpoint);
-
-                       ClientResponse response = 
webResource.accept(RangerRESTUtils.REST_MIME_TYPE_JSON).get(ClientResponse.class);
-
-                       if(response != null && response.getStatus() == 200) {
-                               ret = response.getEntity(ret.getClass());
-                       } else {
-                               LOG.error("Atlas REST call returned with 
response={" + response +"}");
-
-                               RESTResponse resp = 
RESTResponse.fromClientResponse(response);
-                               LOG.error("Error getting Atlas Entity. 
request=" + webResource.toString()
-                                               + ", response=" + 
resp.toString());
-                       }
-                       if (LOG.isDebugEnabled()) {
-                               LOG.debug("<== TagAtlasSource.atlasAPI(" + 
endpoint + ")");
-                       }
-                       return ret;
-               }
-
-               private <T> T getAttribute(Map<String, Object> map, String 
name, Class<T> type) {
-                       return type.cast(map.get(name));
-               }
-
-
-
-               public void showEntity(Entity entity) {
-
-                       LOG.debug("Entity-id    :" + entity.getId());
-
-                       LOG.debug("Type:                " + 
entity.getTypeName());
-
-                       LOG.debug("----- Values -----");
-
-                       for (Map.Entry<String, Object> entry : 
entity.getValues().entrySet()) {
-                               LOG.debug("             Name:   " + 
entry.getKey() + "");
-                               Object value = entry.getValue();
-                               LOG.debug("             Value:  " + 
getValue(value, entities.keySet()));
-                       }
-
-                       LOG.debug("----- Traits -----");
-
-                       for (String traitName : entity.getTraits().keySet()) {
-                               LOG.debug("             Name:" + entity.getId() 
+ ", trait=" + traitName + ">" + traitName);
-                       }
-
-               }
-
-               public void showTrait(Entity entity, String traitId) {
-
-                       String[] traitNames = traitId.split(",");
-
-                       Trait trait = entity.getTraits().get(traitNames[0]);
-
-                       for (int i = 1; i < traitNames.length; ++i ) {
-                               trait = 
trait.getSuperTypes().get(traitNames[i]);
-                       }
-
-                       String typeName = trait.getTypeName();
-
-                       LOG.debug("Trait " + typeName + " for Entity id=" + 
entity.getId());
-
-                       LOG.debug("Type: " + typeName);
-
-                       LOG.debug("----- Values ------");
-
-                       for (Map.Entry<String, Object> entry : 
trait.getValues().entrySet()) {
-                               LOG.debug("Name:" + entry.getKey());
-                               Object value = entry.getValue();
-                               LOG.debug("Value:" + getValue(value, 
entities.keySet()));
-                       }
-
-                       LOG.debug("Super Traits");
-
-
-                       for (String traitName : trait.getSuperTypes().keySet()) 
{
-                               LOG.debug("Name=" + entity.getId() + "&trait=" 
+ traitId + "," + traitName + ">" + traitName);
-                       }
-               }
-
-               // resolve the given value if necessary
-               private String getValue(Object value, Set<String> ids) {
-                       if (value == null) {
-                               return "";
-                       }
-                       String idString = getIdValue(value, ids);
-                       if (idString != null) {
-                               return idString;
-                       }
-
-                       idString = getIdListValue(value, ids);
-                       if (idString != null) {
-                               return idString;
-                       }
-
-                       return value.toString();
-               }
-               // get an id from the given value; return null if the value is 
not an id type
-               private String getIdValue(Object value, Set<String> ids) {
-                       if (value instanceof Map) {
-                               Map map = (Map) value;
-                               if (map.size() == 3 && map.containsKey("id")){
-                                       String id = map.get("id").toString();
-                                       if (ids.contains(id)) {
-                                               return id;
-                                       }
-                               }
-                       }
-                       return null;
-               }
-               // get an id list from the given value; return null if the 
value is not an id list type
-               private String getIdListValue(Object value, Set<String> ids) {
-                       if (value instanceof List) {
-                               List list = (List) value;
-                               if (list.size() > 0) {
-                                       StringBuilder sb = new StringBuilder();
-                                       for (Object o : list) {
-                                               String idString = getIdValue(o, 
ids);
-                                               if (idString == null) {
-                                                       return value.toString();
-                                               }
-                                               if (sb.length() > 0) {
-                                                       sb.append(", ");
-                                               }
-                                               sb.append(idString);
-                                       }
-                                       return sb.toString();
-                               }
-                       }
-                       return null;
-               }
-
-       }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/44ddd597/tagsync/src/main/java/org/apache/ranger/source/file/TagFileSource.java
----------------------------------------------------------------------
diff --git 
a/tagsync/src/main/java/org/apache/ranger/source/file/TagFileSource.java 
b/tagsync/src/main/java/org/apache/ranger/source/file/TagFileSource.java
deleted file mode 100644
index 25083de..0000000
--- a/tagsync/src/main/java/org/apache/ranger/source/file/TagFileSource.java
+++ /dev/null
@@ -1,264 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.ranger.source.file;
-
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
-import org.apache.commons.collections.MapUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.ranger.model.TagSink;
-import org.apache.ranger.model.TagSource;
-import org.apache.ranger.plugin.util.ServiceTags;
-import org.apache.ranger.process.TagSyncConfig;
-
-import java.io.*;
-import java.util.Date;
-import java.util.Properties;
-
-public class TagFileSource implements TagSource, Runnable {
-       private static final Log LOG = LogFactory.getLog(TagFileSource.class);
-
-       private String sourceFileName;
-       private long lastModifiedTimeInMillis = 0L;
-
-       private Gson gson;
-       private TagSink tagSink;
-       private Properties properties;
-
-       @Override
-       public boolean initialize(Properties properties) {
-
-               if (LOG.isDebugEnabled()) {
-                       LOG.debug("==> TagFileSource.initialize()");
-               }
-
-               if (properties == null || MapUtils.isEmpty(properties)) {
-                       LOG.error("No properties specified for TagFileSource 
initialization");
-                       this.properties = new Properties();
-               } else {
-                       this.properties = properties;
-               }
-
-               boolean ret = true;
-
-               if (ret) {
-
-                       sourceFileName = 
TagSyncConfig.getTagSourceFileName(properties);
-
-                       if (LOG.isDebugEnabled()) {
-                               LOG.debug("Provided sourceFileName=" + 
sourceFileName);
-                       }
-
-                       String realFileName = 
TagSyncConfig.getResourceFileName(sourceFileName);
-                       if (realFileName != null) {
-                               if (LOG.isDebugEnabled()) {
-                                       LOG.debug("Real sourceFileName=" + 
realFileName);
-                               }
-                               sourceFileName = realFileName;
-                       } else {
-                               LOG.error(sourceFileName + " is not a file or 
is not readable");
-                               ret = false;
-                       }
-               }
-
-               if (ret) {
-                       try {
-                               gson = new 
GsonBuilder().setDateFormat("yyyyMMdd-HH:mm:ss.SSS-Z").setPrettyPrinting().create();
-                       } catch (Throwable excp) {
-                               LOG.fatal("failed to create GsonBuilder 
object", excp);
-                               ret = false;
-                       }
-               }
-               if (LOG.isDebugEnabled()) {
-                       LOG.debug("<== TagFileSource.initialize(): 
sourceFileName=" + sourceFileName + ", result=" + ret);
-               }
-
-               return ret;
-       }
-
-       @Override
-       public void setTagSink(TagSink sink) {
-               if (sink == null) {
-                       LOG.error("Sink is null!!!");
-               } else {
-                       this.tagSink = sink;
-               }
-       }
-
-       @Override
-       public Thread start() {
-
-               Thread fileMonitoringThread = null;
-
-               fileMonitoringThread = new Thread(this);
-               fileMonitoringThread.setDaemon(true);
-               fileMonitoringThread.start();
-
-               return fileMonitoringThread;
-       }
-
-       @Override
-       public void run() {
-               if (LOG.isDebugEnabled()) {
-                       LOG.debug("==> TagFileSource.run()");
-               }
-               long sleepTimeBetweenCycleInMillis = 
TagSyncConfig.getSleepTimeInMillisBetweenCycle(properties);
-               boolean shutdownFlag = false;
-
-               while (!shutdownFlag) {
-
-                       try {
-                               if (isChanged()) {
-                                       LOG.info("Begin: update tags from 
source==>sink");
-                                       if 
(TagSyncConfig.isTagSyncEnabled(properties)) {
-                                               updateSink();
-                                               LOG.info("End: update tags from 
source==>sink");
-                                       } else {
-                                               LOG.info("Tag-sync is not 
enabled.");
-                                       }
-                               } else {
-                                       LOG.debug("TagFileSource: no change 
found for synchronization.");
-                               }
-
-                               LOG.debug("Sleeping for [" + 
sleepTimeBetweenCycleInMillis + "] milliSeconds");
-
-                               Thread.sleep(sleepTimeBetweenCycleInMillis);
-                       }
-                       catch (InterruptedException e) {
-                               LOG.error("Failed to wait for [" + 
sleepTimeBetweenCycleInMillis + "] milliseconds before attempting to 
synchronize tag information", e);
-                               shutdownFlag = true;
-                       }
-                       catch (Throwable t) {
-                               LOG.error("tag-sync thread got an error", t);
-                       }
-               }
-
-               LOG.info("Shutting down the Tag-file-source thread");
-
-               if (LOG.isDebugEnabled()) {
-                       LOG.debug("<== TagFileSource.run()");
-               }
-       }
-
-       @Override
-       public void updateSink() throws Exception {
-
-               if (LOG.isDebugEnabled()) {
-                       LOG.debug("==> TagFileSource.updateSink()");
-               }
-               ServiceTags serviceTags = readFromFile();
-
-               if (serviceTags != null) {
-                       tagSink.uploadServiceTags(serviceTags);
-               } else {
-                       LOG.error("Could not read ServiceTags from file");
-               }
-               if (LOG.isDebugEnabled()) {
-                       LOG.debug("<== TagFileSource.updateSink()");
-               }
-       }
-
-       @Override
-       public  boolean isChanged() {
-
-               if (LOG.isDebugEnabled()) {
-                       LOG.debug("==> TagFileSource.isChanged()");
-               }
-               boolean ret = false;
-
-               long modificationTime = getModificationTime();
-
-               if (modificationTime > lastModifiedTimeInMillis) {
-                       if (LOG.isDebugEnabled()) {
-                               Date modifiedDate = new Date(modificationTime);
-                               Date lastModifiedDate = new 
Date(lastModifiedTimeInMillis);
-                               LOG.debug("File modified at " + modifiedDate + 
"last-modified at " + lastModifiedDate);
-                       }
-                       lastModifiedTimeInMillis = modificationTime;
-                       ret = true;
-               }
-
-               if (LOG.isDebugEnabled()) {
-                       LOG.debug("<== TagFileSource.isChanged(): result=" + 
ret);
-               }
-               return ret;
-       }
-
-       private ServiceTags readFromFile() {
-
-               if (LOG.isDebugEnabled()) {
-                       LOG.debug("==> TagFileSource.readFromFile(): 
sourceFileName=" + sourceFileName);
-               }
-
-               ServiceTags ret = null;
-
-               Reader reader = null;
-               try {
-
-                       reader = new 
InputStreamReader(TagSyncConfig.getFileInputStream(sourceFileName));
-
-                       ret = gson.fromJson(reader, ServiceTags.class);
-
-               }
-               catch (FileNotFoundException exception) {
-                       LOG.warn("Tag-source file does not exist or not readble 
'" + sourceFileName + "'");
-               }
-               catch (Exception excp) {
-                       LOG.error("failed to load service-tags from Tag-source 
file " + sourceFileName, excp);
-               }
-               finally {
-                       if (reader != null) {
-                               try {
-                                       reader.close();
-                               } catch (Exception excp) {
-                                       LOG.error("error while closing opened 
Tag-source file " + sourceFileName, excp);
-                               }
-                       }
-               }
-
-               if (LOG.isDebugEnabled()) {
-                       LOG.debug("<== TagFileSource.readFromFile(): 
sourceFileName=" + sourceFileName);
-               }
-
-               return ret;
-       }
-
-       private long getModificationTime() {
-               if (LOG.isDebugEnabled()) {
-                       LOG.debug("==> TagFileSource.getLastModificationTime(): 
sourceFileName=" + sourceFileName);
-               }
-               long ret = 0L;
-
-               File sourceFile = new File(sourceFileName);
-
-               if (sourceFile.exists() && sourceFile.isFile() && 
sourceFile.canRead()) {
-                       ret = sourceFile.lastModified();
-               } else {
-                       ret = new Date().getTime();
-               }
-               if (LOG.isDebugEnabled()) {
-                       LOG.debug("<== TagFileSource.lastModificationTime(): 
sourceFileName=" + sourceFileName + " result=" + new Date(ret));
-               }
-
-               return ret;
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/44ddd597/tagsync/src/main/java/org/apache/ranger/tagsync/model/TagSink.java
----------------------------------------------------------------------
diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/model/TagSink.java 
b/tagsync/src/main/java/org/apache/ranger/tagsync/model/TagSink.java
new file mode 100644
index 0000000..6565159
--- /dev/null
+++ b/tagsync/src/main/java/org/apache/ranger/tagsync/model/TagSink.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ranger.tagsync.model;
+
+import org.apache.ranger.plugin.store.TagStore;
+import org.apache.ranger.plugin.util.ServiceTags;
+
+import java.util.Properties;
+
+
+/**
+ * Receiving end of tag synchronization: a {@link TagStore} that can additionally
+ * be configured from properties and be handed a {@link ServiceTags} payload.
+ */
+public interface TagSink extends TagStore {
+       /**
+        * Configures the sink from the given properties.
+        *
+        * @param properties configuration values for the sink
+        * @return presumably {@code true} when the sink is usable afterwards --
+        *         TODO confirm against the implementations
+        */
+       boolean initialize(Properties properties);
+       /**
+        * Passes a set of service tags to the sink.
+        *
+        * @param serviceTags the tags to upload
+        * @throws Exception declared broadly; the concrete failure types depend on
+        *         the implementation
+        */
+       void uploadServiceTags(ServiceTags serviceTags) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/44ddd597/tagsync/src/main/java/org/apache/ranger/tagsync/model/TagSource.java
----------------------------------------------------------------------
diff --git 
a/tagsync/src/main/java/org/apache/ranger/tagsync/model/TagSource.java 
b/tagsync/src/main/java/org/apache/ranger/tagsync/model/TagSource.java
new file mode 100644
index 0000000..2df8036
--- /dev/null
+++ b/tagsync/src/main/java/org/apache/ranger/tagsync/model/TagSource.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ranger.tagsync.model;
+
+import java.util.Properties;
+
+/**
+ * Producing end of tag synchronization: an object that is configured from
+ * properties, is given a {@link TagSink}, and can push its tags to that sink.
+ */
+public interface TagSource {
+
+       /**
+        * Configures the source from the given properties.
+        *
+        * @param properties configuration values for the source
+        * @return presumably {@code true} when the source is usable afterwards --
+        *         TODO confirm against the implementations
+        */
+       boolean initialize(Properties properties);
+
+       /**
+        * Supplies the sink that this source should deliver tags to.
+        *
+        * @param sink the destination for tags
+        */
+       void setTagSink(TagSink sink);
+
+       /**
+        * Pushes the source's tags to the sink.
+        *
+        * @throws Exception declared broadly; the concrete failure types depend on
+        *         the implementation
+        */
+       void updateSink() throws Exception;
+
+       /**
+        * Starts the source's own processing.
+        *
+        * @return the thread doing the work; TagSynchronizer.run() treats a
+        *         {@code null} return as "could not start"
+        */
+       Thread start();
+
+       /**
+        * Reports whether there is something new to synchronize.
+        *
+        * @return presumably {@code true} when the underlying data changed since
+        *         the previous check -- TODO confirm against the implementations
+        */
+       boolean isChanged();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/44ddd597/tagsync/src/main/java/org/apache/ranger/tagsync/process/TagSyncConfig.java
----------------------------------------------------------------------
diff --git 
a/tagsync/src/main/java/org/apache/ranger/tagsync/process/TagSyncConfig.java 
b/tagsync/src/main/java/org/apache/ranger/tagsync/process/TagSyncConfig.java
new file mode 100644
index 0000000..7fe6bdb
--- /dev/null
+++ b/tagsync/src/main/java/org/apache/ranger/tagsync/process/TagSyncConfig.java
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ranger.tagsync.process;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.log4j.Logger;
+
+import java.io.*;
+import java.net.URL;
+import java.util.Properties;
+
+public class TagSyncConfig extends Configuration {
+       private static final Logger LOG = Logger.getLogger(TagSyncConfig.class) 
;
+
+       public static final String CONFIG_FILE = "ranger-tagsync-site.xml";
+
+       public static final String DEFAULT_CONFIG_FILE = 
"ranger-tagsync-default.xml";
+
+       public static final String TAGSYNC_ENABLED_PROP = 
"ranger.tagsync.enabled" ;
+
+       public static final String TAGSYNC_LOGDIR_PROP = 
"ranger.tagsync.logdir" ;
+
+       private static final String TAGSYNC_TAGADMIN_REST_URL_PROP = 
"ranger.tagsync.tagadmin.rest.url";
+
+       private static final String TAGSYNC_TAGADMIN_REST_SSL_CONFIG_FILE_PROP 
= "ranger.tagsync.tagadmin.rest.ssl.config.file";
+
+       private static final String 
TAGSYNC_TAGADMIN_SSL_BASICAUTH_USERNAME_PROP = 
"ranger.tagsync.tagadmin.basicauth.username";
+
+       private static final String 
TAGSYNC_TAGADMIN_SSL_BASICAUTH_PASSWORD_PROP = 
"ranger.tagsync.tagadmin.basicauth.password";
+
+       private static final String TAGSYNC_FILESOURCE_FILENAME_PROP = 
"ranger.tagsync.filesource.filename";
+
+       private static final String 
TAGSYNC_SLEEP_TIME_IN_MILLIS_BETWEEN_CYCLE_PROP = 
"ranger.tagsync.sleeptimeinmillisbetweensynccycle";
+
+       private static final String TAGSYNC_SOURCE_CLASS_PROP = 
"ranger.tagsync.source.impl.class";
+
+       private static final String TAGSYNC_SINK_CLASS_PROP = 
"ranger.tagsync.sink.impl.class";
+
+       private static final String TAGSYNC_ATLASSOURCE_ENDPOINT_PROP = 
"ranger.tagsync.atlassource.endpoint";
+
+       private static final String TAGSYNC_SERVICENAME_MAPPER_PROP_PREFIX = 
"ranger.tagsync.atlas.";
+
+       private static final String TAGSYNC_SERVICENAME_MAPPER_PROP_SUFFIX = 
".ranger.service";
+
+       private static final String 
TAGSYNC_DEFAULT_CLUSTERNAME_AND_COMPONENTNAME_SEPARATOR = "_";
+
+       private static volatile TagSyncConfig instance = null;
+
+       /**
+        * Creates a configuration object, which loads the default and the site
+        * configuration files during construction.
+        *
+        * Despite the name, nothing is cached here: the singleton logic that used
+        * to live in this method is disabled, so every call builds a new object and
+        * reads the configuration files again.
+        *
+        * @return a newly constructed configuration
+        */
+       public static TagSyncConfig getInstance() {
+               return new TagSyncConfig();
+       }
+
+       /**
+        * Exposes the configuration as a {@link Properties} object.
+        *
+        * @return whatever {@code getProps()} returns; that method is not defined in
+        *         this class (presumably inherited from Configuration -- confirm
+        *         whether the result is a live view or a copy)
+        */
+       public Properties getProperties() {
+               return getProps();
+       }
+
+       /**
+        * Opens the named file, falling back to a classpath resource of that name
+        * when no readable regular file exists at the path.
+        *
+        * Lookup order: the file system, this class's resource lookup (the name as
+        * given, then with a leading "/"), and finally the system class loader.
+        *
+        * @param path file-system path or resource name
+        * @return an open stream that the caller must close, or {@code null} when
+        *         nothing matched
+        * @throws FileNotFoundException if the file cannot be opened even though it
+        *         passed the exists/readable check
+        */
+       public static InputStream getFileInputStream(String path) throws FileNotFoundException {
+
+               File f = new File(path);
+
+               if (f.exists() && f.isFile() && f.canRead()) {
+                       return new FileInputStream(f);
+               }
+
+               InputStream ret = TagSyncConfig.class.getResourceAsStream(path);
+
+               if (ret == null && !path.startsWith("/")) {
+                       ret = TagSyncConfig.class.getResourceAsStream("/" + path);
+               }
+
+               if (ret == null) {
+                       // Unlike Class lookups, ClassLoader lookups expect names without
+                       // a leading "/"; prefixing one (as before) can never match, so
+                       // strip it instead.
+                       String resourceName = path.startsWith("/") ? path.substring(1) : path;
+
+                       ret = ClassLoader.getSystemClassLoader().getResourceAsStream(resourceName);
+               }
+
+               return ret;
+       }
+
+       /**
+        * Resolves a path to something that can be opened: the path itself when it
+        * names a readable regular file, otherwise the file component of the URL of
+        * a classpath resource with that name.
+        *
+        * Lookup order matches getFileInputStream: the file system, this class's
+        * resource lookup (the name as given, then with a leading "/"), and finally
+        * the system class loader.
+        *
+        * NOTE(review): the value comes from {@link URL#getFile()}, so it may be
+        * URL-encoded or point inside a jar; callers that pass it to
+        * {@link File} should be checked.
+        *
+        * @param path file-system path or resource name; may be blank
+        * @return the resolved name, or {@code null} when the path is blank or
+        *         nothing matched
+        */
+       public static String getResourceFileName(String path) {
+
+               if (StringUtils.isBlank(path)) {
+                       return null;
+               }
+
+               File f = new File(path);
+
+               if (f.exists() && f.isFile() && f.canRead()) {
+                       return path;
+               }
+
+               URL fileURL = TagSyncConfig.class.getResource(path);
+
+               if (fileURL == null && !path.startsWith("/")) {
+                       fileURL = TagSyncConfig.class.getResource("/" + path);
+               }
+
+               if (fileURL == null) {
+                       // Unlike Class lookups, ClassLoader lookups expect names without
+                       // a leading "/"; prefixing one (as before) can never match, so
+                       // strip it instead.
+                       String resourceName = path.startsWith("/") ? path.substring(1) : path;
+
+                       fileURL = ClassLoader.getSystemClassLoader().getResource(resourceName);
+               }
+
+               if (fileURL == null) {
+                       LOG.warn("URL not found for " + path + " or no privilege for reading file " + path);
+                       return null;
+               }
+
+               return fileURL.getFile();
+       }
+
+       /**
+        * Describes this configuration: the names of the two configuration files
+        * on one line, a blank line, then the superclass description.
+        */
+       @Override
+       public String toString() {
+               String header = "DEFAULT_CONFIG_FILE=" + DEFAULT_CONFIG_FILE + ", "
+                               + "CONFIG_FILE=" + CONFIG_FILE + "\n\n";
+
+               return header + super.toString();
+       }
+
+       /**
+        * Tells whether tag synchronization is switched on.
+        *
+        * Synchronization is on unless the property is explicitly set to "false"
+        * (case-insensitive, surrounding whitespace ignored); a missing property or
+        * any other value counts as enabled.
+        *
+        * @param prop configuration to read; must not be {@code null}
+        * @return {@code false} only when the property value is "false"
+        */
+       static public boolean isTagSyncEnabled(Properties prop) {
+               String val = prop.getProperty(TAGSYNC_ENABLED_PROP);
+               // The literal used to be misspelled "falae", which made it impossible
+               // to disable tag-sync through configuration.
+               return !(val != null && val.trim().equalsIgnoreCase("false"));
+       }
+
+       /**
+        * Reads the configured log directory.
+        *
+        * @param prop configuration to read; must not be {@code null}
+        * @return the property value, or {@code null} when it is not set
+        */
+       static public String getTagSyncLogdir(Properties prop) {
+               return prop.getProperty(TAGSYNC_LOGDIR_PROP);
+       }
+
+       /**
+        * Reads the pause between synchronization cycles.
+        *
+        * @param prop configuration to read; must not be {@code null}
+        * @return the configured number of milliseconds
+        * @throws NumberFormatException if the property is missing, blank, or not
+        *         a valid long
+        */
+       static public long getSleepTimeInMillisBetweenCycle(Properties prop) {
+               String val = prop.getProperty(TAGSYNC_SLEEP_TIME_IN_MILLIS_BETWEEN_CYCLE_PROP);
+
+               if (StringUtils.isBlank(val)) {
+                       // Same exception type as before, but naming the property makes
+                       // the misconfiguration diagnosable.
+                       throw new NumberFormatException("Property " + TAGSYNC_SLEEP_TIME_IN_MILLIS_BETWEEN_CYCLE_PROP + " is not set");
+               }
+
+               // Values read from configuration files may carry stray whitespace.
+               return Long.parseLong(val.trim());
+       }
+
+       /**
+        * Resolves the configured tag source to a class name.
+        *
+        * The shorthands "atlas" and "file" (case-insensitive) map to the built-in
+        * implementations; any other value is returned unchanged.
+        *
+        * @param prop configuration to read; must not be {@code null}
+        * @return a class name, or {@code null} when the property is not set
+        */
+       static public String getTagSourceClassName(Properties prop) {
+               String configured = prop.getProperty(TAGSYNC_SOURCE_CLASS_PROP);
+
+               if (StringUtils.equalsIgnoreCase(configured, "atlas")) {
+                       return "org.apache.ranger.tagsync.source.atlas.TagAtlasSource";
+               }
+               if (StringUtils.equalsIgnoreCase(configured, "file")) {
+                       return "org.apache.ranger.tagsync.source.file.TagFileSource";
+               }
+               return configured;
+       }
+
+       /**
+        * Resolves the configured tag sink to a class name.
+        *
+        * The shorthand "tagadmin" (case-insensitive) maps to the built-in REST
+        * sink; any other value is returned unchanged.
+        *
+        * @param prop configuration to read; must not be {@code null}
+        * @return a class name, or {@code null} when the property is not set
+        */
+       static public String getTagSinkClassName(Properties prop) {
+               String configured = prop.getProperty(TAGSYNC_SINK_CLASS_PROP);
+
+               return StringUtils.equalsIgnoreCase(configured, "tagadmin")
+                               ? "org.apache.ranger.tagsync.sink.tagadmin.TagRESTSink"
+                               : configured;
+       }
+
+       /**
+        * Reads the REST URL configured for the tag admin.
+        *
+        * @param prop configuration to read; must not be {@code null}
+        * @return the property value, or {@code null} when it is not set
+        */
+       static public String getTagAdminRESTUrl(Properties prop) {
+               return prop.getProperty(TAGSYNC_TAGADMIN_REST_URL_PROP);
+       }
+
+       /**
+        * Reads the SSL configuration file name configured for the tag admin REST
+        * connection.
+        *
+        * @param prop configuration to read; must not be {@code null}
+        * @return the property value, or {@code null} when it is not set
+        */
+       static public String getTagAdminRESTSslConfigFile(Properties prop) {
+               return prop.getProperty(TAGSYNC_TAGADMIN_REST_SSL_CONFIG_FILE_PROP);
+       }
+
+       /**
+        * Reads the basic-auth user name configured for the tag admin.
+        *
+        * @param prop configuration to read; must not be {@code null}
+        * @return the property value, or {@code null} when it is not set
+        */
+       static public String getTagAdminUserName(Properties prop) {
+               return prop.getProperty(TAGSYNC_TAGADMIN_SSL_BASICAUTH_USERNAME_PROP);
+       }
+
+       /**
+        * Reads the basic-auth password configured for the tag admin.
+        *
+        * The value is returned exactly as stored in the properties; callers should
+        * avoid logging it.
+        *
+        * @param prop configuration to read; must not be {@code null}
+        * @return the property value, or {@code null} when it is not set
+        */
+       static public String getTagAdminPassword(Properties prop) {
+               return prop.getProperty(TAGSYNC_TAGADMIN_SSL_BASICAUTH_PASSWORD_PROP);
+       }
+
+       /**
+        * Reads the file name configured for the file-based tag source.
+        *
+        * @param prop configuration to read; must not be {@code null}
+        * @return the property value, or {@code null} when it is not set
+        */
+       static public String getTagSourceFileName(Properties prop) {
+               return prop.getProperty(TAGSYNC_FILESOURCE_FILENAME_PROP);
+       }
+
+       /**
+        * Reads the endpoint configured for the Atlas tag source.
+        *
+        * @param prop configuration to read; must not be {@code null}
+        * @return the property value, or {@code null} when it is not set
+        */
+       static public String getAtlasEndpoint(Properties prop) {
+               return prop.getProperty(TAGSYNC_ATLASSOURCE_ENDPOINT_PROP);
+       }
+
+       /**
+        * Returns the SSL configuration file name for the Atlas source.
+        *
+        * Currently a placeholder: the argument is not consulted and the result is
+        * always the empty string.
+        *
+        * @param prop unused
+        * @return the empty string
+        */
+       static public String getAtlasSslConfigFileName(Properties prop) {
+               return "";
+       }
+
+       /**
+        * Maps a component and instance name to a Ranger service name.
+        *
+        * An explicit mapping is looked up under
+        * {@code ranger.tagsync.atlas.<component>.instance.<instance>.ranger.service};
+        * when that property is missing or blank the name defaults to
+        * {@code <instance>_<component>}.
+        *
+        * @param componentName component part of the lookup key
+        * @param instanceName instance part of the lookup key
+        * @param prop configuration to read; must not be {@code null}
+        * @return the mapped or default service name
+        */
+       static public String getServiceName(String componentName, String instanceName, Properties prop) {
+               String mappingKey = TAGSYNC_SERVICENAME_MAPPER_PROP_PREFIX + componentName
+                               + ".instance." + instanceName
+                               + TAGSYNC_SERVICENAME_MAPPER_PROP_SUFFIX;
+               String mapped = prop.getProperty(mappingKey);
+
+               if (!StringUtils.isBlank(mapped)) {
+                       return mapped;
+               }
+               return instanceName + TAGSYNC_DEFAULT_CLUSTERNAME_AND_COMPONENTNAME_SEPARATOR + componentName;
+       }
+
+       /**
+        * Builds the configuration and immediately loads the configuration files.
+        *
+        * Private: instances are obtained through getInstance().
+        */
+       private TagSyncConfig() {
+               // presumably disables loading of the superclass's default resources --
+               // TODO confirm against the Configuration(boolean) constructor
+               super(false);
+               init() ;
+       }
+
+       /**
+        * Loads the default configuration file first and the site file second.
+        */
+       private void init() {
+               readConfigFile(DEFAULT_CONFIG_FILE);
+               // NOTE(review): presumably values from the later resource override
+               // the defaults -- confirm against Configuration.addResource
+               readConfigFile(CONFIG_FILE);
+       }
+
+       /**
+        * Registers one configuration file as a resource of this configuration.
+        *
+        * The file is only registered when getResourceFileName can locate it; the
+        * resolved location is used for logging, while the original name is what
+        * gets passed to addResource.
+        *
+        * @param fileName name of the configuration file; a blank name is logged
+        *        as an error and ignored
+        */
+       private void readConfigFile(String fileName) {
+
+               if (StringUtils.isBlank(fileName)) {
+                       LOG.error("Configuration fileName is null");
+                       return;
+               }
+
+               String resolvedName = getResourceFileName(fileName);
+
+               if (StringUtils.isNotBlank(resolvedName)) {
+                       LOG.info("Loading configuration from " + resolvedName);
+                       addResource(fileName);
+               } else {
+                       LOG.warn("Cannot find configuration file " + fileName + " in the classpath");
+               }
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/44ddd597/tagsync/src/main/java/org/apache/ranger/tagsync/process/TagSynchronizer.java
----------------------------------------------------------------------
diff --git 
a/tagsync/src/main/java/org/apache/ranger/tagsync/process/TagSynchronizer.java 
b/tagsync/src/main/java/org/apache/ranger/tagsync/process/TagSynchronizer.java
new file mode 100644
index 0000000..0235581
--- /dev/null
+++ 
b/tagsync/src/main/java/org/apache/ranger/tagsync/process/TagSynchronizer.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ranger.tagsync.process;
+
+import org.apache.commons.collections.MapUtils;
+import org.apache.log4j.Logger;
+import org.apache.ranger.tagsync.model.TagSink;
+import org.apache.ranger.tagsync.model.TagSource;
+
+import java.util.Map;
+import java.util.Properties;
+
+public class TagSynchronizer implements Runnable {
+
+       private static final Logger LOG = 
Logger.getLogger(TagSynchronizer.class);
+
+       private final static int MAX_INIT_RETRIES = 5;
+
+       private boolean shutdownFlag = false;
+       private TagSink tagSink = null;
+       private TagSource tagSource = null;
+       private Properties properties = null;
+
+
+       /**
+        * Command-line entry point: loads the tag-sync configuration and runs a
+        * synchronizer on the calling thread.
+        *
+        * @param args ignored
+        */
+       public static void main(String[] args) {
+
+               Properties props = TagSyncConfig.getInstance().getProperties();
+
+               new TagSynchronizer(props).run();
+       }
+
+       /**
+        * Creates a synchronizer that works from the given configuration.
+        *
+        * @param properties configuration to use; when {@code null} or empty an
+        *        error is logged and an empty {@link Properties} is used instead
+        */
+       public TagSynchronizer(Properties properties) {
+               boolean usable = properties != null && !MapUtils.isEmpty(properties);
+
+               if (!usable) {
+                       LOG.error("TagSynchronizer initialized with null properties!");
+               }
+               this.properties = usable ? properties : new Properties();
+       }
+
+       /**
+        * @return the sink held by this synchronizer; {@code null} until it has
+        *         been assigned
+        */
+       public TagSink getTagSink() {
+               return tagSink;
+       }
+
+       /**
+        * @return the source held by this synchronizer; {@code null} until it has
+        *         been assigned
+        */
+       public TagSource getTagSource() {
+               return tagSource;
+       }
+
+       @Override
+       public void run() {
+               if (LOG.isDebugEnabled()) {
+                       LOG.debug("==> TagSynchronizer.run()");
+               }
+               try {
+                       long sleepTimeBetweenCycleInMillis = 
TagSyncConfig.getSleepTimeInMillisBetweenCycle(properties);
+
+                       boolean initDone = initLoop();
+
+                       if (initDone) {
+
+                               Thread tagSourceThread = tagSource.start();
+
+                               if (tagSourceThread != null) {
+                                       while (!shutdownFlag) {
+                                               try {
+                                                       LOG.debug("Sleeping for 
[" + sleepTimeBetweenCycleInMillis + "] milliSeconds");
+                                                       
Thread.sleep(sleepTimeBetweenCycleInMillis);
+                                               } catch (InterruptedException 
e) {
+                                                       LOG.error("Failed to 
wait for [" + sleepTimeBetweenCycleInMillis + "] milliseconds before attempting 
to synchronize tag information", e);
+                                               }
+                                       }
+                                       if (shutdownFlag) {
+                                               LOG.info("Interrupting 
tagSourceThread...");
+                                               tagSourceThread.interrupt();
+                                               try {
+                                                       tagSourceThread.join();
+                                               } catch (InterruptedException 
interruptedException) {
+                                                       
LOG.error("tagSourceThread.join() was interrupted");
+                                               }
+                                       }
+                               } else {
+                                       LOG.error("Could not start tagSource 
monitoring thread");
+                               }
+                       } else {
+                               LOG.error("Failed to initialize TagSynchonizer 
after " + MAX_INIT_RETRIES + " retries. Exiting thread");
+                       }
+
+               } catch (Throwable t) {
+                       LOG.error("tag-sync thread got an error", t);
+               } finally {
+                       LOG.error("Shutting down the tag-sync thread");
+               }
+
+               if (LOG.isDebugEnabled()) {
+                       LOG.debug("<== TagSynchronizer.run()");
+               }
+       }
+
+       public boolean initLoop() {
+               if (LOG.isDebugEnabled()) {
+                       LOG.debug("==> TagSynchronizer.initLoop()");
+               }
+               boolean ret = false;
+
+               long sleepTimeBetweenCycleInMillis = 
TagSyncConfig.getSleepTimeInMillisBetweenCycle(properties);
+
+               for (int initRetries = 0; initRetries < MAX_INIT_RETRIES && 
!ret; initRetries++) {
+
+                       printConfigurationProperties();
+
+                       ret = init();
+
+                       if (!ret) {
+                               LOG.error("Failed to initialize TAG 
source/sink. Will retry after " + sleepTimeBetweenCycleInMillis + " 
milliseconds.");
+                               try {
+                                       LOG.debug("Sleeping for [" + 
sleepTimeBetweenCycleInMillis + "] milliSeconds");
+                                       
Thread.sleep(sleepTimeBetweenCycleInMillis);
+                                       properties = 
TagSyncConfig.getInstance().getProperties();
+                               } catch (Exception e) {
+                                       LOG.error("Failed to wait for [" + 
sleepTimeBetweenCycleInMillis + "] milliseconds before attempting to initialize 
tag source/sink", e);
+                               }
+                       }
+               }
+               if (LOG.isDebugEnabled()) {
+                       LOG.debug("<== TagSynchronizer.initLoop()");
+               }
+               return ret;
+       }
+
+       public boolean init() {
+
+               if (LOG.isDebugEnabled()) {
+                       LOG.debug("==> TagSynchronizer.init()");
+               }
+               boolean ret = false;
+               try {
+                       LOG.info("Initializing TAG source and sink");
+                       // Initialize tagSink and tagSource
+                       String tagSourceClassName = 
TagSyncConfig.getTagSourceClassName(properties);
+                       String tagSinkClassName = 
TagSyncConfig.getTagSinkClassName(properties);
+
+                       if (LOG.isDebugEnabled()) {
+                               LOG.debug("tagSourceClassName=" + 
tagSourceClassName + ", tagSinkClassName=" + tagSinkClassName);
+                       }
+
+                       Class<TagSource> tagSourceClass = (Class<TagSource>) 
Class.forName(tagSourceClassName);
+                       Class<TagSink> tagSinkClass = (Class<TagSink>) 
Class.forName(tagSinkClassName);
+
+                       tagSink = tagSinkClass.newInstance();
+                       tagSource = tagSourceClass.newInstance();
+
+                       if (LOG.isDebugEnabled()) {
+                               LOG.debug("Created instance of " + 
tagSourceClassName + ", " + tagSinkClassName);
+                       }
+
+                       ret = tagSink.initialize(properties) && 
tagSource.initialize(properties);
+
+                       tagSource.setTagSink(tagSink);
+
+                       LOG.info("Done initializing TAG source and sink");
+               } catch (Throwable t) {
+                       LOG.error("Failed to initialize TAG source/sink. Error 
details: ", t);
+               }
+               if (LOG.isDebugEnabled()) {
+                       LOG.debug("<== TagSynchronizer.init(), result=" + ret);
+               }
+
+               return ret;
+       }
+
+       /**
+        * Requests that the synchronizer stop: logs the reason and raises {@code shutdownFlag},
+        * which {@code run()} checks before interrupting and joining the tag source thread.
+        * <p>
+        * NOTE(review): the declaration of {@code shutdownFlag} is not visible here; if this
+        * method is invoked from a different thread than {@code run()}, confirm the field is
+        * {@code volatile}.
+        *
+        * @param reason free-text explanation of why shutdown was requested; only used for logging
+        */
+       public void shutdown(String reason) {
+               LOG.info("Received shutdown(), reason=" + reason);
+               this.shutdownFlag = true;
+       }
+
+       public void printConfigurationProperties() {
+               LOG.info("--------------------------------");
+               LOG.info("");
+               LOG.info("Ranger-TagSync Configuration: {\n");
+               if (MapUtils.isNotEmpty(properties)) {
+                       for (Map.Entry<Object, Object> entry : 
properties.entrySet()) {
+                               LOG.info("\tProperty-Name:" + entry.getKey());
+                               LOG.info("\tProperty-Value:" + 
entry.getValue());
+                               LOG.info("\n");
+                       }
+               }
+               LOG.info("\n}");
+               LOG.info("");
+               LOG.info("--------------------------------");
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/44ddd597/tagsync/src/main/java/org/apache/ranger/tagsync/sink/tagadmin/TagRESTSink.java
----------------------------------------------------------------------
diff --git 
a/tagsync/src/main/java/org/apache/ranger/tagsync/sink/tagadmin/TagRESTSink.java
 
b/tagsync/src/main/java/org/apache/ranger/tagsync/sink/tagadmin/TagRESTSink.java
new file mode 100644
index 0000000..e1bcfbb
--- /dev/null
+++ 
b/tagsync/src/main/java/org/apache/ranger/tagsync/sink/tagadmin/TagRESTSink.java
@@ -0,0 +1,511 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ranger.tagsync.sink.tagadmin;
+
+import com.sun.jersey.api.client.ClientResponse;
+import com.sun.jersey.api.client.WebResource;
+
+import org.apache.commons.collections.MapUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.ranger.admin.client.datatype.RESTResponse;
+import org.apache.ranger.tagsync.model.TagSink;
+import org.apache.ranger.plugin.model.*;
+import org.apache.ranger.plugin.store.PList;
+import org.apache.ranger.plugin.store.ServiceStore;
+import org.apache.ranger.plugin.util.RangerRESTClient;
+import org.apache.ranger.plugin.util.SearchFilter;
+import org.apache.ranger.plugin.util.ServiceTags;
+import org.apache.ranger.tagsync.process.TagSyncConfig;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * {@link TagSink} implementation that sends tag information to Ranger Admin through its REST API.
+ * <p>
+ * Only the create/delete operations and {@link #uploadServiceTags(ServiceTags)} issue REST calls.
+ * The update and lookup operations are not supported and throw {@code Exception("Not implemented")},
+ * except {@link #getTagTypes()}, which returns {@code null}.
+ */
+public class TagRESTSink implements TagSink {
+       private static final Log LOG = LogFactory.getLog(TagRESTSink.class);
+
+       private static final String REST_PREFIX = "/service";
+       private static final String MODULE_PREFIX = "/tags";
+
+       private static final String REST_MIME_TYPE_JSON = "application/json" ;
+       private static final String REST_URL_TAGDEFS_RESOURCE = REST_PREFIX + MODULE_PREFIX + "/tagdefs/" ;
+       private static final String REST_URL_TAGDEF_RESOURCE = REST_PREFIX + MODULE_PREFIX + "/tagdef/" ;
+       // The two service-resource paths need the same leading '/' as every other path segment;
+       // without it they resolve to ".../tagsresources/" and ".../tagsresource/".
+       private static final String REST_URL_SERVICERESOURCES_RESOURCE = REST_PREFIX + MODULE_PREFIX + "/resources/" ;
+       private static final String REST_URL_SERVICERESOURCE_RESOURCE = REST_PREFIX + MODULE_PREFIX + "/resource/" ;
+       private static final String REST_URL_TAGS_RESOURCE = REST_PREFIX + MODULE_PREFIX + "/tags/" ;
+       private static final String REST_URL_TAG_RESOURCE = REST_PREFIX + MODULE_PREFIX + "/tag/" ;
+       private static final String REST_URL_TAGRESOURCEMAP_IDS_RESOURCE = REST_PREFIX + MODULE_PREFIX + "/tagresourcemapids/";
+       private static final String REST_URL_IMPORT_SERVICETAGS_RESOURCE = REST_PREFIX + MODULE_PREFIX + "/importservicetags/";
+
+       // HTTP status codes this sink treats as success
+       private static final int HTTP_STATUS_OK = 200;
+       private static final int HTTP_STATUS_NO_CONTENT = 204;
+
+       private RangerRESTClient tagRESTClient = null;
+
+       @Override
+       public void init() {}
+
+       /**
+        * Creates the REST client from the tag-admin settings found in {@code properties}
+        * (REST URL, SSL configuration file, user name and password) and configures it for
+        * basic authentication.
+        *
+        * @param properties tag-sync configuration
+        * @return {@code true} once the client has been created and configured
+        */
+       @Override
+       public boolean initialize(Properties properties) {
+               if(LOG.isDebugEnabled()) {
+                       LOG.debug("==> TagRESTSink.initialize()");
+               }
+
+               String restUrl       = TagSyncConfig.getTagAdminRESTUrl(properties);
+               String sslConfigFile = TagSyncConfig.getTagAdminRESTSslConfigFile(properties);
+               String userName = TagSyncConfig.getTagAdminUserName(properties);
+               String password = TagSyncConfig.getTagAdminPassword(properties);
+
+               if (LOG.isDebugEnabled()) {
+                       LOG.debug("restUrl=" + restUrl);
+                       LOG.debug("sslConfigFile=" + sslConfigFile);
+                       LOG.debug("userName=" + userName);
+                       // The password is deliberately not logged.
+               }
+
+               // A constructor call never yields null, so no null check is needed here.
+               tagRESTClient = new RangerRESTClient(restUrl, sslConfigFile);
+               tagRESTClient.setBasicAuthInfo(userName, password);
+
+               boolean ret = true;
+
+               if(LOG.isDebugEnabled()) {
+                       LOG.debug("<== TagRESTSink.initialize(), result=" + ret);
+               }
+               return ret;
+       }
+
+       @Override
+       public void setServiceStore(ServiceStore svcStore) {
+
+       }
+
+       /**
+        * Creates a tag definition by POSTing it as JSON to the tag-definitions resource.
+        *
+        * @param tagDef tag definition to create
+        * @return the tag definition parsed from the response
+        * @throws Exception if there is no response or its status is not 200
+        */
+       @Override
+       public RangerTagDef createTagDef(RangerTagDef tagDef) throws Exception {
+               if(LOG.isDebugEnabled()) {
+                       LOG.debug("==> createTagDef(" + tagDef + ")");
+               }
+
+               WebResource webResource = createWebResource(REST_URL_TAGDEFS_RESOURCE);
+               ClientResponse response = jsonRequest(webResource).post(ClientResponse.class, tagRESTClient.toJson(tagDef));
+
+               if (!hasStatus(response, HTTP_STATUS_OK)) {
+                       throw new Exception(logFailure(response));
+               }
+
+               RangerTagDef ret = response.getEntity(RangerTagDef.class);
+
+               if(LOG.isDebugEnabled()) {
+                       LOG.debug("<== createTagDef(" + tagDef + "): " + ret);
+               }
+
+               return ret;
+       }
+
+       @Override
+       public RangerTagDef updateTagDef(RangerTagDef TagDef) throws Exception {
+               throw new Exception("Not implemented");
+       }
+
+       @Override
+       public void deleteTagDefByName(String name) throws Exception {
+               throw new Exception("Not implemented");
+       }
+
+       /**
+        * Deletes the tag definition with the given id.
+        *
+        * @param id id of the tag definition to delete
+        * @throws Exception if there is no response or its status is not 204
+        */
+       @Override
+       public void deleteTagDef(Long id) throws Exception {
+               if(LOG.isDebugEnabled()) {
+                       LOG.debug("==> deleteTagDef(" + id  + ")");
+               }
+
+               deleteResource(REST_URL_TAGDEF_RESOURCE + Long.toString(id));
+
+               if(LOG.isDebugEnabled()) {
+                       LOG.debug("<== deleteTagDef(" + id + ")");
+               }
+       }
+
+       @Override
+       public RangerTagDef getTagDef(Long id) throws Exception {
+               throw new Exception("Not implemented");
+
+       }
+
+       @Override
+       public RangerTagDef getTagDefByGuid(String guid) throws Exception {
+               throw new Exception("Not implemented");
+       }
+
+       @Override
+       public RangerTagDef getTagDefByName(String name) throws Exception {
+               throw new Exception("Not implemented");
+       }
+
+       @Override
+       public List<RangerTagDef> getTagDefs(SearchFilter filter) throws Exception {
+               throw new Exception("Not implemented");
+       }
+
+       @Override
+       public PList<RangerTagDef> getPaginatedTagDefs(SearchFilter filter) throws Exception {
+               throw new Exception("Not implemented");
+       }
+
+       /**
+        * Not implemented yet; unlike the other unsupported operations this one returns
+        * {@code null} instead of throwing.
+        */
+       @Override
+       public List<String> getTagTypes() throws Exception {
+               // TODO Auto-generated method stub
+               return null;
+       }
+
+
+       /**
+        * Creates a tag by POSTing it as JSON to the tags resource.
+        *
+        * @param tag tag to create
+        * @return the tag parsed from the response
+        * @throws Exception if there is no response or its status is not 200
+        */
+       @Override
+       public RangerTag createTag(RangerTag tag) throws Exception {
+               if(LOG.isDebugEnabled()) {
+                       LOG.debug("==> createTag(" + tag + ")");
+               }
+
+               WebResource webResource = createWebResource(REST_URL_TAGS_RESOURCE);
+               ClientResponse response = jsonRequest(webResource).post(ClientResponse.class, tagRESTClient.toJson(tag));
+
+               if (!hasStatus(response, HTTP_STATUS_OK)) {
+                       throw new Exception(logFailure(response));
+               }
+
+               RangerTag ret = response.getEntity(RangerTag.class);
+
+               if(LOG.isDebugEnabled()) {
+                       LOG.debug("<== createTag(" + tag + "): " + ret);
+               }
+
+               return ret;
+       }
+
+       @Override
+       public RangerTag updateTag(RangerTag tag) throws Exception {
+               throw new Exception("Not implemented");
+       }
+
+       /**
+        * Deletes the tag with the given id.
+        *
+        * @param id id of the tag to delete
+        * @throws Exception if there is no response or its status is not 204
+        */
+       @Override
+       public void deleteTag(Long id) throws Exception {
+               if(LOG.isDebugEnabled()) {
+                       LOG.debug("==> deleteTag(" + id  + ")");
+               }
+
+               deleteResource(REST_URL_TAG_RESOURCE + Long.toString(id));
+
+               if(LOG.isDebugEnabled()) {
+                       LOG.debug("<== deleteTag(" + id + ")");
+               }
+       }
+
+       @Override
+       public RangerTag getTag(Long id) throws Exception {
+               throw new Exception("Not implemented");
+       }
+
+       @Override
+       public RangerTag getTagByGuid(String guid) throws Exception {
+               throw new Exception("Not implemented");
+       }
+
+       @Override
+       public List<RangerTag> getTagsByType(String name) throws Exception {
+               throw new Exception("Not implemented");
+       }
+
+       @Override
+       public List<Long> getTagIdsForResourceId(Long resourceId) throws Exception {
+               throw new Exception("Not implemented");
+       }
+
+       @Override
+       public List<RangerTag> getTagsForResourceId(Long resourceId) throws Exception {
+               throw new Exception("Not implemented");
+       }
+
+       @Override
+       public List<RangerTag> getTagsForResourceGuid(String resourceGuid) throws Exception {
+               throw new Exception("Not implemented");
+       }
+
+       @Override
+       public List<RangerTag> getTags(SearchFilter filter) throws Exception {
+               throw new Exception("Not implemented");
+       }
+
+       @Override
+       public PList<RangerTag> getPaginatedTags(SearchFilter filter) throws Exception {
+               throw new Exception("Not implemented");
+       }
+
+
+       /**
+        * Creates a service resource by POSTing it as JSON to the service-resources resource.
+        *
+        * @param resource service resource to create
+        * @return the service resource parsed from the response
+        * @throws Exception if there is no response or its status is not 200
+        */
+       @Override
+       public RangerServiceResource createServiceResource(RangerServiceResource resource) throws Exception {
+               if(LOG.isDebugEnabled()) {
+                       LOG.debug("==> createServiceResource(" + resource + ")");
+               }
+
+               WebResource webResource = createWebResource(REST_URL_SERVICERESOURCES_RESOURCE);
+               ClientResponse response = jsonRequest(webResource).post(ClientResponse.class, tagRESTClient.toJson(resource));
+
+               if (!hasStatus(response, HTTP_STATUS_OK)) {
+                       throw new Exception(logFailure(response));
+               }
+
+               RangerServiceResource ret = response.getEntity(RangerServiceResource.class);
+
+               if(LOG.isDebugEnabled()) {
+                       LOG.debug("<== createServiceResource(" + resource + "): " + ret);
+               }
+
+               return ret;
+       }
+
+       @Override
+       public RangerServiceResource updateServiceResource(RangerServiceResource resource) throws Exception {
+               throw new Exception("Not implemented");
+       }
+
+       /**
+        * Deletes the service resource with the given id.
+        *
+        * @param id id of the service resource to delete
+        * @throws Exception if there is no response or its status is not 204
+        */
+       @Override
+       public void deleteServiceResource(Long id) throws Exception {
+               if(LOG.isDebugEnabled()) {
+                       LOG.debug("==> deleteServiceResource(" + id  + ")");
+               }
+
+               deleteResource(REST_URL_SERVICERESOURCE_RESOURCE + Long.toString(id));
+
+               if(LOG.isDebugEnabled()) {
+                       LOG.debug("<== deleteServiceResource(" + id + ")");
+               }
+       }
+
+       @Override
+       public RangerServiceResource getServiceResource(Long id) throws Exception {
+               throw new Exception("Not implemented");
+       }
+
+       @Override
+       public RangerServiceResource getServiceResourceByGuid(String guid) throws Exception {
+               throw new Exception("Not implemented");
+       }
+
+       @Override
+       public List<RangerServiceResource> getServiceResourcesByService(String serviceName) throws Exception {
+               throw new Exception("Not implemented");
+       }
+
+       @Override
+       public RangerServiceResource getServiceResourceByResourceSignature(String resourceSignature) throws Exception {
+               throw new Exception("Not implemented");
+       }
+
+       @Override
+       public List<RangerServiceResource> getServiceResources(SearchFilter filter) throws Exception {
+               throw new Exception("Not implemented");
+       }
+
+       @Override
+       public PList<RangerServiceResource> getPaginatedServiceResources(SearchFilter filter) throws Exception {
+               throw new Exception("Not implemented");
+       }
+
+
+       /**
+        * Associates a tag with a service resource. The tag id and resource id are sent as the
+        * {@code tag-id} and {@code resource-id} query parameters of a POST without a body.
+        *
+        * @param tagResourceMap carries the tag id and resource id to associate
+        * @return the association parsed from the response
+        * @throws Exception if there is no response or its status is not 200
+        */
+       @Override
+       public RangerTagResourceMap createTagResourceMap(RangerTagResourceMap tagResourceMap) throws Exception {
+               if(LOG.isDebugEnabled()) {
+                       LOG.debug("==> createTagResourceMap(" + tagResourceMap + ")");
+               }
+
+               WebResource webResource = createWebResource(REST_URL_TAGRESOURCEMAP_IDS_RESOURCE)
+                               .queryParam("tag-id", Long.toString(tagResourceMap.getTagId()))
+                               .queryParam("resource-id", Long.toString(tagResourceMap.getResourceId()));
+
+               ClientResponse response = jsonRequest(webResource).post(ClientResponse.class);
+
+               if (!hasStatus(response, HTTP_STATUS_OK)) {
+                       throw new Exception(logFailure(response));
+               }
+
+               RangerTagResourceMap ret = response.getEntity(RangerTagResourceMap.class);
+
+               if(LOG.isDebugEnabled()) {
+                       LOG.debug("<== createTagResourceMap(" + tagResourceMap + "): " + ret);
+               }
+
+               return ret;
+       }
+
+       @Override
+       public void deleteTagResourceMap(Long id) throws Exception {
+               throw new Exception("Not implemented");
+       }
+
+       /**
+        * Uploads a complete set of service tags with a PUT to the import resource.
+        * <p>
+        * Unlike the other REST operations of this class, a missing response or a status other
+        * than 204 is only logged; no exception is thrown for it.
+        *
+        * @param serviceTags tags to upload, serialized as JSON
+        */
+       @Override
+       public void uploadServiceTags(ServiceTags serviceTags) throws Exception {
+               if(LOG.isDebugEnabled()) {
+                       LOG.debug("==> uploadServiceTags()");
+               }
+               WebResource webResource = createWebResource(REST_URL_IMPORT_SERVICETAGS_RESOURCE);
+
+               ClientResponse response = jsonRequest(webResource).put(ClientResponse.class, tagRESTClient.toJson(serviceTags));
+
+               if (!hasStatus(response, HTTP_STATUS_NO_CONTENT)) {
+                       LOG.error("Upload of service-tags failed with message " + logFailure(response));
+               }
+
+               if(LOG.isDebugEnabled()) {
+                       LOG.debug("<== uploadServiceTags()");
+               }
+       }
+
+       @Override
+       public RangerTagResourceMap getTagResourceMap(Long id) throws Exception {
+               throw new Exception("Not implemented");
+       }
+
+       @Override
+       public RangerTagResourceMap getTagResourceMapByGuid(String guid) throws Exception {
+               throw new Exception("Not implemented");
+       }
+
+       @Override
+       public List<RangerTagResourceMap> getTagResourceMapsForTagId(Long tagId) throws Exception {
+               throw new Exception("Not implemented");
+       }
+
+       @Override
+       public List<RangerTagResourceMap> getTagResourceMapsForTagGuid(String tagGuid) throws Exception {
+               throw new Exception("Not implemented");
+       }
+
+       @Override
+       public List<RangerTagResourceMap> getTagResourceMapsForResourceId(Long resourceId) throws Exception {
+               throw new Exception("Not implemented");
+       }
+
+       @Override
+       public List<RangerTagResourceMap> getTagResourceMapsForResourceGuid(String resourceGuid) throws Exception {
+               throw new Exception("Not implemented");
+       }
+
+       @Override
+       public RangerTagResourceMap getTagResourceMapForTagAndResourceId(Long tagId, Long resourceId) throws Exception {
+               throw new Exception("Not implemented");
+       }
+
+
+       @Override
+       public RangerTagResourceMap getTagResourceMapForTagAndResourceGuid(String tagGuid, String resourceGuid) throws Exception {
+               throw new Exception("Not implemented");
+       }
+
+       @Override
+       public List<RangerTagResourceMap> getTagResourceMaps(SearchFilter filter) throws Exception {
+               throw new Exception("Not implemented");
+       }
+
+       @Override
+       public PList<RangerTagResourceMap> getPaginatedTagResourceMaps(SearchFilter filter) throws Exception {
+               throw new Exception("Not implemented");
+       }
+
+
+       @Override
+       public ServiceTags getServiceTagsIfUpdated(String serviceName, Long lastKnownVersion) throws Exception {
+               throw new Exception("Not implemented");
+       }
+
+       /** Returns a request builder for {@code webResource} that accepts and sends JSON. */
+       private WebResource.Builder jsonRequest(WebResource webResource) {
+               return webResource.accept(REST_MIME_TYPE_JSON).type(REST_MIME_TYPE_JSON);
+       }
+
+       /** Issues a DELETE for {@code url} and throws unless the response status is 204. */
+       private void deleteResource(String url) throws Exception {
+               ClientResponse response = jsonRequest(createWebResource(url)).delete(ClientResponse.class);
+
+               if (!hasStatus(response, HTTP_STATUS_NO_CONTENT)) {
+                       throw new Exception(logFailure(response));
+               }
+       }
+
+       /** Tells whether a response was received and carries exactly {@code expectedStatus}. */
+       private static boolean hasStatus(ClientResponse response, int expectedStatus) {
+               return response != null && response.getStatus() == expectedStatus;
+       }
+
+       /** Logs an unsuccessful REST response and returns the message extracted from it. */
+       private static String logFailure(ClientResponse response) {
+               LOG.error("RangerAdmin REST call returned with response={" + response + "}");
+
+               RESTResponse resp = RESTResponse.fromClientResponse(response);
+
+               return resp.getMessage();
+       }
+
+       private WebResource createWebResource(String url) {
+               return createWebResource(url, null);
+       }
+
+       /**
+        * Builds the web resource for {@code url}, adding every parameter of {@code filter}
+        * (when one is given and non-empty) as a query parameter.
+        */
+       private WebResource createWebResource(String url, SearchFilter filter) {
+               WebResource ret = tagRESTClient.getResource(url);
+
+               if(filter != null && !MapUtils.isEmpty(filter.getParams())) {
+                       for(Map.Entry<String, String> e : filter.getParams().entrySet()) {
+                               String name  = e.getKey();
+                               String value = e.getValue();
+
+                               // queryParam() returns a new WebResource rather than modifying the
+                               // receiver, so the result must be kept or the parameter is lost.
+                               ret = ret.queryParam(name, value);
+                       }
+               }
+
+               return ret;
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/44ddd597/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasNotificationMapper.java
----------------------------------------------------------------------
diff --git 
a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasNotificationMapper.java
 
b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasNotificationMapper.java
new file mode 100644
index 0000000..973b7fb
--- /dev/null
+++ 
b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasNotificationMapper.java
@@ -0,0 +1,402 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ranger.tagsync.source.atlas;
+
+import org.apache.atlas.notification.entity.EntityNotification;
+import org.apache.atlas.typesystem.api.Entity;
+import org.apache.atlas.typesystem.api.Trait;
+import org.apache.commons.collections.MapUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.ranger.plugin.model.RangerPolicy;
+import org.apache.ranger.plugin.model.RangerServiceResource;
+import org.apache.ranger.plugin.model.RangerTag;
+import org.apache.ranger.plugin.model.RangerTagDef;
+import org.apache.ranger.plugin.util.ServiceTags;
+import org.apache.ranger.tagsync.process.TagSyncConfig;
+
+import java.util.*;
+
+/**
+ * Translates Atlas entity notifications into Ranger {@link ServiceTags}.
+ * The entry point is {@code processEntityNotification}; all members are static.
+ */
+class AtlasNotificationMapper {
+       private static final Log LOG = 
LogFactory.getLog(AtlasNotificationMapper.class);
+
+       // Atlas entity type names; only entities of these types are mapped.
+       public static final String ENTITY_TYPE_HIVE_DB = "hive_db";
+       public static final String ENTITY_TYPE_HIVE_TABLE = "hive_table";
+       public static final String ENTITY_TYPE_HIVE_COLUMN = "hive_column";
+
+       // Ranger resource-type names, used as keys of a service resource's element map.
+       public static final String RANGER_TYPE_HIVE_DB = "database";
+       public static final String RANGER_TYPE_HIVE_TABLE = "table";
+       public static final String RANGER_TYPE_HIVE_COLUMN = "column";
+
+       // Name of the entity attribute that holds the qualified name.
+       public static final String ENTITY_ATTRIBUTE_QUALIFIED_NAME = 
"qualifiedName";
+       // Regular expression (a literal dot) passed to String.split() to break up a qualified name.
+       public static final String QUALIFIED_NAME_FORMAT_DELIMITER_STRING = 
"\\.";
+
+
+       // Overwritten on every processEntityNotification() call and read when resolving the
+       // service name. NOTE(review): static and unsynchronized - confirm callers are single-threaded.
+       private static Properties properties = null;
+
+       public static ServiceTags processEntityNotification(EntityNotification 
entityNotification, Properties props) {
+
+               ServiceTags ret = null;
+               properties = props;
+
+               try {
+                       if (isEntityMappable(entityNotification.getEntity())) {
+                               ret = createServiceTags(entityNotification);
+                       } else {
+                               LOG.info("Ranger not interested in Entity 
Notification for entity-type " + entityNotification.getEntity().getTypeName());
+                       }
+               } catch (Exception exception) {
+                       LOG.error("createServiceTags() failed!! ", exception);
+               }
+               return ret;
+       }
+
+       static private boolean isEntityMappable(Entity entity) {
+               boolean ret = false;
+
+               String entityTypeName = entity.getTypeName();
+
+               if (StringUtils.isNotBlank(entityTypeName)) {
+                       if (StringUtils.equals(entityTypeName, 
ENTITY_TYPE_HIVE_DB) ||
+                                       StringUtils.equals(entityTypeName, 
ENTITY_TYPE_HIVE_TABLE) ||
+                                       StringUtils.equals(entityTypeName, 
ENTITY_TYPE_HIVE_COLUMN)) {
+                               ret = true;
+                       }
+               }
+               return ret;
+       }
+
+       static private ServiceTags createServiceTags(EntityNotification 
entityNotification) throws Exception {
+
+               ServiceTags ret = null;
+
+               EntityNotification.OperationType opType = 
entityNotification.getOperationType();
+               Entity entity = entityNotification.getEntity();
+
+               String opName = entityNotification.getOperationType().name();
+               switch (opType) {
+                       case ENTITY_CREATED: {
+                               ret = getServiceTags(entity, opType);
+                               break;
+                       }
+                       case ENTITY_UPDATED: {
+                               ret = handleEntityUpdate(entity);
+                               break;
+                       }
+                       case TRAIT_ADDED: {
+                               ret = getServiceTags(entity, opType);
+                               break;
+                       }
+                       case TRAIT_DELETED: {
+                               ret = handleTraitDelete(entity);
+                               break;
+                       }
+                       default:
+                               LOG.error("Unknown notification received. Will 
not be handled, notificationType=" + opName);
+               }
+
+               return ret;
+       }
+
+       /**
+        * Assembles an add-or-update {@link ServiceTags} object for one entity.
+        * <p>
+        * The result holds a single service resource, the tags derived from the entity's
+        * traits, their tag definitions, and a mapping from resource id {@code 1} to all
+        * tag ids (left {@code null} when the entity has no tags).
+        *
+        * @param entity the Atlas entity the notification refers to
+        * @param opType the operation type, forwarded to the service-resource builder
+        * @return a newly populated {@link ServiceTags}
+        * @throws Exception if the service resource cannot be built
+        */
+       static private ServiceTags getServiceTags(Entity entity, EntityNotification.OperationType opType) throws Exception {
+               RangerServiceResource resource = getServiceResource(entity, opType);
+
+               List<RangerServiceResource> resourceList = new ArrayList<RangerServiceResource>();
+               resourceList.add(resource);
+
+               Map<Long, RangerTag> tagsById = getTags(entity);
+
+               // Tag definitions are always requested as for an entity creation, whatever opType is.
+               Map<Long, RangerTagDef> tagDefsById = getTagDefs(tagsById, EntityNotification.OperationType.ENTITY_CREATED);
+
+               Map<Long, List<Long>> tagIdsByResourceId = null;
+
+               if (MapUtils.isNotEmpty(tagsById)) {
+                       tagIdsByResourceId = new HashMap<Long, List<Long>>();
+                       tagIdsByResourceId.put(1L, new ArrayList<Long>(tagsById.keySet()));
+               }
+
+               ServiceTags result = new ServiceTags();
+
+               result.setOp(ServiceTags.OP_ADD_OR_UPDATE);
+               result.setServiceName(resource.getServiceName());
+               result.setServiceResources(resourceList);
+               result.setTagDefinitions(tagDefsById);
+               result.setTags(tagsById);
+               result.setResourceToTagIds(tagIdsByResourceId);
+
+               return result;
+       }
+
+
+       /**
+        * Builds the Ranger service resource that represents the given Atlas entity.
+        * <p>
+        * Only for {@code ENTITY_CREATED} are the service name and the resource elements
+        * derived from the entity's name components; for every other operation type the
+        * returned resource carries just the entity guid and the fixed id {@code 1}.
+        * A hive database yields a "database" element, a hive table yields "database" and
+        * "table" elements. When a required name component is missing or blank an error
+        * is logged and no element is added for the entity.
+        *
+        * @param entity the Atlas entity to convert
+        * @param opType the operation type of the notification
+        * @return a new service resource, never {@code null}
+        * @throws Exception if a hive column entity is created; that case is not implemented
+        */
+       static private RangerServiceResource getServiceResource(Entity entity, EntityNotification.OperationType opType) throws Exception {
+
+               Map<String, RangerPolicy.RangerPolicyResource> elements = null;
+               String serviceName = null;
+
+               if (opType == EntityNotification.OperationType.ENTITY_CREATED) {
+
+                       elements = new HashMap<String, RangerPolicy.RangerPolicyResource>();
+
+                       // components holds qualifiedName, instanceName, dbName, tableName in that order
+                       String[] components = getTempNameComponents(entity);
+
+                       String entityTypeName = entity.getTypeName();
+
+                       if (components.length > 1) {
+                               serviceName = getServiceName(components[1], entityTypeName);
+                       }
+
+                       if (StringUtils.equals(entityTypeName, ENTITY_TYPE_HIVE_DB)) {
+                               if (hasNameComponents(components, 2)) {
+                                       elements.put(RANGER_TYPE_HIVE_DB, new RangerPolicy.RangerPolicyResource(components[2]));
+                               } else {
+                                       LOG.error("invalid qualifiedName for HIVE_DB, qualifiedName=" + components[0]);
+                               }
+                       } else if (StringUtils.equals(entityTypeName, ENTITY_TYPE_HIVE_TABLE)) {
+                               if (hasNameComponents(components, 3)) {
+                                       elements.put(RANGER_TYPE_HIVE_DB, new RangerPolicy.RangerPolicyResource(components[2]));
+                                       elements.put(RANGER_TYPE_HIVE_TABLE, new RangerPolicy.RangerPolicyResource(components[3]));
+                               } else {
+                                       LOG.error("invalid qualifiedName for HIVE_TABLE, qualifiedName=" + components[0]);
+                               }
+                       } else if (StringUtils.equals(entityTypeName, ENTITY_TYPE_HIVE_COLUMN)) {
+                               // Column entities are not mapped yet; fail loudly rather than emit a partial resource.
+                               LOG.error("HIVE_COLUMN creation is not handled.");
+                               throw new Exception("HIVE_COLUMN entity-creation not implemented");
+                       }
+               }
+
+               RangerServiceResource ret = new RangerServiceResource();
+               ret.setGuid(entity.getId().getGuid());
+               ret.setId(1L);
+               ret.setServiceName(serviceName);
+               ret.setResourceElements(elements);
+
+               return ret;
+       }
+
+       /**
+        * Checks that the name components from index 2 (the database name) up to and
+        * including {@code lastIndex} are all present and non-blank.
+        * <p>
+        * The component array has a fixed size, so its length alone does not show whether
+        * a name was actually found; the values themselves have to be inspected.
+        */
+       static private boolean hasNameComponents(String[] components, int lastIndex) {
+               if (components.length <= lastIndex) {
+                       return false;
+               }
+
+               for (int i = 2; i <= lastIndex; i++) {
+                       if (StringUtils.isBlank(components[i])) {
+                               return false;
+                       }
+               }
+
+               return true;
+       }
+
+       /**
+        * Converts the traits of an entity into Ranger tags.
+        * <p>
+        * Each trait becomes one tag whose type is the trait name and whose guid is the
+        * entity guid, a dash, and the trait name. Only {@code String} (or {@code null})
+        * attribute values are copied; any other value is logged and skipped. Tags are
+        * keyed by a running index that starts at 1.
+        *
+        * @param entity the Atlas entity whose traits are read
+        * @return the tags keyed by index, or {@code null} when the entity has no traits
+        */
+       static private Map<Long, RangerTag> getTags(Entity entity) {
+               Map<String, ? extends Trait> traitsByName = entity.getTraits();
+
+               if (MapUtils.isEmpty(traitsByName)) {
+                       return null;
+               }
+
+               Map<Long, RangerTag> tagsByIndex = new HashMap<Long, RangerTag>();
+               long nextIndex = 1;
+
+               for (Map.Entry<String, ? extends Trait> traitEntry : traitsByName.entrySet()) {
+                       String traitName = traitEntry.getKey();
+                       Map<String, Object> traitValues = traitEntry.getValue().getValues();
+                       Map<String, String> attributes = new HashMap<String, String>();
+
+                       for (Map.Entry<String, Object> valueEntry : traitValues.entrySet()) {
+                               String attributeName = valueEntry.getKey();
+                               Object rawValue = valueEntry.getValue();
+
+                               // A null value is kept as null, exactly as a cast to String would do.
+                               if (rawValue == null || rawValue instanceof String) {
+                                       attributes.put(attributeName, (String) rawValue);
+                               } else {
+                                       LOG.error("Cannot cast attribute-value to String, skipping... attrName=" + attributeName);
+                               }
+                       }
+
+                       RangerTag tag = new RangerTag();
+
+                       tag.setGuid(entity.getId().getGuid() + "-" + traitName);
+                       tag.setType(traitName);
+                       tag.setAttributes(attributes);
+
+                       tagsByIndex.put(nextIndex++, tag);
+               }
+
+               return tagsByIndex;
+       }
+
+       /**
+        * Derives one tag definition per tag, reusing the tag's key as the definition id
+        * and the tag's type as the definition name.
+        *
+        * @param tags   the tags keyed by id; may be {@code null} or empty
+        * @param opType the operation type; definitions are produced only for
+        *               {@code ENTITY_CREATED} and {@code TRAIT_ADDED}
+        * @return the definitions keyed like {@code tags}, or {@code null} when the
+        *         operation type does not qualify or there are no tags
+        */
+       static private Map<Long, RangerTagDef> getTagDefs(Map<Long, RangerTag> tags, EntityNotification.OperationType opType) {
+               boolean addsTags = opType == EntityNotification.OperationType.ENTITY_CREATED
+                               || opType == EntityNotification.OperationType.TRAIT_ADDED;
+
+               if (!addsTags || MapUtils.isEmpty(tags)) {
+                       return null;
+               }
+
+               Map<Long, RangerTagDef> defsById = new HashMap<Long, RangerTagDef>();
+
+               for (Map.Entry<Long, RangerTag> tagEntry : tags.entrySet()) {
+                       RangerTagDef definition = new RangerTagDef();
+
+                       definition.setName(tagEntry.getValue().getType());
+                       definition.setId(tagEntry.getKey());
+
+                       defsById.put(tagEntry.getKey(), definition);
+               }
+
+               return defsById;
+       }
+
+       /**
+        * Splits the identifying name of an entity into its components.
+        * <p>
+        * The returned array always has five slots: index 0 holds the full qualified name
+        * and indexes 1 to 4 hold its components in order (missing ones stay {@code null}).
+        * For a hive database the components are read from the "clusterName" and "name"
+        * attributes; for every other type the "qualifiedName" attribute is split on dots.
+        * A missing qualified name yields an array of {@code null}s, and components beyond
+        * the fourth are dropped with a warning instead of overflowing the array.
+        *
+        * @param entity the Atlas entity to inspect
+        * @return a five-element array as described above, never {@code null}
+        */
+       static private String[] getQualifiedNameComponents(Entity entity) {
+               // A freshly allocated array is already filled with nulls.
+               String ret[] = new String[5];
+
+               if (StringUtils.equals(entity.getTypeName(), ENTITY_TYPE_HIVE_DB)) {
+                       ret[1] = getAttribute(entity.getValues(), "clusterName", String.class);
+                       ret[2] = getAttribute(entity.getValues(), "name", String.class);
+                       ret[0] = ret[1] + "." + ret[2];
+               } else {
+                       String qualifiedName = getAttribute(entity.getValues(), ENTITY_ATTRIBUTE_QUALIFIED_NAME, String.class);
+
+                       if (qualifiedName == null) {
+                               LOG.warn("Entity has no " + ENTITY_ATTRIBUTE_QUALIFIED_NAME + " attribute, entity-type=" + entity.getTypeName());
+                               return ret;
+                       }
+
+                       String nameHierarchy[] = qualifiedName.split(QUALIFIED_NAME_FORMAT_DELIMITER_STRING);
+
+                       int hierarchyLevels = nameHierarchy.length;
+
+                       if (LOG.isDebugEnabled()) {
+                               LOG.debug("----- Entity-Id:" + entity.getId().getGuid());
+                               LOG.debug("----- Entity-Type-Name:" + entity.getTypeName());
+                               LOG.debug("----- Entity-Qualified-Name:" + qualifiedName);
+                               LOG.debug("-----        Entity-Qualified-Name-Components -----");
+                               for (int i = 0; i < hierarchyLevels; i++) {
+                                       LOG.debug("-----                Index:" + i + " Value:" + nameHierarchy[i]);
+                               }
+                       }
+
+                       ret[0] = qualifiedName;
+
+                       // Slot 0 is taken by the full name, so at most ret.length - 1 components fit.
+                       int copied = Math.min(hierarchyLevels, ret.length - 1);
+
+                       if (copied < hierarchyLevels) {
+                               LOG.warn("qualifiedName has more components than expected, ignoring the extra ones, qualifiedName=" + qualifiedName);
+                       }
+
+                       for (int i = 0; i < copied; i++) {
+                               ret[i + 1] = nameHierarchy[i];
+                       }
+               }
+               return ret;
+       }
+
+       /**
+        * Resolves the Ranger service name for an Atlas instance and entity type.
+        * <p>
+        * The component name is the part of the entity type name before the first
+        * underscore, lower-cased (for example "hive" for "hive_db"). The lookup itself is
+        * delegated to {@code TagSyncConfig.getServiceName} with the class-level properties.
+        *
+        * @param instanceName   the instance (cluster) name taken from the entity
+        * @param entityTypeName the Atlas entity type name; must not be {@code null}
+        * @return whatever {@code TagSyncConfig.getServiceName} returns for the component,
+        *         instance name and properties
+        */
+       static private String getServiceName(String instanceName, String entityTypeName) {
+               // Parse entityTypeName to get the Apache-component Name
+               String apacheComponents[] = entityTypeName.split("_");
+               String apacheComponent = null;
+               if (apacheComponents.length > 0) {
+                       // Locale.ROOT keeps the result independent of the JVM's default locale
+                       // (under a Turkish locale, for instance, "HIVE" would not become "hive").
+                       apacheComponent = apacheComponents[0].toLowerCase(Locale.ROOT);
+               }
+
+               return TagSyncConfig.getServiceName(apacheComponent, instanceName, properties);
+       }
+
+       static private <T> T getAttribute(Map<String, Object> map, String name, 
Class<T> type) {
+               return type.cast(map.get(name));
+       }
+
+       /**
+        * Temporary replacement for the qualified-name parsing, used until Atlas
+        * provides a qualifiedName attribute.
+        * <p>
+        * Returns a four-slot array: index 0 is the full name, 1 the instance name, 2 the
+        * database name and 3 the table name. For a hive database the values come from the
+        * "clusterName" and "name" attributes. For a hive table the "name" attribute is
+        * split and its parts are stored in reverse order (first part in slot 3, second in
+        * slot 2, third in slot 1). Any other entity type yields an array of {@code null}s.
+        *
+        * @param entity the Atlas entity to inspect
+        * @return a four-element array as described above, never {@code null}
+        */
+       static private String[] getTempNameComponents(Entity entity) {
+               // A freshly allocated array is already filled with nulls.
+               String components[] = new String[4];
+               String typeName = entity.getTypeName();
+
+               if (StringUtils.equals(typeName, ENTITY_TYPE_HIVE_DB)) {
+                       String clusterName = getAttribute(entity.getValues(), "clusterName", String.class);
+                       String dbName = getAttribute(entity.getValues(), "name", String.class);
+
+                       components[0] = clusterName + "." + dbName;
+                       components[1] = clusterName;
+                       components[2] = dbName;
+               } else if (StringUtils.equals(typeName, ENTITY_TYPE_HIVE_TABLE)) {
+                       String fullName = getAttribute(entity.getValues(), "name", String.class);
+
+                       // NOTE(review): this regex matches the two-character sequence ".@", not
+                       // "." or "@" individually - confirm it fits the name format Atlas sends.
+                       String parts[] = fullName.split("\\.@");
+                       int partCount = parts.length;
+
+                       if (LOG.isDebugEnabled()) {
+                               LOG.debug("----- Entity-Id:" + entity.getId().getGuid());
+                               LOG.debug("----- Entity-Type-Name:" + entity.getTypeName());
+                               LOG.debug("----- Entity-Qualified-Name:" + fullName);
+                               LOG.debug("-----        Entity-Qualified-Name-Components -----");
+                               for (int level = 0; level < partCount; level++) {
+                                       LOG.debug("-----                Index:" + level + " Value:" + parts[level]);
+                               }
+                       }
+
+                       components[0] = fullName;
+
+                       // Parts are stored back to front: part 0 -> slot 3, part 1 -> slot 2, part 2 -> slot 1.
+                       int usableParts = Math.min(partCount, 3);
+                       for (int level = 0; level < usableParts; level++) {
+                               components[3 - level] = parts[level];
+                       }
+               }
+
+               return components;
+       }
+
+
+       static private ServiceTags handleEntityUpdate(Entity entity) throws 
Exception {
+
+               throw new Exception("Not implemented");
+
+       }
+
+       static private ServiceTags handleTraitDelete(Entity entity) throws 
Exception {
+
+               throw new Exception("Not implemented");
+       }
+}

Reply via email to