Repository: ranger
Updated Branches:
  refs/heads/master ab6cb3935 -> 081af4819
RANGER-2104: Ranger tagsync should ignore ENTITY_UPDATE events if the updated entity does not have associated traits

Project: http://git-wip-us.apache.org/repos/asf/ranger/repo
Commit: http://git-wip-us.apache.org/repos/asf/ranger/commit/081af481
Tree: http://git-wip-us.apache.org/repos/asf/ranger/tree/081af481
Diff: http://git-wip-us.apache.org/repos/asf/ranger/diff/081af481

Branch: refs/heads/master
Commit: 081af481923207ee93a3d4a7cc29901b4c972a44
Parents: ab6cb39
Author: Abhay Kulkarni <[email protected]>
Authored: Thu May 17 09:45:06 2018 -0700
Committer: Abhay Kulkarni <[email protected]>
Committed: Thu May 17 09:45:06 2018 -0700

----------------------------------------------------------------------
 pom.xml                                         |  1 +
 src/main/assembly/tagsync.xml                   |  1 +
 .../source/atlas/AtlasNotificationMapper.java   | 12 ++++
 .../tagsync/source/atlas/AtlasTagSource.java    | 62 +++++++++++++++++---
 4 files changed, 69 insertions(+), 7 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ranger/blob/081af481/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 878f5f0..756eccb 100644
--- a/pom.xml
+++ b/pom.xml
@@ -132,6 +132,7 @@
         <atlas.gson.version>2.5</atlas.gson.version>
         <atlas.jackson.version>2.9.2</atlas.jackson.version>
         <atlas.jettison.version>1.3.7</atlas.jettison.version>
+        <atlas.commons.compress.version>1.4.1</atlas.commons.compress.version>
         <atlas.commons.logging.version>1.1.3</atlas.commons.logging.version>
         <bouncycastle.version>1.55</bouncycastle.version>
         <c3p0.version>0.9.1.2</c3p0.version>

http://git-wip-us.apache.org/repos/asf/ranger/blob/081af481/src/main/assembly/tagsync.xml
----------------------------------------------------------------------
diff --git a/src/main/assembly/tagsync.xml b/src/main/assembly/tagsync.xml
index 0788ac1..bc6e28b 100644
--- a/src/main/assembly/tagsync.xml
+++ b/src/main/assembly/tagsync.xml
@@ -45,6 +45,7 @@
<include>org.apache.atlas:atlas-client-v2:jar:${atlas.version}</include> <include>org.apache.atlas:atlas-client-common:jar:${atlas.version}</include> <include>org.apache.atlas:atlas-common:jar:${atlas.version}</include> + <include>org.apache.commons:commons-compress:jar:${atlas.commons.compress.version}</include> <include>org.apache.hadoop:hadoop-auth</include> <include>org.apache.hadoop:hadoop-common</include> <include>org.apache.kafka:kafka_${scala.binary.version}:jar:${kafka.version}</include> http://git-wip-us.apache.org/repos/asf/ranger/blob/081af481/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasNotificationMapper.java ---------------------------------------------------------------------- diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasNotificationMapper.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasNotificationMapper.java index 85c7c20..a4cab28 100644 --- a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasNotificationMapper.java +++ b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasNotificationMapper.java @@ -118,8 +118,20 @@ public class AtlasNotificationMapper { switch (opType) { case ENTITY_CREATE: ret = ! entityNotification.getIsEmptyClassifications(); + if (!ret) { + if (LOG.isDebugEnabled()) { + LOG.debug("ENTITY_CREATE notification is ignored, as there are no traits associated with the entity. Ranger will get necessary information from any subsequent TRAIT_ADDED notification"); + } + } break; case ENTITY_UPDATE: + ret = ! 
entityNotification.getIsEmptyClassifications(); + if (!ret) { + if (LOG.isDebugEnabled()) { + LOG.debug("ENTITY_UPDATE notification is ignored, as there are no traits associated with the entity."); + } + } + break; case ENTITY_DELETE: case CLASSIFICATION_ADD: case CLASSIFICATION_UPDATE: http://git-wip-us.apache.org/repos/asf/ranger/blob/081af481/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasTagSource.java ---------------------------------------------------------------------- diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasTagSource.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasTagSource.java index ea4c20c..21a22cd 100644 --- a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasTagSource.java +++ b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasTagSource.java @@ -104,7 +104,6 @@ public class AtlasTagSource extends AbstractTagSource { List<NotificationConsumer<EntityNotification>> iterators = notification.createConsumers(NotificationInterface.NotificationType.ENTITIES, 1); consumerTask = new ConsumerRunnable(iterators.get(0)); - } if (LOG.isDebugEnabled()) { @@ -163,11 +162,45 @@ public class AtlasTagSource extends AbstractTagSource { if (LOG.isDebugEnabled()) { LOG.debug("==> ConsumerRunnable.run()"); } + + boolean seenCommitException = false; + long offsetOfLastMessageDeliveredToRanger = -1L; + while (true) { try { List<AtlasKafkaMessage<EntityNotification>> messages = consumer.receive(1000L); - for (AtlasKafkaMessage<EntityNotification> message : messages) { + int index = 0; + + if (messages.size() > 0 && seenCommitException) { + if (LOG.isDebugEnabled()) { + LOG.debug("seenCommitException=[true], offsetOfLastMessageDeliveredToRanger=[" + offsetOfLastMessageDeliveredToRanger + "]"); + } + for (; index < messages.size(); index++) { + AtlasKafkaMessage<EntityNotification> message = messages.get(index); + if (message.getOffset() <= 
offsetOfLastMessageDeliveredToRanger) { + // Already delivered to Ranger + TopicPartition partition = new TopicPartition("ATLAS_ENTITIES", message.getPartition()); + try { + if (LOG.isDebugEnabled()) { + LOG.debug("Committing previously commit-failed message with offset:[" + message.getOffset() + "]"); + } + consumer.commit(partition, message.getOffset()); + } catch (Exception commitException) { + LOG.warn("Ranger tagsync already processed message at offset " + message.getOffset() + ". Ignoring failure in committing this message and continuing to process next message", commitException); + LOG.warn("This will cause Kafka to deliver this message:[" + message.getOffset() + "] repeatedly!! This may be unrecoverable error!!"); + } + } else { + break; + } + } + } + + seenCommitException = false; + offsetOfLastMessageDeliveredToRanger = -1L; + + for (; index < messages.size(); index++) { + AtlasKafkaMessage<EntityNotification> message = messages.get(index); EntityNotification notification = message != null ? 
message.getMessage() : null; if (notification != null) { @@ -179,16 +212,24 @@ public class AtlasTagSource extends AbstractTagSource { } if (notificationWrapper != null) { if (LOG.isDebugEnabled()) { - LOG.debug("Notification=" + getPrintableEntityNotification(notificationWrapper)); + LOG.debug("Message-offset=" + message.getOffset() + ", Notification=" + getPrintableEntityNotification(notificationWrapper)); } ServiceTags serviceTags = AtlasNotificationMapper.processEntityNotification(notificationWrapper); if (serviceTags != null) { updateSink(serviceTags); } - - TopicPartition partition = new TopicPartition("ATLAS_ENTITIES", message.getPartition()); - consumer.commit(partition, message.getOffset()); + offsetOfLastMessageDeliveredToRanger = message.getOffset(); + + if (!seenCommitException) { + TopicPartition partition = new TopicPartition("ATLAS_ENTITIES", message.getPartition()); + try { + consumer.commit(partition, message.getOffset()); + } catch (Exception commitException) { + seenCommitException = true; + LOG.warn("Ranger tagsync processed message at offset " + message.getOffset() + ". Ignoring failure in committing this message and continuing to process next message", commitException); + } + } } } else { LOG.error("Null entityNotification received from Kafka!! Ignoring.."); @@ -196,7 +237,14 @@ public class AtlasTagSource extends AbstractTagSource { } } catch (Exception exception) { LOG.error("Caught exception..: ", exception); - return; + // If transient error, retry after short interval + try { + Thread.sleep(100); + } catch (InterruptedException interrupted) { + LOG.error("Interrupted: ", interrupted); + LOG.error("Returning from thread. May cause process to be up but not processing events!!"); + return; + } } } }
