This is an automated email from the ASF dual-hosted git repository.
abhay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ranger.git
The following commit(s) were added to refs/heads/master by this push:
new e8a6125ba RANGER-4130: Improve performance of event processing in
tagsync by optimizing number of commits to Kafka broker
e8a6125ba is described below
commit e8a6125ba99b5ca4f62923552ddb251ee476cfdd
Author: Abhay Kulkarni <[email protected]>
AuthorDate: Tue Apr 18 18:07:32 2023 -0700
RANGER-4130: Improve performance of event processing in tagsync by
optimizing number of commits to Kafka broker
---
.../tagsync/source/atlas/AtlasTagSource.java | 68 ++++++++++------------
1 file changed, 32 insertions(+), 36 deletions(-)
diff --git
a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasTagSource.java
b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasTagSource.java
index a618cc986..34a39f73c 100644
---
a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasTagSource.java
+++
b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasTagSource.java
@@ -52,9 +52,9 @@ public class AtlasTagSource extends AbstractTagSource {
public static final String TAGSYNC_ATLAS_PROPERTIES_FILE_NAME =
"atlas-application.properties";
- public static final String TAGSYNC_ATLAS_KAFKA_ENDPOINTS =
"atlas.kafka.bootstrap.servers";
- public static final String TAGSYNC_ATLAS_ZOOKEEPER_ENDPOINT =
"atlas.kafka.zookeeper.connect";
- public static final String TAGSYNC_ATLAS_CONSUMER_GROUP =
"atlas.kafka.entities.group.id";
+ public static final String TAGSYNC_ATLAS_KAFKA_ENDPOINTS =
"atlas.kafka.bootstrap.servers";
+ public static final String TAGSYNC_ATLAS_ZOOKEEPER_ENDPOINT =
"atlas.kafka.zookeeper.connect";
+ public static final String TAGSYNC_ATLAS_CONSUMER_GROUP =
"atlas.kafka.entities.group.id";
public static final int MAX_WAIT_TIME_IN_MILLIS = 1000;
@@ -168,11 +168,10 @@ public class AtlasTagSource extends AbstractTagSource {
private final List<RangerAtlasEntityWithTags>
atlasEntitiesWithTags = new ArrayList<>();
private final List<AtlasKafkaMessage<EntityNotification>>
messages = new ArrayList<>();
+ private AtlasKafkaMessage<EntityNotification>
lastUnhandledMessage = null;
- private long offsetOfLastMessageDeliveredToRanger = -1L;
private long offsetOfLastMessageCommittedToKafka = -1L;
-
- private boolean isHandlingDeleteOps = false;
+ private boolean isHandlingDeleteOps = false;
private
ConsumerRunnable(NotificationConsumer<EntityNotification> consumer) {
this.consumer = consumer;
@@ -222,10 +221,11 @@ public class AtlasTagSource extends AbstractTagSource {
}
atlasEntitiesWithTags.add(new RangerAtlasEntityWithTags(notificationWrapper));
+
messages.add(message);
} else {
AtlasNotificationMapper.logUnhandledEntityNotification(notificationWrapper);
+
lastUnhandledMessage = message;
}
-
messages.add(message);
}
} else {
LOG.error("Null
entityNotification received from Kafka!! Ignoring..");
@@ -235,6 +235,10 @@ public class AtlasTagSource extends AbstractTagSource {
buildAndUploadServiceTags();
}
}
+ if (lastUnhandledMessage != null) {
+
commitToKafka(lastUnhandledMessage);
+ lastUnhandledMessage = null;
+ }
} catch (Exception exception) {
LOG.error("Caught exception..: ",
exception);
@@ -255,9 +259,7 @@ public class AtlasTagSource extends AbstractTagSource {
LOG.debug("==> buildAndUploadServiceTags()");
}
- if (CollectionUtils.isNotEmpty(atlasEntitiesWithTags)) {
-
- commitToKafka();
+ if (CollectionUtils.isNotEmpty(atlasEntitiesWithTags)
&& CollectionUtils.isNotEmpty(messages)) {
Map<String, ServiceTags> serviceTagsMap =
AtlasNotificationMapper.processAtlasEntities(atlasEntitiesWithTags);
@@ -284,17 +286,16 @@ public class AtlasTagSource extends AbstractTagSource {
}
}
- offsetOfLastMessageDeliveredToRanger =
messages.get(messages.size() - 1).getOffset();
+ AtlasKafkaMessage<EntityNotification>
latestMessageDeliveredToRanger = messages.get(messages.size() - 1);
+ commitToKafka(latestMessageDeliveredToRanger);
+
+ atlasEntitiesWithTags.clear();
+ messages.clear();
if (LOG.isDebugEnabled()) {
LOG.debug("Completed processing batch
of messages of size:[" + messages.size() + "] received from
NotificationConsumer");
}
- commitToKafka();
-
- atlasEntitiesWithTags.clear();
- messages.clear();
-
}
if (LOG.isDebugEnabled()) {
@@ -302,34 +303,29 @@ public class AtlasTagSource extends AbstractTagSource {
}
}
- private void commitToKafka() {
+ private void
commitToKafka(AtlasKafkaMessage<EntityNotification> messageToCommit) {
if (LOG.isDebugEnabled()) {
- LOG.debug("==> commitToKafka()");
+ LOG.debug("==> commitToKafka(" +
messageToCommit + ")");
}
- for (AtlasKafkaMessage<EntityNotification> message :
messages) {
- if (message.getOffset() >
offsetOfLastMessageCommittedToKafka) {
- if (message.getOffset() <=
offsetOfLastMessageDeliveredToRanger) {
- // Already delivered to Ranger
- TopicPartition partition = new
TopicPartition("ATLAS_ENTITIES", message.getPartition());
- try {
- if
(LOG.isDebugEnabled()) {
-
LOG.debug("Committing message with offset:[" + message.getOffset() + "] to
Kafka");
- }
-
consumer.commit(partition, message.getOffset());
-
offsetOfLastMessageCommittedToKafka = message.getOffset();
- } catch (Exception
commitException) {
- LOG.warn("Ranger
tagsync already processed message at offset " + message.getOffset() + ".
Ignoring failure in committing this message and continuing to process next
message", commitException);
- LOG.warn("This will
cause Kafka to deliver this message:[" + message.getOffset() + "] repeatedly!!
This may be unrecoverable error!!");
- }
- } else {
- break;
+ long messageOffset = messageToCommit.getOffset();
+ int partitionId = messageToCommit.getPartition();
+
+ if (offsetOfLastMessageCommittedToKafka <
messageOffset) {
+ TopicPartition partition = new
TopicPartition("ATLAS_ENTITIES", partitionId);
+ try {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Committing message
with offset:[" + messageOffset + "] to Kafka");
}
+ consumer.commit(partition,
messageOffset);
+ offsetOfLastMessageCommittedToKafka =
messageOffset;
+ } catch (Exception commitException) {
+ LOG.warn("Ranger tagsync already
processed message at offset " + messageOffset + ". Ignoring failure in
committing message:[" + messageToCommit + "]", commitException);
}
}
if (LOG.isDebugEnabled()) {
- LOG.debug("<== commitToKafka()");
+ LOG.debug("<== commitToKafka(" +
messageToCommit + ")");
}
}
}