This is an automated email from the ASF dual-hosted git repository.
madhan pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new f724df8 ATLAS-3133: enhanced Atlas server to process notifications
from multiple Kafka topics
f724df8 is described below
commit f724df8201328b3b893c2eb8975bca57ecfce3dc
Author: Saqeeb Shaikh <[email protected]>
AuthorDate: Thu Jul 11 12:28:30 2019 +0530
ATLAS-3133: enhanced Atlas server to process notifications from multiple
Kafka topics
Signed-off-by: Madhan Neethiraj <[email protected]>
(cherry picked from commit 47d4d588f2ed70396bb64b80c6362d8115350339)
---
.../java/org/apache/atlas/AtlasConfiguration.java | 26 +++
.../apache/atlas/model/metrics/AtlasMetrics.java | 3 +-
.../org/apache/atlas/kafka/AtlasKafkaConsumer.java | 2 +-
.../org/apache/atlas/kafka/AtlasKafkaMessage.java | 28 ++--
.../org/apache/atlas/kafka/KafkaNotification.java | 176 +++++++++++++++------
.../org/apache/atlas/kafka/KafkaConsumerTest.java | 32 ++--
.../atlas/kafka/KafkaNotificationMockTest.java | 6 +-
.../AbstractNotificationConsumerTest.java | 4 +-
.../org/apache/atlas/util/AtlasMetricsCounter.java | 111 +++++++------
.../org/apache/atlas/util/AtlasMetricsUtil.java | 121 +++++++++++---
.../apache/atlas/services/MetricsServiceTest.java | 4 +-
.../notification/NotificationHookConsumer.java | 7 +-
.../NotificationHookConsumerKafkaTest.java | 5 +-
.../notification/NotificationHookConsumerTest.java | 5 +-
14 files changed, 372 insertions(+), 158 deletions(-)
diff --git a/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java
b/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java
index 3ff1316..9da51f5 100644
--- a/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java
+++ b/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java
@@ -19,6 +19,7 @@
package org.apache.atlas;
import org.apache.commons.configuration.Configuration;
+import org.apache.commons.lang.StringUtils;
/**
* Enum that encapsulated each property name and its default value.
@@ -39,6 +40,9 @@ public enum AtlasConfiguration {
NOTIFICATION_HOOK_TOPIC_NAME("atlas.notification.hook.topic.name",
"ATLAS_HOOK"),
NOTIFICATION_ENTITIES_TOPIC_NAME("atlas.notification.entities.topic.name",
"ATLAS_ENTITIES"),
+
NOTIFICATION_HOOK_CONSUMER_TOPIC_NAMES("atlas.notification.hook.consumer.topic.names",
"ATLAS_HOOK"), // a comma separated list of topic names
+
NOTIFICATION_ENTITIES_CONSUMER_TOPIC_NAMES("atlas.notification.entities.consumer.topic.names",
"ATLAS_ENTITIES"), // a comma separated list of topic names
+
NOTIFICATION_MESSAGE_MAX_LENGTH_BYTES("atlas.notification.message.max.length.bytes",
(1000 * 1000)),
NOTIFICATION_MESSAGE_COMPRESSION_ENABLED("atlas.notification.message.compression.enabled",
true),
NOTIFICATION_SPLIT_MESSAGE_SEGMENTS_WAIT_TIME_SECONDS("atlas.notification.split.message.segments.wait.time.seconds",
15 * 60),
@@ -84,6 +88,28 @@ public enum AtlasConfiguration {
return APPLICATION_PROPERTIES.getString(propertyName,
defaultValue.toString());
}
+ public String[] getStringArray() {
+ String[] ret = APPLICATION_PROPERTIES.getStringArray(propertyName);
+
+ if (ret == null || ret.length == 0 || (ret.length == 1 &&
StringUtils.isEmpty(ret[0]))) {
+ if (defaultValue != null) {
+ ret = StringUtils.split(defaultValue.toString(), ',');
+ }
+ }
+
+ return ret;
+ }
+
+ public String[] getStringArray(String... defaultValue) {
+ String[] ret = APPLICATION_PROPERTIES.getStringArray(propertyName);
+
+ if (ret == null || ret.length == 0 || (ret.length == 1 &&
StringUtils.isEmpty(ret[0]))) {
+ ret = defaultValue;
+ }
+
+ return ret;
+ }
+
public Object get() {
Object value = APPLICATION_PROPERTIES.getProperty(propertyName);
return value == null ? defaultValue : value;
diff --git
a/intg/src/main/java/org/apache/atlas/model/metrics/AtlasMetrics.java
b/intg/src/main/java/org/apache/atlas/model/metrics/AtlasMetrics.java
index c011ad9..a48d93b 100644
--- a/intg/src/main/java/org/apache/atlas/model/metrics/AtlasMetrics.java
+++ b/intg/src/main/java/org/apache/atlas/model/metrics/AtlasMetrics.java
@@ -55,8 +55,7 @@ public class AtlasMetrics {
public static final String STAT_NOTIFY_FAILED_COUNT_CURR_HOUR =
PREFIX_NOTIFICATION + "currentHourFailed";
public static final String STAT_NOTIFY_START_TIME_CURR_HOUR =
PREFIX_NOTIFICATION + "currentHourStartTime";
public static final String STAT_NOTIFY_LAST_MESSAGE_PROCESSED_TIME =
PREFIX_NOTIFICATION + "lastMessageProcessedTime";
- public static final String STAT_NOTIFY_START_OFFSET =
PREFIX_NOTIFICATION + "offsetStart";
- public static final String STAT_NOTIFY_CURRENT_OFFSET =
PREFIX_NOTIFICATION + "offsetCurrent";
+ public static final String STAT_NOTIFY_TOPIC_OFFSETS =
PREFIX_NOTIFICATION + "topicOffsets";
public static final String STAT_NOTIFY_COUNT_PREV_DAY =
PREFIX_NOTIFICATION + "previousDay";
public static final String STAT_NOTIFY_AVG_TIME_PREV_DAY =
PREFIX_NOTIFICATION + "previousDayAvgTime";
public static final String STAT_NOTIFY_CREATES_COUNT_PREV_DAY =
PREFIX_NOTIFICATION + "previousDayEntityCreates";
diff --git
a/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaConsumer.java
b/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaConsumer.java
index 5c840c3..49f9ba3 100644
--- a/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaConsumer.java
+++ b/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaConsumer.java
@@ -88,7 +88,7 @@ public class AtlasKafkaConsumer<T> extends
AbstractNotificationConsumer<T> {
continue;
}
- messages.add(new AtlasKafkaMessage(message, record.offset(),
record.partition()));
+ messages.add(new AtlasKafkaMessage(message, record.offset(),
record.topic(), record.partition()));
}
}
diff --git
a/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaMessage.java
b/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaMessage.java
index b04aba9..22bd79f 100644
--- a/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaMessage.java
+++ b/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaMessage.java
@@ -18,15 +18,17 @@
package org.apache.atlas.kafka;
+import org.apache.kafka.common.TopicPartition;
+
public class AtlasKafkaMessage<T> {
- private final T message;
- private final long offset;
- private final int partition;
-
- public AtlasKafkaMessage(T message, long offset, int partition) {
- this.message = message;
- this.offset = offset;
- this.partition = partition;
+ private final T message;
+ private final long offset;
+ private final TopicPartition topicPartition;
+
+ public AtlasKafkaMessage(T message, long offset, String topic, int
partition) {
+ this.message = message;
+ this.offset = offset;
+ this.topicPartition = new TopicPartition(topic, partition);
}
public T getMessage() {
@@ -37,8 +39,16 @@ public class AtlasKafkaMessage<T> {
return offset;
}
+ public TopicPartition getTopicPartition() {
+ return topicPartition;
+ }
+
+ public String getTopic() {
+ return topicPartition.topic();
+ }
+
public int getPartition() {
- return partition;
+ return topicPartition.partition();
}
}
diff --git
a/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
b/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
index 449eb6f..46c68be 100644
--- a/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
+++ b/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
@@ -57,20 +57,30 @@ public class KafkaNotification extends AbstractNotification
implements Service {
public static final String ATLAS_ENTITIES_TOPIC =
AtlasConfiguration.NOTIFICATION_ENTITIES_TOPIC_NAME.getString();
protected static final String CONSUMER_GROUP_ID_PROPERTY = "group.id";
+ private static final String[] ATLAS_HOOK_CONSUMER_TOPICS =
AtlasConfiguration.NOTIFICATION_HOOK_CONSUMER_TOPIC_NAMES.getStringArray(ATLAS_HOOK_TOPIC);
+ private static final String[] ATLAS_ENTITIES_CONSUMER_TOPICS =
AtlasConfiguration.NOTIFICATION_ENTITIES_CONSUMER_TOPIC_NAMES.getStringArray(ATLAS_ENTITIES_TOPIC);
+
private static final String DEFAULT_CONSUMER_CLOSED_ERROR_MESSAGE = "This
consumer has already been closed.";
- private static final Map<NotificationType, String> TOPIC_MAP = new
HashMap<NotificationType, String>() {
+ private static final Map<NotificationType, String> PRODUCER_TOPIC_MAP =
new HashMap<NotificationType, String>() {
{
put(NotificationType.HOOK, ATLAS_HOOK_TOPIC);
put(NotificationType.ENTITIES, ATLAS_ENTITIES_TOPIC);
}
};
- private final Properties properties;
- private final Long pollTimeOutMs;
- private KafkaConsumer consumer;
- private KafkaProducer producer;
- private String consumerClosedErrorMsg;
+ private static final Map<NotificationType, String[]> CONSUMER_TOPICS_MAP =
new HashMap<NotificationType, String[]>() {
+ {
+ put(NotificationType.HOOK,
trimAndPurge(ATLAS_HOOK_CONSUMER_TOPICS));
+ put(NotificationType.ENTITIES,
trimAndPurge(ATLAS_ENTITIES_CONSUMER_TOPICS));
+ }
+ };
+
+ private final Properties properties;
+ private final Long pollTimeOutMs;
+ private final Map<NotificationType, List<KafkaConsumer>> consumers = new
HashMap<>();
+ private final Map<NotificationType, KafkaProducer> producers = new
HashMap<>();
+ private String
consumerClosedErrorMsg;
// ----- Constructors ----------------------------------------------------
@@ -125,8 +135,8 @@ public class KafkaNotification extends AbstractNotification
implements Service {
}
@VisibleForTesting
- String getTopicName(NotificationType notificationType) {
- return TOPIC_MAP.get(notificationType);
+ String getProducerTopicName(NotificationType notificationType) {
+ return PRODUCER_TOPIC_MAP.get(notificationType);
}
// ----- Service ---------------------------------------------------------
@@ -156,10 +166,43 @@ public class KafkaNotification extends
AbstractNotification implements Service {
public <T> List<NotificationConsumer<T>> createConsumers(NotificationType
notificationType, int numConsumers, boolean autoCommitEnabled) {
LOG.info("==> KafkaNotification.createConsumers(notificationType={},
numConsumers={}, autoCommitEnabled={})", notificationType, numConsumers,
autoCommitEnabled);
- Properties consumerProperties =
getConsumerProperties(notificationType);
- AtlasKafkaConsumer kafkaConsumer = new
AtlasKafkaConsumer(notificationType, getKafkaConsumer(consumerProperties,
notificationType, autoCommitEnabled), autoCommitEnabled, pollTimeOutMs);
+ String[] topics = CONSUMER_TOPICS_MAP.get(notificationType);
+
+ if (numConsumers < topics.length) {
+ LOG.warn("consumers count {} is fewer than number of topics {}.
Creating {} consumers, so that consumer count is equal to number of topics.",
numConsumers, topics.length, topics.length);
+
+ numConsumers = topics.length;
+ } else if (numConsumers > topics.length) {
+ LOG.warn("consumers count {} is higher than number of topics {}.
Creating {} consumers, so that consumer count is equal to number of topics",
numConsumers, topics.length, topics.length);
+
+ numConsumers = topics.length;
+ }
+
+ List<KafkaConsumer> notificationConsumers =
this.consumers.get(notificationType);
+
+ if (notificationConsumers == null) {
+ notificationConsumers = new ArrayList<>(numConsumers);
+
+ this.consumers.put(notificationType, notificationConsumers);
+ }
+
+ List<NotificationConsumer<T>> consumers = new ArrayList<>();
+ Properties consumerProperties =
getConsumerProperties(notificationType);
+
+ consumerProperties.put("enable.auto.commit", autoCommitEnabled);
- List<NotificationConsumer<T>> consumers =
Collections.singletonList(kafkaConsumer);
+ for (int i = 0; i < numConsumers; i++) {
+ KafkaConsumer existingConsumer = notificationConsumers.size() > i
? notificationConsumers.get(i) : null;
+ KafkaConsumer kafkaConsumer =
getOrCreateKafkaConsumer(existingConsumer, consumerProperties,
notificationType, i);
+
+ if (notificationConsumers.size() > i) {
+ notificationConsumers.set(i, kafkaConsumer);
+ } else {
+ notificationConsumers.add(kafkaConsumer);
+ }
+
+ consumers.add(new AtlasKafkaConsumer(notificationType,
kafkaConsumer, autoCommitEnabled, pollTimeOutMs));
+ }
LOG.info("<== KafkaNotification.createConsumers(notificationType={},
numConsumers={}, autoCommitEnabled={})", notificationType, numConsumers,
autoCommitEnabled);
@@ -170,29 +213,33 @@ public class KafkaNotification extends
AbstractNotification implements Service {
public void close() {
LOG.info("==> KafkaNotification.close()");
- if (producer != null) {
- producer.close();
-
- producer = null;
+ for (KafkaProducer producer : producers.values()) {
+ if (producer != null) {
+ try {
+ producer.close();
+ } catch (Throwable t) {
+ LOG.error("failed to close Kafka producer. Ignoring", t);
+ }
+ }
}
+ producers.clear();
+
LOG.info("<== KafkaNotification.close()");
}
// ----- AbstractNotification --------------------------------------------
@Override
- public void sendInternal(NotificationType type, List<String> messages)
throws NotificationException {
- if (producer == null) {
- createProducer();
- }
+ public void sendInternal(NotificationType notificationType, List<String>
messages) throws NotificationException {
+ KafkaProducer producer = getOrCreateProducer(notificationType);
- sendInternalToProducer(producer, type, messages);
+ sendInternalToProducer(producer, notificationType, messages);
}
@VisibleForTesting
- void sendInternalToProducer(Producer p, NotificationType type,
List<String> messages) throws NotificationException {
- String topic = TOPIC_MAP.get(type);
+ void sendInternalToProducer(Producer p, NotificationType notificationType,
List<String> messages) throws NotificationException {
+ String topic =
PRODUCER_TOPIC_MAP.get(notificationType);
List<MessageContext> messageContexts = new ArrayList<>();
for (String message : messages) {
@@ -229,53 +276,82 @@ public class KafkaNotification extends
AbstractNotification implements Service {
}
}
+ // Get properties for consumer request
+ @VisibleForTesting
+ public Properties getConsumerProperties(NotificationType notificationType)
{
+ // find the configured group id for the given notification type
+ String groupId =
properties.getProperty(notificationType.toString().toLowerCase() + "." +
CONSUMER_GROUP_ID_PROPERTY);
- public KafkaConsumer getKafkaConsumer(Properties consumerProperties,
NotificationType type, boolean autoCommitEnabled) {
- if (consumer == null || !isKafkaConsumerOpen(consumer)) {
- try {
- String topic = TOPIC_MAP.get(type);
+ if (StringUtils.isEmpty(groupId)) {
+ throw new IllegalStateException("No configuration group id set for
the notification type " + notificationType);
+ }
- consumerProperties.put("enable.auto.commit",
autoCommitEnabled);
+ Properties consumerProperties = new Properties();
+
+ consumerProperties.putAll(properties);
+ consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
- this.consumer = new KafkaConsumer(consumerProperties);
+ return consumerProperties;
+ }
+
+ @VisibleForTesting
+ public KafkaConsumer getOrCreateKafkaConsumer(KafkaConsumer
existingConsumer, Properties consumerProperties, NotificationType
notificationType, int idxConsumer) {
+ KafkaConsumer ret = existingConsumer;
- this.consumer.subscribe(Arrays.asList(topic));
- } catch (Exception ee) {
- LOG.error("Exception in getKafkaConsumer ", ee);
+ try {
+ if (ret == null || !isKafkaConsumerOpen(ret)) {
+ String[] topics = CONSUMER_TOPICS_MAP.get(notificationType);
+ String topic = topics[idxConsumer % topics.length];
+
+ LOG.debug("Creating new KafkaConsumer for topic : {}, index :
{}", topic, idxConsumer);
+
+ ret = new KafkaConsumer(consumerProperties);
+
+ ret.subscribe(Arrays.asList(topic));
}
+ } catch (Exception ee) {
+ LOG.error("Exception in getKafkaConsumer ", ee);
}
- return this.consumer;
+ return ret;
}
+ private KafkaProducer getOrCreateProducer(NotificationType
notificationType) {
+ LOG.debug("==> KafkaNotification.getOrCreateProducer()");
- @VisibleForTesting
- public
- // Get properties for consumer request
- Properties getConsumerProperties(NotificationType type) {
- // find the configured group id for the given notification type
- String groupId = properties.getProperty(type.toString().toLowerCase()
+ "." + CONSUMER_GROUP_ID_PROPERTY);
+ KafkaProducer ret = producers.get(notificationType);
- if (StringUtils.isEmpty(groupId)) {
- throw new IllegalStateException("No configuration group id set for
the notification type " + type);
- }
+ if (ret == null) {
+ synchronized (this) {
+ ret = producers.get(notificationType);
- Properties consumerProperties = new Properties();
+ if (ret == null) {
+ ret = new KafkaProducer(properties);
- consumerProperties.putAll(properties);
- consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
+ producers.put(notificationType, ret);
+ }
+ }
+ }
- return consumerProperties;
+ LOG.debug("<== KafkaNotification.getOrCreateProducer()");
+
+ return ret;
}
- private synchronized void createProducer() {
- LOG.info("==> KafkaNotification.createProducer()");
+ public static String[] trimAndPurge(String[] strings) {
+ List<String> ret = new ArrayList<>();
- if (producer == null) {
- producer = new KafkaProducer(properties);
+ if (strings != null) {
+ for (int i = 0; i < strings.length; i++) {
+ String str = StringUtils.trim(strings[i]);
+
+ if (StringUtils.isNotEmpty(str)) {
+ ret.add(str);
+ }
+ }
}
- LOG.info("<== KafkaNotification.createProducer()");
+ return ret.toArray(new String[ret.size()]);
}
private class MessageContext {
diff --git
a/notification/src/test/java/org/apache/atlas/kafka/KafkaConsumerTest.java
b/notification/src/test/java/org/apache/atlas/kafka/KafkaConsumerTest.java
index 847caa3..1af1f3e 100644
--- a/notification/src/test/java/org/apache/atlas/kafka/KafkaConsumerTest.java
+++ b/notification/src/test/java/org/apache/atlas/kafka/KafkaConsumerTest.java
@@ -54,7 +54,8 @@ import static org.testng.Assert.*;
public class KafkaConsumerTest {
private static final String TRAIT_NAME = "MyTrait";
- private final String ATLAS_HOOK_TOPIC =
AtlasConfiguration.NOTIFICATION_HOOK_TOPIC_NAME.getString();
+ private static final String ATLAS_HOOK_TOPIC =
AtlasConfiguration.NOTIFICATION_HOOK_TOPIC_NAME.getString();
+ private static final String[] ATLAS_HOOK_CONSUMER_TOPICS =
KafkaNotification.trimAndPurge(AtlasConfiguration.NOTIFICATION_HOOK_CONSUMER_TOPIC_NAMES.getStringArray(ATLAS_HOOK_TOPIC));
@Mock
@@ -67,11 +68,25 @@ public class KafkaConsumerTest {
@Test
public void testReceive() throws Exception {
- Referenceable entity = getEntity(TRAIT_NAME);
- EntityUpdateRequest message = new
EntityUpdateRequest("user1", entity);
+ for (String topic : ATLAS_HOOK_CONSUMER_TOPICS) {
+ String traitName = TRAIT_NAME + "_" + topic;
+ Referenceable entity = getEntity(traitName);
+ EntityUpdateRequest message = new EntityUpdateRequest("user1",
entity);
+ List<AtlasKafkaMessage<HookNotification>> messageList =
testReceiveHelper(message, topic);
+ assertTrue(messageList.size() > 0);
+
+ HookNotification consumedMessage = messageList.get(0).getMessage();
+
+ assertMessagesEqual(message, consumedMessage, entity);
+ }
+ }
+
+
+ private List<AtlasKafkaMessage<HookNotification>>
testReceiveHelper(EntityUpdateRequest message, String topic) throws Exception {
+
String json = AtlasType.toV1Json(new
AtlasNotificationMessage<>(new MessageVersion("1.0.0"), message));
- TopicPartition tp = new
TopicPartition(ATLAS_HOOK_TOPIC, 0);
- List<ConsumerRecord<String, String>> klist =
Collections.singletonList(new ConsumerRecord<>(ATLAS_HOOK_TOPIC, 0, 0L,
"mykey", json));
+ TopicPartition tp = new
TopicPartition(topic, 0);
+ List<ConsumerRecord<String, String>> klist =
Collections.singletonList(new ConsumerRecord<>(topic, 0, 0L, "mykey", json));
Map mp =
Collections.singletonMap(tp, klist);
ConsumerRecords records = new ConsumerRecords(mp);
@@ -81,12 +96,7 @@ public class KafkaConsumerTest {
AtlasKafkaConsumer consumer = new
AtlasKafkaConsumer(NotificationType.HOOK, kafkaConsumer, false, 100L);
List<AtlasKafkaMessage<HookNotification>> messageList =
consumer.receive();
-
- assertTrue(messageList.size() > 0);
-
- HookNotification consumedMessage = messageList.get(0).getMessage();
-
- assertMessagesEqual(message, consumedMessage, entity);
+ return messageList;
}
@Test
diff --git
a/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationMockTest.java
b/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationMockTest.java
index 263903b..9b5891f 100644
---
a/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationMockTest.java
+++
b/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationMockTest.java
@@ -74,7 +74,7 @@ public class KafkaNotificationMockTest {
KafkaNotification kafkaNotification = new
KafkaNotification(configProperties);
Producer producer = mock(Producer.class);
- String topicName =
kafkaNotification.getTopicName(NotificationInterface.NotificationType.HOOK);
+ String topicName =
kafkaNotification.getProducerTopicName(NotificationInterface.NotificationType.HOOK);
String message = "This is a test message";
Future returnValue = mock(Future.class);
TopicPartition topicPartition = new TopicPartition(topicName, 0);
@@ -96,7 +96,7 @@ public class KafkaNotificationMockTest {
KafkaNotification kafkaNotification = new
KafkaNotification(configProperties);
Producer producer = mock(Producer.class);
- String topicName =
kafkaNotification.getTopicName(NotificationInterface.NotificationType.HOOK);
+ String topicName =
kafkaNotification.getProducerTopicName(NotificationInterface.NotificationType.HOOK);
String message = "This is a test message";
Future returnValue = mock(Future.class);
when(returnValue.get()).thenThrow(new RuntimeException("Simulating
exception"));
@@ -121,7 +121,7 @@ public class KafkaNotificationMockTest {
KafkaNotification kafkaNotification = new
KafkaNotification(configProperties);
Producer producer = mock(Producer.class);
- String topicName =
kafkaNotification.getTopicName(NotificationInterface.NotificationType.HOOK);
+ String topicName =
kafkaNotification.getProducerTopicName(NotificationInterface.NotificationType.HOOK);
String message1 = "This is a test message1";
String message2 = "This is a test message2";
Future returnValue1 = mock(Future.class);
diff --git
a/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java
b/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java
index c963830..05d0d81 100644
---
a/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java
+++
b/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java
@@ -183,6 +183,8 @@ public class AbstractNotificationConsumerTest {
}
private static class TestNotificationConsumer extends
AbstractNotificationConsumer<TestMessage> {
+ private static final String TEST_TOPIC_NAME = "TEST_TOPIC";
+
private final List<TestMessage> messageList;
private int index = 0;
@@ -217,7 +219,7 @@ public class AbstractNotificationConsumerTest {
public List<AtlasKafkaMessage<TestMessage>> receive(long
timeoutMilliSeconds) {
List<AtlasKafkaMessage<TestMessage>> tempMessageList = new
ArrayList();
for(Object json : messageList) {
- tempMessageList.add(new
AtlasKafkaMessage(deserializer.deserialize((String) json), -1, -1));
+ tempMessageList.add(new
AtlasKafkaMessage(deserializer.deserialize((String) json), -1, TEST_TOPIC_NAME,
-1));
}
return tempMessageList;
}
diff --git
a/repository/src/main/java/org/apache/atlas/util/AtlasMetricsCounter.java
b/repository/src/main/java/org/apache/atlas/util/AtlasMetricsCounter.java
index d5a4412..10319d0 100644
--- a/repository/src/main/java/org/apache/atlas/util/AtlasMetricsCounter.java
+++ b/repository/src/main/java/org/apache/atlas/util/AtlasMetricsCounter.java
@@ -24,6 +24,7 @@ import java.time.Instant;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.ZoneOffset;
+import java.util.concurrent.atomic.AtomicLong;
import static org.apache.atlas.util.AtlasMetricsCounter.Period.*;
@@ -87,10 +88,10 @@ public class AtlasMetricsCounter {
}
}
- public Stats report() {
+ public StatsReport report() {
updateForTime(clock.instant());
- return new Stats(stats, dayStartTime.toEpochMilli(),
hourStartTime.toEpochMilli());
+ return new StatsReport(stats, dayStartTime.toEpochMilli(),
hourStartTime.toEpochMilli());
}
// visible only for testing
@@ -179,16 +180,15 @@ public class AtlasMetricsCounter {
return LocalDateTime.of(time.toLocalDate().plusDays(1),
LocalTime.MIN).toInstant(ZoneOffset.UTC);
}
-
public static class Stats {
private static final int NUM_PERIOD = Period.values().length;
- private final long dayStartTimeMs;
- private final long hourStartTimeMs;
- private final long[] count = new long[NUM_PERIOD];
- private final long[] measureSum = new long[NUM_PERIOD];
- private final long[] measureMin = new long[NUM_PERIOD];
- private final long[] measureMax = new long[NUM_PERIOD];
+ private final long dayStartTimeMs;
+ private final long hourStartTimeMs;
+ private final AtomicLong[] count = new
AtomicLong[NUM_PERIOD];
+ private final AtomicLong[] measureSum = new
AtomicLong[NUM_PERIOD];
+ private final AtomicLong[] measureMin = new
AtomicLong[NUM_PERIOD];
+ private final AtomicLong[] measureMax = new
AtomicLong[NUM_PERIOD];
public Stats() {
@@ -200,7 +200,57 @@ public class AtlasMetricsCounter {
}
}
- public Stats(Stats other, long dayStartTimeMs, long hourStartTimeMs) {
+ public void addCount(Period period, long num) {
+ count[period.ordinal()].addAndGet(num);
+ }
+
+ public void addMeasure(Period period, long measure) {
+ int idx = period.ordinal();
+
+ measureSum[idx].addAndGet(measure);
+
+ if (measureMin[idx].get() > measure) {
+ measureMin[idx].set(measure);
+ }
+
+ if (measureMax[idx].get() < measure) {
+ measureMax[idx].set(measure);
+ }
+ }
+
+ private void copy(Period src, Period dest) {
+ int srcIdx = src.ordinal();
+ int destIdx = dest.ordinal();
+
+ count[destIdx].set(count[srcIdx].get());
+ measureSum[destIdx].set(measureSum[srcIdx].get());
+ measureMin[destIdx].set(measureMin[srcIdx].get());
+ measureMax[destIdx].set( measureMax[srcIdx].get());
+ }
+
+ private void reset(Period period) {
+ int idx = period.ordinal();
+
+ count[idx] = new AtomicLong(0);
+ measureSum[idx] = new AtomicLong(0);
+ measureMin[idx] = new AtomicLong(Long.MAX_VALUE);
+ measureMax[idx] = new AtomicLong(Long.MIN_VALUE);
+ }
+
+ }
+
+ public static class StatsReport {
+ private static final int NUM_PERIOD = Period.values().length;
+
+ private final long dayStartTimeMs;
+ private final long hourStartTimeMs;
+ private final long[] count = new long[NUM_PERIOD];
+ private final long[] measureSum = new long[NUM_PERIOD];
+ private final long[] measureMin = new long[NUM_PERIOD];
+ private final long[] measureMax = new long[NUM_PERIOD];
+
+
+ public StatsReport(Stats other, long dayStartTimeMs, long
hourStartTimeMs) {
this.dayStartTimeMs = dayStartTimeMs;
this.hourStartTimeMs = hourStartTimeMs;
@@ -229,46 +279,9 @@ public class AtlasMetricsCounter {
return c != 0 ? (measureSum[idx] / c) : 0;
}
- public void addCount(Period period, long num) {
- count[period.ordinal()] += num;
- }
-
- public void addMeasure(Period period, long measure) {
- int idx = period.ordinal();
-
- measureSum[idx] += measure;
-
- if (measureMin[idx] > measure) {
- measureMin[idx] = measure;
- }
-
- if (measureMax[idx] < measure) {
- measureMax[idx] = measure;
- }
- }
-
- private void copy(Period src, Period dest) {
- int srcIdx = src.ordinal();
- int destIdx = dest.ordinal();
-
- count[destIdx] = count[srcIdx];
- measureSum[destIdx] = measureSum[srcIdx];
- measureMin[destIdx] = measureMin[srcIdx];
- measureMax[destIdx] = measureMax[srcIdx];
- }
-
- private void reset(Period period) {
- int idx = period.ordinal();
-
- count[idx] = 0;
- measureSum[idx] = 0;
- measureMin[idx] = Long.MAX_VALUE;
- measureMax[idx] = Long.MIN_VALUE;
- }
-
- private void copy(long[] src, long[] dest) {
+ private void copy(AtomicLong[] src, long[] dest) {
for (int i = 0; i < dest.length; i++) {
- dest[i] = src[i];
+ dest[i] = src[i].get();
}
}
}
diff --git
a/repository/src/main/java/org/apache/atlas/util/AtlasMetricsUtil.java
b/repository/src/main/java/org/apache/atlas/util/AtlasMetricsUtil.java
index f658caa..2c78cbc 100644
--- a/repository/src/main/java/org/apache/atlas/util/AtlasMetricsUtil.java
+++ b/repository/src/main/java/org/apache/atlas/util/AtlasMetricsUtil.java
@@ -21,7 +21,7 @@ import org.apache.atlas.model.instance.EntityMutationResponse;
import org.apache.atlas.repository.Constants;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2;
-import org.apache.atlas.util.AtlasMetricsCounter.Stats;
+import org.apache.atlas.util.AtlasMetricsCounter.StatsReport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -50,16 +50,15 @@ public class AtlasMetricsUtil {
private static final String STATUS_CONNECTED = "connected";
private static final String STATUS_NOT_CONNECTED = "not-connected";
- private final AtlasGraph graph;
- private long serverStartTime = 0;
- private long serverActiveTime = 0;
- private long msgOffsetStart = -1;
- private long msgOffsetCurrent = 0;
- private final AtlasMetricsCounter messagesProcessed = new
AtlasMetricsCounter("messagesProcessed");
- private final AtlasMetricsCounter messagesFailed = new
AtlasMetricsCounter("messagesFailed");
- private final AtlasMetricsCounter entityCreates = new
AtlasMetricsCounter("entityCreates");
- private final AtlasMetricsCounter entityUpdates = new
AtlasMetricsCounter("entityUpdates");
- private final AtlasMetricsCounter entityDeletes = new
AtlasMetricsCounter("entityDeletes");
+ private final AtlasGraph graph;
+ private long serverStartTime = 0;
+ private long serverActiveTime = 0;
+ private final Map<String, TopicStats> topicStats = new HashMap<>();
+ private final AtlasMetricsCounter messagesProcessed = new
AtlasMetricsCounter("messagesProcessed");
+ private final AtlasMetricsCounter messagesFailed = new
AtlasMetricsCounter("messagesFailed");
+ private final AtlasMetricsCounter entityCreates = new
AtlasMetricsCounter("entityCreates");
+ private final AtlasMetricsCounter entityUpdates = new
AtlasMetricsCounter("entityUpdates");
+ private final AtlasMetricsCounter entityDeletes = new
AtlasMetricsCounter("entityDeletes");
@Inject
public AtlasMetricsUtil(AtlasGraph graph) {
@@ -83,7 +82,7 @@ public class AtlasMetricsUtil {
serverActiveTime = System.currentTimeMillis();
}
- public void onNotificationProcessingComplete(long msgOffset,
NotificationStat stats) {
+ public void onNotificationProcessingComplete(String topicName, int
partition, long msgOffset, NotificationStat stats) {
messagesProcessed.incrWithMeasure(stats.timeTakenMs);
entityCreates.incrBy(stats.entityCreates);
entityUpdates.incrBy(stats.entityUpdates);
@@ -93,21 +92,33 @@ public class AtlasMetricsUtil {
messagesFailed.incr();
}
- if (msgOffsetStart == -1) {
- msgOffsetStart = msgOffset;
+ TopicStats topicStat = topicStats.get(topicName);
+
+ if (topicStat == null) {
+ topicStat = new TopicStats(topicName);
+
+ topicStats.put(topicName, topicStat);
}
- msgOffsetCurrent = ++msgOffset;
+ TopicPartitionStat partitionStat = topicStat.get(partition);
+
+ if (partitionStat == null) {
+ partitionStat = new TopicPartitionStat(topicName, partition,
msgOffset, msgOffset);
+
+ topicStat.set(partition, partitionStat);
+ }
+
+ partitionStat.setCurrentOffset(msgOffset + 1);
}
public Map<String, Object> getStats() {
Map<String, Object> ret = new HashMap<>();
- Stats messagesProcessed = this.messagesProcessed.report();
- Stats messagesFailed = this.messagesFailed.report();
- Stats entityCreates = this.entityCreates.report();
- Stats entityUpdates = this.entityUpdates.report();
- Stats entityDeletes = this.entityDeletes.report();
+ StatsReport messagesProcessed = this.messagesProcessed.report();
+ StatsReport messagesFailed = this.messagesFailed.report();
+ StatsReport entityCreates = this.entityCreates.report();
+ StatsReport entityUpdates = this.entityUpdates.report();
+ StatsReport entityDeletes = this.entityDeletes.report();
ret.put(STAT_SERVER_START_TIMESTAMP, serverStartTime);
ret.put(STAT_SERVER_ACTIVE_TIMESTAMP, serverActiveTime);
@@ -115,8 +126,20 @@ public class AtlasMetricsUtil {
ret.put(STAT_SERVER_STATUS_BACKEND_STORE, getBackendStoreStatus() ?
STATUS_CONNECTED : STATUS_NOT_CONNECTED);
ret.put(STAT_SERVER_STATUS_INDEX_STORE, getIndexStoreStatus() ?
STATUS_CONNECTED : STATUS_NOT_CONNECTED);
- ret.put(STAT_NOTIFY_START_OFFSET, msgOffsetStart);
- ret.put(STAT_NOTIFY_CURRENT_OFFSET, msgOffsetCurrent);
+ Map<String, Map<String, Long>> topicOffsets = new HashMap<>();
+
+ for (TopicStats tStat : topicStats.values()) {
+ for (TopicPartitionStat tpStat : tStat.partitionStats.values()) {
+ Map<String, Long> tpOffsets = new HashMap<>();
+
+ tpOffsets.put("offsetStart", tpStat.startOffset);
+ tpOffsets.put("offsetCurrent", tpStat.currentOffset);
+
+ topicOffsets.put(tpStat.topicName + "-" + tpStat.partition,
tpOffsets);
+ }
+ }
+
+ ret.put(STAT_NOTIFY_TOPIC_OFFSETS, topicOffsets);
ret.put(STAT_NOTIFY_LAST_MESSAGE_PROCESSED_TIME,
this.messagesProcessed.getLastIncrTime().toEpochMilli());
ret.put(STAT_NOTIFY_COUNT_TOTAL,
messagesProcessed.getCount(ALL));
@@ -297,4 +320,58 @@ public class AtlasMetricsUtil {
return collection != null ? collection.size() : 0;
}
}
+
+ class TopicStats { // Per-topic holder: maps a partition number to that partition's TopicPartitionStat.
+ private final String topicName;
+ private final Map<Integer, TopicPartitionStat> partitionStats = new
HashMap<>(); // keyed by partition number
+
+ public TopicStats(String topicName) {
+ this.topicName = topicName;
+ }
+
+ public String getTopicName() { return topicName; }
+
+ public Map<Integer, TopicPartitionStat> getPartitionStats() { return
partitionStats; } // returns the internal map itself, not a copy
+
+ public TopicPartitionStat get(Integer partition) { return
partitionStats.get(partition); } // null when nothing has been stored for this partition
+
+ public void set(Integer partition, TopicPartitionStat partitionStat) {
+ partitionStats.put(partition, partitionStat); // replaces any entry already stored for this partition
+ }
+ }
+
+ class TopicPartitionStat { // Offset bookkeeping for one partition of one topic.
+ private final String topicName;
+ private final int partition;
+ private final long startOffset; // fixed at construction
+ private long currentOffset; // the only mutable field; changed via setCurrentOffset()
+
+ public TopicPartitionStat(String topicName, int partition, long
startOffset, long currentOffset) {
+ this.topicName = topicName;
+ this.partition = partition;
+ this.startOffset = startOffset;
+ this.currentOffset = currentOffset;
+ }
+
+ public String getTopicName() {
+ return topicName;
+ }
+
+ public int getPartition() {
+ return partition;
+ }
+
+ public long getStartOffset() {
+ return startOffset;
+ }
+
+ public long getCurrentOffset() {
+ return currentOffset;
+ }
+
+ public void setCurrentOffset(long currentOffset) { // onNotificationProcessingComplete passes (msgOffset + 1) here
+ this.currentOffset = currentOffset;
+ }
+
+ }; // the ';' after the brace is an empty declaration and has no effect
}
diff --git
a/repository/src/test/java/org/apache/atlas/services/MetricsServiceTest.java
b/repository/src/test/java/org/apache/atlas/services/MetricsServiceTest.java
index b56019e..b2f2633 100644
--- a/repository/src/test/java/org/apache/atlas/services/MetricsServiceTest.java
+++ b/repository/src/test/java/org/apache/atlas/services/MetricsServiceTest.java
@@ -227,10 +227,10 @@ public class MetricsServiceTest {
private void processMessage(Instant instant) {
clock.setInstant(instant);
- metricsUtil.onNotificationProcessingComplete(++msgOffset, new
AtlasMetricsUtil.NotificationStat(true, 1));
+ metricsUtil.onNotificationProcessingComplete("ATLAS_HOOK", 0,
++msgOffset, new AtlasMetricsUtil.NotificationStat(true, 1));
for (int i = 0; i < 10; i++) {
- metricsUtil.onNotificationProcessingComplete(msgOffset++, new
AtlasMetricsUtil.NotificationStat(false, 1));
+ metricsUtil.onNotificationProcessingComplete("ATLAS_HOOK", 0,
msgOffset++, new AtlasMetricsUtil.NotificationStat(false, 1));
}
clock.setInstant(null);
diff --git
a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
index 1f8e810..1c8d72b 100644
---
a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
+++
b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
@@ -23,7 +23,6 @@ import kafka.utils.ShutdownableThread;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasClient;
import org.apache.atlas.AtlasClientV2;
-import org.apache.atlas.AtlasConfiguration;
import org.apache.atlas.AtlasException;
import org.apache.atlas.AtlasServiceException;
import org.apache.atlas.RequestContext;
@@ -118,7 +117,6 @@ public class NotificationHookConsumer implements Service,
ActiveStateChangeHandl
private static final String ATTRIBUTE_QUALIFIED_NAME = "qualifiedName";
private static final String THREADNAME_PREFIX =
NotificationHookConsumer.class.getSimpleName();
- private static final String ATLAS_HOOK_TOPIC =
AtlasConfiguration.NOTIFICATION_HOOK_TOPIC_NAME.getString();
public static final String CONSUMER_THREADS_PROPERTY =
"atlas.notification.hook.numthreads";
public static final String CONSUMER_RETRIES_PROPERTY =
"atlas.notification.hook.maxretries";
@@ -701,7 +699,7 @@ public class NotificationHookConsumer implements Service,
ActiveStateChangeHandl
stats.timeTakenMs = System.currentTimeMillis() - startTime;
-
metricsUtil.onNotificationProcessingComplete(kafkaMsg.getOffset(), stats);
+
metricsUtil.onNotificationProcessingComplete(kafkaMsg.getTopic(),
kafkaMsg.getPartition(), kafkaMsg.getOffset(), stats);
if (stats.timeTakenMs > largeMessageProcessingTimeThresholdMs)
{
String strMessage =
AbstractNotification.getMessageJson(message);
@@ -785,9 +783,8 @@ public class NotificationHookConsumer implements Service,
ActiveStateChangeHandl
try {
recordFailedMessages();
- TopicPartition partition = new
TopicPartition(ATLAS_HOOK_TOPIC, kafkaMessage.getPartition());
+ consumer.commit(kafkaMessage.getTopicPartition(),
kafkaMessage.getOffset() + 1);
- consumer.commit(partition, kafkaMessage.getOffset() + 1);
commitSucceessStatus = true;
} finally {
failedCommitOffsetRecorder.recordIfFailed(commitSucceessStatus,
kafkaMessage.getOffset());
diff --git
a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java
b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java
index fb3ff26..b1b0e9f 100644
---
a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java
+++
b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java
@@ -182,7 +182,10 @@ public class NotificationHookConsumerKafkaTest {
ExceptionThrowingCommitConsumer
createNewConsumerThatThrowsExceptionInCommit(KafkaNotification
kafkaNotification, boolean autoCommitEnabled) {
Properties prop =
kafkaNotification.getConsumerProperties(NotificationInterface.NotificationType.HOOK);
- KafkaConsumer consumer = kafkaNotification.getKafkaConsumer(prop,
NotificationInterface.NotificationType.HOOK, true);
+
+ prop.put("enable.auto.commit", autoCommitEnabled); // set the caller's auto-commit choice in the properties before the consumer is obtained
+
+ KafkaConsumer consumer =
kafkaNotification.getOrCreateKafkaConsumer(null, prop,
NotificationInterface.NotificationType.HOOK, 0); // NOTE(review): meaning of the null and 0 arguments is not visible in this hunk; confirm against KafkaNotification
return new
ExceptionThrowingCommitConsumer(NotificationInterface.NotificationType.HOOK,
consumer, autoCommitEnabled, 1000);
}
diff --git
a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java
b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java
index 3e35511..ece46a4 100644
---
a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java
+++
b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java
@@ -22,6 +22,7 @@ import org.apache.atlas.AtlasServiceException;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.ha.HAConfiguration;
import org.apache.atlas.kafka.AtlasKafkaMessage;
+import org.apache.atlas.kafka.KafkaNotification;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
import org.apache.atlas.model.instance.EntityMutationResponse;
import
org.apache.atlas.model.notification.HookNotification.HookNotificationType;
@@ -136,7 +137,7 @@ public class NotificationHookConsumerTest {
when(message.getType()).thenReturn(HookNotificationType.ENTITY_CREATE);
when(message.getEntities()).thenReturn(Arrays.asList(mock));
- hookConsumer.handleMessage(new AtlasKafkaMessage(message, -1, -1));
+ hookConsumer.handleMessage(new AtlasKafkaMessage(message, -1,
KafkaNotification.ATLAS_HOOK_TOPIC, -1));
verify(consumer).commit(any(TopicPartition.class), anyInt());
}
@@ -150,7 +151,7 @@ public class NotificationHookConsumerTest {
when(atlasEntityStore.createOrUpdate(any(EntityStream.class),
anyBoolean())).thenThrow(new RuntimeException("Simulating exception in
processing message"));
- hookConsumer.handleMessage(new AtlasKafkaMessage(message, -1, -1));
+ hookConsumer.handleMessage(new AtlasKafkaMessage(message, -1,
KafkaNotification.ATLAS_HOOK_TOPIC, -1));
verifyZeroInteractions(consumer);
}