Repository: ranger
Updated Branches:
  refs/heads/ranger-1.0 89864e60b -> e3cdec010
RANGER-2104: Ranger tagsync should ignore ENTITY_UPDATE events if the updated entity does not have associated traits Project: http://git-wip-us.apache.org/repos/asf/ranger/repo Commit: http://git-wip-us.apache.org/repos/asf/ranger/commit/e3cdec01 Tree: http://git-wip-us.apache.org/repos/asf/ranger/tree/e3cdec01 Diff: http://git-wip-us.apache.org/repos/asf/ranger/diff/e3cdec01 Branch: refs/heads/ranger-1.0 Commit: e3cdec0102b398071a4f8ee26b29b3b7d94d5ad0 Parents: 89864e6 Author: Abhay Kulkarni <[email protected]> Authored: Thu May 17 09:07:11 2018 -0700 Committer: Abhay Kulkarni <[email protected]> Committed: Thu May 17 09:07:11 2018 -0700 ---------------------------------------------------------------------- pom.xml | 1 + src/main/assembly/tagsync.xml | 1 + .../source/atlas/AtlasNotificationMapper.java | 17 ++- .../tagsync/source/atlas/AtlasTagSource.java | 122 ++++++++++++++----- 4 files changed, 105 insertions(+), 36 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ranger/blob/e3cdec01/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index c7632ba..3bf6964 100644 --- a/pom.xml +++ b/pom.xml @@ -129,6 +129,7 @@ <atlas.guava.version>14.0</atlas.guava.version> <atlas.gson.version>2.5</atlas.gson.version> <atlas.jettison.version>1.3.7</atlas.jettison.version> + <atlas.commons.compress.version>1.4.1</atlas.commons.compress.version> <atlas.commons.logging.version>1.1.3</atlas.commons.logging.version> <bouncycastle.version>1.55</bouncycastle.version> <c3p0.version>0.9.1.2</c3p0.version> http://git-wip-us.apache.org/repos/asf/ranger/blob/e3cdec01/src/main/assembly/tagsync.xml ---------------------------------------------------------------------- diff --git a/src/main/assembly/tagsync.xml b/src/main/assembly/tagsync.xml index 0b17151..993fa5e 100644 --- a/src/main/assembly/tagsync.xml +++ b/src/main/assembly/tagsync.xml @@ -46,6 +46,7 @@ 
<include>org.apache.atlas:atlas-typesystem:jar:${atlas.version}</include> <include>org.apache.atlas:atlas-client:jar:${atlas.version}</include> <include>org.apache.atlas:atlas-common:jar:${atlas.version}</include> + <include>org.apache.commons:commons-compress:jar:${atlas.commons.compress.version}</include> <include>org.apache.hadoop:hadoop-auth</include> <include>org.apache.hadoop:hadoop-common</include> <include>org.apache.kafka:kafka_${scala.binary.version}:jar:${kafka.version}</include> http://git-wip-us.apache.org/repos/asf/ranger/blob/e3cdec01/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasNotificationMapper.java ---------------------------------------------------------------------- diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasNotificationMapper.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasNotificationMapper.java index f007ae5..e9245e4 100644 --- a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasNotificationMapper.java +++ b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasNotificationMapper.java @@ -128,11 +128,22 @@ public class AtlasNotificationMapper { if(opType != null) { switch (opType) { - case ENTITY_CREATE: { - LOG.debug("ENTITY_CREATE notification is not handled, as Ranger will get necessary information from any subsequent TRAIT_ADDED notification"); + case ENTITY_CREATE: + ret = CollectionUtils.isNotEmpty(entityNotification.getAllTraits()); + if (!ret) { + if (LOG.isDebugEnabled()) { + LOG.debug("ENTITY_CREATE notification is ignored, as there are no traits associated with the entity. 
Ranger will get necessary information from any subsequent TRAIT_ADDED notification"); + } + } break; - } case ENTITY_UPDATE: + ret = CollectionUtils.isNotEmpty(entityNotification.getAllTraits()); + if (!ret) { + if (LOG.isDebugEnabled()) { + LOG.debug("ENTITY_UPDATE notification is ignored, as there are no traits associated with the entity."); + } + } + break; case ENTITY_DELETE: case TRAIT_ADD: case TRAIT_UPDATE: http://git-wip-us.apache.org/repos/asf/ranger/blob/e3cdec01/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasTagSource.java ---------------------------------------------------------------------- diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasTagSource.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasTagSource.java index c382db0..403f31f 100644 --- a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasTagSource.java +++ b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasTagSource.java @@ -160,40 +160,96 @@ public class AtlasTagSource extends AbstractTagSource { if (LOG.isDebugEnabled()) { LOG.debug("==> ConsumerRunnable.run()"); } + + boolean seenCommitException = false; + long offsetOfLastMessageDeliveredToRanger = -1L; + while (true) { - try { - List<AtlasKafkaMessage<Object>> messages = consumer.receive(1000L); - for (AtlasKafkaMessage<Object> message : messages) { - Object kafkaMessage = message != null ? 
message.getMessage() : null; - - if (kafkaMessage != null) { - EntityNotification notification = null; - if (kafkaMessage instanceof EntityNotification) { - notification = (EntityNotification) kafkaMessage; - } else { - LOG.warn("Received Kafka notification of unexpected type:[" + kafkaMessage.getClass().toString() + "], Ignoring..."); - } - if (notification != null) { - if (LOG.isDebugEnabled()) { - LOG.debug("Notification=" + getPrintableEntityNotification(notification)); - } - - ServiceTags serviceTags = AtlasNotificationMapper.processEntityNotification(notification); - if (serviceTags != null) { - updateSink(serviceTags); - } - } - TopicPartition partition = new TopicPartition("ATLAS_ENTITIES", message.getPartition()); - consumer.commit(partition, message.getOffset()); - } else { - LOG.error("Null message received from Kafka!! Ignoring.."); - } - } - } catch (Exception exception) { - LOG.error("Caught exception..: ", exception); - return; - } - } + try { + List<AtlasKafkaMessage<Object>> messages = consumer.receive(1000L); + + int index = 0; + + if (messages.size() > 0 && seenCommitException) { + if (LOG.isDebugEnabled()) { + LOG.debug("seenCommitException=[true], offsetOfLastMessageDeliveredToRanger=[" + offsetOfLastMessageDeliveredToRanger + "]"); + } + for (; index < messages.size(); index++) { + AtlasKafkaMessage<Object> message = messages.get(index); + Object kafkaMessage = message != null ? 
message.getMessage() : null; + + if (kafkaMessage != null) { + if (message.getOffset() <= offsetOfLastMessageDeliveredToRanger) { + // Already delivered to Ranger + TopicPartition partition = new TopicPartition("ATLAS_ENTITIES", message.getPartition()); + try { + if (LOG.isDebugEnabled()) { + LOG.debug("Committing previously commit-failed message with offset:[" + message.getOffset() + "]"); + } + consumer.commit(partition, message.getOffset()); + } catch (Exception commitException) { + LOG.warn("Ranger tagsync already processed message at offset " + message.getOffset() + ". Ignoring failure in committing this message and continuing to process next message", commitException); + LOG.warn("This will cause Kafka to deliver this message:[" + message.getOffset() + "] repeatedly!! This may be unrecoverable error!!"); + } + } else { + break; + } + } + } + } + + seenCommitException = false; + offsetOfLastMessageDeliveredToRanger = -1L; + + for (; index < messages.size(); index++) { + AtlasKafkaMessage<Object> message = messages.get(index); + Object kafkaMessage = message != null ? 
message.getMessage() : null; + + if (kafkaMessage != null) { + EntityNotification notification = null; + if (kafkaMessage instanceof EntityNotification) { + notification = (EntityNotification) kafkaMessage; + } else { + LOG.warn("Received Kafka notification of unexpected type:[" + kafkaMessage.getClass().toString() + "], Ignoring..."); + } + + if (notification != null) { + if (LOG.isDebugEnabled()) { + LOG.debug("Message-offset=" + message.getOffset() + ", Notification=" + getPrintableEntityNotification(notification)); + } + ServiceTags serviceTags = AtlasNotificationMapper.processEntityNotification(notification); + if (serviceTags != null) { + updateSink(serviceTags); + } + } + + offsetOfLastMessageDeliveredToRanger = message.getOffset(); + + if (!seenCommitException) { + TopicPartition partition = new TopicPartition("ATLAS_ENTITIES", message.getPartition()); + try { + consumer.commit(partition, message.getOffset()); + } catch (Exception commitException) { + seenCommitException = true; + LOG.warn("Ranger tagsync processed message at offset " + message.getOffset() + ". Ignoring failure in committing this message and continuing to process next message", commitException); + } + } + } else { + LOG.error("Null entityNotification received from Kafka!! Ignoring.."); + } + } + } catch (Exception exception) { + LOG.error("Caught exception..: ", exception); + // If transient error, retry after short interval + try { + Thread.sleep(100); + } catch (InterruptedException interrupted) { + LOG.error("Interrupted: ", interrupted); + LOG.error("Returning from thread. May cause process to be up but not processing events!!"); + return; + } + } + } } } }
