abhishekrb19 commented on code in PR #19477:
URL: https://github.com/apache/druid/pull/19477#discussion_r3277844036


##########
docs/api-reference/supervisor-api.md:
##########
@@ -3539,6 +3539,101 @@ when the supervisor's tasks restart, they resume reading from `{"0": 100, "1": 1
   ```
 </details>
 
+### Reset offsets and start a backfill supervisor
+
+Resets the supervisor to the latest available stream offsets and starts a new bounded backfill supervisor to ingest the data in the skipped range.
+
+This endpoint is useful when a supervisor has fallen behind and you want to catch it up to the latest offsets without losing the skipped data. The main supervisor resumes ingesting from the latest offsets, while the backfill supervisor processes the range from the previously checkpointed offsets up to the latest offsets at the time of the reset.
+
+The following requirements must be met before calling this endpoint:
+
+- The supervisor must be a `SeekableStreamSupervisor`.

Review Comment:
   We could point to the supervisor [docs](https://druid.apache.org/docs/latest/ingestion/supervisor/) instead. Saying it must be an implementation of that should suffice, rather than naming the Java class, which isn't documented elsewhere AFAIK.
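
For reference, the range the docs hunk above describes can be sketched as simple per-partition arithmetic. This is a minimal illustration, not the PR's implementation: it assumes numeric, Kafka-style offsets keyed by integer partition ID and treats the end offset as exclusive. `BackfillRangeSketch` and `computeBackfillRanges` are hypothetical names.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch, not Druid code: derives the per-partition range a
// backfill would cover, assuming numeric (Kafka-style) offsets keyed by an
// integer partition ID and an exclusive end offset.
public class BackfillRangeSketch
{
  // Half-open offset range [start, end) for one partition.
  record Range(long start, long end) {}

  static Map<Integer, Range> computeBackfillRanges(
      Map<Integer, Long> checkpointed,
      Map<Integer, Long> latest
  )
  {
    // TreeMap keeps partitions in ascending order for stable output.
    Map<Integer, Range> ranges = new TreeMap<>();
    for (Map.Entry<Integer, Long> entry : checkpointed.entrySet()) {
      Long end = latest.get(entry.getKey());
      // Skip partitions with no latest offset or with nothing skipped.
      if (end != null && end > entry.getValue()) {
        ranges.put(entry.getKey(), new Range(entry.getValue(), end));
      }
    }
    return ranges;
  }

  public static void main(String[] args)
  {
    // Partition 0 fell behind by 50 records; partition 1 is caught up.
    Map<Integer, Long> checkpointed = Map.of(0, 100L, 1, 200L);
    Map<Integer, Long> latest = Map.of(0, 150L, 1, 200L);
    System.out.println(computeBackfillRanges(checkpointed, latest));
    // prints {0=Range[start=100, end=150]}
  }
}
```

Partitions whose checkpointed offset already equals the latest offset are left out, since there is nothing to backfill for them.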



##########
indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java:
##########
@@ -1068,6 +1079,205 @@ public void test_isAnotherTaskGroupPublishingToPartitions()
     );
   }
 
+  @Test
+  public void testResetSupervisorAndBackfill() throws Exception

Review Comment:
   Thanks for the tests @aho135! Do you think it’d be possible to extend the embedded test `KafkaBoundedSupervisorTest` you added to include some end-to-end coverage for this API as well?



##########
docs/api-reference/supervisor-api.md:
##########
@@ -3539,6 +3539,101 @@ when the supervisor's tasks restart, they resume reading from `{"0": 100, "1": 1
   ```
 </details>
 
+### Reset offsets and start a backfill supervisor
+
+Resets the supervisor to the latest available stream offsets and starts a new bounded backfill supervisor to ingest the data in the skipped range.
+
+This endpoint is useful when a supervisor has fallen behind and you want to catch it up to the latest offsets without losing the skipped data. The main supervisor resumes ingesting from the latest offsets, while the backfill supervisor processes the range from the previously checkpointed offsets up to the latest offsets at the time of the reset.
+
+The following requirements must be met before calling this endpoint:
+
+- The supervisor must be a `SeekableStreamSupervisor`.
+- The supervisor's `useEarliestSequenceNumber` property must be `false`.

Review Comment:
   Is this requirement needed? With `useConcurrentLocks` and two parallel supervisors, I'd imagine this would still work even if the offset ranges overlap.
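
To make the overlap question concrete: per partition, the backfill reads from the checkpointed offset up to the latest offset captured at reset time. A minimal single-partition sketch, assuming half-open `[start, end)` ranges and using `Long.MAX_VALUE` for the main supervisor's open-ended read; `OffsetOverlapSketch` is a hypothetical name and the offsets are illustrative only.

```java
// Hypothetical sketch, not Druid code: checks whether the backfill range and
// the range the main supervisor reads after the reset overlap on one
// partition. Ranges are half-open [start, end); Long.MAX_VALUE stands in for
// an unbounded (still running) stream reader.
public class OffsetOverlapSketch
{
  static boolean overlaps(long aStart, long aEnd, long bStart, long bEnd)
  {
    // Two half-open ranges overlap iff each one starts before the other ends.
    return aStart < bEnd && bStart < aEnd;
  }

  public static void main(String[] args)
  {
    long checkpointed = 100L;   // backfill start
    long latestAtReset = 150L;  // backfill end (exclusive)
    long earlierStart = 0L;     // an illustrative earlier starting point

    // Main supervisor resumes at the latest offset: adjacent, no overlap.
    System.out.println(overlaps(checkpointed, latestAtReset, latestAtReset, Long.MAX_VALUE)); // false
    // Main supervisor starts from an earlier offset: the ranges overlap.
    System.out.println(overlaps(checkpointed, latestAtReset, earlierStart, Long.MAX_VALUE)); // true
  }
}
```

When the main supervisor resumes exactly at the latest offset the ranges are adjacent but disjoint; any earlier starting point makes them overlap, which is the case the question above is about.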



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java:
##########
@@ -393,6 +374,115 @@ public boolean resetSupervisor(String id, @Nullable DataSourceMetadata resetData
     return true;
   }
 
+  /**
+   * Resets a supervisor to the latest stream offsets and starts a bounded backfill supervisor to
+   * process the skipped range from the previously checkpointed offsets up to the latest offsets.
+   *
+   * @param id               supervisor ID
+   * @param backfillTaskCount number of tasks for the backfill supervisor, or null to inherit from the source spec
+   * @return map with {@code "id"} (the original supervisor ID) and {@code "backfillSupervisorId"}
+   * @throws IllegalArgumentException if the supervisor is not a {@link SeekableStreamSupervisor},
+   *                                  if {@code useEarliestSequenceNumber} is true,
+   *                                  if {@code useConcurrentLocks} is not set to true in the supervisor context,
+   *                                  or if the supervisor is not in a RUNNING state
+   * @throws IllegalStateException    if the latest or checkpointed offsets cannot be retrieved,
+   *                                  or if the backfill spec cannot be serialized
+   */
+  public Map<String, Object> resetSupervisorAndBackfill(String id, @Nullable Integer backfillTaskCount)
+  {
+    Preconditions.checkState(started, "SupervisorManager not started");
+    Preconditions.checkNotNull(id, "id");
+
+    Pair<Supervisor, SupervisorSpec> supervisorPair = supervisors.get(id);
+    if (!(supervisorPair.lhs instanceof SeekableStreamSupervisor)) {
+      throw new IAE("Supervisor[%s] is not a SeekableStreamSupervisor", id);
+    }
+    SeekableStreamSupervisor streamSupervisor = (SeekableStreamSupervisor) supervisorPair.lhs;
+    SeekableStreamSupervisorSpec streamSpec = (SeekableStreamSupervisorSpec) supervisorPair.rhs;
+
+    // Verify useEarliestOffset is false
+    if (streamSupervisor.getIoConfig().isUseEarliestSequenceNumber()) {
+      throw new IAE("Reset with skipped offsets is not supported when useEarliestOffset is true.");
+    }
+
+    if (!specHasConcurrentLocks(streamSpec)) {
+      throw new IAE(
+          "Backfill tasks require 'useConcurrentLocks' to be set to true in the supervisor context to allow concurrent writes with the main supervisor tasks"
+      );
+    }
+
+    // We need an active recordSupplier to query the latest offsets from the stream
+    if (supervisorPair.lhs.getState() != SupervisorStateManager.BasicState.RUNNING) {
+      throw new IAE("Supervisor[%s] must be in a RUNNING state to perform a reset and backfill", id);
+    }
+
+    log.info("Capturing latest offsets from stream for supervisor[%s]", id);
+    streamSupervisor.updatePartitionLagFromStream();
+    Map<?, ?> endOffsets = streamSupervisor.getLatestSequencesFromStream();
+
+    log.info("Capturing checkpointed offsets for supervisor[%s]", id);
+    Map<?, ?> startOffsets = streamSupervisor.getOffsetsFromMetadataStorage();
+
+    // Validate that we successfully retrieved offsets
+    if (endOffsets == null || endOffsets.isEmpty()) {
+      throw new ISE("Skipping reset: Failed to get latest offsets from stream for supervisor[%s]", id);
+    }
+    if (startOffsets == null || startOffsets.isEmpty()) {
+      throw new ISE("Skipping reset: Failed to get checkpointed offsets for supervisor[%s]", id);
+    }
+

Review Comment:
   nit: It’ll likely be cleaner to move these validations into their own function.
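
A minimal sketch of what that extraction might look like, keeping the same checks and messages as the hunk above. `validateRetrievedOffsets` is a hypothetical helper name; the PR throws Druid's `ISE` (a subclass of `IllegalStateException`), and this sketch uses `IllegalStateException` directly so it compiles on its own.

```java
import java.util.Map;

// Hypothetical sketch of the suggested refactor, not the PR's code: the two
// offset checks from resetSupervisorAndBackfill moved into one helper.
public class OffsetValidationSketch
{
  static void validateRetrievedOffsets(String id, Map<?, ?> startOffsets, Map<?, ?> endOffsets)
  {
    // Same order as the original: latest (end) offsets are checked first.
    if (endOffsets == null || endOffsets.isEmpty()) {
      throw new IllegalStateException(String.format(
          "Skipping reset: Failed to get latest offsets from stream for supervisor[%s]", id));
    }
    if (startOffsets == null || startOffsets.isEmpty()) {
      throw new IllegalStateException(String.format(
          "Skipping reset: Failed to get checkpointed offsets for supervisor[%s]", id));
    }
  }

  public static void main(String[] args)
  {
    // Both maps populated: no exception.
    validateRetrievedOffsets("wiki", Map.of(0, 100L), Map.of(0, 150L));
    try {
      // Empty latest offsets: rejected before any reset happens.
      validateRetrievedOffsets("wiki", Map.of(0, 100L), Map.of());
    }
    catch (IllegalStateException e) {
      System.out.println(e.getMessage());
    }
  }
}
```

The call site would then shrink to `validateRetrievedOffsets(id, startOffsets, endOffsets);`, and the earlier precondition checks (supervisor type, `useEarliestSequenceNumber`, `useConcurrentLocks`, RUNNING state) could be grouped into a similar helper.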



##########
extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java:
##########
@@ -336,7 +336,7 @@ protected OrderedSequenceNumber<String> makeSequenceNumber(String seq, boolean i
   }
 
   @Override
-  protected void updatePartitionLagFromStream()
+  public void updatePartitionLagFromStream()

Review Comment:
   Yeah, I think returning a 5xx should be good enough.
   
   For completeness, we could probably add a one-line limitation to the docs noting that the feature doesn't apply to Kinesis supervisors? That can be removed once the gap is addressed in a future patch, depending on when it lands.
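
A hedged sketch of the status mapping being discussed, based only on the exceptions listed in the `resetSupervisorAndBackfill` Javadoc: `IllegalArgumentException` for ineligible supervisors maps to 400, and anything else, including `IllegalStateException` when offsets cannot be retrieved, maps to 500. `ResetStatusSketch` is hypothetical, it is an assumption that the Kinesis gap surfaces through one of these exceptions, and the PR's actual resource code may differ.

```java
// Hypothetical sketch, not the PR's resource code: translates the exceptions
// documented on resetSupervisorAndBackfill into HTTP status codes.
public class ResetStatusSketch
{
  static int statusFor(RuntimeException e)
  {
    // Caller errors: wrong supervisor type, unsupported config, not RUNNING.
    if (e instanceof IllegalArgumentException) {
      return 400;
    }
    // Everything else, including IllegalStateException when offsets cannot
    // be retrieved, is treated as a server-side failure (5xx).
    return 500;
  }

  public static void main(String[] args)
  {
    System.out.println(statusFor(new IllegalArgumentException("not RUNNING")));       // 400
    System.out.println(statusFor(new IllegalStateException("no latest offsets")));   // 500
  }
}
```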



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
