Repository: ranger
Updated Branches:
  refs/heads/master ab6cb3935 -> 081af4819


RANGER-2104: Ranger tagsync should ignore ENTITY_UPDATE events if the updated entity does not have associated traits


Project: http://git-wip-us.apache.org/repos/asf/ranger/repo
Commit: http://git-wip-us.apache.org/repos/asf/ranger/commit/081af481
Tree: http://git-wip-us.apache.org/repos/asf/ranger/tree/081af481
Diff: http://git-wip-us.apache.org/repos/asf/ranger/diff/081af481

Branch: refs/heads/master
Commit: 081af481923207ee93a3d4a7cc29901b4c972a44
Parents: ab6cb39
Author: Abhay Kulkarni <[email protected]>
Authored: Thu May 17 09:45:06 2018 -0700
Committer: Abhay Kulkarni <[email protected]>
Committed: Thu May 17 09:45:06 2018 -0700

----------------------------------------------------------------------
 pom.xml                                         |  1 +
 src/main/assembly/tagsync.xml                   |  1 +
 .../source/atlas/AtlasNotificationMapper.java   | 12 ++++
 .../tagsync/source/atlas/AtlasTagSource.java    | 62 +++++++++++++++++---
 4 files changed, 69 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ranger/blob/081af481/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 878f5f0..756eccb 100644
--- a/pom.xml
+++ b/pom.xml
@@ -132,6 +132,7 @@
         <atlas.gson.version>2.5</atlas.gson.version>
         <atlas.jackson.version>2.9.2</atlas.jackson.version>
         <atlas.jettison.version>1.3.7</atlas.jettison.version>
+        <atlas.commons.compress.version>1.4.1</atlas.commons.compress.version>
         <atlas.commons.logging.version>1.1.3</atlas.commons.logging.version>
         <bouncycastle.version>1.55</bouncycastle.version>
         <c3p0.version>0.9.1.2</c3p0.version>

http://git-wip-us.apache.org/repos/asf/ranger/blob/081af481/src/main/assembly/tagsync.xml
----------------------------------------------------------------------
diff --git a/src/main/assembly/tagsync.xml b/src/main/assembly/tagsync.xml
index 0788ac1..bc6e28b 100644
--- a/src/main/assembly/tagsync.xml
+++ b/src/main/assembly/tagsync.xml
@@ -45,6 +45,7 @@
                                        
<include>org.apache.atlas:atlas-client-v2:jar:${atlas.version}</include>
                                        
<include>org.apache.atlas:atlas-client-common:jar:${atlas.version}</include>
                                        
<include>org.apache.atlas:atlas-common:jar:${atlas.version}</include>
+                                       
<include>org.apache.commons:commons-compress:jar:${atlas.commons.compress.version}</include>
                                        
<include>org.apache.hadoop:hadoop-auth</include>
                                        
<include>org.apache.hadoop:hadoop-common</include>
                                        
<include>org.apache.kafka:kafka_${scala.binary.version}:jar:${kafka.version}</include>

http://git-wip-us.apache.org/repos/asf/ranger/blob/081af481/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasNotificationMapper.java
----------------------------------------------------------------------
diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasNotificationMapper.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasNotificationMapper.java
index 85c7c20..a4cab28 100644
--- a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasNotificationMapper.java
+++ b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasNotificationMapper.java
@@ -118,8 +118,20 @@ public class AtlasNotificationMapper {
             switch (opType) {
                 case ENTITY_CREATE:
                     ret = ! entityNotification.getIsEmptyClassifications();
+                    if (!ret) {
+                        if (LOG.isDebugEnabled()) {
+                            LOG.debug("ENTITY_CREATE notification is ignored, 
as there are no traits associated with the entity. Ranger will get necessary 
information from any subsequent TRAIT_ADDED notification");
+                        }
+                    }
                     break;
                 case ENTITY_UPDATE:
+                    ret = ! entityNotification.getIsEmptyClassifications();
+                    if (!ret) {
+                        if (LOG.isDebugEnabled()) {
+                            LOG.debug("ENTITY_UPDATE notification is ignored, 
as there are no traits associated with the entity.");
+                        }
+                    }
+                    break;
                 case ENTITY_DELETE:
                 case CLASSIFICATION_ADD:
                 case CLASSIFICATION_UPDATE:

http://git-wip-us.apache.org/repos/asf/ranger/blob/081af481/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasTagSource.java
----------------------------------------------------------------------
diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasTagSource.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasTagSource.java
index ea4c20c..21a22cd 100644
--- a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasTagSource.java
+++ b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasTagSource.java
@@ -104,7 +104,6 @@ public class AtlasTagSource extends AbstractTagSource {
                        List<NotificationConsumer<EntityNotification>> 
iterators = 
notification.createConsumers(NotificationInterface.NotificationType.ENTITIES, 
1);
 
                        consumerTask = new ConsumerRunnable(iterators.get(0));
-
                }
 
                if (LOG.isDebugEnabled()) {
@@ -163,11 +162,45 @@ public class AtlasTagSource extends AbstractTagSource {
                        if (LOG.isDebugEnabled()) {
                                LOG.debug("==> ConsumerRunnable.run()");
                        }
+
+                       boolean seenCommitException = false;
+                       long offsetOfLastMessageDeliveredToRanger = -1L;
+
                        while (true) {
                                try {
                                        
List<AtlasKafkaMessage<EntityNotification>> messages = consumer.receive(1000L);
 
-                                       for 
(AtlasKafkaMessage<EntityNotification> message :  messages) {
+                                       int index = 0;
+
+                                       if (messages.size() > 0 && 
seenCommitException) {
+                                               if (LOG.isDebugEnabled()) {
+                                                       
LOG.debug("seenCommitException=[true], offsetOfLastMessageDeliveredToRanger=[" 
+ offsetOfLastMessageDeliveredToRanger + "]");
+                                               }
+                                               for (; index < messages.size(); 
index++) {
+                                                       
AtlasKafkaMessage<EntityNotification> message = messages.get(index);
+                                                       if (message.getOffset() 
<= offsetOfLastMessageDeliveredToRanger) {
+                                                               // Already 
delivered to Ranger
+                                                               TopicPartition 
partition = new TopicPartition("ATLAS_ENTITIES", message.getPartition());
+                                                               try {
+                                                                       if 
(LOG.isDebugEnabled()) {
+                                                                               
LOG.debug("Committing previously commit-failed message with offset:[" + 
message.getOffset() + "]");
+                                                                       }
+                                                                       
consumer.commit(partition, message.getOffset());
+                                                               } catch 
(Exception commitException) {
+                                                                       
LOG.warn("Ranger tagsync already processed message at offset " + 
message.getOffset() + ". Ignoring failure in committing this message and 
continuing to process next message", commitException);
+                                                                       
LOG.warn("This will cause Kafka to deliver this message:[" + 
message.getOffset() + "] repeatedly!! This may be unrecoverable error!!");
+                                                               }
+                                                       } else {
+                                                               break;
+                                                       }
+                                               }
+                                       }
+
+                                       seenCommitException = false;
+                                       offsetOfLastMessageDeliveredToRanger = 
-1L;
+
+                                       for (; index < messages.size(); 
index++) {
+                                               
AtlasKafkaMessage<EntityNotification> message = messages.get(index);
                                                EntityNotification notification 
= message != null ? message.getMessage() : null;
 
                                                if (notification != null) {
@@ -179,16 +212,24 @@ public class AtlasTagSource extends AbstractTagSource {
                                                        }
                                                        if (notificationWrapper 
!= null) {
                                                                if 
(LOG.isDebugEnabled()) {
-                                                                       
LOG.debug("Notification=" + 
getPrintableEntityNotification(notificationWrapper));
+                                                                       
LOG.debug("Message-offset=" + message.getOffset() + ", Notification=" + 
getPrintableEntityNotification(notificationWrapper));
                                                                }
 
                                                                ServiceTags 
serviceTags = 
AtlasNotificationMapper.processEntityNotification(notificationWrapper);
                                                                if (serviceTags 
!= null) {
                                                                        
updateSink(serviceTags);
                                                                }
-
-                                                               TopicPartition 
partition = new TopicPartition("ATLAS_ENTITIES", message.getPartition());
-                                                               
consumer.commit(partition, message.getOffset());
+                                                               
offsetOfLastMessageDeliveredToRanger = message.getOffset();
+
+                                                               if 
(!seenCommitException) {
+                                                                       
TopicPartition partition = new TopicPartition("ATLAS_ENTITIES", 
message.getPartition());
+                                                                       try {
+                                                                               
consumer.commit(partition, message.getOffset());
+                                                                       } catch 
(Exception commitException) {
+                                                                               
seenCommitException = true;
+                                                                               
LOG.warn("Ranger tagsync processed message at offset " + message.getOffset() + 
". Ignoring failure in committing this message and continuing to process next 
message", commitException);
+                                                                       }
+                                                               }
                                                        }
                                                } else {
                                                        LOG.error("Null 
entityNotification received from Kafka!! Ignoring..");
@@ -196,7 +237,14 @@ public class AtlasTagSource extends AbstractTagSource {
                                        }
                                } catch (Exception exception) {
                                        LOG.error("Caught exception..: ", 
exception);
-                                       return;
+                                       // If transient error, retry after 
short interval
+                                       try {
+                                               Thread.sleep(100);
+                                       } catch (InterruptedException 
interrupted) {
+                                               LOG.error("Interrupted: ", 
interrupted);
+                                               LOG.error("Returning from 
thread. May cause process to be up but not processing events!!");
+                                               return;
+                                       }
                                }
                        }
                }

Reply via email to