Repository: ranger Updated Branches: refs/heads/ranger-0.7 7e3963bc7 -> 126ff6ee0
RANGER-2104: Ranger tagsync should ignore ENTITY_UPDATE events if the updated entity does not have associated traits Project: http://git-wip-us.apache.org/repos/asf/ranger/repo Commit: http://git-wip-us.apache.org/repos/asf/ranger/commit/126ff6ee Tree: http://git-wip-us.apache.org/repos/asf/ranger/tree/126ff6ee Diff: http://git-wip-us.apache.org/repos/asf/ranger/diff/126ff6ee Branch: refs/heads/ranger-0.7 Commit: 126ff6ee04e580dcf8b924f76df0e3917221106e Parents: 7e3963b Author: Abhay Kulkarni <[email protected]> Authored: Thu May 17 08:51:40 2018 -0700 Committer: Abhay Kulkarni <[email protected]> Committed: Thu May 17 08:51:40 2018 -0700 ---------------------------------------------------------------------- pom.xml | 1 + src/main/assembly/tagsync.xml | 1 + .../source/atlas/AtlasNotificationMapper.java | 53 +++++++++++++------ .../tagsync/source/atlas/AtlasTagSource.java | 54 +++++++++++++++++--- 4 files changed, 85 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ranger/blob/126ff6ee/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index b76e4e3..2ec6768 100644 --- a/pom.xml +++ b/pom.xml @@ -129,6 +129,7 @@ <atlas.guava.version>14.0</atlas.guava.version> <atlas.gson.version>2.5</atlas.gson.version> <atlas.jettison.version>1.3.7</atlas.jettison.version> + <atlas.commons.compress.version>1.4.1</atlas.commons.compress.version> <atlas.commons.logging.version>1.1.3</atlas.commons.logging.version> <avatica.version>1.7.1</avatica.version> <bouncycastle.version>1.55</bouncycastle.version> http://git-wip-us.apache.org/repos/asf/ranger/blob/126ff6ee/src/main/assembly/tagsync.xml ---------------------------------------------------------------------- diff --git a/src/main/assembly/tagsync.xml b/src/main/assembly/tagsync.xml index 26b42ca..5139937 100644 --- a/src/main/assembly/tagsync.xml +++ b/src/main/assembly/tagsync.xml @@ -50,6 +50,7 @@ <include>org.apache.atlas:atlas-client-v2:jar:${atlas.version}</include> <include>org.apache.atlas:atlas-common:jar:${atlas.version}</include> <include>org.apache.atlas:atlas-intg:jar:${atlas.version}</include> + <include>org.apache.commons:commons-compress:jar:${atlas.commons.compress.version}</include> <include>org.apache.hadoop:hadoop-auth</include> <include>org.apache.hadoop:hadoop-common</include> <include>org.apache.kafka:kafka_${scala.binary.version}:jar:${kafka.version}</include> http://git-wip-us.apache.org/repos/asf/ranger/blob/126ff6ee/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasNotificationMapper.java ---------------------------------------------------------------------- diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasNotificationMapper.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasNotificationMapper.java index 8641d60..1c7f063 100644 --- a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasNotificationMapper.java +++ b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasNotificationMapper.java @@ -51,7 +51,6 @@ import java.util.*; public class AtlasNotificationMapper { private static final Log LOG = LogFactory.getLog(AtlasNotificationMapper.class); - private static Map<String, Long> unhandledEventTypes = new HashMap<String, Long>(); private static final ThreadLocal<DateFormat> DATE_FORMATTER = new ThreadLocal<DateFormat>() { @@ -140,8 +139,20 @@ public class AtlasNotificationMapper { switch (opType) { case ENTITY_CREATE: ret = CollectionUtils.isNotEmpty(entityNotification.getAllTraits()); + if (!ret) { + if (LOG.isDebugEnabled()) { + LOG.debug("ENTITY_CREATE notification is ignored, as there are no traits associated with the entity. Ranger will get necessary information from any subsequent TRAIT_ADDED notification"); + } + } break; case ENTITY_UPDATE: + ret = CollectionUtils.isNotEmpty(entityNotification.getAllTraits()); + if (!ret) { + if (LOG.isDebugEnabled()) { + LOG.debug("ENTITY_UPDATE notification is ignored, as there are no traits associated with the entity."); + } + } + break; case ENTITY_DELETE: case TRAIT_ADD: case TRAIT_UPDATE: @@ -278,9 +289,8 @@ public class AtlasNotificationMapper { List<RangerTag> ret = new ArrayList<RangerTag>(); IReferenceableInstance entity = entityWithTraits != null ? entityWithTraits.getEntity() : null; - if(entity != null && CollectionUtils.isNotEmpty(entity.getTraits())) { - for (String traitName : entity.getTraits()) { - IStruct trait = entity.getTrait(traitName); + if(entity != null && CollectionUtils.isNotEmpty(entityWithTraits.getAllTraits())) { + for (IStruct trait : entityWithTraits.getAllTraits()) { Map<String, String> tagAttrs = new HashMap<String, String>(); try { @@ -310,9 +320,8 @@ public class AtlasNotificationMapper { List<RangerTagDef> ret = new ArrayList<RangerTagDef>(); IReferenceableInstance entity = entityWithTraits != null ? entityWithTraits.getEntity() : null; - if(entity != null && CollectionUtils.isNotEmpty(entity.getTraits())) { - for (String traitName : entity.getTraits()) { - IStruct trait = entity.getTrait(traitName); + if(entity != null && CollectionUtils.isNotEmpty(entityWithTraits.getAllTraits())) { + for (IStruct trait : entityWithTraits.getAllTraits()) { RangerTagDef tagDef = new RangerTagDef(trait.getTypeName(), "Atlas"); try { @@ -415,7 +424,7 @@ public class AtlasNotificationMapper { if (serviceResource != null) { List<RangerTag> tags = getTags(entity, typeRegistry); - List<RangerTagDef> tagDefs = getTagDefs(entity); + List<RangerTagDef> tagDefs = getTagDefs(entity, typeRegistry); String serviceName = serviceResource.getServiceName(); ret = createOrGetServiceTags(serviceTagsMap, serviceName); @@ -477,28 +486,38 @@ public class AtlasNotificationMapper { return ret; } - static private List<RangerTagDef> getTagDefs(AtlasEntityHeader entity) { + static private List<RangerTagDef> getTagDefs(AtlasEntityHeader entity, AtlasTypeRegistry typeRegistry) { List<RangerTagDef> ret = new ArrayList<>(); if(entity != null && CollectionUtils.isNotEmpty(entity.getClassificationNames())) { - List<AtlasClassification> traits = entity.getClassifications(); + List<AtlasClassification> classifications = entity.getClassifications(); - for (AtlasClassification trait : traits) { - RangerTagDef tagDef = new RangerTagDef(trait.getTypeName(), "Atlas"); + for (AtlasClassification classification : classifications) { + ret.add(getTagDef(classification)); - if(MapUtils.isNotEmpty(trait.getAttributes())) { - for (String attrName : trait.getAttributes().keySet()) { - tagDef.getAttributeDefs().add(new RangerTagAttributeDef(attrName, "string")); + List<AtlasClassification> superClassifications = getSuperClassifications(classification, typeRegistry); + + if (CollectionUtils.isNotEmpty(superClassifications)) { + for (AtlasClassification superClassification : superClassifications) { + ret.add(getTagDef(superClassification)); } } - - ret.add(tagDef); } } return ret; } + static private RangerTagDef getTagDef(AtlasClassification classification) { + RangerTagDef tagDef = new RangerTagDef(classification.getTypeName(), "Atlas"); + if(MapUtils.isNotEmpty(classification.getAttributes())) { + for (String attrName : classification.getAttributes().keySet()) { + tagDef.getAttributeDefs().add(new RangerTagAttributeDef(attrName, "string")); + } + } + return tagDef; + } + static private List<AtlasClassification> getSuperClassifications(AtlasClassification classification, AtlasTypeRegistry typeRegistry) { List<AtlasClassification> ret = null; AtlasClassificationType classificationType = typeRegistry.getClassificationTypeByName(classification.getTypeName()); http://git-wip-us.apache.org/repos/asf/ranger/blob/126ff6ee/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasTagSource.java ---------------------------------------------------------------------- diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasTagSource.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasTagSource.java index 95ff8ec..3810442 100644 --- a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasTagSource.java +++ b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasTagSource.java @@ -160,32 +160,72 @@ public class AtlasTagSource extends AbstractTagSource { if (LOG.isDebugEnabled()) { LOG.debug("==> ConsumerRunnable.run()"); } + boolean seenCommitException = false; + long offsetOfLastMessageDeliveredToRanger = -1L; + while (true) { try { List<AtlasKafkaMessage<EntityNotification>> messages = consumer.receive(1000L); + int index = 0; + + if (seenCommitException) { + for (; index < messages.size(); index++) { + AtlasKafkaMessage<EntityNotification> message = messages.get(index); + if (message.getOffset() <= offsetOfLastMessageDeliveredToRanger) { + // Already delivered to Ranger + TopicPartition partition = new TopicPartition("ATLAS_ENTITIES", message.getPartition()); + try { + consumer.commit(partition, message.getOffset()); + } catch (Exception commitException) { + LOG.warn("Ranger tagsync already processed message at offset " + message.getOffset() + ". Ignoring failure in committing this message and continuing to process next message", commitException); + LOG.warn("This will cause Kafka to deliver this message:[" + message.getOffset() + "] repeatedly!! This may be unrecoverable error!!"); + } + } else { + seenCommitException = false; + offsetOfLastMessageDeliveredToRanger = -1L; + break; + } + } + } - for (AtlasKafkaMessage<EntityNotification> message : messages) { + for (; index < messages.size(); index++) { + AtlasKafkaMessage<EntityNotification> message = messages.get(index); EntityNotification notification = message != null ? message.getMessage() : null; if (notification != null) { if (LOG.isDebugEnabled()) { - LOG.debug("Notification=" + getPrintableEntityNotification(notification)); + LOG.debug("Message-offset=" + message.getOffset() + ", Notification=" + getPrintableEntityNotification(notification)); } ServiceTags serviceTags = AtlasNotificationMapper.processEntityNotification(notification); if (serviceTags != null) { updateSink(serviceTags); } - - TopicPartition partition = new TopicPartition("ATLAS_ENTITIES", message.getPartition()); - consumer.commit(partition, message.getOffset()); + offsetOfLastMessageDeliveredToRanger = message.getOffset(); + + if (!seenCommitException) { + TopicPartition partition = new TopicPartition("ATLAS_ENTITIES", message.getPartition()); + try { + consumer.commit(partition, message.getOffset()); + } catch (Exception commitException) { + seenCommitException = true; + LOG.warn("Ranger tagsync processed message at offset " + message.getOffset() + ". Ignoring failure in committing this message and continuing to process next message", commitException); + } + } } else { LOG.error("Null entityNotification received from Kafka!! Ignoring.."); } } } catch (Exception exception) { - LOG.error("Caught exception..: ", exception); - return; + LOG.error("Caught exception : ", exception); + // If transient error, retry after short interval + try { + Thread.sleep(100); + } catch (InterruptedException interrupted) { + LOG.error("Interrupted: ", interrupted); + LOG.error("Returning from thread. May cause process to be up but not processing events!!"); + return; + } } } }
