kfaraz commented on code in PR #17735:
URL: https://github.com/apache/druid/pull/17735#discussion_r1959204479


##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/RecordSupplier.java:
##########
@@ -137,4 +137,13 @@ boolean isOffsetAvailable(StreamPartition<PartitionIdType> partition,
    */
   @Override
   void close();
+
+  /**
+   * Gets the timestamp of the record at a particular offset.
+   *
+   * @param partition target partition
+   * @param offset    target offset
+   * @param timeout   poll timeout in milliseconds
+   */
+  long getTimeAtOffset(StreamPartition<PartitionIdType> partition, OrderedSequenceNumber<SequenceOffsetType> offset, long timeout);

Review Comment:
   Instead of this, you could expose a method `getTimestamp` that does not require the caller to specify the offset.
   Please refer to the other comment for details.
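   To illustrate the suggestion, here is a minimal, hypothetical sketch of what an offset-free `getTimestamp` could look like. The interface and class names (`TimestampSupplier`, `InMemoryTimestampSupplier`) and the cached-value design are illustrative assumptions, not existing Druid code: the supplier tracks the latest record timestamp per partition itself, so callers no longer pass an explicit offset.

   ```java
   import java.util.Map;
   import java.util.concurrent.ConcurrentHashMap;

   // Hypothetical RecordSupplier-style interface (not the actual Druid API):
   // the timestamp is looked up by partition only, with no offset argument.
   interface TimestampSupplier<P>
   {
     /** Timestamp of the latest record seen for the partition, or 0 if none. */
     long getTimestamp(P partition, long timeoutMillis);
   }

   // Stub implementation that caches timestamps as records are observed.
   class InMemoryTimestampSupplier implements TimestampSupplier<String>
   {
     private final Map<String, Long> latestTimestamps = new ConcurrentHashMap<>();

     void recordSeen(String partition, long timestampMillis)
     {
       latestTimestamps.put(partition, timestampMillis);
     }

     @Override
     public long getTimestamp(String partition, long timeoutMillis)
     {
       // A real supplier would poll the stream for up to timeoutMillis;
       // this stub simply returns the last cached value.
       return latestTimestamps.getOrDefault(partition, 0L);
     }
   }
   ```

   The upside of this shape is that callers such as the supervisor never need to reason about off-by-one offset semantics; the supplier owns that detail.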



##########
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java:
##########
@@ -280,8 +281,43 @@ protected Map<KafkaTopicPartition, Long> getPartitionRecordLag()
   @Override
   protected Map<KafkaTopicPartition, Long> getPartitionTimeLag()
   {
-    // time lag not currently support with kafka
-    return null;
+    return latestSequenceFromStream == null ? null : getTimeLagPerPartitionInLatestSequences(getHighestCurrentOffsets());
+  }
+
+  protected Map<KafkaTopicPartition, Long> getTimeLagPerPartitionInLatestSequences(Map<KafkaTopicPartition, Long> currentOffsets)
+  {
+    getRecordSupplierLock().lock();
+    Map<KafkaTopicPartition, Long> timeAtCurrOffsets = new HashMap<>();
+    try {
+      for (Map.Entry<KafkaTopicPartition, Long> entry : currentOffsets.entrySet()) {
+        timeAtCurrOffsets.put(entry.getKey(), entry.getValue() == null ? 0L : recordSupplier.getTimeAtOffset(
+            new StreamPartition<>(getIoConfig().getStream(), entry.getKey()),
+            KafkaSequenceNumber.of(entry.getValue()),
+            getIoConfig().getPollTimeout()
+        ));
+      }
+
+      return latestSequenceFromStream
+          .entrySet()
+          .stream()
+          .collect(
+              Collectors.toMap(
+                  Entry::getKey,
+                  e -> {
+                    long latestTime = e.getValue() == null ? 0L : recordSupplier.getTimeAtOffset(
+                        new StreamPartition<>(getIoConfig().getStream(), e.getKey()),
+                        // latestSequenceFromStream holds the next offset to be read, so look at the previous one.
+                        KafkaSequenceNumber.of(e.getValue() - 1),
+                        getIoConfig().getPollTimeout()
+                    );
+                    return latestTime > 0L ? latestTime - timeAtCurrOffsets.getOrDefault(e.getKey(), 0L) : latestTime;
+                  }
+              )
Review Comment:
   Rather than doing this, we should initialize a `Map<KafkaTopicPartition, Long> latestTimestampsFromStream` in the method `updatePartitionLagFromStream` itself, the same way `latestSequenceFromStream` is initialized.
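   For illustration, here is a minimal sketch of the cached-map approach the comment suggests. The class name `TimeLagSketch`, the `Integer` partition keys, and the clamping to zero are illustrative assumptions rather than actual Druid code: once `latestTimestampsFromStream` is populated in one place (alongside the latest sequences), computing time lag becomes a pure map transformation with no extra polls of the record supplier.

   ```java
   import java.util.HashMap;
   import java.util.Map;
   import java.util.stream.Collectors;

   // Hypothetical sketch of the reviewer's suggestion, not actual Druid code.
   class TimeLagSketch
   {
     // Populated wherever latestSequenceFromStream is refreshed, e.g. in
     // updatePartitionLagFromStream; partition -> latest record timestamp.
     final Map<Integer, Long> latestTimestampsFromStream = new HashMap<>();

     /** Derives per-partition time lag from already-cached latest timestamps. */
     Map<Integer, Long> getTimeLagPerPartition(Map<Integer, Long> currentTimestamps)
     {
       return latestTimestampsFromStream.entrySet().stream().collect(
           Collectors.toMap(
               Map.Entry::getKey,
               e -> {
                 long latest = e.getValue() == null ? 0L : e.getValue();
                 long current = currentTimestamps.getOrDefault(e.getKey(), 0L);
                 // Lag is only meaningful when a latest timestamp is known;
                 // clamp at zero so clock skew never reports negative lag.
                 return latest > 0L ? Math.max(0L, latest - current) : 0L;
               }
           )
       );
     }
   }
   ```

   Compared with polling `getTimeAtOffset` per partition inside the lag getter, this keeps all stream access in the single refresh path and makes the lag computation side-effect free.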



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

