FrankChen021 commented on code in PR #19477:
URL: https://github.com/apache/druid/pull/19477#discussion_r3266159633


##########
extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java:
##########
@@ -336,7 +336,7 @@ protected OrderedSequenceNumber<String> makeSequenceNumber(String seq, boolean i
   }
 
   @Override
-  protected void updatePartitionLagFromStream()
+  public void updatePartitionLagFromStream()

Review Comment:
   [P2] Kinesis cannot provide backfill end offsets
   
   The new manager path calls `updatePartitionLagFromStream()` and then
   `getLatestSequencesFromStream()`. Kinesis only updates time lag in this
   method and does not override `getLatestSequencesFromStream()`, so it
   inherits the base implementation, which returns an empty map. Any Kinesis
   supervisor that passes the earlier checks will therefore fail the
   empty-latest-offsets check instead of starting a backfill.
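
To make the failure mode concrete, here is a toy model of the inheritance problem. Every class and method body below is a hypothetical stand-in written for illustration, not Druid's real supervisor code; only the method names `updatePartitionLagFromStream` and `getLatestSequencesFromStream` come from the PR.

```java
import java.util.Collections;
import java.util.Map;

// Toy model only: hypothetical stand-ins, not Druid's real supervisors.
class BaseStreamSupervisorSketch
{
  // Base default: no latest sequences are known.
  Map<String, String> getLatestSequencesFromStream()
  {
    return Collections.emptyMap();
  }
}

// Mirrors the reviewed situation: lag is refreshed, but the getter is not overridden.
class TimeLagOnlySupervisorSketch extends BaseStreamSupervisorSketch
{
  long timeLagMillis = -1;

  void updatePartitionLagFromStream()
  {
    // Placeholder for a time-lag refresh; no sequence numbers are recorded.
    timeLagMillis = 0;
  }
}

final class BackfillPrecheckSketch
{
  private BackfillPrecheckSketch() {}

  // Same shape as the manager's validation: empty end offsets block the backfill.
  static boolean canBackfill(BaseStreamSupervisorSketch supervisor)
  {
    Map<String, String> endOffsets = supervisor.getLatestSequencesFromStream();
    return endOffsets != null && !endOffsets.isEmpty();
  }
}
```

In this model, refreshing the lag never changes what the inherited getter returns, so `canBackfill` stays false until the subclass overrides `getLatestSequencesFromStream()` or the manager rejects such supervisors up front.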



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java:
##########
@@ -393,6 +399,117 @@ public boolean resetSupervisor(String id, @Nullable DataSourceMetadata resetData
     return true;
   }
 
+  /**
+   * Resets a supervisor to the latest stream offsets and starts a bounded backfill supervisor to
+   * process the skipped range from the previously checkpointed offsets up to the latest offsets.
+   *
+   * @param id               supervisor ID
+   * @param backfillTaskCount number of tasks for the backfill supervisor, or null to inherit from the source spec
+   * @return map with {@code "id"} (the original supervisor ID) and {@code "backfillSupervisorId"}
+   * @throws IllegalArgumentException if the supervisor is not a {@link SeekableStreamSupervisor},
+   *                                  if {@code useEarliestSequenceNumber} is true,
+   *                                  if {@code useConcurrentLocks} is not set to true in the supervisor context,
+   *                                  or if the supervisor is not in a RUNNING state
+   * @throws IllegalStateException    if the latest or checkpointed offsets cannot be retrieved,
+   *                                  or if the backfill spec cannot be serialized
+   */
+  public Map<String, Object> resetSupervisorAndBackfill(String id, @Nullable Integer backfillTaskCount)
+  {
+    Preconditions.checkState(started, "SupervisorManager not started");
+    Preconditions.checkNotNull(id, "id");
+
+    Pair<Supervisor, SupervisorSpec> supervisorPair = supervisors.get(id);
+    if (!(supervisorPair.lhs instanceof SeekableStreamSupervisor)) {
+      throw new IAE("Supervisor[%s] is not a SeekableStreamSupervisor", id);
+    }
+    SeekableStreamSupervisor streamSupervisor = (SeekableStreamSupervisor) supervisorPair.lhs;
+    SeekableStreamSupervisorSpec streamSpec = (SeekableStreamSupervisorSpec) supervisorPair.rhs;
+
+    // Verify useEarliestOffset is false
+    if (streamSupervisor.getIoConfig().isUseEarliestSequenceNumber()) {
+      throw new IAE("Reset with skipped offsets is not supported when useEarliestOffset is true.");
+    }
+
+    // Verify useConcurrentLocks is enabled
+    final Map<String, Object> context = streamSpec.getContext();
+    if (context == null || !Boolean.TRUE.equals(context.get("useConcurrentLocks"))) {
+      throw new IAE(
+          "Backfill tasks require 'useConcurrentLocks' to be set to true in the supervisor context to allow concurrent writes with the main supervisor tasks"
+      );
+    }
+
+    // We need an active recordSupplier to query the latest offsets from the stream
+    if (supervisorPair.lhs.getState() != SupervisorStateManager.BasicState.RUNNING) {
+      throw new IAE("Supervisor[%s] must be in a RUNNING state to perform a reset and backfill", id);
+    }
+
+    log.info("Capturing latest offsets from stream for supervisor[%s]", id);
+    streamSupervisor.updatePartitionLagFromStream();
+    Map<?, ?> endOffsets = streamSupervisor.getLatestSequencesFromStream();
+
+    log.info("Capturing checkpointed offsets for supervisor[%s]", id);
+    Map<?, ?> startOffsets = streamSupervisor.getOffsetsFromMetadataStorage();
+
+    // Validate that we successfully retrieved offsets
+    if (endOffsets == null || endOffsets.isEmpty()) {
+      throw new ISE("Skipping reset: Failed to get latest offsets from stream for supervisor[%s]", id);
+    }
+    if (startOffsets == null || startOffsets.isEmpty()) {
+      throw new ISE("Skipping reset: Failed to get checkpointed offsets for supervisor[%s]", id);
+    }
+
+    log.info("Resetting supervisor[%s] metadata to latest offsets", id);
+    DataSourceMetadata resetMetadata = streamSupervisor.createDataSourceMetaDataForReset(
+        streamSupervisor.getIoConfig().getStream(),
+        endOffsets
+    );
+
+    streamSupervisor.resetOffsets(resetMetadata);

Review Comment:
   [P1] Reset can skip data before backfill is guaranteed
   
   `resetOffsets` only enqueues a `ResetOffsetsNotice`; it does not update
   metadata synchronously. This code queues the main supervisor's reset before
   the bounded config is fully built and before
   `createOrUpdateAndStartSupervisor` succeeds. If any later step fails, the
   queued reset can still advance the live supervisor to the latest offsets
   with no backfill supervisor in place, losing the skipped range that this
   endpoint is meant to preserve.
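
One possible way to address this is to run every step that can fail before the reset is enqueued. The sketch below only illustrates that ordering. `ResetOrderingSketch`, `FallibleStep`, and the two parameters are hypothetical names introduced here, not part of the PR or of Druid.

```java
// Hypothetical ordering sketch; none of these names exist in Druid.
final class ResetOrderingSketch
{
  private ResetOrderingSketch() {}

  // A step that may fail, such as building, serializing, or starting the backfill spec.
  interface FallibleStep
  {
    void run() throws Exception;
  }

  // Runs the fallible backfill setup first. The reset is enqueued only if setup
  // completed, so a setup failure leaves the live supervisor's offsets untouched.
  static void resetAndBackfill(FallibleStep buildAndStartBackfill, Runnable enqueueReset) throws Exception
  {
    buildAndStartBackfill.run(); // may throw; nothing has been reset yet
    enqueueReset.run();          // reached only after the backfill supervisor is in place
  }
}
```

This ordering trades one failure mode for another: if enqueuing the reset were to fail after the backfill has started, the backfill supervisor would need to be stopped or allowed to finish on its own. Whether that is acceptable is a design decision for the PR author.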



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java:
##########
@@ -393,6 +399,117 @@ public boolean resetSupervisor(String id, @Nullable DataSourceMetadata resetData
     return true;
   }
 
+  /**
+   * Resets a supervisor to the latest stream offsets and starts a bounded backfill supervisor to
+   * process the skipped range from the previously checkpointed offsets up to the latest offsets.
+   *
+   * @param id               supervisor ID
+   * @param backfillTaskCount number of tasks for the backfill supervisor, or null to inherit from the source spec
+   * @return map with {@code "id"} (the original supervisor ID) and {@code "backfillSupervisorId"}
+   * @throws IllegalArgumentException if the supervisor is not a {@link SeekableStreamSupervisor},
+   *                                  if {@code useEarliestSequenceNumber} is true,
+   *                                  if {@code useConcurrentLocks} is not set to true in the supervisor context,
+   *                                  or if the supervisor is not in a RUNNING state
+   * @throws IllegalStateException    if the latest or checkpointed offsets cannot be retrieved,
+   *                                  or if the backfill spec cannot be serialized
+   */
+  public Map<String, Object> resetSupervisorAndBackfill(String id, @Nullable Integer backfillTaskCount)
+  {
+    Preconditions.checkState(started, "SupervisorManager not started");
+    Preconditions.checkNotNull(id, "id");
+
+    Pair<Supervisor, SupervisorSpec> supervisorPair = supervisors.get(id);
+    if (!(supervisorPair.lhs instanceof SeekableStreamSupervisor)) {
+      throw new IAE("Supervisor[%s] is not a SeekableStreamSupervisor", id);
+    }
+    SeekableStreamSupervisor streamSupervisor = (SeekableStreamSupervisor) supervisorPair.lhs;
+    SeekableStreamSupervisorSpec streamSpec = (SeekableStreamSupervisorSpec) supervisorPair.rhs;
+
+    // Verify useEarliestOffset is false
+    if (streamSupervisor.getIoConfig().isUseEarliestSequenceNumber()) {
+      throw new IAE("Reset with skipped offsets is not supported when useEarliestOffset is true.");
+    }
+
+    // Verify useConcurrentLocks is enabled
+    final Map<String, Object> context = streamSpec.getContext();
+    if (context == null || !Boolean.TRUE.equals(context.get("useConcurrentLocks"))) {

Review Comment:
   [P2] Concurrent-lock check rejects valid true contexts
   
   This check only accepts a literal `Boolean.TRUE` under the hard-coded key.
   Other Druid paths in this class parse `Tasks.USE_CONCURRENT_LOCKS` with
   `QueryContexts.getAsBoolean`, which also accepts values such as the string
   `"true"`. Supervisors whose tasks actually use concurrent locks can
   therefore be rejected by this endpoint.
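
The difference between the strict and the lenient check can be shown with a small standalone helper. `ContextFlags.isEnabled` is a hypothetical illustration written for this comment. In the PR itself, reusing `QueryContexts.getAsBoolean` with `Tasks.USE_CONCURRENT_LOCKS`, as the other paths do, would presumably be the better fit.

```java
import java.util.Map;

// Hypothetical standalone helper for illustration; not a Druid API.
final class ContextFlags
{
  private ContextFlags() {}

  // Treats Boolean.TRUE and the string "true" (case-insensitive) as enabled.
  // A null context, a missing key, or any other value type counts as disabled.
  static boolean isEnabled(Map<String, Object> context, String key)
  {
    if (context == null) {
      return false;
    }
    Object value = context.get(key);
    if (value instanceof Boolean) {
      return (Boolean) value;
    }
    if (value instanceof String) {
      return Boolean.parseBoolean((String) value);
    }
    return false;
  }
}
```

By contrast, `Boolean.TRUE.equals(context.get("useConcurrentLocks"))` returns false for the string `"true"`, which is the rejection described above.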



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

