jon-wei commented on code in PR #14772:
URL: https://github.com/apache/druid/pull/14772#discussion_r1287787896
##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -998,10 +1006,66 @@ public void stop(boolean stopGracefully)
}
@Override
- public void reset(DataSourceMetadata dataSourceMetadata)
+ public void reset(@Nullable final DataSourceMetadata dataSourceMetadata)
+ {
+ log.info("Posting ResetNotice with datasource metadata [%s]",
dataSourceMetadata);
+ addNotice(new ResetNotice(dataSourceMetadata, false));
+ }
+
+ /**
+ * Reset offsets with provided dataSource metadata. Validates {@code
resetDataSourceMetadata},
+ * creates a {@code ResetNotice} with the metadata and adds it to the notice
queue. The resulting stored offsets
+ * is a union of existing checkpointed offsets with provided offsets.
+ * @param resetDataSourceMetadata required datasource metadata with offsets
to reset.
+ * @throws DruidException if any metadata attribute doesn't match the
supervisor's.
+ */
+ @Override
+ public void resetOffsets(DataSourceMetadata resetDataSourceMetadata)
{
- log.info("Posting ResetNotice");
- addNotice(new ResetNotice(dataSourceMetadata));
+ if (resetDataSourceMetadata == null) {
+ throw InvalidInput.exception("Reset dataSourceMetadata is required for
resetOffsets.");
+ }
+
+ if (!checkSourceMetadataMatch(resetDataSourceMetadata)) {
+ throw InvalidInput.exception(
+ "Datasource metadata instance does not match required, found
instance of [%s].",
+ resetDataSourceMetadata.getClass()
+ );
+ }
+ @SuppressWarnings("unchecked")
+ final SeekableStreamDataSourceMetadata<PartitionIdType,
SequenceOffsetType> resetMetadata =
+ (SeekableStreamDataSourceMetadata<PartitionIdType,
SequenceOffsetType>) resetDataSourceMetadata;
+ String resetStream =
resetMetadata.getSeekableStreamSequenceNumbers().getStream();
+ if
(!resetMetadata.getSeekableStreamSequenceNumbers().getStream().equals(ioConfig.getStream()))
{
Review Comment:
nit: `resetStream` is already declared above and can be reused in this `if` condition instead of calling `getStream()` again.
##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -3614,7 +3692,8 @@ private OrderedSequenceNumber<SequenceOffsetType>
getOffsetFromStorageForPartiti
if (!checkOffsetAvailability(partition, sequence)) {
if (taskTuningConfig.isResetOffsetAutomatically()) {
resetInternal(
- createDataSourceMetaDataForReset(ioConfig.getStream(),
ImmutableMap.of(partition, sequence))
+ createDataSourceMetaDataForReset(ioConfig.getStream(),
ImmutableMap.of(partition, sequence)),
+ false
Review Comment:
what does the `false` parameter here mean?
##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -998,10 +1006,66 @@ public void stop(boolean stopGracefully)
}
@Override
- public void reset(DataSourceMetadata dataSourceMetadata)
+ public void reset(@Nullable final DataSourceMetadata dataSourceMetadata)
+ {
+ log.info("Posting ResetNotice with datasource metadata [%s]",
dataSourceMetadata);
+ addNotice(new ResetNotice(dataSourceMetadata, false));
+ }
+
+ /**
+ * Reset offsets with provided dataSource metadata. Validates {@code
resetDataSourceMetadata},
+ * creates a {@code ResetNotice} with the metadata and adds it to the notice
queue. The resulting stored offsets
+ * is a union of existing checkpointed offsets with provided offsets.
+ * @param resetDataSourceMetadata required datasource metadata with offsets
to reset.
+ * @throws DruidException if any metadata attribute doesn't match the
supervisor's.
+ */
+ @Override
+ public void resetOffsets(DataSourceMetadata resetDataSourceMetadata)
{
- log.info("Posting ResetNotice");
- addNotice(new ResetNotice(dataSourceMetadata));
+ if (resetDataSourceMetadata == null) {
+ throw InvalidInput.exception("Reset dataSourceMetadata is required for
resetOffsets.");
+ }
+
+ if (!checkSourceMetadataMatch(resetDataSourceMetadata)) {
+ throw InvalidInput.exception(
+ "Datasource metadata instance does not match required, found
instance of [%s].",
+ resetDataSourceMetadata.getClass()
+ );
+ }
+ @SuppressWarnings("unchecked")
+ final SeekableStreamDataSourceMetadata<PartitionIdType,
SequenceOffsetType> resetMetadata =
+ (SeekableStreamDataSourceMetadata<PartitionIdType,
SequenceOffsetType>) resetDataSourceMetadata;
+ String resetStream =
resetMetadata.getSeekableStreamSequenceNumbers().getStream();
+ if
(!resetMetadata.getSeekableStreamSequenceNumbers().getStream().equals(ioConfig.getStream()))
{
+ throw InvalidInput.exception(
+ "Stream[%s] doesn't exist in the supervisor[%s]. Supervisor is
consuming stream[%s].",
+ resetStream,
+ supervisorId,
+ ioConfig.getStream()
+ );
+ }
+ for (Entry<PartitionIdType, SequenceOffsetType> resetPartitionOffset :
resetMetadata
+ .getSeekableStreamSequenceNumbers()
+ .getPartitionSequenceNumberMap()
+ .entrySet()) {
+ final TaskGroup taskGroup =
activelyReadingTaskGroups.get(getTaskGroupIdForPartition(resetPartitionOffset.getKey()));
Review Comment:
If the supervisor is suspended, would there be any `activelyReadingTaskGroups`?
If there are none, would that block the reset?
##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -998,10 +1006,66 @@ public void stop(boolean stopGracefully)
}
@Override
- public void reset(DataSourceMetadata dataSourceMetadata)
+ public void reset(@Nullable final DataSourceMetadata dataSourceMetadata)
+ {
+ log.info("Posting ResetNotice with datasource metadata [%s]",
dataSourceMetadata);
+ addNotice(new ResetNotice(dataSourceMetadata, false));
+ }
+
+ /**
+ * Reset offsets with provided dataSource metadata. Validates {@code
resetDataSourceMetadata},
+ * creates a {@code ResetNotice} with the metadata and adds it to the notice
queue. The resulting stored offsets
+ * is a union of existing checkpointed offsets with provided offsets.
+ * @param resetDataSourceMetadata required datasource metadata with offsets
to reset.
+ * @throws DruidException if any metadata attribute doesn't match the
supervisor's.
+ */
+ @Override
+ public void resetOffsets(DataSourceMetadata resetDataSourceMetadata)
{
- log.info("Posting ResetNotice");
- addNotice(new ResetNotice(dataSourceMetadata));
+ if (resetDataSourceMetadata == null) {
+ throw InvalidInput.exception("Reset dataSourceMetadata is required for
resetOffsets.");
+ }
+
+ if (!checkSourceMetadataMatch(resetDataSourceMetadata)) {
+ throw InvalidInput.exception(
+ "Datasource metadata instance does not match required, found
instance of [%s].",
+ resetDataSourceMetadata.getClass()
+ );
+ }
+ @SuppressWarnings("unchecked")
+ final SeekableStreamDataSourceMetadata<PartitionIdType,
SequenceOffsetType> resetMetadata =
+ (SeekableStreamDataSourceMetadata<PartitionIdType,
SequenceOffsetType>) resetDataSourceMetadata;
+ String resetStream =
resetMetadata.getSeekableStreamSequenceNumbers().getStream();
+ if
(!resetMetadata.getSeekableStreamSequenceNumbers().getStream().equals(ioConfig.getStream()))
{
+ throw InvalidInput.exception(
+ "Stream[%s] doesn't exist in the supervisor[%s]. Supervisor is
consuming stream[%s].",
+ resetStream,
+ supervisorId,
+ ioConfig.getStream()
+ );
+ }
+ for (Entry<PartitionIdType, SequenceOffsetType> resetPartitionOffset :
resetMetadata
+ .getSeekableStreamSequenceNumbers()
+ .getPartitionSequenceNumberMap()
+ .entrySet()) {
+ final TaskGroup taskGroup =
activelyReadingTaskGroups.get(getTaskGroupIdForPartition(resetPartitionOffset.getKey()));
+ if (taskGroup == null) {
+ throw InvalidInput.exception("No task found serving stream[%s],
partition[%s] in supervisor[%s].",
+ resetStream,
resetPartitionOffset.getKey(), supervisorId);
+ } else if
(!taskGroup.startingSequences.containsKey(resetPartitionOffset.getKey())) {
+ throw InvalidInput.exception(
+ "Partition[%s] doesn't exist in checkpointed metadata for
stream[%s] and supervisor[%s]."
Review Comment:
Do we want to enforce that the partition must exist in the current
checkpoint? (e.g., maybe someone repartitioned their stream and then wants to
start from a specific offset in the newly added partitions)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]