This is an automated email from the ASF dual-hosted git repository.
nixon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/master by this push:
new 6d71f14 ATLAS-3344-Atlas Metrics to include topic partition wise
additional information.
6d71f14 is described below
commit 6d71f14a6e099ce8f4e2a2f16d8e5d5bc4579995
Author: Saqeeb Shaikh <[email protected]>
AuthorDate: Mon Jul 22 17:58:08 2019 +0530
ATLAS-3344-Atlas Metrics to include topic partition wise additional
information.
Signed-off-by: nixonrodrigues <[email protected]>
---
.../site/Statistics_Topic_Offset_table_tmpl.html | 8 +++--
.../public/js/templates/site/Statistics_tmpl.html | 15 ++------
dashboardv2/public/js/utils/Enums.js | 4 ++-
dashboardv2/public/js/views/site/Statistics.js | 23 ++++---------
.../apache/atlas/model/metrics/AtlasMetrics.java | 2 +-
.../org/apache/atlas/util/AtlasMetricsUtil.java | 40 +++++++++++++++++-----
6 files changed, 49 insertions(+), 43 deletions(-)
diff --git
a/dashboardv2/public/js/templates/site/Statistics_Topic_Offset_table_tmpl.html
b/dashboardv2/public/js/templates/site/Statistics_Topic_Offset_table_tmpl.html
index 4693edf..b4c8d86 100644
---
a/dashboardv2/public/js/templates/site/Statistics_Topic_Offset_table_tmpl.html
+++
b/dashboardv2/public/js/templates/site/Statistics_Topic_Offset_table_tmpl.html
@@ -16,10 +16,12 @@
-->
<thead>
<tr>
- <th>Kafka Topic</th>
- <th>Current Offset</th>
+ <th>Kafka Topic-Partition</th>
<th>Start Offset</th>
-
+ <th>Current Offset</th>
+ <th>Processed</th>
+ <th>Failed</th>
+ <th>Last Message Processed Time</th>
</tr>
</thead>
{{#if data}}
diff --git a/dashboardv2/public/js/templates/site/Statistics_tmpl.html
b/dashboardv2/public/js/templates/site/Statistics_tmpl.html
index ecd84d6..436767c 100644
--- a/dashboardv2/public/js/templates/site/Statistics_tmpl.html
+++ b/dashboardv2/public/js/templates/site/Statistics_tmpl.html
@@ -65,26 +65,16 @@
<div class="card-container panel panel-primary">
<div class="panel-heading">Notification Details</div>
<div class="panel-body">
- <table class="table stat-table">
- <tbody data-id="notification-small-card">
- <tr class="empty text-center">
- <td colspan="2"><span>No records
found!</span></td>
- </tr>
- </tbody>
- </table>
- <hr>
- </hr>
- <table data-id="notification-card" class="table
stat-table notification-table table-striped ">
+ <table data-id="offset-card" class="table
stat-table notification-table table-striped ">
<tbody>
<tr class="empty text-center">
<td colspan="2"><span>No records
found!</span></td>
</tr>
</tbody>
</table>
-
<hr>
</hr>
- <table data-id="offset-card" class="table
stat-table notification-table table-striped ">
+ <table data-id="notification-card" class="table
stat-table notification-table table-striped ">
<tbody>
<tr class="empty text-center">
<td colspan="2"><span>No records
found!</span></td>
@@ -94,7 +84,6 @@
</div>
</div>
</div>
-
</div>
</div>
</div>
diff --git a/dashboardv2/public/js/utils/Enums.js
b/dashboardv2/public/js/utils/Enums.js
index e38af74..1409139 100644
--- a/dashboardv2/public/js/utils/Enums.js
+++ b/dashboardv2/public/js/utils/Enums.js
@@ -185,7 +185,9 @@ define(['require'], function(require) {
"totalDeletes": "number",
"totalFailed": "number",
"totalUpdates": "number",
- "topicOffsets":"number"
+ "processedMessageCount": "number",
+ "lastMessageProcessedTime": "day",
+ "failedMessageCount": "number"
}
};
return Enums;
diff --git a/dashboardv2/public/js/views/site/Statistics.js
b/dashboardv2/public/js/views/site/Statistics.js
index a8e351e..8929e54 100644
--- a/dashboardv2/public/js/views/site/Statistics.js
+++ b/dashboardv2/public/js/views/site/Statistics.js
@@ -48,7 +48,6 @@ define(['require',
connectionCard: "[data-id='connection-card']",
notificationCard: "[data-id='notification-card']",
statsNotificationTable: "[data-id='stats-notification-table']",
- notificationSmallCard: "[data-id='notification-small-card']",
entityCard: "[data-id='entity-card']",
offsetCard: "[data-id='offset-card']"
},
@@ -237,14 +236,6 @@ define(['require',
})
);
- that.ui.notificationSmallCard.html(
- createTable({
- "enums": Enums.stats.Notification,
- "data": _.pick(data.Notification,
'lastMessageProcessedTime', 'offsetCurrent', 'offsetStart')
- })
- );
-
-
var offsetTableColumn = function(obj) {
var returnObj = []
_.each(obj, function(value, key) {
@@ -255,14 +246,12 @@ define(['require',
that.ui.offsetCard.html(
TopicOffsetTable({
- "enums": Enums.stats.Notification,
- "data": data.Notification.topicOffsets,
- "tableHeader": ['offsetCurrent', 'offsetStart'],
- "tableCol":
offsetTableColumn(data.Notification.topicOffsets),
- "getTmplValue": function(argument, args) {
- console.log(argument, args)
- var returnVal =
data.Notification.topicOffsets[argument.label][args];
- return returnVal ?
_.numberFormatWithComa(returnVal) : 0;
+ data: data.Notification.topicDetails,
+ tableHeader: ["offsetStart", "offsetCurrent",
"processedMessageCount", "failedMessageCount", "lastMessageProcessedTime"],
+ tableCol:
offsetTableColumn(data.Notification.topicDetails),
+ getTmplValue: function(argument, args) {
+ var returnVal =
data.Notification.topicDetails[argument.label][args];
+ return returnVal ? that.getValue({ value:
returnVal, type: Enums.stats.Notification[args] }) : 0;
}
})
)
diff --git
a/intg/src/main/java/org/apache/atlas/model/metrics/AtlasMetrics.java
b/intg/src/main/java/org/apache/atlas/model/metrics/AtlasMetrics.java
index a48d93b..c968302 100644
--- a/intg/src/main/java/org/apache/atlas/model/metrics/AtlasMetrics.java
+++ b/intg/src/main/java/org/apache/atlas/model/metrics/AtlasMetrics.java
@@ -55,7 +55,7 @@ public class AtlasMetrics {
public static final String STAT_NOTIFY_FAILED_COUNT_CURR_HOUR =
PREFIX_NOTIFICATION + "currentHourFailed";
public static final String STAT_NOTIFY_START_TIME_CURR_HOUR =
PREFIX_NOTIFICATION + "currentHourStartTime";
public static final String STAT_NOTIFY_LAST_MESSAGE_PROCESSED_TIME =
PREFIX_NOTIFICATION + "lastMessageProcessedTime";
- public static final String STAT_NOTIFY_TOPIC_OFFSETS =
PREFIX_NOTIFICATION + "topicOffsets";
+ public static final String STAT_NOTIFY_TOPIC_DETAILS =
PREFIX_NOTIFICATION + "topicDetails";
public static final String STAT_NOTIFY_COUNT_PREV_DAY =
PREFIX_NOTIFICATION + "previousDay";
public static final String STAT_NOTIFY_AVG_TIME_PREV_DAY =
PREFIX_NOTIFICATION + "previousDayAvgTime";
public static final String STAT_NOTIFY_CREATES_COUNT_PREV_DAY =
PREFIX_NOTIFICATION + "previousDayEntityCreates";
diff --git
a/repository/src/main/java/org/apache/atlas/util/AtlasMetricsUtil.java
b/repository/src/main/java/org/apache/atlas/util/AtlasMetricsUtil.java
index 2c78cbc..beb90e6 100644
--- a/repository/src/main/java/org/apache/atlas/util/AtlasMetricsUtil.java
+++ b/repository/src/main/java/org/apache/atlas/util/AtlasMetricsUtil.java
@@ -109,6 +109,11 @@ public class AtlasMetricsUtil {
}
partitionStat.setCurrentOffset(msgOffset + 1);
+ if(stats.isFailedMsg) {
+ partitionStat.incrFailedMessageCount();
+ }
+ partitionStat.incrProcessedMessageCount();
+
partitionStat.setLastMessageProcessedTime(messagesProcessed.getLastIncrTime().toEpochMilli());
}
public Map<String, Object> getStats() {
@@ -126,20 +131,25 @@ public class AtlasMetricsUtil {
ret.put(STAT_SERVER_STATUS_BACKEND_STORE, getBackendStoreStatus() ?
STATUS_CONNECTED : STATUS_NOT_CONNECTED);
ret.put(STAT_SERVER_STATUS_INDEX_STORE, getIndexStoreStatus() ?
STATUS_CONNECTED : STATUS_NOT_CONNECTED);
- Map<String, Map<String, Long>> topicOffsets = new HashMap<>();
+ Map<String, Map<String, Long>> topicDetails = new HashMap<>();
for (TopicStats tStat : topicStats.values()) {
for (TopicPartitionStat tpStat : tStat.partitionStats.values()) {
- Map<String, Long> tpOffsets = new HashMap<>();
-
- tpOffsets.put("offsetStart", tpStat.startOffset);
- tpOffsets.put("offsetCurrent", tpStat.currentOffset);
-
- topicOffsets.put(tpStat.topicName + "-" + tpStat.partition,
tpOffsets);
+ Map<String, Long> tpDetails = new HashMap<>();
+
+ tpDetails.put("offsetStart", tpStat.startOffset);
+ tpDetails.put("offsetCurrent", tpStat.currentOffset);
+ tpDetails.put("failedMessageCount", tpStat.failedMessageCount);
+ tpDetails.put("lastMessageProcessedTime",
tpStat.lastMessageProcessedTime);
+ tpDetails.put("processedMessageCount",
tpStat.processedMessageCount);
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Setting failedMessageCount : {} and
lastMessageProcessedTime : {} for topic {}-{}", tpStat.failedMessageCount,
tpStat.lastMessageProcessedTime, tpStat.topicName, tpStat.partition);
+ }
+ topicDetails.put(tpStat.topicName + "-" + tpStat.partition,
tpDetails);
}
}
- ret.put(STAT_NOTIFY_TOPIC_OFFSETS, topicOffsets);
+ ret.put(STAT_NOTIFY_TOPIC_DETAILS, topicDetails);
ret.put(STAT_NOTIFY_LAST_MESSAGE_PROCESSED_TIME,
this.messagesProcessed.getLastIncrTime().toEpochMilli());
ret.put(STAT_NOTIFY_COUNT_TOTAL,
messagesProcessed.getCount(ALL));
@@ -345,6 +355,9 @@ public class AtlasMetricsUtil {
private final int partition;
private final long startOffset;
private long currentOffset;
+ private long lastMessageProcessedTime;
+ private long failedMessageCount;
+ private long processedMessageCount;
public TopicPartitionStat(String topicName, int partition, long
startOffset, long currentOffset) {
this.topicName = topicName;
@@ -373,5 +386,16 @@ public class AtlasMetricsUtil {
this.currentOffset = currentOffset;
}
+ public long getLastMessageProcessedTime() { return
lastMessageProcessedTime; }
+
+ public void setLastMessageProcessedTime(long lastMessageProcessedTime)
{ this.lastMessageProcessedTime = lastMessageProcessedTime; }
+
+ public long getFailedMessageCount() { return failedMessageCount; }
+
+ public void incrFailedMessageCount() { this.failedMessageCount++; }
+
+ public long getProcessedMessageCount() { return processedMessageCount;
}
+
+ public void incrProcessedMessageCount() {
this.processedMessageCount++; }
};
}