Repository: ranger
Updated Branches:
  refs/heads/ranger-1.0 89864e60b -> e3cdec010


RANGER-2104: Ranger tagsync should ignore ENTITY_UPDATE events if the updated 
entity does not have associated traits


Project: http://git-wip-us.apache.org/repos/asf/ranger/repo
Commit: http://git-wip-us.apache.org/repos/asf/ranger/commit/e3cdec01
Tree: http://git-wip-us.apache.org/repos/asf/ranger/tree/e3cdec01
Diff: http://git-wip-us.apache.org/repos/asf/ranger/diff/e3cdec01

Branch: refs/heads/ranger-1.0
Commit: e3cdec0102b398071a4f8ee26b29b3b7d94d5ad0
Parents: 89864e6
Author: Abhay Kulkarni <akulka...@hortonworks.com>
Authored: Thu May 17 09:07:11 2018 -0700
Committer: Abhay Kulkarni <akulka...@hortonworks.com>
Committed: Thu May 17 09:07:11 2018 -0700

----------------------------------------------------------------------
 pom.xml                                         |   1 +
 src/main/assembly/tagsync.xml                   |   1 +
 .../source/atlas/AtlasNotificationMapper.java   |  17 ++-
 .../tagsync/source/atlas/AtlasTagSource.java    | 122 ++++++++++++++-----
 4 files changed, 105 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ranger/blob/e3cdec01/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index c7632ba..3bf6964 100644
--- a/pom.xml
+++ b/pom.xml
@@ -129,6 +129,7 @@
         <atlas.guava.version>14.0</atlas.guava.version>
         <atlas.gson.version>2.5</atlas.gson.version>
         <atlas.jettison.version>1.3.7</atlas.jettison.version>
+        <atlas.commons.compress.version>1.4.1</atlas.commons.compress.version>
         <atlas.commons.logging.version>1.1.3</atlas.commons.logging.version>
         <bouncycastle.version>1.55</bouncycastle.version>
         <c3p0.version>0.9.1.2</c3p0.version>

http://git-wip-us.apache.org/repos/asf/ranger/blob/e3cdec01/src/main/assembly/tagsync.xml
----------------------------------------------------------------------
diff --git a/src/main/assembly/tagsync.xml b/src/main/assembly/tagsync.xml
index 0b17151..993fa5e 100644
--- a/src/main/assembly/tagsync.xml
+++ b/src/main/assembly/tagsync.xml
@@ -46,6 +46,7 @@
                                        
<include>org.apache.atlas:atlas-typesystem:jar:${atlas.version}</include>
                                        
<include>org.apache.atlas:atlas-client:jar:${atlas.version}</include>
                                        
<include>org.apache.atlas:atlas-common:jar:${atlas.version}</include>
+                                       
<include>org.apache.commons:commons-compress:jar:${atlas.commons.compress.version}</include>
                                        
<include>org.apache.hadoop:hadoop-auth</include>
                                        
<include>org.apache.hadoop:hadoop-common</include>
                                        
<include>org.apache.kafka:kafka_${scala.binary.version}:jar:${kafka.version}</include>

http://git-wip-us.apache.org/repos/asf/ranger/blob/e3cdec01/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasNotificationMapper.java
----------------------------------------------------------------------
diff --git 
a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasNotificationMapper.java
 
b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasNotificationMapper.java
index f007ae5..e9245e4 100644
--- 
a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasNotificationMapper.java
+++ 
b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasNotificationMapper.java
@@ -128,11 +128,22 @@ public class AtlasNotificationMapper {
 
                if(opType != null) {
                        switch (opType) {
-                               case ENTITY_CREATE: {
-                                       LOG.debug("ENTITY_CREATE notification 
is not handled, as Ranger will get necessary information from any subsequent 
TRAIT_ADDED notification");
+                               case ENTITY_CREATE:
+                                       ret = 
CollectionUtils.isNotEmpty(entityNotification.getAllTraits());
+                                       if (!ret) {
+                                               if (LOG.isDebugEnabled()) {
+                                                       
LOG.debug("ENTITY_CREATE notification is ignored, as there are no traits 
associated with the entity. Ranger will get necessary information from any 
subsequent TRAIT_ADDED notification");
+                                               }
+                                       }
                                        break;
-                               }
                                case ENTITY_UPDATE:
+                                       ret = 
CollectionUtils.isNotEmpty(entityNotification.getAllTraits());
+                                       if (!ret) {
+                                               if (LOG.isDebugEnabled()) {
+                                                       
LOG.debug("ENTITY_UPDATE notification is ignored, as there are no traits 
associated with the entity.");
+                                               }
+                                       }
+                                       break;
                                case ENTITY_DELETE:
                                case TRAIT_ADD:
                                case TRAIT_UPDATE:

http://git-wip-us.apache.org/repos/asf/ranger/blob/e3cdec01/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasTagSource.java
----------------------------------------------------------------------
diff --git 
a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasTagSource.java
 
b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasTagSource.java
index c382db0..403f31f 100644
--- 
a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasTagSource.java
+++ 
b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasTagSource.java
@@ -160,40 +160,96 @@ public class AtlasTagSource extends AbstractTagSource {
                        if (LOG.isDebugEnabled()) {
                                LOG.debug("==> ConsumerRunnable.run()");
                        }
+
+                       boolean seenCommitException = false;
+                       long offsetOfLastMessageDeliveredToRanger = -1L;
+
                        while (true) {
-                try {
-                    List<AtlasKafkaMessage<Object>> messages = 
consumer.receive(1000L);
-                    for (AtlasKafkaMessage<Object> message : messages) {
-                        Object kafkaMessage = message != null ? 
message.getMessage() : null;
-
-                        if (kafkaMessage != null) {
-                            EntityNotification notification = null;
-                            if (kafkaMessage instanceof EntityNotification) {
-                                notification = (EntityNotification) 
kafkaMessage;
-                            } else {
-                                LOG.warn("Received Kafka notification of 
unexpected type:[" + kafkaMessage.getClass().toString() + "], Ignoring...");
-                            }
-                            if (notification != null) {
-                                if (LOG.isDebugEnabled()) {
-                                    LOG.debug("Notification=" + 
getPrintableEntityNotification(notification));
-                                }
-
-                                ServiceTags serviceTags = 
AtlasNotificationMapper.processEntityNotification(notification);
-                                if (serviceTags != null) {
-                                    updateSink(serviceTags);
-                                }
-                            }
-                            TopicPartition partition = new 
TopicPartition("ATLAS_ENTITIES", message.getPartition());
-                            consumer.commit(partition, message.getOffset());
-                        } else {
-                            LOG.error("Null message received from Kafka!! 
Ignoring..");
-                        }
-                    }
-                } catch (Exception exception) {
-                    LOG.error("Caught exception..: ", exception);
-                    return;
-                }
-            }
+                               try {
+                                       List<AtlasKafkaMessage<Object>> 
messages = consumer.receive(1000L);
+
+                                       int index = 0;
+
+                                       if (messages.size() > 0 && 
seenCommitException) {
+                                               if (LOG.isDebugEnabled()) {
+                                                       
LOG.debug("seenCommitException=[true], offsetOfLastMessageDeliveredToRanger=[" 
+ offsetOfLastMessageDeliveredToRanger + "]");
+                                               }
+                                               for (; index < messages.size(); 
index++) {
+                                                       
AtlasKafkaMessage<Object> message = messages.get(index);
+                                                       Object kafkaMessage = 
message != null ? message.getMessage() : null;
+
+                                                       if (kafkaMessage != 
null) {
+                                                               if 
(message.getOffset() <= offsetOfLastMessageDeliveredToRanger) {
+                                                                       // 
Already delivered to Ranger
+                                                                       
TopicPartition partition = new TopicPartition("ATLAS_ENTITIES", 
message.getPartition());
+                                                                       try {
+                                                                               
if (LOG.isDebugEnabled()) {
+                                                                               
        LOG.debug("Committing previously commit-failed message with offset:[" + 
message.getOffset() + "]");
+                                                                               
}
+                                                                               
consumer.commit(partition, message.getOffset());
+                                                                       } catch 
(Exception commitException) {
+                                                                               
LOG.warn("Ranger tagsync already processed message at offset " + 
message.getOffset() + ". Ignoring failure in committing this message and 
continuing to process next message", commitException);
+                                                                               
LOG.warn("This will cause Kafka to deliver this message:[" + 
message.getOffset() + "] repeatedly!! This may be unrecoverable error!!");
+                                                                       }
+                                                               } else {
+                                                                       break;
+                                                               }
+                                                       }
+                                               }
+                                       }
+
+                                       seenCommitException = false;
+                                       offsetOfLastMessageDeliveredToRanger = 
-1L;
+
+                                       for (; index < messages.size(); 
index++) {
+                                               AtlasKafkaMessage<Object> 
message = messages.get(index);
+                                               Object kafkaMessage = message 
!= null ? message.getMessage() : null;
+
+                                               if (kafkaMessage != null) {
+                                                       EntityNotification 
notification = null;
+                                                       if (kafkaMessage 
instanceof EntityNotification) {
+                                                               notification = 
(EntityNotification) kafkaMessage;
+                                                       } else {
+                                                               
LOG.warn("Received Kafka notification of unexpected type:[" + 
kafkaMessage.getClass().toString() + "], Ignoring...");
+                                                       }
+
+                                                       if (notification != 
null) {
+                                                               if 
(LOG.isDebugEnabled()) {
+                                                                       
LOG.debug("Message-offset=" + message.getOffset() + ", Notification=" + 
getPrintableEntityNotification(notification));
+                                                               }
+                                                               ServiceTags 
serviceTags = AtlasNotificationMapper.processEntityNotification(notification);
+                                                               if (serviceTags 
!= null) {
+                                                                       
updateSink(serviceTags);
+                                                               }
+                                                       }
+
+                                                       
offsetOfLastMessageDeliveredToRanger = message.getOffset();
+
+                                                       if 
(!seenCommitException) {
+                                                               TopicPartition 
partition = new TopicPartition("ATLAS_ENTITIES", message.getPartition());
+                                                               try {
+                                                                       
consumer.commit(partition, message.getOffset());
+                                                               } catch 
(Exception commitException) {
+                                                                       
seenCommitException = true;
+                                                                       
LOG.warn("Ranger tagsync processed message at offset " + message.getOffset() + 
". Ignoring failure in committing this message and continuing to process next 
message", commitException);
+                                                               }
+                                                       }
+                                               } else {
+                                                       LOG.error("Null 
entityNotification received from Kafka!! Ignoring..");
+                                               }
+                                       }
+                               } catch (Exception exception) {
+                                       LOG.error("Caught exception..: ", 
exception);
+                                       // If transient error, retry after 
short interval
+                                       try {
+                                               Thread.sleep(100);
+                                       } catch (InterruptedException 
interrupted) {
+                                               LOG.error("Interrupted: ", 
interrupted);
+                                               LOG.error("Returning from 
thread. May cause process to be up but not processing events!!");
+                                               return;
+                                       }
+                               }
+                       }
                }
        }
 }

Reply via email to