Repository: ranger
Updated Branches:
  refs/heads/ranger-0.7 7e3963bc7 -> 126ff6ee0


RANGER-2104: Ranger tagsync should ignore ENTITY_UPDATE events if the updated entity does not have associated traits


Project: http://git-wip-us.apache.org/repos/asf/ranger/repo
Commit: http://git-wip-us.apache.org/repos/asf/ranger/commit/126ff6ee
Tree: http://git-wip-us.apache.org/repos/asf/ranger/tree/126ff6ee
Diff: http://git-wip-us.apache.org/repos/asf/ranger/diff/126ff6ee

Branch: refs/heads/ranger-0.7
Commit: 126ff6ee04e580dcf8b924f76df0e3917221106e
Parents: 7e3963b
Author: Abhay Kulkarni <akulka...@hortonworks.com>
Authored: Thu May 17 08:51:40 2018 -0700
Committer: Abhay Kulkarni <akulka...@hortonworks.com>
Committed: Thu May 17 08:51:40 2018 -0700

----------------------------------------------------------------------
 pom.xml                                         |  1 +
 src/main/assembly/tagsync.xml                   |  1 +
 .../source/atlas/AtlasNotificationMapper.java   | 53 +++++++++++++------
 .../tagsync/source/atlas/AtlasTagSource.java    | 54 +++++++++++++++++---
 4 files changed, 85 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ranger/blob/126ff6ee/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index b76e4e3..2ec6768 100644
--- a/pom.xml
+++ b/pom.xml
@@ -129,6 +129,7 @@
         <atlas.guava.version>14.0</atlas.guava.version>
         <atlas.gson.version>2.5</atlas.gson.version>
         <atlas.jettison.version>1.3.7</atlas.jettison.version>
+        <atlas.commons.compress.version>1.4.1</atlas.commons.compress.version>
         <atlas.commons.logging.version>1.1.3</atlas.commons.logging.version>
         <avatica.version>1.7.1</avatica.version>
         <bouncycastle.version>1.55</bouncycastle.version>

http://git-wip-us.apache.org/repos/asf/ranger/blob/126ff6ee/src/main/assembly/tagsync.xml
----------------------------------------------------------------------
diff --git a/src/main/assembly/tagsync.xml b/src/main/assembly/tagsync.xml
index 26b42ca..5139937 100644
--- a/src/main/assembly/tagsync.xml
+++ b/src/main/assembly/tagsync.xml
@@ -50,6 +50,7 @@
                                        
<include>org.apache.atlas:atlas-client-v2:jar:${atlas.version}</include>
                                        
<include>org.apache.atlas:atlas-common:jar:${atlas.version}</include>
                                        
<include>org.apache.atlas:atlas-intg:jar:${atlas.version}</include>
+                                       
<include>org.apache.commons:commons-compress:jar:${atlas.commons.compress.version}</include>
                                        
<include>org.apache.hadoop:hadoop-auth</include>
                                        
<include>org.apache.hadoop:hadoop-common</include>
                                        
<include>org.apache.kafka:kafka_${scala.binary.version}:jar:${kafka.version}</include>

http://git-wip-us.apache.org/repos/asf/ranger/blob/126ff6ee/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasNotificationMapper.java
----------------------------------------------------------------------
diff --git 
a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasNotificationMapper.java
 
b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasNotificationMapper.java
index 8641d60..1c7f063 100644
--- 
a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasNotificationMapper.java
+++ 
b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasNotificationMapper.java
@@ -51,7 +51,6 @@ import java.util.*;
 public class AtlasNotificationMapper {
        private static final Log LOG = 
LogFactory.getLog(AtlasNotificationMapper.class);
 
-
        private static Map<String, Long> unhandledEventTypes = new 
HashMap<String, Long>();
 
        private static final ThreadLocal<DateFormat> DATE_FORMATTER = new 
ThreadLocal<DateFormat>() {
@@ -140,8 +139,20 @@ public class AtlasNotificationMapper {
                        switch (opType) {
                                case ENTITY_CREATE:
                                        ret = 
CollectionUtils.isNotEmpty(entityNotification.getAllTraits());
+                                       if (!ret) {
+                                               if (LOG.isDebugEnabled()) {
+                                                       
LOG.debug("ENTITY_CREATE notification is ignored, as there are no traits 
associated with the entity. Ranger will get necessary information from any 
subsequent TRAIT_ADDED notification");
+                                               }
+                                       }
                                        break;
                                case ENTITY_UPDATE:
+                                       ret = 
CollectionUtils.isNotEmpty(entityNotification.getAllTraits());
+                                       if (!ret) {
+                                               if (LOG.isDebugEnabled()) {
+                                                       
LOG.debug("ENTITY_UPDATE notification is ignored, as there are no traits 
associated with the entity.");
+                                               }
+                                       }
+                                       break;
                                case ENTITY_DELETE:
                                case TRAIT_ADD:
                                case TRAIT_UPDATE:
@@ -278,9 +289,8 @@ public class AtlasNotificationMapper {
                List<RangerTag>        ret    = new ArrayList<RangerTag>();
                IReferenceableInstance entity = entityWithTraits != null ? 
entityWithTraits.getEntity() : null;
 
-               if(entity != null && 
CollectionUtils.isNotEmpty(entity.getTraits())) {
-                       for (String traitName : entity.getTraits()) {
-                               IStruct             trait    = 
entity.getTrait(traitName);
+               if(entity != null && 
CollectionUtils.isNotEmpty(entityWithTraits.getAllTraits())) {
+                       for (IStruct trait : entityWithTraits.getAllTraits()) {
                                Map<String, String> tagAttrs = new 
HashMap<String, String>();
 
                                try {
@@ -310,9 +320,8 @@ public class AtlasNotificationMapper {
                List<RangerTagDef>     ret    = new ArrayList<RangerTagDef>();
                IReferenceableInstance entity = entityWithTraits != null ? 
entityWithTraits.getEntity() : null;
 
-               if(entity != null && 
CollectionUtils.isNotEmpty(entity.getTraits())) {
-                       for (String traitName : entity.getTraits()) {
-                               IStruct       trait = 
entity.getTrait(traitName);
+               if(entity != null && 
CollectionUtils.isNotEmpty(entityWithTraits.getAllTraits())) {
+                       for (IStruct trait : entityWithTraits.getAllTraits()) {
                                RangerTagDef tagDef = new 
RangerTagDef(trait.getTypeName(), "Atlas");
 
                                try {
@@ -415,7 +424,7 @@ public class AtlasNotificationMapper {
 
                if (serviceResource != null) {
                        List<RangerTag>     tags        = getTags(entity, 
typeRegistry);
-                       List<RangerTagDef>  tagDefs     = getTagDefs(entity);
+                       List<RangerTagDef>  tagDefs     = getTagDefs(entity, 
typeRegistry);
                        String              serviceName = 
serviceResource.getServiceName();
 
                        ret = createOrGetServiceTags(serviceTagsMap, 
serviceName);
@@ -477,28 +486,38 @@ public class AtlasNotificationMapper {
                return ret;
        }
 
-       static private List<RangerTagDef> getTagDefs(AtlasEntityHeader entity) {
+       static private List<RangerTagDef> getTagDefs(AtlasEntityHeader entity, 
AtlasTypeRegistry typeRegistry) {
                List<RangerTagDef> ret = new ArrayList<>();
 
                if(entity != null && 
CollectionUtils.isNotEmpty(entity.getClassificationNames())) {
-                       List<AtlasClassification> traits = 
entity.getClassifications();
+                       List<AtlasClassification> classifications = 
entity.getClassifications();
 
-                       for (AtlasClassification trait : traits) {
-                               RangerTagDef tagDef = new 
RangerTagDef(trait.getTypeName(), "Atlas");
+                       for (AtlasClassification classification : 
classifications) {
+                               ret.add(getTagDef(classification));
 
-                               if(MapUtils.isNotEmpty(trait.getAttributes())) {
-                                       for (String attrName : 
trait.getAttributes().keySet()) {
-                                               
tagDef.getAttributeDefs().add(new RangerTagAttributeDef(attrName, "string"));
+                               List<AtlasClassification> superClassifications 
= getSuperClassifications(classification, typeRegistry);
+
+                               if 
(CollectionUtils.isNotEmpty(superClassifications)) {
+                                       for (AtlasClassification 
superClassification : superClassifications) {
+                                               
ret.add(getTagDef(superClassification));
                                        }
                                }
-
-                               ret.add(tagDef);
                        }
                }
 
                return ret;
        }
 
+       static private RangerTagDef getTagDef(AtlasClassification 
classification) {
+               RangerTagDef tagDef = new 
RangerTagDef(classification.getTypeName(), "Atlas");
+               if(MapUtils.isNotEmpty(classification.getAttributes())) {
+                       for (String attrName : 
classification.getAttributes().keySet()) {
+                               tagDef.getAttributeDefs().add(new 
RangerTagAttributeDef(attrName, "string"));
+                       }
+               }
+               return tagDef;
+       }
+
        static private List<AtlasClassification> 
getSuperClassifications(AtlasClassification classification, AtlasTypeRegistry 
typeRegistry) {
                List<AtlasClassification> ret                = null;
                AtlasClassificationType   classificationType = 
typeRegistry.getClassificationTypeByName(classification.getTypeName());

http://git-wip-us.apache.org/repos/asf/ranger/blob/126ff6ee/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasTagSource.java
----------------------------------------------------------------------
diff --git 
a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasTagSource.java
 
b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasTagSource.java
index 95ff8ec..3810442 100644
--- 
a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasTagSource.java
+++ 
b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasTagSource.java
@@ -160,32 +160,72 @@ public class AtlasTagSource extends AbstractTagSource {
                        if (LOG.isDebugEnabled()) {
                                LOG.debug("==> ConsumerRunnable.run()");
                        }
+                       boolean seenCommitException = false;
+                       long offsetOfLastMessageDeliveredToRanger = -1L;
+
                        while (true) {
                                try {
                                        
List<AtlasKafkaMessage<EntityNotification>> messages = consumer.receive(1000L);
+                                       int index = 0;
+
+                                       if (seenCommitException) {
+                                               for (; index < messages.size(); 
index++) {
+                                                       
AtlasKafkaMessage<EntityNotification> message = messages.get(index);
+                                                       if (message.getOffset() 
<= offsetOfLastMessageDeliveredToRanger) {
+                                                               // Already 
delivered to Ranger
+                                                               TopicPartition 
partition = new TopicPartition("ATLAS_ENTITIES", message.getPartition());
+                                                               try {
+                                                                       
consumer.commit(partition, message.getOffset());
+                                                               } catch 
(Exception commitException) {
+                                                                       
LOG.warn("Ranger tagsync already processed message at offset " + 
message.getOffset() + ". Ignoring failure in committing this message and 
continuing to process next message", commitException);
+                                                                       
LOG.warn("This will cause Kafka to deliver this message:[" + 
message.getOffset() + "] repeatedly!! This may be unrecoverable error!!");
+                                                               }
+                                                       } else {
+                                                               
seenCommitException = false;
+                                                               
offsetOfLastMessageDeliveredToRanger = -1L;
+                                                               break;
+                                                       }
+                                               }
+                                       }
 
-                                       for 
(AtlasKafkaMessage<EntityNotification> message :  messages) {
+                                       for (; index < messages.size(); 
index++) {
+                                               
AtlasKafkaMessage<EntityNotification> message = messages.get(index);
                                                EntityNotification notification 
= message != null ? message.getMessage() : null;
 
                                                if (notification != null) {
                                                        if 
(LOG.isDebugEnabled()) {
-                                                               
LOG.debug("Notification=" + getPrintableEntityNotification(notification));
+                                                               
LOG.debug("Message-offset=" + message.getOffset() + ", Notification=" + 
getPrintableEntityNotification(notification));
                                                        }
 
                                                        ServiceTags serviceTags 
= AtlasNotificationMapper.processEntityNotification(notification);
                                                        if (serviceTags != 
null) {
                                                                
updateSink(serviceTags);
                                                        }
-
-                                                       TopicPartition 
partition = new TopicPartition("ATLAS_ENTITIES", message.getPartition());
-                                                       
consumer.commit(partition, message.getOffset());
+                                                       
offsetOfLastMessageDeliveredToRanger = message.getOffset();
+
+                                                       if 
(!seenCommitException) {
+                                                               TopicPartition 
partition = new TopicPartition("ATLAS_ENTITIES", message.getPartition());
+                                                               try {
+                                                                       
consumer.commit(partition, message.getOffset());
+                                                               } catch 
(Exception commitException) {
+                                                                       
seenCommitException = true;
+                                                                       
LOG.warn("Ranger tagsync processed message at offset " + message.getOffset() + 
". Ignoring failure in committing this message and continuing to process next 
message", commitException);
+                                                               }
+                                                       }
                                                } else {
                                                        LOG.error("Null 
entityNotification received from Kafka!! Ignoring..");
                                                }
                                        }
                                } catch (Exception exception) {
-                                       LOG.error("Caught exception..: ", 
exception);
-                                       return;
+                                       LOG.error("Caught exception : ", 
exception);
+                                       // If transient error, retry after 
short interval
+                                       try {
+                                               Thread.sleep(100);
+                                       } catch (InterruptedException 
interrupted) {
+                                               LOG.error("Interrupted: ", 
interrupted);
+                                               LOG.error("Returning from 
thread. May cause process to be up but not processing events!!");
+                                               return;
+                                       }
                                }
                        }
                }

Reply via email to