This is an automated email from the ASF dual-hosted git repository.

madhan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/atlas.git


The following commit(s) were added to refs/heads/master by this push:
     new 495110cf5 ATLAS-5154: update notification processing metrics to include avgProcessingTime per topic-partition (#468)
495110cf5 is described below

commit 495110cf52846c54833bbe8ce322099099cf14e6
Author: Madhan Neethiraj <[email protected]>
AuthorDate: Tue Oct 14 12:02:46 2025 -0700

    ATLAS-5154: update notification processing metrics to include avgProcessingTime per topic-partition (#468)
---
 .../org/apache/atlas/util/AtlasMetricsUtil.java    | 52 +++++++++++++---------
 .../notification/NotificationHookConsumer.java     |  6 +--
 2 files changed, 35 insertions(+), 23 deletions(-)

diff --git a/repository/src/main/java/org/apache/atlas/util/AtlasMetricsUtil.java b/repository/src/main/java/org/apache/atlas/util/AtlasMetricsUtil.java
index f04592451..4172abe9a 100644
--- a/repository/src/main/java/org/apache/atlas/util/AtlasMetricsUtil.java
+++ b/repository/src/main/java/org/apache/atlas/util/AtlasMetricsUtil.java
@@ -39,6 +39,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicLong;
 
 import static 
org.apache.atlas.model.metrics.AtlasMetrics.STAT_NOTIFY_AVG_TIME_CURR_DAY;
 import static 
org.apache.atlas.model.metrics.AtlasMetrics.STAT_NOTIFY_AVG_TIME_CURR_HOUR;
@@ -162,7 +163,7 @@ public class AtlasMetricsUtil {
             partitionStat.incrFailedMessageCount();
         }
 
-        partitionStat.incrProcessedMessageCount();
+        partitionStat.incrProcessedMessageCount(stats.timeTakenMs);
         
partitionStat.setLastMessageProcessedTime(messagesProcessed.getLastIncrTime().toEpochMilli());
     }
 
@@ -187,15 +188,18 @@ public class AtlasMetricsUtil {
             for (TopicPartitionStat tpStat : tStat.partitionStats.values()) {
                 Map<String, Long> tpDetails = new HashMap<>();
 
-                tpDetails.put("offsetStart", tpStat.startOffset);
-                tpDetails.put("offsetCurrent", tpStat.currentOffset);
-                tpDetails.put("failedMessageCount", tpStat.failedMessageCount);
-                tpDetails.put("lastMessageProcessedTime", 
tpStat.lastMessageProcessedTime);
-                tpDetails.put("processedMessageCount", 
tpStat.processedMessageCount);
+                tpDetails.put("offsetStart", tpStat.getStartOffset());
+                tpDetails.put("offsetCurrent", tpStat.getCurrentOffset());
+                tpDetails.put("failedMessageCount", 
tpStat.getFailedMessageCount());
+                tpDetails.put("lastMessageProcessedTime", 
tpStat.getLastMessageProcessedTime());
+                tpDetails.put("processedMessageCount", 
tpStat.getProcessedMessageCount());
+                tpDetails.put("avgProcessingTime", 
tpStat.getAvgProcessingTime());
 
-                LOG.debug("Setting failedMessageCount : {} and 
lastMessageProcessedTime : {} for topic {}-{}", tpStat.failedMessageCount, 
tpStat.lastMessageProcessedTime, tpStat.topicName, tpStat.partition);
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Setting failedMessageCount : {} and 
lastMessageProcessedTime : {} for topic {}-{}", tpStat.getFailedMessageCount(), 
tpStat.getLastMessageProcessedTime(), tpStat.getTopicName(), 
tpStat.getPartition());
+                }
 
-                topicDetails.put(tpStat.topicName + "-" + tpStat.partition, 
tpDetails);
+                topicDetails.put(tpStat.getTopicName() + "-" + 
tpStat.getPartition(), tpDetails);
             }
         }
 
@@ -418,13 +422,14 @@ public class AtlasMetricsUtil {
     }
 
     static class TopicPartitionStat {
-        private final String topicName;
-        private final int    partition;
-        private final long   startOffset;
-        private       long   currentOffset;
-        private       long   lastMessageProcessedTime;
-        private       long   failedMessageCount;
-        private       long   processedMessageCount;
+        private final String     topicName;
+        private final int        partition;
+        private final long       startOffset;
+        private       long       currentOffset;
+        private       long       lastMessageProcessedTime;
+        private final AtomicLong failedMessageCount    = new AtomicLong();
+        private final AtomicLong processedMessageCount = new AtomicLong();
+        private final AtomicLong totalProcessingTimeMs = new AtomicLong();
 
         public TopicPartitionStat(String topicName, int partition, long 
startOffset, long currentOffset) {
             this.topicName     = topicName;
@@ -462,19 +467,26 @@ public class AtlasMetricsUtil {
         }
 
         public long getFailedMessageCount() {
-            return failedMessageCount;
+            return failedMessageCount.get();
         }
 
         public void incrFailedMessageCount() {
-            this.failedMessageCount++;
+            this.failedMessageCount.incrementAndGet();
         }
 
         public long getProcessedMessageCount() {
-            return processedMessageCount;
+            return processedMessageCount.get();
         }
 
-        public void incrProcessedMessageCount() {
-            this.processedMessageCount++;
+        public void incrProcessedMessageCount(long timeTakenMs) {
+            this.processedMessageCount.incrementAndGet();
+            this.totalProcessingTimeMs.addAndGet(timeTakenMs);
+        }
+
+        public long getAvgProcessingTime() {
+            long processedMessageCount = this.processedMessageCount.get();
+
+            return processedMessageCount == 0 ? 0 : 
(totalProcessingTimeMs.get() / processedMessageCount);
         }
     }
 }
diff --git a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
index baac18939..fbde605ae 100644
--- a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
+++ b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
@@ -708,7 +708,7 @@ public class NotificationHookConsumer implements Service, 
ActiveStateChangeHandl
                 }
 
                 if (entities.size() - count > 0) {
-                    LOG.info("preprocess: moved {} 
hive_process/hive_column_lineage entities to end of list (listSize={}). 
topic-offset={}, partition={}", entities.size() - count, entities.size(), 
kafkaMsg.getOffset(), kafkaMsg.getPartition());
+                    LOG.info("preprocess: moved {} 
hive_process/hive_column_lineage entities to end of list (listSize={}). 
topic={}, partition={}, offset={}", entities.size() - count, entities.size(), 
kafkaMsg.getTopic(), kafkaMsg.getPartition(), kafkaMsg.getOffset());
                 }
             }
         }
@@ -1555,9 +1555,9 @@ public class NotificationHookConsumer implements Service, 
ActiveStateChangeHandl
                     try {
                         String strMessage = 
AbstractNotification.getMessageJson(message);
 
-                        LOG.warn("msgProcessingTime={}, msgSize={}, 
topicOffset={}}", stats.timeTakenMs, strMessage.length(), kafkaMsg.getOffset());
+                        LOG.warn("msgProcessingTime={}, msgSize={}, topic={}, 
partition={}, offset={}}", stats.timeTakenMs, strMessage.length(), 
kafkaMsg.getTopic(), kafkaMsg.getPartition(), kafkaMsg.getOffset());
 
-                        
LARGE_MESSAGES_LOG.warn("{\"msgProcessingTime\":{},\"msgSize\":{},\"topicOffset\":{},\"data\":{}}",
 stats.timeTakenMs, strMessage.length(), kafkaMsg.getOffset(), strMessage);
+                        
LARGE_MESSAGES_LOG.warn("{\"msgProcessingTime\":{},\"msgSize\":{},\"topic\":{},\"partition\":{},\"topicOffset\":{},\"data\":{}}",
 stats.timeTakenMs, strMessage.length(), kafkaMsg.getTopic(), 
kafkaMsg.getPartition(), kafkaMsg.getOffset(), strMessage);
                     } catch (Throwable t) {
                         LOG.warn("error while recording large message: 
msgProcessingTime={}, type={}, topic={}, partition={}, offset={}", 
stats.timeTakenMs, message.getType(), kafkaMsg.getTopic(), 
kafkaMsg.getPartition(), kafkaMsg.getOffset(), t);
                     }

Reply via email to