kfaraz commented on code in PR #17735:
URL: https://github.com/apache/druid/pull/17735#discussion_r1961863814


##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -4506,6 +4510,21 @@ protected void emitNoticeProcessTime(String noticeType, 
long timeInMillis)
     }
   }
 
+  protected void emitUpdateOffsetsTime(long timeInMillis)
+  {
+    try {
+      emitter.emit(
+          ServiceMetricEvent.builder()
+                            .setDimension("dataSource", dataSource)

Review Comment:
   ```suggestion
                               .setDimension(DruidMetrics.DATASOURCE, dataSource)
                               .setDimension(DruidMetrics.STREAM, getIoConfig().getStream())
   ```



##########
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java:
##########
@@ -151,6 +154,11 @@ public boolean isMultiTopic()
     return topicPattern != null;
   }
 
+  public boolean isPublishTimeLag()

Review Comment:
   Please annotate with `@JsonProperty` and add a short javadoc.



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -4506,6 +4510,21 @@ protected void emitNoticeProcessTime(String noticeType, 
long timeInMillis)
     }
   }
 
+  protected void emitUpdateOffsetsTime(long timeInMillis)
+  {
+    try {
+      emitter.emit(
+          ServiceMetricEvent.builder()
+                            .setDimension("dataSource", dataSource)
+                            .setDimensionIfNotNull(DruidMetrics.TAGS, 
spec.getContextValue(DruidMetrics.TAGS))
+                            .setMetric("ingest/updateOffsets/time", 
timeInMillis)

Review Comment:
   ```suggestion
                               .setMetric(StringUtils.format("ingest/%s/fetchOffsets/time", spec.getType()), timeInMillis)
   ```
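
   For illustration, a minimal sketch of how the metric name would come out per stream type. This assumes `spec.getType()` yields `kafka` or `kinesis`; `OffsetFetchMetricNames` is a hypothetical helper, not part of Druid, and plain `String.format` stands in for `StringUtils.format`.

   ```java
   import java.util.Locale;

   // Hypothetical helper, for illustration only.
   final class OffsetFetchMetricNames
   {
     private OffsetFetchMetricNames() {}

     // Builds "ingest/<streamType>/fetchOffsets/time".
     // Locale.ROOT keeps the formatting independent of the JVM default locale.
     static String fetchOffsetsTime(String streamType)
     {
       return String.format(Locale.ROOT, "ingest/%s/fetchOffsets/time", streamType);
     }

     public static void main(String[] args)
     {
       System.out.println(fetchOffsetsTime("kafka"));
       System.out.println(fetchOffsetsTime("kinesis"));
     }
   }
   ```

   With this shape, the Kafka and Kinesis supervisors would emit distinct metric names without each needing its own emit method.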



##########
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java:
##########
@@ -102,6 +104,7 @@ public KafkaSupervisorIOConfig(
     this.configOverrides = configOverrides;
     this.topic = topic;
     this.topicPattern = topicPattern;
+    this.publishTimeLag = publishTimeLag != null && publishTimeLag;

Review Comment:
   ```suggestion
       this.publishTimeLag = Configs.valueOrDefault(publishTimeLag, false);
   ```
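
   The intent is just null-safe defaulting. As a rough sketch of the behavior I would expect from the helper (the class below is a hypothetical local stand-in, not the actual `Configs` implementation):

   ```java
   // Hypothetical stand-in that mirrors the expected null-safe defaulting behavior.
   final class NullableDefaults
   {
     private NullableDefaults() {}

     // Returns the given value, or the default when the value is null.
     static boolean valueOrDefault(Boolean value, boolean defaultValue)
     {
       return value == null ? defaultValue : value;
     }

     public static void main(String[] args)
     {
       // A missing "publishTimeLag" property would arrive as null.
       System.out.println(valueOrDefault(null, false));
       System.out.println(valueOrDefault(Boolean.TRUE, false));
     }
   }
   ```

   This keeps the behavior of `publishTimeLag != null && publishTimeLag` while making the default value explicit.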



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/OrderedPartitionableRecord.java:
##########
@@ -56,13 +68,19 @@ public OrderedPartitionableRecord(
     this.partitionId = partitionId;
     this.sequenceNumber = sequenceNumber;
     this.data = data == null ? ImmutableList.of() : data;
+    this.timestamp = timestamp;
   }
 
   public String getStream()
   {
     return stream;
   }
 
+  public long getTimestamp()

Review Comment:
   Please add a short javadoc.



##########
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java:
##########
@@ -91,6 +93,7 @@ public class KafkaSupervisor extends 
SeekableStreamSupervisor<KafkaTopicPartitio
 
   private final Pattern pattern;
   private volatile Map<KafkaTopicPartition, Long> latestSequenceFromStream;
+  private volatile Map<KafkaTopicPartition, Long> lastestTimestampsFromStream;

Review Comment:
   Since this represents the lag itself, rename it to something like:
   ```suggestion
     private volatile Map<KafkaTopicPartition, Long> partitionToTimeLag;
   ```



##########
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java:
##########
@@ -380,6 +383,94 @@ public LagStats computeLagStats()
     return computeLags(partitionRecordLag);
   }
 
+  /**
+   * This method is similar to updatePartitionLagFromStream
+   * but also determines time lag. Once this method has been
+   * tested, we can remove the older one.
+   */
+  protected void updatePartitionTimeAndRecordLagFromStream()
+  {
+    final Map<KafkaTopicPartition, Long> highestCurrentOffsets = 
getHighestCurrentOffsets();
+
+    getRecordSupplierLock().lock();
+    try {
+      Set<KafkaTopicPartition> partitionIds;
+      try {
+        partitionIds = 
recordSupplier.getPartitionIds(getIoConfig().getStream());
+      }
+      catch (Exception e) {
+        log.warn("Could not fetch partitions for topic/stream [%s]", 
getIoConfig().getStream());
+        throw new StreamException(e);
+      }
+
+      for (Map.Entry<KafkaTopicPartition, Long> entry : 
highestCurrentOffsets.entrySet()) {
+        if (partitionIds.contains(entry.getKey()) && 
highestCurrentOffsets.get(entry.getKey()) != null) {
+          // since we need to consider the last arrived record at that 
sequence do a `-1`
+          recordSupplier.seek(new StreamPartition<>(getIoConfig().getStream(), 
entry.getKey()), entry.getValue() - 1);
+        }
+      }
+
+      final Map<KafkaTopicPartition, Long> lastIngestedTimestamps =
+          
CollectionUtils.mapValues(getRecordPerPartitionAtCurrentOffset(partitionIds),
+                                    OrderedPartitionableRecord::getTimestamp
+          );
+
+      Set<StreamPartition<KafkaTopicPartition>> partitions = partitionIds
+          .stream()
+          .map(e -> new StreamPartition<>(getIoConfig().getStream(), e))
+          .collect(Collectors.toSet());
+
+      recordSupplier.seekToLatest(partitions);
+
+      // this method isn't actually computing the lag, just fetching the 
latests offsets from the stream. This is
+      // because we currently only have record lag for kafka, which can be 
lazily computed by subtracting the highest
+      // task offsets from the latest offsets from the stream when it is needed
+      latestSequenceFromStream =
+          
partitions.stream().collect(Collectors.toMap(StreamPartition::getPartitionId, p 
-> recordSupplier.getPosition(p)));
+
+      // .position() gives next value to read, and we need seek by -2 to get 
the current record in next poll()
+      for (Map.Entry<KafkaTopicPartition, Long> entry : 
latestSequenceFromStream.entrySet()) {
+        recordSupplier.seek(new StreamPartition<>(getIoConfig().getStream(), 
entry.getKey()), entry.getValue() - 2);
+      }
+
+      lastestTimestampsFromStream = 
getRecordPerPartitionAtCurrentOffset(partitionIds)
+          .entrySet().stream().filter(e -> 
lastIngestedTimestamps.containsKey(e.getKey()))
+          .collect(
+              Collectors.toMap(
+                  Entry::getKey,
+                  e -> e.getValue().getTimestamp() - 
lastIngestedTimestamps.get(e.getKey())
+              )
+          );
+    }
+    catch (InterruptedException e) {
+      throw new StreamException(e);
+    }
+    finally {
+      getRecordSupplierLock().unlock();
+    }
+  }
+
+  private Map<KafkaTopicPartition, 
OrderedPartitionableRecord<KafkaTopicPartition, Long, KafkaRecordEntity>> 
getRecordPerPartitionAtCurrentOffset(Set<KafkaTopicPartition> partitions)

Review Comment:
   Please add a short javadoc for this method.



##########
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java:
##########
@@ -380,6 +383,94 @@ public LagStats computeLagStats()
     return computeLags(partitionRecordLag);
   }
 
+  /**
+   * This method is similar to updatePartitionLagFromStream
+   * but also determines time lag. Once this method has been
+   * tested, we can remove the older one.
+   */
+  protected void updatePartitionTimeAndRecordLagFromStream()
+  {
+    final Map<KafkaTopicPartition, Long> highestCurrentOffsets = 
getHighestCurrentOffsets();
+
+    getRecordSupplierLock().lock();
+    try {
+      Set<KafkaTopicPartition> partitionIds;
+      try {
+        partitionIds = 
recordSupplier.getPartitionIds(getIoConfig().getStream());
+      }
+      catch (Exception e) {
+        log.warn("Could not fetch partitions for topic/stream [%s]", 
getIoConfig().getStream());
+        throw new StreamException(e);
+      }
+
+      for (Map.Entry<KafkaTopicPartition, Long> entry : 
highestCurrentOffsets.entrySet()) {
+        if (partitionIds.contains(entry.getKey()) && 
highestCurrentOffsets.get(entry.getKey()) != null) {
+          // since we need to consider the last arrived record at that 
sequence do a `-1`
+          recordSupplier.seek(new StreamPartition<>(getIoConfig().getStream(), 
entry.getKey()), entry.getValue() - 1);
+        }
+      }
+
+      final Map<KafkaTopicPartition, Long> lastIngestedTimestamps =
+          
CollectionUtils.mapValues(getRecordPerPartitionAtCurrentOffset(partitionIds),
+                                    OrderedPartitionableRecord::getTimestamp
+          );
+
+      Set<StreamPartition<KafkaTopicPartition>> partitions = partitionIds
+          .stream()
+          .map(e -> new StreamPartition<>(getIoConfig().getStream(), e))
+          .collect(Collectors.toSet());
+
+      recordSupplier.seekToLatest(partitions);
+
+      // this method isn't actually computing the lag, just fetching the 
latests offsets from the stream. This is
+      // because we currently only have record lag for kafka, which can be 
lazily computed by subtracting the highest
+      // task offsets from the latest offsets from the stream when it is needed
+      latestSequenceFromStream =
+          
partitions.stream().collect(Collectors.toMap(StreamPartition::getPartitionId, p 
-> recordSupplier.getPosition(p)));
+
+      // .position() gives next value to read, and we need seek by -2 to get 
the current record in next poll()
+      for (Map.Entry<KafkaTopicPartition, Long> entry : 
latestSequenceFromStream.entrySet()) {
+        recordSupplier.seek(new StreamPartition<>(getIoConfig().getStream(), 
entry.getKey()), entry.getValue() - 2);
+      }
+
+      lastestTimestampsFromStream = 
getRecordPerPartitionAtCurrentOffset(partitionIds)
+          .entrySet().stream().filter(e -> 
lastIngestedTimestamps.containsKey(e.getKey()))
+          .collect(
+              Collectors.toMap(
+                  Entry::getKey,
+                  e -> e.getValue().getTimestamp() - 
lastIngestedTimestamps.get(e.getKey())
+              )
+          );
+    }
+    catch (InterruptedException e) {
+      throw new StreamException(e);
+    }
+    finally {
+      getRecordSupplierLock().unlock();
+    }
+  }
+
+  private Map<KafkaTopicPartition, 
OrderedPartitionableRecord<KafkaTopicPartition, Long, KafkaRecordEntity>> 
getRecordPerPartitionAtCurrentOffset(Set<KafkaTopicPartition> partitions)
+  {
+    Map<KafkaTopicPartition, OrderedPartitionableRecord<KafkaTopicPartition, 
Long, KafkaRecordEntity>> result = new HashMap<>();
+    int maxPolls = 10;

Review Comment:
   10 polls seems excessive; maybe we should start with a smaller number.



##########
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java:
##########
@@ -380,6 +383,94 @@ public LagStats computeLagStats()
     return computeLags(partitionRecordLag);
   }
 
+  /**
+   * This method is similar to updatePartitionLagFromStream
+   * but also determines time lag. Once this method has been
+   * tested, we can remove the older one.
+   */
+  protected void updatePartitionTimeAndRecordLagFromStream()
+  {
+    final Map<KafkaTopicPartition, Long> highestCurrentOffsets = 
getHighestCurrentOffsets();
+
+    getRecordSupplierLock().lock();
+    try {
+      Set<KafkaTopicPartition> partitionIds;
+      try {
+        partitionIds = 
recordSupplier.getPartitionIds(getIoConfig().getStream());
+      }
+      catch (Exception e) {
+        log.warn("Could not fetch partitions for topic/stream [%s]", 
getIoConfig().getStream());
+        throw new StreamException(e);
+      }
+
+      for (Map.Entry<KafkaTopicPartition, Long> entry : 
highestCurrentOffsets.entrySet()) {
+        if (partitionIds.contains(entry.getKey()) && 
highestCurrentOffsets.get(entry.getKey()) != null) {
+          // since we need to consider the last arrived record at that 
sequence do a `-1`
+          recordSupplier.seek(new StreamPartition<>(getIoConfig().getStream(), 
entry.getKey()), entry.getValue() - 1);
+        }
+      }
+
+      final Map<KafkaTopicPartition, Long> lastIngestedTimestamps =
+          
CollectionUtils.mapValues(getRecordPerPartitionAtCurrentOffset(partitionIds),
+                                    OrderedPartitionableRecord::getTimestamp
+          );
+
+      Set<StreamPartition<KafkaTopicPartition>> partitions = partitionIds
+          .stream()
+          .map(e -> new StreamPartition<>(getIoConfig().getStream(), e))
+          .collect(Collectors.toSet());
+
+      recordSupplier.seekToLatest(partitions);
+
+      // this method isn't actually computing the lag, just fetching the 
latests offsets from the stream. This is
+      // because we currently only have record lag for kafka, which can be 
lazily computed by subtracting the highest
+      // task offsets from the latest offsets from the stream when it is needed
+      latestSequenceFromStream =
+          
partitions.stream().collect(Collectors.toMap(StreamPartition::getPartitionId, p 
-> recordSupplier.getPosition(p)));
+
+      // .position() gives next value to read, and we need seek by -2 to get 
the current record in next poll()
+      for (Map.Entry<KafkaTopicPartition, Long> entry : 
latestSequenceFromStream.entrySet()) {
+        recordSupplier.seek(new StreamPartition<>(getIoConfig().getStream(), 
entry.getKey()), entry.getValue() - 2);
+      }
+
+      lastestTimestampsFromStream = 
getRecordPerPartitionAtCurrentOffset(partitionIds)
+          .entrySet().stream().filter(e -> 
lastIngestedTimestamps.containsKey(e.getKey()))
+          .collect(
+              Collectors.toMap(
+                  Entry::getKey,
+                  e -> e.getValue().getTimestamp() - 
lastIngestedTimestamps.get(e.getKey())
+              )
+          );
+    }
+    catch (InterruptedException e) {
+      throw new StreamException(e);
+    }
+    finally {
+      getRecordSupplierLock().unlock();
+    }
+  }
+
+  private Map<KafkaTopicPartition, 
OrderedPartitionableRecord<KafkaTopicPartition, Long, KafkaRecordEntity>> 
getRecordPerPartitionAtCurrentOffset(Set<KafkaTopicPartition> partitions)
+  {
+    Map<KafkaTopicPartition, OrderedPartitionableRecord<KafkaTopicPartition, 
Long, KafkaRecordEntity>> result = new HashMap<>();
+    int maxPolls = 10;
+    while (maxPolls-- > 0) {
+      for (OrderedPartitionableRecord<KafkaTopicPartition, Long, 
KafkaRecordEntity> record : 
recordSupplier.poll(getIoConfig().getPollTimeout())) {
+        if (!result.containsKey(record.getPartitionId())) {
+          result.put(record.getPartitionId(), record);
+          if (partitions.size() == result.size()) {
+            break;
+          }
+        }
+      }
+      if (partitions.size() == result.size()) {
+        break;
+      }
+    }
+
+    return result;

Review Comment:
   If we couldn't find the timestamp for some partitions, maybe we should log those partition IDs here.
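
   One possible way to work out which partitions to log is a set difference between the requested partitions and the keys of the result map. A minimal sketch, using `Integer` partition IDs and a hypothetical `MissingPartitions` helper in place of the real types:

   ```java
   import java.util.Map;
   import java.util.Set;
   import java.util.TreeSet;

   // Hypothetical helper, for illustration only.
   final class MissingPartitions
   {
     private MissingPartitions() {}

     // Returns the requested partition IDs that have no entry in the result map.
     // A TreeSet keeps the IDs sorted so the log message is stable.
     static <K extends Comparable<K>> Set<K> findMissing(Set<K> requested, Map<K, ?> found)
     {
       Set<K> missing = new TreeSet<>(requested);
       missing.removeAll(found.keySet());
       return missing;
     }

     public static void main(String[] args)
     {
       Set<Integer> requested = Set.of(0, 1, 2);
       Map<Integer, Long> found = Map.of(1, 1_700_000_000_000L);

       Set<Integer> missing = findMissing(requested, found);
       if (!missing.isEmpty()) {
         // In the supervisor this would be a log.warn(...) call instead.
         System.out.println("Could not find timestamps for partitions " + missing);
       }
     }
   }
   ```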



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -4506,6 +4510,21 @@ protected void emitNoticeProcessTime(String noticeType, 
long timeInMillis)
     }
   }
 
+  protected void emitUpdateOffsetsTime(long timeInMillis)

Review Comment:
   Why protected? Can it be private instead?



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/RecordSupplier.java:
##########
@@ -137,4 +137,13 @@ boolean isOffsetAvailable(StreamPartition<PartitionIdType> 
partition,
    */
   @Override
   void close();
+
+  /**
+   *  Gets the timestamp for record at a particular offset.
+   *
+   * @param partition target partition
+   * @param offset target offset
+   * @param timeout poll timeout
+   */
+  long getTimeAtOffset(StreamPartition<PartitionIdType> partition, 
OrderedSequenceNumber<SequenceOffsetType> offset, long timeout);

Review Comment:
   I suppose this method (and its implementations) can be removed now.



##########
docs/operations/metrics.md:
##########
@@ -271,6 +271,7 @@ batch ingestion emit the following metrics. These metrics 
are deltas for each em
 |`ingest/pause/time`|Milliseconds spent by a task in a paused state without 
ingesting.|`dataSource`, `taskId`, `tags`| < 10 seconds|
 |`ingest/handoff/time`|Total number of milliseconds taken to handoff a set of 
segments.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|Depends on the 
coordinator cycle time.|
 |`task/autoScaler/requiredCount`|Count of required tasks based on the 
calculations of `lagBased` auto scaler.|`dataSource`, `stream`, 
`scalingSkipReason`|Depends on auto scaler config.|
+|`ingest/updateOffsets/time`|Time (in milliseconds) taken to update the 
offsets.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|Generally a few 
seconds at most.|

Review Comment:
   Please move this metric to the Kafka ingestion section and add a corresponding metric for Kinesis ingestion too.
   Use the stream type (`kafka` or `kinesis`) in the metric name to correspond with other metrics such as `ingest/kafka/lag`.
   
   ```suggestion
   |`ingest/kafka/updateOffsets/time`|Total time (in milliseconds) taken to fetch the latest offsets from Kafka stream and the ingestion tasks.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|Generally a few seconds at most.|
   ```



##########
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java:
##########
@@ -380,6 +383,94 @@ public LagStats computeLagStats()
     return computeLags(partitionRecordLag);
   }
 
+  /**
+   * This method is similar to updatePartitionLagFromStream
+   * but also determines time lag. Once this method has been
+   * tested, we can remove the older one.
+   */
+  protected void updatePartitionTimeAndRecordLagFromStream()
+  {
+    final Map<KafkaTopicPartition, Long> highestCurrentOffsets = 
getHighestCurrentOffsets();
+
+    getRecordSupplierLock().lock();
+    try {
+      Set<KafkaTopicPartition> partitionIds;
+      try {
+        partitionIds = 
recordSupplier.getPartitionIds(getIoConfig().getStream());
+      }
+      catch (Exception e) {
+        log.warn("Could not fetch partitions for topic/stream [%s]", 
getIoConfig().getStream());
+        throw new StreamException(e);
+      }
+
+      for (Map.Entry<KafkaTopicPartition, Long> entry : 
highestCurrentOffsets.entrySet()) {
+        if (partitionIds.contains(entry.getKey()) && 
highestCurrentOffsets.get(entry.getKey()) != null) {
+          // since we need to consider the last arrived record at that 
sequence do a `-1`
+          recordSupplier.seek(new StreamPartition<>(getIoConfig().getStream(), 
entry.getKey()), entry.getValue() - 1);
+        }
+      }
+
+      final Map<KafkaTopicPartition, Long> lastIngestedTimestamps =
+          
CollectionUtils.mapValues(getRecordPerPartitionAtCurrentOffset(partitionIds),
+                                    OrderedPartitionableRecord::getTimestamp
+          );
+
+      Set<StreamPartition<KafkaTopicPartition>> partitions = partitionIds
+          .stream()
+          .map(e -> new StreamPartition<>(getIoConfig().getStream(), e))
+          .collect(Collectors.toSet());
+
+      recordSupplier.seekToLatest(partitions);
+
+      // this method isn't actually computing the lag, just fetching the 
latests offsets from the stream. This is
+      // because we currently only have record lag for kafka, which can be 
lazily computed by subtracting the highest
+      // task offsets from the latest offsets from the stream when it is needed
+      latestSequenceFromStream =
+          
partitions.stream().collect(Collectors.toMap(StreamPartition::getPartitionId, p 
-> recordSupplier.getPosition(p)));
+
+      // .position() gives next value to read, and we need seek by -2 to get 
the current record in next poll()
+      for (Map.Entry<KafkaTopicPartition, Long> entry : 
latestSequenceFromStream.entrySet()) {
+        recordSupplier.seek(new StreamPartition<>(getIoConfig().getStream(), 
entry.getKey()), entry.getValue() - 2);
+      }
+
+      lastestTimestampsFromStream = 
getRecordPerPartitionAtCurrentOffset(partitionIds)
+          .entrySet().stream().filter(e -> 
lastIngestedTimestamps.containsKey(e.getKey()))
+          .collect(
+              Collectors.toMap(
+                  Entry::getKey,
+                  e -> e.getValue().getTimestamp() - 
lastIngestedTimestamps.get(e.getKey())
+              )
+          );
+    }
+    catch (InterruptedException e) {
+      throw new StreamException(e);
+    }
+    finally {
+      getRecordSupplierLock().unlock();
+    }
+  }
+
+  private Map<KafkaTopicPartition, 
OrderedPartitionableRecord<KafkaTopicPartition, Long, KafkaRecordEntity>> 
getRecordPerPartitionAtCurrentOffset(Set<KafkaTopicPartition> partitions)
+  {
+    Map<KafkaTopicPartition, OrderedPartitionableRecord<KafkaTopicPartition, 
Long, KafkaRecordEntity>> result = new HashMap<>();
+    int maxPolls = 10;
+    while (maxPolls-- > 0) {
+      for (OrderedPartitionableRecord<KafkaTopicPartition, Long, 
KafkaRecordEntity> record : 
recordSupplier.poll(getIoConfig().getPollTimeout())) {

Review Comment:
   Before the next poll, should we unassign some partitions from the consumer to increase the likelihood of getting records from the required partitions?
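
   A rough sketch of what I have in mind: narrow the assignment to the partitions that are still missing before each poll. `Poller` and `Rec` below are hypothetical stand-ins, not the `RecordSupplier` or `OrderedPartitionableRecord` types, and partitions are plain `Integer` IDs.

   ```java
   import java.util.HashMap;
   import java.util.HashSet;
   import java.util.List;
   import java.util.Map;
   import java.util.Set;

   // Illustration only; all types here are hypothetical stand-ins.
   final class NarrowingPollSketch
   {
     private NarrowingPollSketch() {}

     // Minimal record: a partition ID plus a timestamp.
     static final class Rec
     {
       final int partition;
       final long timestamp;

       Rec(int partition, long timestamp)
       {
         this.partition = partition;
         this.timestamp = timestamp;
       }
     }

     // Minimal consumer abstraction: assign partitions, then poll them.
     interface Poller
     {
       void assign(Set<Integer> partitions);

       List<Rec> poll();
     }

     // Collects the first timestamp seen for each partition, polling at most maxPolls times.
     static Map<Integer, Long> firstTimestamps(Poller poller, Set<Integer> partitions, int maxPolls)
     {
       Map<Integer, Long> result = new HashMap<>();
       Set<Integer> remaining = new HashSet<>(partitions);
       for (int i = 0; i < maxPolls && !remaining.isEmpty(); i++) {
         // Narrow the assignment so the next poll only returns records we still need.
         poller.assign(new HashSet<>(remaining));
         for (Rec rec : poller.poll()) {
           // remove() returns true only the first time a needed partition is seen.
           if (remaining.remove(rec.partition)) {
             result.put(rec.partition, rec.timestamp);
           }
         }
       }
       return result;
     }
   }
   ```

   The loop exits as soon as every partition has a timestamp, so a smaller `maxPolls` should be enough in the common case.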



##########
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java:
##########
@@ -380,6 +383,94 @@ public LagStats computeLagStats()
     return computeLags(partitionRecordLag);
   }
 
+  /**
+   * This method is similar to updatePartitionLagFromStream
+   * but also determines time lag. Once this method has been
+   * tested, we can remove the older one.
+   */
+  protected void updatePartitionTimeAndRecordLagFromStream()
+  {
+    final Map<KafkaTopicPartition, Long> highestCurrentOffsets = 
getHighestCurrentOffsets();
+
+    getRecordSupplierLock().lock();
+    try {
+      Set<KafkaTopicPartition> partitionIds;
+      try {
+        partitionIds = 
recordSupplier.getPartitionIds(getIoConfig().getStream());
+      }
+      catch (Exception e) {
+        log.warn("Could not fetch partitions for topic/stream [%s]", 
getIoConfig().getStream());
+        throw new StreamException(e);
+      }
+
+      for (Map.Entry<KafkaTopicPartition, Long> entry : 
highestCurrentOffsets.entrySet()) {
+        if (partitionIds.contains(entry.getKey()) && 
highestCurrentOffsets.get(entry.getKey()) != null) {
+          // since we need to consider the last arrived record at that 
sequence do a `-1`
+          recordSupplier.seek(new StreamPartition<>(getIoConfig().getStream(), 
entry.getKey()), entry.getValue() - 1);
+        }
+      }
+
+      final Map<KafkaTopicPartition, Long> lastIngestedTimestamps =
+          
CollectionUtils.mapValues(getRecordPerPartitionAtCurrentOffset(partitionIds),
+                                    OrderedPartitionableRecord::getTimestamp
+          );
+
+      Set<StreamPartition<KafkaTopicPartition>> partitions = partitionIds
+          .stream()
+          .map(e -> new StreamPartition<>(getIoConfig().getStream(), e))
+          .collect(Collectors.toSet());
+
+      recordSupplier.seekToLatest(partitions);
+
+      // this method isn't actually computing the lag, just fetching the 
latests offsets from the stream. This is
+      // because we currently only have record lag for kafka, which can be 
lazily computed by subtracting the highest
+      // task offsets from the latest offsets from the stream when it is needed
+      latestSequenceFromStream =
+          
partitions.stream().collect(Collectors.toMap(StreamPartition::getPartitionId, p 
-> recordSupplier.getPosition(p)));

Review Comment:
   Rather than a call to `seekToLatest()` and multiple calls to `getPosition()`, we could add a new method `KafkaRecordSupplier.getEndOffsets()` which just calls `consumer.endOffsets()`.



##########
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java:
##########
@@ -380,6 +383,94 @@ public LagStats computeLagStats()
     return computeLags(partitionRecordLag);
   }
 
+  /**
+   * This method is similar to updatePartitionLagFromStream
+   * but also determines time lag. Once this method has been
+   * tested, we can remove the older one.
+   */
+  protected void updatePartitionTimeAndRecordLagFromStream()
+  {
+    final Map<KafkaTopicPartition, Long> highestCurrentOffsets = 
getHighestCurrentOffsets();
+
+    getRecordSupplierLock().lock();
+    try {
+      Set<KafkaTopicPartition> partitionIds;
+      try {
+        partitionIds = 
recordSupplier.getPartitionIds(getIoConfig().getStream());
+      }
+      catch (Exception e) {
+        log.warn("Could not fetch partitions for topic/stream [%s]", 
getIoConfig().getStream());
+        throw new StreamException(e);
+      }
+
+      for (Map.Entry<KafkaTopicPartition, Long> entry : 
highestCurrentOffsets.entrySet()) {
+        if (partitionIds.contains(entry.getKey()) && 
highestCurrentOffsets.get(entry.getKey()) != null) {
+          // since we need to consider the last arrived record at that 
sequence do a `-1`
+          recordSupplier.seek(new StreamPartition<>(getIoConfig().getStream(), 
entry.getKey()), entry.getValue() - 1);
+        }
+      }
+
+      final Map<KafkaTopicPartition, Long> lastIngestedTimestamps =
+          
CollectionUtils.mapValues(getRecordPerPartitionAtCurrentOffset(partitionIds),
+                                    OrderedPartitionableRecord::getTimestamp
+          );
+
+      Set<StreamPartition<KafkaTopicPartition>> partitions = partitionIds
+          .stream()
+          .map(e -> new StreamPartition<>(getIoConfig().getStream(), e))
+          .collect(Collectors.toSet());
+
+      recordSupplier.seekToLatest(partitions);
+
+      // this method isn't actually computing the lag, just fetching the 
latests offsets from the stream. This is
+      // because we currently only have record lag for kafka, which can be 
lazily computed by subtracting the highest
+      // task offsets from the latest offsets from the stream when it is needed
+      latestSequenceFromStream =
+          
partitions.stream().collect(Collectors.toMap(StreamPartition::getPartitionId, p 
-> recordSupplier.getPosition(p)));
+
+      // .position() gives next value to read, and we need seek by -2 to get 
the current record in next poll()
+      for (Map.Entry<KafkaTopicPartition, Long> entry : 
latestSequenceFromStream.entrySet()) {
+        recordSupplier.seek(new StreamPartition<>(getIoConfig().getStream(), 
entry.getKey()), entry.getValue() - 2);
+      }
+
+      lastestTimestampsFromStream = 
getRecordPerPartitionAtCurrentOffset(partitionIds)
+          .entrySet().stream().filter(e -> 
lastIngestedTimestamps.containsKey(e.getKey()))
+          .collect(
+              Collectors.toMap(
+                  Entry::getKey,
+                  e -> e.getValue().getTimestamp() - 
lastIngestedTimestamps.get(e.getKey())
+              )
+          );
+    }
+    catch (InterruptedException e) {
+      throw new StreamException(e);
+    }
+    finally {
+      getRecordSupplierLock().unlock();
+    }
+  }
+
+  private Map<KafkaTopicPartition, 
OrderedPartitionableRecord<KafkaTopicPartition, Long, KafkaRecordEntity>> 
getRecordPerPartitionAtCurrentOffset(Set<KafkaTopicPartition> partitions)
+  {
+    Map<KafkaTopicPartition, OrderedPartitionableRecord<KafkaTopicPartition, 
Long, KafkaRecordEntity>> result = new HashMap<>();

Review Comment:
   I guess this method need not return the whole record; it could just return the timestamp for every partition.
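
   With timestamp-only maps, the lag computation in the caller reduces to a per-partition subtraction. A minimal sketch, assuming both maps are keyed by partition and using a hypothetical `TimeLagSketch` helper:

   ```java
   import java.util.HashMap;
   import java.util.Map;

   // Hypothetical helper, for illustration only.
   final class TimeLagSketch
   {
     private TimeLagSketch() {}

     // For each partition present in both maps:
     // time lag = latest timestamp in the stream - last ingested timestamp.
     // Partitions without a last ingested timestamp are skipped.
     static <K> Map<K, Long> computeTimeLag(
         Map<K, Long> latestTimestamps,
         Map<K, Long> lastIngestedTimestamps
     )
     {
       Map<K, Long> lag = new HashMap<>();
       for (Map.Entry<K, Long> e : latestTimestamps.entrySet()) {
         Long ingested = lastIngestedTimestamps.get(e.getKey());
         if (ingested != null) {
           lag.put(e.getKey(), e.getValue() - ingested);
         }
       }
       return lag;
     }

     public static void main(String[] args)
     {
       Map<Integer, Long> latest = Map.of(0, 5_000L, 1, 7_000L);
       Map<Integer, Long> ingested = Map.of(0, 4_000L);
       System.out.println(computeTimeLag(latest, ingested));
     }
   }
   ```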



##########
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java:
##########
@@ -380,6 +383,94 @@ public LagStats computeLagStats()
     return computeLags(partitionRecordLag);
   }
 
+  /**
+   * This method is similar to updatePartitionLagFromStream
+   * but also determines time lag. Once this method has been
+   * tested, we can remove the older one.
+   */
+  protected void updatePartitionTimeAndRecordLagFromStream()

Review Comment:
   This method can be private.



##########
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java:
##########
@@ -72,7 +73,8 @@ public KafkaSupervisorIOConfig(
       @JsonProperty("lateMessageRejectionStartDateTime") DateTime 
lateMessageRejectionStartDateTime,
       @JsonProperty("configOverrides") KafkaConfigOverrides configOverrides,
       @JsonProperty("idleConfig") IdleConfig idleConfig,
-      @JsonProperty("stopTaskCount") Integer stopTaskCount
+      @JsonProperty("stopTaskCount") Integer stopTaskCount,
+      @JsonProperty("publishTimeLag") Boolean publishTimeLag

Review Comment:
   ```suggestion
         @JsonProperty("publishTimeLag") @Nullable Boolean publishTimeLag
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

