This is an automated email from the ASF dual-hosted git repository.

madhan pushed a commit to branch atlas-2.5
in repository https://gitbox.apache.org/repos/asf/atlas.git
commit c788d00145ee5b7eb0b967d3e99d3fde46256c2b Author: Madhan Neethiraj <[email protected]> AuthorDate: Tue Oct 14 12:02:46 2025 -0700 ATLAS-5154: update notification processing metrics to include avgProcessingTime per topic-partition (#468) (cherry picked from commit 495110cf52846c54833bbe8ce322099099cf14e6) --- .../org/apache/atlas/util/AtlasMetricsUtil.java | 52 +++++++++++++--------- .../notification/NotificationHookConsumer.java | 6 +-- 2 files changed, 35 insertions(+), 23 deletions(-) diff --git a/repository/src/main/java/org/apache/atlas/util/AtlasMetricsUtil.java b/repository/src/main/java/org/apache/atlas/util/AtlasMetricsUtil.java index f04592451..4172abe9a 100644 --- a/repository/src/main/java/org/apache/atlas/util/AtlasMetricsUtil.java +++ b/repository/src/main/java/org/apache/atlas/util/AtlasMetricsUtil.java @@ -39,6 +39,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicLong; import static org.apache.atlas.model.metrics.AtlasMetrics.STAT_NOTIFY_AVG_TIME_CURR_DAY; import static org.apache.atlas.model.metrics.AtlasMetrics.STAT_NOTIFY_AVG_TIME_CURR_HOUR; @@ -162,7 +163,7 @@ public class AtlasMetricsUtil { partitionStat.incrFailedMessageCount(); } - partitionStat.incrProcessedMessageCount(); + partitionStat.incrProcessedMessageCount(stats.timeTakenMs); partitionStat.setLastMessageProcessedTime(messagesProcessed.getLastIncrTime().toEpochMilli()); } @@ -187,15 +188,18 @@ public class AtlasMetricsUtil { for (TopicPartitionStat tpStat : tStat.partitionStats.values()) { Map<String, Long> tpDetails = new HashMap<>(); - tpDetails.put("offsetStart", tpStat.startOffset); - tpDetails.put("offsetCurrent", tpStat.currentOffset); - tpDetails.put("failedMessageCount", tpStat.failedMessageCount); - tpDetails.put("lastMessageProcessedTime", tpStat.lastMessageProcessedTime); - tpDetails.put("processedMessageCount", 
tpStat.processedMessageCount); + tpDetails.put("offsetStart", tpStat.getStartOffset()); + tpDetails.put("offsetCurrent", tpStat.getCurrentOffset()); + tpDetails.put("failedMessageCount", tpStat.getFailedMessageCount()); + tpDetails.put("lastMessageProcessedTime", tpStat.getLastMessageProcessedTime()); + tpDetails.put("processedMessageCount", tpStat.getProcessedMessageCount()); + tpDetails.put("avgProcessingTime", tpStat.getAvgProcessingTime()); - LOG.debug("Setting failedMessageCount : {} and lastMessageProcessedTime : {} for topic {}-{}", tpStat.failedMessageCount, tpStat.lastMessageProcessedTime, tpStat.topicName, tpStat.partition); + if (LOG.isDebugEnabled()) { + LOG.debug("Setting failedMessageCount : {} and lastMessageProcessedTime : {} for topic {}-{}", tpStat.getFailedMessageCount(), tpStat.getLastMessageProcessedTime(), tpStat.getTopicName(), tpStat.getPartition()); + } - topicDetails.put(tpStat.topicName + "-" + tpStat.partition, tpDetails); + topicDetails.put(tpStat.getTopicName() + "-" + tpStat.getPartition(), tpDetails); } } @@ -418,13 +422,14 @@ public class AtlasMetricsUtil { } static class TopicPartitionStat { - private final String topicName; - private final int partition; - private final long startOffset; - private long currentOffset; - private long lastMessageProcessedTime; - private long failedMessageCount; - private long processedMessageCount; + private final String topicName; + private final int partition; + private final long startOffset; + private long currentOffset; + private long lastMessageProcessedTime; + private final AtomicLong failedMessageCount = new AtomicLong(); + private final AtomicLong processedMessageCount = new AtomicLong(); + private final AtomicLong totalProcessingTimeMs = new AtomicLong(); public TopicPartitionStat(String topicName, int partition, long startOffset, long currentOffset) { this.topicName = topicName; @@ -462,19 +467,26 @@ public class AtlasMetricsUtil { } public long getFailedMessageCount() { - return 
failedMessageCount; + return failedMessageCount.get(); } public void incrFailedMessageCount() { - this.failedMessageCount++; + this.failedMessageCount.incrementAndGet(); } public long getProcessedMessageCount() { - return processedMessageCount; + return processedMessageCount.get(); } - public void incrProcessedMessageCount() { - this.processedMessageCount++; + public void incrProcessedMessageCount(long timeTakenMs) { + this.processedMessageCount.incrementAndGet(); + this.totalProcessingTimeMs.addAndGet(timeTakenMs); + } + + public long getAvgProcessingTime() { + long processedMessageCount = this.processedMessageCount.get(); + + return processedMessageCount == 0 ? 0 : (totalProcessingTimeMs.get() / processedMessageCount); } } } diff --git a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java index 001e57579..642bb0bf1 100644 --- a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java +++ b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java @@ -701,7 +701,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl } if (entities.size() - count > 0) { - LOG.info("preprocess: moved {} hive_process/hive_column_lineage entities to end of list (listSize={}). topic-offset={}, partition={}", entities.size() - count, entities.size(), kafkaMsg.getOffset(), kafkaMsg.getPartition()); + LOG.info("preprocess: moved {} hive_process/hive_column_lineage entities to end of list (listSize={}). 
topic={}, partition={}, offset={}", entities.size() - count, entities.size(), kafkaMsg.getTopic(), kafkaMsg.getPartition(), kafkaMsg.getOffset()); } } } @@ -1533,9 +1533,9 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl try { String strMessage = AbstractNotification.getMessageJson(message); - LOG.warn("msgProcessingTime={}, msgSize={}, topicOffset={}}", stats.timeTakenMs, strMessage.length(), kafkaMsg.getOffset()); + LOG.warn("msgProcessingTime={}, msgSize={}, topic={}, partition={}, offset={}}", stats.timeTakenMs, strMessage.length(), kafkaMsg.getTopic(), kafkaMsg.getPartition(), kafkaMsg.getOffset()); - LARGE_MESSAGES_LOG.warn("{\"msgProcessingTime\":{},\"msgSize\":{},\"topicOffset\":{},\"data\":{}}", stats.timeTakenMs, strMessage.length(), kafkaMsg.getOffset(), strMessage); + LARGE_MESSAGES_LOG.warn("{\"msgProcessingTime\":{},\"msgSize\":{},\"topic\":{},\"partition\":{},\"topicOffset\":{},\"data\":{}}", stats.timeTakenMs, strMessage.length(), kafkaMsg.getTopic(), kafkaMsg.getPartition(), kafkaMsg.getOffset(), strMessage); } catch (Throwable t) { LOG.warn("error while recording large message: msgProcessingTime={}, type={}, topic={}, partition={}, offset={}", stats.timeTakenMs, message.getType(), kafkaMsg.getTopic(), kafkaMsg.getPartition(), kafkaMsg.getOffset(), t); }
