cecemei commented on code in PR #18750:
URL: https://github.com/apache/druid/pull/18750#discussion_r2587006387


##########
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java:
##########
@@ -172,15 +172,15 @@ protected 
SeekableStreamSupervisorReportPayload<KafkaTopicPartition, Long> creat
   )
   {
     KafkaSupervisorIOConfig ioConfig = spec.getIoConfig();
-    Map<KafkaTopicPartition, Long> partitionLag = 
getRecordLagPerPartitionInLatestSequences(getHighestCurrentOffsets());
+    Map<KafkaTopicPartition, Long> partitionLag = 
getRecordLagPerPartitionInLatestSequences();

Review Comment:
   maybe pass `OffsetSnapshot` as param so that what's used in the lag 
calculation is consistent with line 183?



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -177,6 +178,50 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
   // Internal data structures
   // --------------------------------------------------------
 
+  protected static class OffsetSnapshot<PartitionIdType, SequenceOffsetType>
+  {
+    private final ImmutableMap<PartitionIdType, SequenceOffsetType> 
currentOffsets;
+    private final ImmutableMap<PartitionIdType, SequenceOffsetType> endOffsets;
+
+    public OffsetSnapshot(
+        @Nullable Map<PartitionIdType, SequenceOffsetType> currentOffsets,
+        @Nullable Map<PartitionIdType, SequenceOffsetType> endOffsets
+    )
+    {
+      this.currentOffsets = toImmutableOffsetMap(currentOffsets);

Review Comment:
    It’s not clear to me under what circumstances the inputs could be null. 
Could you document the scenarios where this might occur?
    



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -177,6 +178,50 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
   // Internal data structures
   // --------------------------------------------------------
 
+  protected static class OffsetSnapshot<PartitionIdType, SequenceOffsetType>
+  {
+    private final ImmutableMap<PartitionIdType, SequenceOffsetType> 
currentOffsets;
+    private final ImmutableMap<PartitionIdType, SequenceOffsetType> endOffsets;
+
+    public OffsetSnapshot(
+        @Nullable Map<PartitionIdType, SequenceOffsetType> currentOffsets,
+        @Nullable Map<PartitionIdType, SequenceOffsetType> endOffsets
+    )
+    {
+      this.currentOffsets = toImmutableOffsetMap(currentOffsets);
+      this.endOffsets = toImmutableOffsetMap(endOffsets);
+    }
+
+    private ImmutableMap<PartitionIdType, SequenceOffsetType> 
toImmutableOffsetMap(
+        @Nullable Map<PartitionIdType, SequenceOffsetType> input
+    )
+    {
+      if (input == null || input.isEmpty()) {
+        return ImmutableMap.of();
+      }
+
+      return input.entrySet().stream()

Review Comment:
   nit: looks like you just want to remove null value from the map, in which 
case `Maps.filterValues` could be cleaner.



##########
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java:
##########
@@ -316,19 +321,21 @@ private Map<KafkaTopicPartition, Long> 
getRecordLagPerPartitionInLatestSequences
   @Override
   protected Map<KafkaTopicPartition, Long> 
getRecordLagPerPartition(Map<KafkaTopicPartition, Long> currentOffsets)
   {
-    if (latestSequenceFromStream == null || currentOffsets == null) {
+    Map<KafkaTopicPartition, Long> endOffsets = 
offsetSnapshotRef.get().getEndOffsets();
+
+    if (endOffsets == null || currentOffsets == null) {

Review Comment:
   same here, can this be null?



##########
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java:
##########
@@ -294,13 +295,17 @@ protected Map<KafkaTopicPartition, Long> 
getPartitionTimeLag()
   // suppress use of CollectionUtils.mapValues() since the valueMapper 
function is dependent on map key here
   @SuppressWarnings("SSBasedInspection")
   // Used while calculating cummulative lag for entire stream
-  private Map<KafkaTopicPartition, Long> 
getRecordLagPerPartitionInLatestSequences(Map<KafkaTopicPartition, Long> 
currentOffsets)
+  private Map<KafkaTopicPartition, Long> 
getRecordLagPerPartitionInLatestSequences()
   {
-    if (latestSequenceFromStream == null) {
+    OffsetSnapshot<KafkaTopicPartition, Long> offsetSnapshot = 
offsetSnapshotRef.get();
+    Map<KafkaTopicPartition, Long> currentOffsets = 
offsetSnapshot.getCurrentOffsets();
+    Map<KafkaTopicPartition, Long> endOffsets = offsetSnapshot.getEndOffsets();
+
+    if (endOffsets == null) {

Review Comment:
   Can `offsetSnapshot.getEndOffsets()` be null? I thought you have already 
replaced null with empty map?



##########
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java:
##########
@@ -172,15 +172,15 @@ protected 
SeekableStreamSupervisorReportPayload<KafkaTopicPartition, Long> creat
   )
   {
     KafkaSupervisorIOConfig ioConfig = spec.getIoConfig();
-    Map<KafkaTopicPartition, Long> partitionLag = 
getRecordLagPerPartitionInLatestSequences(getHighestCurrentOffsets());
+    Map<KafkaTopicPartition, Long> partitionLag = 
getRecordLagPerPartitionInLatestSequences();
     return new KafkaSupervisorReportPayload(
         spec.getId(),
         spec.getDataSchema().getDataSource(),
         ioConfig.getStream(),
         numPartitions,
         ioConfig.getReplicas(),
         ioConfig.getTaskDuration().getMillis() / 1000,
-        includeOffsets ? latestSequenceFromStream : null,
+        includeOffsets ? offsetSnapshotRef.get().getEndOffsets() : null,

Review Comment:
   We're replacing `latestSequenceFromStream` with `offsetSnapshotRef` right? 
Maybe remove `latestSequenceFromStream` as class level variable?



##########
indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java:
##########


Review Comment:
   Have you considered adding some more complicated test cases to 
`KafkaSupervisorTest`? I'm not sure if it's feasible, thinking maybe we could 
use recordSupplier to mock the end_offset, indexerMetadataStorageCoordinator to 
mock current_offset, and test the lags the system emits. I'm not very familiar 
with this part of code base, so could be wrong. 



##########
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java:
##########
@@ -526,6 +539,12 @@ protected void updatePartitionLagFromStream()
 
       latestSequenceFromStream =
           
partitions.stream().collect(Collectors.toMap(StreamPartition::getPartitionId, 
recordSupplier::getPosition));
+
+      OffsetSnapshot<KafkaTopicPartition, Long> snapshot = new 
OffsetSnapshot<>(

Review Comment:
   IIUC, we might want to get the current offsets before trying to get the end 
offset, to minimize the chance of current_offset > end_offset, i.e. move line 
544 to 521. I would not call this an atomic update since they're two separate 
processes, but may be worth call out this in the javadoc for `OffsetSnapshot`, 
this ensures end_offset > current_offset, and and reduce the issue of negative 
lag...



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -177,6 +178,50 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
   // Internal data structures
   // --------------------------------------------------------
 
+  protected static class OffsetSnapshot<PartitionIdType, SequenceOffsetType>
+  {
+    private final ImmutableMap<PartitionIdType, SequenceOffsetType> 
currentOffsets;
+    private final ImmutableMap<PartitionIdType, SequenceOffsetType> endOffsets;
+
+    public OffsetSnapshot(
+        @Nullable Map<PartitionIdType, SequenceOffsetType> currentOffsets,
+        @Nullable Map<PartitionIdType, SequenceOffsetType> endOffsets
+    )
+    {
+      this.currentOffsets = toImmutableOffsetMap(currentOffsets);
+      this.endOffsets = toImmutableOffsetMap(endOffsets);
+    }
+
+    private ImmutableMap<PartitionIdType, SequenceOffsetType> 
toImmutableOffsetMap(
+        @Nullable Map<PartitionIdType, SequenceOffsetType> input
+    )
+    {
+      if (input == null || input.isEmpty()) {
+        return ImmutableMap.of();
+      }
+
+      return input.entrySet().stream()
+                  .filter(e -> e.getValue() != null)
+                  .collect(ImmutableMap.toImmutableMap(
+                      Map.Entry::getKey,
+                      Map.Entry::getValue
+                  ));
+    }
+
+    public ImmutableMap<PartitionIdType, SequenceOffsetType> 
getCurrentOffsets()
+    {
+      return currentOffsets;
+    }
+
+    public ImmutableMap<PartitionIdType, SequenceOffsetType> getEndOffsets()
+    {
+      return endOffsets;
+    }
+  }
+
+  protected final AtomicReference<OffsetSnapshot<PartitionIdType, 
SequenceOffsetType>> offsetSnapshotRef =

Review Comment:
   Is there any specific reason why you decided to put `offsetSnapshotRef` in 
`SeekableStreamSupervisor` instead of `KafkaSupervisor`? Maybe it's more 
suitable there for the sake of encapsulation.



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -177,6 +178,50 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
   // Internal data structures
   // --------------------------------------------------------
 
+  protected static class OffsetSnapshot<PartitionIdType, SequenceOffsetType>

Review Comment:
   Worth adding some javadoc explaining the decision of putting current_offset 
and end_offset in one class, how it might affect the lag metrics etc... 



##########
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java:
##########
@@ -261,14 +261,15 @@ protected 
List<SeekableStreamIndexTask<KafkaTopicPartition, Long, KafkaRecordEnt
   @Override
   protected Map<KafkaTopicPartition, Long> getPartitionRecordLag()
   {
-    Map<KafkaTopicPartition, Long> highestCurrentOffsets = 
getHighestCurrentOffsets();
+    OffsetSnapshot<KafkaTopicPartition, Long> offsetSnapshot = 
offsetSnapshotRef.get();
+    Map<KafkaTopicPartition, Long> endOffsets = offsetSnapshot.getEndOffsets();
 
-    if (latestSequenceFromStream == null) {
+    if (endOffsets == null) {

Review Comment:
   same here, this be null?



##########
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java:
##########
@@ -294,13 +295,17 @@ protected Map<KafkaTopicPartition, Long> 
getPartitionTimeLag()
   // suppress use of CollectionUtils.mapValues() since the valueMapper 
function is dependent on map key here
   @SuppressWarnings("SSBasedInspection")
   // Used while calculating cummulative lag for entire stream
-  private Map<KafkaTopicPartition, Long> 
getRecordLagPerPartitionInLatestSequences(Map<KafkaTopicPartition, Long> 
currentOffsets)
+  private Map<KafkaTopicPartition, Long> 
getRecordLagPerPartitionInLatestSequences()
   {
-    if (latestSequenceFromStream == null) {
+    OffsetSnapshot<KafkaTopicPartition, Long> offsetSnapshot = 
offsetSnapshotRef.get();
+    Map<KafkaTopicPartition, Long> currentOffsets = 
offsetSnapshot.getCurrentOffsets();
+    Map<KafkaTopicPartition, Long> endOffsets = offsetSnapshot.getEndOffsets();
+
+    if (endOffsets == null) {
       return Collections.emptyMap();
     }
 
-    return latestSequenceFromStream
+    return endOffsets

Review Comment:
   Can `e.getValue()` be null? I thought you have removed all null values from 
the map. Also, we could use `currentOffsets.getOrDefault` instead of the 
nullable stuff. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to