abhishekrb19 commented on code in PR #14772:
URL: https://github.com/apache/druid/pull/14772#discussion_r1291757329
##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -998,10 +1006,66 @@ public void stop(boolean stopGracefully)
}
@Override
- public void reset(DataSourceMetadata dataSourceMetadata)
+ public void reset(@Nullable final DataSourceMetadata dataSourceMetadata)
+ {
+ log.info("Posting ResetNotice with datasource metadata [%s]",
dataSourceMetadata);
+ addNotice(new ResetNotice(dataSourceMetadata, false));
+ }
+
+ /**
+ * Reset offsets with provided dataSource metadata. Validates {@code
resetDataSourceMetadata},
+ * creates a {@code ResetNotice} with the metadata and adds it to the notice
queue. The resulting stored offsets
+ * are a union of existing checkpointed offsets with provided offsets.
+ * @param resetDataSourceMetadata required datasource metadata with offsets
to reset.
+ * @throws DruidException if any metadata attribute doesn't match the
supervisor's.
+ */
+ @Override
+ public void resetOffsets(DataSourceMetadata resetDataSourceMetadata)
{
- log.info("Posting ResetNotice");
- addNotice(new ResetNotice(dataSourceMetadata));
+ if (resetDataSourceMetadata == null) {
+ throw InvalidInput.exception("Reset dataSourceMetadata is required for
resetOffsets.");
+ }
+
+ if (!checkSourceMetadataMatch(resetDataSourceMetadata)) {
+ throw InvalidInput.exception(
+ "Datasource metadata instance does not match required, found
instance of [%s].",
+ resetDataSourceMetadata.getClass()
+ );
+ }
+ @SuppressWarnings("unchecked")
+ final SeekableStreamDataSourceMetadata<PartitionIdType,
SequenceOffsetType> resetMetadata =
+ (SeekableStreamDataSourceMetadata<PartitionIdType,
SequenceOffsetType>) resetDataSourceMetadata;
+ String resetStream =
resetMetadata.getSeekableStreamSequenceNumbers().getStream();
+ if
(!resetMetadata.getSeekableStreamSequenceNumbers().getStream().equals(ioConfig.getStream()))
{
+ throw InvalidInput.exception(
+ "Stream[%s] doesn't exist in the supervisor[%s]. Supervisor is
consuming stream[%s].",
+ resetStream,
+ supervisorId,
+ ioConfig.getStream()
+ );
+ }
+ for (Entry<PartitionIdType, SequenceOffsetType> resetPartitionOffset :
resetMetadata
+ .getSeekableStreamSequenceNumbers()
+ .getPartitionSequenceNumberMap()
+ .entrySet()) {
+ final TaskGroup taskGroup =
activelyReadingTaskGroups.get(getTaskGroupIdForPartition(resetPartitionOffset.getKey()));
+ if (taskGroup == null) {
+ throw InvalidInput.exception("No task found serving stream[%s],
partition[%s] in supervisor[%s].",
+ resetStream,
resetPartitionOffset.getKey(), supervisorId);
+ } else if
(!taskGroup.startingSequences.containsKey(resetPartitionOffset.getKey())) {
+ throw InvalidInput.exception(
+ "Partition[%s] doesn't exist in checkpointed metadata for
stream[%s] and supervisor[%s]."
Review Comment:
Good point; I have removed this check.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]