This is an automated email from the ASF dual-hosted git repository.

karan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 1e87ec7aa91 Restrict input stream modification in existing seekable 
stream supervisors (#17955)
1e87ec7aa91 is described below

commit 1e87ec7aa9151b3094ad432fbea80ce07e6f2260
Author: Lucas Capistrant <[email protected]>
AuthorDate: Fri May 9 02:14:26 2025 -0500

    Restrict input stream modification in existing seekable stream supervisors 
(#17955)
    
    * Restrict Kafka topic evolution in existing supervisors
    
    * Refactor stream validation code based on review
    
    * Provide default impl of validateSpecUpdateTo with an enhanced override 
for Kafka
    
    Since none of the existing implementations of the SeekableStreamSupervisor 
support stream modification for existing supervisors, it makes sense to 
centralize a default implementation of spec-update validation. Kafka requires 
an override because the distinction between single-topic and multi-topic 
ingestion is unique to Kafka.
    
    * Improve docs
    
    * Improve test coverage
    
    * Test improvements and general cleanup following review round
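    
    The centralized default plus the Kafka-specific override described above 
can be sketched as follows. This is a simplified model, not Druid's actual 
classes: `SeekableSpecSketch` and `KafkaSpecSketch` are illustrative stand-ins 
for SeekableStreamSupervisorSpec and KafkaSupervisorSpec.
    
    ```java
    // Base class: by default, spec updates may not change the input source stream.
    abstract class SeekableSpecSketch {
        abstract String getSource();
    
        void validateSpecUpdateTo(SeekableSpecSketch proposed) {
            if (!getSource().equals(proposed.getSource())) {
                throw new IllegalArgumentException(
                    "Update of the input source stream from [" + getSource()
                    + "] to [" + proposed.getSource() + "] is not supported");
            }
        }
    }
    
    class KafkaSpecSketch extends SeekableSpecSketch {
        private final String source;
        private final boolean multiTopic;
    
        KafkaSpecSketch(String source, boolean multiTopic) {
            this.source = source;
            this.multiTopic = multiTopic;
        }
    
        @Override
        String getSource() {
            return source;
        }
    
        // Kafka override: getSource() alone cannot distinguish topic=X from
        // topicPattern=X, so compare the single/multi-topic mode first, then
        // fall through to the default source-string comparison.
        @Override
        void validateSpecUpdateTo(SeekableSpecSketch proposed) {
            if (!(proposed instanceof KafkaSpecSketch)) {
                throw new IllegalArgumentException("Cannot change spec type");
            }
            KafkaSpecSketch other = (KafkaSpecSketch) proposed;
            if (this.multiTopic != other.multiTopic) {
                throw new IllegalArgumentException(
                    "Cannot switch between single-topic and multi-topic ingestion");
            }
            super.validateSpecUpdateTo(proposed);
        }
    }
    
    class SpecUpdateSketch {
        public static void main(String[] args) {
            KafkaSpecSketch current = new KafkaSpecSketch("metrics", false);
            // Same topic, still single-topic: allowed, no exception.
            current.validateSpecUpdateTo(new KafkaSpecSketch("metrics", false));
            // topicPattern=metrics yields the same source string but is
            // multi-topic, so the Kafka override rejects it.
            try {
                current.validateSpecUpdateTo(new KafkaSpecSketch("metrics", true));
            } catch (IllegalArgumentException e) {
                System.out.println("rejected: " + e.getMessage());
            }
        }
    }
    ```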
---
 docs/ingestion/kafka-ingestion.md                  |  12 +-
 extensions-core/kafka-indexing-service/pom.xml     |   5 +
 .../kafka/supervisor/KafkaSupervisorSpec.java      |  33 ++++
 .../kafka/supervisor/KafkaSupervisorSpecTest.java  | 185 +++++++++++++++++++++
 .../overlord/supervisor/SupervisorManager.java     |  20 ++-
 .../supervisor/SeekableStreamSupervisorSpec.java   |  36 ++++
 .../OverlordSecurityResourceFilterTest.java        |   1 +
 .../overlord/supervisor/SupervisorManagerTest.java |  61 +++++++
 .../SeekableStreamSupervisorSpecTest.java          | 162 ++++++++++++++++++
 .../overlord/supervisor/SupervisorSpec.java        |  16 ++
 10 files changed, 527 insertions(+), 4 deletions(-)

diff --git a/docs/ingestion/kafka-ingestion.md 
b/docs/ingestion/kafka-ingestion.md
index b7eccbf0151..67b1a96a3d7 100644
--- a/docs/ingestion/kafka-ingestion.md
+++ b/docs/ingestion/kafka-ingestion.md
@@ -120,7 +120,7 @@ For configuration properties shared across all streaming 
ingestion methods, refe
 
 |Property|Type|Description|Required|Default|
 |--------|----|-----------|--------|-------|
-|`topic`|String|The Kafka topic to read from. To ingest data from multiple 
topic, use `topicPattern`. |Yes if `topicPattern` isn't set.||
+|`topic`|String|The Kafka topic to read from. Note that once this value is 
established for a supervisor, updating it is not supported. To ingest data from 
multiple topics, use `topicPattern`. |Yes if `topicPattern` isn't set.||
 |`topicPattern`|String|Multiple Kafka topics to read from, passed as a regex 
pattern. See [Ingest from multiple topics](#ingest-from-multiple-topics) for 
more information.|Yes if `topic` isn't set.||
 |`consumerProperties`|String, Object|A map of properties to pass to the Kafka 
consumer. See [Consumer properties](#consumer-properties) for details.|Yes. At 
the minimum, you must set the `bootstrap.servers` property to establish the 
initial connection to the Kafka cluster.||
 |`pollTimeout`|Long|The length of time to wait for the Kafka consumer to poll 
records, in milliseconds.|No|100|
@@ -134,6 +134,14 @@ If you enable multi-topic ingestion for a datasource, 
downgrading to a version o
 28.0.0 will cause the ingestion for that datasource to fail.
 :::
 
+:::info
+Migrating an existing supervisor to use `topicPattern` instead of `topic` is 
not supported. Changing the `topicPattern` of an existing supervisor to a 
different regex pattern is also not supported.
+You can force the migration by doing the following:
+1. Suspend the supervisor.
+2. Reset the offsets.
+3. Submit the updated supervisor.
+:::
+
 You can ingest data from one or multiple topics.
 When ingesting data from multiple topics, Druid assigns partitions based on 
the hashcode of the topic name and the ID of the partition within that topic. 
The partition assignment might not be uniform across all the tasks. Druid 
assumes that partitions across individual topics have similar load. If you want 
to ingest from both high and low load topics in the same supervisor, it is 
recommended that you have a higher number of partitions for a high load topic 
and a lower number of partition [...]
 
@@ -456,4 +464,4 @@ See the following topics for more information:
 * [Supervisor API](../api-reference/supervisor-api.md) for how to manage and 
monitor supervisors using the API.
 * [Supervisor](../ingestion/supervisor.md) for supervisor status and capacity 
planning.
 * [Loading from Apache Kafka](../tutorials/tutorial-kafka.md) for a tutorial 
on streaming data from Apache Kafka.
-* [Kafka input format](../ingestion/data-formats.md#kafka) to learn about the 
`kafka` input format.
\ No newline at end of file
+* [Kafka input format](../ingestion/data-formats.md#kafka) to learn about the 
`kafka` input format.
diff --git a/extensions-core/kafka-indexing-service/pom.xml 
b/extensions-core/kafka-indexing-service/pom.xml
index 761d7be1cca..0dc45401a98 100644
--- a/extensions-core/kafka-indexing-service/pom.xml
+++ b/extensions-core/kafka-indexing-service/pom.xml
@@ -113,6 +113,11 @@
       <artifactId>hamcrest-core</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.hamcrest</groupId>
+      <artifactId>hamcrest-all</artifactId>
+      <scope>test</scope>
+    </dependency>
     <dependency>
       <groupId>com.fasterxml.jackson.core</groupId>
       <artifactId>jackson-core</artifactId>
diff --git 
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java
 
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java
index 5d84eeed1f8..13b3972f2ec 100644
--- 
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java
+++ 
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java
@@ -24,14 +24,18 @@ import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.error.InvalidInput;
 import org.apache.druid.guice.annotations.Json;
 import org.apache.druid.indexing.kafka.KafkaIndexTaskClientFactory;
 import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
 import org.apache.druid.indexing.overlord.TaskMaster;
 import org.apache.druid.indexing.overlord.TaskStorage;
 import org.apache.druid.indexing.overlord.supervisor.Supervisor;
+import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
 import 
org.apache.druid.indexing.overlord.supervisor.SupervisorStateManagerConfig;
 import 
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorSpec;
+import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
 import org.apache.druid.java.util.metrics.DruidMonitorSchedulerConfig;
 import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
@@ -176,6 +180,35 @@ public class KafkaSupervisorSpec extends 
SeekableStreamSupervisorSpec
     );
   }
 
+  /**
+   * Extends {@link SeekableStreamSupervisorSpec#validateSpecUpdateTo} to 
ensure that the proposed spec and current spec are either both multi-topic or 
both single-topic.
+   * <p>
+   * getSource() returns the same string (exampleTopic) for 
"topicPattern=exampleTopic" and "topic=exampleTopic".
+   * This override prevents this case from being considered a valid update.
+   * </p>
+   * @param proposedSpec the proposed supervisor spec
+   * @throws DruidException if the proposed spec is not a Kafka spec or if the 
proposed spec changes from multi-topic to single-topic or vice versa
+   */
+  @Override
+  public void validateSpecUpdateTo(SupervisorSpec proposedSpec) throws 
DruidException
+  {
+    if (!(proposedSpec instanceof KafkaSupervisorSpec)) {
+      throw InvalidInput.exception(
+          "Cannot change spec from type[%s] to type[%s]", 
getClass().getSimpleName(), proposedSpec.getClass().getSimpleName()
+      );
+    }
+    KafkaSupervisorSpec other = (KafkaSupervisorSpec) proposedSpec;
+    if (this.getSpec().getIOConfig().isMultiTopic() != 
other.getSpec().getIOConfig().isMultiTopic()) {
+      throw InvalidInput.exception(
+          
SeekableStreamSupervisorSpec.ILLEGAL_INPUT_SOURCE_UPDATE_ERROR_MESSAGE,
+          StringUtils.format("(%s) %s", 
this.getSpec().getIOConfig().isMultiTopic() ? "multi-topic" : "single-topic", 
this.getSource()),
+          StringUtils.format("(%s) %s", 
other.getSpec().getIOConfig().isMultiTopic() ? "multi-topic" : "single-topic", 
other.getSource())
+      );
+    }
+
+    super.validateSpecUpdateTo(proposedSpec);
+  }
+
   @Override
   public String toString()
   {
diff --git 
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpecTest.java
 
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpecTest.java
index f8feeae9be7..03c5820be0b 100644
--- 
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpecTest.java
+++ 
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpecTest.java
@@ -22,6 +22,10 @@ package org.apache.druid.indexing.kafka.supervisor;
 import com.fasterxml.jackson.databind.InjectableValues;
 import com.fasterxml.jackson.databind.Module;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.druid.data.input.impl.JsonInputFormat;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.error.DruidExceptionMatcher;
+import org.apache.druid.indexer.granularity.UniformGranularitySpec;
 import org.apache.druid.indexing.kafka.KafkaIndexTaskClientFactory;
 import org.apache.druid.indexing.kafka.KafkaIndexTaskModule;
 import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
@@ -29,16 +33,26 @@ import org.apache.druid.indexing.overlord.TaskMaster;
 import org.apache.druid.indexing.overlord.TaskStorage;
 import 
org.apache.druid.indexing.overlord.supervisor.SupervisorStateManagerConfig;
 import org.apache.druid.jackson.DefaultObjectMapper;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.common.parsers.JSONPathSpec;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
 import org.apache.druid.java.util.metrics.DruidMonitorSchedulerConfig;
 import org.apache.druid.math.expr.ExprMacroTable;
+import org.apache.druid.metadata.TestSupervisorSpec;
+import org.apache.druid.query.aggregation.CountAggregatorFactory;
 import org.apache.druid.query.expression.LookupEnabledTestExprMacroTable;
 import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
+import org.apache.druid.segment.indexing.DataSchema;
 import org.apache.druid.server.metrics.NoopServiceEmitter;
+import org.hamcrest.MatcherAssert;
 import org.junit.Assert;
 import org.junit.Test;
 
 import java.io.IOException;
+import java.util.Map;
+
+import static org.junit.Assert.assertThrows;
 
 public class KafkaSupervisorSpecTest
 {
@@ -578,4 +592,175 @@ public class KafkaSupervisorSpecTest
 
     Assert.assertFalse(runningSpec.isSuspended());
   }
+
+  @Test
+  public void test_validateSpecUpdateTo()
+  {
+    KafkaSupervisorSpec sourceSpec = getSpec("metrics", null);
+
+    // Proposed spec being non-kafka is not allowed
+    TestSupervisorSpec otherSpec = new TestSupervisorSpec("test", new 
Object());
+    MatcherAssert.assertThat(
+        assertThrows(DruidException.class, () -> 
sourceSpec.validateSpecUpdateTo(otherSpec)),
+        new DruidExceptionMatcher(
+            DruidException.Persona.USER,
+            DruidException.Category.INVALID_INPUT,
+            "invalidInput"
+        ).expectMessageIs(
+            StringUtils.format("Cannot change spec from type[%s] to type[%s]", 
sourceSpec.getClass().getSimpleName(), otherSpec.getClass().getSimpleName())
+        )
+    );
+
+    KafkaSupervisorSpec multiTopicProposedSpec = getSpec(null, "metrics-.*");
+    MatcherAssert.assertThat(
+        assertThrows(DruidException.class, () -> 
sourceSpec.validateSpecUpdateTo(multiTopicProposedSpec)),
+        new DruidExceptionMatcher(
+            DruidException.Persona.USER,
+            DruidException.Category.INVALID_INPUT,
+            "invalidInput"
+        ).expectMessageIs(
+             "Update of the input source stream from [(single-topic) metrics] 
to [(multi-topic) metrics-.*] is not supported for a running supervisor."
+             + "\nTo perform the update safely, follow these steps:"
+             + "\n(1) Suspend this supervisor, reset its offsets and then 
terminate it. "
+             + "\n(2) Create a new supervisor with the new input source 
stream."
+             + "\nNote that doing the reset can cause data duplication or loss 
if any topic used in the old supervisor is included in the new one too."
+         )
+    );
+
+    KafkaSupervisorSpec singleTopicNewStreamProposedSpec = 
getSpec("metricsNew", null);
+    MatcherAssert.assertThat(
+        assertThrows(DruidException.class, () -> 
sourceSpec.validateSpecUpdateTo(singleTopicNewStreamProposedSpec)),
+        new DruidExceptionMatcher(
+            DruidException.Persona.USER,
+            DruidException.Category.INVALID_INPUT,
+            "invalidInput"
+        ).expectMessageIs(
+            "Update of the input source stream from [metrics] to [metricsNew] 
is not supported for a running supervisor."
+            + "\nTo perform the update safely, follow these steps:"
+            + "\n(1) Suspend this supervisor, reset its offsets and then 
terminate it. "
+            + "\n(2) Create a new supervisor with the new input source stream."
+            + "\nNote that doing the reset can cause data duplication or loss 
if any topic used in the old supervisor is included in the new one too."
+        )
+    );
+
+    KafkaSupervisorSpec multiTopicMatchingSourceString = getSpec(null, 
"metrics");
+    MatcherAssert.assertThat(
+        assertThrows(DruidException.class, () -> 
sourceSpec.validateSpecUpdateTo(multiTopicMatchingSourceString)),
+        new DruidExceptionMatcher(
+            DruidException.Persona.USER,
+            DruidException.Category.INVALID_INPUT,
+            "invalidInput"
+        ).expectMessageIs(
+            "Update of the input source stream from [(single-topic) metrics] 
to [(multi-topic) metrics] is not supported for a running supervisor."
+            + "\nTo perform the update safely, follow these steps:"
+            + "\n(1) Suspend this supervisor, reset its offsets and then 
terminate it. "
+            + "\n(2) Create a new supervisor with the new input source stream."
+            + "\nNote that doing the reset can cause data duplication or loss 
if any topic used in the old supervisor is included in the new one too."
+        )
+    );
+
+    // test the inverse as well
+    MatcherAssert.assertThat(
+        assertThrows(DruidException.class, () -> 
multiTopicMatchingSourceString.validateSpecUpdateTo(sourceSpec)),
+        new DruidExceptionMatcher(
+            DruidException.Persona.USER,
+            DruidException.Category.INVALID_INPUT,
+            "invalidInput"
+        ).expectMessageIs(
+            "Update of the input source stream from [(multi-topic) metrics] to 
[(single-topic) metrics] is not supported for a running supervisor."
+            + "\nTo perform the update safely, follow these steps:"
+            + "\n(1) Suspend this supervisor, reset its offsets and then 
terminate it. "
+            + "\n(2) Create a new supervisor with the new input source stream."
+            + "\nNote that doing the reset can cause data duplication or loss 
if any topic used in the old supervisor is included in the new one too."
+        )
+    );
+
+    // Test valid spec update. This spec changes context vs the sourceSpec
+    KafkaSupervisorSpec validDestSpec = new KafkaSupervisorSpec(
+        null,
+        DataSchema.builder().withDataSource("testDs").withAggregators(new 
CountAggregatorFactory("rows")).withGranularity(new 
UniformGranularitySpec(Granularities.DAY, Granularities.NONE, null)).build(),
+        null,
+        new KafkaSupervisorIOConfig(
+            "metrics",
+            null,
+            new JsonInputFormat(JSONPathSpec.DEFAULT, null, null, null, null),
+            null,
+            null,
+            null,
+            Map.of("bootstrap.servers", "localhost:9092"),
+            null,
+            null,
+            null,
+            null,
+            true,
+            null,
+            null,
+            null,
+            null,
+            null,
+            null,
+            null,
+            false
+        ),
+        Map.of(
+            "key1",
+            "value1",
+            "key2",
+            "value2"
+        ),
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null
+    );
+    sourceSpec.validateSpecUpdateTo(validDestSpec);
+  }
+
+  private KafkaSupervisorSpec getSpec(String topic, String topicPattern)
+  {
+    return new KafkaSupervisorSpec(
+      null,
+      DataSchema.builder().withDataSource("testDs").withAggregators(new 
CountAggregatorFactory("rows")).withGranularity(new 
UniformGranularitySpec(Granularities.DAY, Granularities.NONE, null)).build(),
+      null,
+      new KafkaSupervisorIOConfig(
+          topic,
+          topicPattern,
+          new JsonInputFormat(JSONPathSpec.DEFAULT, null, null, null, null),
+          null,
+          null,
+          null,
+          Map.of("bootstrap.servers", "localhost:9092"),
+          null,
+          null,
+          null,
+          null,
+          true,
+          null,
+          null,
+          null,
+          null,
+          null,
+          null,
+          null,
+          false
+      ),
+      null,
+      null,
+      null,
+      null,
+      null,
+      null,
+      null,
+      null,
+      null,
+      null,
+      null
+  );
+  }
 }
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java
index 85b6d392550..0ada4679a63 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java
@@ -197,8 +197,15 @@ public class SupervisorManager
       try {
         byte[] specAsBytes = jsonMapper.writeValueAsBytes(spec);
         Pair<Supervisor, SupervisorSpec> currentSupervisor = 
supervisors.get(spec.getId());
-        return currentSupervisor == null
-               || !Arrays.equals(specAsBytes, 
jsonMapper.writeValueAsBytes(currentSupervisor.rhs));
+        if (currentSupervisor == null || currentSupervisor.rhs == null) {
+          return true;
+        } else if (Arrays.equals(specAsBytes, 
jsonMapper.writeValueAsBytes(currentSupervisor.rhs))) {
+          return false;
+        } else {
+          // The spec bytes are different, so we need to check if the update 
is allowed
+          currentSupervisor.rhs.validateSpecUpdateTo(spec);
+          return true;
+        }
       }
       catch (JsonProcessingException ex) {
         log.warn("Failed to write spec as bytes for spec_id[%s]", 
spec.getId());
@@ -529,4 +536,13 @@ public class SupervisorManager
                           );
     }
   }
+
+  @Nullable
+  private SupervisorSpec getSpec(String id)
+  {
+    synchronized (lock) {
+      Pair<Supervisor, SupervisorSpec> supervisor = supervisors.get(id);
+      return supervisor == null ? null : supervisor.rhs;
+    }
+  }
 }
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java
index 7b5f46195e7..ba0012b29c9 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java
@@ -23,6 +23,8 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.error.InvalidInput;
 import org.apache.druid.guice.annotations.Json;
 import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
 import org.apache.druid.indexing.overlord.TaskMaster;
@@ -45,6 +47,11 @@ import java.util.Map;
 
 public abstract class SeekableStreamSupervisorSpec implements SupervisorSpec
 {
+  protected static final String ILLEGAL_INPUT_SOURCE_UPDATE_ERROR_MESSAGE = 
"Update of the input source stream from [%s] to [%s] is not supported for a 
running supervisor."
+                                                                   + "%nTo 
perform the update safely, follow these steps:"
+                                                                   + "%n(1) 
Suspend this supervisor, reset its offsets and then terminate it. "
+                                                                   + "%n(2) 
Create a new supervisor with the new input source stream."
+                                                                   + "%nNote 
that doing the reset can cause data duplication or loss if any topic used in 
the old supervisor is included in the new one too.";
 
   private static SeekableStreamSupervisorIngestionSpec checkIngestionSchema(
       SeekableStreamSupervisorIngestionSpec ingestionSchema
@@ -202,6 +209,35 @@ public abstract class SeekableStreamSupervisorSpec 
implements SupervisorSpec
     return suspended;
   }
 
+  /**
+   * Default implementation that prevents unsupported evolution of the 
supervisor spec
+   * <ul>
+   * <li>You cannot migrate between types of supervisors.</li>
+   * <li>You cannot change the input source stream of a running 
supervisor.</li>
+   * </ul>
+   * @param proposedSpec the proposed supervisor spec
+   * @throws DruidException if the proposed spec update is not allowed
+   */
+  @Override
+  public void validateSpecUpdateTo(SupervisorSpec proposedSpec) throws 
DruidException
+  {
+    if (!(proposedSpec instanceof SeekableStreamSupervisorSpec)) {
+      throw InvalidInput.exception(
+          "Cannot update supervisor spec from type[%s] to type[%s]", 
getClass().getSimpleName(), proposedSpec.getClass().getSimpleName()
+      );
+    }
+    SeekableStreamSupervisorSpec other = (SeekableStreamSupervisorSpec) 
proposedSpec;
+    if (this.getSource() == null || other.getSource() == null) {
+      // Not likely to happen, but covering just in case.
+      throw InvalidInput.exception("Cannot update supervisor spec since one or 
both of "
+                                   + "the specs have not provided an input 
source stream in the 'ioConfig'.");
+    }
+
+    if (!this.getSource().equals(other.getSource())) {
+      throw InvalidInput.exception(ILLEGAL_INPUT_SOURCE_UPDATE_ERROR_MESSAGE, 
this.getSource(), other.getSource());
+    }
+  }
+
   protected abstract SeekableStreamSupervisorSpec toggleSuspend(boolean 
suspend);
 
 }
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/security/OverlordSecurityResourceFilterTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/security/OverlordSecurityResourceFilterTest.java
index a25c6aec621..2912e1da02e 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/security/OverlordSecurityResourceFilterTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/security/OverlordSecurityResourceFilterTest.java
@@ -169,6 +169,7 @@ public class OverlordSecurityResourceFilterTest extends 
ResourceFilterTestHelper
         {
           return null;
         }
+
       };
       
EasyMock.expect(supervisorManager.getSupervisorSpec(EasyMock.anyString()))
               .andReturn(Optional.of(supervisorSpec))
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java
index e7276c6aead..d9afd136654 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java
@@ -27,6 +27,7 @@ import com.google.common.collect.ImmutableSet;
 import com.google.common.util.concurrent.SettableFuture;
 import org.apache.druid.error.DruidException;
 import org.apache.druid.error.DruidExceptionMatcher;
+import org.apache.druid.error.InvalidInput;
 import org.apache.druid.indexing.common.TaskLockType;
 import org.apache.druid.indexing.common.task.Tasks;
 import org.apache.druid.indexing.overlord.DataSourceMetadata;
@@ -145,6 +146,42 @@ public class SupervisorManagerTest extends EasyMockSupport
     Assert.assertTrue(manager.getSupervisorIds().isEmpty());
   }
 
+  @Test
+  public void testCreateOrUpdateAndStartSupervisorIllegalEvolution()
+  {
+    SupervisorSpec spec = new TestSupervisorSpec("id1", supervisor1)
+    {
+      @Override
+      public void validateSpecUpdateTo(SupervisorSpec proposedSpec) throws 
DruidException
+      {
+        throw InvalidInput.exception("Illegal spec update proposed");
+      }
+    };
+    SupervisorSpec spec2 = new TestSupervisorSpec("id1", supervisor2);
+
+    Assert.assertTrue(manager.getSupervisorIds().isEmpty());
+
+    
EasyMock.expect(metadataSupervisorManager.getLatest()).andReturn(ImmutableMap.of());
+    metadataSupervisorManager.insert("id1", spec);
+    supervisor1.start();
+    replayAll();
+
+    manager.start();
+    Assert.assertEquals(0, manager.getSupervisorIds().size());
+
+    manager.createOrUpdateAndStartSupervisor(spec);
+    Assert.assertEquals(1, manager.getSupervisorIds().size());
+    Assert.assertEquals(spec, manager.getSupervisorSpec("id1").get());
+    verifyAll();
+
+    resetAll();
+    exception.expect(DruidException.class);
+    replayAll();
+
+    manager.createOrUpdateAndStartSupervisor(spec2);
+    verifyAll();
+  }
+
   @Test
   public void testCreateOrUpdateAndStartSupervisorNotStarted()
   {
@@ -195,6 +232,30 @@ public class SupervisorManagerTest extends EasyMockSupport
     Assert.assertTrue(manager.shouldUpdateSupervisor(new 
NoopSupervisorSpec("id1", null)));
   }
 
+  @Test
+  public void testShouldUpdateSupervisorIllegalEvolution()
+  {
+    SupervisorSpec spec = new TestSupervisorSpec("id1", supervisor1)
+    {
+      @Override
+      public void validateSpecUpdateTo(SupervisorSpec proposedSpec) throws 
DruidException
+      {
+        throw InvalidInput.exception("Illegal spec update proposed");
+      }
+    };
+    SupervisorSpec spec2 = new TestSupervisorSpec("id1", supervisor2);
+    Map<String, SupervisorSpec> existingSpecs = ImmutableMap.of(
+        "id1", spec
+    );
+    
EasyMock.expect(metadataSupervisorManager.getLatest()).andReturn(existingSpecs);
+    supervisor1.start();
+    exception.expect(DruidException.class);
+    replayAll();
+    manager.start();
+    manager.shouldUpdateSupervisor(spec2);
+    verifyAll();
+  }
+
   @Test
   public void testStopAndRemoveSupervisorNotStarted()
   {
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java
index 2d0dc315281..e8773001407 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java
@@ -28,6 +28,8 @@ import org.apache.druid.data.input.impl.DimensionSchema;
 import org.apache.druid.data.input.impl.JsonInputFormat;
 import org.apache.druid.data.input.impl.StringDimensionSchema;
 import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.error.DruidExceptionMatcher;
 import org.apache.druid.indexer.granularity.UniformGranularitySpec;
 import org.apache.druid.indexing.common.task.Task;
 import org.apache.druid.indexing.overlord.DataSourceMetadata;
@@ -53,11 +55,13 @@ import 
org.apache.druid.indexing.seekablestream.supervisor.autoscaler.LagBasedAu
 import 
org.apache.druid.indexing.seekablestream.supervisor.autoscaler.LagBasedAutoScalerConfig;
 import 
org.apache.druid.indexing.seekablestream.supervisor.autoscaler.NoopTaskAutoScaler;
 import org.apache.druid.jackson.DefaultObjectMapper;
+import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.granularity.Granularities;
 import org.apache.druid.java.util.common.parsers.JSONPathSpec;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
 import org.apache.druid.java.util.metrics.DruidMonitorSchedulerConfig;
 import org.apache.druid.java.util.metrics.StubServiceEmitter;
+import org.apache.druid.metadata.TestSupervisorSpec;
 import org.apache.druid.query.DruidMetrics;
 import org.apache.druid.query.aggregation.CountAggregatorFactory;
 import org.apache.druid.segment.TestHelper;
@@ -65,6 +69,7 @@ import 
org.apache.druid.segment.incremental.RowIngestionMetersFactory;
 import org.apache.druid.segment.indexing.DataSchema;
 import org.easymock.EasyMock;
 import org.easymock.EasyMockSupport;
+import org.hamcrest.MatcherAssert;
 import org.joda.time.DateTime;
 import org.joda.time.Duration;
 import org.joda.time.Period;
@@ -84,6 +89,8 @@ import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.ScheduledExecutorService;
 
+import static org.junit.Assert.assertThrows;
+
 public class SeekableStreamSupervisorSpecTest extends EasyMockSupport
 {
   private SeekableStreamSupervisorIngestionSpec ingestionSchema;
@@ -1305,6 +1312,161 @@ public class SeekableStreamSupervisorSpecTest extends 
EasyMockSupport
     Assert.assertEquals("value", spec.getContextValue("key"));
   }
 
+  @Test
+  public void test_validateSpecUpdateTo_ShortCircuits()
+  {
+    mockIngestionSchema();
+    TestSeekableStreamSupervisorSpec originalSpec = new 
TestSeekableStreamSupervisorSpec(
+        ingestionSchema,
+        Map.of("key", "value"),
+        false,
+        taskStorage,
+        taskMaster,
+        indexerMetadataStorageCoordinator,
+        indexTaskClientFactory,
+        mapper,
+        emitter,
+        monitorSchedulerConfig,
+        rowIngestionMetersFactory,
+        supervisorStateManagerConfig,
+        supervisor4,
+        "id1"
+    );
+    TestSeekableStreamSupervisorSpec proposedSpec = new 
TestSeekableStreamSupervisorSpec(
+        ingestionSchema,
+        Map.of("key", "value"),
+        false,
+        taskStorage,
+        taskMaster,
+        indexerMetadataStorageCoordinator,
+        indexTaskClientFactory,
+        mapper,
+        emitter,
+        monitorSchedulerConfig,
+        rowIngestionMetersFactory,
+        supervisorStateManagerConfig,
+        supervisor4,
+        "id1"
+    );
+    MatcherAssert.assertThat(
+        assertThrows(DruidException.class, () -> originalSpec.validateSpecUpdateTo(proposedSpec)),
+        new DruidExceptionMatcher(
+            DruidException.Persona.USER,
+            DruidException.Category.INVALID_INPUT,
+            "invalidInput"
+        ).expectMessageIs(
+            "Cannot update supervisor spec since one or both of the specs have not provided an input source stream in the 'ioConfig'."
+        )
+    );
+
+
+    TestSupervisorSpec otherSpec = new TestSupervisorSpec("fake", new Object());
+    MatcherAssert.assertThat(
+        assertThrows(DruidException.class, () -> originalSpec.validateSpecUpdateTo(otherSpec)),
+        new DruidExceptionMatcher(
+            DruidException.Persona.USER,
+            DruidException.Category.INVALID_INPUT,
+            "invalidInput"
+        ).expectMessageIs(
+            StringUtils.format("Cannot update supervisor spec from type[%s] to type[%s]", proposedSpec.getClass().getSimpleName(), otherSpec.getClass().getSimpleName())
+        )
+    );
+  }
+
+  @Test
+  public void test_validateSpecUpdateTo_SourceStringComparisons()
+  {
+    mockIngestionSchema();
+    TestSeekableStreamSupervisorSpec originalSpec = new TestSeekableStreamSupervisorSpec(
+        ingestionSchema,
+        ImmutableMap.of("key", "value"),
+        false,
+        taskStorage,
+        taskMaster,
+        indexerMetadataStorageCoordinator,
+        indexTaskClientFactory,
+        mapper,
+        emitter,
+        monitorSchedulerConfig,
+        rowIngestionMetersFactory,
+        supervisorStateManagerConfig,
+        supervisor4,
+        "id1"
+    )
+    {
+      @Override
+      public String getSource()
+      {
+        return "source1";
+      }
+    };
+    TestSeekableStreamSupervisorSpec proposedSpecDiffSource = new TestSeekableStreamSupervisorSpec(
+        ingestionSchema,
+        ImmutableMap.of("key", "value"),
+        false,
+        taskStorage,
+        taskMaster,
+        indexerMetadataStorageCoordinator,
+        indexTaskClientFactory,
+        mapper,
+        emitter,
+        monitorSchedulerConfig,
+        rowIngestionMetersFactory,
+        supervisorStateManagerConfig,
+        supervisor4,
+        "id1"
+    )
+    {
+      @Override
+      public String getSource()
+      {
+        return "source2";
+      }
+    };
+    TestSeekableStreamSupervisorSpec proposedSpecSameSource = new TestSeekableStreamSupervisorSpec(
+        ingestionSchema,
+        ImmutableMap.of("key", "value"),
+        false,
+        taskStorage,
+        taskMaster,
+        indexerMetadataStorageCoordinator,
+        indexTaskClientFactory,
+        mapper,
+        emitter,
+        monitorSchedulerConfig,
+        rowIngestionMetersFactory,
+        supervisorStateManagerConfig,
+        supervisor4,
+        "id1"
+    )
+    {
+      @Override
+      public String getSource()
+      {
+        return "source1";
+      }
+    };
+
+    // Mismatched stream strings test
+    MatcherAssert.assertThat(
+        assertThrows(DruidException.class, () -> originalSpec.validateSpecUpdateTo(proposedSpecDiffSource)),
+        new DruidExceptionMatcher(
+            DruidException.Persona.USER,
+            DruidException.Category.INVALID_INPUT,
+            "invalidInput"
+        ).expectMessageIs(
+            "Update of the input source stream from [source1] to [source2] is not supported for a running supervisor."
+            + "\nTo perform the update safely, follow these steps:"
+            + "\n(1) Suspend this supervisor, reset its offsets and then terminate it. "
+            + "\n(2) Create a new supervisor with the new input source stream."
+            + "\nNote that doing the reset can cause data duplication or loss if any topic used in the old supervisor is included in the new one too."
+        )
+    );
+
+    // Happy path test
+    originalSpec.validateSpecUpdateTo(proposedSpecSameSource);
+  }
+
   private void mockIngestionSchema()
  {
    EasyMock.expect(ingestionSchema.getIOConfig()).andReturn(seekableStreamSupervisorIOConfig).anyTimes();
diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorSpec.java b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorSpec.java
index 70e5fbd534e..a1a4aaaae62 100644
--- a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorSpec.java
+++ b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorSpec.java
@@ -22,6 +22,7 @@ package org.apache.druid.indexing.overlord.supervisor;
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonSubTypes;
 import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import org.apache.druid.error.DruidException;
 import org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.UOE;
@@ -100,4 +101,19 @@ public interface SupervisorSpec
    * @return source like stream or topic name
    */
   String getSource();
+
+  /**
+   * Checks if a spec can be replaced with a proposed spec (proposedSpec).
+   * <p>
+   * By default, this method does no validation checks. Implementations of this method can choose to define rules
+   * for spec updates and throw an exception if the update is not allowed.
+   * </p>
+   *
+   * @param proposedSpec the proposed supervisor spec
+   * @throws DruidException if the spec update is not allowed
+   */
+  default void validateSpecUpdateTo(SupervisorSpec proposedSpec) throws DruidException
+  {
+    // The default implementation does not do any validation checks.
+  }
 }
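
The validation contract introduced by `validateSpecUpdateTo` above (reject type changes and input-source changes, allow same-source updates) can be summarized with a small standalone sketch. This is not part of the commit: the class name `StreamSpec` is illustrative, and `IllegalArgumentException` stands in for the `DruidException` the real code throws.

```java
// Illustrative sketch of the validateSpecUpdateTo contract: a spec update is
// rejected if the proposed spec is a different type or names a different
// input source stream. Names here are hypothetical, not Druid's actual code.
public class SpecUpdateValidationDemo
{
  static class StreamSpec
  {
    private final String source;

    StreamSpec(String source)
    {
      this.source = source;
    }

    String getSource()
    {
      return source;
    }

    // Mirrors the two checks exercised by the tests in the diff above.
    void validateSpecUpdateTo(StreamSpec proposedSpec)
    {
      if (!getClass().equals(proposedSpec.getClass())) {
        throw new IllegalArgumentException(
            "Cannot update supervisor spec from type[" + getClass().getSimpleName()
            + "] to type[" + proposedSpec.getClass().getSimpleName() + "]"
        );
      }
      if (!source.equals(proposedSpec.getSource())) {
        throw new IllegalArgumentException(
            "Update of the input source stream from [" + source
            + "] to [" + proposedSpec.getSource() + "] is not supported for a running supervisor."
        );
      }
    }
  }

  public static void main(String[] args)
  {
    StreamSpec original = new StreamSpec("source1");
    original.validateSpecUpdateTo(new StreamSpec("source1")); // same source: allowed

    boolean rejected = false;
    try {
      original.validateSpecUpdateTo(new StreamSpec("source2")); // different source: rejected
    } catch (IllegalArgumentException e) {
      rejected = true;
    }
    System.out.println("update to source2 rejected: " + rejected);
    // prints "update to source2 rejected: true"
  }
}
```

In the real interface the default implementation is a no-op, so existing `SupervisorSpec` implementations keep their old behavior and only seekable-stream specs (with a Kafka-specific override) enforce these rules.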


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

