Repository: ranger
Updated Branches:
  refs/heads/ranger-0.7 8ebad64ef -> 30b1188fe


RANGER-1897: tagsync update to replace Atlas V1 API usage with Atlas V2 API for tag-download using REST


Project: http://git-wip-us.apache.org/repos/asf/ranger/repo
Commit: http://git-wip-us.apache.org/repos/asf/ranger/commit/30b1188f
Tree: http://git-wip-us.apache.org/repos/asf/ranger/tree/30b1188f
Diff: http://git-wip-us.apache.org/repos/asf/ranger/diff/30b1188f

Branch: refs/heads/ranger-0.7
Commit: 30b1188fe54788bcca3216dbeeb2f956e5cb9c9d
Parents: 8ebad64
Author: Madhan Neethiraj <[email protected]>
Authored: Tue Nov 21 11:03:53 2017 -0800
Committer: Madhan Neethiraj <[email protected]>
Committed: Sat Nov 25 11:59:58 2017 -0800

----------------------------------------------------------------------
 pom.xml                                         |   9 +-
 src/main/assembly/tagsync.xml                   |  12 +-
 tagsync/pom.xml                                 |  42 ++-
 .../source/atlas/AtlasHbaseResourceMapper.java  |  19 +-
 .../source/atlas/AtlasHdfsResourceMapper.java   |  17 +-
 .../source/atlas/AtlasHiveResourceMapper.java   |  19 +-
 .../source/atlas/AtlasKafkaResourceMapper.java  |  15 +-
 .../source/atlas/AtlasNotificationMapper.java   | 317 ++++++++++++++----
 .../source/atlas/AtlasResourceMapper.java       |   7 +
 .../source/atlas/AtlasResourceMapperUtil.java   |  25 ++
 .../tagsync/source/atlas/AtlasTagSource.java    |  48 +--
 .../source/atlasrest/AtlasRESTTagSource.java    | 129 +++++---
 .../tagsync/source/atlasrest/AtlasRESTUtil.java | 325 -------------------
 13 files changed, 512 insertions(+), 472 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ranger/blob/30b1188f/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 80de97e..cc09475 100644
--- a/pom.xml
+++ b/pom.xml
@@ -125,7 +125,7 @@
         <asm.all.version>3.2</asm.all.version>
         <asm.version>3.1</asm.version>
         <aspectj.version>1.8.2</aspectj.version>
-        <atlas.version>0.7-incubating</atlas.version>
+        <atlas.version>0.8.2-SNAPSHOT</atlas.version>
         <atlas.guava.version>14.0</atlas.guava.version>
         <atlas.gson.version>2.5</atlas.gson.version>
         <atlas.jettison.version>1.3.7</atlas.jettison.version>
@@ -357,7 +357,12 @@
                 <groupId>com.webcohesion.enunciate</groupId>
                 <artifactId>enunciate-core-annotations</artifactId>
                 <version>2.8.0</version>
-                </dependency>
+            </dependency>
+            <dependency>
+                <groupId>com.sun.jersey.contribs</groupId>
+                <artifactId>jersey-multipart</artifactId>
+                <version>${jersey-bundle.version}</version>
+            </dependency>
         </dependencies>
     </dependencyManagement>
     <build>

http://git-wip-us.apache.org/repos/asf/ranger/blob/30b1188f/src/main/assembly/tagsync.xml
----------------------------------------------------------------------
diff --git a/src/main/assembly/tagsync.xml b/src/main/assembly/tagsync.xml
index 0b17151..26b42ca 100644
--- a/src/main/assembly/tagsync.xml
+++ b/src/main/assembly/tagsync.xml
@@ -40,12 +40,16 @@
                                        
<include>com.google.inject:guice:jar:${guice.version}</include>
                                        
<include>com.google.inject.extensions:guice-multibindings:jar:${guice.version}</include>
                                        
<include>com.sun.jersey:jersey-bundle:jar:${jersey-bundle.version}</include>
+                                       
<include>com.sun.jersey.contribs:jersey-multipart:jar:${sun-jersey-bundle.version}</include>
                                        
<include>com.thoughtworks.paranamer:paranamer:jar:${paranamer.version}</include>
                                        
<include>com.yammer.metrics:metrics-core</include>
                                        
<include>org.apache.atlas:atlas-notification:jar:${atlas.version}</include>
                                        
<include>org.apache.atlas:atlas-typesystem:jar:${atlas.version}</include>
                                        
<include>org.apache.atlas:atlas-client:jar:${atlas.version}</include>
+                                       
<include>org.apache.atlas:atlas-client-common:jar:${atlas.version}</include>
+                                       
<include>org.apache.atlas:atlas-client-v2:jar:${atlas.version}</include>
                                        
<include>org.apache.atlas:atlas-common:jar:${atlas.version}</include>
+                                       
<include>org.apache.atlas:atlas-intg:jar:${atlas.version}</include>
                                        
<include>org.apache.hadoop:hadoop-auth</include>
                                        
<include>org.apache.hadoop:hadoop-common</include>
                                        
<include>org.apache.kafka:kafka_${scala.binary.version}:jar:${kafka.version}</include>
@@ -55,10 +59,10 @@
                                        
<include>org.apache.ranger:ranger-plugins-common</include>
                                        
<include>org.apache.ranger:ranger-util</include>
                                        
<include>org.apache.zookeeper:zookeeper:jar:${zookeeper.version}</include>
-                                       
<include>org.codehaus.jackson:jackson-core-asl</include>
-                                       
<include>org.codehaus.jackson:jackson-jaxrs</include>
-                                       
<include>org.codehaus.jackson:jackson-mapper-asl</include>
-                                       
<include>org.codehaus.jackson:jackson-xc</include>
+                                       
<include>org.codehaus.jackson:jackson-core-asl:jar:${codehaus.jackson.version}</include>
+                                       
<include>org.codehaus.jackson:jackson-jaxrs:jar:${codehaus.jackson.version}</include>
+                                       
<include>org.codehaus.jackson:jackson-mapper-asl:jar:${codehaus.jackson.version}</include>
+                                       
<include>org.codehaus.jackson:jackson-xc:jar:${codehaus.jackson.version}</include>
                                        
<include>org.codehaus.jettison:jettison:jar:${jettison.version}</include>
                                        
<include>org.json4s:json4s-native_${scala.binary.version}:jar:${json4s.version}</include>
                                        
<include>org.json4s:json4s-core_${scala.binary.version}:jar:${json4s.version}</include>

http://git-wip-us.apache.org/repos/asf/ranger/blob/30b1188f/tagsync/pom.xml
----------------------------------------------------------------------
diff --git a/tagsync/pom.xml b/tagsync/pom.xml
index 42e9d2f..417a12f 100644
--- a/tagsync/pom.xml
+++ b/tagsync/pom.xml
@@ -55,6 +55,11 @@
             <version>${jersey-bundle.version}</version>
         </dependency>
         <dependency>
+            <groupId>com.sun.jersey.contribs</groupId>
+            <artifactId>jersey-multipart</artifactId>
+            <version>${sun-jersey-bundle.version}</version>
+        </dependency>
+        <dependency>
             <groupId>commons-cli</groupId>
             <artifactId>commons-cli</artifactId>
             <version>${commons.cli.version}</version>
@@ -110,6 +115,26 @@
             <version>${jettison.version}</version>
         </dependency>
         <dependency>
+            <groupId>org.codehaus.jackson</groupId>
+            <artifactId>jackson-core-asl</artifactId>
+            <version>${codehaus.jackson.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.codehaus.jackson</groupId>
+            <artifactId>jackson-jaxrs</artifactId>
+            <version>${codehaus.jackson.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.codehaus.jackson</groupId>
+            <artifactId>jackson-mapper-asl</artifactId>
+            <version>${codehaus.jackson.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.codehaus.jackson</groupId>
+            <artifactId>jackson-xc</artifactId>
+            <version>${codehaus.jackson.version}</version>
+        </dependency>
+        <dependency>
             <groupId>org.apache.atlas</groupId>
             <artifactId>atlas-notification</artifactId>
             <version>${atlas.version}</version>
@@ -121,7 +146,17 @@
         </dependency>
         <dependency>
             <groupId>org.apache.atlas</groupId>
-            <artifactId>atlas-client</artifactId>
+            <artifactId>atlas-client-v1</artifactId>
+            <version>${atlas.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.atlas</groupId>
+            <artifactId>atlas-client-common</artifactId>
+            <version>${atlas.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.atlas</groupId>
+            <artifactId>atlas-client-v2</artifactId>
             <version>${atlas.version}</version>
         </dependency>
         <dependency>
@@ -144,6 +179,11 @@
            </exclusions>
         </dependency>
         <dependency>
+            <groupId>org.apache.atlas</groupId>
+            <artifactId>atlas-intg</artifactId>
+            <version>${atlas.version}</version>
+        </dependency>
+        <dependency>
             <groupId>org.apache.kafka</groupId>
             <artifactId>kafka_${scala.binary.version}</artifactId>
             <version>${kafka.version}</version>

http://git-wip-us.apache.org/repos/asf/ranger/blob/30b1188f/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasHbaseResourceMapper.java
----------------------------------------------------------------------
diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasHbaseResourceMapper.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasHbaseResourceMapper.java
index 8b36a31..00615e4 100644
--- a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasHbaseResourceMapper.java
+++ b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasHbaseResourceMapper.java
@@ -22,6 +22,7 @@ package org.apache.ranger.tagsync.source.atlas;
 import java.util.Map;
 import java.util.HashMap;
 
+import org.apache.atlas.model.instance.AtlasEntityHeader;
 import org.apache.atlas.typesystem.IReferenceableInstance;
 import org.apache.commons.lang.StringUtils;
 import org.apache.ranger.plugin.model.RangerPolicy.RangerPolicyResource;
@@ -48,7 +49,23 @@ public class AtlasHbaseResourceMapper extends 
AtlasResourceMapper {
 
        @Override
        public RangerServiceResource buildResource(final IReferenceableInstance 
entity) throws Exception {
+               String entityGuid    = entity.getId() != null ? 
entity.getId()._getId() : null;
+               String entityType    = entity.getTypeName();
                String qualifiedName = getEntityAttribute(entity, 
ENTITY_ATTRIBUTE_QUALIFIED_NAME, String.class);
+
+               return getServiceResource(entityGuid, entityType, 
qualifiedName);
+       }
+
+       @Override
+       public RangerServiceResource buildResource(final AtlasEntityHeader 
entity) throws Exception {
+               String entityGuid    = entity.getGuid();
+               String entityType    = entity.getTypeName();
+               String qualifiedName = getEntityAttribute(entity, 
ENTITY_ATTRIBUTE_QUALIFIED_NAME, String.class);
+
+               return getServiceResource(entityGuid, entityType, 
qualifiedName);
+       }
+
+       private RangerServiceResource getServiceResource(String entityGuid, 
String entityType, String qualifiedName) throws Exception {
                if (StringUtils.isEmpty(qualifiedName)) {
                        throw new Exception("attribute '" +  
ENTITY_ATTRIBUTE_QUALIFIED_NAME + "' not found in entity");
                }
@@ -63,8 +80,6 @@ public class AtlasHbaseResourceMapper extends 
AtlasResourceMapper {
                        throwExceptionWithMessage("cluster-name not found in 
attribute '" +  ENTITY_ATTRIBUTE_QUALIFIED_NAME + "': " + qualifiedName);
                }
 
-               String entityType  = entity.getTypeName();
-               String entityGuid  = entity.getId() != null ? 
entity.getId()._getId() : null;
                String serviceName = getRangerServiceName(clusterName);
 
                Map<String, RangerPolicyResource> elements = new 
HashMap<String, RangerPolicyResource>();

http://git-wip-us.apache.org/repos/asf/ranger/blob/30b1188f/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasHdfsResourceMapper.java
----------------------------------------------------------------------
diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasHdfsResourceMapper.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasHdfsResourceMapper.java
index 06bff90..d970859 100644
--- a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasHdfsResourceMapper.java
+++ b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasHdfsResourceMapper.java
@@ -22,6 +22,7 @@ package org.apache.ranger.tagsync.source.atlas;
 import java.util.HashMap;
 import java.util.Map;
 
+import org.apache.atlas.model.instance.AtlasEntityHeader;
 import org.apache.atlas.typesystem.IReferenceableInstance;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.fs.Path;
@@ -57,10 +58,25 @@ public class AtlasHdfsResourceMapper extends 
AtlasResourceMapper {
 
        @Override
        public RangerServiceResource buildResource(final IReferenceableInstance 
entity) throws Exception {
+               String entityGuid    = entity.getId() != null ? 
entity.getId()._getId() : null;
                String path          = getEntityAttribute(entity, 
ENTITY_ATTRIBUTE_PATH, String.class);
                String clusterName   = getEntityAttribute(entity, 
ENTITY_ATTRIBUTE_CLUSTER_NAME, String.class);
                String qualifiedName = getEntityAttribute(entity, 
ENTITY_ATTRIBUTE_QUALIFIED_NAME, String.class);
 
+               return getServiceResource(entityGuid, path, clusterName, 
qualifiedName);
+       }
+
+       @Override
+       public RangerServiceResource buildResource(final AtlasEntityHeader 
entity) throws Exception {
+               String entityGuid    = entity.getGuid();
+               String path          = getEntityAttribute(entity, 
ENTITY_ATTRIBUTE_PATH, String.class);
+               String clusterName   = getEntityAttribute(entity, 
ENTITY_ATTRIBUTE_CLUSTER_NAME, String.class);
+               String qualifiedName = getEntityAttribute(entity, 
ENTITY_ATTRIBUTE_QUALIFIED_NAME, String.class);
+
+               return getServiceResource(entityGuid, path, clusterName, 
qualifiedName);
+       }
+
+       private RangerServiceResource getServiceResource(String entityGuid, 
String path, String clusterName, String qualifiedName) throws Exception {
                if(StringUtils.isEmpty(path)) {
                        path = getResourceNameFromQualifiedName(qualifiedName);
 
@@ -81,7 +97,6 @@ public class AtlasHdfsResourceMapper extends 
AtlasResourceMapper {
                        }
                }
 
-               String  entityGuid  = entity.getId() != null ? 
entity.getId()._getId() : null;
                String  serviceName = getRangerServiceName(clusterName);
                Boolean isExcludes  = Boolean.FALSE;
                Boolean isRecursive = Boolean.TRUE;

http://git-wip-us.apache.org/repos/asf/ranger/blob/30b1188f/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasHiveResourceMapper.java
----------------------------------------------------------------------
diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasHiveResourceMapper.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasHiveResourceMapper.java
index a359622..84d1226 100644
--- a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasHiveResourceMapper.java
+++ b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasHiveResourceMapper.java
@@ -22,6 +22,7 @@ package org.apache.ranger.tagsync.source.atlas;
 import java.util.Map;
 import java.util.HashMap;
 
+import org.apache.atlas.model.instance.AtlasEntityHeader;
 import org.apache.atlas.typesystem.IReferenceableInstance;
 import org.apache.commons.lang.StringUtils;
 import org.apache.ranger.plugin.model.RangerPolicy.RangerPolicyResource;
@@ -47,7 +48,23 @@ public class AtlasHiveResourceMapper extends 
AtlasResourceMapper {
 
        @Override
        public RangerServiceResource buildResource(final IReferenceableInstance 
entity) throws Exception {
+               String entityGuid    = entity.getId() != null ? 
entity.getId()._getId() : null;
+               String entityType    = entity.getTypeName();
                String qualifiedName = getEntityAttribute(entity, 
ENTITY_ATTRIBUTE_QUALIFIED_NAME, String.class);
+
+               return getServiceResource(entityGuid, entityType, 
qualifiedName);
+       }
+
+       @Override
+       public RangerServiceResource buildResource(final AtlasEntityHeader 
entity) throws Exception {
+               String entityGuid    = entity.getGuid();
+               String entityType    = entity.getTypeName();
+               String qualifiedName = getEntityAttribute(entity, 
ENTITY_ATTRIBUTE_QUALIFIED_NAME, String.class);
+
+               return getServiceResource(entityGuid, entityType, 
qualifiedName);
+       }
+
+       private RangerServiceResource getServiceResource(String entityGuid, 
String entityType, String qualifiedName) throws Exception {
                if (StringUtils.isEmpty(qualifiedName)) {
                        throw new Exception("attribute '" +  
ENTITY_ATTRIBUTE_QUALIFIED_NAME + "' not found in entity");
                }
@@ -62,8 +79,6 @@ public class AtlasHiveResourceMapper extends 
AtlasResourceMapper {
                        throwExceptionWithMessage("cluster-name not found in 
attribute '" +  ENTITY_ATTRIBUTE_QUALIFIED_NAME + "': " + qualifiedName);
                }
 
-               String   entityType  = entity.getTypeName();
-               String   entityGuid  = entity.getId() != null ? 
entity.getId()._getId() : null;
                String   serviceName = getRangerServiceName(clusterName);
                String[] resources   = 
resourceStr.split(QUALIFIED_NAME_DELIMITER);
                String   dbName      = resources.length > 0 ? resources[0] : 
null;

http://git-wip-us.apache.org/repos/asf/ranger/blob/30b1188f/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasKafkaResourceMapper.java
----------------------------------------------------------------------
diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasKafkaResourceMapper.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasKafkaResourceMapper.java
index 9f1fc2d..0c0247f 100644
--- a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasKafkaResourceMapper.java
+++ b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasKafkaResourceMapper.java
@@ -22,6 +22,7 @@ package org.apache.ranger.tagsync.source.atlas;
 import java.util.HashMap;
 import java.util.Map;
 
+import org.apache.atlas.model.instance.AtlasEntityHeader;
 import org.apache.atlas.typesystem.IReferenceableInstance;
 import org.apache.commons.lang.StringUtils;
 import org.apache.ranger.plugin.model.RangerPolicy;
@@ -42,8 +43,21 @@ public class AtlasKafkaResourceMapper extends 
AtlasResourceMapper {
 
        @Override
        public RangerServiceResource buildResource(final IReferenceableInstance 
entity) throws Exception {
+               String entityGuid    = entity.getId() != null ? 
entity.getId()._getId() : null;
                String qualifiedName = getEntityAttribute(entity, 
ENTITY_ATTRIBUTE_QUALIFIED_NAME, String.class);
 
+               return getServiceResource(entityGuid, qualifiedName);
+       }
+
+       @Override
+       public RangerServiceResource buildResource(final AtlasEntityHeader 
entity) throws Exception {
+               String entityGuid    = entity.getGuid();
+               String qualifiedName = getEntityAttribute(entity, 
ENTITY_ATTRIBUTE_QUALIFIED_NAME, String.class);
+
+               return getServiceResource(entityGuid, qualifiedName);
+       }
+
+       private RangerServiceResource getServiceResource(String entityGuid, 
String qualifiedName) throws Exception {
                String topic = getResourceNameFromQualifiedName(qualifiedName);
 
                if(StringUtils.isEmpty(topic)) {
@@ -67,7 +81,6 @@ public class AtlasKafkaResourceMapper extends 
AtlasResourceMapper {
 
                elements.put(RANGER_TYPE_KAFKA_TOPIC, new 
RangerPolicyResource(topic, isExcludes, isRecursive));
 
-               String  entityGuid  = entity.getId() != null ? 
entity.getId()._getId() : null;
                String  serviceName = getRangerServiceName(clusterName);
 
                return new RangerServiceResource(entityGuid, serviceName, 
elements);

http://git-wip-us.apache.org/repos/asf/ranger/blob/30b1188f/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasNotificationMapper.java
----------------------------------------------------------------------
diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasNotificationMapper.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasNotificationMapper.java
index 922317e..f42c908 100644
--- a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasNotificationMapper.java
+++ b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasNotificationMapper.java
@@ -20,7 +20,16 @@
 package org.apache.ranger.tagsync.source.atlas;
 
 import org.apache.atlas.AtlasException;
+import org.apache.atlas.model.discovery.AtlasSearchResult;
+import org.apache.atlas.model.instance.AtlasClassification;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasEntityHeader;
+import org.apache.atlas.model.typedef.AtlasBaseTypeDef;
 import org.apache.atlas.notification.entity.EntityNotification;
+import org.apache.atlas.type.AtlasBuiltInTypes;
+import org.apache.atlas.type.AtlasClassificationType;
+import org.apache.atlas.type.AtlasStructType.AtlasAttribute;
+import org.apache.atlas.type.AtlasTypeRegistry;
 import org.apache.atlas.typesystem.IReferenceableInstance;
 import org.apache.atlas.typesystem.IStruct;
 import org.apache.atlas.typesystem.persistence.Id;
@@ -35,10 +44,9 @@ import org.apache.ranger.plugin.model.RangerTagDef;
 import org.apache.ranger.plugin.model.RangerTagDef.RangerTagAttributeDef;
 import org.apache.ranger.plugin.util.ServiceTags;
 
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.*;
 
 public class AtlasNotificationMapper {
        private static final Log LOG = 
LogFactory.getLog(AtlasNotificationMapper.class);
@@ -46,6 +54,17 @@ public class AtlasNotificationMapper {
 
        private static Map<String, Long> unhandledEventTypes = new 
HashMap<String, Long>();
 
+       private static final ThreadLocal<DateFormat> DATE_FORMATTER = new 
ThreadLocal<DateFormat>() {
+               @Override
+               protected DateFormat initialValue() {
+                       SimpleDateFormat dateFormat = new 
SimpleDateFormat(AtlasBaseTypeDef.SERIALIZED_DATE_FORMAT_STR);
+
+                       dateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
+
+                       return dateFormat;
+               }
+       };
+
        private static void logUnhandledEntityNotification(EntityNotification 
entityNotification) {
 
                final int REPORTING_INTERVAL_FOR_UNHANDLED_ENTITYTYPE_IN_MILLIS 
= 5 * 60 * 1000; // 5 minutes
@@ -134,6 +153,7 @@ public class AtlasNotificationMapper {
                                case ENTITY_UPDATE:
                                case ENTITY_DELETE:
                                case TRAIT_ADD:
+                               case TRAIT_UPDATE:
                                case TRAIT_DELETE: {
                                        ret = true;
                                        break;
@@ -175,7 +195,6 @@ public class AtlasNotificationMapper {
        }
 
        static private Map<String, ServiceTags> 
buildServiceTags(List<AtlasEntityWithTraits> entitiesWithTraits) throws 
Exception {
-
                Map<String, ServiceTags> ret = new HashMap<String, 
ServiceTags>();
 
                for (AtlasEntityWithTraits element : entitiesWithTraits) {
@@ -189,11 +208,163 @@ public class AtlasNotificationMapper {
                        }
                }
 
+               return ret;
+       }
+
+       static private ServiceTags buildServiceTags(AtlasEntityWithTraits 
entityWithTraits, Map<String, ServiceTags> serviceTagsMap) throws Exception {
+               ServiceTags            ret             = null;
+               IReferenceableInstance entity          = 
entityWithTraits.getEntity();
+               RangerServiceResource  serviceResource = 
AtlasResourceMapperUtil.getRangerServiceResource(entity);
+
+               if (serviceResource != null) {
+                       List<RangerTag>    tags        = 
getTags(entityWithTraits);
+                       List<RangerTagDef> tagDefs     = 
getTagDefs(entityWithTraits);
+                       String             serviceName = 
serviceResource.getServiceName();
+
+                       ret = createOrGetServiceTags(serviceTagsMap, 
serviceName);
+
+                       if (serviceTagsMap == null || 
CollectionUtils.isNotEmpty(tags)) {
+                               serviceResource.setId((long) 
ret.getServiceResources().size());
+                               ret.getServiceResources().add(serviceResource);
+
+                               List<Long> tagIds = new ArrayList<>();
+
+                               if (CollectionUtils.isNotEmpty(tags)) {
+                                       for (RangerTag tag : tags) {
+                                               tag.setId((long) 
ret.getTags().size());
+                                               ret.getTags().put(tag.getId(), 
tag);
+
+                                               tagIds.add(tag.getId());
+                                       }
+                               }
+                               
ret.getResourceToTagIds().put(serviceResource.getId(), tagIds);
+
+                               if (CollectionUtils.isNotEmpty(tagDefs)) {
+                                       for (RangerTagDef tagDef : tagDefs) {
+                                               tagDef.setId((long) 
ret.getTagDefinitions().size());
+                                               
ret.getTagDefinitions().put(tagDef.getId(), tagDef);
+                                       }
+                               }
+                       } else {
+                               if (LOG.isDebugEnabled()) {
+                                       LOG.debug("Entity " + entityWithTraits 
+ " does not have any tags associated with it when full-sync is being done.");
+                                       LOG.debug("Will not add this entity to 
serviceTags, so that this entity, if exists,  will be removed from ranger");
+                               }
+                       }
+               } else {
+                       LOG.error("Failed to build serviceResource for entity:" 
+ entity.getId()._getId());
+               }
+
+               return ret;
+       }
+
+       static private ServiceTags createOrGetServiceTags(Map<String, 
ServiceTags> serviceTagsMap, String serviceName) {
+               ServiceTags ret = serviceTagsMap == null ? null : 
serviceTagsMap.get(serviceName);
+
+               if (ret == null) {
+                       ret = new ServiceTags();
+
+                       if (serviceTagsMap != null) {
+                               serviceTagsMap.put(serviceName, ret);
+                       }
+
+                       ret.setOp(ServiceTags.OP_ADD_OR_UPDATE);
+                       ret.setServiceName(serviceName);
+               }
+
+               return ret;
+       }
+
+       static private List<RangerTag> getTags(AtlasEntityWithTraits 
entityWithTraits) {
+               List<RangerTag>        ret    = new ArrayList<RangerTag>();
+               IReferenceableInstance entity = entityWithTraits != null ? 
entityWithTraits.getEntity() : null;
+
+               if(entity != null && 
CollectionUtils.isNotEmpty(entity.getTraits())) {
+                       for (String traitName : entity.getTraits()) {
+                               IStruct             trait    = 
entity.getTrait(traitName);
+                               Map<String, String> tagAttrs = new 
HashMap<String, String>();
+
+                               try {
+                                       Map<String, Object> attrs = 
trait.getValuesMap();
+
+                                       if(MapUtils.isNotEmpty(attrs)) {
+                                               for (Map.Entry<String, Object> 
attrEntry : attrs.entrySet()) {
+                                                       String attrName  = 
attrEntry.getKey();
+                                                       Object attrValue = 
attrEntry.getValue();
+
+                                                       tagAttrs.put(attrName, 
attrValue != null ? attrValue.toString() : null);
+                                               }
+                                       }
+
+                               } catch (AtlasException exception) {
+                                       LOG.error("Could not get values for 
trait:" + trait.getTypeName(), exception);
+                               }
+
+                               ret.add(new RangerTag(null, 
trait.getTypeName(), tagAttrs, RangerTag.OWNER_SERVICERESOURCE));
+                       }
+               }
+
+               return ret;
+       }
+
+       static private List<RangerTagDef> getTagDefs(AtlasEntityWithTraits 
entityWithTraits) {
+               List<RangerTagDef>     ret    = new ArrayList<RangerTagDef>();
+               IReferenceableInstance entity = entityWithTraits != null ? 
entityWithTraits.getEntity() : null;
+
+               if(entity != null && 
CollectionUtils.isNotEmpty(entity.getTraits())) {
+                       for (String traitName : entity.getTraits()) {
+                               IStruct       trait = 
entity.getTrait(traitName);
+                               RangerTagDef tagDef = new 
RangerTagDef(trait.getTypeName(), "Atlas");
+
+                               try {
+                                       Map<String, Object> attrs = 
trait.getValuesMap();
+
+                                       if(MapUtils.isNotEmpty(attrs)) {
+                                               for (String attrName : 
attrs.keySet()) {
+                                                       
tagDef.getAttributeDefs().add(new RangerTagAttributeDef(attrName, "string"));
+                                               }
+                                       }
+                               } catch (AtlasException exception) {
+                                       LOG.error("Could not get values for 
trait:" + trait.getTypeName(), exception);
+                               }
+
+                               ret.add(tagDef);
+                       }
+               }
+
+               return ret;
+       }
+
+       public static Map<String, ServiceTags> 
processSearchResult(AtlasSearchResult result, AtlasTypeRegistry typeRegistry) {
+               Map<String, ServiceTags> ret = null;
+
+               try {
+                       ret = buildServiceTags(result, typeRegistry);
+               } catch (Exception exception) {
+                       LOG.error("Failed to build serviceTags", exception);
+               }
+
+               return ret;
+       }
+
+       static private Map<String, ServiceTags> 
buildServiceTags(AtlasSearchResult result, AtlasTypeRegistry typeRegistry) 
throws Exception {
+               Map<String, ServiceTags> ret = new HashMap<>();
+
+               for (AtlasEntityHeader entity : result.getEntities()) {
+                       if (entity != null && entity.getStatus() == 
AtlasEntity.Status.ACTIVE) {
+                               buildServiceTags(entity, typeRegistry, ret);
+                       } else {
+                               if (LOG.isDebugEnabled()) {
+                                       LOG.debug("Ignoring entity because its 
State is not ACTIVE: " + entity);
+                               }
+                       }
+               }
+
                // Remove duplicate tag definitions
                if(CollectionUtils.isNotEmpty(ret.values())) {
                        for (ServiceTags serviceTag : ret.values()) {
                                
if(MapUtils.isNotEmpty(serviceTag.getTagDefinitions())) {
-                                       Map<String, RangerTagDef> uniqueTagDefs 
= new HashMap<String, RangerTagDef>();
+                                       Map<String, RangerTagDef> uniqueTagDefs 
= new HashMap<>();
 
                                        for (RangerTagDef tagDef : 
serviceTag.getTagDefinitions().values()) {
                                                RangerTagDef existingTagDef = 
uniqueTagDefs.get(tagDef.getName());
@@ -239,25 +410,22 @@ public class AtlasNotificationMapper {
                return ret;
        }
 
-       static private ServiceTags buildServiceTags(AtlasEntityWithTraits 
entityWithTraits, Map<String, ServiceTags> serviceTagsMap) throws Exception {
-               ServiceTags            ret             = null;
-               IReferenceableInstance entity          = 
entityWithTraits.getEntity();
-               RangerServiceResource  serviceResource = 
AtlasResourceMapperUtil.getRangerServiceResource(entity);
+       static private ServiceTags buildServiceTags(AtlasEntityHeader entity, 
AtlasTypeRegistry typeRegistry, Map<String, ServiceTags> serviceTagsMap) throws 
Exception {
+               ServiceTags           ret             = null;
+               RangerServiceResource serviceResource = 
AtlasResourceMapperUtil.getRangerServiceResource(entity);
 
                if (serviceResource != null) {
-
-                       List<RangerTag>     tags        = 
getTags(entityWithTraits);
-                       List<RangerTagDef>  tagDefs     = 
getTagDefs(entityWithTraits);
+                       List<RangerTag>     tags        = getTags(entity, 
typeRegistry);
+                       List<RangerTagDef>  tagDefs     = getTagDefs(entity);
                        String              serviceName = 
serviceResource.getServiceName();
 
                        ret = createOrGetServiceTags(serviceTagsMap, 
serviceName);
 
                        if (serviceTagsMap == null || 
CollectionUtils.isNotEmpty(tags)) {
-
                                serviceResource.setId((long) 
ret.getServiceResources().size());
                                ret.getServiceResources().add(serviceResource);
 
-                               List<Long> tagIds = new ArrayList<Long>();
+                               List<Long> tagIds = new ArrayList<>();
 
                                if (CollectionUtils.isNotEmpty(tags)) {
                                        for (RangerTag tag : tags) {
@@ -277,90 +445,125 @@ public class AtlasNotificationMapper {
                                }
                        } else {
                                if (LOG.isDebugEnabled()) {
-                                       LOG.debug("Entity " + entityWithTraits 
+ " does not have any tags associated with it when full-sync is being done.");
+                                       LOG.debug("Entity " + entity + " does 
not have any tags associated with it when full-sync is being done.");
                                        LOG.debug("Will not add this entity to 
serviceTags, so that this entity, if exists,  will be removed from ranger");
                                }
                        }
                } else {
-                       LOG.error("Failed to build serviceResource for entity:" 
+ entity.getId()._getId());
+                       LOG.error("Failed to build serviceResource for entity:" 
+ entity.getGuid());
                }
 
                return ret;
        }
 
-       static private ServiceTags createOrGetServiceTags(Map<String, 
ServiceTags> serviceTagsMap, String serviceName) {
-               ServiceTags ret = serviceTagsMap == null ? null : 
serviceTagsMap.get(serviceName);
+       static private List<RangerTag> getTags(AtlasEntityHeader entity, 
AtlasTypeRegistry typeRegistry) {
+               List<RangerTag> ret = new ArrayList<>();
 
-               if (ret == null) {
-                       ret = new ServiceTags();
+               if(entity != null && 
CollectionUtils.isNotEmpty(entity.getClassificationNames())) {
+                       List<AtlasClassification> classifications = 
entity.getClassifications();
 
-                       if (serviceTagsMap != null) {
-                               serviceTagsMap.put(serviceName, ret);
+                       for (AtlasClassification classification : 
classifications) {
+                               ret.add(getRangerTag(classification, 
typeRegistry));
+
+                               List<AtlasClassification> superClassifications 
= getSuperClassifications(classification, typeRegistry);
+
+                               if 
(CollectionUtils.isNotEmpty(superClassifications)) {
+                                       for (AtlasClassification 
superClassification : superClassifications) {
+                                               
ret.add(getRangerTag(superClassification, typeRegistry));
+                                       }
+                               }
                        }
+               }
 
-                       ret.setOp(ServiceTags.OP_ADD_OR_UPDATE);
-                       ret.setServiceName(serviceName);
+               return ret;
+       }
+
+       static private List<RangerTagDef> getTagDefs(AtlasEntityHeader entity) {
+               List<RangerTagDef> ret = new ArrayList<>();
+
+               if(entity != null && 
CollectionUtils.isNotEmpty(entity.getClassificationNames())) {
+                       List<AtlasClassification> traits = 
entity.getClassifications();
+
+                       for (AtlasClassification trait : traits) {
+                               RangerTagDef tagDef = new 
RangerTagDef(trait.getTypeName(), "Atlas");
+
+                               if(MapUtils.isNotEmpty(trait.getAttributes())) {
+                                       for (String attrName : 
trait.getAttributes().keySet()) {
+                                               
tagDef.getAttributeDefs().add(new RangerTagAttributeDef(attrName, "string"));
+                                       }
+                               }
+
+                               ret.add(tagDef);
+                       }
                }
 
                return ret;
        }
 
-       static private List<RangerTag> getTags(AtlasEntityWithTraits 
entityWithTraits) {
-               List<RangerTag> ret = new ArrayList<RangerTag>();
+       static private List<AtlasClassification> 
getSuperClassifications(AtlasClassification classification, AtlasTypeRegistry 
typeRegistry) {
+               List<AtlasClassification> ret                = null;
+               AtlasClassificationType   classificationType = 
typeRegistry.getClassificationTypeByName(classification.getTypeName());
 
-               if(entityWithTraits != null && 
CollectionUtils.isNotEmpty(entityWithTraits.getAllTraits())) {
-                       List<IStruct> traits = entityWithTraits.getAllTraits();
+               if (classificationType != null && 
CollectionUtils.isNotEmpty(classificationType.getAllSuperTypes())) {
+                       ret = new 
ArrayList<>(classificationType.getAllSuperTypes().size());
 
-                       for (IStruct trait : traits) {
-                               Map<String, String> tagAttrs = new 
HashMap<String, String>();
+                       for (String superTypeName : 
classificationType.getAllSuperTypes()) {
+                               AtlasClassification superClassification = new 
AtlasClassification(superTypeName);
 
-                               try {
-                                       Map<String, Object> attrs = 
trait.getValuesMap();
+                               if 
(MapUtils.isNotEmpty(classification.getAttributes())) {
+                                       AtlasClassificationType 
superClassificationType = 
typeRegistry.getClassificationTypeByName(superTypeName);
 
-                                       if(MapUtils.isNotEmpty(attrs)) {
-                                               for (Map.Entry<String, Object> 
attrEntry : attrs.entrySet()) {
-                                                       String attrName  = 
attrEntry.getKey();
-                                                       Object attrValue = 
attrEntry.getValue();
+                                       if (superClassificationType != null && 
MapUtils.isNotEmpty(superClassificationType.getAllAttributes())) {
+                                               Map<String, Object> 
superClassificationAttributes = new HashMap<>();
 
-                                                       tagAttrs.put(attrName, 
attrValue != null ? attrValue.toString() : null);
+                                               for (Map.Entry<String, Object> 
entry : classification.getAttributes().entrySet()) {
+                                                       String attrName = 
entry.getKey();
+
+                                                       if 
(superClassificationType.getAllAttributes().containsKey(attrName)) {
+                                                               
superClassificationAttributes.put(attrName, entry.getValue());
+                                                       }
                                                }
+
+                                               
superClassification.setAttributes(superClassificationAttributes);
                                        }
-                               } catch (AtlasException exception) {
-                                       LOG.error("Could not get values for 
trait:" + trait.getTypeName(), exception);
                                }
 
-                               ret.add(new RangerTag(null, 
trait.getTypeName(), tagAttrs, RangerTag.OWNER_SERVICERESOURCE));
+                               ret.add(superClassification);
                        }
                }
 
                return ret;
        }
 
-       static private List<RangerTagDef> getTagDefs(AtlasEntityWithTraits 
entityWithTraits) {
-               List<RangerTagDef> ret = new ArrayList<RangerTagDef>();
+       static private RangerTag getRangerTag(AtlasClassification 
classification, AtlasTypeRegistry typeRegistry) {
+               final Map<String, String> tagAttrs;
 
-               if(entityWithTraits != null && 
CollectionUtils.isNotEmpty(entityWithTraits.getAllTraits())) {
-                       List<IStruct> traits = entityWithTraits.getAllTraits();
+               if(MapUtils.isNotEmpty(classification.getAttributes())) {
+                       tagAttrs = new HashMap<>();
 
-                       for (IStruct trait : traits) {
-                               RangerTagDef tagDef = new 
RangerTagDef(trait.getTypeName(), "Atlas");
+                       for (Map.Entry<String, Object> attrEntry : 
classification.getAttributes().entrySet()) {
+                               String attrName  = attrEntry.getKey();
+                               Object attrValue = attrEntry.getValue();
 
-                               try {
-                                       Map<String, Object> attrs = 
trait.getValuesMap();
+                               // V2 Atlas APIs have date attributes as 
number; convert the value to earlier version format, so that
+                               // Ranger conditions can recognize the value 
correctly
+                               if (attrValue instanceof Number) {
+                                       AtlasClassificationType 
classificationType = 
typeRegistry.getClassificationTypeByName(classification.getTypeName());
+                                       AtlasAttribute          attribute       
   = (classificationType != null) ? classificationType.getAttribute(attrName) : 
null;
 
-                                       if(MapUtils.isNotEmpty(attrs)) {
-                                               for (String attrName : 
attrs.keySet()) {
-                                                       
tagDef.getAttributeDefs().add(new RangerTagAttributeDef(attrName, "string"));
-                                               }
+                                       if (attribute != null && 
attribute.getAttributeType() instanceof AtlasBuiltInTypes.AtlasDateType) {
+                                               Date dateValue = new 
Date(((Number)attrValue).longValue());
+
+                                               attrValue = 
DATE_FORMATTER.get().format(dateValue);
                                        }
-                               } catch (AtlasException exception) {
-                                       LOG.error("Could not get values for 
trait:" + trait.getTypeName(), exception);
                                }
 
-                               ret.add(tagDef);
+                               tagAttrs.put(attrName, attrValue != null ? 
attrValue.toString() : null);
                        }
+               } else {
+                       tagAttrs = Collections.emptyMap();
                }
 
-               return ret;
+               return new RangerTag(null, classification.getTypeName(), 
tagAttrs, RangerTag.OWNER_SERVICERESOURCE);
        }
 }

http://git-wip-us.apache.org/repos/asf/ranger/blob/30b1188f/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasResourceMapper.java
----------------------------------------------------------------------
diff --git 
a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasResourceMapper.java
 
b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasResourceMapper.java
index 8ececdf..a2ad796 100644
--- 
a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasResourceMapper.java
+++ 
b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasResourceMapper.java
@@ -23,6 +23,7 @@ import java.util.Properties;
 import java.util.Map;
 
 import org.apache.atlas.AtlasException;
+import org.apache.atlas.model.instance.AtlasEntityHeader;
 import org.apache.atlas.typesystem.IReferenceableInstance;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
@@ -75,6 +76,8 @@ public abstract class AtlasResourceMapper {
 
        abstract public RangerServiceResource buildResource(final 
IReferenceableInstance entity) throws Exception;
 
+       abstract public RangerServiceResource buildResource(final 
AtlasEntityHeader entity) throws Exception;
+
        protected String getCustomRangerServiceName(String atlasInstanceName) {
                if(properties != null) {
                        String propName = 
TAGSYNC_SERVICENAME_MAPPER_PROP_PREFIX + componentName
@@ -132,6 +135,10 @@ public abstract class AtlasResourceMapper {
                return ret;
        }
 
+       static protected <T> T getEntityAttribute(AtlasEntityHeader entity, 
String name, Class<T> type) {
+               return getAttribute(entity.getAttributes(), name, type);
+       }
+
        static protected <T> T getAttribute(Map<String, Object> map, String 
name, Class<T> type) {
                return type.cast(map.get(name));
        }

http://git-wip-us.apache.org/repos/asf/ranger/blob/30b1188f/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasResourceMapperUtil.java
----------------------------------------------------------------------
diff --git 
a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasResourceMapperUtil.java
 
b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasResourceMapperUtil.java
index f9f0eaf..d004bff 100644
--- 
a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasResourceMapperUtil.java
+++ 
b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasResourceMapperUtil.java
@@ -19,6 +19,7 @@
 
 package org.apache.ranger.tagsync.source.atlas;
 
+import org.apache.atlas.model.instance.AtlasEntityHeader;
 import org.apache.atlas.typesystem.IReferenceableInstance;
 import org.apache.commons.lang.StringUtils;
 import org.apache.ranger.plugin.model.RangerServiceResource;
@@ -74,6 +75,30 @@ public class AtlasResourceMapperUtil {
                return resource;
        }
 
+       public static RangerServiceResource 
getRangerServiceResource(AtlasEntityHeader atlasEntity) {
+               if (LOG.isDebugEnabled()) {
+                       LOG.debug("==> getRangerServiceResource(" + 
atlasEntity.getGuid() +")");
+               }
+
+               RangerServiceResource resource = null;
+
+               AtlasResourceMapper mapper = 
atlasResourceMappers.get(atlasEntity.getTypeName());
+
+               if (mapper != null) {
+                       try {
+                               resource = mapper.buildResource(atlasEntity);
+                       } catch (Exception exception) {
+                               LOG.error("Could not get serviceResource for 
atlas entity:" + atlasEntity.getGuid() + ": ", exception);
+                       }
+               }
+
+               if (LOG.isDebugEnabled()) {
+                       LOG.debug("<== getRangerServiceResource(" + 
atlasEntity.getGuid() +"): resource=" + resource);
+               }
+
+               return resource;
+       }
+
        static public boolean initializeAtlasResourceMappers(Properties 
properties) {
                final String MAPPER_NAME_DELIMITER = ",";
 

http://git-wip-us.apache.org/repos/asf/ranger/blob/30b1188f/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasTagSource.java
----------------------------------------------------------------------
diff --git 
a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasTagSource.java
 
b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasTagSource.java
index 12b02d9..95ff8ec 100644
--- 
a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasTagSource.java
+++ 
b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasTagSource.java
@@ -20,26 +20,22 @@
 package org.apache.ranger.tagsync.source.atlas;
 
 
-import com.google.inject.Guice;
-import com.google.inject.Injector;
-import com.google.inject.Provider;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
+import org.apache.atlas.kafka.NotificationProvider;
 import org.apache.atlas.notification.NotificationConsumer;
 import org.apache.atlas.notification.NotificationInterface;
-import org.apache.atlas.notification.NotificationModule;
 import org.apache.atlas.notification.entity.EntityNotification;
-
-import org.apache.ranger.tagsync.model.AbstractTagSource;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.ranger.plugin.util.ServiceTags;
+import org.apache.ranger.tagsync.model.AbstractTagSource;
+import org.apache.atlas.kafka.AtlasKafkaMessage;
+import org.apache.kafka.common.TopicPartition;
 
 import java.io.IOException;
 import java.io.InputStream;
-import java.util.Properties;
 import java.util.List;
+import java.util.Properties;
 
 public class AtlasTagSource extends AbstractTagSource {
        private static final Log LOG = LogFactory.getLog(AtlasTagSource.class);
@@ -102,12 +98,7 @@ public class AtlasTagSource extends AbstractTagSource {
                }
 
                if (ret) {
-                       NotificationModule notificationModule = new 
NotificationModule();
-
-                       Injector injector = 
Guice.createInjector(notificationModule);
-
-                       Provider<NotificationInterface> consumerProvider = 
injector.getProvider(NotificationInterface.class);
-                       NotificationInterface notification = 
consumerProvider.get();
+                       NotificationInterface notification = 
NotificationProvider.get();
                        List<NotificationConsumer<EntityNotification>> 
iterators = 
notification.createConsumers(NotificationInterface.NotificationType.ENTITIES, 
1);
 
                        consumerTask = new ConsumerRunnable(iterators.get(0));
@@ -163,15 +154,6 @@ public class AtlasTagSource extends AbstractTagSource {
                        this.consumer = consumer;
                }
 
-               private boolean hasNext() {
-                       boolean ret = false;
-                       try {
-                               ret = consumer.hasNext();
-                       } catch (Exception exception) {
-                               LOG.error("EntityNotification consumer threw 
exception, IGNORING...:", exception);
-                       }
-                       return ret;
-               }
 
                @Override
                public void run() {
@@ -180,8 +162,11 @@ public class AtlasTagSource extends AbstractTagSource {
                        }
                        while (true) {
                                try {
-                                       if (hasNext()) {
-                                               EntityNotification notification 
= consumer.peek();
+                                       
List<AtlasKafkaMessage<EntityNotification>> messages = consumer.receive(1000L);
+
+                                       for 
(AtlasKafkaMessage<EntityNotification> message :  messages) {
+                                               EntityNotification notification 
= message != null ? message.getMessage() : null;
+
                                                if (notification != null) {
                                                        if 
(LOG.isDebugEnabled()) {
                                                                
LOG.debug("Notification=" + getPrintableEntityNotification(notification));
@@ -191,11 +176,12 @@ public class AtlasTagSource extends AbstractTagSource {
                                                        if (serviceTags != 
null) {
                                                                
updateSink(serviceTags);
                                                        }
+
+                                                       TopicPartition 
partition = new TopicPartition("ATLAS_ENTITIES", message.getPartition());
+                                                       
consumer.commit(partition, message.getOffset());
                                                } else {
                                                        LOG.error("Null 
entityNotification received from Kafka!! Ignoring..");
                                                }
-                                               // Move iterator forward
-                                               consumer.next();
                                        }
                                } catch (Exception exception) {
                                        LOG.error("Caught exception..: ", 
exception);

http://git-wip-us.apache.org/repos/asf/ranger/blob/30b1188f/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlasrest/AtlasRESTTagSource.java
----------------------------------------------------------------------
diff --git 
a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlasrest/AtlasRESTTagSource.java
 
b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlasrest/AtlasRESTTagSource.java
index 4e0ae90..239f143 100644
--- 
a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlasrest/AtlasRESTTagSource.java
+++ 
b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlasrest/AtlasRESTTagSource.java
@@ -23,48 +23,52 @@ import com.google.gson.Gson;
 
 import com.google.gson.GsonBuilder;
 
-import org.apache.commons.collections.CollectionUtils;
+import org.apache.atlas.AtlasClientV2;
+import org.apache.atlas.AtlasServiceException;
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.SearchFilter;
+import org.apache.atlas.model.discovery.AtlasSearchResult;
+import org.apache.atlas.model.discovery.SearchParameters;
+import org.apache.atlas.model.typedef.AtlasTypesDef;
+import org.apache.atlas.type.AtlasType;
+import org.apache.atlas.type.AtlasTypeRegistry;
+import org.apache.atlas.type.AtlasTypeRegistry.AtlasTransientTypeRegistry;
 import org.apache.commons.collections.MapUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
-import org.apache.ranger.plugin.util.RangerRESTClient;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.ranger.tagsync.model.AbstractTagSource;
 import org.apache.ranger.plugin.util.ServiceTags;
 import org.apache.ranger.tagsync.model.TagSink;
 import org.apache.ranger.tagsync.process.TagSyncConfig;
 import org.apache.ranger.tagsync.process.TagSynchronizer;
-import org.apache.ranger.tagsync.source.atlas.AtlasEntityWithTraits;
 import org.apache.ranger.tagsync.source.atlas.AtlasNotificationMapper;
 import org.apache.ranger.tagsync.source.atlas.AtlasResourceMapperUtil;
 
-import java.util.List;
+import java.io.IOException;
 import java.util.Map;
 import java.util.Properties;
 
 public class AtlasRESTTagSource extends AbstractTagSource implements Runnable {
        private static final Log LOG = 
LogFactory.getLog(AtlasRESTTagSource.class);
 
-       private long sleepTimeBetweenCycleInMillis;
-
-       private AtlasRESTUtil atlasRESTUtil = null;
-
-       private Thread myThread = null;
+       private long     sleepTimeBetweenCycleInMillis;
+       private String[] restUrls         = null;
+       private boolean  isKerberized     = false;
+       private String[] userNamePassword = null;
+       private Thread   myThread         = null;
 
        public static void main(String[] args) {
-
-               AtlasRESTTagSource atlasRESTTagSource = new 
AtlasRESTTagSource();
-
-               TagSyncConfig config = TagSyncConfig.getInstance();
-
-               Properties props = config.getProperties();
+               TagSyncConfig config  = TagSyncConfig.getInstance();
+               Properties    props   = config.getProperties();
+               TagSink       tagSink = 
TagSynchronizer.initializeTagSink(props);
 
                TagSynchronizer.printConfigurationProperties(props);
 
-               TagSink tagSink = TagSynchronizer.initializeTagSink(props);
-
                if (tagSink != null) {
+                       AtlasRESTTagSource atlasRESTTagSource = new 
AtlasRESTTagSource();
 
                        if (atlasRESTTagSource.initialize(props)) {
                                try {
@@ -79,46 +83,45 @@ public class AtlasRESTTagSource extends AbstractTagSource 
implements Runnable {
                                LOG.error("AtlasRESTTagSource initialized 
failed, exiting.");
                                System.exit(1);
                        }
-
                } else {
                        LOG.error("TagSink initialialization failed, exiting.");
                        System.exit(1);
                }
-
        }
+
        @Override
        public boolean initialize(Properties properties) {
                if (LOG.isDebugEnabled()) {
                        LOG.debug("==> AtlasRESTTagSource.initialize()");
                }
 
+               sleepTimeBetweenCycleInMillis = 
TagSyncConfig.getTagSourceAtlasDownloadIntervalInMillis(properties);
+
                boolean ret = 
AtlasResourceMapperUtil.initializeAtlasResourceMappers(properties);
 
-               sleepTimeBetweenCycleInMillis = 
TagSyncConfig.getTagSourceAtlasDownloadIntervalInMillis(properties);
-               final boolean isKerberized = 
TagSyncConfig.getTagsyncKerberosIdentity(properties) != null;
+               String  sslConfigFile = 
TagSyncConfig.getAtlasRESTSslConfigFile(properties);
+
+               this.isKerberized     = 
TagSyncConfig.getTagsyncKerberosIdentity(properties) != null;
+               this.userNamePassword = new String[] { 
TagSyncConfig.getAtlasRESTUserName(properties), 
TagSyncConfig.getAtlasRESTPassword(properties) };
 
-               String restUrl       = 
TagSyncConfig.getAtlasRESTEndpoint(properties);
-               String sslConfigFile = 
TagSyncConfig.getAtlasRESTSslConfigFile(properties);
-               String userName = 
TagSyncConfig.getAtlasRESTUserName(properties);
-               String password = 
TagSyncConfig.getAtlasRESTPassword(properties);
+               String restEndpoint = 
TagSyncConfig.getAtlasRESTEndpoint(properties);
 
                if (LOG.isDebugEnabled()) {
-                       LOG.debug("restUrl=" + restUrl);
+                       LOG.debug("restEndpoint=" + restEndpoint);
                        LOG.debug("sslConfigFile=" + sslConfigFile);
-                       LOG.debug("userName=" + userName);
+                       LOG.debug("userName=" + userNamePassword[0]);
                        LOG.debug("kerberized=" + isKerberized);
                }
 
-               if (StringUtils.isNotEmpty(restUrl)) {
-                       if (!restUrl.endsWith("/")) {
-                               restUrl += "/";
-                       }
-                       RangerRESTClient atlasRESTClient = new 
RangerRESTClient(restUrl, sslConfigFile);
+               if (StringUtils.isNotEmpty(restEndpoint)) {
+                       this.restUrls = restEndpoint.split(",");
 
-                       if (!isKerberized) {
-                               atlasRESTClient.setBasicAuthInfo(userName, 
password);
+                       for (int i = 0; i < restUrls.length; i++) {
+                               if (!restUrls[i].endsWith("/")) {
+                                       restUrls[i] += "/";
+                               }
                        }
-                       atlasRESTUtil = new AtlasRESTUtil(atlasRESTClient, 
isKerberized);
+
                } else {
                        LOG.info("AtlasEndpoint not specified, Initial download 
of Atlas-entities cannot be done.");
                        ret = false;
@@ -133,7 +136,6 @@ public class AtlasRESTTagSource extends AbstractTagSource 
implements Runnable {
 
        @Override
        public boolean start() {
-
                myThread = new Thread(this);
                myThread.setDaemon(true);
                myThread.start();
@@ -150,21 +152,17 @@ public class AtlasRESTTagSource extends AbstractTagSource 
implements Runnable {
 
        @Override
        public void run() {
-
                if (LOG.isDebugEnabled()) {
                        LOG.debug("==> AtlasRESTTagSource.run()");
                }
 
                while (true) {
-
                        synchUp();
 
                        LOG.debug("Sleeping for [" + 
sleepTimeBetweenCycleInMillis + "] milliSeconds");
 
                        try {
-
                                Thread.sleep(sleepTimeBetweenCycleInMillis);
-
                        } catch (InterruptedException exception) {
                                LOG.error("Interrupted..: ", exception);
                                return;
@@ -173,17 +171,40 @@ public class AtlasRESTTagSource extends AbstractTagSource 
implements Runnable {
        }
 
        public void synchUp() {
+               SearchParameters           searchParams = new 
SearchParameters();
+               AtlasTypeRegistry          typeRegistry = new 
AtlasTypeRegistry();
+               AtlasTransientTypeRegistry tty          = null;
+               AtlasSearchResult          searchResult = null;
+
+               searchParams.setClassification("*");
+               searchParams.setIncludeClassificationAttributes(true);
+               searchParams.setOffset(0);
+               searchParams.setLimit(Integer.MAX_VALUE);
+
+               try {
+                       AtlasClientV2 atlasClient = getAtlasClient();
+
+                       searchResult = atlasClient.facetedSearch(searchParams);
 
-               List<AtlasEntityWithTraits> atlasEntities = 
atlasRESTUtil.getAtlasEntities();
+                       AtlasTypesDef typesDef = atlasClient.getAllTypeDefs(new 
SearchFilter());
 
-               if (CollectionUtils.isNotEmpty(atlasEntities)) {
+                       tty = typeRegistry.lockTypeRegistryForUpdate();
+
+                       tty.addTypes(typesDef);
+               } catch (AtlasServiceException | AtlasBaseException | 
IOException excp) {
+                       LOG.error("failed to download tags from Atlas", excp);
+               } finally {
+                       if (tty != null) {
+                               typeRegistry.releaseTypeRegistryForUpdate(tty, 
true);
+                       }
+               }
+
+               if (searchResult != null) {
                        if (LOG.isDebugEnabled()) {
-                               for (AtlasEntityWithTraits element : 
atlasEntities) {
-                                       LOG.debug(element);
-                               }
+                               LOG.debug(AtlasType.toJson(searchResult));
                        }
 
-                       Map<String, ServiceTags> serviceTagsMap = 
AtlasNotificationMapper.processAtlasEntities(atlasEntities);
+                       Map<String, ServiceTags> serviceTagsMap = 
AtlasNotificationMapper.processSearchResult(searchResult, typeRegistry);
 
                        if (MapUtils.isNotEmpty(serviceTagsMap)) {
                                for (Map.Entry<String, ServiceTags> entry : 
serviceTagsMap.entrySet()) {
@@ -195,6 +216,7 @@ public class AtlasRESTTagSource extends AbstractTagSource implements Runnable {
 
                                                LOG.debug("serviceTags=" + serviceTagsString);
                                        }
+
                                        updateSink(entry.getValue());
                                }
                        }
@@ -202,5 +224,20 @@ public class AtlasRESTTagSource extends AbstractTagSource implements Runnable {
 
        }
 
+       private AtlasClientV2 getAtlasClient() throws IOException {
+               final AtlasClientV2 ret;
+
+               if (isKerberized) {
+                       UserGroupInformation ugi = UserGroupInformation.getLoginUser();
+
+                       ugi.checkTGTAndReloginFromKeytab();
+
+                       ret = new AtlasClientV2(ugi, ugi.getShortUserName(), restUrls);
+               } else {
+                       ret = new AtlasClientV2(restUrls, userNamePassword);
+               }
+
+               return ret;
+       }
 }
 

http://git-wip-us.apache.org/repos/asf/ranger/blob/30b1188f/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlasrest/AtlasRESTUtil.java
----------------------------------------------------------------------
diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlasrest/AtlasRESTUtil.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlasrest/AtlasRESTUtil.java
deleted file mode 100644
index 00a101e..0000000
--- a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlasrest/AtlasRESTUtil.java
+++ /dev/null
@@ -1,325 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ranger.tagsync.source.atlasrest;
-
-import com.google.gson.Gson;
-import com.sun.jersey.api.client.ClientResponse;
-import com.sun.jersey.api.client.WebResource;
-import org.apache.atlas.typesystem.IReferenceableInstance;
-import org.apache.atlas.typesystem.IStruct;
-import org.apache.atlas.typesystem.Struct;
-import org.apache.atlas.typesystem.json.InstanceSerialization;
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.collections.MapUtils;
-import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.log4j.Logger;
-import org.apache.ranger.admin.client.datatype.RESTResponse;
-import org.apache.ranger.plugin.util.RangerRESTClient;
-import org.apache.ranger.tagsync.source.atlas.AtlasEntityWithTraits;
-import org.apache.ranger.tagsync.source.atlas.AtlasResourceMapperUtil;
-
-import java.io.IOException;
-import java.security.PrivilegedAction;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-
-@SuppressWarnings("unchecked")
-public class AtlasRESTUtil {
-       private static final Logger LOG = Logger.getLogger(AtlasRESTUtil.class);
-
-       private static final String REST_MIME_TYPE_JSON = "application/json";
-       private static final String API_ATLAS_TYPES    = "api/atlas/types";
-       private static final String API_ATLAS_ENTITIES = 
"api/atlas/entities?type=";
-       private static final String API_ATLAS_ENTITY   = "api/atlas/entities/";
-       private static final String API_ATLAS_TYPE     = "api/atlas/types/";
-
-       private static final String RESULTS_ATTRIBUTE               = "results";
-       private static final String DEFINITION_ATTRIBUTE            = 
"definition";
-       private static final String VALUES_ATTRIBUTE                = "values";
-       private static final String TRAITS_ATTRIBUTE                = "traits";
-       private static final String TYPE_NAME_ATTRIBUTE             = 
"typeName";
-       private static final String TRAIT_TYPES_ATTRIBUTE           = 
"traitTypes";
-       private static final String SUPER_TYPES_ATTRIBUTE           = 
"superTypes";
-       private static final String ATTRIBUTE_DEFINITIONS_ATTRIBUTE = 
"attributeDefinitions";
-       private static final String NAME_ATTRIBUTE                  = "name";
-
-       private final Gson gson = new Gson();
-
-       private final RangerRESTClient atlasRESTClient;
-
-       private final boolean isKerberized;
-
-       public AtlasRESTUtil(RangerRESTClient atlasRESTClient, boolean 
isKerberized) {
-               if (LOG.isDebugEnabled()) {
-                       LOG.debug("==> AtlasRESTUtil()");
-               }
-
-               this.atlasRESTClient = atlasRESTClient;
-
-               this.isKerberized = isKerberized;
-
-               if (LOG.isDebugEnabled()) {
-                       LOG.debug("<== AtlasRESTUtil()");
-               }
-       }
-
-       public List<AtlasEntityWithTraits> getAtlasEntities() {
-               if (LOG.isDebugEnabled()) {
-                       LOG.debug("==> getAtlasEntities()");
-               }
-
-               List<AtlasEntityWithTraits> ret = new 
ArrayList<AtlasEntityWithTraits>();
-
-               Map<String, Object> typesResponse = atlasAPI(API_ATLAS_TYPES);
-
-               List<String> types = getAttribute(typesResponse, 
RESULTS_ATTRIBUTE, List.class);
-
-               if (CollectionUtils.isNotEmpty(types)) {
-
-                       for (String type : types) {
-
-                               if 
(!AtlasResourceMapperUtil.isEntityTypeHandled(type)) {
-                                       if (LOG.isDebugEnabled()) {
-                                               LOG.debug("Not fetching Atlas 
entities of type: " + type);
-                                       }
-                                       continue;
-                               }
-
-                               Map<String, Object> entitiesResponse = 
atlasAPI(API_ATLAS_ENTITIES + type);
-
-                               List<String> guids = 
getAttribute(entitiesResponse, RESULTS_ATTRIBUTE, List.class);
-
-                               if (CollectionUtils.isEmpty(guids)) {
-                                       if (LOG.isDebugEnabled()) {
-                                               LOG.debug("No Atlas entities 
for type: " + type);
-                                       }
-                                       continue;
-                               }
-
-                               for (String guid : guids) {
-
-                                       Map<String, Object> entityResponse = 
atlasAPI(API_ATLAS_ENTITY + guid);
-
-                                       Map<String, Object> definition = 
getAttribute(entityResponse, DEFINITION_ATTRIBUTE, Map.class);
-
-                                       Map<String, Object> traitsAttribute = 
getAttribute(definition, TRAITS_ATTRIBUTE, Map.class);
-
-                                       List<IStruct> allTraits = new 
LinkedList<>();
-
-                                       if 
(MapUtils.isNotEmpty(traitsAttribute)) {
-
-                                               for (Map.Entry<String, Object> 
entry : traitsAttribute.entrySet()) {
-
-                                                       Map<String, Object> 
trait = (Map<String, Object>) entry.getValue();
-
-                                                       Map<String, Object> 
traitValues = getAttribute(trait, VALUES_ATTRIBUTE, Map.class);
-                                                       String traitTypeName = 
getAttribute(trait, TYPE_NAME_ATTRIBUTE, String.class);
-
-                                                       if 
(StringUtils.isEmpty(traitTypeName)) {
-                                                               continue;
-                                                       }
-
-                                                       List<IStruct> 
superTypes = getTraitSuperTypes(getTraitType(traitTypeName), traitValues);
-
-                                                       Struct trait1 = new 
Struct(traitTypeName, traitValues);
-
-                                                       allTraits.add(trait1);
-                                                       
allTraits.addAll(superTypes);
-                                               }
-                                       }
-
-                                       IReferenceableInstance entity = 
InstanceSerialization.fromJsonReferenceable(gson.toJson(definition), true);
-
-                                       if (entity != null) {
-                                               AtlasEntityWithTraits 
atlasEntity = new AtlasEntityWithTraits(entity, allTraits);
-                                               ret.add(atlasEntity);
-                                       } else {
-                                               if (LOG.isInfoEnabled()) {
-                                                       LOG.info("Could not 
create Atlas entity from its definition, type=" + type + ", guid=" + guid);
-                                               }
-                                       }
-
-                               }
-
-                       }
-                       if (LOG.isDebugEnabled()) {
-                               LOG.debug("<== getAtlasEntities()");
-                       }
-               }
-
-               return ret;
-       }
-
-       private Map<String, Object> getTraitType(String traitName) {
-               if (LOG.isDebugEnabled()) {
-                       LOG.debug("==> getTraitType(" + traitName + ")");
-               }
-               Map<String, Object> ret = null;
-
-               Map<String, Object> typeResponse = atlasAPI(API_ATLAS_TYPE + 
traitName);
-
-               Map<String, Object> definition = getAttribute(typeResponse, 
DEFINITION_ATTRIBUTE, Map.class);
-
-               List traitTypes = getAttribute(definition, 
TRAIT_TYPES_ATTRIBUTE, List.class);
-
-               if (CollectionUtils.isNotEmpty(traitTypes)) {
-                       ret = (Map<String, Object>) traitTypes.get(0);
-               }
-
-               if (LOG.isDebugEnabled()) {
-                       LOG.debug("<== getTraitType(" + traitName + ")");
-               }
-               return ret;
-       }
-
-       private List<IStruct> getTraitSuperTypes(Map<String, Object> traitType, 
Map<String, Object> values) {
-
-               if (LOG.isDebugEnabled()) {
-                       LOG.debug("==> getTraitSuperTypes()");
-               }
-               List<IStruct> ret = new LinkedList<>();
-
-               if (traitType != null) {
-
-                       List<String> superTypeNames = getAttribute(traitType, 
SUPER_TYPES_ATTRIBUTE, List.class);
-
-                       if (CollectionUtils.isNotEmpty(superTypeNames)) {
-                               for (String superTypeName : superTypeNames) {
-
-                                       Map<String, Object> superTraitType = 
getTraitType(superTypeName);
-
-                                       if (superTraitType != null) {
-                                               List<Map<String, Object>> 
attributeDefinitions = (List) 
superTraitType.get(ATTRIBUTE_DEFINITIONS_ATTRIBUTE);
-
-                                               Map<String, Object> 
superTypeValues = new HashMap<>();
-                                               for (Map<String, Object> 
attributeDefinition : attributeDefinitions) {
-
-                                                       String attributeName = 
attributeDefinition.get(NAME_ATTRIBUTE).toString();
-                                                       if 
(values.containsKey(attributeName)) {
-                                                               
superTypeValues.put(attributeName, values.get(attributeName));
-                                                       }
-                                               }
-
-                                               List<IStruct> superTraits = 
getTraitSuperTypes(getTraitType(superTypeName), values);
-
-                                               Struct superTrait = new 
Struct(superTypeName, superTypeValues);
-
-                                               ret.add(superTrait);
-                                               ret.addAll(superTraits);
-                                       }
-                               }
-                       }
-               }
-               if (LOG.isDebugEnabled()) {
-                       LOG.debug("<== getTraitSuperTypes()");
-               }
-               return ret;
-       }
-
-       private Map<String, Object> atlasAPI(final String endpoint) {
-
-               if (LOG.isDebugEnabled()) {
-                       LOG.debug("==> atlasAPI(" + endpoint + ")");
-               }
-               Map<String, Object> ret = new HashMap<String, Object>();
-
-               try {
-                       UserGroupInformation userGroupInformation = null;
-                       if (isKerberized) {
-                               userGroupInformation = 
UserGroupInformation.getLoginUser();
-
-                               try {
-                                       
userGroupInformation.checkTGTAndReloginFromKeytab();
-                               } catch (IOException ioe) {
-                                       LOG.error("Error renewing TGT and 
relogin", ioe);
-                                       userGroupInformation = null;
-                               }
-                       }
-                       if (userGroupInformation != null) {
-                               LOG.debug("Using kerberos authentication");
-                               if(LOG.isDebugEnabled()) {
-                                       LOG.debug("Using Principal = "+ 
userGroupInformation.getUserName());
-                               }
-                               ret = userGroupInformation.doAs(new 
PrivilegedAction<Map<String, Object>>() {
-                                       @Override
-                                       public Map<String, Object> run() {
-                                               try{
-                                                       return 
executeAtlasAPI(endpoint);
-                                               }catch (Exception e) {
-                                                       LOG.error("Atlas API 
failed with message : ", e);
-                                               }
-                                               return null;
-                                       }
-                               });
-                       } else {
-                               LOG.debug("Using basic authentication");
-                               ret = executeAtlasAPI(endpoint);
-                       }
-               } catch (Exception exception) {
-                       LOG.error("Exception when fetching Atlas objects.", 
exception);
-                       ret = null;
-               }
-
-               if (LOG.isDebugEnabled()) {
-                       LOG.debug("<== atlasAPI(" + endpoint + ")");
-               }
-               return ret;
-       }
-
-       private Map<String, Object> executeAtlasAPI(final String endpoint) {
-
-               if (LOG.isDebugEnabled()) {
-                       LOG.debug("==> executeAtlasAPI(" + endpoint + ")");
-               }
-
-               Map<String, Object> ret = new HashMap<String, Object>();
-
-               try {
-                       final WebResource webResource = 
atlasRESTClient.getResource(endpoint);
-
-                       ClientResponse response = 
webResource.accept(REST_MIME_TYPE_JSON).type(REST_MIME_TYPE_JSON).get(ClientResponse.class);
-
-                       if (response != null && response.getStatus() == 200) {
-                               ret = response.getEntity(ret.getClass());
-                       } else {
-                               RESTResponse resp = 
RESTResponse.fromClientResponse(response);
-                               LOG.error("Error getting atlas data request=" + 
webResource.toString()
-                                               + ", response=" + 
resp.toString());
-                       }
-               } catch (Exception exception) {
-                       LOG.error("Exception when fetching Atlas objects.", 
exception);
-                       ret = null;
-               }
-
-               if (LOG.isDebugEnabled()) {
-                       LOG.debug("<== executeAtlasAPI(" + endpoint + ")");
-               }
-
-               return ret;
-       }
-
-       private <T> T getAttribute(Map<String, Object> map, String name, 
Class<T> type) {
-               return MapUtils.isNotEmpty(map) ? type.cast(map.get(name)) : 
null;
-       }
-
-}

Reply via email to