kfaraz commented on code in PR #19091:
URL: https://github.com/apache/druid/pull/19091#discussion_r2907206664


##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java:
##########
@@ -427,6 +430,48 @@ public boolean registerUpgradedPendingSegmentOnSupervisor(
     return false;
   }
 
+  /**
+   * Checks if there is a Task distinct from the given {@code taskId} or its 
replicas
+   * that is currently waiting to publish offsets for the given partitions.
+   */
+  public boolean isAnotherTaskGroupPublishingToPartitions(
+      String supervisorId,
+      String taskId,
+      DataSourceMetadata startMetadata
+  )
+  {
+    try {
+      InvalidInput.conditionalException(supervisorId != null, "'supervisorId' 
cannot be null");
+      if (!(startMetadata instanceof SeekableStreamDataSourceMetadata<?, ?>)) {
+        throw InvalidInput.exception("Start metadata[%s] is not streaming 
metadata", supervisorId);
+      }
+
+      Pair<Supervisor, SupervisorSpec> supervisor = 
supervisors.get(supervisorId);
+      if (supervisor == null) {
+        throw NotFound.exception("Could not find supervisor[%s]", 
supervisorId);
+      }
+      if (!(supervisor.lhs instanceof SeekableStreamSupervisor<?, ?, ?>)) {
+        throw InvalidInput.exception("Supervisor[%s] is not a streaming 
supervisor", supervisorId);

Review Comment:
   updated.



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java:
##########
@@ -427,6 +430,48 @@ public boolean registerUpgradedPendingSegmentOnSupervisor(
     return false;
   }
 
+  /**
+   * Checks if there is a Task distinct from the given {@code taskId} or its 
replicas
+   * that is currently waiting to publish offsets for the given partitions.
+   */
+  public boolean isAnotherTaskGroupPublishingToPartitions(
+      String supervisorId,
+      String taskId,
+      DataSourceMetadata startMetadata
+  )
+  {
+    try {
+      InvalidInput.conditionalException(supervisorId != null, "'supervisorId' 
cannot be null");
+      if (!(startMetadata instanceof SeekableStreamDataSourceMetadata<?, ?>)) {
+        throw InvalidInput.exception("Start metadata[%s] is not streaming 
metadata", supervisorId);

Review Comment:
   done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to