FrankChen021 commented on code in PR #19477:
URL: https://github.com/apache/druid/pull/19477#discussion_r3288574516


##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java:
##########
@@ -393,6 +374,120 @@ public boolean resetSupervisor(String id, @Nullable 
DataSourceMetadata resetData
     return true;
   }
 
+  /**
+   * Resets a supervisor to the latest stream offsets and starts a bounded 
backfill supervisor to
+   * process the skipped range from the previously checkpointed offsets up to 
the latest offsets.
+   *
+   * @param id               supervisor ID
+   * @param backfillTaskCount number of tasks for the backfill supervisor, or 
null to inherit from the source spec
+   * @return map with {@code "id"} (the original supervisor ID) and {@code 
"backfillSupervisorId"}
+   * @throws IllegalArgumentException if the supervisor is not a {@link 
SeekableStreamSupervisor},
+   *                                  if {@code useEarliestSequenceNumber} is 
true,
+   *                                  if {@code useConcurrentLocks} is not set 
to true in the supervisor context,
+   *                                  or if the supervisor is not in a RUNNING 
state
+   * @throws IllegalStateException    if the latest or checkpointed offsets 
cannot be retrieved,
+   *                                  or if the backfill spec cannot be 
serialized
+   */
+  public Map<String, Object> resetSupervisorAndBackfill(String id, @Nullable 
Integer backfillTaskCount)
+  {
+    Preconditions.checkState(started, "SupervisorManager not started");
+    Preconditions.checkNotNull(id, "id");
+
+    Pair<Supervisor, SupervisorSpec> supervisorPair = supervisors.get(id);
+    validateResetAndBackfill(id, supervisorPair);
+
+    SeekableStreamSupervisor streamSupervisor = (SeekableStreamSupervisor) 
supervisorPair.lhs;
+    SeekableStreamSupervisorSpec streamSpec = (SeekableStreamSupervisorSpec) 
supervisorPair.rhs;
+
+    log.info("Capturing latest offsets from stream for supervisor[%s]", id);
+    streamSupervisor.updatePartitionLagFromStream();
+    Map<?, ?> endOffsets = streamSupervisor.getLatestSequencesFromStream();
+
+    log.info("Capturing checkpointed offsets for supervisor[%s]", id);
+    Map<?, ?> startOffsets = streamSupervisor.getOffsetsFromMetadataStorage();
+
+    if (endOffsets == null || endOffsets.isEmpty()) {
+      throw new ISE("Skipping reset: Failed to get latest offsets from stream 
for supervisor[%s]", id);
+    }
+    if (startOffsets == null || startOffsets.isEmpty()) {
+      throw new ISE("Skipping reset: Failed to get checkpointed offsets for 
supervisor[%s]", id);
+    }
+
+    String backfillSupervisorId = IdUtils.getRandomIdWithPrefix(id + 
"_backfill");
+
+    try {
+      Map<String, Object> normalizedStartOffsets = 
jsonMapper.readValue(jsonMapper.writeValueAsString(startOffsets), Map.class);
+      Map<String, Object> normalizedEndOffsets = 
jsonMapper.readValue(jsonMapper.writeValueAsString(endOffsets), Map.class);
+      BoundedStreamConfig boundedStreamConfig = new 
BoundedStreamConfig(normalizedStartOffsets, normalizedEndOffsets);
+      SupervisorSpec backfillSpec = createBackfillSpec(streamSpec, 
backfillSupervisorId, boundedStreamConfig, backfillTaskCount);
+      createOrUpdateAndStartSupervisor(backfillSpec);
+    }
+    catch (JsonProcessingException e) {
+      throw new ISE(e, "Failed to create backfill supervisor spec for 
supervisor[%s]", id);
+    }
+
+    log.info("Started backfill supervisor[%s] for supervisor[%s]", 
backfillSupervisorId, id);
+
+    log.info("Resetting supervisor[%s] metadata to latest offsets", id);
+    DataSourceMetadata resetMetadata = 
streamSupervisor.createDataSourceMetaDataForReset(
+        streamSupervisor.getIoConfig().getStream(),
+        endOffsets
+    );
+
+    streamSupervisor.resetOffsets(resetMetadata);

Review Comment:
   [P1] Avoid resetting the main supervisor with stale offsets
   
   The main supervisor keeps ingesting while this method captures 
`endOffsets`/`startOffsets`, creates the backfill supervisor, and only then 
calls `resetOffsets`. If a running task checkpoints after the captured 
`endOffsets` but before this call, `resetOffsetsInternal` builds 
`currentMetadata.plus(resetMetadata)`, and `plus` overwrites the newer 
partition value with the stale captured end offset. That rolls the main 
supervisor checkpoint backward and can reingest rows that were already 
published. The reset should be conditional on metadata still matching the 
captured range, retried with fresh offsets, or performed while the main 
supervisor is quiesced.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to