adithyachakilam commented on code in PR #17735: URL: https://github.com/apache/druid/pull/17735#discussion_r1975444369
########## extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java: ########## @@ -380,6 +382,104 @@ public LagStats computeLagStats() return computeLags(partitionRecordLag); } + /** + * This method is similar to updatePartitionLagFromStream + * but also determines time lag. Once this method has been + * tested, we can remove the older one. + */ + protected void updatePartitionTimeAndRecordLagFromStream() + { + final Map<KafkaTopicPartition, Long> highestCurrentOffsets = getHighestCurrentOffsets(); + + getRecordSupplierLock().lock(); + try { + Set<KafkaTopicPartition> partitionIds; + try { + partitionIds = recordSupplier.getPartitionIds(getIoConfig().getStream()); + } + catch (Exception e) { + log.warn("Could not fetch partitions for topic/stream [%s]", getIoConfig().getStream()); + throw new StreamException(e); + } + + final Set<StreamPartition<KafkaTopicPartition>> partitions = partitionIds + .stream() + .map(e -> new StreamPartition<>(getIoConfig().getStream(), e)) + .collect(Collectors.toSet()); + + final Set<KafkaTopicPartition> emptyPartitions = new HashSet<>(); + for (Map.Entry<KafkaTopicPartition, Long> entry : highestCurrentOffsets.entrySet()) { + if (partitionIds.contains(entry.getKey())) { + if (highestCurrentOffsets.get(entry.getKey()) == null || highestCurrentOffsets.get(entry.getKey()) == 0) { + emptyPartitions.add(entry.getKey()); Review Comment: Renamed it to yetToRead and made a small change to the logic. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org For additional commands, e-mail: commits-h...@druid.apache.org