abhishekrb19 commented on code in PR #14772:
URL: https://github.com/apache/druid/pull/14772#discussion_r1291921303


##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -998,10 +1006,66 @@ public void stop(boolean stopGracefully)
   }
 
   @Override
-  public void reset(DataSourceMetadata dataSourceMetadata)
+  public void reset(@Nullable final DataSourceMetadata dataSourceMetadata)
+  {
+    log.info("Posting ResetNotice with datasource metadata [%s]", 
dataSourceMetadata);
+    addNotice(new ResetNotice(dataSourceMetadata, false));
+  }
+
+  /**
+   * Reset offsets with provided dataSource metadata. Validates {@code 
resetDataSourceMetadata},
+   * creates a {@code ResetNotice} with the metadata and adds it to the notice 
queue. The resulting stored offsets
+   * is a union of existing checkpointed offsets with provided offsets.
+   * @param resetDataSourceMetadata required datasource metadata with offsets 
to reset.
+   * @throws DruidException if any metadata attribute doesn't match the 
supervisor's.
+   */
+  @Override
+  public void resetOffsets(DataSourceMetadata resetDataSourceMetadata)
   {
-    log.info("Posting ResetNotice");
-    addNotice(new ResetNotice(dataSourceMetadata));
+    if (resetDataSourceMetadata == null) {
+      throw InvalidInput.exception("Reset dataSourceMetadata is required for 
resetOffsets.");
+    }
+
+    if (!checkSourceMetadataMatch(resetDataSourceMetadata)) {
+      throw InvalidInput.exception(
+          "Datasource metadata instance does not match required, found 
instance of [%s].",
+          resetDataSourceMetadata.getClass()
+      );
+    }
+    @SuppressWarnings("unchecked")
+    final SeekableStreamDataSourceMetadata<PartitionIdType, 
SequenceOffsetType> resetMetadata =
+        (SeekableStreamDataSourceMetadata<PartitionIdType, 
SequenceOffsetType>) resetDataSourceMetadata;
+    String resetStream = 
resetMetadata.getSeekableStreamSequenceNumbers().getStream();
+    if 
(!resetMetadata.getSeekableStreamSequenceNumbers().getStream().equals(ioConfig.getStream()))
 {
+      throw InvalidInput.exception(
+          "Stream[%s] doesn't exist in the supervisor[%s]. Supervisor is 
consuming stream[%s].",
+          resetStream,
+          supervisorId,
+          ioConfig.getStream()
+      );
+    }
+    for (Entry<PartitionIdType, SequenceOffsetType> resetPartitionOffset : 
resetMetadata
+        .getSeekableStreamSequenceNumbers()
+        .getPartitionSequenceNumberMap()
+        .entrySet()) {
+      final TaskGroup taskGroup = 
activelyReadingTaskGroups.get(getTaskGroupIdForPartition(resetPartitionOffset.getKey()));

Review Comment:
   Adjusted based on our discussions, thanks! A user can issue a reset offsets 
even when the supervisor is suspended



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to