Repository: incubator-ranger
Updated Branches:
  refs/heads/master ffb895e2c -> 9ea1d4ad7


RANGER-807: tagsync fixes to comply with good coding practices


Project: http://git-wip-us.apache.org/repos/asf/incubator-ranger/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ranger/commit/9ea1d4ad
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ranger/tree/9ea1d4ad
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ranger/diff/9ea1d4ad

Branch: refs/heads/master
Commit: 9ea1d4ad74b1ac7ffc0530aefadf87ffb4f33ec2
Parents: ffb895e
Author: Madhan Neethiraj <[email protected]>
Authored: Tue Jan 12 16:30:35 2016 -0800
Committer: Madhan Neethiraj <[email protected]>
Committed: Tue Jan 12 16:58:36 2016 -0800

----------------------------------------------------------------------
 .../source/Atlas/AtlasHiveResourceMapper.java   | 203 ------------------
 .../source/Atlas/AtlasResourceMapper.java       |  74 -------
 .../source/Atlas/AtlasResourceMapperUtil.java   | 124 -----------
 .../tagsync/source/Atlas/AtlasTagSource.java    | 197 ------------------
 .../source/atlas/AtlasHiveResourceMapper.java   | 206 +++++++++++++++++++
 .../source/atlas/AtlasNotificationMapper.java   | 190 ++++++++---------
 .../source/atlas/AtlasResourceMapper.java       |  74 +++++++
 .../source/atlas/AtlasResourceMapperUtil.java   | 124 +++++++++++
 .../tagsync/source/atlas/AtlasTagSource.java    | 199 ++++++++++++++++++
 9 files changed, 700 insertions(+), 691 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/9ea1d4ad/tagsync/src/main/java/org/apache/ranger/tagsync/source/Atlas/AtlasHiveResourceMapper.java
----------------------------------------------------------------------
diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/Atlas/AtlasHiveResourceMapper.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/Atlas/AtlasHiveResourceMapper.java
deleted file mode 100644
index a17d611..0000000
--- a/tagsync/src/main/java/org/apache/ranger/tagsync/source/Atlas/AtlasHiveResourceMapper.java
+++ /dev/null
@@ -1,203 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.ranger.tagsync.source.atlas;
-
-import org.apache.atlas.typesystem.IReferenceableInstance;
-import org.apache.commons.lang.StringUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.ranger.plugin.model.RangerPolicy;
-import org.apache.ranger.plugin.model.RangerServiceResource;
-import org.apache.ranger.tagsync.process.TagSyncConfig;
-
-import java.util.Map;
-import java.util.HashMap;
-import java.util.List;
-import java.util.ArrayList;
-import java.util.Arrays;
-
-public class AtlasHiveResourceMapper extends AtlasResourceMapper {
-       private static final Log LOG = 
LogFactory.getLog(AtlasHiveResourceMapper.class);
-
-       public static final String COMPONENT_NAME = "hive";
-
-       public static final String ENTITY_TYPE_HIVE_DB = "hive_db";
-       public static final String ENTITY_TYPE_HIVE_TABLE = "hive_table";
-       public static final String ENTITY_TYPE_HIVE_COLUMN = "hive_column";
-
-       public static final String RANGER_TYPE_HIVE_DB = "database";
-       public static final String RANGER_TYPE_HIVE_TABLE = "table";
-       public static final String RANGER_TYPE_HIVE_COLUMN = "column";
-
-       public static final String 
ENTITY_ATTRIBUTE_QUALIFIED_NAME_FOR_HIVE_TABLE = "name";
-
-       static protected final String ENTITY_ATTRIBUTE_QUALIFIED_NAME = 
"qualifiedName";
-
-       private static String clusterDelimiter = "@";
-
-       private static String qualifiedNameDelimiter = "\\.";
-
-       public static final String[] supportedEntityTypes = { 
ENTITY_TYPE_HIVE_DB, ENTITY_TYPE_HIVE_TABLE, ENTITY_TYPE_HIVE_COLUMN };
-
-       public AtlasHiveResourceMapper() {
-               super();
-       }
-
-       @Override
-       public List<String> getSupportedEntityTypes() {
-               return Arrays.asList(supportedEntityTypes);
-       }
-
-       @Override
-       public RangerServiceResource buildResource(final IReferenceableInstance 
entity) throws Exception {
-
-               Map<String, RangerPolicy.RangerPolicyResource> elements = new 
HashMap<String, RangerPolicy.RangerPolicyResource>();
-
-               String serviceName = null;
-
-               List<String> components = getQualifiedNameComponents(entity);
-               // components should contain qualifiedName, clusterName, 
dbName, tableName, columnName in that order
-
-               String entityTypeName = entity.getTypeName();
-
-               String qualifiedName = components.get(0);
-
-               String clusterName, dbName, tableName, columnName;
-
-               if (components.size() > 1) {
-                       clusterName = components.get(1);
-                       serviceName = getRangerServiceName(clusterName);
-               }
-
-               if (StringUtils.equals(entityTypeName, ENTITY_TYPE_HIVE_DB)) {
-                       if (components.size() > 2) {
-                               dbName = components.get(2);
-                               RangerPolicy.RangerPolicyResource 
dbPolicyResource = new RangerPolicy.RangerPolicyResource(dbName);
-                               elements.put(RANGER_TYPE_HIVE_DB, 
dbPolicyResource);
-
-                       } else {
-                               LOG.error("invalid qualifiedName for HIVE_DB, 
qualifiedName=" + qualifiedName);
-                       }
-               } else if (StringUtils.equals(entityTypeName, 
ENTITY_TYPE_HIVE_TABLE)) {
-                       if (components.size() > 3) {
-                               dbName = components.get(2);
-                               tableName = components.get(3);
-                               RangerPolicy.RangerPolicyResource 
dbPolicyResource = new RangerPolicy.RangerPolicyResource(dbName);
-                               elements.put(RANGER_TYPE_HIVE_DB, 
dbPolicyResource);
-                               RangerPolicy.RangerPolicyResource 
tablePolicyResource = new RangerPolicy.RangerPolicyResource(tableName);
-                               elements.put(RANGER_TYPE_HIVE_TABLE, 
tablePolicyResource);
-                       } else {
-                               LOG.error("invalid qualifiedName for 
HIVE_TABLE, qualifiedName=" + qualifiedName);
-                       }
-               } else if (StringUtils.equals(entityTypeName, 
ENTITY_TYPE_HIVE_COLUMN)) {
-                       if (components.size() > 4) {
-                               dbName = components.get(2);
-                               tableName = components.get(3);
-                               columnName = components.get(4);
-                               RangerPolicy.RangerPolicyResource 
dbPolicyResource = new RangerPolicy.RangerPolicyResource(dbName);
-                               elements.put(RANGER_TYPE_HIVE_DB, 
dbPolicyResource);
-                               RangerPolicy.RangerPolicyResource 
tablePolicyResource = new RangerPolicy.RangerPolicyResource(tableName);
-                               elements.put(RANGER_TYPE_HIVE_TABLE, 
tablePolicyResource);
-                               RangerPolicy.RangerPolicyResource 
columnPolicyResource = new RangerPolicy.RangerPolicyResource(columnName);
-                               elements.put(RANGER_TYPE_HIVE_COLUMN, 
columnPolicyResource);
-                       } else {
-                               LOG.error("invalid qualifiedName for 
HIVE_COLUMN, qualifiedName=" + qualifiedName);
-                       }
-
-               }
-
-               RangerServiceResource ret = new RangerServiceResource();
-
-               ret.setGuid(entity.getId()._getId());
-               ret.setServiceName(serviceName);
-               ret.setResourceElements(elements);
-
-               return ret;
-       }
-
-       public String getRangerServiceName(String clusterName) {
-               String ret = getRangerServiceName(COMPONENT_NAME, clusterName);
-
-               if (StringUtils.isBlank(ret)) {
-                       ret = clusterName + 
TagSyncConfig.TAGSYNC_DEFAULT_CLUSTERNAME_AND_COMPONENTNAME_SEPARATOR + 
COMPONENT_NAME;
-               }
-               return ret;
-       }
-
-       public final List<String> 
getQualifiedNameComponents(IReferenceableInstance entity) throws Exception {
-
-               String qualifiedNameAttributeName = 
getQualifiedNameAttributeName(entity.getTypeName());
-
-               String qualifiedName = getEntityAttribute(entity, 
qualifiedNameAttributeName, String.class);
-
-               List<String> ret = 
getQualifiedNameComponents(entity.getTypeName(), qualifiedName);
-
-               if (LOG.isDebugEnabled()) {
-                       LOG.debug("----- Entity-Id:" + entity.getId()._getId());
-                       LOG.debug("----- Entity-Type-Name:" + 
entity.getTypeName());
-                       LOG.debug("-----        Entity-Components -----");
-                       int i = 0;
-                       for (String value : ret) {
-                               LOG.debug("-----                Index:" + i++ + 
"       Value:" + value);
-                       }
-               }
-               return ret;
-       }
-
-       public final List<String> getQualifiedNameComponents(String 
entityTypeName, String qualifiedName) throws Exception {
-
-               String qualifiedNameAttributeName = 
getQualifiedNameAttributeName(entityTypeName);
-
-               if (StringUtils.isBlank(qualifiedName)) {
-                       throw new Exception("Could not get a valid value for " 
+ qualifiedNameAttributeName + " attribute from entity.");
-               }
-
-               if (LOG.isDebugEnabled()) {
-                       LOG.debug("Received .... " + qualifiedNameAttributeName 
+ "=" + qualifiedName + " for entity type " + entityTypeName);
-               }
-
-               String components[] = qualifiedName.split(clusterDelimiter);
-
-               if (components.length != 2) {
-                       throw new Exception("Qualified Name does not contain 
cluster-name, qualifiedName=" + qualifiedName);
-               }
-
-               if (LOG.isDebugEnabled()) {
-                       LOG.debug("name-hierarchy=" + components[0] + ", 
cluster-name=" + components[1]);
-               }
-
-               String nameHierarchy[] = 
components[0].split(qualifiedNameDelimiter);
-
-               List<String> ret = new ArrayList<String>();
-
-               ret.add(qualifiedName);
-               ret.add(components[1]);
-
-               ret.addAll(Arrays.asList(nameHierarchy));
-
-               return ret;
-       }
-
-       public String getQualifiedNameAttributeName(String entityTypeName) {
-               return  StringUtils.equals(entityTypeName, 
ENTITY_TYPE_HIVE_TABLE) ?
-                               ENTITY_ATTRIBUTE_QUALIFIED_NAME_FOR_HIVE_TABLE 
: ENTITY_ATTRIBUTE_QUALIFIED_NAME;
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/9ea1d4ad/tagsync/src/main/java/org/apache/ranger/tagsync/source/Atlas/AtlasResourceMapper.java
----------------------------------------------------------------------
diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/Atlas/AtlasResourceMapper.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/Atlas/AtlasResourceMapper.java
deleted file mode 100644
index fd94928..0000000
--- a/tagsync/src/main/java/org/apache/ranger/tagsync/source/Atlas/AtlasResourceMapper.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.ranger.tagsync.source.atlas;
-
-import org.apache.atlas.AtlasException;
-import org.apache.atlas.typesystem.IReferenceableInstance;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.ranger.plugin.model.RangerServiceResource;
-import org.apache.ranger.tagsync.process.TagSyncConfig;
-
-import java.util.Properties;
-import java.util.List;
-import java.util.Map;
-
-public abstract class AtlasResourceMapper {
-       private static final Log LOG = 
LogFactory.getLog(AtlasResourceMapper.class);
-
-       protected Properties properties;
-
-       public AtlasResourceMapper() {
-       }
-
-       public void initialize(Properties properties) {
-               this.properties = properties;
-       }
-
-       abstract public List<String> getSupportedEntityTypes();
-
-       abstract public RangerServiceResource buildResource(final 
IReferenceableInstance entity) throws Exception;
-
-
-       protected String getRangerServiceName(String componentName, String 
atlasInstanceName) {
-               String propName = 
TagSyncConfig.TAGSYNC_SERVICENAME_MAPPER_PROP_PREFIX + componentName
-                               + ".instance." + atlasInstanceName
-                               + 
TagSyncConfig.TAGSYNC_SERVICENAME_MAPPER_PROP_SUFFIX;
-
-               return properties.getProperty(propName);
-       }
-
-       static protected <T> T getEntityAttribute(IReferenceableInstance 
entity, String name, Class<T> type) {
-               T ret = null;
-
-               try {
-                       Map<String, Object> valueMap = entity.getValuesMap();
-                       ret = getAttribute(valueMap, name, type);
-               } catch (AtlasException exception) {
-                       LOG.error("Cannot get map of values for entity: " + 
entity.getId()._getId(), exception);
-               }
-
-               return ret;
-       }
-
-       static protected <T> T getAttribute(Map<String, Object> map, String 
name, Class<T> type) {
-               return type.cast(map.get(name));
-       }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/9ea1d4ad/tagsync/src/main/java/org/apache/ranger/tagsync/source/Atlas/AtlasResourceMapperUtil.java
----------------------------------------------------------------------
diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/Atlas/AtlasResourceMapperUtil.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/Atlas/AtlasResourceMapperUtil.java
deleted file mode 100644
index f05d814..0000000
--- a/tagsync/src/main/java/org/apache/ranger/tagsync/source/Atlas/AtlasResourceMapperUtil.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for th
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.ranger.tagsync.source.atlas;
-
-import org.apache.atlas.typesystem.IReferenceableInstance;
-import org.apache.commons.lang.StringUtils;
-import org.apache.ranger.plugin.model.RangerServiceResource;
-
-import java.util.*;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.ranger.tagsync.process.TagSyncConfig;
-
-public class AtlasResourceMapperUtil {
-       private static final Log LOG = 
LogFactory.getLog(AtlasResourceMapperUtil.class);
-
-       private static Map<String, AtlasResourceMapper> atlasResourceMappers = 
new HashMap<String, AtlasResourceMapper>();
-
-       public static boolean isEntityTypeHandled(String entityTypeName) {
-               if (LOG.isDebugEnabled()) {
-                       LOG.debug("==> isEntityTypeHandled(entityTypeName=" + 
entityTypeName + ")");
-               }
-
-               AtlasResourceMapper mapper = 
atlasResourceMappers.get(entityTypeName);
-
-               boolean ret = mapper != null;
-
-               if (LOG.isDebugEnabled()) {
-                       LOG.debug("<== isEntityTypeHandled(entityTypeName=" + 
entityTypeName + ") : " + ret);
-               }
-
-               return ret;
-       }
-
-       public static RangerServiceResource 
getRangerServiceResource(IReferenceableInstance atlasEntity) {
-               if (LOG.isDebugEnabled()) {
-                       LOG.debug("==> getRangerServiceResource(" + 
atlasEntity.getId()._getId() +")");
-               }
-
-               RangerServiceResource resource = null;
-
-               AtlasResourceMapper mapper = 
atlasResourceMappers.get(atlasEntity.getTypeName());
-
-               if (mapper != null) {
-                       try {
-                               resource = mapper.buildResource(atlasEntity);
-                       } catch (Exception exception) {
-                               LOG.error("Could not get serviceResource for 
atlas entity:" + atlasEntity.getId()._getId() + ": ", exception);
-                       }
-               }
-
-               if (LOG.isDebugEnabled()) {
-                       LOG.debug("<== getRangerServiceResource(" + 
atlasEntity.getId()._getId() +"): resource=" + resource);
-               }
-
-               return resource;
-       }
-
-       static public boolean initializeAtlasResourceMappers(Properties 
properties) {
-               final String MAPPER_NAME_DELIMIER = ",";
-
-               String customMapperNames = 
TagSyncConfig.getCustomAtlasResourceMappers(properties);
-
-               if (LOG.isDebugEnabled()) {
-                       LOG.debug("==> 
initializeAtlasResourceMappers.initializeAtlasResourceMappers(" + 
customMapperNames + ")");
-               }
-               boolean ret = true;
-
-               String allResourceMappers = 
"org.apache.ranger.tagsync.source.atlas.AtlasHiveResourceMapper";
-
-               if (StringUtils.isNotBlank(customMapperNames)) {
-                       allResourceMappers = allResourceMappers + 
MAPPER_NAME_DELIMIER + customMapperNames;
-               }
-
-               String[] mapperNamesArray = 
allResourceMappers.split(MAPPER_NAME_DELIMIER);
-
-               List<String> mapperNames = Arrays.asList(mapperNamesArray);
-
-               for (String mapperName : mapperNames) {
-                       mapperName = mapperName.trim();
-                       try {
-                               Class clazz = Class.forName(mapperName);
-                               AtlasResourceMapper resourceMapper = 
(AtlasResourceMapper) clazz.newInstance();
-
-                               resourceMapper.initialize(properties);
-
-                               for (String entityTypeName : 
resourceMapper.getSupportedEntityTypes()) {
-                                       add(entityTypeName, resourceMapper);
-                               }
-
-                       } catch (Exception exception) {
-                               LOG.error("Failed to create 
AtlasResourceMapper:" + mapperName + ": ", exception);
-                               ret = false;
-                       }
-               }
-
-               if (LOG.isDebugEnabled()) {
-                       LOG.debug("<== 
initializeAtlasResourceMappers.initializeAtlasResourceMappers(" + 
allResourceMappers + "): " + ret);
-               }
-               return ret;
-       }
-
-       private static void add(String entityType, AtlasResourceMapper mapper) {
-               atlasResourceMappers.put(entityType, mapper);
-       }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/9ea1d4ad/tagsync/src/main/java/org/apache/ranger/tagsync/source/Atlas/AtlasTagSource.java
----------------------------------------------------------------------
diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/Atlas/AtlasTagSource.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/Atlas/AtlasTagSource.java
deleted file mode 100644
index 7694b37..0000000
--- a/tagsync/src/main/java/org/apache/ranger/tagsync/source/Atlas/AtlasTagSource.java
+++ /dev/null
@@ -1,197 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.ranger.tagsync.source.atlas;
-
-
-import com.google.inject.Guice;
-import com.google.inject.Injector;
-import com.google.inject.Provider;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import org.apache.atlas.notification.NotificationConsumer;
-import org.apache.atlas.notification.NotificationInterface;
-import org.apache.atlas.notification.NotificationModule;
-import org.apache.atlas.notification.entity.EntityNotification;
-
-import org.apache.ranger.tagsync.model.AbstractTagSource;
-import org.apache.ranger.plugin.util.ServiceTags;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.*;
-
-public class AtlasTagSource extends AbstractTagSource {
-       private static final Log LOG = LogFactory.getLog(AtlasTagSource.class);
-
-       public static final String TAGSYNC_ATLAS_PROPERTIES_FILE_NAME = 
"application.properties";
-
-       public static final String TAGSYNC_ATLAS_KAFKA_ENDPOINTS = 
"atlas.kafka.bootstrap.servers";
-       public static final String TAGSYNC_ATLAS_ZOOKEEPER_ENDPOINT = 
"atlas.kafka.zookeeper.connect";
-       public static final String TAGSYNC_ATLAS_CONSUMER_GROUP = 
"atlas.kafka.entities.group.id";
-
-       private ConsumerRunnable consumerTask;
-
-       @Override
-       public boolean initialize(Properties properties) {
-               if (LOG.isDebugEnabled()) {
-                       LOG.debug("==> AtlasTagSource.initialize()");
-               }
-
-               Properties atlasProperties = new Properties();
-
-               boolean ret = 
AtlasResourceMapperUtil.initializeAtlasResourceMappers(properties);
-
-               if (ret) {
-
-                       InputStream inputStream = 
getClass().getClassLoader().getResourceAsStream(TAGSYNC_ATLAS_PROPERTIES_FILE_NAME);
-
-                       if (inputStream != null) {
-                               try {
-                                       atlasProperties.load(inputStream);
-                               } catch (Exception exception) {
-                                       ret = false;
-                                       LOG.error("Cannot load Atlas 
application properties file, file-name:" + TAGSYNC_ATLAS_PROPERTIES_FILE_NAME, 
exception);
-                               } finally {
-                                       try {
-                                               inputStream.close();
-                                       } catch (IOException ioException) {
-                                               LOG.error("Cannot close Atlas 
application properties file, file-name:\" + 
TAGSYNC_ATLAS_PROPERTIES_FILE_NAME", ioException);
-                                       }
-                               }
-                       } else {
-                               ret = false;
-                               LOG.error("Cannot find Atlas application 
properties file");
-                       }
-               }
-
-               if (ret) {
-                       if 
(StringUtils.isBlank(atlasProperties.getProperty(TAGSYNC_ATLAS_KAFKA_ENDPOINTS)))
 {
-                               ret = false;
-                               LOG.error("Value of property '" + 
TAGSYNC_ATLAS_KAFKA_ENDPOINTS + "' is not specified!");
-                       }
-                       if 
(StringUtils.isBlank(atlasProperties.getProperty(TAGSYNC_ATLAS_ZOOKEEPER_ENDPOINT)))
 {
-                               ret = false;
-                               LOG.error("Value of property '" + 
TAGSYNC_ATLAS_ZOOKEEPER_ENDPOINT + "' is not specified!");
-                       }
-                       if 
(StringUtils.isBlank(atlasProperties.getProperty(TAGSYNC_ATLAS_CONSUMER_GROUP)))
 {
-                               ret = false;
-                               LOG.error("Value of property '" + 
TAGSYNC_ATLAS_CONSUMER_GROUP + "' is not specified!");
-                       }
-               }
-
-               if (ret) {
-                       NotificationModule notificationModule = new 
NotificationModule();
-
-                       Injector injector = 
Guice.createInjector(notificationModule);
-
-                       Provider<NotificationInterface> consumerProvider = 
injector.getProvider(NotificationInterface.class);
-                       NotificationInterface notification = 
consumerProvider.get();
-                       List<NotificationConsumer<EntityNotification>> 
iterators = 
notification.createConsumers(NotificationInterface.NotificationType.ENTITIES, 
1);
-
-                       consumerTask = new ConsumerRunnable(iterators.get(0));
-
-               }
-
-               if (LOG.isDebugEnabled()) {
-                       LOG.debug("<== AtlasTagSource.initialize(), result=" + 
ret);
-               }
-               return ret;
-       }
-
-       @Override
-       public boolean start() {
-               if (LOG.isDebugEnabled()) {
-                       LOG.debug("==> AtlasTagSource.start()");
-               }
-               Thread consumerThread = null;
-               if (consumerTask == null) {
-                       LOG.error("No consumerTask!!!");
-               } else {
-                       consumerThread = new Thread(consumerTask);
-                       consumerThread.setDaemon(true);
-                       consumerThread.start();
-               }
-               if (LOG.isDebugEnabled()) {
-                       LOG.debug("<== AtlasTagSource.start()");
-               }
-               return consumerThread != null;
-       }
-
-       @Override
-       public boolean isChanged() {
-               return true;
-       }
-
-       private static void printEntityNotification(EntityNotification 
notification) {
-               if (LOG.isDebugEnabled()) {
-                       LOG.debug("Notification-Type: " + 
notification.getOperationType());
-                       AtlasEntityWithTraits entityWithTraits = new 
AtlasEntityWithTraits(notification.getEntity(), notification.getAllTraits());
-                       LOG.debug(entityWithTraits);
-               }
-       }
-
-       private class ConsumerRunnable implements Runnable {
-
-               private final NotificationConsumer<EntityNotification> consumer;
-
-               private 
ConsumerRunnable(NotificationConsumer<EntityNotification> consumer) {
-                       this.consumer = consumer;
-               }
-
-               private boolean hasNext() {
-                       boolean ret = false;
-                       try {
-                               ret = consumer.hasNext();
-                       } catch (Exception exception) {
-                               LOG.error("EntityNotification consumer threw 
exception, IGNORING...:", exception);
-                       }
-                       return ret;
-               }
-
-               @Override
-               public void run() {
-                       if (LOG.isDebugEnabled()) {
-                               LOG.debug("==> ConsumerRunnable.run()");
-                       }
-                       while (!shutdown) {
-                               if (hasNext()) {
-                                       EntityNotification notification = 
consumer.next();
-                                       if (notification != null) {
-                                               
printEntityNotification(notification);
-
-                                               ServiceTags serviceTags = 
AtlasNotificationMapper.processEntityNotification(notification);
-                                               if (serviceTags == null) {
-                                                       if 
(LOG.isDebugEnabled()) {
-                                                               LOG.debug("Did 
not create ServiceTags structure for notification type:" + 
notification.getOperationType());
-                                                       }
-                                               } else {
-                                                       updateSink(serviceTags);
-                                               }
-                                       }
-                               }
-                       }
-                       LOG.info("Shutting down the Tag-Atlas-source thread");
-               }
-       }
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/9ea1d4ad/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasHiveResourceMapper.java
----------------------------------------------------------------------
diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasHiveResourceMapper.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasHiveResourceMapper.java
new file mode 100644
index 0000000..9a6fc13
--- /dev/null
+++ b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasHiveResourceMapper.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ranger.tagsync.source.atlas;
+
+import org.apache.atlas.typesystem.IReferenceableInstance;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.ranger.plugin.model.RangerPolicy;
+import org.apache.ranger.plugin.model.RangerServiceResource;
+import org.apache.ranger.tagsync.process.TagSyncConfig;
+
+import java.util.Map;
+import java.util.HashMap;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Arrays;
+/**
+ * Resource mapper for the "hive" component. Turns Atlas entities of type
+ * hive_db, hive_table and hive_column into Ranger service resources whose
+ * elements are keyed by "database", "table" and "column".
+ */
+public class AtlasHiveResourceMapper extends AtlasResourceMapper {
+       private static final Log LOG = 
LogFactory.getLog(AtlasHiveResourceMapper.class);
+
+       public static final String COMPONENT_NAME = "hive";
+
+       public static final String ENTITY_TYPE_HIVE_DB = "hive_db";
+       public static final String ENTITY_TYPE_HIVE_TABLE = "hive_table";
+       public static final String ENTITY_TYPE_HIVE_COLUMN = "hive_column";
+
+       public static final String RANGER_TYPE_HIVE_DB = "database";
+       public static final String RANGER_TYPE_HIVE_TABLE = "table";
+       public static final String RANGER_TYPE_HIVE_COLUMN = "column";
+
+       public static final String 
ENTITY_ATTRIBUTE_QUALIFIED_NAME_FOR_HIVE_TABLE = "name";
+
+       static protected final String ENTITY_ATTRIBUTE_QUALIFIED_NAME = 
"qualifiedName";
+
+       private static String clusterDelimiter = "@"; // separates the name hierarchy from the cluster name
+
+       private static String qualifiedNameDelimiter = "\\."; // regex handed to String.split(); matches a literal '.'
+
+       public static final String[] supportedEntityTypes = { 
ENTITY_TYPE_HIVE_DB, ENTITY_TYPE_HIVE_TABLE, ENTITY_TYPE_HIVE_COLUMN };
+
+       public AtlasHiveResourceMapper() {
+               super();
+       }
+       /** Returns the Atlas entity types this mapper handles: hive_db, hive_table and hive_column. */
+       @Override
+       public List<String> getSupportedEntityTypes() {
+               return Arrays.asList(supportedEntityTypes);
+       }
+       /**
+        * Builds the Ranger service resource for a Hive entity.
+        *
+        * The entity's qualified name is broken into components by
+        * getQualifiedNameComponents(entity). The cluster name selects the
+        * Ranger service name, and the database/table/column components become
+        * the resource elements. An entity type other than hive_db, hive_table
+        * or hive_column yields a resource with no elements.
+        *
+        * @param entity Atlas entity to map
+        * @return resource carrying the entity's guid, service name and elements
+        * @throws Exception if the qualified name is blank or malformed, or has
+        *                   too few components for the entity type
+        */
+       @Override
+       public RangerServiceResource buildResource(final IReferenceableInstance 
entity) throws Exception {
+
+               Map<String, RangerPolicy.RangerPolicyResource> elements = new 
HashMap<String, RangerPolicy.RangerPolicyResource>();
+
+               String serviceName = null;
+
+               List<String> components = getQualifiedNameComponents(entity);
+               // components should contain qualifiedName, clusterName, 
dbName, tableName, columnName in that order
+
+               String entityTypeName = entity.getTypeName();
+
+               String qualifiedName = components.get(0);
+
+               String clusterName, dbName, tableName, columnName;
+
+               if (components.size() > 1) { // index 1 holds the cluster name
+                       clusterName = components.get(1);
+                       serviceName = getRangerServiceName(clusterName);
+               }
+
+               if (StringUtils.equals(entityTypeName, ENTITY_TYPE_HIVE_DB)) {
+                       if (components.size() > 2) {
+                               dbName = components.get(2);
+                               RangerPolicy.RangerPolicyResource 
dbPolicyResource = new RangerPolicy.RangerPolicyResource(dbName);
+                               elements.put(RANGER_TYPE_HIVE_DB, 
dbPolicyResource);
+
+                       } else {
+                               LOG.error("invalid qualifiedName for HIVE_DB, 
qualifiedName=" + qualifiedName);
+                               throw new Exception("invalid qualifiedName for 
HIVE_DB, qualifiedName=" + qualifiedName);
+                       }
+               } else if (StringUtils.equals(entityTypeName, 
ENTITY_TYPE_HIVE_TABLE)) {
+                       if (components.size() > 3) {
+                               dbName = components.get(2);
+                               tableName = components.get(3);
+                               RangerPolicy.RangerPolicyResource 
dbPolicyResource = new RangerPolicy.RangerPolicyResource(dbName);
+                               elements.put(RANGER_TYPE_HIVE_DB, 
dbPolicyResource);
+                               RangerPolicy.RangerPolicyResource 
tablePolicyResource = new RangerPolicy.RangerPolicyResource(tableName);
+                               elements.put(RANGER_TYPE_HIVE_TABLE, 
tablePolicyResource);
+                       } else {
+                               LOG.error("invalid qualifiedName for 
HIVE_TABLE, qualifiedName=" + qualifiedName);
+                               throw new Exception("invalid qualifiedName for 
HIVE_TABLE, qualifiedName=" + qualifiedName);
+                       }
+               } else if (StringUtils.equals(entityTypeName, 
ENTITY_TYPE_HIVE_COLUMN)) {
+                       if (components.size() > 4) {
+                               dbName = components.get(2);
+                               tableName = components.get(3);
+                               columnName = components.get(4);
+                               RangerPolicy.RangerPolicyResource 
dbPolicyResource = new RangerPolicy.RangerPolicyResource(dbName);
+                               elements.put(RANGER_TYPE_HIVE_DB, 
dbPolicyResource);
+                               RangerPolicy.RangerPolicyResource 
tablePolicyResource = new RangerPolicy.RangerPolicyResource(tableName);
+                               elements.put(RANGER_TYPE_HIVE_TABLE, 
tablePolicyResource);
+                               RangerPolicy.RangerPolicyResource 
columnPolicyResource = new RangerPolicy.RangerPolicyResource(columnName);
+                               elements.put(RANGER_TYPE_HIVE_COLUMN, 
columnPolicyResource);
+                       } else {
+                               LOG.error("invalid qualifiedName for 
HIVE_COLUMN, qualifiedName=" + qualifiedName);
+                               throw new Exception("invalid qualifiedName for 
HIVE_COLUMN, qualifiedName=" + qualifiedName);
+                       }
+
+               }
+
+               RangerServiceResource ret = new RangerServiceResource();
+
+               ret.setGuid(entity.getId()._getId());
+               ret.setServiceName(serviceName);
+               ret.setResourceElements(elements);
+
+               return ret;
+       }
+       /**
+        * Returns the Ranger service name for the Hive service of a cluster.
+        *
+        * Uses the name looked up for the "hive" component and the given
+        * cluster when that lookup is not blank; otherwise falls back to
+        * clusterName + the default cluster/component separator + "hive".
+        *
+        * @param clusterName cluster name taken from the entity's qualified name
+        * @return looked-up or default service name
+        */
+       public String getRangerServiceName(String clusterName) {
+               String ret = getRangerServiceName(COMPONENT_NAME, clusterName);
+
+               if (StringUtils.isBlank(ret)) {
+                       ret = clusterName + 
TagSyncConfig.TAGSYNC_DEFAULT_CLUSTERNAME_AND_COMPONENTNAME_SEPARATOR + 
COMPONENT_NAME;
+               }
+               return ret;
+       }
+       /**
+        * Reads the qualified-name attribute of the entity and splits it into
+        * components; the components are also written to the debug log.
+        *
+        * @param entity Atlas entity whose qualified name is parsed
+        * @return same list as getQualifiedNameComponents(String, String)
+        * @throws Exception if the attribute value is blank or malformed
+        */
+       public final List<String> 
getQualifiedNameComponents(IReferenceableInstance entity) throws Exception {
+
+               String qualifiedNameAttributeName = 
getQualifiedNameAttributeName(entity.getTypeName());
+
+               String qualifiedName = getEntityAttribute(entity, 
qualifiedNameAttributeName, String.class);
+
+               List<String> ret = 
getQualifiedNameComponents(entity.getTypeName(), qualifiedName);
+
+               if (LOG.isDebugEnabled()) {
+                       LOG.debug("----- Entity-Id:" + entity.getId()._getId());
+                       LOG.debug("----- Entity-Type-Name:" + 
entity.getTypeName());
+                       LOG.debug("-----        Entity-Components -----");
+                       int i = 0;
+                       for (String value : ret) {
+                               LOG.debug("-----                Index:" + i++ + 
"       Value:" + value);
+                       }
+               }
+               return ret;
+       }
+       /**
+        * Splits a qualified name of the form {@code name-hierarchy@cluster-name}.
+        *
+        * @param entityTypeName Atlas entity type; used only to name the
+        *                       attribute in log and error messages
+        * @param qualifiedName  value to split
+        * @return list holding, in order: the full qualified name, the cluster
+        *         name, then each '.'-separated part of the name hierarchy
+        * @throws Exception if qualifiedName is blank, or splitting it on '@'
+        *                   does not give exactly two parts
+        */
+       public final List<String> getQualifiedNameComponents(String 
entityTypeName, String qualifiedName) throws Exception {
+
+               String qualifiedNameAttributeName = 
getQualifiedNameAttributeName(entityTypeName);
+
+               if (StringUtils.isBlank(qualifiedName)) {
+                       throw new Exception("Could not get a valid value for " 
+ qualifiedNameAttributeName + " attribute from entity.");
+               }
+
+               if (LOG.isDebugEnabled()) {
+                       LOG.debug("Received .... " + qualifiedNameAttributeName 
+ "=" + qualifiedName + " for entity type " + entityTypeName);
+               }
+
+               String components[] = qualifiedName.split(clusterDelimiter);
+
+               if (components.length != 2) {
+                       throw new Exception("Qualified Name does not contain 
cluster-name, qualifiedName=" + qualifiedName);
+               }
+
+               if (LOG.isDebugEnabled()) {
+                       LOG.debug("name-hierarchy=" + components[0] + ", 
cluster-name=" + components[1]);
+               }
+
+               String nameHierarchy[] = 
components[0].split(qualifiedNameDelimiter);
+
+               List<String> ret = new ArrayList<String>();
+
+               ret.add(qualifiedName);
+               ret.add(components[1]);
+
+               ret.addAll(Arrays.asList(nameHierarchy));
+
+               return ret;
+       }
+       /**
+        * Returns the name of the attribute that holds the qualified name:
+        * "name" for hive_table entities, "qualifiedName" for every other type.
+        */
+       public String getQualifiedNameAttributeName(String entityTypeName) {
+               return  StringUtils.equals(entityTypeName, 
ENTITY_TYPE_HIVE_TABLE) ?
+                               ENTITY_ATTRIBUTE_QUALIFIED_NAME_FOR_HIVE_TABLE 
: ENTITY_ATTRIBUTE_QUALIFIED_NAME;
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/9ea1d4ad/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasNotificationMapper.java
----------------------------------------------------------------------
diff --git 
a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasNotificationMapper.java
 
b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasNotificationMapper.java
index 803a8a9..2168983 100644
--- 
a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasNotificationMapper.java
+++ 
b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasNotificationMapper.java
@@ -27,9 +27,11 @@ import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.collections.MapUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.util.StringUtils;
 import org.apache.ranger.plugin.model.RangerServiceResource;
 import org.apache.ranger.plugin.model.RangerTag;
 import org.apache.ranger.plugin.model.RangerTagDef;
+import org.apache.ranger.plugin.model.RangerTagDef.RangerTagAttributeDef;
 import org.apache.ranger.plugin.util.ServiceTags;
 
 import java.util.*;
@@ -47,7 +49,7 @@ public class AtlasNotificationMapper {
 
                                if 
(AtlasResourceMapperUtil.isEntityTypeHandled(entity.getTypeName())) {
                                        AtlasEntityWithTraits entityWithTraits 
= new AtlasEntityWithTraits(entityNotification.getEntity(), 
entityNotification.getAllTraits());
-                                       ret = 
buildServiceTags(entityWithTraits, 1L, 1L, null);
+                                       ret = 
buildServiceTags(entityWithTraits, null);
                                } else {
                                        if (LOG.isDebugEnabled()) {
                                                LOG.debug("Ranger not 
interested in Entity Notification for entity-type " + 
entityNotification.getEntity().getTypeName());
@@ -99,83 +101,90 @@ public class AtlasNotificationMapper {
 
                Map<String, ServiceTags> ret = new HashMap<String, 
ServiceTags>();
 
-               long serviceResourceIndex = 1L;
-               long tagIndex = 1L;
-
                for (AtlasEntityWithTraits element : entitiesWithTraits) {
-
-                       ServiceTags serviceTags = buildServiceTags(element, 
serviceResourceIndex, tagIndex, ret);
-
-                       serviceResourceIndex++;
-
-                       tagIndex += CollectionUtils.size(serviceTags.getTags());
-
+                       buildServiceTags(element, ret);
                }
 
                // Remove duplicate tag definitions
-               for (Map.Entry<String, ServiceTags> serviceTagsMapEntry : 
ret.entrySet()){
-
-                       Map<Long, RangerTagDef> allTagDefs = 
serviceTagsMapEntry.getValue().getTagDefinitions();
-
-                       Map<String, String> tagTypeIndex = new HashMap<String, 
String>();
-                       Map<Long, RangerTagDef> uniqueTagDefs = new 
HashMap<Long, RangerTagDef>();
-
-                       for (Map.Entry<Long, RangerTagDef> entry : 
allTagDefs.entrySet()) {
-                               String tagTypeName = entry.getValue().getName();
+               if(CollectionUtils.isNotEmpty(ret.values())) {
+                       for (ServiceTags serviceTag : ret.values()) {
+                               
if(MapUtils.isNotEmpty(serviceTag.getTagDefinitions())) {
+                                       Map<String, RangerTagDef> uniqueTagDefs 
= new HashMap<String, RangerTagDef>();
+
+                                       for (RangerTagDef tagDef : 
serviceTag.getTagDefinitions().values()) {
+                                               RangerTagDef existingTagDef = 
uniqueTagDefs.get(tagDef.getName());
+
+                                               if (existingTagDef == null) {
+                                                       
uniqueTagDefs.put(tagDef.getName(), tagDef);
+                                               } else {
+                                                       
if(CollectionUtils.isNotEmpty(tagDef.getAttributeDefs())) {
+                                                               
for(RangerTagAttributeDef tagAttrDef : tagDef.getAttributeDefs()) {
+                                                                       boolean 
attrDefExists = false;
+
+                                                                       
if(CollectionUtils.isNotEmpty(existingTagDef.getAttributeDefs())) {
+                                                                               
for(RangerTagAttributeDef existingTagAttrDef : 
existingTagDef.getAttributeDefs()) {
+                                                                               
        if(StringUtils.equalsIgnoreCase(existingTagAttrDef.getName(), 
tagAttrDef.getName())) {
+                                                                               
                attrDefExists = true;
+                                                                               
                break;
+                                                                               
        }
+                                                                               
}
+                                                                       }
+
+                                                                       if(! 
attrDefExists) {
+                                                                               
existingTagDef.getAttributeDefs().add(tagAttrDef);
+                                                                       }
+                                                               }
+                                                       }
+                                               }
+                                       }
 
-                               if (tagTypeIndex.get(tagTypeName) == null) {
-                                       tagTypeIndex.put(tagTypeName, 
tagTypeName);
-                                       uniqueTagDefs.put(entry.getKey(), 
entry.getValue());
+                                       serviceTag.getTagDefinitions().clear();
+                                       for(RangerTagDef tagDef : 
uniqueTagDefs.values()) {
+                                               
serviceTag.getTagDefinitions().put(tagDef.getId(), tagDef);
+                                       }
                                }
                        }
-                       
serviceTagsMapEntry.getValue().setTagDefinitions(uniqueTagDefs);
                }
 
                return ret;
        }
 
-       static private ServiceTags buildServiceTags(AtlasEntityWithTraits 
entityWithTraits, long index, long tagIndex, Map<String, ServiceTags> 
serviceTagsMap) throws Exception {
-
-               ServiceTags ret = null;
-
-               IReferenceableInstance entity = entityWithTraits.getEntity();
-
-               RangerServiceResource serviceResource = 
AtlasResourceMapperUtil.getRangerServiceResource(entity);
+       static private ServiceTags buildServiceTags(AtlasEntityWithTraits 
entityWithTraits, Map<String, ServiceTags> serviceTagsMap) throws Exception {
+               ServiceTags            ret             = null;
+               IReferenceableInstance entity          = 
entityWithTraits.getEntity();
+               RangerServiceResource  serviceResource = 
AtlasResourceMapperUtil.getRangerServiceResource(entity);
 
                if (serviceResource != null) {
-
-                       serviceResource.setId(index);
+                       List<RangerTag>    tags    = getTags(entityWithTraits);
+                       List<RangerTagDef> tagDefs = 
getTagDefs(entityWithTraits);
 
                        String serviceName = serviceResource.getServiceName();
 
-                       Map<Long, RangerTag> tags = getTags(entityWithTraits, 
tagIndex);
-
-                       Map<Long, RangerTagDef> tagDefs = getTagDefs(tags);
+                       ret = createOrGetServiceTags(serviceTagsMap, 
serviceName);
 
-                       Map<Long, List<Long>> resourceIdToTagIds = null;
+                       
serviceResource.setId((long)ret.getServiceResources().size());
+                       ret.getServiceResources().add(serviceResource);
 
-                       resourceIdToTagIds = new HashMap<Long, List<Long>>();
-                       List<Long> tagList = new ArrayList<Long>();
+                       List<Long> tagIds = new ArrayList<Long>();
 
-                       if (MapUtils.isNotEmpty(tags)) {
-                               resourceIdToTagIds = new HashMap<Long, 
List<Long>>();
+                       if(CollectionUtils.isNotEmpty(tags)) {
+                               for(RangerTag tag : tags) {
+                                       tag.setId((long)ret.getTags().size());
+                                       ret.getTags().put(tag.getId(), tag);
 
-                               for (Map.Entry<Long, RangerTag> entry : 
tags.entrySet()) {
-                                       tagList.add(entry.getKey());
+                                       tagIds.add(tag.getId());
                                }
                        }
+                       ret.getResourceToTagIds().put(serviceResource.getId(), 
tagIds);
 
-                       resourceIdToTagIds.put(index, tagList);
-
-                       ret = createOrGetServiceTags(serviceTagsMap, 
serviceName);
-
-                       ret.getServiceResources().add(serviceResource);
-                       ret.getTagDefinitions().putAll(tagDefs);
-                       ret.getTags().putAll(tags);
-                       ret.getResourceToTagIds().putAll(resourceIdToTagIds);
-
+                       if(CollectionUtils.isNotEmpty(tagDefs)) {
+                               for(RangerTagDef tagDef : tagDefs) {
+                                       
tagDef.setId((long)ret.getTagDefinitions().size());
+                                       
ret.getTagDefinitions().put(tagDef.getId(), tagDef);
+                               }
+                       }
                } else {
-                       LOG.error("AtlasResourceMapper not found for 
entity-type:" + entity.getTypeName());
+                       LOG.error("Failed to build serviceResource for entity:" 
+ entity.getId()._getId());
                }
 
                return ret;
@@ -199,67 +208,62 @@ public class AtlasNotificationMapper {
                return ret;
        }
 
-       static private Map<Long, RangerTag> getTags(AtlasEntityWithTraits 
entityWithTraits, long index) {
-               Map<Long, RangerTag> ret = null;
-
-               ret = new HashMap<Long, RangerTag>();
-
-               List<IStruct> traits = entityWithTraits.getAllTraits();
-
-               for (IStruct trait : traits) {
+       static private List<RangerTag> getTags(AtlasEntityWithTraits 
entityWithTraits) {
+               List<RangerTag> ret = new ArrayList<RangerTag>();
 
-                       String traitName = trait.getTypeName();
+               if(entityWithTraits != null && 
CollectionUtils.isNotEmpty(entityWithTraits.getAllTraits())) {
+                       List<IStruct> traits = entityWithTraits.getAllTraits();
 
-                       Map<String, String> tagAttrValues = new HashMap<String, 
String>();
+                       for (IStruct trait : traits) {
+                               Map<String, String> tagAttrs = new 
HashMap<String, String>();
 
-                       try {
+                               try {
+                                       Map<String, Object> attrs = 
trait.getValuesMap();
 
-                               Map<String, Object> attrValues = 
trait.getValuesMap();
+                                       if(MapUtils.isNotEmpty(attrs)) {
+                                               for (Map.Entry<String, Object> 
attrEntry : attrs.entrySet()) {
+                                                       String attrName  = 
attrEntry.getKey();
+                                                       Object attrValue = 
attrEntry.getValue();
 
-                               for (Map.Entry<String, Object> attrValueEntry : 
attrValues.entrySet()) {
-                                       String attrName = 
attrValueEntry.getKey();
-                                       Object attrValue = 
attrValueEntry.getValue();
-                                       try {
-                                               String strValue = 
String.class.cast(attrValue);
-                                               tagAttrValues.put(attrName, 
strValue);
-                                       } catch (ClassCastException exception) {
-                                               LOG.error("Cannot cast 
attribute-value to String, skipping... attrName=" + attrName);
+                                                       tagAttrs.put(attrName, 
attrValue != null ? attrValue.toString() : null);
+                                               }
                                        }
+                               } catch (AtlasException exception) {
+                                       LOG.error("Could not get values for 
trait:" + trait.getTypeName(), exception);
                                }
-                       } catch (AtlasException exception) {
-                               LOG.error("Could not get values for trait:" + 
traitName, exception);
-                       }
-
-                       RangerTag tag = new RangerTag();
-
-                       tag.setType(traitName);
-                       tag.setAttributes(tagAttrValues);
-
-                       ret.put(index++, tag);
 
+                               ret.add(new RangerTag(trait.getTypeName(), 
tagAttrs));
+                       }
                }
 
                return ret;
        }
 
-       static private Map<Long, RangerTagDef> getTagDefs(Map<Long, RangerTag> 
tags) {
+       static private List<RangerTagDef> getTagDefs(AtlasEntityWithTraits 
entityWithTraits) {
+               List<RangerTagDef> ret = new ArrayList<RangerTagDef>();
 
-               Map<Long, RangerTagDef> ret = null;
+               if(entityWithTraits != null && 
CollectionUtils.isNotEmpty(entityWithTraits.getAllTraits())) {
+                       List<IStruct> traits = entityWithTraits.getAllTraits();
 
-               if (MapUtils.isNotEmpty(tags)) {
-                       ret = new HashMap<Long, RangerTagDef>();
+                       for (IStruct trait : traits) {
+                               RangerTagDef tagDef = new 
RangerTagDef(trait.getTypeName(), "Atlas");
 
-                       for (Map.Entry<Long, RangerTag> entry : 
tags.entrySet()) {
+                               try {
+                                       Map<String, Object> attrs = 
trait.getValuesMap();
 
-                               RangerTagDef tagDef = new RangerTagDef();
-                               tagDef.setName(entry.getValue().getType());
-                               tagDef.setId(entry.getKey());
-                               ret.put(entry.getKey(), tagDef);
+                                       if(MapUtils.isNotEmpty(attrs)) {
+                                               for (String attrName : 
attrs.keySet()) {
+                                                       
tagDef.getAttributeDefs().add(new RangerTagAttributeDef(attrName, "string"));
+                                               }
+                                       }
+                               } catch (AtlasException exception) {
+                                       LOG.error("Could not get values for 
trait:" + trait.getTypeName(), exception);
+                               }
 
+                               ret.add(tagDef);
                        }
                }
 
                return ret;
        }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/9ea1d4ad/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasResourceMapper.java
----------------------------------------------------------------------
diff --git 
a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasResourceMapper.java
 
b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasResourceMapper.java
new file mode 100644
index 0000000..fd94928
--- /dev/null
+++ 
b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasResourceMapper.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ranger.tagsync.source.atlas;
+
+import org.apache.atlas.AtlasException;
+import org.apache.atlas.typesystem.IReferenceableInstance;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.ranger.plugin.model.RangerServiceResource;
+import org.apache.ranger.tagsync.process.TagSyncConfig;
+
+import java.util.Properties;
+import java.util.List;
+import java.util.Map;
+/**
+ * Base class for mappers that turn Atlas entities into Ranger service
+ * resources. Holds the properties supplied through initialize(), which are
+ * read when looking up Ranger service names.
+ */
+public abstract class AtlasResourceMapper {
+       private static final Log LOG = 
LogFactory.getLog(AtlasResourceMapper.class);
+
+       protected Properties properties; // not assigned by the constructor; set through initialize()
+
+       public AtlasResourceMapper() {
+       }
+       /** Stores the properties that getRangerServiceName() reads. */
+       public void initialize(Properties properties) {
+               this.properties = properties;
+       }
+       /** Returns the names of the Atlas entity types handled by this mapper. */
+       abstract public List<String> getSupportedEntityTypes();
+       /** Builds the Ranger service resource that corresponds to the given Atlas entity. */
+       abstract public RangerServiceResource buildResource(final 
IReferenceableInstance entity) throws Exception;
+
+       /**
+        * Looks up the Ranger service name configured for a component and Atlas
+        * instance. The property name is the service-name-mapper prefix +
+        * componentName + ".instance." + atlasInstanceName + the mapper suffix.
+        *
+        * NOTE: properties is dereferenced without a null check, so this
+        * presumably relies on initialize() having been called first.
+        *
+        * @return the property value, or null when the property is not set
+        */
+       protected String getRangerServiceName(String componentName, String 
atlasInstanceName) {
+               String propName = 
TagSyncConfig.TAGSYNC_SERVICENAME_MAPPER_PROP_PREFIX + componentName
+                               + ".instance." + atlasInstanceName
+                               + 
TagSyncConfig.TAGSYNC_SERVICENAME_MAPPER_PROP_SUFFIX;
+
+               return properties.getProperty(propName);
+       }
+       /**
+        * Returns the named attribute of the entity cast to the requested type.
+        * If reading the entity's value map throws AtlasException, the error is
+        * logged and null is returned.
+        */
+       static protected <T> T getEntityAttribute(IReferenceableInstance 
entity, String name, Class<T> type) {
+               T ret = null;
+
+               try {
+                       Map<String, Object> valueMap = entity.getValuesMap();
+                       ret = getAttribute(valueMap, name, type);
+               } catch (AtlasException exception) {
+                       LOG.error("Cannot get map of values for entity: " + 
entity.getId()._getId(), exception);
+               }
+
+               return ret;
+       }
+       /**
+        * Returns map.get(name) cast to type. A missing key or null value gives
+        * null; a value of a different type makes Class.cast throw
+        * ClassCastException. The map itself is not null-checked.
+        */
+       static protected <T> T getAttribute(Map<String, Object> map, String 
name, Class<T> type) {
+               return type.cast(map.get(name));
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/9ea1d4ad/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasResourceMapperUtil.java
----------------------------------------------------------------------
diff --git 
a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasResourceMapperUtil.java
 
b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasResourceMapperUtil.java
new file mode 100644
index 0000000..f05d814
--- /dev/null
+++ 
b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasResourceMapperUtil.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ranger.tagsync.source.atlas;
+
+import org.apache.atlas.typesystem.IReferenceableInstance;
+import org.apache.commons.lang.StringUtils;
+import org.apache.ranger.plugin.model.RangerServiceResource;
+
+import java.util.*;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.ranger.tagsync.process.TagSyncConfig;
+
+public class AtlasResourceMapperUtil {
+       private static final Log LOG = 
LogFactory.getLog(AtlasResourceMapperUtil.class);
+
+       private static Map<String, AtlasResourceMapper> atlasResourceMappers = 
new HashMap<String, AtlasResourceMapper>();
+
+       public static boolean isEntityTypeHandled(String entityTypeName) {
+               if (LOG.isDebugEnabled()) {
+                       LOG.debug("==> isEntityTypeHandled(entityTypeName=" + 
entityTypeName + ")");
+               }
+
+               AtlasResourceMapper mapper = 
atlasResourceMappers.get(entityTypeName);
+
+               boolean ret = mapper != null;
+
+               if (LOG.isDebugEnabled()) {
+                       LOG.debug("<== isEntityTypeHandled(entityTypeName=" + 
entityTypeName + ") : " + ret);
+               }
+
+               return ret;
+       }
+
+       public static RangerServiceResource 
getRangerServiceResource(IReferenceableInstance atlasEntity) {
+               if (LOG.isDebugEnabled()) {
+                       LOG.debug("==> getRangerServiceResource(" + 
atlasEntity.getId()._getId() +")");
+               }
+
+               RangerServiceResource resource = null;
+
+               AtlasResourceMapper mapper = 
atlasResourceMappers.get(atlasEntity.getTypeName());
+
+               if (mapper != null) {
+                       try {
+                               resource = mapper.buildResource(atlasEntity);
+                       } catch (Exception exception) {
+                               LOG.error("Could not get serviceResource for 
atlas entity:" + atlasEntity.getId()._getId() + ": ", exception);
+                       }
+               }
+
+               if (LOG.isDebugEnabled()) {
+                       LOG.debug("<== getRangerServiceResource(" + 
atlasEntity.getId()._getId() +"): resource=" + resource);
+               }
+
+               return resource;
+       }
+
+       static public boolean initializeAtlasResourceMappers(Properties 
properties) {
+               final String MAPPER_NAME_DELIMIER = ",";
+
+               String customMapperNames = 
TagSyncConfig.getCustomAtlasResourceMappers(properties);
+
+               if (LOG.isDebugEnabled()) {
+                       LOG.debug("==> 
initializeAtlasResourceMappers.initializeAtlasResourceMappers(" + 
customMapperNames + ")");
+               }
+               boolean ret = true;
+
+               String allResourceMappers = 
"org.apache.ranger.tagsync.source.atlas.AtlasHiveResourceMapper";
+
+               if (StringUtils.isNotBlank(customMapperNames)) {
+                       allResourceMappers = allResourceMappers + 
MAPPER_NAME_DELIMIER + customMapperNames;
+               }
+
+               String[] mapperNamesArray = 
allResourceMappers.split(MAPPER_NAME_DELIMIER);
+
+               List<String> mapperNames = Arrays.asList(mapperNamesArray);
+
+               for (String mapperName : mapperNames) {
+                       mapperName = mapperName.trim();
+                       try {
+                               Class clazz = Class.forName(mapperName);
+                               AtlasResourceMapper resourceMapper = 
(AtlasResourceMapper) clazz.newInstance();
+
+                               resourceMapper.initialize(properties);
+
+                               for (String entityTypeName : 
resourceMapper.getSupportedEntityTypes()) {
+                                       add(entityTypeName, resourceMapper);
+                               }
+
+                       } catch (Exception exception) {
+                               LOG.error("Failed to create 
AtlasResourceMapper:" + mapperName + ": ", exception);
+                               ret = false;
+                       }
+               }
+
+               if (LOG.isDebugEnabled()) {
+                       LOG.debug("<== 
initializeAtlasResourceMappers.initializeAtlasResourceMappers(" + 
allResourceMappers + "): " + ret);
+               }
+               return ret;
+       }
+
+       private static void add(String entityType, AtlasResourceMapper mapper) {
+               atlasResourceMappers.put(entityType, mapper);
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/9ea1d4ad/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasTagSource.java
----------------------------------------------------------------------
diff --git 
a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasTagSource.java
 
b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasTagSource.java
new file mode 100644
index 0000000..2499177
--- /dev/null
+++ 
b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasTagSource.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ranger.tagsync.source.atlas;
+
+
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import com.google.inject.Provider;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.atlas.notification.NotificationConsumer;
+import org.apache.atlas.notification.NotificationInterface;
+import org.apache.atlas.notification.NotificationModule;
+import org.apache.atlas.notification.entity.EntityNotification;
+
+import org.apache.ranger.tagsync.model.AbstractTagSource;
+import org.apache.ranger.plugin.util.ServiceTags;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.*;
+
+public class AtlasTagSource extends AbstractTagSource {
+       private static final Log LOG = LogFactory.getLog(AtlasTagSource.class);
+
+       public static final String TAGSYNC_ATLAS_PROPERTIES_FILE_NAME = 
"application.properties";
+
+       public static final String TAGSYNC_ATLAS_KAFKA_ENDPOINTS = 
"atlas.kafka.bootstrap.servers";
+       public static final String TAGSYNC_ATLAS_ZOOKEEPER_ENDPOINT = 
"atlas.kafka.zookeeper.connect";
+       public static final String TAGSYNC_ATLAS_CONSUMER_GROUP = 
"atlas.kafka.entities.group.id";
+
+       private ConsumerRunnable consumerTask;
+
+       @Override
+       public boolean initialize(Properties properties) {
+               if (LOG.isDebugEnabled()) {
+                       LOG.debug("==> AtlasTagSource.initialize()");
+               }
+
+               Properties atlasProperties = new Properties();
+
+               boolean ret = 
AtlasResourceMapperUtil.initializeAtlasResourceMappers(properties);
+
+               if (ret) {
+
+                       InputStream inputStream = 
getClass().getClassLoader().getResourceAsStream(TAGSYNC_ATLAS_PROPERTIES_FILE_NAME);
+
+                       if (inputStream != null) {
+                               try {
+                                       atlasProperties.load(inputStream);
+                               } catch (Exception exception) {
+                                       ret = false;
+                                       LOG.error("Cannot load Atlas 
application properties file, file-name:" + TAGSYNC_ATLAS_PROPERTIES_FILE_NAME, 
exception);
+                               } finally {
+                                       try {
+                                               inputStream.close();
+                                       } catch (IOException ioException) {
+                                               LOG.error("Cannot close Atlas 
application properties file, file-name:\" + 
TAGSYNC_ATLAS_PROPERTIES_FILE_NAME", ioException);
+                                       }
+                               }
+                       } else {
+                               ret = false;
+                               LOG.error("Cannot find Atlas application 
properties file");
+                       }
+               }
+
+               if (ret) {
+                       if 
(StringUtils.isBlank(atlasProperties.getProperty(TAGSYNC_ATLAS_KAFKA_ENDPOINTS)))
 {
+                               ret = false;
+                               LOG.error("Value of property '" + 
TAGSYNC_ATLAS_KAFKA_ENDPOINTS + "' is not specified!");
+                       }
+                       if 
(StringUtils.isBlank(atlasProperties.getProperty(TAGSYNC_ATLAS_ZOOKEEPER_ENDPOINT)))
 {
+                               ret = false;
+                               LOG.error("Value of property '" + 
TAGSYNC_ATLAS_ZOOKEEPER_ENDPOINT + "' is not specified!");
+                       }
+                       if 
(StringUtils.isBlank(atlasProperties.getProperty(TAGSYNC_ATLAS_CONSUMER_GROUP)))
 {
+                               ret = false;
+                               LOG.error("Value of property '" + 
TAGSYNC_ATLAS_CONSUMER_GROUP + "' is not specified!");
+                       }
+               }
+
+               if (ret) {
+                       NotificationModule notificationModule = new 
NotificationModule();
+
+                       Injector injector = 
Guice.createInjector(notificationModule);
+
+                       Provider<NotificationInterface> consumerProvider = 
injector.getProvider(NotificationInterface.class);
+                       NotificationInterface notification = 
consumerProvider.get();
+                       List<NotificationConsumer<EntityNotification>> 
iterators = 
notification.createConsumers(NotificationInterface.NotificationType.ENTITIES, 
1);
+
+                       consumerTask = new ConsumerRunnable(iterators.get(0));
+
+               }
+
+               if (LOG.isDebugEnabled()) {
+                       LOG.debug("<== AtlasTagSource.initialize(), result=" + 
ret);
+               }
+               return ret;
+       }
+
+       @Override
+       public boolean start() {
+               if (LOG.isDebugEnabled()) {
+                       LOG.debug("==> AtlasTagSource.start()");
+               }
+               Thread consumerThread = null;
+               if (consumerTask == null) {
+                       LOG.error("No consumerTask!!!");
+               } else {
+                       consumerThread = new Thread(consumerTask);
+                       consumerThread.setDaemon(true);
+                       consumerThread.start();
+               }
+               if (LOG.isDebugEnabled()) {
+                       LOG.debug("<== AtlasTagSource.start()");
+               }
+               return consumerThread != null;
+       }
+
+       @Override
+       public boolean isChanged() {
+               return true;
+       }
+
+       private static String getPrintableEntityNotification(EntityNotification 
notification) {
+               StringBuilder sb = new StringBuilder();
+
+               sb.append("{ Notification-Type: 
").append(notification.getOperationType()).append(", ");
+               AtlasEntityWithTraits entityWithTraits = new 
AtlasEntityWithTraits(notification.getEntity(), notification.getAllTraits());
+               sb.append(entityWithTraits.toString());
+               sb.append("}");
+               return sb.toString();
+       }
+
+       private class ConsumerRunnable implements Runnable {
+
+               private final NotificationConsumer<EntityNotification> consumer;
+
+               private 
ConsumerRunnable(NotificationConsumer<EntityNotification> consumer) {
+                       this.consumer = consumer;
+               }
+
+               private boolean hasNext() {
+                       boolean ret = false;
+                       try {
+                               ret = consumer.hasNext();
+                       } catch (Exception exception) {
+                               LOG.error("EntityNotification consumer threw 
exception, IGNORING...:", exception);
+                       }
+                       return ret;
+               }
+
+               @Override
+               public void run() {
+                       if (LOG.isDebugEnabled()) {
+                               LOG.debug("==> ConsumerRunnable.run()");
+                       }
+                       while (!shutdown) {
+                               if (hasNext()) {
+                                       EntityNotification notification = 
consumer.next();
+                                       if (notification != null) {
+                                               if (LOG.isDebugEnabled()) {
+                                                       
LOG.debug("Notification=" + getPrintableEntityNotification(notification));
+                                               }
+
+                                               ServiceTags serviceTags = 
AtlasNotificationMapper.processEntityNotification(notification);
+                                               if (serviceTags == null) {
+                                                       LOG.error("Failed to 
create ServiceTags for notification :" + 
getPrintableEntityNotification(notification));
+                                               } else {
+                                                       updateSink(serviceTags);
+                                               }
+                                       }
+                               }
+                       }
+                       LOG.info("Shutting down the Tag-Atlas-source thread");
+               }
+       }
+}
+

Reply via email to