kfaraz commented on code in PR #17955:
URL: https://github.com/apache/druid/pull/17955#discussion_r2065408957
##########
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java:
##########
@@ -176,6 +177,35 @@ protected KafkaSupervisorSpec toggleSuspend(boolean
suspend)
);
}
+ @Override
+ public void validateProposedSpecEvolution(SupervisorSpec that) throws
IllegalArgumentException
+ {
+ if (!(that instanceof KafkaSupervisorSpec)) {
+ throw new IllegalArgumentException("Cannot evolve to " + that.getType()
+ " from " + getType());
+ }
+ KafkaSupervisorSpec other = (KafkaSupervisorSpec) that;
+ if (this.getSource() == null || other.getSource() == null) {
+ // I don't think this is possible, but covering just in case.
+ throw new IllegalArgumentException(
+ "Cannot consider KafkaSupervisorSpec evolution when one or both of
the specs have not provided either a "
+ + "topic OR topicPattern");
+ }
+
+ // Future enhancements could allow for topicPattern to be changed in a way
where the new source is additive to the
+ // old source. If we did that now, there would be metadata issues due to
{@link KafkaDataSourceMetadata}
+ // implementation details that aren't set up to handle evolution of
metadata in this way.
+ if (!this.getSource().equals(other.getSource())) {
Review Comment:
Should this impl be present for all `SeekableStreamSupervisorSpec`s and not
just Kafka?
##########
indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionSupervisorSpec.java:
##########
@@ -120,6 +120,12 @@ public String getSource()
return "";
}
+ @Override
+ public void validateProposedSpecEvolution(SupervisorSpec that) throws
IllegalArgumentException
+ {
+ // No validation logic for compaction spec as of now
Review Comment:
Seems like this should have been the default impl in the interface itself.
##########
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java:
##########
@@ -176,6 +177,35 @@ protected KafkaSupervisorSpec toggleSuspend(boolean
suspend)
);
}
+ @Override
+ public void validateProposedSpecEvolution(SupervisorSpec that) throws
IllegalArgumentException
+ {
+ if (!(that instanceof KafkaSupervisorSpec)) {
+ throw new IllegalArgumentException("Cannot evolve to " + that.getType()
+ " from " + getType());
+ }
+ KafkaSupervisorSpec other = (KafkaSupervisorSpec) that;
+ if (this.getSource() == null || other.getSource() == null) {
+ // I don't think this is possible, but covering just in case.
Review Comment:
```suggestion
// Not likely to happen, but covering just in case.
```
##########
server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorSpec.java:
##########
@@ -100,4 +100,16 @@ default Set<ResourceAction> getInputSourceResources()
throws UnsupportedOperatio
* @return source like stream or topic name
*/
String getSource();
+
+ /**
+ * Checks if a proposed evolution of the supervisor spec is allowed.
+ * <p>
+ * SupervisorSpec `that` is proposed to replace the current supervisor spec.
Implementations of this method determine
+ * if the system should allow this evolution.
+ * </p>
+ *
+ * @param that the proposed supervisor spec
+ * @throws IllegalArgumentException if the evolution is not allowed
+ */
+ void validateProposedSpecEvolution(SupervisorSpec that) throws
IllegalArgumentException;
Review Comment:
Maybe use a simpler method name like `validateSpecUpdateTo`, and the arg can
be called `proposedSpec`.
We should throw a DruidException (of type not supported or invalid input)
instead.
Also, please add a default impl to avoid having to override in all impls
where this is not needed right now.
##########
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java:
##########
@@ -176,6 +177,35 @@ protected KafkaSupervisorSpec toggleSuspend(boolean
suspend)
);
}
+ @Override
+ public void validateProposedSpecEvolution(SupervisorSpec that) throws
IllegalArgumentException
+ {
+ if (!(that instanceof KafkaSupervisorSpec)) {
+ throw new IllegalArgumentException("Cannot evolve to " + that.getType()
+ " from " + getType());
+ }
+ KafkaSupervisorSpec other = (KafkaSupervisorSpec) that;
+ if (this.getSource() == null || other.getSource() == null) {
+ // I don't think this is possible, but covering just in case.
+ throw new IllegalArgumentException(
+ "Cannot consider KafkaSupervisorSpec evolution when one or both of
the specs have not provided either a "
+ + "topic OR topicPattern");
+ }
+
+ // Future enhancements could allow for topicPattern to be changed in a way
where the new source is additive to the
+ // old source. If we did that now, there would be metadata issues due to
{@link KafkaDataSourceMetadata}
+ // implementation details that aren't set up to handle evolution of
metadata in this way.
+ if (!this.getSource().equals(other.getSource())) {
+ throw new IllegalArgumentException(
+ "Your proposed KafkaSupervisorSpec evolution is invalid. You are
attempting to change the topic/topicPattern "
+ + "from " + this.getSource() + " to " + other.getSource() + ". This
is not supported. If you "
+ + "want to change the topic or topicPattern for a supervisor, you
must first terminate the supervisor. "
+ + "Then create a new one in suspended state with the new topic or
topicPattern. Lastly, you will have to "
+ + "reset the supervisor offsets. Finally, you can resume the new
supervisor. Note that doing this reset can "
+ + "cause duplicate events or lost events if any topics who were in
the previous supervisor remain in the new "
+ + "one.");
Review Comment:
Suggestion for more concise language and formatted message.
```suggestion
"Update of topic/topicPattern from [%s] to [%s] is not supported
for a running Kafka supervisor."
+ "%nTo perform the update safely, follow these steps.
+ "%n(1) Suspend this supervisor, reset its offsets and then
terminate it. "
+ "%n(2) Create a new supervisor with the new topic or
topicPattern."
+ "%nNote that doing the reset can cause data duplication or loss
if any topic used in the old supervisor is included in the new one too.");
```
I have updated the steps, please fix it up if it doesn't seem correct.
##########
docs/ingestion/kafka-ingestion.md:
##########
@@ -134,6 +134,12 @@ If you enable multi-topic ingestion for a datasource,
downgrading to a version o
28.0.0 will cause the ingestion for that datasource to fail.
:::
+:::info
+Migrating an existing supervisor to use `topicPattern` instead of `topic` is
not supported. It is also not supported to change the `topicPattern` of an
existing supervisor to a different regex pattern.
Review Comment:
Thanks for adding this.
Maybe you can put it in a list format for readability like
```
The following updates to a supervisor spec are not supported:
- item 1
- item 2
- item 3
```
##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java:
##########
@@ -174,6 +174,12 @@ public boolean
createOrUpdateAndStartSupervisor(SupervisorSpec spec)
synchronized (lock) {
Preconditions.checkState(started, "SupervisorManager not started");
final boolean shouldUpdateSpec = shouldUpdateSupervisor(spec);
+ if (shouldUpdateSpec) {
+ SupervisorSpec existingSpec = getSpec(spec.getId());
+ if (existingSpec != null) {
+ existingSpec.validateProposedSpecEvolution(spec);
+ }
Review Comment:
This logic should probably live in `shouldUpdateSupervisor` method itself.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]