abhishekrb19 commented on code in PR #19477:
URL: https://github.com/apache/druid/pull/19477#discussion_r3277844036
##########
docs/api-reference/supervisor-api.md:
##########
@@ -3539,6 +3539,101 @@ when the supervisor's tasks restart, they resume reading from `{"0": 100, "1": 1
```
</details>
+### Reset offsets and start a backfill supervisor
+
+Resets the supervisor to the latest available stream offsets and starts a new bounded backfill supervisor to ingest the data in the skipped range.
+
+This endpoint is useful when a supervisor has fallen behind and you want to catch it up to the latest offsets without losing the skipped data. The main supervisor resumes ingesting from the latest offsets, while the backfill supervisor processes the range from the previously checkpointed offsets up to the latest offsets at the time of the reset.
+
+The following requirements must be met before calling this endpoint:
+
+- The supervisor must be a `SeekableStreamSupervisor`.
Review Comment:
We could point to the supervisor
[docs](https://druid.apache.org/docs/latest/ingestion/supervisor/) instead:
naming an implementation from there should suffice, rather than the Java class
name, which isn't documented elsewhere AFAIK.
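To make the range described in the documentation text above concrete: per partition, the backfill supervisor covers the records between the checkpointed offset and the latest offset captured at reset time. The following is a minimal, self-contained Java sketch of that computation, not the PR's code. It assumes integer partition IDs, Kafka-style `long` offsets, and an exclusive end offset; `BackfillRangeSketch`, `Range`, and `computeBackfillRanges` are hypothetical names.

```java
import java.util.Map;
import java.util.TreeMap;

public class BackfillRangeSketch
{
  /** Half-open offset range [start, end) for one partition; end exclusivity is an assumption. */
  public record Range(long start, long end) {}

  /**
   * Hypothetical helper: pairs each partition's checkpointed offset (start) with the
   * latest stream offset captured at reset time (end). Partitions missing from the
   * latest offsets, or with nothing skipped (start >= end), are left out.
   */
  public static Map<Integer, Range> computeBackfillRanges(
      Map<Integer, Long> checkpointed,
      Map<Integer, Long> latest
  )
  {
    Map<Integer, Range> ranges = new TreeMap<>();
    for (Map.Entry<Integer, Long> e : checkpointed.entrySet()) {
      Long end = latest.get(e.getKey());
      if (end != null && e.getValue() < end) {
        ranges.put(e.getKey(), new Range(e.getValue(), end));
      }
    }
    return ranges;
  }

  public static void main(String[] args)
  {
    Map<Integer, Long> checkpointed = Map.of(0, 100L, 1, 250L);
    Map<Integer, Long> latest = Map.of(0, 180L, 1, 250L);
    // Partition 0 skipped 80 records; partition 1 is already caught up.
    System.out.println(computeBackfillRanges(checkpointed, latest)); // prints {0=Range[start=100, end=180]}
  }
}
```

Dropping partitions with `start >= end` is a choice made for this sketch only; the PR may pass the full start and end offset maps through to the backfill spec unchanged.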
##########
indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java:
##########
@@ -1068,6 +1079,205 @@ public void test_isAnotherTaskGroupPublishingToPartitions()
);
}
+ @Test
+ public void testResetSupervisorAndBackfill() throws Exception
Review Comment:
Thanks for the tests @aho135! Do you think it’d be possible to extend the
embedded test `KafkaBoundedSupervisorTest` you added to include some end-to-end
coverage for this API as well?
##########
docs/api-reference/supervisor-api.md:
##########
@@ -3539,6 +3539,101 @@ when the supervisor's tasks restart, they resume reading from `{"0": 100, "1": 1
```
</details>
+### Reset offsets and start a backfill supervisor
+
+Resets the supervisor to the latest available stream offsets and starts a new bounded backfill supervisor to ingest the data in the skipped range.
+
+This endpoint is useful when a supervisor has fallen behind and you want to catch it up to the latest offsets without losing the skipped data. The main supervisor resumes ingesting from the latest offsets, while the backfill supervisor processes the range from the previously checkpointed offsets up to the latest offsets at the time of the reset.
+
+The following requirements must be met before calling this endpoint:
+
+- The supervisor must be a `SeekableStreamSupervisor`.
+- The supervisor's `useEarliestSequenceNumber` property must be `false`.
Review Comment:
Is this requirement needed? With `useConcurrentLocks` and two parallel
supervisors, I'd imagine this would still work even if the offset ranges
overlap.
##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java:
##########
@@ -393,6 +374,115 @@ public boolean resetSupervisor(String id, @Nullable DataSourceMetadata resetData
return true;
}
+ /**
+ * Resets a supervisor to the latest stream offsets and starts a bounded backfill supervisor to
+ * process the skipped range from the previously checkpointed offsets up to the latest offsets.
+ *
+ * @param id supervisor ID
+ * @param backfillTaskCount number of tasks for the backfill supervisor, or null to inherit from the source spec
+ * @return map with {@code "id"} (the original supervisor ID) and {@code "backfillSupervisorId"}
+ * @throws IllegalArgumentException if the supervisor is not a {@link SeekableStreamSupervisor},
+ *                                  if {@code useEarliestSequenceNumber} is true,
+ *                                  if {@code useConcurrentLocks} is not set to true in the supervisor context,
+ *                                  or if the supervisor is not in a RUNNING state
+ * @throws IllegalStateException    if the latest or checkpointed offsets cannot be retrieved,
+ *                                  or if the backfill spec cannot be serialized
+ */
+ public Map<String, Object> resetSupervisorAndBackfill(String id, @Nullable Integer backfillTaskCount)
+ {
+ Preconditions.checkState(started, "SupervisorManager not started");
+ Preconditions.checkNotNull(id, "id");
+
+ Pair<Supervisor, SupervisorSpec> supervisorPair = supervisors.get(id);
+ if (!(supervisorPair.lhs instanceof SeekableStreamSupervisor)) {
+ throw new IAE("Supervisor[%s] is not a SeekableStreamSupervisor", id);
+ }
+ SeekableStreamSupervisor streamSupervisor = (SeekableStreamSupervisor) supervisorPair.lhs;
+ SeekableStreamSupervisorSpec streamSpec = (SeekableStreamSupervisorSpec) supervisorPair.rhs;
+
+ // Verify useEarliestOffset is false
+ if (streamSupervisor.getIoConfig().isUseEarliestSequenceNumber()) {
+ throw new IAE("Reset with skipped offsets is not supported when useEarliestOffset is true.");
+ }
+
+ if (!specHasConcurrentLocks(streamSpec)) {
+ throw new IAE(
+ "Backfill tasks require 'useConcurrentLocks' to be set to true in the supervisor context to allow concurrent writes with the main supervisor tasks"
+ );
+ }
+
+ // We need an active recordSupplier to query the latest offsets from the stream
+ if (supervisorPair.lhs.getState() != SupervisorStateManager.BasicState.RUNNING) {
+ throw new IAE("Supervisor[%s] must be in a RUNNING state to perform a reset and backfill", id);
+ }
+
+ log.info("Capturing latest offsets from stream for supervisor[%s]", id);
+ streamSupervisor.updatePartitionLagFromStream();
+ Map<?, ?> endOffsets = streamSupervisor.getLatestSequencesFromStream();
+
+ log.info("Capturing checkpointed offsets for supervisor[%s]", id);
+ Map<?, ?> startOffsets = streamSupervisor.getOffsetsFromMetadataStorage();
+
+ // Validate that we successfully retrieved offsets
+ if (endOffsets == null || endOffsets.isEmpty()) {
+ throw new ISE("Skipping reset: Failed to get latest offsets from stream for supervisor[%s]", id);
+ }
+ if (startOffsets == null || startOffsets.isEmpty()) {
+ throw new ISE("Skipping reset: Failed to get checkpointed offsets for supervisor[%s]", id);
+ }
+
Review Comment:
nit: It’ll likely be cleaner to move these validations into their own
function.
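Following the nit above, one way the two offset checks quoted in the diff could be pulled into a helper is sketched below. This is a hypothetical, self-contained illustration: `OffsetValidationSketch` and `validateBackfillOffsets` are made-up names, `IllegalStateException` stands in for Druid's `ISE`, and whether the earlier precondition checks (supervisor type, `useConcurrentLocks`, RUNNING state) belong in the same helper is left open.

```java
import java.util.Map;

public class OffsetValidationSketch
{
  /**
   * Hypothetical helper: the null/empty checks on the latest (end) and checkpointed (start)
   * offsets, moved out of resetSupervisorAndBackfill. IllegalStateException stands in for ISE.
   */
  public static void validateBackfillOffsets(String id, Map<?, ?> startOffsets, Map<?, ?> endOffsets)
  {
    if (endOffsets == null || endOffsets.isEmpty()) {
      throw new IllegalStateException(
          String.format("Skipping reset: Failed to get latest offsets from stream for supervisor[%s]", id)
      );
    }
    if (startOffsets == null || startOffsets.isEmpty()) {
      throw new IllegalStateException(
          String.format("Skipping reset: Failed to get checkpointed offsets for supervisor[%s]", id)
      );
    }
  }

  public static void main(String[] args)
  {
    // Both maps populated: no exception.
    validateBackfillOffsets("sup", Map.of(0, 100L), Map.of(0, 180L));
    try {
      // Missing checkpointed offsets: the second check fails.
      validateBackfillOffsets("sup", Map.of(), Map.of(0, 180L));
    }
    catch (IllegalStateException e) {
      System.out.println(e.getMessage());
    }
  }
}
```

With a helper like this, the two `if` blocks in `resetSupervisorAndBackfill` would collapse into a single call such as `validateBackfillOffsets(id, startOffsets, endOffsets)` (hypothetical name).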
##########
extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java:
##########
@@ -336,7 +336,7 @@ protected OrderedSequenceNumber<String> makeSequenceNumber(String seq, boolean i
}
@Override
- protected void updatePartitionLagFromStream()
+ public void updatePartitionLagFromStream()
Review Comment:
Yeah, I think returning a 5xx should be good enough.
We could probably add a one-line limitation to the docs, for completeness,
noting that the feature doesn't apply to Kinesis supervisors? That line can be
removed once the gap is addressed in a future patch, depending on when it lands.
--
This is an automated message from the Apache Git Service.