lhotari commented on code in PR #25980: URL: https://github.com/apache/pulsar/pull/25980#discussion_r3398002395
########## pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/AutoScalePolicyEvaluator.java: ########## @@ -0,0 +1,252 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service.scalable; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import org.apache.pulsar.common.scalable.SegmentInfo; +import org.apache.pulsar.common.scalable.SegmentLoadStats; + +/** + * Pure, side-effect-free decision function for scalable-topic auto split/merge (PIP-483). + * + * <p>Given a snapshot of the current layout, per-segment load samples, per-subscription + * stream/checkpoint consumer counts, the resolved policy, and the current time, it returns + * exactly one {@link AutoScaleDecision}. It performs no I/O and holds no state — the caller + * (the controller leader) collects the inputs and dispatches the result. 
+ * + * <p>It runs two passes and emits at most one action: + * <ol> + * <li><b>Split</b> (fast, lightly coalesced by {@code splitCooldown}): consumer-count + * scale-up first, then traffic-driven scale-up.</li> + * <li><b>Merge</b> (lazy, gated by {@code mergeCooldown} + {@code mergeWindow} + + * {@code maxDagDepth}): only if no split fired.</li> + * </ol> + */ +public final class AutoScalePolicyEvaluator { + + private AutoScalePolicyEvaluator() { + } + + /** + * Decide whether to split, merge, or do nothing. + * + * @param layout the current segment layout + * @param loadBySegment per active-segment load sample; a missing entry is treated + * as zero load with no age (never merge-eligible) + * @param streamConsumerCount per-subscription count of STREAM/CHECKPOINT (controller-managed) + * consumers; QUEUE subscriptions are excluded by the caller + * @param config the resolved policy + * @param nowMs current wall-clock time, epoch millis + * @param lastSplitAtMs epoch millis of the last split on this topic (manual or auto), + * or {@code Long.MIN_VALUE} if none + * @param lastMergeAtMs epoch millis of the last merge on this topic (manual or auto), + * or {@code Long.MIN_VALUE} if none + * @return the decision + */ + public static AutoScaleDecision decide( + SegmentLayout layout, + Map<Long, SegmentLoadSample> loadBySegment, + Map<String, Integer> streamConsumerCount, + AutoScaleConfig config, + long nowMs, + long lastSplitAtMs, + long lastMergeAtMs) { + + if (!config.enabled()) { + return AutoScaleDecision.NONE; + } + + List<SegmentInfo> active = new ArrayList<>(layout.getActiveSegments().values()); + + AutoScaleDecision split = trySplit(active, loadBySegment, streamConsumerCount, + config, nowMs, lastSplitAtMs); + if (!(split instanceof AutoScaleDecision.NoAction)) { + return split; + } + + return tryMerge(active, layout, loadBySegment, config, nowMs, lastMergeAtMs); + } + + // --- Split pass --- + + private static AutoScaleDecision trySplit( + List<SegmentInfo> 
active, + Map<Long, SegmentLoadSample> loadBySegment, + Map<String, Integer> streamConsumerCount, + AutoScaleConfig config, + long nowMs, + long lastSplitAtMs) { + + if (active.size() >= config.maxSegments()) { + return AutoScaleDecision.NONE; + } + if (withinCooldown(nowMs, lastSplitAtMs, config.splitCooldown().toMillis())) { + return AutoScaleDecision.NONE; + } + + // (a) Consumer-driven: per-subscription max. If any managed subscription has more + // consumers than there are active segments, add a segment so the 1:1 assignment can + // give the extra consumer its own segment. Split the busiest segment by msgRateIn so + // the new pair lands where it relieves the most ingest. + int requiredConsumers = streamConsumerCount.values().stream() + .mapToInt(Integer::intValue).max().orElse(0); + if (requiredConsumers > active.size()) { + SegmentInfo target = busiestByMsgRateIn(active, loadBySegment); + if (target != null) { + return new AutoScaleDecision.Split(target.segmentId(), "consumer-count"); + } + } + + // (b) Load-driven: split the segment with the highest overload score among those over + // at least one split threshold. + SegmentInfo hottest = null; + double hottestScore = 1.0; // strictly over threshold means a per-metric ratio > 1.0 + String hottestReason = null; + for (SegmentInfo segment : active) { + SegmentLoadStats stats = statsOf(segment.segmentId(), loadBySegment); + double score = 0.0; + String reason = null; + double[] ratios = { + stats.msgRateIn() / config.splitMsgRateIn(), + stats.bytesRateIn() / config.splitBytesRateIn(), + stats.msgRateOut() / config.splitMsgRateOut(), + stats.bytesRateOut() / config.splitBytesRateOut(), Review Comment: Does division by zero behave correctly here, e.g. if a split threshold is configured as 0? For doubles, a positive rate divided by 0 yields Infinity and 0 / 0 yields NaN, and every comparison involving NaN evaluates to false.
########## pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicController.java: ########## @@ -237,6 +257,142 @@ private void runGcTickSafely() { } } + // --- Auto split/merge (PIP-483) --- + + /** + * Schedule the periodic traffic-driven auto split/merge evaluation. Only fires on the + * controller leader; idempotent. Cancelled on close / leader-loss. Consumer-count + * changes are handled event-driven (see {@link #onConsumerCountChanged()}), not by this + * tick. + */ + private synchronized void scheduleAutoScaleTask() { + if (closed || autoScaleTask != null) { + return; + } + ServiceConfiguration config = brokerConfig(); + if (config == null || !config.isScalableTopicAutoScaleEnabled()) { + return; + } + long intervalMs = Duration.ofSeconds( + config.getScalableTopicAutoScaleIntervalSeconds()).toMillis(); + if (intervalMs <= 0) { + return; + } + autoScaleTask = scheduler().scheduleAtFixedRate( + () -> runAutoScaleSafely("tick"), intervalMs, intervalMs, TimeUnit.MILLISECONDS); + } + + private synchronized void cancelAutoScaleTask() { + if (autoScaleTask != null) { + autoScaleTask.cancel(false); + autoScaleTask = null; + } + } + + /** + * Event-driven trigger: a stream/checkpoint consumer registered or unregistered, which + * may change the per-subscription consumer count. Evaluates the consumer-count split rule + * within seconds rather than waiting for the periodic tick. 
+ */ + private void onConsumerCountChanged() { + runAutoScaleSafely("consumer-change"); + } + + private void runAutoScaleSafely(String trigger) { + if (!isLeader() || closed) { + return; + } + try { + evaluateAndAct(trigger).exceptionally(ex -> { + log.warn().attr("trigger", trigger).exceptionMessage(ex) + .log("Auto split/merge evaluation failed"); + return null; + }); + } catch (Throwable t) { + log.warn().attr("trigger", trigger).exception(t) + .log("Auto split/merge evaluation threw"); + } + } + + /** + * Collect the current inputs, run the pure {@link AutoScalePolicyEvaluator}, and dispatch + * the resulting action. At most one auto operation runs at a time: {@link #autoScaleInFlight} + * is held from before the decision through the end of the dispatched split/merge. + */ + private CompletableFuture<Void> evaluateAndAct(String trigger) { + ServiceConfiguration brokerConfig = brokerConfig(); + if (brokerConfig == null) { + return CompletableFuture.completedFuture(null); + } + AutoScaleConfig config = AutoScaleConfig.fromBrokerConfig(brokerConfig); + if (!config.enabled()) { + return CompletableFuture.completedFuture(null); + } + if (!autoScaleInFlight.compareAndSet(false, true)) { Review Comment: How does the auto-scaling logic handle concurrency when a user performs a "manual" split or merge through the org.apache.pulsar.broker.admin.v2.ScalableTopics#splitSegment (`/{tenant}/{namespace}/{topic}/split/{segmentId}`) and org.apache.pulsar.broker.admin.v2.ScalableTopics#mergeSegments (`/{tenant}/{namespace}/{topic}/merge/{segmentId1}/{segmentId2}`) REST APIs? Also, is it planned as part of PIP-483 to allow changing `maxSegments`, `minSegments` and `maxDagDepth` per scalable topic, as an alternative to splitting and merging manually via the REST API? It could be useful to have a setting that controls whether such a change takes effect immediately or only the next time the auto-scaling rules are evaluated.
Scaling immediately after increasing `minSegments` would bring its own challenge: picking which segments are worth splitting. That could presumably also be decided based on load stats. Decreasing `maxSegments` would be the analogous case in the other direction, i.e. picking which segments to merge. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
