wuguowei1994 commented on code in PR #18750:
URL: https://github.com/apache/druid/pull/18750#discussion_r2588761310
##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -177,6 +178,50 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
// Internal data structures
// --------------------------------------------------------
+ protected static class OffsetSnapshot<PartitionIdType, SequenceOffsetType>
+ {
+ private final ImmutableMap<PartitionIdType, SequenceOffsetType> currentOffsets;
+ private final ImmutableMap<PartitionIdType, SequenceOffsetType> endOffsets;
+
+ public OffsetSnapshot(
+ @Nullable Map<PartitionIdType, SequenceOffsetType> currentOffsets,
+ @Nullable Map<PartitionIdType, SequenceOffsetType> endOffsets
+ )
+ {
+ this.currentOffsets = toImmutableOffsetMap(currentOffsets);
+ this.endOffsets = toImmutableOffsetMap(endOffsets);
+ }
+
+ private ImmutableMap<PartitionIdType, SequenceOffsetType> toImmutableOffsetMap(
+ @Nullable Map<PartitionIdType, SequenceOffsetType> input
+ )
+ {
+ if (input == null || input.isEmpty()) {
+ return ImmutableMap.of();
+ }
+
+ return input.entrySet().stream()
+ .filter(e -> e.getValue() != null)
+ .collect(ImmutableMap.toImmutableMap(
+ Map.Entry::getKey,
+ Map.Entry::getValue
+ ));
+ }
+
+ public ImmutableMap<PartitionIdType, SequenceOffsetType> getCurrentOffsets()
+ {
+ return currentOffsets;
+ }
+
+ public ImmutableMap<PartitionIdType, SequenceOffsetType> getEndOffsets()
+ {
+ return endOffsets;
+ }
+ }
+
+ protected final AtomicReference<OffsetSnapshot<PartitionIdType, SequenceOffsetType>> offsetSnapshotRef =
Review Comment:
@cecemei
`SeekableStreamSupervisor` is the parent class, and it has three subclasses:
`KafkaSupervisor`, `KinesisSupervisor`, and `RabbitStreamSupervisor`.
The negative-lag issue mentioned in my PR isn’t specific to Kafka; it is likely
present in the other two subclasses as well.
When writing the code, I had the same concern. If we put `offsetSnapshotRef`
directly into the `KafkaSupervisor` subclass, then in the future, if someone
wants to fix the same issue in `KinesisSupervisor` or `RabbitStreamSupervisor`,
they would need to redefine `offsetSnapshotRef` in each subclass.
**The ideal approach would be:**
1. Define `offsetSnapshotRef` in the parent class.
2. Fix the negative-lag issue in all three subclasses.
If you agree, I can update the PR along these lines. What do you think?
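A rough sketch of that layout, in plain Java rather than Druid code: the snapshot reference lives once in the parent, and a subclass computes lag from a single read of it, so current and end offsets presumably never come from two different updates. `StreamSupervisorSketch`, `KafkaLikeSupervisor`, `publishOffsets` and `computeLag` are hypothetical names, and `java.util`'s `Map.copyOf` (Java 10+) stands in for Guava's `ImmutableMap`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for SeekableStreamSupervisor; not Druid's real class.
abstract class StreamSupervisorSketch<P, O>
{
  // Immutable pair of offset maps published together (mirrors OffsetSnapshot in the diff,
  // but uses java.util's Map.copyOf instead of Guava's ImmutableMap).
  static final class OffsetSnapshot<P, O>
  {
    final Map<P, O> currentOffsets;
    final Map<P, O> endOffsets;

    OffsetSnapshot(Map<P, O> currentOffsets, Map<P, O> endOffsets)
    {
      this.currentOffsets = copyNonNull(currentOffsets);
      this.endOffsets = copyNonNull(endOffsets);
    }

    // Drops null entries (Map.copyOf rejects null keys and values) and returns an unmodifiable copy.
    private static <K, V> Map<K, V> copyNonNull(Map<K, V> input)
    {
      Map<K, V> out = new HashMap<>();
      if (input != null) {
        input.forEach((k, v) -> {
          if (k != null && v != null) {
            out.put(k, v);
          }
        });
      }
      return Map.copyOf(out);
    }
  }

  // Declared once in the parent, so every subclass shares the same field.
  protected final AtomicReference<OffsetSnapshot<P, O>> offsetSnapshotRef =
      new AtomicReference<>(new OffsetSnapshot<P, O>(null, null));

  // Hypothetical helper: replaces both maps with a single atomic write.
  protected void publishOffsets(Map<P, O> currentOffsets, Map<P, O> endOffsets)
  {
    offsetSnapshotRef.set(new OffsetSnapshot<>(currentOffsets, endOffsets));
  }
}

// Hypothetical Kafka-style subclass with Integer partitions and Long offsets.
class KafkaLikeSupervisor extends StreamSupervisorSketch<Integer, Long>
{
  // Reads the reference once, so current and end offsets come from the same snapshot.
  Map<Integer, Long> computeLag()
  {
    OffsetSnapshot<Integer, Long> snapshot = offsetSnapshotRef.get();
    Map<Integer, Long> lag = new HashMap<>();
    snapshot.endOffsets.forEach((partition, end) -> {
      Long current = snapshot.currentOffsets.get(partition);
      if (current != null) {
        lag.put(partition, end - current);
      }
    });
    return lag;
  }
}
```

A Kinesis- or RabbitStream-style subclass would reuse `offsetSnapshotRef` and the publishing helper in the same way instead of declaring its own field.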