abhishekagarwal87 commented on code in PR #14292:
URL: https://github.com/apache/druid/pull/14292#discussion_r1200006382


##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -4220,6 +4220,18 @@ protected void emitLag()
           return;
         }
 
+        // Try emitting lag even with stale metrics provided that none of the 
partitions has negative lag
+        final boolean areOffsetsStale =
+            sequenceLastUpdated != null
+            && sequenceLastUpdated.getMillis()
+               < System.currentTimeMillis() - 
tuningConfig.getOffsetFetchPeriod().getMillis();
+        if (areOffsetsStale && partitionLags.values().stream().anyMatch(x -> x 
< 0)) {
+          log.warn("Lag is negative and will not be emitted because topic 
offsets have become stale. "
+                   + "This will not impact data processing. "
+                   + "Offsets may become stale because of connectivity 
issues.");

Review Comment:
   "Offsets may become stale because of connectivity issues." - This isn't very 
helpful. 



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -4220,6 +4220,18 @@ protected void emitLag()
           return;
         }
 
+        // Try emitting lag even with stale metrics provided that none of the 
partitions has negative lag
+        final boolean areOffsetsStale =
+            sequenceLastUpdated != null
+            && sequenceLastUpdated.getMillis()
+               < System.currentTimeMillis() - 
tuningConfig.getOffsetFetchPeriod().getMillis();
+        if (areOffsetsStale && partitionLags.values().stream().anyMatch(x -> x 
< 0)) {
+          log.warn("Lag is negative and will not be emitted because topic 
offsets have become stale. "
+                   + "This will not impact data processing. "
+                   + "Offsets may become stale because of connectivity 
issues.");

Review Comment:
   ```suggestion
                      + "Offsets usually become stale when tasks cannot connect 
to Kafka cluster.");
   ```



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -4220,6 +4220,18 @@ protected void emitLag()
           return;
         }
 
+        // Try emitting lag even with stale metrics provided that none of the 
partitions has negative lag
+        final boolean areOffsetsStale =
+            sequenceLastUpdated != null
+            && sequenceLastUpdated.getMillis()
+               < System.currentTimeMillis() - 
tuningConfig.getOffsetFetchPeriod().getMillis();
+        if (areOffsetsStale && partitionLags.values().stream().anyMatch(x -> x 
< 0)) {
+          log.warn("Lag is negative and will not be emitted because topic 
offsets have become stale. "

Review Comment:
   that info can bloat the log a lot. We can just say that "Check the task 
report for more details around lag". 



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -4220,6 +4220,18 @@ protected void emitLag()
           return;
         }
 
+        // Try emitting lag even with stale metrics provided that none of the 
partitions has negative lag
+        final boolean areOffsetsStale =
+            sequenceLastUpdated != null
+            && sequenceLastUpdated.getMillis()
+               < System.currentTimeMillis() - 
tuningConfig.getOffsetFetchPeriod().getMillis();
+        if (areOffsetsStale && partitionLags.values().stream().anyMatch(x -> x 
< 0)) {
+          log.warn("Lag is negative and will not be emitted because topic 
offsets have become stale. "
+                   + "This will not impact data processing. "
+                   + "Offsets may become stale because of connectivity 
issues.");
+          return;

Review Comment:
   If we do that, it will be very easy to get into a wrong debugging trail 
where the overall lag might appear lower than it actually is. I am in favor of 
not emitting lag for any partition at all. The partition level lag would still 
be available in the task reports. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to