kfaraz commented on code in PR #17137:
URL: https://github.com/apache/druid/pull/17137#discussion_r1772985860
##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -4170,7 +4167,7 @@ public SeekableStreamSupervisorIOConfig getIoConfig()
return ioConfig;
}
- @Override
+
public void checkpoint(int taskGroupId, DataSourceMetadata checkpointMetadata)
Review Comment:
Please add the javadocs originally present in the `Supervisor` interface.
##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java:
##########
@@ -445,4 +449,22 @@ private boolean createAndStartSupervisorInternal(SupervisorSpec spec, boolean pe
return true;
}
+
+ private SeekableStreamSupervisor requireSeekableStreamSupervisor(final String supervisorId, final String operation)
+ {
+ Pair<Supervisor, SupervisorSpec> supervisor = supervisors.get(supervisorId);
+ if (supervisor.lhs instanceof SeekableStreamSupervisor) {
+ // cast and return
Review Comment:
Nit: This comment is self-explanatory and can be removed.
##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java:
##########
@@ -445,4 +449,22 @@ private boolean createAndStartSupervisorInternal(SupervisorSpec spec, boolean pe
return true;
}
+
+ private SeekableStreamSupervisor requireSeekableStreamSupervisor(final String supervisorId, final String operation)
+ {
+ Pair<Supervisor, SupervisorSpec> supervisor = supervisors.get(supervisorId);
+ if (supervisor.lhs instanceof SeekableStreamSupervisor) {
+ // cast and return
+ return (SeekableStreamSupervisor) supervisor.lhs;
+ } else {
+ throw DruidException.forPersona(DruidException.Persona.USER)
+ .ofCategory(DruidException.Category.UNSUPPORTED)
+ .build(
+ "[%s] operation is not supported by the supervisor[%s] of type[%s].",
Review Comment:
```suggestion
"Operation[%s] is not supported by supervisor[%s] of type[%s].",
```
##########
server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java:
##########
@@ -63,44 +65,8 @@ default Boolean isHealthy()
}
/**
- * Resets all offsets for a dataSource.
+ * Resets any stored metadata by the supervisor.
* @param dataSourceMetadata optional dataSource metadata.
*/
- void reset(DataSourceMetadata dataSourceMetadata);
-
- /**
- * Reset offsets with provided dataSource metadata. The resulting stored offsets should be a union of existing checkpointed
- * offsets with provided offsets.
- * @param resetDataSourceMetadata required datasource metadata with offsets to reset.
- * @throws DruidException if any metadata attribute doesn't match the supervisor's state.
- */
- void resetOffsets(DataSourceMetadata resetDataSourceMetadata);
-
- /**
- * The definition of checkpoint is not very strict as currently it does not affect data or control path.
- * On this call Supervisor can potentially checkpoint data processed so far to some durable storage
- * for example - Kafka/Kinesis Supervisor uses this to merge and handoff segments containing at least the data
- * represented by {@param currentCheckpoint} DataSourceMetadata
- *
- * @param taskGroupId unique Identifier to figure out for which sequence to do checkpointing
- * @param checkpointMetadata metadata for the sequence to currently checkpoint
- */
- void checkpoint(int taskGroupId, DataSourceMetadata checkpointMetadata);
Review Comment:
Can we move the javadoc for this method and the others to `SeekableStreamSupervisor`?
##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java:
##########
@@ -445,4 +449,22 @@ private boolean createAndStartSupervisorInternal(SupervisorSpec spec, boolean pe
return true;
}
+
+ private SeekableStreamSupervisor requireSeekableStreamSupervisor(final String supervisorId, final String operation)
+ {
+ Pair<Supervisor, SupervisorSpec> supervisor = supervisors.get(supervisorId);
+ if (supervisor.lhs instanceof SeekableStreamSupervisor) {
+ // cast and return
+ return (SeekableStreamSupervisor) supervisor.lhs;
+ } else {
+ throw DruidException.forPersona(DruidException.Persona.USER)
+ .ofCategory(DruidException.Category.UNSUPPORTED)
+ .build(
+ "[%s] operation is not supported by the supervisor[%s] of type[%s].",
+ operation,
+ supervisorId,
+ supervisor.rhs.getType()
Review Comment:
Nit: These arguments can be on the same line.
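For reference, a minimal sketch of how the helper might read with the nits on this file applied (redundant comment dropped, message reworded as suggested, arguments on one line). This is not the PR's code: the nested `Supervisor`, `SeekableStreamSupervisor`, `OtherSupervisor` and `Entry` types are hypothetical stand-ins for the Druid classes, and `UnsupportedOperationException` with `String.format` stands in for the `DruidException` builder chain so that the snippet compiles on its own. Like the diff, it does not handle an unknown `supervisorId`.

```java
import java.util.HashMap;
import java.util.Map;

class SupervisorLookupSketch
{
  // Hypothetical stand-ins for the Druid types named in the diff.
  interface Supervisor {}
  static class SeekableStreamSupervisor implements Supervisor {}
  static class OtherSupervisor implements Supervisor {}

  // Stand-in for Pair<Supervisor, SupervisorSpec>; only the spec type is kept.
  static class Entry
  {
    final Supervisor lhs;
    final String type;

    Entry(Supervisor lhs, String type)
    {
      this.lhs = lhs;
      this.type = type;
    }
  }

  static final Map<String, Entry> supervisors = new HashMap<>();

  static SeekableStreamSupervisor requireSeekableStreamSupervisor(final String supervisorId, final String operation)
  {
    final Entry supervisor = supervisors.get(supervisorId);
    if (supervisor.lhs instanceof SeekableStreamSupervisor) {
      return (SeekableStreamSupervisor) supervisor.lhs;
    } else {
      // Assumption: a plain JDK exception replaces the DruidException builder here.
      throw new UnsupportedOperationException(
          String.format(
              "Operation[%s] is not supported by supervisor[%s] of type[%s].",
              operation, supervisorId, supervisor.type
          )
      );
    }
  }

  public static void main(String[] args)
  {
    // Illustrative ids and types only.
    supervisors.put("kafka_events", new Entry(new SeekableStreamSupervisor(), "kafka"));
    supervisors.put("compact_events", new Entry(new OtherSupervisor(), "autocompact"));

    System.out.println(requireSeekableStreamSupervisor("kafka_events", "reset") != null);
    try {
      requireSeekableStreamSupervisor("compact_events", "reset");
    }
    catch (UnsupportedOperationException e) {
      System.out.println(e.getMessage());
    }
  }
}
```

With the operation name first and every value in brackets, the three interpolated values are easy to pick out in logs.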