This is an automated email from the ASF dual-hosted git repository.
karan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 1e87ec7aa91 Restrict input stream modification in existing seekable
stream supervisors (#17955)
1e87ec7aa91 is described below
commit 1e87ec7aa9151b3094ad432fbea80ce07e6f2260
Author: Lucas Capistrant <[email protected]>
AuthorDate: Fri May 9 02:14:26 2025 -0500
Restrict input stream modification in existing seekable stream supervisors
(#17955)
* Restrict kafka topic evolution in existing supervisors
* Refactor stream validation code based on review
* Provide default impl of validateSpecUpdateTo with an enhanced override
for Kafka
Since all existing implementations of the SeekableStreamSupervisor do not
support stream modification for existing supervisors, it makes sense to
centralize a default implementation of spec update validation. Kafka requires
an override because the notion of single- versus multi-topic ingestion is
unique to Kafka.
* Improve docs
* Improve test coverage
* Test improvements and general cleanup following review round
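
The centralized-default-with-override design described above can be sketched
as follows. This is a simplified, hypothetical model (names like `StreamSpec`
and `KafkaLikeSpec` are illustrative stand-ins for
`SeekableStreamSupervisorSpec` and `KafkaSupervisorSpec`): the base class
rejects any change to the source stream, and the Kafka-like subtype layers a
single- versus multi-topic check on top before delegating to `super`, since
`topic=metrics` and `topicPattern=metrics` share the same source string.

```java
// Hypothetical sketch of the validation hierarchy described in this commit.
public class SpecUpdateSketch {
    interface Spec {
        String getSource();

        // Mirrors SupervisorSpec's default: no validation at all.
        default void validateSpecUpdateTo(Spec proposed) {
        }
    }

    // Stand-in for SeekableStreamSupervisorSpec: the input stream is immutable.
    static class StreamSpec implements Spec {
        final String source;

        StreamSpec(String source) {
            this.source = source;
        }

        @Override
        public String getSource() {
            return source;
        }

        @Override
        public void validateSpecUpdateTo(Spec proposed) {
            if (!(proposed instanceof StreamSpec)) {
                throw new IllegalArgumentException("Cannot change spec type");
            }
            if (!source.equals(proposed.getSource())) {
                throw new IllegalArgumentException(
                    "Cannot change input stream from " + source + " to " + proposed.getSource());
            }
        }
    }

    // Stand-in for KafkaSupervisorSpec: also forbids flipping single/multi topic.
    static class KafkaLikeSpec extends StreamSpec {
        final boolean multiTopic;

        KafkaLikeSpec(String source, boolean multiTopic) {
            super(source);
            this.multiTopic = multiTopic;
        }

        @Override
        public void validateSpecUpdateTo(Spec proposed) {
            // The source string alone cannot distinguish topic from topicPattern,
            // so compare the single/multi flag before the base-class check.
            if (proposed instanceof KafkaLikeSpec
                    && ((KafkaLikeSpec) proposed).multiTopic != multiTopic) {
                throw new IllegalArgumentException("Cannot switch between single and multi topic");
            }
            super.validateSpecUpdateTo(proposed);
        }
    }

    static boolean allowed(Spec current, Spec proposed) {
        try {
            current.validateSpecUpdateTo(proposed);
            return true;
        } catch (IllegalArgumentException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        KafkaLikeSpec current = new KafkaLikeSpec("metrics", false);
        System.out.println(allowed(current, new KafkaLikeSpec("metrics", false)));    // true
        System.out.println(allowed(current, new KafkaLikeSpec("metrics", true)));     // false
        System.out.println(allowed(current, new KafkaLikeSpec("metricsNew", false))); // false
    }
}
```
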
---
docs/ingestion/kafka-ingestion.md | 12 +-
extensions-core/kafka-indexing-service/pom.xml | 5 +
.../kafka/supervisor/KafkaSupervisorSpec.java | 33 ++++
.../kafka/supervisor/KafkaSupervisorSpecTest.java | 185 +++++++++++++++++++++
.../overlord/supervisor/SupervisorManager.java | 20 ++-
.../supervisor/SeekableStreamSupervisorSpec.java | 36 ++++
.../OverlordSecurityResourceFilterTest.java | 1 +
.../overlord/supervisor/SupervisorManagerTest.java | 61 +++++++
.../SeekableStreamSupervisorSpecTest.java | 162 ++++++++++++++++++
.../overlord/supervisor/SupervisorSpec.java | 16 ++
10 files changed, 527 insertions(+), 4 deletions(-)
diff --git a/docs/ingestion/kafka-ingestion.md
b/docs/ingestion/kafka-ingestion.md
index b7eccbf0151..67b1a96a3d7 100644
--- a/docs/ingestion/kafka-ingestion.md
+++ b/docs/ingestion/kafka-ingestion.md
@@ -120,7 +120,7 @@ For configuration properties shared across all streaming
ingestion methods, refe
|Property|Type|Description|Required|Default|
|--------|----|-----------|--------|-------|
-|`topic`|String|The Kafka topic to read from. To ingest data from multiple
topic, use `topicPattern`. |Yes if `topicPattern` isn't set.||
+|`topic`|String|The Kafka topic to read from. Note that once this value is
established for a supervisor, updating it is not supported. To ingest data from
multiple topics, use `topicPattern`. |Yes if `topicPattern` isn't set.||
|`topicPattern`|String|Multiple Kafka topics to read from, passed as a regex
pattern. See [Ingest from multiple topics](#ingest-from-multiple-topics) for
more information.|Yes if `topic` isn't set.||
|`consumerProperties`|String, Object|A map of properties to pass to the Kafka
consumer. See [Consumer properties](#consumer-properties) for details.|Yes. At
the minimum, you must set the `bootstrap.servers` property to establish the
initial connection to the Kafka cluster.||
|`pollTimeout`|Long|The length of time to wait for the Kafka consumer to poll
records, in milliseconds.|No|100|
@@ -134,6 +134,14 @@ If you enable multi-topic ingestion for a datasource,
downgrading to a version o
28.0.0 will cause the ingestion for that datasource to fail.
:::
+:::info
+Migrating an existing supervisor to use `topicPattern` instead of `topic` is
not supported, nor is changing the `topicPattern` of an existing supervisor to
a different regex pattern.
+You can force the migration by doing the following:
+1. Suspend the supervisor.
+2. Reset the offsets.
+3. Submit the updated supervisor.
+:::
+
You can ingest data from one or multiple topics.
When ingesting data from multiple topics, Druid assigns partitions based on
the hashcode of the topic name and the ID of the partition within that topic.
The partition assignment might not be uniform across all the tasks. Druid
assumes that partitions across individual topics have similar load. If you want
to ingest from both high and low load topics in the same supervisor, it is
recommended that you have a higher number of partitions for a high load topic
and a lower number of partition [...]
@@ -456,4 +464,4 @@ See the following topics for more information:
* [Supervisor API](../api-reference/supervisor-api.md) for how to manage and
monitor supervisors using the API.
* [Supervisor](../ingestion/supervisor.md) for supervisor status and capacity
planning.
* [Loading from Apache Kafka](../tutorials/tutorial-kafka.md) for a tutorial
on streaming data from Apache Kafka.
-* [Kafka input format](../ingestion/data-formats.md#kafka) to learn about the
`kafka` input format.
\ No newline at end of file
+* [Kafka input format](../ingestion/data-formats.md#kafka) to learn about the
`kafka` input format.
diff --git a/extensions-core/kafka-indexing-service/pom.xml
b/extensions-core/kafka-indexing-service/pom.xml
index 761d7be1cca..0dc45401a98 100644
--- a/extensions-core/kafka-indexing-service/pom.xml
+++ b/extensions-core/kafka-indexing-service/pom.xml
@@ -113,6 +113,11 @@
<artifactId>hamcrest-core</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.hamcrest</groupId>
+ <artifactId>hamcrest-all</artifactId>
+ <scope>test</scope>
+ </dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
diff --git
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java
index 5d84eeed1f8..13b3972f2ec 100644
---
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java
+++
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java
@@ -24,14 +24,18 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.error.InvalidInput;
import org.apache.druid.guice.annotations.Json;
import org.apache.druid.indexing.kafka.KafkaIndexTaskClientFactory;
import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import org.apache.druid.indexing.overlord.TaskMaster;
import org.apache.druid.indexing.overlord.TaskStorage;
import org.apache.druid.indexing.overlord.supervisor.Supervisor;
+import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
import
org.apache.druid.indexing.overlord.supervisor.SupervisorStateManagerConfig;
import
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorSpec;
+import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.metrics.DruidMonitorSchedulerConfig;
import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
@@ -176,6 +180,35 @@ public class KafkaSupervisorSpec extends
SeekableStreamSupervisorSpec
);
}
+ /**
+ * Extends {@link SeekableStreamSupervisorSpec#validateSpecUpdateTo} to
ensure that the proposed spec and current spec are either both multi-topic or
both single-topic.
+ * <p>
+ * getSource() returns the same string (exampleTopic) for
"topicPattern=exampleTopic" and "topic=exampleTopic".
+ * This override prevents this case from being considered a valid update.
+ * </p>
+ * @param proposedSpec the proposed supervisor spec
+ * @throws DruidException if the proposed spec is not a Kafka spec or if the
proposed spec changes from multi-topic to single-topic or vice versa
+ */
+ @Override
+ public void validateSpecUpdateTo(SupervisorSpec proposedSpec) throws
DruidException
+ {
+ if (!(proposedSpec instanceof KafkaSupervisorSpec)) {
+ throw InvalidInput.exception(
+ "Cannot change spec from type[%s] to type[%s]",
getClass().getSimpleName(), proposedSpec.getClass().getSimpleName()
+ );
+ }
+ KafkaSupervisorSpec other = (KafkaSupervisorSpec) proposedSpec;
+ if (this.getSpec().getIOConfig().isMultiTopic() !=
other.getSpec().getIOConfig().isMultiTopic()) {
+ throw InvalidInput.exception(
+
SeekableStreamSupervisorSpec.ILLEGAL_INPUT_SOURCE_UPDATE_ERROR_MESSAGE,
+ StringUtils.format("(%s) %s",
this.getSpec().getIOConfig().isMultiTopic() ? "multi-topic" : "single-topic",
this.getSource()),
+ StringUtils.format("(%s) %s",
other.getSpec().getIOConfig().isMultiTopic() ? "multi-topic" : "single-topic",
other.getSource())
+ );
+ }
+
+ super.validateSpecUpdateTo(proposedSpec);
+ }
+
@Override
public String toString()
{
diff --git
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpecTest.java
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpecTest.java
index f8feeae9be7..03c5820be0b 100644
---
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpecTest.java
+++
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpecTest.java
@@ -22,6 +22,10 @@ package org.apache.druid.indexing.kafka.supervisor;
import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.druid.data.input.impl.JsonInputFormat;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.error.DruidExceptionMatcher;
+import org.apache.druid.indexer.granularity.UniformGranularitySpec;
import org.apache.druid.indexing.kafka.KafkaIndexTaskClientFactory;
import org.apache.druid.indexing.kafka.KafkaIndexTaskModule;
import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
@@ -29,16 +33,26 @@ import org.apache.druid.indexing.overlord.TaskMaster;
import org.apache.druid.indexing.overlord.TaskStorage;
import
org.apache.druid.indexing.overlord.supervisor.SupervisorStateManagerConfig;
import org.apache.druid.jackson.DefaultObjectMapper;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.common.parsers.JSONPathSpec;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.metrics.DruidMonitorSchedulerConfig;
import org.apache.druid.math.expr.ExprMacroTable;
+import org.apache.druid.metadata.TestSupervisorSpec;
+import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.expression.LookupEnabledTestExprMacroTable;
import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
+import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.server.metrics.NoopServiceEmitter;
+import org.hamcrest.MatcherAssert;
import org.junit.Assert;
import org.junit.Test;
import java.io.IOException;
+import java.util.Map;
+
+import static org.junit.Assert.assertThrows;
public class KafkaSupervisorSpecTest
{
@@ -578,4 +592,175 @@ public class KafkaSupervisorSpecTest
Assert.assertFalse(runningSpec.isSuspended());
}
+
+ @Test
+ public void test_validateSpecUpdateTo()
+ {
+ KafkaSupervisorSpec sourceSpec = getSpec("metrics", null);
+
+ // Proposed spec being non-kafka is not allowed
+ TestSupervisorSpec otherSpec = new TestSupervisorSpec("test", new
Object());
+ MatcherAssert.assertThat(
+ assertThrows(DruidException.class, () ->
sourceSpec.validateSpecUpdateTo(otherSpec)),
+ new DruidExceptionMatcher(
+ DruidException.Persona.USER,
+ DruidException.Category.INVALID_INPUT,
+ "invalidInput"
+ ).expectMessageIs(
+ StringUtils.format("Cannot change spec from type[%s] to type[%s]",
sourceSpec.getClass().getSimpleName(), otherSpec.getClass().getSimpleName())
+ )
+ );
+
+ KafkaSupervisorSpec multiTopicProposedSpec = getSpec(null, "metrics-.*");
+ MatcherAssert.assertThat(
+ assertThrows(DruidException.class, () ->
sourceSpec.validateSpecUpdateTo(multiTopicProposedSpec)),
+ new DruidExceptionMatcher(
+ DruidException.Persona.USER,
+ DruidException.Category.INVALID_INPUT,
+ "invalidInput"
+ ).expectMessageIs(
+ "Update of the input source stream from [(single-topic) metrics]
to [(multi-topic) metrics-.*] is not supported for a running supervisor."
+ + "\nTo perform the update safely, follow these steps:"
+ + "\n(1) Suspend this supervisor, reset its offsets and then
terminate it. "
+ + "\n(2) Create a new supervisor with the new input source
stream."
+ + "\nNote that doing the reset can cause data duplication or loss
if any topic used in the old supervisor is included in the new one too."
+ )
+ );
+
+ KafkaSupervisorSpec singleTopicNewStreamProposedSpec =
getSpec("metricsNew", null);
+ MatcherAssert.assertThat(
+ assertThrows(DruidException.class, () ->
sourceSpec.validateSpecUpdateTo(singleTopicNewStreamProposedSpec)),
+ new DruidExceptionMatcher(
+ DruidException.Persona.USER,
+ DruidException.Category.INVALID_INPUT,
+ "invalidInput"
+ ).expectMessageIs(
+ "Update of the input source stream from [metrics] to [metricsNew]
is not supported for a running supervisor."
+ + "\nTo perform the update safely, follow these steps:"
+ + "\n(1) Suspend this supervisor, reset its offsets and then
terminate it. "
+ + "\n(2) Create a new supervisor with the new input source stream."
+ + "\nNote that doing the reset can cause data duplication or loss
if any topic used in the old supervisor is included in the new one too."
+ )
+ );
+
+ KafkaSupervisorSpec multiTopicMatchingSourceString = getSpec(null,
"metrics");
+ MatcherAssert.assertThat(
+ assertThrows(DruidException.class, () ->
sourceSpec.validateSpecUpdateTo(multiTopicMatchingSourceString)),
+ new DruidExceptionMatcher(
+ DruidException.Persona.USER,
+ DruidException.Category.INVALID_INPUT,
+ "invalidInput"
+ ).expectMessageIs(
+ "Update of the input source stream from [(single-topic) metrics]
to [(multi-topic) metrics] is not supported for a running supervisor."
+ + "\nTo perform the update safely, follow these steps:"
+ + "\n(1) Suspend this supervisor, reset its offsets and then
terminate it. "
+ + "\n(2) Create a new supervisor with the new input source stream."
+ + "\nNote that doing the reset can cause data duplication or loss
if any topic used in the old supervisor is included in the new one too."
+ )
+ );
+
+ // test the inverse as well
+ MatcherAssert.assertThat(
+ assertThrows(DruidException.class, () ->
multiTopicMatchingSourceString.validateSpecUpdateTo(sourceSpec)),
+ new DruidExceptionMatcher(
+ DruidException.Persona.USER,
+ DruidException.Category.INVALID_INPUT,
+ "invalidInput"
+ ).expectMessageIs(
+ "Update of the input source stream from [(multi-topic) metrics] to
[(single-topic) metrics] is not supported for a running supervisor."
+ + "\nTo perform the update safely, follow these steps:"
+ + "\n(1) Suspend this supervisor, reset its offsets and then
terminate it. "
+ + "\n(2) Create a new supervisor with the new input source stream."
+ + "\nNote that doing the reset can cause data duplication or loss
if any topic used in the old supervisor is included in the new one too."
+ )
+ );
+
+ // Test valid spec update. This spec changes context vs the sourceSpec
+ KafkaSupervisorSpec validDestSpec = new KafkaSupervisorSpec(
+ null,
+ DataSchema.builder().withDataSource("testDs").withAggregators(new
CountAggregatorFactory("rows")).withGranularity(new
UniformGranularitySpec(Granularities.DAY, Granularities.NONE, null)).build(),
+ null,
+ new KafkaSupervisorIOConfig(
+ "metrics",
+ null,
+ new JsonInputFormat(JSONPathSpec.DEFAULT, null, null, null, null),
+ null,
+ null,
+ null,
+ Map.of("bootstrap.servers", "localhost:9092"),
+ null,
+ null,
+ null,
+ null,
+ true,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ false
+ ),
+ Map.of(
+ "key1",
+ "value1",
+ "key2",
+ "value2"
+ ),
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null
+ );
+ sourceSpec.validateSpecUpdateTo(validDestSpec);
+ }
+
+ private KafkaSupervisorSpec getSpec(String topic, String topicPattern)
+ {
+ return new KafkaSupervisorSpec(
+ null,
+ DataSchema.builder().withDataSource("testDs").withAggregators(new
CountAggregatorFactory("rows")).withGranularity(new
UniformGranularitySpec(Granularities.DAY, Granularities.NONE, null)).build(),
+ null,
+ new KafkaSupervisorIOConfig(
+ topic,
+ topicPattern,
+ new JsonInputFormat(JSONPathSpec.DEFAULT, null, null, null, null),
+ null,
+ null,
+ null,
+ Map.of("bootstrap.servers", "localhost:9092"),
+ null,
+ null,
+ null,
+ null,
+ true,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ false
+ ),
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null
+ );
+ }
}
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java
index 85b6d392550..0ada4679a63 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java
@@ -197,8 +197,15 @@ public class SupervisorManager
try {
byte[] specAsBytes = jsonMapper.writeValueAsBytes(spec);
Pair<Supervisor, SupervisorSpec> currentSupervisor =
supervisors.get(spec.getId());
- return currentSupervisor == null
- || !Arrays.equals(specAsBytes,
jsonMapper.writeValueAsBytes(currentSupervisor.rhs));
+ if (currentSupervisor == null || currentSupervisor.rhs == null) {
+ return true;
+ } else if (Arrays.equals(specAsBytes,
jsonMapper.writeValueAsBytes(currentSupervisor.rhs))) {
+ return false;
+ } else {
+ // The spec bytes are different, so we need to check if the update
is allowed
+ currentSupervisor.rhs.validateSpecUpdateTo(spec);
+ return true;
+ }
}
catch (JsonProcessingException ex) {
log.warn("Failed to write spec as bytes for spec_id[%s]",
spec.getId());
@@ -529,4 +536,13 @@ public class SupervisorManager
);
}
}
+
+ @Nullable
+ private SupervisorSpec getSpec(String id)
+ {
+ synchronized (lock) {
+ Pair<Supervisor, SupervisorSpec> supervisor = supervisors.get(id);
+ return supervisor == null ? null : supervisor.rhs;
+ }
+ }
}
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java
index 7b5f46195e7..ba0012b29c9 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java
@@ -23,6 +23,8 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.error.InvalidInput;
import org.apache.druid.guice.annotations.Json;
import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import org.apache.druid.indexing.overlord.TaskMaster;
@@ -45,6 +47,11 @@ import java.util.Map;
public abstract class SeekableStreamSupervisorSpec implements SupervisorSpec
{
+ protected static final String ILLEGAL_INPUT_SOURCE_UPDATE_ERROR_MESSAGE =
"Update of the input source stream from [%s] to [%s] is not supported for a
running supervisor."
+ + "%nTo
perform the update safely, follow these steps:"
+ + "%n(1)
Suspend this supervisor, reset its offsets and then terminate it. "
+ + "%n(2)
Create a new supervisor with the new input source stream."
+ + "%nNote
that doing the reset can cause data duplication or loss if any topic used in
the old supervisor is included in the new one too.";
private static SeekableStreamSupervisorIngestionSpec checkIngestionSchema(
SeekableStreamSupervisorIngestionSpec ingestionSchema
@@ -202,6 +209,35 @@ public abstract class SeekableStreamSupervisorSpec
implements SupervisorSpec
return suspended;
}
+ /**
+ * Default implementation that prevents unsupported evolution of the
supervisor spec
+ * <ul>
+ * <li>You cannot migrate between types of supervisors.</li>
+ * <li>You cannot change the input source stream of a running
supervisor.</li>
+ * </ul>
+ * @param proposedSpec the proposed supervisor spec
+ * @throws DruidException if the proposed spec update is not allowed
+ */
+ @Override
+ public void validateSpecUpdateTo(SupervisorSpec proposedSpec) throws
DruidException
+ {
+ if (!(proposedSpec instanceof SeekableStreamSupervisorSpec)) {
+ throw InvalidInput.exception(
+ "Cannot update supervisor spec from type[%s] to type[%s]",
getClass().getSimpleName(), proposedSpec.getClass().getSimpleName()
+ );
+ }
+ SeekableStreamSupervisorSpec other = (SeekableStreamSupervisorSpec)
proposedSpec;
+ if (this.getSource() == null || other.getSource() == null) {
+ // Not likely to happen, but covering just in case.
+ throw InvalidInput.exception("Cannot update supervisor spec since one or
both of "
+ + "the specs have not provided an input
source stream in the 'ioConfig'.");
+ }
+
+ if (!this.getSource().equals(other.getSource())) {
+ throw InvalidInput.exception(ILLEGAL_INPUT_SOURCE_UPDATE_ERROR_MESSAGE,
this.getSource(), other.getSource());
+ }
+ }
+
protected abstract SeekableStreamSupervisorSpec toggleSuspend(boolean
suspend);
}
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/security/OverlordSecurityResourceFilterTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/security/OverlordSecurityResourceFilterTest.java
index a25c6aec621..2912e1da02e 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/security/OverlordSecurityResourceFilterTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/security/OverlordSecurityResourceFilterTest.java
@@ -169,6 +169,7 @@ public class OverlordSecurityResourceFilterTest extends
ResourceFilterTestHelper
{
return null;
}
+
};
EasyMock.expect(supervisorManager.getSupervisorSpec(EasyMock.anyString()))
.andReturn(Optional.of(supervisorSpec))
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java
index e7276c6aead..d9afd136654 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java
@@ -27,6 +27,7 @@ import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.SettableFuture;
import org.apache.druid.error.DruidException;
import org.apache.druid.error.DruidExceptionMatcher;
+import org.apache.druid.error.InvalidInput;
import org.apache.druid.indexing.common.TaskLockType;
import org.apache.druid.indexing.common.task.Tasks;
import org.apache.druid.indexing.overlord.DataSourceMetadata;
@@ -145,6 +146,42 @@ public class SupervisorManagerTest extends EasyMockSupport
Assert.assertTrue(manager.getSupervisorIds().isEmpty());
}
+ @Test
+ public void testCreateOrUpdateAndStartSupervisorIllegalEvolution()
+ {
+ SupervisorSpec spec = new TestSupervisorSpec("id1", supervisor1)
+ {
+ @Override
+ public void validateSpecUpdateTo(SupervisorSpec proposedSpec) throws
DruidException
+ {
+ throw InvalidInput.exception("Illegal spec update proposed");
+ }
+ };
+ SupervisorSpec spec2 = new TestSupervisorSpec("id1", supervisor2);
+
+ Assert.assertTrue(manager.getSupervisorIds().isEmpty());
+
+
EasyMock.expect(metadataSupervisorManager.getLatest()).andReturn(ImmutableMap.of());
+ metadataSupervisorManager.insert("id1", spec);
+ supervisor1.start();
+ replayAll();
+
+ manager.start();
+ Assert.assertEquals(0, manager.getSupervisorIds().size());
+
+ manager.createOrUpdateAndStartSupervisor(spec);
+ Assert.assertEquals(1, manager.getSupervisorIds().size());
+ Assert.assertEquals(spec, manager.getSupervisorSpec("id1").get());
+ verifyAll();
+
+ resetAll();
+ exception.expect(DruidException.class);
+ replayAll();
+
+ manager.createOrUpdateAndStartSupervisor(spec2);
+ verifyAll();
+ }
+
@Test
public void testCreateOrUpdateAndStartSupervisorNotStarted()
{
@@ -195,6 +232,30 @@ public class SupervisorManagerTest extends EasyMockSupport
Assert.assertTrue(manager.shouldUpdateSupervisor(new
NoopSupervisorSpec("id1", null)));
}
+ @Test
+ public void testShouldUpdateSupervisorIllegalEvolution()
+ {
+ SupervisorSpec spec = new TestSupervisorSpec("id1", supervisor1)
+ {
+ @Override
+ public void validateSpecUpdateTo(SupervisorSpec proposedSpec) throws
DruidException
+ {
+ throw InvalidInput.exception("Illegal spec update proposed");
+ }
+ };
+ SupervisorSpec spec2 = new TestSupervisorSpec("id1", supervisor2);
+ Map<String, SupervisorSpec> existingSpecs = ImmutableMap.of(
+ "id1", spec
+ );
+
EasyMock.expect(metadataSupervisorManager.getLatest()).andReturn(existingSpecs);
+ supervisor1.start();
+ exception.expect(DruidException.class);
+ replayAll();
+ manager.start();
+ manager.shouldUpdateSupervisor(spec2);
+ verifyAll();
+ }
+
@Test
public void testStopAndRemoveSupervisorNotStarted()
{
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java
index 2d0dc315281..e8773001407 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java
@@ -28,6 +28,8 @@ import org.apache.druid.data.input.impl.DimensionSchema;
import org.apache.druid.data.input.impl.JsonInputFormat;
import org.apache.druid.data.input.impl.StringDimensionSchema;
import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.error.DruidExceptionMatcher;
import org.apache.druid.indexer.granularity.UniformGranularitySpec;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.DataSourceMetadata;
@@ -53,11 +55,13 @@ import
org.apache.druid.indexing.seekablestream.supervisor.autoscaler.LagBasedAu
import
org.apache.druid.indexing.seekablestream.supervisor.autoscaler.LagBasedAutoScalerConfig;
import
org.apache.druid.indexing.seekablestream.supervisor.autoscaler.NoopTaskAutoScaler;
import org.apache.druid.jackson.DefaultObjectMapper;
+import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.parsers.JSONPathSpec;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.metrics.DruidMonitorSchedulerConfig;
import org.apache.druid.java.util.metrics.StubServiceEmitter;
+import org.apache.druid.metadata.TestSupervisorSpec;
import org.apache.druid.query.DruidMetrics;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.segment.TestHelper;
@@ -65,6 +69,7 @@ import
org.apache.druid.segment.incremental.RowIngestionMetersFactory;
import org.apache.druid.segment.indexing.DataSchema;
import org.easymock.EasyMock;
import org.easymock.EasyMockSupport;
+import org.hamcrest.MatcherAssert;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.joda.time.Period;
@@ -84,6 +89,8 @@ import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ScheduledExecutorService;
+import static org.junit.Assert.assertThrows;
+
public class SeekableStreamSupervisorSpecTest extends EasyMockSupport
{
private SeekableStreamSupervisorIngestionSpec ingestionSchema;
@@ -1305,6 +1312,161 @@ public class SeekableStreamSupervisorSpecTest extends
EasyMockSupport
Assert.assertEquals("value", spec.getContextValue("key"));
}
+ @Test
+ public void test_validateSpecUpdateTo_ShortCircuits()
+ {
+ mockIngestionSchema();
+ TestSeekableStreamSupervisorSpec originalSpec = new
TestSeekableStreamSupervisorSpec(
+ ingestionSchema,
+ Map.of("key", "value"),
+ false,
+ taskStorage,
+ taskMaster,
+ indexerMetadataStorageCoordinator,
+ indexTaskClientFactory,
+ mapper,
+ emitter,
+ monitorSchedulerConfig,
+ rowIngestionMetersFactory,
+ supervisorStateManagerConfig,
+ supervisor4,
+ "id1"
+ );
+ TestSeekableStreamSupervisorSpec proposedSpec = new
TestSeekableStreamSupervisorSpec(
+ ingestionSchema,
+ Map.of("key", "value"),
+ false,
+ taskStorage,
+ taskMaster,
+ indexerMetadataStorageCoordinator,
+ indexTaskClientFactory,
+ mapper,
+ emitter,
+ monitorSchedulerConfig,
+ rowIngestionMetersFactory,
+ supervisorStateManagerConfig,
+ supervisor4,
+ "id1"
+ );
+ MatcherAssert.assertThat(
+ assertThrows(DruidException.class, () ->
originalSpec.validateSpecUpdateTo(proposedSpec)),
+ new DruidExceptionMatcher(
+ DruidException.Persona.USER,
+ DruidException.Category.INVALID_INPUT,
+ "invalidInput"
+ ).expectMessageIs(
+ "Cannot update supervisor spec since one or both of the specs have
not provided an input source stream in the 'ioConfig'."
+ )
+ );
+
+
+ TestSupervisorSpec otherSpec = new TestSupervisorSpec("fake", new
Object());
+ MatcherAssert.assertThat(
+ assertThrows(DruidException.class, () ->
originalSpec.validateSpecUpdateTo(otherSpec)),
+ new DruidExceptionMatcher(
+ DruidException.Persona.USER,
+ DruidException.Category.INVALID_INPUT,
+ "invalidInput"
+ ).expectMessageIs(
+ StringUtils.format("Cannot update supervisor spec from type[%s] to
type[%s]", proposedSpec.getClass().getSimpleName(),
otherSpec.getClass().getSimpleName())
+ )
+ );
+ }
+
+ @Test
+ public void test_validateSpecUpdateTo_SourceStringComparisons()
+ {
+ mockIngestionSchema();
+ TestSeekableStreamSupervisorSpec originalSpec = new
TestSeekableStreamSupervisorSpec(
+ ingestionSchema,
+ ImmutableMap.of("key", "value"),
+ false,
+ taskStorage,
+ taskMaster,
+ indexerMetadataStorageCoordinator,
+ indexTaskClientFactory,
+ mapper,
+ emitter,
+ monitorSchedulerConfig,
+ rowIngestionMetersFactory,
+ supervisorStateManagerConfig,
+ supervisor4,
+ "id1"
+ )
+ {
+ @Override
+ public String getSource()
+ {
+ return "source1";
+ }
+ };
+ TestSeekableStreamSupervisorSpec proposedSpecDiffSource = new
TestSeekableStreamSupervisorSpec(
+ ingestionSchema,
+ ImmutableMap.of("key", "value"),
+ false,
+ taskStorage,
+ taskMaster,
+ indexerMetadataStorageCoordinator,
+ indexTaskClientFactory,
+ mapper,
+ emitter,
+ monitorSchedulerConfig,
+ rowIngestionMetersFactory,
+ supervisorStateManagerConfig,
+ supervisor4,
+ "id1"
+ )
+ {
+ @Override
+ public String getSource()
+ {
+ return "source2";
+ }
+ };
+ TestSeekableStreamSupervisorSpec proposedSpecSameSource = new
TestSeekableStreamSupervisorSpec(
+ ingestionSchema,
+ ImmutableMap.of("key", "value"),
+ false,
+ taskStorage,
+ taskMaster,
+ indexerMetadataStorageCoordinator,
+ indexTaskClientFactory,
+ mapper,
+ emitter,
+ monitorSchedulerConfig,
+ rowIngestionMetersFactory,
+ supervisorStateManagerConfig,
+ supervisor4,
+ "id1"
+ )
+ {
+ @Override
+ public String getSource()
+ {
+ return "source1";
+ }
+ };
+
+    // Mismatched stream strings test
+ MatcherAssert.assertThat(
+ assertThrows(DruidException.class, () ->
originalSpec.validateSpecUpdateTo(proposedSpecDiffSource)),
+ new DruidExceptionMatcher(
+ DruidException.Persona.USER,
+ DruidException.Category.INVALID_INPUT,
+ "invalidInput"
+ ).expectMessageIs(
+ "Update of the input source stream from [source1] to [source2] is
not supported for a running supervisor."
+ + "\nTo perform the update safely, follow these steps:"
+ + "\n(1) Suspend this supervisor, reset its offsets and then
terminate it. "
+ + "\n(2) Create a new supervisor with the new input source stream."
+ + "\nNote that doing the reset can cause data duplication or loss
if any topic used in the old supervisor is included in the new one too."
+ )
+ );
+
+ // Happy path test
+ originalSpec.validateSpecUpdateTo(proposedSpecSameSource);
+ }
+
private void mockIngestionSchema()
{
EasyMock.expect(ingestionSchema.getIOConfig()).andReturn(seekableStreamSupervisorIOConfig).anyTimes();
diff --git
a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorSpec.java
b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorSpec.java
index 70e5fbd534e..a1a4aaaae62 100644
---
a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorSpec.java
+++
b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorSpec.java
@@ -22,6 +22,7 @@ package org.apache.druid.indexing.overlord.supervisor;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import org.apache.druid.error.DruidException;
import
org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.UOE;
@@ -100,4 +101,19 @@ public interface SupervisorSpec
* @return source like stream or topic name
*/
String getSource();
+
+ /**
+   * Checks if a spec can be replaced with a proposed spec (proposedSpec).
+ * <p>
+ * By default, this method does no validation checks. Implementations of
this method can choose to define rules
+ * for spec updates and throw an exception if the update is not allowed.
+ * </p>
+ *
+ * @param proposedSpec the proposed supervisor spec
+ * @throws DruidException if the spec update is not allowed
+ */
+ default void validateSpecUpdateTo(SupervisorSpec proposedSpec) throws
DruidException
+ {
+ // The default implementation does not do any validation checks.
+ }
}
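
The decision flow added to `SupervisorManager.shouldUpdateSupervisor` above
can be sketched as follows. This is a simplified, hypothetical model in which
`Spec.serialize()` stands in for `jsonMapper.writeValueAsBytes`: a proposed
spec is a no-op when it serializes identically to the running one; otherwise
the running spec is asked to validate the update before it is accepted.

```java
// Hypothetical sketch of the update-acceptance check in SupervisorManager.
public class ShouldUpdateSketch {
    interface Spec {
        // Stand-in for jsonMapper.writeValueAsBytes(spec).
        String serialize();

        // Default: any update is allowed; subtypes may throw to reject.
        default void validateSpecUpdateTo(Spec proposed) {
        }
    }

    static boolean shouldUpdate(Spec current, Spec proposed) {
        if (current == null) {
            return true;   // no supervisor running yet: always accept
        }
        if (current.serialize().equals(proposed.serialize())) {
            return false;  // identical spec: nothing to do
        }
        // The specs differ, so ask the running spec whether the update
        // is allowed; this may throw on illegal updates.
        current.validateSpecUpdateTo(proposed);
        return true;
    }

    public static void main(String[] args) {
        Spec running = () -> "{\"topic\":\"metrics\"}";
        System.out.println(shouldUpdate(null, running));                            // true
        System.out.println(shouldUpdate(running, () -> "{\"topic\":\"metrics\"}")); // false
    }
}
```
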
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]