kfaraz commented on code in PR #17735:
URL: https://github.com/apache/druid/pull/17735#discussion_r1976265230


##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/OrderedPartitionableRecord.java:
##########
@@ -41,13 +41,25 @@ public class OrderedPartitionableRecord<PartitionIdType, SequenceOffsetType, Rec
   private final PartitionIdType partitionId;
   private final SequenceOffsetType sequenceNumber;
   private final List<RecordType> data;
+  private final long timestamp;

Review Comment:
   Thinking about this, I think it would be better to just use a boxed `Long` and return `null` if it has not been set.
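
   A minimal sketch of the boxed approach (the getter and `javax.annotation.Nullable` are assumptions, not from the PR):

   ```java
     // Boxed Long: null means the timestamp was never set, avoiding a magic default.
     @Nullable
     private final Long timestamp;

     @Nullable
     public Long getTimestamp()
     {
       return timestamp;
     }
   ```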



##########
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java:
##########
@@ -380,6 +382,108 @@ public LagStats computeLagStats()
     return computeLags(partitionRecordLag);
   }
 
+  /**
+   * This method is similar to updatePartitionLagFromStream
+   * but also determines time lag. Once this method has been
+   * tested, we can remove the older one.
+   */
+  private void updatePartitionTimeAndRecordLagFromStream()
+  {
+    final Map<KafkaTopicPartition, Long> highestCurrentOffsets = getHighestCurrentOffsets();
+
+    getRecordSupplierLock().lock();
+    try {
+      Set<KafkaTopicPartition> partitionIds;
+      try {
+        partitionIds = recordSupplier.getPartitionIds(getIoConfig().getStream());
+      }
+      catch (Exception e) {
+        log.warn("Could not fetch partitions for topic/stream [%s]", getIoConfig().getStream());
+        throw new StreamException(e);
+      }
+
+      final Set<StreamPartition<KafkaTopicPartition>> partitions = partitionIds
+          .stream()
+          .map(e -> new StreamPartition<>(getIoConfig().getStream(), e))
+          .collect(Collectors.toSet());
+
+      final Set<KafkaTopicPartition> yetToReadPartitions = new HashSet<>();
+      for (Map.Entry<KafkaTopicPartition, Long> entry : highestCurrentOffsets.entrySet()) {
+        if (partitionIds.contains(entry.getKey())) {
+          if (highestCurrentOffsets.get(entry.getKey()) == null || highestCurrentOffsets.get(entry.getKey()) == 0) {
+            yetToReadPartitions.add(entry.getKey());
+            continue;
+          }
+
+          recordSupplier.seek(new StreamPartition<>(getIoConfig().getStream(), entry.getKey()), entry.getValue() - 1);

Review Comment:
   ```suggestion
          if (highestCurrentOffsets.get(entry.getKey()) == null || highestCurrentOffsets.get(entry.getKey()) == 0) {
            yetToReadPartitions.add(entry.getKey());
          } else {
            recordSupplier.seek(new StreamPartition<>(getIoConfig().getStream(), entry.getKey()), entry.getValue() - 1);
          }
   ```



##########
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java:
##########
@@ -380,6 +382,108 @@ public LagStats computeLagStats()
     return computeLags(partitionRecordLag);
   }
 
+  /**
+   * This method is similar to updatePartitionLagFromStream
+   * but also determines time lag. Once this method has been
+   * tested, we can remove the older one.
+   */
+  private void updatePartitionTimeAndRecordLagFromStream()
+  {
+    final Map<KafkaTopicPartition, Long> highestCurrentOffsets = getHighestCurrentOffsets();
+
+    getRecordSupplierLock().lock();
+    try {
+      Set<KafkaTopicPartition> partitionIds;
+      try {
+        partitionIds = recordSupplier.getPartitionIds(getIoConfig().getStream());
+      }
+      catch (Exception e) {
+        log.warn("Could not fetch partitions for topic/stream [%s]", getIoConfig().getStream());
+        throw new StreamException(e);
+      }
+
+      final Set<StreamPartition<KafkaTopicPartition>> partitions = partitionIds
+          .stream()
+          .map(e -> new StreamPartition<>(getIoConfig().getStream(), e))
+          .collect(Collectors.toSet());
+
+      final Set<KafkaTopicPartition> yetToReadPartitions = new HashSet<>();
+      for (Map.Entry<KafkaTopicPartition, Long> entry : highestCurrentOffsets.entrySet()) {
+        if (partitionIds.contains(entry.getKey())) {
+          if (highestCurrentOffsets.get(entry.getKey()) == null || highestCurrentOffsets.get(entry.getKey()) == 0) {
+            yetToReadPartitions.add(entry.getKey());
+            continue;
+          }
+
+          recordSupplier.seek(new StreamPartition<>(getIoConfig().getStream(), entry.getKey()), entry.getValue() - 1);
+        }
+      }
+
+      final Map<KafkaTopicPartition, Long> lastIngestedTimestamps = getTimestampPerPartitionAtCurrentOffset(partitions);
+      // TODO: this might give wierd values for lag when the tasks are yet to start processing
+      yetToReadPartitions.forEach(p -> lastIngestedTimestamps.put(p, 0L));
+
+      recordSupplier.seekToLatest(partitions);
+      // this method isn't actually computing the lag, just fetching the latests offsets from the stream. This is
+      // because we currently only have record lag for kafka, which can be lazily computed by subtracting the highest
+      // task offsets from the latest offsets from the stream when it is needed
+      latestSequenceFromStream = recordSupplier.getLatestSequenceNumbers(partitions);
+
+      for (Map.Entry<KafkaTopicPartition, Long> entry : latestSequenceFromStream.entrySet()) {
+        // if there are no messages .getEndOffset would return 0, but if there are n msgs it would return n+1
+        // and hence we need to seek to n - 2 to get the nth msg in the next poll.
+        if (entry.getValue() != 0) {
+          recordSupplier.seek(new StreamPartition<>(getIoConfig().getStream(), entry.getKey()), entry.getValue() - 2);
+        }
+      }
+
+      partitionToTimeLag = getTimestampPerPartitionAtCurrentOffset(partitions)
+          .entrySet().stream().filter(e -> lastIngestedTimestamps.containsKey(e.getKey()))
+          .collect(
+              Collectors.toMap(
+                  Entry::getKey,
+                  e -> e.getValue() - lastIngestedTimestamps.get(e.getKey())
+              )
+          );
+    }
+    catch (InterruptedException e) {
+      throw new StreamException(e);
+    }
+    finally {
+      getRecordSupplierLock().unlock();
+    }
+  }
+
+  private Map<KafkaTopicPartition, Long> getTimestampPerPartitionAtCurrentOffset(Set<StreamPartition<KafkaTopicPartition>> allPartitions)
+  {
+    Map<KafkaTopicPartition, Long> result = new HashMap<>();
+    Set<StreamPartition<KafkaTopicPartition>> remainingPartitions = new HashSet<>(allPartitions);
+
+    try {
+      int maxPolls = 5;
+      while (!remainingPartitions.isEmpty() && maxPolls-- > 0) {
+        for (OrderedPartitionableRecord<KafkaTopicPartition, Long, KafkaRecordEntity> record : recordSupplier.poll(getIoConfig().getPollTimeout())) {
+          if (!result.containsKey(record.getPartitionId())) {
+            result.put(record.getPartitionId(), record.getTimestamp());
+            remainingPartitions.remove(new StreamPartition<>(getIoConfig().getStream(), record.getPartitionId()));
+            if (remainingPartitions.isEmpty()) {
+              break;
+            }
+          }
+          recordSupplier.assign(remainingPartitions);
+        }
+      }
+    }
+    finally {
+      recordSupplier.assign(allPartitions);
+    }
+
+    if (!remainingPartitions.isEmpty()) {
+      log.info("Couldn't fetch the latest timestamp for the following partitions: [%s]", remainingPartitions);

Review Comment:
   ```suggestion
      log.info("Could not fetch the latest timestamp for partitions[%s].", remainingPartitions);
   ```



##########
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java:
##########
@@ -380,6 +382,108 @@ public LagStats computeLagStats()
     return computeLags(partitionRecordLag);
   }
 
+  /**
+   * This method is similar to updatePartitionLagFromStream
+   * but also determines time lag. Once this method has been
+   * tested, we can remove the older one.
+   */
+  private void updatePartitionTimeAndRecordLagFromStream()
+  {
+    final Map<KafkaTopicPartition, Long> highestCurrentOffsets = getHighestCurrentOffsets();
+
+    getRecordSupplierLock().lock();
+    try {
+      Set<KafkaTopicPartition> partitionIds;
+      try {
+        partitionIds = recordSupplier.getPartitionIds(getIoConfig().getStream());
+      }
+      catch (Exception e) {
+        log.warn("Could not fetch partitions for topic/stream [%s]", getIoConfig().getStream());
+        throw new StreamException(e);
+      }
+
+      final Set<StreamPartition<KafkaTopicPartition>> partitions = partitionIds
+          .stream()
+          .map(e -> new StreamPartition<>(getIoConfig().getStream(), e))
+          .collect(Collectors.toSet());
+
+      final Set<KafkaTopicPartition> yetToReadPartitions = new HashSet<>();
+      for (Map.Entry<KafkaTopicPartition, Long> entry : highestCurrentOffsets.entrySet()) {
+        if (partitionIds.contains(entry.getKey())) {
+          if (highestCurrentOffsets.get(entry.getKey()) == null || highestCurrentOffsets.get(entry.getKey()) == 0) {
+            yetToReadPartitions.add(entry.getKey());
+            continue;
+          }
+
+          recordSupplier.seek(new StreamPartition<>(getIoConfig().getStream(), entry.getKey()), entry.getValue() - 1);
+        }
+      }
+
+      final Map<KafkaTopicPartition, Long> lastIngestedTimestamps = getTimestampPerPartitionAtCurrentOffset(partitions);
+      // TODO: this might give wierd values for lag when the tasks are yet to start processing
+      yetToReadPartitions.forEach(p -> lastIngestedTimestamps.put(p, 0L));
+
+      recordSupplier.seekToLatest(partitions);
+      // this method isn't actually computing the lag, just fetching the latests offsets from the stream. This is
+      // because we currently only have record lag for kafka, which can be lazily computed by subtracting the highest
+      // task offsets from the latest offsets from the stream when it is needed
+      latestSequenceFromStream = recordSupplier.getLatestSequenceNumbers(partitions);
+
+      for (Map.Entry<KafkaTopicPartition, Long> entry : latestSequenceFromStream.entrySet()) {
+        // if there are no messages .getEndOffset would return 0, but if there are n msgs it would return n+1
+        // and hence we need to seek to n - 2 to get the nth msg in the next poll.
+        if (entry.getValue() != 0) {
+          recordSupplier.seek(new StreamPartition<>(getIoConfig().getStream(), entry.getKey()), entry.getValue() - 2);
+        }
+      }
+
+      partitionToTimeLag = getTimestampPerPartitionAtCurrentOffset(partitions)
+          .entrySet().stream().filter(e -> lastIngestedTimestamps.containsKey(e.getKey()))
+          .collect(
+              Collectors.toMap(
+                  Entry::getKey,
+                  e -> e.getValue() - lastIngestedTimestamps.get(e.getKey())
+              )
+          );

Review Comment:
   The older method used to end with `seekToLatest`; I wonder if we should do that here too.
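
   If so, a minimal sketch of how the method could end (assuming the `partitions` set computed above is still in scope; not part of the PR):

   ```java
         // ... compute partitionToTimeLag as above ...

         // Leave the supplier at the stream head before releasing the lock,
         // matching how the older updatePartitionLagFromStream ended.
         recordSupplier.seekToLatest(partitions);
   ```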



##########
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java:
##########
@@ -380,6 +382,108 @@ public LagStats computeLagStats()
     return computeLags(partitionRecordLag);
   }
 
+  /**
+   * This method is similar to updatePartitionLagFromStream
+   * but also determines time lag. Once this method has been
+   * tested, we can remove the older one.
+   */
+  private void updatePartitionTimeAndRecordLagFromStream()
+  {
+    final Map<KafkaTopicPartition, Long> highestCurrentOffsets = getHighestCurrentOffsets();
+
+    getRecordSupplierLock().lock();
+    try {
+      Set<KafkaTopicPartition> partitionIds;
+      try {
+        partitionIds = recordSupplier.getPartitionIds(getIoConfig().getStream());
+      }
+      catch (Exception e) {
+        log.warn("Could not fetch partitions for topic/stream [%s]", getIoConfig().getStream());
+        throw new StreamException(e);
+      }
+
+      final Set<StreamPartition<KafkaTopicPartition>> partitions = partitionIds
+          .stream()
+          .map(e -> new StreamPartition<>(getIoConfig().getStream(), e))
+          .collect(Collectors.toSet());
+
+      final Set<KafkaTopicPartition> yetToReadPartitions = new HashSet<>();
+      for (Map.Entry<KafkaTopicPartition, Long> entry : highestCurrentOffsets.entrySet()) {
+        if (partitionIds.contains(entry.getKey())) {

Review Comment:
   Rather than this, it would be more straightforward to iterate over the contents of `partitionIds`.
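
   Something along these lines, perhaps (a rough sketch, untested; note it would also treat partitions missing from `highestCurrentOffsets` as yet-to-read, which may or may not be intended):

   ```java
         final Set<KafkaTopicPartition> yetToReadPartitions = new HashSet<>();
         for (KafkaTopicPartition partitionId : partitionIds) {
           final Long offset = highestCurrentOffsets.get(partitionId);
           if (offset == null || offset == 0) {
             yetToReadPartitions.add(partitionId);
           } else {
             recordSupplier.seek(new StreamPartition<>(getIoConfig().getStream(), partitionId), offset - 1);
           }
         }
   ```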



##########
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java:
##########
@@ -51,6 +52,7 @@ public class KafkaSupervisorIOConfig extends SeekableStreamSupervisorIOConfig
   private final KafkaConfigOverrides configOverrides;
   private final String topic;
   private final String topicPattern;
+  private final boolean publishTimeLag;

Review Comment:
   ```suggestion
     private final boolean emitTimeLagMetrics;
   ```



##########
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java:
##########
@@ -380,6 +382,108 @@ public LagStats computeLagStats()
     return computeLags(partitionRecordLag);
   }
 
+  /**
+   * This method is similar to updatePartitionLagFromStream
+   * but also determines time lag. Once this method has been
+   * tested, we can remove the older one.
+   */
+  private void updatePartitionTimeAndRecordLagFromStream()
+  {
+    final Map<KafkaTopicPartition, Long> highestCurrentOffsets = getHighestCurrentOffsets();
+
+    getRecordSupplierLock().lock();
+    try {
+      Set<KafkaTopicPartition> partitionIds;
+      try {
+        partitionIds = recordSupplier.getPartitionIds(getIoConfig().getStream());
+      }
+      catch (Exception e) {
+        log.warn("Could not fetch partitions for topic/stream [%s]", getIoConfig().getStream());
+        throw new StreamException(e);
+      }
+
+      final Set<StreamPartition<KafkaTopicPartition>> partitions = partitionIds
+          .stream()
+          .map(e -> new StreamPartition<>(getIoConfig().getStream(), e))
+          .collect(Collectors.toSet());
+
+      final Set<KafkaTopicPartition> yetToReadPartitions = new HashSet<>();
+      for (Map.Entry<KafkaTopicPartition, Long> entry : highestCurrentOffsets.entrySet()) {
+        if (partitionIds.contains(entry.getKey())) {
+          if (highestCurrentOffsets.get(entry.getKey()) == null || highestCurrentOffsets.get(entry.getKey()) == 0) {
+            yetToReadPartitions.add(entry.getKey());
+            continue;
+          }
+
+          recordSupplier.seek(new StreamPartition<>(getIoConfig().getStream(), entry.getKey()), entry.getValue() - 1);
+        }
+      }
+
+      final Map<KafkaTopicPartition, Long> lastIngestedTimestamps = getTimestampPerPartitionAtCurrentOffset(partitions);
+      // TODO: this might give wierd values for lag when the tasks are yet to start processing

Review Comment:
   Maybe remove the TODO for now; it is enough to call out the problem.

   ```suggestion
      // Note: this might give weird values for lag when the tasks are yet to start processing
   ```



##########
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java:
##########
@@ -380,6 +382,108 @@ public LagStats computeLagStats()
     return computeLags(partitionRecordLag);
   }
 
+  /**
+   * This method is similar to updatePartitionLagFromStream
+   * but also determines time lag. Once this method has been
+   * tested, we can remove the older one.
+   */
+  private void updatePartitionTimeAndRecordLagFromStream()
+  {
+    final Map<KafkaTopicPartition, Long> highestCurrentOffsets = getHighestCurrentOffsets();
+
+    getRecordSupplierLock().lock();
+    try {
+      Set<KafkaTopicPartition> partitionIds;
+      try {
+        partitionIds = recordSupplier.getPartitionIds(getIoConfig().getStream());
+      }
+      catch (Exception e) {
+        log.warn("Could not fetch partitions for topic/stream [%s]", getIoConfig().getStream());
+        throw new StreamException(e);
+      }
+
+      final Set<StreamPartition<KafkaTopicPartition>> partitions = partitionIds
+          .stream()
+          .map(e -> new StreamPartition<>(getIoConfig().getStream(), e))
+          .collect(Collectors.toSet());
+
+      final Set<KafkaTopicPartition> yetToReadPartitions = new HashSet<>();

Review Comment:
   Please add a comment before this line denoting what is being done here.
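
   Perhaps something like (the wording is only a suggestion):

   ```java
         // Track partitions with nothing ingested yet separately; seek the rest to
         // their highest ingested offset so the next poll returns the last record read.
         final Set<KafkaTopicPartition> yetToReadPartitions = new HashSet<>();
   ```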



##########
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java:
##########
@@ -380,6 +382,108 @@ public LagStats computeLagStats()
     return computeLags(partitionRecordLag);
   }
 
+  /**
+   * This method is similar to updatePartitionLagFromStream
+   * but also determines time lag. Once this method has been
+   * tested, we can remove the older one.
+   */
+  private void updatePartitionTimeAndRecordLagFromStream()
+  {
+    final Map<KafkaTopicPartition, Long> highestCurrentOffsets = getHighestCurrentOffsets();
+
+    getRecordSupplierLock().lock();
+    try {
+      Set<KafkaTopicPartition> partitionIds;
+      try {
+        partitionIds = recordSupplier.getPartitionIds(getIoConfig().getStream());
+      }
+      catch (Exception e) {
+        log.warn("Could not fetch partitions for topic/stream [%s]", getIoConfig().getStream());
+        throw new StreamException(e);
+      }
+
+      final Set<StreamPartition<KafkaTopicPartition>> partitions = partitionIds
+          .stream()
+          .map(e -> new StreamPartition<>(getIoConfig().getStream(), e))
+          .collect(Collectors.toSet());
+
+      final Set<KafkaTopicPartition> yetToReadPartitions = new HashSet<>();
+      for (Map.Entry<KafkaTopicPartition, Long> entry : highestCurrentOffsets.entrySet()) {
+        if (partitionIds.contains(entry.getKey())) {
+          if (highestCurrentOffsets.get(entry.getKey()) == null || highestCurrentOffsets.get(entry.getKey()) == 0) {
+            yetToReadPartitions.add(entry.getKey());
+            continue;
+          }
+
+          recordSupplier.seek(new StreamPartition<>(getIoConfig().getStream(), entry.getKey()), entry.getValue() - 1);
+        }
+      }
+
+      final Map<KafkaTopicPartition, Long> lastIngestedTimestamps = getTimestampPerPartitionAtCurrentOffset(partitions);
+      // TODO: this might give wierd values for lag when the tasks are yet to start processing
+      yetToReadPartitions.forEach(p -> lastIngestedTimestamps.put(p, 0L));
+
+      recordSupplier.seekToLatest(partitions);
+      // this method isn't actually computing the lag, just fetching the latests offsets from the stream. This is
+      // because we currently only have record lag for kafka, which can be lazily computed by subtracting the highest
+      // task offsets from the latest offsets from the stream when it is needed
+      latestSequenceFromStream = recordSupplier.getLatestSequenceNumbers(partitions);
+
+      for (Map.Entry<KafkaTopicPartition, Long> entry : latestSequenceFromStream.entrySet()) {
+        // if there are no messages .getEndOffset would return 0, but if there are n msgs it would return n+1
+        // and hence we need to seek to n - 2 to get the nth msg in the next poll.
+        if (entry.getValue() != 0) {
+          recordSupplier.seek(new StreamPartition<>(getIoConfig().getStream(), entry.getKey()), entry.getValue() - 2);

Review Comment:
   I forget the details of the discussion, but I think we had come to the conclusion that this should be -1, right?
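
   For reference, with standard Kafka semantics the end offset of a partition holding n messages is n (the next offset to be written), so the last message sits at offset n - 1 and seeking there should make the next poll return it. A sketch of that change (assuming those semantics):

   ```java
           // End offset for n messages is n, so the last message is at offset n - 1.
           if (entry.getValue() != 0) {
             recordSupplier.seek(new StreamPartition<>(getIoConfig().getStream(), entry.getKey()), entry.getValue() - 1);
           }
   ```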



##########
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java:
##########
@@ -380,6 +382,108 @@ public LagStats computeLagStats()
     return computeLags(partitionRecordLag);
   }
 
+  /**
+   * This method is similar to updatePartitionLagFromStream
+   * but also determines time lag. Once this method has been
+   * tested, we can remove the older one.
+   */
+  private void updatePartitionTimeAndRecordLagFromStream()
+  {
+    final Map<KafkaTopicPartition, Long> highestCurrentOffsets = getHighestCurrentOffsets();
+
+    getRecordSupplierLock().lock();
+    try {
+      Set<KafkaTopicPartition> partitionIds;
+      try {
+        partitionIds = recordSupplier.getPartitionIds(getIoConfig().getStream());
+      }
+      catch (Exception e) {
+        log.warn("Could not fetch partitions for topic/stream [%s]", getIoConfig().getStream());
+        throw new StreamException(e);
+      }
+
+      final Set<StreamPartition<KafkaTopicPartition>> partitions = partitionIds
+          .stream()
+          .map(e -> new StreamPartition<>(getIoConfig().getStream(), e))
+          .collect(Collectors.toSet());
+
+      final Set<KafkaTopicPartition> yetToReadPartitions = new HashSet<>();
+      for (Map.Entry<KafkaTopicPartition, Long> entry : highestCurrentOffsets.entrySet()) {
+        if (partitionIds.contains(entry.getKey())) {
+          if (highestCurrentOffsets.get(entry.getKey()) == null || highestCurrentOffsets.get(entry.getKey()) == 0) {
+            yetToReadPartitions.add(entry.getKey());
+            continue;
+          }
+
+          recordSupplier.seek(new StreamPartition<>(getIoConfig().getStream(), entry.getKey()), entry.getValue() - 1);
+        }
+      }
+
+      final Map<KafkaTopicPartition, Long> lastIngestedTimestamps = getTimestampPerPartitionAtCurrentOffset(partitions);
+      // TODO: this might give wierd values for lag when the tasks are yet to start processing
+      yetToReadPartitions.forEach(p -> lastIngestedTimestamps.put(p, 0L));
+
+      recordSupplier.seekToLatest(partitions);
+      // this method isn't actually computing the lag, just fetching the latests offsets from the stream. This is
+      // because we currently only have record lag for kafka, which can be lazily computed by subtracting the highest
+      // task offsets from the latest offsets from the stream when it is needed

Review Comment:
   This comment is not needed.
   ```suggestion
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
