kfaraz commented on code in PR #17955:
URL: https://github.com/apache/druid/pull/17955#discussion_r2079542083
##########
indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java:
##########
@@ -1412,4 +1545,38 @@ private static Map<String, Object> getScaleInProperties()
return autoScalerConfig;
}
+ private static class OtherSupervisorSpec implements SupervisorSpec
Review Comment:
There is already a `TestSupervisorSpec` class that you may be able to use.
##########
server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorSpec.java:
##########
@@ -100,4 +101,19 @@ default Set<ResourceAction> getInputSourceResources()
throws UnsupportedOperatio
* @return source like stream or topic name
*/
String getSource();
+
+ /**
+ * Checks if a spec can be replaced with a proposed spec (proposesSpec).
+ * <p>
+ * By default, this method does no validation checks. Implementations of
this method can choose to define rules
Review Comment:
```suggestion
* By default, this method does no validation checks. Implementations of
this method can choose to define rules
```
##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java:
##########
@@ -202,6 +205,55 @@ public boolean isSuspended()
return suspended;
}
+ /**
+ * Default implementation that prevents unsupported evolution of the
supervisor spec
+ * <ul>
+ * <li>You cannot migrate between types of supervisors.</li>
+ * <li>You cannot change the input source stream of a running
supervisor.</li>
+ * </ul>
+ * @param proposedSpec the proposed supervisor spec
+ * @throws DruidException if the proposed spec update is not allowed
+ */
+ @Override
+ public void validateSpecUpdateTo(SupervisorSpec proposedSpec) throws
DruidException
+ {
+ if (!(proposedSpec instanceof SeekableStreamSupervisorSpec)) {
+ throw InvalidInput.exception(
+ StringUtils.format("Cannot evolve to [%s] from [%s]",
getClass().getName(), proposedSpec.getClass().getName())
+ );
+ }
+ SeekableStreamSupervisorSpec other = (SeekableStreamSupervisorSpec)
proposedSpec;
+ if (this.getSource() == null || other.getSource() == null) {
+ // Not likely to happen, but covering just in case.
+ throw InvalidInput.exception("Cannot consider
SeekableStreamSupervisorSpec evolution when one or both of "
+ + "the specs have not provided an input
source stream in the IOConfig.");
+ }
+
+ if (!this.getSource().equals(other.getSource())) {
+ throw
InvalidInput.exception(getIllegalInputSourceUpdateErrorMessage(this.getSource(),
other.getSource()));
+ }
+ }
+
protected abstract SeekableStreamSupervisorSpec toggleSuspend(boolean
suspend);
+ /**
+ * Returns an error message for illegal update of the input source stream.
+ * <p>
+ * This is a reasonable default message, but subclasses may override it to
provide more domain specific terminology.
+ * </p>
+ * @param existingSource The existing input source stream
+ * @param proposedSource The proposed input source stream
+ * @return A formatted error message
+ */
+ protected String getIllegalInputSourceUpdateErrorMessage(String
existingSource, String proposedSource)
Review Comment:
This method shouldn't be a part of the contract of this interface.
I would suggest just making the string a constant and formatting it when
needed.
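A minimal sketch of the constant-plus-format approach, assuming hypothetical names throughout: the class `SourceUpdateValidation`, the constant's wording, and the use of `IllegalArgumentException` in place of `InvalidInput.exception` are illustrative and not taken from the PR.

```java
import java.util.Locale;

/** Hypothetical stand-in for the spec class; it only illustrates the constant. */
final class SourceUpdateValidation
{
  // Assumed wording; the message used in the PR may differ.
  static final String ILLEGAL_SOURCE_UPDATE_FORMAT =
      "Cannot update the input source stream from [%s] to [%s]";

  private SourceUpdateValidation() {}

  /** Throws if the proposed source differs from the existing one. */
  static void validateSourceUnchanged(String existingSource, String proposedSource)
  {
    if (!existingSource.equals(proposedSource)) {
      // The constant is formatted at the call site, so no overridable method is needed.
      throw new IllegalArgumentException(
          String.format(Locale.ENGLISH, ILLEGAL_SOURCE_UPDATE_FORMAT, existingSource, proposedSource)
      );
    }
  }
}
```

Subclasses that want different wording can define their own constant instead of overriding a method.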
##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java:
##########
@@ -202,6 +205,55 @@ public boolean isSuspended()
return suspended;
}
+ /**
+ * Default implementation that prevents unsupported evolution of the
supervisor spec
+ * <ul>
+ * <li>You cannot migrate between types of supervisors.</li>
+ * <li>You cannot change the input source stream of a running
supervisor.</li>
+ * </ul>
+ * @param proposedSpec the proposed supervisor spec
+ * @throws DruidException if the proposed spec update is not allowed
+ */
+ @Override
+ public void validateSpecUpdateTo(SupervisorSpec proposedSpec) throws
DruidException
+ {
+ if (!(proposedSpec instanceof SeekableStreamSupervisorSpec)) {
+ throw InvalidInput.exception(
+ StringUtils.format("Cannot evolve to [%s] from [%s]",
getClass().getName(), proposedSpec.getClass().getName())
+ );
+ }
+ SeekableStreamSupervisorSpec other = (SeekableStreamSupervisorSpec)
proposedSpec;
+ if (this.getSource() == null || other.getSource() == null) {
+ // Not likely to happen, but covering just in case.
+ throw InvalidInput.exception("Cannot consider
SeekableStreamSupervisorSpec evolution when one or both of "
+ + "the specs have not provided an input
source stream in the IOConfig.");
Review Comment:
```suggestion
throw InvalidInput.exception("Cannot update supervisor spec since one
or both of "
+ "the specs have not provided an input
source stream in the 'ioConfig'.");
```
##########
extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpecTest.java:
##########
@@ -578,4 +585,362 @@ public void testSuspendResume() throws IOException
Assert.assertFalse(runningSpec.isSuspended());
}
+
+ @Test
+ public void testValidateSpecUpdateTo() throws JsonProcessingException
+ {
+ String sourceSpecJson = "{\n"
+ + " \"type\": \"kafka\",\n"
+ + " \"dataSchema\": {\n"
+ + " \"dataSource\": \"metrics-kafka\",\n"
+ + " \"parser\": {\n"
+ + " \"type\": \"string\",\n"
+ + " \"parseSpec\": {\n"
+ + " \"format\": \"json\",\n"
+ + " \"timestampSpec\": {\n"
+ + " \"column\": \"timestamp\",\n"
+ + " \"format\": \"auto\"\n"
+ + " },\n"
+ + " \"dimensionsSpec\": {\n"
+ + " \"dimensions\": [],\n"
+ + " \"dimensionExclusions\": [\n"
+ + " \"timestamp\",\n"
+ + " \"value\"\n"
+ + " ]\n"
+ + " }\n"
+ + " }\n"
+ + " },\n"
+ + " \"metricsSpec\": [\n"
+ + " {\n"
+ + " \"name\": \"count\",\n"
+ + " \"type\": \"count\"\n"
+ + " },\n"
+ + " {\n"
+ + " \"name\": \"value_sum\",\n"
+ + " \"fieldName\": \"value\",\n"
+ + " \"type\": \"doubleSum\"\n"
+ + " },\n"
+ + " {\n"
+ + " \"name\": \"value_min\",\n"
+ + " \"fieldName\": \"value\",\n"
+ + " \"type\": \"doubleMin\"\n"
+ + " },\n"
+ + " {\n"
+ + " \"name\": \"value_max\",\n"
+ + " \"fieldName\": \"value\",\n"
+ + " \"type\": \"doubleMax\"\n"
+ + " }\n"
+ + " ],\n"
+ + " \"granularitySpec\": {\n"
+ + " \"type\": \"uniform\",\n"
+ + " \"segmentGranularity\": \"HOUR\",\n"
+ + " \"queryGranularity\": \"NONE\"\n"
+ + " }\n"
+ + " },\n"
+ + " \"ioConfig\": {\n"
+ + " \"topic\": \"metrics\",\n"
+ + " \"consumerProperties\": {\n"
+ + " \"bootstrap.servers\": \"localhost:9092\"\n"
+ + " },\n"
+ + " \"taskCount\": 1\n"
+ + " }\n"
+ + "}";
+ KafkaSupervisorSpec sourceSpec = mapper.readValue(sourceSpecJson,
KafkaSupervisorSpec.class);
+
+ // Proposed spec being non-kafka is not allowed
+ OtherSupervisorSpec otherSpec = new OtherSupervisorSpec();
+ assertThrows(
+ DruidException.class,
+ () -> sourceSpec.validateSpecUpdateTo(otherSpec)
+ );
+
+ // Change from topic to topicPattern is not allowed
+ String invalidDestSpecJson = "{\n"
+ + " \"type\": \"kafka\",\n"
+ + " \"dataSchema\": {\n"
+ + " \"dataSource\": \"metrics-kafka\",\n"
+ + " \"parser\": {\n"
+ + " \"type\": \"string\",\n"
+ + " \"parseSpec\": {\n"
+ + " \"format\": \"json\",\n"
+ + " \"timestampSpec\": {\n"
+ + " \"column\": \"timestamp\",\n"
+ + " \"format\": \"auto\"\n"
+ + " },\n"
+ + " \"dimensionsSpec\": {\n"
+ + " \"dimensions\": [],\n"
+ + " \"dimensionExclusions\": [\n"
+ + " \"timestamp\",\n"
+ + " \"value\"\n"
+ + " ]\n"
+ + " }\n"
+ + " }\n"
+ + " },\n"
+ + " \"metricsSpec\": [\n"
+ + " {\n"
+ + " \"name\": \"count\",\n"
+ + " \"type\": \"count\"\n"
+ + " },\n"
+ + " {\n"
+ + " \"name\": \"value_sum\",\n"
+ + " \"fieldName\": \"value\",\n"
+ + " \"type\": \"doubleSum\"\n"
+ + " },\n"
+ + " {\n"
+ + " \"name\": \"value_min\",\n"
+ + " \"fieldName\": \"value\",\n"
+ + " \"type\": \"doubleMin\"\n"
+ + " },\n"
+ + " {\n"
+ + " \"name\": \"value_max\",\n"
+ + " \"fieldName\": \"value\",\n"
+ + " \"type\": \"doubleMax\"\n"
+ + " }\n"
+ + " ],\n"
+ + " \"granularitySpec\": {\n"
+ + " \"type\": \"uniform\",\n"
+ + " \"segmentGranularity\": \"HOUR\",\n"
+ + " \"queryGranularity\": \"NONE\"\n"
+ + " }\n"
+ + " },\n"
+ + " \"ioConfig\": {\n"
+ + " \"topicPattern\": \"metrics-.*\",\n"
+ + " \"consumerProperties\": {\n"
+ + " \"bootstrap.servers\": \"localhost:9092\"\n"
+ + " },\n"
+ + " \"taskCount\": 1\n"
+ + " }\n"
+ + "}";
+ KafkaSupervisorSpec invalidDestSpec =
mapper.readValue(invalidDestSpecJson, KafkaSupervisorSpec.class);
+
+ assertThrows(
+ DruidException.class,
+ () -> sourceSpec.validateSpecUpdateTo(invalidDestSpec)
+ );
+
+ // Changing topic name is not allowed
+ String invalidDestSpecTwoJson = "{\n"
Review Comment:
Do we need to write the specs as json String or can we use objects and then
just validate them, same as the other tests?
If using the json form is essential to the test, you can consider using the
utility `TestUtils.singleQuoteToStandardJson()` to make the json easier to read.
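To illustrate the readability gain, here is a rough sketch. The helper below is a hypothetical stand-in and assumes the utility simply swaps single quotes for double quotes; the real implementation may handle escaping differently.

```java
/** Hypothetical stand-in for the quoting utility; not the Druid implementation. */
final class JsonQuotes
{
  private JsonQuotes() {}

  /** Converts single-quoted pseudo-JSON into standard double-quoted JSON. */
  static String singleQuoteToStandardJson(String singleQuotedJson)
  {
    // Assumes no value contains a literal single quote.
    return singleQuotedJson.replace('\'', '"');
  }

  public static void main(String[] args)
  {
    // No backslash escapes are needed in the test source.
    String ioConfig = singleQuoteToStandardJson(
        "{'topic': 'metrics', 'consumerProperties': {'bootstrap.servers': 'localhost:9092'}, 'taskCount': 1}"
    );
    System.out.println(ioConfig);
  }
}
```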
##########
indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java:
##########
@@ -1298,6 +1302,135 @@ public void testGetContextVauleForKeyShouldReturnValue()
Assert.assertEquals("value", spec.getContextValue("key"));
}
+ @Test
+ public void testValidateSpecUpdateToShortCircuits()
Review Comment:
Suggestion:
My latest personal preference on test naming convention is something like
this:
```
test_validateSpecUpdateTo_shortCircuits_whenSomethingHappens()
```
This is only slightly different from the convention you have already
followed here,
in that it uses underscores. The underscores help in clearly spelling out
which method
is being tested and under what conditions.
That said, you may stick to the current convention if you find it more
appealing. There is no established Druid naming convention yet, so either
is fine as long as the test names make sense.
##########
indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java:
##########
@@ -1298,6 +1302,135 @@ public void testGetContextVauleForKeyShouldReturnValue()
Assert.assertEquals("value", spec.getContextValue("key"));
}
+ @Test
+ public void testValidateSpecUpdateToShortCircuits()
+ {
+ mockIngestionSchema();
+ TestSeekableStreamSupervisorSpec originalSpec = new
TestSeekableStreamSupervisorSpec(
+ ingestionSchema,
+ ImmutableMap.of("key", "value"),
+ false,
+ taskStorage,
+ taskMaster,
+ indexerMetadataStorageCoordinator,
+ indexTaskClientFactory,
+ mapper,
+ emitter,
+ monitorSchedulerConfig,
+ rowIngestionMetersFactory,
+ supervisorStateManagerConfig,
+ supervisor4,
+ "id1"
+ );
+ TestSeekableStreamSupervisorSpec proposedSpec = new
TestSeekableStreamSupervisorSpec(
+ ingestionSchema,
+ ImmutableMap.of("key", "value"),
+ false,
+ taskStorage,
+ taskMaster,
+ indexerMetadataStorageCoordinator,
+ indexTaskClientFactory,
+ mapper,
+ emitter,
+ monitorSchedulerConfig,
+ rowIngestionMetersFactory,
+ supervisorStateManagerConfig,
+ supervisor4,
+ "id1"
+ );
+ assertThrows(
+ DruidException.class,
+ () -> originalSpec.validateSpecUpdateTo(proposedSpec)
+ );
+
+ OtherSupervisorSpec otherSpec = new OtherSupervisorSpec();
+ assertThrows(
Review Comment:
Maybe use `DruidExceptionMatcher` to verify error message too.
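The idea, sketched in plain Java without assuming anything about the `DruidExceptionMatcher` API: capture the thrown exception and assert on its message as well as its type. The helper name `assertThrowsWithMessage` is hypothetical.

```java
/** Hypothetical helper; it shows the check, not the Druid matcher itself. */
final class MessageAssertions
{
  private MessageAssertions() {}

  /** Runs the action and verifies both the exception type and a message fragment. */
  static <T extends Throwable> T assertThrowsWithMessage(
      Class<T> expectedType,
      String expectedFragment,
      Runnable action
  )
  {
    try {
      action.run();
    }
    catch (Throwable t) {
      if (!expectedType.isInstance(t)) {
        throw new AssertionError("Unexpected exception type: " + t.getClass().getName(), t);
      }
      String message = t.getMessage();
      if (message == null || !message.contains(expectedFragment)) {
        throw new AssertionError("Unexpected message: " + message, t);
      }
      return expectedType.cast(t);
    }
    // Reached only when the action completed without throwing.
    throw new AssertionError("Expected " + expectedType.getName() + " but nothing was thrown");
  }
}
```

Checking the message keeps the test from passing when the exception comes from a different validation branch than the one under test.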
##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java:
##########
@@ -202,6 +205,55 @@ public boolean isSuspended()
return suspended;
}
+ /**
+ * Default implementation that prevents unsupported evolution of the
supervisor spec
+ * <ul>
+ * <li>You cannot migrate between types of supervisors.</li>
+ * <li>You cannot change the input source stream of a running
supervisor.</li>
+ * </ul>
+ * @param proposedSpec the proposed supervisor spec
+ * @throws DruidException if the proposed spec update is not allowed
+ */
+ @Override
+ public void validateSpecUpdateTo(SupervisorSpec proposedSpec) throws
DruidException
+ {
+ if (!(proposedSpec instanceof SeekableStreamSupervisorSpec)) {
+ throw InvalidInput.exception(
+ StringUtils.format("Cannot evolve to [%s] from [%s]",
getClass().getName(), proposedSpec.getClass().getName())
Review Comment:
```suggestion
"Cannot update supervisor spec from type[%s] to type[%s]",
getClass().getSimpleName(), proposedSpec.getClass().getSimpleName()
```
##########
indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java:
##########
@@ -1298,6 +1302,135 @@ public void testGetContextVauleForKeyShouldReturnValue()
Assert.assertEquals("value", spec.getContextValue("key"));
}
+ @Test
+ public void testValidateSpecUpdateToShortCircuits()
+ {
+ mockIngestionSchema();
+ TestSeekableStreamSupervisorSpec originalSpec = new
TestSeekableStreamSupervisorSpec(
+ ingestionSchema,
+ ImmutableMap.of("key", "value"),
Review Comment:
Nit: You may also use `Map.of("key", "value")`
##########
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java:
##########
@@ -176,6 +180,47 @@ protected KafkaSupervisorSpec toggleSuspend(boolean
suspend)
);
}
+ /**
+ * Extends {@link SeekableStreamSupervisorSpec#validateSpecUpdateTo} to
ensure that the proposed spec and current spec are either both multi-topic or
both single-topic.
+ * <p>
+ * getSource() returns the same string (exampleTopic) for
"topicPattern=exampleTopic" and "topic=exampleTopic".
+ * This override prevents this case from being considered a valid update.
+ * </p>
+ * @param proposedSpec the proposed supervisor spec
+ * @throws DruidException if the proposed spec is not a Kafka spec or if the
proposed spec changes from multi-topic to single-topic or vice versa
+ */
+ @Override
+ public void validateSpecUpdateTo(SupervisorSpec proposedSpec) throws
DruidException
+ {
+ if (!(proposedSpec instanceof KafkaSupervisorSpec)) {
+ throw InvalidInput.exception(
+ StringUtils.format("Cannot evolve to [%s] from [%s]",
getClass().getName(), proposedSpec.getClass().getName())
Review Comment:
```suggestion
"Cannot change spec from type[%s] to type[%s]",
getClass().getName(), proposedSpec.getClass().getName()
```
##########
server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorSpec.java:
##########
@@ -100,4 +101,19 @@ default Set<ResourceAction> getInputSourceResources()
throws UnsupportedOperatio
* @return source like stream or topic name
*/
String getSource();
+
+ /**
+ * Checks if a spec can be replaced with a proposed spec (proposesSpec).
+ * <p>
+ * By default, this method does no validation checks. Implementations of
this method can choose to define rules
+ * for spec updates and throw an exception if the update is not allowed.
+ * </p>
+ *
+ * @param proposedSpec the proposed supervisor spec
+ * @throws IllegalArgumentException if the spec update is not allowed
Review Comment:
I think the code throws `DruidException`s of type invalid input now.
##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java:
##########
@@ -197,8 +197,17 @@ public boolean shouldUpdateSupervisor(SupervisorSpec spec)
try {
byte[] specAsBytes = jsonMapper.writeValueAsBytes(spec);
Pair<Supervisor, SupervisorSpec> currentSupervisor =
supervisors.get(spec.getId());
- return currentSupervisor == null
- || !Arrays.equals(specAsBytes,
jsonMapper.writeValueAsBytes(currentSupervisor.rhs));
+ if (currentSupervisor == null || currentSupervisor.rhs == null) {
+ return true;
+ } else {
+ if (!Arrays.equals(specAsBytes,
jsonMapper.writeValueAsBytes(currentSupervisor.rhs))) {
+ // The spec bytes are different, so we need to check if the
replacement is allowed
+ currentSupervisor.rhs.validateSpecUpdateTo(spec);
+ return true;
+ } else {
+ return false;
+ }
+ }
Review Comment:
Cleaner with the nesting reduced and positive condition first:
```suggestion
} else if (Arrays.equals(specAsBytes,
jsonMapper.writeValueAsBytes(currentSupervisor.rhs))) {
return false;
} else {
// The spec bytes are different, so we need to check if the update
is allowed
currentSupervisor.rhs.validateSpecUpdateTo(spec);
return true;
}
```
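For reference, the flattened control flow can be exercised in isolation. This is only a sketch: `SpecUpdateCheck`, the byte-array parameters, and the `Runnable` validator are hypothetical stand-ins for the supervisor lookup, the serialized specs, and `validateSpecUpdateTo`.

```java
import java.util.Arrays;

/** Hypothetical, simplified model of the update check. */
final class SpecUpdateCheck
{
  private SpecUpdateCheck() {}

  /**
   * Returns true if the supervisor should be updated. The validator runs only
   * when a current spec exists and its bytes differ from the proposed spec.
   */
  static boolean shouldUpdate(byte[] currentSpecBytes, byte[] proposedSpecBytes, Runnable updateValidator)
  {
    if (currentSpecBytes == null) {
      // No existing spec, so there is nothing to validate against.
      return true;
    } else if (Arrays.equals(proposedSpecBytes, currentSpecBytes)) {
      // Identical specs, so no update is needed.
      return false;
    } else {
      // The specs differ; the validator may throw to reject the update.
      updateValidator.run();
      return true;
    }
  }
}
```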
##########
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java:
##########
@@ -176,6 +180,47 @@ protected KafkaSupervisorSpec toggleSuspend(boolean
suspend)
);
}
+ /**
+ * Extends {@link SeekableStreamSupervisorSpec#validateSpecUpdateTo} to
ensure that the proposed spec and current spec are either both multi-topic or
both single-topic.
+ * <p>
+ * getSource() returns the same string (exampleTopic) for
"topicPattern=exampleTopic" and "topic=exampleTopic".
+ * This override prevents this case from being considered a valid update.
+ * </p>
+ * @param proposedSpec the proposed supervisor spec
+ * @throws DruidException if the proposed spec is not a Kafka spec or if the
proposed spec changes from multi-topic to single-topic or vice versa
+ */
+ @Override
+ public void validateSpecUpdateTo(SupervisorSpec proposedSpec) throws
DruidException
+ {
+ if (!(proposedSpec instanceof KafkaSupervisorSpec)) {
+ throw InvalidInput.exception(
+ StringUtils.format("Cannot evolve to [%s] from [%s]",
getClass().getName(), proposedSpec.getClass().getName())
+ );
+ }
+ KafkaSupervisorSpec other = (KafkaSupervisorSpec) proposedSpec;
+ if (this.getSpec().getIOConfig().isMultiTopic() !=
other.getSpec().getIOConfig().isMultiTopic()) {
+ throw
InvalidInput.exception(getIllegalInputSourceUpdateErrorMessage("(%s) %s", "(%s)
%s"),
+ this.getSpec().getIOConfig().isMultiTopic()
? "multi-topic" : "single-topic",
+ this.getSource(),
+
other.getSpec().getIOConfig().isMultiTopic() ? "multi-topic" : "single-topic",
+ other.getSource());
+ }
+
+ super.validateSpecUpdateTo(proposedSpec);
+ }
+
+ @Override
+ protected String getIllegalInputSourceUpdateErrorMessage(String
existingSource, String proposedSource)
Review Comment:
We shouldn't need to override this method.
We can either use the constant defined in `SeekableStreamSupervisorSpec` as
is or a different String altogether.
A cleaner way to include the type name in that String would be to have a
placeholder for type and fill it with `getType()` while formatting.
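A rough sketch of the type-placeholder idea, with hypothetical names and wording. The `supervisorType` argument stands in for whatever `getType()` returns (for example `kafka`).

```java
import java.util.Locale;

/** Hypothetical message holder; the wording is an assumption, not the PR's text. */
final class TypedUpdateMessage
{
  // The first placeholder carries the supervisor type, the other two the sources.
  static final String ILLEGAL_SOURCE_UPDATE_FORMAT =
      "Cannot update supervisor of type[%s]: input source changes from [%s] to [%s]";

  private TypedUpdateMessage() {}

  static String format(String supervisorType, String existingSource, String proposedSource)
  {
    return String.format(
        Locale.ENGLISH,
        ILLEGAL_SOURCE_UPDATE_FORMAT,
        supervisorType,
        existingSource,
        proposedSource
    );
  }
}
```

With the type supplied as data, one shared constant serves every supervisor type and no per-subclass override is needed.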