This is an automated email from the ASF dual-hosted git repository.

vogievetsky pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 37db5d9b81 Reset offsets supervisor API (#14772)
37db5d9b81 is described below

commit 37db5d9b81be63848403d7ce5f7c0c5f090de437
Author: Abhishek Radhakrishnan <[email protected]>
AuthorDate: Thu Aug 17 17:13:10 2023 -0400

    Reset offsets supervisor API (#14772)
    
    * Add supervisor /resetOffsets API.
    
    - Add a new endpoint 
/druid/indexer/v1/supervisor/<supervisorId>/resetOffsets
    which accepts DataSourceMetadata as a body parameter.
    - Update logs, unit tests and docs.
    
    * Add a new interface method for backwards compatibility.
    
    * Rename
    
    * Adjust tests and javadocs.
    
    * Use CoreInjectorBuilder instead of deprecated makeInjectorWithModules
    
    * UT fix
    
    * Doc updates.
    
    * remove extraneous debugging logs.
    
    * Remove the boolean setting; only ResetHandle() and resetInternal()
    
    * Relax constraints and add a new ResetOffsetsNotice; cleanup old logic.
    
    * A separate ResetOffsetsNotice and some cleanup.
    
    * Minor cleanup
    
    * Add a check & test to verify that sequence numbers are only of type 
SeekableStreamEndSequenceNumbers
    
    * Add unit tests for the no op implementations for test coverage
    
    * CodeQL fix
    
    * checkstyle from merge conflict
    
    * Doc changes
    
    * DOCUSAURUS code tabs fix. Thanks, Brian!
---
 docs/api-reference/supervisor-api.md               | 100 +++-
 .../extensions-core/kafka-supervisor-operations.md |  64 +++
 .../extensions-core/kinesis-ingestion.md           |  12 +-
 .../MaterializedViewSupervisor.java                |   6 +
 .../MaterializedViewSupervisorTest.java            |  33 ++
 .../kafka/KafkaDataSourceMetadataTest.java         |  56 +++
 .../kinesis/KinesisDataSourceMetadataTest.java     |  55 ++
 .../kinesis/supervisor/KinesisSupervisorTest.java  |   1 -
 .../overlord/supervisor/SupervisorManager.java     |   8 +-
 .../overlord/supervisor/SupervisorResource.java    |  25 +-
 .../supervisor/SeekableStreamSupervisor.java       | 144 +++++-
 .../overlord/supervisor/SupervisorManagerTest.java |  30 ++
 .../supervisor/SupervisorResourceTest.java         |  49 ++
 .../TestSeekableStreamDataSourceMetadata.java      |  48 ++
 .../SeekableStreamSupervisorStateTest.java         | 552 ++++++++++++++++++++-
 .../overlord/supervisor/NoopSupervisorSpec.java    |   5 +
 .../indexing/overlord/supervisor/Supervisor.java   |  13 +
 .../druid/indexing/NoopSupervisorSpecTest.java     |  18 +
 website/.spelling                                  |   1 +
 19 files changed, 1208 insertions(+), 12 deletions(-)

diff --git a/docs/api-reference/supervisor-api.md 
b/docs/api-reference/supervisor-api.md
index 300e46ff1b..b315971ec2 100644
--- a/docs/api-reference/supervisor-api.md
+++ b/docs/api-reference/supervisor-api.md
@@ -3065,7 +3065,7 @@ Host: http://ROUTER_IP:ROUTER_PORT
 
 ### Reset a supervisor
 
-Resets the specified supervisor. This endpoint clears stored offsets in Kafka 
or sequence numbers in Kinesis, prompting the supervisor to resume data 
reading. The supervisor will start from the earliest or latest available 
position, depending on the platform (offsets in Kafka or sequence numbers in 
Kinesis). It kills and recreates active tasks to read from valid positions.
+Resets the specified supervisor. This endpoint clears _all_ stored offsets in 
Kafka or sequence numbers in Kinesis, prompting the supervisor to resume data 
reading. The supervisor will start from the earliest or latest available 
position, depending on the platform (offsets in Kafka or sequence numbers in 
Kinesis). It kills and recreates active tasks to read from valid positions.
 
 Use this endpoint to recover from a stopped state due to missing offsets in 
Kafka or sequence numbers in Kinesis. Use this endpoint with caution as it may 
result in skipped messages and lead to data loss or duplicate data.
 
@@ -3130,6 +3130,104 @@ Host: http://ROUTER_IP:ROUTER_PORT
   ```
 </details>
 
+### Reset Offsets for a supervisor
+
+Resets the specified offsets for a supervisor. This endpoint clears _only_ the 
specified offsets in Kafka or sequence numbers in Kinesis, prompting the 
supervisor to resume data reading.
+If there are no stored offsets, the specified offsets will be set in the 
metadata store. The supervisor will start from the reset offsets for the 
partitions specified and for the other partitions from the stored offset.
+It kills and recreates active tasks pertaining to the partitions specified to 
read from valid offsets.
+
+Use this endpoint to selectively reset offsets for partitions without 
resetting the entire set.
+
+#### URL
+
+<code class="postAPI">POST</code> 
<code>/druid/indexer/v1/supervisor/:supervisorId/resetOffsets</code>
+
+#### Responses
+
+<!--DOCUSAURUS_CODE_TABS-->
+
+<!--200 SUCCESS-->
+
+*Successfully reset offsets*
+
+<!--404 NOT FOUND-->
+
+*Invalid supervisor ID*
+
+<!--END_DOCUSAURUS_CODE_TABS-->
+
+---
+#### Reset Offsets Metadata
+
+This section presents the structure and details of the reset offsets metadata 
payload.
+
+| Field | Type | Description | Required |
+|---------|---------|---------|---------|
+| `type` | String | The type of reset offsets metadata payload. It must match 
the supervisor's `type`. Possible values: `kafka` or `kinesis`. | Yes |
+| `partitions` | Object | An object representing the reset metadata. See below 
for details. | Yes |
+
+#### Partitions
+
+The following table defines the fields within the `partitions` object in the 
reset offsets metadata payload.
+
+| Field | Type | Description | Required |
+|---------|---------|---------|---------|
+| `type` | String | Must be set as `end`.  Indicates the end sequence numbers 
for the reset offsets. | Yes |
+| `stream` | String | The stream to be reset. It must be a valid stream 
consumed by the supervisor. | Yes |
+| `partitionOffsetMap` | Object | A map of partitions to corresponding offsets 
for the stream to be reset.| Yes |
+
+#### Sample request
+
+The following example shows how to reset offsets for a kafka supervisor with 
the name `social_media`. Let's say the supervisor is reading
+from a kafka topic `ads_media_stream` and has the stored offsets: `{"0": 0, 
"1": 10, "2": 20, "3": 40}`.
+
+<!--DOCUSAURUS_CODE_TABS-->
+
+<!--cURL-->
+
+```shell
+curl --request POST 
"http://ROUTER_IP:ROUTER_PORT/druid/indexer/v1/supervisor/social_media/resetOffsets";
+--header 'Content-Type: application/json'
+--data-raw 
'{"type":"kafka","partitions":{"type":"end","stream":"ads_media_stream","partitionOffsetMap":{"0":100,
 "2": 650}}}'
+```
+
+<!--HTTP-->
+
+```HTTP
+POST /druid/indexer/v1/supervisor/social_media/resetOffsets HTTP/1.1
+Host: http://ROUTER_IP:ROUTER_PORT
+Content-Type: application/json
+
+{
+  "type": "kafka",
+  "partitions": {
+    "type": "end",
+    "stream": "ads_media_stream",
+    "partitionOffsetMap": {
+      "0": 100,
+      "2": 650
+    }
+  }
+}
+```
+
+The above operation will reset offsets only for partitions 0 and 2 to 100 and 
650 respectively. After a successful reset,
+when the supervisor's tasks restart, they will resume reading from `{"0": 100, 
"1": 10, "2": 650, "3": 40}`.
+
+<!--END_DOCUSAURUS_CODE_TABS-->
+
+#### Sample response
+
+<details>
+  <summary>Click to show sample response</summary>
+
+  ```json
+{
+    "id": "social_media"
+}
+  ```
+</details>
+
 ### Terminate a supervisor
 
 Terminates a supervisor and its associated indexing tasks, triggering the 
publishing of their segments. When terminated, a tombstone marker is placed in 
the database to prevent reloading on restart.
diff --git a/docs/development/extensions-core/kafka-supervisor-operations.md 
b/docs/development/extensions-core/kafka-supervisor-operations.md
index e1de35eb2f..9b6265154c 100644
--- a/docs/development/extensions-core/kafka-supervisor-operations.md
+++ b/docs/development/extensions-core/kafka-supervisor-operations.md
@@ -131,6 +131,70 @@ to start and in flight tasks will fail. This operation 
enables you to recover fr
 
 Note that the supervisor must be running for this endpoint to be available.
 
+## Resetting Offsets for a Supervisor
+
+The supervisor must be running for this endpoint to be available.
+
+The `POST /druid/indexer/v1/supervisor/<supervisorId>/resetOffsets` operation 
clears stored
+offsets, causing the supervisor to start reading from the specified offsets. 
After resetting stored
+offsets, the supervisor kills and recreates any active tasks pertaining to the 
specified partitions,
+so that tasks begin reading from specified offsets. For partitions that are 
not specified in this operation, the supervisor
+will resume from the last stored offset.
+
+Use care when using this operation! Resetting offsets for a supervisor may 
cause Kafka messages to be skipped or read
+twice, resulting in missing or duplicate data.
+
+#### Sample request
+
+The following example shows how to reset offsets for a kafka supervisor with 
the name `social_media`. Let's say the supervisor is reading
+from two kafka topics `ads_media_foo` and `ads_media_bar` and has the stored 
offsets: `{"ads_media_foo:0": 0, "ads_media_foo:1": 10, "ads_media_bar:0": 20, 
"ads_media_bar:1": 40}`.
+
+<!--DOCUSAURUS_CODE_TABS-->
+
+<!--cURL-->
+
+```shell
+curl --request POST 
"http://ROUTER_IP:ROUTER_PORT/druid/indexer/v1/supervisor/social_media/resetOffsets";
+--header 'Content-Type: application/json'
+--data-raw 
'{"type":"kafka","partitions":{"type":"end","stream":"ads_media_foo|ads_media_bar","partitionOffsetMap":{"ads_media_foo:0":
 3, "ads_media_bar:1": 12}}}'
+```
+
+<!--HTTP-->
+
+```HTTP
+POST /druid/indexer/v1/supervisor/social_media/resetOffsets HTTP/1.1
+Host: http://ROUTER_IP:ROUTER_PORT
+Content-Type: application/json
+
+{
+  "type": "kafka",
+  "partitions": {
+    "type": "end",
+    "stream": "ads_media_foo|ads_media_bar",
+    "partitionOffsetMap": {
+      "ads_media_foo:0": 3,
+      "ads_media_bar:1": 12
+    }
+  }
+}
+```
+The above operation will reset offsets for `ads_media_foo` partition 0 and 
`ads_media_bar` partition 1 to offsets 3 and 12 respectively. After a 
successful reset,
+when the supervisor's tasks restart, they will resume reading from 
`{"ads_media_foo:0": 3, "ads_media_foo:1": 10, "ads_media_bar:0": 20, 
"ads_media_bar:1": 12}`.
+
+<!--END_DOCUSAURUS_CODE_TABS-->
+
+#### Sample response
+
+<details>
+  <summary>Click to show sample response</summary>
+
+  ```json
+{
+    "id": "social_media"
+}
+  ```
+</details>
+
 ## Terminating Supervisors
 
 The `POST /druid/indexer/v1/supervisor/<supervisorId>/terminate` operation 
terminates a supervisor and causes all 
diff --git a/docs/development/extensions-core/kinesis-ingestion.md 
b/docs/development/extensions-core/kinesis-ingestion.md
index 24038d7c9d..0c3066d2f7 100644
--- a/docs/development/extensions-core/kinesis-ingestion.md
+++ b/docs/development/extensions-core/kinesis-ingestion.md
@@ -45,7 +45,7 @@ See [Supervisor API](../../api-reference/supervisor-api.md) 
for more information
 |--------|----|-----------|--------|
 |`type`|String|The supervisor type; this should always be `kinesis`.|Yes|
 |`spec`|Object|The container object for the supervisor configuration.|Yes|
-|`ioConfig`|Object|The [I/O configuration](#supervisor-io-configuration) 
object for configuring Kafka connection and I/O-related settings for the 
supervisor and indexing task.|Yes|
+|`ioConfig`|Object|The [I/O configuration](#supervisor-io-configuration) 
object for configuring Kinesis connection and I/O-related settings for the 
supervisor and indexing task.|Yes|
 |`dataSchema`|Object|The schema used by the Kinesis indexing task during 
ingestion. See [`dataSchema`](../../ingestion/ingestion-spec.md#dataschema) for 
more information.|Yes|
 |`tuningConfig`|Object|The [tuning 
configuration](#supervisor-tuning-configuration) object for configuring 
performance-related settings for the supervisor and indexing tasks.|No|
 
@@ -593,6 +593,16 @@ for the generated segments to be accepted. If the messages 
at the expected start
 no longer available in Kinesis (typically because the message retention period 
has elapsed or the topic was
 removed and re-created) the supervisor will refuse to start and in-flight 
tasks will fail. This endpoint enables you to recover from this condition.
 
+### Resetting Offsets for a supervisor
+
+To reset partition offsets for a supervisor, send a `POST` request to the 
`/druid/indexer/v1/supervisor/:supervisorId/resetOffsets` endpoint. This 
endpoint clears stored
+sequence numbers, prompting the supervisor to start reading from the specified 
offsets.
+After resetting stored offsets, the supervisor kills and recreates any active 
tasks pertaining to the specified partitions,
+so that tasks begin reading specified offsets. For partitions that are not 
specified in this operation, the supervisor will resume from the last
+stored offset.
+
+Use this endpoint with caution as it may result in skipped messages, leading 
to data loss or duplicate data.
+
 ### Terminate a supervisor
 
 To terminate a supervisor and its associated indexing tasks, send a `POST` 
request to the `/druid/indexer/v1/supervisor/:supervisorId/terminate` endpoint.
diff --git 
a/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java
 
b/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java
index fd9303c17b..ac2738534d 100644
--- 
a/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java
+++ 
b/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java
@@ -277,6 +277,12 @@ public class MaterializedViewSupervisor implements 
Supervisor
     }
   }
 
+  @Override
+  public void resetOffsets(DataSourceMetadata resetDataSourceMetadata)
+  {
+    throw new UnsupportedOperationException("Reset offsets not supported in 
MaterializedViewSupervisor");
+  }
+
   @Override
   public void checkpoint(int taskGroupId, DataSourceMetadata 
checkpointMetadata)
   {
diff --git 
a/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorTest.java
 
b/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorTest.java
index d822948881..e49b160c1e 100644
--- 
a/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorTest.java
+++ 
b/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorTest.java
@@ -387,4 +387,37 @@ public class MaterializedViewSupervisorTest
     EasyMock.replay(mock);
     supervisor.run();
   }
+
+  @Test
+  public void testResetOffsetsNotSupported()
+  {
+    MaterializedViewSupervisorSpec suspended = new 
MaterializedViewSupervisorSpec(
+        "base",
+        new DimensionsSpec(Collections.singletonList(new 
StringDimensionSchema("dim"))),
+        new AggregatorFactory[]{new LongSumAggregatorFactory("m1", "m1")},
+        HadoopTuningConfig.makeDefaultTuningConfig(),
+        null,
+        null,
+        null,
+        null,
+        null,
+        true,
+        objectMapper,
+        taskMaster,
+        taskStorage,
+        metadataSupervisorManager,
+        sqlSegmentsMetadataManager,
+        indexerMetadataStorageCoordinator,
+        new MaterializedViewTaskConfig(),
+        EasyMock.createMock(AuthorizerMapper.class),
+        EasyMock.createMock(ChatHandlerProvider.class),
+        new SupervisorStateManagerConfig()
+    );
+    MaterializedViewSupervisor supervisor = (MaterializedViewSupervisor) 
suspended.createSupervisor();
+    Assert.assertThrows(
+        "Reset offsets not supported in MaterializedViewSupervisor",
+        UnsupportedOperationException.class,
+        () -> supervisor.resetOffsets(null)
+    );
+  }
 }
diff --git 
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaDataSourceMetadataTest.java
 
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaDataSourceMetadataTest.java
index 45f61b61b0..800a1fedf9 100644
--- 
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaDataSourceMetadataTest.java
+++ 
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaDataSourceMetadataTest.java
@@ -19,11 +19,19 @@
 
 package org.apache.druid.indexing.kafka;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
+import com.google.inject.Injector;
+import com.google.inject.name.Names;
 import org.apache.druid.data.input.kafka.KafkaTopicPartition;
+import org.apache.druid.guice.StartupInjectorBuilder;
+import org.apache.druid.indexing.overlord.DataSourceMetadata;
 import 
org.apache.druid.indexing.seekablestream.SeekableStreamEndSequenceNumbers;
 import 
org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers;
+import org.apache.druid.initialization.CoreInjectorBuilder;
+import org.apache.druid.initialization.DruidModule;
 import org.apache.druid.utils.CollectionUtils;
 import org.junit.Assert;
 import org.junit.Test;
@@ -163,6 +171,37 @@ public class KafkaDataSourceMetadataTest
     );
   }
 
+  @Test
+  public void testKafkaDataSourceMetadataSerdeRoundTrip() throws 
JsonProcessingException
+  {
+    ObjectMapper jsonMapper = createObjectMapper();
+
+    KafkaDataSourceMetadata kdm1 = endMetadata(ImmutableMap.of());
+    String kdmStr1 = jsonMapper.writeValueAsString(kdm1);
+    DataSourceMetadata dsMeta1 = jsonMapper.readValue(kdmStr1, 
DataSourceMetadata.class);
+    Assert.assertEquals(kdm1, dsMeta1);
+
+    KafkaDataSourceMetadata kdm2 = endMetadata(ImmutableMap.of(1, 3L));
+    String kdmStr2 = jsonMapper.writeValueAsString(kdm2);
+    DataSourceMetadata dsMeta2 = jsonMapper.readValue(kdmStr2, 
DataSourceMetadata.class);
+    Assert.assertEquals(kdm2, dsMeta2);
+  }
+
+  @Test
+  public void testKafkaDataSourceMetadataSerde() throws JsonProcessingException
+  {
+    ObjectMapper jsonMapper = createObjectMapper();
+    KafkaDataSourceMetadata expectedKdm1 = endMetadata(ImmutableMap.of(1, 3L));
+    String kdmStr1 = 
"{\"type\":\"kafka\",\"partitions\":{\"type\":\"end\",\"stream\":\"foo\",\"topic\":\"foo\",\"partitionSequenceNumberMap\":{\"1\":3},\"partitionOffsetMap\":{\"1\":3},\"exclusivePartitions\":[]}}\n";
+    DataSourceMetadata dsMeta1 = jsonMapper.readValue(kdmStr1, 
DataSourceMetadata.class);
+    Assert.assertEquals(dsMeta1, expectedKdm1);
+
+    KafkaDataSourceMetadata expectedKdm2 = endMetadata(ImmutableMap.of(1, 3L, 
2, 1900L));
+    String kdmStr2 = 
"{\"type\":\"kafka\",\"partitions\":{\"type\":\"end\",\"stream\":\"foo\",\"topic\":\"food\",\"partitionSequenceNumberMap\":{\"1\":3,
 \"2\":1900},\"partitionOffsetMap\":{\"1\":3, 
\"2\":1900},\"exclusivePartitions\":[]}}\n";
+    DataSourceMetadata dsMeta2 = jsonMapper.readValue(kdmStr2, 
DataSourceMetadata.class);
+    Assert.assertEquals(dsMeta2, expectedKdm2);
+  }
+
   private static KafkaDataSourceMetadata startMetadata(Map<Integer, Long> 
offsets)
   {
     Map<KafkaTopicPartition, Long> newOffsets = CollectionUtils.mapKeys(
@@ -188,4 +227,21 @@ public class KafkaDataSourceMetadataTest
     );
     return new KafkaDataSourceMetadata(new 
SeekableStreamEndSequenceNumbers<>("foo", newOffsets));
   }
+
+  private static ObjectMapper createObjectMapper()
+  {
+    DruidModule module = new KafkaIndexTaskModule();
+    final Injector injector = new CoreInjectorBuilder(new 
StartupInjectorBuilder().build())
+        .addModule(
+            binder -> {
+              
binder.bindConstant().annotatedWith(Names.named("serviceName")).to("test");
+              
binder.bindConstant().annotatedWith(Names.named("servicePort")).to(8000);
+              
binder.bindConstant().annotatedWith(Names.named("tlsServicePort")).to(9000);
+            }
+        ).build();
+
+    ObjectMapper objectMapper = injector.getInstance(ObjectMapper.class);
+    module.getJacksonModules().forEach(objectMapper::registerModule);
+    return objectMapper;
+  }
 }
diff --git 
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisDataSourceMetadataTest.java
 
b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisDataSourceMetadataTest.java
index fbb3d6405f..8798153170 100644
--- 
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisDataSourceMetadataTest.java
+++ 
b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisDataSourceMetadataTest.java
@@ -20,10 +20,18 @@
 
 package org.apache.druid.indexing.kinesis;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
+import com.google.inject.Injector;
+import com.google.inject.name.Names;
+import org.apache.druid.guice.StartupInjectorBuilder;
+import org.apache.druid.indexing.overlord.DataSourceMetadata;
 import 
org.apache.druid.indexing.seekablestream.SeekableStreamEndSequenceNumbers;
 import 
org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers;
+import org.apache.druid.initialization.CoreInjectorBuilder;
+import org.apache.druid.initialization.DruidModule;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -217,6 +225,37 @@ public class KinesisDataSourceMetadataTest
     );
   }
 
+  @Test
+  public void testKinesisDataSourceMetadataSerdeRoundTrip() throws 
JsonProcessingException
+  {
+    ObjectMapper jsonMapper = createObjectMapper();
+
+    KinesisDataSourceMetadata kdm1 = startMetadata(ImmutableMap.of(), 
ImmutableSet.of());
+    String kdmStr1 = jsonMapper.writeValueAsString(kdm1);
+    DataSourceMetadata dsMeta1 = jsonMapper.readValue(kdmStr1, 
DataSourceMetadata.class);
+    Assert.assertEquals(kdm1, dsMeta1);
+
+    KinesisDataSourceMetadata kdm2 = startMetadata(ImmutableMap.of("1", "3"), 
ImmutableSet.of());
+    String kdmStr2 = jsonMapper.writeValueAsString(kdm2);
+    DataSourceMetadata dsMeta2 = jsonMapper.readValue(kdmStr2, 
DataSourceMetadata.class);
+    Assert.assertEquals(kdm2, dsMeta2);
+  }
+
+  @Test
+  public void testKinesisDataSourceMetadataSerde() throws 
JsonProcessingException
+  {
+    ObjectMapper jsonMapper = createObjectMapper();
+    KinesisDataSourceMetadata expectedKdm1 = endMetadata(ImmutableMap.of("1", 
"5"));
+    String kdmStr1 = 
"{\"type\":\"kinesis\",\"partitions\":{\"type\":\"end\",\"stream\":\"foo\",\"topic\":\"foo\",\"partitionSequenceNumberMap\":{\"1\":5},\"partitionOffsetMap\":{\"1\":5},\"exclusivePartitions\":[]}}\n";
+    DataSourceMetadata dsMeta1 = jsonMapper.readValue(kdmStr1, 
DataSourceMetadata.class);
+    Assert.assertEquals(dsMeta1, expectedKdm1);
+
+    KinesisDataSourceMetadata expectedKdm2 = endMetadata(ImmutableMap.of("1", 
"10", "2", "19"));
+    String kdmStr2 = 
"{\"type\":\"kinesis\",\"partitions\":{\"type\":\"end\",\"stream\":\"foo\",\"topic\":\"food\",\"partitionSequenceNumberMap\":{\"1\":10,
 \"2\":19},\"partitionOffsetMap\":{\"1\":10, 
\"2\":19},\"exclusivePartitions\":[]}}\n";
+    DataSourceMetadata dsMeta2 = jsonMapper.readValue(kdmStr2, 
DataSourceMetadata.class);
+    Assert.assertEquals(dsMeta2, expectedKdm2);
+  }
+
   private static KinesisDataSourceMetadata simpleStartMetadata(Map<String, 
String> sequences)
   {
     return startMetadata(sequences, sequences.keySet());
@@ -233,4 +272,20 @@ public class KinesisDataSourceMetadataTest
   {
     return new KinesisDataSourceMetadata(new 
SeekableStreamEndSequenceNumbers<>("foo", sequences));
   }
+
+  private static ObjectMapper createObjectMapper()
+  {
+    DruidModule module = new KinesisIndexingServiceModule();
+    final Injector injector = new CoreInjectorBuilder(new 
StartupInjectorBuilder().build())
+        .addModule(
+            binder -> {
+              
binder.bindConstant().annotatedWith(Names.named("serviceName")).to("test");
+              
binder.bindConstant().annotatedWith(Names.named("servicePort")).to(8000);
+              
binder.bindConstant().annotatedWith(Names.named("tlsServicePort")).to(9000);
+            }
+        ).build();
+    ObjectMapper objectMapper = injector.getInstance(ObjectMapper.class);
+    module.getJacksonModules().forEach(objectMapper::registerModule);
+    return objectMapper;
+  }
 }
diff --git 
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
 
b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
index 127683f8e4..39d943dbeb 100644
--- 
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
+++ 
b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
@@ -2636,7 +2636,6 @@ public class KinesisSupervisorTest extends EasyMockSupport
 
     supervisor.resetInternal(null);
     verifyAll();
-
   }
 
   @Test
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java
index f582e9e9ae..2cd926bae9 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java
@@ -201,7 +201,7 @@ public class SupervisorManager
     return supervisor == null ? Optional.absent() : 
Optional.fromNullable(supervisor.lhs.isHealthy());
   }
 
-  public boolean resetSupervisor(String id, @Nullable DataSourceMetadata 
dataSourceMetadata)
+  public boolean resetSupervisor(String id, @Nullable DataSourceMetadata 
resetDataSourceMetadata)
   {
     Preconditions.checkState(started, "SupervisorManager not started");
     Preconditions.checkNotNull(id, "id");
@@ -212,7 +212,11 @@ public class SupervisorManager
       return false;
     }
 
-    supervisor.lhs.reset(dataSourceMetadata);
+    if (resetDataSourceMetadata == null) {
+      supervisor.lhs.reset(null);
+    } else {
+      supervisor.lhs.resetOffsets(resetDataSourceMetadata);
+    }
     SupervisorTaskAutoScaler autoscaler = autoscalers.get(id);
     if (autoscaler != null) {
       autoscaler.reset();
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java
index 4eaf5756c1..604c4073f3 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java
@@ -30,6 +30,7 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import com.google.inject.Inject;
 import com.sun.jersey.spi.container.ResourceFilters;
+import org.apache.druid.indexing.overlord.DataSourceMetadata;
 import org.apache.druid.indexing.overlord.TaskMaster;
 import 
org.apache.druid.indexing.overlord.http.security.SupervisorResourceFilter;
 import org.apache.druid.java.util.common.StringUtils;
@@ -45,6 +46,7 @@ import org.apache.druid.server.security.Resource;
 import org.apache.druid.server.security.ResourceAction;
 import org.apache.druid.server.security.ResourceType;
 
+import javax.annotation.Nullable;
 import javax.servlet.http.HttpServletRequest;
 import javax.ws.rs.Consumes;
 import javax.ws.rs.GET;
@@ -493,10 +495,31 @@ public class SupervisorResource
   @Produces(MediaType.APPLICATION_JSON)
   @ResourceFilters(SupervisorResourceFilter.class)
   public Response reset(@PathParam("id") final String id)
+  {
+    return handleResetRequest(id, null);
+  }
+
+  @POST
+  @Path("/{id}/resetOffsets")
+  @Produces(MediaType.APPLICATION_JSON)
+  @Consumes(MediaType.APPLICATION_JSON)
+  @ResourceFilters(SupervisorResourceFilter.class)
+  public Response resetOffsets(
+      @PathParam("id") final String id,
+      final DataSourceMetadata resetDataSourceMetadata
+  )
+  {
+    return handleResetRequest(id, resetDataSourceMetadata);
+  }
+
+  private Response handleResetRequest(
+      final String id,
+      @Nullable final DataSourceMetadata resetDataSourceMetadata
+  )
   {
     return asLeaderWithSupervisorManager(
         manager -> {
-          if (manager.resetSupervisor(id, null)) {
+          if (manager.resetSupervisor(id, resetDataSourceMetadata)) {
             return Response.ok(ImmutableMap.of("id", id)).build();
           } else {
             return Response.status(Response.Status.NOT_FOUND)
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
index f8b41fce61..29fd16d1a4 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
@@ -42,6 +42,8 @@ import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
 import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.druid.common.guava.FutureUtils;
 import org.apache.druid.data.input.impl.ByteEntity;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.error.InvalidInput;
 import org.apache.druid.indexer.TaskLocation;
 import org.apache.druid.indexer.TaskState;
 import org.apache.druid.indexer.TaskStatus;
@@ -61,6 +63,7 @@ import 
org.apache.druid.indexing.overlord.supervisor.SupervisorReport;
 import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManager;
 import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
 import 
org.apache.druid.indexing.seekablestream.SeekableStreamDataSourceMetadata;
+import 
org.apache.druid.indexing.seekablestream.SeekableStreamEndSequenceNumbers;
 import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask;
 import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskClient;
 import 
org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskClientAsyncImpl;
@@ -95,6 +98,7 @@ import 
org.apache.druid.segment.incremental.RowIngestionMetersFactory;
 import org.apache.druid.segment.indexing.DataSchema;
 import org.joda.time.DateTime;
 
+import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import javax.validation.constraints.NotNull;
 import java.io.IOException;
@@ -608,6 +612,31 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
     }
   }
 
+  private class ResetOffsetsNotice implements Notice
+  {
+    final DataSourceMetadata dataSourceMetadata;
+    private static final String TYPE = "reset_offsets_notice";
+
+    ResetOffsetsNotice(
+        final DataSourceMetadata dataSourceMetadata
+    )
+    {
+      this.dataSourceMetadata = dataSourceMetadata;
+    }
+
+    @Override
+    public void handle()
+    {
+      resetOffsetsInternal(dataSourceMetadata);
+    }
+
+    @Override
+    public String getType()
+    {
+      return TYPE;
+    }
+  }
+
   protected class CheckpointNotice implements Notice
   {
     private final int taskGroupId;
@@ -998,12 +1027,59 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
   }
 
   @Override
-  public void reset(DataSourceMetadata dataSourceMetadata)
+  public void reset(@Nullable final DataSourceMetadata dataSourceMetadata)
   {
-    log.info("Posting ResetNotice");
+    log.info("Posting ResetNotice with datasource metadata [%s]", 
dataSourceMetadata);
     addNotice(new ResetNotice(dataSourceMetadata));
   }
 
+  /**
+   * Reset offsets with provided dataSource metadata. Validates {@code 
resetDataSourceMetadata},
+   * creates a {@code ResetOffsetsNotice} with the metadata and adds it to the 
notice queue. The resulting stored offsets
+   * is a union of existing checkpointed offsets and provided offsets.
+   * @param resetDataSourceMetadata required datasource metadata with offsets 
to reset.
+   * @throws DruidException if any metadata attribute doesn't match the 
supervisor's.
+   */
+  @Override
+  public void resetOffsets(@Nonnull DataSourceMetadata resetDataSourceMetadata)
+  {
+    if (resetDataSourceMetadata == null) {
+      throw InvalidInput.exception("Reset dataSourceMetadata is required for 
resetOffsets.");
+    }
+
+    if (!checkSourceMetadataMatch(resetDataSourceMetadata)) {
+      throw InvalidInput.exception(
+          "Datasource metadata instance does not match required, found 
instance of [%s].",
+          resetDataSourceMetadata.getClass()
+      );
+    }
+    @SuppressWarnings("unchecked")
+    final SeekableStreamDataSourceMetadata<PartitionIdType, 
SequenceOffsetType> resetMetadata =
+        (SeekableStreamDataSourceMetadata<PartitionIdType, 
SequenceOffsetType>) resetDataSourceMetadata;
+
+    final SeekableStreamSequenceNumbers<PartitionIdType, SequenceOffsetType> 
streamSequenceNumbers = resetMetadata.getSeekableStreamSequenceNumbers();
+    if (!(streamSequenceNumbers instanceof SeekableStreamEndSequenceNumbers)) {
+      throw InvalidInput.exception(
+          "Provided datasourceMetadata[%s] is invalid. Sequence numbers can 
only be of type[%s], but found[%s].",
+          resetMetadata,
+          SeekableStreamEndSequenceNumbers.class.getSimpleName(),
+          streamSequenceNumbers.getClass().getSimpleName()
+      );
+    }
+
+    final String resetStream = streamSequenceNumbers.getStream();
+    if (!ioConfig.getStream().equals(resetStream)) {
+      throw InvalidInput.exception(
+          "Stream[%s] doesn't exist in the supervisor[%s]. Supervisor is 
consuming stream[%s].",
+          resetStream,
+          supervisorId,
+          ioConfig.getStream()
+      );
+    }
+    log.info("Posting ResetOffsetsNotice with reset dataSource metadata[%s]", 
resetDataSourceMetadata);
+    addNotice(new ResetOffsetsNotice(resetDataSourceMetadata));
+  }
+
   public ReentrantLock getRecordSupplierLock()
   {
     return recordSupplierLock;
@@ -1693,6 +1769,70 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
     }
   }
 
+  /**
+   * Reset offsets with the data source metadata. If checkpoints exist, the 
resulting stored offsets will be a union of
+   * existing checkpointed offsets and provided offsets; any checkpointed 
offsets not specified in the metadata will be
+   * preserved as-is. If checkpoints don't exist, the provided reset 
datasource metdadata will be inserted into
+   * the metadata storage. Once the offsets are reset, any active tasks 
serving the partition offsets will be restarted.
+   * @param dataSourceMetadata Required reset data source metdata. Assumed 
that the metadata is validated.
+   */
+  public void resetOffsetsInternal(@Nonnull final DataSourceMetadata 
dataSourceMetadata)
+  {
+    log.info("Reset offsets for dataSource[%s] with metadata[%s]", dataSource, 
dataSourceMetadata);
+
+    @SuppressWarnings("unchecked")
+    final SeekableStreamDataSourceMetadata<PartitionIdType, 
SequenceOffsetType> resetMetadata =
+        (SeekableStreamDataSourceMetadata<PartitionIdType, 
SequenceOffsetType>) dataSourceMetadata;
+
+    final boolean metadataUpdateSuccess;
+    final DataSourceMetadata metadata = 
indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(dataSource);
+    if (metadata == null) {
+      log.info("Checkpointed metadata in null for dataSource[%s] - inserting 
metadata[%s]", dataSource, resetMetadata);
+      metadataUpdateSuccess = 
indexerMetadataStorageCoordinator.insertDataSourceMetadata(dataSource, 
resetMetadata);
+    } else {
+      if (!checkSourceMetadataMatch(metadata)) {
+        throw InvalidInput.exception(
+            "Datasource metadata instance does not match required, found 
instance of [%s]",
+            metadata.getClass()
+        );
+      }
+      @SuppressWarnings("unchecked")
+      final SeekableStreamDataSourceMetadata<PartitionIdType, 
SequenceOffsetType> currentMetadata =
+          (SeekableStreamDataSourceMetadata<PartitionIdType, 
SequenceOffsetType>) metadata;
+      final DataSourceMetadata newMetadata = 
currentMetadata.plus(resetMetadata);
+      log.info("Current checkpointed metadata[%s], new metadata[%s] for 
dataSource[%s]", currentMetadata, newMetadata, dataSource);
+      try {
+        metadataUpdateSuccess = 
indexerMetadataStorageCoordinator.resetDataSourceMetadata(dataSource, 
newMetadata);
+      }
+      catch (IOException e) {
+        log.error("Reset offsets for dataSource[%s] with metadata[%s] failed 
[%s]", dataSource, newMetadata, e.getMessage());
+        throw new RuntimeException(e);
+      }
+    }
+
+    if (!metadataUpdateSuccess) {
+      throw new ISE("Unable to reset metadata[%s] for datasource[%s]", 
dataSource, dataSourceMetadata);
+    }
+
+    resetMetadata.getSeekableStreamSequenceNumbers()
+                 .getPartitionSequenceNumberMap()
+                 .keySet()
+                 .forEach(partition -> {
+                   final int groupId = getTaskGroupIdForPartition(partition);
+                   killTaskGroupForPartitions(
+                       ImmutableSet.of(partition),
+                       "DataSourceMetadata is updated while reset offsets is 
called"
+                   );
+                   activelyReadingTaskGroups.remove(groupId);
+                   // killTaskGroupForPartitions() cleans up partitionGroups.
+                   // Add the removed groups back.
+                   partitionGroups.computeIfAbsent(groupId, k -> new 
HashSet<>());
+                   partitionOffsets.put(partition, getNotSetMarker());
+                 });
+
+  }
+
+
   private void killTask(final String id, String reasonFormat, Object... args)
   {
     Optional<TaskQueue> taskQueue = taskMaster.getTaskQueue();
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java
index 822e035984..ba4b963e9b 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java
@@ -22,7 +22,10 @@ package org.apache.druid.indexing.overlord.supervisor;
 import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
 import org.apache.druid.indexing.overlord.DataSourceMetadata;
+import 
org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers;
+import 
org.apache.druid.indexing.seekablestream.TestSeekableStreamDataSourceMetadata;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.metadata.MetadataSupervisorManager;
 import org.easymock.Capture;
@@ -327,6 +330,33 @@ public class SupervisorManagerTest extends EasyMockSupport
     verifyAll();
   }
 
+  @Test
+  public void testResetSupervisorWithSpecificOffsets()
+  {
+    Map<String, SupervisorSpec> existingSpecs = ImmutableMap.of(
+        "id1", new TestSupervisorSpec("id1", supervisor1)
+    );
+
+    DataSourceMetadata datasourceMetadata = new 
TestSeekableStreamDataSourceMetadata(
+        new SeekableStreamStartSequenceNumbers<>(
+            "topic",
+            ImmutableMap.of("0", "10", "1", "20", "2", "30"),
+            ImmutableSet.of()
+        )
+    );
+
+    
EasyMock.expect(metadataSupervisorManager.getLatest()).andReturn(existingSpecs);
+    supervisor1.start();
+    supervisor1.resetOffsets(datasourceMetadata);
+    replayAll();
+
+    manager.start();
+    Assert.assertTrue("resetValidSupervisor", manager.resetSupervisor("id1", 
datasourceMetadata));
+    Assert.assertFalse("resetInvalidSupervisor", 
manager.resetSupervisor("nobody_home", datasourceMetadata));
+
+    verifyAll();
+  }
+
   @Test
   public void testCreateSuspendResumeAndStopSupervisor()
   {
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java
index 60cd027c0d..f1793db633 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java
@@ -28,6 +28,8 @@ import com.google.common.collect.ImmutableSet;
 import org.apache.druid.indexing.overlord.DataSourceMetadata;
 import org.apache.druid.indexing.overlord.TaskMaster;
 import 
org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler;
+import 
org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers;
+import 
org.apache.druid.indexing.seekablestream.TestSeekableStreamDataSourceMetadata;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.segment.TestHelper;
 import org.apache.druid.server.security.Access;
@@ -1198,6 +1200,53 @@ public class SupervisorResourceTest extends 
EasyMockSupport
     verifyAll();
   }
 
+  @Test
+  public void testResetOffsets()
+  {
+    Capture<String> id1 = Capture.newInstance();
+    Capture<String> id2 = Capture.newInstance();
+    
EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(supervisorManager)).times(2);
+    EasyMock.expect(supervisorManager.resetSupervisor(
+        EasyMock.capture(id1),
+        EasyMock.anyObject(DataSourceMetadata.class)
+    )).andReturn(true);
+    EasyMock.expect(supervisorManager.resetSupervisor(
+        EasyMock.capture(id2),
+        EasyMock.anyObject(DataSourceMetadata.class)
+    )).andReturn(false);
+    replayAll();
+
+    DataSourceMetadata datasourceMetadata = new 
TestSeekableStreamDataSourceMetadata(
+        new SeekableStreamStartSequenceNumbers<>(
+            "topic",
+            ImmutableMap.of("0", "10", "1", "20", "2", "30"),
+            ImmutableSet.of()
+        )
+    );
+
+    Response response = supervisorResource.resetOffsets("my-id", 
datasourceMetadata);
+
+    Assert.assertEquals(200, response.getStatus());
+    Assert.assertEquals(ImmutableMap.of("id", "my-id"), response.getEntity());
+
+    response = supervisorResource.resetOffsets("my-id-2", datasourceMetadata);
+
+    Assert.assertEquals(404, response.getStatus());
+    Assert.assertEquals("my-id", id1.getValue());
+    Assert.assertEquals("my-id-2", id2.getValue());
+    verifyAll();
+
+    resetAll();
+
+    
EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.absent());
+    replayAll();
+
+    response = supervisorResource.terminate("my-id");
+
+    Assert.assertEquals(503, response.getStatus());
+    verifyAll();
+  }
+
   @Test
   public void testNoopSupervisorSpecSerde() throws Exception
   {
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/TestSeekableStreamDataSourceMetadata.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/TestSeekableStreamDataSourceMetadata.java
new file mode 100644
index 0000000000..7d87427e45
--- /dev/null
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/TestSeekableStreamDataSourceMetadata.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.seekablestream;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.indexing.overlord.DataSourceMetadata;
+
+public class TestSeekableStreamDataSourceMetadata extends 
SeekableStreamDataSourceMetadata<String, String>
+{
+  @JsonCreator
+  public TestSeekableStreamDataSourceMetadata(
+      @JsonProperty("partitions") SeekableStreamSequenceNumbers<String, 
String> seekableStreamSequenceNumbers)
+  {
+    super(seekableStreamSequenceNumbers);
+  }
+
+  @Override
+  protected SeekableStreamDataSourceMetadata<String, String> 
createConcreteDataSourceMetaData(
+      SeekableStreamSequenceNumbers<String, String> 
seekableStreamSequenceNumbers
+  )
+  {
+    return new 
TestSeekableStreamDataSourceMetadata(seekableStreamSequenceNumbers);
+  }
+
+  @Override
+  public DataSourceMetadata asStartMetadata()
+  {
+    return null;
+  }
+}
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
index 46f86bacb4..7e8afdf6bc 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
@@ -32,6 +32,8 @@ import org.apache.druid.data.input.impl.DimensionsSpec;
 import org.apache.druid.data.input.impl.JsonInputFormat;
 import org.apache.druid.data.input.impl.StringDimensionSchema;
 import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.error.DruidExceptionMatcher;
 import org.apache.druid.indexing.common.TaskToolbox;
 import org.apache.druid.indexing.common.TestUtils;
 import org.apache.druid.indexing.common.task.Task;
@@ -56,6 +58,7 @@ import 
org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskIOConfig;
 import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner;
 import 
org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskTuningConfig;
 import 
org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers;
+import 
org.apache.druid.indexing.seekablestream.TestSeekableStreamDataSourceMetadata;
 import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber;
 import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
 import org.apache.druid.indexing.seekablestream.common.StreamException;
@@ -81,6 +84,7 @@ import 
org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
 import org.apache.druid.server.metrics.NoopServiceEmitter;
 import org.easymock.EasyMock;
 import org.easymock.EasyMockSupport;
+import org.hamcrest.MatcherAssert;
 import org.joda.time.DateTime;
 import org.joda.time.Duration;
 import org.joda.time.Period;
@@ -90,6 +94,7 @@ import org.junit.Test;
 
 import javax.annotation.Nullable;
 import java.io.File;
+import java.io.IOException;
 import java.math.BigInteger;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -171,8 +176,7 @@ public class SeekableStreamSupervisorStateTest extends 
EasyMockSupport
     taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class), 
EasyMock.anyObject(Executor.class));
     EasyMock.expectLastCall().times(0, 1);
 
-    EasyMock
-        
.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn(null).anyTimes();
+    
EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn(null).anyTimes();
     
EasyMock.expect(recordSupplier.getAssignment()).andReturn(ImmutableSet.of(SHARD0_PARTITION)).anyTimes();
     
EasyMock.expect(recordSupplier.getLatestSequenceNumber(EasyMock.anyObject())).andReturn("10").anyTimes();
   }
@@ -831,7 +835,6 @@ public class SeekableStreamSupervisorStateTest extends 
EasyMockSupport
         null
     );
 
-
     supervisor.start();
 
     Assert.assertTrue(supervisor.stateManager.isHealthy());
@@ -1039,6 +1042,523 @@ public class SeekableStreamSupervisorStateTest extends 
EasyMockSupport
     );
   }
 
+  @Test
+  public void testSupervisorResetAllWithCheckpoints() throws 
InterruptedException
+  {
+    EasyMock.expect(spec.isSuspended()).andReturn(false);
+    
EasyMock.expect(indexerMetadataStorageCoordinator.deleteDataSourceMetadata(DATASOURCE)).andReturn(
+        true
+    );
+    taskQueue.shutdown("task1", "DataSourceMetadata is not found while reset");
+    EasyMock.expectLastCall();
+    replayAll();
+
+    final TestSeekableStreamSupervisor supervisor = new 
TestSeekableStreamSupervisor();
+
+    supervisor.start();
+    supervisor.addTaskGroupToActivelyReadingTaskGroup(
+        supervisor.getTaskGroupIdForPartition("0"),
+        ImmutableMap.of("0", "5"),
+        Optional.absent(),
+        Optional.absent(),
+        ImmutableSet.of("task1"),
+        ImmutableSet.of()
+    );
+
+    supervisor.addTaskGroupToPendingCompletionTaskGroup(
+        supervisor.getTaskGroupIdForPartition("1"),
+        ImmutableMap.of("1", "6"),
+        Optional.absent(),
+        Optional.absent(),
+        ImmutableSet.of("task2"),
+        ImmutableSet.of()
+    );
+
+    Assert.assertEquals(1, supervisor.getActiveTaskGroupsCount());
+    Assert.assertEquals(0, supervisor.getNoticesQueueSize());
+    Assert.assertEquals(0, supervisor.getPartitionOffsets().size());
+
+    supervisor.reset(null);
+    validateSupervisorStateAfterResetOffsets(supervisor, ImmutableMap.of(), 0);
+  }
+
+  @Test
+  public void testSupervisorResetOneTaskSpecificOffsetsWithCheckpoints() 
throws InterruptedException, IOException
+  {
+    final ImmutableMap<String, String> checkpointOffsets = 
ImmutableMap.of("0", "0", "1", "10", "2", "20", "3", "30");
+    final ImmutableMap<String, String> resetOffsets = ImmutableMap.of("0", 
"1000", "2", "2500");
+    final ImmutableMap<String, String> expectedOffsets = ImmutableMap.of("0", 
"1000", "1", "10", "2", "2500", "3", "30");
+
+    EasyMock.expect(spec.isSuspended()).andReturn(false);
+    EasyMock.reset(indexerMetadataStorageCoordinator);
+    
EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn(
+        new TestSeekableStreamDataSourceMetadata(
+            new SeekableStreamEndSequenceNumbers<>(
+                STREAM,
+                checkpointOffsets
+            )
+        )
+    );
+    
EasyMock.expect(indexerMetadataStorageCoordinator.resetDataSourceMetadata(DATASOURCE,
 new TestSeekableStreamDataSourceMetadata(
+        new SeekableStreamEndSequenceNumbers<>(
+            STREAM,
+            expectedOffsets
+        ))
+    )).andReturn(
+        true
+    );
+
+    taskQueue.shutdown("task1", "DataSourceMetadata is updated while reset 
offsets is called");
+    EasyMock.expectLastCall();
+
+    replayAll();
+
+    final TestSeekableStreamSupervisor supervisor = new 
TestSeekableStreamSupervisor();
+
+    supervisor.start();
+    supervisor.addTaskGroupToActivelyReadingTaskGroup(
+        supervisor.getTaskGroupIdForPartition("0"),
+        checkpointOffsets,
+        Optional.absent(),
+        Optional.absent(),
+        ImmutableSet.of("task1"),
+        ImmutableSet.of()
+    );
+
+    final DataSourceMetadata resetMetadata = new 
TestSeekableStreamDataSourceMetadata(
+        new SeekableStreamEndSequenceNumbers<>(
+            STREAM,
+            resetOffsets
+        )
+    );
+
+    Assert.assertEquals(1, supervisor.getActiveTaskGroupsCount());
+    Assert.assertEquals(0, supervisor.getNoticesQueueSize());
+    Assert.assertEquals(0, supervisor.getPartitionOffsets().size());
+
+    supervisor.resetOffsets(resetMetadata);
+
+    validateSupervisorStateAfterResetOffsets(supervisor, resetOffsets, 0);
+  }
+
+  @Test
+  public void testSupervisorResetSpecificOffsetsTasksWithCheckpoints() throws 
InterruptedException, IOException
+  {
+    final ImmutableMap<String, String> checkpointOffsets = 
ImmutableMap.of("0", "5", "1", "6", "2", "100");
+    final ImmutableMap<String, String> resetOffsets = ImmutableMap.of("0", 
"10", "1", "8");
+    final ImmutableMap<String, String> expectedOffsets = ImmutableMap.of("0", 
"10", "1", "8", "2", "100");
+
+    EasyMock.expect(spec.isSuspended()).andReturn(false);
+    EasyMock.reset(indexerMetadataStorageCoordinator);
+    
EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn(
+        new TestSeekableStreamDataSourceMetadata(
+            new SeekableStreamEndSequenceNumbers<>(
+                STREAM,
+                checkpointOffsets
+            )
+        )
+    );
+    
EasyMock.expect(indexerMetadataStorageCoordinator.resetDataSourceMetadata(DATASOURCE,
 new TestSeekableStreamDataSourceMetadata(
+        new SeekableStreamEndSequenceNumbers<>(
+            "stream",
+            expectedOffsets
+        )
+    ))).andReturn(true);
+    taskQueue.shutdown("task1", "DataSourceMetadata is updated while reset 
offsets is called");
+    EasyMock.expectLastCall();
+
+    taskQueue.shutdown("task2", "DataSourceMetadata is updated while reset 
offsets is called");
+    EasyMock.expectLastCall();
+
+    replayAll();
+
+    final TestSeekableStreamSupervisor supervisor = new 
TestSeekableStreamSupervisor();
+
+    // Spin off two active tasks with each task serving one partition.
+    supervisor.getIoConfig().setTaskCount(3);
+    supervisor.start();
+    supervisor.addTaskGroupToActivelyReadingTaskGroup(
+        supervisor.getTaskGroupIdForPartition("0"),
+        ImmutableMap.of("0", "5"),
+        Optional.absent(),
+        Optional.absent(),
+        ImmutableSet.of("task1"),
+        ImmutableSet.of()
+    );
+
+    supervisor.addTaskGroupToActivelyReadingTaskGroup(
+        supervisor.getTaskGroupIdForPartition("1"),
+        ImmutableMap.of("1", "6"),
+        Optional.absent(),
+        Optional.absent(),
+        ImmutableSet.of("task2"),
+        ImmutableSet.of()
+    );
+
+    supervisor.addTaskGroupToActivelyReadingTaskGroup(
+        supervisor.getTaskGroupIdForPartition("2"),
+        ImmutableMap.of("2", "100"),
+        Optional.absent(),
+        Optional.absent(),
+        ImmutableSet.of("task3"),
+        ImmutableSet.of()
+    );
+
+    final DataSourceMetadata resetMetadata = new 
TestSeekableStreamDataSourceMetadata(
+        new SeekableStreamEndSequenceNumbers<>(
+            STREAM,
+            resetOffsets
+        )
+    );
+
+    Assert.assertEquals(3, supervisor.getActiveTaskGroupsCount());
+    Assert.assertEquals(0, supervisor.getNoticesQueueSize());
+    Assert.assertEquals(0, supervisor.getPartitionOffsets().size());
+
+    supervisor.resetOffsets(resetMetadata);
+
+    validateSupervisorStateAfterResetOffsets(supervisor, resetOffsets, 1);
+  }
+
+  @Test
+  public void testSupervisorResetOffsetsWithNoCheckpoints() throws 
InterruptedException
+  {
+    final ImmutableMap<String, String> resetOffsets = ImmutableMap.of("0", 
"10", "1", "8");
+    final ImmutableMap<String, String> expectedOffsets = 
ImmutableMap.copyOf(resetOffsets);
+
+    EasyMock.expect(spec.isSuspended()).andReturn(false);
+    EasyMock.reset(indexerMetadataStorageCoordinator);
+    
EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn(null);
+    
EasyMock.expect(indexerMetadataStorageCoordinator.insertDataSourceMetadata(DATASOURCE,
 new TestSeekableStreamDataSourceMetadata(
+        new SeekableStreamEndSequenceNumbers<>(
+            "stream",
+            expectedOffsets
+        )
+    ))).andReturn(true);
+    taskQueue.shutdown("task1", "DataSourceMetadata is updated while reset 
offsets is called");
+    EasyMock.expectLastCall();
+
+    taskQueue.shutdown("task2", "DataSourceMetadata is updated while reset 
offsets is called");
+    EasyMock.expectLastCall();
+
+    replayAll();
+
+    final TestSeekableStreamSupervisor supervisor = new 
TestSeekableStreamSupervisor();
+
+    // Spin off three active tasks with each task serving one partition.
+    supervisor.getIoConfig().setTaskCount(3);
+    supervisor.start();
+    supervisor.addTaskGroupToActivelyReadingTaskGroup(
+        supervisor.getTaskGroupIdForPartition("0"),
+        ImmutableMap.of("0", "5"),
+        Optional.absent(),
+        Optional.absent(),
+        ImmutableSet.of("task1"),
+        ImmutableSet.of()
+    );
+
+    supervisor.addTaskGroupToActivelyReadingTaskGroup(
+        supervisor.getTaskGroupIdForPartition("1"),
+        ImmutableMap.of("1", "6"),
+        Optional.absent(),
+        Optional.absent(),
+        ImmutableSet.of("task2"),
+        ImmutableSet.of()
+    );
+
+    supervisor.addTaskGroupToActivelyReadingTaskGroup(
+        supervisor.getTaskGroupIdForPartition("2"),
+        ImmutableMap.of("2", "100"),
+        Optional.absent(),
+        Optional.absent(),
+        ImmutableSet.of("task3"),
+        ImmutableSet.of()
+    );
+
+    final DataSourceMetadata resetMetadata = new 
TestSeekableStreamDataSourceMetadata(
+        new SeekableStreamEndSequenceNumbers<>(
+            STREAM,
+            resetOffsets
+        )
+    );
+
+    Assert.assertEquals(3, supervisor.getActiveTaskGroupsCount());
+    Assert.assertEquals(0, supervisor.getNoticesQueueSize());
+    Assert.assertEquals(0, supervisor.getPartitionOffsets().size());
+
+    supervisor.resetOffsets(resetMetadata);
+
+    validateSupervisorStateAfterResetOffsets(supervisor, resetOffsets, 1);
+  }
+
+
+  @Test
+  public void testSupervisorResetWithNoPartitions() throws IOException, 
InterruptedException
+  {
+    final ImmutableMap<String, String> checkpointOffsets = 
ImmutableMap.of("0", "5", "1", "6");
+    final ImmutableMap<String, String> resetOffsets = ImmutableMap.of();
+    final ImmutableMap<String, String> expectedOffsets = ImmutableMap.of("0", 
"5", "1", "6");
+
+    EasyMock.expect(spec.isSuspended()).andReturn(false);
+    EasyMock.reset(indexerMetadataStorageCoordinator);
+    
EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn(
+        new TestSeekableStreamDataSourceMetadata(
+            new SeekableStreamEndSequenceNumbers<>(
+                STREAM,
+                checkpointOffsets
+            )
+        )
+    );
+    
EasyMock.expect(indexerMetadataStorageCoordinator.resetDataSourceMetadata(DATASOURCE,
 new TestSeekableStreamDataSourceMetadata(
+        new SeekableStreamEndSequenceNumbers<>(
+            "stream",
+            expectedOffsets
+        )
+    ))).andReturn(true);
+
+    replayAll();
+
+    final TestSeekableStreamSupervisor supervisor = new 
TestSeekableStreamSupervisor();
+
+    // Spin off two active tasks with each task serving one partition.
+    supervisor.getIoConfig().setTaskCount(2);
+    supervisor.start();
+    supervisor.addTaskGroupToActivelyReadingTaskGroup(
+        supervisor.getTaskGroupIdForPartition("0"),
+        ImmutableMap.of("0", "5"),
+        Optional.absent(),
+        Optional.absent(),
+        ImmutableSet.of("task1"),
+        ImmutableSet.of()
+    );
+
+    supervisor.addTaskGroupToActivelyReadingTaskGroup(
+        supervisor.getTaskGroupIdForPartition("1"),
+        ImmutableMap.of("1", "6"),
+        Optional.absent(),
+        Optional.absent(),
+        ImmutableSet.of("task2"),
+        ImmutableSet.of()
+    );
+
+    final DataSourceMetadata resetMetadata = new 
TestSeekableStreamDataSourceMetadata(
+        new SeekableStreamEndSequenceNumbers<>(
+            STREAM,
+            resetOffsets
+        )
+    );
+
+    Assert.assertEquals(2, supervisor.getActiveTaskGroupsCount());
+    Assert.assertEquals(0, supervisor.getNoticesQueueSize());
+    Assert.assertEquals(0, supervisor.getPartitionOffsets().size());
+
+    supervisor.resetOffsets(resetMetadata);
+
+    validateSupervisorStateAfterResetOffsets(supervisor, resetOffsets, 2);
+  }
+
+  @Test
+  public void testSupervisorResetWithNewPartition() throws IOException, 
InterruptedException
+  {
+    final ImmutableMap<String, String> checkpointOffsets = 
ImmutableMap.of("0", "5", "1", "6");
+    final ImmutableMap<String, String> resetOffsets = ImmutableMap.of("2", 
"20");
+    final ImmutableMap<String, String> expectedOffsets = ImmutableMap.of("0", 
"5", "1", "6", "2", "20");
+
+    EasyMock.expect(spec.isSuspended()).andReturn(false);
+    EasyMock.reset(indexerMetadataStorageCoordinator);
+    
EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn(
+        new TestSeekableStreamDataSourceMetadata(
+            new SeekableStreamEndSequenceNumbers<>(
+                STREAM,
+                checkpointOffsets
+            )
+        )
+    );
+    
EasyMock.expect(indexerMetadataStorageCoordinator.resetDataSourceMetadata(DATASOURCE,
 new TestSeekableStreamDataSourceMetadata(
+        new SeekableStreamEndSequenceNumbers<>(
+            "stream",
+            expectedOffsets
+        )
+    ))).andReturn(true);
+    taskQueue.shutdown("task1", "DataSourceMetadata is updated while reset 
offsets is called");
+    EasyMock.expectLastCall();
+
+    replayAll();
+
+    final TestSeekableStreamSupervisor supervisor = new 
TestSeekableStreamSupervisor();
+
+    // Spin off two active tasks with each task serving one partition.
+    supervisor.getIoConfig().setTaskCount(2);
+    supervisor.start();
+    supervisor.addTaskGroupToActivelyReadingTaskGroup(
+        supervisor.getTaskGroupIdForPartition("0"),
+        ImmutableMap.of("0", "5"),
+        Optional.absent(),
+        Optional.absent(),
+        ImmutableSet.of("task1"),
+        ImmutableSet.of()
+    );
+
+    supervisor.addTaskGroupToActivelyReadingTaskGroup(
+        supervisor.getTaskGroupIdForPartition("1"),
+        ImmutableMap.of("1", "6"),
+        Optional.absent(),
+        Optional.absent(),
+        ImmutableSet.of("task2"),
+        ImmutableSet.of()
+    );
+
+    final DataSourceMetadata resetMetadata = new 
TestSeekableStreamDataSourceMetadata(
+        new SeekableStreamEndSequenceNumbers<>(
+            STREAM,
+            resetOffsets
+        )
+    );
+
+    Assert.assertEquals(2, supervisor.getActiveTaskGroupsCount());
+    Assert.assertEquals(0, supervisor.getNoticesQueueSize());
+    Assert.assertEquals(0, supervisor.getPartitionOffsets().size());
+
+    supervisor.resetOffsets(resetMetadata);
+
+    validateSupervisorStateAfterResetOffsets(supervisor, resetOffsets, 1);
+  }
+
+  @Test
+  public void testSupervisorNoResetDataSourceMetadata()
+  {
+    EasyMock.expect(spec.isSuspended()).andReturn(false);
+    replayAll();
+
+    final TestSeekableStreamSupervisor supervisor = new 
TestSeekableStreamSupervisor();
+
+    supervisor.start();
+    supervisor.addTaskGroupToActivelyReadingTaskGroup(
+        supervisor.getTaskGroupIdForPartition("0"),
+        ImmutableMap.of("0", "0"),
+        Optional.absent(),
+        Optional.absent(),
+        ImmutableSet.of("task1"),
+        ImmutableSet.of()
+    );
+
+    supervisor.addTaskGroupToPendingCompletionTaskGroup(
+        supervisor.getTaskGroupIdForPartition("1"),
+        ImmutableMap.of("1", "0"),
+        Optional.absent(),
+        Optional.absent(),
+        ImmutableSet.of("task2"),
+        ImmutableSet.of()
+    );
+
+    verifyAll();
+
+    MatcherAssert.assertThat(
+        Assert.assertThrows(DruidException.class, () ->
+            supervisor.resetOffsets(null)
+        ),
+        DruidExceptionMatcher.invalidInput().expectMessageIs(
+            "Reset dataSourceMetadata is required for resetOffsets."
+        )
+    );
+  }
+
+  @Test
+  public void testSupervisorResetWithInvalidStartSequenceMetadata()
+  {
+    EasyMock.expect(spec.isSuspended()).andReturn(false);
+    replayAll();
+
+    final TestSeekableStreamSupervisor supervisor = new 
TestSeekableStreamSupervisor();
+
+    supervisor.start();
+    supervisor.addTaskGroupToActivelyReadingTaskGroup(
+        supervisor.getTaskGroupIdForPartition("0"),
+        ImmutableMap.of("0", "0"),
+        Optional.absent(),
+        Optional.absent(),
+        ImmutableSet.of("task1"),
+        ImmutableSet.of()
+    );
+
+    supervisor.addTaskGroupToPendingCompletionTaskGroup(
+        supervisor.getTaskGroupIdForPartition("1"),
+        ImmutableMap.of("1", "0"),
+        Optional.absent(),
+        Optional.absent(),
+        ImmutableSet.of("task2"),
+        ImmutableSet.of()
+    );
+
+    verifyAll();
+
+    final DataSourceMetadata dataSourceMetadata = new 
TestSeekableStreamDataSourceMetadata(
+        new SeekableStreamStartSequenceNumbers<>(
+            "i-am-not-real",
+            ImmutableMap.of("0", "10", "1", "20", "2", "30"),
+            ImmutableSet.of()
+        )
+    );
+
+    MatcherAssert.assertThat(
+        Assert.assertThrows(DruidException.class, () ->
+            supervisor.resetOffsets(dataSourceMetadata)
+        ),
+        DruidExceptionMatcher.invalidInput().expectMessageIs(
+            StringUtils.format(
+                "Provided datasourceMetadata[%s] is invalid. Sequence numbers 
can only be of type[SeekableStreamEndSequenceNumbers], but 
found[SeekableStreamStartSequenceNumbers].",
+                dataSourceMetadata
+            )
+        )
+    );
+  }
+
+  @Test
+  public void testSupervisorResetInvalidStream()
+  {
+    EasyMock.expect(spec.isSuspended()).andReturn(false);
+    replayAll();
+
+    final TestSeekableStreamSupervisor supervisor = new 
TestSeekableStreamSupervisor();
+
+    supervisor.start();
+    supervisor.addTaskGroupToActivelyReadingTaskGroup(
+        supervisor.getTaskGroupIdForPartition("0"),
+        ImmutableMap.of("0", "0"),
+        Optional.absent(),
+        Optional.absent(),
+        ImmutableSet.of("task1"),
+        ImmutableSet.of()
+    );
+
+    supervisor.addTaskGroupToPendingCompletionTaskGroup(
+        supervisor.getTaskGroupIdForPartition("1"),
+        ImmutableMap.of("1", "0"),
+        Optional.absent(),
+        Optional.absent(),
+        ImmutableSet.of("task2"),
+        ImmutableSet.of()
+    );
+
+    verifyAll();
+
+    final DataSourceMetadata dataSourceMetadata = new 
TestSeekableStreamDataSourceMetadata(
+        new SeekableStreamEndSequenceNumbers<>(
+            "i-am-not-real",
+            ImmutableMap.of("0", "10", "1", "20", "2", "30")
+        )
+    );
+
+    MatcherAssert.assertThat(
+        Assert.assertThrows(DruidException.class, () ->
+            supervisor.resetOffsets(dataSourceMetadata)
+        ),
+        DruidExceptionMatcher.invalidInput().expectMessageIs(
+            "Stream[i-am-not-real] doesn't exist in the 
supervisor[testSupervisorId]. Supervisor is consuming stream[stream]."
+        )
+    );
+  }
+
   @Test
   public void testStaleOffsetsNegativeLagNotEmitted() throws Exception
   {
@@ -1063,6 +1583,25 @@ public class SeekableStreamSupervisorStateTest extends 
EasyMockSupport
     Assert.assertEquals(0, emitter.getEvents().size());
   }
 
+  private void validateSupervisorStateAfterResetOffsets(
+      final TestSeekableStreamSupervisor supervisor,
+      final ImmutableMap<String, String> expectedResetOffsets,
+      final int expectedActiveTaskCount
+  ) throws InterruptedException
+  {
+    // Wait for the notice queue to be drained asynchronously before we 
validate the supervisor's final state.
+    while (supervisor.getNoticesQueueSize() > 0) {
+      Thread.sleep(100);
+    }
+    Thread.sleep(1000);
+    Assert.assertEquals(expectedActiveTaskCount, 
supervisor.getActiveTaskGroupsCount());
+    Assert.assertEquals(expectedResetOffsets.size(), 
supervisor.getPartitionOffsets().size());
+    for (Map.Entry<String, String> entry : expectedResetOffsets.entrySet()) {
+      Assert.assertEquals(supervisor.getNotSetMarker(), 
supervisor.getPartitionOffsets().get(entry.getKey()));
+    }
+    verifyAll();
+  }
+
   @Test
   public void testScheduleReporting()
   {
@@ -1419,7 +1958,12 @@ public class SeekableStreamSupervisorStateTest extends 
EasyMockSupport
     @Override
     protected int getTaskGroupIdForPartition(String partition)
     {
-      return 0;
+      try {
+        return Integer.parseInt(partition) % spec.getIoConfig().getTaskCount();
+      }
+      catch (NumberFormatException e) {
+        return 0;
+      }
     }
 
     @Override
diff --git 
a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java
 
b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java
index cedf4677e5..e733ef6c23 100644
--- 
a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java
+++ 
b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java
@@ -163,6 +163,11 @@ public class NoopSupervisorSpec implements SupervisorSpec
       {
       }
 
+      @Override
+      public void resetOffsets(DataSourceMetadata resetDataSourceMetadata)
+      {
+      }
+
       @Override
       public void checkpoint(int taskGroupId, DataSourceMetadata 
checkpointMetadata)
       {
diff --git 
a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java
 
b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java
index 10c48578a6..bcfc5ebe81 100644
--- 
a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java
+++ 
b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java
@@ -21,6 +21,7 @@ package org.apache.druid.indexing.overlord.supervisor;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
+import org.apache.druid.error.DruidException;
 import org.apache.druid.indexing.overlord.DataSourceMetadata;
 import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
 import org.apache.druid.segment.incremental.ParseExceptionReport;
@@ -61,8 +62,20 @@ public interface Supervisor
     return null; // default implementation for interface compatability; 
returning null since true or false is misleading
   }
 
+  /**
+   * Resets all offsets for a dataSource.
+   * @param dataSourceMetadata optional dataSource metadata.
+   */
   void reset(DataSourceMetadata dataSourceMetadata);
 
+  /**
+   * Reset offsets with provided dataSource metadata. The resulting stored 
offsets should be a union of existing checkpointed
+   * offsets with provided offsets.
+   * @param resetDataSourceMetadata required datasource metadata with offsets 
to reset.
+   * @throws DruidException if any metadata attribute doesn't match the 
supervisor's state.
+   */
+  void resetOffsets(DataSourceMetadata resetDataSourceMetadata);
+
   /**
    * The definition of checkpoint is not very strict as currently it does not 
affect data or control path.
    * On this call Supervisor can potentially checkpoint data processed so far 
to some durable storage
diff --git 
a/server/src/test/java/org/apache/druid/indexing/NoopSupervisorSpecTest.java 
b/server/src/test/java/org/apache/druid/indexing/NoopSupervisorSpecTest.java
index 742854b26d..a6e4b8fe09 100644
--- a/server/src/test/java/org/apache/druid/indexing/NoopSupervisorSpecTest.java
+++ b/server/src/test/java/org/apache/druid/indexing/NoopSupervisorSpecTest.java
@@ -19,8 +19,10 @@
 
 package org.apache.druid.indexing;
 
+import org.apache.druid.indexing.overlord.ObjectMetadata;
 import org.apache.druid.indexing.overlord.supervisor.NoopSupervisorSpec;
 import org.apache.druid.indexing.overlord.supervisor.Supervisor;
+import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManager;
 import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
 import 
org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler;
 import org.junit.Assert;
@@ -71,4 +73,20 @@ public class NoopSupervisorSpecTest
     NoopSupervisorSpec noopSupervisorSpec = new NoopSupervisorSpec(null, 
Collections.singletonList("datasource1"));
     Assert.assertTrue(noopSupervisorSpec.getInputSourceResources().isEmpty());
   }
+
+  @Test
+  public void testNoppSupervisorResetOffsetsDoNothing()
+  {
+    NoopSupervisorSpec expectedSpec = new NoopSupervisorSpec(null, null);
+    Supervisor noOpSupervisor = expectedSpec.createSupervisor();
+    Assert.assertEquals(-1, noOpSupervisor.getActiveTaskGroupsCount());
+    noOpSupervisor.resetOffsets(null);
+    Assert.assertEquals(-1, noOpSupervisor.getActiveTaskGroupsCount());
+    Assert.assertEquals(SupervisorStateManager.BasicState.RUNNING, 
noOpSupervisor.getState());
+
+    Assert.assertEquals(-1, noOpSupervisor.getActiveTaskGroupsCount());
+    noOpSupervisor.resetOffsets(new ObjectMetadata("someObject"));
+    Assert.assertEquals(-1, noOpSupervisor.getActiveTaskGroupsCount());
+    Assert.assertEquals(SupervisorStateManager.BasicState.RUNNING, 
noOpSupervisor.getState());
+  }
 }
diff --git a/website/.spelling b/website/.spelling
index 0f35993488..b34ad9c6e8 100644
--- a/website/.spelling
+++ b/website/.spelling
@@ -1098,6 +1098,7 @@ numProcessors
 q.size
 repartitionTransitionDuration
 replicastaskCounttaskCount
+resetOffsets
 resetuseEarliestSequenceNumberPOST
 resumePOST
 
statusrecentErrorsdruid.supervisor.maxStoredExceptionEventsstatedetailedStatestatedetailedStatestatestatePENDINGRUNNINGSUSPENDEDSTOPPINGUNHEALTHY_SUPERVISORUNHEALTHY_TASKSdetailedStatestatedruid.supervisor.unhealthinessThresholddruid.supervisor.taskUnhealthinessThresholdtaskDurationtaskCountreplicasdetailedStatedetailedStateRUNNINGPOST


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to