aho135 commented on code in PR #19477:
URL: https://github.com/apache/druid/pull/19477#discussion_r3319702832


##########
docs/api-reference/supervisor-api.md:
##########
@@ -3539,6 +3539,109 @@ when the supervisor's tasks restart, they resume reading from `{"0": 100, "1": 1
   ```
 </details>
 
+### Reset offsets and start a backfill supervisor
+
+This endpoint is supported for Apache Kafka and RabbitMQ Stream supervisors. Amazon Kinesis is not supported yet.
+
+Resets the supervisor to the latest available stream offsets and starts a new bounded backfill supervisor to ingest the data in the skipped range.
+
+This endpoint is useful when a supervisor has fallen behind and you want to catch it up to the latest offsets without losing the skipped data. The main supervisor resumes ingesting from the latest offsets, while the backfill supervisor processes the range from the previously checkpointed offsets up to the latest offsets at the time of the reset.
+
+**Duplicate ingestion notice:** The main supervisor is not quiesced before the reset. This means duplicate data can occur in two ways:
+- **Backfill overlap:** Any tasks that were in-flight at the time of the reset may publish segments covering part of the backfill range before being shut down.
+- **Reset race:** If a task checkpoint is written to the metadata store between when this endpoint captures the current offsets and when it applies the reset, that checkpoint can be overwritten, causing the main supervisor to re-ingest already-processed data.
+
+Both windows are narrow in practice, but cannot be fully eliminated without manually suspending the main supervisor before calling this endpoint and waiting for all pending tasks to complete.
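The per-partition split described in the docs above (main supervisor restarts at latest, backfill covers checkpointed up to latest) can be modelled in a few lines. This is a hypothetical sketch, not the PR's implementation: `BackfillPlan`, `Range` and `plan` are illustrative names, and treating each backfill range as half-open `[checkpointed, latest)` is an assumption based on the latest offset being the next offset to be read.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Hypothetical model of how a reset-to-latest-and-backfill splits work per partition.
 * BackfillPlan, Range and plan() are illustrative names, not Druid classes.
 */
public class BackfillPlan {
    /** Assumed half-open offset range [start, end) for one partition. */
    record Range(long start, long end) {}

    final Map<String, Long> mainStartOffsets = new LinkedHashMap<>();
    final Map<String, Range> backfillRanges = new LinkedHashMap<>();

    static BackfillPlan plan(Map<String, Long> checkpointed, Map<String, Long> latest) {
        BackfillPlan p = new BackfillPlan();
        for (Map.Entry<String, Long> e : latest.entrySet()) {
            String partition = e.getKey();
            long end = e.getValue();
            // The main supervisor resumes from the latest offset on every partition.
            p.mainStartOffsets.put(partition, end);
            Long start = checkpointed.get(partition);
            // Only partitions whose checkpoint lags behind latest need a backfill range.
            if (start != null && start < end) {
                p.backfillRanges.put(partition, new Range(start, end));
            }
        }
        return p;
    }

    public static void main(String[] args) {
        Map<String, Long> checkpointed = Map.of("0", 100L, "1", 200L);
        Map<String, Long> latest = new LinkedHashMap<>();
        latest.put("0", 500L);
        latest.put("1", 200L);
        BackfillPlan p = plan(checkpointed, latest);
        System.out.println(p.mainStartOffsets);          // {0=500, 1=200}
        System.out.println(p.backfillRanges.keySet());   // [0]  (partition 1 is already caught up)
    }
}
```

In this model the skipped data is exactly what the backfill ranges cover, which is why any segment an in-flight task publishes inside `[100, 500)` on partition 0 would be ingested twice (the "backfill overlap" case).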

Review Comment:
   The current code requires the supervisor to be in the `RUNNING` state. To make this work with `SUSPENDED`, we can drop that requirement. The usage is a bit clunky, though. It would look something like this:
   
   1. Suspend the supervisor and wait for all of its tasks to complete.
   2. Call `resetToLatestAndBackfill`. Since the backfill supervisor reuses the main supervisor spec, it is also created in the `SUSPENDED` state.
   3. Manually resume both supervisors.
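The three manual steps above could be scripted roughly as follows. This is a sketch under stated assumptions: the suspend, resume and status paths are the standard Druid supervisor API endpoints, but the HTTP path for `resetToLatestAndBackfill` and the backfill supervisor's ID are not given in this thread, so both are hypothetical placeholders. The sketch only builds the ordered list of calls and does not send them; the polling loop for "wait for all tasks to complete" is left out because the exact status field to inspect is not specified here.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of the manual suspend -> reset/backfill -> resume sequence. */
public class SuspendedBackfillWorkflow {
    record Call(String method, String path, String note) {}

    // Hypothetical placeholder: the real endpoint path is not stated in this thread.
    static final String RESET_AND_BACKFILL_PLACEHOLDER = "<resetToLatestAndBackfill endpoint>";
    static final String BASE = "/druid/indexer/v1/supervisor/";

    static List<Call> plan(String supervisorId, String backfillSupervisorId) {
        List<Call> calls = new ArrayList<>();
        // Step 1: suspend the main supervisor...
        calls.add(new Call("POST", BASE + supervisorId + "/suspend", "suspend main supervisor"));
        // ...then poll its status until all tasks have finished (polling logic omitted).
        calls.add(new Call("GET", BASE + supervisorId + "/status", "repeat until no tasks are running"));
        // Step 2: reset to latest; the backfill supervisor reuses the spec, so it starts SUSPENDED.
        calls.add(new Call("POST", RESET_AND_BACKFILL_PLACEHOLDER, "reset main, create backfill"));
        // Step 3: resume both supervisors manually.
        calls.add(new Call("POST", BASE + supervisorId + "/resume", "resume main supervisor"));
        // backfillSupervisorId is a hypothetical input; how the ID is assigned is not specified.
        calls.add(new Call("POST", BASE + backfillSupervisorId + "/resume", "resume backfill supervisor"));
        return calls;
    }

    public static void main(String[] args) {
        for (Call c : plan("wikipedia", "wikipedia-backfill")) {
            System.out.println(c.method() + " " + c.path() + "  # " + c.note());
        }
    }
}
```

Keeping the plan as data makes the ordering constraint explicit: the reset call must come after the tasks drain, and the backfill supervisor needs its own resume call because it inherits the suspended state.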
   
   Some minor modifications would also need to be made to Kafka's `updatePartitionLagFromStream`. This function doesn't work in isolation: it requires calling `recordSupplier.assign(partitions)` first, otherwise `recordSupplier.seekToLatest(partitions)` throws an `IllegalStateException`. Currently this is handled by `assignRecordSupplierToPartitionIds()` in `SeekableStreamSupervisor`. Similar tweaks may be needed for RabbitMQ Stream and Kinesis. I'd leave this as a follow-up in case it's needed in the future.
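The ordering constraint can be illustrated with a small in-memory model. This is a hypothetical sketch, not Druid's `RecordSupplier` API: `FakeRecordSupplier` and `lagFromStream` are illustrative names, and partitions are plain integers. The throwing behaviour mirrors Kafka's `KafkaConsumer.seekToEnd`, which throws `IllegalStateException` when a partition is not currently assigned.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/**
 * Hypothetical model of the assign-before-seek requirement. FakeRecordSupplier and
 * lagFromStream are illustrative names; they are not Druid's RecordSupplier API.
 */
public class AssignBeforeSeek {
    static class FakeRecordSupplier {
        private final Map<Integer, Long> endOffsets;
        private final Set<Integer> assigned = new HashSet<>();
        private final Map<Integer, Long> positions = new HashMap<>();

        FakeRecordSupplier(Map<Integer, Long> endOffsets) {
            this.endOffsets = endOffsets;
        }

        void assign(Set<Integer> partitions) {
            assigned.clear();
            assigned.addAll(partitions);
            positions.clear();
        }

        void seekToLatest(Set<Integer> partitions) {
            for (int p : partitions) {
                // Seeking an unassigned partition fails, like KafkaConsumer.seekToEnd.
                if (!assigned.contains(p)) {
                    throw new IllegalStateException("No current assignment for partition " + p);
                }
                positions.put(p, endOffsets.get(p));
            }
        }

        long getPosition(int partition) {
            return positions.get(partition);
        }
    }

    /** Computes per-partition lag; calls assign() first so that seekToLatest() does not throw. */
    static Map<Integer, Long> lagFromStream(FakeRecordSupplier supplier, Map<Integer, Long> current) {
        Set<Integer> partitions = current.keySet();
        supplier.assign(partitions);        // the step that must precede seekToLatest
        supplier.seekToLatest(partitions);
        Map<Integer, Long> lag = new HashMap<>();
        for (int p : partitions) {
            lag.put(p, supplier.getPosition(p) - current.get(p));
        }
        return lag;
    }

    public static void main(String[] args) {
        FakeRecordSupplier supplier = new FakeRecordSupplier(Map.of(0, 500L, 1, 200L));
        try {
            supplier.seekToLatest(Set.of(0)); // no assign() yet
        } catch (IllegalStateException e) {
            System.out.println("without assign: " + e.getMessage());
        }
        System.out.println(lagFromStream(supplier, Map.of(0, 100L, 1, 150L)));
    }
}
```

Folding the `assign` call into the lag helper, as `lagFromStream` does here, is one way the follow-up could make the function usable in isolation; in the current code that responsibility sits with `assignRecordSupplierToPartitionIds()`.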



-- 
This is an automated message from the Apache Git Service.