kfaraz commented on code in PR #17955:
URL: https://github.com/apache/druid/pull/17955#discussion_r2065408957


##########
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java:
##########
@@ -176,6 +177,35 @@ protected KafkaSupervisorSpec toggleSuspend(boolean suspend)
     );
   }
 
+  @Override
+  public void validateProposedSpecEvolution(SupervisorSpec that) throws IllegalArgumentException
+  {
+    if (!(that instanceof KafkaSupervisorSpec)) {
+      throw new IllegalArgumentException("Cannot evolve to " + that.getType() + " from " + getType());
+    }
+    KafkaSupervisorSpec other = (KafkaSupervisorSpec) that;
+    if (this.getSource() == null || other.getSource() == null) {
+      // I don't think this is possible, but covering just in case.
+      throw new IllegalArgumentException(
+          "Cannot consider KafkaSupervisorSpec evolution when one or both of the specs have not provided either a "
+          + "topic OR topicPattern");
+    }
+
+    // Future enhancements could allow for topicPattern to be changed in a way where the new source is additive to the
+    // old source. If we did that now, there would be metadata issues due to {@link KafkaDataSourceMetadata}
+    // implementation details that aren't set up to handle evolution of metadata in this way.
+    if (!this.getSource().equals(other.getSource())) {

Review Comment:
   Should this impl be present for all `SeekableStreamSupervisorSpec`s and not just Kafka?
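   A minimal sketch of what this question points at, assuming the topic check moves up into the shared streaming base class so that Kafka and Kinesis specs both inherit it. `SupervisorSpecLike`, `StreamSpecBase`, `KafkaSpecSketch` and `KinesisSpecSketch` are hypothetical, simplified stand-ins for `SupervisorSpec`, `SeekableStreamSupervisorSpec` and its subclasses, not Druid's actual classes.

```java
import java.util.Objects;

// Hypothetical, trimmed-down stand-in for SupervisorSpec (not Druid's real interface).
interface SupervisorSpecLike {
  String getType();

  String getSource();

  void validateProposedSpecEvolution(SupervisorSpecLike that);
}

// Hypothetical stand-in for SeekableStreamSupervisorSpec: the check is written
// once here, so every streaming spec subclass inherits it.
abstract class StreamSpecBase implements SupervisorSpecLike {
  private final String source;

  StreamSpecBase(String source) {
    this.source = source;
  }

  @Override
  public String getSource() {
    return source;
  }

  @Override
  public void validateProposedSpecEvolution(SupervisorSpecLike that) {
    // Reject switching to a different concrete spec class (e.g. Kafka -> Kinesis).
    if (that.getClass() != getClass()) {
      throw new IllegalArgumentException("Cannot evolve to " + that.getType() + " from " + getType());
    }
    // Reject changing the topic/stream the supervisor reads from.
    // Objects.equals treats two null sources as equal; the PR's explicit null check is omitted here.
    if (!Objects.equals(getSource(), that.getSource())) {
      throw new IllegalArgumentException(
          "Cannot change source from [" + getSource() + "] to [" + that.getSource() + "]");
    }
  }
}

class KafkaSpecSketch extends StreamSpecBase {
  KafkaSpecSketch(String topic) {
    super(topic);
  }

  @Override
  public String getType() {
    return "kafka";
  }
}

class KinesisSpecSketch extends StreamSpecBase {
  KinesisSpecSketch(String stream) {
    super(stream);
  }

  @Override
  public String getType() {
    return "kinesis";
  }
}
```

   Whether Kinesis should get the same restriction is the open question here; the sketch only shows that placing the check in the base class makes it apply to every streaming spec.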



##########
indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionSupervisorSpec.java:
##########
@@ -120,6 +120,12 @@ public String getSource()
     return "";
   }
 
+  @Override
+  public void validateProposedSpecEvolution(SupervisorSpec that) throws IllegalArgumentException
+  {
+    // No validation logic for compaction spec as of now

Review Comment:
   Seems like this should have been the default impl in the interface itself.
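   A minimal sketch of the default-method approach, assuming Java 8+ interface defaults. `SupervisorSpecWithDefault` and `CompactionSpecSketch` are hypothetical, trimmed-down stand-ins for `SupervisorSpec` and `CompactionSupervisorSpec`.

```java
// Hypothetical, trimmed-down stand-in for SupervisorSpec: only the method under discussion.
interface SupervisorSpecWithDefault {
  String getSource();

  // Default no-op: specs with no update restrictions inherit this
  // and need no override of their own.
  default void validateProposedSpecEvolution(SupervisorSpecWithDefault that) {
  }
}

// Hypothetical stand-in for CompactionSupervisorSpec: no validation override needed.
class CompactionSpecSketch implements SupervisorSpecWithDefault {
  @Override
  public String getSource() {
    return "";
  }
}
```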



##########
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java:
##########
@@ -176,6 +177,35 @@ protected KafkaSupervisorSpec toggleSuspend(boolean suspend)
     );
   }
 
+  @Override
+  public void validateProposedSpecEvolution(SupervisorSpec that) throws IllegalArgumentException
+  {
+    if (!(that instanceof KafkaSupervisorSpec)) {
+      throw new IllegalArgumentException("Cannot evolve to " + that.getType() + " from " + getType());
+    }
+    KafkaSupervisorSpec other = (KafkaSupervisorSpec) that;
+    if (this.getSource() == null || other.getSource() == null) {
+      // I don't think this is possible, but covering just in case.

Review Comment:
   ```suggestion
         // Not likely to happen, but covering just in case.
   ```



##########
server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorSpec.java:
##########
@@ -100,4 +100,16 @@ default Set<ResourceAction> getInputSourceResources() throws UnsupportedOperatio
    * @return source like stream or topic name
    */
   String getSource();
+
+  /**
+   * Checks if a proposed evolution of the supervisor spec is allowed.
+   * <p>
+   * SupervisorSpec `that` is proposed to replace the current supervisor spec. Implementations of this method determine
+   * if the system should allow this evolution.
+   * </p>
+   *
+   * @param that the proposed supervisor spec
+   * @throws IllegalArgumentException if the evolution is not allowed
+   */
+  void validateProposedSpecEvolution(SupervisorSpec that) throws IllegalArgumentException;

Review Comment:
   Maybe use a simpler method name like `validateSpecUpdateTo`, and the arg can be called `proposedSpec`.
   
   We should throw a DruidException (of type not supported or invalid input) instead.
   
   Also, please add a default impl to avoid having to override in all impls where this is not needed right now.
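   A sketch of the renamed method with a `proposedSpec` argument and a typed exception. `SpecUpdateException` is a hypothetical stand-in for `DruidException` that models only a category and a message; Druid's actual `DruidException` construction API is not reproduced here. `RenamedSpec` and `TopicSpecSketch` are likewise hypothetical.

```java
// Hypothetical stand-in for DruidException: models only a category and a message.
class SpecUpdateException extends RuntimeException {
  enum Category { INVALID_INPUT, UNSUPPORTED }

  private final Category category;

  SpecUpdateException(Category category, String message) {
    super(message);
    this.category = category;
  }

  Category getCategory() {
    return category;
  }
}

// Hypothetical, trimmed-down spec interface using the suggested names.
interface RenamedSpec {
  String getSource();

  /**
   * Validates that this spec may be replaced by {@code proposedSpec}.
   * Accepts every update unless overridden.
   *
   * @throws SpecUpdateException if the update is not allowed
   */
  default void validateSpecUpdateTo(RenamedSpec proposedSpec) {
  }
}

// Hypothetical topic-based spec that rejects topic changes with an UNSUPPORTED category.
class TopicSpecSketch implements RenamedSpec {
  private final String topic;

  TopicSpecSketch(String topic) {
    this.topic = topic;
  }

  @Override
  public String getSource() {
    return topic;
  }

  @Override
  public void validateSpecUpdateTo(RenamedSpec proposedSpec) {
    if (!topic.equals(proposedSpec.getSource())) {
      throw new SpecUpdateException(
          SpecUpdateException.Category.UNSUPPORTED,
          "Update of topic from [" + topic + "] to [" + proposedSpec.getSource() + "] is not supported");
    }
  }
}
```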



##########
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java:
##########
@@ -176,6 +177,35 @@ protected KafkaSupervisorSpec toggleSuspend(boolean suspend)
     );
   }
 
+  @Override
+  public void validateProposedSpecEvolution(SupervisorSpec that) throws IllegalArgumentException
+  {
+    if (!(that instanceof KafkaSupervisorSpec)) {
+      throw new IllegalArgumentException("Cannot evolve to " + that.getType() + " from " + getType());
+    }
+    KafkaSupervisorSpec other = (KafkaSupervisorSpec) that;
+    if (this.getSource() == null || other.getSource() == null) {
+      // I don't think this is possible, but covering just in case.
+      throw new IllegalArgumentException(
+          "Cannot consider KafkaSupervisorSpec evolution when one or both of the specs have not provided either a "
+          + "topic OR topicPattern");
+    }
+
+    // Future enhancements could allow for topicPattern to be changed in a way where the new source is additive to the
+    // old source. If we did that now, there would be metadata issues due to {@link KafkaDataSourceMetadata}
+    // implementation details that aren't set up to handle evolution of metadata in this way.
+    if (!this.getSource().equals(other.getSource())) {
+      throw new IllegalArgumentException(
+          "Your proposed KafkaSupervisorSpec evolution is invalid. You are attempting to change the topic/topicPattern "
+          + "from " + this.getSource() + " to " + other.getSource() + ". This is not supported. If you "
+          + "want to change the topic or topicPattern for a supervisor, you must first terminate the supervisor. "
+          + "Then create a new one in suspended state with the new topic or topicPattern. Lastly, you will have to "
+          + "reset the supervisor offsets. Finally, you can resume the new supervisor. Note that doing this reset can "
+          + "cause duplicate events or lost events if any topics who were in the previous supervisor remain in the new "
+          + "one.");

Review Comment:
   Suggestion for more concise language and a formatted message.
   
   ```suggestion
             "Update of topic/topicPattern from [%s] to [%s] is not supported for a running Kafka supervisor."
             + "%nTo perform the update safely, follow these steps:"
             + "%n(1) Suspend this supervisor, reset its offsets and then terminate it."
             + "%n(2) Create a new supervisor with the new topic or topicPattern."
             + "%nNote that doing the reset can cause data duplication or loss if any topic used in the old supervisor is included in the new one too.");
   ```
   
   I have updated the steps; please fix them up if they don't seem correct.
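   To check how the suggested text renders, assume it is passed through a `String.format`-style formatter (the formatting helper the PR would use is not shown in this diff): `%s` takes the old and new sources in order, and `%n` becomes the platform line separator. `TopicChangeMessage` is a hypothetical helper, and the template copies the suggested text as a single valid Java string expression.

```java
import java.util.Locale;

// Hypothetical helper that renders the suggested error message.
class TopicChangeMessage {
  private static final String TEMPLATE =
      "Update of topic/topicPattern from [%s] to [%s] is not supported for a running Kafka supervisor."
      + "%nTo perform the update safely, follow these steps:"
      + "%n(1) Suspend this supervisor, reset its offsets and then terminate it."
      + "%n(2) Create a new supervisor with the new topic or topicPattern."
      + "%nNote that doing the reset can cause data duplication or loss if any topic used in the old supervisor"
      + " is included in the new one too.";

  static String build(String oldSource, String newSource) {
    // %s is filled by the arguments in order; %n is replaced by System.lineSeparator().
    return String.format(Locale.ROOT, TEMPLATE, oldSource, newSource);
  }

  public static void main(String[] args) {
    // Prints the five-line message for a topic -> topicPattern change.
    System.out.println(build("metrics", "metrics-.*"));
  }
}
```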



##########
docs/ingestion/kafka-ingestion.md:
##########
@@ -134,6 +134,12 @@ If you enable multi-topic ingestion for a datasource, downgrading to a version o
 28.0.0 will cause the ingestion for that datasource to fail.
 :::
 
+:::info
+Migrating an existing supervisor to use `topicPattern` instead of `topic` is not supported. It is also not supported to change the `topicPattern` of an existing supervisor to a different regex pattern.

Review Comment:
   Thanks for adding this.
   
   Maybe you can put it in a list format for readability, like:
   ```
   The following updates to a supervisor spec are not supported:
   - item 1
   - item 2
   - item 3
   ```



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java:
##########
@@ -174,6 +174,12 @@ public boolean createOrUpdateAndStartSupervisor(SupervisorSpec spec)
     synchronized (lock) {
       Preconditions.checkState(started, "SupervisorManager not started");
       final boolean shouldUpdateSpec = shouldUpdateSupervisor(spec);
+      if (shouldUpdateSpec) {
+        SupervisorSpec existingSpec = getSpec(spec.getId());
+        if (existingSpec != null) {
+          existingSpec.validateProposedSpecEvolution(spec);
+        }

Review Comment:
   This logic should probably live in `shouldUpdateSupervisor` method itself.
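   A simplified sketch of that refactor, assuming `shouldUpdateSupervisor` returns true for a new or changed spec. `SupervisorManagerSketch` and `ManagedSpec` are hypothetical; an in-memory map stands in for metadata storage, and the real method's comparison logic and locking are not shown in this diff.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical, minimal spec: an id plus the topic it reads from.
final class ManagedSpec {
  final String id;
  final String topic;

  ManagedSpec(String id, String topic) {
    this.id = id;
    this.topic = topic;
  }

  // Throws if replacing this spec with `proposed` is not allowed.
  void validateSpecUpdateTo(ManagedSpec proposed) {
    if (!topic.equals(proposed.topic)) {
      throw new IllegalArgumentException("Cannot change topic from [" + topic + "] to [" + proposed.topic + "]");
    }
  }

  @Override
  public boolean equals(Object o) {
    if (!(o instanceof ManagedSpec)) {
      return false;
    }
    ManagedSpec other = (ManagedSpec) o;
    return id.equals(other.id) && topic.equals(other.topic);
  }

  @Override
  public int hashCode() {
    return Objects.hash(id, topic);
  }
}

// Hypothetical manager; the map stands in for metadata storage.
final class SupervisorManagerSketch {
  private final Map<String, ManagedSpec> specs = new HashMap<>();

  // Returns true for a new or changed spec; validates the change before saying yes.
  boolean shouldUpdateSupervisor(ManagedSpec spec) {
    ManagedSpec existing = specs.get(spec.id);
    if (existing == null) {
      return true; // brand-new supervisor: nothing to validate against
    }
    if (existing.equals(spec)) {
      return false; // identical resubmission: no update needed
    }
    existing.validateSpecUpdateTo(spec); // throws if the update is not allowed
    return true;
  }

  boolean createOrUpdate(ManagedSpec spec) {
    if (!shouldUpdateSupervisor(spec)) {
      return false;
    }
    specs.put(spec.id, spec);
    return true;
  }
}
```

   With the check inside `shouldUpdateSupervisor`, an identical resubmission skips validation entirely, and any caller that asks whether an update is needed also gets the evolution check.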



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
