kfaraz commented on code in PR #17955:
URL: https://github.com/apache/druid/pull/17955#discussion_r2065417350
##########
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java:
##########
@@ -176,6 +177,35 @@ protected KafkaSupervisorSpec toggleSuspend(boolean
suspend)
);
}
+ @Override
+ public void validateProposedSpecEvolution(SupervisorSpec that) throws
IllegalArgumentException
+ {
+ if (!(that instanceof KafkaSupervisorSpec)) {
+ throw new IllegalArgumentException("Cannot evolve to " + that.getType()
+ " from " + getType());
+ }
+ KafkaSupervisorSpec other = (KafkaSupervisorSpec) that;
+ if (this.getSource() == null || other.getSource() == null) {
+ // I don't think this is possible, but covering just in case.
+ throw new IllegalArgumentException(
+ "Cannot consider KafkaSupervisorSpec evolution when one or both of
the specs have not provided either a "
+ + "topic OR topicPattern");
+ }
+
+ // Future enhancements could allow for topicPattern to be changed in a way
where the new source is additive to the
+ // old source. If we did that now, there would be metadata issues due to
{@link KafkaDataSourceMetadata}
+ // implementation details that aren't set up to handle evolution of
metadata in this way.
+ if (!this.getSource().equals(other.getSource())) {
+ throw new IllegalArgumentException(
+ "Your proposed KafkaSupervisorSpec evolution is invalid. You are
attempting to change the topic/topicPattern "
+ + "from " + this.getSource() + " to " + other.getSource() + ". This
is not supported. If you "
+ + "want to change the topic or topicPattern for a supervisor, you
must first terminate the supervisor. "
+ + "Then create a new one in suspended state with the new topic or
topicPattern. Lastly, you will have to "
+ + "reset the supervisor offsets. Finally, you can resume the new
supervisor. Note that doing this reset can "
+ + "cause duplicate events or lost events if any topics who were in
the previous supervisor remain in the new "
+ + "one.");
Review Comment:
Suggestion for more concise language and formatted message.
```suggestion
"Update of topic/topicPattern from [%s] to [%s] is not supported
for a running Kafka supervisor."
+ "%nTo perform the update safely, follow these steps."
+ "%n(1) Suspend this supervisor, reset its offsets and then
terminate it. "
+ "%n(2) Create a new supervisor with the new topic or
topicPattern."
+ "%nNote that doing the reset can cause data duplication or loss
if any topic used in the old supervisor is included in the new one too.");
```
I have updated the steps, please fix it up if it doesn't seem correct.
##########
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java:
##########
@@ -176,6 +177,35 @@ protected KafkaSupervisorSpec toggleSuspend(boolean
suspend)
);
}
+ @Override
+ public void validateProposedSpecEvolution(SupervisorSpec that) throws
IllegalArgumentException
+ {
+ if (!(that instanceof KafkaSupervisorSpec)) {
+ throw new IllegalArgumentException("Cannot evolve to " + that.getType()
+ " from " + getType());
+ }
+ KafkaSupervisorSpec other = (KafkaSupervisorSpec) that;
+ if (this.getSource() == null || other.getSource() == null) {
+ // I don't think this is possible, but covering just in case.
+ throw new IllegalArgumentException(
+ "Cannot consider KafkaSupervisorSpec evolution when one or both of
the specs have not provided either a "
+ + "topic OR topicPattern");
+ }
+
+ // Future enhancements could allow for topicPattern to be changed in a way
where the new source is additive to the
+ // old source. If we did that now, there would be metadata issues due to
{@link KafkaDataSourceMetadata}
+ // implementation details that aren't set up to handle evolution of
metadata in this way.
+ if (!this.getSource().equals(other.getSource())) {
+ throw new IllegalArgumentException(
+ "Your proposed KafkaSupervisorSpec evolution is invalid. You are
attempting to change the topic/topicPattern "
+ + "from " + this.getSource() + " to " + other.getSource() + ". This
is not supported. If you "
+ + "want to change the topic or topicPattern for a supervisor, you
must first terminate the supervisor. "
+ + "Then create a new one in suspended state with the new topic or
topicPattern. Lastly, you will have to "
+ + "reset the supervisor offsets. Finally, you can resume the new
supervisor. Note that doing this reset can "
+ + "cause duplicate events or lost events if any topics who were in
the previous supervisor remain in the new "
+ + "one.");
Review Comment:
Suggestion for more concise language and formatted message.
```suggestion
"Update of topic/topicPattern from [%s] to [%s] is not supported
for a running Kafka supervisor."
+ "%nTo perform the update safely, follow these steps:"
+ "%n(1) Suspend this supervisor, reset its offsets and then
terminate it. "
+ "%n(2) Create a new supervisor with the new topic or
topicPattern."
+ "%nNote that doing the reset can cause data duplication or loss
if any topic used in the old supervisor is included in the new one too.");
```
I have updated the steps, please fix it up if it doesn't seem correct.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]