Copilot commented on code in PR #18301:
URL: https://github.com/apache/druid/pull/18301#discussion_r2218552394
##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -3992,31 +3999,46 @@ private OrderedSequenceNumber<SequenceOffsetType> getOffsetFromStorageForPartiti
if (sequence != null) {
log.debug("Getting sequence [%s] from metadata storage for partition [%s]", sequence, partition);
if (!taskTuningConfig.isSkipSequenceNumberAvailabilityCheck()) {
- if (!checkOffsetAvailability(partition, sequence)) {
- if (taskTuningConfig.isResetOffsetAutomatically()) {
+ // Check if current sequence is less than or equal to earliestSequenceNumber in the stream
+ SequenceOffsetType earliestSequenceNumber = getOffsetFromStreamForPartition(partition, true);
+ boolean isOffsetAvailable = earliestSequenceNumber != null
+ && makeSequenceNumber(sequence).isAvailableWithEarliest(makeSequenceNumber(earliestSequenceNumber));
+
+ if (!isOffsetAvailable) {
+ if (taskTuningConfig.isResetOffsetAutomatically() && earliestSequenceNumber != null) {
resetInternal(
- createDataSourceMetaDataForReset(ioConfig.getStream(), ImmutableMap.of(partition, sequence))
+ createDataSourceMetaDataForReset(ioConfig.getStream(),
+ // Reset to the earliest offset for the partition
+ ImmutableMap.of(partition, earliestSequenceNumber)),
+ // auto reset flag
+ true
);
- throw new StreamException(
- new ISE(
- "Previous sequenceNumber [%s] is no longer available for partition [%s] - automatically resetting"
- + " sequence",
- sequence,
- partition
- )
+ log.makeAlert("Offsets were reset automatically, potential data duplication or loss")
+ .addData("dataSource", this.dataSource)
+ .addData("partition", partition)
+ .addData("offset", sequence.toString())
+ .addData("newOffset", earliestSequenceNumber.toString())
+ .emit();
+
+ // Return the earliest offset as the new sequence number
+ return makeSequenceNumber(
+ earliestSequenceNumber,
+ useExclusiveStartSequenceNumberForNonFirstSequence()
);
} else {
throw new StreamException(
new ISE(
- "Previous sequenceNumber [%s] is no longer available for partition [%s]. You can clear the previous"
+ "Previous sequenceNumber [%s] is no longer available for partition [%s] which now has the least sequence number [%s]. You can clear the previous"
+ " sequenceNumber and start reading from a valid message by using the supervisor's reset API.",
- sequence,
+ sequence.toString(),
+ earliestSequenceNumber == null ? "null" : earliestSequenceNumber.toString(),
partition
Review Comment:
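The arguments to this error message are in the wrong order. The format string's placeholders expect (sequence, partition, earliest sequence number), but the arguments are passed as (sequence, earliest sequence number, partition). As a result, the rendered message reports the earliest offset as the partition and the partition as the earliest offset. The arguments should be:
`sequence.toString(), partition, earliestSequenceNumber == null ? "null" : earliestSequenceNumber.toString()`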
```suggestion
partition,
earliestSequenceNumber == null ? "null" : earliestSequenceNumber.toString()
```
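The following minimal standalone sketch illustrates the effect by rendering the new format string with the arguments in both orders. It uses `String.format` as a stand-in for the formatting `ISE` performs. The format string is abridged from the PR. The class name and sample values (sequence `100`, partition `3`, earliest offset `250`) are hypothetical.

```java
// Hypothetical demo class; String.format stands in for the ISE message formatting.
class ErrorMessageOrderDemo {
  // Abridged from the new ISE message format string in this PR.
  static final String FORMAT =
      "Previous sequenceNumber [%s] is no longer available for partition [%s]"
      + " which now has the least sequence number [%s].";

  // Argument order as currently written: sequence, earliest, partition.
  static String asWritten(Long sequence, Integer partition, Long earliest) {
    return String.format(FORMAT, sequence.toString(),
        earliest == null ? "null" : earliest.toString(), partition);
  }

  // Argument order matching the placeholders: sequence, partition, earliest.
  static String corrected(Long sequence, Integer partition, Long earliest) {
    return String.format(FORMAT, sequence.toString(), partition,
        earliest == null ? "null" : earliest.toString());
  }

  public static void main(String[] args) {
    // Swapped: "... for partition [250] which now has the least sequence number [3]."
    System.out.println(asWritten(100L, 3, 250L));
    // Correct: "... for partition [3] which now has the least sequence number [250]."
    System.out.println(corrected(100L, 3, 250L));
  }
}
```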
##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -1900,7 +1903,8 @@ public void resetInternal(DataSourceMetadata dataSourceMetadata)
if (currentMetadata == null) {
metadataUpdateSuccess = true;
} else {
- final DataSourceMetadata newMetadata = currentMetadata.minus(resetMetadata);
+ final DataSourceMetadata newMetadata = currentMetadata.plus(resetMetadata);
Review Comment:
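This line changes the logic from `minus(resetMetadata)` to `plus(resetMetadata)`, which fundamentally alters how the reset metadata is combined with the stored metadata. `minus` removes the reset partitions from the stored offsets, so those partitions are re-resolved from the stream on the next run. `plus` instead merges the two, overwriting the stored offsets for those partitions with the values in `resetMetadata`. Unless that is the intent, this is a breaking change that could cause incorrect offset handling. Please verify that this change is intentional and correct.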
```suggestion
final DataSourceMetadata newMetadata = currentMetadata.minus(resetMetadata);
```
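For reference, the difference can be modeled with plain partition-to-offset maps. This is a simplified sketch, not Druid's implementation. It assumes that `minus` drops every partition named in the reset metadata and that `plus` merges the two with the reset values taking precedence. It also ignores details such as the stream name. The class and method names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the two combination rules on partition -> offset maps.
class ResetSemanticsSketch {
  // minus(): drop every partition named in the reset map, leaving no stored offset for it.
  static Map<Integer, Long> minus(Map<Integer, Long> current, Map<Integer, Long> reset) {
    Map<Integer, Long> result = new HashMap<>(current);
    result.keySet().removeAll(reset.keySet());
    return result;
  }

  // plus(): merge both maps, with the reset offsets overriding the stored ones.
  static Map<Integer, Long> plus(Map<Integer, Long> current, Map<Integer, Long> reset) {
    Map<Integer, Long> result = new HashMap<>(current);
    result.putAll(reset);
    return result;
  }

  public static void main(String[] args) {
    Map<Integer, Long> stored = Map.of(0, 500L, 1, 100L);
    // Hypothetical earliest available offset for partition 1.
    Map<Integer, Long> reset = Map.of(1, 250L);
    System.out.println(minus(stored, reset)); // {0=500}
    System.out.println(plus(stored, reset));  // {0=500, 1=250}
  }
}
```

Under this model, `minus` leaves partition 1 with no stored offset, whereas `plus` pins it to offset 250.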
##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -3992,31 +3999,46 @@ private OrderedSequenceNumber<SequenceOffsetType> getOffsetFromStorageForPartiti
if (sequence != null) {
log.debug("Getting sequence [%s] from metadata storage for partition [%s]", sequence, partition);
if (!taskTuningConfig.isSkipSequenceNumberAvailabilityCheck()) {
- if (!checkOffsetAvailability(partition, sequence)) {
- if (taskTuningConfig.isResetOffsetAutomatically()) {
+ // Check if current sequence is less than or equal to earliestSequenceNumber in the stream
+ SequenceOffsetType earliestSequenceNumber = getOffsetFromStreamForPartition(partition, true);
+ boolean isOffsetAvailable = earliestSequenceNumber != null
+ && makeSequenceNumber(sequence).isAvailableWithEarliest(makeSequenceNumber(earliestSequenceNumber));
+
+ if (!isOffsetAvailable) {
+ if (taskTuningConfig.isResetOffsetAutomatically() && earliestSequenceNumber != null) {
resetInternal(
- createDataSourceMetaDataForReset(ioConfig.getStream(), ImmutableMap.of(partition, sequence))
+ createDataSourceMetaDataForReset(ioConfig.getStream(),
+ // Reset to the earliest offset for the partition
+ ImmutableMap.of(partition, earliestSequenceNumber)),
+ // auto reset flag
+ true
);
- throw new StreamException(
- new ISE(
- "Previous sequenceNumber [%s] is no longer available for partition [%s] - automatically resetting"
- + " sequence",
- sequence,
- partition
- )
+ log.makeAlert("Offsets were reset automatically, potential data duplication or loss")
+ .addData("dataSource", this.dataSource)
+ .addData("partition", partition)
+ .addData("offset", sequence.toString())
+ .addData("newOffset", earliestSequenceNumber.toString())
+ .emit();
+
+ // Return the earliest offset as the new sequence number
+ return makeSequenceNumber(
+ earliestSequenceNumber,
+ useExclusiveStartSequenceNumberForNonFirstSequence()
);
} else {
throw new StreamException(
new ISE(
- "Previous sequenceNumber [%s] is no longer available for partition [%s]. You can clear the previous"
+ "Previous sequenceNumber [%s] is no longer available for partition [%s] which now has the least sequence number [%s]. You can clear the previous"
Review Comment:
The error message refers to the "least sequence number". It should say "earliest sequence number" to match the variable name `earliestSequenceNumber` used in the code.
```suggestion
"Previous sequenceNumber [%s] is no longer available for partition [%s] which now has the earliest sequence number [%s]. You can clear the previous"
```