abhishekrb19 commented on code in PR #19477:
URL: https://github.com/apache/druid/pull/19477#discussion_r3319137776


##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java:
##########
@@ -640,6 +640,50 @@ private Response handleResetRequest(
     );
   }
 
+  @POST
+  @Path("/{id}/resetOffsetsAndBackfill")

Review Comment:
   Thinking about it, the existing `/resetOffsets` API takes explicit offsets, 
whereas `/reset` simply does a hard reset. I think this API aligns with the 
latter. What do you think about renaming this to `resetToLatestAndBackfill`?



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java:
##########
@@ -393,6 +374,120 @@ public boolean resetSupervisor(String id, @Nullable 
DataSourceMetadata resetData
     return true;
   }
 
+  /**
+   * Resets a supervisor to the latest stream offsets and starts a bounded 
backfill supervisor to
+   * process the skipped range from the previously checkpointed offsets up to 
the latest offsets.
+   *
+   * @param id               supervisor ID
+   * @param backfillTaskCount number of tasks for the backfill supervisor, or 
null to inherit from the source spec
+   * @return map with {@code "id"} (the original supervisor ID) and {@code 
"backfillSupervisorId"}
+   * @throws IllegalArgumentException if the supervisor is not a {@link 
SeekableStreamSupervisor},
+   *                                  if {@code useEarliestSequenceNumber} is 
true,
+   *                                  if {@code useConcurrentLocks} is not set 
to true in the supervisor context,
+   *                                  or if the supervisor is not in a RUNNING 
state
+   * @throws IllegalStateException    if the latest or checkpointed offsets 
cannot be retrieved,
+   *                                  or if the backfill spec cannot be 
serialized
+   */
+  public Map<String, Object> resetSupervisorAndBackfill(String id, @Nullable 
Integer backfillTaskCount)
+  {
+    Preconditions.checkState(started, "SupervisorManager not started");
+    Preconditions.checkNotNull(id, "id");
+
+    Pair<Supervisor, SupervisorSpec> supervisorPair = supervisors.get(id);
+    validateResetAndBackfill(id, supervisorPair);
+
+    SeekableStreamSupervisor streamSupervisor = (SeekableStreamSupervisor) 
supervisorPair.lhs;
+    SeekableStreamSupervisorSpec streamSpec = (SeekableStreamSupervisorSpec) 
supervisorPair.rhs;
+
+    log.info("Capturing latest offsets from stream for supervisor[%s]", id);
+    streamSupervisor.updatePartitionLagFromStream();
+    Map<?, ?> endOffsets = streamSupervisor.getLatestSequencesFromStream();
+
+    log.info("Capturing checkpointed offsets for supervisor[%s]", id);
+    Map<?, ?> startOffsets = streamSupervisor.getOffsetsFromMetadataStorage();
+
+    if (endOffsets == null || endOffsets.isEmpty()) {
+      throw new ISE("Skipping reset: Failed to get latest offsets from stream 
for supervisor[%s]", id);
+    }
+    if (startOffsets == null || startOffsets.isEmpty()) {
+      throw new ISE("Skipping reset: Failed to get checkpointed offsets for 
supervisor[%s]", id);
+    }
+
+    String backfillSupervisorId = IdUtils.getRandomIdWithPrefix(id + 
"_backfill");
+
+    try {
+      Map<String, Object> normalizedStartOffsets = 
jsonMapper.readValue(jsonMapper.writeValueAsString(startOffsets), Map.class);
+      Map<String, Object> normalizedEndOffsets = 
jsonMapper.readValue(jsonMapper.writeValueAsString(endOffsets), Map.class);
+      BoundedStreamConfig boundedStreamConfig = new 
BoundedStreamConfig(normalizedStartOffsets, normalizedEndOffsets);
+      SupervisorSpec backfillSpec = createBackfillSpec(streamSpec, 
backfillSupervisorId, boundedStreamConfig, backfillTaskCount);
+      createOrUpdateAndStartSupervisor(backfillSpec);
+    }
+    catch (JsonProcessingException e) {
+      throw new ISE(e, "Failed to create backfill supervisor spec for 
supervisor[%s]", id);
+    }
+
+    log.info("Started backfill supervisor[%s] for supervisor[%s]", 
backfillSupervisorId, id);
+
+    log.info("Resetting supervisor[%s] metadata to latest offsets", id);
+    DataSourceMetadata resetMetadata = 
streamSupervisor.createDataSourceMetaDataForReset(
+        streamSupervisor.getIoConfig().getStream(),
+        endOffsets
+    );
+
+    streamSupervisor.resetOffsets(resetMetadata);
+
+    // Reset autoscaler if present
+    SupervisorTaskAutoScaler autoscaler = autoscalers.get(id);
+    if (autoscaler != null) {
+      autoscaler.reset();
+    }
+
+    return ImmutableMap.of(
+        "id", id,
+        "backfillSupervisorId", backfillSupervisorId
+    );
+  }
+
+  private void validateResetAndBackfill(String id, Pair<Supervisor, 
SupervisorSpec> supervisorPair)
+  {
+    if (!(supervisorPair.lhs instanceof SeekableStreamSupervisor)) {
+      throw new IAE("Supervisor[%s] is not a streaming supervisor", id);
+    }
+    SeekableStreamSupervisor streamSupervisor = (SeekableStreamSupervisor) 
supervisorPair.lhs;
+    SeekableStreamSupervisorSpec streamSpec = (SeekableStreamSupervisorSpec) 
supervisorPair.rhs;
+
+    if (streamSupervisor.getIoConfig().isUseEarliestSequenceNumber()) {
+      throw new IAE("Reset with skipped offsets is not supported when 
useEarliestOffset is true.");
+    }
+
+    if (!specHasConcurrentLocks(streamSpec)) {
+      throw new IAE(
+          "Backfill tasks require 'useConcurrentLocks' to be set to true in 
the supervisor context to allow concurrent writes with the main supervisor 
tasks"
+      );
+    }
+
+    if (supervisorPair.lhs.getState() != 
SupervisorStateManager.BasicState.RUNNING) {
+      throw new IAE("Supervisor[%s] must be in a RUNNING state to perform a 
reset and backfill", id);
+    }
+  }
+
+  SupervisorSpec createBackfillSpec(
+      SeekableStreamSupervisorSpec sourceSpec,
+      String backfillSupervisorId,
+      BoundedStreamConfig boundedStreamConfig,
+      @Nullable Integer backfillTaskCount
+  ) throws JsonProcessingException
+  {
+    ObjectNode specNode = jsonMapper.valueToTree(sourceSpec);
+    specNode.put("id", backfillSupervisorId);
+    ObjectNode ioConfigNode = (ObjectNode) 
specNode.path("spec").path("ioConfig");
+    ioConfigNode.set("boundedStreamConfig", 
jsonMapper.valueToTree(boundedStreamConfig));
+    if (backfillTaskCount != null) {
+      ioConfigNode.put("taskCount", backfillTaskCount);
+    }
+    return jsonMapper.treeToValue(specNode, SupervisorSpec.class);
+  }

Review Comment:
   Instead of parsing the json tree here, what do you think about making this 
an abstract method on the `SeekableStreamSupervisorSpec` itself that would 
create a new spec based on the `sourceSpec` with the `boundedStreamConfig` 
populated? (similar to `createSuspendedSpec()` and `createRunningSpec()`)
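   A minimal sketch of that shape, assuming simplified stand-in classes: 
`StreamSpecSketch`, `KafkaSpecSketch` and `BoundedStreamConfigSketch` are 
hypothetical names for illustration, not the real Druid types or signatures. 
Each concrete spec copies itself with the new ID, the bounded range, and an 
optional task count override, so no JSON tree manipulation is needed:

```java
import java.util.Map;

// Hypothetical, simplified stand-in; the real config class may differ.
final class BoundedStreamConfigSketch {
  final Map<String, Object> startOffsets;
  final Map<String, Object> endOffsets;

  BoundedStreamConfigSketch(Map<String, Object> startOffsets, Map<String, Object> endOffsets) {
    this.startOffsets = startOffsets;
    this.endOffsets = endOffsets;
  }
}

// Hypothetical stand-in for the stream supervisor spec base class.
abstract class StreamSpecSketch {
  final String id;
  final int taskCount;
  final BoundedStreamConfigSketch boundedStreamConfig; // null means an unbounded (regular) spec

  StreamSpecSketch(String id, int taskCount, BoundedStreamConfigSketch boundedStreamConfig) {
    this.id = id;
    this.taskCount = taskCount;
    this.boundedStreamConfig = boundedStreamConfig;
  }

  // Each concrete spec builds its own bounded copy.
  // A null backfillTaskCount means "inherit taskCount from this spec".
  abstract StreamSpecSketch createBackfillSpec(
      String backfillId,
      BoundedStreamConfigSketch bounds,
      Integer backfillTaskCount
  );
}

final class KafkaSpecSketch extends StreamSpecSketch {
  KafkaSpecSketch(String id, int taskCount, BoundedStreamConfigSketch boundedStreamConfig) {
    super(id, taskCount, boundedStreamConfig);
  }

  @Override
  StreamSpecSketch createBackfillSpec(
      String backfillId,
      BoundedStreamConfigSketch bounds,
      Integer backfillTaskCount
  ) {
    int tasks = backfillTaskCount != null ? backfillTaskCount : taskCount;
    return new KafkaSpecSketch(backfillId, tasks, bounds);
  }
}
```

   The manager would then call `sourceSpec.createBackfillSpec(...)` directly, 
and the source spec itself stays unchanged.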



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java:
##########
@@ -393,6 +374,120 @@ public boolean resetSupervisor(String id, @Nullable 
DataSourceMetadata resetData
     return true;
   }
 
+  /**
+   * Resets a supervisor to the latest stream offsets and starts a bounded 
backfill supervisor to
+   * process the skipped range from the previously checkpointed offsets up to 
the latest offsets.
+   *
+   * @param id               supervisor ID
+   * @param backfillTaskCount number of tasks for the backfill supervisor, or 
null to inherit from the source spec
+   * @return map with {@code "id"} (the original supervisor ID) and {@code 
"backfillSupervisorId"}
+   * @throws IllegalArgumentException if the supervisor is not a {@link 
SeekableStreamSupervisor},
+   *                                  if {@code useEarliestSequenceNumber} is 
true,
+   *                                  if {@code useConcurrentLocks} is not set 
to true in the supervisor context,
+   *                                  or if the supervisor is not in a RUNNING 
state
+   * @throws IllegalStateException    if the latest or checkpointed offsets 
cannot be retrieved,
+   *                                  or if the backfill spec cannot be 
serialized
+   */
+  public Map<String, Object> resetSupervisorAndBackfill(String id, @Nullable 
Integer backfillTaskCount)
+  {
+    Preconditions.checkState(started, "SupervisorManager not started");
+    Preconditions.checkNotNull(id, "id");
+
+    Pair<Supervisor, SupervisorSpec> supervisorPair = supervisors.get(id);
+    validateResetAndBackfill(id, supervisorPair);
+
+    SeekableStreamSupervisor streamSupervisor = (SeekableStreamSupervisor) 
supervisorPair.lhs;
+    SeekableStreamSupervisorSpec streamSpec = (SeekableStreamSupervisorSpec) 
supervisorPair.rhs;
+
+    log.info("Capturing latest offsets from stream for supervisor[%s]", id);
+    streamSupervisor.updatePartitionLagFromStream();
+    Map<?, ?> endOffsets = streamSupervisor.getLatestSequencesFromStream();
+
+    log.info("Capturing checkpointed offsets for supervisor[%s]", id);
+    Map<?, ?> startOffsets = streamSupervisor.getOffsetsFromMetadataStorage();
+
+    if (endOffsets == null || endOffsets.isEmpty()) {
+      throw new ISE("Skipping reset: Failed to get latest offsets from stream 
for supervisor[%s]", id);
+    }
+    if (startOffsets == null || startOffsets.isEmpty()) {
+      throw new ISE("Skipping reset: Failed to get checkpointed offsets for 
supervisor[%s]", id);
+    }
+
+    String backfillSupervisorId = IdUtils.getRandomIdWithPrefix(id + 
"_backfill");
+
+    try {
+      Map<String, Object> normalizedStartOffsets = 
jsonMapper.readValue(jsonMapper.writeValueAsString(startOffsets), Map.class);
+      Map<String, Object> normalizedEndOffsets = 
jsonMapper.readValue(jsonMapper.writeValueAsString(endOffsets), Map.class);
+      BoundedStreamConfig boundedStreamConfig = new 
BoundedStreamConfig(normalizedStartOffsets, normalizedEndOffsets);
+      SupervisorSpec backfillSpec = createBackfillSpec(streamSpec, 
backfillSupervisorId, boundedStreamConfig, backfillTaskCount);
+      createOrUpdateAndStartSupervisor(backfillSpec);
+    }
+    catch (JsonProcessingException e) {
+      throw new ISE(e, "Failed to create backfill supervisor spec for 
supervisor[%s]", id);
+    }
+
+    log.info("Started backfill supervisor[%s] for supervisor[%s]", 
backfillSupervisorId, id);
+
+    log.info("Resetting supervisor[%s] metadata to latest offsets", id);

Review Comment:
   I know this can be determined from the supervisor spec directly, but I think 
it'd be nice to also log the specific start/end offsets computed for 
the backfill supervisor so folks can audit things more easily if needed.
   
   Looks like there are logs for the reset path already: `log.info("Posting 
ResetOffsetsNotice with reset dataSource metadata[%s]", 
resetDataSourceMetadata);`
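   Something along these lines, as a rough sketch: `BackfillAuditLog` and 
`describeBackfillRange` are hypothetical names, and the message is built with 
`String.format` here only so the example is self-contained. In 
`SupervisorManager` the same format string and arguments would go to 
`log.info(...)`:

```java
import java.util.Map;
import java.util.TreeMap;

final class BackfillAuditLog {
  // Builds the audit message. Copying into a TreeMap sorts the partitions by key
  // so the logged offsets appear in a stable order.
  static String describeBackfillRange(
      String supervisorId,
      String backfillSupervisorId,
      Map<String, Object> startOffsets,
      Map<String, Object> endOffsets
  ) {
    return String.format(
        "Started backfill supervisor[%s] for supervisor[%s] with startOffsets[%s] and endOffsets[%s]",
        backfillSupervisorId,
        supervisorId,
        new TreeMap<>(startOffsets),
        new TreeMap<>(endOffsets)
    );
  }
}
```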



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java:
##########
@@ -393,6 +374,120 @@ public boolean resetSupervisor(String id, @Nullable 
DataSourceMetadata resetData
     return true;
   }
 
+  /**
+   * Resets a supervisor to the latest stream offsets and starts a bounded 
backfill supervisor to
+   * process the skipped range from the previously checkpointed offsets up to 
the latest offsets.
+   *
+   * @param id               supervisor ID
+   * @param backfillTaskCount number of tasks for the backfill supervisor, or 
null to inherit from the source spec
+   * @return map with {@code "id"} (the original supervisor ID) and {@code 
"backfillSupervisorId"}
+   * @throws IllegalArgumentException if the supervisor is not a {@link 
SeekableStreamSupervisor},
+   *                                  if {@code useEarliestSequenceNumber} is 
true,
+   *                                  if {@code useConcurrentLocks} is not set 
to true in the supervisor context,
+   *                                  or if the supervisor is not in a RUNNING 
state
+   * @throws IllegalStateException    if the latest or checkpointed offsets 
cannot be retrieved,
+   *                                  or if the backfill spec cannot be 
serialized
+   */
+  public Map<String, Object> resetSupervisorAndBackfill(String id, @Nullable 
Integer backfillTaskCount)
+  {
+    Preconditions.checkState(started, "SupervisorManager not started");
+    Preconditions.checkNotNull(id, "id");
+
+    Pair<Supervisor, SupervisorSpec> supervisorPair = supervisors.get(id);
+    validateResetAndBackfill(id, supervisorPair);
+
+    SeekableStreamSupervisor streamSupervisor = (SeekableStreamSupervisor) 
supervisorPair.lhs;
+    SeekableStreamSupervisorSpec streamSpec = (SeekableStreamSupervisorSpec) 
supervisorPair.rhs;
+
+    log.info("Capturing latest offsets from stream for supervisor[%s]", id);
+    streamSupervisor.updatePartitionLagFromStream();
+    Map<?, ?> endOffsets = streamSupervisor.getLatestSequencesFromStream();
+
+    log.info("Capturing checkpointed offsets for supervisor[%s]", id);
+    Map<?, ?> startOffsets = streamSupervisor.getOffsetsFromMetadataStorage();
+
+    if (endOffsets == null || endOffsets.isEmpty()) {
+      throw new ISE("Skipping reset: Failed to get latest offsets from stream 
for supervisor[%s]", id);
+    }
+    if (startOffsets == null || startOffsets.isEmpty()) {
+      throw new ISE("Skipping reset: Failed to get checkpointed offsets for 
supervisor[%s]", id);
+    }
+
+    String backfillSupervisorId = IdUtils.getRandomIdWithPrefix(id + 
"_backfill");
+
+    try {
+      Map<String, Object> normalizedStartOffsets = 
jsonMapper.readValue(jsonMapper.writeValueAsString(startOffsets), Map.class);
+      Map<String, Object> normalizedEndOffsets = 
jsonMapper.readValue(jsonMapper.writeValueAsString(endOffsets), Map.class);
+      BoundedStreamConfig boundedStreamConfig = new 
BoundedStreamConfig(normalizedStartOffsets, normalizedEndOffsets);
+      SupervisorSpec backfillSpec = createBackfillSpec(streamSpec, 
backfillSupervisorId, boundedStreamConfig, backfillTaskCount);
+      createOrUpdateAndStartSupervisor(backfillSpec);
+    }
+    catch (JsonProcessingException e) {
+      throw new ISE(e, "Failed to create backfill supervisor spec for 
supervisor[%s]", id);
+    }
+
+    log.info("Started backfill supervisor[%s] for supervisor[%s]", 
backfillSupervisorId, id);
+
+    log.info("Resetting supervisor[%s] metadata to latest offsets", id);
+    DataSourceMetadata resetMetadata = 
streamSupervisor.createDataSourceMetaDataForReset(
+        streamSupervisor.getIoConfig().getStream(),
+        endOffsets
+    );
+
+    streamSupervisor.resetOffsets(resetMetadata);
+
+    // Reset autoscaler if present
+    SupervisorTaskAutoScaler autoscaler = autoscalers.get(id);
+    if (autoscaler != null) {
+      autoscaler.reset();
+    }
+
+    return ImmutableMap.of(
+        "id", id,
+        "backfillSupervisorId", backfillSupervisorId
+    );
+  }
+
+  private void validateResetAndBackfill(String id, Pair<Supervisor, 
SupervisorSpec> supervisorPair)
+  {
+    if (!(supervisorPair.lhs instanceof SeekableStreamSupervisor)) {

Review Comment:
   nit: maybe make this invoke `requireStreamSupervisor()` for consistency with 
the other manager operations



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java:
##########
@@ -393,6 +374,120 @@ public boolean resetSupervisor(String id, @Nullable 
DataSourceMetadata resetData
     return true;
   }
 
+  /**
+   * Resets a supervisor to the latest stream offsets and starts a bounded 
backfill supervisor to
+   * process the skipped range from the previously checkpointed offsets up to 
the latest offsets.
+   *
+   * @param id               supervisor ID
+   * @param backfillTaskCount number of tasks for the backfill supervisor, or 
null to inherit from the source spec
+   * @return map with {@code "id"} (the original supervisor ID) and {@code 
"backfillSupervisorId"}
+   * @throws IllegalArgumentException if the supervisor is not a {@link 
SeekableStreamSupervisor},
+   *                                  if {@code useEarliestSequenceNumber} is 
true,
+   *                                  if {@code useConcurrentLocks} is not set 
to true in the supervisor context,
+   *                                  or if the supervisor is not in a RUNNING 
state
+   * @throws IllegalStateException    if the latest or checkpointed offsets 
cannot be retrieved,
+   *                                  or if the backfill spec cannot be 
serialized
+   */
+  public Map<String, Object> resetSupervisorAndBackfill(String id, @Nullable 
Integer backfillTaskCount)
+  {
+    Preconditions.checkState(started, "SupervisorManager not started");
+    Preconditions.checkNotNull(id, "id");
+
+    Pair<Supervisor, SupervisorSpec> supervisorPair = supervisors.get(id);
+    validateResetAndBackfill(id, supervisorPair);
+
+    SeekableStreamSupervisor streamSupervisor = (SeekableStreamSupervisor) 
supervisorPair.lhs;
+    SeekableStreamSupervisorSpec streamSpec = (SeekableStreamSupervisorSpec) 
supervisorPair.rhs;

Review Comment:
   nit: maybe cast to `streamSupervisor` and `streamSpec` once and pass them 
down to `validateResetAndBackfill` as well, since it needs both
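   Roughly this flow, sketched with hypothetical stand-in types 
(`CastOnceSketch`, `asStreamSupervisor` and the nested interfaces are 
illustrative only, and the spec argument is left out for brevity). The type 
check and cast happen in one place, and validation receives the already-typed 
value:

```java
final class CastOnceSketch {
  // Hypothetical stand-in for the generic supervisor interface.
  interface Supervisor {
    String getState();
  }

  // Hypothetical stand-in for a streaming supervisor.
  static final class StreamSupervisor implements Supervisor {
    private final String state;

    StreamSupervisor(String state) {
      this.state = state;
    }

    @Override
    public String getState() {
      return state;
    }
  }

  // Single place where the type check and cast happen.
  static StreamSupervisor asStreamSupervisor(String id, Supervisor supervisor) {
    if (!(supervisor instanceof StreamSupervisor)) {
      throw new IllegalArgumentException("Supervisor[" + id + "] is not a streaming supervisor");
    }
    return (StreamSupervisor) supervisor;
  }

  // Validation takes the typed supervisor, so it needs no casts of its own.
  static void validateResetAndBackfill(String id, StreamSupervisor supervisor) {
    if (!"RUNNING".equals(supervisor.getState())) {
      throw new IllegalArgumentException(
          "Supervisor[" + id + "] must be in a RUNNING state to perform a reset and backfill"
      );
    }
  }

  // Caller casts once, validates, then continues with the typed value.
  static String resetAndBackfill(String id, Supervisor supervisor) {
    StreamSupervisor streamSupervisor = asStreamSupervisor(id, supervisor);
    validateResetAndBackfill(id, streamSupervisor);
    return id + "_backfill";
  }
}
```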



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java:
##########
@@ -631,4 +726,29 @@ private SupervisorSpec getSpec(String id)
       return supervisor == null ? null : supervisor.rhs;
     }
   }
+
+  /**
+   * Returns true if the spec's context enables concurrent (append) locks, 
accepting both
+   * {@code useConcurrentLocks: true} (or any truthy string) and {@code 
taskLockType: APPEND}.
+   */
+  private static boolean specHasConcurrentLocks(SeekableStreamSupervisorSpec 
spec)
+  {
+    Map<String, Object> context = spec.getContext();
+    if (context == null) {
+      return false;

Review Comment:
   Looks like the old code was effectively using this variable, which is 
currently set to false:
   
   ```suggestion
         return Tasks.DEFAULT_USE_CONCURRENT_LOCKS;
   ```
   
   If the default changes someday, the different code flows would automatically 
pick it up
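   For illustration, a self-contained sketch of the whole check with the 
default applied on every fallback path. The constants here are local stand-ins 
for the ones in `Tasks` (assumed, per the note above, to default to `false`), 
and the `taskLockType: APPEND` handling follows the Javadoc in this diff rather 
than the actual implementation:

```java
import java.util.Map;

final class ConcurrentLocksSketch {
  // Local stand-in for Tasks.DEFAULT_USE_CONCURRENT_LOCKS (assumed to be false today).
  static final boolean DEFAULT_USE_CONCURRENT_LOCKS = false;
  // Context keys as named in the Javadoc of specHasConcurrentLocks.
  static final String USE_CONCURRENT_LOCKS = "useConcurrentLocks";
  static final String TASK_LOCK_TYPE = "taskLockType";

  static boolean specHasConcurrentLocks(Map<String, Object> context) {
    if (context == null) {
      // No context at all: fall back to the shared default instead of a literal false.
      return DEFAULT_USE_CONCURRENT_LOCKS;
    }
    Object useConcurrentLocks = context.get(USE_CONCURRENT_LOCKS);
    if (useConcurrentLocks != null) {
      // Accepts a Boolean true or a truthy string such as "true".
      return Boolean.parseBoolean(String.valueOf(useConcurrentLocks));
    }
    if ("APPEND".equalsIgnoreCase(String.valueOf(context.get(TASK_LOCK_TYPE)))) {
      return true;
    }
    return DEFAULT_USE_CONCURRENT_LOCKS;
  }
}
```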
   



##########
docs/api-reference/supervisor-api.md:
##########
@@ -3539,6 +3539,101 @@ when the supervisor's tasks restart, they resume 
reading from `{"0": 100, "1": 1
   ```
 </details>
 
+### Reset offsets and start a backfill supervisor
+
+Resets the supervisor to the latest available stream offsets and starts a new 
bounded backfill supervisor to ingest the data in the skipped range.
+
+This endpoint is useful when a supervisor has fallen behind and you want to 
catch it up to the latest offsets without losing the skipped data. The main 
supervisor resumes ingesting from the latest offsets, while the backfill 
supervisor processes the range from the previously checkpointed offsets up to 
the latest offsets at the time of the reset.
+
+The following requirements must be met before calling this endpoint:
+
+- The supervisor must be a `SeekableStreamSupervisor`.
+- The supervisor's `useEarliestSequenceNumber` property must be `false`.

Review Comment:
   Ah yeah, I was thinking about this requirement: `The supervisor's 
useEarliestSequenceNumber property must be false`
   
   Thinking about it more, ingesting from earliest + backfill doesn’t make much 
sense for the generic reset case, so this seems like a useful guardrail.



##########
docs/api-reference/supervisor-api.md:
##########
@@ -3539,6 +3539,109 @@ when the supervisor's tasks restart, they resume 
reading from `{"0": 100, "1": 1
   ```
 </details>
 
+### Reset offsets and start a backfill supervisor
+
+This endpoint is supported for Apache Kafka and RabbitMQ Stream supervisors. 
Amazon Kinesis is not supported yet.
+
+Resets the supervisor to the latest available stream offsets and starts a new 
bounded backfill supervisor to ingest the data in the skipped range.
+
+This endpoint is useful when a supervisor has fallen behind and you want to 
catch it up to the latest offsets without losing the skipped data. The main 
supervisor resumes ingesting from the latest offsets, while the backfill 
supervisor processes the range from the previously checkpointed offsets up to 
the latest offsets at the time of the reset.
+
+**Duplicate ingestion notice:** The main supervisor is not quiesced before the 
reset. This means duplicate data can occur in two ways:
+- **Backfill overlap:** Any tasks that were in-flight at the time of the reset 
may publish segments covering part of the backfill range before being shut down.
+- **Reset race:** If a task checkpoint is written to the metadata store 
between when this endpoint captures the current offsets and when it applies the 
reset, that checkpoint can be overwritten, causing the main supervisor to 
re-ingest already-processed data.
+
+Both windows are narrow in practice, but cannot be fully eliminated without 
manually suspending the main supervisor before calling this endpoint and 
waiting for all pending tasks to complete.

Review Comment:
   This seems reasonable to me. If an exactly-once guarantee is truly required, 
operators can still perform these steps manually, right?
   
   I’m not sure what it would take to fully bake suspend + handoff semantics 
into this API, but that’s something we can evolve in the future if needed. In 
the interim, as the docs call out, operators can still:
   
   suspend supervisor → wait for tasks to checkpoint and complete → kick off 
backfill supervisor with checkpoints → reset main supervisor
   
   (Fwiw, `resetOffsetsAutomatically` and hard resets have similar caveats 
around exactly-once semantics)



##########
docs/api-reference/supervisor-api.md:
##########
@@ -3539,6 +3539,109 @@ when the supervisor's tasks restart, they resume 
reading from `{"0": 100, "1": 1
   ```
 </details>
 
+### Reset offsets and start a backfill supervisor
+
+This endpoint is supported for Apache Kafka and RabbitMQ Stream supervisors. 
Amazon Kinesis is not supported yet.
+
+Resets the supervisor to the latest available stream offsets and starts a new 
bounded backfill supervisor to ingest the data in the skipped range.
+
+This endpoint is useful when a supervisor has fallen behind and you want to 
catch it up to the latest offsets without losing the skipped data. The main 
supervisor resumes ingesting from the latest offsets, while the backfill 
supervisor processes the range from the previously checkpointed offsets up to 
the latest offsets at the time of the reset.
+
+**Duplicate ingestion notice:** The main supervisor is not quiesced before the 
reset. This means duplicate data can occur in two ways:
+- **Backfill overlap:** Any tasks that were in-flight at the time of the reset 
may publish segments covering part of the backfill range before being shut down.
+- **Reset race:** If a task checkpoint is written to the metadata store 
between when this endpoint captures the current offsets and when it applies the 
reset, that checkpoint can be overwritten, causing the main supervisor to 
re-ingest already-processed data.
+
+Both windows are narrow in practice, but cannot be fully eliminated without 
manually suspending the main supervisor before calling this endpoint and 
waiting for all pending tasks to complete.
+
+The following requirements must be met before calling this endpoint:
+
+- The supervisor must be a [streaming supervisor](../ingestion/supervisor.md).
+- The supervisor's `useEarliestSequenceNumber` property must be `false`.
+- The supervisor context must have `useConcurrentLocks` set to `true` to allow 
the backfill supervisor's tasks to write concurrently with the main 
supervisor's tasks.
+- The supervisor must be in a `RUNNING` state.
+
+The backfill supervisor has the same configuration as the source supervisor 
except for its ID, which takes the form 
`{supervisorId}_backfill_{randomSuffix}`, and its `boundedStreamConfig`, which 
is set to the skipped offset range. If `backfillTaskCount` is specified, it 
overrides the `taskCount` for the backfill supervisor only.
+
+#### URL
+
+`POST` `/druid/indexer/v1/supervisor/{supervisorId}/resetOffsetsAndBackfill`
+
+#### Query parameters
+
+| Parameter | Type | Description | Default |
+|---------|---------|---------|---------|
+| `backfillTaskCount` | Integer | Number of parallel tasks for the backfill 
supervisor. If not specified, inherits `taskCount` from the source supervisor. 
| None |
+
+#### Responses
+
+<Tabs>
+
+<TabItem value="5" label="200 SUCCESS">
+
+
+*Successfully reset and started backfill supervisor*
+
+</TabItem>
+<TabItem value="6" label="400 BAD REQUEST">
+
+
+*Supervisor does not meet requirements (wrong type, 
`useEarliestSequenceNumber` is true, `useConcurrentLocks` not enabled, or 
supervisor not RUNNING)*
+
+</TabItem>
+<TabItem value="7" label="404 NOT FOUND">
+
+
+*Invalid supervisor ID*
+
+</TabItem>
+<TabItem value="8" label="500 SERVER ERROR">
+
+
+*Failed to retrieve stream offsets or serialize the backfill spec*
+
+</TabItem>
+</Tabs>
+
+---
+
+#### Sample request
+
+The following example resets a supervisor named `social_media` and starts a 
backfill supervisor with 2 tasks.

Review Comment:
   As a follow-up, what do you think about wiring this API into the web console?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

