kfaraz commented on code in PR #17735: URL: https://github.com/apache/druid/pull/17735#discussion_r1976265230
########## indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/OrderedPartitionableRecord.java: ##########

@@ -41,13 +41,25 @@ public class OrderedPartitionableRecord<PartitionIdType, SequenceOffsetType, Rec

   private final PartitionIdType partitionId;
   private final SequenceOffsetType sequenceNumber;
   private final List<RecordType> data;
+  private final long timestamp;

Review Comment:
   Thinking about this, I think it would be better to just use a boxed `Long` and return `null` if it has not been set.
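   For illustration, a minimal sketch of that shape (the class below is a hypothetical stand-in, not the actual `OrderedPartitionableRecord`):

   ```java
   import javax.annotation.Nullable;

   public class TimestampedRecord
   {
     // Boxed Long so that "timestamp not set" is represented as null
     // rather than a sentinel value such as 0 or -1.
     @Nullable
     private final Long timestamp;

     public TimestampedRecord(@Nullable Long timestamp)
     {
       this.timestamp = timestamp;
     }

     @Nullable
     public Long getTimestamp()
     {
       return timestamp;
     }
   }
   ```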
########## extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java: ##########

@@ -380,6 +382,108 @@ public LagStats computeLagStats()

+      final Set<KafkaTopicPartition> yetToReadPartitions = new HashSet<>();
+      for (Map.Entry<KafkaTopicPartition, Long> entry : highestCurrentOffsets.entrySet()) {
+        if (partitionIds.contains(entry.getKey())) {
+          if (highestCurrentOffsets.get(entry.getKey()) == null || highestCurrentOffsets.get(entry.getKey()) == 0) {
+            yetToReadPartitions.add(entry.getKey());
+            continue;
+          }
+
+          recordSupplier.seek(new StreamPartition<>(getIoConfig().getStream(), entry.getKey()), entry.getValue() - 1);

Review Comment:
   ```suggestion
           if (highestCurrentOffsets.get(entry.getKey()) == null || highestCurrentOffsets.get(entry.getKey()) == 0) {
             yetToReadPartitions.add(entry.getKey());
           } else {
             recordSupplier.seek(new StreamPartition<>(getIoConfig().getStream(), entry.getKey()), entry.getValue() - 1);
           }
   ```

########## extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java: ##########

@@ -380,6 +382,108 @@ public LagStats computeLagStats()

+  /**
+   * This method is similar to updatePartitionLagFromStream
+   * but also determines time lag. Once this method has been
+   * tested, we can remove the older one.
+   */
+  private void updatePartitionTimeAndRecordLagFromStream()
+  {
+    final Map<KafkaTopicPartition, Long> highestCurrentOffsets = getHighestCurrentOffsets();
+
+    getRecordSupplierLock().lock();
+    try {
+      Set<KafkaTopicPartition> partitionIds;
+      try {
+        partitionIds = recordSupplier.getPartitionIds(getIoConfig().getStream());
+      }
+      catch (Exception e) {
+        log.warn("Could not fetch partitions for topic/stream [%s]", getIoConfig().getStream());
+        throw new StreamException(e);
+      }
+
+      final Set<StreamPartition<KafkaTopicPartition>> partitions = partitionIds
+          .stream()
+          .map(e -> new StreamPartition<>(getIoConfig().getStream(), e))
+          .collect(Collectors.toSet());
+
+      final Set<KafkaTopicPartition> yetToReadPartitions = new HashSet<>();
+      for (Map.Entry<KafkaTopicPartition, Long> entry : highestCurrentOffsets.entrySet()) {
+        if (partitionIds.contains(entry.getKey())) {
+          if (highestCurrentOffsets.get(entry.getKey()) == null || highestCurrentOffsets.get(entry.getKey()) == 0) {
+            yetToReadPartitions.add(entry.getKey());
+            continue;
+          }
+
+          recordSupplier.seek(new StreamPartition<>(getIoConfig().getStream(), entry.getKey()), entry.getValue() - 1);
+        }
+      }
+
+      final Map<KafkaTopicPartition, Long> lastIngestedTimestamps = getTimestampPerPartitionAtCurrentOffset(partitions);
+      // TODO: this might give wierd values for lag when the tasks are yet to start processing
+      yetToReadPartitions.forEach(p -> lastIngestedTimestamps.put(p, 0L));
+
+      recordSupplier.seekToLatest(partitions);
+      // this method isn't actually computing the lag, just fetching the latests offsets from the stream. This is
+      // because we currently only have record lag for kafka, which can be lazily computed by subtracting the highest
+      // task offsets from the latest offsets from the stream when it is needed
+      latestSequenceFromStream = recordSupplier.getLatestSequenceNumbers(partitions);
+
+      for (Map.Entry<KafkaTopicPartition, Long> entry : latestSequenceFromStream.entrySet()) {
+        // if there are no messages .getEndOffset would return 0, but if there are n msgs it would return n+1
+        // and hence we need to seek to n - 2 to get the nth msg in the next poll.
+        if (entry.getValue() != 0) {
+          recordSupplier.seek(new StreamPartition<>(getIoConfig().getStream(), entry.getKey()), entry.getValue() - 2);
+        }
+      }
+
+      partitionToTimeLag = getTimestampPerPartitionAtCurrentOffset(partitions)
+          .entrySet().stream().filter(e -> lastIngestedTimestamps.containsKey(e.getKey()))
+          .collect(
+              Collectors.toMap(
+                  Entry::getKey,
+                  e -> e.getValue() - lastIngestedTimestamps.get(e.getKey())
+              )
+          );
+    }
+    catch (InterruptedException e) {
+      throw new StreamException(e);
+    }
+    finally {
+      getRecordSupplierLock().unlock();
+    }
+  }
+
+  private Map<KafkaTopicPartition, Long> getTimestampPerPartitionAtCurrentOffset(Set<StreamPartition<KafkaTopicPartition>> allPartitions)
+  {
+    Map<KafkaTopicPartition, Long> result = new HashMap<>();
+    Set<StreamPartition<KafkaTopicPartition>> remainingPartitions = new HashSet<>(allPartitions);
+
+    try {
+      int maxPolls = 5;
+      while (!remainingPartitions.isEmpty() && maxPolls-- > 0) {
+        for (OrderedPartitionableRecord<KafkaTopicPartition, Long, KafkaRecordEntity> record : recordSupplier.poll(getIoConfig().getPollTimeout())) {
+          if (!result.containsKey(record.getPartitionId())) {
+            result.put(record.getPartitionId(), record.getTimestamp());
+            remainingPartitions.remove(new StreamPartition<>(getIoConfig().getStream(), record.getPartitionId()));
+            if (remainingPartitions.isEmpty()) {
+              break;
+            }
+          }
+          recordSupplier.assign(remainingPartitions);
+        }
+      }
+    }
+    finally {
+      recordSupplier.assign(allPartitions);
+    }
+
+    if (!remainingPartitions.isEmpty()) {
+      log.info("Couldn't fetch the latest timestamp for the following partitions: [%s]", remainingPartitions);

Review Comment:
   ```suggestion
         log.info("Could not fetch the latest timestamp for partitions[%s].", remainingPartitions);
   ```
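   To summarize what the method above computes, here is a stripped-down sketch of the time-lag calculation (partition ids simplified to `Integer`; not the actual Druid API):

   ```java
   import java.util.HashMap;
   import java.util.Map;

   public class TimeLagSketch
   {
     /**
      * Per-partition time lag = timestamp of the latest record in the stream
      * minus timestamp of the last record the tasks have ingested.
      */
     public static Map<Integer, Long> computeTimeLag(
         Map<Integer, Long> latestTimestamps,       // partition -> timestamp at the head of the stream
         Map<Integer, Long> lastIngestedTimestamps  // partition -> timestamp at the current task offset
     )
     {
       final Map<Integer, Long> timeLag = new HashMap<>();
       for (Map.Entry<Integer, Long> entry : latestTimestamps.entrySet()) {
         final Long ingested = lastIngestedTimestamps.get(entry.getKey());
         if (ingested != null) {
           timeLag.put(entry.getKey(), entry.getValue() - ingested);
         }
       }
       return timeLag;
     }
   }
   ```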
########## extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java: ##########

@@ -380,6 +382,108 @@ public LagStats computeLagStats()

+      partitionToTimeLag = getTimestampPerPartitionAtCurrentOffset(partitions)
+          .entrySet().stream().filter(e -> lastIngestedTimestamps.containsKey(e.getKey()))
+          .collect(
+              Collectors.toMap(
+                  Entry::getKey,
+                  e -> e.getValue() - lastIngestedTimestamps.get(e.getKey())
+              )
+          );

Review Comment:
   The older method used to end with `seekToLatest`; I wonder if we should do that here too.
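   For reference, with the plain Kafka consumer API the equivalent final step would look like this (a sketch against `KafkaConsumer`, not Druid's `RecordSupplier`):

   ```java
   import java.util.Collection;
   import org.apache.kafka.clients.consumer.KafkaConsumer;
   import org.apache.kafka.common.TopicPartition;

   public class ParkAtEnd
   {
     // Leave the consumer positioned at the log-end offsets once lag has been
     // measured, so the next poll cycle starts from the latest records.
     static void parkAtEnd(KafkaConsumer<byte[], byte[]> consumer, Collection<TopicPartition> partitions)
     {
       consumer.seekToEnd(partitions);
     }
   }
   ```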
########## extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java: ##########

@@ -380,6 +382,108 @@ public LagStats computeLagStats()

+      final Set<KafkaTopicPartition> yetToReadPartitions = new HashSet<>();
+      for (Map.Entry<KafkaTopicPartition, Long> entry : highestCurrentOffsets.entrySet()) {
+        if (partitionIds.contains(entry.getKey())) {

Review Comment:
   Rather than this, it would be more straightforward to iterate over the contents of `partitionIds`.
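   Something along these lines (a sketch of the suggested restructuring, reusing the PR's identifiers; note that it also treats partitions absent from `highestCurrentOffsets` as yet-to-read, which may or may not be the desired behavior):

   ```java
   final Set<KafkaTopicPartition> yetToReadPartitions = new HashSet<>();
   for (KafkaTopicPartition partitionId : partitionIds) {
     // A missing or zero offset means the tasks have not read this partition yet.
     final Long offset = highestCurrentOffsets.get(partitionId);
     if (offset == null || offset == 0) {
       yetToReadPartitions.add(partitionId);
     } else {
       recordSupplier.seek(new StreamPartition<>(getIoConfig().getStream(), partitionId), offset - 1);
     }
   }
   ```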
########## extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java: ##########

@@ -51,6 +52,7 @@ public class KafkaSupervisorIOConfig extends SeekableStreamSupervisorIOConfig

   private final KafkaConfigOverrides configOverrides;
   private final String topic;
   private final String topicPattern;
+  private final boolean publishTimeLag;

Review Comment:
   ```suggestion
     private final boolean emitTimeLagMetrics;
   ```

########## extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java: ##########

@@ -380,6 +382,108 @@ public LagStats computeLagStats()

+      final Map<KafkaTopicPartition, Long> lastIngestedTimestamps = getTimestampPerPartitionAtCurrentOffset(partitions);
+      // TODO: this might give wierd values for lag when the tasks are yet to start processing

Review Comment:
   Maybe remove the TODO for now; it is enough to call out the problem.
   ```suggestion
         // Note: this might give weird values for lag when the tasks are yet to start processing
   ```
########## extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java: ##########

@@ -380,6 +382,108 @@ public LagStats computeLagStats()

+      final Set<StreamPartition<KafkaTopicPartition>> partitions = partitionIds
+          .stream()
+          .map(e -> new StreamPartition<>(getIoConfig().getStream(), e))
+          .collect(Collectors.toSet());
+
+      final Set<KafkaTopicPartition> yetToReadPartitions = new HashSet<>();

Review Comment:
   Please add a comment before this line describing what is being done here.

########## extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java: ##########

@@ -380,6 +382,108 @@ public LagStats computeLagStats()

+      for (Map.Entry<KafkaTopicPartition, Long> entry : latestSequenceFromStream.entrySet()) {
+        // if there are no messages .getEndOffset would return 0, but if there are n msgs it would return n+1
+        // and hence we need to seek to n - 2 to get the nth msg in the next poll.
+        if (entry.getValue() != 0) {
+          recordSupplier.seek(new StreamPartition<>(getIoConfig().getStream(), entry.getKey()), entry.getValue() - 2);

Review Comment:
   I forget the details of the discussion, but I think we had come to the conclusion that this should be -1, right?
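   For reference, the arithmetic under standard Kafka semantics, where the end offset is the offset the next record will be assigned (`endOffset`, `consumer`, and `topicPartition` below are illustrative names):

   ```java
   // A partition holding n records stores them at offsets 0 .. n-1,
   // and the reported end offset is n (the offset of the next record to append).
   // So the last record lives at endOffset - 1, and seeking there makes
   // the next poll return exactly that last record:
   if (endOffset != 0) {
     consumer.seek(topicPartition, endOffset - 1);
   }
   ```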
########## extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java: ##########

@@ -380,6 +382,108 @@ public LagStats computeLagStats()

+      recordSupplier.seekToLatest(partitions);
+      // this method isn't actually computing the lag, just fetching the latests offsets from the stream. This is
+      // because we currently only have record lag for kafka, which can be lazily computed by subtracting the highest
+      // task offsets from the latest offsets from the stream when it is needed
+      latestSequenceFromStream = recordSupplier.getLatestSequenceNumbers(partitions);

Review Comment:
   This comment is not needed.
   ```suggestion
   ```

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org