This is an automated email from the ASF dual-hosted git repository.
madhan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/master by this push:
new 495110cf5 ATLAS-5154: update notification processing metrics to
include avgProcessingTime per topic-partition (#468)
495110cf5 is described below
commit 495110cf52846c54833bbe8ce322099099cf14e6
Author: Madhan Neethiraj <[email protected]>
AuthorDate: Tue Oct 14 12:02:46 2025 -0700
ATLAS-5154: update notification processing metrics to include
avgProcessingTime per topic-partition (#468)
---
.../org/apache/atlas/util/AtlasMetricsUtil.java | 52 +++++++++++++---------
.../notification/NotificationHookConsumer.java | 6 +--
2 files changed, 35 insertions(+), 23 deletions(-)
diff --git
a/repository/src/main/java/org/apache/atlas/util/AtlasMetricsUtil.java
b/repository/src/main/java/org/apache/atlas/util/AtlasMetricsUtil.java
index f04592451..4172abe9a 100644
--- a/repository/src/main/java/org/apache/atlas/util/AtlasMetricsUtil.java
+++ b/repository/src/main/java/org/apache/atlas/util/AtlasMetricsUtil.java
@@ -39,6 +39,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicLong;
import static
org.apache.atlas.model.metrics.AtlasMetrics.STAT_NOTIFY_AVG_TIME_CURR_DAY;
import static
org.apache.atlas.model.metrics.AtlasMetrics.STAT_NOTIFY_AVG_TIME_CURR_HOUR;
@@ -162,7 +163,7 @@ public class AtlasMetricsUtil {
partitionStat.incrFailedMessageCount();
}
- partitionStat.incrProcessedMessageCount();
+ partitionStat.incrProcessedMessageCount(stats.timeTakenMs);
partitionStat.setLastMessageProcessedTime(messagesProcessed.getLastIncrTime().toEpochMilli());
}
@@ -187,15 +188,18 @@ public class AtlasMetricsUtil {
for (TopicPartitionStat tpStat : tStat.partitionStats.values()) {
Map<String, Long> tpDetails = new HashMap<>();
- tpDetails.put("offsetStart", tpStat.startOffset);
- tpDetails.put("offsetCurrent", tpStat.currentOffset);
- tpDetails.put("failedMessageCount", tpStat.failedMessageCount);
- tpDetails.put("lastMessageProcessedTime",
tpStat.lastMessageProcessedTime);
- tpDetails.put("processedMessageCount",
tpStat.processedMessageCount);
+ tpDetails.put("offsetStart", tpStat.getStartOffset());
+ tpDetails.put("offsetCurrent", tpStat.getCurrentOffset());
+ tpDetails.put("failedMessageCount",
tpStat.getFailedMessageCount());
+ tpDetails.put("lastMessageProcessedTime",
tpStat.getLastMessageProcessedTime());
+ tpDetails.put("processedMessageCount",
tpStat.getProcessedMessageCount());
+ tpDetails.put("avgProcessingTime",
tpStat.getAvgProcessingTime());
- LOG.debug("Setting failedMessageCount : {} and
lastMessageProcessedTime : {} for topic {}-{}", tpStat.failedMessageCount,
tpStat.lastMessageProcessedTime, tpStat.topicName, tpStat.partition);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Setting failedMessageCount : {} and
lastMessageProcessedTime : {} for topic {}-{}", tpStat.getFailedMessageCount(),
tpStat.getLastMessageProcessedTime(), tpStat.getTopicName(),
tpStat.getPartition());
+ }
- topicDetails.put(tpStat.topicName + "-" + tpStat.partition,
tpDetails);
+ topicDetails.put(tpStat.getTopicName() + "-" +
tpStat.getPartition(), tpDetails);
}
}
@@ -418,13 +422,14 @@ public class AtlasMetricsUtil {
}
static class TopicPartitionStat {
- private final String topicName;
- private final int partition;
- private final long startOffset;
- private long currentOffset;
- private long lastMessageProcessedTime;
- private long failedMessageCount;
- private long processedMessageCount;
+ private final String topicName;
+ private final int partition;
+ private final long startOffset;
+ private long currentOffset;
+ private long lastMessageProcessedTime;
+ private final AtomicLong failedMessageCount = new AtomicLong();
+ private final AtomicLong processedMessageCount = new AtomicLong();
+ private final AtomicLong totalProcessingTimeMs = new AtomicLong();
public TopicPartitionStat(String topicName, int partition, long
startOffset, long currentOffset) {
this.topicName = topicName;
@@ -462,19 +467,26 @@ public class AtlasMetricsUtil {
}
public long getFailedMessageCount() {
- return failedMessageCount;
+ return failedMessageCount.get();
}
public void incrFailedMessageCount() {
- this.failedMessageCount++;
+ this.failedMessageCount.incrementAndGet();
}
public long getProcessedMessageCount() {
- return processedMessageCount;
+ return processedMessageCount.get();
}
- public void incrProcessedMessageCount() {
- this.processedMessageCount++;
+ public void incrProcessedMessageCount(long timeTakenMs) {
+ this.processedMessageCount.incrementAndGet();
+ this.totalProcessingTimeMs.addAndGet(timeTakenMs);
+ }
+
+ public long getAvgProcessingTime() {
+ long processedMessageCount = this.processedMessageCount.get();
+
+ return processedMessageCount == 0 ? 0 :
(totalProcessingTimeMs.get() / processedMessageCount);
}
}
}
diff --git
a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
index baac18939..fbde605ae 100644
---
a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
+++
b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
@@ -708,7 +708,7 @@ public class NotificationHookConsumer implements Service,
ActiveStateChangeHandl
}
if (entities.size() - count > 0) {
- LOG.info("preprocess: moved {}
hive_process/hive_column_lineage entities to end of list (listSize={}).
topic-offset={}, partition={}", entities.size() - count, entities.size(),
kafkaMsg.getOffset(), kafkaMsg.getPartition());
+ LOG.info("preprocess: moved {}
hive_process/hive_column_lineage entities to end of list (listSize={}).
topic={}, partition={}, offset={}", entities.size() - count, entities.size(),
kafkaMsg.getTopic(), kafkaMsg.getPartition(), kafkaMsg.getOffset());
}
}
}
@@ -1555,9 +1555,9 @@ public class NotificationHookConsumer implements Service,
ActiveStateChangeHandl
try {
String strMessage =
AbstractNotification.getMessageJson(message);
- LOG.warn("msgProcessingTime={}, msgSize={},
topicOffset={}}", stats.timeTakenMs, strMessage.length(), kafkaMsg.getOffset());
+ LOG.warn("msgProcessingTime={}, msgSize={}, topic={},
partition={}, offset={}}", stats.timeTakenMs, strMessage.length(),
kafkaMsg.getTopic(), kafkaMsg.getPartition(), kafkaMsg.getOffset());
-
LARGE_MESSAGES_LOG.warn("{\"msgProcessingTime\":{},\"msgSize\":{},\"topicOffset\":{},\"data\":{}}",
stats.timeTakenMs, strMessage.length(), kafkaMsg.getOffset(), strMessage);
+
LARGE_MESSAGES_LOG.warn("{\"msgProcessingTime\":{},\"msgSize\":{},\"topic\":{},\"partition\":{},\"topicOffset\":{},\"data\":{}}",
stats.timeTakenMs, strMessage.length(), kafkaMsg.getTopic(),
kafkaMsg.getPartition(), kafkaMsg.getOffset(), strMessage);
} catch (Throwable t) {
LOG.warn("error while recording large message:
msgProcessingTime={}, type={}, topic={}, partition={}, offset={}",
stats.timeTakenMs, message.getType(), kafkaMsg.getTopic(),
kafkaMsg.getPartition(), kafkaMsg.getOffset(), t);
}