FrankChen021 commented on code in PR #19541:
URL: https://github.com/apache/druid/pull/19541#discussion_r3348714774


##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java:
##########
@@ -186,14 +186,101 @@ public boolean createOrUpdateAndStartSupervisor(SupervisorSpec spec)
 
     synchronized (lock) {
       Preconditions.checkState(started, "SupervisorManager not started");
-      final boolean shouldUpdateSpec = shouldUpdateSupervisor(spec);
+      // Persist whenever the spec actually changed (or is new), independent of whether a restart is
+      // required. This stops/recreates the supervisor regardless; persistence must not be gated on the
+      // restart decision, otherwise a no-restart change (e.g. taskCount under autoscaling) would be
+      // applied to the running supervisor but lost from the metadata store.
+      final boolean specChanged = isSpecChangedAndValidate(spec);
       SupervisorSpec existingSpec = possiblyStopAndRemoveSupervisorInternal(spec.getId(), false);
       spec.merge(existingSpec);
-      createAndStartSupervisorInternal(spec, shouldUpdateSpec);
-      return shouldUpdateSpec;
+      createAndStartSupervisorInternal(spec, specChanged);
+      return specChanged;
     }
   }
 
+  /**
+   * Outcome of {@link #createOrUpdateAndStartSupervisor(SupervisorSpec, boolean)}.
+   */
+  public enum SpecUpdateOutcome
+  {
+    /** Spec was byte-identical to the running spec and {@code skipRestartIfUnmodified} was set: nothing done. */
+    UNCHANGED,
+    /** Spec changed but did not require a restart: persisted to metadata, running supervisor left in place. */
+    PERSISTED_WITHOUT_RESTART,
+    /** Supervisor was (re)created and started; spec persisted if it changed. */
+    RESTARTED
+  }
+
+  /**
+   * Decides whether the submitted spec needs a restart and applies it under a single lock, so the decision
+   * cannot go stale between deciding and acting (which would let a concurrent POST drop a write or persist a
+   * spec that the running supervisor needs to be recreated for). With {@code skipRestartIfUnmodified} set, an
+   * unchanged spec is a no-op and a changed spec whose {@link SupervisorSpec#requireRestart} is false (e.g. a
+   * taskCount change under autoscaling) is persisted without recreating the supervisor; otherwise the
+   * supervisor is stopped and recreated (the only behavior when the flag is false).
+   */
+  public SpecUpdateOutcome createOrUpdateAndStartSupervisor(SupervisorSpec spec, boolean skipRestartIfUnmodified)
+  {
+    Preconditions.checkState(started, "SupervisorManager not started");
+    Preconditions.checkNotNull(spec, "spec");
+    Preconditions.checkNotNull(spec.getId(), "spec.getId()");
+    Preconditions.checkNotNull(spec.getDataSources(), "spec.getDataSources()");
+
+    synchronized (lock) {
+      Preconditions.checkState(started, "SupervisorManager not started");
+      final Pair<Supervisor, SupervisorSpec> current = supervisors.get(spec.getId());
+      final boolean isNew = current == null || current.rhs == null;
+      final boolean specChanged = isSpecChangedAndValidate(spec);
+
+      if (skipRestartIfUnmodified && !isNew) {
+        if (!specChanged) {
+          return SpecUpdateOutcome.UNCHANGED;
+        }
+        if (!spec.requireRestart(current.rhs)) {

Review Comment:
   [P2] Compare restart requirements after merging carried-forward fields
   
   The skip-restart path calls requireRestart before merge. For seekable-stream resubmissions where ioConfig.taskCount is omitted, merge would carry the current taskCount forward, so the effective spec is unchanged; requireRestart, however, sees the constructor default first and can force an unnecessary stop/recreate. Compare against a merged copy, or teach requireRestart to ignore an unset taskCount.
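A minimal sketch of the "compare against a merged copy" option, under stated assumptions: the spec is reduced to a hypothetical record with a stream name, a nullable taskCount (null meaning "omitted"), and an autoscaler flag; 1 is assumed as the constructor default; and only the skipRestartIfUnmodified path is modeled. None of these names are Druid's classes. The point is only the ordering: merge first, then run both the "changed" check and requireRestart against the merged copy (Java 16+ for records).

```java
import java.util.Objects;

public class MergeBeforeCompareSketch
{
  enum Outcome { UNCHANGED, PERSISTED_WITHOUT_RESTART, RESTARTED }

  static final int DEFAULT_TASK_COUNT = 1; // assumed constructor default, for illustration only

  /** Hypothetical spec; taskCount == null means the submitter omitted it. */
  record Spec(String stream, Integer taskCount, boolean autoscalerEnabled)
  {
    int effectiveTaskCount()
    {
      return taskCount == null ? DEFAULT_TASK_COUNT : taskCount;
    }

    /** Carries an omitted taskCount forward from the running spec (a simplified merge). */
    Spec mergedWith(Spec existing)
    {
      if (existing == null || taskCount != null) {
        return this;
      }
      return new Spec(stream, existing.taskCount(), autoscalerEnabled);
    }

    /** Stream or autoscaler changes always restart; taskCount changes restart only without autoscaling. */
    boolean requireRestart(Spec existing)
    {
      if (!Objects.equals(stream, existing.stream()) || autoscalerEnabled != existing.autoscalerEnabled()) {
        return true;
      }
      return !autoscalerEnabled && effectiveTaskCount() != existing.effectiveTaskCount();
    }
  }

  /** The skipRestartIfUnmodified path, with every comparison made against the merged copy. */
  static Outcome decide(Spec submitted, Spec current)
  {
    if (current == null) {
      return Outcome.RESTARTED; // new supervisor: create and start
    }
    Spec effective = submitted.mergedWith(current);
    if (effective.equals(current)) {
      return Outcome.UNCHANGED;
    }
    return effective.requireRestart(current) ? Outcome.RESTARTED : Outcome.PERSISTED_WITHOUT_RESTART;
  }

  public static void main(String[] args)
  {
    Spec running = new Spec("events", 4, false);
    Spec resubmitted = new Spec("events", null, false); // taskCount omitted

    // Before merging, the default (1) is compared with 4 and a restart is forced.
    System.out.println(resubmitted.requireRestart(running)); // true
    // After merging, the effective spec equals the running one.
    System.out.println(decide(resubmitted, running)); // UNCHANGED
  }
}
```

In this model, an explicit taskCount change with the autoscaler enabled on both sides (8 vs. 4) comes out as PERSISTED_WITHOUT_RESTART, while a stream change still yields RESTARTED.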



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java:
##########
@@ -303,4 +371,112 @@ public abstract SeekableStreamSupervisorSpec createBackfillSpec(
       @Nullable Integer taskCount
   );
 
+  /**
+   * Returns a builder pre-populated with this spec's values (including injected services), so callers
+   * can produce a modified copy without mutating this instance. Subclasses return their own builder.
+   */
+  public abstract Builder<?> toBuilder();

Review Comment:
   [P2] Avoid making toBuilder abstract for extension subclasses
   
   SeekableStreamSupervisorSpec is a public base class for stream supervisor specs, and this new abstract method forces downstream extension subclasses to implement a new API. Extension jars compiled before the method existed do not implement it, so they would throw AbstractMethodError when requireRestart calls it. Keep the default behavior conservative by providing a non-abstract fallback that reports restart-required when no builder is available.
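One possible shape for the suggested fallback, as a hypothetical sketch: the base class returns Optional.empty() from toBuilder() by default (returning null would also work), and requireRestart treats a missing builder as restart-required. SpecBuilder, BaseSpec, LegacySpec, and ModernSpec are illustrative stand-ins, not Druid's SeekableStreamSupervisorSpec or its Builder, and the "differ only in taskCount" comparison is a simplification of whatever requireRestart actually checks.

```java
import java.util.Objects;
import java.util.Optional;

public class ToBuilderFallbackSketch
{
  /** Hypothetical builder contract; not Druid's SeekableStreamSupervisorSpec.Builder. */
  interface SpecBuilder
  {
    SpecBuilder taskCount(int taskCount);
    BaseSpec build();
  }

  /** Hypothetical stand-in for a public base class that extension jars subclass. */
  abstract static class BaseSpec
  {
    final String stream;
    final int taskCount;

    BaseSpec(String stream, int taskCount)
    {
      this.stream = stream;
      this.taskCount = taskCount;
    }

    /** Non-abstract fallback: subclasses written before toBuilder() existed inherit "no builder". */
    public Optional<SpecBuilder> toBuilder()
    {
      return Optional.empty();
    }

    /** Returns false only when the specs provably differ in taskCount alone. */
    public boolean requireRestart(BaseSpec running)
    {
      Optional<SpecBuilder> builder = toBuilder();
      if (builder.isEmpty()) {
        return true; // no builder, no comparable copy: keep the conservative answer
      }
      // Copy this spec with the running taskCount; equality means taskCount was the only change.
      BaseSpec normalized = builder.get().taskCount(running.taskCount).build();
      return !normalized.equals(running);
    }

    @Override
    public boolean equals(Object o)
    {
      if (o == null || o.getClass() != getClass()) {
        return false;
      }
      BaseSpec that = (BaseSpec) o;
      return taskCount == that.taskCount && Objects.equals(stream, that.stream);
    }

    @Override
    public int hashCode()
    {
      return Objects.hash(getClass(), stream, taskCount);
    }
  }

  /** Written before toBuilder() existed, so it does not override it. */
  static final class LegacySpec extends BaseSpec
  {
    LegacySpec(String stream, int taskCount)
    {
      super(stream, taskCount);
    }
  }

  /** Opts in by returning a builder that produces its own type. */
  static final class ModernSpec extends BaseSpec
  {
    ModernSpec(String stream, int taskCount)
    {
      super(stream, taskCount);
    }

    @Override
    public Optional<SpecBuilder> toBuilder()
    {
      return Optional.of(new SpecBuilder()
      {
        private int count = ModernSpec.this.taskCount;
        @Override
        public SpecBuilder taskCount(int newCount)
        {
          count = newCount;
          return this;
        }

        @Override
        public BaseSpec build()
        {
          return new ModernSpec(stream, count);
        }
      });
    }
  }
}
```

A subclass that never overrides toBuilder() keeps the old always-restart behavior instead of failing at runtime, while a subclass that opts in can skip the restart for a taskCount-only change.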



-- 
This is an automated message from the Apache Git Service.