FrankChen021 commented on code in PR #19541:
URL: https://github.com/apache/druid/pull/19541#discussion_r3348714774
##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java:
##########
@@ -186,14 +186,101 @@ public boolean createOrUpdateAndStartSupervisor(SupervisorSpec spec)
synchronized (lock) {
Preconditions.checkState(started, "SupervisorManager not started");
- final boolean shouldUpdateSpec = shouldUpdateSupervisor(spec);
+ // Persist whenever the spec actually changed (or is new) — independent of whether a restart is
+ // required. This stops/recreates the supervisor regardless; persistence must not be gated on the
+ // restart decision, otherwise a no-restart change (e.g. taskCount under autoscaling) would be
+ // applied to the running supervisor but lost from the metadata store.
+ final boolean specChanged = isSpecChangedAndValidate(spec);
SupervisorSpec existingSpec = possiblyStopAndRemoveSupervisorInternal(spec.getId(), false);
spec.merge(existingSpec);
- createAndStartSupervisorInternal(spec, shouldUpdateSpec);
- return shouldUpdateSpec;
+ createAndStartSupervisorInternal(spec, specChanged);
+ return specChanged;
}
}
+ /**
+ * Outcome of {@link #createOrUpdateAndStartSupervisor(SupervisorSpec, boolean)}.
+ */
+ public enum SpecUpdateOutcome
+ {
+ /** Spec was byte-identical to the running spec and {@code skipRestartIfUnmodified} was set: nothing done. */
+ UNCHANGED,
+ /** Spec changed but did not require a restart: persisted to metadata, running supervisor left in place. */
+ PERSISTED_WITHOUT_RESTART,
+ /** Supervisor was (re)created and started; spec persisted if it changed. */
+ RESTARTED
+ }
+
+ /**
+ * Decides whether the submitted spec needs a restart and applies it under a single lock, so the decision
+ * cannot go stale between deciding and acting (which would let a concurrent POST drop a write or persist a
+ * spec that the running supervisor needs to be recreated for). With {@code skipRestartIfUnmodified} set, an
+ * unchanged spec is a no-op and a changed spec whose {@link SupervisorSpec#requireRestart} is false (e.g. a
+ * taskCount change under autoscaling) is persisted without recreating the supervisor; otherwise the
+ * supervisor is stopped and recreated (the only behavior when the flag is false).
+ */
+ public SpecUpdateOutcome createOrUpdateAndStartSupervisor(SupervisorSpec spec, boolean skipRestartIfUnmodified)
+ {
+ Preconditions.checkState(started, "SupervisorManager not started");
+ Preconditions.checkNotNull(spec, "spec");
+ Preconditions.checkNotNull(spec.getId(), "spec.getId()");
+ Preconditions.checkNotNull(spec.getDataSources(), "spec.getDatasources()");
+
+ synchronized (lock) {
+ Preconditions.checkState(started, "SupervisorManager not started");
+ final Pair<Supervisor, SupervisorSpec> current = supervisors.get(spec.getId());
+ final boolean isNew = current == null || current.rhs == null;
+ final boolean specChanged = isSpecChangedAndValidate(spec);
+
+ if (skipRestartIfUnmodified && !isNew) {
+ if (!specChanged) {
+ return SpecUpdateOutcome.UNCHANGED;
+ }
+ if (!spec.requireRestart(current.rhs)) {
Review Comment:
[P2] Compare restart requirements after merging carried-forward fields
The skip-restart path calls requireRestart before merge. For a seekable-stream
resubmission that omits ioConfig.taskCount, merge would carry the current
taskCount forward and leave the effective spec unchanged, but requireRestart
runs first, sees the constructor default, and can force an unnecessary
stop/recreate. Compare against a merged copy, or teach requireRestart to ignore
an unset taskCount.
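A minimal sketch of the "compare against a merged copy" option, using a heavily
simplified model. StreamSpec, Outcome, SkipRestartDecision, mergedWith and
sameEffectiveSpec are hypothetical stand-ins, not Druid classes. The sketch also
assumes that an omitted taskCount is visible as null before merge, that the
constructor default is 1, and that a taskCount-only change needs a restart only
when autoscaling is off. Comparing the raw submission against the running spec
reports a restart, while merging first yields UNCHANGED.

```java
// Hypothetical, simplified model of the skip-restart decision; these are not Druid classes.
final class SkipRestartDecision
{
  // Merge carried-forward fields first, then decide (the order the review suggests).
  static Outcome decide(StreamSpec submitted, StreamSpec running)
  {
    if (running == null) {
      return Outcome.RESTARTED; // new supervisor: always create and start
    }
    final StreamSpec effective = submitted.mergedWith(running);
    if (effective.sameEffectiveSpec(running)) {
      return Outcome.UNCHANGED;
    }
    if (!effective.requireRestart(running)) {
      return Outcome.PERSISTED_WITHOUT_RESTART;
    }
    return Outcome.RESTARTED;
  }

  public static void main(String[] args)
  {
    final StreamSpec running = new StreamSpec("events", 4, false);
    final StreamSpec resubmitted = new StreamSpec("events", null, false); // taskCount omitted
    System.out.println(resubmitted.requireRestart(running)); // true: default 1 vs running 4
    System.out.println(decide(resubmitted, running));        // UNCHANGED: merge restores 4
  }
}

// Mirrors SpecUpdateOutcome from the diff.
enum Outcome
{
  UNCHANGED, PERSISTED_WITHOUT_RESTART, RESTARTED
}

final class StreamSpec
{
  static final int DEFAULT_TASK_COUNT = 1; // assumed constructor default

  final String topic;
  final Integer rawTaskCount; // as submitted; null means the client omitted it
  final boolean autoscalerEnabled;

  StreamSpec(String topic, Integer rawTaskCount, boolean autoscalerEnabled)
  {
    this.topic = topic;
    this.rawTaskCount = rawTaskCount;
    this.autoscalerEnabled = autoscalerEnabled;
  }

  int taskCount()
  {
    return rawTaskCount == null ? DEFAULT_TASK_COUNT : rawTaskCount;
  }

  // Carry the running taskCount forward when the submission omitted it.
  StreamSpec mergedWith(StreamSpec running)
  {
    if (rawTaskCount != null || running == null) {
      return this;
    }
    return new StreamSpec(topic, running.taskCount(), autoscalerEnabled);
  }

  // Assumed rule: a taskCount change alone forces a restart only when autoscaling is off.
  boolean requireRestart(StreamSpec running)
  {
    if (!topic.equals(running.topic) || autoscalerEnabled != running.autoscalerEnabled) {
      return true;
    }
    return !autoscalerEnabled && taskCount() != running.taskCount();
  }

  boolean sameEffectiveSpec(StreamSpec other)
  {
    return topic.equals(other.topic)
           && autoscalerEnabled == other.autoscalerEnabled
           && taskCount() == other.taskCount();
  }
}
```

Merging before both the equality check and requireRestart keeps the two
decisions consistent: neither one ever sees a default the client did not send.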
##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java:
##########
@@ -303,4 +371,112 @@ public abstract SeekableStreamSupervisorSpec createBackfillSpec(
@Nullable Integer taskCount
);
+ /**
+ * Returns a builder pre-populated with this spec's values (including injected services), so callers
+ * can produce a modified copy without mutating this instance. Subclasses return their own builder.
+ */
+ public abstract Builder<?> toBuilder();
Review Comment:
[P2] Avoid making toBuilder abstract for extension subclasses
SeekableStreamSupervisorSpec is a public base class for stream supervisor specs,
and this new abstract method forces downstream extension subclasses to
implement a new API. Extension jars compiled against the previous base class can
fail at runtime (with AbstractMethodError) when requireRestart calls it. Keep
the default behavior conservative by providing a non-abstract fallback that
reports restart-required when no builder is available.
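A minimal sketch of the suggested fallback. BaseStreamSpec, SpecBuilder,
KnownSpec, LegacySpec and sameAs are hypothetical stand-ins, not Druid classes,
and returning Optional instead of the diff's Builder<?> is a design choice made
only for this sketch. It also assumes requireRestart uses the builder to copy
the submitted spec with the running taskCount and then compares the copy. A
subclass that never overrides toBuilder() still links and simply falls back to
a restart.

```java
import java.util.Optional;

// Hypothetical sketch of the reviewer's suggestion; these are not Druid classes.
final class ToBuilderFallbackDemo
{
  public static void main(String[] args)
  {
    final BaseStreamSpec running = new KnownSpec("events", 4);
    System.out.println(new KnownSpec("events", 8).requireRestart(running));  // false: taskCount-only change
    System.out.println(new LegacySpec("events", 8).requireRestart(running)); // true: no builder, restart
  }
}

// Minimal builder contract assumed for this sketch.
interface SpecBuilder
{
  SpecBuilder taskCount(int taskCount);

  BaseStreamSpec build();
}

abstract class BaseStreamSpec
{
  final String topic;
  final int taskCount;

  BaseStreamSpec(String topic, int taskCount)
  {
    this.topic = topic;
    this.taskCount = taskCount;
  }

  // Non-abstract fallback: subclasses written before this method existed keep working.
  Optional<SpecBuilder> toBuilder()
  {
    return Optional.empty();
  }

  // Restart unless this spec differs from the running one only in taskCount.
  boolean requireRestart(BaseStreamSpec running)
  {
    final Optional<SpecBuilder> builder = toBuilder();
    if (!builder.isPresent()) {
      return true; // no builder available: conservative stop/recreate
    }
    final BaseStreamSpec normalized = builder.get().taskCount(running.taskCount).build();
    return !normalized.sameAs(running);
  }

  boolean sameAs(BaseStreamSpec other)
  {
    return getClass() == other.getClass()
           && topic.equals(other.topic)
           && taskCount == other.taskCount;
  }
}

final class KnownSpec extends BaseStreamSpec
{
  KnownSpec(String topic, int taskCount)
  {
    super(topic, taskCount);
  }

  @Override
  Optional<SpecBuilder> toBuilder()
  {
    final SpecBuilder builder = new SpecBuilder()
    {
      private int count = KnownSpec.this.taskCount;

      @Override
      public SpecBuilder taskCount(int taskCount)
      {
        this.count = taskCount;
        return this;
      }

      @Override
      public BaseStreamSpec build()
      {
        return new KnownSpec(KnownSpec.this.topic, count);
      }
    };
    return Optional.of(builder);
  }
}

// Stands in for an extension subclass that predates toBuilder() and does not override it.
final class LegacySpec extends BaseStreamSpec
{
  LegacySpec(String topic, int taskCount)
  {
    super(topic, taskCount);
  }
}
```

The cost of the fallback is only a missed optimization: an unported extension
spec keeps the pre-PR stop/recreate behavior instead of failing.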
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.