abhishekrb19 commented on code in PR #19477:
URL: https://github.com/apache/druid/pull/19477#discussion_r3326938872


##########
docs/api-reference/supervisor-api.md:
##########
@@ -3539,6 +3539,109 @@ when the supervisor's tasks restart, they resume 
reading from `{"0": 100, "1": 1
   ```
 </details>
 
+### Reset offsets and start a backfill supervisor
+
+This endpoint is supported for Apache Kafka and RabbitMQ Stream supervisors. 
Amazon Kinesis is not supported yet.
+
+Resets the supervisor to the latest available stream offsets and starts a new 
bounded backfill supervisor to ingest the data in the skipped range.
+
+This endpoint is useful when a supervisor has fallen behind and you want to 
catch it up to the latest offsets without losing the skipped data. The main 
supervisor resumes ingesting from the latest offsets, while the backfill 
supervisor processes the range from the previously checkpointed offsets up to 
the latest offsets at the time of the reset.
+
+**Duplicate ingestion notice:** The main supervisor is not quiesced before the 
reset. This means duplicate data can occur in two ways:
+- **Backfill overlap:** Any tasks that were in-flight at the time of the reset 
may publish segments covering part of the backfill range before being shut down.
+- **Reset race:** If a task checkpoint is written to the metadata store 
between when this endpoint captures the current offsets and when it applies the 
reset, that checkpoint can be overwritten, causing the main supervisor to 
re-ingest already-processed data.
+
+Both windows are narrow in practice, but cannot be fully eliminated without 
manually suspending the main supervisor before calling this endpoint and 
waiting for all pending tasks to complete.
+
+The following requirements must be met before calling this endpoint:
+
+- The supervisor must be a [streaming supervisor](../ingestion/supervisor.md).
+- The supervisor's `useEarliestSequenceNumber` property must be `false`.
+- The supervisor context must have `useConcurrentLocks` set to `true` to allow 
the backfill supervisor's tasks to write concurrently with the main 
supervisor's tasks.
+- The supervisor must be in a `RUNNING` state.
+
+The backfill supervisor has the same configuration as the source supervisor 
except for its ID, which takes the form 
`{supervisorId}_backfill_{randomSuffix}`, and its `boundedStreamConfig`, which 
is set to the skipped offset range. If `backfillTaskCount` is specified, it 
overrides the `taskCount` for the backfill supervisor only.
+
+#### URL
+
+`POST` `/druid/indexer/v1/supervisor/{supervisorId}/resetToLatestAndBackfill`
+
+#### Query parameters
+
+| Parameter | Type | Description | Default |
+|---------|---------|---------|---------|
+| `backfillTaskCount` | Integer | Number of parallel tasks for the backfill 
supervisor. If not specified, inherits `taskCount` from the source supervisor. 
| None |

Review Comment:
   Since there's an implicit default:
   ```suggestion
   | `backfillTaskCount` | Integer | Number of parallel tasks for the backfill 
supervisor. | Defaults to `taskCount` from the source supervisor if not 
specified |
   ```



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java:
##########
@@ -640,6 +640,50 @@ private Response handleResetRequest(
     );
   }
 
+  @POST
+  @Path("/{id}/resetToLatestAndBackfill")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ResourceFilters(SupervisorResourceFilter.class)
+  public Response resetToLatestAndBackfill(
+      @PathParam("id") final String id,
+      @QueryParam("backfillTaskCount") @Nullable final Integer 
backfillTaskCount
+  )
+  {
+    return handleResetAndBackfill(id, backfillTaskCount);
+  }
+
+  private Response handleResetAndBackfill(final String id, @Nullable final 
Integer backfillTaskCount)

Review Comment:
   Perhaps also rename all references of `handleResetAndBackfill` to 
`handleResetToLatestAndBackfill`?
   



##########
server/src/main/java/org/apache/druid/rpc/indexing/OverlordClient.java:
##########
@@ -197,6 +197,15 @@ ListenableFuture<CloseableIterator<TaskStatusPlus>> 
taskStatuses(
    */
   ListenableFuture<Map<String, String>> terminateSupervisor(String 
supervisorId);
 
+  /**
+   * Resets a supervisor to the latest stream offsets and starts a bounded 
backfill supervisor.
+   * <p>
+   * API: {@code POST 
/druid/indexer/v1/supervisor/<id>/resetToLatestAndBackfill}
+   *
+   * @return Map containing "id" and "backfillSupervisorId"
+   */
+  ListenableFuture<Map<String, Object>> resetSupervisorAndBackfill(String 
supervisorId);

Review Comment:
   Same here & all implementations - it'd be good to align on the naming (reset 
vs resetToLatest)



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java:
##########
@@ -393,6 +373,112 @@ public boolean resetSupervisor(String id, @Nullable 
DataSourceMetadata resetData
     return true;
   }
 
+  /**
+   * Resets a supervisor to the latest stream offsets and starts a bounded 
backfill supervisor to
+   * process the skipped range from the previously checkpointed offsets up to 
the latest offsets.

Review Comment:
   Do you think it’s worth leaving the caveat as a comment here in the javadoc 
or before the reset operation is invoked that for correctness the supervisor 
need to be suspended/tasks checkpoint?
   
   In case someone wants to take a stab at it in the future, they’d have a 
reference point on where to adjust things



##########
docs/api-reference/supervisor-api.md:
##########
@@ -3539,6 +3539,109 @@ when the supervisor's tasks restart, they resume 
reading from `{"0": 100, "1": 1
   ```
 </details>
 
+### Reset offsets and start a backfill supervisor
+
+This endpoint is supported for Apache Kafka and RabbitMQ Stream supervisors. 
Amazon Kinesis is not supported yet.
+
+Resets the supervisor to the latest available stream offsets and starts a new 
bounded backfill supervisor to ingest the data in the skipped range.
+
+This endpoint is useful when a supervisor has fallen behind and you want to 
catch it up to the latest offsets without losing the skipped data. The main 
supervisor resumes ingesting from the latest offsets, while the backfill 
supervisor processes the range from the previously checkpointed offsets up to 
the latest offsets at the time of the reset.

Review Comment:
   I think the this section's title could also be:
   `Reset offset to latest and start a backfill supervisor`
   



##########
embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/KafkaBoundedSupervisorTest.java:
##########
@@ -292,6 +292,47 @@ public void 
test_boundedSupervisor_doesNotSilentlyCompleteWhenStaleOffsetExceeds
     Assertions.assertEquals("UNHEALTHY_SUPERVISOR", status2.getState(), 
"Supervisor state should be UNHEALTHY_SUPERVISOR");
   }
 
+  @Test
+  public void test_resetSupervisorAndBackfill()
+  {
+    final String topic = IdUtils.getRandomId();
+    kafkaServer.createTopicWithPartitions(topic, 2);
+
+    // Create a streaming supervisor with concurrent locks and 
withUseEarliestSequenceNumber=false
+    final KafkaSupervisorSpec supervisor = createKafkaSupervisor(kafkaServer)
+        .withContext(Map.of("useConcurrentLocks", true))
+        .withIoConfig(io -> io
+            .withKafkaInputFormat(new JsonInputFormat(null, null, null, null, 
null))
+            .withUseEarliestSequenceNumber(false)
+        )
+        .build(dataSource, topic);
+
+    cluster.callApi().postSupervisor(supervisor);
+
+    waitForSupervisorDetailedState(supervisor.getId(), "RUNNING");
+
+    final int totalRecords = publish1kRecords(topic, false);
+    waitUntilPublishedRecordsAreIngested(totalRecords);
+
+    // Reset the main supervisor and spin up a backfill supervisor.
+    // Since all records are already ingested before the call, the backfill
+    // supervisor will complete immediately without ingesting anything.
+    final Map<String, Object> result = 
cluster.callApi().resetSupervisorAndBackfill(supervisor.getId());

Review Comment:
   Yeah, I’m not sure how to induce lag in this setup easily :) 
   
   In addition to assertions on the supervisor state, it’d be nice if we could 
also assert on the records ingested/offset ranges or something similar even if 
the backfill ends up ingesting 0 records in this case.



##########
embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/KafkaBoundedSupervisorTest.java:
##########
@@ -292,6 +292,47 @@ public void 
test_boundedSupervisor_doesNotSilentlyCompleteWhenStaleOffsetExceeds
     Assertions.assertEquals("UNHEALTHY_SUPERVISOR", status2.getState(), 
"Supervisor state should be UNHEALTHY_SUPERVISOR");
   }
 
+  @Test
+  public void test_resetSupervisorAndBackfill()
+  {
+    final String topic = IdUtils.getRandomId();
+    kafkaServer.createTopicWithPartitions(topic, 2);
+
+    // Create a streaming supervisor with concurrent locks and 
withUseEarliestSequenceNumber=false
+    final KafkaSupervisorSpec supervisor = createKafkaSupervisor(kafkaServer)
+        .withContext(Map.of("useConcurrentLocks", true))
+        .withIoConfig(io -> io
+            .withKafkaInputFormat(new JsonInputFormat(null, null, null, null, 
null))
+            .withUseEarliestSequenceNumber(false)
+        )
+        .build(dataSource, topic);
+
+    cluster.callApi().postSupervisor(supervisor);
+
+    waitForSupervisorDetailedState(supervisor.getId(), "RUNNING");
+
+    final int totalRecords = publish1kRecords(topic, false);
+    waitUntilPublishedRecordsAreIngested(totalRecords);
+
+    // Reset the main supervisor and spin up a backfill supervisor.
+    // Since all records are already ingested before the call, the backfill
+    // supervisor will complete immediately without ingesting anything.
+    final Map<String, Object> result = 
cluster.callApi().resetSupervisorAndBackfill(supervisor.getId());
+    final String backfillSupervisorId = (String) 
result.get("backfillSupervisorId");

Review Comment:
   For completeness, also validate that the primary supervisor ID is also 
included in the response of this API?



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java:
##########
@@ -393,6 +373,112 @@ public boolean resetSupervisor(String id, @Nullable 
DataSourceMetadata resetData
     return true;
   }
 
+  /**
+   * Resets a supervisor to the latest stream offsets and starts a bounded 
backfill supervisor to
+   * process the skipped range from the previously checkpointed offsets up to 
the latest offsets.
+   *
+   * @param id               supervisor ID
+   * @param backfillTaskCount number of tasks for the backfill supervisor, or 
null to inherit from the source spec
+   * @return map with {@code "id"} (the original supervisor ID) and {@code 
"backfillSupervisorId"}
+   * @throws IllegalArgumentException if the supervisor is not a {@link 
SeekableStreamSupervisor},
+   *                                  if {@code useEarliestSequenceNumber} is 
true,
+   *                                  if {@code useConcurrentLocks} is not set 
to true in the supervisor context,
+   *                                  or if the supervisor is not in a RUNNING 
state
+   * @throws IllegalStateException    if the latest or checkpointed offsets 
cannot be retrieved,
+   *                                  or if the backfill spec cannot be 
serialized
+   */
+  public Map<String, Object> resetSupervisorAndBackfill(String id, @Nullable 
Integer backfillTaskCount)
+  {
+    Preconditions.checkState(started, "SupervisorManager not started");
+    Preconditions.checkNotNull(id, "id");
+
+    Pair<Supervisor, SupervisorSpec> supervisorPair = supervisors.get(id);
+
+    if (!(supervisorPair.lhs instanceof SeekableStreamSupervisor)) {

Review Comment:
   Claude mentioned that this could NPE without a null check after calling 
`supervisors.get(id)`.
   
   I don’t think it can happen in practice bc 
`!manager.getSupervisorIds().contains(id)` in `SupervisorResource `would return 
earlier, but for safety, could we add a null check in the manager itself? Looks 
like the other methods in the manager have similar checks as well



##########
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java:
##########
@@ -173,6 +174,59 @@ protected KafkaSupervisorSpec toggleSuspend(boolean 
suspend)
     );
   }
 
+  @Override
+  public KafkaSupervisorSpec createBackfillSpec(

Review Comment:
   This looks much cleaner - thanks for adding this!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to