kfaraz commented on code in PR #17137:
URL: https://github.com/apache/druid/pull/17137#discussion_r1772985860


##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -4170,7 +4167,7 @@ public SeekableStreamSupervisorIOConfig getIoConfig()
     return ioConfig;
   }
 
-  @Override
+
   public void checkpoint(int taskGroupId, DataSourceMetadata checkpointMetadata)

Review Comment:
   Please add the javadocs originally present in the `Supervisor` interface.



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java:
##########
@@ -445,4 +449,22 @@ private boolean createAndStartSupervisorInternal(SupervisorSpec spec, boolean pe
 
     return true;
   }
+
+  private SeekableStreamSupervisor requireSeekableStreamSupervisor(final String supervisorId, final String operation)
+  {
+    Pair<Supervisor, SupervisorSpec> supervisor = supervisors.get(supervisorId);
+    if (supervisor.lhs instanceof SeekableStreamSupervisor) {
+      // cast and return

Review Comment:
   Nit: The code is self-explanatory; this comment can be removed.



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java:
##########
@@ -445,4 +449,22 @@ private boolean createAndStartSupervisorInternal(SupervisorSpec spec, boolean pe
 
     return true;
   }
+
+  private SeekableStreamSupervisor requireSeekableStreamSupervisor(final String supervisorId, final String operation)
+  {
+    Pair<Supervisor, SupervisorSpec> supervisor = supervisors.get(supervisorId);
+    if (supervisor.lhs instanceof SeekableStreamSupervisor) {
+      // cast and return
+      return (SeekableStreamSupervisor) supervisor.lhs;
+    } else {
+      throw DruidException.forPersona(DruidException.Persona.USER)
+                          .ofCategory(DruidException.Category.UNSUPPORTED)
+                          .build(
+                              "[%s] operation is not supported by the supervisor[%s] of type[%s].",

Review Comment:
   ```suggestion
                                 "Operation[%s] is not supported by supervisor[%s] of type[%s].",
   ```
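
   To see how the suggested wording reads once the placeholders are filled in, here is a minimal standalone sketch. It assumes the `%s` placeholders behave as they do in `String.format`; the class name, the helper `build`, and the sample values (`checkpoint`, `my_supervisor`, `autocompact`) are hypothetical and are not part of this PR.

   ```java
   import java.util.Locale;

   public class UnsupportedOperationMessage
   {
     // Hypothetical stand-in: plain String.format is used here instead of
     // DruidException's builder, only to preview the rendered message.
     static String build(String operation, String supervisorId, String type)
     {
       return String.format(
           Locale.ENGLISH,
           "Operation[%s] is not supported by supervisor[%s] of type[%s].",
           operation, supervisorId, type
       );
     }

     public static void main(String[] args)
     {
       // Sample values are made up for illustration.
       System.out.println(build("checkpoint", "my_supervisor", "autocompact"));
     }
   }
   ```

   With these sample values the message is `Operation[checkpoint] is not supported by supervisor[my_supervisor] of type[autocompact].`, which keeps every interpolated value inside brackets next to the noun it describes.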



##########
server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java:
##########
@@ -63,44 +65,8 @@ default Boolean isHealthy()
   }
 
   /**
-   * Resets all offsets for a dataSource.
+   * Resets any stored metadata by the supervisor.
    * @param dataSourceMetadata optional dataSource metadata.
    */
-  void reset(DataSourceMetadata dataSourceMetadata);
-
-  /**
-   * Reset offsets with provided dataSource metadata. The resulting stored offsets should be a union of existing checkpointed
-   * offsets with provided offsets.
-   * @param resetDataSourceMetadata required datasource metadata with offsets to reset.
-   * @throws DruidException if any metadata attribute doesn't match the supervisor's state.
-   */
-  void resetOffsets(DataSourceMetadata resetDataSourceMetadata);
-
-  /**
-   * The definition of checkpoint is not very strict as currently it does not affect data or control path.
-   * On this call Supervisor can potentially checkpoint data processed so far to some durable storage
-   * for example - Kafka/Kinesis Supervisor uses this to merge and handoff segments containing at least the data
-   * represented by {@param currentCheckpoint} DataSourceMetadata
-   *
-   * @param taskGroupId        unique Identifier to figure out for which sequence to do checkpointing
-   * @param checkpointMetadata metadata for the sequence to currently checkpoint
-   */
-  void checkpoint(int taskGroupId, DataSourceMetadata checkpointMetadata);

Review Comment:
   Can we move the javadoc for this method and others to `SeekableStreamSupervisor`?



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java:
##########
@@ -445,4 +449,22 @@ private boolean createAndStartSupervisorInternal(SupervisorSpec spec, boolean pe
 
     return true;
   }
+
+  private SeekableStreamSupervisor requireSeekableStreamSupervisor(final String supervisorId, final String operation)
+  {
+    Pair<Supervisor, SupervisorSpec> supervisor = supervisors.get(supervisorId);
+    if (supervisor.lhs instanceof SeekableStreamSupervisor) {
+      // cast and return
+      return (SeekableStreamSupervisor) supervisor.lhs;
+    } else {
+      throw DruidException.forPersona(DruidException.Persona.USER)
+                          .ofCategory(DruidException.Category.UNSUPPORTED)
+                          .build(
+                              "[%s] operation is not supported by the supervisor[%s] of type[%s].",
+                              operation,
+                              supervisorId,
+                              supervisor.rhs.getType()

Review Comment:
   Nit: These arguments can be on the same line.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

