tejaswini-imply commented on code in PR #13144:
URL: https://github.com/apache/druid/pull/13144#discussion_r991390234


##########
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java:
##########
@@ -274,17 +269,19 @@ protected Map<Integer, Long> getPartitionTimeLag()
   @SuppressWarnings("SSBasedInspection")
   protected Map<Integer, Long> getRecordLagPerPartition(Map<Integer, Long> currentOffsets)
   {
-    return currentOffsets
+    if (latestSequenceFromStream == null) {
+      return ImmutableMap.of();
+    }
+
+    return latestSequenceFromStream
         .entrySet()
         .stream()
         .collect(
             Collectors.toMap(
                 Entry::getKey,
-                e -> latestSequenceFromStream != null
-                     && latestSequenceFromStream.get(e.getKey()) != null
-                     && e.getValue() != null
-                     ? latestSequenceFromStream.get(e.getKey()) - e.getValue()
-                     : Integer.MIN_VALUE
+                e -> e.getValue() != null

Review Comment:
   If a partition is missing from `latestSequenceFromStream`, it is not
considered during lag reporting. Now that idle behavior is decided based on
these lags, it is better to compute them from the latest stream stats.
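
   A minimal standalone sketch of the idea, not the code in this PR: the lag
map is keyed by the partitions present in the latest stream offsets, so a
partition absent from that map produces no entry. The class name
`RecordLagSketch`, the method name, and the handling of nulls (a null latest
offset is skipped, a missing current offset is treated as 0) are assumptions
made for illustration only.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper for illustration; not part of KafkaSupervisor.
class RecordLagSketch
{
  static Map<Integer, Long> recordLagPerPartition(
      Map<Integer, Long> latestOffsets,
      Map<Integer, Long> currentOffsets
  )
  {
    // No stream stats yet: report no lag at all.
    if (latestOffsets == null) {
      return Collections.emptyMap();
    }

    Map<Integer, Long> lag = new HashMap<>();
    // Iterate over the latest stream offsets, so only partitions known to
    // the stream end up in the result.
    for (Map.Entry<Integer, Long> e : latestOffsets.entrySet()) {
      Long latest = e.getValue();
      if (latest == null) {
        // Assumption: skip partitions without a known latest offset.
        continue;
      }
      Long current = currentOffsets == null ? null : currentOffsets.get(e.getKey());
      // Assumption: a partition with no current offset counts from offset 0.
      lag.put(e.getKey(), latest - (current == null ? 0L : current));
    }
    return lag;
  }
}
```

   With latest offsets `{0=100, 1=50}` and current offsets `{0=90, 3=7}`,
this sketch reports lag for partitions 0 and 1 only; partition 3 is not in
the latest stream offsets and is left out.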



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

