kfaraz commented on code in PR #17735:
URL: https://github.com/apache/druid/pull/17735#discussion_r1961863814
##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -4506,6 +4510,21 @@ protected void emitNoticeProcessTime(String noticeType, long timeInMillis)
}
}
+  protected void emitUpdateOffsetsTime(long timeInMillis)
+  {
+    try {
+      emitter.emit(
+          ServiceMetricEvent.builder()
+              .setDimension("dataSource", dataSource)
Review Comment:
```suggestion
              .setDimension(DruidMetrics.DATASOURCE, dataSource)
              .setDimension(DruidMetrics.STREAM, getIoConfig().getStream())
```
##########
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java:
##########
@@ -151,6 +154,11 @@ public boolean isMultiTopic()
return topicPattern != null;
}
+ public boolean isPublishTimeLag()
Review Comment:
Please annotate with `@JsonProperty` and add a short javadoc.
##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -4506,6 +4510,21 @@ protected void emitNoticeProcessTime(String noticeType, long timeInMillis)
}
}
+  protected void emitUpdateOffsetsTime(long timeInMillis)
+  {
+    try {
+      emitter.emit(
+          ServiceMetricEvent.builder()
+              .setDimension("dataSource", dataSource)
+              .setDimensionIfNotNull(DruidMetrics.TAGS, spec.getContextValue(DruidMetrics.TAGS))
+              .setMetric("ingest/updateOffsets/time", timeInMillis)
Review Comment:
```suggestion
              .setMetric(StringUtils.format("ingest/%s/fetchOffsets/time", spec.getType()), timeInMillis)
```
##########
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java:
##########
@@ -102,6 +104,7 @@ public KafkaSupervisorIOConfig(
this.configOverrides = configOverrides;
this.topic = topic;
this.topicPattern = topicPattern;
+ this.publishTimeLag = publishTimeLag != null && publishTimeLag;
Review Comment:
```suggestion
this.publishTimeLag = Configs.valueOrDefault(publishTimeLag, false);
```
##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/OrderedPartitionableRecord.java:
##########
@@ -56,13 +68,19 @@ public OrderedPartitionableRecord(
this.partitionId = partitionId;
this.sequenceNumber = sequenceNumber;
this.data = data == null ? ImmutableList.of() : data;
+ this.timestamp = timestamp;
}
public String getStream()
{
return stream;
}
+ public long getTimestamp()
Review Comment:
Please add a short javadoc.
##########
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java:
##########
@@ -91,6 +93,7 @@ public class KafkaSupervisor extends SeekableStreamSupervisor<KafkaTopicPartitio
private final Pattern pattern;
private volatile Map<KafkaTopicPartition, Long> latestSequenceFromStream;
+ private volatile Map<KafkaTopicPartition, Long> lastestTimestampsFromStream;
Review Comment:
Since this represents the lag itself, rename it to something like:
```suggestion
private volatile Map<KafkaTopicPartition, Long> partitionToTimeLag;
```
##########
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java:
##########
@@ -380,6 +383,94 @@ public LagStats computeLagStats()
return computeLags(partitionRecordLag);
}
+  /**
+   * This method is similar to updatePartitionLagFromStream
+   * but also determines time lag. Once this method has been
+   * tested, we can remove the older one.
+   */
+  protected void updatePartitionTimeAndRecordLagFromStream()
+  {
+    final Map<KafkaTopicPartition, Long> highestCurrentOffsets = getHighestCurrentOffsets();
+
+    getRecordSupplierLock().lock();
+    try {
+      Set<KafkaTopicPartition> partitionIds;
+      try {
+        partitionIds = recordSupplier.getPartitionIds(getIoConfig().getStream());
+      }
+      catch (Exception e) {
+        log.warn("Could not fetch partitions for topic/stream [%s]", getIoConfig().getStream());
+        throw new StreamException(e);
+      }
+
+      for (Map.Entry<KafkaTopicPartition, Long> entry : highestCurrentOffsets.entrySet()) {
+        if (partitionIds.contains(entry.getKey()) && highestCurrentOffsets.get(entry.getKey()) != null) {
+          // since we need to consider the last arrived record at that sequence do a `-1`
+          recordSupplier.seek(new StreamPartition<>(getIoConfig().getStream(), entry.getKey()), entry.getValue() - 1);
+        }
+      }
+
+      final Map<KafkaTopicPartition, Long> lastIngestedTimestamps = CollectionUtils.mapValues(
+          getRecordPerPartitionAtCurrentOffset(partitionIds),
+          OrderedPartitionableRecord::getTimestamp
+      );
+
+      Set<StreamPartition<KafkaTopicPartition>> partitions = partitionIds
+          .stream()
+          .map(e -> new StreamPartition<>(getIoConfig().getStream(), e))
+          .collect(Collectors.toSet());
+
+      recordSupplier.seekToLatest(partitions);
+
+      // this method isn't actually computing the lag, just fetching the latest offsets from the stream. This is
+      // because we currently only have record lag for kafka, which can be lazily computed by subtracting the highest
+      // task offsets from the latest offsets from the stream when it is needed
+      latestSequenceFromStream =
+          partitions.stream().collect(Collectors.toMap(StreamPartition::getPartitionId, p -> recordSupplier.getPosition(p)));
+
+      // .position() gives next value to read, and we need seek by -2 to get the current record in next poll()
+      for (Map.Entry<KafkaTopicPartition, Long> entry : latestSequenceFromStream.entrySet()) {
+        recordSupplier.seek(new StreamPartition<>(getIoConfig().getStream(), entry.getKey()), entry.getValue() - 2);
+      }
+
+      lastestTimestampsFromStream = getRecordPerPartitionAtCurrentOffset(partitionIds)
+          .entrySet().stream().filter(e -> lastIngestedTimestamps.containsKey(e.getKey()))
+          .collect(
+              Collectors.toMap(
+                  Entry::getKey,
+                  e -> e.getValue().getTimestamp() - lastIngestedTimestamps.get(e.getKey())
+              )
+          );
+    }
+    catch (InterruptedException e) {
+      throw new StreamException(e);
+    }
+    finally {
+      getRecordSupplierLock().unlock();
+    }
+  }
+
+  private Map<KafkaTopicPartition, OrderedPartitionableRecord<KafkaTopicPartition, Long, KafkaRecordEntity>> getRecordPerPartitionAtCurrentOffset(Set<KafkaTopicPartition> partitions)
Review Comment:
Please add a short javadoc for this method.
##########
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java:
##########
@@ -380,6 +383,94 @@ public LagStats computeLagStats()
return computeLags(partitionRecordLag);
}
+  /**
+   * This method is similar to updatePartitionLagFromStream
+   * but also determines time lag. Once this method has been
+   * tested, we can remove the older one.
+   */
+  protected void updatePartitionTimeAndRecordLagFromStream()
+  {
+    final Map<KafkaTopicPartition, Long> highestCurrentOffsets = getHighestCurrentOffsets();
+
+    getRecordSupplierLock().lock();
+    try {
+      Set<KafkaTopicPartition> partitionIds;
+      try {
+        partitionIds = recordSupplier.getPartitionIds(getIoConfig().getStream());
+      }
+      catch (Exception e) {
+        log.warn("Could not fetch partitions for topic/stream [%s]", getIoConfig().getStream());
+        throw new StreamException(e);
+      }
+
+      for (Map.Entry<KafkaTopicPartition, Long> entry : highestCurrentOffsets.entrySet()) {
+        if (partitionIds.contains(entry.getKey()) && highestCurrentOffsets.get(entry.getKey()) != null) {
+          // since we need to consider the last arrived record at that sequence do a `-1`
+          recordSupplier.seek(new StreamPartition<>(getIoConfig().getStream(), entry.getKey()), entry.getValue() - 1);
+        }
+      }
+
+      final Map<KafkaTopicPartition, Long> lastIngestedTimestamps = CollectionUtils.mapValues(
+          getRecordPerPartitionAtCurrentOffset(partitionIds),
+          OrderedPartitionableRecord::getTimestamp
+      );
+
+      Set<StreamPartition<KafkaTopicPartition>> partitions = partitionIds
+          .stream()
+          .map(e -> new StreamPartition<>(getIoConfig().getStream(), e))
+          .collect(Collectors.toSet());
+
+      recordSupplier.seekToLatest(partitions);
+
+      // this method isn't actually computing the lag, just fetching the latest offsets from the stream. This is
+      // because we currently only have record lag for kafka, which can be lazily computed by subtracting the highest
+      // task offsets from the latest offsets from the stream when it is needed
+      latestSequenceFromStream =
+          partitions.stream().collect(Collectors.toMap(StreamPartition::getPartitionId, p -> recordSupplier.getPosition(p)));
+
+      // .position() gives next value to read, and we need seek by -2 to get the current record in next poll()
+      for (Map.Entry<KafkaTopicPartition, Long> entry : latestSequenceFromStream.entrySet()) {
+        recordSupplier.seek(new StreamPartition<>(getIoConfig().getStream(), entry.getKey()), entry.getValue() - 2);
+      }
+
+      lastestTimestampsFromStream = getRecordPerPartitionAtCurrentOffset(partitionIds)
+          .entrySet().stream().filter(e -> lastIngestedTimestamps.containsKey(e.getKey()))
+          .collect(
+              Collectors.toMap(
+                  Entry::getKey,
+                  e -> e.getValue().getTimestamp() - lastIngestedTimestamps.get(e.getKey())
+              )
+          );
+    }
+    catch (InterruptedException e) {
+      throw new StreamException(e);
+    }
+    finally {
+      getRecordSupplierLock().unlock();
+    }
+  }
+
+  private Map<KafkaTopicPartition, OrderedPartitionableRecord<KafkaTopicPartition, Long, KafkaRecordEntity>> getRecordPerPartitionAtCurrentOffset(Set<KafkaTopicPartition> partitions)
+  {
+    Map<KafkaTopicPartition, OrderedPartitionableRecord<KafkaTopicPartition, Long, KafkaRecordEntity>> result = new HashMap<>();
+    int maxPolls = 10;
Review Comment:
10 polls seems excessive; maybe we should start with a smaller number.
##########
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java:
##########
@@ -380,6 +383,94 @@ public LagStats computeLagStats()
return computeLags(partitionRecordLag);
}
+  /**
+   * This method is similar to updatePartitionLagFromStream
+   * but also determines time lag. Once this method has been
+   * tested, we can remove the older one.
+   */
+  protected void updatePartitionTimeAndRecordLagFromStream()
+  {
+    final Map<KafkaTopicPartition, Long> highestCurrentOffsets = getHighestCurrentOffsets();
+
+    getRecordSupplierLock().lock();
+    try {
+      Set<KafkaTopicPartition> partitionIds;
+      try {
+        partitionIds = recordSupplier.getPartitionIds(getIoConfig().getStream());
+      }
+      catch (Exception e) {
+        log.warn("Could not fetch partitions for topic/stream [%s]", getIoConfig().getStream());
+        throw new StreamException(e);
+      }
+
+      for (Map.Entry<KafkaTopicPartition, Long> entry : highestCurrentOffsets.entrySet()) {
+        if (partitionIds.contains(entry.getKey()) && highestCurrentOffsets.get(entry.getKey()) != null) {
+          // since we need to consider the last arrived record at that sequence do a `-1`
+          recordSupplier.seek(new StreamPartition<>(getIoConfig().getStream(), entry.getKey()), entry.getValue() - 1);
+        }
+      }
+
+      final Map<KafkaTopicPartition, Long> lastIngestedTimestamps = CollectionUtils.mapValues(
+          getRecordPerPartitionAtCurrentOffset(partitionIds),
+          OrderedPartitionableRecord::getTimestamp
+      );
+
+      Set<StreamPartition<KafkaTopicPartition>> partitions = partitionIds
+          .stream()
+          .map(e -> new StreamPartition<>(getIoConfig().getStream(), e))
+          .collect(Collectors.toSet());
+
+      recordSupplier.seekToLatest(partitions);
+
+      // this method isn't actually computing the lag, just fetching the latest offsets from the stream. This is
+      // because we currently only have record lag for kafka, which can be lazily computed by subtracting the highest
+      // task offsets from the latest offsets from the stream when it is needed
+      latestSequenceFromStream =
+          partitions.stream().collect(Collectors.toMap(StreamPartition::getPartitionId, p -> recordSupplier.getPosition(p)));
+
+      // .position() gives next value to read, and we need seek by -2 to get the current record in next poll()
+      for (Map.Entry<KafkaTopicPartition, Long> entry : latestSequenceFromStream.entrySet()) {
+        recordSupplier.seek(new StreamPartition<>(getIoConfig().getStream(), entry.getKey()), entry.getValue() - 2);
+      }
+
+      lastestTimestampsFromStream = getRecordPerPartitionAtCurrentOffset(partitionIds)
+          .entrySet().stream().filter(e -> lastIngestedTimestamps.containsKey(e.getKey()))
+          .collect(
+              Collectors.toMap(
+                  Entry::getKey,
+                  e -> e.getValue().getTimestamp() - lastIngestedTimestamps.get(e.getKey())
+              )
+          );
+    }
+    catch (InterruptedException e) {
+      throw new StreamException(e);
+    }
+    finally {
+      getRecordSupplierLock().unlock();
+    }
+  }
+
+  private Map<KafkaTopicPartition, OrderedPartitionableRecord<KafkaTopicPartition, Long, KafkaRecordEntity>> getRecordPerPartitionAtCurrentOffset(Set<KafkaTopicPartition> partitions)
+  {
+    Map<KafkaTopicPartition, OrderedPartitionableRecord<KafkaTopicPartition, Long, KafkaRecordEntity>> result = new HashMap<>();
+    int maxPolls = 10;
+    while (maxPolls-- > 0) {
+      for (OrderedPartitionableRecord<KafkaTopicPartition, Long, KafkaRecordEntity> record : recordSupplier.poll(getIoConfig().getPollTimeout())) {
+        if (!result.containsKey(record.getPartitionId())) {
+          result.put(record.getPartitionId(), record);
+          if (partitions.size() == result.size()) {
+            break;
+          }
+        }
+      }
+      if (partitions.size() == result.size()) {
+        break;
+      }
+    }
+
+    return result;
Review Comment:
If we couldn't find the timestamp for any partition, maybe we should log
those partition IDs here.
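A rough sketch of that check, with plain JDK collections standing in for the Druid types (the helper name `findMissingPartitions` is hypothetical):

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class MissingPartitionCheck
{
  /**
   * Returns the partition ids that were requested but yielded no record,
   * so the caller can log them before returning a partial result.
   */
  public static <P, R> Set<P> findMissingPartitions(Set<P> requested, Map<P, R> polled)
  {
    final Set<P> missing = new HashSet<>(requested);
    missing.removeAll(polled.keySet());
    return missing;
  }
}
```

The caller could then emit a single warning listing the missing ids whenever the set is non-empty, rather than one log line per partition.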
##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -4506,6 +4510,21 @@ protected void emitNoticeProcessTime(String noticeType, long timeInMillis)
}
}
+ protected void emitUpdateOffsetsTime(long timeInMillis)
Review Comment:
Why protected, can it be private instead?
##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/RecordSupplier.java:
##########
@@ -137,4 +137,13 @@ boolean isOffsetAvailable(StreamPartition<PartitionIdType> partition,
*/
@Override
void close();
+
+  /**
+   * Gets the timestamp for record at a particular offset.
+   *
+   * @param partition target partition
+   * @param offset    target offset
+   * @param timeout   poll timeout
+   */
+  long getTimeAtOffset(StreamPartition<PartitionIdType> partition, OrderedSequenceNumber<SequenceOffsetType> offset, long timeout);
Review Comment:
I suppose this method (and its implementations) can be removed now.
##########
docs/operations/metrics.md:
##########
@@ -271,6 +271,7 @@ batch ingestion emit the following metrics. These metrics are deltas for each em
|`ingest/pause/time`|Milliseconds spent by a task in a paused state without ingesting.|`dataSource`, `taskId`, `tags`| < 10 seconds|
|`ingest/handoff/time`|Total number of milliseconds taken to handoff a set of segments.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|Depends on the coordinator cycle time.|
|`task/autoScaler/requiredCount`|Count of required tasks based on the calculations of `lagBased` auto scaler.|`dataSource`, `stream`, `scalingSkipReason`|Depends on auto scaler config.|
+|`ingest/updateOffsets/time`|Time (in milliseconds) taken to update the offsets.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|Generally a few seconds at most.|
Review Comment:
Please move this metric to the Kafka ingestion section and add a corresponding metric for Kinesis ingestion too.
Use the stream type (`kafka` or `kinesis`) in the metric name to correspond with other metrics such as `ingest/kafka/lag`.
```suggestion
|`ingest/kafka/updateOffsets/time`|Total time (in milliseconds) taken to fetch the latest offsets from Kafka stream and the ingestion tasks.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|Generally a few seconds at most.|
```
##########
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java:
##########
@@ -380,6 +383,94 @@ public LagStats computeLagStats()
return computeLags(partitionRecordLag);
}
+  /**
+   * This method is similar to updatePartitionLagFromStream
+   * but also determines time lag. Once this method has been
+   * tested, we can remove the older one.
+   */
+  protected void updatePartitionTimeAndRecordLagFromStream()
+  {
+    final Map<KafkaTopicPartition, Long> highestCurrentOffsets = getHighestCurrentOffsets();
+
+    getRecordSupplierLock().lock();
+    try {
+      Set<KafkaTopicPartition> partitionIds;
+      try {
+        partitionIds = recordSupplier.getPartitionIds(getIoConfig().getStream());
+      }
+      catch (Exception e) {
+        log.warn("Could not fetch partitions for topic/stream [%s]", getIoConfig().getStream());
+        throw new StreamException(e);
+      }
+
+      for (Map.Entry<KafkaTopicPartition, Long> entry : highestCurrentOffsets.entrySet()) {
+        if (partitionIds.contains(entry.getKey()) && highestCurrentOffsets.get(entry.getKey()) != null) {
+          // since we need to consider the last arrived record at that sequence do a `-1`
+          recordSupplier.seek(new StreamPartition<>(getIoConfig().getStream(), entry.getKey()), entry.getValue() - 1);
+        }
+      }
+
+      final Map<KafkaTopicPartition, Long> lastIngestedTimestamps = CollectionUtils.mapValues(
+          getRecordPerPartitionAtCurrentOffset(partitionIds),
+          OrderedPartitionableRecord::getTimestamp
+      );
+
+      Set<StreamPartition<KafkaTopicPartition>> partitions = partitionIds
+          .stream()
+          .map(e -> new StreamPartition<>(getIoConfig().getStream(), e))
+          .collect(Collectors.toSet());
+
+      recordSupplier.seekToLatest(partitions);
+
+      // this method isn't actually computing the lag, just fetching the latest offsets from the stream. This is
+      // because we currently only have record lag for kafka, which can be lazily computed by subtracting the highest
+      // task offsets from the latest offsets from the stream when it is needed
+      latestSequenceFromStream =
+          partitions.stream().collect(Collectors.toMap(StreamPartition::getPartitionId, p -> recordSupplier.getPosition(p)));
+
+      // .position() gives next value to read, and we need seek by -2 to get the current record in next poll()
+      for (Map.Entry<KafkaTopicPartition, Long> entry : latestSequenceFromStream.entrySet()) {
+        recordSupplier.seek(new StreamPartition<>(getIoConfig().getStream(), entry.getKey()), entry.getValue() - 2);
+      }
+
+      lastestTimestampsFromStream = getRecordPerPartitionAtCurrentOffset(partitionIds)
+          .entrySet().stream().filter(e -> lastIngestedTimestamps.containsKey(e.getKey()))
+          .collect(
+              Collectors.toMap(
+                  Entry::getKey,
+                  e -> e.getValue().getTimestamp() - lastIngestedTimestamps.get(e.getKey())
+              )
+          );
+    }
+    catch (InterruptedException e) {
+      throw new StreamException(e);
+    }
+    finally {
+      getRecordSupplierLock().unlock();
+    }
+  }
+
+  private Map<KafkaTopicPartition, OrderedPartitionableRecord<KafkaTopicPartition, Long, KafkaRecordEntity>> getRecordPerPartitionAtCurrentOffset(Set<KafkaTopicPartition> partitions)
+  {
+    Map<KafkaTopicPartition, OrderedPartitionableRecord<KafkaTopicPartition, Long, KafkaRecordEntity>> result = new HashMap<>();
+    int maxPolls = 10;
+    while (maxPolls-- > 0) {
+      for (OrderedPartitionableRecord<KafkaTopicPartition, Long, KafkaRecordEntity> record : recordSupplier.poll(getIoConfig().getPollTimeout())) {
Review Comment:
Before the next poll, should we unassign some partitions from the consumer
to increase the likelihood of getting records from the required partitions?
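The narrowing idea could look something like this sketch, where plain JDK types stand in for the consumer and the Druid record classes, and `pollFn` is a hypothetical stand-in for re-assigning and polling the consumer:

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

public class NarrowingPoller
{
  /**
   * Polls until every partition has yielded one record or maxPolls runs out,
   * shrinking the target set each round so that later polls only cover the
   * partitions still missing a record.
   */
  public static <P, R> Map<P, R> pollFirstRecordPerPartition(
      Set<P> partitions,
      Function<Set<P>, List<Map.Entry<P, R>>> pollFn, // stand-in for assign + poll on a consumer
      int maxPolls
  )
  {
    final Map<P, R> result = new HashMap<>();
    final Set<P> remaining = new HashSet<>(partitions);
    while (maxPolls-- > 0 && !remaining.isEmpty()) {
      for (Map.Entry<P, R> record : pollFn.apply(remaining)) {
        if (remaining.contains(record.getKey())) {
          result.put(record.getKey(), record.getValue());
        }
      }
      // drop satisfied partitions so the next poll only targets the rest
      remaining.removeAll(result.keySet());
    }
    return result;
  }
}
```

Shrinking the assignment between polls avoids burning the poll budget on partitions that have already produced a record.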
##########
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java:
##########
@@ -380,6 +383,94 @@ public LagStats computeLagStats()
return computeLags(partitionRecordLag);
}
+  /**
+   * This method is similar to updatePartitionLagFromStream
+   * but also determines time lag. Once this method has been
+   * tested, we can remove the older one.
+   */
+  protected void updatePartitionTimeAndRecordLagFromStream()
+  {
+    final Map<KafkaTopicPartition, Long> highestCurrentOffsets = getHighestCurrentOffsets();
+
+    getRecordSupplierLock().lock();
+    try {
+      Set<KafkaTopicPartition> partitionIds;
+      try {
+        partitionIds = recordSupplier.getPartitionIds(getIoConfig().getStream());
+      }
+      catch (Exception e) {
+        log.warn("Could not fetch partitions for topic/stream [%s]", getIoConfig().getStream());
+        throw new StreamException(e);
+      }
+
+      for (Map.Entry<KafkaTopicPartition, Long> entry : highestCurrentOffsets.entrySet()) {
+        if (partitionIds.contains(entry.getKey()) && highestCurrentOffsets.get(entry.getKey()) != null) {
+          // since we need to consider the last arrived record at that sequence do a `-1`
+          recordSupplier.seek(new StreamPartition<>(getIoConfig().getStream(), entry.getKey()), entry.getValue() - 1);
+        }
+      }
+
+      final Map<KafkaTopicPartition, Long> lastIngestedTimestamps = CollectionUtils.mapValues(
+          getRecordPerPartitionAtCurrentOffset(partitionIds),
+          OrderedPartitionableRecord::getTimestamp
+      );
+
+      Set<StreamPartition<KafkaTopicPartition>> partitions = partitionIds
+          .stream()
+          .map(e -> new StreamPartition<>(getIoConfig().getStream(), e))
+          .collect(Collectors.toSet());
+
+      recordSupplier.seekToLatest(partitions);
+
+      // this method isn't actually computing the lag, just fetching the latest offsets from the stream. This is
+      // because we currently only have record lag for kafka, which can be lazily computed by subtracting the highest
+      // task offsets from the latest offsets from the stream when it is needed
+      latestSequenceFromStream =
+          partitions.stream().collect(Collectors.toMap(StreamPartition::getPartitionId, p -> recordSupplier.getPosition(p)));
Review Comment:
Rather than a call to `seekToLatest()` and multiple calls to
`getPosition()`, we could add a new method
`KafkaRecordSupplier.getEndOffsets()` which just calls
`consumer.getEndOffsets()`.
##########
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java:
##########
@@ -380,6 +383,94 @@ public LagStats computeLagStats()
return computeLags(partitionRecordLag);
}
+  /**
+   * This method is similar to updatePartitionLagFromStream
+   * but also determines time lag. Once this method has been
+   * tested, we can remove the older one.
+   */
+  protected void updatePartitionTimeAndRecordLagFromStream()
+  {
+    final Map<KafkaTopicPartition, Long> highestCurrentOffsets = getHighestCurrentOffsets();
+
+    getRecordSupplierLock().lock();
+    try {
+      Set<KafkaTopicPartition> partitionIds;
+      try {
+        partitionIds = recordSupplier.getPartitionIds(getIoConfig().getStream());
+      }
+      catch (Exception e) {
+        log.warn("Could not fetch partitions for topic/stream [%s]", getIoConfig().getStream());
+        throw new StreamException(e);
+      }
+
+      for (Map.Entry<KafkaTopicPartition, Long> entry : highestCurrentOffsets.entrySet()) {
+        if (partitionIds.contains(entry.getKey()) && highestCurrentOffsets.get(entry.getKey()) != null) {
+          // since we need to consider the last arrived record at that sequence do a `-1`
+          recordSupplier.seek(new StreamPartition<>(getIoConfig().getStream(), entry.getKey()), entry.getValue() - 1);
+        }
+      }
+
+      final Map<KafkaTopicPartition, Long> lastIngestedTimestamps = CollectionUtils.mapValues(
+          getRecordPerPartitionAtCurrentOffset(partitionIds),
+          OrderedPartitionableRecord::getTimestamp
+      );
+
+      Set<StreamPartition<KafkaTopicPartition>> partitions = partitionIds
+          .stream()
+          .map(e -> new StreamPartition<>(getIoConfig().getStream(), e))
+          .collect(Collectors.toSet());
+
+      recordSupplier.seekToLatest(partitions);
+
+      // this method isn't actually computing the lag, just fetching the latest offsets from the stream. This is
+      // because we currently only have record lag for kafka, which can be lazily computed by subtracting the highest
+      // task offsets from the latest offsets from the stream when it is needed
+      latestSequenceFromStream =
+          partitions.stream().collect(Collectors.toMap(StreamPartition::getPartitionId, p -> recordSupplier.getPosition(p)));
+
+      // .position() gives next value to read, and we need seek by -2 to get the current record in next poll()
+      for (Map.Entry<KafkaTopicPartition, Long> entry : latestSequenceFromStream.entrySet()) {
+        recordSupplier.seek(new StreamPartition<>(getIoConfig().getStream(), entry.getKey()), entry.getValue() - 2);
+      }
+
+      lastestTimestampsFromStream = getRecordPerPartitionAtCurrentOffset(partitionIds)
+          .entrySet().stream().filter(e -> lastIngestedTimestamps.containsKey(e.getKey()))
+          .collect(
+              Collectors.toMap(
+                  Entry::getKey,
+                  e -> e.getValue().getTimestamp() - lastIngestedTimestamps.get(e.getKey())
+              )
+          );
+    }
+    catch (InterruptedException e) {
+      throw new StreamException(e);
+    }
+    finally {
+      getRecordSupplierLock().unlock();
+    }
+  }
+
+  private Map<KafkaTopicPartition, OrderedPartitionableRecord<KafkaTopicPartition, Long, KafkaRecordEntity>> getRecordPerPartitionAtCurrentOffset(Set<KafkaTopicPartition> partitions)
+  {
+    Map<KafkaTopicPartition, OrderedPartitionableRecord<KafkaTopicPartition, Long, KafkaRecordEntity>> result = new HashMap<>();
Review Comment:
I guess this method need not return the whole record, it could just return
the timestamp for every partition.
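A timestamps-only variant might look like this sketch, where `PolledRecord` is a hypothetical stand-in for the Druid record type:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TimestampCollector
{
  /** Minimal stand-in for a polled record: a partition id plus the record timestamp. */
  public static class PolledRecord<P>
  {
    final P partitionId;
    final long timestamp;

    public PolledRecord(P partitionId, long timestamp)
    {
      this.partitionId = partitionId;
      this.timestamp = timestamp;
    }
  }

  /**
   * Keeps only the first timestamp seen per partition, so callers that just
   * need time lag never hold on to whole records or their payloads.
   */
  public static <P> Map<P, Long> firstTimestampPerPartition(List<PolledRecord<P>> polled)
  {
    final Map<P, Long> result = new HashMap<>();
    for (PolledRecord<P> record : polled) {
      result.putIfAbsent(record.partitionId, record.timestamp);
    }
    return result;
  }
}
```

Returning `Map<KafkaTopicPartition, Long>` would also let both call sites in the method above consume the result directly, without the extra `CollectionUtils.mapValues` step.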
##########
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java:
##########
@@ -380,6 +383,94 @@ public LagStats computeLagStats()
return computeLags(partitionRecordLag);
}
+  /**
+   * This method is similar to updatePartitionLagFromStream
+   * but also determines time lag. Once this method has been
+   * tested, we can remove the older one.
+   */
+  protected void updatePartitionTimeAndRecordLagFromStream()
Review Comment:
This method can be private.
##########
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java:
##########
@@ -72,7 +73,8 @@ public KafkaSupervisorIOConfig(
     @JsonProperty("lateMessageRejectionStartDateTime") DateTime lateMessageRejectionStartDateTime,
     @JsonProperty("configOverrides") KafkaConfigOverrides configOverrides,
     @JsonProperty("idleConfig") IdleConfig idleConfig,
-    @JsonProperty("stopTaskCount") Integer stopTaskCount
+    @JsonProperty("stopTaskCount") Integer stopTaskCount,
+    @JsonProperty("publishTimeLag") Boolean publishTimeLag
Review Comment:
```suggestion
@JsonProperty("publishTimeLag") @Nullable Boolean publishTimeLag
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
For additional commands, e-mail: [email protected]