kfaraz commented on code in PR #17955:
URL: https://github.com/apache/druid/pull/17955#discussion_r2079542083


##########
indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java:
##########
@@ -1412,4 +1545,38 @@ private static Map<String, Object> getScaleInProperties()
     return autoScalerConfig;
   }
 
+  private static class OtherSupervisorSpec implements SupervisorSpec

Review Comment:
   There is already a `TestSupervisorSpec` class that you may be able to use.



##########
server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorSpec.java:
##########
@@ -100,4 +101,19 @@ default Set<ResourceAction> getInputSourceResources() 
throws UnsupportedOperatio
    * @return source like stream or topic name
    */
   String getSource();
+
+  /**
+   * Checks if a spec can be replaced with a proposed spec (proposesSpec).
+   * <p>
+   * By default, this  method does no validation checks. Implementations of 
this method can choose to define rules

Review Comment:
   ```suggestion
      * By default, this method does no validation checks. Implementations of this method can choose to define rules
   ```



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java:
##########
@@ -202,6 +205,55 @@ public boolean isSuspended()
     return suspended;
   }
 
+  /**
+   * Default implementation that prevents unsupported evolution of the 
supervisor spec
+   * <ul>
+   * <li>You cannot migrate between types of supervisors.</li>
+   * <li>You cannot change the input source stream of a running 
supervisor.</li>
+   * </ul>
+   * @param proposedSpec the proposed supervisor spec
+   * @throws DruidException if the proposed spec update is not allowed
+   */
+  @Override
+  public void validateSpecUpdateTo(SupervisorSpec proposedSpec) throws 
DruidException
+  {
+    if (!(proposedSpec instanceof SeekableStreamSupervisorSpec)) {
+      throw InvalidInput.exception(
+          StringUtils.format("Cannot evolve to [%s] from [%s]", 
getClass().getName(), proposedSpec.getClass().getName())
+      );
+    }
+    SeekableStreamSupervisorSpec other = (SeekableStreamSupervisorSpec) 
proposedSpec;
+    if (this.getSource() == null || other.getSource() == null) {
+      // Not likely to happen, but covering just in case.
+      throw InvalidInput.exception("Cannot consider 
SeekableStreamSupervisorSpec evolution when one or both of "
+                                   + "the specs have not provided an input 
source stream in the IOConfig.");
+    }
+
+    if (!this.getSource().equals(other.getSource())) {
+      throw 
InvalidInput.exception(getIllegalInputSourceUpdateErrorMessage(this.getSource(),
 other.getSource()));
+    }
+  }
+
   protected abstract SeekableStreamSupervisorSpec toggleSuspend(boolean 
suspend);
 
+  /**
+   * Returns an error message for illegal update of the input source stream.
+   * <p>
+   * This is a reasonable default message, but subclasses may override it to 
provide more domain specific terminology.
+   * </p>
+   * @param existingSource The existing input source stream
+   * @param proposedSource The proposed input source stream
+   * @return A formatted error message
+   */
+  protected String getIllegalInputSourceUpdateErrorMessage(String 
existingSource, String proposedSource)

Review Comment:
   This method shouldn't be part of the contract of this class.
   I would suggest just making the string a constant and formatting it when needed.
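
   A minimal sketch of the constant-plus-format approach, for illustration only. The class name, constant name, and message wording are assumptions rather than the PR's actual code, and plain `String.format` and `IllegalArgumentException` stand in for Druid's `StringUtils.format` and `InvalidInput.exception`.

```java
// Sketch only: names and wording are hypothetical, not taken from the PR.
final class SourceUpdateValidation
{
  // The message template is a plain constant; no overridable method is needed.
  static final String ILLEGAL_SOURCE_UPDATE_TEMPLATE =
      "Update of the input source stream from [%s] to [%s] is not supported for a running supervisor.";

  private SourceUpdateValidation() {}

  // Formats the constant only at the point where the error is raised.
  static void validateSourceUnchanged(String existingSource, String proposedSource)
  {
    if (!existingSource.equals(proposedSource)) {
      // IllegalArgumentException is a stand-in for InvalidInput.exception(...)
      throw new IllegalArgumentException(
          String.format(ILLEGAL_SOURCE_UPDATE_TEMPLATE, existingSource, proposedSource)
      );
    }
  }

  public static void main(String[] args)
  {
    try {
      validateSourceUnchanged("metrics", "metrics-v2");
    }
    catch (IllegalArgumentException e) {
      System.out.println(e.getMessage());
    }
  }
}
```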



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java:
##########
@@ -202,6 +205,55 @@ public boolean isSuspended()
     return suspended;
   }
 
+  /**
+   * Default implementation that prevents unsupported evolution of the 
supervisor spec
+   * <ul>
+   * <li>You cannot migrate between types of supervisors.</li>
+   * <li>You cannot change the input source stream of a running 
supervisor.</li>
+   * </ul>
+   * @param proposedSpec the proposed supervisor spec
+   * @throws DruidException if the proposed spec update is not allowed
+   */
+  @Override
+  public void validateSpecUpdateTo(SupervisorSpec proposedSpec) throws 
DruidException
+  {
+    if (!(proposedSpec instanceof SeekableStreamSupervisorSpec)) {
+      throw InvalidInput.exception(
+          StringUtils.format("Cannot evolve to [%s] from [%s]", 
getClass().getName(), proposedSpec.getClass().getName())
+      );
+    }
+    SeekableStreamSupervisorSpec other = (SeekableStreamSupervisorSpec) 
proposedSpec;
+    if (this.getSource() == null || other.getSource() == null) {
+      // Not likely to happen, but covering just in case.
+      throw InvalidInput.exception("Cannot consider 
SeekableStreamSupervisorSpec evolution when one or both of "
+                                   + "the specs have not provided an input 
source stream in the IOConfig.");

Review Comment:
   ```suggestion
         throw InvalidInput.exception("Cannot update supervisor spec since one or both of "
                                      + "the specs have not provided an input source stream in the 'ioConfig'.");
   ```



##########
extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpecTest.java:
##########
@@ -578,4 +585,362 @@ public void testSuspendResume() throws IOException
 
     Assert.assertFalse(runningSpec.isSuspended());
   }
+
+  @Test
+  public void testValidateSpecUpdateTo() throws JsonProcessingException
+  {
+    String sourceSpecJson = "{\n"
+                  + "  \"type\": \"kafka\",\n"
+                  + "  \"dataSchema\": {\n"
+                  + "    \"dataSource\": \"metrics-kafka\",\n"
+                  + "    \"parser\": {\n"
+                  + "      \"type\": \"string\",\n"
+                  + "      \"parseSpec\": {\n"
+                  + "        \"format\": \"json\",\n"
+                  + "        \"timestampSpec\": {\n"
+                  + "          \"column\": \"timestamp\",\n"
+                  + "          \"format\": \"auto\"\n"
+                  + "        },\n"
+                  + "        \"dimensionsSpec\": {\n"
+                  + "          \"dimensions\": [],\n"
+                  + "          \"dimensionExclusions\": [\n"
+                  + "            \"timestamp\",\n"
+                  + "            \"value\"\n"
+                  + "          ]\n"
+                  + "        }\n"
+                  + "      }\n"
+                  + "    },\n"
+                  + "    \"metricsSpec\": [\n"
+                  + "      {\n"
+                  + "        \"name\": \"count\",\n"
+                  + "        \"type\": \"count\"\n"
+                  + "      },\n"
+                  + "      {\n"
+                  + "        \"name\": \"value_sum\",\n"
+                  + "        \"fieldName\": \"value\",\n"
+                  + "        \"type\": \"doubleSum\"\n"
+                  + "      },\n"
+                  + "      {\n"
+                  + "        \"name\": \"value_min\",\n"
+                  + "        \"fieldName\": \"value\",\n"
+                  + "        \"type\": \"doubleMin\"\n"
+                  + "      },\n"
+                  + "      {\n"
+                  + "        \"name\": \"value_max\",\n"
+                  + "        \"fieldName\": \"value\",\n"
+                  + "        \"type\": \"doubleMax\"\n"
+                  + "      }\n"
+                  + "    ],\n"
+                  + "    \"granularitySpec\": {\n"
+                  + "      \"type\": \"uniform\",\n"
+                  + "      \"segmentGranularity\": \"HOUR\",\n"
+                  + "      \"queryGranularity\": \"NONE\"\n"
+                  + "    }\n"
+                  + "  },\n"
+                  + "  \"ioConfig\": {\n"
+                  + "    \"topic\": \"metrics\",\n"
+                  + "    \"consumerProperties\": {\n"
+                  + "      \"bootstrap.servers\": \"localhost:9092\"\n"
+                  + "    },\n"
+                  + "    \"taskCount\": 1\n"
+                  + "  }\n"
+                  + "}";
+    KafkaSupervisorSpec sourceSpec = mapper.readValue(sourceSpecJson, 
KafkaSupervisorSpec.class);
+
+    // Proposed spec being non-kafka is not allowed
+    OtherSupervisorSpec otherSpec = new OtherSupervisorSpec();
+    assertThrows(
+        DruidException.class,
+        () -> sourceSpec.validateSpecUpdateTo(otherSpec)
+    );
+
+    // Change from topic to topicPattern is not allowed
+    String invalidDestSpecJson = "{\n"
+                  + "  \"type\": \"kafka\",\n"
+                  + "  \"dataSchema\": {\n"
+                  + "    \"dataSource\": \"metrics-kafka\",\n"
+                  + "    \"parser\": {\n"
+                  + "      \"type\": \"string\",\n"
+                  + "      \"parseSpec\": {\n"
+                  + "        \"format\": \"json\",\n"
+                  + "        \"timestampSpec\": {\n"
+                  + "          \"column\": \"timestamp\",\n"
+                  + "          \"format\": \"auto\"\n"
+                  + "        },\n"
+                  + "        \"dimensionsSpec\": {\n"
+                  + "          \"dimensions\": [],\n"
+                  + "          \"dimensionExclusions\": [\n"
+                  + "            \"timestamp\",\n"
+                  + "            \"value\"\n"
+                  + "          ]\n"
+                  + "        }\n"
+                  + "      }\n"
+                  + "    },\n"
+                  + "    \"metricsSpec\": [\n"
+                  + "      {\n"
+                  + "        \"name\": \"count\",\n"
+                  + "        \"type\": \"count\"\n"
+                  + "      },\n"
+                  + "      {\n"
+                  + "        \"name\": \"value_sum\",\n"
+                  + "        \"fieldName\": \"value\",\n"
+                  + "        \"type\": \"doubleSum\"\n"
+                  + "      },\n"
+                  + "      {\n"
+                  + "        \"name\": \"value_min\",\n"
+                  + "        \"fieldName\": \"value\",\n"
+                  + "        \"type\": \"doubleMin\"\n"
+                  + "      },\n"
+                  + "      {\n"
+                  + "        \"name\": \"value_max\",\n"
+                  + "        \"fieldName\": \"value\",\n"
+                  + "        \"type\": \"doubleMax\"\n"
+                  + "      }\n"
+                  + "    ],\n"
+                  + "    \"granularitySpec\": {\n"
+                  + "      \"type\": \"uniform\",\n"
+                  + "      \"segmentGranularity\": \"HOUR\",\n"
+                  + "      \"queryGranularity\": \"NONE\"\n"
+                  + "    }\n"
+                  + "  },\n"
+                  + "  \"ioConfig\": {\n"
+                  + "    \"topicPattern\": \"metrics-.*\",\n"
+                  + "    \"consumerProperties\": {\n"
+                  + "      \"bootstrap.servers\": \"localhost:9092\"\n"
+                  + "    },\n"
+                  + "    \"taskCount\": 1\n"
+                  + "  }\n"
+                  + "}";
+    KafkaSupervisorSpec invalidDestSpec = 
mapper.readValue(invalidDestSpecJson, KafkaSupervisorSpec.class);
+
+    assertThrows(
+        DruidException.class,
+        () -> sourceSpec.validateSpecUpdateTo(invalidDestSpec)
+    );
+
+    // Changing topic name is not allowed
+    String invalidDestSpecTwoJson = "{\n"

Review Comment:
   Do we need to write the specs as JSON strings, or can we build spec objects and 
   just validate them, as the other tests do?
   
   If the JSON form is essential to the test, consider using the utility 
   `TestUtils.singleQuoteToStandardJson()` to make the JSON easier to read.
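
   To illustrate why single-quoted JSON is easier to read in a test, here is a self-contained sketch. The helper below is a hypothetical stand-in written for this example, not the implementation of `TestUtils.singleQuoteToStandardJson()`; it assumes no apostrophes appear inside the values.

```java
// Sketch only: toStandardJson is a hypothetical stand-in for the Druid test utility.
final class SingleQuoteJson
{
  private SingleQuoteJson() {}

  // Converts single-quoted pseudo-JSON into standard JSON.
  // Assumes the input contains no literal apostrophes inside string values.
  static String toStandardJson(String singleQuoted)
  {
    return singleQuoted.replace('\'', '"');
  }

  public static void main(String[] args)
  {
    // No escaped double quotes or "\n" concatenation is needed in the test source.
    String ioConfig = toStandardJson(
        "{'topic': 'metrics', 'consumerProperties': {'bootstrap.servers': 'localhost:9092'}, 'taskCount': 1}"
    );
    System.out.println(ioConfig);
  }
}
```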



##########
indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java:
##########
@@ -1298,6 +1302,135 @@ public void testGetContextVauleForKeyShouldReturnValue()
     Assert.assertEquals("value", spec.getContextValue("key"));
   }
 
+  @Test
+  public void testValidateSpecUpdateToShortCircuits()

Review Comment:
   Suggestion:
   
   My current preference for test naming is something like this:
   ```
   test_validateSpecUpdateTo_shortCircuits_whenSomethingHappens()
   ```
   
   This differs only slightly from the convention you have already followed here,
   in that it uses underscores. The underscores make it clear which method
   is being tested and under what conditions.
   
   That said, there is no established Druid naming convention yet, so feel free to
   keep the current convention if you prefer it, as long as the test names make sense.



##########
indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java:
##########
@@ -1298,6 +1302,135 @@ public void testGetContextVauleForKeyShouldReturnValue()
     Assert.assertEquals("value", spec.getContextValue("key"));
   }
 
+  @Test
+  public void testValidateSpecUpdateToShortCircuits()
+  {
+    mockIngestionSchema();
+    TestSeekableStreamSupervisorSpec originalSpec = new 
TestSeekableStreamSupervisorSpec(
+        ingestionSchema,
+        ImmutableMap.of("key", "value"),
+        false,
+        taskStorage,
+        taskMaster,
+        indexerMetadataStorageCoordinator,
+        indexTaskClientFactory,
+        mapper,
+        emitter,
+        monitorSchedulerConfig,
+        rowIngestionMetersFactory,
+        supervisorStateManagerConfig,
+        supervisor4,
+        "id1"
+    );
+    TestSeekableStreamSupervisorSpec proposedSpec = new 
TestSeekableStreamSupervisorSpec(
+        ingestionSchema,
+        ImmutableMap.of("key", "value"),
+        false,
+        taskStorage,
+        taskMaster,
+        indexerMetadataStorageCoordinator,
+        indexTaskClientFactory,
+        mapper,
+        emitter,
+        monitorSchedulerConfig,
+        rowIngestionMetersFactory,
+        supervisorStateManagerConfig,
+        supervisor4,
+        "id1"
+    );
+    assertThrows(
+        DruidException.class,
+        () -> originalSpec.validateSpecUpdateTo(proposedSpec)
+    );
+
+    OtherSupervisorSpec otherSpec = new OtherSupervisorSpec();
+    assertThrows(

Review Comment:
   Maybe use `DruidExceptionMatcher` to verify the error message too.
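
   The idea is to assert on the message as well as the exception type. The sketch below shows that pattern in plain Java so that it runs without Druid on the classpath; `expectThrows` and `validateUpdate` are hypothetical helpers for this example, and `IllegalArgumentException` stands in for `DruidException`. In the actual test, `DruidExceptionMatcher` would take the place of the manual message check.

```java
// Sketch only: helper names are hypothetical; IllegalArgumentException stands in for DruidException.
final class MessageAssertions
{
  private MessageAssertions() {}

  // Runs the action and returns the thrown exception, failing if nothing
  // (or an exception of a different type) is thrown.
  static <T extends Throwable> T expectThrows(Class<T> type, Runnable action)
  {
    try {
      action.run();
    }
    catch (Throwable t) {
      if (type.isInstance(t)) {
        return type.cast(t);
      }
      throw new AssertionError("Unexpected exception type: " + t.getClass().getName(), t);
    }
    throw new AssertionError("Expected " + type.getName() + " but nothing was thrown");
  }

  // Stand-in for a spec update validation that rejects a type change.
  static void validateUpdate(String existingType, String proposedType)
  {
    if (!existingType.equals(proposedType)) {
      throw new IllegalArgumentException(
          String.format("Cannot update supervisor spec from type[%s] to type[%s]", existingType, proposedType)
      );
    }
  }

  public static void main(String[] args)
  {
    IllegalArgumentException e = expectThrows(
        IllegalArgumentException.class,
        () -> validateUpdate("kafka", "kinesis")
    );
    // Checking the message catches regressions that a type-only assertion would miss.
    System.out.println(e.getMessage());
  }
}
```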



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java:
##########
@@ -202,6 +205,55 @@ public boolean isSuspended()
     return suspended;
   }
 
+  /**
+   * Default implementation that prevents unsupported evolution of the 
supervisor spec
+   * <ul>
+   * <li>You cannot migrate between types of supervisors.</li>
+   * <li>You cannot change the input source stream of a running 
supervisor.</li>
+   * </ul>
+   * @param proposedSpec the proposed supervisor spec
+   * @throws DruidException if the proposed spec update is not allowed
+   */
+  @Override
+  public void validateSpecUpdateTo(SupervisorSpec proposedSpec) throws 
DruidException
+  {
+    if (!(proposedSpec instanceof SeekableStreamSupervisorSpec)) {
+      throw InvalidInput.exception(
+          StringUtils.format("Cannot evolve to [%s] from [%s]", 
getClass().getName(), proposedSpec.getClass().getName())

Review Comment:
   ```suggestion
             "Cannot update supervisor spec from type[%s] to type[%s]", getClass().getSimpleName(), proposedSpec.getClass().getSimpleName()
   ```



##########
indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java:
##########
@@ -1298,6 +1302,135 @@ public void testGetContextVauleForKeyShouldReturnValue()
     Assert.assertEquals("value", spec.getContextValue("key"));
   }
 
+  @Test
+  public void testValidateSpecUpdateToShortCircuits()
+  {
+    mockIngestionSchema();
+    TestSeekableStreamSupervisorSpec originalSpec = new 
TestSeekableStreamSupervisorSpec(
+        ingestionSchema,
+        ImmutableMap.of("key", "value"),

Review Comment:
   Nit: You may also use `Map.of("key", "value")`.



##########
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java:
##########
@@ -176,6 +180,47 @@ protected KafkaSupervisorSpec toggleSuspend(boolean 
suspend)
     );
   }
 
+  /**
+   * Extends {@link SeekableStreamSupervisorSpec#validateSpecUpdateTo} to 
ensure that the proposed spec and current spec are either both multi-topic or 
both single-topic.
+   * <p>
+   * getSource() returns the same string (exampleTopic) for 
"topicPattern=exampleTopic" and "topic=exampleTopic".
+   * This override prevents this case from being considered a valid update.
+   * </p>
+   * @param proposedSpec the proposed supervisor spec
+   * @throws DruidException if the proposed spec is not a Kafka spec or if the 
proposed spec changes from multi-topic to single-topic or vice versa
+   */
+  @Override
+  public void validateSpecUpdateTo(SupervisorSpec proposedSpec) throws 
DruidException
+  {
+    if (!(proposedSpec instanceof KafkaSupervisorSpec)) {
+      throw InvalidInput.exception(
+          StringUtils.format("Cannot evolve to [%s] from [%s]", 
getClass().getName(), proposedSpec.getClass().getName())

Review Comment:
   ```suggestion
             "Cannot change spec from type[%s] to type[%s]", getClass().getName(), proposedSpec.getClass().getName()
   ```



##########
server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorSpec.java:
##########
@@ -100,4 +101,19 @@ default Set<ResourceAction> getInputSourceResources() 
throws UnsupportedOperatio
    * @return source like stream or topic name
    */
   String getSource();
+
+  /**
+   * Checks if a spec can be replaced with a proposed spec (proposesSpec).
+   * <p>
+   * By default, this  method does no validation checks. Implementations of 
this method can choose to define rules
+   * for spec updates and throw an exception if the update is not allowed.
+   * </p>
+   *
+   * @param proposedSpec the proposed supervisor spec
+   * @throws IllegalArgumentException if the spec update is not allowed

Review Comment:
   I think the code throws `DruidException`s of type invalid input now.



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java:
##########
@@ -197,8 +197,17 @@ public boolean shouldUpdateSupervisor(SupervisorSpec spec)
       try {
         byte[] specAsBytes = jsonMapper.writeValueAsBytes(spec);
         Pair<Supervisor, SupervisorSpec> currentSupervisor = 
supervisors.get(spec.getId());
-        return currentSupervisor == null
-               || !Arrays.equals(specAsBytes, 
jsonMapper.writeValueAsBytes(currentSupervisor.rhs));
+        if (currentSupervisor == null || currentSupervisor.rhs == null) {
+          return true;
+        } else {
+          if (!Arrays.equals(specAsBytes, 
jsonMapper.writeValueAsBytes(currentSupervisor.rhs))) {
+            // The spec bytes are different, so we need to check if the 
replacement is allowed
+            currentSupervisor.rhs.validateSpecUpdateTo(spec);
+            return true;
+          } else {
+            return false;
+          }
+        }

Review Comment:
   Cleaner with the nesting reduced and positive condition first:
   
   ```suggestion
           } else if (Arrays.equals(specAsBytes, jsonMapper.writeValueAsBytes(currentSupervisor.rhs))) {
             return false;
           } else {
             // The spec bytes are different, so we need to check if the update is allowed
             currentSupervisor.rhs.validateSpecUpdateTo(spec);
             return true;
           }
   ```
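
   For reference, the flattened control flow can be exercised in isolation with a sketch like the one below. The class, method, and parameter names are assumptions for this example: serialized specs are passed in as byte arrays, and the validation step is a `Runnable` standing in for `validateSpecUpdateTo`.

```java
import java.util.Arrays;

// Sketch only: a standalone model of the three-way decision, not SupervisorManager itself.
final class SpecUpdateCheck
{
  private SpecUpdateCheck() {}

  static boolean shouldUpdate(byte[] currentSpecBytes, byte[] proposedSpecBytes, Runnable validateUpdate)
  {
    if (currentSpecBytes == null) {
      // No existing spec, so there is nothing to validate against.
      return true;
    } else if (Arrays.equals(currentSpecBytes, proposedSpecBytes)) {
      // Identical specs: no update needed and validation is skipped.
      return false;
    } else {
      // The spec bytes differ, so the update must pass validation first.
      // The validator is expected to throw if the update is not allowed.
      validateUpdate.run();
      return true;
    }
  }

  public static void main(String[] args)
  {
    byte[] current = {1, 2, 3};
    byte[] proposed = {1, 2, 4};
    System.out.println(shouldUpdate(current, proposed, () -> System.out.println("validating update")));
  }
}
```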



##########
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java:
##########
@@ -176,6 +180,47 @@ protected KafkaSupervisorSpec toggleSuspend(boolean 
suspend)
     );
   }
 
+  /**
+   * Extends {@link SeekableStreamSupervisorSpec#validateSpecUpdateTo} to 
ensure that the proposed spec and current spec are either both multi-topic or 
both single-topic.
+   * <p>
+   * getSource() returns the same string (exampleTopic) for 
"topicPattern=exampleTopic" and "topic=exampleTopic".
+   * This override prevents this case from being considered a valid update.
+   * </p>
+   * @param proposedSpec the proposed supervisor spec
+   * @throws DruidException if the proposed spec is not a Kafka spec or if the 
proposed spec changes from multi-topic to single-topic or vice versa
+   */
+  @Override
+  public void validateSpecUpdateTo(SupervisorSpec proposedSpec) throws 
DruidException
+  {
+    if (!(proposedSpec instanceof KafkaSupervisorSpec)) {
+      throw InvalidInput.exception(
+          StringUtils.format("Cannot evolve to [%s] from [%s]", 
getClass().getName(), proposedSpec.getClass().getName())
+      );
+    }
+    KafkaSupervisorSpec other = (KafkaSupervisorSpec) proposedSpec;
+    if (this.getSpec().getIOConfig().isMultiTopic() != 
other.getSpec().getIOConfig().isMultiTopic()) {
+      throw 
InvalidInput.exception(getIllegalInputSourceUpdateErrorMessage("(%s) %s", "(%s) 
%s"),
+                                   this.getSpec().getIOConfig().isMultiTopic() 
? "multi-topic" : "single-topic",
+                                   this.getSource(),
+                                   
other.getSpec().getIOConfig().isMultiTopic() ? "multi-topic" : "single-topic",
+                                   other.getSource());
+    }
+
+    super.validateSpecUpdateTo(proposedSpec);
+  }
+
+  @Override
+  protected String getIllegalInputSourceUpdateErrorMessage(String 
existingSource, String proposedSource)

Review Comment:
   We shouldn't need to override this method.
   We can either use the constant defined in `SeekableStreamSupervisorSpec` as 
is or a different String altogether.
   A cleaner way to include the type name in that String would be to have a 
placeholder for type and fill it with `getType()` while formatting.
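
   One possible shape for the type placeholder is sketched below. All class names and the message wording are hypothetical, and `IllegalArgumentException` stands in for the `DruidException` produced by `InvalidInput.exception`. The base class owns a single template and fills in `getType()` while formatting, so subclasses do not override any message method.

```java
// Sketch only: simplified stand-ins for the supervisor spec hierarchy.
abstract class StreamSpecSketch
{
  // One shared template with a placeholder for the supervisor type.
  static final String ILLEGAL_SOURCE_UPDATE_TEMPLATE =
      "Cannot update the input source of a running [%s] supervisor from [%s] to [%s].";

  abstract String getType();

  abstract String getSource();

  void validateSourceUnchanged(StreamSpecSketch proposed)
  {
    if (!getSource().equals(proposed.getSource())) {
      // getType() supplies the domain-specific term; no override is required.
      throw new IllegalArgumentException(
          String.format(ILLEGAL_SOURCE_UPDATE_TEMPLATE, getType(), getSource(), proposed.getSource())
      );
    }
  }
}

final class KafkaSpecSketch extends StreamSpecSketch
{
  private final String topic;

  KafkaSpecSketch(String topic)
  {
    this.topic = topic;
  }

  @Override
  String getType()
  {
    return "kafka";
  }

  @Override
  String getSource()
  {
    return topic;
  }
}
```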



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

