This is an automated email from the ASF dual-hosted git repository.
aho135 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new cf275580199 feat: resetOffsetsAndBackfill using bounded stream supervisor (#19477)
cf275580199 is described below
commit cf275580199e93485062a7b1d7e1dbb4e2c6e2bf
Author: aho135 <[email protected]>
AuthorDate: Fri May 29 18:06:27 2026 -0700
feat: resetOffsetsAndBackfill using bounded stream supervisor (#19477)
* resetOffsetsAndBackfill using bounded stream supervisor
* Reject non-positive backfillTaskCount
* Reset supervisor after backfill Supervisor has already been started
* Add helper method specHasConcurrentLocks
* Fix doc reference
* Move validations into helper function
* Add embedded-test for resetSupervisorAndBackfill
* Remove flaky waitUntilPublishedRecordsAreIngested
* Update KafkaBoundedSupervisorTest.java
* Wait for supervisor to be RUNNING
* Use checkpointed offset if > requested reset offset to prevent duplicate ingestion
* Update KafkaBoundedSupervisorTest.java
* Revert "Use checkpointed offset if > requested reset offset to prevent duplicate ingestion"
resetOffsetsForwardOnly does not fully close the race it targets (the write is
still unconditional) and the duplicate scenario it addresses is narrower than
the overlap case, which cannot be solved without suspending the main supervisor.
Accepting the limitation and documenting it is preferable to the added complexity.
This reverts commit 89b5fec25e3a7dc88441d6e995564723557f2312.
* Doc update - duplication notice and Kinesis callout
* Rename endpoint from resetOffsetsAndBackfill to resetToLatestAndBackfill
* Update test name to reflect new endpoint
* Address clean up from review comments
* Log out start/end offsets
* Add abstract createBackfillSpec
* Unit test createBackfillSpec
* Fix deprecation notices
* Rename functions to align with new endpoint name
* Add null check and rename for consistency
---
docs/api-reference/supervisor-api.md | 103 ++++++++
.../indexing/KafkaBoundedSupervisorTest.java | 51 ++++
.../supervisor/RabbitStreamSupervisor.java | 6 +-
.../supervisor/RabbitStreamSupervisorSpec.java | 50 ++++
.../indexing/kafka/supervisor/KafkaSupervisor.java | 10 +-
.../kafka/supervisor/KafkaSupervisorSpec.java | 54 ++++
.../kafka/supervisor/KafkaSupervisorSpecTest.java | 33 +++
.../kinesis/supervisor/KinesisSupervisor.java | 4 +-
.../kinesis/supervisor/KinesisSupervisorSpec.java | 54 ++++
.../overlord/supervisor/SupervisorManager.java | 169 ++++++++++--
.../overlord/supervisor/SupervisorResource.java | 44 ++++
.../supervisor/SeekableStreamSupervisor.java | 10 +-
.../supervisor/SeekableStreamSupervisorSpec.java | 6 +
.../overlord/supervisor/SupervisorManagerTest.java | 290 +++++++++++++++++++++
.../supervisor/SupervisorResourceTest.java | 105 ++++++++
.../SeekableStreamSupervisorSpecTest.java | 10 +
.../SeekableStreamSupervisorStateTest.java | 8 +-
.../SeekableStreamSupervisorTestBase.java | 14 +-
.../indexing/MSQWorkerTaskLauncherRetryTest.java | 6 +
.../druid/rpc/indexing/NoopOverlordClient.java | 6 +
.../apache/druid/rpc/indexing/OverlordClient.java | 9 +
.../druid/rpc/indexing/OverlordClientImpl.java | 17 ++
.../testing/embedded/EmbeddedClusterApis.java | 10 +
23 files changed, 1021 insertions(+), 48 deletions(-)
diff --git a/docs/api-reference/supervisor-api.md
b/docs/api-reference/supervisor-api.md
index d321af14302..8f9c5c36dc5 100644
--- a/docs/api-reference/supervisor-api.md
+++ b/docs/api-reference/supervisor-api.md
@@ -3539,6 +3539,109 @@ when the supervisor's tasks restart, they resume
reading from `{"0": 100, "1": 1
```
</details>
+### Reset offsets to latest and start a backfill supervisor
+
+This endpoint is supported for Apache Kafka and RabbitMQ Stream supervisors. Amazon Kinesis is not supported yet.
+
+Resets the supervisor to the latest available stream offsets and starts a new bounded backfill supervisor to ingest the data in the skipped range.
+
+This endpoint is useful when a supervisor has fallen behind and you want to catch it up to the latest offsets without losing the skipped data. The main supervisor resumes ingesting from the latest offsets, while the backfill supervisor processes the range from the previously checkpointed offsets up to the latest offsets at the time of the reset.
+
+**Duplicate ingestion notice:** The main supervisor is not quiesced before the reset. This means duplicate data can occur in two ways:
+- **Backfill overlap:** Any tasks that were in-flight at the time of the reset may publish segments covering part of the backfill range before being shut down.
+- **Reset race:** If a task checkpoint is written to the metadata store between when this endpoint captures the current offsets and when it applies the reset, that checkpoint can be overwritten, causing the main supervisor to re-ingest already-processed data.
+
+Both windows are narrow in practice, but cannot be fully eliminated without manually suspending the main supervisor before calling this endpoint and waiting for all pending tasks to complete.
+
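The reset race described in the notice above is a check-then-write problem. The following is a minimal Java sketch of that interleaving, not Druid code: the `AtomicLong` stands in for one partition's checkpointed offset in the metadata store, and the class name, method names, and offsets are all hypothetical. It contrasts an unconditional write, which can overwrite a checkpoint that lands between the read and the write, with an atomic update that only moves the stored offset forward.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for one partition's checkpointed offset; not Druid's metadata store.
final class ResetRaceSketch
{
  // Reads the stored offset, then writes the reset offset unconditionally.
  // The Runnable simulates a task checkpoint arriving between the read and the write.
  static long resetUnconditional(AtomicLong stored, long resetOffset, Runnable concurrentCheckpoint)
  {
    long captured = stored.get();   // offset captured as the start of the backfill range
    concurrentCheckpoint.run();     // a task checkpoints here
    stored.set(resetOffset);        // overwrites whatever the task just wrote
    return captured;
  }

  // Atomic alternative: the stored offset can only move forward.
  static long resetForwardOnly(AtomicLong stored, long resetOffset)
  {
    return stored.accumulateAndGet(resetOffset, Math::max);
  }

  public static void main(String[] args)
  {
    AtomicLong stored = new AtomicLong(100L);
    resetUnconditional(stored, 500L, () -> stored.set(700L));
    System.out.println(stored.get()); // prints 500: the checkpoint at 700 was overwritten

    AtomicLong safe = new AtomicLong(700L);
    System.out.println(resetForwardOnly(safe, 500L)); // prints 700
  }
}
```

An in-memory atomic is far simpler than a shared metadata store, so this only illustrates the shape of the problem. The commit message notes that the reverted forward-only attempt still wrote unconditionally, which is why it did not close this window.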
+The following requirements must be met before calling this endpoint:
+
+- The supervisor must be a [streaming supervisor](../ingestion/supervisor.md).
+- The supervisor's `useEarliestSequenceNumber` property must be `false`.
+- The supervisor context must have `useConcurrentLocks` set to `true` to allow the backfill supervisor's tasks to write concurrently with the main supervisor's tasks.
+- The supervisor must be in a `RUNNING` state.
+
+The backfill supervisor has the same configuration as the source supervisor except for its ID, which takes the form `{supervisorId}_backfill_{randomSuffix}`, and its `boundedStreamConfig`, which is set to the skipped offset range. If `backfillTaskCount` is specified, it overrides the `taskCount` for the backfill supervisor only.
+
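To make the "skipped offset range" concrete, here is a small Java sketch that computes how many records a backfill would cover per partition. It is illustrative only: `BackfillRangeSketch` and `skippedRecords` are hypothetical names, Druid's actual range computation is not shown here, and the sketch assumes end-exclusive offsets (as with Kafka) and simply omits partitions that have no checkpoint or no gap. The sample offsets mirror those used in the `testCreateBackfillSpec` unit test in this commit.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper; not part of Druid.
final class BackfillRangeSketch
{
  // Returns, per partition, how many records lie in [checkpointed, latest).
  // Assumption: offsets are end-exclusive; partitions with no checkpoint or no gap are omitted.
  static Map<String, Long> skippedRecords(Map<String, Long> checkpointed, Map<String, Long> latest)
  {
    Map<String, Long> skipped = new TreeMap<>();
    for (Map.Entry<String, Long> end : latest.entrySet()) {
      Long start = checkpointed.get(end.getKey());
      if (start != null && end.getValue() > start) {
        skipped.put(end.getKey(), end.getValue() - start);
      }
    }
    return skipped;
  }

  public static void main(String[] args)
  {
    Map<String, Long> checkpointed = Map.of("0", 100L, "1", 200L);
    Map<String, Long> latest = Map.of("0", 500L, "1", 600L);
    System.out.println(skippedRecords(checkpointed, latest)); // prints {0=400, 1=400}
  }
}
```

A rough per-partition count like this can help when choosing `backfillTaskCount`, since a large gap may justify more parallel tasks than the source supervisor uses.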
+#### URL
+
+`POST` `/druid/indexer/v1/supervisor/{supervisorId}/resetToLatestAndBackfill`
+
+#### Query parameters
+
+| Parameter | Type | Description | Default |
+|---------|---------|---------|---------|
+| `backfillTaskCount` | Integer | Number of parallel tasks for the backfill supervisor. | Defaults to `taskCount` from the source supervisor if not specified |
+
+#### Responses
+
+<Tabs>
+
+<TabItem value="5" label="200 SUCCESS">
+
+
+*Successfully reset and started backfill supervisor*
+
+</TabItem>
+<TabItem value="6" label="400 BAD REQUEST">
+
+
+*Supervisor does not meet requirements (wrong type, `useEarliestSequenceNumber` is true, `useConcurrentLocks` not enabled, or supervisor not RUNNING)*
+
+</TabItem>
+<TabItem value="7" label="404 NOT FOUND">
+
+
+*Invalid supervisor ID*
+
+</TabItem>
+<TabItem value="8" label="500 SERVER ERROR">
+
+
+*Failed to retrieve stream offsets or serialize the backfill spec*
+
+</TabItem>
+</Tabs>
+
+---
+
+#### Sample request
+
+The following example resets a supervisor named `social_media` and starts a backfill supervisor with 2 tasks.
+
+<Tabs>
+
+<TabItem value="9" label="cURL">
+
+
+```shell
+curl --request POST "http://ROUTER_IP:ROUTER_PORT/druid/indexer/v1/supervisor/social_media/resetToLatestAndBackfill?backfillTaskCount=2"
+```
+
+</TabItem>
+<TabItem value="10" label="HTTP">
+
+
+```HTTP
+POST /druid/indexer/v1/supervisor/social_media/resetToLatestAndBackfill?backfillTaskCount=2 HTTP/1.1
+Host: http://ROUTER_IP:ROUTER_PORT
+```
+
+</TabItem>
+</Tabs>
+
+#### Sample response
+
+<details>
+ <summary>View the response</summary>
+
+ ```json
+{
+ "id": "social_media",
+ "backfillSupervisorId": "social_media_backfill_abcdefgh"
+}
+ ```
+</details>
+
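For callers that prefer Java to cURL, the same request can be built with the JDK's `java.net.http` client (Java 11 or later). This is a sketch under stated assumptions rather than an official client: the class and method names are hypothetical, `http://localhost:8888` is only a placeholder for `ROUTER_IP:ROUTER_PORT`, and the supervisor ID is assumed to be URL-safe so no path encoding is applied.

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Hypothetical helper; only the endpoint path and query parameter come from the docs above.
final class ResetToLatestAndBackfillRequestSketch
{
  // routerBase is e.g. "http://ROUTER_IP:ROUTER_PORT"; supervisorId is assumed to be URL-safe.
  // Pass null for backfillTaskCount to inherit taskCount from the source supervisor.
  static HttpRequest build(String routerBase, String supervisorId, Integer backfillTaskCount)
  {
    String url = routerBase + "/druid/indexer/v1/supervisor/" + supervisorId + "/resetToLatestAndBackfill";
    if (backfillTaskCount != null) {
      url += "?backfillTaskCount=" + backfillTaskCount;
    }
    return HttpRequest.newBuilder(URI.create(url))
                      .POST(HttpRequest.BodyPublishers.noBody())
                      .build();
  }

  public static void main(String[] args)
  {
    HttpRequest request = build("http://localhost:8888", "social_media", 2);
    System.out.println(request.method() + " " + request.uri());
  }
}
```

To send it, pass the request to `HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString())` and read `backfillSupervisorId` from the JSON body of a 200 response.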
### Terminate a supervisor
Terminates a supervisor and its associated indexing tasks, triggering the
publishing of their segments. When you terminate a supervisor, Druid places a
tombstone marker in the metadata store to prevent reloading on restart.
diff --git
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/KafkaBoundedSupervisorTest.java
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/KafkaBoundedSupervisorTest.java
index 7e22d85d9ca..fa184418df5 100644
---
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/KafkaBoundedSupervisorTest.java
+++
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/KafkaBoundedSupervisorTest.java
@@ -292,6 +292,48 @@ public class KafkaBoundedSupervisorTest extends
StreamIndexTestBase
Assertions.assertEquals("UNHEALTHY_SUPERVISOR", status2.getState(),
"Supervisor state should be UNHEALTHY_SUPERVISOR");
}
+ @Test
+ public void test_resetToLatestAndBackfill()
+ {
+ final String topic = IdUtils.getRandomId();
+ kafkaServer.createTopicWithPartitions(topic, 2);
+
+ // Create a streaming supervisor with concurrent locks and withUseEarliestSequenceNumber=false
+ final KafkaSupervisorSpec supervisor = createKafkaSupervisor(kafkaServer)
+ .withContext(Map.of("useConcurrentLocks", true))
+ .withIoConfig(io -> io
+ .withKafkaInputFormat(new JsonInputFormat(null, null, null, null,
null))
+ .withUseEarliestSequenceNumber(false)
+ )
+ .build(dataSource, topic);
+
+ cluster.callApi().postSupervisor(supervisor);
+
+ waitForSupervisorDetailedState(supervisor.getId(), "RUNNING");
+
+ final int totalRecords = publish1kRecords(topic, false);
+ waitUntilPublishedRecordsAreIngested(totalRecords);
+
+ // Reset the main supervisor and spin up a backfill supervisor.
+ // Since all records are already ingested before the call, the backfill
+ // supervisor will complete immediately without ingesting anything.
+ final Map<String, Object> result =
cluster.callApi().resetToLatestAndBackfill(supervisor.getId());
+ Assertions.assertEquals(supervisor.getId(), result.get("id"));
+ final String backfillSupervisorId = (String)
result.get("backfillSupervisorId");
+
+ // Wait for the backfill to finish
+ waitForSupervisorToComplete(backfillSupervisorId);
+
+ // Main supervisor should still be running
+ final SupervisorStatus mainStatus =
cluster.callApi().getSupervisorStatus(supervisor.getId());
+ Assertions.assertEquals("RUNNING", mainStatus.getState());
+ Assertions.assertTrue(mainStatus.isHealthy());
+
+ final SupervisorStatus backfillStatus =
cluster.callApi().getSupervisorStatus(backfillSupervisorId);
+ Assertions.assertEquals("COMPLETED", backfillStatus.getState());
+ Assertions.assertTrue(backfillStatus.isHealthy());
+ }
+
private void waitForSupervisorToComplete(String supervisorId)
{
overlord.latchableEmitter().waitForEvent(
@@ -301,6 +343,15 @@ public class KafkaBoundedSupervisorTest extends
StreamIndexTestBase
);
}
+ private void waitForSupervisorDetailedState(String supervisorId, String
detailedState)
+ {
+ overlord.latchableEmitter().waitForEvent(
+ event -> event.hasMetricName("supervisor/count")
+ .hasDimension(DruidMetrics.SUPERVISOR_ID, supervisorId)
+ .hasDimension("detailedState", detailedState)
+ );
+ }
+
private void waitForSupervisorToBeUnhealthy(String supervisorId)
{
overlord.latchableEmitter().waitForEvent(
diff --git
a/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisor.java
b/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisor.java
index 6099105b337..04973a5272f 100644
---
a/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisor.java
+++
b/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisor.java
@@ -322,7 +322,7 @@ public class RabbitStreamSupervisor extends
SeekableStreamSupervisor<String, Lon
}
@Override
- protected RabbitStreamDataSourceMetadata
createDataSourceMetaDataForReset(String topic, Map<String, Long> map)
+ public RabbitStreamDataSourceMetadata
createDataSourceMetaDataForReset(String topic, Map<String, Long> map)
{
return new RabbitStreamDataSourceMetadata(new
SeekableStreamEndSequenceNumbers<>(topic, map));
}
@@ -408,7 +408,7 @@ public class RabbitStreamSupervisor extends
SeekableStreamSupervisor<String, Lon
}
@Override
- protected void updatePartitionLagFromStream()
+ public void updatePartitionLagFromStream()
{
getRecordSupplierLock().lock();
@@ -435,7 +435,7 @@ public class RabbitStreamSupervisor extends
SeekableStreamSupervisor<String, Lon
}
@Override
- protected Map<String, Long> getLatestSequencesFromStream()
+ public Map<String, Long> getLatestSequencesFromStream()
{
return latestSequenceFromStream != null ? latestSequenceFromStream : new
HashMap<>();
}
diff --git
a/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisorSpec.java
b/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisorSpec.java
index 4a445f6f1c1..4763a949a61 100644
---
a/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisorSpec.java
+++
b/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisorSpec.java
@@ -30,6 +30,7 @@ import org.apache.druid.indexing.overlord.TaskStorage;
import org.apache.druid.indexing.overlord.supervisor.Supervisor;
import
org.apache.druid.indexing.overlord.supervisor.SupervisorStateManagerConfig;
import
org.apache.druid.indexing.rabbitstream.RabbitStreamIndexTaskClientFactory;
+import org.apache.druid.indexing.seekablestream.supervisor.BoundedStreamConfig;
import
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorSpec;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.metrics.DruidMonitorSchedulerConfig;
@@ -155,6 +156,55 @@ public class RabbitStreamSupervisorSpec extends
SeekableStreamSupervisorSpec
supervisorStateManagerConfig);
}
+ @Override
+ public RabbitStreamSupervisorSpec createBackfillSpec(
+ String backfillId,
+ BoundedStreamConfig boundedStreamConfig,
+ @Nullable Integer taskCount
+ )
+ {
+ RabbitStreamSupervisorIOConfig ioConfig = getSpec().getIOConfig();
+ RabbitStreamSupervisorIOConfig backfillIoConfig = new
RabbitStreamSupervisorIOConfig(
+ ioConfig.getStream(),
+ ioConfig.getUri(),
+ ioConfig.getInputFormat(),
+ ioConfig.getReplicas(),
+ taskCount != null ? taskCount : ioConfig.getTaskCount(),
+ ioConfig.getTaskDuration().toPeriod(),
+ ioConfig.getConsumerProperties(),
+ ioConfig.getAutoScalerConfig(),
+ ioConfig.getPollTimeout(),
+ ioConfig.getStartDelay().toPeriod(),
+ ioConfig.getPeriod().toPeriod(),
+ ioConfig.getCompletionTimeout().toPeriod(),
+ ioConfig.isUseEarliestSequenceNumber(),
+ ioConfig.getLateMessageRejectionPeriod().isPresent() ?
ioConfig.getLateMessageRejectionPeriod().get().toPeriod() : null,
+ ioConfig.getEarlyMessageRejectionPeriod().isPresent() ?
ioConfig.getEarlyMessageRejectionPeriod().get().toPeriod() : null,
+ ioConfig.getLateMessageRejectionStartDateTime().isPresent() ?
ioConfig.getLateMessageRejectionStartDateTime().get() : null,
+ ioConfig.getStopTaskCount(),
+ ioConfig.getServerPriorityToReplicas(),
+ boundedStreamConfig
+ );
+ return new RabbitStreamSupervisorSpec(
+ backfillId,
+ null,
+ getSpec().getDataSchema(),
+ getSpec().getTuningConfig(),
+ backfillIoConfig,
+ getContext(),
+ isSuspended(),
+ taskStorage,
+ taskMaster,
+ indexerMetadataStorageCoordinator,
+ (RabbitStreamIndexTaskClientFactory) indexTaskClientFactory,
+ mapper,
+ emitter,
+ monitorSchedulerConfig,
+ rowIngestionMetersFactory,
+ supervisorStateManagerConfig
+ );
+ }
+
@Override
public String toString()
{
diff --git
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
index 727eb52db27..5863284cc2d 100644
---
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
+++
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
@@ -356,7 +356,7 @@ public class KafkaSupervisor extends
SeekableStreamSupervisor<KafkaTopicPartitio
}
@Override
- protected KafkaDataSourceMetadata createDataSourceMetaDataForReset(String
topic, Map<KafkaTopicPartition, Long> map)
+ public KafkaDataSourceMetadata createDataSourceMetaDataForReset(String
topic, Map<KafkaTopicPartition, Long> map)
{
return new KafkaDataSourceMetadata(new
SeekableStreamEndSequenceNumbers<>(topic, map));
}
@@ -548,7 +548,7 @@ public class KafkaSupervisor extends
SeekableStreamSupervisor<KafkaTopicPartitio
* </p>
*/
@Override
- protected void updatePartitionLagFromStream()
+ public void updatePartitionLagFromStream()
{
if (getIoConfig().isEmitTimeLagMetrics()) {
updatePartitionTimeAndRecordLagFromStream();
@@ -597,7 +597,7 @@ public class KafkaSupervisor extends
SeekableStreamSupervisor<KafkaTopicPartitio
}
@Override
- protected Map<KafkaTopicPartition, Long> getLatestSequencesFromStream()
+ public Map<KafkaTopicPartition, Long> getLatestSequencesFromStream()
{
return offsetSnapshotRef.get().getLatestOffsetsFromStream();
}
@@ -630,7 +630,7 @@ public class KafkaSupervisor extends
SeekableStreamSupervisor<KafkaTopicPartitio
* Gets the offsets as stored in the metadata store. The map returned will
only contain
* offsets from topic partitions that match the current supervisor config
stream. This
* override is needed because in the case of multi-topic, a user could have
updated the supervisor
- * config from single topic to mult-topic, where the new multi-topic pattern
regex matches the
+ * config from single topic to multi-topic, where the new multi-topic
pattern regex matches the
* old config single topic. Without this override, the previously stored
metadata for the single
* topic would be deemed as different from the currently configure stream,
and not be included in
* the offset map returned. This implementation handles these cases
appropriately.
@@ -640,7 +640,7 @@ public class KafkaSupervisor extends
SeekableStreamSupervisor<KafkaTopicPartitio
* updated to single topic or multi-topic depending on the supervisor
config, as needed.
*/
@Override
- protected Map<KafkaTopicPartition, Long> getOffsetsFromMetadataStorage()
+ public Map<KafkaTopicPartition, Long> getOffsetsFromMetadataStorage()
{
final DataSourceMetadata dataSourceMetadata = retrieveDataSourceMetadata();
if (checkSourceMetadataMatch(dataSourceMetadata)) {
diff --git
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java
index b607ade1acf..31d3e8fad69 100644
---
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java
+++
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java
@@ -36,6 +36,7 @@ import org.apache.druid.indexing.overlord.TaskStorage;
import org.apache.druid.indexing.overlord.supervisor.Supervisor;
import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
import
org.apache.druid.indexing.overlord.supervisor.SupervisorStateManagerConfig;
+import org.apache.druid.indexing.seekablestream.supervisor.BoundedStreamConfig;
import
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorSpec;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
@@ -173,6 +174,59 @@ public class KafkaSupervisorSpec extends
SeekableStreamSupervisorSpec
);
}
+ @Override
+ public KafkaSupervisorSpec createBackfillSpec(
+ String backfillId,
+ BoundedStreamConfig boundedStreamConfig,
+ @Nullable Integer taskCount
+ )
+ {
+ KafkaSupervisorIOConfig ioConfig = getSpec().getIOConfig();
+ KafkaSupervisorIOConfig backfillIoConfig = new KafkaSupervisorIOConfig(
+ ioConfig.getTopic(),
+ ioConfig.getTopicPattern(),
+ ioConfig.getInputFormat(),
+ ioConfig.getReplicas(),
+ taskCount != null ? taskCount : ioConfig.getTaskCount(),
+ ioConfig.getTaskDuration().toPeriod(),
+ ioConfig.getConsumerProperties(),
+ ioConfig.getAutoScalerConfig(),
+ ioConfig.getLagAggregator(),
+ ioConfig.getPollTimeout(),
+ ioConfig.getStartDelay().toPeriod(),
+ ioConfig.getPeriod().toPeriod(),
+ ioConfig.isUseEarliestSequenceNumber(),
+ ioConfig.getCompletionTimeout().toPeriod(),
+ ioConfig.getLateMessageRejectionPeriod().isPresent() ?
ioConfig.getLateMessageRejectionPeriod().get().toPeriod() : null,
+ ioConfig.getEarlyMessageRejectionPeriod().isPresent() ?
ioConfig.getEarlyMessageRejectionPeriod().get().toPeriod() : null,
+ ioConfig.getLateMessageRejectionStartDateTime().isPresent() ?
ioConfig.getLateMessageRejectionStartDateTime().get() : null,
+ ioConfig.getConfigOverrides(),
+ ioConfig.getIdleConfig(),
+ ioConfig.getStopTaskCount(),
+ ioConfig.isEmitTimeLagMetrics(),
+ ioConfig.getServerPriorityToReplicas(),
+ boundedStreamConfig
+ );
+ return new KafkaSupervisorSpec(
+ backfillId,
+ null,
+ getSpec().getDataSchema(),
+ getSpec().getTuningConfig(),
+ backfillIoConfig,
+ getContext(),
+ isSuspended(),
+ taskStorage,
+ taskMaster,
+ indexerMetadataStorageCoordinator,
+ (KafkaIndexTaskClientFactory) indexTaskClientFactory,
+ mapper,
+ emitter,
+ monitorSchedulerConfig,
+ rowIngestionMetersFactory,
+ supervisorStateManagerConfig
+ );
+ }
+
/**
* Extends {@link SeekableStreamSupervisorSpec#validateSpecUpdateTo} to
ensure that the proposed spec and current spec are either both multi-topic or
both single-topic.
* <p>
diff --git
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpecTest.java
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpecTest.java
index 8879ff6d975..06ca9b64ced 100644
---
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpecTest.java
+++
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpecTest.java
@@ -32,6 +32,7 @@ import org.apache.druid.indexing.overlord.TaskMaster;
import org.apache.druid.indexing.overlord.TaskStorage;
import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
import
org.apache.druid.indexing.overlord.supervisor.SupervisorStateManagerConfig;
+import org.apache.druid.indexing.seekablestream.supervisor.BoundedStreamConfig;
import org.apache.druid.indexing.seekablestream.supervisor.LagAggregator;
import
org.apache.druid.indexing.seekablestream.supervisor.autoscaler.CostBasedAutoScalerConfig;
import org.apache.druid.jackson.DefaultObjectMapper;
@@ -564,6 +565,38 @@ public class KafkaSupervisorSpecTest
sourceSpec.validateSpecUpdateTo(validDestSpec);
}
+ @Test
+ public void testCreateBackfillSpec()
+ {
+ KafkaSupervisorSpec spec = new KafkaSupervisorSpecBuilder()
+ .withDataSchema(
+ schema -> schema
+ .withTimestamp(TimestampSpec.DEFAULT)
+ .withAggregators(new CountAggregatorFactory("rows"))
+ .withGranularity(new
UniformGranularitySpec(Granularities.HOUR, Granularities.NONE, null))
+ )
+ .withIoConfig(
+ ioConfig -> ioConfig
+ .withJsonInputFormat()
+ .withConsumerProperties(Map.of("bootstrap.servers",
"localhost:9092"))
+ .withTaskCount(3)
+ )
+ .build("testDs", "metrics");
+
+ BoundedStreamConfig boundedStreamConfig = new BoundedStreamConfig(
+ Map.of("0", 100L, "1", 200L),
+ Map.of("0", 500L, "1", 600L)
+ );
+
+ KafkaSupervisorSpec backfill = (KafkaSupervisorSpec)
spec.createBackfillSpec("backfill-id", boundedStreamConfig, 2);
+
+ Assert.assertEquals("backfill-id", backfill.getId());
+ Assert.assertEquals("testDs",
backfill.getSpec().getDataSchema().getDataSource());
+ Assert.assertEquals("metrics",
backfill.getSpec().getIOConfig().getTopic());
+ Assert.assertEquals(2, backfill.getSpec().getIOConfig().getTaskCount());
+ Assert.assertEquals(boundedStreamConfig,
backfill.getSpec().getIOConfig().getBoundedStreamConfig());
+ }
+
private KafkaSupervisorSpec getSpec(String topic, String topicPattern)
{
KafkaSupervisorSpecBuilder builder = new KafkaSupervisorSpecBuilder()
diff --git
a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java
b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java
index 0f91fc0965d..3f1f4034f3c 100644
---
a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java
+++
b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java
@@ -321,7 +321,7 @@ public class KinesisSupervisor extends
SeekableStreamSupervisor<String, String,
}
@Override
- protected SeekableStreamDataSourceMetadata<String, String>
createDataSourceMetaDataForReset(
+ public SeekableStreamDataSourceMetadata<String, String>
createDataSourceMetaDataForReset(
String stream,
Map<String, String> map
)
@@ -336,7 +336,7 @@ public class KinesisSupervisor extends
SeekableStreamSupervisor<String, String,
}
@Override
- protected void updatePartitionLagFromStream()
+ public void updatePartitionLagFromStream()
{
KinesisRecordSupplier supplier = (KinesisRecordSupplier) recordSupplier;
// this recordSupplier method is thread safe, so does not need to acquire the recordSupplierLock
diff --git
a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorSpec.java
b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorSpec.java
index 8e661571680..4899337797b 100644
---
a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorSpec.java
+++
b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorSpec.java
@@ -35,6 +35,7 @@ import org.apache.druid.indexing.overlord.TaskMaster;
import org.apache.druid.indexing.overlord.TaskStorage;
import org.apache.druid.indexing.overlord.supervisor.Supervisor;
import
org.apache.druid.indexing.overlord.supervisor.SupervisorStateManagerConfig;
+import org.apache.druid.indexing.seekablestream.supervisor.BoundedStreamConfig;
import
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorSpec;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.metrics.DruidMonitorSchedulerConfig;
@@ -193,4 +194,57 @@ public class KinesisSupervisorSpec extends
SeekableStreamSupervisorSpec
supervisorStateManagerConfig
);
}
+
+ @Override
+ public KinesisSupervisorSpec createBackfillSpec(
+ String backfillId,
+ BoundedStreamConfig boundedStreamConfig,
+ @Nullable Integer taskCount
+ )
+ {
+ KinesisSupervisorIOConfig ioConfig = getSpec().getIOConfig();
+ KinesisSupervisorIOConfig backfillIoConfig = new KinesisSupervisorIOConfig(
+ ioConfig.getStream(),
+ ioConfig.getInputFormat(),
+ ioConfig.getEndpoint(),
+ null,
+ ioConfig.getReplicas(),
+ taskCount != null ? taskCount : ioConfig.getTaskCount(),
+ ioConfig.getTaskDuration().toPeriod(),
+ ioConfig.getStartDelay().toPeriod(),
+ ioConfig.getPeriod().toPeriod(),
+ ioConfig.isUseEarliestSequenceNumber(),
+ ioConfig.getCompletionTimeout().toPeriod(),
+ ioConfig.getLateMessageRejectionPeriod().isPresent() ?
ioConfig.getLateMessageRejectionPeriod().get().toPeriod() : null,
+ ioConfig.getEarlyMessageRejectionPeriod().isPresent() ?
ioConfig.getEarlyMessageRejectionPeriod().get().toPeriod() : null,
+ ioConfig.getLateMessageRejectionStartDateTime().isPresent() ?
ioConfig.getLateMessageRejectionStartDateTime().get() : null,
+ ioConfig.getRecordsPerFetch(),
+ ioConfig.getFetchDelayMillis(),
+ ioConfig.getAwsAssumedRoleArn(),
+ ioConfig.getAwsExternalId(),
+ ioConfig.getAutoScalerConfig(),
+ ioConfig.isDeaggregate(),
+ ioConfig.getServerPriorityToReplicas(),
+ boundedStreamConfig
+ );
+ return new KinesisSupervisorSpec(
+ backfillId,
+ null,
+ getSpec().getDataSchema(),
+ getSpec().getTuningConfig(),
+ backfillIoConfig,
+ getContext(),
+ isSuspended(),
+ taskStorage,
+ taskMaster,
+ indexerMetadataStorageCoordinator,
+ (KinesisIndexTaskClientFactory) indexTaskClientFactory,
+ mapper,
+ emitter,
+ monitorSchedulerConfig,
+ rowIngestionMetersFactory,
+ awsCredentialsConfig,
+ supervisorStateManagerConfig
+ );
+ }
}
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java
index 52f3cba7fc1..fa7d96634ae 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java
@@ -23,9 +23,11 @@ import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.inject.Inject;
import org.apache.druid.common.guava.FutureUtils;
+import org.apache.druid.common.utils.IdUtils;
import org.apache.druid.error.DruidException;
import org.apache.druid.error.InvalidInput;
import org.apache.druid.error.NotFound;
@@ -35,8 +37,11 @@ import org.apache.druid.indexing.common.task.Tasks;
import org.apache.druid.indexing.overlord.DataSourceMetadata;
import
org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler;
import
org.apache.druid.indexing.seekablestream.SeekableStreamDataSourceMetadata;
+import org.apache.druid.indexing.seekablestream.supervisor.BoundedStreamConfig;
import
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor;
import
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorSpec;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
@@ -129,33 +134,8 @@ public class SupervisorManager implements
SupervisorStatsProvider
final Supervisor supervisor = entry.getValue().lhs;
final SupervisorSpec supervisorSpec = entry.getValue().rhs;
- boolean hasAppendLock = Tasks.DEFAULT_USE_CONCURRENT_LOCKS;
- if (supervisorSpec instanceof SeekableStreamSupervisorSpec) {
- SeekableStreamSupervisorSpec seekableStreamSupervisorSpec =
(SeekableStreamSupervisorSpec) supervisorSpec;
- Map<String, Object> context =
seekableStreamSupervisorSpec.getContext();
- if (context != null) {
- Boolean useConcurrentLocks = QueryContexts.getAsBoolean(
- Tasks.USE_CONCURRENT_LOCKS,
- context.get(Tasks.USE_CONCURRENT_LOCKS)
- );
- if (useConcurrentLocks == null) {
- TaskLockType taskLockType = QueryContexts.getAsEnum(
- Tasks.TASK_LOCK_TYPE,
- context.get(Tasks.TASK_LOCK_TYPE),
- TaskLockType.class
- );
- if (taskLockType == null) {
- hasAppendLock = Tasks.DEFAULT_USE_CONCURRENT_LOCKS;
- } else if (taskLockType == TaskLockType.APPEND) {
- hasAppendLock = true;
- } else {
- hasAppendLock = false;
- }
- } else {
- hasAppendLock = useConcurrentLocks;
- }
- }
- }
+ boolean hasAppendLock = supervisorSpec instanceof
SeekableStreamSupervisorSpec
+ &&
specHasConcurrentLocks((SeekableStreamSupervisorSpec) supervisorSpec);
if (supervisor instanceof SeekableStreamSupervisor
&& !supervisorSpec.isSuspended()
@@ -393,6 +373,116 @@ public class SupervisorManager implements SupervisorStatsProvider
return true;
}
+ /**
+ * Resets a supervisor to the latest stream offsets and starts a bounded backfill supervisor to
+ * process the skipped range from the previously checkpointed offsets up to the latest offsets.
+ *
+ * @param id supervisor ID
+ * @param backfillTaskCount number of tasks for the backfill supervisor, or null to inherit from the source spec
+ * @return map with {@code "id"} (the original supervisor ID) and {@code "backfillSupervisorId"}
+ * @throws IllegalArgumentException if the supervisor is not a {@link SeekableStreamSupervisor},
+ * if {@code useEarliestSequenceNumber} is true,
+ * if {@code useConcurrentLocks} is not set to true in the supervisor context,
+ * or if the supervisor is not in a RUNNING state
+ * @throws IllegalStateException if the latest or checkpointed offsets cannot be retrieved,
+ * or if the backfill spec cannot be serialized
+ */
+ public Map<String, Object> resetToLatestAndBackfill(String id, @Nullable Integer backfillTaskCount)
+ {
+ Preconditions.checkState(started, "SupervisorManager not started");
+ Preconditions.checkNotNull(id, "id");
+
+ Pair<Supervisor, SupervisorSpec> supervisor = supervisors.get(id);
+
+ if (supervisor == null) {
+ throw new IAE("Supervisor[%s] does not exist", id);
+ }
+
+ if (!(supervisor.lhs instanceof SeekableStreamSupervisor)) {
+ throw new IAE("Supervisor[%s] is not a streaming supervisor", id);
+ }
+
+ SeekableStreamSupervisor streamSupervisor = (SeekableStreamSupervisor) supervisor.lhs;
+ SeekableStreamSupervisorSpec streamSpec = (SeekableStreamSupervisorSpec) supervisor.rhs;
+
+ validateResetAndBackfill(id, streamSupervisor, streamSpec);
+
+ log.info("Capturing latest offsets from stream for supervisor[%s]", id);
+ streamSupervisor.updatePartitionLagFromStream();
+ Map<?, ?> endOffsets = streamSupervisor.getLatestSequencesFromStream();
+
+ log.info("Capturing checkpointed offsets for supervisor[%s]", id);
+ Map<?, ?> startOffsets = streamSupervisor.getOffsetsFromMetadataStorage();
+
+ if (endOffsets == null || endOffsets.isEmpty()) {
+ throw new ISE("Skipping reset: Failed to get latest offsets from stream for supervisor[%s]", id);
+ }
+ if (startOffsets == null || startOffsets.isEmpty()) {
+ throw new ISE("Skipping reset: Failed to get checkpointed offsets for supervisor[%s]", id);
+ }
+
+ String backfillSupervisorId = IdUtils.getRandomIdWithPrefix(id + "_backfill");
+
+ try {
+ Map<String, Object> normalizedStartOffsets = jsonMapper.readValue(jsonMapper.writeValueAsString(startOffsets), Map.class);
+ Map<String, Object> normalizedEndOffsets = jsonMapper.readValue(jsonMapper.writeValueAsString(endOffsets), Map.class);
+ BoundedStreamConfig boundedStreamConfig = new BoundedStreamConfig(normalizedStartOffsets, normalizedEndOffsets);
+ SupervisorSpec backfillSpec = streamSpec.createBackfillSpec(backfillSupervisorId, boundedStreamConfig, backfillTaskCount);
+ createOrUpdateAndStartSupervisor(backfillSpec);
+ }
+ catch (JsonProcessingException e) {
+ throw new ISE(e, "Failed to serialize offsets for backfill supervisor[%s]", backfillSupervisorId);
+ }
+
+ log.info(
+ "Started backfill supervisor[%s] for supervisor[%s] with startOffsets[%s] and endOffsets[%s]",
+ backfillSupervisorId,
+ id,
+ startOffsets,
+ endOffsets
+ );
+
+ log.info("Resetting supervisor[%s] metadata to latest offsets", id);
+ DataSourceMetadata resetMetadata = streamSupervisor.createDataSourceMetaDataForReset(
+ streamSupervisor.getIoConfig().getStream(),
+ endOffsets
+ );
+
+ streamSupervisor.resetOffsets(resetMetadata);
+
+ // Reset autoscaler if present
+ SupervisorTaskAutoScaler autoscaler = autoscalers.get(id);
+ if (autoscaler != null) {
+ autoscaler.reset();
+ }
+
+ return ImmutableMap.of(
+ "id", id,
+ "backfillSupervisorId", backfillSupervisorId
+ );
+ }
+
+ private void validateResetAndBackfill(
+ String id,
+ SeekableStreamSupervisor streamSupervisor,
+ SeekableStreamSupervisorSpec streamSpec
+ )
+ {
+ if (streamSupervisor.getIoConfig().isUseEarliestSequenceNumber()) {
+ throw new IAE("Reset with skipped offsets is not supported when useEarliestOffset is true.");
+ }
+
+ if (!specHasConcurrentLocks(streamSpec)) {
+ throw new IAE(
+ "Backfill tasks require 'useConcurrentLocks' to be set to true in the supervisor context to allow concurrent writes with the main supervisor tasks"
+ );
+ }
+
+ if (streamSupervisor.getState() != SupervisorStateManager.BasicState.RUNNING) {
+ throw new IAE("Supervisor[%s] must be in a RUNNING state to perform a reset and backfill", id);
+ }
+ }
+
public boolean checkPointDataSourceMetadata(
String supervisorId,
int taskGroupId,
@@ -631,4 +721,29 @@ public class SupervisorManager implements SupervisorStatsProvider
return supervisor == null ? null : supervisor.rhs;
}
}
+
+ /**
+ * Returns true if the spec's context enables concurrent (append) locks, accepting both
+ * {@code useConcurrentLocks: true} (or any truthy string) and {@code taskLockType: APPEND}.
+ */
+ private static boolean specHasConcurrentLocks(SeekableStreamSupervisorSpec spec)
+ {
+ Map<String, Object> context = spec.getContext();
+ if (context == null) {
+ return Tasks.DEFAULT_USE_CONCURRENT_LOCKS;
+ }
+ Boolean useConcurrentLocks = QueryContexts.getAsBoolean(
+ Tasks.USE_CONCURRENT_LOCKS,
+ context.get(Tasks.USE_CONCURRENT_LOCKS)
+ );
+ if (useConcurrentLocks != null) {
+ return useConcurrentLocks;
+ }
+ TaskLockType taskLockType = QueryContexts.getAsEnum(
+ Tasks.TASK_LOCK_TYPE,
+ context.get(Tasks.TASK_LOCK_TYPE),
+ TaskLockType.class
+ );
+ return taskLockType == TaskLockType.APPEND;
+ }
}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java
index aff9edf19af..8d0e04eb798 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java
@@ -640,6 +640,50 @@ public class SupervisorResource
);
}
+ @POST
+ @Path("/{id}/resetToLatestAndBackfill")
+ @Produces(MediaType.APPLICATION_JSON)
+ @ResourceFilters(SupervisorResourceFilter.class)
+ public Response resetToLatestAndBackfill(
+ @PathParam("id") final String id,
+ @QueryParam("backfillTaskCount") @Nullable final Integer backfillTaskCount
+ )
+ {
+ return handleResetToLatestAndBackfill(id, backfillTaskCount);
+ }
+
+ private Response handleResetToLatestAndBackfill(final String id, @Nullable final Integer backfillTaskCount)
+ {
+ if (backfillTaskCount != null && backfillTaskCount < 1) {
+ return Response.status(Response.Status.BAD_REQUEST)
+ .entity(ImmutableMap.of("error", "backfillTaskCount must be a positive integer"))
+ .build();
+ }
+ return asLeaderWithSupervisorManager(
+ manager -> {
+ if (!manager.getSupervisorIds().contains(id)) {
+ return Response.status(Response.Status.NOT_FOUND)
+ .entity(ImmutableMap.of("error", StringUtils.format("[%s] does not exist", id)))
+ .build();
+ }
+ try {
+ Map<String, Object> result = manager.resetToLatestAndBackfill(id, backfillTaskCount);
+ return Response.ok(result).build();
+ }
+ catch (IllegalArgumentException e) {
+ return Response.status(Response.Status.BAD_REQUEST)
+ .entity(ImmutableMap.of("error", e.getMessage()))
+ .build();
+ }
+ catch (Exception e) {
+ return Response.serverError()
+ .entity(ImmutableMap.of("error", e.getMessage()))
+ .build();
+ }
+ }
+ );
+ }
+
private Response asLeaderWithSupervisorManager(Function<SupervisorManager, Response> f)
{
Optional<SupervisorManager> supervisorManager = taskMaster.getSupervisorManager();
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
index 9bef543496b..74329c68e1d 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
@@ -2195,7 +2195,7 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
final boolean metadataUpdateSuccess;
final DataSourceMetadata metadata = indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(supervisorId);
if (metadata == null) {
- log.info("Checkpointed metadata in null for supervisor[%s] for dataSource[%s] - inserting metadata[%s]", supervisorId, dataSource, resetMetadata);
+ log.info("Checkpointed metadata is null for supervisor[%s] for dataSource[%s] - inserting metadata[%s]", supervisorId, dataSource, resetMetadata);
metadataUpdateSuccess = indexerMetadataStorageCoordinator.insertDataSourceMetadata(supervisorId, resetMetadata);
} else {
if (!checkSourceMetadataMatch(metadata)) {
@@ -3311,7 +3311,7 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
/**
* gets mapping of partitions in stream to their latest offsets.
*/
- protected Map<PartitionIdType, SequenceOffsetType> getLatestSequencesFromStream()
+ public Map<PartitionIdType, SequenceOffsetType> getLatestSequencesFromStream()
{
return new HashMap<>();
}
@@ -4589,7 +4589,7 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
}
}
- protected Map<PartitionIdType, SequenceOffsetType> getOffsetsFromMetadataStorage()
+ public Map<PartitionIdType, SequenceOffsetType> getOffsetsFromMetadataStorage()
{
final DataSourceMetadata dataSourceMetadata = retrieveDataSourceMetadata();
if (dataSourceMetadata instanceof SeekableStreamDataSourceMetadata
@@ -4976,7 +4976,7 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
coalesceAndAwait(futures);
}
- protected abstract void updatePartitionLagFromStream();
+ public abstract void updatePartitionLagFromStream();
/**
* Gets 'lag' of currently processed offset behind latest offset as a measure of difference between offsets.
@@ -5233,7 +5233,7 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
* @param map partitionId -> sequence
* @return specific instance of datasource metadata
*/
- protected abstract SeekableStreamDataSourceMetadata<PartitionIdType, SequenceOffsetType> createDataSourceMetaDataForReset(
+ public abstract SeekableStreamDataSourceMetadata<PartitionIdType, SequenceOffsetType> createDataSourceMetaDataForReset(
String stream,
Map<PartitionIdType, SequenceOffsetType> map
);
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java
index 842f0de4774..ecbd51757c3 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java
@@ -297,4 +297,10 @@ public abstract class SeekableStreamSupervisorSpec implements SupervisorSpec
protected abstract SeekableStreamSupervisorSpec toggleSuspend(boolean suspend);
+ public abstract SeekableStreamSupervisorSpec createBackfillSpec(
+ String backfillId,
+ BoundedStreamConfig boundedStreamConfig,
+ @Nullable Integer taskCount
+ );
+
}
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java
index 525444e23de..199e004b424 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java
@@ -19,6 +19,9 @@
package org.apache.druid.indexing.overlord.supervisor;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
@@ -26,6 +29,8 @@ import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.SettableFuture;
import org.apache.druid.data.input.impl.ByteEntity;
+import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.error.DruidException;
import org.apache.druid.error.DruidExceptionMatcher;
import org.apache.druid.error.InvalidInput;
@@ -35,7 +40,11 @@ import org.apache.druid.indexing.overlord.DataSourceMetadata;
import org.apache.druid.indexing.overlord.ObjectMetadata;
import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers;
import org.apache.druid.indexing.seekablestream.TestSeekableStreamDataSourceMetadata;
+import org.apache.druid.indexing.seekablestream.supervisor.BoundedStreamConfig;
+import org.apache.druid.indexing.seekablestream.supervisor.LagAggregator;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor;
+import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorIOConfig;
+import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorIngestionSpec;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorSpec;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.DateTimes;
@@ -43,6 +52,7 @@ import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.metadata.MetadataSupervisorManager;
import org.apache.druid.metadata.PendingSegmentRecord;
+import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
import org.apache.druid.server.metrics.SupervisorStatsProvider;
import org.apache.druid.timeline.partition.NumberedShardSpec;
@@ -59,6 +69,7 @@ import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
+import javax.annotation.Nullable;
import java.lang.reflect.Field;
import java.util.Collection;
import java.util.Collections;
@@ -1068,6 +1079,186 @@ public class SupervisorManagerTest extends EasyMockSupport
);
}
+ @Test
+ public void testResetToLatestAndBackfill() throws Exception
+ {
+ EasyMock.expect(metadataSupervisorManager.getLatest()).andReturn(ImmutableMap.of());
+ replayAll();
+ manager.start();
+
+ final ConcurrentHashMap<String, Pair<Supervisor, SupervisorSpec>> supervisorsMap = getSupervisorsMap();
+ final SeekableStreamSupervisorSpec streamSpec = EasyMock.createNiceMock(SeekableStreamSupervisorSpec.class);
+ final SeekableStreamSupervisor streamSupervisor = EasyMock.createNiceMock(SeekableStreamSupervisor.class);
+ final SeekableStreamSupervisorIOConfig ioConfig = EasyMock.createNiceMock(SeekableStreamSupervisorIOConfig.class);
+
+ // non-SeekableStream supervisor → IAE
+ // Use a concrete anonymous Supervisor (not a mock) to reliably fail instanceof SeekableStreamSupervisor
+ final Supervisor nonStreamSupervisor = new Supervisor()
+ {
+ @Override
+ public void start()
+ {
+ }
+
+ @Override
+ public void stop(boolean stopGracefully)
+ {
+ }
+
+ @Override
+ public SupervisorReport getStatus()
+ {
+ return null;
+ }
+
+ @Override
+ public SupervisorStateManager.State getState()
+ {
+ return null;
+ }
+
+ @Override
+ public void reset(DataSourceMetadata dataSourceMetadata)
+ {
+ }
+ };
+ supervisorsMap.put("id1", Pair.of(nonStreamSupervisor, streamSpec));
+ Assert.assertThrows(
+ IllegalArgumentException.class,
+ () -> manager.resetToLatestAndBackfill("id1", null)
+ );
+
+ // useEarliestSequenceNumber=true → IAE
+ supervisorsMap.put("id1", Pair.of(streamSupervisor, streamSpec));
+ EasyMock.expect(streamSupervisor.getIoConfig()).andReturn(ioConfig).anyTimes();
+ EasyMock.expect(ioConfig.isUseEarliestSequenceNumber()).andReturn(true).once();
+ EasyMock.replay(streamSupervisor, streamSpec, ioConfig);
+ Assert.assertThrows(
+ IllegalArgumentException.class,
+ () -> manager.resetToLatestAndBackfill("id1", null)
+ );
+ EasyMock.reset(streamSupervisor, streamSpec, ioConfig);
+
+ // useConcurrentLocks not set (null context) → IAE
+ EasyMock.expect(streamSupervisor.getIoConfig()).andReturn(ioConfig).anyTimes();
+ EasyMock.expect(ioConfig.isUseEarliestSequenceNumber()).andReturn(false).once();
+ EasyMock.expect(streamSpec.getContext()).andReturn(null).once();
+ EasyMock.replay(streamSupervisor, streamSpec, ioConfig);
+ Assert.assertThrows(
+ IllegalArgumentException.class,
+ () -> manager.resetToLatestAndBackfill("id1", null)
+ );
+ EasyMock.reset(streamSupervisor, streamSpec, ioConfig);
+
+ // useConcurrentLocks=false → IAE
+ EasyMock.expect(streamSupervisor.getIoConfig()).andReturn(ioConfig).anyTimes();
+ EasyMock.expect(ioConfig.isUseEarliestSequenceNumber()).andReturn(false).once();
+ EasyMock.expect(streamSpec.getContext()).andReturn(ImmutableMap.of("useConcurrentLocks", false)).once();
+ EasyMock.replay(streamSupervisor, streamSpec, ioConfig);
+ Assert.assertThrows(
+ IllegalArgumentException.class,
+ () -> manager.resetToLatestAndBackfill("id1", null)
+ );
+ EasyMock.reset(streamSupervisor, streamSpec, ioConfig);
+
+ // useConcurrentLocks="true" (string) → accepted, fails at next guard (not RUNNING)
+ EasyMock.expect(streamSupervisor.getIoConfig()).andReturn(ioConfig).anyTimes();
+ EasyMock.expect(ioConfig.isUseEarliestSequenceNumber()).andReturn(false).once();
+ EasyMock.expect(streamSpec.getContext()).andReturn(ImmutableMap.of("useConcurrentLocks", "true")).once();
+ EasyMock.expect(streamSupervisor.getState()).andReturn(SupervisorStateManager.BasicState.SUSPENDED).once();
+ EasyMock.replay(streamSupervisor, streamSpec, ioConfig);
+ Assert.assertThrows(
+ IllegalArgumentException.class,
+ () -> manager.resetToLatestAndBackfill("id1", null)
+ );
+ EasyMock.reset(streamSupervisor, streamSpec, ioConfig);
+
+ // taskLockType=APPEND → accepted, fails at next guard (not RUNNING)
+ EasyMock.expect(streamSupervisor.getIoConfig()).andReturn(ioConfig).anyTimes();
+ EasyMock.expect(ioConfig.isUseEarliestSequenceNumber()).andReturn(false).once();
+ EasyMock.expect(streamSpec.getContext()).andReturn(ImmutableMap.of("taskLockType", "APPEND")).once();
+ EasyMock.expect(streamSupervisor.getState()).andReturn(SupervisorStateManager.BasicState.SUSPENDED).once();
+ EasyMock.replay(streamSupervisor, streamSpec, ioConfig);
+ Assert.assertThrows(
+ IllegalArgumentException.class,
+ () -> manager.resetToLatestAndBackfill("id1", null)
+ );
+ EasyMock.reset(streamSupervisor, streamSpec, ioConfig);
+
+ // supervisor not RUNNING → IAE
+ EasyMock.expect(streamSupervisor.getIoConfig()).andReturn(ioConfig).anyTimes();
+ EasyMock.expect(ioConfig.isUseEarliestSequenceNumber()).andReturn(false).once();
+ EasyMock.expect(streamSpec.getContext()).andReturn(ImmutableMap.of("useConcurrentLocks", true)).once();
+ EasyMock.expect(streamSupervisor.getState()).andReturn(SupervisorStateManager.BasicState.SUSPENDED).once();
+ EasyMock.replay(streamSupervisor, streamSpec, ioConfig);
+ Assert.assertThrows(
+ IllegalArgumentException.class,
+ () -> manager.resetToLatestAndBackfill("id1", null)
+ );
+ EasyMock.reset(streamSupervisor, streamSpec, ioConfig);
+
+ // empty latest offsets → ISE
+ EasyMock.expect(streamSupervisor.getIoConfig()).andReturn(ioConfig).anyTimes();
+ EasyMock.expect(ioConfig.isUseEarliestSequenceNumber()).andReturn(false).once();
+ EasyMock.expect(streamSpec.getContext()).andReturn(ImmutableMap.of("useConcurrentLocks", true)).once();
+ EasyMock.expect(streamSupervisor.getState()).andReturn(SupervisorStateManager.BasicState.RUNNING).once();
+ streamSupervisor.updatePartitionLagFromStream();
+ EasyMock.expectLastCall().once();
+ EasyMock.expect(streamSupervisor.getLatestSequencesFromStream()).andReturn(ImmutableMap.of()).once();
+ EasyMock.replay(streamSupervisor, streamSpec, ioConfig);
+ Assert.assertThrows(
+ IllegalStateException.class,
+ () -> manager.resetToLatestAndBackfill("id1", null)
+ );
+ EasyMock.reset(streamSupervisor, streamSpec, ioConfig);
+
+ // empty start offsets from metadata → ISE
+ EasyMock.expect(streamSupervisor.getIoConfig()).andReturn(ioConfig).anyTimes();
+ EasyMock.expect(ioConfig.isUseEarliestSequenceNumber()).andReturn(false).once();
+ EasyMock.expect(streamSpec.getContext()).andReturn(ImmutableMap.of("useConcurrentLocks", true)).once();
+ EasyMock.expect(streamSupervisor.getState()).andReturn(SupervisorStateManager.BasicState.RUNNING).once();
+ streamSupervisor.updatePartitionLagFromStream();
+ EasyMock.expectLastCall().once();
+ EasyMock.expect(streamSupervisor.getLatestSequencesFromStream()).andReturn(ImmutableMap.of("0", 100L)).once();
+ EasyMock.expect(streamSupervisor.getOffsetsFromMetadataStorage()).andReturn(ImmutableMap.of()).once();
+ EasyMock.replay(streamSupervisor, streamSpec, ioConfig);
+ Assert.assertThrows(
+ IllegalStateException.class,
+ () -> manager.resetToLatestAndBackfill("id1", null)
+ );
+
+ verifyAll();
+ }
+
+ @Test
+ public void testCreateBackfillSpec()
+ {
+ final TestBackfillSupervisorSpec.IOConfig ioConfig = new TestBackfillSupervisorSpec.IOConfig("test-stream", null, null);
+ final TestBackfillSupervisorSpec.IngestionSpec ingestionSpec = new TestBackfillSupervisorSpec.IngestionSpec(ioConfig);
+ final SeekableStreamSupervisorSpec sourceSpec = new TestBackfillSupervisorSpec("original-id", ingestionSpec);
+
+ final BoundedStreamConfig boundedStreamConfig = new BoundedStreamConfig(
+ ImmutableMap.of("0", 100L),
+ ImmutableMap.of("0", 200L)
+ );
+
+ // Without overriding taskCount
+ final SupervisorSpec backfillSpec = sourceSpec.createBackfillSpec("backfill-id", boundedStreamConfig, null);
+ Assert.assertEquals("backfill-id", backfillSpec.getId());
+ final TestBackfillSupervisorSpec backfillCast = (TestBackfillSupervisorSpec) backfillSpec;
+ final BoundedStreamConfig actualConfig = backfillCast.getIoConfig().getBoundedStreamConfig();
+ Assert.assertNotNull(actualConfig);
+ Assert.assertEquals(ImmutableMap.of("0", 100L), actualConfig.getStartSequenceNumbers());
+ Assert.assertEquals(ImmutableMap.of("0", 200L), actualConfig.getEndSequenceNumbers());
+ Assert.assertEquals(1, backfillCast.getIoConfig().getTaskCount());
+
+ // With overriding taskCount
+ final SupervisorSpec backfillSpecWithCount = sourceSpec.createBackfillSpec("backfill-id-2", boundedStreamConfig, 5);
+ Assert.assertEquals("backfill-id-2", backfillSpecWithCount.getId());
+ final TestBackfillSupervisorSpec backfillWithCount = (TestBackfillSupervisorSpec) backfillSpecWithCount;
+ Assert.assertEquals(5, backfillWithCount.getIoConfig().getTaskCount());
+ }
+
private static class TestSupervisorSpec implements SupervisorSpec
{
private final String id;
@@ -1137,4 +1328,103 @@ public class SupervisorManagerTest extends EasyMockSupport
return Collections.singletonList(id);
}
}
+
+ @JsonTypeName("testBackfill")
+ private static class TestBackfillSupervisorSpec extends SeekableStreamSupervisorSpec
+ {
+ @JsonCreator
+ TestBackfillSupervisorSpec(
+ @JsonProperty("id") String id,
+ @JsonProperty("spec") IngestionSpec ingestionSpec
+ )
+ {
+ super(
+ id,
+ ingestionSpec,
+ ImmutableMap.of("useConcurrentLocks", true),
+ false,
+ null, null, null, null,
+ MAPPER,
+ null, null, null, null
+ );
+ }
+
+ @Override
+ public Supervisor createSupervisor()
+ {
+ return null;
+ }
+
+ @Override
+ public String getType()
+ {
+ return "testBackfill";
+ }
+
+ @Override
+ public String getSource()
+ {
+ return "test-stream";
+ }
+
+ @Override
+ protected SeekableStreamSupervisorSpec toggleSuspend(boolean suspend)
+ {
+ return this;
+ }
+
+ @Override
+ public SeekableStreamSupervisorSpec createBackfillSpec(
+ String backfillId,
+ BoundedStreamConfig boundedStreamConfig,
+ @Nullable Integer taskCount
+ )
+ {
+ return new TestBackfillSupervisorSpec(
+ backfillId,
+ new IngestionSpec(new IOConfig(getIoConfig().getStream(), taskCount, boundedStreamConfig))
+ );
+ }
+
+ @Override
+ public SeekableStreamSupervisorIOConfig getIoConfig()
+ {
+ return getSpec().getIOConfig();
+ }
+
+ @JsonTypeName("testBackfillIngestionSpec")
+ static class IngestionSpec extends SeekableStreamSupervisorIngestionSpec
+ {
+ @JsonCreator
+ IngestionSpec(
+ @JsonProperty("ioConfig") IOConfig ioConfig
+ )
+ {
+ super(
+ new DataSchema(
+ "testDS",
+ new TimestampSpec("time", "auto", null),
+ new DimensionsSpec(Collections.emptyList()),
+ null, null, null, null, null
+ ),
+ ioConfig,
+ null
+ );
+ }
+ }
+
+ @JsonTypeName("testBackfillIOConfig")
+ static class IOConfig extends SeekableStreamSupervisorIOConfig
+ {
+ @JsonCreator
+ IOConfig(
+ @JsonProperty("stream") String stream,
+ @JsonProperty("taskCount") Integer taskCount,
+ @JsonProperty("boundedStreamConfig") BoundedStreamConfig boundedStreamConfig
+ )
+ {
+ super(stream, null, 1, taskCount, null, null, null, false, null, null, null, null, LagAggregator.DEFAULT, null, null, null, null, boundedStreamConfig);
+ }
+ }
+ }
}
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java
index 4ccf4659994..31e0d604a22 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java
@@ -34,6 +34,7 @@ import org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAu
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskClientFactory;
import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers;
import org.apache.druid.indexing.seekablestream.TestSeekableStreamDataSourceMetadata;
+import org.apache.druid.indexing.seekablestream.supervisor.BoundedStreamConfig;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorIOConfig;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorIngestionSpec;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorSpec;
@@ -1379,6 +1380,100 @@ public class SupervisorResourceTest extends EasyMockSupport
verifyAll();
}
+ @Test
+ public void testResetToLatestAndBackfill()
+ {
+ // 200 - success
+ EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(supervisorManager));
+ EasyMock.expect(supervisorManager.getSupervisorIds()).andReturn(ImmutableSet.of("my-id"));
+ EasyMock.expect(supervisorManager.resetToLatestAndBackfill("my-id", null))
+ .andReturn(ImmutableMap.of("id", "my-id", "backfillSupervisorId", "my-id_backfill_abcdefgh"));
+ replayAll();
+
+ Response response = supervisorResource.resetToLatestAndBackfill("my-id", null);
+ Assert.assertEquals(200, response.getStatus());
+ Assert.assertEquals(
+ ImmutableMap.of("id", "my-id", "backfillSupervisorId", "my-id_backfill_abcdefgh"),
+ response.getEntity()
+ );
+ verifyAll();
+ resetAll();
+
+ // 404 - supervisor does not exist
+ EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(supervisorManager));
+ EasyMock.expect(supervisorManager.getSupervisorIds()).andReturn(ImmutableSet.of());
+ replayAll();
+
+ response = supervisorResource.resetToLatestAndBackfill("my-id", null);
+ Assert.assertEquals(404, response.getStatus());
+ verifyAll();
+ resetAll();
+
+ // 400 - IAE (e.g. supervisor not running)
+ EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(supervisorManager));
+ EasyMock.expect(supervisorManager.getSupervisorIds()).andReturn(ImmutableSet.of("my-id"));
+ EasyMock.expect(supervisorManager.resetToLatestAndBackfill("my-id", null))
+ .andThrow(new IllegalArgumentException("Supervisor[my-id] must be in a RUNNING state"));
+ replayAll();
+
+ response = supervisorResource.resetToLatestAndBackfill("my-id", null);
+ Assert.assertEquals(400, response.getStatus());
+ Assert.assertEquals(
+ ImmutableMap.of("error", "Supervisor[my-id] must be in a RUNNING state"),
+ response.getEntity()
+ );
+ verifyAll();
+ resetAll();
+
+ // 500 - ISE (e.g. failed to retrieve offsets)
+ EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(supervisorManager));
+ EasyMock.expect(supervisorManager.getSupervisorIds()).andReturn(ImmutableSet.of("my-id"));
+ EasyMock.expect(supervisorManager.resetToLatestAndBackfill("my-id", null))
+ .andThrow(new IllegalStateException("Failed to get latest offsets from stream"));
+ replayAll();
+
+ response = supervisorResource.resetToLatestAndBackfill("my-id", null);
+ Assert.assertEquals(500, response.getStatus());
+ Assert.assertEquals(
+ ImmutableMap.of("error", "Failed to get latest offsets from stream"),
+ response.getEntity()
+ );
+ verifyAll();
+ resetAll();
+
+ // 400 - invalid backfillTaskCount (zero)
+ replayAll();
+
+ response = supervisorResource.resetToLatestAndBackfill("my-id", 0);
+ Assert.assertEquals(400, response.getStatus());
+ Assert.assertEquals(
+ ImmutableMap.of("error", "backfillTaskCount must be a positive integer"),
+ response.getEntity()
+ );
+ verifyAll();
+ resetAll();
+
+ // 400 - invalid backfillTaskCount (negative)
+ replayAll();
+
+ response = supervisorResource.resetToLatestAndBackfill("my-id", -1);
+ Assert.assertEquals(400, response.getStatus());
+ Assert.assertEquals(
+ ImmutableMap.of("error", "backfillTaskCount must be a positive integer"),
+ response.getEntity()
+ );
+ verifyAll();
+ resetAll();
+
+ // 503 - no supervisor manager (not leader)
+ EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.absent());
+ replayAll();
+
+ response = supervisorResource.resetToLatestAndBackfill("my-id", null);
+ Assert.assertEquals(503, response.getStatus());
+ verifyAll();
+ }
+
@Test
public void testNoopSupervisorSpecSerde() throws Exception
{
@@ -1668,6 +1763,16 @@ public class SupervisorResourceTest extends EasyMockSupport
return null;
}
+ @Override
+ public SeekableStreamSupervisorSpec createBackfillSpec(
+ String backfillId,
+ BoundedStreamConfig boundedStreamConfig,
+ @Nullable Integer taskCount
+ )
+ {
+ return null;
+ }
+
@JsonIgnore
@Nonnull
@Override
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpecTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpecTest.java
index 8d1b5350e8c..80120d07fdf 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpecTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpecTest.java
@@ -942,6 +942,16 @@ public class SeekableStreamSupervisorSpecTest extends SeekableStreamSupervisorTe
return null;
}
+ @Override
+ public SeekableStreamSupervisorSpec createBackfillSpec(
+ String backfillId,
+ BoundedStreamConfig boundedStreamConfig,
+ @Nullable Integer taskCount
+ )
+ {
+ return null;
+ }
+
@Override
public String getType()
{
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
index d61049777c8..9e45920ad71 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
@@ -3059,7 +3059,7 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
final TestSeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor()
{
@Override
- protected SeekableStreamDataSourceMetadata<String, String> createDataSourceMetaDataForReset(
+ public SeekableStreamDataSourceMetadata<String, String> createDataSourceMetaDataForReset(
String stream,
Map<String, String> map
)
@@ -3284,7 +3284,7 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
}
@Override
- protected void updatePartitionLagFromStream()
+ public void updatePartitionLagFromStream()
{
// do nothing
}
@@ -3381,7 +3381,7 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
}
@Override
- protected SeekableStreamDataSourceMetadata<String, String> createDataSourceMetaDataForReset(
+ public SeekableStreamDataSourceMetadata<String, String> createDataSourceMetaDataForReset(
String stream,
Map<String, String> map
)
@@ -3521,7 +3521,7 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
}
@Override
- protected Map<String, String> getLatestSequencesFromStream()
+ public Map<String, String> getLatestSequencesFromStream()
{
return streamOffsets;
}
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorTestBase.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorTestBase.java
index 4eefaed9bd9..c96a64211b9 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorTestBase.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorTestBase.java
@@ -124,7 +124,7 @@ public abstract class SeekableStreamSupervisorTestBase
}
@Override
- protected void updatePartitionLagFromStream()
+ public void updatePartitionLagFromStream()
{
// do nothing
}
@@ -205,7 +205,7 @@ public abstract class SeekableStreamSupervisorTestBase
}
@Override
- protected SeekableStreamDataSourceMetadata<String, String> createDataSourceMetaDataForReset(
+ public SeekableStreamDataSourceMetadata<String, String> createDataSourceMetaDataForReset(
String stream,
Map<String, String> map
)
@@ -436,6 +436,16 @@ public abstract class SeekableStreamSupervisorTestBase
{
return null;
}
+
+ @Override
+ public SeekableStreamSupervisorSpec createBackfillSpec(
+ String backfillId,
+ BoundedStreamConfig boundedStreamConfig,
+ @Nullable Integer taskCount
+ )
+ {
+ return null;
+ }
}
protected static SeekableStreamSupervisorTuningConfig getTuningConfig()
diff --git a/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncherRetryTest.java b/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncherRetryTest.java
index 0ca643f109f..35488be081b 100644
--- a/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncherRetryTest.java
+++ b/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncherRetryTest.java
@@ -322,6 +322,12 @@ public class MSQWorkerTaskLauncherRetryTest
throw new UOE("Not implemented");
}
+ @Override
+ public ListenableFuture<Map<String, Object>> resetToLatestAndBackfill(String supervisorId)
+ {
+ throw new UOE("Not implemented");
+ }
+
@Override
public ListenableFuture<CloseableIterator<SupervisorStatus>> supervisorStatuses()
{
diff --git a/server/src/main/java/org/apache/druid/rpc/indexing/NoopOverlordClient.java b/server/src/main/java/org/apache/druid/rpc/indexing/NoopOverlordClient.java
index 81fccf19f13..2b1ad6a555a 100644
--- a/server/src/main/java/org/apache/druid/rpc/indexing/NoopOverlordClient.java
+++ b/server/src/main/java/org/apache/druid/rpc/indexing/NoopOverlordClient.java
@@ -114,6 +114,12 @@ public class NoopOverlordClient implements OverlordClient
throw new UnsupportedOperationException();
}
+ @Override
+ public ListenableFuture<Map<String, Object>> resetToLatestAndBackfill(String supervisorId)
+ {
+ throw new UnsupportedOperationException();
+ }
+
@Override
public ListenableFuture<CloseableIterator<SupervisorStatus>> supervisorStatuses()
{
diff --git a/server/src/main/java/org/apache/druid/rpc/indexing/OverlordClient.java b/server/src/main/java/org/apache/druid/rpc/indexing/OverlordClient.java
index c4d34899777..baf7e4297c9 100644
--- a/server/src/main/java/org/apache/druid/rpc/indexing/OverlordClient.java
+++ b/server/src/main/java/org/apache/druid/rpc/indexing/OverlordClient.java
@@ -197,6 +197,15 @@ public interface OverlordClient
*/
ListenableFuture<Map<String, String>> terminateSupervisor(String supervisorId);
+ /**
+ * Resets a supervisor to the latest stream offsets and starts a bounded backfill supervisor.
+ * <p>
+ * API: {@code POST /druid/indexer/v1/supervisor/<id>/resetToLatestAndBackfill}
+ *
+ * @return Map containing "id" and "backfillSupervisorId"
+ */
+ ListenableFuture<Map<String, Object>> resetToLatestAndBackfill(String supervisorId);
+
/**
* Returns all current supervisor statuses.
*/
diff --git a/server/src/main/java/org/apache/druid/rpc/indexing/OverlordClientImpl.java b/server/src/main/java/org/apache/druid/rpc/indexing/OverlordClientImpl.java
index 0499a62f090..3657d8b83a6 100644
--- a/server/src/main/java/org/apache/druid/rpc/indexing/OverlordClientImpl.java
+++ b/server/src/main/java/org/apache/druid/rpc/indexing/OverlordClientImpl.java
@@ -265,6 +265,23 @@ public class OverlordClientImpl implements OverlordClient
);
}
+ @Override
+ public ListenableFuture<Map<String, Object>> resetToLatestAndBackfill(String supervisorId)
+ {
+ final String path = StringUtils.format(
+ "/druid/indexer/v1/supervisor/%s/resetToLatestAndBackfill",
+ StringUtils.urlEncode(supervisorId)
+ );
+
+ return FutureUtils.transform(
+ client.asyncRequest(
+ new RequestBuilder(HttpMethod.POST, path),
+ new BytesFullResponseHandler()
+ ),
+ holder -> JacksonUtils.readValue(jsonMapper, holder.getContent(), new TypeReference<>() {})
+ );
+ }
+
@Override
public ListenableFuture<CloseableIterator<SupervisorStatus>> supervisorStatuses()
{
diff --git a/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedClusterApis.java b/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedClusterApis.java
index 257533aecbd..6ae8750b8d8 100644
--- a/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedClusterApis.java
+++ b/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedClusterApis.java
@@ -430,6 +430,16 @@ public class EmbeddedClusterApis implements EmbeddedResource
return onLeaderOverlord(o -> o.postSupervisor(supervisor)).get("id");
}
+ /**
+ * Resets a supervisor to the latest stream offsets and starts a bounded backfill supervisor.
+ *
+ * @return Map containing "id" and "backfillSupervisorId"
+ */
+ public Map<String, Object> resetToLatestAndBackfill(String supervisorId)
+ {
+ return onLeaderOverlord(o -> o.resetToLatestAndBackfill(supervisorId));
+ }
+
/**
* Fetches the current status of the given supervisor ID.
*/
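
Note on the request path: OverlordClientImpl in this commit builds the endpoint path by URL-encoding the supervisor ID and substituting it into /druid/indexer/v1/supervisor/%s/resetToLatestAndBackfill. The sketch below reproduces only that path construction with the JDK, so it can be run without Druid on the classpath. The class name BackfillPathSketch and the method backfillPath are hypothetical, and the encoding (URLEncoder with "+" rewritten to "%20") is an assumption about what StringUtils.urlEncode does, not a copy of it.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-alone sketch; not part of the Druid codebase.
final class BackfillPathSketch
{
  // Builds the POST path for the resetToLatestAndBackfill endpoint.
  // Assumption: spaces should be percent-encoded rather than sent as "+",
  // so the form-style "+" produced by URLEncoder is rewritten to "%20".
  static String backfillPath(String supervisorId)
  {
    final String encoded = URLEncoder
        .encode(supervisorId, StandardCharsets.UTF_8)
        .replace("+", "%20");
    return String.format(
        "/druid/indexer/v1/supervisor/%s/resetToLatestAndBackfill",
        encoded
    );
  }

  public static void main(String[] args)
  {
    // An ID with no reserved characters passes through unchanged.
    System.out.println(backfillPath("my-id"));
    // A slash or space in the ID is escaped so it stays one path segment.
    System.out.println(backfillPath("wiki/edits v2"));
  }
}
```

Encoding the ID matters because a supervisor ID containing "/" would otherwise be split into extra path segments and fail to match the route. Per the Javadoc above, the response body is a map containing "id" and "backfillSupervisorId".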