lhotari commented on code in PR #25980:
URL: https://github.com/apache/pulsar/pull/25980#discussion_r3398002395


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/AutoScalePolicyEvaluator.java:
##########
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.scalable;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import org.apache.pulsar.common.scalable.SegmentInfo;
+import org.apache.pulsar.common.scalable.SegmentLoadStats;
+
+/**
+ * Pure, side-effect-free decision function for scalable-topic auto split/merge (PIP-483).
+ *
+ * <p>Given a snapshot of the current layout, per-segment load samples, per-subscription
+ * stream/checkpoint consumer counts, the resolved policy, and the current time, it returns
+ * exactly one {@link AutoScaleDecision}. It performs no I/O and holds no state — the caller
+ * (the controller leader) collects the inputs and dispatches the result.
+ *
+ * <p>It runs two passes and emits at most one action:
+ * <ol>
+ *   <li><b>Split</b> (fast, lightly coalesced by {@code splitCooldown}): consumer-count
+ *       scale-up first, then traffic-driven scale-up.</li>
+ *   <li><b>Merge</b> (lazy, gated by {@code mergeCooldown} + {@code mergeWindow} +
+ *       {@code maxDagDepth}): only if no split fired.</li>
+ * </ol>
+ */
+public final class AutoScalePolicyEvaluator {
+
+    private AutoScalePolicyEvaluator() {
+    }
+
+    /**
+     * Decide whether to split, merge, or do nothing.
+     *
+     * @param layout              the current segment layout
+     * @param loadBySegment       per active-segment load sample; a missing entry is treated
+     *                            as zero load with no age (never merge-eligible)
+     * @param streamConsumerCount per-subscription count of STREAM/CHECKPOINT (controller-managed)
+     *                            consumers; QUEUE subscriptions are excluded by the caller
+     * @param config              the resolved policy
+     * @param nowMs               current wall-clock time, epoch millis
+     * @param lastSplitAtMs       epoch millis of the last split on this topic (manual or auto),
+     *                            or {@code Long.MIN_VALUE} if none
+     * @param lastMergeAtMs       epoch millis of the last merge on this topic (manual or auto),
+     *                            or {@code Long.MIN_VALUE} if none
+     * @return the decision
+     */
+    public static AutoScaleDecision decide(
+            SegmentLayout layout,
+            Map<Long, SegmentLoadSample> loadBySegment,
+            Map<String, Integer> streamConsumerCount,
+            AutoScaleConfig config,
+            long nowMs,
+            long lastSplitAtMs,
+            long lastMergeAtMs) {
+
+        if (!config.enabled()) {
+            return AutoScaleDecision.NONE;
+        }
+
+        List<SegmentInfo> active = new ArrayList<>(layout.getActiveSegments().values());
+
+        AutoScaleDecision split = trySplit(active, loadBySegment, streamConsumerCount,
+                config, nowMs, lastSplitAtMs);
+        if (!(split instanceof AutoScaleDecision.NoAction)) {
+            return split;
+        }
+
+        return tryMerge(active, layout, loadBySegment, config, nowMs, lastMergeAtMs);
+    }
+
+    // --- Split pass ---
+
+    private static AutoScaleDecision trySplit(
+            List<SegmentInfo> active,
+            Map<Long, SegmentLoadSample> loadBySegment,
+            Map<String, Integer> streamConsumerCount,
+            AutoScaleConfig config,
+            long nowMs,
+            long lastSplitAtMs) {
+
+        if (active.size() >= config.maxSegments()) {
+            return AutoScaleDecision.NONE;
+        }
+        if (withinCooldown(nowMs, lastSplitAtMs, config.splitCooldown().toMillis())) {
+            return AutoScaleDecision.NONE;
+        }
+
+        // (a) Consumer-driven: per-subscription max. If any managed subscription has more
+        // consumers than there are active segments, add a segment so the 1:1 assignment can
+        // give the extra consumer its own segment. Split the busiest segment by msgRateIn so
+        // the new pair lands where it relieves the most ingest.
+        int requiredConsumers = streamConsumerCount.values().stream()
+                .mapToInt(Integer::intValue).max().orElse(0);
+        if (requiredConsumers > active.size()) {
+            SegmentInfo target = busiestByMsgRateIn(active, loadBySegment);
+            if (target != null) {
+                return new AutoScaleDecision.Split(target.segmentId(), "consumer-count");
+            }
+        }
+
+        // (b) Load-driven: split the segment with the highest overload score among those over
+        // at least one split threshold.
+        SegmentInfo hottest = null;
+        double hottestScore = 1.0; // strictly over threshold means a per-metric ratio > 1.0
+        String hottestReason = null;
+        for (SegmentInfo segment : active) {
+            SegmentLoadStats stats = statsOf(segment.segmentId(), loadBySegment);
+            double score = 0.0;
+            String reason = null;
+            double[] ratios = {
+                    stats.msgRateIn() / config.splitMsgRateIn(),
+                    stats.bytesRateIn() / config.splitBytesRateIn(),
+                    stats.msgRateOut() / config.splitMsgRateOut(),
+                    stats.bytesRateOut() / config.splitBytesRateOut(),

Review Comment:
   Does division by zero behave correctly here? With `double` operands, a zero split threshold yields `Infinity` for a non-zero rate and `NaN` for `0.0 / 0.0`.
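
To make the question concrete, here is a minimal, standalone sketch of how Java floating-point division by zero behaves. It assumes the rates and thresholds in the diff are `double` values, which the quoted hunk does not confirm; if both operands were integer types, the division would throw `ArithmeticException` instead. `DivideByZeroDemo`, `ratio`, and `guardedRatio` are hypothetical names for illustration and are not part of the PR.

```java
public final class DivideByZeroDemo {

    /** Per-metric overload ratio, mirroring the shape of the expressions in the diff. */
    public static double ratio(double observed, double threshold) {
        return observed / threshold;
    }

    /**
     * Hypothetical guarded variant (an assumption, not the PR's code): a
     * non-positive threshold is treated as "metric disabled" and yields 0.0.
     */
    public static double guardedRatio(double observed, double threshold) {
        return threshold > 0.0 ? observed / threshold : 0.0;
    }

    public static void main(String[] args) {
        double positiveOverZero = ratio(100.0, 0.0); // +Infinity
        double zeroOverZero = ratio(0.0, 0.0);       // NaN

        System.out.println(Double.isInfinite(positiveOverZero)); // true
        System.out.println(Double.isNaN(zeroOverZero));          // true

        // Infinity compares greater than any finite value, so a zero threshold
        // makes any non-zero load look over the threshold.
        System.out.println(positiveOverZero > 1.0); // true

        // Every ordered comparison involving NaN is false.
        System.out.println(zeroOverZero > 1.0);  // false
        System.out.println(zeroOverZero <= 1.0); // false

        // Math.max propagates NaN, so a max-of-ratios score can turn into NaN.
        System.out.println(Double.isNaN(Math.max(zeroOverZero, 2.0))); // true

        System.out.println(guardedRatio(100.0, 0.0)); // 0.0
    }
}
```

Whether a zero threshold should mean "disabled" or "always split" is a policy decision for the PR author; the guard above only shows one possible reading.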



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicController.java:
##########
@@ -237,6 +257,142 @@ private void runGcTickSafely() {
         }
     }
 
+    // --- Auto split/merge (PIP-483) ---
+
+    /**
+     * Schedule the periodic traffic-driven auto split/merge evaluation. Only fires on the
+     * controller leader; idempotent. Cancelled on close / leader-loss. Consumer-count
+     * changes are handled event-driven (see {@link #onConsumerCountChanged()}), not by this
+     * tick.
+     */
+    private synchronized void scheduleAutoScaleTask() {
+        if (closed || autoScaleTask != null) {
+            return;
+        }
+        ServiceConfiguration config = brokerConfig();
+        if (config == null || !config.isScalableTopicAutoScaleEnabled()) {
+            return;
+        }
+        long intervalMs = Duration.ofSeconds(
+                config.getScalableTopicAutoScaleIntervalSeconds()).toMillis();
+        if (intervalMs <= 0) {
+            return;
+        }
+        autoScaleTask = scheduler().scheduleAtFixedRate(
+                () -> runAutoScaleSafely("tick"), intervalMs, intervalMs, TimeUnit.MILLISECONDS);
+    }
+
+    private synchronized void cancelAutoScaleTask() {
+        if (autoScaleTask != null) {
+            autoScaleTask.cancel(false);
+            autoScaleTask = null;
+        }
+    }
+
+    /**
+     * Event-driven trigger: a stream/checkpoint consumer registered or unregistered, which
+     * may change the per-subscription consumer count. Evaluates the consumer-count split rule
+     * within seconds rather than waiting for the periodic tick.
+     */
+    private void onConsumerCountChanged() {
+        runAutoScaleSafely("consumer-change");
+    }
+
+    private void runAutoScaleSafely(String trigger) {
+        if (!isLeader() || closed) {
+            return;
+        }
+        try {
+            evaluateAndAct(trigger).exceptionally(ex -> {
+                log.warn().attr("trigger", trigger).exceptionMessage(ex)
+                        .log("Auto split/merge evaluation failed");
+                return null;
+            });
+        } catch (Throwable t) {
+            log.warn().attr("trigger", trigger).exception(t)
+                    .log("Auto split/merge evaluation threw");
+        }
+    }
+
+    /**
+     * Collect the current inputs, run the pure {@link AutoScalePolicyEvaluator}, and dispatch
+     * the resulting action. At most one auto operation runs at a time: {@link #autoScaleInFlight}
+     * is held from before the decision through the end of the dispatched split/merge.
+     */
+    private CompletableFuture<Void> evaluateAndAct(String trigger) {
+        ServiceConfiguration brokerConfig = brokerConfig();
+        if (brokerConfig == null) {
+            return CompletableFuture.completedFuture(null);
+        }
+        AutoScaleConfig config = AutoScaleConfig.fromBrokerConfig(brokerConfig);
+        if (!config.enabled()) {
+            return CompletableFuture.completedFuture(null);
+        }
+        if (!autoScaleInFlight.compareAndSet(false, true)) {

Review Comment:
   How does the auto-scaling logic handle concurrency when a user performs a "manual" split or merge through the REST APIs org.apache.pulsar.broker.admin.v2.ScalableTopics#splitSegment (`/{tenant}/{namespace}/{topic}/split/{segmentId}`) and org.apache.pulsar.broker.admin.v2.ScalableTopics#mergeSegments (`/{tenant}/{namespace}/{topic}/merge/{segmentId1}/{segmentId2}`)?
   
   Does PIP-483 plan to allow changing `maxSegments`, `minSegments` and `maxDagDepth` per scalable topic, as an alternative to splitting and merging manually with the REST API? It could be useful to have a setting that controls whether such a change is applied immediately or only when the auto-scaling rules next fire. I guess scaling immediately after increasing `minSegments` would bring a separate challenge: picking which segments are worth splitting. That could also be done based on load stats. Decreasing `maxSegments` would be the same case in the other direction.
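
As an illustration of the concurrency concern, the sketch below shows what it could look like if manual and automatic operations shared one `compareAndSet` guard, so that whichever starts first excludes the other until its future completes. This is an assumption for discussion only: the quoted hunk shows `autoScaleInFlight` guarding the auto path, and whether the manual REST path takes the same guard is exactly what is being asked. `SingleFlightGuard` and `tryRun` are hypothetical names, not Pulsar APIs.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

public final class SingleFlightGuard {

    private final AtomicBoolean inFlight = new AtomicBoolean(false);

    /**
     * Runs the operation only if no other guarded operation is in flight.
     * The returned future completes with false if the call was skipped,
     * or with true once the operation finishes successfully.
     */
    public CompletableFuture<Boolean> tryRun(Supplier<CompletableFuture<Void>> operation) {
        if (!inFlight.compareAndSet(false, true)) {
            return CompletableFuture.completedFuture(false);
        }
        CompletableFuture<Void> started;
        try {
            started = operation.get();
        } catch (Throwable t) {
            // Release the guard if the operation fails before returning a future.
            inFlight.set(false);
            CompletableFuture<Boolean> failed = new CompletableFuture<>();
            failed.completeExceptionally(t);
            return failed;
        }
        // Release the guard on success and on failure, then report success.
        return started
                .whenComplete((ignored, error) -> inFlight.set(false))
                .thenApply(ignored -> true);
    }

    public boolean isInFlight() {
        return inFlight.get();
    }

    public static void main(String[] args) {
        SingleFlightGuard guard = new SingleFlightGuard();

        // A "manual" split that is still running.
        CompletableFuture<Void> manualSplit = new CompletableFuture<>();
        CompletableFuture<Boolean> manual = guard.tryRun(() -> manualSplit);

        // An auto-scale tick arriving meanwhile is skipped rather than queued.
        CompletableFuture<Boolean> auto =
                guard.tryRun(() -> CompletableFuture.completedFuture(null));
        System.out.println(auto.join()); // false

        manualSplit.complete(null);
        System.out.println(manual.join());      // true
        System.out.println(guard.isInFlight()); // false
    }
}
```

Skipping the losing caller suits a periodic tick, which simply retries on the next interval; a manual REST call would more likely need an explicit conflict response or a queue, which this sketch does not attempt.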


